Skip to content

Commit bbb5901

Browse files
authored
move from uv_poll API to uv_udp API (holepunchto#263)
* move from uv_poll API to uv_udp API * queue socket_send_ttl messages. Whenever a packet is sent, peek the next packet and if it requires a specific ttl set the socket TTL. After a packet with specific TTL is sent, restore the socket TTL * defer sending first packet when writing to an empty write queue until the packet is full or the next uv_prepare phase
1 parent 25b063b commit bbb5901

8 files changed

Lines changed: 1259 additions & 1439 deletions

File tree

include/udx.h

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ extern "C" {
2020
#define UDX_MTU_MAX 1500
2121
#define UDX_MTU_STEP 32
2222

23+
#define UDX_SOCKET_PACKET_BUFFER_SIZE 2048
24+
25+
#define UDX_MAX_COMBINED_WRITES 1000
26+
2327
#define UDX_MTU_STATE_BASE 1
2428
#define UDX_MTU_STATE_SEARCH 2
2529
#define UDX_MTU_STATE_ERROR 3
@@ -48,14 +52,10 @@ extern "C" {
4852
#define UDX_HEADER_MESSAGE 0b01000
4953
#define UDX_HEADER_DESTROY 0b10000
5054

51-
#define UDX_STREAM_WRITE_WANT_STATE 0b0001
52-
#define UDX_STREAM_WRITE_WANT_TLP 0b0010
53-
#define UDX_STREAM_WRITE_WANT_DESTROY 0b0100
54-
#define UDX_STREAM_WRITE_WANT_ZWP 0b1000
55-
5655
#define UDX_DEBUG_FORCE_RELAY_SLOW_PATH 0x01
5756
#define UDX_DEBUG_FORCE_DROP_PROBES 0x02
5857
#define UDX_DEBUG_FORCE_DROP_DATA 0x04
58+
#define UDX_DEBUG_FORCE_SEND_SLOW_PATH 0x08
5959

6060
typedef struct {
6161
uint32_t seq;
@@ -164,10 +164,17 @@ typedef struct udx_queue_s {
164164
} udx_queue_t;
165165

166166
struct udx_socket_s {
167-
uv_udp_t handle;
168-
uv_poll_t io_poll;
169-
170-
udx_queue_t send_queue;
167+
uv_udp_t uv_udp; // must be first
168+
169+
// packets queued with udx_socket_send_ttl that both
170+
// 1. override the socket's TTL value and
171+
// 2. can't be sent immediately via uv_udp_try_send()
172+
// are queued by sending with uv_udp_send() and simultaneously queued here.
173+
// Then when a packet is sent if the next packet is the packet at the head of this
174+
// queue (ie the next packet has a specified TTL), then the sockets ttl is temporarily
175+
// set via uv_udp_set_ttl, and the udx_socket_send callback will restore the ttl after
176+
udx_queue_t specific_ttl_send_queue;
177+
uint64_t packets_sent_via_uv_send_queue;
171178

172179
udx_socket_t *prev;
173180
udx_socket_t *next;
@@ -177,13 +184,11 @@ struct udx_socket_s {
177184
udx_t *udx;
178185
udx_cirbuf_t *streams_by_id; // for convenience
179186

180-
bool cmsg_wanted; // include a control buffer for recvmsg
181187
int family;
182188
int status;
183189
int readers;
184190
int events;
185191
int ttl;
186-
int pending_closes;
187192

188193
void *data;
189194

@@ -197,6 +202,7 @@ struct udx_socket_s {
197202
uint64_t packets_tx;
198203

199204
int64_t packets_dropped_by_kernel;
205+
uint8_t buffer[UDX_SOCKET_PACKET_BUFFER_SIZE];
200206
};
201207

202208
#define UDX_CA_OPEN 1
@@ -211,9 +217,7 @@ struct udx_stream_s {
211217
udx_stream_t *next;
212218

213219
int status;
214-
int write_wanted;
215220
int out_of_order;
216-
int deferred_ack;
217221

218222
uint8_t ca_state;
219223
uint32_t high_seq; // seq at time of congestion, marks end of recovery
@@ -224,6 +228,19 @@ struct udx_stream_s {
224228
uint16_t retransmit_count;
225229
size_t writes_queued_bytes;
226230

231+
// next packet fields
232+
// buffer data for the next packet here until capacity is filled or send_next_packet
233+
uv_buf_t pkt_buf[UDX_MAX_COMBINED_WRITES];
234+
udx_stream_write_buf_t *pkt_wbuf[UDX_MAX_COMBINED_WRITES];
235+
uint16_t pkt_capacity;
236+
uint16_t pkt_nwbufs;
237+
uint8_t pkt_header_flag;
238+
uv_prepare_t pending_packet_prepare;
239+
240+
// true if data received before we were connected, in this
241+
// situation we must send an ack to the data after we are connected
242+
bool ack_needed;
243+
227244
bool reordering_seen;
228245

229246
udx_t *udx;
@@ -366,9 +383,6 @@ struct udx_stream_s {
366383
udx_queue_t retransmit_queue; // udx_packet_t
367384
udx_queue_t inflight_queue; // udx_packet_t
368385

369-
// udx_queue_t unordered;
370-
udx_queue_t unordered_queue;
371-
372386
uint64_t bytes_rx;
373387
uint64_t bytes_tx;
374388

@@ -383,16 +397,25 @@ struct udx_stream_s {
383397
struct udx_packet_s {
384398
uint32_t seq; // must be the first entry, so its compat with the cirbuf
385399
udx_queue_node_t queue;
400+
uv_udp_send_t uv_udp_send;
386401

387-
int ttl;
402+
udx_stream_t *stream; // for incrementing counters when packet is sent
388403

389404
bool lost;
390405
bool retransmitted;
391406
uint8_t transmits;
392407
uint8_t rto_timeouts;
393408
bool is_mtu_probe;
409+
uint8_t ref_count; // 2 references - the uv_udp_send_t callback and the on_ack callback.
410+
// when 0, packet has been acked and is not in flight. the packet may be free().
394411
uint16_t size;
395412

413+
// we store remote_addr for each packet instead of using stream->remote_addr
414+
// because we want any retransmits to go to the original host even if the user
415+
// calls udx_stream_change_remote().
416+
struct sockaddr_storage remote_addr;
417+
int remote_addr_len;
418+
396419
uint64_t time_sent;
397420

398421
// rate sampling state
@@ -401,21 +424,24 @@ struct udx_packet_s {
401424
uint32_t delivered; // #pkts delivered when packet was transmitted
402425
bool is_app_limited; // was throughput app-limited (vs network limited) at the time the packet was transmitted?
403426

404-
struct sockaddr_storage dest;
405-
int dest_len;
406-
407427
// just alloc it in place here, easier to manage
408428
uint8_t header[UDX_HEADER_SIZE];
409429
uint16_t nbufs;
410430
};
411431

412432
struct udx_socket_send_s {
413-
udx_packet_t pkt;
414-
uv_buf_t bufs[1]; // buf_t[] must be after packet_t
415-
udx_socket_t *socket;
433+
uv_udp_send_t uv_udp_send;
416434

417-
udx_socket_send_cb on_send;
435+
udx_queue_node_t queue;
436+
uint32_t ttl;
437+
// when queued for sending, the value stored here is:
438+
// socket.packets_sent_via_uv_send_queue + socket.send_queue_count
439+
// it is used to determine when this packet is at the head of the queue
440+
// so that the TTL can be adjusted
441+
uint64_t place_in_queue;
418442

443+
udx_socket_t *socket;
444+
udx_socket_send_cb on_send;
419445
void *data;
420446
};
421447

@@ -449,12 +475,13 @@ struct udx_stream_write_s {
449475
};
450476

451477
struct udx_stream_send_s {
452-
udx_packet_t pkt;
453-
uv_buf_t bufs[3]; // buf_t[] must be after packet_t
478+
uv_udp_send_t uv_udp_send;
454479
udx_stream_t *stream;
455480

456481
udx_stream_send_cb on_send;
457482

483+
uint8_t header[20];
484+
uv_buf_t bufs[2]; // [0] udx header [1] user data
458485
void *data;
459486
};
460487

src/internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
#define UDX_PACKET_FREE_ON_SEND (UDX_PACKET_TYPE_STREAM_STATE | UDX_PACKET_TYPE_STREAM_DESTROY | UDX_PACKET_TYPE_STREAM_RELAY)
88

99
#define UDX_UNUSED(x) ((void) (x))
10+
#define UDX_MAX_SACKS 50
11+
12+
#define container_of(ptr, type, member) \
13+
((type *) ((char *) (ptr) - offsetof(type, member)))
1014

1115
typedef struct {
1216
uint64_t prior_timestamp;

src/io.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,10 @@
66
int
77
udx__get_link_mtu (const struct sockaddr *s);
88

9-
ssize_t
10-
udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, struct sockaddr *addr, int addr_len);
11-
12-
ssize_t
13-
udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int addr_len);
14-
159
int
16-
udx__udp_set_rxq_ovfl (uv_os_sock_t fd);
10+
udx__udp_set_dontfrag (uv_os_sock_t fd, bool is_ipv6);
1711

1812
int
19-
udx__udp_set_dontfrag (uv_os_sock_t fd, bool is_ipv6);
13+
udx__get_socket_ttl (udx_socket_t *socket);
2014

2115
#endif // UDX_IO_H

src/io_posix.c

Lines changed: 21 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,27 @@
1515
#include "internal.h"
1616
#include "io.h"
1717

18+
int
19+
udx__get_socket_ttl (udx_socket_t *socket) {
20+
uv_os_fd_t fd;
21+
uv_fileno((uv_handle_t *) &socket->uv_udp, &fd);
22+
23+
int ttl;
24+
socklen_t ttl_opt_size = sizeof ttl;
25+
int rc;
26+
if (socket->family == 4) {
27+
rc = getsockopt((int) fd, IPPROTO_IP, IP_TTL, &ttl, &ttl_opt_size);
28+
} else {
29+
rc = getsockopt((int) fd, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &ttl, &ttl_opt_size);
30+
}
31+
32+
if (rc == -1) {
33+
return -1;
34+
}
35+
36+
return ttl;
37+
}
38+
1839
#if defined(__APPLE__)
1940

2041
int
@@ -84,87 +105,3 @@ udx__udp_set_dontfrag (uv_os_sock_t fd, bool is_ipv6) {
84105
}
85106

86107
#endif
87-
88-
ssize_t
89-
udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, struct sockaddr *addr, int addr_len) {
90-
ssize_t size;
91-
struct msghdr h;
92-
93-
memset(&h, 0, sizeof(h));
94-
95-
h.msg_name = addr;
96-
h.msg_namelen = addr_len;
97-
98-
h.msg_iov = (struct iovec *) bufs;
99-
h.msg_iovlen = bufs_len;
100-
101-
do {
102-
size = sendmsg(handle->io_poll.io_watcher.fd, &h, 0);
103-
} while (size == -1 && errno == EINTR);
104-
105-
return size == -1 ? uv_translate_sys_error(errno) : size;
106-
}
107-
108-
ssize_t
109-
udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int addr_len) {
110-
ssize_t size;
111-
struct msghdr h;
112-
113-
memset(&h, 0, sizeof(h));
114-
115-
h.msg_name = addr;
116-
h.msg_namelen = addr_len;
117-
118-
h.msg_iov = (struct iovec *) buf;
119-
h.msg_iovlen = 1;
120-
121-
union {
122-
struct cmsghdr align;
123-
uint8_t buf[2048];
124-
} u;
125-
126-
h.msg_control = u.buf;
127-
h.msg_controllen = sizeof(u.buf);
128-
129-
do {
130-
size = recvmsg(handle->io_poll.io_watcher.fd, &h, 0);
131-
} while (size == -1 && errno == EINTR);
132-
133-
#if defined(__linux__)
134-
135-
if (size != -1 && h.msg_controllen) {
136-
137-
// relies on SO_RXQ_OVFL being set
138-
uint32_t packets_dropped_by_kernel = 0;
139-
140-
for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&h); cmsg != NULL; cmsg = CMSG_NXTHDR(&h, cmsg)) {
141-
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_RXQ_OVFL) {
142-
memcpy(&packets_dropped_by_kernel, CMSG_DATA(cmsg), sizeof(packets_dropped_by_kernel));
143-
}
144-
}
145-
146-
if (packets_dropped_by_kernel) {
147-
uint32_t delta = packets_dropped_by_kernel - handle->packets_dropped_by_kernel;
148-
handle->udx->packets_dropped_by_kernel += delta;
149-
handle->packets_dropped_by_kernel = packets_dropped_by_kernel;
150-
}
151-
}
152-
153-
#endif
154-
155-
return size == -1 ? uv_translate_sys_error(errno) : size;
156-
}
157-
158-
#if defined(__linux__)
159-
int
160-
udx__udp_set_rxq_ovfl (uv_os_sock_t fd) {
161-
int on = 1;
162-
return setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &on, sizeof(on));
163-
}
164-
#else
165-
int
166-
udx__udp_set_rxq_ovfl (uv_os_sock_t fd) {
167-
UDX_UNUSED(fd);
168-
return -1;
169-
}
170-
#endif

0 commit comments

Comments
 (0)