Skip to content

Commit c7f1ddd

Browse files
staging-devin-ai-integration[bot]streamkit-devinstreamer45
authored
feat(transport): add dynamic pin support to moq_peer and moq_push (#200)
* feat(transport): add dynamic pin support to moq_peer and moq_push Generalize MoQ transport nodes to discover and create tracks/pins dynamically from catalogs instead of hardcoding audio+video pairs. moq_peer changes: - Set supports_dynamic_pins() to true - Thread DynamicOutputs (Arc<RwLock<HashMap>>) through the publisher call chain: run -> start_publisher_task_with_permit -> publisher_receive_loop -> watch_catalog_and_process -> spawn_track_processor -> process_publisher_frames -> process_frame_from_group - In watch_catalog_and_process, build track-named dynamic pin names (e.g. audio/data, video/hd) from catalog entries - In process_frame_from_group, send frames to both the dynamic (track-named) output pin and the legacy pin for backward compat - Handle all PinManagementMessage variants in handle_pin_management - Accept both EncodedAudio(Opus) and EncodedVideo(VP9) on both input pins (in/in_1) for flexible media routing moq_push changes: - Set supports_dynamic_pins() to true - Accept both EncodedAudio(Opus) and EncodedVideo(VP9) on both input pins (in/in_1) - Handle dynamic input pin creation via PinManagementMessage, mapping each new pin to a corresponding MoQ track - Add pin management select branch in the run loop Engine changes (dynamic_actor.rs): - In validate_connection_types, skip strict type validation for source pins on nodes that support dynamic pins - In connect_nodes, create output pins on-demand via RequestAddOutputPin -> AddedOutputPin flow when the pin distributor doesn't exist but the node supports dynamic pins All existing pipeline YAML files continue to work unchanged. Legacy out/out_1 and in/in_1 pins remain as stable fallbacks. Refs: #197 Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): address review feedback on dynamic pin support - Poll dynamic input receivers in moq_push select loop using poll_fn - Determine is_video from pin name prefix convention instead of accepts_types - Forward dynamic input pin packets in moq_peer instead of dropping channel - Use DynamicInputState struct instead of tuple for type clarity Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * test(transport): add regression tests for dynamic pin fixes - Test make_dynamic_output_pin produces correct types for video/audio/bare names - Test AddedInputPin channel is not dropped (regression for channel discard bug) - Test is_video determination uses pin name prefix convention - Test track name derivation from pin names Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): fix double-prefixed pin names and shutdown cleanup - Use catalog track names directly (already prefixed) instead of re-prefixing with audio/ or video/, which caused double-prefixed names like 'audio/audio/data' - Finish dynamic input track producers on MoqPushNode shutdown - Add regression test for double-prefix bug Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): finish track producers on remove, add stats to dynamic input forwarding - RemoveInputPin now calls finish() on track producers before dropping - Dynamic input forwarding tasks in moq_peer report received/sent stats via stats_delta_tx, matching the static pin handler pattern Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(transport): remove legacy pin names, use track-named pins exclusively BREAKING CHANGE: moq_peer output pins renamed from out/out_1 to audio/data and video/data to match catalog track names. Removes audio_output_pin/video_output_pin parameters from the entire publisher call chain (start_publisher_task_with_permit, publisher_receive_loop, watch_catalog_and_process, spawn_track_processor, process_publisher_frames, process_frame_from_group). Unifies output_pin and dynamic_pin_name into a single track-name-based output pin. Updates all sample pipeline YAML files to reference the new pin names. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: update remaining out_1 references in samples, e2e fixtures, tests, and docs Updates missed references to the old moq_peer out/out_1 pin names: - samples/pipelines/dynamic/video_moq_webcam_pip.yml - samples/pipelines/dynamic/video_moq_screen_share.yml - e2e/fixtures/webcam-pip.yaml - e2e/fixtures/webcam-pip-cropped.yaml - e2e/fixtures/webcam-pip-circle.yaml - crates/api/src/yaml.rs (parser tests) - docs/src/content/docs/guides/creating-pipelines.md Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(transport): address all 9 review items on dynamic pin support Critical fixes: - Exclusive routing: dynamic channel OR static output, never both (fix #1) - RwLock poison logged as error instead of silently swallowed (fix #2) Improvements: - Spawned input-forwarding task uses tokio::select! with shutdown_rx (fix #3) - validate_connection_types logs at warn for dynamic pin skip (fix #4) - Document poll_fn starvation bias as accepted trade-off (fix #5) - Remove unused channels parameter from handle_pin_management (fix #6) Nits: - Update DynamicOutputs doc comment, remove stale legacy reference (fix #7) - Use Arc short form (already imported) (fix #8) - Improve test to exercise MoqPeerNode::new + output_pins + make_dynamic_output_pin (fix #9) Also refactored handle_pin_management and process_frame_from_group to reduce cognitive complexity below the 50-point lint threshold by extracting route_packet, spawn_dynamic_input_forwarder, insert_dynamic_output, remove_dynamic_output, and make_dynamic_input_pin helper methods. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): eliminate TOCTOU race in route_packet Hold a single read lock for both the existence check and the send in route_packet, preventing a concurrent RemoveOutputPin from removing the entry between two separate lock acquisitions which would silently drop the packet. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): handle closed dynamic output channels in route_packet Distinguish try_send results: Ok and Full return true (packet sent or acceptable frame drop for real-time media), Closed returns false to trigger shutdown — matching the static output path behaviour. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): keep track processor alive on closed dynamic channel A closed dynamic output channel (downstream consumer disconnected) now removes the stale entry and continues instead of triggering FrameResult::Shutdown. This prevents a single consumer disconnect from killing the entire track processor. Also extract track_name_from_pin() and is_video_pin() into named functions in push.rs so tests exercise the real production code instead of duplicating the logic inline. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): keep dynamic input forwarder alive when no subscribers Match the static input path behaviour: discard frames with `let _ = tx.send(frame)` instead of breaking out of the loop when there are no active broadcast receivers. This prevents the dynamic input forwarder from permanently shutting down between subscriber connections. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): address review round 3 — catalog republish, single-lock route_packet, cleanup - Re-publish MoQ catalog when dynamic tracks are added/removed (push.rs) - Merge route_packet double RwLock acquisition into single lock with RouteOutcome enum - Add design rationale comment on std::sync::RwLock choice for DynamicOutputs - Extract moq_accepted_media_types() helper, deduplicate across peer/mod.rs and push.rs - Change dynamic pin validation log from warn to debug (dynamic_actor.rs) - Use Arc::default() consistently for DynamicOutputs construction - Update moq_peer.yml comment to mention video/data output pin - Remove unused type imports from push.rs Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(transport): downgrade catalog republish log to debug Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: address round 4 review — packet drops, forwarder lifecycle, type validation - route_packet: match on TrySendError::Full/Closed via RouteOutcome enum, log dropped packets at debug level instead of silently discarding - Store JoinHandle for each dynamic input forwarder in a HashMap; abort on RemoveInputPin to prevent task leaks - After dynamic output pin creation in connect_nodes, validate type compatibility using can_connect_any before wiring - republish_catalog returns bool; on failure roll back catalog entry and skip adding DynamicInputState - Use swap_remove instead of remove for O(1) dynamic_inputs removal - Consistent lock-poisoning recovery via unwrap_or_else(PoisonError::into_inner) - Align default dynamic pin names (in_dyn → dynamic_in) - Extract activate_dynamic_input, insert/remove_catalog_rendition helpers to stay within cognitive_complexity limit Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: clean up stale resources on dynamic pin creation failures - Type-mismatch early return in connect_nodes now removes the orphaned PinDistributor entry and stale pin metadata before returning - AddedOutputPin send failure path gets the same cleanup - Document that validate_connection_types skips dest-pin validation too when source node supports dynamic pins (known limitation) - RemoveInputPin in push.rs uses swap_remove instead of drain+collect - Prune finished forwarder JoinHandles on AddedInputPin to prevent unbounded growth from naturally-closed channels - Add safety comment about poll_fn/select! mutable borrow interaction - Deduplicate output_pins() by reusing make_dynamic_output_pin Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: finish leaked producers, shut down orphaned distributors, cleanup nits - activate_dynamic_input: finish track producer before returning on catalog republish failure to avoid dangling broadcast track - connect_nodes: send PinConfigMsg::Shutdown to the spawned PinDistributor on both type-mismatch and AddedOutputPin send failure error paths, preventing orphaned actor tasks - Abort all forwarder JoinHandles on node shutdown for deterministic cleanup instead of relying on channel close propagation - Remove redundant 'let mut catalog_producer = catalog_producer' rebinding - Downgrade subscriber_count atomics from SeqCst to Relaxed (only used for logging, no cross-variable synchronization needed) Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: rollback leaked input pins, guard duplicates, timeout pin creation - Add rollback_dynamic_input helper to clean up destination input pins when step-2 (output pin creation) fails in connect_nodes - Track created_dynamic_input to conditionally rollback on all 6 step-2 failure paths (type mismatch, send failures, timeouts) - Wrap RequestAddInputPin and RequestAddOutputPin responses with tokio::time::timeout(5s) to prevent engine deadlock - Guard duplicate dynamic input pin names in push.rs with check-and-replace via swap_remove - Abort old forwarder handle on re-add collision in peer/mod.rs - Extract activate_dynamic_input_forwarder to reduce cognitive complexity - Bump stale dynamic output entry log from debug to info - Make original catalog binding mut, remove redundant rebind - Align moq_accepted_media_types() import qualification - Shut down orphaned PinDistributor actors on type-mismatch and AddedOutputPin send failure paths Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: assert pin.name == from_pin invariant on dynamic output creation Add debug_assert_eq! after receiving the pin definition from RequestAddOutputPin to make the implicit contract explicit: the node must return the suggested name unchanged. Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> --------- Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com>
1 parent d739172 commit c7f1ddd

21 files changed

+1147
-161
lines changed

crates/api/src/yaml.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,7 +1145,8 @@ nodes:
11451145
output_broadcast: output
11461146
ogg_muxer:
11471147
kind: containers::ogg::muxer
1148-
needs: moq_peer
1148+
needs:
1149+
in: moq_peer.audio/data
11491150
file_writer:
11501151
kind: core::file_writer
11511152
params:
@@ -1176,7 +1177,8 @@ mode: dynamic
11761177
nodes:
11771178
decoder:
11781179
kind: audio::opus::decoder
1179-
needs: moq_peer
1180+
needs:
1181+
in: moq_peer.audio/data
11801182
encoder:
11811183
kind: audio::opus::encoder
11821184
needs: mixer

crates/engine/src/dynamic_actor.rs

Lines changed: 236 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -667,8 +667,26 @@ impl DynamicEngine {
667667
.output_pins
668668
.iter()
669669
.find(|p| p.name == from_pin)
670-
.or_else(|| match_dynamic_output_pin(&source_metadata.output_pins, from_pin))
671-
.ok_or_else(|| format!("Source pin '{from_pin}' not found on node '{from_node}'"))?;
670+
.or_else(|| match_dynamic_output_pin(&source_metadata.output_pins, from_pin));
671+
let Some(source_pin) = source_pin else {
672+
// If the source pin is not found but the node supports dynamic pins,
673+
// allow the connection — the output pin will be created on-demand in
674+
// connect_nodes via RequestAddOutputPin.
675+
//
676+
// NOTE: this skips destination-pin validation too. When both nodes
677+
// support dynamic pins and neither pin exists yet, no compile-time
678+
// type checking occurs — mismatches will only surface at runtime
679+
// (or via the post-creation check in connect_nodes).
680+
if self.pin_management_txs.contains_key(from_node) {
681+
tracing::debug!(
682+
"Source pin {}.{} not in metadata, but node supports dynamic pins; skipping strict type validation",
683+
from_node,
684+
from_pin
685+
);
686+
return Ok(());
687+
}
688+
return Err(format!("Source pin '{from_pin}' not found on node '{from_node}'"));
689+
};
672690

673691
// Find destination input pin (exact match or dynamic pin family template).
674692
//
@@ -777,6 +795,9 @@ impl DynamicEngine {
777795

778796
// 1. Find the destination input Sender
779797
// If the pin doesn't exist and the node supports dynamic pins, create it first
798+
// Track whether we dynamically created the input pin so we can roll it
799+
// back if step 2 (output pin creation) fails.
800+
let mut created_dynamic_input: Option<String> = None;
780801
let dest_tx = if let Some(tx) = self.node_inputs.get(&(to_node.clone(), to_pin.clone())) {
781802
tx.clone()
782803
} else if let Some(pin_mgmt_tx) = self.pin_management_txs.get(&to_node) {
@@ -802,17 +823,31 @@ impl DynamicEngine {
802823
return;
803824
}
804825

805-
// Wait for the pin to be created
806-
let pin = match response_rx.await {
807-
Ok(Ok(pin)) => pin,
808-
Ok(Err(e)) => {
809-
tracing::error!("Node '{}' rejected pin creation: {}", to_node, e);
810-
return;
811-
},
812-
Err(_) => {
813-
tracing::error!("Node '{}' did not respond to pin creation request", to_node);
814-
return;
815-
},
826+
// Wait for the pin to be created (with timeout to avoid blocking
827+
// the engine indefinitely if the node is unresponsive).
828+
let pin = if let Ok(inner) =
829+
tokio::time::timeout(std::time::Duration::from_secs(5), response_rx).await
830+
{
831+
match inner {
832+
Ok(Ok(pin)) => pin,
833+
Ok(Err(e)) => {
834+
tracing::error!("Node '{}' rejected pin creation: {}", to_node, e);
835+
return;
836+
},
837+
Err(_) => {
838+
tracing::error!(
839+
"Node '{}' did not respond to pin creation request",
840+
to_node
841+
);
842+
return;
843+
},
844+
}
845+
} else {
846+
tracing::error!(
847+
"Timed out waiting for input pin creation response from node '{}'",
848+
to_node
849+
);
850+
return;
816851
};
817852

818853
// Create the channel for this new pin
@@ -841,6 +876,7 @@ impl DynamicEngine {
841876
return;
842877
}
843878

879+
created_dynamic_input = Some(pin.name.clone());
844880
tx
845881
} else {
846882
tracing::error!(
@@ -852,14 +888,177 @@ impl DynamicEngine {
852888
};
853889

854890
// 2. Find the source Pin Distributor configuration Sender
855-
// Use let...else for cleaner early return pattern
856-
let Some(config_tx) = self.pin_distributors.get(&(from_node.clone(), from_pin.clone()))
857-
else {
858-
tracing::error!(
859-
"Cannot connect: Source output '{}.{}' distributor not found.",
891+
// If the pin doesn't exist and the node supports dynamic pins, create it first
892+
let config_tx = if let Some(tx) =
893+
self.pin_distributors.get(&(from_node.clone(), from_pin.clone()))
894+
{
895+
tx.clone()
896+
} else if let Some(pin_mgmt_tx) = self.pin_management_txs.get(&from_node) {
897+
// Node supports dynamic pins — create the output pin on-demand
898+
tracing::info!(
899+
"Dynamically creating output pin '{}.{}' for connection",
860900
from_node,
861901
from_pin
862902
);
903+
904+
// Request pin creation from the node
905+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
906+
let msg = streamkit_core::pins::PinManagementMessage::RequestAddOutputPin {
907+
suggested_name: Some(from_pin.clone()),
908+
response_tx,
909+
};
910+
911+
if pin_mgmt_tx.send(msg).await.is_err() {
912+
tracing::error!(
913+
"Failed to send output pin creation request to node '{}'. It may have stopped.",
914+
from_node
915+
);
916+
if let Some(ref input_pin) = created_dynamic_input {
917+
self.rollback_dynamic_input(&to_node, input_pin).await;
918+
}
919+
return;
920+
}
921+
922+
// Wait for the node to respond with the pin definition (with
923+
// timeout to avoid blocking the engine indefinitely).
924+
let pin = if let Ok(inner) =
925+
tokio::time::timeout(std::time::Duration::from_secs(5), response_rx).await
926+
{
927+
match inner {
928+
Ok(Ok(pin)) => pin,
929+
Ok(Err(e)) => {
930+
tracing::error!("Node '{}' rejected output pin creation: {}", from_node, e);
931+
if let Some(ref input_pin) = created_dynamic_input {
932+
self.rollback_dynamic_input(&to_node, input_pin).await;
933+
}
934+
return;
935+
},
936+
Err(_) => {
937+
tracing::error!(
938+
"Node '{}' did not respond to output pin creation request",
939+
from_node
940+
);
941+
if let Some(ref input_pin) = created_dynamic_input {
942+
self.rollback_dynamic_input(&to_node, input_pin).await;
943+
}
944+
return;
945+
},
946+
}
947+
} else {
948+
tracing::error!(
949+
"Timed out waiting for output pin creation response from node '{}'",
950+
from_node
951+
);
952+
if let Some(ref input_pin) = created_dynamic_input {
953+
self.rollback_dynamic_input(&to_node, input_pin).await;
954+
}
955+
return;
956+
};
957+
958+
// The engine uses `from_pin` as the connection key while the
959+
// distributor is stored under `pin.name`. These must match;
960+
// a divergence would cause disconnect_nodes to miss the entry.
961+
debug_assert_eq!(
962+
pin.name, from_pin,
963+
"Node returned pin name '{}' but engine expected '{}'",
964+
pin.name, from_pin
965+
);
966+
967+
// Create channels for the PinDistributor
968+
let (data_tx, data_rx) = mpsc::channel(self.pin_distributor_capacity);
969+
let (cfg_tx, cfg_rx) = mpsc::channel(CONTROL_CAPACITY);
970+
971+
// Spawn the PinDistributorActor
972+
let distributor =
973+
PinDistributorActor::new(data_rx, cfg_rx, from_node.clone(), pin.name.clone());
974+
tokio::spawn(distributor.run());
975+
976+
// Store the configuration sender in the engine state
977+
self.pin_distributors.insert((from_node.clone(), pin.name.clone()), cfg_tx.clone());
978+
979+
// Update pin metadata so future validations can resolve this pin by name
980+
let meta = self.node_pin_metadata.entry(from_node.clone()).or_insert_with(|| {
981+
NodePinMetadata { input_pins: Vec::new(), output_pins: Vec::new() }
982+
});
983+
if !meta.output_pins.iter().any(|p| p.name == pin.name) {
984+
meta.output_pins.push(pin.clone());
985+
}
986+
987+
// Now that we have the concrete pin definition, validate type
988+
// compatibility against the destination. This catches YAML typos
989+
// like `moq_peer.nonexistent/garbage` that were previously allowed
990+
// through the early-return in validate_connection_types.
991+
if let Some(dest_meta) = self.node_pin_metadata.get(&to_node) {
992+
let dest_pin_def = dest_meta.input_pins.iter().find(|p| p.name == to_pin);
993+
if let Some(dest_pin_def) = dest_pin_def {
994+
let registry = streamkit_core::packet_meta::packet_type_registry();
995+
if !streamkit_core::packet_meta::can_connect_any(
996+
&pin.produces_type,
997+
&dest_pin_def.accepts_types,
998+
registry,
999+
) {
1000+
tracing::error!(
1001+
"Type mismatch after dynamic pin creation: {}.{} produces {:?}, but {}.{} accepts {:?}",
1002+
from_node, pin.name, pin.produces_type,
1003+
to_node, to_pin, dest_pin_def.accepts_types
1004+
);
1005+
// Clean up the distributor actor and metadata that were
1006+
// just created — leaving them would leak an orphaned
1007+
// task and stale metadata for the session.
1008+
if let Some(cfg) =
1009+
self.pin_distributors.remove(&(from_node.clone(), pin.name.clone()))
1010+
{
1011+
let _ = cfg.send(PinConfigMsg::Shutdown).await;
1012+
}
1013+
if let Some(meta) = self.node_pin_metadata.get_mut(&from_node) {
1014+
meta.output_pins.retain(|p| p.name != pin.name);
1015+
}
1016+
if let Some(ref input_pin) = created_dynamic_input {
1017+
self.rollback_dynamic_input(&to_node, input_pin).await;
1018+
}
1019+
return;
1020+
}
1021+
}
1022+
}
1023+
1024+
// Notify the node that the output pin is ready with its channel
1025+
let pin_name_for_cleanup = pin.name.clone();
1026+
let added_msg = streamkit_core::pins::PinManagementMessage::AddedOutputPin {
1027+
pin,
1028+
channel: data_tx,
1029+
};
1030+
1031+
if pin_mgmt_tx.send(added_msg).await.is_err() {
1032+
tracing::error!(
1033+
"Failed to send output pin activation to node '{}'. It may have stopped.",
1034+
from_node
1035+
);
1036+
// Clean up the distributor and metadata — the node never
1037+
// received AddedOutputPin so nothing will produce into this pin.
1038+
if let Some(cfg) =
1039+
self.pin_distributors.remove(&(from_node.clone(), pin_name_for_cleanup.clone()))
1040+
{
1041+
let _ = cfg.send(PinConfigMsg::Shutdown).await;
1042+
}
1043+
if let Some(meta) = self.node_pin_metadata.get_mut(&from_node) {
1044+
meta.output_pins.retain(|p| p.name != pin_name_for_cleanup);
1045+
}
1046+
if let Some(ref input_pin) = created_dynamic_input {
1047+
self.rollback_dynamic_input(&to_node, input_pin).await;
1048+
}
1049+
return;
1050+
}
1051+
1052+
cfg_tx
1053+
} else {
1054+
tracing::error!(
1055+
"Cannot connect: Source output '{}.{}' distributor not found and node doesn't support dynamic pins.",
1056+
from_node,
1057+
from_pin
1058+
);
1059+
if let Some(ref input_pin) = created_dynamic_input {
1060+
self.rollback_dynamic_input(&to_node, input_pin).await;
1061+
}
8631062
return;
8641063
};
8651064

@@ -881,6 +1080,25 @@ impl DynamicEngine {
8811080
}
8821081
}
8831082

1083+
/// Roll back a dynamically created input pin when a subsequent step in
1084+
/// `connect_nodes` fails. Removes the pin's channel from `node_inputs`,
1085+
/// prunes the metadata entry, and notifies the destination node via
1086+
/// `RemoveInputPin` so it can clean up its internal state (e.g. drop a
1087+
/// `DynamicInputState` in `MoqPushNode` or abort a forwarder task in
1088+
/// `MoqPeerNode`).
1089+
async fn rollback_dynamic_input(&mut self, node_id: &str, pin_name: &str) {
1090+
self.node_inputs.remove(&(node_id.to_string(), pin_name.to_string()));
1091+
if let Some(meta) = self.node_pin_metadata.get_mut(node_id) {
1092+
meta.input_pins.retain(|p| p.name != pin_name);
1093+
}
1094+
if let Some(pin_mgmt_tx) = self.pin_management_txs.get(node_id) {
1095+
let msg = streamkit_core::pins::PinManagementMessage::RemoveInputPin {
1096+
pin_name: pin_name.to_string(),
1097+
};
1098+
let _ = pin_mgmt_tx.send(msg).await;
1099+
}
1100+
}
1101+
8841102
/// Helper function to disconnect nodes.
8851103
///
8861104
/// Takes `&self` not `&mut self` because it only reads from HashMaps and sends messages

crates/nodes/src/transport/moq/constants.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,32 @@
44

55
//! Shared constants for MoQ transport nodes
66
7-
use streamkit_core::types::PacketMetadata;
7+
use streamkit_core::types::{
8+
AudioCodec, EncodedAudioFormat, EncodedVideoFormat, PacketMetadata, PacketType, VideoCodec,
9+
};
810

911
pub const DEFAULT_AUDIO_FRAME_DURATION_US: u64 = 20_000;
1012

1113
pub fn packet_duration_us(metadata: Option<&PacketMetadata>) -> Option<u64> {
1214
metadata.and_then(|m| m.duration_us).filter(|d| *d > 0)
1315
}
16+
17+
/// Return the accepted media types for dynamic MoQ pins (Opus audio + VP9 video).
18+
///
19+
/// This is shared across `moq_peer` and `moq_push` to avoid duplicating the
20+
/// type construction in every `RequestAddInputPin` / `RequestAddOutputPin` handler.
21+
pub fn moq_accepted_media_types() -> Vec<PacketType> {
22+
vec![
23+
PacketType::EncodedAudio(EncodedAudioFormat {
24+
codec: AudioCodec::Opus,
25+
codec_private: None,
26+
}),
27+
PacketType::EncodedVideo(EncodedVideoFormat {
28+
codec: VideoCodec::Vp9,
29+
bitstream_format: None,
30+
codec_private: None,
31+
profile: None,
32+
level: None,
33+
}),
34+
]
35+
}

0 commit comments

Comments
 (0)