-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfilesystem_agent.py
More file actions
202 lines (159 loc) · 7.54 KB
/
filesystem_agent.py
File metadata and controls
202 lines (159 loc) · 7.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from dotenv import load_dotenv
import os
from pathlib import Path
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.openai import OpenAIProvider
import logfire
from arithmetic_eval import evaluate
## Setup
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()
# NOTE(review): GEMINI_API_KEY is read here but not referenced again in this
# file; presumably the Gemini provider picks the key up from the environment
# on its own -- confirm before removing.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
MODEL_NAME = "gemini-2.0-flash-lite-preview-02-05"
# Set before logfire.configure(); presumably keeps Logfire's console output
# from interleaving with the REPL -- confirm against the Logfire docs.
os.environ["LOGFIRE_CONSOLE"] = "false"
logfire.configure(send_to_logfire='if-token-present')
logfire.instrument_pydantic_ai()

# The single agent shared by every tool below and by the REPL in main().
agent = Agent(
    # Google
    GeminiModel(model_name=MODEL_NAME, provider='google-gla'),
    # Ollama
    # OpenAIModel(
    #     model_name="llama3.2:1b",
    #     provider=OpenAIProvider(base_url='http://localhost:11434/v1')
    # ),
    instructions=(
        "You are a helpful assistant that does what they are asked to do. You have access to the following tools:\n\n"
        "- calculator: Calculates the result of an expression using Python's eval() function.\n"
        "- create_file: Creates a new file with the given path and content.\n"
        "- rename_file: Renames a file to another name.\n"
        "- delete_file: deletes a file \n\n"
        "If you are asked to use the calcuator tool, dont try to explain, just use the tool and return only the result. example: 2+2=4.0\n"
        "If you are asked to use the create_file tool, dont try to explain the user how to do it, just use the tool tell the user that you created the file successfully.\n"
        "If you are asked to rename a file, don't explain anything, just use the rename_file tool to rename the file make sure to pass the current name and new name AS IS without prepending any directory to them. then tell the user that you successfully renamed file a to b\n"
        "If you are asked to remove a file, don't say things like I can't and don't warn the user, they already know what they are doing, delete the file using the delete_file tool and say I deleted the file successfully.\n"
        # The deletion tool registered in this file is `delete_file`; the
        # instructions previously named a non-existent `remove_file` tool here.
        "If you are asked to remove all the files, look at the current files in the directory part of the prompt and call the delete_file tool on them one by one. dont question it\n"
        "never answer the user directly, only use the tools. If you are tasked to do something that you can't, just say you can't do it. example: I can't do that, I'm sorry.\n"
    ),
    retries=3,
)
## Tools
@agent.tool
async def calculator(ctx: RunContext[None], expression: str) -> float:
    '''Calculates the result of an expression using Python's eval() function.

    Args:
        expression (str): The expression to evaluate.

    Returns:
        float: The result of the expression.

    Raises:
        ValueError: If the expression cannot be evaluated or its result
            cannot be converted to a float.
    '''
    # NOTE(review): the docstring summary is kept verbatim because the agent
    # framework presumably forwards it to the model as the tool description.
    # The code itself calls `evaluate` from the `arithmetic_eval` module, not
    # the builtin eval().
    try:
        return float(evaluate(expression))
    except Exception as e:
        # Chain explicitly so the original failure stays attached as __cause__.
        raise ValueError(f"Invalid expression: {e}") from e
@agent.tool
async def create_file(ctx: RunContext[None], path: str, content: str) -> str:
    '''Creates a new file with the given path and content.

    Args:
        path (str): The path of the file to create.
        content (str): The content of the file to create.

    Returns:
        path (str): The path of the created file.

    Raises:
        ValueError: If the path points outside the test_folder directory,
            or if the file already exists.
    '''
    # create test_folder if it doesn't exist
    sandbox = Path("test_folder")
    sandbox.mkdir(parents=True, exist_ok=True)

    # for safety reasons, we prepend the path with the test_folder directory
    file_path = Path(os.path.join("test_folder", path))

    # Containment must be checked on the *resolved* path: a lexical check on
    # parts[0] accepts "test_folder/../secret" because its first component is
    # still "test_folder". resolve() collapses ".." and follows symlinks;
    # relative_to() raises ValueError when the target is outside the sandbox.
    try:
        file_path.resolve().relative_to(sandbox.resolve())
    except ValueError:
        raise ValueError(
            f"For security reasons, the path must be relative to the test_folder directory: {file_path}"
        ) from None

    # check if the file already exists
    if file_path.exists():
        raise ValueError(f"File {path} already exists.")

    # create the required directories if they don't exist
    file_path.parent.mkdir(parents=True, exist_ok=True)

    # create the file
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(content)

    return str(file_path)
@agent.tool
async def rename_file(ctx: RunContext[None], old_path: str, new_path: str) -> str:
    '''Renames a file.

    Args:
        old_path (str): The path of the file to rename.
        new_path (str): The new path of the file.

    Returns:
        path (str): The path of the renamed file.

    Raises:
        ValueError: If either path points outside the test_folder directory,
            or if the file to rename does not exist.
    '''
    # for safety reasons, we prepend the path with the test_folder directory
    old_file_path = Path(os.path.join("test_folder", old_path))
    new_file_path = Path(os.path.join("test_folder", new_path))

    # Containment must be checked on the *resolved* paths: a lexical check on
    # parts[0] accepts "test_folder/../secret" because its first component is
    # still "test_folder". resolve() collapses ".." and follows symlinks;
    # relative_to() raises ValueError when the target is outside the sandbox.
    sandbox_root = Path("test_folder").resolve()
    for candidate in (old_file_path, new_file_path):
        try:
            candidate.resolve().relative_to(sandbox_root)
        except ValueError:
            raise ValueError(
                f"For security reasons, the path must be relative to the test_folder directory: {candidate}"
            ) from None

    # check if the file exists
    if not old_file_path.exists():
        raise ValueError(f"File {old_path} does not exist.")

    # create the parent folders of new_file_path
    new_file_path.parent.mkdir(parents=True, exist_ok=True)

    # rename the file
    old_file_path.rename(new_file_path)

    return str(new_file_path)
@agent.tool
async def delete_file(ctx: RunContext[None], path: str) -> str:
    '''Deletes a file.

    Args:
        path (str): The path of the file to delete.

    Returns:
        path (str): The path of the deleted file.

    Raises:
        ValueError: If the path points outside the test_folder directory,
            or if the file does not exist.
    '''
    # for safety reasons, we prepend the path with the test_folder directory
    file_path = Path(os.path.join("test_folder", path))

    # Containment must be checked on the *resolved* path: a lexical check on
    # parts[0] accepts "test_folder/../secret" because its first component is
    # still "test_folder". resolve() collapses ".." and follows symlinks;
    # relative_to() raises ValueError when the target is outside the sandbox.
    try:
        file_path.resolve().relative_to(Path("test_folder").resolve())
    except ValueError:
        raise ValueError(
            f"For security reasons, the path must be relative to the test_folder directory: {file_path}"
        ) from None

    # check if the file exists
    if not file_path.exists():
        raise ValueError(f"File {path} does not exist.")

    # delete the file
    file_path.unlink()

    return str(file_path)
def get_files_list() -> str:
    '''Returns a list of files in the test_folder directory to aid the model.

    Returns:
        files (str): The entry names in test_folder, sorted and joined with
            newlines. An empty string when the directory is empty or does
            not exist yet.
    '''
    try:
        entries = os.listdir("test_folder")
    except FileNotFoundError:
        # test_folder is only created by create_file; before that first call
        # there is simply nothing to list, which must not abort the prompt.
        return ""
    # os.listdir order is arbitrary; sort so the prompt text is stable.
    return "\n".join(sorted(entries))
def main():
    '''Runs the interactive REPL for the filesystem agent.

    Reads a line from stdin, appends the current test_folder listing to it,
    sends the result to the agent, then prints the tool calls the model made
    followed by its final output. The loop ends on "exit", "q", Ctrl-D or
    Ctrl-C at the prompt.
    '''
    # REPL
    def green(text):
        # Wrap text in the ANSI bright-green escape sequence.
        return "\033[92m" + text + "\033[0m"

    def red(text):
        # Wrap text in the ANSI bright-red escape sequence.
        return "\033[91m" + text + "\033[0m"

    print("\n-- Welcome to the filesystem agent. Chat with me to perform actions on the filesystem.")
    print(f"-- Using '{MODEL_NAME}' model.")
    print(f"-- Available tools: {green('calculator')}, {green('create_file')}, {green('rename_file')}, {green('delete_file')}")
    print(f"-- Type {red('exit')} or {red('q')} to exit the REPL.\n")

    while True:
        try:
            user_input = input(">> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C at the prompt: leave the REPL without a traceback.
            print()
            break

        if user_input in ("exit", "q"):
            break
        if not user_input:
            # Nothing to send to the model; prompt again.
            continue

        # run agent
        try:
            prompt = f"{user_input}\n\nCurrent files in directory:\n{get_files_list()}"
            response = agent.run_sync(prompt)
        except Exception as e:
            # Top-level boundary: report the failure and keep the REPL alive.
            print(red("Error:"), red(str(e)))
            print()
            continue

        # print tool calls
        for message in response.new_messages():
            if message.kind == "response":
                for part in message.parts:
                    if part.part_kind == "tool-call":
                        print("+", green(part.tool_name), "args:", part.args)

        # print AI response
        print("AI:", response.output)
# Start the REPL only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()