Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions parallel_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ def __init__(
self._lock = threading.Lock()
# Coding agents: feature_id -> process
self.running_coding_agents: dict[int, subprocess.Popen] = {}
# Testing agents: feature_id -> process (feature being tested)
self.running_testing_agents: dict[int, subprocess.Popen] = {}
# Testing agents: pid -> (feature_id, process)
# Keyed by PID (not feature_id) because multiple agents can test the same feature
self.running_testing_agents: dict[int, tuple[int, subprocess.Popen]] = {}
# Legacy alias for backward compatibility
self.running_agents = self.running_coding_agents
self.abort_events: dict[int, threading.Event] = {}
Expand Down Expand Up @@ -429,7 +430,10 @@ def _maintain_testing_agents(self) -> None:

# Spawn outside lock (I/O bound operation)
print(f"[DEBUG] Spawning testing agent ({spawn_index}/{desired})", flush=True)
self._spawn_testing_agent()
success, msg = self._spawn_testing_agent()
if not success:
debug_log.log("TESTING", f"Spawn failed, stopping: {msg}")
return

def start_feature(self, feature_id: int, resume: bool = False) -> tuple[bool, str]:
"""Start a single coding agent for a feature.
Expand Down Expand Up @@ -611,8 +615,9 @@ def _spawn_testing_agent(self) -> tuple[bool, str]:
debug_log.log("TESTING", f"FAILED to spawn testing agent: {e}")
return False, f"Failed to start testing agent: {e}"

# Register process with feature ID (same pattern as coding agents)
self.running_testing_agents[feature_id] = proc
# Register process by PID (not feature_id) to avoid overwrites
# when multiple agents test the same feature
self.running_testing_agents[proc.pid] = (feature_id, proc)
testing_count = len(self.running_testing_agents)

# Start output reader thread with feature ID (same as coding agents)
Expand Down Expand Up @@ -795,11 +800,8 @@ def _on_agent_complete(
"""
if agent_type == "testing":
with self._lock:
# Remove from dict by finding the feature_id for this proc
for fid, p in list(self.running_testing_agents.items()):
if p is proc:
del self.running_testing_agents[fid]
break
# Remove by PID
self.running_testing_agents.pop(proc.pid, None)

status = "completed" if return_code == 0 else "failed"
print(f"Feature #{feature_id} testing {status}", flush=True)
Expand Down Expand Up @@ -898,9 +900,9 @@ def stop_all(self) -> None:
with self._lock:
testing_items = list(self.running_testing_agents.items())

for feature_id, proc in testing_items:
for pid, (feature_id, proc) in testing_items:
result = kill_process_tree(proc, timeout=5.0)
debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {proc.pid})",
debug_log.log("STOP", f"Killed testing agent for feature #{feature_id} (PID {pid})",
status=result.status, children_found=result.children_found,
children_terminated=result.children_terminated, children_killed=result.children_killed)

Expand Down
Loading