From bc332d4327d20c5cc82c57738cea7e69ea26d5ef Mon Sep 17 00:00:00 2001 From: Zack Corr Date: Thu, 9 Apr 2026 12:07:37 +1000 Subject: [PATCH 1/3] feat(opencode): stitch child sessions into root uploads Collapse explicit OpenCode child sessions into their parent upload so Cadence tracks one logical root session instead of separate top-level uploads. Preserve child lineage and merged recency metadata in the synthetic JSONL so backend analysis can still recover nested activity later. --- src/agents/opencode.rs | 834 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 757 insertions(+), 77 deletions(-) diff --git a/src/agents/opencode.rs b/src/agents/opencode.rs index 92971bca..de52c27c 100644 --- a/src/agents/opencode.rs +++ b/src/agents/opencode.rs @@ -76,6 +76,7 @@ fn data_roots_for_home(home: &Path) -> Vec { struct SessionRecord { directory: Option, title: Option, + parent_session_id: Option, source_file: String, created_at: Option, updated_at: Option, @@ -150,6 +151,12 @@ async fn discover_recent_in(roots: &[PathBuf], now: i64, since_secs: i64) -> Vec .get("title") .and_then(Value::as_str) .map(str::to_string), + parent_session_id: value + .get("parentID") + .or_else(|| value.get("parentSessionID")) + .or_else(|| value.get("parent_id")) + .and_then(Value::as_str) + .map(str::to_string), source_file: path.to_string_lossy().to_string(), created_at, updated_at, @@ -280,39 +287,16 @@ fn render_session_logs( mut messages_by_session: BTreeMap>, mut parts_by_session: BTreeMap>, ) -> Vec { - let mut session_ids: HashSet = HashSet::new(); - session_ids.extend(sessions.keys().cloned()); - session_ids.extend(messages_by_session.keys().cloned()); - session_ids.extend(parts_by_session.keys().cloned()); - + let clusters = build_session_clusters(&sessions, &messages_by_session, &parts_by_session); let mut output = Vec::new(); - let mut sorted_ids: Vec<_> = session_ids.into_iter().collect(); - sorted_ids.sort(); - - for session_id in sorted_ids { - let session_record = sessions.get(&session_id); - let mut messages = messages_by_session.remove(&session_id).unwrap_or_default(); - let mut parts = parts_by_session.remove(&session_id).unwrap_or_default(); - - messages.sort_by(|a, b| { - a.created_at - .unwrap_or_default() - .cmp(&b.created_at.unwrap_or_default()) - .then(a.id.cmp(&b.id)) - }); - parts.sort_by(|a, b| { - a.created_at - .unwrap_or_default() - .cmp(&b.created_at.unwrap_or_default()) - .then(a.id.cmp(&b.id)) - }); - + for cluster in clusters { + let session_record = sessions.get(&cluster.root_session_id); let mut lines = Vec::new(); let mut max_updated = 0i64; let session_created = session_record.and_then(|s| s.created_at); let session_updated = session_record.and_then(|s| s.updated_at); - let cwd = session_record.and_then(|s| s.directory.clone()); + let cwd = cluster.cwd.clone(); let title = session_record.and_then(|s| s.title.clone()); let source_file = session_record.map(|s| s.source_file.clone()); @@ -326,12 +310,17 @@ fn render_session_logs( let session_meta = json!({ "type": "session_meta", "source": "opencode", - "sessionID": session_id, - "session_id": session_id, + "sessionID": cluster.root_session_id, + "session_id": cluster.root_session_id, + "rootSessionID": cluster.root_session_id, + "sessionRole": "root", "directory": cwd, - "cwd": session_record.and_then(|s| s.directory.clone()), + "cwd": cwd, "title": title, "source_file": source_file, + "clusterSessionCount": cluster.session_ids.len(), + "childSessionIDs": cluster.child_session_ids, + "stitchedFromSessionIDs": cluster.session_ids, "time": { "created": session_created, "updated": session_updated, @@ -340,43 +329,154 @@ fn render_session_logs( }); lines.push(session_meta.to_string()); - for message in messages { - if let Some(ts) = message.created_at.or(message.file_mtime) { + let mut ordered_records = Vec::new(); + + for session_id in &cluster.session_ids { + if let Some(record) = sessions.get(session_id) + && let Some(ts) = record + .updated_at + .or(record.created_at) + .or(record.file_mtime) + { max_updated = max_updated.max(ts); } - let event = json!({ - "type": "message", - "source": "opencode", - "sessionID": message.session_id, - "messageID": message.id, - "role": message.role, - "time": { - "created": message.created_at, - }, - "source_file": message.source_file, - "payload": message.raw, - }); - lines.push(event.to_string()); + + if session_id == &cluster.root_session_id { + continue; + } + + if let Some(record) = sessions.get(session_id) { + ordered_records.push(RenderedClusterRecord { + timestamp: record + .created_at + .or(record.updated_at) + .or(record.file_mtime), + kind_rank: 0, + session_id: session_id.clone(), + record_id: session_id.clone(), + line: json!({ + "type": "session_member", + "source": "opencode", + "rootSessionID": cluster.root_session_id, + "originSessionID": session_id, + "sessionRole": "child", + "parentSessionID": session_parent_link(record), + "time": { + "created": record.created_at, + "updated": record.updated_at, + }, + "source_file": record.source_file, + "payload": record.raw, + }) + .to_string(), + }); + } } - for part in parts { - if let Some(ts) = part.created_at.or(part.file_mtime) { - max_updated = max_updated.max(ts); + for session_id in &cluster.session_ids { + let mut messages = messages_by_session.remove(session_id).unwrap_or_default(); + messages.sort_by(|a, b| { + a.created_at + .unwrap_or_default() + .cmp(&b.created_at.unwrap_or_default()) + .then(a.id.cmp(&b.id)) + }); + + let parent_session_id = sessions.get(session_id).and_then(session_parent_link); + let session_role = if session_id == &cluster.root_session_id { + "root" + } else { + "child" + }; + + for message in messages { + if let Some(ts) = message.created_at.or(message.file_mtime) { + max_updated = max_updated.max(ts); + } + ordered_records.push(RenderedClusterRecord { + timestamp: message.created_at.or(message.file_mtime), + kind_rank: 1, + session_id: message.session_id.clone(), + record_id: message.id.clone(), + line: json!({ + "type": "message", + "source": "opencode", + "rootSessionID": cluster.root_session_id, + "originSessionID": message.session_id, + "sessionRole": session_role, + "parentSessionID": parent_session_id, + "sessionID": message.session_id, + "messageID": message.id, + "role": message.role, + "time": { + "created": message.created_at, + }, + "source_file": message.source_file, + "payload": message.raw, + }) + .to_string(), + }); } - let event = json!({ - "type": "part", - "source": "opencode", - "sessionID": part.session_id, - "messageID": part.message_id, - "partID": part.id, - "partType": part.part_type, - "time": { - "created": part.created_at, - }, - "source_file": part.source_file, - "payload": part.raw, + } + + for session_id in &cluster.session_ids { + let mut parts = parts_by_session.remove(session_id).unwrap_or_default(); + parts.sort_by(|a, b| { + a.created_at + .unwrap_or_default() + .cmp(&b.created_at.unwrap_or_default()) + .then(a.id.cmp(&b.id)) }); - lines.push(event.to_string()); + + let parent_session_id = sessions.get(session_id).and_then(session_parent_link); + let session_role = if session_id == &cluster.root_session_id { + "root" + } else { + "child" + }; + + for part in parts { + if let Some(ts) = part.created_at.or(part.file_mtime) { + max_updated = max_updated.max(ts); + } + ordered_records.push(RenderedClusterRecord { + timestamp: part.created_at.or(part.file_mtime), + kind_rank: 2, + session_id: part.session_id.clone().unwrap_or_default(), + record_id: part.id.clone(), + line: json!({ + "type": "part", + "source": "opencode", + "rootSessionID": cluster.root_session_id, + "originSessionID": part.session_id, + "sessionRole": session_role, + "parentSessionID": parent_session_id, + "sessionID": part.session_id, + "messageID": part.message_id, + "partID": part.id, + "partType": part.part_type, + "time": { + "created": part.created_at, + }, + "source_file": part.source_file, + "payload": part.raw, + }) + .to_string(), + }); + } + } + + ordered_records.sort_by(|a, b| { + a.timestamp + .unwrap_or_default() + .cmp(&b.timestamp.unwrap_or_default()) + .then(a.kind_rank.cmp(&b.kind_rank)) + .then(a.session_id.cmp(&b.session_id)) + .then(a.record_id.cmp(&b.record_id)) + }); + + for record in ordered_records { + lines.push(record.line); } if max_updated == 0 { @@ -389,7 +489,7 @@ fn render_session_logs( output.push(SessionLog { agent_type: AgentType::OpenCode, source: SessionSource::Inline { - label: format!("opencode:{session_id}"), + label: format!("opencode:{}", cluster.root_session_id), content: lines.join("\n"), }, updated_at: Some(max_updated), @@ -399,6 +499,199 @@ fn render_session_logs( output } +#[derive(Debug, Clone)] +struct OpenCodeSessionCluster { + root_session_id: String, + session_ids: Vec, + child_session_ids: Vec, + cwd: Option, +} + +#[derive(Debug, Clone)] +struct RenderedClusterRecord { + timestamp: Option, + kind_rank: u8, + session_id: String, + record_id: String, + line: String, +} + +fn build_session_clusters( + sessions: &HashMap, + messages_by_session: &BTreeMap>, + parts_by_session: &BTreeMap>, +) -> Vec { + let mut session_ids: HashSet = HashSet::new(); + session_ids.extend(sessions.keys().cloned()); + session_ids.extend(messages_by_session.keys().cloned()); + session_ids.extend(parts_by_session.keys().cloned()); + + let mut root_to_sessions: BTreeMap> = BTreeMap::new(); + let mut resolved_roots: HashMap = HashMap::new(); + + for session_id in session_ids { + let root_session_id = resolve_root_session_id(&session_id, sessions, &mut resolved_roots); + root_to_sessions + .entry(root_session_id) + .or_default() + .push(session_id); + } + + let mut clusters = Vec::new(); + for (root_session_id, mut cluster_session_ids) in root_to_sessions { + cluster_session_ids + .sort_by(|a, b| session_sort_tuple(a, sessions).cmp(&session_sort_tuple(b, sessions))); + if let Some(root_idx) = cluster_session_ids + .iter() + .position(|id| id == &root_session_id) + { + cluster_session_ids.swap(0, root_idx); + } + + let cwd = sessions + .get(&root_session_id) + .and_then(|record| record.directory.clone()) + .or_else(|| { + cluster_session_ids.iter().find_map(|session_id| { + sessions + .get(session_id) + .and_then(|record| record.directory.clone()) + }) + }); + + let child_session_ids = cluster_session_ids + .iter() + .filter(|session_id| *session_id != &root_session_id) + .cloned() + .collect(); + + clusters.push(OpenCodeSessionCluster { + root_session_id, + session_ids: cluster_session_ids, + child_session_ids, + cwd, + }); + } + + clusters.sort_by(|a, b| { + cluster_sort_tuple(a, sessions) + .cmp(&cluster_sort_tuple(b, sessions)) + .then(a.root_session_id.cmp(&b.root_session_id)) + }); + clusters +} + +fn resolve_root_session_id( + session_id: &str, + sessions: &HashMap, + resolved_roots: &mut HashMap, +) -> String { + if let Some(root) = resolved_roots.get(session_id) { + return root.clone(); + } + + let mut lineage = Vec::new(); + let mut seen = HashSet::new(); + let mut current = session_id.to_string(); + + loop { + if let Some(root) = resolved_roots.get(¤t) { + let root = root.clone(); + for id in lineage { + resolved_roots.insert(id, root.clone()); + } + return root; + } + + if !seen.insert(current.clone()) { + let mut cycle: Vec<_> = seen.into_iter().collect(); + cycle.sort_by(|a, b| { + session_sort_tuple(a, sessions).cmp(&session_sort_tuple(b, sessions)) + }); + let root = cycle + .into_iter() + .next() + .unwrap_or_else(|| session_id.to_string()); + for id in lineage { + resolved_roots.insert(id, root.clone()); + } + return root; + } + + lineage.push(current.clone()); + let Some(parent_session_id) = sessions + .get(¤t) + .and_then(session_parent_link) + .filter(|parent| parent != ¤t) + else { + for id in lineage { + resolved_roots.insert(id, current.clone()); + } + return current; + }; + + if !sessions.contains_key(&parent_session_id) { + for id in lineage { + resolved_roots.insert(id, current.clone()); + } + return current; + } + + current = parent_session_id; + } +} + +fn session_parent_link(record: &SessionRecord) -> Option { + record.parent_session_id.clone().or_else(|| { + record + .raw + .get("parentID") + .or_else(|| record.raw.get("parentSessionID")) + .or_else(|| record.raw.get("parent_id")) + .and_then(Value::as_str) + .map(str::to_string) + }) +} + +fn session_sort_tuple( + session_id: &str, + sessions: &HashMap, +) -> (i64, i64, String) { + let created = sessions + .get(session_id) + .and_then(|record| record.created_at) + .unwrap_or_default(); + let updated = sessions + .get(session_id) + .and_then(|record| record.updated_at.or(record.file_mtime)) + .unwrap_or_default(); + (created, updated, session_id.to_string()) +} + +fn cluster_sort_tuple( + cluster: &OpenCodeSessionCluster, + sessions: &HashMap, +) -> (i64, i64) { + let created = sessions + .get(&cluster.root_session_id) + .and_then(|record| record.created_at) + .unwrap_or_default(); + let updated = cluster + .session_ids + .iter() + .filter_map(|session_id| { + sessions.get(session_id).and_then(|record| { + record + .updated_at + .or(record.created_at) + .or(record.file_mtime) + }) + }) + .max() + .unwrap_or_default(); + (created, updated) +} + fn merge_session_records( sessions: &mut HashMap, incoming: Vec<(String, SessionRecord)>, @@ -414,9 +707,24 @@ fn merge_session_records( .updated_at .or(existing.file_mtime) .unwrap_or_default(); - if record_recency > existing_recency { - sessions.insert(session_id, record); + + let (mut merged, fallback) = if record_recency > existing_recency { + (record, existing) + } else { + (existing.clone(), &record) + }; + + if merged.parent_session_id.is_none() { + merged.parent_session_id = fallback.parent_session_id.clone(); + } + if merged.directory.is_none() { + merged.directory = fallback.directory.clone(); + } + if merged.title.is_none() { + merged.title = fallback.title.clone(); } + + sessions.insert(session_id, merged); } } } @@ -497,11 +805,12 @@ fn query_recent_db_records(root: &Path, cutoff: i64) -> Option { if recent_session_ids.is_empty() { return Some(DbRecords::default()); } + let cluster_session_ids = db_session_ids_with_ancestors(&conn, &recent_session_ids); Some(DbRecords { - sessions: fetch_db_sessions(&conn, &db_path, &recent_session_ids), - messages: fetch_db_messages(&conn, &db_path, &recent_session_ids), - parts: fetch_db_parts(&conn, &db_path, &recent_session_ids), + sessions: fetch_db_sessions(&conn, &db_path, &cluster_session_ids), + messages: fetch_db_messages(&conn, &db_path, &cluster_session_ids), + parts: fetch_db_parts(&conn, &db_path, &cluster_session_ids), }) } @@ -527,6 +836,44 @@ fn recent_db_session_ids(conn: &Connection, cutoff: i64) -> HashSet { ids } +fn db_session_ids_with_ancestors( + conn: &Connection, + session_ids: &HashSet, +) -> HashSet { + let mut all_ids = session_ids.clone(); + let mut frontier = session_ids.clone(); + + while !frontier.is_empty() { + let placeholders = sql_in_placeholders(frontier.len()); + let Ok(mut stmt) = conn.prepare(&format!( + "SELECT id, parent_id FROM session WHERE id IN ({placeholders})" + )) else { + break; + }; + + let Ok(rows) = stmt.query_map(params_from_iter(frontier.iter()), |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, Option>(1).ok().flatten(), + )) + }) else { + break; + }; + + let mut next_frontier = HashSet::new(); + for (_, parent_id) in rows.flatten() { + if let Some(parent_id) = parent_id + && all_ids.insert(parent_id.clone()) + { + next_frontier.insert(parent_id); + } + } + frontier = next_frontier; + } + + all_ids +} + fn fetch_db_sessions( conn: &Connection, db_path: &Path, @@ -538,11 +885,45 @@ fn fetch_db_sessions( let placeholders = sql_in_placeholders(session_ids.len()); - if let Ok(mut stmt) = - conn.prepare(&format!( - "SELECT id, directory, title, time_created, time_updated, data FROM session WHERE id IN ({placeholders})" - )) - { + if let Ok(mut stmt) = conn.prepare(&format!( + "SELECT id, parent_id, directory, title, time_created, time_updated, data FROM session WHERE id IN ({placeholders})" + )) { + let Ok(rows) = stmt.query_map(params_from_iter(session_ids.iter()), |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, Option>(1).ok().flatten(), + row.get::<_, String>(2).ok(), + row.get::<_, String>(3).ok(), + row.get::<_, i64>(4).ok(), + row.get::<_, i64>(5).ok(), + row.get::<_, String>(6).ok(), + )) + }) else { + return Vec::new(); + }; + + return rows + .flatten() + .map( + |(session_id, parent_session_id, directory, title, time_created, time_updated, raw_json)| { + build_db_session_record( + db_path, + session_id, + parent_session_id, + directory, + title, + time_created, + time_updated, + raw_json, + ) + }, + ) + .collect(); + } + + if let Ok(mut stmt) = conn.prepare(&format!( + "SELECT id, directory, title, time_created, time_updated, data FROM session WHERE id IN ({placeholders})" + )) { let Ok(rows) = stmt.query_map(params_from_iter(session_ids.iter()), |row| { Ok(( row.get::<_, String>(0)?, @@ -563,6 +944,7 @@ fn fetch_db_sessions( build_db_session_record( db_path, session_id, + None, directory, title, time_created, @@ -574,10 +956,44 @@ fn fetch_db_sessions( .collect(); } + if let Ok(mut stmt) = conn.prepare(&format!( + "SELECT id, parent_id, directory, title, time_created, time_updated FROM session WHERE id IN ({placeholders})" + )) { + let Ok(rows) = stmt.query_map(params_from_iter(session_ids.iter()), |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, Option>(1).ok().flatten(), + row.get::<_, String>(2).ok(), + row.get::<_, String>(3).ok(), + row.get::<_, i64>(4).ok(), + row.get::<_, i64>(5).ok(), + )) + }) else { + return Vec::new(); + }; + + return rows + .flatten() + .map( + |(session_id, parent_session_id, directory, title, time_created, time_updated)| { + build_db_session_record( + db_path, + session_id, + parent_session_id, + directory, + title, + time_created, + time_updated, + None, + ) + }, + ) + .collect(); + } + let Ok(mut stmt) = conn.prepare(&format!( "SELECT id, directory, title, time_created, time_updated FROM session WHERE id IN ({placeholders})" - )) - else { + )) else { return Vec::new(); }; @@ -599,6 +1015,7 @@ fn fetch_db_sessions( build_db_session_record( db_path, session_id, + None, directory, title, time_created, @@ -613,6 +1030,7 @@ fn fetch_db_sessions( fn build_db_session_record( db_path: &Path, session_id: String, + parent_session_id: Option, directory: Option, title: Option, time_created: Option, @@ -626,9 +1044,10 @@ fn build_db_session_record( let raw = merge_json_object( raw, json!({ - "id": session_id, - "directory": directory, - "title": title, + "id": session_id.clone(), + "parentID": parent_session_id.clone(), + "directory": directory.clone(), + "title": title.clone(), "time": { "created": normalize_db_epoch(time_created), "updated": normalize_db_epoch(time_updated), @@ -638,6 +1057,7 @@ fn build_db_session_record( let record = SessionRecord { directory, title, + parent_session_id, source_file: format!("{}#session/{}", db_path.display(), session_id), created_at: normalize_db_epoch(time_created), updated_at: normalize_db_epoch(time_updated), @@ -891,6 +1311,56 @@ mod tests { tokio::fs::write(path, value).await.unwrap(); } + fn session_record( + directory: Option<&str>, + parent_session_id: Option<&str>, + created_at: Option, + updated_at: Option, + ) -> SessionRecord { + SessionRecord { + directory: directory.map(str::to_string), + title: Some("session".to_string()), + parent_session_id: parent_session_id.map(str::to_string), + source_file: "session.json".to_string(), + created_at, + updated_at, + file_mtime: None, + raw: json!({ + "directory": directory, + "parentID": parent_session_id, + "time": { + "created": created_at, + "updated": updated_at, + } + }), + } + } + + fn message_record(session_id: &str, id: &str, created_at: i64) -> MessageRecord { + MessageRecord { + id: id.to_string(), + session_id: session_id.to_string(), + role: Some("assistant".to_string()), + source_file: format!("{id}.json"), + created_at: Some(created_at), + file_mtime: None, + raw: json!({ + "id": id, + "sessionID": session_id, + "time": { + "created": created_at, + } + }), + } + } + + fn inline_content(log: &SessionLog) -> &str { + match &log.source { + SessionSource::Inline { content, .. } => content, + SessionSource::File(_) => panic!("expected inline session"), + } + } + #[tokio::test] async fn test_opencode_normalizes_session_messages_and_parts() { let tmp = tempfile::TempDir::new().unwrap(); @@ -1316,6 +1786,216 @@ mod tests { } } + #[test] + fn test_opencode_renders_one_log_for_root_and_child_cluster() { + let mut sessions = HashMap::new(); + sessions.insert( + "ses_root".to_string(), + session_record(Some("/repo"), None, Some(100), Some(150)), + ); + sessions.insert( + "ses_child".to_string(), + session_record(Some("/repo"), Some("ses_root"), Some(120), Some(180)), + ); + + let mut messages_by_session = BTreeMap::new(); + messages_by_session.insert( + "ses_child".to_string(), + vec![message_record("ses_child", "msg_child", 170)], + ); + + let logs = render_session_logs(0, sessions, messages_by_session, BTreeMap::new()); + assert_eq!(logs.len(), 1); + match &logs[0].source { + SessionSource::Inline { label, content } => { + assert_eq!(label, "opencode:ses_root"); + assert!(content.contains("\"clusterSessionCount\":2")); + assert!(content.contains("\"childSessionIDs\":[\"ses_child\"]")); + assert!(content.contains("\"type\":\"session_member\"")); + assert!(content.contains("\"originSessionID\":\"ses_child\"")); + assert!(content.contains("\"parentSessionID\":\"ses_root\"")); + let session_meta_idx = content.find("\"type\":\"session_meta\"").unwrap(); + let session_member_idx = content.find("\"type\":\"session_member\"").unwrap(); + let child_message_idx = content.find("\"messageID\":\"msg_child\"").unwrap(); + assert!(session_meta_idx < session_member_idx); + assert!(session_member_idx < child_message_idx); + } + SessionSource::File(_) => panic!("expected inline session"), + } + } + + #[test] + fn test_opencode_renders_multiple_children_under_one_root() { + let mut sessions = HashMap::new(); + sessions.insert( + "ses_root".to_string(), + session_record(Some("/repo"), None, Some(100), Some(150)), + ); + sessions.insert( + "ses_child_a".to_string(), + session_record(Some("/repo"), Some("ses_root"), Some(120), Some(170)), + ); + sessions.insert( + "ses_child_b".to_string(), + session_record(Some("/repo"), Some("ses_root"), Some(125), Some(190)), + ); + + let logs = render_session_logs(0, sessions, BTreeMap::new(), BTreeMap::new()); + assert_eq!(logs.len(), 1); + let content = inline_content(&logs[0]); + assert!(content.contains("\"childSessionIDs\":[\"ses_child_a\",\"ses_child_b\"]")); + } + + #[test] + fn test_opencode_leaves_unrelated_sessions_separate() { + let mut sessions = HashMap::new(); + sessions.insert( + "ses_one".to_string(), + session_record(Some("/repo-one"), None, Some(100), Some(150)), + ); + sessions.insert( + "ses_two".to_string(), + session_record(Some("/repo-two"), None, Some(200), Some(250)), + ); + + let logs = render_session_logs(0, sessions, BTreeMap::new(), BTreeMap::new()); + assert_eq!(logs.len(), 2); + } + + #[test] + fn test_opencode_cluster_recency_tracks_newer_child_activity() { + let mut sessions = HashMap::new(); + sessions.insert( + "ses_root".to_string(), + session_record(Some("/repo"), None, Some(100), Some(110)), + ); + sessions.insert( + "ses_child".to_string(), + session_record(Some("/repo"), Some("ses_root"), Some(120), Some(130)), + ); + + let mut messages_by_session = BTreeMap::new(); + messages_by_session.insert( + "ses_child".to_string(), + vec![message_record("ses_child", "msg_child", 400)], + ); + + let logs = render_session_logs(0, sessions, messages_by_session, BTreeMap::new()); + assert_eq!(logs[0].updated_at, Some(400)); + } + + #[test] + fn test_opencode_root_cwd_falls_back_to_child_directory() { + let mut sessions = HashMap::new(); + sessions.insert( + "ses_root".to_string(), + session_record(None, None, Some(100), Some(150)), + ); + sessions.insert( + "ses_child".to_string(), + session_record( + Some("/repo-from-child"), + Some("ses_root"), + Some(120), + Some(180), + ), + ); + + let logs = render_session_logs(0, sessions, BTreeMap::new(), BTreeMap::new()); + let content = inline_content(&logs[0]); + let metadata = crate::scanner::parse_session_metadata_str(content); + assert_eq!(metadata.session_id, Some("ses_root".to_string())); + assert_eq!(metadata.cwd, Some("/repo-from-child".to_string())); + } + + #[tokio::test] + async fn test_opencode_discovers_child_activity_under_root_upload_identity() { + let tmp = tempfile::TempDir::new().unwrap(); + let root = tmp.path(); + let db_path = root.join("opencode.db"); + let conn = Connection::open(&db_path).unwrap(); + + conn.execute_batch( + " + CREATE TABLE session ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + parent_id TEXT, + directory TEXT NOT NULL, + title TEXT NOT NULL, + version TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL + ); + CREATE TABLE message ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL + ); + CREATE TABLE part ( + id TEXT PRIMARY KEY, + message_id TEXT NOT NULL, + session_id TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_updated INTEGER NOT NULL, + data TEXT NOT NULL + ); + ", + ) + .unwrap(); + + conn.execute( + "INSERT INTO session (id, project_id, parent_id, directory, title, version, time_created, time_updated, data) + VALUES (?1, 'workspace', NULL, '/repo', 'root', '1.0.0', ?2, ?3, ?4)", + rusqlite::params![ + "ses_root", + 1_700_000_000_000_i64, + 1_700_000_001_000_i64, + r#"{"slug":"root"}"#, + ], + ) + .unwrap(); + conn.execute( + "INSERT INTO session (id, project_id, parent_id, directory, title, version, time_created, time_updated, data) + VALUES (?1, 'workspace', ?2, '/repo', 'child', '1.0.0', ?3, ?4, ?5)", + rusqlite::params![ + "ses_child", + "ses_root", + 1_700_000_100_000_i64, + 1_700_000_200_000_i64, + r#"{"slug":"child"}"#, + ], + ) + .unwrap(); + conn.execute( + "INSERT INTO message (id, session_id, time_created, time_updated, data) + VALUES (?1, ?2, ?3, ?4, ?5)", + rusqlite::params![ + "msg_child", + "ses_child", + 1_700_000_300_000_i64, + 1_700_000_300_000_i64, + r#"{"role":"assistant","time":{"created":1700000300000}}"#, + ], + ) + .unwrap(); + + let logs = discover_recent_in(&[root.to_path_buf()], 1_700_000_400, 500).await; + assert_eq!(logs.len(), 1); + match &logs[0].source { + SessionSource::Inline { label, content } => { + assert_eq!(label, "opencode:ses_root"); + assert!(content.contains("\"originSessionID\":\"ses_child\"")); + assert!(content.contains("\"parentSessionID\":\"ses_root\"")); + } + SessionSource::File(_) => panic!("expected inline session"), + } + assert_eq!(logs[0].updated_at, Some(1_700_000_300)); + } + #[test] fn test_data_roots_include_xdg_location() { let home = Path::new("/Users/tester"); From 621f9c70c037bc616b805619381170050224cca5 Mon Sep 17 00:00:00 2001 From: Zack Corr Date: Thu, 9 Apr 2026 12:30:42 +1000 Subject: [PATCH 2/3] fix(opencode): satisfy CI lint rules Refactor the OpenCode session stitching helpers to satisfy clippy without changing behavior. Replace the manual cluster sort closure and bundle DB session row fields into a struct so the CI check passes across platforms. --- src/agents/opencode.rs | 98 +++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 40 deletions(-) diff --git a/src/agents/opencode.rs b/src/agents/opencode.rs index de52c27c..66bee8e7 100644 --- a/src/agents/opencode.rs +++ b/src/agents/opencode.rs @@ -539,8 +539,7 @@ fn build_session_clusters( let mut clusters = Vec::new(); for (root_session_id, mut cluster_session_ids) in root_to_sessions { - cluster_session_ids - .sort_by(|a, b| session_sort_tuple(a, sessions).cmp(&session_sort_tuple(b, sessions))); + cluster_session_ids.sort_by_key(|session_id| session_sort_tuple(session_id, sessions)); if let Some(root_idx) = cluster_session_ids .iter() .position(|id| id == &root_session_id) @@ -790,6 +789,16 @@ struct DbRecords { parts: Vec, } +struct DbSessionRowData { + session_id: String, + parent_session_id: Option, + directory: Option, + title: Option, + time_created: Option, + time_updated: Option, + raw_json: Option, +} + fn query_recent_db_records(root: &Path, cutoff: i64) -> Option { let db_path = root.join("opencode.db"); if !db_path.exists() { @@ -908,13 +917,15 @@ fn fetch_db_sessions( |(session_id, parent_session_id, directory, title, time_created, time_updated, raw_json)| { build_db_session_record( db_path, - session_id, - parent_session_id, - directory, - title, - time_created, - time_updated, - raw_json, + DbSessionRowData { + session_id, + parent_session_id, + directory, + title, + time_created, + time_updated, + raw_json, + }, ) }, ) @@ -943,13 +954,15 @@ fn fetch_db_sessions( |(session_id, directory, title, time_created, time_updated, raw_json)| { build_db_session_record( db_path, - session_id, - None, - directory, - title, - time_created, - time_updated, - raw_json, + DbSessionRowData { + session_id, + parent_session_id: None, + directory, + title, + time_created, + time_updated, + raw_json, + }, ) }, ) @@ -978,13 +991,15 @@ fn fetch_db_sessions( |(session_id, parent_session_id, directory, title, time_created, time_updated)| { build_db_session_record( db_path, - session_id, - parent_session_id, - directory, - title, - time_created, - time_updated, - None, + DbSessionRowData { + session_id, + parent_session_id, + directory, + title, + time_created, + time_updated, + raw_json: None, + }, ) }, ) @@ -1014,29 +1029,32 @@ fn fetch_db_sessions( |(session_id, directory, title, time_created, time_updated)| { build_db_session_record( db_path, - session_id, - None, - directory, - title, - time_created, - time_updated, - None, + DbSessionRowData { + session_id, + parent_session_id: None, + directory, + title, + time_created, + time_updated, + raw_json: None, + }, ) }, ) .collect() } -fn build_db_session_record( - db_path: &Path, - session_id: String, - parent_session_id: Option, - directory: Option, - title: Option, - time_created: Option, - time_updated: Option, - raw_json: Option, -) -> (String, SessionRecord) { +fn build_db_session_record(db_path: &Path, row: DbSessionRowData) -> (String, SessionRecord) { + let DbSessionRowData { + session_id, + parent_session_id, + directory, + title, + time_created, + time_updated, + raw_json, + } = row; + let raw = raw_json .as_deref() .and_then(parse_json) From 36652cb7688ff411bc610232efaf0c7377b304da Mon Sep 17 00:00:00 2001 From: Zack Corr Date: Thu, 9 Apr 2026 12:37:48 +1000 Subject: [PATCH 3/3] fix(opencode): load parent file sessions outside cutoff Preserve root-based OpenCode upload identity when a recent file-backed child references an older parent session JSON. Hydrate missing parent session records from disk before clustering and cover the cutoff regression with a dedicated test. --- src/agents/opencode.rs | 215 +++++++++++++++++++++++++++++++---------- 1 file changed, 163 insertions(+), 52 deletions(-) diff --git a/src/agents/opencode.rs b/src/agents/opencode.rs index 66bee8e7..fc88dda5 100644 --- a/src/agents/opencode.rs +++ b/src/agents/opencode.rs @@ -120,68 +120,24 @@ async fn discover_recent_in(roots: &[PathBuf], now: i64, since_secs: i64) -> Vec let message_dir = root.join("storage").join("message"); let part_dir = root.join("storage").join("part"); + let mut recent_file_session_ids = HashSet::new(); + for candidate in collect_recent_json_files(&session_dir, cutoff).await { let path = candidate.path; let Some(value) = read_json(&path).await else { continue; }; - let Some(session_id) = value - .get("id") - .or_else(|| value.get("sessionID")) - .and_then(Value::as_str) + let Some((session_id, record)) = + parse_session_record(value, path, candidate.mtime_epoch) else { continue; }; - let session_id = session_id.to_string(); - - let created_at = value - .pointer("/time/created") - .and_then(parse_epoch_from_json_value); - let updated_at = value - .pointer("/time/updated") - .and_then(parse_epoch_from_json_value) - .or(created_at); - - let record = SessionRecord { - directory: value - .get("directory") - .and_then(Value::as_str) - .map(str::to_string), - title: value - .get("title") - .and_then(Value::as_str) - .map(str::to_string), - parent_session_id: value - .get("parentID") - .or_else(|| value.get("parentSessionID")) - .or_else(|| value.get("parent_id")) - .and_then(Value::as_str) - .map(str::to_string), - source_file: path.to_string_lossy().to_string(), - created_at, - updated_at, - file_mtime: candidate.mtime_epoch, - raw: value, - }; - - match sessions.get(&session_id) { - None => { - sessions.insert(session_id, record); - } - Some(existing) => { - let record_recency = - record.updated_at.or(record.file_mtime).unwrap_or_default(); - let existing_recency = existing - .updated_at - .or(existing.file_mtime) - .unwrap_or_default(); - if record_recency > existing_recency { - sessions.insert(session_id, record); - } - } - } + recent_file_session_ids.insert(session_id.clone()); + merge_session_records(&mut sessions, vec![(session_id, record)]); } + load_session_file_ancestors(&session_dir, &mut sessions, &recent_file_session_ids).await; + for candidate in collect_recent_json_files(&message_dir, cutoff).await { let path = candidate.path; let Some(value) = read_json(&path).await else { @@ -281,6 +237,93 @@ async fn discover_recent_in(roots: &[PathBuf], now: i64, since_secs: i64) -> Vec render_session_logs(cutoff, sessions, messages_by_session, parts_by_session) } +fn parse_session_record( + value: Value, + path: PathBuf, + file_mtime: Option, +) -> Option<(String, SessionRecord)> { + let session_id = value + .get("id") + .or_else(|| value.get("sessionID")) + .and_then(Value::as_str)? + .to_string(); + + let created_at = value + .pointer("/time/created") + .and_then(parse_epoch_from_json_value); + let updated_at = value + .pointer("/time/updated") + .and_then(parse_epoch_from_json_value) + .or(created_at); + + Some(( + session_id, + SessionRecord { + directory: value + .get("directory") + .and_then(Value::as_str) + .map(str::to_string), + title: value + .get("title") + .and_then(Value::as_str) + .map(str::to_string), + parent_session_id: value + .get("parentID") + .or_else(|| value.get("parentSessionID")) + .or_else(|| value.get("parent_id")) + .and_then(Value::as_str) + .map(str::to_string), + source_file: path.to_string_lossy().to_string(), + created_at, + updated_at, + file_mtime, + raw: value, + }, + )) +} + +async fn load_session_file_ancestors( + session_dir: &Path, + sessions: &mut HashMap, + initial_session_ids: &HashSet, +) { + let mut pending: Vec = initial_session_ids.iter().cloned().collect(); + let mut seen = HashSet::new(); + + while let Some(session_id) = pending.pop() { + if !seen.insert(session_id.clone()) { + continue; + } + + let Some(parent_session_id) = sessions + .get(&session_id) + .and_then(session_parent_link) + .filter(|parent| !sessions.contains_key(parent)) + else { + continue; + }; + + let Some(path) = find_session_json_file(session_dir, &parent_session_id).await else { + continue; + }; + let mtime_epoch = file_mtime_epoch(&path).await; + let Some(value) = read_json(&path).await else { + continue; + }; + let Some((loaded_session_id, record)) = parse_session_record(value, path, mtime_epoch) + else { + continue; + }; + + if loaded_session_id != parent_session_id { + continue; + } + + merge_session_records(sessions, vec![(loaded_session_id.clone(), record)]); + pending.push(loaded_session_id); + } +} + fn render_session_logs( cutoff: i64, sessions: HashMap, @@ -1281,6 +1324,36 @@ async fn collect_recent_json_files(root: &Path, cutoff: i64) -> Vec Option { + let mut stack = vec![root.to_path_buf()]; + let expected_file_name = format!("{session_id}.json"); + + while let Some(dir) = stack.pop() { + let mut entries = tokio::fs::read_dir(&dir).await.ok()?; + + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + let file_type = match entry.file_type().await { + Ok(file_type) => file_type, + Err(_) => continue, + }; + + if file_type.is_dir() { + stack.push(path); + continue; + } + + if file_type.is_file() + && path.file_name().and_then(|name| name.to_str()) == Some(&expected_file_name) + { + return Some(path); + } + } + } + + None +} + async fn read_json(path: &Path) -> Option { let content = tokio::fs::read_to_string(path).await.ok()?; serde_json::from_str::(&content).ok() @@ -1496,6 +1569,44 @@ mod tests { assert_eq!(logs.len(), 1); } + #[tokio::test] + async fn test_opencode_loads_older_file_backed_root_for_recent_child() { + let tmp = tempfile::TempDir::new().unwrap(); + let root = tmp.path(); + + let root_file = root + .join("storage") + .join("session") + .join("global") + .join("ses_root.json"); + write_json(&root_file, r#"{"id":"ses_root","directory":"/repo"}"#).await; + set_file_mtime(&root_file, 100); + + let child_file = root + .join("storage") + .join("session") + .join("global") + .join("ses_child.json"); + write_json( + &child_file, + r#"{"id":"ses_child","parentID":"ses_root","directory":"/repo"}"#, + ) + .await; + set_file_mtime(&child_file, 9_950); + + let logs = discover_recent_in(&[root.to_path_buf()], 10_000, 100).await; + assert_eq!(logs.len(), 1); + + match &logs[0].source { + SessionSource::Inline { label, content } => { + assert_eq!(label, "opencode:ses_root"); + assert!(content.contains("\"originSessionID\":\"ses_child\"")); + assert!(content.contains("\"parentSessionID\":\"ses_root\"")); + } + SessionSource::File(_) => panic!("expected inline session"), + } + } + #[tokio::test] async fn test_opencode_prefers_newer_mtime_when_updated_missing() { let tmp = tempfile::TempDir::new().unwrap();