From 0a4e998dc9e41296e5af89fddc8c6d8882f7cdd2 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Mon, 30 Dec 2024 11:11:09 -0500 Subject: [PATCH] separate wbuf_queue and write_queue to make cleanup simpler: just free packets and call write->on_ack on each write --- include/udx.h | 2 ++ src/udx.c | 75 ++++++++++++++++++--------------------------------- 2 files changed, 28 insertions(+), 49 deletions(-) diff --git a/include/udx.h b/include/udx.h index 821cedd..d1b88fa 100644 --- a/include/udx.h +++ b/include/udx.h @@ -298,6 +298,7 @@ struct udx_stream_s { // congestion state udx_cong_t cong; + udx_queue_t wbuf_queue; udx_queue_t write_queue; udx_cirbuf_t outgoing; @@ -365,6 +366,7 @@ struct udx_stream_write_buf_s { struct udx_stream_write_s { size_t size; + udx_queue_node_t queue; size_t bytes_acked; bool is_write_end; diff --git a/src/udx.c b/src/udx.c index 8678dd7..8a3b128 100644 --- a/src/udx.c +++ b/src/udx.c @@ -186,7 +186,7 @@ on_uv_close (uv_handle_t *handle) { static bool stream_has_data (udx_stream_t *stream) { - return stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0; + return stream->wbuf_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0; } static bool @@ -416,46 +416,18 @@ wbufs_offset (udx_packet_t *pkt) { static void clear_outgoing_packets (udx_stream_t *stream) { - // todo: skip the math, and just - // 1. destroy all packets - // 2. destroy all wbufs - // 3. set write->bytes_acked = write->size and call ack(cancel) on all writes - // We should make sure all existing packets do not send, and notify the user that they failed + // We should make sure all existing packets do not send for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) { udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq); - if (pkt == NULL) continue; - - assert(pkt->nbufs >= 2); - - uv_buf_t *bufs = (uv_buf_t *) (pkt + 1); - udx_stream_write_buf_t **wbufs = wbufs_offset(pkt); - - for (int i = 1; i < pkt->nbufs; i++) { - size_t pkt_len = bufs[i].len; - udx_stream_write_buf_t *wbuf = wbufs[i - 1]; - on_bytes_acked(wbuf, pkt_len, true); - - // todo: move into on_bytes_acked itself - udx_stream_write_t *write = wbuf->write; - - if (write->bytes_acked == write->size && write->on_ack) { - write->on_ack(write, UV_ECANCELED, 0); - } - } - free(pkt); } + // notify the user that their pending writes failed while (stream->write_queue.len > 0) { - udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_buf_t, queue); - assert(wbuf != NULL); - - on_bytes_acked(wbuf, wbuf->buf.len - wbuf->bytes_acked, true); - // todo: move into on_bytes_acked itself - udx_stream_write_t *write = wbuf->write; - if (write->bytes_acked == write->size && write->on_ack) { + udx_stream_write_t *write = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_t, queue); + if (write->on_ack) { write->on_ack(write, UV_ECANCELED, 0); } } @@ -1068,13 +1040,15 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { udx_stream_write_t *write = wbuf->write; - if (write->bytes_acked == write->size && write->on_ack) { - write->on_ack(write, 0, sack); - - // reentry from write->on_ack - if (stream->status & UDX_STREAM_DEAD) { - free(pkt); - return 2; + if (write->bytes_acked == write->size) { + udx__queue_unlink(&stream->write_queue, &write->queue); + if (write->on_ack) { + write->on_ack(write, 0, sack); + // reentry from write->on_ack + if (stream->status & UDX_STREAM_DEAD) { + free(pkt); + return 2; + } } } } @@ -1084,7 +1058,7 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { // TODO: the end condition needs work here to be more "stateless" // ie if the remote has acked all our writes, then instead of waiting for retransmits, we should // clear those and mark as local ended NOW. - if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) { + if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->wbuf_queue.len == 0) { stream->status |= UDX_STREAM_ENDED; return 2; } @@ -1649,6 +1623,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) { assert(stream->retransmit_queue.len == 0); + assert(stream->wbuf_queue.len == 0); assert(stream->write_queue.len == 0); close_stream(stream, 0); return true; @@ -1709,7 +1684,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { arm_stream_timers(stream, false); } - while (stream->write_queue.len > 0 && (stream_may_send(stream) || stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP)) { + while (stream->wbuf_queue.len > 0 && (stream_may_send(stream) || stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP)) { bool tlp = stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP; bool zwp = stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP; @@ -1731,8 +1706,8 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { int nwbufs = 0; size_t size = 0; - while (capacity > 0 && nwbufs < UDX_MAX_COMBINED_WRITES && stream->write_queue.len > 0) { - udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_peek(&stream->write_queue), udx_stream_write_buf_t, queue); + while (capacity > 0 && nwbufs < UDX_MAX_COMBINED_WRITES && stream->wbuf_queue.len > 0) { + udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_peek(&stream->wbuf_queue), udx_stream_write_buf_t, queue); uv_buf_t *buf = &wbuf->buf; @@ -1759,7 +1734,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { if (wbuf->is_write_end) { header_flag |= UDX_HEADER_END; } - udx__queue_shift(&stream->write_queue); + udx__queue_shift(&stream->wbuf_queue); } } @@ -1790,7 +1765,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { while (i--) { udx_stream_write_buf_t *wbuf = wbufs[i]; if (wbuf->bytes_acked + wbuf->bytes_inflight == wbuf->buf.len) { - udx__queue_head(&stream->write_queue, &wbuf->queue); + udx__queue_head(&stream->wbuf_queue, &wbuf->queue); } wbuf->bytes_inflight -= bufs[i].len; } @@ -1834,12 +1809,12 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { assert(stream->status != UDX_STREAM_CLOSED); - if (stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP && stream->write_queue.len == 0 && stream->retransmit_queue.len == 0) { + if (stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP && stream->wbuf_queue.len == 0 && stream->retransmit_queue.len == 0) { // if there's no data then we don't need to probe the window anyways. stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_ZWP; } - if (stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP && stream->write_queue.len == 0) { + if (stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP && stream->wbuf_queue.len == 0) { // rack 7.3 udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, stream->seq - 1); @@ -2380,6 +2355,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream udx__queue_init(&stream->inflight_queue); udx__queue_init(&stream->retransmit_queue); udx__queue_init(&stream->unordered_queue); + udx__queue_init(&stream->wbuf_queue); udx__queue_init(&stream->write_queue); stream->pkts_buffered = 0; @@ -2711,8 +2687,9 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf if (is_write_end && i == bufs_len - 1) { wbuf->is_write_end = true; } - udx__queue_tail(&stream->write_queue, &wbuf->queue); + udx__queue_tail(&stream->wbuf_queue, &wbuf->queue); } + udx__queue_tail(&stream->write_queue, &write->queue); // if an idle, zero window stream has data queued, send a zero-window probe immediately if (stream->writes_queued_bytes > 0 && stream->send_rwnd == 0) {