Skip to content
This repository was archived by the owner on Jan 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 107 additions & 11 deletions RomM/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import datetime
import json
import math
import os
Expand All @@ -11,6 +12,7 @@

import platform_maps
from filesystem import Filesystem
from imageutils import ImageUtils
from models import Collection, Platform, Rom
from PIL import Image
from status import Status, View
Expand All @@ -28,6 +30,7 @@ class API:
def __init__(self):
self.status = Status()
self.file_system = Filesystem()
self.image_utils = ImageUtils()

self.host = os.getenv("HOST", "").strip("/")
self.username = os.getenv("USERNAME", "")
Expand All @@ -37,6 +40,11 @@ def __init__(self):
self._include_collections = set(self._getenv_list("INCLUDE_COLLECTIONS"))
self._exclude_collections = set(self._getenv_list("EXCLUDE_COLLECTIONS"))
self._collection_type = os.getenv("COLLECTION_TYPE", "collection")
self._download_assets = os.getenv("DOWNLOAD_ASSETS", "false") in ("true", "1")
self._fullscreen_assets = os.getenv("FULLSCREEN_ASSETS", "false") in (
"true",
"1",
)

if self.username and self.password:
credentials = f"{self.username}:{self.password}"
Expand Down Expand Up @@ -234,6 +242,7 @@ def fetch_platforms(self) -> None:
self.status.valid_host = False
self.status.valid_credentials = False
return

platforms = json.loads(response.read().decode("utf-8"))
_platforms: list[Platform] = []

Expand All @@ -251,7 +260,7 @@ def fetch_platforms(self) -> None:

for platform in platforms:
if platform["rom_count"] > 0:
platform_slug = platform["slug"].lower()
platform_slug: str = platform["slug"].lower()
if (
platform_maps._env_maps
and platform_slug in platform_maps._env_platforms
Expand Down Expand Up @@ -458,7 +467,7 @@ def fetch_roms(self) -> None:

_roms = []
for rom in roms:
platform_slug = rom["platform_slug"].lower()
platform_slug: str = rom["platform_slug"].lower()
if (
platform_maps._env_maps
and platform_slug in platform_maps._env_platforms
Expand All @@ -476,22 +485,48 @@ def fetch_roms(self) -> None:
)
if mapped_folder.lower() not in roms_subfolders:
continue

if view == View.PLATFORMS and platform_slug != selected_platform_slug:
continue

metadatum = rom.get("metadatum", {})
_roms.append(
Rom(
id=rom["id"],
name=rom["name"],
fs_name=rom["fs_name"],
platform_id=rom["platform_id"],
platform_slug=rom["platform_slug"],
fs_name=rom["fs_name"],
fs_name_no_tags=rom["fs_name_no_tags"],
fs_name_no_ext=rom["fs_name_no_ext"],
fs_extension=rom["fs_extension"],
fs_size=self._human_readable_size(rom["fs_size_bytes"]),
fs_size_bytes=rom["fs_size_bytes"],
multi=rom["multi"],
languages=rom["languages"],
regions=rom["regions"],
revision=rom["revision"],
tags=rom["tags"],
name=rom["name"],
slug=rom["slug"],
summary=rom["summary"],
youtube_video_id=rom.get("youtube_video_id", None),
path_cover_small=rom["path_cover_small"],
path_cover_large=rom["path_cover_large"],
is_identified=rom["is_identified"],
revision=rom.get("revision", None),
regions=rom.get("regions", []),
languages=rom.get("languages", []),
tags=rom.get("tags", []),
crc_hash=rom.get("crc_hash", ""),
md5_hash=rom.get("md5_hash", ""),
sha1_hash=rom.get("sha1_hash", ""),
has_simple_single_file=rom.get("has_simple_single_file", False),
has_nested_single_file=rom.get("has_nested_single_file", False),
has_multiple_files=rom.get("has_multiple_files", False),
merged_screenshots=rom.get("merged_screenshots", []),
genres=metadatum.get("genres", []),
franchises=metadatum.get("franchises", []),
collections=metadatum.get("collections", []),
companies=metadatum.get("companies", []),
game_modes=metadatum.get("game_modes", []),
age_ratings=metadatum.get("age_ratings", []),
first_release_date=metadatum.get("first_release_date", None),
average_rating=metadatum.get("average_rating", None),
)
)

Expand Down Expand Up @@ -532,6 +567,7 @@ def download_rom(self) -> None:
except ValueError:
self._reset_download_status()
return

try:
if request.type not in ("http", "https"):
self._reset_download_status()
Expand Down Expand Up @@ -563,10 +599,11 @@ def download_rom(self) -> None:
self._reset_download_status(True, True)
os.remove(dest_path)
return

# Handle multi-file (ZIP) ROMs
if rom.multi:
if rom.has_multiple_files:
self.status.extracting_rom = True
print("Multi file rom detected. Extracting...")
print("Multi-file rom detected. Extracting...")
with zipfile.ZipFile(dest_path, "r") as zip_ref:
total_size = sum(file.file_size for file in zip_ref.infolist())
extracted_size = 0
Expand Down Expand Up @@ -608,5 +645,64 @@ def download_rom(self) -> None:
except URLError:
self._reset_download_status(valid_host=True)
return

# Check if the catalogue path is set and valid
catalogue_path = self.file_system.get_catalogue_platform_path(
rom.platform_slug
)
if not catalogue_path:
continue

filename = self._sanitize_filename(rom.fs_name_no_ext)
if rom.summary:
text_path = os.path.join(
catalogue_path,
"text",
f"{filename}.txt",
)
os.makedirs(os.path.dirname(text_path), exist_ok=True)
with open(text_path, "w") as f:
f.write(rom.summary)
f.write("\n\n")

if rom.first_release_date:
dt = datetime.datetime.fromtimestamp(
rom.first_release_date / 1000
)
formatted_date = dt.strftime("%Y-%m-%d")
f.write(f"First release date: {formatted_date}\n")

if rom.average_rating:
f.write(f"Average rating: {rom.average_rating}\n")

if rom.genres:
f.write(f"Genres: {', '.join(rom.genres)}\n")

if rom.franchises:
f.write(f"Franchises: {', '.join(rom.franchises)}\n")

if rom.companies:
f.write(f"Companies: {', '.join(rom.companies)}\n")

# Don't download covers and previews if the user disabled the option
if not self._download_assets:
continue

box_path = os.path.join(catalogue_path, "box", f"{filename}.png")
preview_path = os.path.join(catalogue_path, "preview", f"{filename}.png")

# Download cover and preview images
os.makedirs(os.path.dirname(box_path), exist_ok=True)
os.makedirs(os.path.dirname(preview_path), exist_ok=True)

self.image_utils.process_assets(
fullscreen=self._fullscreen_assets,
cover_url=rom.path_cover_small,
screenshot_urls=rom.merged_screenshots,
box_path=box_path,
preview_path=preview_path,
headers=self.headers,
)

# End of download
self._reset_download_status(valid_host=True, valid_credentials=True)
4 changes: 4 additions & 0 deletions RomM/env.template
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ COLLECTION_TYPE=collection
# Do not display collections with these names (comma separated)
# EXCLUDE_COLLECTIONS=""

# Download cover images and screenshots
DOWNLOAD_ASSETS=1
FULLSCREEN_ASSETS=1

# Map RomM slugs to filesystem directories
# For example, if your PlayStation directory is called "psx":
# CUSTOM_MAPS='{"ps": "psx"}'
Expand Down
31 changes: 27 additions & 4 deletions RomM/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ class Filesystem:

# Storage paths for ROMs
_sd1_roms_storage_path: str
_sd2_roms_storage_path: str | None
_sd2_roms_storage_path: str | None = None
_sd1_catalogue_path: str | None = None
_sd2_catalogue_path: str | None = None

# Resources path: Use current working directory + "resources"
resources_path = os.path.join(os.getcwd(), "resources")
Expand All @@ -35,15 +37,15 @@ def __init__(self) -> None:
if self.is_muos:
self._sd1_roms_storage_path = "/mnt/mmc/ROMS"
self._sd2_roms_storage_path = "/mnt/sdcard/ROMS"
self._sd1_catalogue_path = "/mnt/mmc/MUOS/info/catalogue"
self._sd2_catalogue_path = "/mnt/sdcard/MUOS/info/catalogue"
elif self.is_spruceos:
self._sd1_roms_storage_path = "/mnt/SDCARD/Roms"
self._sd2_roms_storage_path = None
else:
# Go up two levels from the script's directory (e.g., from roms/ports/romm to roms/)
base_path = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
# Default to the ROMs directory, overridable via environment variable
self._sd1_roms_storage_path = os.environ.get("ROMS_STORAGE_PATH", base_path)
self._sd2_roms_storage_path = None

# Ensure the ROMs storage path exists
if self._sd2_roms_storage_path and not os.path.exists(
Expand Down Expand Up @@ -114,6 +116,20 @@ def _get_sd2_platforms_storage_path(self, platform: str) -> Optional[str]:
return os.path.join(self._sd2_roms_storage_path, platforms_dir)
return None

def get_sd1_catalogue_platform_path(self, platform: str) -> str:
if not self._sd1_catalogue_path:
raise ValueError("SD1 catalogue path is not set.")

platforms_dir = self._get_platform_storage_dir_from_mapping(platform)
return os.path.join(self._sd1_catalogue_path, platforms_dir)

def get_sd2_catalogue_platform_path(self, platform: str) -> str:
if not self._sd2_catalogue_path:
raise ValueError("SD2 catalogue path is not set.")

platforms_dir = self._get_platform_storage_dir_from_mapping(platform)
return os.path.join(self._sd2_catalogue_path, platforms_dir)

###
# PUBLIC METHODS
###
Expand Down Expand Up @@ -141,10 +157,17 @@ def get_platforms_storage_path(self, platform: str) -> str:

return self._get_sd1_platforms_storage_path(platform)

def get_catalogue_platform_path(self, platform: str) -> str:
"""Return the catalogue path for a specific platform."""
if self._current_sd == 2:
return self.get_sd2_catalogue_platform_path(platform)

return self.get_sd1_catalogue_platform_path(platform)

def is_rom_in_device(self, rom: Rom) -> bool:
"""Check if a ROM exists in the storage path."""
rom_path = os.path.join(
self.get_platforms_storage_path(rom.platform_slug),
rom.fs_name if not rom.multi else f"{rom.fs_name}.m3u",
rom.fs_name if not rom.has_multiple_files else f"{rom.fs_name}.m3u",
)
return os.path.exists(rom_path)
Loading