Skip to content

Commit ce5c210

Browse files
kys0213claude
andauthored
feat(daemon): record evaluate subprocess token_usage in database (#716)
- Add runtime_name and model fields to belt agent --json output - Add parse_ipc_token_usage() method on EvaluateResult to extract TokenUsage, runtime_name, and model from subprocess IPC JSON - Record parsed token usage to DB after run_evaluate subprocess path - Add IpcTokenUsage struct and try_record_ipc_token_usage() helper Closes #677 Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6717124 commit ce5c210

3 files changed

Lines changed: 344 additions & 1 deletion

File tree

crates/belt-cli/src/agent.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,8 @@ pub async fn run_agent(
535535
"cache_read_tokens": u.cache_read_tokens,
536536
"cache_write_tokens": u.cache_write_tokens,
537537
})),
538+
"runtime_name": action_result.runtime_name,
539+
"model": action_result.model,
538540
});
539541
println!("{}", serde_json::to_string_pretty(&output)?);
540542
} else {

crates/belt-daemon/src/daemon.rs

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1596,7 +1596,20 @@ impl Daemon {
15961596
Err(e) => Err(e),
15971597
}
15981598
} else {
1599-
self.evaluator.run_evaluate(&self.belt_home).await
1599+
let result = self.evaluator.run_evaluate(&self.belt_home).await;
1600+
1601+
// Parse and record token usage from subprocess IPC JSON output.
1602+
if let Ok(ref eval_result) = result
1603+
&& let Some(ref ipc_usage) = eval_result.parse_ipc_token_usage()
1604+
{
1605+
for work_id in &completed {
1606+
if let Some(item) = self.queue.iter().find(|i| i.work_id == *work_id) {
1607+
self.try_record_ipc_token_usage(item, ipc_usage);
1608+
}
1609+
}
1610+
}
1611+
1612+
result
16001613
}
16011614
};
16021615

@@ -2237,6 +2250,41 @@ impl Daemon {
22372250
}
22382251
}
22392252

2253+
/// Record token usage parsed from evaluate subprocess IPC JSON output.
2254+
///
2255+
/// When the evaluate subprocess runs via `belt agent --json`, the stdout
2256+
/// JSON may contain `token_usage`, `runtime_name`, and `model` fields.
2257+
/// This method extracts those and records them to the DB.
2258+
fn try_record_ipc_token_usage(
2259+
&self,
2260+
item: &QueueItem,
2261+
ipc_usage: &crate::evaluator::IpcTokenUsage,
2262+
) {
2263+
if let Some(ref db) = self.db {
2264+
let model = ipc_usage.model.as_deref().unwrap_or("unknown");
2265+
if let Err(e) = db.record_token_usage(
2266+
&item.work_id,
2267+
&self.config.name,
2268+
&ipc_usage.runtime_name,
2269+
model,
2270+
&ipc_usage.token_usage,
2271+
ipc_usage.duration_ms,
2272+
) {
2273+
tracing::warn!(
2274+
"failed to record evaluate IPC token usage for {}: {e}",
2275+
item.work_id
2276+
);
2277+
} else {
2278+
tracing::debug!(
2279+
"recorded evaluate IPC token usage for {}: input={}, output={}",
2280+
item.work_id,
2281+
ipc_usage.token_usage.input_tokens,
2282+
ipc_usage.token_usage.output_tokens
2283+
);
2284+
}
2285+
}
2286+
}
2287+
22402288
fn record_history(&mut self, item: &QueueItem, status: &str, error: Option<&str>) {
22412289
let attempt = self
22422290
.history
@@ -4474,6 +4522,84 @@ sources:
44744522
assert_eq!(rows[0].model, "unknown");
44754523
}
44764524

4525+
// ---------------------------------------------------------------
4526+
// IPC token usage recording tests
4527+
// ---------------------------------------------------------------
4528+
4529+
#[test]
4530+
fn try_record_ipc_token_usage_saves_to_db() {
4531+
use belt_core::runtime::TokenUsage;
4532+
use belt_infra::db::Database;
4533+
4534+
let tmp = TempDir::new().unwrap();
4535+
let source = MockDataSource::new("github");
4536+
let db = Database::open_in_memory().unwrap();
4537+
let daemon = setup_daemon(&tmp, source, vec![]).with_db(db);
4538+
4539+
let item = test_item("github:org/repo#10", "analyze");
4540+
let ipc_usage = crate::evaluator::IpcTokenUsage {
4541+
token_usage: TokenUsage {
4542+
input_tokens: 800,
4543+
output_tokens: 400,
4544+
cache_read_tokens: Some(30),
4545+
cache_write_tokens: None,
4546+
},
4547+
runtime_name: "claude".to_string(),
4548+
model: Some("opus-4".to_string()),
4549+
duration_ms: Some(2500),
4550+
};
4551+
4552+
daemon.try_record_ipc_token_usage(&item, &ipc_usage);
4553+
4554+
let rows = daemon
4555+
.db
4556+
.as_ref()
4557+
.unwrap()
4558+
.get_token_usage_by_work_id("github:org/repo#10:analyze")
4559+
.unwrap();
4560+
assert_eq!(rows.len(), 1);
4561+
assert_eq!(rows[0].input_tokens, 800);
4562+
assert_eq!(rows[0].output_tokens, 400);
4563+
assert_eq!(rows[0].model, "opus-4");
4564+
assert_eq!(rows[0].runtime, "claude");
4565+
}
4566+
4567+
#[test]
4568+
fn try_record_ipc_token_usage_uses_unknown_model_when_absent() {
4569+
use belt_core::runtime::TokenUsage;
4570+
use belt_infra::db::Database;
4571+
4572+
let tmp = TempDir::new().unwrap();
4573+
let source = MockDataSource::new("github");
4574+
let db = Database::open_in_memory().unwrap();
4575+
let daemon = setup_daemon(&tmp, source, vec![]).with_db(db);
4576+
4577+
let item = test_item("github:org/repo#11", "analyze");
4578+
let ipc_usage = crate::evaluator::IpcTokenUsage {
4579+
token_usage: TokenUsage {
4580+
input_tokens: 100,
4581+
output_tokens: 50,
4582+
cache_read_tokens: None,
4583+
cache_write_tokens: None,
4584+
},
4585+
runtime_name: "gemini".to_string(),
4586+
model: None,
4587+
duration_ms: None,
4588+
};
4589+
4590+
daemon.try_record_ipc_token_usage(&item, &ipc_usage);
4591+
4592+
let rows = daemon
4593+
.db
4594+
.as_ref()
4595+
.unwrap()
4596+
.get_token_usage_by_work_id("github:org/repo#11:analyze")
4597+
.unwrap();
4598+
assert_eq!(rows.len(), 1);
4599+
assert_eq!(rows[0].model, "unknown");
4600+
assert_eq!(rows[0].runtime, "gemini");
4601+
}
4602+
44774603
// ---------------------------------------------------------------
44784604
// advance_ready_to_running: HashMap per-workspace concurrency (additional)
44794605
// ---------------------------------------------------------------

crates/belt-daemon/src/evaluator.rs

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use anyhow::Result;
77
use belt_core::action::Action;
88
use belt_core::phase::QueuePhase;
99
use belt_core::queue::QueueItem;
10+
use belt_core::runtime::TokenUsage;
1011

1112
use crate::executor::{ActionEnv, ActionExecutor, ActionResult};
1213

@@ -301,10 +302,63 @@ pub struct EvaluateResult {
301302
pub ipc_result: Option<serde_json::Value>,
302303
}
303304

305+
/// Token usage and runtime metadata parsed from subprocess IPC JSON output.
306+
#[derive(Debug, Clone)]
307+
pub struct IpcTokenUsage {
308+
pub token_usage: TokenUsage,
309+
pub runtime_name: String,
310+
pub model: Option<String>,
311+
pub duration_ms: Option<u64>,
312+
}
313+
304314
impl EvaluateResult {
305315
pub fn success(&self) -> bool {
306316
self.exit_code == 0
307317
}
318+
319+
/// Parse token usage from the subprocess IPC JSON output.
320+
///
321+
/// When the evaluate subprocess is invoked with `--json`, the stdout
322+
/// contains a JSON object with optional `token_usage`, `runtime_name`,
323+
/// and `model` fields. This method extracts those fields into an
324+
/// [`IpcTokenUsage`] for DB recording.
325+
///
326+
/// Returns `None` if `ipc_result` is absent, or if `token_usage` or
327+
/// `runtime_name` is missing from the JSON.
328+
pub fn parse_ipc_token_usage(&self) -> Option<IpcTokenUsage> {
329+
let json = self.ipc_result.as_ref()?;
330+
let tu = json.get("token_usage")?;
331+
if tu.is_null() {
332+
return None;
333+
}
334+
let runtime_name = json.get("runtime_name")?.as_str()?;
335+
if runtime_name.is_empty() {
336+
return None;
337+
}
338+
339+
let input_tokens = tu.get("input_tokens")?.as_u64()?;
340+
let output_tokens = tu.get("output_tokens")?.as_u64()?;
341+
let cache_read_tokens = tu.get("cache_read_tokens").and_then(|v| v.as_u64());
342+
let cache_write_tokens = tu.get("cache_write_tokens").and_then(|v| v.as_u64());
343+
344+
let model = json
345+
.get("model")
346+
.and_then(|v| v.as_str())
347+
.map(|s| s.to_string());
348+
let duration_ms = json.get("duration_ms").and_then(|v| v.as_u64());
349+
350+
Some(IpcTokenUsage {
351+
token_usage: TokenUsage {
352+
input_tokens,
353+
output_tokens,
354+
cache_read_tokens,
355+
cache_write_tokens,
356+
},
357+
runtime_name: runtime_name.to_string(),
358+
model,
359+
duration_ms,
360+
})
361+
}
308362
}
309363

310364
impl From<ActionResult> for EvaluateResult {
@@ -1760,4 +1814,165 @@ exit {exit_code}
17601814
let child_output = result.unwrap().unwrap();
17611815
assert_eq!(child_output.status.code().unwrap(), 0);
17621816
}
1817+
1818+
// --- Tests for parse_ipc_token_usage ---
1819+
1820+
#[test]
1821+
fn parse_ipc_token_usage_extracts_full_fields() {
1822+
let json = serde_json::json!({
1823+
"exit_code": 0,
1824+
"token_usage": {
1825+
"input_tokens": 1000,
1826+
"output_tokens": 500,
1827+
"cache_read_tokens": 50,
1828+
"cache_write_tokens": 25,
1829+
},
1830+
"runtime_name": "claude",
1831+
"model": "opus-4",
1832+
"duration_ms": 3200,
1833+
});
1834+
let result = EvaluateResult {
1835+
exit_code: 0,
1836+
stdout: String::new(),
1837+
stderr: String::new(),
1838+
action_result: None,
1839+
ipc_result: Some(json),
1840+
};
1841+
1842+
let ipc_usage = result
1843+
.parse_ipc_token_usage()
1844+
.expect("should parse token usage from IPC JSON");
1845+
assert_eq!(ipc_usage.token_usage.input_tokens, 1000);
1846+
assert_eq!(ipc_usage.token_usage.output_tokens, 500);
1847+
assert_eq!(ipc_usage.token_usage.cache_read_tokens, Some(50));
1848+
assert_eq!(ipc_usage.token_usage.cache_write_tokens, Some(25));
1849+
assert_eq!(ipc_usage.runtime_name, "claude");
1850+
assert_eq!(ipc_usage.model.as_deref(), Some("opus-4"));
1851+
assert_eq!(ipc_usage.duration_ms, Some(3200));
1852+
}
1853+
1854+
#[test]
1855+
fn parse_ipc_token_usage_returns_none_without_ipc_result() {
1856+
let result = EvaluateResult {
1857+
exit_code: 0,
1858+
stdout: String::new(),
1859+
stderr: String::new(),
1860+
action_result: None,
1861+
ipc_result: None,
1862+
};
1863+
assert!(result.parse_ipc_token_usage().is_none());
1864+
}
1865+
1866+
#[test]
1867+
fn parse_ipc_token_usage_returns_none_when_token_usage_null() {
1868+
let json = serde_json::json!({
1869+
"exit_code": 0,
1870+
"token_usage": null,
1871+
"runtime_name": "claude",
1872+
});
1873+
let result = EvaluateResult {
1874+
exit_code: 0,
1875+
stdout: String::new(),
1876+
stderr: String::new(),
1877+
action_result: None,
1878+
ipc_result: Some(json),
1879+
};
1880+
assert!(result.parse_ipc_token_usage().is_none());
1881+
}
1882+
1883+
#[test]
1884+
fn parse_ipc_token_usage_returns_none_when_runtime_name_missing() {
1885+
let json = serde_json::json!({
1886+
"exit_code": 0,
1887+
"token_usage": {
1888+
"input_tokens": 100,
1889+
"output_tokens": 50,
1890+
},
1891+
});
1892+
let result = EvaluateResult {
1893+
exit_code: 0,
1894+
stdout: String::new(),
1895+
stderr: String::new(),
1896+
action_result: None,
1897+
ipc_result: Some(json),
1898+
};
1899+
assert!(result.parse_ipc_token_usage().is_none());
1900+
}
1901+
1902+
#[test]
1903+
fn parse_ipc_token_usage_returns_none_when_runtime_name_empty() {
1904+
let json = serde_json::json!({
1905+
"exit_code": 0,
1906+
"token_usage": {
1907+
"input_tokens": 100,
1908+
"output_tokens": 50,
1909+
},
1910+
"runtime_name": "",
1911+
});
1912+
let result = EvaluateResult {
1913+
exit_code: 0,
1914+
stdout: String::new(),
1915+
stderr: String::new(),
1916+
action_result: None,
1917+
ipc_result: Some(json),
1918+
};
1919+
assert!(result.parse_ipc_token_usage().is_none());
1920+
}
1921+
1922+
#[test]
1923+
fn parse_ipc_token_usage_handles_optional_cache_tokens() {
1924+
let json = serde_json::json!({
1925+
"exit_code": 0,
1926+
"token_usage": {
1927+
"input_tokens": 200,
1928+
"output_tokens": 100,
1929+
"cache_read_tokens": null,
1930+
"cache_write_tokens": null,
1931+
},
1932+
"runtime_name": "gemini",
1933+
"duration_ms": 1500,
1934+
});
1935+
let result = EvaluateResult {
1936+
exit_code: 0,
1937+
stdout: String::new(),
1938+
stderr: String::new(),
1939+
action_result: None,
1940+
ipc_result: Some(json),
1941+
};
1942+
1943+
let ipc_usage = result
1944+
.parse_ipc_token_usage()
1945+
.expect("should parse even with null cache tokens");
1946+
assert_eq!(ipc_usage.token_usage.input_tokens, 200);
1947+
assert_eq!(ipc_usage.token_usage.output_tokens, 100);
1948+
assert!(ipc_usage.token_usage.cache_read_tokens.is_none());
1949+
assert!(ipc_usage.token_usage.cache_write_tokens.is_none());
1950+
assert_eq!(ipc_usage.runtime_name, "gemini");
1951+
assert!(ipc_usage.model.is_none());
1952+
}
1953+
1954+
#[test]
1955+
fn parse_ipc_token_usage_without_model_and_duration() {
1956+
let json = serde_json::json!({
1957+
"token_usage": {
1958+
"input_tokens": 50,
1959+
"output_tokens": 25,
1960+
},
1961+
"runtime_name": "codex",
1962+
});
1963+
let result = EvaluateResult {
1964+
exit_code: 0,
1965+
stdout: String::new(),
1966+
stderr: String::new(),
1967+
action_result: None,
1968+
ipc_result: Some(json),
1969+
};
1970+
1971+
let ipc_usage = result
1972+
.parse_ipc_token_usage()
1973+
.expect("should parse without model and duration");
1974+
assert_eq!(ipc_usage.runtime_name, "codex");
1975+
assert!(ipc_usage.model.is_none());
1976+
assert!(ipc_usage.duration_ms.is_none());
1977+
}
17631978
}

0 commit comments

Comments
 (0)