diff --git a/README.md b/README.md index a306faf..5db114d 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ export PINCHBENCH_OFFICIAL_KEY=your_official_key | Flag | Description | | ------------------------ | ----------------------------------------------------------------------------- | | `--model MODEL` | Model to test (e.g., `openrouter/anthropic/claude-sonnet-4`) | -| `--judge MODEL` | Judge model for LLM grading (default: `openrouter/anthropic/claude-opus-4.5`) | +| `--judge MODEL` | Judge model for LLM grading; uses direct API when set (see below) | | `--suite SUITE` | `all`, `automated-only`, or comma-separated task IDs | | `--runs N` | Number of runs per task for averaging | | `--timeout-multiplier N` | Scale timeouts for slower models | @@ -103,6 +103,29 @@ export PINCHBENCH_OFFICIAL_KEY=your_official_key | `--upload FILE` | Upload a previous results JSON | | `--official-key KEY` | Mark submission as official (or use `PINCHBENCH_OFFICIAL_KEY` env var) | +### Judge + +By default (no `--judge` flag), the LLM judge runs as an OpenClaw agent session. When `--judge` is specified, it calls the model API directly instead, bypassing OpenClaw personality injection. + +```bash +# Default: OpenClaw agent session (no --judge needed) +./scripts/run.sh --model openrouter/anthropic/claude-sonnet-4 + +# Direct API via OpenRouter +./scripts/run.sh --model openai/gpt-4o --judge openrouter/anthropic/claude-sonnet-4-5 + +# Direct API via Anthropic +./scripts/run.sh --model openai/gpt-4o --judge anthropic/claude-sonnet-4-5-20250514 + +# Direct API via OpenAI +./scripts/run.sh --model openai/gpt-4o --judge openai/gpt-4o + +# Headless Claude CLI +./scripts/run.sh --model openai/gpt-4o --judge claude +``` + +Required env vars: `OPENROUTER_API_KEY`, `ANTHROPIC_API_KEY`, or `OPENAI_API_KEY`, depending on the judge model prefix (`openrouter/`, `anthropic/`, `openai/`); any other `provider/model` ID is sent to OpenRouter. `--judge claude` needs no API key but requires the `claude` CLI on your `PATH`. + ## Contributing Tasks We welcome new tasks! Check out [`tasks/TASK_TEMPLATE.md`](tasks/TASK_TEMPLATE.md) for the format. 
Good tasks are: @@ -112,6 +135,10 @@ We welcome new tasks! Check out [`tasks/TASK_TEMPLATE.md`](tasks/TASK_TEMPLATE.m - **Reproducible** — Same task should produce consistent grading - **Challenging** — Tests agent capabilities, not just LLM knowledge +### Transcript Archive + +Session transcripts are automatically saved to `results/{run_id}_transcripts/` alongside the results JSON. Each task's full agent conversation is preserved as a JSONL file (e.g. `task_01_calendar.jsonl`) for post-run analysis. + ## Links - **Leaderboard:** [pinchbench.com](https://pinchbench.com) diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 566bcb3..b99b024 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -217,7 +217,11 @@ def _parse_args() -> argparse.Namespace: parser.add_argument( "--judge", default=None, - help="Judge model identifier (default: openrouter/anthropic/claude-opus-4.5)", + help=( + "Judge model or backend. Default (unset): OpenClaw agent session with " + "openrouter/anthropic/claude-opus-4.5. Set to a model ID to call its API " + "directly (e.g. openai/gpt-4o, anthropic/claude-sonnet-4-5-20250514, claude)" + ), ) parser.add_argument( "--verbose", @@ -582,6 +586,47 @@ def main(): tasks_by_id = {task.task_id: task for task in tasks_to_run} runs_per_task = max(1, args.runs) + + # Incremental result writer: builds partial result JSON from completed + # tasks so external tools can poll progress while the benchmark runs. 
def _write_incremental_results():
    """Write a best-effort snapshot of the run so far to ``incremental_path``.

    Closure over the enclosing ``main()`` scope: reads ``results``,
    ``grades_by_task_id``, ``tasks_by_id``, ``tasks_to_run``, ``args``,
    ``run_id``, ``runs_per_task``, ``skill_root`` and ``incremental_path``.

    One entry is built per item in ``results``; a task that has no grade yet
    gets ``{}`` as its grading. The payload carries ``"in_progress": True``
    together with completed/total task counts.

    The JSON is written to a sibling ``.tmp`` file and then renamed over
    ``incremental_path``, so a tool polling the file sees either the previous
    snapshot or the new one rather than a half-written document (the rename
    is atomic on POSIX). I/O failures are logged at debug level and otherwise
    ignored: progress reporting must never abort the benchmark.
    """
    import logging  # local import: keeps this block independent of file-level imports

    task_entries = [
        {
            "task_id": r["task_id"],
            "status": r["status"],
            "timed_out": r["timed_out"],
            "execution_time": r["execution_time"],
            "transcript_length": len(r["transcript"]),
            "usage": r.get("usage", {}),
            "workspace": r["workspace"],
            # Not every executed task has been graded when this runs.
            "grading": grades_by_task_id.get(r["task_id"], {}),
            "frontmatter": tasks_by_id[r["task_id"]].frontmatter,
        }
        for r in results
    ]
    efficiency = _compute_efficiency_summary(task_entries, grades_by_task_id)
    partial = {
        "model": args.model,
        "benchmark_version": _get_git_version(skill_root),
        "run_id": run_id,
        "timestamp": time.time(),
        "suite": args.suite,
        "runs_per_task": runs_per_task,
        "tasks": task_entries,
        "efficiency": efficiency,
        "in_progress": True,
        "completed_tasks": len(grades_by_task_id),
        "total_tasks": len(tasks_to_run),
    }

    tmp_path = incremental_path.with_name(incremental_path.name + ".tmp")
    try:
        tmp_path.write_text(json.dumps(partial, indent=2), encoding="utf-8")
        # Rename into place so readers never observe a truncated file.
        tmp_path.replace(incremental_path)
    except OSError as exc:
        logging.getLogger(__name__).debug(
            "Skipping incremental results write to %s: %s", incremental_path, exc
        )
def _build_and_write_results():
    """Assemble the final aggregate for all collected results and save it.

    Closure over the enclosing ``main()`` scope. Builds one entry per item in
    ``results`` (every task must have a grade in ``grades_by_task_id``),
    computes the efficiency summary, writes the aggregate as indented JSON to
    ``output_path`` and returns ``(task_entries, efficiency)``.
    """
    entries = []
    for item in results:
        entries.append(
            {
                "task_id": item["task_id"],
                "status": item["status"],
                "timed_out": item["timed_out"],
                "execution_time": item["execution_time"],
                "transcript_length": len(item["transcript"]),
                "usage": item.get("usage", {}),
                "workspace": item["workspace"],
                "grading": grades_by_task_id[item["task_id"]],
                "frontmatter": tasks_by_id[item["task_id"]].frontmatter,
            }
        )

    summary = _compute_efficiency_summary(entries, grades_by_task_id)

    # Keyword order is preserved, so the JSON key order matches the original.
    payload = dict(
        model=args.model,
        benchmark_version=_get_git_version(skill_root),
        run_id=run_id,
        timestamp=time.time(),
        suite=args.suite,
        runs_per_task=runs_per_task,
        tasks=entries,
        efficiency=summary,
    )
    serialized = json.dumps(payload, indent=2)
    output_path.write_text(serialized, encoding="utf-8")
    return entries, summary
output_path.write_text(json.dumps(aggregate, indent=2), encoding="utf-8") + task_entries, efficiency = _build_and_write_results() # Calculate and log final score summary total_score = sum(grades_by_task_id[tid]["mean"] for tid in grades_by_task_id) diff --git a/scripts/lib_agent.py b/scripts/lib_agent.py index 7f261bf..0b2e4a9 100644 --- a/scripts/lib_agent.py +++ b/scripts/lib_agent.py @@ -366,14 +366,12 @@ def prepare_task_workspace(skill_dir: Path, run_id: str, task: Task, agent_id: s _BOOTSTRAP_FILES = ["SOUL.md", "BOOTSTRAP.md", "USER.md", "IDENTITY.md", "HEARTBEAT.md", "TOOLS.md"] - def _remove_readonly(func, path, _): def _remove_readonly(func, path, _): try: os.chmod(path, stat.S_IWRITE) func(path) except OSError: pass - func(path) saved_bootstrap: dict[str, bytes] = {} if workspace.exists(): @@ -528,7 +526,9 @@ def _find_recent_session_path(agent_dir: Path, started_at: float) -> Path | None return max(pool, key=lambda path: path.stat().st_mtime) -def _load_transcript(agent_id: str, session_id: str, started_at: float) -> List[Dict[str, Any]]: +def _load_transcript( + agent_id: str, session_id: str, started_at: float +) -> tuple[List[Dict[str, Any]], Optional[Path]]: agent_dir = _get_agent_store_dir(agent_id) transcript_path = None @@ -623,7 +623,7 @@ def _load_transcript(agent_id: str, session_id: str, started_at: float) -> List[ "Transcript not found — sessions dir does not exist: %s", sessions_dir, ) - return [] + return [], None transcript: List[Dict[str, Any]] = [] for line in transcript_path.read_text(encoding="utf-8").splitlines(): @@ -634,7 +634,7 @@ def _load_transcript(agent_id: str, session_id: str, started_at: float) -> List[ except json.JSONDecodeError as exc: logger.warning("Failed to parse transcript line: %s", exc) transcript.append({"raw": line, "parse_error": str(exc)}) - return transcript + return transcript, transcript_path def _extract_usage_from_transcript(transcript: List[Dict[str, Any]]) -> Dict[str, Any]: @@ -676,6 +676,7 @@ def 
_JUDGE_SYSTEM_MSG = (
    "You are a strict grading function. "
    "Respond with ONLY a JSON object, no prose, no markdown fences, no extra text."
)


def call_judge_api(
    *,
    prompt: str,
    model: str,
    timeout_seconds: float = 120.0,
) -> Dict[str, Any]:
    """Call a judge model directly, bypassing OpenClaw.

    Dispatches on the model identifier:
      - ``claude`` or ``claude:<name>`` -> headless Claude CLI (``claude -p``)
      - ``anthropic/*``                 -> Anthropic Messages API
      - ``openai/*``                    -> OpenAI chat completions API
      - anything else                   -> OpenRouter chat completions API
        (an ``openrouter/`` prefix is stripped; other IDs are sent as-is)

    Returns a dict with ``status`` (``"success"``, ``"error"`` or
    ``"timeout"``) and ``text`` (always a ``str``); failures also carry
    ``error``. Transport and decoding problems are reported through that
    dict rather than raised.
    """
    if model == "claude" or model.startswith("claude:"):
        return _judge_via_claude_cli(prompt, model, timeout_seconds)
    if model.startswith("anthropic/"):
        return _judge_via_anthropic(prompt, model, timeout_seconds)
    if model.startswith("openai/"):
        return _judge_via_openai(prompt, model, timeout_seconds)
    # Default: OpenRouter (handles openrouter/ prefix and bare provider/model)
    return _judge_via_openrouter(prompt, model, timeout_seconds)


def _judge_failure(status: str, message: str) -> Dict[str, Any]:
    """Build the failure result shape shared by every judge backend."""
    return {"status": status, "text": "", "error": message}


def _judge_text_from_content(content: Any) -> str:
    """Flatten a response ``content`` value into plain text.

    A string is returned unchanged. A list is treated as content blocks and
    the ``text`` of every ``{"type": "text", ...}`` dict is concatenated.
    Anything else (notably ``None``, which chat-completion APIs can return
    for ``message.content``) yields ``""`` so callers always get a ``str``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            str(block.get("text") or "")
            for block in content
            if isinstance(block, dict) and block.get("type") == "text"
        )
    return ""


def _post_judge_request(
    endpoint: str,
    body: Dict[str, Any],
    headers: Dict[str, str],
    timeout_seconds: float,
    log_label: str,
) -> tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """POST ``body`` as JSON and decode a JSON-object reply.

    Returns ``(data, None)`` on success or ``(None, failure_result)`` when
    the request fails, times out, or the reply is not a JSON object.
    ``log_label`` prefixes the error log lines.
    """
    req = request.Request(
        endpoint,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    try:
        with request.urlopen(req, timeout=timeout_seconds) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
    except error.HTTPError as exc:
        detail = ""
        try:
            detail = exc.read().decode("utf-8", errors="replace")[:500]
        except Exception:
            # Best effort: the error body is only used to enrich the message.
            pass
        logger.error("%s API error (%s): %s", log_label, exc.code, detail)
        return None, _judge_failure("error", f"HTTP {exc.code}: {detail}")
    except error.URLError as exc:
        # urlopen wraps connect-phase timeouts in URLError; report them as timeouts.
        if isinstance(exc.reason, TimeoutError):
            return None, _judge_failure("timeout", "Request timed out")
        logger.error("%s network error: %s", log_label, exc)
        return None, _judge_failure("error", str(exc))
    except TimeoutError:
        # Timeouts while reading the response body are raised unwrapped.
        return None, _judge_failure("timeout", "Request timed out")
    except OSError as exc:
        # e.g. connection reset while reading the response body.
        logger.error("%s network error: %s", log_label, exc)
        return None, _judge_failure("error", str(exc))

    try:
        data = json.loads(raw)
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        return None, _judge_failure("error", f"Invalid JSON in response: {exc}")
    if not isinstance(data, dict):
        return None, _judge_failure("error", "Response is not a JSON object")
    return data, None


def _chat_completion_result(data: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a decoded chat-completions response into a judge result."""
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        return _judge_failure("error", "No choices in response")
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    return {"status": "success", "text": _judge_text_from_content(content)}


def _judge_via_openai_compat(
    prompt: str,
    api_model: str,
    endpoint: str,
    api_key: str,
    timeout_seconds: float,
    extra_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Shared implementation for OpenAI-compatible chat completions APIs.

    Sends the judge system message plus ``prompt`` at temperature 0 with a
    2048-token cap, authenticating with a bearer token. ``extra_headers``
    are merged over the default headers.
    """
    body = {
        "model": api_model,
        "messages": [
            {"role": "system", "content": _JUDGE_SYSTEM_MSG},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.0,
        "max_tokens": 2048,
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    if extra_headers:
        headers.update(extra_headers)

    data, failure = _post_judge_request(endpoint, body, headers, timeout_seconds, "Judge")
    if failure is not None:
        return failure
    return _chat_completion_result(data)


def _judge_via_openrouter(prompt: str, model: str, timeout_seconds: float) -> Dict[str, Any]:
    """Judge through OpenRouter; requires ``OPENROUTER_API_KEY``."""
    api_key = os.environ.get("OPENROUTER_API_KEY")
    if not api_key:
        return _judge_failure("error", "OPENROUTER_API_KEY not set")
    bare_model = model.removeprefix("openrouter/")
    return _judge_via_openai_compat(
        prompt,
        bare_model,
        "https://openrouter.ai/api/v1/chat/completions",
        api_key,
        timeout_seconds,
        extra_headers={"HTTP-Referer": "https://pinchbench.com", "X-Title": "PinchBench-Judge"},
    )


def _judge_via_openai(prompt: str, model: str, timeout_seconds: float) -> Dict[str, Any]:
    """Judge through the OpenAI API; requires ``OPENAI_API_KEY``."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        return _judge_failure("error", "OPENAI_API_KEY not set")
    bare_model = model.removeprefix("openai/")
    return _judge_via_openai_compat(
        prompt,
        bare_model,
        "https://api.openai.com/v1/chat/completions",
        api_key,
        timeout_seconds,
    )


def _judge_via_anthropic(prompt: str, model: str, timeout_seconds: float) -> Dict[str, Any]:
    """Judge through the Anthropic Messages API; requires ``ANTHROPIC_API_KEY``.

    The reply's text blocks are concatenated into the result ``text``.
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        return _judge_failure("error", "ANTHROPIC_API_KEY not set")
    body = {
        "model": model.removeprefix("anthropic/"),
        "max_tokens": 2048,
        "temperature": 0.0,
        "system": _JUDGE_SYSTEM_MSG,
        "messages": [{"role": "user", "content": prompt}],
    }
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "anthropic-version": "2023-06-01",
    }
    data, failure = _post_judge_request(
        "https://api.anthropic.com/v1/messages",
        body,
        headers,
        timeout_seconds,
        "Anthropic judge",
    )
    if failure is not None:
        return failure
    return {"status": "success", "text": _judge_text_from_content(data.get("content"))}


def _judge_via_claude_cli(prompt: str, model: str, timeout_seconds: float) -> Dict[str, Any]:
    """Use headless Claude CLI (``claude -p``) as judge.

    The system message and prompt are passed on stdin. ``claude:<name>``
    forwards ``<name>`` via ``--model``; an empty name is ignored.
    """
    cmd: List[str] = ["claude", "-p"]
    _, _, cli_model = model.partition(":")
    if cli_model:
        cmd.extend(["--model", cli_model])
    try:
        result = subprocess.run(
            cmd,
            input=f"{_JUDGE_SYSTEM_MSG}\n\n{prompt}",
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            check=False,
        )
    except FileNotFoundError:
        return _judge_failure("error", "claude CLI not found")
    except subprocess.TimeoutExpired:
        return _judge_failure("timeout", "claude -p timed out")
    if result.returncode != 0:
        return _judge_failure("error", f"claude exit {result.returncode}: {result.stderr[:300]}")
    return {"status": "success", "text": result.stdout}
typing import Any, Dict, List, Optional -from lib_agent import ensure_agent_exists, run_openclaw_prompt, slugify_model +from lib_agent import call_judge_api, ensure_agent_exists, run_openclaw_prompt, slugify_model from lib_tasks import Task @@ -51,6 +51,7 @@ def grade_task( judge_model: str = DEFAULT_JUDGE_MODEL, judge_agent_prefix: str = DEFAULT_JUDGE_AGENT_PREFIX, judge_timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS, + judge_backend: str = "openclaw", verbose: bool = False, ) -> GradeResult: grading_type = task.grading_type @@ -70,6 +71,7 @@ def grade_task( judge_model=judge_model, judge_agent_prefix=judge_agent_prefix, judge_timeout_seconds=judge_timeout_seconds, + judge_backend=judge_backend, skill_dir=skill_dir, verbose=verbose, ) @@ -84,6 +86,7 @@ def grade_task( judge_model=judge_model, judge_agent_prefix=judge_agent_prefix, judge_timeout_seconds=judge_timeout_seconds, + judge_backend=judge_backend, skill_dir=skill_dir, verbose=verbose, ) @@ -144,7 +147,8 @@ def _grade_llm_judge( judge_model: str, judge_agent_prefix: str, judge_timeout_seconds: float, - skill_dir: Path, + judge_backend: str = "openclaw", + skill_dir: Optional[Path] = None, verbose: bool = False, ) -> GradeResult: transcript = execution_result.get("transcript", []) @@ -174,24 +178,44 @@ def _grade_llm_judge( rubric = task.llm_judge_rubric or _format_grading_criteria(task) prompt = _build_judge_prompt(task, transcript_summary, rubric, workspace_content) - agent_id = _ensure_judge_agent(judge_agent_prefix, judge_model, skill_dir) - judge_workspace = Path(f"/tmp/pinchbench/judge/{task.task_id}") - judge_result = run_openclaw_prompt( - agent_id=agent_id, - prompt=prompt, - workspace=judge_workspace, - timeout_seconds=judge_timeout_seconds, - ) + if judge_backend == "api": + # Direct API call — bypasses OpenClaw personality injection + judge_result = call_judge_api( + prompt=prompt, + model=judge_model, + timeout_seconds=judge_timeout_seconds, + ) - if verbose: - logger.info(" [VERBOSE] 
def _parse_judge_text(raw_text: str) -> Dict[str, Any]:
    """Parse a judge response given as raw text (direct API call, no transcript).

    Strategies are tried in order and the first JSON object found wins:
      1. the whole text is a JSON object;
      2. the first fenced code block (optionally tagged ``json``) is one;
      3. top-level balanced ``{...}`` spans in the text, preferring the last
         one that contains a ``"scores"`` key, otherwise the last one that
         parses to an object;
      4. a prose score such as ``total score: 0.75``, returned as
         ``{"scores": {}, "total": <float>, "notes": ...}``.

    Returns ``{}`` when nothing can be extracted, including for ``None``,
    non-string, or blank input.
    """
    if not isinstance(raw_text, str):
        return {}
    raw_text = raw_text.strip()
    if not raw_text:
        return {}

    def _load_object(candidate: str) -> Optional[Dict[str, Any]]:
        """Return ``candidate`` decoded as a JSON object, or ``None``."""
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    # Ideal case: the system prompt was obeyed and the reply is pure JSON.
    whole = _load_object(raw_text)
    if whole is not None:
        return whole

    # Try extracting from a fenced code block.
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", raw_text, re.DOTALL)
    if fenced:
        from_fence = _load_object(fenced.group(1))
        if from_fence is not None:
            return from_fence

    # Collect top-level balanced-brace spans that decode to JSON objects.
    objects: List[Dict[str, Any]] = []
    depth = 0
    start = 0
    for index, char in enumerate(raw_text):
        if char == "{":
            if depth == 0:
                start = index
            depth += 1
        elif char == "}" and depth > 0:
            # An unmatched "}" in prose is ignored (depth stays at 0);
            # letting depth go negative would hide the next real object.
            depth -= 1
            if depth == 0:
                decoded = _load_object(raw_text[start:index + 1])
                if decoded is not None:
                    objects.append(decoded)

    for decoded in reversed(objects):
        if "scores" in decoded:
            return decoded
    if objects:
        return objects[-1]

    # Fallback: regex for total score
    score_pattern = re.search(
        r"(?:total|overall|final)\s*(?:score)?[:\s]*(0\.\d+|1\.0+)",
        raw_text,
        re.IGNORECASE,
    )
    if score_pattern:
        try:
            total = float(score_pattern.group(1))
            if 0.0 <= total <= 1.0:
                logger.warning("Fell back to regex score extraction (total=%.2f)", total)
                return {"scores": {}, "total": total, "notes": "Score extracted from prose"}
        except ValueError:
            pass

    logger.warning("Failed to parse judge text response")
    return {}