Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "thecodecrate-pipeline"
version = "1.27.0"
version = "1.28.0"
description = "This package provides a pipeline pattern implementation"
readme = "README.md"
authors = [{ name = "TheCodeCrate", email = "loureiro.rg@gmail.com" }]
Expand Down Expand Up @@ -48,7 +48,7 @@ build-backend = "hatchling.build"
line-length = 79

[tool.bumpver]
current_version = "1.27.0"
current_version = "1.28.0"
version_pattern = "MAJOR.MINOR.PATCH"
commit_message = "feat: bump version {old_version} -> {new_version}"
tag_message = "{new_version}"
Expand Down
34 changes: 0 additions & 34 deletions src/_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,34 +0,0 @@
# Version of the package
# DO NOT MODIFY MANUALLY
# This will be updated by `bumpver` command.
# - Make sure to commit all changes first before running `bumpver`.
# - Run `bumpver update --[minor|major|patch]`
__version__ = "1.26.0"

# Re-exporting symbols
from .pipeline import CallableCollection as CallableCollection
from .pipeline import CallableType as CallableType
from .pipeline import Pipeline as Pipeline
from .pipeline import PipelineFactory as PipelineFactory
from .pipeline import PipelineFactoryInterface as PipelineFactoryInterface
from .pipeline import PipelineInterface as PipelineInterface
from .pipeline import Processor as Processor
from .pipeline import ProcessorInterface as ProcessorInterface
from .pipeline import Stage as Stage
from .pipeline import StageDefinition as StageDefinition
from .pipeline import StageDefinitionCollection as StageDefinitionCollection
from .pipeline import StageInterface as StageInterface
from .pipeline import T_in as T_in
from .pipeline import T_out as T_out
from .pipeline import __all__ as _pipeline_all
from .processors import ChainedPipeline as ChainedPipeline
from .processors import ChainedProcessor as ChainedProcessor
from .processors import InterruptiblePipeline as InterruptiblePipeline
from .processors import InterruptibleProcessor as InterruptibleProcessor
from .processors import __all__ as _processor_all

# pyright: reportUnsupportedDunderAll=false
__all__ = (
*_pipeline_all,
*_processor_all,
)
18 changes: 18 additions & 0 deletions src/_lib/concerns/base_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from abc import ABC
from typing import Any, Generic

from ..support.clonable import Clonable
from ..types.types import T_in, T_out


class BasePipeline(
Clonable,
Generic[T_in, T_out],
ABC,
):
def __init__(
self,
*args: Any,
**kwds: Any,
) -> None:
pass
95 changes: 95 additions & 0 deletions src/_lib/concerns/processable_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from abc import ABC
from typing import Any, Optional, Self

from ..contracts.processable_pipeline import ProcessablePipeline as ImplementsInterface
from ..contracts.processor import Processor as ProcessorContract
from ..processors.chained_processor.chained_processor import ChainedProcessor
from ..types.types import T_in, T_out


class ProcessablePipeline(
ImplementsInterface[T_in, T_out],
ABC,
):
processor: Optional[type[ProcessorContract] | ProcessorContract]
processor_instance: Optional[ProcessorContract]

def __init__(
self,
processor: Optional[type[ProcessorContract] | ProcessorContract] = None,
processor_instance: Optional[ProcessorContract] = None,
*args: Any,
**kwds: Any,
) -> None:
super().__init__(*args, **kwds) # type: ignore

if not hasattr(self, "processor"):
self.processor = self._get_default_processor()

if not hasattr(self, "processor_instance"):
self.processor_instance = None

if processor:
self.processor = processor

if processor_instance:
self.processor_instance = processor_instance

if self._should_instantiate_processor():
self._instantiate_processor()

async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out:
"""
Process the given payload through the pipeline.
"""
if self.processor_instance is None:
raise ValueError("Processor not set")

return await self.processor_instance.process(
payload=payload, stages=self.get_stages_instances(), *args, **kwds
)

async def __call__(
self,
payload: T_in,
/, # Make 'payload' a positional-only parameter
*args: Any,
**kwds: Any,
) -> T_out:
"""
Processes payload through the pipeline.
"""
return await self.process(payload, *args, **kwds)

def with_processor(
self, processor: type[ProcessorContract] | ProcessorContract
) -> Self:
"""
Attachs a processor (class or instance) to the pipeline.
"""
cloned = self.clone({"processor": processor, "processor_instance": None})

return cloned._instantiate_processor()

def get_processor_instance(self) -> Optional[ProcessorContract]:
return self.processor_instance

def _get_default_processor(self) -> type[ChainedProcessor[T_in, T_out]]:
return ChainedProcessor

def _should_instantiate_processor(self) -> bool:
return self.processor_instance is None

def _instantiate_processor(self) -> Self:
if self.processor is None:
raise ValueError("Processor class not set")

if isinstance(self.processor, type):
self.processor_instance = self.processor()
else:
self.processor_instance = self.processor

if isinstance(self.processor_instance, type):
raise ValueError("Processor instance could not be created")

return self
72 changes: 72 additions & 0 deletions src/_lib/concerns/stageable_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from abc import ABC
from typing import Any, Optional, Self

from ..contracts.stageable_pipeline import StageablePipeline as ImplementsInterface
from ..types.callable_type import (
CallableCollection,
CallableType,
StageDefinitionCollection,
)
from ..types.types import T_in, T_out


class StageablePipeline(
ImplementsInterface[T_in, T_out],
ABC,
):
stages: StageDefinitionCollection
stages_instances: CallableCollection

def __init__(
self,
stages: Optional[StageDefinitionCollection] = None,
stages_instances: Optional[CallableCollection] = None,
*args: Any,
**kwds: Any,
) -> None:
super().__init__(*args, **kwds) # type: ignore

if not hasattr(self, "stages"):
self.stages = tuple()

if not hasattr(self, "stages_instances"):
self.stages_instances = tuple()

if stages:
self.stages = stages

if stages_instances:
self.stages_instances = stages_instances

if self._should_instantiate_stages():
self._instantiate_stages()

def pipe(self, stage: CallableType) -> Self:
"""
Adds a single stage to the pipeline.
"""
return self.clone({"stages_instances": tuple([*self.stages_instances, stage])})

def with_stages(self, stages: StageDefinitionCollection) -> Self:
"""
Adds a collection of stages to the pipeline.
"""
cloned = self.clone({"stages": stages, "stages_instances": []})

return cloned._instantiate_stages()

def get_stages(self) -> StageDefinitionCollection:
return self.stages

def get_stages_instances(self) -> CallableCollection:
return self.stages_instances

def _should_instantiate_stages(self) -> bool:
return len(self.stages_instances) == 0 and len(self.stages) > 0

def _instantiate_stages(self) -> Self:
self.stages_instances = tuple(
stage() if isinstance(stage, type) else stage for stage in self.stages
)

return self
11 changes: 11 additions & 0 deletions src/_lib/contracts/base_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Protocol

from ..support.clonable import ClonableInterface
from ..types.types import T_in, T_out


class BasePipeline(
ClonableInterface,
Protocol[T_in, T_out],
):
pass
15 changes: 15 additions & 0 deletions src/_lib/contracts/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Protocol

from ..types.types import T_in, T_out
from .base_pipeline import BasePipeline as BasePipelineContract
from .processable_pipeline import ProcessablePipeline as ProcessablePipelineContract
from .stageable_pipeline import StageablePipeline as StageablePipelineContract


class Pipeline(
ProcessablePipelineContract[T_in, T_out],
StageablePipelineContract[T_in, T_out],
BasePipelineContract[T_in, T_out],
Protocol[T_in, T_out],
):
pass
23 changes: 23 additions & 0 deletions src/_lib/contracts/pipeline_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any, Optional, Protocol, Self

from ..support.act_as_factory.act_as_factory_interface import ActAsFactoryInterface
from ..types.callable_type import StageDefinition, StageDefinitionCollection
from ..types.types import T_in, T_out
from .pipeline import Pipeline as PipelineContract


class PipelineFactory(
ActAsFactoryInterface[PipelineContract[T_in, T_out]],
Protocol[T_in, T_out],
):
def __init__(
self,
stages: Optional[StageDefinitionCollection] = None,
pipeline_class: Optional[type[PipelineContract[T_in, T_out]]] = None,
*args: Any,
**kwds: Any,
) -> None: ...

def add_stage(self, stage: StageDefinition) -> Self: ...

def with_stages(self, stages: StageDefinitionCollection) -> Self: ...
36 changes: 36 additions & 0 deletions src/_lib/contracts/processable_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Any, Optional, Protocol, Self

from ..types.types import T_in, T_out
from .base_pipeline import BasePipeline as BasePipelineContract
from .processor import Processor as ProcessorContract
from .stageable_pipeline import StageablePipeline as StageablePipelineContract


class ProcessablePipeline(
StageablePipelineContract[T_in, T_out],
BasePipelineContract[T_in, T_out],
Protocol[T_in, T_out],
):
def __init__(
self,
processor: Optional[type[ProcessorContract] | ProcessorContract] = None,
processor_instance: Optional[ProcessorContract] = None,
*args: Any,
**kwds: Any,
) -> None: ...

async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out: ...

async def __call__(
self,
payload: T_in,
/, # Make 'payload' a positional-only parameter
*args: Any,
**kwds: Any,
) -> T_out: ...

def with_processor(
self, processor: type[ProcessorContract] | ProcessorContract
) -> Self: ...

def get_processor_instance(self) -> Optional[ProcessorContract]: ...
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from abc import abstractmethod
from typing import Any, Awaitable, Protocol

from .callable_type import CallableCollection, CallableType
from .traits.clonable import ClonableInterface
from .types import T_in, T_out
from ..support.clonable.clonable_interface import ClonableInterface
from ..types.callable_type import CallableCollection, CallableType
from ..types.types import T_in, T_out


class ProcessorInterface(
class Processor(
ClonableInterface,
Protocol[T_in, T_out],
):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from abc import abstractmethod
from typing import Any, Protocol

from .types import T_in, T_out
from ..types.types import T_in, T_out


class StageInterface(
class Stage(
Protocol[T_in, T_out],
):
@abstractmethod
Expand Down
30 changes: 30 additions & 0 deletions src/_lib/contracts/stageable_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Any, Optional, Protocol, Self

from ..types.callable_type import (
CallableCollection,
CallableType,
StageDefinitionCollection,
)
from ..types.types import T_in, T_out
from .base_pipeline import BasePipeline as BasePipelineContract


class StageablePipeline(
BasePipelineContract[T_in, T_out],
Protocol[T_in, T_out],
):
def __init__(
self,
stages: Optional[StageDefinitionCollection] = None,
stages_instances: Optional[CallableCollection] = None,
*args: Any,
**kwds: Any,
) -> None: ...

def pipe(self, stage: CallableType) -> Self: ...

def with_stages(self, stages: StageDefinitionCollection) -> Self: ...

def get_stages(self) -> StageDefinitionCollection: ...

def get_stages_instances(self) -> CallableCollection: ...
Loading
Loading