194 changes: 87 additions & 107 deletions CLAUDE.md
@@ -4,153 +4,133 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

 ## Project Overview

-This is a Python implementation of the Pipeline pattern, distributed as `thecodecrate-pipeline`. The package allows composing sequential stages into reusable, immutable pipelines for processing data. It's inspired by the PHP League Pipeline package.
+This is `thecodecrate-pipeline`, a Python package that provides a pipeline pattern implementation for data processing. Pipelines allow chaining multiple stages together, where each stage transforms a payload before passing it to the next stage.

-## Development Commands
+**Key Concepts:**

-### Environment Setup
-```bash
-# The project uses uv for dependency management
-# Dependencies are managed in pyproject.toml under [dependency-groups]
-uv sync
-```
+- **Pipeline**: Orchestrates the execution of multiple stages using a processor
+- **Stage**: A callable unit that transforms a payload (input → output)
+- **Processor**: Defines how stages are executed (e.g., chained, interruptible)
+- **PipelineFactory**: Creates pipeline instances with predefined stages and processors

-### Testing
-```bash
-# Run all tests
-pytest
-
-# Run tests with coverage
-pytest --cov
-
-# Run a specific test file
-pytest tests/test_pipeline.py
-
-# Run a specific test function
-pytest tests/test_pipeline.py::test_function_name
-```
+## Architecture
+
+### Code Organization
+
+The codebase follows a concern-based architecture with clear separation between contracts, concerns, and implementations:
+
+```
+src/
+├── thecodecrate_pipeline/   # Public API
+│   └── __init__.py          # Re-exports main classes
+└── _lib/                    # Internal implementation
+    ├── contracts/           # Protocol definitions (interfaces)
+    ├── concerns/            # Mixins for shared behavior
+    ├── processors/          # Processor implementations
+    ├── support/             # Utility patterns (Clonable, ActAsFactory)
+    └── types/               # Type definitions
+```

-### Code Quality
-```bash
-# Format code with ruff
-ruff format .
-
-# Lint with ruff
-ruff check .
-
-# Fix auto-fixable linting issues
-ruff check --fix .
-
-# Format with black (line length: 79)
-black .
-
-# Type checking is configured with strict mode in pyrightconfig.json
-# Type check manually: pyright (if installed)
-```
+### Key Design Patterns
+
+**Concerns Pattern**: The Pipeline class inherits from multiple concern classes (BasePipeline, ProcessablePipeline, StageablePipeline), each providing specific functionality. This follows a composition-over-inheritance approach where each concern is responsible for a single aspect:
+
+- `BasePipeline`: Core cloning and factory behavior
+- `ProcessablePipeline`: Handles processor management and execution (src/_lib/concerns/processable_pipeline.py)
+- `StageablePipeline`: Manages stage collection and instantiation (src/_lib/concerns/stageable_pipeline.py)
+
+**Contracts (Protocols)**: All contracts are defined using Python's `Protocol` type for structural subtyping. Implementations explicitly declare they implement these protocols via inheritance.
+
+**Clonable Pattern**: Most classes inherit from `Clonable` (src/_lib/support/clonable/clonable.py), which provides immutable-style operations using deep copying. Methods like `pipe()`, `with_stages()`, and `with_processor()` return cloned instances rather than mutating the original.
+
+**ActAsFactory Pattern**: The `PipelineFactory` uses this pattern (src/_lib/support/act_as_factory/) to create pipeline instances with predefined configuration.

-### Documentation
-```bash
-# Build documentation locally
-mkdocs serve
-
-# Documentation is built with mkdocs-material and auto-generates API docs
-# from docstrings using mkdocstrings-python
-```
+### Type System

-### Version Management
-```bash
-# Bump version (uses bumpver)
-bumpver update --patch   # 1.26.0 -> 1.26.1
-bumpver update --minor   # 1.26.0 -> 1.27.0
-bumpver update --major   # 1.26.0 -> 2.0.0
-
-# Note: bumpver automatically commits and tags, but does NOT push
-```
+The codebase is fully typed using generic types `T_in` and `T_out` for input/output payloads. All classes are generic over these types to ensure type safety through the pipeline chain.
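The Protocol-based contracts described above can be illustrated with a small, self-contained sketch. The names here (`SupportsStage`, `TimesTwo`) are hypothetical and not the package's actual definitions; the point is only the structural-subtyping mechanism:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsStage(Protocol):
    # Hypothetical structural contract for a stage: any object with a
    # matching __call__ satisfies it, no inheritance required.
    def __call__(self, payload: int) -> int: ...


class TimesTwo:
    # Never names SupportsStage, yet satisfies it structurally.
    def __call__(self, payload: int) -> int:
        return payload * 2


stage: SupportsStage = TimesTwo()  # accepted by a type checker
assert isinstance(stage, SupportsStage)  # runtime_checkable method check
assert stage(21) == 42
```

In the real package, implementations additionally inherit from the protocols to make the relationship explicit, as noted above.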
+## Development Commands

-## Architecture
+### Environment Setup

-### Core Concepts
+```bash
+# Install uv package manager if not available
+uv python install 3.13
+uv sync --all-extras --dev
+```

-The codebase implements a pipeline pattern with three main abstractions:
-
-1. **Stage**: A callable unit that transforms input to output (`StageInterface[T_in, T_out]`)
-2. **Pipeline**: An immutable chain of stages (`PipelineInterface[T_in, T_out]`)
-3. **Processor**: Controls how stages are executed (`ProcessorInterface[T_in, T_out]`)
+### Testing

+```bash
+# Run all tests with coverage
+uv run pytest tests --cov
+
+# Run a specific test file
+uv run pytest tests/test_pipeline.py
+
+# Run a specific test
+uv run pytest tests/test_pipeline.py::test_lambda_stages -v
+```

-### Directory Structure
-
-```
-src/
-├── _lib/                        # Internal implementation
-│   ├── pipeline/                # Core pipeline implementation
-│   │   ├── pipeline.py          # Main Pipeline class
-│   │   ├── pipeline_factory.py  # Factory for building pipelines
-│   │   ├── processor.py         # Base Processor class
-│   │   ├── stage.py             # Base Stage class
-│   │   ├── processors/          # Built-in processors
-│   │   │   ├── chained_processor.py
-│   │   │   └── ...
-│   │   └── traits/              # Mixins (Clonable, ActAsFactory)
-│   └── processors/              # Additional processor implementations
-│       ├── chained_processor/        # Processor that chains stages
-│       └── interruptible_processor/  # Processor with interruption support
-└── thecodecrate_pipeline/       # Public API package
-    ├── __init__.py              # Re-exports from _lib
-    ├── processors/              # Public processor exports
-    └── types/                   # Public type exports
-```

-### Key Design Patterns
+### Linting & Formatting

-**Immutability**: Pipelines use copy-on-write semantics. The `pipe()` method creates a new pipeline instance with the added stage, preserving the original pipeline.
+```bash
+# Check linting
+uvx ruff check .
+
+# Fix linting issues automatically
+uvx ruff check --fix .
+
+# Format code
+uvx ruff format .
+```

-**Traits System**: The codebase uses a trait-like pattern with mixins:
-- `Clonable`: Provides shallow cloning capability
-- `ActAsFactory`: Enables objects to act as factories for creating instances
+### Version Bumping

-**Interface Segregation**: Each core concept has an interface (`*Interface`) and implementation, enabling custom implementations while maintaining type safety.
-
-**Async-First**: All processing is async (`async def process(...)`). The processor handles both sync and async callables transparently using `inspect.isawaitable()`.
+```bash
+# Bump version (patch/minor/major)
+uv run bumpver update --patch
+uv run bumpver update --minor
+uv run bumpver update --major
+```

+Version is managed by bumpver and automatically updates:

-### Type System
+- `pyproject.toml`
+- `src/thecodecrate_pipeline/__init__.py`

-The codebase uses generic type variables for type safety:
-- `T_in`: Input type to a stage or pipeline
-- `T_out`: Output type from a stage or pipeline (defaults to `T_in`)
+### Documentation

-Stages can transform types:
-```python
-Pipeline[int, str]        # Takes int, returns str
-StageInterface[int, int]  # Takes int, returns int
-```
+```bash
+# Build documentation locally
+mkdocs serve
+
+# Build static site
+mkdocs build
+```

-### Processing Flow
-
-1. A Pipeline is created with stages (either via `.pipe()` or declaratively)
-2. When `.process(payload)` is called, the pipeline:
-   - Instantiates stages if needed (converts classes to instances)
-   - Delegates to the processor's `.process()` method
-   - The processor iterates through stages, passing output to next stage
-3. Processors can customize execution (e.g., ChainedProcessor for error handling)
+## Important Implementation Details

-### Stream Processing
+### Async Processing

-Pipelines support processing `AsyncIterator` streams, allowing real-time data transformation where each stage can yield results consumed immediately by the next stage.
+All pipeline processing is async. The `Pipeline.process()` method and all stage `__call__` methods are async. The base `Processor._call()` method (src/_lib/processor.py:37-52) handles both sync and async callables automatically using `inspect.isawaitable()`.

-### PipelineFactory
+### Stage Instantiation

-Because pipelines are immutable, `PipelineFactory` provides mutable stage collection during composition. It builds the final immutable pipeline via `.build()`.
+Stages can be provided as either classes or instances. The `StageablePipeline` concern automatically instantiates stage classes when needed (src/_lib/concerns/stageable_pipeline.py:67-72).

-## Testing Notes
+### Processor Types

-- Test files use stub classes in `tests/stubs/` for consistent test fixtures
-- Tests are async-aware (configured via `pytest.ini` with `pytest-asyncio`)
-- Mock stages implement `StageInterface` for type safety
+- `ChainedProcessor`: Default processor that executes stages sequentially (src/_lib/processors/chained_processor/)
+- `InterruptibleProcessor`: Allows stages to interrupt the pipeline flow (src/_lib/processors/interruptible_processor/)

-## Python Version
+### Callable Invocation

-Requires Python 3.13+ (specified in pyproject.toml).
+The `Pipeline` class is callable and delegates to `process()` (src/_lib/concerns/processable_pipeline.py:52-62). The first parameter is positional-only to match the callable signature.

-## Package Distribution
+## Testing Guidelines

-Built with `hatchling`. The wheel includes both `thecodecrate_pipeline` (public API) and `_api` packages (internal). The public package re-exports symbols from `_lib`.
+- All async tests must be marked with `@pytest.mark.asyncio`
+- Test stubs are located in `tests/stubs/`
+- Tests cover: lambda stages, class-based stages, pipeline-as-stage, custom processors, and factory patterns
+- The pytest configuration sets `asyncio_default_fixture_loop_scope = function`
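The sync/async bridging that CLAUDE.md attributes to `Processor._call()` can be approximated with `inspect.isawaitable()`. This is a sketch of the technique, not the package's actual implementation; all names below are illustrative:

```python
import asyncio
import inspect


async def call_stage(stage, payload):
    # Invoke the stage; await only when it returned an awaitable,
    # so plain functions and coroutine functions are handled uniformly.
    result = stage(payload)
    if inspect.isawaitable(result):
        result = await result
    return result


async def run_pipeline(stages, payload):
    # Feed each stage's output into the next, sync or async alike.
    for stage in stages:
        payload = await call_stage(stage, payload)
    return payload


async def add_one_async(x):
    return x + 1


result = asyncio.run(run_pipeline([lambda x: x * 2, add_one_async], 5))
assert result == 11  # (5 * 2) + 1
```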
51 changes: 37 additions & 14 deletions README.md
@@ -49,7 +49,10 @@ Pipelines are implemented as immutable stage chains. When you pipe a new stage,

 Operations in a pipeline, stages, can be anything that satisfies the `Callable` type hint. So functions and anything that's callable is acceptable.

 ```python
-pipeline = Pipeline().pipe(lambda payload: payload * 10)
+pipeline = (
+    Pipeline()
+    .pipe(lambda payload: payload * 10)
+)

 # Returns 100
 await pipeline.process(10)
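The immutability this hunk relies on — each `pipe()` returning a new pipeline rather than mutating the original — can be sketched independently of the package. This is illustrative code (synchronous for brevity, unlike the library's async `process()`):

```python
import copy


class MiniPipeline:
    """Illustrative immutable pipeline: pipe() clones instead of mutating."""

    def __init__(self, stages=None):
        self.stages = list(stages or [])

    def pipe(self, stage):
        # Deep-copy then extend, so the original pipeline is untouched.
        clone = copy.deepcopy(self)
        clone.stages.append(stage)
        return clone

    def process(self, payload):
        for stage in self.stages:
            payload = stage(payload)
        return payload


p1 = MiniPipeline()
p2 = p1.pipe(lambda x: x * 10)
assert p1.stages == []        # original unchanged
assert p2.process(10) == 100  # clone carries the new stage
```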
@@ -110,7 +113,10 @@ The `T_out` type variable is optional and defaults to `T_in`. Similarly, `T_in`

 ```python
 from typing import Any

-pipeline = Pipeline[int]().pipe(lambda payload: payload * 2)
+pipeline = (
+    Pipeline[int]()
+    .pipe(lambda payload: payload * 2)
+)

 # Returns 20
 await pipeline.process(10)
@@ -119,7 +125,10 @@ await pipeline.process(10)

 You can also handle varying types between stages:

 ```python
-pipeline = Pipeline[int, str]().pipe(lambda payload: f"Number: {payload}")
+pipeline = (
+    Pipeline[int, str]()
+    .pipe(lambda payload: f"Number: {payload}")
+)

 # Returns "Number: 10"
 await pipeline.process(10)
@@ -149,7 +158,12 @@ class MyCustomProcessor(Processor[T_in, T_out]):

 And use it in your pipeline:

 ```python
-pipeline = Pipeline[int, int](processor=MyCustomProcessor()).pipe(lambda x: x * 2)
+pipeline = (
+    Pipeline[int, int](
+        processor=MyCustomProcessor(),
+    )
+    .pipe(lambda x: x * 2)
+)
 ```
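To see what a processor contributes in isolation, here is a hypothetical stand-alone processor — not the package's `Processor` base class — that runs stages in order while recording every intermediate payload:

```python
import asyncio


class TracingProcessor:
    """Hypothetical processor: executes stages sequentially, logging results."""

    def __init__(self):
        self.trace = []

    async def process(self, stages, payload):
        for stage in stages:
            payload = stage(payload)
            self.trace.append(payload)  # record each intermediate value
        return payload


processor = TracingProcessor()
result = asyncio.run(processor.process([lambda x: x * 2, lambda x: x + 1], 10))
assert result == 21
assert processor.trace == [20, 21]
```

A custom processor like this is where cross-cutting behavior (logging, retries, interruption) naturally lives, which is the design choice the README is pointing at.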

## Declarative Stages
@@ -158,10 +172,10 @@ Instead of using `pipe` to add stages at runtime, you can define stages declarat

 ```python
 class MyPipeline(Pipeline[int, int]):
-    stages = [
-        TimesTwoStage(),
-        TimesThreeStage(),
-    ]
+    stages = (
+        TimesTwoStage,      # class
+        TimesThreeStage(),  # object
+    )

 # Process the payload through the pipeline with the declared stages
 result = await MyPipeline().process(5)
@@ -174,11 +188,11 @@ In this example, `MyPipeline` declares its stages directly in the class definiti

 ## Declarative Processor

-You can also specify the processor in a declarative way by setting the `processor_class` attribute in your pipeline class.
+You can also specify the processor in a declarative way by setting the `processor` attribute in your pipeline class.

 ```python
 class MyPipeline(Pipeline[T_in, T_out]):
-    processor_class = MyCustomProcessor
+    processor = MyCustomProcessor
 ```

 This allows you to customize the processing behavior of your pipeline while keeping the definition clean and declarative.
@@ -227,13 +241,19 @@ This allows you to process data in a streaming fashion, where each stage can yie

 Because pipelines themselves are immutable, pipeline factory is introduced to facilitate distributed composition of a pipeline.

-The `PipelineFactory[InputType, OutputType]` collects stages and allows you to create a pipeline at any given time.
+The `PipelineFactory[T_in, T_out]` collects stages and allows you to create a pipeline at any given time.

 ```python
-pipeline_factory = PipelineFactory().with_stages([LogicalStage(), AddOneStage()])
+pipeline_factory = (
+    PipelineFactory()
+    .with_stages([
+        LogicalStage,   # class
+        AddOneStage(),  # object
+    ])
+)

 # Additional stages can be added later
-pipeline_factory.add_stage(LastStage()).with_processor(MyCustomProcessor())
+pipeline_factory.add_stage(LastStage())

 # Build the pipeline
 pipeline = pipeline_factory.build()
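The factory's role — mutable collection during composition, an immutable result at the end — can be sketched without the package. All names below are illustrative, not the library's API:

```python
class MiniPipelineFactory:
    """Collects stages mutably; build() freezes them into a tuple."""

    def __init__(self):
        self._stages = []

    def add_stage(self, stage):
        self._stages.append(stage)
        return self  # allow chained add_stage() calls

    def build(self):
        # A tuple stands in for the final immutable pipeline.
        return tuple(self._stages)


factory = MiniPipelineFactory()
factory.add_stage(lambda x: x + 1).add_stage(lambda x: x * 2)
frozen = factory.build()
assert len(frozen) == 2
```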
@@ -244,7 +264,10 @@ pipeline = pipeline_factory.build()

 This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a _stage_ or at the time the pipeline processes a payload.

 ```python
-pipeline = Pipeline().pipe(lambda payload: payload / 0)
+pipeline = (
+    Pipeline()
+    .pipe(lambda payload: payload / 0)
+)

 try:
     await pipeline.process(10)
2 changes: 2 additions & 0 deletions docs/api/core.md
@@ -3,3 +3,5 @@
 ::: thecodecrate_pipeline
     options:
       inherited_members: true
+      filters:
+        - "!Interface$"
6 changes: 3 additions & 3 deletions jupyter/01-playground.ipynb
@@ -41,7 +41,7 @@
     ")\n",
     "\n",
     "pipeline = (\n",
-    "    (Pipeline[int, str]())\n",
+    "    Pipeline[int, str]()\n",
     "    .pipe(lambda x: x + 1)\n",
     "    .pipe(lambda x: x + 1)\n",
     "    .pipe(lambda x: f\"result is {x}\")\n",
@@ -260,7 +260,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 8,
+    "execution_count": 7,
     "metadata": {},
     "outputs": [
      {
@@ -269,7 +269,7 @@
        "12"
       ]
      },
-     "execution_count": 8,
+     "execution_count": 7,
      "metadata": {},
      "output_type": "execute_result"
     }