From c258dcfde4c66397611631f904256b69e44e0f07 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Tue, 8 Oct 2024 10:28:32 +0300 Subject: [PATCH 01/14] librdmacm: Add support for epoll: epoll_create, epoll_create1 This commit introduces epoll_create functionality to support a centralized thread for managing all epoll instances. The epoll_create call creates an epoll_inst struct and two epoll file descriptors: a "regular epfd" for handling real file descriptors and another epfd that includes the "regular epfd" added using epoll_ctl. The latter epfd is returned from the epoll_create function. Additionally, the new epoll instance is registered with a global thread that processes all instances in a round-robin fashion, efficiently handling events for both regular and rsocket file descriptors. The global thread manages polling in two steps for each epoll instance. First, it iterates through the list of rsocket fds in the epoll struct, polling each one to check for events. Second, it calls epoll_wait on the "regular epfd" to gather events from the real file descriptors. The thread keeps the events in the struct, and proceeds to the next epoll instance. 
Signed-off-by: Batsheva Black --- debian/librdmacm1.symbols | 2 + librdmacm/CMakeLists.txt | 2 +- librdmacm/cma.h | 1 + librdmacm/librdmacm.map | 5 + librdmacm/librspreload.map | 2 + librdmacm/man/rsocket.7.in | 5 +- librdmacm/preload.c | 43 ++++++ librdmacm/rsocket.c | 307 ++++++++++++++++++++++++++++++++++++- librdmacm/rsocket.h | 3 + 9 files changed, 367 insertions(+), 3 deletions(-) diff --git a/debian/librdmacm1.symbols b/debian/librdmacm1.symbols index 8580fb2dd..4daf6609f 100644 --- a/debian/librdmacm1.symbols +++ b/debian/librdmacm1.symbols @@ -5,10 +5,12 @@ librdmacm.so.1 librdmacm1 #MINVER# RDMACM_1.2@RDMACM_1.2 23 RDMACM_1.3@RDMACM_1.3 31 RDMACM_1.4@RDMACM_1.4 60 + RDMACM_1.5@RDMACM_1.5 61 raccept@RDMACM_1.0 1.0.16 rbind@RDMACM_1.0 1.0.16 rclose@RDMACM_1.0 1.0.16 rconnect@RDMACM_1.0 1.0.16 + repoll_create1@RDMACM_1.5 61 rdma_accept@RDMACM_1.0 1.0.15 rdma_ack_cm_event@RDMACM_1.0 1.0.15 rdma_bind_addr@RDMACM_1.0 1.0.15 diff --git a/librdmacm/CMakeLists.txt b/librdmacm/CMakeLists.txt index ea1d1550f..d245b63ec 100644 --- a/librdmacm/CMakeLists.txt +++ b/librdmacm/CMakeLists.txt @@ -11,7 +11,7 @@ publish_headers(infiniband rdma_library(rdmacm librdmacm.map # See Documentation/versioning.md - 1 1.4.${PACKAGE_VERSION} + 1 1.5.${PACKAGE_VERSION} acm.c addrinfo.c cma.c diff --git a/librdmacm/cma.h b/librdmacm/cma.h index e14c51778..92225b56e 100644 --- a/librdmacm/cma.h +++ b/librdmacm/cma.h @@ -96,6 +96,7 @@ int ucma_init(void); extern int af_ib_support; #define RAI_ROUTEONLY 0x01000000 +#define EPOLL_FLAG 0x80000000 void ucma_ib_init(void); void ucma_ib_cleanup(void); diff --git a/librdmacm/librdmacm.map b/librdmacm/librdmacm.map index 76853f2f7..66557716e 100644 --- a/librdmacm/librdmacm.map +++ b/librdmacm/librdmacm.map @@ -94,3 +94,8 @@ RDMACM_1.4 { rdma_resolve_addrinfo; rdma_write_cm_event; } RDMACM_1.3; + +RDMACM_1.5 { + global: + repoll_create1; +} RDMACM_1.4; diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index 
67ecf33b8..665dff853 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -29,5 +29,7 @@ socket; write; writev; + epoll_create; + epoll_create1; local: *; }; diff --git a/librdmacm/man/rsocket.7.in b/librdmacm/man/rsocket.7.in index 7dc479ece..0c0c758cb 100644 --- a/librdmacm/man/rsocket.7.in +++ b/librdmacm/man/rsocket.7.in @@ -1,5 +1,5 @@ .\" Licensed under the OpenIB.org BSD license (FreeBSD Variant) - See COPYING.md -.TH "RSOCKET" 7 "2019-04-16" "librdmacm" "Librdmacm Programmer's Manual" librdmacm +.TH "RSOCKET" 7 "2024-12-24" "librdmacm" "Librdmacm Programmer's Manual" librdmacm .SH NAME rsocket \- RDMA socket API .SH SYNOPSIS @@ -30,6 +30,7 @@ rpoll, rselect rgetpeername, rgetsockname .P rsetsockopt, rgetsockopt, rfcntl +repoll_create1 .P Functions take the same parameters as that used for sockets. The follow capabilities and flags are supported at this time: @@ -151,6 +152,8 @@ wake_up_interval - maximum number of milliseconds to block in poll. This value is used to safe guard against potential application hangs in rpoll(). .P +max_events - maximum number of events for the epoll thread to handle for an epoll_instance. +.P All configuration files should contain a single integer value. Values may be set by issuing a command similar to the following example. 
.P diff --git a/librdmacm/preload.c b/librdmacm/preload.c index b3175dd5d..02b00f61b 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -90,13 +90,18 @@ struct socket_calls { int (*dup2)(int oldfd, int newfd); ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); int (*fxstat)(int ver, int fd, struct stat *buf); + int (*epoll_create)(int size); + int (*epoll_create1)(int flags); + int (*epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event); }; static struct socket_calls real; static struct socket_calls rs; static struct index_map idm; +static struct index_map ep_idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t ep_mut = PTHREAD_MUTEX_INITIALIZER; static int sq_size; static int rq_size; @@ -411,6 +416,9 @@ static void init_preload(void) real.dup2 = dlsym(RTLD_NEXT, "dup2"); real.sendfile = dlsym(RTLD_NEXT, "sendfile"); real.fxstat = dlsym(RTLD_NEXT, "__fxstat"); + real.epoll_create = dlsym(RTLD_NEXT, "epoll_create"); + real.epoll_create1 = dlsym(RTLD_NEXT, "epoll_create1"); + real.epoll_ctl = dlsym(RTLD_NEXT, "epoll_ctl"); rs.socket = dlsym(RTLD_DEFAULT, "rsocket"); rs.bind = dlsym(RTLD_DEFAULT, "rbind"); @@ -1195,3 +1203,38 @@ int __fxstat(int ver, int socket, struct stat *buf) } return ret; } + +int epoll_create(int size) +{ + if (size <= 0) + return ERR(EINVAL); + + return epoll_create1(0); +} + +int epoll_create1(int flags) +{ + init_preload(); + + int epfd = real.epoll_create1(flags); + int ep_reg = real.epoll_create1(flags); + struct epoll_event event; + + if (epfd < 0 || ep_reg < 0) + return -1; + + event.events = EPOLLIN | EPOLLOUT; + event.data.fd = ep_reg; + + (void)real.epoll_ctl(epfd, EPOLL_CTL_ADD, ep_reg, &event); + pthread_mutex_lock(&ep_mut); + if (idm_set(&ep_idm, epfd, (void *)(uintptr_t)ep_reg) < 0) { + pthread_mutex_unlock(&ep_mut); + close(epfd); + close(ep_reg); + return -1; + } + + pthread_mutex_unlock(&ep_mut); + return repoll_create1(epfd); +} diff --git 
a/librdmacm/rsocket.c b/librdmacm/rsocket.c index 005bd0be8..891b88478 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -136,6 +136,7 @@ static uint32_t def_mem = (1 << 17); static uint32_t def_wmem = (1 << 17); static uint32_t polling_time = 10; static int wake_up_interval = 5000; +static int max_events = 40000; /* * Immediate data format is determined by the upper bits @@ -423,6 +424,46 @@ struct ds_udp_header { } addr; }; +struct rfd_nd { + int fd; + uint32_t events; + struct rfd_nd *next; + struct rfd_nd *prev; +}; + +struct epoll_inst { + int epfd; + struct rfd_nd *fds; + int rdy_cnt; + int fd_cnt; + int wakeup; + pthread_t thread; + pthread_spinlock_t evlock; + pthread_spinlock_t fdlock; + struct epoll_event *rev; +}; + +struct fdm_entry { + int key; + struct epoll_inst *ep; +}; + +static int wake_pipe[2]; +static int pipe_init; + +static pthread_t glep_thread; +static pthread_mutex_t glep_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t glep_cond = PTHREAD_COND_INITIALIZER; +static struct index_map evidm; + +struct eplist { + struct epoll_inst **ep; + int cnt; + int cap; + int idx; +}; +static struct eplist glep_list = {NULL, 0, 0, 0}; + #define DS_UDP_IPV4_HDR_LEN 16 #define DS_UDP_IPV6_HDR_LEN 28 @@ -584,6 +625,13 @@ static void rs_configure(void) failable_fscanf(f, "%d", &wake_up_interval); fclose(f); } + + f = fopen(RS_CONF_DIR "/max_events", "r"); + if (f) { + failable_fscanf(f, "%d", &max_events); + fclose(f); + } + if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) { failable_fscanf(f, "%hu", &def_inline); fclose(f); @@ -4565,7 +4613,7 @@ static void tcp_svc_send_keepalive(struct rsocket *rs) 0, (uintptr_t) NULL, (uintptr_t) NULL); } fastlock_release(&rs->cq_lock); -} +} static void *tcp_svc_run(void *arg) { @@ -4697,3 +4745,260 @@ static void *cm_svc_run(void *arg) return NULL; } + +static uint32_t epoll_rs(int fd, uint32_t events); + +uint32_t epoll_rs(int fd, uint32_t events) +{ + struct pollfd fds; + uint32_t revents = 0; + 
int ret; + struct rsocket *rs = idm_lookup(&idm, fd); + +check_cq: + if ((rs->type & SOCK_STREAM) && ((rs->state & rs_connected) || + (rs->state == rs_disconnected) || (rs->state & rs_error))) { + rs_process_cq(rs, 1, rs_poll_all); + + if ((events & EPOLLIN) && rs_conn_have_rdata(rs)) + revents |= EPOLLIN; + if ((events & EPOLLOUT) && rs_can_send(rs)) + revents |= EPOLLOUT; + if (!(rs->state & rs_connected)) { + if (rs->state == rs_disconnected) + revents |= EPOLLHUP; + else + revents |= EPOLLERR; + } + + return revents; + } else if (rs->type & SOCK_DGRAM) { + ds_process_cqs(rs, 1, rs_poll_all); + + if ((events & EPOLLIN) && rs_have_rdata(rs)) + revents |= EPOLLIN; + if ((events & EPOLLOUT) && ds_can_send(rs)) + revents |= EPOLLOUT; + + return revents; + } + + if (rs->state == rs_listening) { + fds.fd = rs->accept_queue[0]; + fds.revents = 0; + if (events & EPOLLIN) + fds.events |= POLLIN; + if (events & EPOLLOUT) + fds.events |= POLLOUT; + if (events & EPOLLERR) + fds.events |= POLLERR; + if (events & EPOLLHUP) + fds.events |= POLLHUP; + + + poll(&fds, 1, 0); + if (fds.revents & POLLIN) + revents |= EPOLLIN; + if (fds.revents & POLLOUT) + revents |= EPOLLOUT; + if (fds.revents & POLLERR) + revents |= EPOLLERR; + if (fds.revents & POLLHUP) + revents |= EPOLLHUP; + + return revents; + } + + if (rs->state & rs_opening) { + ret = rs_do_connect(rs); + if (ret && (errno == EINPROGRESS)) + errno = 0; + else + goto check_cq; + } + + if (rs->state == rs_connect_error) { + if (events & EPOLLOUT) + revents |= EPOLLOUT; + if (events & EPOLLIN) + revents |= EPOLLIN; + revents |= EPOLLERR; + return revents; + } + + return 0; +} + +static void get_pipe(void) +{ + uint64_t u = 1; + + if (!pipe_init) { + if (pipe(wake_pipe) != 0) + return; + if (write(wake_pipe[1], &u, sizeof(u)) != (ssize_t)sizeof(u)) + return; + pipe_init = 1; + } + +} + +static void update_wakeup(struct epoll_inst *instance, int count) +{ + struct epoll_event eve; + + eve.events = EPOLLIN | EPOLLET; + 
eve.data.fd = wake_pipe[0]; + + if (count && !instance->wakeup) { + (void)epoll_ctl(instance->epfd, EPOLL_CTL_ADD | EPOLL_FLAG, wake_pipe[0], &eve); + instance->wakeup = 1; + } else if (!count && instance->wakeup) { + (void)epoll_ctl(instance->epfd, EPOLL_CTL_DEL | EPOLL_FLAG, wake_pipe[0], NULL); + instance->wakeup = 0; + } + +} + +static int add_epins(int fd, struct epoll_inst *instance) +{ + struct fdm_entry *entry = malloc(sizeof(struct fdm_entry)); + + if (!entry) + return ERR(ENOMEM); + + entry->key = fd; + entry->ep = instance; + idm_set(&evidm, fd, entry); + + return fd; +} + +static void *epoll_thread(void *arg) +{ + struct eplist *list = &glep_list; + struct epoll_event *temp_revents2; + struct rfd_nd *curr; + struct epoll_inst *instance; + int count, revent; + + temp_revents2 = malloc(max_events * sizeof(struct epoll_event)); + if (!temp_revents2) + return NULL; + + while (1) { + pthread_mutex_lock(&glep_lock); + while (list->cnt == 0 || list->idx >= list->cnt) + pthread_cond_wait(&glep_cond, &glep_lock); + + instance = list->ep[list->idx]; + list->idx = (list->idx + 1) % list->cnt; + pthread_mutex_unlock(&glep_lock); + + memset(temp_revents2, 0, max_events * sizeof(struct epoll_event)); + + count = 0; + pthread_spin_lock(&instance->fdlock); + curr = instance->fds; + while (curr != NULL) { + if (count >= max_events) + break; + + revent = epoll_rs(curr->fd, curr->events); + if (revent) { + temp_revents2[count].data.fd = curr->fd; + temp_revents2[count].events = revent; + count++; + } + curr = curr->next; + } + + pthread_spin_unlock(&instance->fdlock); + + pthread_spin_lock(&instance->evlock); + memset(instance->rev, 0, max_events * sizeof(struct epoll_event)); + memcpy(instance->rev, temp_revents2, count * sizeof(struct epoll_event)); + + instance->rdy_cnt = count; + update_wakeup(instance, count); + + pthread_spin_unlock(&instance->evlock); + + } + + return NULL; +} + +static int mng_epfd_thread(struct epoll_inst *instance) +{ + struct epoll_inst 
**new_instances; + int ret, new_capacity; + + pthread_mutex_lock(&glep_lock); + + // Check if we need to expand the global instance list + if (glep_list.cnt == glep_list.cap) { + new_capacity = glep_list.cap ? glep_list.cap * 2 : 4; + new_instances = realloc(glep_list.ep, + new_capacity * sizeof(struct epoll_inst *)); + if (!new_instances) { + pthread_mutex_unlock(&glep_lock); + return ERR(ENOMEM); + } + + glep_list.ep = new_instances; + glep_list.cap = new_capacity; + } + + // Add the new instance to the global instance list + glep_list.ep[glep_list.cnt++] = instance; + + // Create the global epoll thread if it doesn't exist + if (glep_list.cnt > 0 && !glep_thread) { + ret = pthread_create(&glep_thread, NULL, epoll_thread, NULL); + if (ret) { + pthread_mutex_unlock(&glep_lock); + // TODO: Handle the thread creation error properly + return ERR(EAGAIN); + } + } + + // Signal the global thread to start or resume processing + glep_list.idx = 0; // Start from the beginning + pthread_cond_signal(&glep_cond); + pthread_mutex_unlock(&glep_lock); + + return 0; +} + +int repoll_create1(int flags) +{ + get_pipe(); + int ret, epfd = flags; + struct epoll_inst *instance; + + instance = malloc(sizeof(struct epoll_inst)); + if (!instance) + goto error; + + instance->epfd = epfd; + instance->fds = NULL; + instance->rdy_cnt = 0; + instance->fd_cnt = 0; + instance->wakeup = 0; + instance->rev = (struct epoll_event *)malloc(max_events * sizeof(struct epoll_event)); + + pthread_spin_init(&instance->evlock, PTHREAD_PROCESS_PRIVATE); + pthread_spin_init(&instance->fdlock, PTHREAD_PROCESS_PRIVATE); + + ret = mng_epfd_thread(instance); + if (ret) + goto free_instance; + + return add_epins(epfd, instance); + +free_instance: + free(instance); +error: + return ERR(ENOMEM); +} diff --git a/librdmacm/rsocket.h b/librdmacm/rsocket.h index efd0db58b..94a22f230 100644 --- a/librdmacm/rsocket.h +++ b/librdmacm/rsocket.h @@ -40,6 +40,7 @@ #include #include #include +#include #ifdef __cplusplus 
extern "C" { @@ -70,6 +71,8 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout); int rselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); +int repoll_create1(int flags); + int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen); int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen); From d562932e3733cda89851d2a2d0091bdcd9f6d6ad Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Tue, 8 Oct 2024 10:59:52 +0300 Subject: [PATCH 02/14] librdmacm: Add support for epoll: epoll_ctl This commit implements epoll_ctl with tailored handling for real and rsocket file descriptors. For regular file descriptors, epoll_ctl directly operates on the "regular epfd". rsocket file descriptors are added to a dedicated list maintained in the epoll instance struct. This list ensures that the global thread can handle these file descriptors during its polling cycle. epoll_ctl triggers the thread to reprocess the epoll instance to update the ready list, reflecting any events on the newly added file descriptors. 
Signed-off-by: Batsheva Black --- debian/librdmacm1.symbols | 1 + librdmacm/librdmacm.map | 1 + librdmacm/librspreload.map | 1 + librdmacm/man/rsocket.7.in | 2 +- librdmacm/preload.c | 38 +++++++++++++ librdmacm/rsocket.c | 107 +++++++++++++++++++++++++++++++++++++ librdmacm/rsocket.h | 1 + 7 files changed, 150 insertions(+), 1 deletion(-) diff --git a/debian/librdmacm1.symbols b/debian/librdmacm1.symbols index 4daf6609f..cd5ac0556 100644 --- a/debian/librdmacm1.symbols +++ b/debian/librdmacm1.symbols @@ -11,6 +11,7 @@ librdmacm.so.1 librdmacm1 #MINVER# rclose@RDMACM_1.0 1.0.16 rconnect@RDMACM_1.0 1.0.16 repoll_create1@RDMACM_1.5 61 + repoll_ctl@RDMACM_1.5 61 rdma_accept@RDMACM_1.0 1.0.15 rdma_ack_cm_event@RDMACM_1.0 1.0.15 rdma_bind_addr@RDMACM_1.0 1.0.15 diff --git a/librdmacm/librdmacm.map b/librdmacm/librdmacm.map index 66557716e..f4a71f1ea 100644 --- a/librdmacm/librdmacm.map +++ b/librdmacm/librdmacm.map @@ -98,4 +98,5 @@ RDMACM_1.4 { RDMACM_1.5 { global: repoll_create1; + repoll_ctl; } RDMACM_1.4; diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index 665dff853..31ae7b7ba 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -31,5 +31,6 @@ writev; epoll_create; epoll_create1; + epoll_ctl; local: *; }; diff --git a/librdmacm/man/rsocket.7.in b/librdmacm/man/rsocket.7.in index 0c0c758cb..0c7aeadd5 100644 --- a/librdmacm/man/rsocket.7.in +++ b/librdmacm/man/rsocket.7.in @@ -30,7 +30,7 @@ rpoll, rselect rgetpeername, rgetsockname .P rsetsockopt, rgetsockopt, rfcntl -repoll_create1 +repoll_create1, repoll_ctl .P Functions take the same parameters as that used for sockets. 
The follow capabilities and flags are supported at this time: diff --git a/librdmacm/preload.c b/librdmacm/preload.c index 02b00f61b..3257f0189 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -99,9 +99,11 @@ static struct socket_calls real; static struct socket_calls rs; static struct index_map idm; +static struct index_map ridm; static struct index_map ep_idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t ep_mut = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t rs_mut = PTHREAD_MUTEX_INITIALIZER; static int sq_size; static int rq_size; @@ -1238,3 +1240,39 @@ int epoll_create1(int flags) pthread_mutex_unlock(&ep_mut); return repoll_create1(epfd); } + +int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) +{ + int ep_reg, rfd; + + if (fd < 0) + return ERR(EBADF); + if (op & EPOLL_FLAG) + return real.epoll_ctl(epfd, op & ~EPOLL_FLAG, fd, event); + + if ((fd == epfd) || (event->events & EPOLLET)) + return ERR(EINVAL); + + if (!(fd_gett(fd) == fd_rsocket)) { + pthread_mutex_lock(&ep_mut); + ep_reg = (uintptr_t)idm_at(&ep_idm, epfd); + pthread_mutex_unlock(&ep_mut); + + if (!ep_reg) + return ERR(ENOENT); + return real.epoll_ctl(ep_reg, op, fd, event); + } + + rfd = fd_getd(fd); + if (op == EPOLL_CTL_ADD) { + pthread_mutex_lock(&rs_mut); + if (idm_set(&ridm, rfd, (void *)(uintptr_t)fd) < 0) { + pthread_mutex_unlock(&rs_mut); + return ERR(EFAULT); + } + + pthread_mutex_unlock(&rs_mut); + } + + return repoll_ctl(epfd, op, rfd, event); +} diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index 891b88478..8b9b1a4c3 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -4874,6 +4874,13 @@ static int add_epins(int fd, struct epoll_inst *instance) return fd; } +static struct epoll_inst *get_epins(int fd) +{ + struct fdm_entry *entry = idm_lookup(&evidm, fd); + + return entry ? 
entry->ep : NULL; +} + static void *epoll_thread(void *arg) { struct eplist *list = &glep_list; @@ -4971,6 +4978,69 @@ static int mng_epfd_thread(struct epoll_inst *instance) return 0; } +static int epoll_add(struct epoll_inst *instance, int fd, struct epoll_event *event) +{ + struct rfd_nd *node = (struct rfd_nd *)malloc(sizeof(struct rfd_nd)); + + if (!node) + return ERR(ENOMEM); + + node->fd = fd; + node->events = event->events; + node->prev = NULL; + + pthread_spin_lock(&instance->fdlock); + + node->next = instance->fds; + if (instance->fds) + instance->fds->prev = node; + + instance->fds = node; + instance->fd_cnt++; + + pthread_spin_unlock(&instance->fdlock); + + return 0; +} + +static void epoll_mod(struct epoll_inst *instance, struct epoll_event *event, struct rfd_nd *node) +{ + pthread_spin_lock(&instance->fdlock); + node->events = event->events; + pthread_spin_unlock(&instance->fdlock); +} + +static void epoll_del(struct epoll_inst *instance, struct rfd_nd *node) +{ + pthread_spin_lock(&instance->fdlock); + instance->fd_cnt--; + if (!node->prev) + instance->fds = node->next; + else + node->prev->next = node->next; + + if (node->next) + node->next->prev = node->prev; + + pthread_spin_unlock(&instance->fdlock); + + free(node); +} + +static struct rfd_nd *find_rfd(struct epoll_inst *instance, int fd) +{ + struct rfd_nd *curr = instance->fds; + + while (curr != NULL) { + if (curr->fd == fd) + return curr; + + curr = curr->next; + } + + return NULL; +} + int repoll_create1(int flags) { get_pipe(); @@ -5002,3 +5072,40 @@ int repoll_create1(int flags) error: return ERR(ENOMEM); } + +int repoll_ctl(int epfd, int op, int fd, struct epoll_event *event) +{ + struct epoll_inst *instance = get_epins(epfd); + + if (!instance) + return ERR(EBADF); + + struct rfd_nd *nd_fd = find_rfd(instance, fd); + + switch (op) { + case EPOLL_CTL_ADD: + if (nd_fd) + return ERR(EEXIST); + + return epoll_add(instance, fd, event); + + case EPOLL_CTL_MOD: + if (!nd_fd) + return 
ERR(ENOENT); + + epoll_mod(instance, event, nd_fd); + break; + + case EPOLL_CTL_DEL: + if (!nd_fd) + return ERR(ENOENT); + + epoll_del(instance, nd_fd); + break; + + default: + return ERR(EINVAL); + } + + return 0; +} diff --git a/librdmacm/rsocket.h b/librdmacm/rsocket.h index 94a22f230..09abb53d5 100644 --- a/librdmacm/rsocket.h +++ b/librdmacm/rsocket.h @@ -72,6 +72,7 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); int repoll_create1(int flags); +int repoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen); int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen); From 2fc13a984c5205ce08652fe5da8297505b72c08d Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Tue, 8 Oct 2024 11:22:05 +0300 Subject: [PATCH 03/14] librdmacm: Add support for epoll: epoll_wait This commit implements epoll_wait to retrieve events processed by the centralized thread for an epoll instance. When epoll_wait is called, it copies the events collected by the global thread from the ready list in the epoll instance to the user-provided events buffer. If no events are available in the `revents` field, the function triggers the thread to recheck for events. epoll_wait returns the total number of ready events. 
Signed-off-by: Batsheva Black --- debian/librdmacm1.symbols | 1 + librdmacm/librdmacm.map | 1 + librdmacm/librspreload.map | 1 + librdmacm/man/rsocket.7.in | 2 +- librdmacm/preload.c | 59 ++++++++++++++++++++++++++++++++++++++ librdmacm/rsocket.c | 16 +++++++++++ librdmacm/rsocket.h | 1 + 7 files changed, 80 insertions(+), 1 deletion(-) diff --git a/debian/librdmacm1.symbols b/debian/librdmacm1.symbols index cd5ac0556..34cd16268 100644 --- a/debian/librdmacm1.symbols +++ b/debian/librdmacm1.symbols @@ -12,6 +12,7 @@ librdmacm.so.1 librdmacm1 #MINVER# rconnect@RDMACM_1.0 1.0.16 repoll_create1@RDMACM_1.5 61 repoll_ctl@RDMACM_1.5 61 + repoll_wait@RDMACM_1.5 61 rdma_accept@RDMACM_1.0 1.0.15 rdma_ack_cm_event@RDMACM_1.0 1.0.15 rdma_bind_addr@RDMACM_1.0 1.0.15 diff --git a/librdmacm/librdmacm.map b/librdmacm/librdmacm.map index f4a71f1ea..b374a49e1 100644 --- a/librdmacm/librdmacm.map +++ b/librdmacm/librdmacm.map @@ -99,4 +99,5 @@ RDMACM_1.5 { global: repoll_create1; repoll_ctl; + repoll_wait; } RDMACM_1.4; diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index 31ae7b7ba..7ce0c0515 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -32,5 +32,6 @@ epoll_create; epoll_create1; epoll_ctl; + epoll_wait; local: *; }; diff --git a/librdmacm/man/rsocket.7.in b/librdmacm/man/rsocket.7.in index 0c7aeadd5..802d44069 100644 --- a/librdmacm/man/rsocket.7.in +++ b/librdmacm/man/rsocket.7.in @@ -30,7 +30,7 @@ rpoll, rselect rgetpeername, rgetsockname .P rsetsockopt, rgetsockopt, rfcntl -repoll_create1, repoll_ctl +repoll_create1, repoll_ctl, repoll_wait .P Functions take the same parameters as that used for sockets. 
The follow capabilities and flags are supported at this time: diff --git a/librdmacm/preload.c b/librdmacm/preload.c index 3257f0189..ea4556626 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -93,6 +93,7 @@ struct socket_calls { int (*epoll_create)(int size); int (*epoll_create1)(int flags); int (*epoll_ctl)(int epfd, int op, int fd, struct epoll_event *event); + int (*epoll_wait)(int epfd, struct epoll_event *events, int maxevents, int timeout); }; static struct socket_calls real; @@ -421,6 +422,7 @@ static void init_preload(void) real.epoll_create = dlsym(RTLD_NEXT, "epoll_create"); real.epoll_create1 = dlsym(RTLD_NEXT, "epoll_create1"); real.epoll_ctl = dlsym(RTLD_NEXT, "epoll_ctl"); + real.epoll_wait = dlsym(RTLD_NEXT, "epoll_wait"); rs.socket = dlsym(RTLD_DEFAULT, "rsocket"); rs.bind = dlsym(RTLD_DEFAULT, "rbind"); @@ -1276,3 +1278,60 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) return repoll_ctl(epfd, op, rfd, event); } + +int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) +{ + struct epoll_event ev; + int ret, reg_count = 0, count = 0; + int ep_reg = (uintptr_t)idm_at(&ep_idm, epfd); + int fd; + + // Defensive check: events must be valid when maxevents > 0 + { + struct epoll_event *evp = events; + + if (maxevents > 0 && !evp) + return ERR(EFAULT); + } + + //no events requested + if (maxevents <= 0) + return ERR(EINVAL); + + count = repoll_wait(epfd, events, maxevents, 0); + for (int i = 0; i < count; i++) { + fd = (uintptr_t)idm_lookup(&ridm, events[i].data.fd); + if (!fd) + return -1; + events[i].data.fd = fd; + } + + if ((count < maxevents) && (ep_reg >= 0)) { + reg_count = real.epoll_wait(ep_reg, events + count, maxevents-count, 0); + if (reg_count < 0) + return reg_count; + } + + if (count + reg_count) + return count + reg_count; + + ret = real.epoll_wait(epfd, &ev, 1, timeout); + if (!ret) + return ret; + + count = repoll_wait(epfd, events, maxevents, 0); + for (int j = 0; j < count; 
j++) { + fd = (uintptr_t)idm_lookup(&ridm, events[j].data.fd); + if (!fd) + return -1; + events[j].data.fd = fd; + } + + if ((count < maxevents) && (ep_reg >= 0)) { + reg_count = real.epoll_wait(ep_reg, events + count, maxevents-count, 0); + if (reg_count < 0) + return reg_count; + } + + return count + reg_count; +} diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index 8b9b1a4c3..a3a2a254c 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -5109,3 +5109,19 @@ int repoll_ctl(int epfd, int op, int fd, struct epoll_event *event) return 0; } + +int repoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) +{ + struct epoll_inst *instance = get_epins(epfd); + + pthread_spin_lock(&instance->evlock); + + int count = (instance->rdy_cnt > maxevents) ? maxevents : instance->rdy_cnt; + + for (int i = 0; i < count; i++) + events[i] = instance->rev[i]; + + pthread_spin_unlock(&instance->evlock); + + return count; +} diff --git a/librdmacm/rsocket.h b/librdmacm/rsocket.h index 09abb53d5..ac8cad517 100644 --- a/librdmacm/rsocket.h +++ b/librdmacm/rsocket.h @@ -73,6 +73,7 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds, int repoll_create1(int flags); int repoll_ctl(int epfd, int op, int fd, struct epoll_event *event); +int repoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen); int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen); From 293e6bf26597dd3b2e22411b0bd80a8e36be0c36 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Thu, 14 Nov 2024 21:00:09 +0200 Subject: [PATCH 04/14] librdmacm: Fix rpoll in case of timeout When a timeout causes poll to return, call rs_poll_exit to clear all signals that have arrived. 
Signed-off-by: Batsheva Black --- librdmacm/rsocket.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index a3a2a254c..d5a7cbd9d 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -3415,8 +3415,10 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) if (timeout >= 0) { timeout -= (int) ((rs_time_us() - start_time) / 1000); - if (timeout <= 0) + if (timeout <= 0) { + rs_poll_exit(); return 0; + } pollsleep = min(timeout, wake_up_interval); } else { pollsleep = wake_up_interval; From 1dbe739a6e076bd5583d8bae14602da0a3399bc4 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 6 Oct 2024 16:01:30 +0300 Subject: [PATCH 05/14] librdmacm: Fix select function Keep the list of the fds that are sent to poll in order to know which fd belongs to each rfd when returning the revents to the fds list. Signed-off-by: Batsheva Black --- librdmacm/preload.c | 47 +++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/librdmacm/preload.c b/librdmacm/preload.c index ea4556626..975ab58bf 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -901,6 +901,22 @@ static struct pollfd *fds_alloc(nfds_t nfds) return rfds; } +static int *fds_r_alloc(nfds_t nfds) +{ + static __thread int *rfds_r; + static __thread nfds_t rnfds; + + if (nfds > rnfds) { + if (rfds_r) + free(rfds_r); + + rfds_r = malloc(sizeof(*rfds_r) * nfds); + rnfds = rfds_r ? 
nfds : 0; + } + + return rfds_r; +} + int poll(struct pollfd *fds, nfds_t nfds, int timeout) { struct pollfd *rfds; @@ -934,7 +950,7 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout) } static void select_to_rpoll(struct pollfd *fds, int *nfds, - fd_set *readfds, fd_set *writefds, fd_set *exceptfds) + fd_set *readfds, fd_set *writefds, fd_set *exceptfds, int *fds_r) { int fd, events, i = 0; @@ -945,7 +961,8 @@ static void select_to_rpoll(struct pollfd *fds, int *nfds, if (events || (exceptfds && FD_ISSET(fd, exceptfds))) { fds[i].fd = fd_getd(fd); - fds[i++].events = events; + fds[i].events = events; + fds_r[i++] = fd; } } @@ -953,30 +970,27 @@ static void select_to_rpoll(struct pollfd *fds, int *nfds, } static int rpoll_to_select(struct pollfd *fds, int nfds, - fd_set *readfds, fd_set *writefds, fd_set *exceptfds) + fd_set *readfds, fd_set *writefds, fd_set *exceptfds, int *fds_r) { - int fd, rfd, i, cnt = 0; + int i, cnt = 0; - for (i = 0, fd = 0; i < nfds; fd++) { - rfd = fd_getd(fd); - if (rfd != fds[i].fd) - continue; + for (i = 0; i < nfds; i++) { + assert(fds[i].fd == fd_getd(fds_r[i])); if (readfds && (fds[i].revents & POLLIN)) { - FD_SET(fd, readfds); + FD_SET(fds_r[i], readfds); cnt++; } if (writefds && (fds[i].revents & POLLOUT)) { - FD_SET(fd, writefds); + FD_SET(fds_r[i], writefds); cnt++; } if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) { - FD_SET(fd, exceptfds); + FD_SET(fds_r[i], exceptfds); cnt++; } - i++; } return cnt; @@ -992,12 +1006,17 @@ int select(int nfds, fd_set *readfds, fd_set *writefds, { struct pollfd *fds; int ret; + int *fds_r; fds = fds_alloc(nfds); if (!fds) return ERR(ENOMEM); - select_to_rpoll(fds, &nfds, readfds, writefds, exceptfds); + fds_r = fds_r_alloc(nfds); + if (!fds_r) + return ERR(ENOMEM); + + select_to_rpoll(fds, &nfds, readfds, writefds, exceptfds, fds_r); ret = rpoll(fds, nfds, rs_convert_timeout(timeout)); if (readfds) @@ -1008,7 +1027,7 @@ int select(int nfds, fd_set *readfds, fd_set *writefds, 
FD_ZERO(exceptfds); if (ret > 0) - ret = rpoll_to_select(fds, nfds, readfds, writefds, exceptfds); + ret = rpoll_to_select(fds, nfds, readfds, writefds, exceptfds, fds_r); return ret; } From ed5e1e0b0fa34c95308db34a74e2943740490c64 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Mon, 23 Dec 2024 13:34:09 +0200 Subject: [PATCH 06/14] librdmacm: Add support for accept4 function The accept4 implementation extends accept to support the additional atomic flag-setting functionality provided by accept4. Signed-off-by: Batsheva Black --- librdmacm/librspreload.map | 1 + librdmacm/preload.c | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index 7ce0c0515..0025fbdb2 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -4,6 +4,7 @@ the signature this will go sideways.. */ global: accept; + accept4; bind; close; connect; diff --git a/librdmacm/preload.c b/librdmacm/preload.c index 975ab58bf..bb523a2ca 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -64,6 +64,7 @@ struct socket_calls { int (*bind)(int socket, const struct sockaddr *addr, socklen_t addrlen); int (*listen)(int socket, int backlog); int (*accept)(int socket, struct sockaddr *addr, socklen_t *addrlen); + int (*accept4)(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags); int (*connect)(int socket, const struct sockaddr *addr, socklen_t addrlen); ssize_t (*recv)(int socket, void *buf, size_t len, int flags); ssize_t (*recvfrom)(int socket, void *buf, size_t len, int flags, @@ -397,6 +398,7 @@ static void init_preload(void) real.bind = dlsym(RTLD_NEXT, "bind"); real.listen = dlsym(RTLD_NEXT, "listen"); real.accept = dlsym(RTLD_NEXT, "accept"); + real.accept4 = dlsym(RTLD_NEXT, "accept4"); real.connect = dlsym(RTLD_NEXT, "connect"); real.recv = dlsym(RTLD_NEXT, "recv"); real.recvfrom = dlsym(RTLD_NEXT, "recvfrom"); @@ -632,6 +634,38 @@ int accept(int socket, struct 
sockaddr *addr, socklen_t *addrlen) } } +int accept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags) +{ + int cur_flags = 0; + int fd = accept(socket, addr, addrlen); + + if (fd < 0) + return fd; + if (flags & SOCK_NONBLOCK) { + cur_flags = fcntl(fd, F_GETFL); + + if (cur_flags != -1) + cur_flags = fcntl(fd, F_SETFL, cur_flags | O_NONBLOCK); + + if (cur_flags == -1) + goto close; + } + + if (flags & SOCK_CLOEXEC) { + cur_flags = fcntl(fd, F_GETFD); + if (cur_flags != -1) + cur_flags = fcntl(fd, F_SETFD, cur_flags | FD_CLOEXEC); + if (cur_flags == -1) + goto close; + } + + return fd; +close: + close(fd); + return -1; +} + + /* * We can't fork RDMA connections and pass them from the parent to the child * process. Instead, we need to establish the RDMA connection after calling From c03e57b5fed83a627f50df626a6156b48daa2b82 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 6 Oct 2024 15:42:16 +0300 Subject: [PATCH 07/14] librdmacm: Add support for fcntl64 Add preload interception for fcntl64 so rsocket file descriptors support the same flag semantics as the glibc fcntl64 API. Signed-off-by: Batsheva Black --- librdmacm/CMakeLists.txt | 42 ++++++++++++++++++++++++++ librdmacm/librspreload.map | 1 + librdmacm/preload.c | 60 +++++++++++++++++++++++++++++++++++++- 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/librdmacm/CMakeLists.txt b/librdmacm/CMakeLists.txt index d245b63ec..d0f77a79c 100644 --- a/librdmacm/CMakeLists.txt +++ b/librdmacm/CMakeLists.txt @@ -25,12 +25,54 @@ target_link_libraries(rdmacm LINK_PRIVATE ${RT_LIBRARIES} ) +# When building without LFS (_FILE_OFFSET_BITS != 64), we may need to wrap +# fcntl64.
Only do so if the system libc actually provides them +set(CMAKE_REQUIRED_QUIET 1) +set(SAVE_DEFS "${CMAKE_REQUIRED_DEFINITIONS}") +set(CMAKE_REQUIRED_DEFINITIONS "") +check_c_source_compiles(" +#include +int main(void) { + (void)&fcntl64; + return 0; +} +" RDMA_PRELOAD_HAVE_64) +set(CMAKE_REQUIRED_DEFINITIONS "${SAVE_DEFS}") +set(CMAKE_REQUIRED_QUIET 0) +if(RDMA_PRELOAD_HAVE_64) + set(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS 1) +else() + set(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS 0) +endif() + +# Detect at configure time if fcntl64 is declared so we can +# add our forward declarations only when needed. +set(RDMA_PRELOAD_FCNTL64_IN_HEADER 0) +if(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS) + set(CMAKE_REQUIRED_QUIET 1) + set(SAVE_FLAGS "${CMAKE_REQUIRED_FLAGS}") + set(CMAKE_REQUIRED_FLAGS "${CMAKE_C_FLAGS}") + check_c_source_compiles(" +#define _GNU_SOURCE +#include +int fcntl64(int socket, int cmd, ...) { (void)socket;(void)cmd; return 0; } +" RDMA_PRELOAD_FCNTL64_DECLARED_IN_HEADER) + if(RDMA_PRELOAD_FCNTL64_DECLARED_IN_HEADER) + set(RDMA_PRELOAD_FCNTL64_IN_HEADER 1) + endif() + set(CMAKE_REQUIRED_FLAGS "${SAVE_FLAGS}") + set(CMAKE_REQUIRED_QUIET 0) +endif() + # The preload library is a bit special, it needs to be open coded # Since it is a LD_PRELOAD it has no soname, and is installed in sub dir add_library(rspreload MODULE preload.c indexer.c ) +target_compile_definitions(rspreload PRIVATE + RDMA_PRELOAD_HAVE_64=${RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS} + RDMA_PRELOAD_FCNTL64_IN_HEADER=${RDMA_PRELOAD_FCNTL64_IN_HEADER}) # Even though this is a module we still want to use Wl,--no-undefined set_target_properties(rspreload PROPERTIES LINK_FLAGS ${CMAKE_SHARED_LINKER_FLAGS}) set_target_properties(rspreload PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${BUILD_LIB}") diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index 0025fbdb2..a82820b67 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -10,6 +10,7 @@ connect; dup2; fcntl; + fcntl64; getpeername; 
getsockname; getsockopt; diff --git a/librdmacm/preload.c b/librdmacm/preload.c index bb523a2ca..cc98b11f1 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -59,6 +59,14 @@ #include "cma.h" #include "indexer.h" +//Whether to compile fcntl64/sendfile64 wrappers. Need both: +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) +#if !RDMA_PRELOAD_FCNTL64_IN_HEADER +int fcntl64(int socket, int cmd, ... /* arg */); +#endif +#endif + struct socket_calls { int (*socket)(int domain, int type, int protocol); int (*bind)(int socket, const struct sockaddr *addr, socklen_t addrlen); @@ -88,6 +96,10 @@ struct socket_calls { int (*getsockopt)(int socket, int level, int optname, void *optval, socklen_t *optlen); int (*fcntl)(int socket, int cmd, ... /* arg */); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ + (!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + int (*fcntl64)(int socket, int cmd, ... /* arg */); +#endif int (*dup2)(int oldfd, int newfd); ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); int (*fxstat)(int ver, int fd, struct stat *buf); @@ -418,6 +430,10 @@ static void init_preload(void) real.setsockopt = dlsym(RTLD_NEXT, "setsockopt"); real.getsockopt = dlsym(RTLD_NEXT, "getsockopt"); real.fcntl = dlsym(RTLD_NEXT, "fcntl"); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + real.fcntl64 = dlsym(RTLD_NEXT, "fcntl64"); +#endif real.dup2 = dlsym(RTLD_NEXT, "dup2"); real.sendfile = dlsym(RTLD_NEXT, "sendfile"); real.fxstat = dlsym(RTLD_NEXT, "__fxstat"); @@ -658,7 +674,6 @@ int accept4(int socket, struct sockaddr *addr, socklen_t *addrlen, int flags) if (cur_flags == -1) goto close; } - return fd; close: close(fd); @@ -1174,6 +1189,49 @@ int fcntl(int socket, int cmd, ... 
/* arg */) return ret; } +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) +int fcntl64(int socket, int cmd, ... /* arg */) +{ + va_list args; + long lparam; + void *pparam; + int fd, ret; + + init_preload(); + va_start(args, cmd); + switch (cmd) { + case F_GETFD: + case F_GETFL: + case F_GETOWN: + case F_GETSIG: + case F_GETLEASE: + ret = (fd_get(socket, &fd) == fd_rsocket) ? + rfcntl(fd, cmd) : real.fcntl64(fd, cmd); + break; + case F_DUPFD: + /*case F_DUPFD_CLOEXEC:*/ + case F_SETFD: + case F_SETFL: + case F_SETOWN: + case F_SETSIG: + case F_SETLEASE: + case F_NOTIFY: + lparam = va_arg(args, long); + ret = (fd_get(socket, &fd) == fd_rsocket) ? + rfcntl(fd, cmd, lparam) : real.fcntl64(fd, cmd, lparam); + break; + default: + pparam = va_arg(args, void *); + ret = (fd_get(socket, &fd) == fd_rsocket) ? + rfcntl(fd, cmd, pparam) : real.fcntl64(fd, cmd, pparam); + break; + } + va_end(args); + return ret; +} +#endif + /* * dup2 is not thread safe */ From c28c80d0bc263d07ecbd7f5892d07e0dd5ca0ee1 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 6 Oct 2024 17:01:46 +0300 Subject: [PATCH 08/14] librdmacm: Add support to more optnames in getsockopt, setsockopt getsockopt: TCP_INFO, TCP_CONGESTION, SO_BROADCAST & IP_TOS. setsockopt: IP_TOS & TCP_CONGESTION. 
Signed-off-by: Batsheva Black --- librdmacm/rsocket.c | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index d5a7cbd9d..5a048f8cb 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -378,6 +378,7 @@ struct rsocket { int opts; int fd_flags; + int ipv4_opts; uint64_t so_opts; uint64_t ipv6_opts; void *optval; @@ -3760,6 +3761,18 @@ int rsetsockopt(int socket, int level, int optname, break; } break; + case IPPROTO_IP: + switch (optname) { + case IP_TOS: + rs->ipv4_opts = *(int *)optval; + ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, + RDMA_OPTION_ID_TOS, + (void *) optval, optlen); + break; + default: + break; + } + break; case IPPROTO_TCP: opts = &rs->tcp_opts; switch (optname) { @@ -3781,6 +3794,7 @@ int rsetsockopt(int socket, int level, int optname, ret = 0; break; case TCP_MAXSEG: + case TCP_CONGESTION: ret = 0; break; default: @@ -3884,6 +3898,7 @@ int rgetsockopt(int socket, int level, int optname, struct rsocket *rs; void *opt; struct ibv_sa_path_rec *path_rec; + struct tcp_info *info; struct ibv_path_data path_data; socklen_t len; int ret = 0; @@ -3921,6 +3936,21 @@ int rgetsockopt(int socket, int level, int optname, *optlen = sizeof(int); rs->err = 0; break; + case SO_BROADCAST: + ret = 0; + break; + default: + ret = ENOTSUP; + break; + } + break; + case IPPROTO_IP: + switch (optname) { + case IP_TOS: + *((int *) optval) = rs->ipv4_opts; + *optlen = sizeof(int); + break; + default: ret = ENOTSUP; break; @@ -3928,6 +3958,7 @@ int rgetsockopt(int socket, int level, int optname, break; case IPPROTO_TCP: switch (optname) { + case TCP_CONGESTION: case TCP_KEEPCNT: case TCP_KEEPINTVL: *((int *) optval) = 1; /* N/A */ @@ -3946,6 +3977,17 @@ int rgetsockopt(int socket, int level, int optname, 2048; *optlen = sizeof(int); break; + case TCP_INFO: + //TODO: support other tcp_info fields. 
+ info = (struct tcp_info *) optval; + memset(info, 0, sizeof(struct tcp_info)); + info->tcpi_state = (rs->state == rs_connected) ? + TCP_ESTABLISHED : TCP_CLOSE; + info->tcpi_snd_cwnd = rs->sq_size; + + *optlen = sizeof(struct tcp_info); + break; + default: ret = ENOTSUP; break; From a7d36422c0f937482adc77291943fce615f73c2f Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Mon, 23 Dec 2024 13:33:44 +0200 Subject: [PATCH 09/14] librdmacm: Fix rfcntl to keep fs flags separately rfcntl keeps the files flags all in the fd_flags argument. Adding the new field fs_flags to the rs struct allows the fcntl function to keep the file status flags separately from the file descriptor flags. Signed-off-by: Batsheva Black --- librdmacm/rsocket.c | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index 5a048f8cb..d89451a32 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -378,6 +378,7 @@ struct rsocket { int opts; int fd_flags; + int fs_flags; int ipv4_opts; uint64_t so_opts; uint64_t ipv6_opts; @@ -895,7 +896,7 @@ static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) if (!cm_id->recv_cq) goto err1; - if (rs->fd_flags & O_NONBLOCK) { + if (rs->fs_flags & O_NONBLOCK) { if (set_fd_nonblock(cm_id->recv_cq_channel->fd, true)) goto err2; } @@ -1327,7 +1328,7 @@ int rlisten(int socket, int backlog) if (ret) return ret; - if (rs->fd_flags & O_NONBLOCK) { + if (rs->fs_flags & O_NONBLOCK) { ret = set_fd_nonblock(rs->accept_queue[0], true); if (ret) return ret; @@ -1519,7 +1520,7 @@ static int rs_do_connect(struct rsocket *rs) rs->state = rs_connect_rdwr; break; case rs_accepting: - if (!(rs->fd_flags & O_NONBLOCK)) + if (!(rs->fs_flags & O_NONBLOCK)) set_fd_nonblock(rs->cm_id->channel->fd, true); ret = ucma_complete(rs->cm_id); @@ -2394,7 +2395,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc static int rs_nonblocking(struct 
rsocket *rs, int flags) { - return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); + return (rs->fs_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); } static int rs_is_cq_armed(struct rsocket *rs) @@ -3543,7 +3544,7 @@ int rshutdown(int socket, int how) if (rs->opts & RS_OPT_KEEPALIVE) rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); - if (rs->fd_flags & O_NONBLOCK) + if (rs->fs_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); if (rs->state & rs_connected) { @@ -3576,8 +3577,8 @@ int rshutdown(int socket, int how) rs_process_cq(rs, 0, rs_conn_all_sends_done); out: - if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected)) - rs_set_nonblocking(rs, rs->fd_flags); + if ((rs->fs_flags & O_NONBLOCK) && (rs->state & rs_connected)) + rs_set_nonblocking(rs, rs->fs_flags); if (rs->state & rs_disconnected) { /* Generate event by flushing receives to unblock rpoll */ @@ -3593,14 +3594,14 @@ static void ds_shutdown(struct rsocket *rs) if (rs->opts & RS_OPT_UDP_SVC) rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM); - if (rs->fd_flags & O_NONBLOCK) + if (rs->fs_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); rs->state &= ~(rs_readable | rs_writable); ds_process_cqs(rs, 0, ds_all_sends_done); - if (rs->fd_flags & O_NONBLOCK) - rs_set_nonblocking(rs, rs->fd_flags); + if (rs->fs_flags & O_NONBLOCK) + rs_set_nonblocking(rs, rs->fs_flags); } int rclose(int socket) @@ -4078,15 +4079,22 @@ int rfcntl(int socket, int cmd, ... 
/* arg */ ) va_start(args, cmd); switch (cmd) { case F_GETFL: - ret = rs->fd_flags; + ret = rs->fs_flags; break; case F_SETFL: param = va_arg(args, int); - if ((rs->fd_flags & O_NONBLOCK) != (param & O_NONBLOCK)) + if ((rs->fs_flags & O_NONBLOCK) != (param & O_NONBLOCK)) ret = rs_set_nonblocking(rs, param & O_NONBLOCK); if (!ret) - rs->fd_flags = param; + rs->fs_flags = param; + break; + case F_GETFD: + ret = rs->fd_flags; + break; + case F_SETFD: + param = va_arg(args, int); + rs->fd_flags = param; break; default: ret = ERR(ENOTSUP); From e712a3bf97b465b3eb5af78f436f41bf088e0c99 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 22 Dec 2024 10:32:20 +0200 Subject: [PATCH 10/14] librdmacm: Add support for sendfile64 Add preload interception for sendfile64 so applications using the 64-bit offset sendfile64 API work correctly with rsocket file descriptors. Signed-off-by: Batsheva Black --- librdmacm/CMakeLists.txt | 21 ++++++++++++++++++--- librdmacm/librspreload.map | 1 + librdmacm/preload.c | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/librdmacm/CMakeLists.txt b/librdmacm/CMakeLists.txt index d0f77a79c..0f8f6437b 100644 --- a/librdmacm/CMakeLists.txt +++ b/librdmacm/CMakeLists.txt @@ -26,14 +26,16 @@ target_link_libraries(rdmacm LINK_PRIVATE ) # When building without LFS (_FILE_OFFSET_BITS != 64), we may need to wrap -# fcntl64. Only do so if the system libc actually provides them +# fcntl64/sendfile64. 
Only do so if the system libc actually provides them set(CMAKE_REQUIRED_QUIET 1) set(SAVE_DEFS "${CMAKE_REQUIRED_DEFINITIONS}") set(CMAKE_REQUIRED_DEFINITIONS "") check_c_source_compiles(" #include +#include int main(void) { (void)&fcntl64; + (void)&sendfile64; return 0; } " RDMA_PRELOAD_HAVE_64) @@ -45,9 +47,10 @@ else() set(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS 0) endif() -# Detect at configure time if fcntl64 is declared so we can +# Detect at configure time if fcntl64/sendfile64 is declared so we can # add our forward declarations only when needed. set(RDMA_PRELOAD_FCNTL64_IN_HEADER 0) +set(RDMA_PRELOAD_SENDFILE64_IN_HEADER 0) if(RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS) set(CMAKE_REQUIRED_QUIET 1) set(SAVE_FLAGS "${CMAKE_REQUIRED_FLAGS}") @@ -60,6 +63,17 @@ int fcntl64(int socket, int cmd, ...) { (void)socket;(void)cmd; return 0; } if(RDMA_PRELOAD_FCNTL64_DECLARED_IN_HEADER) set(RDMA_PRELOAD_FCNTL64_IN_HEADER 1) endif() + check_c_source_compiles(" +#define _GNU_SOURCE +#include +#include +ssize_t sendfile64(int out_fd, int in_fd, off64_t *offset64, size_t count) { + (void)out_fd;(void)in_fd;(void)offset64;(void)count; return 0; +} +" RDMA_PRELOAD_SENDFILE64_DECLARED_IN_HEADER) + if(RDMA_PRELOAD_SENDFILE64_DECLARED_IN_HEADER) + set(RDMA_PRELOAD_SENDFILE64_IN_HEADER 1) + endif() set(CMAKE_REQUIRED_FLAGS "${SAVE_FLAGS}") set(CMAKE_REQUIRED_QUIET 0) endif() @@ -72,7 +86,8 @@ add_library(rspreload MODULE ) target_compile_definitions(rspreload PRIVATE RDMA_PRELOAD_HAVE_64=${RDMA_PRELOAD_HAVE_LFS_WRAPPER_SYMS} - RDMA_PRELOAD_FCNTL64_IN_HEADER=${RDMA_PRELOAD_FCNTL64_IN_HEADER}) + RDMA_PRELOAD_FCNTL64_IN_HEADER=${RDMA_PRELOAD_FCNTL64_IN_HEADER} + RDMA_PRELOAD_SENDFILE64_IN_HEADER=${RDMA_PRELOAD_SENDFILE64_IN_HEADER}) # Even though this is a module we still want to use Wl,--no-undefined set_target_properties(rspreload PROPERTIES LINK_FLAGS ${CMAKE_SHARED_LINKER_FLAGS}) set_target_properties(rspreload PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${BUILD_LIB}") diff --git 
a/librdmacm/librspreload.map b/librdmacm/librspreload.map index a82820b67..f453275d1 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -24,6 +24,7 @@ select; send; sendfile; + sendfile64; sendmsg; sendto; setsockopt; diff --git a/librdmacm/preload.c b/librdmacm/preload.c index cc98b11f1..737d771da 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -65,6 +65,9 @@ #if !RDMA_PRELOAD_FCNTL64_IN_HEADER int fcntl64(int socket, int cmd, ... /* arg */); #endif +#if !RDMA_PRELOAD_SENDFILE64_IN_HEADER +ssize_t sendfile64(int out_fd, int in_fd, off64_t *offset64, size_t count); +#endif #endif struct socket_calls { @@ -102,6 +105,10 @@ struct socket_calls { #endif int (*dup2)(int oldfd, int newfd); ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + ssize_t (*sendfile64)(int out_fd, int in_fd, off64_t *offset64, size_t count); +#endif int (*fxstat)(int ver, int fd, struct stat *buf); int (*epoll_create)(int size); int (*epoll_create1)(int flags); @@ -436,6 +443,10 @@ static void init_preload(void) #endif real.dup2 = dlsym(RTLD_NEXT, "dup2"); real.sendfile = dlsym(RTLD_NEXT, "sendfile"); +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) + real.sendfile64 = dlsym(RTLD_NEXT, "sendfile64"); +#endif real.fxstat = dlsym(RTLD_NEXT, "__fxstat"); real.epoll_create = dlsym(RTLD_NEXT, "epoll_create"); real.epoll_create1 = dlsym(RTLD_NEXT, "epoll_create1"); @@ -1304,6 +1315,29 @@ ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count) return ret; } +#if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ +(!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) +ssize_t sendfile64(int out_fd, int in_fd, off64_t *offset64, size_t count) +{ + void *file_addr; + int fd; + ssize_t ret; + + if (fd_get(out_fd, &fd) !=
fd_rsocket) + return real.sendfile64(fd, in_fd, offset64, count); + + file_addr = mmap(NULL, count, PROT_READ, MAP_PRIVATE, in_fd, offset64 ? *offset64 : 0); + if (file_addr == (void *) -1) + return -1; + + ret = rwrite(fd, file_addr, count); + if ((ret > 0) && offset64) + lseek(in_fd, ret, SEEK_CUR); + munmap(file_addr, count); + return ret; +} +#endif + int __fxstat(int ver, int socket, struct stat *buf) { int fd, ret; From b9c661735d0f9248a36f2ec1ca75d5660cc022a2 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 22 Dec 2024 10:39:13 +0200 Subject: [PATCH 11/14] librdmacm: Add support for dup Add preload interception for dup so that duplicating an rsocket file descriptor produces another rsocket fd that refers to the same connection. Signed-off-by: Batsheva Black --- librdmacm/librspreload.map | 1 + librdmacm/preload.c | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/librdmacm/librspreload.map b/librdmacm/librspreload.map index f453275d1..7151f76f0 100644 --- a/librdmacm/librspreload.map +++ b/librdmacm/librspreload.map @@ -8,6 +8,7 @@ bind; close; connect; + dup; dup2; fcntl; fcntl64; diff --git a/librdmacm/preload.c b/librdmacm/preload.c index 737d771da..376f22ba7 100644 --- a/librdmacm/preload.c +++ b/librdmacm/preload.c @@ -103,6 +103,7 @@ struct socket_calls { (!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) int (*fcntl64)(int socket, int cmd, ...
/* arg */); #endif + int (*dup)(int oldfd); int (*dup2)(int oldfd, int newfd); ssize_t (*sendfile)(int out_fd, int in_fd, off_t *offset, size_t count); #if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ @@ -441,6 +442,7 @@ static void init_preload(void) (!defined(RDMA_PRELOAD_HAVE_64) || RDMA_PRELOAD_HAVE_64) real.fcntl64 = dlsym(RTLD_NEXT, "fcntl64"); #endif + real.dup = dlsym(RTLD_NEXT, "dup"); real.dup2 = dlsym(RTLD_NEXT, "dup2"); real.sendfile = dlsym(RTLD_NEXT, "sendfile"); #if (!defined(_FILE_OFFSET_BITS) || _FILE_OFFSET_BITS != 64) && \ @@ -1243,6 +1245,13 @@ int fcntl64(int socket, int cmd, ... /* arg */) } #endif +int dup(int oldfd) +{ + int new_fd = fcntl(oldfd, F_DUPFD, 0); + + return new_fd < 0 ? new_fd : dup2(oldfd, new_fd); +} + /* * dup2 is not thread safe */ From c4322e8fe5d944083b70a9efc693c86c333cbf76 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Mon, 16 Dec 2024 16:49:26 +0200 Subject: [PATCH 12/14] librdmacm: Run connect service with TCP protocol To allow us to respond to disconnect events initiated by the peer kernel CM, run the connect service always with TCP protocol- also when connection succeeds.
Signed-off-by: Batsheva Black --- librdmacm/rsocket.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index d89451a32..aa60cd9d2 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -1775,12 +1775,12 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) if (rs->type == SOCK_STREAM) { memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); ret = rs_do_connect(rs); - if (ret == -1 && errno == EINPROGRESS) { - save_errno = errno; - /* The app can still drive the CM state on failure */ - rs_notify_svc(&connect_svc, rs, RS_SVC_ADD_CM); - errno = save_errno; - } + save_errno = errno; + /* The app can still drive the CM state on failure, + * and can respond to disconnect requests + */ + rs_notify_svc(&connect_svc, rs, RS_SVC_ADD_CM); + errno = save_errno; } else { if (rs->state == rs_init) { ret = ds_init_ep(rs); From ef770953981da40a0a7d9dca00534de1701d4259 Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 10 Nov 2024 14:20:28 +0200 Subject: [PATCH 13/14] librdmacm: Change wake-up timeout from rpoll The changes to rpoll to use a signaling fd to wake up blocked threads, combined with suspending polling while rsockets states may be changing _should_ prevent any threads from blocking indefinitely in rpoll() when a desired state change occurs. We periodically wake up any polling thread, so that it can recheck its rsocket states. The sleeping interval was set to an arbitrary value of 5 seconds, this interval is too long for apps that request a connection and are dependent on the thread waking up, so it's changed now to 0.5 seconds, but can be overridden using config files. 
Signed-off-by: Batsheva Black --- librdmacm/rsocket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index aa60cd9d2..fd8f32558 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -135,7 +135,7 @@ static uint16_t def_rqsize = 384; static uint32_t def_mem = (1 << 17); static uint32_t def_wmem = (1 << 17); static uint32_t polling_time = 10; -static int wake_up_interval = 5000; +static int wake_up_interval = 500; static int max_events = 40000; /* From 840c6d10469d3c5d702b05230b47dabad32cc1cd Mon Sep 17 00:00:00 2001 From: Batsheva Black Date: Sun, 6 Oct 2024 16:15:33 +0300 Subject: [PATCH 14/14] librdmacm: Fix SOCK_STREAM and SOCK_DGRAM types Updated type checks to identify socket types even when additional flags are present in the type field. Changed the comparison to use bitwise AND for more accurate detection. Signed-off-by: Batsheva Black --- librdmacm/rsocket.c | 68 ++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c index fd8f32558..06a1da679 100644 --- a/librdmacm/rsocket.c +++ b/librdmacm/rsocket.c @@ -319,6 +319,7 @@ struct ds_qp { struct rsocket { int type; + int category; int index; fastlock_t slock; fastlock_t rlock; @@ -693,7 +694,7 @@ static void rs_remove(struct rsocket *rs) } /* We only inherit from listening sockets */ -static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) +static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type, int category) { struct rsocket *rs; @@ -702,8 +703,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) return NULL; rs->type = type; + rs->category = category; + rs->index = -1; - if (type == SOCK_DGRAM) { + if (category == SOCK_DGRAM) { rs->udp_sock = -1; rs->epfd = -1; } @@ -714,7 +717,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) rs->sq_inline = 
inherited_rs->sq_inline; rs->sq_size = inherited_rs->sq_size; rs->rq_size = inherited_rs->rq_size; - if (type == SOCK_STREAM) { + if (category == SOCK_STREAM) { rs->ctrl_max_seqno = inherited_rs->ctrl_max_seqno; rs->target_iomap_size = inherited_rs->target_iomap_size; } @@ -724,7 +727,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) rs->sq_inline = def_inline; rs->sq_size = def_sqsize; rs->rq_size = def_rqsize; - if (type == SOCK_STREAM) { + if (category == SOCK_STREAM) { rs->ctrl_max_seqno = RS_QP_CTRL_SIZE; rs->target_iomap_size = def_iomap_size; } @@ -744,7 +747,7 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg) struct ds_qp *qp; int ret = 0; - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (rs->cm_id->recv_cq_channel) ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); @@ -1097,7 +1100,7 @@ static void ds_free(struct rsocket *rs) static void rs_free(struct rsocket *rs) { - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { ds_free(rs); return; } @@ -1248,18 +1251,19 @@ int rsocket(int domain, int type, int protocol) struct rsocket *rs; int index, ret; + int category = type & ~(SOCK_CLOEXEC | SOCK_NONBLOCK); if ((domain != AF_INET && domain != AF_INET6 && domain != AF_IB) || - ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) || - (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) || - (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP)) + ((!(category == SOCK_STREAM)) && (!(category == SOCK_DGRAM))) || + ((category == SOCK_STREAM) && protocol && protocol != IPPROTO_TCP) || + ((category == SOCK_DGRAM) && protocol && protocol != IPPROTO_UDP)) return ERR(ENOTSUP); rs_configure(); - rs = rs_alloc(NULL, type); + rs = rs_alloc(NULL, type, category); if (!rs) return ERR(ENOMEM); - if (type == SOCK_STREAM) { + if (category == SOCK_STREAM) { ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); if (ret) goto err; @@ -1293,7 +1297,7 @@ int rbind(int socket, const 
struct sockaddr *addr, socklen_t addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); if (!ret) rs->state = rs_bound; @@ -1359,7 +1363,7 @@ static void rs_accept(struct rsocket *rs) if (ret) return; - new_rs = rs_alloc(rs, rs->type); + new_rs = rs_alloc(rs, rs->type, rs->category); if (!new_rs) goto err; new_rs->cm_id = cm_id; @@ -1772,7 +1776,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); ret = rs_do_connect(rs); save_errno = errno; @@ -2575,7 +2579,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { fastlock_acquire(&rs->rlock); ret = ds_recvfrom(rs, buf, len, flags, NULL, NULL); fastlock_release(&rs->rlock); @@ -2645,7 +2649,7 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { fastlock_acquire(&rs->rlock); ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen); fastlock_release(&rs->rlock); @@ -2850,7 +2854,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM) { + if (rs->category == SOCK_DGRAM) { fastlock_acquire(&rs->slock); ret = dsend(rs, buf, len, flags); fastlock_release(&rs->slock); @@ -2937,7 +2941,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, rs = idm_at(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (dest_addr || addrlen) return 
ERR(EISCONN); @@ -3246,7 +3250,7 @@ static int rs_poll_rs(struct rsocket *rs, int events, int ret; check_cq: - if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) || + if ((rs->category == SOCK_STREAM) && ((rs->state & rs_connected) || (rs->state == rs_disconnected) || (rs->state & rs_error))) { rs_process_cq(rs, nonblock, test); @@ -3263,7 +3267,7 @@ static int rs_poll_rs(struct rsocket *rs, int events, } return revents; - } else if (rs->type == SOCK_DGRAM) { + } else if (rs->category == SOCK_DGRAM) { ds_process_cqs(rs, nonblock, test); revents = 0; @@ -3335,7 +3339,7 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) if (fds[i].revents) return 1; - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (rs->state >= rs_connected) rfds[i].fd = rs->cm_id->recv_cq_channel->fd; else @@ -3363,7 +3367,7 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) if (rs) { if (rfds[i].revents) { fastlock_acquire(&rs->cq_wait_lock); - if (rs->type == SOCK_STREAM) + if (rs->category == SOCK_STREAM) rs_get_cq_event(rs); else ds_get_cq_event(rs); @@ -3611,7 +3615,7 @@ int rclose(int socket) rs = idm_lookup(&idm, socket); if (!rs) return EBADF; - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { if (rs->state & rs_connected) rshutdown(socket, SHUT_RDWR); if (rs->opts & RS_OPT_KEEPALIVE) @@ -3649,7 +3653,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); return 0; } else { @@ -3664,7 +3668,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); return 0; } else { 
@@ -3710,7 +3714,7 @@ int rsetsockopt(int socket, int level, int optname, rs = idm_lookup(&idm, socket); if (!rs) return ERR(EBADF); - if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { + if ((rs->category == SOCK_DGRAM) && level != SOL_RDMA) { ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); if (ret) return ret; @@ -3721,7 +3725,7 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->so_opts; switch (optname) { case SO_REUSEADDR: - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR, (void *) optval, optlen); @@ -3733,8 +3737,8 @@ int rsetsockopt(int socket, int level, int optname, opt_on = *(int *) optval; break; case SO_RCVBUF: - if ((rs->type == SOCK_STREAM && !rs->rbuf) || - (rs->type == SOCK_DGRAM && !rs->qp_list)) + if (((rs->category == SOCK_STREAM) && !rs->rbuf) || + ((rs->category == SOCK_DGRAM) && !rs->qp_list)) rs->rbuf_size = (*(uint32_t *) optval) << 1; ret = 0; break; @@ -3806,7 +3810,7 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->ipv6_opts; switch (optname) { case IPV6_V6ONLY: - if (rs->type == SOCK_STREAM) { + if (rs->category == SOCK_STREAM) { ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, RDMA_OPTION_ID_AFONLY, (void *) optval, optlen); @@ -4808,7 +4812,7 @@ uint32_t epoll_rs(int fd, uint32_t events) struct rsocket *rs = idm_lookup(&idm, fd); check_cq: - if ((rs->type & SOCK_STREAM) && ((rs->state & rs_connected) || + if ((rs->category == SOCK_STREAM) && ((rs->state & rs_connected) || (rs->state == rs_disconnected) || (rs->state & rs_error))) { rs_process_cq(rs, 1, rs_poll_all); @@ -4824,7 +4828,7 @@ uint32_t epoll_rs(int fd, uint32_t events) } return revents; - } else if (rs->type & SOCK_DGRAM) { + } else if (rs->category == SOCK_DGRAM) { ds_process_cqs(rs, 1, rs_poll_all); if ((events & EPOLLIN) && rs_have_rdata(rs))