diff --git a/include/udx.h b/include/udx.h index 666eb6b..79731c7 100644 --- a/include/udx.h +++ b/include/udx.h @@ -171,17 +171,16 @@ typedef struct udx_queue_s { } udx_queue_t; struct udx_socket_s { - uv_udp_t uv_udp; // must be first + uv_udp_t uv_udp; // must be first + uv_timer_t timer; // used for draining the specific-ttl-send-queue + + int nrefs; // 2 - uv_udp and uv_timer // packets queued with udx_socket_send_ttl that both // 1. override the socket's TTL value and // 2. can't be sent immediately via uv_udp_try_send() - // are queued by sending with uv_udp_send() and simultaneously queued here. - // Then when a packet is sent if the next packet is the packet at the head of this - // queue (ie the next packet has a specified TTL), then the sockets ttl is temporarily - // set via uv_udp_set_ttl, and the udx_socket_send callback will restore the ttl after + // are queued here, and sent later udx_queue_t specific_ttl_send_queue; - uint64_t packets_sent_via_uv_send_queue; udx_socket_t *prev; udx_socket_t *next; @@ -444,13 +443,13 @@ struct udx_packet_s { struct udx_socket_send_s { uv_udp_send_t uv_udp_send; - udx_queue_node_t queue; + udx_queue_node_t queue; // could be singly linked actually uint32_t ttl; - // when queued for sending, the value stored here is: - // socket.packets_sent_via_uv_send_queue + socket.send_queue_count - // it is used to determine when this packet is at the head of the queue - // so that the TTL can be adjusted - uint64_t place_in_queue; + + // these are for saving the buffer + destination when sending with specific TTL on the slow path + uv_buf_t buf; // only for sending with specific ttl, buf.base = (char *) (req+1) + struct sockaddr_storage remote_addr; // only for sending with specific ttl + int remote_addr_len; // only for sending with specific ttl udx_socket_t *socket; udx_socket_send_cb on_send; diff --git a/src/udx.c b/src/udx.c index 5fded99..808d14f 100644 --- a/src/udx.c +++ b/src/udx.c @@ -135,15 +135,20 @@ ref_dec (udx_t *udx) { } static void -on_uv_close (uv_handle_t *handle) { - udx_socket_t *socket = (udx_socket_t *) handle; - udx_t *udx = socket->udx; +on_udx_socket_handle_close (uv_handle_t *handle) { - if (socket->on_close != NULL) { - socket->on_close(socket); - } + udx_socket_t *socket = handle->data; + socket->nrefs--; - ref_dec(udx); + if (socket->nrefs == 0) { + udx_t *udx = socket->udx; + + if (socket->on_close != NULL) { + socket->on_close(socket); + } + + ref_dec(udx); + } } static void @@ -446,36 +451,10 @@ close_stream (udx_stream_t *stream, int err) { static void udx_rto_timeout (uv_timer_t *handle); -static bool -_maybe_adjust_ttl (udx_socket_t *socket) { - - if (socket->specific_ttl_send_queue.len == 0) { - return false; - } - - udx_socket_send_t *req = udx__queue_data(udx__queue_peek(&socket->specific_ttl_send_queue), udx_socket_send_t, queue); - - if (req->place_in_queue == socket->packets_sent_via_uv_send_queue) { - uv_udp_set_ttl(&socket->uv_udp, req->ttl); - return true; - } - return false; -} - -// every packet sent via uv_udp_send() must call this function as part of their callback -static void -maybe_adjust_ttl (uv_udp_t *udp) { - udx_socket_t *socket = (udx_socket_t *) udp; // todo: use offsetof instead? - socket->packets_sent_via_uv_send_queue++; - - _maybe_adjust_ttl(socket); -} - // used to free simple (ack, probe, and relay) memory // stream-write, stream-send and stream-destroy packets have their own callbacks void on_packet_send_slow (uv_udp_send_t *req, int status) { - maybe_adjust_ttl(req->handle); UDX_UNUSED(status); free(req); @@ -670,7 +649,6 @@ on_stream_data_write (uv_udp_send_t *send, int status) { debug_printf("sendmsg: %s\n", uv_strerror(status)); } - maybe_adjust_ttl(send->handle); // send is freed with packet deref_packet(pkt); } @@ -870,7 +848,7 @@ send_new_packet (udx_stream_t *stream, int probe_type) { if (first_alloc) { pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0])); - pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0])); + pkt->wbufs = malloc((pkt->nwbufs_capacity) * sizeof(pkt->wbufs[0])); memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml)); memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml)); } else { @@ -1871,21 +1849,20 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb) { socket->on_recv = NULL; socket->on_close = cb; - socket->bytes_rx = 0; - socket->bytes_tx = 0; - socket->packets_rx = 0; - socket->packets_tx = 0; - socket->packets_dropped_by_kernel = -1; - uv_udp_t *handle = &socket->uv_udp; // Asserting all the errors here as it massively simplifies error handling. // In practice these will never fail. - int err = uv_udp_init(udx->loop, handle); + int err = uv_udp_init(udx->loop, &socket->uv_udp); + assert(err == 0); + socket->uv_udp.data = socket; + + err = uv_timer_init(udx->loop, &socket->timer); assert(err == 0); + socket->timer.data = socket; - handle->data = socket; + socket->nrefs = 2; return err; } @@ -2039,32 +2016,56 @@ udx_socket_send (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bu static void on_socket_send_slow (uv_udp_send_t *_req, int status) { - udx_socket_send_t *req = (udx_socket_send_t *) ((char *) _req - offsetof(udx_socket_send_t, uv_udp_send)); + udx_socket_send_t *req = container_of(_req, udx_socket_send_t, uv_udp_send); - udx_socket_t *socket = req->socket; - // 1. if packet was sent with specifc ttl, remove it from queue and reset ttl - if (req->ttl) { - udx_socket_send_t *removed = udx__queue_data(udx__queue_shift(&socket->specific_ttl_send_queue), udx_socket_send_t, queue); - assert(removed == req); - // restore ttl after sending - uv_udp_set_ttl(&socket->uv_udp, socket->ttl); - } - - // 2. if next packet is also a specific ttl it will be re-set here - maybe_adjust_ttl(_req->handle); if (req->on_send) { req->on_send(req, status); } } +static void +retry_send_specific_ttl (uv_timer_t *timer) { + udx_socket_t *socket = timer->data; + debug_printf("retry sending specific ttl, qlen=%u\n", socket->specific_ttl_send_queue.len); + + while (socket->specific_ttl_send_queue.len) { + udx_socket_send_t *req = udx__queue_data(udx__queue_peek(&socket->specific_ttl_send_queue), udx_socket_send_t, queue); + uint32_t ttl = req->ttl; + uv_udp_set_ttl(&socket->uv_udp, ttl); + + int err = uv_udp_try_send(&socket->uv_udp, &req->buf, 1, (struct sockaddr *) &req->remote_addr); + + if (err == UV_EAGAIN) { + break; + } else { + req->on_send(req, err >= 0 ? 0 : err); + } + + udx__queue_shift(&socket->specific_ttl_send_queue); + } + + uv_udp_set_ttl(&socket->uv_udp, socket->ttl); + + if (socket->specific_ttl_send_queue.len > 0) { + uv_timer_start(&socket->timer, retry_send_specific_ttl, 1, 0); + } else { + uv_timer_stop(&socket->timer); + } +} + int udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *dest, int ttl, udx_socket_send_cb cb) { if (ttl < 0 /* 0 is "default" */ || ttl > 255) return UV_EINVAL; + if (dest->sa_family != AF_INET && dest->sa_family != AF_INET6) return UV_EINVAL; req->ttl = ttl; req->on_send = cb; req->socket = socket; + req->buf = bufs[0]; + req->remote_addr_len = dest->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + memcpy(&req->remote_addr, dest, req->remote_addr_len); + struct sockaddr_in6 dest6; if (socket->family == 6 && dest->sa_family == AF_INET) { memset(&dest6, 0, sizeof(dest6)); @@ -2098,15 +2099,17 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_ } if (err == UV_EAGAIN) { - // slow path + // slow path (1) with specific TTL if (ttl) { - req->place_in_queue = socket->packets_sent_via_uv_send_queue + socket->uv_udp.send_queue_count; udx__queue_tail(&socket->specific_ttl_send_queue, &req->queue); + if (!uv_is_active((uv_handle_t *) &socket->timer)) { + uv_timer_start(&socket->timer, retry_send_specific_ttl, 1, 0); + } + err = 0; + } else { + // slow path (2) - non-specific TTL + err = uv_udp_send(&req->uv_udp_send, &socket->uv_udp, bufs, bufs_len, dest, on_socket_send_slow); } - - err = uv_udp_send(&req->uv_udp_send, &socket->uv_udp, bufs, bufs_len, dest, on_socket_send_slow); - _maybe_adjust_ttl(socket); // edge case: queue was empty - return err; } @@ -2139,7 +2142,9 @@ udx_socket_close (udx_socket_t *socket) { socket->status |= UDX_SOCKET_CLOSED; - uv_close((uv_handle_t *) &socket->uv_udp, on_uv_close); + uv_close((uv_handle_t *) &socket->uv_udp, on_udx_socket_handle_close); + uv_timer_stop(&socket->timer); + uv_close((uv_handle_t *) &socket->timer, on_udx_socket_handle_close); udx_t *udx = socket->udx; udx__link_remove(udx->sockets, socket); diff --git a/test/socket-send-recv.c b/test/socket-send-recv.c index a3d6949..9a80bc2 100644 --- a/test/socket-send-recv.c +++ b/test/socket-send-recv.c @@ -19,7 +19,7 @@ struct { int next_ttl; bool send_called; bool recv_called; -} tests[NTESTS] = {{.string = "one", .ttl = 0, .next_ttl = 10}, {.string = "two", .ttl = 10, .next_ttl = 20}, {.string = "three", .ttl = 20, .next_ttl = 0}, {.string = "four", .ttl = 0, .next_ttl = 0}}; +} tests[NTESTS] = {{.string = "one", .ttl = 0}, {.string = "two", .ttl = 10}, {.string = "three", .ttl = 20}, {.string = "four", .ttl = 0}}; int nrecv_called; @@ -35,14 +35,19 @@ on_send (udx_socket_send_t *r, int status) { for (i = 0; i < NTESTS; i++) { if (r == &tests[i].req) { - int wanted_ttl = tests[i].next_ttl ?: bsock.ttl; /* UDX_DEFAULT_TTL */ + int wanted_ttl = tests[i].ttl ?: bsock.ttl; /* UDX_DEFAULT_TTL */ tests[i].send_called = true; - // assert(ttl == wanted_ttl); + assert(ttl == wanted_ttl); } } } +int nclose; +void +on_socket_close (udx_socket_t *socket) { + nclose++; +} void on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from) { @@ -58,7 +63,8 @@ on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const stru } if (++nrecv_called == 4) { - uv_stop(&loop); + udx_socket_close(&asock); + udx_socket_close(&bsock); } } @@ -71,10 +77,10 @@ main () { e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock, NULL); + e = udx_socket_init(&udx, &asock, on_socket_close); assert(e == 0); - e = udx_socket_init(&udx, &bsock, NULL); + e = udx_socket_init(&udx, &bsock, on_socket_close); assert(e == 0); struct sockaddr_in baddr; @@ -103,7 +109,9 @@ main () { } } - uv_run(&loop, UV_RUN_DEFAULT); + int rc = uv_run(&loop, UV_RUN_DEFAULT); + + assert(rc == 0); for (int i = 0; i < NTESTS; i++) { assert(tests[i].send_called);