diff --git a/README.md b/README.md index de79716..29aca50 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ A Django application to track and rate the media I consume: movies, TV shows, bo - `ALLOWED_HOSTS`: Add your domain and IP (e.g., `datakult.example.com,192.168.1.100,localhost`) - `DJANGO_SUPERUSER_PASSWORD`: Use a secure password - `TMDB_API_KEY`: If you want to be able to import metadata from TMDB + - `TWITCH_CLIENT_ID` and `TWITCH_CLIENT_SECRET`: If you want to import metadata from IGDB 4. Start the application: ```bash diff --git a/docker/.env.example b/docker/.env.example index c7963e3..dd011f3 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -16,6 +16,8 @@ DJANGO_SUPERUSER_USERNAME=admin DJANGO_SUPERUSER_EMAIL=admin@example.com DJANGO_SUPERUSER_PASSWORD=change-this-password -# API keys (optional - for importing metadata from TMDB) -# Leave empty to disable TMDB integration +# External services (optional - for importing metadata from TMDB and IGDB) +# Leave empty to disable external services integration TMDB_API_KEY= +TWITCH_CLIENT_ID= +TWITCH_CLIENT_SECRET= diff --git a/src/config/settings.py b/src/config/settings.py index 5c375c9..2b31ff7 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -223,6 +223,8 @@ # ============================================================================= TMDB_API_KEY = os.environ.get("TMDB_API_KEY", "") +TWITCH_CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "") +TWITCH_CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "") # ============================================================================= # Security Settings for Production (behind reverse proxy like Cloudflare Tunnel) diff --git a/src/core/services/igdb.py b/src/core/services/igdb.py new file mode 100644 index 0000000..c7a62ca --- /dev/null +++ b/src/core/services/igdb.py @@ -0,0 +1,284 @@ +""" +IGDB API client for fetching video game metadata. + +API Documentation: https://api-docs.igdb.com/ +Authentication: Uses Twitch OAuth2 - https://dev.twitch.tv/docs/authentication/ + +To use this API, you need to: +1. Create an application at https://dev.twitch.tv/console +2. Set TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET in your environment +""" + +import datetime +import logging +import time +from dataclasses import dataclass + +import requests +from django.conf import settings + +logger = logging.getLogger(__name__) + +IGDB_BASE_URL = "https://api.igdb.com/v4/" +TWITCH_AUTH_URL = "https://id.twitch.tv/oauth2/token" +IGDB_IMAGE_BASE_URL = "https://images.igdb.com/igdb/image/upload/" + +# Minimum query length for search +MIN_QUERY_LENGTH = 2 + +# Cache for access token (simple in-memory cache) +_token_cache: dict = {"access_token": None, "expires_at": 0} + + +class IGDBError(Exception): + """Exception raised when IGDB API credentials are missing or invalid.""" + + +@dataclass +class IGDBResult: + """Represents a search result from IGDB.""" + + igdb_id: int + name: str + year: int | None + summary: str + cover_url: str | None + cover_url_small: str | None + + +def _get_image_url(image_id: str | None, size: str = "cover_big") -> str | None: + """ + Build IGDB image URL from image ID. + + Size options: cover_small (90x128), cover_big (264x374), + screenshot_med (569x320), 720p, 1080p + """ + if not image_id: + return None + return f"{IGDB_IMAGE_BASE_URL}t_{size}/{image_id}.jpg" + + +def _escape_apicalypse_query(query: str) -> str: + """Escape user input for Apicalypse queries.""" + return query.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ").strip() + + +class IGDBClient: + """Client for interacting with the IGDB API.""" + + def __init__(self, client_id: str | None = None, client_secret: str | None = None): + self.client_id = client_id or getattr(settings, "TWITCH_CLIENT_ID", "") + self.client_secret = client_secret or getattr(settings, "TWITCH_CLIENT_SECRET", "") + + if not self.client_id or not self.client_secret: + msg = "TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET are required." + raise IGDBError(msg) + + def _get_access_token(self) -> str: + """ + Get a valid access token, refreshing if necessary. + + Uses Twitch's client credentials flow. + """ + # Check if cached token is still valid (with 60s buffer) + if _token_cache["access_token"] and _token_cache["expires_at"] > time.time() + 60: + return _token_cache["access_token"] + + # Request new token + try: + response = requests.post( + TWITCH_AUTH_URL, + params={ + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "client_credentials", + }, + timeout=10, + ) + response.raise_for_status() + data = response.json() + + _token_cache["access_token"] = data["access_token"] + _token_cache["expires_at"] = time.time() + data.get("expires_in", 3600) + + except requests.RequestException as e: + logger.exception("Failed to get Twitch access token") + msg = "Failed to authenticate with Twitch" + raise IGDBError(msg) from e + + return _token_cache["access_token"] + + def _request(self, endpoint: str, body: str) -> list[dict]: + """ + Make a request to the IGDB API. + + IGDB uses POST requests with a custom query language (Apicalypse). + """ + access_token = self._get_access_token() + + headers = { + "Client-ID": self.client_id, + "Authorization": f"Bearer {access_token}", + "Content-Type": "text/plain", + } + + url = f"{IGDB_BASE_URL}{endpoint}" + + try: + response = requests.post(url, headers=headers, data=body, timeout=10) + response.raise_for_status() + except requests.RequestException: + logger.exception("IGDB API request failed") + raise + + return response.json() + + def search_games(self, query: str, limit: int = 10) -> list[IGDBResult]: + """ + Search for video games. + + Args: + query: The search query + limit: Maximum number of results + + Returns: + List of IGDBResult objects + """ + if not query or len(query) < MIN_QUERY_LENGTH: + return [] + + # Apicalypse query language + # See: https://api-docs.igdb.com/#apicalypse + safe_query = _escape_apicalypse_query(query) + body = f""" + search "{safe_query}"; + fields name, first_release_date, summary, cover.image_id; + limit {limit}; + """ + + data = self._request("games", body) + + results = [] + for item in data: + # Extract year from Unix timestamp + release_date = item.get("first_release_date") + year = None + if release_date: + year = datetime.datetime.fromtimestamp(release_date, tz=datetime.UTC).year + + # Extract cover image ID + cover = item.get("cover", {}) + cover_image_id = cover.get("image_id") if isinstance(cover, dict) else None + + results.append( + IGDBResult( + igdb_id=item.get("id"), + name=item.get("name", ""), + year=year, + summary=item.get("summary", ""), + cover_url=_get_image_url(cover_image_id, "cover_big"), + cover_url_small=_get_image_url(cover_image_id, "cover_small"), + ) + ) + + return results + + def get_game_details(self, game_id: int) -> dict: + """ + Get detailed information about a game. + + Returns a dict with: + - name, year, summary + - developers: list of developer names + - publishers: list of publisher names + - genres: list of genre names + - cover_url: full URL for cover image + - igdb_url: URL to IGDB page + """ + body = f""" + fields name, first_release_date, summary, url, + cover.image_id, + involved_companies.company.name, involved_companies.developer, involved_companies.publisher, + genres.name; + where id = {game_id}; + """ + + data = self._request("games", body) + + if not data: + return {} + + game = data[0] + + # Extract year + release_date = game.get("first_release_date") + year = None + if release_date: + year = datetime.datetime.fromtimestamp(release_date, tz=datetime.UTC).year + + # Extract developers and publishers + developers = [] + publishers = [] + for company_info in game.get("involved_companies", []): + company = company_info.get("company", {}) + company_name = company.get("name") if isinstance(company, dict) else None + if company_name: + if company_info.get("developer"): + developers.append(company_name) + if company_info.get("publisher"): + publishers.append(company_name) + + # Extract genres + genres = [g.get("name") for g in game.get("genres", []) if g.get("name")] + + # Extract cover + cover = game.get("cover", {}) + cover_image_id = cover.get("image_id") if isinstance(cover, dict) else None + + return { + "title": game.get("name", ""), + "year": year, + "overview": game.get("summary", ""), + "developers": developers, + "publishers": publishers, + "contributors": developers, # Use developers as primary contributors + "genres": genres, + "cover_url": _get_image_url(cover_image_id, "cover_big"), + "igdb_url": game.get("url", f"https://www.igdb.com/games/{game_id}"), + "media_type": "game", + } + + def download_cover(self, cover_url: str) -> bytes | None: + """Download cover image and return bytes.""" + if not cover_url: + return None + + # Basic validation - ensure it's from IGDB + if not cover_url.startswith(IGDB_IMAGE_BASE_URL): + logger.warning("Invalid IGDB cover URL: %s", cover_url) + return None + + try: + response = requests.get(cover_url, timeout=15) + response.raise_for_status() + except requests.RequestException: + logger.exception("Failed to download cover from %s", cover_url) + return None + + return response.content + + +def get_igdb_client() -> IGDBClient | None: + """ + Factory function to get an IGDB client instance. + + Returns None if the API credentials are not configured. + """ + client_id = getattr(settings, "TWITCH_CLIENT_ID", "") + client_secret = getattr(settings, "TWITCH_CLIENT_SECRET", "") + + if not client_id or not client_secret: + logger.warning("IGDB API credentials not configured") + return None + + return IGDBClient() diff --git a/src/core/services/openlibrary.py b/src/core/services/openlibrary.py new file mode 100644 index 0000000..2691bcb --- /dev/null +++ b/src/core/services/openlibrary.py @@ -0,0 +1,267 @@ +""" +OpenLibrary API client for fetching book metadata. + +API Documentation: https://openlibrary.org/developers/api + +This API is free and does not require authentication. +Rate limiting: Please be respectful and limit requests to ~1/second. +""" + +import logging +import re +from dataclasses import dataclass +from urllib.parse import urlencode, urljoin + +import requests + +logger = logging.getLogger(__name__) + +OPENLIBRARY_BASE_URL = "https://openlibrary.org/" +OPENLIBRARY_COVERS_URL = "https://covers.openlibrary.org/" + +# Minimum query length for search +MIN_QUERY_LENGTH = 2 + +# Pattern for valid OpenLibrary cover URLs +OPENLIBRARY_COVER_PATTERN = re.compile(r"^https://covers\.openlibrary\.org/[baw]/(?:id|olid|isbn)/[^/]+\.jpg$") + +# Minimum size in bytes to consider a cover valid (OpenLibrary returns 1x1 pixel placeholder) +MIN_COVER_SIZE_BYTES = 1000 + + +class OpenLibraryError(Exception): + """Exception raised when OpenLibrary API request fails.""" + + +@dataclass +class OpenLibraryResult: + """Represents a search result from OpenLibrary.""" + + work_key: str # e.g., "/works/OL45883W" + title: str + authors: list[str] + year: int | None + cover_id: int | None + + @property + def olid(self) -> str: + """Extract the OpenLibrary ID from the work key.""" + return self.work_key.split("/")[-1] if self.work_key else "" + + @property + def cover_url(self) -> str | None: + """Returns the full URL for the cover image (medium size).""" + if self.cover_id: + return f"{OPENLIBRARY_COVERS_URL}b/id/{self.cover_id}-M.jpg" + return None + + @property + def cover_url_small(self) -> str | None: + """Returns a smaller cover URL for thumbnails.""" + if self.cover_id: + return f"{OPENLIBRARY_COVERS_URL}b/id/{self.cover_id}-S.jpg" + return None + + @property + def cover_url_large(self) -> str | None: + """Returns a larger cover URL.""" + if self.cover_id: + return f"{OPENLIBRARY_COVERS_URL}b/id/{self.cover_id}-L.jpg" + return None + + +class OpenLibraryClient: + """Client for interacting with the OpenLibrary API.""" + + def __init__(self): + # OpenLibrary doesn't require authentication + pass + + def _request(self, endpoint: str, params: dict | None = None) -> dict: + """Make a request to the OpenLibrary API.""" + params = params or {} + + url = urljoin(OPENLIBRARY_BASE_URL, endpoint) + if params: + url = f"{url}?{urlencode(params)}" + + try: + response = requests.get(url, timeout=10) + response.raise_for_status() + except requests.RequestException: + logger.exception("OpenLibrary API request failed") + raise + + return response.json() + + def search_books(self, query: str, limit: int = 10) -> list[OpenLibraryResult]: + """ + Search for books. + + Args: + query: The search query + limit: Maximum number of results + + Returns: + List of OpenLibraryResult objects + """ + if not query or len(query) < MIN_QUERY_LENGTH: + return [] + + data = self._request( + "search.json", + { + "q": query, + "limit": limit, + "fields": "key,title,author_name,first_publish_year,cover_i", + }, + ) + + results = [] + for doc in data.get("docs", []): + # Get first cover ID if available + cover_id = doc.get("cover_i") + + # Get authors list + authors = doc.get("author_name", []) + + results.append( + OpenLibraryResult( + work_key=doc.get("key", ""), + title=doc.get("title", ""), + authors=authors if isinstance(authors, list) else [authors], + year=doc.get("first_publish_year"), + cover_id=cover_id, + ) + ) + + return results + + def get_work_details(self, work_key: str, first_publish_year: int | None = None) -> dict: + """ + Get detailed information about a work (book). + + Args: + work_key: The work key (e.g., "/works/OL45883W" or just "OL45883W") + first_publish_year: Optional year from search results + + Returns a dict with: + - title, year, overview (description) + - authors: list of author names + - cover_url: full URL for cover image + - openlibrary_url: URL to OpenLibrary page + """ + # Normalize work key + if not work_key.startswith("/works/"): + work_key = f"/works/{work_key}" + + # Fetch work details + work_data = self._request(f"{work_key}.json") + + # Extract description + description = work_data.get("description", "") + if isinstance(description, dict): + description = description.get("value", "") + + # Get cover IDs + cover_ids = work_data.get("covers", []) + cover_id = cover_ids[0] if cover_ids else None + cover_url = f"{OPENLIBRARY_COVERS_URL}b/id/{cover_id}-L.jpg" if cover_id else None + + # Get authors - need to fetch each author + authors = [] + for author_ref in work_data.get("authors", []): + author_key = None + if isinstance(author_ref, dict): + # Can be {"author": {"key": "/authors/..."}} or {"key": "/authors/..."} + author_key = author_ref["author"].get("key") if "author" in author_ref else author_ref.get("key") + + if author_key: + try: + author_data = self._request(f"{author_key}.json") + if author_data.get("name"): + authors.append(author_data["name"]) + except requests.RequestException: + logger.warning("Failed to fetch author: %s", author_key) + + return { + "title": work_data.get("title", ""), + "year": first_publish_year, + "overview": description, + "authors": authors, + "contributors": authors, + "cover_url": cover_url, + "openlibrary_url": f"https://openlibrary.org{work_key}", + "media_type": "book", + } + + def get_book_by_isbn(self, isbn: str) -> dict | None: + """ + Get book details by ISBN. + + Args: + isbn: ISBN-10 or ISBN-13 + + Returns: + Book details dict or None if not found + """ + # Clean ISBN (remove dashes and spaces) + isbn = re.sub(r"[\s-]", "", isbn) + + try: + data = self._request(f"isbn/{isbn}.json") + except requests.RequestException: + return None + + if not data: + return None + + # ISBN endpoint returns an edition, get the work for full details + works = data.get("works", []) + if works: + work_key = works[0].get("key") + if work_key: + details = self.get_work_details(work_key) + # Override year with edition's publish date if available + publish_date = data.get("publish_date", "") + if publish_date: + # Try to extract year from various date formats + year_match = re.search(r"\b(1[89]\d{2}|20[0-2]\d)\b", publish_date) + if year_match: + details["year"] = int(year_match.group(1)) + return details + + return None + + def download_cover(self, cover_url: str) -> bytes | None: + """Download cover image and return bytes.""" + if not cover_url: + return None + + # Basic validation - ensure it's from OpenLibrary + if not OPENLIBRARY_COVER_PATTERN.match(cover_url): + logger.warning("Invalid OpenLibrary cover URL: %s", cover_url) + return None + + try: + response = requests.get(cover_url, timeout=15) + response.raise_for_status() + except requests.RequestException: + logger.exception("Failed to download cover from %s", cover_url) + return None + + # OpenLibrary returns a 1x1 pixel if cover doesn't exist + if len(response.content) < MIN_COVER_SIZE_BYTES: + logger.warning("Cover not available (placeholder returned): %s", cover_url) + return None + + return response.content + + +def get_openlibrary_client() -> OpenLibraryClient: + """ + Factory function to get an OpenLibrary client instance. + + OpenLibrary doesn't require authentication, so this always returns a client. + """ + return OpenLibraryClient() diff --git a/src/core/services/tmdb.py b/src/core/services/tmdb.py index 173b6f6..af6e491 100644 --- a/src/core/services/tmdb.py +++ b/src/core/services/tmdb.py @@ -4,13 +4,10 @@ API Documentation: https://developer.themoviedb.org/docs """ -import ipaddress import logging -import re -import socket from dataclasses import dataclass from typing import Literal -from urllib.parse import urlencode, urljoin, urlparse +from urllib.parse import urlencode, urljoin import requests from django.conf import settings @@ -20,11 +17,6 @@ TMDB_BASE_URL = "https://api.themoviedb.org/3/" TMDB_IMAGE_BASE_URL = "https://image.tmdb.org/t/p/" -# Allowed hosts for TMDB image downloads (SSRF protection) -TMDB_ALLOWED_IMAGE_HOSTS = frozenset({"image.tmdb.org"}) -# Pattern for valid TMDB poster paths (e.g., /t/p/w500/abc123.jpg) -TMDB_POSTER_PATH_PATTERN = re.compile(r"^/t/p/w\d+/[a-zA-Z0-9]+\.[a-z]{3,4}$") - # Minimum query length for search MIN_QUERY_LENGTH = 2 # Minimum date string length for year extraction (YYYY) @@ -47,21 +39,21 @@ class TMDBResult: original_title: str year: int | None overview: str - poster_path: str | None + cover_path: str | None media_type: Literal["movie", "tv"] @property - def poster_url(self) -> str | None: - """Returns the full URL for the poster image (w500 size).""" - if self.poster_path: - return f"{TMDB_IMAGE_BASE_URL}w500{self.poster_path}" + def cover_url(self) -> str | None: + """Returns the full URL for the cover image (w500 size).""" + if self.cover_path: + return f"{TMDB_IMAGE_BASE_URL}w500{self.cover_path}" return None @property - def poster_url_small(self) -> str | None: - """Returns a smaller poster URL for thumbnails (w185 size).""" - if self.poster_path: - return f"{TMDB_IMAGE_BASE_URL}w185{self.poster_path}" + def cover_url_small(self) -> str | None: + """Returns a smaller cover URL for thumbnails (w185 size).""" + if self.cover_path: + return f"{TMDB_IMAGE_BASE_URL}w185{self.cover_path}" return None @@ -133,7 +125,7 @@ def search_multi(self, query: str, language: str = "fr-FR", page: int = 1) -> li original_title=original_title, year=year, overview=item.get("overview", ""), - poster_path=item.get("poster_path"), + cover_path=item.get("poster_path"), media_type=media_type, ) ) @@ -156,7 +148,7 @@ def get_full_details(self, tmdb_id: int, media_type: Literal["movie", "tv"], lan - title, original_title, year, overview - directors: list of director names - production_companies: list of company names - - poster_url: full URL for poster image + - cover_url: full URL for cover image - tmdb_url: URL to TMDB page """ if media_type == "movie": @@ -183,9 +175,9 @@ def get_full_details(self, tmdb_id: int, media_type: Literal["movie", "tv"], lan # Get genres genres = [g["name"] for g in data.get("genres", [])] - # Build poster URL - poster_path = data.get("poster_path") - poster_url = f"{TMDB_IMAGE_BASE_URL}w500{poster_path}" if poster_path else None + # Build cover URL + cover_path = data.get("poster_path") + cover_url = f"{TMDB_IMAGE_BASE_URL}w500{cover_path}" if cover_path else None # Build TMDB URL tmdb_url = f"https://www.themoviedb.org/{media_type}/{tmdb_id}" @@ -198,75 +190,26 @@ def get_full_details(self, tmdb_id: int, media_type: Literal["movie", "tv"], lan "directors": directors, "production_companies": production_companies, "genres": genres, - "poster_url": poster_url, + "cover_url": cover_url, "tmdb_url": tmdb_url, "media_type": media_type, } - def _validate_poster_url(self, poster_url: str) -> bool: - """ - Validate that the poster URL is a legitimate TMDB image URL. - - Returns True if the URL is valid and safe to fetch, False otherwise. - """ - try: - parsed = urlparse(poster_url) - except ValueError: - logger.warning("Invalid URL format rejected: %s", poster_url) - return False - - # Validate scheme, host, and path pattern - if parsed.scheme != "https": - logger.warning("Non-HTTPS URL rejected: %s", poster_url) - return False - if parsed.hostname not in TMDB_ALLOWED_IMAGE_HOSTS: - logger.warning("URL with disallowed host rejected: %s", poster_url) - return False - if not TMDB_POSTER_PATH_PATTERN.match(parsed.path): - logger.warning("URL with invalid path pattern rejected: %s", poster_url) - return False - - # DNS resolution check: reject private/reserved IP ranges - return self._validate_resolved_ip(poster_url, parsed.hostname) - - def _validate_resolved_ip(self, poster_url: str, hostname: str) -> bool: - """Check that hostname does not resolve to private/reserved IP ranges.""" - try: - resolved_ips = socket.getaddrinfo(hostname, 443, proto=socket.IPPROTO_TCP) - for _, _, _, _, sockaddr in resolved_ips: - ip_str = sockaddr[0] - ip = ipaddress.ip_address(ip_str) - if ip.is_private or ip.is_reserved or ip.is_loopback or ip.is_link_local: - logger.warning("URL resolving to private/reserved IP rejected: %s -> %s", poster_url, ip_str) - return False - except (socket.gaierror, ValueError) as e: - logger.warning("DNS resolution failed for URL %s: %s", poster_url, e) - return False - return True - - def download_poster(self, poster_url: str) -> bytes | None: - """ - Download poster image and return bytes. - - Validates the URL against TMDB allowlist and performs security checks - to prevent SSRF attacks before downloading. - """ - if not poster_url: + def download_cover(self, cover_url: str) -> bytes | None: + """Download cover image and return bytes.""" + if not cover_url: return None - if not self._validate_poster_url(poster_url): + # Validate URL is from TMDB image CDN + if not cover_url.startswith(TMDB_IMAGE_BASE_URL): + logger.warning("Invalid TMDB cover URL: %s", cover_url) return None try: - response = requests.get(poster_url, timeout=15, allow_redirects=False) + response = requests.get(cover_url, timeout=15) response.raise_for_status() except requests.RequestException: - logger.exception("Failed to download poster from %s", poster_url) - return None - - # Check for redirect responses (3xx status codes) - if response.is_redirect or response.is_permanent_redirect: - logger.warning("Redirect response rejected for URL: %s", poster_url) + logger.exception("Failed to download cover from %s", cover_url) return None return response.content diff --git a/src/core/urls.py b/src/core/urls.py index 48807c1..e025734 100644 --- a/src/core/urls.py +++ b/src/core/urls.py @@ -16,6 +16,8 @@ path("tags/search-htmx/", views.tag_search_htmx, name="tag_search_htmx"), path("tags/select-htmx/", views.tag_select_htmx, name="tag_select_htmx"), path("tmdb-search/", views.tmdb_search_htmx, name="tmdb_search_htmx"), + path("igdb-search/", views.igdb_search_htmx, name="igdb_search_htmx"), + path("openlibrary-search/", views.openlibrary_search_htmx, name="openlibrary_search_htmx"), path("media/validate_field/", validate_media_field, name="media_validate_field"), path("media//review-full/", views.media_review_full_htmx, name="media_review_full_htmx"), path("media//review-clamped/", views.media_review_clamped_htmx, name="media_review_clamped_htmx"), diff --git a/src/core/views.py b/src/core/views.py index 5db7385..bd115cf 100644 --- a/src/core/views.py +++ b/src/core/views.py @@ -19,15 +19,17 @@ from .forms import MediaForm from .models import Agent, Media, SavedView, Tag from .queries import build_media_context +from .services.igdb import get_igdb_client +from .services.openlibrary import get_openlibrary_client from .services.tmdb import get_tmdb_client from .utils import create_backup, delete_orphan_agents_by_ids logger = logging.getLogger(__name__) -# TMDB constants +# Search constants DEFAULT_TMDB_LANGUAGE = "en-US" MIN_SEARCH_QUERY_LENGTH = 2 -MAX_TMDB_RESULTS = 8 +MAX_SEARCH_RESULTS = 15 @login_required @@ -118,23 +120,62 @@ def _process_new_tags(post_data): return post_data, errors -def _handle_tmdb_poster(request, instance): - """Download and attach TMDB poster if provided.""" - tmdb_poster_url = request.POST.get("tmdb_poster_url") - if tmdb_poster_url and not request.FILES.get("cover"): - poster_bytes = _download_tmdb_poster(tmdb_poster_url) - if poster_bytes: +def _handle_import_cover(request, instance): + """Download and attach cover from import source if provided.""" + cover_url = request.POST.get("import_cover_url") + if cover_url and not request.FILES.get("cover"): + cover_bytes = _download_cover(cover_url) + if cover_bytes: filename = f"{instance.title[:50].replace('/', '_')}.jpg" - instance.cover.save(filename, ContentFile(poster_bytes), save=False) + instance.cover.save(filename, ContentFile(cover_bytes), save=False) -def _build_tmdb_initial_data(tmdb_data: dict, media_type: str, media=None) -> dict: - """Build form initial data from TMDB data, optionally merging with existing media.""" +def _download_cover(cover_url: str) -> bytes | None: + """Download cover image from any supported source.""" + if not cover_url: + return None + + # Determine source and use appropriate client + if "image.tmdb.org" in cover_url: + client = get_tmdb_client() + if client: + return client.download_cover(cover_url) + elif "images.igdb.com" in cover_url: + client = get_igdb_client() + if client: + return client.download_cover(cover_url) + elif "covers.openlibrary.org" in cover_url: + client = get_openlibrary_client() + return client.download_cover(cover_url) + + return None + + +def _build_import_initial_data(import_data: dict, media=None) -> dict: + """Build form initial data from import data, optionally merging with existing media.""" + # Determine media_type based on source + source_media_type = import_data.get("media_type", "") + if source_media_type == "movie": + media_type = "FILM" + elif source_media_type == "tv": + media_type = "TV" + elif source_media_type == "game": + media_type = "GAME" + elif source_media_type == "book": + media_type = "BOOK" + else: + media_type = "" + + # Determine external URI + external_uri = ( + import_data.get("tmdb_url") or import_data.get("igdb_url") or import_data.get("openlibrary_url") or "" + ) + initial_data = { - "title": tmdb_data.get("title", ""), - "pub_year": tmdb_data.get("year"), - "media_type": "FILM" if media_type == "movie" else "TV", - "external_uri": tmdb_data.get("tmdb_url", ""), + "title": import_data.get("title", ""), + "pub_year": import_data.get("year"), + "media_type": media_type, + "external_uri": external_uri, } if media: # Keep existing values for fields user may have customized @@ -148,9 +189,9 @@ def _build_tmdb_initial_data(tmdb_data: dict, media_type: str, media=None) -> di @login_required def media_edit(request, pk=None): media = get_object_or_404(Media, pk=pk) if pk else None - tmdb_data = None - tmdb_contributors = [] - tmdb_tags = [] + import_data = None + import_contributors = [] + import_tags = [] if request.method == "POST": before_contributor_ids = set(media.contributors.values_list("pk", flat=True)) if media else set() @@ -164,7 +205,7 @@ def media_edit(request, pk=None): form = MediaForm(post_data, request.FILES, instance=media) if form.is_valid(): instance = form.save(commit=False) - _handle_tmdb_poster(request, instance) + _handle_import_cover(request, instance) instance.save() form.save_m2m() @@ -178,38 +219,47 @@ def media_edit(request, pk=None): messages.success(request, _(msg_key) % {"title": instance.title}) return redirect("media_detail", pk=instance.pk) else: + # Check for import parameters from different sources tmdb_id = request.GET.get("tmdb_id") media_type = request.GET.get("media_type") lang = request.GET.get("lang", DEFAULT_TMDB_LANGUAGE) + igdb_id = request.GET.get("igdb_id") + openlibrary_key = request.GET.get("openlibrary_key") if tmdb_id and media_type in ("movie", "tv"): - tmdb_data = _fetch_tmdb_data(tmdb_id, media_type, language=lang) - if tmdb_data: - initial_data = _build_tmdb_initial_data(tmdb_data, media_type, media) - form = MediaForm(initial=initial_data, instance=media) - - # Filter out TMDB contributors/tags that already exist on the media - existing_contributor_names = set() - existing_tag_names = set() - if media: - existing_contributor_names = {c.name.lower() for c in media.contributors.all()} - existing_tag_names = {t.name.lower() for t in media.tags.all()} - - tmdb_contributors = [ - name for name in tmdb_data.get("contributors", []) if name.lower() not in existing_contributor_names - ] - tmdb_tags = [name for name in tmdb_data.get("genres", []) if name.lower() not in existing_tag_names] - else: - form = MediaForm(instance=media) + import_data = _fetch_tmdb_data(tmdb_id, media_type, language=lang) + elif igdb_id: + import_data = _fetch_igdb_data(igdb_id) + elif openlibrary_key: + # Get year from search results if available + openlibrary_year = request.GET.get("year") + year = int(openlibrary_year) if openlibrary_year and openlibrary_year.isdigit() else None + import_data = _fetch_openlibrary_data(openlibrary_key, year=year) + + if import_data: + initial_data = _build_import_initial_data(import_data, media) + form = MediaForm(initial=initial_data, instance=media) + + # Filter out contributors/tags that already exist on the media + existing_contributor_names = set() + existing_tag_names = set() + if media: + existing_contributor_names = {c.name.lower() for c in media.contributors.all()} + existing_tag_names = {t.name.lower() for t in media.tags.all()} + + import_contributors = [ + name for name in import_data.get("contributors", []) if name.lower() not in existing_contributor_names + ] + import_tags = [name for name in import_data.get("genres", []) if name.lower() not in existing_tag_names] else: form = MediaForm(instance=media) context = { "media": media, "form": form, - "tmdb_data": tmdb_data, - "tmdb_contributors": tmdb_contributors, - "tmdb_tags": tmdb_tags, + "import_data": import_data, + "import_contributors": import_contributors, + "import_tags": import_tags, } return render(request, "base/media_edit.html", context) @@ -232,12 +282,32 @@ def _fetch_tmdb_data(tmdb_id: str, media_type: str, language: str = DEFAULT_TMDB return details -def _download_tmdb_poster(poster_url: str) -> bytes | None: - """Download poster image from TMDB.""" - client = get_tmdb_client() +def _fetch_igdb_data(igdb_id: str) -> dict | None: + """Fetch IGDB data for pre-filling the form.""" + client = get_igdb_client() if not client: return None - return client.download_poster(poster_url) + + try: + details = client.get_game_details(int(igdb_id)) + except (requests.RequestException, ValueError): + logger.exception("Failed to fetch IGDB data for game %s", igdb_id) + return None + + return details + + +def _fetch_openlibrary_data(work_key: str, year: int | None = None) -> dict | None: + """Fetch OpenLibrary data for pre-filling the form.""" + client = get_openlibrary_client() + + try: + details = client.get_work_details(work_key, first_publish_year=year) + except requests.RequestException: + logger.exception("Failed to fetch OpenLibrary data for work %s", work_key) + return None + + return details @login_required @@ -322,7 +392,7 @@ def tmdb_search_htmx(request): media_id = request.GET.get("media_id") # For editing existing media lang = request.GET.get("lang", DEFAULT_TMDB_LANGUAGE) - base_context = {"results": [], "media_id": media_id, "lang": lang} + base_context = {"results": [], "media_id": media_id, "lang": lang, "query": query} if len(query) < MIN_SEARCH_QUERY_LENGTH: return render(request, "partials/tmdb/tmdb_suggestions.html", base_context) @@ -337,7 +407,7 @@ def tmdb_search_htmx(request): ) try: - results = client.search_multi(query, language=lang)[:MAX_TMDB_RESULTS] + results = client.search_multi(query, language=lang)[:MAX_SEARCH_RESULTS] except requests.RequestException: logger.exception("TMDB search failed") return render( @@ -349,6 +419,65 @@ def tmdb_search_htmx(request): return render(request, "partials/tmdb/tmdb_suggestions.html", {**base_context, "results": results}) +@login_required +def igdb_search_htmx(request): + """HTMX view: search IGDB for video games.""" + query = request.GET.get("q", "").strip() + media_id = request.GET.get("media_id") + + base_context = {"results": [], "media_id": media_id, "query": query} + + if len(query) < MIN_SEARCH_QUERY_LENGTH: + return render(request, "partials/igdb/igdb_suggestions.html", base_context) + + client = get_igdb_client() + if not client: + logger.warning("IGDB search attempted but API credentials not configured") + return render( + request, + "partials/igdb/igdb_suggestions.html", + {**base_context, "error": "IGDB API credentials not configured"}, + ) + + try: + results = client.search_games(query, limit=MAX_SEARCH_RESULTS) + except requests.RequestException: + logger.exception("IGDB search failed") + return render( + request, + "partials/igdb/igdb_suggestions.html", + {**base_context, "error": "Search failed"}, + ) + + return render(request, "partials/igdb/igdb_suggestions.html", {**base_context, "results": results}) + + +@login_required +def openlibrary_search_htmx(request): + """HTMX view: search OpenLibrary for books.""" + query = request.GET.get("q", "").strip() + media_id = request.GET.get("media_id") + + base_context = {"results": [], "media_id": media_id, "query": query} + + if len(query) < MIN_SEARCH_QUERY_LENGTH: + return render(request, "partials/openlibrary/openlibrary_suggestions.html", base_context) + + client = get_openlibrary_client() + + try: + results = client.search_books(query, limit=MAX_SEARCH_RESULTS) + except requests.RequestException: + logger.exception("OpenLibrary search failed") + return render( + request, + "partials/openlibrary/openlibrary_suggestions.html", + {**base_context, "error": "Search failed"}, + ) + + return render(request, "partials/openlibrary/openlibrary_suggestions.html", {**base_context, "results": results}) + + @login_required def media_review_clamped_htmx(request, pk): """HTMX view: return clamped review for a media item (for table cell collapse).""" diff --git a/src/static/js/media_edit.js b/src/static/js/media_edit.js index 8c4cd5a..c9accb8 100644 --- a/src/static/js/media_edit.js +++ b/src/static/js/media_edit.js @@ -68,7 +68,7 @@ document.addEventListener('DOMContentLoaded', () => { const btn = document.createElement('button'); btn.type = 'button'; - btn.className = 'btn btn-ghost btn-xs btn-circle'; + btn.className = 'btn btn-neutral btn-ghost btn-xs btn-circle'; btn.dataset.action = 'remove-chip'; btn.textContent = '✕'; diff --git a/src/templates/base/media_edit.html b/src/templates/base/media_edit.html index 899fe83..f795d9d 100644 --- a/src/templates/base/media_edit.html +++ b/src/templates/base/media_edit.html @@ -40,9 +40,9 @@

+ {# Main content #}
{# Left column - Cover and metadata #} @@ -61,22 +65,17 @@

- {# Show TMDB poster if available #} - {% if tmdb_data.poster_url %} + {% if import_data.cover_url %}
- {% translate 'Cover from TMDB' %} - + id="import-poster-preview">

{% if media.cover %} {% translate "This will replace the current cover" %} {% else %} - {% translate "Cover will be imported from TMDB" %} + {% translate "Cover will be imported" %} {% endif %}

@@ -114,10 +113,10 @@

{% for contributor in media.contributors.all %} {% include "partials/contributors/contributor_chip.html" with agent=contributor %} {% endfor %} - {# TMDB contributors (pre-filled as new) #} - {% for name in tmdb_contributors %} + {# Imported contributors (pre-filled as new) #} + {% for name in import_contributors %} - {{ name }} + {{ name }} @@ -154,10 +153,10 @@

{% for tag in media.tags.all %} {% include "partials/tags/tag_chip.html" %} {% endfor %} - {# TMDB tags (pre-filled as new) #} - {% for name in tmdb_tags %} + {# Imported tags (pre-filled as new) #} + {% for name in import_tags %} - {{ name }} + {{ name }} diff --git a/src/templates/base/media_import.html b/src/templates/base/media_import.html index ba34b05..9e3544d 100644 --- a/src/templates/base/media_import.html +++ b/src/templates/base/media_import.html @@ -1,7 +1,7 @@ {% extends "base/base.html" %} {% load i18n %} {% block title %} - {% translate "Import from TMDB" %} - Datakult + {% translate "Import metadata" %} - Datakult {% endblock title %} {% block content %}
@@ -10,11 +10,38 @@ {% lucide "arrow-left" %} -

{% translate "Import from TMDB" %}

+

{% translate "Import metadata" %}

- {# Search form #} -
+ {# Source selector tabs #} +
+ + + +
+ {# TMDB Search (Movies & TV) #} +
+
+ {% lucide "film" class="w-5 h-5" %} + TMDB + - {% translate "Movies & TV shows" %} +
{% translate "Import from TMDB" %}

autocomplete="off" hx-get="{% url 'tmdb_search_htmx' %}{% if media_id %}?media_id={{ media_id }}{% endif %}" hx-trigger="keyup changed delay:400ms, search" - hx-target="#tmdb-results" + hx-target="#import-results" hx-include="#tmdb-lang" hx-indicator="#search-spinner" />

@@ -48,16 +75,74 @@

{% translate "Import from TMDB" %}

+ {# IGDB Search (Video games) #} + + {# OpenLibrary Search (Books) #} + {# Loading indicator #}
{# Results #} -
+
{# Manual entry link #}

{% translate "Can't find what you're looking for?" %}

- + {% lucide "square-pen" class="w-4 h-4" %} {% translate "Add manually" %} diff --git a/src/templates/partials/contributors/contributor_chip.html b/src/templates/partials/contributors/contributor_chip.html index ea80934..64594f9 100644 --- a/src/templates/partials/contributors/contributor_chip.html +++ b/src/templates/partials/contributors/contributor_chip.html @@ -1,9 +1,16 @@ {% load i18n %} {% if agent %} - + {{ agent.name }} - - + + {% elif error %} {{ error }} diff --git a/src/templates/partials/igdb/igdb_suggestions.html b/src/templates/partials/igdb/igdb_suggestions.html new file mode 100644 index 0000000..4c43d3c --- /dev/null +++ b/src/templates/partials/igdb/igdb_suggestions.html @@ -0,0 +1,41 @@ +{% load i18n %} +{% if error %} +
+ {% lucide "circle-alert" class="w-5 h-5" %} + {{ error }} +
+{% elif results %} + +{% elif query %} +
+ {% lucide "search-x" class="w-5 h-5" %} + {% translate "No results found" %} +
+{% endif %} diff --git a/src/templates/partials/openlibrary/openlibrary_suggestions.html b/src/templates/partials/openlibrary/openlibrary_suggestions.html new file mode 100644 index 0000000..776426d --- /dev/null +++ b/src/templates/partials/openlibrary/openlibrary_suggestions.html @@ -0,0 +1,48 @@ +{% load i18n %} +{% if error %} +
+ {% lucide "circle-alert" class="w-5 h-5" %} + {{ error }} +
+{% elif results %} + +{% elif query %} +
+ {% lucide "search-x" class="w-5 h-5" %} + {% translate "No results found" %} +
+{% endif %} diff --git a/src/templates/partials/tags/tag_chip.html b/src/templates/partials/tags/tag_chip.html index a746bbe..871d0b6 100644 --- a/src/templates/partials/tags/tag_chip.html +++ b/src/templates/partials/tags/tag_chip.html @@ -1,9 +1,16 @@ {% load i18n %} {% if tag %} - - {{ tag.name }} - - + + {{ tag.name }} + + {% elif error %} {{ error }} diff --git a/src/templates/partials/tmdb/tmdb_suggestions.html b/src/templates/partials/tmdb/tmdb_suggestions.html index eb28f03..4b7c8eb 100644 --- a/src/templates/partials/tmdb/tmdb_suggestions.html +++ b/src/templates/partials/tmdb/tmdb_suggestions.html @@ -1,7 +1,7 @@ {% load i18n %} {% if error %}
- {% lucide "alert-circle" class="w-5 h-5" %} + {% lucide "circle-alert" class="w-5 h-5" %} {{ error }}
{% elif results %} @@ -10,8 +10,8 @@
- {% if result.poster_url_small %} - {{ result.title }} diff --git a/src/tests/core/test_tmdb.py b/src/tests/core/test_tmdb.py index 871fcf3..821ba62 100644 --- a/src/tests/core/test_tmdb.py +++ b/src/tests/core/test_tmdb.py @@ -16,49 +16,49 @@ class TestTMDBResult: - """Tests for the TMDBResult dataclass poster URL generation.""" + """Tests for the TMDBResult dataclass cover URL generation.""" - def test_poster_url_builds_w500_url(self): - """poster_url constructs the correct w500 image URL.""" + def test_cover_url_builds_w500_url(self): + """cover_url constructs the correct w500 image URL.""" result = TMDBResult( tmdb_id=123, title="Test", original_title="Test", year=2024, overview="", - poster_path="/abc123.jpg", + cover_path="/abc123.jpg", media_type="movie", ) - assert result.poster_url == f"{TMDB_IMAGE_BASE_URL}w500/abc123.jpg" + assert result.cover_url == f"{TMDB_IMAGE_BASE_URL}w500/abc123.jpg" - def test_poster_url_returns_none_when_no_path(self): - """poster_url returns None when poster_path is None.""" + def test_cover_url_returns_none_when_no_path(self): + """cover_url returns None when cover_path is None.""" result = TMDBResult( tmdb_id=123, title="Test", original_title="Test", year=2024, overview="", - poster_path=None, + cover_path=None, media_type="movie", ) - assert result.poster_url is None + assert result.cover_url is None - def test_poster_url_small_builds_w185_url(self): - """poster_url_small constructs the correct w185 thumbnail URL.""" + def test_cover_url_small_builds_w185_url(self): + """cover_url_small constructs the correct w185 thumbnail URL.""" result = TMDBResult( tmdb_id=123, title="Test", original_title="Test", year=2024, overview="", - poster_path="/abc123.jpg", + cover_path="/abc123.jpg", media_type="movie", ) - assert result.poster_url_small == f"{TMDB_IMAGE_BASE_URL}w185/abc123.jpg" + assert result.cover_url_small == f"{TMDB_IMAGE_BASE_URL}w185/abc123.jpg" class TestTMDBClientValidation: