From 2e9873823311c9fb9a576a8aa9064e1970c1c202 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Fri, 20 Feb 2026 03:22:23 +0000 Subject: [PATCH 1/4] Graceful shutdown so claude can save session state before exit Replace immediate kill() with shutdown() (close stdin + 2s timeout + fallback SIGKILL) in ralph, worker, fork, and reload paths. This gives the claude CLI time to persist the final message to its session file before the process is terminated, fixing lost messages on resume. Fixes #32 Co-Authored-By: Claude Opus 4.6 --- src/commands/ralph.rs | 7 ++++--- src/commands/run.rs | 2 +- src/commands/worker.rs | 7 ++++--- src/session/event_loop.rs | 7 ++++--- src/session/runner.rs | 24 +++++++++++++++++++++++- 5 files changed, 36 insertions(+), 11 deletions(-) diff --git a/src/commands/ralph.rs b/src/commands/ralph.rs index 165e1de..a02504e 100644 --- a/src/commands/ralph.rs +++ b/src/commands/ralph.rs @@ -203,9 +203,10 @@ pub async fn ralph( &features, ) .await?; - // Kill the CLI process immediately to prevent async task - // notifications from triggering an invisible continuation. - runner.kill().await?; + // Shut down gracefully, giving claude time to save the final + // message to the session file. The timeout prevents invisible + // continuations from async task notifications. + runner.shutdown().await?; match handle_session_outcome( outcome, diff --git a/src/commands/run.rs b/src/commands/run.rs index 3482381..befc138 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -165,7 +165,7 @@ async fn handle_outcome( resume_after_pause(session_id, base_session_cfg, runner, state, ctx).await } SessionOutcome::Reload { .. } => { - runner.kill().await?; + runner.shutdown().await?; let Some(session_id) = state.session_id.take() else { return Ok(false); }; diff --git a/src/commands/worker.rs b/src/commands/worker.rs index 221b7f4..7498ef4 100644 --- a/src/commands/worker.rs +++ b/src/commands/worker.rs @@ -840,9 +840,10 @@ async fn run_phase_session( ) .await?; - // Kill the CLI process immediately to prevent async task - // notifications from triggering an invisible continuation. - runner.kill().await?; + // Shut down gracefully, giving claude time to save the final + // message to the session file. The timeout prevents invisible + // continuations from async task notifications. + runner.shutdown().await?; match outcome { SessionOutcome::Completed { result_text } => { diff --git a/src/session/event_loop.rs b/src/session/event_loop.rs index 4a9cb5d..cd1f85b 100644 --- a/src/session/event_loop.rs +++ b/src/session/event_loop.rs @@ -500,9 +500,10 @@ async fn execute_fork( bail!("fork detected but fork_config is None"); }; - // Kill the parent CLI process to prevent async task notifications - // from triggering an invisible continuation while fork children run. - runner.kill().await?; + // Shut down the parent gracefully so it can save the final message + // (with fork tags) to the session file before fork children run. + // The timeout prevents invisible continuations from async tasks. + runner.shutdown().await?; let msg = fork::run_fork(&session_id, tasks, fork_cfg, renderer, vcr).await?; diff --git a/src/session/runner.rs b/src/session/runner.rs index a0d1912..a47a687 100644 --- a/src/session/runner.rs +++ b/src/session/runner.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; use std::process::Stdio; +use std::time::Duration; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; @@ -183,7 +184,7 @@ impl SessionRunner { } } - /// Kill the claude process. No-op on stubs. + /// Kill the claude process immediately (SIGKILL). No-op on stubs. pub async fn kill(&mut self) -> Result<()> { if let Some(child) = &mut self.child { child.kill().await?; @@ -191,6 +192,27 @@ impl SessionRunner { Ok(()) } + /// Gracefully shut down the claude process. + /// + /// Closes stdin to signal EOF, then waits briefly for the process to exit + /// (so it can save session state). Falls back to SIGKILL if the process + /// doesn't exit within the timeout. + pub async fn shutdown(&mut self) -> Result<()> { + self.close_input(); + if let Some(child) = &mut self.child { + match tokio::time::timeout(Duration::from_secs(2), child.wait()).await { + Ok(Ok(_)) => {} + Ok(Err(e)) => return Err(e.into()), + Err(_) => { + // Timed out — force-kill to prevent invisible continuations + // from async task notifications. + child.kill().await?; + } + } + } + Ok(()) + } + fn spawn_reader( stdout: ChildStdout, stderr: ChildStderr, From 79d2ccfdb915f471f200de6b6e970714b24b7bce Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Sat, 21 Feb 2026 09:33:59 +0000 Subject: [PATCH 2/4] Wait for stdout close instead of fixed timeout in shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of a 2-second timeout on child.wait(), track the stdout reader task's JoinHandle and await it in shutdown(). The reader completes when the CLI closes stdout, which happens after session state is persisted — making it a reliable signal that the session was saved. A 30-second timeout is kept as a safety net for pathological cases (e.g., async task notifications triggering an unexpected new turn). Also update run.rs to use shutdown() for end-of-session and pre- interactive cleanup, ensuring session state is saved before proceeding. Co-Authored-By: Claude Opus 4.6 --- src/commands/run.rs | 6 ++---- src/session/runner.rs | 46 ++++++++++++++++++++++++++++++------------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/commands/run.rs b/src/commands/run.rs index befc138..1ed562d 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -112,8 +112,7 @@ pub async fn run( } } - runner.close_input(); - let _ = runner.wait().await; + runner.shutdown().await?; Ok(renderer.into_messages()) } @@ -139,8 +138,7 @@ async fn handle_outcome( { FollowUpAction::Sent => Ok(true), FollowUpAction::Interactive => { - runner.close_input(); - let _ = runner.wait().await; + runner.shutdown().await?; let Some(session_id) = state.session_id.take() else { return Ok(false); }; diff --git a/src/session/runner.rs b/src/session/runner.rs index a47a687..8d4051b 100644 --- a/src/session/runner.rs +++ b/src/session/runner.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout}; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use crate::event::AppEvent; use crate::protocol::emit::format_user_message; @@ -48,6 +49,10 @@ impl SessionConfig { pub struct SessionRunner { child: Option, stdin: Option, + /// Handle to the background stdout reader task. When this completes, + /// the process has closed stdout (which happens after session state + /// is persisted). Used by `shutdown()` to detect save completion. + reader_handle: Option>, } impl SessionRunner { @@ -96,11 +101,12 @@ impl SessionRunner { } // Spawn stdout reader task (also collects stderr on exit) - Self::spawn_reader(stdout, stderr, event_tx); + let reader_handle = Self::spawn_reader(stdout, stderr, event_tx); Ok(Self { child: Some(child), stdin: Some(stdin), + reader_handle: Some(reader_handle), }) } @@ -110,6 +116,7 @@ impl SessionRunner { Self { child: None, stdin: None, + reader_handle: None, } } @@ -194,21 +201,32 @@ impl SessionRunner { /// Gracefully shut down the claude process. /// - /// Closes stdin to signal EOF, then waits briefly for the process to exit - /// (so it can save session state). Falls back to SIGKILL if the process - /// doesn't exit within the timeout. + /// Closes stdin to signal EOF, then waits for the stdout reader task to + /// finish. The reader completes when the CLI closes stdout, which happens + /// after session state has been persisted — so stdout closing is a + /// reliable signal that the session was saved. A generous timeout is kept + /// as a safety net for pathological cases (e.g., async task notifications + /// triggering an unexpected new turn after the result event). pub async fn shutdown(&mut self) -> Result<()> { self.close_input(); + + // Wait for the stdout reader to finish. Once stdout closes, the CLI + // has finished all output including persisting session state. + let timed_out = if let Some(handle) = self.reader_handle.take() { + tokio::time::timeout(Duration::from_secs(30), handle) + .await + .is_err() + } else { + false + }; + if let Some(child) = &mut self.child { - match tokio::time::timeout(Duration::from_secs(2), child.wait()).await { - Ok(Ok(_)) => {} - Ok(Err(e)) => return Err(e.into()), - Err(_) => { - // Timed out — force-kill to prevent invisible continuations - // from async task notifications. - child.kill().await?; - } + if timed_out { + child.kill().await?; } + // Reap the child process. After stdout closes the process has + // typically already exited, so this returns near-instantly. + let _ = child.wait().await; } Ok(()) } @@ -217,7 +235,7 @@ impl SessionRunner { stdout: ChildStdout, stderr: ChildStderr, event_tx: mpsc::UnboundedSender, - ) { + ) -> JoinHandle<()> { // Collect stderr in the background so it doesn't block the process. let stderr_handle = tokio::spawn(async move { let mut reader = BufReader::new(stderr); @@ -257,7 +275,7 @@ impl SessionRunner { } let _ = event_tx.send(AppEvent::ProcessExit(None)); - }); + }) } } From c7ffd8f166391d1b2bd740358fb93baa8e29eed1 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Sat, 21 Feb 2026 11:05:54 +0000 Subject: [PATCH 3/4] Kill after turn completion instead of waiting for stdout close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous approach waited for the stdout reader task to finish (with a 30s timeout), but claude only closes stdout after the assistant is fully done — which may include starting new turns from async task notifications. Instead, since shutdown() is always called after the event loop has received a Result event (confirming the current turn is complete), we can just close stdin and kill immediately. This removes the reader_handle tracking (no longer needed) and simplifies shutdown to: close stdin + SIGKILL. Co-Authored-By: Claude Opus 4.6 --- src/commands/ralph.rs | 6 ++--- src/commands/worker.rs | 6 ++--- src/session/event_loop.rs | 6 ++--- src/session/runner.rs | 46 ++++++++++----------------------------- 4 files changed, 20 insertions(+), 44 deletions(-) diff --git a/src/commands/ralph.rs b/src/commands/ralph.rs index a02504e..b51b0c7 100644 --- a/src/commands/ralph.rs +++ b/src/commands/ralph.rs @@ -203,9 +203,9 @@ pub async fn ralph( &features, ) .await?; - // Shut down gracefully, giving claude time to save the final - // message to the session file. The timeout prevents invisible - // continuations from async task notifications. + // The event loop has received the Result event (turn complete). + // Kill the process to prevent async task notifications from + // triggering an invisible continuation. runner.shutdown().await?; match handle_session_outcome( diff --git a/src/commands/worker.rs b/src/commands/worker.rs index 7498ef4..811d4f6 100644 --- a/src/commands/worker.rs +++ b/src/commands/worker.rs @@ -840,9 +840,9 @@ async fn run_phase_session( ) .await?; - // Shut down gracefully, giving claude time to save the final - // message to the session file. The timeout prevents invisible - // continuations from async task notifications. + // The event loop has received the Result event (turn complete). + // Kill the process to prevent async task notifications from + // triggering an invisible continuation. runner.shutdown().await?; match outcome { diff --git a/src/session/event_loop.rs b/src/session/event_loop.rs index cd1f85b..04636e4 100644 --- a/src/session/event_loop.rs +++ b/src/session/event_loop.rs @@ -500,9 +500,9 @@ async fn execute_fork( bail!("fork detected but fork_config is None"); }; - // Shut down the parent gracefully so it can save the final message - // (with fork tags) to the session file before fork children run. - // The timeout prevents invisible continuations from async tasks. + // The event loop has received the Result event (turn complete, with + // fork tags). Kill the parent before fork children run to prevent + // async task notifications from triggering an invisible continuation. runner.shutdown().await?; let msg = fork::run_fork(&session_id, tasks, fork_cfg, renderer, vcr).await?; diff --git a/src/session/runner.rs b/src/session/runner.rs index 8d4051b..2cd7376 100644 --- a/src/session/runner.rs +++ b/src/session/runner.rs @@ -1,13 +1,11 @@ use std::path::PathBuf; use std::process::Stdio; -use std::time::Duration; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout}; use tokio::sync::mpsc; -use tokio::task::JoinHandle; use crate::event::AppEvent; use crate::protocol::emit::format_user_message; @@ -49,10 +47,6 @@ impl SessionConfig { pub struct SessionRunner { child: Option, stdin: Option, - /// Handle to the background stdout reader task. When this completes, - /// the process has closed stdout (which happens after session state - /// is persisted). Used by `shutdown()` to detect save completion. - reader_handle: Option>, } impl SessionRunner { @@ -101,12 +95,11 @@ impl SessionRunner { } // Spawn stdout reader task (also collects stderr on exit) - let reader_handle = Self::spawn_reader(stdout, stderr, event_tx); + Self::spawn_reader(stdout, stderr, event_tx); Ok(Self { child: Some(child), stdin: Some(stdin), - reader_handle: Some(reader_handle), }) } @@ -116,7 +109,6 @@ impl SessionRunner { Self { child: None, stdin: None, - reader_handle: None, } } @@ -199,34 +191,18 @@ impl SessionRunner { Ok(()) } - /// Gracefully shut down the claude process. + /// Shut down the claude process after the current turn has completed. /// - /// Closes stdin to signal EOF, then waits for the stdout reader task to - /// finish. The reader completes when the CLI closes stdout, which happens - /// after session state has been persisted — so stdout closing is a - /// reliable signal that the session was saved. A generous timeout is kept - /// as a safety net for pathological cases (e.g., async task notifications - /// triggering an unexpected new turn after the result event). + /// Callers must ensure the event loop has already received a `Result` + /// event (confirming the turn is complete) before calling this. Closes + /// stdin and kills the process immediately to prevent the assistant from + /// starting a new turn (e.g., from async task notifications). pub async fn shutdown(&mut self) -> Result<()> { self.close_input(); - - // Wait for the stdout reader to finish. Once stdout closes, the CLI - // has finished all output including persisting session state. - let timed_out = if let Some(handle) = self.reader_handle.take() { - tokio::time::timeout(Duration::from_secs(30), handle) - .await - .is_err() - } else { - false - }; - if let Some(child) = &mut self.child { - if timed_out { - child.kill().await?; - } - // Reap the child process. After stdout closes the process has - // typically already exited, so this returns near-instantly. - let _ = child.wait().await; + // kill() sends SIGKILL and waits for the process to exit. + // The process may have already exited after stdin close. + child.kill().await.ok(); } Ok(()) } @@ -235,7 +211,7 @@ impl SessionRunner { stdout: ChildStdout, stderr: ChildStderr, event_tx: mpsc::UnboundedSender, - ) -> JoinHandle<()> { + ) { // Collect stderr in the background so it doesn't block the process. let stderr_handle = tokio::spawn(async move { let mut reader = BufReader::new(stderr); @@ -275,7 +251,7 @@ impl SessionRunner { } let _ = event_tx.send(AppEvent::ProcessExit(None)); - }) + }); } } From 01141099dfd42100f55c61515ddafb1ac53723c6 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Sat, 21 Feb 2026 12:14:00 +0000 Subject: [PATCH 4/4] Wait for process exit on shutdown instead of killing immediately MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous shutdown() was effectively the same as kill() on main — it closed stdin then immediately sent SIGKILL, giving claude no time to save session state. Now shutdown() closes stdin and waits up to 5 seconds for the process to exit naturally (allowing session file persistence), falling back to SIGKILL only on timeout. Also adds session file verification infrastructure to record-vcr: after recording, extract_session_ids() parses session IDs from the VCR recording and verify_session_files() checks that the corresponding session files exist under ~/.claude/projects/*/sessions/. Includes a find_session_file() helper in runner.rs and a new ralph_session_save test case TOML (needs recording with API key). Co-Authored-By: Claude Opus 4.6 --- src/bin/record_vcr.rs | 46 +++++++++++++++++++ src/commands/ralph.rs | 4 +- src/session/runner.rs | 45 +++++++++++++++--- .../ralph_session_save.toml | 7 +++ 4 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 tests/cases/ralph/ralph_session_save/ralph_session_save.toml diff --git a/src/bin/record_vcr.rs b/src/bin/record_vcr.rs index ca09f13..e1a336c 100644 --- a/src/bin/record_vcr.rs +++ b/src/bin/record_vcr.rs @@ -6,6 +6,7 @@ use tokio::sync::{Semaphore, mpsc}; use tokio::task::LocalSet; use coven::commands; +use coven::session::runner::find_session_file; use coven::vcr::{DEFAULT_TEST_MODEL, Io, MultiStep, TestCase, TriggerController, VcrContext}; /// Writes to stderr with a `[prefix] ` prepended to each line. @@ -356,10 +357,55 @@ async fn record_case(case_dir: &Path, name: &str) -> Result<()> { } vcr.write_recording(&vcr_path)?; + + // Post-recording: verify session files were saved. + verify_session_files(&vcr_path, name)?; + std::fs::remove_dir_all(&tmp_dir).ok(); Ok(()) } +/// Extract session IDs from a VCR recording by scanning for result events +/// that contain a `session_id` field. +fn extract_session_ids(vcr_path: &Path) -> Result> { + let content = std::fs::read_to_string(vcr_path)?; + let mut ids = Vec::new(); + for line in content.lines() { + let entry: serde_json::Value = serde_json::from_str(line)?; + if let Some(result) = entry.get("result") + && let Some(ok) = result.get("Ok") + && let Some(claude) = ok.get("Claude").and_then(|c| c.get("Claude")) + && claude.get("type").and_then(|t| t.as_str()) == Some("result") + && let Some(id) = claude.get("session_id").and_then(|s| s.as_str()) + && !ids.contains(&id.to_string()) + { + ids.push(id.to_string()); + } + } + Ok(ids) +} + +/// After recording, verify that session files were saved for all sessions +/// in the recording. Logs results but does not fail the recording. +fn verify_session_files(vcr_path: &Path, name: &str) -> Result<()> { + let session_ids = extract_session_ids(vcr_path)?; + for id in &session_ids { + match find_session_file(id) { + Ok(Some(path)) => { + let meta = std::fs::metadata(&path)?; + eprintln!(" Session file for {name} ({id}): {} bytes", meta.len()); + } + Ok(None) => { + eprintln!(" WARNING: session file NOT found for {name} ({id})"); + } + Err(e) => { + eprintln!(" WARNING: session file check failed for {name}: {e}"); + } + } + } + Ok(()) +} + /// Record a multi-step test case. Steps are executed sequentially unless they /// share a `concurrent_group`, in which case they run concurrently. /// Each step writes its own VCR file: `__.vcr`. diff --git a/src/commands/ralph.rs b/src/commands/ralph.rs index b51b0c7..5ce917e 100644 --- a/src/commands/ralph.rs +++ b/src/commands/ralph.rs @@ -204,8 +204,8 @@ pub async fn ralph( ) .await?; // The event loop has received the Result event (turn complete). - // Kill the process to prevent async task notifications from - // triggering an invisible continuation. + // Shut down gracefully: close stdin and wait for the process to + // exit (saving session state). Falls back to SIGKILL on timeout. runner.shutdown().await?; match handle_session_outcome( diff --git a/src/session/runner.rs b/src/session/runner.rs index 2cd7376..330c9f4 100644 --- a/src/session/runner.rs +++ b/src/session/runner.rs @@ -1,5 +1,6 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::Stdio; +use std::time::Duration; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; @@ -195,14 +196,19 @@ impl SessionRunner { /// /// Callers must ensure the event loop has already received a `Result` /// event (confirming the turn is complete) before calling this. Closes - /// stdin and kills the process immediately to prevent the assistant from - /// starting a new turn (e.g., from async task notifications). + /// stdin and waits for the process to exit so it can persist session + /// state. Falls back to SIGKILL if the process doesn't exit within + /// the timeout (e.g., an async task notification triggered a new turn). pub async fn shutdown(&mut self) -> Result<()> { self.close_input(); if let Some(child) = &mut self.child { - // kill() sends SIGKILL and waits for the process to exit. - // The process may have already exited after stdin close. - child.kill().await.ok(); + match tokio::time::timeout(Duration::from_secs(5), child.wait()).await { + Ok(_) => {} // Process exited — session state saved. + Err(_) => { + // Timeout — kill to prevent invisible continuation. + child.kill().await.ok(); + } + } } Ok(()) } @@ -261,6 +267,33 @@ pub(crate) fn has_flag(args: &[String], flag: &str) -> bool { .any(|a| a == flag || a.starts_with(&format!("{flag}="))) } +/// Find the claude session file for the given session ID. +/// +/// Searches `~/.claude/projects/*/sessions/` for a file whose stem matches +/// the session ID. Returns the file path if found. +pub fn find_session_file(session_id: &str) -> Result> { + let home = std::env::var("HOME").context("HOME not set")?; + let base = Path::new(&home).join(".claude/projects"); + if !base.exists() { + return Ok(None); + } + for project_entry in std::fs::read_dir(&base)? { + let project_dir = project_entry?.path(); + let sessions_dir = project_dir.join("sessions"); + if !sessions_dir.exists() { + continue; + } + for entry in std::fs::read_dir(&sessions_dir)? { + let path = entry?.path(); + let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or(""); + if stem == session_id { + return Ok(Some(path)); + } + } + } + Ok(None) +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/cases/ralph/ralph_session_save/ralph_session_save.toml b/tests/cases/ralph/ralph_session_save/ralph_session_save.toml new file mode 100644 index 0000000..04dc483 --- /dev/null +++ b/tests/cases/ralph/ralph_session_save/ralph_session_save.toml @@ -0,0 +1,7 @@ +[ralph] +prompt = "What is 2+2? Answer briefly then stop." +break_tag = "break" + +# No files needed — this is a simple Q&A to verify session shutdown behavior. +# After the model answers and outputs , the session should be saved +# before the process is killed.