-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcoder-bridge.py
More file actions
304 lines (246 loc) · 9.14 KB
/
coder-bridge.py
File metadata and controls
304 lines (246 loc) · 9.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python3
"""
openclaw-coder-bridge
A background daemon that bridges OpenClaw agents with AI coding tools
via a file-based task queue (inbox/ -> outbox/).
https://github.com/dlxeva/openclaw-coder-bridge
"""
import os
import json
import time
import hashlib
import shutil
import subprocess
import signal
import sys
import locale
from datetime import datetime
from pathlib import Path
# Windows only: warn if system encoding is not UTF-8
if sys.platform == "win32":
_enc = locale.getpreferredencoding(False)
if _enc.upper().replace("-", "") not in ("UTF8", "UTF-8"):
print(f"[WARNING] System encoding is {_enc}, not UTF-8. "
"If you see path or decode errors, enable 'Beta: Use Unicode UTF-8' "
"in Windows Region → Administrative → Change system locale, then reboot.")
# 路径配置
BASE_DIR = Path(__file__).parent
INBOX_DIR = BASE_DIR / "inbox"
OUTBOX_DIR = BASE_DIR / "outbox"
ARCHIVE_DIR = BASE_DIR / "archive"
LOG_FILE = BASE_DIR / "bridge.log"
STATE_FILE = BASE_DIR / "bridge-status.json"
PID_FILE = BASE_DIR / "bridge.pid"
def check_single_instance():
"""确保只有一个 bridge 实例在运行"""
if PID_FILE.exists():
old_pid = int(PID_FILE.read_text().strip())
try:
import psutil
if psutil.pid_exists(old_pid):
print(f"Bridge 已在运行 (PID {old_pid}),退出。")
sys.exit(0)
except ImportError:
# psutil 不可用时用 os.kill 检测
try:
os.kill(old_pid, 0)
print(f"Bridge 已在运行 (PID {old_pid}),退出。")
sys.exit(0)
except OSError:
pass # 进程不存在,继续启动
PID_FILE.write_text(str(os.getpid()))
# 超时配置(秒)
CODER_TIMEOUT = int(os.environ.get("CODER_TIMEOUT", "600"))
# Telegram 配置
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
# Windows only: locate Git Bash (claude is a bash script on Windows)
def _find_bash():
if env := os.environ.get("BASH_EXE"):
return env
candidates = [
r"C:\Program Files\Git\usr\bin\bash.exe",
r"C:\Program Files (x86)\Git\usr\bin\bash.exe",
]
for p in candidates:
if Path(p).exists():
return p
raise RuntimeError(
"Git Bash not found. Install Git for Windows, "
"or set the BASH_EXE environment variable to your bash.exe path."
)
BASH_EXE = _find_bash() if sys.platform == "win32" else None
def log(msg):
"""日志输出"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_msg = f"[{timestamp}] {msg}"
print(log_msg)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(log_msg + "\n")
def load_state():
"""加载状态"""
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text(encoding="utf-8"))
return {"running": True, "processed": [], "errors": [], "started_at": datetime.now().isoformat()}
def save_state(state):
"""保存状态"""
STATE_FILE.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
def notify_telegram(task_id, status="completed"):
"""发送 Telegram 通知"""
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
return
try:
import urllib.request
import urllib.parse
text = f"✅ Bridge 任务 {status}: {task_id}"
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
data = urllib.parse.urlencode({"chat_id": TELEGRAM_CHAT_ID, "text": text}).encode()
req = urllib.request.Request(url, data=data)
urllib.request.urlopen(req, timeout=10)
log(f"Telegram 通知已发出: {task_id}")
except Exception as e:
log(f"Telegram 通知失败: {e}")
def compute_file_hash(filepath):
"""计算文件内容 hash"""
with open(filepath, "rb") as f:
return hashlib.sha256(f.read()).hexdigest()[:8]
def process_task(task_file):
"""处理单个任务"""
task_id = task_file.stem
log(f"处理任务: {task_id}")
state = load_state()
try:
# 读取任务内容
content = task_file.read_text(encoding="utf-8")
# 提取目标 (from -> to)
lines = content.split("\n")
from_addr = "main"
to_addr = "claude-code"
for line in lines:
if line.startswith("from:"):
from_addr = line.split(":", 1)[1].strip()
elif line.startswith("to:"):
to_addr = line.split(":", 1)[1].strip()
# 构建 prompt 给 Claude Code
prompt = f"""请处理以下任务并回复:
{content}
回复格式:
---
task_id: {task_id}
from: {to_addr}
to: {from_addr}
status: ok|error
---
只返回任务结果,不要多余解释。"""
# 调用 Claude Code
result = run_claude(prompt)
# 写入回复
reply_file = OUTBOX_DIR / f"reply-{task_id}.md"
reply_file.write_text(result, encoding="utf-8")
# 更新状态
state["processed"].append(task_id)
save_state(state)
# 归档任务
archive_file = ARCHIVE_DIR / task_file.name
shutil.move(str(task_file), str(archive_file))
log(f"任务已完成: {task_id}")
notify_telegram(task_id, "completed")
except Exception as e:
error_msg = str(e)
log(f"任务失败: {task_id} - {error_msg}")
# 记录错误
state["errors"].append({"task": task_id, "error": error_msg})
save_state(state)
# 写入错误回复
reply_file = OUTBOX_DIR / f"reply-{task_id}.md"
reply_file.write_text(f"---\ntask_id: {task_id}\nstatus: error\n---\n\n# 回复\n\n[ERROR] {error_msg}", encoding="utf-8")
# 归档失败任务,防止无限重试
archive_file = ARCHIVE_DIR / task_file.name
if task_file.exists():
shutil.move(str(task_file), str(archive_file))
notify_telegram(task_id, "failed")
def run_claude(prompt):
"""Invoke the coding AI. Routes through Git Bash on Windows; calls directly on Mac/Linux."""
# Strip nested-session markers (keep ANTHROPIC_API_KEY)
env = os.environ.copy()
for k in list(env.keys()):
if "CLAUDE" in k.upper():
del env[k]
if sys.platform == "win32":
# Windows: claude is a bash script — must run via Git Bash.
# Also patch PATH so the wrapper can find sed/dirname/uname.
bash_dir = str(Path(BASH_EXE).parent) # .../Git/usr/bin
git_bin = str(Path(BASH_EXE).parent.parent.parent / "bin") # .../Git/bin
current_path = env.get("PATH", "")
if bash_dir not in current_path:
env["PATH"] = bash_dir + ";" + git_bin + ";" + current_path
# The bash wrapper uses `dirname "$0"` to locate node_modules.
# When called as just "claude", dirname returns ".".
# Set cwd to the npm bin dir so "." resolves correctly.
_claude_which = shutil.which("claude")
npm_bin = (
Path(_claude_which).parent
if _claude_which
else Path.home() / "AppData" / "Roaming" / "npm"
)
cmd = [BASH_EXE, "-c", "claude -p --dangerously-skip-permissions"]
cwd = str(npm_bin)
else:
# Mac / Linux: claude is a native binary, call it directly.
claude_exe = shutil.which("claude") or "claude"
cmd = [claude_exe, "-p", "--dangerously-skip-permissions"]
cwd = None
result = subprocess.run(
cmd,
input=prompt,
capture_output=True,
text=True,
encoding="utf-8",
env=env,
cwd=cwd,
timeout=CODER_TIMEOUT,
)
if result.returncode != 0:
stderr = (result.stderr or "").strip()
raise Exception(f"Claude 调用失败 (exit {result.returncode}): {stderr[:500]}")
output = (result.stdout or "").strip()
if not output:
stderr = (result.stderr or "").strip()
raise Exception(f"Claude 返回空输出. stderr: {stderr[:300]}")
return output
def main():
"""主循环"""
check_single_instance()
log("=" * 50)
log("Claude Bridge 启动")
log("=" * 50)
# 创建必要目录
INBOX_DIR.mkdir(exist_ok=True)
OUTBOX_DIR.mkdir(exist_ok=True)
ARCHIVE_DIR.mkdir(exist_ok=True)
# 扫描存量任务
log("扫描存量任务...")
for task_file in INBOX_DIR.glob("task-*.md"):
process_task(task_file)
log(f"监听目录: {INBOX_DIR}")
log("按 Ctrl+C 停止")
# 主循环
try:
while True:
# 检查新任务
task_files = list(INBOX_DIR.glob("task-*.md"))
if task_files:
for task_file in task_files:
process_task(task_file)
else:
time.sleep(2)
except KeyboardInterrupt:
log("Bridge 已停止")
state = load_state()
state["running"] = False
save_state(state)
finally:
if PID_FILE.exists():
PID_FILE.unlink()
if __name__ == "__main__":
main()