Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ SRCS := src/main.c \
src/audio_capture.c \
src/audio_playback.c \
src/network.c \
src/network_tcp.c \
src/network_reconnect.c \
src/input.c \
src/crypto.c \
src/discovery.c \
Expand All @@ -173,6 +175,8 @@ endif
ifdef NO_CRYPTO
SRCS := $(filter-out src/crypto.c,$(SRCS))
SRCS := $(filter-out src/network.c,$(SRCS))
SRCS := $(filter-out src/network_tcp.c,$(SRCS))
SRCS := $(filter-out src/network_reconnect.c,$(SRCS))
SRCS += src/crypto_stub.c
SRCS += src/network_stub.c
endif
Expand Down Expand Up @@ -229,7 +233,7 @@ $(TARGET): $(OBJS)

# Build rstr-player tool
# Note: Needs many modules for dependencies - simplified player would be better long-term
$(PLAYER): tools/rstr-player.c src/recording.c src/vaapi_decoder.c src/display_sdl2.c src/network.c src/crypto.c src/config.c src/input.c src/opus_codec.c src/audio_playback.c src/latency.c src/platform/platform_linux.c src/packet_validate.c
$(PLAYER): tools/rstr-player.c src/recording.c src/vaapi_decoder.c src/display_sdl2.c src/network.c src/network_tcp.c src/network_reconnect.c src/crypto.c src/config.c src/input.c src/opus_codec.c src/audio_playback.c src/latency.c src/platform/platform_linux.c src/packet_validate.c
@echo "🔗 Building rstr-player..."
@$(CC) $(CFLAGS) $^ -o $(PLAYER) $(LDFLAGS) $(LIBS)
@echo "✓ Build complete: $(PLAYER)"
Expand Down
28 changes: 28 additions & 0 deletions include/rootstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,15 @@ typedef enum {
PEER_HANDSHAKE_RECEIVED, /* Received handshake, session established */
PEER_CONNECTED, /* Fully authenticated */
PEER_DISCONNECTED, /* Lost connection */
PEER_FAILED, /* Max reconnection attempts exceeded */
} peer_state_t;

/* Network transport types (PHASE 4) */
typedef enum {
TRANSPORT_UDP = 1, /* UDP P2P (primary) */
TRANSPORT_TCP = 2, /* TCP fallback */
} transport_type_t;

typedef struct {
char rootstream_code[ROOTSTREAM_CODE_MAX_LEN]; /* Peer's code */
uint8_t public_key[CRYPTO_PUBLIC_KEY_BYTES]; /* Peer's public key */
Expand All @@ -329,6 +336,12 @@ typedef struct {
uint64_t last_ping; /* Last keepalive ping time (ms) */
uint8_t protocol_version; /* Peer protocol version */
uint8_t protocol_flags; /* Peer protocol flags */

/* Network resilience (PHASE 4) */
transport_type_t transport; /* Current transport (UDP/TCP) */
void *transport_priv; /* Transport-specific private data */
void *reconnect_ctx; /* Reconnection tracking */
uint64_t last_received; /* Last inbound packet time (ms) */
} peer_t;

/* ============================================================================
Expand Down Expand Up @@ -692,6 +705,21 @@ int rootstream_net_handshake(rootstream_ctx_t *ctx, peer_t *peer);
void rootstream_net_tick(rootstream_ctx_t *ctx);
int rootstream_net_validate_packet(const uint8_t *buffer, size_t len);

/* --- Network TCP Fallback (PHASE 4) --- */
int rootstream_net_tcp_connect(rootstream_ctx_t *ctx, peer_t *peer);
int rootstream_net_tcp_send(rootstream_ctx_t *ctx, peer_t *peer,
const uint8_t *data, size_t size);
int rootstream_net_tcp_recv(rootstream_ctx_t *ctx, peer_t *peer,
uint8_t *buffer, size_t *buffer_len);
void rootstream_net_tcp_cleanup(peer_t *peer);
bool rootstream_net_tcp_is_healthy(peer_t *peer);

/* --- Peer Reconnection (PHASE 4) --- */
int peer_reconnect_init(peer_t *peer);
int peer_try_reconnect(rootstream_ctx_t *ctx, peer_t *peer);
void peer_reconnect_cleanup(peer_t *peer);
void peer_reconnect_reset(peer_t *peer);

/* --- Peer Management --- */
peer_t* rootstream_add_peer(rootstream_ctx_t *ctx, const char *rootstream_code);
peer_t* rootstream_find_peer(rootstream_ctx_t *ctx, const uint8_t *public_key);
Expand Down
186 changes: 146 additions & 40 deletions src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
#define PEER_TIMEOUT_MS 5000
#define KEEPALIVE_INTERVAL_MS 1000

/* Forward declarations */
static int process_received_packet(rootstream_ctx_t *ctx, uint8_t *buffer, size_t recv_len,
struct sockaddr_storage *from, socklen_t fromlen,
transport_type_t transport);

static peer_t* rootstream_find_peer_by_addr(rootstream_ctx_t *ctx,
const struct sockaddr_storage *addr,
socklen_t addr_len) {
Expand Down Expand Up @@ -261,6 +266,12 @@ int rootstream_net_send_encrypted(rootstream_ctx_t *ctx, peer_t *peer,
return -1;
}

/* Check connection health */
if (peer->state != PEER_CONNECTED && peer->state != PEER_HANDSHAKE_RECEIVED) {
fprintf(stderr, "WARNING: Peer not fully connected, skipping send\n");
return -1;
}

if (!peer->session.authenticated) {
fprintf(stderr, "ERROR: Cannot send - peer not authenticated\n");
fprintf(stderr, "PEER: %s\n", peer->hostname);
Expand Down Expand Up @@ -308,22 +319,44 @@ int rootstream_net_send_encrypted(rootstream_ctx_t *ctx, peer_t *peer,
hdr->payload_size = cipher_len;
/* MAC is included in cipher_len by crypto_encrypt_packet */

/* Send packet */
int sent = rs_socket_sendto(ctx->sock_fd, packet,
sizeof(packet_header_t) + cipher_len, 0,
(struct sockaddr*)&peer->addr, peer->addr_len);
/* Dispatch to transport */
int ret = -1;
switch (peer->transport) {
case TRANSPORT_UDP:
ret = rs_socket_sendto(ctx->sock_fd, packet,
sizeof(packet_header_t) + cipher_len, 0,
(struct sockaddr*)&peer->addr, peer->addr_len);
if (ret < 0) {
int err = rs_socket_error();
fprintf(stderr, "ERROR: UDP send failed: %s\n", rs_socket_strerror(err));
} else {
ctx->bytes_sent += ret;
peer->last_sent = get_timestamp_ms();
ret = 0; /* Success */
}
break;

case TRANSPORT_TCP:
ret = rootstream_net_tcp_send(ctx, peer, packet, sizeof(packet_header_t) + cipher_len);
break;

default:
fprintf(stderr, "ERROR: Unknown transport type %d\n", peer->transport);
ret = -1;
}

free(packet);

if (sent < 0) {
int err = rs_socket_error();
fprintf(stderr, "ERROR: Send failed\n");
fprintf(stderr, "REASON: %s\n", rs_socket_strerror(err));
if (ret < 0) {
/* Transport failed, mark for reconnection */
fprintf(stderr, "WARNING: Send failed, marking peer for reconnection\n");
peer->state = PEER_DISCONNECTED;
if (peer->reconnect_ctx) {
peer_try_reconnect(ctx, peer);
}
return -1;
}

peer->last_sent = get_timestamp_ms();
ctx->bytes_sent += sent;
return 0;
}

Expand All @@ -347,47 +380,89 @@ int rootstream_net_recv(rootstream_ctx_t *ctx, int timeout_ms) {
return -1;
}

/* Poll for incoming data */
int ret = rs_socket_poll(ctx->sock_fd, timeout_ms);
/* First, check for reconnecting peers (iterate backwards to handle removal safely) */
for (int i = ctx->num_peers - 1; i >= 0; i--) {
peer_t *peer = &ctx->peers[i];

if (peer->state == PEER_DISCONNECTED && peer->reconnect_ctx) {
/* Try to reconnect */
int ret = peer_try_reconnect(ctx, peer);
if (ret < 0) {
/* Max retries exceeded, remove peer */
printf("INFO: Removing peer %s (max reconnection attempts)\n", peer->hostname);
rootstream_remove_peer(ctx, peer);
continue;
}
}
}

/* Poll UDP socket for incoming data */
int ret = rs_socket_poll(ctx->sock_fd, 0); /* Non-blocking check */
if (ret < 0) {
int err = rs_socket_error();
fprintf(stderr, "ERROR: Poll failed: %s\n", rs_socket_strerror(err));
return -1;
}

if (ret == 0) {
/* Timeout - no data */
return 0;
}

/* Receive packet */
uint8_t buffer[MAX_PACKET_SIZE];
struct sockaddr_storage from;
socklen_t fromlen = sizeof(from);
if (ret > 0) {
/* Receive UDP packet */
uint8_t buffer[MAX_PACKET_SIZE];
struct sockaddr_storage from;
socklen_t fromlen = sizeof(from);

int recv_len = rs_socket_recvfrom(ctx->sock_fd, buffer, sizeof(buffer), 0,
(struct sockaddr*)&from, &fromlen);
int recv_len = rs_socket_recvfrom(ctx->sock_fd, buffer, sizeof(buffer), 0,
(struct sockaddr*)&from, &fromlen);

if (recv_len < 0) {
int err = rs_socket_error();
fprintf(stderr, "ERROR: Receive failed: %s\n", rs_socket_strerror(err));
return -1;
if (recv_len >= (int)sizeof(packet_header_t)) {
if (rootstream_net_validate_packet(buffer, (size_t)recv_len) == 0) {
/* Process UDP packet (existing logic) */
process_received_packet(ctx, buffer, recv_len, &from, fromlen, TRANSPORT_UDP);
}
}
}

if (recv_len < (int)sizeof(packet_header_t)) {
fprintf(stderr, "WARNING: Packet too small (%d bytes), ignoring\n", recv_len);
return 0;
}
/* Check TCP peers for data */
for (int i = 0; i < ctx->num_peers; i++) {
peer_t *peer = &ctx->peers[i];

if (peer->transport != TRANSPORT_TCP || peer->state != PEER_CONNECTED) {
continue;
}

if (rootstream_net_validate_packet(buffer, (size_t)recv_len) != 0) {
fprintf(stderr, "WARNING: Invalid packet received (%d bytes)\n", recv_len);
return 0;
uint8_t buffer[MAX_PACKET_SIZE];
size_t buffer_len = 0;

int tcp_ret = rootstream_net_tcp_recv(ctx, peer, buffer, &buffer_len);

if (tcp_ret < 0) {
/* TCP receive failed, disconnect and mark for reconnect */
fprintf(stderr, "WARNING: TCP receive failed for peer %s\n", peer->hostname);
peer->state = PEER_DISCONNECTED;
if (peer->reconnect_ctx) {
peer_try_reconnect(ctx, peer);
}
continue;
}

if (tcp_ret > 0 && buffer_len > 0) {
/* Process TCP packet */
process_received_packet(ctx, buffer, buffer_len, &peer->addr, peer->addr_len, TRANSPORT_TCP);
}
}

return 0;
}

/*
* Process a received packet (helper for both UDP and TCP)
*/
static int process_received_packet(rootstream_ctx_t *ctx, uint8_t *buffer, size_t recv_len,
struct sockaddr_storage *from, socklen_t fromlen,
transport_type_t transport) {
packet_header_t *hdr = (packet_header_t*)buffer;

/* Find or create peer */
peer_t *peer = rootstream_find_peer_by_addr(ctx, &from, fromlen);
peer_t *peer = rootstream_find_peer_by_addr(ctx, from, fromlen);
if (!peer) {
if (hdr->type != PKT_HANDSHAKE) {
fprintf(stderr, "WARNING: Packet from unknown peer (no handshake)\n");
Expand All @@ -400,9 +475,10 @@ int rootstream_net_recv(rootstream_ctx_t *ctx, int timeout_ms) {

peer = &ctx->peers[ctx->num_peers++];
memset(peer, 0, sizeof(peer_t));
memcpy(&peer->addr, &from, fromlen);
memcpy(&peer->addr, from, fromlen);
peer->addr_len = fromlen;
peer->state = PEER_CONNECTING;
peer->transport = transport; /* Set transport type */
peer->video_tx_frame_id = 1;
peer->video_rx_frame_id = 0;
peer->video_rx_buffer = NULL;
Expand All @@ -411,8 +487,9 @@ int rootstream_net_recv(rootstream_ctx_t *ctx, int timeout_ms) {
peer->video_rx_received = 0;
}

/* Update last seen time */
/* Update last seen and received time */
peer->last_seen = get_timestamp_ms();
peer->last_received = get_timestamp_ms();

/* Handle packet based on type */
switch (hdr->type) {
Expand Down Expand Up @@ -807,25 +884,39 @@ int rootstream_net_handshake(rootstream_ctx_t *ctx, peer_t *peer) {
memcpy(packet, &hdr, sizeof(hdr));
memcpy(packet + sizeof(hdr), payload, payload_len);

/* Try UDP handshake first */
int sent = rs_socket_sendto(ctx->sock_fd, packet, sizeof(hdr) + payload_len, 0,
(struct sockaddr*)&peer->addr, peer->addr_len);

if (sent < 0) {
int err = rs_socket_error();
fprintf(stderr, "ERROR: Handshake send failed\n");
fprintf(stderr, "REASON: %s\n", rs_socket_strerror(err));
fprintf(stderr, "WARNING: UDP handshake send failed: %s\n", rs_socket_strerror(err));

/* Try TCP fallback */
printf("INFO: Trying TCP fallback for handshake...\n");
if (rootstream_net_tcp_connect(ctx, peer) == 0) {
/* Send handshake over TCP */
if (rootstream_net_tcp_send(ctx, peer, packet, sizeof(hdr) + payload_len) == 0) {
printf("✓ TCP handshake sent\n");
peer->state = PEER_HANDSHAKE_SENT;
peer->handshake_sent_time = get_timestamp_ms();
return 0;
}
}
fprintf(stderr, "ERROR: Both UDP and TCP handshake failed\n");
return -1;
}

peer->last_sent = get_timestamp_ms();
peer->transport = TRANSPORT_UDP; /* Mark as UDP connection */

/* Update peer state and timestamp for timeout tracking */
if (peer->state != PEER_HANDSHAKE_RECEIVED && peer->state != PEER_CONNECTED) {
peer->state = PEER_HANDSHAKE_SENT;
peer->handshake_sent_time = get_timestamp_ms();
}

printf("→ Sent handshake to peer\n");
printf("→ Sent UDP handshake to peer\n");

return 0;
}
Expand Down Expand Up @@ -960,6 +1051,13 @@ peer_t* rootstream_add_peer(rootstream_ctx_t *ctx, const char *code) {
peer->video_rx_capacity = 0;
peer->video_rx_expected = 0;
peer->video_rx_received = 0;
peer->transport = TRANSPORT_UDP; /* Default to UDP */

/* Initialize reconnection context (PHASE 4) */
if (peer_reconnect_init(peer) < 0) {
fprintf(stderr, "WARNING: Failed to init reconnect context for peer\n");
}

ctx->num_peers++;

char fingerprint[32];
Expand Down Expand Up @@ -1006,6 +1104,14 @@ void rootstream_remove_peer(rootstream_ctx_t *ctx, peer_t *peer) {
return;
}

/* Cleanup PHASE 4 resources */
if (peer->transport == TRANSPORT_TCP) {
rootstream_net_tcp_cleanup(peer);
}
if (peer->reconnect_ctx) {
peer_reconnect_cleanup(peer);
}

if (peer->video_rx_buffer) {
if (ctx->current_frame.data == peer->video_rx_buffer) {
ctx->current_frame.data = NULL;
Expand Down
Loading
Loading