cpp-toolbox  0.0.1
A toolbox library for C++
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <atomic> // 用于原子布尔标志/For atomic boolean flag
4#include <deque> // 任务双端队列/For task deques
5#include <functional> // 用于 std::function, std::bind/For std::function, std::bind
6#include <future> // 用于异步任务结果/For asynchronous task results (std::future, std::packaged_task)
7#include <iostream> // 用于标准输入输出/For std::cout, std::cerr
8#include <memory> // 用于智能指针/For std::make_shared
9#include <mutex> // 互斥锁/For std::mutex
10#include <stdexcept> // 用于运行时异常/For runtime exceptions
11#include <thread> // C++ 线程库/C++ thread library
12#include <type_traits> // 用于类型特征/For std::invoke_result_t
13#include <utility> // 用于完美转发和移动语义/For std::forward, std::move
14#include <vector> // 用于存储工作线程/For storing worker threads
15
17// 导出宏定义/Export macro definition
18#include <cpp-toolbox/cpp-toolbox_export.hpp>
19// 宏定义/Macro definitions
20#include <cpp-toolbox/macro.hpp>
21
22namespace toolbox::base
23{
24
60{
61public:
81 explicit thread_pool_t(size_t threads = 0);
82
93
98 size_t get_thread_count() const { return workers_.size(); }
99
134 template<class F, class... Args>
135 auto submit(F&& f, Args&&... args)
136 -> std::future<typename std::invoke_result_t<F, Args...>>;
137
138 // 删除拷贝构造函数和拷贝赋值运算符以防止意外复制/Delete copy constructor and
139 // copy assignment operator to prevent accidental copying
141 // 删除移动构造函数和移动赋值运算符以简化生命周期管理/Delete move constructor
142 // and move assignment operator to simplify lifecycle management
144
145private:
146 // 工作线程列表/List of worker threads
147 std::vector<std::thread> workers_;
148 // 每个工作线程的任务双端队列/Per worker task deque
149 // 使用 shared_ptr 包装 deque 以避免复制问题
150 std::vector<std::shared_ptr<std::deque<std::unique_ptr<detail::task_base>>>> worker_queues_;
151 // 保护每个双端队列的互斥锁/Mutex protecting each deque
152 std::vector<std::unique_ptr<std::mutex>> queue_mutexes_;
153 // 提交任务时下一个目标线程索引/Next worker index for task submission
154 std::atomic<size_t> next_worker_ {0};
155 // 指示线程池是否应该停止的原子标志/Atomic flag indicating whether the thread
156 // pool should stop
157 std::atomic<bool> stop_;
158
159 // 工作线程主循环/Worker loop implementing work stealing
160 void worker_loop(size_t worker_id);
161};
162
163// --- 模板成员函数实现/Template Member Function Implementation ---
164
165template<class F, class... Args>
167 -> std::future<typename std::invoke_result_t<F, Args...>>
168{
169 using return_type = typename std::invoke_result_t<F, Args...>;
170
171 if (stop_.load(std::memory_order_relaxed)) {
172 throw std::runtime_error("Cannot submit task to stopped thread pool");
173 }
174
175 // 1. 创建 promise 并获取 future/Create promise and get future
176 auto promise = std::make_shared<std::promise<return_type>>();
177 std::future<return_type> future = promise->get_future();
178
179 // 2. 创建执行工作并设置 promise 的 lambda/Create the lambda that does the
180 // work and sets the promise
181 auto task_payload =
182 // 如果 lambda 需要修改其捕获的变量则使用 mutable/Use mutable if the
183 // lambda needs to modify its captures
184 [func = std::forward<F>(
185 f), // 移动/转发原始可调用对象/Move/forward the original callable
186 args_tuple = std::make_tuple(
187 std::forward<Args>(args)...), // 移动/转发参数/Move/forward args
188 promise_ptr = std::move(
189 promise)]() mutable { // 通过移动的 shared_ptr 捕获 promise/Capture
190 // promise by moved shared_ptr
191 try {
192 if constexpr (std::is_void_v<return_type>) {
193 std::apply(
194 func,
195 std::move(
196 args_tuple)); // 调用原始函数/Invoke original function
197 promise_ptr->set_value(); // 设置 void promise/Set void promise
198 } else {
199 return_type result = std::apply(
200 func,
201 std::move(
202 args_tuple)); // 调用原始函数/Invoke original function
203 promise_ptr->set_value(std::move(
204 result)); // 用结果设置 promise/Set promise with result
205 }
206 } catch (...) {
207 promise_ptr->set_exception(
208 std::current_exception()); // 在 promise 上设置异常/Set exception
209 // on promise
210 }
211 }; // lambda 结束/End of task_payload lambda
212
213 // 3. 使用派生类模板创建类型擦除的任务包装器/Create the type-erased task
214 // wrapper using the derived class template
215 // 获取任务负载 lambda 的具体类型/Get the concrete type of the task_payload
216 // lambda
217 using PayloadType = decltype(task_payload);
218 // 创建持有 task_derived<PayloadType> 的 unique_ptr<task_base>/Create
219 // unique_ptr<task_base> holding task_derived<PayloadType>
220 std::unique_ptr<detail::task_base> task_wrapper_ptr =
221 std::make_unique<detail::task_derived<PayloadType>>(
222 std::move(task_payload) // 将负载 lambda 移动到包装器中/Move the
223 // payload lambda into the wrapper
224 );
225
226 // 4. 将任务放入某个工作线程的本地队列/Push the task to a worker's local deque
227 size_t idx =
228 next_worker_.fetch_add(1, std::memory_order_relaxed) % workers_.size();
229 {
230 std::lock_guard<std::mutex> lock(*queue_mutexes_[idx]);
231 // 获取队列的引用并添加任务
232 auto& queue = *worker_queues_[idx];
233 // 确保使用 std::move 来移动 unique_ptr,避免复制
234 queue.emplace_back(std::move(task_wrapper_ptr));
235 }
236
237 // 5. 返回 future/Return the future
238 return future;
239}
240} // namespace toolbox::base
#define CPP_TOOLBOX_DISABLE_COPY(ClassType)
禁用类的拷贝操作 / Disable copy operations for a class
Definition class.hpp:15
#define CPP_TOOLBOX_DISABLE_MOVE(ClassType)
禁用类的移动操作 / Disable move operations for a class
Definition class.hpp:31
Definition object_pool.hpp:100
Definition thread_pool.hpp:60
auto submit(F &&f, Args &&... args) -> std::future< typename std::invoke_result_t< F, Args... > >
Definition thread_pool.hpp:166
~thread_pool_t()
析构函数,停止线程池并等待所有工作线程完成/Destructor that stops the thread pool and waits for all worker threads to finis...
size_t get_thread_count() const
获取线程池中的工作线程数量/Get the number of worker threads
Definition thread_pool.hpp:98
thread_pool_t(size_t threads=0)
通用的编译器、平台、架构检测和实用宏定义 / Common macros for compiler, platform, architecture detection and utility macro...
提供基础 C++ 工具和组件。 Provides fundamental C++ utilities and components.