Itoyori v0.0.1
noncoll_mem.hpp
#pragma once

#include "ityr/common/util.hpp"
#include "ityr/common/rma.hpp"

namespace ityr::ori {

// TODO: unify these implementations with the common allocator

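// root_resource: a freelist-backed memory_resource that carves allocations out
// of a fixed local memory region. It serves as the upstream resource for the
// pooled allocator (std_pool_mr_) used by noncoll_mem below.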
class root_resource final : public common::pmr::memory_resource {
public:
  root_resource(void* addr, std::size_t size)
    : addr_(addr),
      size_(size),
      freelist_(reinterpret_cast<uintptr_t>(addr_), size_) {}

  void* do_allocate(std::size_t bytes, std::size_t alignment) override {
    ITYR_CHECK(bytes <= size_);

    auto s = freelist_.get(bytes, alignment);
    if (!s.has_value()) {
      throw std::bad_alloc();
    }

    return reinterpret_cast<void*>(*s);
  }

  void do_deallocate(void* p, std::size_t bytes, std::size_t alignment [[maybe_unused]]) override {
    ITYR_CHECK(p);
    ITYR_CHECK(bytes <= size_);

    freelist_.add(reinterpret_cast<uintptr_t>(p), bytes);
  }

  bool do_is_equal(const common::pmr::memory_resource& other) const noexcept override {
    return this == &other;
  }

private:
  void*            addr_;
  std::size_t      size_;
  common::freelist freelist_;
};

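// noncoll_mem: a memory resource that can be used noncollectively (each process
// allocates and frees on its own) while keeping every allocation globally
// addressable. All processes collectively reserve the same virtual address
// range (global_max_size_ bytes), and each rank owns one local_max_size_-sized
// slice of it, so the owner rank and the RMA displacement of any pointer can be
// derived from its address alone (see get_owner()/get_disp()). Freeing memory
// owned by another rank is deferred through a remote flag write (see
// remote_deallocate() and collect_deallocated()).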
class noncoll_mem final : public common::pmr::memory_resource {
public:
  noncoll_mem(std::size_t local_max_size, std::size_t alignment)
    : local_max_size_(common::round_up_pow2(local_max_size, alignment)),
      global_max_size_(local_max_size_ * common::topology::n_ranks()),
      vm_(common::reserve_same_vm_coll(global_max_size_, alignment)),
      local_base_addr_(reinterpret_cast<std::byte*>(vm_.addr()) + local_max_size_ * common::topology::my_rank()),
      pm_(init_pm()),
      win_(common::rma::create_win(local_base_addr_, local_max_size_)),
      root_mr_(local_base_addr_, local_max_size_ - sizeof(int)), // the last sizeof(int) bytes are reserved for the remote-deallocation flag value
      std_pool_mr_(my_std_pool_options(), &root_mr_),
      max_unflushed_free_objs_(common::allocator_max_unflushed_free_objs_option::value()),
      allocated_size_(0),
      collect_threshold_(std::size_t(16) * 1024),
      collect_threshold_max_(local_max_size_ * 8 / 10) {
    // Set the flag value used for remote deallocation at the end of the local segment
    *reinterpret_cast<int*>(
        reinterpret_cast<std::byte*>(local_base_addr_) + local_max_size_ - sizeof(int)) = remote_free_flag_value;
  }

  const common::rma::win& win() const { return *win_; }

  bool has(const void* p) const {
    return vm_.addr() <= p && p < reinterpret_cast<std::byte*>(vm_.addr()) + global_max_size_;
  }

  common::topology::rank_t get_owner(const void* p) const {
    return (reinterpret_cast<uintptr_t>(p) - reinterpret_cast<uintptr_t>(vm_.addr())) / local_max_size_;
  }

  std::size_t get_disp(const void* p) const {
    return (reinterpret_cast<uintptr_t>(p) - reinterpret_cast<uintptr_t>(vm_.addr())) % local_max_size_;
  }

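  // Each allocation is laid out as [header | padding | user data]: the header
  // sits pad_bytes = round_up_pow2(sizeof(header), alignment) bytes before the
  // returned pointer, which preserves the requested alignment while letting
  // deallocation recover the header by pointer arithmetic. Headers are chained
  // into a doubly linked list so that remotely freed objects can be reclaimed
  // later by collect_deallocated().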
  void* do_allocate(std::size_t bytes, std::size_t alignment = alignof(max_align_t)) override {
    std::size_t pad_bytes = common::round_up_pow2(sizeof(header), alignment);
    std::size_t real_bytes = bytes + pad_bytes;

    if (allocated_size_ >= collect_threshold_) {
      collect_deallocated();
    }

    std::byte* p;
    try {
      p = reinterpret_cast<std::byte*>(std_pool_mr_.allocate(real_bytes, alignment));
    } catch (std::bad_alloc& e) {
      // collect remotely freed objects and try allocation again
      collect_deallocated();
      try {
        p = reinterpret_cast<std::byte*>(std_pool_mr_.allocate(real_bytes, alignment));
      } catch (std::bad_alloc& e) {
        // TODO: throw std::bad_alloc?
        common::die("[ityr::ori::noncoll_mem] Could not allocate memory for malloc_local()");
      }
    }

    std::byte* ret = p + pad_bytes;

    ITYR_CHECK(ret + bytes <= p + real_bytes);
    ITYR_CHECK(p + sizeof(header) <= ret);

    header* h = new (p) header {
      .prev = allocated_list_end_, .next = nullptr,
      .size = real_bytes, .alignment = alignment, .freed = 0};
    ITYR_CHECK(allocated_list_end_->next == nullptr);
    allocated_list_end_->next = h;
    allocated_list_end_ = h;

    allocated_size_ += real_bytes;

    return ret;
  }

  void do_deallocate(void* p, std::size_t bytes, std::size_t alignment = alignof(max_align_t)) override {
    auto target_rank = get_owner(p);
    if (target_rank == common::topology::my_rank()) {
      local_deallocate(p, bytes, alignment);
    } else {
      remote_deallocate(p, bytes, target_rank, alignment);
    }
  }

  bool do_is_equal(const common::pmr::memory_resource& other) const noexcept override {
    return this == &other;
  }

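  // Frees an object owned by the calling rank: recover the header located just
  // before the user pointer, validate it, and return the whole padded block to
  // the local pool.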
  void local_deallocate(void* p, std::size_t bytes, std::size_t alignment = alignof(max_align_t)) {
    std::size_t pad_bytes = common::round_up_pow2(sizeof(header), alignment);
    std::size_t real_bytes = bytes + pad_bytes;

    header* h = reinterpret_cast<header*>(reinterpret_cast<std::byte*>(p) - pad_bytes);
    ITYR_CHECK(h->size == real_bytes);
    ITYR_CHECK(h->alignment == alignment);
    ITYR_CHECK(h->freed == 0);

    local_deallocate_impl(h, real_bytes, alignment);
  }

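  // Frees an object owned by another rank without any explicit handshake: the
  // caller RMA-puts remote_free_flag_value (kept in the last int of its own
  // local segment) into the header->freed field of the object on the owner
  // rank. The owner later reclaims such objects in collect_deallocated().
  // Puts are flushed only after max_unflushed_free_objs_ frees have
  // accumulated, to amortize the synchronization cost.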
  void remote_deallocate(void* p, std::size_t bytes [[maybe_unused]], int target_rank, std::size_t alignment = alignof(max_align_t)) {
    ITYR_CHECK(common::topology::my_rank() != target_rank);
    ITYR_CHECK(get_owner(p) == target_rank);

    int* flag_val_p = reinterpret_cast<int*>(
        reinterpret_cast<std::byte*>(local_base_addr_) + local_max_size_ - sizeof(int));
    common::rma::put_nb(win(), flag_val_p, 1, win(), target_rank, get_header_disp(p, alignment));

    static int count = 0;
    count++;
    if (count >= max_unflushed_free_objs_) {
      common::rma::flush(win());
      count = 0;
    }
  }

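  // Scans the list of live headers and reclaims every object whose freed flag
  // was set by a remote rank, then adapts collect_threshold_ so the next
  // collection triggers roughly when the amount of live memory doubles,
  // capped around collect_threshold_max_.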
  void collect_deallocated() {
    header* h = allocated_list_.next;
    while (h) {
      int flag = h->freed.load(std::memory_order_acquire);
      if (flag) {
        ITYR_CHECK_MESSAGE(flag == remote_free_flag_value, "noncoll memory corruption");
        header* h_next = h->next;
        local_deallocate_impl(h, h->size, h->alignment);
        h = h_next;
      } else {
        h = h->next;
      }
    }

    collect_threshold_ = allocated_size_ * 2;
    if (collect_threshold_ > collect_threshold_max_) {
      collect_threshold_ = (collect_threshold_max_ + allocated_size_) / 2;
    }
  }

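  // True if the owner of p is on the same shared-memory node, in which case p
  // can be dereferenced directly thanks to the intra-node mapping set up in
  // init_pm().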
  bool is_locally_accessible(const void* p) const {
    return common::topology::is_locally_accessible(get_owner(p));
  }

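  // Checks whether the object at p (owned by this rank) has been freed by a
  // remote rank; if so, it is reclaimed immediately.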
  bool is_remotely_freed(void* p, std::size_t alignment = alignof(max_align_t)) {
    std::size_t pad_bytes = common::round_up_pow2(sizeof(header), alignment);
    header* h = reinterpret_cast<header*>(reinterpret_cast<std::byte*>(p) - pad_bytes);

    if (h->freed.load(std::memory_order_acquire)) {
      local_deallocate_impl(h, h->size, h->alignment);
      return true;
    }
    return false;
  }

  // mainly for debugging
  bool empty() {
    return allocated_list_.next == nullptr;
  }

private:
  static std::string allocator_shmem_name(int inter_rank) {
    static int count = 0;
    std::stringstream ss;
    ss << "/ityr_noncoll_" << count++ << "_" << inter_rank;
    return ss.str();
  }

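  // Creates the physical (shared) memory backing this node's slices: the
  // node-local leader (intra rank 0) creates a named shared-memory object,
  // the other processes on the node attach to it, and every process maps the
  // slices of all node-local ranks into the common virtual address range,
  // which is what makes those slices locally accessible.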
  common::physical_mem init_pm() const {
    common::physical_mem pm;

    if (common::topology::intra_my_rank() == 0) {
      pm = common::physical_mem(allocator_shmem_name(common::topology::inter_my_rank()), global_max_size_, true);
    }

    common::mpi_barrier(common::topology::intra_mpicomm());

    if (common::topology::intra_my_rank() != 0) {
      pm = common::physical_mem(allocator_shmem_name(common::topology::inter_my_rank()), global_max_size_, false);
    }

    ITYR_CHECK(vm_.size() == global_max_size_);

    for (common::topology::rank_t r = 0; r < common::topology::intra_n_ranks(); r++) {
      auto target_rank = common::topology::intra2global_rank(r);
      auto offset = local_max_size_ * target_rank;
      void* begin_addr = reinterpret_cast<std::byte*>(vm_.addr()) + offset;
      pm.map_to_vm(begin_addr, local_max_size_, offset);
    }

    if (common::topology::numa_enabled()) {
      common::numa::bind_to(local_base_addr_, local_max_size_, common::topology::numa_my_node());
    }

    return pm;
  }

  common::pmr::pool_options my_std_pool_options() const {
    common::pmr::pool_options opts;
    opts.max_blocks_per_chunk = local_max_size_ / 10;
    return opts;
  }

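  // Per-allocation metadata stored immediately before the user data. Headers
  // form a doubly linked list rooted at allocated_list_ so the owner can scan
  // and unlink them; freed is written by remote ranks via RMA, so it is an
  // atomic flag read with acquire ordering.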
  struct header {
    header* prev = nullptr;
    header* next = nullptr;
    std::size_t size = 0;
    std::size_t alignment = 0;
    std::atomic<int> freed = 0;
  };

  void remove_header_from_list(header* h) {
    ITYR_CHECK(h->prev);
    h->prev->next = h->next;

    if (h->next) {
      h->next->prev = h->prev;
    } else {
      ITYR_CHECK(h == allocated_list_end_);
      allocated_list_end_ = h->prev;
    }
  }

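  // Returns the RMA displacement (offset within the owner's window) of the
  // freed flag belonging to the object at p; this is the target address used
  // by remote_deallocate().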
  std::size_t get_header_disp(const void* p, std::size_t alignment) const {
    std::size_t pad_bytes = common::round_up_pow2(sizeof(header), alignment);
    auto h = reinterpret_cast<const header*>(reinterpret_cast<const std::byte*>(p) - pad_bytes);
    const void* flag_addr = &h->freed;

    return get_disp(flag_addr);
  }

  void local_deallocate_impl(header* h, std::size_t size, std::size_t alignment) {
    remove_header_from_list(h);
    std::destroy_at(h);
    std_pool_mr_.deallocate(h, size, alignment);

    ITYR_CHECK(allocated_size_ >= size);
    allocated_size_ -= size;
  }

  static constexpr int remote_free_flag_value = 417;

  std::size_t local_max_size_;
  std::size_t global_max_size_;
  common::virtual_mem vm_;
  void* local_base_addr_;
  common::physical_mem pm_;
  std::unique_ptr<common::rma::win> win_;
  root_resource root_mr_;
  common::pmr::unsynchronized_pool_resource std_pool_mr_;
  int max_unflushed_free_objs_;
  header allocated_list_;
  header* allocated_list_end_ = &allocated_list_;
  std::size_t allocated_size_;
  std::size_t collect_threshold_;
  std::size_t collect_threshold_max_;
};

}