Itoyori  v0.0.1
ito.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <functional>
4 
5 #include "ityr/common/util.hpp"
11 #include "ityr/ito/util.hpp"
12 #include "ityr/ito/options.hpp"
13 #include "ityr/ito/thread.hpp"
14 #include "ityr/ito/worker.hpp"
15 #include "ityr/ito/prof_events.hpp"
16 
17 namespace ityr::ito {
18 
// Aggregate that holds the state objects of the ito layer as data members.
// NOTE(review): the embedded listing numbers skip 26, 28-30 and 35, so some member
// declarations (including mi_ and topo_, which the constructor initializes) are not
// visible in this listing -- confirm against the full header.
19 class ito {
20 public:
// Constructs the object from an MPI communicator: `comm` is passed to the initializers
// of mi_ and topo_. The members visible below receive no explicit initializer here.
21  ito(MPI_Comm comm)
22  : mi_(comm),
23  topo_(comm) {}
24 
25 private:
// Options and profiler events declared in the `common` namespace.
27  common::runtime_options common_opts_;
31  common::prof_events common_prof_events_;
32 
// Counterparts declared in this namespace (ityr::ito).
33  runtime_options ito_opts_;
34  aslr_checker aslr_checker_;
36  prof_events ito_prof_events_;
37 };
38 
40 
41 inline void init(MPI_Comm comm = MPI_COMM_WORLD) {
42  instance::init(comm);
43 }
44 
45 inline void fini() {
47 }
48 
49 template <typename Fn, typename... Args>
50 inline auto root_exec(Fn&& fn, Args&&... args) {
51  return root_exec(with_callback, nullptr, std::forward<Fn>(fn), std::forward<Args>(args)...);
52 }
53 
54 template <typename SchedLoopCallback, typename Fn, typename... Args>
55 inline auto root_exec(with_callback_t, SchedLoopCallback cb, Fn&& fn, Args&&... args) {
56  auto& w = worker::instance::get();
57  ITYR_CHECK(w.is_spmd());
58  return w.root_exec(cb, std::forward<Fn>(fn), std::forward<Args>(args)...);
59 }
60 
61 inline bool is_spmd() {
62  auto& w = worker::instance::get();
63  return w.is_spmd();
64 }
65 
66 inline bool is_root() {
67  auto& w = worker::instance::get();
68  return w.sched().is_executing_root();
69 }
70 
// Passes `fn` and `args` (taken by const reference) to the current worker's coll_exec()
// and returns its result.
// Must not be called while in SPMD mode; this is asserted with ITYR_CHECK.
// NOTE(review): the name suggests `fn` is run collectively -- confirm against
// worker::coll_exec().
// NOTE(review): listing line 74 is not visible in this view, so a statement may be
// missing between the check and the worker lookup -- confirm against the full header.
71 template <typename Fn, typename... Args>
72 inline auto coll_exec(const Fn& fn, const Args&... args) {
73  ITYR_CHECK(!is_spmd());
75  auto& w = worker::instance::get();
76  return w.coll_exec(fn, args...);
77 }
78 
// Asks the current worker's scheduler to migrate to `target_rank`, forwarding the two
// callbacks unchanged.
// Must not be called while in SPMD mode; this is asserted with ITYR_CHECK.
// NOTE(review): the callback names suggest they run before and after the calling thread
// is suspended -- confirm against the scheduler's migrate_to().
// NOTE(review): listing line 84 is not visible in this view, so a statement may be
// missing after the check -- confirm against the full header.
79 template <typename PreSuspendCallback, typename PostSuspendCallback>
80 inline void migrate_to(common::topology::rank_t target_rank,
81  PreSuspendCallback&& pre_suspend_cb,
82  PostSuspendCallback&& post_suspend_cb) {
83  ITYR_CHECK(!is_spmd());
85  auto& w = worker::instance::get();
86  w.sched().migrate_to(target_rank,
87  std::forward<PreSuspendCallback>(pre_suspend_cb),
88  std::forward<PostSuspendCallback>(post_suspend_cb));
89 }
90 
91 inline void migrate_to(common::topology::rank_t target_rank) {
92  migrate_to(target_rank, nullptr, nullptr);
93 }
94 
95 template <typename PreSuspendCallback, typename PostSuspendCallback>
96 inline void poll(PreSuspendCallback&& pre_suspend_cb,
97  PostSuspendCallback&& post_suspend_cb) {
98  auto& w = worker::instance::get();
99  w.sched().poll(std::forward<PreSuspendCallback>(pre_suspend_cb),
100  std::forward<PostSuspendCallback>(post_suspend_cb));
101 }
102 
104 
105 inline void task_group_begin(task_group_data* tgdata) {
106  auto& w = worker::instance::get();
107  ITYR_CHECK(!w.is_spmd());
108  w.sched().task_group_begin(tgdata);
109 }
110 
111 template <typename PreSuspendCallback, typename PostSuspendCallback>
112 inline void task_group_end(PreSuspendCallback&& pre_suspend_cb,
113  PostSuspendCallback&& post_suspend_cb) {
114  auto& w = worker::instance::get();
115  ITYR_CHECK(!w.is_spmd());
116  w.sched().task_group_end(std::forward<PreSuspendCallback>(pre_suspend_cb),
117  std::forward<PostSuspendCallback>(post_suspend_cb));
118 }
119 
120 inline void dag_prof_begin() {
121  auto& w = worker::instance::get();
122  ITYR_CHECK(w.is_spmd());
123  w.sched().dag_prof_begin();
124 }
125 
126 inline void dag_prof_end() {
127  auto& w = worker::instance::get();
128  ITYR_CHECK(w.is_spmd());
129  w.sched().dag_prof_end();
130 }
131 
132 inline void dag_prof_print() {
133  auto& w = worker::instance::get();
134  ITYR_CHECK(w.is_spmd());
135  w.sched().dag_prof_print();
136 }
137 
138 ITYR_TEST_CASE("[ityr::ito] fib") {
139  init();
140 
141  std::function<int(int)> fib = [&](int n) -> int {
142  if (n <= 1) {
143  return 1;
144  } else {
145  thread<int> th([=]{ return fib(n - 1); });
146  int y = fib(n - 2);
147  int x = th.join();
148  return x + y;
149  }
150  };
151 
152  int r = root_exec(fib, 10);
153  ITYR_CHECK(r == 89);
154 
155  fini();
156 }
157 
// Test case: checks the is_spmd()/is_root() state before, inside and after root_exec().
// NOTE(review): listing lines 168, 180, 182, 184 and 186 are not visible in this view
// (the n == 1 branch and the coll_exec body appear empty only because of that) --
// confirm against the full header before changing this test.
158 ITYR_TEST_CASE("[ityr::ito] load balancing") {
159  init();
160 
// Before root_exec(): in SPMD mode and not executing the root thread.
161  ITYR_CHECK(is_spmd());
162  ITYR_CHECK(!is_root());
163 
// Recursive binary split: forks a thread for n / 2, handles n - n / 2 inline, then joins.
// Each forked thread asserts that it is not the root thread.
164  std::function<void(int)> lb = [&](int n) {
165  if (n == 0) {
166  return;
167  } else if (n == 1) {
169  } else {
170  thread<void> th([=]{ ITYR_CHECK(!is_root()); return lb(n / 2); });
171  lb(n - n / 2);
172  th.join();
173  }
174  };
175 
// Inside root_exec(): no longer in SPMD mode, and executing the root thread.
176  root_exec([&] {
177  ITYR_CHECK(!is_spmd());
178  ITYR_CHECK(is_root());
179 
181 
183  auto ret = coll_exec([=] {
185  });
187  });
188 
// After root_exec() returns, the SPMD state is restored.
189  ITYR_CHECK(is_spmd());
190  ITYR_CHECK(!is_root());
191 
192  fini();
193 }
194 
195 ITYR_TEST_CASE("[ityr::ito] move semantics") {
196  init();
197 
198  common::move_only_t mo1(2);
199  root_exec([](common::move_only_t mo1) {
200  common::move_only_t mo2(3 + mo1.value());
201 
202  thread<common::move_only_t> th([](common::move_only_t mo2) {
203  common::move_only_t mo3(4 + mo2.value());
204  return mo3;
205  }, std::move(mo2));
206 
207  common::move_only_t ret = th.join();
208 
209  ITYR_CHECK(ret.value() == 9);
210  }, std::move(mo1));
211 
212  fini();
213 }
214 
// Test case: nests root_exec() calls three levels deep and checks is_root()/is_spmd()
// at each level.
// NOTE(review): listing lines 222, 229, 236, 238, 242, 246 and 249 are not visible in
// this view, so the braces shown here do not balance. Each ITYR_CHECK(is_spmd()) below
// presumably sits inside a coll_exec() lambda opened on a missing line -- confirm
// against the full header before changing this test.
215 ITYR_TEST_CASE("[ityr::ito] nested root/coll_exec()") {
216  init();
217 
// Level 1: inside root_exec() the root thread runs outside SPMD mode.
218  auto ret = root_exec([] {
219  ITYR_CHECK(is_root());
220  ITYR_CHECK(!is_spmd());
221 
223  ITYR_CHECK(is_spmd());
224 
// Level 2: same checks for a root_exec() started from the nested SPMD region.
225  auto ret = root_exec([] {
226  ITYR_CHECK(is_root());
227  ITYR_CHECK(!is_spmd());
228 
230  ITYR_CHECK(is_spmd());
231 
// Level 3: innermost root_exec().
232  auto ret = root_exec([] {
233  ITYR_CHECK(is_root());
234  ITYR_CHECK(!is_spmd());
235 
237  ITYR_CHECK(is_spmd());
239  });
240  });
241 
243  });
244  });
245 
247  });
248  });
250 
251  fini();
252 }
253 
254 ITYR_TEST_CASE("[ityr::ito] migrate_to") {
255  init();
256 
257  root_exec([] {
258  for (int i = 0; i < 10; i++) {
259  auto target_rank = i % common::topology::n_ranks();
260  migrate_to(target_rank);
261  ITYR_CHECK(common::topology::my_rank() == target_rank);
262  }
263  });
264 
265  fini();
266 }
267 
268 }
Definition: mpi_util.hpp:430
Definition: prof_events.hpp:124
Definition: util.hpp:207
Definition: util.hpp:176
static auto & get()
Definition: util.hpp:180
static void init(Args &&... args)
Definition: util.hpp:190
static void fini()
Definition: util.hpp:194
Definition: util.hpp:10
Definition: ito.hpp:19
ito(MPI_Comm comm)
Definition: ito.hpp:21
Definition: prof_events.hpp:220
#define ITYR_CHECK(cond)
Definition: util.hpp:48
rank_t n_ranks()
Definition: topology.hpp:208
int rank_t
Definition: topology.hpp:12
MPI_Comm mpicomm()
Definition: topology.hpp:206
rank_t my_rank()
Definition: topology.hpp:207
va_list args
Definition: util.hpp:76
T mpi_reduce_value(const T &value, int root_rank, MPI_Comm comm, MPI_Op op=MPI_SUM)
Definition: mpi_util.hpp:170
void mpi_barrier(MPI_Comm comm)
Definition: mpi_util.hpp:42
Definition: aarch64.hpp:5
void fini()
Definition: ito.hpp:45
void migrate_to(common::topology::rank_t target_rank, PreSuspendCallback &&pre_suspend_cb, PostSuspendCallback &&post_suspend_cb)
Definition: ito.hpp:80
auto root_exec(Fn &&fn, Args &&... args)
Definition: ito.hpp:50
void task_group_begin(task_group_data *tgdata)
Definition: ito.hpp:105
void dag_prof_print()
Definition: ito.hpp:132
auto coll_exec(const Fn &fn, const Args &... args)
Definition: ito.hpp:72
void init(MPI_Comm comm=MPI_COMM_WORLD)
Definition: ito.hpp:41
bool is_root()
Definition: ito.hpp:66
void poll(PreSuspendCallback &&pre_suspend_cb, PostSuspendCallback &&post_suspend_cb)
Definition: ito.hpp:96
bool is_spmd()
Definition: ito.hpp:61
void dag_prof_begin()
Definition: ito.hpp:120
constexpr with_callback_t with_callback
Definition: thread.hpp:11
void task_group_end(PreSuspendCallback &&pre_suspend_cb, PostSuspendCallback &&post_suspend_cb)
Definition: ito.hpp:112
void dag_prof_end()
Definition: ito.hpp:126
scheduler::task_group_data task_group_data
Definition: ito.hpp:103
rank_t my_rank()
Return the rank of the process running the current thread.
Definition: ityr.hpp:99
ForwardIteratorD move(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d)
Move a range to another.
Definition: parallel_loop.hpp:934
Definition: options.hpp:153
Definition: options.hpp:80
Definition: thread.hpp:10