diff --git a/examples/eval/scripts/multi_tasks.yaml b/examples/eval/scripts/nemo_skills/multi_tasks.yaml similarity index 100% rename from examples/eval/scripts/multi_tasks.yaml rename to examples/eval/scripts/nemo_skills/multi_tasks.yaml diff --git a/examples/eval/scripts/run-qwen3-32B.sh b/examples/eval/scripts/nemo_skills/run-qwen3-32B.sh similarity index 96% rename from examples/eval/scripts/run-qwen3-32B.sh rename to examples/eval/scripts/nemo_skills/run-qwen3-32B.sh index eb6702deb..4d3da4f18 100644 --- a/examples/eval/scripts/run-qwen3-32B.sh +++ b/examples/eval/scripts/nemo_skills/run-qwen3-32B.sh @@ -29,11 +29,11 @@ fi echo "HAS_NVLINK: $HAS_NVLINK (detected $NVLINK_COUNT NVLink references)" SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" -REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../../.." &>/dev/null && pwd)" +REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../../../.." &>/dev/null && pwd)" source "${REPO_ROOT}/scripts/models/qwen3-32B.sh" # Store eval/delegate settings in a YAML config similar to examples/eval_multi_task. -EVAL_CONFIG_PATH=${SKILLS_EVAL_CONFIG_PATH:-"${REPO_ROOT}/examples/eval/scripts/multi_tasks.yaml"} +EVAL_CONFIG_PATH=${SKILLS_EVAL_CONFIG_PATH:-"${REPO_ROOT}/examples/eval/scripts/nemo_skills/multi_tasks.yaml"} CKPT_ARGS=( --hf-checkpoint /root/shared/Qwen3-32B diff --git a/examples/eval/scripts/run-qwen3-4B.sh b/examples/eval/scripts/nemo_skills/run-qwen3-4B.sh similarity index 96% rename from examples/eval/scripts/run-qwen3-4B.sh rename to examples/eval/scripts/nemo_skills/run-qwen3-4B.sh index 34891126d..679a7a7bc 100644 --- a/examples/eval/scripts/run-qwen3-4B.sh +++ b/examples/eval/scripts/nemo_skills/run-qwen3-4B.sh @@ -29,11 +29,11 @@ fi echo "HAS_NVLINK: $HAS_NVLINK (detected $NVLINK_COUNT NVLink references)" SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" -REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../../.." &>/dev/null && pwd)" +REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../../../.." &>/dev/null && pwd)" source "${REPO_ROOT}/scripts/models/qwen3-4B.sh" # Store eval/delegate settings in a YAML config similar to examples/eval_multi_task. 
-EVAL_CONFIG_PATH=${SKILLS_EVAL_CONFIG_PATH:-"${REPO_ROOT}/examples/eval/scripts/multi_tasks.yaml"} +EVAL_CONFIG_PATH=${SKILLS_EVAL_CONFIG_PATH:-"${REPO_ROOT}/examples/eval/scripts/nemo_skills/multi_tasks.yaml"} CKPT_ARGS=( --hf-checkpoint /root/Qwen3-4B diff --git a/examples/eval/scripts/terminal_bench/harbor_runner.yaml b/examples/eval/scripts/terminal_bench/harbor_runner.yaml new file mode 100644 index 000000000..6ab080339 --- /dev/null +++ b/examples/eval/scripts/terminal_bench/harbor_runner.yaml @@ -0,0 +1,42 @@ +eval: + defaults: + n_samples_per_eval_prompt: 1 + temperature: 0.6 + top_p: 0.95 + top_k: -1 + max_response_len: 24576 + datasets: # these eval tasks go through miles dataset config and default rollout function (miles.rollout.sglang_rollout.generate_rollout) + - name: gpqa # huggingface-cli download --repo-type dataset zyzshishui0627/gpqa_diamond --local-dir /root/gpqa + path: /root/gpqa/gpqa_eval.jsonl + rm_type: gpqa + n_samples_per_eval_prompt: 2 + - name: ifbench # huggingface-cli download --repo-type dataset zyzshishui0627/IFBench --local-dir /root/ifbench + path: /root/ifbench/IFBench_eval.jsonl + rm_type: ifbench + n_samples_per_eval_prompt: 1 + delegate: + - name: terminal_bench + url: http://172.17.0.1:9051 # Port must match the TB server running on the host machine + timeout_secs: 86400 # 24 hours + max_retries: 1 # HTTP request retries from Miles to the TB server + model_name: qwen3-8b + agent_name: terminus-2 + api_base: http://127.0.0.1:30005/v1 # Port must match the sglang router port set in run-eval-tb-qwen.sh + runner: harbor + dataset_name: terminal-bench + dataset_version: "2.0" + output_path: harbor_runner_jobs + n_concurrent: 32 + runner_kwargs: + # task_name: + # - fix-git + # debug: true + # timeout_multiplier: 2.0 + # retry_exclude: + # - AgentTimeoutError + # - VerifierTimeoutError + n_attempts: 2 + agent_kwarg: + model_info: + max_input_tokens: 40960 + max_output_tokens: 8192 diff --git a/examples/eval/scripts/terminal_bench/run-eval-tb-qwen.sh b/examples/eval/scripts/terminal_bench/run-eval-tb-qwen.sh new file mode 100644 index 000000000..fc827129f --- /dev/null +++ b/examples/eval/scripts/terminal_bench/run-eval-tb-qwen.sh @@ -0,0 +1,159 @@ +#!/bin/bash + +# Example launcher that reuses the Qwen3-8B recipe but delegates evaluation to an +# external Terminal Bench server via the eval_delegate_rollout wrapper. + +# Clean up any stale processes from a previous run. +pkill -9 sglang +sleep 3 +ray stop --force +pkill -9 ray +pkill -9 python +sleep 3 +pkill -9 ray +pkill -9 python + +set -ex + +export PYTHONBUFFERED=16 +export MILES_HOST_IP=${MILES_HOST_IP:-"127.0.0.1"} + +MODEL_DIR="${MODEL_DIR:-/root/.cache/huggingface}" +export MODEL_DIR + +NVLINK_COUNT=$(nvidia-smi topo -m 2>/dev/null | grep -o 'NV[0-9][0-9]*' | wc -l) +if [ "$NVLINK_COUNT" -gt 0 ]; then + HAS_NVLINK=1 +else + HAS_NVLINK=0 +fi +echo "HAS_NVLINK: $HAS_NVLINK (detected $NVLINK_COUNT NVLink references)" + +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../../../.." 
&>/dev/null && pwd)" +source "${REPO_ROOT}/scripts/models/qwen3-8B.sh" + +EVAL_CONFIG_PATH=${TB_EVAL_CONFIG_PATH:-"${SCRIPT_DIR}/harbor_runner.yaml"} +# EVAL_CONFIG_PATH=${TB_EVAL_CONFIG_PATH:-"${SCRIPT_DIR}/tb_runner.yaml"} + +CKPT_ARGS=( + --hf-checkpoint ${MODEL_DIR}/OpenThinker-Agent-v1 # huggingface-cli download open-thoughts/OpenThinker-Agent-v1 + --ref-load ${MODEL_DIR}/OpenThinker-Agent-v1_torch_dist + # --load ${MODEL_DIR}/OpenThinker-Agent-v1_miles/ + --save ${MODEL_DIR}/OpenThinker-Agent-v1_miles/ + --save-interval 20 +) + +ROLLOUT_ARGS=( + --prompt-data /root/dapo-math-17k/dapo-math-17k.jsonl + --input-key prompt + --label-key label + --apply-chat-template + --rollout-shuffle + --rm-type deepscaler + # --num-rollout 3000 + --num-rollout 1 + --rollout-batch-size 32 + --n-samples-per-prompt 8 + --rollout-max-response-len 8192 + --rollout-temperature 1 + --global-batch-size 256 + --balance-data +) + +EVAL_ARGS=( + --eval-interval 1 + --eval-config "${EVAL_CONFIG_PATH}" + --eval-function-path examples.eval.eval_delegate_rollout.generate_rollout +) + +PERF_ARGS=( + --tensor-model-parallel-size 1 + --pipeline-model-parallel-size 1 + --context-parallel-size 1 + --expert-model-parallel-size 1 + --expert-tensor-parallel-size 1 + + --recompute-granularity full + --recompute-method uniform + --recompute-num-layers 1 + + --use-dynamic-batch-size + --max-tokens-per-gpu 9216 +) + +GRPO_ARGS=( + --advantage-estimator grpo + --use-kl-loss + --kl-loss-coef 0.00 + --kl-loss-type low_var_kl + --entropy-coef 0.00 + --eps-clip 0.2 + --eps-clip-high 0.28 +) + +OPTIMIZER_ARGS=( + --optimizer adam + --lr 1e-6 + --lr-decay-style constant + --weight-decay 0.1 + --adam-beta1 0.9 + --adam-beta2 0.98 +) + +WANDB_ARGS=( + --use-wandb + --wandb-project miles-tb + --wandb-group qwen3-8b-eval + --wandb-key ${WANDB_KEY} # export WANDB_KEY="your_key" +) + +SGLANG_ARGS=( + --rollout-num-gpus-per-engine 1 + --sglang-mem-fraction-static 0.7 + --sglang-router-port 30005 +) + +MISC_ARGS=( + --attention-dropout 0.0 + --hidden-dropout 0.0 + --accumulate-allreduce-grads-in-fp32 + --attention-softmax-in-fp32 + --attention-backend flash +) + +export MASTER_ADDR=${MASTER_ADDR:-"127.0.0.1"} +export CUDA_VISIBLE_DEVICES=4,5,6,7 + +ray start --head --node-ip-address ${MASTER_ADDR} --port 6380 --num-gpus 4 \ + --disable-usage-stats \ + --dashboard-host=0.0.0.0 \ + --dashboard-port=8266 \ + --dashboard-agent-listen-port 52366 \ + --dashboard-agent-grpc-port 52367 \ + --runtime-env-agent-port 52368 + +RUNTIME_ENV_JSON="{ + \"env_vars\": { + \"PYTHONPATH\": \"/root/Megatron-LM/\", + \"CUDA_DEVICE_MAX_CONNECTIONS\": \"1\" + } +}" + +ray job submit --address="http://${MASTER_ADDR}:8266" \ + --working-dir "${REPO_ROOT}" \ + --runtime-env-json="${RUNTIME_ENV_JSON}" \ + -- python3 train.py \ + --actor-num-nodes 1 \ + --actor-num-gpus-per-node 4 \ + --colocate \ + ${MODEL_ARGS[@]} \ + ${CKPT_ARGS[@]} \ + ${ROLLOUT_ARGS[@]} \ + ${OPTIMIZER_ARGS[@]} \ + ${GRPO_ARGS[@]} \ + ${WANDB_ARGS[@]} \ + ${PERF_ARGS[@]} \ + ${EVAL_ARGS[@]} \ + ${SGLANG_ARGS[@]} \ + ${MISC_ARGS[@]} diff --git a/examples/eval/scripts/terminal_bench/tb_runner.yaml b/examples/eval/scripts/terminal_bench/tb_runner.yaml new file mode 100644 index 000000000..bdd46e137 --- /dev/null +++ b/examples/eval/scripts/terminal_bench/tb_runner.yaml @@ -0,0 +1,34 @@ +eval: + defaults: + n_samples_per_eval_prompt: 1 + temperature: 0.6 + top_p: 0.95 + top_k: -1 + max_response_len: 24576 + datasets: # these eval tasks go through miles dataset config and default rollout function 
(miles.rollout.sglang_rollout.generate_rollout) + - name: gpqa # huggingface-cli download --repo-type dataset zyzshishui0627/gpqa_diamond --local-dir /root/gpqa + path: /root/gpqa/gpqa_eval.jsonl + rm_type: gpqa + n_samples_per_eval_prompt: 2 + - name: ifbench # huggingface-cli download --repo-type dataset zyzshishui0627/IFBench --local-dir /root/ifbench + path: /root/ifbench/IFBench_eval.jsonl + rm_type: ifbench + n_samples_per_eval_prompt: 1 + delegate: + - name: terminal_bench + url: http://172.17.0.1:9051 # Port must match the TB server running on the host machine + timeout_secs: 86400 # 24 hours + max_retries: 1 # HTTP request retries from Miles to the TB server + model_name: qwen3-8b + agent_name: terminus-2 + api_base: http://127.0.0.1:30005/v1 # Port must match the sglang router port set in run-eval-tb-qwen.sh + runner: tb + dataset_name: terminal-bench-core + dataset_version: "0.1.1" + output_path: tb_runner_jobs + n_concurrent: 16 + # runner_kwargs: + # task_id: + # - hello-world + # n_tasks: 10 + # example_flag: value diff --git a/examples/eval/terminal_bench/README.md b/examples/eval/terminal_bench/README.md index 341e543fc..904e63169 100644 --- a/examples/eval/terminal_bench/README.md +++ b/examples/eval/terminal_bench/README.md @@ -1,12 +1,12 @@ # Terminal Bench Eval -This folder wires Terminal Bench (TB) into Miles as an eval delegate. The TB run happens on the host via the `tb` CLI, and Miles reads back aggregated metrics such as `accuracy`, `n_resolved`, `n_unresolved`, `pass_at_k/*`, and token stats like `total_input_tokens_mean/median` and `total_output_tokens_mean/median`. +This folder wires Terminal Bench (TB) into Miles as an eval delegate. The run happens on the host via `harbor run` (Terminal Bench 2.0, default) or `tb run` (Terminal Bench 1.0, legacy). Metrics extraction lives in `utils/metrics.py` and command construction lives in `utils/runner.py`. ## What runs where - Miles runs your training/eval loop inside the Docker container. - Miles calls the TB delegate client. -- The TB delegate server (`tb_server.py`) runs `tb run ...` on the host. +- The TB delegate server (`tb_server.py`) runs `harbor run ...` or `tb run ...` on the host. - The server reads the latest TB JSON results and returns metrics to Miles. ## 1) Get the code (host) @@ -15,7 +15,6 @@ This folder wires Terminal Bench (TB) into Miles as an eval delegate. The TB run mkdir miles-tb cd miles-tb git clone https://github.com/radixark/miles.git -git clone https://github.com/laude-institute/terminal-bench ``` ## 2) Launch the Miles container @@ -31,7 +30,7 @@ docker run \ --ulimit memlock=-1 \ --ulimit stack=67108864 \ --ulimit nofile=65536:65536 \ - -v /mnt/data/.cache:/root/.cache \ + -v /data/cache:/root/.cache \ -v $(pwd):/shared/miles-tb \ --name \ radixark/miles:latest \ @@ -46,15 +45,26 @@ docker exec -it /bin/bash ## 4) Terminal Bench environment (host) -Run on the machine that will host `tb_server.py` (where you cloned both repos): +Run on the machine that will host `tb_server.py`: ```bash # Host machine terminal (outside Docker) uv venv --python 3.13 .venv source .venv/bin/activate +uv pip install -r miles/examples/eval/terminal_bench/requirements.txt +``` + +Terminal Bench 2.0 (default, via harbor): + +```bash +uv pip install harbor +``` + +Terminal Bench 1.0 (legacy, via tb CLI): +```bash +git clone https://github.com/laude-institute/terminal-bench uv pip install terminal-bench/. 
-uv pip install -r miles/examples/eval/terminal_bench/requirements.txt ``` Notes: @@ -62,32 +72,35 @@ Notes: ## 5) Start the Terminal Bench server -Run on the host (same machine where `tb` works): +Run on the host (same machine where `tb`/`harbor` works). Match the port in your +eval config (examples use `9051`): ```bash -python miles/examples/eval/terminal_bench/tb_server.py \ - --host 0.0.0.0 --port 9051 \ - --output-root tb_eval_output +python miles/examples/eval/terminal_bench/tb_server.py --host 0.0.0.0 --port 9051 ``` What it does: - Uses `OPENAI_API_KEY=EMPTY` -- Runs `tb run -a terminus-2 -m openai/ ... --n-concurrent 8` -- Waits for completion, then returns `accuracy`, `n_resolved`, - `n_unresolved`, `pass_at_k/*`, and token stats such as - `total_input_tokens_mean/median` and `total_output_tokens_mean/median` +- For `runner: harbor`, builds a command like: + `harbor run -d terminal-bench@2.0 --jobs-dir <jobs_dir> --job-name <run_id> --model openai/<model> --agent <agent> --agent-kwarg api_base=... --n-concurrent ...` +- For `runner: tb`, builds a command like: + `tb run -d terminal-bench-core==0.1.1 --output-path <output_root> --run-id <run_id> --model openai/<model> --agent <agent> --agent-kwarg api_base=... --n-concurrent ...` +- Waits for completion, then returns TB metrics (`accuracy`, `n_resolved`, + `n_unresolved`, `pass_at_k/*`, `total_input_tokens_mean/median/min/max`, + `total_output_tokens_mean/median`) or Harbor metrics (`n_trials`, `n_errors`, + `metrics` entries like `mean`, `reward_stats/*`, `exception_stats/*`, + `n_input_tokens/*`, `n_output_tokens/*`). ## 6) Run the eval script (example) -If you use the provided Qwen eval launcher (`run-eval-tb-qwen.sh`), follow the steps below to run Terminal-Bench evaluation. +If you use the provided Qwen eval launcher (`run-eval-tb-qwen.sh`), follow the steps below to run Terminal-Bench evaluation. Configure the runner via `harbor_runner.yaml` or `tb_runner.yaml`; `runner_kwargs` passes extra CLI arguments straight through to the runner, so new flags can be added there without any code changes. -First, update the `dataset_path` in `eval_tb_example.yaml` to the local path of `terminal-bench/tasks` on your host (not an internal Docker-only path). Then download the HuggingFace model checkpoint inside the Miles container: ```bash huggingface-cli download open-thoughts/OpenThinker-Agent-v1 \ ---local-dir /root/.cache/OpenThinker-Agent-v1 +--local-dir /root/.cache/huggingface/OpenThinker-Agent-v1 ``` After downloading, convert the HuggingFace checkpoint to Miles's torch distributed format. From the Miles root directory, run: @@ -100,18 +113,16 @@ export PYTHONPATH=/root/Megatron-LM:/shared/miles-tb/miles python tools/convert_hf_to_torch_dist.py \ ${MODEL_ARGS[@]} \ - --hf-checkpoint /root/.cache/OpenThinker-Agent-v1 \ - --save /root/.cache/OpenThinker-Agent-v1_torch_dist + --hf-checkpoint /root/.cache/huggingface/OpenThinker-Agent-v1 \ + --save /root/.cache/huggingface/OpenThinker-Agent-v1_torch_dist ``` Finally, run the following command inside the Miles container: ```bash -bash miles/examples/eval/scripts/run-eval-tb-qwen.sh 2>&1 | tee run.log +bash miles/examples/eval/scripts/terminal_bench/run-eval-tb-qwen.sh 2>&1 | tee run.log ``` -For convenience, you can restrict the evaluation scope in `eval_tb_example.yaml`, either by specifying a single task or multiple tasks (`task_ids`), or by limiting the number of tasks via `n_tasks`. - ## 7) Common Issues When running Miles inside a Docker container with `--network host`, Ray may encounter port conflicts due to shared networking with the host.
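As a quick sanity check of the delegate path described in the README changes above, you can POST an evaluation request to the TB server by hand before launching a full Miles run. The snippet below is a minimal sketch, not part of the shipped client: it assumes the server from step 5 is listening on port 9051, that `requests` is installed in the host venv, and that an OpenAI-compatible endpoint is reachable at `api_base`. The field names mirror `EvalRequestPayload` and `harbor_runner.yaml`; the `runner_kwargs` task restriction is hypothetical and only keeps the smoke test small. The request may block until the run finishes (the example configs allow up to 24 hours), so set a generous timeout.

```python
# Minimal sketch: manually exercise the TB delegate server's /evaluate endpoint.
import requests

payload = {
    "model_name": "qwen3-8b",                 # server normalizes this to openai/qwen3-8b
    "agent_name": "terminus-2",
    "api_base": "http://127.0.0.1:30005/v1",  # sglang router port from run-eval-tb-qwen.sh
    "runner": "harbor",                       # or "tb" for Terminal Bench 1.0
    "dataset_name": "terminal-bench",
    "dataset_version": "2.0",
    "output_path": "harbor_runner_jobs",
    "n_concurrent": 4,
    "metric_prefix": "terminal_bench",
    "runner_kwargs": {"task_name": ["fix-git"]},  # hypothetical scope restriction
}

# Use http://127.0.0.1:9051 when running this from the host itself;
# 172.17.0.1 is the docker bridge address used from inside the Miles container.
resp = requests.post("http://172.17.0.1:9051/evaluate", json=payload, timeout=86400)
resp.raise_for_status()
print(resp.json())  # job record and/or aggregated metrics, depending on the server's response
```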
@@ -120,10 +131,10 @@ In some cases, this manifests as Ray failing to start or reporting Redis- or ses In more severe cases, Ray job submission may fail with errors indicating that no available agent can accept jobs. This typically happens when the dashboard agent or runtime environment agent ports are also in conflict. In such situations, explicitly specifying the agent-related ports (e.g. `--dashboard-agent-listen-port`, `--dashboard-agent-grpc-port`, and `--runtime-env-agent-port`) when starting Ray can resolve the issue. -If the TB server cannot connect to the Miles server through the sglang router (`InternalServerError`), check which address is actually listening on the router port (e.g. 30005 in this example) and update the `api_base` in `eval_tb_example.yaml` accordingly: +If the TB server cannot connect to the Miles server through the sglang router (`InternalServerError`), check which address is actually listening on the router port (e.g. 30005 in this example) and update the `api_base` in `harbor_runner.yaml` or `tb_runner.yaml` accordingly: ```bash ss -lntp | grep 30005 ``` -You may see `Parser warnings`, `Context length exceeded`, `Command 1 should end with newline`, `Harness execution failed` in `tb_server.py` logs. They are warnings from Terminal Bench and can be ignored if runs proceed normally. \ No newline at end of file +You may see `Parser warnings`, `Context length exceeded`, `Command 1 should end with newline`, `Harness execution failed`, `Provider List` in `tb_server.py` logs. They are warnings from Terminal Bench and can be ignored if runs proceed normally. diff --git a/examples/eval/terminal_bench/tb_client.py b/examples/eval/terminal_bench/tb_client.py index 2a93b7161..43104fc14 100644 --- a/examples/eval/terminal_bench/tb_client.py +++ b/examples/eval/terminal_bench/tb_client.py @@ -40,19 +40,22 @@ def evaluate(self, args, rollout_id: int) -> tuple[dict[str, Any], dict[str, Any return metrics, response def _build_payload(self, args, rollout_id: int) -> dict[str, Any]: + return self._base_payload() + + def _base_payload(self) -> dict[str, Any]: payload = { "model_name": self._config.model_name, + "agent_name": self._config.agent_name, + "dataset_name": self._config.dataset_name, + "dataset_version": self._config.dataset_version, "api_base": self._config.api_base, - "n_tasks": self._config.n_tasks, "n_concurrent": self._config.n_concurrent, "metric_prefix": self._config.name, + "runner": self._config.runner, + "output_path": self._config.output_path, } - if self._config.dataset_path: - payload["dataset_path"] = self._config.dataset_path - if self._config.task_ids: - payload["task_ids"] = list(self._config.task_ids) - if self._config.n_attempts is not None: - payload["n_attempts"] = self._config.n_attempts + if self._config.runner_kwargs: + payload["runner_kwargs"] = dict(self._config.runner_kwargs) return payload def _request(self, payload: dict[str, Any]) -> dict[str, Any]: diff --git a/examples/eval/terminal_bench/tb_config.py b/examples/eval/terminal_bench/tb_config.py index f57b445dd..ac0eb3bb2 100644 --- a/examples/eval/terminal_bench/tb_config.py +++ b/examples/eval/terminal_bench/tb_config.py @@ -12,12 +12,14 @@ class TerminalBenchConfig(EvalEnvConfig): """Environment configuration shared by the Terminal Bench client/server.""" model_name: str = "qwen3-8b" + agent_name: str = "terminus-2" api_base: str = "http://127.0.1.1:30001/v1" - dataset_path: str | None = None - n_tasks: int | None = None - task_ids: list[str] = field(default_factory=list) - n_attempts: 
int | None = None + runner: str = "harbor" + dataset_name: str = "terminal-bench" + dataset_version: str = "2.0" + output_path: str | None = None n_concurrent: int = 8 + runner_kwargs: dict[str, Any] = field(default_factory=dict) @classmethod def parse(cls, args, raw_env_config: Mapping[str, Any], defaults: Mapping[str, Any]) -> TerminalBenchConfig: @@ -27,11 +29,13 @@ def parse(cls, args, raw_env_config: Mapping[str, Any], defaults: Mapping[str, A field_casts = { "model_name": str, + "agent_name": str, "api_base": str, - "n_attempts": int, - "n_tasks": int, + "runner": str, + "dataset_name": lambda v: str(v).strip(), + "dataset_version": lambda v: str(v).strip(), + "output_path": lambda v: str(v).strip(), "n_concurrent": int, - "dataset_path": str, } for key, caster in field_casts.items(): @@ -39,11 +43,9 @@ def parse(cls, args, raw_env_config: Mapping[str, Any], defaults: Mapping[str, A if value is not None: setattr(base_cfg, key, caster(value)) - task_ids = clean_raw.get("task_ids") - if isinstance(task_ids, (list, tuple)): - base_cfg.task_ids = [str(item) for item in task_ids if item] - elif task_ids is not None: - raise ValueError("task_ids must be a list") + runner_kwargs = clean_raw.get("runner_kwargs") + if runner_kwargs is not None: + base_cfg.runner_kwargs = dict(runner_kwargs) return base_cfg diff --git a/examples/eval/terminal_bench/tb_server.py b/examples/eval/terminal_bench/tb_server.py index 58c9d54ad..a278ba493 100644 --- a/examples/eval/terminal_bench/tb_server.py +++ b/examples/eval/terminal_bench/tb_server.py @@ -1,27 +1,24 @@ #!/usr/bin/env python3 """ -Simple HTTP server that proxies Miles evaluation requests to the `tb run` -command shipped with Terminal Bench. +Simple HTTP server that proxies Miles evaluation requests to `tb run` (1.0) +or `harbor run` (2.0), depending on the request payload. Usage: python examples/eval/terminal_bench/tb_server.py \ - --host 0.0.0.0 --port 9050 \ - --output-root /opt/tb-eval + --host 0.0.0.0 --port 9050 Miles (or Miles-compatible runners) should POST the payload described in `EvalRequestPayload` to http://:/evaluate. The server blocks until -`tb run` finishes, then returns aggregated metrics along with paths to the -generated artifacts (logs + raw metrics). +the run finishes, then returns aggregated metrics. 
""" from __future__ import annotations import argparse -import json import logging import os +import pty import shlex -import statistics import subprocess import sys import threading @@ -31,14 +28,19 @@ from pathlib import Path from typing import Any -REPO_ROOT = Path(__file__).resolve().parents[3] -if str(REPO_ROOT) not in sys.path: - sys.path.insert(0, str(REPO_ROOT)) - from flask import Flask, jsonify, request from omegaconf import OmegaConf from omegaconf.errors import OmegaConfBaseException +from utils.metrics import extract_harbor_metrics, extract_tb_metrics +from utils.runner import ( + Runner, + ServerConfig, + _build_harbor_command, + _build_tb_command, + _normalize_model_name, +) + logger = logging.getLogger("terminal_bench_server") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") @@ -51,13 +53,15 @@ @dataclass class EvalRequestPayload: model_name: str = "" + agent_name: str | None = None api_base: str = "" - n_tasks: int | None = None + runner: str | None = None + dataset_name: str | None = None + dataset_version: str | None = None n_concurrent: int | None = None - dataset_path: str | None = None - task_ids: list[str] | None = None - n_attempts: int | None = None metric_prefix: str | None = None + output_path: str | None = None + runner_kwargs: dict[str, Any] | None = None @dataclass @@ -67,7 +71,6 @@ class JobRecord: run_id: str command: str output_dir: str - log_path: str raw_metrics: dict[str, Any] | None = None error: str | None = None created_at: float = field(default_factory=time.time) @@ -81,7 +84,6 @@ def to_dict(self) -> dict[str, Any]: "run_id": self.run_id, "command": self.command, "output_dir": self.output_dir, - "log_path": self.log_path, "created_at": self.created_at, "started_at": self.started_at, "finished_at": self.finished_at, @@ -93,38 +95,12 @@ def to_dict(self) -> dict[str, Any]: return payload -# --------------------------------------------------------------------------- -# Configuration + command helpers -# --------------------------------------------------------------------------- - - -def _normalize_model_name(model_name: str) -> str: - name = (model_name or "").strip() - if not name: - return "" - if "/" in name: - return name - return f"openai/{name}" - - -@dataclass -class ServerConfig: - output_root: Path - - @classmethod - def from_args(cls, args: argparse.Namespace) -> ServerConfig: - return cls(output_root=Path(args.output_root).expanduser().resolve()) - - class TerminalBenchEvaluator: def __init__(self, config: ServerConfig): self._config = config self._lock = threading.Lock() self._jobs_lock = threading.Lock() self._jobs: dict[str, JobRecord] = {} - self._config.output_root.mkdir(parents=True, exist_ok=True) - self._log_root = REPO_ROOT.parent / "tb_eval_logs" - self._log_root.mkdir(parents=True, exist_ok=True) def evaluate(self, payload: EvalRequestPayload) -> dict[str, Any]: if not payload.model_name: @@ -134,11 +110,11 @@ def evaluate(self, payload: EvalRequestPayload) -> dict[str, Any]: job_id = uuid.uuid4().hex run_id = f"{int(time.time())}-{job_id[:8]}" - run_dir = self._config.output_root / run_id + runner = Runner(payload.runner) + run_dir, job_name = self._prepare_run_dir(payload, runner, run_id) - command = self._build_command(payload, run_id) - command_str = " ".join(shlex.quote(part) for part in command) - log_path = self._log_root / f"{run_id}.log" + command = self._build_command(payload, run_id, runner, job_name) + command_str = self._format_command(command) record = JobRecord( 
job_id=job_id, @@ -146,14 +122,13 @@ def evaluate(self, payload: EvalRequestPayload) -> dict[str, Any]: run_id=run_id, command=command_str, output_dir=str(run_dir), - log_path=str(log_path), ) with self._jobs_lock: self._jobs[job_id] = record thread = threading.Thread( target=self._run_job, - args=(job_id, payload, run_dir, command, log_path), + args=(job_id, payload, run_dir, command, runner), daemon=True, ) thread.start() @@ -165,7 +140,6 @@ def evaluate(self, payload: EvalRequestPayload) -> dict[str, Any]: "run_id": run_id, "command": command_str, "output_dir": str(run_dir), - "log_path": str(log_path), } def _run_job( @@ -174,38 +148,34 @@ def _run_job( payload: EvalRequestPayload, run_dir: Path, command: list[str], - log_path: Path, + runner: Runner, ) -> None: - with self._jobs_lock: - record = self._jobs.get(job_id) - if record is None: - return - record.status = "running" - record.started_at = time.time() + self._update_job(job_id, status="running", started_at=time.time()) env = self._build_env() logger.info("Starting Terminal Bench run: %s", " ".join(shlex.quote(part) for part in command)) try: with self._lock: - self._run_command(command, env=env, log_path=log_path) - metrics = self._collect_metrics(run_dir) + self._run_command( + command, + env=env, + ) + metrics = self._collect_metrics(run_dir, runner, payload) if payload.metric_prefix: metrics = {payload.metric_prefix: metrics} - with self._jobs_lock: - record = self._jobs.get(job_id) - if record is None: - return - record.status = "completed" - record.raw_metrics = metrics - record.finished_at = time.time() + self._update_job( + job_id, + status="completed", + raw_metrics=metrics, + finished_at=time.time(), + ) except Exception as exc: # noqa: BLE001 - with self._jobs_lock: - record = self._jobs.get(job_id) - if record is None: - return - record.status = "failed" - record.error = str(exc) - record.finished_at = time.time() + self._update_job( + job_id, + status="failed", + error=str(exc), + finished_at=time.time(), + ) def get_job_status(self, job_id: str) -> dict[str, Any] | None: with self._jobs_lock: @@ -214,53 +184,61 @@ def get_job_status(self, job_id: str) -> dict[str, Any] | None: return None return record.to_dict() - def _build_command(self, payload: EvalRequestPayload, run_id: str) -> list[str]: - # 1. Normalize model name (add openai/ prefix) - model_name = _normalize_model_name(payload.model_name) + def _build_command( + self, + payload: EvalRequestPayload, + run_id: str, + runner: Runner, + job_name: str | None, + ) -> list[str]: + if runner is Runner.HARBOR: + cmd = _build_harbor_command(payload, job_name) + else: + cmd = _build_tb_command(payload, run_id, self._config.output_root) - cmd = [ - "tb", - "run", - "-a", - "terminus-2", # Added Agent flag - "--output-path", - str(self._config.output_root), - "--run-id", - run_id, - ] - - # 2. Add model + model_name = _normalize_model_name(payload.model_name) if model_name: cmd.extend(["--model", model_name]) - # 3. Add Agent kwargs (Use api_base exactly like the CLI command) + agent_name = (payload.agent_name or "terminus-2").strip() + if agent_name: + cmd.extend(["--agent", agent_name]) + if payload.api_base: cmd.extend(["--agent-kwarg", f"api_base={payload.api_base}"]) - if payload.dataset_path: - cmd.extend(["--dataset-path", payload.dataset_path]) - - if payload.n_attempts is not None: - cmd.extend(["--n-attempts", str(payload.n_attempts)]) - - # 4. 
Add n_tasks if present - task_ids = [] - if payload.task_ids: - task_ids.extend([str(item) for item in payload.task_ids if item]) - if task_ids: - for task_id in task_ids: - cmd.extend(["--task-id", task_id]) - elif payload.n_tasks is not None: - cmd.extend(["--n-tasks", str(payload.n_tasks)]) - - # 5. Add concurrency - n_concurrent = payload.n_concurrent - if n_concurrent is None: - n_concurrent = 1 + n_concurrent = payload.n_concurrent if payload.n_concurrent is not None else 1 cmd.extend(["--n-concurrent", str(n_concurrent)]) return cmd + def _prepare_run_dir( + self, + payload: EvalRequestPayload, + runner: Runner, + run_id: str, + ) -> tuple[Path, str | None]: + if runner is Runner.HARBOR: + jobs_dir = Path(payload.output_path or "jobs").expanduser() + jobs_dir.mkdir(parents=True, exist_ok=True) + return jobs_dir / run_id, run_id + + tb_root = Path(payload.output_path or self._config.output_root).expanduser() + tb_root.mkdir(parents=True, exist_ok=True) + return tb_root / run_id, None + + def _update_job(self, job_id: str, **updates: Any) -> None: + with self._jobs_lock: + record = self._jobs.get(job_id) + if record is None: + return + for key, value in updates.items(): + setattr(record, key, value) + + @staticmethod + def _format_command(command: list[str]) -> str: + return " ".join(shlex.quote(part) for part in command) + def _build_env(self) -> dict[str, str]: env = os.environ.copy() # Inject env var to simulate "OPENAI_API_KEY=EMPTY" @@ -268,93 +246,76 @@ def _build_env(self) -> dict[str, str]: return env @staticmethod - def _run_command(cmd: list[str], *, env: dict[str, str], log_path: Path): - with open(log_path, "w", encoding="utf-8") as log_file: - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - env=env, - text=True, - bufsize=1, - ) - assert process.stdout is not None - for line in process.stdout: - log_file.write(line) - log_file.flush() - sys.stdout.write(line) - sys.stdout.flush() - retcode = process.wait() + def _run_command( + cmd: list[str], + *, + env: dict[str, str], + ): + env = env.copy() + env.setdefault("TERM", "xterm-256color") + env.setdefault("RICH_FORCE_TERMINAL", "1") + master_fd, slave_fd = pty.openpty() + process = subprocess.Popen( + cmd, + stdout=slave_fd, + stderr=slave_fd, + env=env, + ) + os.close(slave_fd) + try: + while True: + try: + data = os.read(master_fd, 1024) + except OSError: + break + if not data: + break + sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() + finally: + os.close(master_fd) + retcode = process.wait() if retcode != 0: - with open(log_path, encoding="utf-8", errors="ignore") as log_file: - tail = "".join(log_file.readlines()[-200:]) - raise RuntimeError(f"`tb run` failed with exit code {retcode}. 
See {log_path}\n{tail}") + raise RuntimeError(f"Command failed with exit code {retcode}.") @staticmethod - def _collect_metrics(run_dir: Path) -> dict[str, Any]: - metrics_path = run_dir / "results.json" - if not metrics_path.exists(): - logger.warning("Results file missing at %s", metrics_path) - return {} - - metrics = TerminalBenchEvaluator._extract_metrics(metrics_path) + def _collect_metrics(run_dir: Path, runner: Runner, payload: EvalRequestPayload) -> dict[str, Any]: + if runner is Runner.HARBOR: + metrics_path = run_dir / "result.json" + if not metrics_path.exists(): + fallback = TerminalBenchEvaluator._find_latest_result( + Path(payload.output_path or "jobs").expanduser() + ) + if fallback is not None: + metrics_path = fallback + if not metrics_path.exists(): + logger.warning("Results file missing at %s", metrics_path) + return {} + metrics = extract_harbor_metrics( + metrics_path, + run_dir, + model_name=_normalize_model_name(payload.model_name), + dataset_name=(payload.dataset_name or "terminal-bench"), + agent_name=(payload.agent_name or "terminus-2"), + ) + else: + metrics_path = run_dir / "results.json" + if not metrics_path.exists(): + logger.warning("Results file missing at %s", metrics_path) + return {} + metrics = extract_tb_metrics(metrics_path) if not metrics: logger.warning("No accuracy/n_resolved metrics found in %s", metrics_path) return metrics @staticmethod - def _extract_metrics(metrics_path: Path) -> dict[str, Any]: - try: - with open(metrics_path, encoding="utf-8") as fp: - metrics_data = json.load(fp) - except json.JSONDecodeError as exc: - logger.warning("Failed to parse %s: %s", metrics_path, exc) - return {} - - metrics: dict[str, Any] = {} - - # core metrics - accuracy = metrics_data.get("accuracy") - if isinstance(accuracy, (int, float)): - metrics["accuracy"] = float(accuracy) - - n_resolved = metrics_data.get("n_resolved") - if isinstance(n_resolved, (int, float)): - metrics["n_resolved"] = int(n_resolved) - - n_unresolved = metrics_data.get("n_unresolved") - if isinstance(n_unresolved, (int, float)): - metrics["n_unresolved"] = int(n_unresolved) - - # pass@k flatten - pass_at_k = metrics_data.get("pass_at_k") - if isinstance(pass_at_k, dict): - for k, v in pass_at_k.items(): - if isinstance(v, (int, float)): - metrics[f"pass_at_k/{k}"] = float(v) - - # token stats from per-task results - results = metrics_data.get("results") - if isinstance(results, list): - input_tokens = [ - r.get("total_input_tokens") - for r in results - if isinstance(r, dict) and isinstance(r.get("total_input_tokens"), (int, float)) - ] - output_tokens = [ - r.get("total_output_tokens") - for r in results - if isinstance(r, dict) and isinstance(r.get("total_output_tokens"), (int, float)) - ] - - if input_tokens: - metrics["total_input_tokens_mean"] = float(statistics.mean(input_tokens)) - metrics["total_input_tokens_median"] = float(statistics.median(input_tokens)) - if output_tokens: - metrics["total_output_tokens_mean"] = float(statistics.mean(output_tokens)) - metrics["total_output_tokens_median"] = float(statistics.median(output_tokens)) - - return metrics + def _find_latest_result(jobs_dir: Path) -> Path | None: + if not jobs_dir.exists(): + return None + candidates = list(jobs_dir.glob("**/result.json")) + if not candidates: + return None + return max(candidates, key=lambda path: path.stat().st_mtime) # --------------------------------------------------------------------------- @@ -410,7 +371,7 @@ def parse_args() -> argparse.Namespace: "--output-root", type=str, 
default="./terminal-bench-output", - help="Directory to store `tb run` outputs.", + help="Directory to store `tb run` outputs (Terminal Bench 1.0).", ) return parser.parse_args() @@ -420,6 +381,7 @@ def main(): config = ServerConfig.from_args(args) evaluator = TerminalBenchEvaluator(config) app = build_app(evaluator) + logging.getLogger("werkzeug").setLevel(logging.WARNING) logger.info( "Starting Terminal Bench evaluation server on %s:%s (output root=%s)", args.host, diff --git a/examples/eval/terminal_bench/utils/__init__.py b/examples/eval/terminal_bench/utils/__init__.py new file mode 100644 index 000000000..63f2cf887 --- /dev/null +++ b/examples/eval/terminal_bench/utils/__init__.py @@ -0,0 +1 @@ +"""Helper modules for terminal_bench server.""" diff --git a/examples/eval/terminal_bench/utils/metrics.py b/examples/eval/terminal_bench/utils/metrics.py new file mode 100644 index 000000000..10c43824c --- /dev/null +++ b/examples/eval/terminal_bench/utils/metrics.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import json +import logging +import statistics +from pathlib import Path +from typing import Any + +logger = logging.getLogger("terminal_bench_server") + + +def extract_tb_metrics(metrics_path: Path) -> dict[str, Any]: + try: + with open(metrics_path, encoding="utf-8") as fp: + metrics_data = json.load(fp) + except json.JSONDecodeError as exc: + logger.warning("Failed to parse %s: %s", metrics_path, exc) + return {} + + metrics: dict[str, Any] = {} + + # core metrics + accuracy = metrics_data.get("accuracy") + if isinstance(accuracy, (int, float)): + metrics["accuracy"] = float(accuracy) + + n_resolved = metrics_data.get("n_resolved") + if isinstance(n_resolved, (int, float)): + metrics["n_resolved"] = int(n_resolved) + + n_unresolved = metrics_data.get("n_unresolved") + if isinstance(n_unresolved, (int, float)): + metrics["n_unresolved"] = int(n_unresolved) + + # pass@k flatten + pass_at_k = metrics_data.get("pass_at_k") + if isinstance(pass_at_k, dict): + for k, v in pass_at_k.items(): + if isinstance(v, (int, float)): + metrics[f"pass_at_k/{k}"] = float(v) + + # token stats from per-task results + results = metrics_data.get("results") + if isinstance(results, list): + input_tokens = [ + r.get("total_input_tokens") + for r in results + if isinstance(r, dict) + and isinstance(r.get("total_input_tokens"), (int, float)) + ] + output_tokens = [ + r.get("total_output_tokens") + for r in results + if isinstance(r, dict) + and isinstance(r.get("total_output_tokens"), (int, float)) + ] + + if input_tokens: + metrics["total_input_tokens_mean"] = float(statistics.mean(input_tokens)) + metrics["total_input_tokens_median"] = float( + statistics.median(input_tokens) + ) + metrics["total_input_tokens_min"] = float(min(input_tokens)) + metrics["total_input_tokens_max"] = float(max(input_tokens)) + if output_tokens: + metrics["total_output_tokens_mean"] = float( + statistics.mean(output_tokens) + ) + metrics["total_output_tokens_median"] = float( + statistics.median(output_tokens) + ) + + return metrics + + +def extract_harbor_metrics( + metrics_path: Path, + run_dir: Path, + *, + model_name: str, + dataset_name: str, + agent_name: str, +) -> dict[str, Any]: + try: + with open(metrics_path, encoding="utf-8") as fp: + metrics_data = json.load(fp) + except json.JSONDecodeError as exc: + logger.warning("Failed to parse %s: %s", metrics_path, exc) + return {} + + evals = metrics_data.get("stats", {}).get("evals", {}) + if not isinstance(evals, dict) or not evals: + return {} + + candidates 
= ( + f"{agent_name}__{model_name}__{dataset_name}", + f"{agent_name}__{model_name}__terminal-bench", + ) + entry = next((evals.get(key) for key in candidates if key in evals), None) + if entry is None: + entry = next(iter(evals.values())) + if not isinstance(entry, dict): + return {} + + metrics: dict[str, Any] = {} + for key in ("n_trials", "n_errors"): + value = entry.get(key) + if isinstance(value, (int, float)): + metrics[key] = int(value) + + metrics_block = entry.get("metrics") + if isinstance(metrics_block, list): + for metric in metrics_block: + if isinstance(metric, dict): + for name, value in metric.items(): + if isinstance(value, (int, float)): + metrics[name] = float(value) + + reward_stats = entry.get("reward_stats") + if isinstance(reward_stats, dict): + for reward_name, reward_values in reward_stats.items(): + if isinstance(reward_values, dict): + for reward_value, trials in reward_values.items(): + if isinstance(trials, list): + metrics[f"reward_stats/{reward_name}/{reward_value}"] = len( + trials + ) + + exception_stats = entry.get("exception_stats") + if isinstance(exception_stats, dict): + for exception_name, trials in exception_stats.items(): + if isinstance(trials, list): + metrics[f"exception_stats/{exception_name}"] = len(trials) + + input_tokens = [] + output_tokens = [] + for result_path in run_dir.glob("*/result.json"): + try: + with open(result_path, encoding="utf-8") as fp: + task_data = json.load(fp) + except json.JSONDecodeError: + logger.warning("Failed to parse %s", result_path) + continue + agent_result = task_data.get("agent_result") or {} + n_input = agent_result.get("n_input_tokens") + if isinstance(n_input, (int, float)): + input_tokens.append(float(n_input)) + n_output = agent_result.get("n_output_tokens") + if isinstance(n_output, (int, float)): + output_tokens.append(float(n_output)) + + def add_token_stats(name: str, values: list[float]) -> None: + if not values: + return + metrics[f"{name}/min"] = float(min(values)) + metrics[f"{name}/max"] = float(max(values)) + metrics[f"{name}/mean"] = float(statistics.mean(values)) + metrics[f"{name}/median"] = float(statistics.median(values)) + + add_token_stats("n_input_tokens", input_tokens) + add_token_stats("n_output_tokens", output_tokens) + + return metrics diff --git a/examples/eval/terminal_bench/utils/runner.py b/examples/eval/terminal_bench/utils/runner.py new file mode 100644 index 000000000..af70d435d --- /dev/null +++ b/examples/eval/terminal_bench/utils/runner.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import argparse +import json +from collections.abc import Mapping +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any + + +class Runner(str, Enum): + TB = "tb" + HARBOR = "harbor" + + +def _normalize_model_name(model_name: str) -> str: + name = (model_name or "").strip() + if not name: + return "" + if "/" in name: + return name + return f"openai/{name}" + + +def _snake_to_kebab(value: str) -> str: + return value.replace("_", "-") + + +def _json_value(value: Any) -> str: + return json.dumps(value, separators=(",", ":")) + + +def _append_runner_kwargs(cmd: list[str], runner_kwargs: Mapping[str, Any]) -> None: + for key, value in runner_kwargs.items(): + flag = f"--{_snake_to_kebab(str(key))}" + if isinstance(value, bool): + if value: + cmd.append(flag) + continue + if isinstance(value, list): + for item in value: + if isinstance(item, (dict, list)): + cmd.extend([flag, _json_value(item)]) + else: + cmd.extend([flag, 
str(item)]) + continue + if isinstance(value, dict): + if key == "agent_kwarg": + for agent_key, agent_value in value.items(): + if isinstance(agent_value, (dict, list)): + agent_value_str = _json_value(agent_value) + else: + agent_value_str = str(agent_value) + cmd.extend([flag, f"{agent_key}={agent_value_str}"]) + else: + cmd.extend([flag, _json_value(value)]) + continue + cmd.extend([flag, str(value)]) + + +@dataclass +class ServerConfig: + output_root: Path + + @classmethod + def from_args(cls, args: argparse.Namespace) -> ServerConfig: + return cls(output_root=Path(args.output_root).expanduser().resolve()) + + +def _build_harbor_command(payload: Any, job_name: str | None) -> list[str]: + dataset_name = (payload.dataset_name or "terminal-bench").strip() or "terminal-bench" + dataset_version = (payload.dataset_version or "2.0").strip() or "2.0" + cmd = [ + "harbor", + "run", + "-d", + f"{dataset_name}@{dataset_version}", + ] + jobs_dir = payload.output_path + if jobs_dir: + cmd.extend(["--jobs-dir", jobs_dir]) + if job_name: + cmd.extend(["--job-name", job_name]) + + if payload.runner_kwargs: + _append_runner_kwargs(cmd, payload.runner_kwargs) + + return cmd + + +def _build_tb_command(payload: Any, run_id: str, output_root: Path) -> list[str]: + dataset_name = (payload.dataset_name or "terminal-bench-core").strip() or "terminal-bench-core" + dataset_version = (payload.dataset_version or "0.1.1").strip() or "0.1.1" + cmd = [ + "tb", + "run", + "-d", + f"{dataset_name}=={dataset_version}", + ] + output_root = str(Path(payload.output_path or output_root).expanduser()) + Path(output_root).mkdir(parents=True, exist_ok=True) + cmd.extend(["--output-path", output_root, "--run-id", run_id]) + if payload.runner_kwargs: + _append_runner_kwargs(cmd, payload.runner_kwargs) + + return cmd
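To make the `runner_kwargs` pass-through concrete, here is a small usage sketch of `_append_runner_kwargs` from `utils/runner.py`. Run it from `examples/eval/terminal_bench/` so the `utils` package resolves; the kwarg values are illustrative, taken from the commented examples in `harbor_runner.yaml`.

```python
# Sanity check of the runner_kwargs -> CLI flag translation.
from utils.runner import _append_runner_kwargs

cmd = ["harbor", "run", "-d", "terminal-bench@2.0"]
_append_runner_kwargs(
    cmd,
    {
        "debug": True,                   # bool -> bare flag: --debug
        "timeout_multiplier": 2.0,       # scalar -> --timeout-multiplier 2.0
        "retry_exclude": ["AgentTimeoutError", "VerifierTimeoutError"],  # list -> repeated flag
        "agent_kwarg": {"model_info": {"max_input_tokens": 40960}},      # dict -> key=value pairs
    },
)
print(" ".join(cmd))
# harbor run -d terminal-bench@2.0 --debug --timeout-multiplier 2.0
#   --retry-exclude AgentTimeoutError --retry-exclude VerifierTimeoutError
#   --agent-kwarg model_info={"max_input_tokens":40960}
```

Booleans become bare flags, lists repeat the flag once per item, and `agent_kwarg` dicts are expanded into `key=value` pairs (values JSON-encoded when nested), which is why new Harbor/TB CLI options can be exposed purely through the YAML config.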
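The Harbor metrics path depends on the layout of Harbor's `result.json` files, which the diff only implies. The sketch below builds a tiny, hypothetical job directory shaped the way `extract_harbor_metrics` reads it (the `stats.evals` key format, field names, and values are inferred from the extractor, not from an official Harbor schema) and prints the flattened metric names Miles would receive.

```python
# Hypothetical job directory matching what utils/metrics.py expects; run from
# examples/eval/terminal_bench/ so the `utils` package resolves.
import json
import tempfile
from pathlib import Path

from utils.metrics import extract_harbor_metrics

job_dir = Path(tempfile.mkdtemp())

# Job-level result.json: eval stats keyed by "<agent>__<model>__<dataset>".
(job_dir / "result.json").write_text(json.dumps({
    "stats": {
        "evals": {
            "terminus-2__openai/qwen3-8b__terminal-bench": {
                "n_trials": 2,
                "n_errors": 0,
                "metrics": [{"mean": 0.5}],
                "reward_stats": {"reward": {"1.0": ["trial-a"], "0.0": ["trial-b"]}},
                "exception_stats": {"AgentTimeoutError": ["trial-b"]},
            }
        }
    }
}))

# Per-trial result.json files supply the token statistics.
for trial, (n_in, n_out) in {"trial-a": (12000, 800), "trial-b": (9000, 400)}.items():
    trial_dir = job_dir / trial
    trial_dir.mkdir()
    (trial_dir / "result.json").write_text(json.dumps({
        "agent_result": {"n_input_tokens": n_in, "n_output_tokens": n_out}
    }))

metrics = extract_harbor_metrics(
    job_dir / "result.json",
    job_dir,
    model_name="openai/qwen3-8b",
    dataset_name="terminal-bench",
    agent_name="terminus-2",
)
print(metrics)
# {'n_trials': 2, 'n_errors': 0, 'mean': 0.5,
#  'reward_stats/reward/1.0': 1, 'reward_stats/reward/0.0': 1,
#  'exception_stats/AgentTimeoutError': 1,
#  'n_input_tokens/min': 9000.0, 'n_input_tokens/max': 12000.0,
#  'n_input_tokens/mean': 10500.0, 'n_input_tokens/median': 10500.0,
#  'n_output_tokens/min': 400.0, 'n_output_tokens/max': 800.0,
#  'n_output_tokens/mean': 600.0, 'n_output_tokens/median': 600.0}
```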