// Itoyori v0.0.1 -- ityr/common/mpi_rma.hpp
// (recovered from a Doxygen "documentation of this file" dump)
#pragma once

#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <new>

#include "ityr/common/util.hpp"
#include "ityr/common/mpi_util.hpp"
#include "ityr/common/topology.hpp"
#include "ityr/common/wallclock.hpp"
#include "ityr/common/options.hpp"
#include "ityr/common/span.hpp"
#include "ityr/common/prof_events.hpp"
#include "ityr/common/profiler.hpp"
12 
13 namespace ityr::common {
14 
15 inline void mpi_win_flush(int target_rank, MPI_Win win) {
17 #if ITYR_DEBUG_UCX
18  ucs_trace_func("origin: %d, target: %d", topology::my_rank(), target_rank);
19  auto t0 = wallclock::gettime_ns();
20 #endif
21  MPI_Win_flush(target_rank, win);
22 #if ITYR_DEBUG_UCX
23  auto t1 = wallclock::gettime_ns();
24  ucs_trace_func("time: %d ns", t1 - t0);
25  if (t1 - t0 > 1000000000L) {
26  ucs_info("MPI_Win_flush() took too long time: %f s", (double)(t1 - t0) / 1000000000.0);
27  }
28 #endif
29 }
30 
31 inline void mpi_win_flush_all(MPI_Win win) {
33 #if ITYR_DEBUG_UCX
34  ucs_trace_func("origin: %d", topology::my_rank());
35  auto t0 = wallclock::gettime_ns();
36 #endif
37  MPI_Win_flush_all(win);
38 #if ITYR_DEBUG_UCX
39  auto t1 = wallclock::gettime_ns();
40  ucs_trace_func("time: %d ns", t1 - t0);
41  if (t1 - t0 > 1000000000L) {
42  ucs_info("MPI_Win_flush_all() took too long time: %f s", (double)(t1 - t0) / 1000000000.0);
43  }
44 #endif
45 }
46 
47 template <typename T>
48 inline void mpi_get_nb(T* origin,
49  std::size_t count,
50  int target_rank,
51  std::size_t target_disp,
52  MPI_Win win) {
54 #if ITYR_DEBUG_UCX
55  ucs_trace_func("origin: %d, target: %d, %ld bytes", topology::my_rank(), target_rank, sizeof(T) * count);
56 #endif
57  ITYR_CHECK(win != MPI_WIN_NULL);
58  MPI_Get(origin,
59  sizeof(T) * count,
60  MPI_BYTE,
61  target_rank,
62  target_disp,
63  sizeof(T) * count,
64  MPI_BYTE,
65  win);
66 }
67 
68 template <typename T>
69 inline void mpi_get(T* origin,
70  std::size_t count,
71  int target_rank,
72  std::size_t target_disp,
73  MPI_Win win) {
74  mpi_get_nb(origin, count, target_rank, target_disp, win);
75  mpi_win_flush(target_rank, win);
76 }
77 
78 template <typename T>
79 inline MPI_Request mpi_rget(T* origin,
80  std::size_t count,
81  int target_rank,
82  std::size_t target_disp,
83  MPI_Win win) {
84  ITYR_CHECK(win != MPI_WIN_NULL);
85 #if ITYR_DEBUG_UCX
86  ucs_trace_func("origin: %d, target: %d, %ld bytes", topology::my_rank(), target_rank, sizeof(T) * count);
87 #endif
88  MPI_Request req;
89  MPI_Rget(origin,
90  sizeof(T) * count,
91  MPI_BYTE,
92  target_rank,
93  target_disp,
94  sizeof(T) * count,
95  MPI_BYTE,
96  win,
97  &req);
98  return req;
99 }
100 
101 template <typename T>
102 inline T mpi_get_value(int target_rank,
103  std::size_t target_disp,
104  MPI_Win win) {
105  T value;
106  mpi_get(&value, 1, target_rank, target_disp, win);
107  return value;
108 }
109 
110 template <typename T>
111 inline void mpi_put_nb(const T* origin,
112  std::size_t count,
113  int target_rank,
114  std::size_t target_disp,
115  MPI_Win win) {
117 #if ITYR_DEBUG_UCX
118  ucs_trace_func("origin: %d, target: %d, %ld bytes", topology::my_rank(), target_rank, sizeof(T) * count);
119 #endif
120  ITYR_CHECK(win != MPI_WIN_NULL);
121  MPI_Put(origin,
122  sizeof(T) * count,
123  MPI_BYTE,
124  target_rank,
125  target_disp,
126  sizeof(T) * count,
127  MPI_BYTE,
128  win);
129 }
130 
131 template <typename T>
132 inline void mpi_put(const T* origin,
133  std::size_t count,
134  int target_rank,
135  std::size_t target_disp,
136  MPI_Win win) {
137  mpi_put_nb(origin, count, target_rank, target_disp, win);
138  mpi_win_flush(target_rank, win);
139 }
140 
141 template <typename T>
142 inline MPI_Request mpi_rput(const T* origin,
143  std::size_t count,
144  int target_rank,
145  std::size_t target_disp,
146  MPI_Win win) {
147  ITYR_CHECK(win != MPI_WIN_NULL);
148 #if ITYR_DEBUG_UCX
149  ucs_trace_func("origin: %d, target: %d, %ld bytes", topology::my_rank(), target_rank, sizeof(T) * count);
150 #endif
151  MPI_Request req;
152  MPI_Rput(origin,
153  sizeof(T) * count,
154  MPI_BYTE,
155  target_rank,
156  target_disp,
157  sizeof(T) * count,
158  MPI_BYTE,
159  win,
160  &req);
161  return req;
162 }
163 
164 template <typename T>
165 inline void mpi_put_value(const T& value,
166  int target_rank,
167  std::size_t target_disp,
168  MPI_Win win) {
169  mpi_put(&value, 1, target_rank, target_disp, win);
170 }
171 
172 template <typename T>
173 inline void mpi_atomic_faa_nb(const T* origin,
174  T* result,
175  int target_rank,
176  std::size_t target_disp,
177  MPI_Win win) {
179 #if ITYR_DEBUG_UCX
180  ucs_trace_func("origin: %d, target: %d", topology::my_rank(), target_rank);
181 #endif
182  ITYR_CHECK(win != MPI_WIN_NULL);
183  MPI_Fetch_and_op(origin,
184  result,
185  mpi_type<T>(),
186  target_rank,
187  target_disp,
188  MPI_SUM,
189  win);
190 }
191 
192 template <typename T>
193 inline T mpi_atomic_faa_value(const T& value,
194  int target_rank,
195  std::size_t target_disp,
196  MPI_Win win) {
197  T result;
198  mpi_atomic_faa_nb(&value, &result, target_rank, target_disp, win);
199  mpi_win_flush(target_rank, win);
200  return result;
201 }
202 
203 template <typename T>
204 inline void mpi_atomic_cas_nb(const T* origin,
205  const T* compare,
206  T* result,
207  int target_rank,
208  std::size_t target_disp,
209  MPI_Win win) {
211  ITYR_CHECK(win != MPI_WIN_NULL);
212  MPI_Compare_and_swap(origin,
213  compare,
214  result,
215  mpi_type<T>(),
216  target_rank,
217  target_disp,
218  win);
219 }
220 
221 template <typename T>
222 inline T mpi_atomic_cas_value(const T& value,
223  const T& compare,
224  int target_rank,
225  std::size_t target_disp,
226  MPI_Win win) {
227  T result;
228  mpi_atomic_cas_nb(&value, &compare, &result, target_rank, target_disp, win);
229  mpi_win_flush(target_rank, win);
230  return result;
231 }
232 
233 template <typename T>
234 inline void mpi_atomic_get_nb(T* origin,
235  int target_rank,
236  std::size_t target_disp,
237  MPI_Win win) {
239 #if ITYR_DEBUG_UCX
240  ucs_trace_func("origin: %d, target: %d", topology::my_rank(), target_rank);
241 #endif
242  ITYR_CHECK(win != MPI_WIN_NULL);
243  MPI_Fetch_and_op(nullptr,
244  origin,
245  mpi_type<T>(),
246  target_rank,
247  target_disp,
248  MPI_NO_OP,
249  win);
250 }
251 
252 template <typename T>
253 inline T mpi_atomic_get_value(int target_rank,
254  std::size_t target_disp,
255  MPI_Win win) {
256  T result;
257  mpi_atomic_get_nb(&result, target_rank, target_disp, win);
258  mpi_win_flush(target_rank, win);
259  return result;
260 }
261 
262 template <typename T>
263 inline void mpi_atomic_put_nb(const T* origin,
264  T* result,
265  int target_rank,
266  std::size_t target_disp,
267  MPI_Win win) {
269 #if ITYR_DEBUG_UCX
270  ucs_trace_func("origin: %d, target: %d", topology::my_rank(), target_rank);
271 #endif
272  ITYR_CHECK(win != MPI_WIN_NULL);
273  MPI_Fetch_and_op(origin,
274  result,
275  mpi_type<T>(),
276  target_rank,
277  target_disp,
278  MPI_REPLACE,
279  win);
280 }
281 
282 template <typename T>
283 inline T mpi_atomic_put_value(const T& value,
284  int target_rank,
285  std::size_t target_disp,
286  MPI_Win win) {
287  T result;
288  mpi_atomic_put_nb(&value, &result, target_rank, target_disp, win);
289  mpi_win_flush(target_rank, win);
290  return result;
291 }
292 
293 template <typename T>
294 class mpi_win_manager;
295 
296 template <>
297 class mpi_win_manager<void> {
298 public:
300  mpi_win_manager(MPI_Comm comm) {
301  MPI_Win_create_dynamic(MPI_INFO_NULL, comm, &win_);
302  MPI_Win_lock_all(MPI_MODE_NOCHECK, win_);
303  wireup(comm);
304  }
305  mpi_win_manager(MPI_Comm comm, std::size_t size, std::size_t alignment = alignof(max_align_t)) {
307  // TODO: handle alignment
308  MPI_Win_allocate(size, 1, MPI_INFO_NULL, comm, &baseptr_, &win_);
309  } else {
310  // In Fujitsu MPI, a large communication latency was observed only when we used
311  // MPI_Win_allocate, and here is a workaround for it.
312  baseptr_ = std::aligned_alloc(alignment, size);
313  MPI_Win_create(baseptr_, size, 1, MPI_INFO_NULL, comm, &win_);
314  }
315  ITYR_CHECK(win_ != MPI_WIN_NULL);
316  MPI_Win_lock_all(MPI_MODE_NOCHECK, win_);
317  wireup(comm);
318  }
319  mpi_win_manager(MPI_Comm comm, void* baseptr, std::size_t size) : baseptr_(baseptr) {
320  MPI_Win_create(baseptr,
321  size,
322  1,
323  MPI_INFO_NULL,
324  comm,
325  &win_);
326  ITYR_CHECK(win_ != MPI_WIN_NULL);
327  MPI_Win_lock_all(MPI_MODE_NOCHECK, win_);
328  wireup(comm);
329  }
330 
331  ~mpi_win_manager() { destroy(); }
332 
335 
336  mpi_win_manager(mpi_win_manager&& wm) noexcept : win_(wm.win_) { wm.win_ = MPI_WIN_NULL; }
338  destroy();
339  win_ = wm.win_;
340  wm.win_ = MPI_WIN_NULL;
341  return *this;
342  }
343 
344  MPI_Win win() const { return win_; }
345  void* baseptr() const { return baseptr_; }
346 
347 private:
348  void destroy() {
349  if (win_ != MPI_WIN_NULL) {
350  MPI_Win_unlock_all(win_);
351  MPI_Win_free(&win_);
352  // TODO: free baseptr_ when ITYR_RMA_USE_MPI_WIN_ALLOCATE=false and the user
353  // did not provide a buffer (or remove the option)
354  }
355  }
356 
357  void wireup(MPI_Comm comm) {
358  static std::once_flag flag;
359  std::call_once(flag, [&]() {
360  // Invoke wireup routines in the internal of MPI, assuming that this is the first
361  // one-sided communication since MPI_Init. MPI_MODE_NOCHECK will not involve communication.
362  int my_rank = mpi_comm_rank(comm);
363  int n_ranks = mpi_comm_size(comm);
364  for (int i = 1; i <= n_ranks / 2; i++) {
365  int target_rank = (my_rank + i) % n_ranks;
366  mpi_get_value<char>(target_rank, 0, win_);
367  }
368  });
369  }
370 
371  MPI_Win win_ = MPI_WIN_NULL;
372  void* baseptr_ = nullptr;
373 };
374 
375 // This value should be larger than a cacheline size, because otherwise
376 // the buffers on different processes may be allocated to the same cacheline,
377 // which can cause unintended cache misses (false sharing).
378 // TODO: use hardware_destructive_interference_size?
379 inline constexpr std::size_t mpi_win_size_min = 1024;
380 
381 template <typename T>
383 public:
385  mpi_win_manager(MPI_Comm comm)
386  : win_(comm),
387  comm_(comm) {}
388  template <typename... ElemArgs>
389  mpi_win_manager(MPI_Comm comm, std::size_t count, ElemArgs&&... args)
390  : win_(comm, round_up_pow2(sizeof(T) * count, mpi_win_size_min), alignof(T)),
391  comm_(comm),
392  local_buf_(init_local_buf(count, std::forward<ElemArgs>(args)...)) {}
393  mpi_win_manager(MPI_Comm comm, T* baseptr, std::size_t count)
394  : win_(comm, baseptr, sizeof(T) * count),
395  comm_(comm) {}
396 
398  if (win_.win() != MPI_WIN_NULL) {
399  destroy_local_buf();
400  }
401  }
402 
405 
408 
409  MPI_Win win() const { return win_.win(); }
410  T* baseptr() const { return reinterpret_cast<T*>(win_.baseptr()); }
411 
412  span<T> local_buf() const { return local_buf_; }
413 
414 private:
415  template <typename... ElemArgs>
416  span<T> init_local_buf(std::size_t count, ElemArgs... args) const {
417  T* local_base = baseptr();
418  ITYR_REQUIRE(reinterpret_cast<uintptr_t>(local_base) % alignof(T) == 0);
419 
420  for (std::size_t i = 0; i < count; i++) {
421  new (local_base + i) T{args...};
422  }
423  mpi_barrier(comm_);
424  return span<T>{local_base, count};
425  }
426 
427  void destroy_local_buf() const {
428  if (!local_buf_.empty()) {
429  mpi_barrier(comm_);
430  std::destroy(local_buf_.begin(), local_buf_.end());
431  }
432  }
433 
434  mpi_win_manager<void> win_;
435  MPI_Comm comm_;
436  span<T> local_buf_;
437 };
438 
439 }
mpi_win_manager & operator=(const mpi_win_manager &)=delete
mpi_win_manager(MPI_Comm comm, std::size_t size, std::size_t alignment=alignof(max_align_t))
Definition: mpi_rma.hpp:305
mpi_win_manager(MPI_Comm comm, void *baseptr, std::size_t size)
Definition: mpi_rma.hpp:319
mpi_win_manager & operator=(mpi_win_manager &&wm) noexcept
Definition: mpi_rma.hpp:337
MPI_Win win() const
Definition: mpi_rma.hpp:344
mpi_win_manager(mpi_win_manager &&wm) noexcept
Definition: mpi_rma.hpp:336
mpi_win_manager(const mpi_win_manager &)=delete
mpi_win_manager()
Definition: mpi_rma.hpp:299
~mpi_win_manager()
Definition: mpi_rma.hpp:331
mpi_win_manager(MPI_Comm comm)
Definition: mpi_rma.hpp:300
void * baseptr() const
Definition: mpi_rma.hpp:345
Definition: mpi_rma.hpp:382
span< T > local_buf() const
Definition: mpi_rma.hpp:412
mpi_win_manager(mpi_win_manager &&)=default
mpi_win_manager(MPI_Comm comm, std::size_t count, ElemArgs &&... args)
Definition: mpi_rma.hpp:389
mpi_win_manager(const mpi_win_manager &)=delete
mpi_win_manager & operator=(mpi_win_manager &&)=default
mpi_win_manager(MPI_Comm comm)
Definition: mpi_rma.hpp:385
mpi_win_manager()
Definition: mpi_rma.hpp:384
T * baseptr() const
Definition: mpi_rma.hpp:410
mpi_win_manager & operator=(const mpi_win_manager &)=delete
~mpi_win_manager()
Definition: mpi_rma.hpp:397
mpi_win_manager(MPI_Comm comm, T *baseptr, std::size_t count)
Definition: mpi_rma.hpp:393
MPI_Win win() const
Definition: mpi_rma.hpp:409
static value_type value()
Definition: options.hpp:62
Definition: span.hpp:11
#define ITYR_CHECK(cond)
Definition: util.hpp:48
#define ITYR_REQUIRE(cond)
Definition: util.hpp:42
ITYR_RMA_IMPL::win win
Definition: rma.hpp:13
rank_t my_rank()
Definition: topology.hpp:207
wallclock_t gettime_ns()
Definition: wallclock.hpp:88
Definition: allocator.hpp:16
T round_up_pow2(T x, T alignment)
Definition: util.hpp:142
T mpi_atomic_put_value(const T &value, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:283
va_list args
Definition: util.hpp:76
void mpi_win_flush_all(MPI_Win win)
Definition: mpi_rma.hpp:31
void mpi_get(T *origin, std::size_t count, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:69
void mpi_get_nb(T *origin, std::size_t count, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:48
void mpi_atomic_get_nb(T *origin, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:234
T mpi_get_value(int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:102
void mpi_put_nb(const T *origin, std::size_t count, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:111
int mpi_comm_rank(MPI_Comm comm)
Definition: mpi_util.hpp:28
T mpi_atomic_get_value(int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:253
void mpi_put(const T *origin, std::size_t count, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:132
void mpi_win_flush(int target_rank, MPI_Win win)
Definition: mpi_rma.hpp:15
constexpr auto size(const span< T > &s) noexcept
Definition: span.hpp:61
void mpi_put_value(const T &value, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:165
int mpi_comm_size(MPI_Comm comm)
Definition: mpi_util.hpp:35
constexpr std::size_t mpi_win_size_min
Definition: mpi_rma.hpp:379
T mpi_atomic_cas_value(const T &value, const T &compare, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:222
void mpi_atomic_put_nb(const T *origin, T *result, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:263
MPI_Request mpi_rput(const T *origin, std::size_t count, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:142
void mpi_barrier(MPI_Comm comm)
Definition: mpi_util.hpp:42
void mpi_atomic_faa_nb(const T *origin, T *result, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:173
void mpi_atomic_cas_nb(const T *origin, const T *compare, T *result, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:204
MPI_Request mpi_rget(T *origin, std::size_t count, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:79
T mpi_atomic_faa_value(const T &value, int target_rank, std::size_t target_disp, MPI_Win win)
Definition: mpi_rma.hpp:193
rank_t my_rank()
Return the rank of the process running the current thread.
Definition: ityr.hpp:99
rank_t n_ranks()
Return the total number of processes.
Definition: ityr.hpp:107
#define ITYR_PROFILER_RECORD(event,...)
Definition: profiler.hpp:319
Definition: prof_events.hpp:54
Definition: prof_events.hpp:49
Definition: prof_events.hpp:59
Definition: prof_events.hpp:64
Definition: prof_events.hpp:69
Definition: prof_events.hpp:39
Definition: prof_events.hpp:44