forked from openhome-dev/abilities
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
69 lines (56 loc) · 2.45 KB
/
main.py
File metadata and controls
69 lines (56 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import os
from src.agent.capability import MatchingCapability
from src.main import AgentWorker
from src.agent.capability_worker import CapabilityWorker
# =============================================================================
# LOOP TEMPLATE
# For interactive Abilities with multi-turn conversations.
# Pattern: Greet → Loop (Listen → Process → Respond) → Exit on command
#
# Replace the processing logic inside the while loop with your own.
# =============================================================================
# Any of these words ends the Ability and hands control back to the normal flow.
EXIT_WORDS = {
    "bye",
    "cancel",
    "done",
    "exit",
    "goodbye",
    "leave",
    "quit",
    "stop",
}
class LoopTemplateCapability(MatchingCapability):
    """Interactive, multi-turn Ability template.

    Flow: greet the user, then repeatedly listen -> process -> respond until
    the user says one of ``EXIT_WORDS``; finally hand control back to the
    normal flow. Replace the processing step inside ``run`` with your own
    logic.
    """

    # Both are populated by call(); they stay None until the Ability is invoked.
    worker: AgentWorker = None
    capability_worker: CapabilityWorker = None

    @classmethod
    def register_capability(cls) -> "MatchingCapability":
        """Build the capability from the ``config.json`` next to this file.

        Returns:
            An instance created with the ``unique_name`` and
            ``matching_hotwords`` values read from the config file.

        Raises:
            OSError: If ``config.json`` cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If a required key is missing from the config.
        """
        config_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "config.json"
        )
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, encoding="utf-8") as file:
            data = json.load(file)
        return cls(
            unique_name=data["unique_name"],
            matching_hotwords=data["matching_hotwords"],
        )

    def call(self, worker: AgentWorker):
        """Store the worker, wrap it in a CapabilityWorker and schedule ``run``.

        Args:
            worker: The agent worker whose ``session_tasks`` will own the
                ``run`` coroutine.
        """
        self.worker = worker
        self.capability_worker = CapabilityWorker(self.worker)
        self.worker.session_tasks.create(self.run())

    @staticmethod
    def _is_exit_command(user_input: str) -> bool:
        """Return True if ``user_input`` contains an exit word as a whole word.

        Matching is case-insensitive and ignores punctuation, so "Stop." and
        "ok, I'm done!" match, while "that's quite nice" does not match "quit".
        """
        # Turn every non-alphanumeric character into a space so punctuation
        # attached to a word does not prevent a match.
        normalized = "".join(
            ch if ch.isalnum() else " " for ch in user_input.lower()
        )
        return any(token in EXIT_WORDS for token in normalized.split())

    async def run(self):
        """Run the greet / listen / respond loop until the user asks to exit.

        The normal flow is resumed in a ``finally`` clause, so it is restored
        even when speaking, listening or processing raises.
        """
        try:
            # Greet the user
            await self.capability_worker.speak(
                "I'm ready to help. Ask me anything, or say stop when you're done."
            )
            while True:
                # Listen for user input
                user_input = await self.capability_worker.user_response()

                # Skip empty input
                if not user_input:
                    continue

                # Check for exit commands (whole-word match, not substring)
                if self._is_exit_command(user_input):
                    await self.capability_worker.speak("Goodbye!")
                    break

                # --- YOUR LOGIC HERE ---
                # Process the input and generate a response.
                # This example uses the LLM, but you can do anything:
                # call APIs, play audio, run calculations, etc.
                # NOTE(review): called without ``await`` exactly as in the
                # original template - presumably a synchronous call; confirm
                # against CapabilityWorker.
                response = self.capability_worker.text_to_text_response(
                    f"Respond in one short sentence: {user_input}"
                )
                await self.capability_worker.speak(response)
        finally:
            # ALWAYS resume normal flow when the loop ends, including on errors
            self.capability_worker.resume_normal_flow()