Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ jobs:
xpmem_version: master
sos_config: --with-xpmem=${XPMEM_INSTALL_DIR} --enable-error-checking
--enable-remote-virtual-addressing --enable-pmi-simple
--enable-hard-polling
libfabric_version: v2.1.x
- config_name: XPMEM shared atomics
xpmem_version: master
sos_config: --with-xpmem=${XPMEM_INSTALL_DIR} --enable-shr-atomics
--enable-error-checking --enable-pmi-simple
--enable-hard-polling
libfabric_version: v2.1.x
- config_name: RVA, thread completion
sos_config: --enable-error-checking --enable-remote-virtual-addressing
Expand Down Expand Up @@ -517,7 +519,7 @@ jobs:
sos_config: [--enable-pmi-simple --disable-fortran,
--with-cma --enable-error-checking --enable-profiling
--enable-pmi-simple --disable-fortran --with-hwloc=no,
--with-xpmem --enable-error-checking --enable-pmi-simple --with-hwloc=no]
--with-xpmem --enable-error-checking --enable-pmi-simple --with-hwloc=no --enable-hard-polling]
steps:
- name: Checking OS version
run: |
Expand Down Expand Up @@ -620,7 +622,7 @@ jobs:
matrix:
include:
- config_name: XPMEM with Shared Atomics
sos_config: --with-xpmem --enable-shr-atomics --enable-error-checking --enable-pmi-simple
sos_config: --with-xpmem --enable-shr-atomics --enable-error-checking --enable-pmi-simple --enable-hard-polling
portals4_version: master
xpmem_version: master

Expand Down Expand Up @@ -736,7 +738,7 @@ jobs:
include:
- config_name: transport_none
xpmem_version: master
sos_config: [--with-xpmem --enable-shr-atomics --enable-error-checking --enable-pmi-simple]
sos_config: [--with-xpmem --enable-shr-atomics --enable-error-checking --enable-pmi-simple --enable-hard-polling]

steps:
- name: Checking OS version
Expand Down
1 change: 0 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,6 @@ AM_CONDITIONAL([USE_CMA], [test "$transport_cma" = "yes"])

AS_IF([test "$transport_xpmem" = "yes" -o "$transport_cma" = "yes"],
[AC_DEFINE([USE_ON_NODE_COMMS], [1], [Define if any on-node comm transport is available])
AC_DEFINE([ENABLE_HARD_POLLING], [1], [Enable hard polling])
])

if test "$enable_shr_atomics" = "yes"; then
Expand Down
4 changes: 4 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ shmem_internal_heap_postinit(void)
shmem_internal_heap_base, shmem_internal_heap_length,
shmem_internal_data_base, shmem_internal_data_length);

if (shmem_internal_params.BOUNCE_MLOCK) {
DEBUG_MSG("Bounce buffer locking enabled\n");
}

#ifdef HAVE_SCHED_GETAFFINITY
#ifdef USE_HWLOC
ret = hwloc_topology_init(&shmem_internal_topology);
Expand Down
6 changes: 5 additions & 1 deletion src/shmem_env_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ SHMEM_INTERNAL_ENV_DEF(SYMMETRIC_HEAP_USE_MALLOC, bool, false, SHMEM_INTERNAL_EN
"Allocate the symmetric heap using malloc")
SHMEM_INTERNAL_ENV_DEF(BOUNCE_SIZE, size, DEFAULT_BOUNCE_SIZE, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Maximum message size to bounce buffer")
SHMEM_INTERNAL_ENV_DEF(MAX_BOUNCE_BUFFERS, long, 128, SHMEM_INTERNAL_ENV_CAT_OTHER,
SHMEM_INTERNAL_ENV_DEF(MAX_BOUNCE_BUFFERS, size, 128, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Maximum number of bounce buffers per context")
SHMEM_INTERNAL_ENV_DEF(TRAP_ON_ABORT, bool, false, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Generate trap if the program aborts or calls shmem_global_exit")
Expand Down Expand Up @@ -126,3 +126,7 @@ SHMEM_INTERNAL_ENV_DEF(MPI_THREAD_LEVEL, string, "MPI_THREAD_SINGLE", SHMEM_INTE
SHMEM_INTERNAL_ENV_DEF(BACKTRACE, string, "", SHMEM_INTERNAL_ENV_CAT_OTHER,
"Specify the mechanism to use for backtracing on failure")

SHMEM_INTERNAL_ENV_DEF(BOUNCE_MLOCK, bool, false, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Lock bounce buffer memory preventing buffers from being paged out to swap")
SHMEM_INTERNAL_ENV_DEF(BOUNCE_SHEAP, bool, true, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Allocate bounce buffers using symmetric heap, if supported by transport")
71 changes: 51 additions & 20 deletions src/shmem_free_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
#include "shmem.h"
#include "shmem_free_list.h"

#define NUM_ELEMENTS 2

shmem_free_list_t*
shmem_free_list_init(unsigned int element_size,
shmem_free_list_item_init_fn_t init_fn)
shmem_free_list_init(size_t element_size,
shmem_free_list_item_init_fn_t init_fn,
size_t max_pool_cnt)
{
int ret;
shmem_free_list_t *fl = (shmem_free_list_t*) calloc(1, sizeof(shmem_free_list_t));
Expand All @@ -34,6 +36,18 @@ shmem_free_list_init(unsigned int element_size,
fl->element_size = element_size;
fl->init_fn = init_fn;
fl->nalloc = 0;
fl->alloc_size = sizeof(shmem_free_list_alloc_t) + NUM_ELEMENTS * fl->element_size;
fl->pool_size = 0;
fl->pool_ofs = 0;
fl->pool = NULL;
if (max_pool_cnt) {
/* preallocate pool with shmem malloc */
/* memory must be reserved as a memory pool to prevent address conflicts between PEs */
fl->pool_size = fl->alloc_size * max_pool_cnt;
fl->pool = shmem_internal_shmalloc(fl->pool_size);
}
/* if pool count is zero allocate with bounce buffers with malloc */

SHMEM_MUTEX_INIT(fl->lock);
ret = shmem_free_list_more(fl);
if (0 != ret) {
Expand All @@ -48,13 +62,20 @@ shmem_free_list_init(unsigned int element_size,
void
shmem_free_list_destroy(shmem_free_list_t *fl)
{
shmem_free_list_alloc_t *alloc, *next;

alloc = fl->allocs;
while (NULL != alloc) {
next = alloc->next;
free(alloc);
alloc = next;
if (fl->pool) {
/* allocated with shmem malloc */
shmem_internal_free(fl->pool);
fl->pool = NULL;
fl->pool_ofs = 0;
} else {
/* allocated with malloc */
shmem_free_list_alloc_t *alloc, *next;
alloc = fl->allocs;
while (NULL != alloc) {
next = alloc->next;
free(alloc);
alloc = next;
}
}

SHMEM_MUTEX_DESTROY(fl->lock);
Expand All @@ -64,25 +85,35 @@ shmem_free_list_destroy(shmem_free_list_t *fl)
int
shmem_free_list_more(shmem_free_list_t *fl)
{
size_t page_size = 4096 - sizeof(shmem_free_list_alloc_t);
int num_elements = (fl->element_size < page_size) ? page_size / fl->element_size : 1;
shmem_free_list_item_t *item, *first, *next, *last = NULL;
shmem_free_list_alloc_t *header;
char *buf;
int i;

num_elements = 2;

buf = malloc(sizeof(shmem_free_list_alloc_t) +
num_elements * fl->element_size);
if (NULL == buf) return 1;
uint64_t i;

if (fl->pool) {
if (fl->pool_ofs >= fl->pool_size) {
fprintf(stderr, "[%d] pool memory exhausted\n", shmem_internal_my_pe);
return 1;
}

buf = &fl->pool[fl->pool_ofs];
fl->pool_ofs += fl->alloc_size;

if (shmem_internal_params.BOUNCE_MLOCK) {
mlock(buf, sizeof(shmem_free_list_alloc_t) + NUM_ELEMENTS * fl->element_size);
}
} else {
buf = malloc(sizeof(shmem_free_list_alloc_t) +
NUM_ELEMENTS * fl->element_size);
if (NULL == buf) return 1;
}

header = (shmem_free_list_alloc_t*) buf;
first = item = (shmem_free_list_item_t*) (header + 1);
for (i = 0 ; i < num_elements ; ++i) {
for (i = 0 ; i < NUM_ELEMENTS ; ++i) {
fl->init_fn(item);
next = (shmem_free_list_item_t*)((char*)item + fl->element_size);
if (i == num_elements - 1) {
if (i == NUM_ELEMENTS - 1) {
item->next = NULL;
last = item;
} else {
Expand Down
15 changes: 10 additions & 5 deletions src/shmem_free_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
#define SHMEM_FREE_QUEUE_H

#include <stdint.h>

#include <sys/mman.h>
#include <unistd.h>
#include "shmem_internal.h"

struct shmem_free_list_item_t {
Expand All @@ -33,9 +34,12 @@ typedef struct shmem_free_list_alloc_t shmem_free_list_alloc_t;
typedef void (*shmem_free_list_item_init_fn_t)(shmem_free_list_item_t *item);

struct shmem_free_list_t {
uint32_t element_size;
size_t element_size;
uint64_t nalloc;

uint64_t alloc_size;
uint64_t pool_size;
char *pool;
size_t pool_ofs;
shmem_free_list_item_init_fn_t init_fn;
shmem_free_list_alloc_t *allocs;
shmem_free_list_item_t* head;
Expand All @@ -45,8 +49,9 @@ struct shmem_free_list_t {
};
typedef struct shmem_free_list_t shmem_free_list_t;

shmem_free_list_t* shmem_free_list_init(unsigned int element_size,
shmem_free_list_item_init_fn_t init_fn);
shmem_free_list_t* shmem_free_list_init(size_t element_size,
shmem_free_list_item_init_fn_t init_fn,
size_t max_pool_cnt);
void shmem_free_list_destroy(shmem_free_list_t *fl);
int shmem_free_list_more(shmem_free_list_t *fl);

Expand Down
5 changes: 4 additions & 1 deletion src/shmem_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ extern unsigned int shmem_internal_rand_seed;
extern hwloc_topology_t shmem_internal_topology;
#endif

#define SHMEM_INTERNAL_HEAP_OVERHEAD (1024*1024)
#define SHMEM_INTERNAL_HEAP_OVERHEAD (10*1024*1024)
/* Note: this is an estimate */
#define SHMEM_MAX_BOUNCE_BUFFER_OVERHEAD (2*1024*1024)

#define SHMEM_INTERNAL_DIAG_STRLEN 1024
#define SHMEM_INTERNAL_DIAG_WRAPLEN 72

Expand Down
3 changes: 2 additions & 1 deletion src/symmetric_heap_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ shmem_internal_symmetric_init(void)
{
/* add library overhead such that the max can be shmalloc()'ed */
shmem_internal_heap_length = shmem_internal_params.SYMMETRIC_SIZE +
SHMEM_INTERNAL_HEAP_OVERHEAD;
SHMEM_INTERNAL_HEAP_OVERHEAD +
SHMEM_MAX_BOUNCE_BUFFER_OVERHEAD;

if (!shmem_internal_params.SYMMETRIC_HEAP_USE_MALLOC) {
shmem_internal_heap_base =
Expand Down
13 changes: 11 additions & 2 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ long shmem_transport_ofi_get_poll_limit;
size_t shmem_transport_ofi_max_buffered_send;
size_t shmem_transport_ofi_max_msg_size;
size_t shmem_transport_ofi_bounce_buffer_size;
long shmem_transport_ofi_max_bounce_buffers;
size_t shmem_transport_ofi_max_bounce_buffers;
size_t shmem_transport_ofi_addrlen;
#ifdef ENABLE_MR_RMA_EVENT
int shmem_transport_ofi_mr_rma_event;
Expand Down Expand Up @@ -1731,6 +1731,7 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id)
cntr_put_attr.events = FI_CNTR_EVENTS_COMP;
cntr_get_attr.events = FI_CNTR_EVENTS_COMP;

#if 0
/* Set FI_WAIT based on the put and get polling limits defined above */
if (shmem_transport_ofi_put_poll_limit < 0) {
cntr_put_attr.wait_obj = FI_WAIT_NONE;
Expand All @@ -1742,6 +1743,9 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id)
} else {
cntr_get_attr.wait_obj = FI_WAIT_UNSPEC;
}
#endif
cntr_put_attr.wait_obj = FI_WAIT_UNSPEC;
cntr_get_attr.wait_obj = FI_WAIT_UNSPEC;

/* Allow provider to choose CQ size, since we are using FI_RM_ENABLED.
* Context format is used to return bounce buffer pointers in the event
Expand Down Expand Up @@ -1804,10 +1808,15 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id)
shmem_transport_ofi_bounce_buffer_size > 0 &&
shmem_transport_ofi_max_bounce_buffers > 0)
{
size_t max_bb_pool_cnt = 0;
if (shmem_internal_params.BOUNCE_SHEAP) {
max_bb_pool_cnt = shmem_transport_ofi_max_bounce_buffers;
}
ctx->bounce_buffers =
shmem_free_list_init(sizeof(shmem_transport_ofi_bounce_buffer_t) +
shmem_transport_ofi_bounce_buffer_size,
init_bounce_buffer);
init_bounce_buffer,
max_bb_pool_cnt);
}
else {
ctx->options &= ~SHMEMX_CTX_BOUNCE_BUFFER;
Expand Down
55 changes: 9 additions & 46 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ extern long shmem_transport_ofi_get_poll_limit;
extern size_t shmem_transport_ofi_max_buffered_send;
extern size_t shmem_transport_ofi_max_msg_size;
extern size_t shmem_transport_ofi_bounce_buffer_size;
extern long shmem_transport_ofi_max_bounce_buffers;
extern size_t shmem_transport_ofi_max_bounce_buffers;

extern pthread_mutex_t shmem_transport_ofi_progress_lock;

Expand Down Expand Up @@ -436,7 +436,7 @@ void shmem_transport_ofi_drain_cq(shmem_transport_ctx_t *ctx)
(shmem_transport_ofi_bounce_buffer_t *) frag);
ctx->completed_bb_cntr++;
} else {
RAISE_ERROR_STR("Unrecognized completion object");
RAISE_ERROR_MSG("[%d] Unrecognized completion object %p %x mtofs %p\n", shmem_internal_my_pe, frag, frag->mytype, &frag->mytype);
}
}

Expand Down Expand Up @@ -473,6 +473,10 @@ shmem_transport_ofi_bounce_buffer_t * create_bounce_buffer(shmem_transport_ctx_t
if (NULL == buff)
RAISE_ERROR_STR("Bounce buffer allocation failed");

if (buff->frag.mytype != SHMEM_TRANSPORT_OFI_TYPE_BOUNCE) {
RAISE_ERROR_STR("Bounce buffer allocation failed");
}

shmem_internal_assert(buff->frag.mytype == SHMEM_TRANSPORT_OFI_TYPE_BOUNCE);

memcpy(buff->data, source, len);
Expand Down Expand Up @@ -504,28 +508,8 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx)
* reverse order: first the fid_cntr event counter, then the put issued
* counter. We'll want to preserve this property in the future.
*/
uint64_t success, fail, cnt, cnt_new;
long poll_count = 0;
while (poll_count < shmem_transport_ofi_put_poll_limit ||
shmem_transport_ofi_put_poll_limit < 0) {
success = fi_cntr_read(ctx->put_cntr);
fail = fi_cntr_readerr(ctx->put_cntr);
cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr);

shmem_transport_probe();

if (success < cnt && fail == 0) {
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
SPINLOCK_BODY();
SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
} else if (fail) {
RAISE_ERROR_MSG("Operations completed in error (%" PRIu64 ")\n", fail);
} else {
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
return;
}
poll_count++;
}
uint64_t cnt, cnt_new;

cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr);
do {
cnt = cnt_new;
Expand Down Expand Up @@ -965,31 +949,10 @@ void shmem_transport_get_wait(shmem_transport_ctx_t* ctx)
* reverse order: first the fid_cntr event counter, then the get issued
* counter. We'll want to preserve this property in the future.
*/
uint64_t success, fail, cnt, cnt_new;
long poll_count = 0;
uint64_t cnt, cnt_new;

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);

while (poll_count < shmem_transport_ofi_get_poll_limit ||
shmem_transport_ofi_get_poll_limit < 0) {
success = fi_cntr_read(ctx->get_cntr);
fail = fi_cntr_readerr(ctx->get_cntr);
cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_cntr);

shmem_transport_probe();

if (success < cnt && fail == 0) {
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
SPINLOCK_BODY();
SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
} else if (fail) {
RAISE_ERROR_MSG("Operations completed in error (%" PRIu64 ")\n", fail);
} else {
SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx);
return;
}
poll_count++;
}
cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_get_cntr);
do {
cnt = cnt_new;
Expand Down
6 changes: 4 additions & 2 deletions src/transport_portals4.c
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,13 @@ shmem_transport_init(void)
shmem_transport_portals4_bounce_buffers =
shmem_free_list_init(sizeof(shmem_transport_portals4_bounce_buffer_t) +
shmem_transport_portals4_bounce_buffer_size,
init_bounce_buffer);
init_bounce_buffer,
0);

shmem_transport_portals4_long_frags =
shmem_free_list_init(sizeof(shmem_transport_portals4_long_frag_t),
init_long_frag);
init_long_frag,
0);

/* Initialize network */
ni_req_limits.max_entries = 1024;
Expand Down