Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,25 @@ async def get_or_create_stream(
# previous session. Delete it so we get a fresh StreamInfo below —
# stale fields (failover state, error counts, circuit-breaker markers)
# from the old session can prevent the new connection from succeeding.
# Preserve cumulative stats so the stream monitor shows accurate totals
# (variant streams always have 0 clients since clients register with
# the parent, so they get recycled on every playlist refresh).
was_recycled = False
recycled_bytes = 0
recycled_segments = 0
recycled_created_at = None
if stream_id in self.streams:
active_clients = len(self.stream_clients.get(stream_id, set()))
if active_clients == 0:
old_stream = self.streams[stream_id]
was_recycled = True
recycled_bytes = old_stream.total_bytes_served
recycled_segments = old_stream.total_segments_served
recycled_created_at = old_stream.created_at
logger.info(
f"Recycling orphaned stream {stream_id} (0 clients) — "
f"creating fresh state for new session"
f"creating fresh state for new session "
f"(preserving {recycled_bytes} bytes, {recycled_segments} segments)"
)
del self.streams[stream_id]
self.stream_clients.pop(stream_id, None)
Expand Down Expand Up @@ -580,6 +593,15 @@ async def get_or_create_stream(
)
self.stream_clients[stream_id] = set()

# Restore cumulative stats from recycled stream
if was_recycled:
assert (
recycled_created_at is not None
) # always set when was_recycled is True
self.streams[stream_id].total_bytes_served = recycled_bytes
self.streams[stream_id].total_segments_served = recycled_segments
self.streams[stream_id].created_at = recycled_created_at

# Only count non-variant streams in stats
if not is_variant:
self._stats.total_streams += 1
Expand Down
62 changes: 62 additions & 0 deletions tests/test_stream_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,68 @@ async def test_get_or_create_stream_existing(self, stream_manager):

assert stream_id1 == stream_id2

@pytest.mark.asyncio
async def test_variant_stream_recycle_preserves_stats(self, stream_manager):
    """HLS variant streams never hold registered clients (those attach to the
    parent stream), so every playlist refresh recycles them. Verify that the
    cumulative byte/segment counters and the original created_at timestamp
    survive two consecutive recycle cycles."""
    master_url = "http://example.com/master.m3u8"
    rendition_url = "http://example.com/variant_720p.m3u8"

    master_id = await stream_manager.get_or_create_stream(master_url)
    rendition_id = await stream_manager.get_or_create_stream(
        rendition_url, parent_stream_id=master_id
    )

    # Pretend the first session served some data.
    info = stream_manager.streams[rendition_id]
    info.total_bytes_served = 1_000_000
    info.total_segments_served = 42
    first_created_at = info.created_at

    # First recycle: no clients are registered, mirroring real variant behaviour.
    second_id = await stream_manager.get_or_create_stream(
        rendition_url, parent_stream_id=master_id
    )
    assert second_id == rendition_id
    # Re-read: recycling replaces the StreamInfo object in the registry.
    info = stream_manager.streams[rendition_id]
    assert info.total_bytes_served == 1_000_000
    assert info.total_segments_served == 42
    assert info.created_at == first_created_at

    # Serve more data, then recycle a second time — totals must keep accumulating.
    info.total_bytes_served += 500_000
    info.total_segments_served += 10

    third_id = await stream_manager.get_or_create_stream(
        rendition_url, parent_stream_id=master_id
    )
    assert third_id == rendition_id
    info = stream_manager.streams[rendition_id]
    assert info.total_bytes_served == 1_500_000
    assert info.total_segments_served == 52
    assert info.created_at == first_created_at

@pytest.mark.asyncio
async def test_variant_stream_recycle_preserves_zero_byte_stats(
    self, stream_manager
):
    """A brand-new variant recycled before serving any data must still carry
    over its (zero) counters and created_at — preservation cannot be skipped
    just because the totals happen to be falsy."""
    master_url = "http://example.com/master.m3u8"
    rendition_url = "http://example.com/variant_480p.m3u8"

    master_id = await stream_manager.get_or_create_stream(master_url)
    rendition_id = await stream_manager.get_or_create_stream(
        rendition_url, parent_stream_id=master_id
    )
    first_created_at = stream_manager.streams[rendition_id].created_at

    # Recycle immediately, with no stats accumulated at all.
    await stream_manager.get_or_create_stream(
        rendition_url, parent_stream_id=master_id
    )
    fresh = stream_manager.streams[rendition_id]
    assert fresh.total_bytes_served == 0
    assert fresh.total_segments_served == 0
    assert fresh.created_at == first_created_at

def test_get_all_streams(self, stream_manager):
stats = stream_manager.get_stats()
proxy_stats = stats["proxy_stats"]
Expand Down
Loading