diff --git a/include/udx.h b/include/udx.h index 5dd0554..782cd8b 100644 --- a/include/udx.h +++ b/include/udx.h @@ -405,6 +405,8 @@ struct udx_packet_s { udx_stream_t *stream; // for incrementing counters when packet is sent + bool cancelled; // true if the stream was closed while this packet is in the uv_udp_send queue + // immediateyl call on_ack(UV_ECANCELLED) in the callback bool lost; bool retransmitted; uint8_t transmits; diff --git a/src/udx.c b/src/udx.c index 4c9b634..9249cc7 100644 --- a/src/udx.c +++ b/src/udx.c @@ -200,25 +200,11 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) { } } -static void -deref_packet (udx_packet_t *pkt) { - if (--pkt->ref_count == 0) { - if (pkt->bufs != &pkt->buf_sml[0]) { - free(pkt->bufs); - free(pkt->wbufs); - } - free(pkt); - } -} - static void cancel_packet (udx_packet_t *pkt) { - uv_buf_t *bufs = pkt->bufs; - udx_stream_write_buf_t **wbufs = pkt->wbufs; - for (int i = 0; i < pkt->nwbufs; i++) { - size_t buf_len = bufs[i + 1].len; - udx_stream_write_buf_t *wbuf = wbufs[i]; + size_t buf_len = pkt->bufs[i + 1].len; + udx_stream_write_buf_t *wbuf = pkt->wbufs[i]; on_bytes_acked(wbuf, buf_len, true); // todo: move into on_bytes_acked itself @@ -228,8 +214,20 @@ cancel_packet (udx_packet_t *pkt) { write->on_ack(write, UV_ECANCELED, 0); } } +} - deref_packet(pkt); +static void +deref_packet (udx_packet_t *pkt) { + if (--pkt->ref_count == 0) { + if (pkt->cancelled) { + cancel_packet(pkt); + } + if (pkt->bufs != &pkt->buf_sml[0]) { + free(pkt->bufs); + free(pkt->wbufs); + } + free(pkt); + } } static void @@ -242,6 +240,7 @@ clear_outgoing_packets (udx_stream_t *stream) { if (stream->pkt) { assert(stream->pkt->ref_count == 1); cancel_packet(stream->pkt); + deref_packet(stream->pkt); } // We should make sure all existing packets do not send, and notify the user that they failed @@ -249,8 +248,8 @@ clear_outgoing_packets (udx_stream_t *stream) { udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq); if (pkt == NULL) continue; - - cancel_packet(pkt); + pkt->cancelled = true; + deref_packet(pkt); } while (stream->write_queue.len > 0) { @@ -310,8 +309,9 @@ udx_write_header (uint8_t header[20], udx_stream_t *stream, int type) { // returns 1 on success, zero if packet can't be promoted to a probe packet static int mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) { - - assert(wanted_size > pkt->size); + if (wanted_size > pkt->size) { + return 0; + } // cannot probeify a packet with 1) no data 2) already has padding if (pkt->nwbufs < 1 || pkt->header[3] != 0) { @@ -370,8 +370,19 @@ finalize_maybe (uv_handle_t *timer) { // 2. if you call this on the send path, you must immediately return from // send_stream_packets -static int +static void +close_stream_internal (udx_stream_t *stream, int err); + +void close_stream (udx_stream_t *stream, int err) { + if (stream->status & UDX_STREAM_DESTROYING) { + return; + } + close_stream_internal(stream, err); +} + +void +close_stream_internal (udx_stream_t *stream, int err) { assert((stream->status & UDX_STREAM_CLOSED) == 0); stream->status |= UDX_STREAM_CLOSED; stream->status &= ~UDX_STREAM_CONNECTED; @@ -441,8 +452,6 @@ close_stream (udx_stream_t *stream, int err) { if (udx->teardown && socket != NULL && socket->streams == NULL) { udx_socket_close(socket); } - - return 1; } static void @@ -873,7 +882,7 @@ send_new_packet (udx_stream_t *stream, int probe_type) { if (first_alloc) { pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0])); - pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0])); + pkt->wbufs = malloc((pkt->nwbufs_capacity) * sizeof(pkt->wbufs[0])); memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml)); memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml)); } else { @@ -1798,12 +1807,21 @@ static void on_uv_udp_recv (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { if (nread == 0 && addr == NULL) return; + if (nread < 0) { + debug_printf("udx: uv_udp_recv err=%s\n", uv_strerror(nread)); + assert(nread != UV_EBADF); + assert(nread != UV_ENOTSOCK); + assert(nread != UV_EINVAL); + assert(nread != UV_EFAULT); + return; + } + udx_socket_t *socket = handle->data; // todo: cast instead, save a dereference ? assert(!(socket->status & UDX_SOCKET_CLOSED)); if (flags & UV_UDP_PARTIAL) { - assert(false && "todo: log error for large messages?"); + debug_printf("udx: uv_udp_recv received partial packet\n"); } assert((size_t) nread <= buf->len); @@ -2681,7 +2699,7 @@ stream_on_destroy_send (udx_stream_t *stream) { udx->packets_tx++; udx->bytes_tx += UDX_HEADER_SIZE; - close_stream(stream, 0); + close_stream_internal(stream, 0); } static void diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6a9e91c..6627ceb 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -6,6 +6,7 @@ list(APPEND tests socket-send-recv-dualstack socket-send-recv-ipv6 stream-destroy + stream-destroy-slowpath stream-destroy-before-connect stream-preconnect stream-preconnect-same-socket diff --git a/test/stream-destroy-slowpath.c b/test/stream-destroy-slowpath.c new file mode 100644 index 0000000..f1b5bf1 --- /dev/null +++ b/test/stream-destroy-slowpath.c @@ -0,0 +1,70 @@ +#include +#include +#include + +#include "../include/udx.h" + +int +close_stream (udx_stream_t *stream, int status); + +uv_loop_t loop; +udx_t udx; +udx_socket_t sock; + +bool close_called = false; + +void +on_close (udx_stream_t *handle, int status) { + assert(status == 0); + + int e = udx_socket_close(&sock); + + assert(e == 0); + + close_called = true; +} + +void +ack_cb (udx_stream_write_t *req, int status, int unordered) { + + printf("ack status=%d unordered=%d\n", status, unordered); +} + +int +main () { + int e; + + uv_loop_init(&loop); + + e = udx_init(&loop, &udx, NULL); + assert(e == 0); + + e = udx_socket_init(&udx, &sock, NULL); + assert(e == 0); + + struct sockaddr_in addr; + uv_ip4_addr("127.0.0.1", 8081, &addr); + e = udx_socket_bind(&sock, (struct sockaddr *) &addr, 0); + assert(e == 0); + + udx_stream_t stream; + e = udx_stream_init(&udx, &stream, 1, on_close, NULL); + assert(e == 0); + + e = udx_stream_connect(&stream, &sock, 2, (struct sockaddr *) &addr); + assert(e == 0); + + uv_buf_t buf = uv_buf_init("hello", 5); + udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1)); + + udx_stream_write(req, &stream, &buf, 1, ack_cb); + e = udx_stream_destroy(&stream); + close_stream(&stream, 0); + + uv_run(&loop, UV_RUN_DEFAULT); + free(req); + + assert(close_called); + + return 0; +}