diff --git a/debian/librdmacm1.symbols b/debian/librdmacm1.symbols index 8580fb2dd..34cd16268 100644 --- a/debian/librdmacm1.symbols +++ b/debian/librdmacm1.symbols @@ -5,10 +5,14 @@ librdmacm.so.1 librdmacm1 #MINVER# RDMACM_1.2@RDMACM_1.2 23 RDMACM_1.3@RDMACM_1.3 31 RDMACM_1.4@RDMACM_1.4 60 + RDMACM_1.5@RDMACM_1.5 61 raccept@RDMACM_1.0 1.0.16 rbind@RDMACM_1.0 1.0.16 rclose@RDMACM_1.0 1.0.16 rconnect@RDMACM_1.0 1.0.16 + repoll_create1@RDMACM_1.5 61 + repoll_ctl@RDMACM_1.5 61 + repoll_wait@RDMACM_1.5 61 rdma_accept@RDMACM_1.0 1.0.15 rdma_ack_cm_event@RDMACM_1.0 1.0.15 rdma_bind_addr@RDMACM_1.0 1.0.15 diff --git a/librdmacm/CMakeLists.txt b/librdmacm/CMakeLists.txt index ea1d1550f..0f8f6437b 100644 --- a/librdmacm/CMakeLists.txt +++ b/librdmacm/CMakeLists.txt @@ -11,7 +11,7 @@ publish_headers(infiniband rdma_library(rdmacm librdmacm.map # See Documentation/versioning.md - 1 1.4.${PACKAGE_VERSION} + 1 1.5.${PACKAGE_VERSION} acm.c addrinfo.c cma.c @@ -25,12 +25,69 @@ target_link_libraries(rdmacm LINK_PRIVATE ${RT_LIBRARIES} ) +# When building without LFS (_FILE_OFFSET_BITS != 64), we may need to wrap +# fcntl64/sendfile64. Only do so if the system libc actually provides them +set(CMAKE_REQUIRED_QUIET 1) +set(SAVE_DEFS "${CMAKE_REQUIRED_DEFINITIONS}") +set(CMAKE_REQUIRED_DEFINITIONS "") +check_c_source_compiles(" +#include +#include +int main(void) { + (void)&fcntl64; + (void)&sendfile64; + return 0; +} +" RDMA_PRELOAD_HAVE_64) +set(CMAKE_REQUIRED_DEFINITIONS "${SAVE_DEFS}") +set(CMAKE_REQUIRED_QUIET 0) +if(RDMA_PRELOAD_HAVE_64) + set(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS 1) +else() + set(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS 0) +endif() + +# Detect at configure time if fcntl64/sendfile64 is declared so we can +# add our forward declarations only when needed. 
+set(RDMA_PRELOAD_FCNTL64_IN_HEADER 0) +set(RDMA_PRELOAD_SENDFILE64_IN_HEADER 0) +if(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS) + set(CMAKE_REQUIRED_QUIET 1) + set(SAVE_FLAGS "${CMAKE_REQUIRED_FLAGS}") + set(CMAKE_REQUIRED_FLAGS "${CMAKE_C_FLAGS}") + check_c_source_compiles(" +#define _GNU_SOURCE +#include +int fcntl64(int socket, int cmd, ...) { (void)socket;(void)cmd; return 0; } +" RDMA_PRELOAD_FCNTL64_DECLARED_IN_HEADER) + if(RDMA_PRELOAD_FCNTL64_DECLARED_IN_HEADER) + set(RDMA_PRELOAD_FCNTL64_IN_HEADER 1) + endif() + check_c_source_compiles(" +#define _GNU_SOURCE +#include +#include +ssize_t sendfile64(int out_fd, int in_fd, off64_t *offset64, size_t count) { + (void)out_fd;(void)in_fd;(void)offset64;(void)count; return 0; +} +" RDMA_PRELOAD_SENDFILE64_DECLARED_IN_HEADER) + if(RDMA_PRELOAD_SENDFILE64_DECLARED_IN_HEADER) + set(RDMA_PRELOAD_SENDFILE64_IN_HEADER 1) + endif() + set(CMAKE_REQUIRED_FLAGS "${SAVE_FLAGS}") + set(CMAKE_REQUIRED_QUIET 0) +endif() + # The preload library is a bit special, it needs to be open coded # Since it is a LD_PRELOAD it has no soname, and is installed in sub dir add_library(rspreload MODULE preload.c indexer.c ) +target_compile_definitions(rspreload PRIVATE + RDMA_PRELOAD_HAVE_64=${RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS} + RDMA_PRELOAD_FCNTL64_IN_HEADER=${RDMA_PRELOAD_FCNTL64_IN_HEADER} + RDMA_PRELOAD_SENDFILE64_IN_HEADER=${RDMA_PRELOAD_SENDFILE64_IN_HEADER}) # Even though this is a module we still want to use Wl,--no-undefined set_target_properties(rspreload PROPERTIES LINK_FLAGS ${CMAKE_SHARED_LINKER_FLAGS}) set_target_properties(rspreload PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${BUILD_LIB}") diff --git a/librdmacm/cma.h b/librdmacm/cma.h index e14c51778..92225b56e 100644 --- a/librdmacm/cma.h +++ b/librdmacm/cma.h @@ -96,6 +96,7 @@ int ucma_init(void); extern int af_ib_support; #define RAI_ROUTEONLY 0x01000000 +#define EPOLL_FLAG 0x80000000 void ucma_ib_init(void); void ucma_ib_cleanup(void); diff --git a/librdmacm/librdmacm.map 
b/librdmacm/librdmacm.map index 76853f2f7..b374a49e1 100644 --- a/librdmacm/librdmacm.map +++ b/librdmacm/librdmacm.map @@ -94,3 +94,10 @@ RDMACM_1.4 { rdma_resolve_addrinfo; rdma_write_cm_event; } RDMACM_1.3; + +RDMACM_1.5 { + global: + repoll_create1; + repoll_ctl; + repoll_wait; +} RDMACM_1.4; diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index 67ecf33b8..7151f76f0 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -4,11 +4,14 @@ the signature this will go sideways.. */ global: accept; + accept4; bind; close; connect; + dup; dup2; fcntl; + fcntl64; getpeername; getsockname; getsockopt; @@ -22,6 +25,7 @@ select; send; sendfile; + sendfile64; sendmsg; sendto; setsockopt; @@ -29,5 +33,9 @@ socket; write; writev; + epoll_create; + epoll_create1; + epoll_ctl; + epoll_wait; local: *; }; diff --git a/librdmacm/man/rsocket.7.in b/librdmacm/man/rsocket.7.in index 7dc479ece..802d44069 100644 --- a/librdmacm/man/rsocket.7.in +++ b/librdmacm/man/rsocket.7.in @@ -1,5 +1,5 @@ .\" Licensed under the OpenIB.org BSD license (FreeBSD Variant) - See COPYING.md -.TH "RSOCKET" 7 "2019-04-16" "librdmacm" "Librdmacm Programmer's Manual" librdmacm +.TH "RSOCKET" 7 "2024-12-24" "librdmacm" "Librdmacm Programmer's Manual" librdmacm .SH NAME rsocket \- RDMA socket API .SH SYNOPSIS @@ -30,6 +30,7 @@ rpoll, rselect rgetpeername, rgetsockname .P rsetsockopt, rgetsockopt, rfcntl +repoll_create1, repoll_ctl, repoll_wait .P Functions take the same parameters as that used for sockets. The follow capabilities and flags are supported at this time: @@ -151,6 +152,8 @@ wake_up_interval - maximum number of milliseconds to block in poll. This value is used to safe guard against potential application hangs in rpoll(). .P +max_events - maximum number of events for the epoll thread to handle for an epoll_instance. +.P All configuration files should contain a single integer value. Values may be set by issuing a command similar to the following example. 
.P diff --git a/librdmacm/preload.c b/librdmacm/preload.c index b3175dd5d..376f22ba7 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -59,11 +59,23 @@ #include "cma.h" #include "indexer.h" +//Whether to compile fcntl64/sendfile64 wrappers. Need both: +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) +#if !RDMA_PRELOAD_FCNTL64_IN_HEADER +int fcntl64(int socket, int cmd, ... /* arg */); +#endif +#if !RDMA_PRELOAD_SENDFILE64_IN_HEADER +ssize_t sendfile64(int out_fd, int in_fd, off64_t *offset64, size_t count); +#endif +#endif + struct socket_calls { int (*socket)(int domain, int type, int protocol); int (*bind)(int socket, const struct sockaddr *addr, socklen_t addrlen); int (*listen)(int socket, int backlog); int (*accept)(int socket, struct sockaddr *addr, socklen_t *addrlen); + int (*accept4)(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags); int (*connect)(int socket, const struct sockaddr *addr, socklen_t addrlen); ssize_t (*recv)(int socket, void *buf, size_t len, int flags); ssize_t (*recvfrom)(int socket, void *buf, size_t len, int flags, @@ -87,16 +99,33 @@ struct socket_calls { int (*getsockopt)(int socket, int level, int optname, void *optval, socklen_t *optlen); int (*fcntl)(int socket, int cmd, ... /* arg */); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ + (!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + int (*fcntl64)(int socket, int cmd, ... 
/* arg */); +#endif + int (*dup)(int oldfd); int (*dup2)(int oldfd, int newfd); ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + ssize_t (*sendfile64)(int out_fd, int in_fd, off64_t *offset64, size_t count); +#endif int (*fxstat)(int ver, int fd, struct stat *buf); + int (*epoll_create)(int size); + int (*epoll_create1)(int flags); + int (*epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event); + int (*epoll_wait)(int epfd, struct epoll_event *events, int maxevents, int timeout); }; static struct socket_calls real; static struct socket_calls rs; static struct index_map idm; +static struct index_map ridm; +static struct index_map ep_idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t ep_mut = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t rs_mut = PTHREAD_MUTEX_INITIALIZER; static int sq_size; static int rq_size; @@ -389,6 +418,7 @@ static void init_preload(void) real.bind = dlsym(RTLD_NEXT, "bind"); real.listen = dlsym(RTLD_NEXT, "listen"); real.accept = dlsym(RTLD_NEXT, "accept"); + real.accept4 = dlsym(RTLD_NEXT, "accept4"); real.connect = dlsym(RTLD_NEXT, "connect"); real.recv = dlsym(RTLD_NEXT, "recv"); real.recvfrom = dlsym(RTLD_NEXT, "recvfrom"); @@ -408,9 +438,22 @@ static void init_preload(void) real.setsockopt = dlsym(RTLD_NEXT, "setsockopt"); real.getsockopt = dlsym(RTLD_NEXT, "getsockopt"); real.fcntl = dlsym(RTLD_NEXT, "fcntl"); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + real.fcntl64 = dlsym(RTLD_NEXT, "fcntl64"); +#endif + real.dup = dlsym(RTLD_NEXT, "dup"); real.dup2 = dlsym(RTLD_NEXT, "dup2"); real.sendfile = dlsym(RTLD_NEXT, "sendfile"); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + 
real.sendfile64 = dlsym(RTLD_NEXT, "sendfile64");
+#endif
 	real.fxstat = dlsym(RTLD_NEXT, "__fxstat");
+	real.epoll_create = dlsym(RTLD_NEXT, "epoll_create");
+	real.epoll_create1 = dlsym(RTLD_NEXT, "epoll_create1");
+	real.epoll_ctl = dlsym(RTLD_NEXT, "epoll_ctl");
+	real.epoll_wait = dlsym(RTLD_NEXT, "epoll_wait");
 
 	rs.socket = dlsym(RTLD_DEFAULT, "rsocket");
 	rs.bind = dlsym(RTLD_DEFAULT, "rbind");
@@ -620,6 +663,45 @@ int accept(int socket, struct sockaddr *addr, socklen_t *addrlen)
 	}
 }
 
+/*
+ * accept4() is built on the accept() wrapper: the connection is accepted
+ * first and the SOCK_NONBLOCK / SOCK_CLOEXEC bits of @flags are then applied
+ * with fcntl().  The word read back with F_GETFL/F_GETFD is the one that is
+ * modified, so accept4()'s own SOCK_* bits are never written to the
+ * descriptor.  On failure the new descriptor is closed and -1 is returned
+ * with errno describing the fcntl() failure.
+ */
+int accept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags)
+{
+	int cur_flags, save_errno;
+	int fd = accept(socket, addr, addrlen);
+
+	if (fd < 0)
+		return fd;
+
+	if (flags & SOCK_NONBLOCK) {
+		cur_flags = fcntl(fd, F_GETFL);
+		if (cur_flags == -1 ||
+		    fcntl(fd, F_SETFL, cur_flags | O_NONBLOCK) == -1)
+			goto err;
+	}
+
+	if (flags & SOCK_CLOEXEC) {
+		cur_flags = fcntl(fd, F_GETFD);
+		if (cur_flags == -1 ||
+		    fcntl(fd, F_SETFD, cur_flags | FD_CLOEXEC) == -1)
+			goto err;
+	}
+	return fd;
+
+err:
+	/* close() may overwrite errno; report the original failure */
+	save_errno = errno;
+	close(fd);
+	errno = save_errno;
+	return -1;
+}
+
 /*
  * We can't fork RDMA connections and pass them from the parent to the child
  * process. Instead, we need to establish the RDMA connection after calling
@@ -889,6 +963,22 @@ static struct pollfd *fds_alloc(nfds_t nfds)
 	return rfds;
 }
 
+static int *fds_r_alloc(nfds_t nfds)
+{
+	static __thread int *rfds_r;
+	static __thread nfds_t rnfds;
+
+	if (nfds > rnfds) {
+		if (rfds_r)
+			free(rfds_r);
+
+		rfds_r = malloc(sizeof(*rfds_r) * nfds);
+		rnfds = rfds_r ?
nfds : 0; + } + + return rfds_r; +} + int poll(struct pollfd *fds, nfds_t nfds, int timeout) { struct pollfd *rfds; @@ -922,7 +1012,7 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout) } static void select_to_rpoll(struct pollfd *fds, int *nfds, - fd_set *readfds, fd_set *writefds, fd_set *exceptfds) + fd_set *readfds, fd_set *writefds, fd_set *exceptfds, int *fds_r) { int fd, events, i = 0; @@ -933,7 +1023,8 @@ static void select_to_rpoll(struct pollfd *fds, int *nfds, if (events || (exceptfds && FD_ISSET(fd, exceptfds))) { fds[i].fd = fd_getd(fd); - fds[i++].events = events; + fds[i].events = events; + fds_r[i++] = fd; } } @@ -941,30 +1032,27 @@ static void select_to_rpoll(struct pollfd *fds, int *nfds, } static int rpoll_to_select(struct pollfd *fds, int nfds, - fd_set *readfds, fd_set *writefds, fd_set *exceptfds) + fd_set *readfds, fd_set *writefds, fd_set *exceptfds, int *fds_r) { - int fd, rfd, i, cnt = 0; + int i, cnt = 0; - for (i = 0, fd = 0; i < nfds; fd++) { - rfd = fd_getd(fd); - if (rfd != fds[i].fd) - continue; + for (i = 0; i < nfds; i++) { + assert(fds[i].fd == fd_getd(fds_r[i])); if (readfds && (fds[i].revents & POLLIN)) { - FD_SET(fd, readfds); + FD_SET(fds_r[i], readfds); cnt++; } if (writefds && (fds[i].revents & POLLOUT)) { - FD_SET(fd, writefds); + FD_SET(fds_r[i], writefds); cnt++; } if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) { - FD_SET(fd, exceptfds); + FD_SET(fds_r[i], exceptfds); cnt++; } - i++; } return cnt; @@ -980,12 +1068,17 @@ int select(int nfds, fd_set *readfds, fd_set *writefds, { struct pollfd *fds; int ret; + int *fds_r; fds = fds_alloc(nfds); if (!fds) return ERR(ENOMEM); - select_to_rpoll(fds, &nfds, readfds, writefds, exceptfds); + fds_r = fds_r_alloc(nfds); + if (!fds_r) + return ERR(ENOMEM); + + select_to_rpoll(fds, &nfds, readfds, writefds, exceptfds, fds_r); ret = rpoll(fds, nfds, rs_convert_timeout(timeout)); if (readfds) @@ -996,7 +1089,7 @@ int select(int nfds, fd_set *readfds, fd_set *writefds, 
FD_ZERO(exceptfds);
 
 	if (ret > 0)
-		ret = rpoll_to_select(fds, nfds, readfds, writefds, exceptfds);
+		ret = rpoll_to_select(fds, nfds, readfds, writefds, exceptfds, fds_r);
 
 	return ret;
 }
@@ -1109,6 +1202,69 @@ int fcntl(int socket, int cmd, ... /* arg */)
 	return ret;
 }
 
+#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \
+(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64)
+/*
+ * fcntl64() wrapper: the command decides how the optional argument is read
+ * (none, long or pointer); rsockets go to rfcntl(), others to real.fcntl64().
+ */
+int fcntl64(int socket, int cmd, ... /* arg */)
+{
+	va_list args;
+	long lparam;
+	void *pparam;
+	int fd, ret;
+
+	init_preload();
+	va_start(args, cmd);
+	switch (cmd) {
+	case F_GETFD:
+	case F_GETFL:
+	case F_GETOWN:
+	case F_GETSIG:
+	case F_GETLEASE:
+		ret = (fd_get(socket, &fd) == fd_rsocket) ?
+			rfcntl(fd, cmd) : real.fcntl64(fd, cmd);
+		break;
+	case F_DUPFD:
+	/*case F_DUPFD_CLOEXEC:*/
+	case F_SETFD:
+	case F_SETFL:
+	case F_SETOWN:
+	case F_SETSIG:
+	case F_SETLEASE:
+	case F_NOTIFY:
+		lparam = va_arg(args, long);
+		ret = (fd_get(socket, &fd) == fd_rsocket) ?
+			rfcntl(fd, cmd, lparam) : real.fcntl64(fd, cmd, lparam);
+		break;
+	default:
+		pparam = va_arg(args, void *);
+		ret = (fd_get(socket, &fd) == fd_rsocket) ?
+			rfcntl(fd, cmd, pparam) : real.fcntl64(fd, cmd, pparam);
+		break;
+	}
+	va_end(args);
+	return ret;
+}
+#endif
+
+/*
+ * dup() first obtains a new descriptor number with fcntl(F_DUPFD) and then
+ * hands it to the dup2() wrapper.  If no descriptor could be obtained, the
+ * fcntl() failure (and its errno) is returned instead of calling dup2()
+ * with an invalid target.
+ */
+int dup(int oldfd)
+{
+	int new_fd = fcntl(oldfd, F_DUPFD, 0);
+
+	if (new_fd < 0)
+		return new_fd;
+
+	return dup2(oldfd, new_fd);
+}
+
 /*
  * dup2 is not thread safe
  */
@@ -1181,6 +1324,41 @@ ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
 	return ret;
 }
 
+#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \
+(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64)
+/*
+ * LFS variant of the sendfile wrapper.  Descriptors that are not rsockets
+ * go to the real sendfile64(); for an rsocket the file range is mapped
+ * read-only and pushed with rwrite().
+ *
+ * NOTE(review): the offset is handed to mmap() unaligned and *offset64 is
+ * never advanced - confirm the intended offset semantics.
+ */
+ssize_t sendfile64(int out_fd, int in_fd, off64_t *offset64, size_t count)
+{
+	void *file_addr;
+	int fd;
+	ssize_t ret;
+
+	init_preload();
+	if (fd_get(out_fd, &fd) != fd_rsocket)
+		return real.sendfile64(fd, in_fd, offset64, count);
+
+	/* mmap() needs MAP_PRIVATE or MAP_SHARED; a zero flag word is rejected */
+	file_addr = mmap(NULL, count, PROT_READ, MAP_PRIVATE, in_fd,
+			 offset64 ? *offset64 : 0);
+	if (file_addr == MAP_FAILED)
+		return -1;
+
+	/* ret is signed so that an rwrite() error does not compare as "> 0" */
+	ret = rwrite(fd, file_addr, count);
+	if ((ret > 0) && offset64)
+		lseek(in_fd, ret, SEEK_CUR);
+	munmap(file_addr, count);
+	return ret;
+}
+#endif
+
 int __fxstat(int ver, int socket, struct stat *buf)
 {
 	int fd, ret;
@@ -1195,3 +1361,175 @@ int __fxstat(int ver, int socket, struct stat *buf)
 	}
 	return ret;
 }
+
+/* epoll_create() only validates @size; the instance comes from epoll_create1(). */
+int epoll_create(int size)
+{
+	if (size <= 0)
+		return ERR(EINVAL);
+
+	return epoll_create1(0);
+}
+
+/*
+ * Each epoll instance is backed by two kernel epoll descriptors: @epfd,
+ * which is passed to repoll_create1(), and a companion @ep_reg that
+ * epoll_ctl() uses for descriptors which are not rsockets.  @ep_reg is
+ * registered inside @epfd and ep_idm maps @epfd to @ep_reg.  If any setup
+ * step fails, the descriptors created so far are closed and -1 is returned
+ * with errno preserved.
+ */
+int epoll_create1(int flags)
+{
+	struct epoll_event event;
+	int epfd, ep_reg, ret, save_errno;
+
+	init_preload();
+
+	epfd = real.epoll_create1(flags);
+	if (epfd < 0)
+		return -1;
+
+	ep_reg = real.epoll_create1(flags);
+	if (ep_reg < 0)
+		goto err_epfd;
+
+	event.events = EPOLLIN | EPOLLOUT;
+	event.data.fd = ep_reg;
+	if (real.epoll_ctl(epfd, EPOLL_CTL_ADD, ep_reg, &event) < 0)
+		goto err_reg;
+
+	pthread_mutex_lock(&ep_mut);
+	ret = idm_set(&ep_idm, epfd, (void *)(uintptr_t)ep_reg);
+	pthread_mutex_unlock(&ep_mut);
+	if (ret < 0)
+		goto err_reg;
+
+	return repoll_create1(epfd);
+
+err_reg:
+	save_errno = errno;
+	close(ep_reg);
+	errno = save_errno;
+err_epfd:
+	save_errno = errno;
+	close(epfd);
+	errno = save_errno;
+	return -1;
+}
+
+/*
+ * Route an epoll_ctl() request.  Requests tagged with EPOLL_FLAG go
+ * straight to the real epoll_ctl() on @epfd with the tag removed.
+ * Descriptors that are not rsockets are applied to the companion epoll
+ * instance recorded in ep_idm; rsockets are translated with fd_getd(),
+ * remembered in ridm on EPOLL_CTL_ADD, and passed to repoll_ctl().
+ * Edge-triggered requests and adding @epfd to itself fail with EINVAL.
+ */
+int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
+{
+	int ep_reg, rfd;
+
+	init_preload();
+	if (fd < 0 || epfd < 0)
+		return ERR(EBADF);
+	if (op & EPOLL_FLAG)
+		return real.epoll_ctl(epfd, op & ~EPOLL_FLAG, fd, event);
+
+	/* @event may be NULL for EPOLL_CTL_DEL, so only dereference it if set */
+	if (!event && op != EPOLL_CTL_DEL)
+		return ERR(EFAULT);
+	if ((fd == epfd) || (event && (event->events & EPOLLET)))
+		return ERR(EINVAL);
+
+	if (!(fd_gett(fd) == fd_rsocket)) {
+		/* NOTE(review): idm_lookup() presumably tolerates an unknown epfd - confirm */
+		pthread_mutex_lock(&ep_mut);
+		ep_reg = (uintptr_t)idm_lookup(&ep_idm, epfd);
+		pthread_mutex_unlock(&ep_mut);
+
+		if (!ep_reg)
+			return ERR(ENOENT);
+		return real.epoll_ctl(ep_reg, op, fd, event);
+	}
+
+	rfd = fd_getd(fd);
+	if (op == EPOLL_CTL_ADD) {
+		pthread_mutex_lock(&rs_mut);
+		if (idm_set(&ridm, rfd, (void *)(uintptr_t)fd) < 0) {
+			pthread_mutex_unlock(&rs_mut);
+			return ERR(EFAULT);
+		}
+
+		pthread_mutex_unlock(&rs_mut);
+	}
+
+	return repoll_ctl(epfd, op, rfd, event);
+}
+
+/*
+ * Collect, without blocking, the events that are ready on @epfd: rsocket
+ * events from repoll_wait() first (their data.fd is translated through
+ * ridm), then, if room is left in @events, events from the companion epoll
+ * instance @ep_reg (0 means there is none).  Returns the number of events
+ * stored, or a negative value on error.
+ */
+static int epoll_collect(int epfd, int ep_reg, struct epoll_event *events,
+			 int maxevents)
+{
+	int count, reg_count = 0, fd, i;
+
+	count = repoll_wait(epfd, events, maxevents, 0);
+	if (count < 0)
+		return count;
+
+	for (i = 0; i < count; i++) {
+		fd = (uintptr_t)idm_lookup(&ridm, events[i].data.fd);
+		if (!fd)
+			return -1;
+		events[i].data.fd = fd;
+	}
+
+	if ((count < maxevents) && (ep_reg > 0)) {
+		reg_count = real.epoll_wait(ep_reg, events + count,
+					    maxevents - count, 0);
+		if (reg_count < 0)
+			return reg_count;
+	}
+
+	return count + reg_count;
+}
+
+/*
+ * epoll_wait() first collects ready events without blocking.  Only when
+ * nothing is ready does it sleep in the real epoll_wait() on @epfd for up
+ * to @timeout, after which the events are collected once more.
+ */
+int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
+{
+	struct epoll_event ev;
+	int ep_reg, ret;
+
+	if (maxevents <= 0)
+		return ERR(EINVAL);
+	if (!events)
+		return ERR(EFAULT);
+	if (epfd < 0)
+		return ERR(EBADF);
+
+	init_preload();
+	pthread_mutex_lock(&ep_mut);
+	ep_reg = (uintptr_t)idm_lookup(&ep_idm, epfd);
+	pthread_mutex_unlock(&ep_mut);
+
+	ret = epoll_collect(epfd, ep_reg, events, maxevents);
+	if (ret)
+		return ret;
+
+	/* nothing ready yet: sleep; a timeout (0) or error (-1) is returned as is */
+	ret = real.epoll_wait(epfd, &ev, 1, timeout);
+	if (ret <= 0)
+		return ret;
+
+	return epoll_collect(epfd, ep_reg, events, maxevents);
+}
diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c
index 005bd0be8..06a1da679 100644
--- a/librdmacm/rsocket.c
+++ b/librdmacm/rsocket.c
@@ -135,7 +135,8 @@ static uint16_t def_rqsize = 384;
 static uint32_t def_mem = (1 << 17);
 static uint32_t def_wmem = (1 << 17);
 static uint32_t polling_time = 10;
-static int wake_up_interval = 5000;
+static int wake_up_interval = 500;
+static int max_events = 40000;
 
 /*
  * Immediate data format is determined by the upper bits
@@ -318,6 +319,7 @@ struct ds_qp {
 
 struct rsocket {
 	int type;
+	int category;
 	int index;
 	fastlock_t slock;
 	fastlock_t rlock;
@@ -377,6 +379,8 @@ struct rsocket {
 
 	int opts;
 	int fd_flags;
+	int fs_flags;
+	int ipv4_opts;
 	uint64_t so_opts;
 	uint64_t ipv6_opts;
 	void *optval;
@@ -423,6 +427,46 @@ struct ds_udp_header {
 	}
addr; }; +struct rfd_nd { + int fd; + uint32_t events; + struct rfd_nd *next; + struct rfd_nd *prev; +}; + +struct epoll_inst { + int epfd; + struct rfd_nd *fds; + int rdy_cnt; + int fd_cnt; + int wakeup; + pthread_t thread; + pthread_spinlock_t evlock; + pthread_spinlock_t fdlock; + struct epoll_event *rev; +}; + +struct fdm_entry { + int key; + struct epoll_inst *ep; +}; + +static int wake_pipe[2]; +static int pipe_init; + +static pthread_t glep_thread; +static pthread_mutex_t glep_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t glep_cond = PTHREAD_COND_INITIALIZER; +static struct index_map evidm; + +struct eplist { + struct epoll_inst **ep; + int cnt; + int cap; + int idx; +}; +static struct eplist glep_list = {NULL, 0, 0, 0}; + #define DS_UDP_IPV4_HDR_LEN 16 #define DS_UDP_IPV6_HDR_LEN 28 @@ -584,6 +628,13 @@ static void rs_configure(void) failable_fscanf(f, "%d", &wake_up_interval); fclose(f); } + + f = fopen(RS_CONF_DIR "/max_events", "r"); + if (f) { + failable_fscanf(f, "%d", &max_events); + fclose(f); + } + if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) { failable_fscanf(f, "%hu", &def_inline); fclose(f); @@ -643,7 +694,7 @@ static void rs_remove(struct rsocket *rs) } /* We only inherit from listening sockets */ -static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) +static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type, int category) { struct rsocket *rs; @@ -652,8 +703,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) return NULL; rs->type = type; + rs->category = category; + rs->index = -1; - if (type == SOCK_DGRAM) { + if (category == SOCK_DGRAM) { rs->udp_sock = -1; rs->epfd = -1; } @@ -664,7 +717,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) rs->sq_inline = inherited_rs->sq_inline; rs->sq_size = inherited_rs->sq_size; rs->rq_size = inherited_rs->rq_size; - if (type == SOCK_STREAM) { + if (category == SOCK_STREAM) { rs->ctrl_max_seqno = 
inherited_rs->ctrl_max_seqno; rs->target_iomap_size = inherited_rs->target_iomap_size; } @@ -674,7 +727,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) rs->sq_inline = def_inline; rs->sq_size = def_sqsize; rs->rq_size = def_rqsize; - if (type == SOCK_STREAM) { + if (category == SOCK_STREAM) { rs->ctrl_max_seqno = RS_QP_CTRL_SIZE; rs->target_iomap_size = def_iomap_size; } @@ -694,7 +747,7 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg) struct ds_qp *qp; int ret = 0; - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (rs->cm_id->recv_cq_channel) ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); @@ -846,7 +899,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) if (!cm_id->recv_cq) goto err1; - if (rs->fd_flags & O_NONBLOCK) { + if (rs->fs_flags & O_NONBLOCK) { if (set_fd_nonblock(cm_id->recv_cq_channel->fd, true)) goto err2; } @@ -1047,7 +1100,7 @@ static void ds_free(struct rsocket *rs) static void rs_free(struct rsocket *rs) { - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { ds_free(rs); return; } @@ -1198,18 +1251,19 @@ int rsocket(int domain, int type, int protocol) struct rsocket *rs; int index, ret; + int category = type & ~(SOCK_CLOEXEC | SOCK_NONBLOCK); if ((domain != AF_INET && domain != AF_INET6 && domain != AF_IB) || - ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) || - (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) || - (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP)) + ((!(category == SOCK_STREAM)) && (!(category == SOCK_DGRAM))) || + ((category == SOCK_STREAM) && protocol && protocol != IPPROTO_TCP) || + ((category == SOCK_DGRAM) && protocol && protocol != IPPROTO_UDP)) return ERR(ENOTSUP); rs_configure(); - rs = rs_alloc(NULL, type); + rs = rs_alloc(NULL, type, category); if (!rs) return ERR(ENOMEM); - if (type == SOCK_STREAM) { + if (category == SOCK_STREAM) { ret = rdma_create_id(NULL, &rs->cm_id, rs, 
RDMA_PS_TCP); if (ret) goto err; @@ -1243,7 +1297,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); if (!ret) rs->state = rs_bound; @@ -1278,7 +1332,7 @@ int rlisten(int socket, int backlog) if (ret) return ret; - if (rs->fd_flags & O_NONBLOCK) { + if (rs->fs_flags & O_NONBLOCK) { ret = set_fd_nonblock(rs->accept_queue[0], true); if (ret) return ret; @@ -1309,7 +1363,7 @@ static void rs_accept(struct rsocket *rs) if (ret) return; - new_rs = rs_alloc(rs, rs->type); + new_rs = rs_alloc(rs, rs->type, rs->category); if (!new_rs) goto err; new_rs->cm_id = cm_id; @@ -1470,7 +1524,7 @@ static int rs_do_connect(struct rsocket *rs) rs->state = rs_connect_rdwr; break; case rs_accepting: - if (!(rs->fd_flags & O_NONBLOCK)) + if (!(rs->fs_flags & O_NONBLOCK)) set_fd_nonblock(rs->cm_id->channel->fd, true); ret = ucma_complete(rs->cm_id); @@ -1722,15 +1776,15 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); ret = rs_do_connect(rs); - if (ret == -1 && errno == EINPROGRESS) { - save_errno = errno; - /* The app can still drive the CM state on failure */ - rs_notify_svc(&connect_svc, rs, RS_SVC_ADD_CM); - errno = save_errno; - } + save_errno = errno; + /* The app can still drive the CM state on failure, + * and can respond to disconnect requests + */ + rs_notify_svc(&connect_svc, rs, RS_SVC_ADD_CM); + errno = save_errno; } else { if (rs->state == rs_init) { ret = ds_init_ep(rs); @@ -2345,7 +2399,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc static int rs_nonblocking(struct rsocket *rs, int flags) { - return (rs->fd_flags & 
O_NONBLOCK) || (flags & MSG_DONTWAIT); + return (rs->fs_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); } static int rs_is_cq_armed(struct rsocket *rs) @@ -2525,7 +2579,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { fastlock_acquire(&rs->rlock); ret = ds_recvfrom(rs, buf, len, flags, NULL, NULL); fastlock_release(&rs->rlock); @@ -2595,7 +2649,7 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { fastlock_acquire(&rs->rlock); ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen); fastlock_release(&rs->rlock); @@ -2800,7 +2854,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { fastlock_acquire(&rs->slock); ret = dsend(rs, buf, len, flags); fastlock_release(&rs->slock); @@ -2887,7 +2941,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (dest_addr || addrlen) return ERR(EISCONN); @@ -3196,7 +3250,7 @@ static int rs_poll_rs(struct rsocket *rs, int events, int ret; check_cq: - if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) || + if ((rs->category == SOCK_STREAM) && ((rs->state & rs_connected) || (rs->state == rs_disconnected) || (rs->state & rs_error))) { rs_process_cq(rs, nonblock, test); @@ -3213,7 +3267,7 @@ static int rs_poll_rs(struct rsocket *rs, int events, } return revents; - } else if (rs->type == SOCK_DGRAM) { + } else if (rs->category == SOCK_DGRAM) { ds_process_cqs(rs, nonblock, test); revents = 0; @@ -3285,7 +3339,7 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd 
*fds, nfds_t nfds) if (fds[i].revents) return 1; - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (rs->state >= rs_connected) rfds[i].fd = rs->cm_id->recv_cq_channel->fd; else @@ -3313,7 +3367,7 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) if (rs) { if (rfds[i].revents) { fastlock_acquire(&rs->cq_wait_lock); - if (rs->type == SOCK_STREAM) + if (rs->category == SOCK_STREAM) rs_get_cq_event(rs); else ds_get_cq_event(rs); @@ -3367,8 +3421,10 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) if (timeout >= 0) { timeout -= (int) ((rs_time_us() - start_time) / 1000); - if (timeout <= 0) + if (timeout <= 0) { + rs_poll_exit(); return 0; + } pollsleep = min(timeout, wake_up_interval); } else { pollsleep = wake_up_interval; @@ -3492,7 +3548,7 @@ int rshutdown(int socket, int how) if (rs->opts & RS_OPT_KEEPALIVE) rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); - if (rs->fd_flags & O_NONBLOCK) + if (rs->fs_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); if (rs->state & rs_connected) { @@ -3525,8 +3581,8 @@ int rshutdown(int socket, int how) rs_process_cq(rs, 0, rs_conn_all_sends_done); out: - if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected)) - rs_set_nonblocking(rs, rs->fd_flags); + if ((rs->fs_flags & O_NONBLOCK) && (rs->state & rs_connected)) + rs_set_nonblocking(rs, rs->fs_flags); if (rs->state & rs_disconnected) { /* Generate event by flushing receives to unblock rpoll */ @@ -3542,14 +3598,14 @@ static void ds_shutdown(struct rsocket *rs) if (rs->opts & RS_OPT_UDP_SVC) rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM); - if (rs->fd_flags & O_NONBLOCK) + if (rs->fs_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); rs->state &= ~(rs_readable | rs_writable); ds_process_cqs(rs, 0, ds_all_sends_done); - if (rs->fd_flags & O_NONBLOCK) - rs_set_nonblocking(rs, rs->fd_flags); + if (rs->fs_flags & O_NONBLOCK) + rs_set_nonblocking(rs, rs->fs_flags); } int rclose(int socket) @@ -3559,7 +3615,7 @@ int 
rclose(int socket) rs = idm_lookup(&idm, socket); if (!rs) return EBADF; - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (rs->state & rs_connected) rshutdown(socket, SHUT_RDWR); if (rs->opts & RS_OPT_KEEPALIVE) @@ -3597,7 +3653,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); return 0; } else { @@ -3612,7 +3668,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); return 0; } else { @@ -3658,7 +3714,7 @@ int rsetsockopt(int socket, int level, int optname, rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { + if ((rs->category == SOCK_DGRAM) && level != SOL_RDMA) { ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); if (ret) return ret; @@ -3669,7 +3725,7 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->so_opts; switch (optname) { case SO_REUSEADDR: - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR, (void *) optval, optlen); @@ -3681,8 +3737,8 @@ int rsetsockopt(int socket, int level, int optname, opt_on = *(int *) optval; break; case SO_RCVBUF: - if ((rs->type == SOCK_STREAM && !rs->rbuf) || - (rs->type == SOCK_DGRAM && !rs->qp_list)) + if (((rs->category == SOCK_STREAM) && !rs->rbuf) || + ((rs->category == SOCK_DGRAM) && !rs->qp_list)) rs->rbuf_size = (*(uint32_t *) optval) << 1; ret = 0; break; @@ -3710,6 +3766,18 @@ int rsetsockopt(int socket, int level, int optname, break; } break; + case IPPROTO_IP: + switch (optname) { + case 
IP_TOS: + rs->ipv4_opts = *(int *)optval; + ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, + RDMA_OPTION_ID_TOS, + (void *) optval, optlen); + break; + default: + break; + } + break; case IPPROTO_TCP: opts = &rs->tcp_opts; switch (optname) { @@ -3731,6 +3799,7 @@ int rsetsockopt(int socket, int level, int optname, ret = 0; break; case TCP_MAXSEG: + case TCP_CONGESTION: ret = 0; break; default: @@ -3741,7 +3810,7 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->ipv6_opts; switch (optname) { case IPV6_V6ONLY: - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_AFONLY, (void *) optval, optlen); @@ -3834,6 +3903,7 @@ int rgetsockopt(int socket, int level, int optname, struct rsocket *rs; void *opt; struct ibv_sa_path_rec *path_rec; + struct tcp_info *info; struct ibv_path_data path_data; socklen_t len; int ret = 0; @@ -3871,6 +3941,21 @@ int rgetsockopt(int socket, int level, int optname, *optlen = sizeof(int); rs->err = 0; break; + case SO_BROADCAST: + ret = 0; + break; + default: + ret = ENOTSUP; + break; + } + break; + case IPPROTO_IP: + switch (optname) { + case IP_TOS: + *((int *) optval) = rs->ipv4_opts; + *optlen = sizeof(int); + break; + default: ret = ENOTSUP; break; @@ -3878,6 +3963,7 @@ int rgetsockopt(int socket, int level, int optname, break; case IPPROTO_TCP: switch (optname) { + case TCP_CONGESTION: case TCP_KEEPCNT: case TCP_KEEPINTVL: *((int *) optval) = 1; /* N/A */ @@ -3896,6 +3982,17 @@ int rgetsockopt(int socket, int level, int optname, 2048; *optlen = sizeof(int); break; + case TCP_INFO: + //TODO: support other tcp_info fields. + info = (struct tcp_info *) optval; + memset(info, 0, sizeof(struct tcp_info)); + info->tcpi_state = (rs->state == rs_connected) ? 
+				TCP_ESTABLISHED : TCP_CLOSE;
+			info->tcpi_snd_cwnd = rs->sq_size;
+
+			*optlen = sizeof(struct tcp_info);
+			break;
+
 		default:
 			ret = ENOTSUP;
 			break;
@@ -3986,15 +4083,22 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
 	va_start(args, cmd);
 	switch (cmd) {
 	case F_GETFL:
-		ret = rs->fd_flags;
+		ret = rs->fs_flags;
 		break;
 	case F_SETFL:
 		param = va_arg(args, int);
-		if ((rs->fd_flags & O_NONBLOCK) != (param & O_NONBLOCK))
+		if ((rs->fs_flags & O_NONBLOCK) != (param & O_NONBLOCK))
 			ret = rs_set_nonblocking(rs, param & O_NONBLOCK);
 
 		if (!ret)
-			rs->fd_flags = param;
+			rs->fs_flags = param;
+		break;
+	case F_GETFD:
+		ret = rs->fd_flags;
+		break;
+	case F_SETFD:
+		param = va_arg(args, int);
+		rs->fd_flags = param;
 		break;
 	default:
 		ret = ERR(ENOTSUP);
@@ -4565,7 +4669,7 @@ static void tcp_svc_send_keepalive(struct rsocket *rs)
 				   0, (uintptr_t) NULL, (uintptr_t) NULL);
 	}
 	fastlock_release(&rs->cq_lock);
-}
+}
 
 static void *tcp_svc_run(void *arg)
 {
@@ -4697,3 +4801,465 @@ static void *cm_svc_run(void *arg)
 
 	return NULL;
 }
+
+static uint32_t epoll_rs(int fd, uint32_t events);
+
+/*
+ * Compute the epoll events currently pending on rsocket fd.  events is the
+ * caller's interest mask; EPOLLHUP/EPOLLERR are reported for disconnected or
+ * failed sockets even when not requested.  Returns 0 when nothing is pending
+ * or when fd is not a known rsocket.
+ */
+uint32_t epoll_rs(int fd, uint32_t events)
+{
+	struct pollfd fds;
+	uint32_t revents = 0;
+	int ret;
+	struct rsocket *rs = idm_lookup(&idm, fd);
+
+	/* not (or no longer) an rsocket: nothing to report */
+	if (!rs)
+		return 0;
+
+check_cq:
+	if ((rs->category == SOCK_STREAM) && ((rs->state & rs_connected) ||
+	     (rs->state == rs_disconnected) || (rs->state & rs_error))) {
+		rs_process_cq(rs, 1, rs_poll_all);
+
+		if ((events & EPOLLIN) && rs_conn_have_rdata(rs))
+			revents |= EPOLLIN;
+		if ((events & EPOLLOUT) && rs_can_send(rs))
+			revents |= EPOLLOUT;
+		if (!(rs->state & rs_connected)) {
+			if (rs->state == rs_disconnected)
+				revents |= EPOLLHUP;
+			else
+				revents |= EPOLLERR;
+		}
+
+		return revents;
+	} else if (rs->category == SOCK_DGRAM) {
+		ds_process_cqs(rs, 1, rs_poll_all);
+
+		if ((events & EPOLLIN) && rs_have_rdata(rs))
+			revents |= EPOLLIN;
+		if ((events & EPOLLOUT) && ds_can_send(rs))
+			revents |= EPOLLOUT;
+
+		return revents;
+	}
+
+	if (rs->state == rs_listening) {
+		fds.fd = rs->accept_queue[0];
+		fds.events = 0;
+		fds.revents = 0;
+		if (events & EPOLLIN)
+			fds.events |= POLLIN;
+		if (events & EPOLLOUT)
+			fds.events |= POLLOUT;
+		if (events & EPOLLERR)
+			fds.events |= POLLERR;
+		if (events & EPOLLHUP)
+			fds.events |= POLLHUP;
+
+
+		poll(&fds, 1, 0);
+		if (fds.revents & POLLIN)
+			revents |= EPOLLIN;
+		if (fds.revents & POLLOUT)
+			revents |= EPOLLOUT;
+		if (fds.revents & POLLERR)
+			revents |= EPOLLERR;
+		if (fds.revents & POLLHUP)
+			revents |= EPOLLHUP;
+
+		return revents;
+	}
+
+	if (rs->state & rs_opening) {
+		ret = rs_do_connect(rs);
+		if (ret && (errno == EINPROGRESS))
+			errno = 0;
+		else
+			goto check_cq;
+	}
+
+	if (rs->state == rs_connect_error) {
+		if (events & EPOLLOUT)
+			revents |= EPOLLOUT;
+		if (events & EPOLLIN)
+			revents |= EPOLLIN;
+		revents |= EPOLLERR;
+		return revents;
+	}
+
+	return 0;
+}
+
+/*
+ * Create the shared wake-up pipe on first use and queue one 8-byte token in
+ * it.  pipe_init stays 0 on failure so a later call can retry.
+ */
+static void get_pipe(void)
+{
+	uint64_t u = 1;
+
+	if (pipe_init)
+		return;
+
+	if (pipe(wake_pipe) != 0)
+		return;
+
+	if (write(wake_pipe[1], &u, sizeof(u)) != (ssize_t)sizeof(u)) {
+		/* do not leak the descriptors when the pipe is unusable */
+		close(wake_pipe[0]);
+		close(wake_pipe[1]);
+		return;
+	}
+
+	pipe_init = 1;
+}
+
+/*
+ * Register wake_pipe[0] with instance->epfd while events are pending
+ * (count != 0) and remove it once none remain; instance->wakeup records the
+ * current registration.  epoll_ctl() failures are ignored.
+ */
+static void update_wakeup(struct epoll_inst *instance, int count)
+{
+	struct epoll_event eve;
+
+	eve.events = EPOLLIN | EPOLLET;
+	eve.data.fd = wake_pipe[0];
+
+	if (count && !instance->wakeup) {
+		(void)epoll_ctl(instance->epfd, EPOLL_CTL_ADD | EPOLL_FLAG, wake_pipe[0], &eve);
+		instance->wakeup = 1;
+	} else if (!count && instance->wakeup) {
+		(void)epoll_ctl(instance->epfd, EPOLL_CTL_DEL | EPOLL_FLAG, wake_pipe[0], NULL);
+		instance->wakeup = 0;
+	}
+
+}
+
+/* Map fd to instance in evidm.  Returns fd, or ERR(ENOMEM) if allocation fails. */
+static int add_epins(int fd, struct epoll_inst *instance)
+{
+	struct fdm_entry *entry = malloc(sizeof(struct fdm_entry));
+
+	if (!entry)
+		return ERR(ENOMEM);
+
+	entry->key = fd;
+	entry->ep = instance;
+	idm_set(&evidm, fd, entry);
+
+	return fd;
+}
+
+/* Look up the epoll instance registered for fd; NULL if there is none. */
+static struct epoll_inst *get_epins(int fd)
+{
+	struct fdm_entry *entry = idm_lookup(&evidm, fd);
+
+	return entry ? entry->ep : NULL;
+}
+
+/*
+ * Worker thread: cycles through the instances in glep_list, evaluates every
+ * registered fd with epoll_rs() and publishes up to max_events results in
+ * instance->rev / instance->rdy_cnt.  Waits on glep_cond while the list is
+ * empty; returns only if the scratch buffer cannot be allocated.
+ */
+static void *epoll_thread(void *arg)
+{
+	struct eplist *list = &glep_list;
+	struct epoll_event *temp_revents2;
+	struct rfd_nd *curr;
+	struct epoll_inst *instance;
+	int count, revent;
+
+	temp_revents2 = malloc(max_events * sizeof(struct epoll_event));
+	if (!temp_revents2)
+		return NULL;
+
+	while (1) {
+		pthread_mutex_lock(&glep_lock);
+		while (list->cnt == 0 || list->idx >= list->cnt)
+			pthread_cond_wait(&glep_cond, &glep_lock);
+
+		instance = list->ep[list->idx];
+		list->idx = (list->idx + 1) % list->cnt;
+		pthread_mutex_unlock(&glep_lock);
+
+		memset(temp_revents2, 0, max_events * sizeof(struct epoll_event));
+
+		count = 0;
+		pthread_spin_lock(&instance->fdlock);
+		curr = instance->fds;
+		while (curr != NULL) {
+			if (count >= max_events)
+				break;
+
+			revent = epoll_rs(curr->fd, curr->events);
+			if (revent) {
+				temp_revents2[count].data.fd = curr->fd;
+				temp_revents2[count].events = revent;
+				count++;
+			}
+			curr = curr->next;
+		}
+
+		pthread_spin_unlock(&instance->fdlock);
+
+		pthread_spin_lock(&instance->evlock);
+		memset(instance->rev, 0, max_events * sizeof(struct epoll_event));
+		memcpy(instance->rev, temp_revents2, count * sizeof(struct epoll_event));
+
+		instance->rdy_cnt = count;
+		update_wakeup(instance, count);
+
+		pthread_spin_unlock(&instance->evlock);
+
+	}
+
+	return NULL;
+}
+
+/*
+ * Append instance to glep_list, growing the array when full, and start the
+ * shared polling thread on first use.  Returns 0, ERR(ENOMEM) or ERR(EAGAIN).
+ */
+static int mng_epfd_thread(struct epoll_inst *instance)
+{
+	struct epoll_inst **new_instances;
+	int ret, new_capacity;
+
+	pthread_mutex_lock(&glep_lock);
+
+	// Check if we need to expand the global instance list
+	if (glep_list.cnt == glep_list.cap) {
+		new_capacity = glep_list.cap ? glep_list.cap * 2 : 4;
+		new_instances = realloc(glep_list.ep,
+					new_capacity * sizeof(struct epoll_inst *));
+		if (!new_instances) {
+			pthread_mutex_unlock(&glep_lock);
+			return ERR(ENOMEM);
+		}
+
+		glep_list.ep = new_instances;
+		glep_list.cap = new_capacity;
+	}
+
+	// Add the new instance to the global instance list
+	glep_list.ep[glep_list.cnt++] = instance;
+
+	// Create the global epoll thread if it doesn't exist
+	if (glep_list.cnt > 0 && !glep_thread) {
+		ret = pthread_create(&glep_thread, NULL, epoll_thread, NULL);
+		if (ret) {
+			/* undo the insertion: repoll_create1() frees instance */
+			glep_list.cnt--;
+			pthread_mutex_unlock(&glep_lock);
+			return ERR(EAGAIN);
+		}
+	}
+
+	// Signal the global thread to start or resume processing
+	glep_list.idx = 0; // Start from the beginning
+	pthread_cond_signal(&glep_cond);
+	pthread_mutex_unlock(&glep_lock);
+
+	return 0;
+}
+
+/* Push a node for fd onto the head of instance->fds.  Returns 0 or ERR(ENOMEM). */
+static int epoll_add(struct epoll_inst *instance, int fd, struct epoll_event *event)
+{
+	struct rfd_nd *node = (struct rfd_nd *)malloc(sizeof(struct rfd_nd));
+
+	if (!node)
+		return ERR(ENOMEM);
+
+	node->fd = fd;
+	node->events = event->events;
+	node->prev = NULL;
+
+	pthread_spin_lock(&instance->fdlock);
+
+	node->next = instance->fds;
+	if (instance->fds)
+		instance->fds->prev = node;
+
+	instance->fds = node;
+	instance->fd_cnt++;
+
+	pthread_spin_unlock(&instance->fdlock);
+
+	return 0;
+}
+
+/* Replace the interest mask of an already registered node. */
+static void epoll_mod(struct epoll_inst *instance, struct epoll_event *event, struct rfd_nd *node)
+{
+	pthread_spin_lock(&instance->fdlock);
+	node->events = event->events;
+	pthread_spin_unlock(&instance->fdlock);
+}
+
+/* Unlink node from instance->fds under fdlock, then free it. */
+static void epoll_del(struct epoll_inst *instance, struct rfd_nd *node)
+{
+	pthread_spin_lock(&instance->fdlock);
+	instance->fd_cnt--;
+	if (!node->prev)
+		instance->fds = node->next;
+	else
+		node->prev->next = node->next;
+
+	if (node->next)
+		node->next->prev = node->prev;
+
+	pthread_spin_unlock(&instance->fdlock);
+
+	free(node);
+}
+
+/* Find the node registered for fd in instance->fds, or NULL.  Takes no lock. */
+static struct rfd_nd *find_rfd(struct epoll_inst *instance, int fd)
+{
+	struct rfd_nd *curr = instance->fds;
+
+	while (curr != NULL) {
+		if (curr->fd == fd)
+			return curr;
+
+		curr = curr->next;
+	}
+
+	return NULL;
+}
+
+/*
+ * Create the rsocket-side state for an epoll instance.  NOTE: the value
+ * passed in flags is used as the instance's epoll descriptor (epfd).
+ * Returns the add_epins() result on success, otherwise an ERR() value.
+ */
+int repoll_create1(int flags)
+{
+	struct epoll_inst *instance;
+	int epfd = flags;
+	int err;
+
+	get_pipe();
+
+	instance = malloc(sizeof(*instance));
+	if (!instance)
+		return ERR(ENOMEM);
+
+	instance->rev = malloc(max_events * sizeof(*instance->rev));
+	if (!instance->rev) {
+		err = ENOMEM;
+		goto free_instance;
+	}
+
+	instance->epfd = epfd;
+	instance->fds = NULL;
+	instance->rdy_cnt = 0;
+	instance->fd_cnt = 0;
+	instance->wakeup = 0;
+
+	pthread_spin_init(&instance->evlock, PTHREAD_PROCESS_PRIVATE);
+	pthread_spin_init(&instance->fdlock, PTHREAD_PROCESS_PRIVATE);
+
+	if (mng_epfd_thread(instance)) {
+		err = errno;
+		goto destroy_locks;
+	}
+
+	return add_epins(epfd, instance);
+
+destroy_locks:
+	pthread_spin_destroy(&instance->fdlock);
+	pthread_spin_destroy(&instance->evlock);
+	free(instance->rev);
+free_instance:
+	free(instance);
+	return ERR(err);
+}
+
+/*
+ * Add, modify or remove fd in the interest list of the instance registered
+ * for epfd.  Returns 0 on success, otherwise ERR() with EBADF (unknown epfd),
+ * EFAULT (NULL event for ADD/MOD), EEXIST, ENOENT, EINVAL or ENOMEM.
+ */
+int repoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
+{
+	struct epoll_inst *instance = get_epins(epfd);
+	struct rfd_nd *nd_fd;
+
+	if (!instance)
+		return ERR(EBADF);
+
+	/* ADD and MOD read event->events */
+	if (!event && (op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD))
+		return ERR(EFAULT);
+
+	nd_fd = find_rfd(instance, fd);
+
+	switch (op) {
+	case EPOLL_CTL_ADD:
+		if (nd_fd)
+			return ERR(EEXIST);
+
+		return epoll_add(instance, fd, event);
+
+	case EPOLL_CTL_MOD:
+		if (!nd_fd)
+			return ERR(ENOENT);
+
+		epoll_mod(instance, event, nd_fd);
+		break;
+
+	case EPOLL_CTL_DEL:
+		if (!nd_fd)
+			return ERR(ENOENT);
+
+		epoll_del(instance, nd_fd);
+		break;
+
+	default:
+		return ERR(EINVAL);
+	}
+
+	return 0;
+}
+
+/*
+ * Copy up to maxevents of the events last published for epfd by the polling
+ * thread into events and return the number copied.  The call does not wait:
+ * timeout is currently ignored.
+ */
+int repoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
+{
+	struct epoll_inst *instance = get_epins(epfd);
+	int count;
+
+	if (!instance)
+		return ERR(EBADF);
+	if (!events)
+		return ERR(EFAULT);
+	if (maxevents <= 0)
+		return ERR(EINVAL);
+
+	pthread_spin_lock(&instance->evlock);
+
+	count = (instance->rdy_cnt > maxevents) ? maxevents : instance->rdy_cnt;
+
+	for (int i = 0; i < count; i++)
+		events[i] = instance->rev[i];
+
+	pthread_spin_unlock(&instance->evlock);
+
+	return count;
+}
diff --git a/librdmacm/rsocket.h b/librdmacm/rsocket.h
index efd0db58b..ac8cad517 100644
--- a/librdmacm/rsocket.h
+++ b/librdmacm/rsocket.h
@@ -40,6 +40,7 @@
 #include
 #include
 #include
+#include
 
 #ifdef __cplusplus
 extern "C" {
@@ -70,6 +71,10 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout);
 int rselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
 	    struct timeval *timeout);
 
+int repoll_create1(int flags);
+int repoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
+int repoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
+
 int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen);
 int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
 