diff --git a/configure.ac b/configure.ac index 99112ca9a..76ec09546 100755 --- a/configure.ac +++ b/configure.ac @@ -208,6 +208,18 @@ AS_CASE([$enable_ofi_mr], AC_DEFINE([ENABLE_MR_SCALABLE], [1], [If defined, the OFI transport will use FI_MR_SCALABLE])], [AC_MSG_ERROR([Invalid OFI memory registration mode: $enable_ofi_mr])]) +AC_ARG_ENABLE([mr-endpoint], + [AC_HELP_STRING([--enable-mr-endpoint], + [Use FI_MR_ENDPOINT to enable the cxi provider. (default: disabled)])]) +AS_IF([test "$enable_mr_endpoint" = "yes"], + [AC_DEFINE([ENABLE_MR_ENDPOINT], [1], [If defined, the OFI transport will use FI_MR_ENDPOINT])]) + +AC_ARG_ENABLE([ofi-manual-progress], + [AC_HELP_STRING([--enable-ofi-manual-progress], + [Use FI_PROGRESS_MANUAL for the data progress control mode. (default: disabled)])]) +AS_IF([test "$enable_ofi_manual_progress" = "yes"], + [AC_DEFINE([ENABLE_FI_MANUAL_PROGRESS], [1], [If defined, the OFI transport will use FI_PROGRESS_MANUAL as the data progress mode. This is currently required for the cxi provider.])]) + AC_ARG_ENABLE([max-teams], [AC_HELP_STRING([--enable-max-teams=NUMBER], [Default value for the maximum number of teams allowed (default: 10)])]) diff --git a/src/shmem_synchronization.h b/src/shmem_synchronization.h index 7a3eafc31..0270d6d7b 100644 --- a/src/shmem_synchronization.h +++ b/src/shmem_synchronization.h @@ -174,7 +174,10 @@ shmem_internal_fence(shmem_ctx_t ctx) } \ } while(0) -#if defined(ENABLE_HARD_POLLING) +/* Polling based wait is required for providers that need + * manual progress, i.e., cxi. 
This is enabled through + * ENABLE_FI_MANUAL_PROGRESS */ +#if defined(ENABLE_HARD_POLLING) || defined(ENABLE_FI_MANUAL_PROGRESS) #define SHMEM_INTERNAL_WAIT_UNTIL(var, cond, value) \ SHMEM_WAIT_UNTIL_POLL(var, cond, value) #define SHMEM_INTERNAL_SIGNAL_WAIT_UNTIL(var, cond, value, sat_value) \ diff --git a/src/transport_ofi.c b/src/transport_ofi.c index 46a67bb19..7e605f369 100644 --- a/src/transport_ofi.c +++ b/src/transport_ofi.c @@ -678,15 +678,17 @@ int allocate_recv_cntr_mr(void) /* Register separate data and heap segments using keys 0 and 1, * respectively. In MR_BASIC_MODE, the keys are ignored and selected by * the provider. */ + uint64_t key = 1; ret = fi_mr_reg(shmem_transport_ofi_domainfd, shmem_internal_heap_base, shmem_internal_heap_length, - FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 1ULL, flags, + FI_REMOTE_READ | FI_REMOTE_WRITE, 0, key, flags, &shmem_transport_ofi_target_heap_mrfd, NULL); OFI_CHECK_RETURN_STR(ret, "target memory (heap) registration failed"); + key = 0; ret = fi_mr_reg(shmem_transport_ofi_domainfd, shmem_internal_data_base, shmem_internal_data_length, - FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 0ULL, flags, + FI_REMOTE_READ | FI_REMOTE_WRITE, 0, key, flags, &shmem_transport_ofi_target_data_mrfd, NULL); OFI_CHECK_RETURN_STR(ret, "target memory (data) registration failed"); @@ -702,6 +704,28 @@ int allocate_recv_cntr_mr(void) FI_REMOTE_WRITE); OFI_CHECK_RETURN_STR(ret, "target CNTR binding to data MR failed"); +#ifdef ENABLE_MR_ENDPOINT + if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_ENDPOINT) { + ret = fi_ep_bind(shmem_transport_ofi_target_ep, + &shmem_transport_ofi_target_cntrfd->fid, FI_REMOTE_WRITE); + OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed"); + + ret = fi_mr_bind(shmem_transport_ofi_target_heap_mrfd, + &shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE); + OFI_CHECK_RETURN_STR(ret, "target EP binding to heap MR failed"); + + ret = fi_mr_enable(shmem_transport_ofi_target_heap_mrfd); + 
OFI_CHECK_RETURN_STR(ret, "target heap MR enable failed"); + + ret = fi_mr_bind(shmem_transport_ofi_target_data_mrfd, + &shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE); + OFI_CHECK_RETURN_STR(ret, "target EP binding to data MR failed"); + + ret = fi_mr_enable(shmem_transport_ofi_target_data_mrfd); + OFI_CHECK_RETURN_STR(ret, "target data MR enable failed"); + } +#endif + #ifdef ENABLE_MR_RMA_EVENT if (shmem_transport_ofi_mr_rma_event) { ret = fi_mr_enable(shmem_transport_ofi_target_data_mrfd); @@ -729,8 +753,8 @@ int publish_mr_info(void) heap_key = fi_mr_key(shmem_transport_ofi_target_heap_mrfd); data_key = fi_mr_key(shmem_transport_ofi_target_data_mrfd); } else { - heap_key = 1ULL; - data_key = 0ULL; + heap_key = 1; + data_key = 0; } err = shmem_runtime_put("fi_heap_key", &heap_key, sizeof(uint64_t)); @@ -1155,7 +1179,11 @@ int query_for_fabric(struct fabric_info *info) for put with signal implementation */ #endif hints.addr_format = FI_FORMAT_UNSPEC; +#ifdef ENABLE_FI_MANUAL_PROGRESS + domain_attr.data_progress = FI_PROGRESS_MANUAL; +#else domain_attr.data_progress = FI_PROGRESS_AUTO; +#endif domain_attr.resource_mgmt = FI_RM_ENABLED; #ifdef ENABLE_MR_SCALABLE /* Scalable, offset-based addressing, formerly FI_MR_SCALABLE */ @@ -1167,6 +1195,9 @@ int query_for_fabric(struct fabric_info *info) /* Portable, absolute addressing, formerly FI_MR_BASIC */ domain_attr.mr_mode = FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY; #endif +#ifdef ENABLE_MR_ENDPOINT + domain_attr.mr_mode |= FI_MR_ENDPOINT; +#endif #if !defined(ENABLE_MR_SCALABLE) || !defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING) domain_attr.mr_key_size = 1; /* Heap and data use different MR keys, need at least 1 byte */ @@ -1299,22 +1330,22 @@ static int shmem_transport_ofi_target_ep_init(void) ret = fi_ep_bind(shmem_transport_ofi_target_ep, &shmem_transport_ofi_avfd->fid, 0); OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to target endpoint failed"); - ret = allocate_recv_cntr_mr(); - if (ret != 0) return ret; 
- - struct fi_cq_attr cq_attr = {0}; + struct fi_cq_attr cq_attr = {0}; - ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr, - &shmem_transport_ofi_target_cq, NULL); - OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno)); + ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr, + &shmem_transport_ofi_target_cq, NULL); + OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno)); - ret = fi_ep_bind(shmem_transport_ofi_target_ep, - &shmem_transport_ofi_target_cq->fid, FI_RECV); - OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed"); + ret = fi_ep_bind(shmem_transport_ofi_target_ep, + &shmem_transport_ofi_target_cq->fid, FI_TRANSMIT | FI_RECV); + OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed"); ret = fi_enable(shmem_transport_ofi_target_ep); OFI_CHECK_RETURN_STR(ret, "fi_enable on target endpoint failed"); + ret = allocate_recv_cntr_mr(); + if (ret) return ret; + return 0; } @@ -1782,12 +1813,6 @@ int shmem_transport_fini(void) } if (shmem_transport_ofi_stx_pool) free(shmem_transport_ofi_stx_pool); - ret = fi_close(&shmem_transport_ofi_target_ep->fid); - OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno)); - - ret = fi_close(&shmem_transport_ofi_target_cq->fid); - OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", fi_strerror(errno)); - #if defined(ENABLE_MR_SCALABLE) && defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING) ret = fi_close(&shmem_transport_ofi_target_mrfd->fid); OFI_CHECK_ERROR_MSG(ret, "Target MR close failed (%s)\n", fi_strerror(errno)); @@ -1799,6 +1824,12 @@ int shmem_transport_fini(void) OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno)); #endif + ret = fi_close(&shmem_transport_ofi_target_ep->fid); + OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno)); + + ret = fi_close(&shmem_transport_ofi_target_cq->fid); + OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", 
fi_strerror(errno)); + #if ENABLE_TARGET_CNTR ret = fi_close(&shmem_transport_ofi_target_cntrfd->fid); OFI_CHECK_ERROR_MSG(ret, "Target CT close failed (%s)\n", fi_strerror(errno)); diff --git a/src/transport_ofi.h b/src/transport_ofi.h index 099c6468c..e1e803c8e 100644 --- a/src/transport_ofi.h +++ b/src/transport_ofi.h @@ -1314,8 +1314,17 @@ void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, int datatype) { +#ifdef ENABLE_MR_ENDPOINT + /* The CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE. + * That is why the non-blocking API, which uses FI_INJECT, is used. FI_ATOMIC_READ is + * also not supported currently. */ + long long dummy = 0; + shmem_transport_fetch_atomic_nbi(ctx, (void *) source, (const void *) &dummy, + target, len, pe, FI_SUM, datatype); +#else shmem_transport_fetch_atomic(ctx, (void *) source, (const void *) NULL, target, len, pe, FI_ATOMIC_READ, datatype); +#endif }