Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ build/

mpi-proxy-split/mpi-wrappers/p2p-deterministic.h
mpi-proxy-split/mpi-wrappers/mana_p2p_update_logs
manpages/mana.1.gz
manpages/mana.1.gz
14 changes: 14 additions & 0 deletions mpi-proxy-split/mpi-wrappers/mpi_p2p_wrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ USER_DEFINED_WRAPPER(int, Send,
(const void *) buf, (int) count, (MPI_Datatype) datatype,
(int) dest, (int) tag, (MPI_Comm) comm)
{
global_p2p_communication_barrier();

int retval;
#if 0
DMTCP_PLUGIN_DISABLE_CKPT();
Expand Down Expand Up @@ -70,6 +72,8 @@ USER_DEFINED_WRAPPER(int, Isend,
(int) dest, (int) tag,
(MPI_Comm) comm, (MPI_Request *) request)
{
global_p2p_communication_barrier();

int retval;
DMTCP_PLUGIN_DISABLE_CKPT();
MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm);
Expand Down Expand Up @@ -105,6 +109,8 @@ USER_DEFINED_WRAPPER(int, Rsend, (const void*) ibuf, (int) count,
(MPI_Datatype) datatype, (int) dest,
(int) tag, (MPI_Comm) comm)
{
global_p2p_communication_barrier();

// FIXME: Implement this wrapper with MPI_Irsend
int retval;
DMTCP_PLUGIN_DISABLE_CKPT();
Expand Down Expand Up @@ -135,6 +141,8 @@ USER_DEFINED_WRAPPER(int, Recv,
(int) source, (int) tag,
(MPI_Comm) comm, (MPI_Status *) status)
{
global_p2p_communication_barrier();

int retval;
#if 0
DMTCP_PLUGIN_DISABLE_CKPT();
Expand Down Expand Up @@ -164,6 +172,8 @@ USER_DEFINED_WRAPPER(int, Irecv,
(int) source, (int) tag,
(MPI_Comm) comm, (MPI_Request *) request)
{
global_p2p_communication_barrier();

int retval;
int flag = 0;
int size = 0;
Expand Down Expand Up @@ -240,6 +250,8 @@ USER_DEFINED_WRAPPER(int, Sendrecv, (const void *) sendbuf, (int) sendcount,
(int) recvcount, (MPI_Datatype) recvtype, (int) source,
(int) recvtag, (MPI_Comm) comm, (MPI_Status *) status)
{
global_p2p_communication_barrier();

int retval;
#if 0
DMTCP_PLUGIN_DISABLE_CKPT();
Expand Down Expand Up @@ -283,6 +295,8 @@ USER_DEFINED_WRAPPER(int, Sendrecv_replace, (void *) buf, (int) count,
(int) sendtag, (int) source,
(int) recvtag, (MPI_Comm) comm, (MPI_Status *) status)
{
global_p2p_communication_barrier();

MPI_Request reqs[2];
MPI_Status sts[2];

Expand Down
15 changes: 15 additions & 0 deletions mpi-proxy-split/mpi-wrappers/mpi_request_wrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ EXTERNC
USER_DEFINED_WRAPPER(int, Test, (MPI_Request*) request,
(int*) flag, (MPI_Status*) status)
{
global_p2p_communication_barrier();

int retval;
if (*request == MPI_REQUEST_NULL) {
// *request might be in read-only memory. So we can't overwrite it with
Expand Down Expand Up @@ -123,6 +125,8 @@ USER_DEFINED_WRAPPER(int, Testall, (int) count,
(MPI_Request *) array_of_requests, (int *) flag,
(MPI_Status *) array_of_statuses)
{
global_p2p_communication_barrier();

// NOTE: See MPI_Testany below for the rationale for these variables.
int local_count = count;
MPI_Request *local_array_of_requests = array_of_requests;
Expand Down Expand Up @@ -162,6 +166,8 @@ USER_DEFINED_WRAPPER(int, Testany, (int) count,
(MPI_Request *) array_of_requests, (int *) index,
(int *) flag, (MPI_Status *) status)
{
global_p2p_communication_barrier();

// FIXME: Revise this note if definition if FORTRAM_MPI_STATUS_IGNORE
// fixes the problem.
// NOTE: We're seeing a weird bug with the Fortran-to-C interface when Nimrod
Expand Down Expand Up @@ -200,6 +206,7 @@ USER_DEFINED_WRAPPER(int, Waitall, (int) count,
(MPI_Request *) array_of_requests,
(MPI_Status *) array_of_statuses)
{
global_p2p_communication_barrier();
// FIXME: Revisit this wrapper - call VIRTUAL_TO_REAL_REQUEST on array
int retval = MPI_SUCCESS;
#if 0
Expand Down Expand Up @@ -244,6 +251,8 @@ USER_DEFINED_WRAPPER(int, Waitany, (int) count,
(MPI_Request *) array_of_requests, (int *) index,
(MPI_Status *) status)
{
global_p2p_communication_barrier();

// NOTE: See MPI_Testany above for the rationale for these variables.
int local_count = count;
MPI_Request *local_array_of_requests = array_of_requests;
Expand Down Expand Up @@ -318,6 +327,8 @@ USER_DEFINED_WRAPPER(int, Waitany, (int) count,

USER_DEFINED_WRAPPER(int, Wait, (MPI_Request*) request, (MPI_Status*) status)
{
global_p2p_communication_barrier();

int retval;
if (*request == MPI_REQUEST_NULL) {
// *request might be in read-only memory. So we can't overwrite it with
Expand Down Expand Up @@ -377,6 +388,8 @@ USER_DEFINED_WRAPPER(int, Wait, (MPI_Request*) request, (MPI_Status*) status)
USER_DEFINED_WRAPPER(int, Probe, (int) source, (int) tag,
(MPI_Comm) comm, (MPI_Status *) status)
{
global_p2p_communication_barrier();

int retval;
int flag = 0;
while (!flag) {
Expand All @@ -388,6 +401,8 @@ USER_DEFINED_WRAPPER(int, Probe, (int) source, (int) tag,
USER_DEFINED_WRAPPER(int, Iprobe, (int) source, (int) tag, (MPI_Comm) comm,
(int *) flag, (MPI_Status *) status)
{
global_p2p_communication_barrier();

int retval;
DMTCP_PLUGIN_DISABLE_CKPT();
// LOG_PRE_Iprobe(status);
Expand Down
83 changes: 78 additions & 5 deletions mpi-proxy-split/mpi_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,42 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
// FIXME: See commant at: dmtcpplugin.cpp:'case DMTCP_EVENT_PRESUSPEND'
drain_mpi_collective();
openCkptFileFds();

/* P2P messages draining */
fprintf(stdout, "\n[Rank-%d] Suspending P2P communication...",
g_world_rank);
fflush(stdout);

// Suspend the global P2P communication
global_p2p_communication = 0;
sleep(2); // Wait for the user thread to get stuck in the P2P API

fprintf(stdout, "\n[Rank-%d] MPI:Register-local-sends-and-receives\n",
g_world_rank);
fflush(stdout);
dmtcp_global_barrier("MPI:Register-local-sends-and-receives");

fprintf(stdout, "\n[Rank-%d] mana_state = CKPT_P2P\n", g_world_rank);
fflush(stdout);
mana_state = CKPT_P2P;

fprintf(stdout, "\n[Rank-%d] registerLocalSendsAndRecvs()\n",
g_world_rank);
fflush(stdout);
registerLocalSendsAndRecvs(); // p2p_drain_send_recv.cpp

fprintf(stdout, "\n[Rank-%d] MPI:Drain-Send-Recv\n", g_world_rank);
fflush(stdout);
dmtcp_global_barrier("MPI:Drain-Send-Recv");

fprintf(stdout, "\n[Rank-%d] drainSendRecv()\n", g_world_rank);
fflush(stdout);
drainSendRecv(); // p2p_drain_send_recv.cpp

fprintf(stdout, "\n[Rank-%d] Exiting DMTCP_EVENT_PRESUSPEND\n",
g_world_rank);
fflush(stdout);

break;
}

Expand All @@ -1042,11 +1078,12 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
getLocalRankInfo(); // p2p_log_replay.cpp
dmtcp_global_barrier("MPI:update-ckpt-dir-by-rank");
updateCkptDirByRank(); // mpi_plugin.cpp
dmtcp_global_barrier("MPI:Register-local-sends-and-receives");
mana_state = CKPT_P2P;
registerLocalSendsAndRecvs(); // p2p_drain_send_recv.cpp
dmtcp_global_barrier("MPI:Drain-Send-Recv");
drainSendRecv(); // p2p_drain_send_recv.cpp
// dmtcp_global_barrier("MPI:Register-local-sends-and-receives");
// mana_state = CKPT_P2P;
// registerLocalSendsAndRecvs(); // p2p_drain_send_recv.cpp
// dmtcp_global_barrier("MPI:Drain-Send-Recv");
// drainSendRecv(); // p2p_drain_send_recv.cpp
dmtcp_global_barrier("MPI:computeUnionOfCkptImageAddresses");
computeUnionOfCkptImageAddresses();
dmtcp_global_barrier("MPI:save-mana-header-and-mpi-files");
const char *file = get_mana_header_file_name();
Expand All @@ -1062,6 +1099,9 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
}

case DMTCP_EVENT_RESUME: {
// Resume the global P2P communication
global_p2p_communication = 1;

processingOpenCkpFileFds = false;
dmtcp_local_barrier("MPI:Reset-Drain-Send-Recv-Counters");
resetDrainCounters(); // p2p_drain_send_recv.cpp
Expand All @@ -1072,6 +1112,9 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
}

case DMTCP_EVENT_RESTART: {
// Resume the global P2P communication
global_p2p_communication = 1;

processingOpenCkpFileFds = false;
logCkptFileFds();

Expand Down Expand Up @@ -1123,6 +1166,36 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
}
}

void
global_p2p_communication_barrier()
{
if (global_p2p_communication == 1 ||
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it have race condition between checkpoint thread and user application threads? Should we use lock to protect global_p2p_communication access?

(global_p2p_communication == 0 && internal_p2p_communication == 1))
return;

time_t my_time = time(NULL);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this code, a call to MPI_Send will always call this routine, which will always print a message. Computing a time and then doing a print is very expensive compared to MPI_Send. So, we should compute a time (and do some printing) only in the uncommon case, and not in the common case.

Do I understand the code correctly? If this is not a problem, then could you add some comments somewhere to make it clear that this happens only in the uncommon case?

Maybe my confusion comes from not understanding the variable global_p2p_communication. I think it's a global variable initialized to 1, which would mean that we will not compute a time in the common case. But then, can you add some comments there about global_p2p_communication and internal_p2p_communication?

Finally, should we be printing only in the case that we want to debug (#ifdef DEBUG)? I'm not sure that all of the end users in the production code need to see this information.

char *time_str = ctime(&my_time);

my_time = time(NULL);
time_str = ctime(&my_time);
time_str[strlen(time_str) - 1] = '\0';

fprintf(stdout, "\n%s [Rank-%d] Global P2P communication barrier entered.",
time_str, g_world_rank);
fflush(stdout);

while (global_p2p_communication == 0) {
}

my_time = time(NULL);
time_str = ctime(&my_time);
time_str[strlen(time_str) - 1] = '\0';

fprintf(stdout, "\n%s [Rank-%d] Global P2P communication barrier exited.\n",
time_str, g_world_rank);
fflush(stdout);
}

DmtcpPluginDescriptor_t mpi_plugin = {
DMTCP_PLUGIN_API_VERSION,
PACKAGE_VERSION,
Expand Down
17 changes: 17 additions & 0 deletions mpi-proxy-split/mpi_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,21 @@ enum mana_state_t {

extern mana_state_t mana_state;

/******************************************************************************/
/*
In applications using MPI, an MPI thread is responsible for managing asynchronous requests in the background. During checkpointing, it is necessary to suspend Collective and P2P communication so that MANA can drain messages from the network and create a checkpoint image. Previously, we had been draining collective messages during the PRESUSPEND event, while all threads were still running. This is because draining collective messages requires all ranks to be in a consistent state, which can only be achieved through trial and error and by allowing the user application to make progress. However, we were draining P2P messages after suspending all threads, including the MPI thread. This was because each rank explicitly asks other ranks for any pending messages floating in the network, and MANA drains them into its internal buffer so that they can be passed to the user application during checkpoint restart. Additionally, we rely on MPI to provide information about pending requests or messages in the network.

While this approach seemed reasonable, it did not account for a case where the MPI thread was creating metadata for a P2P request and was suspended in between. In such a scenario, that message would not be visible to other ranks since the MPI APIs used by MANA to gather information about pending requests would report that there are no messages, even though there is a message pending that is not yet visible to all ranks because its metadata has not yet been generated.
*/

/*
The P2P communication in an application is controlled globally by <global_p2p_communication> variable. When we say "control", we mean that it suspends P2P communication and prevents the user application from being in the lower half. This allows for P2P message draining and checkpointing during the PRESUSPEND checkpoint event. If the P2P message draining is performed after PRESUSPEND and before PRECHECKPOINT, the variable mentioned above is not needed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reformat to 80 char line as above.

by <global_p2p_communication> variable -> by the <global_p2p_communication> variable

the variable mentioned above -> the <global_p2p_communication> variable

*/
static int global_p2p_communication = 1;
static int internal_p2p_communication = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's almost always wrong to allocate storage for a variable within a .h file. We should declare the type in a .h variable (e.g., extern int global_p2p_communication;), and then to allocate storage for the variable in a .cpp file (e.g., int global_p2p_communication = 1; at global level, near the top of one .cpp file).

If you don't follow this convention, you can create lots of bugs. Right now, you're saying that if mpi_plugin.h is included into two distinct .cpp files, then since you use static, you have now created two independent copies of global_p2p_communication.

Also, the convention within MANA is to use the prefix g_ in the name of any global variable. For example, see g_numMaps.

Even worse, you seem to use in both mpi_plugin.cpp and p2p_drain_send_recv.cpp. So, you seem to have fallen into this trap, and created two copies of internal_p2p_communication.
Could you verify the logic in this case?


void global_p2p_communication_barrier();

/******************************************************************************/

#endif // ifndef _MPI_PLUGIN_H
4 changes: 4 additions & 0 deletions mpi-proxy-split/p2p_drain_send_recv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ recvMsgIntoInternalBuffer(MPI_Status status, MPI_Comm comm)
MPI_Type_size(MPI_BYTE, &size);
JASSERT(size == 1);
void *buf = JALLOC_HELPER_MALLOC(count);
internal_p2p_communication = 1;
int retval = MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
comm, MPI_STATUS_IGNORE);
internal_p2p_communication = 0;
JASSERT(retval == MPI_SUCCESS);

mpi_message_t *message = (mpi_message_t *)JALLOC_HELPER_MALLOC(sizeof(mpi_message_t));
Expand Down Expand Up @@ -185,7 +187,9 @@ drainRemainingP2pMsgs()
int flag = 1;
while (flag) {
MPI_Status status;
internal_p2p_communication = 1;
int retval = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, *comm, &flag, &status);
internal_p2p_communication = 0;
JASSERT(retval == MPI_SUCCESS);
if (flag) {
MPI_Request matched_request = MPI_REQUEST_NULL;
Expand Down