diff --git a/.gitignore b/.gitignore index f7bce9e92..372f01f94 100644 --- a/.gitignore +++ b/.gitignore @@ -56,4 +56,11 @@ agent-os # Sandbox workspace directory - created at runtime sandbox-workspace/ -coverage \ No newline at end of file +coverage + +# Python (harbor wrapper) +**/__pycache__/ +**/*.egg-info/ +**/.venv/ +**/uv.lock +**/jobs/ diff --git a/eslint.config.ts b/eslint.config.ts index bf2027c3d..ddfa4686e 100644 --- a/eslint.config.ts +++ b/eslint.config.ts @@ -5,7 +5,7 @@ import { configs } from "typescript-eslint"; import noInstanceof from "eslint-plugin-no-instanceof"; export default defineConfig([ - { ignores: ["**/dist", "**/dist-examples", "**/node_modules"] }, + { ignores: ["**/dist", "**/dist-examples", "**/node_modules", "**/.venv"] }, js.configs.recommended, ...configs.recommended, { diff --git a/libs/harbor/Makefile b/libs/harbor/Makefile new file mode 100644 index 000000000..7e0c1e65c --- /dev/null +++ b/libs/harbor/Makefile @@ -0,0 +1,94 @@ +.PHONY: all build bench-docker bench-docker-all bench-daytona bench-langsmith bench-langsmith-all help + +AGENT_IMPORT = deepagents_js_harbor:DeepAgentsJSWrapper +ENV_IMPORT = deepagents_js_harbor.langsmith_environment:LangSmithEnvironment +PYTHON_DIR = python + +# Default target +all: help + +###################### +# BUILD +###################### + +build: + pnpm build + +###################### +# BENCHMARKS +###################### + +# Run 1 task on Docker (local, good for quick testing) +# Use TASK= to pick a specific task, e.g.: make bench-docker TASK=gpt2-codegolf +TASK ?= gpt2-codegolf +bench-docker: build + @mkdir -p jobs/terminal-bench + cd $(PYTHON_DIR) && uv run harbor run \ + --agent-import-path $(AGENT_IMPORT) \ + --dataset terminal-bench@2.0 -n 1 \ + -t $(TASK) \ + --jobs-dir ../jobs/terminal-bench --env docker + +# Run all tasks on Docker (local) +bench-docker-all: build + @mkdir -p jobs/terminal-bench + cd $(PYTHON_DIR) && uv run harbor run \ + --agent-import-path $(AGENT_IMPORT) \ + 
--dataset terminal-bench@2.0 -n 1 \ + --jobs-dir ../jobs/terminal-bench --env docker + +# Run 10 concurrent tasks on Daytona (cloud, requires DAYTONA_API_KEY) +bench-daytona: build + @mkdir -p jobs/terminal-bench + cd $(PYTHON_DIR) && uv run harbor run \ + --agent-import-path $(AGENT_IMPORT) \ + --dataset terminal-bench@2.0 -n 10 \ + --jobs-dir ../jobs/terminal-bench --env daytona + +# Run 1 task on LangSmith sandbox (cloud, requires LANGSMITH_API_KEY) +bench-langsmith: build + @mkdir -p jobs/terminal-bench + cd $(PYTHON_DIR) && uv run harbor run \ + --agent-import-path $(AGENT_IMPORT) \ + -c langsmith-env-config.yaml \ + --dataset terminal-bench@2.0 -n 1 \ + -t $(TASK) \ + --jobs-dir ../jobs/terminal-bench + +# Run all tasks on LangSmith sandbox (cloud, 10 concurrent) +bench-langsmith-all: build + @mkdir -p jobs/terminal-bench + cd $(PYTHON_DIR) && uv run harbor run \ + --agent-import-path $(AGENT_IMPORT) \ + -c langsmith-env-config.yaml \ + --dataset terminal-bench@2.0 -n 10 \ + --jobs-dir ../jobs/terminal-bench + +###################### +# HELP +###################### + +help: + @echo '====================' + @echo 'Harbor Benchmark Runner for DeepAgents JS' + @echo '====================' + @echo '' + @echo 'Prerequisites:' + @echo ' - Run: cd $(PYTHON_DIR) && uv sync' + @echo ' - Set ANTHROPIC_API_KEY in environment' + @echo '' + @echo 'Commands:' + @echo ' make build - Build the TypeScript package' + @echo ' make bench-docker - Run 1 task on Docker (TASK=gpt2-codegolf)' + @echo ' make bench-docker-all - Run all tasks on Docker' + @echo ' make bench-daytona - Run 10 tasks on Daytona (cloud)' + @echo ' make bench-langsmith - Run 1 task on LangSmith sandbox (cloud)' + @echo ' make bench-langsmith-all - Run all tasks on LangSmith sandbox (10 concurrent)' + @echo '' + @echo 'Environment variables:' + @echo ' TASK - Task name for bench-docker/bench-langsmith (default: gpt2-codegolf)' + @echo ' ANTHROPIC_API_KEY - Required for Claude models' + @echo ' DAYTONA_API_KEY 
- Required for Daytona environment' + @echo ' LANGSMITH_API_KEY - Required for LangSmith sandbox & tracing' + @echo ' LANGSMITH_EXPERIMENT - Optional experiment name for LangSmith' + @echo ' DEEPAGENTS_JS_RUNNER - Override path to runner.js' diff --git a/libs/harbor/README.md b/libs/harbor/README.md new file mode 100644 index 000000000..4c50398ad --- /dev/null +++ b/libs/harbor/README.md @@ -0,0 +1,256 @@ +# Harbor Benchmark Runner for DeepAgents JS + +Runs deepagents-js against [Harbor](https://github.com/laude-institute/harbor) benchmarks like [terminal-bench](https://github.com/laude-institute/terminal-bench-2). + +## Why Python in a JS repo? + +Harbor is a Python-only benchmark framework. It can only load Python agent classes that extend `BaseAgent`. Since our agent runs in Node.js, we need a thin Python wrapper that acts as a bridge between the two runtimes. + +The Python side is purely glue (~300 lines). It has no dependency on the deepagents Python SDK -- all agent logic runs in Node.js. + +## Architecture + +``` +Harbor (Python) → wrapper.py (Python) → runner.ts (Node.js) → createDeepAgent (JS) + ↑ ↓ + ↑ exec_request/exec_response + ↑ (JSON-RPC over stdin/stdout) + ↑ ↓ + environment.exec() ← RpcSandbox.execute() + (runs in sandbox) +``` + +1. **Harbor calls `run(instruction, environment)`** on our Python wrapper (`DeepAgentsJSWrapper`) +2. **Python spawns a Node.js subprocess** running `runner.ts`, which creates a deepagents agent via `createDeepAgent()` +3. 
**The two processes communicate via JSON-RPC over stdin/stdout:** + - When the JS agent needs to execute a shell command (e.g., `ls -la`), the Node process sends an `exec_request` to Python via stdout + - Python calls Harbor's `environment.exec()` -- which runs the command in the sandboxed container (Docker, Daytona, or LangSmith) -- and sends the result back as an `exec_response` via stdin + - All higher-level file operations (read, write, edit, grep, glob) are handled inside Node by `BaseSandbox`, which builds shell commands and routes them through `execute()` +4. **When the agent finishes**, Node sends a `done` message with the full message history, and Python saves the trajectory in Harbor's ATIF format + +## Directory structure + +``` +libs/harbor/ + scripts/ + harbor_langsmith.py # CLI for creating LangSmith datasets & experiments + src/ # TypeScript (Node.js side) + runner.ts # Entry point spawned by Python -- creates agent, runs bridge loop + rpc-sandbox.ts # RpcSandbox extends BaseSandbox -- bridges execute() over stdin/stdout + rpc-protocol.ts # JSON-RPC message types and stdio helpers + index.ts # Package exports + python/ # Python (Harbor side) + deepagents_js_harbor/ + wrapper.py # DeepAgentsJSWrapper -- extends BaseAgent, spawns Node, proxies exec calls + upload_to_langsmith.py # Attach feedback & upload experiment tables to LangSmith + langsmith_environment.py # LangSmithEnvironment -- custom Harbor env backed by LangSmith sandbox + __init__.py + langsmith-env-config.yaml # Harbor job config for LangSmith sandbox runs + pyproject.toml + Makefile # Build + benchmark targets +``` + +## Quick Start + +```bash +# 1. Install Python dependencies +cd libs/harbor/python && uv sync + +# 2. Build the TypeScript runner +cd libs/harbor && pnpm build + +# 3. Configure API keys (use .env or export directly) +export ANTHROPIC_API_KEY="sk-ant-..." # Required: For Claude model +export LANGSMITH_API_KEY="lsv2_..." 
# For LangSmith tracing & datasets
+export LANGCHAIN_TRACING_V2=true # Enables LangSmith tracing
+export LANGSMITH_ENDPOINT="https://api.smith.langchain.com" # Optional: Default shown
+
+# 4. Run a quick test (1 task, Docker)
+make bench-docker
+```
+
+## LangSmith Integration
+
+LangSmith provides tracing, observability, and experiment comparison for agent runs. The full workflow:
+
+```
+Create Dataset → Create Experiment → Run Benchmark with Tracing → Attach Feedback → Analyze
+```
+
+### Prerequisites
+
+Ensure your LangSmith credentials are configured:
+
+```bash
+export LANGSMITH_API_KEY=lsv2_...
+export LANGCHAIN_TRACING_V2=true
+export LANGSMITH_ENDPOINT=https://api.smith.langchain.com # Optional: defaults to this
+```
+
+### Step 1: Create Dataset and Experiment
+
+Create a LangSmith dataset from Harbor benchmark tasks, then create an experiment session to organize your runs:
+
+```bash
+# Create a dataset from Harbor tasks (downloads from the Harbor registry)
+cd python
+uv run python ../scripts/harbor_langsmith.py create-dataset terminal-bench --version 2.0
+
+# Or use a custom LangSmith name (useful if a dataset with the same name already exists)
+uv run python ../scripts/harbor_langsmith.py create-dataset terminal-bench --version 2.0 \
+  --langsmith-name terminal-bench-js-v1
+
+# Create an experiment session linked to the dataset created above
+uv run python ../scripts/harbor_langsmith.py create-experiment terminal-bench-js-v1 \
+  --name deepagentsjs-baseline-v1
+```
+
+The `create-experiment` command outputs the session ID and a direct link to the LangSmith comparison view.
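If you prefer the SDK to the CLI script, Step 1 can also be done programmatically. A minimal sketch assuming the `langsmith` Python client — `create_dataset`, `create_example`, and `create_project` are standard client methods, but the example row and the names below are illustrative, not what `harbor_langsmith.py` actually writes:

```python
from langsmith import Client

client = Client()  # reads LANGSMITH_API_KEY / LANGSMITH_ENDPOINT from the env

# Dataset: harbor_langsmith.py adds one example per Harbor task; the single
# hand-written row here is only illustrative.
dataset = client.create_dataset("terminal-bench-js-v1")
client.create_example(
    inputs={"instruction": "<task instruction from the Harbor registry>"},
    dataset_id=dataset.id,
)

# Experiment session linked to the dataset -- this linkage is what enables
# the side-by-side comparison view in LangSmith.
project = client.create_project(
    "deepagentsjs-baseline-v1",
    reference_dataset_id=dataset.id,
)
print(project.id)  # the session ID that create-experiment prints
```

Either way, the session name is what you export as `LANGSMITH_EXPERIMENT` in Step 2.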
+ +### Step 2: Run Benchmark with Tracing + +Set `LANGSMITH_EXPERIMENT` to link all runs to the experiment you created in Step 1: + +```bash +# Option 1: Run with experiment tracking (enables side-by-side comparison in LangSmith) +export LANGSMITH_EXPERIMENT="deepagentsjs-baseline-v1" +make bench-docker # 1 task locally +make bench-docker-all # All tasks locally +make bench-daytona # 10 tasks on Daytona (cloud) + +# Option 2: Run a specific task +LANGSMITH_EXPERIMENT="deepagentsjs-baseline-v1" make bench-docker TASK=hello-world@1.0 + +# Option 3: Run harbor directly (customize -n for number of tasks) +cd python +LANGSMITH_EXPERIMENT="deepagentsjs-baseline-v1" uv run harbor run \ + --agent-import-path deepagents_js_harbor:DeepAgentsJSWrapper \ + --dataset terminal-bench@2.0 -n 10 --jobs-dir ../jobs/terminal-bench --env docker + +# Option 4: Development mode (simpler project view, no experiment linking) +LANGSMITH_PROJECT="deepagentsjs-dev" make bench-docker +``` + +### Step 3: Attach Feedback + +After the benchmark completes, attach verifier results (pass/fail, reward scores, test pass rate) to the LangSmith traces: + +```bash +cd python +uv run python -m deepagents_js_harbor.upload_to_langsmith \ + ../jobs/terminal-bench/ \ + --attach-feedback --project-name deepagentsjs-baseline-v1 + +# Dry run first to see what would happen +uv run python -m deepagents_js_harbor.upload_to_langsmith \ + ../jobs/terminal-bench/ \ + --attach-feedback --project-name deepagentsjs-baseline-v1 --dry-run + +# Optionally also upload structured experiment table for comparison view +uv run python -m deepagents_js_harbor.upload_to_langsmith \ + ../jobs/terminal-bench/ \ + --attach-feedback --upload --dataset-name terminal-bench-js-v1 +``` + +This matches trials to traces via `harbor_session_id` metadata and adds feedback scores: +- **pass** — 1.0 or 0.0 based on verifier result +- **reward** — 0.0-1.0 from Harbor's test results +- **test_pass_rate** — fraction of tests passed + +## Analyzing 
Results + +LangSmith captures every LLM call, tool invocation, and performance metric. Combined with Harbor reward scores (added via Step 3), you can filter runs by performance and identify patterns in successful vs. failed runs. + +### Common Patterns & Fixes + +After running evaluations, analyze failed runs in LangSmith to identify improvement opportunities: + +| Pattern | Symptom | Potential Fix | +|----------------------------|------------------------------------------------------|--------------------------------------------| +| **Poor Planning** | Agent jumps into coding without reading requirements | Add upfront planning requirement to prompt | +| **Incorrect Tool Usage** | Uses `bash cat` instead of `read_file` | Improve tool descriptions with examples | +| **No Incremental Testing** | Writes 200 lines, then tests once | Prompt to test after each logical unit | +| **Hallucinated Paths** | Reads files before checking existence | Add "always `ls` before read" rule | +| **Wrong Model** | Model fails on complex reasoning | Use more capable model for hard tasks | + +### Agent-Assisted Analysis + +Use LangSmith's Insights Agent or your own agent to analyze trajectory data across runs. Task it with identifying common failure patterns, grouping errors by category, and suggesting prompt or tool improvements. + +## Running Benchmarks + +From `libs/harbor/`: + +### Docker (local) + +```bash +# Run a single task +make bench-docker + +# Run a specific task +make bench-docker TASK=gpt2-codegolf + +# Run all tasks +make bench-docker-all +``` + +### Daytona (cloud) + +```bash +# Run 10 concurrent tasks (requires DAYTONA_API_KEY) +make bench-daytona +``` + +### LangSmith Sandbox (cloud) + +Uses a [LangSmith hosted sandbox](https://docs.smith.langchain.com) instead of Docker/Daytona. This is a custom Harbor environment that uses `--environment-import-path` to plug in a `LangSmithEnvironment` class backed by the `langsmith.sandbox` SDK. 
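The `bench-langsmith` targets hand this environment to Harbor through `-c langsmith-env-config.yaml`. A plausible sketch of that config — the `environment.import_path` / `environment.kwargs` keys follow what `LangSmithEnvironment`'s docstring describes, but the file shipped in `python/` is authoritative:

```yaml
# Hypothetical sketch of langsmith-env-config.yaml; verify against the
# real file in python/.
environment:
  import_path: "deepagents_js_harbor.langsmith_environment:LangSmithEnvironment"
  kwargs:
    template_name: harbor-default     # sandbox template (the documented default)
    template_image: python:3.12-slim  # used only when the task sets no docker_image
```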
+ +```bash +# Run a single task (requires LANGSMITH_API_KEY) +make bench-langsmith + +# Run a specific task +make bench-langsmith TASK=hello-world@1.0 +``` + +You can also use the YAML config directly with `harbor run`: + +```bash +cd python +uv run harbor run \ + --agent-import-path deepagents_js_harbor:DeepAgentsJSWrapper \ + -c langsmith-env-config.yaml \ + --dataset terminal-bench@2.0 -n 1 \ + -t gpt2-codegolf \ + --jobs-dir ../jobs/terminal-bench +``` + +Or use `harbor trials run` with the CLI flag for a single task: + +```bash +cd python +uv run harbor trials run \ + --agent-import-path deepagents_js_harbor:DeepAgentsJSWrapper \ + --environment-import-path "deepagents_js_harbor.langsmith_environment:LangSmithEnvironment" \ + --environment-kwargs template_name=harbor-default \ + --task ./path-to-task +``` + +The LangSmith environment accepts two kwargs (configurable via `--environment-kwargs` or the YAML config): + +- `template_name` -- sandbox template name (default: `harbor-default`) +- `template_image` -- container image for auto-creating the template (default: `python:3.12-slim`) + +## Environment Variables + +| Variable | Required | Description | +| ---------------------- | ------------- | ------------------------------------------------------------------------------ | +| `ANTHROPIC_API_KEY` | Yes | API key for Claude models | +| `DAYTONA_API_KEY` | For Daytona | API key for Daytona cloud environments | +| `LANGSMITH_API_KEY` | For LangSmith | Required for LangSmith sandbox environment, tracing, and dataset management | +| `LANGCHAIN_TRACING_V2` | For tracing | Set to `true` to enable LangSmith tracing | +| `LANGSMITH_ENDPOINT` | No | LangSmith API endpoint (default: `https://api.smith.langchain.com`) | +| `LANGSMITH_EXPERIMENT` | No | Links runs to a LangSmith experiment for side-by-side comparison | +| `LANGSMITH_PROJECT` | No | LangSmith project name for development/ad-hoc runs | +| `DEEPAGENTS_JS_RUNNER` | No | Override path to `runner.js` 
(auto-detected by default) | +| `TASK` | No | Task name for `make bench-docker`/`bench-langsmith` (default: `gpt2-codegolf`) | diff --git a/libs/harbor/package.json b/libs/harbor/package.json new file mode 100644 index 000000000..f7e4bd4bb --- /dev/null +++ b/libs/harbor/package.json @@ -0,0 +1,78 @@ +{ + "name": "@langchain/harbor", + "version": "0.0.1", + "description": "Harbor benchmark integration for deepagents - runs terminal-bench via JSON-RPC bridge", + "main": "./dist/index.cjs", + "module": "./dist/index.js", + "types": "./dist/index.d.ts", + "type": "module", + "scripts": { + "build": "tsdown", + "clean": "rm -rf dist/ .tsdown/", + "dev": "tsc --watch", + "typecheck": "tsc --noEmit", + "prepublishOnly": "pnpm build", + "test": "vitest run", + "test:unit": "vitest run", + "bench": "make bench-docker", + "bench:daytona": "make bench-daytona" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/langchain-ai/deepagentsjs.git" + }, + "keywords": [ + "ai", + "agents", + "langgraph", + "langchain", + "typescript", + "llm", + "harbor", + "benchmark", + "terminal-bench" + ], + "author": "LangChain", + "license": "MIT", + "bugs": { + "url": "https://github.com/langchain-ai/deepagentsjs/issues" + }, + "homepage": "https://github.com/langchain-ai/deepagentsjs#readme", + "dependencies": { + "@langchain/anthropic": "^1.0.0", + "@langchain/openai": "^1.2.3", + "langsmith": ">=0.2.0" + }, + "peerDependencies": { + "deepagents": ">=1.6.0" + }, + "devDependencies": { + "deepagents": "workspace:*", + "@langchain/core": "^1.1.19", + "@langchain/langgraph": "^1.1.3", + "@tsconfig/recommended": "^1.0.13", + "@types/node": "^25.1.0", + "@vitest/coverage-v8": "^4.0.18", + "dotenv": "^17.2.3", + "tsdown": "^0.20.1", + "tsx": "^4.21.0", + "typescript": "^5.9.3", + "vitest": "^4.0.18" + }, + "exports": { + ".": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": 
"./dist/index.cjs" + } + }, + "./package.json": "./package.json" + }, + "files": [ + "dist/**/*" + ] +} diff --git a/libs/harbor/python/deepagents_js_harbor/__init__.py b/libs/harbor/python/deepagents_js_harbor/__init__.py new file mode 100644 index 000000000..82df5e225 --- /dev/null +++ b/libs/harbor/python/deepagents_js_harbor/__init__.py @@ -0,0 +1,4 @@ +from deepagents_js_harbor.langsmith_environment import LangSmithEnvironment +from deepagents_js_harbor.wrapper import DeepAgentsJSWrapper + +__all__ = ["DeepAgentsJSWrapper", "LangSmithEnvironment"] diff --git a/libs/harbor/python/deepagents_js_harbor/langsmith_environment.py b/libs/harbor/python/deepagents_js_harbor/langsmith_environment.py new file mode 100644 index 000000000..8a45b6210 --- /dev/null +++ b/libs/harbor/python/deepagents_js_harbor/langsmith_environment.py @@ -0,0 +1,337 @@ +"""Custom Harbor environment backed by a LangSmith hosted sandbox. + +Uses the ``langsmith.sandbox`` async client to create and manage +sandboxes on sandbox.langchain.com. Plug it into Harbor via the +``--environment-import-path`` flag or the ``environment.import_path`` +config key:: + + harbor trials run \ + --environment-import-path \ + "deepagents_js_harbor.langsmith_environment:LangSmithEnvironment" \ + --environment-kwargs template_name=my-template ... +""" + +from __future__ import annotations + +import os +from pathlib import Path + +from harbor.environments.base import BaseEnvironment, ExecResult +from harbor.models.environment_type import EnvironmentType +from harbor.models.task.config import EnvironmentConfig +from harbor.models.trial.paths import EnvironmentPaths, TrialPaths +from langsmith.sandbox import AsyncSandbox, AsyncSandboxClient, ResourceNotFoundError + + +class LangSmithEnvironment(BaseEnvironment): + """Harbor environment that delegates to a LangSmith hosted sandbox. + + Image resolution order (first non-None wins): + + 1. 
``task_env_config.docker_image`` -- pre-built image specified in the + task's ``task.toml``. This is the standard way Terminal-Bench tasks + ship their environment (gcc, weights, test harness, etc.). + 2. ``template_image`` kwarg -- explicit override via ``--environment-kwargs`` + or ``environment.kwargs`` in YAML config. + 3. Fallback default: ``python:3.12-slim``. + + Constructor kwargs (passed via ``--environment-kwargs`` or + ``environment.kwargs`` in YAML config): + + * ``template_name`` -- name of the LangSmith sandbox template to use. + When omitted, a per-task name is derived from the environment name. + * ``template_image`` -- container image override (only used when the task + does not specify ``docker_image``). + """ + + _DEFAULT_TEMPLATE_IMAGE = "python:3.12-slim" + _DEFAULT_EXEC_TIMEOUT = 1000 # seconds + + def __init__( + self, + environment_dir: Path, + environment_name: str, + session_id: str, + trial_paths: TrialPaths, + task_env_config: EnvironmentConfig, + *args, + template_name: str | None = None, + template_image: str | None = None, + **kwargs, + ): + # Resolve the container image: task config > kwarg > default + self._template_image = ( + task_env_config.docker_image + or template_image + or self._DEFAULT_TEMPLATE_IMAGE + ) + + # Store resource limits from the task config so they can be forwarded + # to the LangSmith sandbox template. + self._resource_cpu = str(task_env_config.cpus) + self._resource_memory_mb = task_env_config.memory_mb + self._resource_storage_mb = task_env_config.storage_mb + + # Derive a template name that is unique per image so different tasks + # don't collide. Sanitise for LangSmith naming rules (lowercase, + # alphanumeric + hyphens, max 63 chars). 
+ if template_name: + self._template_name = template_name + else: + safe_name = ( + self._template_image + .replace("/", "-") + .replace(":", "-") + .replace(".", "-") + .lower() + ) + self._template_name = f"harbor-{safe_name}"[:63] + + # These are initialised in start() + self._client: AsyncSandboxClient | None = None + self._sandbox: AsyncSandbox | None = None + + super().__init__( + environment_dir=environment_dir, + environment_name=environment_name, + session_id=session_id, + trial_paths=trial_paths, + task_env_config=task_env_config, + **kwargs, + ) + + # ------------------------------------------------------------------ + # BaseEnvironment metadata + # ------------------------------------------------------------------ + + @staticmethod + def type() -> EnvironmentType: + # No LANGSMITH member in the enum. Return DOCKER as a harmless + # placeholder -- the import-path code path never checks this value + # against the factory map. + return EnvironmentType.DOCKER + + @property + def is_mounted(self) -> bool: + return False + + @property + def supports_gpus(self) -> bool: + return False + + @property + def can_disable_internet(self) -> bool: + return False + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def _validate_definition(self): + """No local Dockerfile needed for a remote LangSmith sandbox.""" + api_key = os.environ.get("LANGSMITH_API_KEY", "") + if not api_key: + raise RuntimeError( + "LANGSMITH_API_KEY environment variable is not set. " + "It is required for the LangSmith sandbox environment." + ) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def _require_sandbox(self) -> AsyncSandbox: + if self._sandbox is None: + raise RuntimeError( + "LangSmith sandbox not initialised. Call start() first." 
+ ) + return self._sandbox + + async def start(self, force_build: bool) -> None: + self.logger.info( + f"LangSmith environment: image={self._template_image}, " + f"template={self._template_name}, " + f"memory={self._resource_memory_mb}Mi, " + f"cpu={self._resource_cpu}" + ) + + self._client = AsyncSandboxClient() + + # Ensure the template exists (create if missing or forced). + # When reusing an existing template, verify its resource limits + # still match what the task config requests. If they differ + # (e.g. a previous run used a lower memory limit), recreate it. + if force_build: + await self._ensure_template() + else: + try: + existing = await self._client.get_template(self._template_name) + needs_recreate = ( + existing.resources.memory != f"{self._resource_memory_mb}Mi" + or existing.resources.cpu != self._resource_cpu + ) + if needs_recreate: + self.logger.info( + f"Template '{self._template_name}' exists but resource " + f"limits differ (have cpu={existing.resources.cpu}, " + f"memory={existing.resources.memory}; want " + f"cpu={self._resource_cpu}, " + f"memory={self._resource_memory_mb}Mi). Recreating." + ) + await self._ensure_template() + except ResourceNotFoundError: + await self._ensure_template() + + self._sandbox = await self._client.create_sandbox( + template_name=self._template_name, + ) + + if not self._sandbox: + raise RuntimeError( + "LangSmith sandbox was not created. This should never happen." 
+ ) + + self.logger.info( + f"LangSmith sandbox started: {self._sandbox.name} " + f"(template={self._template_name}, image={self._template_image})" + ) + + # Create standard Harbor directories inside the sandbox + await self._sandbox.run( + f"mkdir -p {EnvironmentPaths.agent_dir} {EnvironmentPaths.verifier_dir}" + ) + + async def _ensure_template(self) -> None: + """Create (or recreate) the LangSmith sandbox template.""" + assert self._client is not None + + # Delete existing template if force-rebuilding + try: + await self._client.delete_template(self._template_name) + except ResourceNotFoundError: + pass + + # Convert Harbor's memory_mb / storage_mb to Kubernetes-style strings + # that the LangSmith API expects (e.g. "2048Mi", "10Gi"). + memory_str = f"{self._resource_memory_mb}Mi" + storage_str = f"{self._resource_storage_mb}Mi" + + await self._client.create_template( + name=self._template_name, + image=self._template_image, + cpu=self._resource_cpu, + memory=memory_str, + storage=storage_str, + ) + self.logger.info( + f"Created LangSmith template '{self._template_name}' " + f"with image '{self._template_image}' " + f"(cpu={self._resource_cpu}, memory={memory_str}, storage={storage_str})" + ) + + async def stop(self, delete: bool) -> None: + if not delete: + self.logger.info( + "LangSmith sandboxes are ephemeral and will be deleted " + "after use, regardless of delete=False." 
+ ) + + if self._sandbox: + try: + assert self._client is not None + await self._client.delete_sandbox(self._sandbox.name) + self.logger.info( + f"Deleted LangSmith sandbox: {self._sandbox.name}" + ) + except Exception as e: + self.logger.error(f"Error deleting LangSmith sandbox: {e}") + finally: + self._sandbox = None + + if self._client: + try: + await self._client.aclose() + except Exception: + pass + finally: + self._client = None + + # ------------------------------------------------------------------ + # Command execution + # ------------------------------------------------------------------ + + async def exec( + self, + command: str, + cwd: str | None = None, + env: dict[str, str] | None = None, + timeout_sec: int | None = None, + ) -> ExecResult: + sandbox = self._require_sandbox() + timeout = timeout_sec or self._DEFAULT_EXEC_TIMEOUT + + result = await sandbox.run( + command, + timeout=timeout, + env=env, + cwd=cwd, + ) + + return ExecResult( + stdout=result.stdout, + stderr=result.stderr, + return_code=result.exit_code, + ) + + # ------------------------------------------------------------------ + # File upload + # ------------------------------------------------------------------ + + async def upload_file(self, source_path: Path | str, target_path: str) -> None: + sandbox = self._require_sandbox() + content = Path(source_path).read_bytes() + await sandbox.write(target_path, content) + + async def upload_dir(self, source_dir: Path | str, target_dir: str) -> None: + sandbox = self._require_sandbox() + for file_path in Path(source_dir).rglob("*"): + if file_path.is_file(): + relative = file_path.relative_to(Path(source_dir)) + remote_path = f"{target_dir}/{relative}" + await sandbox.write(remote_path, file_path.read_bytes()) + + # ------------------------------------------------------------------ + # File download + # ------------------------------------------------------------------ + + async def download_file(self, source_path: str, target_path: Path | 
str) -> None: + sandbox = self._require_sandbox() + data = await sandbox.read(source_path) + Path(target_path).parent.mkdir(parents=True, exist_ok=True) + Path(target_path).write_bytes(data) + + async def download_dir(self, source_dir: str, target_dir: Path | str) -> None: + """Download a directory by listing files with ``find`` then reading each one.""" + sandbox = self._require_sandbox() + + # List all regular files under source_dir + result = await sandbox.run( + f"find {source_dir} -type f", + timeout=30, + ) + + if result.exit_code != 0 or not result.stdout.strip(): + return + + for remote_file in result.stdout.strip().split("\n"): + remote_file = remote_file.strip() + if not remote_file: + continue + + # Compute local target path preserving directory structure + relative = Path(remote_file).relative_to(Path(source_dir)) + local_path = Path(target_dir) / relative + local_path.parent.mkdir(parents=True, exist_ok=True) + + data = await sandbox.read(remote_file) + local_path.write_bytes(data) diff --git a/libs/harbor/python/deepagents_js_harbor/upload_to_langsmith.py b/libs/harbor/python/deepagents_js_harbor/upload_to_langsmith.py new file mode 100644 index 000000000..323bb7690 --- /dev/null +++ b/libs/harbor/python/deepagents_js_harbor/upload_to_langsmith.py @@ -0,0 +1,565 @@ +"""Upload Harbor job results to LangSmith and attach feedback to traces. + +Two modes of operation: + +1. **Attach feedback** (primary) — stamps pass/fail, reward, and test + scores on the detailed traces that were auto-captured during the + benchmark run (via ``LANGCHAIN_TRACING_V2``). + +2. **Upload experiment** (optional) — uploads structured experiment rows + to the ``/datasets/upload-experiment`` endpoint for the side-by-side + comparison table in LangSmith's "Datasets & Experiments" view. 
+ +Usage:: + + # Attach pass/fail feedback to traces (primary workflow) + python -m deepagents_js_harbor.upload_to_langsmith \\ + jobs/terminal-bench/2026-02-10__18-17-58 \\ + --attach-feedback --project-name my-experiment + + # Upload structured experiment table + python -m deepagents_js_harbor.upload_to_langsmith \\ + jobs/terminal-bench/2026-02-10__18-17-58 \\ + --upload --dataset-name terminal-bench-v2 + + # Both at once + python -m deepagents_js_harbor.upload_to_langsmith \\ + jobs/terminal-bench/2026-02-10__18-17-58 \\ + --attach-feedback --upload + +Environment variables: + LANGSMITH_API_KEY -- Required. Your LangSmith API key. + LANGSMITH_ENDPOINT -- Optional. Defaults to https://api.smith.langchain.com + LANGSMITH_EXPERIMENT -- Used as default --project-name for feedback. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import uuid +from pathlib import Path + +from dotenv import load_dotenv +import requests + +load_dotenv() + + +# ----------------------------------------------------------------------- +# Defaults +# ----------------------------------------------------------------------- + +_DEFAULT_ENDPOINT = "https://api.smith.langchain.com" +_UPLOAD_PATH = "/api/v1/datasets/upload-experiment" + + +# ----------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------- + + +def _read_json(path: Path) -> dict | list | None: + """Read a JSON file, returning None if it doesn't exist or is invalid.""" + if not path.is_file(): + return None + try: + return json.loads(path.read_text()) + except (json.JSONDecodeError, OSError): + return None + + +def _read_text(path: Path) -> str | None: + """Read a text file, returning None if missing.""" + if not path.is_file(): + return None + try: + return path.read_text().strip() + except OSError: + return None + + +def _discover_trial_dirs(job_dir: Path) -> list[Path]: + """Find all 
trial subdirectories within a job directory. + + Trial dirs are any subdirectory containing a ``result.json``. + """ + trials = [] + for child in sorted(job_dir.iterdir()): + if child.is_dir() and (child / "result.json").is_file(): + trials.append(child) + return trials + + +def _parse_verifier(trial_dir: Path) -> dict: + """Read verifier outputs for a trial. + + Returns a dict with ``reward``, ``passed``, ``tests_passed``, + ``tests_failed``, ``tests_total``. + """ + reward_text = _read_text(trial_dir / "verifier" / "reward.txt") + reward = float(reward_text) if reward_text is not None else None + + ctrf = _read_json(trial_dir / "verifier" / "ctrf.json") + ctrf_summary = ctrf.get("results", {}).get("summary", {}) if ctrf else {} + tests_passed = ctrf_summary.get("passed", 0) if ctrf_summary else 0 + tests_failed = ctrf_summary.get("failed", 0) if ctrf_summary else 0 + tests_total = ctrf_summary.get("tests", 0) if ctrf_summary else 0 + + return { + "reward": reward, + "passed": reward is not None and reward > 0, + "tests_passed": tests_passed, + "tests_failed": tests_failed, + "tests_total": tests_total, + } + + +# ----------------------------------------------------------------------- +# Feedback attachment (primary workflow) +# ----------------------------------------------------------------------- + + +def _find_run_by_session_id( + client: "langsmith.Client", # noqa: F821 + project_name: str, + session_id: str, +) -> object | None: + """Find the root LangSmith run whose metadata contains ``harbor_session_id``. + + Returns the first matching Run object, or None if not found. 
+ """ + filter_str = ( + f'and(eq(is_root, true), ' + f'eq(metadata_key, "harbor_session_id"), ' + f'eq(metadata_value, "{session_id}"))' + ) + try: + for run in client.list_runs( + project_name=project_name, + filter=filter_str, + limit=1, + ): + return run + except Exception as e: + print(f" Warning: Failed to query runs for {session_id}: {e}", file=sys.stderr) + return None + + +def attach_feedback( + job_dir: Path, + *, + project_name: str, + api_key: str | None = None, + dry_run: bool = False, +) -> int: + """Attach pass/fail feedback to LangSmith traces for a Harbor job. + + For each trial in the job: + 1. Reads verifier results (``reward.txt``, ``ctrf.json``). + 2. Finds the matching root run in LangSmith via + ``metadata.harbor_session_id``. + 3. Creates feedback (pass, reward, test_pass_rate) on that run. + + Returns the number of runs that received feedback. + """ + import langsmith + + api_key = api_key or os.environ.get("LANGSMITH_API_KEY", "") + if not api_key: + print("Error: LANGSMITH_API_KEY is not set.", file=sys.stderr) + return 0 + + job_dir = Path(job_dir).resolve() + if not job_dir.is_dir(): + print(f"Error: {job_dir} is not a directory.", file=sys.stderr) + return 0 + + trial_dirs = _discover_trial_dirs(job_dir) + if not trial_dirs: + print(f"Warning: no trial directories found in {job_dir}", file=sys.stderr) + return 0 + + client = langsmith.Client(api_key=api_key) + attached = 0 + + print(f"Attaching feedback to LangSmith traces in project '{project_name}'...") + print(f" Job: {job_dir.name}") + print(f" Trials: {len(trial_dirs)}") + print() + + for trial_dir in trial_dirs: + result = _read_json(trial_dir / "result.json") + if not result: + continue + + trial_name = result.get("trial_name", trial_dir.name) + task_name = result.get("task_name", trial_dir.name) + verifier = _parse_verifier(trial_dir) + + if dry_run: + status = "PASS" if verifier["passed"] else "FAIL" + print(f" [{status}] {task_name} (session={trial_name}, 
reward={verifier['reward']})") + continue + + run = _find_run_by_session_id(client, project_name, trial_name) + if run is None: + print(f" SKIP {task_name}: no matching trace found for {trial_name}") + continue + + run_id = run.id # type: ignore[union-attr] + + try: + client.create_feedback( + run_id=run_id, + key="pass", + score=1.0 if verifier["passed"] else 0.0, + comment=f"{'Passed' if verifier['passed'] else 'Failed'}: {task_name}", + ) + if verifier["reward"] is not None: + client.create_feedback( + run_id=run_id, + key="reward", + score=verifier["reward"], + comment=f"Verifier reward for {task_name}", + ) + if verifier["tests_total"] > 0: + client.create_feedback( + run_id=run_id, + key="test_pass_rate", + score=verifier["tests_passed"] / verifier["tests_total"], + comment=f"{verifier['tests_passed']}/{verifier['tests_total']} tests passed", + ) + + status = "PASS" if verifier["passed"] else "FAIL" + print(f" [{status}] {task_name} — feedback attached to run {run_id}") + attached += 1 + + except Exception as e: + print(f" ERROR {task_name}: failed to attach feedback: {e}", file=sys.stderr) + + print(f"\nDone. Attached feedback to {attached}/{len(trial_dirs)} trace(s).") + return attached + + +# ----------------------------------------------------------------------- +# Experiment upload (optional — structured comparison tables) +# ----------------------------------------------------------------------- + + +def _parse_trial(trial_dir: Path) -> dict | None: + """Parse a trial directory into a LangSmith upload-experiment row. + + Only reads ``result.json`` and verifier outputs — all execution + detail (LLM calls, tool calls, tokens) is already captured in the + auto-traced LangSmith runs. 
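+
+    Abridged shape of a returned row (values illustrative)::
+
+        {
+            "row_id": "<uuid4>",
+            "run_name": "<trial_name>",
+            "inputs": {"task_name": "<task>"},
+            "expected_outputs": {"reward": 1.0},
+            "actual_outputs": {"reward": ..., "tests_passed": ..., ...},
+            "evaluation_scores": [{"key": "pass", "score": 1}, ...],
+        }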
+ """ + result = _read_json(trial_dir / "result.json") + if not result: + return None + + task_name = result.get("task_name", trial_dir.name) + trial_name = result.get("trial_name", trial_dir.name) + started_at = result.get("started_at") + finished_at = result.get("finished_at") + agent_info = result.get("agent_info", {}) + + verifier = _parse_verifier(trial_dir) + + # Exception info + exception_info = result.get("exception_info") + exception_text = _read_text(trial_dir / "exception.txt") + error = str(exception_info) if exception_info else (exception_text or None) + + row: dict = { + "row_id": str(uuid.uuid4()), + "run_name": trial_name, + "inputs": {"task_name": task_name}, + "expected_outputs": {"reward": 1.0}, + "actual_outputs": { + "reward": verifier["reward"], + "tests_passed": verifier["tests_passed"], + "tests_failed": verifier["tests_failed"], + "tests_total": verifier["tests_total"], + }, + "start_time": started_at or "1970-01-01T00:00:00Z", + "end_time": finished_at or "1970-01-01T00:00:00Z", + "run_metadata": { + "task_name": task_name, + "trial_name": trial_name, + "agent_name": agent_info.get("name", "unknown"), + }, + } + if error: + row["error"] = error + + scores: list[dict] = [] + if verifier["reward"] is not None: + scores.append({"key": "reward", "score": verifier["reward"]}) + scores.append({ + "key": "pass", + "score": 1 if verifier["passed"] else 0, + }) + if verifier["tests_total"] > 0: + scores.append({ + "key": "test_pass_rate", + "score": verifier["tests_passed"] / verifier["tests_total"], + }) + if error: + scores.append({"key": "error", "score": 0, "value": error[:500]}) + row["evaluation_scores"] = scores + + return row + + +def upload_job( + job_dir: Path, + *, + dataset_name: str, + experiment_name: str | None = None, + experiment_description: str | None = None, + api_key: str | None = None, + endpoint: str | None = None, + dry_run: bool = False, +) -> dict | None: + """Upload all trials in a Harbor job directory to LangSmith. 
+
+    Creates a structured experiment in the "Datasets & Experiments" view.
+    This is separate from (and complementary to) the auto-traced runs.
+    """
+    api_key = api_key or os.environ.get("LANGSMITH_API_KEY", "")
+    if not api_key:
+        print("Error: LANGSMITH_API_KEY is not set.", file=sys.stderr)
+        return None
+
+    endpoint = (
+        endpoint
+        or os.environ.get("LANGSMITH_ENDPOINT", "").rstrip("/")
+        or _DEFAULT_ENDPOINT
+    )
+
+    job_dir = Path(job_dir).resolve()
+    if not job_dir.is_dir():
+        print(f"Error: {job_dir} is not a directory.", file=sys.stderr)
+        return None
+
+    trial_dirs = _discover_trial_dirs(job_dir)
+    if not trial_dirs:
+        print(f"Warning: no trial directories found in {job_dir}", file=sys.stderr)
+        return None
+
+    rows = [r for td in trial_dirs if (r := _parse_trial(td)) is not None]
+    if not rows:
+        print(f"Warning: no valid trial results in {job_dir}", file=sys.stderr)
+        return None
+
+    job_config = _read_json(job_dir / "config.json") or {}
+    job_result = _read_json(job_dir / "result.json") or {}
+
+    exp_name = experiment_name or job_dir.name
+    exp_desc = experiment_description or (
+        f"Harbor job {job_dir.name} | "
+        f"{job_result.get('n_total_trials', len(rows))} trial(s) | "
+        f"dataset: {(job_config.get('datasets') or [{}])[0].get('name', 'unknown')}"
+    )
+
+    all_starts = [r["start_time"] for r in rows if r.get("start_time")]
+    all_ends = [r["end_time"] for r in rows if r.get("end_time")]
+
+    payload = {
+        "experiment_name": exp_name,
+        "experiment_description": exp_desc,
+        "dataset_name": dataset_name,
+        "dataset_description": f"Harbor benchmark dataset: {dataset_name}",
+        "experiment_start_time": min(all_starts) if all_starts else "1970-01-01T00:00:00Z",
+        "experiment_end_time": max(all_ends) if all_ends else "1970-01-01T00:00:00Z",
+        "results": rows,
+    }
+
+    n_trials = len(rows)
+
+    if dry_run:
+        print(f"\n{'='*60}")
+        print(f"DRY RUN — would upload to {endpoint}{_UPLOAD_PATH}")
+        print(f"  Experiment: {exp_name}")
+        print(f"  Dataset: {dataset_name}")
+        print(f"  Trials: {n_trials}")
+        print(f"{'='*60}")
+        print(json.dumps(payload, indent=2))
+        return None
+
+    url = f"{endpoint}{_UPLOAD_PATH}"
+    headers = {"x-api-key": api_key, "Content-Type": "application/json"}
+
+    print(f"Uploading {n_trials} trial(s) to LangSmith...")
+    print(f"  Experiment: {exp_name}")
+    print(f"  Dataset: {dataset_name}")
+    print(f"  Endpoint: {url}")
+
+    resp = requests.post(url, headers=headers, json=payload, timeout=60)
+
+    if resp.status_code >= 400:
+        print(f"\nError: LangSmith returned {resp.status_code}", file=sys.stderr)
+        print(resp.text, file=sys.stderr)
+        return None
+
+    result = resp.json()
+    experiment_info = result.get("experiment", {})
+    dataset_info = result.get("dataset", {})
+    print("\nSuccess!")
+    print(f"  Experiment ID: {experiment_info.get('id', 'n/a')}")
+    print(f"  Dataset ID: {dataset_info.get('id', 'n/a')}")
+
+    langsmith_ui = endpoint.replace("api.smith", "smith")
+    if experiment_info.get("id") and dataset_info.get("id"):
+        print(
+            f"  View at: {langsmith_ui}/datasets/"
+            f"{dataset_info['id']}/compare?"
+            f"selectedSessions={experiment_info['id']}"
+        )
+
+    return result
+
+
+# -----------------------------------------------------------------------
+# CLI
+# -----------------------------------------------------------------------
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Upload Harbor job results to LangSmith.",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=__doc__,
+    )
+    parser.add_argument(
+        "job_dir",
+        type=Path,
+        help=(
+            "Path to a Harbor job directory (e.g. "
+            "jobs/terminal-bench/2026-02-10__18-17-58). "
+            "With --all, pass the parent dataset directory instead."
+        ),
+    )
+    parser.add_argument(
+        "--attach-feedback",
+        action="store_true",
+        help=(
+            "Attach pass/fail feedback to existing LangSmith traces. "
+            "Requires --project-name or LANGSMITH_EXPERIMENT."
+ ), + ) + parser.add_argument( + "--project-name", + default=None, + help=( + "LangSmith project to search for traces (for --attach-feedback). " + "Defaults to LANGSMITH_EXPERIMENT env var." + ), + ) + parser.add_argument( + "--upload", + action="store_true", + help=( + "Upload structured experiment data to LangSmith's " + "Datasets & Experiments view." + ), + ) + parser.add_argument( + "--dataset-name", + default=None, + help="LangSmith dataset name for --upload (defaults to parent dir name).", + ) + parser.add_argument( + "--experiment-name", + default=None, + help="Override the experiment name for --upload (defaults to job dir name).", + ) + parser.add_argument( + "--all", + action="store_true", + dest="upload_all", + help="Process all job directories under the given path.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print what would happen without calling the API.", + ) + + args = parser.parse_args() + job_dir: Path = args.job_dir.resolve() + + # Default: if neither --attach-feedback nor --upload specified, + # assume --attach-feedback (the primary workflow). 
+ if not args.attach_feedback and not args.upload: + args.attach_feedback = True + + # ---- Feedback attachment ---- + if args.attach_feedback: + project = ( + args.project_name + or os.environ.get("LANGSMITH_EXPERIMENT", "").strip() + ) + if not project: + print( + "Error: --attach-feedback requires --project-name or " + "LANGSMITH_EXPERIMENT env var.", + file=sys.stderr, + ) + sys.exit(1) + + if args.upload_all: + job_dirs = sorted( + d for d in job_dir.iterdir() + if d.is_dir() and (d / "result.json").is_file() + ) + if not job_dirs: + print(f"No job directories found in {job_dir}", file=sys.stderr) + sys.exit(1) + for jd in job_dirs: + print(f"\n--- Feedback: {jd.name} ---") + attach_feedback(jd, project_name=project, dry_run=args.dry_run) + else: + attach_feedback(job_dir, project_name=project, dry_run=args.dry_run) + + # ---- Experiment upload ---- + if args.upload: + if args.upload_all: + dataset_name = args.dataset_name or job_dir.name + else: + dataset_name = args.dataset_name or job_dir.parent.name + + if args.upload_all: + job_dirs = sorted( + d for d in job_dir.iterdir() + if d.is_dir() and (d / "result.json").is_file() + ) + if not job_dirs: + print(f"No job directories found in {job_dir}", file=sys.stderr) + sys.exit(1) + + print(f"\nFound {len(job_dirs)} job(s) to upload.\n") + for jd in job_dirs: + print(f"\n--- Upload: {jd.name} ---") + upload_job( + jd, + dataset_name=dataset_name, + experiment_name=args.experiment_name, + dry_run=args.dry_run, + ) + else: + result = upload_job( + job_dir, + dataset_name=dataset_name, + experiment_name=args.experiment_name, + dry_run=args.dry_run, + ) + if result is None and not args.dry_run: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/libs/harbor/python/deepagents_js_harbor/wrapper.py b/libs/harbor/python/deepagents_js_harbor/wrapper.py new file mode 100644 index 000000000..832475834 --- /dev/null +++ b/libs/harbor/python/deepagents_js_harbor/wrapper.py @@ -0,0 +1,487 @@ +"""Harbor agent 
wrapper that runs deepagents-js via a JSON-RPC bridge. + +Spawns a Node.js process running the deepagents-js agent and bridges +Harbor's environment.exec() via newline-delimited JSON over stdin/stdout. +""" + +import asyncio +import json +import os +import shutil +from datetime import datetime, timezone +from pathlib import Path + +from dotenv import load_dotenv +from harbor.agents.base import BaseAgent +from harbor.environments.base import BaseEnvironment +from harbor.models.agent.context import AgentContext +from harbor.models.trajectories import ( + Agent, + FinalMetrics, + Observation, + ObservationResult, + Step, + ToolCall, + Trajectory, +) +from langsmith import trace +from langsmith.client import Client +from langsmith.run_helpers import get_current_run_tree + +load_dotenv() + +# Harmless TTY noise from non-interactive bash in Docker containers. +_TTY_NOISE = [ + "bash: cannot set terminal process group (-1): Inappropriate ioctl for device", + "bash: cannot set terminal process group (1): Inappropriate ioctl for device", + "bash: no job control in this shell", + "bash: initialize_job_control: no job control in background: Bad file descriptor", +] + +SYSTEM_MESSAGE = """\ +You are an autonomous agent executing tasks in a sandboxed environment. \ +Follow these instructions carefully. + +## WORKING DIRECTORY & ENVIRONMENT CONTEXT + +Your current working directory is: +{current_directory} + +{file_listing_header} +{file_listing} + +**IMPORTANT**: This directory information is provided for your convenience \ +at the start of the task. 
You should: +- Use this information to understand the initial environment state +- Avoid redundantly calling `ls` or similar commands just to list the same directory +- Only use file listing commands if you need updated information \ +(after creating/deleting files) or need to explore subdirectories +- Work in the /app directory unless explicitly instructed otherwise +""" + + +# --------------------------------------------------------------------------- +# Runner discovery +# --------------------------------------------------------------------------- + +# This file lives at: libs/harbor/python/deepagents_js_harbor/wrapper.py +# The TS source is at: libs/harbor/src/runner.ts +# The built JS is at: libs/harbor/dist/runner.js +_HARBOR_ROOT = Path(__file__).resolve().parent.parent.parent # libs/harbor/ + + +def _find_js_runner() -> str: + """Locate the JS runner script. + + Search order: + 1. DEEPAGENTS_JS_RUNNER env var (explicit override) + 2. Built dist/runner.js (production) + 3. Source src/runner.ts (development via tsx) + """ + env_path = os.environ.get("DEEPAGENTS_JS_RUNNER", "").strip() + if env_path and os.path.isfile(env_path): + return os.path.abspath(env_path) + + candidates = [ + _HARBOR_ROOT / "dist" / "runner.js", + _HARBOR_ROOT / "src" / "runner.ts", + ] + for candidate in candidates: + if candidate.is_file(): + return str(candidate) + + search_paths = "\n ".join( + [f"DEEPAGENTS_JS_RUNNER env var: {env_path or '(not set)'}"] + + [f"{c}" for c in candidates] + ) + raise FileNotFoundError( + f"Could not find the deepagents-js Harbor runner.\n" + f"Searched:\n {search_paths}\n\n" + f"Run 'pnpm build' in libs/harbor/ or set DEEPAGENTS_JS_RUNNER." 
+ ) + + +def _get_node_command(runner_path: str) -> list[str]: + """Build the shell command to execute the runner.""" + if runner_path.endswith(".ts"): + tsx_path = shutil.which("tsx") + if tsx_path: + return [tsx_path, runner_path] + npx_path = shutil.which("npx") + if npx_path: + return [npx_path, "tsx", runner_path] + raise FileNotFoundError( + "Found TypeScript runner but tsx is not installed. " + "Install it with: npm install -g tsx" + ) + + node_path = shutil.which("node") + if not node_path: + raise FileNotFoundError("node is not installed or not in PATH") + return [node_path, runner_path] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _filter_tty_noise(stdout: str, stderr: str) -> str: + """Strip harmless TTY messages and merge stdout/stderr into one string.""" + bash_messages: list[str] = [] + for noise in _TTY_NOISE: + if noise in stdout: + bash_messages.append(noise) + stdout = stdout.replace(noise, "") + if noise in stderr: + stderr = stderr.replace(noise, "") + + stdout = stdout.strip() + stderr = stderr.strip() + + if bash_messages: + bash_text = "\n".join(bash_messages) + stderr = f"{bash_text}\n{stderr}".strip() if stderr else bash_text + + if stderr: + return f"{stdout}\n\nstderr: {stderr}" if stdout else f"\nstderr: {stderr}" + return stdout + + +# --------------------------------------------------------------------------- +# Agent wrapper +# --------------------------------------------------------------------------- + + +class DeepAgentsJSWrapper(BaseAgent): + """Harbor agent that delegates to deepagents-js via a Node.js subprocess.""" + + def __init__( + self, + logs_dir: Path, + model_name: str | None = None, + *args, + **kwargs, + ) -> None: + super().__init__(logs_dir, model_name, *args, **kwargs) + self._model_name = model_name or "anthropic:claude-opus-4-6" + + # Build instruction -> LangSmith example_id 
mapping (if experiment is set) + self._instruction_to_example_id: dict[str, str] = {} + experiment = os.environ.get("LANGSMITH_EXPERIMENT", "").strip() or None + if experiment: + try: + client = Client() + project = client.read_project(project_name=experiment) + for example in client.list_examples(dataset_id=project.reference_dataset_id): + instruction = example.inputs.get("instruction") if example.inputs else None + if instruction: + self._instruction_to_example_id[instruction] = str(example.id) + except Exception as e: + print(f"Warning: Failed to build instruction->example_id mapping: {e}") + + @staticmethod + def name() -> str: + return "deepagent-js-harbor" + + def version(self) -> str | None: + return "0.0.1" + + async def setup(self, environment: BaseEnvironment) -> None: + pass + + # ------------------------------------------------------------------ + # System prompt + # ------------------------------------------------------------------ + + async def _format_system_prompt(self, environment: BaseEnvironment) -> str: + """Build the system prompt with working directory and file listing context. + + Calls environment.exec() directly instead of going through a + separate HarborSandbox backend. + """ + pwd_result = await environment.exec("pwd") + current_dir = (pwd_result.stdout or "/app").strip() + + ls_result = await environment.exec( + "ls -1 2>/dev/null | head -50" + ) + files = [f for f in (ls_result.stdout or "").strip().split("\n") if f] + total_files = len(files) + first_10 = files[:10] + + if total_files == 0: + header = "Current directory is empty." + listing = "" + elif total_files <= 10: + count_text = "1 file" if total_files == 1 else f"{total_files} files" + header = f"Files in current directory ({count_text}):" + listing = "\n".join(f"{i + 1}. {f}" for i, f in enumerate(first_10)) + else: + header = f"Files in current directory (showing first 10 of {total_files}):" + listing = "\n".join(f"{i + 1}. 
{f}" for i, f in enumerate(first_10))
+
+        return SYSTEM_MESSAGE.format(
+            current_directory=current_dir,
+            file_listing_header=header,
+            file_listing=listing,
+        )
+
+    # ------------------------------------------------------------------
+    # Main entry point
+    # ------------------------------------------------------------------
+
+    async def run(
+        self,
+        instruction: str,
+        environment: BaseEnvironment,
+        context: AgentContext,
+    ) -> None:
+        configuration = json.loads(environment.trial_paths.config_path.read_text())
+        if not isinstance(configuration, dict):
+            raise AssertionError(
+                f"Unexpected configuration format. Expected a dict, got {type(configuration)}."
+            )
+
+        system_prompt = await self._format_system_prompt(environment)
+
+        runner_path = _find_js_runner()
+        node_cmd = _get_node_command(runner_path)
+        print(f"[DeepAgentsJS] Spawning: {' '.join(node_cmd)}")
+
+        # 100 MB stream buffer (the default 64 KB is too small for the "done"
+        # message, which contains the full serialized message history).
+        process = await asyncio.create_subprocess_exec(
+            *node_cmd,
+            stdin=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            env=os.environ.copy(),
+            limit=100 * 1024 * 1024,
+        )
+
+        try:
+            experiment_name = os.environ.get("LANGSMITH_EXPERIMENT", "").strip() or None
+
+            if experiment_name:
+                metadata = {
+                    "task_instruction": instruction,
+                    "model": self._model_name,
+                    "harbor_session_id": environment.session_id,
+                    "agent_mode": "js",
+                    **configuration,
+                }
+                example_id = self._instruction_to_example_id.get(instruction)
+
+                with trace(
+                    name=environment.session_id,
+                    reference_example_id=example_id,
+                    inputs={"instruction": instruction},
+                    project_name=experiment_name,
+                    metadata=metadata,
+                ) as run_tree:
+                    # Propagate LangSmith trace context to the Node.js subprocess
+                    # so its LLM calls and tool invocations nest under this trace.
+ # See: https://docs.langchain.com/langsmith/distributed-tracing + rt = get_current_run_tree() + langsmith_headers = rt.to_headers() if rt else {} + + await self._send(process, { + "type": "init", + "instruction": instruction, + "sessionId": environment.session_id, + "model": self._model_name, + "systemPrompt": system_prompt, + "langsmithHeaders": langsmith_headers, + }) + + result_messages = await self._bridge_loop(process, environment) + last_ai = next( + (m for m in reversed(result_messages) if m.get("role") == "ai"), + None, + ) + if last_ai: + run_tree.end(outputs={"last_message": last_ai.get("content", "")}) + else: + await self._send(process, { + "type": "init", + "instruction": instruction, + "sessionId": environment.session_id, + "model": self._model_name, + "systemPrompt": system_prompt, + }) + result_messages = await self._bridge_loop(process, environment) + + self._save_trajectory(environment, instruction, result_messages) + + finally: + if process.returncode is None: + process.terminate() + try: + await asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + process.kill() + + if process.stderr: + stderr_data = await process.stderr.read() + if stderr_data: + for line in stderr_data.decode("utf-8", errors="replace").strip().split("\n"): + if line.strip(): + print(f"[DeepAgentsJS] {line}") + + # ------------------------------------------------------------------ + # JSON-RPC bridge + # ------------------------------------------------------------------ + + @staticmethod + async def _send(process: asyncio.subprocess.Process, msg: dict) -> None: + assert process.stdin is not None + process.stdin.write((json.dumps(msg) + "\n").encode("utf-8")) + await process.stdin.drain() + + async def _bridge_loop( + self, + process: asyncio.subprocess.Process, + environment: BaseEnvironment, + ) -> list[dict]: + """Proxy exec requests from Node to Harbor and return final messages.""" + assert process.stdout is not None + + while True: + line = await 
process.stdout.readline() + if not line: + returncode = await process.wait() + raise RuntimeError( + f"Node process exited unexpectedly with code {returncode}" + ) + + line_str = line.decode("utf-8").strip() + if not line_str: + continue + + try: + msg = json.loads(line_str) + except json.JSONDecodeError: + print(f"[DeepAgentsJS] Warning: unparseable line: {line_str[:200]}") + continue + + msg_type = msg.get("type") + + if msg_type == "exec_request": + result = await environment.exec(msg["command"]) + output = _filter_tty_noise(result.stdout or "", result.stderr or "") + await self._send(process, { + "type": "exec_response", + "id": msg["id"], + "output": output, + "exitCode": result.return_code, + }) + + elif msg_type == "done": + return msg.get("messages", []) + + elif msg_type == "error": + raise RuntimeError( + f"JS agent error: {msg.get('message', 'Unknown')}\n" + f"{msg.get('stack', '')}" + ) + else: + print(f"[DeepAgentsJS] Warning: unknown message type: {msg_type}") + + # ------------------------------------------------------------------ + # Trajectory + # ------------------------------------------------------------------ + + def _save_trajectory( + self, + environment: BaseEnvironment, + instruction: str, + messages: list[dict], + ) -> None: + """Convert serialized JS messages to ATIF trajectory format.""" + total_prompt_tokens = 0 + total_completion_tokens = 0 + + steps: list[Step] = [ + Step( + step_id=1, + timestamp=datetime.now(timezone.utc).isoformat(), + source="user", + message=instruction, + ), + ] + observations: list[ObservationResult] = [] + pending_step: Step | None = None + + for msg in messages: + role = msg.get("role", "") + + if role == "ai": + usage = msg.get("usage") + if usage: + total_prompt_tokens += usage.get("input_tokens", 0) + total_completion_tokens += usage.get("output_tokens", 0) + + # Flush pending step + if pending_step is not None: + if pending_step.tool_calls and observations: + pending_step.observation = 
Observation(results=observations) + observations = [] + steps.append(pending_step) + pending_step = None + + tool_calls = [ + ToolCall( + tool_call_id=tc.get("id", ""), + function_name=tc.get("name", ""), + arguments=tc.get("args", {}), + ) + for tc in msg.get("toolCalls", []) + ] + + new_step = Step( + step_id=steps[-1].step_id + 1 if steps else 0, + timestamp=datetime.now(timezone.utc).isoformat(), + source="agent", + message=msg.get("content", ""), + tool_calls=tool_calls or None, + ) + + if tool_calls: + pending_step = new_step + else: + steps.append(new_step) + + elif role == "tool": + observations.append( + ObservationResult( + source_call_id=msg.get("toolCallId", ""), + content=msg.get("content", ""), + ) + ) + # Skip human/system messages + + # Flush final pending step + if pending_step is not None: + if pending_step.tool_calls and observations: + pending_step.observation = Observation(results=observations) + steps.append(pending_step) + + trajectory = Trajectory( + schema_version="ATIF-v1.2", + session_id=environment.session_id, + agent=Agent( + name=self.name(), + version=self.version() or "unknown", + model_name=self._model_name, + extra={"framework": "deepagents-js", "runtime": "node"}, + ), + steps=steps, + final_metrics=FinalMetrics( + total_prompt_tokens=total_prompt_tokens or None, + total_completion_tokens=total_completion_tokens or None, + total_steps=len(steps), + ), + ) + trajectory_path = self.logs_dir / "trajectory.json" + trajectory_path.write_text(json.dumps(trajectory.to_json_dict(), indent=2)) diff --git a/libs/harbor/python/langsmith-env-config.yaml b/libs/harbor/python/langsmith-env-config.yaml new file mode 100644 index 000000000..eb28d6f2d --- /dev/null +++ b/libs/harbor/python/langsmith-env-config.yaml @@ -0,0 +1,7 @@ +# Harbor job config for running benchmarks with a LangSmith sandbox. +# Usage: harbor run -c langsmith-env-config.yaml --dataset terminal-bench@2.0 ... 
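+#
+# A hypothetical template_image override (key placement assumed, image name
+# illustrative):
+#
+#   environment:
+#     import_path: "deepagents_js_harbor.langsmith_environment:LangSmithEnvironment"
+#     template_image: "ubuntu:24.04"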
+# +# The container image is automatically resolved from the task's docker_image +# field in task.toml. Override with template_image if needed. +environment: + import_path: "deepagents_js_harbor.langsmith_environment:LangSmithEnvironment" diff --git a/libs/harbor/python/pyproject.toml b/libs/harbor/python/pyproject.toml new file mode 100644 index 000000000..955f0fb3e --- /dev/null +++ b/libs/harbor/python/pyproject.toml @@ -0,0 +1,46 @@ +[project] +name = "deepagents-js-harbor" +version = "0.0.1" +description = "Harbor benchmark wrapper for deepagents-js (TypeScript agent via JSON-RPC bridge)" +license = { text = "MIT" } +requires-python = ">=3.12" +keywords = ["agents", "ai", "harbor", "benchmark", "terminal-bench"] +dependencies = [ + "harbor>=0.1.12", + "langsmith>=0.7.0", + "python-dotenv>=1.0.0", + "aiohttp>=3.9.0", + "toml>=0.10.0", +] + +[dependency-groups] +test = [ + "pytest>=8.4.2", + "pytest-asyncio>=1.2.0", + "ruff>=0.8.0", +] + +[build-system] +requires = ["setuptools>=75.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["."] +include = ["deepagents_js_harbor*"] + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "I", "TID252"] +ignore = ["E501"] + +[tool.ruff.lint.isort] +force-single-line = false +combine-as-imports = true +known-first-party = ["deepagents_js_harbor"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +asyncio_mode = "auto" diff --git a/libs/harbor/scripts/harbor_langsmith.py b/libs/harbor/scripts/harbor_langsmith.py new file mode 100644 index 000000000..cc3eb113f --- /dev/null +++ b/libs/harbor/scripts/harbor_langsmith.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +""" +CLI for LangSmith integration with Harbor. 
+
+Provides commands for:
+- Creating LangSmith datasets from Harbor tasks
+- Creating experiment sessions
+
+For attaching feedback to traces after a benchmark run, use
+``upload_to_langsmith.py`` instead::
+
+    cd python
+    uv run python -m deepagents_js_harbor.upload_to_langsmith \\
+        ../jobs/terminal-bench/ --attach-feedback --project-name <project>
+"""
+
+import argparse
+import asyncio
+import datetime
+import os
+import tempfile
+from pathlib import Path
+
+import aiohttp
+import toml
+from dotenv import load_dotenv
+from harbor.models.dataset_item import DownloadedDatasetItem
+from harbor.registry.client import RegistryClientFactory
+from langsmith import Client
+
+load_dotenv()
+
+LANGSMITH_API_URL = os.getenv("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com")
+HEADERS = {
+    "x-api-key": os.getenv("LANGSMITH_API_KEY"),
+}
+
+
+# ============================================================================
+# CREATE DATASET
+# ============================================================================
+
+
+def _read_instruction(task_path: Path) -> str:
+    """Read the instruction.md file from a task directory."""
+    instruction_file = task_path / "instruction.md"
+    if instruction_file.exists():
+        return instruction_file.read_text()
+    return ""
+
+
+def _read_task_metadata(task_path: Path) -> dict:
+    """Read metadata from task.toml file."""
+    task_toml = task_path / "task.toml"
+    if task_toml.exists():
+        return toml.load(task_toml)
+    return {}
+
+
+def _read_solution(task_path: Path) -> str | None:
+    """Read the solution script from a task directory.
+
+    Args:
+        task_path: Path to the task directory
+
+    Returns:
+        Solution script content if it exists, None otherwise
+    """
+    solution_file = task_path / "solution" / "solve.sh"
+    if solution_file.exists():
+        return solution_file.read_text()
+    return None
+
+
+def _scan_downloaded_tasks(downloaded_tasks: list[DownloadedDatasetItem]) -> list:
+    """Scan downloaded tasks and extract all task information.
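+
+    Each returned example dict has the shape built below (values
+    illustrative; ``outputs`` is empty when the task has no
+    ``solution/solve.sh``)::
+
+        {
+            "inputs": {
+                "task_id": "<registry id>",
+                "task_name": "<name>",
+                "instruction": "<instruction.md contents>",
+                "metadata": {...},
+            },
+            "outputs": {"reference_solution": "<solve.sh contents>"},
+        }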
+ + Args: + downloaded_tasks: List of DownloadedDatasetItem objects from Harbor + + Returns: + List of example dictionaries for LangSmith + """ + examples = [] + + for downloaded_task in downloaded_tasks: + task_path = downloaded_task.downloaded_path + + instruction = _read_instruction(task_path) + metadata = _read_task_metadata(task_path) + solution = _read_solution(task_path) + task_name = downloaded_task.id.name + task_id = str(downloaded_task.id) + + if instruction: + # Build outputs dict with reference solution if available + outputs = {} + if solution: + outputs["reference_solution"] = solution + + example = { + "inputs": { + "task_id": task_id, + "task_name": task_name, + "instruction": instruction, + "metadata": metadata.get("metadata", {}), + }, + "outputs": outputs, + } + examples.append(example) + + solution_status = "with solution" if solution else "without solution" + print(f"Added task: {task_name} (ID: {task_id}) [{solution_status}]") + + return examples + + +def create_dataset( + dataset_name: str, + version: str = "head", + overwrite: bool = False, + langsmith_name: str | None = None, +) -> None: + """Create a LangSmith dataset from Harbor tasks. 
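+
+    Example (dataset and name values are illustrative)::
+
+        create_dataset("terminal-bench", version="head",
+                       langsmith_name="terminal-bench-v2")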
+ + Args: + dataset_name: Harbor registry dataset name (e.g., 'terminal-bench') + version: Harbor dataset version (default: 'head') + overwrite: Whether to overwrite cached remote tasks + langsmith_name: Name for the LangSmith dataset (defaults to dataset_name) + """ + ls_dataset_name = langsmith_name or dataset_name + + langsmith_client = Client() + output_dir = Path(tempfile.mkdtemp(prefix="harbor_tasks_")) + print(f"Using temporary directory: {output_dir}") + + # Download from Harbor registry + print(f"Downloading dataset '{dataset_name}@{version}' from Harbor registry...") + registry_client = RegistryClientFactory.create() + downloaded_tasks = registry_client.download_dataset( + name=dataset_name, + version=version, + overwrite=overwrite, + output_dir=output_dir, + ) + + print(f"Downloaded {len(downloaded_tasks)} tasks") + examples = _scan_downloaded_tasks(downloaded_tasks) + + print(f"\nFound {len(examples)} tasks") + + # Create the dataset + print(f"\nCreating LangSmith dataset: {ls_dataset_name}") + dataset = langsmith_client.create_dataset(dataset_name=ls_dataset_name) + + print(f"Dataset created with ID: {dataset.id}") + + # Add examples to the dataset + print(f"\nAdding {len(examples)} examples to dataset...") + langsmith_client.create_examples(dataset_id=dataset.id, examples=examples) + + print(f"\nSuccessfully created dataset '{ls_dataset_name}' with {len(examples)} examples") + print(f"Dataset ID: {dataset.id}") + + +# ============================================================================ +# CREATE EXPERIMENT +# ============================================================================ + + +async def _create_experiment_session( + dataset_id: str, name: str, session: aiohttp.ClientSession +) -> dict: + """Create a LangSmith experiment session. 
+ + Args: + dataset_id: LangSmith dataset ID to associate with + name: Name for the experiment session + session: aiohttp ClientSession for making requests + + Returns: + Experiment session dictionary with 'id' field + """ + async with session.post( + f"{LANGSMITH_API_URL}/sessions", + headers=HEADERS, + json={ + "start_time": datetime.datetime.now(datetime.timezone.utc).isoformat(), + "reference_dataset_id": dataset_id, + "name": name, + }, + ) as experiment_response: + if experiment_response.status == 200: + return await experiment_response.json() + else: + raise Exception( + f"Failed to create experiment: {experiment_response.status} " + f"{await experiment_response.text()}" + ) + + +async def _get_dataset_by_name(dataset_name: str, session: aiohttp.ClientSession) -> dict: + """Get a LangSmith dataset by name. + + Args: + dataset_name: Name of the dataset to retrieve + session: aiohttp ClientSession for making requests + + Returns: + Dataset dictionary with 'id' field + """ + async with session.get( + f"{LANGSMITH_API_URL}/datasets?name={dataset_name}&limit=1", + headers=HEADERS, + ) as response: + if response.status == 200: + datasets = await response.json() + if len(datasets) > 0: + return datasets[0] + else: + raise Exception(f"Dataset '{dataset_name}' not found") + else: + raise Exception( + f"Failed to get dataset: {response.status} {await response.text()}" + ) + + +async def create_experiment_async(dataset_name: str, experiment_name: str | None = None) -> str: + """Create a LangSmith experiment session for the given dataset. 
+
+    Args:
+        dataset_name: Name of the LangSmith dataset to create experiment for
+        experiment_name: Optional name for the experiment (auto-generated if not provided)
+
+    Returns:
+        The experiment session ID
+    """
+    async with aiohttp.ClientSession() as session:
+        # Get the dataset
+        dataset = await _get_dataset_by_name(dataset_name, session)
+        dataset_id = dataset["id"]
+        print(f"Found dataset '{dataset_name}' with ID: {dataset_id}")
+
+        # Generate experiment name if not provided
+        if experiment_name is None:
+            timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
+            experiment_name = f"harbor-experiment-{timestamp}"
+
+        # Create experiment session
+        print(f"Creating experiment session: {experiment_name}")
+        experiment_session = await _create_experiment_session(dataset_id, experiment_name, session)
+        session_id = experiment_session["id"]
+        tenant_id = experiment_session["tenant_id"]
+
+        print("Experiment created successfully!")
+        print(f"  Session ID: {session_id}")
+        print(
+            f"  View at: https://smith.langchain.com/o/{tenant_id}/datasets/"
+            f"{dataset_id}/compare?selectedSessions={session_id}"
+        )
+        print("\nTo run Harbor with this experiment, use:")
+        print(f"  LANGSMITH_EXPERIMENT={experiment_name} make bench-docker")
+
+        return session_id
+
+
+def create_experiment(dataset_name: str, experiment_name: str | None = None) -> str:
+    """Synchronous wrapper for create_experiment_async."""
+    return asyncio.run(create_experiment_async(dataset_name, experiment_name))
+
+
+# ============================================================================
+# CLI
+# ============================================================================
+
+
+def main() -> int:
+    """Main CLI entrypoint with subcommands."""
+    parser = argparse.ArgumentParser(
+        description=(
+            "Harbor-LangSmith integration CLI for managing datasets and experiments.\n\n"
+            "For attaching feedback to traces after a benchmark run, use\n"
+            "upload_to_langsmith.py instead (see
README for details)." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + subparsers = parser.add_subparsers(dest="command", help="Available commands", required=True) + + # ======================================================================== + # create-dataset subcommand + # ======================================================================== + dataset_parser = subparsers.add_parser( + "create-dataset", + help="Create a LangSmith dataset from Harbor tasks", + ) + dataset_parser.add_argument( + "dataset_name", + type=str, + help="Harbor registry dataset name (e.g., 'terminal-bench')", + ) + dataset_parser.add_argument( + "--version", + type=str, + default="head", + help="Harbor dataset version (default: 'head')", + ) + dataset_parser.add_argument( + "--overwrite", + action="store_true", + help="Overwrite cached remote tasks", + ) + dataset_parser.add_argument( + "--langsmith-name", + type=str, + default=None, + help="Name for the LangSmith dataset (defaults to dataset_name)", + ) + + # ======================================================================== + # create-experiment subcommand + # ======================================================================== + experiment_parser = subparsers.add_parser( + "create-experiment", + help="Create an experiment session for a dataset", + ) + experiment_parser.add_argument( + "dataset_name", + type=str, + help="LangSmith dataset name (must already exist)", + ) + experiment_parser.add_argument( + "--name", + type=str, + help="Name for the experiment (auto-generated if not provided)", + ) + + args = parser.parse_args() + + # Route to appropriate command + if args.command == "create-dataset": + create_dataset( + dataset_name=args.dataset_name, + version=args.version, + overwrite=args.overwrite, + langsmith_name=args.langsmith_name, + ) + elif args.command == "create-experiment": + create_experiment( + dataset_name=args.dataset_name, + experiment_name=args.name, + ) + + return 0 + + +if __name__ == 
"__main__": + exit(main()) diff --git a/libs/harbor/src/index.ts b/libs/harbor/src/index.ts new file mode 100644 index 000000000..80f1850a8 --- /dev/null +++ b/libs/harbor/src/index.ts @@ -0,0 +1,29 @@ +/** + * Harbor integration for deepagents-js. + * + * Provides an RPC-based sandbox backend that bridges deepagents-js + * to Harbor benchmark environments via a JSON-RPC stdin/stdout protocol. + * + * @packageDocumentation + */ + +export { RpcSandbox } from "./rpc-sandbox.js"; + +export type { + InitMessage, + ExecRequest, + ExecResponse, + DoneMessage, + ErrorMessage, + SerializedMessage, + IncomingMessage, + OutgoingMessage, +} from "./rpc-protocol.js"; + +export { + sendMessage, + log, + createStdinReader, + parseIncomingMessage, + nextRequestId, +} from "./rpc-protocol.js"; diff --git a/libs/harbor/src/rpc-protocol.test.ts b/libs/harbor/src/rpc-protocol.test.ts new file mode 100644 index 000000000..1e47638fd --- /dev/null +++ b/libs/harbor/src/rpc-protocol.test.ts @@ -0,0 +1,97 @@ +import { describe, it, expect, beforeEach } from "vitest"; + +import { + parseIncomingMessage, + nextRequestId, + resetRequestCounter, + type InitMessage, + type ExecResponse, +} from "./rpc-protocol.js"; + +describe("rpc-protocol", () => { + describe("parseIncomingMessage", () => { + it("should parse a valid init message", () => { + const json = JSON.stringify({ + type: "init", + instruction: "Write hello world", + sessionId: "session-123", + model: "anthropic:claude-sonnet-4-5-20250929", + systemPrompt: "You are an agent.", + }); + + const msg = parseIncomingMessage(json); + expect(msg).not.toBeNull(); + expect(msg!.type).toBe("init"); + + const init = msg as InitMessage; + expect(init.instruction).toBe("Write hello world"); + expect(init.sessionId).toBe("session-123"); + expect(init.model).toBe("anthropic:claude-sonnet-4-5-20250929"); + expect(init.systemPrompt).toBe("You are an agent."); + }); + + it("should parse a valid exec_response message", () => { + const json = 
JSON.stringify({ + type: "exec_response", + id: "req-1", + output: "file1.txt\nfile2.txt", + exitCode: 0, + }); + + const msg = parseIncomingMessage(json); + expect(msg).not.toBeNull(); + expect(msg!.type).toBe("exec_response"); + + const resp = msg as ExecResponse; + expect(resp.id).toBe("req-1"); + expect(resp.output).toBe("file1.txt\nfile2.txt"); + expect(resp.exitCode).toBe(0); + }); + + it("should return null for empty strings", () => { + expect(parseIncomingMessage("")).toBeNull(); + expect(parseIncomingMessage(" ")).toBeNull(); + }); + + it("should return null for invalid JSON", () => { + expect(parseIncomingMessage("not json")).toBeNull(); + expect(parseIncomingMessage("{broken")).toBeNull(); + }); + + it("should return null for JSON without type field", () => { + expect(parseIncomingMessage('{"id": "req-1"}')).toBeNull(); + }); + + it("should handle exec_response with non-zero exit code", () => { + const json = JSON.stringify({ + type: "exec_response", + id: "req-5", + output: "command not found", + exitCode: 127, + }); + + const msg = parseIncomingMessage(json) as ExecResponse; + expect(msg.exitCode).toBe(127); + expect(msg.output).toBe("command not found"); + }); + }); + + describe("nextRequestId", () => { + beforeEach(() => { + resetRequestCounter(); + }); + + it("should generate incrementing request IDs", () => { + expect(nextRequestId()).toBe("req-1"); + expect(nextRequestId()).toBe("req-2"); + expect(nextRequestId()).toBe("req-3"); + }); + + it("should reset properly", () => { + nextRequestId(); + nextRequestId(); + resetRequestCounter(); + expect(nextRequestId()).toBe("req-1"); + }); + }); +}); diff --git a/libs/harbor/src/rpc-protocol.ts b/libs/harbor/src/rpc-protocol.ts new file mode 100644 index 000000000..f63c952b3 --- /dev/null +++ b/libs/harbor/src/rpc-protocol.ts @@ -0,0 +1,189 @@ +/** + * JSON-RPC protocol types and stdio helpers for the Harbor bridge. + * + * Communication uses newline-delimited JSON (NDJSON) over stdio. 
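+ * For example, a single exec round-trip on the wire looks like this
+ * (illustrative values):
+ *
+ *   node -> python:  {"type":"exec_request","id":"req-1","command":"ls /app"}
+ *   python -> node:  {"type":"exec_response","id":"req-1","output":"test.txt","exitCode":0}
+ *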
+ * Python writes to node's stdin, node writes to Python via stdout.
+ * Node uses stderr for logging so it doesn't interfere with the protocol.
+ *
+ * @packageDocumentation
+ */
+
+import { createInterface, type Interface as ReadlineInterface } from "readline";
+
+// ============================================================================
+// Message types: Python -> Node (sent over stdin)
+// ============================================================================
+
+/**
+ * Initialization message sent by Python to start the agent run.
+ */
+export interface InitMessage {
+  type: "init";
+  /** The task instruction for the agent */
+  instruction: string;
+  /** Harbor session ID for this task */
+  sessionId: string;
+  /** Model name (e.g., "anthropic:claude-sonnet-4-5-20250929") */
+  model: string;
+  /** Pre-formatted system prompt with directory context */
+  systemPrompt: string;
+  /**
+   * LangSmith distributed tracing headers (optional).
+   * When present, the runner uses these to nest all agent traces
+   * (LLM calls, tool invocations) under the parent Python trace.
+   * @see https://docs.langchain.com/langsmith/distributed-tracing
+   */
+  langsmithHeaders?: Record<string, string>;
+}
+
+/**
+ * Response to an execute request, sent by Python after running the command
+ * via Harbor's environment.exec().
+ */
+export interface ExecResponse {
+  type: "exec_response";
+  /** Request ID to match with the pending request */
+  id: string;
+  /** Combined stdout/stderr output */
+  output: string;
+  /** Process exit code */
+  exitCode: number;
+}
+
+/** Messages that Python sends to Node over stdin. */
+export type IncomingMessage = InitMessage | ExecResponse;
+
+// ============================================================================
+// Message types: Node -> Python (sent over stdout)
+// ============================================================================
+
+/**
+ * Request to execute a shell command in the Harbor sandbox.
+ * Python will call environment.exec() and send back an ExecResponse.
+ */
+export interface ExecRequest {
+  type: "exec_request";
+  /** Unique request ID for matching responses */
+  id: string;
+  /** Shell command to execute */
+  command: string;
+}
+
+/**
+ * Serialized message from the LangChain message history,
+ * used to transfer the agent result back to Python for trajectory saving.
+ */
+export interface SerializedMessage {
+  /** Message type: "human", "ai", "tool", "system" */
+  role: string;
+  /** Text content or stringified content blocks */
+  content: string;
+  /** For AI messages: token usage */
+  usage?: {
+    input_tokens: number;
+    output_tokens: number;
+  };
+  /** For AI messages: tool calls in content_blocks */
+  toolCalls?: Array<{
+    id: string;
+    name: string;
+    args: Record<string, unknown>;
+  }>;
+  /** For tool messages: the tool_call_id */
+  toolCallId?: string;
+}
+
+/**
+ * Final message sent when the agent run completes.
+ * Contains the full message history for trajectory saving.
+ */
+export interface DoneMessage {
+  type: "done";
+  /** Serialized LangChain messages for ATIF trajectory */
+  messages: SerializedMessage[];
+}
+
+/**
+ * Error message sent if the agent run fails.
+ */
+export interface ErrorMessage {
+  type: "error";
+  /** Error description */
+  message: string;
+  /** Optional stack trace */
+  stack?: string;
+}
+
+/** Messages that Node sends to Python over stdout. */
+export type OutgoingMessage = ExecRequest | DoneMessage | ErrorMessage;
+
+// ============================================================================
+// Stdio helpers
+// ============================================================================
+
+/**
+ * Write a JSON message to stdout (Node -> Python).
+ * Each message is a single line of JSON followed by a newline.
+ */
+export function sendMessage(msg: OutgoingMessage): void {
+  process.stdout.write(JSON.stringify(msg) + "\n");
+}
+
+/**
+ * Log a message to stderr (does not interfere with the protocol).
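+ * For example, log("received", { type: "init" }) writes
+ *   [harbor-js] received {"type":"init"}
+ * to stderr: strings are passed through, other values are JSON-stringified.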
+ */ +export function log(...args: unknown[]): void { + process.stderr.write( + `[harbor-js] ${args.map((a) => (typeof a === "string" ? a : JSON.stringify(a))).join(" ")}\n`, + ); +} + +/** + * Create a readline-based line reader for stdin. + * Returns an async iterator that yields parsed JSON messages. + */ +export function createStdinReader(): ReadlineInterface { + return createInterface({ + input: process.stdin, + crlfDelay: Infinity, + }); +} + +/** + * Parse a single line from stdin into a typed message. + * Returns null if the line is empty or unparseable. + */ +export function parseIncomingMessage(line: string): IncomingMessage | null { + const trimmed = line.trim(); + if (!trimmed) return null; + + try { + const parsed = JSON.parse(trimmed) as IncomingMessage; + if (!parsed.type) return null; + return parsed; + } catch { + log("Failed to parse incoming message:", trimmed); + return null; + } +} + +// ============================================================================ +// Request ID generation +// ============================================================================ + +let _requestCounter = 0; + +/** + * Generate a unique request ID for exec requests. + */ +export function nextRequestId(): string { + _requestCounter += 1; + return `req-${_requestCounter}`; +} + +/** + * Reset the request counter (for testing). + */ +export function resetRequestCounter(): void { + _requestCounter = 0; +} diff --git a/libs/harbor/src/rpc-sandbox.test.ts b/libs/harbor/src/rpc-sandbox.test.ts new file mode 100644 index 000000000..43f2e59b1 --- /dev/null +++ b/libs/harbor/src/rpc-sandbox.test.ts @@ -0,0 +1,300 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { createInterface } from "readline"; +import { Readable } from "stream"; + +import { RpcSandbox } from "./rpc-sandbox.js"; +import { resetRequestCounter } from "./rpc-protocol.js"; + +/** + * Helper to create an RpcSandbox with a mock stdin that we can write to. 
+ * We create a readable stream that we can push data into, and wrap it + * with readline for the sandbox to consume. + */ +function createTestSandbox(sessionId = "test-session") { + const inputStream = new Readable({ + read() { + // no-op: we push data manually + }, + }); + + const reader = createInterface({ + input: inputStream, + crlfDelay: Infinity, + }); + + const sandbox = new RpcSandbox(sessionId, reader); + + // Capture stdout writes (outgoing messages) + const writtenMessages: string[] = []; + const originalWrite = process.stdout.write.bind(process.stdout); + const mockWrite = vi + .fn() + .mockImplementation( + ( + chunk: string | Uint8Array, + _encodingOrCb?: BufferEncoding | ((error?: Error | null) => void), + _cb?: (error?: Error | null) => void, + ): boolean => { + const text = + typeof chunk === "string" ? chunk : Buffer.from(chunk).toString(); + writtenMessages.push(text); + return true; + }, + ); + process.stdout.write = mockWrite as typeof process.stdout.write; + + return { + sandbox, + inputStream, + reader, + writtenMessages, + restore() { + process.stdout.write = originalWrite; + reader.close(); + inputStream.destroy(); + }, + }; +} + +describe("RpcSandbox", () => { + beforeEach(() => { + resetRequestCounter(); + }); + + it("should have the correct ID", () => { + const { sandbox, restore } = createTestSandbox("my-session"); + expect(sandbox.id).toBe("my-session"); + restore(); + }); + + it("should send exec_request on execute() and resolve on exec_response", async () => { + const { sandbox, inputStream, writtenMessages, restore } = + createTestSandbox(); + + sandbox.startListening(); + + // Call execute - this will send a request and wait for a response + const executePromise = sandbox.execute("echo hello"); + + // Wait a tick for the request to be sent + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Check that the request was sent to stdout + expect(writtenMessages.length).toBe(1); + const request = 
JSON.parse(writtenMessages[0].trim()); + expect(request.type).toBe("exec_request"); + expect(request.id).toBe("req-1"); + expect(request.command).toBe("echo hello"); + + // Simulate Python sending back a response + inputStream.push( + JSON.stringify({ + type: "exec_response", + id: "req-1", + output: "hello", + exitCode: 0, + }) + "\n", + ); + + // Wait for the response to be processed + const result = await executePromise; + expect(result.output).toBe("hello"); + expect(result.exitCode).toBe(0); + expect(result.truncated).toBe(false); + + restore(); + }); + + it("should handle multiple concurrent execute() calls", async () => { + const { sandbox, inputStream, writtenMessages, restore } = + createTestSandbox(); + + sandbox.startListening(); + + // Fire two requests concurrently + const promise1 = sandbox.execute("echo one"); + const promise2 = sandbox.execute("echo two"); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Both requests should have been sent + expect(writtenMessages.length).toBe(2); + + // Send responses (out of order to test matching) + inputStream.push( + JSON.stringify({ + type: "exec_response", + id: "req-2", + output: "two", + exitCode: 0, + }) + "\n", + ); + inputStream.push( + JSON.stringify({ + type: "exec_response", + id: "req-1", + output: "one", + exitCode: 0, + }) + "\n", + ); + + const [result1, result2] = await Promise.all([promise1, promise2]); + expect(result1.output).toBe("one"); + expect(result2.output).toBe("two"); + + restore(); + }); + + it("should reject pending requests on dispose()", async () => { + const { sandbox, restore } = createTestSandbox(); + + sandbox.startListening(); + + // Fire a request that will never get a response + const promise = sandbox.execute("echo never"); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Dispose should reject the pending request + sandbox.dispose(); + + await expect(promise).rejects.toThrow("RpcSandbox disposed"); + + restore(); + }); + + it("should 
route non-exec messages to the message handler", async () => { + const { sandbox, inputStream, restore } = createTestSandbox(); + + const receivedMessages: unknown[] = []; + sandbox.setMessageHandler((msg) => { + receivedMessages.push(msg); + }); + + sandbox.startListening(); + + // Send an init message (not an exec_response) + inputStream.push( + JSON.stringify({ + type: "init", + instruction: "do something", + sessionId: "s-1", + model: "test-model", + systemPrompt: "test prompt", + }) + "\n", + ); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(receivedMessages.length).toBe(1); + expect((receivedMessages[0] as { type: string }).type).toBe("init"); + + restore(); + }); + + describe("uploadFiles", () => { + it("should upload files via execute using base64 encoding", async () => { + const { sandbox, inputStream, writtenMessages, restore } = + createTestSandbox(); + + sandbox.startListening(); + + const encoder = new TextEncoder(); + const uploadPromise = sandbox.uploadFiles([ + ["/app/test.txt", encoder.encode("hello world")], + ]); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Should have sent an exec_request + expect(writtenMessages.length).toBe(1); + const request = JSON.parse(writtenMessages[0].trim()); + expect(request.type).toBe("exec_request"); + expect(request.command).toContain("base64"); + + // Respond with success + inputStream.push( + JSON.stringify({ + type: "exec_response", + id: request.id, + output: "", + exitCode: 0, + }) + "\n", + ); + + const results = await uploadPromise; + expect(results.length).toBe(1); + expect(results[0].path).toBe("/app/test.txt"); + expect(results[0].error).toBeNull(); + + restore(); + }); + }); + + describe("downloadFiles", () => { + it("should download files via execute using base64 encoding", async () => { + const { sandbox, inputStream, writtenMessages, restore } = + createTestSandbox(); + + sandbox.startListening(); + + const downloadPromise = 
sandbox.downloadFiles(["/app/test.txt"]); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Should have sent an exec_request + expect(writtenMessages.length).toBe(1); + const request = JSON.parse(writtenMessages[0].trim()); + expect(request.type).toBe("exec_request"); + + // Respond with base64-encoded content + const b64Content = Buffer.from("hello world").toString("base64"); + inputStream.push( + JSON.stringify({ + type: "exec_response", + id: request.id, + output: b64Content, + exitCode: 0, + }) + "\n", + ); + + const results = await downloadPromise; + expect(results.length).toBe(1); + expect(results[0].path).toBe("/app/test.txt"); + expect(results[0].error).toBeNull(); + expect(new TextDecoder().decode(results[0].content!)).toBe("hello world"); + + restore(); + }); + + it("should handle file not found", async () => { + const { sandbox, inputStream, writtenMessages, restore } = + createTestSandbox(); + + sandbox.startListening(); + + const downloadPromise = sandbox.downloadFiles(["/app/missing.txt"]); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + const request = JSON.parse(writtenMessages[0].trim()); + + // Respond with not found + inputStream.push( + JSON.stringify({ + type: "exec_response", + id: request.id, + output: "__NOT_FOUND__", + exitCode: 1, + }) + "\n", + ); + + const results = await downloadPromise; + expect(results.length).toBe(1); + expect(results[0].error).toBe("file_not_found"); + expect(results[0].content).toBeNull(); + + restore(); + }); + }); +}); diff --git a/libs/harbor/src/rpc-sandbox.ts b/libs/harbor/src/rpc-sandbox.ts new file mode 100644 index 000000000..43e54cad1 --- /dev/null +++ b/libs/harbor/src/rpc-sandbox.ts @@ -0,0 +1,266 @@ +/** + * RpcSandbox: A BaseSandbox implementation that bridges execute() calls + * to a Python process via JSON-RPC over stdin/stdout. + * + * The Python process (Harbor wrapper) calls environment.exec() in the + * actual sandbox container and sends the results back. 
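+ *
+ * A typical round-trip (illustrative values):
+ *   1. the agent calls sandbox.execute("cat /app/test.txt")
+ *   2. node writes {"type":"exec_request","id":"req-1","command":"cat /app/test.txt"}
+ *   3. python runs environment.exec() and replies with
+ *      {"type":"exec_response","id":"req-1","output":"hello","exitCode":0}
+ *   4. execute() resolves with { output: "hello", exitCode: 0, truncated: false }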
+ * + * All higher-level file operations (read, write, edit, ls, grep, glob) + * are inherited from BaseSandbox and work automatically via execute(). + * + * @packageDocumentation + */ + +import type { Interface as ReadlineInterface } from "readline"; + +import type { + ExecuteResponse, + FileDownloadResponse, + FileOperationError, + FileUploadResponse, +} from "deepagents"; +import { BaseSandbox } from "deepagents"; + +import { + type ExecResponse, + type IncomingMessage, + log, + nextRequestId, + parseIncomingMessage, + sendMessage, +} from "./rpc-protocol.js"; + +/** + * A pending execute request waiting for a response from Python. + */ +interface PendingRequest { + resolve: (response: ExecuteResponse) => void; + reject: (error: Error) => void; +} + +/** + * RpcSandbox extends BaseSandbox to execute commands via a JSON-RPC bridge. + * + * When the agent calls execute("ls -la"), this class: + * 1. Writes an exec_request JSON to stdout + * 2. Waits for the matching exec_response JSON from stdin + * 3. Returns the result as an ExecuteResponse + * + * The Python wrapper reads the request, calls environment.exec() in Harbor, + * and writes the response back. + * + * uploadFiles and downloadFiles are implemented via execute() using + * base64-encoded shell commands, since the sandbox is only accessible + * through Harbor's environment.exec(). 
+ */
+export class RpcSandbox extends BaseSandbox {
+  readonly id: string;
+
+  /** Map of request IDs to pending promise resolvers */
+  #pendingRequests = new Map<string, PendingRequest>();
+
+  /** Whether the stdin reader has been started */
+  #readerStarted = false;
+
+  /** The readline interface for reading stdin */
+  #reader: ReadlineInterface;
+
+  /** Callback for non-exec_response messages (e.g., used by runner for init) */
+  #onMessage?: (msg: IncomingMessage) => void;
+
+  /** Bound line handler so we can remove it in dispose() */
+  #lineHandler?: (line: string) => void;
+
+  constructor(sessionId: string, reader: ReadlineInterface) {
+    super();
+    this.id = sessionId;
+    this.#reader = reader;
+  }
+
+  /**
+   * Set a callback for incoming messages that are NOT exec_response.
+   * Used by the runner to receive init messages.
+   */
+  setMessageHandler(handler: (msg: IncomingMessage) => void): void {
+    this.#onMessage = handler;
+  }
+
+  /**
+   * Start listening for incoming messages on stdin.
+   * Must be called before any execute() calls.
+   */
+  startListening(): void {
+    if (this.#readerStarted) return;
+    this.#readerStarted = true;
+
+    this.#lineHandler = (line: string) => {
+      const msg = parseIncomingMessage(line);
+      if (!msg) return;
+
+      if (msg.type === "exec_response") {
+        this.#handleExecResponse(msg);
+      } else if (this.#onMessage) {
+        this.#onMessage(msg);
+      }
+    };
+
+    this.#reader.on("line", this.#lineHandler);
+  }
+
+  /**
+   * Handle an exec_response message by resolving the matching pending request.
+   */
+  #handleExecResponse(msg: ExecResponse): void {
+    const pending = this.#pendingRequests.get(msg.id);
+    if (!pending) {
+      log(`Warning: received exec_response for unknown request ID: ${msg.id}`);
+      return;
+    }
+
+    this.#pendingRequests.delete(msg.id);
+    pending.resolve({
+      output: msg.output,
+      exitCode: msg.exitCode,
+      truncated: false,
+    });
+  }
+
+  /**
+   * Execute a command in the Harbor sandbox via the Python bridge.
+   *
+   * Sends an exec_request to stdout and waits for the matching
+   * exec_response from stdin.
+   */
+  async execute(command: string): Promise<ExecuteResponse> {
+    const id = nextRequestId();
+
+    // Create a promise that will be resolved when the response arrives
+    const promise = new Promise<ExecuteResponse>((resolve, reject) => {
+      this.#pendingRequests.set(id, { resolve, reject });
+    });
+
+    // Send the request to Python via stdout
+    sendMessage({
+      type: "exec_request",
+      id,
+      command,
+    });
+
+    return promise;
+  }
+
+  /**
+   * Upload files to the sandbox via execute().
+   *
+   * Uses base64 encoding to safely transfer file content through
+   * shell commands, similar to the Python HarborSandbox approach.
+   */
+  async uploadFiles(
+    files: Array<[string, Uint8Array]>,
+  ): Promise<FileUploadResponse[]> {
+    const results: FileUploadResponse[] = [];
+
+    for (const [filePath, content] of files) {
+      try {
+        // Base64-encode the content
+        const b64 = Buffer.from(content).toString("base64");
+
+        // Use heredoc to pass content via stdin to avoid ARG_MAX limits
+        const cmd = `
+parent_dir=$(dirname '${filePath.replace(/'/g, "'\\''")}')
+mkdir -p "$parent_dir" 2>/dev/null
+base64 -d > '${filePath.replace(/'/g, "'\\''")}' <<'__DEEPAGENTS_EOF__'
+${b64}
+__DEEPAGENTS_EOF__`;
+
+        const result = await this.execute(cmd);
+
+        if (result.exitCode !== 0) {
+          results.push({
+            path: filePath,
+            error: this.#mapError(result.output),
+          });
+        } else {
+          results.push({ path: filePath, error: null });
+        }
+      } catch {
+        results.push({ path: filePath, error: "invalid_path" });
+      }
+    }
+
+    return results;
+  }
+
+  /**
+   * Download files from the sandbox via execute().
+   *
+   * Reads files by base64-encoding their content in the sandbox
+   * and decoding it on the JS side.
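+   *
+   * The generated shell command has this shape (illustrative path):
+   *   if [ -f '/app/x.txt' ]; then base64 '/app/x.txt'; else echo '__NOT_FOUND__'; exit 1; fi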
+   */
+  async downloadFiles(paths: string[]): Promise<FileDownloadResponse[]> {
+    const results: FileDownloadResponse[] = [];
+
+    for (const filePath of paths) {
+      try {
+        const safePath = filePath.replace(/'/g, "'\\''");
+        const cmd = `if [ -f '${safePath}' ]; then base64 '${safePath}'; else echo '__NOT_FOUND__'; exit 1; fi`;
+
+        const result = await this.execute(cmd);
+
+        if (result.exitCode !== 0 || result.output.trim() === "__NOT_FOUND__") {
+          results.push({
+            path: filePath,
+            content: null,
+            error: "file_not_found",
+          });
+        } else {
+          // Decode the base64 output
+          const content = Buffer.from(result.output.trim(), "base64");
+          results.push({
+            path: filePath,
+            content: new Uint8Array(content),
+            error: null,
+          });
+        }
+      } catch {
+        results.push({ path: filePath, content: null, error: "invalid_path" });
+      }
+    }
+
+    return results;
+  }
+
+  /**
+   * Stop listening, remove the line handler, and reject all pending requests.
+   */
+  dispose(): void {
+    // Remove the readline listener so it doesn't fire after disposal
+    if (this.#lineHandler) {
+      this.#reader.removeListener("line", this.#lineHandler);
+      this.#lineHandler = undefined;
+      this.#readerStarted = false;
+    }
+
+    for (const [id, pending] of this.#pendingRequests) {
+      pending.reject(new Error(`RpcSandbox disposed, request ${id} cancelled`));
+    }
+    this.#pendingRequests.clear();
+  }
+
+  /**
+   * Map error output to a standardized FileOperationError.
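+   *
+   * For example, based on the substrings matched below:
+   *   "cat: /x: No such file or directory" -> "file_not_found"
+   *   "bash: /x: Permission denied"        -> "permission_denied"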
+   */
+  #mapError(output: string): FileOperationError {
+    const lower = output.toLowerCase();
+    if (lower.includes("not found") || lower.includes("no such file")) {
+      return "file_not_found";
+    }
+    if (lower.includes("permission denied")) {
+      return "permission_denied";
+    }
+    if (lower.includes("is a directory")) {
+      return "is_directory";
+    }
+    return "invalid_path";
+  }
+}
diff --git a/libs/harbor/src/runner.int.test.ts b/libs/harbor/src/runner.int.test.ts
new file mode 100644
index 000000000..f7959068e
--- /dev/null
+++ b/libs/harbor/src/runner.int.test.ts
@@ -0,0 +1,187 @@
+/**
+ * Integration test for the Harbor runner.
+ *
+ * Spawns runner.ts as a child process and plays the Python side of the
+ * JSON-RPC protocol: sends init, handles exec_request/exec_response,
+ * and verifies the done message + clean exit.
+ *
+ * Requires ANTHROPIC_API_KEY (or the key for whichever model is used).
+ * Run with: vitest run --mode int
+ */
+
+import { spawn, type ChildProcess } from "node:child_process";
+import path from "node:path";
+import { createInterface, type Interface as ReadlineInterface } from "readline";
+import { describe, it, expect, afterEach } from "vitest";
+
+/** Path to the runner source (executed via tsx) */
+const RUNNER_PATH = path.resolve(__dirname, "runner.ts");
+
+/** Build the command used to launch the runner via tsx */
+function getTsxCommand(): { cmd: string; args: string[] } {
+  // Use npx tsx so we don't need to resolve the binary path ourselves
+  return { cmd: "npx", args: ["tsx", RUNNER_PATH] };
+}
+
+/** Send an NDJSON message to the child's stdin */
+function send(child: ChildProcess, msg: Record<string, unknown>): void {
+  child.stdin!.write(JSON.stringify(msg) + "\n");
+}
+
+/**
+ * Collect lines from a readable stream via readline.
+ * Each line is parsed as JSON and pushed to the provided array.
+ * Returns the readline interface so it can be closed in cleanup.
+ */
+function collectJsonLines(child: ChildProcess): {
+  messages: Record<string, unknown>[];
+  rl: ReadlineInterface;
+} {
+  const messages: Record<string, unknown>[] = [];
+  const rl = createInterface({ input: child.stdout!, crlfDelay: Infinity });
+  rl.on("line", (line: string) => {
+    const trimmed = line.trim();
+    if (!trimmed) return;
+    try {
+      messages.push(JSON.parse(trimmed));
+    } catch {
+      // Ignore non-JSON lines (shouldn't happen, but be safe)
+    }
+  });
+  return { messages, rl };
+}
+
+describe("Harbor runner (end-to-end)", () => {
+  let child: ChildProcess | undefined;
+  let rl: ReadlineInterface | undefined;
+
+  afterEach(() => {
+    rl?.close();
+    if (child && child.exitCode === null) {
+      child.kill();
+    }
+  });
+
+  it("should complete a simple agent run via the JSON-RPC bridge", async () => {
+    const { cmd, args } = getTsxCommand();
+
+    child = spawn(cmd, args, {
+      stdio: ["pipe", "pipe", "pipe"],
+      env: {
+        ...process.env,
+        // Suppress Node runtime warnings so they don't clutter the
+        // stderr output we capture for debugging
+        NODE_NO_WARNINGS: "1",
+      },
+    });
+
+    const { messages, rl: stdoutRl } = collectJsonLines(child);
+    rl = stdoutRl;
+
+    // Collect stderr for debugging
+    const stderrChunks: string[] = [];
+    child.stderr!.on("data", (chunk: Buffer) => {
+      stderrChunks.push(chunk.toString());
+    });
+
+    // Step 1: Send init message
+    send(child, {
+      type: "init",
+      instruction:
+        "Write the text 'hello harbor' to a file called /app/test.txt using a single echo command.",
+      sessionId: "int-test-session",
+      model: "anthropic:claude-sonnet-4-5-20250929",
+      systemPrompt:
+        "You are an autonomous agent. Execute commands in the sandbox.
Your working directory is /app.",
+    });
+
+    // Step 2: Run the bridge loop – handle exec_requests, wait for done/error
+    const result = await new Promise<Record<string, unknown>>(
+      (resolve, reject) => {
+        const timeout = setTimeout(() => {
+          reject(
+            new Error(
+              `Runner timed out after 90s.\nstderr:\n${stderrChunks.join("")}`,
+            ),
+          );
+        }, 90_000);
+
+        // Poll for messages from the runner
+        const interval = setInterval(() => {
+          while (messages.length > 0) {
+            const msg = messages.shift()!;
+            const msgType = msg.type as string;
+
+            if (msgType === "exec_request") {
+              // Simulate executing the command – we just return a mock success
+              const command = msg.command as string;
+              let output = "";
+              let exitCode = 0;
+
+              // Provide realistic responses for common commands
+              if (command.includes("echo") && command.includes(">")) {
+                // File write via echo redirect
+                output = "";
+                exitCode = 0;
+              } else if (command.includes("cat ")) {
+                output = "hello harbor";
+                exitCode = 0;
+              } else if (command.includes("pwd")) {
+                output = "/app";
+                exitCode = 0;
+              } else if (command.includes("ls")) {
+                output = "test.txt";
+                exitCode = 0;
+              } else if (command.includes("base64")) {
+                // File upload/download via base64 – just succeed
+                output = "";
+                exitCode = 0;
+              } else {
+                // Default: succeed with empty output
+                output = "";
+                exitCode = 0;
+              }
+
+              send(child!, {
+                type: "exec_response",
+                id: msg.id,
+                output,
+                exitCode,
+              });
+            } else if (msgType === "done" || msgType === "error") {
+              clearTimeout(timeout);
+              clearInterval(interval);
+              resolve(msg);
+              return;
+            }
+          }
+        }, 50);
+      },
+    );
+
+    // Step 3: Verify the result
+    expect(result.type).toBe("done");
+
+    const resultMessages = result.messages as Array<Record<string, unknown>>;
+    expect(resultMessages).toBeDefined();
+    expect(resultMessages.length).toBeGreaterThan(0);
+
+    // Should contain at least a human message and an AI message
+    const roles = resultMessages.map((m) => m.role);
+    expect(roles).toContain("human");
+    expect(roles).toContain("ai");
+
+    // Step 4: Wait
for clean exit
+    const exitCode = await new Promise<number | null>((resolve) => {
+      if (child!.exitCode !== null) {
+        resolve(child!.exitCode);
+        return;
+      }
+      child!.on("exit", (code) => resolve(code));
+      // Give it a few seconds to exit cleanly
+      setTimeout(() => resolve(child!.exitCode), 5_000);
+    });
+
+    expect(exitCode).toBe(0);
+  }, 100_000); // 100s timeout for the full test
+});
diff --git a/libs/harbor/src/runner.ts b/libs/harbor/src/runner.ts
new file mode 100644
index 000000000..6aefc336c
--- /dev/null
+++ b/libs/harbor/src/runner.ts
@@ -0,0 +1,221 @@
+#!/usr/bin/env node
+/**
+ * Harbor runner: Node.js entry point spawned by the Python wrapper.
+ *
+ * Protocol:
+ * 1. Reads an "init" message from stdin with instruction, model, etc.
+ * 2. Creates an RpcSandbox and a DeepAgent
+ * 3. Invokes the agent with the instruction
+ * 4. Sends a "done" message to stdout with serialized messages
+ *
+ * All logging goes to stderr to keep stdout clean for the JSON-RPC protocol.
+ *
+ * @packageDocumentation
+ */
+
+import { awaitAllCallbacks } from "@langchain/core/callbacks/promises";
+import { AIMessage, HumanMessage, ToolMessage } from "@langchain/core/messages";
+import type { BaseMessage } from "@langchain/core/messages";
+import { RunTree } from "langsmith";
+import { withRunTree } from "langsmith/traceable";
+
+import { createDeepAgent } from "deepagents";
+
+import { RpcSandbox } from "./rpc-sandbox.js";
+import {
+  type InitMessage,
+  type SerializedMessage,
+  createStdinReader,
+  log,
+  sendMessage,
+} from "./rpc-protocol.js";
+
+/**
+ * Serialize a LangChain BaseMessage into a simple JSON-friendly format
+ * for transfer to Python. Python will convert these into ATIF trajectory steps.
+ */
+function serializeMessage(msg: BaseMessage): SerializedMessage {
+  // Determine role
+  let role: string;
+  if (HumanMessage.isInstance(msg)) {
+    role = "human";
+  } else if (AIMessage.isInstance(msg)) {
+    role = "ai";
+  } else if (ToolMessage.isInstance(msg)) {
+    role = "tool";
+  } else {
+    role = "system";
+  }
+
+  // Extract text content
+  const content =
+    typeof msg.content === "string" ? msg.content : JSON.stringify(msg.content);
+
+  const serialized: SerializedMessage = { role, content };
+
+  // AI message specifics
+  if (AIMessage.isInstance(msg)) {
+    // Token usage
+    const usage = msg.usage_metadata;
+    if (usage) {
+      serialized.usage = {
+        input_tokens: usage.input_tokens ?? 0,
+        output_tokens: usage.output_tokens ?? 0,
+      };
+    }
+
+    // Tool calls
+    if (msg.tool_calls && msg.tool_calls.length > 0) {
+      serialized.toolCalls = msg.tool_calls.map((tc) => ({
+        id: tc.id ?? "",
+        name: tc.name,
+        args: tc.args as Record<string, unknown>,
+      }));
+    }
+  }
+
+  // Tool message specifics
+  if (ToolMessage.isInstance(msg)) {
+    serialized.toolCallId = msg.tool_call_id;
+  }
+
+  return serialized;
+}
+
+/**
+ * Wait for the init message from Python.
+ * Returns a promise that resolves with the init message, or rejects
+ * if no init message arrives within the timeout period.
+ */
+async function waitForInit(
+  sandbox: RpcSandbox,
+  timeoutMs = 30_000,
+): Promise<InitMessage> {
+  return new Promise((resolve, reject) => {
+    const timer = setTimeout(() => {
+      reject(
+        new Error(
+          `Timed out waiting for init message from Python after ${timeoutMs}ms`,
+        ),
+      );
+    }, timeoutMs);
+
+    sandbox.setMessageHandler((msg) => {
+      if (msg.type === "init") {
+        clearTimeout(timer);
+        resolve(msg as InitMessage);
+      }
+    });
+  });
+}
+
+/**
+ * Main entry point for the Harbor JS runner.
+ */
+async function main(): Promise<void> {
+  log("Starting Harbor JS runner...");
+
+  // Create the stdin reader
+  const reader = createStdinReader();
+
+  // Create sandbox with a temporary ID (will be updated after init)
+  const sandbox = new RpcSandbox("pending", reader);
+  sandbox.startListening();
+
+  // Wait for the init message from Python
+  log("Waiting for init message...");
+  const init = await waitForInit(sandbox);
+  log(`Received init: model=${init.model}, sessionId=${init.sessionId}`);
+
+  // Swap in a sandbox keyed to the real session ID: `id` is readonly,
+  // so we dispose the temporary sandbox and create a new one.
+  sandbox.dispose();
+
+  const rpcSandbox = new RpcSandbox(init.sessionId, reader);
+  rpcSandbox.startListening();
+
+  try {
+    // Create the deep agent with the RPC sandbox backend
+    const agent = createDeepAgent({
+      model: init.model,
+      backend: rpcSandbox,
+      systemPrompt: init.systemPrompt,
+    });
+
+    log("Agent created, invoking with instruction...");
+
+    const invokeConfig = {
+      configurable: {
+        thread_id: init.sessionId,
+      },
+      recursionLimit: 10_000,
+    };
+
+    const invokeAgent = () =>
+      agent.invoke(
+        { messages: [{ role: "user", content: init.instruction }] },
+        invokeConfig,
+      );
+
+    // If LangSmith distributed tracing headers were provided by the Python
+    // wrapper, reconstruct the parent RunTree and run the agent inside it.
+    // This nests all LLM calls and tool invocations under the parent trace.
+    // See: https://docs.langchain.com/langsmith/distributed-tracing
+    const parentRunTree = init.langsmithHeaders
+      ? RunTree.fromHeaders(init.langsmithHeaders)
+      : undefined;
+
+    const result = parentRunTree
+      ? await withRunTree(parentRunTree, invokeAgent)
+      : await invokeAgent();
+
+    // Serialize all messages
+    const messages: SerializedMessage[] = [];
+    const rawMessages = result.messages as BaseMessage[];
+
+    for (const msg of rawMessages) {
+      messages.push(serializeMessage(msg));
+    }
+
+    log(`Agent completed.
${messages.length} messages in history.`);
+
+    // Send the done message
+    sendMessage({
+      type: "done",
+      messages,
+    });
+  } catch (error) {
+    const isError =
+      typeof error === "object" && error !== null && "message" in error;
+    const errorMsg = isError ? (error as Error).message : String(error);
+    const errorStack = isError ? (error as Error).stack : undefined;
+
+    log(`Agent error: ${errorMsg}`);
+
+    sendMessage({
+      type: "error",
+      message: errorMsg,
+      stack: errorStack,
+    });
+
+    process.exitCode = 1;
+  } finally {
+    // Flush all pending LangSmith traces and LangChain callbacks before the
+    // process exits. Without this, Python may terminate the child process
+    // before background trace batches have been sent.
+    await awaitAllCallbacks();
+
+    rpcSandbox.dispose();
+    reader.close();
+    // Destroy stdin so the event loop can drain and the process exits cleanly.
+    // Without this, Node keeps waiting for more data on the piped stdin from Python.
+    process.stdin.destroy();
+  }
+}
+
+// Run the main function
+main().catch((error) => {
+  log(`Fatal error: ${error}`);
+  process.exitCode = 1;
+});
diff --git a/libs/harbor/tsconfig.json b/libs/harbor/tsconfig.json
new file mode 100644
index 000000000..de299e72b
--- /dev/null
+++ b/libs/harbor/tsconfig.json
@@ -0,0 +1,8 @@
+{
+  "extends": "../../tsconfig.json",
+  "compilerOptions": {
+    "outDir": "dist"
+  },
+  "include": ["src/**/*.ts", "src/*.ts"],
+  "exclude": ["node_modules", "dist"]
+}
diff --git a/libs/harbor/tsdown.config.ts b/libs/harbor/tsdown.config.ts
new file mode 100644
index 000000000..1076e47f0
--- /dev/null
+++ b/libs/harbor/tsdown.config.ts
@@ -0,0 +1,27 @@
+import { defineConfig } from "tsdown";
+
+// Mark all node_modules as external since this is a library
+const external = [/^[^./]/];
+
+export default defineConfig([
+  {
+    entry: ["./src/index.ts", "./src/runner.ts"],
+    format: ["esm"],
+    dts: true,
+    clean: true,
+    sourcemap: true,
+    outDir: "dist",
+    outExtensions: () => ({ js: ".js" }),
+    external,
+  },
+  {
+    entry: ["./src/index.ts", "./src/runner.ts"],
+    format: ["cjs"],
+    dts: true,
+    clean: true,
+    sourcemap: true,
+    outDir: "dist",
+    outExtensions: () => ({ js: ".cjs" }),
+    external,
+  },
+]);
diff --git a/libs/harbor/vitest.config.ts b/libs/harbor/vitest.config.ts
new file mode 100644
index 000000000..76d7f778a
--- /dev/null
+++ b/libs/harbor/vitest.config.ts
@@ -0,0 +1,47 @@
+import path from "node:path";
+import {
+  configDefaults,
+  defineConfig,
+  type ViteUserConfigExport,
+} from "vitest/config";
+
+// Load .env from workspace root
+import dotenv from "dotenv";
+dotenv.config({ path: path.resolve(__dirname, "../../.env") });
+
+export default defineConfig((env) => {
+  const common: ViteUserConfigExport = {
+    test: {
+      environment: "node",
+      hideSkippedTests: true,
+      globals: true,
+      testTimeout: 60_000,
+      hookTimeout: 60_000,
+      teardownTimeout: 60_000,
+      exclude: ["**/*.int.test.ts", ...configDefaults.exclude],
+    },
+  };
+
+  if (env.mode === "int") {
+    return {
+      test: {
+        ...common.test,
+        globals: false,
+        testTimeout: 100_000,
+        hookTimeout: 120_000,
+        teardownTimeout: 120_000,
+        exclude: configDefaults.exclude,
+        include: ["**/*.int.test.ts"],
+        name: "int",
+        sequence: { concurrent: false },
+      },
+    } satisfies ViteUserConfigExport;
+  }
+
+  return {
+    test: {
+      ...common.test,
+      include: ["src/**/*.test.ts"],
+    },
+  } satisfies ViteUserConfigExport;
+});
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 641d99999..09349d02d 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -226,6 +226,52 @@ importers:
         specifier: ^4.0.18
         version: 4.0.18(@types/node@25.2.0)(@vitest/ui@4.0.18)(jiti@2.6.1)(tsx@4.21.0)(yaml@2.8.2)
 
+  libs/harbor:
+    dependencies:
+      '@langchain/anthropic':
+        specifier: ^1.0.0
+        version: 1.3.14(@langchain/core@1.1.19(openai@6.17.0(ws@8.19.0)(zod@4.3.6)))
+      '@langchain/openai':
+        specifier: ^1.2.3
+        version: 1.2.4(@langchain/core@1.1.19(openai@6.17.0(ws@8.19.0)(zod@4.3.6)))(ws@8.19.0)
+      langsmith:
+        specifier: '>=0.2.0'
+        version: 0.4.12(openai@6.17.0(ws@8.19.0)(zod@4.3.6))
+    devDependencies:
+      '@langchain/core':
+        specifier: ^1.1.19
+        version: 1.1.19(openai@6.17.0(ws@8.19.0)(zod@4.3.6))
+      '@langchain/langgraph':
+        specifier: ^1.1.3
+        version: 1.1.3(@langchain/core@1.1.19(openai@6.17.0(ws@8.19.0)(zod@4.3.6)))(zod@4.3.6)
+      '@tsconfig/recommended':
+        specifier: ^1.0.13
+        version: 1.0.13
+      '@types/node':
+        specifier: ^25.1.0
+        version: 25.2.0
+      '@vitest/coverage-v8':
+        specifier: ^4.0.18
+        version: 4.0.18(vitest@4.0.18)
+      deepagents:
+        specifier: workspace:*
+        version: link:../deepagents
+      dotenv:
+        specifier: ^17.2.3
+        version: 17.2.3
+      tsdown:
+        specifier: ^0.20.1
+        version: 0.20.2(synckit@0.11.12)(typescript@5.9.3)
+      tsx:
+        specifier: ^4.21.0
+        version: 4.21.0
+      typescript:
+        specifier: ^5.9.3
+        version: 5.9.3
+      vitest:
+        specifier: ^4.0.18
+        version: 4.0.18(@types/node@25.2.0)(@vitest/ui@4.0.18)(jiti@2.6.1)(tsx@4.21.0)(yaml@2.8.2)
+
   libs/providers/daytona:
     dependencies:
       '@daytonaio/sdk':