@@ -204,8 +204,7 @@ async fn run_batch(
204204 . await ;
205205
206206 let semaphore = Arc :: new ( Semaphore :: new ( concurrent_limit) ) ;
207- let task_results: Arc < tokio:: sync:: Mutex < Vec < TaskResult > > > =
208- Arc :: new ( tokio:: sync:: Mutex :: new ( Vec :: new ( ) ) ) ;
207+ let batch_result = batch. result . clone ( ) ;
209208
210209 let mut handles = Vec :: new ( ) ;
211210
@@ -218,23 +217,43 @@ async fn run_batch(
218217 let agent_archive = agent_archive. clone ( ) ;
219218 let agent_env = agent_env. clone ( ) ;
220219 let semaphore = semaphore. clone ( ) ;
221- let task_results = task_results . clone ( ) ;
220+ let batch_result = batch_result . clone ( ) ;
222221 let cancel_rx = batch. cancel . subscribe ( ) ;
223222
224223 let handle = tokio:: spawn ( async move {
224+ // Mark task as queued in batch result immediately
225+ {
226+ let mut res = batch_result. lock ( ) . await ;
227+ let mut placeholder = TaskResult :: new ( task. id . clone ( ) ) ;
228+ placeholder. status = TaskStatus :: Queued ;
229+ res. tasks . push ( placeholder) ;
230+ }
231+
225232 let _permit = match semaphore. acquire ( ) . await {
226233 Ok ( p) => p,
227234 Err ( _) => {
228235 warn ! ( task_id = %task. id, "Semaphore closed, skipping task" ) ;
229- let mut result = TaskResult :: new ( task. id . clone ( ) ) ;
230- result. status = TaskStatus :: Failed ;
231- result. error = Some ( "Semaphore closed" . to_string ( ) ) ;
232- task_results. lock ( ) . await . push ( result) ;
236+ let mut res = batch_result. lock ( ) . await ;
237+ if let Some ( t) = res. tasks . iter_mut ( ) . find ( |t| t. task_id == task. id ) {
238+ t. status = TaskStatus :: Failed ;
239+ t. error = Some ( "Semaphore closed" . to_string ( ) ) ;
240+ }
241+ res. completed_tasks += 1 ;
242+ res. failed_tasks += 1 ;
233243 return ;
234244 }
235245 } ;
236246
237247 let task_id = task. id . clone ( ) ;
248+
249+ // Mark task as running
250+ {
251+ let mut res = batch_result. lock ( ) . await ;
252+ if let Some ( t) = res. tasks . iter_mut ( ) . find ( |t| t. task_id == task_id) {
253+ t. status = TaskStatus :: RunningAgent ;
254+ }
255+ }
256+
238257 let _ = events_tx. send ( crate :: session:: WsEvent {
239258 event : "task_started" . to_string ( ) ,
240259 batch_id : batch_id. clone ( ) ,
@@ -265,7 +284,23 @@ async fn run_batch(
265284 } ) ,
266285 } ) ;
267286
268- task_results. lock ( ) . await . push ( result) ;
287+ // Replace placeholder with real result
288+ {
289+ let mut res = batch_result. lock ( ) . await ;
290+ if let Some ( t) = res. tasks . iter_mut ( ) . find ( |t| t. task_id == task_id) {
291+ * t = result;
292+ }
293+ res. completed_tasks += 1 ;
294+ if res
295+ . tasks
296+ . iter ( )
297+ . any ( |t| t. task_id == task_id && t. reward == 1.0 )
298+ {
299+ res. passed_tasks += 1 ;
300+ } else {
301+ res. failed_tasks += 1 ;
302+ }
303+ }
269304 } ) ;
270305
271306 handles. push ( handle) ;
@@ -277,12 +312,9 @@ async fn run_batch(
277312 }
278313 }
279314
280- let results = task_results. lock ( ) . await ;
281- let completed = results. len ( ) ;
282- let passed = results. iter ( ) . filter ( |r| r. reward == 1.0 ) . count ( ) ;
283- let failed = completed - passed;
315+ let res = batch. result . lock ( ) . await ;
284316 let aggregate_reward = if total_tasks > 0 {
285- results . iter ( ) . map ( |r| r. reward ) . sum :: < f64 > ( ) / total_tasks as f64
317+ res . tasks . iter ( ) . map ( |r| r. reward ) . sum :: < f64 > ( ) / total_tasks as f64
286318 } else {
287319 0.0
288320 } ;
@@ -291,10 +323,10 @@ async fn run_batch(
291323 batch_id : batch. id . clone ( ) ,
292324 status : BatchStatus :: Completed ,
293325 total_tasks,
294- completed_tasks : completed ,
295- passed_tasks : passed ,
296- failed_tasks : failed ,
297- tasks : results . clone ( ) ,
326+ completed_tasks : res . completed_tasks ,
327+ passed_tasks : res . passed_tasks ,
328+ failed_tasks : res . failed_tasks ,
329+ tasks : res . tasks . clone ( ) ,
298330 aggregate_reward,
299331 error : None ,
300332 duration_ms : None ,
0 commit comments