Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions coral/agent/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, config: CoralConfig, verbose: bool = False, config_dir: Path
self._agent_eval_counts: dict[str, int] = {}
self._agent_best_scores: dict[str, float] = {}
self._agent_evals_since_improvement: dict[str, int] = {}
self._dispatcher = None # QueueDispatcher, if queue is enabled

def start_all(self) -> list[AgentHandle]:
"""Create workspace structure and spawn all agents."""
Expand All @@ -72,12 +73,15 @@ def start_all(self) -> list[AgentHandle]:
logger.info(f" coral_dir: {self.paths.coral_dir}")
logger.info(f" repo_dir: {self.paths.repo_dir}")

# 2. Seed global heartbeat config if not already present
# 2. Initialize queue dispatcher
self._init_dispatcher(self.paths.coral_dir)

# 3. Seed global heartbeat config if not already present
if not read_global_heartbeat(self.paths.coral_dir):
write_global_heartbeat(self.paths.coral_dir, default_global_actions(self.config))
logger.info("Seeded global heartbeat config")

# 3. For each agent: create worktree, generate CLAUDE.md, spawn runtime
# 4. For each agent: create worktree, generate CLAUDE.md, spawn runtime
handles = []
for i in range(self.config.agents.count):
agent_id = f"agent-{i + 1}"
Expand Down Expand Up @@ -221,6 +225,9 @@ def resume_all(self, paths: ProjectPaths, instruction: str | None = None) -> lis
self._start_time = datetime.now(UTC)
self.paths = paths

# Initialize queue dispatcher
self._init_dispatcher(paths.coral_dir)

# Kill any leftover agent processes from a previous run so they
# don't hold session locks and block the new agents.
self._kill_old_agent_processes()
Expand Down Expand Up @@ -331,6 +338,20 @@ def _find_latest_session_from_logs(self, agent_id: str) -> str | None:
return self.runtime.extract_session_id(logs[-1])
return None

def _init_dispatcher(self, coral_dir: Path) -> None:
    """Create the QueueDispatcher and ensure its on-disk layout exists."""
    from coral.queue.dispatcher import QueueDispatcher

    # The dispatcher communicates through files under <coral_dir>/queue/.
    queue_root = coral_dir / "queue"
    for subdir in ("pending", "results"):
        (queue_root / subdir).mkdir(parents=True, exist_ok=True)

    queue_cfg = self.config.grader.queue
    self._dispatcher = QueueDispatcher(coral_dir, queue_cfg)
    logger.info(
        f"Queue dispatcher initialized (max_concurrent={queue_cfg.max_concurrent}, "
        f"strategy={queue_cfg.strategy}, executor={queue_cfg.executor})"
    )

def stop_all(self) -> None:
"""Gracefully stop all agents.

Expand All @@ -342,6 +363,9 @@ def stop_all(self) -> None:
self._stopping = True
self._running = False
self._stop_event.set()
# Shut down queue dispatcher
if self._dispatcher is not None:
self._dispatcher.shutdown()
# Save session IDs before killing processes
self._save_sessions()
for handle in self.handles:
Expand Down Expand Up @@ -489,6 +513,13 @@ def _signal_handler(sig: int, frame: Any) -> None:
logger.info(f"Monitoring {len(self.handles)} agent(s) (check every {check_interval}s)...")

while self._running:
# Poll queue dispatcher if enabled
if self._dispatcher is not None:
try:
self._dispatcher.poll_and_dispatch()
except Exception as e:
logger.error(f"Queue dispatcher error: {e}")

# Check for new attempts
current_attempts = self._get_seen_attempts()
new_attempts = current_attempts - seen_attempts
Expand Down
15 changes: 15 additions & 0 deletions coral/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ class TaskConfig:
seed: list[str] = field(default_factory=list) # files/dirs to copy into workspace


@dataclass
class QueueConfig:
    """Task queue configuration for centralized grading.

    Consumed by both the manager-side dispatcher and the agent-side
    queue client; defaults give a single local grader job at a time
    with no rate or size limits.
    """

    max_concurrent: int = 1  # max simultaneous grader jobs
    strategy: str = "fair"  # queue ordering: "fair" | "fifo" | "priority"
    rate_limit: float = 0.0  # min seconds between evals per agent (0 = unlimited)
    max_queue_size: int = 0  # max pending requests (0 = unlimited)
    poll_interval: float = 1.0  # agent-side result polling interval (seconds)
    timeout: int = 0  # max seconds agent waits for result (0 = grader timeout + 60s)
    executor: str = "local"  # job backend: "local" | "slurm"
    executor_args: dict[str, Any] = field(default_factory=dict)  # passed to submitit executor


@dataclass
class GraderConfig:
"""Grader configuration."""
Expand All @@ -33,6 +47,7 @@ class GraderConfig:
default_factory=list
) # files/dirs copied to .coral/ (hidden from agents)
direction: str = "maximize" # "maximize" or "minimize"
queue: QueueConfig = field(default_factory=QueueConfig)


@dataclass
Expand Down
191 changes: 59 additions & 132 deletions coral/hooks/post_commit.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
"""Eval implementation: git-add, git-commit, grade, write attempt JSON, print score."""
"""Eval implementation: git-add, git-commit, queue for grading, write attempt JSON, print score."""

from __future__ import annotations

import json
import logging
import multiprocessing
import subprocess
import traceback
from datetime import UTC, datetime
from pathlib import Path

from coral.config import CoralConfig
from coral.grader.loader import load_grader
from coral.hub.attempts import get_agent_attempts, write_attempt
from coral.hub.checkpoint import checkpoint
from coral.types import Attempt, Task
from coral.queue.client import QueueClient
from coral.queue.counter import increment_eval_count
from coral.types import Attempt

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,79 +63,58 @@ def _get_parent_hash(commit_hash: str, cwd: str) -> str | None:
return None


def _increment_eval_count(coral_dir: Path) -> int:
"""Increment and return the eval counter for this run."""
counter_file = coral_dir / "public" / "eval_count"
count = 0
if counter_file.exists():
try:
count = int(counter_file.read_text().strip())
except ValueError:
pass
count += 1
counter_file.write_text(str(count))
return count


def _grader_worker(config_path: str, coral_dir: str, codebase_path: str, tasks: list, result_queue):
    """Child-process entry point: run grader.grade() and report via *result_queue*.

    Re-loads the grader from config inside the child to avoid pickling
    dynamically-imported modules across process boundaries. On success the
    queue receives ("ok", result); on failure ("error", exc, traceback_str).
    """
    import asyncio

    try:
        cfg = CoralConfig.from_yaml(config_path)
        grader = load_grader(cfg, coral_dir=coral_dir)
        outcome = asyncio.run(grader.grade(codebase_path, tasks))
    except Exception as exc:
        result_queue.put(("error", exc, traceback.format_exc()))
    else:
        result_queue.put(("ok", outcome))

def _run_eval_via_queue(
config: CoralConfig,
coral_dir: Path,
agent_id: str,
commit_hash: str,
config_path: str,
codebase_path: str,
) -> tuple[float | None, str, str]:
"""Submit eval request to the queue and wait for a result."""
client = QueueClient(coral_dir, config.grader.queue, grader_timeout=config.grader.timeout)
request = client.submit(agent_id, commit_hash, config_path, codebase_path)
position = client.get_position(request.ticket_id)
print(f"Eval queued (position {position}, ticket {request.ticket_id[:8]})")

def _run_grader_with_timeout(config_path: str, coral_dir: str, codebase_path: str, tasks: list, timeout: int):
    """Run the grader in a separate process with a hard timeout.

    Uses multiprocessing so we can kill blocking synchronous code (numpy, etc.)
    that asyncio.wait_for can't interrupt. The grader is re-loaded from config
    inside the child process to avoid pickle issues with dynamic imports.

    Args:
        config_path: Path to the CoralConfig YAML file.
        coral_dir: The .coral directory, passed through to the grader loader.
        codebase_path: Workspace to grade.
        tasks: Task list forwarded to grader.grade().
        timeout: Hard limit in seconds; <= 0 disables the timeout entirely.

    Returns:
        Whatever grader.grade() returns.

    Raises:
        TimeoutError: The child process exceeded *timeout* seconds.
        RuntimeError: The child exited without a result, or the grader raised.
    """
    from queue import Empty  # stdlib; for the result-queue read below

    if timeout <= 0:
        # No timeout — run directly in this process.
        import asyncio

        config = CoralConfig.from_yaml(config_path)
        grader = load_grader(config, coral_dir=coral_dir)
        return asyncio.run(grader.grade(codebase_path, tasks))

    result_queue: multiprocessing.Queue = multiprocessing.Queue()
    proc = multiprocessing.Process(
        target=_grader_worker,
        args=(config_path, coral_dir, codebase_path, tasks, result_queue),
    )
    try:
        proc.start()
        proc.join(timeout=timeout)

        if proc.is_alive():
            # Timed out — kill the process.
            proc.kill()
            proc.join(timeout=5)
            raise TimeoutError(f"Grader timed out after {timeout}s")

        # Queue.empty() is documented as unreliable: the child's feeder
        # thread may still be flushing the result when the process exits,
        # so a real result could be misreported as missing. A short
        # blocking get() closes that race.
        try:
            status, *payload = result_queue.get(timeout=5)
        except Empty:
            raise RuntimeError("Grader process exited without returning a result") from None

        if status == "ok":
            return payload[0]
        # Re-raise the exception from the child process.
        exc, tb_str = payload
        raise RuntimeError(f"Grader failed: {exc}\n{tb_str}")
    finally:
        result_queue.close()
        result_queue.join_thread()
        # Process.close() raises ValueError while the child is still
        # running — skip it if the kill above didn't take effect in time,
        # so the original TimeoutError isn't masked.
        if not proc.is_alive():
            proc.close()
eval_result = client.wait_for_result(request.ticket_id)
except TimeoutError:
logger.error("Queue timeout waiting for eval result")
return None, "Timed out waiting for eval result from queue.", "timeout"

if eval_result.status == "error":
logger.error(f"Queue eval failed: {eval_result.error}")
return None, eval_result.error, "crashed"

if eval_result.status == "timeout":
return None, eval_result.error or "Grader timed out.", "timeout"

score = eval_result.score
feedback = eval_result.feedback

if score is None:
return score, feedback, "crashed"

# Compare against previous best
prev_attempts = get_agent_attempts(str(coral_dir), agent_id)
prev_scores = [a.score for a in prev_attempts if a.score is not None]
minimize = config.grader.direction == "minimize"
if minimize:
prev_best = min(prev_scores) if prev_scores else None
else:
prev_best = max(prev_scores) if prev_scores else None
if prev_best is None:
status = "improved"
elif minimize and score < prev_best:
status = "improved"
elif not minimize and score > prev_best:
status = "improved"
elif score == prev_best:
status = "baseline"
else:
status = "regressed"
return score, feedback, status


def _find_coral_dir(workdir: Path) -> Path | None:
Expand All @@ -151,7 +129,7 @@ def _find_coral_dir(workdir: Path) -> Path | None:


def run_eval(message: str, agent_id: str, workdir: str = ".") -> Attempt:
"""Stage changes, commit with message, run evaluation, and return an Attempt record.
"""Stage changes, commit with message, submit to eval queue, and return an Attempt record.

This is the core of `coral eval -m "description"`.
"""
Expand All @@ -175,62 +153,11 @@ def run_eval(message: str, agent_id: str, workdir: str = ".") -> Attempt:
commit_hash = _git_add_and_commit(message, str(workdir_path))
parent_hash = _get_parent_hash(commit_hash, str(workdir_path))

# Create task from config
task = Task(
id=config.task.name,
name=config.task.name,
description=config.task.description,
metadata={"files": config.task.files},
# Submit to queue and wait for result
score, feedback, status = _run_eval_via_queue(
config, coral_dir, agent_id, commit_hash, str(config_path), str(workdir_path),
)

# Run evaluation with timeout
eval_timeout = config.grader.timeout # 0 = no limit

try:
result = _run_grader_with_timeout(str(config_path), str(coral_dir), str(workdir_path), [task], eval_timeout)
score = result.aggregated
# Build feedback from bundle-level feedback + per-score explanations
parts = []
if result.feedback:
parts.append(result.feedback)
if result.scores:
for name, s in result.scores.items():
if s.explanation:
parts.append(f"{name}: {s.explanation}")
feedback = "\n".join(parts)
# score is None when grader returns fail() — treat as crashed
if score is None:
status = "crashed"
else:
# Compare against this agent's previous best score
prev_attempts = get_agent_attempts(str(coral_dir), agent_id)
prev_scores = [a.score for a in prev_attempts if a.score is not None]
minimize = config.grader.direction == "minimize"
if minimize:
prev_best = min(prev_scores) if prev_scores else None
else:
prev_best = max(prev_scores) if prev_scores else None
if prev_best is None:
status = "improved"
elif minimize and score < prev_best:
status = "improved"
elif not minimize and score > prev_best:
status = "improved"
elif score == prev_best:
status = "baseline"
else:
status = "regressed"
except TimeoutError:
logger.error(f"Evaluation timed out after {eval_timeout}s")
score = None
status = "timeout"
feedback = f"Eval timed out after {eval_timeout}s."
except Exception as e:
logger.error(f"Evaluation failed: {e}")
score = None
status = "crashed"
feedback = str(e)

# Look up parent attempt's shared state hash
parent_shared_state_hash = None
if parent_hash:
Expand Down Expand Up @@ -262,7 +189,7 @@ def run_eval(message: str, agent_id: str, workdir: str = ".") -> Attempt:
write_attempt(str(coral_dir), attempt)

# Track eval count
eval_count = _increment_eval_count(coral_dir)
eval_count = increment_eval_count(coral_dir)
attempt._eval_count = eval_count # type: ignore[attr-defined]

return attempt
1 change: 1 addition & 0 deletions coral/queue/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Task queue for centralized grading with submitit."""
Loading