cpp-toolbox  0.0.1
A toolbox library for C++
Loading...
Searching...
No Matches
lock_free_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <algorithm> // For std::find
4#include <atomic> // For std::atomic
5#include <functional> // Include for std::function
6#include <memory> // For std::unique_ptr, std::allocator
7#include <mutex> // For protecting global HP list access (can be refined)
8#include <optional> // For std::optional (C++17)
9#include <thread> // For std::this_thread::yield
10#include <unordered_map> // For mapping thread IDs
11#include <vector>
12
13#include <cpp-toolbox/cpp-toolbox_export.hpp>
14
15#include "cpp-toolbox/macro.hpp"
16
17namespace toolbox::container
18{
19
20namespace detail
21{
22
31struct HPRec; // Forward declaration
32
33// 全局危险指针记录列表(每个使用队列的线程一个)/Global list of Hazard Pointer
34// Records (one per thread using the queue)
35// 警告:使用互斥锁保护这个全局列表会产生一个竞争点,违反了HP注册本身的"纯"无锁特性。更高级的方案使用无锁列表或小心使用线程本地存储。
36// (WARNING: Protecting this global list with a mutex adds a contention point,
37// violating the "pure" lock-free property for HP registration itself. More
38// advanced schemes use lock-free lists or thread-local storage carefully.)
39inline std::vector<HPRec*> g_hp_list;
40inline std::mutex g_hp_list_mutex;
41
42// 存储(节点指针,删除器函数)对/Store pairs of (node_pointer, deleter_function)
43inline thread_local std::vector<std::pair<void*, std::function<void(void*)>>>
45
46// 每个线程的最大危险指针数量(对于M&S队列,通常是2个HP)/Maximum hazard pointers
47// per thread (For M&S queue, 2 HPs are typical)
48constexpr size_t MAX_HAZARD_POINTERS_PER_THREAD = 2;
49
50// 每N次retire调用扫描一次retired列表/Scan retired list every N retire calls
51constexpr size_t RETIRE_SCAN_THRESHOLD = 100;
52
56struct HPRec
57{
58 std::atomic<std::thread::id> owner_thread_id;
61 nullptr; // 用于后续可能的无锁列表/For potential lock-free list later
62
70 : owner_thread_id(std::thread::id()) // 初始未被占用/Initially unowned
71 {
72 for (size_t i = 0; i < MAX_HAZARD_POINTERS_PER_THREAD; ++i) {
73 hazard_pointers[i].store(nullptr, std::memory_order_relaxed);
74 }
75 }
76};
77
93{
94 static thread_local HPRec* my_hp_rec = nullptr;
95 if (my_hp_rec)
96 return my_hp_rec;
97
98 // 首先尝试找到一个未使用的记录/Try to find an unused record first
99 {
100 std::lock_guard lock(g_hp_list_mutex);
101 for (HPRec* rec : g_hp_list) {
102 std::thread::id expected = std::thread::id();
103 if (rec->owner_thread_id.compare_exchange_strong(
104 expected, std::this_thread::get_id()))
105 {
106 my_hp_rec = rec;
107 return my_hp_rec;
108 }
109 }
110 }
111
112 // 如果没有未使用的记录,创建一个新的/If no unused record, create a new one
113 HPRec* new_rec = new HPRec(); // 如果线程不干净退出可能会泄露/Leak potential
114 // if thread exits uncleanly
115 new_rec->owner_thread_id.store(std::this_thread::get_id());
116
117 {
118 std::lock_guard lock(g_hp_list_mutex);
119 g_hp_list.push_back(new_rec);
120 }
121 my_hp_rec = new_rec;
122 return my_hp_rec;
123}
124
134inline void release_hp_record(HPRec* rec)
135{
136 if (!rec)
137 return;
138 rec->owner_thread_id.store(
139 std::thread::id()); // 标记为未占用/Mark as unowned
140 for (size_t i = 0; i < MAX_HAZARD_POINTERS_PER_THREAD; ++i) {
141 rec->hazard_pointers[i].store(nullptr, std::memory_order_relaxed);
142 }
143 // 注意:这里不从g_hp_list中移除以允许重用。真实系统需要仔细清理。
144 // (Note: We don't remove from g_hp_list here to allow reuse. A real system
145 // needs careful cleanup.)
146}
147
155inline void set_hazard_pointer(size_t index, void* ptr)
156{
157 HPRec* rec = acquire_hp_record();
158 if (index < MAX_HAZARD_POINTERS_PER_THREAD) {
159 rec->hazard_pointers[index].store(ptr, std::memory_order_release);
160 }
161}
162
169inline void clear_hazard_pointer(size_t index)
170{
171 set_hazard_pointer(index, nullptr);
172}
173
179{
180 // 1. 收集所有线程的活跃危险指针/Collect all active hazard pointers from all
181 // threads
182 std::vector<void*> active_hps;
183 {
184 std::lock_guard lock(g_hp_list_mutex); // 保护迭代/Protects iteration
185 for (const HPRec* rec : g_hp_list) {
186 // 检查记录是否真的被占用/Check if record is actually owned
187 if (rec->owner_thread_id.load(std::memory_order_acquire)
188 != std::thread::id())
189 {
190 for (size_t i = 0; i < MAX_HAZARD_POINTERS_PER_THREAD; ++i) {
191 void* hp = rec->hazard_pointers[i].load(std::memory_order_acquire);
192 if (hp) {
193 active_hps.push_back(hp);
194 }
195 }
196 }
197 }
198 }
199
200 // 2. 检查线程本地退休列表中的每个节点/Check each node in the thread-local
201 // retired list
202 // Use erase idiom for safe removal during iteration
203 for (auto it = t_retired_list.begin(); it != t_retired_list.end();
204 /* no increment here */)
205 {
206 void* node_to_check = it->first;
207 const auto& deleter = it->second; // 获取删除器/Get the deleter
208 bool is_hazardous = false;
209 // 检查收集的危险指针/Check against collected HPs
210 // Optimization: Sort active_hps first for faster lookup if needed
211 // std::sort(active_hps.begin(), active_hps.end());
212 // if (std::binary_search(active_hps.begin(), active_hps.end(),
213 // node_to_check)) Requires sorting active_hps first
214 for (void* active_hp : active_hps) { // Simple linear scan
215 if (node_to_check == active_hp) {
216 is_hazardous = true;
217 break;
218 }
219 }
220
221 // 如果在活跃HP中未找到,则可以安全删除/If not found in active HPs, it's safe
222 // to delete
223 if (!is_hazardous) {
224 // 使用存储的删除器安全删除/Safely delete using the stored deleter
225 deleter(node_to_check);
226 // Erase the current element and get iterator to the next
227 it = t_retired_list.erase(it);
228 } else {
229 ++it; // 保留到下次扫描, 移动到下一个 / Keep for next scan, move to next
230 }
231 }
232}
233
241inline void retire_node(void* node, std::function<void(void*)> deleter)
242{
243 if (!node)
244 return;
245 t_retired_list.emplace_back(
246 node, std::move(deleter)); // 存储节点和删除器/Store node and deleter
249 }
250}
251
261{
262 scan_retired_nodes(); // 尝试最后一次扫描/Try one last scan
263}
264
270{
271public:
278 HazardPointerGuard(size_t index, void* node)
279 : index_(index)
280 {
281 detail::set_hazard_pointer(index_, node);
282 }
283
288
293
294private:
295 size_t index_;
296};
297
298} // namespace detail
299
335template<typename T>
336class CPP_TOOLBOX_EXPORT lock_free_queue_t
337{
338private:
342 struct Node
343 {
344 T data; // 存储实际数据元素(移动进入)/Stores the actual data element (moved
345 // in)
346 std::atomic<Node*>
347 next; // 指向下一个节点的原子指针/Atomic pointer to the next node
348
352 Node(T&& d)
353 : data(std::move(d))
354 , next(nullptr)
355 {
356 }
357
361 Node()
362 : data()
363 , next(nullptr)
364 {
365 } // 需要T可默认构造以用于哑节点/Requires T to be default constructible for
366 // dummy
367 };
368
369 // 链表头尾的原子指针/Atomic pointers to the head and tail of the linked list
370 std::atomic<Node*> head_;
371 std::atomic<Node*> tail_;
372
377 static void retire_queue_node(Node* node)
378 {
379 detail::retire_node(static_cast<void*>(node),
380 [](void* n)
381 {
382 // 这个lambda捕获了正确删除Node的方式
383 // (This lambda captures the correct way to delete a
384 // Node)
385 delete static_cast<Node*>(n);
386 });
387 }
388
389public:
396 {
397 Node* dummy_node = new Node();
398 head_.store(dummy_node, std::memory_order_relaxed);
399 tail_.store(dummy_node, std::memory_order_relaxed);
400 }
401
416 {
417 // 基本清理 - 假设无并发访问/Basic cleanup - assumes no concurrent access
418 T ignored_value;
419 while (try_dequeue(ignored_value)) {
420 // 出队涉及退休节点,让HP系统稍后处理
421 // (Dequeue involves retiring nodes, let HP system handle them later)
422 };
423
424 // 尝试清理*这个*线程退休的节点
425 // (Attempt to clean up nodes retired by *this* thread)
426 detail::cleanup_retired_nodes();
427
428 // 删除初始哑节点(如果为空应该是head)
429 // (Delete the initial dummy node (should be the head if empty))
430 Node* dummy = head_.load(std::memory_order_relaxed);
431 if (dummy) {
432 delete dummy; // 如果HP系统未清理可能有潜在问题/Potential issue if HP
433 // system hasn't cleared it
434 }
435 }
436
437 // 禁用复制和移动以防止节点/指针的所有权问题
438 // (Disable copying and moving to prevent ownership issues with
439 // nodes/pointers)
441
442
458 void enqueue(T value)
459 {
460 Node* new_node = new Node(std::move(value));
461
462 while (true) {
463 Node* tail_snapshot = tail_.load(std::memory_order_acquire);
464 Node* next_snapshot = tail_snapshot->next.load(std::memory_order_acquire);
465
466 // 重新检查尾部一致性/Re-check tail consistency
467 if (tail_snapshot == tail_.load(std::memory_order_acquire)) {
468 if (next_snapshot == nullptr) {
469 // 尝试链接新节点/Try to link the new node
470 if (tail_snapshot->next.compare_exchange_weak(
471 next_snapshot,
472 new_node,
473 std::memory_order_release,
474 std::memory_order_relaxed))
475 {
476 // 尝试移动尾指针(可选,尽力而为)
477 // (Try to swing the tail pointer (optional, best effort))
478 tail_.compare_exchange_strong(tail_snapshot,
479 new_node,
480 std::memory_order_release,
481 std::memory_order_relaxed);
482 return; // 入队成功/Enqueue successful
483 }
484 } else {
485 // 尾部已前进,尝试帮助移动尾指针
486 // (Tail already advanced, try to help swing the tail pointer)
487 tail_.compare_exchange_strong(tail_snapshot,
488 next_snapshot,
489 std::memory_order_release,
490 std::memory_order_relaxed);
491 }
492 }
493 }
494 }
495
522 bool try_dequeue(T& result)
523 {
524 [[maybe_unused]] detail::HPRec* hp_rec =
525 detail::acquire_hp_record(); // 确保HP记录存在/Ensure HP record exists
526
527 while (true) {
528 // 获取危险指针/Acquire Hazard Pointers
529 Node* head_snapshot = head_.load(std::memory_order_acquire);
530 detail::set_hazard_pointer(0, head_snapshot);
531 // 验证设置HP后head未改变/Verify head hasn't changed after setting HP
532 if (head_snapshot != head_.load(std::memory_order_acquire)) {
533 continue; // Head改变,重试/Head changed, retry
534 }
535
536 Node* next_snapshot = head_snapshot->next.load(std::memory_order_acquire);
537 detail::set_hazard_pointer(1, next_snapshot);
538
539 // 设置HP后重新验证head和next指针
540 // (Re-verify head and next pointers after setting HPs)
541 if (head_snapshot != head_.load(std::memory_order_acquire)) {
542 continue; // Head改变,重试循环将重置HP/Head changed, retry loop will
543 // reset HPs
544 }
545 Node* current_next = head_snapshot->next.load(std::memory_order_acquire);
546 if (next_snapshot != current_next) {
547 continue;
548 }
549
550 Node* tail_snapshot = tail_.load(std::memory_order_acquire);
551
552 if (head_snapshot == tail_snapshot) {
553 // 队列为空或暂时不一致(入队进行中)
554 // (Queue is empty or transiently inconsistent (enqueue in progress))
555 if (next_snapshot == nullptr) {
556 detail::clear_hazard_pointer(
557 0); // 返回前清除HP/Clear HPs before returning
558 detail::clear_hazard_pointer(1);
559 return false; // 队列为空/Queue is empty
560 }
561 // 帮助前进尾部/Help advance tail
562 tail_.compare_exchange_strong(tail_snapshot,
563 next_snapshot,
564 std::memory_order_release,
565 std::memory_order_relaxed);
566 } else {
567 // 队列非空,尝试出队/Queue is not empty, try to dequeue
568 if (next_snapshot == nullptr) {
569 continue;
570 }
571
572 // 尝试前移head指针/Attempt to move the head pointer forward
573 if (head_.compare_exchange_weak(head_snapshot,
574 next_snapshot,
575 std::memory_order_release,
576 std::memory_order_relaxed))
577 {
578 result = std::move(
579 next_snapshot->data); // 数据在新head中/Data is in the NEW head
580
581 // 使用危险指针安全退休旧head节点
582 // (Retire the old head node safely using Hazard Pointers)
583 retire_queue_node(head_snapshot); // 退休旧的哑节点/数据节点/Retire
584 // the old dummy/data node
585
586 // 清除此操作的危险指针/Clear Hazard Pointers for this operation
587 detail::clear_hazard_pointer(0);
588 detail::clear_hazard_pointer(1);
589
590 return true; // 出队成功/Dequeue successful
591 }
592 }
593 }
594 // 清理HP(如果某种情况下未返回就退出循环,不应该发生)
595 // (Cleanup HPs if somehow exited loop without returning (shouldn't happen))
596 detail::clear_hazard_pointer(0);
597 detail::clear_hazard_pointer(1);
598 return false; // 应该不可达/Should be unreachable
599 }
600
622 std::optional<T> try_dequeue()
623 {
624 T result;
625 if (try_dequeue(result)) {
626 return std::optional<T>(std::move(result));
627 } else {
628 return std::nullopt;
629 }
630 }
631
665 {
666 // 调用detail实现来扫描当前线程的列表
667 // (Call the detail implementation which scans the current thread's list)
668 detail::cleanup_retired_nodes();
669 }
670};
671
672} // namespace toolbox::container
#define CPP_TOOLBOX_DISABLE_COPY_AND_MOVE(ClassType)
禁用类的拷贝和移动操作 / Disable both copy and move operations
Definition class.hpp:47
危险指针设置/清除的RAII助手类 (RAII helper for setting/clearing hazard pointers)
Definition lock_free_queue.hpp:270
HazardPointerGuard & operator=(HazardPointerGuard &&)=delete
~HazardPointerGuard()
析构函数 (Destructor)
Definition lock_free_queue.hpp:287
HazardPointerGuard(HazardPointerGuard &&)=delete
HazardPointerGuard(size_t index, void *node)
构造函数 (Constructor)
Definition lock_free_queue.hpp:278
HazardPointerGuard & operator=(const HazardPointerGuard &)=delete
HazardPointerGuard(const HazardPointerGuard &)=delete
Definition lock_free_queue.hpp:337
bool try_dequeue(T &result)
Definition lock_free_queue.hpp:522
std::optional< T > try_dequeue()
Definition lock_free_queue.hpp:622
~lock_free_queue_t()
销毁无锁队列 (Destroys the lock-free queue)
Definition lock_free_queue.hpp:415
static void cleanup_this_thread_retired_nodes()
Definition lock_free_queue.hpp:664
lock_free_queue_t()
构造无锁队列 (Constructs the lock-free queue)
Definition lock_free_queue.hpp:395
通用的编译器、平台、架构检测和实用宏定义 / Common macros for compiler, platform, architecture detection and utility macro...
constexpr size_t RETIRE_SCAN_THRESHOLD
Definition lock_free_queue.hpp:51
void scan_retired_nodes()
扫描已退休节点并删除安全的节点 (Scan retired nodes and delete safe ones)
Definition lock_free_queue.hpp:178
HPRec * acquire_hp_record()
Definition lock_free_queue.hpp:92
thread_local std::vector< std::pair< void *, std::function< void(void *)> > > t_retired_list
Definition lock_free_queue.hpp:44
void clear_hazard_pointer(size_t index)
清除当前线程的一个危险指针 (Clear a hazard pointer for the current thread)
Definition lock_free_queue.hpp:169
constexpr size_t MAX_HAZARD_POINTERS_PER_THREAD
Definition lock_free_queue.hpp:48
std::vector< HPRec * > g_hp_list
Definition lock_free_queue.hpp:39
std::mutex g_hp_list_mutex
Definition lock_free_queue.hpp:40
void release_hp_record(HPRec *rec)
释放危险指针记录 (Release a hazard pointer record)
Definition lock_free_queue.hpp:134
void cleanup_retired_nodes()
清理当前线程的所有剩余退休节点 (Clean up all remaining retired nodes for the current thread)
Definition lock_free_queue.hpp:260
void set_hazard_pointer(size_t index, void *ptr)
为当前线程设置一个危险指针 (Set a hazard pointer for the current thread)
Definition lock_free_queue.hpp:155
void retire_node(void *node, std::function< void(void *)> deleter)
退休一个节点(添加到线程本地列表,偶尔触发扫描) (Retire a node - add to thread-local list, trigger scan occasionally)
Definition lock_free_queue.hpp:241
Definition concurrent_queue.hpp:16
危险指针记录结构体 (Hazard Pointer Record structure)
Definition lock_free_queue.hpp:57
HPRec()
构造函数 (Constructor)
Definition lock_free_queue.hpp:69
HPRec * next
Definition lock_free_queue.hpp:60
std::atomic< void * > hazard_pointers[MAX_HAZARD_POINTERS_PER_THREAD]
Definition lock_free_queue.hpp:59
std::atomic< std::thread::id > owner_thread_id
Definition lock_free_queue.hpp:58