diff --git a/crates/plugin-native/src/wrapper.rs b/crates/plugin-native/src/wrapper.rs index 9d37a861..1d1f20d0 100644 --- a/crates/plugin-native/src/wrapper.rs +++ b/crates/plugin-native/src/wrapper.rs @@ -17,12 +17,15 @@ use streamkit_core::control::NodeControlMessage; use streamkit_core::telemetry::TelemetryEvent; use streamkit_core::types::Packet; use streamkit_core::{ - InputPin, NodeContext, NodeState, NodeStateUpdate, OutputPin, ProcessorNode, StopReason, - StreamKitError, + AudioFramePool, InputPin, NodeContext, NodeState, NodeStateUpdate, OutputPin, ProcessorNode, + StopReason, StreamKitError, VideoFramePool, }; use streamkit_plugin_sdk_native::{ conversions, - types::{CNativePluginAPI, CPacket, CPluginHandle, CResult}, + types::{ + CAllocAudioResult, CAllocVideoResult, CNativePluginAPI, CNodeCallbacks, CPacket, + CPluginHandle, CResult, + }, }; use tracing::{error, info, warn}; @@ -282,6 +285,8 @@ impl NativeNodeWrapper { let (merged_tx, mut merged_rx) = tokio::sync::mpsc::channel::<(usize, Packet)>(context.batch_size.max(1)); let cancellation_token = context.cancellation_token.clone(); + let video_pool = context.video_pool.clone(); + let audio_pool = context.audio_pool.clone(); for (pin_name, mut rx) in inputs.drain() { let pin_cstr = CString::new(pin_name.as_str()).map_err(|e| { @@ -432,18 +437,18 @@ impl NativeNodeWrapper { telemetry_tx, session_id, node_id, + video_pool, + audio_pool, }; let callback_data = (&raw mut callback_ctx).cast::(); + let node_callbacks = build_node_callbacks(callback_data); // Call plugin's flush function tracing::info!("Calling api.flush()"); let result = (api.flush)( handle, - output_callback_shim, - callback_data, - Some(telemetry_callback_shim), - callback_data, + &raw const node_callbacks, ); tracing::info!(success = result.success, "Flush returned"); @@ -488,6 +493,8 @@ impl NativeNodeWrapper { let session_id = context.session_id.clone(); let node_id = node_name.clone(); let pin_cstr = Arc::clone(&input_pin_cstrs[pin_index]); + let video_pool = video_pool.clone(); + let audio_pool = audio_pool.clone(); let (outputs, error) = tokio::task::spawn_blocking(move || { let Some(handle) = state.begin_call() else { return (Vec::new(), None); @@ -505,19 +512,19 @@ impl NativeNodeWrapper { telemetry_tx, session_id, node_id, + video_pool, + audio_pool, }; let callback_data = (&raw mut callback_ctx).cast::(); + let node_callbacks = build_node_callbacks(callback_data); // Call plugin's process function (BLOCKING - but we're in spawn_blocking) let result = (api.process_packet)( handle, pin_cstr.as_ptr(), &raw const packet_repr.packet, - output_callback_shim, - callback_data, - Some(telemetry_callback_shim), - callback_data, + &raw const node_callbacks, ); // Check for errors @@ -612,6 +619,8 @@ impl NativeNodeWrapper { } let node_name = context.output_sender.node_name().to_string(); + let video_pool = context.video_pool.clone(); + let audio_pool = context.audio_pool.clone(); tracing::info!(node = %node_name, "Native source plugin wrapper starting"); @@ -845,6 +854,8 @@ impl NativeNodeWrapper { let telemetry_tx = context.telemetry_tx.clone(); let session_id = context.session_id.clone(); let node_id = node_name.clone(); + let video_pool = video_pool.clone(); + let audio_pool = audio_pool.clone(); let outcome = tokio::task::spawn_blocking(move || { let Some(handle) = state.begin_call() else { return TickOutcome { @@ -863,17 +874,14 @@ impl NativeNodeWrapper { telemetry_tx, session_id, node_id, + video_pool, + audio_pool, }; let callback_data = (&raw mut callback_ctx).cast::(); + let node_callbacks = build_node_callbacks(callback_data); - let result = tick_fn( - handle, - output_callback_shim, - callback_data, - Some(telemetry_callback_shim), - callback_data, - ); + let result = tick_fn(handle, &raw const node_callbacks); // Extract error string while pointers are still valid. let error_msg = if result.result.success { @@ -1032,6 +1040,43 @@ struct CallbackContext { telemetry_tx: Option>, session_id: Option, node_id: String, + video_pool: Option>, + audio_pool: Option>, +} + +/// Free any pool-allocated `buffer_handle` embedded in a raw [`CPacket`]. +/// +/// This is the safety net for error paths in [`output_callback_shim`]: if +/// `packet_from_c` is never called (e.g. invalid pin name) or if it fails +/// before reclaiming the handle, the pooled buffer would leak because the +/// SDK already marked it as consumed (suppressing `Drop`). +/// +/// # Safety +/// +/// `c_packet` must be a valid, non-null pointer to a [`CPacket`]. +unsafe fn free_packet_buffer_handle(c_packet: *const CPacket) { + use streamkit_core::frame_pool::{PooledSamples, PooledVideoData}; + use streamkit_plugin_sdk_native::types::CPacketType; + + let pkt = &*c_packet; + if pkt.data.is_null() { + return; + } + match pkt.packet_type { + CPacketType::RawVideo => { + let frame = &*pkt.data.cast::(); + if !frame.buffer_handle.is_null() { + drop(Box::from_raw(frame.buffer_handle.cast::())); + } + }, + CPacketType::RawAudio => { + let frame = &*pkt.data.cast::(); + if !frame.buffer_handle.is_null() { + drop(Box::from_raw(frame.buffer_handle.cast::())); + } + }, + _ => {}, + } } /// C callback function for sending output packets @@ -1054,6 +1099,8 @@ extern "C" fn output_callback_shim( Ok(s) => s, Err(e) => { ctx.error = Some(format!("Invalid pin name: {e}")); + // Free any pooled buffer the plugin already consumed. + unsafe { free_packet_buffer_handle(c_packet) }; return CResult::error(std::ptr::null()); }, }; @@ -1062,6 +1109,8 @@ extern "C" fn output_callback_shim( let packet = match unsafe { conversions::packet_from_c(c_packet) } { Ok(p) => p, Err(e) => { + // packet_from_c already frees the buffer_handle on its own error + // paths (Critical #1), so no extra cleanup needed here. ctx.error = Some(format!("Failed to convert packet: {e}")); return CResult::error(std::ptr::null()); }, @@ -1151,3 +1200,83 @@ extern "C" fn telemetry_callback_shim( CResult::success() } + +// ── Frame pool allocation shims (v6) ───────────────────────────────────── + +/// Allocate a video buffer from the host's frame pool. +extern "C" fn alloc_video_shim(min_bytes: usize, user_data: *mut c_void) -> CAllocVideoResult { + use streamkit_core::frame_pool::PooledVideoData; + + if user_data.is_null() { + return CAllocVideoResult::null(); + } + + let ctx = unsafe { &*user_data.cast::() }; + let Some(pool) = ctx.video_pool.as_ref() else { + return CAllocVideoResult::null(); + }; + + let mut pooled: PooledVideoData = pool.get(min_bytes); + let data_ptr = pooled.as_mut_ptr(); + let len = pooled.len(); + let handle = Box::into_raw(Box::new(pooled)).cast::(); + + CAllocVideoResult { data: data_ptr, len, handle, free_fn: Some(free_video_buffer) } +} + +/// Free a video buffer without sending it (error/discard path). +extern "C" fn free_video_buffer(handle: *mut c_void) { + use streamkit_core::frame_pool::PooledVideoData; + + if !handle.is_null() { + // SAFETY: handle was created by alloc_video_shim via Box::into_raw. + let _ = unsafe { Box::from_raw(handle.cast::()) }; + } +} + +/// Allocate an audio buffer from the host's frame pool. +extern "C" fn alloc_audio_shim(min_samples: usize, user_data: *mut c_void) -> CAllocAudioResult { + use streamkit_core::frame_pool::PooledSamples; + + if user_data.is_null() { + return CAllocAudioResult::null(); + } + + let ctx = unsafe { &*user_data.cast::() }; + let Some(pool) = ctx.audio_pool.as_ref() else { + return CAllocAudioResult::null(); + }; + + let mut pooled: PooledSamples = pool.get(min_samples); + let data_ptr = pooled.as_mut_ptr(); + let sample_count = pooled.len(); + let handle = Box::into_raw(Box::new(pooled)).cast::(); + + CAllocAudioResult { data: data_ptr, sample_count, handle, free_fn: Some(free_audio_buffer) } +} + +/// Free an audio buffer without sending it (error/discard path). +extern "C" fn free_audio_buffer(handle: *mut c_void) { + use streamkit_core::frame_pool::PooledSamples; + + if !handle.is_null() { + let _ = unsafe { Box::from_raw(handle.cast::()) }; + } +} + +/// Build a `CNodeCallbacks` struct from a `CallbackContext` pointer. +/// +/// The returned struct borrows `callback_data` — it must not outlive the +/// `CallbackContext`. +fn build_node_callbacks(callback_data: *mut c_void) -> CNodeCallbacks { + CNodeCallbacks { + struct_size: std::mem::size_of::(), + output_callback: output_callback_shim, + output_user_data: callback_data, + telemetry_callback: Some(telemetry_callback_shim), + telemetry_user_data: callback_data, + alloc_video: Some(alloc_video_shim), + alloc_audio: Some(alloc_audio_shim), + alloc_user_data: callback_data, + } +} diff --git a/examples/plugins/gain-native/Cargo.lock b/examples/plugins/gain-native/Cargo.lock index 13ebec70..e9e67204 100644 --- a/examples/plugins/gain-native/Cargo.lock +++ b/examples/plugins/gain-native/Cargo.lock @@ -123,9 +123,9 @@ checksum = "62049b2877bf12821e8f9ad256ee38fdc31db7387ec2d3b3f403024de2034aea" [[package]] name = "schemars" -version = "1.1.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ "dyn-clone", "ref-cast", @@ -136,9 +136,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "1.1.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301858a4023d78debd2353c7426dc486001bddc91ae31a76fb1f55132f7e2633" +checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f" dependencies = [ "proc-macro2", "quote", @@ -208,7 +208,7 @@ checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "streamkit-core" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "base64", @@ -226,7 +226,7 @@ dependencies = [ [[package]] name = "streamkit-plugin-sdk-native" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "bytes", @@ -278,9 +278,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.48.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd" dependencies = [ "pin-project-lite", "tokio-macros", @@ -288,9 +288,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", @@ -299,9 +299,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.17" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", @@ -343,9 +343,9 @@ dependencies = [ [[package]] name = "ts-rs" -version = "11.1.0" +version = "12.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4994acea2522cd2b3b85c1d9529a55991e3ad5e25cdcd3de9d505972c4379424" +checksum = "756050066659291d47a554a9f558125db17428b073c5ffce1daf5dcb0f7231d8" dependencies = [ "serde_json", "thiserror", @@ -354,9 +354,9 @@ dependencies = [ [[package]] name = "ts-rs-macros" -version = "11.1.0" +version = "12.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee6ff59666c9cbaec3533964505d39154dc4e0a56151fdea30a09ed0301f62e2" +checksum = "38d90eea51bc7988ef9e674bf80a85ba6804739e535e9cab48e4bb34a8b652aa" dependencies = [ "proc-macro2", "quote", diff --git a/plugins/native/slint/src/slint_node.rs b/plugins/native/slint/src/slint_node.rs index 4ce29e66..2c996a70 100644 --- a/plugins/native/slint/src/slint_node.rs +++ b/plugins/native/slint/src/slint_node.rs @@ -215,16 +215,29 @@ impl NativeSourceNode for SlintSourcePlugin { keyframe: Some(true), }); - let frame = VideoFrame::with_metadata( - self.config.width, - self.config.height, - PixelFormat::Rgba8, - rgba_data, - metadata, - ) - .map_err(|e| format!("Failed to create video frame: {e}"))?; - - output.send("out", &Packet::Video(frame))?; + // Try zero-copy pool allocation; fall back to legacy copy path. + if let Some(mut buf) = output.alloc_video(rgba_data.len()) { + buf.as_mut_slice()[..rgba_data.len()].copy_from_slice(&rgba_data); + output.send_video( + "out", + self.config.width, + self.config.height, + PixelFormat::Rgba8, + buf, + metadata.as_ref(), + )?; + } else { + let frame = VideoFrame::with_metadata( + self.config.width, + self.config.height, + PixelFormat::Rgba8, + rgba_data, + metadata, + ) + .map_err(|e| format!("Failed to create video frame: {e}"))?; + + output.send("out", &Packet::Video(frame))?; + } self.tick_count += 1; Ok(false) diff --git a/sdks/plugin-sdk/native/src/conversions.rs b/sdks/plugin-sdk/native/src/conversions.rs index 7cec1592..3e0c63f3 100644 --- a/sdks/plugin-sdk/native/src/conversions.rs +++ b/sdks/plugin-sdk/native/src/conversions.rs @@ -14,6 +14,7 @@ use std::cell::RefCell; use std::ffi::{c_void, CStr, CString}; use std::os::raw::c_char; use std::sync::Arc; +use streamkit_core::frame_pool::{PooledSamples, PooledVideoData}; use streamkit_core::types::{ AudioCodec, AudioFormat, AudioFrame, CustomEncoding, CustomPacketData, EncodedAudioFormat, Packet, PacketMetadata, PacketType, PixelFormat, RawVideoFormat, SampleFormat, @@ -304,13 +305,25 @@ pub struct CPacketRepr { #[allow(dead_code)] // Owned values are kept alive to support FFI pointers during callbacks. enum CPacketOwned { None, - Audio(Box), - Video(Box), + Audio(AudioOwned), + Video(VideoOwned), Text(CString), Bytes(Vec), Custom(CustomOwned), } +#[allow(dead_code)] // Owned values are kept alive to support FFI pointers during callbacks. +struct VideoOwned { + frame: Box, + metadata: Option>, +} + +#[allow(dead_code)] // Owned values are kept alive to support FFI pointers during callbacks. +struct AudioOwned { + frame: Box, + metadata: Option>, +} + #[allow(dead_code)] // Owned values are kept alive to support FFI pointers during callbacks. struct CustomOwned { type_id: CString, @@ -319,7 +332,7 @@ struct CustomOwned { custom: Box, } -fn metadata_to_c(meta: &PacketMetadata) -> CPacketMetadata { +pub fn metadata_to_c(meta: &PacketMetadata) -> CPacketMetadata { CPacketMetadata { timestamp_us: meta.timestamp_us.unwrap_or_default(), has_timestamp_us: meta.timestamp_us.is_some(), @@ -349,18 +362,24 @@ fn cstring_sanitize(s: &str) -> CString { pub fn packet_to_c(packet: &Packet) -> CPacketRepr { match packet { Packet::Audio(frame) => { + let metadata = frame.metadata.as_ref().map(|m| Box::new(metadata_to_c(m))); let c_frame = Box::new(CAudioFrame { sample_rate: frame.sample_rate, channels: frame.channels, samples: frame.samples.as_ptr(), sample_count: frame.samples.len(), + buffer_handle: std::ptr::null_mut(), + metadata: metadata.as_deref().map_or(std::ptr::null(), std::ptr::from_ref), }); let packet = CPacket { packet_type: CPacketType::RawAudio, data: std::ptr::from_ref::(&*c_frame).cast::(), len: std::mem::size_of::(), }; - CPacketRepr { packet, _owned: CPacketOwned::Audio(c_frame) } + CPacketRepr { + packet, + _owned: CPacketOwned::Audio(AudioOwned { frame: c_frame, metadata }), + } }, Packet::Text(text) => { let s = text.as_ref(); @@ -437,19 +456,25 @@ pub fn packet_to_c(packet: &Packet) -> CPacketRepr { _owned: CPacketOwned::None, }, Packet::Video(frame) => { + let metadata = frame.metadata.as_ref().map(|m| Box::new(metadata_to_c(m))); let c_frame = Box::new(CVideoFrame { width: frame.width, height: frame.height, pixel_format: pixel_format_to_c(frame.pixel_format), data: frame.data.as_ptr(), data_len: frame.data.len(), + buffer_handle: std::ptr::null_mut(), + metadata: metadata.as_deref().map_or(std::ptr::null(), std::ptr::from_ref), }); let packet = CPacket { packet_type: CPacketType::RawVideo, data: std::ptr::from_ref::(&*c_frame).cast::(), len: std::mem::size_of::(), }; - CPacketRepr { packet, _owned: CPacketOwned::Video(c_frame) } + CPacketRepr { + packet, + _owned: CPacketOwned::Video(VideoOwned { frame: c_frame, metadata }), + } }, } } @@ -485,16 +510,38 @@ pub unsafe fn packet_from_c(c_packet: *const CPacket) -> Result CPacketType::RawAudio => { let c_frame = &*c_pkt.data.cast::(); if c_frame.samples.is_null() { + if !c_frame.buffer_handle.is_null() { + drop(Box::from_raw(c_frame.buffer_handle.cast::())); + } return Err("Null samples pointer in audio frame".to_string()); } - let samples = std::slice::from_raw_parts(c_frame.samples, c_frame.sample_count); + let metadata = if c_frame.metadata.is_null() { + None + } else { + Some(metadata_from_c(&*c_frame.metadata)) + }; - Ok(Packet::Audio(AudioFrame::new( - c_frame.sample_rate, - c_frame.channels, - samples.to_vec(), - ))) + if c_frame.buffer_handle.is_null() { + // Legacy copy path. + let samples = + std::slice::from_raw_parts(c_frame.samples, c_frame.sample_count).to_vec(); + Ok(Packet::Audio(AudioFrame::with_metadata( + c_frame.sample_rate, + c_frame.channels, + samples, + metadata, + ))) + } else { + // Zero-copy path: reclaim the PooledSamples from the handle. + let pooled = *Box::from_raw(c_frame.buffer_handle.cast::()); + Ok(Packet::Audio(AudioFrame::from_pooled( + c_frame.sample_rate, + c_frame.channels, + pooled, + metadata, + ))) + } }, CPacketType::Text => { let c_str = CStr::from_ptr(c_pkt.data.cast::()); @@ -548,13 +595,44 @@ pub unsafe fn packet_from_c(c_packet: *const CPacket) -> Result CPacketType::RawVideo => { let c_frame = &*c_pkt.data.cast::(); if c_frame.data.is_null() { + if !c_frame.buffer_handle.is_null() { + drop(Box::from_raw(c_frame.buffer_handle.cast::())); + } return Err("Null data pointer in video frame".to_string()); } let pixel_format = pixel_format_from_c(c_frame.pixel_format); - let data = std::slice::from_raw_parts(c_frame.data, c_frame.data_len).to_vec(); - VideoFrame::with_metadata(c_frame.width, c_frame.height, pixel_format, data, None) + + let metadata = if c_frame.metadata.is_null() { + None + } else { + Some(metadata_from_c(&*c_frame.metadata)) + }; + + if c_frame.buffer_handle.is_null() { + // Legacy copy path. + let data = std::slice::from_raw_parts(c_frame.data, c_frame.data_len).to_vec(); + VideoFrame::with_metadata( + c_frame.width, + c_frame.height, + pixel_format, + data, + metadata, + ) + .map(Packet::Video) + .map_err(|e| format!("Invalid video frame: {e}")) + } else { + // Zero-copy path: reclaim the PooledVideoData from the handle. + let pooled = *Box::from_raw(c_frame.buffer_handle.cast::()); + VideoFrame::from_pooled( + c_frame.width, + c_frame.height, + pixel_format, + pooled, + metadata, + ) .map(Packet::Video) .map_err(|e| format!("Invalid video frame: {e}")) + } }, CPacketType::EncodedVideo => { // Encoded video is carried as opaque bytes across the C ABI. @@ -685,4 +763,64 @@ mod tests { free_c_string(c_msg); } } + + /// Regression test: packet_from_c must free a pooled video buffer_handle + /// when the data pointer is null, rather than leaking it. + #[test] + fn packet_from_c_frees_video_handle_on_null_data() { + let pooled = PooledVideoData::from_vec(vec![0u8; 1024]); + let handle = Box::into_raw(Box::new(pooled)).cast::(); + + let c_frame = CVideoFrame { + width: 640, + height: 480, + pixel_format: CPixelFormat::Rgba8, + data: std::ptr::null(), + data_len: 0, + buffer_handle: handle, + metadata: std::ptr::null(), + }; + + let c_pkt = CPacket { + packet_type: CPacketType::RawVideo, + data: std::ptr::from_ref(&c_frame).cast(), + len: std::mem::size_of::(), + }; + + let result = unsafe { packet_from_c(&raw const c_pkt) }; + match result { + Err(msg) => assert!(msg.contains("Null data pointer"), "unexpected: {msg}"), + Ok(_) => panic!("expected error for null video data pointer"), + } + // If the handle were leaked, Miri / DHAT would catch it. + } + + /// Regression test: packet_from_c must free a pooled audio buffer_handle + /// when the samples pointer is null, rather than leaking it. + #[test] + fn packet_from_c_frees_audio_handle_on_null_samples() { + let pooled = PooledSamples::from_vec(vec![0.0f32; 960]); + let handle = Box::into_raw(Box::new(pooled)).cast::(); + + let c_frame = CAudioFrame { + sample_rate: 48_000, + channels: 1, + samples: std::ptr::null(), + sample_count: 0, + buffer_handle: handle, + metadata: std::ptr::null(), + }; + + let c_pkt = CPacket { + packet_type: CPacketType::RawAudio, + data: std::ptr::from_ref(&c_frame).cast(), + len: std::mem::size_of::(), + }; + + let result = unsafe { packet_from_c(&raw const c_pkt) }; + match result { + Err(msg) => assert!(msg.contains("Null samples pointer"), "unexpected: {msg}"), + Ok(_) => panic!("expected error for null audio samples pointer"), + } + } } diff --git a/sdks/plugin-sdk/native/src/lib.rs b/sdks/plugin-sdk/native/src/lib.rs index 858b829d..480e1804 100644 --- a/sdks/plugin-sdk/native/src/lib.rs +++ b/sdks/plugin-sdk/native/src/lib.rs @@ -53,6 +53,25 @@ use streamkit_core::{InputPin, OutputPin, PinCardinality, Resource}; use logger::Logger; +/// Convert a [`CResult`] from a host callback into a Rust `Result`. +/// +/// # Safety +/// +/// If `result.error_message` is non-null it must point to a valid, +/// NUL-terminated C string. +unsafe fn result_from_c(result: types::CResult) -> Result<(), String> { + if result.success { + return Ok(()); + } + let error_msg = if result.error_message.is_null() { + "Unknown error".to_string() + } else { + conversions::c_str_to_string(result.error_message) + .unwrap_or_else(|_| "Unknown error".to_string()) + }; + Err(error_msg) +} + pub use streamkit_core; pub use types::*; @@ -63,7 +82,7 @@ pub mod prelude { pub use crate::{ native_plugin_entry, native_source_plugin_entry, plugin_debug, plugin_error, plugin_info, plugin_log, plugin_trace, plugin_warn, NativeProcessorNode, NativeSourceNode, NodeMetadata, - OutputSender, ResourceSupport, SourceConfig, + OutputSender, PooledAudioBuffer, PooledVideoBuffer, ResourceSupport, SourceConfig, }; pub use streamkit_core::types::{AudioFrame, Packet, PacketType}; pub use streamkit_core::{InputPin, OutputPin, PinCardinality, Resource, UpstreamHint}; @@ -160,35 +179,124 @@ impl NodeMetadataBuilder { } } +/// A video buffer allocated from the host's frame pool. +/// +/// Follows linear-type semantics: after allocation the plugin must either +/// pass the buffer to [`OutputSender::send_video`] (which consumes it) or +/// let it drop (which calls `free_fn` to return the buffer to the pool). +pub struct PooledVideoBuffer { + data: *mut u8, + len: usize, + handle: *mut std::os::raw::c_void, + free_fn: extern "C" fn(*mut std::os::raw::c_void), + consumed: bool, +} + +impl PooledVideoBuffer { + /// Writable slice into the pooled buffer. + #[allow(clippy::missing_const_for_fn)] // Dereferences a heap pointer; will never be called in const context. + pub fn as_mut_slice(&mut self) -> &mut [u8] { + // SAFETY: `data` was returned by the host's `alloc_video` callback + // and is valid for `len` bytes. Exclusive access is guaranteed by + // `&mut self` — no other reference exists. + unsafe { std::slice::from_raw_parts_mut(self.data, self.len) } + } + + /// Number of usable bytes. + #[must_use] + pub const fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the buffer is empty. + #[must_use] + pub const fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Mark the buffer as consumed (ownership transferred to the host). + #[allow(clippy::missing_const_for_fn)] // Mutates runtime state; const context is meaningless here. + fn consume(&mut self) -> (*mut std::os::raw::c_void, *const u8) { + self.consumed = true; + (self.handle, self.data) + } +} + +impl Drop for PooledVideoBuffer { + fn drop(&mut self) { + if !self.consumed { + (self.free_fn)(self.handle); + } + } +} + +/// An audio buffer allocated from the host's frame pool. +/// +/// Same linear-type semantics as [`PooledVideoBuffer`]. +pub struct PooledAudioBuffer { + data: *mut f32, + sample_count: usize, + handle: *mut std::os::raw::c_void, + free_fn: extern "C" fn(*mut std::os::raw::c_void), + consumed: bool, +} + +impl PooledAudioBuffer { + /// Writable slice into the pooled buffer. + #[allow(clippy::missing_const_for_fn)] // Dereferences a heap pointer; will never be called in const context. + pub fn as_mut_slice(&mut self) -> &mut [f32] { + // SAFETY: same as PooledVideoBuffer. + unsafe { std::slice::from_raw_parts_mut(self.data, self.sample_count) } + } + + /// Number of usable samples. + #[must_use] + pub const fn sample_count(&self) -> usize { + self.sample_count + } + + /// Returns `true` if the buffer is empty. + #[must_use] + pub const fn is_empty(&self) -> bool { + self.sample_count == 0 + } + + /// Mark the buffer as consumed (ownership transferred to the host). + #[allow(clippy::missing_const_for_fn)] // Mutates runtime state; const context is meaningless here. + fn consume(&mut self) -> (*mut std::os::raw::c_void, *const f32) { + self.consumed = true; + (self.handle, self.data) + } +} + +impl Drop for PooledAudioBuffer { + fn drop(&mut self) { + if !self.consumed { + (self.free_fn)(self.handle); + } + } +} + /// Output sender for sending packets to output pins pub struct OutputSender { - output_callback: COutputCallback, - output_user_data: *mut std::os::raw::c_void, - telemetry_callback: types::CTelemetryCallback, - telemetry_user_data: *mut std::os::raw::c_void, + callbacks: *const types::CNodeCallbacks, } impl OutputSender { - /// Create a new output sender from C callback - pub fn from_callback(callback: COutputCallback, user_data: *mut std::os::raw::c_void) -> Self { - Self { - output_callback: callback, - output_user_data: user_data, - telemetry_callback: None, - telemetry_user_data: std::ptr::null_mut(), - } + /// Create an `OutputSender` from a `CNodeCallbacks` pointer. + /// + /// # Safety + /// + /// The pointer must remain valid for the lifetime of this `OutputSender`. + pub const unsafe fn from_node_callbacks(callbacks: *const types::CNodeCallbacks) -> Self { + Self { callbacks } } - /// Create a new output sender from C callbacks. - /// - /// `telemetry_callback` may be null if the host doesn't provide telemetry support. - pub fn from_callbacks( - output_callback: COutputCallback, - output_user_data: *mut std::os::raw::c_void, - telemetry_callback: types::CTelemetryCallback, - telemetry_user_data: *mut std::os::raw::c_void, - ) -> Self { - Self { output_callback, output_user_data, telemetry_callback, telemetry_user_data } + /// Access the underlying callbacks. + const fn cb(&self) -> &types::CNodeCallbacks { + // SAFETY: The pointer is valid for the lifetime of this `OutputSender`, + // guaranteed by the caller of `from_node_callbacks`. + unsafe { &*self.callbacks } } /// Send a packet to an output pin @@ -200,27 +308,149 @@ impl OutputSender { /// - The C callback returns an error pub fn send(&self, pin: &str, packet: &Packet) -> Result<(), String> { let pin_c = CString::new(pin).map_err(|e| format!("Invalid pin name: {e}"))?; + let cb = self.cb(); let packet_repr = conversions::packet_to_c(packet); - let result = (self.output_callback)( + let result = (cb.output_callback)( pin_c.as_ptr(), &raw const packet_repr.packet, - self.output_user_data, + cb.output_user_data, ); - if result.success { - Ok(()) - } else { - let error_msg = if result.error_message.is_null() { - "Unknown error".to_string() - } else { - unsafe { - conversions::c_str_to_string(result.error_message) - .unwrap_or_else(|_| "Unknown error".to_string()) - } - }; - Err(error_msg) + // SAFETY: CResult from host callback; error_message is a valid C string if non-null. + unsafe { result_from_c(result) } + } + + // ── Frame pool allocation ───────────────────────────────────────────── + + /// Allocate a video buffer from the host's frame pool. + /// + /// Returns `None` if the host has no video pool or allocation fails. + pub fn alloc_video(&self, min_bytes: usize) -> Option { + let cb = self.cb(); + let alloc_fn = cb.alloc_video?; + let res = alloc_fn(min_bytes, cb.alloc_user_data); + if res.data.is_null() || res.free_fn.is_none() { + return None; } + Some(PooledVideoBuffer { + data: res.data, + len: res.len, + handle: res.handle, + // SAFETY: free_fn is guaranteed to be Some by the check above. + free_fn: unsafe { res.free_fn.unwrap_unchecked() }, + consumed: false, + }) + } + + /// Allocate an audio buffer from the host's frame pool. + /// + /// Returns `None` if the host has no audio pool or allocation fails. + pub fn alloc_audio(&self, min_samples: usize) -> Option { + let cb = self.cb(); + let alloc_fn = cb.alloc_audio?; + let res = alloc_fn(min_samples, cb.alloc_user_data); + if res.data.is_null() || res.free_fn.is_none() { + return None; + } + Some(PooledAudioBuffer { + data: res.data, + sample_count: res.sample_count, + handle: res.handle, + // SAFETY: free_fn is guaranteed to be Some by the check above. + free_fn: unsafe { res.free_fn.unwrap_unchecked() }, + consumed: false, + }) + } + + /// Send a video frame using a pool-allocated buffer (zero-copy path). + /// + /// Consumes `buf` — ownership transfers to the host. + /// + /// # Errors + /// + /// Returns an error if the pin name is invalid or the host rejects the + /// packet. + pub fn send_video( + &self, + pin: &str, + width: u32, + height: u32, + pixel_format: streamkit_core::types::PixelFormat, + mut buf: PooledVideoBuffer, + metadata: Option<&streamkit_core::types::PacketMetadata>, + ) -> Result<(), String> { + let pin_c = CString::new(pin).map_err(|e| format!("Invalid pin name: {e}"))?; + let cb = self.cb(); + + let (handle, data_ptr) = buf.consume(); + + let c_meta = metadata.map(conversions::metadata_to_c); + let c_meta_ptr = c_meta.as_ref().map_or(std::ptr::null(), std::ptr::from_ref); + + let c_frame = types::CVideoFrame { + width, + height, + pixel_format: conversions::pixel_format_to_c(pixel_format), + data: data_ptr, + data_len: buf.len(), + buffer_handle: handle, + metadata: c_meta_ptr, + }; + + let c_pkt = types::CPacket { + packet_type: types::CPacketType::RawVideo, + data: std::ptr::from_ref(&c_frame).cast(), + len: std::mem::size_of::(), + }; + + let result = (cb.output_callback)(pin_c.as_ptr(), &raw const c_pkt, cb.output_user_data); + // SAFETY: CResult from host callback; error_message is a valid C string if non-null. + unsafe { result_from_c(result) } + } + + /// Send an audio frame using a pool-allocated buffer (zero-copy path). + /// + /// Consumes `buf` — ownership transfers to the host. + /// + /// # Errors + /// + /// Returns an error if the pin name is invalid or the host rejects the + /// packet. + pub fn send_audio( + &self, + pin: &str, + sample_rate: u32, + channels: u16, + mut buf: PooledAudioBuffer, + metadata: Option<&streamkit_core::types::PacketMetadata>, + ) -> Result<(), String> { + let pin_c = CString::new(pin).map_err(|e| format!("Invalid pin name: {e}"))?; + let cb = self.cb(); + + let (handle, data_ptr) = buf.consume(); + + let c_meta = metadata.map(conversions::metadata_to_c); + let c_meta_ptr = c_meta.as_ref().map_or(std::ptr::null(), std::ptr::from_ref); + + let c_frame = types::CAudioFrame { + sample_rate, + channels, + samples: data_ptr, + sample_count: buf.sample_count(), + buffer_handle: handle, + metadata: c_meta_ptr, + }; + + let c_pkt = types::CPacket { + packet_type: types::CPacketType::RawAudio, + data: std::ptr::from_ref(&c_frame).cast(), + len: std::mem::size_of::(), + }; + + let result = (cb.output_callback)(pin_c.as_ptr(), &raw const c_pkt, cb.output_user_data); + // SAFETY: CResult from host callback; error_message is a valid C string if non-null. + unsafe { result_from_c(result) } } /// Emit a telemetry event to the host telemetry bus (best-effort). @@ -241,7 +471,8 @@ impl OutputSender { data: &serde_json::Value, timestamp_us: Option, ) -> Result<(), String> { - let Some(cb) = self.telemetry_callback else { + let cb = self.cb(); + let Some(telemetry_cb) = cb.telemetry_callback else { return Ok(()); }; @@ -260,27 +491,16 @@ impl OutputSender { }); let meta_ptr = meta.as_ref().map_or(std::ptr::null(), std::ptr::from_ref); - let result = cb( + let result = telemetry_cb( event_type_c.as_ptr(), data_json.as_ptr(), data_json.len(), meta_ptr, - self.telemetry_user_data, + cb.telemetry_user_data, ); - if result.success { - Ok(()) - } else { - let error_msg = if result.error_message.is_null() { - "Unknown error".to_string() - } else { - unsafe { - conversions::c_str_to_string(result.error_message) - .unwrap_or_else(|_| "Unknown error".to_string()) - } - }; - Err(error_msg) - } + // SAFETY: CResult from host callback; error_message is a valid C string if non-null. + unsafe { result_from_c(result) } } } @@ -984,12 +1204,9 @@ macro_rules! native_plugin_entry { handle: $crate::types::CPluginHandle, input_pin: *const std::os::raw::c_char, packet: *const $crate::types::CPacket, - output_callback: $crate::types::COutputCallback, - callback_data: *mut std::os::raw::c_void, - telemetry_callback: $crate::types::CTelemetryCallback, - telemetry_callback_data: *mut std::os::raw::c_void, + callbacks: *const $crate::types::CNodeCallbacks, ) -> $crate::types::CResult { - if handle.is_null() || input_pin.is_null() || packet.is_null() { + if handle.is_null() || input_pin.is_null() || packet.is_null() || callbacks.is_null() { return $crate::types::CResult::error(std::ptr::null()); } @@ -1011,12 +1228,7 @@ macro_rules! native_plugin_entry { } }; - let output = $crate::OutputSender::from_callbacks( - output_callback, - callback_data, - telemetry_callback, - telemetry_callback_data, - ); + let output = unsafe { $crate::OutputSender::from_node_callbacks(callbacks) }; match instance.process(&pin_name, rust_packet, &output) { Ok(()) => $crate::types::CResult::success(), @@ -1070,33 +1282,24 @@ macro_rules! native_plugin_entry { extern "C" fn __plugin_flush( handle: $crate::types::CPluginHandle, - callback: $crate::types::COutputCallback, - callback_data: *mut std::os::raw::c_void, - telemetry_callback: $crate::types::CTelemetryCallback, - telemetry_callback_data: *mut std::os::raw::c_void, + callbacks: *const $crate::types::CNodeCallbacks, ) -> $crate::types::CResult { - tracing::info!("__plugin_flush called"); - if handle.is_null() { - tracing::error!("Handle is null"); - let err_msg = $crate::conversions::error_to_c("Invalid handle (null)"); + tracing::trace!("__plugin_flush called"); + if handle.is_null() || callbacks.is_null() { + tracing::error!("Handle or callbacks is null"); + let err_msg = $crate::conversions::error_to_c("Invalid handle or callbacks (null)"); return $crate::types::CResult::error(err_msg); } let instance = unsafe { &mut *(handle as *mut $plugin_type) }; - tracing::info!("Got instance pointer"); - - // Create OutputSender wrapper for the callback - let output_sender = $crate::OutputSender::from_callbacks( - callback, - callback_data, - telemetry_callback, - telemetry_callback_data, - ); - tracing::info!("Created OutputSender, calling instance.flush()"); + tracing::trace!("Got instance pointer"); + + let output_sender = unsafe { $crate::OutputSender::from_node_callbacks(callbacks) }; + tracing::trace!("Created OutputSender, calling instance.flush()"); match instance.flush(&output_sender) { Ok(()) => { - tracing::info!("instance.flush() returned Ok"); + tracing::trace!("instance.flush() returned Ok"); $crate::types::CResult::success() }, Err(e) => { @@ -1529,23 +1732,15 @@ macro_rules! native_source_plugin_entry { extern "C" fn __plugin_tick( handle: $crate::types::CPluginHandle, - output_callback: $crate::types::COutputCallback, - callback_data: *mut std::os::raw::c_void, - telemetry_callback: $crate::types::CTelemetryCallback, - telemetry_callback_data: *mut std::os::raw::c_void, + callbacks: *const $crate::types::CNodeCallbacks, ) -> $crate::types::CTickResult { - if handle.is_null() { - let err = $crate::conversions::error_to_c("Invalid handle (null)"); + if handle.is_null() || callbacks.is_null() { + let err = $crate::conversions::error_to_c("Invalid handle or callbacks (null)"); return $crate::types::CTickResult::error(err); } let instance = unsafe { &mut *(handle as *mut $plugin_type) }; - let output = $crate::OutputSender::from_callbacks( - output_callback, - callback_data, - telemetry_callback, - telemetry_callback_data, - ); + let output = unsafe { $crate::OutputSender::from_node_callbacks(callbacks) }; match instance.tick(&output) { Ok(done) => { @@ -1568,10 +1763,7 @@ macro_rules! native_source_plugin_entry { _handle: $crate::types::CPluginHandle, _input_pin: *const std::os::raw::c_char, _packet: *const $crate::types::CPacket, - _output_callback: $crate::types::COutputCallback, - _callback_data: *mut std::os::raw::c_void, - _telemetry_callback: $crate::types::CTelemetryCallback, - _telemetry_callback_data: *mut std::os::raw::c_void, + _callbacks: *const $crate::types::CNodeCallbacks, ) -> $crate::types::CResult { let err = $crate::conversions::error_to_c( "process_packet called on source plugin (not supported)", @@ -1581,10 +1773,7 @@ macro_rules! native_source_plugin_entry { extern "C" fn __plugin_flush_noop( _handle: $crate::types::CPluginHandle, - _callback: $crate::types::COutputCallback, - _callback_data: *mut std::os::raw::c_void, - _telemetry_callback: $crate::types::CTelemetryCallback, - _telemetry_callback_data: *mut std::os::raw::c_void, + _callbacks: *const $crate::types::CNodeCallbacks, ) -> $crate::types::CResult { $crate::types::CResult::success() } diff --git a/sdks/plugin-sdk/native/src/types.rs b/sdks/plugin-sdk/native/src/types.rs index 8b3e3e1c..46094c77 100644 --- a/sdks/plugin-sdk/native/src/types.rs +++ b/sdks/plugin-sdk/native/src/types.rs @@ -20,7 +20,11 @@ use std::os::raw::{c_char, c_void}; /// dynamic runtime parameter discovery. /// v5: Added `on_upstream_hint` for receiving advisory hints from /// downstream consumers (e.g. preferred output resolution). -pub const NATIVE_PLUGIN_API_VERSION: u32 = 5; +/// v6: Added frame pool allocation (`CNodeCallbacks`, `CAllocVideoResult`, +/// `CAllocAudioResult`). Consolidated per-call callback parameters +/// into a single `CNodeCallbacks` struct. Extended `CVideoFrame` and +/// `CAudioFrame` with `buffer_handle` and `metadata` fields. +pub const NATIVE_PLUGIN_API_VERSION: u32 = 6; /// Opaque handle to a plugin instance pub type CPluginHandle = *mut c_void; @@ -165,6 +169,10 @@ pub struct CRawVideoFormat { /// Video frame data passed across the C ABI boundary. /// /// `data` points to raw pixel bytes; layout depends on `pixel_format`. +/// +/// When `buffer_handle` is non-null, the buffer was allocated from the +/// host's frame pool via [`CAllocVideoFn`]. The host reclaims the +/// underlying [`PooledVideoData`] directly — no copy is needed. #[repr(C)] pub struct CVideoFrame { pub width: u32, @@ -172,6 +180,11 @@ pub struct CVideoFrame { pub pixel_format: CPixelFormat, pub data: *const u8, pub data_len: usize, + /// Opaque handle returned by [`CAllocVideoFn`]. NULL for legacy + /// (non-pooled) frames. + pub buffer_handle: *mut c_void, + /// Optional metadata (may be null). + pub metadata: *const CPacketMetadata, } /// Encoding for Custom packets. @@ -225,12 +238,21 @@ pub struct CPacketTypeInfo { } /// Audio frame data (for RawAudio packets) +/// +/// When `buffer_handle` is non-null, the buffer was allocated from the +/// host's audio frame pool via [`CAllocAudioFn`]. The host reclaims +/// the underlying [`PooledSamples`] directly — no copy is needed. #[repr(C)] pub struct CAudioFrame { pub sample_rate: u32, pub channels: u16, pub samples: *const f32, pub sample_count: usize, + /// Opaque handle returned by [`CAllocAudioFn`]. NULL for legacy + /// (non-pooled) frames. + pub buffer_handle: *mut c_void, + /// Optional metadata (may be null). + pub metadata: *const CPacketMetadata, } /// Generic packet container @@ -358,18 +380,12 @@ pub struct CNativePluginAPI { /// handle: Plugin instance handle. /// input_pin: Name of the input pin. /// packet: The packet to process. - /// output_callback: Callback to send output packets. - /// callback_data: User data to pass to output callback. - /// telemetry_callback: Callback to emit telemetry events. - /// telemetry_user_data: User data to pass to telemetry callback. + /// callbacks: Consolidated callback bundle (output + telemetry + alloc). pub process_packet: extern "C" fn( CPluginHandle, *const c_char, *const CPacket, - COutputCallback, - *mut c_void, - CTelemetryCallback, - *mut c_void, + *const CNodeCallbacks, ) -> CResult, /// Update runtime parameters. @@ -379,17 +395,8 @@ pub struct CNativePluginAPI { /// Flush any buffered data (called when input stream ends). /// handle: Plugin instance handle. - /// output_callback: Callback to send output packets. - /// callback_data: User data to pass to output callback. - /// telemetry_callback: Callback to emit telemetry events. - /// telemetry_user_data: User data to pass to telemetry callback. - pub flush: extern "C" fn( - CPluginHandle, - COutputCallback, - *mut c_void, - CTelemetryCallback, - *mut c_void, - ) -> CResult, + /// callbacks: Consolidated callback bundle (output + telemetry + alloc). + pub flush: extern "C" fn(CPluginHandle, *const CNodeCallbacks) -> CResult, /// Destroy a plugin instance. /// handle: Plugin instance handle. @@ -411,15 +418,7 @@ pub struct CNativePluginAPI { /// completion. /// /// `None` for processor plugins. - pub tick: Option< - extern "C" fn( - CPluginHandle, - COutputCallback, - *mut c_void, - CTelemetryCallback, - *mut c_void, - ) -> CTickResult, - >, + pub tick: Option CTickResult>, // ── v4 additions ────────────────────────────────────────────────────── /// Query runtime-discovered param schema after instance creation. @@ -446,5 +445,102 @@ pub struct CNativePluginAPI { pub on_upstream_hint: Option CResult>, } +// ── v6 additions: frame pool allocation ──────────────────────────────── + +/// Result of a video buffer allocation from the host's frame pool. +/// +/// If `data` is non-null the allocation succeeded and the plugin owns the +/// buffer until it either passes it back via `CVideoFrame::buffer_handle` +/// or calls `free_fn(handle)` to release it without sending. +#[repr(C)] +pub struct CAllocVideoResult { + /// Pointer to the writable buffer, or null on failure. + pub data: *mut u8, + /// Usable byte count (≥ requested `min_bytes`). + pub len: usize, + /// Opaque handle the plugin must store in `CVideoFrame::buffer_handle` + /// (or pass to `free_fn` if the buffer is discarded). + pub handle: *mut c_void, + /// Releases the buffer without sending. The plugin **must** call this + /// if it decides not to send the frame (e.g. on error paths). + pub free_fn: Option, +} + +impl CAllocVideoResult { + /// Null / failed allocation sentinel. + pub const fn null() -> Self { + Self { data: std::ptr::null_mut(), len: 0, handle: std::ptr::null_mut(), free_fn: None } + } +} + +/// Result of an audio buffer allocation from the host's frame pool. +#[repr(C)] +pub struct CAllocAudioResult { + /// Pointer to the writable sample buffer, or null on failure. + pub data: *mut f32, + /// Number of usable samples (≥ requested `min_samples`). + pub sample_count: usize, + /// Opaque handle the plugin must store in `CAudioFrame::buffer_handle` + /// (or pass to `free_fn` if the buffer is discarded). + pub handle: *mut c_void, + /// Releases the buffer without sending. + pub free_fn: Option, +} + +impl CAllocAudioResult { + /// Null / failed allocation sentinel. + pub const fn null() -> Self { + Self { + data: std::ptr::null_mut(), + sample_count: 0, + handle: std::ptr::null_mut(), + free_fn: None, + } + } +} + +/// Callback: allocate a video buffer from the host's frame pool. +/// +/// `min_bytes` — minimum buffer size in bytes. +/// `user_data` — opaque pointer provided by the host. +pub type CAllocVideoFn = extern "C" fn(usize, *mut c_void) -> CAllocVideoResult; + +/// Callback: allocate an audio buffer from the host's frame pool. +/// +/// `min_samples` — minimum buffer size in samples. +/// `user_data` — opaque pointer provided by the host. +pub type CAllocAudioFn = extern "C" fn(usize, *mut c_void) -> CAllocAudioResult; + +/// Consolidated callback bundle passed to `process_packet`, `flush`, and +/// `tick` starting in API v6. +/// +/// Replaces the previous positional callback + user-data pairs with a +/// single struct pointer, making the ABI easier to extend in the future. +/// +/// `struct_size` is set by the host so that a v7 plugin running on a v6 +/// host can detect which fields are present. +#[repr(C)] +pub struct CNodeCallbacks { + /// Size of this struct in bytes (set by the host). + pub struct_size: usize, + + // ── output ────────────────────────────────────────────────────────── + pub output_callback: COutputCallback, + pub output_user_data: *mut c_void, + + // ── telemetry ─────────────────────────────────────────────────────── + pub telemetry_callback: CTelemetryCallback, + pub telemetry_user_data: *mut c_void, + + // ── frame pool allocation (v6) ───────────────────────────────────── + /// May be `None` if the host has no video pool for this pipeline. + pub alloc_video: Option, + /// May be `None` if the host has no audio pool for this pipeline. + pub alloc_audio: Option, + /// Opaque pointer passed as the last argument to `alloc_video` / + /// `alloc_audio`. + pub alloc_user_data: *mut c_void, +} + /// Symbol name that plugins must export pub const PLUGIN_API_SYMBOL: &[u8] = b"streamkit_native_plugin_api\0";