Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/bin/record_vcr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::sync::{Semaphore, mpsc};
use tokio::task::LocalSet;

use coven::commands;
use coven::session::runner::find_session_file;
use coven::vcr::{DEFAULT_TEST_MODEL, Io, MultiStep, TestCase, TriggerController, VcrContext};

/// Writes to stderr with a `[prefix] ` prepended to each line.
Expand Down Expand Up @@ -356,10 +357,55 @@ async fn record_case(case_dir: &Path, name: &str) -> Result<()> {
}

vcr.write_recording(&vcr_path)?;

// Post-recording: verify session files were saved.
verify_session_files(&vcr_path, name)?;

std::fs::remove_dir_all(&tmp_dir).ok();
Ok(())
}

/// Collect the distinct session IDs recorded in a VCR file.
///
/// Each line of the file is parsed as a JSON value. A line contributes an ID
/// when `result.Ok.Claude.Claude` has `type == "result"` and a string
/// `session_id`. IDs are returned in first-seen order, without duplicates.
///
/// # Errors
///
/// Fails if the file cannot be read or if any line is not valid JSON.
fn extract_session_ids(vcr_path: &Path) -> Result<Vec<String>> {
    // Returns the session ID carried by a Claude result event, or `None` if
    // the entry is any other kind of record.
    fn result_session_id(entry: &serde_json::Value) -> Option<&str> {
        let claude = entry
            .get("result")?
            .get("Ok")?
            .get("Claude")?
            .get("Claude")?;
        if claude.get("type")?.as_str()? != "result" {
            return None;
        }
        claude.get("session_id")?.as_str()
    }

    let text = std::fs::read_to_string(vcr_path)?;
    let mut seen: Vec<String> = Vec::new();
    for raw in text.lines() {
        let entry: serde_json::Value = serde_json::from_str(raw)?;
        let Some(id) = result_session_id(&entry) else {
            continue;
        };
        // Keep only the first occurrence of each ID.
        if seen.iter().all(|known| known.as_str() != id) {
            seen.push(id.to_owned());
        }
    }
    Ok(seen)
}

/// After recording, check that a session file exists on disk for every
/// session ID found in the recording.
///
/// This check is purely diagnostic. Each outcome is written to stderr, and
/// every failure (unreadable recording, lookup error, missing file,
/// unreadable metadata) is reported as a warning, so the function always
/// returns `Ok(())` and never fails the recording.
fn verify_session_files(vcr_path: &Path, name: &str) -> Result<()> {
    let session_ids = match extract_session_ids(vcr_path) {
        Ok(ids) => ids,
        Err(e) => {
            eprintln!(" WARNING: could not extract session IDs for {name}: {e}");
            return Ok(());
        }
    };
    for id in &session_ids {
        match find_session_file(id) {
            // A metadata failure is reported like any other check failure
            // instead of aborting the recording.
            Ok(Some(path)) => match std::fs::metadata(&path) {
                Ok(meta) => {
                    eprintln!(" Session file for {name} ({id}): {} bytes", meta.len());
                }
                Err(e) => {
                    eprintln!(" WARNING: session file check failed for {name} ({id}): {e}");
                }
            },
            Ok(None) => {
                eprintln!(" WARNING: session file NOT found for {name} ({id})");
            }
            Err(e) => {
                eprintln!(" WARNING: session file check failed for {name}: {e}");
            }
        }
    }
    Ok(())
}

/// Record a multi-step test case. Steps are executed sequentially unless they
/// share a `concurrent_group`, in which case they run concurrently.
/// Each step writes its own VCR file: `<test>__<step>.vcr`.
Expand Down
7 changes: 4 additions & 3 deletions src/commands/ralph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ pub async fn ralph<W: Write>(
&features,
)
.await?;
// Kill the CLI process immediately to prevent async task
// notifications from triggering an invisible continuation.
runner.kill().await?;
// The event loop has received the Result event (turn complete).
// Shut down gracefully: close stdin and wait for the process to
// exit (saving session state). Falls back to SIGKILL on timeout.
runner.shutdown().await?;

match handle_session_outcome(
outcome,
Expand Down
8 changes: 3 additions & 5 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ pub async fn run<W: Write>(
}
}

runner.close_input();
let _ = runner.wait().await;
runner.shutdown().await?;
Ok(renderer.into_messages())
}

Expand All @@ -139,8 +138,7 @@ async fn handle_outcome<W: Write>(
{
FollowUpAction::Sent => Ok(true),
FollowUpAction::Interactive => {
runner.close_input();
let _ = runner.wait().await;
runner.shutdown().await?;
let Some(session_id) = state.session_id.take() else {
return Ok(false);
};
Expand All @@ -165,7 +163,7 @@ async fn handle_outcome<W: Write>(
resume_after_pause(session_id, base_session_cfg, runner, state, ctx).await
}
SessionOutcome::Reload { .. } => {
runner.kill().await?;
runner.shutdown().await?;
let Some(session_id) = state.session_id.take() else {
return Ok(false);
};
Expand Down
7 changes: 4 additions & 3 deletions src/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,9 +840,10 @@ async fn run_phase_session<W: Write>(
)
.await?;

// Kill the CLI process immediately to prevent async task
// notifications from triggering an invisible continuation.
runner.kill().await?;
// The event loop has received the Result event (turn complete).
// Shut the process down (graceful exit, SIGKILL on timeout) to prevent
// async task notifications from triggering an invisible continuation.
runner.shutdown().await?;

match outcome {
SessionOutcome::Completed { result_text } => {
Expand Down
7 changes: 4 additions & 3 deletions src/session/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,10 @@ async fn execute_fork<W: Write>(
bail!("fork detected but fork_config is None");
};

// Kill the parent CLI process to prevent async task notifications
// from triggering an invisible continuation while fork children run.
runner.kill().await?;
// The event loop has received the Result event (turn complete, with
// fork tags). Shut the parent down before fork children run to prevent
// async task notifications from triggering an invisible continuation.
runner.shutdown().await?;

let msg = fork::run_fork(&session_id, tasks, fork_cfg, renderer, vcr).await?;

Expand Down
53 changes: 51 additions & 2 deletions src/session/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::Duration;

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -183,14 +184,35 @@ impl SessionRunner {
}
}

/// Forcibly terminate the claude process right away (SIGKILL).
///
/// Does nothing when there is no child process (stubs).
pub async fn kill(&mut self) -> Result<()> {
    match self.child.as_mut() {
        Some(process) => {
            process.kill().await?;
            Ok(())
        }
        None => Ok(()),
    }
}

/// Gracefully stop the claude process once the current turn has finished.
///
/// The caller must already have observed a `Result` event from the event
/// loop (confirming the turn is complete) before calling this. Input is
/// closed first, then the process is given up to five seconds to exit on
/// its own so it can persist session state. If it is still running after
/// that (e.g., an async task notification started a new turn), it is
/// killed with SIGKILL.
///
/// Errors from waiting on or killing the process are ignored; this always
/// returns `Ok(())`.
pub async fn shutdown(&mut self) -> Result<()> {
    self.close_input();

    let Some(process) = self.child.as_mut() else {
        return Ok(());
    };

    let exited = tokio::time::timeout(Duration::from_secs(5), process.wait()).await;
    if exited.is_err() {
        // Timed out: kill the process to prevent an invisible continuation.
        process.kill().await.ok();
    }
    Ok(())
}

fn spawn_reader(
stdout: ChildStdout,
stderr: ChildStderr,
Expand Down Expand Up @@ -245,6 +267,33 @@ pub(crate) fn has_flag(args: &[String], flag: &str) -> bool {
.any(|a| a == flag || a.starts_with(&format!("{flag}=")))
}

/// Find the claude session file for the given session ID.
///
/// Searches `~/.claude/projects/*/sessions/` for a file whose stem matches
/// the session ID. Returns the file path if found.
pub fn find_session_file(session_id: &str) -> Result<Option<PathBuf>> {
let home = std::env::var("HOME").context("HOME not set")?;
let base = Path::new(&home).join(".claude/projects");
if !base.exists() {
return Ok(None);
}
for project_entry in std::fs::read_dir(&base)? {
let project_dir = project_entry?.path();
let sessions_dir = project_dir.join("sessions");
if !sessions_dir.exists() {
continue;
}
for entry in std::fs::read_dir(&sessions_dir)? {
let path = entry?.path();
let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
if stem == session_id {
return Ok(Some(path));
}
}
}
Ok(None)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
7 changes: 7 additions & 0 deletions tests/cases/ralph/ralph_session_save/ralph_session_save.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[ralph]
prompt = "What is 2+2? Answer briefly then stop."
break_tag = "break"

# No files needed — this is a simple Q&A to verify session shutdown behavior.
# After the model answers and outputs <break>, the session should be saved
# before the process is killed.