diff --git a/src/agents/cursor.rs b/src/agents/cursor.rs index 6a0429b1..e4f28626 100644 --- a/src/agents/cursor.rs +++ b/src/agents/cursor.rs @@ -1,24 +1,45 @@ //! Cursor agent log discovery. //! -//! Cursor stores chat sessions in two places: -//! - VS Code style chatSessions: -//! - macOS: ~/Library/Application Support/Cursor/User/workspaceStorage/*/chatSessions/*.json -//! - Linux: ~/.config/Cursor/User/workspaceStorage/*/chatSessions/*.json -//! - Windows: %APPDATA%\\Cursor\\User\\workspaceStorage\\*\\chatSessions\\*.json -//! - Cursor projects: -//! ~/.cursor/projects//*.{json,txt} +//! Cursor stores sessions in a few different local formats: +//! - Legacy/VS Code style `chatSessions/*.json` +//! - Agent transcript JSONL files under `~/.cursor/projects/*/agent-transcripts/**` +//! - Desktop composer state split across `workspaceStorage/*` and `globalStorage/state.vscdb` -use std::path::{Path, PathBuf}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::path::{MAIN_SEPARATOR, Path, PathBuf}; +use std::time::{Duration, UNIX_EPOCH}; + +use async_trait::async_trait; +use rusqlite::{Connection, OpenFlags, params}; +use serde_json::{Value, json}; use super::{ AgentExplorer, SessionLog, SessionSource, app_config_dir_in, find_chat_session_dirs, home_dir, recent_files_with_exts, }; -use crate::scanner::AgentType; -use async_trait::async_trait; +use crate::scanner::{self, AgentType}; + +const CURSOR_PROJECT_SKIP_DIRS: &[&str] = &["mcps", "agent-tools", "terminals"]; +const CURSOR_PATH_DECODE_MAX_CANDIDATES: usize = 65_536; -const CURSOR_PROJECT_SKIP_DIRS: &[&str] = &["mcps"]; +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum CursorSourcePriority { + LegacyChatSession = 0, + AgentTranscript = 1, + StoreDb = 2, + DesktopComposer = 3, +} + +#[derive(Debug, Clone)] +struct CursorDiscoveredSession { + canonical_id: Option, + workspace_hint: Option, + content_len: usize, + priority: CursorSourcePriority, + log: SessionLog, +} 
+#[allow(dead_code)] /// Return all Cursor log directories for use by the post-commit hook. pub async fn log_dirs() -> Vec { let home = match home_dir() { @@ -28,6 +49,7 @@ pub async fn log_dirs() -> Vec { log_dirs_in(&home).await } +#[allow(dead_code)] /// Return all Cursor log directories for backfill (not repo-scoped). pub async fn all_log_dirs() -> Vec { log_dirs().await @@ -38,77 +60,1211 @@ pub struct CursorExplorer; #[async_trait] impl AgentExplorer for CursorExplorer { async fn discover_recent(&self, now: i64, since_secs: i64) -> Vec { - let dirs = all_log_dirs().await; - recent_files_with_exts(&dirs, now, since_secs, &["json", "txt"]) - .await - .into_iter() - .map(|file| SessionLog { + let home = match home_dir() { + Some(h) => h, + None => return Vec::new(), + }; + discover_recent_in(&home, now, since_secs).await + } +} + +async fn discover_recent_in(home: &Path, now: i64, since_secs: i64) -> Vec { + let mut discovered: HashMap = HashMap::new(); + + let ws_root = app_config_dir_in("Cursor", home) + .join("User") + .join("workspaceStorage"); + let chat_session_dirs = find_chat_session_dirs(&ws_root).await; + for file in recent_files_with_exts(&chat_session_dirs, now, since_secs, &["json"]).await { + let metadata = scanner::parse_session_metadata(&file.path).await; + let candidate = CursorDiscoveredSession { + canonical_id: metadata.session_id, + workspace_hint: metadata.cwd, + content_len: 0, + priority: CursorSourcePriority::LegacyChatSession, + log: SessionLog { agent_type: AgentType::Cursor, source: SessionSource::File(file.path), updated_at: Some(file.mtime_epoch), - }) - .collect() + }, + }; + merge_cursor_session(&mut discovered, candidate); + } + + for candidate in discover_agent_transcripts(home, now, since_secs).await { + merge_cursor_session(&mut discovered, candidate); + } + + for candidate in discover_store_db_sessions(home, now, since_secs).await { + merge_cursor_session(&mut discovered, candidate); } + + let desktop_logs = 
discover_desktop_sessions(home, now, since_secs).await; + for candidate in desktop_logs { + merge_cursor_session(&mut discovered, candidate); + } + + let mut logs = discovered + .into_values() + .map(|candidate| candidate.log) + .collect::>(); + + logs.sort_by(|a, b| { + a.updated_at + .unwrap_or_default() + .cmp(&b.updated_at.unwrap_or_default()) + .then_with(|| a.source_label().cmp(&b.source_label())) + }); + logs } +#[allow(dead_code)] async fn log_dirs_in(home: &Path) -> Vec { let mut dirs = Vec::new(); - // VS Code style chatSessions. let ws_root = app_config_dir_in("Cursor", home) .join("User") .join("workspaceStorage"); dirs.extend(find_chat_session_dirs(&ws_root).await); - // Cursor projects directory (scan recursively for json/txt files). let projects_dir = home.join(".cursor").join("projects"); - collect_cursor_project_dirs(&projects_dir, &mut dirs).await; + dirs.extend(collect_agent_transcript_dirs(&projects_dir).await); dirs } -async fn collect_cursor_project_dirs(root: &Path, results: &mut Vec) { +async fn collect_agent_transcript_dirs(root: &Path) -> Vec { + let mut out = Vec::new(); let mut stack = vec![root.to_path_buf()]; + while let Some(dir) = stack.pop() { let mut entries = match tokio::fs::read_dir(&dir).await { Ok(entries) => entries, Err(_) => continue, }; - let mut has_match = false; while let Ok(Some(entry)) = entries.next_entry().await { let path = entry.path(); let file_type = match entry.file_type().await { Ok(file_type) => file_type, Err(_) => continue, }; - if file_type.is_dir() { - let Some(name) = path.file_name().and_then(|name| name.to_str()) else { + + if !file_type.is_dir() { + continue; + } + + let Some(name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + + if CURSOR_PROJECT_SKIP_DIRS.contains(&name) { + continue; + } + + if name == "agent-transcripts" { + out.push(path); + continue; + } + + stack.push(path); + } + } + + out +} + +async fn discover_agent_transcripts( + home: &Path, + now: i64, + 
since_secs: i64, +) -> Vec { + let cutoff = now - since_secs; + let projects_dir = home.join(".cursor").join("projects"); + let transcript_dirs = collect_agent_transcript_dirs(&projects_dir).await; + let workspace_map = collect_workspace_paths(home).await; + let mut workspace_decode_cache: HashMap> = HashMap::new(); + let mut out = Vec::new(); + + for dir in transcript_dirs { + let mut stack = vec![dir.clone()]; + while let Some(current) = stack.pop() { + let mut entries = match tokio::fs::read_dir(¤t).await { + Ok(entries) => entries, + Err(_) => continue, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + let file_type = match entry.file_type().await { + Ok(file_type) => file_type, + Err(_) => continue, + }; + + if file_type.is_dir() { + stack.push(path); + continue; + } + if !file_type.is_file() { continue; + } + if path.extension().and_then(|ext| ext.to_str()) != Some("jsonl") { + continue; + } + + let metadata = match tokio::fs::metadata(&path).await { + Ok(metadata) => metadata, + Err(_) => continue, }; - if CURSOR_PROJECT_SKIP_DIRS.contains(&name) { + let mtime_epoch = match metadata.modified() { + Ok(mtime) => match mtime.duration_since(UNIX_EPOCH) { + Ok(d) => d.as_secs() as i64, + Err(_) => continue, + }, + Err(_) => continue, + }; + if mtime_epoch < cutoff { continue; } + + let Some(session_id) = path.file_stem().and_then(|stem| stem.to_str()) else { + continue; + }; + let workspace_key = path + .strip_prefix(&projects_dir) + .ok() + .and_then(|relative| relative.components().next()) + .map(|component| component.as_os_str().to_string_lossy().to_string()); + let cwd = if let Some(workspace_key) = workspace_key.as_deref() { + if let Some(path) = workspace_map.get(workspace_key).cloned() { + Some(path) + } else { + if let Some(cached) = workspace_decode_cache.get(workspace_key) { + cached.clone() + } else { + let decoded = decode_cursor_workspace_path(workspace_key).await; + workspace_decode_cache + 
.insert(workspace_key.to_string(), decoded.clone()); + decoded + } + } + } else { + None + }; + let content = match tokio::fs::read_to_string(&path).await { + Ok(content) => content, + Err(_) => continue, + }; + let label = path.to_string_lossy().to_string(); + let source = SessionSource::Inline { + label, + content: prepend_cursor_metadata_line( + &content, + session_id, + cwd.as_deref(), + None, + "agent_transcript", + ), + }; + out.push(CursorDiscoveredSession { + canonical_id: Some(session_id.to_string()), + workspace_hint: cwd, + content_len: content.len(), + priority: CursorSourcePriority::AgentTranscript, + log: SessionLog { + agent_type: AgentType::Cursor, + source, + updated_at: Some(mtime_epoch), + }, + }); + } + } + } + + out +} + +async fn collect_workspace_paths(home: &Path) -> HashMap { + let ws_root = app_config_dir_in("Cursor", home) + .join("User") + .join("workspaceStorage"); + let mut out = HashMap::new(); + let mut entries = match tokio::fs::read_dir(&ws_root).await { + Ok(entries) => entries, + Err(_) => return out, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let workspace_dir = entry.path(); + if !entry + .file_type() + .await + .map(|ft| ft.is_dir()) + .unwrap_or(false) + { + continue; + } + let workspace_json = workspace_dir.join("workspace.json"); + let Ok(content) = tokio::fs::read_to_string(&workspace_json).await else { + continue; + }; + let Some(path) = parse_workspace_json_path(&content) else { + continue; + }; + let encoded_key = encode_cursor_project_workspace_key(Path::new(&path)); + out.entry(encoded_key).or_insert(path); + } + + out +} + +async fn discover_store_db_sessions( + home: &Path, + now: i64, + since_secs: i64, +) -> Vec { + let chats_root = home.join(".cursor").join("chats"); + if !tokio::fs::try_exists(&chats_root).await.unwrap_or(false) { + return Vec::new(); + } + + let cutoff = now - since_secs; + let chats_root = chats_root.clone(); + tokio::task::spawn_blocking(move || 
query_cursor_store_db_sessions(&chats_root, cutoff)) + .await + .unwrap_or_default() +} + +async fn discover_desktop_sessions( + home: &Path, + now: i64, + since_secs: i64, +) -> Vec { + let ws_root = app_config_dir_in("Cursor", home) + .join("User") + .join("workspaceStorage"); + let global_db = app_config_dir_in("Cursor", home) + .join("User") + .join("globalStorage") + .join("state.vscdb"); + + if !global_db.exists() || !ws_root.exists() { + return Vec::new(); + } + + let ws_root = ws_root.clone(); + let global_db = global_db.clone(); + tokio::task::spawn_blocking(move || { + query_cursor_desktop_sessions(&ws_root, &global_db, now, since_secs) + }) + .await + .unwrap_or_default() +} + +fn query_cursor_desktop_sessions( + ws_root: &Path, + global_db: &Path, + now: i64, + since_secs: i64, +) -> Vec { + let cutoff_ms = (now - since_secs) * 1000; + let global = match open_sqlite_readonly(global_db) { + Ok(conn) => conn, + Err(_) => return Vec::new(), + }; + let mut logs = Vec::new(); + let workspace_dirs = match std::fs::read_dir(ws_root) { + Ok(entries) => entries, + Err(_) => return logs, + }; + + for workspace_entry in workspace_dirs.flatten() { + let workspace_dir = workspace_entry.path(); + if !workspace_dir.is_dir() { + continue; + } + + let workspace_db = workspace_dir.join("state.vscdb"); + let workspace_json = workspace_dir.join("workspace.json"); + if !workspace_db.exists() || !workspace_json.exists() { + continue; + } + + let cwd = std::fs::read_to_string(&workspace_json) + .ok() + .and_then(|content| parse_workspace_json_path(&content)); + + let workspace = match open_sqlite_readonly(&workspace_db) { + Ok(conn) => conn, + Err(_) => continue, + }; + let Some(composer_data_raw) = sqlite_query_string( + &workspace, + "SELECT value FROM ItemTable WHERE key = 'composer.composerData'", + params![], + ) else { + continue; + }; + let composers = parse_workspace_composers(&composer_data_raw); + for composer in composers { + if composer.last_updated_at < cutoff_ms 
{ + continue; + } + let Some(content) = build_desktop_cursor_session_content( + &global, + &composer.composer_id, + cwd.as_deref(), + ) else { + continue; + }; + logs.push(CursorDiscoveredSession { + canonical_id: Some(composer.composer_id.clone()), + workspace_hint: cwd.clone(), + content_len: content.len(), + priority: CursorSourcePriority::DesktopComposer, + log: SessionLog { + agent_type: AgentType::Cursor, + source: SessionSource::Inline { + label: format!( + "cursor-desktop:{}:{}", + workspace_dir + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("workspace"), + composer.composer_id + ), + content, + }, + updated_at: Some(composer.last_updated_at / 1000), + }, + }); + } + } + + logs +} + +fn open_sqlite_readonly(path: &Path) -> rusqlite::Result { + let conn = Connection::open_with_flags( + path, + OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, + )?; + let _ = conn.busy_timeout(Duration::from_millis(200)); + Ok(conn) +} + +fn sqlite_query_string

(conn: &Connection, sql: &str, params: P) -> Option +where + P: rusqlite::Params, +{ + conn.query_row(sql, params, |row| row.get::<_, String>(0)) + .ok() +} + +fn parse_workspace_json_path(content: &str) -> Option { + let value = serde_json::from_str::(content).ok()?; + value + .get("folder") + .or_else(|| value.get("path")) + .and_then(Value::as_str) + .map(normalize_cursor_workspace_path) +} + +fn encode_cursor_project_workspace_key(path: &Path) -> String { + use std::path::Component; + + let mut encoded = String::new(); + for component in path.components() { + match component { + Component::Prefix(prefix) => { + let raw = prefix.as_os_str().to_string_lossy(); + if let Some(drive) = raw.strip_suffix(':') { + encoded.push_str(drive); + encoded.push_str("--"); + } else if !raw.is_empty() { + if !encoded.is_empty() && !encoded.ends_with('-') { + encoded.push('-'); + } + encoded.push_str(&raw.replace(['/', '\\', ':'], "-")); + } + } + Component::RootDir => {} + Component::Normal(segment) => { + let segment = segment.to_string_lossy(); + if segment.is_empty() { + continue; + } + if !encoded.is_empty() && !encoded.ends_with('-') { + encoded.push('-'); + } + encoded.push_str(&segment); + } + Component::CurDir | Component::ParentDir => {} + } + } + encoded +} + +fn normalize_cursor_workspace_path(path: &str) -> String { + let trimmed = path.strip_prefix("file://").unwrap_or(path); + if cfg!(target_os = "windows") && trimmed.starts_with('/') && trimmed.len() > 3 { + trimmed[1..].to_string() + } else { + trimmed.to_string() + } +} + +#[derive(Debug, Clone)] +struct WorkspaceComposerHead { + composer_id: String, + last_updated_at: i64, +} + +fn parse_workspace_composers(raw: &str) -> Vec { + let value = match serde_json::from_str::(raw) { + Ok(value) => value, + Err(_) => return Vec::new(), + }; + value + .get("allComposers") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter_map(|entry| { + Some(WorkspaceComposerHead { + composer_id: 
entry.get("composerId")?.as_str()?.to_string(), + last_updated_at: entry + .get("lastUpdatedAt") + .and_then(Value::as_i64) + .or_else(|| entry.get("createdAt").and_then(Value::as_i64))?, + }) + }) + .collect() +} + +fn build_desktop_cursor_session_content( + global: &Connection, + composer_id: &str, + cwd: Option<&str>, +) -> Option { + let composer_raw = sqlite_query_string( + global, + "SELECT value FROM cursorDiskKV WHERE key = ?1", + params![format!("composerData:{composer_id}")], + )?; + let composer = serde_json::from_str::(&composer_raw).ok()?; + let model = composer + .pointer("/modelConfig/modelName") + .or_else(|| composer.pointer("/lastUsedModel/name")) + .or_else(|| composer.pointer("/model/name")) + .and_then(Value::as_str) + .filter(|value| !value.trim().is_empty()); + + let mut header_ids = composer + .get("fullConversationHeadersOnly") + .and_then(Value::as_array) + .map(|headers| { + headers + .iter() + .filter_map(|header| header.get("bubbleId").and_then(Value::as_str)) + .map(ToOwned::to_owned) + .collect::>() + }) + .unwrap_or_default(); + + header_ids.extend(extract_bubble_ids_from_value( + composer.get("conversation").unwrap_or(&Value::Null), + )); + header_ids.extend(extract_bubble_ids_from_value( + composer.get("conversationMap").unwrap_or(&Value::Null), + )); + dedupe_preserving_order(&mut header_ids); + + let mut bubbles = query_cursor_bubbles(global, composer_id); + if header_ids.is_empty() { + header_ids.extend(bubbles.keys().cloned()); + header_ids.sort(); + } + + let mut lines = Vec::new(); + lines.push( + json!({ + "sessionId": composer_id, + "session_id": composer_id, + "cwd": cwd, + "workspacePath": cwd, + "model": model, + "createdAt": composer.get("createdAt").and_then(Value::as_i64), + "lastUpdatedAt": composer.get("lastUpdatedAt").and_then(Value::as_i64), + "cursor_source": "desktop_state_vscdb", + "workspaceId": composer.get("workspaceId").and_then(Value::as_str), + }) + .to_string(), + ); + + for bubble_id in header_ids { + 
let Some(bubble) = bubbles.remove(&bubble_id) else { + continue; + }; + let Some(role) = cursor_role_for_value(&bubble) else { + continue; + }; + let Some(content) = extract_cursor_bubble_text(&bubble) else { + continue; + }; + lines.push( + json!({ + "role": role, + "content": content, + "timestamp": bubble.get("createdAt").and_then(Value::as_str), + "model": bubble.pointer("/modelInfo/modelName").and_then(Value::as_str).or(model), + "bubbleId": bubble_id, + }) + .to_string(), + ); + } + + if lines.len() == 1 { + None + } else { + Some(lines.join("\n")) + } +} + +fn query_cursor_bubbles(global: &Connection, composer_id: &str) -> BTreeMap { + let mut out = BTreeMap::new(); + let Ok(mut stmt) = + global.prepare("SELECT key, value FROM cursorDiskKV WHERE key LIKE ?1 ORDER BY key") + else { + return out; + }; + let pattern = format!("bubbleId:{composer_id}:%"); + let Ok(rows) = stmt.query_map(params![pattern], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) + }) else { + return out; + }; + + for row in rows.flatten() { + let Some(bubble_id) = row.0.rsplit(':').next() else { + continue; + }; + let Ok(value) = serde_json::from_str::(&row.1) else { + continue; + }; + out.insert(bubble_id.to_string(), value); + } + out +} + +fn cursor_role_for_type(kind: Option) -> Option<&'static str> { + match kind { + Some(1) => Some("user"), + Some(2) => Some("assistant"), + _ => None, + } +} + +fn cursor_role_for_value(value: &Value) -> Option<&'static str> { + cursor_role_for_type(value.get("type").and_then(Value::as_i64)).or_else(|| { + value + .get("role") + .and_then(Value::as_str) + .and_then(|role| match role { + "user" => Some("user"), + "assistant" => Some("assistant"), + _ => None, + }) + }) +} + +fn extract_cursor_bubble_text(bubble: &Value) -> Option { + for key in ["text", "markdown", "content", "description"] { + if let Some(text) = bubble.get(key).and_then(Value::as_str) { + let trimmed = text.trim(); + if !trimmed.is_empty() { + return 
Some(trimmed.to_string()); + } + } + } + + for pointer in [ + "/toolFormerData/rawArgs", + "/toolFormerData/result", + "/toolFormerData/output", + "/toolResult/output", + "/errorDetails/message", + ] { + if let Some(text) = bubble.pointer(pointer).and_then(Value::as_str) { + let trimmed = text.trim(); + if !trimmed.is_empty() { + return Some(trimmed.to_string()); + } + } + } + + if let Some(rich_text) = bubble.get("richText").and_then(Value::as_str) + && let Ok(parsed) = serde_json::from_str::(rich_text) + { + let mut fragments = Vec::new(); + collect_rich_text_fragments(&parsed, &mut fragments); + let joined = fragments.join(" ").trim().to_string(); + if !joined.is_empty() { + return Some(joined); + } + } + + None +} + +fn extract_bubble_ids_from_value(value: &Value) -> Vec { + let mut out = Vec::new(); + collect_bubble_ids(value, &mut out); + out +} + +fn dedupe_preserving_order(values: &mut Vec) { + let mut seen = HashSet::new(); + values.retain(|value| seen.insert(value.clone())); +} + +fn collect_bubble_ids(value: &Value, out: &mut Vec) { + match value { + Value::Object(map) => { + for key in ["bubbleId", "id"] { + if let Some(id) = map.get(key).and_then(Value::as_str) + && !id.trim().is_empty() + { + out.push(id.to_string()); + } + } + for child in map.values() { + collect_bubble_ids(child, out); + } + } + Value::Array(items) => { + for item in items { + collect_bubble_ids(item, out); + } + } + _ => {} + } +} + +fn collect_rich_text_fragments(value: &Value, out: &mut Vec) { + match value { + Value::Object(map) => { + if let Some(text) = map.get("text").and_then(Value::as_str) { + let trimmed = text.trim(); + if !trimmed.is_empty() { + out.push(trimmed.to_string()); + } + } + if let Some(children) = map.get("children").and_then(Value::as_array) { + for child in children { + collect_rich_text_fragments(child, out); + } + } + } + Value::Array(items) => { + for item in items { + collect_rich_text_fragments(item, out); + } + } + _ => {} + } +} + +fn 
prepend_cursor_metadata_line( + content: &str, + session_id: &str, + cwd: Option<&str>, + model: Option<&str>, + source_kind: &str, +) -> String { + let metadata = json!({ + "sessionId": session_id, + "session_id": session_id, + "cwd": cwd, + "workspacePath": cwd, + "model": model, + "cursor_source": source_kind, + }) + .to_string(); + if content.is_empty() { + metadata + } else { + format!("{metadata}\n{content}") + } +} + +async fn decode_cursor_workspace_path(encoded: &str) -> Option { + let decoded = decode_cursor_workspace_path_buf(encoded).await?; + Some(decoded.to_string_lossy().to_string()) +} + +fn initial_cursor_path_state(trimmed: &str) -> (PathBuf, usize) { + if MAIN_SEPARATOR == '\\' { + let bytes = trimmed.as_bytes(); + if bytes.len() >= 3 && bytes[0].is_ascii_alphabetic() { + let drive = char::from(bytes[0]); + if bytes[1] == b'-' && bytes[2] == b'-' { + return (PathBuf::from(format!("{drive}:{MAIN_SEPARATOR}")), 3); + } + } + } + + (PathBuf::from(MAIN_SEPARATOR.to_string()), 0) +} + +async fn decode_cursor_workspace_path_buf(encoded: &str) -> Option { + if encoded.trim().is_empty() { + return None; + } + + let mut stack = vec![initial_cursor_path_state(encoded)]; + let mut checked_candidates = 0usize; + while let Some((prefix, start_idx)) = stack.pop() { + if start_idx >= encoded.len() { + return Some(prefix); + } + + let mut next = Vec::new(); + let bytes = encoded.as_bytes(); + for end_idx in (start_idx + 1..=encoded.len()).rev() { + if end_idx != encoded.len() && bytes[end_idx] != b'-' { + continue; + } + + checked_candidates += 1; + if checked_candidates > CURSOR_PATH_DECODE_MAX_CANDIDATES { + return None; + } + + let segment = &encoded[start_idx..end_idx]; + if segment.is_empty() { + continue; + } + + let candidate = prefix.join(segment); + if tokio::fs::try_exists(&candidate).await.unwrap_or(false) { + let next_idx = if end_idx == encoded.len() { + end_idx + } else { + end_idx + 1 + }; + next.push((candidate, next_idx)); + } + } + + 
stack.extend(next); + } + + None +} + +fn merge_cursor_session( + discovered: &mut HashMap, + incoming: CursorDiscoveredSession, +) { + let key = cursor_discovered_key(&incoming); + match discovered.get(&key) { + None => { + discovered.insert(key, incoming); + } + Some(existing) => { + if should_replace_cursor_candidate(existing, &incoming) { + discovered.insert(key, incoming); + } + } + } +} + +fn cursor_discovered_key(candidate: &CursorDiscoveredSession) -> String { + match (&candidate.canonical_id, &candidate.workspace_hint) { + (Some(session_id), Some(cwd)) => format!("{session_id}::{cwd}"), + (Some(session_id), None) => format!("{session_id}::"), + (None, Some(cwd)) => format!("::<{cwd}>"), + (None, None) => candidate.log.source_label(), + } +} + +fn should_replace_cursor_candidate( + existing: &CursorDiscoveredSession, + incoming: &CursorDiscoveredSession, +) -> bool { + if incoming.priority != existing.priority { + return incoming.priority > existing.priority; + } + if incoming.content_len != existing.content_len { + return incoming.content_len > existing.content_len; + } + incoming.log.updated_at.unwrap_or_default() > existing.log.updated_at.unwrap_or_default() +} + +fn query_cursor_store_db_sessions(chats_root: &Path, cutoff: i64) -> Vec { + let mut out = Vec::new(); + let mut stack = vec![chats_root.to_path_buf()]; + while let Some(path) = stack.pop() { + let entries = match std::fs::read_dir(&path) { + Ok(entries) => entries, + Err(_) => continue, + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { stack.push(path); - } else if file_type.is_file() && !has_match { - let has_supported_ext = - path.extension() - .and_then(|ext| ext.to_str()) - .is_some_and(|ext| { - ["json", "txt"] - .iter() - .any(|allowed| allowed.eq_ignore_ascii_case(ext)) - }); - if has_supported_ext { - has_match = true; + continue; + } + if path.file_name().and_then(|name| name.to_str()) != Some("store.db") { + continue; + } + let Some(candidate) = 
build_cursor_store_db_session(&path) else { + continue; + }; + if candidate.log.updated_at.unwrap_or_default() < cutoff { + continue; + } + out.push(candidate); + } + } + out +} + +fn build_cursor_store_db_session(path: &Path) -> Option { + let conn = open_sqlite_readonly(path).ok()?; + let mut records = Vec::new(); + let mut meta_values = Vec::new(); + for table in ["meta", "blobs"] { + if !sqlite_table_exists(&conn, table) { + continue; + } + let rows = sqlite_table_string_rows(&conn, table); + meta_values.extend(rows.iter().map(|(_, value)| value.clone())); + records.extend(rows); + } + if records.is_empty() { + return None; + } + + let metadata = parse_cursor_store_db_metadata(&meta_values)?; + let content = build_cursor_store_db_content(&metadata, &records)?; + let file_mtime = std::fs::metadata(path) + .ok()? + .modified() + .ok()? + .duration_since(UNIX_EPOCH) + .ok()? + .as_secs() as i64; + let metadata_updated_at = metadata + .updated_at + .or(metadata.created_at) + .map(normalize_epoch_guess) + .unwrap_or_default(); + let updated_at = file_mtime.max(metadata_updated_at); + Some(CursorDiscoveredSession { + canonical_id: Some(metadata.session_id.clone()), + workspace_hint: metadata.cwd.clone(), + content_len: content.len(), + priority: CursorSourcePriority::StoreDb, + log: SessionLog { + agent_type: AgentType::Cursor, + source: SessionSource::Inline { + label: format!("cursor-store:{}", path.display()), + content, + }, + updated_at: Some(updated_at), + }, + }) +} + +#[derive(Debug, Clone)] +struct CursorStoreDbMetadata { + session_id: String, + cwd: Option, + model: Option, + created_at: Option, + updated_at: Option, +} + +fn parse_cursor_store_db_metadata(values: &[String]) -> Option { + let mut session_id = None; + let mut cwd = None; + let mut model = None; + let mut created_at = None; + let mut updated_at = None; + + for value in values { + let Ok(json) = serde_json::from_str::(value) else { + continue; + }; + session_id = session_id + .or_else(|| 
find_string_in_value(&json, &["agentId", "sessionId", "composerId", "id"])); + cwd = cwd.or_else(|| { + find_string_in_value( + &json, + &[ + "workspacePath", + "cwd", + "workingDirectory", + "working_directory", + "path", + ], + ) + }); + model = + model.or_else(|| find_string_in_value(&json, &["lastUsedModel", "modelName", "model"])); + created_at = created_at.or_else(|| find_i64_in_value(&json, &["createdAt", "created_at"])); + updated_at = updated_at + .or_else(|| find_i64_in_value(&json, &["lastUpdatedAt", "updatedAt", "updated_at"])); + } + + Some(CursorStoreDbMetadata { + session_id: session_id?, + cwd, + model, + created_at, + updated_at, + }) +} + +fn build_cursor_store_db_content( + metadata: &CursorStoreDbMetadata, + records: &[(String, String)], +) -> Option { + let mut lines = vec![ + json!({ + "sessionId": metadata.session_id, + "session_id": metadata.session_id, + "cwd": metadata.cwd, + "workspacePath": metadata.cwd, + "model": metadata.model, + "createdAt": metadata.created_at, + "lastUpdatedAt": metadata.updated_at, + "cursor_source": "store_db", + }) + .to_string(), + ]; + let mut seen = HashSet::new(); + for (_, value) in records { + let Ok(json) = serde_json::from_str::(value) else { + continue; + }; + for message in extract_cursor_message_candidates(&json) { + let fingerprint = format!("{}:{}", message.role, message.content); + if !seen.insert(fingerprint) { + continue; + } + lines.push( + json!({ + "role": message.role, + "content": message.content, + "timestamp": message.timestamp, + "model": message.model, + }) + .to_string(), + ); + } + } + if lines.len() <= 1 { + None + } else { + Some(lines.join("\n")) + } +} + +#[derive(Debug, Clone)] +struct CursorMessageCandidate { + role: String, + content: String, + timestamp: Option, + model: Option, +} + +fn extract_cursor_message_candidates(value: &Value) -> Vec { + let mut out = Vec::new(); + collect_cursor_message_candidates(value, &mut out); + out +} + +fn 
collect_cursor_message_candidates(value: &Value, out: &mut Vec) { + match value { + Value::Object(map) => { + let role = map + .get("role") + .and_then(Value::as_str) + .or_else(|| map.get("type").and_then(Value::as_str)); + let content = map + .get("content") + .and_then(Value::as_str) + .or_else(|| map.get("text").and_then(Value::as_str)) + .or_else(|| map.get("markdown").and_then(Value::as_str)); + if let (Some(role), Some(content)) = (role, content) + && matches!(role, "user" | "assistant") + && !content.trim().is_empty() + { + out.push(CursorMessageCandidate { + role: role.to_string(), + content: content.trim().to_string(), + timestamp: map + .get("createdAt") + .and_then(Value::as_i64) + .or_else(|| map.get("timestamp").and_then(Value::as_i64)), + model: map + .get("model") + .and_then(Value::as_str) + .map(ToOwned::to_owned) + .or_else(|| { + value + .pointer("/modelInfo/modelName") + .and_then(Value::as_str) + .map(ToOwned::to_owned) + }), + }); + } + for child in map.values() { + collect_cursor_message_candidates(child, out); + } + } + Value::Array(items) => { + for item in items { + collect_cursor_message_candidates(item, out); + } + } + _ => {} + } +} + +fn find_string_in_value(value: &Value, keys: &[&str]) -> Option { + match value { + Value::Object(map) => { + for key in keys { + if let Some(string) = map.get(*key).and_then(Value::as_str) + && !string.trim().is_empty() + { + return Some(string.to_string()); + } + } + for child in map.values() { + if let Some(found) = find_string_in_value(child, keys) { + return Some(found); + } + } + None + } + Value::Array(items) => items + .iter() + .find_map(|item| find_string_in_value(item, keys)), + _ => None, + } +} + +fn find_i64_in_value(value: &Value, keys: &[&str]) -> Option { + match value { + Value::Object(map) => { + for key in keys { + if let Some(number) = map.get(*key).and_then(Value::as_i64) { + return Some(number); + } + } + for child in map.values() { + if let Some(found) = find_i64_in_value(child, 
keys) { + return Some(found); } } + None } + Value::Array(items) => items.iter().find_map(|item| find_i64_in_value(item, keys)), + _ => None, + } +} - if has_match { - results.push(dir); +fn normalize_epoch_guess(value: i64) -> i64 { + if value > 100_000_000_000 { + value / 1000 + } else { + value + } +} + +fn sqlite_table_exists(conn: &Connection, table: &str) -> bool { + conn.query_row( + "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 LIMIT 1", + params![table], + |_| Ok(()), + ) + .is_ok() +} + +fn sqlite_table_string_rows(conn: &Connection, table: &str) -> Vec<(String, String)> { + let mut out = Vec::new(); + let direct_sql = format!("SELECT key, CAST(value AS TEXT) FROM {table}"); + if let Ok(mut stmt) = conn.prepare(&direct_sql) + && let Ok(rows) = stmt.query_map([], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) + }) + { + out.extend( + rows.flatten() + .filter(|(_, value)| looks_like_jsonish(value)), + ); + if !out.is_empty() { + return out; } } + + let pragma = format!("PRAGMA table_info({table})"); + let Ok(mut info_stmt) = conn.prepare(&pragma) else { + return out; + }; + let Ok(columns) = info_stmt.query_map([], |row| row.get::<_, String>(1)) else { + return out; + }; + let columns = columns.flatten().collect::>(); + if columns.is_empty() { + return out; + } + let sql = format!("SELECT * FROM {table}"); + let Ok(mut stmt) = conn.prepare(&sql) else { + return out; + }; + let Ok(mut rows) = stmt.query([]) else { + return out; + }; + while let Ok(Some(row)) = rows.next() { + let mut key = None; + let mut value = None; + for (idx, column) in columns.iter().enumerate() { + if key.is_none() + && (column == "key" || column == "id" || column.ends_with("Id")) + && let Ok(text) = row.get::<_, String>(idx) + { + key = Some(text); + } + if value.is_none() + && let Ok(text) = row.get::<_, String>(idx) + && looks_like_jsonish(&text) + { + value = Some(text); + } + if value.is_none() + && let Ok(bytes) = row.get::<_, Vec>(idx) + && 
let Ok(text) = String::from_utf8(bytes) + && looks_like_jsonish(&text) + { + value = Some(text); + } + } + if let Some(value) = value { + out.push((key.unwrap_or_else(|| table.to_string()), value)); + } + } + out +} + +fn looks_like_jsonish(text: &str) -> bool { + let trimmed = text.trim(); + trimmed.starts_with('{') || trimmed.starts_with('[') } // --------------------------------------------------------------------------- @@ -119,10 +1275,139 @@ async fn collect_cursor_project_dirs(root: &Path, results: &mut Vec) { mod tests { use super::*; use crate::agents::app_config_dir_in; + use crate::scanner; use tempfile::TempDir; + fn create_cursor_db(path: &Path) -> Connection { + let conn = Connection::open(path).unwrap(); + conn.execute_batch( + "CREATE TABLE ItemTable (key TEXT PRIMARY KEY, value BLOB); + CREATE TABLE cursorDiskKV (key TEXT PRIMARY KEY, value BLOB);", + ) + .unwrap(); + conn + } + + fn create_cursor_store_db(path: &Path) -> Connection { + let conn = Connection::open(path).unwrap(); + conn.execute_batch( + "CREATE TABLE meta (key TEXT PRIMARY KEY, value BLOB); + CREATE TABLE blobs (key TEXT PRIMARY KEY, value BLOB);", + ) + .unwrap(); + conn + } + + async fn write_workspace(path: &Path, folder: &str, composer_id: &str, updated_at: i64) { + tokio::fs::create_dir_all(path).await.unwrap(); + tokio::fs::write( + path.join("workspace.json"), + json!({ "folder": folder }).to_string(), + ) + .await + .unwrap(); + let db = create_cursor_db(&path.join("state.vscdb")); + db.execute( + "INSERT INTO ItemTable (key, value) VALUES (?1, ?2)", + params![ + "composer.composerData", + json!({ + "allComposers": [{ + "composerId": composer_id, + "lastUpdatedAt": updated_at, + "createdAt": updated_at - 1000, + }] + }) + .to_string() + ], + ) + .unwrap(); + } + + fn seed_global_cursor_session(path: &Path, composer_id: &str) { + let db = create_cursor_db(path); + db.execute( + "INSERT INTO cursorDiskKV (key, value) VALUES (?1, ?2)", + params![ + 
format!("composerData:{composer_id}"), + json!({ + "composerId": composer_id, + "createdAt": 1_775_083_355_225i64, + "lastUpdatedAt": 1_775_083_382_256i64, + "modelConfig": { "modelName": "composer-2" }, + "fullConversationHeadersOnly": [ + { "bubbleId": "u1", "type": 1 }, + { "bubbleId": "a1", "type": 2 } + ] + }) + .to_string() + ], + ) + .unwrap(); + db.execute( + "INSERT INTO cursorDiskKV (key, value) VALUES (?1, ?2)", + params![ + format!("bubbleId:{composer_id}:u1"), + json!({ + "type": 1, + "text": "Explain the architecture", + "createdAt": "2026-04-01T22:42:55.658Z", + "modelInfo": { "modelName": "composer-2" } + }) + .to_string() + ], + ) + .unwrap(); + db.execute( + "INSERT INTO cursorDiskKV (key, value) VALUES (?1, ?2)", + params![ + format!("bubbleId:{composer_id}:a1"), + json!({ + "type": 2, + "markdown": "Here is the architecture.", + "createdAt": "2026-04-01T22:42:57.468Z" + }) + .to_string() + ], + ) + .unwrap(); + } + + fn seed_cursor_store_db(path: &Path, session_id: &str) { + let db = create_cursor_store_db(path); + db.execute( + "INSERT INTO meta (key, value) VALUES (?1, ?2)", + params![ + "sessionMeta", + json!({ + "agentId": session_id, + "workspacePath": "/Users/zack/dev/cadence-cli", + "lastUsedModel": "claude-sonnet-4", + "createdAt": 1_775_083_355_225i64, + "lastUpdatedAt": 1_775_083_382_256i64, + }) + .to_string() + ], + ) + .unwrap(); + db.execute( + "INSERT INTO blobs (key, value) VALUES (?1, ?2)", + params![ + "message-1", + json!({ + "messages": [ + {"role": "user", "content": "Investigate the outage", "createdAt": 1_775_083_355_225i64}, + {"role": "assistant", "content": "Here is the outage analysis.", "createdAt": 1_775_083_382_256i64} + ] + }) + .to_string() + ], + ) + .unwrap(); + } + #[tokio::test] - async fn test_cursor_log_dirs_collects_chat_sessions_and_projects() { + async fn test_cursor_log_dirs_collects_chat_sessions_and_agent_transcripts() { let home = TempDir::new().unwrap(); let ws_root = app_config_dir_in("Cursor", 
home.path()) @@ -132,43 +1417,252 @@ mod tests { .join("chatSessions"); tokio::fs::create_dir_all(&ws_root).await.unwrap(); - let projects_dir = home.path().join(".cursor").join("projects").join("p1"); - tokio::fs::create_dir_all(&projects_dir).await.unwrap(); - tokio::fs::write(projects_dir.join("session.txt"), "content") + let transcripts_dir = home + .path() + .join(".cursor") + .join("projects") + .join("Users-zack-dev-cadence-cli") + .join("agent-transcripts") + .join("conversation"); + tokio::fs::create_dir_all(&transcripts_dir).await.unwrap(); + tokio::fs::write(transcripts_dir.join("conversation.jsonl"), "{}\n") .await .unwrap(); let dirs = log_dirs_in(home.path()).await; assert!(dirs.contains(&ws_root)); - assert!(dirs.contains(&projects_dir)); + assert!(dirs.iter().any(|dir| dir.ends_with("agent-transcripts"))); } #[tokio::test] - async fn test_cursor_log_dirs_skips_mcps_project_metadata() { + async fn test_cursor_log_dirs_ignore_non_session_project_dirs() { let home = TempDir::new().unwrap(); - let mcps_tools_dir = home + let agent_tools_dir = home .path() .join(".cursor") .join("projects") .join("p1") - .join("mcps") - .join("user-shadcn-ui") - .join("tools"); - tokio::fs::create_dir_all(&mcps_tools_dir).await.unwrap(); - tokio::fs::write(mcps_tools_dir.join("get_component.json"), "{}") + .join("agent-tools"); + tokio::fs::create_dir_all(&agent_tools_dir).await.unwrap(); + tokio::fs::write(agent_tools_dir.join("tool.txt"), "content") .await .unwrap(); - let real_project_dir = home.path().join(".cursor").join("projects").join("p2"); - tokio::fs::create_dir_all(&real_project_dir).await.unwrap(); - tokio::fs::write(real_project_dir.join("session.txt"), "content") + let terminals_dir = home + .path() + .join(".cursor") + .join("projects") + .join("p1") + .join("terminals"); + tokio::fs::create_dir_all(&terminals_dir).await.unwrap(); + tokio::fs::write(terminals_dir.join("1.txt"), "content") .await .unwrap(); let dirs = log_dirs_in(home.path()).await; + 
assert!(dirs.is_empty()); + } + + #[tokio::test] + async fn test_cursor_discovers_agent_transcript_and_recovers_metadata() { + let home = TempDir::new().unwrap(); + let repo_root = home + .path() + .join("Users") + .join("zack") + .join("dev") + .join("cadence-cli"); + tokio::fs::create_dir_all(&repo_root).await.unwrap(); - assert!(dirs.contains(&real_project_dir)); - assert!(!dirs.contains(&mcps_tools_dir)); + let transcript = home + .path() + .join(".cursor") + .join("projects") + .join(encode_cursor_project_workspace_key(&repo_root)) + .join("agent-transcripts") + .join("abc") + .join("abc.jsonl"); + tokio::fs::create_dir_all(transcript.parent().unwrap()) + .await + .unwrap(); + tokio::fs::write( + &transcript, + "{\"role\":\"user\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}}\n", + ) + .await + .unwrap(); + + let now = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + let logs = discover_recent_in(home.path(), now, 3600).await; + assert_eq!(logs.len(), 1); + + let content = match &logs[0].source { + SessionSource::Inline { content, .. 
} => content, + SessionSource::File(_) => panic!("expected inline transcript"), + }; + let metadata = scanner::parse_session_metadata_str(content); + assert_eq!(metadata.session_id.as_deref(), Some("abc")); + assert_eq!( + metadata.cwd.as_deref(), + Some(repo_root.to_string_lossy().as_ref()) + ); + } + + #[tokio::test] + async fn test_cursor_discovers_desktop_state_vscdb_sessions() { + let home = TempDir::new().unwrap(); + let global_dir = app_config_dir_in("Cursor", home.path()) + .join("User") + .join("globalStorage"); + tokio::fs::create_dir_all(&global_dir).await.unwrap(); + seed_global_cursor_session(&global_dir.join("state.vscdb"), "composer-1"); + + let workspace_dir = app_config_dir_in("Cursor", home.path()) + .join("User") + .join("workspaceStorage") + .join("workspace-1"); + write_workspace( + &workspace_dir, + if cfg!(windows) { + "file:///C:/cursor-test-workspace" + } else { + "file:///tmp/cursor-test-workspace" + }, + "composer-1", + 1_775_083_382_256i64, + ) + .await; + + let logs = discover_recent_in(home.path(), 1_775_083_400, 3600).await; + assert_eq!(logs.len(), 1); + let content = match &logs[0].source { + SessionSource::Inline { content, .. 
} => content, + SessionSource::File(_) => panic!("expected inline desktop log"), + }; + let metadata = scanner::parse_session_metadata_str(content); + assert_eq!(metadata.session_id.as_deref(), Some("composer-1")); + assert_eq!( + metadata.cwd.as_deref(), + Some(if cfg!(windows) { + "C:/cursor-test-workspace" + } else { + "/tmp/cursor-test-workspace" + }) + ); + assert!(content.contains("Explain the architecture")); + assert!(content.contains("Here is the architecture.")); + } + + #[test] + fn test_cursor_dedupe_preserves_bubble_order() { + let mut ids = vec![ + "bubble-2".to_string(), + "bubble-10".to_string(), + "bubble-2".to_string(), + "bubble-1".to_string(), + ]; + + dedupe_preserving_order(&mut ids); + + assert_eq!(ids, vec!["bubble-2", "bubble-10", "bubble-1"]); + } + + #[tokio::test] + async fn test_cursor_discovers_store_db_sessions() { + let home = TempDir::new().unwrap(); + let store_dir = home + .path() + .join(".cursor") + .join("chats") + .join("a") + .join("b"); + tokio::fs::create_dir_all(&store_dir).await.unwrap(); + seed_cursor_store_db(&store_dir.join("store.db"), "store-session-1"); + + let logs = + discover_store_db_sessions(home.path(), current_unix_epoch_for_tests(), 3600).await; + assert_eq!(logs.len(), 1); + let content = match &logs[0].log.source { + SessionSource::Inline { content, .. 
} => content, + SessionSource::File(_) => panic!("expected inline store db session"), + }; + let metadata = scanner::parse_session_metadata_str(content); + assert_eq!(metadata.session_id.as_deref(), Some("store-session-1")); + assert_eq!(metadata.cwd.as_deref(), Some("/Users/zack/dev/cadence-cli")); + assert!(content.contains("Investigate the outage")); + assert!(content.contains("Here is the outage analysis.")); + } + + #[tokio::test] + async fn test_cursor_dedupes_duplicate_transcript_and_desktop_session_ids() { + let home = TempDir::new().unwrap(); + let repo_root = PathBuf::from("/Users/zack/dev/cadence-cli"); + + let global_dir = app_config_dir_in("Cursor", home.path()) + .join("User") + .join("globalStorage"); + tokio::fs::create_dir_all(&global_dir).await.unwrap(); + seed_global_cursor_session(&global_dir.join("state.vscdb"), "shared-session"); + + let workspace_dir = app_config_dir_in("Cursor", home.path()) + .join("User") + .join("workspaceStorage") + .join("workspace-1"); + write_workspace( + &workspace_dir, + &format!("file://{}", repo_root.display()), + "shared-session", + 1_775_083_382_256i64, + ) + .await; + + let encoded = encode_cursor_project_workspace_key(&repo_root); + let transcript = home + .path() + .join(".cursor") + .join("projects") + .join(encoded) + .join("agent-transcripts") + .join("shared-session") + .join("shared-session.jsonl"); + tokio::fs::create_dir_all(transcript.parent().unwrap()) + .await + .unwrap(); + tokio::fs::write( + &transcript, + "{\"role\":\"user\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"short transcript\"}]}}\n", + ) + .await + .unwrap(); + + let logs = discover_recent_in(home.path(), 1_775_083_400, 3600).await; + assert_eq!(logs.len(), 1); + let content = match &logs[0].source { + SessionSource::Inline { content, .. 
} => content, + SessionSource::File(_) => panic!("expected inline session"), + }; + assert!(content.contains("Explain the architecture")); + assert!(!content.contains("short transcript")); + } + + fn current_unix_epoch_for_tests() -> i64 { + std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + } + + #[test] + fn test_encode_cursor_project_workspace_key_windows_drive_format() { + let encoded = + encode_cursor_project_workspace_key(Path::new(r"C:\Users\zack\dev\cadence-cli")); + + if cfg!(windows) { + assert_eq!(encoded, "C--Users-zack-dev-cadence-cli"); + } } } diff --git a/src/agents/warp.rs b/src/agents/warp.rs index 657c4acb..a645b756 100644 --- a/src/agents/warp.rs +++ b/src/agents/warp.rs @@ -11,6 +11,7 @@ use std::time::Duration; use async_trait::async_trait; use protobuf::CodedInputStream; use protobuf::rt::WireType; +use rusqlite::types::ValueRef; use rusqlite::{Connection, OpenFlags}; use serde_json::{Map, Value, json}; @@ -70,14 +71,46 @@ fn warp_db_paths() -> Vec { ] { paths.push(base.join(name).join("warp.sqlite")); } - } else if cfg!(target_os = "windows") { - if let Ok(appdata) = std::env::var("APPDATA") { - paths.push(PathBuf::from(appdata).join("Warp").join("warp.sqlite")); + for legacy in [ + home.join("Library") + .join("Application Support") + .join("dev.warp.Warp-Stable") + .join("warp.sqlite"), + home.join("Library") + .join("Application Support") + .join("dev.warp.Warp") + .join("warp.sqlite"), + ] { + paths.push(legacy); } + } else if cfg!(target_os = "windows") { if let Ok(local) = std::env::var("LOCALAPPDATA") { + paths.push( + PathBuf::from(&local) + .join("warp") + .join("Warp") + .join("data") + .join("warp.sqlite"), + ); paths.push(PathBuf::from(local).join("Warp").join("warp.sqlite")); } + if let Ok(appdata) = std::env::var("APPDATA") { + paths.push(PathBuf::from(appdata).join("Warp").join("warp.sqlite")); + } } else { + if let Ok(xdg_state_home) = std::env::var("XDG_STATE_HOME") { + paths.push( + 
PathBuf::from(xdg_state_home) + .join("warp-terminal") + .join("warp.sqlite"), + ); + } + paths.push( + home.join(".local") + .join("state") + .join("warp-terminal") + .join("warp.sqlite"), + ); paths.push( home.join(".local") .join("share") @@ -87,7 +120,18 @@ fn warp_db_paths() -> Vec { paths.push(home.join(".config").join("warp").join("warp.sqlite")); } - paths + dedupe_paths(paths) +} + +fn dedupe_paths(paths: Vec) -> Vec { + let mut seen = std::collections::HashSet::new(); + let mut out = Vec::new(); + for path in paths { + if seen.insert(path.clone()) { + out.push(path); + } + } + out } fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { @@ -101,16 +145,21 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { }; let _ = conn.busy_timeout(Duration::from_millis(200)); - if !table_exists(&conn, "ai_queries") { + if !table_exists(&conn, "ai_queries") + && !table_exists(&conn, "agent_conversations") + && !table_exists(&conn, "agent_tasks") + { return out; } let cutoff = now - since_secs; let rows = fetch_ai_query_rows(&conn, cutoff); let tasks_by_conversation = fetch_agent_tasks_by_conversation(&conn, cutoff); - if rows.is_empty() && tasks_by_conversation.is_empty() { + let conversation_meta = fetch_agent_conversation_meta(&conn, cutoff); + if rows.is_empty() && tasks_by_conversation.is_empty() && conversation_meta.is_empty() { return out; } + let blocks_by_conversation = fetch_block_cwds_by_conversation(&conn); let mut by_conversation: BTreeMap> = BTreeMap::new(); let mut unknown_idx = 0usize; @@ -129,10 +178,15 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { for conversation_id in tasks_by_conversation.keys() { by_conversation.entry(conversation_id.clone()).or_default(); } + for conversation_id in conversation_meta.keys() { + by_conversation.entry(conversation_id.clone()).or_default(); + } for (conversation_id, rows) in by_conversation { let mut max_start = 0i64; - let mut conversation_cwd: Option = None; + let 
mut conversation_cwd = conversation_meta + .get(&conversation_id) + .and_then(|meta| meta.cwd.clone()); let mut events = Vec::new(); let mut ord = 0usize; let task_rows = tasks_by_conversation.get(&conversation_id); @@ -145,6 +199,11 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { { conversation_cwd = Some(cwd.to_string()); } + if conversation_cwd.is_none() + && let Some(cwd) = row.input.as_deref().and_then(extract_cwd_from_ai_input) + { + conversation_cwd = Some(cwd); + } let prompt = row .input @@ -178,10 +237,23 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { } } + if conversation_cwd.is_none() { + conversation_cwd = blocks_by_conversation + .get(&conversation_id) + .cloned() + .flatten(); + } + if let Some(task_rows) = task_rows { for task in task_rows { max_start = max_start.max(task.last_modified_ts); let decoded = decode_warp_task(&task.task_blob); + if conversation_cwd.is_none() + && let Some(decoded) = decoded.as_ref() + && let Some(cwd) = extract_cwd_from_decoded_task(decoded) + { + conversation_cwd = Some(cwd); + } let normalized = normalize_task_events( &conversation_id, task, @@ -244,6 +316,9 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { Value::String(model.to_string()), ); } + if let Some(meta) = conversation_meta.get(&conversation_id) { + meta.apply_to(&mut obj); + } deduped.push(Value::Object(obj)); } else if let Some(first_task) = task_rows.and_then(|rows| rows.first()) { let mut obj = base_event( @@ -259,6 +334,24 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { if let Some(cwd) = conversation_cwd.as_deref() { obj.insert("cwd".to_string(), Value::String(cwd.to_string())); } + if let Some(meta) = conversation_meta.get(&conversation_id) { + meta.apply_to(&mut obj); + } + deduped.push(Value::Object(obj)); + } else if let Some(meta) = conversation_meta.get(&conversation_id) { + let mut obj = base_event( + "warp_meta", + &conversation_id, + 
meta.last_modified_at.unwrap_or_default(), + "agent_conversations", + ); + if let Some(cwd) = conversation_cwd.as_deref() { + obj.insert("cwd".to_string(), Value::String(cwd.to_string())); + } + meta.apply_to(&mut obj); + if let Some(text) = meta.summary_text.as_deref() { + obj.insert("content".to_string(), Value::String(text.to_string())); + } deduped.push(Value::Object(obj)); } } @@ -613,6 +706,11 @@ fn extract_prompt_from_ai_input(raw: &str) -> Option { None } +fn extract_cwd_from_ai_input(raw: &str) -> Option { + let value = serde_json::from_str::(raw).ok()?; + extract_cwd_from_ai_value(&value) +} + fn extract_prompt_from_value(value: &Value) -> Option { let obj = value.as_object()?; for key in [ @@ -632,6 +730,85 @@ fn extract_prompt_from_value(value: &Value) -> Option { None } +fn extract_cwd_from_ai_value(value: &Value) -> Option { + for pointer in [ + "/cwd", + "/working_directory", + "/workspacePath", + "/Query/context/Directory/pwd", + "/Query/context/directory/pwd", + "/context/Directory/pwd", + "/context/directory/pwd", + ] { + if let Some(path) = value.pointer(pointer).and_then(Value::as_str) + && let Some(path) = non_empty_str(path) + { + return Some(path.to_string()); + } + } + + if let Value::Array(items) = value { + for item in items { + if let Some(query) = item.get("Query") + && let Some(path) = query + .pointer("/context/Directory/pwd") + .or_else(|| query.pointer("/context/directory/pwd")) + .and_then(Value::as_str) + .and_then(non_empty_str) + { + return Some(path.to_string()); + } + } + } + + None +} + +fn extract_summary_text(value: &Value) -> Option { + for pointer in [ + "/title", + "/summary", + "/latest_user_message", + "/latest_message", + ] { + if let Some(text) = value.pointer(pointer).and_then(Value::as_str) + && let Some(text) = sanitize_text(text) + { + return Some(text); + } + } + None +} + +fn extract_cwd_from_decoded_task(decoded: &WarpDecodedTask) -> Option { + for node in &decoded.flat_nodes { + let Some(text) = 
node.string_value.as_deref() else { + continue; + }; + let candidate = text.trim(); + if is_probable_filesystem_path(candidate) { + return Some(candidate.to_string()); + } + } + None +} + +fn is_probable_filesystem_path(text: &str) -> bool { + let trimmed = text.trim(); + if trimmed.len() < 2 || trimmed.len() > 1024 { + return false; + } + if trimmed.contains('\n') || trimmed.contains('\r') { + return false; + } + if trimmed.starts_with('/') { + return true; + } + trimmed.len() > 3 + && trimmed.as_bytes()[1] == b':' + && (trimmed.as_bytes()[2] == b'\\' || trimmed.as_bytes()[2] == b'/') +} + fn sanitize_text(raw: &str) -> Option { let trimmed = raw.trim(); if trimmed.is_empty() { @@ -706,6 +883,9 @@ fn table_exists(conn: &Connection, table: &str) -> bool { } fn fetch_ai_query_rows(conn: &Connection, cutoff: i64) -> Vec { + if !table_exists(conn, "ai_queries") { + return Vec::new(); + } let start_ts_expr = normalized_start_ts_sql("start_ts"); let query_full = format!( "SELECT exchange_id, conversation_id, {start_ts_expr} AS start_ts_epoch, input, working_directory, output_status, model_id, planning_model_id, coding_model_id @@ -813,6 +993,118 @@ fn fetch_agent_tasks_by_conversation( out } +fn fetch_block_cwds_by_conversation(conn: &Connection) -> HashMap> { + let mut out = HashMap::new(); + if !table_exists(conn, "blocks") { + return out; + } + + let Ok(mut stmt) = conn.prepare("SELECT ai_metadata, pwd FROM blocks") else { + return out; + }; + let Ok(rows) = stmt.query_map([], |row| { + Ok((row_optional_text(row, 0), row_optional_text(row, 1))) + }) else { + return out; + }; + + for row in rows.flatten() { + let (Some(ai_metadata), Some(pwd)) = row else { + continue; + }; + let Some(cwd) = non_empty_str(&pwd).map(ToOwned::to_owned) else { + continue; + }; + let Ok(value) = serde_json::from_str::(&ai_metadata) else { + continue; + }; + let Some(conversation_id) = value + .get("conversation_id") + .or_else(|| value.pointer("/conversation/id")) + 
.and_then(Value::as_str) + else { + continue; + }; + out.entry(conversation_id.to_string()).or_insert(Some(cwd)); + } + + out +} + +fn row_optional_text(row: &rusqlite::Row<'_>, idx: usize) -> Option { + match row.get_ref(idx).ok()? { + ValueRef::Null => None, + ValueRef::Text(bytes) => String::from_utf8(bytes.to_vec()).ok(), + ValueRef::Blob(bytes) => String::from_utf8(bytes.to_vec()).ok(), + ValueRef::Integer(value) => Some(value.to_string()), + ValueRef::Real(value) => Some(value.to_string()), + } +} + +fn fetch_agent_conversation_meta( + conn: &Connection, + cutoff: i64, +) -> HashMap { + let mut out = HashMap::new(); + if !table_exists(conn, "agent_conversations") { + return out; + } + + let ts_expr = normalized_start_ts_sql("last_modified_at"); + let query = format!( + "SELECT conversation_id, conversation_data, {ts_expr} AS ts_epoch FROM agent_conversations WHERE conversation_id IS NOT NULL AND {ts_expr} >= ?1" + ); + if let Ok(mut stmt) = conn.prepare(&query) + && let Ok(rows) = stmt.query_map([cutoff], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + )) + }) + { + for row in rows.flatten() { + let Some(raw) = row.1 else { + continue; + }; + let Ok(mut value) = serde_json::from_str::(&raw) else { + continue; + }; + if let Some(ts) = row.2 + && let Value::Object(map) = &mut value + { + map.entry("last_modified_at".to_string()) + .or_insert_with(|| Value::Number(ts.into())); + } + out.insert(row.0, WarpConversationMeta::from_value(&value)); + } + return out; + } + + let Ok(mut stmt) = conn.prepare( + "SELECT conversation_id, conversation_data FROM agent_conversations WHERE conversation_id IS NOT NULL", + ) else { + return out; + }; + let Ok(rows) = stmt.query_map([], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, Option>(1)?)) + }) else { + return out; + }; + + for row in rows.flatten() { + let Some(raw) = row.1 else { + continue; + }; + let Ok(value) = serde_json::from_str::(&raw) else { + continue; + 
}; + out.insert(row.0, WarpConversationMeta::from_value(&value)); + } + + out +} + #[derive(Debug, Clone)] struct EmittedEvent { ts: i64, @@ -842,6 +1134,83 @@ struct AgentTaskRow { task_blob: Vec, } +#[derive(Debug, Clone, Default)] +struct WarpConversationMeta { + cwd: Option, + model_id: Option, + planning_model_id: Option, + coding_model_id: Option, + server_conversation_token: Option, + last_modified_at: Option, + summary_text: Option, +} + +impl WarpConversationMeta { + fn from_value(value: &Value) -> Self { + Self { + cwd: value + .pointer("/working_directory") + .or_else(|| value.pointer("/cwd")) + .or_else(|| value.pointer("/workspacePath")) + .and_then(Value::as_str) + .and_then(non_empty_str) + .map(ToOwned::to_owned), + model_id: value + .pointer("/model_id") + .or_else(|| value.pointer("/model/name")) + .or_else(|| value.pointer("/conversation_usage_metadata/model_id")) + .and_then(Value::as_str) + .and_then(non_empty_str) + .map(ToOwned::to_owned), + planning_model_id: value + .pointer("/planning_model_id") + .and_then(Value::as_str) + .and_then(non_empty_str) + .map(ToOwned::to_owned), + coding_model_id: value + .pointer("/coding_model_id") + .and_then(Value::as_str) + .and_then(non_empty_str) + .map(ToOwned::to_owned), + server_conversation_token: value + .get("server_conversation_token") + .and_then(Value::as_str) + .and_then(non_empty_str) + .map(ToOwned::to_owned), + last_modified_at: value + .pointer("/last_modified_at") + .and_then(Value::as_i64) + .or_else(|| value.pointer("/updated_at").and_then(Value::as_i64)), + summary_text: extract_prompt_from_value(value).or_else(|| extract_summary_text(value)), + } + } + + fn apply_to(&self, obj: &mut Map) { + if let Some(cwd) = self.cwd.as_deref() { + obj.entry("cwd".to_string()) + .or_insert_with(|| Value::String(cwd.to_string())); + } + if let Some(model_id) = self.model_id.as_deref() { + obj.entry("model_id".to_string()) + .or_insert_with(|| Value::String(model_id.to_string())); + } + if let 
Some(model_id) = self.planning_model_id.as_deref() { + obj.entry("planning_model_id".to_string()) + .or_insert_with(|| Value::String(model_id.to_string())); + } + if let Some(model_id) = self.coding_model_id.as_deref() { + obj.entry("coding_model_id".to_string()) + .or_insert_with(|| Value::String(model_id.to_string())); + } + if let Some(token) = self.server_conversation_token.as_deref() { + obj.insert( + "server_conversation_token".to_string(), + Value::String(token.to_string()), + ); + } + } +} + #[derive(Debug, Clone)] struct WarpDecodedTask { flat_nodes: Vec, @@ -1238,13 +1607,21 @@ mod tests { planning_model_id TEXT, coding_model_id TEXT ); - CREATE TABLE agent_tasks ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - conversation_id TEXT NOT NULL, - task_id TEXT NOT NULL, - task BLOB NOT NULL, - last_modified_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP - );", + CREATE TABLE agent_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conversation_id TEXT NOT NULL, + task_id TEXT NOT NULL, + task BLOB NOT NULL, + last_modified_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE blocks ( + ai_metadata TEXT, + pwd TEXT + ); + CREATE TABLE agent_conversations ( + conversation_id TEXT, + conversation_data TEXT + );", ) .expect("create"); conn @@ -1418,6 +1795,231 @@ mod tests { assert!(metadata.cwd.is_none()); } + #[tokio::test] + #[serial] + async fn warp_recovers_cwd_from_ai_query_input_context() { + let tmp = TempDir::new().expect("tmp"); + let db_path = tmp.path().join("warp.sqlite"); + let conn = create_db(&db_path); + conn.execute( + "INSERT INTO ai_queries (exchange_id, conversation_id, start_ts, input, output_status) + VALUES (?1, ?2, ?3, ?4, ?5)", + ( + "ex1", + "conv-input-cwd", + 1_700_000_000i64, + r#"[{"Query":{"text":"hi","context":{"Directory":{"pwd":"/tmp/repo-from-input"}}}}]"#, + "Succeeded", + ), + ) + .expect("insert"); + + unsafe { + std::env::set_var("WARP_DB_PATH", &db_path); + } + let logs = 
WarpExplorer.discover_recent(1_700_000_200, 1_000).await; + unsafe { + std::env::remove_var("WARP_DB_PATH"); + } + + let log = logs.first().expect("log"); + let content = match &log.source { + SessionSource::Inline { content, .. } => content, + _ => panic!("expected inline"), + }; + let metadata = scanner::parse_session_metadata_str(content); + assert_eq!(metadata.cwd.as_deref(), Some("/tmp/repo-from-input")); + } + + #[tokio::test] + #[serial] + async fn warp_recovers_cwd_from_blocks_when_queries_lack_it() { + let tmp = TempDir::new().expect("tmp"); + let db_path = tmp.path().join("warp.sqlite"); + let conn = create_db(&db_path); + conn.execute( + "INSERT INTO ai_queries (exchange_id, conversation_id, start_ts, input, output_status) + VALUES (?1, ?2, ?3, ?4, ?5)", + ( + "ex1", + "conv-block-cwd", + 1_700_000_000i64, + r#"{"prompt":"hi"}"#, + "Succeeded", + ), + ) + .expect("insert"); + conn.execute( + "INSERT INTO blocks (ai_metadata, pwd) VALUES (?1, ?2)", + ( + r#"{"conversation_id":"conv-block-cwd"}"#, + "/tmp/repo-from-blocks", + ), + ) + .expect("insert block"); + + unsafe { + std::env::set_var("WARP_DB_PATH", &db_path); + } + let logs = WarpExplorer.discover_recent(1_700_000_200, 1_000).await; + unsafe { + std::env::remove_var("WARP_DB_PATH"); + } + + let log = logs.first().expect("log"); + let content = match &log.source { + SessionSource::Inline { content, .. 
} => content, + _ => panic!("expected inline"), + }; + let metadata = scanner::parse_session_metadata_str(content); + assert_eq!(metadata.cwd.as_deref(), Some("/tmp/repo-from-blocks")); + } + + #[tokio::test] + #[serial] + async fn warp_enriches_meta_from_agent_conversations() { + let tmp = TempDir::new().expect("tmp"); + let db_path = tmp.path().join("warp.sqlite"); + let conn = create_db(&db_path); + conn.execute( + "INSERT INTO agent_tasks (conversation_id, task_id, task, last_modified_at) + VALUES (?1, ?2, ?3, ?4)", + ("conv-meta", "task-1", Vec::::new(), 1_772_582_530i64), + ) + .expect("insert task"); + conn.execute( + "INSERT INTO agent_conversations (conversation_id, conversation_data) VALUES (?1, ?2)", + ( + "conv-meta", + r#"{"working_directory":"/tmp/repo-meta","model_id":"claude-opus-4.1","server_conversation_token":"token-123"}"#, + ), + ) + .expect("insert conversation meta"); + + unsafe { + std::env::set_var("WARP_DB_PATH", &db_path); + } + let logs = WarpExplorer.discover_recent(1_772_583_000, 1_000).await; + unsafe { + std::env::remove_var("WARP_DB_PATH"); + } + + let log = logs.first().expect("log"); + let content = match &log.source { + SessionSource::Inline { content, .. 
} => content, + _ => panic!("expected inline"), + }; + assert!(content.contains("/tmp/repo-meta")); + assert!(content.contains("claude-opus-4.1")); + assert!(content.contains("token-123")); + } + + #[tokio::test] + #[serial] + async fn warp_meta_only_conversation_still_uses_block_cwd_fallback() { + let tmp = TempDir::new().expect("tmp"); + let db_path = tmp.path().join("warp.sqlite"); + let conn = create_db(&db_path); + conn.execute( + "INSERT INTO agent_conversations (conversation_id, conversation_data) VALUES (?1, ?2)", + ( + "conv-meta-only", + r#"{"server_conversation_token":"token-123","conversation_usage_metadata":{"token_usage":[{"model_id":"Claude Opus 4.6"}]}}"#, + ), + ) + .expect("insert agent conversation"); + conn.execute( + "INSERT INTO blocks (ai_metadata, pwd) VALUES (?1, ?2)", + ( + br#"{"conversation_id":"conv-meta-only","requested_command_action_id":"tool-1"}"# + .to_vec(), + "/tmp/repo-from-block-fallback", + ), + ) + .expect("insert block fallback"); + + unsafe { + std::env::set_var("WARP_DB_PATH", &db_path); + } + let logs = WarpExplorer.discover_recent(1_775_089_527, 2_592_000).await; + unsafe { + std::env::remove_var("WARP_DB_PATH"); + } + + let log = logs + .iter() + .find(|log| log.source_label() == "warp:conv-meta-only") + .expect("expected warp log to exist"); + let content = match &log.source { + SessionSource::Inline { content, .. 
} => content, + _ => panic!("expected inline warp log"), + }; + let metadata = scanner::parse_session_metadata_str(content); + assert_eq!(metadata.session_id.as_deref(), Some("conv-meta-only")); + assert_eq!( + metadata.cwd.as_deref(), + Some("/tmp/repo-from-block-fallback") + ); + assert!(content.contains("token-123")); + } + + #[test] + fn warp_reads_block_cwd_when_ai_metadata_is_blob() { + let conn = Connection::open_in_memory().expect("mem db"); + conn.execute_batch("CREATE TABLE blocks (ai_metadata BLOB, pwd TEXT);") + .expect("create blocks"); + conn.execute( + "INSERT INTO blocks (ai_metadata, pwd) VALUES (?1, ?2)", + ( + br#"{"conversation_id":"conv-1"}"#.to_vec(), + "/tmp/repo-from-blob", + ), + ) + .expect("insert block row"); + + let by_conversation = fetch_block_cwds_by_conversation(&conn); + assert_eq!( + by_conversation.get("conv-1").cloned().flatten().as_deref(), + Some("/tmp/repo-from-blob") + ); + } + + #[test] + fn warp_agent_conversation_meta_respects_cutoff() { + let conn = Connection::open_in_memory().expect("mem db"); + conn.execute_batch( + "CREATE TABLE agent_conversations ( + conversation_id TEXT, + conversation_data TEXT, + last_modified_at INTEGER + );", + ) + .expect("create agent conversations"); + conn.execute( + "INSERT INTO agent_conversations (conversation_id, conversation_data, last_modified_at) VALUES (?1, ?2, ?3)", + ( + "old-conv", + r#"{"working_directory":"/tmp/old","summary":"old"}"#, + 1_700_000_000i64, + ), + ) + .expect("insert old conversation"); + conn.execute( + "INSERT INTO agent_conversations (conversation_id, conversation_data, last_modified_at) VALUES (?1, ?2, ?3)", + ( + "new-conv", + r#"{"working_directory":"/tmp/new","summary":"new"}"#, + 1_800_000_000i64, + ), + ) + .expect("insert new conversation"); + + let by_conversation = fetch_agent_conversation_meta(&conn, 1_750_000_000); + + assert!(!by_conversation.contains_key("old-conv")); + assert!(by_conversation.contains_key("new-conv")); + } + #[tokio::test] 
#[serial]
async fn warp_ms_timestamps_respected() {
diff --git a/src/main.rs b/src/main.rs
index 1ba98b35..5c47a77d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -746,6 +746,32 @@ async fn session_log_metadata(log: &agents::SessionLog) -> scanner::SessionMetad
    metadata
}
+fn discovery_skip_reason_for_missing_metadata(log: &agents::SessionLog) -> &'static str {
+    match log.agent_type {
+        scanner::AgentType::Cursor => {
+            let label = log.source_label();
+            if label.contains("agent-tools")
+                || label.contains("terminals")
+                || label.contains("mcps")
+            {
+                "cursor_non_session_artifact"
+            } else {
+                "cursor_unsupported_storage_shape"
+            }
+        }
+        scanner::AgentType::Warp => "warp_missing_session_id",
+        _ => "missing_session_metadata",
+    }
+}
+
+fn discovery_skip_reason_for_missing_cwd(log: &agents::SessionLog) -> &'static str {
+    match log.agent_type {
+        scanner::AgentType::Cursor => "cursor_workspace_unresolved",
+        scanner::AgentType::Warp => "warp_cwd_unrecoverable_after_fallbacks",
+        _ => "missing_cwd",
+    }
+}
+
 async fn session_log_content_async(log: &agents::SessionLog) -> Option<String> {
    match &log.source {
        agents::SessionSource::File(path) => tokio::fs::read_to_string(path).await.ok(),
@@ -1372,10 +1398,12 @@
            // Skip files with no session metadata (e.g., file-history-snapshot files)
            if metadata.session_id.is_none() && metadata.cwd.is_none() {
+                let reason = discovery_skip_reason_for_missing_metadata(log);
                ::tracing::info!(
                    event = "session_discovery_skipped",
                    file = file_path.as_str(),
-                    reason = "missing_session_metadata"
+                    agent = log.agent_type.to_string(),
+                    reason
                );
                if let Some(ref pb) = progress {
                    pb.inc(1);
@@ -1387,11 +1415,13 @@
            let cwd = match &metadata.cwd {
                Some(c) => c.clone(),
                None => {
+                    let reason = discovery_skip_reason_for_missing_cwd(log);
                    ::tracing::info!(
                        event = "session_discovery_skipped",
                        file = file_path.as_str(),
+                        agent = log.agent_type.to_string(),
session_id = ?metadata.session_id, - reason = "missing_cwd" + reason ); if let Some(ref pb) = progress { pb.inc(1); @@ -3069,6 +3099,48 @@ mod tests { String::from_utf8(buf).expect("utf8 output") } + fn current_unix_epoch() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("current unix epoch") + .as_secs() as i64 + } + + fn encode_cursor_workspace_key_for_tests(path: &Path) -> String { + use std::path::Component; + + let mut encoded = String::new(); + for component in path.components() { + match component { + Component::Prefix(prefix) => { + let raw = prefix.as_os_str().to_string_lossy(); + if let Some(drive) = raw.strip_suffix(':') { + encoded.push_str(drive); + encoded.push_str("--"); + } else if !raw.is_empty() { + if !encoded.is_empty() && !encoded.ends_with('-') { + encoded.push('-'); + } + encoded.push_str(&raw.replace(['/', '\\', ':'], "-")); + } + } + Component::RootDir => {} + Component::Normal(segment) => { + let segment = segment.to_string_lossy(); + if segment.is_empty() { + continue; + } + if !encoded.is_empty() && !encoded.ends_with('-') { + encoded.push('-'); + } + encoded.push_str(&segment); + } + Component::CurDir | Component::ParentDir => {} + } + } + encoded + } + fn occurrence_count(haystack: &str, needle: &str) -> usize { haystack.matches(needle).count() } @@ -3174,6 +3246,196 @@ mod tests { path } + async fn write_cursor_agent_transcript( + home: &Path, + workspace_key: &str, + session_id: &str, + body: &str, + updated_at: i64, + ) -> PathBuf { + let path = home + .join(".cursor") + .join("projects") + .join(workspace_key) + .join("agent-transcripts") + .join(session_id) + .join(format!("{session_id}.jsonl")); + tokio::fs::create_dir_all(path.parent().expect("parent")) + .await + .expect("create cursor transcript dir"); + tokio::fs::write(&path, body) + .await + .expect("write cursor transcript"); + filetime::set_file_mtime(&path, filetime::FileTime::from_unix_time(updated_at, 0)) + .expect("set cursor 
transcript mtime"); + path + } + + async fn write_cursor_project_noise(home: &Path, workspace_key: &str) { + for relative in [ + PathBuf::from("agent-tools/tool-output.txt"), + PathBuf::from("terminals/1.txt"), + PathBuf::from("mcps/example/tools/tool.json"), + ] { + let path = home + .join(".cursor") + .join("projects") + .join(workspace_key) + .join(relative); + tokio::fs::create_dir_all(path.parent().expect("noise parent")) + .await + .expect("create cursor noise dir"); + tokio::fs::write(&path, "noise") + .await + .expect("write cursor noise file"); + } + } + + fn create_cursor_state_db(path: &Path) -> rusqlite::Connection { + let conn = rusqlite::Connection::open(path).expect("open cursor state db"); + conn.execute_batch( + "CREATE TABLE ItemTable (key TEXT PRIMARY KEY, value BLOB); + CREATE TABLE cursorDiskKV (key TEXT PRIMARY KEY, value BLOB);", + ) + .expect("create cursor state schema"); + conn + } + + async fn write_cursor_desktop_session( + home: &Path, + workspace_id: &str, + composer_id: &str, + repo_root: &Path, + updated_at_ms: i64, + user_text: &str, + assistant_text: &str, + ) { + let global_dir = agents::app_config_dir_in("Cursor", home) + .join("User") + .join("globalStorage"); + tokio::fs::create_dir_all(&global_dir) + .await + .expect("create cursor global storage dir"); + let global_db_path = global_dir.join("state.vscdb"); + let global = create_cursor_state_db(&global_db_path); + global + .execute( + "INSERT INTO cursorDiskKV (key, value) VALUES (?1, ?2)", + rusqlite::params![ + format!("composerData:{composer_id}"), + serde_json::json!({ + "composerId": composer_id, + "createdAt": updated_at_ms - 5_000, + "lastUpdatedAt": updated_at_ms, + "modelConfig": { "modelName": "claude-opus-4.1" }, + "fullConversationHeadersOnly": [ + { "bubbleId": "bubble-user", "type": 1 }, + { "bubbleId": "bubble-assistant", "type": 2 } + ] + }) + .to_string() + ], + ) + .expect("insert cursor composer"); + global + .execute( + "INSERT INTO cursorDiskKV (key, 
value) VALUES (?1, ?2)", + rusqlite::params![ + format!("bubbleId:{composer_id}:bubble-user"), + serde_json::json!({ + "type": 1, + "text": user_text, + "createdAt": "2026-04-01T22:42:55.658Z", + "modelInfo": { "modelName": "claude-opus-4.1" } + }) + .to_string() + ], + ) + .expect("insert cursor user bubble"); + global + .execute( + "INSERT INTO cursorDiskKV (key, value) VALUES (?1, ?2)", + rusqlite::params![ + format!("bubbleId:{composer_id}:bubble-assistant"), + serde_json::json!({ + "type": 2, + "markdown": assistant_text, + "createdAt": "2026-04-01T22:42:57.468Z" + }) + .to_string() + ], + ) + .expect("insert cursor assistant bubble"); + + let workspace_dir = agents::app_config_dir_in("Cursor", home) + .join("User") + .join("workspaceStorage") + .join(workspace_id); + tokio::fs::create_dir_all(&workspace_dir) + .await + .expect("create cursor workspace dir"); + tokio::fs::write( + workspace_dir.join("workspace.json"), + serde_json::json!({ + "folder": format!("file://{}", repo_root.to_string_lossy()) + }) + .to_string(), + ) + .await + .expect("write cursor workspace json"); + let workspace = create_cursor_state_db(&workspace_dir.join("state.vscdb")); + workspace + .execute( + "INSERT INTO ItemTable (key, value) VALUES (?1, ?2)", + rusqlite::params![ + "composer.composerData", + serde_json::json!({ + "allComposers": [{ + "composerId": composer_id, + "lastUpdatedAt": updated_at_ms, + "createdAt": updated_at_ms - 5_000, + }] + }) + .to_string() + ], + ) + .expect("insert cursor workspace composer"); + } + + fn create_warp_fixture_db(path: &Path) -> rusqlite::Connection { + let conn = rusqlite::Connection::open(path).expect("open warp fixture db"); + conn.execute_batch( + "CREATE TABLE ai_queries ( + exchange_id TEXT, + conversation_id TEXT, + start_ts INTEGER, + input TEXT, + working_directory TEXT, + output_status TEXT, + model_id TEXT, + planning_model_id TEXT, + coding_model_id TEXT + ); + CREATE TABLE agent_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + 
conversation_id TEXT NOT NULL, + task_id TEXT NOT NULL, + task BLOB NOT NULL, + last_modified_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + CREATE TABLE blocks ( + ai_metadata TEXT, + pwd TEXT + ); + CREATE TABLE agent_conversations ( + conversation_id TEXT, + conversation_data TEXT + );", + ) + .expect("create warp fixture schema"); + conn + } + #[test] fn cli_parses_login_command() { let cli = Cli::parse_from(["cadence", "login"]); @@ -4772,6 +5034,339 @@ mod tests { assert_eq!(requests[0].canonical_repo_root, repo_a.to_string_lossy()); } + #[tokio::test] + #[serial] + async fn run_backfill_inner_discovers_real_world_cursor_sessions_and_ignores_noise() { + let home = TempDir::new().expect("home tempdir"); + let _env = DiscoveryTestEnv::install(home.path()); + + let global_config = home.path().join("global.gitconfig"); + tokio::fs::write(&global_config, "") + .await + .expect("write empty global gitconfig"); + let global_guard = EnvGuard::new("GIT_CONFIG_GLOBAL"); + global_guard.set_path(&global_config); + + let api_url_guard = EnvGuard::new("CADENCE_API_URL"); + + let repo = init_repo().await; + run_git( + repo.path(), + &[ + "remote", + "add", + "origin", + "git@github.com:test-org/example.git", + ], + ) + .await; + + let workspace_key = encode_cursor_workspace_key_for_tests(repo.path()); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + write_cursor_agent_transcript( + home.path(), + &workspace_key, + "cursor-transcript-session", + concat!( + "{\"role\":\"user\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"Explain the architecture of the project\"}]}}\n", + "{\"role\":\"assistant\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"Here is the project architecture.\"}]}}\n" + ), + now, + ) + .await; + write_cursor_project_noise(home.path(), &workspace_key).await; + write_cursor_desktop_session( + home.path(), + "workspace-abc", + "cursor-desktop-session", + repo.path(), 
+            now * 1000,
+            "Review the deployment flow",
+            "Deployment flow summary",
+        )
+        .await;
+
+        let server = crate::upload::test_support::spawn_test_upload_server(
+            crate::upload::test_support::TestUploadServerConfig::default(),
+        )
+        .await
+        .expect("spawn upload test server");
+        api_url_guard.set_str(server.base_url.as_str());
+
+        let cfg = config::CliConfig {
+            token: Some("test-token".to_string()),
+            ..config::CliConfig::default()
+        };
+        cfg.save().await.expect("save config");
+
+        let files =
+            agents::discover_recent_sessions_for_backfill(current_unix_epoch(), 86_400).await;
+        let cursor_files = files
+            .into_iter()
+            .filter(|log| log.agent_type == scanner::AgentType::Cursor)
+            .collect::<Vec<_>>();
+        assert_eq!(cursor_files.len(), 2);
+
+        let parsed_logs = parse_session_logs_bounded(cursor_files).await;
+        assert_eq!(parsed_logs.len(), 2);
+        for parsed in &parsed_logs {
+            assert_eq!(parsed.metadata.agent_type, Some(scanner::AgentType::Cursor));
+        }
+
+        let upload_context = upload::resolve_upload_context(Some(server.base_url.as_str()))
+            .await
+            .expect("resolve upload context");
+        for parsed in &parsed_logs {
+            let outcome = upload_session_from_log(
+                &upload_context,
+                parsed,
+                repo.path(),
+                &repo.path().to_string_lossy(),
+                PublicationMode::Backfill,
+            )
+            .await;
+            assert!(matches!(outcome, UploadFromLogOutcome::Uploaded));
+        }
+
+        let upload_bodies = server
+            .upload_requests()
+            .into_iter()
+            .map(|request| request.body)
+            .collect::<Vec<_>>();
+        assert_eq!(upload_bodies.len(), 2);
+        assert!(
+            upload_bodies
+                .iter()
+                .any(|body| body.contains("Explain the architecture of the project"))
+        );
+        assert!(
+            upload_bodies
+                .iter()
+                .any(|body| body.contains("Deployment flow summary"))
+        );
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn run_backfill_inner_uploads_warp_session_with_cwd_from_query_context() {
+        let home = TempDir::new().expect("home tempdir");
+        let _env = DiscoveryTestEnv::install(home.path());
+
+        let global_config =
home.path().join("global.gitconfig");
+        tokio::fs::write(&global_config, "")
+            .await
+            .expect("write empty global gitconfig");
+        let global_guard = EnvGuard::new("GIT_CONFIG_GLOBAL");
+        global_guard.set_path(&global_config);
+        let api_url_guard = EnvGuard::new("CADENCE_API_URL");
+        let warp_db_guard = EnvGuard::new("WARP_DB_PATH");
+
+        let repo = init_repo().await;
+        run_git(
+            repo.path(),
+            &[
+                "remote",
+                "add",
+                "origin",
+                "git@github.com:test-org/example.git",
+            ],
+        )
+        .await;
+
+        let warp_db_path = home.path().join("warp.sqlite");
+        let conn = create_warp_fixture_db(&warp_db_path);
+        conn.execute(
+            "INSERT INTO ai_queries (exchange_id, conversation_id, start_ts, input, output_status, model_id)
+             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+            rusqlite::params![
+                "ex1",
+                "warp-query-context",
+                current_unix_epoch(),
+                serde_json::json!([
+                    {"Query": {
+                        "text": "Explain the service boundaries",
+                        "context": {"Directory": {"pwd": repo.path().to_string_lossy().to_string()}}
+                    }}
+                ])
+                .to_string(),
+                "Succeeded",
+                "claude-opus-4.1"
+            ],
+        )
+        .expect("insert warp query row");
+        warp_db_guard.set_path(&warp_db_path);
+
+        let server = crate::upload::test_support::spawn_test_upload_server(
+            crate::upload::test_support::TestUploadServerConfig::default(),
+        )
+        .await
+        .expect("spawn upload test server");
+        api_url_guard.set_str(server.base_url.as_str());
+
+        let cfg = config::CliConfig {
+            token: Some("test-token".to_string()),
+            ..config::CliConfig::default()
+        };
+        cfg.save().await.expect("save config");
+
+        let files =
+            agents::discover_recent_sessions_for_backfill(current_unix_epoch(), 86_400).await;
+        let warp_files = files
+            .into_iter()
+            .filter(|log| log.agent_type == scanner::AgentType::Warp)
+            .collect::<Vec<_>>();
+        assert_eq!(warp_files.len(), 1);
+        let parsed_logs = parse_session_logs_bounded(warp_files).await;
+        assert_eq!(parsed_logs.len(), 1);
+        assert_eq!(
+            parsed_logs[0].metadata.session_id.as_deref(),
+            Some("warp-query-context")
+        );
+
+        let
upload_context = upload::resolve_upload_context(Some(server.base_url.as_str())) + .await + .expect("resolve upload context"); + let outcome = upload_session_from_log( + &upload_context, + &parsed_logs[0], + repo.path(), + &repo.path().to_string_lossy(), + PublicationMode::Backfill, + ) + .await; + assert!(matches!(outcome, UploadFromLogOutcome::Uploaded)); + + let uploads = server.upload_requests(); + assert_eq!(uploads.len(), 1); + assert!(uploads[0].body.contains("Explain the service boundaries")); + let create_requests = server.create_requests(); + assert_eq!(create_requests.len(), 1); + assert_eq!( + create_requests[0].canonical_repo_root, + repo.path().to_string_lossy() + ); + } + + #[tokio::test] + #[serial] + async fn run_backfill_inner_skips_only_truly_unrecoverable_warp_sessions() { + let home = TempDir::new().expect("home tempdir"); + let _env = DiscoveryTestEnv::install(home.path()); + + let global_config = home.path().join("global.gitconfig"); + tokio::fs::write(&global_config, "") + .await + .expect("write empty global gitconfig"); + let global_guard = EnvGuard::new("GIT_CONFIG_GLOBAL"); + global_guard.set_path(&global_config); + let api_url_guard = EnvGuard::new("CADENCE_API_URL"); + let warp_db_guard = EnvGuard::new("WARP_DB_PATH"); + + let repo = init_repo().await; + run_git( + repo.path(), + &[ + "remote", + "add", + "origin", + "git@github.com:test-org/example.git", + ], + ) + .await; + + let warp_db_path = home.path().join("warp.sqlite"); + let conn = create_warp_fixture_db(&warp_db_path); + conn.execute( + "INSERT INTO ai_queries (exchange_id, conversation_id, start_ts, input, output_status) + VALUES (?1, ?2, ?3, ?4, ?5)", + rusqlite::params![ + "ex-recoverable", + "warp-block-recovery", + current_unix_epoch(), + serde_json::json!({"prompt": "Recover from blocks"}).to_string(), + "Succeeded" + ], + ) + .expect("insert recoverable warp row"); + conn.execute( + "INSERT INTO blocks (ai_metadata, pwd) VALUES (?1, ?2)", + rusqlite::params![ + 
serde_json::json!({"conversation_id": "warp-block-recovery"}).to_string(),
+                repo.path().to_string_lossy().to_string()
+            ],
+        )
+        .expect("insert recoverable warp block");
+        conn.execute(
+            "INSERT INTO ai_queries (exchange_id, conversation_id, start_ts, input, output_status)
+             VALUES (?1, ?2, ?3, ?4, ?5)",
+            rusqlite::params![
+                "ex-missing",
+                "warp-unrecoverable",
+                current_unix_epoch(),
+                serde_json::json!({"prompt": "No cwd anywhere"}).to_string(),
+                "Succeeded"
+            ],
+        )
+        .expect("insert missing warp row");
+        warp_db_guard.set_path(&warp_db_path);
+
+        let server = crate::upload::test_support::spawn_test_upload_server(
+            crate::upload::test_support::TestUploadServerConfig::default(),
+        )
+        .await
+        .expect("spawn upload test server");
+        api_url_guard.set_str(server.base_url.as_str());
+
+        let cfg = config::CliConfig {
+            token: Some("test-token".to_string()),
+            ..config::CliConfig::default()
+        };
+        cfg.save().await.expect("save config");
+
+        let files =
+            agents::discover_recent_sessions_for_backfill(current_unix_epoch(), 86_400).await;
+        let warp_files = files
+            .into_iter()
+            .filter(|log| log.agent_type == scanner::AgentType::Warp)
+            .collect::<Vec<_>>();
+        assert_eq!(warp_files.len(), 2);
+        let parsed_logs = parse_session_logs_bounded(warp_files).await;
+        assert_eq!(parsed_logs.len(), 2);
+        let mut recoverable = None;
+        let mut missing = None;
+        for parsed in parsed_logs {
+            match parsed.metadata.session_id.as_deref() {
+                Some("warp-block-recovery") => recoverable = Some(parsed),
+                Some("warp-unrecoverable") => missing = Some(parsed),
+                _ => {}
+            }
+        }
+        let recoverable = recoverable.expect("recoverable warp session");
+        let missing = missing.expect("missing warp session");
+        assert!(missing.metadata.cwd.is_none());
+
+        let upload_context = upload::resolve_upload_context(Some(server.base_url.as_str()))
+            .await
+            .expect("resolve upload context");
+        let outcome = upload_session_from_log(
+            &upload_context,
+            &recoverable,
+            repo.path(),
&repo.path().to_string_lossy(), + PublicationMode::Backfill, + ) + .await; + assert!(matches!(outcome, UploadFromLogOutcome::Uploaded)); + + let uploads = server.upload_requests(); + assert_eq!(uploads.len(), 1); + assert!(uploads[0].body.contains("Recover from blocks")); + assert!(!uploads[0].body.contains("No cwd anywhere")); + } + // ----------------------------------------------------------------------- // Uninstall command parsing tests // -----------------------------------------------------------------------