Skip to content

Commit 28081eb

Browse files
fix(compositor): handle non-video packets and single channel close in recv_from_any_slot
Introduces SlotRecvResult enum with Frame/ChannelClosed/NonVideo/Empty variants. The main loop now removes closed slots and skips non-video packets instead of treating any single channel close as all-inputs-closed. Also adds a comment about dropped in-flight results on shutdown (Fix #6). Optimizes overlay cloning by using Arc<[Arc<DecodedOverlay>]> instead of Vec<Arc<DecodedOverlay>> so cloning into the work item each frame is a single ref-count bump instead of a full Vec clone (Fix #8). Fixes: #1, #6, #8 Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
1 parent 1c1f49c commit 28081eb

File tree

2 files changed

+80
-30
lines changed

2 files changed

+80
-30
lines changed

crates/nodes/src/video/compositor/kernel.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@ pub struct CompositeWorkItem {
3232
pub canvas_w: u32,
3333
pub canvas_h: u32,
3434
pub layers: Vec<Option<LayerSnapshot>>,
35-
pub image_overlays: Vec<Arc<DecodedOverlay>>,
36-
pub text_overlays: Vec<Arc<DecodedOverlay>>,
35+
/// Shared, immutable overlay lists. Using `Arc<[…]>` means cloning
36+
/// into the work item each frame is a single ref-count bump instead
37+
/// of cloning the entire `Vec`.
38+
pub image_overlays: Arc<[Arc<DecodedOverlay>]>,
39+
pub text_overlays: Arc<[Arc<DecodedOverlay>]>,
3740
pub video_pool: Option<Arc<streamkit_core::VideoFramePool>>,
3841
pub output_format: PixelFormat,
3942
}

crates/nodes/src/video/compositor/mod.rs

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl ProcessorNode for CompositorNode {
203203

204204
// Decode image overlays (once). Wrap in Arc so per-frame clones
205205
// into the work item are cheap reference-count bumps.
206-
let mut image_overlays: Vec<Arc<DecodedOverlay>> =
206+
let mut image_overlays_vec: Vec<Arc<DecodedOverlay>> =
207207
Vec::with_capacity(self.config.image_overlays.len());
208208
for (i, img_cfg) in self.config.image_overlays.iter().enumerate() {
209209
match decode_image_overlay(img_cfg) {
@@ -218,7 +218,7 @@ impl ProcessorNode for CompositorNode {
218218
overlay.rect.width,
219219
overlay.rect.height,
220220
);
221-
image_overlays.push(Arc::new(overlay));
221+
image_overlays_vec.push(Arc::new(overlay));
222222
},
223223
Err(e) => {
224224
tracing::warn!("Failed to decode image overlay {}: {}", i, e);
@@ -227,12 +227,17 @@ impl ProcessorNode for CompositorNode {
227227
}
228228

229229
// Rasterize text overlays (once; re-done on UpdateParams). Also Arc-wrapped.
230-
let mut text_overlays: Vec<Arc<DecodedOverlay>> =
230+
let mut text_overlays_vec: Vec<Arc<DecodedOverlay>> =
231231
Vec::with_capacity(self.config.text_overlays.len());
232232
for txt_cfg in &self.config.text_overlays {
233-
text_overlays.push(Arc::new(rasterize_text_overlay(txt_cfg)));
233+
text_overlays_vec.push(Arc::new(rasterize_text_overlay(txt_cfg)));
234234
}
235235

236+
// Wrap in Arc<[...]> so per-frame clones into the work item are
237+
// a single ref-count bump instead of cloning the entire Vec.
238+
let mut image_overlays: Arc<[Arc<DecodedOverlay>]> = Arc::from(image_overlays_vec);
239+
let mut text_overlays: Arc<[Arc<DecodedOverlay>]> = Arc::from(text_overlays_vec);
240+
236241
// Collect initial input slots from pre-connected pins.
237242
let mut slots: Vec<InputSlot> = Vec::new();
238243
for pin_name in context.inputs.keys() {
@@ -426,13 +431,34 @@ impl ProcessorNode for CompositorNode {
426431

427432
// Wait for a frame from any connected input.
428433
result = recv_from_any_slot(&mut slots) => {
429-
if let Some((slot_idx, frame)) = result {
430-
slots[slot_idx].latest_frame = Some(frame);
431-
received_frame = true;
432-
} else {
433-
// All inputs closed.
434-
stop_reason = "all_inputs_closed";
435-
should_break = true;
434+
match result {
435+
SlotRecvResult::Frame(slot_idx, frame) => {
436+
slots[slot_idx].latest_frame = Some(frame);
437+
received_frame = true;
438+
}
439+
SlotRecvResult::ChannelClosed(slot_idx) => {
440+
tracing::info!(
441+
"CompositorNode: input '{}' closed",
442+
slots[slot_idx].name
443+
);
444+
slots.remove(slot_idx);
445+
if slots.is_empty() {
446+
stop_reason = "all_inputs_closed";
447+
should_break = true;
448+
}
449+
// Otherwise continue — remaining slots are still active.
450+
}
451+
SlotRecvResult::NonVideo(slot_idx) => {
452+
tracing::debug!(
453+
"CompositorNode: ignoring non-video packet on '{}'",
454+
slots[slot_idx].name
455+
);
456+
// Skip and continue waiting.
457+
}
458+
SlotRecvResult::Empty => {
459+
stop_reason = "all_inputs_closed";
460+
should_break = true;
461+
}
436462
}
437463
}
438464
}
@@ -620,6 +646,10 @@ impl ProcessorNode for CompositorNode {
620646
}
621647

622648
// Drop the work sender to signal the compositing thread to exit.
649+
// NOTE: Any composite result currently in-flight (sent to the thread
650+
// but not yet received back via result_rx) will be lost here. This is
651+
// acceptable for shutdown semantics — we prefer a fast exit over
652+
// draining one extra frame that may never be forwarded downstream.
623653
drop(work_tx);
624654
let _ = composite_thread.await;
625655

@@ -634,8 +664,8 @@ impl ProcessorNode for CompositorNode {
634664
impl CompositorNode {
635665
fn apply_update_params(
636666
config: &mut CompositorConfig,
637-
image_overlays: &mut Vec<Arc<DecodedOverlay>>,
638-
text_overlays: &mut Vec<Arc<DecodedOverlay>>,
667+
image_overlays: &mut Arc<[Arc<DecodedOverlay>]>,
668+
text_overlays: &mut Arc<[Arc<DecodedOverlay>]>,
639669
params: serde_json::Value,
640670
stats_tracker: &mut NodeStatsTracker,
641671
) {
@@ -652,19 +682,22 @@ impl CompositorNode {
652682

653683
// Always re-decode image overlays (content may have changed
654684
// even if the count is the same).
655-
image_overlays.clear();
685+
let mut new_image_overlays = Vec::with_capacity(new_config.image_overlays.len());
656686
for img_cfg in &new_config.image_overlays {
657687
match decode_image_overlay(img_cfg) {
658-
Ok(ov) => image_overlays.push(Arc::new(ov)),
688+
Ok(ov) => new_image_overlays.push(Arc::new(ov)),
659689
Err(e) => tracing::warn!("Image overlay decode failed: {e}"),
660690
}
661691
}
692+
*image_overlays = Arc::from(new_image_overlays);
662693

663694
// Re-rasterize text overlays.
664-
text_overlays.clear();
665-
for txt_cfg in &new_config.text_overlays {
666-
text_overlays.push(Arc::new(rasterize_text_overlay(txt_cfg)));
667-
}
695+
let new_text_overlays: Vec<Arc<DecodedOverlay>> = new_config
696+
.text_overlays
697+
.iter()
698+
.map(|txt_cfg| Arc::new(rasterize_text_overlay(txt_cfg)))
699+
.collect();
700+
*text_overlays = Arc::from(new_text_overlays);
668701

669702
*config = new_config;
670703
},
@@ -712,13 +745,26 @@ impl CompositorNode {
712745

713746
// ── Frame receive helper ────────────────────────────────────────────────────
714747

715-
/// Wait for a video frame from any of the input slots. Returns the slot index
716-
/// and the received frame, or `None` if all channels are closed.
748+
/// Result of waiting for a frame from input slots.
749+
enum SlotRecvResult {
750+
/// A video frame was received from the slot at the given index.
751+
Frame(usize, VideoFrame),
752+
/// The channel at the given index was closed.
753+
ChannelClosed(usize),
754+
/// A non-video packet was received (and discarded) from the given index.
755+
NonVideo(usize),
756+
/// All slots are empty (should not happen if caller checks).
757+
Empty,
758+
}
759+
717760
type SlotRecvFut<'a> = Pin<Box<dyn futures::Future<Output = (usize, Option<Packet>)> + Send + 'a>>;
718761

719-
async fn recv_from_any_slot(slots: &mut [InputSlot]) -> Option<(usize, VideoFrame)> {
762+
/// Wait for a packet from any of the input slots. Returns a typed result so
763+
/// the caller can distinguish between a received video frame, a closed channel,
764+
/// and a non-video packet (which should be skipped, not treated as closure).
765+
async fn recv_from_any_slot(slots: &mut [InputSlot]) -> SlotRecvResult {
720766
if slots.is_empty() {
721-
return None;
767+
return SlotRecvResult::Empty;
722768
}
723769

724770
// Use futures to poll all receivers concurrently.
@@ -735,16 +781,17 @@ async fn recv_from_any_slot(slots: &mut [InputSlot]) -> Option<(usize, VideoFram
735781
.collect();
736782

737783
if futs.is_empty() {
738-
return None;
784+
return SlotRecvResult::Empty;
739785
}
740786

741787
let (result, _idx, _remaining) = select_all(futs).await;
742788
let (slot_idx, maybe_packet) = result;
743789

744-
maybe_packet.and_then(|pkt| match pkt {
745-
Packet::Video(frame) => Some((slot_idx, frame)),
746-
_ => None,
747-
})
790+
match maybe_packet {
791+
Some(Packet::Video(frame)) => SlotRecvResult::Frame(slot_idx, frame),
792+
Some(_) => SlotRecvResult::NonVideo(slot_idx),
793+
None => SlotRecvResult::ChannelClosed(slot_idx),
794+
}
748795
}
749796

750797
// ── Registration ────────────────────────────────────────────────────────────

0 commit comments

Comments
 (0)