From 18158f137c6e68c91198cc0aa0b875bc1ba98742 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Thu, 2 Apr 2026 03:26:39 -0400 Subject: [PATCH 1/3] wait to call on_ack(CANCEL) if packet is in uv_udp_send queue. Wait to close stream if a destroy packet is already in the uv_udp_send queue. Add asserts to catch unexpected conditions --- include/udx.h | 2 ++ src/udx.c | 66 +++++++++++++++++++++++++++------------------ test/CMakeLists.txt | 1 + 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/include/udx.h b/include/udx.h index 5dd0554..782cd8b 100644 --- a/include/udx.h +++ b/include/udx.h @@ -405,6 +405,8 @@ struct udx_packet_s { udx_stream_t *stream; // for incrementing counters when packet is sent + bool cancelled; // true if the stream was closed while this packet is in the uv_udp_send queue + // immediateyl call on_ack(UV_ECANCELLED) in the callback bool lost; bool retransmitted; uint8_t transmits; diff --git a/src/udx.c b/src/udx.c index 4c9b634..f094228 100644 --- a/src/udx.c +++ b/src/udx.c @@ -200,25 +200,11 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) { } } -static void -deref_packet (udx_packet_t *pkt) { - if (--pkt->ref_count == 0) { - if (pkt->bufs != &pkt->buf_sml[0]) { - free(pkt->bufs); - free(pkt->wbufs); - } - free(pkt); - } -} - static void cancel_packet (udx_packet_t *pkt) { - uv_buf_t *bufs = pkt->bufs; - udx_stream_write_buf_t **wbufs = pkt->wbufs; - for (int i = 0; i < pkt->nwbufs; i++) { - size_t buf_len = bufs[i + 1].len; - udx_stream_write_buf_t *wbuf = wbufs[i]; + size_t buf_len = pkt->bufs[i + 1].len; + udx_stream_write_buf_t *wbuf = pkt->wbufs[i]; on_bytes_acked(wbuf, buf_len, true); // todo: move into on_bytes_acked itself @@ -228,8 +214,20 @@ cancel_packet (udx_packet_t *pkt) { write->on_ack(write, UV_ECANCELED, 0); } } +} - deref_packet(pkt); +static void +deref_packet (udx_packet_t *pkt) { + if (--pkt->ref_count == 0) { + if (pkt->cancelled) { + cancel_packet(pkt); + } + if (pkt->bufs != &pkt->buf_sml[0]) { + free(pkt->bufs); + free(pkt->wbufs); + } + free(pkt); + } } static void @@ -242,6 +240,7 @@ clear_outgoing_packets (udx_stream_t *stream) { if (stream->pkt) { assert(stream->pkt->ref_count == 1); cancel_packet(stream->pkt); + deref_packet(stream->pkt); } // We should make sure all existing packets do not send, and notify the user that they failed @@ -249,8 +248,8 @@ clear_outgoing_packets (udx_stream_t *stream) { udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq); if (pkt == NULL) continue; - - cancel_packet(pkt); + pkt->cancelled = true; + deref_packet(pkt); } while (stream->write_queue.len > 0) { @@ -310,8 +309,9 @@ udx_write_header (uint8_t header[20], udx_stream_t *stream, int type) { // returns 1 on success, zero if packet can't be promoted to a probe packet static int mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) { - - assert(wanted_size > pkt->size); + if (wanted_size > pkt->size) { + return 0; + } // cannot probeify a packet with 1) no data 2) already has padding if (pkt->nwbufs < 1 || pkt->header[3] != 0) { @@ -370,8 +370,19 @@ finalize_maybe (uv_handle_t *timer) { // 2. if you call this on the send path, you must immediately return from // send_stream_packets -static int +static void +close_stream_internal (udx_stream_t *stream, int err); + +void close_stream (udx_stream_t *stream, int err) { + if (stream->status & UDX_STREAM_DESTROYING) { + return; + } + close_stream_internal(stream, err); +} + +void +close_stream_internal (udx_stream_t *stream, int err) { assert((stream->status & UDX_STREAM_CLOSED) == 0); stream->status |= UDX_STREAM_CLOSED; stream->status &= ~UDX_STREAM_CONNECTED; @@ -441,8 +452,6 @@ close_stream (udx_stream_t *stream, int err) { if (udx->teardown && socket != NULL && socket->streams == NULL) { udx_socket_close(socket); } - - return 1; } static void @@ -873,7 +882,7 @@ send_new_packet (udx_stream_t *stream, int probe_type) { if (first_alloc) { pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0])); - pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0])); + pkt->wbufs = malloc((pkt->nwbufs_capacity) * sizeof(pkt->wbufs[0])); memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml)); memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml)); } else { @@ -1798,6 +1807,11 @@ static void on_uv_udp_recv (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { if (nread == 0 && addr == NULL) return; + if (nread < 0) { + debug_printf("udx: uv_udp_recv err=%s\n", uv_strerror(nread)); + } + assert(nread >= 0); + udx_socket_t *socket = handle->data; // todo: cast instead, save a dereference ? assert(!(socket->status & UDX_SOCKET_CLOSED)); @@ -2681,7 +2695,7 @@ stream_on_destroy_send (udx_stream_t *stream) { udx->packets_tx++; udx->bytes_tx += UDX_HEADER_SIZE; - close_stream(stream, 0); + close_stream_internal(stream, 0); } static void diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6a9e91c..6627ceb 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -6,6 +6,7 @@ list(APPEND tests socket-send-recv-dualstack socket-send-recv-ipv6 stream-destroy + stream-destroy-slowpath stream-destroy-before-connect stream-preconnect stream-preconnect-same-socket From 92416cb8fc85f5dc389f9be82f1a782eef42d3e0 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Thu, 2 Apr 2026 03:45:26 -0400 Subject: [PATCH 2/3] don't assert on all uv_udp_recv errors, only EBADF, EINVAL, EINVAL and EFAULT, just log and drop the packet for others. also log partial packets, but process them. --- src/udx.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/udx.c b/src/udx.c index f094228..6b71ef8 100644 --- a/src/udx.c +++ b/src/udx.c @@ -1809,15 +1809,19 @@ on_uv_udp_recv (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const stru if (nread < 0) { debug_printf("udx: uv_udp_recv err=%s\n", uv_strerror(nread)); + assert(nread != UV_EBADF); + assert(nread != UV_ENOTSOCK); + assert(nread != UV_EINVAL); + assert(nread != UV_EFAULT); + return; } - assert(nread >= 0); udx_socket_t *socket = handle->data; // todo: cast instead, save a dereference ? assert(!(socket->status & UDX_SOCKET_CLOSED)); if (flags & UV_UDP_PARTIAL) { - assert(false && "todo: log error for large messages?"); + debug_printf("udx: uv_udp_recv received partial packet\n", uv_strerror(nread)); } assert((size_t) nread <= buf->len); From dd1f4209928c69a623cf9e1d1000bc1ab73fa08c Mon Sep 17 00:00:00 2001 From: James Thomas Date: Thu, 2 Apr 2026 03:52:10 -0400 Subject: [PATCH 3/3] add missing test --- src/udx.c | 2 +- test/stream-destroy-slowpath.c | 70 ++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 test/stream-destroy-slowpath.c diff --git a/src/udx.c b/src/udx.c index 6b71ef8..9249cc7 100644 --- a/src/udx.c +++ b/src/udx.c @@ -1821,7 +1821,7 @@ on_uv_udp_recv (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const stru assert(!(socket->status & UDX_SOCKET_CLOSED)); if (flags & UV_UDP_PARTIAL) { - debug_printf("udx: uv_udp_recv received partial packet\n", uv_strerror(nread)); + debug_printf("udx: uv_udp_recv received partial packet\n"); } assert((size_t) nread <= buf->len); diff --git a/test/stream-destroy-slowpath.c b/test/stream-destroy-slowpath.c new file mode 100644 index 0000000..f1b5bf1 --- /dev/null +++ b/test/stream-destroy-slowpath.c @@ -0,0 +1,70 @@ +#include +#include +#include + +#include "../include/udx.h" + +int +close_stream (udx_stream_t *stream, int status); + +uv_loop_t loop; +udx_t udx; +udx_socket_t sock; + +bool close_called = false; + +void +on_close (udx_stream_t *handle, int status) { + assert(status == 0); + + int e = udx_socket_close(&sock); + + assert(e == 0); + + close_called = true; +} + +void +ack_cb (udx_stream_write_t *req, int status, int unordered) { + + printf("ack status=%d unordered=%d\n", status, unordered); +} + +int +main () { + int e; + + uv_loop_init(&loop); + + e = udx_init(&loop, &udx, NULL); + assert(e == 0); + + e = udx_socket_init(&udx, &sock, NULL); + assert(e == 0); + + struct sockaddr_in addr; + uv_ip4_addr("127.0.0.1", 8081, &addr); + e = udx_socket_bind(&sock, (struct sockaddr *) &addr, 0); + assert(e == 0); + + udx_stream_t stream; + e = udx_stream_init(&udx, &stream, 1, on_close, NULL); + assert(e == 0); + + e = udx_stream_connect(&stream, &sock, 2, (struct sockaddr *) &addr); + assert(e == 0); + + uv_buf_t buf = uv_buf_init("hello", 5); + udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1)); + + udx_stream_write(req, &stream, &buf, 1, ack_cb); + e = udx_stream_destroy(&stream); + close_stream(&stream, 0); + + uv_run(&loop, UV_RUN_DEFAULT); + free(req); + + assert(close_called); + + return 0; +}