cpp-toolbox  0.0.1
A toolbox library for C++
Loading...
Searching...
No Matches
/home/runner/work/cpp-toolbox/cpp-toolbox/src/include/cpp-toolbox/base/thread_pool.hpp

支持任务窃取的简单 C++17 线程池实现/A simple C++17 thread pool implementation with basic work stealing

支持任务窃取的简单 C++17 线程池实现/A simple C++17 thread pool implementation with basic work stealing该线程池允许提交任务并异步获取结果。构造时创建固定数量的工作线程并为每个线程分配本地双端队列。 当本地队列为空时,线程会尝试从其他线程窃取任务。/This thread pool allows submitting tasks and asynchronously retrieving results. Each worker has a local deque and will try to steal tasks from others when idle.

// 创建一个默认线程数的线程池/Create a thread pool with default number of
threads thread_pool_t pool;
// 提交一个简单任务/Submit a simple task
auto future = pool.submit([]() { return 42; });
int result = future.get(); // result will be 42
// 提交带参数的任务/Submit a task with arguments
auto future2 = pool.submit([](int a, int b) { return a + b; }, 10, 20);
int result2 = future2.get(); // result2 will be 30
// 提交一个可能抛出异常的任务/Submit a task that may throw
auto future3 = pool.submit([]() {
throw std::runtime_error("Task failed");
});
try {
future3.get();
} catch(const std::exception& e) {
std::cerr << e.what() << std::endl;
}
#pragma once
#include <atomic> // 用于原子布尔标志/For atomic boolean flag
#include <deque> // 任务双端队列/For task deques
#include <functional> // 用于 std::function, std::bind/For std::function, std::bind
#include <future> // 用于异步任务结果/For asynchronous task results (std::future, std::packaged_task)
#include <iostream> // 用于标准输入输出/For std::cout, std::cerr
#include <memory> // 用于智能指针/For std::make_shared
#include <mutex> // 互斥锁/For std::mutex
#include <stdexcept> // 用于运行时异常/For runtime exceptions
#include <thread> // C++ 线程库/C++ thread library
#include <type_traits> // 用于类型特征/For std::invoke_result_t
#include <utility> // 用于完美转发和移动语义/For std::forward, std::move
#include <vector> // 用于存储工作线程/For storing worker threads
// 导出宏定义/Export macro definition
#include <cpp-toolbox/cpp-toolbox_export.hpp>
// 宏定义/Macro definitions
namespace toolbox::base
{
class CPP_TOOLBOX_EXPORT thread_pool_t
{
public:
explicit thread_pool_t(size_t threads = 0);
~thread_pool_t();
size_t get_thread_count() const { return workers_.size(); }
template<class F, class... Args>
auto submit(F&& f, Args&&... args)
-> std::future<typename std::invoke_result_t<F, Args...>>;
// 删除拷贝构造函数和拷贝赋值运算符以防止意外复制/Delete copy constructor and
// copy assignment operator to prevent accidental copying
CPP_TOOLBOX_DISABLE_COPY(thread_pool_t)
// 删除移动构造函数和移动赋值运算符以简化生命周期管理/Delete move constructor
// and move assignment operator to simplify lifecycle management
CPP_TOOLBOX_DISABLE_MOVE(thread_pool_t)
private:
// 工作线程列表/List of worker threads
std::vector<std::thread> workers_;
// 每个工作线程的任务双端队列/Per worker task deque
// 使用 shared_ptr 包装 deque 以避免复制问题
std::vector<std::shared_ptr<std::deque<std::unique_ptr<detail::task_base>>>> worker_queues_;
// 保护每个双端队列的互斥锁/Mutex protecting each deque
std::vector<std::unique_ptr<std::mutex>> queue_mutexes_;
// 提交任务时下一个目标线程索引/Next worker index for task submission
std::atomic<size_t> next_worker_ {0};
// 指示线程池是否应该停止的原子标志/Atomic flag indicating whether the thread
// pool should stop
std::atomic<bool> stop_;
// 工作线程主循环/Worker loop implementing work stealing
void worker_loop(size_t worker_id);
};
// --- 模板成员函数实现/Template Member Function Implementation ---
template<class F, class... Args>
auto thread_pool_t::submit(F&& f, Args&&... args)
-> std::future<typename std::invoke_result_t<F, Args...>>
{
using return_type = typename std::invoke_result_t<F, Args...>;
if (stop_.load(std::memory_order_relaxed)) {
throw std::runtime_error("Cannot submit task to stopped thread pool");
}
// 1. 创建 promise 并获取 future/Create promise and get future
auto promise = std::make_shared<std::promise<return_type>>();
std::future<return_type> future = promise->get_future();
// 2. 创建执行工作并设置 promise 的 lambda/Create the lambda that does the
// work and sets the promise
auto task_payload =
// 如果 lambda 需要修改其捕获的变量则使用 mutable/Use mutable if the
// lambda needs to modify its captures
[func = std::forward<F>(
f), // 移动/转发原始可调用对象/Move/forward the original callable
args_tuple = std::make_tuple(
std::forward<Args>(args)...), // 移动/转发参数/Move/forward args
promise_ptr = std::move(
promise)]() mutable { // 通过移动的 shared_ptr 捕获 promise/Capture
// promise by moved shared_ptr
try {
if constexpr (std::is_void_v<return_type>) {
std::apply(
func,
std::move(
args_tuple)); // 调用原始函数/Invoke original function
promise_ptr->set_value(); // 设置 void promise/Set void promise
} else {
return_type result = std::apply(
func,
std::move(
args_tuple)); // 调用原始函数/Invoke original function
promise_ptr->set_value(std::move(
result)); // 用结果设置 promise/Set promise with result
}
} catch (...) {
promise_ptr->set_exception(
std::current_exception()); // 在 promise 上设置异常/Set exception
// on promise
}
}; // lambda 结束/End of task_payload lambda
// 3. 使用派生类模板创建类型擦除的任务包装器/Create the type-erased task
// wrapper using the derived class template
// 获取任务负载 lambda 的具体类型/Get the concrete type of the task_payload
// lambda
using PayloadType = decltype(task_payload);
// 创建持有 task_derived<PayloadType> 的 unique_ptr<task_base>/Create
// unique_ptr<task_base> holding task_derived<PayloadType>
std::unique_ptr<detail::task_base> task_wrapper_ptr =
std::make_unique<detail::task_derived<PayloadType>>(
std::move(task_payload) // 将负载 lambda 移动到包装器中/Move the
// payload lambda into the wrapper
);
// 4. 将任务放入某个工作线程的本地队列/Push the task to a worker's local deque
size_t idx =
next_worker_.fetch_add(1, std::memory_order_relaxed) % workers_.size();
{
std::lock_guard<std::mutex> lock(*queue_mutexes_[idx]);
// 获取队列的引用并添加任务
auto& queue = *worker_queues_[idx];
// 确保使用 std::move 来移动 unique_ptr,避免复制
queue.emplace_back(std::move(task_wrapper_ptr));
}
// 5. 返回 future/Return the future
return future;
}
} // namespace toolbox::base
#define CPP_TOOLBOX_DISABLE_COPY(ClassType)
禁用类的拷贝操作 / Disable copy operations for a class
Definition class.hpp:15
#define CPP_TOOLBOX_DISABLE_MOVE(ClassType)
禁用类的移动操作 / Disable move operations for a class
Definition class.hpp:31
auto submit(F &&f, Args &&... args) -> std::future< typename std::invoke_result_t< F, Args... > >
Definition thread_pool.hpp:166
通用的编译器、平台、架构检测和实用宏定义 / Common macros for compiler, platform, architecture detection and utility macro...
提供基础 C++ 工具和组件。 Provides fundamental C++ utilities and components.