Skip to content

Commit e05149b

Browse files
authored
Merge pull request #50 from thecodecrate/chore/organize-as-spatie
chore: refactor code to use concerns and contracts
2 parents 9753114 + 17a1fcb commit e05149b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+440
-460
lines changed

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "thecodecrate-pipeline"
3-
version = "1.27.0"
3+
version = "1.28.0"
44
description = "This package provides a pipeline pattern implementation"
55
readme = "README.md"
66
authors = [{ name = "TheCodeCrate", email = "loureiro.rg@gmail.com" }]
@@ -48,7 +48,7 @@ build-backend = "hatchling.build"
4848
line-length = 79
4949

5050
[tool.bumpver]
51-
current_version = "1.27.0"
51+
current_version = "1.28.0"
5252
version_pattern = "MAJOR.MINOR.PATCH"
5353
commit_message = "feat: bump version {old_version} -> {new_version}"
5454
tag_message = "{new_version}"

src/_lib/__init__.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +0,0 @@
1-
# Version of the package
2-
# DO NOT MODIFY MANUALLY
3-
# This will be updated by `bumpver` command.
4-
# - Make sure to commit all changes first before running `bumpver`.
5-
# - Run `bumpver update --[minor|major|patch]`
6-
__version__ = "1.26.0"
7-
8-
# Re-exporting symbols
9-
from .pipeline import CallableCollection as CallableCollection
10-
from .pipeline import CallableType as CallableType
11-
from .pipeline import Pipeline as Pipeline
12-
from .pipeline import PipelineFactory as PipelineFactory
13-
from .pipeline import PipelineFactoryInterface as PipelineFactoryInterface
14-
from .pipeline import PipelineInterface as PipelineInterface
15-
from .pipeline import Processor as Processor
16-
from .pipeline import ProcessorInterface as ProcessorInterface
17-
from .pipeline import Stage as Stage
18-
from .pipeline import StageDefinition as StageDefinition
19-
from .pipeline import StageDefinitionCollection as StageDefinitionCollection
20-
from .pipeline import StageInterface as StageInterface
21-
from .pipeline import T_in as T_in
22-
from .pipeline import T_out as T_out
23-
from .pipeline import __all__ as _pipeline_all
24-
from .processors import ChainedPipeline as ChainedPipeline
25-
from .processors import ChainedProcessor as ChainedProcessor
26-
from .processors import InterruptiblePipeline as InterruptiblePipeline
27-
from .processors import InterruptibleProcessor as InterruptibleProcessor
28-
from .processors import __all__ as _processor_all
29-
30-
# pyright: reportUnsupportedDunderAll=false
31-
__all__ = (
32-
*_pipeline_all,
33-
*_processor_all,
34-
)

src/_lib/concerns/base_pipeline.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from abc import ABC
2+
from typing import Any, Generic
3+
4+
from ..support.clonable import Clonable
5+
from ..types.types import T_in, T_out
6+
7+
8+
class BasePipeline(
9+
Clonable,
10+
Generic[T_in, T_out],
11+
ABC,
12+
):
13+
def __init__(
14+
self,
15+
*args: Any,
16+
**kwds: Any,
17+
) -> None:
18+
pass
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from abc import ABC
2+
from typing import Any, Optional, Self
3+
4+
from ..contracts.processable_pipeline import ProcessablePipeline as ImplementsInterface
5+
from ..contracts.processor import Processor as ProcessorContract
6+
from ..processors.chained_processor.chained_processor import ChainedProcessor
7+
from ..types.types import T_in, T_out
8+
9+
10+
class ProcessablePipeline(
11+
ImplementsInterface[T_in, T_out],
12+
ABC,
13+
):
14+
processor: Optional[type[ProcessorContract] | ProcessorContract]
15+
processor_instance: Optional[ProcessorContract]
16+
17+
def __init__(
18+
self,
19+
processor: Optional[type[ProcessorContract] | ProcessorContract] = None,
20+
processor_instance: Optional[ProcessorContract] = None,
21+
*args: Any,
22+
**kwds: Any,
23+
) -> None:
24+
super().__init__(*args, **kwds) # type: ignore
25+
26+
if not hasattr(self, "processor"):
27+
self.processor = self._get_default_processor()
28+
29+
if not hasattr(self, "processor_instance"):
30+
self.processor_instance = None
31+
32+
if processor:
33+
self.processor = processor
34+
35+
if processor_instance:
36+
self.processor_instance = processor_instance
37+
38+
if self._should_instantiate_processor():
39+
self._instantiate_processor()
40+
41+
async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out:
42+
"""
43+
Process the given payload through the pipeline.
44+
"""
45+
if self.processor_instance is None:
46+
raise ValueError("Processor not set")
47+
48+
return await self.processor_instance.process(
49+
payload=payload, stages=self.get_stages_instances(), *args, **kwds
50+
)
51+
52+
async def __call__(
53+
self,
54+
payload: T_in,
55+
/, # Make 'payload' a positional-only parameter
56+
*args: Any,
57+
**kwds: Any,
58+
) -> T_out:
59+
"""
60+
Processes payload through the pipeline.
61+
"""
62+
return await self.process(payload, *args, **kwds)
63+
64+
def with_processor(
65+
self, processor: type[ProcessorContract] | ProcessorContract
66+
) -> Self:
67+
"""
68+
Attachs a processor (class or instance) to the pipeline.
69+
"""
70+
cloned = self.clone({"processor": processor, "processor_instance": None})
71+
72+
return cloned._instantiate_processor()
73+
74+
def get_processor_instance(self) -> Optional[ProcessorContract]:
75+
return self.processor_instance
76+
77+
def _get_default_processor(self) -> type[ChainedProcessor[T_in, T_out]]:
78+
return ChainedProcessor
79+
80+
def _should_instantiate_processor(self) -> bool:
81+
return self.processor_instance is None
82+
83+
def _instantiate_processor(self) -> Self:
84+
if self.processor is None:
85+
raise ValueError("Processor class not set")
86+
87+
if isinstance(self.processor, type):
88+
self.processor_instance = self.processor()
89+
else:
90+
self.processor_instance = self.processor
91+
92+
if isinstance(self.processor_instance, type):
93+
raise ValueError("Processor instance could not be created")
94+
95+
return self
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from abc import ABC
2+
from typing import Any, Optional, Self
3+
4+
from ..contracts.stageable_pipeline import StageablePipeline as ImplementsInterface
5+
from ..types.callable_type import (
6+
CallableCollection,
7+
CallableType,
8+
StageDefinitionCollection,
9+
)
10+
from ..types.types import T_in, T_out
11+
12+
13+
class StageablePipeline(
14+
ImplementsInterface[T_in, T_out],
15+
ABC,
16+
):
17+
stages: StageDefinitionCollection
18+
stages_instances: CallableCollection
19+
20+
def __init__(
21+
self,
22+
stages: Optional[StageDefinitionCollection] = None,
23+
stages_instances: Optional[CallableCollection] = None,
24+
*args: Any,
25+
**kwds: Any,
26+
) -> None:
27+
super().__init__(*args, **kwds) # type: ignore
28+
29+
if not hasattr(self, "stages"):
30+
self.stages = tuple()
31+
32+
if not hasattr(self, "stages_instances"):
33+
self.stages_instances = tuple()
34+
35+
if stages:
36+
self.stages = stages
37+
38+
if stages_instances:
39+
self.stages_instances = stages_instances
40+
41+
if self._should_instantiate_stages():
42+
self._instantiate_stages()
43+
44+
def pipe(self, stage: CallableType) -> Self:
45+
"""
46+
Adds a single stage to the pipeline.
47+
"""
48+
return self.clone({"stages_instances": tuple([*self.stages_instances, stage])})
49+
50+
def with_stages(self, stages: StageDefinitionCollection) -> Self:
51+
"""
52+
Adds a collection of stages to the pipeline.
53+
"""
54+
cloned = self.clone({"stages": stages, "stages_instances": []})
55+
56+
return cloned._instantiate_stages()
57+
58+
def get_stages(self) -> StageDefinitionCollection:
59+
return self.stages
60+
61+
def get_stages_instances(self) -> CallableCollection:
62+
return self.stages_instances
63+
64+
def _should_instantiate_stages(self) -> bool:
65+
return len(self.stages_instances) == 0 and len(self.stages) > 0
66+
67+
def _instantiate_stages(self) -> Self:
68+
self.stages_instances = tuple(
69+
stage() if isinstance(stage, type) else stage for stage in self.stages
70+
)
71+
72+
return self
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from typing import Protocol
2+
3+
from ..support.clonable import ClonableInterface
4+
from ..types.types import T_in, T_out
5+
6+
7+
class BasePipeline(
8+
ClonableInterface,
9+
Protocol[T_in, T_out],
10+
):
11+
pass

src/_lib/contracts/pipeline.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from typing import Protocol
2+
3+
from ..types.types import T_in, T_out
4+
from .base_pipeline import BasePipeline as BasePipelineContract
5+
from .processable_pipeline import ProcessablePipeline as ProcessablePipelineContract
6+
from .stageable_pipeline import StageablePipeline as StageablePipelineContract
7+
8+
9+
class Pipeline(
10+
ProcessablePipelineContract[T_in, T_out],
11+
StageablePipelineContract[T_in, T_out],
12+
BasePipelineContract[T_in, T_out],
13+
Protocol[T_in, T_out],
14+
):
15+
pass
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from typing import Any, Optional, Protocol, Self
2+
3+
from ..support.act_as_factory.act_as_factory_interface import ActAsFactoryInterface
4+
from ..types.callable_type import StageDefinition, StageDefinitionCollection
5+
from ..types.types import T_in, T_out
6+
from .pipeline import Pipeline as PipelineContract
7+
8+
9+
class PipelineFactory(
10+
ActAsFactoryInterface[PipelineContract[T_in, T_out]],
11+
Protocol[T_in, T_out],
12+
):
13+
def __init__(
14+
self,
15+
stages: Optional[StageDefinitionCollection] = None,
16+
pipeline_class: Optional[type[PipelineContract[T_in, T_out]]] = None,
17+
*args: Any,
18+
**kwds: Any,
19+
) -> None: ...
20+
21+
def add_stage(self, stage: StageDefinition) -> Self: ...
22+
23+
def with_stages(self, stages: StageDefinitionCollection) -> Self: ...
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from typing import Any, Optional, Protocol, Self
2+
3+
from ..types.types import T_in, T_out
4+
from .base_pipeline import BasePipeline as BasePipelineContract
5+
from .processor import Processor as ProcessorContract
6+
from .stageable_pipeline import StageablePipeline as StageablePipelineContract
7+
8+
9+
class ProcessablePipeline(
10+
StageablePipelineContract[T_in, T_out],
11+
BasePipelineContract[T_in, T_out],
12+
Protocol[T_in, T_out],
13+
):
14+
def __init__(
15+
self,
16+
processor: Optional[type[ProcessorContract] | ProcessorContract] = None,
17+
processor_instance: Optional[ProcessorContract] = None,
18+
*args: Any,
19+
**kwds: Any,
20+
) -> None: ...
21+
22+
async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out: ...
23+
24+
async def __call__(
25+
self,
26+
payload: T_in,
27+
/, # Make 'payload' a positional-only parameter
28+
*args: Any,
29+
**kwds: Any,
30+
) -> T_out: ...
31+
32+
def with_processor(
33+
self, processor: type[ProcessorContract] | ProcessorContract
34+
) -> Self: ...
35+
36+
def get_processor_instance(self) -> Optional[ProcessorContract]: ...
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from abc import abstractmethod
22
from typing import Any, Awaitable, Protocol
33

4-
from .callable_type import CallableCollection, CallableType
5-
from .traits.clonable import ClonableInterface
6-
from .types import T_in, T_out
4+
from ..support.clonable.clonable_interface import ClonableInterface
5+
from ..types.callable_type import CallableCollection, CallableType
6+
from ..types.types import T_in, T_out
77

88

9-
class ProcessorInterface(
9+
class Processor(
1010
ClonableInterface,
1111
Protocol[T_in, T_out],
1212
):

0 commit comments

Comments
 (0)