forked from livekit/agents
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_agent.py
More file actions
136 lines (110 loc) · 4.67 KB
/
basic_agent.py
File metadata and controls
136 lines (110 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
from collections.abc import AsyncIterable
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
NOT_GIVEN,
Agent,
AgentFalseInterruptionEvent,
AgentSession,
JobContext,
JobProcess,
MetricsCollectedEvent,
ModelSettings,
RoomInputOptions,
RoomOutputOptions,
RunContext,
WorkerOptions,
cli,
metrics,
)
from livekit.agents.llm import function_tool
from livekit.agents.voice.transcription.filters import filter_markdown
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
# uncomment to enable Krisp background voice/noise cancellation
# from livekit.plugins import noise_cancellation
# module-level logger; everything this example logs is emitted under the name "basic-agent"
logger = logging.getLogger("basic-agent")
# presumably loads settings (e.g. provider API keys) from a local .env file into the
# environment before any plugin is constructed — confirm against the python-dotenv docs
load_dotenv()
class MyAgent(Agent):
    """Voice agent "Kelly": greets on entry, strips markdown before TTS, exposes a weather tool."""

    def __init__(self) -> None:
        # Adjacent string literals are joined with no separator, so every fragment
        # except the last ends with a space; without it the sentences fuse together
        # ("...via voice.with that in mind...") in the prompt.
        super().__init__(
            instructions="Your name is Kelly. You would interact with users via voice. "
            "with that in mind keep your responses concise and to the point. "
            "do not use emojis, asterisks, markdown, or other special characters in your responses. "
            "You are curious and friendly, and have a sense of humor.",
        )

    async def on_enter(self):
        """Generate an opening reply as soon as the agent joins the session."""
        # when the agent is added to the session, it'll generate a reply
        # according to its instructions
        self.session.generate_reply()

    async def tts_node(
        self, text: AsyncIterable[str], model_settings: ModelSettings
    ) -> AsyncIterable[rtc.AudioFrame]:
        """Filter markdown out of the text stream, then delegate to the base TTS node.

        Args:
            text: Stream of text chunks headed for speech synthesis.
            model_settings: Settings forwarded unchanged to the base implementation.

        Returns:
            Whatever the base class's ``tts_node`` returns for the filtered text.
        """
        # TTS node allows us to process the text before it's sent to the model
        # here we'll strip out markdown
        filtered_text = filter_markdown(text)
        return super().tts_node(filtered_text, model_settings)

    # all functions annotated with @function_tool will be passed to the LLM when this
    # agent is active
    # NOTE(review): the docstring below is written for the LLM (presumably used as the
    # tool description), so treat its wording as runtime text rather than a code comment.
    @function_tool
    async def lookup_weather(
        self, context: RunContext, location: str, latitude: str, longitude: str
    ):
        """Called when the user asks for weather related information.
        Ensure the user's location (city or region) is provided.
        When given a location, please estimate the latitude and longitude of the location and
        do not ask the user for them.

        Args:
            location: The location they are asking for
            latitude: The latitude of the location, do not ask user for it
            longitude: The longitude of the location, do not ask user for it
        """
        # lazy %-style arguments: the message is only formatted if the record is emitted
        logger.info("Looking up weather for %s", location)

        # placeholder result; latitude/longitude are accepted but not used here
        return "sunny with a temperature of 70 degrees."
def prewarm(proc: JobProcess):
    """Load the Silero VAD and store it under ``proc.userdata["vad"]``.

    The job entrypoint reads the same key when it builds its session.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model
async def entrypoint(ctx: JobContext):
    """Configure and start the voice session for one job.

    Builds an ``AgentSession`` from the prewarmed VAD plus the OpenAI and
    Deepgram plugins, attaches handlers for false interruptions and metrics,
    registers a shutdown callback that logs the usage summary, and finally
    starts the session in the job's room with ``MyAgent``.
    """
    # attach the room name to every log entry
    ctx.log_context_fields = {"room": ctx.room.name}

    # STT, LLM, TTS (or a realtime API) can be combined freely
    session = AgentSession(
        vad=ctx.proc.userdata["vad"],
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3", language="multi"),
        tts=openai.TTS(voice="ash"),
        # let the LLM start generating while still waiting for the end of turn
        preemptive_generation=True,
        # LiveKit's turn detection model
        turn_detection=MultilingualModel(),
    )

    # accumulates metrics as they arrive; the total is logged once the session ends
    collector = metrics.UsageCollector()

    # background noise can occasionally interrupt the agent; such interruptions are
    # treated as false positives, and the agent's speech may be resumed when one occurs
    @session.on("agent_false_interruption")
    def _resume_after_false_interruption(ev: AgentFalseInterruptionEvent):
        logger.info("false positive interruption, resuming")
        extra = ev.extra_instructions
        session.generate_reply(instructions=extra if extra else NOT_GIVEN)

    @session.on("metrics_collected")
    def _record_metrics(ev: MetricsCollectedEvent):
        collected = ev.metrics
        metrics.log_metrics(collected)
        collector.collect(collected)

    async def report_usage():
        summary = collector.get_summary()
        logger.info(f"Usage: {summary}")

    # shutdown callbacks fire when the session is over
    ctx.add_shutdown_callback(report_usage)

    agent = MyAgent()
    input_options = RoomInputOptions(
        # uncomment to enable Krisp BVC noise cancellation
        # noise_cancellation=noise_cancellation.BVC(),
    )
    output_options = RoomOutputOptions(transcription_enabled=True)

    await session.start(
        agent=agent,
        room=ctx.room,
        room_input_options=input_options,
        room_output_options=output_options,
    )
if __name__ == "__main__":
    # run the worker CLI with the job entrypoint and the prewarm hook
    cli.run_app(
        WorkerOptions(
            prewarm_fnc=prewarm,
            entrypoint_fnc=entrypoint,
        )
    )