Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ULFM_PREFIX ?= $(CURDIR)/ompi/install
ULFM_PREFIX ?= /usr/local/
ULFM_FILE = $(ULFM_PREFIX)/bin/mpiexec
CC = $(ULFM_PREFIX)/bin/mpicc

Expand Down
2 changes: 1 addition & 1 deletion legiotest/request/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ int main(int argc, char** argv)

send = rank;
MPI_Barrier(MPI_COMM_WORLD);
if(rank == 4) raise(SIGINT);
if(rank == 2) raise(SIGINT);
int source = (rank-1 < 0 ? size-1 : rank-1);
int destination = (rank+1 == size ? 0 : rank+1);
MPI_Request send_req, recv_req;
Expand Down
1 change: 1 addition & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ set(LIBRARY_SOURCES
"${LIBRARY_SRC_PATH}/session.cpp"
"${LIBRARY_SRC_PATH}/supported_comm.cpp"
"${LIBRARY_SRC_PATH}/utils.cpp"
"${LIBRARY_SRC_PATH}/mpi_wrapper.cpp"
)
if(${WITH_SESSION})
set(LIBRARY_SOURCES ${LIBRARY_SOURCES} "${LIBRARY_SRC_PATH}/session_manager.cpp")
Expand Down
7 changes: 7 additions & 0 deletions lib/include/legio.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@
#define LEGIO_FAILURE_REPAIR_SELF_VALUE 3

void fault_number(MPI_Comm, int*);
// input is a communicator
// output is an integer that indicates how many faults have happened

void who_failed(MPI_Comm, int*, int*);
// Input is a communicator.
// You should first call fault_number to know how much space to allocate for the second int*,
// which is a buffer that holds the ranks of all the failed processes.
// The first int* is the same as in the previous function,
// i.e. the number of failed processes.

int MPIX_Comm_agree_group(MPI_Comm, MPI_Group, int*);

Expand Down
18 changes: 17 additions & 1 deletion lib/src/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ int MPI_Irecv(void* buf,
int source_rank = translate_ranks(source, translated);
if (source_rank == MPI_UNDEFINED)
{
if constexpr (BuildOptions::recv_resiliency)
if constexpr (BuildOptions::recv_resiliency){
rc = MPI_SUCCESS;
}
else
{
legio::log("##### Irecv failed, stopping a node", LogLevel::errors_only);
Expand Down Expand Up @@ -180,3 +181,18 @@ int MPI_Request_free(MPI_Request* request)
Context::get().m_comm.remove_structure(request);
return PMPI_Request_free(request);
}


// to be checked
int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses) {
int rc;
int error_flag = 0;
for (int i = 0; i < count; ++i) {
rc = MPI_Wait(&array_of_requests[i], &array_of_statuses[i]);
if (rc != MPI_SUCCESS) {
error_flag++;
}
}
return error_flag;
}

24 changes: 23 additions & 1 deletion lib/src/coll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,26 @@ int MPI_Scan(const void* sendbuf,
else
return rc;
}
}
}

// to be checked
int MPI_Allgather(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
void* recvbuf, int recvcount, MPI_Datatype recvtype,
MPI_Comm comm) {

int rc;
int size;

MPI_Comm_size(comm, &size);

// Gather data from all processes to root process
rc = MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, 0, comm);

// broadcast gathered data from root process to all processes
rc = MPI_Bcast(recvbuf, recvcount * size, recvtype, 0, comm);

return rc;
}



1 change: 1 addition & 0 deletions lib/src/comm_manipulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void legio::replace_comm(ComplexComm& cur_complex)
PMPI_Comm_free(&new_comm);
else
{
MPIX_Comm_failure_ack(cur_complex.get_alias());
MPI_Comm_set_errhandler(new_comm, MPI_ERRORS_RETURN);
cur_complex.replace_comm(new_comm);
}
Expand Down
216 changes: 216 additions & 0 deletions lib/src/mpi_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#include <signal.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#include "context.hpp"

extern "C" {
#include <legio.h>

// Converts the Fortran communicator handle to its C counterpart and forwards
// it, together with the caller's output pointer, to fault_number.
void fort_fault_number(MPI_Fint comm, int* size)
{
    fault_number(MPI_Comm_f2c(comm), size);
}

// Converts the Fortran communicator handle to its C counterpart and forwards
// it, together with both caller-provided pointers, to who_failed.
void fort_who_failed(MPI_Fint comm, int* size, int* ranks)
{
    who_failed(MPI_Comm_f2c(comm), size, ranks);
}

// Sends SIGINT to the calling process.
void raise_sigint()
{
    // The result of raise() is deliberately not inspected.
    static_cast<void>(raise(SIGINT));
}

// MPI_INIT
// Switches the two communicators passed in (as Fortran handles) to
// MPI_ERRORS_RETURN and registers MPI_COMM_SELF and MPI_COMM_WORLD with the
// context's communicator manager.
//
// comm_w, comm_s  Fortran communicator handles whose error handler is replaced
//                 (presumably world and self — confirm against the caller)
// ierr            receives the first non-success code from the two
//                 MPI_Comm_set_errhandler calls, or the second call's code
//                 otherwise; ignored when null
void my_MPI_Init(MPI_Fint comm_w, MPI_Fint comm_s, int* ierr)
{
    MPI_Comm c_comm_w = MPI_Comm_f2c(comm_w);
    MPI_Comm c_comm_s = MPI_Comm_f2c(comm_s);

    const int world_rc = MPI_Comm_set_errhandler(c_comm_w, MPI_ERRORS_RETURN);
    const int self_rc = MPI_Comm_set_errhandler(c_comm_s, MPI_ERRORS_RETURN);

    Context::get().m_comm.add_comm(MPI_COMM_SELF);
    Context::get().m_comm.add_comm(MPI_COMM_WORLD);

    // ierr used to be left untouched, so the caller read an undefined value.
    if (ierr != nullptr)
        *ierr = (world_rc != MPI_SUCCESS) ? world_rc : self_rc;
}

// MPI_COMM_RANK
// checked
// Calls MPI_Comm_rank on the C communicator matching the Fortran handle.
//
// comm  Fortran communicator handle
// rank  output pointer forwarded to MPI_Comm_rank
// ierr  receives the MPI_Comm_rank return code; ignored when null
void my_MPI_Comm_rank(MPI_Fint comm, int* rank, int* ierr)
{
    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    const int rc = MPI_Comm_rank(c_comm, rank);

    // ierr used to be left untouched, so the caller read an undefined value.
    if (ierr != nullptr)
        *ierr = rc;
}

// MPI_BARRIER
// checked
// Calls MPI_Barrier on the C communicator matching the Fortran handle.
//
// comm  Fortran communicator handle
// ierr  receives the MPI_Barrier return code; ignored when null
void my_MPI_Barrier(MPI_Fint comm, int* ierr)
{
    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    const int rc = MPI_Barrier(c_comm);

    // ierr used to be left untouched, so the caller read an undefined value.
    if (ierr != nullptr)
        *ierr = rc;
}

// MPI_ABORT
// Calls MPI_Abort on the C communicator matching the Fortran handle.
//
// comm       Fortran communicator handle
// errorcode  error code forwarded to MPI_Abort
// ierr       receives the MPI_Abort return code if the call returns; ignored
//            when null
void my_MPI_Abort(MPI_Fint comm, int errorcode, int* ierr)
{
    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    const int rc = MPI_Abort(c_comm, errorcode);

    // Only reached if MPI_Abort returns; ierr used to be left untouched.
    if (ierr != nullptr)
        *ierr = rc;
}

// MPI_COMM_SIZE
// Calls MPI_Comm_size on the C communicator matching the Fortran handle.
//
// comm  Fortran communicator handle
// size  output pointer forwarded to MPI_Comm_size
// ierr  receives the MPI_Comm_size return code; ignored when null
void my_MPI_Comm_size(MPI_Fint comm, int* size, int* ierr)
{
    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    const int rc = MPI_Comm_size(c_comm, size);

    // ierr used to be left untouched, so the caller read an undefined value.
    if (ierr != nullptr)
        *ierr = rc;
}

// MPI_WAITALL
void my_MPI_Waitall(int count, MPI_Fint *array_of_requests, MPI_Fint *array_of_statuses, int *ierr) {
// if the statuses are the output of the function you might want to
// define them as c type inside the function and then convert back to fortran
// after the mpi call
// just like the my_mpi_Wait function
MPI_Request *cmpi_requests = (MPI_Request *)malloc(count * sizeof(MPI_Request));
MPI_Status *cmpi_statuses = (MPI_Status *)malloc(count * sizeof(MPI_Status));
MPI_Status c_status;

// Convert Fortran MPI handles to C MPI handles
for (int i = 0; i < count; ++i) {
cmpi_requests[i] = MPI_Request_f2c(array_of_requests[i]);
MPI_Status_f2c(&array_of_statuses[i], &c_status);
cmpi_statuses[i] = c_status;
}

// Call MPI_Waitall
// int message = 42;
// MPI_Request request;
// MPI_Status status;
// // Initiate a non-blocking send to itself
// MPI_Isend(&message, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
// cmpi_requests[0] = request;

*ierr = MPI_Waitall(count, cmpi_requests, cmpi_statuses);

// convert c requests back to fortran
for (int i = 0; i < count; ++i) {
MPI_Status_c2f(&cmpi_statuses[i], &array_of_statuses[i]);
array_of_requests[i] = MPI_Request_c2f(cmpi_requests[i]);
}

// Free allocated memory
free(cmpi_requests);
free(cmpi_statuses);
}


// MPI_ISEND
void my_MPI_Isend(void *buf, int count, MPI_Fint datatype, int *dest,
int tag, MPI_Fint comm, MPI_Fint *request, int *ierr){
MPI_Comm c_comm = MPI_Comm_f2c(comm);
MPI_Datatype c_datatype = MPI_Type_f2c(datatype);
MPI_Request c_request;
*ierr = MPI_Isend(buf, count, c_datatype, *dest, tag, c_comm, &c_request);
*request = MPI_Request_c2f(c_request);
}

// MPI_SEND
void my_MPI_Send(void *buf, int count, MPI_Fint datatype, int *dest,
int tag, MPI_Fint comm, int *ierr){
MPI_Comm c_comm = MPI_Comm_f2c(comm);
MPI_Datatype c_datatype = MPI_Type_f2c(datatype);
*ierr = MPI_Send(buf, count, c_datatype, *dest, tag, c_comm);
}

// MPI_IRECV
void my_MPI_Irecv(void *buf, int count, MPI_Fint datatype, int *source,
int tag, MPI_Fint comm, MPI_Fint *request, int *ierr){
MPI_Comm c_comm = MPI_Comm_f2c(comm);
MPI_Datatype c_datatype = MPI_Type_f2c(datatype);
MPI_Request c_request;
*ierr = MPI_Irecv(buf, count, c_datatype, *source, tag, c_comm, &c_request);
*request = MPI_Request_c2f(c_request);
}

// MPI_RECV
// Performs a blocking receive through MPI_Recv using C handles derived from
// the Fortran datatype and communicator handles.
//
// source  pointer to the source rank
// status  unused: it is passed by value, so the resulting status cannot be
//         handed back to the caller through it
// ierr    receives the MPI_Recv return code
void my_MPI_Recv(void* buf, int count, MPI_Fint datatype, int* source,
                 int tag, MPI_Fint comm, MPI_Fint status, int* ierr)
{
    // The previous MPI_Status_f2c(&status, ...) read a whole Fortran status
    // array from the address of this single integer (out-of-bounds read).
    // The status is an output of MPI_Recv, so no input conversion is needed.
    static_cast<void>(status);

    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    MPI_Datatype c_datatype = MPI_Type_f2c(datatype);
    MPI_Status c_status;
    *ierr = MPI_Recv(buf, count, c_datatype, *source, tag, c_comm, &c_status);
}

// MPI_WAIT
// Waits for the request identified by a Fortran handle via MPI_Wait.
//
// request  Fortran request handle; passed by value, so the caller's handle is
//          not updated after completion
// status   unused: it is passed by value, so the resulting status cannot be
//          handed back to the caller through it
// ierr     receives the MPI_Wait return code
void my_MPI_Wait(MPI_Fint request, MPI_Fint status, int* ierr)
{
    // The previous MPI_Status_f2c(&status, ...) read a whole Fortran status
    // array from the address of this single integer (out-of-bounds read).
    // The status is an output of MPI_Wait, so no input conversion is needed.
    static_cast<void>(status);

    MPI_Request c_request = MPI_Request_f2c(request);
    MPI_Status c_status;
    *ierr = MPI_Wait(&c_request, &c_status);
}

// MPI_GATHER
// Calls MPI_Gather using C handles derived from the Fortran datatype and
// communicator handles.
//
// root  rank that receives the gathered data
// ierr  receives the MPI_Gather return code; ignored when null
void my_MPI_Gather(void* sendbuf, int sendcount, MPI_Fint sendtype,
                   void* recvbuf, int recvcount, MPI_Fint recvtype,
                   int root, MPI_Fint comm, int* ierr)
{
    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    MPI_Datatype c_send_datatype = MPI_Type_f2c(sendtype);
    MPI_Datatype c_rec_datatype = MPI_Type_f2c(recvtype);

    const int rc = MPI_Gather(sendbuf, sendcount, c_send_datatype, recvbuf,
                              recvcount, c_rec_datatype, root, c_comm);

    // The result used to be dropped, leaving ierr undefined for the caller.
    if (ierr != nullptr)
        *ierr = rc;
}

// MPI_ALLGATHER
void my_MPI_Allgather(void *sendbuf, int sendcount, MPI_Fint sendtype,
void *recvbuf, int recvcount, MPI_Fint recvtype,
MPI_Fint comm, int *ierr){

MPI_Comm c_comm = MPI_Comm_f2c(comm);
MPI_Datatype c_send_datatype = MPI_Type_f2c(sendtype);
MPI_Datatype c_rec_datatype = MPI_Type_f2c(recvtype);

*ierr = MPI_Allgather(sendbuf, sendcount, c_send_datatype, recvbuf,
recvcount, c_rec_datatype, c_comm);
}

// MPI_REDUCE
void my_MPI_Reduce(void *sendbuf, void *recvbuf, int count,
MPI_Fint datatype, MPI_Fint op, int root, MPI_Fint comm, int *ierr){

MPI_Comm c_comm = MPI_Comm_f2c(comm);
MPI_Datatype c_datatype = MPI_Type_f2c(datatype);
MPI_Op c_op = MPI_Op_f2c(op);

*ierr = MPI_Reduce(sendbuf, recvbuf, count, c_datatype, c_op, root, c_comm);
}

// MPI_ALLREDUCE

void my_MPI_Allreduce(void *sendbuf, void *recvbuf, int count,
MPI_Fint datatype, MPI_Fint op, MPI_Fint comm, int *ierr){

MPI_Comm c_comm = MPI_Comm_f2c(comm);
MPI_Datatype c_datatype = MPI_Type_f2c(datatype);
MPI_Op c_op = MPI_Op_f2c(op);

* ierr = MPI_Allreduce(sendbuf, recvbuf, count, c_datatype, c_op, c_comm);
}


// Calls MPI_Sendrecv using C handles derived from the Fortran datatype and
// communicator handles.
//
// dest    pointer to the destination rank
// source  pointer to the source rank
// status  unused: it is passed by value, so the resulting status cannot be
//         handed back to the caller through it
//
// The MPI_Sendrecv return code is not reported because the signature has no
// error output parameter.
void my_MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Fint sendtype,
                     int* dest, int sendtag, void* recvbuf, int recvcount,
                     MPI_Fint recvtype, int* source, int recvtag,
                     MPI_Fint comm, MPI_Fint status)
{
    // The previous MPI_Status_f2c(&status, ...) read a whole Fortran status
    // array from the address of this single integer (out-of-bounds read).
    // The status is an output of MPI_Sendrecv, so no input conversion is
    // needed.
    static_cast<void>(status);

    MPI_Comm c_comm = MPI_Comm_f2c(comm);
    MPI_Datatype c_sendtype = MPI_Type_f2c(sendtype);
    MPI_Datatype c_recvtype = MPI_Type_f2c(recvtype);

    MPI_Status c_status;
    MPI_Sendrecv(sendbuf, sendcount, c_sendtype, *dest, sendtag,
                 recvbuf, recvcount, c_recvtype, *source, recvtag,
                 c_comm, &c_status);
}

}
1 change: 1 addition & 0 deletions ompi
Submodule ompi added at 8ecda5