diff --git a/examples/udxperf.c b/examples/udxperf.c index 64cac9e..394c083 100644 --- a/examples/udxperf.c +++ b/examples/udxperf.c @@ -276,7 +276,7 @@ print_interval (udxperf_client_t *client, uint64_t bytes, uint64_t start, uint64 printf("[%3d] %6.4f-%6.4f sec %s %s/sec", stream->local_id, (start - client->start_time) / 1000.0, (end - client->start_time) / 1000.0, bytes_buf, bps_buf); if (is_client && extra_wanted) { - printf(" cwnd=%d ssthresh=%d pacing_rate=%u fast_recovery_count=%d rto_count=%d rtx_count=%d", stream->cwnd, stream->pacing_bytes_per_ms, stream->ssthresh, stream->fast_recovery_count, stream->rto_count, stream->retransmit_count); + printf(" cwnd=%d ssthresh=%d pacing_rate_pkts_per_ms=%f fast_recovery_count=%d rto_count=%d rtx_count=%d", stream->cwnd, stream->ssthresh, stream->pacing_packet_per_ms, stream->fast_recovery_count, stream->rto_count, stream->retransmit_count); } printf("\n"); } diff --git a/include/udx.h b/include/udx.h index eceadeb..54e8038 100644 --- a/include/udx.h +++ b/include/udx.h @@ -325,14 +325,12 @@ struct udx_stream_s { } bbr; - uint32_t pacing_bytes_per_ms; // computed by bbr module. 'BBR.pacing_rate' in IETF draft + uint64_t next_send_ts; // time to send next packet, may be now, in the future, or in the past. + double next_send_ts_fraction; // fractions of a millisecond, in range [0.0-1.0) + double pacing_packet_per_ms; // pacing rate in packets per millisecond, set by BBR uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)? - // pacing (tb = token bucket) - uint32_t tb_available; - uint64_t tb_last_refill_ms; - // tlp bool tlp_is_retrans; // the probe in-flight was a retransmission bool tlp_in_flight; // if set, tlp_end_seq indicates the seqno @@ -345,7 +343,7 @@ struct udx_stream_s { uv_timer_t rack_reo_timer; uv_timer_t tlp_timer; uv_timer_t zwp_timer; - uv_timer_t refill_pacing_timer; + uv_timer_t pacing_timer; size_t inflight; diff --git a/src/udx.c b/src/udx.c index 54334dd..6564854 100644 --- a/src/udx.c +++ b/src/udx.c @@ -33,7 +33,6 @@ #define UDX_HEADER_DATA_OR_END (UDX_HEADER_DATA | UDX_HEADER_END) #define UDX_DEFAULT_TTL 64 -#define UDX_INIT_PACING_RATE 25000 // 25MB/s, 200mbit. updated by bbr_init #define UDX_DEFAULT_SNDBUF_SIZE 212992 #define UDX_MAX_RTO_TIMEOUTS 6 @@ -150,8 +149,6 @@ stream_has_data (udx_stream_t *stream) { return stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0; } -static void -update_pacing_time (udx_stream_t *stream); static bool stream_write_wanted (udx_stream_t *stream) { if (!(stream->status & UDX_STREAM_CONNECTED)) { @@ -167,9 +164,8 @@ stream_write_wanted (udx_stream_t *stream) { return true; } - update_pacing_time(stream); - - return stream->inflight_queue.len < send_window_in_packets(stream) && stream->tb_available && stream_has_data(stream); + return stream->inflight_queue.len < send_window_in_packets(stream) && + stream->next_send_ts <= uv_now(stream->udx->loop) && stream_has_data(stream); } static bool @@ -500,13 +496,13 @@ close_stream (udx_stream_t *stream, int err) { uv_timer_stop(&stream->rack_reo_timer); uv_timer_stop(&stream->tlp_timer); uv_timer_stop(&stream->zwp_timer); - uv_timer_stop(&stream->refill_pacing_timer); + uv_timer_stop(&stream->pacing_timer); uv_close((uv_handle_t *) &stream->rto_timer, finalize_maybe); uv_close((uv_handle_t *) &stream->rack_reo_timer, finalize_maybe); uv_close((uv_handle_t *) &stream->tlp_timer, finalize_maybe); uv_close((uv_handle_t *) &stream->zwp_timer, finalize_maybe); - uv_close((uv_handle_t *) &stream->refill_pacing_timer, finalize_maybe); + uv_close((uv_handle_t *) &stream->pacing_timer, finalize_maybe); if (udx->teardown && socket != NULL && socket->streams == NULL) { udx_socket_close(socket); @@ -1382,22 +1378,9 @@ send_datagrams (udx_socket_t *socket) { return true; } -static void -update_pacing_time (udx_stream_t *stream) { - uint64_t now = uv_now(stream->udx->loop); // 1ms granularity - - if (now > stream->tb_last_refill_ms) { - uint64_t factor = now - stream->tb_last_refill_ms; - assert(stream->pacing_bytes_per_ms > 0); - stream->tb_available = factor * stream->pacing_bytes_per_ms; - stream->tb_last_refill_ms = now; - } -} - static bool stream_may_send (udx_stream_t *stream) { - update_pacing_time(stream); - if (stream->tb_available == 0) { + if (stream->next_send_ts > uv_now(stream->udx->loop)) { return false; } return stream->inflight_queue.len < send_window_in_packets(stream) || stream->write_wanted & UDX_STREAM_WRITE_WANT_ZWP; @@ -1406,11 +1389,25 @@ stream_may_send (udx_stream_t *stream) { void pacing_timer_timeout (uv_timer_t *timer) { udx_stream_t *stream = timer->data; - - update_pacing_time(stream); + // todo: make pacing timer socket-wide update_poll(stream->socket); } +static void +advance_next_send_ts (udx_stream_t *stream, uint64_t time_per_packet_ms, double time_per_packet_fraction) { + + stream->next_send_ts_fraction += time_per_packet_fraction; + if (stream->next_send_ts_fraction > 1) { + stream->next_send_ts_fraction -= 1; + stream->next_send_ts += 1; + } + stream->next_send_ts += time_per_packet_ms; + + if (stream->next_send_ts > uv_now(stream->udx->loop)) { + uv_timer_start(&stream->pacing_timer, pacing_timer_timeout, stream->next_send_ts - uv_now(stream->udx->loop), 0); + } +} + static bool send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { bool inflight_queue_was_empty = stream->inflight_queue.len == 0; @@ -1546,6 +1543,14 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { return true; } + // compute rate for sending data packets + // stream->next_send_ts is incremented by time_per_packet_ms + // stream->next_send_ts_fraction is incremented by time_per_packet_fraction + + double time_per_packet = 1.0 / stream->pacing_packet_per_ms; + uint64_t time_per_packet_ms = (uint64_t) time_per_packet; + double time_per_packet_fraction = time_per_packet - time_per_packet_ms; + while (stream->retransmit_queue.len > 0 && stream_may_send(stream)) { udx_packet_t *pkt = udx__queue_data(udx__queue_peek(&stream->retransmit_queue), udx_packet_t, queue); assert(pkt != NULL); @@ -1562,12 +1567,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { udx__queue_tail(&stream->inflight_queue, &pkt->queue); stream->inflight += pkt->size; - stream->tb_available = pkt->size > stream->tb_available ? 0 : stream->tb_available - pkt->size; - - if (stream->tb_available == 0) { - - uv_timer_start(&stream->refill_pacing_timer, pacing_timer_timeout, 1, 0); - } + advance_next_send_ts(stream, time_per_packet_ms, time_per_packet_fraction); stream->packets_tx++; stream->bytes_tx += pkt->size; @@ -1689,11 +1689,7 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) { assert(pkt->size > 0 && pkt->size < 1500); stream->inflight += pkt->size; - stream->tb_available = pkt->size > stream->tb_available ? 0 : stream->tb_available - pkt->size; - - if (stream->tb_available == 0) { - uv_timer_start(&stream->refill_pacing_timer, pacing_timer_timeout, 1, 0); - } + advance_next_send_ts(stream, time_per_packet_ms, time_per_packet_fraction); if (tlp) { stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_TLP; @@ -2202,6 +2198,8 @@ int udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream_close_cb close_cb, udx_stream_finalize_cb finalize_cb) { if (udx->teardown) return UV_EINVAL; + memset(stream, 0, sizeof(udx_stream_t)); + udx->refs++; if (!(udx->has_streams)) { @@ -2276,29 +2274,25 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream stream->rack_next_seq = 0; stream->rack_fack = 0; - stream->tb_available = UDX_INIT_PACING_RATE; - stream->tb_last_refill_ms = uv_now(udx->loop); + stream->next_send_ts = uv_now(udx->loop); + stream->pacing_packet_per_ms = 1.0; // 9.6 megabits / second, adjusted by BBR stream->tlp_in_flight = false; stream->tlp_end_seq = 0; stream->tlp_is_retrans = false; stream->tlp_permitted = false; - memset(&stream->rack_reo_timer, 0, sizeof(uv_timer_t)); uv_timer_init(udx->loop, &stream->rack_reo_timer); stream->rack_reo_timer.data = stream; - memset(&stream->tlp_timer, 0, sizeof(uv_timer_t)); uv_timer_init(udx->loop, &stream->tlp_timer); stream->tlp_timer.data = stream; - memset(&stream->zwp_timer, 0, sizeof(uv_timer_t)); uv_timer_init(udx->loop, &stream->zwp_timer); stream->zwp_timer.data = stream; - memset(&stream->refill_pacing_timer, 0, sizeof(uv_timer_t)); - uv_timer_init(udx->loop, &stream->refill_pacing_timer); - stream->refill_pacing_timer.data = stream; + uv_timer_init(udx->loop, &stream->pacing_timer); + stream->pacing_timer.data = stream; stream->nrefs = 5; stream->deferred_ack = 0; @@ -2613,6 +2607,8 @@ static void _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb, bool is_write_end) { assert(bufs_len > 0); + bool was_idle = stream->writes_queued_bytes == 0; + // initialize write object write->size = 0; @@ -2651,6 +2647,11 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf stream->write_wanted |= UDX_STREAM_WRITE_WANT_ZWP; uv_timer_start(&stream->zwp_timer, udx_zwp_timeout, stream->rto, 0); } + + if (was_idle) { + // prevent bursts when resuming from idle + stream->next_send_ts = uv_now(stream->udx->loop); + } } int diff --git a/src/udx_bbr.c b/src/udx_bbr.c index 63e7688..045dbbd 100644 --- a/src/udx_bbr.c +++ b/src/udx_bbr.c @@ -84,11 +84,10 @@ bbr_extra_acked (udx_stream_t *stream) { return max_uint32(stream->bbr.extra_acked[0], stream->bbr.extra_acked[1]); } -static uint64_t -bbr_bw_to_pacing_rate (udx_stream_t *stream, double bw, double gain) { - uint32_t mss = udx__max_payload(stream); - uint64_t bpms = bw * mss * gain * bbr_pacing_margin_percent; - return max_uint64(bpms, 1); +// unit: packets per millisecond +static double +bbr_bw_to_pacing_rate (double bw, double gain) { + return bw * gain * bbr_pacing_margin_percent; } static void @@ -106,19 +105,19 @@ bbr_init_pacing_rate_from_rtt (udx_stream_t *stream) { assert(bw != 0); - stream->pacing_bytes_per_ms = bbr_bw_to_pacing_rate(stream, bw, bbr_high_gain); + stream->pacing_packet_per_ms = bbr_bw_to_pacing_rate(bw, bbr_high_gain); } static void bbr_set_pacing_rate (udx_stream_t *stream, double bw, double gain) { - uint64_t rate_bpms = bbr_bw_to_pacing_rate(stream, bw, gain); + double rate_ppms = bbr_bw_to_pacing_rate(bw, gain); if (!stream->bbr.has_seen_rtt && stream->srtt) { bbr_init_pacing_rate_from_rtt(stream); } - if (stream->bbr.full_bw_reached || rate_bpms > stream->pacing_bytes_per_ms) { - stream->pacing_bytes_per_ms = rate_bpms; + if (stream->bbr.full_bw_reached || rate_ppms > stream->pacing_packet_per_ms) { + stream->pacing_packet_per_ms = rate_ppms; } } diff --git a/test/stream-write-read-perf.c b/test/stream-write-read-perf.c index a7722fd..83ca652 100644 --- a/test/stream-write-read-perf.c +++ b/test/stream-write-read-perf.c @@ -90,8 +90,18 @@ print_rate_on_interval (uv_timer_t *t) { udx_stream_get_bw(&astream, &a_bw); udx_stream_get_bw(&bstream, &b_bw); - printf("A bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", a_bw, astream.rate_delivered / (1.0f * astream.rate_interval_ms), astream.rate_sample_is_app_limited ? "(app limited)" : ""); - printf("B bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", b_bw, bstream.rate_delivered / (1.0f * bstream.rate_interval_ms), bstream.rate_sample_is_app_limited ? "(app limited)" : ""); + double a_rate = 0; + double b_rate = 0; + + if (astream.rate_interval_ms) { + a_rate = astream.rate_delivered / (1.0f * astream.rate_interval_ms); + } + if (bstream.rate_interval_ms) { + b_rate = bstream.rate_delivered / (1.0f * bstream.rate_interval_ms); + } + + printf("A bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", a_bw, a_rate, astream.rate_sample_is_app_limited ? "(app limited)" : ""); + printf("B bbr.bw=%" PRIu64 " rate=%9.3f/kpkts/sec %s\n", b_bw, b_rate, bstream.rate_sample_is_app_limited ? "(app limited)" : ""); } int