Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pipelex/builder/agentic_builder.mthds
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ inputs = { plan_draft = "builder.PlanDraft", pipe_signatures = "pipe_design.Pipe
output = "pipe_design.PipeSpec[]"
input_list_name = "pipe_signatures"
input_item_name = "pipe_signature"
branch_pipe_code = "detail_pipe_spec"
branch_pipe_code = "pipe_design.detail_pipe_spec"

# Main agent builder: from flow to bundle (skips all drafting)
[pipe.build_from_flow]
Expand All @@ -22,8 +22,8 @@ description = "Build a complete PipelexBundleSpec from pre-generated flow and co
inputs = { brief = "builder.UserBrief", plan_draft = "builder.PlanDraft", prepared_flow = "builder.FlowDraft", concept_specs = "builder.ConceptSpec[]" }
output = "builder.PipelexBundleSpec"
steps = [
{ pipe = "design_pipe_signatures", result = "pipe_signatures" },
{ pipe = "write_bundle_header", result = "bundle_header_spec" },
{ pipe = "builder.design_pipe_signatures", result = "pipe_signatures" },
{ pipe = "builder.write_bundle_header", result = "bundle_header_spec" },
{ pipe = "detail_all_pipe_specs", result = "pipe_specs" },
{ pipe = "assemble_pipelex_bundle_spec", result = "pipelex_bundle_spec" }
{ pipe = "builder.assemble_pipelex_bundle_spec", result = "pipelex_bundle_spec" }
]
2 changes: 1 addition & 1 deletion pipelex/builder/builder.mthds
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ steps = [
{ pipe = "review_flow", result = "prepared_flow" },
{ pipe = "design_pipe_signatures", result = "pipe_signatures" },
{ pipe = "write_bundle_header", result = "bundle_header_spec" },
{ pipe = "detail_pipe_spec", batch_over = "pipe_signatures", batch_as = "pipe_signature", result = "pipe_specs" },
{ pipe = "pipe_design.detail_pipe_spec", batch_over = "pipe_signatures", batch_as = "pipe_signature", result = "pipe_specs" },
{ pipe = "assemble_pipelex_bundle_spec", result = "pipelex_bundle_spec" }
]

Expand Down
98 changes: 86 additions & 12 deletions pipelex/core/bundles/pipelex_bundle_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from pipelex.core.domains.validation import validate_domain_code
from pipelex.core.pipes.validation import is_pipe_code_valid
from pipelex.core.pipes.variable_multiplicity import parse_concept_with_multiplicity
from pipelex.core.qualified_ref import QualifiedRef, QualifiedRefError
from pipelex.pipe_controllers.batch.pipe_batch_blueprint import PipeBatchBlueprint
from pipelex.pipe_controllers.condition.pipe_condition_blueprint import PipeConditionBlueprint
from pipelex.pipe_controllers.condition.special_outcome import SpecialOutcome
from pipelex.pipe_controllers.parallel.pipe_parallel_blueprint import PipeParallelBlueprint
from pipelex.pipe_controllers.sequence.pipe_sequence_blueprint import PipeSequenceBlueprint
from pipelex.pipe_operators.compose.pipe_compose_blueprint import PipeComposeBlueprint
Expand Down Expand Up @@ -123,18 +125,15 @@ def validate_local_concept_references(self) -> Self:

undeclared_refs: list[str] = []
for concept_ref_or_code, context in all_refs:
# Determine if this is a local reference or an external one
if "." in concept_ref_or_code:
# It's a concept ref (domain.ConceptCode)
domain, concept_code = concept_ref_or_code.split(".", 1)
if domain != self.domain:
# External reference - skip validation (will be validated when loading dependencies)
continue
else:
# It's a bare concept code - always local
concept_code = concept_ref_or_code

# Validate local reference
# Parse the reference using QualifiedRef
ref = QualifiedRef.parse(concept_ref_or_code)

if ref.is_external_to(self.domain):
# External reference - skip validation (will be validated when loading dependencies)
continue

# Local reference (bare code or same domain) - validate
concept_code = ref.local_code
if concept_code not in declared_concepts and concept_code not in native_codes:
undeclared_refs.append(f"'{concept_ref_or_code}' in {context}")

Expand All @@ -148,6 +147,81 @@ def validate_local_concept_references(self) -> Self:
raise ValueError(msg)
return self

@model_validator(mode="after")
def validate_local_pipe_references(self) -> Self:
"""Validate that domain-qualified pipe references pointing to this bundle's domain exist locally.

Three categories:
- Bare refs (no dot): no validation here (deferred to package-level resolution)
- Domain-qualified, same domain: must exist in self.pipe
- Domain-qualified, different domain: skip (external, validated at load time)

Special outcomes ("fail", "continue") are excluded from validation.
"""
declared_pipes: set[str] = set(self.pipe.keys()) if self.pipe else set()
special_outcomes = SpecialOutcome.value_list()
all_pipe_refs = self._collect_pipe_references()

invalid_refs: list[str] = []
for pipe_ref_str, context in all_pipe_refs:
# Skip special outcomes
if pipe_ref_str in special_outcomes:
continue

# Try to parse as a pipe ref
try:
ref = QualifiedRef.parse_pipe_ref(pipe_ref_str)
except QualifiedRefError:
# If it doesn't parse as a valid pipe ref, skip (will be caught elsewhere)
continue

if not ref.is_qualified:
# Bare ref - no validation at bundle level
continue

if ref.is_external_to(self.domain):
# External domain - skip
continue

# Same domain, qualified ref - must exist locally
if ref.local_code not in declared_pipes:
invalid_refs.append(f"'{pipe_ref_str}' in {context}")

if invalid_refs:
msg = (
f"The following same-domain pipe references are not declared in domain '{self.domain}' "
f"at '{self.source}': {', '.join(invalid_refs)}. "
f"Declared pipes: {sorted(declared_pipes) if declared_pipes else '(none)'}"
)
raise ValueError(msg)
return self

def _collect_pipe_references(self) -> list[tuple[str, str]]:
"""Collect all pipe references from controller blueprints.

Returns:
List of (pipe_ref_string, context_description) tuples
"""
pipe_refs: list[tuple[str, str]] = []
if not self.pipe:
return pipe_refs

for pipe_code, pipe_blueprint in self.pipe.items():
if isinstance(pipe_blueprint, PipeSequenceBlueprint):
for step_index, step in enumerate(pipe_blueprint.steps):
pipe_refs.append((step.pipe, f"pipe.{pipe_code}.steps[{step_index}].pipe"))
elif isinstance(pipe_blueprint, PipeBatchBlueprint):
pipe_refs.append((pipe_blueprint.branch_pipe_code, f"pipe.{pipe_code}.branch_pipe_code"))
elif isinstance(pipe_blueprint, PipeConditionBlueprint):
for outcome_key, outcome_pipe in pipe_blueprint.outcomes.items():
pipe_refs.append((outcome_pipe, f"pipe.{pipe_code}.outcomes[{outcome_key}]"))
pipe_refs.append((pipe_blueprint.default_outcome, f"pipe.{pipe_code}.default_outcome"))
elif isinstance(pipe_blueprint, PipeParallelBlueprint):
for branch_index, branch in enumerate(pipe_blueprint.branches):
pipe_refs.append((branch.pipe, f"pipe.{pipe_code}.branches[{branch_index}].pipe"))

return pipe_refs

def _collect_local_concept_references(self) -> list[tuple[str, str]]:
local_refs: list[tuple[str, str]] = []

Expand Down
14 changes: 9 additions & 5 deletions pipelex/core/concepts/concept_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pipelex.core.concepts.structure_generation.generator import StructureGenerator
from pipelex.core.concepts.validation import validate_concept_ref_or_code
from pipelex.core.domains.domain import SpecialDomain
from pipelex.core.qualified_ref import QualifiedRef
from pipelex.core.stuffs.text_content import TextContent
from pipelex.types import StrEnum

Expand Down Expand Up @@ -178,12 +179,14 @@ def make_domain_and_concept_code_from_concept_ref_or_code(
raise ConceptFactoryError(msg) from exc

if NativeConceptCode.is_native_concept_ref_or_code(concept_ref_or_code=concept_ref_or_code):
natice_concept_ref = NativeConceptCode.get_validated_native_concept_ref(concept_ref_or_code=concept_ref_or_code)
return DomainAndConceptCode(domain_code=SpecialDomain.NATIVE, concept_code=natice_concept_ref.split(".")[1])
native_concept_ref = NativeConceptCode.get_validated_native_concept_ref(concept_ref_or_code=concept_ref_or_code)
ref = QualifiedRef.parse(native_concept_ref)
return DomainAndConceptCode(domain_code=SpecialDomain.NATIVE, concept_code=ref.local_code)

if "." in concept_ref_or_code:
domain_code, concept_code = concept_ref_or_code.rsplit(".")
return DomainAndConceptCode(domain_code=domain_code, concept_code=concept_code)
ref = QualifiedRef.parse(concept_ref_or_code)
assert ref.domain_path is not None
return DomainAndConceptCode(domain_code=ref.domain_path, concept_code=ref.local_code)
elif domain_code:
return DomainAndConceptCode(domain_code=domain_code, concept_code=concept_ref_or_code)
else:
Expand Down Expand Up @@ -365,7 +368,8 @@ def _handle_refines(
# Get the refined concept's structure class name
# For native concepts, the structure class name is "ConceptCode" + "Content" (e.g., TextContent)
# For custom concepts, the structure class name is just the concept code (e.g., Customer)
refined_concept_code = current_refine.split(".")[1]
refined_ref = QualifiedRef.parse(current_refine)
refined_concept_code = refined_ref.local_code
if NativeConceptCode.is_native_concept_ref_or_code(concept_ref_or_code=current_refine):
refined_structure_class_name = refined_concept_code + "Content"
else:
Expand Down
12 changes: 5 additions & 7 deletions pipelex/core/concepts/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pipelex.core.concepts.concept_structure_blueprint import ConceptStructureBlueprint, ConceptStructureBlueprintFieldType
from pipelex.core.concepts.validation import is_concept_ref_or_code_valid
from pipelex.core.qualified_ref import QualifiedRef

if TYPE_CHECKING:
from pipelex.core.concepts.concept_blueprint import ConceptBlueprint
Expand Down Expand Up @@ -35,10 +36,8 @@ def get_structure_class_name_from_blueprint(
raise ValueError(msg)

# Extract concept_code from concept_ref_or_code
if "." in concept_ref_or_code:
concept_code = concept_ref_or_code.rsplit(".", maxsplit=1)[-1]
else:
concept_code = concept_ref_or_code
ref = QualifiedRef.parse(concept_ref_or_code)
concept_code = ref.local_code

if isinstance(blueprint_or_string_description, str):
return concept_code
Expand Down Expand Up @@ -101,6 +100,5 @@ def extract_concept_code_from_concept_ref_or_code(concept_ref_or_code: str) -> s
msg = f"Invalid concept_ref_or_code: '{concept_ref_or_code}' for extracting concept code"
raise ValueError(msg)

if "." in concept_ref_or_code:
return concept_ref_or_code.rsplit(".", maxsplit=1)[-1]
return concept_ref_or_code
ref = QualifiedRef.parse(concept_ref_or_code)
return ref.local_code
11 changes: 7 additions & 4 deletions pipelex/core/concepts/native/concept_native.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pipelex.core.concepts.native.exceptions import NativeConceptDefinitionError
from pipelex.core.concepts.validation import is_concept_ref_or_code_valid
from pipelex.core.domains.domain import SpecialDomain
from pipelex.core.qualified_ref import QualifiedRef
from pipelex.core.stuffs.document_content import DocumentContent
from pipelex.core.stuffs.dynamic_content import DynamicContent
from pipelex.core.stuffs.html_content import HtmlContent
Expand Down Expand Up @@ -160,8 +161,9 @@ def is_native_concept_ref_or_code(cls, concept_ref_or_code: str) -> bool:
return False

if "." in concept_ref_or_code:
domain_code, concept_code = concept_ref_or_code.split(".", 1)
return SpecialDomain.is_native(domain_code=domain_code) and concept_code in cls.values_list()
ref = QualifiedRef.parse(concept_ref_or_code)
assert ref.domain_path is not None
return SpecialDomain.is_native(domain_code=ref.domain_path) and ref.local_code in cls.values_list()
return concept_ref_or_code in cls.values_list()

@classmethod
Expand All @@ -179,8 +181,9 @@ def is_valid_native_concept_ref(cls, concept_ref: str) -> bool:
"""
if "." not in concept_ref:
return False
domain_code, concept_code = concept_ref.split(".", 1)
return SpecialDomain.is_native(domain_code=domain_code) and concept_code in cls.values_list()
ref = QualifiedRef.parse(concept_ref)
assert ref.domain_path is not None
return SpecialDomain.is_native(domain_code=ref.domain_path) and ref.local_code in cls.values_list()

@classmethod
def validate_native_concept_ref_or_code(cls, concept_ref_or_code: str) -> None:
Expand Down
36 changes: 17 additions & 19 deletions pipelex/core/concepts/validation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pipelex.core.concepts.exceptions import ConceptCodeError, ConceptStringError
from pipelex.core.domains.validation import is_domain_code_valid
from pipelex.core.qualified_ref import QualifiedRef, QualifiedRefError
from pipelex.tools.misc.string_utils import is_pascal_case


Expand All @@ -14,40 +14,38 @@ def validate_concept_code(concept_code: str) -> None:


def is_concept_ref_valid(concept_ref: str) -> bool:
if "." not in concept_ref:
return False

if concept_ref.count(".") > 1:
return False

domain, concept_code = concept_ref.split(".", 1)
"""Check if a concept reference (domain.ConceptCode) is valid.

# Validate domain
if not is_domain_code_valid(code=domain):
Supports hierarchical domains: "legal.contracts.NonCompeteClause" is valid.
"""
try:
ref = QualifiedRef.parse_concept_ref(concept_ref)
except QualifiedRefError:
return False

# Validate concept code
return is_concept_code_valid(concept_code=concept_code)
return ref.is_qualified


def validate_concept_ref(concept_ref: str) -> None:
if not is_concept_ref_valid(concept_ref=concept_ref):
msg = (
f"Concept string '{concept_ref}' is not a valid concept string. It must be in the format 'domain.ConceptCode': "
" - domain: a valid domain code (snake_case), "
" - domain: a valid domain code (snake_case, possibly hierarchical like legal.contracts), "
" - ConceptCode: a valid concept code (PascalCase)"
)
raise ConceptStringError(msg)


def is_concept_ref_or_code_valid(concept_ref_or_code: str) -> bool:
if concept_ref_or_code.count(".") > 1:
return False
"""Check if a concept reference or bare code is valid.

if concept_ref_or_code.count(".") == 1:
Supports hierarchical domains: "legal.contracts.NonCompeteClause" is valid.
Bare codes must be PascalCase: "NonCompeteClause" is valid.
"""
if not concept_ref_or_code:
return False
if "." in concept_ref_or_code:
return is_concept_ref_valid(concept_ref=concept_ref_or_code)
else:
return is_concept_code_valid(concept_code=concept_ref_or_code)
return is_concept_code_valid(concept_code=concept_ref_or_code)


def validate_concept_ref_or_code(concept_ref_or_code: str) -> None:
Expand Down
18 changes: 15 additions & 3 deletions pipelex/core/domains/validation.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
from typing import Any

from pipelex.core.domains.exceptions import DomainCodeError
from pipelex.tools.misc.string_utils import is_snake_case


def is_domain_code_valid(code: str) -> bool:
return is_snake_case(code)
def is_domain_code_valid(code: Any) -> bool:
"""Check if a domain code is valid.

Accepts single-segment (e.g. "legal") and hierarchical dotted paths
(e.g. "legal.contracts", "legal.contracts.shareholder").
Each segment must be snake_case.
"""
if not isinstance(code, str):
return False
if not code or code.startswith(".") or code.endswith(".") or ".." in code:
return False
return all(is_snake_case(segment) for segment in code.split("."))


def validate_domain_code(code: str) -> None:
if not is_domain_code_valid(code=code):
msg = f"Domain code '{code}' is not a valid domain code. It should be in snake_case."
msg = f"Domain code '{code}' is not a valid domain code. It should be in snake_case (segments separated by dots for hierarchical domains)."
raise DomainCodeError(msg)
6 changes: 3 additions & 3 deletions pipelex/core/pipes/variable_multiplicity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

VariableMultiplicity = bool | int

MUTLIPLICITY_PATTERN = r"^([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)(?:\[(\d*)\])?$"
MUTLIPLICITY_PATTERN = r"^([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)(?:\[(\d*)\])?$"


class VariableMultiplicityResolution(BaseModel):
Expand Down Expand Up @@ -77,8 +77,8 @@ def parse_concept_with_multiplicity(concept_ref_or_code: str) -> MultiplicityPar
or if multiplicity is zero or negative (a pipe must produce at least one output)
"""
# Use strict pattern to validate identifier syntax
# Concept must start with letter/underscore, optional domain prefix, optional brackets
pattern = r"^([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)(?:\[(\d*)\])?$"
# Concept must start with letter/underscore, with zero or more dotted domain segments, optional brackets
pattern = r"^([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)(?:\[(\d*)\])?$"
match = re.match(pattern, concept_ref_or_code)

if not match:
Expand Down
Loading
Loading