diff --git a/README.md b/README.md index 38cd7f1..5dc6549 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ await pipeline.process(10) ## Class-Based Stages -Class-based stages are also possible. The `StageInterface[InputType, OutputType]` interface can be implemented, which ensures you have the correct method signature for the `__call__` method. +Class-based stages are also possible. The `StageInterface[T_in, T_out]` interface can be implemented, which ensures you have the correct method signature for the `__call__` method. ```python class TimesTwoStage(StageInterface[int, int]): diff --git a/docs/api/core.md b/docs/api/core.md index 44b38e8..903a805 100644 --- a/docs/api/core.md +++ b/docs/api/core.md @@ -1,15 +1,5 @@ # API Reference -::: thecodecrate_pipeline.core +::: thecodecrate_pipeline options: - show_if_no_docstring: true - preload_modules: - - "_api" - - "thecodecrate_pipeline" - members: - - T_in - - T_out - - Pipeline - - PipelineFactory - - Stage - - Processor + inherited_members: true diff --git a/docs/api/processors.md b/docs/api/processors.md index 724338f..a3acaf9 100644 --- a/docs/api/processors.md +++ b/docs/api/processors.md @@ -2,12 +2,7 @@ ::: thecodecrate_pipeline.processors options: - show_if_no_docstring: true - preload_modules: - - "_api" - - "thecodecrate_pipeline" - members: - - ChainedProcessor - - InterruptibleProcessor - - ChainedProcessorInterface - - InterruptibleProcessorInterface + inherited_members: false + filters: + - "!^_" + - "!^process$" diff --git a/docs/api/types.md b/docs/api/types.md new file mode 100644 index 0000000..f624d73 --- /dev/null +++ b/docs/api/types.md @@ -0,0 +1,5 @@ +# API Reference + +::: thecodecrate_pipeline.types + options: + inherited_members: false diff --git a/docs/index.md b/docs/index.md index 6b3ed6e..612c7a5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,252 +1 @@ -# TheCodeCrate's Pipeline - -This package provides a pipeline pattern implementation. 
- -The implementation is inspired by the excellent [PHP League Pipeline](https://github.com/thephpleague/pipeline) package. - -## Installation - -```bash -pip install thecodecrate-pipeline -``` - -## Pipeline Pattern - -The pipeline pattern allows you to easily compose sequential stages by chaining stages. - -In this particular implementation, the interface consists of two parts: - -- `StageInterface` -- `PipelineInterface` - -A pipeline consists of zero, one, or multiple stages. A pipeline can process a payload. During the processing, the payload will be passed to the first stage. From that moment on, the resulting value is passed on from stage to stage. - -In the simplest form, the execution chain can be represented as a for loop: - -```python -result = payload - -for stage in stages: - result = stage(result) - -return result -``` - -Effectively, this is the same as: - -```python -result = stage3(stage2(stage1(payload))) -``` - -## Immutability - -Pipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline will be created with the added stage. This makes pipelines easy to reuse and minimizes side-effects. - -## Usage - -Operations in a pipeline, stages, can be anything that satisfies the `Callable` type hint. So functions and anything that's callable is acceptable. - -```python -pipeline = Pipeline().pipe(lambda payload: payload * 10) - -# Returns 100 -await pipeline.process(10) -``` - -## Class-Based Stages - -Class-based stages are also possible. The `StageInterface[InputType, OutputType]` interface can be implemented, which ensures you have the correct method signature for the `__call__` method. 
- -```python -class TimesTwoStage(StageInterface[int, int]): - async def __call__(self, payload: int) -> int: - return payload * 2 - -class AddOneStage(StageInterface[int, int]): - async def __call__(self, payload: int) -> int: - return payload + 1 - -pipeline = ( - Pipeline[int, int]() - .pipe(TimesTwoStage()) - .pipe(AddOneStage()) -) - -# Returns 21 -await pipeline.process(10) -``` - -## Reusable Pipelines - -Because the `PipelineInterface` is an extension of the `StageInterface`, pipelines can be reused as stages. This creates a highly composable model to create complex execution patterns while keeping the cognitive load low. - -For example, if we'd want to compose a pipeline to process API calls, we'd create something along these lines: - -```python -process_api_request = ( - Pipeline() - .pipe(ExecuteHttpRequest()) - .pipe(ParseJsonResponse()) -) - -pipeline = ( - Pipeline() - .pipe(ConvertToPsr7Request()) - .pipe(process_api_request) - .pipe(ConvertToResponseDto()) -) - -await pipeline.process(DeleteBlogPost(post_id)) -``` - -## Type Hinting - -You can specify the input and output types for pipelines and stages using type variables `T_in` and `T_out`. This allows you to handle varying types between stages, enhancing type safety and code clarity. - -The `T_out` type variable is optional and defaults to `T_in`. Similarly, `T_in` is also optional and defaults to `Any`. - -```python -from typing import Any - -pipeline = Pipeline[int]().pipe(lambda payload: payload * 2) - -# Returns 20 -await pipeline.process(10) -``` - -You can also handle varying types between stages: - -```python -pipeline = Pipeline[int, str]().pipe(lambda payload: f"Number: {payload}") - -# Returns "Number: 10" -await pipeline.process(10) -``` - -This flexibility allows you to build pipelines that transform data types between stages seamlessly. - -## Custom Processors - -You can create your own processors to customize how the pipeline processes stages. 
This allows you to implement different execution strategies, such as handling exceptions, processing resources, or implementing middleware patterns. - -For example, you can define a custom processor: - -```python -class MyCustomProcessor(Processor[T_in, T_out]): - async def process( - self, - payload: T_in, - stages: StageInstanceCollection, - ) -> T_out: - # Custom processing logic - for stage in stages: - payload = await stage(payload) - return payload -``` - -And use it in your pipeline: - -```python -pipeline = Pipeline[int, int](processor=MyCustomProcessor()).pipe(lambda x: x * 2) -``` - -## Declarative Stages - -Instead of using `pipe` to add stages at runtime, you can define stages declaratively by specifying them as class-level attributes. This makes pipelines easier to set up and reuse with predefined stages. - -```python -class MyPipeline(Pipeline[int, int]): - stages = [ - TimesTwoStage(), - TimesThreeStage(), - ] - -# Process the payload through the pipeline with the declared stages -result = await MyPipeline().process(5) - -# Returns 30 -print(result) -``` - -In this example, `MyPipeline` declares its stages directly in the class definition, making the pipeline setup more readable and maintainable. - -## Declarative Processor - -You can also specify the processor in a declarative way by setting the `processor_class` attribute in your pipeline class. - -```python -class MyPipeline(Pipeline[T_in, T_out]): - processor_class = MyCustomProcessor -``` - -This allows you to customize the processing behavior of your pipeline while keeping the definition clean and declarative. - -## Processing Streams - -The pipeline can also process streams in real-time, allowing you to handle asynchronous iterators and process data as it becomes available. 
- -```python -from typing import AsyncIterator -import asyncio - -async def input_stream() -> AsyncIterator[int]: - for i in range(5): - yield i - -async def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]: - async for item in stream: - yield item * 2 - await asyncio.sleep(1) # Simulate processing delay - -async def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]: - async for item in stream: - yield f"Number: {item}" - - -async def main(): - pipeline = ( - Pipeline[AsyncIterator[int], AsyncIterator[str]]() - .pipe(stage1) - .pipe(stage2) - ) - - stream = await pipeline.process(input_stream()) - - async for result in stream: - print(result) - -# Run the async main function -await main() -``` - -This allows you to process data in a streaming fashion, where each stage can yield results that are immediately consumed by the next stage. - -## Pipeline Factory - -Because pipelines themselves are immutable, pipeline factory is introduced to facilitate distributed composition of a pipeline. - -The `PipelineFactory[InputType, OutputType]` collects stages and allows you to create a pipeline at any given time. - -```python -pipeline_factory = PipelineFactory().with_stages([LogicalStage(), AddOneStage()]) - -# Additional stages can be added later -pipeline_factory.add_stage(LastStage()).with_processor(MyCustomProcessor()) - -# Build the pipeline -pipeline = pipeline_factory.build() -``` - -## Exception Handling - -This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a _stage_ or at the time the pipeline processes a payload. - -```python -pipeline = Pipeline().pipe(lambda payload: payload / 0) - -try: - await pipeline.process(10) -except ZeroDivisionError as e: - # Handle the exception. 
- pass -``` +--8<-- "README.md" diff --git a/jupyter/01-playground.ipynb b/jupyter/01-playground.ipynb index a583e7e..5f8a9c4 100644 --- a/jupyter/01-playground.ipynb +++ b/jupyter/01-playground.ipynb @@ -40,7 +40,6 @@ " Pipeline,\n", ")\n", "\n", - "\n", "pipeline = (\n", " (Pipeline[int, str]())\n", " .pipe(lambda x: x + 1)\n", diff --git a/mkdocs.yml b/mkdocs.yml index e35292f..cb1dc89 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -2,14 +2,17 @@ site_name: "TheCodeCrate's Pipeline" site_url: "https://thecodecrate.github.io/python-pipeline" repo_url: "https://github.com/thecodecrate/python-pipeline" +watch: + - src + extra_css: - assets/css/extra.css - assets/css/extra-pymdownx.css theme: name: material - logo: assets/images/logo.svg - favicon: assets/images/favicon.png + # logo: assets/images/logo.svg + # favicon: assets/images/favicon.png font: text: Roboto code: Roboto Mono @@ -103,19 +106,21 @@ plugins: default_handler: python handlers: python: - paths: [src] + paths: ["src"] options: + show_if_no_docstring: true docstring_options: ignore_init_summary: true + trim_doctest_flags: true docstring_section_style: list # docstring_section_style: spacy - filters: ["!^_"] + filters: + - "!^_" heading_level: 1 - inherited_members: true merge_init_into_class: true parameter_headings: true allow_inspection: true - # preload_modules: [mkdocstrings] + preload_modules: ["_api", "thecodecrate_pipeline",] relative_crossrefs: true scoped_crossrefs: true separate_signature: true @@ -129,7 +134,7 @@ plugins: show_symbol_type_heading: true show_symbol_type_toc: true signature_crossrefs: true - summary: true + summary: false unwrap_annotated: true extra: @@ -138,13 +143,16 @@ extra: link: https://github.com/thecodecrate/python-pipeline - icon: fontawesome/brands/python link: https://pypi.org/project/thecodecrate-pipeline/ + validation: omitted_files: warn absolute_links: warn unrecognized_links: warn anchors: warn + nav: - Home: index.md - API Reference: - core: api/core.md + - 
types: api/types.md - processors: api/processors.md diff --git a/pyproject.toml b/pyproject.toml index fae722c..5837fa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "thecodecrate-pipeline" -version = "1.24.1" +version = "1.25.0" description = "This package provides a pipeline pattern implementation" readme = "README.md" authors = [{ name = "TheCodeCrate", email = "loureiro.rg@gmail.com" }] @@ -48,7 +48,7 @@ build-backend = "hatchling.build" line-length = 79 [tool.bumpver] -current_version = "1.24.1" +current_version = "1.25.0" version_pattern = "MAJOR.MINOR.PATCH" commit_message = "🚀 feat: bump version {old_version} -> {new_version}" tag_message = "{new_version}" diff --git a/pyrightconfig.json b/pyrightconfig.json index b002043..d202cee 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -1,5 +1,5 @@ { "typeCheckingMode": "strict", "include": ["**/*.py"], - "exclude": ["**/node_modules", "**/__pycache__", ".uv_cache", ".venv", ".git", ".vscode"] + "exclude": ["**/node_modules", "**/__pycache__", ".uv_cache", ".venv/**", ".git", ".vscode"] } diff --git a/src/_api/core/final/pipeline.py b/src/_api/core/final/pipeline.py index 5748bee..b51adc7 100644 --- a/src/_api/core/final/pipeline.py +++ b/src/_api/core/final/pipeline.py @@ -37,4 +37,6 @@ class Pipeline( ImplementsInterface[T_in, T_out], Generic[T_in, T_out], ): + """Pipeline Class""" + pass diff --git a/src/_api/core/final/processor.py b/src/_api/core/final/processor.py index ad10e0e..1222781 100644 --- a/src/_api/core/final/processor.py +++ b/src/_api/core/final/processor.py @@ -15,4 +15,6 @@ class Processor( ImplementsInterface[T_in, T_out], ABC, ): + """Processor Class""" + pass diff --git a/src/_api/core/plugins/_02_with_pipeline_as_list/external/pipeline_interface.py b/src/_api/core/plugins/_01_with_base/bases/__init__.py similarity index 100% rename from src/_api/core/plugins/_02_with_pipeline_as_list/external/pipeline_interface.py rename to 
src/_api/core/plugins/_01_with_base/bases/__init__.py diff --git a/src/_api/core/plugins/_01_with_base/bases/stage_callable.py b/src/_api/core/plugins/_01_with_base/bases/stage_callable.py index c4f74fc..1951e87 100644 --- a/src/_api/core/plugins/_01_with_base/bases/stage_callable.py +++ b/src/_api/core/plugins/_01_with_base/bases/stage_callable.py @@ -1,12 +1,13 @@ from typing import Any, Awaitable, Protocol -# uses: local base from .types import T_in, T_out class StageCallable( Protocol[T_in, T_out], ): + """Callable object used in the pipeline""" + def __call__( self, payload: T_in, @@ -17,9 +18,13 @@ def __call__( StageInstance = StageCallable +"""Stage object""" StageInstanceCollection = tuple[StageInstance, ...] +"""Collection of Stage objects""" StageClassOrInstance = StageInstance | type[StageInstance] +"""Stage class or object""" StageCollection = tuple[StageClassOrInstance, ...] +"""Collection of Stage classes or objects""" diff --git a/src/_api/core/plugins/_01_with_base/bases/types.py b/src/_api/core/plugins/_01_with_base/bases/types.py index 52e0733..669d959 100644 --- a/src/_api/core/plugins/_01_with_base/bases/types.py +++ b/src/_api/core/plugins/_01_with_base/bases/types.py @@ -1,5 +1,7 @@ from typing import Any, TypeVar T_in = TypeVar("T_in", default=Any, infer_variance=True) +"""Input data type for pipeline flow (payload's value type)""" T_out = TypeVar("T_out", default=T_in, infer_variance=True) +"""Output data type for pipeline flow (result's value type)""" diff --git a/src/_api/core/plugins/_02_with_pipeline_as_list/bridges/__init__.py b/src/_api/core/plugins/_02_with_pipeline_as_list/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_02_with_pipeline_as_list/mixins/__init__.py b/src/_api/core/plugins/_02_with_pipeline_as_list/mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_03_with_stage_as_callable/bridges/__init__.py 
b/src/_api/core/plugins/_03_with_stage_as_callable/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_03_with_stage_as_callable/mixins/__init__.py b/src/_api/core/plugins/_03_with_stage_as_callable/mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_04_with_pipeline_declared_stages/bridges/__init__.py b/src/_api/core/plugins/_04_with_pipeline_declared_stages/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_04_with_pipeline_declared_stages/mixins/__init__.py b/src/_api/core/plugins/_04_with_pipeline_declared_stages/mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_05_with_pipeline_factory/bases/__init__.py b/src/_api/core/plugins/_05_with_pipeline_factory/bases/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_05_with_pipeline_factory/bridges/__init__.py b/src/_api/core/plugins/_05_with_pipeline_factory/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_06_with_pipeline_processor/bases/__init__.py b/src/_api/core/plugins/_06_with_pipeline_processor/bases/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor.py b/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor.py index 7a3ab5c..132d796 100644 --- a/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor.py +++ b/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor.py @@ -18,6 +18,10 @@ class Processor( Clonable, ImplementsInterface[T_in, T_out], ): + def __init__(self, *args: Any, **kwds: Any) -> None: + """Constructor.""" + pass + @abstractmethod async def process( self, @@ -28,6 +32,15 @@ async def process( ) -> T_out: """ Process the given payload through the provided stages. 
+ + Args: + payload (T_in): The input payload to process. + stages (StageInstanceCollection): The collection of stages to process the payload through. + *args (Any): Additional positional arguments. + **kwds (Any): Additional keyword arguments. + + Returns: + T_out: The processed output. """ pass diff --git a/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor_interface.py b/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor_interface.py index 2d434ef..188a6f4 100644 --- a/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor_interface.py +++ b/src/_api/core/plugins/_06_with_pipeline_processor/bases/processor_interface.py @@ -15,6 +15,8 @@ class ProcessorInterface( ClonableInterface, Protocol[T_in, T_out], ): + def __init__(self, *args: Any, **kwds: Any) -> None: ... + @abstractmethod async def process( self, diff --git a/src/_api/core/plugins/_06_with_pipeline_processor/bridges/__init__.py b/src/_api/core/plugins/_06_with_pipeline_processor/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_06_with_pipeline_processor/mixins/__init__.py b/src/_api/core/plugins/_06_with_pipeline_processor/mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_08_with_pipeline_as_stage/bridges/__init__.py b/src/_api/core/plugins/_08_with_pipeline_as_stage/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_08_with_pipeline_as_stage/mixins/__init__.py b/src/_api/core/plugins/_08_with_pipeline_as_stage/mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/__init__.py b/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/processors/__init__.py 
b/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/processors/chained_processor.py b/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/processors/chained_processor.py index dd1bdd5..e0e1101 100644 --- a/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/processors/chained_processor.py +++ b/src/_api/core/plugins/_99_with_pipeline_default_processor/bases/processors/chained_processor.py @@ -24,6 +24,18 @@ async def process( *args: Any, **kwds: Any, ) -> T_out: + """ + Process the given payload through the provided stages. + + Args: + payload (T_in): The input payload to process. + stages (StageInstanceCollection): The collection of stages to process the payload through. + *args (Any): Additional positional arguments. + **kwds (Any): Additional keyword arguments. + + Returns: + T_out: The processed output. + """ for stage in stages: payload = await self._call(callable=stage, payload=payload, *args, **kwds) diff --git a/src/_api/core/plugins/_99_with_pipeline_default_processor/bridges/__init__.py b/src/_api/core/plugins/_99_with_pipeline_default_processor/bridges/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/_99_with_pipeline_default_processor/mixins/__init__.py b/src/_api/core/plugins/_99_with_pipeline_default_processor/mixins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_api/core/plugins/__init__.py b/src/_api/core/plugins/__init__.py index e69de29..da233fd 100644 --- a/src/_api/core/plugins/__init__.py +++ b/src/_api/core/plugins/__init__.py @@ -0,0 +1,24 @@ +from ._01_with_base import __all__ as _01_with_base +from ._02_with_pipeline_as_list import __all__ as _02_with_pipeline_as_list +from ._03_with_stage_as_callable import __all__ as _03_with_stage_as_callable +from ._04_with_pipeline_declared_stages import ( 
+ __all__ as _04_with_pipeline_declared_stages, +) +from ._05_with_pipeline_factory import __all__ as _05_with_pipeline_factory +from ._06_with_pipeline_processor import __all__ as _06_with_pipeline_processor +from ._08_with_pipeline_as_stage import __all__ as _08_with_pipeline_as_stage +from ._99_with_pipeline_default_processor import ( + __all__ as _99_with_pipeline_default_processor, +) + +# pyright: reportUnsupportedDunderAll=false +__all__ = ( + *_01_with_base, + *_02_with_pipeline_as_list, + *_03_with_stage_as_callable, + *_04_with_pipeline_declared_stages, + *_05_with_pipeline_factory, + *_06_with_pipeline_processor, + *_08_with_pipeline_as_stage, + *_99_with_pipeline_default_processor, +) diff --git a/src/_api/processors/chained_processor/chained_pipeline.py b/src/_api/processors/chained_processor/chained_pipeline.py index 613999e..c1463b1 100644 --- a/src/_api/processors/chained_processor/chained_pipeline.py +++ b/src/_api/processors/chained_processor/chained_pipeline.py @@ -1,6 +1,29 @@ +from typing import Any + from ...core import Pipeline, T_in, T_out from .chained_processor import ChainedProcessor class ChainedPipeline(Pipeline[T_in, T_out]): - processor_class = ChainedProcessor + """Default pipeline (`Pipeline` alias). Sequentially processes data through multiple stages.""" + + def __init__(self, *args: Any, **kwargs: Any): + """Constructor. 
+ + Example: + ```python + # Process data through multiple stages + pipeline = ( + (ChainedPipeline[int]()) + .pipe(lambda payload: payload + 1) + .pipe(lambda payload: payload * 2) + .pipe(lambda payload: payload + 1) + ) + + # Assert result + assert await pipeline.process(1) == 5 + ``` + """ + processor = ChainedProcessor[T_in, T_out]() + + super().__init__(processor=processor, *args, **kwargs) diff --git a/src/_api/processors/chained_processor/chained_processor.py b/src/_api/processors/chained_processor/chained_processor.py index 2f7c892..fb60b62 100644 --- a/src/_api/processors/chained_processor/chained_processor.py +++ b/src/_api/processors/chained_processor/chained_processor.py @@ -1,3 +1,25 @@ -from ...core import ChainedProcessor +from ...core import ChainedProcessor as ChainedProcessorBase +from ...core import T_in, T_out -__all__ = ("ChainedProcessor",) + +class ChainedProcessor(ChainedProcessorBase[T_in, T_out]): + """Default processor. Sequentially processes data through multiple stages. + + Example: + ```python + # Create processor + processor = ChainedProcessor[int]() + + # Stages + result = await processor.process( + payload=5, + stages=( + lambda payload: payload + 1, + lambda payload: payload * 2, + ), + ) + + # Assert result + assert result == 12 + ``` + """ diff --git a/src/_api/processors/interruptible_processor/interruptible_pipeline.py b/src/_api/processors/interruptible_processor/interruptible_pipeline.py index a6af134..2cd1f4a 100644 --- a/src/_api/processors/interruptible_processor/interruptible_pipeline.py +++ b/src/_api/processors/interruptible_processor/interruptible_pipeline.py @@ -5,7 +5,28 @@ class InterruptiblePipeline(Pipeline[T_in, T_out]): + """Pipeline with conditional interruption.""" + def __init__(self, check: CheckCallable, *args: Any, **kwargs: Any): + """Constructor. + + Parameters: + check: Callable used to interrupt processing. 
+ + Example: + ```python + # Interrupts when payload value exceeds 50 + pipeline = ( + InterruptiblePipeline[int](lambda payload: payload > 50) + .pipe(lambda payload: payload + 2) + .pipe(lambda payload: payload * 10) + .pipe(lambda payload: payload * 10) + ) + + # Process payload - will stop if value exceeds 50 + assert await pipeline.process(5) == 70 + ``` + """ processor = InterruptibleProcessor[T_in, T_out](check) super().__init__(processor=processor, *args, **kwargs) diff --git a/src/_api/processors/interruptible_processor/interruptible_processor.py b/src/_api/processors/interruptible_processor/interruptible_processor.py index 59381f2..b1b33f0 100644 --- a/src/_api/processors/interruptible_processor/interruptible_processor.py +++ b/src/_api/processors/interruptible_processor/interruptible_processor.py @@ -7,9 +7,38 @@ class InterruptibleProcessor(Processor[T_in, T_out]): + """Processor with conditional interruption.""" + + check: CheckCallable[T_in] + """Callable for processing interruption. Useful for declarative subclassing. + + Example: + ```python + class MaxValueProcessor(InterruptibleProcessor[int, int]): + # interrupt if value exceeds 100 + check = lambda x: x > 100 + ``` + """ def __init__(self, check: CheckCallable[T_in]) -> None: + """Constructor. + + Parameters: + check: Callable for processing interruption. 
+ + Example: + ```python + # Interrupts when payload value exceeds 100 + def check_value(payload: int) -> bool: + return payload > 100 + + # Create processor with the check + processor = InterruptibleProcessor(check_value) + + # Process payload - will stop if value exceeds 100 + result = await processor.process(initial_payload, stages) + ``` + """ super().__init__() self.check = check @@ -24,7 +53,7 @@ async def process( for stage in stages: payload = await self._call(callable=stage, payload=payload, *args, **kwds) - if not await self._call_check(payload): + if await self._call_check(payload): return cast(T_out, payload) return cast(T_out, payload) diff --git a/src/thecodecrate_pipeline/__init__.py b/src/thecodecrate_pipeline/__init__.py index 0e1fe08..9953301 100644 --- a/src/thecodecrate_pipeline/__init__.py +++ b/src/thecodecrate_pipeline/__init__.py @@ -3,29 +3,28 @@ # This will be updated by `bumpver` command. # - Make sure to commit all changes first before running `bumpver`. # - Run `bumpver update --[minor|major|patch]` -__version__ = "1.24.1" +__version__ = "1.25.0" # Re-exporting symbols -from _api import ChainedPipeline as ChainedPipeline -from _api import ChainedProcessor as ChainedProcessor -from _api import InterruptiblePipeline as InterruptiblePipeline -from _api import InterruptibleProcessor as InterruptibleProcessor -from _api import Pipeline as Pipeline -from _api import PipelineFactory as PipelineFactory -from _api import PipelineFactoryInterface as PipelineFactoryInterface -from _api import PipelineInterface as PipelineInterface -from _api import Processor as Processor -from _api import ProcessorInterface as ProcessorInterface -from _api import Stage as Stage -from _api import StageCallable as StageCallable -from _api import StageClassOrInstance as StageClassOrInstance -from _api import StageCollection as StageCollection -from _api import StageInstance as StageInstance -from _api import StageInstanceCollection as StageInstanceCollection -from 
_api import StageInterface as StageInterface -from _api import T_in as T_in -from _api import T_out as T_out -from _api import __all__ as _api_all +from _api.core import Pipeline as Pipeline +from _api.core import PipelineFactory as PipelineFactory +from _api.core import PipelineFactoryInterface as PipelineFactoryInterface +from _api.core import PipelineInterface as PipelineInterface +from _api.core import Processor as Processor +from _api.core import ProcessorInterface as ProcessorInterface +from _api.core import Stage as Stage +from _api.core import StageCallable as StageCallable +from _api.core import StageInterface as StageInterface # pyright: reportUnsupportedDunderAll=false -__all__ = (*_api_all,) +__all__ = ( + "Pipeline", + "PipelineFactory", + "PipelineFactoryInterface", + "PipelineInterface", + "Processor", + "ProcessorInterface", + "Stage", + "StageCallable", + "StageInterface", +) diff --git a/src/thecodecrate_pipeline/processors/__init__.py b/src/thecodecrate_pipeline/processors/__init__.py new file mode 100644 index 0000000..47ba3f5 --- /dev/null +++ b/src/thecodecrate_pipeline/processors/__init__.py @@ -0,0 +1,11 @@ +"""A collection of processors and their pipelines""" + +# Re-exporting symbols +from _api.processors import ChainedPipeline as ChainedPipeline +from _api.processors import ChainedProcessor as ChainedProcessor +from _api.processors import InterruptiblePipeline as InterruptiblePipeline +from _api.processors import InterruptibleProcessor as InterruptibleProcessor +from _api.processors import __all__ as _processors_all + +# pyright: reportUnsupportedDunderAll=false +__all__ = (*_processors_all,) diff --git a/src/thecodecrate_pipeline/types/__init__.py b/src/thecodecrate_pipeline/types/__init__.py new file mode 100644 index 0000000..876ce6a --- /dev/null +++ b/src/thecodecrate_pipeline/types/__init__.py @@ -0,0 +1,19 @@ +"""Library's public types""" + +# Re-exporting symbols +from _api.core import StageClassOrInstance as StageClassOrInstance 
+from _api.core import StageCollection as StageCollection +from _api.core import StageInstance as StageInstance +from _api.core import StageInstanceCollection as StageInstanceCollection +from _api.core import T_in as T_in +from _api.core import T_out as T_out + +# pyright: reportUnsupportedDunderAll=false +__all__ = ( + "StageClassOrInstance", + "StageCollection", + "StageInstance", + "StageInstanceCollection", + "T_in", + "T_out", +) diff --git a/tests/stubs/stub_processor.py b/tests/stubs/stub_processor.py index 4982476..16b0e77 100644 --- a/tests/stubs/stub_processor.py +++ b/tests/stubs/stub_processor.py @@ -1,7 +1,5 @@ -from thecodecrate_pipeline import ( - Processor, - StageInstanceCollection, -) +from thecodecrate_pipeline import Processor +from thecodecrate_pipeline.types import StageInstanceCollection class StubProcessor(Processor[int]): diff --git a/tests/stubs/stub_stage_with_custom_args.py b/tests/stubs/stub_stage_with_custom_args.py index 08d446f..8e5d80c 100644 --- a/tests/stubs/stub_stage_with_custom_args.py +++ b/tests/stubs/stub_stage_with_custom_args.py @@ -1,14 +1,8 @@ from abc import abstractmethod from typing import Awaitable, Callable, Concatenate, cast -from thecodecrate_pipeline import ( - Pipeline, - Processor, - Stage, - StageInstanceCollection, - T_in, - T_out, -) +from thecodecrate_pipeline import Pipeline, Processor, Stage +from thecodecrate_pipeline.types import StageInstanceCollection, T_in, T_out class IndexedStage(Stage[T_in, T_out]): diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 2a50926..f1f5550 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,11 +1,11 @@ import pytest -from thecodecrate_pipeline import ( - Pipeline, - StageCollection, - StageInstanceCollection, -) -from tests.stubs.stub_stage_with_custom_args import ( +from thecodecrate_pipeline import Pipeline +from thecodecrate_pipeline.types import StageCollection, StageInstanceCollection + +from .stubs.stub_processor import 
StubProcessor +from .stubs.stub_stage import StubStage +from .stubs.stub_stage_with_custom_args import ( IndexedPipeline, IndexedProcessor, IndexedStage, @@ -15,8 +15,6 @@ TimesThreeStage, TimesTwoStage, ) -from .stubs.stub_stage import StubStage -from .stubs.stub_processor import StubProcessor @pytest.mark.asyncio diff --git a/tests/test_pipeline_factory.py b/tests/test_pipeline_factory.py index ac2e631..967d796 100644 --- a/tests/test_pipeline_factory.py +++ b/tests/test_pipeline_factory.py @@ -1,11 +1,8 @@ import pytest -from thecodecrate_pipeline import ( - ChainedProcessor, - PipelineFactory, - Stage, - StageCollection, -) +from thecodecrate_pipeline import PipelineFactory, Stage +from thecodecrate_pipeline.processors import ChainedProcessor +from thecodecrate_pipeline.types import StageCollection from .stubs.stub_stages_int import ( AddOneStage, diff --git a/tests/test_processors.py b/tests/test_processors.py index 351f65b..c2f73a9 100644 --- a/tests/test_processors.py +++ b/tests/test_processors.py @@ -1,13 +1,14 @@ import pytest -from thecodecrate_pipeline import ( - Pipeline, + +from thecodecrate_pipeline import Pipeline +from thecodecrate_pipeline.processors import ( ChainedPipeline, ChainedProcessor, InterruptiblePipeline, InterruptibleProcessor, ) -from tests.stubs.stub_processor import StubProcessor +from .stubs.stub_processor import StubProcessor @pytest.mark.asyncio @@ -77,7 +78,7 @@ async def test_pipeline_with_chained_processor(): @pytest.mark.asyncio async def test_interruptible_pipeline(): pipeline = ( - InterruptiblePipeline[int](lambda payload: payload < 10) + InterruptiblePipeline[int](lambda payload: payload >= 10) .pipe(lambda payload: payload + 2) .pipe(lambda payload: payload * 10) .pipe(lambda payload: payload * 10) @@ -90,7 +91,7 @@ async def test_interruptible_pipeline(): @pytest.mark.asyncio async def test_interruptible_processor(): - processor = InterruptibleProcessor[int](lambda payload: payload < 10) + processor = 
InterruptibleProcessor[int](lambda payload: payload >= 10) result = await processor.process( payload=5, @@ -111,7 +112,7 @@ async def test_interruptible_processor(): @pytest.mark.asyncio async def test_pipeline_with_interruptible_processor(): - processor = InterruptibleProcessor[int](lambda payload: payload < 20) + processor = InterruptibleProcessor[int](lambda payload: payload >= 20) pipeline = ( (Pipeline[int](processor=processor))