Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/python_discovery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ._discovery import get_interpreter
from ._py_info import PythonInfo
from ._py_spec import PythonSpec
from ._specifier import SimpleSpecifier, SimpleSpecifierSet, SimpleVersion

__version__ = version("python-discovery")

Expand All @@ -17,6 +18,9 @@
"PyInfoCache",
"PythonInfo",
"PythonSpec",
"SimpleSpecifier",
"SimpleSpecifierSet",
"SimpleVersion",
"__version__",
"get_interpreter",
]
42 changes: 35 additions & 7 deletions src/python_discovery/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,47 @@
class ContentStore(Protocol):
"""A store for reading and writing cached content."""

def exists(self) -> bool: ...
def exists(self) -> bool:
"""Return whether the cached content exists."""
...

def read(self) -> dict | None: ...
def read(self) -> dict | None:
"""Read the cached content, or ``None`` if unavailable or corrupt."""
...

def write(self, content: dict) -> None: ...
def write(self, content: dict) -> None:
"""
Persist *content* to the store.

def remove(self) -> None: ...
:param content: interpreter metadata to cache.
"""
...

def remove(self) -> None:
"""Delete the cached content."""
...

@contextmanager
def locked(self) -> Generator[None]: ...
def locked(self) -> Generator[None]:
"""Context manager that acquires an exclusive lock on this store."""
...


@runtime_checkable
class PyInfoCache(Protocol):
"""Cache interface for Python interpreter information."""

def py_info(self, path: Path) -> ContentStore: ...
def py_info(self, path: Path) -> ContentStore:
"""
Return the content store for the interpreter at *path*.

:param path: absolute path to a Python executable.
"""
...

def py_info_clear(self) -> None: ...
def py_info_clear(self) -> None:
"""Remove all cached interpreter information."""
...


class DiskContentStore:
Expand Down Expand Up @@ -101,10 +123,16 @@ def _py_info_dir(self) -> Path:
return self._root / "py_info" / "4"

def py_info(self, path: Path) -> DiskContentStore:
"""
Return the content store for the interpreter at *path*.

:param path: absolute path to a Python executable.
"""
key = sha256(str(path).encode("utf-8")).hexdigest()
return DiskContentStore(self._py_info_dir, key)

def py_info_clear(self) -> None:
"""Remove all cached interpreter information."""
folder = self._py_info_dir
if folder.exists():
for entry in folder.iterdir():
Expand Down
31 changes: 29 additions & 2 deletions src/python_discovery/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,26 @@ def get_interpreter(
try_first_with: Iterable[str] | None = None,
cache: PyInfoCache | None = None,
env: Mapping[str, str] | None = None,
predicate: Callable[[PythonInfo], bool] | None = None,
) -> PythonInfo | None:
"""
Find a Python interpreter matching *key*.

Iterates over one or more specification strings and returns the first interpreter that satisfies the spec and passes
the optional *predicate*.

:param key: interpreter specification string(s) — an absolute path, a version (``3.12``), an implementation prefix
(``cpython3.12``), or a PEP 440 specifier (``>=3.10``). When a sequence is given each entry is tried in order.
:param try_first_with: executables to probe before the normal discovery search.
:param cache: interpreter metadata cache; when ``None`` results are not cached.
:param env: environment mapping for ``PATH`` lookup; defaults to :data:`os.environ`.
:param predicate: optional callback applied after an interpreter matches the spec. Return ``True`` to accept the
interpreter, ``False`` to skip it and continue searching.
:return: the first matching interpreter, or ``None`` if no match is found.
"""
specs = [key] if isinstance(key, str) else key
for spec_str in specs:
if result := _find_interpreter(spec_str, try_first_with or (), cache, env):
if result := _find_interpreter(spec_str, try_first_with or (), cache, env, predicate):
return result
return None

Expand All @@ -40,6 +56,7 @@ def _find_interpreter(
try_first_with: Iterable[str],
cache: PyInfoCache | None = None,
env: Mapping[str, str] | None = None,
predicate: Callable[[PythonInfo], bool] | None = None,
) -> PythonInfo | None:
spec = PythonSpec.from_string_spec(key)
_LOGGER.info("find interpreter for spec %r", spec)
Expand All @@ -52,7 +69,9 @@ def _find_interpreter(
if proposed_key in proposed_paths:
continue
_LOGGER.info("proposed %s", interpreter)
if interpreter.satisfies(spec, impl_must_match=impl_must_match):
if interpreter.satisfies(spec, impl_must_match=impl_must_match) and (
predicate is None or predicate(interpreter)
):
_LOGGER.debug("accepted %s", interpreter)
return interpreter
proposed_paths.add(proposed_key)
Expand Down Expand Up @@ -88,6 +107,14 @@ def propose_interpreters(
cache: PyInfoCache | None = None,
env: Mapping[str, str] | None = None,
) -> Generator[tuple[PythonInfo | None, bool], None, None]:
"""
Yield ``(interpreter, impl_must_match)`` candidates for *spec*.

:param spec: the parsed interpreter specification to match against.
:param try_first_with: executable paths to probe before the standard search.
:param cache: interpreter metadata cache; when ``None`` results are not cached.
:param env: environment mapping for ``PATH`` lookup; defaults to :data:`os.environ`.
"""
env = os.environ if env is None else env
tested_exes: set[str] = set()
if spec.is_abs and spec.path is not None:
Expand Down
18 changes: 18 additions & 0 deletions src/python_discovery/_py_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,11 @@ def current_system(cls, cache: PyInfoCache | None = None) -> PythonInfo:
return cls._current_system

def to_json(self) -> str:
"""Serialize this interpreter information to a JSON string."""
return json.dumps(self.to_dict(), indent=2)

def to_dict(self) -> dict[str, object]:
"""Convert this interpreter information to a plain dictionary."""
data = {var: (getattr(self, var) if var != "_creators" else None) for var in vars(self)}
version_info = data["version_info"]
data["version_info"] = version_info._asdict() if hasattr(version_info, "_asdict") else version_info
Expand Down Expand Up @@ -520,18 +522,34 @@ def from_exe( # noqa: PLR0913

@classmethod
def from_json(cls, payload: str) -> PythonInfo:
"""
Deserialize interpreter information from a JSON string.

:param payload: JSON produced by :meth:`to_json`.
"""
raw = json.loads(payload)
return cls.from_dict(raw.copy())

@classmethod
def from_dict(cls, data: dict[str, object]) -> PythonInfo:
"""
Reconstruct a :class:`PythonInfo` from a plain dictionary.

:param data: dictionary produced by :meth:`to_dict`.
"""
data["version_info"] = VersionInfo(**data["version_info"]) # restore this to a named tuple structure
result = cls()
result.__dict__ = data.copy()
return result

@classmethod
def resolve_to_system(cls, cache: PyInfoCache | None, target: PythonInfo) -> PythonInfo:
"""
Walk virtualenv/venv prefix chains to find the underlying system interpreter.

:param cache: interpreter metadata cache; when ``None`` results are not cached.
:param target: the interpreter to resolve.
"""
start_executable = target.executable
prefixes = OrderedDict()
while target.system_executable is None:
Expand Down
1 change: 1 addition & 0 deletions src/python_discovery/_py_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def generate_re(self, *, windows: bool) -> re.Pattern:

@property
def is_abs(self) -> bool:
"""``True`` if the spec refers to an absolute filesystem path."""
return self.path is not None and pathlib.Path(self.path).is_absolute()

def _check_version_specifier(self, spec: PythonSpec) -> bool:
Expand Down
27 changes: 25 additions & 2 deletions src/python_discovery/_specifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class SimpleVersion:

@classmethod
def from_string(cls, version_str: str) -> SimpleVersion:
"""
Parse a PEP 440 version string (e.g. ``3.12.1``).

:param version_str: the version string to parse.
"""
stripped = version_str.strip()
if not (match := _VERSION_RE.match(stripped)):
msg = f"Invalid version: {version_str}"
Expand Down Expand Up @@ -123,6 +128,11 @@ class SimpleSpecifier:

@classmethod
def from_string(cls, spec_str: str) -> SimpleSpecifier:
"""
Parse a single PEP 440 specifier (e.g. ``>=3.10``).

:param spec_str: the specifier string to parse.
"""
stripped = spec_str.strip()
if not (match := _SPECIFIER_RE.match(stripped)):
msg = f"Invalid specifier: {spec_str}"
Expand All @@ -148,7 +158,11 @@ def from_string(cls, spec_str: str) -> SimpleSpecifier:
)

def contains(self, version_str: str) -> bool:
"""Check if a version string satisfies this specifier."""
"""
Check if a version string satisfies this specifier.

:param version_str: the version string to test.
"""
try:
candidate = SimpleVersion.from_string(version_str) if isinstance(version_str, str) else version_str
except ValueError:
Expand Down Expand Up @@ -223,6 +237,11 @@ class SimpleSpecifierSet:

@classmethod
def from_string(cls, specifiers_str: str = "") -> SimpleSpecifierSet:
"""
Parse a comma-separated PEP 440 specifier string (e.g. ``>=3.10,<4``).

:param specifiers_str: the specifier string to parse.
"""
stripped = specifiers_str.strip()
specs: list[SimpleSpecifier] = []
if stripped:
Expand All @@ -234,7 +253,11 @@ def from_string(cls, specifiers_str: str = "") -> SimpleSpecifierSet:
return cls(specifiers_str=stripped, specifiers=tuple(specs))

def contains(self, version_str: str) -> bool:
"""Check if a version satisfies all specifiers in the set."""
"""
Check if a version satisfies all specifiers in the set.

:param version_str: the version string to test.
"""
if not self.specifiers:
return True
return all(spec.contains(version_str) for spec in self.specifiers)
Expand Down
35 changes: 35 additions & 0 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,38 @@ def test_shim_colon_separated_pyenv_version_picks_first_match(
mock_from_exe.return_value = None
get_interpreter("python2.7", [])
assert mock_from_exe.call_args_list[0][0][0] == str(second_binary)


def test_predicate_filters_interpreters(session_cache: DiskCache) -> None:
result = get_interpreter(sys.executable, [], session_cache, predicate=lambda _: False)
assert result is None


def test_predicate_accepts_interpreter(session_cache: DiskCache) -> None:
result = get_interpreter(sys.executable, [], session_cache, predicate=lambda _: True)
assert result is not None
assert result.executable == sys.executable


def test_predicate_none_is_noop(session_cache: DiskCache) -> None:
result = get_interpreter(sys.executable, [], session_cache, predicate=None)
assert result is not None
assert result.executable == sys.executable


def test_predicate_with_fallback_specs(session_cache: DiskCache) -> None:
current = PythonInfo.current_system(session_cache)
major, minor = current.version_info.major, current.version_info.minor
accepted_exe: str | None = None

def reject_first(info: PythonInfo) -> bool:
nonlocal accepted_exe
if accepted_exe is None:
accepted_exe = str(info.executable)
return False
return True

result = get_interpreter([f"{major}.{minor}", sys.executable], [], session_cache, predicate=reject_first)
assert accepted_exe is not None
assert result is not None
assert str(result.executable) != accepted_exe