Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ struct udx_stream_s {
// congestion state
udx_cong_t cong;

udx_queue_t wbuf_queue;
udx_queue_t write_queue;

udx_cirbuf_t outgoing;
Expand Down Expand Up @@ -365,6 +366,7 @@ struct udx_stream_write_buf_s {

struct udx_stream_write_s {
size_t size;
udx_queue_node_t queue;
size_t bytes_acked;
bool is_write_end;

Expand Down
75 changes: 26 additions & 49 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ on_uv_close (uv_handle_t *handle) {

static bool
stream_has_data (udx_stream_t *stream) {
return stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0;
return stream->wbuf_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0;
}

static bool
Expand Down Expand Up @@ -416,46 +416,18 @@ wbufs_offset (udx_packet_t *pkt) {

static void
clear_outgoing_packets (udx_stream_t *stream) {
// todo: skip the math, and just
// 1. destroy all packets
// 2. destroy all wbufs
// 3. set write->bytes_acked = write->size and call ack(cancel) on all writes

// We should make sure all existing packets do not send, and notify the user that they failed
// We should make sure all existing packets do not send
for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) {
udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq);

if (pkt == NULL) continue;

assert(pkt->nbufs >= 2);

uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
udx_stream_write_buf_t **wbufs = wbufs_offset(pkt);

for (int i = 1; i < pkt->nbufs; i++) {
size_t pkt_len = bufs[i].len;
udx_stream_write_buf_t *wbuf = wbufs[i - 1];
on_bytes_acked(wbuf, pkt_len, true);

// todo: move into on_bytes_acked itself
udx_stream_write_t *write = wbuf->write;

if (write->bytes_acked == write->size && write->on_ack) {
write->on_ack(write, UV_ECANCELED, 0);
}
}

free(pkt);
}

// notify the user that their pending writes failed
while (stream->write_queue.len > 0) {
udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_buf_t, queue);
assert(wbuf != NULL);

on_bytes_acked(wbuf, wbuf->buf.len - wbuf->bytes_acked, true);
// todo: move into on_bytes_acked itself
udx_stream_write_t *write = wbuf->write;
if (write->bytes_acked == write->size && write->on_ack) {
udx_stream_write_t *write = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_t, queue);
if (write->on_ack) {
write->on_ack(write, UV_ECANCELED, 0);
}
}
Expand Down Expand Up @@ -1068,13 +1040,15 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {

udx_stream_write_t *write = wbuf->write;

if (write->bytes_acked == write->size && write->on_ack) {
write->on_ack(write, 0, sack);

// reentry from write->on_ack
if (stream->status & UDX_STREAM_DEAD) {
free(pkt);
return 2;
if (write->bytes_acked == write->size) {
udx__queue_unlink(&stream->write_queue, &write->queue);
if (write->on_ack) {
write->on_ack(write, 0, sack);
// reentry from write->on_ack
if (stream->status & UDX_STREAM_DEAD) {
free(pkt);
return 2;
}
}
}
}
Expand All @@ -1084,7 +1058,7 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {
// TODO: the end condition needs work here to be more "stateless"
// ie if the remote has acked all our writes, then instead of waiting for retransmits, we should
// clear those and mark as local ended NOW.
if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) {
if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->wbuf_queue.len == 0) {
stream->status |= UDX_STREAM_ENDED;
return 2;
}
Expand Down Expand Up @@ -1649,6 +1623,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {

if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
assert(stream->retransmit_queue.len == 0);
assert(stream->wbuf_queue.len == 0);
assert(stream->write_queue.len == 0);
close_stream(stream, 0);
return true;
Expand Down Expand Up @@ -1709,7 +1684,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
arm_stream_timers(stream, false);
}

while (stream->write_queue.len > 0 && (stream_may_send(stream) || stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP)) {
while (stream->wbuf_queue.len > 0 && (stream_may_send(stream) || stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP)) {

bool tlp = stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP;
bool zwp = stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP;
Expand All @@ -1731,8 +1706,8 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
int nwbufs = 0;
size_t size = 0;

while (capacity > 0 && nwbufs < UDX_MAX_COMBINED_WRITES && stream->write_queue.len > 0) {
udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_peek(&stream->write_queue), udx_stream_write_buf_t, queue);
while (capacity > 0 && nwbufs < UDX_MAX_COMBINED_WRITES && stream->wbuf_queue.len > 0) {
udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_peek(&stream->wbuf_queue), udx_stream_write_buf_t, queue);

uv_buf_t *buf = &wbuf->buf;

Expand All @@ -1759,7 +1734,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
if (wbuf->is_write_end) {
header_flag |= UDX_HEADER_END;
}
udx__queue_shift(&stream->write_queue);
udx__queue_shift(&stream->wbuf_queue);
}
}

Expand Down Expand Up @@ -1790,7 +1765,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
while (i--) {
udx_stream_write_buf_t *wbuf = wbufs[i];
if (wbuf->bytes_acked + wbuf->bytes_inflight == wbuf->buf.len) {
udx__queue_head(&stream->write_queue, &wbuf->queue);
udx__queue_head(&stream->wbuf_queue, &wbuf->queue);
}
wbuf->bytes_inflight -= bufs[i].len;
}
Expand Down Expand Up @@ -1834,12 +1809,12 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {

assert(stream->status != UDX_STREAM_CLOSED);

if (stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP && stream->write_queue.len == 0 && stream->retransmit_queue.len == 0) {
if (stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP && stream->wbuf_queue.len == 0 && stream->retransmit_queue.len == 0) {
// if there's no data then we don't need to probe the window anyways.
stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_ZWP;
}

if (stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP && stream->write_queue.len == 0) {
if (stream->write_wanted & UDX_STREAM_WRITE_WANT_TLP && stream->wbuf_queue.len == 0) {
// rack 7.3
udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_get(&stream->outgoing, stream->seq - 1);

Expand Down Expand Up @@ -2380,6 +2355,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
udx__queue_init(&stream->inflight_queue);
udx__queue_init(&stream->retransmit_queue);
udx__queue_init(&stream->unordered_queue);
udx__queue_init(&stream->wbuf_queue);
udx__queue_init(&stream->write_queue);

stream->pkts_buffered = 0;
Expand Down Expand Up @@ -2711,8 +2687,9 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf
if (is_write_end && i == bufs_len - 1) {
wbuf->is_write_end = true;
}
udx__queue_tail(&stream->write_queue, &wbuf->queue);
udx__queue_tail(&stream->wbuf_queue, &wbuf->queue);
}
udx__queue_tail(&stream->write_queue, &write->queue);

// if an idle, zero window stream has data queued, send a zero-window probe immediately
if (stream->writes_queued_bytes > 0 && stream->send_rwnd == 0) {
Expand Down