cpp-toolbox  0.0.1
A toolbox library for C++
Loading...
Searching...
No Matches
parallel_tbb.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <algorithm> // for std::min
4#include <cmath> // for std::ceil
5#include <future>
6#include <iterator>
7#include <numeric> // for std::accumulate (in reduce example)
8#include <stdexcept> // for exceptions
9#include <vector>
10
11// TBB 头文件
13#include <cpp-toolbox/cpp-toolbox_export.hpp>
14#include <tbb/blocked_range.h>
15#include <tbb/parallel_for.h>
16#include <tbb/parallel_for_each.h>
17#include <tbb/parallel_reduce.h>
18#include <tbb/parallel_scan.h>
19#include <tbb/parallel_sort.h>
20
21namespace toolbox::concurrent
22{
23
24// Forward declaration of default_pool(); no function visible in this header calls it. NOTE(review): base::thread_pool_singleton_t is not declared by any include shown here - confirm the header that provides it.
25inline base::thread_pool_singleton_t& default_pool();
26
36template<typename Iterator, typename Function>
37void parallel_for_each(Iterator begin, Iterator end, Function func)
38{
39 using traits = std::iterator_traits<Iterator>;
40 static_assert(std::is_base_of_v<std::random_access_iterator_tag,
41 typename traits::iterator_category>,
42 "parallel_for_each currently requires random access iterators "
43 "for efficient chunking.");
44
45 const auto total_size = std::distance(begin, end);
46 if (total_size <= 0) {
47 return;
48 }
49
50 // 使用TBB的parallel_for_each直接处理
51 tbb::parallel_for_each(begin, end, func);
52}
53
65template<typename InputIt, typename OutputIt, typename UnaryOperation>
66void parallel_transform(InputIt first1,
67 InputIt last1,
68 OutputIt d_first,
69 UnaryOperation unary_op)
70{
71 using InputTraits = std::iterator_traits<InputIt>;
72 using OutputTraits = std::iterator_traits<OutputIt>;
73
74 static_assert(
75 std::is_base_of_v<std::random_access_iterator_tag,
76 typename InputTraits::iterator_category>,
77 "parallel_transform currently requires random access input iterators.");
78 static_assert(
79 std::is_base_of_v<std::random_access_iterator_tag,
80 typename OutputTraits::iterator_category>,
81 "parallel_transform currently requires random access output iterators.");
82
83 const auto total_size = std::distance(first1, last1);
84 if (total_size <= 0) {
85 return;
86 }
87
88 // 使用TBB的parallel_for实现transform
89 tbb::parallel_for(
90 tbb::blocked_range<size_t>(0, static_cast<size_t>(total_size)),
91 [&](const tbb::blocked_range<size_t>& range)
92 {
93 for (size_t i = range.begin(); i != range.end(); ++i) {
94 *(d_first + static_cast<typename OutputTraits::difference_type>(i)) =
95 unary_op(
96 *(first1
97 + static_cast<typename InputTraits::difference_type>(i)));
98 }
99 });
100}
101
114template<typename Iterator, typename T, typename BinaryOperation>
115T parallel_reduce(Iterator begin,
116 Iterator end,
117 T identity,
118 BinaryOperation reduce_op)
119{
120 using traits = std::iterator_traits<Iterator>;
121 static_assert(std::is_base_of_v<std::random_access_iterator_tag,
122 typename traits::iterator_category>,
123 "parallel_reduce currently requires random access iterators "
124 "for efficient chunking.");
125
126 const auto total_size = std::distance(begin, end);
127 if (total_size <= 0) {
128 return identity;
129 }
130
131 // 使用TBB的parallel_reduce
132 return tbb::parallel_reduce(
133 tbb::blocked_range<size_t>(0, static_cast<size_t>(total_size)),
134 identity,
135 [&](const tbb::blocked_range<size_t>& range, T init)
136 {
137 for (size_t i = range.begin(); i != range.end(); ++i) {
138 init = reduce_op(
139 init,
140 *(begin + static_cast<typename traits::difference_type>(i)));
141 }
142 return init;
143 },
144 reduce_op);
145}
146
161template<typename InputIt,
162 typename OutputIt,
163 typename T,
164 typename BinaryOperation>
165void parallel_inclusive_scan(InputIt first,
166 InputIt last,
167 OutputIt d_first,
168 T init,
169 BinaryOperation binary_op,
170 [[maybe_unused]] T identity)
171{
172 using traits = std::iterator_traits<InputIt>;
173 static_assert(std::is_base_of_v<std::random_access_iterator_tag,
174 typename traits::iterator_category>,
175 "parallel_inclusive_scan requires random access iterators.");
176
177 const auto total_size = std::distance(first, last);
178 if (total_size <= 0) {
179 return;
180 }
181
182 // 使用TBB的parallel_scan实现inclusive_scan
183 class ScanBody
184 {
185 InputIt input_begin_;
186 OutputIt output_begin_;
187 T init_;
188 BinaryOperation binary_op_;
189 T sum_;
190 bool is_final_scan_;
191
192 public:
193 ScanBody(InputIt input_begin,
194 OutputIt output_begin,
195 T init,
196 BinaryOperation binary_op)
197 : input_begin_(input_begin)
198 , output_begin_(output_begin)
199 , init_(init)
200 , binary_op_(binary_op)
201 , sum_(init)
202 , is_final_scan_(false)
203 {
204 }
205
206 // 拷贝构造函数,用于TBB内部分割
207 ScanBody(const ScanBody& other, tbb::split)
208 : input_begin_(other.input_begin_)
209 , output_begin_(other.output_begin_)
210 , init_(other.init_)
211 , binary_op_(other.binary_op_)
212 , sum_(other.init_)
213 , is_final_scan_(false)
214 {
215 }
216
217 // 预扫描,计算范围内的累积和
218 void pre_scan(const tbb::blocked_range<size_t>& range, T& sum)
219 {
220 T temp = sum;
221 for (size_t i = range.begin(); i != range.end(); ++i) {
222 temp = binary_op_(
223 temp,
224 *(input_begin_ + static_cast<typename traits::difference_type>(i)));
225 }
226 sum = temp;
227 }
228
229 // 最终扫描,计算并存储结果
230 void final_scan(const tbb::blocked_range<size_t>& range, T& sum)
231 {
232 T temp = sum;
233 for (size_t i = range.begin(); i != range.end(); ++i) {
234 temp = binary_op_(
235 temp,
236 *(input_begin_ + static_cast<typename traits::difference_type>(i)));
237 *(output_begin_ + static_cast<typename traits::difference_type>(i)) =
238 temp;
239 }
240 sum = temp;
241 }
242
243 // 合并两个body
244 void reverse_join(ScanBody& left) { sum_ = binary_op_(left.sum_, sum_); }
245
246 // 根据is_final_scan_标志选择执行pre_scan或final_scan
247 void operator()(const tbb::blocked_range<size_t>& range,
248 tbb::pre_scan_tag /* tag */)
249 {
250 pre_scan(range, sum_);
251 }
252
253 void operator()(const tbb::blocked_range<size_t>& range,
254 tbb::final_scan_tag /* tag */)
255 {
256 final_scan(range, sum_);
257 }
258
259 // 设置为最终扫描模式
260 void assign(T value) { sum_ = value; }
261 };
262
263 ScanBody body(first, d_first, init, binary_op);
264 tbb::parallel_scan(
265 tbb::blocked_range<size_t>(0, static_cast<size_t>(total_size)), body);
266}
267
277template<typename RandomIt, typename Compare>
278void parallel_merge_sort(RandomIt begin, RandomIt end, Compare comp)
279{
280 const auto total_size = std::distance(begin, end);
281 if (total_size <= 1) {
282 return;
283 }
284
285 // 直接使用TBB的parallel_sort
286 tbb::parallel_sort(begin, end, comp);
287}
288
299template<typename RandomIt, typename Compare>
300void parallel_tim_sort(RandomIt begin, RandomIt end, Compare comp)
301{
302 const auto total_size = std::distance(begin, end);
303 if (total_size <= 1) {
304 return;
305 }
306
307 // 直接使用TBB的parallel_sort,它已经是高度优化的并行排序算法
308 tbb::parallel_sort(begin, end, comp);
309}
310
311} // namespace toolbox::concurrent
Definition parallel_raw.hpp:15
void parallel_merge_sort(RandomIt begin, RandomIt end, Compare comp)
使用TBB实现并行合并排序
Definition parallel_raw.hpp:309
void parallel_transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
使用TBB并行转换范围[first1, last1)中的元素并存储到从d_first开始的范围
Definition parallel_raw.hpp:70
void parallel_inclusive_scan(InputIt first, InputIt last, OutputIt d_first, T init, BinaryOperation binary_op, T identity)
使用TBB执行并行包含扫描操作
Definition parallel_raw.hpp:201
void parallel_for_each(Iterator begin, Iterator end, Function func)
使用TBB并行对范围[begin, end)中的每个元素应用函数
Definition parallel_raw.hpp:21
void parallel_tim_sort(RandomIt begin, RandomIt end, Compare comp)
使用TBB实现并行TimSort
Definition parallel_raw.hpp:380
base::thread_pool_singleton_t & default_pool()
获取默认线程池实例/Get the default thread pool instance
Definition parallel.hpp:22
T parallel_reduce(Iterator begin, Iterator end, T identity, BinaryOperation reduce_op)
使用TBB对范围[begin, end)执行并行归约操作
Definition parallel_raw.hpp:129
TContainer::value_type sum(const TContainer &data)
计算容器中元素的总和(返回容器元素类型)/Compute the sum of elements in the container (returns container element type)
Definition statistics.hpp:288
TContainer::value_type range(const TContainer &data)
计算容器中元素的全距(最大值-最小值)/Compute the range (max - min) of elements in the container
Definition statistics.hpp:402