Skip to content
12 changes: 12 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ AS_CASE([$enable_ofi_mr],
AC_DEFINE([ENABLE_MR_SCALABLE], [1], [If defined, the OFI transport will use FI_MR_SCALABLE])],
[AC_MSG_ERROR([Invalid OFI memory registration mode: $enable_ofi_mr])])

dnl Optional feature switch: --enable-mr-endpoint.
dnl When the option value is exactly "yes", ENABLE_MR_ENDPOINT is defined to 1;
dnl for any other value (including when the option is omitted) nothing is
dnl defined, so the feature is disabled by default.
AC_ARG_ENABLE([mr-endpoint],
[AC_HELP_STRING([--enable-mr-endpoint],
[Use FI_MR_ENDPOINT to enable cxi provider. (default: disabled)])])
dnl Only an explicit "yes" turns the feature on.
AS_IF([test "$enable_mr_endpoint" = "yes"],
[AC_DEFINE([ENABLE_MR_ENDPOINT], [1], [If defined, the OFI transport will use FI_MR_ENDPOINT])])

dnl Optional feature switch: --enable-ofi-manual-progress.
dnl When the option value is exactly "yes", ENABLE_FI_MANUAL_PROGRESS is
dnl defined to 1; otherwise nothing is defined (disabled by default).
dnl The libfabric progress-mode constant is spelled FI_PROGRESS_MANUAL; the
dnl preprocessor symbol keeps its existing name because the sources test it.
AC_ARG_ENABLE([ofi-manual-progress],
    [AC_HELP_STRING([--enable-ofi-manual-progress],
        [Use FI_PROGRESS_MANUAL for data progress control mode. (default: disabled)])])
AS_IF([test "$enable_ofi_manual_progress" = "yes"],
    [AC_DEFINE([ENABLE_FI_MANUAL_PROGRESS], [1], [If defined, the OFI transport will use FI_PROGRESS_MANUAL as the data progress mode. This is currently required for the cxi provider.])])

AC_ARG_ENABLE([max-teams],
[AC_HELP_STRING([--enable-max-teams=NUMBER],
[Default value for the maximum number of teams allowed (default: 10)])])
Expand Down
5 changes: 4 additions & 1 deletion src/shmem_synchronization.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ shmem_internal_fence(shmem_ctx_t ctx)
} \
} while(0)

#if defined(ENABLE_HARD_POLLING)
/* Polling-based wait is required for providers that need manual
 * progress (e.g., cxi). It is selected when either ENABLE_HARD_POLLING
 * or ENABLE_FI_MANUAL_PROGRESS is defined. */
#if defined(ENABLE_HARD_POLLING) || defined(ENABLE_FI_MANUAL_PROGRESS)
#define SHMEM_INTERNAL_WAIT_UNTIL(var, cond, value) \
SHMEM_WAIT_UNTIL_POLL(var, cond, value)
#define SHMEM_INTERNAL_SIGNAL_WAIT_UNTIL(var, cond, value, sat_value) \
Expand Down
71 changes: 51 additions & 20 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,15 +678,17 @@ int allocate_recv_cntr_mr(void)
/* Register separate data and heap segments using keys 0 and 1,
* respectively. In MR_BASIC_MODE, the keys are ignored and selected by
* the provider. */
uint64_t key = 1;
ret = fi_mr_reg(shmem_transport_ofi_domainfd, shmem_internal_heap_base,
shmem_internal_heap_length,
FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 1ULL, flags,
FI_REMOTE_READ | FI_REMOTE_WRITE, 0, key, flags,
&shmem_transport_ofi_target_heap_mrfd, NULL);
OFI_CHECK_RETURN_STR(ret, "target memory (heap) registration failed");

key = 0;
ret = fi_mr_reg(shmem_transport_ofi_domainfd, shmem_internal_data_base,
shmem_internal_data_length,
FI_REMOTE_READ | FI_REMOTE_WRITE, 0, 0ULL, flags,
FI_REMOTE_READ | FI_REMOTE_WRITE, 0, key, flags,
&shmem_transport_ofi_target_data_mrfd, NULL);
OFI_CHECK_RETURN_STR(ret, "target memory (data) registration failed");

Expand All @@ -702,6 +704,28 @@ int allocate_recv_cntr_mr(void)
FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to data MR failed");

#ifdef ENABLE_MR_ENDPOINT
if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_ENDPOINT) {
ret = fi_ep_bind(shmem_transport_ofi_target_ep,
&shmem_transport_ofi_target_cntrfd->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed");

ret = fi_mr_bind(shmem_transport_ofi_target_heap_mrfd,
&shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target EP binding to heap MR failed");

ret = fi_mr_enable(shmem_transport_ofi_target_heap_mrfd);
OFI_CHECK_RETURN_STR(ret, "target heap MR enable failed");

ret = fi_mr_bind(shmem_transport_ofi_target_data_mrfd,
&shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target EP binding to data MR failed");

ret = fi_mr_enable(shmem_transport_ofi_target_data_mrfd);
OFI_CHECK_RETURN_STR(ret, "target data MR enable failed");
}
#endif

#ifdef ENABLE_MR_RMA_EVENT
if (shmem_transport_ofi_mr_rma_event) {
ret = fi_mr_enable(shmem_transport_ofi_target_data_mrfd);
Expand Down Expand Up @@ -729,8 +753,8 @@ int publish_mr_info(void)
heap_key = fi_mr_key(shmem_transport_ofi_target_heap_mrfd);
data_key = fi_mr_key(shmem_transport_ofi_target_data_mrfd);
} else {
heap_key = 1ULL;
data_key = 0ULL;
heap_key = 1;
data_key = 0;
}

err = shmem_runtime_put("fi_heap_key", &heap_key, sizeof(uint64_t));
Expand Down Expand Up @@ -1155,7 +1179,11 @@ int query_for_fabric(struct fabric_info *info)
for put with signal implementation */
#endif
hints.addr_format = FI_FORMAT_UNSPEC;
#ifdef ENABLE_FI_MANUAL_PROGRESS
domain_attr.data_progress = FI_PROGRESS_MANUAL;
#else
domain_attr.data_progress = FI_PROGRESS_AUTO;
#endif
domain_attr.resource_mgmt = FI_RM_ENABLED;
#ifdef ENABLE_MR_SCALABLE
/* Scalable, offset-based addressing, formerly FI_MR_SCALABLE */
Expand All @@ -1167,6 +1195,9 @@ int query_for_fabric(struct fabric_info *info)
/* Portable, absolute addressing, formerly FI_MR_BASIC */
domain_attr.mr_mode = FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY;
#endif
#ifdef ENABLE_MR_ENDPOINT
domain_attr.mr_mode |= FI_MR_ENDPOINT;
#endif
#if !defined(ENABLE_MR_SCALABLE) || !defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING)
domain_attr.mr_key_size = 1; /* Heap and data use different MR keys, need
at least 1 byte */
Expand Down Expand Up @@ -1299,22 +1330,22 @@ static int shmem_transport_ofi_target_ep_init(void)
ret = fi_ep_bind(shmem_transport_ofi_target_ep, &shmem_transport_ofi_avfd->fid, 0);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to target endpoint failed");

ret = allocate_recv_cntr_mr();
if (ret != 0) return ret;

struct fi_cq_attr cq_attr = {0};
struct fi_cq_attr cq_attr = {0};

ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr,
&shmem_transport_ofi_target_cq, NULL);
OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno));
ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr,
&shmem_transport_ofi_target_cq, NULL);
OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno));

ret = fi_ep_bind(shmem_transport_ofi_target_ep,
&shmem_transport_ofi_target_cq->fid, FI_RECV);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed");
ret = fi_ep_bind(shmem_transport_ofi_target_ep,
&shmem_transport_ofi_target_cq->fid, FI_TRANSMIT | FI_RECV);
OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to target endpoint failed");

ret = fi_enable(shmem_transport_ofi_target_ep);
OFI_CHECK_RETURN_STR(ret, "fi_enable on target endpoint failed");

ret = allocate_recv_cntr_mr();
if (ret) return ret;

return 0;
}

Expand Down Expand Up @@ -1782,12 +1813,6 @@ int shmem_transport_fini(void)
}
if (shmem_transport_ofi_stx_pool) free(shmem_transport_ofi_stx_pool);

ret = fi_close(&shmem_transport_ofi_target_ep->fid);
OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno));

ret = fi_close(&shmem_transport_ofi_target_cq->fid);
OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", fi_strerror(errno));

#if defined(ENABLE_MR_SCALABLE) && defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING)
ret = fi_close(&shmem_transport_ofi_target_mrfd->fid);
OFI_CHECK_ERROR_MSG(ret, "Target MR close failed (%s)\n", fi_strerror(errno));
Expand All @@ -1799,6 +1824,12 @@ int shmem_transport_fini(void)
OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno));
#endif

ret = fi_close(&shmem_transport_ofi_target_ep->fid);
OFI_CHECK_ERROR_MSG(ret, "Target endpoint close failed (%s)\n", fi_strerror(errno));

ret = fi_close(&shmem_transport_ofi_target_cq->fid);
OFI_CHECK_ERROR_MSG(ret, "Target CQ close failed (%s)\n", fi_strerror(errno));

#if ENABLE_TARGET_CNTR
ret = fi_close(&shmem_transport_ofi_target_cntrfd->fid);
OFI_CHECK_ERROR_MSG(ret, "Target CT close failed (%s)\n", fi_strerror(errno));
Expand Down
9 changes: 9 additions & 0 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -1314,8 +1314,17 @@ void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target,
const void *source, size_t len, int pe,
int datatype)
{
#ifdef ENABLE_MR_ENDPOINT
/* The CXI provider does not currently support fetch atomics with
 * FI_DELIVERY_COMPLETE, so the non-blocking API, which uses FI_INJECT, is
 * used instead. FI_ATOMIC_READ is also not currently supported, so the
 * read is emulated as a fetching FI_SUM with a zero operand. */
long long dummy = 0;
shmem_transport_fetch_atomic_nbi(ctx, (void *) source, (const void *) &dummy,
target, len, pe, FI_SUM, datatype);
#else
shmem_transport_fetch_atomic(ctx, (void *) source, (const void *) NULL,
target, len, pe, FI_ATOMIC_READ, datatype);
#endif
}


Expand Down