Itoyori  v0.0.1
util.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <random>
4 #include <atomic>
5 
6 #include "ityr/common/util.hpp"
11 #include "ityr/ito/util.hpp"
12 #include "ityr/ito/options.hpp"
13 #include "ityr/ito/prof_events.hpp"
14 
15 namespace ityr::ito {
16 
17 /*
18  * DAG profiler
19  */
20 
22 public:
23  static constexpr bool enabled = false;
24  void start() {}
25  void stop() {}
26  bool is_stopped() const { return true; }
27  void clear() {}
32  void print() const {}
33 };
34 
36 public:
37  static constexpr bool enabled = true;
38 
39  void start() {
41  t_start_ = common::wallclock::gettime_ns();
42  }
43 
44  void stop() {
46  auto t_stop = common::wallclock::gettime_ns();
47  work_ += t_stop - t_start_;
48  span_ += t_stop - t_start_;
49  t_start_ = 0;
50  }
51 
52  bool is_stopped() const { return t_start_ == 0; }
53 
54  void clear() {
55  t_start_ = 0;
56  work_ = 0;
57  span_ = 0;
58  n_threads_ = 0;
59  n_strands_ = 0;
60  }
61 
64  ITYR_CHECK(dp.is_stopped());
65 
66  work_ += dp.work_;
67  span_ += dp.span_;
68  n_threads_ += dp.n_threads_;
69  n_strands_ += dp.n_strands_;
70  }
71 
74  ITYR_CHECK(dp.is_stopped());
75 
76  work_ += dp.work_;
77  span_ = std::max(span_, dp.span_);
78  n_threads_ += dp.n_threads_;
79  n_strands_ += dp.n_strands_;
80  }
81 
82  void increment_thread_count() { n_threads_++; }
83  void increment_strand_count() { n_strands_++; }
84 
85  void print() const {
86  printf("work: %ld ns span: %ld ns parallelism: %f\n"
87  "n_threads: %ld (ave: %ld ns) n_strands: %ld (ave: %ld ns)\n\n",
88  work_, span_, (span_ == 0) ? 0 : static_cast<double>(work_) / span_,
89  n_threads_, (n_threads_ == 0) ? 0 : work_ / n_threads_,
90  n_strands_, (n_strands_ == 0) ? 0 : work_ / n_strands_);
91  fflush(stdout);
92  }
93 
94 private:
95  common::wallclock::wallclock_t t_start_ = 0;
98  uint64_t n_threads_ = 0;
99  uint64_t n_strands_ = 0;
100 };
101 
103 
104 /*
105  * Misc
106  */
107 
109 public:
110  virtual ~task_general() = default;
111  virtual void execute() = 0;
112 };
113 
114 template <typename Fn, typename... Args>
115 class callable_task : public task_general {
116 public:
117  template <typename Fn_, typename... Args_>
118  callable_task(Fn_&& fn, Args_&&... args)
119  : fn_(std::forward<Fn_>(fn)), arg_(std::forward<Args_>(args)...) {}
120  void execute() { std::apply(std::move(fn_), std::move(arg_)); }
121 private:
122  Fn fn_;
123  std::tuple<Args...> arg_;
124 };
125 
// Empty tag type standing in for "no return value": invoke_fn returns it when
// its T parameter is no_retval_t, and call_with_prof_events returns it when
// the callback yields void or is a null pointer.
126 struct no_retval_t {};
127 
130  static std::mt19937 engine(std::random_device{}());
131 
132  ITYR_CHECK(0 <= a);
133  ITYR_CHECK(a <= b);
135  std::uniform_int_distribution<common::topology::rank_t> dist(a, b);
136 
138  do {
139  rank = dist(engine);
140  } while (rank == common::topology::my_rank());
141 
142  ITYR_CHECK(a <= rank);
144  ITYR_CHECK(rank <= b);
145  return rank;
146 }
147 
148 template <typename T, typename Fn, typename ArgsTuple>
149 inline decltype(auto) invoke_fn(Fn&& fn, ArgsTuple&& args_tuple) {
150  if constexpr (!std::is_same_v<T, no_retval_t>) {
151  return std::apply(std::forward<Fn>(fn), std::forward<ArgsTuple>(args_tuple));
152  } else {
153  std::apply(std::forward<Fn>(fn), std::forward<ArgsTuple>(args_tuple));
154  return no_retval_t{};
155  }
156 }
157 
158 /*
159  * Call with profiler events
160  */
161 
162 template <typename Fn, typename... Args>
164  using type = std::invoke_result_t<Fn, Args...>;
165 };
166 
// Specialization for a null callback whose type is exactly std::nullptr_t
// (e.g. a literal `nullptr` bound to a forwarding reference): there is nothing
// to invoke, so the result type is void regardless of Args.
167 template <typename... Args>
168 struct callback_retval<std::nullptr_t, Args...> {
169  using type = void;
170 };
171 
// Specialization for a null callback whose type is std::nullptr_t& (an lvalue
// std::nullptr_t bound to a forwarding reference): the result type is void
// regardless of Args.
172 template <typename... Args>
173 struct callback_retval<std::nullptr_t&, Args...> {
174  using type = void;
175 };
176 
// Convenience alias for callback_retval<Fn, Args...>::type.
177 template <typename Fn, typename... Args>
178 using callback_retval_t = typename callback_retval<Fn, Args...>::type;
179 
180 template <typename PhaseFrom, typename PhaseFn, typename PhaseTo,
181  typename Fn, typename... Args>
182 inline auto call_with_prof_events(Fn&& fn, Args&&... args) {
183  using retval_t = callback_retval_t<Fn, Args...>;
184 
185  if constexpr (!std::is_null_pointer_v<std::remove_reference_t<Fn>>) {
186  common::profiler::switch_phase<PhaseFrom, PhaseFn>();
187 
188  if constexpr (!std::is_void_v<retval_t>) {
189  auto ret = std::forward<Fn>(fn)(std::forward<Args>(args)...);
190  common::profiler::switch_phase<PhaseFn, PhaseTo>();
191  return ret;
192 
193  } else {
194  std::forward<Fn>(fn)(std::forward<Args>(args)...);
195  common::profiler::switch_phase<PhaseFn, PhaseTo>();
196  }
197 
198  } else if constexpr (!std::is_same_v<PhaseFrom, PhaseTo>) {
199  common::profiler::switch_phase<PhaseFrom, PhaseTo>();
200  }
201 
202  if constexpr (!std::is_void_v<retval_t>) {
203  return retval_t{};
204  } else {
205  return no_retval_t{};
206  }
207 }
208 
209 template <typename PhaseFrom, typename PhaseFn, typename PhaseTo,
210  typename Fn, typename... Args>
211 inline auto call_with_prof_events(Fn&& fn, no_retval_t, Args&&... args) {
212  // skip no_retval_t args
213  return call_with_prof_events<PhaseFrom, PhaseFn, PhaseTo>(
214  std::forward<Fn>(fn), std::forward<Args>(args)...);
215 }
216 
217 /*
218  * Mailbox
219  */
220 
221 template <typename Entry>
223  static_assert(std::is_trivially_copyable_v<Entry>);
224 
225 public:
227  : win_(common::topology::mpicomm(), 1) {}
228 
229  void put(const Entry& entry, common::topology::rank_t target_rank) {
231 
232  ITYR_CHECK(!common::mpi_get_value<int>(target_rank, offsetof(mailbox, arrived), win_.win()));
233  common::mpi_put_value(entry, target_rank, offsetof(mailbox, entry), win_.win());
234  common::mpi_atomic_put_value(1, target_rank, offsetof(mailbox, arrived), win_.win());
235  }
236 
237  std::optional<Entry> pop() {
238  mailbox& mb = win_.local_buf()[0];
239  if (mb.arrived.load(std::memory_order_acquire)) {
240  mb.arrived.store(0, std::memory_order_relaxed);
241  return mb.entry;
242  } else {
243  return std::nullopt;
244  }
245  }
246 
247  bool arrived() const {
248  return win_.local_buf()[0].arrived.load(std::memory_order_relaxed);
249  }
250 
251 private:
252  struct mailbox {
253  Entry entry;
254  std::atomic<int> arrived = 0; // TODO: better to use std::atomic_ref in C++20
255  };
256 
258 };
259 
260 template <>
261 class oneslot_mailbox<void> {
262 public:
264  : win_(common::topology::mpicomm(), 1) {}
265 
266  void put(common::topology::rank_t target_rank) {
268 
269  ITYR_CHECK(!common::mpi_get_value<int>(target_rank, offsetof(mailbox, arrived), win_.win()));
270  common::mpi_atomic_put_value(1, target_rank, offsetof(mailbox, arrived), win_.win());
271  }
272 
273  bool pop() {
274  mailbox& mb = win_.local_buf()[0];
275  if (mb.arrived.load(std::memory_order_acquire)) {
276  mb.arrived.store(0, std::memory_order_relaxed);
277  return true;
278  } else {
279  return false;
280  }
281  }
282 
283  bool arrived() const {
284  return win_.local_buf()[0].arrived.load(std::memory_order_relaxed);
285  }
286 
287 private:
288  struct mailbox {
289  std::atomic<int> arrived = 0; // TODO: better to use std::atomic_ref in C++20
290  };
291 
293 };
294 
295 }
span< T > local_buf() const
Definition: mpi_rma.hpp:412
MPI_Win win() const
Definition: mpi_rma.hpp:409
Definition: util.hpp:115
void execute()
Definition: util.hpp:120
callable_task(Fn_ &&fn, Args_ &&... args)
Definition: util.hpp:118
Definition: util.hpp:21
void clear()
Definition: util.hpp:27
void increment_strand_count()
Definition: util.hpp:31
static constexpr bool enabled
Definition: util.hpp:23
void merge_serial(const dag_profiler_disabled &)
Definition: util.hpp:28
void stop()
Definition: util.hpp:25
void increment_thread_count()
Definition: util.hpp:30
void start()
Definition: util.hpp:24
void merge_parallel(const dag_profiler_disabled &)
Definition: util.hpp:29
void print() const
Definition: util.hpp:32
bool is_stopped() const
Definition: util.hpp:26
Definition: util.hpp:35
void stop()
Definition: util.hpp:44
void print() const
Definition: util.hpp:85
void start()
Definition: util.hpp:39
void increment_strand_count()
Definition: util.hpp:83
static constexpr bool enabled
Definition: util.hpp:37
void clear()
Definition: util.hpp:54
void merge_parallel(const dag_profiler_workspan &dp)
Definition: util.hpp:72
void increment_thread_count()
Definition: util.hpp:82
bool is_stopped() const
Definition: util.hpp:52
void merge_serial(const dag_profiler_workspan &dp)
Definition: util.hpp:62
void put(common::topology::rank_t target_rank)
Definition: util.hpp:266
oneslot_mailbox()
Definition: util.hpp:263
bool arrived() const
Definition: util.hpp:283
bool pop()
Definition: util.hpp:273
Definition: util.hpp:222
bool arrived() const
Definition: util.hpp:247
std::optional< Entry > pop()
Definition: util.hpp:237
oneslot_mailbox()
Definition: util.hpp:226
void put(const Entry &entry, common::topology::rank_t target_rank)
Definition: util.hpp:229
Definition: util.hpp:108
virtual void execute()=0
virtual ~task_general()=default
#define ITYR_CONCAT(x, y)
Definition: util.hpp:20
#define ITYR_CHECK(cond)
Definition: util.hpp:48
#define ITYR_ITO_DAG_PROF
rank_t n_ranks()
Definition: topology.hpp:208
int rank_t
Definition: topology.hpp:12
MPI_Comm mpicomm()
Definition: topology.hpp:206
rank_t my_rank()
Definition: topology.hpp:207
uint64_t wallclock_t
Definition: wallclock.hpp:13
wallclock_t gettime_ns()
Definition: wallclock.hpp:88
T mpi_atomic_put_value(const T &value, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:283
va_list args
Definition: util.hpp:76
fflush(stderr)
void mpi_put_value(const T &value, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:165
Definition: aarch64.hpp:5
typename callback_retval< Fn, Args... >::type callback_retval_t
Definition: util.hpp:178
decltype(auto) invoke_fn(Fn &&fn, ArgsTuple &&args_tuple)
Definition: util.hpp:149
auto call_with_prof_events(Fn &&fn, Args &&... args)
Definition: util.hpp:182
ITYR_CONCAT(dag_profiler_, ITYR_ITO_DAG_PROF) dag_profiler
Definition: util.hpp:102
common::topology::rank_t get_random_rank(common::topology::rank_t a, common::topology::rank_t b)
Definition: util.hpp:128
monoid< T, max_functor<>, lowest< T > > max
Definition: reducer.hpp:104
ForwardIteratorD move(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d)
Move a range to another.
Definition: parallel_loop.hpp:934
#define ITYR_PROFILER_RECORD(event,...)
Definition: profiler.hpp:319
Definition: util.hpp:163
std::invoke_result_t< Fn, Args... > type
Definition: util.hpp:164
Definition: util.hpp:126
Definition: prof_events.hpp:90