2 changes: 1 addition & 1 deletion mpi-proxy-split/mpi_plugin.cpp
@@ -704,7 +704,7 @@ computeUnionOfCkptImageAddresses()
minAddrBeyondHeap = area.addr;
}

-  if (strcmp(area.name, "[vsyscall]") != NULL) {
+  if (strcmp(area.name, "[vsyscall]") != 0) {
maxAddrBeyondHeap = area.endAddr;
}
}
112 changes: 111 additions & 1 deletion mpi-proxy-split/p2p_drain_send_recv.cpp
@@ -243,9 +243,119 @@ allDrained()
}
return true;
}


int
existsMatchingRequests(MPI_Status status, MPI_Comm comm)
{
  int isMatchingRequest = 0;
  std::map<MPI_Request, mpi_nonblocking_call_t*>::iterator it;
  // Check whether any pending MPI_Irecv matches the envelope of the probed
  // message: same communicator, and a source/tag that is either equal to the
  // message's or is a wildcard.
  for (it = g_nonblocking_calls.begin(); it != g_nonblocking_calls.end(); it++) {
    mpi_nonblocking_call_t *call = it->second;
    if (call->type == IRECV_REQUEST &&
        call->comm == comm &&
        (call->tag == status.MPI_TAG || call->tag == MPI_ANY_TAG) &&
        (call->remote_node == status.MPI_SOURCE ||
         call->remote_node == MPI_ANY_SOURCE)) {
      isMatchingRequest = 1;
      break;
    }
  }
  return isMatchingRequest;
}
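
// Illustrative sketch (not part of this diff): the MPI envelope-matching rule
// that existsMatchingRequests() mirrors.  A receive matches a probed message
// when both use the same communicator and the receive's source and tag are
// either equal to the message's or are the wildcards MPI_ANY_SOURCE /
// MPI_ANY_TAG.  The helper below is hypothetical and only restates that rule
// for a single (source, tag) pair taken from a posted MPI_Irecv.
static bool
envelopeMatches(const MPI_Status &status, int recvSource, int recvTag)
{
  bool tagMatches = (recvTag == status.MPI_TAG || recvTag == MPI_ANY_TAG);
  bool srcMatches = (recvSource == status.MPI_SOURCE ||
                     recvSource == MPI_ANY_SOURCE);
  return tagMatches && srcMatches;
}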

int
drainRemainingP2pMsgsONLY()
{
int bytesReceived = 0;
std::unordered_set<MPI_Comm>::iterator comm;
for (comm = active_comms.begin(); comm != active_comms.end(); comm++) {
// If the communicator is MPI_COMM_NULL, skip it.
// MPI_COMM_NULL can be returned by functions such as MPI_Comm_split when the
// color is specified as MPI_UNDEFINED by the program, or is specified on only
// one side of an inter-communicator.  In that case the MPI call still returns
// MPI_SUCCESS, so MPI_COMM_NULL can end up in the active communicator set
// `active_comms'.  (See the illustrative sketch after this function.)
if (*comm == MPI_COMM_NULL) {
continue;
}
int flag = 1;
while (flag) {
MPI_Status status;
int retval = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, *comm, &flag, &status);
JASSERT(retval == MPI_SUCCESS);
if (flag) {
JASSERT(! existsMatchingRequests(status, *comm));
bytesReceived += recvMsgIntoInternalBuffer(status, *comm);
}
}
}
return bytesReceived;
}
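
// Illustrative sketch (not part of this diff): how MPI_COMM_NULL can end up
// in `active_comms'.  A rank that passes MPI_UNDEFINED as the color to
// MPI_Comm_split still sees MPI_SUCCESS as the return value, but receives
// MPI_COMM_NULL as its new communicator, which is why the loop above must
// skip such entries.  The hypothetical helper below demonstrates this.
static void
exampleCommSplitUndefined()
{
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  // Odd ranks opt out of the new communicator by passing MPI_UNDEFINED.
  int color = (rank % 2 == 0) ? 0 : MPI_UNDEFINED;
  MPI_Comm newComm;
  int rc = MPI_Comm_split(MPI_COMM_WORLD, color, rank, &newComm);
  JASSERT(rc == MPI_SUCCESS);
  if (newComm == MPI_COMM_NULL) {
    return;  // Odd ranks: nothing to probe or drain on this "communicator".
  }
  MPI_Comm_free(&newComm);  // Even ranks got a real communicator.
}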

void
drainSendRecv()
{
int bytesReceived = 0;
// QUESTION: Are the requests saved as a FIFO queue (so that
// non-overtaking means that we see the oldest request first
// for a given source/tag)?
// STAGE 1: Test requests, until all requests that can be completed
// have been completed.
int done = 0;
while (bytesReceived > 0 || !done) {
while (bytesReceived > 0) {
done = 0;
// If there is a pending MPI_Irecv or MPI_Isend, use MPI_Test to try to
// complete it (see the MPI_Test sketch after this function).  Reassign
// rather than accumulate, so that this loop exits once no further
// requests complete.
bytesReceived = completePendingP2pRequests();
}
// Sleep for 5 seconds to allow more remote send requests to complete
// and their messages to reach us.  Eventually, every send request for
// which we have a pending recv request should arrive, as long as we
// continue to test all send and recv requests.
//
// Now try twice more.  This gives the remote process time to
// complete a pending send, if needed.
done = 0;
bytesReceived = 0;
sleep(5);
// If pending MPI_Irecv or MPI_Isend, use MPI_Test to try to complete it.
bytesReceived += completePendingP2pRequests();
sleep(5);
bytesReceived += completePendingP2pRequests();
if (bytesReceived == 0) {
done = 1; // Okay. We waited for slow senders. Now we're done.
}
}

// This is where we can add tests of correctness.
// An example is: sleep(5*60); and see if bit-for-bit bug goes away.
// If 'sleep(5*60)' says that there was a bug earlier in the code,
// now we can debug the implicit assertion that there should be
// no matching remote send request with a local recv request.
// As an example of this, we can do:
// sleep(5*60); Re-execute STAGE 1 to see what new requests were satisfied.
// Another example is to use MPI_Alltoall or MPI_Allreduce to try
// to discover pending matches of send and recv requests that
// were not caught earlier.

// STAGE 2: Assume that all tests that could be completed have been
// completed. So, any remaining messages must be drained
// to the MANA internal buffer.
// There are now no matching remote send requests with local recv requests.
// So we can drain all remaining messages into internal buffers.
// If MPI_Irecv not posted, but msg was sent, use MPI_Iprobe to drain msg
drainRemainingP2pMsgsONLY();
}
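
// Illustrative sketch (not part of this diff): the nonblocking-test pattern
// that STAGE 1 of drainSendRecv() relies on.  The real
// completePendingP2pRequests() is defined elsewhere in this file; the
// hypothetical helper below only shows how a single pending request can be
// tested without blocking, returning the byte count of a completed receive
// and 0 otherwise.
static int
testOnePendingRequest(MPI_Request *req)
{
  int completed = 0;
  MPI_Status status;
  MPI_Test(req, &completed, &status);
  if (!completed) {
    return 0;  // The matching sender isn't ready yet; retry after a sleep.
  }
  // For a completed receive, the status records how much data arrived.
  // (For a completed send, this count is not meaningful.)
  int count = 0;
  MPI_Get_count(&status, MPI_BYTE, &count);
  return count;
}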

void
drainSendRecvOLD()
{
int numReceived = -1;
while (numReceived != 0) {