Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 245 additions & 1 deletion tests/birdnetpi/audio/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,29 @@ async def test_websocket_handler_audio_path(self, audio_websocket_service):
mock_request = MagicMock(spec=Request)
mock_request.path = "/ws/audio"
mock_websocket.request = mock_request
mock_websocket.__aiter__.return_value = [].__iter__()

# Track whether client was added during execution
was_added = False

# Create actual async generator
async def mock_message_generator():
# Wait briefly to allow handler to add client
await asyncio.sleep(0.01)
# Check if added while the generator is active
nonlocal was_added
if mock_websocket in audio_websocket_service._audio_clients:
was_added = True
# Empty generator - no messages
return
yield # Unreachable but makes this a generator

# Configure AsyncMock to return our generator when iterated
mock_websocket.__aiter__ = lambda self: mock_message_generator()

await audio_websocket_service._websocket_handler(mock_websocket)

# Verify the client lifecycle
assert was_added, "Websocket should have been added to _audio_clients during execution"
assert mock_websocket not in audio_websocket_service._audio_clients

@pytest.mark.asyncio
Expand Down Expand Up @@ -149,3 +170,226 @@ async def test_wait_for_shutdown(self, audio_websocket_service):
await asyncio.sleep(0.01)
audio_websocket_service._shutdown_flag = True
await asyncio.wait_for(wait_task, timeout=0.1)

@pytest.mark.asyncio
async def test_extract_websocket_path_with_request_line_fallback(self, audio_websocket_service):
"""Should extract path from request line when path attribute not available."""
mock_websocket = AsyncMock(spec=ServerConnection)

# Create a simple object that has a string representation but no path attribute
class MockRequest:
def __str__(self):
return "GET /ws/audio HTTP/1.1"

mock_websocket.request = MockRequest()

path = await audio_websocket_service._extract_websocket_path(mock_websocket)
assert path == "/ws/audio"

@pytest.mark.asyncio
async def test_extract_websocket_path_defaults_to_root_on_error(self, audio_websocket_service):
"""Should default to root path when extraction fails."""
mock_websocket = AsyncMock(spec=ServerConnection)

# Create a request with malformed string representation
class MockRequest:
def __str__(self):
return "malformed"

mock_websocket.request = MockRequest()

path = await audio_websocket_service._extract_websocket_path(mock_websocket)
assert path == "/"

@pytest.mark.asyncio
async def test_extract_websocket_path_handles_exception(self, audio_websocket_service):
"""Should handle exceptions during path extraction and default to root."""
mock_websocket = AsyncMock(spec=ServerConnection)
mock_websocket.request = None # Will cause AttributeError

path = await audio_websocket_service._extract_websocket_path(mock_websocket)
assert path == "/"

@pytest.mark.asyncio
async def test_broadcast_audio_data_handles_general_exception(self, audio_websocket_service):
"""Should handle general exceptions during broadcast."""
mock_client = AsyncMock(spec=ServerConnection)
mock_client.send.side_effect = RuntimeError("Test error")
audio_websocket_service._audio_clients.add(mock_client)
audio_data = b"test_data"

# Should not raise, but log the error
await audio_websocket_service._broadcast_audio_data(audio_data)

@pytest.mark.asyncio
async def test_broadcast_audio_data_with_no_clients(self, audio_websocket_service):
"""Should handle broadcast when no clients are connected."""
audio_data = b"test_data"
# No clients added, should not raise
await audio_websocket_service._broadcast_audio_data(audio_data)

@pytest.mark.asyncio
async def test_fifo_reading_loop_without_fifo_fd(self, audio_websocket_service):
"""Should handle FIFO reading loop when FIFO is not open."""
audio_websocket_service._fifo_livestream_fd = None
audio_websocket_service._shutdown_flag = False

# Run one iteration and then shut down
async def delayed_shutdown():
await asyncio.sleep(0.05)
audio_websocket_service._shutdown_flag = True

shutdown_task = asyncio.create_task(delayed_shutdown())
await audio_websocket_service._fifo_reading_loop()
await shutdown_task

@pytest.mark.asyncio
async def test_fifo_reading_loop_blocking_io_error(self, audio_websocket_service):
"""Should handle BlockingIOError gracefully."""
audio_websocket_service._fifo_livestream_fd = 123
audio_websocket_service._shutdown_flag = False

read_count = 0

def mock_read(fd, size):
nonlocal read_count
read_count += 1
if read_count == 1:
raise BlockingIOError
audio_websocket_service._shutdown_flag = True
return b""

with patch("birdnetpi.audio.websocket.os.read", side_effect=mock_read):
await audio_websocket_service._fifo_reading_loop()

assert read_count == 2

@pytest.mark.asyncio
async def test_fifo_reading_loop_general_exception(self, audio_websocket_service):
"""Should handle general exceptions during FIFO reading."""
audio_websocket_service._fifo_livestream_fd = 123
audio_websocket_service._shutdown_flag = False

read_count = 0

def mock_read(fd, size):
nonlocal read_count
read_count += 1
if read_count == 1:
raise RuntimeError("Test error")
audio_websocket_service._shutdown_flag = True
return b""

with patch("birdnetpi.audio.websocket.os.read", side_effect=mock_read):
await audio_websocket_service._fifo_reading_loop()

assert read_count == 2

@pytest.mark.asyncio
async def test_fifo_reading_loop_broadcasts_when_clients_connected(
self, audio_websocket_service
):
"""Should broadcast audio data when clients are connected."""
audio_websocket_service._fifo_livestream_fd = 123
audio_websocket_service._shutdown_flag = False
mock_client = AsyncMock(spec=ServerConnection)
audio_websocket_service._audio_clients.add(mock_client)

read_count = 0

def mock_read(fd, size):
nonlocal read_count
read_count += 1
if read_count == 1:
return b"audio_data"
audio_websocket_service._shutdown_flag = True
return b""

with patch("birdnetpi.audio.websocket.os.read", side_effect=mock_read):
await audio_websocket_service._fifo_reading_loop()

# Verify audio was broadcast to the client
assert mock_client.send.call_count == 1

@pytest.mark.asyncio
async def test_start_general_exception(self, audio_websocket_service):
"""Should handle general exceptions during start."""
with patch("birdnetpi.audio.websocket.os.open", side_effect=RuntimeError("Test error")):
with pytest.raises(RuntimeError, match="Test error"):
await audio_websocket_service.start()

@pytest.mark.asyncio
async def test_handle_audio_websocket_connection_closed(self, audio_websocket_service):
"""Should handle connection closed exception gracefully."""
mock_websocket = AsyncMock(spec=ServerConnection)

# Configure the async iterator to raise ConnectionClosed
mock_websocket.__aiter__.return_value.__anext__.side_effect = (
websockets.exceptions.ConnectionClosed(None, None)
)

await audio_websocket_service._handle_audio_websocket(mock_websocket)

assert mock_websocket not in audio_websocket_service._audio_clients

@pytest.mark.asyncio
async def test_signal_handler(self, audio_websocket_service):
"""Should set shutdown flag when signal received."""
assert audio_websocket_service._shutdown_flag is False
audio_websocket_service._signal_handler(15, None)
assert audio_websocket_service._shutdown_flag is True

@pytest.mark.asyncio
async def test_cleanup_fifo_and_service_without_resources(self, audio_websocket_service):
"""Should handle cleanup when no resources are allocated."""
# Both fd and server are None
audio_websocket_service._fifo_livestream_fd = None
audio_websocket_service._websocket_server = None

# Should not raise
audio_websocket_service._cleanup_fifo_and_service()

@pytest.mark.asyncio
async def test_start_permission_error(self, audio_websocket_service):
"""Should handle permission errors when opening FIFO."""
with patch(
"birdnetpi.audio.websocket.os.open", side_effect=PermissionError("Permission denied")
):
with pytest.raises(PermissionError, match="Permission denied"):
await audio_websocket_service.start()

@pytest.mark.asyncio
async def test_stop_idempotency(self, audio_websocket_service):
"""Should handle multiple stop() calls without error."""
# First stop
await audio_websocket_service.stop()
assert audio_websocket_service._shutdown_flag is True

# Second stop should not raise
await audio_websocket_service.stop()
assert audio_websocket_service._shutdown_flag is True

@pytest.mark.asyncio
async def test_start_idempotency_prevented(self, audio_websocket_service):
"""Should prevent multiple start() calls by checking state."""
with (
patch("birdnetpi.audio.websocket.os.open", return_value=123),
patch("birdnetpi.audio.websocket.serve", autospec=True) as mock_serve,
patch("birdnetpi.audio.websocket.signal", autospec=True),
patch("birdnetpi.audio.websocket.atexit", autospec=True),
):
mock_server = MagicMock(spec=Server)

async def mock_serve_func(*args, **kwargs):
return mock_server

mock_serve.side_effect = mock_serve_func

# First start
await audio_websocket_service.start()
assert audio_websocket_service._fifo_livestream_fd == 123

# Attempting second start should open FIFO again (no guard currently)
# This verifies current behavior - service doesn't prevent re-initialization
await audio_websocket_service.start()
# If start had idempotency guards, this would raise or no-op
Loading
Loading