diff --git a/Cargo.lock b/Cargo.lock index 57211eec..f04fce2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,7 @@ version = "2.4.1" dependencies = [ "anyhow", "async-trait", + "base64", "clap", "console", "dialoguer", diff --git a/Cargo.toml b/Cargo.toml index 26b3bcb8..c0c2ff61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ path = "src/bin/cadence-updater.rs" [dependencies] anyhow = "1" async-trait = "0.1" +base64 = "0.22" clap = { version = "4", features = ["derive"] } console = "0.15" dialoguer = "0.11" diff --git a/src/agents/warp.rs b/src/agents/warp.rs index a645b756..17310eb0 100644 --- a/src/agents/warp.rs +++ b/src/agents/warp.rs @@ -1,20 +1,23 @@ //! Warp AI session discovery (SQLite). //! //! Warp stores agent sessions in a local SQLite database (`warp.sqlite`) rather -//! than JSONL log files. This module extracts `ai_queries` and `agent_tasks` -//! rows and normalizes them into Cadence-compatible conversation events. +//! than JSONL log files. This module extracts normalized conversation events +//! plus supplemental raw rows from the Warp tables we currently understand. use std::collections::{BTreeMap, HashMap}; use std::path::{Path, PathBuf}; use std::time::Duration; use async_trait::async_trait; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use protobuf::CodedInputStream; use protobuf::rt::WireType; use rusqlite::types::ValueRef; use rusqlite::{Connection, OpenFlags}; use serde_json::{Map, Value, json}; +use crate::publication::sha256_hex; use crate::scanner::AgentType; use super::{AgentExplorer, SessionLog, SessionSource, home_dir}; @@ -156,10 +159,15 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { let rows = fetch_ai_query_rows(&conn, cutoff); let tasks_by_conversation = fetch_agent_tasks_by_conversation(&conn, cutoff); let conversation_meta = fetch_agent_conversation_meta(&conn, cutoff); - if rows.is_empty() && tasks_by_conversation.is_empty() && conversation_meta.is_empty() { + let conversation_rows_by_id = fetch_agent_conversation_rows(&conn, cutoff); + if rows.is_empty() + && tasks_by_conversation.is_empty() + && conversation_meta.is_empty() + && conversation_rows_by_id.is_empty() + { return out; } - let blocks_by_conversation = fetch_block_cwds_by_conversation(&conn); + let block_rows_by_conversation = fetch_block_rows_by_conversation(&conn); let mut by_conversation: BTreeMap> = BTreeMap::new(); let mut unknown_idx = 0usize; @@ -181,6 +189,9 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { for conversation_id in conversation_meta.keys() { by_conversation.entry(conversation_id.clone()).or_default(); } + for conversation_id in conversation_rows_by_id.keys() { + by_conversation.entry(conversation_id.clone()).or_default(); + } for (conversation_id, rows) in by_conversation { let mut max_start = 0i64; @@ -190,6 +201,8 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { let mut events = Vec::new(); let mut ord = 0usize; let task_rows = tasks_by_conversation.get(&conversation_id); + let conversation_rows = conversation_rows_by_id.get(&conversation_id); + let block_rows = block_rows_by_conversation.get(&conversation_id); for row in &rows { max_start = max_start.max(row.start_ts); @@ -238,10 +251,7 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { } if conversation_cwd.is_none() { - conversation_cwd = blocks_by_conversation - .get(&conversation_id) - .cloned() - .flatten(); + conversation_cwd = block_rows.and_then(|rows| first_non_empty_block_pwd(rows)); } if let Some(task_rows) = task_rows { @@ -356,7 +366,15 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { } } - if deduped.is_empty() { + let raw_events = build_raw_events( + &conversation_id, + &rows, + task_rows, + conversation_rows, + block_rows, + ); + + if deduped.is_empty() && raw_events.is_empty() { continue; } @@ -365,6 +383,11 @@ fn query_warp_db(path: &Path, now: i64, since_secs: i64) -> Vec { lines.push(line); } } + for value in raw_events { + if let Ok(line) = serde_json::to_string(&value) { + lines.push(line); + } + } out.push(SessionLog { agent_type: AgentType::Warp, @@ -384,6 +407,15 @@ fn base_event( conversation_id: &str, timestamp: i64, source: &str, +) -> Map { + base_event_with_timestamp(event_type, conversation_id, Some(timestamp), source) +} + +fn base_event_with_timestamp( + event_type: &str, + conversation_id: &str, + timestamp: Option, + source: &str, ) -> Map { let mut obj = Map::new(); obj.insert("type".to_string(), Value::String(event_type.to_string())); @@ -395,11 +427,186 @@ fn base_event( "conversation_id".to_string(), Value::String(conversation_id.to_string()), ); - obj.insert("timestamp".to_string(), Value::Number(timestamp.into())); + if let Some(timestamp) = timestamp { + obj.insert("timestamp".to_string(), Value::Number(timestamp.into())); + } obj.insert("source".to_string(), Value::String(source.to_string())); obj } +fn build_raw_events( + conversation_id: &str, + ai_query_rows: &[AiQueryRow], + task_rows: Option<&Vec>, + conversation_rows: Option<&Vec>, + block_rows: Option<&Vec>, +) -> Vec { + let mut out = Vec::new(); + + for row in ai_query_rows { + out.push(build_raw_ai_query_event(conversation_id, row)); + } + + if let Some(task_rows) = task_rows { + for row in task_rows { + out.push(build_raw_agent_task_event(conversation_id, row)); + } + } + + if let Some(conversation_rows) = conversation_rows { + for row in conversation_rows { + out.push(build_raw_agent_conversation_event(conversation_id, row)); + } + } + + if let Some(block_rows) = block_rows { + for row in block_rows { + out.push(build_raw_block_event(conversation_id, row)); + } + } + + out +} + +fn build_raw_ai_query_event(conversation_id: &str, row: &AiQueryRow) -> Value { + let mut obj = base_event( + "warp_raw_ai_query", + conversation_id, + row.start_ts, + "warp_raw", + ); + obj.insert( + "warp_source_table".to_string(), + Value::String("ai_queries".to_string()), + ); + if let Some(exchange_id) = row.exchange_id.as_deref() { + obj.insert( + "exchange_id".to_string(), + Value::String(exchange_id.to_string()), + ); + } + + let mut raw = Map::new(); + raw.insert("start_ts".to_string(), Value::Number(row.start_ts.into())); + if let Some(input) = &row.input { + raw.insert("input".to_string(), Value::String(input.clone())); + } + if let Some(working_directory) = &row.working_directory { + raw.insert( + "working_directory".to_string(), + Value::String(working_directory.clone()), + ); + } + if let Some(output_status) = &row.output_status { + raw.insert( + "output_status".to_string(), + Value::String(output_status.clone()), + ); + } + if let Some(model_id) = &row.model_id { + raw.insert("model_id".to_string(), Value::String(model_id.clone())); + } + if let Some(model_id) = &row.planning_model_id { + raw.insert( + "planning_model_id".to_string(), + Value::String(model_id.clone()), + ); + } + if let Some(model_id) = &row.coding_model_id { + raw.insert( + "coding_model_id".to_string(), + Value::String(model_id.clone()), + ); + } + obj.insert("raw".to_string(), Value::Object(raw)); + Value::Object(obj) +} + +fn build_raw_agent_task_event(conversation_id: &str, row: &AgentTaskRow) -> Value { + let mut obj = base_event( + "warp_raw_agent_task", + conversation_id, + row.last_modified_ts, + "warp_raw", + ); + obj.insert( + "warp_source_table".to_string(), + Value::String("agent_tasks".to_string()), + ); + obj.insert("task_id".to_string(), Value::String(row.task_id.clone())); + obj.insert( + "task_blob_sha256".to_string(), + Value::String(sha256_hex(&row.task_blob)), + ); + + let mut raw = Map::new(); + raw.insert( + "last_modified_at".to_string(), + Value::Number(row.last_modified_ts.into()), + ); + raw.insert( + "task_blob_base64".to_string(), + Value::String(BASE64_STANDARD.encode(&row.task_blob)), + ); + obj.insert("raw".to_string(), Value::Object(raw)); + Value::Object(obj) +} + +fn build_raw_agent_conversation_event(conversation_id: &str, row: &WarpConversationRow) -> Value { + let mut obj = base_event_with_timestamp( + "warp_raw_agent_conversation", + conversation_id, + row.last_modified_ts, + "warp_raw", + ); + obj.insert( + "warp_source_table".to_string(), + Value::String("agent_conversations".to_string()), + ); + + let mut raw = Map::new(); + raw.insert( + "conversation_data".to_string(), + Value::String(row.conversation_data.clone()), + ); + if let Some(last_modified_ts) = row.last_modified_ts { + raw.insert( + "last_modified_at".to_string(), + Value::Number(last_modified_ts.into()), + ); + } + obj.insert("raw".to_string(), Value::Object(raw)); + Value::Object(obj) +} + +fn build_raw_block_event(conversation_id: &str, row: &WarpBlockRow) -> Value { + let mut obj = base_event_with_timestamp("warp_raw_block", conversation_id, None, "warp_raw"); + obj.insert( + "warp_source_table".to_string(), + Value::String("blocks".to_string()), + ); + + let mut raw = Map::new(); + raw.insert( + "ai_metadata".to_string(), + Value::String(row.ai_metadata.clone()), + ); + if let Some(pwd) = &row.pwd { + raw.insert("pwd".to_string(), Value::String(pwd.clone())); + } + obj.insert("raw".to_string(), Value::Object(raw)); + Value::Object(obj) +} + +fn first_non_empty_block_pwd(rows: &[WarpBlockRow]) -> Option { + rows.iter().find_map(|row| { + row.pwd + .as_deref() + .and_then(non_empty_str) + .map(ToOwned::to_owned) + }) +} + fn normalize_task_events( conversation_id: &str, task: &AgentTaskRow, @@ -993,7 +1200,70 @@ fn fetch_agent_tasks_by_conversation( out } -fn fetch_block_cwds_by_conversation(conn: &Connection) -> HashMap> { +fn fetch_agent_conversation_rows( + conn: &Connection, + cutoff: i64, +) -> BTreeMap> { + let mut out = BTreeMap::new(); + if !table_exists(conn, "agent_conversations") { + return out; + } + + let ts_expr = normalized_start_ts_sql("last_modified_at"); + let query = format!( + "SELECT conversation_id, conversation_data, {ts_expr} AS ts_epoch + FROM agent_conversations + WHERE conversation_id IS NOT NULL AND {ts_expr} >= ? + ORDER BY conversation_id, ts_epoch" + ); + if let Ok(mut stmt) = conn.prepare(&query) + && let Ok(rows) = stmt.query_map([cutoff], |row| { + Ok(( + row.get::<_, String>(0)?, + row_optional_text(row, 1), + row.get::<_, Option>(2)?, + )) + }) + { + for row in rows.flatten() { + let Some(conversation_data) = row.1 else { + continue; + }; + out.entry(row.0).or_default().push(WarpConversationRow { + conversation_data, + last_modified_ts: row.2, + }); + } + return out; + } + + let Ok(mut stmt) = conn.prepare( + "SELECT conversation_id, conversation_data + FROM agent_conversations + WHERE conversation_id IS NOT NULL + ORDER BY conversation_id", + ) else { + return out; + }; + let Ok(rows) = stmt.query_map([], |row| { + Ok((row.get::<_, String>(0)?, row_optional_text(row, 1))) + }) else { + return out; + }; + + for row in rows.flatten() { + let Some(conversation_data) = row.1 else { + continue; + }; + out.entry(row.0).or_default().push(WarpConversationRow { + conversation_data, + last_modified_ts: None, + }); + } + out +} + +fn fetch_block_rows_by_conversation(conn: &Connection) -> HashMap> { let mut out = HashMap::new(); if !table_exists(conn, "blocks") { return out; @@ -1009,28 +1279,39 @@ fn fetch_block_cwds_by_conversation(conn: &Connection) -> HashMap(&ai_metadata) else { + let Some(conversation_id) = parse_block_conversation_id(&ai_metadata) else { continue; }; - let Some(conversation_id) = value - .get("conversation_id") - .or_else(|| value.pointer("/conversation/id")) - .and_then(Value::as_str) - else { - continue; - }; - out.entry(conversation_id.to_string()).or_insert(Some(cwd)); + out.entry(conversation_id).or_default().push(WarpBlockRow { + ai_metadata, + pwd: row.1, + }); } out } +#[cfg(test)] +fn fetch_block_cwds_by_conversation(conn: &Connection) -> HashMap> { + fetch_block_rows_by_conversation(conn) + .into_iter() + .map(|(conversation_id, rows)| (conversation_id, first_non_empty_block_pwd(&rows))) + .collect() +} + +fn parse_block_conversation_id(ai_metadata: &str) -> Option { + let value = serde_json::from_str::(ai_metadata).ok()?; + value + .get("conversation_id") + .or_else(|| value.pointer("/conversation/id")) + .and_then(Value::as_str) + .and_then(non_empty_str) + .map(ToOwned::to_owned) +} + fn row_optional_text(row: &rusqlite::Row<'_>, idx: usize) -> Option { match row.get_ref(idx).ok()? { ValueRef::Null => None, @@ -1134,6 +1415,18 @@ struct AgentTaskRow { task_blob: Vec, } +#[derive(Debug, Clone)] +struct WarpConversationRow { + conversation_data: String, + last_modified_ts: Option, +} + +#[derive(Debug, Clone)] +struct WarpBlockRow { + ai_metadata: String, + pwd: Option, +} + #[derive(Debug, Clone, Default)] struct WarpConversationMeta { cwd: Option, @@ -1627,6 +1920,13 @@ mod tests { conn } + fn parse_jsonl(content: &str) -> Vec { + content + .lines() + .map(|line| serde_json::from_str::(line).expect("valid jsonl line")) + .collect() + } + fn pb_varint(mut value: u64) -> Vec { let mut out = Vec::new(); loop { @@ -2185,6 +2485,141 @@ mod tests { assert!(!content.contains("\"input\":[]")); } + #[tokio::test] + #[serial] + async fn warp_appends_raw_evidence_lines_without_top_level_content_or_cwd_fields() { + let tmp = TempDir::new().expect("tmp"); + let db_path = tmp.path().join("warp.sqlite"); + let conn = create_db(&db_path); + conn.execute( + "INSERT INTO ai_queries (exchange_id, conversation_id, start_ts, input, working_directory, output_status) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + ( + "ex-raw", + "conv-raw", + 1_772_582_520i64, + r#"{"prompt":"hi from raw"}"#, + "/tmp/repo-normalized", + "Completed", + ), + ) + .expect("insert query"); + conn.execute( + "INSERT INTO agent_tasks (conversation_id, task_id, task, last_modified_at) + VALUES (?1, ?2, ?3, ?4)", + ( + "conv-raw", + "task-raw", + build_task_with_user_assistant_and_tool(), + 1_772_582_690i64, + ), + ) + .expect("insert task"); + conn.execute( + "INSERT INTO agent_conversations (conversation_id, conversation_data) VALUES (?1, ?2)", + ( + "conv-raw", + r#"{"working_directory":"/tmp/repo-conversation-raw","summary":"conversation raw"}"#, + ), + ) + .expect("insert conversation"); + conn.execute( + "INSERT INTO blocks (ai_metadata, pwd) VALUES (?1, ?2)", + ( + r#"{"conversation_id":"conv-raw","requested_command_action_id":"tool-1"}"#, + "/tmp/repo-block-raw", + ), + ) + .expect("insert block"); + + unsafe { + std::env::set_var("WARP_DB_PATH", &db_path); + } + let logs = WarpExplorer.discover_recent(1_772_583_000, 10_000).await; + unsafe { + std::env::remove_var("WARP_DB_PATH"); + } + + let log = logs.first().expect("log"); + let content = match &log.source { + SessionSource::Inline { content, .. } => content, + _ => panic!("expected inline"), + }; + let lines = parse_jsonl(content); + let raw_lines: Vec<&Value> = lines + .iter() + .filter(|value| { + value + .get("type") + .and_then(Value::as_str) + .unwrap_or_default() + .starts_with("warp_raw_") + }) + .collect(); + + assert!( + raw_lines + .iter() + .any(|value| value.get("type").and_then(Value::as_str) == Some("warp_raw_ai_query")) + ); + assert!( + raw_lines + .iter() + .any(|value| value.get("type").and_then(Value::as_str) + == Some("warp_raw_agent_task")) + ); + assert!(raw_lines.iter().any(|value| { + value.get("type").and_then(Value::as_str) == Some("warp_raw_agent_conversation") + })); + assert!( + raw_lines + .iter() + .any(|value| value.get("type").and_then(Value::as_str) == Some("warp_raw_block")) + ); + + for value in raw_lines { + assert!(value.get("content").is_none()); + assert!(value.get("cwd").is_none()); + assert!(value.get("workdir").is_none()); + assert!(value.get("working_directory").is_none()); + assert!(value.get("directory").is_none()); + assert!(value.get("workspacePath").is_none()); + assert!(value.get("workspaceDirectory").is_none()); + assert!(value.get("raw").is_some()); + } + } + + #[test] + fn raw_agent_task_event_preserves_base64_blob_and_sha() { + let row = AgentTaskRow { + conversation_id: "conv-1".to_string(), + task_id: "task-1".to_string(), + last_modified_ts: 1_700_000_000, + task_blob: vec![0x00, 0x01, 0x02, 0xf0, 0xff], + }; + + let event = build_raw_agent_task_event("conv-1", &row); + let expected_sha = crate::publication::sha256_hex(&row.task_blob); + + assert_eq!( + event.get("type").and_then(Value::as_str), + Some("warp_raw_agent_task") + ); + assert_eq!( + event.get("task_blob_sha256").and_then(Value::as_str), + Some(expected_sha.as_str()) + ); + assert!(event.get("content").is_none()); + assert!(event.get("cwd").is_none()); + + let encoded = event + .pointer("/raw/task_blob_base64") + .and_then(Value::as_str) + .expect("task blob base64"); + let decoded = BASE64_STANDARD.decode(encoded).expect("decode base64"); + assert_eq!(decoded, row.task_blob); + } + #[test] fn protobuf_decoder_extracts_field_paths_and_nested_strings() { let blob = build_task_with_user_assistant_and_tool();