Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions mpi-proxy-split/mpi-wrappers/mpi_collective_wrappers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,69 @@ USER_DEFINED_WRAPPER(int, Ibarrier, (MPI_Comm) comm, (MPI_Request *) request)
return retval;
}

#if 0
This version, MPI_Allreduce_reproducible, can be called from
the MPI_Allreduce wrapper and returned. If desired, it could be
called selectively on certain sizes or certain types or certain op's.

Use MPI_Type_get_envelope and MPI_Type_get_contents
to discover if this is a dup of MPI_DOUBLE

https://www.mcs.anl.gov/papers/P4093-0713_1.pdf
On the Reproducibility of MPI Reduction Operations

https://www.sciencedirect.com/science/article/pii/S0167819121000612
An optimisation of allreduce communication in message-passing systems

MPI standard:
Advice to users. Some applications may not be able to ignore the
non-associative nature of floating-point operations or may use
user-defined operations (see Section 5.9.5) that require a special
reduction order and cannot be treated as associative. Such applications
should enforce the order of evaluation explicitly. For example, in the
case of operations that require a strict left-to-right (or right-to-left)
evaluation order, this could be done by gathering all operands at a single
process (e.g., with MPI_GATHER), applying the reduction operation in the
desired order (e.g., with MPI_REDUCE_LOCAL), and if needed, broadcast or
scatter the result to the other processes (e.g., with MPI_BCAST). (End
of advice to users.)

And note that MPI_Waitany can receive messages non-determistically.
#endif

int MPI_Allreduce_reproducible(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
# define MAX_ALL_SENDBUF_SIZE (1024*1024*16) /* 15 MB */
// We use 'static' becuase we don't want the overhead of the compiler
// initializing these to zero each time the function is called.
static char tmpbuf[MAX_ALL_SENDBUF_SIZE];
int root = 0;
int comm_rank;
int comm_size;
int type_size;

MPI_Comm_rank(comm, &comm_rank);
MPI_Comm_size(comm, &comm_size);
MPI_Type_size(datatype, &type_size);

JASSERT(count * comm_size * type_size <= MAX_ALL_SENDBUF_SIZE);

MPI_Gather(sendbuf, count, datatype, tmpbuf, count * comm_size,
datatype, root, comm);
if (comm_rank == root) {
MPI_Reduce_local(tmpbuf, recvbuf, count, datatype, op);
}
return MPI_Bcast(recvbuf, count, datatype, root, comm);
}

USER_DEFINED_WRAPPER(int, Allreduce,
(const void *) sendbuf, (void *) recvbuf,
(int) count, (MPI_Datatype) datatype,
(MPI_Op) op, (MPI_Comm) comm)
{
return MPI_Allreduce_reproducible(sendbuf, recvbuf, count, datatype,
op, comm);

bool passthrough = false;
commit_begin(comm, passthrough);
int retval;
Expand Down