diff --git a/Makefile b/Makefile index d5474bf..2aadb93 100644 --- a/Makefile +++ b/Makefile @@ -153,6 +153,8 @@ SRCS := src/main.c \ src/audio_capture.c \ src/audio_playback.c \ src/network.c \ + src/network_tcp.c \ + src/network_reconnect.c \ src/input.c \ src/crypto.c \ src/discovery.c \ @@ -173,6 +175,8 @@ endif ifdef NO_CRYPTO SRCS := $(filter-out src/crypto.c,$(SRCS)) SRCS := $(filter-out src/network.c,$(SRCS)) + SRCS := $(filter-out src/network_tcp.c,$(SRCS)) + SRCS := $(filter-out src/network_reconnect.c,$(SRCS)) SRCS += src/crypto_stub.c SRCS += src/network_stub.c endif @@ -229,7 +233,7 @@ $(TARGET): $(OBJS) # Build rstr-player tool # Note: Needs many modules for dependencies - simplified player would be better long-term -$(PLAYER): tools/rstr-player.c src/recording.c src/vaapi_decoder.c src/display_sdl2.c src/network.c src/crypto.c src/config.c src/input.c src/opus_codec.c src/audio_playback.c src/latency.c src/platform/platform_linux.c src/packet_validate.c +$(PLAYER): tools/rstr-player.c src/recording.c src/vaapi_decoder.c src/display_sdl2.c src/network.c src/network_tcp.c src/network_reconnect.c src/crypto.c src/config.c src/input.c src/opus_codec.c src/audio_playback.c src/latency.c src/platform/platform_linux.c src/packet_validate.c @echo "🔗 Building rstr-player..." @$(CC) $(CFLAGS) $^ -o $(PLAYER) $(LDFLAGS) $(LIBS) @echo "✓ Build complete: $(PLAYER)" diff --git a/include/rootstream.h b/include/rootstream.h index b69ca1d..3fb3725 100644 --- a/include/rootstream.h +++ b/include/rootstream.h @@ -306,8 +306,15 @@ typedef enum { PEER_HANDSHAKE_RECEIVED, /* Received handshake, session established */ PEER_CONNECTED, /* Fully authenticated */ PEER_DISCONNECTED, /* Lost connection */ + PEER_FAILED, /* Max reconnection attempts exceeded */ } peer_state_t; +/* Network transport types (PHASE 4) */ +typedef enum { + TRANSPORT_UDP = 1, /* UDP P2P (primary) */ + TRANSPORT_TCP = 2, /* TCP fallback */ +} transport_type_t; + typedef struct { char rootstream_code[ROOTSTREAM_CODE_MAX_LEN]; /* Peer's code */ uint8_t public_key[CRYPTO_PUBLIC_KEY_BYTES]; /* Peer's public key */ @@ -329,6 +336,12 @@ typedef struct { uint64_t last_ping; /* Last keepalive ping time (ms) */ uint8_t protocol_version; /* Peer protocol version */ uint8_t protocol_flags; /* Peer protocol flags */ + + /* Network resilience (PHASE 4) */ + transport_type_t transport; /* Current transport (UDP/TCP) */ + void *transport_priv; /* Transport-specific private data */ + void *reconnect_ctx; /* Reconnection tracking */ + uint64_t last_received; /* Last inbound packet time (ms) */ } peer_t; /* ============================================================================ @@ -692,6 +705,21 @@ int rootstream_net_handshake(rootstream_ctx_t *ctx, peer_t *peer); void rootstream_net_tick(rootstream_ctx_t *ctx); int rootstream_net_validate_packet(const uint8_t *buffer, size_t len); +/* --- Network TCP Fallback (PHASE 4) --- */ +int rootstream_net_tcp_connect(rootstream_ctx_t *ctx, peer_t *peer); +int rootstream_net_tcp_send(rootstream_ctx_t *ctx, peer_t *peer, + const uint8_t *data, size_t size); +int rootstream_net_tcp_recv(rootstream_ctx_t *ctx, peer_t *peer, + uint8_t *buffer, size_t *buffer_len); +void rootstream_net_tcp_cleanup(peer_t *peer); +bool rootstream_net_tcp_is_healthy(peer_t *peer); + +/* --- Peer Reconnection (PHASE 4) --- */ +int peer_reconnect_init(peer_t *peer); +int peer_try_reconnect(rootstream_ctx_t *ctx, peer_t *peer); +void peer_reconnect_cleanup(peer_t *peer); +void peer_reconnect_reset(peer_t *peer); + /* --- Peer Management --- */ peer_t* rootstream_add_peer(rootstream_ctx_t *ctx, const char *rootstream_code); peer_t* rootstream_find_peer(rootstream_ctx_t *ctx, const uint8_t *public_key); diff --git a/src/network.c b/src/network.c index 4a4fb4d..88f7cff 100644 --- a/src/network.c +++ b/src/network.c @@ -62,6 +62,11 @@ #define PEER_TIMEOUT_MS 5000 #define KEEPALIVE_INTERVAL_MS 1000 +/* Forward declarations */ +static int process_received_packet(rootstream_ctx_t *ctx, uint8_t *buffer, size_t recv_len, + struct sockaddr_storage *from, socklen_t fromlen, + transport_type_t transport); + static peer_t* rootstream_find_peer_by_addr(rootstream_ctx_t *ctx, const struct sockaddr_storage *addr, socklen_t addr_len) { @@ -261,6 +266,12 @@ int rootstream_net_send_encrypted(rootstream_ctx_t *ctx, peer_t *peer, return -1; } + /* Check connection health */ + if (peer->state != PEER_CONNECTED && peer->state != PEER_HANDSHAKE_RECEIVED) { + fprintf(stderr, "WARNING: Peer not fully connected, skipping send\n"); + return -1; + } + if (!peer->session.authenticated) { fprintf(stderr, "ERROR: Cannot send - peer not authenticated\n"); fprintf(stderr, "PEER: %s\n", peer->hostname); @@ -308,22 +319,44 @@ int rootstream_net_send_encrypted(rootstream_ctx_t *ctx, peer_t *peer, hdr->payload_size = cipher_len; /* MAC is included in cipher_len by crypto_encrypt_packet */ - /* Send packet */ - int sent = rs_socket_sendto(ctx->sock_fd, packet, - sizeof(packet_header_t) + cipher_len, 0, - (struct sockaddr*)&peer->addr, peer->addr_len); + /* Dispatch to transport */ + int ret = -1; + switch (peer->transport) { + case TRANSPORT_UDP: + ret = rs_socket_sendto(ctx->sock_fd, packet, + sizeof(packet_header_t) + cipher_len, 0, + (struct sockaddr*)&peer->addr, peer->addr_len); + if (ret < 0) { + int err = rs_socket_error(); + fprintf(stderr, "ERROR: UDP send failed: %s\n", rs_socket_strerror(err)); + } else { + ctx->bytes_sent += ret; + peer->last_sent = get_timestamp_ms(); + ret = 0; /* Success */ + } + break; + + case TRANSPORT_TCP: + ret = rootstream_net_tcp_send(ctx, peer, packet, sizeof(packet_header_t) + cipher_len); + break; + + default: + fprintf(stderr, "ERROR: Unknown transport type %d\n", peer->transport); + ret = -1; + } free(packet); - if (sent < 0) { - int err = rs_socket_error(); - fprintf(stderr, "ERROR: Send failed\n"); - fprintf(stderr, "REASON: %s\n", rs_socket_strerror(err)); + if (ret < 0) { + /* Transport failed, mark for reconnection */ + fprintf(stderr, "WARNING: Send failed, marking peer for reconnection\n"); + peer->state = PEER_DISCONNECTED; + if (peer->reconnect_ctx) { + peer_try_reconnect(ctx, peer); + } return -1; } - peer->last_sent = get_timestamp_ms(); - ctx->bytes_sent += sent; return 0; } @@ -347,47 +380,89 @@ int rootstream_net_recv(rootstream_ctx_t *ctx, int timeout_ms) { return -1; } - /* Poll for incoming data */ - int ret = rs_socket_poll(ctx->sock_fd, timeout_ms); + /* First, check for reconnecting peers (iterate backwards to handle removal safely) */ + for (int i = ctx->num_peers - 1; i >= 0; i--) { + peer_t *peer = &ctx->peers[i]; + + if (peer->state == PEER_DISCONNECTED && peer->reconnect_ctx) { + /* Try to reconnect */ + int ret = peer_try_reconnect(ctx, peer); + if (ret < 0) { + /* Max retries exceeded, remove peer */ + printf("INFO: Removing peer %s (max reconnection attempts)\n", peer->hostname); + rootstream_remove_peer(ctx, peer); + continue; + } + } + } + + /* Poll UDP socket for incoming data */ + int ret = rs_socket_poll(ctx->sock_fd, 0); /* Non-blocking check */ if (ret < 0) { int err = rs_socket_error(); fprintf(stderr, "ERROR: Poll failed: %s\n", rs_socket_strerror(err)); return -1; } - if (ret == 0) { - /* Timeout - no data */ - return 0; - } - - /* Receive packet */ - uint8_t buffer[MAX_PACKET_SIZE]; - struct sockaddr_storage from; - socklen_t fromlen = sizeof(from); + if (ret > 0) { + /* Receive UDP packet */ + uint8_t buffer[MAX_PACKET_SIZE]; + struct sockaddr_storage from; + socklen_t fromlen = sizeof(from); - int recv_len = rs_socket_recvfrom(ctx->sock_fd, buffer, sizeof(buffer), 0, - (struct sockaddr*)&from, &fromlen); + int recv_len = rs_socket_recvfrom(ctx->sock_fd, buffer, sizeof(buffer), 0, + (struct sockaddr*)&from, &fromlen); - if (recv_len < 0) { - int err = rs_socket_error(); - fprintf(stderr, "ERROR: Receive failed: %s\n", rs_socket_strerror(err)); - return -1; + if (recv_len >= (int)sizeof(packet_header_t)) { + if (rootstream_net_validate_packet(buffer, (size_t)recv_len) == 0) { + /* Process UDP packet (existing logic) */ + process_received_packet(ctx, buffer, recv_len, &from, fromlen, TRANSPORT_UDP); + } + } } - if (recv_len < (int)sizeof(packet_header_t)) { - fprintf(stderr, "WARNING: Packet too small (%d bytes), ignoring\n", recv_len); - return 0; - } + /* Check TCP peers for data */ + for (int i = 0; i < ctx->num_peers; i++) { + peer_t *peer = &ctx->peers[i]; + + if (peer->transport != TRANSPORT_TCP || peer->state != PEER_CONNECTED) { + continue; + } - if (rootstream_net_validate_packet(buffer, (size_t)recv_len) != 0) { - fprintf(stderr, "WARNING: Invalid packet received (%d bytes)\n", recv_len); - return 0; + uint8_t buffer[MAX_PACKET_SIZE]; + size_t buffer_len = 0; + + int tcp_ret = rootstream_net_tcp_recv(ctx, peer, buffer, &buffer_len); + + if (tcp_ret < 0) { + /* TCP receive failed, disconnect and mark for reconnect */ + fprintf(stderr, "WARNING: TCP receive failed for peer %s\n", peer->hostname); + peer->state = PEER_DISCONNECTED; + if (peer->reconnect_ctx) { + peer_try_reconnect(ctx, peer); + } + continue; + } + + if (tcp_ret > 0 && buffer_len > 0) { + /* Process TCP packet */ + process_received_packet(ctx, buffer, buffer_len, &peer->addr, peer->addr_len, TRANSPORT_TCP); + } } + return 0; +} + +/* + * Process a received packet (helper for both UDP and TCP) + */ +static int process_received_packet(rootstream_ctx_t *ctx, uint8_t *buffer, size_t recv_len, + struct sockaddr_storage *from, socklen_t fromlen, + transport_type_t transport) { packet_header_t *hdr = (packet_header_t*)buffer; /* Find or create peer */ - peer_t *peer = rootstream_find_peer_by_addr(ctx, &from, fromlen); + peer_t *peer = rootstream_find_peer_by_addr(ctx, from, fromlen); if (!peer) { if (hdr->type != PKT_HANDSHAKE) { fprintf(stderr, "WARNING: Packet from unknown peer (no handshake)\n"); @@ -400,9 +475,10 @@ int rootstream_net_recv(rootstream_ctx_t *ctx, int timeout_ms) { peer = &ctx->peers[ctx->num_peers++]; memset(peer, 0, sizeof(peer_t)); - memcpy(&peer->addr, &from, fromlen); + memcpy(&peer->addr, from, fromlen); peer->addr_len = fromlen; peer->state = PEER_CONNECTING; + peer->transport = transport; /* Set transport type */ peer->video_tx_frame_id = 1; peer->video_rx_frame_id = 0; peer->video_rx_buffer = NULL; @@ -411,8 +487,9 @@ int rootstream_net_recv(rootstream_ctx_t *ctx, int timeout_ms) { peer->video_rx_received = 0; } - /* Update last seen time */ + /* Update last seen and received time */ peer->last_seen = get_timestamp_ms(); + peer->last_received = get_timestamp_ms(); /* Handle packet based on type */ switch (hdr->type) { @@ -807,17 +884,31 @@ int rootstream_net_handshake(rootstream_ctx_t *ctx, peer_t *peer) { memcpy(packet, &hdr, sizeof(hdr)); memcpy(packet + sizeof(hdr), payload, payload_len); + /* Try UDP handshake first */ int sent = rs_socket_sendto(ctx->sock_fd, packet, sizeof(hdr) + payload_len, 0, (struct sockaddr*)&peer->addr, peer->addr_len); if (sent < 0) { int err = rs_socket_error(); - fprintf(stderr, "ERROR: Handshake send failed\n"); - fprintf(stderr, "REASON: %s\n", rs_socket_strerror(err)); + fprintf(stderr, "WARNING: UDP handshake send failed: %s\n", rs_socket_strerror(err)); + + /* Try TCP fallback */ + printf("INFO: Trying TCP fallback for handshake...\n"); + if (rootstream_net_tcp_connect(ctx, peer) == 0) { + /* Send handshake over TCP */ + if (rootstream_net_tcp_send(ctx, peer, packet, sizeof(hdr) + payload_len) == 0) { + printf("✓ TCP handshake sent\n"); + peer->state = PEER_HANDSHAKE_SENT; + peer->handshake_sent_time = get_timestamp_ms(); + return 0; + } + } + fprintf(stderr, "ERROR: Both UDP and TCP handshake failed\n"); return -1; } peer->last_sent = get_timestamp_ms(); + peer->transport = TRANSPORT_UDP; /* Mark as UDP connection */ /* Update peer state and timestamp for timeout tracking */ if (peer->state != PEER_HANDSHAKE_RECEIVED && peer->state != PEER_CONNECTED) { @@ -825,7 +916,7 @@ int rootstream_net_handshake(rootstream_ctx_t *ctx, peer_t *peer) { peer->handshake_sent_time = get_timestamp_ms(); } - printf("→ Sent handshake to peer\n"); + printf("→ Sent UDP handshake to peer\n"); return 0; } @@ -960,6 +1051,13 @@ peer_t* rootstream_add_peer(rootstream_ctx_t *ctx, const char *code) { peer->video_rx_capacity = 0; peer->video_rx_expected = 0; peer->video_rx_received = 0; + peer->transport = TRANSPORT_UDP; /* Default to UDP */ + + /* Initialize reconnection context (PHASE 4) */ + if (peer_reconnect_init(peer) < 0) { + fprintf(stderr, "WARNING: Failed to init reconnect context for peer\n"); + } + ctx->num_peers++; char fingerprint[32]; @@ -1006,6 +1104,14 @@ void rootstream_remove_peer(rootstream_ctx_t *ctx, peer_t *peer) { return; } + /* Cleanup PHASE 4 resources */ + if (peer->transport == TRANSPORT_TCP) { + rootstream_net_tcp_cleanup(peer); + } + if (peer->reconnect_ctx) { + peer_reconnect_cleanup(peer); + } + if (peer->video_rx_buffer) { if (ctx->current_frame.data == peer->video_rx_buffer) { ctx->current_frame.data = NULL; diff --git a/src/network_reconnect.c b/src/network_reconnect.c new file mode 100644 index 0000000..0e9ef90 --- /dev/null +++ b/src/network_reconnect.c @@ -0,0 +1,122 @@ +/* + * network_reconnect.c - Peer reconnection with exponential backoff + * + * Handles temporary connection failures gracefully. + * Auto-reconnects with increasing delays to avoid flooding network. + */ + +#include "../include/rootstream.h" +#include +#include +#include + +#define INITIAL_BACKOFF_MS 100 +#define MAX_BACKOFF_MS 30000 +#define MAX_RECONNECT_ATTEMPTS 10 + +typedef struct { + uint64_t last_attempt; + uint64_t next_attempt; + int attempt_count; + int backoff_ms; + bool is_reconnecting; +} reconnect_ctx_t; + +/* + * Initialize reconnection tracking for peer + */ +int peer_reconnect_init(peer_t *peer) { + if (!peer) return -1; + + reconnect_ctx_t *rc = calloc(1, sizeof(reconnect_ctx_t)); + if (!rc) return -1; + + rc->backoff_ms = INITIAL_BACKOFF_MS; + rc->attempt_count = 0; + rc->last_attempt = 0; + rc->next_attempt = 0; + rc->is_reconnecting = false; + + peer->reconnect_ctx = rc; + return 0; +} + +/* + * Try to reconnect to peer with backoff + */ +int peer_try_reconnect(rootstream_ctx_t *ctx, peer_t *peer) { + if (!ctx || !peer || !peer->reconnect_ctx) return -1; + + reconnect_ctx_t *rc = (reconnect_ctx_t *)peer->reconnect_ctx; + uint64_t now = get_timestamp_ms(); + + /* Check if enough time has passed */ + if (now < rc->next_attempt) { + return 0; /* Not time yet */ + } + + /* Try reconnect */ + printf("INFO: Reconnecting to peer %s (attempt %d/%d)...\n", + peer->hostname, rc->attempt_count + 1, MAX_RECONNECT_ATTEMPTS); + + int ret = 0; + + /* Try UDP first, then TCP */ + if (peer->transport == TRANSPORT_UDP || peer->transport == 0) { + ret = rootstream_net_handshake(ctx, peer); + if (ret < 0) { + printf("INFO: UDP failed, trying TCP fallback...\n"); + ret = rootstream_net_tcp_connect(ctx, peer); + } + } else if (peer->transport == TRANSPORT_TCP) { + ret = rootstream_net_tcp_connect(ctx, peer); + } + + rc->last_attempt = now; + rc->attempt_count++; + + if (ret == 0) { + /* Reconnection successful */ + printf("✓ Peer %s reconnected\n", peer->hostname); + rc->attempt_count = 0; + rc->backoff_ms = INITIAL_BACKOFF_MS; + rc->is_reconnecting = false; + peer->state = PEER_CONNECTED; + return 1; /* Success */ + } + + /* Reconnection failed, schedule next attempt */ + if (rc->attempt_count >= MAX_RECONNECT_ATTEMPTS) { + printf("ERROR: Max reconnection attempts reached for %s\n", peer->hostname); + peer->state = PEER_FAILED; + return -1; /* Give up */ + } + + /* Exponential backoff */ + rc->backoff_ms = (rc->backoff_ms * 2 > MAX_BACKOFF_MS) ? MAX_BACKOFF_MS : rc->backoff_ms * 2; + rc->next_attempt = now + rc->backoff_ms; + rc->is_reconnecting = true; + + printf("WARNING: Will retry peer %s in %dms\n", peer->hostname, rc->backoff_ms); + return 0; /* Still trying */ +} + +/* + * Cleanup reconnection context + */ +void peer_reconnect_cleanup(peer_t *peer) { + if (!peer || !peer->reconnect_ctx) return; + free(peer->reconnect_ctx); + peer->reconnect_ctx = NULL; +} + +/* + * Reset backoff on successful communication + */ +void peer_reconnect_reset(peer_t *peer) { + if (!peer || !peer->reconnect_ctx) return; + reconnect_ctx_t *rc = (reconnect_ctx_t *)peer->reconnect_ctx; + rc->attempt_count = 0; + rc->backoff_ms = INITIAL_BACKOFF_MS; + rc->is_reconnecting = false; +} diff --git a/src/network_stub.c b/src/network_stub.c index 521092e..010ebbf 100644 --- a/src/network_stub.c +++ b/src/network_stub.c @@ -100,3 +100,59 @@ uint64_t get_timestamp_us(void) { clock_gettime(CLOCK_MONOTONIC, &ts); return (uint64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000; } + +/* PHASE 4 stubs */ +int rootstream_net_tcp_connect(rootstream_ctx_t *ctx, peer_t *peer) { + (void)ctx; + (void)peer; + fprintf(stderr, "ERROR: TCP transport unavailable (NO_CRYPTO build)\n"); + return -1; +} + +int rootstream_net_tcp_send(rootstream_ctx_t *ctx, peer_t *peer, + const uint8_t *data, size_t size) { + (void)ctx; + (void)peer; + (void)data; + (void)size; + fprintf(stderr, "ERROR: TCP transport unavailable (NO_CRYPTO build)\n"); + return -1; +} + +int rootstream_net_tcp_recv(rootstream_ctx_t *ctx, peer_t *peer, + uint8_t *buffer, size_t *buffer_len) { + (void)ctx; + (void)peer; + (void)buffer; + (void)buffer_len; + return -1; +} + +void rootstream_net_tcp_cleanup(peer_t *peer) { + (void)peer; +} + +bool rootstream_net_tcp_is_healthy(peer_t *peer) { + (void)peer; + return false; +} + +int peer_reconnect_init(peer_t *peer) { + (void)peer; + return 0; +} + +int peer_try_reconnect(rootstream_ctx_t *ctx, peer_t *peer) { + (void)ctx; + (void)peer; + return -1; +} + +void peer_reconnect_cleanup(peer_t *peer) { + (void)peer; +} + +void peer_reconnect_reset(peer_t *peer) { + (void)peer; +} + diff --git a/src/network_tcp.c b/src/network_tcp.c new file mode 100644 index 0000000..4840373 --- /dev/null +++ b/src/network_tcp.c @@ -0,0 +1,255 @@ +/* + * network_tcp.c - TCP fallback transport when UDP blocked + * + * Encrypted TCP tunnel for unreliable networks. + * Uses same encryption/packet format as UDP for compatibility. + * Slower but works everywhere TCP available. + */ + +#include "../include/rootstream.h" +#include "platform/platform.h" +#include +#include +#include +#include + +#ifndef RS_PLATFORM_WINDOWS +#include +#include +#include +#include +#include +#include +#else +#include +#include +#endif + +typedef struct { + rs_socket_t fd; /* TCP socket FD */ + struct sockaddr_in addr; + bool connected; + uint64_t connect_time; + uint8_t read_buffer[MAX_PACKET_SIZE]; + size_t read_offset; +} tcp_peer_ctx_t; + +/* + * Try to establish TCP connection to peer + */ +int rootstream_net_tcp_connect(rootstream_ctx_t *ctx, peer_t *peer) { + if (!ctx || !peer) return -1; + + rs_socket_t fd = rs_socket_create(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd == RS_INVALID_SOCKET) { + int err = rs_socket_error(); + fprintf(stderr, "ERROR: Cannot create TCP socket: %s\n", rs_socket_strerror(err)); + return -1; + } + + /* Set non-blocking */ +#ifndef RS_PLATFORM_WINDOWS + int flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); +#else + u_long mode = 1; + ioctlsocket(fd, FIONBIO, &mode); +#endif + + struct sockaddr_in *addr = (struct sockaddr_in *)&peer->addr; + + /* Connect (non-blocking) */ + if (connect(fd, (struct sockaddr *)addr, peer->addr_len) < 0) { + int err = rs_socket_error(); +#ifndef RS_PLATFORM_WINDOWS + if (err != EINPROGRESS) { +#else + if (err != WSAEWOULDBLOCK) { +#endif + fprintf(stderr, "ERROR: TCP connect failed: %s\n", rs_socket_strerror(err)); + rs_socket_close(fd); + return -1; + } + } + + /* Wait for connection with timeout */ +#ifndef RS_PLATFORM_WINDOWS + struct pollfd pfd = { .fd = fd, .events = POLLOUT }; + int ret = poll(&pfd, 1, 5000); /* 5 second timeout */ +#else + fd_set writefds; + FD_ZERO(&writefds); + FD_SET(fd, &writefds); + struct timeval tv = { .tv_sec = 5, .tv_usec = 0 }; + int ret = select((int)fd + 1, NULL, &writefds, NULL, &tv); +#endif + + if (ret <= 0) { + fprintf(stderr, "ERROR: TCP connect timeout\n"); + rs_socket_close(fd); + return -1; + } + + /* Check for connection errors */ + int err = 0; + socklen_t len = sizeof(err); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *)&err, &len) < 0 || err != 0) { + fprintf(stderr, "ERROR: TCP connect error: %s\n", rs_socket_strerror(err)); + rs_socket_close(fd); + return -1; + } + + /* Connection successful */ + tcp_peer_ctx_t *tcp = calloc(1, sizeof(tcp_peer_ctx_t)); + if (!tcp) { + rs_socket_close(fd); + return -1; + } + + tcp->fd = fd; + memcpy(&tcp->addr, addr, sizeof(*addr)); + tcp->connected = true; + tcp->connect_time = get_timestamp_ms(); + tcp->read_offset = 0; + + peer->transport_priv = tcp; + peer->transport = TRANSPORT_TCP; + + printf("✓ TCP connection established to %s\n", peer->hostname); + return 0; +} + +/* + * Send packet via TCP + */ +int rootstream_net_tcp_send(rootstream_ctx_t *ctx, peer_t *peer, + const uint8_t *data, size_t size) { + if (!ctx || !peer || !data || size == 0) return -1; + if (!peer->transport_priv) return -1; + + tcp_peer_ctx_t *tcp = (tcp_peer_ctx_t *)peer->transport_priv; + if (!tcp->connected) return -1; + + size_t sent = 0; + while (sent < size) { +#ifndef RS_PLATFORM_WINDOWS + ssize_t ret = send(tcp->fd, data + sent, size - sent, MSG_NOSIGNAL); +#else + ssize_t ret = send(tcp->fd, (const char *)(data + sent), (int)(size - sent), 0); +#endif + + if (ret < 0) { + int err = rs_socket_error(); +#ifndef RS_PLATFORM_WINDOWS + if (err == EAGAIN || err == EWOULDBLOCK) { +#else + if (err == WSAEWOULDBLOCK) { +#endif + /* Socket buffer full, try again later */ + break; + } else { + fprintf(stderr, "ERROR: TCP send failed: %s\n", rs_socket_strerror(err)); + tcp->connected = false; + return -1; + } + } + + sent += ret; + } + + ctx->bytes_sent += sent; + peer->last_sent = get_timestamp_ms(); + return sent == size ? 0 : -1; +} + +/* + * Receive packet via TCP with reassembly + */ +int rootstream_net_tcp_recv(rootstream_ctx_t *ctx, peer_t *peer, + uint8_t *buffer, size_t *buffer_len) { + if (!ctx || !peer || !buffer || !buffer_len) return -1; + if (!peer->transport_priv) return -1; + + tcp_peer_ctx_t *tcp = (tcp_peer_ctx_t *)peer->transport_priv; + if (!tcp->connected) return -1; + + /* Try to read more data */ +#ifndef RS_PLATFORM_WINDOWS + ssize_t ret = recv(tcp->fd, tcp->read_buffer + tcp->read_offset, + sizeof(tcp->read_buffer) - tcp->read_offset, MSG_DONTWAIT); +#else + ssize_t ret = recv(tcp->fd, (char *)(tcp->read_buffer + tcp->read_offset), + (int)(sizeof(tcp->read_buffer) - tcp->read_offset), 0); +#endif + + if (ret < 0) { + int err = rs_socket_error(); +#ifndef RS_PLATFORM_WINDOWS + if (err != EAGAIN && err != EWOULDBLOCK) { +#else + if (err != WSAEWOULDBLOCK) { +#endif + fprintf(stderr, "ERROR: TCP recv failed: %s\n", rs_socket_strerror(err)); + tcp->connected = false; + return -1; + } + return 0; /* No data available */ + } + + if (ret == 0) { + /* Connection closed */ + fprintf(stderr, "WARNING: TCP peer closed connection\n"); + tcp->connected = false; + return -1; + } + + tcp->read_offset += ret; + + /* Try to extract a complete packet */ + if (tcp->read_offset < sizeof(packet_header_t)) { + return 0; /* Need more data */ + } + + packet_header_t *hdr = (packet_header_t *)tcp->read_buffer; + size_t packet_size = sizeof(packet_header_t) + hdr->payload_size; + + if (tcp->read_offset < packet_size) { + return 0; /* Need more data */ + } + + /* We have a complete packet */ + memcpy(buffer, tcp->read_buffer, packet_size); + *buffer_len = packet_size; + + /* Shift remaining data */ + memmove(tcp->read_buffer, tcp->read_buffer + packet_size, + tcp->read_offset - packet_size); + tcp->read_offset -= packet_size; + + ctx->bytes_received += packet_size; + peer->last_received = get_timestamp_ms(); + return 1; /* Packet ready */ +} + +/* + * Cleanup TCP connection + */ +void rootstream_net_tcp_cleanup(peer_t *peer) { + if (!peer || !peer->transport_priv) return; + + tcp_peer_ctx_t *tcp = (tcp_peer_ctx_t *)peer->transport_priv; + if (tcp->fd != RS_INVALID_SOCKET) { + rs_socket_close(tcp->fd); + } + free(tcp); + peer->transport_priv = NULL; +} + +/* + * Check TCP connection health + */ +bool rootstream_net_tcp_is_healthy(peer_t *peer) { + if (!peer || !peer->transport_priv) return false; + tcp_peer_ctx_t *tcp = (tcp_peer_ctx_t *)peer->transport_priv; + return tcp->connected; +} diff --git a/src/service.c b/src/service.c index 5b8e6d2..f692b9b 100644 --- a/src/service.c +++ b/src/service.c @@ -37,6 +37,29 @@ static void service_signal_handler(int sig) { } } +/* + * Check peer health and initiate reconnection if needed (PHASE 4) + */ +static void check_peer_health(rootstream_ctx_t *ctx) { + if (!ctx) return; + + uint64_t now = get_timestamp_ms(); + for (int i = 0; i < ctx->num_peers; i++) { + peer_t *peer = &ctx->peers[i]; + + /* Check for stale connections (30 second timeout) */ + if (peer->state == PEER_CONNECTED) { + if (peer->last_received > 0 && now - peer->last_received > 30000) { + printf("WARNING: Peer %s timeout (no packets in 30s)\n", peer->hostname); + peer->state = PEER_DISCONNECTED; + if (peer->reconnect_ctx) { + peer_try_reconnect(ctx, peer); + } + } + } + } +} + /* * Daemonize process (if not running under systemd) * @@ -512,6 +535,9 @@ int service_run_host(rootstream_ctx_t *ctx) { rootstream_net_recv(ctx, 1); rootstream_net_tick(ctx); + /* Check peer health and reconnect if needed (PHASE 4) */ + check_peer_health(ctx); + /* Rate limiting */ uint32_t refresh_rate = ctx->display.refresh_rate ? ctx->display.refresh_rate : 60; usleep(1000000 / refresh_rate); @@ -671,6 +697,9 @@ int service_run_client(rootstream_ctx_t *ctx) { uint64_t recv_end_us = get_timestamp_us(); rootstream_net_tick(ctx); + /* Check peer health and reconnect if needed (PHASE 4) */ + check_peer_health(ctx); + /* Check if we received a video frame */ if (ctx->current_frame.data && ctx->current_frame.size > 0) { /* Decode frame */ diff --git a/src/tray.c b/src/tray.c index 91614bf..ed51c21 100644 --- a/src/tray.c +++ b/src/tray.c @@ -284,6 +284,10 @@ static void on_view_peers(GtkMenuItem *item, gpointer user_data) { status_icon = "○"; status_text = "Discovered"; break; + case PEER_FAILED: + status_icon = "✗"; + status_text = "Failed"; + break; default: status_icon = "✗"; status_text = "Disconnected";