Skip to content

Commit e603e7e

Browse files
fix: address code review feedback for AAC encoder PR
- Split wildcard Aac | _ pattern into explicit arms with tracing::warn for unrecognised future audio codecs (Critical #1) - Parameterize DEFAULT_AUDIO_FRAME_DURATION_US by codec: Opus 20ms, AAC ~21.333ms via const fn helpers (Suggestion #2) - Compute AAC timestamps from frame count to avoid truncation drift: sequence * 1024 * 1_000_000 / 48_000 (Suggestion #3) - Document Binary vs EncodedAudio semantic mismatch in AAC encoder output pin (Suggestion #4) - Bundle video/audio codec into MediaCodecConfig struct for handle_pin_management (Suggestion #5) - Deduplicate parse_audio_codec_config: mp4.rs delegates to shared implementation in moq/constants.rs (Nit #1) - Document 960→1024 mixer/encoder frame size interaction and rewrite moq_aac_mixing.yml as documented placeholder (Nit #2) Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
1 parent 0ce94e2 commit e603e7e

File tree

19 files changed

+137
-117
lines changed

19 files changed

+137
-117
lines changed

crates/nodes/src/containers/mp4.rs

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,23 @@ use crate::video::DEFAULT_VIDEO_FRAME_DURATION_US;
4848
// Constants
4949
// ---------------------------------------------------------------------------
5050

51-
/// Default audio frame duration when metadata is missing (20 ms Opus frame).
52-
const DEFAULT_AUDIO_FRAME_DURATION_US: u64 = 20_000;
51+
/// Default audio frame duration when metadata is missing.
52+
///
53+
/// The correct value depends on the codec:
54+
/// - Opus: 20 ms (960 samples at 48 kHz)
55+
/// - AAC-LC: ~21.333 ms (1024 samples at 48 kHz)
56+
///
57+
/// Use [`default_audio_frame_duration_us`] to get the codec-aware value.
58+
const DEFAULT_AUDIO_FRAME_DURATION_US_OPUS: u64 = 20_000;
59+
const DEFAULT_AUDIO_FRAME_DURATION_US_AAC: u64 = 21_333;
60+
61+
/// Return the default audio frame duration for the given codec.
62+
const fn default_audio_frame_duration_us(codec: AudioCodec) -> u64 {
63+
match codec {
64+
AudioCodec::Aac => DEFAULT_AUDIO_FRAME_DURATION_US_AAC,
65+
_ => DEFAULT_AUDIO_FRAME_DURATION_US_OPUS,
66+
}
67+
}
5368

5469
/// Default video timescale (90 kHz — standard for MPEG transport streams / MP4).
5570
const DEFAULT_VIDEO_TIMESCALE: NonZeroU32 = match NonZeroU32::new(90_000) {
@@ -608,21 +623,10 @@ fn classify_packet(packet: Packet) -> Option<MuxFrame> {
608623

609624
/// Parse an optional audio codec config string into an [`AudioCodec`].
610625
///
611-
/// Accepted values (case-insensitive): `"opus"`, `"aac"`.
612-
/// Returns `AudioCodec::Opus` when the input is `None` or unrecognised.
626+
/// Delegates to the shared [`crate::transport::moq::parse_audio_codec_config`]
627+
/// parser and defaults to `Opus` when the input is `None` or unrecognised.
613628
fn parse_mp4_audio_codec_config(s: Option<&str>) -> AudioCodec {
614-
match s {
615-
Some(v) if v.eq_ignore_ascii_case("aac") => AudioCodec::Aac,
616-
Some(v) if v.eq_ignore_ascii_case("opus") => AudioCodec::Opus,
617-
Some(other) => {
618-
tracing::warn!(
619-
audio_codec = other,
620-
"unrecognised audio_codec config value, defaulting to Opus"
621-
);
622-
AudioCodec::Opus
623-
},
624-
None => AudioCodec::Opus,
625-
}
629+
s.and_then(crate::transport::moq::parse_audio_codec_config).unwrap_or(AudioCodec::Opus)
626630
}
627631

628632
/// Determine the MP4 MIME content-type string from optional codec info.
@@ -653,10 +657,19 @@ fn mp4_content_type(audio: Option<AudioCodec>, video: Option<VideoCodec>) -> &'s
653657
(Some(AudioCodec::Aac), Some(VideoCodec::H264)) => "video/mp4; codecs=\"avc1,mp4a\"",
654658
// Audio + unknown/future video codec
655659
(Some(AudioCodec::Opus), Some(_)) => "video/mp4; codecs=\"opus\"",
656-
(Some(AudioCodec::Aac | _), Some(_)) => "video/mp4; codecs=\"mp4a\"",
660+
(Some(AudioCodec::Aac), Some(_)) => "video/mp4; codecs=\"mp4a\"",
657661
// Audio-only
658662
(Some(AudioCodec::Opus), None) => "audio/mp4; codecs=\"opus\"",
659-
(Some(AudioCodec::Aac | _), None) => "audio/mp4; codecs=\"mp4a\"",
663+
(Some(AudioCodec::Aac), None) => "audio/mp4; codecs=\"mp4a\"",
664+
// Future audio codec — warn and omit codecs param.
665+
(Some(_), Some(_)) => {
666+
tracing::warn!("mp4_content_type: unrecognised audio codec — omitting codecs param");
667+
"video/mp4"
668+
},
669+
(Some(_), None) => {
670+
tracing::warn!("mp4_content_type: unrecognised audio codec — omitting codecs param");
671+
"audio/mp4"
672+
},
660673
// Video-only
661674
(None, Some(VideoCodec::Av1)) => "video/mp4; codecs=\"av01\"",
662675
(None, Some(VideoCodec::H264)) => "video/mp4; codecs=\"avc1\"",
@@ -1213,7 +1226,7 @@ async fn run_stream_mode(
12131226
let duration_us = metadata
12141227
.as_ref()
12151228
.and_then(|m| m.duration_us)
1216-
.unwrap_or(DEFAULT_AUDIO_FRAME_DURATION_US);
1229+
.unwrap_or_else(|| default_audio_frame_duration_us(session.audio_codec));
12171230
let duration_ticks = us_to_ticks(duration_us, audio_timescale.get());
12181231

12191232
let data_size = data.len();
@@ -1504,7 +1517,13 @@ async fn run_file_mode(
15041517
)?;
15051518
},
15061519
MuxFrame::Audio(data, metadata) => {
1507-
process_file_audio_frame(&data, metadata.as_ref(), &mut state, stats_tracker)?;
1520+
process_file_audio_frame(
1521+
&data,
1522+
metadata.as_ref(),
1523+
session.audio_codec,
1524+
&mut state,
1525+
stats_tracker,
1526+
)?;
15081527
},
15091528
}
15101529
}
@@ -1608,14 +1627,16 @@ fn process_file_video_frame(
16081627
fn process_file_audio_frame(
16091628
data: &Bytes,
16101629
metadata: Option<&PacketMetadata>,
1630+
audio_codec: AudioCodec,
16111631
state: &mut FileMuxState,
16121632
stats_tracker: &mut NodeStatsTracker,
16131633
) -> Result<(), StreamKitError> {
16141634
state.packet_count += 1;
16151635
stats_tracker.received();
16161636

1617-
let duration_us =
1618-
metadata.and_then(|m| m.duration_us).unwrap_or(DEFAULT_AUDIO_FRAME_DURATION_US);
1637+
let duration_us = metadata
1638+
.and_then(|m| m.duration_us)
1639+
.unwrap_or_else(|| default_audio_frame_duration_us(audio_codec));
16191640
let duration_ticks = us_to_ticks(duration_us, state.audio_timescale.get());
16201641

16211642
let data_offset = state

crates/nodes/src/transport/moq/constants.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,16 @@ use streamkit_core::types::{
1212
AudioCodec, EncodedAudioFormat, EncodedVideoFormat, PacketMetadata, PacketType, VideoCodec,
1313
};
1414

15-
pub const DEFAULT_AUDIO_FRAME_DURATION_US: u64 = 20_000;
15+
pub const DEFAULT_AUDIO_FRAME_DURATION_US_OPUS: u64 = 20_000;
16+
pub const DEFAULT_AUDIO_FRAME_DURATION_US_AAC: u64 = 21_333;
17+
18+
/// Return the default audio frame duration for the given codec.
19+
pub const fn default_audio_frame_duration_us(codec: AudioCodec) -> u64 {
20+
match codec {
21+
AudioCodec::Aac => DEFAULT_AUDIO_FRAME_DURATION_US_AAC,
22+
_ => DEFAULT_AUDIO_FRAME_DURATION_US_OPUS,
23+
}
24+
}
1625

1726
pub fn packet_duration_us(metadata: Option<&PacketMetadata>) -> Option<u64> {
1827
metadata.and_then(|m| m.duration_us).filter(|d| *d > 0)

crates/nodes/src/transport/moq/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#![cfg(feature = "moq")]
1313

1414
mod constants;
15+
pub(crate) use constants::parse_audio_codec_config;
1516
mod peer;
1617
mod pull;
1718
mod push;

crates/nodes/src/transport/moq/peer/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ pub(super) enum PublisherEvent {
8989
Disconnected { path: String, error: Option<String> },
9090
}
9191

92+
/// Resolved codec pair for video and audio.
93+
///
94+
/// Bundles the two codec fields that would otherwise be passed as separate
95+
/// parameters to `handle_pin_management` and friends.
96+
#[derive(Debug, Clone, Copy)]
97+
pub(super) struct MediaCodecConfig {
98+
pub video: VideoCodec,
99+
pub audio: AudioCodec,
100+
}
101+
92102
/// Media and output configuration shared across subscriber-related functions.
93103
pub(super) struct SubscriberMediaConfig {
94104
pub has_video: bool,
@@ -158,6 +168,8 @@ pub(super) struct SubscriberSendCtx<'a> {
158168
pub last_audio_ts_ms: Option<u64>,
159169
pub last_video_ts_ms: Option<u64>,
160170
pub stats_delta_tx: &'a mpsc::Sender<NodeStatsDelta>,
171+
/// Resolved audio codec — used for codec-aware default frame durations.
172+
pub audio_codec: AudioCodec,
161173
}
162174

163175
// ── Shared map of dynamic output pin senders ─────────────────────────────────

crates/nodes/src/transport/moq/peer/mod.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ pub use config::MoqPeerConfig;
1818
use config::{
1919
infer_kind_from_packet, join_gateway_path, make_broadcast_frame, media_kind_for_packet_type,
2020
normalize_gateway_path, BidirectionalTaskConfig, BroadcastFrame, DynamicOutputs, FrameResult,
21-
MediaKind, MediaTypeState, NodeStatsDelta, PublisherEvent, PublisherReceiveLoopWithSlotConfig,
22-
SendResult, SubscriberMediaConfig, SubscriberSendCtx, TrackExit,
21+
MediaCodecConfig, MediaKind, MediaTypeState, NodeStatsDelta, PublisherEvent,
22+
PublisherReceiveLoopWithSlotConfig, SendResult, SubscriberMediaConfig, SubscriberSendCtx,
23+
TrackExit,
2324
};
2425

2526
use crate::transport::moq::constants::{
@@ -639,7 +640,7 @@ impl ProcessorNode for MoqPeerNode {
639640
None => std::future::pending().await,
640641
}
641642
} => {
642-
Self::handle_pin_management(msg, &dynamic_outputs, &subscriber_broadcast_tx, &stats_delta_tx, &shutdown_tx, &mut forwarder_handles, video_codec, audio_codec);
643+
Self::handle_pin_management(msg, &dynamic_outputs, &subscriber_broadcast_tx, &stats_delta_tx, &shutdown_tx, &mut forwarder_handles, MediaCodecConfig { video: video_codec, audio: audio_codec });
643644
}
644645

645646
// Check for shutdown signal
@@ -804,22 +805,20 @@ impl MoqPeerNode {
804805
/// respond with an appropriate pin definition.
805806
/// - [`PinManagementMessage::AddedOutputPin`]: the engine has set up the pin
806807
/// distributor and sends us the channel to write frames to.
807-
#[allow(clippy::too_many_arguments)]
808808
fn handle_pin_management(
809809
msg: PinManagementMessage,
810810
dynamic_outputs: &DynamicOutputs,
811811
subscriber_broadcast_tx: &broadcast::Sender<BroadcastFrame>,
812812
stats_delta_tx: &mpsc::Sender<NodeStatsDelta>,
813813
shutdown_tx: &broadcast::Sender<()>,
814814
forwarder_handles: &mut HashMap<String, tokio::task::JoinHandle<()>>,
815-
video_codec: VideoCodec,
816-
audio_codec: AudioCodec,
815+
codecs: MediaCodecConfig,
817816
) {
818817
match msg {
819818
PinManagementMessage::RequestAddOutputPin { suggested_name, response_tx } => {
820819
let pin_name = suggested_name.unwrap_or_else(|| "dynamic_out".to_string());
821820
tracing::info!("MoqPeerNode: creating dynamic output pin '{}'", pin_name);
822-
let pin = make_dynamic_output_pin(&pin_name, video_codec, audio_codec);
821+
let pin = make_dynamic_output_pin(&pin_name, codecs.video, codecs.audio);
823822
let _ = response_tx.send(Ok(pin));
824823
},
825824
PinManagementMessage::AddedOutputPin { pin, channel } => {
@@ -2128,6 +2127,7 @@ impl MoqPeerNode {
21282127
node_id,
21292128
broadcast_name,
21302129
&stats_delta_tx,
2130+
media.audio_codec,
21312131
)
21322132
.await?;
21332133

@@ -2284,6 +2284,7 @@ impl MoqPeerNode {
22842284
node_id: String,
22852285
broadcast_name: String,
22862286
stats_delta_tx: &mpsc::Sender<NodeStatsDelta>,
2287+
audio_codec: AudioCodec,
22872288
) -> Result<u64, StreamKitError> {
22882289
let meter = opentelemetry::global::meter("skit_nodes");
22892290
let gap_histogram = meter
@@ -2310,6 +2311,7 @@ impl MoqPeerNode {
23102311
last_audio_ts_ms: None,
23112312
last_video_ts_ms: None,
23122313
stats_delta_tx,
2314+
audio_codec,
23132315
};
23142316

23152317
loop {
@@ -2412,7 +2414,9 @@ impl MoqPeerNode {
24122414
}
24132415

24142416
let default_duration = match broadcast_frame.kind {
2415-
MediaKind::Audio => super::constants::DEFAULT_AUDIO_FRAME_DURATION_US,
2417+
MediaKind::Audio => {
2418+
super::constants::default_audio_frame_duration_us(ctx.audio_codec)
2419+
},
24162420
MediaKind::Video => crate::video::DEFAULT_VIDEO_FRAME_DURATION_US,
24172421
};
24182422
clock.advance_by_duration_us(broadcast_frame.duration_us, default_duration);
@@ -2556,8 +2560,7 @@ mod tests {
25562560
&stats_delta_tx,
25572561
&shutdown_tx,
25582562
&mut forwarder_handles,
2559-
VideoCodec::Vp9,
2560-
AudioCodec::Opus,
2563+
MediaCodecConfig { video: VideoCodec::Vp9, audio: AudioCodec::Opus },
25612564
);
25622565

25632566
// If the channel was dropped, try_send would return a closed error.

crates/nodes/src/transport/moq/pull.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
//! MoQ Pull Node - subscribes to broadcasts from a MoQ server
66
7-
use super::constants::DEFAULT_AUDIO_FRAME_DURATION_US;
7+
use super::constants::DEFAULT_AUDIO_FRAME_DURATION_US_OPUS;
88
use crate::video::{AV1_CONTENT_TYPE, H264_CONTENT_TYPE, VP9_CONTENT_TYPE};
99
use async_trait::async_trait;
1010
use bytes::Buf;
@@ -855,7 +855,7 @@ impl MoqPullNode {
855855
let (last_ts, default_dur, clock, is_first_in_group) = match source {
856856
ReadSource::Audio => (
857857
&mut last_audio_timestamp_us,
858-
DEFAULT_AUDIO_FRAME_DURATION_US,
858+
DEFAULT_AUDIO_FRAME_DURATION_US_OPUS,
859859
&mut audio_clock,
860860
&mut audio_is_first_in_group,
861861
),

crates/nodes/src/transport/moq/push.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
//! MoQ Push Node - publishes packets to a MoQ broadcast
66
77
use super::constants::{
8-
catalog_audio_codec, catalog_video_codec, moq_accepted_media_types, resolve_audio_codec,
9-
resolve_video_codec, DEFAULT_AUDIO_FRAME_DURATION_US,
8+
catalog_audio_codec, catalog_video_codec, default_audio_frame_duration_us,
9+
moq_accepted_media_types, resolve_audio_codec, resolve_video_codec,
1010
};
1111
use async_trait::async_trait;
1212
use futures::future::poll_fn;
@@ -511,7 +511,7 @@ impl ProcessorNode for MoqPushNode {
511511
(
512512
&mut audio_clock,
513513
&mut audio_seeded,
514-
DEFAULT_AUDIO_FRAME_DURATION_US,
514+
default_audio_frame_duration_us(audio_codec),
515515
&mut audio_first_sent,
516516
ap,
517517
)
@@ -537,7 +537,7 @@ impl ProcessorNode for MoqPushNode {
537537
let dur = if state.is_video {
538538
crate::video::DEFAULT_VIDEO_FRAME_DURATION_US
539539
} else {
540-
DEFAULT_AUDIO_FRAME_DURATION_US
540+
default_audio_frame_duration_us(audio_codec)
541541
};
542542
(
543543
&mut state.clock,

plugins/native/aac-encoder/src/lib.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ pub struct AacEncoderNode {
6161
encoder: shiguredo_fdk_aac::Encoder,
6262
/// Residual f32 samples that didn't fill a complete 1024×2 frame.
6363
residual: Vec<f32>,
64-
/// Running presentation timestamp in microseconds.
65-
timestamp_us: u64,
66-
/// Running sequence counter.
64+
/// Running sequence counter (also used to compute drift-free timestamps).
6765
sequence: u64,
6866
logger: Logger,
6967
}
@@ -100,19 +98,24 @@ impl AacEncoderNode {
10098

10199
/// Send one encoded AAC frame downstream with timing metadata.
102100
fn emit_frame(&mut self, data: &[u8], output: &OutputSender) -> Result<(), String> {
101+
// Compute timestamp from frame count to avoid accumulating truncation
102+
// drift. 1024 samples / 48 000 Hz = 21.333… µs per frame; using
103+
// integer arithmetic: sequence * 1024 * 1_000_000 / 48_000.
104+
let timestamp_us =
105+
(self.sequence as u128 * 1_024 * 1_000_000 / 48_000) as u64;
106+
103107
let packet = Packet::Binary {
104108
data: bytes::Bytes::copy_from_slice(data),
105109
content_type: Some(std::borrow::Cow::Borrowed(AAC_CONTENT_TYPE)),
106110
metadata: Some(PacketMetadata {
107-
timestamp_us: Some(self.timestamp_us),
111+
timestamp_us: Some(timestamp_us),
108112
duration_us: Some(AAC_FRAME_DURATION_US),
109113
sequence: Some(self.sequence),
110114
keyframe: None,
111115
}),
112116
};
113117
output.send("out", &packet)?;
114118

115-
self.timestamp_us += AAC_FRAME_DURATION_US;
116119
self.sequence += 1;
117120
Ok(())
118121
}
@@ -140,6 +143,14 @@ impl NativeProcessorNode for AacEncoderNode {
140143
}),
141144
],
142145
)
146+
// NOTE: The output type is `PacketType::Binary` rather than
147+
// `PacketType::EncodedAudio(Aac)` because the native plugin C ABI
148+
// does not yet have a discriminant for `EncodedAudio`. The
149+
// `BinaryWithMeta` transport preserves `content_type` and metadata
150+
// so downstream nodes that inspect these fields (e.g. MP4 muxer)
151+
// can still identify the codec. MoQ transport nodes, however,
152+
// expect `EncodedAudio` and will reject `Binary` — this is a known
153+
// limitation until `EncodedAudio` support is added to the C ABI.
143154
.output("out", PacketType::Binary)
144155
.param_schema(serde_json::json!({
145156
"type": "object",
@@ -176,7 +187,6 @@ impl NativeProcessorNode for AacEncoderNode {
176187
Ok(Self {
177188
encoder,
178189
residual: Vec::with_capacity(AAC_FRAME_SAMPLES * usize::from(AAC_CHANNELS) * 2),
179-
timestamp_us: 0,
180190
sequence: 0,
181191
logger,
182192
})

plugins/native/kokoro/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/native/matcha/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)