-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
158 lines (128 loc) · 6.56 KB
/
main.py
File metadata and controls
158 lines (128 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import asyncio
import argparse
import os
import json
from dotenv import load_dotenv
from modules.asr import ASR
from modules.llm.gemini import GeminiProvider
from modules.llm.anthropic import AnthropicProvider
from modules.llm.openai import OpenAIProvider
from modules.tts import TTS
from modules.storage import Storage
# Load environment variables
load_dotenv()
async def pipeline(asr, llm, tts, storage):
    """
    Main pipeline: Concurrent ASR -> LLM -> (TTS, Storage).

    Runs a producer task (speech recognition) and a consumer task
    (LLM -> TTS/Storage) concurrently so the assistant keeps listening
    while it is speaking.

    Args:
        asr: object with an async ``listen() -> str`` method.
        llm: object with an async ``generate(system_prompt, messages) -> dict``
            method returning the parsed JSON response.
        tts: object with an async ``speak(text)`` method.
        storage: object with an async ``save(filename, content)`` method.
    """
    # System prompt instructing the model to answer only in structured JSON.
    SYSTEM_PROMPT = """
You are an idea refinement agent. Respond **only** in JSON.
Format: `{"voice_output": {"text": "..."}, "data_management": {"will_capture": true, "capture_payload": {"filename": "ideas.md", "content": "## Idea\n..."}}}`
**Data Capture Rules:**
1. If an idea is worth saving, include a `data_management` object.
2. **Filename**: Once a filename is chosen for a topic, it is LOCKED. Always reuse the exact same filename. Never propose a new one for existing content.
3. **Content**: Append new details. Do not repeat previously saved details unless they have changed.
Keep voice responses concise and conversational.
"""

    # Single source of truth for the voice commands that end the session
    # (the original duplicated this list in producer and consumer).
    EXIT_COMMANDS = {"exit", "quit", "stop"}

    messages = []
    # Shared state: remembers the filename locked for the current topic.
    state = {"active_filename": None}
    # Queue decoupling ASR (producer) from LLM/TTS processing (consumer).
    input_queue = asyncio.Queue()
    # Strong references to fire-and-forget save tasks. Without these,
    # asyncio may garbage-collect a pending task before it runs
    # (documented pitfall of asyncio.create_task).
    save_tasks = set()

    async def producer_asr():
        """Continuously listens and pushes recognized text to the queue."""
        print("[System] ASR Background Task Started")
        while True:
            # Awaiting here yields control, allowing the consumer to run.
            text = await asr.listen()
            if text:
                print(f"[ASR Input]: {text}")
                await input_queue.put(text)
                if text.lower() in EXIT_COMMANDS:
                    print("[ASR] Exit command received.")
                    break

    async def consumer_processing():
        """Consumes text, generates a response, saves data, and speaks."""
        print("[System] Processing Task Started")
        while True:
            # Wait for the next recognized utterance.
            text = await input_queue.get()
            if text.lower() in EXIT_COMMANDS:
                input_queue.task_done()
                break
            messages.append({"role": "user", "content": text})

            # Dynamic system prompt: enforce the locked filename, if any.
            current_prompt = SYSTEM_PROMPT
            if state["active_filename"]:
                current_prompt += f"\n\nFILENAME LOCKED: The active file is '{state['active_filename']}'. You MUST use this exact filename. Any other filename you propose will be ignored."

            print("[LLM] Thinking...")
            try:
                response_data = await llm.generate(current_prompt, messages)
                voice_output = response_data.get("voice_output", {})
                data_mgmt = response_data.get("data_management", {})
                assistant_text = voice_output.get("text", "")
                messages.append({"role": "assistant", "content": assistant_text})

                # Handle data capture requested by the model.
                if data_mgmt.get("will_capture"):
                    payload = data_mgmt.get("capture_payload", {})
                    proposed_filename = payload.get("filename")
                    content = payload.get("content")
                    if content:
                        if state["active_filename"]:
                            # Filename lock wins over any new proposal.
                            if proposed_filename and proposed_filename != state["active_filename"]:
                                print(f"[STORAGE] Ignoring proposed '{proposed_filename}', locked to '{state['active_filename']}'")
                            filename = state["active_filename"]
                        elif proposed_filename:
                            state["active_filename"] = proposed_filename
                            filename = proposed_filename
                            # BUG FIX: original printed the literal text
                            # "(unknown)" instead of the locked filename.
                            print(f"[STORAGE] Filename locked: '{proposed_filename}'")
                        else:
                            filename = None
                        if filename:
                            # BUG FIX: original printed "(unknown)" here too.
                            print(f"[STORAGE] Saving to {filename}...")
                            task = asyncio.create_task(storage.save(filename, content))
                            save_tasks.add(task)
                            task.add_done_callback(save_tasks.discard)

                # TTS
                if assistant_text:
                    print(f"AI: {assistant_text}")
                    # We await here, but producer_asr continues running!
                    await tts.speak(assistant_text)
            except Exception as e:
                # Boundary handler: a single failed turn must not kill the loop.
                print(f"[Error] Processing failed: {e}")
            finally:
                input_queue.task_done()

    # Run both tasks concurrently.
    producer = asyncio.create_task(producer_asr())
    consumer = asyncio.create_task(consumer_processing())
    # They finish when an exit command is spoken.
    await asyncio.gather(producer, consumer)
    # Drain any in-flight saves so data is flushed before returning.
    if save_tasks:
        await asyncio.gather(*save_tasks)
def main():
    """Parse CLI arguments, build the components, and run the voice pipeline."""
    parser = argparse.ArgumentParser(description="Second Brain Voice Assistant")
    parser.add_argument("--provider", choices=["gemini", "anthropic", "openai"], default="gemini", help="LLM Provider")
    parser.add_argument("--model", type=str, help="Specific model name")
    parser.add_argument("--voice-id", type=str, help="ElevenLabs Voice ID")
    parser.add_argument("--audio-output-index", type=int, help="Audio Output Device Index")
    parser.add_argument("--audio-channels", type=int, help="Audio Channels (1 or 2)")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging and audio dump")
    args = parser.parse_args()

    # Provider dispatch table: (class, default model). argparse `choices`
    # guarantees the key exists, and this removes the if/elif chain that
    # left `llm` unbound outside the three branches.
    providers = {
        # Flash default: faster/cheaper than Pro.
        "gemini": (GeminiProvider, "gemini-3-flash-preview"),
        "anthropic": (AnthropicProvider, "claude-3-opus-20240229"),
        "openai": (OpenAIProvider, "gpt-4-turbo-preview"),
    }
    provider_cls, default_model = providers[args.provider]
    llm = provider_cls(model_name=args.model if args.model else default_model)

    asr = ASR()
    tts = TTS(voice_id=args.voice_id, device_index=args.audio_output_index, channels=args.audio_channels, debug=args.debug)
    storage = Storage(base_path="brain")

    print(f"Starting Second Brain with {args.provider}...")
    try:
        asyncio.run(pipeline(asr, llm, tts, storage))
    except KeyboardInterrupt:
        print("\nExiting...")
    finally:
        # BUG FIX: the original closed TTS only on KeyboardInterrupt,
        # leaking the audio device on a normal (spoken "exit") shutdown.
        tts.close()


if __name__ == "__main__":
    main()