
feat(nodes): add S3/object store sink node via OpenDAL #273

Merged: streamer45 merged 5 commits into main from devin/1775659476-object-store-writer, Apr 9, 2026
Conversation

Contributor

@staging-devin-ai-integration staging-devin-ai-integration bot commented Apr 8, 2026

Summary

Adds a new object store write sink node (core::object_store_writer) that streams Binary packets to S3-compatible object storage via Apache OpenDAL.

Key design choices

  • OpenDAL (Apache 2.0, ~2.9M downloads/90d) — provider-agnostic: AWS S3, MinIO, RustFS, GCS, Azure Blob, Cloudflare R2, etc.
  • Multipart upload with configurable chunk_size (default 5 MiB) keeps memory bounded regardless of total upload size.
  • Credential resolution follows the stream_key_env pattern from RtmpPublishNode — env var names take precedence over inline values, so secrets stay out of pipeline YAML.
  • AbortOnDrop guard ensures orphaned multipart uploads are aborted if the Tokio task is cancelled mid-upload (prevents storage leaks).
  • Zero-copy chunk extraction via split_off + mem::replace (no drain().collect() double-allocation).
  • Fail-fast factory validation — empty endpoint, bucket, or key rejected at config time, not at runtime.
  • Testable credential resolution — resolve_credential accepts a lookup closure, eliminating unsound std::env::set_var calls in tests.
  • Opt-in object_store feature flag (not in default), following the pattern of svt_av1, dav1d, etc.
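The AbortOnDrop guard mentioned above can be sketched as a small drop guard. This is an illustrative, self-contained version: the names (`AbortOnDrop`, `disarm`) and the channel used to signal the abort are assumptions for the sketch; the real guard in the PR wraps the OpenDAL writer and invokes its abort path.

```rust
use std::sync::mpsc;

// Guard that signals an abort for a still-open multipart upload if it is
// dropped before being explicitly disarmed (e.g. the task was cancelled).
struct AbortOnDrop {
    upload_id: String,
    armed: bool,
    abort_tx: mpsc::Sender<String>, // stand-in for "abort this upload"
}

impl AbortOnDrop {
    fn new(upload_id: String, abort_tx: mpsc::Sender<String>) -> Self {
        Self { upload_id, armed: true, abort_tx }
    }

    /// Call after a successful close() so Drop becomes a no-op.
    fn disarm(&mut self) {
        self.armed = false;
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        if self.armed {
            // In the real node this would issue writer.abort(); here we
            // just record which upload was orphaned.
            let _ = self.abort_tx.send(self.upload_id.clone());
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Guard dropped without disarm: abort is signalled (cancelled task).
    drop(AbortOnDrop::new("orphaned".to_string(), tx.clone()));
    assert_eq!(rx.try_recv().unwrap(), "orphaned");

    // Disarmed guard: no abort (upload completed normally).
    let mut g = AbortOnDrop::new("completed".to_string(), tx);
    g.disarm();
    drop(g);
    assert!(rx.try_recv().is_err());
    println!("ok");
}
```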

Files added/modified

  • crates/nodes/src/core/object_store_write.rs — Node implementation + 12 unit tests
  • crates/nodes/src/core/mod.rs — Module declaration + dynamic registration
  • crates/nodes/Cargo.toml — opendal dependency + object_store feature
  • samples/pipelines/oneshot/tts_to_s3.yml — Sample: TTS → S3
  • samples/pipelines/oneshot/transcode_to_s3.yml — Sample: transcode → S3
  • samples/pipelines/dynamic/moq_s3_archive.yml — Sample: MoQ broadcast + S3 archive (secondary output)
  • docker-compose.rustfs.yml — RustFS for local E2E validation

Review & Testing Checklist for Human

  • Verify AbortOnDrop guard correctly aborts multipart uploads on task cancellation (spin up RustFS, start a long upload, kill the pipeline mid-stream, check for orphaned parts)
  • Verify multipart upload completes successfully end-to-end with RustFS (docker compose -f docker-compose.rustfs.yml up, create bucket, run a sample pipeline with object_store feature enabled)
  • Review split_off + mem::replace chunk extraction for correctness at boundary conditions (buffer exactly == chunk_size, buffer >> chunk_size)
  • Confirm factory rejects empty endpoint/bucket/key and zero chunk_size
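The split_off + mem::replace chunk extraction flagged in the checklist can be exercised in isolation at exactly the boundary conditions listed. A minimal sketch; the helper name `take_chunk` is ours, not the PR's:

```rust
use std::mem;

/// Remove and return the first `chunk_size` bytes of `buffer`.
/// `split_off` moves the tail into a new Vec; `mem::replace` swaps it in,
/// handing the original head allocation back as the chunk (no
/// drain().collect() double pass).
fn take_chunk(buffer: &mut Vec<u8>, chunk_size: usize) -> Vec<u8> {
    let tail = buffer.split_off(chunk_size);
    mem::replace(buffer, tail)
}

fn main() {
    // Boundary: buffer exactly == chunk_size, so the chunk is the whole
    // buffer and the remainder is empty.
    let mut buf = vec![1u8, 2, 3, 4];
    let chunk = take_chunk(&mut buf, 4);
    assert_eq!(chunk, vec![1, 2, 3, 4]);
    assert!(buf.is_empty());

    // Buffer >> chunk_size: the remainder stays behind for the next flush.
    let mut buf = (0u8..10).collect::<Vec<_>>();
    let chunk = take_chunk(&mut buf, 4);
    assert_eq!(chunk, vec![0, 1, 2, 3]);
    assert_eq!(buf, vec![4, 5, 6, 7, 8, 9]);
    println!("ok");
}
```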

Notes

  • The moq_s3_archive.yml sample uses a static key (recordings/live-session.ogg) — every session overwrites the previous recording. A comment documents this limitation and suggests template variable support as a future enhancement.
  • Review finding #7 (schemars range(min = 1) + factory check redundancy) is intentional defense-in-depth — no change needed.
  • OpenDAL is added with default-features = false, features = ["services-s3"] to keep the dependency footprint minimal.
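For reference, the dependency declaration described in the last note would look roughly like this in crates/nodes/Cargo.toml (the version number is a placeholder; check the actual manifest):

```toml
[dependencies]
opendal = { version = "0.50", optional = true, default-features = false, features = ["services-s3"] }

[features]
object_store = ["dep:opendal"]
```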

Link to Devin session: https://staging.itsdev.in/sessions/a27079cc9abc4b1a9cb26b6045442ef1
Requested by: @streamer45


Add ObjectStoreWriteNode — a new sink node that streams Binary packets
to S3-compatible object storage using Apache OpenDAL. Supports AWS S3,
MinIO, RustFS, GCS, Azure Blob, and other S3-compatible backends.

Key design:
- Multipart upload via OpenDAL Writer with configurable chunk size
  (default 5 MiB) for bounded memory usage
- Credentials resolved from env vars (precedence) or inline config,
  following the RTMP node's stream_key_env pattern
- Full state lifecycle: Initializing → Running → Stopped/Failed
- Tracing at each stage for runtime debugging
- Behind opt-in 'object_store' feature flag (not in default)

Includes:
- 3 sample pipelines (TTS→S3, transcode→S3, MoQ+S3 archive)
- docker-compose.rustfs.yml for local E2E validation with RustFS
- 8 unit tests covering pins, factory validation, credential
  resolution, and state transitions

Signed-off-by: Devin AI <devin@devin.ai>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
@staging-devin-ai-integration (Contributor Author)

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

staging-devin-ai-integration[bot]

This comment was marked as resolved.

- Remove double state emission (emit_failed + emit_stopped) on close
  failure — only emit_failed, matching every other error path in the
  codebase and avoiding spurious Failed→Stopped state sequence.
- Add writer.abort() on close() failure to clean up orphaned multipart
  uploads, matching the write-error paths.

Signed-off-by: Devin AI <devin@devin.ai>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
@staging-devin-ai-integration staging-devin-ai-integration bot left a comment

Devin Review found 2 new potential issues.

View 4 additional findings in Devin Review.

Comment on lines +127 to +131
Err(_) => {
    return Err(StreamKitError::Configuration(format!(
        "Environment variable '{env}' for {label} is not set"
    )));
},
@staging-devin-ai-integration staging-devin-ai-integration bot commented Apr 8, 2026

🟡 resolve_credential does not fall back to literal when env var is specified but missing

The resolve_credential function's own doc comment says it resolves "from an env-var name (takes precedence) or a literal fallback", and the access_key_id field doc says "If omitted, the node falls back to access_key_id_env". However, when env_name is Some (i.e., the user provided access_key_id_env), every branch of the match std::env::var(env) returns immediately — either with Ok or Err — without ever checking the literal parameter. This means if a user provides both access_key_id: "my-key" and access_key_id_env: "MY_ENV_VAR", and MY_ENV_VAR is not set in the environment, the node hard-fails instead of falling back to the literal "my-key". The word "fallback" in the doc creates the expectation that the literal will be used when the env var is unavailable, but the code never reaches the literal check at crates/nodes/src/core/object_store_write.rs:134.


Contributor Author

This is intentional — it matches the RTMP node's stream_key_env pattern where specifying an env var name is a hard requirement: if you tell the node to read from MY_ENV_VAR, that var must exist. This prevents silent credential misconfiguration (e.g., accidentally running with a stale inline key when the env var was supposed to be set but wasn't).

The doc wording could be clearer though — "takes precedence" means "if env_name is specified, it's the authoritative source", not "try env first, fall back to literal". The access_key_id field doc ("If omitted, the node falls back to access_key_id_env") describes the reverse direction: if no literal is provided, try the env var. The two fields are independent sources, not a cascading chain.

I'll leave the behavior as-is since it's the safer default, but happy to adjust if the human reviewer prefers fallback semantics.

Contributor Author

This is intentional and matches the RTMP node's resolve_rtmp_url_with_env behavior — when stream_key_env is set but the env var is missing, it hard-fails rather than silently falling back to stream_key.

The rationale: if a user explicitly configures access_key_id_env: "MY_VAR", a missing MY_VAR is a deployment misconfiguration that should be surfaced immediately, not silently masked by a literal fallback. The doc comment says "takes precedence" (not "falls back") — meaning if the env path is specified, it's the authoritative source and the literal is ignored.

Updated the doc comment in the latest commit to make the resolution order clearer (env var → literal → error, where specifying an env var name commits to that path).
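The resolution order described here (a named env var is authoritative, the literal is used only when no env var is named, and neither resolving is an error) can be sketched as follows. The signature and error type are illustrative; the PR's actual resolve_credential lives in object_store_write.rs and uses the crate's own error type:

```rust
#[derive(Debug, PartialEq)]
enum ConfigError {
    MissingEnv(String),
    Missing(&'static str),
}

/// Resolve a credential. If an env-var name is given it is the
/// authoritative source: a missing variable is a hard error, never a
/// silent fallback to the literal. The lookup closure makes this testable
/// without touching the process environment.
fn resolve_credential<F>(
    label: &'static str,
    literal: Option<&str>,
    env_name: Option<&str>,
    lookup: F,
) -> Result<String, ConfigError>
where
    F: Fn(&str) -> Option<String>,
{
    if let Some(env) = env_name {
        // Specifying an env var commits to that path.
        return lookup(env).ok_or_else(|| ConfigError::MissingEnv(env.to_string()));
    }
    literal
        .map(str::to_owned)
        .ok_or(ConfigError::Missing(label))
}

fn main() {
    let env = |k: &str| (k == "SET_VAR").then(|| "from-env".to_string());

    // No env var named: the literal wins.
    assert_eq!(
        resolve_credential("access_key_id", Some("inline"), None, env),
        Ok("inline".to_string())
    );
    // Env var named and present: env value wins, literal ignored.
    assert_eq!(
        resolve_credential("access_key_id", Some("inline"), Some("SET_VAR"), env),
        Ok("from-env".to_string())
    );
    // Env var named but missing: hard error, no fallback to the literal.
    assert!(resolve_credential("access_key_id", Some("inline"), Some("UNSET"), env).is_err());
    println!("ok");
}
```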

Comment on lines +292 to +394
while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await {
    if let Packet::Binary { data, .. } = packet {
        stats_tracker.received();
        packet_count += 1;
        total_bytes += data.len() as u64;

        buffer.extend_from_slice(&data);

        // Flush when buffer reaches chunk_size
        while buffer.len() >= self.config.chunk_size {
            let chunk: Vec<u8> = buffer.drain(..self.config.chunk_size).collect();
            if let Err(e) = writer.write(chunk).await {
                stats_tracker.errored();
                stats_tracker.force_send();
                let msg = format!("S3 write error: {e}");
                state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
                // Attempt to abort the multipart upload to clean up
                if let Err(abort_err) = writer.abort().await {
                    tracing::error!(
                        %node_name,
                        error = %abort_err,
                        "Failed to abort S3 multipart upload after write error"
                    );
                }
                return Err(StreamKitError::Runtime(msg));
            }
            chunks_written += 1;
            tracing::debug!(
                %node_name,
                chunks_written,
                total_bytes,
                "Flushed chunk to S3"
            );
        }

        stats_tracker.sent();
        stats_tracker.maybe_send();
    } else {
        tracing::warn!(
            %node_name,
            "Received non-Binary packet, ignoring"
        );
        stats_tracker.discarded();
    }
}

// ── Flush remaining buffer ───────────────────────────────────────
if !buffer.is_empty() {
    tracing::debug!(
        %node_name,
        remaining = buffer.len(),
        "Flushing remaining buffer to S3"
    );
    if let Err(e) = writer.write(buffer).await {
        stats_tracker.errored();
        stats_tracker.force_send();
        let msg = format!("S3 write error (final flush): {e}");
        state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
        if let Err(abort_err) = writer.abort().await {
            tracing::error!(
                %node_name,
                error = %abort_err,
                "Failed to abort S3 multipart upload after final flush error"
            );
        }
        return Err(StreamKitError::Runtime(msg));
    }
    chunks_written += 1;
}

// ── Close (finalize multipart upload) ────────────────────────────
tracing::info!(
    %node_name,
    "Closing S3 writer (finalizing multipart upload)"
);
if let Err(e) = writer.close().await {
    stats_tracker.errored();
    stats_tracker.force_send();
    let msg = format!("Failed to finalize S3 upload: {e}");
    state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
    // Attempt to abort the multipart upload to avoid orphaned parts.
    if let Err(abort_err) = writer.abort().await {
        tracing::error!(
            %node_name,
            error = %abort_err,
            "Failed to abort S3 multipart upload after close error"
        );
    }
    return Err(StreamKitError::Runtime(msg));
}

stats_tracker.force_send();
tracing::info!(
    %node_name,
    packet_count,
    total_bytes,
    chunks_written,
    key = %self.config.key,
    "ObjectStoreWriteNode finished uploading to S3"
);

state_helpers::emit_stopped(&context.state_tx, &node_name, reason);
Ok(())
@staging-devin-ai-integration staging-devin-ai-integration bot commented Apr 8, 2026

🚩 No multipart upload abort on cancellation path

When recv_with_cancellation returns None due to a cancellation token (forced pipeline shutdown), the code falls through to flush remaining data and call writer.close() (line 367), which finalizes the multipart upload. This means a cancelled pipeline will still commit whatever data was buffered to S3. In contrast, a user might expect cancellation to abort the upload. However, this mirrors the pattern in crates/nodes/src/core/file_write.rs where the file is also flushed and closed normally after the receive loop exits, regardless of whether it was cancelled or input-closed. This is consistent but worth noting as a design decision.


Contributor Author

Acknowledged — this is a deliberate design choice matching file_write.rs where the file is also flushed and closed normally after the receive loop exits regardless of cancellation. Flushing on cancellation is the safer default for a storage sink (avoids data loss of already-processed data). If abort-on-cancel semantics are desired, it could be added as a config option in a follow-up.

Contributor Author

Correct observation — this is an intentional design choice matching file_write.rs. When recv_with_cancellation returns None (whether from input close or cancellation token), we flush and finalize rather than abort.

The rationale: for a recording/archive use case, partial data is generally more valuable than no data. The AbortOnDrop guard handles the truly unexpected case (Tokio task killed externally without going through the normal shutdown path), while graceful shutdown always commits what we have.

If a future use case needs "cancel = discard", that could be added as a config option (e.g. abort_on_cancel: true), but the default of "commit what you have" is the safer behavior for archival workloads.

streamkit-devin and others added 2 commits April 8, 2026 19:18
- Add AbortOnDrop guard to abort orphaned multipart uploads on task
  cancellation (critical: prevents storage leaks)
- Replace drain().collect() with split_off + mem::replace for zero-copy
  chunk extraction
- Validate endpoint, bucket, and key are non-empty in factory (fail-fast
  instead of runtime S3 errors)
- Refactor resolve_credential to accept an env lookup closure, eliminating
  unsound std::env::set_var calls in tests
- Inline reason string at use site (remove early allocation)
- Add comment about static key limitation in moq_s3_archive.yml
- Add tests for empty endpoint/bucket/key factory validation

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Comment on lines +1 to +6
# SPDX-FileCopyrightText: © 2025 StreamKit Contributors
#
# SPDX-License-Identifier: MPL-2.0

# RustFS — lightweight S3-compatible object storage for local development
# and integration testing.
Owner

Does this need to be at the repo root?

Contributor Author

Good question — there are no other docker-compose files in the repo so there's no established convention. This could live under e2e/ or samples/ instead. Where would you prefer it? I'll move it.

Owner

Maybe under a new docker/ path? It sounds like this could be useful besides testing even though that's the primary use case right now, but e2e is really about playwright testing atm, not backend e2e. What do you think?

Contributor Author

Agreed — moved to docker/docker-compose.rustfs.yml in 6b38e3b. Updated all references in the sample pipelines and the compose file's own usage comment.

Per review feedback — docker/ is a better home than repo root since
this is useful beyond just e2e/playwright testing.

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
@streamer45 streamer45 merged commit 2e6bea7 into main Apr 9, 2026
17 checks passed
@streamer45 streamer45 deleted the devin/1775659476-object-store-writer branch April 9, 2026 06:11