Itoyori  v0.0.1
utofu.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include "ityr/common/util.hpp"
6 
7 #if ITYR_RMA_IMPL == utofu && __has_include(<utofu.h>)
8 
9 #include <utofu.h>
10 
11 namespace ityr::common::rma {
12 
13 class utofu {
14 public:
15  utofu()
16  : vcq_hdl_(init_vcq_hdl()),
17  vcq_ids_(init_vcq_ids()) {}
18 
19  ~utofu() {
21  utofu_free_vcq(vcq_hdl_);
22  }
23 
24  class win {
25  public:
26  win(utofu_vcq_hdl_t vcq_hdl, void* baseptr, std::size_t bytes)
27  : vcq_hdl_(vcq_hdl),
28  baseptr_(reinterpret_cast<std::byte*>(baseptr)),
29  bytes_(bytes),
30  stadds_(init_stadds()) {}
31 
32  ~win() { destroy(); }
33 
34  win(const win&) = delete;
35  win& operator=(const win&) = delete;
36 
37  win(win&& w) : vcq_hdl_(w.vcq_hdl_), baseptr_(w.baseptr_), bytes_(w.bytes_), stadds_(std::move(w.stadds_)) {
38  w.stadds_ = {};
39  }
40  win& operator=(win&& w) {
41  destroy();
42  vcq_hdl_ = w.vcq_hdl_;
43  baseptr_ = w.baseptr_;
44  bytes_ = w.bytes_;
45  stadds_ = std::move(w.stadds_);
46  w.stadds_ = {};
47  return *this;
48  }
49 
50  utofu_stadd_t stadd(topology::rank_t target_rank) const {
51  ITYR_CHECK(0 <= target_rank);
52  ITYR_CHECK(target_rank < topology::n_ranks());
53  ITYR_CHECK(stadds_.size() == topology::n_ranks());
54  return stadds_[target_rank];
55  }
56 
57  utofu_stadd_t my_stadd() const {
58  return stadd(topology::my_rank());
59  }
60 
61  utofu_stadd_t my_stadd(const void* ptr) const {
62  std::size_t offset = reinterpret_cast<const std::byte*>(ptr) - baseptr_;
63  return stadd(topology::my_rank()) + offset;
64  }
65 
66  private:
67  void destroy() {
68  if (stadds_.size() == topology::n_ranks()) {
70  utofu_dereg_mem(vcq_hdl_, my_stadd(), 0);
71  }
72  }
73 
74  std::vector<utofu_stadd_t> init_stadds() {
75  utofu_stadd_t my_stadd;
76  int r = utofu_reg_mem(vcq_hdl_, baseptr_, bytes_, 0, &my_stadd);
77  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_reg_mem() error: %d", r);
78 
79  std::vector<utofu_stadd_t> stadds(topology::n_ranks());
80  MPI_Allgather(&my_stadd, 1, MPI_UINT64_T, stadds.data(), 1, MPI_UINT64_T, MPI_COMM_WORLD);
81  return stadds;
82  }
83 
84  utofu_vcq_hdl_t vcq_hdl_;
85  std::byte* baseptr_;
86  std::size_t bytes_;
87  std::vector<utofu_stadd_t> stadds_;
88  };
89 
90  win create_win(void* baseptr, std::size_t bytes) {
91  return win(vcq_hdl_, baseptr, bytes);
92  }
93 
94  void get_nb(const win& origin_win,
95  std::byte* origin_addr,
96  std::size_t bytes,
97  const win& target_win,
98  int target_rank,
99  std::size_t target_disp) {
100  constexpr unsigned long int post_flags = UTOFU_ONESIDED_FLAG_TCQ_NOTICE |
101  UTOFU_ONESIDED_FLAG_LOCAL_MRQ_NOTICE;
102  while (true) {
103  int r = utofu_get(vcq_hdl_, vcq_ids_[target_rank], origin_win.my_stadd(origin_addr),
104  target_win.stadd(target_rank) + target_disp, bytes, 0, post_flags, 0);
105  if (r == UTOFU_ERR_BUSY) {
106  poll_tcq_until_empty();
107  } else {
108  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_get() error: %d", r);
109  break;
110  }
111  }
112 
113  n_ongoing_tcq_reqs_++;
114  n_ongoing_mrq_reqs_++;
115  }
116 
117  void get_nb(std::byte*, std::size_t, const win&, int, std::size_t) {
118  common::die("utofu rma layer is not supported for get/put (nocache) interface");
119  }
120 
121  void put_nb(const win& origin_win,
122  const std::byte* origin_addr,
123  std::size_t bytes,
124  const win& target_win,
125  int target_rank,
126  std::size_t target_disp) {
127  constexpr unsigned long int post_flags = UTOFU_ONESIDED_FLAG_TCQ_NOTICE |
128  UTOFU_ONESIDED_FLAG_LOCAL_MRQ_NOTICE;
129  while (true) {
130  int r = utofu_put(vcq_hdl_, vcq_ids_[target_rank], origin_win.my_stadd(origin_addr),
131  target_win.stadd(target_rank) + target_disp, bytes, 0, post_flags, 0);
132  if (r == UTOFU_ERR_BUSY) {
133  poll_tcq_until_empty();
134  } else {
135  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_put() error: %d", r);
136  break;
137  }
138  }
139 
140  n_ongoing_tcq_reqs_++;
141  n_ongoing_mrq_reqs_++;
142  }
143 
144  void put_nb(const std::byte*, std::size_t, const win&, int, std::size_t) {
145  common::die("utofu rma layer is not supported for get/put (nocache) interface");
146  }
147 
148  void flush(const win&) {
149  // TODO: flush for each win
150  for (int i = 0; i < n_ongoing_tcq_reqs_; i++) {
151  void* cbdata;
152  while (true) {
153  int r = utofu_poll_tcq(vcq_hdl_, 0, &cbdata);
154  if (r != UTOFU_ERR_NOT_FOUND) {
155  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_poll_tcq() error: %d", r);
156  break;
157  }
158  }
159  }
160  n_ongoing_tcq_reqs_ = 0;
161 
162  for (int i = 0; i < n_ongoing_mrq_reqs_; i++) {
163  struct utofu_mrq_notice notice;
164  while (true) {
165  int r = utofu_poll_mrq(vcq_hdl_, 0, &notice);
166  if (r != UTOFU_ERR_NOT_FOUND) {
167  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_poll_mrq() error: %d", r);
168  break;
169  }
170  }
171  }
172  n_ongoing_mrq_reqs_ = 0;
173  }
174 
175 private:
176  utofu_vcq_hdl_t init_vcq_hdl() {
177  std::size_t num_tnis;
178  utofu_tni_id_t* tni_ids;
179  utofu_get_onesided_tnis(&tni_ids, &num_tnis);
180 
181  utofu_tni_id_t tni_id = tni_ids[0];
182  free(tni_ids);
183 
184  utofu_vcq_hdl_t vcq_hdl;
185  int r = utofu_create_vcq(tni_id, 0, &vcq_hdl);
186  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_create_vcq() error: %d", r);
187 
188  return vcq_hdl;
189  }
190 
191  std::vector<utofu_vcq_id_t> init_vcq_ids() {
192  utofu_vcq_id_t my_vcq_id;
193  int r = utofu_query_vcq_id(vcq_hdl_, &my_vcq_id);
194  ITYR_CHECK_MESSAGE(r == UTOFU_SUCCESS, "utofu_query_vcq_id() error: %d", r);
195 
196  std::vector<utofu_vcq_id_t> vcq_ids(topology::n_ranks());
197  MPI_Allgather(&my_vcq_id, 1, MPI_UINT64_T, vcq_ids.data(), 1, MPI_UINT64_T, MPI_COMM_WORLD);
198 
199  return vcq_ids;
200  }
201 
202  void poll_tcq_until_empty() {
203  void* cbdata;
204  while (true) {
205  int r = utofu_poll_tcq(vcq_hdl_, 0, &cbdata);
206  if (r == UTOFU_SUCCESS) {
207  n_ongoing_tcq_reqs_--;
208  } else {
209  ITYR_CHECK_MESSAGE(r == UTOFU_ERR_NOT_FOUND, "utofu_poll_tcq() error: %d", r);
210  break;
211  }
212  }
213  }
214 
215  utofu_vcq_hdl_t vcq_hdl_;
216  std::vector<utofu_vcq_id_t> vcq_ids_;
217  int n_ongoing_tcq_reqs_ = 0;
218  int n_ongoing_mrq_reqs_ = 0;
219 };
220 
221 }
222 
223 #endif
#define ITYR_CHECK_MESSAGE(cond,...)
Definition: util.hpp:49
#define ITYR_CHECK(cond)
Definition: util.hpp:48
Definition: mpi.hpp:8
void get_nb(const win &origin_win, T *origin_addr, std::size_t count, const win &target_win, int target_rank, std::size_t target_disp)
Definition: rma.hpp:25
void flush(const win &target_win)
Definition: rma.hpp:76
std::unique_ptr< win > create_win(T *baseptr, std::size_t count)
Definition: rma.hpp:16
ITYR_RMA_IMPL::win win
Definition: rma.hpp:13
void put_nb(const win &origin_win, const T *origin_addr, std::size_t count, const win &target_win, int target_rank, std::size_t target_disp)
Definition: rma.hpp:51
rank_t n_ranks()
Definition: topology.hpp:208
int rank_t
Definition: topology.hpp:12
MPI_Comm mpicomm()
Definition: topology.hpp:206
rank_t my_rank()
Definition: topology.hpp:207
void mpi_barrier(MPI_Comm comm)
Definition: mpi_util.hpp:42
void free(global_ptr< T > ptr, std::size_t count)
Definition: ori.hpp:75
ForwardIteratorD move(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d)
Move a range to another.
Definition: parallel_loop.hpp:934