Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/udxperf.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ print_interval (udxperf_client_t *client, uint64_t bytes, uint64_t start, uint64

printf("[%3d] %6.4f-%6.4f sec %s %s/sec", stream->local_id, (start - client->start_time) / 1000.0, (end - client->start_time) / 1000.0, bytes_buf, bps_buf);
if (is_client && extra_wanted) {
printf(" cwnd=%d ssthresh=%d pacing_rate=%u fast_recovery_count=%d rto_count=%d rtx_count=%d", stream->cwnd, stream->pacing_bytes_per_ms, stream->ssthresh, stream->fast_recovery_count, stream->rto_count, stream->retransmit_count);
printf(" cwnd=%d ssthresh=%d pacing_rate_pkts_per_ms=%f fast_recovery_count=%d rto_count=%d rtx_count=%d", stream->cwnd, stream->ssthresh, stream->pacing_packet_per_ms, stream->fast_recovery_count, stream->rto_count, stream->retransmit_count);
}
printf("\n");
}
Expand Down
10 changes: 4 additions & 6 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,12 @@ struct udx_stream_s {

} bbr;

uint32_t pacing_bytes_per_ms; // computed by bbr module. 'BBR.pacing_rate' in IETF draft
uint64_t next_send_ts; // time to send next packet, may be now, in the future, or in the past.
double next_send_ts_fraction; // fractions of a millisecond, in range [0.0-1.0)
double pacing_packet_per_ms; // pacing rate in packets per millisecond, set by BBR

uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?

// pacing (tb = token bucket)
uint32_t tb_available;
uint64_t tb_last_refill_ms;

// tlp
bool tlp_is_retrans; // the probe in-flight was a retransmission
bool tlp_in_flight; // if set, tlp_end_seq indicates the seqno
Expand All @@ -345,7 +343,7 @@ struct udx_stream_s {
uv_timer_t rack_reo_timer;
uv_timer_t tlp_timer;
uv_timer_t zwp_timer;
uv_timer_t refill_pacing_timer;
uv_timer_t pacing_timer;

size_t inflight;

Expand Down
87 changes: 44 additions & 43 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#define UDX_HEADER_DATA_OR_END (UDX_HEADER_DATA | UDX_HEADER_END)

#define UDX_DEFAULT_TTL 64
#define UDX_INIT_PACING_RATE 25000 // 25MB/s, 200mbit. updated by bbr_init
#define UDX_DEFAULT_SNDBUF_SIZE 212992

#define UDX_MAX_RTO_TIMEOUTS 6
Expand Down Expand Up @@ -150,8 +149,6 @@ stream_has_data (udx_stream_t *stream) {
return stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0;
}

static void
update_pacing_time (udx_stream_t *stream);
static bool
stream_write_wanted (udx_stream_t *stream) {
if (!(stream->status & UDX_STREAM_CONNECTED)) {
Expand All @@ -167,9 +164,8 @@ stream_write_wanted (udx_stream_t *stream) {
return true;
}

update_pacing_time(stream);

return stream->inflight_queue.len < send_window_in_packets(stream) && stream->tb_available && stream_has_data(stream);
return stream->inflight_queue.len < send_window_in_packets(stream) &&
stream->next_send_ts <= uv_now(stream->udx->loop) && stream_has_data(stream);
}

static bool
Expand Down Expand Up @@ -500,13 +496,13 @@ close_stream (udx_stream_t *stream, int err) {
uv_timer_stop(&stream->rack_reo_timer);
uv_timer_stop(&stream->tlp_timer);
uv_timer_stop(&stream->zwp_timer);
uv_timer_stop(&stream->refill_pacing_timer);
uv_timer_stop(&stream->pacing_timer);

uv_close((uv_handle_t *) &stream->rto_timer, finalize_maybe);
uv_close((uv_handle_t *) &stream->rack_reo_timer, finalize_maybe);
uv_close((uv_handle_t *) &stream->tlp_timer, finalize_maybe);
uv_close((uv_handle_t *) &stream->zwp_timer, finalize_maybe);
uv_close((uv_handle_t *) &stream->refill_pacing_timer, finalize_maybe);
uv_close((uv_handle_t *) &stream->pacing_timer, finalize_maybe);

if (udx->teardown && socket != NULL && socket->streams == NULL) {
udx_socket_close(socket);
Expand Down Expand Up @@ -1382,22 +1378,9 @@ send_datagrams (udx_socket_t *socket) {
return true;
}

static void
update_pacing_time (udx_stream_t *stream) {
uint64_t now = uv_now(stream->udx->loop); // 1ms granularity

if (now > stream->tb_last_refill_ms) {
uint64_t factor = now - stream->tb_last_refill_ms;
assert(stream->pacing_bytes_per_ms > 0);
stream->tb_available = factor * stream->pacing_bytes_per_ms;
stream->tb_last_refill_ms = now;
}
}

static bool
stream_may_send (udx_stream_t *stream) {
update_pacing_time(stream);
if (stream->tb_available == 0) {
if (stream->next_send_ts > uv_now(stream->udx->loop)) {
return false;
}
return stream->inflight_queue.len < send_window_in_packets(stream) || stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP;
Expand All @@ -1406,11 +1389,25 @@ stream_may_send (udx_stream_t *stream) {
void
pacing_timer_timeout (uv_timer_t *timer) {
udx_stream_t *stream = timer->data;

update_pacing_time(stream);
// todo: make pacing timer socket-wide
update_poll(stream->socket);
}

static void
advance_next_send_ts (udx_stream_t *stream, uint64_t time_per_packet_ms, double time_per_packet_fraction) {

stream->next_send_ts_fraction += time_per_packet_fraction;
if (stream->next_send_ts_fraction > 1) {
stream->next_send_ts_fraction -= 1;
stream->next_send_ts += 1;
}
stream->next_send_ts += time_per_packet_ms;

if (stream->next_send_ts > uv_now(stream->udx->loop)) {
uv_timer_start(&stream->pacing_timer, pacing_timer_timeout, stream->next_send_ts - uv_now(stream->udx->loop), 0);
}
}

static bool
send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
bool inflight_queue_was_empty = stream->inflight_queue.len == 0;
Expand Down Expand Up @@ -1546,6 +1543,14 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
return true;
}

// compute rate for sending data packets
// stream->next_send_ts is incremented by time_per_packet_ms
// stream->next_send_ts_fraction is incremented by time_per_packet_fraction

double time_per_packet = 1.0 / stream->pacing_packet_per_ms;
uint64_t time_per_packet_ms = (uint64_t) time_per_packet;
double time_per_packet_fraction = time_per_packet - time_per_packet_ms;

while (stream->retransmit_queue.len > 0 && stream_may_send(stream)) {
udx_packet_t *pkt = udx__queue_data(udx__queue_peek(&stream->retransmit_queue), udx_packet_t, queue);
assert(pkt != NULL);
Expand All @@ -1562,12 +1567,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
udx__queue_tail(&stream->inflight_queue, &pkt->queue);

stream->inflight += pkt->size;
stream->tb_available = pkt->size > stream->tb_available ? 0 : stream->tb_available - pkt->size;

if (stream->tb_available == 0) {

uv_timer_start(&stream->refill_pacing_timer, pacing_timer_timeout, 1, 0);
}
advance_next_send_ts(stream, time_per_packet_ms, time_per_packet_fraction);

stream->packets_tx++;
stream->bytes_tx += pkt->size;
Expand Down Expand Up @@ -1689,11 +1689,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
assert(pkt->size > 0 && pkt->size < 1500);

stream->inflight += pkt->size;
stream->tb_available = pkt->size > stream->tb_available ? 0 : stream->tb_available - pkt->size;

if (stream->tb_available == 0) {
uv_timer_start(&stream->refill_pacing_timer, pacing_timer_timeout, 1, 0);
}
advance_next_send_ts(stream, time_per_packet_ms, time_per_packet_fraction);

if (tlp) {
stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_TLP;
Expand Down Expand Up @@ -2202,6 +2198,8 @@ int
udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream_close_cb close_cb, udx_stream_finalize_cb finalize_cb) {
if (udx->teardown) return UV_EINVAL;

memset(stream, 0, sizeof(udx_stream_t));

udx->refs++;

if (!(udx->has_streams)) {
Expand Down Expand Up @@ -2276,29 +2274,25 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
stream->rack_next_seq = 0;
stream->rack_fack = 0;

stream->tb_available = UDX_INIT_PACING_RATE;
stream->tb_last_refill_ms = uv_now(udx->loop);
stream->next_send_ts = uv_now(udx->loop);
stream->pacing_packet_per_ms = 1.0; // 9.6 megabits / second, adjusted by BBR

stream->tlp_in_flight = false;
stream->tlp_end_seq = 0;
stream->tlp_is_retrans = false;
stream->tlp_permitted = false;

memset(&stream->rack_reo_timer, 0, sizeof(uv_timer_t));
uv_timer_init(udx->loop, &stream->rack_reo_timer);
stream->rack_reo_timer.data = stream;

memset(&stream->tlp_timer, 0, sizeof(uv_timer_t));
uv_timer_init(udx->loop, &stream->tlp_timer);
stream->tlp_timer.data = stream;

memset(&stream->zwp_timer, 0, sizeof(uv_timer_t));
uv_timer_init(udx->loop, &stream->zwp_timer);
stream->zwp_timer.data = stream;

memset(&stream->refill_pacing_timer, 0, sizeof(uv_timer_t));
uv_timer_init(udx->loop, &stream->refill_pacing_timer);
stream->refill_pacing_timer.data = stream;
uv_timer_init(udx->loop, &stream->pacing_timer);
stream->pacing_timer.data = stream;

stream->nrefs = 5;
stream->deferred_ack = 0;
Expand Down Expand Up @@ -2613,6 +2607,8 @@ static void
_udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb, bool is_write_end) {
assert(bufs_len > 0);

bool was_idle = stream->writes_queued_bytes == 0;

// initialize write object

write->size = 0;
Expand Down Expand Up @@ -2651,6 +2647,11 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf
stream->write_wanted |= UDX_STREAM_WRITE_WANT_ZWP;
uv_timer_start(&stream->zwp_timer, udx_zwp_timeout, stream->rto, 0);
}

if (was_idle) {
// prevent bursts when resuming from idle
stream->next_send_ts = uv_now(stream->udx->loop);
}
}

int
Expand Down
17 changes: 8 additions & 9 deletions src/udx_bbr.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ bbr_extra_acked (udx_stream_t *stream) {
return max_uint32(stream->bbr.extra_acked[0], stream->bbr.extra_acked[1]);
}

static uint64_t
bbr_bw_to_pacing_rate (udx_stream_t *stream, double bw, double gain) {
uint32_t mss = udx__max_payload(stream);
uint64_t bpms = bw * mss * gain * bbr_pacing_margin_percent;
return max_uint64(bpms, 1);
// unit: packets per millisecond
static double
bbr_bw_to_pacing_rate (double bw, double gain) {
return bw * gain * bbr_pacing_margin_percent;
}

static void
Expand All @@ -106,19 +105,19 @@ bbr_init_pacing_rate_from_rtt (udx_stream_t *stream) {

assert(bw != 0);

stream->pacing_bytes_per_ms = bbr_bw_to_pacing_rate(stream, bw, bbr_high_gain);
stream->pacing_packet_per_ms = bbr_bw_to_pacing_rate(bw, bbr_high_gain);
}

static void
bbr_set_pacing_rate (udx_stream_t *stream, double bw, double gain) {
uint64_t rate_bpms = bbr_bw_to_pacing_rate(stream, bw, gain);
double rate_ppms = bbr_bw_to_pacing_rate(bw, gain);

if (!stream->bbr.has_seen_rtt && stream->srtt) {
bbr_init_pacing_rate_from_rtt(stream);
}

if (stream->bbr.full_bw_reached || rate_bpms > stream->pacing_bytes_per_ms) {
stream->pacing_bytes_per_ms = rate_bpms;
if (stream->bbr.full_bw_reached || rate_ppms > stream->pacing_packet_per_ms) {
stream->pacing_packet_per_ms = rate_ppms;
}
}

Expand Down
14 changes: 12 additions & 2 deletions test/stream-write-read-perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ print_rate_on_interval (uv_timer_t *t) {
udx_stream_get_bw(&astream, &a_bw);
udx_stream_get_bw(&bstream, &b_bw);

printf("A bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", a_bw, astream.rate_delivered / (1.0f * astream.rate_interval_ms), astream.rate_sample_is_app_limited ? "(app limited)" : "");
printf("B bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", b_bw, bstream.rate_delivered / (1.0f * bstream.rate_interval_ms), bstream.rate_sample_is_app_limited ? "(app limited)" : "");
double a_rate = 0;
double b_rate = 0;

if (astream.rate_interval_ms) {
a_rate = astream.rate_delivered / (1.0f * astream.rate_interval_ms);
}
if (bstream.rate_interval_ms) {
b_rate = bstream.rate_delivered / (1.0f * bstream.rate_interval_ms);
}

printf("A bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", a_bw, a_rate, astream.rate_sample_is_app_limited ? "(app limited)" : "");
printf("B bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", b_bw, b_rate, bstream.rate_sample_is_app_limited ? "(app limited)" : "");
}

int
Expand Down