From 0303927444e0f0f26bb4666550abb72403d5ea25 Mon Sep 17 00:00:00 2001 From: nightness Date: Wed, 1 Apr 2026 10:30:58 -0500 Subject: [PATCH] feat(turn): add TCP TURN client example and expose turn_server_addr/local_addr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add rtc-turn/examples/turn_client_tcp.rs: demonstrates TURN over TCP using RFC 4571 framing (TcpFrameDecoder + frame_packet from rtc-shared) - Make Client::turn_server_addr() and Client::local_addr() public so callers (e.g. the ICE gatherer in the async layer) can identify TURN server addresses - Fix relay.rs and mod.rs call sites from the return-type change (Result → Option for turn_server_addr) Co-Authored-By: Claude Sonnet 4.6 --- rtc-turn/Cargo.toml | 5 + rtc-turn/examples/turn_client_tcp.rs | 246 +++++++++++++++++++++++++++ rtc-turn/src/client/mod.rs | 15 +- rtc-turn/src/client/relay.rs | 10 +- 4 files changed, 266 insertions(+), 10 deletions(-) create mode 100644 rtc-turn/examples/turn_client_tcp.rs diff --git a/rtc-turn/Cargo.toml b/rtc-turn/Cargo.toml index 7e258b73..0bf28ef3 100644 --- a/rtc-turn/Cargo.toml +++ b/rtc-turn/Cargo.toml @@ -39,3 +39,8 @@ harness = false name = "turn_client_udp" path = "examples/turn_client_udp.rs" bench = false + +[[example]] +name = "turn_client_tcp" +path = "examples/turn_client_tcp.rs" +bench = false diff --git a/rtc-turn/examples/turn_client_tcp.rs b/rtc-turn/examples/turn_client_tcp.rs new file mode 100644 index 00000000..51bb8052 --- /dev/null +++ b/rtc-turn/examples/turn_client_tcp.rs @@ -0,0 +1,246 @@ +use bytes::BytesMut; +use clap::Parser; +use log::trace; +use rtc_turn::client::*; +use sansio::Protocol; +use shared::error::{Error, Result}; +use shared::tcp_framing::{TcpFrameDecoder, frame_packet}; +use shared::{TransportContext, TransportMessage, TransportProtocol}; +use std::io::{ErrorKind, Read, Write}; +use std::net::TcpStream; +use std::str::FromStr; +use std::time::{Duration, Instant}; + +// First, start turn server with TCP support: +// +// Option 1: webrtc-rs/webrtc/turn/examples/turn_server_tcp: +// RUST_LOG=trace cargo run --color=always --package turn --example turn_server_tcp -- --public-ip 127.0.0.1 --users user=pass +// +// Option 2: coturn (reference TURN server): +// turnserver --lt-cred-mech --user user:pass --realm webrtc.rs --no-dtls --no-tls +// +// Then, start this example: +// RUST_LOG=trace cargo run --color=always --package rtc-turn --example turn_client_tcp -- --host 127.0.0.1 --user user=pass + +#[derive(Parser)] +#[command(name = "TURN Client TCP")] +#[command(author = "Brainwires ")] +#[command(version = "0.1.0")] +#[command(about = "An example of TURN Client over TCP (RFC 6062)", long_about = None)] +struct Cli { + #[arg(long, default_value_t = format!("127.0.0.1"))] + host: String, + #[arg(long, default_value_t = 3478)] + port: u16, + #[arg(long)] + user: String, + #[arg(long, default_value_t = format!("webrtc.rs"))] + realm: String, + + #[arg(short, long)] + debug: bool, + #[arg(long, default_value_t = format!("INFO"))] + log_level: String, +} + +fn main() -> Result<()> { + let cli = Cli::parse(); + if cli.debug { + let log_level = log::LevelFilter::from_str(&cli.log_level).unwrap(); + env_logger::Builder::new() + .format(|buf, record| { + writeln!( + buf, + "{}:{} [{}] {} - {}", + record.file().unwrap_or("unknown"), + record.line().unwrap_or(0), + record.level(), + chrono::Local::now().format("%H:%M:%S.%6f"), + record.args() + ) + }) + .filter(None, log_level) + .init(); + } + + let host = cli.host; + let port = cli.port; + let user = cli.user; + let cred: Vec<&str> = user.splitn(2, '=').collect(); + let realm = cli.realm; + + let turn_server_addr = format!("{host}:{port}"); + + // Connect a TCP socket to the TURN server. + // Unlike UDP, the TURN client over TCP uses a single persistent connection. + let mut stream = TcpStream::connect(&turn_server_addr)?; + let local_addr = stream.local_addr()?; + let peer_addr = stream.peer_addr()?; + + println!("TCP connected: {} → {}", local_addr, peer_addr); + + // Configure the TCP stream for non-blocking reads in the polling loop. + stream.set_nonblocking(true)?; + let mut stream_write = stream.try_clone()?; + + let cfg = ClientConfig { + stun_serv_addr: turn_server_addr.clone(), + turn_serv_addr: turn_server_addr, + local_addr, + transport_protocol: TransportProtocol::TCP, + username: cred[0].to_string(), + password: cred[1].to_string(), + realm: realm.to_string(), + software: String::new(), + rto_in_ms: 0, + }; + + let mut client = Client::new(cfg)?; + + // Allocate a relay socket on the TURN server (over TCP). + let allocate_tid = client.allocate()?; + let mut relayed_addr = None; + + let (stop_tx, stop_rx) = crossbeam_channel::bounded::<()>(1); + println!("Press Ctrl-C to stop"); + std::thread::spawn(move || { + let mut stop_tx = Some(stop_tx); + ctrlc::set_handler(move || { + if let Some(stop_tx) = stop_tx.take() { + let _ = stop_tx.send(()); + } + }) + .expect("Error setting Ctrl-C handler"); + }); + + // RFC 4571 decoder for inbound TCP frames. + let mut decoder = TcpFrameDecoder::new(); + let mut buf = vec![0u8; 4096]; + + loop { + match stop_rx.try_recv() { + Ok(_) => break, + Err(err) => { + if err.is_disconnected() { + break; + } + } + }; + + // Flush outbound TURN messages (each wrapped in a 2-byte length prefix per RFC 4571). + while let Some(transmit) = client.poll_write() { + let framed = frame_packet(&transmit.message); + stream_write.write_all(&framed)?; + trace!( + "tcp.sent {} bytes to {}", + transmit.message.len(), + transmit.transport.peer_addr + ); + } + + // Process TURN events. + while let Some(event) = client.poll_event() { + match event { + Event::TransactionTimeout(_) => return Err(Error::ErrTimeout), + Event::BindingResponse(_, reflexive_addr) => { + println!("reflexive address {}", reflexive_addr); + } + Event::BindingError(_, err) => return Err(err), + Event::AllocateResponse(tid, addr) => { + println!("relayed address {}", addr); + if relayed_addr.is_none() { + assert_eq!(tid, allocate_tid); + relayed_addr = Some(addr); + println!( + "TURN relay allocated over TCP: {} (refresh will keep it alive)", + addr + ); + } + } + Event::AllocateError(_, err) => return Err(err), + Event::CreatePermissionResponse(tid, peer_addr) => { + println!("CreatePermission for peer addr {} is granted (tid={:?})", peer_addr, tid); + } + Event::CreatePermissionError(_, err) => return Err(err), + Event::DataIndicationOrChannelData(_, from, data) => { + println!("relay read: {:?} from {}", &data[..], from); + // Echo back + if let Some(&relay_addr) = relayed_addr.as_ref() { + client.relay(relay_addr)?.send_to(&data[..], from)?; + } + } + } + } + + // Compute next timeout. + let mut eto = Instant::now() + Duration::from_millis(100); + if let Some(to) = client.poll_timeout() { + if to < eto { + eto = to; + } + } + let delay_from_now = eto + .checked_duration_since(Instant::now()) + .unwrap_or(Duration::from_secs(0)); + + // Non-blocking read from TCP socket. + // RFC 4571: each TURN message is prefixed with a 2-byte big-endian length. + match read_tcp_input(&mut stream, &mut buf, &mut decoder) { + Some(data) => { + trace!( + "tcp.recv {} bytes from {}", + data.len(), + peer_addr + ); + let msg = TransportMessage { + now: Instant::now(), + transport: TransportContext { + local_addr, + peer_addr, + transport_protocol: TransportProtocol::TCP, + ecn: None, + }, + message: BytesMut::from(data.as_slice()), + }; + client.handle_read(msg)?; + } + None => { + // No complete frame yet — sleep briefly to avoid busy-polling. + if !delay_from_now.is_zero() { + std::thread::sleep(std::cmp::min( + delay_from_now, + Duration::from_millis(5), + )); + } + } + } + + // Drive time forward. + client.handle_timeout(Instant::now())?; + } + + client.close() +} + +/// Read from a non-blocking TCP stream, decode RFC 4571 frames. +/// Returns the next complete TURN message payload (without the 2-byte length header), +/// or `None` if no complete frame is available yet. +fn read_tcp_input( + stream: &mut TcpStream, + buf: &mut Vec, + decoder: &mut TcpFrameDecoder, +) -> Option> { + // Drain available bytes into the decoder. + loop { + match stream.read(buf.as_mut_slice()) { + Ok(0) => break, // EOF + Ok(n) => decoder.extend_from_slice(&buf[..n]), + Err(e) if e.kind() == ErrorKind::WouldBlock => break, + Err(e) => { + eprintln!("TCP read error: {e}"); + break; + } + } + } + decoder.next_packet() +} diff --git a/rtc-turn/src/client/mod.rs b/rtc-turn/src/client/mod.rs index 5655f322..9d49b1d0 100644 --- a/rtc-turn/src/client/mod.rs +++ b/rtc-turn/src/client/mod.rs @@ -448,7 +448,7 @@ impl Client { debug!("client.Allocate call PerformTransaction 1"); let mut tid = self.perform_transaction( &msg, - self.turn_server_addr()?, + self.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?, TransactionType::AllocateAttempt, ); tid.0[TRANSACTION_ID_SIZE - 1] = tid.0[TRANSACTION_ID_SIZE - 1].wrapping_add(1); @@ -514,7 +514,7 @@ impl Client { debug!("client.Allocate call PerformTransaction 2"); self.perform_transaction( &msg, - self.turn_server_addr()?, + self.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?, TransactionType::AllocateRequest(nonce), ); } @@ -554,9 +554,14 @@ impl Client { Ok(()) } - /// turn_server_addr return the TURN server address - fn turn_server_addr(&self) -> Result { - self.turn_serv_addr.ok_or(Error::ErrNilTurnSocket) + /// Returns the TURN server address, if configured. + pub fn turn_server_addr(&self) -> Option { + self.turn_serv_addr + } + + /// Returns the local address this client is bound to. + pub fn local_addr(&self) -> SocketAddr { + self.local_addr } /// username returns username diff --git a/rtc-turn/src/client/relay.rs b/rtc-turn/src/client/relay.rs index f9ef8174..9239bbfe 100644 --- a/rtc-turn/src/client/relay.rs +++ b/rtc-turn/src/client/relay.rs @@ -207,7 +207,7 @@ impl Relay<'_> { // indication has no transaction (fire-and-forget) self.client - .write_to(&msg.raw, self.client.turn_server_addr()?); + .write_to(&msg.raw, self.client.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?); return Ok(()); } @@ -271,7 +271,7 @@ impl Relay<'_> { Ok(self.client.perform_transaction( &msg, - self.client.turn_server_addr()?, + self.client.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?, TransactionType::CreatePermissionRequest(self.relayed_addr, peer_addr_opt), )) } else { @@ -337,7 +337,7 @@ impl Relay<'_> { let _ = self.client.perform_transaction( &msg, - self.client.turn_server_addr()?, + self.client.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?, TransactionType::RefreshRequest(self.relayed_addr), ); @@ -418,7 +418,7 @@ impl Relay<'_> { let mut msg = Message::new(); msg.build(&setters)?; - (msg, self.client.turn_server_addr()?) + (msg, self.client.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?) }; debug!("UDPConn.bind call PerformTransaction 1"); @@ -483,7 +483,7 @@ impl Relay<'_> { ch_data.encode(); self.client - .write_to(&ch_data.raw, self.client.turn_server_addr()?); + .write_to(&ch_data.raw, self.client.turn_server_addr().ok_or(Error::ErrNilTurnSocket)?); Ok(()) }