diff --git a/scripts/lib_agent.py b/scripts/lib_agent.py index 7f261bf..39d3ac7 100644 --- a/scripts/lib_agent.py +++ b/scripts/lib_agent.py @@ -8,6 +8,7 @@ import logging import os import platform +import shutil import stat import subprocess import time @@ -21,6 +22,15 @@ logger = logging.getLogger(__name__) USE_SHELL = platform.system() == "Windows" +CONTROL_MARKDOWN_FILES = [ + "BOOTSTRAP.md", + "SOUL.md", + "USER.md", + "IDENTITY.md", + "HEARTBEAT.md", + "TOOLS.md", + "AGENTS.md", +] class ModelValidationError(Exception): @@ -33,6 +43,18 @@ class ModelValidationError(Exception): JUDGE_MAX_MSG_CHARS = int(os.environ.get("PINCHBENCH_JUDGE_MAX_MSG_CHARS", "3000")) +def _resolve_openclaw_executable() -> str: + configured = os.environ.get("OPENCLAW_PATH") + if configured: + return configured + resolved = shutil.which("openclaw") + return resolved or "openclaw" + + +def _openclaw_cmd(*args: str) -> list[str]: + return [_resolve_openclaw_executable(), *args] + + def _coerce_subprocess_output(value: Any) -> str: if value is None: return "" @@ -164,11 +186,11 @@ def _get_agent_workspace(agent_id: str) -> Path | None: """Get the workspace path for an agent from OpenClaw config.""" try: list_result = subprocess.run( - ["openclaw", "agents", "list"], + _openclaw_cmd("agents", "list"), capture_output=True, text=True, check=False, - shell=USE_SHELL, + shell=False, ) if list_result.returncode != 0: return None @@ -208,11 +230,11 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo try: list_result = subprocess.run( - ["openclaw", "agents", "list"], + _openclaw_cmd("agents", "list"), capture_output=True, text=True, check=False, - shell=USE_SHELL, + shell=False, ) except FileNotFoundError: logger.error("openclaw CLI not found while listing agents") @@ -251,18 +273,17 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo workspace_dir, ) subprocess.run( - ["openclaw", "agents", "delete", delete_name, "--force"], + _openclaw_cmd("agents", "delete", delete_name, "--force"), capture_output=True, text=True, check=False, - shell=USE_SHELL, + shell=False, ) logger.info("Creating OpenClaw agent %s", agent_id) try: create_result = subprocess.run( - [ - "openclaw", + _openclaw_cmd( "agents", "add", agent_id, @@ -271,11 +292,11 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo "--workspace", str(workspace_dir), "--non-interactive", - ], + ), capture_output=True, text=True, check=False, - shell=USE_SHELL, + shell=False, ) except FileNotFoundError: logger.error("openclaw CLI not found while creating agent") @@ -294,20 +315,20 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo bench_agent_dir.mkdir(parents=True, exist_ok=True) bench_models = bench_agent_dir / "models.json" import shutil as _shutil + _shutil.copy2(main_models, bench_models) # Set defaultProvider/defaultModel so OpenClaw uses the requested model if "/" in model_id: provider_name, model_name = model_id.split("/", 1) try: import json as _json + raw = bench_models.read_text("utf-8-sig") data = _json.loads(raw) data["defaultProvider"] = provider_name data["defaultModel"] = model_name bench_models.write_text(_json.dumps(data, indent=2, ensure_ascii=False), "utf-8") - logger.info( - "Set bench agent default model to %s / %s", provider_name, model_name - ) + logger.info("Set bench agent default model to %s / %s", provider_name, model_name) except Exception as exc: logger.warning("Failed to set default model in bench models.json: %s", exc) logger.info("Copied main agent models.json to bench agent %s", agent_id) @@ -364,13 +385,11 @@ def prepare_task_workspace(skill_dir: Path, run_id: str, task: Task, agent_id: s logger.warning("Could not find agent workspace, using fallback") workspace = Path(f"/tmp/pinchbench/{run_id}/{task.task_id}") - _BOOTSTRAP_FILES = ["SOUL.md", "BOOTSTRAP.md", "USER.md", "IDENTITY.md", "HEARTBEAT.md", "TOOLS.md"] + _BOOTSTRAP_FILES = CONTROL_MARKDOWN_FILES - def _remove_readonly(func, path, _): def _remove_readonly(func, path, _): try: os.chmod(path, stat.S_IWRITE) - func(path) except OSError: pass func(path) @@ -722,8 +741,7 @@ def execute_openclaw_task( break try: result = subprocess.run( - [ - "openclaw", + _openclaw_cmd( "agent", "--agent", agent_id, @@ -731,13 +749,13 @@ def execute_openclaw_task( session_id, "--message", session_prompt, - ], + ), capture_output=True, text=True, cwd=str(workspace), timeout=remaining, check=False, - shell=USE_SHELL, + shell=False, ) stdout += result.stdout stderr += result.stderr @@ -756,8 +774,7 @@ def execute_openclaw_task( # Single-session task: send task.prompt once try: result = subprocess.run( - [ - "openclaw", + _openclaw_cmd( "agent", "--agent", agent_id, @@ -765,13 +782,13 @@ def execute_openclaw_task( session_id, "--message", task.prompt, - ], + ), capture_output=True, text=True, cwd=str(workspace), timeout=timeout_seconds, check=False, - shell=USE_SHELL, + shell=False, ) stdout = result.stdout stderr = result.stderr @@ -860,7 +877,7 @@ def run_openclaw_prompt( agent_workspace = _get_agent_workspace(agent_id) if agent_workspace and agent_workspace.exists(): - for bootstrap_file in ["BOOTSTRAP.md", "SOUL.md", "USER.md", "IDENTITY.md", "HEARTBEAT.md"]: + for bootstrap_file in CONTROL_MARKDOWN_FILES: bp = agent_workspace / bootstrap_file if bp.exists(): try: @@ -907,32 +924,22 @@ def run_openclaw_prompt( timed_out = True break try: - openclaw_path = os.environ.get("OPENCLAW_PATH", "openclaw") - # On Windows, cmd.exe splits command-line arguments at literal newlines, - # causing the message to be truncated after the first line. - # Escape newlines to literal \n sequences so the full prompt is received. - send_chunk = ( - chunk.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n") - if USE_SHELL - else chunk - ) result = subprocess.run( - [ - openclaw_path, + _openclaw_cmd( "agent", "--agent", agent_id, "--session-id", session_id, "--message", - send_chunk, - ], + chunk, + ), capture_output=True, text=True, cwd=str(workspace), timeout=remaining, check=False, - shell=USE_SHELL, + shell=False, ) stdout += result.stdout stderr += result.stderr diff --git a/scripts/lib_grading.py b/scripts/lib_grading.py index bf25b2d..71b4b8d 100644 --- a/scripts/lib_grading.py +++ b/scripts/lib_grading.py @@ -6,6 +6,7 @@ import json import logging +import os import re from dataclasses import dataclass from pathlib import Path @@ -21,6 +22,8 @@ DEFAULT_JUDGE_MODEL = "openrouter/anthropic/claude-opus-4.5" DEFAULT_JUDGE_AGENT_PREFIX = "bench-judge" DEFAULT_JUDGE_TIMEOUT_SECONDS = 180 +MAX_WORKSPACE_CONTENT_CHARS = int(os.environ.get("PINCHBENCH_MAX_WORKSPACE_CONTENT_CHARS", "30000")) +MAX_WORKSPACE_FILES = int(os.environ.get("PINCHBENCH_MAX_WORKSPACE_FILES", "200")) @dataclass @@ -57,7 +60,7 @@ def grade_task( if verbose: logger.info(" [VERBOSE] Grading task %s with type: %s", task.task_id, grading_type) logger.info(" [VERBOSE] Execution status: %s", execution_result.get("status", "unknown")) - + if grading_type == "automated": result = _grade_automated(task, execution_result, verbose=verbose) if verbose: @@ -91,7 +94,9 @@ def grade_task( raise ValueError(f"Unknown grading type: {grading_type}") -def _grade_automated(task: Task, execution_result: Dict[str, Any], verbose: bool = False) -> GradeResult: +def _grade_automated( + task: Task, execution_result: Dict[str, Any], verbose: bool = False +) -> GradeResult: grading_code = _extract_grading_code(task) if not grading_code: return GradeResult( @@ -122,7 +127,7 @@ def _grade_automated(task: Task, execution_result: Dict[str, Any], verbose: bool ) if not isinstance(scores, dict): scores = {} - + if verbose: logger.info(" [VERBOSE] Automated grading scores: %s", scores) @@ -167,10 +172,16 @@ def _grade_llm_judge( transcript_summary = _summarize_transcript(transcript) if verbose: - logger.info(" [VERBOSE] Transcript summary for judge (first 1000 chars):\n%s", transcript_summary[:1000]) + logger.info( + " [VERBOSE] Transcript summary for judge (first 1000 chars):\n%s", + transcript_summary[:1000], + ) workspace_content = _read_workspace_files(execution_result.get("workspace", "")) if verbose and workspace_content: - logger.info(" [VERBOSE] Workspace files passed to judge (first 500 chars):\n%s", workspace_content[:500]) + logger.info( + " [VERBOSE] Workspace files passed to judge (first 500 chars):\n%s", + workspace_content[:500], + ) rubric = task.llm_judge_rubric or _format_grading_criteria(task) prompt = _build_judge_prompt(task, transcript_summary, rubric, workspace_content) @@ -191,15 +202,15 @@ def _grade_llm_judge( if judge_result.get("status") != "success": logger.warning("Judge execution failed: %s", judge_result.get("status")) - raw_parsed = _parse_judge_response(judge_result.get("transcript", [])) + raw_parsed = _parse_judge_response(judge_result.get("transcript", []), verbose=verbose) if verbose: logger.info(" [VERBOSE] Judge raw response parsed: %s", raw_parsed) - + # Normalize the response to handle various formats (criteria_scores, score, justification, etc.) parsed = _normalize_judge_response(raw_parsed) if verbose: logger.info(" [VERBOSE] Normalized judge response: %s", parsed) - + breakdown = parsed.get("scores", {}) total = parsed.get("total") notes = parsed.get("notes", "") @@ -288,9 +299,7 @@ def _summarize_transcript(transcript: List[Dict[str, Any]]) -> str: truncated_args[k] = v[:200] + "...[truncated]" else: truncated_args[k] = v - summary_parts.append( - f"Tool: {item.get('name')}({json.dumps(truncated_args)})" - ) + summary_parts.append(f"Tool: {item.get('name')}({json.dumps(truncated_args)})") elif item.get("type") == "text": text = item.get("text", "").strip() if text: @@ -315,11 +324,19 @@ def _read_workspace_files(workspace_path: str) -> str: if not workspace.exists(): return "" skip_names = { - "BOOTSTRAP.md", "SOUL.md", "USER.md", "IDENTITY.md", - "HEARTBEAT.md", "TOOLS.md", "AGENTS.md", + "BOOTSTRAP.md", + "SOUL.md", + "USER.md", + "IDENTITY.md", + "HEARTBEAT.md", + "TOOLS.md", + "AGENTS.md", } skip_dirs = {".git", ".openclaw", "__pycache__", "node_modules", "skills"} file_contents: List[str] = [] + total_chars = 0 + files_seen = 0 + limit_reached = False for f in sorted(workspace.rglob("*")): if not f.is_file(): continue @@ -331,19 +348,36 @@ def _read_workspace_files(workspace_path: str) -> str: continue try: content = f.read_text(encoding="utf-8") - file_contents.append(f"### File: {rel}\n{content[:3000]}") + snippet = f"### File: {rel}\n{content[:3000]}" + if ( + files_seen >= MAX_WORKSPACE_FILES + or total_chars + len(snippet) > MAX_WORKSPACE_CONTENT_CHARS + ): + limit_reached = True + break + file_contents.append(snippet) + files_seen += 1 + total_chars += len(snippet) except (OSError, UnicodeDecodeError): pass + if limit_reached: + logger.warning( + "Workspace grading context truncated at %d files / %d chars", + files_seen, + total_chars, + ) + file_contents.append( + "... [workspace content truncated due to PINCHBENCH_MAX_WORKSPACE_FILES/PINCHBENCH_MAX_WORKSPACE_CONTENT_CHARS limits]" + ) return "\n\n".join(file_contents) -def _build_judge_prompt(task: Task, transcript_summary: str, rubric: str, workspace_content: str = "") -> str: +def _build_judge_prompt( + task: Task, transcript_summary: str, rubric: str, workspace_content: str = "" +) -> str: workspace_section = "" if workspace_content.strip(): - workspace_section = ( - "## Workspace Files Created by Agent\n" - f"{workspace_content}\n\n" - ) + workspace_section = f"## Workspace Files Created by Agent\n{workspace_content}\n\n" return ( "You are a grading function. Your ONLY job is to output a single JSON object.\n\n" "CRITICAL RULES:\n" @@ -378,7 +412,9 @@ def _ensure_judge_agent(judge_agent_prefix: str, judge_model: str, skill_dir: Pa return agent_id -def _parse_judge_response(transcript: List[Dict[str, Any]]) -> Dict[str, Any]: +def _parse_judge_response( + transcript: List[Dict[str, Any]], *, verbose: bool = False +) -> Dict[str, Any]: content_chunks: List[str] = [] for event in transcript: if event.get("type") != "message": @@ -390,7 +426,8 @@ def _parse_judge_response(transcript: List[Dict[str, Any]]) -> Dict[str, Any]: if item.get("type") == "text": content_chunks.append(item.get("text", "")) raw_text = "\n".join(content_chunks).strip() - logger.info(" [VERBOSE] Judge raw response text (first 2000 chars):\n%s", raw_text[:2000]) + if verbose: + logger.info(" [VERBOSE] Judge raw response text (first 2000 chars):\n%s", raw_text[:2000]) if not raw_text: return {} @@ -453,10 +490,12 @@ def _parse_judge_response(transcript: List[Dict[str, Any]]) -> Dict[str, Any]: try: total = float(score_pattern.group(1)) if 0.0 <= total <= 1.0: - logger.warning( - "Fell back to regex score extraction from prose (total=%.2f)", total - ) - return {"scores": {}, "total": total, "notes": "Score extracted from prose (JSON parse failed)"} + logger.warning("Fell back to regex score extraction from prose (total=%.2f)", total) + return { + "scores": {}, + "total": total, + "notes": "Score extracted from prose (JSON parse failed)", + } except ValueError: pass @@ -467,14 +506,14 @@ def _parse_judge_response(transcript: List[Dict[str, Any]]) -> Dict[str, Any]: def _normalize_judge_response(parsed: Dict[str, Any]) -> Dict[str, Any]: """ Normalize judge response to expected format with 'scores', 'total', and 'notes'. - + Handles various response formats: - {"scores": {...}, "total": 0.9, "notes": "..."} (expected) - {"criteria_scores": {...}, ...} (Claude sometimes uses this) - {"score": 0.9, "justification": "..."} (simplified format) """ result: Dict[str, Any] = {"scores": {}, "total": None, "notes": ""} - + # Extract scores from various keys if "scores" in parsed: scores_data = parsed["scores"] @@ -482,7 +521,11 @@ def _normalize_judge_response(parsed: Dict[str, Any]) -> Dict[str, Any]: # Handle nested structure: {"criterion": {"score": 0.9, "weight": 0.3}} for key, value in scores_data.items(): if isinstance(value, dict) and "score" in value: - result["scores"][key] = float(value["score"]) if isinstance(value["score"], (int, float, str)) else value["score"] + result["scores"][key] = ( + float(value["score"]) + if isinstance(value["score"], (int, float, str)) + else value["score"] + ) elif isinstance(value, (int, float)): result["scores"][key] = value elif "criteria_scores" in parsed: @@ -494,10 +537,12 @@ def _normalize_judge_response(parsed: Dict[str, Any]) -> Dict[str, Any]: result["scores"][key] = value["score"] elif isinstance(value, (int, float)): result["scores"][key] = value - + # Extract total score if "total" in parsed and parsed["total"] is not None: - result["total"] = float(parsed["total"]) if isinstance(parsed["total"], (int, float)) else None + result["total"] = ( + float(parsed["total"]) if isinstance(parsed["total"], (int, float)) else None + ) elif "score" in parsed and isinstance(parsed["score"], (int, float)): result["total"] = float(parsed["score"]) elif "overall_score" in parsed and isinstance(parsed["overall_score"], (int, float)): @@ -518,7 +563,7 @@ def _normalize_judge_response(parsed: Dict[str, Any]) -> Dict[str, Any]: and all(0.0 <= float(v) <= 1.0 for v in values) ): result["total"] = sum(values) / len(values) - + # Extract notes/justification if "notes" in parsed: result["notes"] = str(parsed["notes"]) @@ -526,5 +571,5 @@ def _normalize_judge_response(parsed: Dict[str, Any]) -> Dict[str, Any]: result["notes"] = str(parsed["justification"]) elif "reasoning" in parsed: result["notes"] = str(parsed["reasoning"]) - + return result