
Commit 2b29d5d

Authored by: devin-ai-integration[bot] (streamkit-devin), streamer45
fix(nodes): object store writer improvements — passthrough, bucket check, review fixes (#278)
* fix: enable object_store by default and fix sample pipeline client sections

  - Add object_store to default features so the node is available without extra build flags.
  - Remove invalid 'output: type: none' from tts_to_s3.yml and transcode_to_s3.yml — the output section is optional, so omitting it is the correct way to indicate no client output.

* feat(nodes): add passthrough mode to object store writer for oneshot pipelines

  The oneshot engine requires linear pipelines (no fan-out). To support S3 upload alongside http_output in oneshot mode, the object store writer now accepts a 'passthrough' config option. When enabled, it forwards every incoming Binary packet to its 'out' pin after buffering for S3, allowing it to sit inline: muxer → s3_writer(passthrough) → http_output.

  Changes:
  - Add 'passthrough' bool config field (default: false)
  - Conditionally expose 'out' output pin in passthrough mode
  - Forward packets downstream after S3 buffering
  - Update oneshot pipeline YAMLs to use linear passthrough topology
  - Add Playwright E2E test (convert-s3.spec.ts) verifying both HTTP response and S3 upload via RustFS
  - Add unit tests for passthrough pin definitions

* fix(nodes): preserve content_type and metadata in passthrough forwarding

  The passthrough mode was reconstructing Binary packets with None for content_type and metadata, discarding the original values set by upstream muxers. Capture all fields in the destructure and forward them as-is.

* style(e2e): format convert-s3.spec.ts with prettier

* style(e2e): reformat convert-s3.spec.ts with UI prettier config

  The E2E project uses the UI's prettierrc (single quotes), not the default prettier config.

* fix(nodes): verify S3 bucket at init time and unify sample bucket names

  Add a bucket existence check (HEAD request via operator.stat) during node initialization, before opening the multipart writer. This catches 'NoSuchBucket' and permission errors immediately rather than after streaming data for minutes. Also unify all S3 sample pipelines to use 'streamkit-output' as the bucket name for simpler local validation.

* fix(nodes): address review findings for object store writer

  Critical:
  - resolve_credential now falls through to literal when env var is missing or empty, instead of hard-erroring (defense-in-depth works)

  Suggestions:
  - Warn at config time when chunk_size < 5 MiB (S3 minimum part size)
  - Continue S3 write when downstream output channel closes in passthrough mode (archive completeness over early exit)
  - Add #[serde(skip_serializing)] on access_key_id and secret_access_key to prevent credential leakage in API responses / debug dumps
  - Switch E2E test from execSync to execFileSync (no shell injection)
  - Add inline comment explaining stats_tracker.sent() in sink mode

  Nits:
  - Use let _ = guard.disarm() to acknowledge unused return value
  - Fix module doc: only services-s3 is compiled, not GCS/Azure
  - Fix sample pipeline accept type: audio/ogg (not audio/opus)

  Tests:
  - Split env fallback tests into with-literal and without-literal variants (15 tests total, all passing)

* style: format resolve_credential tracing calls

* fix: move S3_MIN_PART_SIZE constant to module scope

  Fixes clippy::items_after_statements lint by moving the constant out of the factory closure to the module-level constants section.

* fix(nodes): pin RustFS image version and reject chunk_size < 5 MiB

  - Pin docker-compose RustFS image to 1.0.0-alpha.90 (was :latest)
  - Harden chunk_size validation: reject values below 5 MiB at config time instead of warning (prevents runtime EntityTooSmall errors)
  - Update schemars annotation to reflect 5 MiB minimum
  - Add test for sub-5MiB chunk_size rejection (16 tests total)

* style: format chunk_size test assertion

* fix: add underscores to schemars range literal for clippy

  Fixes clippy::unreadable_literal on the 5242880 constant in the schemars attribute.

---------

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-authored-by: StreamKit Devin <devin@streamkit.dev>
Co-authored-by: Claudio Costa <cstcld91@gmail.com>
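The linear passthrough topology described above (muxer → s3_writer → http_output) could be wired roughly like this in a pipeline YAML. This is an illustrative sketch only — the node names, pin names, and section layout are assumptions, not the contents of the actual sample files such as transcode_to_s3.yml:

```yaml
# Hypothetical oneshot pipeline: a strictly linear chain (no fan-out),
# with the object store writer sitting inline in passthrough mode.
nodes:
  - name: muxer
    type: mp4_muxer
  - name: s3_writer
    type: object_store_write
    params:
      endpoint: http://localhost:9000
      bucket: streamkit-output
      key: output.mp4
      chunk_size: 5242880   # 5 MiB — the S3 minimum multipart part size
      passthrough: true     # forward packets to "out" as well as to S3
  - name: http_output
    type: http_output
connections:
  - from: muxer.out
    to: s3_writer.in
  - from: s3_writer.out
    to: http_output.in
```

With passthrough omitted (or false), s3_writer exposes no output pin and can only terminate a branch, which the oneshot engine's no-fan-out rule would then forbid alongside http_output.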
1 parent 2e6bea7 commit 2b29d5d

File tree

8 files changed: +416 −29 lines


crates/nodes/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -128,6 +128,7 @@ default = [
     "mp4",
     "openh264",
     "rtmp",
+    "object_store",
 ]

 # Individual features for each node.

crates/nodes/src/core/mod.rs

Lines changed: 2 additions & 1 deletion

@@ -208,7 +208,8 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode
         false,
         "Streams binary data to S3-compatible object storage (AWS S3, GCS, Azure, MinIO, RustFS, etc.). \
          Uses multipart upload for bounded memory usage. \
-         Credentials can be provided via config or environment variables.",
+         Credentials can be provided via config or environment variables. \
+         Set passthrough: true to forward packets downstream (required for oneshot pipelines).",
     );
 }
 }

crates/nodes/src/core/object_store_write.rs

Lines changed: 157 additions & 18 deletions

@@ -4,12 +4,20 @@

 //! Object store write node — streams binary data to S3-compatible object storage.
 //!
-//! Uses [Apache OpenDAL](https://opendal.apache.org/) to support S3, GCS,
-//! Azure Blob, MinIO, RustFS, and other compatible backends.
+//! Uses [Apache OpenDAL](https://opendal.apache.org/) to support S3, MinIO,
+//! RustFS, and other S3-compatible backends. (Only the `services-s3` feature
+//! is compiled; GCS / Azure would require additional OpenDAL features.)
 //!
 //! Incoming [`Packet::Binary`] packets are buffered up to `chunk_size` and
 //! written via OpenDAL's multipart [`Writer`](opendal::Writer), keeping memory
 //! bounded regardless of the total upload size.
+//!
+//! ## Passthrough mode
+//!
+//! When `passthrough` is enabled (default: `false`), the node also forwards
+//! every incoming packet to its `"out"` pin, allowing it to sit inline in a
+//! linear pipeline (e.g. `muxer → s3_writer → http_output`). This is
+//! required for oneshot pipelines which do not support fan-out.

 use async_trait::async_trait;
 use schemars::JsonSchema;
@@ -27,6 +35,11 @@ use streamkit_core::{
 /// Default buffer/chunk size: 5 MiB (the S3 minimum multipart part size).
 const DEFAULT_CHUNK_SIZE: usize = 5 * 1024 * 1024;

+/// S3 minimum multipart part size (5 MiB). Intermediate parts smaller than
+/// this will be rejected with `EntityTooSmall`; only the final part may be
+/// smaller.
+const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024;
+
 const fn default_chunk_size() -> usize {
     DEFAULT_CHUNK_SIZE
 }
@@ -67,7 +80,7 @@ pub struct ObjectStoreWriteConfig {
     /// Access key ID.
     ///
     /// If omitted, the node falls back to `access_key_id_env`.
-    #[serde(default)]
+    #[serde(default, skip_serializing)]
     pub access_key_id: Option<String>,

     /// Environment variable name containing the access key ID.
@@ -79,7 +92,7 @@ pub struct ObjectStoreWriteConfig {
     /// Secret access key.
     ///
     /// If omitted, the node falls back to `secret_key_env`.
-    #[serde(default)]
+    #[serde(default, skip_serializing)]
     pub secret_access_key: Option<String>,

     /// Environment variable name containing the secret access key.
@@ -93,13 +106,22 @@ pub struct ObjectStoreWriteConfig {
     /// This controls the multipart upload part size. S3 requires a minimum
     /// part size of 5 MiB (except the last part).
     #[serde(default = "default_chunk_size")]
-    #[schemars(range(min = 1))]
+    #[schemars(range(min = 5_242_880))]
     pub chunk_size: usize,

     /// Optional MIME content type for the uploaded object
     /// (e.g. `audio/ogg`, `video/mp4`).
     #[serde(default)]
     pub content_type: Option<String>,
+
+    /// When `true`, the node forwards every incoming packet to its `"out"`
+    /// pin in addition to writing it to object storage. This allows the
+    /// node to sit inline in a linear pipeline (required for oneshot mode
+    /// which does not support fan-out).
+    ///
+    /// Default: `false` (pure sink — no output pin).
+    #[serde(default)]
+    pub passthrough: bool,
 }

 // ---------------------------------------------------------------------------
@@ -128,14 +150,12 @@ fn resolve_credential(
             return Ok(val);
         },
         Ok(_) => {
-            return Err(StreamKitError::Configuration(format!(
-                "Environment variable '{env}' for {label} is empty"
-            )));
+            // Env var exists but is empty — fall through to literal.
+            tracing::debug!("Env var '{env}' for {label} is empty, trying literal fallback");
         },
         Err(_) => {
-            return Err(StreamKitError::Configuration(format!(
-                "Environment variable '{env}' for {label} is not set"
-            )));
+            // Env var not set — fall through to literal.
+            tracing::debug!("Env var '{env}' for {label} is not set, trying literal fallback");
         },
     }
 }
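The resolution order this change produces (env var first, then literal fallback, then error) can be sketched in isolation. This is a simplified stand-in for the node's resolve_credential, with the error type reduced to a plain String and the lookup injected as a closure — the shape of the real signature is an assumption:

```rust
// Simplified sketch of the credential fallback order after this change:
// 1. If an env var name is configured and the var is set and non-empty, use it.
// 2. Otherwise fall through to the literal config value, if present.
// 3. Only when both are unavailable is it a configuration error.
fn resolve_credential(
    env: Option<&str>,
    literal: Option<&str>,
    label: &str,
    lookup: impl Fn(&str) -> Result<String, std::env::VarError>,
) -> Result<String, String> {
    if let Some(env) = env {
        match lookup(env) {
            Ok(val) if !val.is_empty() => return Ok(val),
            // Empty or missing env var: fall through to the literal.
            Ok(_) | Err(_) => {}
        }
    }
    literal
        .map(str::to_owned)
        .ok_or_else(|| format!("no credential available for {label}"))
}

fn main() {
    let no_env = |_: &str| Err(std::env::VarError::NotPresent);
    // Env var missing, literal present → literal wins (defense-in-depth).
    assert_eq!(
        resolve_credential(Some("MISSING"), Some("fallback"), "test", no_env).unwrap(),
        "fallback"
    );
    // Env var empty, no literal → still an error.
    let empty = |_: &str| Ok(String::new());
    assert!(resolve_credential(Some("ANY_VAR"), None, "test", empty).is_err());
}
```

This mirrors the split test cases below: the with-literal variants assert the fall-through, the no-literal variants assert the error.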
@@ -240,6 +260,7 @@ impl ObjectStoreWriteNode {
                 secret_key_env: None,
                 chunk_size: default_chunk_size(),
                 content_type: None,
+                passthrough: false,
             }
         } else {
             config_helpers::parse_config_required(params)?
@@ -268,6 +289,14 @@ impl ObjectStoreWriteNode {
             ));
         }

+        if config.chunk_size < S3_MIN_PART_SIZE && params.is_some() {
+            return Err(StreamKitError::Configuration(format!(
+                "chunk_size ({}) is below the S3 minimum multipart part size ({} bytes / 5 MiB). \
+                 Intermediate parts smaller than 5 MiB will be rejected by S3 with EntityTooSmall.",
+                config.chunk_size, S3_MIN_PART_SIZE,
+            )));
+        }
+
         Ok(Box::new(Self { config }))
     })
 }
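The 5 MiB floor enforced here is S3's minimum multipart part size for every part except the last; smaller intermediate parts fail server-side with EntityTooSmall, which is why the check moved from a warning to a hard config-time rejection. A standalone sketch of the same validation (error type simplified to String, not the node's actual StreamKitError):

```rust
/// S3 minimum multipart part size: intermediate parts below 5 MiB are
/// rejected by S3 with EntityTooSmall (only the final part may be smaller).
const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5_242_880 bytes

// Reject undersized chunk_size at config time rather than letting the
// upload fail minutes into streaming.
fn validate_chunk_size(chunk_size: usize) -> Result<(), String> {
    if chunk_size < S3_MIN_PART_SIZE {
        return Err(format!(
            "chunk_size ({}) is below the S3 minimum multipart part size ({} bytes / 5 MiB)",
            chunk_size, S3_MIN_PART_SIZE,
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_chunk_size(1024).is_err());           // rejected at config time
    assert!(validate_chunk_size(5 * 1024 * 1024).is_ok()); // exactly 5 MiB is the floor
    assert!(validate_chunk_size(8 * 1024 * 1024).is_ok());
}
```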
@@ -288,8 +317,16 @@ impl ProcessorNode for ObjectStoreWriteNode {
     }

     fn output_pins(&self) -> Vec<OutputPin> {
-        // Sink — no outputs.
-        vec![]
+        if self.config.passthrough {
+            vec![OutputPin {
+                name: "out".to_string(),
+                produces_type: PacketType::Passthrough,
+                cardinality: PinCardinality::Broadcast,
+            }]
+        } else {
+            // Pure sink — no outputs.
+            vec![]
+        }
     }

     async fn run(self: Box<Self>, mut context: NodeContext) -> Result<(), StreamKitError> {
@@ -348,7 +385,22 @@
             .finish()
     };

-    tracing::info!(%node_name, "S3 operator created, opening writer");
+    tracing::info!(%node_name, "S3 operator created, verifying bucket access");
+
+    // ── Verify bucket exists and is accessible ────────────────────────
+    // Stat the root path — this issues a lightweight HEAD request to the
+    // bucket, catching "NoSuchBucket" or permission errors at init time
+    // rather than after streaming data for minutes.
+    operator.stat("/").await.map_err(|e| {
+        let msg = format!(
+            "S3 bucket '{}' is not accessible at '{}': {e}",
+            self.config.bucket, self.config.endpoint
+        );
+        state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
+        StreamKitError::Runtime(msg)
+    })?;
+
+    tracing::info!(%node_name, "Bucket verified, opening writer");

     // ── Open writer (multipart upload) ───────────────────────────────
     let writer_future = operator.writer_with(&self.config.key).chunk(self.config.chunk_size);
@@ -384,9 +436,12 @@
     let mut total_bytes: u64 = 0;
     let mut buffer = Vec::with_capacity(self.config.chunk_size);
     let mut chunks_written: u64 = 0;
+    // Tracks whether the downstream output channel has closed (passthrough
+    // mode only). When true we stop forwarding but keep writing to S3.
+    let mut output_closed = false;

     while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await {
-        if let Packet::Binary { data, .. } = packet {
+        if let Packet::Binary { data, content_type, metadata } = packet {
             stats_tracker.received();
             packet_count += 1;
             total_bytes += data.len() as u64;
@@ -414,6 +469,23 @@
                 );
             }

+            // Forward the packet downstream when in passthrough mode.
+            if self.config.passthrough && !output_closed {
+                let forwarded = Packet::Binary { data, content_type, metadata };
+                if context.output_sender.send("out", forwarded).await.is_err() {
+                    // Downstream closed but we keep writing to S3 so the
+                    // archive is complete. Skip further send attempts.
+                    tracing::info!(
+                        %node_name,
+                        "Output channel closed; continuing S3 write without forwarding"
+                    );
+                    output_closed = true;
+                }
+            }
+
+            // Consistent with file_write.rs: report a "sent" stat for
+            // every packet consumed, even in pure-sink mode where there
+            // is no downstream receiver.
             stats_tracker.sent();
             stats_tracker.maybe_send();
         } else {
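The "archive completeness over early exit" behaviour above can be illustrated with plain std channels. This is a toy model of the loop (no S3, no async, no real Packet type — all names here are illustrative): a closed receiver stops forwarding but never stops consumption and "writing":

```rust
use std::sync::mpsc;

// Toy model of the passthrough loop: every packet is "written" (buffered
// for S3 in the real node) and, while the downstream channel is open,
// also forwarded. A closed receiver flips `output_closed` instead of
// aborting the write loop, so the archive stays complete.
fn run_passthrough(packets: Vec<Vec<u8>>, out: mpsc::Sender<Vec<u8>>) -> (u64, u64) {
    let mut written: u64 = 0;
    let mut forwarded: u64 = 0;
    let mut output_closed = false;
    for data in packets {
        written += data.len() as u64; // the S3 write path always runs
        if !output_closed {
            if out.send(data).is_err() {
                // Downstream closed — keep writing, stop forwarding.
                output_closed = true;
            } else {
                forwarded += 1;
            }
        }
    }
    (written, forwarded)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    drop(rx); // downstream goes away immediately
    let (written, forwarded) = run_passthrough(vec![vec![0u8; 4], vec![1u8; 6]], tx);
    assert_eq!(written, 10);  // all bytes still "archived"
    assert_eq!(forwarded, 0); // nothing forwarded after close
}
```

The real node additionally logs the transition once and keeps reporting stats, but the control flow is the same: the send error is absorbed, not propagated.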
@@ -458,7 +530,7 @@
     }

     // Upload committed successfully — disarm the abort guard.
-    guard.disarm();
+    let _ = guard.disarm();

     stats_tracker.force_send();
     tracing::info!(
@@ -488,9 +560,9 @@ mod tests {
     use streamkit_core::NodeStatsUpdate;
     use tokio::sync::mpsc;

-    /// Verify pin definitions for the object store write node.
+    /// Verify pin definitions for the object store write node (sink mode).
     #[test]
-    fn test_pin_definitions() {
+    fn test_pin_definitions_sink() {
         let node = ObjectStoreWriteNode {
             config: ObjectStoreWriteConfig {
                 endpoint: String::new(),
@@ -503,6 +575,7 @@ mod tests {
                 secret_key_env: None,
                 chunk_size: default_chunk_size(),
                 content_type: None,
+                passthrough: false,
             },
         };

@@ -515,6 +588,35 @@ mod tests {
         assert!(outputs.is_empty(), "Sink node should have no output pins");
     }

+    /// Verify pin definitions for passthrough mode.
+    #[test]
+    fn test_pin_definitions_passthrough() {
+        let node = ObjectStoreWriteNode {
+            config: ObjectStoreWriteConfig {
+                endpoint: String::new(),
+                bucket: String::new(),
+                key: String::new(),
+                region: default_region(),
+                access_key_id: None,
+                access_key_id_env: None,
+                secret_access_key: None,
+                secret_key_env: None,
+                chunk_size: default_chunk_size(),
+                content_type: None,
+                passthrough: true,
+            },
+        };
+
+        let inputs = node.input_pins();
+        assert_eq!(inputs.len(), 1);
+        assert_eq!(inputs[0].name, "in");
+
+        let outputs = node.output_pins();
+        assert_eq!(outputs.len(), 1, "Passthrough mode should have one output pin");
+        assert_eq!(outputs[0].name, "out");
+        assert_eq!(outputs[0].produces_type, PacketType::Passthrough);
+    }
+
     /// Verify factory rejects zero chunk_size.
     #[test]
     fn test_factory_rejects_zero_chunk_size() {
@@ -534,6 +636,25 @@ mod tests {
         assert!(err.contains("chunk_size"), "Error should mention chunk_size: {err}");
     }

+    /// Verify factory rejects chunk_size below S3 minimum (5 MiB).
+    #[test]
+    fn test_factory_rejects_sub_5mib_chunk_size() {
+        let factory = ObjectStoreWriteNode::factory();
+        let params = serde_json::json!({
+            "endpoint": "http://localhost:9000",
+            "bucket": "test",
+            "key": "test.bin",
+            "chunk_size": 1024,
+        });
+        let result = factory(Some(&params));
+        assert!(result.is_err());
+        let err = match result {
+            Err(e) => e.to_string(),
+            Ok(_) => panic!("Expected error for sub-5MiB chunk_size"),
+        };
+        assert!(err.contains("5 MiB"), "Error should mention 5 MiB minimum: {err}");
+    }
+
     /// Stub lookup that never finds any variable.
     fn no_env(_name: &str) -> Result<String, std::env::VarError> {
         Err(std::env::VarError::NotPresent)
@@ -568,13 +689,30 @@ mod tests {

     #[test]
     fn test_resolve_credential_env_empty() {
+        // Env var is empty — should fall through to literal.
+        let lookup = |_: &str| Ok(String::new());
+        let result = resolve_credential(Some("ANY_VAR"), Some("fallback"), "test", lookup);
+        assert_eq!(result.unwrap(), "fallback");
+    }
+
+    #[test]
+    fn test_resolve_credential_env_empty_no_literal() {
+        // Env var is empty and no literal — should error.
         let lookup = |_: &str| Ok(String::new());
         let result = resolve_credential(Some("ANY_VAR"), None, "test", lookup);
         assert!(result.is_err());
     }

     #[test]
     fn test_resolve_credential_env_not_set() {
+        // Env var not set — should fall through to literal.
+        let result = resolve_credential(Some("MISSING"), Some("fallback"), "test", no_env);
+        assert_eq!(result.unwrap(), "fallback");
+    }
+
+    #[test]
+    fn test_resolve_credential_env_not_set_no_literal() {
+        // Env var not set and no literal — should error.
         let result = resolve_credential(Some("MISSING"), None, "test", no_env);
         assert!(result.is_err());
     }
@@ -675,6 +813,7 @@ mod tests {
         secret_key_env: None,
         chunk_size: default_chunk_size(),
         content_type: None,
+        passthrough: false,
     };
     let node = Box::new(ObjectStoreWriteNode { config });
docker/docker-compose.rustfs.yml

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@

 services:
   rustfs:
-    image: rustfs/rustfs:latest
+    image: rustfs/rustfs:1.0.0-alpha.90
     container_name: rustfs-server
     ports:
       - "9000:9000" # S3 API
