From 1b6c31f6774331e3391bdea4247754eb3b91ebd8 Mon Sep 17 00:00:00 2001 From: HaoZhang534 Date: Thu, 12 Feb 2026 02:49:16 +0000 Subject: [PATCH] test script --- scripts/tests/run_standalone_swebench_test.sh | 216 ++++ ...un_standalone_swebench_test_single_node.sh | 211 ++++ scripts/tests/standalone_swebench_test.py | 1032 +++++++++++++++++ scripts/tests/vllm_api_server.py | 184 +++ 4 files changed, 1643 insertions(+) create mode 100644 scripts/tests/run_standalone_swebench_test.sh create mode 100644 scripts/tests/run_standalone_swebench_test_single_node.sh create mode 100644 scripts/tests/standalone_swebench_test.py create mode 100644 scripts/tests/vllm_api_server.py diff --git a/scripts/tests/run_standalone_swebench_test.sh b/scripts/tests/run_standalone_swebench_test.sh new file mode 100644 index 000000000..fe7e6c73e --- /dev/null +++ b/scripts/tests/run_standalone_swebench_test.sh @@ -0,0 +1,216 @@ +#!/bin/bash +#SBATCH --job-name=standalone-swebench-test +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=1 +#SBATCH --mem=1000G +#SBATCH --partition=interactive +#SBATCH --time=4:00:00 +#SBATCH --account=llmservice_fm_vision +#SBATCH --gpus-per-node=8 +#SBATCH --cpus-per-task=64 +#SBATCH --output=/lustre/fsw/portfolios/llmservice/users/haozh/outputs/verl_internal/results/slurm-%A_%a.out +#SBATCH --error=/lustre/fsw/portfolios/llmservice/users/haozh/outputs/verl_internal/results/slurm-%A_%a.err + +set -x # Enable debug output + +# ==================== Configuration ==================== +HOME_HAOZH='/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/haozh' +OPENHANDS_WORKDIR=$HOME_HAOZH/projects/new_ProRL-Agent-Server/ProRL-Agent-Server +RESULTS_DIR="${OPENHANDS_WORKDIR}/results/standalone_test_$(date +%Y%m%d_%H%M%S)" +container_name="$HOME_HAOZH/singularity_images_v3/nvidian+nemo+verl_v2+vllm0.10dev.sqsh" +MOUNTS="--container-mounts=/lustre:/lustre" +# Model configuration +SFT_MODEL_PATH='/lustre/fsw/portfolios/llmservice/users/haozh/.cache/huggingface/hub/models--Qwen--Qwen3-8B/snapshots/9c925d64d72725edaf899c6cb9c377fd0709d9c5' +TOKENIZER_PATH='/lustre/fsw/portfolios/llmservice/users/haozh/.cache/huggingface/hub/models--Qwen--Qwen3-8B/snapshots/9c925d64d72725edaf899c6cb9c377fd0709d9c5' + +# Data configuration +DATA_PATH="$HOME_HAOZH/data/swegym-new-split/test-transformed-with-prompt-first-64.parquet" +OUTPUT_DIR="${RESULTS_DIR}/standalone_swebench_test_${SLURM_JOB_ID}" + +# Server configuration +GPUS_PER_NODE=8 +TP_SIZE=4 +GPU_MEM_UTIL=0.8 +NUM_SERVERS_PER_NODE=$((GPUS_PER_NODE / TP_SIZE)) +VLLM_BASE_PORT=8100 +OPENHANDS_PORT=8006 +OPENHANDS_NUM_WORKERS=64 + +# Evaluation configuration +NUM_TRAJECTORIES=1 +TEMPERATURE=0.0 +TOP_P=1.0 +MAX_ITERATIONS=50 +MAX_OUTPUT_TOKENS=1536 +MAX_MODEL_LEN=32768 +TIMEOUT=1500 +HINT_MODE=none +TOKEN_LEVEL_GENERATION=true # set to true for token-level generation + +# ==================== Node Setup ==================== +nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +nodes_array=($nodes) +NNODES=$SLURM_NNODES + +mkdir -p "$RESULTS_DIR" + +# ==================== Resolve Node IPs ==================== +declare -a node_ips +for i in "${!nodes_array[@]}"; do + node=${nodes_array[$i]} + node_ip=$(srun --nodes=1 --ntasks=1 -w "$node" hostname --ip-address) + # Convert to ipv4 if needed + if [[ "$node_ip" == *" "* ]]; then + IFS=' ' read -ra ADDR <<<"$node_ip" + if [[ ${#ADDR[0]} -gt 16 ]]; then + node_ip=${ADDR[1]} + else + node_ip=${ADDR[0]} + fi + fi + node_ips[$i]=$node_ip + echo "Node $i: ${nodes_array[$i]} -> IP: $node_ip" +done + +# 
==================== Start OpenHands on all nodes ==================== +echo "Starting OpenHands servers on all nodes..." +openhands_urls="" + +for i in "${!nodes_array[@]}"; do + node=${nodes_array[$i]} + node_ip=${node_ips[$i]} + + echo "Starting OpenHands on node $node (IP: $node_ip)" + + srun --nodes=1 --ntasks=1 -w "$node" \ + -o "$RESULTS_DIR/output-%A_%a-openhands-node-$i.out" \ + -e "$RESULTS_DIR/output-%A_%a-openhands-node-$i.err" \ + --container-image="$container_name" $MOUNTS \ + bash -c "cd $OPENHANDS_WORKDIR \ + && export OH_RUNTIME_SINGULARITY_IMAGE_REPO=$HOME_HAOZH/singularity_images_v3 \ + && export OVERWRITE_OPENHANDS_DIR=$OPENHANDS_WORKDIR \ + && export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:\$PATH \ + && export PYTHONPATH=$OPENHANDS_WORKDIR:\$PYTHONPATH \ + && export LOG_LEVEL=ERROR \ + && export DEBUG=False \ + && nohup /usr/bin/python scripts/start_server_thread.py --max-init-workers 70 --max-run-workers $OPENHANDS_NUM_WORKERS --timeout 9999999" & + + # Build the OpenHands URLs string + if [ -z "$openhands_urls" ]; then + openhands_urls="http://$node_ip:$OPENHANDS_PORT" + else + openhands_urls="$openhands_urls+http://$node_ip:$OPENHANDS_PORT" + fi +done + +echo "OpenHands URLs: $openhands_urls" + +# ==================== Start vLLM servers on all nodes ==================== +echo "Starting vLLM servers on all nodes..." +llm_server_urls="" + +for i in "${!nodes_array[@]}"; do + node=${nodes_array[$i]} + node_ip=${node_ips[$i]} + + echo "Starting $NUM_SERVERS_PER_NODE vLLM server(s) on node $node (IP: $node_ip)" + + for server_idx in $(seq 0 $((NUM_SERVERS_PER_NODE - 1))); do + gpu_start=$((server_idx * TP_SIZE)) + gpu_end=$((gpu_start + TP_SIZE - 1)) + cuda_devices=$(seq -s, $gpu_start $gpu_end) + port=$((VLLM_BASE_PORT + server_idx)) + + if [ "$TOKEN_LEVEL_GENERATION" = "true" ]; then + # Token-level generation: use custom vllm_api_server.py + vllm_cmd="CUDA_VISIBLE_DEVICES=$cuda_devices python $OPENHANDS_WORKDIR/scripts/tests/vllm_api_server.py \ + --model $SFT_MODEL_PATH \ + --tensor-parallel-size $TP_SIZE \ + --port $port \ + --host 0.0.0.0 \ + --gpu-memory-utilization $GPU_MEM_UTIL \ + --max-model-len $MAX_MODEL_LEN" + else + # Standard mode: use OpenAI-compatible vLLM server + vllm_cmd="CUDA_VISIBLE_DEVICES=$cuda_devices python -m vllm.entrypoints.openai.api_server \ + --model $SFT_MODEL_PATH \ + --tensor-parallel-size $TP_SIZE \ + --port $port \ + --host 0.0.0.0 \ + --gpu-memory-utilization $GPU_MEM_UTIL \ + --max-model-len $MAX_MODEL_LEN" + fi + + srun --nodes=1 --ntasks=1 -w "$node" \ + -o "$RESULTS_DIR/output-%A_%a-vllm-node-$i-server-$server_idx.out" \ + -e "$RESULTS_DIR/output-%A_%a-vllm-node-$i-server-$server_idx.err" \ + --container-image="$container_name" $MOUNTS \ + bash -c "$vllm_cmd" & + + # Build the LLM server URLs string + if [ -z "$llm_server_urls" ]; then + llm_server_urls="http://$node_ip:$port" + else + llm_server_urls="$llm_server_urls+http://$node_ip:$port" + fi + done +done + +echo "LLM Server URLs: $llm_server_urls" + +# ==================== Wait for servers to be ready ==================== +echo "Waiting for servers to start up..." +sleep 120 + +# Health check for vLLM servers +echo "Checking vLLM server health..." 
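+# Poll each vLLM server's /health endpoint (up to 60 attempts, 5 s apart, i.e. ~5 minutes per server).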
+IFS='+' read -ra LLM_URLS <<< "$llm_server_urls" +for url in "${LLM_URLS[@]}"; do + for attempt in $(seq 1 60); do + if curl -s -o /dev/null -w "%{http_code}" "$url/health" | grep -q "200"; then + echo "vLLM server $url is healthy" + break + fi + if [ $attempt -eq 60 ]; then + echo "WARNING: vLLM server $url did not become healthy after 5 minutes" + fi + sleep 5 + done +done + +# ==================== Build evaluation command args ==================== +TOKEN_LEVEL_FLAG="" +if [ "$TOKEN_LEVEL_GENERATION" = "true" ]; then + TOKEN_LEVEL_FLAG="--token_level_generation" +fi + +# ==================== Run standalone evaluation ==================== +echo "Starting standalone SWE-bench evaluation..." +echo " OpenHands URLs: $openhands_urls" +echo " LLM Server URLs: $llm_server_urls" + +srun --overlap --nodes=1 --ntasks=1 -w "${nodes_array[0]}" \ + -o "$RESULTS_DIR/output-%A_%a-evaluation.out" \ + -e "$RESULTS_DIR/output-%A_%a-evaluation.err" \ + --container-image="$container_name" $MOUNTS \ + bash -c "cd $OPENHANDS_WORKDIR \ + && export PYTHONPATH=$OPENHANDS_WORKDIR:\$PYTHONPATH \ + && python scripts/tests/standalone_swebench_test.py \ + --data_path $DATA_PATH \ + --openhands_urls '$openhands_urls' \ + --llm_server_urls '$llm_server_urls' \ + --model_name $SFT_MODEL_PATH \ + --output_dir $OUTPUT_DIR \ + --num_trajectories $NUM_TRAJECTORIES \ + --num_workers_per_server $OPENHANDS_NUM_WORKERS \ + --temperature $TEMPERATURE \ + --top_p $TOP_P \ + --max_iterations $MAX_ITERATIONS \ + --max_output_tokens $MAX_OUTPUT_TOKENS \ + --max_model_len $MAX_MODEL_LEN \ + --timeout $TIMEOUT \ + --hint_mode $HINT_MODE \ + --custom_tokenizer $TOKENIZER_PATH \ + $TOKEN_LEVEL_FLAG" + +echo "Evaluation completed! Results saved to: $OUTPUT_DIR" diff --git a/scripts/tests/run_standalone_swebench_test_single_node.sh b/scripts/tests/run_standalone_swebench_test_single_node.sh new file mode 100644 index 000000000..91f44852f --- /dev/null +++ b/scripts/tests/run_standalone_swebench_test_single_node.sh @@ -0,0 +1,211 @@ +#!/bin/bash +# Single-node standalone SWE-bench evaluation script. +# Usage: bash run_standalone_swebench_test_single_node.sh +# +# This script launches OpenHands + vLLM servers locally, then runs evaluation. +# All background processes are cleaned up on exit (Ctrl-C or natural completion). 
+ +set -x # Enable debug output + +# ==================== Configuration ==================== +HOME_HAOZH='/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/haozh' +OPENHANDS_WORKDIR=$HOME_HAOZH/projects/new_ProRL-Agent-Server/ProRL-Agent-Server +LOG_DIR="${OPENHANDS_WORKDIR}/logs/standalone_test_$(date +%Y%m%d_%H%M%S)" +OUTPUT_DIR="${OPENHANDS_WORKDIR}/results/standalone_test_$(date +%Y%m%d_%H%M%S)" + +# Model configuration +SFT_MODEL_PATH='/lustre/fsw/portfolios/llmservice/users/haozh/.cache/huggingface/hub/models--Qwen--Qwen3-8B/snapshots/9c925d64d72725edaf899c6cb9c377fd0709d9c5' +TOKENIZER_PATH='/lustre/fsw/portfolios/llmservice/users/haozh/.cache/huggingface/hub/models--Qwen--Qwen3-8B/snapshots/9c925d64d72725edaf899c6cb9c377fd0709d9c5' + +# Data configuration +DATA_PATH=$HOME_HAOZH/data/SWE-GYM-R2E-GYM/test-transformed-with-prompt-first-64.parquet + +# Server configuration +GPUS_PER_NODE=8 +TP_SIZE=4 +GPU_MEM_UTIL=0.8 +NUM_SERVERS=$((GPUS_PER_NODE / TP_SIZE)) +VLLM_BASE_PORT=8100 +OPENHANDS_PORT=8006 +OPENHANDS_NUM_WORKERS=64 + +# Evaluation configuration +NUM_TRAJECTORIES=1 +TEMPERATURE=0.0 +TOP_P=1.0 +MAX_ITERATIONS=50 +MAX_OUTPUT_TOKENS=1536 +MAX_MODEL_LEN=32768 +TIMEOUT=1500 +HINT_MODE=none +TOKEN_LEVEL_GENERATION=true # set to true for token-level generation + +# ==================== Cleanup trap ==================== +PIDS=() + +cleanup() { + echo "" + echo "Cleaning up background processes..." + for pid in "${PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + echo " Killing PID $pid" + kill "$pid" 2>/dev/null + wait "$pid" 2>/dev/null + fi + done + echo "Cleanup done." +} + +trap cleanup EXIT INT TERM + +# ==================== Setup ==================== +mkdir -p "$LOG_DIR" "$OUTPUT_DIR" + +NODE_IP=$(hostname --ip-address) +# Convert to ipv4 if needed +if [[ "$NODE_IP" == *" "* ]]; then + IFS=' ' read -ra ADDR <<<"$NODE_IP" + if [[ ${#ADDR[0]} -gt 16 ]]; then + NODE_IP=${ADDR[1]} + else + NODE_IP=${ADDR[0]} + fi +fi +echo "Node IP: $NODE_IP" + +# ==================== Start OpenHands server ==================== +echo "Starting OpenHands server..." + +cd "$OPENHANDS_WORKDIR" +export OH_RUNTIME_SINGULARITY_IMAGE_REPO=$HOME_HAOZH/singularity_images_v3 +export OVERWRITE_OPENHANDS_DIR="$OPENHANDS_WORKDIR" +export PYTHONPATH="${OPENHANDS_WORKDIR}:${PYTHONPATH}" +export LOG_LEVEL=ERROR +export DEBUG=False + +python scripts/start_server_thread.py \ + --max-init-workers 70 \ + --max-run-workers "$OPENHANDS_NUM_WORKERS" \ + --timeout 9999999 \ + > "$LOG_DIR/openhands.out" 2> "$LOG_DIR/openhands.err" & +PIDS+=($!) +echo "OpenHands server PID: ${PIDS[-1]}" + +openhands_urls="http://${NODE_IP}:${OPENHANDS_PORT}" +echo "OpenHands URL: $openhands_urls" + +cd "$WORKDIR" + +# ==================== Start vLLM servers ==================== +echo "Starting $NUM_SERVERS vLLM server(s)..." 
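+# Each vLLM server is pinned to TP_SIZE consecutive GPUs and listens on VLLM_BASE_PORT + its server index.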
+llm_server_urls="" + +for server_idx in $(seq 0 $((NUM_SERVERS - 1))); do + gpu_start=$((server_idx * TP_SIZE)) + gpu_end=$((gpu_start + TP_SIZE - 1)) + cuda_devices=$(seq -s, "$gpu_start" "$gpu_end") + port=$((VLLM_BASE_PORT + server_idx)) + + echo " Server $server_idx: GPUs=$cuda_devices, port=$port" + + if [ "$TOKEN_LEVEL_GENERATION" = "true" ]; then + CUDA_VISIBLE_DEVICES=$cuda_devices python "$OPENHANDS_WORKDIR/scripts/tests/vllm_api_server.py" \ + --model "$SFT_MODEL_PATH" \ + --tensor-parallel-size "$TP_SIZE" \ + --port "$port" \ + --host 0.0.0.0 \ + --gpu-memory-utilization "$GPU_MEM_UTIL" \ + --max-model-len "$MAX_MODEL_LEN" \ + > "$LOG_DIR/vllm_server_${server_idx}.out" 2> "$LOG_DIR/vllm_server_${server_idx}.err" & + else + CUDA_VISIBLE_DEVICES=$cuda_devices python -m vllm.entrypoints.openai.api_server \ + --model "$SFT_MODEL_PATH" \ + --tensor-parallel-size "$TP_SIZE" \ + --port "$port" \ + --host 0.0.0.0 \ + --gpu-memory-utilization "$GPU_MEM_UTIL" \ + --max-model-len "$MAX_MODEL_LEN" \ + > "$LOG_DIR/vllm_server_${server_idx}.out" 2> "$LOG_DIR/vllm_server_${server_idx}.err" & + fi + PIDS+=($!) + echo " vLLM server $server_idx PID: ${PIDS[-1]}" + + if [ -z "$llm_server_urls" ]; then + llm_server_urls="http://${NODE_IP}:${port}" + else + llm_server_urls="${llm_server_urls}+http://${NODE_IP}:${port}" + fi +done + +echo "LLM Server URLs: $llm_server_urls" + +# ==================== Wait for vLLM servers to be healthy ==================== +echo "Waiting for vLLM servers to become healthy..." +IFS='+' read -ra LLM_URLS <<< "$llm_server_urls" +all_healthy=true +for url in "${LLM_URLS[@]}"; do + healthy=false + for attempt in $(seq 1 120); do + if curl -s -o /dev/null -w "%{http_code}" "${url}/health" 2>/dev/null | grep -q "200"; then + echo " vLLM server $url is healthy (attempt $attempt)" + healthy=true + break + fi + sleep 5 + done + if [ "$healthy" = false ]; then + echo "ERROR: vLLM server $url did not become healthy after 10 minutes" + echo " Check logs: $LOG_DIR/vllm_server_*.err" + all_healthy=false + fi +done + +if [ "$all_healthy" = false ]; then + echo "Some vLLM servers failed to start. Aborting." + exit 1 +fi + +echo "All vLLM servers are healthy." + + +# ==================== Run evaluation ==================== +echo "" +echo "==========================================" +echo "Starting standalone SWE-bench evaluation" +echo "==========================================" +echo " OpenHands URLs: $openhands_urls" +echo " LLM Server URLs: $llm_server_urls" +echo " Data: $DATA_PATH" +echo " Output: $OUTPUT_DIR" +echo " Logs: $LOG_DIR" +echo "==========================================" + +TOKEN_LEVEL_FLAG="" +if [ "$TOKEN_LEVEL_GENERATION" = "true" ]; then + TOKEN_LEVEL_FLAG="--token_level_generation" +fi + +cd "$OPENHANDS_WORKDIR" +export PYTHONPATH="${OPENHANDS_WORKDIR}:${PYTHONPATH}" + +python scripts/tests/standalone_swebench_test.py \ + --data_path "$DATA_PATH" \ + --openhands_urls "$openhands_urls" \ + --llm_server_urls "$llm_server_urls" \ + --model_name "$SFT_MODEL_PATH" \ + --output_dir "$OUTPUT_DIR" \ + --num_trajectories "$NUM_TRAJECTORIES" \ + --num_workers_per_server "$OPENHANDS_NUM_WORKERS" \ + --temperature "$TEMPERATURE" \ + --top_p "$TOP_P" \ + --max_iterations "$MAX_ITERATIONS" \ + --max_output_tokens "$MAX_OUTPUT_TOKENS" \ + --max_model_len "$MAX_MODEL_LEN" \ + --timeout "$TIMEOUT" \ + --hint_mode "$HINT_MODE" \ + --custom_tokenizer "$TOKENIZER_PATH" \ + $TOKEN_LEVEL_FLAG + +echo "" +echo "Evaluation completed! 
Results saved to: $OUTPUT_DIR" +echo "Logs available at: $LOG_DIR" diff --git a/scripts/tests/standalone_swebench_test.py b/scripts/tests/standalone_swebench_test.py new file mode 100644 index 000000000..cf9404123 --- /dev/null +++ b/scripts/tests/standalone_swebench_test.py @@ -0,0 +1,1032 @@ +#!/usr/bin/env python3 +""" +Standalone SWE-bench validation/testing script. + +This script replicates the validation logic from the verl training framework +without depending on any verl project files. It: +1. Loads test data from a parquet file +2. Sends instances to OpenHands servers for multi-turn agent interaction +3. Computes rewards (resolved, file_iou, block_iou) +4. Computes pass@k metrics +5. Saves results to JSONL + +Usage: + python standalone_swebench_test.py \ + --data_path /path/to/test-transformed-with-prompt.parquet \ + --openhands_urls "http://host1:8006+http://host2:8006" \ + --model_name "Qwen/Qwen3-32B" \ + --output_dir ./results +""" + +import argparse +import asyncio +import json +import logging +import os +import re +import time +from collections import defaultdict +from functools import partial +from typing import Any, Callable, Dict, List, Set, Tuple + +import aiohttp +import numpy as np +import pandas as pd +from scipy.special import comb + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger(__name__) + + +# ===================================================================== +# File / Block IoU computation (from unified diffs) +# ===================================================================== + +HunkRange = Tuple[int, int] + + +def merge_intervals(intervals: List[HunkRange]) -> List[HunkRange]: + if not intervals: + return [] + intervals = sorted(intervals) + merged: List[HunkRange] = [] + s, e = intervals[0] + for cs, ce in intervals[1:]: + if cs <= e + 1: + e = max(e, ce) + else: + merged.append((s, e)) + s, e = cs, ce + merged.append((s, e)) + return merged + + +def intervals_length(intervals: List[HunkRange]) -> int: + return sum(e - s + 1 for s, e in intervals) + + +def intervals_intersection_length(a: List[HunkRange], b: List[HunkRange]) -> int: + i = j = 0 + inter = 0 + while i < len(a) and j < len(b): + s1, e1 = a[i] + s2, e2 = b[j] + s, e = max(s1, s2), min(e1, e2) + if s <= e: + inter += e - s + 1 + if e1 < e2: + i += 1 + else: + j += 1 + return inter + + +def _strip_ab_prefix(path: str) -> str: + if path.startswith("a/"): + return path[2:] + if path.startswith("b/"): + return path[2:] + return path + + +def _set_to_blocks(line_set: Set[int]) -> List[HunkRange]: + if not line_set: + return [] + seq = sorted(line_set) + blocks: List[HunkRange] = [] + start = prev = seq[0] + for v in seq[1:]: + if v == prev + 1: + prev = v + else: + blocks.append((start, prev)) + start = prev = v + blocks.append((start, prev)) + return blocks + + +def parse_unified_diff(patch_text: str): + """Parse a unified diff and return (files, new_blocks, old_blocks).""" + files: Set[str] = set() + new_blocks: Dict[str, List[HunkRange]] = {} + old_blocks: Dict[str, List[HunkRange]] = {} + + lines = patch_text.splitlines() + current_file = None + old_file = None + new_file = None + in_hunk = False + old_line = None + new_line = None + file_old_changed: Set[int] = set() + file_new_changed: Set[int] = set() + + def finalize_file(): + nonlocal current_file, file_old_changed, file_new_changed + if current_file is not None: + files.add(current_file) + if file_new_changed: + new_blocks.setdefault(current_file, 
[]).extend(_set_to_blocks(file_new_changed)) + if file_old_changed: + old_blocks.setdefault(current_file, []).extend(_set_to_blocks(file_old_changed)) + current_file = None + + hunk_re = re.compile(r"^@@\s+-([0-9]+)(?:,([0-9]+))?\s+\+([0-9]+)(?:,([0-9]+))?\s+@@") + + for line in lines: + if line.startswith("diff --git "): + if current_file is not None: + finalize_file() + file_old_changed = set() + file_new_changed = set() + in_hunk = False + old_file = None + new_file = None + current_file = None + old_line = None + new_line = None + continue + if line.startswith("--- "): + old_file = line[4:].strip() + continue + if line.startswith("+++ "): + new_file = line[4:].strip() + token_new = (new_file or "").split()[0] + token_old = (old_file or "").split()[0] + if token_new and not token_new.endswith("/dev/null"): + current_file = _strip_ab_prefix(token_new) + elif token_old and not token_old.endswith("/dev/null"): + current_file = _strip_ab_prefix(token_old) + else: + current_file = token_new or token_old or "" + continue + if line.startswith("Binary files "): + m = re.search(r"and\s+b/(.+?)\s+differ", line) + if m: + files.add(m.group(1)) + else: + m2 = re.search(r"a/(.+?)\s+and", line) + if m2: + files.add(m2.group(1)) + continue + m = hunk_re.match(line) + if m: + in_hunk = True + old_line = int(m.group(1)) + new_line = int(m.group(3)) + continue + if in_hunk and current_file is not None: + if line.startswith(" "): + old_line += 1 + new_line += 1 + elif line.startswith("-"): + file_old_changed.add(old_line) + old_line += 1 + elif line.startswith("+"): + file_new_changed.add(new_line) + new_line += 1 + + if current_file is not None: + finalize_file() + + return files, new_blocks, old_blocks + + +def file_iou(gt_patch: str, pred_patch: str) -> float: + gt_files, _, _ = parse_unified_diff(gt_patch) + pred_files, _, _ = parse_unified_diff(pred_patch) + inter = len(gt_files & pred_files) + union = len(gt_files | pred_files) + if union == 0: + return 1.0 + return inter / union + + +def block_iou(gt_patch: str, pred_patch: str) -> float: + gt_files, gt_new, gt_old = parse_unified_diff(gt_patch) + pred_files, pred_new, pred_old = parse_unified_diff(pred_patch) + all_files = set(gt_files) | set(pred_files) + inter_sum = 0 + union_sum = 0 + for side in ("new", "old"): + for f in all_files: + g = merge_intervals((gt_new if side == "new" else gt_old).get(f, [])) + p = merge_intervals((pred_new if side == "new" else pred_old).get(f, [])) + if not g and not p: + continue + inter_len = intervals_intersection_length(g, p) + union_len = intervals_length(merge_intervals(g + p)) + inter_sum += inter_len + union_sum += union_len + if union_sum == 0: + return 1.0 + return inter_sum / union_sum + + +def compute_git_patch_ious(gt_patch: str, pred_patch: str) -> Dict[str, float]: + return { + "file_iou": file_iou(gt_patch, pred_patch), + "block_iou": block_iou(gt_patch, pred_patch), + } + + +# ===================================================================== +# Reward computation +# ===================================================================== + + +def compute_rewards(results: List[dict], file_iou_coef: float = 0.0, block_iou_coef: float = 0.0) -> List[dict]: + """ + Compute rewards for each result returned by OpenHands. + + Each result dict should have keys: success, resolved, finish, error, git_patch, messages, instance. + Returns the same list of dicts with added keys: score, file_iou, block_iou. 
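+
+    The score matches the formula implemented below:
+        score = resolved + file_iou_coef * (1.0 if resolved else file_iou)
+                         + block_iou_coef * (1.0 if resolved else block_iou)
+    where resolved is 1.0 or 0.0 and both IoUs fall back to 0.0 when either patch is empty.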
+ """ + for r in results: + gt_patch = r.get("instance", {}).get("patch", "") or "" + pred_patch = r.get("git_patch", "") or "" + + r["resolved"] = bool(r.get("resolved", False)) + r["finish"] = bool(r.get("finish", False)) + r["error"] = r.get("error", None) or "" + + if gt_patch and pred_patch: + ious = compute_git_patch_ious(gt_patch, pred_patch) + r["file_iou"] = ious["file_iou"] + r["block_iou"] = ious["block_iou"] + else: + r["file_iou"] = 0.0 + r["block_iou"] = 0.0 + + base_score = 1.0 if r["resolved"] else 0.0 + converted_file_iou = 1.0 if r["resolved"] else r["file_iou"] + converted_block_iou = 1.0 if r["resolved"] else r["block_iou"] + r["score"] = base_score + file_iou_coef * converted_file_iou + block_iou_coef * converted_block_iou + + return results + + +# ===================================================================== +# Metrics: process_validation_metrics +# ===================================================================== + + +def bootstrap_metric( + data: list, + subset_size: int, + reduce_fns: list, + n_bootstrap: int = 1000, + seed: int = 42, +) -> list: + np.random.seed(seed) + bootstrap_metric_lsts = [[] for _ in range(len(reduce_fns))] + for _ in range(n_bootstrap): + bootstrap_idxs = np.random.choice(len(data), size=subset_size, replace=True) + bootstrap_data = [data[i] for i in bootstrap_idxs] + for i, reduce_fn in enumerate(reduce_fns): + bootstrap_metric_lsts[i].append(reduce_fn(bootstrap_data)) + return [(np.mean(lst), np.std(lst)) for lst in bootstrap_metric_lsts] + + +def calc_maj_val(data: list, vote_key: str, val_key: str) -> float: + vote2vals = defaultdict(list) + for d in data: + vote2vals[d[vote_key]].append(d[val_key]) + vote2cnt = {k: len(v) for k, v in vote2vals.items()} + maj_vote = max(vote2cnt, key=vote2cnt.get) + return vote2vals[maj_vote][0] + + +def process_validation_metrics( + data_sources: list, + sample_inputs: list, + infos_dict: dict, + seed: int = 42, +) -> dict: + """Group by data_source and prompt, then compute mean, best@N, worst@N, maj@N.""" + data_src2prompt2var2vals = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + for idx, ds in enumerate(data_sources): + prompt = sample_inputs[idx] + var2vals = data_src2prompt2var2vals[ds][prompt] + for var_name, var_vals in infos_dict.items(): + var2vals[var_name].append(var_vals[idx]) + + data_src2prompt2var2metric = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) + for ds, prompt2var2vals in data_src2prompt2var2vals.items(): + for prompt, var2vals in prompt2var2vals.items(): + for var_name, var_vals in var2vals.items(): + if isinstance(var_vals[0], str): + continue + metric = {} + n_resps = len(var_vals) + metric[f"mean@{n_resps}"] = np.mean(var_vals) + if n_resps > 1: + metric[f"std@{n_resps}"] = np.std(var_vals) + ns = [] + n = 2 + while n < n_resps: + ns.append(n) + n *= 2 + ns.append(n_resps) + for n in ns: + [(bon_mean, bon_std), (won_mean, won_std)] = bootstrap_metric( + data=var_vals, subset_size=n, reduce_fns=[np.max, np.min], seed=seed + ) + metric[f"best@{n}/mean"], metric[f"best@{n}/std"] = bon_mean, bon_std + metric[f"worst@{n}/mean"], metric[f"worst@{n}/std"] = won_mean, won_std + data_src2prompt2var2metric[ds][prompt][var_name] = metric + + data_src2var2metric2prompt_vals = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + for ds, prompt2var2metric in data_src2prompt2var2metric.items(): + for prompt, var2metric in prompt2var2metric.items(): + for var_name, metric in var2metric.items(): + for metric_name, metric_val in metric.items(): 
+ data_src2var2metric2prompt_vals[ds][var_name][metric_name].append(metric_val) + + data_src2var2metric2val = defaultdict(lambda: defaultdict(lambda: defaultdict(float))) + for ds, var2metric2prompt_vals in data_src2var2metric2prompt_vals.items(): + for var_name, metric2prompt_vals in var2metric2prompt_vals.items(): + for metric_name, prompt_vals in metric2prompt_vals.items(): + data_src2var2metric2val[ds][var_name][metric_name] = np.mean(prompt_vals) + + return data_src2var2metric2val + + +# ===================================================================== +# Pass@k +# ===================================================================== + + +def calculate_pass_at_k(n: int, c: int, k: int) -> float: + if n - c < k: + return 1.0 + return 1.0 - float(comb(n - c, k)) / float(comb(n, k)) + + +def compute_pass_k_metrics(problem_results: dict, k_values: list = None) -> dict: + if k_values is None: + k_values = [1, 4, 8, 16] + results = {} + for k in k_values: + pass_at_k_scores = [] + valid_problems = 0 + for pid, stats in problem_results.items(): + n = stats["total"] + c = stats["correct"] + if n >= k: + pass_at_k_scores.append(calculate_pass_at_k(n, c, k)) + valid_problems += 1 + if pass_at_k_scores: + results[f"pass@{k}"] = { + "score": float(np.mean(pass_at_k_scores)), + "std": float(np.std(pass_at_k_scores)), + "num_problems": valid_problems, + } + else: + results[f"pass@{k}"] = {"score": 0.0, "std": 0.0, "num_problems": 0} + return results + + +# ===================================================================== +# Data loading +# ===================================================================== + + +def load_test_data(parquet_path: str) -> List[dict]: + """ + Load test instances from a parquet file. + + Expected columns: + - prompt: JSON string of chat messages [{role, content}, ...] + - instance: dict with instance_id, problem_statement, patch, etc. + - data_source: str (e.g. "SWE-Gym/SWE-Gym") + - extra_info: dict with name, index, split, etc. 
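+
+    If an instance has no instance_id, one is synthesized from the other columns
+    as "{data_source}/{name}/{split}/{index}". The concatenated prompt contents
+    are kept as prompt_text for input dedup / pass@k grouping.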
+ """ + df = pd.read_parquet(parquet_path) + records = [] + for i, row in df.iterrows(): + record = {} + + # Parse instance + instance = row.get("instance", {}) + if isinstance(instance, str): + instance = json.loads(instance) + record["instance"] = instance + + # Data source + record["data_source"] = row.get("data_source", "unknown") + + # Extra info + extra_info = row.get("extra_info", {}) + if isinstance(extra_info, str): + extra_info = json.loads(extra_info) + record["extra_info"] = extra_info + + # Build instance_id if not present + if "instance_id" not in record["instance"]: + ds = record["data_source"] + name = extra_info.get("name", "") + split = extra_info.get("split", "") + index = extra_info.get("index", i) + record["instance"]["instance_id"] = f"{ds}/{name}/{split}/{index}" + + # Prompt text (for input dedup / pass@k grouping) + prompt = row.get("prompt", "") + if isinstance(prompt, str): + try: + prompt = json.loads(prompt) + except (json.JSONDecodeError, TypeError): + pass + if isinstance(prompt, list): + record["prompt_text"] = " ".join(m.get("content", "") for m in prompt if isinstance(m, dict)) + else: + record["prompt_text"] = str(prompt) + + records.append(record) + + logger.info(f"Loaded {len(records)} test instances from {parquet_path}") + return records + + +# ===================================================================== +# OpenHands interaction +# ===================================================================== + + +async def send_to_openhands( + session: aiohttp.ClientSession, + openhands_url: str, + instance: dict, + sampling_params: dict, + timeout_seconds: int = 1500, +) -> dict: + """Send a single instance to an OpenHands server's /process endpoint.""" + request_data = {"instance": instance, "sampling_params": sampling_params} + try: + timeout = aiohttp.ClientTimeout(total=timeout_seconds) + async with session.post( + f"{openhands_url}/process", + headers={"Content-Type": "application/json"}, + json=request_data, + timeout=timeout, + ) as resp: + if resp.status == 200: + data = await resp.json() + messages = data.get("messages", []) + if messages and len(messages) > 0: + return data + else: + logger.warning(f"Empty messages from {openhands_url} for {instance.get('instance_id', '?')}") + return {"success": False, "messages": [], "resolved": False, "finish": False, "error": "Empty response", "git_patch": ""} + else: + error_text = await resp.text() + logger.error(f"HTTP {resp.status} from {openhands_url}: {error_text}") + return {"success": False, "messages": [], "resolved": False, "finish": False, "error": f"HTTP {resp.status}", "git_patch": ""} + except asyncio.TimeoutError: + logger.error(f"Timeout for instance {instance.get('instance_id', '?')}") + return {"success": False, "messages": [], "resolved": False, "finish": False, "error": "Timeout", "git_patch": ""} + except Exception as e: + logger.error(f"Error for instance {instance.get('instance_id', '?')}: {e}") + return {"success": False, "messages": [], "resolved": False, "finish": False, "error": str(e), "git_patch": ""} + + +async def start_openhands_server(session: aiohttp.ClientSession, url: str): + try: + timeout = aiohttp.ClientTimeout(total=60) + async with session.post(f"{url}/start", timeout=timeout) as resp: + if resp.status == 200: + logger.info(f"Started OpenHands server: {url}") + else: + logger.warning(f"Failed to start {url}: HTTP {resp.status}") + except Exception as e: + logger.warning(f"Failed to start {url}: {e}") + + +async def stop_openhands_server(session: 
aiohttp.ClientSession, url: str): + try: + timeout = aiohttp.ClientTimeout(total=30) + async with session.post(f"{url}/stop", timeout=timeout) as resp: + if resp.status == 200: + logger.info(f"Stopped OpenHands server: {url}") + except Exception as e: + logger.warning(f"Failed to stop {url}: {e}") + + +# ===================================================================== +# LLM Server Management +# ===================================================================== + + +async def clear_llm_servers_from_openhands(session: aiohttp.ClientSession, openhands_url: str): + """Clear existing LLM servers from an OpenHands server.""" + try: + timeout = aiohttp.ClientTimeout(total=30) + async with session.post(f"{openhands_url}/clear_llm_server", timeout=timeout) as resp: + if resp.status == 200: + result = await resp.json() + logger.info(f"Cleared LLM servers from {openhands_url}: {result}") + else: + error_text = await resp.text() + logger.warning(f"Failed to clear LLM servers from {openhands_url}: HTTP {resp.status}: {error_text}") + except Exception as e: + logger.warning(f"Failed to clear LLM servers from {openhands_url}: {e}") + + +async def add_llm_server_to_openhands(session: aiohttp.ClientSession, openhands_url: str, llm_address: str): + """Register a single LLM server address with an OpenHands server.""" + try: + timeout = aiohttp.ClientTimeout(total=30) + payload = {"address": llm_address} + async with session.post(f"{openhands_url}/add_llm_server", json=payload, timeout=timeout) as resp: + if resp.status == 200: + result = await resp.json() + logger.info(f"Added LLM server {llm_address} to {openhands_url}: {result}") + else: + error_text = await resp.text() + logger.error(f"Failed to add LLM server {llm_address} to {openhands_url}: HTTP {resp.status}: {error_text}") + except Exception as e: + logger.error(f"Failed to add LLM server {llm_address} to {openhands_url}: {e}") + + +def _url_to_ip(url: str) -> str: + """Extract IP address from URL by removing protocol and port.""" + url = url.replace("http://", "").replace("https://", "") + return url.split(":")[0] + + +def assign_llm_servers_to_openhands( + openhands_urls: List[str], llm_server_urls: List[str] +) -> Dict[str, List[str]]: + """ + Assign LLM servers to OpenHands servers based on IP locality. + + Phase 1: Assign LLM servers to OpenHands servers on the same IP. + Phase 2: Distribute remaining LLM servers evenly across all OpenHands servers. 
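+
+    Illustrative example: with OpenHands servers on hosts A and B and vLLM
+    servers at A:8100, A:8101, B:8100 and C:8100, phase 1 gives A its two
+    co-located servers and B its one; phase 2 then hands the unmatched C:8100
+    to the next OpenHands server in round-robin order.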
+ """ + assignments: Dict[str, List[str]] = {url: [] for url in openhands_urls} + assigned: Set[str] = set() + + # Phase 1: Locality-based assignment + for llm_url in llm_server_urls: + llm_ip = _url_to_ip(llm_url) + for oh_url in openhands_urls: + oh_ip = _url_to_ip(oh_url) + if llm_ip == oh_ip: + assignments[oh_url].append(llm_url) + assigned.add(llm_url) + break + + # Phase 2: Distribute remaining evenly (round-robin) + remaining = [url for url in llm_server_urls if url not in assigned] + if remaining: + logger.info(f"Distributing {len(remaining)} remaining LLM servers across {len(openhands_urls)} OpenHands servers") + for i, llm_url in enumerate(remaining): + oh_url = openhands_urls[i % len(openhands_urls)] + assignments[oh_url].append(llm_url) + + return assignments + + +async def wait_for_server_health(urls: List[str], timeout: int = 600): + """Wait for all servers to become healthy by polling /health endpoint.""" + logger.info(f"Waiting for {len(urls)} servers to become healthy (timeout: {timeout}s)...") + start = time.time() + async with aiohttp.ClientSession() as session: + for url in urls: + healthy = False + while time.time() - start < timeout: + try: + health_url = f"{url}/health" + async with session.get(health_url, timeout=aiohttp.ClientTimeout(total=5)) as resp: + if resp.status == 200: + logger.info(f"Server {url} is healthy") + healthy = True + break + except Exception: + pass + await asyncio.sleep(5) + if not healthy: + raise TimeoutError(f"Server {url} did not become healthy within {timeout}s") + logger.info("All servers are healthy") + + +async def setup_llm_servers_with_openhands( + openhands_urls: List[str], + llm_server_urls: List[str], + token_level_generation: bool = False, +): + """ + Clear existing LLM servers and register new ones with all OpenHands servers. + + Steps: + 1. Wait for all LLM servers to be healthy. + 2. Clear existing LLM server registrations from all OpenHands servers. + 3. Compute locality-based assignment of LLM servers to OpenHands servers. + 4. Register each LLM server with its assigned OpenHands server. 
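+
+    Registered addresses depend on the generation mode: with token_level_generation
+    the raw "http://host:port" address of the custom /generate server is used,
+    otherwise "/v1" is appended so OpenHands talks to the OpenAI-compatible endpoint.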
+ """ + # Step 1: Wait for LLM servers to be healthy + await wait_for_server_health(llm_server_urls) + + connector = aiohttp.TCPConnector(limit=0) + async with aiohttp.ClientSession(connector=connector) as session: + # Step 2: Clear existing LLM servers + logger.info("Clearing existing LLM servers from all OpenHands servers...") + clear_tasks = [clear_llm_servers_from_openhands(session, url) for url in openhands_urls] + await asyncio.gather(*clear_tasks) + + # Step 3: Compute assignment + assignments = assign_llm_servers_to_openhands(openhands_urls, llm_server_urls) + + # Step 4: Register LLM servers + logger.info(f"Registering {len(llm_server_urls)} LLM servers with {len(openhands_urls)} OpenHands servers...") + add_tasks = [] + for oh_url, llm_urls in assignments.items(): + for llm_url in llm_urls: + # Format address based on token_level_generation mode + if token_level_generation: + address = llm_url if llm_url.startswith("http") else f"http://{llm_url}" + else: + base = llm_url if llm_url.startswith("http") else f"http://{llm_url}" + address = f"{base}/v1" if not base.endswith("/v1") else base + add_tasks.append(add_llm_server_to_openhands(session, oh_url, address)) + await asyncio.gather(*add_tasks) + + # Log summary + for oh_url, llm_urls in assignments.items(): + logger.info(f" {oh_url} -> {llm_urls}") + logger.info("LLM server registration complete") + + +async def run_openhands_evaluation( + instances: List[dict], + openhands_urls: List[str], + sampling_params: dict, + num_trajectories: int = 1, + num_workers_per_server: int = 64, + max_retries: int = 2, + timeout_seconds: int = 1500, +) -> List[dict]: + """ + Send all instances to OpenHands servers with load balancing. + + Returns a flat list of result dicts (one per instance * trajectory). + Each result has the original instance attached. 
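+
+    Load balancing: every OpenHands URL is placed num_workers_per_server times
+    into a server queue; each worker takes a job and a server slot, returns the
+    slot when done, and re-queues failed jobs (empty messages) up to max_retries.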
+ """ + # Build job list: expand each instance into num_trajectories copies + jobs = [] + for inst in instances: + for traj_id in range(num_trajectories): + job_instance = inst.copy() + job_instance["trajectory_id"] = traj_id + jobs.append(job_instance) + + logger.info(f"Total jobs: {len(jobs)} ({len(instances)} instances x {num_trajectories} trajectories)") + + # Build server worker pool + server_workers = [] + for url in openhands_urls: + for w in range(num_workers_per_server): + server_workers.append(url) + + results = [None] * len(jobs) + job_queue = asyncio.Queue() + for i, job in enumerate(jobs): + await job_queue.put((i, job, 0)) + + server_queue = asyncio.Queue() + for sw in server_workers: + await server_queue.put(sw) + + completed = 0 + total = len(jobs) + processing_start_time = time.time() + + connector = aiohttp.TCPConnector(limit=0) + async with aiohttp.ClientSession(connector=connector) as session: + # Start all servers + start_tasks = [start_openhands_server(session, url) for url in openhands_urls] + await asyncio.gather(*start_tasks) + logger.info("All OpenHands servers started") + + async def worker(): + nonlocal completed + while True: + try: + idx, instance, retry_count = await asyncio.wait_for(job_queue.get(), timeout=5.0) + except asyncio.TimeoutError: + if job_queue.empty(): + break + continue + + server_url = await server_queue.get() + try: + result = await send_to_openhands(session, server_url, instance, sampling_params, timeout_seconds) + result["instance"] = instance + + if not result.get("success", False) and not result.get("messages") and retry_count < max_retries: + await job_queue.put((idx, instance, retry_count + 1)) + logger.info(f"Retrying job {idx} (attempt {retry_count + 1})") + else: + results[idx] = result + completed += 1 + if completed % 10 == 0 or completed == total: + # Compute resolved/finish ratios from completed results + resolved_count = 0 + finish_count = 0 + all_results_count = 0 + resolved_instances = set() + for r in results: + if r is None: + continue + all_results_count += 1 + inst_id = r.get("instance", {}).get("instance_id", "") + if r.get("resolved", False): + resolved_count += 1 + resolved_instances.add(inst_id) + if r.get("finish", False): + finish_count += 1 + resolved_ratio = resolved_count / all_results_count if all_results_count else 0 + finish_ratio = finish_count / all_results_count if all_results_count else 0 + total_instances = len(instances) + resolved_instance_ratio = len(resolved_instances) / total_instances if total_instances else 0 + elapsed_time = time.time() - processing_start_time + logger.info( + f"Progress: {completed}/{total} ({completed / total * 100:.1f}%), " + f"elapsed: {elapsed_time:.1f}s, " + f"resolved: {resolved_ratio:.2f}, finish: {finish_ratio:.2f}, " + f"resolved_instance: {resolved_instance_ratio:.2f}" + ) + finally: + await server_queue.put(server_url) + + # Run workers concurrently + num_concurrent = len(server_workers) + workers = [asyncio.create_task(worker()) for _ in range(num_concurrent)] + await asyncio.gather(*workers) + + # Stop all servers + stop_tasks = [stop_openhands_server(session, url) for url in openhands_urls] + await asyncio.gather(*stop_tasks) + + # Fill in any None results (jobs that exhausted all retries) + for i, r in enumerate(results): + if r is None: + results[i] = { + "success": False, + "messages": [], + "resolved": False, + "finish": False, + "error": "All retries exhausted", + "git_patch": "", + "instance": jobs[i], + } + + return results + + +# 
===================================================================== +# Main evaluation logic +# ===================================================================== + + +def save_results_jsonl(results: List[dict], output_path: str): + """Save results as JSONL.""" + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + with open(output_path, "w") as f: + for r in results: + record = { + "instance_id": r.get("instance", {}).get("instance_id", ""), + "trajectory_id": r.get("instance", {}).get("trajectory_id", 0), + "score": r.get("score", 0.0), + "resolved": r.get("resolved", False), + "finish": r.get("finish", False), + "error": r.get("error", ""), + "file_iou": r.get("file_iou", 0.0), + "block_iou": r.get("block_iou", 0.0), + "git_patch": r.get("git_patch", ""), + "num_messages": len(r.get("messages", [])), + } + f.write(json.dumps(record) + "\n") + logger.info(f"Saved {len(results)} results to {output_path}") + + +def print_summary(results: List[dict], instances: List[dict], num_trajectories: int, k_values: List[int]): + """Print evaluation summary with metrics.""" + print("\n" + "=" * 60) + print("EVALUATION SUMMARY") + print("=" * 60) + + total = len(results) + resolved_count = sum(1 for r in results if r.get("resolved", False)) + finish_count = sum(1 for r in results if r.get("finish", False)) + error_count = sum(1 for r in results if r.get("error") and r["error"] not in ("", None)) + max_turn_count = sum( + 1 for r in results + if r.get("error") and "maximum iteration" in str(r["error"]) + ) + stuck_count = sum( + 1 for r in results + if r.get("error") and "stuck in a loop" in str(r["error"]) + ) + + scores = [r.get("score", 0.0) for r in results] + file_ious = [r.get("file_iou", 0.0) for r in results] + block_ious = [r.get("block_iou", 0.0) for r in results] + + print(f"Total instances: {len(instances)}") + print(f"Trajectories per inst: {num_trajectories}") + print(f"Total evaluations: {total}") + print(f"Resolved: {resolved_count}/{total} ({resolved_count / total * 100:.1f}%)") + print(f"Finish action: {finish_count}/{total} ({finish_count / total * 100:.1f}%)") + print(f"Max turn reached: {max_turn_count}/{total} ({max_turn_count / total * 100:.1f}%)") + print(f"Stuck in loop: {stuck_count}/{total} ({stuck_count / total * 100:.1f}%)") + print(f"Has error: {error_count}/{total}") + print(f"Mean score: {np.mean(scores):.4f}") + print(f"Mean file IoU: {np.mean(file_ious):.4f}") + print(f"Mean block IoU: {np.mean(block_ious):.4f}") + + # Group by data source for per-source metrics + data_sources = [r.get("instance", {}).get("data_source", "unknown") for r in results] + unique_sources = sorted(set(data_sources)) + if len(unique_sources) > 1: + print(f"\nPer data-source breakdown:") + for src in unique_sources: + src_scores = [s for s, ds in zip(scores, data_sources) if ds == src] + src_resolved = sum(1 for r, ds in zip(results, data_sources) if ds == src and r.get("resolved")) + print(f" {src}: resolved={src_resolved}/{len(src_scores)} mean_score={np.mean(src_scores):.4f}") + + # Compute process_validation_metrics + sample_inputs = [r.get("instance", {}).get("instance_id", str(i)) for i, r in enumerate(results)] + infos_dict = { + "reward": scores, + "acc": [1.0 if r.get("resolved") else 0.0 for r in results], + } + val_metrics = process_validation_metrics(data_sources, sample_inputs, infos_dict) + print(f"\nValidation metrics (process_validation_metrics):") + for ds, var2metric2val in val_metrics.items(): + for var_name, metric2val in var2metric2val.items(): + for 
metric_name, val in metric2val.items(): + print(f" {ds}/{var_name}/{metric_name}: {val:.4f}") + + # Compute pass@k + print(f"\n{'=' * 50}") + print("PASS@K RESULTS") + print("=" * 50) + + problem_results = defaultdict(lambda: {"total": 0, "correct": 0}) + for r in results: + problem_id = r.get("instance", {}).get("instance_id", "unknown") + problem_results[problem_id]["total"] += 1 + if r.get("resolved", False): + problem_results[problem_id]["correct"] += 1 + + pass_k = compute_pass_k_metrics(dict(problem_results), k_values) + for k_metric, k_result in sorted(pass_k.items()): + print(f" {k_metric}: {k_result['score']:.4f} +/- {k_result['std']:.4f} (n={k_result['num_problems']})") + + print("=" * 60) + return val_metrics, pass_k + + +def parse_args(): + parser = argparse.ArgumentParser(description="Standalone SWE-bench validation script") + parser.add_argument("--data_path", type=str, required=True, help="Path to test parquet file") + parser.add_argument( + "--openhands_urls", + type=str, + required=True, + help="OpenHands server URLs separated by '+' (e.g. http://host1:8006+http://host2:8006)", + ) + parser.add_argument( + "--llm_server_urls", + type=str, + default=None, + help="vLLM server URLs separated by '+' (e.g. http://host1:8100+http://host2:8100). " + "If provided, these will be registered with OpenHands servers before evaluation.", + ) + parser.add_argument("--model_name", type=str, default="Qwen/Qwen3-32B", help="Model name for vLLM") + parser.add_argument("--output_dir", type=str, default="./test_results", help="Output directory") + parser.add_argument("--num_trajectories", type=int, default=1, help="Number of trajectories per instance") + parser.add_argument("--num_workers_per_server", type=int, default=64, help="Number of concurrent workers per OpenHands server") + parser.add_argument("--max_retries", type=int, default=2, help="Max retries per job") + parser.add_argument("--temperature", type=float, default=0.0, help="Sampling temperature (0 = greedy)") + parser.add_argument("--top_p", type=float, default=1.0, help="Top-p sampling") + parser.add_argument("--max_iterations", type=int, default=50, help="Max agent iterations") + parser.add_argument("--max_output_tokens", type=int, default=1536, help="Max output tokens per turn") + parser.add_argument("--max_model_len", type=int, default=32768, help="Max model context length") + parser.add_argument("--timeout", type=int, default=1500, help="Timeout per request (seconds)") + parser.add_argument("--hint_mode", type=str, default="none", help="Hint mode: none, file, old_str") + parser.add_argument("--file_iou_coef", type=float, default=0.0, help="Coefficient for file IoU in reward") + parser.add_argument("--block_iou_coef", type=float, default=0.0, help="Coefficient for block IoU in reward") + parser.add_argument("--k_values", type=str, default="1,4,8,16", help="Comma-separated k values for pass@k") + parser.add_argument("--token_level_generation", action="store_true", help="Enable token-level generation") + parser.add_argument("--custom_tokenizer", type=str, default=None, help="Custom tokenizer path (defaults to model_name)") + parser.add_argument("--native_tool_calling", action="store_true", default=True, help="Enable native tool calling") + return parser.parse_args() + + +def main(): + args = parse_args() + + # Parse OpenHands URLs + openhands_urls = [url.strip() for url in args.openhands_urls.split("+") if url.strip()] + logger.info(f"OpenHands servers: {openhands_urls}") + + # Register LLM servers with OpenHands if provided + if 
args.llm_server_urls: + llm_server_urls = [url.strip() for url in args.llm_server_urls.split("+") if url.strip()] + logger.info(f"LLM servers: {llm_server_urls}") + asyncio.run( + setup_llm_servers_with_openhands( + openhands_urls, llm_server_urls, args.token_level_generation + ) + ) + + # Parse k values + k_values = [int(k.strip()) for k in args.k_values.split(",")] + + # Load test data + instances = load_test_data(args.data_path) + + # Set hint_mode on all instances + for inst in instances: + inst["instance"]["hint_mode"] = args.hint_mode + + # Build sampling params + sampling_params = { + "model": f"hosted_vllm/{args.model_name}", + "api_key": "dummy_key", + "modify_params": False, + "log_completions": False, + "native_tool_calling": args.native_tool_calling, + "temperature": args.temperature, + "top_p": args.top_p, + "max_iterations": args.max_iterations, + "max_output_tokens": args.max_output_tokens, + "token_level_generation": args.token_level_generation, + "custom_tokenizer": args.custom_tokenizer or args.model_name, + "max_model_len": args.max_model_len, + "ensure_thinking_end_properly": not args.token_level_generation, + } + logger.info(f"Sampling params: {json.dumps(sampling_params, indent=2)}") + + # Run evaluation + start_time = time.time() + results = asyncio.run( + run_openhands_evaluation( + instances=[inst["instance"] for inst in instances], + openhands_urls=openhands_urls, + sampling_params=sampling_params, + num_trajectories=args.num_trajectories, + num_workers_per_server=args.num_workers_per_server, + max_retries=args.max_retries, + timeout_seconds=args.timeout, + ) + ) + elapsed = time.time() - start_time + logger.info(f"Evaluation took {elapsed:.1f}s") + + # Attach data_source back to results + for r in results: + inst_id = r.get("instance", {}).get("instance_id", "") + # Find matching source instance + for src_inst in instances: + if src_inst["instance"].get("instance_id") == inst_id.replace(f"/{r['instance'].get('trajectory_id', 0)}", ""): + r["instance"]["data_source"] = src_inst.get("data_source", "unknown") + break + if "data_source" not in r.get("instance", {}): + r["instance"]["data_source"] = "unknown" + + # Compute rewards + results = compute_rewards(results, file_iou_coef=args.file_iou_coef, block_iou_coef=args.block_iou_coef) + + # Save results + os.makedirs(args.output_dir, exist_ok=True) + output_path = os.path.join(args.output_dir, "results.jsonl") + save_results_jsonl(results, output_path) + + # Print summary + val_metrics, pass_k = print_summary(results, instances, args.num_trajectories, k_values) + + # Save metrics as JSON + metrics_path = os.path.join(args.output_dir, "metrics.json") + all_metrics = { + "elapsed_seconds": elapsed, + "num_instances": len(instances), + "num_trajectories": args.num_trajectories, + "total_evaluations": len(results), + "resolved_count": sum(1 for r in results if r.get("resolved")), + "resolve_rate": sum(1 for r in results if r.get("resolved")) / len(results) if results else 0, + "mean_score": float(np.mean([r.get("score", 0) for r in results])), + "mean_file_iou": float(np.mean([r.get("file_iou", 0) for r in results])), + "mean_block_iou": float(np.mean([r.get("block_iou", 0) for r in results])), + "pass_k": {k: v for k, v in pass_k.items()}, + "sampling_params": sampling_params, + } + with open(metrics_path, "w") as f: + json.dump(all_metrics, f, indent=2) + logger.info(f"Saved metrics to {metrics_path}") + + +if __name__ == "__main__": + main() diff --git a/scripts/tests/vllm_api_server.py 
b/scripts/tests/vllm_api_server.py new file mode 100644 index 000000000..e39ebc18d --- /dev/null +++ b/scripts/tests/vllm_api_server.py @@ -0,0 +1,184 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +NOTE: This API server is used only for demonstrating usage of AsyncEngine +and simple performance benchmarks. It is not intended for production use. +For production use, we recommend using our OpenAI compatible server. +We are also not going to accept PRs modifying this file, please +change `vllm/entrypoints/openai/api_server.py` instead. +""" + +import asyncio +import ssl +from argparse import Namespace +from typing import Any, Optional + +import vllm.envs as envs +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse, Response +from vllm.engine.arg_utils import AsyncEngineArgs +from vllm.engine.async_llm_engine import AsyncLLMEngine +from vllm.entrypoints.launcher import serve_http +from vllm.entrypoints.utils import with_cancellation +from vllm.inputs import TokensPrompt +from vllm.logger import init_logger +from vllm.outputs import RequestOutput +from vllm.sampling_params import SamplingParams +from vllm.usage.usage_lib import UsageContext +from vllm.utils import FlexibleArgumentParser, random_uuid, set_ulimit +from vllm.version import __version__ as VLLM_VERSION + +logger = init_logger('vllm.nvidia.api_server') + +app = FastAPI() +engine = None + + +@app.get('/health') +async def health() -> Response: + """Health check.""" + return Response(status_code=200) + + +@app.post('/generate') +async def generate(request: Request) -> Response: + """Generate completion for the request. + + The request should be a JSON object with the following fields: + - prompt: the prompt to use for the generation. + - stream: whether to stream the results or not. + - other fields: the sampling parameters (See `SamplingParams` for details). 
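+
+    Note: this modified server expects `prompt_ids` (a list of input token IDs)
+    instead of a text prompt, does not stream, and responds with a JSON object
+    containing `response_ids` and per-token `logprobs`.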
+ """ + request_dict = await request.json() + return await _generate(request_dict, raw_request=request) + + +@with_cancellation +async def _generate(request_dict: dict, raw_request: Request) -> list[int]: + prompt_ids = request_dict.pop('prompt_ids') + # Set logprobs to 0 to only return selected tokens + request_dict['logprobs'] = 0 + sampling_params = SamplingParams(**request_dict) + request_id = random_uuid() + + # Create prompt from token IDs + prompt = TokensPrompt(prompt_token_ids=prompt_ids) + generator = engine.generate( + prompt=prompt, sampling_params=sampling_params, request_id=request_id + ) + + # Get final response + final_res: Optional[RequestOutput] = None + try: + async for output in generator: + final_res = output + except asyncio.CancelledError: + return Response(status_code=499) + + assert final_res is not None + + def obtain_logprobs(logprobs): + if logprobs is None: + return None + log_probs = [] + for d in logprobs: + cur_logprobs = list(d.values()) + assert len(cur_logprobs) == 1, f"Expected 1 logprob per token when logprobs=0, but got {len(cur_logprobs)}" + log_probs.append(cur_logprobs[0].logprob) + return log_probs + + ret = { + 'response_ids': final_res.outputs[0].token_ids, + 'logprobs': obtain_logprobs(final_res.outputs[0].logprobs), + } + return JSONResponse(ret) + + +def build_app(args: Namespace) -> FastAPI: + global app + + app.root_path = args.root_path + return app + + +async def init_app( + args: Namespace, + llm_engine: Optional[AsyncLLMEngine] = None, +) -> FastAPI: + app = build_app(args) + + global engine + + engine_args = AsyncEngineArgs.from_cli_args(args) + engine = ( + llm_engine + if llm_engine is not None + else AsyncLLMEngine.from_engine_args( + engine_args, usage_context=UsageContext.API_SERVER + ) + ) + app.state.engine_client = engine + return app + + +async def run_server( + args: Namespace, llm_engine: Optional[AsyncLLMEngine] = None, **uvicorn_kwargs: Any +) -> None: + logger.info('vLLM API server version %s', VLLM_VERSION) + logger.info('args: %s', args) + + set_ulimit() + + app = await init_app(args, llm_engine) + assert engine is not None + + shutdown_task = await serve_http( + app, + sock=None, + enable_ssl_refresh=args.enable_ssl_refresh, + host=args.host, + port=args.port, + log_level=args.log_level, + timeout_keep_alive=getattr(envs, 'VLLM_HTTP_TIMEOUT_KEEP_ALIVE', 5), + ssl_keyfile=args.ssl_keyfile, + ssl_certfile=args.ssl_certfile, + ssl_ca_certs=args.ssl_ca_certs, + ssl_cert_reqs=args.ssl_cert_reqs, + **uvicorn_kwargs, + ) + + await shutdown_task + + +if __name__ == '__main__': + parser = FlexibleArgumentParser() + parser.add_argument('--host', type=str, default=None) + parser.add_argument('--port', type=parser.check_port, default=8000) + parser.add_argument('--ssl-keyfile', type=str, default=None) + parser.add_argument('--ssl-certfile', type=str, default=None) + parser.add_argument( + '--ssl-ca-certs', type=str, default=None, help='The CA certificates file' + ) + parser.add_argument( + '--enable-ssl-refresh', + action='store_true', + default=False, + help='Refresh SSL Context when SSL certificate files change', + ) + parser.add_argument( + '--ssl-cert-reqs', + type=int, + default=int(ssl.CERT_NONE), + help="Whether client certificate is required (see stdlib ssl module's)", + ) + parser.add_argument( + '--root-path', + type=str, + default=None, + help='FastAPI root_path when app is behind a path based routing proxy', + ) + parser.add_argument('--log-level', type=str, default='error') + parser = 
AsyncEngineArgs.add_cli_args(parser) + args = parser.parse_args() + + asyncio.run(run_server(args))
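+
+# Illustrative client sketch (not part of the server): the /generate endpoint
+# takes pre-tokenized prompt_ids plus SamplingParams fields and returns the
+# selected token IDs with their logprobs. Host, port and token IDs below are
+# placeholders.
+#
+#   import requests
+#
+#   payload = {
+#       "prompt_ids": [1, 2, 3],      # token IDs from the evaluation tokenizer
+#       "max_tokens": 256,
+#       "temperature": 0.0,
+#   }
+#   resp = requests.post("http://localhost:8100/generate", json=payload)
+#   data = resp.json()
+#   print(data["response_ids"])       # generated token IDs
+#   print(data["logprobs"])           # logprob of each selected token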