Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions crates/nodes/src/containers/ogg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@ impl Default for OggMuxerConfig {
/// A node that muxes compressed packets (like Opus) into an Ogg container stream.
pub struct OggMuxerNode {
config: OggMuxerConfig,
is_first_packet: bool,
}

impl OggMuxerNode {
pub const fn new(config: OggMuxerConfig) -> Self {
Self { config, is_first_packet: true }
Self { config }
}
}

Expand Down Expand Up @@ -222,9 +221,6 @@ impl ProcessorNode for OggMuxerNode {
// Force every packet to end a page for maximum streaming behavior.
// This allows chunk_size to work as expected by ensuring
// the buffer fills up regularly. Trade-off: slightly higher OGG overhead.
if self.is_first_packet {
self.is_first_packet = false;
}
let pck_info = PacketWriteEndInfo::EndPage;

// Calculate granule position from metadata if available, otherwise use packet count
Expand Down
236 changes: 230 additions & 6 deletions crates/nodes/src/containers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::disallowed_macros)]

use super::ogg::{OggDemuxerConfig, OggDemuxerNode, OggMuxerConfig, OggMuxerNode};
use super::webm::{WebMMuxerConfig, WebMMuxerNode};
use super::webm::{WebMMuxerConfig, WebMMuxerNode, WebMStreamingMode};
use crate::test_utils::{
assert_state_initializing, assert_state_running, assert_state_stopped,
create_test_binary_packet, create_test_context,
Expand Down Expand Up @@ -419,11 +419,9 @@ async fn test_webm_sliding_window() {
}),
);

// Create config with smaller chunk size for testing
let config = WebMMuxerConfig {
chunk_size: 1024, // Small chunk size to force frequent flushes
..Default::default()
};
// Create config (chunk_size was removed — the default streaming mode
// flushes incrementally on every frame write).
let config = WebMMuxerConfig::default();
let node = WebMMuxerNode::new(config);

let node_handle = tokio::spawn(async move { Box::new(node).run(context).await });
Expand Down Expand Up @@ -845,3 +843,229 @@ async fn test_webm_mux_vp9_auto_detect_dimensions() {
webm_bytes.len()
);
}

/// Test that WebM muxer works in File mode (seekable temp-file backed).
/// File mode produces a single output packet after finalization with full
/// duration and seeking info.
#[cfg(feature = "vp9")]
#[tokio::test]
async fn test_webm_mux_file_mode() {
use crate::test_utils::create_test_video_frame;
use crate::video::vp9::{Vp9EncoderConfig, Vp9EncoderNode};
use streamkit_core::types::{EncodedVideoFormat, PacketMetadata, PixelFormat, VideoCodec};

// ---- Step 1: Encode raw I420 frames to VP9 ----

let (enc_input_tx, enc_input_rx) = mpsc::channel(10);
let mut enc_inputs = HashMap::new();
enc_inputs.insert("in".to_string(), enc_input_rx);

let (enc_context, enc_sender, mut enc_state_rx) = create_test_context(enc_inputs, 10);
let encoder_config = Vp9EncoderConfig {
keyframe_interval: 1,
bitrate_kbps: 800,
threads: 1,
..Default::default()
};
let encoder = match Vp9EncoderNode::new(encoder_config) {
Ok(enc) => enc,
Err(e) => {
eprintln!("Skipping VP9 File mode mux test: encoder not available ({e})");
return;
},
};
let enc_handle = tokio::spawn(async move { Box::new(encoder).run(enc_context).await });

assert_state_initializing(&mut enc_state_rx).await;
assert_state_running(&mut enc_state_rx).await;

let frame_count = 5u64;
for i in 0..frame_count {
let mut frame = create_test_video_frame(64, 64, PixelFormat::I420, 16);
frame.metadata = Some(PacketMetadata {
timestamp_us: Some(i * 33_333),
duration_us: Some(33_333),
sequence: Some(i),
keyframe: Some(true),
});
enc_input_tx.send(Packet::Video(frame)).await.unwrap();
}
drop(enc_input_tx);

assert_state_stopped(&mut enc_state_rx).await;
enc_handle.await.unwrap().unwrap();

let encoded_packets = enc_sender.get_packets_for_pin("out").await;
assert!(!encoded_packets.is_empty(), "VP9 encoder produced no packets");

// ---- Step 2: Mux in File mode ----

let (mux_video_tx, mux_video_rx) = mpsc::channel(10);
let mut mux_inputs = HashMap::new();
mux_inputs.insert("in".to_string(), mux_video_rx);

let (mut mux_context, mux_sender, mut mux_state_rx) = create_test_context(mux_inputs, 10);
mux_context.input_types.insert(
"in".to_string(),
PacketType::EncodedVideo(EncodedVideoFormat {
codec: VideoCodec::Vp9,
bitstream_format: None,
codec_private: None,
profile: None,
level: None,
}),
);

let mux_config = WebMMuxerConfig {
video_width: 64,
video_height: 64,
streaming_mode: WebMStreamingMode::File,
..WebMMuxerConfig::default()
};
let muxer = WebMMuxerNode::new(mux_config);
let mux_handle = tokio::spawn(async move { Box::new(muxer).run(mux_context).await });

assert_state_initializing(&mut mux_state_rx).await;
assert_state_running(&mut mux_state_rx).await;

for packet in encoded_packets {
mux_video_tx.send(packet).await.unwrap();
}
drop(mux_video_tx);

assert_state_stopped(&mut mux_state_rx).await;
mux_handle.await.unwrap().unwrap();

// ---- Step 3: Validate File mode output ----

let output_packets = mux_sender.get_packets_for_pin("out").await;
// File mode emits a single packet after finalization
assert!(!output_packets.is_empty(), "WebM File mode muxer produced no output");

let mut webm_bytes = Vec::new();
for packet in &output_packets {
if let Packet::Binary { data, .. } = packet {
webm_bytes.extend_from_slice(data);
}
}

assert!(webm_bytes.len() >= 4, "WebM File mode output too small");
assert_eq!(
&webm_bytes[..4],
&[0x1A, 0x45, 0xDF, 0xA3],
"WebM File mode output does not start with EBML header"
);

println!(
"WebM File mode mux test passed: {} output packets, {} total bytes",
output_packets.len(),
webm_bytes.len()
);
}

/// Test muxer behaviour when the first video packet is not a keyframe
/// (e.g. truncated or non-keyframe VP9 data).
#[tokio::test]
async fn test_webm_mux_non_keyframe_first_video() {
use streamkit_core::types::{EncodedVideoFormat, PacketMetadata, VideoCodec};

let (mux_video_tx, mux_video_rx) = mpsc::channel(10);
let mut mux_inputs = HashMap::new();
mux_inputs.insert("in".to_string(), mux_video_rx);

let (mut mux_context, _mux_sender, mut mux_state_rx) = create_test_context(mux_inputs, 10);
mux_context.input_types.insert(
"in".to_string(),
PacketType::EncodedVideo(EncodedVideoFormat {
codec: VideoCodec::Vp9,
bitstream_format: None,
codec_private: None,
profile: None,
level: None,
}),
);

// video_width/height = 0 means auto-detect from first keyframe.
// Sending non-keyframe data first should not panic.
let mux_config =
WebMMuxerConfig { video_width: 0, video_height: 0, ..WebMMuxerConfig::default() };
let muxer = WebMMuxerNode::new(mux_config);
let mux_handle = tokio::spawn(async move { Box::new(muxer).run(mux_context).await });

assert_state_initializing(&mut mux_state_rx).await;
assert_state_running(&mut mux_state_rx).await;

// Send a small non-keyframe packet (random bytes, not a valid VP9 keyframe).
// The muxer should handle this gracefully (skip or error, not panic).
let non_kf = Packet::Binary {
data: Bytes::from_static(&[0x00, 0x01, 0x02, 0x03]),
content_type: None,
metadata: Some(PacketMetadata {
timestamp_us: Some(0),
duration_us: Some(33_333),
sequence: Some(0),
keyframe: Some(false),
}),
};
let _ = mux_video_tx.send(non_kf).await;

// Close the channel — the muxer should finish without panicking.
drop(mux_video_tx);

let result = mux_handle.await.unwrap();
// The muxer may return Ok or Err depending on whether it waits
// for a keyframe forever vs. giving up, but it should not panic.
let _ = result;
println!("WebM non-keyframe first video test passed (no panic)");
}

/// Test that sending truncated/corrupt VP9 data does not panic the muxer.
#[tokio::test]
async fn test_webm_mux_truncated_vp9_header() {
use streamkit_core::types::{EncodedVideoFormat, PacketMetadata, VideoCodec};

let (mux_video_tx, mux_video_rx) = mpsc::channel(10);
let mut mux_inputs = HashMap::new();
mux_inputs.insert("in".to_string(), mux_video_rx);

let (mut mux_context, _mux_sender, mut mux_state_rx) = create_test_context(mux_inputs, 10);
mux_context.input_types.insert(
"in".to_string(),
PacketType::EncodedVideo(EncodedVideoFormat {
codec: VideoCodec::Vp9,
bitstream_format: None,
codec_private: None,
profile: None,
level: None,
}),
);

// Auto-detect mode — send corrupt VP9 data
let mux_config =
WebMMuxerConfig { video_width: 0, video_height: 0, ..WebMMuxerConfig::default() };
let muxer = WebMMuxerNode::new(mux_config);
let mux_handle = tokio::spawn(async move { Box::new(muxer).run(mux_context).await });

assert_state_initializing(&mut mux_state_rx).await;
assert_state_running(&mut mux_state_rx).await;

// Send a packet flagged as keyframe but with truncated/corrupt VP9 data
// (too short for `parse_vp9_keyframe_dimensions` to extract dimensions).
let truncated = Packet::Binary {
data: Bytes::from_static(&[0x82, 0x49, 0x83]), // partial sync code
content_type: None,
metadata: Some(PacketMetadata {
timestamp_us: Some(0),
duration_us: Some(33_333),
sequence: Some(0),
keyframe: Some(true),
}),
};
let _ = mux_video_tx.send(truncated).await;
drop(mux_video_tx);

let result = mux_handle.await.unwrap();
// Should not panic; may return an error about dimension detection.
let _ = result;
println!("WebM truncated VP9 header test passed (no panic)");
}
Loading