Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 59 additions & 29 deletions src/slipstream_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ typedef struct st_slipstream_server_stream_ctx_t {
int pipefd[2];
uint64_t stream_id;
volatile sig_atomic_t set_active;
volatile sig_atomic_t polling_active;
} slipstream_server_stream_ctx_t;

typedef struct st_slipstream_server_ctx_t {
Expand Down Expand Up @@ -257,7 +258,18 @@ static void slipstream_server_free_stream_context(slipstream_server_ctx_t* serve
server_ctx->first_stream = stream_ctx->next_stream;
}

stream_ctx->fd = close(stream_ctx->fd);
if (stream_ctx->fd >= 0) {
close(stream_ctx->fd);
stream_ctx->fd = -1;
}
if (stream_ctx->pipefd[0] >= 0) {
close(stream_ctx->pipefd[0]);
stream_ctx->pipefd[0] = -1;
}
if (stream_ctx->pipefd[1] >= 0) {
close(stream_ctx->pipefd[1]);
stream_ctx->pipefd[1] = -1;
}

free(stream_ctx);
}
Expand Down Expand Up @@ -386,7 +398,7 @@ void* slipstream_server_poller(void* arg) {
break;
}


args->stream_ctx->polling_active = 0;
free(args);
pthread_exit(NULL);
}
Expand Down Expand Up @@ -414,12 +426,12 @@ void* slipstream_io_copy(void* arg) {
addr_len = sizeof(struct sockaddr_in6);
} else {
perror("Invalid address family");
return NULL;
goto cleanup;
}

if (connect(socket, (struct sockaddr*)&server_ctx->upstream_addr, addr_len) < 0) {
perror("connect() failed");
return NULL;
goto cleanup;
}

DBG_PRINTF("[%lu:%d] setup pipe done", stream_ctx->stream_id, stream_ctx->fd);
Expand All @@ -440,7 +452,7 @@ void* slipstream_io_copy(void* arg) {
DBG_PRINTF("[%lu:%d] read %d bytes", stream_ctx->stream_id, stream_ctx->fd, bytes_read);
if (bytes_read < 0) {
perror("recv failed");
return NULL;
goto cleanup;
} else if (bytes_read == 0) {
// End of stream - source socket closed connection
break;
Expand All @@ -453,13 +465,17 @@ void* slipstream_io_copy(void* arg) {
ssize_t bytes_written = send(socket, p, remaining, 0);
if (bytes_written < 0) {
perror("send failed");
return NULL;
goto cleanup;
}
remaining -= bytes_written;
p += bytes_written;
}
}

cleanup:
/* Don't close FDs here — free_stream_context owns them.
* Closing from this thread races with the main thread's ioctl/recv. */
free(args);
return NULL;
}

Expand Down Expand Up @@ -550,6 +566,8 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
/* Connection closed */
DBG_PRINTF("[stream_id=%d] send: closed stream", stream_ctx->stream_id);

picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR);
return 0;
}
Expand All @@ -558,16 +576,16 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
}

DBG_PRINTF("[stream_id=%d] send: error: %s (%d)", stream_id, strerror(errno), errno);
picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR);
return 0;
}
}
if (fin_or_event == picoquic_callback_stream_fin) {
DBG_PRINTF("[stream_id=%d] fin", stream_ctx->stream_id);
/* Close the local_sock fd */
close(stream_ctx->fd);
stream_ctx->fd = -1;
picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
}
break;
case picoquic_callback_stop_sending: /* Should not happen, treated as reset */
Expand All @@ -591,12 +609,13 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
case picoquic_callback_application_close: /* Received application close */
DBG_PRINTF("Connection closed.", NULL);
if (server_ctx != NULL) {
picoquic_network_thread_ctx_t* thread_ctx = server_ctx->thread_ctx;
slipstream_server_free_context(server_ctx);
/* Remove the application callback */
picoquic_set_callback(cnx, NULL, NULL);
picoquic_close(cnx, 0);
picoquic_wake_up_network_thread(thread_ctx);
}
/* Remove the application callback */
picoquic_set_callback(cnx, NULL, NULL);
picoquic_close(cnx, 0);
picoquic_wake_up_network_thread(server_ctx->thread_ctx);
break;
case picoquic_callback_prepare_to_send:
/* Active sending API */
Expand All @@ -609,7 +628,8 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
// DBG_PRINTF("[stream_id=%d] recv->quic_send (available %d)", stream_id, length_available);
if (ret < 0) {
DBG_PRINTF("[stream_id=%d] ioctl error: %s (%d)", stream_ctx->stream_id, strerror(errno), errno);
/* TODO: why would it return an error? */
picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR);
break;
}
Expand All @@ -626,26 +646,33 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
(void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 0);
DBG_PRINTF("[stream_id=%d] recv->quic_send: empty, disactivate", stream_ctx->stream_id);

slipstream_server_poller_args* args = malloc(sizeof(slipstream_server_poller_args));
args->fd = stream_ctx->fd;
args->cnx = cnx;
args->server_ctx = server_ctx;
args->stream_ctx = stream_ctx;

pthread_t thread;
if (pthread_create(&thread, NULL, slipstream_server_poller, args) != 0) {
perror("pthread_create() failed for thread1");
free(args);
}
if (!stream_ctx->polling_active) {
stream_ctx->polling_active = 1;

slipstream_server_poller_args* args = malloc(sizeof(slipstream_server_poller_args));
args->fd = stream_ctx->fd;
args->cnx = cnx;
args->server_ctx = server_ctx;
args->stream_ctx = stream_ctx;

pthread_t thread;
if (pthread_create(&thread, NULL, slipstream_server_poller, args) != 0) {
perror("pthread_create() failed for thread1");
stream_ctx->polling_active = 0;
free(args);
}
#ifdef __APPLE__
pthread_setname_np("slipstream_server_poller");
pthread_setname_np("slipstream_server_poller");
#else
pthread_setname_np(thread, "slipstream_server_poller");
pthread_setname_np(thread, "slipstream_server_poller");
#endif
pthread_detach(thread);
pthread_detach(thread);
}
}
if (bytes_read == 0) {
DBG_PRINTF("[stream_id=%d] recv: closed stream", stream_ctx->stream_id);
picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR);
return 0;
}
Expand All @@ -667,12 +694,15 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
// DBG_PRINTF("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read);
if (bytes_read == 0) {
DBG_PRINTF("Closed connection on sock %d on recv", stream_ctx->fd);
picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR);
return 0;
}
if (bytes_read < 0) {
DBG_PRINTF("recv: %s (%d)", strerror(errno), errno);
/* There should be bytes available, so a return value of 0 is an error */
picoquic_unlink_app_stream_ctx(cnx, stream_id);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR);
return 0;
}
Expand Down