Itoyori v0.0.1
wsqueue.hpp
#pragma once

#include <atomic>
#include <optional>
#include <memory>
#include <type_traits>
#include <algorithm>
#include <vector>

#include "ityr/common/util.hpp"
#include "ityr/common/mpi_util.hpp"
#include "ityr/common/mpi_rma.hpp"
#include "ityr/common/topology.hpp"
#include "ityr/common/global_lock.hpp"
#include "ityr/common/profiler.hpp"
#include "ityr/ito/prof_events.hpp"

namespace ityr::ito {

class wsqueue_full_exception : public std::exception {
public:
  const char* what() const noexcept override { return "Work stealing queue is full."; }
};

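// Work-stealing queue(s) owned by each process and exposed through MPI RMA windows.
// Each of the `n_queues` segments holds `n_entries` entries with a {top, base} state:
// the owner pushes and pops at `top` (LIFO), while thieves advance `base` remotely with a
// fetch-and-add (FIFO), in the spirit of a Chase-Lev work-stealing deque. When `EnablePass`
// is true, `top` and `base` start at the middle of the segment so that remote "pass"
// operations have room to insert entries just below `base`.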
template <typename Entry, bool EnablePass = true>
class wsqueue {
public:
  wsqueue(int n_entries, int n_queues = 1)
    : n_entries_(n_entries),
      n_queues_(n_queues),
      initial_pos_(EnablePass ? n_entries / 2 : 0),
      queue_state_win_(common::topology::mpicomm(), n_queues_ * 2, initial_pos_),
      entries_win_(common::topology::mpicomm(), n_entries_ * n_queues_),
      queue_lock_(n_queues_),
      local_empty_(n_queues_, false) {}

  void push(const Entry& entry, int idx = 0) {

    ITYR_CHECK(idx < n_queues_);

    queue_state& qs = local_queue_state(idx);
    auto entries = local_entries(idx);

    int t = qs.top.load(std::memory_order_relaxed);

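    // If `top` has reached the end of the segment, shift the live region [base, top) downward
    // by roughly half of the space below `base` before storing the new entry.
    // Illustrative example: with n_entries_ = 1000 and base = 600, offset = -(600 + 1) / 2 = -300,
    // so the live region moves to [300, 700) and 300 more slots open up above `top`.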
    if (t == n_entries_) {
      queue_lock_.priolock(common::topology::my_rank(), idx);

      int b = qs.base.load(std::memory_order_relaxed);
      int offset = -(b + 1) / 2;
      move_entries(offset, idx);
      t += offset;

      queue_lock_.unlock(common::topology::my_rank(), idx);
    }

    entries[t] = entry;

    qs.top.store(t + 1, std::memory_order_release);

    if constexpr (!EnablePass) {
      local_empty_[idx] = false;
    }
  }

  template <bool EnsureEmpty = true>
  std::optional<Entry> pop(int idx = 0) {

    ITYR_CHECK(idx < n_queues_);

    queue_state& qs = local_queue_state(idx);
    if constexpr (EnablePass) {
      // Move entries so that the base does not become too close to zero;
      // otherwise, remote pass operations may fail.
      // Check before the queue empty check.
      int b = qs.base.load(std::memory_order_relaxed);
      if (b < n_entries_ / 10) {
        int t = qs.top.load(std::memory_order_relaxed);
        if (n_entries_ - t > n_entries_ / 10) {
          queue_lock_.priolock(common::topology::my_rank(), idx);

          int t = qs.top.load(std::memory_order_relaxed);
          int offset = (n_entries_ - t + 1) / 2;
          move_entries(offset, idx);

          queue_lock_.unlock(common::topology::my_rank(), idx);
        }
      }
    }

    if constexpr (!EnablePass) {
      if (local_empty_[idx]) {
        return std::nullopt;
      }
    }

    // We often need to ensure the queue is truly empty because the thief might be concurrently
    // operating on the queue (the stack copy might be ongoing) and can abort stealing after
    // updating the `base` variable.
    // TODO: any better way to handle this ordering?
    if constexpr (!EnsureEmpty) {
      if (qs.empty()) {
        return std::nullopt;
      }
    }

    std::optional<Entry> ret;
    auto entries = local_entries(idx);

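    // Speculatively claim the top entry: publish the decremented `top`, then (after a full
    // fence) read `base` to detect a racing thief. If the claim may conflict with a thief,
    // retract it and retry under the per-queue lock (slow path below).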
    int t = qs.top.load(std::memory_order_relaxed) - 1;
    qs.top.store(t, std::memory_order_relaxed);

    std::atomic_thread_fence(std::memory_order_seq_cst);

    int b = qs.base.load(std::memory_order_relaxed);

    if (b <= t) {
      ret = entries[t];
    } else {
      qs.top.store(t + 1, std::memory_order_relaxed);

      queue_lock_.priolock(common::topology::my_rank(), idx);

      qs.top.store(t, std::memory_order_relaxed);
      int b = qs.base.load(std::memory_order_relaxed);

      if (b < t) {
        ret = entries[t];
      } else if (b == t) {
        ret = entries[t];

        qs.top.store(initial_pos_, std::memory_order_relaxed);
        qs.base.store(initial_pos_, std::memory_order_relaxed);

        // Once we confirm that this queue is empty by taking the lock, later pop operations will
        // always fail if the "pass" operation is not allowed for the queue.
        if constexpr (!EnablePass) {
          local_empty_[idx] = true;
        }
      } else {
        ret = std::nullopt;

        qs.top.store(initial_pos_, std::memory_order_relaxed);
        qs.base.store(initial_pos_, std::memory_order_relaxed);

        if constexpr (!EnablePass) {
          local_empty_[idx] = true;
        }
      }

      queue_lock_.unlock(common::topology::my_rank(), idx);
    }

    return ret;
  }

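  // Thief side: `steal_nolock` reserves one entry by atomically incrementing the victim's
  // `base` with an MPI fetch-and-add, then reads `top` to validate the reservation; if the
  // queue turned out to be empty, the increment is rolled back (as `abort_steal` does). The
  // caller must already hold the victim's per-queue lock.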
  std::optional<Entry> steal_nolock(common::topology::rank_t target_rank, int idx = 0) {

    ITYR_CHECK(idx < n_queues_);

    ITYR_CHECK(queue_lock_.is_locked(target_rank, idx));

    std::optional<Entry> ret;

    int b = common::mpi_atomic_faa_value<int>(1, target_rank, queue_state_base_disp(idx), queue_state_win_.win());
    int t = common::mpi_get_value<int>(target_rank, queue_state_top_disp(idx), queue_state_win_.win());

    if (b < t) {
      ret = common::mpi_get_value<Entry>(target_rank, entries_disp(b, idx), entries_win_.win());
    } else {
      common::mpi_atomic_faa_value<int>(-1, target_rank, queue_state_base_disp(idx), queue_state_win_.win());
      ret = std::nullopt;
    }

    return ret;
  }

  std::optional<Entry> steal(common::topology::rank_t target_rank, int idx = 0) {
    ITYR_CHECK(idx < n_queues_);

    queue_lock_.lock(target_rank, idx);
    auto ret = steal_nolock(target_rank, idx);
    queue_lock_.unlock(target_rank, idx);
    return ret;
  }

  void abort_steal(common::topology::rank_t target_rank, int idx = 0) {

    ITYR_CHECK(idx < n_queues_);
    ITYR_CHECK(queue_lock_.is_locked(target_rank, idx));

    common::mpi_atomic_faa_value<int>(-1, target_rank, queue_state_base_disp(idx), queue_state_win_.win());
  }

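  // `trypass` inserts an entry at the bottom of a remote queue: under the victim's lock, if
  // `base > 0`, it writes the entry at `base - 1` and publishes the decremented `base`; it
  // fails when `base == 0`, which is why `pop` keeps `base` away from zero when `EnablePass`
  // is enabled. `pass` simply retries until `trypass` succeeds.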
  bool trypass(const Entry& entry, common::topology::rank_t target_rank, int idx = 0) {
    if constexpr (!EnablePass) {
      common::die("Pass operation is not allowed");
    }

    ITYR_CHECK(idx < n_queues_);

    queue_lock_.lock(target_rank, idx);

    int b = common::mpi_get_value<int>(target_rank, queue_state_base_disp(idx), queue_state_win_.win());

    if (b == 0) {
      queue_lock_.unlock(target_rank, idx);
      return false;
    }

    common::mpi_put_value<Entry>(entry, target_rank, entries_disp(b - 1, idx), entries_win_.win());

    common::mpi_put_value<int>(b - 1, target_rank, queue_state_base_disp(idx), queue_state_win_.win());

    queue_lock_.unlock(target_rank, idx);

    return true;
  }

  void pass(const Entry& entry, common::topology::rank_t target_rank, int idx = 0) {

    ITYR_CHECK(idx < n_queues_);
    while (!trypass(entry, target_rank, idx));
  }

  template <typename Fn, bool EnsureEmpty = true>
  void for_each_entry(Fn fn, int idx = 0) {
    ITYR_CHECK(idx < n_queues_);

    if constexpr (!EnablePass) {
      if (local_empty_[idx]) {
        return;
      }
    }

    queue_state& qs = local_queue_state(idx);
    if constexpr (!EnsureEmpty) {
      if (qs.empty()) {
        return;
      }
    }

    auto entries = local_entries(idx);

    queue_lock_.priolock(common::topology::my_rank(), idx);

    int t = qs.top.load(std::memory_order_relaxed);
    int b = qs.base.load(std::memory_order_relaxed);
    for (int i = b; i < t; i++) {
      fn(entries[i]);
    }

    if constexpr (!EnablePass) {
      if (t <= b) {
        local_empty_[idx] = true;
      }
    }

    queue_lock_.unlock(common::topology::my_rank(), idx);
  }

  int size(int idx = 0) const {
    ITYR_CHECK(idx < n_queues_);
    return local_queue_state(idx).size();
  }

  bool empty(common::topology::rank_t target_rank, int idx = 0) const {

    ITYR_CHECK(idx < n_queues_);

    auto remote_qs = common::mpi_get_value<queue_state>(target_rank, queue_state_disp(idx), queue_state_win_.win());
    return remote_qs.empty();
  }

  template <typename Fn>
  void for_each_nonempty_queue(common::topology::rank_t target_rank,
                               int idx_begin, int idx_end, bool reverse, Fn fn) {
    {
      common::mpi_get(&local_queue_state_buf(idx_begin), idx_end - idx_begin,
                      target_rank, queue_state_disp(idx_begin), queue_state_win_.win());
    }
    if (reverse) {
      for (int idx = idx_end - 1; idx >= idx_begin; idx--) {
        if (!local_queue_state_buf(idx).empty()) {
          // `fn` should return a boolean (whether to break the loop or not)
          if (fn(idx)) break;
        }
      }
    } else {
      for (int idx = idx_begin; idx < idx_end; idx++) {
        if (!local_queue_state_buf(idx).empty()) {
          if (fn(idx)) break;
        }
      }
    }
  }

  const common::global_lock& lock() const { return queue_lock_; }
  int n_queues() const { return n_queues_; }

private:
  struct queue_state {
    std::atomic<int> top;
    std::atomic<int> base;
    // Check if they are safe to be accessed by MPI RMA
    static_assert(sizeof(std::atomic<int>) == sizeof(int));

    queue_state(int initial_pos = 0) : top(initial_pos), base(initial_pos) {}

    // Copy constructors for std::atomic are deleted
    queue_state(const queue_state& qs)
      : top(qs.top.load(std::memory_order_relaxed)),
        base(qs.base.load(std::memory_order_relaxed)) {}
    queue_state& operator=(const queue_state& qs) {
      top.store(qs.top.load(std::memory_order_relaxed), std::memory_order_relaxed);
      base.store(qs.base.load(std::memory_order_relaxed), std::memory_order_relaxed);
      return *this;
    }

    int size() const {
      return std::max(0, top.load(std::memory_order_relaxed) -
                         base.load(std::memory_order_relaxed));
    }

    bool empty() const {
      return top.load(std::memory_order_relaxed) <=
             base.load(std::memory_order_relaxed);
    }
  };

  static_assert(std::is_standard_layout_v<queue_state>);
  // FIXME: queue_state is no longer trivially copyable.
  // Thus, strictly speaking, using MPI RMA for queue_state is illegal.
  // static_assert(std::is_trivially_copyable_v<queue_state>);

  /* static constexpr std::size_t queue_state_align = common::hardware_destructive_interference_size; */
  static constexpr std::size_t queue_state_align = sizeof(queue_state);

  struct alignas(queue_state_align) queue_state_wrapper {
    template <typename... Args>
    queue_state_wrapper(Args&&... args) : value(std::forward<Args>(args)...) {}
    queue_state value;
  };

  std::size_t queue_state_disp(int idx) const {
    return idx * sizeof(queue_state_wrapper) + offsetof(queue_state_wrapper, value);
  }

  std::size_t queue_state_top_disp(int idx) const {
    return idx * sizeof(queue_state_wrapper) + offsetof(queue_state_wrapper, value) + offsetof(queue_state, top);
  }

  std::size_t queue_state_base_disp(int idx) const {
    return idx * sizeof(queue_state_wrapper) + offsetof(queue_state_wrapper, value) + offsetof(queue_state, base);
  }

  std::size_t entries_disp(int entry_num, int idx) const {
    return (entry_num + idx * n_entries_) * sizeof(Entry);
  }

  queue_state& local_queue_state(int idx) const {
    return queue_state_win_.local_buf()[idx].value;
  }

  queue_state& local_queue_state_buf(int idx) const {
    // Memory twice as large as the local queue states is allocated for the MPI window
    return queue_state_win_.local_buf()[n_queues_ + idx].value;
  }

  auto local_entries(int idx) const {
    return entries_win_.local_buf().subspan(idx * n_entries_, n_entries_);
  }

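  // Shift the live region [base, top) of a local segment by `offset` slots (negative offsets
  // move entries toward index 0). Throws wsqueue_full_exception when no shift can make room,
  // i.e., the segment is effectively full.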
  void move_entries(int offset, int idx) {
    ITYR_CHECK(queue_lock_.is_locked(common::topology::my_rank(), idx));

    queue_state& qs = local_queue_state(idx);
    auto entries = local_entries(idx);

    int t = qs.top.load(std::memory_order_relaxed);
    int b = qs.base.load(std::memory_order_relaxed);

    ITYR_CHECK(b <= t);

    int new_b = b + offset;
    int new_t = t + offset;

    if (offset == 0 || new_b < 0 || n_entries_ < new_t) {
      throw wsqueue_full_exception{};
    }

    std::move(&entries[b], &entries[t], &entries[new_b]);

    qs.top.store(new_t, std::memory_order_relaxed);
    qs.base.store(new_b, std::memory_order_relaxed);
  }

  int n_entries_;
  int n_queues_;
  int initial_pos_;
  common::mpi_win_manager<queue_state_wrapper> queue_state_win_;
  common::mpi_win_manager<Entry> entries_win_;
  common::global_lock queue_lock_;
  std::vector<bool> local_empty_;
};

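// Illustrative usage sketch (hypothetical values; assumes the runtime is initialized as in the
// test cases below):
//
//   wsqueue<int> wsq(1024);
//   if (common::topology::my_rank() == 0) {
//     wsq.push(42);
//     auto v = wsq.pop();     // owner side: LIFO
//   } else {
//     auto s = wsq.steal(0);  // thief side: FIFO from the base
//   }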
ITYR_TEST_CASE("[ityr::ito::wsqueue] single queue") {
  int n_entries = 1000;
  using entry_t = int;

  common::runtime_options common_opts;
  common::singleton_initializer<common::topology::instance> topo;
  wsqueue<entry_t> wsq(n_entries);

  auto my_rank = common::topology::my_rank();
  auto n_ranks = common::topology::n_ranks();

  ITYR_SUBCASE("local push and pop") {
    int n_trial = 3;
    for (int t = 0; t < n_trial; t++) {
      for (int i = 0; i < n_entries; i++) {
        wsq.push(i);
      }
      for (int i = 0; i < n_entries; i++) {
        auto result = wsq.pop();
        ITYR_CHECK(result.has_value());
        ITYR_CHECK(*result == n_entries - i - 1); // LIFO order
      }
    }
  }

  ITYR_SUBCASE("should throw exception when full") {
    for (int i = 0; i < n_entries; i++) {
      wsq.push(i);
    }
    ITYR_CHECK_THROWS_AS(wsq.push(n_entries), wsqueue_full_exception);
  }

  ITYR_SUBCASE("steal") {
    if (n_ranks == 1) return;

    for (common::topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
      ITYR_CHECK(wsq.empty(target_rank));

      common::mpi_barrier(common::topology::mpicomm());

      entry_t sum_expected = 0;
      if (target_rank == my_rank) {
        for (int i = 0; i < n_entries; i++) {
          wsq.push(i);
          sum_expected += i;
        }
      }

      common::mpi_barrier(common::topology::mpicomm());

      entry_t local_sum = 0;

      ITYR_SUBCASE("remote steal by only one process") {
        if ((target_rank + 1) % n_ranks == my_rank) {
          for (int i = 0; i < n_entries; i++) {
            auto result = wsq.steal(target_rank);
            ITYR_CHECK(result.has_value());
            ITYR_CHECK(*result == i); // FIFO order
            local_sum += *result;
          }
        }
      }

      ITYR_SUBCASE("remote steal concurrently") {
        if (target_rank != my_rank) {
          while (!wsq.empty(target_rank)) {
            auto result = wsq.steal(target_rank);
            if (result.has_value()) {
              local_sum += *result;
            }
          }
        }
      }

      ITYR_SUBCASE("local pop and remote steal concurrently") {
        if (target_rank == my_rank) {
          while (!wsq.empty(my_rank)) {
            auto result = wsq.pop();
            if (result.has_value()) {
              local_sum += *result;
            }
          }
        } else {
          while (!wsq.empty(target_rank)) {
            auto result = wsq.steal(target_rank);
            if (result.has_value()) {
              local_sum += *result;
            }
          }
        }
      }

      common::mpi_barrier(common::topology::mpicomm());
      entry_t sum_all = common::mpi_reduce_value(local_sum, target_rank, common::topology::mpicomm());

      ITYR_CHECK(wsq.empty(target_rank));

      if (target_rank == my_rank) {
        ITYR_CHECK(sum_all == sum_expected);
      }

      common::mpi_barrier(common::topology::mpicomm());
    }
  }

  ITYR_SUBCASE("all operations concurrently") {
    int n_repeats = 5;

    for (common::topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
      ITYR_CHECK(wsq.empty(target_rank));

      common::mpi_barrier(common::topology::mpicomm());

      if (target_rank == my_rank) {
        entry_t sum_expected = 0;
        entry_t local_sum = 0;

        // repeat push and pop
        for (int r = 0; r < n_repeats; r++) {
          for (int i = 0; i < n_entries; i++) {
            wsq.push(i);
            sum_expected += i;
          }
          while (!wsq.empty(my_rank)) {
            auto result = wsq.pop();
            if (result.has_value()) {
              local_sum += *result;
            }
          }
        }

        auto req = common::mpi_ibarrier(common::topology::mpicomm());
        common::mpi_wait(req);

        entry_t sum_all = common::mpi_reduce_value(local_sum, target_rank, common::topology::mpicomm());

        ITYR_CHECK(sum_all == sum_expected);

      } else {
        entry_t local_sum = 0;

        auto req = common::mpi_ibarrier(common::topology::mpicomm());
        while (!common::mpi_test(req)) {
          auto result = wsq.steal(target_rank);
          if (result.has_value()) {
            local_sum += *result;
          }
        }

        ITYR_CHECK(wsq.empty(target_rank));

        common::mpi_reduce_value(local_sum, target_rank, common::topology::mpicomm());
      }

      common::mpi_barrier(common::topology::mpicomm());
    }
  }

  ITYR_SUBCASE("resize queue") {
    if (n_ranks == 1) return;

    for (common::topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
      ITYR_CHECK(wsq.empty(target_rank));

      common::mpi_barrier(common::topology::mpicomm());

      if (target_rank == my_rank) {
        for (int i = 0; i < n_entries; i++) {
          wsq.push(i);
        }
      }

      common::mpi_barrier(common::topology::mpicomm());

      // only one process steals
      if ((target_rank + 1) % n_ranks == my_rank) {
        // steal half of the queue
        for (int i = 0; i < n_entries / 2; i++) {
          auto result = wsq.steal(target_rank);
          ITYR_CHECK(result.has_value());
          ITYR_CHECK(*result == i);
        }
      }

      common::mpi_barrier(common::topology::mpicomm());

      if (target_rank == my_rank) {
        // push half of the queue
        for (int i = 0; i < n_entries / 2; i++) {
          wsq.push(i);
        }
        // pop all
        for (int i = 0; i < n_entries; i++) {
          auto result = wsq.pop();
          ITYR_CHECK(result.has_value());
        }
      }

      common::mpi_barrier(common::topology::mpicomm());

      ITYR_CHECK(wsq.empty(target_rank));
    }
  }

  ITYR_SUBCASE("pass") {
    for (common::topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
      ITYR_CHECK(wsq.empty(target_rank));

      common::mpi_barrier(common::topology::mpicomm());

      if (target_rank == my_rank) {
        entry_t sum = 0;

        auto req = common::mpi_ibarrier(common::topology::mpicomm());
        while (!common::mpi_test(req) || !wsq.empty(my_rank)) {
          auto result = wsq.steal(target_rank);
          if (result.has_value()) {
            sum += *result;
          }
        }

        ITYR_CHECK(sum == (n_entries / n_ranks) * (n_entries / n_ranks - 1) / 2 * (n_ranks - 1));

      } else {
        for (int i = 0; i < n_entries / n_ranks; i++) {
          wsq.pass(i, target_rank);
        }

        auto req = common::mpi_ibarrier(common::topology::mpicomm());
        common::mpi_wait(req);
      }

      common::mpi_barrier(common::topology::mpicomm());
    }
  }
}

ITYR_TEST_CASE("[ityr::ito::wsqueue] multiple queues") {
  int n_entries = 1000;
  int n_queues = 3;
  using entry_t = int;

  common::runtime_options common_opts;
  common::singleton_initializer<common::topology::instance> topo;
  wsqueue<entry_t> wsq(n_entries, n_queues);

  auto my_rank = common::topology::my_rank();
  auto n_ranks = common::topology::n_ranks();

  int n_repeats = 5;

  for (common::topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
    for (int q = 0; q < n_queues; q++) {
      ITYR_CHECK(wsq.empty(target_rank, q));
    }

    common::mpi_barrier(common::topology::mpicomm());

    if (target_rank == my_rank) {
      entry_t sum_expected = 0;
      entry_t local_sum = 0;

      // repeat push and pop
      for (int r = 0; r < n_repeats; r++) {
        for (int i = 0; i < n_entries; i++) {
          for (int q = 0; q < n_queues; q++) {
            wsq.push(i, q);
            sum_expected += i;
          }
        }
        for (int q = 0; q < n_queues; q++) {
          while (!wsq.empty(my_rank, q)) {
            auto result = wsq.pop(q);
            if (result.has_value()) {
              local_sum += *result;
            }
          }
        }
      }

      auto req = common::mpi_ibarrier(common::topology::mpicomm());
      common::mpi_wait(req);

      entry_t sum_all = common::mpi_reduce_value(local_sum, target_rank, common::topology::mpicomm());

      ITYR_CHECK(sum_all == sum_expected);

    } else {
      entry_t local_sum = 0;

      auto req = common::mpi_ibarrier(common::topology::mpicomm());
      while (!common::mpi_test(req)) {
        for (int q = 0; q < n_queues; q++) {
          auto result = wsq.steal(target_rank, q);
          if (result.has_value()) {
            local_sum += *result;
          }
        }
      }

      for (int q = 0; q < n_queues; q++) {
        ITYR_CHECK(wsq.empty(target_rank, q));
      }

      common::mpi_reduce_value(local_sum, target_rank, common::topology::mpicomm());
    }

    common::mpi_barrier(common::topology::mpicomm());
  }
}

}