Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 130 additions & 5 deletions morgan/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import urllib.parse
import urllib.request
import zipfile
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple

import packaging.requirements
Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(self, args: argparse.Namespace):
# into representations that are easier for the mirrorer to work with
self.index_path = args.index_path
self.index_url = args.index_url
self.mirror_all_wheels: bool = args.mirror_all_wheels
self.mirror_all_versions: bool = args.mirror_all_versions
self.package_type_regex: str = args.package_type_regex
self.config = configparser.ConfigParser(
Expand Down Expand Up @@ -278,7 +280,56 @@ def _filter_files(

# Now we only have files that satisfy the requirement, and we need to
# filter out files that do not match our environments.
files = list(filter(lambda file: self._matches_environments(file), files))
if self.mirror_all_wheels:
files = list(
filter(
lambda file: self._matches_environments(
file, self._supported_pyversions, self._supported_platforms
),
files,
)
)
else:
# Group files by version
files_by_version: defaultdict = defaultdict(list)
for file in files:
files_by_version[file["version"]].append(file)

# For each version, select the best matching wheels
files = []
for version_files in files_by_version.values():
wheels = [f for f in version_files if f.get("is_wheel", False)]
non_wheels = [f for f in version_files if not f.get("is_wheel", False)]

# Sort wheels by score (highest first)
wheels = sorted(
wheels, key=self._calculate_scores_for_wheel, reverse=True
)

selected_files = []

for python_version in self._supported_pyversions:
for platform_pattern in self._supported_platforms:
for non_wheel in non_wheels:
if self._matches_environments(
non_wheel, [python_version], [platform_pattern]
):
if non_wheel not in selected_files: # avoid duplicates
selected_files.append(non_wheel)
break

for wheel in wheels + non_wheels:
if self._matches_environments(
wheel, [python_version], [platform_pattern]
):
if wheel not in selected_files: # avoid duplicates
selected_files.append(wheel)
break

files.extend(selected_files)

# make sure the list is sorted by version again
files.sort(key=lambda file: file["version"], reverse=True)

if len(files) == 0:
print(f"Skipping {requirement}, no file matches environments")
Expand All @@ -292,7 +343,10 @@ def _filter_files(

return files

def _matches_environments(self, fileinfo: dict) -> bool:
@staticmethod
def _matches_environments(
fileinfo: dict, supported_pyversions: list, supported_platforms: list
) -> bool:
if req := fileinfo.get("requires-python"):
# The Python versions in all of our environments must be supported
# by this file in order to match.
Expand All @@ -308,7 +362,7 @@ def _matches_environments(self, fileinfo: dict) -> bool:
req = fileinfo["requires-python"] = re.sub(r"([0-9])\.?\*", r"\1", req)
try:
spec_set = packaging.specifiers.SpecifierSet(req)
for supported_python in self._supported_pyversions:
for supported_python in supported_pyversions:
if not spec_set.contains(supported_python):
# file does not support the Python version of one of our
# environments, reject it
Expand All @@ -331,7 +385,7 @@ def _matches_environments(self, fileinfo: dict) -> bool:
intrp_ver_matched = any(
map(
lambda supported_python: intrp_set.contains(supported_python),
self._supported_pyversions,
supported_pyversions,
)
)

Expand All @@ -340,7 +394,7 @@ def _matches_environments(self, fileinfo: dict) -> bool:

if tag.platform == "any":
return True
for platformre in self._supported_platforms:
for platformre in supported_platforms:
if platformre.fullmatch(tag.platform):
# tag matched, accept this file
return True
Expand All @@ -350,6 +404,67 @@ def _matches_environments(self, fileinfo: dict) -> bool:

return True

def _calculate_scores_for_wheel(self, file: dict) -> Tuple[int, int]:
"""Calculate scoring tuple for a file to determine best wheel selection.

Assigns high scores to non-wheel files (sdists) to ensure they're always included.
For wheel files, calculates scores based on Python version and manylinux tag
to enable selection of the most compatible wheel for each platform.

Args:
file: Dictionary containing package file information with keys 'is_wheel' and 'tags'

Returns:
A tuple of (python_score, platform_score) where:
- python_score (int): Python version as an integer (e.g., 3.11 = 311).
Non-wheels get 1e10 to ensure they're always kept.
- platform_score (int): Numeric representation of manylinux tag.
Modern formats use glibc version (e.g., manylinux_2_28 = 228).
Deprecated formats have fixed scores: manylinux2014 = 90,
manylinux2010 = 80, manylinux1 = 70. Non-wheels get a very high value.

Note:
The scoring algorithm prioritizes:
1. Higher Python version (e.g., cp311 over cp39)
2. Newer platform tags with higher scores (e.g., manylinux_2_28 [228] over
manylinux2014 [90] over manylinux1 [70])
Only CPython (cp) and generic Python (py) interpreters are considered.

"""
if file.get("is_wheel", False) is False:
return (int(1e10), int(1e10))

best_score: Tuple[int, int] = (0, 0)

for tag in file.get("tags", []):
# Calculate Python score
interpreter_name, py_version = parse_interpreter(tag.interpreter)
if interpreter_name not in ("cp", "py") or not py_version:
continue

version_obj = packaging.version.Version(py_version)
py_score = version_obj.major * 100 + version_obj.minor

# Calculate platform score
platform = tag.platform
platform_score = 0
match = re.search(r"[a-z]+_(\d+)_(\d+)", platform)
if match:
# this provides a minimum platform_score of 100 (glibc 1.0 = 100+0 = 100)
platform_score = int(match.group(1)) * 100 + int(match.group(2))
elif "manylinux2014" in platform:
platform_score = 90
elif "manylinux2010" in platform:
platform_score = 80
elif "manylinux1" in platform:
platform_score = 70

# Keep the lexicographically maximum tuple (highest py_score, then highest platform_score)
current_score = (py_score, platform_score)
best_score = max(best_score, current_score)

return best_score

def _process_file(
self,
requirement: packaging.requirements.Requirement,
Expand Down Expand Up @@ -598,6 +713,16 @@ def my_url(arg):
type=str,
help="Regular expression to filter which package file types are mirrored",
)
parser.add_argument(
"-W",
"--mirror-all-wheels",
dest="mirror_all_wheels",
action="store_true",
help=(
"Download all compatible wheels for each version. "
"(default: fetch only the wheel for latest compatible Python version)"
),
)

server.add_arguments(parser)
configurator.add_arguments(parser)
Expand Down
112 changes: 110 additions & 2 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os

import packaging.requirements
import packaging.version
import pytest

from morgan import PYPI_ADDRESS, Mirrorer, parse_interpreter, parse_requirement, server
Expand Down Expand Up @@ -88,6 +89,7 @@ def test_mirrorer_initialization(self, temp_index_path):
config=os.path.join(temp_index_path, "morgan.ini"),
mirror_all_versions=False,
package_type_regex="(whl|zip|tar.gz)",
mirror_all_wheels=False,
)

mirrorer = Mirrorer(args)
Expand All @@ -107,6 +109,7 @@ def test_server_file_copying(self, temp_index_path):
config=os.path.join(temp_index_path, "morgan.ini"),
mirror_all_versions=False,
package_type_regex="(whl|zip|tar.gz)",
mirror_all_wheels=False,
)
mirrorer = Mirrorer(args)

Expand All @@ -131,6 +134,7 @@ def test_file_hashing(self, temp_index_path):
config=os.path.join(temp_index_path, "morgan.ini"),
mirror_all_versions=False,
package_type_regex="(whl|zip|tar.gz)",
mirror_all_wheels=False,
)
mirrorer = Mirrorer(args)

Expand Down Expand Up @@ -180,21 +184,23 @@ def _make_mirrorer(mirror_all_versions):
config=os.path.join(temp_index_path, "morgan.ini"),
mirror_all_versions=mirror_all_versions,
package_type_regex=r"(whl|zip|tar\.gz)",
mirror_all_wheels=False,
)
return Mirrorer(args)

return _make_mirrorer

@staticmethod
def make_file(filename, **overrides):
def make_file(filename, overrides=None):
fileinfo = {
"filename": filename,
"hashes": {
"sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
},
"url": f"https://example.com/{filename}",
}
fileinfo.update(overrides)
if overrides:
fileinfo.update(overrides)
return fileinfo

@pytest.fixture
Expand All @@ -206,6 +212,46 @@ def sample_files(self):
self.make_file("sample_package-1.4.9.tar.gz"),
]

@pytest.fixture
def sample_package_files(self):
"""Provide wheel and sdist file entries for sample_package 1.0.0 and 2.0.0.

The 2.0.0 files declare ``requires-python >=3.11`` and the 1.0.0 files
declare ``>=3.9``. Each entry is built with ``make_file`` and carries
``version``, ``is_wheel`` and ``requires-python`` keys.

Returns:
A tuple in the order ``(v1_whl, v1_tar, v2_whl, v2_tar)``.
"""
# Version 2.0.0 files - NOT compatible (requires Python 3.11+; the test
# environment is assumed to be Python 3.10, configured outside this fixture)
v2_whl = self.make_file(
"sample_package-2.0.0-py3-none-any.whl",
{
"version": packaging.version.Version("2.0.0"),
"is_wheel": True,
"requires-python": ">=3.11",
},
)
v2_tar = self.make_file(
"sample_package-2.0.0.tar.gz",
{
"version": packaging.version.Version("2.0.0"),
"is_wheel": False,
"requires-python": ">=3.11",
},
)

# Version 1.0.0 files - Compatible (works with Python 3.9+, which covers
# the assumed 3.10 environment)
v1_whl = self.make_file(
"sample_package-1.0.0-py3-none-any.whl",
{
"version": packaging.version.Version("1.0.0"),
"is_wheel": True,
"requires-python": ">=3.9",
},
)
v1_tar = self.make_file(
"sample_package-1.0.0.tar.gz",
{
"version": packaging.version.Version("1.0.0"),
"is_wheel": False,
"requires-python": ">=3.9",
},
)
# Note the order: the 1.0.0 entries come first, then the 2.0.0 entries.
return v1_whl, v1_tar, v2_whl, v2_tar

@staticmethod
def extract_versions(files):
if not files:
Expand Down Expand Up @@ -266,3 +312,65 @@ def test_filter_files_with_latest_version_mirrored(
)

assert self.extract_versions(filtered_files) == expected_versions

def test_filter_files_selects_latest_compatible_version_of_wheel_and_sdist(
self, make_mirrorer, sample_package_files
):
"""Keep both the 1.0.0 wheel and the 1.0.0 sdist; drop every 2.0.0 file.

All four fixture files are passed to ``_filter_files``. The 2.0.0 files
declare ``requires-python >=3.11`` and are expected to be rejected for the
mirrorer's environment (assumed to be Python 3.10 -- configured outside
this test).
"""
mirrorer = make_mirrorer(mirror_all_versions=False)
requirement = packaging.requirements.Requirement("sample_package>=1.0")

# The fixture tuple (v1_whl, v1_tar, v2_whl, v2_tar) is passed through as-is.
files = sample_package_files

# pylint: disable=W0212
filtered_files = mirrorer._filter_files(
requirement=requirement, required_by=None, files=files
)

# Compare as sets: only which files were selected matters, not their order.
filenames = {f["filename"] for f in filtered_files}
assert {
"sample_package-1.0.0-py3-none-any.whl",
"sample_package-1.0.0.tar.gz",
} == set(filenames), f"Wrong packages selected. Got: {filenames}"

def test_filter_files_selects_latest_compatible_version_of_wheel_only(
self, make_mirrorer, sample_package_files
):
"""Keep only the 1.0.0 wheel when version 1.0.0 has no sdist on offer.

The input is the 1.0.0 wheel plus both 2.0.0 files. The 2.0.0 files
(``requires-python >=3.11``) are expected to be rejected, so the 1.0.0
wheel must be the sole selected file.
"""
mirrorer = make_mirrorer(mirror_all_versions=False)

# The 1.0.0 sdist is deliberately left out of the input.
v1_whl, _, v2_whl, v2_tar = sample_package_files

files = [v1_whl, v2_whl, v2_tar]
requirement = packaging.requirements.Requirement("sample_package>=1.0")

# pylint: disable=W0212
filtered_files = mirrorer._filter_files(
requirement=requirement, required_by=None, files=files
)

filenames = {f["filename"] for f in filtered_files}
assert {
"sample_package-1.0.0-py3-none-any.whl",
} == set(filenames), f"Wrong packages selected. Got: {filenames}"

def test_filter_files_selects_latest_compatible_version_of_sdist_only(
self, make_mirrorer, sample_package_files
):
"""Keep only the 1.0.0 sdist when version 1.0.0 has no wheel on offer.

The input is the 1.0.0 sdist plus both 2.0.0 files. The 2.0.0 files
(``requires-python >=3.11``) are expected to be rejected, so the 1.0.0
sdist must be the sole selected file.
"""
mirrorer = make_mirrorer(mirror_all_versions=False)

# The 1.0.0 wheel is deliberately left out of the input.
_, v1_tar, v2_whl, v2_tar = sample_package_files

files = [v1_tar, v2_whl, v2_tar]
requirement = packaging.requirements.Requirement("sample_package>=1.0")

# pylint: disable=W0212
filtered_files = mirrorer._filter_files(
requirement=requirement, required_by=None, files=files
)

filenames = {f["filename"] for f in filtered_files}
assert {
"sample_package-1.0.0.tar.gz",
} == set(filenames), f"Wrong packages selected. Got: {filenames}"
Loading