Skip to content

Commit f8a1a8b

Browse files
authored
Kortexa Radio (#222)
1 parent 529bad3 commit f8a1a8b

3 files changed

Lines changed: 188 additions & 0 deletions

File tree

community/kortexa-radio/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Kortexa Radio
2+
3+
![Community](https://img.shields.io/badge/OpenHome-Community-orange?style=flat-square)
4+
![Author](https://img.shields.io/badge/Author-@kortexa--ai-lightgrey?style=flat-square)
5+
6+
## What It Does
7+
Streams AI-generated radio from [radio.kortexa.ai](https://radio.kortexa.ai) to your OpenHome device. Music, DJ announcements, and news segments — all generated in real-time by AI.
8+
9+
## Suggested Trigger Words
10+
- "start radio"
11+
- "kortexa radio"
12+
13+
## Setup
14+
No setup required. The stream is public.
15+
16+
## How It Works
17+
1. Say the trigger word
18+
2. The ability enters music mode and starts streaming
19+
3. Interrupt (wake word or touch) to stop — control returns to the agent
20+
4. Say the trigger word again to restart
21+
22+
## Example Conversation
23+
> **User:** "start radio"
24+
> **AI:** "Tuning in to Kortexa Radio."
25+
> *(AI-generated music plays through the device)*
26+
>
27+
> *(User interrupts)*
28+
> **AI:** *(agent resumes normal conversation)*
29+
>
30+
> **User:** "start radio"
31+
> **AI:** "Tuning in to Kortexa Radio."
32+
> *(Music plays again)*
33+
34+
## Stream Details
35+
- Format: MP3, 128kbps, 48kHz stereo
36+
- Source: https://api.kortexa.ai/radio/stream
37+
- Content: AI-generated music, DJ segments, news updates
38+
- Programming changes by time of day
39+
40+
## Logs
41+
Look for `[KortexaRadio]` entries in OpenHome Live Editor logs.

community/kortexa-radio/__init__.py

Whitespace-only changes.

community/kortexa-radio/main.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import asyncio
2+
import httpx
3+
4+
from src.agent.capability import MatchingCapability
5+
from src.main import AgentWorker
6+
from src.agent.capability_worker import CapabilityWorker
7+
8+
STREAM_URL = "https://api.kortexa.ai/radio/stream"
9+
EVENTS_URL = "https://api.kortexa.ai/radio/events"
10+
CHUNK_SIZE = 25 * 1024
11+
TAG = "[KortexaRadio]"
12+
13+
STOP_WORDS = ["stop", "off", "exit", "quit", "turn it off"]
14+
15+
16+
class KortexaRadioCapability(MatchingCapability):
17+
worker: AgentWorker = None
18+
capability_worker: CapabilityWorker = None
19+
20+
#{{register_capability}}
21+
22+
async def _stream(self):
23+
"""Stream radio audio with pause/stop handling."""
24+
try:
25+
26+
async with httpx.AsyncClient(timeout=None) as client:
27+
async with client.stream("GET", STREAM_URL, follow_redirects=True) as response:
28+
self.worker.editor_logging_handler.info(f"{TAG} Connected, status={response.status_code}")
29+
30+
if response.status_code != 200:
31+
await self.capability_worker.speak("Could not connect to the radio stream.")
32+
return
33+
34+
await self.capability_worker.stream_init()
35+
36+
async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE):
37+
if not chunk:
38+
self.worker.editor_logging_handler.info(f"{TAG} No chunk")
39+
continue
40+
41+
if self.worker.music_mode_stop_event.is_set():
42+
self.worker.editor_logging_handler.info(f"{TAG} Stop event, ending stream")
43+
await self.capability_worker.stream_end()
44+
return
45+
46+
while self.worker.music_mode_pause_event.is_set():
47+
await self.worker.session_tasks.sleep(0.1)
48+
49+
await self.capability_worker.send_audio_data_in_stream(chunk)
50+
51+
await self.capability_worker.stream_end()
52+
53+
except asyncio.CancelledError as e:
54+
self.worker.editor_logging_handler.info(f"{TAG} Stream cancelled: {e}")
55+
except Exception as e:
56+
self.worker.editor_logging_handler.error(f"{TAG} Stream error: {e}")
57+
58+
async def _keep_events_connected(self):
59+
"""Stay connected to SSE to register as a listener. Events are not processed."""
60+
try:
61+
while True:
62+
try:
63+
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
64+
async with client.stream("GET", EVENTS_URL) as response:
65+
async for _ in response.aiter_lines():
66+
pass
67+
except httpx.HTTPError as e:
68+
self.worker.editor_logging_handler.error(f"{TAG} SSE error: {e}")
69+
pass
70+
71+
await self.worker.session_tasks.sleep(5)
72+
except asyncio.CancelledError:
73+
self.worker.editor_logging_handler.info(f"{TAG} SSE cancelled")
74+
75+
pass
76+
77+
async def run(self):
78+
"""Auto-start radio, listen for stop command, exit cleanly."""
79+
80+
stream_task = None
81+
events_task = None
82+
83+
try:
84+
await self.capability_worker.speak("Tuning in to Kortexa Radio.")
85+
86+
# Subscribe to events (registers us as a listener)
87+
self.worker.editor_logging_handler.info(f"{TAG} Register as a listener")
88+
events_task = self.worker.session_tasks.create(self._keep_events_connected())
89+
90+
# Turn on music mode
91+
self.worker.editor_logging_handler.info(f"{TAG} Turn on music mode")
92+
self.worker.music_mode_event.set()
93+
await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"})
94+
95+
# Stream audio in background so the loop stays responsive
96+
self.worker.editor_logging_handler.info(f"{TAG} Start streaming")
97+
stream_task = self.worker.session_tasks.create(self._stream())
98+
99+
# Wait for stop command
100+
while not self.worker.music_mode_stop_event.is_set():
101+
msg = await self.capability_worker.user_response()
102+
103+
if self.worker.music_mode_stop_event.is_set():
104+
break
105+
106+
if msg:
107+
normalized = msg.strip().lower()
108+
self.worker.editor_logging_handler.info(f"{TAG} Command: {normalized}")
109+
110+
if any(word in normalized for word in STOP_WORDS):
111+
self.worker.editor_logging_handler.info(f"{TAG} Stop requested")
112+
self.worker.music_mode_stop_event.set()
113+
break
114+
115+
await self.capability_worker.speak("Radio off! Catch you later.")
116+
117+
except Exception as e:
118+
self.worker.editor_logging_handler.error(f"{TAG} Error: {e}")
119+
120+
finally:
121+
self.worker.editor_logging_handler.info(f"{TAG} Clean up")
122+
123+
self.worker.music_mode_stop_event.set()
124+
125+
if stream_task:
126+
stream_task.cancel()
127+
if events_task:
128+
events_task.cancel()
129+
130+
await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"})
131+
self.worker.music_mode_event.clear()
132+
self.worker.music_mode_stop_event.clear()
133+
134+
self.worker.editor_logging_handler.info(f"{TAG} Radio OFF")
135+
136+
await self.worker.session_tasks.sleep(1)
137+
self.capability_worker.resume_normal_flow()
138+
139+
def call(self, worker: AgentWorker):
140+
try:
141+
self.worker = worker
142+
self.capability_worker = CapabilityWorker(self.worker)
143+
self.worker.editor_logging_handler.info(f"{TAG} Radio ON")
144+
145+
self.worker.session_tasks.create(self.run())
146+
except Exception as e:
147+
self.worker.editor_logging_handler.warning(e)

0 commit comments

Comments
 (0)