Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ extern "C" {
#define UDX_SOCKET_BOUND 0b0010
#define UDX_SOCKET_CLOSED 0b0100

#define UDX_STREAM_CONNECTED 0b000000001
#define UDX_STREAM_RECEIVING 0b000000010
#define UDX_STREAM_READING 0b000000100
#define UDX_STREAM_ENDING 0b000001000
#define UDX_STREAM_ENDING_REMOTE 0b000010000
#define UDX_STREAM_ENDED 0b000100000
#define UDX_STREAM_ENDED_REMOTE 0b001000000
#define UDX_STREAM_DESTROYING 0b010000000
#define UDX_STREAM_CLOSED 0b100000000
// Stream status bits. Each constant occupies a distinct bit and is OR-ed into
// (or masked out of) udx_stream_t.status.
#define UDX_STREAM_CONNECTED 0b00000000001
#define UDX_STREAM_RECEIVING 0b00000000010
#define UDX_STREAM_READING 0b00000000100
#define UDX_STREAM_ENDING 0b00000001000
#define UDX_STREAM_ENDING_REMOTE 0b00000010000
#define UDX_STREAM_ENDED 0b00000100000
#define UDX_STREAM_ENDED_REMOTE 0b00001000000
// Set by udx_stream_write_end() when the local side begins ending while
// UDX_STREAM_ENDED_REMOTE is not yet set, i.e. this side is the 'active' closer.
#define UDX_STREAM_NEED_TIME_WAIT 0b00010000000
// Set once the stream has entered TIME_WAIT (see time_wait_stream() in src/udx.c).
#define UDX_STREAM_TIME_WAIT 0b00100000000
#define UDX_STREAM_DESTROYING 0b01000000000
#define UDX_STREAM_CLOSED 0b10000000000

#define UDX_HEADER_DATA 0b00001
#define UDX_HEADER_END 0b00010
Expand Down Expand Up @@ -258,6 +260,8 @@ struct udx_stream_s {
uint32_t remote_acked; // tcp snd.una
uint32_t remote_ended;

uint32_t timewait_timeout_ms;

uint32_t srtt;
uint32_t rttvar;
uint32_t rto;
Expand All @@ -279,6 +283,7 @@ struct udx_stream_s {

// optimize: use one timer and a action (RTO, RACK_REO, TLP) variable
int nrefs;

uv_timer_t rto_timer;
uv_timer_t rack_reo_timer;
uv_timer_t tlp_timer;
Expand Down Expand Up @@ -502,6 +507,12 @@ udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack);
int
udx_stream_set_ack (udx_stream_t *stream, uint32_t ack);

int
udx_stream_set_timewait_timeout_ms (udx_stream_t *stream, uint32_t timewait_timeout_ms);

int
udx_stream_get_timewait_timeout_ms (udx_stream_t *stream, uint32_t *timewait_timeout_ms);

int
udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *rwnd_max);

Expand Down
93 changes: 70 additions & 23 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@
#define UDX_STREAM_SHOULD_READ (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD)
#define UDX_STREAM_READ 0

#define UDX_STREAM_SHOULD_END (UDX_STREAM_ENDING | UDX_STREAM_ENDED | UDX_STREAM_DEAD)
#define UDX_STREAM_END UDX_STREAM_ENDING

#define UDX_STREAM_SHOULD_END_REMOTE (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD | UDX_STREAM_ENDING_REMOTE)
#define UDX_STREAM_END_REMOTE UDX_STREAM_ENDING_REMOTE

#define UDX_HEADER_DATA_OR_END (UDX_HEADER_DATA | UDX_HEADER_END)

#define UDX_DEFAULT_TTL 64
Expand All @@ -46,7 +40,8 @@
#define UDX_CONG_MAX_CWND 65536
#define UDX_RTO_MAX_MS 30000
#define UDX_RTT_MAX_MS 30000
#define UDX_DEFAULT_RWND_MAX (4 * 1024 * 1024) // arbitrary, ~175 1500 mtu packets, @20ms latency = 416 mbits/sec
#define UDX_TIME_WAIT_MS 30000 // 30 seconds
#define UDX_DEFAULT_RWND_MAX (4 * 1024 * 1024) // 4mb. 139 mbit/s at 240ms latency

#define UDX_HIGH_WATERMARK 262144

Expand Down Expand Up @@ -599,8 +594,6 @@ close_stream (udx_stream_t *stream, int err) {
stream->status |= UDX_STREAM_CLOSED;
stream->status &= ~UDX_STREAM_CONNECTED;

debug_printf("closing stream local_id=%u \n", stream->local_id);

udx_t *udx = stream->udx;
udx_socket_t *socket = stream->socket;

Expand Down Expand Up @@ -840,6 +833,25 @@ udx_zwp_timeout (uv_timer_t *timer) {
update_poll(stream->socket);
}

// Timer callback for the end of a stream's TIME_WAIT period.
// timer->data is expected to hold the udx_stream_t that armed the timer.
// Closes that stream with status 0 and then refreshes polling on its socket.
static void
udx_timewait_timeout (uv_timer_t *timer) {
  udx_stream_t *stream = timer->data;

  // Read the socket pointer up front so the poll update below does not need
  // to touch the stream again after close_stream() has run.
  udx_socket_t *owner = stream->socket;

  close_stream(stream, 0);
  update_poll(owner);
}

// Puts a stream into TIME_WAIT.
// Marks the state in stream->status, stops the zwp, tlp and rack_reo timers,
// and re-purposes rto_timer to fire udx_timewait_timeout once after
// stream->timewait_timeout_ms milliseconds.
static void
time_wait_stream (udx_stream_t *stream) {
  stream->status |= UDX_STREAM_TIME_WAIT;

  // None of these timers are wanted while the stream merely lingers.
  uv_timer_stop(&stream->zwp_timer);
  uv_timer_stop(&stream->tlp_timer);
  uv_timer_stop(&stream->rack_reo_timer);

  // A repeat value of 0 makes this a single-shot timer.
  const uint64_t linger_ms = stream->timewait_timeout_ms;
  uv_timer_start(&stream->rto_timer, udx_timewait_timeout, linger_ms, 0);
}

static void
udx_rto_timeout (uv_timer_t *timer) {
udx_stream_t *stream = timer->data;
Expand Down Expand Up @@ -1081,11 +1093,14 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {

free(pkt);

// TODO: the end condition needs work here to be more "stateless"
// ie if the remote has acked all our writes, then instead of waiting for retransmits, we should
// clear those and mark as local ended NOW.
if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) {
const int UDX_STREAM_SHOULD_END = UDX_STREAM_ENDING | UDX_STREAM_ENDED | UDX_STREAM_DEAD;

if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_ENDING && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) {
stream->status |= UDX_STREAM_ENDED;
// received the final ack, and we are the passive side (no TIME_WAIT)
if ((stream->status & (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_NEED_TIME_WAIT)) == UDX_STREAM_ENDED_REMOTE) {
close_stream(stream, 0);
}
return 2;
}

Expand Down Expand Up @@ -1399,24 +1414,28 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
}

if (a == 0 || a == 1) continue;
if (a == 2) { // it ended, so ack that and trigger close
// TODO: make this work as well, if the ack packet is lost, ie
// have some internal (capped) queue of "gracefully closed" streams (TIME_WAIT)
if (a == 2) {

if (stream->status & UDX_STREAM_DEAD) {
return 1;
}

if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
close_stream(stream, 0);
return 1;
}

if (stream->remote_acked == stream->seq) {
uv_timer_stop(&stream->rto_timer);
uv_timer_stop(&stream->tlp_timer);
}

if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
if (stream->status & UDX_STREAM_NEED_TIME_WAIT) {
// CLOSING -> TIME_WAIT
time_wait_stream(stream);
return 1;
} else {
close_stream(stream, 0);
return 1;
}
}

// send a final state packet to make sure we've acked the end packet
stream->write_wanted |= UDX_STREAM_WRITE_WANT_STATE;
update_poll(stream->socket);
Expand Down Expand Up @@ -1640,8 +1659,10 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
return false;
}

const int UDX_STREAM_SHOULD_END_REMOTE = UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD | UDX_STREAM_ENDING_REMOTE;

// if this ACK packet acks the remote's END packet, advance from ENDING_REMOTE -> ENDED_REMOTE
if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_END_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) {
if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_ENDING_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) {
stream->status |= UDX_STREAM_ENDED_REMOTE;
if (stream->on_read != NULL) {
uv_buf_t b = uv_buf_init(NULL, 0);
Expand All @@ -1655,9 +1676,16 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_STATE;

if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {

assert(stream->retransmit_queue.len == 0);
assert(stream->write_queue.len == 0);
close_stream(stream, 0);

if (stream->status & UDX_STREAM_NEED_TIME_WAIT) {
// FIN_WAIT -> TIME_WAIT
time_wait_stream(stream);
} else {
close_stream(stream, 0);
}
return true;
}
}
Expand Down Expand Up @@ -2344,6 +2372,8 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
stream->send_wl2 = 0;
stream->remote_acked = 0;

stream->timewait_timeout_ms = UDX_TIME_WAIT_MS;

stream->srtt = 0;
stream->rttvar = 0;
stream->rto = 1000;
Expand Down Expand Up @@ -2455,6 +2485,18 @@ udx_stream_set_ack (udx_stream_t *stream, uint32_t ack) {
return 0;
}

// Copies the stream's configured TIME_WAIT duration, in milliseconds, into
// *timeout_ms. Neither pointer is checked for NULL. Always returns 0.
int
udx_stream_get_timewait_timeout_ms (udx_stream_t *stream, uint32_t *timeout_ms) {
  const uint32_t configured = stream->timewait_timeout_ms;

  *timeout_ms = configured;
  return 0;
}

// Stores timeout_ms (milliseconds) as the stream's TIME_WAIT duration. The
// value is read when the stream enters TIME_WAIT, so it only affects timers
// armed after this call. stream is not checked for NULL. Always returns 0.
int
udx_stream_set_timewait_timeout_ms (udx_stream_t *stream, uint32_t timeout_ms) {
  stream->timewait_timeout_ms = timeout_ms;

  return 0;
}

int
udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *size) {
*size = stream->recv_rwnd_max;
Expand Down Expand Up @@ -2766,6 +2808,11 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu

stream->status |= UDX_STREAM_ENDING;

// only the 'active' closer must enter TIME_WAIT
if ((stream->status & UDX_STREAM_ENDED_REMOTE) == 0) {
stream->status |= UDX_STREAM_NEED_TIME_WAIT;
}

if (bufs_len > 0) {
req->nwbufs = bufs_len;
_udx_stream_write(req, stream, bufs, bufs_len, ack_cb, true);
Expand Down
5 changes: 5 additions & 0 deletions test/stream-write-read-receive-window.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ on_socket_close (udx_socket_t *s) {
void
on_finalize (udx_stream_t *stream) {
nfinalize++;

if (nfinalize == 2) {
udx_socket_close(&send_sock);
udx_socket_close(&recv_sock);
Expand Down Expand Up @@ -129,9 +130,13 @@ main () {
e = udx_stream_init(&udx, &recv_stream, 1, on_close, on_finalize);
assert(e == 0);

assert(udx_stream_set_timewait_timeout_ms(&recv_stream, 0) == 0);

e = udx_stream_init(&udx, &send_stream, 2, on_close, on_finalize);
assert(e == 0);

assert(udx_stream_set_timewait_timeout_ms(&send_stream, 0) == 0);

recv_stream.get_read_buffer_size = &pretend_buffer_is_full;
send_stream.send_rwnd = 0;
assert(recv_stream.rto == 1000);
Expand Down
9 changes: 7 additions & 2 deletions test/stream-write-read.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bool read_called = false;
bool eof_received = false;

int nclosed;
int nfinalized;

void
on_close (udx_stream_t *s, int status) {
Expand All @@ -29,8 +30,8 @@ on_close (udx_stream_t *s, int status) {
nclosed++;

if (nclosed == 2) {
udx_socket_close(&asock);
udx_socket_close(&bsock);
assert(0 == udx_socket_close(&asock));
assert(0 == udx_socket_close(&bsock));
}
}

Expand Down Expand Up @@ -90,6 +91,10 @@ main () {
e = udx_stream_init(&udx, &bstream, 2, on_close, NULL);
assert(e == 0);

assert(udx_stream_set_timewait_timeout_ms(&astream, 0) == 0);

assert(udx_stream_set_timewait_timeout_ms(&bstream, 0) == 0);

e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr);
assert(e == 0);

Expand Down