From 1033bdeebeba34dc469ca3dc6cc99661c7009a29 Mon Sep 17 00:00:00 2001 From: Tarun Malviya Date: Mon, 8 Aug 2022 19:51:18 -0500 Subject: [PATCH 1/2] Support for MPI_Win_xxx APIs used in VASP6 application added. --- .../mpi-wrappers/mpi_fortran_wrappers.txt | 20 +- .../mpi_unimplemented_wrappers.txt | 17 - .../mpi-wrappers/mpi_win_wrappers.cpp | 413 +++++++++++++++++- mpi-proxy-split/record-replay.cpp | 56 +++ mpi-proxy-split/record-replay.h | 3 + mpi-proxy-split/virtual-ids.h | 21 +- mpi-proxy-split/window.h | 33 ++ 7 files changed, 541 insertions(+), 22 deletions(-) create mode 100644 mpi-proxy-split/window.h diff --git a/mpi-proxy-split/mpi-wrappers/mpi_fortran_wrappers.txt b/mpi-proxy-split/mpi-wrappers/mpi_fortran_wrappers.txt index 11cd33e3d..d5b9ac44a 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_fortran_wrappers.txt +++ b/mpi-proxy-split/mpi-wrappers/mpi_fortran_wrappers.txt @@ -100,4 +100,22 @@ int MPI_Group_translate_ranks(MPI_Group group1, int n, const int* ranks1, MPI_Gr int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void* baseptr); int MPI_Free_mem(void* base); -int MPI_Error_string(int errorcode, char* string, int* resultlen); \ No newline at end of file +int MPI_Error_string(int errorcode, char* string, int* resultlen); + +int MPI_Win_get_info(MPI_Win win, MPI_Info* info_used); +int MPI_Win_set_info(MPI_Win win, MPI_Info info); +int MPI_Win_create(void* base, MPI_Aint size, int disp_unit, + MPI_Info info, MPI_Comm comm, MPI_Win* win); +int MPI_Win_create_dynamic(MPI_Info info, MPI_Comm comm, MPI_Win* win); +int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void* baseptr, MPI_Win* win); +int MPI_Win_allocate_shared(MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void* baseptr, MPI_Win* win); +int MPI_Win_shared_query(MPI_Win win, int rank, MPI_Aint* size, + int* disp_unit, void* baseptr); +int MPI_Win_free(MPI_Win* win); +int MPI_Win_fence(int assert, MPI_Win win); +int MPI_Win_flush(int rank, MPI_Win win); +int MPI_Win_lock_all(int assert, MPI_Win win); +int MPI_Win_sync(MPI_Win win); +int MPI_Win_unlock_all(MPI_Win win); +int MPI_Get(void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); +int MPI_Put(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); diff --git a/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt b/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt index 845ab1b9b..8eafb4c09 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt +++ b/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt @@ -144,7 +144,6 @@ int MPI_File_sync(MPI_File fh); int MPI_Igather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request); int MPI_Igatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request); int MPI_Get_address(const void *location, MPI_Aint *address); -int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); int MPI_Get_accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win); int MPI_Get_library_version(char *version, int *resultlen); int MPI_Get_version(int *version, int *subversion); @@ -207,7 +206,6 @@ int MPI_Open_port(MPI_Info info, char *port_name); int MPI_Pack_external(const char datarep[], const void *inbuf, int incount, MPI_Datatype datatype, void *outbuf, MPI_Aint outsize, MPI_Aint *position); int MPI_Pack_external_size(const char datarep[], int incount, MPI_Datatype datatype, MPI_Aint *size); int MPI_Publish_name(const char *service_name, MPI_Info info, const char *port_name); -int MPI_Put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); int MPI_Query_thread(int *provided); int MPI_Raccumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request *request); int MPI_Recv_init(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request); @@ -271,15 +269,10 @@ int MPI_Unpack(const void *inbuf, int insize, int *position, void *outbuf, int o int MPI_Unpublish_name(const char *service_name, MPI_Info info, const char *port_name); int MPI_Unpack_external (const char datarep[], const void *inbuf, MPI_Aint insize, MPI_Aint *position, void *outbuf, int outcount, MPI_Datatype datatype); int MPI_Waitsome(int incount, MPI_Request array_of_requests[], int *outcount, int array_of_indices[], MPI_Status array_of_statuses[]); -int MPI_Win_allocate(MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win); -int MPI_Win_allocate_shared(MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win); int MPI_Win_attach(MPI_Win win, void *base, MPI_Aint size); int MPI_Win_call_errhandler(MPI_Win win, int errorcode); int MPI_Win_complete(MPI_Win win); -int MPI_Win_create(void *base, MPI_Aint size, int disp_unit, - MPI_Info info, MPI_Comm comm, MPI_Win *win); -int MPI_Win_create_dynamic(MPI_Info info, MPI_Comm comm, MPI_Win *win); int MPI_Win_create_errhandler(MPI_Win_errhandler_function *function, MPI_Errhandler *errhandler); int MPI_Win_create_keyval(MPI_Win_copy_attr_function *win_copy_attr_fn, @@ -288,32 +281,22 @@ int MPI_Win_create_keyval(MPI_Win_copy_attr_function *win_copy_attr_fn, int MPI_Win_delete_attr(MPI_Win win, int win_keyval); int MPI_Win_detach(MPI_Win win, const void *base); -int MPI_Win_fence(int assert, MPI_Win win); -int MPI_Win_flush(int rank, MPI_Win win); int MPI_Win_flush_all(MPI_Win win); int MPI_Win_flush_local(int rank, MPI_Win win); int MPI_Win_flush_local_all(MPI_Win win); -int MPI_Win_free(MPI_Win *win); int MPI_Win_free_keyval(int *win_keyval); int MPI_Win_get_attr(MPI_Win win, int win_keyval, void *attribute_val, int *flag); int MPI_Win_get_errhandler(MPI_Win win, MPI_Errhandler *errhandler); int MPI_Win_get_group(MPI_Win win, MPI_Group *group); -int MPI_Win_get_info(MPI_Win win, MPI_Info *info_used); int MPI_Win_get_name(MPI_Win win, char *win_name, int *resultlen); int MPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win); -int MPI_Win_lock_all(int assert, MPI_Win win); int MPI_Win_post(MPI_Group group, int assert, MPI_Win win); int MPI_Win_set_attr(MPI_Win win, int win_keyval, void *attribute_val); int MPI_Win_set_errhandler(MPI_Win win, MPI_Errhandler errhandler); -int MPI_Win_set_info(MPI_Win win, MPI_Info info); int MPI_Win_set_name(MPI_Win win, const char *win_name); -int MPI_Win_shared_query(MPI_Win win, int rank, MPI_Aint *size, - int *disp_unit, void *baseptr); int MPI_Win_start(MPI_Group group, int assert, MPI_Win win); -int MPI_Win_sync(MPI_Win win); int MPI_Win_test(MPI_Win win, int *flag); int MPI_Win_unlock(int rank, MPI_Win win); -int MPI_Win_unlock_all(MPI_Win win); int MPI_Win_wait(MPI_Win win); double MPI_Wtick(void); diff --git a/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp b/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp index 8967b9491..5ea27bcf2 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp +++ b/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp @@ -1,3 +1,11 @@ +#include +#include +#include +#include +#include +#include +#include + #include "mpi_plugin.h" #include "config.h" #include "dmtcp.h" @@ -5,7 +13,13 @@ #include "jassert.h" #include "jfilesystem.h" #include "protectedfds.h" + #include "mpi_nextfunc.h" +#include "record-replay.h" +#include "virtual-ids.h" +#include "two-phase-algo.h" +#include "p2p_drain_send_recv.h" +#include "../window.h" USER_DEFINED_WRAPPER(int, Alloc_mem, (MPI_Aint) size, (MPI_Info) info, (void *) baseptr) @@ -13,7 +27,9 @@ USER_DEFINED_WRAPPER(int, Alloc_mem, (MPI_Aint) size, (MPI_Info) info, // Since memory allocated by the lower half will be discarded during // checkpoint, we need to translate MPI_Alloc_mem and MPI_Free_mem // to malloc and free. This may slow down the program. - *(void**)baseptr = malloc(size * sizeof(MPI_Aint)); + DMTCP_PLUGIN_DISABLE_CKPT(); + *(void**)baseptr = malloc(size); + DMTCP_PLUGIN_ENABLE_CKPT(); return MPI_SUCCESS; } @@ -22,9 +38,404 @@ USER_DEFINED_WRAPPER(int, Free_mem, (void *) baseptr) // Since memory allocated by the lower half will be discarded during // checkpoint, we need to translate MPI_Alloc_mem and MPI_Free_mem // to malloc and free. This may slow down the program. + DMTCP_PLUGIN_DISABLE_CKPT(); free(baseptr); + DMTCP_PLUGIN_ENABLE_CKPT(); return MPI_SUCCESS; } +using namespace dmtcp_mpi; + +USER_DEFINED_WRAPPER(int, Win_get_info, (MPI_Win) win, (MPI_Info *) info_used) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_get_info)(realWin, info_used); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_set_info, (MPI_Win) win, (MPI_Info) info) +{ + MPI_Comm comm = virtualWindowVsVirtualCommMap[win]; + std::function realBarrierCb = [=]() { + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_set_info)(realWin, info); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; + }; + return twoPhaseCommit(comm, realBarrierCb); +} + +USER_DEFINED_WRAPPER(int, Win_create, (void *) base, (MPI_Aint) size, + (int) disp_unit, (MPI_Info) info, (MPI_Comm) comm, + (MPI_Win *) win) +{ + std::function realBarrierCb = [=]() { + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = + NEXT_FUNC(Win_create)(base, size, disp_unit, info, realComm, win); + RETURN_TO_UPPER_HALF(); + if (retval == MPI_SUCCESS && MPI_LOGGING()) { + MPI_Win virtualWin = ADD_NEW_WIN(*win); + *win = virtualWin; + LOG_CALL(restoreWindows, Win_create, (long)base, size, disp_unit, info, + comm, virtualWin); + virtualWindowVsVirtualCommMap[virtualWin] = comm; + } + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; + }; + return twoPhaseCommit(comm, realBarrierCb); +} + +USER_DEFINED_WRAPPER(int, Win_create_dynamic, (MPI_Info) info, (MPI_Comm) comm, + (MPI_Win *) win) +{ + std::function realBarrierCb = [=]() { + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_create_dynamic)(info, realComm, win); + RETURN_TO_UPPER_HALF(); + if (retval == MPI_SUCCESS && MPI_LOGGING()) { + MPI_Win virtualWin = ADD_NEW_WIN(*win); + *win = virtualWin; + LOG_CALL(restoreWindows, Win_create_dynamic, info, comm, virtualWin); + virtualWindowVsVirtualCommMap[virtualWin] = comm; + } + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; + }; + return twoPhaseCommit(comm, realBarrierCb); +} + +USER_DEFINED_WRAPPER(int, Win_allocate, (MPI_Aint) size, (int) disp_unit, + (MPI_Info) info, (MPI_Comm) comm, (void *) baseptr, + (MPI_Win *) win) +{ + std::function realBarrierCb = [=]() { + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + retval = MPI_Alloc_mem(size, info, baseptr); + JASSERT(retval == MPI_SUCCESS) + .Text("MPI_Win_allocate: Failed to allocate memory."); + retval = + MPI_Win_create(*(void **)baseptr, size, disp_unit, info, comm, win); + virtualAllocateWindowVsBasePtrMap[*win] = (long)(*(void **)baseptr); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; + }; + return twoPhaseCommit(comm, realBarrierCb); +} + +int +validate_win_allocate_shared_parameters(MPI_Comm comm) +{ + int retval; + int *baseptr; + MPI_Win win; + MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm); + + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_allocate_shared)(1, 1, MPI_INFO_NULL, realComm, + &baseptr, &win); + RETURN_TO_UPPER_HALF(); + JASSERT(retval == MPI_SUCCESS) + .Text("Win_allocate_shared: Unable to create dummy window object."); + + // Window should be of flavor MPI_WIN_FLAVOR_SHARED. + int flag = -1; + int *create_kind; + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = + NEXT_FUNC(Win_get_attr)(win, MPI_WIN_CREATE_FLAVOR, &create_kind, &flag); + RETURN_TO_UPPER_HALF(); + JASSERT(retval == MPI_SUCCESS) + .Text("Win_allocate_shared: Unable to get window attribute."); + return (*create_kind == MPI_WIN_FLAVOR_SHARED) ? 1 : 0; +} + +USER_DEFINED_WRAPPER(int, Win_allocate_shared, (MPI_Aint) size, (int) disp_unit, + (MPI_Info) info, (MPI_Comm) comm, (void *) baseptr, + (MPI_Win *) win) +{ + std::function realBarrierCb = [=]() { + DMTCP_PLUGIN_DISABLE_CKPT(); + JASSERT(validate_win_allocate_shared_parameters(comm) == 1) + .Text("Win_allocate_shared: Unable to create window, check parameters."); + + int retval; + + int max_size; + retval = MPI_Allreduce(&size, &max_size, 1, MPI_INT, MPI_MAX, comm); + JASSERT(retval == MPI_SUCCESS) + .Text("Win_allocate_shared: Failed to get maximum shared memory window " + "size."); + + int comm_size; + retval = MPI_Comm_size(comm, &comm_size); + JASSERT(retval == MPI_SUCCESS) + .Text("Win_allocate_shared: Failed to get the size of ."); + + int comm_rank; + retval = MPI_Comm_rank(comm, &comm_rank); + JASSERT(retval == MPI_SUCCESS) + .Text("Win_allocate_shared: Failed to get the rank of the process in " + "."); + + pthread_mutex_lock(&sharedWindowCounterLock); + sharedWindowCounter++; + pthread_mutex_unlock(&sharedWindowCounterLock); + + char key[MPI_MAX_INFO_KEY]; + if (comm_rank == 0) { + int world_rank; + retval = MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); + snprintf(key, sizeof(key), "/WIN-ALLOCATE-SHARED-%d-%d", world_rank, + sharedWindowCounter); + JASSERT(strlen(key) < sizeof(key))(key)(sizeof(key)); + } + MPI_Bcast(key, sizeof(key), MPI_CHAR, 0, comm); + +#if 0 + volatile int dummy = 1; + while (dummy) {}; +#endif + + if (comm_rank != 0) { + retval = MPI_Barrier(comm); + } + + // Create shared memory segment + int win_size = max_size; + int shm_size = comm_size * (win_size + sizeof(ShmWindowProperties)); + int shmid = shmget(ftok(key, 1), shm_size, 0644 | IPC_CREAT); + JASSERT(shmid != -1) + .Text("Win_allocate_shared: shmget(O_CREAT) - Unable to create shared " + "memory segment."); + + if (comm_rank == 0) { + retval = MPI_Barrier(comm); + } + + // Attach to the segment to get a pointer to it. + void *shm_baseptr = shmat(shmid, NULL, 0); + JASSERT(shm_baseptr != (void *)-1) + .Text("Win_allocate_shared: shmat() - Unable to attach to shared memory " + "segment."); + + // Calculate base pointer values + void *shm_win_baseptr = + shm_baseptr + (comm_size * sizeof(ShmWindowProperties)); + *(void **)baseptr = shm_win_baseptr + (comm_rank * win_size); + + // Create window + retval = + MPI_Win_create(*(void **)baseptr, size, disp_unit, info, comm, win); + + sharedWindowProperties[sharedWindowCounter].shmid = shmid; + sharedWindowProperties[sharedWindowCounter].shm_size = shm_size; + sharedWindowProperties[sharedWindowCounter].shm_baseptr = shm_baseptr; + sharedWindowProperties[sharedWindowCounter].shm_win_baseptr = + shm_win_baseptr; + sharedWindowProperties[sharedWindowCounter].win_size = win_size; + sharedWindowProperties[sharedWindowCounter].comm = comm; + sharedWindowProperties[sharedWindowCounter].local_size = size; + sharedWindowProperties[sharedWindowCounter].local_disp_unit = disp_unit; + sharedWindowProperties[sharedWindowCounter].local_baseptr = + *(void **)baseptr; + sharedWindowProperties[sharedWindowCounter].local_virtual_win = *win; + + memcpy(shm_baseptr + (comm_rank * sizeof(ShmWindowProperties)), + &sharedWindowProperties[sharedWindowCounter], + sizeof(ShmWindowProperties)); + + virtualSharedWindowVsIndex[*win] = sharedWindowCounter; + DMTCP_PLUGIN_ENABLE_CKPT(); + return MPI_SUCCESS; + }; + return twoPhaseCommit(comm, realBarrierCb); +} + +USER_DEFINED_WRAPPER(int, Win_shared_query, (MPI_Win) win, (int) rank, + (MPI_Aint *) size, (int *) disp_unit, (void *) baseptr) +{ + int retval = MPI_ERR_WIN; + DMTCP_PLUGIN_DISABLE_CKPT(); + + if (virtualSharedWindowVsIndex.find(win) != + virtualSharedWindowVsIndex.end()) { + int index = virtualSharedWindowVsIndex[win]; + void *shm_baseptr = sharedWindowProperties[index].shm_baseptr; + ShmWindowProperties swp; + memcpy(&swp, shm_baseptr + (rank * sizeof(ShmWindowProperties)), + sizeof(ShmWindowProperties)); + *size = swp.local_size; + *disp_unit = swp.local_disp_unit; + *(void **)baseptr = swp.local_baseptr; + retval = MPI_SUCCESS; + } + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_free, (MPI_Win *) win) +{ + int retval = MPI_ERR_WIN; + DMTCP_PLUGIN_DISABLE_CKPT(); + + if (virtualAllocateWindowVsBasePtrMap.find(*win) != + virtualAllocateWindowVsBasePtrMap.end()) { + MPI_Free_mem((void *)virtualAllocateWindowVsBasePtrMap[*win]); + *win = MPI_WIN_NULL; + retval = MPI_SUCCESS; + } else if (virtualSharedWindowVsIndex.find(*win) != + virtualSharedWindowVsIndex.end()) { + int index = virtualSharedWindowVsIndex[*win]; + int shmid = sharedWindowProperties[index].shmid; + const void *shmaddr = sharedWindowProperties[index].shm_baseptr; + if (shmdt(shmaddr) != -1 && shmctl(shmid, IPC_RMID, 0) != -1) { + *win = MPI_WIN_NULL; + retval = MPI_SUCCESS; + } + } + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_fence, (int) assert, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_fence)(assert, realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_flush, (int) rank, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_flush)(rank, realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_lock_all, (int) assert, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_lock_all)(assert, realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_sync, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_sync)(realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Win_unlock_all, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_unlock_all)(realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Get, (void *) origin_addr, (int) origin_count, + (MPI_Datatype) origin_datatype, (int) target_rank, + (MPI_Aint) target_disp, (int) target_count, + (MPI_Datatype) target_datatype, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = + NEXT_FUNC(Get)(origin_addr, origin_count, origin_datatype, target_rank, + target_disp, target_count, target_datatype, realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + +USER_DEFINED_WRAPPER(int, Put, (const void *) origin_addr, (int) origin_count, + (MPI_Datatype) origin_datatype, (int) target_rank, + (MPI_Aint) target_disp, (int) target_count, + (MPI_Datatype) target_datatype, (MPI_Win) win) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = + NEXT_FUNC(Put)(origin_addr, origin_count, origin_datatype, target_rank, + target_disp, target_count, target_datatype, realWin); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + PMPI_IMPL(int, MPI_Alloc_mem, MPI_Aint size, MPI_Info info, void *baseptr) PMPI_IMPL(int, MPI_Free_mem, void *baseptr) + +PMPI_IMPL(int, MPI_Win_get_info, MPI_Win win, MPI_Info *info_used) // +PMPI_IMPL(int, MPI_Win_set_info, MPI_Win win, MPI_Info info) + +PMPI_IMPL(int, MPI_Win_create, void *base, MPI_Aint size, int disp_unit, + MPI_Info info, MPI_Comm comm, MPI_Win *win) +PMPI_IMPL(int, MPI_Win_create_dynamic, MPI_Info info, MPI_Comm comm, + MPI_Win *win) +PMPI_IMPL(int, MPI_Win_allocate, MPI_Aint size, int disp_unit, MPI_Info info, + MPI_Comm comm, void *baseptr, MPI_Win *win) +PMPI_IMPL(int, MPI_Win_allocate_shared, MPI_Aint size, int disp_unit, + MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win) + +PMPI_IMPL(int, MPI_Win_shared_query, MPI_Win win, int rank, MPI_Aint *size, + int *disp_unit, void *baseptr) // +PMPI_IMPL(int, MPI_Win_free, MPI_Win *win) // +PMPI_IMPL(int, MPI_Win_fence, int assert, MPI_Win win) // +PMPI_IMPL(int, MPI_Win_flush, int rank, MPI_Win win) // +PMPI_IMPL(int, MPI_Win_lock_all, int assert, MPI_Win win) +PMPI_IMPL(int, MPI_Win_sync, MPI_Win win) // +PMPI_IMPL(int, MPI_Win_unlock_all, MPI_Win win) + +PMPI_IMPL(int, MPI_Get, void *origin_addr, int origin_count, + MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, + int target_count, MPI_Datatype target_datatype, MPI_Win win) +PMPI_IMPL(int, MPI_Put, const void *origin_addr, int origin_count, + MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, + int target_count, MPI_Datatype target_datatype, MPI_Win win); \ No newline at end of file diff --git a/mpi-proxy-split/record-replay.cpp b/mpi-proxy-split/record-replay.cpp index 7fc79aa0b..53784a5d4 100644 --- a/mpi-proxy-split/record-replay.cpp +++ b/mpi-proxy-split/record-replay.cpp @@ -65,6 +65,9 @@ static int restoreCartSub(MpiRecord& rec); static int restoreOpCreate(MpiRecord& rec); static int restoreOpFree(MpiRecord& rec); +static int restoreWinCreate(MpiRecord &rec); +static int restoreWinCreateDynamic(MpiRecord &rec); + static int restoreIbcast(MpiRecord& rec); static int restoreIreduce(MpiRecord& rec); static int restoreIbarrier(MpiRecord& rec); @@ -262,6 +265,27 @@ dmtcp_mpi::restoreOps(MpiRecord &rec) return rc; } +int +dmtcp_mpi::restoreWindows(MpiRecord &rec) +{ + int rc = -1; + JTRACE("Restoring MPI Windows"); + switch (rec.getType()) { + case GENERATE_ENUM(Win_create): + JTRACE("restoreWinCreate"); + rc = restoreWinCreate(rec); + break; + case GENERATE_ENUM(Win_create_dynamic): + JTRACE("restoreWinCreateDynamic"); + rc = restoreWinCreateDynamic(rec); + break; + default: + JWARNING(false)(rec.getType()).Text("Unknown call"); + break; + } + return rc; +} + int dmtcp_mpi::restoreRequests(MpiRecord &rec) { @@ -978,6 +1002,38 @@ restoreOpFree(MpiRecord& rec) return retval; } +static int +restoreWinCreate(MpiRecord &rec) +{ + int retval = -1; + void *base = (void *)rec.args(0); + MPI_Aint size = (MPI_Aint)rec.args(1); + int disp_unit = rec.args(2); + MPI_Comm comm = rec.args(4); + MPI_Win newWin; + retval = FNC_CALL(Win_create, rec)(base, size, disp_unit, MPI_INFO_NULL, comm, + &newWin); + if (retval == MPI_SUCCESS) { + MPI_Win virtualWinId = rec.args(5); + UPDATE_WIN_MAP(virtualWinId, newWin); + } + return retval; +} + +static int +restoreWinCreateDynamic(MpiRecord &rec) +{ + int retval = -1; + MPI_Comm comm = rec.args(1); + MPI_Win newWin; + retval = FNC_CALL(Win_create_dynamic, rec)(MPI_INFO_NULL, comm, &newWin); + if (retval == MPI_SUCCESS) { + MPI_Win virtualWinId = rec.args(2); + UPDATE_WIN_MAP(virtualWinId, newWin); + } + return retval; +} + static int restoreIbcast(MpiRecord& rec) { int retval = -1; void *buf = rec.args(0); diff --git a/mpi-proxy-split/record-replay.h b/mpi-proxy-split/record-replay.h index 359bbf5ed..b61c82a5f 100644 --- a/mpi-proxy-split/record-replay.h +++ b/mpi-proxy-split/record-replay.h @@ -718,6 +718,9 @@ namespace dmtcp_mpi // Restores the MPI ops and returns MPI_SUCCESS on success extern int restoreOps(MpiRecord& ); + // Restores the MPI windows and returns MPI_SUCCESS on success + extern int restoreWindows(MpiRecord& ); + }; // namespace dmtcp_mpi // Restores the MPI state by recreating the communicator, groups, types, etc. diff --git a/mpi-proxy-split/virtual-ids.h b/mpi-proxy-split/virtual-ids.h index 2b258b03b..e30d18871 100644 --- a/mpi-proxy-split/virtual-ids.h +++ b/mpi-proxy-split/virtual-ids.h @@ -39,6 +39,7 @@ #define MpiOpList dmtcp_mpi::MpiVirtualization #define MpiCommKeyvalList dmtcp_mpi::MpiVirtualization #define MpiRequestList dmtcp_mpi::MpiVirtualization +#define MpiWinList dmtcp_mpi::MpiVirtualization #ifndef NEXT_FUNC # define NEXT_FUNC(func) \ ({ \ @@ -123,6 +124,17 @@ #define UPDATE_REQUEST_MAP(v, r) r #endif +#define REAL_TO_VIRTUAL_WIN(id) \ + MpiWinList::instance("MpiWin", MPI_WIN_NULL).realToVirtual(id) +#define VIRTUAL_TO_REAL_WIN(id) \ + MpiWinList::instance("MpiWin", MPI_WIN_NULL).virtualToReal(id) +#define ADD_NEW_WIN(id) \ + MpiWinList::instance("MpiWin", MPI_WIN_NULL).onCreate(id) +#define REMOVE_OLD_WIN(id) \ + MpiWinList::instance("MpiWin", MPI_WIN_NULL).onRemove(id) +#define UPDATE_WIN_MAP(v, r) \ + MpiWinList::instance("MpiWin", MPI_WIN_NULL).updateMapping(v, r) + namespace dmtcp_mpi { @@ -162,9 +174,12 @@ namespace dmtcp_mpi return _virTableMpiCommKeyval; } else if (strcmp(name, "MpiRequest") == 0) { static MpiVirtualization _virTableMpiRequest(name, nullId); - return _virTableMpiRequest; - } - JWARNING(false)(name)(nullId).Text("Unhandled type"); + return _virTableMpiRequest; + } else if (strcmp(name, "MpiWin") == 0) { + static MpiVirtualization _virTableMpiWin(name, nullId); + return _virTableMpiWin; + } + JWARNING(false)(name)(nullId).Text("Unhandled type"); static MpiVirtualization _virTableNoSuchObject(name, nullId); return _virTableNoSuchObject; } diff --git a/mpi-proxy-split/window.h b/mpi-proxy-split/window.h new file mode 100644 index 000000000..9e42cf2b8 --- /dev/null +++ b/mpi-proxy-split/window.h @@ -0,0 +1,33 @@ +#ifndef WINDOW_H +#define WINDOW_H + +#include + +#include + +#define MAX_SHM_WINDOWS 1000 + +typedef struct ShmWindowProperties { + int shmid; + int shm_size; + void *shm_baseptr; + void *shm_win_baseptr; + int win_size; + MPI_Comm comm; + MPI_Aint local_size; + int local_disp_unit; + void *local_baseptr; + MPI_Win local_virtual_win; +} ShmWindowProperties; + +// Window +std::map virtualAllocateWindowVsBasePtrMap; +std::map virtualWindowVsVirtualCommMap; + +// Shared Memory Window +static int sharedWindowCounter = -1; +static pthread_mutex_t sharedWindowCounterLock = PTHREAD_MUTEX_INITIALIZER; +std::map virtualSharedWindowVsIndex; +ShmWindowProperties sharedWindowProperties[MAX_SHM_WINDOWS]; + +#endif // ifndef WINDOW_H From 4920072e58a45fe669992516c9d2411c1a23dc57 Mon Sep 17 00:00:00 2001 From: Tarun Malviya Date: Thu, 25 Aug 2022 13:42:13 -0700 Subject: [PATCH 2/2] Added wrapper function for the MPI_Get_address, MPI_Win_attach & MPI_Win_detach APIs. --- .../mpi-wrappers/mpi_type_wrappers.cpp | 13 +++++++- .../mpi_unimplemented_wrappers.txt | 3 -- .../mpi-wrappers/mpi_win_wrappers.cpp | 30 ++++++++++++++++++- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/mpi-proxy-split/mpi-wrappers/mpi_type_wrappers.cpp b/mpi-proxy-split/mpi-wrappers/mpi_type_wrappers.cpp index db2aa9bc1..af17867c0 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_type_wrappers.cpp +++ b/mpi-proxy-split/mpi-wrappers/mpi_type_wrappers.cpp @@ -255,6 +255,17 @@ USER_DEFINED_WRAPPER(int, Pack, (const void*) inbuf, (int) incount, return retval; } +USER_DEFINED_WRAPPER(int, Get_address, (const void *) location, (MPI_Aint *) address) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Get_address)(location, address); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + DEFINE_FNC(int, Type_size_x, (MPI_Datatype) type, (MPI_Count *) size); PMPI_IMPL(int, MPI_Type_size, MPI_Datatype datatype, int *size) @@ -290,4 +301,4 @@ PMPI_IMPL(int, MPI_Pack_size, int incount, MPI_Datatype datatype, MPI_Comm comm, int *size) PMPI_IMPL(int, MPI_Pack, const void *inbuf, int incount, MPI_Datatype datatype, void *outbuf, int outsize, int *position, MPI_Comm comm) - +PMPI_IMPL(int, MPI_Get_address, const void *location, MPI_Aint *address) diff --git a/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt b/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt index 8eafb4c09..2c5bc8fbd 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt +++ b/mpi-proxy-split/mpi-wrappers/mpi_unimplemented_wrappers.txt @@ -143,7 +143,6 @@ int MPI_File_get_atomicity(MPI_File fh, int *flag); int MPI_File_sync(MPI_File fh); int MPI_Igather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request); int MPI_Igatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request); -int MPI_Get_address(const void *location, MPI_Aint *address); int MPI_Get_accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win); int MPI_Get_library_version(char *version, int *resultlen); int MPI_Get_version(int *version, int *subversion); @@ -269,7 +268,6 @@ int MPI_Unpack(const void *inbuf, int insize, int *position, void *outbuf, int o int MPI_Unpublish_name(const char *service_name, MPI_Info info, const char *port_name); int MPI_Unpack_external (const char datarep[], const void *inbuf, MPI_Aint insize, MPI_Aint *position, void *outbuf, int outcount, MPI_Datatype datatype); int MPI_Waitsome(int incount, MPI_Request array_of_requests[], int *outcount, int array_of_indices[], MPI_Status array_of_statuses[]); -int MPI_Win_attach(MPI_Win win, void *base, MPI_Aint size); int MPI_Win_call_errhandler(MPI_Win win, int errorcode); int MPI_Win_complete(MPI_Win win); @@ -279,7 +277,6 @@ int MPI_Win_create_keyval(MPI_Win_copy_attr_function *win_copy_attr_fn, MPI_Win_delete_attr_function *win_delete_attr_fn, int *win_keyval, void *extra_state); int MPI_Win_delete_attr(MPI_Win win, int win_keyval); -int MPI_Win_detach(MPI_Win win, const void *base); int MPI_Win_flush_all(MPI_Win win); int MPI_Win_flush_local(int rank, MPI_Win win); diff --git a/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp b/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp index 5ea27bcf2..cd4939413 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp +++ b/mpi-proxy-split/mpi-wrappers/mpi_win_wrappers.cpp @@ -74,6 +74,31 @@ USER_DEFINED_WRAPPER(int, Win_set_info, (MPI_Win) win, (MPI_Info) info) return twoPhaseCommit(comm, realBarrierCb); } +USER_DEFINED_WRAPPER(int, Win_attach, (MPI_Win) win, (void *) base, (MPI_Aint) size) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_attach)(realWin, base, size); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + + +USER_DEFINED_WRAPPER(int, Win_detach, (MPI_Win) win, (const void *) base) +{ + int retval; + DMTCP_PLUGIN_DISABLE_CKPT(); + MPI_Win realWin = VIRTUAL_TO_REAL_WIN(win); + JUMP_TO_LOWER_HALF(lh_info.fsaddr); + retval = NEXT_FUNC(Win_detach)(realWin, base); + RETURN_TO_UPPER_HALF(); + DMTCP_PLUGIN_ENABLE_CKPT(); + return retval; +} + USER_DEFINED_WRAPPER(int, Win_create, (void *) base, (MPI_Aint) size, (int) disp_unit, (MPI_Info) info, (MPI_Comm) comm, (MPI_Win *) win) @@ -415,6 +440,9 @@ PMPI_IMPL(int, MPI_Free_mem, void *baseptr) PMPI_IMPL(int, MPI_Win_get_info, MPI_Win win, MPI_Info *info_used) // PMPI_IMPL(int, MPI_Win_set_info, MPI_Win win, MPI_Info info) +PMPI_IMPL(int, MPI_Win_attach, MPI_Win win, void *base, MPI_Aint size) +PMPI_IMPL(int, MPI_Win_detach, MPI_Win win, const void *base) + PMPI_IMPL(int, MPI_Win_create, void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win) PMPI_IMPL(int, MPI_Win_create_dynamic, MPI_Info info, MPI_Comm comm, @@ -438,4 +466,4 @@ PMPI_IMPL(int, MPI_Get, void *origin_addr, int origin_count, int target_count, MPI_Datatype target_datatype, MPI_Win win) PMPI_IMPL(int, MPI_Put, const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, - int target_count, MPI_Datatype target_datatype, MPI_Win win); \ No newline at end of file + int target_count, MPI_Datatype target_datatype, MPI_Win win)