Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/belt-cli/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,8 @@ pub async fn run_agent(
"cache_read_tokens": u.cache_read_tokens,
"cache_write_tokens": u.cache_write_tokens,
})),
"runtime_name": action_result.runtime_name,
"model": action_result.model,
});
println!("{}", serde_json::to_string_pretty(&output)?);
} else {
Expand Down
128 changes: 127 additions & 1 deletion crates/belt-daemon/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,20 @@ impl Daemon {
Err(e) => Err(e),
}
} else {
self.evaluator.run_evaluate(&self.belt_home).await
let result = self.evaluator.run_evaluate(&self.belt_home).await;

// Parse and record token usage from subprocess IPC JSON output.
if let Ok(ref eval_result) = result
&& let Some(ref ipc_usage) = eval_result.parse_ipc_token_usage()
{
for work_id in &completed {
if let Some(item) = self.queue.iter().find(|i| i.work_id == *work_id) {
self.try_record_ipc_token_usage(item, ipc_usage);
}
}
}

result
}
};

Expand Down Expand Up @@ -2226,6 +2239,41 @@ impl Daemon {
}
}

/// Record token usage parsed from evaluate subprocess IPC JSON output.
///
/// When the evaluate subprocess runs via `belt agent --json`, the stdout
/// JSON may contain `token_usage`, `runtime_name`, and `model` fields.
/// This method extracts those and records them to the DB.
fn try_record_ipc_token_usage(
&self,
item: &QueueItem,
ipc_usage: &crate::evaluator::IpcTokenUsage,
) {
if let Some(ref db) = self.db {
let model = ipc_usage.model.as_deref().unwrap_or("unknown");
if let Err(e) = db.record_token_usage(
&item.work_id,
&self.config.name,
&ipc_usage.runtime_name,
model,
&ipc_usage.token_usage,
ipc_usage.duration_ms,
) {
tracing::warn!(
"failed to record evaluate IPC token usage for {}: {e}",
item.work_id
);
} else {
tracing::debug!(
"recorded evaluate IPC token usage for {}: input={}, output={}",
item.work_id,
ipc_usage.token_usage.input_tokens,
ipc_usage.token_usage.output_tokens
);
}
}
}

fn record_history(&mut self, item: &QueueItem, status: &str, error: Option<&str>) {
let attempt = self
.history
Expand Down Expand Up @@ -4463,6 +4511,84 @@ sources:
assert_eq!(rows[0].model, "unknown");
}

// ---------------------------------------------------------------
// IPC token usage recording tests
// ---------------------------------------------------------------

#[test]
fn try_record_ipc_token_usage_saves_to_db() {
use belt_core::runtime::TokenUsage;
use belt_infra::db::Database;

let tmp = TempDir::new().unwrap();
let source = MockDataSource::new("github");
let db = Database::open_in_memory().unwrap();
let daemon = setup_daemon(&tmp, source, vec![]).with_db(db);

let item = test_item("github:org/repo#10", "analyze");
let ipc_usage = crate::evaluator::IpcTokenUsage {
token_usage: TokenUsage {
input_tokens: 800,
output_tokens: 400,
cache_read_tokens: Some(30),
cache_write_tokens: None,
},
runtime_name: "claude".to_string(),
model: Some("opus-4".to_string()),
duration_ms: Some(2500),
};

daemon.try_record_ipc_token_usage(&item, &ipc_usage);

let rows = daemon
.db
.as_ref()
.unwrap()
.get_token_usage_by_work_id("github:org/repo#10:analyze")
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].input_tokens, 800);
assert_eq!(rows[0].output_tokens, 400);
assert_eq!(rows[0].model, "opus-4");
assert_eq!(rows[0].runtime, "claude");
}

#[test]
fn try_record_ipc_token_usage_uses_unknown_model_when_absent() {
use belt_core::runtime::TokenUsage;
use belt_infra::db::Database;

let tmp = TempDir::new().unwrap();
let source = MockDataSource::new("github");
let db = Database::open_in_memory().unwrap();
let daemon = setup_daemon(&tmp, source, vec![]).with_db(db);

let item = test_item("github:org/repo#11", "analyze");
let ipc_usage = crate::evaluator::IpcTokenUsage {
token_usage: TokenUsage {
input_tokens: 100,
output_tokens: 50,
cache_read_tokens: None,
cache_write_tokens: None,
},
runtime_name: "gemini".to_string(),
model: None,
duration_ms: None,
};

daemon.try_record_ipc_token_usage(&item, &ipc_usage);

let rows = daemon
.db
.as_ref()
.unwrap()
.get_token_usage_by_work_id("github:org/repo#11:analyze")
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].model, "unknown");
assert_eq!(rows[0].runtime, "gemini");
}

// ---------------------------------------------------------------
// advance_ready_to_running: HashMap per-workspace concurrency (additional)
// ---------------------------------------------------------------
Expand Down
215 changes: 215 additions & 0 deletions crates/belt-daemon/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::Result;
use belt_core::action::Action;
use belt_core::phase::QueuePhase;
use belt_core::queue::QueueItem;
use belt_core::runtime::TokenUsage;

use crate::executor::{ActionEnv, ActionExecutor, ActionResult};

Expand Down Expand Up @@ -301,10 +302,63 @@ pub struct EvaluateResult {
pub ipc_result: Option<serde_json::Value>,
}

/// Token usage and runtime metadata parsed from subprocess IPC JSON output.
#[derive(Debug, Clone)]
pub struct IpcTokenUsage {
pub token_usage: TokenUsage,
pub runtime_name: String,
pub model: Option<String>,
pub duration_ms: Option<u64>,
}

impl EvaluateResult {
pub fn success(&self) -> bool {
self.exit_code == 0
}

/// Parse token usage from the subprocess IPC JSON output.
///
/// When the evaluate subprocess is invoked with `--json`, the stdout
/// contains a JSON object with optional `token_usage`, `runtime_name`,
/// and `model` fields. This method extracts those fields into an
/// [`IpcTokenUsage`] for DB recording.
///
/// Returns `None` if `ipc_result` is absent, or if `token_usage` or
/// `runtime_name` is missing from the JSON.
pub fn parse_ipc_token_usage(&self) -> Option<IpcTokenUsage> {
let json = self.ipc_result.as_ref()?;
let tu = json.get("token_usage")?;
if tu.is_null() {
return None;
}
let runtime_name = json.get("runtime_name")?.as_str()?;
if runtime_name.is_empty() {
return None;
}

let input_tokens = tu.get("input_tokens")?.as_u64()?;
let output_tokens = tu.get("output_tokens")?.as_u64()?;
let cache_read_tokens = tu.get("cache_read_tokens").and_then(|v| v.as_u64());
let cache_write_tokens = tu.get("cache_write_tokens").and_then(|v| v.as_u64());

let model = json
.get("model")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let duration_ms = json.get("duration_ms").and_then(|v| v.as_u64());

Some(IpcTokenUsage {
token_usage: TokenUsage {
input_tokens,
output_tokens,
cache_read_tokens,
cache_write_tokens,
},
runtime_name: runtime_name.to_string(),
model,
duration_ms,
})
}
}

impl From<ActionResult> for EvaluateResult {
Expand Down Expand Up @@ -1760,4 +1814,165 @@ exit {exit_code}
let child_output = result.unwrap().unwrap();
assert_eq!(child_output.status.code().unwrap(), 0);
}

// --- Tests for parse_ipc_token_usage ---

#[test]
fn parse_ipc_token_usage_extracts_full_fields() {
let json = serde_json::json!({
"exit_code": 0,
"token_usage": {
"input_tokens": 1000,
"output_tokens": 500,
"cache_read_tokens": 50,
"cache_write_tokens": 25,
},
"runtime_name": "claude",
"model": "opus-4",
"duration_ms": 3200,
});
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: Some(json),
};

let ipc_usage = result
.parse_ipc_token_usage()
.expect("should parse token usage from IPC JSON");
assert_eq!(ipc_usage.token_usage.input_tokens, 1000);
assert_eq!(ipc_usage.token_usage.output_tokens, 500);
assert_eq!(ipc_usage.token_usage.cache_read_tokens, Some(50));
assert_eq!(ipc_usage.token_usage.cache_write_tokens, Some(25));
assert_eq!(ipc_usage.runtime_name, "claude");
assert_eq!(ipc_usage.model.as_deref(), Some("opus-4"));
assert_eq!(ipc_usage.duration_ms, Some(3200));
}

#[test]
fn parse_ipc_token_usage_returns_none_without_ipc_result() {
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: None,
};
assert!(result.parse_ipc_token_usage().is_none());
}

#[test]
fn parse_ipc_token_usage_returns_none_when_token_usage_null() {
let json = serde_json::json!({
"exit_code": 0,
"token_usage": null,
"runtime_name": "claude",
});
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: Some(json),
};
assert!(result.parse_ipc_token_usage().is_none());
}

#[test]
fn parse_ipc_token_usage_returns_none_when_runtime_name_missing() {
let json = serde_json::json!({
"exit_code": 0,
"token_usage": {
"input_tokens": 100,
"output_tokens": 50,
},
});
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: Some(json),
};
assert!(result.parse_ipc_token_usage().is_none());
}

#[test]
fn parse_ipc_token_usage_returns_none_when_runtime_name_empty() {
let json = serde_json::json!({
"exit_code": 0,
"token_usage": {
"input_tokens": 100,
"output_tokens": 50,
},
"runtime_name": "",
});
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: Some(json),
};
assert!(result.parse_ipc_token_usage().is_none());
}

#[test]
fn parse_ipc_token_usage_handles_optional_cache_tokens() {
let json = serde_json::json!({
"exit_code": 0,
"token_usage": {
"input_tokens": 200,
"output_tokens": 100,
"cache_read_tokens": null,
"cache_write_tokens": null,
},
"runtime_name": "gemini",
"duration_ms": 1500,
});
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: Some(json),
};

let ipc_usage = result
.parse_ipc_token_usage()
.expect("should parse even with null cache tokens");
assert_eq!(ipc_usage.token_usage.input_tokens, 200);
assert_eq!(ipc_usage.token_usage.output_tokens, 100);
assert!(ipc_usage.token_usage.cache_read_tokens.is_none());
assert!(ipc_usage.token_usage.cache_write_tokens.is_none());
assert_eq!(ipc_usage.runtime_name, "gemini");
assert!(ipc_usage.model.is_none());
}

#[test]
fn parse_ipc_token_usage_without_model_and_duration() {
let json = serde_json::json!({
"token_usage": {
"input_tokens": 50,
"output_tokens": 25,
},
"runtime_name": "codex",
});
let result = EvaluateResult {
exit_code: 0,
stdout: String::new(),
stderr: String::new(),
action_result: None,
ipc_result: Some(json),
};

let ipc_usage = result
.parse_ipc_token_usage()
.expect("should parse without model and duration");
assert_eq!(ipc_usage.runtime_name, "codex");
assert!(ipc_usage.model.is_none());
assert!(ipc_usage.duration_ms.is_none());
}
}
Loading