Skip to content

Commit 87917b4

Browse files
committed
[线程模块重构与优化]: 全面升级线程及线程池实现,引入 C++20 新特性
- **线程实现更新**: 将传统 `std::thread` 替换为 C++20 的 `std::jthread`,简化线程管理并增强异常处理能力,同时更新线程任务函数签名以支持停止令牌(stop token) - **新增线程安全队列**: 设计通用模板类 `Queue` 作为线程池任务队列,具备线程安全特性、大小限制及阻塞操作功能 - **线程池重构**: 基于新队列重写线程池逻辑,移除冗余互斥锁与条件变量,优化任务添加、等待及清除机制,提升并发性能 - **测试用例扩充**: 增加中断测试、带超时等待测试及任务执行验证测试,完善线程和线程池单元测试覆盖 - **跨平台适配调整**: 根据不同操作系统条件编译线程模块,解决 Apple Clang 编译器兼容性问题并在文档中标注限制 - **构建配置优化**: 调整 CMake 配置文件,规范线程模块测试可执行文件依赖项顺序,增强构建稳定性
1 parent 447c703 commit 87917b4

File tree

8 files changed

+296
-106
lines changed

8 files changed

+296
-106
lines changed

CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 3.25.1)
22

33
include(cmake/vcpkg.cmake)
44

5-
# 设定工程名
65
project(
76
Cpp-Examples
87
VERSION 0.0.1
@@ -69,12 +68,14 @@ add_subdirectory(MonitorDir_EFSW)
6968
add_subdirectory(OpenSSL)
7069
add_subdirectory(SafeCallback)
7170
add_subdirectory(SpinMutex)
72-
add_subdirectory(Thread)
7371

74-
if(CMAKE_HOST_LINUX)
72+
if(CMAKE_HOST_WIN32)
73+
add_subdirectory(Thread)
74+
elseif(CMAKE_HOST_LINUX)
7575
add_subdirectory(Client)
7676
add_subdirectory(Icmp)
7777
add_subdirectory(Server)
78+
add_subdirectory(Thread)
7879
endif()
7980

8081
include(cmake/build_info.cmake)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@
5151
2. [server_poll](/Server/server_poll.cc)——poll的例子;
5252
3. [server_select](/Server/server_select.cc)——select的例子;
5353
23. [SpinMutex](/SpinMutex)——使用std::atomic_flag实现的简单互斥锁和自旋锁;
54-
24. [Thread](/Thread/)——基于std::thread实现的线程类,包括线程池;
54+
24. [Thread](/Thread/)——基于std::jthread实现的线程类,包括线程池`Apple Clang`不支持)
5555
1. [Thread](/Thread/thread.hpp)——线程类;
5656
2. [ThreadPool](/Thread/threadpool.hpp)——线程池;

Thread/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ target_link_libraries(thread_unittest PRIVATE GTest::gtest GTest::gtest_main
33
GTest::gmock GTest::gmock_main)
44
add_test(NAME thread_unittest COMMAND thread_unittest)
55

6-
add_executable(threadpool_unittest threadpool_unittest.cc threadpool.hpp
7-
thread.hpp)
6+
add_executable(threadpool_unittest queue.hpp threadpool_unittest.cc
7+
threadpool.hpp thread.hpp)
88
target_link_libraries(
99
threadpool_unittest PRIVATE GTest::gtest GTest::gtest_main GTest::gmock
1010
GTest::gmock_main)

Thread/queue.hpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#pragma once
2+
3+
#include <utils/object.hpp>
4+
5+
#include <condition_variable>
6+
#include <mutex>
7+
#include <utility>
8+
#include <queue>
9+
10+
template<typename T>
11+
requires std::is_move_assignable_v<T>
12+
class Queue : noncopyable
13+
{
14+
std::queue<T> m_queue;
15+
bool m_stop = false;
16+
mutable std::mutex m_mutex;
17+
std::condition_variable m_condEmpty;
18+
std::condition_variable m_condFull;
19+
std::size_t m_maxSize = 0;
20+
21+
public:
22+
Queue() = default;
23+
explicit Queue(std::size_t maxSize)
24+
: m_maxSize(maxSize == 0 ? 1 : maxSize)
25+
{}
26+
~Queue() { stop(); }
27+
28+
[[nodiscard]] auto push(T &&item) -> bool
29+
{
30+
{
31+
std::unique_lock lock(m_mutex);
32+
m_condFull.wait(lock, [&]() { return !m_stop && (m_queue.size() < m_maxSize); });
33+
if (m_stop) {
34+
return false;
35+
}
36+
m_queue.push(std::move(item));
37+
}
38+
m_condEmpty.notify_one();
39+
return true;
40+
}
41+
42+
[[nodiscard]] auto pop(T &item) -> bool
43+
{
44+
std::unique_lock lock(m_mutex);
45+
m_condEmpty.wait(lock, [&]() { return !m_queue.empty() || m_stop; });
46+
if (m_queue.empty()) {
47+
return false;
48+
}
49+
item = std::move(m_queue.front());
50+
m_queue.pop();
51+
m_condFull.notify_one();
52+
return true;
53+
}
54+
55+
void setMaxSize(std::size_t maxSize)
56+
{
57+
std::scoped_lock guard(m_mutex);
58+
m_maxSize = maxSize == 0 ? 1 : maxSize;
59+
m_condFull.notify_all();
60+
}
61+
62+
[[nodiscard]] auto getMaxSize() const -> std::size_t
63+
{
64+
std::scoped_lock guard(m_mutex);
65+
return m_maxSize;
66+
}
67+
68+
[[nodiscard]] auto size() const -> std::size_t
69+
{
70+
std::scoped_lock guard(m_mutex);
71+
return m_queue.size();
72+
}
73+
74+
[[nodiscard]] auto empty() const -> bool
75+
{
76+
std::scoped_lock guard(m_mutex);
77+
return m_queue.empty();
78+
}
79+
80+
void clear()
81+
{
82+
std::scoped_lock guard(m_mutex);
83+
std::queue<T>().swap(m_queue);
84+
m_condFull.notify_all();
85+
m_condEmpty.notify_all();
86+
}
87+
88+
[[nodiscard]] auto isStopped() const -> bool
89+
{
90+
std::scoped_lock guard(m_mutex);
91+
return m_stop;
92+
}
93+
94+
void start()
95+
{
96+
std::scoped_lock guard(m_mutex);
97+
m_stop = false;
98+
}
99+
100+
void stop()
101+
{
102+
{
103+
std::scoped_lock guard(m_mutex);
104+
m_stop = true;
105+
}
106+
m_condEmpty.notify_all();
107+
m_condFull.notify_all();
108+
}
109+
};

Thread/thread.hpp

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,48 +7,80 @@
77
#include <chrono>
88
#include <condition_variable>
99
#include <functional>
10+
#include <iostream>
1011
#include <mutex>
1112
#include <thread>
1213

1314
class Thread : noncopyable
1415
{
1516
public:
16-
using Task = std::function<void()>;
17+
using Task = std::function<void(std::stop_token)>;
1718

1819
Thread() = default;
19-
explicit Thread(Task task) { setTask(std::move(task)); }
20+
21+
explicit Thread(Task task)
22+
: m_task(std::move(task))
23+
{}
24+
2025
~Thread() { stop(); }
2126

22-
void setTask(Task task) { m_task = std::move(task); }
27+
void setTask(Task task)
28+
{
29+
std::lock_guard<std::mutex> lock(m_mutex);
30+
m_task = std::move(task);
31+
}
2332

24-
void start()
33+
bool start()
2534
{
35+
if (m_jthread.joinable()) {
36+
return false;
37+
}
38+
2639
m_running.store(true);
27-
m_thread = std::thread([this]() {
28-
m_condition.notify_one();
40+
m_jthread = std::jthread([this](std::stop_token token) {
2941
if (m_task) {
30-
m_task();
42+
try {
43+
m_task(token);
44+
} catch (const std::exception &e) {
45+
std::cerr << "Thread exception: " << e.what() << std::endl;
46+
}
3147
}
48+
49+
m_running.store(false);
50+
m_running.notify_all();
3251
});
52+
53+
return true;
3354
}
3455

3556
void stop()
3657
{
37-
if (m_thread.joinable()) {
38-
m_thread.join();
58+
if (m_jthread.joinable()) {
59+
m_jthread.request_stop();
60+
m_jthread.join();
3961
}
62+
4063
m_running.store(false);
4164
}
4265

43-
void waitForStarted()
66+
void interrupt()
4467
{
45-
std::unique_lock<std::mutex> lock(m_mutex);
46-
m_condition.wait(lock, [this]() { return m_running.load(); });
68+
if (m_jthread.joinable()) {
69+
m_jthread.request_stop();
70+
std::cout << "Thread interrupted: " << m_jthread.get_id() << std::endl;
71+
}
72+
}
73+
74+
void waitForFinished()
75+
{
76+
while (m_running.load() && m_jthread.joinable()) {
77+
m_running.wait(true);
78+
}
4779
}
4880

49-
[[nodiscard]] auto isRunning() const -> bool { return m_running; }
81+
[[nodiscard]] bool isRunning() const { return m_running.load(); }
5082

51-
[[nodiscard]] auto getThreadId() const -> std::thread::id { return m_thread.get_id(); }
83+
[[nodiscard]] std::thread::id getThreadId() const { return m_jthread.get_id(); }
5284

5385
static void yield() { std::this_thread::yield(); }
5486

@@ -64,15 +96,14 @@ class Thread : noncopyable
6496

6597
static auto hardwareConcurrency() -> unsigned int
6698
{
67-
auto availableCores = std::thread::hardware_concurrency(); // 如果不支持,返回0
99+
auto availableCores = std::thread::hardware_concurrency();
68100
assert(availableCores > 0);
69101
return availableCores;
70102
}
71103

72104
private:
73-
std::thread m_thread;
105+
std::jthread m_jthread;
74106
std::atomic_bool m_running{false};
75107
std::mutex m_mutex;
76-
std::condition_variable m_condition;
77108
Task m_task;
78109
};

Thread/thread_unittest.cc

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,53 @@
22

33
#include <gtest/gtest.h>
44

5-
TEST(Thread, start)
5+
void testTaskFunction(std::stop_token stopToken)
66
{
7-
Thread thread([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
7+
for (int i = 0; i < 5; ++i) {
8+
if (stopToken.stop_requested()) {
9+
break;
10+
}
11+
12+
std::cout << "Test task is running: " << i << std::endl;
13+
Thread::sleepFor(std::chrono::milliseconds(200));
14+
}
15+
}
16+
17+
TEST(ThreadTest, StartAndStop)
18+
{
19+
Thread thread;
20+
thread.setTask(testTaskFunction);
821
thread.start();
9-
thread.waitForStarted();
10-
EXPECT_TRUE(thread.isRunning());
1122
thread.stop();
1223
EXPECT_FALSE(thread.isRunning());
1324
}
1425

15-
TEST(Thread, setTask)
26+
TEST(ThreadTest, Interrupt)
1627
{
1728
Thread thread;
18-
thread.setTask([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
29+
thread.setTask(testTaskFunction);
1930
thread.start();
20-
thread.waitForStarted();
21-
EXPECT_TRUE(thread.isRunning());
22-
thread.stop();
31+
thread.interrupt();
32+
Thread::sleepFor(std::chrono::seconds(1));
2333
EXPECT_FALSE(thread.isRunning());
2434
}
2535

26-
auto main(int argc, char *argv[]) -> int
36+
TEST(ThreadTest, TaskExecution)
37+
{
38+
Thread thread;
39+
bool taskExecuted = false;
40+
thread.setTask([&taskExecuted](std::stop_token) {
41+
taskExecuted = true;
42+
std::cout << "Task executed." << std::endl;
43+
});
44+
thread.start();
45+
thread.sleepFor(std::chrono::seconds(1));
46+
thread.stop();
47+
EXPECT_TRUE(taskExecuted);
48+
}
49+
50+
int main(int argc, char **argv)
2751
{
28-
testing::InitGoogleTest(&argc, argv);
52+
::testing::InitGoogleTest(&argc, argv);
2953
return RUN_ALL_TESTS();
3054
}

0 commit comments

Comments
 (0)