Skip to content

Commit 8982356

Browse files
fix: implement review findings for VP9, colorbars & WebM
Implements all 13 actionable findings from the video feature review (finding #11 skipped — would require core PixelFormat serde changes): WebM muxer (webm.rs): - Add shutdown/cancellation handling to the receive loop via tokio::select! on context.control_rx, matching the pattern used by the OGG muxer and colorbars node (fix #1, important) - Remove dead chunk_size config field and DEFAULT_CHUNK_SIZE constant; update test that referenced it (fix #2, important) - Make Seek on Live MuxBuffer return io::Error(Unsupported) instead of warn-and-clamp to fail fast on unexpected seek calls (fix #3, important) - Add comment noting VP9 CodecPrivate constants must stay in sync with encoder config in video/mod.rs (fix #4, important) - Make OpusHead pre_skip configurable via WebMMuxerConfig::opus_preskip_samples instead of always using the hardcoded constant (fix #6, minor) - Group mux_frame loose parameters into MuxState struct (fix #12, nit) - Fix BitReader::read() doc comment range 1..=16 → 1..=32 (fix #14, nit) VP9 codec (vp9.rs): - Add startup-time ABI assertion verifying vpx_codec_vp9_cx/dx return non-null VP9 interfaces (fix #5, minor) Colorbars (colorbars.rs): - Add draw_time_use_pts config option to stamp PTS instead of wall-clock time, more useful for A/V timing debugging (fix #7, minor) - Document studio-range assumption in SMPTE bar YUV table comment with note explaining why white Y=180 (fix #13, nit) OGG muxer (ogg.rs): - Remove dead is_first_packet field and its no-op toggle (fix #10, minor) Tests (tests.rs): - Add File mode (WebMStreamingMode::File) test exercising the seekable temp-file code path (fix #8, minor) - Add edge-case tests: non-keyframe first video packet and truncated/ corrupt VP9 header — verify no panics (fix #9, minor) Signed-off-by: StreamKit Devin <devin@streamkit.dev> Signed-off-by: bot_apk <apk@cognition.ai> Co-Authored-By: Staging-Devin AI <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com>
1 parent c9c6942 commit 8982356

File tree

5 files changed

+439
-102
lines changed

5 files changed

+439
-102
lines changed

crates/nodes/src/containers/ogg.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,11 @@ impl Default for OggMuxerConfig {
8787
/// A node that muxes compressed packets (like Opus) into an Ogg container stream.
8888
pub struct OggMuxerNode {
8989
config: OggMuxerConfig,
90-
is_first_packet: bool,
9190
}
9291

9392
impl OggMuxerNode {
9493
pub const fn new(config: OggMuxerConfig) -> Self {
95-
Self { config, is_first_packet: true }
94+
Self { config }
9695
}
9796
}
9897

@@ -222,9 +221,6 @@ impl ProcessorNode for OggMuxerNode {
222221
// Force every packet to end a page for maximum streaming behavior.
223222
// This allows chunk_size to work as expected by ensuring
224223
// the buffer fills up regularly. Trade-off: slightly higher OGG overhead.
225-
if self.is_first_packet {
226-
self.is_first_packet = false;
227-
}
228224
let pck_info = PacketWriteEndInfo::EndPage;
229225

230226
// Calculate granule position from metadata if available, otherwise use packet count

crates/nodes/src/containers/tests.rs

Lines changed: 230 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::disallowed_macros)]
88

99
use super::ogg::{OggDemuxerConfig, OggDemuxerNode, OggMuxerConfig, OggMuxerNode};
10-
use super::webm::{WebMMuxerConfig, WebMMuxerNode};
10+
use super::webm::{WebMMuxerConfig, WebMMuxerNode, WebMStreamingMode};
1111
use crate::test_utils::{
1212
assert_state_initializing, assert_state_running, assert_state_stopped,
1313
create_test_binary_packet, create_test_context,
@@ -419,11 +419,9 @@ async fn test_webm_sliding_window() {
419419
}),
420420
);
421421

422-
// Create config with smaller chunk size for testing
423-
let config = WebMMuxerConfig {
424-
chunk_size: 1024, // Small chunk size to force frequent flushes
425-
..Default::default()
426-
};
422+
// Create config (chunk_size was removed — the default streaming mode
423+
// flushes incrementally on every frame write).
424+
let config = WebMMuxerConfig::default();
427425
let node = WebMMuxerNode::new(config);
428426

429427
let node_handle = tokio::spawn(async move { Box::new(node).run(context).await });
@@ -845,3 +843,229 @@ async fn test_webm_mux_vp9_auto_detect_dimensions() {
845843
webm_bytes.len()
846844
);
847845
}
846+
847+
/// Test that WebM muxer works in File mode (seekable temp-file backed).
848+
/// File mode produces a single output packet after finalization with full
849+
/// duration and seeking info.
850+
#[cfg(feature = "vp9")]
851+
#[tokio::test]
852+
async fn test_webm_mux_file_mode() {
853+
use crate::test_utils::create_test_video_frame;
854+
use crate::video::vp9::{Vp9EncoderConfig, Vp9EncoderNode};
855+
use streamkit_core::types::{EncodedVideoFormat, PacketMetadata, PixelFormat, VideoCodec};
856+
857+
// ---- Step 1: Encode raw I420 frames to VP9 ----
858+
859+
let (enc_input_tx, enc_input_rx) = mpsc::channel(10);
860+
let mut enc_inputs = HashMap::new();
861+
enc_inputs.insert("in".to_string(), enc_input_rx);
862+
863+
let (enc_context, enc_sender, mut enc_state_rx) = create_test_context(enc_inputs, 10);
864+
let encoder_config = Vp9EncoderConfig {
865+
keyframe_interval: 1,
866+
bitrate_kbps: 800,
867+
threads: 1,
868+
..Default::default()
869+
};
870+
let encoder = match Vp9EncoderNode::new(encoder_config) {
871+
Ok(enc) => enc,
872+
Err(e) => {
873+
eprintln!("Skipping VP9 File mode mux test: encoder not available ({e})");
874+
return;
875+
},
876+
};
877+
let enc_handle = tokio::spawn(async move { Box::new(encoder).run(enc_context).await });
878+
879+
assert_state_initializing(&mut enc_state_rx).await;
880+
assert_state_running(&mut enc_state_rx).await;
881+
882+
let frame_count = 5u64;
883+
for i in 0..frame_count {
884+
let mut frame = create_test_video_frame(64, 64, PixelFormat::I420, 16);
885+
frame.metadata = Some(PacketMetadata {
886+
timestamp_us: Some(i * 33_333),
887+
duration_us: Some(33_333),
888+
sequence: Some(i),
889+
keyframe: Some(true),
890+
});
891+
enc_input_tx.send(Packet::Video(frame)).await.unwrap();
892+
}
893+
drop(enc_input_tx);
894+
895+
assert_state_stopped(&mut enc_state_rx).await;
896+
enc_handle.await.unwrap().unwrap();
897+
898+
let encoded_packets = enc_sender.get_packets_for_pin("out").await;
899+
assert!(!encoded_packets.is_empty(), "VP9 encoder produced no packets");
900+
901+
// ---- Step 2: Mux in File mode ----
902+
903+
let (mux_video_tx, mux_video_rx) = mpsc::channel(10);
904+
let mut mux_inputs = HashMap::new();
905+
mux_inputs.insert("in".to_string(), mux_video_rx);
906+
907+
let (mut mux_context, mux_sender, mut mux_state_rx) = create_test_context(mux_inputs, 10);
908+
mux_context.input_types.insert(
909+
"in".to_string(),
910+
PacketType::EncodedVideo(EncodedVideoFormat {
911+
codec: VideoCodec::Vp9,
912+
bitstream_format: None,
913+
codec_private: None,
914+
profile: None,
915+
level: None,
916+
}),
917+
);
918+
919+
let mux_config = WebMMuxerConfig {
920+
video_width: 64,
921+
video_height: 64,
922+
streaming_mode: WebMStreamingMode::File,
923+
..WebMMuxerConfig::default()
924+
};
925+
let muxer = WebMMuxerNode::new(mux_config);
926+
let mux_handle = tokio::spawn(async move { Box::new(muxer).run(mux_context).await });
927+
928+
assert_state_initializing(&mut mux_state_rx).await;
929+
assert_state_running(&mut mux_state_rx).await;
930+
931+
for packet in encoded_packets {
932+
mux_video_tx.send(packet).await.unwrap();
933+
}
934+
drop(mux_video_tx);
935+
936+
assert_state_stopped(&mut mux_state_rx).await;
937+
mux_handle.await.unwrap().unwrap();
938+
939+
// ---- Step 3: Validate File mode output ----
940+
941+
let output_packets = mux_sender.get_packets_for_pin("out").await;
942+
// File mode emits a single packet after finalization
943+
assert!(!output_packets.is_empty(), "WebM File mode muxer produced no output");
944+
945+
let mut webm_bytes = Vec::new();
946+
for packet in &output_packets {
947+
if let Packet::Binary { data, .. } = packet {
948+
webm_bytes.extend_from_slice(data);
949+
}
950+
}
951+
952+
assert!(webm_bytes.len() >= 4, "WebM File mode output too small");
953+
assert_eq!(
954+
&webm_bytes[..4],
955+
&[0x1A, 0x45, 0xDF, 0xA3],
956+
"WebM File mode output does not start with EBML header"
957+
);
958+
959+
println!(
960+
"WebM File mode mux test passed: {} output packets, {} total bytes",
961+
output_packets.len(),
962+
webm_bytes.len()
963+
);
964+
}
965+
966+
/// Test muxer behaviour when the first video packet is not a keyframe
967+
/// (e.g. truncated or non-keyframe VP9 data).
968+
#[tokio::test]
969+
async fn test_webm_mux_non_keyframe_first_video() {
970+
use streamkit_core::types::{EncodedVideoFormat, PacketMetadata, VideoCodec};
971+
972+
let (mux_video_tx, mux_video_rx) = mpsc::channel(10);
973+
let mut mux_inputs = HashMap::new();
974+
mux_inputs.insert("in".to_string(), mux_video_rx);
975+
976+
let (mut mux_context, _mux_sender, mut mux_state_rx) = create_test_context(mux_inputs, 10);
977+
mux_context.input_types.insert(
978+
"in".to_string(),
979+
PacketType::EncodedVideo(EncodedVideoFormat {
980+
codec: VideoCodec::Vp9,
981+
bitstream_format: None,
982+
codec_private: None,
983+
profile: None,
984+
level: None,
985+
}),
986+
);
987+
988+
// video_width/height = 0 means auto-detect from first keyframe.
989+
// Sending non-keyframe data first should not panic.
990+
let mux_config =
991+
WebMMuxerConfig { video_width: 0, video_height: 0, ..WebMMuxerConfig::default() };
992+
let muxer = WebMMuxerNode::new(mux_config);
993+
let mux_handle = tokio::spawn(async move { Box::new(muxer).run(mux_context).await });
994+
995+
assert_state_initializing(&mut mux_state_rx).await;
996+
assert_state_running(&mut mux_state_rx).await;
997+
998+
// Send a small non-keyframe packet (random bytes, not a valid VP9 keyframe).
999+
// The muxer should handle this gracefully (skip or error, not panic).
1000+
let non_kf = Packet::Binary {
1001+
data: Bytes::from_static(&[0x00, 0x01, 0x02, 0x03]),
1002+
content_type: None,
1003+
metadata: Some(PacketMetadata {
1004+
timestamp_us: Some(0),
1005+
duration_us: Some(33_333),
1006+
sequence: Some(0),
1007+
keyframe: Some(false),
1008+
}),
1009+
};
1010+
let _ = mux_video_tx.send(non_kf).await;
1011+
1012+
// Close the channel — the muxer should finish without panicking.
1013+
drop(mux_video_tx);
1014+
1015+
let result = mux_handle.await.unwrap();
1016+
// The muxer may return Ok or Err depending on whether it waits
1017+
// for a keyframe forever vs. giving up, but it should not panic.
1018+
let _ = result;
1019+
println!("WebM non-keyframe first video test passed (no panic)");
1020+
}
1021+
1022+
/// Test that sending truncated/corrupt VP9 data does not panic the muxer.
1023+
#[tokio::test]
1024+
async fn test_webm_mux_truncated_vp9_header() {
1025+
use streamkit_core::types::{EncodedVideoFormat, PacketMetadata, VideoCodec};
1026+
1027+
let (mux_video_tx, mux_video_rx) = mpsc::channel(10);
1028+
let mut mux_inputs = HashMap::new();
1029+
mux_inputs.insert("in".to_string(), mux_video_rx);
1030+
1031+
let (mut mux_context, _mux_sender, mut mux_state_rx) = create_test_context(mux_inputs, 10);
1032+
mux_context.input_types.insert(
1033+
"in".to_string(),
1034+
PacketType::EncodedVideo(EncodedVideoFormat {
1035+
codec: VideoCodec::Vp9,
1036+
bitstream_format: None,
1037+
codec_private: None,
1038+
profile: None,
1039+
level: None,
1040+
}),
1041+
);
1042+
1043+
// Auto-detect mode — send corrupt VP9 data
1044+
let mux_config =
1045+
WebMMuxerConfig { video_width: 0, video_height: 0, ..WebMMuxerConfig::default() };
1046+
let muxer = WebMMuxerNode::new(mux_config);
1047+
let mux_handle = tokio::spawn(async move { Box::new(muxer).run(mux_context).await });
1048+
1049+
assert_state_initializing(&mut mux_state_rx).await;
1050+
assert_state_running(&mut mux_state_rx).await;
1051+
1052+
// Send a packet flagged as keyframe but with truncated/corrupt VP9 data
1053+
// (too short for `parse_vp9_keyframe_dimensions` to extract dimensions).
1054+
let truncated = Packet::Binary {
1055+
data: Bytes::from_static(&[0x82, 0x49, 0x83]), // partial sync code
1056+
content_type: None,
1057+
metadata: Some(PacketMetadata {
1058+
timestamp_us: Some(0),
1059+
duration_us: Some(33_333),
1060+
sequence: Some(0),
1061+
keyframe: Some(true),
1062+
}),
1063+
};
1064+
let _ = mux_video_tx.send(truncated).await;
1065+
drop(mux_video_tx);
1066+
1067+
let result = mux_handle.await.unwrap();
1068+
// Should not panic; may return an error about dimension detection.
1069+
let _ = result;
1070+
println!("WebM truncated VP9 header test passed (no panic)");
1071+
}

0 commit comments

Comments
 (0)