-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_suite.py
More file actions
293 lines (233 loc) · 10.5 KB
/
test_suite.py
File metadata and controls
293 lines (233 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import asyncio
import argparse
import os
from dotenv import load_dotenv
from modules.asr import ASR
from modules.tts import TTS
from modules.storage import Storage
import json
# Load environment variables (e.g. provider API keys) from a local .env file
# before any provider class is instantiated.
load_dotenv()
def get_llm(provider="gemini"):
    """Return an LLM provider instance for *provider*.

    Provider SDKs are imported lazily so only the selected provider's
    dependencies need to be installed.

    Args:
        provider: One of "gemini", "anthropic", or "openai".

    Returns:
        An instantiated provider object exposing ``generate(...)``.

    Raises:
        ValueError: If *provider* is not a recognized provider name.
    """
    if provider == "gemini":
        from modules.llm.gemini import GeminiProvider
        return GeminiProvider()
    elif provider == "anthropic":
        from modules.llm.anthropic import AnthropicProvider
        return AnthropicProvider()
    elif provider == "openai":
        from modules.llm.openai import OpenAIProvider
        return OpenAIProvider()
    # Previously this returned None for an unknown provider, which deferred
    # the failure to the first llm.generate() call; fail fast instead.
    raise ValueError(f"Unknown LLM provider: {provider!r}")
# NOTE(review): dead code — this truncated definition is immediately shadowed
# by the full redefinition of test_single_turn further down the file, so it
# never executes. Consider deleting it; kept here pending confirmation.
async def test_single_turn(provider="gemini"):
    """Test 1: Single Turn Full Loop (Mic -> AI -> TTS)"""
    print(f"\n--- Single Turn Loop ({provider}) ---")
    llm = get_llm(provider)
    asr = ASR()
    tts = TTS()
# Shared system prompt used by all three test loops.
# Fixed: the "\n" in the JSON format example was an escape sequence inside a
# regular triple-quoted string, so the model saw a real line break inside the
# example JSON string (invalid JSON). Escaped as "\\n" so the model sees the
# literal two-character sequence. Also removed a duplicated comment line.
SYSTEM_PROMPT = """
You are an idea refinement agent. Respond **only** in JSON.
Format: `{"voice_output": {"text": "..."}, "data_management": {"will_capture": true, "capture_payload": {"filename": "ideas.md", "content": "## Idea\\n..."}}}`
**Data Capture Rules:**
1. If an idea is worth saving, include a `data_management` object.
2. **Memory**: You must remember the filename you used previously for a topic. Do not change it unless the user explicitly asks to start a new file.
3. **Content**: Append new details. Do not repeat previously saved details unless they have changed.
Keep voice responses concise and conversational.
"""
async def test_single_turn(provider="gemini"):
    """Test 1: Single Turn Full Loop (Mic -> AI -> TTS).

    Records one utterance, sends it to the LLM with the shared system prompt,
    persists a captured idea via Storage when the model requests it, and
    speaks the response.

    Args:
        provider: LLM provider name passed to get_llm().
    """
    print(f"\n--- Single Turn Loop ({provider}) ---")
    llm = get_llm(provider)
    asr = ASR()
    tts = TTS()
    storage = Storage(base_path="brain")  # Save to 'brain' directory

    try:
        input("Press Enter to start listening (say 'Hello')...")
        text = await asr.listen()
        print(f"User: {text}")
        if not text:
            print("No speech detected.")
            return

        response = await llm.generate(SYSTEM_PROMPT, [{"role": "user", "content": text}])
        voice_text = response.get("voice_output", {}).get("text", "")

        # Handle Data Capture: the model may ask us to persist the idea.
        data_mgmt = response.get("data_management", {})
        if data_mgmt.get("will_capture"):
            payload = data_mgmt.get("capture_payload", {})
            filename = payload.get("filename")
            content = payload.get("content")
            if filename and content:
                # Fixed: the f-string had no placeholder, so the target
                # filename was never shown in the log line.
                print(f"[STORAGE] Saving to {filename}...")
                await storage.save(filename, content)

        print(f"AI Response Text: {voice_text}")
        if voice_text:
            print("Streaming to TTS...")
            await tts.speak(voice_text)
    finally:
        # Fixed: close the TTS device even on the early no-speech return or
        # if ASR/LLM raises; previously close() could be skipped.
        tts.close()
async def test_continuous_loop(provider="gemini"):
    """Test 2: Continuous Loop (Mic -> AI -> TTS -> Repeat).

    Sequential conversation: each turn listens, queries the LLM with the
    running history, saves captured ideas, and speaks the reply. The capture
    filename is remembered across turns so the model keeps appending to one
    file. Say 'exit', 'quit', or 'stop' to end.

    Args:
        provider: LLM provider name passed to get_llm().
    """
    print(f"\n--- Continuous Conversation ({provider}) ---")
    print("Say 'exit', 'quit', or 'stop' to end.")
    llm = get_llm(provider)
    asr = ASR()
    tts = TTS()
    storage = Storage(base_path="brain")

    history = []
    active_filename = None  # Memory for filename, enforced across turns

    try:
        while True:
            # ASR
            print("\nListening...")
            text = await asr.listen()
            if not text:
                continue
            print(f"User: {text}")
            if text.lower() in ["exit", "quit", "stop"]:
                print("Exiting loop.")
                break

            history.append({"role": "user", "content": text})

            # Dynamic system prompt: once a file has been chosen, force the
            # model to keep using it instead of inventing a new name.
            current_prompt = SYSTEM_PROMPT
            if active_filename:
                current_prompt += f"\n\nIMPORTANT: You MUST continue appending to the file: '{active_filename}'. Do NOT change the filename."

            # LLM
            print("Thinking...")
            response = await llm.generate(current_prompt, history)
            voice_text = response.get("voice_output", {}).get("text", "")

            # Handle Data Capture
            data_mgmt = response.get("data_management", {})
            if data_mgmt.get("will_capture"):
                payload = data_mgmt.get("capture_payload", {})
                # Logic: if active_filename is set, FORCE it; otherwise adopt
                # the model's proposal as the sticky filename.
                proposed_filename = payload.get("filename")
                content = payload.get("content")
                if content:
                    if active_filename:
                        # Override LLM's filename with our strict memory
                        filename = active_filename
                    elif proposed_filename:
                        # First time setting it
                        active_filename = proposed_filename
                        filename = active_filename
                    else:
                        filename = None
                    if filename:
                        # Fixed: the f-string had no placeholder, so the
                        # target filename never appeared in the log line.
                        print(f"[STORAGE] Saving to {filename}...")
                        await storage.save(filename, content)

            # Update history
            history.append({"role": "model", "content": voice_text})
            print(f"AI: {voice_text}")

            # TTS
            if voice_text:
                await tts.speak(voice_text)
    except KeyboardInterrupt:
        print("\nInterrupted.")
    finally:
        tts.close()
async def test_concurrent_loop(provider="gemini"):
    """Test 3: Concurrent Duplex Loop (Mic listens WHILE AI speaks).

    Runs the ASR as a background producer task and the LLM/TTS pipeline as a
    consumer task, connected by an asyncio.Queue, so the user can speak while
    the AI is still talking. Say 'exit', 'quit', or 'stop' to end.

    Args:
        provider: LLM provider name passed to get_llm().
    """
    print(f"\n--- Concurrent Duplex Conversation ({provider}) ---")
    print("Say 'exit', 'quit', or 'stop' to end.")
    print("NOTE: You can speak while the AI is speaking.")
    llm = get_llm(provider)
    asr = ASR()
    tts = TTS()
    storage = Storage(base_path="brain")

    # Queue for decoupled ASR -> LLM processing
    input_queue = asyncio.Queue()
    history = []
    # Filename memory shared between the two closures via a mutable dict.
    state = {"active_filename": None}
    # Fixed: keep strong references to fire-and-forget save tasks — the event
    # loop only holds weak references, so an unreferenced task can be
    # garbage-collected before it finishes.
    background_saves = set()

    async def producer_asr():
        """Continuously listens and pushes recognized text to the queue."""
        print("[System] ASR Background Task Started")
        while True:
            # Awaiting here yields control, allowing the consumer to run.
            text = await asr.listen()
            if text:
                print(f"\n[ASR Input]: {text}")
                await input_queue.put(text)
                if text.lower() in ["exit", "quit", "stop"]:
                    print("[ASR] Exit command received.")
                    break

    async def consumer_processing():
        """Consumes text, generates a response, persists captures, speaks."""
        print("[System] Processing Task Started")
        while True:
            # Wait for input
            text = await input_queue.get()
            if text.lower() in ["exit", "quit", "stop"]:
                input_queue.task_done()
                break

            # Add to history
            history.append({"role": "user", "content": text})

            # Dynamic system prompt: pin the capture filename once chosen.
            current_prompt = SYSTEM_PROMPT
            if state["active_filename"]:
                current_prompt += f"\n\nIMPORTANT: You MUST continue appending to the file: '{state['active_filename']}'. Do NOT change the filename."

            print(f"[LLM] Thinking...")
            try:
                response = await llm.generate(current_prompt, history)
                voice_text = response.get("voice_output", {}).get("text", "")

                # Handle Data Capture
                data_mgmt = response.get("data_management", {})
                if data_mgmt.get("will_capture"):
                    payload = data_mgmt.get("capture_payload", {})
                    proposed_filename = payload.get("filename")
                    content = payload.get("content")
                    if content:
                        if state["active_filename"]:
                            filename = state["active_filename"]
                        elif proposed_filename:
                            state["active_filename"] = proposed_filename
                            filename = proposed_filename
                        else:
                            filename = None
                        if filename:
                            # Fixed: the f-string had no placeholder, so the
                            # filename never appeared in the log line.
                            print(f"[STORAGE] Saving to {filename}...")
                            # Save in the background so TTS is not blocked;
                            # hold a reference until the task completes.
                            task = asyncio.create_task(storage.save(filename, content))
                            background_saves.add(task)
                            task.add_done_callback(background_saves.discard)

                # Update history
                history.append({"role": "model", "content": voice_text})

                # TTS
                if voice_text:
                    print(f"AI: {voice_text}")
                    # We await here, but producer_asr continues running!
                    await tts.speak(voice_text)
            except Exception as e:
                print(f"[Error] Processing failed: {e}")
            finally:
                input_queue.task_done()

    # Run both tasks concurrently; they finish when 'exit' is spoken.
    producer = asyncio.create_task(producer_asr())
    consumer = asyncio.create_task(consumer_processing())
    try:
        await asyncio.gather(producer, consumer)
    finally:
        # Fixed: release the TTS device even if a task raises.
        tts.close()
async def main():
    """Interactive menu: pick a provider via --provider, then run tests.

    Loops until the user selects 'q'; each menu entry dispatches to one of
    the three async test loops.
    """
    parser = argparse.ArgumentParser(description="Second Brain Test Suite")
    parser.add_argument("--provider", default="gemini", choices=["gemini", "anthropic", "openai"])
    args = parser.parse_args()

    # Menu-choice -> coroutine-function dispatch table.
    tests = {
        '1': test_single_turn,
        '2': test_continuous_loop,
        '3': test_concurrent_loop,
    }
    while True:
        print("\n--- TEST MENU ---")
        print("1. Single Turn Loop (Mic -> AI -> TTS)")
        print("2. Continuous Conversation Loop (Sequential)")
        print("3. Concurrent Duplex Loop (Simultaneous Listen/Speak)")
        print("q. Quit")
        choice = input("Select an option: ").strip().lower()
        if choice == 'q':
            break
        runner = tests.get(choice)
        if runner is None:
            print("Invalid choice.")
        else:
            await runner(args.provider)
# Script entry point: start the event loop and run the interactive menu.
# KeyboardInterrupt is swallowed so Ctrl-C exits without a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass