diff --git a/crates/loopal-runtime/src/agent_loop/llm.rs b/crates/loopal-runtime/src/agent_loop/llm.rs index 94658c1..d329d45 100644 --- a/crates/loopal-runtime/src/agent_loop/llm.rs +++ b/crates/loopal-runtime/src/agent_loop/llm.rs @@ -7,7 +7,7 @@ use loopal_message::{ContentBlock, Message}; use loopal_protocol::AgentEventPayload; use loopal_provider_api::StreamChunk; use std::time::Instant; -use tracing::{error, info}; +use tracing::{error, info, warn}; impl AgentLoopRunner { /// Stream the LLM response using a provided working copy of messages. @@ -52,13 +52,14 @@ impl AgentLoopRunner { .retry_stream_chat(&chat_params, &*provider, cancel) .await?; let mut result = LlmStreamResult::default(); + let mut received_done = false; loop { tokio::select! { biased; chunk = stream.next() => { let Some(chunk_result) = chunk else { break; }; - if !self.handle_stream_chunk(chunk_result, &mut result).await? { + if !self.handle_stream_chunk(chunk_result, &mut result, &mut received_done).await? { break; } } @@ -70,6 +71,14 @@ impl AgentLoopRunner { } } + // Stream EOF without Done → connection dropped mid-stream. + // Exclude cancellation: retry_stream_chat returns empty stream on cancel, + // which would look like truncation but is intentional. 
+ if !received_done && !result.stream_error && !cancel.is_cancelled() { + warn!("SSE stream ended without message_stop — treating as stream truncation"); + result.stream_error = true; + } + self.emit_thinking_complete(&result).await?; let llm_duration = llm_start.elapsed(); info!( @@ -88,6 +97,7 @@ &mut self, chunk: std::result::Result<StreamChunk, loopal_error::LoopalError>, result: &mut LlmStreamResult, + received_done: &mut bool, ) -> Result<bool> { match chunk { Ok(StreamChunk::Text { text }) => { @@ -162,6 +172,7 @@ .await?; } Ok(StreamChunk::Done { stop_reason }) => { + *received_done = true; result.stop_reason = stop_reason; return Ok(false); } diff --git a/crates/loopal-runtime/src/agent_loop/llm_result.rs b/crates/loopal-runtime/src/agent_loop/llm_result.rs index 7a80dd1..fda2277 100644 --- a/crates/loopal-runtime/src/agent_loop/llm_result.rs +++ b/crates/loopal-runtime/src/agent_loop/llm_result.rs @@ -5,6 +5,9 @@ use loopal_provider_api::StopReason; pub struct LlmStreamResult { pub assistant_text: String, pub tool_uses: Vec<(String, String, serde_json::Value)>, + /// True when the stream did not complete normally: explicit error chunk, + /// user cancellation, or silent truncation (EOF without `message_stop`). + /// `turn_exec` uses this to decide whether to auto-continue or bail. 
pub stream_error: bool, pub stop_reason: StopReason, pub thinking_text: String, diff --git a/crates/loopal-runtime/src/agent_loop/mod.rs b/crates/loopal-runtime/src/agent_loop/mod.rs index 74fb053..ea23363 100644 --- a/crates/loopal-runtime/src/agent_loop/mod.rs +++ b/crates/loopal-runtime/src/agent_loop/mod.rs @@ -33,6 +33,7 @@ pub mod turn_context; mod turn_exec; pub(crate) mod turn_metrics; pub mod turn_observer; +mod turn_observer_dispatch; use std::collections::HashSet; use std::sync::Arc; diff --git a/crates/loopal-runtime/src/agent_loop/turn_exec.rs b/crates/loopal-runtime/src/agent_loop/turn_exec.rs index fe13868..e03b0ad 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_exec.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_exec.rs @@ -1,4 +1,6 @@ -//! Inner turn execution loop and observer dispatch. +//! Inner turn execution loop. +//! +//! Observer dispatch lives in `turn_observer_dispatch.rs`. use loopal_error::Result; use loopal_protocol::AgentEventPayload; @@ -8,7 +10,6 @@ use tracing::{debug, info, warn}; use super::TurnOutput; use super::runner::AgentLoopRunner; use super::turn_context::TurnContext; -use super::turn_observer::ObserverAction; impl AgentLoopRunner { /// Inner loop: LLM → [tools → LLM]* → done. @@ -26,9 +27,7 @@ impl AgentLoopRunner { return Ok(TurnOutput { output: last_text }); } - // Persistent compaction (LLM summarization if over budget) self.check_and_compact().await?; - // Prepare context for LLM (clone + strip old thinking) let working = self.params.store.prepare_for_llm(); turn_ctx.metrics.llm_calls += 1; let result = self.stream_llm_with(&working, &turn_ctx.cancel).await?; @@ -45,12 +44,30 @@ impl AgentLoopRunner { &result.tool_uses }; - // Auto-continue triggers: MaxTokens+tools, PauseTurn (server-side limit) + // Auto-continue: MaxTokens+tools, PauseTurn, or stream truncation. + // Exclude user cancellation — cancel should stop, not retry. 
let needs_auto_continue = truncated || result.stop_reason == StopReason::PauseTurn; - if needs_auto_continue { + let stream_truncated = result.stream_error + && !turn_ctx.cancel.is_cancelled() + && !(result.assistant_text.is_empty() && result.tool_uses.is_empty()); + + if needs_auto_continue || stream_truncated { + if stream_truncated { + warn!( + text_len = result.assistant_text.len(), + tool_calls = result.tool_uses.len(), + "stream truncated — discarding incomplete tool calls" + ); + } + // For truncated streams, discard possibly-incomplete tool_use blocks. + let tools = if stream_truncated { + &[][..] + } else { + effective_tools + }; self.record_assistant_message( &result.assistant_text, - effective_tools, + tools, &result.thinking_text, result.thinking_signature.as_deref(), result.server_blocks, @@ -71,6 +88,23 @@ impl AgentLoopRunner { return Ok(TurnOutput { output: last_text }); } + // Stream error (cancel or empty truncation) — record any partial + // text that was already streamed to the user, then exit. 
+ if result.stream_error { + if !result.assistant_text.is_empty() { + let no_tools: &[(String, String, serde_json::Value)] = &[]; + self.record_assistant_message( + &result.assistant_text, + no_tools, + &result.thinking_text, + result.thinking_signature.as_deref(), + result.server_blocks, + ); + last_text.clone_from(&result.assistant_text); + } + return Ok(TurnOutput { output: last_text }); + } + self.record_assistant_message( &result.assistant_text, &result.tool_uses, @@ -82,13 +116,6 @@ impl AgentLoopRunner { last_text.clone_from(&result.assistant_text); } - if result.stream_error - && result.tool_uses.is_empty() - && result.assistant_text.is_empty() - { - return Ok(TurnOutput { output: last_text }); - } - if result.tool_uses.is_empty() && result.stop_reason == StopReason::MaxTokens { if continuation_count < self.params.harness.max_auto_continuations { continuation_count += 1; @@ -104,32 +131,14 @@ impl AgentLoopRunner { } if result.tool_uses.is_empty() { - // Stop hook: if any hook provides feedback, continue (bounded). 
- if stop_feedback_count < max_stop_feedback { - let stop_outputs = self - .params - .deps - .kernel - .hook_service() - .run_hooks( - loopal_config::HookEvent::Stop, - &loopal_hooks::HookContext { - stop_reason: Some("end_turn"), - ..Default::default() - }, - ) - .await; - let feedback: Vec<&str> = stop_outputs - .iter() - .filter_map(|o| o.additional_context.as_deref()) - .collect(); - if !feedback.is_empty() { - stop_feedback_count += 1; - self.params - .store - .append_warnings_to_last_user(vec![feedback.join("\n")]); - continue; - } + if stop_feedback_count < max_stop_feedback + && let Some(feedback) = self.run_stop_hooks().await + { + stop_feedback_count += 1; + self.params + .store + .append_warnings_to_last_user(vec![feedback]); + continue; } return Ok(TurnOutput { output: last_text }); } @@ -158,12 +167,10 @@ impl AgentLoopRunner { turn_ctx.metrics.tool_errors += stats.errors; info!("tool exec complete"); - // Append observer warnings after tool results (must follow ToolResult blocks). let warnings = std::mem::take(&mut turn_ctx.pending_warnings); self.params.store.append_warnings_to_last_user(warnings); self.inject_pending_messages().await; - // Observer: on_after_tools with results from the last message let result_blocks = self .params .store @@ -178,27 +185,4 @@ impl AgentLoopRunner { continuation_count = 0; } } - - /// Run before-tools observers. Returns `true` if the turn should abort. 
- pub(super) async fn run_before_tools( - &mut self, - turn_ctx: &mut TurnContext, - tool_uses: &[(String, String, serde_json::Value)], - ) -> Result<bool> { - for obs in &mut self.observers { - match obs.on_before_tools(turn_ctx, tool_uses) { - ObserverAction::Continue => {} - ObserverAction::InjectWarning(msg) => { - turn_ctx.pending_warnings.push(msg); - } - ObserverAction::AbortTurn(reason) => { - warn!(%reason, "observer aborted turn"); - self.emit(AgentEventPayload::Error { message: reason }) - .await?; - return Ok(true); - } - } - } - Ok(false) - } } diff --git a/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs b/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs new file mode 100644 index 0000000..f050e88 --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs @@ -0,0 +1,64 @@ +//! Stop hook and observer dispatch for the turn execution loop. +//! +//! Extracted from `turn_exec` — these are lifecycle extension points +//! with independent change reasons (hook config, observer API). + +use loopal_error::Result; +use loopal_protocol::AgentEventPayload; +use tracing::warn; + +use super::runner::AgentLoopRunner; +use super::turn_context::TurnContext; +use super::turn_observer::ObserverAction; + +impl AgentLoopRunner { + /// Run before-tools observers. Returns `true` if the turn should abort. + pub(super) async fn run_before_tools( + &mut self, + turn_ctx: &mut TurnContext, + tool_uses: &[(String, String, serde_json::Value)], + ) -> Result<bool> { + for obs in &mut self.observers { + match obs.on_before_tools(turn_ctx, tool_uses) { + ObserverAction::Continue => {} + ObserverAction::InjectWarning(msg) => { + turn_ctx.pending_warnings.push(msg); + } + ObserverAction::AbortTurn(reason) => { + warn!(%reason, "observer aborted turn"); + self.emit(AgentEventPayload::Error { message: reason }) + .await?; + return Ok(true); + } + } + } + Ok(false) + } + + /// Run Stop lifecycle hooks. 
Returns feedback to inject if hooks want + /// the agent to continue, or `None` to let the turn end. + pub(super) async fn run_stop_hooks(&self) -> Option<String> { + let stop_outputs = self + .params + .deps + .kernel + .hook_service() + .run_hooks( + loopal_config::HookEvent::Stop, + &loopal_hooks::HookContext { + stop_reason: Some("end_turn"), + ..Default::default() + }, + ) + .await; + let feedback: Vec<&str> = stop_outputs + .iter() + .filter_map(|o| o.additional_context.as_deref()) + .collect(); + if feedback.is_empty() { + None + } else { + Some(feedback.join("\n")) + } + } +} diff --git a/crates/loopal-runtime/tests/agent_loop/llm_test.rs b/crates/loopal-runtime/tests/agent_loop/llm_test.rs index 4e10b9a..9e2b58b 100644 --- a/crates/loopal-runtime/tests/agent_loop/llm_test.rs +++ b/crates/loopal-runtime/tests/agent_loop/llm_test.rs @@ -177,7 +177,7 @@ #[tokio::test] async fn test_stream_llm_empty_stream() { - // Empty stream (no chunks at all) — tests the while loop body never executing + // Empty stream (no chunks at all) — stream EOF without Done = truncation. let chunks = vec![]; let (mut runner, _event_rx, _input_tx, _ctrl_tx) = make_runner_with_mock_provider(chunks); @@ -189,7 +189,10 @@ let stream_error = result.stream_error; assert!(text.is_empty()); assert!(tool_uses.is_empty()); - assert!(!stream_error); + assert!( + stream_error, + "empty stream (no Done) should set stream_error" + ); } #[test] diff --git a/crates/loopal-runtime/tests/agent_loop/llm_truncation_test.rs b/crates/loopal-runtime/tests/agent_loop/llm_truncation_test.rs new file mode 100644 index 0000000..7edfd18 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/llm_truncation_test.rs @@ -0,0 +1,132 @@ +//! Unit tests for stream truncation detection in `stream_llm_with`. +//! +//! These test the llm.rs `received_done` / `stream_error` logic directly, +//! 
without going through the full turn execution loop. + +use loopal_provider_api::{StopReason, StreamChunk}; + +use super::{make_cancel, make_runner_with_mock_provider}; + +/// EOF after text chunks but no Done → stream_error=true (silent truncation). +#[tokio::test] +async fn test_eof_with_text_no_done_sets_stream_error() { + let chunks = vec![ + Ok(StreamChunk::Text { + text: "Let me create the file.".into(), + }), + // Stream ends — no Done chunk + ]; + let (mut runner, _rx, _mbox, _ctrl) = make_runner_with_mock_provider(chunks); + let msgs = runner.params.store.messages().to_vec(); + let cancel = make_cancel(); + + let result = runner.stream_llm_with(&msgs, &cancel).await.unwrap(); + + assert_eq!(result.assistant_text, "Let me create the file."); + assert!( + result.stream_error, + "EOF without Done should set stream_error" + ); + assert!(result.tool_uses.is_empty()); + // stop_reason stays at default EndTurn (Done never arrived to set it) + assert_eq!(result.stop_reason, StopReason::EndTurn); +} + +/// EOF after text + complete tool_use but no Done → stream_error=true, +/// tool_use is still collected (tool JSON was complete at content_block_stop). 
+#[tokio::test] +async fn test_eof_with_text_and_tool_no_done_sets_stream_error() { + let chunks = vec![ + Ok(StreamChunk::Text { + text: "Let me read.".into(), + }), + Ok(StreamChunk::ToolUse { + id: "tc-1".into(), + name: "Read".into(), + input: serde_json::json!({"file_path": "/tmp/test.txt"}), + }), + // No Done — stream truncated after tool_use + ]; + let (mut runner, _rx, _mbox, _ctrl) = make_runner_with_mock_provider(chunks); + let msgs = runner.params.store.messages().to_vec(); + let cancel = make_cancel(); + + let result = runner.stream_llm_with(&msgs, &cancel).await.unwrap(); + + assert_eq!(result.assistant_text, "Let me read."); + assert!(result.stream_error); + // Tool was fully parsed (content_block_stop emitted ToolUse chunk) + assert_eq!(result.tool_uses.len(), 1); + assert_eq!(result.tool_uses[0].1, "Read"); +} + +/// Normal response with Done → stream_error=false. +/// Regression guard: ensure normal flow is unaffected. +#[tokio::test] +async fn test_normal_response_with_done_no_stream_error() { + let chunks = vec![ + Ok(StreamChunk::Text { + text: "All done.".into(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ]; + let (mut runner, _rx, _mbox, _ctrl) = make_runner_with_mock_provider(chunks); + let msgs = runner.params.store.messages().to_vec(); + let cancel = make_cancel(); + + let result = runner.stream_llm_with(&msgs, &cancel).await.unwrap(); + + assert_eq!(result.assistant_text, "All done."); + assert!(!result.stream_error); + assert_eq!(result.stop_reason, StopReason::EndTurn); +} + +/// Explicit Err chunk (e.g. StreamEnded) + partial text → stream_error=true. +/// The Err handler (not the truncation detector) sets stream_error. 
+#[tokio::test] +async fn test_err_chunk_with_text_sets_stream_error() { + let chunks = vec![ + Ok(StreamChunk::Text { + text: "partial".into(), + }), + Err(loopal_error::LoopalError::Provider( + loopal_error::ProviderError::StreamEnded, + )), + ]; + let (mut runner, _rx, _mbox, _ctrl) = make_runner_with_mock_provider(chunks); + let msgs = runner.params.store.messages().to_vec(); + let cancel = make_cancel(); + + let result = runner.stream_llm_with(&msgs, &cancel).await.unwrap(); + + assert_eq!(result.assistant_text, "partial"); + assert!(result.stream_error); + // Truncation detector skips because stream_error already set by Err handler +} + +/// MaxTokens Done → stream_error=false, stop_reason=MaxTokens. +/// Regression guard: MaxTokens should not be confused with truncation. +#[tokio::test] +async fn test_max_tokens_done_not_confused_with_truncation() { + let chunks = vec![ + Ok(StreamChunk::Text { + text: "long text...".into(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::MaxTokens, + }), + ]; + let (mut runner, _rx, _mbox, _ctrl) = make_runner_with_mock_provider(chunks); + let msgs = runner.params.store.messages().to_vec(); + let cancel = make_cancel(); + + let result = runner.stream_llm_with(&msgs, &cancel).await.unwrap(); + + assert!( + !result.stream_error, + "MaxTokens with Done is not truncation" + ); + assert_eq!(result.stop_reason, StopReason::MaxTokens); +} diff --git a/crates/loopal-runtime/tests/agent_loop/mod.rs b/crates/loopal-runtime/tests/agent_loop/mod.rs index 569f31b..73a5b23 100644 --- a/crates/loopal-runtime/tests/agent_loop/mod.rs +++ b/crates/loopal-runtime/tests/agent_loop/mod.rs @@ -48,6 +48,7 @@ mod input_scheduled_test; mod input_test; mod integration_test; mod llm_test; +mod llm_truncation_test; pub mod mock_provider; pub use mock_provider::make_runner_with_mock_provider; mod cancel_test; @@ -60,6 +61,8 @@ mod preflight_test; mod record_message_test; mod retry_cancel_test; mod run_test; +mod stream_truncation_edge_test; 
+mod stream_truncation_test; mod tools_test; mod turn_completion_edge_test; mod turn_completion_test; diff --git a/crates/loopal-runtime/tests/agent_loop/stream_truncation_edge_test.rs b/crates/loopal-runtime/tests/agent_loop/stream_truncation_edge_test.rs new file mode 100644 index 0000000..72d1c4f --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/stream_truncation_edge_test.rs @@ -0,0 +1,157 @@ +//! Edge-case integration tests for stream truncation. +//! +//! Covers EOF-without-Done scenarios and continuation exhaustion. + +use loopal_error::TerminateReason; +use loopal_provider_api::{StopReason, StreamChunk}; + +use super::mock_provider::make_multi_runner; + +/// EOF without Done + partial text → auto-continue → tool call → complete. +/// Reproduces the original sub-agent bug: proxy cuts SSE mid-response. +#[tokio::test] +async fn test_eof_text_then_tool_on_continue() { + let tmp = std::env::temp_dir().join(format!("la_trunc_{}.txt", std::process::id())); + std::fs::write(&tmp, "data").unwrap(); + let calls = vec![ + // First LLM: text only, EOF (no Done) — proxy cut + vec![Ok(StreamChunk::Text { + text: "Let me create the file.".into(), + })], + // Auto-continue: model sees partial text, issues tool + vec![ + Ok(StreamChunk::ToolUse { + id: "tc-1".into(), + name: "Read".into(), + input: serde_json::json!({"file_path": tmp.to_str().unwrap()}), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + // Final: text after tool execution + vec![ + Ok(StreamChunk::Text { + text: "File created.".into(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert_eq!(output.result, "File created."); + assert_eq!(output.terminate_reason, TerminateReason::Goal); + let _ = std::fs::remove_file(&tmp); +} + +/// Repeated EOF 
truncation exhausts max_auto_continuations → preserves last text. +#[tokio::test] +async fn test_repeated_truncation_exhausts_continuations() { + // max_auto_continuations = 3: original + 3 retries = 4 LLM calls + let calls = vec![ + vec![Ok(StreamChunk::Text { + text: "attempt 1".into(), + })], + vec![Ok(StreamChunk::Text { + text: "attempt 2".into(), + })], + vec![Ok(StreamChunk::Text { + text: "attempt 3".into(), + })], + vec![Ok(StreamChunk::Text { + text: "attempt 4".into(), + })], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert_eq!(output.result, "attempt 4"); + assert_eq!(output.terminate_reason, TerminateReason::Goal); +} + +/// Prior successful turn → second turn truncates → auto-continue → final output. +#[tokio::test] +async fn test_truncation_preserves_prior_turn_output() { + let tmp = std::env::temp_dir().join(format!("la_prior_{}.txt", std::process::id())); + std::fs::write(&tmp, "x").unwrap(); + let calls = vec![ + // First LLM: text + tool (normal) + vec![ + Ok(StreamChunk::Text { + text: "Step one done.".into(), + }), + Ok(StreamChunk::ToolUse { + id: "tc-1".into(), + name: "Read".into(), + input: serde_json::json!({"file_path": tmp.to_str().unwrap()}), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + // Second LLM: truncated (text only, no Done) + vec![Ok(StreamChunk::Text { + text: "Starting step two...".into(), + })], + // Third LLM (auto-continue): final + vec![ + Ok(StreamChunk::Text { + text: "Step two complete.".into(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert_eq!(output.result, "Step two complete."); + assert_eq!(output.terminate_reason, 
TerminateReason::Goal); + let _ = std::fs::remove_file(&tmp); +} + +/// Truncation with only tool_uses (no text) → tools discarded, auto-continue. +#[tokio::test] +async fn test_truncation_tool_only_discards_and_continues() { + let tmp = std::env::temp_dir().join(format!("la_toolonly_{}.txt", std::process::id())); + std::fs::write(&tmp, "y").unwrap(); + let path_json = serde_json::json!({"file_path": tmp.to_str().unwrap()}); + let calls = vec![ + vec![Ok(StreamChunk::ToolUse { + id: "tc-1".into(), + name: "Read".into(), + input: path_json.clone(), + })], + vec![ + Ok(StreamChunk::ToolUse { + id: "tc-2".into(), + name: "Read".into(), + input: path_json, + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + vec![ + Ok(StreamChunk::Text { + text: "Read complete.".into(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert_eq!(output.result, "Read complete."); + let _ = std::fs::remove_file(&tmp); +} diff --git a/crates/loopal-runtime/tests/agent_loop/stream_truncation_test.rs b/crates/loopal-runtime/tests/agent_loop/stream_truncation_test.rs new file mode 100644 index 0000000..173efc1 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/stream_truncation_test.rs @@ -0,0 +1,126 @@ +//! Integration tests for stream truncation auto-continue. +//! +//! Full turn_exec loop: truncation → record partial → auto-continue. + +use loopal_error::TerminateReason; +use loopal_provider_api::{StopReason, StreamChunk}; + +use super::mock_provider::make_multi_runner; + +/// Explicit Err (e.g. StreamEnded) after partial text → auto-continue +/// → second LLM call completes normally. 
+#[tokio::test] +async fn test_err_with_text_triggers_auto_continue() { + let calls = vec![ + // First LLM call: text then error (no Done) + vec![ + Ok(StreamChunk::Text { + text: "Let me check.".into(), + }), + Err(loopal_error::LoopalError::Provider( + loopal_error::ProviderError::StreamEnded, + )), + ], + // Second LLM call (auto-continue): model finishes + vec![ + Ok(StreamChunk::Text { + text: "Here is the result.".into(), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert_eq!(output.result, "Here is the result."); + assert_eq!(output.terminate_reason, TerminateReason::Goal); +} + +/// Err after text + complete tool_use → tool is discarded (may be subset +/// of intended tools), auto-continue → model re-issues the tool call. +#[tokio::test] +async fn test_err_with_tool_discards_and_continues() { + let tmp = std::env::temp_dir().join(format!("la_errtool_{}.txt", std::process::id())); + std::fs::write(&tmp, "data").unwrap(); + let calls = vec![ + // First LLM: text + tool + Err (proxy dropped after first tool) + vec![ + Ok(StreamChunk::Text { + text: "Reading file.".into(), + }), + Ok(StreamChunk::ToolUse { + id: "tc-1".into(), + name: "Read".into(), + input: serde_json::json!({"file_path": tmp.to_str().unwrap()}), + }), + Err(loopal_error::LoopalError::Provider( + loopal_error::ProviderError::StreamEnded, + )), + ], + // Second LLM (auto-continue): re-issues the tool + vec![ + Ok(StreamChunk::ToolUse { + id: "tc-2".into(), + name: "Read".into(), + input: serde_json::json!({"file_path": tmp.to_str().unwrap()}), + }), + Ok(StreamChunk::Done { + stop_reason: StopReason::EndTurn, + }), + ], + // Third LLM: final text after tool + vec![ + Ok(StreamChunk::Text { + text: "File contents retrieved.".into(), + }), + Ok(StreamChunk::Done { + stop_reason: 
StopReason::EndTurn, + }), + ], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert_eq!(output.result, "File contents retrieved."); + assert_eq!(output.terminate_reason, TerminateReason::Goal); + let _ = std::fs::remove_file(&tmp); +} + +/// Empty stream error (Err on first chunk, no text) → no auto-continue, exit. +#[tokio::test] +async fn test_empty_stream_error_exits_without_continue() { + let calls = vec![ + // Only one LLM call: immediate error + vec![Err(loopal_error::LoopalError::Provider( + loopal_error::ProviderError::StreamEnded, + ))], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert!(output.result.is_empty()); + assert_eq!(output.terminate_reason, TerminateReason::Goal); + // Only 1 LLM call — no retry for completely empty error + assert_eq!(runner.turn_count, 1); +} + +/// EOF without Done + empty response → exit (nothing to continue from). +#[tokio::test] +async fn test_eof_empty_stream_exits_without_continue() { + let calls = vec![ + // Empty stream: no chunks at all, no Done + vec![], + ]; + let (mut runner, mut event_rx) = make_multi_runner(calls); + tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); + + let output = runner.run().await.unwrap(); + assert!(output.result.is_empty()); + assert_eq!(output.terminate_reason, TerminateReason::Goal); + assert_eq!(runner.turn_count, 1); +}