diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index ab28ae60..8c91db10 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -170,8 +170,9 @@ def __init__( self._lock = threading.Lock() # Coding agents: feature_id -> process self.running_coding_agents: dict[int, subprocess.Popen] = {} - # Testing agents: feature_id -> process (feature being tested) - self.running_testing_agents: dict[int, subprocess.Popen] = {} + # Testing agents: pid -> (feature_id, process) + # Keyed by PID (not feature_id) because multiple agents can test the same feature + self.running_testing_agents: dict[int, tuple[int, subprocess.Popen]] = {} # Legacy alias for backward compatibility self.running_agents = self.running_coding_agents self.abort_events: dict[int, threading.Event] = {} @@ -429,7 +430,10 @@ def _maintain_testing_agents(self) -> None: # Spawn outside lock (I/O bound operation) print(f"[DEBUG] Spawning testing agent ({spawn_index}/{desired})", flush=True) - self._spawn_testing_agent() + success, msg = self._spawn_testing_agent() + if not success: + debug_log.log("TESTING", f"Spawn failed, stopping: {msg}") + return def start_feature(self, feature_id: int, resume: bool = False) -> tuple[bool, str]: """Start a single coding agent for a feature. @@ -611,8 +615,9 @@ def _spawn_testing_agent(self) -> tuple[bool, str]: debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}") return False, f"Failed to start testing agent: {e}" - # Register process with feature ID (same pattern as coding agents) - self.running_testing_agents[feature_id] = proc + # Register process by PID (not feature_id) to avoid overwrites + # when multiple agents test the same feature + self.running_testing_agents[proc.pid] = (feature_id, proc) testing_count = len(self.running_testing_agents) # Start output reader thread with feature ID (same as coding agents) @@ -795,11 +800,8 @@ def _on_agent_complete( """ if agent_type == "testing": with self._lock: - # Remove from dict by finding the feature_id for this proc - for fid, p in list(self.running_testing_agents.items()): - if p is proc: - del self.running_testing_agents[fid] - break + # Remove by PID + self.running_testing_agents.pop(proc.pid, None) status = "completed" if return_code == 0 else "failed" print(f"Feature #{feature_id} testing {status}", flush=True) @@ -898,9 +900,9 @@ def stop_all(self) -> None: with self._lock: testing_items = list(self.running_testing_agents.items()) - for feature_id, proc in testing_items: + for pid, (feature_id, proc) in testing_items: result = kill_process_tree(proc, timeout=5.0) - debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {proc.pid})", + debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {pid})", status=result.status, children_found=result.children_found, children_terminated=result.children_terminated, children_killed=result.children_killed)