Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 7 additions & 28 deletions genesis/group/load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from __future__ import annotations

import asyncio
import sys
from typing import Protocol, List, Optional, Any, Awaitable
from abc import ABC, abstractmethod

Expand All @@ -23,7 +22,7 @@

async def _create_redis_client(url: str = "redis://localhost:6379") -> Any:
"""
Create a Redis async client, installing redis package if needed.
Create a Redis async client.

Internal helper function.

Expand All @@ -34,35 +33,15 @@ async def _create_redis_client(url: str = "redis://localhost:6379") -> Any:
Redis async client instance

Raises:
RuntimeError: If redis package cannot be installed or imported
ImportError: If redis package is not installed
"""
# Import here to handle optional dependency
try:
import redis.asyncio as redis_module
except ImportError:
# Try to install redis automatically
try:
proc = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"pip",
"install",
"redis>=5.0.0",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError(
"Redis package is required for RedisLoadBalancer. "
"Install it with: pip install redis"
)
import redis.asyncio as redis_module
except (OSError, ImportError):
raise RuntimeError(
"Redis package is required for RedisLoadBalancer. "
"Install it with: pip install redis"
)
raise ImportError(
"The redis package is required for RedisLoadBalancer. "
"Install it with: pip install genesis[redis]"
)

return await redis_module.from_url(url)

Expand Down Expand Up @@ -171,7 +150,7 @@ class RedisLoadBalancer:
Redis-based load balancer backend.

Tracks call counts in Redis. Suitable for horizontal scaling.
The redis package is automatically installed when needed.
Requires the redis extra: pip install genesis[redis]

Args:
url: Redis connection URL (default: "redis://localhost:6379")
Expand Down
19 changes: 7 additions & 12 deletions genesis/inbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,13 @@ async def authenticate(self) -> None:
async def start(self) -> None:
"""Initiates an authenticated connection to a freeswitch server."""
try:
try:
with tracer.start_as_current_span(
"inbound_connect",
attributes={
"net.peer.name": self.host,
"net.peer.port": self.port,
},
):
await self._connect()
except Exception as e:
if "tracer" not in str(e).lower():
raise
with tracer.start_as_current_span(
"inbound_connect",
attributes={
"net.peer.name": self.host,
"net.peer.port": self.port,
},
):
await self._connect()
except TimeoutError:
logger.debug("A timeout occurred when trying to connect to the freeswitch.")
Expand Down
6 changes: 5 additions & 1 deletion genesis/protocol/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
self.writer: Optional[StreamWriter] = None
self.handlers: Dict[str, List[EventHandler]] = {}
self.channel_registry: Dict[str, List[EventHandler]] = {}
self.handler_tasks: set[Task[Any]] = set()

# Initialize routing strategy (Strategy Pattern)
self.routing_strategy = CompositeRoutingStrategy(
Expand Down Expand Up @@ -100,6 +101,9 @@ async def stop(self) -> None:
self.is_connected = False
await self._cancel_task(self.producer, "event producer")
await self._cancel_task(self.consumer, "event consumer")
for task in list(self.handler_tasks):
await self._cancel_task(task, "handler task")
self.handler_tasks.clear()

async def _cancel_task(
self, task: Optional[Task[Any]], label: str = "task"
Expand Down Expand Up @@ -190,7 +194,7 @@ async def _process_one_event(self, event: ESLEvent) -> None:

handlers, _ = await self.routing_strategy.route(event)
if handlers:
dispatch_to_handlers(handlers, event)
dispatch_to_handlers(handlers, event, self.handler_tasks)

def on(
self,
Expand Down
29 changes: 22 additions & 7 deletions genesis/protocol/routing/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,38 @@
Helper for dispatching events to handlers asynchronously.
"""

from asyncio import create_task, to_thread, iscoroutinefunction
from typing import List
from asyncio import Task, create_task, to_thread, iscoroutinefunction
from typing import List, Optional, Set, Any

from genesis.observability import logger
from genesis.protocol.parser import ESLEvent
from genesis.types import EventHandler


def dispatch_to_handlers(handlers: List[EventHandler], event: ESLEvent) -> None:
"""Dispatch event to all handlers asynchronously (fire-and-forget tasks).
def _handler_done_callback(task_set: Set[Task[Any]], task: Task[Any]) -> None:
task_set.discard(task)
if not task.cancelled() and task.exception() is not None:
logger.error(f"Unhandled exception in event handler: {task.exception()}")


def dispatch_to_handlers(
handlers: List[EventHandler],
event: ESLEvent,
task_set: Optional[Set[Task[Any]]] = None,
) -> None:
"""Dispatch event to all handlers asynchronously.

Args:
handlers: List of event handlers
event: The ESL event to dispatch
task_set: Optional set to track live tasks (prevents GC and logs exceptions)
"""
_tasks: list = []
for handler in handlers:
if iscoroutinefunction(handler):
_tasks.append(create_task(handler(event)))
task = create_task(handler(event))
else:
_tasks.append(create_task(to_thread(handler, event)))
task = create_task(to_thread(handler, event))

if task_set is not None:
task_set.add(task)
task.add_done_callback(lambda t: _handler_done_callback(task_set, t))
10 changes: 0 additions & 10 deletions tests/test_inbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,6 @@ async def test_inbound_client_send_command_error(freeswitch):
await client.send("uptime")


async def test_inbound_tracer_fallback(freeswitch):
async with freeswitch:
with patch(
"genesis.inbound.tracer.start_as_current_span",
side_effect=Exception("Tracer error"),
):
async with Inbound(*freeswitch.address) as client:
assert client.is_connected


async def test_inbound_metrics_error_on_start(freeswitch):
async with freeswitch:
with patch(
Expand Down
Loading
Loading