Skip to content

Commit 2e6bea7

Browse files
staging-devin-ai-integration[bot]streamkit-devinstreamer45
authored
feat(nodes): add S3/object store sink node via OpenDAL (#273)
* feat(nodes): add S3/object store sink node via OpenDAL Add ObjectStoreWriteNode — a new sink node that streams Binary packets to S3-compatible object storage using Apache OpenDAL. Supports AWS S3, MinIO, RustFS, GCS, Azure Blob, and other S3-compatible backends. Key design: - Multipart upload via OpenDAL Writer with configurable chunk size (default 5 MiB) for bounded memory usage - Credentials resolved from env vars (precedence) or inline config, following the RTMP node's stream_key_env pattern - Full state lifecycle: Initializing → Running → Stopped/Failed - Tracing at each stage for runtime debugging - Behind opt-in 'object_store' feature flag (not in default) Includes: - 3 sample pipelines (TTS→S3, transcode→S3, MoQ+S3 archive) - docker-compose.rustfs.yml for local E2E validation with RustFS - 8 unit tests covering pins, factory validation, credential resolution, and state transitions Signed-off-by: Devin AI <devin@devin.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(nodes): fix close() failure path in object store writer - Remove double state emission (emit_failed + emit_stopped) on close failure — only emit_failed, matching every other error path in the codebase and avoiding spurious Failed→Stopped state sequence. - Add writer.abort() on close() failure to clean up orphaned multipart uploads, matching the write-error paths. Signed-off-by: Devin AI <devin@devin.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(nodes): address review findings for object store writer - Add AbortOnDrop guard to abort orphaned multipart uploads on task cancellation (critical: prevents storage leaks) - Replace drain().collect() with split_off + mem::replace for zero-copy chunk extraction - Validate endpoint, bucket, and key are non-empty in factory (fail-fast instead of runtime S3 errors) - Refactor resolve_credential to accept an env lookup closure, eliminating unsound std::env::set_var calls in tests - Inline reason string at use site (remove early allocation) - Add comment about static key limitation in moq_s3_archive.yml - Add tests for empty endpoint/bucket/key factory validation Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style: cargo fmt Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor: move docker-compose.rustfs.yml to docker/ Per review feedback — docker/ is a better home than repo root since this is useful beyond just e2e/playwright testing. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> --------- Signed-off-by: Devin AI <devin@devin.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com>
1 parent daa53f7 commit 2e6bea7

File tree

8 files changed

+1253
-12
lines changed

8 files changed

+1253
-12
lines changed

Cargo.lock

Lines changed: 271 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/nodes/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ rustls-platform-verifier = { version = "0.6", optional = true }
9696
rav1e = { version = "0.8", optional = true, default-features = false, features = ["threading", "asm"] }
9797
rav1d = { version = "1.1", optional = true, default-features = false, features = ["bitdepth_8", "bitdepth_16", "asm"] }
9898

99+
# Object storage (optional, behind `object_store` feature)
100+
opendal = { version = "0.55", optional = true, default-features = false, features = [
101+
"services-s3",
102+
] }
103+
99104
# GPU compositing (optional, behind `gpu` feature)
100105
wgpu = { version = "29", optional = true, default-features = false, features = ["vulkan", "metal", "dx12", "wgsl"] }
101106
pollster = { version = "0.4", optional = true }
@@ -167,6 +172,7 @@ dav1d_static = ["dav1d"]
167172
colorbars = ["dep:schemars", "dep:serde_json", "dep:fontdue"]
168173
compositor = ["dep:schemars", "dep:serde_json", "dep:image", "dep:tiny-skia", "dep:rayon", "dep:fontdue", "dep:smallvec", "dep:uuid", "dep:resvg"]
169174
gpu = ["compositor", "dep:wgpu", "dep:pollster", "dep:bytemuck"]
175+
object_store = ["dep:opendal", "dep:schemars"]
170176
codegen = ["dep:ts-rs"]
171177
video = ["vp9", "av1", "openh264", "colorbars", "compositor"]
172178

crates/nodes/src/core/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub mod bytes_output;
1010
pub mod file_read;
1111
pub mod file_write;
1212
pub mod json_serialize;
13+
#[cfg(feature = "object_store")]
14+
pub mod object_store_write;
1315
pub mod pacer;
1416
mod passthrough;
1517
#[cfg(feature = "script")]
@@ -190,4 +192,23 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode
190192

191193
// --- Register TelemetryOut Node ---
192194
telemetry_out::register(registry);
195+
196+
// --- Register ObjectStoreWriteNode ---
197+
#[cfg(feature = "object_store")]
198+
{
199+
use schemars::schema_for;
200+
201+
let factory = object_store_write::ObjectStoreWriteNode::factory();
202+
registry.register_dynamic_with_description(
203+
"core::object_store_writer",
204+
move |params| (factory)(params),
205+
serde_json::to_value(schema_for!(object_store_write::ObjectStoreWriteConfig))
206+
.expect("ObjectStoreWriteConfig schema should serialize to JSON"),
207+
vec!["core".to_string(), "io".to_string(), "object_store".to_string()],
208+
false,
209+
"Streams binary data to S3-compatible object storage (AWS S3, GCS, Azure, MinIO, RustFS, etc.). \
210+
Uses multipart upload for bounded memory usage. \
211+
Credentials can be provided via config or environment variables.",
212+
);
213+
}
193214
}

0 commit comments

Comments
 (0)