diff --git a/src/slipstream_server.c b/src/slipstream_server.c index d4f4b07..4693213 100644 --- a/src/slipstream_server.c +++ b/src/slipstream_server.c @@ -195,6 +195,7 @@ typedef struct st_slipstream_server_stream_ctx_t { int pipefd[2]; uint64_t stream_id; volatile sig_atomic_t set_active; + volatile sig_atomic_t polling_active; } slipstream_server_stream_ctx_t; typedef struct st_slipstream_server_ctx_t { @@ -257,7 +258,18 @@ static void slipstream_server_free_stream_context(slipstream_server_ctx_t* serve server_ctx->first_stream = stream_ctx->next_stream; } - stream_ctx->fd = close(stream_ctx->fd); + if (stream_ctx->fd >= 0) { + close(stream_ctx->fd); + stream_ctx->fd = -1; + } + if (stream_ctx->pipefd[0] >= 0) { + close(stream_ctx->pipefd[0]); + stream_ctx->pipefd[0] = -1; + } + if (stream_ctx->pipefd[1] >= 0) { + close(stream_ctx->pipefd[1]); + stream_ctx->pipefd[1] = -1; + } free(stream_ctx); } @@ -386,7 +398,7 @@ void* slipstream_server_poller(void* arg) { break; } - + args->stream_ctx->polling_active = 0; free(args); pthread_exit(NULL); } @@ -414,12 +426,12 @@ void* slipstream_io_copy(void* arg) { addr_len = sizeof(struct sockaddr_in6); } else { perror("Invalid address family"); - return NULL; + goto cleanup; } if (connect(socket, (struct sockaddr*)&server_ctx->upstream_addr, addr_len) < 0) { perror("connect() failed"); - return NULL; + goto cleanup; } DBG_PRINTF("[%lu:%d] setup pipe done", stream_ctx->stream_id, stream_ctx->fd); @@ -440,7 +452,7 @@ void* slipstream_io_copy(void* arg) { DBG_PRINTF("[%lu:%d] read %d bytes", stream_ctx->stream_id, stream_ctx->fd, bytes_read); if (bytes_read < 0) { perror("recv failed"); - return NULL; + goto cleanup; } else if (bytes_read == 0) { // End of stream - source socket closed connection break; @@ -453,13 +465,17 @@ void* slipstream_io_copy(void* arg) { ssize_t bytes_written = send(socket, p, remaining, 0); if (bytes_written < 0) { perror("send failed"); - return NULL; + goto cleanup; } remaining -= bytes_written; p += bytes_written; } } +cleanup: + /* Don't close FDs here — free_stream_context owns them. + * Closing from this thread races with the main thread's ioctl/recv. */ + free(args); return NULL; } @@ -550,6 +566,8 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, /* Connection closed */ DBG_PRINTF("[stream_id=%d] send: closed stream", stream_ctx->stream_id); + picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); return 0; } @@ -558,16 +576,16 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, } DBG_PRINTF("[stream_id=%d] send: error: %s (%d)", stream_id, strerror(errno), errno); + picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); return 0; } } if (fin_or_event == picoquic_callback_stream_fin) { DBG_PRINTF("[stream_id=%d] fin", stream_ctx->stream_id); - /* Close the local_sock fd */ - close(stream_ctx->fd); - stream_ctx->fd = -1; picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); } break; case picoquic_callback_stop_sending: /* Should not happen, treated as reset */ @@ -591,12 +609,13 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, case picoquic_callback_application_close: /* Received application close */ DBG_PRINTF("Connection closed.", NULL); if (server_ctx != NULL) { + picoquic_network_thread_ctx_t* thread_ctx = server_ctx->thread_ctx; slipstream_server_free_context(server_ctx); + /* Remove the application callback */ + picoquic_set_callback(cnx, NULL, NULL); + picoquic_close(cnx, 0); + picoquic_wake_up_network_thread(thread_ctx); } - /* Remove the application callback */ - picoquic_set_callback(cnx, NULL, NULL); - picoquic_close(cnx, 0); - picoquic_wake_up_network_thread(server_ctx->thread_ctx); break; case picoquic_callback_prepare_to_send: /* Active sending API */ @@ -609,7 +628,8 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, // DBG_PRINTF("[stream_id=%d] recv->quic_send (available %d)", stream_id, length_available); if (ret < 0) { DBG_PRINTF("[stream_id=%d] ioctl error: %s (%d)", stream_ctx->stream_id, strerror(errno), errno); - /* TODO: why would it return an error? */ + picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); break; } @@ -626,26 +646,33 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, (void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 0); DBG_PRINTF("[stream_id=%d] recv->quic_send: empty, disactivate", stream_ctx->stream_id); - slipstream_server_poller_args* args = malloc(sizeof(slipstream_server_poller_args)); - args->fd = stream_ctx->fd; - args->cnx = cnx; - args->server_ctx = server_ctx; - args->stream_ctx = stream_ctx; - - pthread_t thread; - if (pthread_create(&thread, NULL, slipstream_server_poller, args) != 0) { - perror("pthread_create() failed for thread1"); - free(args); - } + if (!stream_ctx->polling_active) { + stream_ctx->polling_active = 1; + + slipstream_server_poller_args* args = malloc(sizeof(slipstream_server_poller_args)); + args->fd = stream_ctx->fd; + args->cnx = cnx; + args->server_ctx = server_ctx; + args->stream_ctx = stream_ctx; + + pthread_t thread; + if (pthread_create(&thread, NULL, slipstream_server_poller, args) != 0) { + perror("pthread_create() failed for thread1"); + stream_ctx->polling_active = 0; + free(args); + } #ifdef __APPLE__ - pthread_setname_np("slipstream_server_poller"); + pthread_setname_np("slipstream_server_poller"); #else - pthread_setname_np(thread, "slipstream_server_poller"); + pthread_setname_np(thread, "slipstream_server_poller"); #endif - pthread_detach(thread); + pthread_detach(thread); + } } if (bytes_read == 0) { DBG_PRINTF("[stream_id=%d] recv: closed stream", stream_ctx->stream_id); + picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); return 0; } @@ -667,12 +694,15 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, // DBG_PRINTF("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read); if (bytes_read == 0) { DBG_PRINTF("Closed connection on sock %d on recv", stream_ctx->fd); + picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); return 0; } if (bytes_read < 0) { DBG_PRINTF("recv: %s (%d)", strerror(errno), errno); - /* There should be bytes available, so a return value of 0 is an error */ + picoquic_unlink_app_stream_ctx(cnx, stream_id); + slipstream_server_free_stream_context(server_ctx, stream_ctx); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); return 0; }