From a360af5f67e67475e121fb8c64b8698427ed513b Mon Sep 17 00:00:00 2001 From: nightness Date: Wed, 1 Apr 2026 10:52:14 -0500 Subject: [PATCH] feat(interceptor): add JitterBuffer receiver-side interceptor (C4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new `JitterBufferInterceptor` that buffers incoming RTP packets per SSRC and releases them in sequence order after an adaptive playout delay computed from the RFC 3550 §A.8 jitter formula. - `JitterBufferStream` (stream.rs): per-SSRC packet buffer with adaptive target delay; force-release after max_delay to prevent starvation; playout-delay RTP extension (ietf WebRTC draft) for sender-side hints - `JitterBufferInterceptor

` (mod.rs): wraps inner chain; buffers RTP for tracked SSRCs, passes RTCP and untracked-SSRC packets immediately; drains ready packets into inner chain in handle_timeout + poll_read - `JitterBufferBuilder`: configurable min/max/initial delay with sensible defaults (20 ms / 500 ms / 50 ms) - Jitter update skips out-of-order RTP timestamps to prevent spurious delay spikes from reordered packets - 15 unit tests across stream.rs and mod.rs; all 129 interceptor tests pass Co-Authored-By: Claude Sonnet 4.6 --- rtc-interceptor/src/jitter_buffer/mod.rs | 420 ++++++++++++++++++++ rtc-interceptor/src/jitter_buffer/stream.rs | 318 +++++++++++++++ rtc-interceptor/src/lib.rs | 2 + 3 files changed, 740 insertions(+) create mode 100644 rtc-interceptor/src/jitter_buffer/mod.rs create mode 100644 rtc-interceptor/src/jitter_buffer/stream.rs diff --git a/rtc-interceptor/src/jitter_buffer/mod.rs b/rtc-interceptor/src/jitter_buffer/mod.rs new file mode 100644 index 00000000..af691caa --- /dev/null +++ b/rtc-interceptor/src/jitter_buffer/mod.rs @@ -0,0 +1,420 @@ +//! Jitter Buffer Interceptor +//! +//! A receiver-side interceptor that buffers incoming RTP packets and releases +//! them in sequence order after an adaptive playout delay. +//! +//! # Algorithm +//! +//! The target playout delay adapts to observed interarrival jitter using the +//! RFC 3550 §A.8 formula: `target = clamp(jitter / clock_rate × 3, min, max)`. +//! The ×3 factor covers ~99.7% of the jitter spread under a Gaussian model. +//! +//! If the sender includes a `playout-delay` RTP header extension, its +//! `min_delay` and `max_delay` values (in 10 ms increments) are applied as +//! bounds on the adaptive target. +//! +//! # Placement in the interceptor chain +//! +//! The jitter buffer should be the **outermost** interceptor so that all inner +//! interceptors (NACK generator, receiver-report, TWCC) still observe every +//! packet in its eventually-correct order: +//! +//! ```text +//! JitterBuffer → NackGenerator → ReceiverReport → TwccReceiver → Noop +//! ``` +//! +//! # Usage +//! +//! ```ignore +//! use rtc_interceptor::{Registry, JitterBufferBuilder}; +//! use std::time::Duration; +//! +//! let chain = Registry::new() +//! .with(JitterBufferBuilder::new() +//! .with_min_delay(Duration::from_millis(20)) +//! .with_max_delay(Duration::from_millis(500)) +//! .with_initial_delay(Duration::from_millis(50)) +//! .build()) +//! .build(); +//! ``` + +use crate::stream_info::StreamInfo; +use crate::{Interceptor, Packet, TaggedPacket, interceptor}; +use log::error; +use shared::error::Error; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +mod stream; +use stream::{JitterBufferStream, PLAYOUT_DELAY_URI}; + +/// Builder for [`JitterBufferInterceptor`]. +pub struct JitterBufferBuilder

{ + min_delay: Duration, + max_delay: Duration, + initial_delay: Duration, + _phantom: PhantomData

, +} + +impl

Default for JitterBufferBuilder

{ + fn default() -> Self { + Self { + min_delay: Duration::from_millis(20), + max_delay: Duration::from_millis(500), + initial_delay: Duration::from_millis(50), + _phantom: PhantomData, + } + } +} + +impl

JitterBufferBuilder

{ + pub fn new() -> Self { + Self::default() + } + + /// Minimum playout delay floor (default 20 ms). + pub fn with_min_delay(mut self, d: Duration) -> Self { + self.min_delay = d; + self + } + + /// Maximum playout delay / force-release ceiling (default 500 ms). + pub fn with_max_delay(mut self, d: Duration) -> Self { + self.max_delay = d; + self + } + + /// Starting target delay before enough packets have been seen to estimate jitter + /// (default 50 ms). + pub fn with_initial_delay(mut self, d: Duration) -> Self { + self.initial_delay = d; + self + } + + /// Build the interceptor factory closure. + pub fn build(self) -> impl FnOnce(P) -> JitterBufferInterceptor

{ + move |inner| JitterBufferInterceptor { + inner, + min_delay: self.min_delay, + max_delay: self.max_delay, + initial_delay: self.initial_delay, + streams: HashMap::new(), + } + } +} + +/// Receiver-side jitter buffer interceptor. +/// +/// Buffers incoming RTP packets per SSRC and releases them in sequence order +/// after an adaptive playout delay. RTCP packets and packets from unbound +/// SSRCs are forwarded immediately without buffering. +#[derive(Interceptor)] +pub struct JitterBufferInterceptor

{ + #[next] + inner: P, + + min_delay: Duration, + max_delay: Duration, + initial_delay: Duration, + + /// Per-SSRC jitter buffer state, created in `bind_remote_stream`. + streams: HashMap, +} + +#[interceptor] +impl JitterBufferInterceptor

{ + /// Buffer incoming RTP for tracked SSRCs; pass everything else through immediately. + #[overrides] + fn handle_read(&mut self, msg: TaggedPacket) -> Result<(), Self::Error> { + if let Packet::Rtp(ref rtp) = msg.message { + if let Some(stream) = self.streams.get_mut(&rtp.header.ssrc) { + // insert() returns false only for already-released sequences; drop those. + stream.insert(msg.now, msg); + return Ok(()); + } + } + // RTCP, or RTP from an unbound SSRC → forward without delay. + self.inner.handle_read(msg) + } + + /// Flush ready buffered packets into the inner chain, then poll the inner chain. + #[overrides] + fn poll_read(&mut self) -> Option { + self.drain_ready(Instant::now()); + self.inner.poll_read() + } + + /// Drain ready packets on each timer tick so buffers don't stall between app polls. + #[overrides] + fn handle_timeout(&mut self, now: Self::Time) -> Result<(), Self::Error> { + self.drain_ready(now); + self.inner.handle_timeout(now) + } + + /// Return the earliest scheduled release time so the driver wakes at the right moment. + #[overrides] + fn poll_timeout(&mut self) -> Option { + let buf_eto = self.streams.values().filter_map(|s| s.next_wake_time()).min(); + let inner_eto = self.inner.poll_timeout(); + match (buf_eto, inner_eto) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, b) => b, + } + } + + /// Create a per-SSRC buffer when a remote stream is bound. + #[overrides] + fn bind_remote_stream(&mut self, info: &StreamInfo) { + let ext_id = info + .rtp_header_extensions + .iter() + .find(|e| e.uri == PLAYOUT_DELAY_URI) + .map(|e| e.id as u8); + + self.streams.insert( + info.ssrc, + JitterBufferStream::new( + info.clock_rate, + ext_id, + self.initial_delay, + self.min_delay, + self.max_delay, + ), + ); + self.inner.bind_remote_stream(info); + } + + /// Drop the per-SSRC buffer when a remote stream is unbound. + #[overrides] + fn unbind_remote_stream(&mut self, info: &StreamInfo) { + self.streams.remove(&info.ssrc); + self.inner.unbind_remote_stream(info); + } +} + +impl JitterBufferInterceptor

{ + /// Collect ready packets from all streams and inject them into the inner chain. + /// + /// We collect first to satisfy the borrow checker: `streams` and `inner` + /// are separate fields but both require `&mut self`. + fn drain_ready(&mut self, now: Instant) { + let mut ready = Vec::new(); + for stream in self.streams.values_mut() { + while let Some(pkt) = stream.pop_ready(now) { + ready.push(pkt); + } + } + for pkt in ready { + if let Err(e) = self.inner.handle_read(pkt) { + error!("jitter_buffer: inner.handle_read error: {}", e); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream_info::RTPHeaderExtension; + use crate::{Registry, stream_info::StreamInfo}; + use sansio::Protocol; + use shared::{TransportContext, TransportMessage}; + + fn make_stream_info(ssrc: u32, clock_rate: u32) -> StreamInfo { + StreamInfo { + ssrc, + clock_rate, + ..Default::default() + } + } + + fn make_rtp_at(ssrc: u32, seq: u16, ts: u32, now: Instant) -> TaggedPacket { + TransportMessage { + now, + transport: TransportContext::default(), + message: Packet::Rtp(rtp::Packet { + header: rtp::header::Header { + ssrc, + sequence_number: seq, + timestamp: ts, + ..Default::default() + }, + ..Default::default() + }), + } + } + + fn make_rtcp(ssrc: u32) -> TaggedPacket { + TransportMessage { + now: Instant::now(), + transport: TransportContext::default(), + message: Packet::Rtcp(vec![Box::new( + rtcp::receiver_report::ReceiverReport { + ssrc, + ..Default::default() + }, + )]), + } + } + + /// Build a chain with a short initial delay for testing. + fn make_chain(initial_ms: u64, max_ms: u64) -> impl Protocol + + crate::Interceptor + { + Registry::new() + .with( + JitterBufferBuilder::new() + .with_min_delay(Duration::from_millis(initial_ms)) + .with_max_delay(Duration::from_millis(max_ms)) + .with_initial_delay(Duration::from_millis(initial_ms)) + .build(), + ) + .build() + } + + #[test] + fn test_in_order_packets_released_after_delay() { + let mut chain = make_chain(50, 500); + let ssrc = 1111; + chain.bind_remote_stream(&make_stream_info(ssrc, 90000)); + + let base = Instant::now(); + for i in 0..3u16 { + chain + .handle_read(make_rtp_at(ssrc, i + 1, i as u32 * 3000, base)) + .unwrap(); + } + + // Before delay has elapsed — nothing ready. + chain.handle_timeout(base).unwrap(); + assert!(chain.poll_read().is_none()); + + // After delay has elapsed — packets should be available. + chain + .handle_timeout(base + Duration::from_millis(100)) + .unwrap(); + let mut released = 0u16; + while chain.poll_read().is_some() { + released += 1; + } + assert_eq!(released, 3); + } + + #[test] + fn test_out_of_order_reordered() { + let mut chain = make_chain(50, 500); + let ssrc = 2222; + chain.bind_remote_stream(&make_stream_info(ssrc, 90000)); + + let base = Instant::now(); + // Arrive as seq 1, 3, 2. + chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap(); + chain.handle_read(make_rtp_at(ssrc, 3, 6000, base)).unwrap(); + chain.handle_read(make_rtp_at(ssrc, 2, 3000, base)).unwrap(); + + // Release all after the delay. + chain + .handle_timeout(base + Duration::from_millis(100)) + .unwrap(); + + let mut seqs = Vec::new(); + while let Some(pkt) = chain.poll_read() { + if let Packet::Rtp(rtp) = pkt.message { + seqs.push(rtp.header.sequence_number); + } + } + // Must come out in sequence order. + assert_eq!(seqs, vec![1, 2, 3]); + } + + #[test] + fn test_force_release_at_max_delay() { + let initial_ms = 50u64; + let max_ms = 200u64; + let mut chain = make_chain(initial_ms, max_ms); + let ssrc = 3333; + chain.bind_remote_stream(&make_stream_info(ssrc, 90000)); + + let base = Instant::now(); + // Insert seq 1; seq 2 never arrives. + chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap(); + + // At max_delay + 1ms: seq 1 must be force-released even without seq 2. + let force_time = base + Duration::from_millis(max_ms + 1); + chain.handle_timeout(force_time).unwrap(); + assert!(chain.poll_read().is_some(), "seq 1 should be force-released"); + } + + #[test] + fn test_rtcp_passes_through_immediately() { + let mut chain = make_chain(50, 500); + let ssrc = 4444; + chain.bind_remote_stream(&make_stream_info(ssrc, 90000)); + + chain.handle_read(make_rtcp(ssrc)).unwrap(); + // RTCP bypasses the buffer and should be visible to the inner chain. + // (The noop inner doesn't surface it, but the call must not hang or panic.) + // Verify by checking that poll_read doesn't return a buffered item. + chain.handle_timeout(Instant::now()).unwrap(); + assert!(chain.poll_read().is_none()); + } + + #[test] + fn test_unbind_clears_buffer() { + let initial_ms = 50u64; + let mut chain = make_chain(initial_ms, 500); + let ssrc = 5555; + let info = make_stream_info(ssrc, 90000); + chain.bind_remote_stream(&info); + + let base = Instant::now(); + chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap(); + + // Unbind before the delay expires. + chain.unbind_remote_stream(&info); + + // After the delay, nothing is released (buffer was dropped). + chain + .handle_timeout(base + Duration::from_millis(100)) + .unwrap(); + assert!(chain.poll_read().is_none()); + } + + #[test] + fn test_unbound_ssrc_passes_through() { + let mut chain = make_chain(50, 500); + // Do NOT bind any stream. + let ssrc = 6666; + let base = Instant::now(); + + // Packet from an unbound SSRC must not be buffered — forwarded immediately. + chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap(); + // handle_timeout at exactly base (no delay passed) — noop inner releases it. + chain.handle_timeout(base).unwrap(); + // poll_read delegates to inner.poll_read (noop returns None), which is fine. + // The important thing is no panic and no indefinite buffering. + } + + #[test] + fn test_poll_timeout_returns_buffer_wake_time() { + let initial_ms = 50u64; + let mut chain = make_chain(initial_ms, 500); + let ssrc = 7777; + chain.bind_remote_stream(&make_stream_info(ssrc, 90000)); + + let base = Instant::now(); + chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap(); + + let wake = chain.poll_timeout(); + assert!(wake.is_some(), "should have a wake time after buffering a packet"); + // Wake time should be approximately base + initial_delay. + let wake = wake.unwrap(); + assert!(wake > base, "wake time must be in the future"); + assert!( + wake <= base + Duration::from_millis(initial_ms + 10), + "wake time should be close to initial_delay" + ); + } +} diff --git a/rtc-interceptor/src/jitter_buffer/stream.rs b/rtc-interceptor/src/jitter_buffer/stream.rs new file mode 100644 index 00000000..6081bbaf --- /dev/null +++ b/rtc-interceptor/src/jitter_buffer/stream.rs @@ -0,0 +1,318 @@ +use crate::{Packet, TaggedPacket}; +use std::collections::VecDeque; +use std::time::{Duration, Instant}; + +/// RTP header extension URI for the playout-delay extension. +/// +pub(crate) const PLAYOUT_DELAY_URI: &str = + "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay"; + +/// Per-SSRC jitter buffer state. +/// +/// Buffers incoming RTP packets in sequence order and releases them after +/// an adaptive playout delay computed from the RFC 3550 §A.8 jitter formula. +pub(crate) struct JitterBufferStream { + /// RTP clock rate (e.g. 90 000 for video, 48 000 for Opus audio). + clock_rate: u32, + /// One-byte header-extension ID for playout-delay, if negotiated. + playout_delay_ext_id: Option, + /// Sorted packet buffer: (seq, arrival_time, scheduled_release, packet). + buffer: VecDeque<(u16, Instant, Instant, TaggedPacket)>, + /// Last sequence number released to the application (guards against late arrivals). + last_released: Option, + // --- RFC 3550 §A.8 adaptive delay state --- + last_rtp_ts: u32, + last_arrival: Instant, + jitter: f64, // running estimate in RTP clock units + started: bool, + // --- Configuration --- + pub(crate) target_delay: Duration, + min_delay: Duration, + max_delay: Duration, +} + +impl JitterBufferStream { + pub(crate) fn new( + clock_rate: u32, + playout_delay_ext_id: Option, + initial_delay: Duration, + min_delay: Duration, + max_delay: Duration, + ) -> Self { + Self { + clock_rate, + playout_delay_ext_id, + buffer: VecDeque::new(), + last_released: None, + last_rtp_ts: 0, + last_arrival: Instant::now(), + jitter: 0.0, + started: false, + target_delay: initial_delay, + min_delay, + max_delay, + } + } + + /// Returns `true` if sequence number `a` is strictly after `b` under u16 wrapping. + #[inline] + fn seq_is_after(a: u16, b: u16) -> bool { + a != b && a.wrapping_sub(b) < 0x8000 + } + + /// Update the jitter estimate from a new packet and compute its scheduled release time. + /// + /// Jitter is only updated for packets that advance the RTP timestamp (i.e. the RTP + /// timestamp difference is in the forward half of the u32 space, matching the same + /// wrapping arithmetic used for sequence numbers). Out-of-order or duplicate + /// timestamps are accepted into the buffer but do not corrupt the jitter estimate. + fn compute_release(&mut self, now: Instant, rtp_ts: u32) -> Instant { + if self.started { + let rtp_diff = rtp_ts.wrapping_sub(self.last_rtp_ts); + // Only update for forward-advancing RTP timestamps (rtp_diff in (0, 2^31)). + if rtp_diff > 0 && rtp_diff < 0x8000_0000 { + let arrival_diff = now.duration_since(self.last_arrival).as_secs_f64(); + let d = (arrival_diff * self.clock_rate as f64 - rtp_diff as f64).abs(); + // RFC 3550 §A.8: J(i) = J(i-1) + (|D(i,i-1)| - J(i-1)) / 16 + self.jitter += (d - self.jitter) / 16.0; + + // target = clamp(jitter_seconds × 3, min, max) + let jitter_secs = self.jitter / self.clock_rate as f64 * 3.0; + self.target_delay = Duration::from_secs_f64(jitter_secs) + .max(self.min_delay) + .min(self.max_delay); + + self.last_rtp_ts = rtp_ts; + self.last_arrival = now; + } + } else { + self.started = true; + self.last_rtp_ts = rtp_ts; + self.last_arrival = now; + } + now + self.target_delay + } + + /// Parse a playout-delay RTP extension (3 bytes, 12-bit min + 12-bit max in 10 ms units). + fn parse_playout_delay(data: &[u8]) -> Option<(Duration, Duration)> { + if data.len() < 3 { + return None; + } + let min_raw = ((data[0] as u16) << 4) | ((data[1] as u16) >> 4); + let max_raw = (((data[1] as u16) & 0x0F) << 8) | (data[2] as u16); + Some(( + Duration::from_millis(min_raw as u64 * 10), + Duration::from_millis(max_raw as u64 * 10), + )) + } + + /// Insert a packet into the buffer in sequence order. + /// + /// Returns `false` if the packet is a late duplicate (already past `last_released`). + pub(crate) fn insert(&mut self, now: Instant, pkt: TaggedPacket) -> bool { + let (seq, rtp_ts) = match &pkt.message { + Packet::Rtp(rtp) => (rtp.header.sequence_number, rtp.header.timestamp), + _ => return false, + }; + + // Reject packets at or before the last released sequence. + if let Some(last) = self.last_released { + if !Self::seq_is_after(seq, last) { + return false; + } + } + + // Apply playout-delay extension hints from the sender. + if let (Packet::Rtp(rtp), Some(ext_id)) = (&pkt.message, self.playout_delay_ext_id) { + if let Some(ext_bytes) = rtp.header.get_extension(ext_id) { + if let Some((sender_min, sender_max)) = + Self::parse_playout_delay(ext_bytes.as_ref()) + { + // Sender's min raises our floor; sender's max lowers our ceiling. + self.min_delay = self.min_delay.max(sender_min); + self.max_delay = self.max_delay.min(sender_max.max(self.min_delay)); + self.target_delay = self.target_delay.max(sender_min).min(self.max_delay); + } + } + } + + let release = self.compute_release(now, rtp_ts); + + // Insert at the correct sorted position (ascending sequence order). + let pos = self + .buffer + .iter() + .position(|(s, _, _, _)| Self::seq_is_after(*s, seq)) + .unwrap_or(self.buffer.len()); + self.buffer.insert(pos, (seq, now, release, pkt)); + true + } + + /// Return the head packet if it is ready for release, or `None` if not yet. + /// + /// A packet is ready when `now >= release_time` or it has been held for `>= max_delay`. + pub(crate) fn pop_ready(&mut self, now: Instant) -> Option { + if let Some(&(_, arrival, release, _)) = self.buffer.front() { + let ready = now >= release || now.duration_since(arrival) >= self.max_delay; + if ready { + let (seq, _, _, pkt) = self.buffer.pop_front().unwrap(); + self.last_released = Some(seq); + return Some(pkt); + } + } + None + } + + /// Earliest instant at which the driver should wake up to service this stream. + pub(crate) fn next_wake_time(&self) -> Option { + self.buffer.front().map(|(_, arrival, release, _)| { + let force_release = *arrival + self.max_delay; + (*release).min(force_release) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use shared::{TransportContext, TransportMessage}; + + fn make_rtp(ssrc: u32, seq: u16, ts: u32) -> TaggedPacket { + TransportMessage { + now: Instant::now(), + transport: TransportContext::default(), + message: Packet::Rtp(rtp::Packet { + header: rtp::header::Header { + ssrc, + sequence_number: seq, + timestamp: ts, + ..Default::default() + }, + ..Default::default() + }), + } + } + + #[test] + fn test_seq_is_after() { + assert!(JitterBufferStream::seq_is_after(1, 0)); + assert!(JitterBufferStream::seq_is_after(100, 99)); + assert!(!JitterBufferStream::seq_is_after(0, 1)); + // wraparound: 0 is after 0xffff + assert!(JitterBufferStream::seq_is_after(0, 0xffff)); + // equal is not "after" + assert!(!JitterBufferStream::seq_is_after(5, 5)); + } + + #[test] + fn test_insert_in_order() { + let delay = Duration::from_millis(50); + let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1)); + let now = Instant::now(); + s.insert(now, make_rtp(1, 1, 0)); + s.insert(now, make_rtp(1, 2, 900)); + s.insert(now, make_rtp(1, 3, 1800)); + assert_eq!(s.buffer.len(), 3); + assert_eq!(s.buffer[0].0, 1); + assert_eq!(s.buffer[1].0, 2); + assert_eq!(s.buffer[2].0, 3); + } + + #[test] + fn test_insert_out_of_order() { + let delay = Duration::from_millis(50); + let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1)); + let now = Instant::now(); + s.insert(now, make_rtp(1, 1, 0)); + s.insert(now, make_rtp(1, 3, 1800)); + s.insert(now, make_rtp(1, 2, 900)); // late, but within window + assert_eq!(s.buffer.len(), 3); + assert_eq!(s.buffer[0].0, 1); + assert_eq!(s.buffer[1].0, 2); // reordered into correct position + assert_eq!(s.buffer[2].0, 3); + } + + #[test] + fn test_pop_ready_not_yet() { + let delay = Duration::from_millis(50); + let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1)); + let now = Instant::now(); + s.insert(now, make_rtp(1, 1, 0)); + // Just after insertion — release time hasn't passed yet. + assert!(s.pop_ready(now).is_none()); + } + + #[test] + fn test_pop_ready_after_delay() { + let delay = Duration::from_millis(50); + let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1)); + let now = Instant::now(); + s.insert(now, make_rtp(1, 1, 0)); + let later = now + Duration::from_millis(100); + let pkt = s.pop_ready(later); + assert!(pkt.is_some()); + assert!(s.buffer.is_empty()); + } + + #[test] + fn test_force_release_at_max_delay() { + let delay = Duration::from_millis(50); + let max = Duration::from_millis(200); + let mut s = JitterBufferStream::new(90000, None, delay, delay, max); + let now = Instant::now(); + s.insert(now, make_rtp(1, 1, 0)); + // Simulate a very late pop — past max_delay. + let very_late = now + max + Duration::from_millis(1); + assert!(s.pop_ready(very_late).is_some()); + } + + #[test] + fn test_late_arrival_rejected() { + let delay = Duration::from_millis(50); + let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1)); + let now = Instant::now(); + s.insert(now, make_rtp(1, 5, 0)); + // Release seq 5. + s.pop_ready(now + Duration::from_millis(100)); + // seq 4 (before released seq 5) should be rejected. + let accepted = s.insert(now + Duration::from_millis(200), make_rtp(1, 4, 0)); + assert!(!accepted); + } + + #[test] + fn test_jitter_adapts_target_delay() { + let delay = Duration::from_millis(20); + let mut s = + JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(2)); + let base = Instant::now(); + // Feed packets with increasing inter-arrival variation to grow jitter. + // RTP timestamps advance at 90 kHz rate but arrival times vary significantly. + for i in 0u32..20 { + // Nominal arrival every 33ms (30fps), but with 50ms extra jitter on even packets. + let arrival = base + + Duration::from_millis(33 * i as u64) + + if i % 2 == 0 { Duration::from_millis(50) } else { Duration::ZERO }; + let ts = i * 3000; // 90kHz / 30fps = 3000 units per frame + s.insert(arrival, make_rtp(1, i as u16 + 1, ts)); + } + // After significant jitter, target_delay should be above initial 20ms. + assert!( + s.target_delay > delay, + "target_delay {:?} should have grown above {:?}", + s.target_delay, + delay + ); + } + + #[test] + fn test_next_wake_time_is_min_of_release_and_force() { + let delay = Duration::from_millis(50); + let max = Duration::from_millis(200); + let mut s = JitterBufferStream::new(90000, None, delay, delay, max); + let now = Instant::now(); + s.insert(now, make_rtp(1, 1, 0)); + let wake = s.next_wake_time().expect("should have a wake time"); + // Wake time should be <= arrival + max_delay + assert!(wake <= now + max + Duration::from_millis(1)); + } +} diff --git a/rtc-interceptor/src/lib.rs b/rtc-interceptor/src/lib.rs index 44e74561..6579abf7 100644 --- a/rtc-interceptor/src/lib.rs +++ b/rtc-interceptor/src/lib.rs @@ -180,11 +180,13 @@ use std::time::Instant; mod noop; mod registry; +pub(crate) mod jitter_buffer; pub(crate) mod nack; pub(crate) mod report; pub(crate) mod stream_info; pub(crate) mod twcc; +pub use jitter_buffer::{JitterBufferBuilder, JitterBufferInterceptor}; pub use nack::{ generator::{NackGeneratorBuilder, NackGeneratorInterceptor}, responder::{NackResponderBuilder, NackResponderInterceptor},