forked from gastownhall/beads
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
executable file
·209 lines (163 loc) · 6.75 KB
/
agent.py
File metadata and controls
executable file
·209 lines (163 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#!/usr/bin/env python3
"""
Simple AI agent workflow using bd (Beads issue tracker).
This demonstrates how an agent can:
1. Find ready work
2. Claim and execute tasks
3. Discover new issues during work
4. Link discoveries back to parent tasks
5. Complete work and move on
6. Coordinate with other agents via Agent Mail (optional)
"""
import json
import subprocess
import sys
import os
from pathlib import Path
from typing import Optional
# Add lib directory to path for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from beads_mail_adapter import AgentMailAdapter
class BeadsAgent:
    """Simple agent that manages tasks using bd.

    One work cycle (see ``run_once``) finds a ready issue, claims it,
    simulates the work (possibly filing a follow-up issue) and closes it.
    The ``AgentMailAdapter`` in ``self.mail`` is used for inbox checks,
    issue reservations and notifications to other agents.
    """

    # Title prefix of the follow-up issues that simulate_work() files.
    _FOLLOW_UP_PREFIX = "Add tests for "

    def __init__(self):
        # ID of the issue this agent has claimed and not yet completed;
        # None while idle.
        self.current_task = None
        self.mail = AgentMailAdapter()
        if self.mail.enabled:
            print(f"📬 Agent Mail enabled (agent: {self.mail.agent_name})")
        else:
            print("📭 Agent Mail disabled (Beads-only mode)")

    def run_bd(self, *args) -> dict:
        """Run a bd command with ``--json`` appended and parse its output.

        Args:
            *args: The bd subcommand followed by its arguments.

        Returns:
            The decoded JSON document, or ``{}`` when bd printed nothing.
            The decoded value is returned unchanged, so a command that
            prints a JSON array yields a list (``find_ready_work`` relies
            on this).

        Raises:
            subprocess.CalledProcessError: bd exited with a non-zero status.
            FileNotFoundError: the bd executable could not be found.
            json.JSONDecodeError: bd printed something that is not JSON.
        """
        cmd = ["bd", *args, "--json"]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        if result.stdout.strip():
            return json.loads(result.stdout)
        return {}

    def find_ready_work(self) -> Optional[dict]:
        """Find the highest priority ready work.

        Integration Point 1: Check inbox before finding work.

        Returns:
            The first issue reported by ``bd ready --limit 1``, or ``None``
            when bd reports no ready issues.
        """
        # Check inbox for notifications from other agents
        messages = self.mail.check_inbox()
        if messages:
            print(f"📨 Received {len(messages)} messages:")
            for msg in messages:
                event_type = msg.get("event_type", "unknown")
                payload = msg.get("payload", {})
                from_agent = msg.get("from_agent", "unknown")
                print(f" • {event_type} from {from_agent}: {payload}")

        ready = self.run_bd("ready", "--limit", "1")
        # run_bd returns {} for empty output, so only a non-empty list
        # carries an issue.
        if isinstance(ready, list) and ready:
            return ready[0]
        return None

    def claim_task(self, issue_id: str) -> dict:
        """Claim a task by setting status to in_progress.

        Integration Point 2: Reserve issue before claiming.
        Integration Point 3: Notify other agents of status change.

        On success ``self.current_task`` is set to ``issue_id``; callers
        should use that (not the return value) to tell whether the claim
        succeeded, because bd's own output may also be empty.

        Returns:
            The parsed output of ``bd update``, or ``{}`` when the issue
            could not be reserved.
        """
        # Reserve the issue to prevent conflicts with other agents
        if not self.mail.reserve_issue(issue_id):
            print(f"⚠️ Failed to reserve {issue_id} - already claimed by another agent")
            return {}

        print(f"📋 Claiming task: {issue_id}")
        try:
            result = self.run_bd("update", issue_id, "--status", "in_progress")
        except Exception:
            # The status change failed, so do not keep holding a
            # reservation for an issue we never actually claimed.
            self.mail.release_issue(issue_id)
            raise
        self.current_task = issue_id

        # Notify other agents of status change
        self.mail.notify("status_changed", {
            "issue_id": issue_id,
            "status": "in_progress",
            "agent": self.mail.agent_name,
        })
        return result

    def create_issue(self, title: str, description: str = "",
                     priority: int = 2, issue_type: str = "task") -> dict:
        """Create a new issue and return bd's parsed output.

        Args:
            title: Title of the new issue.
            description: Optional description; omitted from the command
                when empty.
            priority: Passed to bd via ``-p``.
            issue_type: Passed to bd via ``-t``.
        """
        print(f"✨ Creating issue: {title}")
        args = ["create", title, "-p", str(priority), "-t", issue_type]
        if description:
            args.extend(["-d", description])
        return self.run_bd(*args)

    def link_discovery(self, discovered_id: str, parent_id: str):
        """Link a discovered issue back to its parent.

        Runs ``bd dep add`` with ``--type discovered-from``.

        Raises:
            subprocess.CalledProcessError: bd exited with a non-zero status.
        """
        print(f"🔗 Linking {discovered_id} ← discovered-from ← {parent_id}")
        subprocess.run(
            ["bd", "dep", "add", discovered_id, parent_id, "--type", "discovered-from"],
            check=True,
        )

    def complete_task(self, issue_id: str, reason: str = "Completed"):
        """Mark task as complete.

        Integration Point 4: Release reservation and notify completion.

        Returns:
            The parsed output of ``bd close``.
        """
        print(f"✅ Completing task: {issue_id} - {reason}")
        result = self.run_bd("close", issue_id, "--reason", reason)

        # Notify other agents of completion
        self.mail.notify("issue_completed", {
            "issue_id": issue_id,
            "reason": reason,
            "agent": self.mail.agent_name,
        })

        # Release the reservation
        self.mail.release_issue(issue_id)
        if self.current_task == issue_id:
            self.current_task = None
        return result

    def simulate_work(self, issue: dict) -> bool:
        """Simulate doing work on an issue.

        In a real agent, this would call an LLM, execute code, etc.

        Args:
            issue: Issue dict; the keys ``id``, ``title``, ``priority``
                and ``issue_type`` are read.

        Returns:
            True if work discovered new issues.
        """
        issue_id = issue["id"]
        title = issue["title"]
        print(f"\n🤖 Working on: {title} ({issue_id})")
        print(f" Priority: {issue['priority']}, Type: {issue['issue_type']}")

        lowered = title.lower()
        # Follow-up issues are titled "Add tests for ...", which itself
        # contains "add"; skip them so the agent does not keep filing
        # "Add tests for Add tests for ..." issues on every cycle.
        is_follow_up = lowered.startswith(self._FOLLOW_UP_PREFIX.lower())

        # Simulate discovering a bug while working
        if not is_follow_up and ("implement" in lowered or "add" in lowered):
            print("\n💡 Discovered: Missing test coverage for this feature")
            new_issue = self.create_issue(
                f"{self._FOLLOW_UP_PREFIX}{title}",
                description=f"While implementing {issue_id}, noticed missing tests",
                priority=1,
                issue_type="task",
            )
            self.link_discovery(new_issue["id"], issue_id)
            return True
        return False

    def run_once(self) -> bool:
        """Execute one work cycle.

        Returns:
            True if a task was claimed and completed. False when no ready
            work was found, or when the ready task could not be claimed
            (for example because another agent holds its reservation).
        """
        # Find ready work
        issue = self.find_ready_work()
        if not issue:
            print("📭 No ready work found.")
            return False
        issue_id = issue["id"]

        # Claim the task
        self.claim_task(issue_id)
        if self.current_task != issue_id:
            # Never work on, or close, an issue this agent does not hold.
            print(f"⏭️ Could not claim {issue_id}; ending this cycle.")
            return False

        try:
            # Do the work (simulated)
            discovered_new_work = self.simulate_work(issue)
            # Complete the task
            self.complete_task(issue_id, "Implemented successfully")
        finally:
            # complete_task clears current_task on success; if it is still
            # set, the cycle failed part-way, so give the reservation back
            # instead of leaving the issue locked.
            if self.current_task == issue_id:
                self.mail.release_issue(issue_id)
                self.current_task = None

        if discovered_new_work:
            print("\n🔄 New work discovered and linked. Running another cycle...")
        return True

    def run(self, max_iterations: int = 10):
        """Run work cycles until one reports no work or the limit is hit.

        Args:
            max_iterations: Upper bound on the number of cycles.
        """
        print("🚀 Beads Agent starting...\n")
        for i in range(max_iterations):
            print(f"\n{'='*60}")
            print(f"Iteration {i+1}/{max_iterations}")
            print(f"{'='*60}")
            if not self.run_once():
                break
        print("\n✨ Agent finished!")
def main():
    """Main entry point.

    Builds a ``BeadsAgent`` and runs it. Exits with status 1 when bd fails
    or cannot be started, and with status 0 when interrupted by the user.
    """
    try:
        agent = BeadsAgent()
        agent.run()
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # CalledProcessError: bd ran but exited non-zero.
        # FileNotFoundError: subprocess could not find the bd executable,
        # which is the case the install hint below is meant for.
        print(f"Error running bd: {e}", file=sys.stderr)
        # bd's stderr is captured by run_bd; surface it so the actual
        # failure reason is not lost.
        bd_stderr = getattr(e, "stderr", None)
        if bd_stderr:
            print(bd_stderr.strip(), file=sys.stderr)
        print(
            "Make sure bd is installed: go install github.com/steveyegge/beads/cmd/bd@latest",
            file=sys.stderr,
        )
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\n👋 Agent interrupted by user")
        sys.exit(0)


if __name__ == "__main__":
    main()