From 11bd32c5cdebf02b3808ce3ced094124af85a333 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 10 Jan 2025 10:26:14 -0800 Subject: [PATCH 01/11] initial commit --- CMakeLists.txt | 1 + stress/CMakeLists.txt | 3 + stress/common/CMakeLists.txt | 11 +++ stress/common/stress.cc | 124 +++++++++++++++++++++++++++++ stress/common/stress.h | 58 ++++++++++++++ stress/metrics/CMakeLists.txt | 27 +++++++ stress/metrics/metrics.cc | 145 ++++++++++++++++++++++++++++++++++ 7 files changed, 369 insertions(+) create mode 100644 stress/CMakeLists.txt create mode 100644 stress/common/CMakeLists.txt create mode 100644 stress/common/stress.cc create mode 100644 stress/common/stress.h create mode 100644 stress/metrics/CMakeLists.txt create mode 100644 stress/metrics/metrics.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 408ee43046..c5e99d96d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -745,6 +745,7 @@ if(NOT WITH_API_ONLY) if(WITH_FUNC_TESTS) add_subdirectory(functional) endif() + add_subdirectory(stress) endif() include(cmake/opentelemetry-build-external-component.cmake) diff --git a/stress/CMakeLists.txt b/stress/CMakeLists.txt new file mode 100644 index 0000000000..7528f3ccd1 --- /dev/null +++ b/stress/CMakeLists.txt @@ -0,0 +1,3 @@ +# Add subdirectories for common and metrics components +add_subdirectory(common) +add_subdirectory(metrics) \ No newline at end of file diff --git a/stress/common/CMakeLists.txt b/stress/common/CMakeLists.txt new file mode 100644 index 0000000000..48aee9792d --- /dev/null +++ b/stress/common/CMakeLists.txt @@ -0,0 +1,11 @@ +add_library(stress STATIC stress.cc) + +# Include directory for the throughput library +target_include_directories(stress PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) + +# Set C++ standard +set_target_properties( + stress + PROPERTIES CXX_STANDARD 17 + CXX_STANDARD_REQUIRED YES + CXX_EXTENSIONS NO) \ No newline at end of file diff --git a/stress/common/stress.cc b/stress/common/stress.cc new file mode 100644 index 0000000000..4e08ea3d4b --- /dev/null +++ b/stress/common/stress.cc @@ -0,0 +1,124 @@ +#include "stress.h" + +// Global flags +std::atomic STOP(false); // Global flag to stop the stress test when signaled (e.g., via Ctrl+C) +std::atomic READY(false); // Global flag to synchronize thread start + + +// StressTest constructor +Stress::Stress(std::function func, size_t numThreads) + : func_(std::move(func)), stats_(numThreads), numThreads_(numThreads) {} + +// Main function to start the stress test +void Stress::run() { + std::cout << "Starting stress test with " << numThreads_ << " threads...\n"; + auto startTime = std::chrono::steady_clock::now(); + + READY.store(false, std::memory_order_release); + + std::thread controllerThread(&Stress::monitorThroughput, this); + + threads_.reserve(numThreads_); + for (size_t i = 0; i < numThreads_; ++i) { + threads_.emplace_back(&Stress::workerThread, this, i); + } + + READY.store(true, std::memory_order_release); + + for (auto& thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } + + if (controllerThread.joinable()) { + controllerThread.join(); + } + + auto endTime = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + + uint64_t totalCount = 0; + for (const auto& stat : stats_) { + totalCount += stat.count.load(std::memory_order_relaxed); + } + + std::cout << "\nTest completed:\n" + << "Total iterations: " << formatNumber(totalCount) << "\n" + << "Duration: " << duration.count() << " seconds\n" + << "Average throughput: " << formatNumber(totalCount / duration.count()) + << " iterations/sec\n"; +} + +// Worker thread function +void Stress::workerThread(size_t threadIndex) { +#ifdef __linux__ + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(threadIndex % std::thread::hardware_concurrency(), &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); +#endif + + while (!STOP.load(std::memory_order_acquire)) { + func_(); + stats_[threadIndex].count.fetch_add(1, std::memory_order_relaxed); + } +} + +// Monitoring thread function +void Stress::monitorThroughput() { + uint64_t lastTotalCount = 0; + auto lastTime = std::chrono::steady_clock::now(); + std::vector throughputHistory; + + while (!STOP.load(std::memory_order_acquire)) { + std::this_thread::sleep_for(std::chrono::seconds(SLIDING_WINDOW_SIZE)); + + auto currentTime = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(currentTime - lastTime).count(); + + uint64_t totalCount = 0; + for (const auto& stat : stats_) { + totalCount += stat.count.load(std::memory_order_relaxed); + } + + uint64_t currentCount = totalCount - lastTotalCount; + lastTotalCount = totalCount; + lastTime = currentTime; + + if (elapsed > 0) { + uint64_t throughput = currentCount / elapsed; + throughputHistory.push_back(throughput); + + double avg = 0; + uint64_t min = throughput; + uint64_t max = throughput; + + for (uint64_t t : throughputHistory) { + avg += t; + min = std::min(min, t); + max = std::max(max, t); + } + avg /= throughputHistory.size(); + + std::cout << "\rThroughput: " << formatNumber(throughput) + << " it/s | Avg: " << formatNumber(static_cast(avg)) + << " | Min: " << formatNumber(min) + << " | Max: " << formatNumber(max) << std::flush; + } + } + std::cout << std::endl; +} + +// Helper function to format numbers with commas +std::string Stress::formatNumber(uint64_t num) { + std::ostringstream oss; + oss.imbue(std::locale("")); + oss << std::fixed << num; + return oss.str(); +} + +// Signal handler to set the STOP flag when receiving a termination signal +void Stress::stop() { + STOP.store(true, std::memory_order_release); +} diff --git a/stress/common/stress.h b/stress/common/stress.h new file mode 100644 index 0000000000..c0049ccd48 --- /dev/null +++ b/stress/common/stress.h @@ -0,0 +1,58 @@ +#ifndef STRESS_H +#define STRESS_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Configuration constants +constexpr uint64_t SLIDING_WINDOW_SIZE = 2; // Time window for throughput calculation (in seconds) +constexpr size_t CACHE_LINE_SIZE = 64; // Typical CPU cache line size for alignment + +// WorkerStats structure for tracking iteration counts per thread +struct alignas(CACHE_LINE_SIZE) WorkerStats { + std::atomic count{0}; // Count of iterations for a specific thread + char padding[CACHE_LINE_SIZE - sizeof(std::atomic)]; // Padding to ensure proper alignment +}; + +// StressTest class +class Stress { +public: + // Constructor + Stress(std::function func, size_t numThreads = std::thread::hardware_concurrency()); + + // Main function to start the stress test + void run(); + + // function to stop the test + void stop(); + +private: + const size_t numThreads_; // Number of threads to run + std::function func_; // Function to be executed by each thread + std::vector threads_; // Vector to hold worker threads + std::vector stats_; // Vector to hold statistics for each thread + std::atomic stopFlag_{false}; // signal to stop the test + + + // Worker thread function + void workerThread(size_t threadIndex); + + // Monitoring thread function to calculate and display throughput + void monitorThroughput(); + + // Helper function to format numbers with commas for readability + static std::string formatNumber(uint64_t num); + +}; + +// Signal handler to set the STOP flag when receiving a termination signal +void signal_handler(int); + +#endif // STRESS_TEST_H diff --git a/stress/metrics/CMakeLists.txt b/stress/metrics/CMakeLists.txt new file mode 100644 index 0000000000..7a6ab24d0c --- /dev/null +++ b/stress/metrics/CMakeLists.txt @@ -0,0 +1,27 @@ +# Define the metrics executable +add_executable(stress_metrics metrics.cc) + +# Link throughput library and OpenTelemetry Metrics API +target_link_libraries( + stress_metrics PRIVATE stress opentelemetry_metrics # OpenTelemetry Metrics + # SDK +) + +# Include directories for throughput +target_include_directories(stress_metrics + PRIVATE ${CMAKE_SOURCE_DIR}/stress/common) + +# Set properties +set_target_properties( + stress_metrics + PROPERTIES CXX_STANDARD 17 + CXX_STANDARD_REQUIRED YES + CXX_EXTENSIONS NO) + +# Optional: Installation +if(OPENTELEMETRY_INSTALL) + install( + TARGETS stress_metrics + EXPORT "${PROJECT_NAME}-target" + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) +endif() \ No newline at end of file diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc new file mode 100644 index 0000000000..560b354ce0 --- /dev/null +++ b/stress/metrics/metrics.cc @@ -0,0 +1,145 @@ +#include "stress.h" // Project-specific header + + +#include +#include +#include +#include +#include +#include + + +#include "opentelemetry/common/attribute_value.h" +#include "opentelemetry/metrics/meter_provider.h" +#include "opentelemetry/metrics/provider.h" +#include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/sdk/metrics/aggregation/aggregation_config.h" +#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_factory.h" +#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h" +#include "opentelemetry/sdk/metrics/instruments.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" +#include "opentelemetry/sdk/metrics/meter_provider_factory.h" +#include "opentelemetry/sdk/metrics/metric_reader.h" +#include "opentelemetry/sdk/metrics/push_metric_exporter.h" +#include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h" +#include "opentelemetry/sdk/metrics/view/instrument_selector.h" +#include "opentelemetry/sdk/metrics/view/instrument_selector_factory.h" +#include "opentelemetry/sdk/metrics/view/meter_selector.h" +#include "opentelemetry/sdk/metrics/view/meter_selector_factory.h" +#include "opentelemetry/sdk/metrics/view/view.h" +#include "opentelemetry/sdk/metrics/view/view_factory.h" + +#include "stress.h" + +Stress* globalStressTest = nullptr; + +void signalHandler(int) { + if (globalStressTest) { + globalStressTest->stop(); + } +} + +namespace metrics_sdk = opentelemetry::sdk::metrics; +namespace metrics_api = opentelemetry::metrics; + + +namespace +{ +class MockMetricExporter : public opentelemetry::sdk::metrics::PushMetricExporter +{ +public: + opentelemetry::sdk::common::ExportResult Export( + const opentelemetry::sdk::metrics::ResourceMetrics & /*data*/) noexcept override + { + // Ignore all metrics and return success + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override + { + return true; // No-op + } + + + bool Shutdown(std::chrono::microseconds /*timeout*/) noexcept override + { + return true; // No-op + } + + opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality( + opentelemetry::sdk::metrics::InstrumentType) const noexcept override + { + return opentelemetry::sdk::metrics::AggregationTemporality::kDelta; + } +}; + +// Pre-generate random attributes +std::vector> GenerateAttributeSet(size_t count) +{ + std::vector> attributes_set; + for (size_t i = 0; i < count; ++i) + { + std::map attributes; + attributes["dim1"] = rand() % 100; // Random value between 0 and 99 + attributes["dim2"] = rand() % 100; // Random value between 0 and 99 + attributes["dim3"] = rand() % 100; // Random value between 0 and 99 + attributes_set.push_back(attributes); + } + return attributes_set; +} + +void InitMetrics(const std::string &name) +{ + metrics_sdk::PeriodicExportingMetricReaderOptions options; + options.export_interval_millis = std::chrono::milliseconds(1000); + options.export_timeout_millis = std::chrono::milliseconds(500); + auto exporter = std::make_unique(); + auto reader = + metrics_sdk::PeriodicExportingMetricReaderFactory::Create(std::move(exporter), options); + auto provider = metrics_sdk::MeterProviderFactory::Create(); + provider->AddMetricReader(std::move(reader)); + std::shared_ptr api_provider(std::move(provider)); + metrics_api::Provider::SetMeterProvider(api_provider); +} + +void CleanupMetrics() +{ + std::shared_ptr none; + metrics_api::Provider::SetMeterProvider(none); +} + +void CounterExample(opentelemetry::nostd::unique_ptr> &counter, + const std::vector> &attributes_set) +{ + // Pick a random attribute set + size_t random_index = rand() % attributes_set.size(); + const auto &attributes = attributes_set[random_index]; + + // Record the metric with the selected attributes + counter->Add( + 1.0, opentelemetry::common::KeyValueIterableView>(attributes), + opentelemetry::context::Context{}); +} +} // namespace + + +int main(int argc, char *argv[]) +{ + std::srand(std::time(nullptr)); // Seed the random numbe + // Pre-generate a set of random attributes + size_t attribute_count = 1000; // Number of attribute sets to pre-generate + auto attributes_set = GenerateAttributeSet(attribute_count); + + + InitMetrics("metrics_stress_test"); + auto provider = metrics_api::Provider::GetMeterProvider(); + auto meter = provider->GetMeter("metrics_stress_test", "1.0.0"); + auto counter = meter->CreateDoubleCounter("metrics_stress_test_counter"); + auto func = [&counter, &attributes_set]() { CounterExample(counter, attributes_set); }; + Stress stressTest(func, std::thread::hardware_concurrency()); + globalStressTest = &stressTest; + std::signal(SIGINT, signalHandler); + stressTest.run(); + CleanupMetrics(); + return 0; +} \ No newline at end of file From 22a178b3341f1e17bced8c7b997ad3418720ec80 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 10 Jan 2025 18:42:27 +0000 Subject: [PATCH 02/11] formar --- stress/CMakeLists.txt | 2 +- stress/common/CMakeLists.txt | 2 +- stress/common/stress.cc | 201 ++++++++++++++++++---------------- stress/common/stress.h | 57 +++++----- stress/metrics/CMakeLists.txt | 2 +- stress/metrics/metrics.cc | 20 ++-- 6 files changed, 148 insertions(+), 136 deletions(-) diff --git a/stress/CMakeLists.txt b/stress/CMakeLists.txt index 7528f3ccd1..a3eecbb0d3 100644 --- a/stress/CMakeLists.txt +++ b/stress/CMakeLists.txt @@ -1,3 +1,3 @@ # Add subdirectories for common and metrics components add_subdirectory(common) -add_subdirectory(metrics) \ No newline at end of file +add_subdirectory(metrics) diff --git a/stress/common/CMakeLists.txt b/stress/common/CMakeLists.txt index 48aee9792d..eb06ec5e32 100644 --- a/stress/common/CMakeLists.txt +++ b/stress/common/CMakeLists.txt @@ -8,4 +8,4 @@ set_target_properties( stress PROPERTIES CXX_STANDARD 17 CXX_STANDARD_REQUIRED YES - CXX_EXTENSIONS NO) \ No newline at end of file + CXX_EXTENSIONS NO) diff --git a/stress/common/stress.cc b/stress/common/stress.cc index 4e08ea3d4b..47eceff1e5 100644 --- a/stress/common/stress.cc +++ b/stress/common/stress.cc @@ -1,124 +1,139 @@ #include "stress.h" // Global flags -std::atomic STOP(false); // Global flag to stop the stress test when signaled (e.g., via Ctrl+C) -std::atomic READY(false); // Global flag to synchronize thread start - +std::atomic STOP( + false); // Global flag to stop the stress test when signaled (e.g., via Ctrl+C) +std::atomic READY(false); // Global flag to synchronize thread start // StressTest constructor Stress::Stress(std::function func, size_t numThreads) - : func_(std::move(func)), stats_(numThreads), numThreads_(numThreads) {} + : func_(std::move(func)), stats_(numThreads), numThreads_(numThreads) +{} // Main function to start the stress test -void Stress::run() { - std::cout << "Starting stress test with " << numThreads_ << " threads...\n"; - auto startTime = std::chrono::steady_clock::now(); - - READY.store(false, std::memory_order_release); - - std::thread controllerThread(&Stress::monitorThroughput, this); - - threads_.reserve(numThreads_); - for (size_t i = 0; i < numThreads_; ++i) { - threads_.emplace_back(&Stress::workerThread, this, i); - } +void Stress::run() +{ + std::cout << "Starting stress test with " << numThreads_ << " threads...\n"; + auto startTime = std::chrono::steady_clock::now(); - READY.store(true, std::memory_order_release); + READY.store(false, std::memory_order_release); - for (auto& thread : threads_) { - if (thread.joinable()) { - thread.join(); - } - } + std::thread controllerThread(&Stress::monitorThroughput, this); - if (controllerThread.joinable()) { - controllerThread.join(); - } + threads_.reserve(numThreads_); + for (size_t i = 0; i < numThreads_; ++i) + { + threads_.emplace_back(&Stress::workerThread, this, i); + } - auto endTime = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(endTime - startTime); + READY.store(true, std::memory_order_release); - uint64_t totalCount = 0; - for (const auto& stat : stats_) { - totalCount += stat.count.load(std::memory_order_relaxed); + for (auto &thread : threads_) + { + if (thread.joinable()) + { + thread.join(); } - - std::cout << "\nTest completed:\n" - << "Total iterations: " << formatNumber(totalCount) << "\n" - << "Duration: " << duration.count() << " seconds\n" - << "Average throughput: " << formatNumber(totalCount / duration.count()) - << " iterations/sec\n"; + } + + if (controllerThread.joinable()) + { + controllerThread.join(); + } + + auto endTime = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + + uint64_t totalCount = 0; + for (const auto &stat : stats_) + { + totalCount += stat.count.load(std::memory_order_relaxed); + } + + std::cout << "\nTest completed:\n" + << "Total iterations: " << formatNumber(totalCount) << "\n" + << "Duration: " << duration.count() << " seconds\n" + << "Average throughput: " << formatNumber(totalCount / duration.count()) + << " iterations/sec\n"; } // Worker thread function -void Stress::workerThread(size_t threadIndex) { +void Stress::workerThread(size_t threadIndex) +{ #ifdef __linux__ - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(threadIndex % std::thread::hardware_concurrency(), &cpuset); - pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(threadIndex % std::thread::hardware_concurrency(), &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); #endif - while (!STOP.load(std::memory_order_acquire)) { - func_(); - stats_[threadIndex].count.fetch_add(1, std::memory_order_relaxed); - } + while (!STOP.load(std::memory_order_acquire)) + { + func_(); + stats_[threadIndex].count.fetch_add(1, std::memory_order_relaxed); + } } // Monitoring thread function -void Stress::monitorThroughput() { - uint64_t lastTotalCount = 0; - auto lastTime = std::chrono::steady_clock::now(); - std::vector throughputHistory; - - while (!STOP.load(std::memory_order_acquire)) { - std::this_thread::sleep_for(std::chrono::seconds(SLIDING_WINDOW_SIZE)); - - auto currentTime = std::chrono::steady_clock::now(); - auto elapsed = std::chrono::duration_cast(currentTime - lastTime).count(); - - uint64_t totalCount = 0; - for (const auto& stat : stats_) { - totalCount += stat.count.load(std::memory_order_relaxed); - } - - uint64_t currentCount = totalCount - lastTotalCount; - lastTotalCount = totalCount; - lastTime = currentTime; - - if (elapsed > 0) { - uint64_t throughput = currentCount / elapsed; - throughputHistory.push_back(throughput); - - double avg = 0; - uint64_t min = throughput; - uint64_t max = throughput; - - for (uint64_t t : throughputHistory) { - avg += t; - min = std::min(min, t); - max = std::max(max, t); - } - avg /= throughputHistory.size(); - - std::cout << "\rThroughput: " << formatNumber(throughput) - << " it/s | Avg: " << formatNumber(static_cast(avg)) - << " | Min: " << formatNumber(min) - << " | Max: " << formatNumber(max) << std::flush; - } +void Stress::monitorThroughput() +{ + uint64_t lastTotalCount = 0; + auto lastTime = std::chrono::steady_clock::now(); + std::vector throughputHistory; + + while (!STOP.load(std::memory_order_acquire)) + { + std::this_thread::sleep_for(std::chrono::seconds(SLIDING_WINDOW_SIZE)); + + auto currentTime = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(currentTime - lastTime).count(); + + uint64_t totalCount = 0; + for (const auto &stat : stats_) + { + totalCount += stat.count.load(std::memory_order_relaxed); + } + + uint64_t currentCount = totalCount - lastTotalCount; + lastTotalCount = totalCount; + lastTime = currentTime; + + if (elapsed > 0) + { + uint64_t throughput = currentCount / elapsed; + throughputHistory.push_back(throughput); + + double avg = 0; + uint64_t min = throughput; + uint64_t max = throughput; + + for (uint64_t t : throughputHistory) + { + avg += t; + min = std::min(min, t); + max = std::max(max, t); + } + avg /= throughputHistory.size(); + + std::cout << "\rThroughput: " << formatNumber(throughput) + << " it/s | Avg: " << formatNumber(static_cast(avg)) + << " | Min: " << formatNumber(min) << " | Max: " << formatNumber(max) << std::flush; } - std::cout << std::endl; + } + std::cout << std::endl; } // Helper function to format numbers with commas -std::string Stress::formatNumber(uint64_t num) { - std::ostringstream oss; - oss.imbue(std::locale("")); - oss << std::fixed << num; - return oss.str(); +std::string Stress::formatNumber(uint64_t num) +{ + std::ostringstream oss; + oss.imbue(std::locale("")); + oss << std::fixed << num; + return oss.str(); } // Signal handler to set the STOP flag when receiving a termination signal -void Stress::stop() { - STOP.store(true, std::memory_order_release); +void Stress::stop() +{ + STOP.store(true, std::memory_order_release); } diff --git a/stress/common/stress.h b/stress/common/stress.h index c0049ccd48..ab3e1a46b6 100644 --- a/stress/common/stress.h +++ b/stress/common/stress.h @@ -3,56 +3,57 @@ #include #include +#include +#include #include #include #include #include #include -#include -#include // Configuration constants -constexpr uint64_t SLIDING_WINDOW_SIZE = 2; // Time window for throughput calculation (in seconds) -constexpr size_t CACHE_LINE_SIZE = 64; // Typical CPU cache line size for alignment +constexpr uint64_t SLIDING_WINDOW_SIZE = 2; // Time window for throughput calculation (in seconds) +constexpr size_t CACHE_LINE_SIZE = 64; // Typical CPU cache line size for alignment // WorkerStats structure for tracking iteration counts per thread -struct alignas(CACHE_LINE_SIZE) WorkerStats { - std::atomic count{0}; // Count of iterations for a specific thread - char padding[CACHE_LINE_SIZE - sizeof(std::atomic)]; // Padding to ensure proper alignment +struct alignas(CACHE_LINE_SIZE) WorkerStats +{ + std::atomic count{0}; // Count of iterations for a specific thread + char padding[CACHE_LINE_SIZE - + sizeof(std::atomic)]; // Padding to ensure proper alignment }; // StressTest class -class Stress { +class Stress +{ public: - // Constructor - Stress(std::function func, size_t numThreads = std::thread::hardware_concurrency()); + // Constructor + Stress(std::function func, size_t numThreads = std::thread::hardware_concurrency()); - // Main function to start the stress test - void run(); + // Main function to start the stress test + void run(); - // function to stop the test - void stop(); + // function to stop the test + void stop(); private: - const size_t numThreads_; // Number of threads to run - std::function func_; // Function to be executed by each thread - std::vector threads_; // Vector to hold worker threads - std::vector stats_; // Vector to hold statistics for each thread - std::atomic stopFlag_{false}; // signal to stop the test - - - // Worker thread function - void workerThread(size_t threadIndex); + const size_t numThreads_; // Number of threads to run + std::function func_; // Function to be executed by each thread + std::vector threads_; // Vector to hold worker threads + std::vector stats_; // Vector to hold statistics for each thread + std::atomic stopFlag_{false}; // signal to stop the test - // Monitoring thread function to calculate and display throughput - void monitorThroughput(); + // Worker thread function + void workerThread(size_t threadIndex); - // Helper function to format numbers with commas for readability - static std::string formatNumber(uint64_t num); + // Monitoring thread function to calculate and display throughput + void monitorThroughput(); + // Helper function to format numbers with commas for readability + static std::string formatNumber(uint64_t num); }; // Signal handler to set the STOP flag when receiving a termination signal void signal_handler(int); -#endif // STRESS_TEST_H +#endif // STRESS_TEST_H diff --git a/stress/metrics/CMakeLists.txt b/stress/metrics/CMakeLists.txt index 7a6ab24d0c..593001988f 100644 --- a/stress/metrics/CMakeLists.txt +++ b/stress/metrics/CMakeLists.txt @@ -24,4 +24,4 @@ if(OPENTELEMETRY_INSTALL) TARGETS stress_metrics EXPORT "${PROJECT_NAME}-target" RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) -endif() \ No newline at end of file +endif() diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc index 560b354ce0..d92a3cd334 100644 --- a/stress/metrics/metrics.cc +++ b/stress/metrics/metrics.cc @@ -1,6 +1,5 @@ #include "stress.h" // Project-specific header - #include #include #include @@ -8,7 +7,6 @@ #include #include - #include "opentelemetry/common/attribute_value.h" #include "opentelemetry/metrics/meter_provider.h" #include "opentelemetry/metrics/provider.h" @@ -31,18 +29,19 @@ #include "stress.h" -Stress* globalStressTest = nullptr; +Stress *globalStressTest = nullptr; -void signalHandler(int) { - if (globalStressTest) { - globalStressTest->stop(); - } +void signalHandler(int) +{ + if (globalStressTest) + { + globalStressTest->stop(); + } } namespace metrics_sdk = opentelemetry::sdk::metrics; namespace metrics_api = opentelemetry::metrics; - namespace { class MockMetricExporter : public opentelemetry::sdk::metrics::PushMetricExporter @@ -60,7 +59,6 @@ class MockMetricExporter : public opentelemetry::sdk::metrics::PushMetricExporte return true; // No-op } - bool Shutdown(std::chrono::microseconds /*timeout*/) noexcept override { return true; // No-op @@ -122,7 +120,6 @@ void CounterExample(opentelemetry::nostd::unique_ptrGetMeter("metrics_stress_test", "1.0.0"); auto counter = meter->CreateDoubleCounter("metrics_stress_test_counter"); - auto func = [&counter, &attributes_set]() { CounterExample(counter, attributes_set); }; + auto func = [&counter, &attributes_set]() { CounterExample(counter, attributes_set); }; Stress stressTest(func, std::thread::hardware_concurrency()); globalStressTest = &stressTest; std::signal(SIGINT, signalHandler); From 449f360de17db3af98d95e3a16f5188ef3f4ea41 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 10 Jan 2025 18:47:38 +0000 Subject: [PATCH 03/11] add docs --- stress/common/stress.cc | 3 +++ stress/common/stress.h | 49 +++++++++++++++++++++++++++++++++++++-- stress/metrics/metrics.cc | 7 +++--- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/stress/common/stress.cc b/stress/common/stress.cc index 47eceff1e5..4d53dd37aa 100644 --- a/stress/common/stress.cc +++ b/stress/common/stress.cc @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + #include "stress.h" // Global flags diff --git a/stress/common/stress.h b/stress/common/stress.h index ab3e1a46b6..185feca893 100644 --- a/stress/common/stress.h +++ b/stress/common/stress.h @@ -1,5 +1,50 @@ -#ifndef STRESS_H -#define STRESS_H +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +/** + * A multi-threaded stress test framework to measure throughput and performance of a given workload. + * + * ## Overview + * Multi-threaded stress test framework designed to execute a specified function + * in parallel across multiple threads and measure its throughput. The results are displayed + * dynamically, including current throughput, average throughput, and minimum/maximum throughput + * during the test. + * + * ## Key Features + * - **Multi-threading**: Uses std::thread to execute the workload in parallel across a user-defined + * number of threads. + * - **Thread Safety**: Tracks iteration counts per thread using an aligned and padded structure + * (WorkerStats) to avoid false sharing and ensure efficient thread-safe updates. + * - **Dynamic Metrics**: Continuously calculates and displays throughput (iterations/sec) over + * sliding time windows. + * - **Graceful Termination**: Captures signals (e.g., Ctrl+C) to cleanly stop all threads and + * summarize the results. + * - **Thread Affinity (Linux-only)**: Optionally binds threads to specific CPU cores for consistent + * performance. + * + * ## Implementation Details + * - **Worker Threads**: + * - Each worker thread executes the workload function (func) in a loop until a global STOP flag + * is set. + * - Each thread maintains its own iteration count to minimize contention. + * + * - **Throughput Monitoring**: + * - A separate controller thread monitors throughput by periodically summing up iteration counts + * across threads. + * - Throughput is calculated over a sliding window (SLIDING_WINDOW_SIZE) and displayed + * dynamically. + * + * - **Thread Synchronization**: + * - The STOP flag, an std::atomic, ensures all threads stop gracefully when signaled. + * - Memory ordering (e.g., std::memory_order_relaxed, std::memory_order_acquire/release) is used + * to optimize performance while maintaining correctness. + * + * - **Final Summary**: + * - At the end of the test, the program calculates and prints the total iterations, duration, and + * average throughput. + */ + +#pragma once #include #include diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc index d92a3cd334..20c9c3a597 100644 --- a/stress/metrics/metrics.cc +++ b/stress/metrics/metrics.cc @@ -1,4 +1,7 @@ -#include "stress.h" // Project-specific header +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "stress.h" #include #include @@ -27,8 +30,6 @@ #include "opentelemetry/sdk/metrics/view/view.h" #include "opentelemetry/sdk/metrics/view/view_factory.h" -#include "stress.h" - Stress *globalStressTest = nullptr; void signalHandler(int) From f9b081419c8a4969c0d3076d69d63d1b5763353d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 12 Jan 2025 21:04:24 -0800 Subject: [PATCH 04/11] remove extra endif --- stress/common/stress.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/stress/common/stress.h b/stress/common/stress.h index 185feca893..f9ec4283f7 100644 --- a/stress/common/stress.h +++ b/stress/common/stress.h @@ -98,7 +98,3 @@ class Stress static std::string formatNumber(uint64_t num); }; -// Signal handler to set the STOP flag when receiving a termination signal -void signal_handler(int); - -#endif // STRESS_TEST_H From 5f9d0da19161702f76235726898bd3404acf0fef Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 12 Jan 2025 21:34:25 -0800 Subject: [PATCH 05/11] maintainer mode build --- stress/common/stress.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/common/stress.h b/stress/common/stress.h index f9ec4283f7..f8abeb8c68 100644 --- a/stress/common/stress.h +++ b/stress/common/stress.h @@ -82,10 +82,10 @@ class Stress void stop(); private: - const size_t numThreads_; // Number of threads to run std::function func_; // Function to be executed by each thread std::vector threads_; // Vector to hold worker threads std::vector stats_; // Vector to hold statistics for each thread + const size_t numThreads_; // Number of threads to run std::atomic stopFlag_{false}; // signal to stop the test // Worker thread function From 32d06ffee18c3c56fefefe17b2ce06a5fca93e83 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 12 Jan 2025 21:44:46 -0800 Subject: [PATCH 06/11] fix copyright --- stress/common/CMakeLists.txt | 3 +++ stress/metrics/CMakeLists.txt | 3 +++ stress/metrics/metrics.cc | 4 ++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/stress/common/CMakeLists.txt b/stress/common/CMakeLists.txt index eb06ec5e32..2168cff9b4 100644 --- a/stress/common/CMakeLists.txt +++ b/stress/common/CMakeLists.txt @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + add_library(stress STATIC stress.cc) # Include directory for the throughput library diff --git a/stress/metrics/CMakeLists.txt b/stress/metrics/CMakeLists.txt index 593001988f..2b55522c84 100644 --- a/stress/metrics/CMakeLists.txt +++ b/stress/metrics/CMakeLists.txt @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + # Define the metrics executable add_executable(stress_metrics metrics.cc) diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc index 20c9c3a597..18c7b5dc36 100644 --- a/stress/metrics/metrics.cc +++ b/stress/metrics/metrics.cc @@ -87,7 +87,7 @@ std::vector> GenerateAttributeSet(size_t count) return attributes_set; } -void InitMetrics(const std::string &name) +void InitMetrics(const std::string /*&name*/) { metrics_sdk::PeriodicExportingMetricReaderOptions options; options.export_interval_millis = std::chrono::milliseconds(1000); @@ -121,7 +121,7 @@ void CounterExample(opentelemetry::nostd::unique_ptr Date: Sun, 12 Jan 2025 21:49:46 -0800 Subject: [PATCH 07/11] copyright --- stress/common/CMakeLists.txt | 4 ++-- stress/metrics/CMakeLists.txt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stress/common/CMakeLists.txt b/stress/common/CMakeLists.txt index 2168cff9b4..de8b0200c8 100644 --- a/stress/common/CMakeLists.txt +++ b/stress/common/CMakeLists.txt @@ -1,5 +1,5 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 add_library(stress STATIC stress.cc) diff --git a/stress/metrics/CMakeLists.txt b/stress/metrics/CMakeLists.txt index 2b55522c84..eb8d457d88 100644 --- a/stress/metrics/CMakeLists.txt +++ b/stress/metrics/CMakeLists.txt @@ -1,5 +1,5 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 # Define the metrics executable add_executable(stress_metrics metrics.cc) From 5a222fdd025588e4105ff772ca570fd77af5334f Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 13 Jan 2025 06:01:22 +0000 Subject: [PATCH 08/11] fix format --- stress/common/stress.h | 1 - 1 file changed, 1 deletion(-) diff --git a/stress/common/stress.h b/stress/common/stress.h index f8abeb8c68..3b2401104e 100644 --- a/stress/common/stress.h +++ b/stress/common/stress.h @@ -97,4 +97,3 @@ class Stress // Helper function to format numbers with commas for readability static std::string formatNumber(uint64_t num); }; - From 03ffa5453843d815e811307b2a1497d6c0fa950e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 12 Jan 2025 22:21:16 -0800 Subject: [PATCH 09/11] Add copyright and license information --- stress/CMakeLists.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/stress/CMakeLists.txt b/stress/CMakeLists.txt index a3eecbb0d3..a1184b2513 100644 --- a/stress/CMakeLists.txt +++ b/stress/CMakeLists.txt @@ -1,3 +1,6 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + # Add subdirectories for common and metrics components add_subdirectory(common) add_subdirectory(metrics) From ab075534c4347e11d881b39843ff723100a98f5b Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 12 Jan 2025 22:45:03 -0800 Subject: [PATCH 10/11] fix msvc error --- stress/metrics/metrics.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc index 18c7b5dc36..62bc8d8b51 100644 --- a/stress/metrics/metrics.cc +++ b/stress/metrics/metrics.cc @@ -123,7 +123,7 @@ void CounterExample(opentelemetry::nostd::unique_ptr(std::time(nullptr))); // Seed the random number generator // Pre-generate a set of random attributes size_t attribute_count = 1000; // Number of attribute sets to pre-generate auto attributes_set = GenerateAttributeSet(attribute_count); From b56f996cb76a5217ff0b42535af063fef742d724 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 12 Jan 2025 22:59:31 -0800 Subject: [PATCH 11/11] add newline --- stress/metrics/metrics.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/metrics/metrics.cc b/stress/metrics/metrics.cc index 62bc8d8b51..ac1fe2c9e0 100644 --- a/stress/metrics/metrics.cc +++ b/stress/metrics/metrics.cc @@ -139,4 +139,4 @@ int main(int /*argc*/, char ** /*argv[]*/) stressTest.run(); CleanupMetrics(); return 0; -} \ No newline at end of file +}