Skip to content

Commit c4c9809

Browse files
committed
[线程模块重构]: 基于 C++20 标准优化线程类及线程池实现
- 线程类优化:将底层实现从 `std::thread` 替换为 `std::jthread`,利用其内置的停止机制增强线程控制能力;调整线程启动逻辑,增加对停止请求的处理,提高线程安全性与可维护性 - 线程池改进:使用自定义 `Queue` 类替换原任务队列实现,提升任务管理效率与线程间通信能力;重构任务添加与执行流程,充分利用 `std::jthread` 特性优化线程调度与资源管理 - 依赖调整:更新 CMakeLists.txt 文件,修正线程模块在不同操作系统下的编译条件,确保代码在 Windows 平台的兼容性 - 文档更新:同步更新 README.md 中线程模块的描述,明确指出基于 `std::jthread` 实现且不支持 Apple Clang 编译器的特性说明 - 单元测试增强:完善线程与线程池的单元测试用例,覆盖启动、停止、中断、任务执行等关键场景,验证新实现的正确性与稳定性 - 代码结构优化:整理线程模块代码结构,规范化命名与注释,提升代码可读性与可扩展性,为后续功能开发奠定基础 - 性能提升:通过减少线程创建销毁开销、优化锁竞争机制,有效提高线程池在高并发场景下的执行效率与响应速度
1 parent 447c703 commit c4c9809

File tree

8 files changed

+315
-107
lines changed

8 files changed

+315
-107
lines changed

CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 3.25.1)
22

33
include(cmake/vcpkg.cmake)
44

5-
# 设定工程名
65
project(
76
Cpp-Examples
87
VERSION 0.0.1
@@ -69,12 +68,14 @@ add_subdirectory(MonitorDir_EFSW)
6968
add_subdirectory(OpenSSL)
7069
add_subdirectory(SafeCallback)
7170
add_subdirectory(SpinMutex)
72-
add_subdirectory(Thread)
7371

74-
if(CMAKE_HOST_LINUX)
72+
if(CMAKE_HOST_WIN32)
73+
add_subdirectory(Thread)
74+
elseif(CMAKE_HOST_LINUX)
7575
add_subdirectory(Client)
7676
add_subdirectory(Icmp)
7777
add_subdirectory(Server)
78+
add_subdirectory(Thread)
7879
endif()
7980

8081
include(cmake/build_info.cmake)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@
5151
2. [server_poll](/Server/server_poll.cc)——poll的例子;
5252
3. [server_select](/Server/server_select.cc)——select的例子;
5353
23. [SpinMutex](/SpinMutex)——使用std::atomic_flag实现的简单互斥锁和自旋锁;
54-
24. [Thread](/Thread/)——基于std::thread实现的线程类,包括线程池;
54+
24. [Thread](/Thread/)——基于std::jthread实现的线程类,包括线程池`Apple Clang`不支持)
5555
1. [Thread](/Thread/thread.hpp)——线程类;
5656
2. [ThreadPool](/Thread/threadpool.hpp)——线程池;

Thread/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ target_link_libraries(thread_unittest PRIVATE GTest::gtest GTest::gtest_main
33
GTest::gmock GTest::gmock_main)
44
add_test(NAME thread_unittest COMMAND thread_unittest)
55

6-
add_executable(threadpool_unittest threadpool_unittest.cc threadpool.hpp
7-
thread.hpp)
6+
add_executable(threadpool_unittest queue.hpp threadpool_unittest.cc
7+
threadpool.hpp thread.hpp)
88
target_link_libraries(
99
threadpool_unittest PRIVATE GTest::gtest GTest::gtest_main GTest::gmock
1010
GTest::gmock_main)

Thread/queue.hpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#pragma once
2+
3+
#include <utils/object.hpp>
4+
5+
#include <condition_variable>
6+
#include <mutex>
7+
#include <utility>
8+
#include <queue>
9+
10+
template<typename T>
11+
requires std::is_move_assignable_v<T>
12+
class Queue : noncopyable
13+
{
14+
std::queue<T> m_queue;
15+
bool m_stop = false;
16+
mutable std::mutex m_mutex;
17+
std::condition_variable m_condEmpty;
18+
std::condition_variable m_condFull;
19+
std::size_t m_maxSize = 0;
20+
21+
public:
22+
Queue() = default;
23+
explicit Queue(std::size_t maxSize)
24+
: m_maxSize(maxSize == 0 ? 1 : maxSize)
25+
{}
26+
~Queue() { stop(); }
27+
28+
[[nodiscard]] auto push(T &&item) -> bool
29+
{
30+
{
31+
std::unique_lock lock(m_mutex);
32+
m_condFull.wait(lock, [&]() { return !m_stop && (m_queue.size() < m_maxSize); });
33+
if (m_stop) {
34+
return false;
35+
}
36+
m_queue.push(std::move(item));
37+
}
38+
m_condEmpty.notify_one();
39+
return true;
40+
}
41+
42+
[[nodiscard]] auto pop(T &item) -> bool
43+
{
44+
std::unique_lock lock(m_mutex);
45+
m_condEmpty.wait(lock, [&]() { return !m_queue.empty() || m_stop; });
46+
if (m_queue.empty()) {
47+
return false;
48+
}
49+
item = std::move(m_queue.front());
50+
m_queue.pop();
51+
m_condFull.notify_one();
52+
return true;
53+
}
54+
55+
void setMaxSize(std::size_t maxSize)
56+
{
57+
std::scoped_lock guard(m_mutex);
58+
m_maxSize = maxSize == 0 ? 1 : maxSize;
59+
m_condFull.notify_all();
60+
}
61+
62+
[[nodiscard]] auto getMaxSize() const -> std::size_t
63+
{
64+
std::scoped_lock guard(m_mutex);
65+
return m_maxSize;
66+
}
67+
68+
[[nodiscard]] auto size() const -> std::size_t
69+
{
70+
std::scoped_lock guard(m_mutex);
71+
return m_queue.size();
72+
}
73+
74+
[[nodiscard]] auto empty() const -> bool
75+
{
76+
std::scoped_lock guard(m_mutex);
77+
return m_queue.empty();
78+
}
79+
80+
void clear()
81+
{
82+
std::scoped_lock guard(m_mutex);
83+
std::queue<T>().swap(m_queue);
84+
m_condFull.notify_all();
85+
m_condEmpty.notify_all();
86+
}
87+
88+
[[nodiscard]] auto isStopped() const -> bool
89+
{
90+
std::scoped_lock guard(m_mutex);
91+
return m_stop;
92+
}
93+
94+
void start()
95+
{
96+
std::scoped_lock guard(m_mutex);
97+
m_stop = false;
98+
}
99+
100+
void stop()
101+
{
102+
{
103+
std::scoped_lock guard(m_mutex);
104+
m_stop = true;
105+
}
106+
m_condEmpty.notify_all();
107+
m_condFull.notify_all();
108+
}
109+
};

Thread/thread.hpp

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,48 +7,82 @@
77
#include <chrono>
88
#include <condition_variable>
99
#include <functional>
10+
#include <iostream>
1011
#include <mutex>
1112
#include <thread>
1213

1314
class Thread : noncopyable
1415
{
1516
public:
16-
using Task = std::function<void()>;
17+
using Task = std::function<void(std::stop_token)>;
1718

1819
Thread() = default;
19-
explicit Thread(Task task) { setTask(std::move(task)); }
20+
21+
explicit Thread(Task task)
22+
: m_task(std::move(task))
23+
{}
24+
2025
~Thread() { stop(); }
2126

22-
void setTask(Task task) { m_task = std::move(task); }
27+
void setTask(Task task)
28+
{
29+
std::lock_guard<std::mutex> lock(m_mutex);
30+
m_task = std::move(task);
31+
}
2332

24-
void start()
33+
bool start()
2534
{
35+
if (m_jthread.joinable()) {
36+
return false;
37+
}
38+
2639
m_running.store(true);
27-
m_thread = std::thread([this]() {
28-
m_condition.notify_one();
29-
if (m_task) {
30-
m_task();
40+
m_jthread = std::jthread([this](std::stop_token token) {
41+
{
42+
std::unique_lock<std::mutex> lock(m_mutex);
43+
if (m_task) {
44+
try {
45+
m_task(token);
46+
} catch (const std::exception &e) {
47+
std::cerr << "Thread exception: " << e.what() << std::endl;
48+
}
49+
}
3150
}
51+
52+
m_running.store(false);
53+
m_running.notify_all();
3254
});
55+
56+
return true;
3357
}
3458

3559
void stop()
3660
{
37-
if (m_thread.joinable()) {
38-
m_thread.join();
61+
if (m_jthread.joinable()) {
62+
m_jthread.request_stop();
63+
m_jthread.join();
3964
}
65+
4066
m_running.store(false);
4167
}
4268

43-
void waitForStarted()
69+
void interrupt()
4470
{
45-
std::unique_lock<std::mutex> lock(m_mutex);
46-
m_condition.wait(lock, [this]() { return m_running.load(); });
71+
if (m_jthread.joinable()) {
72+
m_jthread.request_stop();
73+
}
74+
}
75+
76+
void waitForFinished()
77+
{
78+
while (m_running.load()) {
79+
m_running.wait(true);
80+
}
4781
}
4882

49-
[[nodiscard]] auto isRunning() const -> bool { return m_running; }
83+
[[nodiscard]] bool isRunning() const { return m_running.load(); }
5084

51-
[[nodiscard]] auto getThreadId() const -> std::thread::id { return m_thread.get_id(); }
85+
[[nodiscard]] std::thread::id getThreadId() const { return m_jthread.get_id(); }
5286

5387
static void yield() { std::this_thread::yield(); }
5488

@@ -64,15 +98,14 @@ class Thread : noncopyable
6498

6599
static auto hardwareConcurrency() -> unsigned int
66100
{
67-
auto availableCores = std::thread::hardware_concurrency(); // 如果不支持,返回0
101+
auto availableCores = std::thread::hardware_concurrency();
68102
assert(availableCores > 0);
69103
return availableCores;
70104
}
71105

72106
private:
73-
std::thread m_thread;
107+
std::jthread m_jthread;
74108
std::atomic_bool m_running{false};
75109
std::mutex m_mutex;
76-
std::condition_variable m_condition;
77110
Task m_task;
78111
};

Thread/thread_unittest.cc

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,69 @@
22

33
#include <gtest/gtest.h>
44

5-
TEST(Thread, start)
5+
void testTaskFunction(std::stop_token stopToken)
66
{
7-
Thread thread([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
7+
for (int i = 0; i < 5; ++i) {
8+
if (stopToken.stop_requested()) {
9+
break;
10+
}
11+
12+
std::cout << "Test task is running: " << i << std::endl;
13+
Thread::sleepFor(std::chrono::milliseconds(200));
14+
}
15+
}
16+
17+
TEST(ThreadTest, StartAndStop)
18+
{
19+
Thread thread;
20+
thread.setTask(testTaskFunction);
821
thread.start();
9-
thread.waitForStarted();
10-
EXPECT_TRUE(thread.isRunning());
1122
thread.stop();
1223
EXPECT_FALSE(thread.isRunning());
1324
}
1425

15-
TEST(Thread, setTask)
26+
TEST(ThreadTest, Interrupt)
1627
{
1728
Thread thread;
18-
thread.setTask([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
29+
thread.setTask(testTaskFunction);
1930
thread.start();
20-
thread.waitForStarted();
21-
EXPECT_TRUE(thread.isRunning());
22-
thread.stop();
31+
thread.interrupt();
32+
Thread::sleepFor(std::chrono::seconds(1));
33+
EXPECT_FALSE(thread.isRunning());
34+
}
35+
36+
TEST(ThreadTest, WaitForFinished)
37+
{
38+
Thread thread;
39+
thread.setTask(testTaskFunction);
40+
thread.start();
41+
thread.waitForFinished();
2342
EXPECT_FALSE(thread.isRunning());
2443
}
2544

26-
auto main(int argc, char *argv[]) -> int
45+
TEST(ThreadTest, WaitForFinished2)
46+
{
47+
Thread thread;
48+
thread.waitForFinished();
49+
EXPECT_FALSE(thread.isRunning());
50+
}
51+
52+
TEST(ThreadTest, TaskExecution)
53+
{
54+
Thread thread;
55+
bool taskExecuted = false;
56+
thread.setTask([&taskExecuted](std::stop_token) {
57+
taskExecuted = true;
58+
std::cout << "Task executed." << std::endl;
59+
});
60+
thread.start();
61+
thread.sleepFor(std::chrono::seconds(1));
62+
thread.stop();
63+
EXPECT_TRUE(taskExecuted);
64+
}
65+
66+
int main(int argc, char **argv)
2767
{
28-
testing::InitGoogleTest(&argc, argv);
68+
::testing::InitGoogleTest(&argc, argv);
2969
return RUN_ALL_TESTS();
3070
}

0 commit comments

Comments
 (0)