From a791e7c7d437e97a750c3afeb02d78418d1f3dd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9A=A9=EC=84=B1?= <66245186+kys0213@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:36:40 +0900 Subject: [PATCH] feat(daemon): record evaluate subprocess token_usage in database - Add runtime_name and model fields to belt agent --json output - Add parse_ipc_token_usage() method on EvaluateResult to extract TokenUsage, runtime_name, and model from subprocess IPC JSON - Record parsed token usage to DB after run_evaluate subprocess path - Add IpcTokenUsage struct and try_record_ipc_token_usage() helper Closes #677 Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/belt-cli/src/agent.rs | 2 + crates/belt-daemon/src/daemon.rs | 128 ++++++++++++++++- crates/belt-daemon/src/evaluator.rs | 215 ++++++++++++++++++++++++++++ 3 files changed, 344 insertions(+), 1 deletion(-) diff --git a/crates/belt-cli/src/agent.rs b/crates/belt-cli/src/agent.rs index 9ecf5c1..e5142bd 100644 --- a/crates/belt-cli/src/agent.rs +++ b/crates/belt-cli/src/agent.rs @@ -535,6 +535,8 @@ pub async fn run_agent( "cache_read_tokens": u.cache_read_tokens, "cache_write_tokens": u.cache_write_tokens, })), + "runtime_name": action_result.runtime_name, + "model": action_result.model, }); println!("{}", serde_json::to_string_pretty(&output)?); } else { diff --git a/crates/belt-daemon/src/daemon.rs b/crates/belt-daemon/src/daemon.rs index 35ad830..0cd2a90 100644 --- a/crates/belt-daemon/src/daemon.rs +++ b/crates/belt-daemon/src/daemon.rs @@ -1596,7 +1596,20 @@ impl Daemon { Err(e) => Err(e), } } else { - self.evaluator.run_evaluate(&self.belt_home).await + let result = self.evaluator.run_evaluate(&self.belt_home).await; + + // Parse and record token usage from subprocess IPC JSON output. + if let Ok(ref eval_result) = result + && let Some(ref ipc_usage) = eval_result.parse_ipc_token_usage() + { + for work_id in &completed { + if let Some(item) = self.queue.iter().find(|i| i.work_id == *work_id) { + self.try_record_ipc_token_usage(item, ipc_usage); + } + } + } + + result } }; @@ -2226,6 +2239,41 @@ impl Daemon { } } + /// Record token usage parsed from evaluate subprocess IPC JSON output. + /// + /// When the evaluate subprocess runs via `belt agent --json`, the stdout + /// JSON may contain `token_usage`, `runtime_name`, and `model` fields. + /// This method extracts those and records them to the DB. + fn try_record_ipc_token_usage( + &self, + item: &QueueItem, + ipc_usage: &crate::evaluator::IpcTokenUsage, + ) { + if let Some(ref db) = self.db { + let model = ipc_usage.model.as_deref().unwrap_or("unknown"); + if let Err(e) = db.record_token_usage( + &item.work_id, + &self.config.name, + &ipc_usage.runtime_name, + model, + &ipc_usage.token_usage, + ipc_usage.duration_ms, + ) { + tracing::warn!( + "failed to record evaluate IPC token usage for {}: {e}", + item.work_id + ); + } else { + tracing::debug!( + "recorded evaluate IPC token usage for {}: input={}, output={}", + item.work_id, + ipc_usage.token_usage.input_tokens, + ipc_usage.token_usage.output_tokens + ); + } + } + } + fn record_history(&mut self, item: &QueueItem, status: &str, error: Option<&str>) { let attempt = self .history @@ -4463,6 +4511,84 @@ sources: assert_eq!(rows[0].model, "unknown"); } + // --------------------------------------------------------------- + // IPC token usage recording tests + // --------------------------------------------------------------- + + #[test] + fn try_record_ipc_token_usage_saves_to_db() { + use belt_core::runtime::TokenUsage; + use belt_infra::db::Database; + + let tmp = TempDir::new().unwrap(); + let source = MockDataSource::new("github"); + let db = Database::open_in_memory().unwrap(); + let daemon = setup_daemon(&tmp, source, vec![]).with_db(db); + + let item = test_item("github:org/repo#10", "analyze"); + let ipc_usage = crate::evaluator::IpcTokenUsage { + token_usage: TokenUsage { + input_tokens: 800, + output_tokens: 400, + cache_read_tokens: Some(30), + cache_write_tokens: None, + }, + runtime_name: "claude".to_string(), + model: Some("opus-4".to_string()), + duration_ms: Some(2500), + }; + + daemon.try_record_ipc_token_usage(&item, &ipc_usage); + + let rows = daemon + .db + .as_ref() + .unwrap() + .get_token_usage_by_work_id("github:org/repo#10:analyze") + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].input_tokens, 800); + assert_eq!(rows[0].output_tokens, 400); + assert_eq!(rows[0].model, "opus-4"); + assert_eq!(rows[0].runtime, "claude"); + } + + #[test] + fn try_record_ipc_token_usage_uses_unknown_model_when_absent() { + use belt_core::runtime::TokenUsage; + use belt_infra::db::Database; + + let tmp = TempDir::new().unwrap(); + let source = MockDataSource::new("github"); + let db = Database::open_in_memory().unwrap(); + let daemon = setup_daemon(&tmp, source, vec![]).with_db(db); + + let item = test_item("github:org/repo#11", "analyze"); + let ipc_usage = crate::evaluator::IpcTokenUsage { + token_usage: TokenUsage { + input_tokens: 100, + output_tokens: 50, + cache_read_tokens: None, + cache_write_tokens: None, + }, + runtime_name: "gemini".to_string(), + model: None, + duration_ms: None, + }; + + daemon.try_record_ipc_token_usage(&item, &ipc_usage); + + let rows = daemon + .db + .as_ref() + .unwrap() + .get_token_usage_by_work_id("github:org/repo#11:analyze") + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].model, "unknown"); + assert_eq!(rows[0].runtime, "gemini"); + } + // --------------------------------------------------------------- // advance_ready_to_running: HashMap per-workspace concurrency (additional) // --------------------------------------------------------------- diff --git a/crates/belt-daemon/src/evaluator.rs b/crates/belt-daemon/src/evaluator.rs index 4abc379..14576f6 100644 --- a/crates/belt-daemon/src/evaluator.rs +++ b/crates/belt-daemon/src/evaluator.rs @@ -7,6 +7,7 @@ use anyhow::Result; use belt_core::action::Action; use belt_core::phase::QueuePhase; use belt_core::queue::QueueItem; +use belt_core::runtime::TokenUsage; use crate::executor::{ActionEnv, ActionExecutor, ActionResult}; @@ -301,10 +302,63 @@ pub struct EvaluateResult { pub ipc_result: Option, } +/// Token usage and runtime metadata parsed from subprocess IPC JSON output. +#[derive(Debug, Clone)] +pub struct IpcTokenUsage { + pub token_usage: TokenUsage, + pub runtime_name: String, + pub model: Option, + pub duration_ms: Option, +} + impl EvaluateResult { pub fn success(&self) -> bool { self.exit_code == 0 } + + /// Parse token usage from the subprocess IPC JSON output. + /// + /// When the evaluate subprocess is invoked with `--json`, the stdout + /// contains a JSON object with optional `token_usage`, `runtime_name`, + /// and `model` fields. This method extracts those fields into an + /// [`IpcTokenUsage`] for DB recording. + /// + /// Returns `None` if `ipc_result` is absent, or if `token_usage` or + /// `runtime_name` is missing from the JSON. + pub fn parse_ipc_token_usage(&self) -> Option { + let json = self.ipc_result.as_ref()?; + let tu = json.get("token_usage")?; + if tu.is_null() { + return None; + } + let runtime_name = json.get("runtime_name")?.as_str()?; + if runtime_name.is_empty() { + return None; + } + + let input_tokens = tu.get("input_tokens")?.as_u64()?; + let output_tokens = tu.get("output_tokens")?.as_u64()?; + let cache_read_tokens = tu.get("cache_read_tokens").and_then(|v| v.as_u64()); + let cache_write_tokens = tu.get("cache_write_tokens").and_then(|v| v.as_u64()); + + let model = json + .get("model") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let duration_ms = json.get("duration_ms").and_then(|v| v.as_u64()); + + Some(IpcTokenUsage { + token_usage: TokenUsage { + input_tokens, + output_tokens, + cache_read_tokens, + cache_write_tokens, + }, + runtime_name: runtime_name.to_string(), + model, + duration_ms, + }) + } } impl From for EvaluateResult { @@ -1760,4 +1814,165 @@ exit {exit_code} let child_output = result.unwrap().unwrap(); assert_eq!(child_output.status.code().unwrap(), 0); } + + // --- Tests for parse_ipc_token_usage --- + + #[test] + fn parse_ipc_token_usage_extracts_full_fields() { + let json = serde_json::json!({ + "exit_code": 0, + "token_usage": { + "input_tokens": 1000, + "output_tokens": 500, + "cache_read_tokens": 50, + "cache_write_tokens": 25, + }, + "runtime_name": "claude", + "model": "opus-4", + "duration_ms": 3200, + }); + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: Some(json), + }; + + let ipc_usage = result + .parse_ipc_token_usage() + .expect("should parse token usage from IPC JSON"); + assert_eq!(ipc_usage.token_usage.input_tokens, 1000); + assert_eq!(ipc_usage.token_usage.output_tokens, 500); + assert_eq!(ipc_usage.token_usage.cache_read_tokens, Some(50)); + assert_eq!(ipc_usage.token_usage.cache_write_tokens, Some(25)); + assert_eq!(ipc_usage.runtime_name, "claude"); + assert_eq!(ipc_usage.model.as_deref(), Some("opus-4")); + assert_eq!(ipc_usage.duration_ms, Some(3200)); + } + + #[test] + fn parse_ipc_token_usage_returns_none_without_ipc_result() { + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: None, + }; + assert!(result.parse_ipc_token_usage().is_none()); + } + + #[test] + fn parse_ipc_token_usage_returns_none_when_token_usage_null() { + let json = serde_json::json!({ + "exit_code": 0, + "token_usage": null, + "runtime_name": "claude", + }); + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: Some(json), + }; + assert!(result.parse_ipc_token_usage().is_none()); + } + + #[test] + fn parse_ipc_token_usage_returns_none_when_runtime_name_missing() { + let json = serde_json::json!({ + "exit_code": 0, + "token_usage": { + "input_tokens": 100, + "output_tokens": 50, + }, + }); + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: Some(json), + }; + assert!(result.parse_ipc_token_usage().is_none()); + } + + #[test] + fn parse_ipc_token_usage_returns_none_when_runtime_name_empty() { + let json = serde_json::json!({ + "exit_code": 0, + "token_usage": { + "input_tokens": 100, + "output_tokens": 50, + }, + "runtime_name": "", + }); + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: Some(json), + }; + assert!(result.parse_ipc_token_usage().is_none()); + } + + #[test] + fn parse_ipc_token_usage_handles_optional_cache_tokens() { + let json = serde_json::json!({ + "exit_code": 0, + "token_usage": { + "input_tokens": 200, + "output_tokens": 100, + "cache_read_tokens": null, + "cache_write_tokens": null, + }, + "runtime_name": "gemini", + "duration_ms": 1500, + }); + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: Some(json), + }; + + let ipc_usage = result + .parse_ipc_token_usage() + .expect("should parse even with null cache tokens"); + assert_eq!(ipc_usage.token_usage.input_tokens, 200); + assert_eq!(ipc_usage.token_usage.output_tokens, 100); + assert!(ipc_usage.token_usage.cache_read_tokens.is_none()); + assert!(ipc_usage.token_usage.cache_write_tokens.is_none()); + assert_eq!(ipc_usage.runtime_name, "gemini"); + assert!(ipc_usage.model.is_none()); + } + + #[test] + fn parse_ipc_token_usage_without_model_and_duration() { + let json = serde_json::json!({ + "token_usage": { + "input_tokens": 50, + "output_tokens": 25, + }, + "runtime_name": "codex", + }); + let result = EvaluateResult { + exit_code: 0, + stdout: String::new(), + stderr: String::new(), + action_result: None, + ipc_result: Some(json), + }; + + let ipc_usage = result + .parse_ipc_token_usage() + .expect("should parse without model and duration"); + assert_eq!(ipc_usage.runtime_name, "codex"); + assert!(ipc_usage.model.is_none()); + assert!(ipc_usage.duration_ms.is_none()); + } }