-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
238 lines (198 loc) · 10.6 KB
/
agent.py
File metadata and controls
238 lines (198 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""
Core autonomous agent using PydanticAI tools for filesystem interaction.
"""
import os
import asyncio
import uuid
from typing import Optional
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
@dataclass
class AgentDeps:
"""Dependencies for the tools"""
base_path: str # The directory of the app being built
class CodeGenAgent:
"""Autonomous agent that uses tools to build web applications"""
SYSTEM_PROMPT = """You are an autonomous web developer.
Your goal is to build a functional, beautiful Web Application (not just a JSON API) in the provided directory.
THOUGHT PROCESS:
Start your response with a concise thinking process wrapped in <thought> tags (e.g., <thought>I need to create the database schema first.</thought>). This helps the user understand your reasoning.
WORKFLOW:
1. EXPLORE: Check the directory. If files exist, you are MODIFYING an existing app. Do not overwrite unless asked.
2. PLAN: Write out the DB schema and the UI pages needed.
3. BUILD:
- Run `uv init --app --no-workspace` in the app directory to create a standalone `pyproject.toml`.
- Use `uv add fastapi uvicorn sqlalchemy jinja2 python-multipart` (and any others needed) to manage dependencies via `pyproject.toml`.
- Use `make_directory` to create folders like `static/` or `templates/`.
- `models.py`: SQLAlchemy models.
- `database.py`: SQLite connection and SessionLocal setup.
- `templates/`: Jinja2 HTML templates. Use Tailwind CSS v4.
- `main.py`: FastAPI routes that return `Jinja2Templates.TemplateResponse`. THIS MUST BE IN THE ROOT of the app directory.
4. VERIFY: Run `python -m py_compile main.py` or similar to check for syntax errors.
CRITICAL RULES:
- DEPENDENCIES: Do NOT use `requirements.txt`. Use `uv add` commands to install dependencies into the app's `pyproject.toml`.
- WEB APP, NOT API: Your routes MUST return HTML using templates.
- TEMPLATES: Use Jinja2 syntax (e.g., `{% if ... %}`, `{{ variable }}`). DO NOT use Mako/Perl syntax (e.g., `% if ...`).
- FORM INPUTS: Use `Form(...)` parameters in your routes for POST requests. DO NOT try to use SQLAlchemy models as Pydantic request bodies.
- NO NEW FOLDERS: Do NOT create a subfolder for the app (e.g., `apps/todo/todo_app`). Put `main.py` directly in the base path provided.
- SCOPE: Build the simplest possible version (MVP). Do NOT add User Authentication unless explicitly asked.
- DB LINKING: You MUST import Base from database.py into models.py (do not create a new Base).
- COMPLETE: Ensure `init_db()` is called on startup to create tables.
If you encounter an error (e.g., 'Address already in use'), ignore it and focus on ensuring the CODE in the files is correct and complete.
"""
def __init__(self, model_str: str = "mistral:mistral-small-latest"):
"""Initialize agent with a model string like 'openai:gpt-4o'"""
self.agent = Agent(
model_str,
deps_type=AgentDeps,
system_prompt=self.SYSTEM_PROMPT,
)
# Conversation history and stats
self.history = []
self.total_usage = {"input": 0, "output": 0}
# Logging callback
self.log_callback = None
# Register tools
self._register_tools()
def set_log_callback(self, callback):
"""Set a callback for logging agent actions"""
self.log_callback = callback
def log(self, message: str):
"""Log a message using callback or print"""
if self.log_callback:
self.log_callback(message)
else:
print(message, flush=True)
def _register_tools(self):
"""Define the 'Hands' of the agent"""
@self.agent.tool
async def list_files(ctx: RunContext[AgentDeps], path: str = ".") -> str:
"""List files and directories in a path"""
self.log(f"📂 Agent is listing files in: {path}")
full_path = os.path.join(ctx.deps.base_path, path)
try:
if not os.path.exists(full_path):
return "Directory does not exist."
items = os.listdir(full_path)
return "\n".join(items) if items else "Directory is empty"
except Exception as e:
return f"Error: {str(e)}"
@self.agent.tool
async def read_file(ctx: RunContext[AgentDeps], filename: str) -> str:
"""Read the content of a file"""
self.log(f"📖 Agent is reading file: {filename}")
full_path = os.path.join(ctx.deps.base_path, filename)
try:
with open(full_path, 'r') as f:
return f.read()
except Exception as e:
return f"Error reading {filename}: {str(e)}"
@self.agent.tool
async def make_directory(ctx: RunContext[AgentDeps], path: str) -> str:
"""Create a directory (and any parent directories)"""
self.log(f"📂 Agent is creating directory: {path}")
full_path = os.path.join(ctx.deps.base_path, path)
try:
os.makedirs(full_path, exist_ok=True)
return f"Successfully created directory {path}"
except Exception as e:
return f"Error creating directory {path}: {str(e)}"
@self.agent.tool
async def write_file(ctx: RunContext[AgentDeps], filename: str, content: str) -> str:
"""Write content to a file (creates parent directories if needed)"""
self.log(f"✍️ Agent is writing file: {filename}")
full_path = os.path.join(ctx.deps.base_path, filename)
try:
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, 'w') as f:
f.write(content)
return f"Successfully wrote to {filename}"
except Exception as e:
return f"Error writing {filename}: {str(e)}"
@self.agent.tool
async def search_files(ctx: RunContext[AgentDeps], pattern: str, path: str = ".") -> str:
"""Search for a pattern (grep) in the workspace"""
self.log(f"🔍 Agent is searching for '{pattern}' in {path}")
full_path = os.path.join(ctx.deps.base_path, path)
try:
# Use grep -r for searching, async
process = await asyncio.create_subprocess_exec(
"grep", "-r", pattern, ".",
cwd=full_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await asyncio.wait_for(process.communicate(), timeout=10)
output = stdout.decode()
return output if output else "No matches found."
except asyncio.TimeoutError:
return "Search timed out."
except Exception as e:
return f"Search failed: {str(e)}"
@self.agent.tool
async def execute_command(ctx: RunContext[AgentDeps], command: str) -> str:
"""Execute a shell command in the app directory and return output"""
self.log(f"💻 Agent is executing command: {command}")
try:
# Use asyncio for non-blocking execution
process = await asyncio.create_subprocess_shell(
command,
cwd=ctx.deps.base_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30)
output = f"STDOUT:\n{stdout.decode()}\nSTDERR:\n{stderr.decode()}"
return output
except asyncio.TimeoutError:
return "Command execution timed out after 30 seconds."
except Exception as e:
return f"Command execution failed: {str(e)}"
async def get_suggested_name(self, prompt: str) -> str:
"""Ask the LLM for a suitable one-word folder name for the app"""
self.log("🤖 Agent is suggesting an app name...")
# Prepare a suffix
suffix = uuid.uuid4().hex[:3]
try:
# We use the internal agent to generate a name
result = await self.agent.run(
f"Suggest a single, concise, lowercase alphanumeric word (e.g. 'todo', 'inventory', 'finance') to use as a folder name for this request: '{prompt}'. Output ONLY the word, no punctuation or explanation.",
deps=AgentDeps(base_path=".")
)
# Safe attribute access for different PydanticAI versions/result types
raw_output = str(getattr(result, 'data', getattr(result, 'output', result))).strip()
self.log(f"📝 Raw naming response: {raw_output}")
# 1. Strip out thought tags if they exist
import re
clean_output = re.sub(r'<thought>.*?</thought>', '', raw_output, flags=re.DOTALL).strip().lower()
# 2. Extract just the first word in case it gave an explanation
first_word = clean_output.split()[0] if clean_output else ""
# 3. Clean to only alphanumeric
clean_name = "".join(c for c in first_word if c.isalnum())
# Combine with our suffix
final_name = f"{clean_name}_{suffix}" if clean_name else f"app_{suffix}"
return final_name
except Exception as e:
self.log(f"⚠️ Naming failed: {e}")
return f"app_{suffix}"
async def run_task(self, prompt: str, app_path: str):
"""Run the agent on a specific task within an app directory"""
deps = AgentDeps(base_path=app_path)
# Run with history
result = await self.agent.run(prompt, deps=deps, message_history=self.history)
# Update history and usage
self.history = result.all_messages()
self.total_usage["input"] += result.usage().request_tokens or 0
self.total_usage["output"] += result.usage().response_tokens or 0
# Safe attribute access for different PydanticAI versions
return getattr(result, 'data', getattr(result, 'output', str(result)))
def get_context_usage(self) -> tuple[int, int]:
"""Return (current_tokens, max_tokens)"""
total = self.total_usage["input"] + self.total_usage["output"]
return total, 32000
def get_message_count(self) -> int:
"""Return number of messages in history"""
return len(self.history)
def clear_history(self):
"""Reset the conversation history"""
self.history = []