8 #define ITYR_DEBUG_UCX 0
12 #include <ucs/debug/log_def.h>
20 template <
typename T>
inline MPI_Datatype
mpi_type();
26 template <>
inline MPI_Datatype
mpi_type<void*>() {
return mpi_type<uintptr_t>(); }
30 MPI_Comm_rank(comm, &rank);
37 MPI_Comm_size(comm, &
size);
48 MPI_Ibarrier(comm, &req);
88 mpi_send(&value, 1, target_rank, tag, comm);
106 template <
typename T>
123 template <
typename T>
128 mpi_recv(&result, 1, target_rank, tag, comm);
132 template <
typename T>
144 template <
typename T>
153 template <
typename T>
159 MPI_Op op = MPI_SUM) {
169 template <
typename T>
173 MPI_Op op = MPI_SUM) {
175 mpi_reduce(&value, &result, 1, root_rank, comm, op);
179 template <
typename T>
184 MPI_Op op = MPI_SUM) {
185 MPI_Allreduce(sendbuf,
193 template <
typename T>
196 MPI_Op op = MPI_SUM) {
202 template <
typename T>
204 std::size_t sendcount,
206 std::size_t recvcount,
208 MPI_Allgather(sendbuf,
217 template <
typename T>
225 template <
typename T>
241 template <
typename T>
251 MPI_Wait(&req, MPI_STATUS_IGNORE);
256 MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
262 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
266 static MPI_Comm comm = MPI_COMM_WORLD;
/*
 * printf-style building blocks for one log line.
 *
 * Every *_FMT macro is a string literal; the *_ARG macro of the same family
 * expands to the comma-separated argument list that format consumes, in the
 * same order. UCS_LOG_FMT / UCS_LOG_ARG combine all of them.
 */

/* Timestamp: "[<seconds>.<microseconds, 6 digits>]". */
#define UCS_LOG_TIME_FMT        "[%lu.%06lu]"

/* "<file>:<line> <component> <level> " followed by a "%*s" pad field. */
#define UCS_LOG_METADATA_FMT    "%17s:%-4u %-4s %-5s %*s"

/* Process data: "[<host>:<pid>:<thread>]". */
#define UCS_LOG_PROC_DATA_FMT   "[%s:%-5d:%s]"

/* Whole line: timestamp, process data, metadata, message text, newline. */
#define UCS_LOG_FMT             UCS_LOG_TIME_FMT " " UCS_LOG_PROC_DATA_FMT " " \
                                UCS_LOG_METADATA_FMT "%s\n"

/*
 * Arguments for UCS_LOG_TIME_FMT, taken from the tv_sec / tv_usec members of
 * `_tv`. Both are converted to `unsigned long` explicitly because "%lu"
 * requires exactly that type, while the member types are platform-defined
 * (an `int` microsecond field would otherwise be undefined behaviour).
 */
#define UCS_LOG_TIME_ARG(_tv) \
    static_cast<unsigned long>((_tv).tv_sec), \
    static_cast<unsigned long>((_tv).tv_usec)

/*
 * Arguments for UCS_LOG_METADATA_FMT: file name, line number, the `name`
 * member of `_comp_conf`, the ucs_log_level_names entry for `_level`, and
 * finally width 0 with an empty string for the trailing "%*s".
 */
#define UCS_LOG_METADATA_ARG(_short_file, _line, _level, _comp_conf) \
    (_short_file), (_line), (_comp_conf)->name, \
    ucs_log_level_names[_level], 0, ""

/* Arguments for UCS_LOG_PROC_DATA_FMT: host name, pid, thread name. */
#define UCS_LOG_PROC_DATA_ARG() \
    ucs_get_host_name(), ucs_log_get_pid(), ucs_log_get_thread_name()

/* Timestamp arguments followed by the process-data arguments. */
#define UCS_LOG_COMPACT_ARG(_tv) \
    UCS_LOG_TIME_ARG(_tv), UCS_LOG_PROC_DATA_ARG()

/* Complete argument list for UCS_LOG_FMT; `_message` feeds the final "%s". */
#define UCS_LOG_ARG(_short_file, _line, _level, _comp_conf, _tv, _message) \
    UCS_LOG_TIME_ARG(_tv), UCS_LOG_PROC_DATA_ARG(), \
    UCS_LOG_METADATA_ARG(_short_file, _line, _level, _comp_conf), (_message)
295 inline const char *ucs_log_level_names[] = {
296 [UCS_LOG_LEVEL_FATAL] =
"FATAL",
297 [UCS_LOG_LEVEL_ERROR] =
"ERROR",
298 [UCS_LOG_LEVEL_WARN] =
"WARN",
299 [UCS_LOG_LEVEL_DIAG] =
"DIAG",
300 [UCS_LOG_LEVEL_INFO] =
"INFO",
301 [UCS_LOG_LEVEL_DEBUG] =
"DEBUG",
302 [UCS_LOG_LEVEL_TRACE] =
"TRACE",
303 [UCS_LOG_LEVEL_TRACE_REQ] =
"REQ",
304 [UCS_LOG_LEVEL_TRACE_DATA] =
"DATA",
305 [UCS_LOG_LEVEL_TRACE_ASYNC] =
"ASYNC",
306 [UCS_LOG_LEVEL_TRACE_FUNC] =
"FUNC",
307 [UCS_LOG_LEVEL_TRACE_POLL] =
"POLL",
308 [UCS_LOG_LEVEL_LAST] = NULL,
309 [UCS_LOG_LEVEL_PRINT] =
"PRINT"
312 inline const char* ucs_get_host_name() {
313 static char hostname[256] = {0};
314 if (*hostname == 0) {
315 gethostname(hostname,
sizeof(hostname));
316 strtok(hostname,
".");
321 inline int ucs_log_get_pid() {
322 static int ucs_log_pid = 0;
323 if (ucs_log_pid == 0) {
329 inline const char* ucs_log_get_thread_name() {
330 static thread_local
char ucs_log_thread_name[32] = {0};
331 static std::atomic<int> ucs_log_thread_count = 0;
332 char *name = ucs_log_thread_name;
335 if (name[0] ==
'\0') {
336 int thread_num = std::atomic_fetch_add(&ucs_log_thread_count, 1);
337 snprintf(ucs_log_thread_name,
sizeof(ucs_log_thread_name),
"%d", thread_num);
/// Returns the last component of `path`, i.e. the text after the final '/'.
///
/// @param path  NUL-terminated path string; may be null.
/// @return      Pointer into `path` just past its last '/', or `path` itself
///              when it contains no '/'. A path ending in '/' yields an empty
///              string. A null `path` yields an empty string literal instead
///              of being handed to strrchr().
inline const char* ucs_basename(const char *path) {
  if (path == nullptr) {
    return "";
  }
  const char *last_slash = strrchr(path, '/');
  return (last_slash == nullptr) ? path : last_slash + 1;
}
348 inline FILE* ityr_ucx_log_fileptr() {
349 static std::unique_ptr<char[]> outbuf;
350 static std::unique_ptr<FILE, void(*)(FILE*)> outfile(NULL, [](FILE*){});
351 std::size_t outbufsize = 1L * 1024 * 1024 * 1024;
353 if (outfile ==
nullptr) {
354 outbuf = std::make_unique<char[]>(outbufsize);
357 snprintf(buf,
sizeof(buf),
"ityr_ucx.log.%d",
mpi_comm_rank(MPI_COMM_WORLD));
358 outfile = std::unique_ptr<FILE, void(*)(FILE*)>(fopen(buf,
"w"),
359 [](FILE* fp) {
if (fp) ::fclose(fp); });
360 if (outfile ==
nullptr) {
362 die(
"could not open file %s", buf);
365 int ret = setvbuf(outfile.get(), outbuf.get(), _IOFBF, outbufsize);
368 die(
"setvbuf failed");
372 return outfile.get();
375 inline bool ityr_ucx_log_enable(
int mode = -1) {
379 }
else if (
mode == 1) {
385 inline void ityr_ucx_log_flush() {
386 fflush(ityr_ucx_log_fileptr());
389 inline ucs_log_func_rc_t
390 ityr_ucx_log_handler(
const char *file,
unsigned line,
const char *
function,
391 ucs_log_level_t level,
392 const ucs_log_component_config_t *comp_conf,
393 const char *format, va_list ap) {
394 if (!ityr_ucx_log_enable()) {
395 return UCS_LOG_FUNC_RC_CONTINUE;
398 if (!ucs_log_component_is_enabled(level, comp_conf) &&
399 (level != UCS_LOG_LEVEL_PRINT)) {
400 return UCS_LOG_FUNC_RC_CONTINUE;
403 size_t buffer_size = ucs_log_get_buffer_size();
404 char* buf =
reinterpret_cast<char*
>(alloca(buffer_size + 1));
405 buf[buffer_size] = 0;
408 const char* short_file = ucs_basename(file);
410 gettimeofday(&tv, NULL);
413 char* log_line = strtok_r(buf,
"\n", &saveptr);
414 while (log_line != NULL) {
415 fprintf(ityr_ucx_log_fileptr(), UCS_LOG_FMT,
416 UCS_LOG_ARG(short_file, line, level,
417 comp_conf, tv, log_line));
418 log_line = strtok_r(NULL,
"\n", &saveptr);
422 if (level <= UCS_LOG_LEVEL_ERROR) {
423 ityr_ucx_log_flush();
426 return UCS_LOG_FUNC_RC_CONTINUE;
434 MPI_Initialized(&initialized_outside_);
435 if (!initialized_outside_) {
436 MPI_Init(
nullptr,
nullptr);
439 while (ucs_log_num_handlers() > 0) {
440 ucs_log_pop_handler();
442 ityr_ucx_log_fileptr();
443 ucs_log_push_handler(ityr_ucx_log_handler);
449 ucs_log_pop_handler();
451 if (!initialized_outside_) {
463 int initialized_outside_ = 1;
466 template <
typename T>
Definition: mpi_util.hpp:430
~mpi_initializer()
Definition: mpi_util.hpp:447
mpi_initializer(mpi_initializer &&)=delete
mpi_initializer & operator=(mpi_initializer &&)=delete
mpi_initializer & operator=(const mpi_initializer &)=delete
mpi_initializer(const mpi_initializer &)=delete
mpi_initializer(MPI_Comm comm)
Definition: mpi_util.hpp:432
#define ITYR_CHECK(cond)
Definition: util.hpp:48
bool enabled()
Definition: numa.hpp:86
ITYR_CONCAT(mode_, ITYR_PROFILER_MODE) mode
Definition: profiler.hpp:257
Definition: allocator.hpp:16
void mpi_recv(T *buf, std::size_t count, int target_rank, int tag, MPI_Comm comm)
Definition: mpi_util.hpp:92
MPI_Datatype mpi_type< bool >()
Definition: mpi_util.hpp:25
std::vector< T > mpi_allgather_value(const T &value, MPI_Comm comm)
Definition: mpi_util.hpp:218
void mpi_allgather(const T *sendbuf, std::size_t sendcount, T *recvbuf, std::size_t recvcount, MPI_Comm comm)
Definition: mpi_util.hpp:203
void mpi_reduce(const T *sendbuf, T *recvbuf, std::size_t count, int root_rank, MPI_Comm comm, MPI_Op op=MPI_SUM)
Definition: mpi_util.hpp:154
MPI_Datatype mpi_type< unsigned long >()
Definition: mpi_util.hpp:24
fprintf(stderr, "\x1b[31m%s\x1b[39m\n", msg)
void mpi_make_progress()
Definition: mpi_util.hpp:260
T mpi_bcast_value(const T &value, int root_rank, MPI_Comm comm)
Definition: mpi_util.hpp:145
vsnprintf(msg, slen, fmt, args)
MPI_Datatype mpi_type< void * >()
Definition: mpi_util.hpp:26
T mpi_allreduce_value(const T &value, MPI_Comm comm, MPI_Op op=MPI_SUM)
Definition: mpi_util.hpp:194
MPI_Datatype mpi_type< long >()
Definition: mpi_util.hpp:23
T mpi_scatter_value(const T *sendbuf, int root_rank, MPI_Comm comm)
Definition: mpi_util.hpp:242
void mpi_wait(MPI_Request &req)
Definition: mpi_util.hpp:250
T mpi_recv_value(int target_rank, int tag, MPI_Comm comm)
Definition: mpi_util.hpp:124
MPI_Comm & mpi_comm_root()
Definition: mpi_util.hpp:265
MPI_Request mpi_isend(const T *buf, std::size_t count, int target_rank, int tag, MPI_Comm comm)
Definition: mpi_util.hpp:67
T getenv_with_default(const char *env_var, T default_val)
Definition: util.hpp:88
int mpi_comm_rank(MPI_Comm comm)
Definition: mpi_util.hpp:28
MPI_Request mpi_ibarrier(MPI_Comm comm)
Definition: mpi_util.hpp:46
constexpr auto size(const span< T > &s) noexcept
Definition: span.hpp:61
void mpi_bcast(T *buf, std::size_t count, int root_rank, MPI_Comm comm)
Definition: mpi_util.hpp:133
int mpi_comm_size(MPI_Comm comm)
Definition: mpi_util.hpp:35
T getenv_coll(const std::string &env_var, T default_val)
Definition: mpi_util.hpp:467
T mpi_reduce_value(const T &value, int root_rank, MPI_Comm comm, MPI_Op op=MPI_SUM)
Definition: mpi_util.hpp:170
void mpi_barrier(MPI_Comm comm)
Definition: mpi_util.hpp:42
void mpi_send_value(const T &value, int target_rank, int tag, MPI_Comm comm)
Definition: mpi_util.hpp:84
void mpi_allreduce(const T *sendbuf, T *recvbuf, std::size_t count, MPI_Comm comm, MPI_Op op=MPI_SUM)
Definition: mpi_util.hpp:180
MPI_Datatype mpi_type< unsigned int >()
Definition: mpi_util.hpp:22
MPI_Datatype mpi_type< int >()
Definition: mpi_util.hpp:21
void mpi_send(const T *buf, std::size_t count, int target_rank, int tag, MPI_Comm comm)
Definition: mpi_util.hpp:53
MPI_Request mpi_irecv(T *buf, std::size_t count, int target_rank, int tag, MPI_Comm comm)
Definition: mpi_util.hpp:107
void mpi_scatter(const T *sendbuf, T *recvbuf, std::size_t count, int root_rank, MPI_Comm comm)
Definition: mpi_util.hpp:226
bool mpi_test(MPI_Request &req)
Definition: mpi_util.hpp:254