From 9b921306d936723f19d58015e293ea5f68dc4800 Mon Sep 17 00:00:00 2001 From: "Vinciguerra, Armando" Date: Tue, 8 Apr 2025 07:10:23 -0400 Subject: [PATCH 1/2] Add a Fine Grained Completion Routine: shmem_pe_quiet --- mpp/shmemx_c_func.h4 | 4 + src/shmem_synchronization.h | 19 +++++ src/synchronization_f.c | 22 +++++ src/transport_none.h | 7 ++ src/transport_ofi.c | 5 ++ src/transport_ofi.h | 164 +++++++++++++++++++++++++++++++++++- src/transport_portals4.h | 7 ++ src/transport_ucx.h | 16 ++++ 8 files changed, 243 insertions(+), 1 deletion(-) diff --git a/mpp/shmemx_c_func.h4 b/mpp/shmemx_c_func.h4 index 948146156..ff8c80f78 100644 --- a/mpp/shmemx_c_func.h4 +++ b/mpp/shmemx_c_func.h4 @@ -105,3 +105,7 @@ SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_heap_create(void *base, size_t size SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_heap_preinit(void); SHMEM_FUNCTION_ATTRIBUTES int SHPRE()shmemx_heap_preinit_thread(int requested, int *provided); SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_heap_postinit(void); + +/* Memory Ordering Routines */ +SHMEM_FUNCTION_ATTRIBUTES void shmem_pe_quiet(const int *target_pes, int npes); +SHMEM_FUNCTION_ATTRIBUTES void shmem_ctx_quiet(shmem_ctx_t ctx, const int *target_pes, int npes); diff --git a/src/shmem_synchronization.h b/src/shmem_synchronization.h index 0270d6d7b..e93a6226f 100644 --- a/src/shmem_synchronization.h +++ b/src/shmem_synchronization.h @@ -40,6 +40,25 @@ shmem_internal_quiet(shmem_ctx_t ctx) shmem_transport_syncmem(); } +static inline void +shmem_internal_pe_quiet(shmem_ctx_t ctx, const int *target_pes, int npes) +{ + int ret; + + if (ctx == SHMEM_CTX_INVALID) + return; + + ret = shmem_transport_pe_quiet((shmem_transport_ctx_t *)ctx, target_pes, npes); + if (0 != ret) { RAISE_ERROR(ret); } + + shmem_internal_membar(); + + /* Transport level memory flush is required to make memory + * changes (i.e. subsequent coherent load operations + * performed via the shmem_ptr API, the result of atomics + * that targeted the local process) visible */ + shmem_transport_syncmem(); +} static inline void shmem_internal_fence(shmem_ctx_t ctx) diff --git a/src/synchronization_f.c b/src/synchronization_f.c index 74e9c5c4d..2065165ae 100644 --- a/src/synchronization_f.c +++ b/src/synchronization_f.c @@ -37,6 +37,28 @@ FC_SHMEM_QUIET(void) shmem_internal_quiet(SHMEM_CTX_DEFAULT); } +#define FC_SHMEM_PE_QUIET FC_FUNC_(shmem_pe_quiet, SHMEM_PE_QUIET) +void SHMEM_FUNCTION_ATTRIBUTES +FC_SHMEM_PE_QUIET(const int *target_pes, int npes); +void +FC_SHMEM_PE_QUIET(const int *target_pes, int npes) +{ + SHMEM_ERR_CHECK_INITIALIZED(); + + shmem_internal_pe_quiet(SHMEM_CTX_DEFAULT, target_pes, npes); +} + +#define FC_SHMEM_CTX_QUIET FC_FUNC_(shmem_ctx_quiet, SHMEM_CTX_QUIET) +void SHMEM_FUNCTION_ATTRIBUTES +FC_SHMEM_CTX_QUIET(shmem_ctx_t ctx, const int *target_pes, int npes); +void +FC_SHMEM_CTX_QUIET(shmem_ctx_t ctx, const int *target_pes, int npes) +{ + SHMEM_ERR_CHECK_INITIALIZED(); + + shmem_internal_pe_quiet(ctx, target_pes, npes); +} + #define FC_SHMEM_FENCE FC_FUNC_(shmem_fence, SHMEM_FENCE) void SHMEM_FUNCTION_ATTRIBUTES diff --git a/src/transport_none.h b/src/transport_none.h index 9a10ea52c..8da3c6d98 100644 --- a/src/transport_none.h +++ b/src/transport_none.h @@ -103,6 +103,13 @@ shmem_transport_quiet(shmem_transport_ctx_t* ctx) return 0; } +static inline +int +shmem_transport_pe_quiet(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) +{ + return 0; +} + static inline int shmem_transport_fence(shmem_transport_ctx_t* ctx) diff --git a/src/transport_ofi.c b/src/transport_ofi.c index 180445b7b..3778f2a92 100644 --- a/src/transport_ofi.c +++ b/src/transport_ofi.c @@ -2021,6 +2021,11 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options, #ifndef USE_CTX_LOCK shmem_internal_cntr_write(&ctxp->pending_put_cntr, 0); shmem_internal_cntr_write(&ctxp->pending_get_cntr, 0); + + for (int i = 0; i < shmem_internal_num_pes; i++) { + shmem_internal_cntr_write(&ctx->pending_get_per_pe[i], 0); + shmem_internal_cntr_write(&ctx->pending_put_per_pe[i], 0); + } #endif ctxp->stx_idx = -1; diff --git a/src/transport_ofi.h b/src/transport_ofi.h index edf00b2e0..59da1982b 100644 --- a/src/transport_ofi.h +++ b/src/transport_ofi.h @@ -324,9 +324,13 @@ struct shmem_transport_ctx_t { /* Pending cntr accesses are protected by ctx lock */ uint64_t pending_put_cntr; uint64_t pending_get_cntr; + uint64_t pending_put_per_pe[shmem_internal_num_pes]; + uint64_t pending_get_per_pe[shmem_internal_num_pes]; #else shmem_internal_cntr_t pending_put_cntr; shmem_internal_cntr_t pending_get_cntr; + shmem_internal_cntr_t pending_put_per_pe[shmem_internal_num_pes]; + shmem_internal_cntr_t pending_get_per_pe[shmem_internal_num_pes]; #endif /* These counters are protected by the BB lock */ uint64_t pending_bb_cntr; @@ -533,6 +537,72 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx) SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } +static inline +void shmem_transport_pe_put_quiet(shmem_transport_ctx_t *ctx, const int *target_pes, int npes) { + + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + + /* Wait for bounce buffered operations to complete */ + if (ctx->bounce_buffers) { + SHMEM_TRANSPORT_OFI_CTX_BB_LOCK(ctx); + + while (ctx->bounce_buffers->nalloc > 0) { + shmem_transport_ofi_drain_cq(ctx); + } + + SHMEM_TRANSPORT_OFI_CTX_BB_UNLOCK(ctx); + } + + /* PE-specific put quiet: wait only for the given PEs */ + uint64_t success, fail, cnt, cnt_new; + long poll_count = 0; + + for (int i = 0; i < npes; i++) { + int pe = target_pes[i]; + + /* Check if this PE has pending puts */ + if (SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]) == 0) { + continue; // No need to wait + } + + while (poll_count < shmem_transport_ofi_put_poll_limit || + shmem_transport_ofi_put_poll_limit < 0) { + success = fi_cntr_read(ctx->put_cntr); + fail = fi_cntr_readerr(ctx->put_cntr); + cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]); + + shmem_transport_probe(); + + if (success < cnt && fail == 0) { + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); + SPINLOCK_BODY(); + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + } else if (fail) { + RAISE_ERROR_MSG("Operations to PE %d failed (%" PRIu64 ")\n", pe, fail); + } else { + break; + } + poll_count++; + } + + /* Final blocking wait for remaining puts */ + cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]); + do { + cnt = cnt_new; + ssize_t ret = fi_cntr_wait(ctx->put_cntr, cnt, -1); + cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]); + OFI_CTX_CHECK_ERROR(ctx, ret); + } while (cnt < cnt_new); + + shmem_internal_assert(cnt == cnt_new); + + /* Mark completion for this PE */ + ctx->pending_put_per_pe[pe] = 0; + } + + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); +} + static inline int shmem_transport_quiet(shmem_transport_ctx_t* ctx) { @@ -543,6 +613,16 @@ int shmem_transport_quiet(shmem_transport_ctx_t* ctx) return 0; } +static inline +int shmem_transport_pe_quiet(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) +{ + + shmem_transport_pe_put_quiet(ctx, target_pes, npes); + shmem_transport_pe_get_wait(ctx, target_pes, npes); + + return 0; +} + static inline int shmem_transport_fence(shmem_transport_ctx_t* ctx) @@ -622,7 +702,8 @@ void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); + do { ret = fi_inject_write(ctx->ep, @@ -661,6 +742,7 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_write(ctx->ep, @@ -696,6 +778,7 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); shmem_transport_ofi_get_mr(target, pe, &addr, &key); shmem_transport_ofi_bounce_buffer_t *buff = @@ -742,6 +825,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); const struct iovec msg_iov = { .iov_base = src_buf, @@ -810,6 +894,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co msg.context = frag_source; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE); @@ -838,6 +923,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); const struct fi_ioc msg_iov_signal = { .addr = (uint8_t *) &signal, @@ -910,6 +996,7 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s if (len <= shmem_transport_ofi_max_msg_size) { SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_read(ctx->ep, target, @@ -932,6 +1019,7 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_read(ctx->ep, @@ -997,6 +1085,71 @@ void shmem_transport_get_wait(shmem_transport_ctx_t* ctx) SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } +static inline +void shmem_transport_pe_get_wait(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) +{ + uint64_t success, fail; + uint64_t initial_cnt[shmem_internal_num_pes]; + long poll_count = 0; + + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + + // Record initial pending count for each target PE + for (int i = 0; i < npes; i++) { + initial_cnt[i] = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_per_pe[target_pes[i]]); + } + + while (poll_count < shmem_transport_ofi_get_poll_limit || + shmem_transport_ofi_get_poll_limit < 0) { + success = fi_cntr_read(ctx->get_cntr); + fail = fi_cntr_readerr(ctx->get_cntr); + + shmem_transport_probe(); + + bool done = true; + for (int i = 0; i < npes; i++) { + if (success < initial_cnt[i]) { + done = false; + break; + } + } + + if (fail) { + RAISE_ERROR_MSG("Operations completed in error (%" PRIu64 ")\n", fail); + } else if (done) { + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); + return; + } else { + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); + SPINLOCK_BODY(); + SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); + } + + poll_count++; + } + + // If we reached poll limit, use blocking wait + bool done; + do { + done = true; + for (int i = 0; i < npes; i++) { + uint64_t cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_per_pe[target_pes[i]]); + if (cnt_new > initial_cnt[i]) { + initial_cnt[i] = cnt_new; + done = false; + } + } + + if (!done) { + ssize_t ret = fi_cntr_wait(ctx->get_cntr, *initial_cnt, -1); // Using min as a proxy + OFI_CTX_CHECK_ERROR(ctx, ret); + } + + } while (!done); + + SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); +} + static inline void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const @@ -1032,6 +1185,7 @@ void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_compare_atomicmsg(ctx->ep, @@ -1073,6 +1227,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_compare_atomic(ctx->ep, @@ -1112,6 +1267,7 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_compare_atomic(ctx->ep, @@ -1149,6 +1305,7 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_inject_atomic(ctx->ep, @@ -1200,6 +1357,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_inject_atomic(ctx->ep, @@ -1221,6 +1379,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); const struct fi_ioc msg_iov = { .addr = buff->data, .count = len }; const struct fi_rma_ioc rma_iov = { .addr = (uint64_t) addr, .count = len, .key = key }; @@ -1249,6 +1408,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi (max_atomic_size/SHMEM_Dtsize[dt])); polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_atomic(ctx->ep, (void *)((char *)source + @@ -1307,6 +1467,7 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_fetch_atomicmsg(ctx->ep, @@ -1346,6 +1507,7 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_fetch_atomic(ctx->ep, diff --git a/src/transport_portals4.h b/src/transport_portals4.h index ab99becaa..ee138cabb 100644 --- a/src/transport_portals4.h +++ b/src/transport_portals4.h @@ -292,6 +292,13 @@ shmem_transport_quiet(shmem_transport_ctx_t* ctx) return 0; } +static inline +int +shmem_transport_pe_quiet(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) +{ + RAISE_ERROR_STR("Not supported"); +} + static inline int diff --git a/src/transport_ucx.h b/src/transport_ucx.h index 2e12d22b2..cf081d7c0 100644 --- a/src/transport_ucx.h +++ b/src/transport_ucx.h @@ -210,6 +210,22 @@ shmem_transport_quiet(shmem_transport_ctx_t* ctx) return 0; } +static inline +int +shmem_transport_pe_quiet(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) +{ + ucs_status_t status; + + for (int i = 0; i < npes; i++) { + int pe = target_pes[i]; + + status = ucp_ep_flush(shmem_transport_peers[pe].ep); + UCX_CHECK_STATUS(status); + } + + return 0; +} + static inline int shmem_transport_fence(shmem_transport_ctx_t* ctx) From 126ce9c7f0138a661ff3edb54ef13943a5489760 Mon Sep 17 00:00:00 2001 From: "Vinciguerra, Armando" Date: Fri, 13 Jun 2025 09:02:26 -0400 Subject: [PATCH 2/2] Calling quiet in OFI and Portals --- mpp/shmemx_c_func.h4 | 2 +- src/synchronization_f.c | 6 +- src/transport_ofi.c | 4 - src/transport_ofi.h | 156 +-------------------------------------- src/transport_portals4.h | 2 +- 5 files changed, 6 insertions(+), 164 deletions(-) diff --git a/mpp/shmemx_c_func.h4 b/mpp/shmemx_c_func.h4 index ff8c80f78..fb3e3f720 100644 --- a/mpp/shmemx_c_func.h4 +++ b/mpp/shmemx_c_func.h4 @@ -108,4 +108,4 @@ SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_heap_postinit(void); /* Memory Ordering Routines */ SHMEM_FUNCTION_ATTRIBUTES void shmem_pe_quiet(const int *target_pes, int npes); -SHMEM_FUNCTION_ATTRIBUTES void shmem_ctx_quiet(shmem_ctx_t ctx, const int *target_pes, int npes); +SHMEM_FUNCTION_ATTRIBUTES void shmem_ctx_pe_quiet(shmem_ctx_t ctx, const int *target_pes, int npes); diff --git a/src/synchronization_f.c b/src/synchronization_f.c index 2065165ae..30f48ec6b 100644 --- a/src/synchronization_f.c +++ b/src/synchronization_f.c @@ -48,11 +48,11 @@ FC_SHMEM_PE_QUIET(const int *target_pes, int npes) shmem_internal_pe_quiet(SHMEM_CTX_DEFAULT, target_pes, npes); } -#define FC_SHMEM_CTX_QUIET FC_FUNC_(shmem_ctx_quiet, SHMEM_CTX_QUIET) +#define FC_SHMEM_CTX_QUIET FC_FUNC_(shmem_ctx_pe_quiet, SHMEM_CTX_PE_QUIET) void SHMEM_FUNCTION_ATTRIBUTES -FC_SHMEM_CTX_QUIET(shmem_ctx_t ctx, const int *target_pes, int npes); +FC_SHMEM_CTX_PE_QUIET(shmem_ctx_t ctx, const int *target_pes, int npes); void -FC_SHMEM_CTX_QUIET(shmem_ctx_t ctx, const int *target_pes, int npes) +FC_SHMEM_CTX_PE_QUIET(shmem_ctx_t ctx, const int *target_pes, int npes) { SHMEM_ERR_CHECK_INITIALIZED(); diff --git a/src/transport_ofi.c b/src/transport_ofi.c index 3778f2a92..ef06eac4b 100644 --- a/src/transport_ofi.c +++ b/src/transport_ofi.c @@ -2022,10 +2022,6 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options, shmem_internal_cntr_write(&ctxp->pending_put_cntr, 0); shmem_internal_cntr_write(&ctxp->pending_get_cntr, 0); - for (int i = 0; i < shmem_internal_num_pes; i++) { - shmem_internal_cntr_write(&ctx->pending_get_per_pe[i], 0); - shmem_internal_cntr_write(&ctx->pending_put_per_pe[i], 0); - } #endif ctxp->stx_idx = -1; diff --git a/src/transport_ofi.h b/src/transport_ofi.h index 59da1982b..495640122 100644 --- a/src/transport_ofi.h +++ b/src/transport_ofi.h @@ -324,13 +324,9 @@ struct shmem_transport_ctx_t { /* Pending cntr accesses are protected by ctx lock */ uint64_t pending_put_cntr; uint64_t pending_get_cntr; - uint64_t pending_put_per_pe[shmem_internal_num_pes]; - uint64_t pending_get_per_pe[shmem_internal_num_pes]; #else shmem_internal_cntr_t pending_put_cntr; shmem_internal_cntr_t pending_get_cntr; - shmem_internal_cntr_t pending_put_per_pe[shmem_internal_num_pes]; - shmem_internal_cntr_t pending_get_per_pe[shmem_internal_num_pes]; #endif /* These counters are protected by the BB lock */ uint64_t pending_bb_cntr; @@ -537,72 +533,6 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx) SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } -static inline -void shmem_transport_pe_put_quiet(shmem_transport_ctx_t *ctx, const int *target_pes, int npes) { - - SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - - /* Wait for bounce buffered operations to complete */ - if (ctx->bounce_buffers) { - SHMEM_TRANSPORT_OFI_CTX_BB_LOCK(ctx); - - while (ctx->bounce_buffers->nalloc > 0) { - shmem_transport_ofi_drain_cq(ctx); - } - - SHMEM_TRANSPORT_OFI_CTX_BB_UNLOCK(ctx); - } - - /* PE-specific put quiet: wait only for the given PEs */ - uint64_t success, fail, cnt, cnt_new; - long poll_count = 0; - - for (int i = 0; i < npes; i++) { - int pe = target_pes[i]; - - /* Check if this PE has pending puts */ - if (SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]) == 0) { - continue; // No need to wait - } - - while (poll_count < shmem_transport_ofi_put_poll_limit || - shmem_transport_ofi_put_poll_limit < 0) { - success = fi_cntr_read(ctx->put_cntr); - fail = fi_cntr_readerr(ctx->put_cntr); - cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]); - - shmem_transport_probe(); - - if (success < cnt && fail == 0) { - SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); - SPINLOCK_BODY(); - SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - } else if (fail) { - RAISE_ERROR_MSG("Operations to PE %d failed (%" PRIu64 ")\n", pe, fail); - } else { - break; - } - poll_count++; - } - - /* Final blocking wait for remaining puts */ - cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]); - do { - cnt = cnt_new; - ssize_t ret = fi_cntr_wait(ctx->put_cntr, cnt, -1); - cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_per_pe[pe]); - OFI_CTX_CHECK_ERROR(ctx, ret); - } while (cnt < cnt_new); - - shmem_internal_assert(cnt == cnt_new); - - /* Mark completion for this PE */ - ctx->pending_put_per_pe[pe] = 0; - } - - SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); -} - static inline int shmem_transport_quiet(shmem_transport_ctx_t* ctx) { @@ -617,8 +547,7 @@ static inline int shmem_transport_pe_quiet(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) { - shmem_transport_pe_put_quiet(ctx, target_pes, npes); - shmem_transport_pe_get_wait(ctx, target_pes, npes); + shmem_transport_quiet(ctx); return 0; } @@ -702,7 +631,6 @@ void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { @@ -742,7 +670,6 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_write(ctx->ep, @@ -778,7 +705,6 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); shmem_transport_ofi_get_mr(target, pe, &addr, &key); shmem_transport_ofi_bounce_buffer_t *buff = @@ -825,7 +751,6 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); const struct iovec msg_iov = { .iov_base = src_buf, @@ -894,7 +819,6 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co msg.context = frag_source; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_writemsg(ctx->ep, &msg, FI_DELIVERY_COMPLETE); @@ -923,7 +847,6 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); const struct fi_ioc msg_iov_signal = { .addr = (uint8_t *) &signal, @@ -996,7 +919,6 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s if (len <= shmem_transport_ofi_max_msg_size) { SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_read(ctx->ep, target, @@ -1019,7 +941,6 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_read(ctx->ep, @@ -1085,72 +1006,6 @@ void shmem_transport_get_wait(shmem_transport_ctx_t* ctx) SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } -static inline -void shmem_transport_pe_get_wait(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) -{ - uint64_t success, fail; - uint64_t initial_cnt[shmem_internal_num_pes]; - long poll_count = 0; - - SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - - // Record initial pending count for each target PE - for (int i = 0; i < npes; i++) { - initial_cnt[i] = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_per_pe[target_pes[i]]); - } - - while (poll_count < shmem_transport_ofi_get_poll_limit || - shmem_transport_ofi_get_poll_limit < 0) { - success = fi_cntr_read(ctx->get_cntr); - fail = fi_cntr_readerr(ctx->get_cntr); - - shmem_transport_probe(); - - bool done = true; - for (int i = 0; i < npes; i++) { - if (success < initial_cnt[i]) { - done = false; - break; - } - } - - if (fail) { - RAISE_ERROR_MSG("Operations completed in error (%" PRIu64 ")\n", fail); - } else if (done) { - SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); - return; - } else { - SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); - SPINLOCK_BODY(); - SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - } - - poll_count++; - } - - // If we reached poll limit, use blocking wait - bool done; - do { - done = true; - for (int i = 0; i < npes; i++) { - uint64_t cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_per_pe[target_pes[i]]); - if (cnt_new > initial_cnt[i]) { - initial_cnt[i] = cnt_new; - done = false; - } - } - - if (!done) { - ssize_t ret = fi_cntr_wait(ctx->get_cntr, *initial_cnt, -1); // Using min as a proxy - OFI_CTX_CHECK_ERROR(ctx, ret); - } - - } while (!done); - - SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); -} - - static inline void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *operand, @@ -1185,7 +1040,6 @@ void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_compare_atomicmsg(ctx->ep, @@ -1227,7 +1081,6 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_compare_atomic(ctx->ep, @@ -1267,7 +1120,6 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_compare_atomic(ctx->ep, @@ -1305,7 +1157,6 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_inject_atomic(ctx->ep, @@ -1357,7 +1208,6 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_inject_atomic(ctx->ep, @@ -1379,7 +1229,6 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); const struct fi_ioc msg_iov = { .addr = buff->data, .count = len }; const struct fi_rma_ioc rma_iov = { .addr = (uint64_t) addr, .count = len, .key = key }; @@ -1408,7 +1257,6 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi (max_atomic_size/SHMEM_Dtsize[dt])); polled = 0; SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_per_pe[pe]); do { ret = fi_atomic(ctx->ep, (void *)((char *)source + @@ -1467,7 +1315,6 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_fetch_atomicmsg(ctx->ep, @@ -1507,7 +1354,6 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_per_pe[pe]); do { ret = fi_fetch_atomic(ctx->ep, diff --git a/src/transport_portals4.h b/src/transport_portals4.h index ee138cabb..bd3119d5d 100644 --- a/src/transport_portals4.h +++ b/src/transport_portals4.h @@ -296,7 +296,7 @@ static inline int shmem_transport_pe_quiet(shmem_transport_ctx_t* ctx, const int *target_pes, int npes) { - RAISE_ERROR_STR("Not supported"); + shmem_transport_quiet(ctx); }