17 template <
typename W,
typename AccumulateOp,
typename CombineOp,
typename Reducer,
18 typename ReleaseHandler,
typename ForwardIterator,
typename... ForwardIterators>
19 inline typename Reducer::accumulator_type
20 parallel_reduce_generic(
const execution::parallel_policy<W>& policy,
21 AccumulateOp accumulate_op,
24 typename Reducer::accumulator_type&& acc,
26 ForwardIterator first,
28 ForwardIterators... firsts) {
29 using acc_t =
typename Reducer::accumulator_type;
37 std::size_t d = std::distance(first, last);
38 if (d <= policy.cutoff_count) {
40 execution::internal::to_sequenced_policy(policy),
42 accumulate_op(acc, std::forward<decltype(refs)>(refs)...);
44 first, last, firsts...);
48 auto mid = std::next(first, d / 2);
53 auto&& [p1, p2] = execution::internal::get_child_policies(policy);
55 ito::thread<acc_t> th(
57 execution::internal::get_workhint(policy),
58 [=, p1 = p1, acc =
std::move(acc)]()
mutable {
59 return parallel_reduce_generic(p1, accumulate_op, combine_op, reducer,
60 std::move(acc), rh, first, mid, firsts...);
63 if (th.serialized()) {
64 acc_t acc_r = parallel_reduce_generic(p2, accumulate_op, combine_op, reducer,
65 th.join(), rh, mid, last, std::next(firsts, d / 2)...);
72 acc_t new_acc = reducer();
75 acc_t acc_r = parallel_reduce_generic(p2, accumulate_op, combine_op, reducer,
76 std::move(new_acc), rh, mid, last, std::next(firsts, d / 2)...);
80 acc_t acc_l = th.join();
86 combine_op(acc_l,
std::move(acc_r), first, mid, last, firsts...);
91 template <
typename AccumulateOp,
typename CombineOp,
typename Reducer,
92 typename ForwardIterator,
typename... ForwardIterators>
93 inline typename Reducer::accumulator_type
94 reduce_generic(
const execution::sequenced_policy& policy,
95 AccumulateOp accumulate_op,
96 CombineOp combine_op [[maybe_unused]],
97 Reducer reducer [[maybe_unused]],
98 typename Reducer::accumulator_type&& acc,
99 ForwardIterator first,
100 ForwardIterator last,
101 ForwardIterators... firsts) {
102 execution::internal::assert_policy(policy);
104 execution::internal::to_sequenced_policy(policy),
105 [&](
auto&&... refs) {
106 accumulate_op(acc, std::forward<decltype(refs)>(refs)...);
108 first, last, firsts...);
112 template <
typename W,
typename AccumulateOp,
typename CombineOp,
typename Reducer,
113 typename ForwardIterator,
typename... ForwardIterators>
114 inline typename Reducer::accumulator_type
115 reduce_generic(
const execution::parallel_policy<W>& policy,
116 AccumulateOp accumulate_op,
117 CombineOp combine_op,
119 typename Reducer::accumulator_type&& acc,
120 ForwardIterator first,
121 ForwardIterator last,
122 ForwardIterators... firsts) {
123 execution::internal::assert_policy(policy);
125 return parallel_reduce_generic(policy, accumulate_op, combine_op, reducer,
std::move(acc),
126 rh, first, last, firsts...);
164 template <
typename ExecutionPolicy,
typename ForwardIterator,
165 typename Reducer,
typename UnaryTransformOp>
166 inline typename Reducer::accumulator_type
168 ForwardIterator first,
169 ForwardIterator last,
171 UnaryTransformOp unary_transform_op) {
172 if constexpr (ori::is_global_ptr_v<ForwardIterator>) {
181 auto accumulate_op = [=](
auto&& acc,
const auto& r) {
182 reducer(std::forward<decltype(acc)>(acc), unary_transform_op(r));
185 auto combine_op = [=](
auto&& acc1,
auto&& acc2,
186 ForwardIterator, ForwardIterator, ForwardIterator) {
187 reducer(std::forward<decltype(acc1)>(acc1), std::forward<decltype(acc2)>(acc2));
190 return internal::reduce_generic(policy, accumulate_op, combine_op, reducer,
191 reducer(), first, last);
230 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIterator2,
231 typename Reducer,
typename BinaryTransformOp>
232 inline typename Reducer::accumulator_type
234 ForwardIterator1 first1,
235 ForwardIterator1 last1,
236 ForwardIterator2 first2,
238 BinaryTransformOp binary_transform_op) {
239 if constexpr (ori::is_global_ptr_v<ForwardIterator1> ||
240 ori::is_global_ptr_v<ForwardIterator2>) {
247 binary_transform_op);
250 auto accumulate_op = [=](
auto&& acc,
const auto& r1,
const auto& r2) {
251 reducer(std::forward<decltype(acc)>(acc), binary_transform_op(r1, r2));
254 auto combine_op = [=](
auto&& acc1,
auto&& acc2,
255 ForwardIterator1, ForwardIterator1, ForwardIterator1, ForwardIterator2) {
256 reducer(std::forward<decltype(acc1)>(acc1), std::forward<decltype(acc2)>(acc2));
259 return internal::reduce_generic(policy, accumulate_op, combine_op, reducer,
260 reducer(), first1, last1, first2);
292 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIterator2>
294 ForwardIterator1 first1,
295 ForwardIterator1 last1,
296 ForwardIterator2 first2) {
297 using T = decltype((*first1) * (*first2));
338 template <
typename ExecutionPolicy,
typename ForwardIterator,
typename Reducer>
339 inline typename Reducer::accumulator_type
341 ForwardIterator first,
342 ForwardIterator last,
345 [](
auto&& r) -> decltype(
auto) {
return std::forward<decltype(r)>(r); });
360 template <
typename ExecutionPolicy,
typename ForwardIterator>
361 inline typename std::iterator_traits<ForwardIterator>::value_type
363 ForwardIterator first,
364 ForwardIterator last) {
365 using T =
typename std::iterator_traits<ForwardIterator>::value_type;
369 ITYR_TEST_CASE(
"[ityr::pattern::parallel_reduce] reduce and transform_reduce") {
378 count_iterator<long>(0),
379 count_iterator<long>(n));
388 execution::parallel_policy(100),
389 count_iterator<long>(0),
390 count_iterator<long>(n));
399 execution::parallel_policy(100),
400 count_iterator<long>(0),
401 count_iterator<long>(n),
402 reducer::plus<long>{},
403 [](
long x) {
return x * x; });
405 ITYR_CHECK(r == n * (n - 1) * (2 * n - 1) / 6);
412 execution::parallel_policy(100),
413 count_iterator<long>(0),
414 count_iterator<long>(n),
415 count_iterator<long>(0),
416 reducer::plus<long>{},
417 [](
long x,
long y) {
return x * y; });
419 ITYR_CHECK(r == n * (n - 1) * (2 * n - 1) / 6);
425 execution::parallel_policy(100),
426 count_iterator<long>(0),
427 count_iterator<long>(0));
436 ITYR_TEST_CASE(
"[ityr::pattern::parallel_reduce] parallel reduce with global_ptr") {
441 ori::global_ptr<long> p = ori::malloc_coll<long>(n);
446 execution::sequenced_policy(100),
449 [&](
long& v) { v = count++; });
464 execution::parallel_policy(100),
476 reducer::plus<long>{},
477 [](ori::global_ref<long> gref) {
487 execution::sequenced_policy(100),
494 ori::global_ptr<common::move_only_t> p_mo = ori::malloc_coll<common::move_only_t>(n);
498 execution::parallel_policy(100),
499 count_iterator<long>(0),
500 count_iterator<long>(n),
502 [&](
long i, common::move_only_t& v) { v = common::move_only_t(i); });
504 common::move_only_t r =
reduce(
571 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIteratorD,
572 typename Reducer,
typename UnaryTransformOp>
573 inline ForwardIteratorD
575 ForwardIterator1 first1,
576 ForwardIterator1 last1,
577 ForwardIteratorD first_d,
579 UnaryTransformOp unary_transform_op,
580 typename Reducer::accumulator_type&&
init) {
581 if constexpr (ori::is_global_ptr_v<ForwardIterator1> ||
582 ori::is_global_ptr_v<ForwardIteratorD>) {
583 using value_type_d =
typename std::iterator_traits<ForwardIteratorD>::value_type;
588 internal::convert_to_global_iterator(first_d, internal::dest_checkout_mode_t<value_type_d>{}),
594 auto accumulate_op = [=](
auto&& acc,
const auto& r1,
auto&& d) {
595 reducer(acc, unary_transform_op(r1));
600 auto combine_op = [=](
auto&& acc1,
602 ForwardIterator1 first_,
603 ForwardIterator1 mid_,
604 ForwardIterator1 last_,
605 ForwardIteratorD first_d_) {
607 auto dm = std::distance(first_, mid_);
608 auto dl = std::distance(first_, last_);
609 if constexpr (!is_global_iterator_v<ForwardIteratorD>) {
610 for_each(policy, std::next(first_d_, dm), std::next(first_d_, dl),
611 [=](
auto&& acc_r) { reducer(acc1, acc_r); });
612 }
else if constexpr (std::is_same_v<typename ForwardIteratorD::mode, checkout_mode::no_access_t>) {
613 for_each(policy, std::next(first_d_, dm), std::next(first_d_, dl),
614 [=](
auto&& acc_r) { reducer(acc1, acc_r); });
618 for_each(policy, std::next(fd, dm), std::next(fd, dl),
619 [=](
auto&& acc_r) { reducer(acc1, acc_r); });
621 reducer(std::forward<decltype(acc1)>(acc1), std::forward<decltype(acc2)>(acc2));
624 internal::reduce_generic(policy, accumulate_op, combine_op, reducer,
627 return std::next(first_d, std::distance(first1, last1));
662 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIteratorD,
663 typename Reducer,
typename UnaryTransformOp>
665 ForwardIterator1 first1,
666 ForwardIterator1 last1,
667 ForwardIteratorD first_d,
669 UnaryTransformOp unary_transform_op) {
671 unary_transform_op, reducer());
721 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIteratorD,
723 inline ForwardIteratorD
725 ForwardIterator1 first1,
726 ForwardIterator1 last1,
727 ForwardIteratorD first_d,
729 typename Reducer::accumulator_type&&
init) {
731 [](
auto&& r) -> decltype(
auto) {
return std::forward<decltype(r)>(r); },
std::move(
init));
762 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIteratorD,
765 ForwardIterator1 first1,
766 ForwardIterator1 last1,
767 ForwardIteratorD first_d,
769 return inclusive_scan(policy, first1, last1, first_d, reducer, reducer());
799 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIteratorD>
801 ForwardIterator1 first1,
802 ForwardIterator1 last1,
803 ForwardIteratorD first_d) {
804 using T =
typename std::iterator_traits<ForwardIterator1>::value_type;
808 ITYR_TEST_CASE(
"[ityr::pattern::parallel_reduce] inclusive scan") {
813 ori::global_ptr<long> p1 = ori::malloc_coll<long>(n);
814 ori::global_ptr<long> p2 = ori::malloc_coll<long>(n);
817 fill(execution::parallel_policy(100),
821 execution::parallel_policy(100),
828 execution::parallel_policy(100),
834 execution::parallel_policy(100),
835 p1, p1 + n, p2, reducer::multiplies<long>{}, 10);
841 execution::parallel_policy(100),
842 p1, p1 + n, p2, reducer::plus<long>{}, [](
long x) {
return x + 1; }, 10);
885 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIterator2,
886 typename BinaryPredicate>
887 inline bool equal(
const ExecutionPolicy& policy,
888 ForwardIterator1 first1,
889 ForwardIterator1 last1,
890 ForwardIterator2 first2,
891 BinaryPredicate pred) {
/// Check whether the ranges [first1, last1) and [first2, last2) are equal under `pred`.
///
/// The two lengths are compared first with `std::distance`. If they differ, the
/// result is `false` and no element comparison is performed, because `&&`
/// short-circuits. Otherwise the call is forwarded to the three-iterator
/// `equal(policy, first1, last1, first2, pred)` overload to compare the elements.
///
/// @param policy  Execution policy, passed through unchanged to the delegated call.
/// @param first1  Beginning of the first range.
/// @param last1   End of the first range.
/// @param first2  Beginning of the second range.
/// @param last2   End of the second range.
/// @param pred    Binary predicate used by the delegated overload to compare elements.
/// @return        `true` iff both ranges have the same length and the delegated
///                comparison reports them equal.
///
/// NOTE(review): `std::distance` is linear for non-random-access iterators, so the
/// length check may itself walk both ranges sequentially for such iterator types.
926 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIterator2,
927 typename BinaryPredicate>
928 inline bool equal(
const ExecutionPolicy& policy,
929 ForwardIterator1 first1,
930 ForwardIterator1 last1,
931 ForwardIterator2 first2,
932 ForwardIterator2 last2,
933 BinaryPredicate pred) {
// Cheap length check first; the elementwise comparison only runs when lengths match.
934 return std::distance(first1, last1) == std::distance(first2, last2) &&
935 equal(policy, first1, last1, first2, pred);
/// Check whether [first1, last1) equals the range starting at `first2`, using `operator==`.
///
/// This is a convenience overload. It forwards to
/// `equal(policy, first1, last1, first2, pred)` with `pred = std::equal_to<>{}`,
/// a transparent comparator that applies `==` to each pair of elements.
///
/// @param policy  Execution policy, passed through unchanged.
/// @param first1  Beginning of the first range.
/// @param last1   End of the first range.
/// @param first2  Beginning of the second range; no end iterator is taken.
/// @return        Result of the delegated predicate overload.
///
/// NOTE(review): presumably, as with `std::equal`, the second range must contain at
/// least `std::distance(first1, last1)` elements — confirm against the predicate overload.
952 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIterator2>
953 inline bool equal(
const ExecutionPolicy& policy,
954 ForwardIterator1 first1,
955 ForwardIterator1 last1,
956 ForwardIterator2 first2) {
957 return equal(policy, first1, last1, first2, std::equal_to<>{});
/// Check whether [first1, last1) and [first2, last2) are equal, using `operator==`.
///
/// This is a convenience overload. It forwards to the four-iterator predicate
/// overload with `std::equal_to<>{}`, so ranges of different lengths compare unequal.
///
/// @param policy  Execution policy, passed through unchanged.
/// @param first1  Beginning of the first range.
/// @param last1   End of the first range.
/// @param first2  Beginning of the second range.
/// @param last2   End of the second range.
/// @return        Result of `equal(policy, first1, last1, first2, last2, std::equal_to<>{})`.
975 template <
typename ExecutionPolicy,
typename ForwardIterator1,
typename ForwardIterator2>
976 inline bool equal(
const ExecutionPolicy& policy,
977 ForwardIterator1 first1,
978 ForwardIterator1 last1,
979 ForwardIterator2 first2,
980 ForwardIterator2 last2) {
981 return equal(policy, first1, last1, first2, last2, std::equal_to<>{});
984 ITYR_TEST_CASE(
"[ityr::pattern::parallel_reduce] equal") {
989 ori::global_ptr<long> p1 = ori::malloc_coll<long>(n);
990 ori::global_ptr<long> p2 = ori::malloc_coll<long>(n);
994 execution::parallel_policy(100),
997 count_iterator<long>(0),
998 [=](
long& v,
long i) { v = i * 2; });
1000 copy(execution::parallel_policy(100), p1, p1 + n, p2);
1003 p1, p1 + n, p2) ==
true);
1006 p1, p1 + n, p2, p2 + n) ==
true);
1009 p1, p1 + n, p2, p2 + n - 1) ==
false);
1013 ITYR_CHECK(
equal(execution::parallel_policy(100), p1, p1 + n, p2) ==
false);
1053 template <
typename ExecutionPolicy,
typename ForwardIterator,
typename Compare>
1055 ForwardIterator first,
1056 ForwardIterator last,
1059 return std::distance(first, last) <= 1 ||
1088 template <
typename ExecutionPolicy,
typename ForwardIterator>
1090 ForwardIterator first,
1091 ForwardIterator last) {
1092 return is_sorted(policy, first, last, std::less<>{});
1095 ITYR_TEST_CASE(
"[ityr::pattern::parallel_reduce] is_sorted") {
1100 ori::global_ptr<long> p = ori::malloc_coll<long>(n);
1104 execution::parallel_policy(100),
1107 count_iterator<long>(0),
1108 [=](
long& v,
long i) { v = i / 3; });
1114 p, p + n, std::greater<>{}) ==
false);
1119 std::greater<>{}) ==
true);
1124 p, p + n) ==
false);
#define ITYR_SUBCASE(name)
Definition: util.hpp:41
#define ITYR_CHECK(cond)
Definition: util.hpp:48
constexpr read_write_t read_write
Read+Write checkout mode.
Definition: checkout_span.hpp:39
constexpr read_t read
Read-only checkout mode.
Definition: checkout_span.hpp:19
constexpr no_access_t no_access
Checkout mode to disable automatic checkout.
Definition: checkout_span.hpp:48
constexpr write_t write
Write-only checkout mode.
Definition: checkout_span.hpp:29
constexpr parallel_policy par
Default parallel execution policy for iterator-based loop functions.
Definition: execution.hpp:89
void fini()
Definition: ito.hpp:45
auto root_exec(Fn &&fn, Args &&... args)
Definition: ito.hpp:50
void task_group_begin(task_group_data *tgdata)
Definition: ito.hpp:105
void init(MPI_Comm comm=MPI_COMM_WORLD)
Definition: ito.hpp:41
void poll(PreSuspendCallback &&pre_suspend_cb, PostSuspendCallback &&post_suspend_cb)
Definition: ito.hpp:96
constexpr with_callback_t with_callback
Definition: thread.hpp:11
void task_group_end(PreSuspendCallback &&pre_suspend_cb, PostSuspendCallback &&post_suspend_cb)
Definition: ito.hpp:112
scheduler::task_group_data task_group_data
Definition: ito.hpp:103
void fini()
Definition: ori.hpp:49
void get(global_ptr< ConstT > from_ptr, T *to_ptr, std::size_t count)
Definition: ori.hpp:80
void init(MPI_Comm comm=MPI_COMM_WORLD)
Definition: ori.hpp:45
auto release_lazy()
Definition: ori.hpp:200
void free_coll(global_ptr< T > ptr)
Definition: ori.hpp:70
core::instance::instance_type::release_handler release_handler
Definition: ori.hpp:204
void poll()
Definition: ori.hpp:224
void release()
Definition: ori.hpp:196
void acquire()
Definition: ori.hpp:206
Definition: allocator.hpp:16
global_reverse_iterator< global_iterator< T, Mode > > make_reverse_iterator(ori::global_ptr< T > gptr, Mode mode)
Make a reverse iterator for global memory.
Definition: global_iterator.hpp:333
ForwardIteratorD transform_inclusive_scan(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d, Reducer reducer, UnaryTransformOp unary_transform_op, typename Reducer::accumulator_type &&init)
Calculate a prefix sum (inclusive scan) while transforming each element.
Definition: parallel_reduce.hpp:574
ForwardIteratorD copy(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d)
Copy a range to another.
Definition: parallel_loop.hpp:856
void for_each(const ExecutionPolicy &policy, ForwardIterator first, ForwardIterator last, Op op)
Apply an operator to each element in a range.
Definition: parallel_loop.hpp:136
void fill(const ExecutionPolicy &policy, ForwardIterator first, ForwardIterator last, const T &value)
Fill a range with a given value.
Definition: parallel_loop.hpp:771
Reducer::accumulator_type transform_reduce(const ExecutionPolicy &policy, ForwardIterator first, ForwardIterator last, Reducer reducer, UnaryTransformOp unary_transform_op)
Calculate reduction while transforming each element.
Definition: parallel_reduce.hpp:167
void init(MPI_Comm comm=MPI_COMM_WORLD)
Initialize Itoyori (collective).
Definition: ityr.hpp:69
Reducer::accumulator_type reduce(const ExecutionPolicy &policy, ForwardIterator first, ForwardIterator last, Reducer reducer)
Calculate reduction.
Definition: parallel_reduce.hpp:340
global_iterator< T, Mode > make_global_iterator(ori::global_ptr< T > gptr, Mode)
Make a global iterator to enable/disable automatic checkout.
Definition: global_iterator.hpp:158
bool is_sorted(const ExecutionPolicy &policy, ForwardIterator first, ForwardIterator last, Compare comp)
Check if a range is sorted.
Definition: parallel_reduce.hpp:1054
ForwardIteratorD inclusive_scan(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d, Reducer reducer, typename Reducer::accumulator_type &&init)
Calculate a prefix sum (inclusive scan).
Definition: parallel_reduce.hpp:724
bool equal(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIterator2 first2, BinaryPredicate pred)
Check if two ranges have equal values.
Definition: parallel_reduce.hpp:887
ForwardIteratorD move(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d)
Move a range to another.
Definition: parallel_loop.hpp:934
Definition: reducer.hpp:15