Skip to content

Commit 9fe9d1f

Browse files
committed
[线程类与线程池重构]:切换至 std::jthread 并优化任务队列管理
- **线程类升级**:将原本基于 `std::thread` 的实现切换为使用 C++20 新引入的 `std::jthread`,新增对任务取消和中断的支持,提升线程管理的灵活性与安全性 - **任务队列优化**:新增独立的 `queue.hpp` 文件,实现线程安全的生产者-消费者队列类,支持任务数量限制、动态调整队列大小,并提供更精细的同步控制 - **线程池重构**:基于新的队列类重写了线程池逻辑,简化任务添加流程,优化任务分配机制,提升并发处理效率 - **跨平台适配**:调整 CMake 配置,在 Windows 平台启用线程模块,Linux 平台保留原有网络相关子模块的构建逻辑 - **文档更新**:修改 README 中线程模块的描述,明确指出基于 `std::jthread` 实现,并标注 Apple Clang 编译器暂不支持该特性 - **测试增强**:更新线程和线程池的单元测试用例,增加对任务中断、超时等待等新功能的测试覆盖
1 parent 447c703 commit 9fe9d1f

File tree

8 files changed

+290
-101
lines changed

8 files changed

+290
-101
lines changed

CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 3.25.1)
22

33
include(cmake/vcpkg.cmake)
44

5-
# 设定工程名
65
project(
76
Cpp-Examples
87
VERSION 0.0.1
@@ -69,12 +68,14 @@ add_subdirectory(MonitorDir_EFSW)
6968
add_subdirectory(OpenSSL)
7069
add_subdirectory(SafeCallback)
7170
add_subdirectory(SpinMutex)
72-
add_subdirectory(Thread)
7371

74-
if(CMAKE_HOST_LINUX)
72+
if(CMAKE_HOST_WIN32)
73+
add_subdirectory(Thread)
74+
elseif(CMAKE_HOST_LINUX)
7575
add_subdirectory(Client)
7676
add_subdirectory(Icmp)
7777
add_subdirectory(Server)
78+
add_subdirectory(Thread)
7879
endif()
7980

8081
include(cmake/build_info.cmake)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@
5151
2. [server_poll](/Server/server_poll.cc)——poll的例子;
5252
3. [server_select](/Server/server_select.cc)——select的例子;
5353
23. [SpinMutex](/SpinMutex)——使用std::atomic_flag实现的简单互斥锁和自旋锁;
54-
24. [Thread](/Thread/)——基于std::thread实现的线程类,包括线程池;
54+
24. [Thread](/Thread/)——基于std::jthread实现的线程类,包括线程池`Apple Clang`不支持)
5555
1. [Thread](/Thread/thread.hpp)——线程类;
5656
2. [ThreadPool](/Thread/threadpool.hpp)——线程池;

Thread/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ target_link_libraries(thread_unittest PRIVATE GTest::gtest GTest::gtest_main
33
GTest::gmock GTest::gmock_main)
44
add_test(NAME thread_unittest COMMAND thread_unittest)
55

6-
add_executable(threadpool_unittest threadpool_unittest.cc threadpool.hpp
7-
thread.hpp)
6+
add_executable(threadpool_unittest queue.hpp threadpool_unittest.cc
7+
threadpool.hpp thread.hpp)
88
target_link_libraries(
99
threadpool_unittest PRIVATE GTest::gtest GTest::gtest_main GTest::gmock
1010
GTest::gmock_main)

Thread/queue.hpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#pragma once
2+
3+
#include <utils/object.hpp>
4+
5+
#include <condition_variable>
6+
#include <mutex>
7+
#include <utility>
8+
#include <queue>
9+
10+
template<typename T>
11+
requires std::is_move_assignable_v<T>
12+
class Queue : noncopyable
13+
{
14+
std::queue<T> m_queue;
15+
bool m_stop = false;
16+
mutable std::mutex m_mutex;
17+
std::condition_variable m_condEmpty;
18+
std::condition_variable m_condFull;
19+
std::size_t m_maxSize = 0;
20+
21+
public:
22+
Queue() = default;
23+
explicit Queue(std::size_t maxSize)
24+
: m_maxSize(maxSize == 0 ? 1 : maxSize)
25+
{}
26+
~Queue() { stop(); }
27+
28+
[[nodiscard]] auto push(T &&item) -> bool
29+
{
30+
{
31+
std::unique_lock lock(m_mutex);
32+
m_condFull.wait(lock, [&]() { return !m_stop && (m_queue.size() < m_maxSize); });
33+
if (m_stop) {
34+
return false;
35+
}
36+
m_queue.push(std::move(item));
37+
}
38+
m_condEmpty.notify_one();
39+
return true;
40+
}
41+
42+
[[nodiscard]] auto pop(T &item) -> bool
43+
{
44+
std::unique_lock lock(m_mutex);
45+
m_condEmpty.wait(lock, [&]() { return !m_queue.empty() || m_stop; });
46+
if (m_queue.empty()) {
47+
return false;
48+
}
49+
item = std::move(m_queue.front());
50+
m_queue.pop();
51+
m_condFull.notify_one();
52+
return true;
53+
}
54+
55+
void setMaxSize(std::size_t maxSize)
56+
{
57+
std::scoped_lock guard(m_mutex);
58+
m_maxSize = maxSize == 0 ? 1 : maxSize;
59+
m_condFull.notify_all();
60+
}
61+
62+
[[nodiscard]] auto getMaxSize() const -> std::size_t
63+
{
64+
std::scoped_lock guard(m_mutex);
65+
return m_maxSize;
66+
}
67+
68+
[[nodiscard]] auto size() const -> std::size_t
69+
{
70+
std::scoped_lock guard(m_mutex);
71+
return m_queue.size();
72+
}
73+
74+
[[nodiscard]] auto empty() const -> bool
75+
{
76+
std::scoped_lock guard(m_mutex);
77+
return m_queue.empty();
78+
}
79+
80+
void clear()
81+
{
82+
std::scoped_lock guard(m_mutex);
83+
std::queue<T>().swap(m_queue);
84+
m_condFull.notify_all();
85+
m_condEmpty.notify_all();
86+
}
87+
88+
[[nodiscard]] auto isStopped() const -> bool
89+
{
90+
std::scoped_lock guard(m_mutex);
91+
return m_stop;
92+
}
93+
94+
void start()
95+
{
96+
std::scoped_lock guard(m_mutex);
97+
m_stop = false;
98+
}
99+
100+
void stop()
101+
{
102+
{
103+
std::scoped_lock guard(m_mutex);
104+
m_stop = true;
105+
}
106+
m_condEmpty.notify_all();
107+
m_condFull.notify_all();
108+
}
109+
};

Thread/thread.hpp

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,48 +7,83 @@
77
#include <chrono>
88
#include <condition_variable>
99
#include <functional>
10+
#include <iostream>
1011
#include <mutex>
1112
#include <thread>
1213

1314
class Thread : noncopyable
1415
{
1516
public:
16-
using Task = std::function<void()>;
17+
using Task = std::function<void(std::stop_token)>;
1718

1819
Thread() = default;
19-
explicit Thread(Task task) { setTask(std::move(task)); }
20+
21+
explicit Thread(Task task)
22+
: m_task(std::move(task))
23+
{}
24+
2025
~Thread() { stop(); }
2126

22-
void setTask(Task task) { m_task = std::move(task); }
27+
void setTask(Task task)
28+
{
29+
std::lock_guard<std::mutex> lock(m_mutex);
30+
m_task = std::move(task);
31+
}
2332

24-
void start()
33+
bool start()
2534
{
35+
if (m_jthread.joinable()) {
36+
return false; // Thread is already running
37+
}
38+
2639
m_running.store(true);
27-
m_thread = std::thread([this]() {
40+
m_jthread = std::jthread([this](std::stop_token token) {
2841
m_condition.notify_one();
2942
if (m_task) {
30-
m_task();
43+
try {
44+
m_task(token);
45+
} catch (const std::exception &e) {
46+
std::cerr << "Thread exception: " << e.what() << std::endl;
47+
}
3148
}
49+
50+
m_running.store(false);
3251
});
52+
53+
return true; // Thread started successfully
3354
}
3455

3556
void stop()
3657
{
37-
if (m_thread.joinable()) {
38-
m_thread.join();
58+
if (m_jthread.joinable()) {
59+
m_jthread.request_stop();
60+
m_jthread.join();
3961
}
62+
4063
m_running.store(false);
4164
}
4265

43-
void waitForStarted()
66+
void interrupt()
67+
{
68+
if (m_jthread.joinable()) {
69+
m_jthread.request_stop();
70+
std::cout << "Thread interrupted: " << m_jthread.get_id() << std::endl;
71+
}
72+
}
73+
74+
void waitForStarted(std::optional<std::chrono::milliseconds> timeout = std::nullopt)
4475
{
4576
std::unique_lock<std::mutex> lock(m_mutex);
46-
m_condition.wait(lock, [this]() { return m_running.load(); });
77+
if (timeout.has_value()) {
78+
m_condition.wait_for(lock, timeout.value(), [this]() { return m_running.load(); });
79+
} else {
80+
m_condition.wait(lock, [this]() { return m_running.load(); });
81+
}
4782
}
4883

49-
[[nodiscard]] auto isRunning() const -> bool { return m_running; }
84+
[[nodiscard]] bool isRunning() const { return m_running.load(); }
5085

51-
[[nodiscard]] auto getThreadId() const -> std::thread::id { return m_thread.get_id(); }
86+
[[nodiscard]] std::thread::id getThreadId() const { return m_jthread.get_id(); }
5287

5388
static void yield() { std::this_thread::yield(); }
5489

@@ -64,13 +99,13 @@ class Thread : noncopyable
6499

65100
static auto hardwareConcurrency() -> unsigned int
66101
{
67-
auto availableCores = std::thread::hardware_concurrency(); // 如果不支持,返回0
102+
auto availableCores = std::thread::hardware_concurrency();
68103
assert(availableCores > 0);
69104
return availableCores;
70105
}
71106

72107
private:
73-
std::thread m_thread;
108+
std::jthread m_jthread;
74109
std::atomic_bool m_running{false};
75110
std::mutex m_mutex;
76111
std::condition_variable m_condition;

Thread/thread_unittest.cc

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,66 @@
22

33
#include <gtest/gtest.h>
44

5-
TEST(Thread, start)
5+
void testTaskFunction(std::stop_token stopToken)
66
{
7-
Thread thread([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
7+
for (int i = 0; i < 5; ++i) {
8+
if (stopToken.stop_requested()) {
9+
break;
10+
}
11+
12+
std::cout << "Test task is running: " << i << std::endl;
13+
Thread::sleepFor(std::chrono::milliseconds(200));
14+
}
15+
}
16+
17+
TEST(ThreadTest, StartAndStop)
18+
{
19+
Thread thread;
20+
thread.setTask(testTaskFunction);
821
thread.start();
922
thread.waitForStarted();
10-
EXPECT_TRUE(thread.isRunning());
1123
thread.stop();
1224
EXPECT_FALSE(thread.isRunning());
1325
}
1426

15-
TEST(Thread, setTask)
27+
TEST(ThreadTest, Interrupt)
1628
{
1729
Thread thread;
18-
thread.setTask([]() { Thread::sleepFor(std::chrono::milliseconds(100)); });
30+
thread.setTask(testTaskFunction);
1931
thread.start();
2032
thread.waitForStarted();
33+
thread.interrupt();
34+
Thread::sleepFor(std::chrono::seconds(1));
35+
EXPECT_FALSE(thread.isRunning());
36+
}
37+
38+
TEST(ThreadTest, WaitForStarted)
39+
{
40+
Thread thread;
41+
thread.setTask(testTaskFunction);
42+
thread.start();
43+
thread.waitForStarted(std::chrono::milliseconds(500));
2144
EXPECT_TRUE(thread.isRunning());
2245
thread.stop();
23-
EXPECT_FALSE(thread.isRunning());
2446
}
2547

26-
auto main(int argc, char *argv[]) -> int
48+
TEST(ThreadTest, TaskExecution)
49+
{
50+
Thread thread;
51+
bool taskExecuted = false;
52+
thread.setTask([&taskExecuted](std::stop_token) {
53+
taskExecuted = true;
54+
std::cout << "Task executed." << std::endl;
55+
});
56+
thread.start();
57+
thread.waitForStarted();
58+
thread.sleepFor(std::chrono::seconds(1));
59+
thread.stop();
60+
EXPECT_TRUE(taskExecuted);
61+
}
62+
63+
int main(int argc, char **argv)
2764
{
28-
testing::InitGoogleTest(&argc, argv);
65+
::testing::InitGoogleTest(&argc, argv);
2966
return RUN_ALL_TESTS();
3067
}

0 commit comments

Comments
 (0)