Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 96 additions & 22 deletions scripts/lib_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import json
import logging
import os
import platform
import stat
import subprocess
import time
from pathlib import Path
Expand All @@ -18,14 +20,17 @@

logger = logging.getLogger(__name__)

USE_SHELL = platform.system() == "Windows"


class ModelValidationError(Exception):
    """Signals that a requested model ID is invalid or cannot be accessed."""


MAX_OPENCLAW_MESSAGE_CHARS = int(os.environ.get("PINCHBENCH_MAX_MSG_CHARS", "4000"))
MAX_OPENCLAW_MESSAGE_CHARS = int(os.environ.get("PINCHBENCH_MAX_MSG_CHARS", "8000"))
JUDGE_MAX_MSG_CHARS = int(os.environ.get("PINCHBENCH_JUDGE_MAX_MSG_CHARS", "3000"))


def _coerce_subprocess_output(value: Any) -> str:
Expand Down Expand Up @@ -163,6 +168,7 @@ def _get_agent_workspace(agent_id: str) -> Path | None:
capture_output=True,
text=True,
check=False,
shell=USE_SHELL,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: shell=USE_SHELL with a list argument is a security concern on Windows. When shell=True and a list is passed, Python joins the list into a single string for cmd.exe. Any argument containing shell metacharacters (&, |, >, etc.) could lead to command injection.

For the "openclaw", "agents", "list" invocation here this is low risk since args are hardcoded, but this same pattern is applied in execute_openclaw_task (line 735) and run_openclaw_prompt (line 930) where session_prompt/send_chunk contain task-controlled or transcript-derived content. On Windows, a malicious or unexpected prompt containing & del /s /q C:\ would be interpreted by cmd.exe.

Consider using subprocess.list2cmdline() to properly quote the argument list when shell=True, or better yet, avoid shell=True and resolve the executable path via shutil.which("openclaw") instead — this is typically why shell=True is needed on Windows (to locate executables on PATH).

)
if list_result.returncode != 0:
return None
Expand Down Expand Up @@ -206,6 +212,7 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo
capture_output=True,
text=True,
check=False,
shell=USE_SHELL,
)
except FileNotFoundError:
logger.error("openclaw CLI not found while listing agents")
Expand Down Expand Up @@ -248,6 +255,7 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo
capture_output=True,
text=True,
check=False,
shell=USE_SHELL,
)

logger.info("Creating OpenClaw agent %s", agent_id)
Expand All @@ -267,6 +275,7 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo
capture_output=True,
text=True,
check=False,
shell=USE_SHELL,
)
except FileNotFoundError:
logger.error("openclaw CLI not found while creating agent")
Expand All @@ -276,6 +285,44 @@ def ensure_agent_exists(agent_id: str, model_id: str, workspace_dir: Path) -> bo
logger.warning(
"Agent creation returned %s: %s", create_result.returncode, create_result.stderr
)

# Copy main agent's models.json to bench agent so custom providers (e.g. lenovo)
# are available. OpenClaw only copies a subset of providers when creating a new agent.
main_models = Path.home() / ".openclaw" / "agents" / "main" / "agent" / "models.json"
if main_models.exists():
bench_agent_dir = _get_agent_store_dir(agent_id) / "agent"
bench_agent_dir.mkdir(parents=True, exist_ok=True)
bench_models = bench_agent_dir / "models.json"
import shutil as _shutil
_shutil.copy2(main_models, bench_models)
# Set defaultProvider/defaultModel so OpenClaw uses the requested model
if "/" in model_id:
provider_name, model_name = model_id.split("/", 1)
try:
import json as _json
raw = bench_models.read_text("utf-8-sig")
data = _json.loads(raw)
data["defaultProvider"] = provider_name
data["defaultModel"] = model_name
bench_models.write_text(_json.dumps(data, indent=2, ensure_ascii=False), "utf-8")
logger.info(
"Set bench agent default model to %s / %s", provider_name, model_name
)
except Exception as exc:
logger.warning("Failed to set default model in bench models.json: %s", exc)
logger.info("Copied main agent models.json to bench agent %s", agent_id)

# Delete sessions.json so OpenClaw picks up the new defaultProvider/defaultModel
# instead of reusing a cached session entry that still points to an old model.
bench_sessions_dir = _get_agent_store_dir(agent_id) / "sessions"
sessions_store = bench_sessions_dir / "sessions.json"
if sessions_store.exists():
try:
sessions_store.unlink()
logger.info("Deleted stale sessions.json for bench agent %s", agent_id)
except OSError as exc:
logger.warning("Failed to delete sessions.json: %s", exc)

return True


Expand Down Expand Up @@ -317,12 +364,29 @@ def prepare_task_workspace(skill_dir: Path, run_id: str, task: Task, agent_id: s
logger.warning("Could not find agent workspace, using fallback")
workspace = Path(f"/tmp/pinchbench/{run_id}/{task.task_id}")

# Clear workspace before each task to prevent stale files from prior tasks
# from contaminating the agent's context.
_BOOTSTRAP_FILES = ["SOUL.md", "BOOTSTRAP.md", "USER.md", "IDENTITY.md", "HEARTBEAT.md", "TOOLS.md"]

def _remove_readonly(func, path, _):
def _remove_readonly(func, path, _):
try:
os.chmod(path, stat.S_IWRITE)
func(path)
except OSError:
pass
func(path)

saved_bootstrap: dict[str, bytes] = {}
if workspace.exists():
shutil.rmtree(workspace)
for fname in _BOOTSTRAP_FILES:
fpath = workspace / fname
if fpath.exists():
saved_bootstrap[fname] = fpath.read_bytes()
shutil.rmtree(workspace, onerror=_remove_readonly)
workspace.mkdir(parents=True, exist_ok=True)

for fname, content in saved_bootstrap.items():
(workspace / fname).write_bytes(content)

for file_spec in task.workspace_files:
if "content" in file_spec:
dest = workspace / file_spec["path"]
Expand All @@ -339,17 +403,6 @@ def prepare_task_workspace(skill_dir: Path, run_id: str, task: Task, agent_id: s
logger.error("Workspace file not found: %s", source)
raise

# Remove bootstrap files that would trigger the onboarding flow
# These interfere with benchmark tasks
for bootstrap_file in ["BOOTSTRAP.md", "SOUL.md", "USER.md", "IDENTITY.md"]:
bootstrap_path = workspace / bootstrap_file
if bootstrap_path.exists():
try:
bootstrap_path.unlink()
logger.info("Removed bootstrap file: %s", bootstrap_file)
except OSError as exc:
logger.warning("Failed to remove %s: %s", bootstrap_file, exc)

# Copy skills from main workspace to benchmark workspace
# This enables benchmark agents to use installed skills like nano-pdf
main_skills_dir = Path.home() / ".openclaw" / "workspace" / "skills"
Expand All @@ -363,7 +416,7 @@ def prepare_task_workspace(skill_dir: Path, run_id: str, task: Task, agent_id: s
import shutil

if dest_skill_dir.exists():
shutil.rmtree(dest_skill_dir)
shutil.rmtree(dest_skill_dir, onerror=_remove_readonly)
shutil.copytree(skill_dir_src, dest_skill_dir)
logger.info("Copied skill to benchmark workspace: %s", skill_dir_src.name)

Expand Down Expand Up @@ -684,6 +737,7 @@ def execute_openclaw_task(
cwd=str(workspace),
timeout=remaining,
check=False,
shell=USE_SHELL,
)
stdout += result.stdout
stderr += result.stderr
Expand Down Expand Up @@ -717,6 +771,7 @@ def execute_openclaw_task(
cwd=str(workspace),
timeout=timeout_seconds,
check=False,
shell=USE_SHELL,
)
stdout = result.stdout
stderr = result.stderr
Expand Down Expand Up @@ -801,10 +856,19 @@ def run_openclaw_prompt(
timeout_seconds: float,
) -> Dict[str, Any]:
"""Run a single OpenClaw prompt for helper agents like the judge."""
# Clean up previous session transcripts so we can reliably find this
# prompt's transcript (OpenClaw uses its own UUID-based naming).
cleanup_agent_sessions(agent_id)

agent_workspace = _get_agent_workspace(agent_id)
if agent_workspace and agent_workspace.exists():
for bootstrap_file in ["BOOTSTRAP.md", "SOUL.md", "USER.md", "IDENTITY.md", "HEARTBEAT.md"]:
bp = agent_workspace / bootstrap_file
if bp.exists():
try:
bp.unlink()
logger.debug("Removed bootstrap file from judge workspace: %s", bootstrap_file)
except OSError as exc:
logger.warning("Failed to remove bootstrap file %s: %s", bootstrap_file, exc)

start_time = time.time()
workspace.mkdir(parents=True, exist_ok=True)
session_id = f"judge_{int(time.time() * 1000)}"
Expand All @@ -814,8 +878,8 @@ def run_openclaw_prompt(
timed_out = False

chunks = [
prompt[i : i + MAX_OPENCLAW_MESSAGE_CHARS]
for i in range(0, max(1, len(prompt)), MAX_OPENCLAW_MESSAGE_CHARS)
prompt[i : i + JUDGE_MAX_MSG_CHARS]
for i in range(0, max(1, len(prompt)), JUDGE_MAX_MSG_CHARS)
]
if len(chunks) > 1:
total_chunks = len(chunks)
Expand Down Expand Up @@ -843,22 +907,32 @@ def run_openclaw_prompt(
timed_out = True
break
try:
openclaw_path = os.environ.get("OPENCLAW_PATH", "openclaw")
# On Windows, cmd.exe splits command-line arguments at literal newlines,
# causing the message to be truncated after the first line.
# Escape newlines to literal \n sequences so the full prompt is received.
send_chunk = (
chunk.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: This newline escaping is fragile and can corrupt prompt content. If the original chunk already contains literal \n text sequences (common in JSON strings within the judge prompt), those become indistinguishable from escaped actual newlines when the receiving side interprets the message.

For example, a JSON snippet {"key": "line1\nline2"} in the prompt would be double-escaped to {"key": "line1\\nline2"} — or if it contained real newlines alongside escaped ones, the escaping would be inconsistent.

Consider using base64 encoding, a temp file passed via --file argument, or stdin piping (subprocess.run(..., input=chunk)) instead, which avoids the need for shell escaping entirely.

if USE_SHELL
else chunk
)
result = subprocess.run(
[
"openclaw",
openclaw_path,
"agent",
"--agent",
agent_id,
"--session-id",
session_id,
"--message",
chunk,
send_chunk,
],
capture_output=True,
text=True,
cwd=str(workspace),
timeout=remaining,
check=False,
shell=USE_SHELL,
)
stdout += result.stdout
stderr += result.stderr
Expand Down
86 changes: 82 additions & 4 deletions scripts/lib_grading.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,32 @@ def _grade_llm_judge(
skill_dir: Path,
verbose: bool = False,
) -> GradeResult:
transcript_summary = _summarize_transcript(execution_result.get("transcript", []))
transcript = execution_result.get("transcript", [])
execution_status = execution_result.get("status", "unknown")

if not transcript and execution_status != "success":
if verbose:
logger.info(
" [VERBOSE] Skipping LLM judge: status=%s, transcript empty",
execution_status,
)
return GradeResult(
task_id=task.task_id,
score=0.0,
max_score=1.0,
grading_type="llm_judge",
breakdown={},
notes=f"Skipped: task execution failed ({execution_status}), no transcript to evaluate",
)

transcript_summary = _summarize_transcript(transcript)
if verbose:
logger.info(" [VERBOSE] Transcript summary for judge (first 1000 chars):\n%s", transcript_summary[:1000])
workspace_content = _read_workspace_files(execution_result.get("workspace", ""))
if verbose and workspace_content:
logger.info(" [VERBOSE] Workspace files passed to judge (first 500 chars):\n%s", workspace_content[:500])
rubric = task.llm_judge_rubric or _format_grading_criteria(task)
prompt = _build_judge_prompt(task, transcript_summary, rubric)
prompt = _build_judge_prompt(task, transcript_summary, rubric, workspace_content)

agent_id = _ensure_judge_agent(judge_agent_prefix, judge_model, skill_dir)
judge_workspace = Path(f"/tmp/pinchbench/judge/{task.task_id}")
Expand All @@ -162,6 +183,14 @@ def _grade_llm_judge(
timeout_seconds=judge_timeout_seconds,
)

if verbose:
logger.info(" [VERBOSE] Judge execution status: %s", judge_result.get("status"))
logger.info(" [VERBOSE] Judge exit code: %s", judge_result.get("exit_code"))
logger.info(" [VERBOSE] Judge stderr: %s", judge_result.get("stderr", "")[:500])

if judge_result.get("status") != "success":
logger.warning("Judge execution failed: %s", judge_result.get("status"))

raw_parsed = _parse_judge_response(judge_result.get("transcript", []))
if verbose:
logger.info(" [VERBOSE] Judge raw response parsed: %s", raw_parsed)
Expand Down Expand Up @@ -252,9 +281,20 @@ def _summarize_transcript(transcript: List[Dict[str, Any]]) -> str:
if role == "assistant":
for item in msg.get("content", []):
if item.get("type") == "toolCall":
args = item.get("arguments", {})
truncated_args: Dict[str, Any] = {}
for k, v in args.items():
if isinstance(v, str) and len(v) > 200:
truncated_args[k] = v[:200] + "...[truncated]"
else:
truncated_args[k] = v
summary_parts.append(
f"Tool: {item.get('name')}({json.dumps(item.get('arguments', {}))})"
f"Tool: {item.get('name')}({json.dumps(truncated_args)})"
)
elif item.get("type") == "text":
text = item.get("text", "").strip()
if text:
summary_parts.append(f"Assistant: {text[:2000]}")
elif role == "toolResult":
content = msg.get("content", [])
if content:
Expand All @@ -267,7 +307,43 @@ def _summarize_transcript(transcript: List[Dict[str, Any]]) -> str:
return "\n".join(summary_parts)


def _build_judge_prompt(task: Task, transcript_summary: str, rubric: str) -> str:
def _read_workspace_files(workspace_path: str) -> str:
"""Read user-created text files from workspace to provide grading context."""
if not workspace_path:
return ""
workspace = Path(workspace_path)
if not workspace.exists():
return ""
skip_names = {
"BOOTSTRAP.md", "SOUL.md", "USER.md", "IDENTITY.md",
"HEARTBEAT.md", "TOOLS.md", "AGENTS.md",
}
skip_dirs = {".git", ".openclaw", "__pycache__", "node_modules", "skills"}
file_contents: List[str] = []
for f in sorted(workspace.rglob("*")):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: No bound on total file count or aggregate size. A workspace containing hundreds of files (e.g., after npm install leaks through the skip_dirs filter, or a code generation task creates many files) could produce an extremely large string that blows up the judge prompt context window and wastes API tokens.

Consider adding a total size cap, e.g.:

MAX_WORKSPACE_CONTENT_CHARS = 30_000
total_chars = 0
for f in sorted(workspace.rglob("*")):
    ...
    content = f.read_text(encoding="utf-8")
    snippet = f"### File: {rel}\n{content[:3000]}"
    total_chars += len(snippet)
    if total_chars > MAX_WORKSPACE_CONTENT_CHARS:
        file_contents.append("... (truncated, workspace too large)")
        break
    file_contents.append(snippet)

if not f.is_file():
continue
rel = f.relative_to(workspace)
parts = rel.parts
if any(part.startswith(".") or part in skip_dirs for part in parts):
continue
if f.name in skip_names:
continue
try:
content = f.read_text(encoding="utf-8")
file_contents.append(f"### File: {rel}\n{content[:3000]}")
except (OSError, UnicodeDecodeError):
pass
return "\n\n".join(file_contents)


def _build_judge_prompt(task: Task, transcript_summary: str, rubric: str, workspace_content: str = "") -> str:
workspace_section = ""
if workspace_content.strip():
workspace_section = (
"## Workspace Files Created by Agent\n"
f"{workspace_content}\n\n"
)
return (
"You are a grading function. Your ONLY job is to output a single JSON object.\n\n"
"CRITICAL RULES:\n"
Expand All @@ -284,6 +360,7 @@ def _build_judge_prompt(task: Task, transcript_summary: str, rubric: str) -> str
f"{task.expected_behavior}\n\n"
"## Agent Transcript (summarized)\n"
f"{transcript_summary}\n\n"
f"{workspace_section}"
"## Grading Rubric\n"
f"{rubric}\n\n"
"Score each criterion from 0.0 to 1.0.\n\n"
Expand Down Expand Up @@ -312,6 +389,7 @@ def _parse_judge_response(transcript: List[Dict[str, Any]]) -> Dict[str, Any]:
if item.get("type") == "text":
content_chunks.append(item.get("text", ""))
raw_text = "\n".join(content_chunks).strip()
logger.info(" [VERBOSE] Judge raw response text (first 2000 chars):\n%s", raw_text[:2000])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: This logger.info call runs unconditionally on every judge grading, unlike the surrounding verbose-gated log lines. It will dump up to 2000 chars of raw judge response text in production logs even when verbose=False. This seems unintentional — the [VERBOSE] prefix suggests it should be guarded.

Note that verbose is not in scope here (it's a parameter of _grade_llm_judge, not _parse_judge_response). Either pass verbose to this function, or move this log into the caller after the call to _parse_judge_response.

if not raw_text:
return {}

Expand Down
3 changes: 3 additions & 0 deletions scripts/lib_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def load_all_tasks(self) -> List[Task]:
for task_file in task_files:
try:
task = self.load_task(task_file)
if "_XX_" in task.task_id or task.task_id == "task_XX_name":
logger.debug("Skipping template task: %s", task.task_id)
continue
tasks.append(task)
logger.info(f"Successfully loaded task: {task.task_id}")
except Exception as e:
Expand Down
Loading
Loading