From 2ab6dee7268f127793653b64bdcfda338ba60a65 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 13 Oct 2025 19:24:07 +0000 Subject: [PATCH 1/3] fix: add missing __init__.py files to directories --- src/_lib/{py.typed => concerns/__init__.py} | 0 src/_lib/contracts/__init__.py | 0 src/_lib/pipeline.py | 5 ----- src/_lib/support/__init__.py | 0 src/_lib/types/__init__.py | 0 5 files changed, 5 deletions(-) rename src/_lib/{py.typed => concerns/__init__.py} (100%) create mode 100644 src/_lib/contracts/__init__.py create mode 100644 src/_lib/support/__init__.py create mode 100644 src/_lib/types/__init__.py diff --git a/src/_lib/py.typed b/src/_lib/concerns/__init__.py similarity index 100% rename from src/_lib/py.typed rename to src/_lib/concerns/__init__.py diff --git a/src/_lib/contracts/__init__.py b/src/_lib/contracts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_lib/pipeline.py b/src/_lib/pipeline.py index 41345a5..47a7f95 100644 --- a/src/_lib/pipeline.py +++ b/src/_lib/pipeline.py @@ -1,5 +1,3 @@ -from typing import TypeVar - from .concerns.base_pipeline import BasePipeline from .concerns.processable_pipeline import ProcessablePipeline from .concerns.stageable_pipeline import StageablePipeline @@ -14,6 +12,3 @@ class Pipeline( ImplementsInterface[T_in, T_out], ): pass - - -TPipeline = TypeVar("TPipeline", bound=Pipeline, infer_variance=True) diff --git a/src/_lib/support/__init__.py b/src/_lib/support/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_lib/types/__init__.py b/src/_lib/types/__init__.py new file mode 100644 index 0000000..e69de29 From efe14d5bfedb93c8b747c7e4b8c684b7273a8a13 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 13 Oct 2025 19:24:42 +0000 Subject: [PATCH 2/3] docs: fix renamed classes import on jupyter notebooks --- jupyter/01-playground.ipynb | 6 +++--- jupyter/02-streams.ipynb | 4 ++-- jupyter/03-pipeline-factory.ipynb | 22 +++++++++++----------- 3 files changed, 16 insertions(+), 16 
deletions(-) diff --git a/jupyter/01-playground.ipynb b/jupyter/01-playground.ipynb index 8b99906..6741b85 100644 --- a/jupyter/01-playground.ipynb +++ b/jupyter/01-playground.ipynb @@ -41,7 +41,7 @@ ")\n", "\n", "pipeline = (\n", - " (Pipeline[int, str]())\n", + " Pipeline[int, str]()\n", " .pipe(lambda x: x + 1)\n", " .pipe(lambda x: x + 1)\n", " .pipe(lambda x: f\"result is {x}\")\n", @@ -260,7 +260,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -269,7 +269,7 @@ "12" ] }, - "execution_count": 8, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } diff --git a/jupyter/02-streams.ipynb b/jupyter/02-streams.ipynb index 22fc386..233128e 100644 --- a/jupyter/02-streams.ipynb +++ b/jupyter/02-streams.ipynb @@ -56,7 +56,7 @@ ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "thecodecrate-pipeline", "language": "python", "name": "python3" }, @@ -70,7 +70,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.2" + "version": "3.13.8" } }, "nbformat": 4, diff --git a/jupyter/03-pipeline-factory.ipynb b/jupyter/03-pipeline-factory.ipynb index 1e670ab..b85a763 100644 --- a/jupyter/03-pipeline-factory.ipynb +++ b/jupyter/03-pipeline-factory.ipynb @@ -14,19 +14,17 @@ } ], "source": [ - "from thecodecrate_pipeline import (\n", - " PipelineFactory,\n", - " StageCollection,\n", - ")\n", + "from thecodecrate_pipeline import PipelineFactory\n", + "from thecodecrate_pipeline.types import CallableCollection\n", "\n", "# define a factory with a few stages\n", - "some_stages: StageCollection = (\n", + "some_stages: CallableCollection = (\n", " (lambda x: x + 1),\n", " (lambda x: x + 1),\n", " (lambda x: f\"result is {x}\"),\n", ")\n", "\n", - "pipeline_factory = (PipelineFactory[int, str]()).with_stages(some_stages)\n", + "pipeline_factory = PipelineFactory[int, str]().with_stages(some_stages)\n", "\n", "# create and 
process\n", "pipeline = pipeline_factory.make()\n", @@ -101,12 +99,14 @@ "output_type": "stream", "text": [ "result is 12\n", - "<__main__.MyProcessor object at 0x799918ffe270>\n" + "<__main__.MyProcessor object at 0x775738eba7b0>\n" ] } ], "source": [ - "from thecodecrate_pipeline import ChainedProcessor, StageCollection\n", + "from thecodecrate_pipeline import PipelineFactory\n", + "from thecodecrate_pipeline.processors import ChainedProcessor\n", + "from thecodecrate_pipeline.types import CallableCollection\n", "\n", "\n", "# create factory with a processor\n", @@ -114,7 +114,7 @@ " pass\n", "\n", "\n", - "some_stages: StageCollection = (\n", + "some_stages: CallableCollection = (\n", " (lambda x: x + 1),\n", " (lambda x: x + 1),\n", " (lambda x: f\"result is {x}\"),\n", @@ -138,7 +138,7 @@ ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "thecodecrate-pipeline", "language": "python", "name": "python3" }, @@ -152,7 +152,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.2" + "version": "3.13.8" } }, "nbformat": 4, From 7f76d9c2e3f9d10f023046b89ecfe5c5d5360b29 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 13 Oct 2025 19:33:42 +0000 Subject: [PATCH 3/3] docs: update README and mkdocs --- CLAUDE.md | 194 ++++++++++++++------------------ README.md | 51 ++++++--- docs/api/core.md | 2 + mkdocs.yml | 75 ++++++------ src/_lib/pipeline_factory.py | 6 +- src/_lib/types/callable_type.py | 18 ++- 6 files changed, 184 insertions(+), 162 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 124233c..cba4a2f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,153 +4,133 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -This is a Python implementation of the Pipeline pattern, distributed as `thecodecrate-pipeline`. The package allows composing sequential stages into reusable, immutable pipelines for processing data. 
It's inspired by the PHP League Pipeline package. +This is `thecodecrate-pipeline`, a Python package that provides a pipeline pattern implementation for data processing. Pipelines allow chaining multiple stages together, where each stage transforms a payload before passing it to the next stage. -## Development Commands +**Key Concepts:** -### Environment Setup -```bash -# The project uses uv for dependency management -# Dependencies are managed in pyproject.toml under [dependency-groups] -uv sync -``` +- **Pipeline**: Orchestrates the execution of multiple stages using a processor +- **Stage**: A callable unit that transforms a payload (input → output) +- **Processor**: Defines how stages are executed (e.g., chained, interruptible) +- **PipelineFactory**: Creates pipeline instances with predefined stages and processors -### Testing -```bash -# Run all tests -pytest +## Architecture -# Run tests with coverage -pytest --cov +### Code Organization -# Run a specific test file -pytest tests/test_pipeline.py +The codebase follows a concern-based architecture with clear separation between contracts, concerns, and implementations: -# Run a specific test function -pytest tests/test_pipeline.py::test_function_name +``` +src/ +├── thecodecrate_pipeline/ # Public API +│ └── __init__.py # Re-exports main classes +└── _lib/ # Internal implementation + ├── contracts/ # Protocol definitions (interfaces) + ├── concerns/ # Mixins for shared behavior + ├── processors/ # Processor implementations + ├── support/ # Utility patterns (Clonable, ActAsFactory) + └── types/ # Type definitions ``` -### Code Quality -```bash -# Format code with ruff -ruff format . +### Key Design Patterns -# Lint with ruff -ruff check . +**Concerns Pattern**: The Pipeline class inherits from multiple concern classes (BasePipeline, ProcessablePipeline, StageablePipeline), each providing specific functionality. 
This follows a composition-over-inheritance approach where each concern is responsible for a single aspect: -# Fix auto-fixable linting issues -ruff check --fix . +- `BasePipeline`: Core cloning and factory behavior +- `ProcessablePipeline`: Handles processor management and execution (src/_lib/concerns/processable_pipeline.py) +- `StageablePipeline`: Manages stage collection and instantiation (src/_lib/concerns/stageable_pipeline.py) -# Format with black (line length: 79) -black . +**Contracts (Protocols)**: All contracts are defined using Python's `Protocol` type for structural subtyping. Implementations explicitly declare they implement these protocols via inheritance. -# Type checking is configured with strict mode in pyrightconfig.json -# Type check manually: pyright (if installed) -``` +**Clonable Pattern**: Most classes inherit from `Clonable` (src/_lib/support/clonable/clonable.py), which provides immutable-style operations using deep copying. Methods like `pipe()`, `with_stages()`, and `with_processor()` return cloned instances rather than mutating the original. -### Documentation -```bash -# Build documentation locally -mkdocs serve +**ActAsFactory Pattern**: The `PipelineFactory` uses this pattern (src/_lib/support/act_as_factory/) to create pipeline instances with predefined configuration. -# Documentation is built with mkdocs-material and auto-generates API docs -# from docstrings using mkdocstrings-python -``` +### Type System -### Version Management -```bash -# Bump version (uses bumpver) -bumpver update --patch # 1.26.0 -> 1.26.1 -bumpver update --minor # 1.26.0 -> 1.27.0 -bumpver update --major # 1.26.0 -> 2.0.0 +The codebase is fully typed using generic types `T_in` and `T_out` for input/output payloads. All classes are generic over these types to ensure type safety through the pipeline chain. 
-# Note: bumpver automatically commits and tags, but does NOT push -``` +## Development Commands -## Architecture +### Environment Setup -### Core Concepts +```bash +# Install uv package manager if not available +uv python install 3.13 +uv sync --all-extras --dev +``` -The codebase implements a pipeline pattern with three main abstractions: +### Testing -1. **Stage**: A callable unit that transforms input to output (`StageInterface[T_in, T_out]`) -2. **Pipeline**: An immutable chain of stages (`PipelineInterface[T_in, T_out]`) -3. **Processor**: Controls how stages are executed (`ProcessorInterface[T_in, T_out]`) +```bash +# Run all tests with coverage +uv run pytest tests --cov -### Directory Structure +# Run a specific test file +uv run pytest tests/test_pipeline.py -``` -src/ -├── _lib/ # Internal implementation -│ ├── pipeline/ # Core pipeline implementation -│ │ ├── pipeline.py # Main Pipeline class -│ │ ├── pipeline_factory.py # Factory for building pipelines -│ │ ├── processor.py # Base Processor class -│ │ ├── stage.py # Base Stage class -│ │ ├── processors/ # Built-in processors -│ │ │ ├── chained_processor.py -│ │ │ └── ... -│ │ └── traits/ # Mixins (Clonable, ActAsFactory) -│ └── processors/ # Additional processor implementations -│ ├── chained_processor/ # Processor that chains stages -│ └── interruptible_processor/ # Processor with interruption support -└── thecodecrate_pipeline/ # Public API package - ├── __init__.py # Re-exports from _lib - ├── processors/ # Public processor exports - └── types/ # Public type exports +# Run a specific test +uv run pytest tests/test_pipeline.py::test_lambda_stages -v ``` -### Key Design Patterns +### Linting & Formatting + +```bash +# Check linting +uvx ruff check . -**Immutability**: Pipelines use copy-on-write semantics. The `pipe()` method creates a new pipeline instance with the added stage, preserving the original pipeline. +# Fix linting issues automatically +uvx ruff check --fix . 
-**Traits System**: The codebase uses a trait-like pattern with mixins: -- `Clonable`: Provides shallow cloning capability -- `ActAsFactory`: Enables objects to act as factories for creating instances +# Format code +uvx ruff format . +``` -**Interface Segregation**: Each core concept has an interface (`*Interface`) and implementation, enabling custom implementations while maintaining type safety. +### Version Bumping -**Async-First**: All processing is async (`async def process(...)`). The processor handles both sync and async callables transparently using `inspect.isawaitable()`. +```bash +# Bump version (patch/minor/major) +uv run bumpver update --patch +uv run bumpver update --minor +uv run bumpver update --major +``` -### Type System +Version is managed by bumpver and automatically updates: -The codebase uses generic type variables for type safety: -- `T_in`: Input type to a stage or pipeline -- `T_out`: Output type from a stage or pipeline (defaults to `T_in`) +- `pyproject.toml` +- `src/thecodecrate_pipeline/__init__.py` -Stages can transform types: -```python -Pipeline[int, str] # Takes int, returns str -StageInterface[int, int] # Takes int, returns int -``` +### Documentation -### Processing Flow +```bash +# Build documentation locally +mkdocs serve + +# Build static site +mkdocs build +``` -1. A Pipeline is created with stages (either via `.pipe()` or declaratively) -2. When `.process(payload)` is called, the pipeline: - - Instantiates stages if needed (converts classes to instances) - - Delegates to the processor's `.process()` method - - The processor iterates through stages, passing output to next stage -3. Processors can customize execution (e.g., ChainedProcessor for error handling) +## Important Implementation Details -### Stream Processing +### Async Processing -Pipelines support processing `AsyncIterator` streams, allowing real-time data transformation where each stage can yield results consumed immediately by the next stage. 
+All pipeline processing is async. The `Pipeline.process()` method and all stage `__call__` methods are async. The base `Processor._call()` method (src/_lib/processor.py:37-52) handles both sync and async callables automatically using `inspect.isawaitable()`. -### PipelineFactory +### Stage Instantiation -Because pipelines are immutable, `PipelineFactory` provides mutable stage collection during composition. It builds the final immutable pipeline via `.build()`. +Stages can be provided as either classes or instances. The `StageablePipeline` concern automatically instantiates stage classes when needed (src/_lib/concerns/stageable_pipeline.py:67-72). -## Testing Notes +### Processor Types -- Test files use stub classes in `tests/stubs/` for consistent test fixtures -- Tests are async-aware (configured via `pytest.ini` with `pytest-asyncio`) -- Mock stages implement `StageInterface` for type safety +- `ChainedProcessor`: Default processor that executes stages sequentially (src/_lib/processors/chained_processor/) +- `InterruptibleProcessor`: Allows stages to interrupt the pipeline flow (src/_lib/processors/interruptible_processor/) -## Python Version +### Callable Invocation -Requires Python 3.13+ (specified in pyproject.toml). +The `Pipeline` class is callable and delegates to `process()` (src/_lib/concerns/processable_pipeline.py:52-62). The first parameter is positional-only to match the callable signature. -## Package Distribution +## Testing Guidelines -Built with `hatchling`. The wheel includes both `thecodecrate_pipeline` (public API) and `_api` packages (internal). The public package re-exports symbols from `_lib`. 
+- All async tests must be marked with `@pytest.mark.asyncio` +- Test stubs are located in `tests/stubs/` +- Tests cover: lambda stages, class-based stages, pipeline-as-stage, custom processors, and factory patterns +- The pytest configuration sets `asyncio_default_fixture_loop_scope = function` diff --git a/README.md b/README.md index a9b711b..2489016 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,10 @@ Pipelines are implemented as immutable stage chains. When you pipe a new stage, Operations in a pipeline, stages, can be anything that satisfies the `Callable` type hint. So functions and anything that's callable is acceptable. ```python -pipeline = Pipeline().pipe(lambda payload: payload * 10) +pipeline = ( + Pipeline() + .pipe(lambda payload: payload * 10) +) # Returns 100 await pipeline.process(10) @@ -110,7 +113,10 @@ The `T_out` type variable is optional and defaults to `T_in`. Similarly, `T_in` ```python from typing import Any -pipeline = Pipeline[int]().pipe(lambda payload: payload * 2) +pipeline = ( + Pipeline[int]() + .pipe(lambda payload: payload * 2) +) # Returns 20 await pipeline.process(10) @@ -119,7 +125,10 @@ await pipeline.process(10) You can also handle varying types between stages: ```python -pipeline = Pipeline[int, str]().pipe(lambda payload: f"Number: {payload}") +pipeline = ( + Pipeline[int, str]() + .pipe(lambda payload: f"Number: {payload}") +) # Returns "Number: 10" await pipeline.process(10) @@ -149,7 +158,12 @@ class MyCustomProcessor(Processor[T_in, T_out]): And use it in your pipeline: ```python -pipeline = Pipeline[int, int](processor=MyCustomProcessor()).pipe(lambda x: x * 2) +pipeline = ( + Pipeline[int, int]( + processor=MyCustomProcessor(), + ) + .pipe(lambda x: x * 2) +) ``` ## Declarative Stages @@ -158,10 +172,10 @@ Instead of using `pipe` to add stages at runtime, you can define stages declarat ```python class MyPipeline(Pipeline[int, int]): - stages = [ - TimesTwoStage(), - TimesThreeStage(), - ] + stages = ( + 
TimesTwoStage, # class + TimesThreeStage(), # object + ) # Process the payload through the pipeline with the declared stages result = await MyPipeline().process(5) @@ -174,11 +188,11 @@ In this example, `MyPipeline` declares its stages directly in the class definiti ## Declarative Processor -You can also specify the processor in a declarative way by setting the `processor_class` attribute in your pipeline class. +You can also specify the processor in a declarative way by setting the `processor` attribute in your pipeline class. ```python class MyPipeline(Pipeline[T_in, T_out]): - processor_class = MyCustomProcessor + processor = MyCustomProcessor ``` This allows you to customize the processing behavior of your pipeline while keeping the definition clean and declarative. @@ -227,13 +241,19 @@ This allows you to process data in a streaming fashion, where each stage can yie Because pipelines themselves are immutable, pipeline factory is introduced to facilitate distributed composition of a pipeline. -The `PipelineFactory[InputType, OutputType]` collects stages and allows you to create a pipeline at any given time. +The `PipelineFactory[T_in, T_out]` collects stages and allows you to create a pipeline at any given time. ```python -pipeline_factory = PipelineFactory().with_stages([LogicalStage(), AddOneStage()]) +pipeline_factory = ( + PipelineFactory() + .with_stages([ + LogicalStage, # class + AddOneStage(), # object + ]) +) # Additional stages can be added later -pipeline_factory.add_stage(LastStage()).with_processor(MyCustomProcessor()) +pipeline_factory.add_stage(LastStage()) # Build the pipeline pipeline = pipeline_factory.build() @@ -244,7 +264,10 @@ pipeline = pipeline_factory.build() This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a _stage_ or at the time the pipeline processes a payload. 
```python -pipeline = Pipeline().pipe(lambda payload: payload / 0) +pipeline = ( + Pipeline() + .pipe(lambda payload: payload / 0) +) try: await pipeline.process(10) diff --git a/docs/api/core.md b/docs/api/core.md index 903a805..09849b3 100644 --- a/docs/api/core.md +++ b/docs/api/core.md @@ -3,3 +3,5 @@ ::: thecodecrate_pipeline options: inherited_members: true + filters: + - "!Interface$" diff --git a/mkdocs.yml b/mkdocs.yml index cb1dc89..3b120e4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -17,24 +17,24 @@ theme: text: Roboto code: Roboto Mono palette: - - media: "(prefers-color-scheme)" - toggle: - icon: material/theme-light-dark - name: Switch to light mode - - media: "(prefers-color-scheme: light)" - scheme: default - primary: deep purple - accent: deep purple - toggle: - icon: material/weather-sunny - name: Switch to dark mode - - media: "(prefers-color-scheme: dark)" - scheme: dracula - primary: deep purple - accent: deep purple - toggle: - icon: material/weather-night - name: Switch to system preference + - media: "(prefers-color-scheme)" + toggle: + icon: material/theme-light-dark + name: Switch to light mode + - media: "(prefers-color-scheme: light)" + scheme: default + primary: deep purple + accent: deep purple + toggle: + icon: material/weather-sunny + name: Switch to dark mode + - media: "(prefers-color-scheme: dark)" + scheme: dracula + primary: deep purple + accent: deep purple + toggle: + icon: material/weather-night + name: Switch to system preference features: - content.code.annotate @@ -57,24 +57,25 @@ markdown_extensions: anchorlink_class: "toclink" - pymdownx.blocks.admonition: types: - - new - - settings - - note - - abstract - - info - - tip - - success - - question - - warning - - failure - - danger - - bug - - example - - quote + - new + - settings + - note + - abstract + - info + - tip + - success + - question + - warning + - failure + - danger + - bug + - example + - quote - pymdownx.blocks.tab: alternate_style: True 
combine_header_slug: True - slugify: !!python/object/apply:pymdownx.slugs.slugify {kwds: {case: lower}} + slugify: + !!python/object/apply:pymdownx.slugs.slugify { kwds: { case: lower } } - pymdownx.blocks.caption: - pymdownx.details: - pymdownx.snippets: @@ -120,7 +121,7 @@ plugins: merge_init_into_class: true parameter_headings: true allow_inspection: true - preload_modules: ["_api", "thecodecrate_pipeline",] + preload_modules: ["_lib", "thecodecrate_pipeline"] relative_crossrefs: true scoped_crossrefs: true separate_signature: true @@ -153,6 +154,6 @@ validation: nav: - Home: index.md - API Reference: - - core: api/core.md - - types: api/types.md - - processors: api/processors.md + - core: api/core.md + - types: api/types.md + - processors: api/processors.md diff --git a/src/_lib/pipeline_factory.py b/src/_lib/pipeline_factory.py index 127f5f2..7770d3c 100644 --- a/src/_lib/pipeline_factory.py +++ b/src/_lib/pipeline_factory.py @@ -45,7 +45,7 @@ def __init__( def add_stage(self, stage: StageDefinition) -> Self: """ - Adds a single stage to the pipeline. + Add a single stage (class, object or function) to the pipeline. """ self.stages = self.stages + (stage,) @@ -53,7 +53,7 @@ def add_stage(self, stage: StageDefinition) -> Self: def with_stages(self, stages: StageDefinitionCollection) -> Self: """ - Adds a collection of stages to the pipeline. + Set the collection of stages (classes, objects or functions) for the pipeline. """ self.stages = stages @@ -63,7 +63,7 @@ def with_processor( self, processor: type[ProcessorContract] | ProcessorContract ) -> Self: """ - Attachs a processor (class or instance) to the pipeline factory. + Attach a processor (class or instance) to the pipeline factory. 
""" self.processor = processor diff --git a/src/_lib/types/callable_type.py b/src/_lib/types/callable_type.py index cd62f1c..3f0aa25 100644 --- a/src/_lib/types/callable_type.py +++ b/src/_lib/types/callable_type.py @@ -6,7 +6,23 @@ class CallableType( Protocol[T_in, T_out], ): - """Protocol for callable stages that process data in a pipeline.""" + """Protocol for callable stages. + + Format: + (payload: T_in, /, *args: Any, **kwds: Any) -> T_out | Awaitable[T_out] + + Example (function): + async def process_data(payload: str) -> int: + return len(payload) + + Example (class): + class DataProcessor: + def __call__(self, payload: str) -> int: + return len(payload) + + Example (lambda): + process_data = lambda payload: len(payload) + """ def __call__( self,