Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ set_target_properties(
)

if(UNIX)
target_compile_options(udx PRIVATE -Wall -Wextra)
target_compile_options(udx PRIVATE -Wall -Werror -Wextra)
endif()

if(WIN32)
Expand All @@ -42,6 +42,8 @@ target_sources(
src/io.h
src/udx.c
src/udx_rate.c
src/udx_sack_tree.c
src/udx_sack_tree.h
src/win_filter.h
src/win_filter.c
src/win_filter_f64.h
Expand Down
28 changes: 24 additions & 4 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,29 @@ struct udx_socket_s {
#define UDX_CA_RECOVERY 2
#define UDX_CA_LOSS 3

typedef struct udx_sack_block_s udx_sack_block_t;

// One node of a stream's SACK tree (udx_sack_tree_t). A block describes a
// contiguous run of sequence numbers that arrived out of order, together
// with the payload bytes buffered for that run until the stream's ack
// catches up to `start`.
struct udx_sack_block_s {
uint32_t start; // first sequence number covered by this block
uint32_t end; // one past the last covered sequence number (range is [start, end))

// tree links
udx_sack_block_t *left;
udx_sack_block_t *right;
udx_sack_block_t *parent;

uint8_t color; // node color for tree balancing (presumably red/black - TODO confirm in udx_sack_tree.c)

size_t len; // nbytes of actual data
size_t nalloc; // nbytes we have room for
// Payload buffer holding `len` valid bytes out of `nalloc`.
// NOTE(review): this was documented as pointing "to just after this struct",
// but udx.c malloc()s, realloc()s and free()s it as a separate allocation -
// confirm which ownership model is intended.
uint8_t *data;
};

// Ordered tree of udx_sack_block_t nodes holding a stream's out-of-order data.
// The tree is empty when `root == sentinel`.
typedef struct {
udx_sack_block_t *root; // top of the tree, equal to `sentinel` when the tree is empty
udx_sack_block_t *sentinel; // sentinel simplifies code over using NULL for empty leaf nodes
udx_sack_block_t _sentinel; // presumably the storage `sentinel` points at - TODO confirm in udx_sack_tree_init
} udx_sack_tree_t;

struct udx_stream_s {
uint32_t local_id; // must be first entry, so its compat with the cirbuf
uint32_t remote_id;
Expand All @@ -224,7 +247,6 @@ struct udx_stream_s {
udx_stream_t *next;

int status;
int out_of_order;

uint8_t ca_state;
uint32_t high_seq; // seq at time of congestion, marks end of recovery
Expand Down Expand Up @@ -348,8 +370,6 @@ struct udx_stream_s {

uint32_t pacing_bytes_per_ms; // computed by bbr module. 'BBR.pacing_rate' in IETF draft

uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?

// pacing (tb = token bucket)
uint32_t tb_available;
uint64_t tb_last_refill_ms;
Expand Down Expand Up @@ -382,7 +402,7 @@ struct udx_stream_s {
udx_queue_t write_queue;

udx_cirbuf_t outgoing;
udx_cirbuf_t incoming;
udx_sack_tree_t sack_tree;

udx_queue_t retransmit_queue; // udx_packet_t
udx_queue_t inflight_queue; // udx_packet_t
Expand Down
153 changes: 76 additions & 77 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "io.h"
#include "link.h"
#include "queue.h"
#include "udx_sack_tree.h"
#include "win_filter.h"

#define UDX_STREAM_ALL_ENDED (UDX_STREAM_ENDED | UDX_STREAM_ENDED_REMOTE)
Expand Down Expand Up @@ -56,14 +57,6 @@
static void
arm_stream_timers (udx_stream_t *stream, bool sent_tlp);

typedef struct {
uint32_t seq; // must be the first entry, so its compat with the cirbuf

int type;

uv_buf_t buf;
} udx_pending_read_t;

static uint32_t
seq_max (uint32_t a, uint32_t b) {
return seq_compare(a, b) < 0 ? b : a;
Expand Down Expand Up @@ -151,15 +144,12 @@ update_pacing_time (udx_stream_t *stream);

static void
clear_incoming_packets (udx_stream_t *stream) {
uint32_t seq = stream->ack;
udx_cirbuf_t *inc = &(stream->incoming);

while (stream->pkts_buffered) {
udx_pending_read_t *pkt = (udx_pending_read_t *) udx__cirbuf_remove(inc, seq++);
if (pkt == NULL) continue;

stream->pkts_buffered--;
free(pkt);
while (stream->sack_tree.root != stream->sack_tree.sentinel) {
udx_sack_block_t *root = stream->sack_tree.root;
udx_sack_tree_remove(&stream->sack_tree, root);
free(root->data);
free(root);
}
}

Expand Down Expand Up @@ -422,7 +412,6 @@ close_stream (udx_stream_t *stream, int err) {
}

udx__cirbuf_destroy(&stream->relaying_streams);
udx__cirbuf_destroy(&stream->incoming);
udx__cirbuf_destroy(&stream->outgoing);

uv_timer_stop(&stream->rto_timer);
Expand Down Expand Up @@ -572,38 +561,19 @@ send_ack (udx_stream_t *stream) {
} sacks[UDX_MAX_SACKS];
} pkt;

bool in_sack_block = false;
int ooo = stream->out_of_order;
udx_sack_block_t *p = udx_sack_tree_min(&stream->sack_tree);

int nsacks = 0;
uint32_t start = 0;
uint32_t end = 0;
// todo: highest sequence should end at rack.fack, use that as maximum?
for (int i = 0; i < 65536 && ooo > 0 && nsacks < UDX_MAX_SACKS; i++) {
uint32_t seq = stream->ack + 1 + i;
bool received = udx__cirbuf_get(&stream->incoming, seq) != NULL;
if (received) {
if (!in_sack_block) {
in_sack_block = true;
start = seq;
end = seq + 1;
} else {
end = seq + 1;
}
while (p != NULL && nsacks < UDX_MAX_SACKS) {
if (nsacks > 0 && pkt.sacks[nsacks - 1].end == p->start) {
// merge adjacent ooo blocks
pkt.sacks[nsacks - 1].end = udx__swap_uint32_if_be(p->end);
} else {
ooo--;
if (in_sack_block) {
in_sack_block = false;
pkt.sacks[nsacks].start = udx__swap_uint32_if_be(start);
pkt.sacks[nsacks].end = udx__swap_uint32_if_be(end);
nsacks++;
}
pkt.sacks[nsacks].start = udx__swap_uint32_if_be(p->start);
pkt.sacks[nsacks].end = udx__swap_uint32_if_be(p->end);
nsacks++;
}
}

if (in_sack_block && nsacks < UDX_MAX_SACKS) {
pkt.sacks[nsacks].start = udx__swap_uint32_if_be(start);
pkt.sacks[nsacks].end = udx__swap_uint32_if_be(end);
nsacks++;
p = udx_sack_tree_next(&stream->sack_tree, p);
}

// debug_printf("sending ack ack=%u nsasks=%d\n", stream->ack, nsacks);
Expand Down Expand Up @@ -1332,6 +1302,19 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack, udx_rate_sample_t *rs)
return 1;
}

/*
 * Round `v` up to the nearest power of two.
 *
 * Returns `v` itself when it is already a power of two, and 1 when `v` is 0
 * (zero is not a power of two, and a zero result would be an invalid buffer
 * capacity). Values above 2^63 have no representable answer in 64 bits and
 * wrap to 0.
 */
static uint64_t
next_power_of_two (uint64_t v) {
  // Without this guard the decrement below wraps to UINT64_MAX and the
  // function would return 0.
  if (v == 0) return 1;

  // Subtract one so exact powers of two map to themselves, then smear the
  // highest set bit into every lower position and add one back.
  v--;
  v |= v >> 1;
  v |= v >> 2;
  v |= v >> 4;
  v |= v >> 8;
  v |= v >> 16;
  v |= v >> 32;
  v++;
  return v;
}

static void
process_data_packet (udx_stream_t *stream, int type, uint32_t seq, char *data, ssize_t data_len) {
if (seq == stream->ack && type & UDX_HEADER_DATA) {
Expand All @@ -1345,24 +1328,34 @@ process_data_packet (udx_stream_t *stream, int type, uint32_t seq, char *data, s
return;
}

stream->out_of_order++;

// Slow path, packet out of order.
// Copy over incoming buffer as we do not own it (stack allocated upstream)
char *ptr = malloc(sizeof(udx_pending_read_t) + data_len);

udx_pending_read_t *pkt = (udx_pending_read_t *) ptr;
char *cpy = ptr + sizeof(udx_pending_read_t);

memcpy(cpy, data, data_len);

pkt->type = type;
pkt->seq = seq;
pkt->buf.base = cpy;
pkt->buf.len = data_len;
// store data in a sack block

udx_sack_block_t *block = udx_sack_tree_find(&stream->sack_tree, seq);

if (block == NULL) {
block = calloc(1, sizeof(udx_sack_block_t) + data_len); // todo: leave room for more data?
block->nalloc = data_len;
assert(block != NULL);
block->start = seq;
block->end = seq;
block->len = 0;
block->data = malloc(data_len);
udx_sack_tree_insert(&stream->sack_tree, block);
}

if (block->end == seq) {
block->end++;
if (block->len + data_len > block->nalloc) {
block->nalloc = next_power_of_two(block->len + data_len);
block->data = realloc(block->data, block->nalloc);
}
}

stream->pkts_buffered++;
udx__cirbuf_set(&(stream->incoming), (udx_cirbuf_val_t *) pkt);
memcpy(block->data + block->len, data, data_len);
block->len += data_len;
assert(block->len <= block->nalloc);
}

static int
Expand Down Expand Up @@ -1517,11 +1510,11 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
buf_len -= data_offset;
}

udx_cirbuf_t *inc = &(stream->incoming);

// For all stream packets, ensure that they are causally newer (or same)
if (seq_compare(stream->ack, seq) <= 0) {
if (type & UDX_HEADER_DATA_OR_END && udx__cirbuf_get(inc, seq) == NULL && (stream->status & UDX_STREAM_SHOULD_READ) == UDX_STREAM_READ) {
udx_sack_block_t *sack = udx_sack_tree_find(&stream->sack_tree, seq);
bool sacked = sack && sack->end != seq;
if (type & UDX_HEADER_DATA_OR_END && !sacked && (stream->status & UDX_STREAM_SHOULD_READ) == UDX_STREAM_READ) {
process_data_packet(stream, type, seq, buf, buf_len);
if (stream->status & UDX_STREAM_DEAD) {
return 1; // re-entry on read callback
Expand Down Expand Up @@ -1549,24 +1542,30 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
}
}

// process the (out of order) read queue
while ((stream->status & UDX_STREAM_SHOULD_READ) == UDX_STREAM_READ) {
udx_pending_read_t *pkt = (udx_pending_read_t *) udx__cirbuf_remove(inc, stream->ack);
if (pkt == NULL) break;

stream->out_of_order--;
stream->pkts_buffered--;
stream->ack++;
udx_sack_block_t *p = udx_sack_tree_min(&stream->sack_tree);

if ((pkt->type & UDX_HEADER_DATA) && stream->on_read != NULL) {
stream->on_read(stream, pkt->buf.len, &(pkt->buf));
// process the (out of order) read queue
while ((stream->status & UDX_STREAM_SHOULD_READ) == UDX_STREAM_READ && p && p->start == stream->ack) {
udx_sack_block_t *block = p;
p = udx_sack_tree_next(&stream->sack_tree, p);
udx_sack_tree_remove(&stream->sack_tree, block);

stream->ack += seq_diff(block->end, block->start);
assert(stream->ack == block->end);

// block could have 0 length if it was a sack of a pure END packet
if (block->len && stream->on_read != NULL) {
uv_buf_t b = uv_buf_init((char *) block->data, block->len);
stream->on_read(stream, block->len, &b);
if (stream->status & UDX_STREAM_DEAD) {
free(pkt);
free(block->data);
free(block);
return 1;
}
}

free(pkt);
free(block->data);
free(block);
}

// Check if the ack is oob.
Expand Down Expand Up @@ -2255,7 +2254,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream

// Init stream write/read buffers
udx__cirbuf_init(&(stream->outgoing), 16);
udx__cirbuf_init(&(stream->incoming), 16);
udx_sack_tree_init(&stream->sack_tree);
udx__queue_init(&stream->inflight_queue);
udx__queue_init(&stream->retransmit_queue);

Expand Down
2 changes: 1 addition & 1 deletion src/udx_rate.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ udx__rate_gen (udx_stream_t *stream, uint32_t delivered, uint32_t lost, udx_rate

if (rs->interval_ms < udx_rtt_min(stream)) {
if (!rs->is_retrans) {
debug_printf("rs->interval_ms=%ld, rs->delivered=%d, stream->ca_state=%s, stream->min_rtt=%u\n", rs->interval_ms, rs->delivered, ca_state_string[stream->ca_state], udx_rtt_min(stream));
debug_printf("rs->interval_ms=%" PRId64 ", rs->delivered=%d, stream->ca_state=%s, stream->min_rtt=%u\n", rs->interval_ms, rs->delivered, ca_state_string[stream->ca_state], udx_rtt_min(stream));
}
rs->interval_ms = -1;
return;
Expand Down
Loading
Loading