From adbcefcf17eef5e248c4d13f8eebfd6cba2ed912 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Mon, 3 Mar 2025 21:48:27 -0500 Subject: [PATCH] add TIME-WAIT state --- include/udx.h | 29 +++++--- src/udx.c | 93 +++++++++++++++++++------ test/stream-write-read-receive-window.c | 5 ++ test/stream-write-read.c | 9 ++- 4 files changed, 102 insertions(+), 34 deletions(-) diff --git a/include/udx.h b/include/udx.h index 088486e..d239299 100644 --- a/include/udx.h +++ b/include/udx.h @@ -32,15 +32,17 @@ extern "C" { #define UDX_SOCKET_BOUND 0b0010 #define UDX_SOCKET_CLOSED 0b0100 -#define UDX_STREAM_CONNECTED 0b000000001 -#define UDX_STREAM_RECEIVING 0b000000010 -#define UDX_STREAM_READING 0b000000100 -#define UDX_STREAM_ENDING 0b000001000 -#define UDX_STREAM_ENDING_REMOTE 0b000010000 -#define UDX_STREAM_ENDED 0b000100000 -#define UDX_STREAM_ENDED_REMOTE 0b001000000 -#define UDX_STREAM_DESTROYING 0b010000000 -#define UDX_STREAM_CLOSED 0b100000000 +#define UDX_STREAM_CONNECTED 0b00000000001 +#define UDX_STREAM_RECEIVING 0b00000000010 +#define UDX_STREAM_READING 0b00000000100 +#define UDX_STREAM_ENDING 0b00000001000 +#define UDX_STREAM_ENDING_REMOTE 0b00000010000 +#define UDX_STREAM_ENDED 0b00000100000 +#define UDX_STREAM_ENDED_REMOTE 0b00001000000 +#define UDX_STREAM_NEED_TIME_WAIT 0b00010000000 +#define UDX_STREAM_TIME_WAIT 0b00100000000 +#define UDX_STREAM_DESTROYING 0b01000000000 +#define UDX_STREAM_CLOSED 0b10000000000 #define UDX_HEADER_DATA 0b00001 #define UDX_HEADER_END 0b00010 @@ -258,6 +260,8 @@ struct udx_stream_s { uint32_t remote_acked; // tcp snd.una uint32_t remote_ended; + uint32_t timewait_timeout_ms; + uint32_t srtt; uint32_t rttvar; uint32_t rto; @@ -279,6 +283,7 @@ struct udx_stream_s { // optimize: use one timer and a action (RTO, RACK_REO, TLP) variable int nrefs; + uv_timer_t rto_timer; uv_timer_t rack_reo_timer; uv_timer_t tlp_timer; @@ -502,6 +507,12 @@ udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack); int udx_stream_set_ack (udx_stream_t *stream, uint32_t ack); +int +udx_stream_set_timewait_timeout_ms (udx_stream_t *stream, uint32_t timewait_timeout_ms); + +int +udx_stream_get_timewait_timeout_ms (udx_stream_t *stream, uint32_t *timewait_timeout_ms); + int udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *rwnd_max); diff --git a/src/udx.c b/src/udx.c index 5c95f76..0c48be3 100644 --- a/src/udx.c +++ b/src/udx.c @@ -23,12 +23,6 @@ #define UDX_STREAM_SHOULD_READ (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD) #define UDX_STREAM_READ 0 -#define UDX_STREAM_SHOULD_END (UDX_STREAM_ENDING | UDX_STREAM_ENDED | UDX_STREAM_DEAD) -#define UDX_STREAM_END UDX_STREAM_ENDING - -#define UDX_STREAM_SHOULD_END_REMOTE (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD | UDX_STREAM_ENDING_REMOTE) -#define UDX_STREAM_END_REMOTE UDX_STREAM_ENDING_REMOTE - #define UDX_HEADER_DATA_OR_END (UDX_HEADER_DATA | UDX_HEADER_END) #define UDX_DEFAULT_TTL 64 @@ -46,7 +40,8 @@ #define UDX_CONG_MAX_CWND 65536 #define UDX_RTO_MAX_MS 30000 #define UDX_RTT_MAX_MS 30000 -#define UDX_DEFAULT_RWND_MAX (4 * 1024 * 1024) // arbitrary, ~175 1500 mtu packets, @20ms latency = 416 mbits/sec +#define UDX_TIME_WAIT_MS 30000 // 30 seconds +#define UDX_DEFAULT_RWND_MAX (4 * 1024 * 1024) // 4mb. 139 mbit/s at 240ms latency #define UDX_HIGH_WATERMARK 262144 @@ -599,8 +594,6 @@ close_stream (udx_stream_t *stream, int err) { stream->status |= UDX_STREAM_CLOSED; stream->status &= ~UDX_STREAM_CONNECTED; - debug_printf("closing stream local_id=%u \n", stream->local_id); - udx_t *udx = stream->udx; udx_socket_t *socket = stream->socket; @@ -840,6 +833,25 @@ udx_zwp_timeout (uv_timer_t *timer) { update_poll(stream->socket); } +static void +udx_timewait_timeout (uv_timer_t *timer) { + udx_stream_t *stream = timer->data; + udx_socket_t *socket = stream->socket; + + close_stream(stream, 0); + + update_poll(socket); +} + +static void +time_wait_stream (udx_stream_t *stream) { + stream->status |= UDX_STREAM_TIME_WAIT; + uv_timer_stop(&stream->zwp_timer); + uv_timer_stop(&stream->tlp_timer); + uv_timer_stop(&stream->rack_reo_timer); + uv_timer_start(&stream->rto_timer, udx_timewait_timeout, stream->timewait_timeout_ms, 0); +} + static void udx_rto_timeout (uv_timer_t *timer) { udx_stream_t *stream = timer->data; @@ -1081,11 +1093,14 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { free(pkt); - // TODO: the end condition needs work here to be more "stateless" - // ie if the remote has acked all our writes, then instead of waiting for retransmits, we should - // clear those and mark as local ended NOW. - if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) { + const int UDX_STREAM_SHOULD_END = UDX_STREAM_ENDING | UDX_STREAM_ENDED | UDX_STREAM_DEAD; + + if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_ENDING && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) { stream->status |= UDX_STREAM_ENDED; + // received the final ack, and we are the passive side (no TIME_WAIT) + if ((stream->status & (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_NEED_TIME_WAIT)) == UDX_STREAM_ENDED_REMOTE) { + close_stream(stream, 0); + } return 2; } @@ -1399,24 +1414,28 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd } if (a == 0 || a == 1) continue; - if (a == 2) { // it ended, so ack that and trigger close - // TODO: make this work as well, if the ack packet is lost, ie - // have some internal (capped) queue of "gracefully closed" streams (TIME_WAIT) + if (a == 2) { if (stream->status & UDX_STREAM_DEAD) { return 1; } - if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) { - close_stream(stream, 0); - return 1; - } - if (stream->remote_acked == stream->seq) { uv_timer_stop(&stream->rto_timer); uv_timer_stop(&stream->tlp_timer); } + if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) { + if (stream->status & UDX_STREAM_NEED_TIME_WAIT) { + // CLOSING -> TIME_WAIT + time_wait_stream(stream); + return 1; + } else { + close_stream(stream, 0); + return 1; + } + } + // send a final state packet to make sure we've acked the end packet stream->write_wanted |= UDX_STREAM_WRITE_WANT_STATE; update_poll(stream->socket); @@ -1640,8 +1659,10 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { return false; } + const int UDX_STREAM_SHOULD_END_REMOTE = UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD | UDX_STREAM_ENDING_REMOTE; + // if this ACK packet acks the remote's END packet, advance from ENDING_REMOTE -> ENDED_REMOTE - if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_END_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) { + if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_ENDING_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) { stream->status |= UDX_STREAM_ENDED_REMOTE; if (stream->on_read != NULL) { uv_buf_t b = uv_buf_init(NULL, 0); @@ -1655,9 +1676,16 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_STATE; if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) { + assert(stream->retransmit_queue.len == 0); assert(stream->write_queue.len == 0); - close_stream(stream, 0); + + if (stream->status & UDX_STREAM_NEED_TIME_WAIT) { + // FIN_WAIT -> TIME_WAIT + time_wait_stream(stream); + } else { + close_stream(stream, 0); + } return true; } } @@ -2344,6 +2372,8 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream stream->send_wl2 = 0; stream->remote_acked = 0; + stream->timewait_timeout_ms = UDX_TIME_WAIT_MS; + stream->srtt = 0; stream->rttvar = 0; stream->rto = 1000; @@ -2455,6 +2485,18 @@ udx_stream_set_ack (udx_stream_t *stream, uint32_t ack) { return 0; } +int +udx_stream_get_timewait_timeout_ms (udx_stream_t *stream, uint32_t *timeout_ms) { + *timeout_ms = stream->timewait_timeout_ms; + return 0; +} + +int +udx_stream_set_timewait_timeout_ms (udx_stream_t *stream, uint32_t timeout_ms) { + stream->timewait_timeout_ms = timeout_ms; + return 0; +} + int udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *size) { *size = stream->recv_rwnd_max; @@ -2766,6 +2808,11 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu stream->status |= UDX_STREAM_ENDING; + // only the 'active' closer must enter TIME_WAIT + if ((stream->status & UDX_STREAM_ENDED_REMOTE) == 0) { + stream->status |= UDX_STREAM_NEED_TIME_WAIT; + } + if (bufs_len > 0) { req->nwbufs = bufs_len; _udx_stream_write(req, stream, bufs, bufs_len, ack_cb, true); diff --git a/test/stream-write-read-receive-window.c b/test/stream-write-read-receive-window.c index af4425f..c4e23a9 100644 --- a/test/stream-write-read-receive-window.c +++ b/test/stream-write-read-receive-window.c @@ -55,6 +55,7 @@ on_socket_close (udx_socket_t *s) { void on_finalize (udx_stream_t *stream) { nfinalize++; + if (nfinalize == 2) { udx_socket_close(&send_sock); udx_socket_close(&recv_sock); @@ -129,9 +130,13 @@ main () { e = udx_stream_init(&udx, &recv_stream, 1, on_close, on_finalize); assert(e == 0); + assert(udx_stream_set_timewait_timeout_ms(&recv_stream, 0) == 0); + e = udx_stream_init(&udx, &send_stream, 2, on_close, on_finalize); assert(e == 0); + assert(udx_stream_set_timewait_timeout_ms(&send_stream, 0) == 0); + recv_stream.get_read_buffer_size = &pretend_buffer_is_full; send_stream.send_rwnd = 0; assert(recv_stream.rto == 1000); diff --git a/test/stream-write-read.c b/test/stream-write-read.c index 27ac2de..f04f629 100644 --- a/test/stream-write-read.c +++ b/test/stream-write-read.c @@ -21,6 +21,7 @@ bool read_called = false; bool eof_received = false; int nclosed; +int nfinalized; void on_close (udx_stream_t *s, int status) { @@ -29,8 +30,8 @@ on_close (udx_stream_t *s, int status) { nclosed++; if (nclosed == 2) { - udx_socket_close(&asock); - udx_socket_close(&bsock); + assert(0 == udx_socket_close(&asock)); + assert(0 == udx_socket_close(&bsock)); } } @@ -90,6 +91,10 @@ main () { e = udx_stream_init(&udx, &bstream, 2, on_close, NULL); assert(e == 0); + assert(udx_stream_set_timewait_timeout_ms(&astream, 0) == 0); + + assert(udx_stream_set_timewait_timeout_ms(&bstream, 0) == 0); + e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr); assert(e == 0);