Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions pipelex/libraries/pipe/pipe_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pipelex import pretty_print
from pipelex.core.pipes.pipe_abstract import PipeAbstract
from pipelex.core.qualified_ref import QualifiedRef
from pipelex.core.qualified_ref import QualifiedRef, QualifiedRefError
from pipelex.libraries.pipe.exceptions import PipeLibraryError, PipeNotFoundError
from pipelex.libraries.pipe.pipe_library_abstract import PipeLibraryAbstract
from pipelex.types import Self
Expand Down Expand Up @@ -61,12 +61,24 @@ def get_optional_pipe(self, pipe_code: str) -> PipeAbstract | None:
# Cross-package: "alias->domain.pipe_code" -> lookup "alias->pipe_code"
if QualifiedRef.has_cross_package_prefix(pipe_code):
alias, remainder = QualifiedRef.split_cross_package_ref(pipe_code)
ref = QualifiedRef.parse(remainder)
return self.root.get(f"{alias}->{ref.local_code}")
try:
ref = QualifiedRef.parse(remainder)
except QualifiedRefError:
return None
pipe = self.root.get(f"{alias}->{ref.local_code}")
if pipe is not None and ref.is_qualified and pipe.domain_code != ref.domain_path:
return None
return pipe
# If it's a domain-qualified ref (e.g. "scoring.compute_score"), try the local code
if "." in pipe_code:
ref = QualifiedRef.parse(pipe_code)
return self.root.get(ref.local_code)
try:
ref = QualifiedRef.parse(pipe_code)
except QualifiedRefError:
return None
pipe = self.root.get(ref.local_code)
if pipe is not None and ref.is_qualified and pipe.domain_code != ref.domain_path:
return None
return pipe
return None

def add_dependency_pipe(self, alias: str, pipe: PipeAbstract) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def test_pipe_library_get_optional_cross_package_ref(self, mocker: MockerFixture
library = PipeLibrary.make_empty()
mock_pipe = mocker.MagicMock()
mock_pipe.code = "compute_score"
mock_pipe.domain_code = "scoring"
library.add_dependency_pipe(alias="scoring_lib", pipe=mock_pipe)

result = library.get_optional_pipe("scoring_lib->scoring.compute_score")
Expand Down
101 changes: 101 additions & 0 deletions tests/unit/pipelex/libraries/test_pipe_library_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from typing import Any

import pytest
from pytest_mock import MockerFixture

from pipelex.libraries.pipe.exceptions import PipeNotFoundError
from pipelex.libraries.pipe.pipe_library import PipeLibrary


def _make_stub_pipe(mocker: MockerFixture, code: str, domain_code: str) -> Any:
"""Create a minimal mock pipe with code and domain_code."""
mock_pipe = mocker.MagicMock()
mock_pipe.code = code
mock_pipe.domain_code = domain_code
return mock_pipe


class TestPipeLibraryLookup:
"""Tests for PipeLibrary.get_optional_pipe domain enforcement and malformed-ref safety."""

def test_bare_code_lookup(self, mocker: MockerFixture):
"""Bare code lookup still works."""
library = PipeLibrary.make_empty()
mock_pipe = _make_stub_pipe(mocker, code="compute_score", domain_code="scoring")
library.root["compute_score"] = mock_pipe
result = library.get_optional_pipe("compute_score")
assert result is mock_pipe

def test_domain_qualified_ref_correct_domain(self, mocker: MockerFixture):
"""Domain-qualified ref resolves when pipe domain matches."""
library = PipeLibrary.make_empty()
mock_pipe = _make_stub_pipe(mocker, code="compute_score", domain_code="scoring")
library.root["compute_score"] = mock_pipe
result = library.get_optional_pipe("scoring.compute_score")
assert result is mock_pipe

def test_domain_qualified_ref_wrong_domain(self, mocker: MockerFixture):
"""Domain-qualified ref returns None when pipe domain does not match."""
library = PipeLibrary.make_empty()
mock_pipe = _make_stub_pipe(mocker, code="compute_score", domain_code="scoring")
library.root["compute_score"] = mock_pipe
result = library.get_optional_pipe("wrong_domain.compute_score")
assert result is None

def test_cross_package_ref_correct_domain(self, mocker: MockerFixture):
"""Cross-package ref resolves when pipe domain matches."""
library = PipeLibrary.make_empty()
mock_pipe = _make_stub_pipe(mocker, code="compute_score", domain_code="scoring")
library.add_dependency_pipe(alias="lib", pipe=mock_pipe)
result = library.get_optional_pipe("lib->scoring.compute_score")
assert result is mock_pipe

def test_cross_package_ref_wrong_domain(self, mocker: MockerFixture):
"""Cross-package ref returns None when pipe domain does not match."""
library = PipeLibrary.make_empty()
mock_pipe = _make_stub_pipe(mocker, code="compute_score", domain_code="scoring")
library.add_dependency_pipe(alias="lib", pipe=mock_pipe)
result = library.get_optional_pipe("lib->wrong_domain.compute_score")
assert result is None

@pytest.mark.parametrize(
"malformed_ref",
[
"foo..bar",
".foo",
"foo.",
],
)
def test_malformed_dotted_ref_returns_none(self, malformed_ref: str):
"""Malformed dotted refs return None instead of raising."""
library = PipeLibrary.make_empty()
result = library.get_optional_pipe(malformed_ref)
assert result is None

@pytest.mark.parametrize(
"malformed_ref",
[
"lib->foo..bar",
"lib->.foo",
"lib->foo.",
],
)
def test_malformed_cross_package_ref_returns_none(self, malformed_ref: str):
"""Malformed cross-package refs return None instead of raising."""
library = PipeLibrary.make_empty()
result = library.get_optional_pipe(malformed_ref)
assert result is None

def test_get_required_pipe_malformed_raises_not_found(self):
"""Malformed ref through get_required_pipe raises PipeNotFoundError, not QualifiedRefError."""
library = PipeLibrary.make_empty()
with pytest.raises(PipeNotFoundError):
library.get_required_pipe("foo..bar")

def test_get_required_pipe_domain_mismatch_raises_not_found(self, mocker: MockerFixture):
"""Domain mismatch through get_required_pipe raises PipeNotFoundError."""
library = PipeLibrary.make_empty()
mock_pipe = _make_stub_pipe(mocker, code="compute_score", domain_code="scoring")
library.root["compute_score"] = mock_pipe
with pytest.raises(PipeNotFoundError):
library.get_required_pipe("wrong_domain.compute_score")
Loading