diff --git a/rtc-interceptor/src/jitter_buffer/mod.rs b/rtc-interceptor/src/jitter_buffer/mod.rs
new file mode 100644
index 00000000..af691caa
--- /dev/null
+++ b/rtc-interceptor/src/jitter_buffer/mod.rs
@@ -0,0 +1,420 @@
+//! Jitter Buffer Interceptor
+//!
+//! A receiver-side interceptor that buffers incoming RTP packets and releases
+//! them in sequence order after an adaptive playout delay.
+//!
+//! # Algorithm
+//!
+//! The target playout delay adapts to observed interarrival jitter using the
+//! RFC 3550 §A.8 formula: `target = clamp(jitter / clock_rate × 3, min, max)`.
+//! The ×3 factor covers ~99.7% of the jitter spread under a Gaussian model.
+//!
+//! If the sender includes a `playout-delay` RTP header extension, its
+//! `min_delay` and `max_delay` values (in 10 ms increments) are applied as
+//! bounds on the adaptive target.
+//!
+//! # Placement in the interceptor chain
+//!
+//! The jitter buffer should be the **outermost** interceptor so that all inner
+//! interceptors (NACK generator, receiver-report, TWCC) still observe every
+//! packet in its eventually-correct order:
+//!
+//! ```text
+//! JitterBuffer → NackGenerator → ReceiverReport → TwccReceiver → Noop
+//! ```
+//!
+//! # Usage
+//!
+//! ```ignore
+//! use rtc_interceptor::{Registry, JitterBufferBuilder};
+//! use std::time::Duration;
+//!
+//! let chain = Registry::new()
+//! .with(JitterBufferBuilder::new()
+//! .with_min_delay(Duration::from_millis(20))
+//! .with_max_delay(Duration::from_millis(500))
+//! .with_initial_delay(Duration::from_millis(50))
+//! .build())
+//! .build();
+//! ```
+
+use crate::stream_info::StreamInfo;
+use crate::{Interceptor, Packet, TaggedPacket, interceptor};
+use log::error;
+use shared::error::Error;
+use std::collections::HashMap;
+use std::marker::PhantomData;
+use std::time::{Duration, Instant};
+
+mod stream;
+use stream::{JitterBufferStream, PLAYOUT_DELAY_URI};
+
+/// Builder for [`JitterBufferInterceptor`].
+pub struct JitterBufferBuilder
{
+ min_delay: Duration,
+ max_delay: Duration,
+ initial_delay: Duration,
+ _phantom: PhantomData
,
+}
+
+impl
Default for JitterBufferBuilder
{
+ fn default() -> Self {
+ Self {
+ min_delay: Duration::from_millis(20),
+ max_delay: Duration::from_millis(500),
+ initial_delay: Duration::from_millis(50),
+ _phantom: PhantomData,
+ }
+ }
+}
+
+impl
JitterBufferBuilder
{
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Minimum playout delay floor (default 20 ms).
+ pub fn with_min_delay(mut self, d: Duration) -> Self {
+ self.min_delay = d;
+ self
+ }
+
+ /// Maximum playout delay / force-release ceiling (default 500 ms).
+ pub fn with_max_delay(mut self, d: Duration) -> Self {
+ self.max_delay = d;
+ self
+ }
+
+ /// Starting target delay before enough packets have been seen to estimate jitter
+ /// (default 50 ms).
+ pub fn with_initial_delay(mut self, d: Duration) -> Self {
+ self.initial_delay = d;
+ self
+ }
+
+ /// Build the interceptor factory closure.
+ pub fn build(self) -> impl FnOnce(P) -> JitterBufferInterceptor
{
+ move |inner| JitterBufferInterceptor {
+ inner,
+ min_delay: self.min_delay,
+ max_delay: self.max_delay,
+ initial_delay: self.initial_delay,
+ streams: HashMap::new(),
+ }
+ }
+}
+
+/// Receiver-side jitter buffer interceptor.
+///
+/// Buffers incoming RTP packets per SSRC and releases them in sequence order
+/// after an adaptive playout delay. RTCP packets and packets from unbound
+/// SSRCs are forwarded immediately without buffering.
+#[derive(Interceptor)]
+pub struct JitterBufferInterceptor
{
+ #[next]
+ inner: P,
+
+ min_delay: Duration,
+ max_delay: Duration,
+ initial_delay: Duration,
+
+ /// Per-SSRC jitter buffer state, created in `bind_remote_stream`.
+ streams: HashMap,
+}
+
+#[interceptor]
+impl JitterBufferInterceptor {
+ /// Buffer incoming RTP for tracked SSRCs; pass everything else through immediately.
+ #[overrides]
+ fn handle_read(&mut self, msg: TaggedPacket) -> Result<(), Self::Error> {
+ if let Packet::Rtp(ref rtp) = msg.message {
+ if let Some(stream) = self.streams.get_mut(&rtp.header.ssrc) {
+ // insert() returns false only for already-released sequences; drop those.
+ stream.insert(msg.now, msg);
+ return Ok(());
+ }
+ }
+ // RTCP, or RTP from an unbound SSRC → forward without delay.
+ self.inner.handle_read(msg)
+ }
+
+ /// Flush ready buffered packets into the inner chain, then poll the inner chain.
+ #[overrides]
+ fn poll_read(&mut self) -> Option {
+ self.drain_ready(Instant::now());
+ self.inner.poll_read()
+ }
+
+ /// Drain ready packets on each timer tick so buffers don't stall between app polls.
+ #[overrides]
+ fn handle_timeout(&mut self, now: Self::Time) -> Result<(), Self::Error> {
+ self.drain_ready(now);
+ self.inner.handle_timeout(now)
+ }
+
+ /// Return the earliest scheduled release time so the driver wakes at the right moment.
+ #[overrides]
+ fn poll_timeout(&mut self) -> Option {
+ let buf_eto = self.streams.values().filter_map(|s| s.next_wake_time()).min();
+ let inner_eto = self.inner.poll_timeout();
+ match (buf_eto, inner_eto) {
+ (Some(a), Some(b)) => Some(a.min(b)),
+ (Some(a), None) => Some(a),
+ (None, b) => b,
+ }
+ }
+
+ /// Create a per-SSRC buffer when a remote stream is bound.
+ #[overrides]
+ fn bind_remote_stream(&mut self, info: &StreamInfo) {
+ let ext_id = info
+ .rtp_header_extensions
+ .iter()
+ .find(|e| e.uri == PLAYOUT_DELAY_URI)
+ .map(|e| e.id as u8);
+
+ self.streams.insert(
+ info.ssrc,
+ JitterBufferStream::new(
+ info.clock_rate,
+ ext_id,
+ self.initial_delay,
+ self.min_delay,
+ self.max_delay,
+ ),
+ );
+ self.inner.bind_remote_stream(info);
+ }
+
+ /// Drop the per-SSRC buffer when a remote stream is unbound.
+ #[overrides]
+ fn unbind_remote_stream(&mut self, info: &StreamInfo) {
+ self.streams.remove(&info.ssrc);
+ self.inner.unbind_remote_stream(info);
+ }
+}
+
+impl JitterBufferInterceptor {
+ /// Collect ready packets from all streams and inject them into the inner chain.
+ ///
+ /// We collect first to satisfy the borrow checker: `streams` and `inner`
+ /// are separate fields but both require `&mut self`.
+ fn drain_ready(&mut self, now: Instant) {
+ let mut ready = Vec::new();
+ for stream in self.streams.values_mut() {
+ while let Some(pkt) = stream.pop_ready(now) {
+ ready.push(pkt);
+ }
+ }
+ for pkt in ready {
+ if let Err(e) = self.inner.handle_read(pkt) {
+ error!("jitter_buffer: inner.handle_read error: {}", e);
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::stream_info::RTPHeaderExtension;
+ use crate::{Registry, stream_info::StreamInfo};
+ use sansio::Protocol;
+ use shared::{TransportContext, TransportMessage};
+
+ fn make_stream_info(ssrc: u32, clock_rate: u32) -> StreamInfo {
+ StreamInfo {
+ ssrc,
+ clock_rate,
+ ..Default::default()
+ }
+ }
+
+ fn make_rtp_at(ssrc: u32, seq: u16, ts: u32, now: Instant) -> TaggedPacket {
+ TransportMessage {
+ now,
+ transport: TransportContext::default(),
+ message: Packet::Rtp(rtp::Packet {
+ header: rtp::header::Header {
+ ssrc,
+ sequence_number: seq,
+ timestamp: ts,
+ ..Default::default()
+ },
+ ..Default::default()
+ }),
+ }
+ }
+
+ fn make_rtcp(ssrc: u32) -> TaggedPacket {
+ TransportMessage {
+ now: Instant::now(),
+ transport: TransportContext::default(),
+ message: Packet::Rtcp(vec![Box::new(
+ rtcp::receiver_report::ReceiverReport {
+ ssrc,
+ ..Default::default()
+ },
+ )]),
+ }
+ }
+
+ /// Build a chain with a short initial delay for testing.
+ fn make_chain(initial_ms: u64, max_ms: u64) -> impl Protocol
+ + crate::Interceptor
+ {
+ Registry::new()
+ .with(
+ JitterBufferBuilder::new()
+ .with_min_delay(Duration::from_millis(initial_ms))
+ .with_max_delay(Duration::from_millis(max_ms))
+ .with_initial_delay(Duration::from_millis(initial_ms))
+ .build(),
+ )
+ .build()
+ }
+
+ #[test]
+ fn test_in_order_packets_released_after_delay() {
+ let mut chain = make_chain(50, 500);
+ let ssrc = 1111;
+ chain.bind_remote_stream(&make_stream_info(ssrc, 90000));
+
+ let base = Instant::now();
+ for i in 0..3u16 {
+ chain
+ .handle_read(make_rtp_at(ssrc, i + 1, i as u32 * 3000, base))
+ .unwrap();
+ }
+
+ // Before delay has elapsed — nothing ready.
+ chain.handle_timeout(base).unwrap();
+ assert!(chain.poll_read().is_none());
+
+ // After delay has elapsed — packets should be available.
+ chain
+ .handle_timeout(base + Duration::from_millis(100))
+ .unwrap();
+ let mut released = 0u16;
+ while chain.poll_read().is_some() {
+ released += 1;
+ }
+ assert_eq!(released, 3);
+ }
+
+ #[test]
+ fn test_out_of_order_reordered() {
+ let mut chain = make_chain(50, 500);
+ let ssrc = 2222;
+ chain.bind_remote_stream(&make_stream_info(ssrc, 90000));
+
+ let base = Instant::now();
+ // Arrive as seq 1, 3, 2.
+ chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap();
+ chain.handle_read(make_rtp_at(ssrc, 3, 6000, base)).unwrap();
+ chain.handle_read(make_rtp_at(ssrc, 2, 3000, base)).unwrap();
+
+ // Release all after the delay.
+ chain
+ .handle_timeout(base + Duration::from_millis(100))
+ .unwrap();
+
+ let mut seqs = Vec::new();
+ while let Some(pkt) = chain.poll_read() {
+ if let Packet::Rtp(rtp) = pkt.message {
+ seqs.push(rtp.header.sequence_number);
+ }
+ }
+ // Must come out in sequence order.
+ assert_eq!(seqs, vec![1, 2, 3]);
+ }
+
+ #[test]
+ fn test_force_release_at_max_delay() {
+ let initial_ms = 50u64;
+ let max_ms = 200u64;
+ let mut chain = make_chain(initial_ms, max_ms);
+ let ssrc = 3333;
+ chain.bind_remote_stream(&make_stream_info(ssrc, 90000));
+
+ let base = Instant::now();
+ // Insert seq 1; seq 2 never arrives.
+ chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap();
+
+ // At max_delay + 1ms: seq 1 must be force-released even without seq 2.
+ let force_time = base + Duration::from_millis(max_ms + 1);
+ chain.handle_timeout(force_time).unwrap();
+ assert!(chain.poll_read().is_some(), "seq 1 should be force-released");
+ }
+
+ #[test]
+ fn test_rtcp_passes_through_immediately() {
+ let mut chain = make_chain(50, 500);
+ let ssrc = 4444;
+ chain.bind_remote_stream(&make_stream_info(ssrc, 90000));
+
+ chain.handle_read(make_rtcp(ssrc)).unwrap();
+ // RTCP bypasses the buffer and should be visible to the inner chain.
+ // (The noop inner doesn't surface it, but the call must not hang or panic.)
+ // Verify by checking that poll_read doesn't return a buffered item.
+ chain.handle_timeout(Instant::now()).unwrap();
+ assert!(chain.poll_read().is_none());
+ }
+
+ #[test]
+ fn test_unbind_clears_buffer() {
+ let initial_ms = 50u64;
+ let mut chain = make_chain(initial_ms, 500);
+ let ssrc = 5555;
+ let info = make_stream_info(ssrc, 90000);
+ chain.bind_remote_stream(&info);
+
+ let base = Instant::now();
+ chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap();
+
+ // Unbind before the delay expires.
+ chain.unbind_remote_stream(&info);
+
+ // After the delay, nothing is released (buffer was dropped).
+ chain
+ .handle_timeout(base + Duration::from_millis(100))
+ .unwrap();
+ assert!(chain.poll_read().is_none());
+ }
+
+ #[test]
+ fn test_unbound_ssrc_passes_through() {
+ let mut chain = make_chain(50, 500);
+ // Do NOT bind any stream.
+ let ssrc = 6666;
+ let base = Instant::now();
+
+ // Packet from an unbound SSRC must not be buffered — forwarded immediately.
+ chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap();
+ // handle_timeout at exactly base (no delay passed) — noop inner releases it.
+ chain.handle_timeout(base).unwrap();
+ // poll_read delegates to inner.poll_read (noop returns None), which is fine.
+ // The important thing is no panic and no indefinite buffering.
+ }
+
+ #[test]
+ fn test_poll_timeout_returns_buffer_wake_time() {
+ let initial_ms = 50u64;
+ let mut chain = make_chain(initial_ms, 500);
+ let ssrc = 7777;
+ chain.bind_remote_stream(&make_stream_info(ssrc, 90000));
+
+ let base = Instant::now();
+ chain.handle_read(make_rtp_at(ssrc, 1, 0, base)).unwrap();
+
+ let wake = chain.poll_timeout();
+ assert!(wake.is_some(), "should have a wake time after buffering a packet");
+ // Wake time should be approximately base + initial_delay.
+ let wake = wake.unwrap();
+ assert!(wake > base, "wake time must be in the future");
+ assert!(
+ wake <= base + Duration::from_millis(initial_ms + 10),
+ "wake time should be close to initial_delay"
+ );
+ }
+}
diff --git a/rtc-interceptor/src/jitter_buffer/stream.rs b/rtc-interceptor/src/jitter_buffer/stream.rs
new file mode 100644
index 00000000..6081bbaf
--- /dev/null
+++ b/rtc-interceptor/src/jitter_buffer/stream.rs
@@ -0,0 +1,318 @@
+use crate::{Packet, TaggedPacket};
+use std::collections::VecDeque;
+use std::time::{Duration, Instant};
+
+/// RTP header extension URI for the playout-delay extension.
+///
+pub(crate) const PLAYOUT_DELAY_URI: &str =
+ "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay";
+
+/// Per-SSRC jitter buffer state.
+///
+/// Buffers incoming RTP packets in sequence order and releases them after
+/// an adaptive playout delay computed from the RFC 3550 §A.8 jitter formula.
+pub(crate) struct JitterBufferStream {
+ /// RTP clock rate (e.g. 90 000 for video, 48 000 for Opus audio).
+ clock_rate: u32,
+ /// One-byte header-extension ID for playout-delay, if negotiated.
+ playout_delay_ext_id: Option,
+ /// Sorted packet buffer: (seq, arrival_time, scheduled_release, packet).
+ buffer: VecDeque<(u16, Instant, Instant, TaggedPacket)>,
+ /// Last sequence number released to the application (guards against late arrivals).
+ last_released: Option,
+ // --- RFC 3550 §A.8 adaptive delay state ---
+ last_rtp_ts: u32,
+ last_arrival: Instant,
+ jitter: f64, // running estimate in RTP clock units
+ started: bool,
+ // --- Configuration ---
+ pub(crate) target_delay: Duration,
+ min_delay: Duration,
+ max_delay: Duration,
+}
+
+impl JitterBufferStream {
+ pub(crate) fn new(
+ clock_rate: u32,
+ playout_delay_ext_id: Option,
+ initial_delay: Duration,
+ min_delay: Duration,
+ max_delay: Duration,
+ ) -> Self {
+ Self {
+ clock_rate,
+ playout_delay_ext_id,
+ buffer: VecDeque::new(),
+ last_released: None,
+ last_rtp_ts: 0,
+ last_arrival: Instant::now(),
+ jitter: 0.0,
+ started: false,
+ target_delay: initial_delay,
+ min_delay,
+ max_delay,
+ }
+ }
+
+ /// Returns `true` if sequence number `a` is strictly after `b` under u16 wrapping.
+ #[inline]
+ fn seq_is_after(a: u16, b: u16) -> bool {
+ a != b && a.wrapping_sub(b) < 0x8000
+ }
+
+ /// Update the jitter estimate from a new packet and compute its scheduled release time.
+ ///
+ /// Jitter is only updated for packets that advance the RTP timestamp (i.e. the RTP
+ /// timestamp difference is in the forward half of the u32 space, matching the same
+ /// wrapping arithmetic used for sequence numbers). Out-of-order or duplicate
+ /// timestamps are accepted into the buffer but do not corrupt the jitter estimate.
+ fn compute_release(&mut self, now: Instant, rtp_ts: u32) -> Instant {
+ if self.started {
+ let rtp_diff = rtp_ts.wrapping_sub(self.last_rtp_ts);
+ // Only update for forward-advancing RTP timestamps (rtp_diff in (0, 2^31)).
+ if rtp_diff > 0 && rtp_diff < 0x8000_0000 {
+ let arrival_diff = now.duration_since(self.last_arrival).as_secs_f64();
+ let d = (arrival_diff * self.clock_rate as f64 - rtp_diff as f64).abs();
+ // RFC 3550 §A.8: J(i) = J(i-1) + (|D(i,i-1)| - J(i-1)) / 16
+ self.jitter += (d - self.jitter) / 16.0;
+
+ // target = clamp(jitter_seconds × 3, min, max)
+ let jitter_secs = self.jitter / self.clock_rate as f64 * 3.0;
+ self.target_delay = Duration::from_secs_f64(jitter_secs)
+ .max(self.min_delay)
+ .min(self.max_delay);
+
+ self.last_rtp_ts = rtp_ts;
+ self.last_arrival = now;
+ }
+ } else {
+ self.started = true;
+ self.last_rtp_ts = rtp_ts;
+ self.last_arrival = now;
+ }
+ now + self.target_delay
+ }
+
+ /// Parse a playout-delay RTP extension (3 bytes, 12-bit min + 12-bit max in 10 ms units).
+ fn parse_playout_delay(data: &[u8]) -> Option<(Duration, Duration)> {
+ if data.len() < 3 {
+ return None;
+ }
+ let min_raw = ((data[0] as u16) << 4) | ((data[1] as u16) >> 4);
+ let max_raw = (((data[1] as u16) & 0x0F) << 8) | (data[2] as u16);
+ Some((
+ Duration::from_millis(min_raw as u64 * 10),
+ Duration::from_millis(max_raw as u64 * 10),
+ ))
+ }
+
+ /// Insert a packet into the buffer in sequence order.
+ ///
+ /// Returns `false` if the packet is a late duplicate (already past `last_released`).
+ pub(crate) fn insert(&mut self, now: Instant, pkt: TaggedPacket) -> bool {
+ let (seq, rtp_ts) = match &pkt.message {
+ Packet::Rtp(rtp) => (rtp.header.sequence_number, rtp.header.timestamp),
+ _ => return false,
+ };
+
+ // Reject packets at or before the last released sequence.
+ if let Some(last) = self.last_released {
+ if !Self::seq_is_after(seq, last) {
+ return false;
+ }
+ }
+
+ // Apply playout-delay extension hints from the sender.
+ if let (Packet::Rtp(rtp), Some(ext_id)) = (&pkt.message, self.playout_delay_ext_id) {
+ if let Some(ext_bytes) = rtp.header.get_extension(ext_id) {
+ if let Some((sender_min, sender_max)) =
+ Self::parse_playout_delay(ext_bytes.as_ref())
+ {
+ // Sender's min raises our floor; sender's max lowers our ceiling.
+ self.min_delay = self.min_delay.max(sender_min);
+ self.max_delay = self.max_delay.min(sender_max.max(self.min_delay));
+ self.target_delay = self.target_delay.max(sender_min).min(self.max_delay);
+ }
+ }
+ }
+
+ let release = self.compute_release(now, rtp_ts);
+
+ // Insert at the correct sorted position (ascending sequence order).
+ let pos = self
+ .buffer
+ .iter()
+ .position(|(s, _, _, _)| Self::seq_is_after(*s, seq))
+ .unwrap_or(self.buffer.len());
+ self.buffer.insert(pos, (seq, now, release, pkt));
+ true
+ }
+
+ /// Return the head packet if it is ready for release, or `None` if not yet.
+ ///
+ /// A packet is ready when `now >= release_time` or it has been held for `>= max_delay`.
+ pub(crate) fn pop_ready(&mut self, now: Instant) -> Option {
+ if let Some(&(_, arrival, release, _)) = self.buffer.front() {
+ let ready = now >= release || now.duration_since(arrival) >= self.max_delay;
+ if ready {
+ let (seq, _, _, pkt) = self.buffer.pop_front().unwrap();
+ self.last_released = Some(seq);
+ return Some(pkt);
+ }
+ }
+ None
+ }
+
+ /// Earliest instant at which the driver should wake up to service this stream.
+ pub(crate) fn next_wake_time(&self) -> Option {
+ self.buffer.front().map(|(_, arrival, release, _)| {
+ let force_release = *arrival + self.max_delay;
+ (*release).min(force_release)
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use shared::{TransportContext, TransportMessage};
+
+ fn make_rtp(ssrc: u32, seq: u16, ts: u32) -> TaggedPacket {
+ TransportMessage {
+ now: Instant::now(),
+ transport: TransportContext::default(),
+ message: Packet::Rtp(rtp::Packet {
+ header: rtp::header::Header {
+ ssrc,
+ sequence_number: seq,
+ timestamp: ts,
+ ..Default::default()
+ },
+ ..Default::default()
+ }),
+ }
+ }
+
+ #[test]
+ fn test_seq_is_after() {
+ assert!(JitterBufferStream::seq_is_after(1, 0));
+ assert!(JitterBufferStream::seq_is_after(100, 99));
+ assert!(!JitterBufferStream::seq_is_after(0, 1));
+ // wraparound: 0 is after 0xffff
+ assert!(JitterBufferStream::seq_is_after(0, 0xffff));
+ // equal is not "after"
+ assert!(!JitterBufferStream::seq_is_after(5, 5));
+ }
+
+ #[test]
+ fn test_insert_in_order() {
+ let delay = Duration::from_millis(50);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1));
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 1, 0));
+ s.insert(now, make_rtp(1, 2, 900));
+ s.insert(now, make_rtp(1, 3, 1800));
+ assert_eq!(s.buffer.len(), 3);
+ assert_eq!(s.buffer[0].0, 1);
+ assert_eq!(s.buffer[1].0, 2);
+ assert_eq!(s.buffer[2].0, 3);
+ }
+
+ #[test]
+ fn test_insert_out_of_order() {
+ let delay = Duration::from_millis(50);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1));
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 1, 0));
+ s.insert(now, make_rtp(1, 3, 1800));
+ s.insert(now, make_rtp(1, 2, 900)); // late, but within window
+ assert_eq!(s.buffer.len(), 3);
+ assert_eq!(s.buffer[0].0, 1);
+ assert_eq!(s.buffer[1].0, 2); // reordered into correct position
+ assert_eq!(s.buffer[2].0, 3);
+ }
+
+ #[test]
+ fn test_pop_ready_not_yet() {
+ let delay = Duration::from_millis(50);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1));
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 1, 0));
+ // Just after insertion — release time hasn't passed yet.
+ assert!(s.pop_ready(now).is_none());
+ }
+
+ #[test]
+ fn test_pop_ready_after_delay() {
+ let delay = Duration::from_millis(50);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1));
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 1, 0));
+ let later = now + Duration::from_millis(100);
+ let pkt = s.pop_ready(later);
+ assert!(pkt.is_some());
+ assert!(s.buffer.is_empty());
+ }
+
+ #[test]
+ fn test_force_release_at_max_delay() {
+ let delay = Duration::from_millis(50);
+ let max = Duration::from_millis(200);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, max);
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 1, 0));
+ // Simulate a very late pop — past max_delay.
+ let very_late = now + max + Duration::from_millis(1);
+ assert!(s.pop_ready(very_late).is_some());
+ }
+
+ #[test]
+ fn test_late_arrival_rejected() {
+ let delay = Duration::from_millis(50);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(1));
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 5, 0));
+ // Release seq 5.
+ s.pop_ready(now + Duration::from_millis(100));
+ // seq 4 (before released seq 5) should be rejected.
+ let accepted = s.insert(now + Duration::from_millis(200), make_rtp(1, 4, 0));
+ assert!(!accepted);
+ }
+
+ #[test]
+ fn test_jitter_adapts_target_delay() {
+ let delay = Duration::from_millis(20);
+ let mut s =
+ JitterBufferStream::new(90000, None, delay, delay, Duration::from_secs(2));
+ let base = Instant::now();
+ // Feed packets with increasing inter-arrival variation to grow jitter.
+ // RTP timestamps advance at 90 kHz rate but arrival times vary significantly.
+ for i in 0u32..20 {
+ // Nominal arrival every 33ms (30fps), but with 50ms extra jitter on even packets.
+ let arrival = base
+ + Duration::from_millis(33 * i as u64)
+ + if i % 2 == 0 { Duration::from_millis(50) } else { Duration::ZERO };
+ let ts = i * 3000; // 90kHz / 30fps = 3000 units per frame
+ s.insert(arrival, make_rtp(1, i as u16 + 1, ts));
+ }
+ // After significant jitter, target_delay should be above initial 20ms.
+ assert!(
+ s.target_delay > delay,
+ "target_delay {:?} should have grown above {:?}",
+ s.target_delay,
+ delay
+ );
+ }
+
+ #[test]
+ fn test_next_wake_time_is_min_of_release_and_force() {
+ let delay = Duration::from_millis(50);
+ let max = Duration::from_millis(200);
+ let mut s = JitterBufferStream::new(90000, None, delay, delay, max);
+ let now = Instant::now();
+ s.insert(now, make_rtp(1, 1, 0));
+ let wake = s.next_wake_time().expect("should have a wake time");
+ // Wake time should be <= arrival + max_delay
+ assert!(wake <= now + max + Duration::from_millis(1));
+ }
+}
diff --git a/rtc-interceptor/src/lib.rs b/rtc-interceptor/src/lib.rs
index 44e74561..6579abf7 100644
--- a/rtc-interceptor/src/lib.rs
+++ b/rtc-interceptor/src/lib.rs
@@ -180,11 +180,13 @@ use std::time::Instant;
mod noop;
mod registry;
+pub(crate) mod jitter_buffer;
pub(crate) mod nack;
pub(crate) mod report;
pub(crate) mod stream_info;
pub(crate) mod twcc;
+pub use jitter_buffer::{JitterBufferBuilder, JitterBufferInterceptor};
pub use nack::{
generator::{NackGeneratorBuilder, NackGeneratorInterceptor},
responder::{NackResponderBuilder, NackResponderInterceptor},