7 #if ITYR_RMA_IMPL == utofu && __has_include(<utofu.h>)
16 : vcq_hdl_(init_vcq_hdl()),
17 vcq_ids_(init_vcq_ids()) {}
21 utofu_free_vcq(vcq_hdl_);
26 win(utofu_vcq_hdl_t vcq_hdl,
void* baseptr, std::size_t bytes)
28 baseptr_(reinterpret_cast<std::byte*>(baseptr)),
30 stadds_(init_stadds()) {}
35 win& operator=(
const win&) =
delete;
37 win(
win&& w) : vcq_hdl_(w.vcq_hdl_), baseptr_(w.baseptr_), bytes_(w.bytes_), stadds_(std::
move(w.stadds_)) {
42 vcq_hdl_ = w.vcq_hdl_;
43 baseptr_ = w.baseptr_;
54 return stadds_[target_rank];
57 utofu_stadd_t my_stadd()
const {
61 utofu_stadd_t my_stadd(
const void* ptr)
const {
62 std::size_t offset =
reinterpret_cast<const std::byte*
>(ptr) - baseptr_;
70 utofu_dereg_mem(vcq_hdl_, my_stadd(), 0);
74 std::vector<utofu_stadd_t> init_stadds() {
75 utofu_stadd_t my_stadd;
76 int r = utofu_reg_mem(vcq_hdl_, baseptr_, bytes_, 0, &my_stadd);
80 MPI_Allgather(&my_stadd, 1, MPI_UINT64_T, stadds.data(), 1, MPI_UINT64_T, MPI_COMM_WORLD);
84 utofu_vcq_hdl_t vcq_hdl_;
87 std::vector<utofu_stadd_t> stadds_;
91 return win(vcq_hdl_, baseptr, bytes);
95 std::byte* origin_addr,
97 const win& target_win,
99 std::size_t target_disp) {
100 constexpr
unsigned long int post_flags = UTOFU_ONESIDED_FLAG_TCQ_NOTICE |
101 UTOFU_ONESIDED_FLAG_LOCAL_MRQ_NOTICE;
103 int r = utofu_get(vcq_hdl_, vcq_ids_[target_rank], origin_win.my_stadd(origin_addr),
104 target_win.stadd(target_rank) + target_disp, bytes, 0, post_flags, 0);
105 if (r == UTOFU_ERR_BUSY) {
106 poll_tcq_until_empty();
113 n_ongoing_tcq_reqs_++;
114 n_ongoing_mrq_reqs_++;
117 void get_nb(std::byte*, std::size_t,
const win&,
int, std::size_t) {
118 common::die(
"utofu rma layer is not supported for get/put (nocache) interface");
122 const std::byte* origin_addr,
124 const win& target_win,
126 std::size_t target_disp) {
127 constexpr
unsigned long int post_flags = UTOFU_ONESIDED_FLAG_TCQ_NOTICE |
128 UTOFU_ONESIDED_FLAG_LOCAL_MRQ_NOTICE;
130 int r = utofu_put(vcq_hdl_, vcq_ids_[target_rank], origin_win.my_stadd(origin_addr),
131 target_win.stadd(target_rank) + target_disp, bytes, 0, post_flags, 0);
132 if (r == UTOFU_ERR_BUSY) {
133 poll_tcq_until_empty();
140 n_ongoing_tcq_reqs_++;
141 n_ongoing_mrq_reqs_++;
144 void put_nb(
const std::byte*, std::size_t,
const win&,
int, std::size_t) {
145 common::die(
"utofu rma layer is not supported for get/put (nocache) interface");
150 for (
int i = 0; i < n_ongoing_tcq_reqs_; i++) {
153 int r = utofu_poll_tcq(vcq_hdl_, 0, &cbdata);
154 if (r != UTOFU_ERR_NOT_FOUND) {
160 n_ongoing_tcq_reqs_ = 0;
162 for (
int i = 0; i < n_ongoing_mrq_reqs_; i++) {
163 struct utofu_mrq_notice notice;
165 int r = utofu_poll_mrq(vcq_hdl_, 0, &notice);
166 if (r != UTOFU_ERR_NOT_FOUND) {
172 n_ongoing_mrq_reqs_ = 0;
176 utofu_vcq_hdl_t init_vcq_hdl() {
177 std::size_t num_tnis;
178 utofu_tni_id_t* tni_ids;
179 utofu_get_onesided_tnis(&tni_ids, &num_tnis);
181 utofu_tni_id_t tni_id = tni_ids[0];
184 utofu_vcq_hdl_t vcq_hdl;
185 int r = utofu_create_vcq(tni_id, 0, &vcq_hdl);
191 std::vector<utofu_vcq_id_t> init_vcq_ids() {
192 utofu_vcq_id_t my_vcq_id;
193 int r = utofu_query_vcq_id(vcq_hdl_, &my_vcq_id);
197 MPI_Allgather(&my_vcq_id, 1, MPI_UINT64_T, vcq_ids.data(), 1, MPI_UINT64_T, MPI_COMM_WORLD);
202 void poll_tcq_until_empty() {
205 int r = utofu_poll_tcq(vcq_hdl_, 0, &cbdata);
206 if (r == UTOFU_SUCCESS) {
207 n_ongoing_tcq_reqs_--;
215 utofu_vcq_hdl_t vcq_hdl_;
216 std::vector<utofu_vcq_id_t> vcq_ids_;
217 int n_ongoing_tcq_reqs_ = 0;
218 int n_ongoing_mrq_reqs_ = 0;
#define ITYR_CHECK_MESSAGE(cond,...)
Definition: util.hpp:49
#define ITYR_CHECK(cond)
Definition: util.hpp:48
void get_nb(const win &origin_win, T *origin_addr, std::size_t count, const win &target_win, int target_rank, std::size_t target_disp)
Definition: rma.hpp:25
void flush(const win &target_win)
Definition: rma.hpp:76
std::unique_ptr< win > create_win(T *baseptr, std::size_t count)
Definition: rma.hpp:16
ITYR_RMA_IMPL::win win
Definition: rma.hpp:13
void put_nb(const win &origin_win, const T *origin_addr, std::size_t count, const win &target_win, int target_rank, std::size_t target_disp)
Definition: rma.hpp:51
rank_t n_ranks()
Definition: topology.hpp:208
int rank_t
Definition: topology.hpp:12
MPI_Comm mpicomm()
Definition: topology.hpp:206
rank_t my_rank()
Definition: topology.hpp:207
void mpi_barrier(MPI_Comm comm)
Definition: mpi_util.hpp:42
void free(global_ptr< T > ptr, std::size_t count)
Definition: ori.hpp:75
ForwardIteratorD move(const ExecutionPolicy &policy, ForwardIterator1 first1, ForwardIterator1 last1, ForwardIteratorD first_d)
Move a range to another.
Definition: parallel_loop.hpp:934