Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,16 @@ typedef struct udx_queue_s {
} udx_queue_t;

struct udx_socket_s {
uv_udp_t uv_udp; // must be first
uv_udp_t uv_udp; // must be first
uv_timer_t timer; // used for draining the specific-ttl-send-queue

int nrefs; // 2 - uv_udp and uv_timer

// packets queued with udx_socket_send_ttl that both
// 1. override the socket's TTL value and
// 2. can't be sent immediately via uv_udp_try_send()
// are queued by sending with uv_udp_send() and simultaneously queued here.
// Then when a packet is sent if the next packet is the packet at the head of this
// queue (ie the next packet has a specified TTL), then the socket's ttl is temporarily
// set via uv_udp_set_ttl, and the udx_socket_send callback will restore the ttl after
// are queued here, and sent later
udx_queue_t specific_ttl_send_queue;
uint64_t packets_sent_via_uv_send_queue;

udx_socket_t *prev;
udx_socket_t *next;
Expand Down Expand Up @@ -444,13 +443,13 @@ struct udx_packet_s {
struct udx_socket_send_s {
uv_udp_send_t uv_udp_send;

udx_queue_node_t queue;
udx_queue_node_t queue; // could be singly linked actually
uint32_t ttl;
// when queued for sending, the value stored here is:
// socket.packets_sent_via_uv_send_queue + socket.send_queue_count
// it is used to determine when this packet is at the head of the queue
// so that the TTL can be adjusted
uint64_t place_in_queue;

// these are for saving the buffer + destination when sending with specific TTL on the slow path
uv_buf_t buf; // only for sending with specific ttl, buf.base = (char *) (req+1)
struct sockaddr_storage remote_addr; // only for sending with specific ttl
int remote_addr_len; // only for sending with specific ttl

udx_socket_t *socket;
udx_socket_send_cb on_send;
Expand Down
129 changes: 67 additions & 62 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,20 @@ ref_dec (udx_t *udx) {
}

static void
on_uv_close (uv_handle_t *handle) {
udx_socket_t *socket = (udx_socket_t *) handle;
udx_t *udx = socket->udx;
on_udx_socket_handle_close (uv_handle_t *handle) {

if (socket->on_close != NULL) {
socket->on_close(socket);
}
udx_socket_t *socket = handle->data;
socket->nrefs--;

ref_dec(udx);
if (socket->nrefs == 0) {
udx_t *udx = socket->udx;

if (socket->on_close != NULL) {
socket->on_close(socket);
}

ref_dec(udx);
}
}

static void
Expand Down Expand Up @@ -446,36 +451,10 @@ close_stream (udx_stream_t *stream, int err) {
static void
udx_rto_timeout (uv_timer_t *handle);

// Applies the TTL of the request at the head of the socket's
// specific_ttl_send_queue when that request is the next one due to go out,
// i.e. when its place_in_queue equals the socket's
// packets_sent_via_uv_send_queue counter.
//
// Returns true if uv_udp_set_ttl() was called, false if the queue is empty
// or the head request is not next in line.
static bool
_maybe_adjust_ttl (udx_socket_t *socket) {
  bool adjusted = false;

  if (socket->specific_ttl_send_queue.len != 0) {
    udx_socket_send_t *head = udx__queue_data(udx__queue_peek(&socket->specific_ttl_send_queue), udx_socket_send_t, queue);

    if (socket->packets_sent_via_uv_send_queue == head->place_in_queue) {
      uv_udp_set_ttl(&socket->uv_udp, head->ttl);
      adjusted = true;
    }
  }

  return adjusted;
}

// Bookkeeping hook for the uv_udp_send() path: every send callback for a
// packet submitted via uv_udp_send() must call this exactly once.
//
// It bumps the socket's packets_sent_via_uv_send_queue counter and then lets
// _maybe_adjust_ttl() switch the TTL if the next queued packet asks for one.
static void
maybe_adjust_ttl (uv_udp_t *udp) {
  // uv_udp is the first member of udx_socket_s, so the handle pointer doubles
  // as the socket pointer (todo: use offsetof instead?)
  udx_socket_t *owner = (udx_socket_t *) udp;

  owner->packets_sent_via_uv_send_queue += 1;

  (void) _maybe_adjust_ttl(owner);
}

// used to free simple (ack, probe, and relay) memory
// stream-write, stream-send and stream-destroy packets have their own callbacks
void
on_packet_send_slow (uv_udp_send_t *req, int status) {
maybe_adjust_ttl(req->handle);

UDX_UNUSED(status);
free(req);
Expand Down Expand Up @@ -670,7 +649,6 @@ on_stream_data_write (uv_udp_send_t *send, int status) {
debug_printf("sendmsg: %s\n", uv_strerror(status));
}

maybe_adjust_ttl(send->handle); // send is freed with packet
deref_packet(pkt);
}

Expand Down Expand Up @@ -870,7 +848,7 @@ send_new_packet (udx_stream_t *stream, int probe_type) {

if (first_alloc) {
pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0]));
pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0]));
pkt->wbufs = malloc((pkt->nwbufs_capacity) * sizeof(pkt->wbufs[0]));
memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml));
memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml));
} else {
Expand Down Expand Up @@ -1871,21 +1849,20 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb) {
socket->on_recv = NULL;
socket->on_close = cb;

socket->bytes_rx = 0;
socket->bytes_tx = 0;
socket->packets_rx = 0;
socket->packets_tx = 0;

socket->packets_dropped_by_kernel = -1;
uv_udp_t *handle = &socket->uv_udp;

// Asserting all the errors here as it massively simplifies error handling.
// In practice these will never fail.

int err = uv_udp_init(udx->loop, handle);
int err = uv_udp_init(udx->loop, &socket->uv_udp);
assert(err == 0);
socket->uv_udp.data = socket;

err = uv_timer_init(udx->loop, &socket->timer);
assert(err == 0);
socket->timer.data = socket;

handle->data = socket;
socket->nrefs = 2;

return err;
}
Expand Down Expand Up @@ -2039,32 +2016,56 @@ udx_socket_send (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bu

static void
on_socket_send_slow (uv_udp_send_t *_req, int status) {
udx_socket_send_t *req = (udx_socket_send_t *) ((char *) _req - offsetof(udx_socket_send_t, uv_udp_send));
udx_socket_send_t *req = container_of(_req, udx_socket_send_t, uv_udp_send);

udx_socket_t *socket = req->socket;
// 1. if packet was sent with specific ttl, remove it from queue and reset ttl
if (req->ttl) {
udx_socket_send_t *removed = udx__queue_data(udx__queue_shift(&socket->specific_ttl_send_queue), udx_socket_send_t, queue);
assert(removed == req);
// restore ttl after sending
uv_udp_set_ttl(&socket->uv_udp, socket->ttl);
}

// 2. if next packet is also a specific ttl it will be re-set here
maybe_adjust_ttl(_req->handle);
if (req->on_send) {
req->on_send(req, status);
}
}

// Timer callback that drains the socket's specific_ttl_send_queue.
//
// For each queued request, in FIFO order, the socket TTL is set to the
// request's TTL and the saved buffer is sent to the saved destination with
// uv_udp_try_send(). On UV_EAGAIN the request stays at the head of the queue
// and draining stops; any other result completes the request (status 0 on
// success, the negative libuv error otherwise).
//
// Once draining stops the socket's default TTL is restored, and the timer is
// re-armed (1 ms, no repeat) if requests remain, or stopped if the queue is
// empty.
//
// timer->data must point at the owning udx_socket_t.
static void
retry_send_specific_ttl (uv_timer_t *timer) {
  udx_socket_t *socket = timer->data;
  debug_printf("retry sending specific ttl, qlen=%u\n", socket->specific_ttl_send_queue.len);

  while (socket->specific_ttl_send_queue.len) {
    udx_socket_send_t *req = udx__queue_data(udx__queue_peek(&socket->specific_ttl_send_queue), udx_socket_send_t, queue);

    uv_udp_set_ttl(&socket->uv_udp, req->ttl);

    int err = uv_udp_try_send(&socket->uv_udp, &req->buf, 1, (struct sockaddr *) &req->remote_addr);

    if (err == UV_EAGAIN) {
      // still can't send; leave the request queued and retry on the next tick
      break;
    }

    // Unlink the request BEFORE running the callback: the queue node lives
    // inside req, so a callback that frees or re-submits req would otherwise
    // leave the queue pointing at released or re-linked memory.
    udx__queue_shift(&socket->specific_ttl_send_queue);

    // on_send is optional, matching the guard in on_socket_send_slow
    if (req->on_send != NULL) {
      req->on_send(req, err >= 0 ? 0 : err);
    }
  }

  // restore the socket's default ttl for regular traffic
  uv_udp_set_ttl(&socket->uv_udp, socket->ttl);

  if (socket->specific_ttl_send_queue.len > 0) {
    uv_timer_start(&socket->timer, retry_send_specific_ttl, 1, 0);
  } else {
    uv_timer_stop(&socket->timer);
  }
}

int
udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *dest, int ttl, udx_socket_send_cb cb) {
if (ttl < 0 /* 0 is "default" */ || ttl > 255) return UV_EINVAL;
if (dest->sa_family != AF_INET && dest->sa_family != AF_INET6) return UV_EINVAL;

req->ttl = ttl;
req->on_send = cb;
req->socket = socket;

req->buf = bufs[0];
req->remote_addr_len = dest->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
memcpy(&req->remote_addr, dest, req->remote_addr_len);

struct sockaddr_in6 dest6;
if (socket->family == 6 && dest->sa_family == AF_INET) {
memset(&dest6, 0, sizeof(dest6));
Expand Down Expand Up @@ -2098,15 +2099,17 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_
}

if (err == UV_EAGAIN) {
// slow path
// slow path (1) with specific TTL
if (ttl) {
req->place_in_queue = socket->packets_sent_via_uv_send_queue + socket->uv_udp.send_queue_count;
udx__queue_tail(&socket->specific_ttl_send_queue, &req->queue);
if (!uv_is_active((uv_handle_t *) &socket->timer)) {
uv_timer_start(&socket->timer, retry_send_specific_ttl, 1, 0);
}
err = 0;
} else {
// slow path (2) - non-specific TTL
err = uv_udp_send(&req->uv_udp_send, &socket->uv_udp, bufs, bufs_len, dest, on_socket_send_slow);
}

err = uv_udp_send(&req->uv_udp_send, &socket->uv_udp, bufs, bufs_len, dest, on_socket_send_slow);
_maybe_adjust_ttl(socket); // edge case: queue was empty

return err;
}

Expand Down Expand Up @@ -2139,7 +2142,9 @@ udx_socket_close (udx_socket_t *socket) {

socket->status |= UDX_SOCKET_CLOSED;

uv_close((uv_handle_t *) &socket->uv_udp, on_uv_close);
uv_close((uv_handle_t *) &socket->uv_udp, on_udx_socket_handle_close);
uv_timer_stop(&socket->timer);
uv_close((uv_handle_t *) &socket->timer, on_udx_socket_handle_close);

udx_t *udx = socket->udx;
udx__link_remove(udx->sockets, socket);
Expand Down
22 changes: 15 additions & 7 deletions test/socket-send-recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct {
int next_ttl;
bool send_called;
bool recv_called;
} tests[NTESTS] = {{.string = "one", .ttl = 0, .next_ttl = 10}, {.string = "two", .ttl = 10, .next_ttl = 20}, {.string = "three", .ttl = 20, .next_ttl = 0}, {.string = "four", .ttl = 0, .next_ttl = 0}};
} tests[NTESTS] = {{.string = "one", .ttl = 0}, {.string = "two", .ttl = 10}, {.string = "three", .ttl = 20}, {.string = "four", .ttl = 0}};

int nrecv_called;

Expand All @@ -35,14 +35,19 @@ on_send (udx_socket_send_t *r, int status) {

for (i = 0; i < NTESTS; i++) {
if (r == &tests[i].req) {
int wanted_ttl = tests[i].next_ttl ?: bsock.ttl; /* UDX_DEFAULT_TTL */
int wanted_ttl = tests[i].ttl ?: bsock.ttl; /* UDX_DEFAULT_TTL */

tests[i].send_called = true;

// assert(ttl == wanted_ttl);
assert(ttl == wanted_ttl);
}
}
}
int nclose;
// Close callback shared by both test sockets: counts how many sockets have
// finished closing.
void
on_socket_close (udx_socket_t *socket) {
  (void) socket; // which socket closed is irrelevant, only the tally matters
  nclose += 1;
}

void
on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from) {
Expand All @@ -58,7 +63,8 @@ on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const stru
}

if (++nrecv_called == 4) {
uv_stop(&loop);
udx_socket_close(&asock);
udx_socket_close(&bsock);
}
}

Expand All @@ -71,10 +77,10 @@ main () {
e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &asock, NULL);
e = udx_socket_init(&udx, &asock, on_socket_close);
assert(e == 0);

e = udx_socket_init(&udx, &bsock, NULL);
e = udx_socket_init(&udx, &bsock, on_socket_close);
assert(e == 0);

struct sockaddr_in baddr;
Expand Down Expand Up @@ -103,7 +109,9 @@ main () {
}
}

uv_run(&loop, UV_RUN_DEFAULT);
int rc = uv_run(&loop, UV_RUN_DEFAULT);

assert(rc == 0);

for (int i = 0; i < NTESTS; i++) {
assert(tests[i].send_called);
Expand Down