Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions bin/mana_launch
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
dir=`dirname $0`

if [ -z "$1" ]; then
echo "USAGE: $0 [--verbose] [DMTCP_OPTIONS ...] [--ckptdir DIR]" \\
echo " MANA_EXECUTABLE"
echo "USAGE: $0 [--verbose] [--timing] [DMTCP_OPTIONS ...]" \\
echo " [--ckptdir DIR] MANA_EXECUTABLE"
echo " For DMTCP options, do: $0 --help"
echo " NOTE: MANA_EXECUTABLE must be compiled with libmpistub.so"
echo " See $dir/../contrib/mpi-proxy-split/test/ for examples."
Expand All @@ -35,6 +35,8 @@ srun_sbatch_found=0
while [ -n "$1" ]; do
if [ "$1" == --verbose ]; then
verbose=1
elif [ "$1" == --timing ]; then
export MANA_TIMING=1
elif [ "$1" == --help ]; then
help=1
elif [ "$1" == srun ] || [ "$1" == sbatch ]; then
Expand All @@ -60,6 +62,13 @@ while [ -n "$1" ]; do
done

if [ "$help" -eq 1 ]; then
echo 'MANA OPTIONS:'
echo '--verbose: Display the underlying DMTCP command and DMTCP_OPTIONS used'
echo ' and other info.'
echo '--timing: Print times to stderr for INIT, EXIT,' \
'and ckkpt-restart events'
echo ' (stays active during both mana_launch and mana_restart)'
echo ''
$dir/dmtcp_launch --help $options
exit 0
fi
Expand Down
66 changes: 66 additions & 0 deletions mpi-proxy-split/mpi-wrappers/mpi_collective_wrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ USER_DEFINED_WRAPPER(int, Reduce_scatter,
// of draining the point-to-point MPI calls. p2p_drain_send_recv.cpp
// cannot use the C version in mpi-wrappers/mpi_collective_p2p.c,
// which would generate extra point-to-point MPI calls.
#ifndef MPI_ALLTOALL_RENDEZVOUS
int
MPI_Alltoall_internal(const void *sendbuf, int sendcount,
MPI_Datatype sendtype, void *recvbuf, int recvcount,
Expand All @@ -392,6 +393,71 @@ MPI_Alltoall_internal(const void *sendbuf, int sendcount,
DMTCP_PLUGIN_ENABLE_CKPT();
return retval;
}
#else
// We are having a hanging issue running user programs under certain situations. In
// order to prevent it there is a temporary work around provided by Yao Xu. To
// manually implement a MPI_Alltoall_internal call forcing rendezvous, the hanging
// can be prevented without noticable performance burden.
int
MPI_Alltoall_internal(const void *sendbuf, int sendcount,
MPI_Datatype sendtype, void *recvbuf, int recvcount,
MPI_Datatype recvtype, MPI_Comm comm)
{
static int MPI_ALLTOALL_TAG = 0;
int retval, comm_size, rank;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &comm_size);
MPI_Aint rlb, slb, recvtype_extent,sendtype_extent;
MPI_Type_get_extent(sendtype, &slb, &sendtype_extent);
MPI_Type_get_extent(recvtype, &rlb, &recvtype_extent);
DMTCP_PLUGIN_DISABLE_CKPT();
MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm);
MPI_Datatype realSendType = VIRTUAL_TO_REAL_TYPE(sendtype);
MPI_Datatype realRecvType = VIRTUAL_TO_REAL_TYPE(recvtype);
// FIXME: Ideally, check FORTRAN_MPI_IN_PLACE only in the Fortran wrapper.
if (sendbuf == FORTRAN_MPI_IN_PLACE) {
sendbuf = MPI_IN_PLACE;
}

// With our MPI_Alltoall implementation forcing rendezvous
JUMP_TO_LOWER_HALF(lh_info.fsaddr);
int ii, ss, bblock;
int i;
int dst;
bblock = comm_size;
MPI_Request *reqarray = (MPI_Request *) malloc(2 * bblock * sizeof(MPI_Request *));
MPI_Status *starray = (MPI_Status *) malloc(2 * bblock * sizeof(MPI_Status));
for (ii = 0; ii < comm_size; ii += bblock) {
ss = comm_size - ii < bblock ? comm_size - ii : bblock;
for (i = 0; i < ss; i++) {
dst = (rank + i + ii) % comm_size;
NEXT_FUNC(Irecv)(recvbuf + dst * recvcount * recvtype_extent, recvcount, realRecvType,
dst, MPI_ALLTOALL_TAG, realComm, &reqarray[i]);
}
for (i = 0; i < ss; i++) {
dst = (rank - i - ii + comm_size) % comm_size;
// MPI_Issend starts a nonblocking synchronous send
NEXT_FUNC(Issend)(sendbuf + dst * sendcount * sendtype_extent, sendcount, realSendType,
dst, MPI_ALLTOALL_TAG, realComm, &reqarray[i + ss]);
}
}
int flag = 0;
while (!flag) {
flag = 1;
int status_flag = 0;
for (i = 0; i < 2 * ss; i++) {
retval = NEXT_FUNC(Request_get_status)(reqarray[i], &status_flag, &starray[i]);
flag &= status_flag;
}
}
retval = NEXT_FUNC(Waitall)(2 * ss, reqarray, starray);
free(reqarray);
free(starray);
RETURN_TO_UPPER_HALF();
DMTCP_PLUGIN_ENABLE_CKPT();
return retval;
}
#endif

#ifndef MPI_COLLECTIVE_P2P
USER_DEFINED_WRAPPER(int, Alltoall,
Expand Down
19 changes: 0 additions & 19 deletions mpi-proxy-split/mpi-wrappers/mpi_group_wrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,6 @@ USER_DEFINED_WRAPPER(int, Comm_group, (MPI_Comm) comm, (MPI_Group *) group)
return retval;
}

// Calls MPI_Comm_group to define a new group for internal purposes.
// See: p2p_drain_send_recv.cpp
int
MPI_Comm_internal_virt_group(MPI_Comm comm, MPI_Group *group)
{
int retval;
DMTCP_PLUGIN_DISABLE_CKPT();
MPI_Comm realComm = VIRTUAL_TO_REAL_COMM(comm);
JUMP_TO_LOWER_HALF(lh_info.fsaddr);
retval = NEXT_FUNC(Comm_group)(realComm, group);
RETURN_TO_UPPER_HALF();
if (retval == MPI_SUCCESS) {
MPI_Group virtGroup = ADD_NEW_GROUP(*group);
*group = virtGroup;
}
DMTCP_PLUGIN_ENABLE_CKPT();
return retval;
}

USER_DEFINED_WRAPPER(int, Group_size, (MPI_Group) group, (int *) size)
{
int retval;
Expand Down
49 changes: 49 additions & 0 deletions mpi-proxy-split/mpi_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1034,11 +1034,51 @@ save_cartesian_properties(const char *filename)
}
#endif

void printElapsedTime(time_t origin_time, const char *msg) {
char time_string[30];
time_t cur_time = time(NULL);
time_t delta_time = cur_time - origin_time;
strftime(time_string, sizeof(time_string),
"%H:%M:%S", localtime(&cur_time));
if (msg != NULL) {
fprintf(stderr, "%s: *** MANA: %s\n", time_string, msg);
if (origin_time == 0) { return; } // We only print msg, not elapsed time
fprintf(stderr, "%*c ", (int)strlen(time_string), ' ');
} else {
fprintf(stderr, "%s: ", time_string);
}
fprintf(stderr, "Elapsed time since INIT/RESTART: %ld seconds\n", delta_time);
return;
}

// FIXME: Use 'getenv("MANA_TIMING")' instead in 'if' stmt.
// In bin/mana_launch, add '--timing' flag that sets this env. var.
void printEventToStderr(const char *msg) {
if (!getenv("MANA_TIMING")) { return; }

static time_t init_time = 0;
if (init_time == 0 && strstr(msg, "INIT")) {
init_time = time(NULL);
printElapsedTime(0, msg);
return; // Only one process should print, but we don't yet have a rank.
}
int rank = g_world_rank; // MPI_Comm_rank would fail at DMTCP_EVENT_EXIT.
if (rank == 0) {
if (strstr(msg, "RESTART")) {
init_time = time(NULL);
printElapsedTime(0, msg);
} else {
printElapsedTime(init_time, msg);
}
}
}

static void
mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
{
switch (event) {
case DMTCP_EVENT_INIT: {
printEventToStderr("EVENT_INIT"); // We don't have a rank. So no printing.
JTRACE("*** DMTCP_EVENT_INIT");
JASSERT(dmtcp_get_real_tid != NULL);
initialize_signal_handlers();
Expand Down Expand Up @@ -1077,6 +1117,7 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
break;
}
case DMTCP_EVENT_EXIT: {
printEventToStderr("EVENT_EXIT");
JTRACE("*** DMTCP_EVENT_EXIT");
seq_num_destroy();
break;
Expand Down Expand Up @@ -1105,15 +1146,18 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
}

case DMTCP_EVENT_PRESUSPEND: {
printEventToStderr("EVENT_PRESUSPEND (finish collective op's)");
mana_state = CKPT_COLLECTIVE;
// preSuspendBarrier() will send coord response and get worker state.
// FIXME: See commant at: dmtcpplugin.cpp:'case DMTCP_EVENT_PRESUSPEND'
drain_mpi_collective();
openCkptFileFds();
printEventToStderr("EVENT_PRESUSPEND (done)");
break;
}

case DMTCP_EVENT_PRECHECKPOINT: {
printEventToStderr("EVENT_PRECHECKPOINT (drain send/recv)");
recordMpiInitMaps();
recordOpenFds();
dmtcp_local_barrier("MPI:GetLocalLhMmapList");
Expand All @@ -1138,20 +1182,24 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
const char *file = get_cartesian_properties_file_name();
save_cartesian_properties(file);
#endif
printEventToStderr("EVENT_PRECHECKPOINT (done)");
break;
}

case DMTCP_EVENT_RESUME: {
printEventToStderr("EVENT_RESUME");
processingOpenCkpFileFds = false;
dmtcp_local_barrier("MPI:Reset-Drain-Send-Recv-Counters");
resetDrainCounters(); // p2p_drain_send_recv.cpp
seq_num_reset(RESUME);
dmtcp_local_barrier("MPI:seq_num_reset");
mana_state = RUNNING;
printEventToStderr("EVENT_RESUME (done)");
break;
}

case DMTCP_EVENT_RESTART: {
printEventToStderr("EVENT_RESTART");
processingOpenCkpFileFds = false;
logCkptFileFds();

Expand Down Expand Up @@ -1186,6 +1234,7 @@ mpi_plugin_event_hook(DmtcpEvent_t event, DmtcpEventData_t *data)
restore_mpi_files(file);
dmtcp_local_barrier("MPI:Restore-MPI-Files");
mana_state = RUNNING;
printEventToStderr("EVENT_RESTART (done)");
break;
}

Expand Down
20 changes: 11 additions & 9 deletions mpi-proxy-split/p2p_drain_send_recv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ extern int MPI_Comm_create_group_internal(MPI_Comm comm, MPI_Group group,
extern int MPI_Comm_free_internal(MPI_Comm *comm);
extern int MPI_Comm_group_internal(MPI_Comm comm, MPI_Group *group);
extern int MPI_Group_free_internal(MPI_Group *group);
extern int MPI_Comm_internal_virt_group(MPI_Comm comm, MPI_Group *group);
int *g_sendBytesByRank; // Number of bytes sent to other ranks
int *g_rsendBytesByRank; // Number of bytes sent to other ranks by MPI_rsend
int *g_bytesSentToUsByRank; // Number of bytes other ranks sent to us
Expand Down Expand Up @@ -76,7 +75,15 @@ registerLocalSendsAndRecvs()
// Get a copy of MPI_COMM_WORLD
MPI_Group group_world;
MPI_Comm mana_comm;
MPI_Comm_internal_virt_group(MPI_COMM_WORLD, &group_world);
// MPI_Comm_group creates the same real id (and therefore, virtual id) for
// identical calls. If the application already has this virtual id, no extra
// resource is created.
//
// See mpi-proxy-split/virtual-ids.h, onCreate(id), usage of realIdExists.
//
// FIXME: This Comm_group can cause an extra LOG_CALL and LOG_REPLAY. But, it
// needs to happen if the application moves to create the same vid /later/.
MPI_Comm_group(MPI_COMM_WORLD, &group_world);
MPI_Comm_create_group_internal(MPI_COMM_WORLD, group_world, 1, &mana_comm);

// broadcast sendBytes and recvBytes
Expand All @@ -85,15 +92,10 @@ registerLocalSendsAndRecvs()
g_bytesSentToUsByRank[g_world_rank] = 0;

// Free resources
// mana_comm is a real id, and MPI_Comm_free_internal expects a
// virtual id, but it works out because virtualToReal(real_id) is
// defined to be real_id.
MPI_Comm_free_internal(&mana_comm);

// Because group_world is a virtual group, we have to free both its
// virtual and real id to clean up correctly.
MPI_Group_free_internal(&group_world);
REMOVE_OLD_GROUP(group_world);
// We do NOT free group_world, because if the application also made this
// call, there is a collision.
}

// status was received by MPI_Iprobe
Expand Down
26 changes: 9 additions & 17 deletions restart_plugin/dmtcp_restart_plugin.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "workerstate.h"
#include "dmtcp_restart.h"
#include "jassert.h"
#include "jconvert.h"
#include "jfilesystem.h"
#include "util.h"

Expand Down Expand Up @@ -39,29 +40,20 @@ void dmtcp_restart_plugin(const string &restartDir,
// Also, create the DMTCP shared-memory area.
t->initialize();

vector<char *> mtcpArgs = getMtcpArgs();
mtcpArgs.push_back((char *)"--mpi");

const map<string, string> &kvmap = t->getKeyValueMap();

mtcpArgs.push_back((char*) "--minLibsStart");
mtcpArgs.push_back((char*) kvmap.at("MANA_MinLibsStart").c_str());

mtcpArgs.push_back((char*) "--maxLibsEnd");
mtcpArgs.push_back((char*) kvmap.at("MANA_MaxLibsEnd").c_str());

mtcpArgs.push_back((char*) "--minHighMemStart");
mtcpArgs.push_back((char*) kvmap.at("MANA_MinHighMemStart").c_str());
publishKeyValueMapToMtcpEnvironment(t);

if (!restartDir.empty()) {
mtcpArgs.push_back((char *)"--restartdir");
mtcpArgs.push_back((char *)restartDir.c_str());
setenv("MANA_RestartDir=", restartDir.c_str(), 1);
}

for (const string &image : ckptImages) {
mtcpArgs.push_back((char*) image.c_str());
for (size_t i = 0; i < ckptImages.size(); i++) {
string key = "MANA_CkptImage_Rank_" + jalib::XToString(i);
setenv(key.c_str(), ckptImages[i].c_str(), 1);
}

vector<char *> mtcpArgs = getMtcpArgs();
mtcpArgs.push_back((char *)"--mpi");

mtcpArgs.push_back(NULL);
execvp(mtcpArgs[0], &mtcpArgs[0]);
JASSERT(false)(mtcpArgs[0]).Text("execvp failed!");
Expand Down
Loading