Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 210 additions & 30 deletions researchclaw/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,71 @@

import argparse
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from collections.abc import Mapping
from typing import cast

from researchclaw.adapters import AdapterBundle
from researchclaw.config import RCConfig
from researchclaw.config import (
CONFIG_SEARCH_ORDER,
EXAMPLE_CONFIG,
RCConfig,
resolve_config_path,
)
from researchclaw.health import print_doctor_report, run_doctor, write_doctor_report


def _resolve_config_or_exit(args: argparse.Namespace) -> Path | None:
    """Locate the config file for this invocation, reporting failures to stderr.

    Honors an explicit ``--config`` argument when present; otherwise falls
    back to the standard search order.  Returns the resolved Path, or None
    after printing a helpful error message.
    """
    resolved = resolve_config_path(getattr(args, "config", None))
    if resolved is None:
        # Nothing found anywhere in the search order — point the user at init.
        search_list = ", ".join(CONFIG_SEARCH_ORDER)
        print(
            f"Error: no config file found (searched: {search_list}).\n"
            f"Run 'researchclaw init' to create one from the example template.",
            file=sys.stderr,
        )
        return None
    if not resolved.exists():
        # An explicit path was given but does not exist on disk.
        print(f"Error: config file not found: {resolved}", file=sys.stderr)
        return None
    return resolved


def _generate_run_id(topic: str) -> str:
    """Build a unique run identifier from the UTC time and a short topic digest."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    digest = hashlib.sha256(topic.encode()).hexdigest()
    return "-".join(("rc", stamp, digest[:6]))


def _find_latest_run(artifacts_dir: str = "artifacts") -> Path | None:
"""Find the most recent run directory that contains a checkpoint."""
base = Path(artifacts_dir)
if not base.is_dir():
return None
candidates = sorted(
(d for d in base.iterdir() if d.is_dir() and d.name.startswith("rc-")),
key=lambda d: d.stat().st_mtime,
reverse=True,
)
for d in candidates:
if (d / "checkpoint.json").exists():
return d
return None


def cmd_run(args: argparse.Namespace) -> int:
config_path = Path(cast(str, args.config))
resolved = _resolve_config_or_exit(args)
if resolved is None:
return 1
config_path = resolved
topic = cast(str | None, args.topic)
output = cast(str | None, args.output)
from_stage_name = cast(str | None, args.from_stage)
Expand All @@ -31,10 +77,6 @@ def cmd_run(args: argparse.Namespace) -> int:
resume = cast(bool, args.resume)
skip_noncritical = cast(bool, args.skip_noncritical_stage)

if not config_path.exists():
print(f"Error: config file not found: {config_path}", file=sys.stderr)
return 1

kb_root_path = None
config = RCConfig.load(config_path, check_paths=False)

Expand All @@ -57,29 +99,55 @@ def cmd_run(args: argparse.Namespace) -> int:
print(f"FAILED — {msg}", file=sys.stderr)
return 1

run_id = _generate_run_id(config.research.topic)
run_dir = Path(output or f"artifacts/{run_id}")
run_dir.mkdir(parents=True, exist_ok=True)
from researchclaw.pipeline.runner import execute_pipeline, read_checkpoint
from researchclaw.pipeline.stages import Stage

# --- Resolve run directory and start stage ---
from_stage = Stage.TOPIC_INIT

if resume:
# Resolve existing run directory
if output:
run_dir = Path(output)
else:
run_dir = _find_latest_run() # type: ignore[assignment]
if run_dir is None:
print(
"No resumable run found in artifacts/. "
"Use --output to specify a run directory.",
file=sys.stderr,
)
return 1

if not run_dir.exists():
print(f"Run directory not found: {run_dir}", file=sys.stderr)
return 1

resumed = read_checkpoint(run_dir)
if resumed is None:
print(f"No checkpoint found in {run_dir}", file=sys.stderr)
return 1

from_stage = resumed
# Read run_id from checkpoint instead of generating a new one
cp_data = json.loads(
(run_dir / "checkpoint.json").read_text(encoding="utf-8")
)
run_id = cp_data.get("run_id", run_dir.name)
print(f"Resuming from checkpoint: Stage {int(from_stage)}: {from_stage.name}")
else:
if from_stage_name:
from_stage = Stage[from_stage_name.upper()]
run_id = _generate_run_id(config.research.topic)
run_dir = Path(output or f"artifacts/{run_id}")
run_dir.mkdir(parents=True, exist_ok=True)

if config.knowledge_base.root:
kb_root_path = Path(config.knowledge_base.root)
kb_root_path.mkdir(parents=True, exist_ok=True)

adapters = AdapterBundle()

from researchclaw.pipeline.runner import execute_pipeline, read_checkpoint
from researchclaw.pipeline.stages import Stage

# --- Determine start stage ---
from_stage = Stage.TOPIC_INIT
if from_stage_name:
from_stage = Stage[from_stage_name.upper()]
elif resume:
resumed = read_checkpoint(run_dir)
if resumed is not None:
from_stage = resumed
print(f"Resuming from checkpoint: Stage {int(from_stage)}: {from_stage.name}")

print(f"ResearchClaw v0.1.0 — Starting pipeline")
print(f" Run ID: {run_id}")
print(f" Topic: {config.research.topic}")
Expand Down Expand Up @@ -109,11 +177,11 @@ def cmd_validate(args: argparse.Namespace) -> int:
from researchclaw.config import validate_config
import yaml

config_path = Path(cast(str, args.config))
no_check_paths = cast(bool, args.no_check_paths)
if not config_path.exists():
print(f"Error: config file not found: {config_path}", file=sys.stderr)
resolved = _resolve_config_or_exit(args)
if resolved is None:
return 1
config_path = resolved
no_check_paths = cast(bool, args.no_check_paths)

with config_path.open(encoding="utf-8") as f:
loaded = cast(object, yaml.safe_load(f))
Expand Down Expand Up @@ -142,7 +210,10 @@ def cmd_validate(args: argparse.Namespace) -> int:


def cmd_doctor(args: argparse.Namespace) -> int:
config_path = Path(cast(str, args.config))
resolved = _resolve_config_or_exit(args)
if resolved is None:
return 1
config_path = resolved
output = cast(str | None, args.output)

report = run_doctor(config_path)
Expand All @@ -151,6 +222,105 @@ def cmd_doctor(args: argparse.Namespace) -> int:
write_doctor_report(report, Path(output))
return 0 if report.overall == "pass" else 1

# Menu index (as typed by the user) -> (provider name, env var holding its
# API key).  An empty env-var string means the provider needs no API key
# (the local ACP agent case).
_PROVIDER_CHOICES = {
    "1": ("openai", "OPENAI_API_KEY"),
    "2": ("openrouter", "OPENROUTER_API_KEY"),
    "3": ("deepseek", "DEEPSEEK_API_KEY"),
    "4": ("acp", ""),
}

# Provider name -> OpenAI-compatible API base URL substituted into the config.
_PROVIDER_URLS = {
    "openai": "https://api.openai.com/v1",
    "openrouter": "https://openrouter.ai/api/v1",
    "deepseek": "https://api.deepseek.com/v1",
}

# Provider name -> (primary model, fallback models) substituted into the
# config template.  Providers absent here keep the template's defaults.
_PROVIDER_MODELS = {
    "openai": ("gpt-4o", ["gpt-4.1", "gpt-4o-mini"]),
    "openrouter": (
        "anthropic/claude-3.5-sonnet",
        ["google/gemini-pro-1.5", "meta-llama/llama-3.1-70b-instruct"],
    ),
    "deepseek": ("deepseek-chat", ["deepseek-reasoner"]),
}


def cmd_init(args: argparse.Namespace) -> int:
    """Create ``config.arc.yaml`` in the current directory from the example template.

    Prompts for an LLM provider when stdin is a TTY; otherwise (or on
    closed stdin) defaults to openai.  Returns 0 on success or when the
    file already exists without --force, 1 when the template is missing.
    """
    force = cast(bool, args.force)
    dest = Path("config.arc.yaml")

    if dest.exists() and not force:
        print(f"{dest} already exists. Use --force to overwrite.")
        return 0

    example = Path.cwd() / EXAMPLE_CONFIG
    if not example.exists():
        print(f"Error: example config not found: {example}", file=sys.stderr)
        return 1

    # Interactive provider prompt (TTY only, else default to openai)
    choice = "1"
    if sys.stdin.isatty():
        print("Select LLM provider:")
        print(" 1) openai (requires OPENAI_API_KEY)")
        print(" 2) openrouter (requires OPENROUTER_API_KEY)")
        print(" 3) deepseek (requires DEEPSEEK_API_KEY)")
        print(" 4) acp (local AI agent — no API key needed)")
        try:
            raw = input("Choice [1]: ").strip()
        except EOFError:
            # stdin can be a TTY yet still hit EOF (e.g. Ctrl-D); fall back
            # to the default instead of crashing.
            raw = ""
        if raw in _PROVIDER_CHOICES:
            choice = raw

    provider, api_key_env = _PROVIDER_CHOICES[choice]

    content = example.read_text(encoding="utf-8")

    # String-based replacement to preserve YAML comments
    content = content.replace('provider: "openai"', f'provider: "{provider}"')

    if provider == "acp":
        # ACP doesn't need base_url or api_key_env
        content = content.replace(
            'base_url: "https://api.openai.com/v1"', 'base_url: ""'
        )
        content = content.replace('api_key_env: "OPENAI_API_KEY"', 'api_key_env: ""')
    else:
        base_url = _PROVIDER_URLS.get(provider, "https://api.openai.com/v1")
        content = content.replace(
            'base_url: "https://api.openai.com/v1"', f'base_url: "{base_url}"'
        )
        if api_key_env:
            content = content.replace(
                'api_key_env: "OPENAI_API_KEY"', f'api_key_env: "{api_key_env}"'
            )

    if provider in _PROVIDER_MODELS:
        primary, fallbacks = _PROVIDER_MODELS[provider]
        content = content.replace('primary_model: "gpt-4o"', f'primary_model: "{primary}"')
        # Replace fallback models block
        old_fallbacks = '  fallback_models:\n    - "gpt-4.1"\n    - "gpt-4o-mini"'
        new_fallbacks = "  fallback_models:\n" + "".join(
            f'    - "{m}"\n' for m in fallbacks
        )
        content = content.replace(old_fallbacks, new_fallbacks.rstrip("\n"))

    dest.write_text(content, encoding="utf-8")
    print(f"Created {dest} (provider: {provider})")

    if provider == "acp":
        print("\nNext steps:")
        print("  1. Ensure your ACP agent is installed and on PATH")
        print("  2. Edit config.arc.yaml to set llm.acp.agent if needed")
        print("  3. Run: researchclaw doctor")
    else:
        env_var = api_key_env or "OPENAI_API_KEY"
        # Plain string — the original used an f-string with no placeholder.
        print("\nNext steps:")
        print(f"  1. Export your API key: export {env_var}=sk-...")
        print("  2. Edit config.arc.yaml to customize your settings")
        print("  3. Run: researchclaw doctor")

    return 0


def cmd_report(args: argparse.Namespace) -> int:
from researchclaw.report import generate_report, write_report

Expand Down Expand Up @@ -179,7 +349,8 @@ def main(argv: list[str] | None = None) -> int:
run_p = sub.add_parser("run", help="Run the 23-stage research pipeline")
_ = run_p.add_argument("--topic", "-t", help="Override research topic")
_ = run_p.add_argument(
"--config", "-c", default="config.yaml", help="Config file path"
"--config", "-c", default=None,
help="Config file (default: auto-detect config.arc.yaml or config.yaml)",
)
_ = run_p.add_argument("--output", "-o", help="Output directory")
_ = run_p.add_argument(
Expand All @@ -200,18 +371,25 @@ def main(argv: list[str] | None = None) -> int:
)
val_p = sub.add_parser("validate", help="Validate config file")
_ = val_p.add_argument(
"--config", "-c", default="config.yaml", help="Config file path"
"--config", "-c", default=None,
help="Config file (default: auto-detect config.arc.yaml or config.yaml)",
)
_ = val_p.add_argument(
"--no-check-paths", action="store_true", help="Skip path existence checks"
)

doc_p = sub.add_parser("doctor", help="Check environment and configuration health")
_ = doc_p.add_argument(
"--config", "-c", default="config.yaml", help="Config file path"
"--config", "-c", default=None,
help="Config file (default: auto-detect config.arc.yaml or config.yaml)",
)
_ = doc_p.add_argument("--output", "-o", help="Write JSON report to file")

init_p = sub.add_parser("init", help="Create config.arc.yaml from example template")
_ = init_p.add_argument(
"--force", action="store_true", help="Overwrite existing config.arc.yaml"
)

rpt_p = sub.add_parser("report", help="Generate human-readable run report")
_ = rpt_p.add_argument(
"--run-dir", required=True, help="Path to run artifacts directory"
Expand All @@ -227,6 +405,8 @@ def main(argv: list[str] | None = None) -> int:
return cmd_validate(args)
elif command == "doctor":
return cmd_doctor(args)
elif command == "init":
return cmd_init(args)
elif command == "report":
return cmd_report(args)
else:
Expand Down
17 changes: 16 additions & 1 deletion researchclaw/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@

import yaml

CONFIG_SEARCH_ORDER: tuple[str, ...] = ("config.arc.yaml", "config.yaml")
EXAMPLE_CONFIG = "config.researchclaw.example.yaml"


def resolve_config_path(explicit: str | None) -> Path | None:
"""Return first existing config from search order, or explicit path if given."""
if explicit is not None:
return Path(explicit)
for name in CONFIG_SEARCH_ORDER:
candidate = Path(name)
if candidate.exists():
return candidate
return None


REQUIRED_FIELDS = (
"project.name",
"research.topic",
Expand Down Expand Up @@ -106,7 +121,7 @@ class AcpConfig:
cwd: str = "."
acpx_command: str = ""
session_name: str = "researchclaw"
timeout_sec: int = 600
timeout_sec: int = 1200


@dataclass(frozen=True)
Expand Down
2 changes: 1 addition & 1 deletion researchclaw/llm/acp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ACPConfig:
cwd: str = "."
acpx_command: str = "" # auto-detect if empty
session_name: str = "researchclaw"
timeout_sec: int = 600 # per-prompt timeout
timeout_sec: int = 1200 # per-prompt timeout (code generation needs >600s)


def _find_acpx() -> str | None:
Expand Down
Loading