Itoyori  v0.0.1
global_lock.hpp
Go to the documentation of this file.
1 #pragma once
2 
#include <new>
#include <atomic>

#include "ityr/common/util.hpp"
#include "ityr/common/mpi_util.hpp"
#include "ityr/common/profiler.hpp"
12 
13 namespace ityr::common {
14 
15 class global_lock {
16 public:
17  global_lock(int n_locks = 1)
18  : n_locks_(n_locks),
19  lock_win_(topology::mpicomm(), n_locks_, 0) {}
20 
21  bool trylock(topology::rank_t target_rank, int idx = 0) const {
23 
24  ITYR_CHECK(idx < n_locks_);
25 
26  lock_t result = mpi_atomic_cas_value<lock_t>(1, 0, target_rank, get_disp(idx), lock_win_.win());
27 
28  ITYR_CHECK(0 <= result);
29  ITYR_CHECK(result <= 2);
30  return result == 0;
31  }
32 
33  void lock(topology::rank_t target_rank, int idx = 0) const {
34  ITYR_CHECK(idx < n_locks_);
35  while (!trylock(target_rank, idx));
36  }
37 
38  void priolock(topology::rank_t target_rank, int idx = 0) const {
39  // Only one process can call this priority lock at the same time
41 
42  ITYR_CHECK(idx < n_locks_);
43 
44  lock_t result = mpi_atomic_faa_value<lock_t>(1, target_rank, get_disp(idx), lock_win_.win());
45  if (result == 0) {
46  return;
47  }
48 
49  // Wait until the previous lock holder releases the lock
50  while (mpi_atomic_get_value<lock_t>(target_rank, get_disp(idx), lock_win_.win()) != 1);
51  }
52 
53  void unlock(topology::rank_t target_rank, int idx = 0) const {
55 
56  ITYR_CHECK(idx < n_locks_);
57 
58  mpi_atomic_faa_value<lock_t>(-1, target_rank, get_disp(idx), lock_win_.win());
59  }
60 
61  bool is_locked(topology::rank_t target_rank, int idx = 0) const {
62  ITYR_CHECK(idx < n_locks_);
63 
64  lock_t result = mpi_atomic_get_value<lock_t>(target_rank, get_disp(idx), lock_win_.win());
65  return result > 0;
66  }
67 
68 private:
69  using lock_t = int;
70 
71  struct alignas(common::hardware_destructive_interference_size) lock_wrapper {
72  template <typename... Args>
73  lock_wrapper(Args&&... args) : value(std::forward<Args>(args)...) {}
74  std::atomic<lock_t> value;
75  };
76 
77  std::size_t get_disp(int idx) const {
78  return idx * sizeof(lock_wrapper) + offsetof(lock_wrapper, value);
79  }
80 
81  int n_locks_;
82  mpi_win_manager<lock_wrapper> lock_win_;
83 };
84 
85 ITYR_TEST_CASE("[ityr::common::global_lock] lock and unlock") {
86  runtime_options opts;
87  singleton_initializer<topology::instance> topo;
88 
89  ITYR_SUBCASE("single element") {
90  global_lock lock;
91 
92  using value_t = std::size_t;
93  mpi_win_manager<value_t> value_win(topology::mpicomm(), 1);
94 
95  ITYR_CHECK(value_win.local_buf()[0] == 0);
96 
98 
99  auto n_ranks = topology::n_ranks();
100 
101  std::size_t n_updates = 100;
102 
103  for (topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
104  for (std::size_t i = 0; i < n_updates; i++) {
105  lock.lock(target_rank);
106 
107  auto v = common::mpi_get_value<value_t>(target_rank, 0, value_win.win());
108  common::mpi_put_value<value_t>(v + 1, target_rank, 0, value_win.win());
109 
110  lock.unlock(target_rank);
111  }
112 
114  }
115 
116  ITYR_CHECK(value_win.local_buf()[0] == n_updates * n_ranks);
117  }
118 
119  ITYR_SUBCASE("multiple elements") {
120  int n_elems = 3;
121  global_lock lock(n_elems);
122 
123  using value_t = std::size_t;
124  mpi_win_manager<value_t> value_win(topology::mpicomm(), n_elems);
125 
126  for (int i = 0; i < n_elems; i++) {
127  ITYR_CHECK(value_win.local_buf()[i] == 0);
128  }
129 
130  auto n_ranks = topology::n_ranks();
131 
132  std::size_t n_updates = 1000;
133 
134  for (topology::rank_t target_rank = 0; target_rank < n_ranks; target_rank++) {
135  for (std::size_t i = 0; i < n_updates; i++) {
136  int idx = i % n_elems;
137  lock.lock(target_rank, idx);
138 
139  auto v = common::mpi_get_value<value_t>(target_rank, idx * sizeof(value_t), value_win.win());
140  common::mpi_put_value<value_t>(v + 1, target_rank, idx * sizeof(value_t), value_win.win());
141 
142  lock.unlock(target_rank, idx);
143  }
144 
146  }
147 
148  value_t sum = 0;
149  for (int i = 0; i < n_elems; i++) {
150  sum += value_win.local_buf()[i];
151  }
152 
153  ITYR_CHECK(sum == n_updates * n_ranks);
154  }
155 }
156 
157 }
Definition: global_lock.hpp:15
void priolock(topology::rank_t target_rank, int idx=0) const
Definition: global_lock.hpp:38
void lock(topology::rank_t target_rank, int idx=0) const
Definition: global_lock.hpp:33
void unlock(topology::rank_t target_rank, int idx=0) const
Definition: global_lock.hpp:53
bool trylock(topology::rank_t target_rank, int idx=0) const
Definition: global_lock.hpp:21
bool is_locked(topology::rank_t target_rank, int idx=0) const
Definition: global_lock.hpp:61
global_lock(int n_locks=1)
Definition: global_lock.hpp:17
MPI_Win win() const
Definition: mpi_rma.hpp:409
#define ITYR_SUBCASE(name)
Definition: util.hpp:41
#define ITYR_CHECK(cond)
Definition: util.hpp:48
rank_t n_ranks()
Definition: topology.hpp:208
int rank_t
Definition: topology.hpp:12
MPI_Comm mpicomm()
Definition: topology.hpp:206
Definition: allocator.hpp:16
va_list args
Definition: util.hpp:76
void mpi_barrier(MPI_Comm comm)
Definition: mpi_util.hpp:42
rank_t n_ranks()
Return the total number of processes.
Definition: ityr.hpp:107
#define ITYR_PROFILER_RECORD(event,...)
Definition: profiler.hpp:319
Definition: prof_events.hpp:94
Definition: prof_events.hpp:89
Definition: prof_events.hpp:99