Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 120 additions & 3 deletions backend/src/librarysync/jobs/aiostreams_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
EpisodeItem,
Integration,
MediaItem,
WatchedItem,
)
from librarysync.jobs.import_base import ImportContext, ImportResult, ImportStrategy
from librarysync.jobs.import_pipeline import (
Expand Down Expand Up @@ -446,7 +447,7 @@ async def _lookup_metadata_candidate(
exc_info=True,
)
continue
candidate = _select_candidate_for_entry(entry, candidates)
candidate = await _select_candidate_for_entry(db, user_id, entry, candidates)
if not candidate:
continue
if _candidate_has_useful_id(candidate):
Expand All @@ -471,8 +472,107 @@ def _build_lookup_queries(title: str, year: int | None) -> list[str]:
return [f"{cleaned} {year}", cleaned]


def _select_candidate_for_entry(
entry: ParsedEntry, candidates: list[MediaCandidate]
async def _check_series_continuity(
    db: AsyncSession,
    user_id: str,
    entry: ParsedEntry,
    candidates: list[MediaCandidate],
) -> MediaCandidate | None:
    """Prefer the candidate matching a show the user is already watching.

    When multiple shows share a title (e.g. the "Fallout" TV series vs. an
    anime with the same name), favour the candidate whose external IDs match
    a show for which this user has already watched an earlier or nearby
    episode.  For example, if the user watched "Fallout" S02E07, an entry for
    "Fallout" S02E08 should resolve to the same show's metadata.

    Args:
        db: Async database session.
        user_id: ID of the user whose watch history is consulted.
        entry: Parsed import entry; must carry a title plus season and
            episode numbers, otherwise no continuity check is possible.
        candidates: Metadata candidates to disambiguate between.

    Returns:
        The candidate matching a previously watched show, or ``None`` when
        no continuity signal is available.
    """
    if not entry.title or entry.season_number is None or entry.episode_number is None:
        return None

    # Normalized title key used for fuzzy matching against watched shows.
    title_key = _normalize_title_key(entry.title)
    if not title_key:
        return None

    from sqlalchemy import and_, func

    # Find the latest watched episode per show in a way that guarantees
    # (max_season, max_episode) comes from a real episode row.  Taking two
    # independent MAX() aggregates could pair the season of one watch with
    # the episode number of another (e.g. S03 combined with E22 from S01),
    # wrongly rejecting true continuations.
    #
    # Step 1: highest watched season per show for this user.
    max_season_subq = (
        select(
            EpisodeItem.show_media_item_id.label("show_id"),
            func.max(EpisodeItem.season_number).label("max_season"),
        )
        .join(
            WatchedItem,
            and_(
                WatchedItem.episode_item_id == EpisodeItem.id,
                WatchedItem.user_id == user_id,
            ),
        )
        .group_by(EpisodeItem.show_media_item_id)
        .subquery()
    )
    # Step 2: highest episode number within that maximum season.
    max_episode_subq = (
        select(
            EpisodeItem.show_media_item_id.label("show_id"),
            func.max(EpisodeItem.episode_number).label("max_episode"),
        )
        .join(
            max_season_subq,
            and_(
                max_season_subq.c.show_id == EpisodeItem.show_media_item_id,
                max_season_subq.c.max_season == EpisodeItem.season_number,
            ),
        )
        .group_by(EpisodeItem.show_media_item_id)
        .subquery()
    )
    # Step 3: join the per-show aggregates back to the show rows.
    result = await db.execute(
        select(
            MediaItem,
            max_season_subq.c.max_season.label("max_season"),
            max_episode_subq.c.max_episode.label("max_episode"),
        )
        .join(max_season_subq, max_season_subq.c.show_id == MediaItem.id)
        .join(max_episode_subq, max_episode_subq.c.show_id == MediaItem.id)
        .where(MediaItem.media_type == "tv")
    )
    watched_shows = result.all()

    for show_item, max_season, max_episode in watched_shows:
        # Fuzzy title match between the watched show and the entry.
        show_title_key = _normalize_title_key(show_item.title or "")
        if not show_title_key or show_title_key != title_key:
            continue

        # Skip shows without usable season/episode data.
        if max_season is None or max_episode is None:
            continue

        # Continuation: same season with a later episode, or any later season.
        is_continuation = (
            entry.season_number == max_season
            and entry.episode_number > max_episode
        ) or entry.season_number > max_season

        # Nearby: same season within a few episodes either way — tolerates
        # rewatching or out-of-order imports.
        is_nearby = (
            entry.season_number == max_season
            and abs(entry.episode_number - max_episode) <= 3
        )

        if not is_continuation and not is_nearby:
            continue

        # Match the watched show against a candidate via external IDs.
        for candidate in candidates:
            # IMDb ID (most reliable).
            if (
                show_item.imdb_id
                and candidate.imdb_id
                and show_item.imdb_id.lower() == candidate.imdb_id.lower()
            ):
                return candidate

            # TMDB ID.
            if (
                show_item.tmdb_id
                and candidate.provider == "tmdb"
                and candidate.provider_id
                and str(show_item.tmdb_id) == str(candidate.provider_id)
            ):
                return candidate

            # TVDB ID.
            if (
                show_item.tvdb_id
                and candidate.provider == "tvdb"
                and candidate.provider_id
                and str(show_item.tvdb_id) == str(candidate.provider_id)
            ):
                return candidate

    return None


async def _select_candidate_for_entry(
db: AsyncSession,
user_id: str,
entry: ParsedEntry,
candidates: list[MediaCandidate],
) -> MediaCandidate | None:
if not candidates:
return None
Expand All @@ -483,6 +583,23 @@ def _select_candidate_for_entry(
]
if not scoped:
scoped = candidates

# For TV shows, check if user has watched a previous episode from one of the candidates
if entry.media_type == "tv" and entry.season_number is not None and entry.episode_number is not None:
continuity_candidate = await _check_series_continuity(
db, user_id, entry, scoped
)
if continuity_candidate:
logger.debug(
"Using series continuity: selected %s (imdb=%s) for %s S%02dE%02d",
continuity_candidate.title,
continuity_candidate.imdb_id,
entry.title,
entry.season_number,
entry.episode_number,
)
return continuity_candidate

title_key = _normalize_title_key(entry.title or "")
title_matches: list[MediaCandidate] = []
if title_key:
Expand Down
100 changes: 94 additions & 6 deletions backend/tests/test_aiostreams_import.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import sys
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock

PROJECT_ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(PROJECT_ROOT / "src"))
Expand All @@ -11,19 +13,26 @@


class TestAIOStreamsLookup(unittest.TestCase):
def _build_entry(self, title: str, year: int | None) -> aiostreams_import.ParsedEntry:
def _build_entry(
self,
title: str,
year: int | None,
media_type: str = "movie",
season_number: int | None = None,
episode_number: int | None = None,
) -> aiostreams_import.ParsedEntry:
now = datetime.now(timezone.utc)
return aiostreams_import.ParsedEntry(
raw={},
watched_at=now,
last_seen=now,
duration_seconds=3600,
media_type="movie",
media_type=media_type,
imdb_id=None,
tmdb_id=None,
tvdb_id=None,
season_number=None,
episode_number=None,
season_number=season_number,
episode_number=episode_number,
title=title,
year=year,
filename=None,
Expand Down Expand Up @@ -56,7 +65,13 @@ def test_select_candidate_prefers_title_and_year(self) -> None:
raw={},
),
]
selected = aiostreams_import._select_candidate_for_entry(entry, candidates)
# Create a mock db session
db = AsyncMock()
db.execute = AsyncMock(return_value=MagicMock(all=MagicMock(return_value=[])))

selected = asyncio.run(
aiostreams_import._select_candidate_for_entry(db, "test_user", entry, candidates)
)
Comment on lines +72 to +74
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using asyncio.run() inside unittest.TestCase can break if tests are executed under an environment that already has a running event loop. Prefer unittest.IsolatedAsyncioTestCase with async def test_... methods (and await ...) for these async tests.

Copilot uses AI. Check for mistakes.
self.assertIsNotNone(selected)
self.assertEqual(selected.provider_id, "200")

Expand Down Expand Up @@ -84,10 +99,83 @@ def test_select_candidate_uses_title_match(self) -> None:
raw={},
),
]
selected = aiostreams_import._select_candidate_for_entry(entry, candidates)
# Create a mock db session
db = AsyncMock()
db.execute = AsyncMock(return_value=MagicMock(all=MagicMock(return_value=[])))

selected = asyncio.run(
aiostreams_import._select_candidate_for_entry(db, "test_user", entry, candidates)
)
self.assertIsNotNone(selected)
self.assertEqual(selected.provider_id, "tt0000002")

def test_series_continuity_prefers_previously_watched_show(self) -> None:
    """If the user watched Fallout S02E07, watching S02E08 prefers the same show.

    The continuity check should disambiguate between two same-titled
    candidates by matching the IMDb ID of a show the user already has
    watch history for.
    """
    entry = self._build_entry(
        title="Fallout",
        year=None,
        media_type="tv",
        season_number=2,
        episode_number=8,
    )

    # Two candidates: the correct Fallout TV show and a same-titled anime.
    fallout_tv_candidate = MediaCandidate(
        provider="tmdb",
        provider_id="12345",
        media_type="tv",
        title="Fallout",
        year=2024,
        poster_url=None,
        imdb_id="tt12345678",
        raw={},
    )
    fallout_anime_candidate = MediaCandidate(
        provider="tmdb",
        provider_id="99999",
        media_type="tv",
        title="Fallout",
        year=2008,
        poster_url=None,
        imdb_id="tt99999999",
        raw={},
    )

    candidates = [
        fallout_anime_candidate,  # Wrong one first in list
        fallout_tv_candidate,  # Correct one second
    ]

    # Mock database returning a watched show whose imdb_id matches the
    # correct candidate.
    mock_result = MagicMock()

    # Mock MediaItem for the TV show the user has watched before.
    mock_media_item = MagicMock()
    mock_media_item.id = "show_123"
    mock_media_item.media_type = "tv"
    mock_media_item.title = "Fallout"
    mock_media_item.imdb_id = "tt12345678"
    mock_media_item.tmdb_id = None
    mock_media_item.tvdb_id = None

    # User has watched S02E07 (one episode before the current S02E08).
    mock_result.all = MagicMock(return_value=[
        (mock_media_item, 2, 7)  # (MediaItem, max_season, max_episode)
    ])

    db = AsyncMock()
    db.execute = AsyncMock(return_value=mock_result)

    selected = asyncio.run(
        aiostreams_import._select_candidate_for_entry(db, "test_user", entry, candidates)
    )

    # Should select the TV show, not the anime, because the user watched
    # the previous episode of that show.
    self.assertIsNotNone(selected)
    self.assertEqual(selected.imdb_id, "tt12345678")
    self.assertEqual(selected.year, 2024)


if __name__ == "__main__":
unittest.main()