From bb556a1eb31da52d739b9b86992bcb722c0a4a07 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Wed, 8 Apr 2026 15:00:43 +0000 Subject: [PATCH 1/5] feat(nodes): add S3/object store sink node via OpenDAL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ObjectStoreWriteNode — a new sink node that streams Binary packets to S3-compatible object storage using Apache OpenDAL. Supports AWS S3, MinIO, RustFS, GCS, Azure Blob, and other S3-compatible backends. Key design: - Multipart upload via OpenDAL Writer with configurable chunk size (default 5 MiB) for bounded memory usage - Credentials resolved from env vars (precedence) or inline config, following the RTMP node's stream_key_env pattern - Full state lifecycle: Initializing → Running → Stopped/Failed - Tracing at each stage for runtime debugging - Behind opt-in 'object_store' feature flag (not in default) Includes: - 3 sample pipelines (TTS→S3, transcode→S3, MoQ+S3 archive) - docker-compose.rustfs.yml for local E2E validation with RustFS - 8 unit tests covering pins, factory validation, credential resolution, and state transitions Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- Cargo.lock | 283 ++++++++- crates/nodes/Cargo.toml | 6 + crates/nodes/src/core/mod.rs | 21 + crates/nodes/src/core/object_store_write.rs | 550 ++++++++++++++++++ docker-compose.rustfs.yml | 48 ++ samples/pipelines/dynamic/moq_s3_archive.yml | 83 +++ samples/pipelines/oneshot/transcode_to_s3.yml | 57 ++ samples/pipelines/oneshot/tts_to_s3.yml | 70 +++ 8 files changed, 1106 insertions(+), 12 deletions(-) create mode 100644 crates/nodes/src/core/object_store_write.rs create mode 100644 docker-compose.rustfs.yml create mode 100644 samples/pipelines/dynamic/moq_s3_archive.yml create mode 100644 samples/pipelines/oneshot/transcode_to_s3.yml create mode 100644 samples/pipelines/oneshot/tts_to_s3.yml diff --git a/Cargo.lock b/Cargo.lock index 78e246af..800ab5c8 
100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,7 +131,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -142,7 +142,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -526,6 +526,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -828,8 +839,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -1013,6 +1026,32 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.10.0" @@ 
-1231,6 +1270,15 @@ version = "0.128.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "903adeaf4938e60209a97b53a2e4326cd2d356aab9764a1934630204bae381c9" +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -1449,6 +1497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -1495,6 +1544,15 @@ dependencies = [ "syn", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "document-features" version = "0.2.12" @@ -1600,7 +1658,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2012,6 +2070,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "gpu-allocator" version = "0.28.0" @@ -2106,6 +2176,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -2166,6 +2242,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfa686283ad6dd069f105e5ab091b04c62850d3e4cf5d67debad1933f55023df" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.12" @@ -2541,7 +2626,7 @@ dependencies = [ "log", "num-format", "once_cell", - "quick-xml", + "quick-xml 0.26.0", "rgb", "str_stack", ] @@ -2603,7 +2688,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2682,6 +2767,47 @@ dependencies = [ "tracing", ] +[[package]] +name = "jiff" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +dependencies = [ + "jiff-static", + "jiff-tzdb-platform", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", + "windows-sys 0.61.2", +] + +[[package]] +name = "jiff-static" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "jiff-tzdb" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c900ef84826f1338a557697dc8fc601df9ca9af4ac137c7fb61d4c6f2dfd3076" + +[[package]] +name = 
"jiff-tzdb-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" +dependencies = [ + "jiff-tzdb", +] + [[package]] name = "jni" version = "0.21.1" @@ -2926,6 +3052,16 @@ dependencies = [ "rayon", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.8.0" @@ -3221,7 +3357,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3473,6 +3609,35 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "opendal" +version = "0.55.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" +dependencies = [ + "anyhow", + "backon", + "base64 0.22.1", + "bytes", + "crc32c", + "futures", + "getrandom 0.2.17", + "http", + "http-body", + "jiff", + "log", + "md-5", + "percent-encoding", + "quick-xml 0.38.4", + "reqsign", + "reqwest 0.12.28", + "serde", + "serde_json", + "tokio", + "url", + "uuid", +] + [[package]] name = "openh264" version = "0.9.3" @@ -3632,6 +3797,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -4090,6 +4265,26 
@@ dependencies = [ "memchr", ] +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quick-xml" +version = "0.38.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66c2058c55a409d601666cffe35f04333cf1013010882cec174a7467cd4e21c" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.9" @@ -4503,6 +4698,35 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" +[[package]] +name = "reqsign" +version = "0.16.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "chrono", + "form_urlencoded", + "getrandom 0.2.17", + "hex", + "hmac", + "home", + "http", + "log", + "percent-encoding", + "quick-xml 0.37.5", + "rand 0.8.5", + "reqwest 0.12.28", + "rust-ini", + "serde", + "serde_json", + "sha1", + "sha2", + "tokio", +] + [[package]] name = "reqwest" version = "0.12.28" @@ -4528,12 +4752,14 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-util 0.7.18", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams 0.4.2", "web-sys", ] @@ -4577,7 +4803,7 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.5.0", "web-sys", ] @@ -4707,6 +4933,16 @@ dependencies = [ "walkdir", ] +[[package]] +name = "rust-ini" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796e8d2b6696392a43bea58116b667fb4c29727dc5abd27d6acf338bb4f688c7" +dependencies = [ + "cfg-if", + "ordered-multimap", 
+] + [[package]] name = "rustc-demangle" version = "0.1.27" @@ -4780,7 +5016,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4858,7 +5094,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5321,7 +5557,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -5465,6 +5701,7 @@ dependencies = [ "moq-lite", "moq-native", "ogg", + "opendal", "openh264", "opentelemetry", "opus", @@ -5933,7 +6170,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6062,6 +6299,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tiny-skia" version = "0.12.0" @@ -6996,6 +7242,19 @@ dependencies = [ "wasmparser 0.244.0", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-streams" version = "0.5.0" @@ -7717,7 +7976,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/crates/nodes/Cargo.toml b/crates/nodes/Cargo.toml index 
e0c277f8..a949afc2 100644 --- a/crates/nodes/Cargo.toml +++ b/crates/nodes/Cargo.toml @@ -96,6 +96,11 @@ rustls-platform-verifier = { version = "0.6", optional = true } rav1e = { version = "0.8", optional = true, default-features = false, features = ["threading", "asm"] } rav1d = { version = "1.1", optional = true, default-features = false, features = ["bitdepth_8", "bitdepth_16", "asm"] } +# Object storage (optional, behind `object_store` feature) +opendal = { version = "0.55", optional = true, default-features = false, features = [ + "services-s3", +] } + # GPU compositing (optional, behind `gpu` feature) wgpu = { version = "29", optional = true, default-features = false, features = ["vulkan", "metal", "dx12", "wgsl"] } pollster = { version = "0.4", optional = true } @@ -167,6 +172,7 @@ dav1d_static = ["dav1d"] colorbars = ["dep:schemars", "dep:serde_json", "dep:fontdue"] compositor = ["dep:schemars", "dep:serde_json", "dep:image", "dep:tiny-skia", "dep:rayon", "dep:fontdue", "dep:smallvec", "dep:uuid", "dep:resvg"] gpu = ["compositor", "dep:wgpu", "dep:pollster", "dep:bytemuck"] +object_store = ["dep:opendal", "dep:schemars"] codegen = ["dep:ts-rs"] video = ["vp9", "av1", "openh264", "colorbars", "compositor"] diff --git a/crates/nodes/src/core/mod.rs b/crates/nodes/src/core/mod.rs index 16682cf9..21ec0c29 100644 --- a/crates/nodes/src/core/mod.rs +++ b/crates/nodes/src/core/mod.rs @@ -10,6 +10,8 @@ pub mod bytes_output; pub mod file_read; pub mod file_write; pub mod json_serialize; +#[cfg(feature = "object_store")] +pub mod object_store_write; pub mod pacer; mod passthrough; #[cfg(feature = "script")] @@ -190,4 +192,23 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode // --- Register TelemetryOut Node --- telemetry_out::register(registry); + + // --- Register ObjectStoreWriteNode --- + #[cfg(feature = "object_store")] + { + use schemars::schema_for; + + let factory = object_store_write::ObjectStoreWriteNode::factory(); + 
registry.register_dynamic_with_description( + "core::object_store_writer", + move |params| (factory)(params), + serde_json::to_value(schema_for!(object_store_write::ObjectStoreWriteConfig)) + .expect("ObjectStoreWriteConfig schema should serialize to JSON"), + vec!["core".to_string(), "io".to_string(), "object_store".to_string()], + false, + "Streams binary data to S3-compatible object storage (AWS S3, GCS, Azure, MinIO, RustFS, etc.). \ + Uses multipart upload for bounded memory usage. \ + Credentials can be provided via config or environment variables.", + ); + } } diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs new file mode 100644 index 00000000..bd294881 --- /dev/null +++ b/crates/nodes/src/core/object_store_write.rs @@ -0,0 +1,550 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Object store write node — streams binary data to S3-compatible object storage. +//! +//! Uses [Apache OpenDAL](https://opendal.apache.org/) to support S3, GCS, +//! Azure Blob, MinIO, RustFS, and other compatible backends. +//! +//! Incoming [`Packet::Binary`] packets are buffered up to `chunk_size` and +//! written via OpenDAL's multipart [`Writer`](opendal::Writer), keeping memory +//! bounded regardless of the total upload size. + +use async_trait::async_trait; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use streamkit_core::types::{Packet, PacketType}; +use streamkit_core::{ + config_helpers, state_helpers, stats::NodeStatsTracker, InputPin, NodeContext, OutputPin, + PinCardinality, ProcessorNode, StreamKitError, +}; + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +/// Default buffer/chunk size: 5 MiB (the S3 minimum multipart part size). 
+const DEFAULT_CHUNK_SIZE: usize = 5 * 1024 * 1024; + +const fn default_chunk_size() -> usize { + DEFAULT_CHUNK_SIZE +} + +fn default_region() -> String { + "us-east-1".to_string() +} + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/// Configuration for the object store write node. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreWriteConfig { + /// S3-compatible endpoint URL. + /// + /// Examples: + /// - AWS S3: `https://s3.amazonaws.com` + /// - MinIO / RustFS: `http://localhost:9000` + /// - Cloudflare R2: `https://.r2.cloudflarestorage.com` + pub endpoint: String, + + /// Bucket name. + pub bucket: String, + + /// Object key (path within the bucket). + pub key: String, + + /// AWS region (default: `us-east-1`). + /// + /// Most S3-compatible services accept any region string; set this to + /// match the bucket's actual region for AWS S3. + #[serde(default = "default_region")] + pub region: String, + + /// Access key ID. + /// + /// If omitted, the node falls back to `access_key_id_env`. + #[serde(default)] + pub access_key_id: Option, + + /// Environment variable name containing the access key ID. + /// + /// Read at node startup. Takes precedence over `access_key_id`. + #[serde(default)] + pub access_key_id_env: Option, + + /// Secret access key. + /// + /// If omitted, the node falls back to `secret_key_env`. + #[serde(default)] + pub secret_access_key: Option, + + /// Environment variable name containing the secret access key. + /// + /// Read at node startup. Takes precedence over `secret_access_key`. + #[serde(default)] + pub secret_key_env: Option, + + /// Buffer size before flushing to the object store (default: 5 MiB). + /// + /// This controls the multipart upload part size. S3 requires a minimum + /// part size of 5 MiB (except the last part). 
+ #[serde(default = "default_chunk_size")] + #[schemars(range(min = 1))] + pub chunk_size: usize, + + /// Optional MIME content type for the uploaded object + /// (e.g. `audio/ogg`, `video/mp4`). + #[serde(default)] + pub content_type: Option, +} + +// --------------------------------------------------------------------------- +// Credential helpers +// --------------------------------------------------------------------------- + +/// Resolve a credential value from an env-var name (takes precedence) or a +/// literal fallback. +fn resolve_credential( + env_name: Option<&str>, + literal: Option<&str>, + label: &str, +) -> Result { + if let Some(env) = env_name { + match std::env::var(env) { + Ok(val) if !val.is_empty() => { + tracing::debug!("Resolved {label} from env var {env}"); + return Ok(val); + }, + Ok(_) => { + return Err(StreamKitError::Configuration(format!( + "Environment variable '{env}' for {label} is empty" + ))); + }, + Err(_) => { + return Err(StreamKitError::Configuration(format!( + "Environment variable '{env}' for {label} is not set" + ))); + }, + } + } + if let Some(val) = literal { + if val.is_empty() { + return Err(StreamKitError::Configuration(format!("{label} is empty"))); + } + return Ok(val.to_string()); + } + Err(StreamKitError::Configuration(format!("No {label} provided (set via config or env var)"))) +} + +// --------------------------------------------------------------------------- +// Node +// --------------------------------------------------------------------------- + +/// Sink node that streams [`Packet::Binary`] data to S3-compatible object +/// storage via OpenDAL's multipart upload. 
+pub struct ObjectStoreWriteNode { + config: ObjectStoreWriteConfig, +} + +impl ObjectStoreWriteNode { + pub fn factory() -> streamkit_core::node::NodeFactory { + std::sync::Arc::new(|params| { + let config: ObjectStoreWriteConfig = if params.is_none() { + // Default config for pin inspection only (dynamic registration) + ObjectStoreWriteConfig { + endpoint: String::new(), + bucket: String::new(), + key: String::new(), + region: default_region(), + access_key_id: None, + access_key_id_env: None, + secret_access_key: None, + secret_key_env: None, + chunk_size: default_chunk_size(), + content_type: None, + } + } else { + config_helpers::parse_config_required(params)? + }; + + if config.chunk_size == 0 { + return Err(StreamKitError::Configuration( + "chunk_size must be greater than 0".to_string(), + )); + } + + Ok(Box::new(Self { config })) + }) + } +} + +// --------------------------------------------------------------------------- +// ProcessorNode implementation +// --------------------------------------------------------------------------- + +#[async_trait] +impl ProcessorNode for ObjectStoreWriteNode { + fn input_pins(&self) -> Vec { + vec![InputPin { + name: "in".to_string(), + accepts_types: vec![PacketType::Binary], + cardinality: PinCardinality::One, + }] + } + + fn output_pins(&self) -> Vec { + // Sink — no outputs. 
+ vec![] + } + + async fn run(self: Box, mut context: NodeContext) -> Result<(), StreamKitError> { + let node_name = context.output_sender.node_name().to_string(); + state_helpers::emit_initializing(&context.state_tx, &node_name); + + // ── Resolve credentials ────────────────────────────────────────── + let access_key = resolve_credential( + self.config.access_key_id_env.as_deref(), + self.config.access_key_id.as_deref(), + "access_key_id", + ) + .inspect_err(|e| { + state_helpers::emit_failed(&context.state_tx, &node_name, e.to_string()); + })?; + + let secret_key = resolve_credential( + self.config.secret_key_env.as_deref(), + self.config.secret_access_key.as_deref(), + "secret_access_key", + ) + .inspect_err(|e| { + state_helpers::emit_failed(&context.state_tx, &node_name, e.to_string()); + })?; + + tracing::info!( + %node_name, + endpoint = %self.config.endpoint, + bucket = %self.config.bucket, + key = %self.config.key, + region = %self.config.region, + chunk_size = self.config.chunk_size, + "ObjectStoreWriteNode initializing" + ); + + // ── Build OpenDAL operator ─────────────────────────────────────── + let operator = { + let mut cfg = std::collections::HashMap::new(); + cfg.insert("bucket".to_string(), self.config.bucket.clone()); + cfg.insert("endpoint".to_string(), self.config.endpoint.clone()); + cfg.insert("region".to_string(), self.config.region.clone()); + cfg.insert("access_key_id".to_string(), access_key); + cfg.insert("secret_access_key".to_string(), secret_key); + // Disable credential loading from environment/instance metadata — + // we resolve credentials explicitly above. + cfg.insert("disable_config_load".to_string(), "true".to_string()); + + opendal::Operator::from_iter::(cfg) + .map_err(|e| { + let msg = format!("Failed to build S3 operator: {e}"); + state_helpers::emit_failed(&context.state_tx, &node_name, &msg); + StreamKitError::Runtime(msg) + })? 
+ .finish() + }; + + tracing::info!(%node_name, "S3 operator created, opening writer"); + + // ── Open writer (multipart upload) ─────────────────────────────── + let writer_future = operator.writer_with(&self.config.key).chunk(self.config.chunk_size); + + // Apply content type if configured. + let mut writer = if let Some(ref ct) = self.config.content_type { + writer_future.content_type(ct).await + } else { + writer_future.await + } + .map_err(|e| { + let msg = format!("Failed to open S3 writer for '{}': {e}", self.config.key); + state_helpers::emit_failed(&context.state_tx, &node_name, &msg); + StreamKitError::Runtime(msg) + })?; + + tracing::info!( + %node_name, + key = %self.config.key, + "S3 multipart writer opened, entering receive loop" + ); + + state_helpers::emit_running(&context.state_tx, &node_name); + + // ── Receive loop ───────────────────────────────────────────────── + let mut input_rx = context.take_input("in")?; + let mut stats_tracker = NodeStatsTracker::new(node_name.clone(), context.stats_tx.clone()); + let mut packet_count: u64 = 0; + let mut total_bytes: u64 = 0; + let mut buffer = Vec::with_capacity(self.config.chunk_size); + let mut chunks_written: u64 = 0; + let mut reason = "input_closed".to_string(); + + while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await { + if let Packet::Binary { data, .. 
} = packet { + stats_tracker.received(); + packet_count += 1; + total_bytes += data.len() as u64; + + buffer.extend_from_slice(&data); + + // Flush when buffer reaches chunk_size + while buffer.len() >= self.config.chunk_size { + let chunk: Vec = buffer.drain(..self.config.chunk_size).collect(); + if let Err(e) = writer.write(chunk).await { + stats_tracker.errored(); + stats_tracker.force_send(); + let msg = format!("S3 write error: {e}"); + state_helpers::emit_failed(&context.state_tx, &node_name, &msg); + // Attempt to abort the multipart upload to clean up + if let Err(abort_err) = writer.abort().await { + tracing::error!( + %node_name, + error = %abort_err, + "Failed to abort S3 multipart upload after write error" + ); + } + return Err(StreamKitError::Runtime(msg)); + } + chunks_written += 1; + tracing::debug!( + %node_name, + chunks_written, + total_bytes, + "Flushed chunk to S3" + ); + } + + stats_tracker.sent(); + stats_tracker.maybe_send(); + } else { + tracing::warn!( + %node_name, + "Received non-Binary packet, ignoring" + ); + stats_tracker.discarded(); + } + } + + // ── Flush remaining buffer ─────────────────────────────────────── + if !buffer.is_empty() { + tracing::debug!( + %node_name, + remaining = buffer.len(), + "Flushing remaining buffer to S3" + ); + if let Err(e) = writer.write(buffer).await { + stats_tracker.errored(); + stats_tracker.force_send(); + let msg = format!("S3 write error (final flush): {e}"); + state_helpers::emit_failed(&context.state_tx, &node_name, &msg); + if let Err(abort_err) = writer.abort().await { + tracing::error!( + %node_name, + error = %abort_err, + "Failed to abort S3 multipart upload after final flush error" + ); + } + return Err(StreamKitError::Runtime(msg)); + } + chunks_written += 1; + } + + // ── Close (finalize multipart upload) ──────────────────────────── + tracing::info!( + %node_name, + "Closing S3 writer (finalizing multipart upload)" + ); + if let Err(e) = writer.close().await { + 
stats_tracker.errored(); + stats_tracker.force_send(); + let msg = format!("Failed to finalize S3 upload: {e}"); + state_helpers::emit_failed(&context.state_tx, &node_name, &msg); + reason = format!("close_failed: {e}"); + state_helpers::emit_stopped(&context.state_tx, &node_name, reason); + return Err(StreamKitError::Runtime(msg)); + } + + stats_tracker.force_send(); + tracing::info!( + %node_name, + packet_count, + total_bytes, + chunks_written, + key = %self.config.key, + "ObjectStoreWriteNode finished uploading to S3" + ); + + state_helpers::emit_stopped(&context.state_tx, &node_name, reason); + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use std::collections::HashMap; + use streamkit_core::node::RoutedPacketMessage; + use streamkit_core::NodeStatsUpdate; + use tokio::sync::mpsc; + + /// Verify pin definitions for the object store write node. + #[test] + fn test_pin_definitions() { + let node = ObjectStoreWriteNode { + config: ObjectStoreWriteConfig { + endpoint: String::new(), + bucket: String::new(), + key: String::new(), + region: default_region(), + access_key_id: None, + access_key_id_env: None, + secret_access_key: None, + secret_key_env: None, + chunk_size: default_chunk_size(), + content_type: None, + }, + }; + + let inputs = node.input_pins(); + assert_eq!(inputs.len(), 1); + assert_eq!(inputs[0].name, "in"); + assert_eq!(inputs[0].accepts_types, vec![PacketType::Binary]); + + let outputs = node.output_pins(); + assert!(outputs.is_empty(), "Sink node should have no output pins"); + } + + /// Verify factory rejects zero chunk_size. 
+ #[test] + fn test_factory_rejects_zero_chunk_size() { + let factory = ObjectStoreWriteNode::factory(); + let params = serde_json::json!({ + "endpoint": "http://localhost:9000", + "bucket": "test", + "key": "test.bin", + "chunk_size": 0, + }); + let result = factory(Some(¶ms)); + assert!(result.is_err()); + let err = match result { + Err(e) => e.to_string(), + Ok(_) => panic!("Expected error for zero chunk_size"), + }; + assert!(err.contains("chunk_size"), "Error should mention chunk_size: {err}"); + } + + /// Verify credential resolution logic. + #[test] + fn test_resolve_credential_literal() { + let result = resolve_credential(None, Some("my-key"), "test"); + assert_eq!(result.unwrap(), "my-key"); + } + + #[test] + fn test_resolve_credential_empty_literal() { + let result = resolve_credential(None, Some(""), "test"); + assert!(result.is_err()); + } + + #[test] + fn test_resolve_credential_missing() { + let result = resolve_credential(None, None, "test"); + assert!(result.is_err()); + } + + #[test] + fn test_resolve_credential_env_precedence() { + // Use a unique env var name to avoid conflicts with parallel tests + let env_name = "_SK_TEST_OBJSTORE_CRED_PREC"; + std::env::set_var(env_name, "from-env"); + let result = resolve_credential(Some(env_name), Some("from-literal"), "test"); + assert_eq!(result.unwrap(), "from-env"); + std::env::remove_var(env_name); + } + + #[test] + fn test_resolve_credential_env_not_set() { + let result = resolve_credential(Some("_SK_TEST_OBJSTORE_NONEXISTENT_VAR"), None, "test"); + assert!(result.is_err()); + } + + /// Verify that the node emits the correct state transitions and handles + /// credential failures gracefully (without needing a real S3 endpoint). 
+ #[tokio::test] + async fn test_node_fails_on_missing_credentials() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), input_rx); + + let (_control_tx, control_rx) = mpsc::channel(10); + let (state_tx, mut state_rx) = mpsc::channel(10); + let (stats_tx, _stats_rx) = mpsc::channel::(10); + let (mock_sender, _packet_rx) = mpsc::channel::(10); + + let output_sender = streamkit_core::OutputSender::new( + "test_objstore_write".to_string(), + streamkit_core::node::OutputRouting::Routed(mock_sender), + ); + + let context = NodeContext { + inputs, + input_types: HashMap::new(), + control_rx, + output_sender, + batch_size: 32, + state_tx, + stats_tx: Some(stats_tx), + telemetry_tx: None, + session_id: None, + cancellation_token: None, + pin_management_rx: None, + audio_pool: None, + video_pool: None, + pipeline_mode: streamkit_core::PipelineMode::Dynamic, + view_data_tx: None, + }; + + // No credentials provided — should fail during init + let config = ObjectStoreWriteConfig { + endpoint: "http://localhost:9000".to_string(), + bucket: "test-bucket".to_string(), + key: "test/output.bin".to_string(), + region: default_region(), + access_key_id: None, + access_key_id_env: None, + secret_access_key: None, + secret_key_env: None, + chunk_size: default_chunk_size(), + content_type: None, + }; + let node = Box::new(ObjectStoreWriteNode { config }); + + // Keep input_tx alive until after the node is checked + let _keep_alive = input_tx; + + let result = node.run(context).await; + assert!(result.is_err(), "Node should fail when credentials are missing"); + + // Should have emitted Initializing then Failed + let state = state_rx.recv().await.unwrap(); + assert!(matches!(state.state, streamkit_core::NodeState::Initializing)); + + let state = state_rx.recv().await.unwrap(); + assert!(matches!(state.state, streamkit_core::NodeState::Failed { .. 
})); + } +} diff --git a/docker-compose.rustfs.yml b/docker-compose.rustfs.yml new file mode 100644 index 00000000..30bccbaa --- /dev/null +++ b/docker-compose.rustfs.yml @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# RustFS — lightweight S3-compatible object storage for local development +# and integration testing. +# +# Usage: +# docker compose -f docker-compose.rustfs.yml up -d +# +# S3 API: http://localhost:9000 +# Console: http://localhost:9001 +# Credentials: rustfsadmin / rustfsadmin +# +# Configure StreamKit to use it: +# export SK_S3_ACCESS_KEY=rustfsadmin +# export SK_S3_SECRET_KEY=rustfsadmin +# +# Create a bucket (via the console or any S3 client): +# aws --endpoint-url http://localhost:9000 s3 mb s3://streamkit-output +# aws --endpoint-url http://localhost:9000 s3 mb s3://streamkit-archive + +services: + rustfs: + image: rustfs/rustfs:latest + container_name: rustfs-server + ports: + - "9000:9000" # S3 API + - "9001:9001" # Console + environment: + - RUSTFS_VOLUMES=/data + - RUSTFS_ADDRESS=0.0.0.0:9000 + - RUSTFS_CONSOLE_ADDRESS=0.0.0.0:9001 + - RUSTFS_CONSOLE_ENABLE=true + - RUSTFS_ACCESS_KEY=rustfsadmin + - RUSTFS_SECRET_KEY=rustfsadmin + volumes: + - rustfs_data:/data + restart: unless-stopped + healthcheck: + test: ["CMD", "sh", "-c", "curl -f http://127.0.0.1:9000/health"] + interval: 15s + timeout: 5s + retries: 3 + start_period: 10s + +volumes: + rustfs_data: diff --git a/samples/pipelines/dynamic/moq_s3_archive.yml b/samples/pipelines/dynamic/moq_s3_archive.yml new file mode 100644 index 00000000..affb46b1 --- /dev/null +++ b/samples/pipelines/dynamic/moq_s3_archive.yml @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# MoQ broadcast with S3 archive. 
+# +# Receives audio from a WebTransport publisher via MoQ, applies gain, +# broadcasts back via MoQ (primary output for live monitoring), and +# simultaneously muxes to OGG and archives to S3-compatible object +# storage (secondary output). +# +# The opus_encoder output pin uses Broadcast cardinality, so the same +# encoded packets feed both the MoQ peer and the OGG muxer → S3 branch +# without any extra splitter node. +# +# Requires: +# - S3-compatible endpoint (e.g. RustFS via docker-compose.rustfs.yml) +# - Set SK_S3_ACCESS_KEY and SK_S3_SECRET_KEY env vars + +name: MoQ Broadcast + S3 Archive +description: | + Accepts Opus audio via MoQ, broadcasts back via MoQ for live monitoring, + and simultaneously archives the stream to S3-compatible object storage. +mode: dynamic +client: + gateway_path: /moq/s3-archive + publish: + broadcast: input + tracks: + - kind: audio + source: microphone + watch: + broadcast: output + audio: true + video: false + +nodes: + # ── MoQ input/output (primary live broadcast) ────────────────────── + moq_peer: + kind: transport::moq::peer + params: + gateway_path: /moq/s3-archive + input_broadcasts: + - input + output_broadcast: output + allow_reconnect: true + needs: opus_encoder + + # ── Audio path: decode → gain → encode ───────────────────────────── + opus_decoder: + kind: audio::opus::decoder + needs: + in: moq_peer.audio/data + + gain: + kind: audio::gain + params: + gain: 1.0 + needs: opus_decoder + + opus_encoder: + kind: audio::opus::encoder + needs: gain + + # ── S3 archive branch (secondary output) ─────────────────────────── + # Taps the same encoded Opus stream and muxes into OGG for archival. 
+ ogg_muxer: + kind: containers::ogg::muxer + params: + channels: 1 + needs: opus_encoder + + s3_writer: + kind: core::object_store_writer + params: + endpoint: "http://localhost:9000" + bucket: "streamkit-archive" + key: "recordings/live-session.ogg" + region: "us-east-1" + access_key_id_env: "SK_S3_ACCESS_KEY" + secret_key_env: "SK_S3_SECRET_KEY" + content_type: "audio/ogg" + needs: ogg_muxer diff --git a/samples/pipelines/oneshot/transcode_to_s3.yml b/samples/pipelines/oneshot/transcode_to_s3.yml new file mode 100644 index 00000000..c23362fd --- /dev/null +++ b/samples/pipelines/oneshot/transcode_to_s3.yml @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# Demonstrates the object store writer node with audio transcoding. +# +# Transcodes an uploaded Ogg/Opus file to MP4 (Opus-in-MP4) and uploads +# the result to S3-compatible object storage. +# +# Requires: +# - S3-compatible endpoint (e.g. RustFS via docker-compose.rustfs.yml) +# - Set SK_S3_ACCESS_KEY and SK_S3_SECRET_KEY env vars + +name: Transcode to S3 (Ogg → MP4) +description: Transcodes uploaded Ogg/Opus audio to MP4 and uploads to S3-compatible storage +mode: oneshot +client: + input: + type: file_upload + accept: "audio/opus" + output: + type: none +nodes: + http_input: + kind: streamkit::http_input + + ogg_demuxer: + kind: containers::ogg::demuxer + needs: http_input + + opus_decoder: + kind: audio::opus::decoder + needs: ogg_demuxer + + opus_encoder: + kind: audio::opus::encoder + needs: opus_decoder + + mp4_muxer: + kind: containers::mp4::muxer + params: + mode: file + sample_rate: 48000 + channels: 2 + needs: opus_encoder + + s3_writer: + kind: core::object_store_writer + params: + endpoint: "http://localhost:9000" + bucket: "streamkit-output" + key: "transcode/output.mp4" + region: "us-east-1" + access_key_id_env: "SK_S3_ACCESS_KEY" + secret_key_env: "SK_S3_SECRET_KEY" + content_type: "audio/mp4" + needs: mp4_muxer diff --git 
a/samples/pipelines/oneshot/tts_to_s3.yml b/samples/pipelines/oneshot/tts_to_s3.yml new file mode 100644 index 00000000..1b036849 --- /dev/null +++ b/samples/pipelines/oneshot/tts_to_s3.yml @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# Demonstrates the object store writer node with TTS output. +# +# Synthesizes speech from text using Kokoro TTS, encodes to Opus/OGG, +# and uploads the result to S3-compatible object storage. +# +# Requires: +# - kokoro native plugin (just install-plugin kokoro) +# - S3-compatible endpoint (e.g. RustFS via docker-compose.rustfs.yml) +# - Set SK_S3_ACCESS_KEY and SK_S3_SECRET_KEY env vars + +name: TTS to S3 (Kokoro) +description: Synthesizes speech from text and uploads the OGG result to S3-compatible storage +mode: oneshot +client: + input: + type: text + placeholder: "Enter text to synthesize" + output: + type: none +nodes: + http_input: + kind: streamkit::http_input + + text_chunker: + kind: core::text_chunker + params: + min_length: 10 + needs: http_input + + tts: + kind: plugin::native::kokoro + params: + model_dir: "models/kokoro-multi-lang-v1_1" + speaker_id: 0 + speed: 1.0 + needs: text_chunker + + resampler: + kind: audio::resampler + params: + chunk_frames: 960 + output_frame_size: 960 + target_sample_rate: 48000 + needs: tts + + opus_encoder: + kind: audio::opus::encoder + needs: resampler + + ogg_muxer: + kind: containers::ogg::muxer + params: + channels: 1 + needs: opus_encoder + + s3_writer: + kind: core::object_store_writer + params: + endpoint: "http://localhost:9000" + bucket: "streamkit-output" + key: "tts/output.ogg" + region: "us-east-1" + access_key_id_env: "SK_S3_ACCESS_KEY" + secret_key_env: "SK_S3_SECRET_KEY" + content_type: "audio/ogg" + needs: ogg_muxer From d3af52545540b351bd1c9387bb14e52409fc3c92 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Wed, 8 Apr 2026 15:06:29 +0000 Subject: [PATCH 2/5] fix(nodes): fix close() failure path 
in object store writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove double state emission (emit_failed + emit_stopped) on close failure — only emit_failed, matching every other error path in the codebase and avoiding spurious Failed→Stopped state sequence. - Add writer.abort() on close() failure to clean up orphaned multipart uploads, matching the write-error paths. Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/object_store_write.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs index bd294881..f924c3bf 100644 --- a/crates/nodes/src/core/object_store_write.rs +++ b/crates/nodes/src/core/object_store_write.rs @@ -287,7 +287,7 @@ impl ProcessorNode for ObjectStoreWriteNode { let mut total_bytes: u64 = 0; let mut buffer = Vec::with_capacity(self.config.chunk_size); let mut chunks_written: u64 = 0; - let mut reason = "input_closed".to_string(); + let reason = "input_closed".to_string(); while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await { if let Packet::Binary { data, .. } = packet { @@ -369,8 +369,14 @@ impl ProcessorNode for ObjectStoreWriteNode { stats_tracker.force_send(); let msg = format!("Failed to finalize S3 upload: {e}"); state_helpers::emit_failed(&context.state_tx, &node_name, &msg); - reason = format!("close_failed: {e}"); - state_helpers::emit_stopped(&context.state_tx, &node_name, reason); + // Attempt to abort the multipart upload to avoid orphaned parts. 
+ if let Err(abort_err) = writer.abort().await { + tracing::error!( + %node_name, + error = %abort_err, + "Failed to abort S3 multipart upload after close error" + ); + } return Err(StreamKitError::Runtime(msg)); } From 717635ca6291b05c4b8732406dfa4b3578c79956 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Wed, 8 Apr 2026 19:18:11 +0000 Subject: [PATCH 3/5] fix(nodes): address review findings for object store writer - Add AbortOnDrop guard to abort orphaned multipart uploads on task cancellation (critical: prevents storage leaks) - Replace drain().collect() with split_off + mem::replace for zero-copy chunk extraction - Validate endpoint, bucket, and key are non-empty in factory (fail-fast instead of runtime S3 errors) - Refactor resolve_credential to accept an env lookup closure, eliminating unsound std::env::set_var calls in tests - Inline reason string at use site (remove early allocation) - Add comment about static key limitation in moq_s3_archive.yml - Add tests for empty endpoint/bucket/key factory validation Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/object_store_write.rs | 233 +++++++++++++++---- samples/pipelines/dynamic/moq_s3_archive.yml | 3 + 2 files changed, 193 insertions(+), 43 deletions(-) diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs index f924c3bf..cab19290 100644 --- a/crates/nodes/src/core/object_store_write.rs +++ b/crates/nodes/src/core/object_store_write.rs @@ -106,15 +106,23 @@ pub struct ObjectStoreWriteConfig { // Credential helpers // --------------------------------------------------------------------------- -/// Resolve a credential value from an env-var name (takes precedence) or a -/// literal fallback. +/// Resolve a credential value. +/// +/// Resolution order: +/// 1. Environment variable named by `env_name` (if provided and non-empty). +/// 2. Literal value from `literal` (if provided and non-empty). +/// 3. Error. 
+/// +/// The `env_lookup` parameter allows injecting a custom lookup function +/// for testability (avoids `std::env::set_var` unsoundness in tests). fn resolve_credential( env_name: Option<&str>, literal: Option<&str>, label: &str, + env_lookup: impl Fn(&str) -> Result, ) -> Result { if let Some(env) = env_name { - match std::env::var(env) { + match env_lookup(env) { Ok(val) if !val.is_empty() => { tracing::debug!("Resolved {label} from env var {env}"); return Ok(val); @@ -137,13 +145,86 @@ fn resolve_credential( } return Ok(val.to_string()); } - Err(StreamKitError::Configuration(format!("No {label} provided (set via config or env var)"))) + Err(StreamKitError::Configuration(format!( + "No {label} provided (set via config or env var)" + ))) } // --------------------------------------------------------------------------- // Node // --------------------------------------------------------------------------- +/// RAII guard that aborts an OpenDAL multipart upload on drop unless +/// explicitly disarmed via [`AbortOnDrop::disarm`]. Protects against +/// orphaned multipart parts when the Tokio task is cancelled mid-upload. +struct AbortOnDrop { + writer: Option, + node_name: String, +} + +impl AbortOnDrop { + const fn new(writer: opendal::Writer, node_name: String) -> Self { + Self { + writer: Some(writer), + node_name, + } + } + + /// Return a mutable reference to the inner writer. + /// + /// # Panics + /// + /// Only if called after [`disarm`], which is impossible because `disarm` + /// consumes `self`. + #[allow(clippy::expect_used)] // Invariant: writer is always Some until disarm/drop. + const fn writer_mut(&mut self) -> &mut opendal::Writer { + self.writer + .as_mut() + .expect("writer consumed after disarm") + } + + /// Take ownership of the writer, disabling the abort-on-drop guard. + /// Call this once the upload has been successfully closed. 
+ /// + /// # Panics + /// + /// Only if the `Option` is already `None`, which cannot happen because + /// `disarm` consumes `self` and `Drop` only runs afterwards. + #[allow(clippy::expect_used, clippy::missing_const_for_fn)] // Not const: Self has a destructor. + fn disarm(mut self) -> opendal::Writer { + self.writer.take().expect("writer already consumed") + } +} + +impl Drop for AbortOnDrop { + fn drop(&mut self) { + if let Some(writer) = self.writer.take() { + let node_name = self.node_name.clone(); + tracing::warn!( + %node_name, + "ObjectStoreWriteNode dropped with active writer — spawning abort task" + ); + tokio::spawn(async move { + // Writer::abort is not available on all backends, but for S3 + // it cleans up the incomplete multipart upload. + let mut w = writer; + if let Err(e) = w.abort().await { + tracing::error!( + %node_name, + error = %e, + "Failed to abort orphaned S3 multipart upload" + ); + } else { + tracing::info!( + %node_name, + "Successfully aborted orphaned S3 multipart upload" + ); + } + }); + } + } +} + /// Sink node that streams [`Packet::Binary`] data to S3-compatible object /// storage via OpenDAL's multipart upload. pub struct ObjectStoreWriteNode { @@ -171,6 +252,25 @@ impl ObjectStoreWriteNode { config_helpers::parse_config_required(params)? }; + // Validate required fields early (don't defer to runtime S3 errors). 
+ if params.is_some() { + if config.endpoint.is_empty() { + return Err(StreamKitError::Configuration( + "endpoint must not be empty".to_string(), + )); + } + if config.bucket.is_empty() { + return Err(StreamKitError::Configuration( + "bucket must not be empty".to_string(), + )); + } + if config.key.is_empty() { + return Err(StreamKitError::Configuration( + "key must not be empty".to_string(), + )); + } + } + if config.chunk_size == 0 { return Err(StreamKitError::Configuration( "chunk_size must be greater than 0".to_string(), @@ -210,6 +310,7 @@ impl ProcessorNode for ObjectStoreWriteNode { self.config.access_key_id_env.as_deref(), self.config.access_key_id.as_deref(), "access_key_id", + |name| std::env::var(name), ) .inspect_err(|e| { state_helpers::emit_failed(&context.state_tx, &node_name, e.to_string()); @@ -219,6 +320,7 @@ impl ProcessorNode for ObjectStoreWriteNode { self.config.secret_key_env.as_deref(), self.config.secret_access_key.as_deref(), "secret_access_key", + |name| std::env::var(name), ) .inspect_err(|e| { state_helpers::emit_failed(&context.state_tx, &node_name, e.to_string()); @@ -261,7 +363,7 @@ impl ProcessorNode for ObjectStoreWriteNode { let writer_future = operator.writer_with(&self.config.key).chunk(self.config.chunk_size); // Apply content type if configured. - let mut writer = if let Some(ref ct) = self.config.content_type { + let writer = if let Some(ref ct) = self.config.content_type { writer_future.content_type(ct).await } else { writer_future.await @@ -272,6 +374,10 @@ impl ProcessorNode for ObjectStoreWriteNode { StreamKitError::Runtime(msg) })?; + // Wrap in AbortOnDrop so a Tokio task cancellation doesn't leak + // orphaned multipart parts on the storage backend. 
+ let mut guard = AbortOnDrop::new(writer, node_name.clone()); + tracing::info!( %node_name, key = %self.config.key, @@ -287,7 +393,6 @@ impl ProcessorNode for ObjectStoreWriteNode { let mut total_bytes: u64 = 0; let mut buffer = Vec::with_capacity(self.config.chunk_size); let mut chunks_written: u64 = 0; - let reason = "input_closed".to_string(); while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await { if let Packet::Binary { data, .. } = packet { @@ -299,20 +404,14 @@ impl ProcessorNode for ObjectStoreWriteNode { // Flush when buffer reaches chunk_size while buffer.len() >= self.config.chunk_size { - let chunk: Vec = buffer.drain(..self.config.chunk_size).collect(); - if let Err(e) = writer.write(chunk).await { + let tail = buffer.split_off(self.config.chunk_size); + let chunk = std::mem::replace(&mut buffer, tail); + if let Err(e) = guard.writer_mut().write(chunk).await { stats_tracker.errored(); stats_tracker.force_send(); let msg = format!("S3 write error: {e}"); state_helpers::emit_failed(&context.state_tx, &node_name, &msg); - // Attempt to abort the multipart upload to clean up - if let Err(abort_err) = writer.abort().await { - tracing::error!( - %node_name, - error = %abort_err, - "Failed to abort S3 multipart upload after write error" - ); - } + // Guard will abort the multipart upload on drop. 
return Err(StreamKitError::Runtime(msg)); } chunks_written += 1; @@ -342,18 +441,12 @@ impl ProcessorNode for ObjectStoreWriteNode { remaining = buffer.len(), "Flushing remaining buffer to S3" ); - if let Err(e) = writer.write(buffer).await { + if let Err(e) = guard.writer_mut().write(buffer).await { stats_tracker.errored(); stats_tracker.force_send(); let msg = format!("S3 write error (final flush): {e}"); state_helpers::emit_failed(&context.state_tx, &node_name, &msg); - if let Err(abort_err) = writer.abort().await { - tracing::error!( - %node_name, - error = %abort_err, - "Failed to abort S3 multipart upload after final flush error" - ); - } + // Guard will abort the multipart upload on drop. return Err(StreamKitError::Runtime(msg)); } chunks_written += 1; @@ -364,22 +457,18 @@ impl ProcessorNode for ObjectStoreWriteNode { %node_name, "Closing S3 writer (finalizing multipart upload)" ); - if let Err(e) = writer.close().await { + if let Err(e) = guard.writer_mut().close().await { stats_tracker.errored(); stats_tracker.force_send(); let msg = format!("Failed to finalize S3 upload: {e}"); state_helpers::emit_failed(&context.state_tx, &node_name, &msg); - // Attempt to abort the multipart upload to avoid orphaned parts. - if let Err(abort_err) = writer.abort().await { - tracing::error!( - %node_name, - error = %abort_err, - "Failed to abort S3 multipart upload after close error" - ); - } + // Guard will abort the multipart upload on drop. return Err(StreamKitError::Runtime(msg)); } + // Upload committed successfully — disarm the abort guard. 
+ guard.disarm(); + stats_tracker.force_send(); tracing::info!( %node_name, @@ -390,7 +479,7 @@ impl ProcessorNode for ObjectStoreWriteNode { "ObjectStoreWriteNode finished uploading to S3" ); - state_helpers::emit_stopped(&context.state_tx, &node_name, reason); + state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); Ok(()) } } @@ -454,41 +543,99 @@ mod tests { assert!(err.contains("chunk_size"), "Error should mention chunk_size: {err}"); } + /// Stub lookup that never finds any variable. + fn no_env(_name: &str) -> Result { + Err(std::env::VarError::NotPresent) + } + /// Verify credential resolution logic. #[test] fn test_resolve_credential_literal() { - let result = resolve_credential(None, Some("my-key"), "test"); + let result = resolve_credential(None, Some("my-key"), "test", no_env); assert_eq!(result.unwrap(), "my-key"); } #[test] fn test_resolve_credential_empty_literal() { - let result = resolve_credential(None, Some(""), "test"); + let result = resolve_credential(None, Some(""), "test", no_env); assert!(result.is_err()); } #[test] fn test_resolve_credential_missing() { - let result = resolve_credential(None, None, "test"); + let result = resolve_credential(None, None, "test", no_env); assert!(result.is_err()); } #[test] fn test_resolve_credential_env_precedence() { - // Use a unique env var name to avoid conflicts with parallel tests - let env_name = "_SK_TEST_OBJSTORE_CRED_PREC"; - std::env::set_var(env_name, "from-env"); - let result = resolve_credential(Some(env_name), Some("from-literal"), "test"); + // Inject a fake lookup — no std::env::set_var needed. 
+ let lookup = |_: &str| Ok("from-env".to_string()); + let result = resolve_credential(Some("ANY_VAR"), Some("from-literal"), "test", lookup); assert_eq!(result.unwrap(), "from-env"); - std::env::remove_var(env_name); + } + + #[test] + fn test_resolve_credential_env_empty() { + let lookup = |_: &str| Ok(String::new()); + let result = resolve_credential(Some("ANY_VAR"), None, "test", lookup); + assert!(result.is_err()); + } + #[test] fn test_resolve_credential_env_not_set() { - let result = resolve_credential(Some("_SK_TEST_OBJSTORE_NONEXISTENT_VAR"), None, "test"); + let result = resolve_credential(Some("MISSING"), None, "test", no_env); assert!(result.is_err()); } + /// Verify factory rejects empty endpoint. + #[test] + fn test_factory_rejects_empty_endpoint() { + let factory = ObjectStoreWriteNode::factory(); + let params = serde_json::json!({ + "endpoint": "", + "bucket": "test", + "key": "test.bin", + }); + let err = match factory(Some(&params)) { + Err(e) => e.to_string(), + Ok(_) => panic!("Expected error for empty endpoint"), + }; + assert!(err.contains("endpoint"), "Error should mention endpoint: {err}"); + } + + /// Verify factory rejects empty bucket. + #[test] + fn test_factory_rejects_empty_bucket() { + let factory = ObjectStoreWriteNode::factory(); + let params = serde_json::json!({ + "endpoint": "http://localhost:9000", + "bucket": "", + "key": "test.bin", + }); + let err = match factory(Some(&params)) { + Err(e) => e.to_string(), + Ok(_) => panic!("Expected error for empty bucket"), + }; + assert!(err.contains("bucket"), "Error should mention bucket: {err}"); + } + + /// Verify factory rejects empty key. 
+ #[test] + fn test_factory_rejects_empty_key() { + let factory = ObjectStoreWriteNode::factory(); + let params = serde_json::json!({ + "endpoint": "http://localhost:9000", + "bucket": "test", + "key": "", + }); + let err = match factory(Some(&params)) { + Err(e) => e.to_string(), + Ok(_) => panic!("Expected error for empty key"), + }; + assert!(err.contains("key"), "Error should mention key: {err}"); + } + /// Verify that the node emits the correct state transitions and handles /// credential failures gracefully (without needing a real S3 endpoint). #[tokio::test] diff --git a/samples/pipelines/dynamic/moq_s3_archive.yml b/samples/pipelines/dynamic/moq_s3_archive.yml index affb46b1..0b1ff1fd 100644 --- a/samples/pipelines/dynamic/moq_s3_archive.yml +++ b/samples/pipelines/dynamic/moq_s3_archive.yml @@ -75,6 +75,9 @@ nodes: params: endpoint: "http://localhost:9000" bucket: "streamkit-archive" + # NOTE: Static key — every session overwrites the previous recording. + # A future enhancement could support template variables (e.g. + # "recordings/{session_id}.ogg") for true multi-session archival. 
key: "recordings/live-session.ogg" region: "us-east-1" access_key_id_env: "SK_S3_ACCESS_KEY" From 913b49c331c3c594409d0b329569ca3e86c904d4 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Wed, 8 Apr 2026 19:18:22 +0000 Subject: [PATCH 4/5] style: cargo fmt Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/nodes/src/core/object_store_write.rs | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs index cab19290..682e72c8 100644 --- a/crates/nodes/src/core/object_store_write.rs +++ b/crates/nodes/src/core/object_store_write.rs @@ -145,9 +145,7 @@ fn resolve_credential( } return Ok(val.to_string()); } - Err(StreamKitError::Configuration(format!( - "No {label} provided (set via config or env var)" - ))) + Err(StreamKitError::Configuration(format!("No {label} provided (set via config or env var)"))) } // --------------------------------------------------------------------------- @@ -164,10 +162,7 @@ struct AbortOnDrop { impl AbortOnDrop { const fn new(writer: opendal::Writer, node_name: String) -> Self { - Self { - writer: Some(writer), - node_name, - } + Self { writer: Some(writer), node_name } } /// Return a mutable reference to the inner writer. @@ -178,9 +173,7 @@ impl AbortOnDrop { /// consumes `self`. #[allow(clippy::expect_used)] // Invariant: writer is always Some until disarm/drop. const fn writer_mut(&mut self) -> &mut opendal::Writer { - self.writer - .as_mut() - .expect("writer consumed after disarm") + self.writer.as_mut().expect("writer consumed after disarm") } /// Take ownership of the writer, disabling the abort-on-drop guard. 
@@ -265,9 +258,7 @@ impl ObjectStoreWriteNode { )); } if config.key.is_empty() { - return Err(StreamKitError::Configuration( - "key must not be empty".to_string(), - )); + return Err(StreamKitError::Configuration("key must not be empty".to_string())); } } From 6b38e3bfa522390970ca74ad4ec349463d3e8cdb Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Wed, 8 Apr 2026 20:02:28 +0000 Subject: [PATCH 5/5] refactor: move docker-compose.rustfs.yml to docker/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback — docker/ is a better home than repo root since this is useful beyond just e2e/playwright testing. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- docker-compose.rustfs.yml => docker/docker-compose.rustfs.yml | 2 +- samples/pipelines/dynamic/moq_s3_archive.yml | 2 +- samples/pipelines/oneshot/transcode_to_s3.yml | 2 +- samples/pipelines/oneshot/tts_to_s3.yml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename docker-compose.rustfs.yml => docker/docker-compose.rustfs.yml (95%) diff --git a/docker-compose.rustfs.yml b/docker/docker-compose.rustfs.yml similarity index 95% rename from docker-compose.rustfs.yml rename to docker/docker-compose.rustfs.yml index 30bccbaa..840c49c1 100644 --- a/docker-compose.rustfs.yml +++ b/docker/docker-compose.rustfs.yml @@ -6,7 +6,7 @@ # and integration testing. # # Usage: -# docker compose -f docker-compose.rustfs.yml up -d +# docker compose -f docker/docker-compose.rustfs.yml up -d # # S3 API: http://localhost:9000 # Console: http://localhost:9001 diff --git a/samples/pipelines/dynamic/moq_s3_archive.yml b/samples/pipelines/dynamic/moq_s3_archive.yml index 0b1ff1fd..2bfda1a1 100644 --- a/samples/pipelines/dynamic/moq_s3_archive.yml +++ b/samples/pipelines/dynamic/moq_s3_archive.yml @@ -14,7 +14,7 @@ # without any extra splitter node. # # Requires: -# - S3-compatible endpoint (e.g. 
RustFS via docker-compose.rustfs.yml) +# - S3-compatible endpoint (e.g. RustFS via docker/docker-compose.rustfs.yml) # - Set SK_S3_ACCESS_KEY and SK_S3_SECRET_KEY env vars name: MoQ Broadcast + S3 Archive diff --git a/samples/pipelines/oneshot/transcode_to_s3.yml b/samples/pipelines/oneshot/transcode_to_s3.yml index c23362fd..a9ce33c4 100644 --- a/samples/pipelines/oneshot/transcode_to_s3.yml +++ b/samples/pipelines/oneshot/transcode_to_s3.yml @@ -8,7 +8,7 @@ # the result to S3-compatible object storage. # # Requires: -# - S3-compatible endpoint (e.g. RustFS via docker-compose.rustfs.yml) +# - S3-compatible endpoint (e.g. RustFS via docker/docker-compose.rustfs.yml) # - Set SK_S3_ACCESS_KEY and SK_S3_SECRET_KEY env vars name: Transcode to S3 (Ogg → MP4) diff --git a/samples/pipelines/oneshot/tts_to_s3.yml b/samples/pipelines/oneshot/tts_to_s3.yml index 1b036849..24965775 100644 --- a/samples/pipelines/oneshot/tts_to_s3.yml +++ b/samples/pipelines/oneshot/tts_to_s3.yml @@ -9,7 +9,7 @@ # # Requires: # - kokoro native plugin (just install-plugin kokoro) -# - S3-compatible endpoint (e.g. RustFS via docker-compose.rustfs.yml) +# - S3-compatible endpoint (e.g. RustFS via docker/docker-compose.rustfs.yml) # - Set SK_S3_ACCESS_KEY and SK_S3_SECRET_KEY env vars name: TTS to S3 (Kokoro)