Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 0 additions & 310 deletions efemel/pipeline.py

This file was deleted.

82 changes: 82 additions & 0 deletions efemel/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
import itertools
from typing import Any
from typing import TypedDict
from typing import TypeVar

from .transformers.transformer import Transformer

# --- Type Aliases ---
# Generic placeholder for the type of item a pipeline function receives.
T = TypeVar("T")
# A callable that takes a single item of type T; its return value is
# unconstrained (Any).
PipelineFunction = Callable[[T], Any]


class PipelineContext(TypedDict):
    """Pipeline-wide context shared by every pipeline operation.

    The typed dictionary currently declares no keys.
    """


class Pipeline[T]:
"""
Manages a data source and applies transformers to it.
Provides terminal operations to consume the resulting data.
"""

def __init__(self, *data: Iterable[T]):
self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0]
self.processed_data: Iterator = iter(self.data_source)

def apply[U](self, transformer: Transformer[T, U] | Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]":
"""
Applies a transformer to the current data source.
"""
# The transformer is called with the current processed data, producing a new iterator
new_data = transformer(self.processed_data)
# Create a new pipeline with the transformed data
self.processed_data = new_data
return self # type: ignore

def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
"""
Shorthand method to apply a transformation using a lambda function.
Creates a Transformer under the hood and applies it to the pipeline.

Args:
t: A callable that takes a transformer and returns a transformed transformer

Returns:
A new Pipeline with the transformed data
"""
# Create a new transformer and apply the transformation function
transformer = t(Transformer[T, T]())
return self.apply(transformer)

def __iter__(self) -> Iterator[T]:
"""Allows the pipeline to be iterated over."""
yield from self.processed_data

def to_list(self) -> list[T]:
"""Executes the pipeline and returns the results as a list."""
return list(self.processed_data)

def each(self, function: PipelineFunction[T]) -> None:
"""Applies a function to each element (terminal operation)."""
# Context needs to be accessed from the function if it's context-aware,
# but the pipeline itself doesn't own a context. This is a design choice.
# For simplicity, we assume the function is not context-aware here
# or that context is handled within the Transformers.
for item in self.processed_data:
function(item)

def first(self, n: int = 1) -> list[T]:
"""Gets the first n elements of the pipeline (terminal operation)."""
assert n >= 1, "n must be at least 1"
return list(itertools.islice(self.processed_data, n))

def consume(self) -> None:
"""Consumes the pipeline without returning results."""
for _ in self.processed_data:
pass
Loading
Loading