cpp-toolbox  0.0.1
A toolbox library for C++
parallel_raw.hpp
1#pragma once
2
3#include <algorithm> // for std::min
4#include <cmath> // for std::ceil
5#include <future>
6#include <iterator>
7#include <numeric> // for std::accumulate (used by parallel_reduce)
8#include <stdexcept> // for exceptions
9#include <vector>
10
12#include <cpp-toolbox/cpp-toolbox_export.hpp>
13
14namespace toolbox::concurrent
15{
16
17// Declaration of default_pool() (defined in parallel.hpp).
18base::thread_pool_singleton_t& default_pool();
19
20template<typename Iterator, typename Function>
21void parallel_for_each(Iterator begin, Iterator end, Function func)
22{
23 using traits = std::iterator_traits<Iterator>;
24 static_assert(std::is_base_of<std::random_access_iterator_tag,
25 typename traits::iterator_category>::value,
26 "parallel_for_each currently requires random access iterators "
27 "for efficient chunking.");
28
29 const auto total_size = std::distance(begin, end);
30 if (total_size <= 0) {
31 return;
32 }
33
34 auto& pool = default_pool();
35 const size_t num_threads = pool.get_thread_count();
36 const size_t min_chunk_size = 1;
37 const size_t hardware_threads =
38 std::max(1u, std::thread::hardware_concurrency());
39 const size_t num_tasks = std::max(num_threads, hardware_threads);
40
41 size_t chunk_size =
42 std::max(min_chunk_size,
43 static_cast<size_t>(
44 std::ceil(static_cast<double>(total_size) / num_tasks)));
45
46 std::vector<std::future<void>> futures;
47 futures.reserve(num_tasks);
48
49 for (size_t i = 0; i < total_size; i += chunk_size) {
50 size_t end_idx = std::min(i + chunk_size, static_cast<size_t>(total_size));
51 futures.emplace_back(pool.submit(
52 [begin, i, end_idx, func]() mutable
53 {
54 auto chunk_begin = std::next(begin, static_cast<long>(i));
55 auto chunk_end = std::next(begin, static_cast<long>(end_idx));
56 std::for_each(chunk_begin, chunk_end, func);
57 }));
58 }
59
60 try {
61 for (auto& fut : futures) {
62 fut.get();
63 }
64 } catch (...) {
65 throw; // propagate the first exception thrown by a worker task
66 }
67}
68
69template<typename InputIt, typename OutputIt, typename UnaryOperation>
70void parallel_transform(InputIt first1,
71 InputIt last1,
72 OutputIt d_first,
73 UnaryOperation unary_op)
74{
75 using InputTraits = std::iterator_traits<InputIt>;
76 using OutputTraits = std::iterator_traits<OutputIt>;
77
78 static_assert(
79 std::is_base_of<std::random_access_iterator_tag,
80 typename InputTraits::iterator_category>::value,
81 "parallel_transform currently requires random access input iterators.");
82 static_assert(
83 std::is_base_of<std::random_access_iterator_tag,
84 typename OutputTraits::iterator_category>::value,
85 "parallel_transform currently requires random access output iterators.");
86
87 const auto total_size = std::distance(first1, last1);
88 if (total_size <= 0) {
89 return;
90 }
91
92 auto& pool = default_pool();
93 const size_t num_threads = pool.get_thread_count();
94 const size_t min_chunk_size = 1;
95 const size_t hardware_threads =
96 std::max(1U, std::thread::hardware_concurrency());
97 const size_t num_tasks = std::max(num_threads, hardware_threads);
98
99 size_t chunk_size =
100 std::max(min_chunk_size,
101 static_cast<size_t>(
102 std::ceil(static_cast<double>(total_size) / num_tasks)));
103
104 std::vector<std::future<void>> futures;
105 futures.reserve(num_tasks);
106
107 for (size_t i = 0; i < total_size; i += chunk_size) {
108 size_t end_idx = std::min(i + chunk_size, static_cast<size_t>(total_size));
109 futures.emplace_back(pool.submit(
110 [first1, d_first, i, end_idx, unary_op]() mutable
111 {
112 auto chunk_begin = std::next(first1, static_cast<long>(i));
113 auto chunk_end = std::next(first1, static_cast<long>(end_idx));
114 auto chunk_d_first = std::next(d_first, static_cast<long>(i));
115 std::transform(chunk_begin, chunk_end, chunk_d_first, unary_op);
116 }));
117 }
118
119 try {
120 for (auto& fut : futures) {
121 fut.get();
122 }
123 } catch (...) {
124 throw;
125 }
126}
127
128template<typename Iterator, typename T, typename BinaryOperation>
129T parallel_reduce(Iterator begin,
130 Iterator end,
131 T identity,
132 BinaryOperation reduce_op)
133{
134 using traits = std::iterator_traits<Iterator>;
135 static_assert(std::is_base_of_v<std::random_access_iterator_tag,
136 typename traits::iterator_category>,
137 "parallel_reduce currently requires random access iterators "
138 "for efficient chunking.");
139
140 const auto total_size = std::distance(begin, end);
141 if (total_size <= 0) {
142 return identity;
143 }
144
145 auto& pool = default_pool();
146 const size_t num_threads = pool.get_thread_count();
147 const size_t min_chunk_size = 256;
148 const size_t hardware_threads =
149 std::max(1U, std::thread::hardware_concurrency());
150 const size_t num_tasks = std::max(static_cast<size_t>(1ul),
151 std::min(num_threads, hardware_threads));
152
153 size_t chunk_size =
154 std::max(min_chunk_size,
155 static_cast<size_t>(
156 std::ceil(static_cast<double>(total_size) / num_tasks)));
157 size_t actual_num_tasks = static_cast<size_t>(
158 std::ceil(static_cast<double>(total_size) / chunk_size));
159
160 std::vector<std::future<T>> futures;
161 futures.reserve(actual_num_tasks);
162
163 for (size_t i = 0; i < total_size; i += chunk_size) {
164 size_t end_idx = std::min(i + chunk_size, static_cast<size_t>(total_size));
165 futures.emplace_back(pool.submit(
166 [begin, i, end_idx, identity, reduce_op]() mutable
167 {
168 auto chunk_begin = std::next(begin, static_cast<long>(i));
169 auto chunk_end = std::next(begin, static_cast<long>(end_idx));
170 return std::accumulate(chunk_begin, chunk_end, identity, reduce_op);
171 }));
172 }
173
174 std::vector<T> partial_results;
175 partial_results.reserve(futures.size());
176
177 try {
178 for (auto& fut : futures) {
179 partial_results.push_back(fut.get());
180 }
181 } catch (...) {
182 throw;
183 }
184
185 // If there is only one partial result, return it directly so identity is not applied again.
186 if (partial_results.size() == 1) {
187 return partial_results[0];
188 }
189
190 // Otherwise fold the remaining partial results into the first one instead of starting from identity.
191 return std::accumulate(partial_results.begin() + 1,
192 partial_results.end(),
193 partial_results[0],
194 reduce_op);
195}
196
197template<typename InputIt,
198 typename OutputIt,
199 typename T,
200 typename BinaryOperation>
201void parallel_inclusive_scan(InputIt first,
202 InputIt last,
203 OutputIt d_first,
204 T init,
205 BinaryOperation binary_op,
206 T identity)
207{
208 using traits = std::iterator_traits<InputIt>;
209 static_assert(std::is_base_of_v<std::random_access_iterator_tag,
210 typename traits::iterator_category>,
211 "parallel_inclusive_scan requires random access iterators.");
212
213 const auto total_size = std::distance(first, last);
214 if (total_size <= 0) {
215 return;
216 }
217
218 auto& pool = default_pool();
219 const size_t num_threads = pool.get_thread_count();
220 const size_t hardware_threads =
221 std::max(1U, std::thread::hardware_concurrency());
222 size_t num_tasks = std::max(num_threads, hardware_threads);
223
224 auto chunk_size = static_cast<size_t>(std::ceil(
225 static_cast<double>(total_size) / static_cast<double>(num_tasks)));
226 num_tasks = static_cast<size_t>(std::ceil(static_cast<double>(total_size)
227 / static_cast<double>(chunk_size)));
228
229 std::vector<T> chunk_sums(num_tasks);
230 std::vector<std::pair<size_t, size_t>> ranges;
231 ranges.reserve(num_tasks);
232
233 std::vector<std::future<T>> sum_futures;
234 sum_futures.reserve(num_tasks);
235
236 InputIt chunk_begin = first;
237 for (size_t i = 0; i < num_tasks; ++i) {
238 InputIt chunk_end = chunk_begin;
239 size_t current_size = std::min(
240 chunk_size, static_cast<size_t>(std::distance(chunk_begin, last)));
241 if (current_size == 0) {
242 break;
243 }
244 std::advance(chunk_end, current_size);
245 ranges.emplace_back(static_cast<size_t>(std::distance(first, chunk_begin)),
246 current_size);
247
248 sum_futures.emplace_back(pool.submit(
249 [chunk_begin, chunk_end, identity, binary_op]()
250 {
251 T local_sum = identity;
252 for (auto it = chunk_begin; it != chunk_end; ++it) {
253 local_sum = binary_op(local_sum, *it);
254 }
255 return local_sum;
256 }));
257
258 chunk_begin = chunk_end;
259 if (chunk_begin == last) {
260 break;
261 }
262 }
263
264 size_t actual_tasks = ranges.size();
265 for (size_t i = 0; i < actual_tasks; ++i) {
266 chunk_sums[i] = sum_futures[i].get();
267 }
268
269 std::vector<T> offsets(actual_tasks);
270 T running = init;
271 for (size_t i = 0; i < actual_tasks; ++i) {
272 offsets[i] = running;
273 running = binary_op(running, chunk_sums[i]);
274 }
275
276 std::vector<std::future<void>> futures;
277 futures.reserve(actual_tasks);
278 for (size_t i = 0; i < actual_tasks; ++i) {
279 size_t start_index = ranges[i].first;
280 size_t len = ranges[i].second;
281
282 InputIt chunk_begin_it =
283 first + static_cast<typename traits::difference_type>(start_index);
284 OutputIt dest_begin_it =
285 d_first + static_cast<typename traits::difference_type>(start_index);
286 T offset = offsets[i];
287
288 futures.emplace_back(pool.submit(
289 [chunk_begin_it, dest_begin_it, len, offset, binary_op]() mutable
290 {
291 T local = offset;
292 for (size_t j = 0; j < len; ++j) {
293 local = binary_op(
294 local,
295 *(chunk_begin_it
296 + static_cast<typename traits::difference_type>(j)));
297 *(dest_begin_it
298 + static_cast<typename traits::difference_type>(j)) = local;
299 }
300 }));
301 }
302
303 for (auto& fut : futures) {
304 fut.get();
305 }
306}
307
308template<typename RandomIt, typename Compare>
309void parallel_merge_sort(RandomIt begin, RandomIt end, Compare comp)
310{
311 const auto total_size = std::distance(begin, end);
312 if (total_size <= 1) {
313 return;
314 }
315
316 auto& pool = default_pool();
317 const size_t num_threads = pool.get_thread_count();
318 const size_t hardware_threads =
319 std::max(1U, std::thread::hardware_concurrency());
320 size_t num_tasks = std::max(num_threads, hardware_threads);
321
322 auto chunk_size = static_cast<size_t>(std::ceil(
323 static_cast<double>(total_size) / static_cast<double>(num_tasks)));
324 num_tasks = static_cast<size_t>(std::ceil(static_cast<double>(total_size)
325 / static_cast<double>(chunk_size)));
326
327 std::vector<std::future<void>> futures;
328 futures.reserve(num_tasks);
329 std::vector<std::pair<RandomIt, RandomIt>> ranges;
330 ranges.reserve(num_tasks);
331
332 RandomIt chunk_begin = begin;
333 for (size_t i = 0; i < num_tasks; ++i) {
334 RandomIt chunk_end = chunk_begin;
335 size_t current_size = std::min(
336 chunk_size, static_cast<size_t>(std::distance(chunk_begin, end)));
337 if (current_size == 0) {
338 break;
339 }
340 std::advance(chunk_end, current_size);
341 ranges.emplace_back(chunk_begin, chunk_end);
342 futures.emplace_back(
343 pool.submit([chunk_begin, chunk_end, comp]()
344 { std::sort(chunk_begin, chunk_end, comp); }));
345 chunk_begin = chunk_end;
346 if (chunk_begin == end) {
347 break;
348 }
349 }
350
351 for (auto& fut : futures) {
352 fut.get();
353 }
354
355 while (ranges.size() > 1) {
356 std::vector<std::pair<RandomIt, RandomIt>> new_ranges;
357 std::vector<std::future<void>> merge_futs;
358 for (size_t i = 0; i + 1 < ranges.size(); i += 2) {
359 auto begin1 = ranges[i].first;
360 auto mid = ranges[i].second;
361 auto end2 = ranges[i + 1].second;
362 merge_futs.emplace_back(
363 pool.submit([begin1, mid, end2, comp]() mutable
364 { std::inplace_merge(begin1, mid, end2, comp); }));
365 new_ranges.emplace_back(begin1, end2);
366 }
367
368 for (auto& fut : merge_futs) {
369 fut.get();
370 }
371
372 if (ranges.size() % 2 == 1) {
373 new_ranges.push_back(ranges.back());
374 }
375 ranges.swap(new_ranges);
376 }
377}
378
379template<typename RandomIt, typename Compare>
380void parallel_tim_sort(RandomIt begin, RandomIt end, Compare comp)
381{
382 const auto total_size = std::distance(begin, end);
383 if (total_size <= 1) {
384 return;
385 }
386
387 constexpr size_t kRun = 32;
388 std::vector<std::pair<RandomIt, RandomIt>> ranges;
389 ranges.reserve(
390 static_cast<size_t>(std::ceil(static_cast<double>(total_size) / kRun)));
391
392 RandomIt run_begin = begin;
393 while (run_begin != end) {
394 RandomIt run_end = run_begin;
395 size_t len =
396 std::min(static_cast<size_t>(std::distance(run_begin, end)), kRun);
397 std::advance(run_end, len);
398 std::sort(run_begin, run_end, comp); // small run sort
399 ranges.emplace_back(run_begin, run_end);
400 run_begin = run_end;
401 }
402
403 auto& pool = default_pool();
404 while (ranges.size() > 1) {
405 std::vector<std::pair<RandomIt, RandomIt>> new_ranges;
406 std::vector<std::future<void>> futures;
407 for (size_t i = 0; i + 1 < ranges.size(); i += 2) {
408 auto begin1 = ranges[i].first;
409 auto mid = ranges[i].second;
410 auto end2 = ranges[i + 1].second;
411 futures.emplace_back(
412 pool.submit([begin1, mid, end2, comp]()
413 { std::inplace_merge(begin1, mid, end2, comp); }));
414 new_ranges.emplace_back(begin1, end2);
415 }
416 for (auto& fut : futures) {
417 fut.get();
418 }
419 if (ranges.size() % 2 == 1) {
420 new_ranges.push_back(ranges.back());
421 }
422 ranges.swap(new_ranges);
423 }
424}
425
426} // namespace toolbox::concurrent
A simple singleton wrapper around thread_pool_t
Definition thread_pool_singleton.hpp:18
Definition parallel_raw.hpp:15
void parallel_merge_sort(RandomIt begin, RandomIt end, Compare comp)
Parallel merge sort implemented on top of the default thread pool
Definition parallel_raw.hpp:309
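A minimal usage sketch (the helper below is illustrative, not part of cpp-toolbox), assuming this header is included:

#include <functional>
#include <vector>

// Illustrative helper, not part of the library.
void sort_ascending(std::vector<double>& values)
{
  // Per the listing, chunks are sorted concurrently on the default pool,
  // then pairs of adjacent chunks are merged with std::inplace_merge.
  toolbox::concurrent::parallel_merge_sort(
      values.begin(), values.end(), std::less<double>());
}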
void parallel_transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op)
Transforms the elements in the range [first1, last1) in parallel and stores the results in the range beginning at d_first
Definition parallel_raw.hpp:70
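A usage sketch (illustrative helper, assuming this header is included); note that the destination range must already be allocated, since the tasks write through d_first plus an offset:

#include <vector>

// Illustrative helper, not part of the library.
std::vector<float> squares(const std::vector<float>& in)
{
  std::vector<float> out(in.size());  // pre-sized destination range
  toolbox::concurrent::parallel_transform(
      in.begin(), in.end(), out.begin(),
      [](float v) { return v * v; });  // unary_op applied to each element
  return out;
}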
void parallel_inclusive_scan(InputIt first, InputIt last, OutputIt d_first, T init, BinaryOperation binary_op, T identity)
Performs a parallel inclusive scan over [first, last), writing the results to the range beginning at d_first
Definition parallel_raw.hpp:201
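A prefix-sum sketch (illustrative helper, assuming this header is included); init seeds every prefix, and identity must be the neutral element of binary_op because it starts each chunk's partial sum in the first phase of the algorithm:

#include <vector>

// Illustrative helper, not part of the library.
std::vector<int> prefix_sums(const std::vector<int>& in)
{
  std::vector<int> out(in.size());  // pre-sized destination range
  // out[i] = init + in[0] + ... + in[i]
  toolbox::concurrent::parallel_inclusive_scan(
      in.begin(), in.end(), out.begin(),
      /*init=*/0, [](int a, int b) { return a + b; }, /*identity=*/0);
  return out;
}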
void parallel_for_each(Iterator begin, Iterator end, Function func)
Applies a function to every element in the range [begin, end) in parallel
Definition parallel_raw.hpp:21
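A minimal usage sketch (illustrative helper, assuming this header is included); the callable runs concurrently on disjoint chunks, so it must not introduce data races across elements:

#include <vector>

// Illustrative helper, not part of the library.
void double_all(std::vector<int>& data)
{
  toolbox::concurrent::parallel_for_each(
      data.begin(), data.end(), [](int& x) { x *= 2; });  // mutates in place
}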
void parallel_tim_sort(RandomIt begin, RandomIt end, Compare comp)
Parallel TimSort-style sort: small runs are sorted sequentially, then merged in parallel
Definition parallel_raw.hpp:380
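A usage sketch (illustrative helper, assuming this header is included); per the listing, runs of 32 elements are sorted sequentially and the merge passes run on the default pool:

#include <functional>
#include <vector>

// Illustrative helper, not part of the library.
void sort_descending(std::vector<int>& values)
{
  toolbox::concurrent::parallel_tim_sort(
      values.begin(), values.end(), std::greater<int>());
}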
base::thread_pool_singleton_t & default_pool()
Get the default thread pool instance
Definition parallel.hpp:22
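A sketch of using the pool directly (illustrative helper, assuming this header is included); the listing only shows that submit() returns a waitable future-like handle, so its exact type is left to auto:

// Illustrative helper, not part of the library.
int forty_two()
{
  auto& pool = toolbox::concurrent::default_pool();
  auto result = pool.submit([] { return 6 * 7; });  // future-like handle
  return result.get();  // blocks until the task has finished
}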
T parallel_reduce(Iterator begin, Iterator end, T identity, BinaryOperation reduce_op)
Performs a parallel reduction over the range [begin, end)
Definition parallel_raw.hpp:129
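A summation sketch (illustrative helper, assuming this header is included); identity must be the neutral element of reduce_op, and reduce_op should be associative because chunk boundaries depend on the thread count:

#include <cstdint>
#include <vector>

// Illustrative helper, not part of the library.
std::int64_t sum(const std::vector<int>& v)
{
  return toolbox::concurrent::parallel_reduce(
      v.begin(), v.end(), std::int64_t{0},
      [](std::int64_t a, std::int64_t b) { return a + b; });
}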