Skip to content

Commit f4f9d31

Browse files
chaliyclaude
andauthored
feat(tool): add streaming output support (#220)
## Summary - Add `exec_streaming` to `Bash` and `ToolDef::invoke_streaming` to the tool layer, delivering stdout/stderr incrementally via callbacks while still returning the full `ToolResult` - Update spec `009-tool-contract.md` with streaming API documentation - Add example (`streaming_output.rs`) and equivalence tests verifying streamed chunks reassemble to full output ## Test plan - [x] `cargo test --all-features` passes (includes streaming equivalence tests) - [x] `cargo run --example streaming_output` runs end-to-end - [ ] CI green --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 36dc727 commit f4f9d31

File tree

6 files changed

+665
-7
lines changed

6 files changed

+665
-7
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! Streaming output example
2+
//!
3+
//! Demonstrates `exec_streaming` which delivers output incrementally
4+
//! via a callback, while still returning the full result at the end.
5+
//!
6+
//! Run with: cargo run --example streaming_output
7+
8+
use bashkit::Bash;
9+
use std::sync::{Arc, Mutex};
10+
11+
#[tokio::main]
12+
async fn main() -> anyhow::Result<()> {
13+
// --- Bash-level streaming ---
14+
println!("=== exec_streaming: for loop ===");
15+
let mut bash = Bash::new();
16+
let result = bash
17+
.exec_streaming(
18+
"for i in 1 2 3 4 5; do echo \"iteration $i\"; done",
19+
Box::new(|stdout, _stderr| {
20+
// Called after each loop iteration
21+
print!("[stream] {stdout}");
22+
}),
23+
)
24+
.await?;
25+
println!("--- final stdout ---\n{}", result.stdout);
26+
27+
// --- Collecting chunks ---
28+
println!("=== exec_streaming: collecting chunks ===");
29+
let chunks: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
30+
let chunks_cb = chunks.clone();
31+
let mut bash = Bash::new();
32+
let result = bash
33+
.exec_streaming(
34+
"echo start; for x in a b c; do echo $x; done; echo end",
35+
Box::new(move |stdout, _stderr| {
36+
chunks_cb.lock().unwrap().push(stdout.to_string());
37+
}),
38+
)
39+
.await?;
40+
{
41+
let chunks = chunks.lock().unwrap();
42+
println!("Chunks received: {chunks:?}");
43+
// Concatenated chunks == full stdout
44+
let reassembled: String = chunks.iter().cloned().collect();
45+
assert_eq!(reassembled, result.stdout);
46+
println!("Reassembled matches final stdout: OK");
47+
}
48+
49+
// --- Streaming stderr ---
50+
println!("\n=== exec_streaming: stderr ===");
51+
let mut bash = Bash::new();
52+
let result = bash
53+
.exec_streaming(
54+
"echo out1; echo err1 >&2; echo out2",
55+
Box::new(|stdout, stderr| {
56+
if !stdout.is_empty() {
57+
print!("[stdout] {stdout}");
58+
}
59+
if !stderr.is_empty() {
60+
eprint!("[stderr] {stderr}");
61+
}
62+
}),
63+
)
64+
.await?;
65+
println!("--- final stdout: {:?}", result.stdout);
66+
println!("--- final stderr: {:?}", result.stderr);
67+
68+
// --- Non-streaming still works ---
69+
println!("\n=== exec (non-streaming) for comparison ===");
70+
let mut bash = Bash::new();
71+
let result = bash.exec("for i in 1 2 3; do echo $i; done").await?;
72+
println!("stdout: {}", result.stdout);
73+
74+
Ok(())
75+
}

crates/bashkit/src/interpreter/mod.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ use crate::error::Error;
3030
use crate::error::Result;
3131
use crate::fs::FileSystem;
3232
use crate::limits::{ExecutionCounters, ExecutionLimits};
33+
34+
/// Callback for streaming output chunks as they are produced.
35+
///
36+
/// Arguments: `(stdout_chunk, stderr_chunk)`. Called after each loop iteration
37+
/// and each top-level command completes. Only non-empty chunks trigger a call.
38+
///
39+
/// Requires `Send + Sync` because the interpreter holds this across `.await` points.
40+
/// Closures capturing `Arc<Mutex<_>>` satisfy both bounds automatically.
41+
pub type OutputCallback = Box<dyn FnMut(&str, &str) + Send + Sync>;
3342
use crate::parser::{
3443
ArithmeticForCommand, AssignmentValue, CaseCommand, Command, CommandList, CompoundCommand,
3544
ForCommand, FunctionDef, IfCommand, ListOperator, ParameterOp, Parser, Pipeline, Redirect,
@@ -122,6 +131,13 @@ pub struct Interpreter {
122131
/// Stdin inherited from pipeline for compound commands (while read, etc.)
123132
/// Each read operation consumes one line, advancing through the data.
124133
pipeline_stdin: Option<String>,
134+
/// Optional callback for streaming output chunks during execution.
135+
/// When set, output is emitted incrementally via this callback in addition
136+
/// to being accumulated in the returned ExecResult.
137+
output_callback: Option<OutputCallback>,
138+
/// Monotonic counter incremented each time output is emitted via callback.
139+
/// Used to detect whether sub-calls already emitted output, preventing duplicates.
140+
output_emit_count: u64,
125141
}
126142

127143
impl Interpreter {
@@ -283,6 +299,8 @@ impl Interpreter {
283299
#[cfg(feature = "git")]
284300
git_client: None,
285301
pipeline_stdin: None,
302+
output_callback: None,
303+
output_emit_count: 0,
286304
}
287305
}
288306

@@ -325,6 +343,46 @@ impl Interpreter {
325343
self.cwd = cwd;
326344
}
327345

346+
/// Set an output callback for streaming output during execution.
347+
///
348+
/// When set, the interpreter calls this callback with `(stdout_chunk, stderr_chunk)`
349+
/// after each loop iteration, command list element, and top-level command.
350+
/// Output is still accumulated in the returned `ExecResult` for the final result.
351+
pub fn set_output_callback(&mut self, callback: OutputCallback) {
352+
self.output_callback = Some(callback);
353+
self.output_emit_count = 0;
354+
}
355+
356+
/// Clear the output callback.
357+
pub fn clear_output_callback(&mut self) {
358+
self.output_callback = None;
359+
self.output_emit_count = 0;
360+
}
361+
362+
/// Emit output via the callback if set, and if sub-calls didn't already emit.
363+
/// Returns `true` if output was emitted.
364+
///
365+
/// `emit_count_before` is the value of `output_emit_count` before the sub-call
366+
/// that produced this output. If the count advanced, sub-calls already emitted
367+
/// and we skip to avoid duplicates.
368+
fn maybe_emit_output(&mut self, stdout: &str, stderr: &str, emit_count_before: u64) -> bool {
369+
if self.output_callback.is_none() {
370+
return false;
371+
}
372+
// Sub-calls already emitted — skip to avoid duplicates
373+
if self.output_emit_count != emit_count_before {
374+
return false;
375+
}
376+
if stdout.is_empty() && stderr.is_empty() {
377+
return false;
378+
}
379+
if let Some(ref mut cb) = self.output_callback {
380+
cb(stdout, stderr);
381+
self.output_emit_count += 1;
382+
}
383+
true
384+
}
385+
328386
/// Set the HTTP client for network builtins (curl, wget).
329387
///
330388
/// This is only available when the `http_client` feature is enabled.
@@ -352,7 +410,9 @@ impl Interpreter {
352410
let mut exit_code = 0;
353411

354412
for command in &script.commands {
413+
let emit_before = self.output_emit_count;
355414
let result = self.execute_command(command).await?;
415+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
356416
stdout.push_str(&result.stdout);
357417
stderr.push_str(&result.stderr);
358418
exit_code = result.exit_code;
@@ -551,7 +611,9 @@ impl Interpreter {
551611
.insert(for_cmd.variable.clone(), value.clone());
552612

553613
// Execute body
614+
let emit_before = self.output_emit_count;
554615
let result = self.execute_command_sequence(&for_cmd.body).await?;
616+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
555617
stdout.push_str(&result.stdout);
556618
stderr.push_str(&result.stderr);
557619
exit_code = result.exit_code;
@@ -645,7 +707,9 @@ impl Interpreter {
645707
}
646708

647709
// Execute body
710+
let emit_before = self.output_emit_count;
648711
let result = self.execute_command_sequence(&arith_for.body).await?;
712+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
649713
stdout.push_str(&result.stdout);
650714
stderr.push_str(&result.stderr);
651715
exit_code = result.exit_code;
@@ -862,7 +926,9 @@ impl Interpreter {
862926
}
863927

864928
// Execute body
929+
let emit_before = self.output_emit_count;
865930
let result = self.execute_command_sequence(&while_cmd.body).await?;
931+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
866932
stdout.push_str(&result.stdout);
867933
stderr.push_str(&result.stderr);
868934
exit_code = result.exit_code;
@@ -945,7 +1011,9 @@ impl Interpreter {
9451011
}
9461012

9471013
// Execute body
1014+
let emit_before = self.output_emit_count;
9481015
let result = self.execute_command_sequence(&until_cmd.body).await?;
1016+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
9491017
stdout.push_str(&result.stdout);
9501018
stderr.push_str(&result.stderr);
9511019
exit_code = result.exit_code;
@@ -1592,7 +1660,9 @@ impl Interpreter {
15921660
let mut exit_code = 0;
15931661

15941662
for command in commands {
1663+
let emit_before = self.output_emit_count;
15951664
let result = self.execute_command(command).await?;
1665+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
15961666
stdout.push_str(&result.stdout);
15971667
stderr.push_str(&result.stderr);
15981668
exit_code = result.exit_code;
@@ -1671,7 +1741,9 @@ impl Interpreter {
16711741
let mut stdout = String::new();
16721742
let mut stderr = String::new();
16731743
let mut exit_code;
1744+
let emit_before = self.output_emit_count;
16741745
let result = self.execute_command(&list.first).await?;
1746+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
16751747
stdout.push_str(&result.stdout);
16761748
stderr.push_str(&result.stderr);
16771749
exit_code = result.exit_code;
@@ -1746,7 +1818,9 @@ impl Interpreter {
17461818
};
17471819

17481820
if should_execute {
1821+
let emit_before = self.output_emit_count;
17491822
let result = self.execute_command(cmd).await?;
1823+
self.maybe_emit_output(&result.stdout, &result.stderr, emit_before);
17501824
stdout.push_str(&result.stdout);
17511825
stderr.push_str(&result.stderr);
17521826
exit_code = result.exit_code;

0 commit comments

Comments
 (0)