diff --git a/CMakeLists.txt b/CMakeLists.txt
index 029ae198..c51b3248 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,18 +32,28 @@ add_subdirectory(3rdparty)
 
 include(cmake/opencv_config.cmake)
 
-if (NOT WIN32)
-  find_package(OpenMP REQUIRED)
-endif()
-
-if (NOT WIN32)
-  set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -Werror")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Werror")
+find_package(OpenMP)
+
+if(OpenMP_FOUND)
+  message(STATUS "OpenMP found - enabling parallel support")
+  add_definitions(-DHAS_OPENMP)
+  if(TARGET OpenMP::OpenMP_CXX)
+    set(OPENMP_TARGET OpenMP::OpenMP_CXX)
+    message(STATUS "Using OpenMP target: OpenMP::OpenMP_CXX")
+  else()
+    set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
+    if(OpenMP_CXX_LIBRARIES)
+      set(OPENMP_LIBRARIES ${OpenMP_CXX_LIBRARIES})
+    endif()
+    message(STATUS "OpenMP CXX flags: ${OpenMP_CXX_FLAGS}")
+    message(STATUS "OpenMP libraries: ${OpenMP_CXX_LIBRARIES}")
+  endif()
 else()
-  set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /W4 /wd4996 /wd4190 /wd4189 /WX")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4 /wd4996 /wd4190 /wd4189 /WX")
+  message(STATUS "OpenMP not found - parallel features disabled")
 endif()
+
 
 foreach(CONFIG "" _DEBUG _RELEASE)
   set("CMAKE_ARCHIVE_OUTPUT_DIRECTORY${CONFIG}" "${CMAKE_BINARY_DIR}/lib")
   set("CMAKE_LIBRARY_OUTPUT_DIRECTORY${CONFIG}" "${CMAKE_BINARY_DIR}/lib")
diff --git a/include/layers/EWLayer.hpp b/include/layers/EWLayer.hpp
index 7361689c..22a2fd25 100644
--- a/include/layers/EWLayer.hpp
+++ b/include/layers/EWLayer.hpp
@@ -46,7 +46,7 @@ class EWLayerImpl : public LayerImpl<ValueType> {
  public:
   EWLayerImpl() = delete;
   EWLayerImpl(const Shape& shape, std::string function, float alpha = 0.0F,
-              float beta = 0.0F);
+              float beta = 0.0F, int type_parall = 0);
   EWLayerImpl(const EWLayerImpl& c) = default;
   EWLayerImpl& operator=(const EWLayerImpl& c) = default;
   std::vector<ValueType> run(
@@ -56,57 +56,83 @@ class EWLayerImpl : public LayerImpl<ValueType> {
   std::string func_;
   float alpha_;
   float beta_;
+  int type_parall_;
 };
 
 template <typename ValueType>
 EWLayerImpl<ValueType>::EWLayerImpl(const Shape& shape, std::string function,
-                                    float alpha, float beta)
+                                    float alpha, float beta, int type_parall)
     : LayerImpl<ValueType>(shape, shape),
       func_(std::move(function)),
       alpha_(alpha),
-      beta_(beta) {}
+      beta_(beta),
+      type_parall_(type_parall) {}
 
 template <typename ValueType>
 std::vector<ValueType> EWLayerImpl<ValueType>::run(
     const std::vector<ValueType>& input) const {
   std::vector<ValueType> res(this->outputShape_.count());
+  int available_threads = -1;
+  if (type_parall_ == 0) available_threads = 1;
+  if (type_parall_ == 1)
+    available_threads = std::thread::hardware_concurrency();
+  if (type_parall_ == 2)
+    available_threads = oneapi::tbb::info::default_concurrency();
+  if (type_parall_ == 3) available_threads = omp_get_max_threads();
+
   if (func_ == "relu") {
-    std::transform(input.begin(), input.end(), res.begin(), relu<ValueType>);
+    parallel_for(
+        input.size(),
+        [&](int i) {
+          res[i] = input[i] > ValueType(0) ? input[i] : ValueType(0);
+        },
+        type_parall_);
   } else if (func_ == "tanh") {
-    auto tanh = [&](const ValueType& value) -> ValueType {
-      return static_cast<ValueType>(std::tanh(value));
-    };
-    std::transform(input.begin(), input.end(), res.begin(), tanh);
+    parallel_for(
+        input.size(),
+        [&](int i) { res[i] = static_cast<ValueType>(std::tanh(input[i])); },
+        type_parall_);
   } else if (func_ == "sin") {
-    auto sin = [&](const ValueType& value) -> ValueType {
-      return static_cast<ValueType>(std::sin(value));
-    };
-    std::transform(input.begin(), input.end(), res.begin(), sin);
+    parallel_for(
+        input.size(),
+        [&](int i) { res[i] = static_cast<ValueType>(std::sin(input[i])); },
+        type_parall_);
   } else if (func_ == "minus") {
-    auto minus = [&](const ValueType& value) -> ValueType { return -value; };
-    std::transform(input.begin(), input.end(), res.begin(), minus);
+    parallel_for(
+        input.size(), [&](int i) { res[i] = -input[i]; }, type_parall_);
   } else if (func_ == "linear") {
-    auto linear = [&](const ValueType& value) -> ValueType {
-      return value * static_cast<ValueType>(alpha_) +
-             static_cast<ValueType>(beta_);
-    };
-    std::transform(input.begin(), input.end(), res.begin(), linear);
+    parallel_for(
+        input.size(),
+        [&](int i) {
+          res[i] = input[i] * static_cast<ValueType>(alpha_) +
+                   static_cast<ValueType>(beta_);
+        },
+        type_parall_);
   } else if (func_ == "sigmoid") {
-    auto sigmoid = [](ValueType x) -> ValueType {
-      if constexpr (std::is_integral_v<ValueType>) {
-        auto x_float = static_cast<float>(x);
-        float result = 1.0F / (1.0F + std::exp(-x_float));
-        return static_cast<ValueType>(std::round(result));
-      } else {
-        if (x >= ValueType(0)) {
-          ValueType z = std::exp(-x);
-          return ValueType(1) / (ValueType(1) + z);
-        }
-        ValueType z = std::exp(x);
-        return z / (ValueType(1) + z);
-      }
-    };
-    std::transform(input.cbegin(), input.cend(), res.begin(), sigmoid);
+    if constexpr (std::is_integral_v<ValueType>) {
+      parallel_for(
+          input.size(),
+          [&](int i) {
+            auto x_float = static_cast<float>(input[i]);
+            float result = 1.0F / (1.0F + std::exp(-x_float));
+            res[i] = static_cast<ValueType>(std::round(result));
+          },
+          type_parall_);
+    } else {
+      parallel_for(
+          input.size(),
+          [&](int i) {
+            ValueType x = input[i];
+            if (x >= ValueType(0)) {
+              ValueType z = std::exp(-x);
+              res[i] = ValueType(1) / (ValueType(1) + z);
+            } else {
+              ValueType z = std::exp(x);
+              res[i] = z / (ValueType(1) + z);
+            }
+          },
+          type_parall_);
+    }
   } else {
     throw std::invalid_argument("No such function for EWLayer");
   }
diff --git a/include/layers/Layer.hpp b/include/layers/Layer.hpp
index 2da4e0a5..fc665846 100644
--- a/include/layers/Layer.hpp
+++ b/include/layers/Layer.hpp
@@ -1,5 +1,11 @@
 #pragma once
+#include <omp.h>
+
+#include <algorithm>
+#include <iostream>
+#include <oneapi/tbb.h>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -49,6 +55,7 @@ class Layer {
   PostOperations postops;
   int getID() const { return id_; }
   void setID(int id) { id_ = id; }
+  void setTypeParall(int type) { type_parall_ = type; }
   LayerType getName() const { return type_; }
   virtual void run(const std::vector<Tensor>& input,
                    std::vector<Tensor>& output) = 0;
@@ -59,6 +66,7 @@ class Layer {
  protected:
   int id_ = 0;
   LayerType type_;
+  int type_parall_ = 0;
 };
 
 template <typename ValueType>
@@ -83,4 +91,125 @@ class LayerImpl {
   Shape outputShape_;
 };
 
+template <typename Func>
+inline void parallel_for(int count, Func func, int mode = 0) {
+  static bool stl_available = true;
+  static bool tbb_available = true;
+  static bool omp_available = true;
+  const int MIN_CHUNK_SIZE = 1000;
+  if (count < MIN_CHUNK_SIZE) {
+    mode = 0;
+  }
+
+  switch (mode) {
+    case 0:  // Sequential
+    {
+      for (int i = 0; i < count; ++i) {
+        func(i);
+      }
+      break;
+    }
+
+    case 1:  // STL
+    {
+      if (stl_available) {
+        try {
+          int num_threads =
+              static_cast<int>(std::thread::hardware_concurrency());
+          if (num_threads == 0) num_threads = 4;
+
+          int min_chunk_size = std::max(1000, count / (num_threads * 4));
+          if (count / num_threads < min_chunk_size) {
+            num_threads = std::max(1, count / min_chunk_size);
+          }
+
+          std::vector<std::thread> threads;
+          threads.reserve(num_threads);
+
+          int chunk_size = count / num_threads;
+          int remainder = count % num_threads;
+
+          int start = 0;
+          for (int t = 0; t < num_threads; ++t) {
+            int end = start + chunk_size + (t < remainder ? 1 : 0);
+            if (start >= end) break;
+
+            threads.emplace_back([start, end, &func]() {
+              for (int i = start; i < end; ++i) {
+                func(i);
+              }
+            });
+
+            start = end;
+          }
+
+          for (auto& thread : threads) {
+            thread.join();
+          }
+
+        } catch (const std::exception& e) {
+          std::cout << "Thread execution failed: " << e.what()
+                    << ". Falling back to sequential.\n";
+          stl_available = false;
+          for (int i = 0; i < count; ++i) func(i);
+        }
+      } else {
+        for (int i = 0; i < count; ++i) func(i);
+      }
+      break;
+    }
+
+    case 2:  // Intel TBB
+    {
+      if (tbb_available) {
+        try {
+          oneapi::tbb::parallel_for(
+              oneapi::tbb::blocked_range<int>(0, count),
+              [&](const oneapi::tbb::blocked_range<int>& range) {
+                for (int i = range.begin(); i < range.end(); ++i) {
+                  func(i);
+                }
+              },
+              oneapi::tbb::auto_partitioner());
+        } catch (const std::exception& e) {
+          std::cout << "TBB execution failed: " << e.what()
+                    << ". Falling back to sequential.\n";
+          tbb_available = false;
+          for (int i = 0; i < count; ++i) func(i);
+        }
+      } else {
+        for (int i = 0; i < count; ++i) func(i);
+      }
+      break;
+    }
+
+    case 3:  // OpenMP
+    {
+      if (omp_available) {
+        try {
+          int num_threads = omp_get_max_threads();
+
+          int chunk_size = std::max(1000, count / (num_threads * 8));
+
+#pragma omp parallel for schedule(static, chunk_size) num_threads(num_threads)
+          for (int i = 0; i < count; ++i) {
+            func(i);
+          }
+
+        } catch (...) {
+          std::cout << "OpenMP execution failed. Falling back to sequential.\n";
+          omp_available = false;
+          for (int i = 0; i < count; ++i) func(i);
+        }
+      } else {
+        for (int i = 0; i < count; ++i) func(i);
+      }
+      break;
+    }
+
+    default:
+      for (int i = 0; i < count; ++i) func(i);
+  }
+}
+
 }  // namespace it_lab_ai
diff --git a/src/layers/CMakeLists.txt b/src/layers/CMakeLists.txt
index f8ac6d84..7f22b872 100644
--- a/src/layers/CMakeLists.txt
+++ b/src/layers/CMakeLists.txt
@@ -1,4 +1,7 @@
 file(GLOB_RECURSE layers_src *.cpp)
 add_library(layers_lib STATIC "${LAYERS_HEADERS}" "${layers_src}")
 target_link_libraries(layers_lib PUBLIC TBB_unified)
+# if(OpenMP_FOUND)
+#   target_link_libraries(layers_lib PUBLIC OpenMP::OpenMP_CXX)
+# endif()
 target_link_libraries(layers_lib PUBLIC dnnl)
diff --git a/src/layers/EWLayer.cpp b/src/layers/EWLayer.cpp
index dc86b381..72c9f6c8 100644
--- a/src/layers/EWLayer.cpp
+++ b/src/layers/EWLayer.cpp
@@ -9,13 +9,20 @@ void EWLayer::run(const std::vector<Tensor>& input,
   }
   switch (input[0].get_type()) {
     case Type::kInt: {
-      EWLayerImpl<int> used_impl(input[0].get_shape(), func_, alpha_, beta_);
-      output[0] =
-          make_tensor(used_impl.run(*input[0].as<int>()), input[0].get_shape());
+      EWLayerImpl<int> used_impl(input[0].get_shape(), func_, alpha_, beta_,
+                                 type_parall_);
+      std::vector<int> tmp = used_impl.run(*input[0].as<int>());
+      auto start = std::chrono::high_resolution_clock::now();
+      output[0] = make_tensor(tmp, input[0].get_shape());
+      auto end = std::chrono::high_resolution_clock::now();
+      auto total_duration =
+          std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+      std::cout << total_duration.count() << std::endl;
       break;
     }
     case Type::kFloat: {
-      EWLayerImpl<float> used_impl(input[0].get_shape(), func_, alpha_, beta_);
+      EWLayerImpl<float> used_impl(input[0].get_shape(), func_, alpha_, beta_,
+                                   type_parall_);
       output[0] =
           make_tensor(used_impl.run(*input[0].as<float>()), input[0].get_shape());
       break;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index fc95325c..0e91e931 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,7 +1,7 @@
 file(GLOB_RECURSE TEST_SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
 add_executable(run_test ${TEST_SRC_FILES})
 
-if (NOT WIN32)
+if(OpenMP_FOUND)
   target_link_libraries(run_test PUBLIC OpenMP::OpenMP_CXX)
 endif()
 target_link_libraries(run_test PUBLIC perf_lib layers_lib layers_oneDNN_lib)
diff --git a/test/single_layer/test_ewlayer.cpp b/test/single_layer/test_ewlayer.cpp
index 65547b2a..fc63c658 100644
--- a/test/single_layer/test_ewlayer.cpp
+++ b/test/single_layer/test_ewlayer.cpp
@@ -215,3 +215,199 @@ TEST(ewlayer, new_ewlayer_can_sigmoid_float_extreme_values) {
     EXPECT_NEAR((*out[0].as<float>())[i], expected_output[i], 1e-5F);
   }
 }
+
+TEST(ewlayer, parallel_for_ew) {
+  EWLayer layer0("relu");
+  layer0.setTypeParall(0);
+  EWLayer layer1("relu");
+  layer1.setTypeParall(1);
+  EWLayer layer2("relu");
+  layer2.setTypeParall(2);
+  EWLayer layer3("relu");
+  layer3.setTypeParall(3);
+  std::vector<int> vec(800000000, -1);
+  Tensor input = make_tensor(vec);
+  Tensor output;
+  std::vector<Tensor> in{input};
+  std::vector<Tensor> out{output};
+
+  auto start = std::chrono::high_resolution_clock::now();
+  layer0.run(in, out);
+  auto end = std::chrono::high_resolution_clock::now();
+  auto total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (size_t i = 0; i < 800000000; i++) {
+    EXPECT_EQ((*out[0].as<int>())[i], 0);
+  }
+
+  start = std::chrono::high_resolution_clock::now();
+  layer1.run(in, out);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (size_t i = 0; i < 800000000; i++) {
+    EXPECT_EQ((*out[0].as<int>())[i], 0);
+  }
+
+  start = std::chrono::high_resolution_clock::now();
+  layer2.run(in, out);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (size_t i = 0; i < 800000000; i++) {
+    EXPECT_EQ((*out[0].as<int>())[i], 0);
+  }
+
+  start = std::chrono::high_resolution_clock::now();
+  layer3.run(in, out);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (size_t i = 0; i < 800000000; i++) {
+    EXPECT_EQ((*out[0].as<int>())[i], 0);
+  }
+}
+
+TEST(ewlayer, parallel_for_ew_sigmoid) {
+  EWLayer layer0("sigmoid");
+  layer0.setTypeParall(0);
+  EWLayer layer1("sigmoid");
+  layer1.setTypeParall(1);
+  EWLayer layer2("sigmoid");
+  layer2.setTypeParall(2);
+  EWLayer layer3("sigmoid");
+  layer3.setTypeParall(3);
+  std::vector<int> vec(800000000, -1);
+  Tensor input = make_tensor(vec);
+  Tensor output;
+  std::vector<Tensor> in{input};
+  std::vector<Tensor> out{output};
+
+  auto start = std::chrono::high_resolution_clock::now();
+  layer0.run(in, out);
+  auto end = std::chrono::high_resolution_clock::now();
+  auto total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+
+  start = std::chrono::high_resolution_clock::now();
+  layer1.run(in, out);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+
+  start = std::chrono::high_resolution_clock::now();
+  layer2.run(in, out);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+
+  start = std::chrono::high_resolution_clock::now();
+  layer3.run(in, out);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+
+  EXPECT_EQ(0, 0);
+}
+
+TEST(ewlayer, parallel_for_) {
+  const int SIZE = 20000;
+  std::vector<int> matrix1(SIZE * SIZE);
+  std::vector<int> matrix2(SIZE * SIZE);
+  std::vector<int> result(SIZE * SIZE);
+
+  for (int i = 0; i < SIZE * SIZE; ++i) {
+    matrix1[i] = 1;
+    matrix2[i] = 1;
+  }
+
+  auto start = std::chrono::high_resolution_clock::now();
+  parallel_for(
+      SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + matrix2[i]; }, 0);
+
+  auto end = std::chrono::high_resolution_clock::now();
+  auto total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+
+  start = std::chrono::high_resolution_clock::now();
+  parallel_for(
+      SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + matrix2[i]; }, 1);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+
+  start = std::chrono::high_resolution_clock::now();
+  parallel_for(
+      SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + matrix2[i]; }, 2);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+
+  start = std::chrono::high_resolution_clock::now();
+  parallel_for(
+      SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + matrix2[i]; }, 3);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+}
+
+TEST(ewlayer, parallel_for_notmatrix) {
+  const int SIZE = 30000;
+  std::vector<int> matrix1(SIZE * SIZE);
+  std::vector<int> result(SIZE * SIZE);
+
+  for (int i = 0; i < SIZE * SIZE; ++i) {
+    matrix1[i] = 1;
+  }
+
+  auto start = std::chrono::high_resolution_clock::now();
+  parallel_for(SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + 1; }, 0);
+
+  auto end = std::chrono::high_resolution_clock::now();
+  auto total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+
+  start = std::chrono::high_resolution_clock::now();
+  parallel_for(SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + 1; }, 1);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+
+  start = std::chrono::high_resolution_clock::now();
+  parallel_for(SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + 1; }, 2);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+
+  start = std::chrono::high_resolution_clock::now();
+  parallel_for(SIZE * SIZE, [&](int i) { result[i] = matrix1[i] + 1; }, 3);
+  end = std::chrono::high_resolution_clock::now();
+  total_duration =
+      std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+  std::cout << total_duration.count() << std::endl;
+  for (int i = 0; i < SIZE * SIZE; i++) ASSERT_EQ(result[i], 2);
+}
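
For reference, a minimal sketch of how the API introduced by this patch is meant to be used: `parallel_for` can be called directly with a backend mode (0 = sequential, 1 = std::thread, 2 = TBB, 3 = OpenMP), and `Layer::setTypeParall` selects the backend for a whole layer. The `main` harness and the include path "layers/EWLayer.hpp" are illustrative assumptions; `parallel_for`, `setTypeParall`, `EWLayer`, `make_tensor`, and `Tensor` come from the diff above.

#include <vector>

#include "layers/EWLayer.hpp"  // assumed include path; adjust to the project layout

using namespace it_lab_ai;

int main() {
  // Direct use of the helper: add 1 to every element with the TBB backend (mode 2).
  std::vector<int> data(1000000, 1);
  std::vector<int> plus_one(data.size());
  parallel_for(
      static_cast<int>(data.size()), [&](int i) { plus_one[i] = data[i] + 1; },
      2);

  // Per-layer selection: run an element-wise ReLU with the OpenMP backend (mode 3).
  EWLayer relu_layer("relu");
  relu_layer.setTypeParall(3);
  Tensor output;
  std::vector<Tensor> in{make_tensor(data)};
  std::vector<Tensor> out{output};
  relu_layer.run(in, out);
  return 0;
}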