Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ struct udx_packet_s {

udx_stream_t *stream; // for incrementing counters when packet is sent

bool cancelled; // true if the stream was closed while this packet is in the uv_udp_send queue
// immediateyl call on_ack(UV_ECANCELLED) in the callback
bool lost;
bool retransmitted;
uint8_t transmits;
Expand Down
72 changes: 45 additions & 27 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,11 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {
}
}

static void
deref_packet (udx_packet_t *pkt) {
if (--pkt->ref_count == 0) {
if (pkt->bufs != &pkt->buf_sml[0]) {
free(pkt->bufs);
free(pkt->wbufs);
}
free(pkt);
}
}

static void
cancel_packet (udx_packet_t *pkt) {
uv_buf_t *bufs = pkt->bufs;
udx_stream_write_buf_t **wbufs = pkt->wbufs;

for (int i = 0; i < pkt->nwbufs; i++) {
size_t buf_len = bufs[i + 1].len;
udx_stream_write_buf_t *wbuf = wbufs[i];
size_t buf_len = pkt->bufs[i + 1].len;
udx_stream_write_buf_t *wbuf = pkt->wbufs[i];
on_bytes_acked(wbuf, buf_len, true);

// todo: move into on_bytes_acked itself
Expand All @@ -228,8 +214,20 @@ cancel_packet (udx_packet_t *pkt) {
write->on_ack(write, UV_ECANCELED, 0);
}
}
}

deref_packet(pkt);
static void
deref_packet (udx_packet_t *pkt) {
if (--pkt->ref_count == 0) {
if (pkt->cancelled) {
cancel_packet(pkt);
}
if (pkt->bufs != &pkt->buf_sml[0]) {
free(pkt->bufs);
free(pkt->wbufs);
}
free(pkt);
}
}

static void
Expand All @@ -242,15 +240,16 @@ clear_outgoing_packets (udx_stream_t *stream) {
if (stream->pkt) {
assert(stream->pkt->ref_count == 1);
cancel_packet(stream->pkt);
deref_packet(stream->pkt);
}

// We should make sure all existing packets do not send, and notify the user that they failed
for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) {
udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq);

if (pkt == NULL) continue;

cancel_packet(pkt);
pkt->cancelled = true;
deref_packet(pkt);
}

while (stream->write_queue.len > 0) {
Expand Down Expand Up @@ -310,8 +309,9 @@ udx_write_header (uint8_t header[20], udx_stream_t *stream, int type) {
// returns 1 on success, zero if packet can't be promoted to a probe packet
static int
mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {

assert(wanted_size > pkt->size);
if (wanted_size > pkt->size) {
return 0;
}

// cannot probeify a packet with 1) no data 2) already has padding
if (pkt->nwbufs < 1 || pkt->header[3] != 0) {
Expand Down Expand Up @@ -370,8 +370,19 @@ finalize_maybe (uv_handle_t *timer) {
// 2. if you call this on the send path, you must immediately return from
// send_stream_packets

static int
static void
close_stream_internal (udx_stream_t *stream, int err);

void
close_stream (udx_stream_t *stream, int err) {
if (stream->status & UDX_STREAM_DESTROYING) {
return;
}
close_stream_internal(stream, err);
}

void
close_stream_internal (udx_stream_t *stream, int err) {
assert((stream->status & UDX_STREAM_CLOSED) == 0);
stream->status |= UDX_STREAM_CLOSED;
stream->status &= ~UDX_STREAM_CONNECTED;
Expand Down Expand Up @@ -441,8 +452,6 @@ close_stream (udx_stream_t *stream, int err) {
if (udx->teardown && socket != NULL && socket->streams == NULL) {
udx_socket_close(socket);
}

return 1;
}

static void
Expand Down Expand Up @@ -873,7 +882,7 @@ send_new_packet (udx_stream_t *stream, int probe_type) {

if (first_alloc) {
pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0]));
pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0]));
pkt->wbufs = malloc((pkt->nwbufs_capacity) * sizeof(pkt->wbufs[0]));
memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml));
memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml));
} else {
Expand Down Expand Up @@ -1798,12 +1807,21 @@ static void
on_uv_udp_recv (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
if (nread == 0 && addr == NULL) return;

if (nread < 0) {
debug_printf("udx: uv_udp_recv err=%s\n", uv_strerror(nread));
assert(nread != UV_EBADF);
assert(nread != UV_ENOTSOCK);
assert(nread != UV_EINVAL);
assert(nread != UV_EFAULT);
return;
}

udx_socket_t *socket = handle->data; // todo: cast instead, save a dereference ?

assert(!(socket->status & UDX_SOCKET_CLOSED));

if (flags & UV_UDP_PARTIAL) {
assert(false && "todo: log error for large messages?");
debug_printf("udx: uv_udp_recv received partial packet\n");
}

assert((size_t) nread <= buf->len);
Expand Down Expand Up @@ -2681,7 +2699,7 @@ stream_on_destroy_send (udx_stream_t *stream) {
udx->packets_tx++;
udx->bytes_tx += UDX_HEADER_SIZE;

close_stream(stream, 0);
close_stream_internal(stream, 0);
}

static void
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ list(APPEND tests
socket-send-recv-dualstack
socket-send-recv-ipv6
stream-destroy
stream-destroy-slowpath
stream-destroy-before-connect
stream-preconnect
stream-preconnect-same-socket
Expand Down
70 changes: 70 additions & 0 deletions test/stream-destroy-slowpath.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#include "../include/udx.h"

int
close_stream (udx_stream_t *stream, int status);

uv_loop_t loop;
udx_t udx;
udx_socket_t sock;

bool close_called = false;

void
on_close (udx_stream_t *handle, int status) {
assert(status == 0);

int e = udx_socket_close(&sock);

assert(e == 0);

close_called = true;
}

void
ack_cb (udx_stream_write_t *req, int status, int unordered) {

printf("ack status=%d unordered=%d\n", status, unordered);
}

int
main () {
int e;

uv_loop_init(&loop);

e = udx_init(&loop, &udx, NULL);
assert(e == 0);

e = udx_socket_init(&udx, &sock, NULL);
assert(e == 0);

struct sockaddr_in addr;
uv_ip4_addr("127.0.0.1", 8081, &addr);
e = udx_socket_bind(&sock, (struct sockaddr *) &addr, 0);
assert(e == 0);

udx_stream_t stream;
e = udx_stream_init(&udx, &stream, 1, on_close, NULL);
assert(e == 0);

e = udx_stream_connect(&stream, &sock, 2, (struct sockaddr *) &addr);
assert(e == 0);

uv_buf_t buf = uv_buf_init("hello", 5);
udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));

udx_stream_write(req, &stream, &buf, 1, ack_cb);
e = udx_stream_destroy(&stream);
close_stream(&stream, 0);

uv_run(&loop, UV_RUN_DEFAULT);
free(req);

assert(close_called);

return 0;
}
Loading