Skip to content

Commit a357a01

Browse files
authored
Simpler Combined Write (holepunchto#265)
Remove stream->pkt_buf; instead, accumulate iovecs directly in the next packet and dynamically allocate memory only if necessary.
1 parent bbb5901 commit a357a01

4 files changed

Lines changed: 243 additions & 71 deletions

File tree

include/udx.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ extern "C" {
2121
#define UDX_MTU_STEP 32
2222

2323
#define UDX_SOCKET_PACKET_BUFFER_SIZE 2048
24-
25-
#define UDX_MAX_COMBINED_WRITES 1000
24+
// max writes = 1 write per byte
25+
#define UDX_MAX_COMBINED_WRITES (UDX_MTU_MAX - UDX_IPV4_HEADER_SIZE)
2626

2727
#define UDX_MTU_STATE_BASE 1
2828
#define UDX_MTU_STATE_SEARCH 2
@@ -228,13 +228,9 @@ struct udx_stream_s {
228228
uint16_t retransmit_count;
229229
size_t writes_queued_bytes;
230230

231-
// next packet fields
232-
// buffer data for the next packet here until capacity is filled or send_next_packet
233-
uv_buf_t pkt_buf[UDX_MAX_COMBINED_WRITES];
234-
udx_stream_write_buf_t *pkt_wbuf[UDX_MAX_COMBINED_WRITES];
235231
uint16_t pkt_capacity;
236-
uint16_t pkt_nwbufs;
237232
uint8_t pkt_header_flag;
233+
udx_packet_t *pkt; // packet under construction, or NULL
238234
uv_prepare_t pending_packet_prepare;
239235

240236
// true if data received before we were connected, in this
@@ -426,7 +422,14 @@ struct udx_packet_s {
426422

427423
// just alloc it in place here, easier to manage
428424
uint8_t header[UDX_HEADER_SIZE];
429-
uint16_t nbufs;
425+
uint16_t nwbufs; // number of write buffers; bufs[] holds nwbufs + 1 entries (bufs[0] is the udx header)
426+
uint16_t nwbufs_capacity; // initially UDX_ARRAY_SIZE(wbuf_sml); doubled each time bufs/wbufs are grown
427+
428+
uv_buf_t *bufs;
429+
udx_stream_write_buf_t **wbufs;
430+
431+
uv_buf_t buf_sml[6];
432+
udx_stream_write_buf_t *wbuf_sml[5];
430433
};
431434

432435
struct udx_socket_send_s {

src/udx.c

Lines changed: 87 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
#define UDX_PROBE_TYPE_TLP 1 // packet is a tail-loss probe, can exceed cwnd, set tail probe flags
5252
#define UDX_PROBE_TYPE_ZWP 2 // packet is a zero-window probe, can exceed rwnd
5353

54+
#define UDX_ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
55+
5456
static void
5557
arm_stream_timers (udx_stream_t *stream, bool sent_tlp);
5658

@@ -135,12 +137,13 @@ ref_dec (udx_t *udx) {
135137
static void
136138
on_uv_close (uv_handle_t *handle) {
137139
udx_socket_t *socket = (udx_socket_t *) handle;
140+
udx_t *udx = socket->udx;
138141

139142
if (socket->on_close != NULL) {
140143
socket->on_close(socket);
141144
}
142145

143-
ref_dec(socket->udx);
146+
ref_dec(udx);
144147
}
145148

146149
static void
@@ -195,14 +198,36 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {
195198
}
196199
}
197200

198-
static udx_stream_write_buf_t **
199-
wbufs_offset (udx_packet_t *pkt) {
200-
return (udx_stream_write_buf_t **) (((uv_buf_t *) (pkt + 1)) + pkt->nbufs);
201+
static void
202+
deref_packet (udx_packet_t *pkt) {
203+
if (--pkt->ref_count == 0) {
204+
if (pkt->bufs != &pkt->buf_sml[0]) {
205+
free(pkt->bufs);
206+
free(pkt->wbufs);
207+
}
208+
free(pkt);
209+
}
201210
}
202211

203212
static void
204-
deref_packet (udx_packet_t *pkt) {
205-
if (--pkt->ref_count == 0) free(pkt);
213+
cancel_packet (udx_packet_t *pkt) {
214+
uv_buf_t *bufs = pkt->bufs;
215+
udx_stream_write_buf_t **wbufs = pkt->wbufs;
216+
217+
for (int i = 0; i < pkt->nwbufs; i++) {
218+
size_t buf_len = bufs[i + 1].len;
219+
udx_stream_write_buf_t *wbuf = wbufs[i];
220+
on_bytes_acked(wbuf, buf_len, true);
221+
222+
// todo: move into on_bytes_acked itself
223+
udx_stream_write_t *write = wbuf->write;
224+
225+
if (write->bytes_acked == write->size && write->on_ack) {
226+
write->on_ack(write, UV_ECANCELED, 0);
227+
}
228+
}
229+
230+
deref_packet(pkt);
206231
}
207232

208233
static void
@@ -212,31 +237,18 @@ clear_outgoing_packets (udx_stream_t *stream) {
212237
// 2. destroy all wbufs
213238
// 3. set write->bytes_acked = write->size and call ack(cancel) on all writes
214239

240+
if (stream->pkt) {
241+
assert(stream->pkt->ref_count == 1);
242+
cancel_packet(stream->pkt);
243+
}
244+
215245
// We should make sure all existing packets do not send, and notify the user that they failed
216246
for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) {
217247
udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq);
218248

219249
if (pkt == NULL) continue;
220250

221-
assert(pkt->nbufs >= 2);
222-
223-
uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
224-
udx_stream_write_buf_t **wbufs = wbufs_offset(pkt);
225-
226-
for (int i = 1; i < pkt->nbufs; i++) {
227-
size_t pkt_len = bufs[i].len;
228-
udx_stream_write_buf_t *wbuf = wbufs[i - 1];
229-
on_bytes_acked(wbuf, pkt_len, true);
230-
231-
// todo: move into on_bytes_acked itself
232-
udx_stream_write_t *write = wbuf->write;
233-
234-
if (write->bytes_acked == write->size && write->on_ack) {
235-
write->on_ack(write, UV_ECANCELED, 0);
236-
}
237-
}
238-
239-
deref_packet(pkt);
251+
cancel_packet(pkt);
240252
}
241253

242254
while (stream->write_queue.len > 0) {
@@ -299,7 +311,8 @@ mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
299311

300312
assert(wanted_size > pkt->size);
301313

302-
if (pkt->nbufs < 2 || pkt->header[3] != 0) {
314+
// cannot probeify a packet that 1) has no data or 2) already has padding
315+
if (pkt->nwbufs < 1 || pkt->header[3] != 0) {
303316
return 0;
304317
}
305318

@@ -648,8 +661,8 @@ _send_packet (udx_stream_t *stream, udx_packet_t *pkt, bool is_retransmit) {
648661
udx__queue_tail(&stream->inflight_queue, &pkt->queue);
649662
stream->inflight += pkt->size;
650663

651-
uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
652-
int nbufs = pkt->nbufs;
664+
uv_buf_t *bufs = pkt->bufs;
665+
int nbufs = pkt->nwbufs + 1; // udx header
653666

654667
uv_buf_t _bufs[UDX_MAX_COMBINED_WRITES + 2];
655668

@@ -660,7 +673,7 @@ _send_packet (udx_stream_t *stream, udx_packet_t *pkt, bool is_retransmit) {
660673
_bufs[1].base = probe_data;
661674
_bufs[1].len = padding_size;
662675

663-
for (int i = 1; i < pkt->nbufs; i++) {
676+
for (int i = 1; i < nbufs; i++) {
664677
_bufs[1 + i] = bufs[i];
665678
}
666679
bufs = _bufs;
@@ -702,15 +715,28 @@ _send_packet (udx_stream_t *stream, udx_packet_t *pkt, bool is_retransmit) {
702715

703716
static void
704717
reset_next_packet (udx_stream_t *stream) {
718+
705719
stream->pkt_capacity = udx__max_payload(stream);
706720
stream->pkt_header_flag = 0;
707-
stream->pkt_nwbufs = 0;
721+
stream->pkt = NULL;
722+
723+
// allocate packet
724+
// most fields are not set until sending
725+
stream->pkt = malloc(sizeof(udx_packet_t));
726+
udx_packet_t *pkt = stream->pkt;
727+
memset(pkt, 0, sizeof(*pkt));
728+
pkt->ref_count = 1;
729+
pkt->bufs = &pkt->buf_sml[0];
730+
pkt->wbufs = &pkt->wbuf_sml[0];
731+
pkt->size = 20;
732+
pkt->nwbufs_capacity = UDX_ARRAY_SIZE(pkt->wbuf_sml);
733+
pkt->nwbufs = 0;
708734

709735
uv_prepare_stop(&stream->pending_packet_prepare);
710736
}
711737

712-
// called by send_new_packet and send_delayed_packet
713-
// allocates and sends data packets
738+
// called by send_new_packet and on_pending_packet_prepare
739+
// sends stream->pkt
714740
static void
715741
_send_new_packet (udx_stream_t *stream, int probe_type) {
716742

@@ -719,33 +745,16 @@ _send_new_packet (udx_stream_t *stream, int probe_type) {
719745
bool inflight_queue_was_empty = stream->inflight_queue.len == 0;
720746
bool tlp = probe_type == UDX_PROBE_TYPE_TLP;
721747

722-
int nwbufs = stream->pkt_nwbufs;
723-
int nbufs = nwbufs + 1; // extra buf for header
748+
udx_packet_t *pkt = stream->pkt;
724749

725-
udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + sizeof(uv_buf_t) * nbufs + sizeof(void *) * nwbufs);
726-
727-
// init_stream_packet
728-
memset(pkt, 0, sizeof(*pkt));
729750
udx_write_header(pkt->header, stream, stream->pkt_header_flag);
730751
pkt->seq = stream->seq;
731752
pkt->stream = stream; // todo: necessary?
732753
pkt->remote_addr = stream->remote_addr;
733754
pkt->remote_addr_len = stream->remote_addr_len;
734-
pkt->nbufs = nbufs;
735-
736-
uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
737-
udx_stream_write_buf_t **wbufs = wbufs_offset(pkt);
738-
739755
pkt->ref_count = 1;
740756

741-
bufs[0] = uv_buf_init((char *) &pkt->header, UDX_HEADER_SIZE);
742-
memcpy(&bufs[1], stream->pkt_buf, sizeof(bufs[0]) * nwbufs);
743-
memcpy(wbufs, stream->pkt_wbuf, sizeof(wbufs[0]) * nwbufs);
744-
745-
pkt->size = 0;
746-
for (int i = 0; i < nbufs; i++) {
747-
pkt->size += bufs[i].len;
748-
}
757+
pkt->bufs[0] = uv_buf_init((char *) &pkt->header, UDX_HEADER_SIZE);
749758

750759
bool mtu_probe = stream->mtu_probe_wanted && probe_type == UDX_PROBE_TYPE_NONE && !stream->remote_changing && mtu_probeify_packet(pkt, stream->mtu_probe_size);
751760

@@ -793,10 +802,10 @@ send_new_packet (udx_stream_t *stream, int probe_type) {
793802
if (!stream_may_send(stream) && probe_type == UDX_PROBE_TYPE_NONE) return false;
794803

795804
bool tlp = probe_type == UDX_PROBE_TYPE_TLP;
796-
797805
bool inflight_queue_was_empty = stream->inflight_queue.len == 0;
806+
udx_packet_t *pkt = stream->pkt;
798807

799-
while (stream->pkt_capacity > 0 && stream->pkt_nwbufs < UDX_MAX_COMBINED_WRITES && stream->write_queue.len > 0) {
808+
while (stream->pkt_capacity > 0 && stream->write_queue.len > 0) {
800809
udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_peek(&stream->write_queue), udx_stream_write_buf_t, queue);
801810

802811
uint64_t writesz = wbuf->buf.len - wbuf->bytes_acked - wbuf->bytes_inflight;
@@ -807,11 +816,27 @@ send_new_packet (udx_stream_t *stream, int probe_type) {
807816
uv_buf_t partial = uv_buf_init(wbuf->buf.base + wbuf->bytes_acked + wbuf->bytes_inflight, len);
808817
wbuf->bytes_inflight += len;
809818
stream->pkt_capacity -= len;
819+
pkt->size += len;
810820

811-
stream->pkt_buf[stream->pkt_nwbufs] = partial;
812-
stream->pkt_wbuf[stream->pkt_nwbufs] = wbuf;
821+
if (pkt->nwbufs == pkt->nwbufs_capacity) {
822+
pkt->nwbufs_capacity *= 2;
823+
bool first_alloc = pkt->bufs == pkt->buf_sml;
813824

814-
stream->pkt_nwbufs++;
825+
if (first_alloc) {
826+
pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0]));
827+
pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0]));
828+
memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml));
829+
memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml));
830+
} else {
831+
pkt->bufs = realloc(pkt->bufs, (pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0]));
832+
pkt->wbufs = realloc(pkt->wbufs, pkt->nwbufs_capacity * sizeof(pkt->wbufs[0]));
833+
}
834+
}
835+
836+
pkt->bufs[pkt->nwbufs + 1] = partial;
837+
pkt->wbufs[pkt->nwbufs] = wbuf;
838+
839+
pkt->nwbufs++;
815840

816841
if (len > 0) {
817842
stream->pkt_header_flag |= UDX_HEADER_DATA;
@@ -1218,12 +1243,12 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack, udx_rate_sample_t *rs)
12181243
stream->reordering_seen = true;
12191244
}
12201245

1221-
uv_buf_t *bufs = (uv_buf_t *) (pkt + 1);
1222-
udx_stream_write_buf_t **wbufs = wbufs_offset(pkt);
1246+
uv_buf_t *bufs = pkt->bufs;
1247+
udx_stream_write_buf_t **wbufs = pkt->wbufs;
12231248

1224-
for (int i = 1; i < pkt->nbufs; i++) {
1225-
size_t pkt_len = bufs[i].len;
1226-
udx_stream_write_buf_t *wbuf = wbufs[i - 1];
1249+
for (int i = 0; i < pkt->nwbufs; i++) {
1250+
size_t pkt_len = bufs[i + 1].len;
1251+
udx_stream_write_buf_t *wbuf = wbufs[i];
12271252

12281253
on_bytes_acked(wbuf, pkt_len, false);
12291254

@@ -2079,7 +2104,6 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
20792104
stream->ca_state = UDX_CA_OPEN;
20802105
stream->udx = udx;
20812106
reset_mtu_state_machine(stream);
2082-
stream->pkt_capacity = udx__max_payload(stream);
20832107

20842108
// initially stream is application limited, since we haven't
20852109
// discovered a network limit.
@@ -2138,6 +2162,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
21382162
udx__cirbuf_set(&(udx->streams_by_id), (udx_cirbuf_val_t *) stream);
21392163

21402164
debug_throughput_init(stream);
2165+
reset_next_packet(stream);
21412166

21422167
return 0;
21432168
}

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ list(APPEND tests
2222
stream-change-remote
2323
stream-multiple
2424
stream-write-read-receive-window
25-
win-filter
25+
win-filter
2626
stream-bbr-state
27+
stream-combined-write
2728
)
2829

2930
list(APPEND skipped_tests

0 commit comments

Comments
 (0)