Skip to content

Commit a4be94f

Browse files
committed
fix: report task status incrementally during batch execution
Tasks are now visible in /batch/{id}/tasks as soon as they start:
- Queued: the task has been spawned and is waiting for a semaphore permit
- RunningAgent: the task has acquired a permit and its agent is executing
- Completed/Failed: the task has finished and its result is recorded

Previously, all tasks were only reported after the entire batch completed.
1 parent 3924735 commit a4be94f

File tree

1 file changed

+45
-17
lines changed

1 file changed

+45
-17
lines changed

src/executor.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,7 @@ async fn run_batch(
204204
.await;
205205

206206
let semaphore = Arc::new(Semaphore::new(concurrent_limit));
207-
let task_results: Arc<tokio::sync::Mutex<Vec<TaskResult>>> =
208-
Arc::new(tokio::sync::Mutex::new(Vec::new()));
207+
let batch_result = batch.result.clone();
209208

210209
let mut handles = Vec::new();
211210

@@ -218,23 +217,43 @@ async fn run_batch(
218217
let agent_archive = agent_archive.clone();
219218
let agent_env = agent_env.clone();
220219
let semaphore = semaphore.clone();
221-
let task_results = task_results.clone();
220+
let batch_result = batch_result.clone();
222221
let cancel_rx = batch.cancel.subscribe();
223222

224223
let handle = tokio::spawn(async move {
224+
// Mark task as queued in batch result immediately
225+
{
226+
let mut res = batch_result.lock().await;
227+
let mut placeholder = TaskResult::new(task.id.clone());
228+
placeholder.status = TaskStatus::Queued;
229+
res.tasks.push(placeholder);
230+
}
231+
225232
let _permit = match semaphore.acquire().await {
226233
Ok(p) => p,
227234
Err(_) => {
228235
warn!(task_id = %task.id, "Semaphore closed, skipping task");
229-
let mut result = TaskResult::new(task.id.clone());
230-
result.status = TaskStatus::Failed;
231-
result.error = Some("Semaphore closed".to_string());
232-
task_results.lock().await.push(result);
236+
let mut res = batch_result.lock().await;
237+
if let Some(t) = res.tasks.iter_mut().find(|t| t.task_id == task.id) {
238+
t.status = TaskStatus::Failed;
239+
t.error = Some("Semaphore closed".to_string());
240+
}
241+
res.completed_tasks += 1;
242+
res.failed_tasks += 1;
233243
return;
234244
}
235245
};
236246

237247
let task_id = task.id.clone();
248+
249+
// Mark task as running
250+
{
251+
let mut res = batch_result.lock().await;
252+
if let Some(t) = res.tasks.iter_mut().find(|t| t.task_id == task_id) {
253+
t.status = TaskStatus::RunningAgent;
254+
}
255+
}
256+
238257
let _ = events_tx.send(crate::session::WsEvent {
239258
event: "task_started".to_string(),
240259
batch_id: batch_id.clone(),
@@ -265,7 +284,19 @@ async fn run_batch(
265284
}),
266285
});
267286

268-
task_results.lock().await.push(result);
287+
// Replace placeholder with real result
288+
{
289+
let mut res = batch_result.lock().await;
290+
if let Some(t) = res.tasks.iter_mut().find(|t| t.task_id == task_id) {
291+
*t = result;
292+
}
293+
res.completed_tasks += 1;
294+
if res.tasks.iter().any(|t| t.task_id == task_id && t.reward == 1.0) {
295+
res.passed_tasks += 1;
296+
} else {
297+
res.failed_tasks += 1;
298+
}
299+
}
269300
});
270301

271302
handles.push(handle);
@@ -277,12 +308,9 @@ async fn run_batch(
277308
}
278309
}
279310

280-
let results = task_results.lock().await;
281-
let completed = results.len();
282-
let passed = results.iter().filter(|r| r.reward == 1.0).count();
283-
let failed = completed - passed;
311+
let res = batch.result.lock().await;
284312
let aggregate_reward = if total_tasks > 0 {
285-
results.iter().map(|r| r.reward).sum::<f64>() / total_tasks as f64
313+
res.tasks.iter().map(|r| r.reward).sum::<f64>() / total_tasks as f64
286314
} else {
287315
0.0
288316
};
@@ -291,10 +319,10 @@ async fn run_batch(
291319
batch_id: batch.id.clone(),
292320
status: BatchStatus::Completed,
293321
total_tasks,
294-
completed_tasks: completed,
295-
passed_tasks: passed,
296-
failed_tasks: failed,
297-
tasks: results.clone(),
322+
completed_tasks: res.completed_tasks,
323+
passed_tasks: res.passed_tasks,
324+
failed_tasks: res.failed_tasks,
325+
tasks: res.tasks.clone(),
298326
aggregate_reward,
299327
error: None,
300328
duration_ms: None,

0 commit comments

Comments
 (0)