diff --git a/src/collectives.c b/src/collectives.c index 07f9c311c..762dfaf74 100644 --- a/src/collectives.c +++ b/src/collectives.c @@ -268,6 +268,8 @@ shmem_internal_sync_linear(int PE_start, int PE_stride, int PE_size, long *pSync i < PE_size ; i++, pe += PE_stride) { shmem_internal_put_scalar(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), pe, nic_idx); + //shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &one, sizeof(one), pe, + // SHM_INTERNAL_SUM, SHM_INTERNAL_LONG, nic_idx); } } else { diff --git a/src/shmem_synchronization.h b/src/shmem_synchronization.h index f18fb4072..de9f00260 100644 --- a/src/shmem_synchronization.h +++ b/src/shmem_synchronization.h @@ -99,36 +99,59 @@ shmem_internal_fence(shmem_ctx_t ctx) #define SHMEM_TEST(type, a, b, ret) COMP(type, SYNC_LOAD(a), b, ret) -#define SHMEM_WAIT_POLL(var, value) \ - do { \ - while (SYNC_LOAD(var) == value) { \ - shmem_transport_probe(); \ - SPINLOCK_BODY(); \ - } \ +#ifdef USE_OFI +#define SHMEM_TRANSPORT_PROBE_ALL() \ + do { \ + for (size_t nic_idx = 0; nic_idx < shmem_transport_ofi_num_nics; nic_idx++) { \ + shmem_transport_probe(nic_idx); \ + } \ + } while (0); +#define SHMEM_TRANSPORT_PROBE_ALL_SPINLOCK() \ + do { \ + for (size_t nic_idx = 0; nic_idx < shmem_transport_ofi_num_nics; nic_idx++) { \ + shmem_transport_probe(nic_idx); \ + SPINLOCK_BODY(); \ + } \ + } while (0); +#else +#define SHMEM_TRANSPORT_PROBE_ALL() \ + do { \ + shmem_transport_probe(0); \ + } while (0); +#define SHMEM_TRANSPORT_PROBE_ALL_SPINLOCK() \ + do { \ + shmem_transport_probe(0); \ + SPINLOCK_BODY(); \ + } while (0); +#endif + +#define SHMEM_WAIT_POLL(var, value) \ + do { \ + while (SYNC_LOAD(var) == value) { \ + SHMEM_TRANSPORT_PROBE_ALL_SPINLOCK(); \ + } \ } while(0) -#define SHMEM_WAIT_UNTIL_POLL(var, cond, value) \ - do { \ - int cmpret; \ - \ - COMP(cond, SYNC_LOAD(var), value, cmpret); \ - while (!cmpret) { \ - shmem_transport_probe(); \ - SPINLOCK_BODY(); \ - COMP(cond, SYNC_LOAD(var), value, cmpret); \ - } \ +#define 
SHMEM_WAIT_UNTIL_POLL(var, cond, value) \ + do { \ + int cmpret; \ + \ + COMP(cond, SYNC_LOAD(var), value, cmpret); \ + while (!cmpret) { \ + SHMEM_TRANSPORT_PROBE_ALL_SPINLOCK(); \ + COMP(cond, SYNC_LOAD(var), value, cmpret); \ + } \ } while(0) -#define SHMEM_SIGNAL_WAIT_UNTIL_POLL(var, cond, value, sat_value) \ - do { \ - int cmpret; \ - \ - COMP_SIGNAL(cond, SYNC_LOAD(var), value, cmpret, sat_value); \ - while (!cmpret) { \ - shmem_transport_probe(); \ - SPINLOCK_BODY(); \ - COMP_SIGNAL(cond, SYNC_LOAD(var), value, cmpret, sat_value); \ - } \ +#define SHMEM_SIGNAL_WAIT_UNTIL_POLL(var, cond, value, sat_value) \ + do { \ + int cmpret; \ + \ + COMP_SIGNAL(cond, SYNC_LOAD(var), value, cmpret, sat_value); \ + while (!cmpret) { \ + SHMEM_TRANSPORT_PROBE_ALL_SPINLOCK(); \ + COMP_SIGNAL(cond, SYNC_LOAD(var), value, cmpret, sat_value); \ + } \ } while(0) #define SHMEM_WAIT_BLOCK(var, value) \ diff --git a/src/symmetric_heap_c.c b/src/symmetric_heap_c.c index 4230c1e6b..176f4d01b 100644 --- a/src/symmetric_heap_c.c +++ b/src/symmetric_heap_c.c @@ -363,7 +363,6 @@ shmem_realloc(void *ptr, size_t size) } SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc); - SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); shmem_internal_barrier_all(nic_idx); return ret; diff --git a/src/synchronization_c.c4 b/src/synchronization_c.c4 index 6e3eef0f2..271bc065e 100644 --- a/src/synchronization_c.c4 +++ b/src/synchronization_c.c4 @@ -232,7 +232,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return; \ } \ \ @@ -268,7 +268,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ALL') } \ \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return; \ } \ \ @@ -304,7 +304,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ALL_VECTOR') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 
SIZE_MAX; \ } \ \ @@ -324,7 +324,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ALL_VECTOR') } \ } \ } \ - if (!cmpret) shmem_transport_probe(); \ + if (!cmpret) SHMEM_TRANSPORT_PROBE_ALL(); \ } \ \ shmem_internal_membar_acq_rel(); \ @@ -354,7 +354,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ANY') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return SIZE_MAX; \ } \ \ @@ -374,7 +374,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ANY') } \ } \ } \ - if (!cmpret) shmem_transport_probe(); \ + if (!cmpret) SHMEM_TRANSPORT_PROBE_ALL(); \ } \ \ shmem_internal_membar_acq_rel(); \ @@ -408,7 +408,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ANY_VECTOR') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 0; \ } \ \ @@ -423,7 +423,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_ANY_VECTOR') } \ } \ } \ - if (!cmpret) shmem_transport_probe(); \ + if (!cmpret) SHMEM_TRANSPORT_PROBE_ALL(); \ } \ shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ @@ -456,7 +456,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_SOME') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 0; \ } \ \ @@ -471,7 +471,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_SOME') } \ } \ } \ - if (!cmpret) shmem_transport_probe(); \ + if (!cmpret) SHMEM_TRANSPORT_PROBE_ALL(); \ } \ shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ @@ -495,7 +495,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_WAIT_UNTIL_SOME_VECTOR') shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ } else { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ } \ return cmpret; \ } @@ -520,7 +520,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST') int cmpret; \ SHMEM_TEST(cond, &vars[i], value, cmpret); \ if (!cmpret) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 0; \ } \ } \ @@ -551,7 +551,7 @@ 
SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_ALL') int cmpret; \ SHMEM_TEST(cond, &vars[i], values[i], cmpret); \ if (!cmpret) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 0; \ } \ } \ @@ -596,7 +596,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_ALL_VECTOR') shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ } else \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ \ return found_idx; \ } @@ -635,7 +635,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_ANY') shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ } else \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ \ return found_idx; \ } @@ -666,7 +666,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_ANY_VECTOR') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 0; \ } \ \ @@ -680,7 +680,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_ANY_VECTOR') } \ } \ } \ - if (!cmpret) shmem_transport_probe(); \ + if (!cmpret) SHMEM_TRANSPORT_PROBE_ALL(); \ shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ return ncompleted; \ @@ -712,7 +712,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_SOME') } \ } \ if (nelems == 0 || num_ignored == nelems) { \ - shmem_transport_probe(); \ + SHMEM_TRANSPORT_PROBE_ALL(); \ return 0; \ } \ \ @@ -726,7 +726,7 @@ SHMEM_BIND_C_SYNC(`SHMEM_DEF_TEST_SOME') } \ } \ } \ - if (!cmpret) shmem_transport_probe(); \ + if (!cmpret) SHMEM_TRANSPORT_PROBE_ALL(); \ shmem_internal_membar_acq_rel(); \ shmem_transport_syncmem(); \ return ncompleted; \ diff --git a/src/transport_none.h b/src/transport_none.h index 6d4a9c547..63c6cb014 100644 --- a/src/transport_none.h +++ b/src/transport_none.h @@ -59,7 +59,7 @@ shmem_transport_fini(void) static inline void -shmem_transport_probe(void) +shmem_transport_probe(size_t nic_idx) { return; } @@ -120,7 +120,7 @@ shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void static inline void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, 
void *target, const void *source, size_t len, - int pe, long *completion) + int pe, long *completion, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -135,7 +135,7 @@ shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const v static inline void -shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion, size_t nic_idx) +shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) { /* No op */ } @@ -217,7 +217,7 @@ shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *sou static inline void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, long *completion) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, long *completion, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } diff --git a/src/transport_ofi.c b/src/transport_ofi.c index 7a4451eb5..7f0cef474 100644 --- a/src/transport_ofi.c +++ b/src/transport_ofi.c @@ -64,40 +64,46 @@ struct fabric_info { int npes; }; -struct fid_fabric* shmem_transport_ofi_fabfd; -struct fid_domain* shmem_transport_ofi_domainfd; -struct fid_av* shmem_transport_ofi_avfd; -struct fid_ep* shmem_transport_ofi_target_ep; -struct fid_cq* shmem_transport_ofi_target_cq; -#if ENABLE_TARGET_CNTR -struct fid_cntr* shmem_transport_ofi_target_cntrfd; +struct shmem_transport_ofi_target_ep* shmem_transport_ofi_target_eps; +#ifndef ENABLE_MR_SCALABLE +uint64_t* heap_keys; +uint64_t* data_keys; +#ifndef ENABLE_REMOTE_VIRTUAL_ADDRESSING +uint8_t** heap_addrs; +uint8_t** data_addrs; #endif -#ifdef ENABLE_MR_SCALABLE -#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING -struct fid_mr* shmem_transport_ofi_target_mrfd; -#else /* !ENABLE_REMOTE_VIRTUAL_ADDRESSING */ -struct fid_mr* shmem_transport_ofi_target_heap_mrfd; -struct fid_mr* shmem_transport_ofi_target_data_mrfd; #endif -#else /* !ENABLE_MR_SCALABLE */ -struct fid_mr* 
shmem_transport_ofi_target_heap_mrfd; -struct fid_mr* shmem_transport_ofi_target_data_mrfd; -uint64_t* shmem_transport_ofi_target_heap_keys; -uint64_t* shmem_transport_ofi_target_data_keys; -#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING -int shmem_transport_ofi_use_absolute_address; -#else -uint8_t** shmem_transport_ofi_target_heap_addrs; -uint8_t** shmem_transport_ofi_target_data_addrs; -#endif /* ENABLE_REMOTE_VIRTUAL_ADDRESSING */ -#endif /* ENABLE_MR_SCALABLE */ - #ifdef USE_FI_HMEM -struct fid_mr* shmem_transport_ofi_external_heap_mrfd; -uint64_t* shmem_transport_ofi_external_heap_keys; -uint8_t** shmem_transport_ofi_external_heap_addrs; +uint64_t* external_heap_keys; +uint8_t** external_heap_addrs; #endif +//#ifdef ENABLE_MR_SCALABLE +//#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING +//struct fid_mr** shmem_transport_ofi_target_mrfd; +//#else /* !ENABLE_REMOTE_VIRTUAL_ADDRESSING */ +//struct fid_mr** shmem_transport_ofi_target_heap_mrfds; +//struct fid_mr** shmem_transport_ofi_target_data_mrfds; +//#endif +//#else /* !ENABLE_MR_SCALABLE */ +//struct fid_mr** shmem_transport_ofi_target_heap_mrfds; +//struct fid_mr** shmem_transport_ofi_target_data_mrfds; +//uint64_t* shmem_transport_ofi_target_heap_keys; +//uint64_t* shmem_transport_ofi_target_data_keys; +//#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING +//int shmem_transport_ofi_use_absolute_address; +//#else +//uint8_t** shmem_transport_ofi_target_heap_addrs; +//uint8_t** shmem_transport_ofi_target_data_addrs; +//#endif /* ENABLE_REMOTE_VIRTUAL_ADDRESSING */ +//#endif /* ENABLE_MR_SCALABLE */ +// +//#ifdef USE_FI_HMEM +//struct fid_mr* shmem_transport_ofi_external_heap_mrfd; +//uint64_t* shmem_transport_ofi_external_heap_keys; +//uint8_t** shmem_transport_ofi_external_heap_addrs; +//#endif + /* List of MR descriptors: current support is for heap, data, and one external heap */ struct fid_mr* shmem_transport_ofi_mrfd_list[3]; uint64_t shmem_transport_ofi_max_poll; @@ -600,7 +606,6 @@ static inline int 
bind_enable_ep_resources(shmem_transport_ctx_t *ctx, size_t idx) { int ret = 0; - /* If using SOS-managed STXs, bind the STX */ if (ctx->stx_idx[idx] >= 0) { ret = fi_ep_bind(ctx->ep[idx], &shmem_transport_ofi_stx_pool[idx][ctx->stx_idx[idx]].stx->fid, 0); @@ -643,7 +648,7 @@ int bind_enable_ep_resources(shmem_transport_ctx_t *ctx, size_t idx) #ifdef USE_FI_HMEM static inline -int ofi_mr_reg_external_heap(void) +int ofi_mr_reg_external_heap(size_t nic_idx) { int ret = 0; uint64_t key = 2; @@ -664,24 +669,24 @@ int ofi_mr_reg_external_heap(void) .context = NULL }; - ret = fi_mr_regattr(shmem_transport_ofi_domainfd, &mr_attr, 0, &shmem_transport_ofi_external_heap_mrfd); + ret = fi_mr_regattr(shmem_transport_ofi_target_eps[nic_idx].domainfd, &mr_attr, 0, &shmem_transport_ofi_target_eps[nic_idx].external_heap_mrfd); OFI_CHECK_RETURN_STR(ret, "fi_mr_regattr (heap) failed"); #if ENABLE_TARGET_CNTR - ret = fi_mr_bind(shmem_transport_ofi_external_heap_mrfd, - &shmem_transport_ofi_target_cntrfd->fid, + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].external_heap_mrfd, + &shmem_transport_ofi_target_eps[nic_idx].cntrfd->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to external heap MR failed"); - if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_ENDPOINT) { - ret = fi_ep_bind(shmem_transport_ofi_target_ep, - &shmem_transport_ofi_target_cntrfd->fid, FI_REMOTE_WRITE); + if (provider_list[nic_idx]->domain_attr->mr_mode & FI_MR_ENDPOINT) { + ret = fi_ep_bind(shmem_transport_ofi_target_eps[nic_idx].ep, + &shmem_transport_ofi_target_eps[nic_idx].cntrfd->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed"); - ret = fi_mr_bind(shmem_transport_ofi_external_heap_mrfd, - &shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE); + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].external_heap_mrfd, + &shmem_transport_ofi_target_eps[nic_idx].ep->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, 
"target EP binding to heap MR failed"); - ret = fi_mr_enable(shmem_transport_ofi_external_heap_mrfd); + ret = fi_mr_enable(shmem_transport_ofi_target_eps[nic_idx].external_heap_mrfd); OFI_CHECK_RETURN_STR(ret, "target heap MR enable failed"); } #endif @@ -691,31 +696,31 @@ int ofi_mr_reg_external_heap(void) #endif /* USE_FI_HMEM */ static inline -int ofi_mr_reg_bind(uint64_t flags) +int ofi_mr_reg_bind(uint64_t flags, size_t nic_idx) { int ret = 0; #if defined(ENABLE_MR_SCALABLE) && defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING) - ret = fi_mr_reg(shmem_transport_ofi_domainfd, 0, UINT64_MAX, + ret = fi_mr_reg(shmem_transport_ofi_target_eps[nic_idx].domainfd, 0, UINT64_MAX, FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 0ULL, flags, - &shmem_transport_ofi_target_mrfd, NULL); + &shmem_transport_ofi_target_eps[nic_idx].mrfd, NULL); OFI_CHECK_RETURN_STR(ret, "target memory (all) registration failed"); /* Bind counter with target memory region for incoming messages */ #if ENABLE_TARGET_CNTR - ret = fi_mr_bind(shmem_transport_ofi_target_mrfd, - &shmem_transport_ofi_target_cntrfd->fid, + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].mrfd, + &shmem_transport_ofi_target_eps[nic_idx].cntrfd->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to MR failed"); #ifdef ENABLE_MR_RMA_EVENT if (shmem_transport_ofi_mr_rma_event) { - ret = fi_mr_enable(shmem_transport_ofi_target_mrfd); + ret = fi_mr_enable(shmem_transport_ofi_target_eps[nic_idx].mrfd); OFI_CHECK_RETURN_STR(ret, "target MR enable failed"); } #endif /* ENABLE_MR_RMA_EVENT */ #endif /* ENABLE_TARGET_CNTR */ - shmem_transport_ofi_mrfd_list[0] = shmem_transport_ofi_target_mrfd; + shmem_transport_ofi_mrfd_list[0] = shmem_transport_ofi_target_eps[nic_idx].mrfd; shmem_transport_ofi_mrfd_list[1] = NULL; #else @@ -723,66 +728,66 @@ int ofi_mr_reg_bind(uint64_t flags) * respectively. In MR_BASIC_MODE, the keys are ignored and selected by * the provider. 
*/ uint64_t key = 1; - ret = fi_mr_reg(shmem_transport_ofi_domainfd, shmem_internal_heap_base, + ret = fi_mr_reg(shmem_transport_ofi_target_eps[nic_idx].domainfd, shmem_internal_heap_base, shmem_internal_heap_length, FI_REMOTE_READ | FI_REMOTE_WRITE, 0, key, flags, - &shmem_transport_ofi_target_heap_mrfd, NULL); + &shmem_transport_ofi_target_eps[nic_idx].heap_mrfd, NULL); OFI_CHECK_RETURN_STR(ret, "target memory (heap) registration failed"); key = 0; - ret = fi_mr_reg(shmem_transport_ofi_domainfd, shmem_internal_data_base, + ret = fi_mr_reg(shmem_transport_ofi_target_eps[nic_idx].domainfd, shmem_internal_data_base, shmem_internal_data_length, FI_REMOTE_READ | FI_REMOTE_WRITE, 0, key, flags, - &shmem_transport_ofi_target_data_mrfd, NULL); + &shmem_transport_ofi_target_eps[nic_idx].data_mrfd, NULL); OFI_CHECK_RETURN_STR(ret, "target memory (data) registration failed"); /* Bind counter with target memory region for incoming messages */ #if ENABLE_TARGET_CNTR - ret = fi_mr_bind(shmem_transport_ofi_target_heap_mrfd, - &shmem_transport_ofi_target_cntrfd->fid, + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].heap_mrfd, + &shmem_transport_ofi_target_eps[nic_idx].cntrfd->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to heap MR failed"); - ret = fi_mr_bind(shmem_transport_ofi_target_data_mrfd, - &shmem_transport_ofi_target_cntrfd->fid, + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].data_mrfd, + &shmem_transport_ofi_target_eps[nic_idx].cntrfd->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to data MR failed"); #ifdef ENABLE_MR_ENDPOINT - if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_ENDPOINT) { - ret = fi_ep_bind(shmem_transport_ofi_target_ep, - &shmem_transport_ofi_target_cntrfd->fid, FI_REMOTE_WRITE); + if (provider_list[nic_idx]->domain_attr->mr_mode & FI_MR_ENDPOINT) { + ret = fi_ep_bind(shmem_transport_ofi_target_eps[nic_idx].ep, + 
&shmem_transport_ofi_target_eps[nic_idx].cntrfd->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed"); - ret = fi_mr_bind(shmem_transport_ofi_target_heap_mrfd, - &shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE); + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].heap_mrfd, + &shmem_transport_ofi_target_eps[nic_idx].ep->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target EP binding to heap MR failed"); - ret = fi_mr_enable(shmem_transport_ofi_target_heap_mrfd); + ret = fi_mr_enable(shmem_transport_ofi_target_eps[nic_idx].heap_mrfd); OFI_CHECK_RETURN_STR(ret, "target heap MR enable failed"); - ret = fi_mr_bind(shmem_transport_ofi_target_data_mrfd, - &shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE); + ret = fi_mr_bind(shmem_transport_ofi_target_eps[nic_idx].data_mrfd, + &shmem_transport_ofi_target_eps[nic_idx].ep->fid, FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target EP binding to data MR failed"); - ret = fi_mr_enable(shmem_transport_ofi_target_data_mrfd); + ret = fi_mr_enable(shmem_transport_ofi_target_eps[nic_idx].data_mrfd); OFI_CHECK_RETURN_STR(ret, "target data MR enable failed"); } #endif #ifdef ENABLE_MR_RMA_EVENT if (shmem_transport_ofi_mr_rma_event) { - ret = fi_mr_enable(shmem_transport_ofi_target_data_mrfd); + ret = fi_mr_enable(shmem_transport_ofi_target_eps[nic_idx].data_mrfd); OFI_CHECK_RETURN_STR(ret, "target data MR enable failed"); - ret = fi_mr_enable(shmem_transport_ofi_target_heap_mrfd); + ret = fi_mr_enable(shmem_transport_ofi_target_eps[nic_idx].heap_mrfd); OFI_CHECK_RETURN_STR(ret, "target heap MR enable failed"); } #endif /* ENABLE_MR_RMA_EVENT */ #endif /* ENABLE_TARGET_CNTR */ - shmem_transport_ofi_mrfd_list[0] = shmem_transport_ofi_target_data_mrfd; - shmem_transport_ofi_mrfd_list[1] = shmem_transport_ofi_target_heap_mrfd; + shmem_transport_ofi_mrfd_list[0] = shmem_transport_ofi_target_eps[nic_idx].data_mrfd; + shmem_transport_ofi_mrfd_list[1] = 
shmem_transport_ofi_target_eps[nic_idx].heap_mrfd; #endif @@ -790,7 +795,7 @@ int ofi_mr_reg_bind(uint64_t flags) } static inline -int allocate_recv_cntr_mr(void) +int allocate_recv_cntr_mr(size_t nic_idx) { int ret = 0; uint64_t flags = 0; @@ -811,8 +816,8 @@ int allocate_recv_cntr_mr(void) cntr_attr.events = FI_CNTR_EVENTS_COMP; cntr_attr.wait_obj = FI_WAIT_UNSPEC; - ret = fi_cntr_open(shmem_transport_ofi_domainfd, &cntr_attr, - &shmem_transport_ofi_target_cntrfd, NULL); + ret = fi_cntr_open(shmem_transport_ofi_target_eps[nic_idx].domainfd, &cntr_attr, + &shmem_transport_ofi_target_eps[nic_idx].cntrfd, NULL); OFI_CHECK_RETURN_STR(ret, "target CNTR open failed"); #ifdef ENABLE_MR_RMA_EVENT @@ -824,9 +829,9 @@ int allocate_recv_cntr_mr(void) #ifdef USE_FI_HMEM if (shmem_external_heap_pre_initialized) { - ret = ofi_mr_reg_external_heap(); + ret = ofi_mr_reg_external_heap(nic_idx); OFI_CHECK_RETURN_STR(ret, "OFI MR registration with HMEM failed"); - shmem_transport_ofi_mrfd_list[2] = shmem_transport_ofi_external_heap_mrfd; + shmem_transport_ofi_mrfd_list[2] = shmem_transport_ofi_target_eps[nic_idx].external_heap_mrfd; } else { shmem_transport_ofi_mrfd_list[2] = NULL; } @@ -834,7 +839,7 @@ int allocate_recv_cntr_mr(void) shmem_transport_ofi_mrfd_list[2] = NULL; #endif - ret = ofi_mr_reg_bind(flags); + ret = ofi_mr_reg_bind(flags, nic_idx); OFI_CHECK_RETURN_STR(ret, "OFI MR registration failed"); return ret; @@ -844,33 +849,45 @@ int allocate_recv_cntr_mr(void) static int publish_external_mr_info(void) { - int err; - uint64_t ext_heap_key; + for (size_t nic_idx = 0; nic_idx < shmem_transport_ofi_num_nics; nic_idx++) { + int err; + uint64_t ext_heap_key; + struct fi_info *info = provider_list[nic_idx]; + char ext_heap_key_name[32], ext_heap_addr_name[32]; - if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_PROV_KEY) { - ext_heap_key = fi_mr_key(shmem_transport_ofi_external_heap_mrfd); - } else { - ext_heap_key = 2; - } + if (info->domain_attr->mr_mode & 
FI_MR_PROV_KEY) { + ext_heap_key = fi_mr_key(shmem_transport_ofi_target_eps[nic_idx].external_heap_mrfd); + } else { + ext_heap_key = 2; + } - err = shmem_runtime_put("fi_ext_heap_key", &ext_heap_key, sizeof(uint64_t)); - if (err) { - RAISE_WARN_STR("Put of heap key to runtime KVS failed"); - return 1; - } + err = sprintf(ext_heap_key_name, "fi_ext_heap_key_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - external heap key"); + } + err = shmem_runtime_put(ext_heap_key_name, &ext_heap_key, sizeof(uint64_t)); + if (err) { + RAISE_WARN_STR("Put of heap key to runtime KVS failed"); + return 1; + } - void *ext_heap_base; + void *ext_heap_base; - if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) { - ext_heap_base = shmem_external_heap_base; - } else { - ext_heap_base = (void *) 0; - } + if (info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) { + ext_heap_base = shmem_external_heap_base; + } else { + ext_heap_base = (void *) 0; + } - err = shmem_runtime_put("fi_ext_heap_addr", &ext_heap_base, sizeof(uint8_t*)); - if (err) { - RAISE_WARN_STR("Put of heap address to runtime KVS failed"); - return 1; + err = sprintf(ext_heap_addr_name, "fi_ext_heap_addr_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - external heap addr"); + } + err = shmem_runtime_put(ext_heap_addr_name, &ext_heap_base, sizeof(uint8_t*)); + if (err) { + RAISE_WARN_STR("Put of heap address to runtime KVS failed"); + return 1; + } } return 0; @@ -878,75 +895,96 @@ int publish_external_mr_info(void) #endif static -int publish_mr_info(struct fi_info *info) +int publish_mr_info(void) { + for (size_t nic_idx = 0; nic_idx < shmem_transport_ofi_num_nics; nic_idx++) { + struct fi_info *info = provider_list[nic_idx]; #ifndef ENABLE_MR_SCALABLE - { - int err; - uint64_t heap_key, data_key; - - if (info->domain_attr->mr_mode & FI_MR_PROV_KEY) { - heap_key = fi_mr_key(shmem_transport_ofi_target_heap_mrfd); - data_key = 
fi_mr_key(shmem_transport_ofi_target_data_mrfd); - } else { - heap_key = 1; - data_key = 0; - } + { + int err; + uint64_t heap_key, data_key; + + char heap_key_name[32], data_key_name[32]; + if (info->domain_attr->mr_mode & FI_MR_PROV_KEY) { + heap_key = fi_mr_key(shmem_transport_ofi_target_eps[nic_idx].heap_mrfd); + data_key = fi_mr_key(shmem_transport_ofi_target_eps[nic_idx].data_mrfd); + } else { + heap_key = 1; + data_key = 0; + } - err = shmem_runtime_put("fi_heap_key", &heap_key, sizeof(uint64_t)); - if (err) { - RAISE_WARN_STR("Put of heap key to runtime KVS failed"); - return 1; - } + err = sprintf(heap_key_name, "fi_heap_key_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - heap key name"); + } + err = shmem_runtime_put(heap_key_name, &heap_key, sizeof(uint64_t)); + if (err) { + RAISE_WARN_STR("Put of heap key to runtime KVS failed"); + return 1; + } - err = shmem_runtime_put("fi_data_key", &data_key, sizeof(uint64_t)); - if (err) { - RAISE_WARN_STR("Put of data segment key to runtime KVS failed"); - return 1; + err = sprintf(data_key_name, "fi_data_key_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - data key name"); + } + err = shmem_runtime_put(data_key_name, &data_key, sizeof(uint64_t)); + if (err) { + RAISE_WARN_STR("Put of data segment key to runtime KVS failed"); + return 1; + } } - } #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING - if (info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) - shmem_transport_ofi_use_absolute_address = 1; - else - shmem_transport_ofi_use_absolute_address = 0; + if (info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) + shmem_transport_ofi_target_eps[nic_idx].use_absolute_address = 1; + else + shmem_transport_ofi_target_eps[nic_idx].use_absolute_address = 0; #else /* !ENABLE_REMOTE_VIRTUAL_ADDRESSING */ - { - int err; - void *heap_base, *data_base; - - if (info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) { - heap_base = shmem_internal_heap_base; - data_base = shmem_internal_data_base; - } else { - 
heap_base = (void *) 0; - data_base = (void *) 0; - } + { + int err; + void *heap_base, *data_base; + char heap_addr_name[32], data_addr_name[32]; + + if (info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) { + heap_base = shmem_internal_heap_base; + data_base = shmem_internal_data_base; + } else { + heap_base = (void *) 0; + data_base = (void *) 0; + } - err = shmem_runtime_put("fi_heap_addr", &heap_base, sizeof(uint8_t*)); - if (err) { - RAISE_WARN_STR("Put of heap address to runtime KVS failed"); - return 1; - } + err = sprintf(heap_addr_name, "fi_heap_addr_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - heap addr"); + } + err = shmem_runtime_put(heap_addr_name, &heap_base, sizeof(uint8_t*)); + if (err) { + RAISE_WARN_STR("Put of heap address to runtime KVS failed"); + return 1; + } - err = shmem_runtime_put("fi_data_addr", &data_base, sizeof(uint8_t*)); - if (err) { - RAISE_WARN_STR("Put of data segment address to runtime KVS failed"); - return 1; + err = sprintf(data_addr_name, "fi_data_addr_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - data addr"); + } + err = shmem_runtime_put(data_addr_name, &data_base, sizeof(uint8_t*)); + if (err) { + RAISE_WARN_STR("Put of data segment address to runtime KVS failed"); + return 1; + } } - } #endif /* ENABLE_REMOTE_VIRTUAL_ADDRESSING */ #endif /* !ENABLE_MR_SCALABLE */ + } #ifdef USE_FI_HMEM - if (shmem_external_heap_pre_initialized) { - int err = publish_external_mr_info(); - if (err) { - RAISE_WARN_STR("Publish of external mr info failed"); - return 1; + if (shmem_external_heap_pre_initialized) { + int err = publish_external_mr_info(); + if (err) { + RAISE_WARN_STR("Publish of external mr info failed"); + return 1; + } } - } #endif return 0; @@ -956,39 +994,48 @@ int publish_mr_info(struct fi_info *info) static int populate_external_mr_tables(void) { - int i, err; + int err; - shmem_transport_ofi_external_heap_keys = malloc(sizeof(uint64_t) * shmem_internal_num_pes); - if (NULL == 
shmem_transport_ofi_external_heap_keys) { + external_heap_keys = malloc(sizeof(uint64_t) * shmem_internal_num_pes * shmem_transport_ofi_num_nics); + if (NULL == external_heap_keys) { RAISE_WARN_STR("Out of memory allocating heap keytable"); return 1; } - /* Called after the upper layer performs the runtime exchange */ - for (i = 0; i < shmem_internal_num_pes; i++) { - err = shmem_runtime_get(i, "fi_ext_heap_key", - &shmem_transport_ofi_external_heap_keys[i], - sizeof(uint64_t)); - if (err) { - RAISE_WARN_STR("Get of heap key from runtime KVS failed"); - return 1; - } - } - - shmem_transport_ofi_external_heap_addrs = malloc(sizeof(uint8_t*) * shmem_internal_num_pes); - if (NULL == shmem_transport_ofi_external_heap_addrs) { + external_heap_addrs = malloc(sizeof(uint8_t*) * shmem_internal_num_pes * shmem_transport_ofi_num_nics); + if (NULL == external_heap_addrs) { RAISE_WARN_STR("Out of memory allocating heap addrtable"); return 1; } /* Called after the upper layer performs the runtime exchange */ - for (i = 0; i < shmem_internal_num_pes; i++) { - err = shmem_runtime_get(i, "fi_ext_heap_addr", - &shmem_transport_ofi_external_heap_addrs[i], - sizeof(uint8_t*)); - if (err) { - RAISE_WARN_STR("Get of heap address from runtime KVS failed"); - return 1; + for (size_t i = 0; i < shmem_internal_num_pes; i++) { + for (size_t j = 0; j < shmem_transport_ofi_num_nics; j++) { + char ext_heap_key_name[32], ext_heap_addr_name[32]; + + err = sprintf(ext_heap_key_name, "fi_ext_heap_key_%zu", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - ext. heap key name"); + } + err = shmem_runtime_get(i, ext_heap_key_name, + &external_heap_keys[(i * shmem_transport_ofi_num_nics) + j], + sizeof(uint64_t)); + if (err) { + RAISE_WARN_STR("Get of heap key from runtime KVS failed"); + return 1; + } + + err = sprintf(ext_heap_addr_name, "fi_ext_heap_addr_%zu", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - ext. 
heap addr name"); + } + err = shmem_runtime_get(i, ext_heap_addr_name, + &external_heap_addrs[(i * shmem_transport_ofi_num_nics) + j], + sizeof(uint8_t*)); + if (err) { + RAISE_WARN_STR("Get of heap address from runtime KVS failed"); + return 1; + } } } @@ -1000,85 +1047,110 @@ static int populate_mr_tables(void) { #ifndef ENABLE_MR_SCALABLE - { - int i, err; + heap_keys = malloc(sizeof(uint64_t) * shmem_internal_num_pes * shmem_transport_ofi_num_nics); + if (NULL == heap_keys) { + RAISE_WARN_STR("Out of memory allocating heap keytable"); + return 1; + } - shmem_transport_ofi_target_heap_keys = malloc(sizeof(uint64_t) * shmem_internal_num_pes); - if (NULL == shmem_transport_ofi_target_heap_keys) { - RAISE_WARN_STR("Out of memory allocating heap keytable"); - return 1; - } + data_keys = malloc(sizeof(uint64_t) * shmem_internal_num_pes * shmem_transport_ofi_num_nics); + if (NULL == data_keys) { + RAISE_WARN_STR("Out of memory allocating heap keytable"); + return 1; + } +#endif - shmem_transport_ofi_target_data_keys = malloc(sizeof(uint64_t) * shmem_internal_num_pes); - if (NULL == shmem_transport_ofi_target_data_keys) { - RAISE_WARN_STR("Out of memory allocating heap keytable"); - return 1; - } +#ifndef ENABLE_REMOTE_VIRTUAL_ADDRESSING + heap_addrs = malloc(sizeof(uint8_t*) * shmem_internal_num_pes * shmem_transport_ofi_num_nics); + if (NULL == heap_addrs) { + RAISE_WARN_STR("Out of memory allocating heap addrtable"); + return 1; + } - /* Called after the upper layer performs the runtime exchange */ - for (i = 0; i < shmem_internal_num_pes; i++) { - err = shmem_runtime_get(i, "fi_heap_key", - &shmem_transport_ofi_target_heap_keys[i], - sizeof(uint64_t)); - if (err) { - RAISE_WARN_STR("Get of heap key from runtime KVS failed"); - return 1; - } - err = shmem_runtime_get(i, "fi_data_key", - &shmem_transport_ofi_target_data_keys[i], - sizeof(uint64_t)); - if (err) { - RAISE_WARN_STR("Get of data segment key from runtime KVS failed"); - return 1; - } - } + data_addrs = 
malloc(sizeof(uint8_t*) * shmem_internal_num_pes * shmem_transport_ofi_num_nics); + if (NULL == data_addrs) { + RAISE_WARN_STR("Out of memory allocating data addrtable"); + return 1; } +#endif -#ifndef ENABLE_REMOTE_VIRTUAL_ADDRESSING - { - int i, err; + int err; + for (size_t i = 0; i < shmem_internal_num_pes; i++) { +#ifndef ENABLE_MR_SCALABLE + { + /* Called after the upper layer performs the runtime exchange */ + for (size_t j = 0; j < shmem_transport_ofi_num_nics; j++) { + char heap_key_name[32], data_key_name[32]; + + err = sprintf(heap_key_name, "fi_heap_key_%zu", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - heap key name"); + } + err = shmem_runtime_get(i, heap_key_name, + &heap_keys[(i * shmem_transport_ofi_num_nics) + j], + sizeof(uint64_t)); + if (err) { + RAISE_WARN_STR("Get of heap key from runtime KVS failed"); + return 1; + } - shmem_transport_ofi_target_heap_addrs = malloc(sizeof(uint8_t*) * shmem_internal_num_pes); - if (NULL == shmem_transport_ofi_target_heap_addrs) { - RAISE_WARN_STR("Out of memory allocating heap addrtable"); - return 1; + err = sprintf(data_key_name, "fi_data_key_%zu", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - data key name"); + } + err = shmem_runtime_get(i, data_key_name, + &data_keys[(i * shmem_transport_ofi_num_nics) + j], + sizeof(uint64_t)); + if (err) { + RAISE_WARN_STR("Get of data segment key from runtime KVS failed"); + return 1; + } + } } - shmem_transport_ofi_target_data_addrs = malloc(sizeof(uint8_t*) * shmem_internal_num_pes); - if (NULL == shmem_transport_ofi_target_data_addrs) { - RAISE_WARN_STR("Out of memory allocating data addrtable"); - return 1; - } +#ifndef ENABLE_REMOTE_VIRTUAL_ADDRESSING + { + /* Called after the upper layer performs the runtime exchange */ + for (size_t j = 0; j < shmem_transport_ofi_num_nics; j++) { + char heap_addr_name[32], data_addr_name[32]; + + err = sprintf(heap_addr_name, "fi_heap_addr_%zu", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - heap 
addr"); + } + err = shmem_runtime_get(i, heap_addr_name, + &heap_addrs[(i * shmem_transport_ofi_num_nics) + j], + sizeof(uint8_t*)); + if (err) { + RAISE_WARN_STR("Get of heap address from runtime KVS failed"); + return 1; + } - /* Called after the upper layer performs the runtime exchange */ - for (i = 0; i < shmem_internal_num_pes; i++) { - err = shmem_runtime_get(i, "fi_heap_addr", - &shmem_transport_ofi_target_heap_addrs[i], - sizeof(uint8_t*)); - if (err) { - RAISE_WARN_STR("Get of heap address from runtime KVS failed"); - return 1; - } - err = shmem_runtime_get(i, "fi_data_addr", - &shmem_transport_ofi_target_data_addrs[i], - sizeof(uint8_t*)); - if (err) { - RAISE_WARN_STR("Get of data segment address from runtime KVS failed"); - return 1; + err = sprintf(data_addr_name, "fi_data_addr_%zu", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - data addr"); + } + err = shmem_runtime_get(i, data_addr_name, + &data_addrs[(i * shmem_transport_ofi_num_nics) + j], + sizeof(uint8_t*)); + if (err) { + RAISE_WARN_STR("Get of data segment address from runtime KVS failed"); + return 1; + } } } - } #endif /* ENABLE_REMOTE_VIRTUAL_ADDRESSING */ #endif /* !ENABLE_MR_SCALABLE */ + } #ifdef USE_FI_HMEM - if (shmem_external_heap_pre_initialized) { - int err = populate_external_mr_tables(); - if (err) { - RAISE_WARN_STR("Populate external MR tables failed"); - return 1; + if (shmem_external_heap_pre_initialized) { + int err = populate_external_mr_tables(); + if (err) { + RAISE_WARN_STR("Populate external MR tables failed"); + return 1; + } } - } #endif return 0; @@ -1245,18 +1317,25 @@ int atomic_limitations_check(size_t idx) static inline int publish_av_info(struct fabric_info *info) { - int ret = 0; + int ret, err = 0; char epname[128]; size_t epnamelen = sizeof(epname); + char epname_key[32]; - ret = fi_getname((fid_t)shmem_transport_ofi_target_ep, epname, &epnamelen); - if (ret != 0 || (epnamelen > sizeof(epname))) { - RAISE_WARN_STR("fi_getname failed"); - return ret; 
- } + for (size_t nic_idx = 0; nic_idx < shmem_transport_ofi_num_nics; nic_idx++) { + ret = fi_getname((fid_t)shmem_transport_ofi_target_eps[nic_idx].ep, epname, &epnamelen); + if (ret != 0 || (epnamelen > sizeof(epname))) { + RAISE_WARN_STR("fi_getname failed"); + return ret; + } - ret = shmem_runtime_put("fi_epname", epname, epnamelen); - OFI_CHECK_RETURN_STR(ret, "shmem_runtime_put fi_epname failed"); + err = sprintf(epname_key, "fi_epname_%zu", nic_idx); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed - epname"); + } + ret = shmem_runtime_put(epname_key, epname, epnamelen); + OFI_CHECK_RETURN_STR(ret, "shmem_runtime_put fi_epname failed"); + } /* Note: we assume that the length of an address is the same for all * endpoints. This is safe for most HPC systems, but could be incorrect in @@ -1267,44 +1346,53 @@ int publish_av_info(struct fabric_info *info) } static inline -int populate_av(void) +int populate_av() { - int i, ret, err = 0; + int i, j, ret, err = 0; char *alladdrs = NULL; - alladdrs = malloc(shmem_internal_num_pes * shmem_transport_ofi_addrlen); + // Currently assuming all PEs have the same number of NICs (FIXME: This may not always be a reasonable assumption) + alladdrs = malloc(shmem_internal_num_pes * shmem_transport_ofi_addrlen * shmem_transport_ofi_num_nics); if (alladdrs == NULL) { RAISE_WARN_STR("Out of memory allocating 'alladdrs'"); return 1; } + char epname_key[32]; for (i = 0; i < shmem_internal_num_pes; i++) { - char *addr_ptr = alladdrs + i * shmem_transport_ofi_addrlen; - err = shmem_runtime_get(i, "fi_epname", addr_ptr, shmem_transport_ofi_addrlen); - if (err != 0) { - RAISE_ERROR_STR("Runtime get of 'fi_epname' failed"); - } - } + for (j = 0; j < shmem_transport_ofi_num_nics; j++) { + char *addr_ptr = alladdrs + (i * (shmem_transport_ofi_addrlen * shmem_transport_ofi_num_nics) + (j * shmem_transport_ofi_addrlen)); - ret = fi_av_insert(shmem_transport_ofi_avfd, - alladdrs, - shmem_internal_num_pes, - addr_table, - 0, - NULL); - if 
(ret != shmem_internal_num_pes) { - RAISE_WARN_STR("av insert failed"); - return ret; + err = sprintf(epname_key, "fi_epname_%d", j); + if (err < 0) { + RAISE_ERROR_STR("sprintf failed"); + } + err = shmem_runtime_get(i, epname_key, addr_ptr, shmem_transport_ofi_addrlen); + if (err != 0) { + RAISE_ERROR_STR("Runtime get of 'fi_epname' failed"); + } + } } for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { + ret = fi_av_insert(shmem_transport_ofi_target_eps[idx].avfd, + alladdrs, + shmem_internal_num_pes * shmem_transport_ofi_num_nics, + addr_table, + 0, + NULL); + if (ret != (shmem_internal_num_pes * shmem_transport_ofi_num_nics)) { + RAISE_WARN_STR("av insert failed"); + return ret; + } + ret = fi_av_insert(shmem_transport_ctx_default.av[idx], alladdrs, - shmem_internal_num_pes, + shmem_internal_num_pes * shmem_transport_ofi_num_nics, addr_table, 0, NULL); - if (ret != shmem_internal_num_pes) { + if (ret != (shmem_internal_num_pes * shmem_transport_ofi_num_nics)) { RAISE_WARN_STR("av insert failed"); return ret; } @@ -1321,48 +1409,50 @@ int allocate_fabric_resources(struct fabric_info *info) int ret = 0; struct fi_av_attr av_attr = {0}; + shmem_transport_ofi_target_eps = (struct shmem_transport_ofi_target_ep*) malloc(shmem_transport_ofi_num_nics * sizeof(struct shmem_transport_ofi_target_ep)); /* fabric domain: define domain of resources physical and logical */ - ret = fi_fabric(info->p_info->fabric_attr, &shmem_transport_ofi_fabfd, NULL); - OFI_CHECK_RETURN_STR(ret, "fabric initialization failed"); - - DEBUG_MSG("OFI version: built %"PRIu32".%"PRIu32", cur. 
%"PRIu32".%"PRIu32"; " - "provider version: %"PRIu32".%"PRIu32"\n", - FI_MAJOR_VERSION, FI_MINOR_VERSION, - FI_MAJOR(fi_version()), FI_MINOR(fi_version()), - FI_MAJOR(info->p_info->fabric_attr->prov_version), - FI_MINOR(info->p_info->fabric_attr->prov_version)); + for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { + ret = fi_fabric(provider_list[idx]->fabric_attr, &shmem_transport_ofi_target_eps[idx].fabfd, NULL); + OFI_CHECK_RETURN_STR(ret, "fabric initialization failed"); - if (FI_MAJOR_VERSION != FI_MAJOR(fi_version()) || - FI_MINOR_VERSION != FI_MINOR(fi_version())) { - RAISE_WARN_MSG("OFI version mismatch: built %"PRIu32".%"PRIu32", cur. %"PRIu32".%"PRIu32"\n", - FI_MAJOR_VERSION, FI_MINOR_VERSION, - FI_MAJOR(fi_version()), FI_MINOR(fi_version())); - } + DEBUG_MSG("OFI version: built %"PRIu32".%"PRIu32", cur. %"PRIu32".%"PRIu32"; " + "provider version: %"PRIu32".%"PRIu32"\n", + FI_MAJOR_VERSION, FI_MINOR_VERSION, + FI_MAJOR(fi_version()), FI_MINOR(fi_version()), + FI_MAJOR(info->p_info->fabric_attr->prov_version), + FI_MINOR(info->p_info->fabric_attr->prov_version)); + + if (FI_MAJOR_VERSION != FI_MAJOR(fi_version()) || + FI_MINOR_VERSION != FI_MINOR(fi_version())) { + RAISE_WARN_MSG("OFI version mismatch: built %"PRIu32".%"PRIu32", cur. 
%"PRIu32".%"PRIu32"\n", + FI_MAJOR_VERSION, FI_MINOR_VERSION, + FI_MAJOR(fi_version()), FI_MINOR(fi_version())); + } - /* access domain: define communication resource limits/boundary within - * fabric domain */ - ret = fi_domain(shmem_transport_ofi_fabfd, info->p_info, - &shmem_transport_ofi_domainfd,NULL); - OFI_CHECK_RETURN_STR(ret, "domain initialization failed"); + /* access domain: define communication resource limits/boundary within + * fabric domain */ + ret = fi_domain(shmem_transport_ofi_target_eps[idx].fabfd, provider_list[idx], + &shmem_transport_ofi_target_eps[idx].domainfd, NULL); + OFI_CHECK_RETURN_STR(ret, "domain initialization failed"); - /* AV table set-up for PE mapping */ + /* AV table set-up for PE mapping */ #ifdef USE_AV_MAP - av_attr.type = FI_AV_MAP; - addr_table = (fi_addr_t*) malloc(info->npes * sizeof(fi_addr_t)); + av_attr.type = FI_AV_MAP; + addr_table = (fi_addr_t*) malloc(info->npes * shmem_transport_ofi_num_nics * sizeof(fi_addr_t)); #else - /* open Address Vector and bind the AV to the domain */ - av_attr.type = FI_AV_TABLE; - addr_table = NULL; + /* open Address Vector and bind the AV to the domain */ + av_attr.type = FI_AV_TABLE; + addr_table = NULL; #endif - ret = fi_av_open(shmem_transport_ofi_domainfd, - &av_attr, - &shmem_transport_ofi_avfd, - NULL); - OFI_CHECK_RETURN_STR(ret, "AV creation failed"); - + ret = fi_av_open(shmem_transport_ofi_target_eps[idx].domainfd, + &av_attr, + &shmem_transport_ofi_target_eps[idx].avfd, + NULL); + OFI_CHECK_RETURN_STR(ret, "AV creation failed"); + } return ret; } @@ -1446,7 +1536,6 @@ struct fi_info *assign_nic_with_hwloc(struct fi_info *fabric, struct fi_info **p hwloc_bitmap_free(bindset); struct fi_info *provider = provider_list[shmem_internal_my_pe % num_close_nics]; - //free(prov_list); shmem_transport_ofi_num_nics = num_close_nics; return provider; @@ -1715,42 +1804,57 @@ static int shmem_transport_ofi_target_ep_init(void) { int ret = 0; - struct fabric_info* info = 
&shmem_transport_ofi_info; - info->p_info->ep_attr->tx_ctx_cnt = 0; - info->p_info->caps = FI_RMA | FI_ATOMIC | FI_REMOTE_READ | FI_REMOTE_WRITE; +// struct fabric_info* info = &shmem_transport_ofi_info; +// info->p_info->ep_attr->tx_ctx_cnt = 0; +// info->p_info->caps = FI_RMA | FI_ATOMIC | FI_REMOTE_READ | FI_REMOTE_WRITE; +//#if ENABLE_TARGET_CNTR +// info->p_info->caps |= FI_RMA_EVENT; +//#endif +// info->p_info->tx_attr->op_flags = 0; +// info->p_info->mode = 0; +// info->p_info->tx_attr->mode = 0; +// info->p_info->rx_attr->mode = 0; +// info->p_info->tx_attr->caps = FI_RMA | FI_ATOMIC; +// info->p_info->rx_attr->caps = info->p_info->caps; + + for (size_t nic_idx = 0; nic_idx < shmem_transport_ofi_num_nics; nic_idx++) { + struct fi_info* info = provider_list[nic_idx]; + info->ep_attr->tx_ctx_cnt = 0; + info->caps = FI_RMA | FI_ATOMIC | FI_REMOTE_READ | FI_REMOTE_WRITE; #if ENABLE_TARGET_CNTR - info->p_info->caps |= FI_RMA_EVENT; + info->caps |= FI_RMA_EVENT; #endif - info->p_info->tx_attr->op_flags = 0; - info->p_info->mode = 0; - info->p_info->tx_attr->mode = 0; - info->p_info->rx_attr->mode = 0; - info->p_info->tx_attr->caps = FI_RMA | FI_ATOMIC; - info->p_info->rx_attr->caps = info->p_info->caps; + info->tx_attr->op_flags = 0; + info->mode = 0; + info->tx_attr->mode = 0; + info->rx_attr->mode = 0; + info->tx_attr->caps = FI_RMA | FI_ATOMIC; + info->rx_attr->caps = info->caps; - ret = fi_endpoint(shmem_transport_ofi_domainfd, - info->p_info, &shmem_transport_ofi_target_ep, NULL); - OFI_CHECK_RETURN_MSG(ret, "target endpoint creation failed (%s)\n", fi_strerror(errno)); + ret = fi_endpoint(/*shmem_transport_ofi_domainfd*/ shmem_transport_ofi_target_eps[nic_idx].domainfd, + info, &shmem_transport_ofi_target_eps[nic_idx].ep, NULL); + OFI_CHECK_RETURN_MSG(ret, "target endpoint creation failed (%s)\n", fi_strerror(errno)); - /* Attach the address vector */ - ret = fi_ep_bind(shmem_transport_ofi_target_ep, &shmem_transport_ofi_avfd->fid, 0); - 
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to target endpoint failed"); + /* Attach the address vector */ + ret = fi_ep_bind(shmem_transport_ofi_target_eps[nic_idx].ep, &shmem_transport_ofi_target_eps[nic_idx].avfd->fid, 0); + OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to target endpoint failed"); - struct fi_cq_attr cq_attr = {0}; + struct fi_cq_attr cq_attr = {0}; - ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr, - &shmem_transport_ofi_target_cq, NULL); - OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno)); + ret = fi_cq_open(shmem_transport_ofi_target_eps[nic_idx].domainfd, &cq_attr, + &shmem_transport_ofi_target_eps[nic_idx].cq, NULL); + OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno)); - ret = fi_ep_bind(shmem_transport_ofi_target_ep, - &shmem_transport_ofi_target_cq->fid, FI_TRANSMIT | FI_RECV); - OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed"); + ret = fi_ep_bind(shmem_transport_ofi_target_eps[nic_idx].ep, + &shmem_transport_ofi_target_eps[nic_idx].cq->fid, FI_TRANSMIT | FI_RECV); + OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed"); - ret = fi_enable(shmem_transport_ofi_target_ep); - OFI_CHECK_RETURN_STR(ret, "fi_enable on target endpoint failed"); + ret = fi_enable(shmem_transport_ofi_target_eps[nic_idx].ep); + OFI_CHECK_RETURN_STR(ret, "fi_enable on target endpoint failed"); - ret = allocate_recv_cntr_mr(); - if (ret) return ret; + ret = allocate_recv_cntr_mr(nic_idx); + if (ret) return ret; + } return 0; } @@ -1782,18 +1886,6 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id) struct fi_cq_attr cq_attr = {0}; cq_attr.format = FI_CQ_FORMAT_CONTEXT; - struct fabric_info* info = &shmem_transport_ofi_info; - - // Need to do these steps for all providers in provider_list? - //info->p_info->ep_attr->tx_ctx_cnt = shmem_transport_ofi_stx_max > 0 ? 
FI_SHARED_CONTEXT : 0; - //info->p_info->caps = FI_RMA | FI_WRITE | FI_READ | FI_ATOMIC | FI_RECV; - //info->p_info->tx_attr->op_flags = FI_DELIVERY_COMPLETE; - //info->p_info->mode = 0; - //info->p_info->tx_attr->mode = 0; - //info->p_info->rx_attr->mode = 0; - //info->p_info->tx_attr->caps = info->p_info->caps; - //info->p_info->rx_attr->caps = FI_RECV; /* to drive progress on the CQ */; - ctx->id = id; ctx->fabric = (struct fid_fabric **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_fabric *)); ctx->domain = (struct fid_domain **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_domain *)); @@ -1831,7 +1923,7 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id) ret = fi_fabric(provider_list[idx]->fabric_attr, &ctx->fabric[idx], NULL); OFI_CHECK_RETURN_STR(ret, "fabric initialization failed"); - ret = fi_domain(/*shmem_transport_ofi_fabfd*/ ctx->fabric[idx], provider_list[idx], + ret = fi_domain(ctx->fabric[idx], provider_list[idx], &ctx->domain[idx], NULL); OFI_CHECK_RETURN_STR(ret, "domain initialization failed"); @@ -1841,28 +1933,28 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id) #else av_attr.type = FI_AV_TABLE; #endif - ret = fi_av_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], + ret = fi_av_open(ctx->domain[idx], &av_attr, - /*&shmem_transport_ofi_avfd*/ &ctx->av[idx], + &ctx->av[idx], NULL); OFI_CHECK_RETURN_STR(ret, "AV creation failed"); - ret = fi_cntr_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], &cntr_put_attr, + ret = fi_cntr_open(ctx->domain[idx], &cntr_put_attr, &ctx->put_cntr[idx], NULL); OFI_CHECK_RETURN_MSG(ret, "put_cntr creation failed (%s)\n", fi_strerror(errno)); - ret = fi_cntr_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], &cntr_get_attr, + ret = fi_cntr_open(ctx->domain[idx], &cntr_get_attr, &ctx->get_cntr[idx], NULL); OFI_CHECK_RETURN_MSG(ret, "get_cntr creation failed (%s)\n", fi_strerror(errno)); - ret = 
fi_cq_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], &cq_attr, &ctx->cq[idx], NULL); + ret = fi_cq_open(ctx->domain[idx], &cq_attr, &ctx->cq[idx], NULL); if (ret && errno == FI_EMFILE) { DEBUG_STR("Context creation failed because of open files limit, consider increasing with 'ulimit' command"); } OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno)); - ret = fi_endpoint(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], - /*info->p_info*/ provider_list[idx], &ctx->ep[idx], NULL); + ret = fi_endpoint(ctx->domain[idx], + provider_list[idx], &ctx->ep[idx], NULL); OFI_CHECK_RETURN_MSG(ret, "ep creation failed (%s)\n", fi_strerror(errno)); } @@ -1993,7 +2085,7 @@ int shmem_transport_init(void) ret = shmem_transport_ofi_target_ep_init(); if (ret != 0) return ret; - ret = publish_mr_info(shmem_transport_ofi_info.p_info); + ret = publish_mr_info(); if (ret != 0) return ret; ret = publish_av_info(&shmem_transport_ofi_info); @@ -2064,7 +2156,7 @@ int shmem_transport_startup(void) } for (i = 0; i < shmem_transport_ofi_stx_max; i++) { - ret = fi_stx_context(shmem_transport_ofi_domainfd, NULL, + ret = fi_stx_context(shmem_transport_ofi_target_eps[idx].domainfd, NULL, &shmem_transport_ofi_stx_pool[idx][i].stx, NULL); OFI_CHECK_RETURN_MSG(ret, "STX context creation failed (%s)\n", fi_strerror(ret)); shmem_transport_ofi_stx_pool[idx][i].ref_cnt = 0; @@ -2318,62 +2410,67 @@ int shmem_transport_fini(void) } if (shmem_transport_ofi_stx_pool) free(shmem_transport_ofi_stx_pool); +#if !defined(ENABLE_MR_SCALABLE) + free(heap_keys); + free(data_keys); +#if !defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING) + free(heap_addrs); + free(data_addrs); +#endif +#endif + +#ifdef USE_FI_HMEM + free(external_heap_keys); + free(external_heap_addrs); +#endif + + for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { #if defined(ENABLE_MR_SCALABLE) #if defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING) - ret = fi_close(&shmem_transport_ofi_target_mrfd->fid); - 
OFI_CHECK_ERROR_MSG(ret, "Target MR close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].mrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Target MR close failed (%s)\n", fi_strerror(errno)); #else - ret = fi_close(&shmem_transport_ofi_target_heap_mrfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Target heap MR close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].heap_mrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Target heap MR close failed (%s)\n", fi_strerror(errno)); - ret = fi_close(&shmem_transport_ofi_target_data_mrfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].data_mrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno)); #endif #else - free(shmem_transport_ofi_target_heap_keys); - free(shmem_transport_ofi_target_data_keys); - -#if !defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING) - free(shmem_transport_ofi_target_heap_addrs); - free(shmem_transport_ofi_target_data_addrs); -#endif + ret = fi_close(&shmem_transport_ofi_target_eps[idx].heap_mrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Target heap MR close failed (%s)\n", fi_strerror(errno)); - ret = fi_close(&shmem_transport_ofi_target_heap_mrfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Target heap MR close failed (%s)\n", fi_strerror(errno)); - - ret = fi_close(&shmem_transport_ofi_target_data_mrfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].data_mrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno)); #endif #ifdef USE_FI_HMEM - if (shmem_external_heap_pre_initialized) { - free(shmem_transport_ofi_external_heap_keys); - free(shmem_transport_ofi_external_heap_addrs); - ret = fi_close(&shmem_transport_ofi_external_heap_mrfd->fid); - OFI_CHECK_ERROR_MSG(ret, "External heap MR close 
failed (%s)\n", fi_strerror(errno)); - } + if (shmem_external_heap_pre_initialized) { + ret = fi_close(&shmem_transport_ofi_target_eps[idx].external_heap_mrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "External heap MR close failed (%s)\n", fi_strerror(errno)); + } #endif + } for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { + ret = fi_close(&shmem_transport_ofi_target_eps[idx].ep->fid); + OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno)); - ret = fi_close(&shmem_transport_ofi_target_ep->fid); - OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno)); - - ret = fi_close(&shmem_transport_ofi_target_cq->fid); - OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].cq->fid); + OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", fi_strerror(errno)); #if ENABLE_TARGET_CNTR - ret = fi_close(&shmem_transport_ofi_target_cntrfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Target CT close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].cntrfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Target CT close failed (%s)\n", fi_strerror(errno)); #endif - ret = fi_close(&shmem_transport_ofi_avfd->fid); - OFI_CHECK_ERROR_MSG(ret, "AV close failed (%s)\n", fi_strerror(errno)); - - ret = fi_close(&shmem_transport_ofi_domainfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Domain close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].avfd->fid); + OFI_CHECK_ERROR_MSG(ret, "AV close failed (%s)\n", fi_strerror(errno)); - ret = fi_close(&shmem_transport_ofi_fabfd->fid); - OFI_CHECK_ERROR_MSG(ret, "Fabric close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].domainfd->fid); + OFI_CHECK_ERROR_MSG(ret, "Domain close failed (%s)\n", fi_strerror(errno)); + ret = fi_close(&shmem_transport_ofi_target_eps[idx].fabfd->fid); + OFI_CHECK_ERROR_MSG(ret, 
"Fabric close failed (%s)\n", fi_strerror(errno)); + } #ifdef USE_AV_MAP free(addr_table); #endif diff --git a/src/transport_ofi.h b/src/transport_ofi.h index 8b2049e99..caf1dd8c2 100644 --- a/src/transport_ofi.h +++ b/src/transport_ofi.h @@ -41,26 +41,55 @@ extern size_t shmem_transport_ofi_num_nics; #define ENABLE_TARGET_CNTR 0 #endif -#if ENABLE_TARGET_CNTR -extern struct fid_cntr* shmem_transport_ofi_target_cntrfd; +struct shmem_transport_ofi_target_ep { + struct fid_fabric* fabfd; + struct fid_domain* domainfd; + struct fid_av* avfd; + struct fid_ep* ep; + //#if ENABLE_MANUAL_PROGRESS + struct fid_cq* cq; + //#endif + #if ENABLE_TARGET_CNTR + struct fid_cntr* cntrfd; + #endif + #ifdef ENABLE_MR_SCALABLE + #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING + struct fid_mr* mrfd; + #else /* !ENABLE_REMOTE_VIRTUAL_ADDRESSING */ + struct fid_mr* heap_mrfd; + struct fid_mr* data_mrfd; + #endif + #else /* !ENABLE_MR_SCALABLE */ + struct fid_mr* heap_mrfd; + struct fid_mr* data_mrfd; + //uint64_t* heap_keys; + //uint64_t* data_keys; + #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING + int use_absolute_address; + #else + //uint8_t** heap_addrs; + //uint8_t** data_addrs; + #endif /* ENABLE_REMOTE_VIRTUAL_ADDRESSING */ + #endif /* ENABLE_MR_SCALABLE */ + #ifdef USE_FI_HMEM + struct fid_mr* external_heap_mrfd; + //uint64_t* external_heap_keys; + //uint8_t** external_heap_addrs; + #endif +}; +extern struct shmem_transport_ofi_target_ep* shmem_transport_ofi_target_eps; + +#ifndef ENABLE_MR_SCALABLE +extern uint64_t* heap_keys; +extern uint64_t* data_keys; +#ifndef ENABLE_REMOTE_VIRTUAL_ADDRESSING +extern uint8_t** heap_addrs; +extern uint8_t** data_addrs; #endif -#if ENABLE_MANUAL_PROGRESS -extern struct fid_cq* shmem_transport_ofi_target_cq; #endif -#ifndef ENABLE_MR_SCALABLE -extern uint64_t* shmem_transport_ofi_target_heap_keys; -extern uint64_t* shmem_transport_ofi_target_data_keys; -#ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING -extern int shmem_transport_ofi_use_absolute_address; -#else 
-extern uint8_t** shmem_transport_ofi_target_heap_addrs; -extern uint8_t** shmem_transport_ofi_target_data_addrs; -#endif /* ENABLE_REMOTE_VIRTUAL_ADDRESSING */ -#endif /* ENABLE_MR_SCALABLE */ - #ifdef USE_FI_HMEM -extern uint64_t* shmem_transport_ofi_external_heap_keys; -extern uint8_t** shmem_transport_ofi_external_heap_addrs; +extern uint64_t* external_heap_keys; +extern uint8_t** external_heap_addrs; #endif extern struct fid_mr* shmem_transport_ofi_mrfd_list[3]; @@ -85,21 +114,21 @@ extern pthread_mutex_t shmem_transport_ofi_progress_lock; } \ } while (0) -#define OFI_CTX_CHECK_ERROR(ctx, /* ssize_t */ err) \ - do { \ - if ((err) == -FI_EAVAIL) { \ - struct fi_cq_err_entry e = {0}; \ - ssize_t ret = fi_cq_readerr((ctx)->cq, (void *)&e, 0); /* FIX */ \ - if (ret == 1) { \ - const char *errmsg = fi_cq_strerror((ctx)->cq /* FIX */, e.prov_errno, \ - e.err_data, NULL, 0); \ - RAISE_ERROR_MSG("Error in operation: %s\n", errmsg); \ - } else { \ - RAISE_ERROR_MSG("Error reading from CQ (%zd)\n", ret); \ - } \ - } else if (err) { \ - RAISE_ERROR_MSG("OFI error %zd: %s\n", err, fi_strerror(err)); \ - } \ +#define OFI_CTX_CHECK_ERROR(ctx, nic_idx, /* ssize_t */ err) \ + do { \ + if ((err) == -FI_EAVAIL) { \ + struct fi_cq_err_entry e = {0}; \ + ssize_t ret = fi_cq_readerr((ctx)->cq[nic_idx], (void *)&e, 0); /* FIXED */ \ + if (ret == 1) { \ + const char *errmsg = fi_cq_strerror((ctx)->cq[nic_idx] /* FIXED */, e.prov_errno, \ + e.err_data, NULL, 0); \ + RAISE_ERROR_MSG("Error in operation: %s\n", errmsg); \ + } else { \ + RAISE_ERROR_MSG("Error reading from CQ (%zd)\n", ret); \ + } \ + } else if (err) { \ + RAISE_ERROR_MSG("OFI error %zd: %s\n", err, fi_strerror(err)); \ + } \ } while (0) #define OFI_CHECK_ERROR_MSG(ret, ...) 
\ @@ -174,7 +203,7 @@ int shmem_transport_ofi_get_mr_desc_index(const void *addr) { #ifdef ENABLE_MR_SCALABLE static inline void shmem_transport_ofi_get_mr(const void *addr, int dest_pe, - uint8_t **mr_addr, uint64_t *key) { + uint8_t **mr_addr, uint64_t *key, size_t nic_idx) { #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING *key = 0; *mr_addr = (uint8_t*) addr; @@ -202,31 +231,31 @@ void shmem_transport_ofi_get_mr(const void *addr, int dest_pe, #else static inline void shmem_transport_ofi_get_mr(const void *addr, int dest_pe, - uint8_t **mr_addr, uint64_t *key) { + uint8_t **mr_addr, uint64_t *key, size_t nic_idx) { if ((void*) addr >= shmem_internal_data_base && (uint8_t*) addr < (uint8_t*) shmem_internal_data_base + shmem_internal_data_length) { - *key = shmem_transport_ofi_target_data_keys[dest_pe]; + *key = data_keys[(dest_pe * shmem_transport_ofi_num_nics) + nic_idx]; /* FIXED? */ #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING if (shmem_transport_ofi_use_absolute_address) *mr_addr = (uint8_t *) addr; else *mr_addr = (void *) ((uint8_t *) addr - (uint8_t *) shmem_internal_data_base); #else - *mr_addr = shmem_transport_ofi_target_data_addrs[dest_pe] + + *mr_addr = data_addrs[(dest_pe * shmem_transport_ofi_num_nics) + nic_idx] + /* FIXED? */ ((uint8_t *) addr - (uint8_t *) shmem_internal_data_base); #endif } else if ((void*) addr >= shmem_internal_heap_base && (uint8_t*) addr < (uint8_t*) shmem_internal_heap_base + shmem_internal_heap_length) { - *key = shmem_transport_ofi_target_heap_keys[dest_pe]; + *key = heap_keys[(dest_pe * shmem_transport_ofi_num_nics) + nic_idx]; /* FIXED? */ #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING if (shmem_transport_ofi_use_absolute_address) *mr_addr = (uint8_t *) addr; else *mr_addr = (void *) ((uint8_t *) addr - (uint8_t *) shmem_internal_heap_base); #else - *mr_addr = shmem_transport_ofi_target_heap_addrs[dest_pe] + + *mr_addr = heap_addrs[(dest_pe * shmem_transport_ofi_num_nics) + nic_idx] + /* FIXED? 
*/ ((uint8_t *) addr - (uint8_t *) shmem_internal_heap_base); #endif } @@ -234,8 +263,8 @@ void shmem_transport_ofi_get_mr(const void *addr, int dest_pe, else if (shmem_external_heap_pre_initialized) { if ((void*) addr >= shmem_external_heap_base && (uint8_t*) addr < (uint8_t*) shmem_external_heap_base + shmem_external_heap_length) { - *key = shmem_transport_ofi_external_heap_keys[dest_pe]; - *mr_addr = shmem_transport_ofi_external_heap_addrs[dest_pe] + + *key = external_heap_keys[(dest_pe * shmem_transport_ofi_num_nics) + nic_idx]; /* FIXED? */ + *mr_addr = external_heap_addrs[(dest_pe * shmem_transport_ofi_num_nics) + nic_idx] + /* FIXED? */ ((uint8_t *) addr - (uint8_t *) shmem_external_heap_base); } } @@ -271,9 +300,9 @@ typedef enum fi_op shm_internal_op_t; extern fi_addr_t *addr_table; #ifdef USE_AV_MAP -#define GET_DEST(dest) ((fi_addr_t)(addr_table[(dest)])) +#define GET_DEST(dest, nic_idx) ((fi_addr_t)(addr_table[(dest * shmem_transport_ofi_num_nics) + nic_idx])) #else -#define GET_DEST(dest) ((fi_addr_t)(dest)) +#define GET_DEST(dest, nic_idx) ((fi_addr_t)((dest * shmem_transport_ofi_num_nics) + nic_idx)) #endif #ifdef USE_FI_HMEM @@ -344,7 +373,6 @@ struct shmem_transport_ctx_t { typedef struct shmem_transport_ctx_t shmem_transport_ctx_t; extern shmem_transport_ctx_t shmem_transport_ctx_default; -extern struct fid_ep* shmem_transport_ofi_target_ep; #ifdef USE_CTX_LOCK #define SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx) \ @@ -383,14 +411,14 @@ extern struct fid_ep* shmem_transport_ofi_target_ep; } while (0) static inline -void shmem_transport_probe(void) +void shmem_transport_probe(size_t nic_idx) { #if defined(ENABLE_MANUAL_PROGRESS) # ifdef USE_THREAD_COMPLETION if (0 == pthread_mutex_trylock(&shmem_transport_ofi_progress_lock)) { # endif struct fi_cq_entry buf; - int ret = fi_cq_read(shmem_transport_ofi_target_cq, &buf, 1); + int ret = fi_cq_read(shmem_transport_ofi_target_eps[nic_idx].cq, &buf, 1); if (ret == 1) RAISE_WARN_STR("Unexpected event"); # ifdef 
USE_THREAD_COMPLETION @@ -439,7 +467,7 @@ void shmem_transport_ofi_drain_cq(shmem_transport_ctx_t *ctx, size_t nic_idx) } else if (ret < 0) { - OFI_CTX_CHECK_ERROR(ctx, ret); + OFI_CTX_CHECK_ERROR(ctx, nic_idx, ret); } else { @@ -510,11 +538,10 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx, size_t nic_idx) long poll_count = 0; while (poll_count < shmem_transport_ofi_put_poll_limit || shmem_transport_ofi_put_poll_limit < 0) { - success = fi_cntr_read(ctx->put_cntr[nic_idx]); /* FIXED? */ fail = fi_cntr_readerr(ctx->put_cntr[nic_idx]); /* FIXED? */ cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ - shmem_transport_probe(); + shmem_transport_probe(nic_idx); if (success < cnt && fail == 0) { SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); @@ -534,7 +561,7 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx, size_t nic_idx) cnt = cnt_new; ssize_t ret = fi_cntr_wait(ctx->put_cntr[nic_idx], cnt, -1); /* FIXED? */ cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[nic_idx]); /* FIXED? 
*/ - OFI_CTX_CHECK_ERROR(ctx, ret); + OFI_CTX_CHECK_ERROR(ctx, nic_idx, ret); } while (cnt < cnt_new); shmem_internal_assert(cnt == cnt_new); @@ -595,7 +622,7 @@ int try_again(shmem_transport_ctx_t *ctx, const int ret, uint64_t *polled, size_ } } - shmem_transport_probe(); + shmem_transport_probe(nic_idx); (*polled)++; if ((*polled) <= shmem_transport_ofi_max_poll) { @@ -607,7 +634,7 @@ int try_again(shmem_transport_ctx_t *ctx, const int ret, uint64_t *polled, size_ } } else { - OFI_CTX_CHECK_ERROR(ctx, (ssize_t) ret); + OFI_CTX_CHECK_ERROR(ctx, nic_idx, (ssize_t) ret); } } @@ -625,7 +652,7 @@ void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(len <= shmem_transport_ofi_max_buffered_send); @@ -637,7 +664,7 @@ void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const ret = fi_inject_write(ctx->ep[nic_idx], /* FIXED? */ source, len, - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key); @@ -655,7 +682,7 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); uint8_t *frag_source = (uint8_t *) source; uint64_t frag_target = (uint64_t) addr; @@ -675,7 +702,7 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con ret = fi_write(ctx->ep[nic_idx], frag_source, frag_len, GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(source)), - GET_DEST(dst), frag_target, + GET_DEST(dst, nic_idx), frag_target, key, NULL); } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? 
*/ @@ -705,7 +732,7 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_transport_ofi_bounce_buffer_t *buff = create_bounce_buffer(ctx, source, len); @@ -717,7 +744,7 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void .msg_iov = &msg_iov, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index(source)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rma_iov, .rma_iov_count = 1, .context = buff, @@ -744,13 +771,13 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); if (len <= shmem_transport_ofi_max_buffered_send) { uint8_t *src_buf = (uint8_t *) source; SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr); + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? 
*/ const struct iovec msg_iov = { .iov_base = src_buf, @@ -765,7 +792,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co .msg_iov = &msg_iov, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index(source)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rma_iov, .rma_iov_count = 1, .context = src_buf, @@ -795,7 +822,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co .msg_iov = &msg_iov, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index(source)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rma_iov, .rma_iov_count = 1, .context = frag_source, @@ -840,7 +867,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co #endif /* Transmit the signal */ - shmem_transport_ofi_get_mr(sig_addr, pe, &addr, &key); + shmem_transport_ofi_get_mr(sig_addr, pe, &addr, &key, nic_idx); polled = 0; ret = 0; int atomic_op = (sig_op == SHMEM_SIGNAL_ADD) ? 
FI_SUM : FI_ATOMIC_WRITE; @@ -861,7 +888,7 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co .msg_iov = &msg_iov_signal, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index((void *) &signal)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rma_iov_signal, .rma_iov_count = 1, .datatype = FI_UINT64, @@ -914,7 +941,7 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(source, pe, &addr, &key); + shmem_transport_ofi_get_mr(source, pe, &addr, &key, nic_idx); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); if (len <= shmem_transport_ofi_max_msg_size) { @@ -925,7 +952,7 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s target, len, GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(target)), - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key, NULL); @@ -947,7 +974,7 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s ret = fi_read(ctx->ep[nic_idx], frag_target, frag_len, GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(target)), - GET_DEST(dst), frag_source, + GET_DEST(dst, nic_idx), frag_source, key, NULL); } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? 
*/ @@ -981,7 +1008,7 @@ void shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t nic_idx) fail = fi_cntr_readerr(ctx->get_cntr[nic_idx]); cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_cntr[nic_idx]); - shmem_transport_probe(); + shmem_transport_probe(nic_idx); if (success < cnt && fail == 0) { SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); @@ -1000,7 +1027,7 @@ void shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t nic_idx) cnt = cnt_new; ssize_t ret = fi_cntr_wait(ctx->get_cntr[nic_idx], cnt, -1); cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_cntr[nic_idx]); - OFI_CTX_CHECK_ERROR(ctx, ret); + OFI_CTX_CHECK_ERROR(ctx, nic_idx, ret); } while (cnt < cnt_new); shmem_internal_assert(cnt == cnt_new); @@ -1019,7 +1046,7 @@ void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(len <= sizeof(double _Complex)); shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); @@ -1031,7 +1058,7 @@ void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const .msg_iov = &sourcev, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index(source)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rmav, .rma_iov_count = 1, .datatype = SHMEM_TRANSPORT_DTYPE(datatype), @@ -1076,7 +1103,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(len <= sizeof(double _Complex)); shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); @@ -1093,7 +1120,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void NULL, dest, 
GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(dest)), - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key, SHMEM_TRANSPORT_DTYPE(datatype), @@ -1115,7 +1142,7 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(len <= sizeof(double _Complex)); shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); @@ -1132,7 +1159,7 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void NULL, dest, GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(dest)), - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key, SHMEM_TRANSPORT_DTYPE(datatype), @@ -1153,7 +1180,7 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); @@ -1164,7 +1191,7 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void ret = fi_inject_atomic(ctx->ep[nic_idx], /* FIXED? */ source, 1, - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key, SHMEM_TRANSPORT_DTYPE(datatype), @@ -1201,7 +1228,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi datatype, op); } - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); if ( full_len <= MIN(shmem_transport_ofi_max_buffered_send, max_atomic_size)) { @@ -1214,7 +1241,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi ret = fi_inject_atomic(ctx->ep[nic_idx], /* FIXED? 
*/ source, len, - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key, dt, @@ -1237,7 +1264,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi .msg_iov = &msg_iov, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index(source)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rma_iov, .rma_iov_count = 1, .datatype = dt, @@ -1264,7 +1291,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi (sent*SHMEM_Dtsize[dt])), chunksize, GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(source)), - GET_DEST(dst), + GET_DEST(dst, nic_idx), ((uint64_t) addr + (sent*SHMEM_Dtsize[dt])), key, @@ -1294,7 +1321,7 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(len <= sizeof(double _Complex)); shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); @@ -1305,7 +1332,7 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, .msg_iov = &sourcev, .desc = GET_MR_DESC_ADDR(shmem_transport_ofi_get_mr_desc_index(source)), .iov_count = 1, - .addr = GET_DEST(dst), + .addr = GET_DEST(dst, nic_idx), .rma_iov = &rmav, .rma_iov_count = 1, .datatype = SHMEM_TRANSPORT_DTYPE(datatype), @@ -1349,7 +1376,7 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, uint64_t key; uint8_t *addr; - shmem_transport_ofi_get_mr(target, pe, &addr, &key); + shmem_transport_ofi_get_mr(target, pe, &addr, &key, nic_idx); shmem_internal_assert(len <= sizeof(double _Complex)); shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); @@ -1364,7 +1391,7 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(source)), dest, 
GET_MR_DESC(shmem_transport_ofi_get_mr_desc_index(dest)), - GET_DEST(dst), + GET_DEST(dst, nic_idx), (uint64_t) addr, key, SHMEM_TRANSPORT_DTYPE(datatype), @@ -1508,7 +1535,7 @@ uint64_t shmem_transport_received_cntr_get(void) shmem_internal_assert(shmem_internal_thread_level == SHMEM_THREAD_SINGLE); /* NOTE-MT: This is only reachable in single-threaded runs, otherwise * we would need a mutex to support FI_THREAD_COMPLETION builds. */ - return fi_cntr_read(shmem_transport_ofi_target_cntrfd); + return fi_cntr_read(shmem_transport_ofi_target_eps[0].cntrfd); /* FIX */ #else RAISE_ERROR_STR("OFI transport configured for hard polling"); return 0; @@ -1522,7 +1549,7 @@ void shmem_transport_received_cntr_wait(uint64_t ge_val) shmem_internal_assert(shmem_internal_thread_level == SHMEM_THREAD_SINGLE); /* NOTE-MT: This is only reachable in single-threaded runs, otherwise * we would need a mutex to support FI_THREAD_COMPLETION builds. */ - int ret = fi_cntr_wait(shmem_transport_ofi_target_cntrfd, ge_val, -1); + int ret = fi_cntr_wait(shmem_transport_ofi_target_eps[0].cntrfd, ge_val, -1); /* FIX */ OFI_CHECK_ERROR(ret); #else @@ -1599,7 +1626,7 @@ uint64_t shmem_transport_pcntr_get_completed_target(void) # ifdef USE_THREAD_COMPLETION if (0 == pthread_mutex_lock(&shmem_transport_ofi_progress_lock)) { # endif - cnt = fi_cntr_read(shmem_transport_ofi_target_cntrfd); + cnt = fi_cntr_read(shmem_transport_ofi_target_eps[0].cntrfd); /* FIX */ # ifdef USE_THREAD_COMPLETION pthread_mutex_unlock(&shmem_transport_ofi_progress_lock); } diff --git a/src/transport_portals4.h b/src/transport_portals4.h index b66fa5210..c93bb6240 100644 --- a/src/transport_portals4.h +++ b/src/transport_portals4.h @@ -244,7 +244,7 @@ int shmem_transport_fini(void); static inline void shmem_transport_get_wait(shmem_transport_ctx_t*, size_t idx); -static inline void shmem_transport_probe(void) { +static inline void shmem_transport_probe(size_t nic_idx) { return; } @@ -762,7 +762,7 @@ 
shmem_transport_swap_nbi(shmem_transport_ctx_t* ctx, void *target, int pe, ptl_datatype_t datatype, size_t nic_idx) { /* transport_swap already buffers the source argument */ - shmem_transport_swap(ctx, target, source, dest, len, pe, datatype); + shmem_transport_swap(ctx, target, source, dest, len, pe, datatype, nic_idx); } @@ -814,7 +814,7 @@ shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, ptl_datatype_t datatype, size_t nic_idx) { /* transport_cswap already buffers the source and operand arguments */ - shmem_transport_cswap(ctx, target, source, dest, operand, len, pe, datatype); + shmem_transport_cswap(ctx, target, source, dest, operand, len, pe, datatype, nic_idx); } @@ -894,7 +894,7 @@ shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *sou static inline void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, - ptl_op_t op, ptl_datatype_t datatype, long *completion) + ptl_op_t op, ptl_datatype_t datatype, long *completion, size_t nic_idx) { int ret; ptl_pt_index_t pt; @@ -1064,7 +1064,7 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, ptl_datatype_t datatype, size_t nic_idx) { /* transport_fetch_atomic already buffers the source argument */ - shmem_transport_fetch_atomic(ctx, target, source, dest, len, pe, op, datatype); + shmem_transport_fetch_atomic(ctx, target, source, dest, len, pe, op, datatype, nic_idx); } diff --git a/src/transport_ucx.c b/src/transport_ucx.c index 5d2c9f265..f6cb9b833 100644 --- a/src/transport_ucx.c +++ b/src/transport_ucx.c @@ -72,7 +72,7 @@ static int shmem_transport_ucx_progress_thread_enabled = 1; static void * shmem_transport_ucx_progress_thread_func(void *arg) { while (__atomic_load_n(&shmem_transport_ucx_progress_thread_enabled, __ATOMIC_ACQUIRE)) { - shmem_transport_probe(); + shmem_transport_probe(0); usleep(shmem_internal_params.PROGRESS_INTERVAL); } diff --git a/src/transport_ucx.h 
b/src/transport_ucx.h index fb1b3299f..7e74710e6 100644 --- a/src/transport_ucx.h +++ b/src/transport_ucx.h @@ -83,7 +83,7 @@ int shmem_transport_fini(void); static inline void -shmem_transport_probe(void) +shmem_transport_probe(size_t nic_idx) { ucp_worker_progress(shmem_transport_ucp_worker); } @@ -93,14 +93,14 @@ ucs_status_t shmem_transport_ucx_complete_op(ucs_status_ptr_t req) { if (req == NULL) { /* All calls to complete_op must generate progress to avoid deadlock * in application-level polling loops */ - shmem_transport_probe(); + shmem_transport_probe(0); return UCS_OK; } else if (UCS_PTR_IS_ERR(req)) { return UCS_PTR_STATUS(req); } else { ucs_status_t status; do { - shmem_transport_probe(); + shmem_transport_probe(0); status = ucp_request_check_status(req); } while (status == UCS_INPROGRESS); ucp_request_free(req); @@ -252,7 +252,7 @@ shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void static inline void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, long *completion) + int pe, long *completion, size_t nic_idx) { ucs_status_t status; ucp_rkey_h rkey; @@ -278,7 +278,7 @@ void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) { while (__atomic_load_n(completion, __ATOMIC_ACQUIRE) > 0) - shmem_transport_probe(); + shmem_transport_probe(0); } static inline @@ -392,7 +392,7 @@ shmem_transport_swap_nbi(shmem_transport_ctx_t* ctx, void *target, const void *s &shmem_transport_ucx_cb_nop); /* Manual progress to avoid deadlock for application-level polling */ - shmem_transport_probe(); + shmem_transport_probe(0); ucs_status_t status = shmem_transport_ucx_release_op(pstatus); UCX_CHECK_STATUS_INPROGRESS(status); @@ -475,7 +475,7 @@ shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const void * &shmem_transport_ucx_cb_nop); /* Manual progress to avoid deadlock for application-level polling */ - shmem_transport_probe(); + 
shmem_transport_probe(0); ucs_status_t status = shmem_transport_ucx_release_op(pstatus); UCX_CHECK_STATUS_INPROGRESS(status); @@ -520,7 +520,7 @@ shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *sou static inline void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, long *completion) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, long *completion, size_t nic_idx) { /* Used only by reductions, currently redirected to software reductions via * the shmem_transport_atomic_supported query below. */ @@ -604,7 +604,7 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, const &shmem_transport_ucx_cb_nop); /* Manual progress to avoid deadlock for application-level polling */ - shmem_transport_probe(); + shmem_transport_probe(0); ucs_status_t status = shmem_transport_ucx_release_op(pstatus); UCX_CHECK_STATUS_INPROGRESS(status); @@ -690,15 +690,15 @@ shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *sour while (!done) { uint32_t v; - shmem_transport_atomic_fetch(ctx, &v, target, len, pe, datatype); + shmem_transport_atomic_fetch(ctx, &v, target, len, pe, datatype, nic_idx); uint32_t new = (v & ~*(uint32_t *)mask) | (*(uint32_t *)source & *(uint32_t *)mask); - shmem_transport_cswap(ctx, target, &new, dest, &v, len, pe, datatype); + shmem_transport_cswap(ctx, target, &new, dest, &v, len, pe, datatype, nic_idx); if (*(uint32_t *)dest == v) done = 1; /* Manual progress to avoid deadlock for application-level polling */ - shmem_transport_probe(); + shmem_transport_probe(0); } }