Skip to content

Commit 0c20db5

Browse files
committed
moved examples from various repos here
1 parent 60696d2 commit 0c20db5

File tree

8 files changed

+1056
-89
lines changed

8 files changed

+1056
-89
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// circular_mp_test.cpp : This file contains the 'main' function. Program execution begins and ends there.
2+
//
3+
4+
#include <iostream>
5+
#include <thread>
6+
#include <chrono>
7+
#include <vector>
8+
#include <circular_queue_mp.h>
9+
10+
struct qitem
11+
{
12+
// producer id
13+
int id;
14+
// monotonic increasing value
15+
int val = 0;
16+
};
17+
18+
constexpr int TOTALMESSAGESTARGET = 60000000;
19+
// reserve one thread as consumer
20+
const auto THREADS = std::thread::hardware_concurrency() / 2 - 1;
21+
const int MESSAGES = TOTALMESSAGESTARGET / THREADS;
22+
circular_queue<std::thread> threads(THREADS);
23+
circular_queue_mp<qitem> queue(threads.capacity()* MESSAGES / 10);
24+
std::vector<int> checks(threads.capacity());
25+
26+
int main()
27+
{
28+
using namespace std::chrono_literals;
29+
std::cerr << "Utilizing " << THREADS << " producer threads" << std::endl;
30+
for (int i = 0; i < threads.capacity(); ++i)
31+
{
32+
threads.push(std::thread([i]() {
33+
for (int c = 0; c < MESSAGES;)
34+
{
35+
// simulate some load
36+
auto start = std::chrono::system_clock::now();
37+
while (std::chrono::system_clock::now() - start < 1us);
38+
if (queue.push({ i, c }))
39+
{
40+
++c;
41+
}
42+
else
43+
{
44+
//std::cerr << "queue full" << std::endl;
45+
//std::this_thread::sleep_for(10us);
46+
}
47+
//if (0 == c % 10000) std::this_thread::sleep_for(10us);
48+
}
49+
}));
50+
}
51+
for (int o = 0; o < threads.available() * MESSAGES; ++o)
52+
{
53+
auto now = std::chrono::system_clock::now();
54+
while (!queue.available())
55+
{
56+
auto starvedFor = std::chrono::system_clock::now() - now;
57+
if (starvedFor > 20s) std::cerr << "queue starved for > 20s" << std::endl;
58+
//std::this_thread::sleep_for(20ms);
59+
}
60+
auto item = queue.pop();
61+
if (checks[item.id] != item.val)
62+
{
63+
std::cerr << "item mismatch" << std::endl;
64+
}
65+
checks[item.id] = item.val + 1;
66+
if (0 == item.val % 1000) std::this_thread::sleep_for(100us);
67+
}
68+
while (threads.available())
69+
{
70+
auto thread = threads.pop();
71+
thread.join();
72+
}
73+
return 0;
74+
}

0 commit comments

Comments
 (0)