68 changes: 68 additions & 0 deletions README.md
@@ -88,6 +88,74 @@ PORT=8010 make start
- Scan and execute local skills
- Run the zero-trust hardening script and its related verification scripts

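## Telegram Worker Orchestration: How to Use It

The Telegram entry point is now wired to deterministic worker routing. You can hand a natural-language task directly to the system, and it decides which narrowest execution path to take.

### How tasks are routed

| Task type you send | Actual route |
| --- | --- |
| Ordinary chat, small talk, non-execution questions | `claude_direct` |
| Analysis, research, proposals, risk assessment | `autoresearch` |
| Explicit code changes, adding tests, bug fixes | `openhands` |
| Mixed tasks that analyze first and then change code | `autoresearch -> openhands` |
| Tasks with `main` / `merge` / `delete` / high-risk write intent | Create an approval first, then resume the worker |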
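
The routing table can be read as a small, pure decision function. The following is a minimal sketch, not the actual router: the keyword patterns and the `route_task` name are assumptions made for illustration, and only the route labels come from the table.

```python
import re

# Hypothetical keyword patterns; the real router's rules are not shown in this README.
HIGH_RISK = re.compile(r"\b(?:main|merge|delete)\b", re.IGNORECASE)
ANALYSIS = re.compile(r"\b(?:analy[sz]e|research|plan|risk)\b", re.IGNORECASE)
CODE_CHANGE = re.compile(r"\b(?:fix|bug|patch|refactor|tests?)\b", re.IGNORECASE)


def route_task(text: str) -> dict:
    """Map a task to a route label and flag whether an approval must come first."""
    wants_analysis = bool(ANALYSIS.search(text))
    wants_change = bool(CODE_CHANGE.search(text))
    if wants_analysis and wants_change:
        route = "autoresearch -> openhands"
    elif wants_change:
        route = "openhands"
    elif wants_analysis:
        route = "autoresearch"
    else:
        route = "claude_direct"
    # Assumption: only write-intent tasks that mention main/merge/delete are gated.
    needs_approval = wants_change and bool(HIGH_RISK.search(text))
    return {"route": route, "needs_approval": needs_approval}


if __name__ == "__main__":
    print(route_task("Please fix src/demo_fix.py and merge it into main"))
```

Because the function depends only on its input text, the same task always produces the same route, which is the property "deterministic routing" refers to.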

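### Most common usage

```text
Please analyze the risks and boundaries of src/autoresearch/core/services/worker_orchestrator.py
```

- Routes to `autoresearch`

```text
Please fix the bug in src/autoresearch/api/routers/panel.py and add tests
```

- Routes to `openhands`

```text
First analyze the problems in src/autoresearch/api/routers/gateway_telegram.py, then fix this bug and add tests
```

- Routes to `autoresearch -> openhands`

```text
Please fix src/demo_fix.py and merge it directly into main
```

- Does not go straight to a worker
- The system creates an approval first
- Once the approval is granted, the same orchestration run is resumed automatically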
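
The approval-first behavior in the last example can be sketched as a gate that stores the task instead of executing it, then replays it under the same run id once approved. This is an illustrative in-memory model only: `Gate`, `Approval`, and the `replay_payload` shape are hypothetical names, and the real approval store is persistent.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Approval:
    approval_id: str
    run_id: str
    replay_payload: dict  # context needed to resume the run later
    status: str = "pending"


@dataclass
class Gate:
    """Hypothetical in-memory approval gate (assumption, not the project's API)."""

    approvals: dict = field(default_factory=dict)
    executed: list = field(default_factory=list)

    def submit(self, run_id: str, task: str, high_risk: bool) -> str:
        if not high_risk:
            self.executed.append((run_id, task))
            return "executed"
        # High-risk: persist the task and return an approval id instead of running it.
        approval = Approval(uuid.uuid4().hex, run_id, {"task": task})
        self.approvals[approval.approval_id] = approval
        return approval.approval_id

    def approve(self, approval_id: str) -> str:
        approval = self.approvals[approval_id]
        if approval.status != "pending":
            raise ValueError(f"approval {approval_id} is already {approval.status}")
        approval.status = "approved"
        # Resume the original run from the stored payload rather than starting a new one.
        self.executed.append((approval.run_id, approval.replay_payload["task"]))
        return approval.run_id


if __name__ == "__main__":
    gate = Gate()
    approval_id = gate.submit("run-1", "fix src/demo_fix.py and merge into main", high_risk=True)
    print(gate.executed)              # nothing has run yet
    print(gate.approve(approval_id))  # resumes run-1
```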

### How to grant an approval

- Telegram: `/approve`, `/approve <approval_id>`, `/approve <approval_id> approve [note]`
- Panel: click `Approve` directly in the pending-approvals list
- Admin: approve directly in the Approval Queue
- API: `POST /api/v1/approvals/{approval_id}/decision`
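
For the API path, a request could be built roughly as follows. Only the endpoint path comes from this README; the base URL, the `decision` and `note` body fields, and the `build_decision_request` helper are assumptions, so check the actual request schema before relying on them.

```python
import json
import urllib.request


def build_decision_request(
    base_url: str, approval_id: str, decision: str, note: str = ""
) -> urllib.request.Request:
    """Build (but do not send) a POST to the approval decision endpoint."""
    # Assumption: the body field names "decision" and "note" are illustrative only.
    body = json.dumps({"decision": decision, "note": note}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/approvals/{approval_id}/decision",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical base URL and approval id.
    req = build_decision_request("http://localhost:8010", "apr_123", "approve", "reviewed")
    print(req.get_method(), req.full_url)
```

Passing the returned object to `urllib.request.urlopen` would send it; the sketch stops short of that so it can be inspected without a running server.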

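## Worker Orchestration: What Is Implemented

- deterministic routing: tasks of the same type consistently go to the same kind of worker, rather than drifting with prompt randomness
- approval replay payload: for high-risk tasks, the approval persists the context needed to resume
- unified resume path: after approval via Telegram / panel / admin / approvals API, every channel returns to the same resume path
- async approval UX: Telegram `/approve` acknowledges first, then resumes worker execution asynchronously
- artifact-only analysis stage: the first stage of `autoresearch -> openhands` produces only plan / risk / test artifacts and does not additionally finalize promotion
- observability: panel/admin show the selected worker, route reason, requested/effective promotion mode, approval/resume state, and blocker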
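
The "acknowledge first, resume asynchronously" behavior can be illustrated with `asyncio`. This is a sketch under assumptions: `handle_approve` and `resume_worker` are hypothetical names, and the real handler sends a Telegram reply rather than appending to a list.

```python
import asyncio


async def resume_worker(run_id: str, log: list) -> None:
    # Stand-in for the real resume path; the sleep simulates worker latency.
    await asyncio.sleep(0.01)
    log.append(f"resumed {run_id}")


async def handle_approve(run_id: str, log: list) -> asyncio.Task:
    """Acknowledge immediately, then schedule the resume in the background."""
    log.append(f"ack {run_id}")
    return asyncio.create_task(resume_worker(run_id, log))


async def main() -> list:
    log: list = []
    task = await handle_approve("run-1", log)
    log.append("handler returned")  # the caller is unblocked before the worker finishes
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The acknowledgement and the handler's return are both recorded before the resume completes, so the user gets a receipt without waiting for the worker.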

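## Worker Orchestration Best Practices

1. Write analysis tasks and modification tasks separately, unless you explicitly want the `autoresearch -> openhands` chain.
2. For code-change tasks, include explicit file paths where possible, for example `src/...py`, so that allowed paths are narrower.
3. State the success criteria directly in the prompt, for example "add tests", "pass py_compile", or "provide a risk summary".
4. Do not let a worker directly carry high-risk intents such as merge / main / delete; make the approval gate the only exit.
5. If you need "analyze first, then execute", write "first analyze ... then fix ..." explicitly instead of letting the model guess.
6. When troubleshooting, check the route reason, promotion requested/effective mode, and approval/resume state in panel/admin first, then decide whether the worker is the problem.
7. For chained tasks, treat the first stage as an artifact producer and do not expect it to complete promotion directly.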
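
To show why practice 2 narrows the scope, here is a rough sketch of deriving allowed paths from a prompt. The regular expression and the `narrow_allowed_paths` helper are assumptions for illustration; the fallback list mirrors the defaults in `configs/agents/autoresearch.yaml`.

```python
import re

# Broad defaults, as in configs/agents/autoresearch.yaml.
DEFAULT_ALLOWED = ["src/**", "tests/**", "docs/**"]

# Hypothetical pattern: a path under src/, tests/, or docs/ that ends in a file extension.
PATH_RE = re.compile(r"\b(?:src|tests|docs)/[\w./-]+\.\w+")


def narrow_allowed_paths(prompt: str) -> list[str]:
    """Use explicit file paths from the prompt when present, else the broad defaults."""
    found = sorted(set(PATH_RE.findall(prompt)))
    return found or list(DEFAULT_ALLOWED)


if __name__ == "__main__":
    print(narrow_allowed_paths("Fix src/autoresearch/api/routers/panel.py and add tests"))
    print(narrow_allowed_paths("Fix the panel bug"))
```

A prompt that names one file yields a single-entry scope, while a vague prompt falls back to the whole `src/**`, `tests/**`, `docs/**` surface.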

## OpenHands Integration Boundaries (Important)

- "Easier to get started" refers to AAS's unified startup and troubleshooting flow: `make setup -> make doctor -> make start`.
33 changes: 33 additions & 0 deletions configs/agents/autoresearch.yaml
@@ -0,0 +1,33 @@
{
  "id": "autoresearch",
  "kind": "process",
  "entrypoint": "drivers/autoresearch_adapter.sh",
  "version": "0.1",
  "capabilities": [
    "read_repo",
    "produce_patchable_suggestions",
    "produce_execution_plan",
    "produce_test_plan",
    "produce_risk_summary"
  ],
  "default_mode": "patch_only",
  "policy_defaults": {
    "timeout_sec": 120,
    "max_steps": 1,
    "network": "disabled",
    "network_allowlist": [],
    "tool_allowlist": ["read", "write", "bash"],
    "allowed_paths": ["src/**", "tests/**", "docs/**"],
    "forbidden_paths": [
      ".git/**",
      "logs/**",
      ".masfactory_runtime/**",
      "memory/**"
    ],
    "max_changed_files": 10,
    "max_patch_lines": 300,
    "allow_binary_changes": false,
    "cleanup_on_success": true,
    "retain_workspace_on_failure": true
  }
}
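
As an illustration of how `allowed_paths`, `forbidden_paths`, and `max_changed_files` could be applied to a change set, here is a minimal sketch. It assumes `fnmatch`-style glob semantics, which may differ from the real policy enforcer, and `check_changes` is a hypothetical helper.

```python
from fnmatch import fnmatch

# Subset of policy_defaults from this config.
POLICY = {
    "allowed_paths": ["src/**", "tests/**", "docs/**"],
    "forbidden_paths": [".git/**", "logs/**", ".masfactory_runtime/**", "memory/**"],
    "max_changed_files": 10,
}


def check_changes(changed: list[str], policy: dict) -> list[str]:
    """Return human-readable violations; an empty list means the change set passes."""
    violations = []
    if len(changed) > policy["max_changed_files"]:
        violations.append(f"too many changed files: {len(changed)}")
    for path in changed:
        # Assumption: forbidden patterns take precedence over allowed patterns.
        if any(fnmatch(path, pattern) for pattern in policy["forbidden_paths"]):
            violations.append(f"forbidden: {path}")
        elif not any(fnmatch(path, pattern) for pattern in policy["allowed_paths"]):
            violations.append(f"outside allowed_paths: {path}")
    return violations


if __name__ == "__main__":
    print(check_changes(["src/a/b.py", ".git/config", "README.md"], POLICY))
```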
151 changes: 151 additions & 0 deletions drivers/autoresearch_adapter.sh
@@ -0,0 +1,151 @@
#!/usr/bin/env bash
set -euo pipefail

# Abort with exit code 40 when a required environment variable is unset or empty.
require_env() {
  local key="$1"
  if [[ -z "${!key:-}" ]]; then
    echo "[aep][autoresearch] missing env: ${key}" >&2
    exit 40
  fi
}

require_env "AEP_WORKSPACE"
require_env "AEP_ARTIFACT_DIR"
require_env "AEP_JOB_SPEC"
require_env "AEP_RESULT_PATH"
require_env "AEP_BASELINE"

PY_BIN="${PYTHON_BIN:-python3}"

"${PY_BIN}" - "${AEP_JOB_SPEC}" "${AEP_WORKSPACE}" "${AEP_ARTIFACT_DIR}" "${AEP_RESULT_PATH}" "${AEP_BASELINE}" <<'PY'
import json
import re
import sys
from pathlib import Path

job_path = Path(sys.argv[1])
workspace = Path(sys.argv[2])
artifact_dir = Path(sys.argv[3])
result_path = Path(sys.argv[4])
baseline = Path(sys.argv[5])

payload = json.loads(job_path.read_text(encoding="utf-8"))
task = str(payload.get("task") or "autoresearch task").strip()
run_id = str(payload.get("run_id") or "autoresearch-run").strip()
agent_id = str(payload.get("agent_id") or "autoresearch").strip()
policy = payload.get("policy") or {}
metadata = payload.get("metadata") or {}
allowed_paths = list(policy.get("allowed_paths") or [])
deliverables = list(metadata.get("deliverables") or ["execution_plan", "test_plan", "risk_summary", "patch_suggestion"])


def pick_target(patterns: list[str]) -> Path:
    # Prefer the first literal (glob-free) path in allowed_paths.
    for pattern in patterns:
        if not any(char in pattern for char in "*?["):
            return workspace / pattern
    # Otherwise fall back to the literal prefix of the first usable glob.
    for pattern in patterns:
        prefix = pattern.split("*", 1)[0].split("?", 1)[0].split("[", 1)[0].rstrip("/")
        if prefix:
            if "." in Path(prefix).name:
                return workspace / prefix
            return workspace / prefix / "autoresearch_worker_output.py"
    return workspace / "src" / "autoresearch_worker_output.py"


target = pick_target(allowed_paths)
target.parent.mkdir(parents=True, exist_ok=True)

if target.suffix == ".py":
    # repr() keeps the generated file valid Python even if the values contain quotes or newlines.
    target.write_text(
        '"""Generated by the AutoResearch AEP adapter."""\n\n'
        "def plan() -> dict[str, str]:\n"
        f'    return {{"run_id": {run_id!r}, "task": {task!r}, "status": "suggested"}}\n',
        encoding="utf-8",
    )
else:
    target.write_text(
        f"AutoResearch patch candidate\n\nTask: {task}\n",
        encoding="utf-8",
    )

artifact_dir.mkdir(parents=True, exist_ok=True)
artifact_refs = []


def write_artifact(name: str, content: str, kind: str) -> None:
    path = artifact_dir / f"{name}.md"
    path.write_text(content, encoding="utf-8")
    artifact_refs.append({"name": name, "kind": kind, "uri": str(path), "sha256": None})


if "execution_plan" in deliverables:
    write_artifact(
        "execution_plan",
        "# Execution Plan\n\n"
        f"- Scope target: `{target.relative_to(workspace).as_posix()}`\n"
        f"- Research task: {task}\n"
        "- Emit a narrow patch candidate inside allowed_paths.\n",
        "plan",
    )
if "test_plan" in deliverables:
    # Use the first command-type validator, if any, as the suggested check.
    validators = payload.get("validators") or []
    command = ""
    for item in validators:
        if item.get("kind") == "command" and item.get("command"):
            command = str(item["command"])
            break
    write_artifact(
        "test_plan",
        "# Test Plan\n\n"
        f"- Validate with: `{command}`\n"
        "- Fail closed if the validator does not pass.\n",
        "plan",
    )
if "risk_summary" in deliverables:
    write_artifact(
        "risk_summary",
        "# Risk Summary\n\n"
        "- No direct git writes.\n"
        "- No direct promotion finalize.\n"
        "- Scope stays inside allowed_paths.\n",
        "report",
    )
if "patch_suggestion" in deliverables:
    write_artifact(
        "patch_suggestion",
        "# Patch Suggestion\n\n"
        f"- Candidate target: `{target.relative_to(workspace).as_posix()}`\n"
        "- Promotion remains gated.\n",
        "report",
    )

# A path counts as changed if it exists on only one side or its bytes differ.
base_files = {p.relative_to(baseline).as_posix() for p in baseline.rglob("*") if p.is_file()}
workspace_files = {p.relative_to(workspace).as_posix() for p in workspace.rglob("*") if p.is_file()}
changed = []
for rel in sorted(base_files | workspace_files):
    base_path = baseline / rel
    ws_path = workspace / rel
    if not base_path.exists() or not ws_path.exists():
        changed.append(rel)
        continue
    if base_path.read_bytes() != ws_path.read_bytes():
        changed.append(rel)

result = {
    "protocol_version": "aep/v0",
    "run_id": run_id,
    "agent_id": agent_id,
    "attempt": 1,
    "status": "succeeded",
    "summary": "autoresearch adapter produced scoped artifacts and a patch candidate",
    "changed_paths": changed,
    "output_artifacts": artifact_refs,
    "metrics": {
        "duration_ms": 0,
        "steps": 1,
        "commands": 0,
        "prompt_tokens": None,
        "completion_tokens": None,
    },
    "recommended_action": "promote",
    "error": None,
}
result_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
PY
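
To make the adapter's target selection easier to follow, here is a standalone sketch of the same `pick_target` logic. It takes the workspace as a parameter and uses `PurePosixPath` so it runs without touching the filesystem; those two changes and the `/ws` workspace are assumptions of this sketch, not how the adapter itself is written.

```python
from pathlib import PurePosixPath

FALLBACK_NAME = "autoresearch_worker_output.py"


def pick_target(workspace: PurePosixPath, patterns: list[str]) -> PurePosixPath:
    """Mirror of the adapter's selection rules, parameterized for illustration."""
    # 1. A literal path (no glob characters) wins, wherever it appears in the list.
    for pattern in patterns:
        if not any(char in pattern for char in "*?["):
            return workspace / pattern
    # 2. Otherwise use the literal prefix of the first glob that has one.
    for pattern in patterns:
        prefix = pattern.split("*", 1)[0].split("?", 1)[0].split("[", 1)[0].rstrip("/")
        if prefix:
            if "." in PurePosixPath(prefix).name:
                return workspace / prefix  # prefix already looks like a file
            return workspace / prefix / FALLBACK_NAME
    # 3. Nothing usable: fall back to a file under src/.
    return workspace / "src" / FALLBACK_NAME


if __name__ == "__main__":
    ws = PurePosixPath("/ws")  # hypothetical workspace root
    print(pick_target(ws, ["src/**", "src/demo_fix.py"]))
    print(pick_target(ws, ["src/**"]))
    print(pick_target(ws, ["**"]))
```

With the default `allowed_paths` of `src/**`, `tests/**`, `docs/**`, the adapter therefore writes a new `src/autoresearch_worker_output.py`, while a prompt-derived literal path makes it target that file directly.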
11 changes: 11 additions & 0 deletions memory/2026-03-28.md
@@ -30,3 +30,14 @@
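- Telegram decision actions now write straight back to the unified approval store and record `resolved_via=telegram_command` metadata; they are no longer read-only queries.
- The help text and the approval detail view both now include decision usage, reducing trial and error inside the bot.
- The related targeted regression has passed over multiple rounds; the latest result is `62 passed`, `624 tests collected`.
- First version of worker orchestration is wired in:
  - Added deterministic routing: ordinary chat still goes to `claude_direct`, analysis tasks go to `autoresearch`, code-change tasks go to `openhands`, and mixed tasks go to `autoresearch -> openhands`.
  - The Telegram webhook can now create an approval up front before high-risk write operations, so execution workers no longer cross the approval line directly.
  - The panel can now read the selected worker, routing reason, and promotion result from run metadata, and it blocks accidental panel-side cancel/retry on orchestrated worker runs.
  - Related focused regression currently passes: `40 passed` (routing/gateway/panel/openhands backend) + `34 passed` (worker/gate strict suites).
- Second round of worker orchestration hardening:
  - Approval metadata now persists the replay payload; the Telegram / panel / admin / approvals API approval paths all automatically resume the orchestration run once the approval is granted.
  - A resumed run writes observability fields such as approval id, approval status, resume state, and promotion requested/effective mode back into run metadata; panel/admin also gained why/state views that show why this worker was selected, why the run was downgraded to patch, and which approval/resume state it is stuck in.
  - The first analysis stage in the `autoresearch -> openhands` chain is now artifact-only: it keeps the execution/test/risk artifacts and no longer additionally finalizes promotion.
  - Added e2e/regression coverage: Telegram pending approval, resume after approval is granted, and autoresearch -> openhands chained execution.
  - Current hardening regression results: `50 passed` (router/orchestrator/panel/admin/approvals) + `69 passed` (including controlled backend / strict chain suites).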
9 changes: 9 additions & 0 deletions src/autoresearch/api/dependencies.py
@@ -40,6 +40,7 @@
from autoresearch.core.services.self_integration import SelfIntegrationService
from autoresearch.core.services.telegram_notify import TelegramNotifierService
from autoresearch.core.services.variants import VariantService
from autoresearch.core.services.worker_orchestrator import WorkerOrchestratorService
from autoresearch.shared.models import (
    ClaudeAgentRunRead,
    AdminAgentConfigRead,
@@ -208,6 +209,14 @@ def get_claude_agent_service() -> ClaudeAgentService:
    )


@lru_cache(maxsize=1)
def get_worker_orchestrator_service() -> WorkerOrchestratorService:
    return WorkerOrchestratorService(
        agent_service=get_claude_agent_service(),
        approval_service=get_approval_store_service(),
    )


def get_openviking_memory_service(
    openclaw_service: OpenClawCompatService = Depends(get_openclaw_compat_service),
) -> OpenVikingMemoryService: