-
Notifications
You must be signed in to change notification settings - Fork 296
Expand file tree
/
Copy pathbackground.py
More file actions
37 lines (29 loc) · 1.54 KB
/
background.py
File metadata and controls
37 lines (29 loc) · 1.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
from src.agent.capability import MatchingCapability
from src.main import AgentWorker
from src.agent.capability_worker import CapabilityWorker
from time import time
class BackgroundCapabilityBackground(MatchingCapability):
    """Background capability that periodically logs recent conversation history.

    ``call`` stores the session objects on the instance and passes the
    ``first_function`` coroutine to the worker's ``session_tasks.create``.
    """

    worker: AgentWorker = None
    capability_worker: CapabilityWorker = None
    background_daemon_mode: bool = False

    # Do not change following tag of register capability
    #{{register capability}}

    async def first_function(self):
        """Log the last ten history messages, sleep 20 seconds, and repeat.

        The loop has no exit condition of its own; it keeps running until
        something outside this method stops the coroutine.
        """
        logger = self.worker.editor_logging_handler
        poll_interval = 20.0  # seconds between two history dumps

        logger.info("%s: Background Called" % time())

        while True:
            logger.info("%s: background watching" % time())

            # Only the ten most recent entries are reported on each pass.
            full_history = self.capability_worker.get_full_message_history()
            for entry in full_history[-10:]:
                role = entry.get("role", "")
                content = entry.get("content", "")
                logger.info("Role: %s, Message: %s" % (role, content))

            # Example actions that could be triggered from the watcher:
            # await self.capability_worker.speak("watching")
            # await self.capability_worker.play_from_audio_file("alarm.mp3")
            await self.worker.session_tasks.sleep(poll_interval)

        # Resume the normal workflow
        # NOTE(review): the loop above never breaks, so this call is not
        # reached as written; it only takes effect if an exit is added.
        self.capability_worker.resume_normal_flow()

    def call(self, worker: AgentWorker, background_daemon_mode: bool):
        """Store the session context and hand off the background coroutine.

        Args:
            worker: Agent worker whose logger and session tasks are used.
            background_daemon_mode: Flag stored on the instance as-is.
        """
        # Keep references to the worker and its capability helper.
        self.worker = worker
        self.background_daemon_mode = background_daemon_mode
        self.capability_worker = CapabilityWorker(self.worker)

        watcher = self.first_function()
        self.worker.session_tasks.create(watcher)