diff --git a/mpi-proxy-split/mpi-wrappers/mpi_collective_wrappers.cpp b/mpi-proxy-split/mpi-wrappers/mpi_collective_wrappers.cpp index c0611bec3..c5ec0d134 100644 --- a/mpi-proxy-split/mpi-wrappers/mpi_collective_wrappers.cpp +++ b/mpi-proxy-split/mpi-wrappers/mpi_collective_wrappers.cpp @@ -170,11 +170,69 @@ USER_DEFINED_WRAPPER(int, Ibarrier, (MPI_Comm) comm, (MPI_Request *) request) return retval; } +#if 0 +This version, MPI_Allreduce_reproducible, can be called from +the MPI_Allreduce wrapper and returned. If desired, it could be +called selectively on certain sizes or certain types or certain ops. + +Use MPI_Type_get_envelope and MPI_Type_get_contents + to discover if this is a dup of MPI_DOUBLE + +https://www.mcs.anl.gov/papers/P4093-0713_1.pdf + On the Reproducibility of MPI Reduction Operations + +https://www.sciencedirect.com/science/article/pii/S0167819121000612 + An optimisation of allreduce communication in message-passing systems + +MPI standard: +Advice to users. Some applications may not be able to ignore the +non-associative nature of floating-point operations or may use +user-defined operations (see Section 5.9.5) that require a special +reduction order and cannot be treated as associative. Such applications +should enforce the order of evaluation explicitly. For example, in the +case of operations that require a strict left-to-right (or right-to-left) +evaluation order, this could be done by gathering all operands at a single +process (e.g., with MPI_GATHER), applying the reduction operation in the +desired order (e.g., with MPI_REDUCE_LOCAL), and if needed, broadcast or +scatter the result to the other processes (e.g., with MPI_BCAST). (End +of advice to users.) + +And note that MPI_Waitany can receive messages non-deterministically. 
+#endif
+
+/*
+ * MPI_Allreduce_reproducible - order-deterministic replacement for
+ * MPI_Allreduce.
+ *
+ * Every rank's 'count' elements are gathered onto rank 0 of 'comm', combined
+ * there strictly in rank order, ((r0 op r1) op r2) op ..., with
+ * MPI_Reduce_local, and the result is then broadcast to all ranks.  Because
+ * the evaluation order is fixed, the result does not depend on the order in
+ * which the individual contributions happen to arrive.
+ *
+ * sendbuf   - this rank's operands, or MPI_IN_PLACE to take them from recvbuf
+ * recvbuf   - receives the reduced result on every rank
+ * count     - number of elements contributed by each rank
+ * datatype  - type of each element
+ * op        - reduction operation
+ * comm      - communicator over which to reduce
+ *
+ * Returns the return value of the final MPI_Bcast.  A JASSERT checks that
+ * the operands of all ranks fit in the MAX_ALL_SENDBUF_SIZE-byte staging
+ * buffer.
+ */
+int MPI_Allreduce_reproducible(const void *sendbuf, void *recvbuf, int count,
+                               MPI_Datatype datatype, MPI_Op op,
+                               MPI_Comm comm) {
+# define MAX_ALL_SENDBUF_SIZE (1024*1024*16) /* 16 MB */
+  // 'static' keeps this 16 MB staging buffer out of the stack frame.  The
+  // buffer is shared by every call, so this function is not reentrant.
+  static char tmpbuf[MAX_ALL_SENDBUF_SIZE];
+  const int root = 0;
+  int comm_rank;
+  int comm_size;
+  int type_size;
+
+  MPI_Comm_rank(comm, &comm_rank);
+  MPI_Comm_size(comm, &comm_size);
+  MPI_Type_size(datatype, &type_size);
+
+  // The size arithmetic is done in size_t and checked by division, so that a
+  // large count * comm_size * type_size cannot overflow 'int' and slip past
+  // the bounds check.
+  JASSERT(count >= 0 && comm_size > 0 && type_size >= 0);
+  const size_t chunk_bytes = (size_t)count * (size_t)type_size;
+  JASSERT(chunk_bytes <= (size_t)MAX_ALL_SENDBUF_SIZE / (size_t)comm_size);
+
+  // With MPI_IN_PLACE, MPI_Allreduce takes the operands from recvbuf.
+  const void *operands = (sendbuf == MPI_IN_PLACE) ? recvbuf : sendbuf;
+
+  // The receive count of MPI_Gather is the number of elements received from
+  // EACH rank (that is, 'count'), not the total over all ranks.
+  // NOTE(review): the chunk stride used below assumes that the extent of
+  // 'datatype' equals its size (true for predefined types such as
+  // MPI_DOUBLE) -- confirm before using this with derived datatypes.
+  MPI_Gather(operands, count, datatype, tmpbuf, count, datatype, root, comm);
+
+  if (comm_rank == root) {
+    // MPI_Reduce_local computes inoutbuf = inbuf op inoutbuf.  Folding chunk
+    // i-1 into chunk i therefore yields ((r0 op r1) op r2) op ..., and leaves
+    // the final result in the last chunk.
+    for (int i = 1; i < comm_size; i++) {
+      MPI_Reduce_local(tmpbuf + (size_t)(i - 1) * chunk_bytes,
+                       tmpbuf + (size_t)i * chunk_bytes,
+                       count, datatype, op);
+    }
+
+    // Move the result into recvbuf; copied byte by byte so that this
+    // function needs no additional header.
+    const char *result = tmpbuf + (size_t)(comm_size - 1) * chunk_bytes;
+    char *out = (char *)recvbuf;
+    for (size_t b = 0; b < chunk_bytes; b++) {
+      out[b] = result[b];
+    }
+  }
+  return MPI_Bcast(recvbuf, count, datatype, root, comm);
+}
+
 USER_DEFINED_WRAPPER(int, Allreduce, (const void *) sendbuf, (void *) recvbuf,
                      (int) count, (MPI_Datatype) datatype,
                      (MPI_Op) op, (MPI_Comm) comm)
 {
+  return MPI_Allreduce_reproducible(sendbuf, recvbuf, count, datatype,
+                                    op, comm);
+
   bool passthrough = false;
   commit_begin(comm, passthrough);
   int retval;