From 732cc2a53010ffc6283e846939205c3a4242ddc0 Mon Sep 17 00:00:00 2001 From: Gravel Freeman <44218371+gravelfreeman@users.noreply.github.com> Date: Mon, 19 Jan 2026 13:12:27 -0500 Subject: [PATCH 1/6] add support for importing metadata from a local launchbox installation --- .dockerignore | 50 + backend/endpoints/sockets/scan.py | 9 + backend/handler/metadata/launchbox_handler.py | 1085 ++++++++++++++--- backend/handler/scan_handler.py | 15 +- .../scheduled/update_launchbox_metadata.py | 6 +- frontend/src/locales/en_US/scan.json | 1 + frontend/src/locales/fr_FR/scan.json | 1 + frontend/src/views/Scan.vue | 28 + 8 files changed, 1009 insertions(+), 186 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..880feeeef --- /dev/null +++ b/.dockerignore @@ -0,0 +1,50 @@ +# Local secrets / env +.env +**/.env + +# Git / editor / OS +.git +.gitignore +.gitattributes +**/.DS_Store + +# IDE / dev containers +.vscode/ +.devcontainer/ + +# Python virtualenvs & caches +.venv/ +**/.venv/ +**/venv/ +**/__pycache__/ +**/.mypy_cache/ +**/.pytest_cache/ +**/.ruff_cache/ + +# Node caches +**/node_modules/ +**/.npm/ +**/.vite/ + +# CI / metadata not needed in image +.github/ + +# Mock & test data (requested) +romm_mock/ +**/romm_mock/ +backend/tests/ +**/romm_test/ + +# Local-only docker compose files/examples (not needed in image) +docker-compose.yml +examples/ + +# Local tools / docs not required for runtime +**/*.md +CODE_OF_CONDUCT.md +CONTRIBUTING.md +DEVELOPER_SETUP.md +SECURITY.md + +# Note: we intentionally do NOT ignore `docker/` because `docker/Dockerfile` +# copies init scripts and nginx/gunicorn configs from that folder. diff --git a/backend/endpoints/sockets/scan.py b/backend/endpoints/sockets/scan.py index a027a726a..1137e91e9 100644 --- a/backend/endpoints/sockets/scan.py +++ b/backend/endpoints/sockets/scan.py @@ -230,6 +230,7 @@ async def _identify_rom( scan_type: ScanType, roms_ids: list[int], metadata_sources: list[str], + launchbox_remote_enabled: bool, socket_manager: socketio.AsyncRedisManager, scan_stats: ScanStats, calculate_hashes: bool = True, @@ -321,6 +322,7 @@ async def _identify_rom( fs_rom=fs_rom, metadata_sources=metadata_sources, newly_added=newly_added, + launchbox_remote_enabled=launchbox_remote_enabled, socket_manager=socket_manager, ) @@ -450,6 +452,7 @@ async def _identify_platform( fs_platforms: list[str], roms_ids: list[int], metadata_sources: list[str], + launchbox_remote_enabled: bool, socket_manager: socketio.AsyncRedisManager, scan_stats: ScanStats, calculate_hashes: bool = True, @@ -539,6 +542,7 @@ async def scan_rom_with_semaphore(fs_rom: FSRom, rom: Rom | None) -> None: scan_type=scan_type, roms_ids=roms_ids, metadata_sources=metadata_sources, + launchbox_remote_enabled=launchbox_remote_enabled, socket_manager=socket_manager, scan_stats=scan_stats, calculate_hashes=calculate_hashes, @@ -589,6 +593,7 @@ async def scan_platforms( metadata_sources: list[str], scan_type: ScanType = ScanType.QUICK, roms_ids: list[int] | None = None, + launchbox_remote_enabled: bool = True, ) -> ScanStats: """Scan all the listed platforms and fetch metadata from different sources @@ -663,6 +668,7 @@ async def stop_scan(): fs_platforms=fs_platforms, roms_ids=roms_ids, metadata_sources=metadata_sources, + launchbox_remote_enabled=launchbox_remote_enabled, socket_manager=socket_manager, scan_stats=scan_stats, calculate_hashes=calculate_hashes, @@ -702,6 +708,7 @@ async def scan_handler(_sid: str, options: dict[str, Any]): scan_type = ScanType[options.get("type", "quick").upper()] roms_ids = options.get("roms_ids", []) metadata_sources = options.get("apis", []) + launchbox_remote_enabled = bool(options.get("launchbox_remote_enabled", True)) if DEV_MODE: return await scan_platforms( @@ -709,6 +716,7 @@ async def scan_handler(_sid: str, options: dict[str, Any]): metadata_sources=metadata_sources, scan_type=scan_type, roms_ids=roms_ids, + launchbox_remote_enabled=launchbox_remote_enabled, ) return high_prio_queue.enqueue( @@ -717,6 +725,7 @@ async def scan_handler(_sid: str, options: dict[str, Any]): metadata_sources=metadata_sources, scan_type=scan_type, roms_ids=roms_ids, + launchbox_remote_enabled=launchbox_remote_enabled, job_timeout=SCAN_TIMEOUT, # Timeout (default of 4 hours) result_ttl=TASK_RESULT_TTL, meta={ diff --git a/backend/handler/metadata/launchbox_handler.py b/backend/handler/metadata/launchbox_handler.py index c8cd5f6eb..14c7c34d6 100644 --- a/backend/handler/metadata/launchbox_handler.py +++ b/backend/handler/metadata/launchbox_handler.py @@ -1,11 +1,14 @@ import json import re +from dataclasses import dataclass from datetime import datetime +from pathlib import Path, PureWindowsPath from typing import Final, NotRequired, TypedDict import pydash +from defusedxml import ElementTree as ET -from config import LAUNCHBOX_API_ENABLED +from config import LAUNCHBOX_API_ENABLED, ROMM_BASE_PATH from handler.redis_handler import async_cache from logger.logger import log from utils.database import safe_str_to_bool @@ -23,14 +26,13 @@ LAUNCHBOX_MAME_KEY: Final[str] = "romm:launchbox_mame" LAUNCHBOX_FILES_KEY: Final[str] = "romm:launchbox_files" -# Regex to detect LaunchBox ID tags in filenames like (launchbox-12345) -LAUNCHBOX_TAG_REGEX = re.compile(r"\(launchbox-(\d+)\)", re.IGNORECASE) +LAUNCHBOX_LOCAL_DIR: Final[Path] = Path(ROMM_BASE_PATH) / "temp" +LAUNCHBOX_PLATFORMS_DIR: Final[Path] = LAUNCHBOX_LOCAL_DIR / "Data" / "Platforms" +LAUNCHBOX_IMAGES_DIR: Final[Path] = LAUNCHBOX_LOCAL_DIR / "Images" +LAUNCHBOX_MANUALS_DIR: Final[Path] = LAUNCHBOX_LOCAL_DIR / "Manuals" +LAUNCHBOX_VIDEOS_DIR: Final[Path] = LAUNCHBOX_LOCAL_DIR / "Videos" - -class LaunchboxPlatform(TypedDict): - slug: str - launchbox_id: int | None - name: NotRequired[str] +LOCAL_XML_INDEX_CACHE: dict[str, tuple[int, dict[str, dict[str, str]]]] = {} class LaunchboxImage(TypedDict): @@ -39,6 +41,13 @@ class LaunchboxImage(TypedDict): region: NotRequired[str] +class LaunchboxPlatform(TypedDict): + slug: str + launchbox_id: int | None + name: NotRequired[str] + images: NotRequired[list[LaunchboxImage]] + + class LaunchboxMetadata(TypedDict): first_release_date: int | None max_players: NotRequired[int] @@ -59,11 +68,106 @@ class LaunchboxRom(BaseRom): launchbox_metadata: NotRequired[LaunchboxMetadata] -def extract_video_id_from_youtube_url(url: str | None) -> str: - """ - Extracts the video ID from a YouTube URL. - Returns None if the URL is not a valid YouTube URL. - """ +def _sanitize_filename(stem: str) -> str: + s = (stem or "").strip() + s = s.replace("’", "'") + s = re.sub(r"[:']", "_", s) + s = re.sub(r"[\\/|<>\"?*]", "_", s) + s = re.sub(r"\s+", " ", s) + s = re.sub(r"_+", "_", s) + return s.strip(" .") + + +def _file_uri_for_local_path(path: Path) -> str | None: + try: + _ = path.resolve().relative_to(LAUNCHBOX_LOCAL_DIR.resolve()) + except Exception: + return None + return f"file://{str(path)}" + + +def _safe_int(value: object) -> int | None: + if value is None: + return None + + try: + as_str = str(value).strip() + if not as_str: + return None + parsed = int(as_str) + return parsed or None + except (TypeError, ValueError): + return None + + +def _coalesce(*values: object | None) -> str | None: + for v in values: + if v is None: + continue + s = str(v).strip() + if s: + return s + return None + + +def _parse_list(value: str | None) -> list[str]: + if not value: + return [] + parts = re.split(r"[;,]", value) + return [p.strip() for p in parts if p and p.strip()] + + +def _dedupe_words(values: list[str]) -> list[str]: + seen: dict[str, int] = {} + out: list[str] = [] + + for v in values: + cleaned = v.strip() + key = cleaned.lower() + if not key: + continue + + if key not in seen: + seen[key] = len(out) + out.append(cleaned) + continue + + # Prefer the best-cased representation when duplicates differ only by case. + idx = seen[key] + current = out[idx] + if current.islower() and not cleaned.islower(): + out[idx] = cleaned + + return out + + +def _parse_release_date(value: str | None) -> int | None: + if not value: + return None + + try: + iso = value.replace("Z", "+00:00") + return int(datetime.fromisoformat(iso).timestamp()) + except ValueError: + pass + + for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d"): + try: + return int(datetime.strptime(value, fmt).timestamp()) + except ValueError: + continue + + return None + + +def _parse_playmode(play_mode: str | None) -> bool: + if not play_mode: + return False + pm = play_mode.lower() + return bool(re.search(r"\b(cooperative|coop|co-op)\b", pm)) + + +def _parse_videourl(url: str | None) -> str: if not url: return "" @@ -75,162 +179,731 @@ def extract_video_id_from_youtube_url(url: str | None) -> str: return "" -def extract_metadata_from_launchbox_rom( - index_entry: dict, game_images: list[dict] | None +def build_launchbox_metadata( + *, + local: dict[str, str] | None = None, + remote: dict | None = None, + images: list[LaunchboxImage], + **kwargs: object, ) -> LaunchboxMetadata: + if local is None and isinstance(kwargs.get("local_entry"), dict): + local = kwargs["local_entry"] # type: ignore[assignment] + + local_release_date = local.get("ReleaseDate") if local else None + remote_release_date = remote.get("ReleaseDate") if remote else None + release_date_raw = _coalesce(local_release_date, remote_release_date) + first_release_date = _parse_release_date(release_date_raw) + + max_players_raw = _coalesce( + local.get("MaxPlayers") if local else None, + remote.get("MaxPlayers") if remote else None, + ) try: - first_release_date = int( - datetime.strptime( - index_entry["ReleaseDate"], "%Y-%m-%dT%H:%M:%S%z" - ).timestamp() + max_players = int(max_players_raw or 0) + except (TypeError, ValueError): + max_players = 0 + + release_type = _coalesce( + local.get("ReleaseType") if local else None, + remote.get("ReleaseType") if remote else None, + ) or "" + + if local and _coalesce(local.get("PlayMode")): + cooperative = _parse_playmode(local.get("PlayMode")) + else: + cooperative = safe_str_to_bool( + (remote.get("Cooperative") if remote else None) or "false" ) - except (ValueError, KeyError, IndexError): - first_release_date = None + + video_url = _coalesce( + (local.get("VideoUrl") if local else None), + (remote.get("VideoURL") if remote else None), + ) + + community_rating_raw = _coalesce( + local.get("CommunityStarRating") if local else None, + remote.get("CommunityRating") if remote else None, + ) + try: + community_rating = float(community_rating_raw or 0.0) + except (TypeError, ValueError): + community_rating = 0.0 + + community_rating_count_raw = _coalesce( + local.get("CommunityStarRatingTotalVotes") if local else None, + remote.get("CommunityRatingCount") if remote else None, + ) + try: + community_rating_count = int(community_rating_count_raw or 0) + except (TypeError, ValueError): + community_rating_count = 0 + + wikipedia_url = _coalesce( + local.get("WikipediaURL") if local else None, + remote.get("WikipediaURL") if remote else None, + ) or "" + + esrb_raw = _coalesce( + (local.get("Rating") if local else None), + (remote.get("ESRB") if remote else None), + ) + esrb = (esrb_raw or "").split(" - ")[0].strip() + + genres_raw = _coalesce( + local.get("Genre") if local else None, + remote.get("Genres") if remote else None, + ) + genres = _parse_list(genres_raw) + + publisher = _coalesce( + local.get("Publisher") if local else None, + remote.get("Publisher") if remote else None, + ) + developer = _coalesce( + local.get("Developer") if local else None, + remote.get("Developer") if remote else None, + ) + companies = _dedupe_words(pydash.compact([publisher, developer])) return LaunchboxMetadata( { "first_release_date": first_release_date, - "max_players": int(index_entry.get("MaxPlayers") or 0), - "release_type": index_entry.get("ReleaseType", ""), - "cooperative": safe_str_to_bool(index_entry.get("Cooperative") or "false"), - "youtube_video_id": extract_video_id_from_youtube_url( - index_entry.get("VideoURL") - ), - "community_rating": float(index_entry.get("CommunityRating") or 0.0), - "community_rating_count": int(index_entry.get("CommunityRatingCount") or 0), - "wikipedia_url": index_entry.get("WikipediaURL", ""), - "esrb": index_entry.get("ESRB", "").split(" - ")[0].strip(), - "genres": ( - index_entry["Genres"].split() if index_entry.get("Genres", None) else [] - ), - "companies": pydash.compact( - [ - index_entry.get("Publisher", None), - index_entry.get("Developer", None), - ] - ), - "images": [ - LaunchboxImage( - { - "url": f"https://images.launchbox-app.com/{image['FileName']}", - "type": image.get("Type", ""), - "region": image.get("Region", ""), - } - ) - for image in game_images or [] - ], + "max_players": max_players, + "release_type": release_type, + "cooperative": cooperative, + "youtube_video_id": _parse_videourl(video_url), + "community_rating": community_rating, + "community_rating_count": community_rating_count, + "wikipedia_url": wikipedia_url, + "esrb": esrb, + "genres": genres, + "companies": companies, + "images": images, } ) +class _LocalMediaContext(TypedDict): + base: Path + stems: list[str] + preferred_regions: list[str] + + +@dataclass(frozen=True, slots=True) +class _MediaRequest: + platform_name: str | None + fs_name: str + title: str + region_hint: str | None + remote_images: list[dict] | None + remote_enabled: bool + + +def _local_media_req( + *, + platform_name: str | None, + fs_name: str, + local: dict[str, str] | None, + remote: dict | None, + remote_images: list[dict] | None, + remote_enabled: bool, +) -> _MediaRequest: + title = ((local or {}).get("Title") or "").strip() + region_hint = (((local or {}).get("Region") or "").strip() or None) + return _MediaRequest( + platform_name, + fs_name, + title, + region_hint, + remote_images, + remote_enabled, + ) + + +def _remote_media_req( + *, + remote: dict | None, + remote_images: list[dict] | None, + remote_enabled: bool, +) -> _MediaRequest: + title = ((remote or {}).get("Name") or "").strip() + return _MediaRequest( + None, + "", + title, + None, + remote_images, + remote_enabled, + ) + + +def _build_local_media_context( + req: _MediaRequest, + base_dir: Path, + *, + include_region_hints: bool = True, +) -> _LocalMediaContext | None: + if not req.platform_name: + return None + + if not base_dir.exists(): + return None + base = (base_dir / req.platform_name).resolve() + if not base.is_dir(): + return None + + stems: list[str] = [] + if req.fs_name: + stems.append(Path(req.fs_name).stem) + if req.title: + stems.append(req.title) + + out: list[str] = [] + for s in stems: + clean = _sanitize_filename(s) + if clean and clean not in out: + out.append(clean) + stems = out + if not stems: + return None + + preferred_regions: list[str] = [] + if include_region_hints and req.region_hint: + region_hint = req.region_hint.strip() + if region_hint: + preferred_regions.append(region_hint) + if "," in region_hint: + preferred_regions.extend( + [r.strip() for r in region_hint.split(",") if r.strip()] + ) + + return { + "base": base, + "stems": stems, + "preferred_regions": preferred_regions, + } + + +def _find_local_media_candidates( + ctx: _LocalMediaContext, + category_name: str, + *, + exts: tuple[str, ...] = (".png", ".jpg", ".jpeg", ".webp"), + indexed_preference: tuple[int, ...] | None = None, + indexed_only_preferred: bool = False, +) -> tuple[list[Path], str]: + category_dir = ctx["base"] / category_name + if not category_dir.is_dir(): + return [], "" + + search_dirs: list[Path] = [] + + for region in ctx["preferred_regions"]: + p = category_dir / region + if p.exists() and p.is_dir() and p not in search_dirs: + search_dirs.append(p) + + for p in sorted( + [p for p in category_dir.iterdir() if p.is_dir()], + key=lambda p: p.name.lower(), + ): + if p not in search_dirs: + search_dirs.append(p) + + if category_dir not in search_dirs: + search_dirs.append(category_dir) + + if not search_dirs: + return [], "" + allowed_exts = {e.lower() for e in exts} + + def _candidates(d: Path, stem: str) -> list[Path]: + if not stem: + return [] + + plain: Path | None = None + indexed: list[tuple[int, Path]] = [] + prefix = f"{stem}-" + + for p in d.iterdir(): + if not (p.is_file() and p.suffix.lower() in allowed_exts): + continue + + stem_name = p.stem + if stem_name == stem: + plain = p + continue + + if stem_name.startswith(prefix): + suffix = stem_name[len(prefix) :] + if suffix.isdigit(): + indexed.append((int(suffix), p)) + + if indexed: + indexed.sort(key=lambda t: (t[0], t[1].name.lower())) + if indexed_preference: + indexed_by_num: dict[int, Path] = {n: p for n, p in indexed} + preferred_hits = [indexed_by_num[n] for n in indexed_preference if n in indexed_by_num] + if preferred_hits: + return preferred_hits + if indexed_only_preferred: + return [plain] if plain else [] + + return [p for _, p in indexed] + + return [plain] if plain else [] + + for d in search_dirs: + region = "" if d == category_dir else d.name + for stem in ctx["stems"]: + candidate_files = _candidates(d, stem) + if candidate_files: + return candidate_files, region + + return [], "" + + +def _get_cover(req: _MediaRequest) -> str | None: + cover: str | None = None + + cover_priority_types = ( + "Box - Front", + "Box - Front - Reconstructed", + "Fanart - Box - Front", + "Box - 3D", + "Amazon Poster", + "Epic Games Poster", + "GOG Poster", + "Steam Poster", + ) + + # Remote media fallback (only if allowed) + if req.remote_enabled and req.remote_images: + best_cover: dict | None = None + for image_type in cover_priority_types: + for image in req.remote_images: + if image.get("Type") == image_type and image.get("FileName"): + best_cover = image + break + if best_cover is not None: + break + + if best_cover and best_cover.get("FileName"): + cover = f"https://images.launchbox-app.com/{best_cover.get('FileName')}" + + ctx = _build_local_media_context(req, LAUNCHBOX_IMAGES_DIR, include_region_hints=True) + if ctx is not None: + for category in cover_priority_types: + candidate_files, _region = _find_local_media_candidates( + ctx, + category, + indexed_preference=(1,), + indexed_only_preferred=True, + ) + if not candidate_files: + continue + + cover_path = candidate_files[0] + url = _file_uri_for_local_path(cover_path) + if url: + cover = url + break + + return cover + + +def _get_screenshots(req: _MediaRequest) -> list[str]: + screenshots: list[str] = [] + + # Remote media fallback (only if allowed) + if req.remote_enabled and req.remote_images: + screenshots = [ + f"https://images.launchbox-app.com/{image.get('FileName')}" + for image in req.remote_images + if image.get("FileName") and "Screenshot" in image.get("Type", "") + ] + + ctx = _build_local_media_context(req, LAUNCHBOX_IMAGES_DIR, include_region_hints=True) + if ctx is not None: + local_screens: list[str] = [] + seen: set[str] = set() + for dir_name in ( + "Amazon Screenshot", + "Epic Games Screenshot", + "GOG Screenshot", + "Origin Screenshot", + "Screenshot - Game Title", + "Screenshot - Game Select", + "Screenshot - Gameplay", + "Screenshot - High Scores", + "Screenshot - Game Over", + "Steam Screenshot", + ): + candidate_files, _region = _find_local_media_candidates(ctx, dir_name) + for p in candidate_files: + url = _file_uri_for_local_path(p) + if url and url not in seen: + seen.add(url) + local_screens.append(url) + + if local_screens: + screenshots = local_screens + + return screenshots + + +def _get_manuals(req: _MediaRequest) -> str | None: + manual: str | None = None + + ctx = _build_local_media_context(req, LAUNCHBOX_MANUALS_DIR, include_region_hints=False) + if ctx is None: + return manual + + pdfs: list[Path] = [ + p + for p in ctx["base"].iterdir() + if p.is_file() and p.suffix.lower() == ".pdf" + ] + if not pdfs: + return manual + + def _key(p: Path) -> str: + return _sanitize_filename(p.stem).lower() + + pdfs_sorted = sorted(pdfs, key=lambda p: (len(p.name), p.name.lower())) + + stems_lower = [s.lower() for s in ctx["stems"]] + + for stem in stems_lower: + for p in pdfs_sorted: + if _key(p) == stem: + url = _file_uri_for_local_path(p) + if url: + return url + + for stem in stems_lower: + for p in pdfs_sorted: + if _key(p).startswith(stem): + url = _file_uri_for_local_path(p) + if url: + return url + + return manual + + +def _get_images(req: _MediaRequest) -> list[LaunchboxImage]: + images: list[LaunchboxImage] = [] + + # Remote media fallback (only if allowed) + if req.remote_enabled and req.remote_images: + images = [ + LaunchboxImage( + { + "url": f"https://images.launchbox-app.com/{image['FileName']}", + "type": image.get("Type", ""), + "region": image.get("Region", ""), + } + ) + for image in req.remote_images + if image.get("FileName") + ] + + ctx = _build_local_media_context(req, LAUNCHBOX_IMAGES_DIR, include_region_hints=True) + if ctx is not None: + local_images: list[LaunchboxImage] = [] + for dir_name in ( + "Advertisement Flyer - Back", + "Advertisement Flyer - Front", + "Box - Back", + "Box - Back - Reconstructed", + "Box - Full", + "Box - Spine", + "Cart - Front", + "Cart - 3D", + "Clear Logo", + "Fanart - Box - Back", + "Fanart - Background", # Later separate in new category for rom header + "Amazon Background", # Later separate in new category for rom header + "Epic Games Background", # Later separate in new category for rom header + "Origin Background", # Later separate in new category for rom header + "Uplay Background", # Later separate in new category for rom header + ): + candidate_files, region = _find_local_media_candidates(ctx, dir_name) + for p in candidate_files: + url = _file_uri_for_local_path(p) + if not url: + continue + local_images.append( + LaunchboxImage( + { + "url": url, + "type": dir_name, + "region": region, + } + ) + ) + + if local_images: + images = local_images + + seen_urls: set[str] = set() + return [ + img + for img in images + if not (img["url"] in seen_urls or seen_urls.add(img["url"])) + ] + + +def build_rom( + *, + local: dict[str, str] | None, + remote: dict | None, + launchbox_id: int | None, + media_req: _MediaRequest | None = None, +) -> LaunchboxRom: + images: list[LaunchboxImage] = _get_images(media_req) if media_req is not None else [] + + url_cover: str | None = None + url_screenshots: list[str] = [] + url_manual: str | None = None + if media_req is not None: + url_cover = _get_cover(media_req) + url_screenshots = _get_screenshots(media_req) + url_manual = _get_manuals(media_req) + url_screenshots = url_screenshots or [] + + name = ( + _coalesce( + (local.get("Title") if local else None), + (remote.get("Name") if remote else None), + ) + or "" + ).strip() + + summary = ( + _coalesce( + (local.get("Notes") if local else None), + (remote.get("Overview") if remote else None), + ) + or "" + ).strip() + + rom = { + "launchbox_id": launchbox_id, + "name": name, + "summary": summary, + "url_cover": url_cover, + "url_screenshots": url_screenshots, + "url_manual": url_manual, + "launchbox_metadata": build_launchbox_metadata( + local=local, + remote=remote, + images=images, + ), + } + + return LaunchboxRom({k: v for k, v in rom.items() if v}) + + class LaunchboxHandler(MetadataHandler): @classmethod def is_enabled(cls) -> bool: - return LAUNCHBOX_API_ENABLED - + return LAUNCHBOX_API_ENABLED or LAUNCHBOX_PLATFORMS_DIR.exists() + async def heartbeat(self) -> bool: return self.is_enabled() - @staticmethod - def extract_launchbox_id_from_filename(fs_name: str) -> int | None: - """Extract LaunchBox ID from filename tag like (launchbox-12345).""" - match = LAUNCHBOX_TAG_REGEX.search(fs_name) - if match: - return int(match.group(1)) - return None - async def _get_rom_from_metadata( - self, file_name: str, platform_slug: str - ) -> dict | None: - if not (await async_cache.exists(LAUNCHBOX_METADATA_NAME_KEY)): - log.error("Could not find the Launchbox Metadata.xml file in cache") + async def _fetch_remote_images( + self, + *, + remote: dict | None = None, + database_id: str | int | None = None, + remote_enabled: bool = True, + ) -> list[dict] | None: + if not remote_enabled: return None - lb_platform = self.get_platform(platform_slug) - platform_name = lb_platform.get("name", None) - if not platform_name: + resolved_id = database_id + if resolved_id is None and remote is not None: + resolved_id = remote.get("DatabaseID") + + if not resolved_id: return None - metadata_name_index_entry = await async_cache.hget( - LAUNCHBOX_METADATA_NAME_KEY, f"{file_name}:{platform_name}" + metadata_image_index_entry = await async_cache.hget( + LAUNCHBOX_METADATA_IMAGE_KEY, str(resolved_id) ) - if metadata_name_index_entry: - return json.loads(metadata_name_index_entry) + if not metadata_image_index_entry: + return None - metadata_alternate_name_index_entry = await async_cache.hget( - LAUNCHBOX_METADATA_ALTERNATE_NAME_KEY, file_name - ) + return json.loads(metadata_image_index_entry) - if not metadata_alternate_name_index_entry: - return None - metadata_alternate_name_index_entry = json.loads( - metadata_alternate_name_index_entry - ) - database_id = metadata_alternate_name_index_entry["DatabaseID"] - metadata_database_index_entry = await async_cache.hget( - LAUNCHBOX_METADATA_DATABASE_ID_KEY, database_id + def get_platform(self, slug: str) -> LaunchboxPlatform: + slug_clean = (slug or "").strip().lower() + + resolved: UPS | None = None + for candidate in ( + slug_clean, + slug_clean.replace("-", ""), + slug_clean.replace("_", ""), + slug_clean.replace("-", "").replace("_", ""), + ): + if not candidate: + continue + try: + ups = UPS(candidate) + except ValueError: + continue + if ups in LAUNCHBOX_PLATFORM_LIST: + resolved = ups + break + + if resolved is None: + return LaunchboxPlatform(slug=slug_clean, launchbox_id=None) + + platform = LAUNCHBOX_PLATFORM_LIST[resolved] + + return LaunchboxPlatform( + slug=slug_clean, + launchbox_id=platform["id"], + name=platform["name"], ) - if not metadata_database_index_entry: + + async def _get_local_rom( + self, fs_name: str, platform_slug: str + ) -> dict[str, str] | None: + if not LAUNCHBOX_PLATFORMS_DIR.exists(): return None - return json.loads(metadata_database_index_entry) + platform_name = self.get_platform(platform_slug).get("name") + xml_path = LAUNCHBOX_PLATFORMS_DIR / f"{platform_name}.xml" if platform_name else None + if not xml_path or not xml_path.exists(): + return None - async def _get_game_images(self, database_id: str) -> list[dict] | None: - metadata_image_index_entry = await async_cache.hget( - LAUNCHBOX_METADATA_IMAGE_KEY, database_id - ) + try: + xml_path_str = str(xml_path.resolve()) + mtime_ns = xml_path.stat().st_mtime_ns - if not metadata_image_index_entry: + cached = LOCAL_XML_INDEX_CACHE.get(xml_path_str) + if cached is not None and cached[0] == mtime_ns: + index = cached[1] + else: + root = ET.parse(xml_path_str).getroot() + + index: dict[str, dict[str, str]] = {} + for game in root.findall(".//Game"): + entry: dict[str, str] = {} + for child in list(game): + if child.tag and child.text is not None: + entry[child.tag] = child.text + if not entry: + continue + + app_path = (entry.get("ApplicationPath") or "").strip() + if app_path: + app_base = PureWindowsPath(app_path).name.strip().lower() + if app_base: + index.setdefault(app_base, entry) + + title = (entry.get("Title") or "").strip().lower() + if title: + index.setdefault(f"title:{title}", entry) + + LOCAL_XML_INDEX_CACHE[xml_path_str] = (mtime_ns, index) + except Exception as e: + log.warning(f"Failed to parse local LaunchBox XML {xml_path}: {e}") return None - return json.loads(metadata_image_index_entry) + if not index: + return None - def _get_best_cover_image(self, game_images: list[dict]) -> dict | None: - """ - Get the best cover image from a list of game images based on priority order: - """ - # Define priority order - priority_types = [ - "Box - Front", - "Box - 3D", - "Fanart - Box - Front", - "Cart - Front", - "Cart - 3D", - ] + fs_key = (fs_name or "").strip().lower() + if not fs_key: + return None - for image_type in priority_types: - for image in game_images: - if image.get("Type") == image_type: - return image + direct = index.get(fs_key) + if direct is not None: + return direct - return None + try: + stem = Path(fs_name).stem.strip().lower() + except Exception: + stem = "" - def _get_screenshots(self, game_images: list[dict]) -> list[str]: - screenshots: list[str] = [] - for image in game_images: - if "Screenshot" in image.get("Type", ""): - screenshots.append( - f"https://images.launchbox-app.com/{image.get('FileName')}" - ) + if stem: + by_title = index.get(f"title:{stem}") + if by_title is not None: + return by_title - return screenshots + return index.get(f"title:{fs_key}") - def get_platform(self, slug: str) -> LaunchboxPlatform: - if slug not in LAUNCHBOX_PLATFORM_LIST: - return LaunchboxPlatform(slug=slug, launchbox_id=None) - platform = LAUNCHBOX_PLATFORM_LIST[UPS(slug)] + async def _get_remote_rom( + self, + file_name: str, + platform_slug: str, + *, + assume_cache_present: bool = False, + ) -> dict | None: + if not assume_cache_present and not ( + await async_cache.exists(LAUNCHBOX_METADATA_NAME_KEY) + ): + log.error("Could not find the Launchbox Metadata.xml file in cache") + return None - return LaunchboxPlatform( - slug=slug, - launchbox_id=platform["id"], - name=platform["name"], - ) + lb_platform = self.get_platform(platform_slug) + platform_name = lb_platform.get("name", None) + if not platform_name: + return None + + file_name_clean = (file_name or "").strip() + if not file_name_clean: + return None - async def get_rom(self, fs_name: str, platform_slug: str) -> LaunchboxRom: + candidates: list[str] = [file_name_clean] + lower = file_name_clean.lower() + if lower != file_name_clean: + candidates.append(lower) + + for candidate in candidates: + metadata_name_index_entry = await async_cache.hget( + LAUNCHBOX_METADATA_NAME_KEY, f"{candidate}:{platform_name}" + ) + if metadata_name_index_entry: + return json.loads(metadata_name_index_entry) + + for candidate in candidates: + metadata_alternate_name_index_entry = await async_cache.hget( + LAUNCHBOX_METADATA_ALTERNATE_NAME_KEY, candidate + ) + if not metadata_alternate_name_index_entry: + continue + + metadata_alternate_name_index_entry = json.loads( + metadata_alternate_name_index_entry + ) + database_id = metadata_alternate_name_index_entry["DatabaseID"] + metadata_database_index_entry = await async_cache.hget( + LAUNCHBOX_METADATA_DATABASE_ID_KEY, database_id + ) + if metadata_database_index_entry: + return json.loads(metadata_database_index_entry) + + return None + + + async def get_rom( + self, + fs_name: str, + platform_slug: str, + keep_tags: bool = False, + *, + remote_enabled: bool = True, + ) -> LaunchboxRom: from handler.filesystem import fs_rom_handler fallback_rom = LaunchboxRom(launchbox_id=None) @@ -238,11 +911,62 @@ async def get_rom(self, fs_name: str, platform_slug: str) -> LaunchboxRom: if not self.is_enabled(): return fallback_rom - # Check for LaunchBox ID tag in filename first - launchbox_id_from_tag = self.extract_launchbox_id_from_filename(fs_name) - if launchbox_id_from_tag: + local = await self._get_local_rom(fs_name, platform_slug) + + remote_available = remote_enabled and bool( + await async_cache.exists(LAUNCHBOX_METADATA_NAME_KEY) + ) + + if local is not None: + launchbox_id_local = _safe_int(local.get("DatabaseID")) + remote: dict | None = None + if remote_available: + if launchbox_id_local is not None: + metadata_database_index_entry = await async_cache.hget( + LAUNCHBOX_METADATA_DATABASE_ID_KEY, str(launchbox_id_local) + ) + if metadata_database_index_entry: + remote = json.loads(metadata_database_index_entry) + + if remote is None: + local_title = (local.get("Title") or "").strip() + if local_title: + remote = await self._get_remote_rom( + local_title, + platform_slug, + assume_cache_present=True, + ) + platform_name = self.get_platform(platform_slug).get("name") + remote_images = await self._fetch_remote_images( + remote=remote, remote_enabled=remote_available + ) + media_req = _local_media_req( + platform_name=platform_name, + fs_name=fs_name, + local=local, + remote=remote, + remote_images=remote_images, + remote_enabled=remote_available, + ) + return build_rom( + local=local, + remote=remote, + launchbox_id=launchbox_id_local + or (remote.get("DatabaseID") if remote else None), + media_req=media_req, + ) + + if not remote_available: + return fallback_rom + + match = re.search(r"\(launchbox-(\d+)\)", fs_name, flags=re.IGNORECASE) + launchbox_id_from_tag = int(match.group(1)) if match else None + + if launchbox_id_from_tag is not None: log.debug(f"Found LaunchBox ID tag in filename: {launchbox_id_from_tag}") - rom_by_id = await self.get_rom_by_id(launchbox_id_from_tag) + rom_by_id = await self.get_rom_by_id( + launchbox_id_from_tag, remote_enabled=remote_enabled + ) if rom_by_id["launchbox_id"]: log.debug( f"Successfully matched ROM by LaunchBox ID tag: {fs_name} -> {launchbox_id_from_tag}" @@ -253,45 +977,46 @@ async def get_rom(self, fs_name: str, platform_slug: str) -> LaunchboxRom: f"LaunchBox ID {launchbox_id_from_tag} from filename tag not found in LaunchBox" ) - # We replace " - " with ": " to match Launchbox's naming convention - search_term = fs_rom_handler.get_file_name_with_no_tags(fs_name).replace( - " - ", ": " + if keep_tags: + search_term = fs_name + else: + search_term = fs_rom_handler.get_file_name_with_no_tags(fs_name).replace( + " - ", ": " + ) + index_entry = await self._get_remote_rom( + search_term, + platform_slug, + assume_cache_present=True, ) - index_entry = await self._get_rom_from_metadata(search_term, platform_slug) if not index_entry: return fallback_rom - url_cover = None - url_screenshots = [] - - game_images = await self._get_game_images(index_entry["DatabaseID"]) - if game_images: - best_cover = self._get_best_cover_image(game_images) - if best_cover: - url_cover = ( - f"https://images.launchbox-app.com/{best_cover.get('FileName')}" - ) - - url_screenshots = self._get_screenshots(game_images) - - rom = { - "launchbox_id": index_entry["DatabaseID"], - "name": index_entry["Name"], - "summary": index_entry.get("Overview", ""), - "url_cover": url_cover, - "url_screenshots": url_screenshots, - "launchbox_metadata": extract_metadata_from_launchbox_rom( - index_entry, game_images - ), - } + remote_images = await self._fetch_remote_images( + remote=index_entry, remote_enabled=remote_available + ) + media_req = _remote_media_req( + remote=index_entry, + remote_images=remote_images, + remote_enabled=remote_available, + ) - return LaunchboxRom({k: v for k, v in rom.items() if v}) # type: ignore[misc] + return build_rom( + local=None, + remote=index_entry, + launchbox_id=index_entry["DatabaseID"], + media_req=media_req, + ) - async def get_rom_by_id(self, database_id: int) -> LaunchboxRom: + async def get_rom_by_id( + self, database_id: int, *, remote_enabled: bool = True + ) -> LaunchboxRom: if not self.is_enabled(): return LaunchboxRom(launchbox_id=None) + if not remote_enabled: + return LaunchboxRom(launchbox_id=None) + metadata_database_index_entry = await async_cache.hget( LAUNCHBOX_METADATA_DATABASE_ID_KEY, str(database_id) ) @@ -299,29 +1024,22 @@ async def get_rom_by_id(self, database_id: int) -> LaunchboxRom: if not metadata_database_index_entry: return LaunchboxRom(launchbox_id=None) - # Parse the JSON string from cache metadata_database_index_entry = json.loads(metadata_database_index_entry) - game_images = await self._get_game_images( - metadata_database_index_entry["DatabaseID"] + remote_images = await self._fetch_remote_images( + remote=metadata_database_index_entry, remote_enabled=remote_enabled + ) + media_req = _remote_media_req( + remote=metadata_database_index_entry, + remote_images=remote_images, + remote_enabled=remote_enabled, ) - rom = { - "launchbox_id": database_id, - "name": metadata_database_index_entry["Name"], - "summary": metadata_database_index_entry.get("Overview", ""), - "launchbox_metadata": extract_metadata_from_launchbox_rom( - metadata_database_index_entry, - game_images, - ), - } - - return LaunchboxRom({k: v for k, v in rom.items() if v}) # type: ignore[misc] - - async def get_matched_rom_by_id(self, database_id: int) -> LaunchboxRom | None: - if not self.is_enabled(): - return None - - return await self.get_rom_by_id(database_id) + return build_rom( + local=None, + remote=metadata_database_index_entry, + launchbox_id=database_id, + media_req=media_req, + ) async def get_matched_roms_by_name( self, search_term: str, platform_slug: str @@ -329,8 +1047,15 @@ async def get_matched_roms_by_name( if not self.is_enabled(): return [] - rom = await self.get_rom(search_term, platform_slug) - return [rom] if rom else [] + rom = await self.get_rom(search_term, platform_slug, True, remote_enabled=True) + return [rom] if rom.get("launchbox_id") else [] + + async def get_matched_rom_by_id(self, database_id: int) -> LaunchboxRom | None: + if not self.is_enabled(): + return None + + rom = await self.get_rom_by_id(database_id, remote_enabled=True) + return rom if rom.get("launchbox_id") else None class SlugToLaunchboxId(TypedDict): diff --git a/backend/handler/scan_handler.py b/backend/handler/scan_handler.py index 9572b0b00..c755fa815 100644 --- a/backend/handler/scan_handler.py +++ b/backend/handler/scan_handler.py @@ -287,6 +287,7 @@ async def scan_rom( fs_rom: FSRom, metadata_sources: list[str], newly_added: bool, + launchbox_remote_enabled: bool = True, socket_manager: socketio.AsyncRedisManager | None = None, ) -> Rom: rom_attrs = { @@ -585,13 +586,17 @@ async def fetch_launchbox_rom(platform_slug: str) -> LaunchboxRom: and rom.platform_slug in LAUNCHBOX_PLATFORM_LIST ) ): - if scan_type == ScanType.UPDATE and rom.launchbox_id: - return await meta_launchbox_handler.get_rom_by_id(rom.launchbox_id) - else: - return await meta_launchbox_handler.get_rom( - rom_attrs["fs_name"], platform_slug + if scan_type == ScanType.UPDATE and rom.launchbox_id and launchbox_remote_enabled: + return await meta_launchbox_handler.get_rom_by_id( + rom.launchbox_id, remote_enabled=True ) + return await meta_launchbox_handler.get_rom( + rom_attrs["fs_name"], + platform_slug, + remote_enabled=launchbox_remote_enabled, + ) + return LaunchboxRom(launchbox_id=None) async def fetch_ra_rom(hasheous_rom: HasheousRom) -> RAGameRom: diff --git a/backend/tasks/scheduled/update_launchbox_metadata.py b/backend/tasks/scheduled/update_launchbox_metadata.py index f87708ef1..5301d7721 100644 --- a/backend/tasks/scheduled/update_launchbox_metadata.py +++ b/backend/tasks/scheduled/update_launchbox_metadata.py @@ -7,6 +7,7 @@ from config import ( ENABLE_SCHEDULED_UPDATE_LAUNCHBOX_METADATA, + LAUNCHBOX_API_ENABLED, SCHEDULED_UPDATE_LAUNCHBOX_METADATA_CRON, ) from handler.metadata import meta_launchbox_handler @@ -44,7 +45,10 @@ def __init__(self): async def run(self, force: bool = False) -> dict[str, Any]: update_stats = UpdateStats() - if not meta_launchbox_handler.is_enabled(): + # This task pulls remote metadata from LaunchBox (Metadata.zip) and should + # only run when the LaunchBox API integration is enabled. + # `meta_launchbox_handler.is_enabled()` may also be true for local-only mode. + if not LAUNCHBOX_API_ENABLED: log.warning("Launchbox API is not enabled, skipping metadata update") return update_stats.to_dict() diff --git a/frontend/src/locales/en_US/scan.json b/frontend/src/locales/en_US/scan.json index 2f8b07e67..378f9ffff 100644 --- a/frontend/src/locales/en_US/scan.json +++ b/frontend/src/locales/en_US/scan.json @@ -21,6 +21,7 @@ "hasheous-requires-hashes": "Hasheous requires hash calculation to be enabled", "retroachievements-requires-hashes": "RetroAchievements requires hash calculation to be enabled", "manage-library": "Manage library", + "launchbox-remote": "LaunchBox remote (enrich local with remote)", "metadata-sources": "Metadata sources", "new-platforms": "New platforms", "new-platforms-desc": "Scan new platforms only (fastest)", diff --git a/frontend/src/locales/fr_FR/scan.json b/frontend/src/locales/fr_FR/scan.json index 4552e6aad..32812508b 100644 --- a/frontend/src/locales/fr_FR/scan.json +++ b/frontend/src/locales/fr_FR/scan.json @@ -21,6 +21,7 @@ "hasheous-requires-hashes": "Hasheous nécessite que le calcul de hachage soit activé", "retroachievements-requires-hashes": "RetroAchievements nécessite que le calcul de hachage soit activé", "manage-library": "Gérer la bibliothèque", + "launchbox-remote": "LaunchBox remote (enrichir le local avec le remote)", "metadata-sources": "Sources de métadonnées", "new-platforms": "Nouvelles plateformes", "new-platforms-desc": "Scanner uniquement les plateformes récemment ajoutées (plus rapide)", diff --git a/frontend/src/views/Scan.vue b/frontend/src/views/Scan.vue index 02e162db1..a26dd4f05 100644 --- a/frontend/src/views/Scan.vue +++ b/frontend/src/views/Scan.vue @@ -16,6 +16,7 @@ import storePlatforms from "@/stores/platforms"; import storeScanning from "@/stores/scanning"; const LOCAL_STORAGE_METADATA_SOURCES_KEY = "scan.metadataSources"; +const LOCAL_STORAGE_LAUNCHBOX_REMOTE_ENABLED_KEY = "scan.launchboxRemoteEnabled"; const { t } = useI18n(); const { xs, smAndDown } = useDisplay(); const scanningStore = storeScanning(); @@ -66,12 +67,20 @@ const storedMetadataSources = useLocalStorage( LOCAL_STORAGE_METADATA_SOURCES_KEY, [] as string[], ); +const launchboxRemoteEnabled = useLocalStorage( + LOCAL_STORAGE_LAUNCHBOX_REMOTE_ENABLED_KEY, + true, +); const metadataSources = ref( metadataOptions.value.filter( (m) => storedMetadataSources.value.includes(m.value) && !m.disabled, ) || heartbeat.getEnabledMetadataOptions(), ); +const isLaunchboxSelected = computed(() => + metadataSources.value.some((s) => s.value === "launchbox"), +); + watch(metadataOptions, (newOptions) => { // Remove any sources that are now disabled metadataSources.value = metadataSources.value.filter((s) => @@ -137,6 +146,7 @@ async function scan() { platforms: platformsToScan.value, type: scanType.value, apis: metadataSources.value.map((s) => s.value), + launchbox_remote_enabled: launchboxRemoteEnabled.value, }); } @@ -350,6 +360,24 @@ async function stopScan() { + + -