diff --git a/.gitignore b/.gitignore index 8328cf79a..21efcae26 100644 --- a/.gitignore +++ b/.gitignore @@ -63,4 +63,4 @@ build/ mpi-proxy-split/mpi-wrappers/p2p-deterministic.h mpi-proxy-split/mpi-wrappers/mana_p2p_update_logs -manpages/mana.1.gz \ No newline at end of file +manpages/mana.1.gz diff --git a/mpi-proxy-split/mpi-wrappers/mpi_p2p_wrappers.cpp b/mpi-proxy-split/mpi-wrappers/mpi_p2p_wrappers.cpp index f1a0c5ef8..2ee758ce7 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_p2p_wrappers.cpp +++ b/mpi-proxy-split/mpi-wrappers/mpi_p2p_wrappers.cpp @@ -41,6 +41,8 @@ USER_DEFINED_WRAPPER(int, Send, (const void *) buf, (int) count, (MPI_Datatype) datatype, (int) dest, (int) tag, (MPI_Comm) comm) { + global_p2p_communication_barrier(); + int retval; #if 0 DMTCP_PLUGIN_DISABLE_CKPT(); @@ -70,6 +72,8 @@ USER_DEFINED_WRAPPER(int, Isend, (int) dest, (int) tag, (MPI_Comm) comm, (MPI_Request *) request) { + global_p2p_communication_barrier(); + int retval; DMTCP_PLUGIN_DISABLE_CKPT(); MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm); @@ -105,6 +109,8 @@ USER_DEFINED_WRAPPER(int, Rsend, (const void*) ibuf, (int) count, (MPI_Datatype) datatype, (int) dest, (int) tag, (MPI_Comm) comm) { + global_p2p_communication_barrier(); + // FIXME: Implement this wrapper with MPI_Irsend int retval; DMTCP_PLUGIN_DISABLE_CKPT(); @@ -135,6 +141,8 @@ USER_DEFINED_WRAPPER(int, Recv, (int) source, (int) tag, (MPI_Comm) comm, (MPI_Status *) status) { + global_p2p_communication_barrier(); + int retval; #if 0 DMTCP_PLUGIN_DISABLE_CKPT(); @@ -164,6 +172,8 @@ USER_DEFINED_WRAPPER(int, Irecv, (int) source, (int) tag, (MPI_Comm) comm, (MPI_Request *) request) { + global_p2p_communication_barrier(); + int retval; int flag = 0; int size = 0; @@ -240,6 +250,8 @@ USER_DEFINED_WRAPPER(int, Sendrecv, (const void *) sendbuf, (int) sendcount, (int) recvcount, (MPI_Datatype) recvtype, (int) source, (int) recvtag, (MPI_Comm) comm, (MPI_Status *) status) { + global_p2p_communication_barrier(); + int retval; #if 
0 DMTCP_PLUGIN_DISABLE_CKPT(); @@ -283,6 +295,8 @@ USER_DEFINED_WRAPPER(int, Sendrecv_replace, (void *) buf, (int) count, (int) sendtag, (int) source, (int) recvtag, (MPI_Comm) comm, (MPI_Status *) status) { + global_p2p_communication_barrier(); + MPI_Request reqs[2]; MPI_Status sts[2]; diff --git a/mpi-proxy-split/mpi-wrappers/mpi_request_wrappers.cpp b/mpi-proxy-split/mpi-wrappers/mpi_request_wrappers.cpp index bb683af51..f79412dac 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_request_wrappers.cpp +++ b/mpi-proxy-split/mpi-wrappers/mpi_request_wrappers.cpp @@ -61,6 +61,8 @@ EXTERNC USER_DEFINED_WRAPPER(int, Test, (MPI_Request*) request, (int*) flag, (MPI_Status*) status) { + global_p2p_communication_barrier(); + int retval; if (*request == MPI_REQUEST_NULL) { // *request might be in read-only memory. So we can't overwrite it with @@ -123,6 +125,8 @@ USER_DEFINED_WRAPPER(int, Testall, (int) count, (MPI_Request *) array_of_requests, (int *) flag, (MPI_Status *) array_of_statuses) { + global_p2p_communication_barrier(); + // NOTE: See MPI_Testany below for the rationale for these variables. int local_count = count; MPI_Request *local_array_of_requests = array_of_requests; @@ -162,6 +166,8 @@ USER_DEFINED_WRAPPER(int, Testany, (int) count, (MPI_Request *) array_of_requests, (int *) index, (int *) flag, (MPI_Status *) status) { + global_p2p_communication_barrier(); + // FIXME: Revise this note if definition if FORTRAM_MPI_STATUS_IGNORE // fixes the problem. 
// NOTE: We're seeing a weird bug with the Fortran-to-C interface when Nimrod @@ -200,6 +206,7 @@ USER_DEFINED_WRAPPER(int, Waitall, (int) count, (MPI_Request *) array_of_requests, (MPI_Status *) array_of_statuses) { + global_p2p_communication_barrier(); // FIXME: Revisit this wrapper - call VIRTUAL_TO_REAL_REQUEST on array int retval = MPI_SUCCESS; #if 0 @@ -244,6 +251,8 @@ USER_DEFINED_WRAPPER(int, Waitany, (int) count, (MPI_Request *) array_of_requests, (int *) index, (MPI_Status *) status) { + global_p2p_communication_barrier(); + // NOTE: See MPI_Testany above for the rationale for these variables. int local_count = count; MPI_Request *local_array_of_requests = array_of_requests; @@ -318,6 +327,8 @@ USER_DEFINED_WRAPPER(int, Waitany, (int) count, USER_DEFINED_WRAPPER(int, Wait, (MPI_Request*) request, (MPI_Status*) status) { + global_p2p_communication_barrier(); + int retval; if (*request == MPI_REQUEST_NULL) { // *request might be in read-only memory. So we can't overwrite it with @@ -377,6 +388,8 @@ USER_DEFINED_WRAPPER(int, Wait, (MPI_Request*) request, (MPI_Status*) status) USER_DEFINED_WRAPPER(int, Probe, (int) source, (int) tag, (MPI_Comm) comm, (MPI_Status *) status) { + global_p2p_communication_barrier(); + int retval; int flag = 0; while (!flag) { @@ -388,6 +401,8 @@ USER_DEFINED_WRAPPER(int, Probe, (int) source, (int) tag, USER_DEFINED_WRAPPER(int, Iprobe, (int) source, (int) tag, (MPI_Comm) comm, (int *) flag, (MPI_Status *) status) { + global_p2p_communication_barrier(); + int retval; DMTCP_PLUGIN_DISABLE_CKPT(); // LOG_PRE_Iprobe(status); diff --git a/mpi-proxy-split/mpi_plugin.cpp b/mpi-proxy-split/mpi_plugin.cpp index 57a8dfb52..c0c7b4244 100644 --- a/mpi-proxy-split/mpi_plugin.cpp +++ b/mpi-proxy-split/mpi_plugin.cpp @@ -1030,6 +1030,42 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data) // FIXME: See commant at: dmtcpplugin.cpp:'case DMTCP_EVENT_PRESUSPEND' drain_mpi_collective(); openCkptFileFds(); + + /* P2P messages 
draining */ + fprintf(stdout, "\n[Rank-%d] Suspending P2P communication...", + g_world_rank); + fflush(stdout); + + // Suspend the global P2P communication + global_p2p_communication = 0; + sleep(2); // Wait for the user thread to get stuck in the P2P API + + fprintf(stdout, "\n[Rank-%d] MPI:Register-local-sends-and-receives\n", + g_world_rank); + fflush(stdout); + dmtcp_global_barrier("MPI:Register-local-sends-and-receives"); + + fprintf(stdout, "\n[Rank-%d] mana_state = CKPT_P2P\n", g_world_rank); + fflush(stdout); + mana_state = CKPT_P2P; + + fprintf(stdout, "\n[Rank-%d] registerLocalSendsAndRecvs()\n", + g_world_rank); + fflush(stdout); + registerLocalSendsAndRecvs(); // p2p_drain_send_recv.cpp + + fprintf(stdout, "\n[Rank-%d] MPI:Drain-Send-Recv\n", g_world_rank); + fflush(stdout); + dmtcp_global_barrier("MPI:Drain-Send-Recv"); + + fprintf(stdout, "\n[Rank-%d] drainSendRecv()\n", g_world_rank); + fflush(stdout); + drainSendRecv(); // p2p_drain_send_recv.cpp + + fprintf(stdout, "\n[Rank-%d] Exiting DMTCP_EVENT_PRESUSPEND\n", + g_world_rank); + fflush(stdout); + break; } @@ -1042,11 +1078,12 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data) getLocalRankInfo(); // p2p_log_replay.cpp dmtcp_global_barrier("MPI:update-ckpt-dir-by-rank"); updateCkptDirByRank(); // mpi_plugin.cpp - dmtcp_global_barrier("MPI:Register-local-sends-and-receives"); - mana_state = CKPT_P2P; - registerLocalSendsAndRecvs(); // p2p_drain_send_recv.cpp - dmtcp_global_barrier("MPI:Drain-Send-Recv"); - drainSendRecv(); // p2p_drain_send_recv.cpp + // dmtcp_global_barrier("MPI:Register-local-sends-and-receives"); + // mana_state = CKPT_P2P; + // registerLocalSendsAndRecvs(); // p2p_drain_send_recv.cpp + // dmtcp_global_barrier("MPI:Drain-Send-Recv"); + // drainSendRecv(); // p2p_drain_send_recv.cpp + dmtcp_global_barrier("MPI:computeUnionOfCkptImageAddresses"); computeUnionOfCkptImageAddresses(); dmtcp_global_barrier("MPI:save-mana-header-and-mpi-files"); const char *file = 
get_mana_header_file_name(); @@ -1062,6 +1099,9 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data) } case DMTCP_EVENT_RESUME: { + // Resume the global P2P communication + global_p2p_communication = 1; + processingOpenCkpFileFds = false; dmtcp_local_barrier("MPI:Reset-Drain-Send-Recv-Counters"); resetDrainCounters(); // p2p_drain_send_recv.cpp @@ -1072,6 +1112,9 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data) } case DMTCP_EVENT_RESTART: { + // Resume the global P2P communication + global_p2p_communication = 1; + processingOpenCkpFileFds = false; logCkptFileFds(); @@ -1123,6 +1166,52 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data) } } +// Single definitions of the P2P gating flags declared `extern` in +// mpi_plugin.h, so that every translation unit shares the same objects. +volatile int global_p2p_communication = 1; +volatile int internal_p2p_communication = 0; + +// Fills buf with the current time as formatted by ctime_r(), minus the +// trailing newline; leaves buf empty if the time cannot be formatted. +static void +p2p_barrier_timestamp(char buf[26]) +{ + time_t now = time(NULL); + if (ctime_r(&now, buf) == NULL) { + buf[0] = '\0'; + } + buf[strcspn(buf, "\n")] = '\0'; +} + +// Gate called on entry to the P2P and request wrappers. Returns at once while +// global_p2p_communication is 1, or while it is 0 and the caller has set +// internal_p2p_communication to 1; otherwise logs entry, spins until +// global_p2p_communication leaves 0 (RESUME/RESTART set it to 1), logs exit. +void +global_p2p_communication_barrier() +{ + if (global_p2p_communication == 1 || + (global_p2p_communication == 0 && internal_p2p_communication == 1)) { + return; + } + + char time_str[26]; // ctime_r() requires a buffer of at least 26 bytes + p2p_barrier_timestamp(time_str); + fprintf(stdout, "\n%s [Rank-%d] Global P2P communication barrier entered.", + time_str, g_world_rank); + fflush(stdout); + + // The flag is volatile, so it is re-read on every iteration and the wait + // cannot be folded into an unconditional infinite loop. + while (global_p2p_communication == 0) { + } + + p2p_barrier_timestamp(time_str); + fprintf(stdout, "\n%s [Rank-%d] Global P2P communication barrier exited.\n", + time_str, g_world_rank); + fflush(stdout); +} + DmtcpPluginDescriptor_t mpi_plugin = { DMTCP_PLUGIN_API_VERSION, PACKAGE_VERSION, diff --git a/mpi-proxy-split/mpi_plugin.h b/mpi-proxy-split/mpi_plugin.h index 6420ba6d1..94468f93f 100644 --- a/mpi-proxy-split/mpi_plugin.h +++ b/mpi-proxy-split/mpi_plugin.h @@ -67,4 +67,21 @@ enum mana_state_t { extern mana_state_t mana_state; +/******************************************************************************/ +/* +In applications using MPI, an MPI thread is responsible for managing asynchronous requests in the background. 
During checkpointing, it is necessary to suspend collective and P2P communication so that MANA can drain messages from the network and create a checkpoint image. Previously, we had been draining collective messages during the PRESUSPEND event, while all threads were still running. This is because draining collective messages requires all ranks to be in a consistent state, which can only be achieved through trial and error and by allowing the user application to make progress. However, we were draining P2P messages after suspending all threads, including the MPI thread. This was because each rank explicitly asks other ranks for any pending messages floating in the network, and MANA drains them into its internal buffer so that they can be passed to the user application during checkpoint restart. Additionally, we rely on MPI to provide information about pending requests or messages in the network. + +While this approach seemed reasonable, it did not account for a case where the MPI thread was creating metadata for a P2P request and was suspended in between. In such a scenario, that message would not be visible to other ranks since the MPI APIs used by MANA to gather information about pending requests would report that there are no messages, even though there is a message pending that is not yet visible to all ranks because its metadata has not yet been generated. +*/ + +/* +The P2P communication in an application is controlled globally by the variable global_p2p_communication (1 = enabled, 0 = suspended). When we say "control", we mean that it suspends P2P communication and prevents the user application from being in the lower half. This allows for P2P message draining and checkpointing during the PRESUSPEND checkpoint event. If the P2P message draining is performed after PRESUSPEND and before PRECHECKPOINT, the variable mentioned above is not needed. The variable internal_p2p_communication is set to 1 around MANA's own MPI calls in p2p_drain_send_recv.cpp so that global_p2p_communication_barrier() lets them through while P2P communication is suspended. Both variables are only declared here and are defined once in mpi_plugin.cpp: they are written and read in different source files, so a `static` definition in this header would give each source file its own private copy. 
+*/ +extern volatile int global_p2p_communication; +extern volatile int internal_p2p_communication; + +void global_p2p_communication_barrier(); + +/******************************************************************************/ + #endif // ifndef _MPI_PLUGIN_H diff --git a/mpi-proxy-split/p2p_drain_send_recv.cpp b/mpi-proxy-split/p2p_drain_send_recv.cpp index c8bc46b27..634eb6670 100644 --- a/mpi-proxy-split/p2p_drain_send_recv.cpp +++ b/mpi-proxy-split/p2p_drain_send_recv.cpp @@ -97,8 +97,10 @@ recvMsgIntoInternalBuffer(MPI_Status status, MPI_Comm comm) MPI_Type_size(MPI_BYTE, &size); JASSERT(size == 1); void *buf = JALLOC_HELPER_MALLOC(count); + internal_p2p_communication = 1; int retval = MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, comm, MPI_STATUS_IGNORE); + internal_p2p_communication = 0; JASSERT(retval == MPI_SUCCESS); mpi_message_t *message = (mpi_message_t *)JALLOC_HELPER_MALLOC(sizeof(mpi_message_t)); @@ -185,7 +187,9 @@ drainRemainingP2pMsgs() int flag = 1; while (flag) { MPI_Status status; + internal_p2p_communication = 1; int retval = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, *comm, &flag, &status); + internal_p2p_communication = 0; JASSERT(retval == MPI_SUCCESS); if (flag) { MPI_Request matched_request = MPI_REQUEST_NULL;