108 changes: 103 additions & 5 deletions efemel/pipeline.py
@@ -2,10 +2,14 @@
Pipeline module for functional data processing with internal chunking.

This module provides a Pipeline class that enables functional programming patterns
-for data transformation and processing, using internal chunking for performance.
+for data transformation and processing, using internal chunking and optional
+concurrent execution for performance.
"""

+from collections import deque
from collections.abc import Callable, Generator, Iterable
+from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
+from itertools import chain
from typing import Any, Self, TypeVar

T = TypeVar("T") # Type variable for the elements in the pipeline
@@ -19,7 +23,7 @@ class Pipeline[T]:

The Pipeline class wraps an iterable and provides a fluent interface for
applying transformations, filters, and reductions. Internally, it processes
-data in chunks for improved performance.
+data in chunks for improved performance and supports concurrent execution.

Type Parameters:
T: The type of elements in the pipeline
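
Editor's note: as an illustration of the fluent interface described above, here is a minimal usage sketch. `map` and `to_list` appear elsewhere in this diff; `filter` is assumed from the docstring's mention of filters.

```python
from efemel.pipeline import Pipeline  # import path assumed from this PR's file layout

# Elements flow through in chunks of 2; the chained calls build one pipeline.
squares = (
    Pipeline(range(10), chunk_size=2)
    .map(lambda x: x * x)          # transformation
    .filter(lambda x: x % 2 == 0)  # filter (method name assumed, not shown in this diff)
    .to_list()                     # reduction to a plain list
)
print(squares)  # [0, 4, 16, 36, 64]
```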
@@ -30,13 +34,13 @@

generator: Generator[list[T], None, None]

-def __init__(self, source: Iterable[T], chunk_size: int = 1000) -> "Pipeline[T]":
+def __init__(self, source: Iterable[T], chunk_size: int = 1000) -> None:
"""
Initialize a new Pipeline with the given data source.

Args:
source: An iterable that provides the data for the pipeline.
-    If source is another Pipeline, it will be efficiently composed.
+        If source is another Pipeline, it will be efficiently composed.
chunk_size: Number of elements per chunk (default: 1000)
"""
if isinstance(source, Pipeline):
@@ -141,13 +145,19 @@ def each(self, function: Callable[[T], Any]) -> None:
for item in chunk:
function(item)

def noop(self) -> None:
"""Consume the pipeline without any operation."""
# Drain every element, discarding the results
for _ in chain.from_iterable(self.generator):
    pass

def passthrough(self) -> Self:
"""Return the pipeline unchanged (identity operation)."""
return self

def apply(self, *functions: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]":
"""Apply sequence of transformation functions."""
-result: Pipeline[T] = self
+result: Pipeline[Any] = self
for function in functions:
result = function(result)
return result
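
Editor's note: a quick sketch of how `apply` composes with the helpers above; `passthrough` is shown inline, and `to_list` is taken from elsewhere in this diff.

```python
def double(p: "Pipeline[int]") -> "Pipeline[int]":
    return p.map(lambda x: x * 2)

def increment(p: "Pipeline[int]") -> "Pipeline[int]":
    return p.map(lambda x: x + 1)

# apply() threads the pipeline through each function left to right;
# passthrough() is an identity step that leaves the pipeline untouched.
result = Pipeline([1, 2, 3]).passthrough().apply(double, increment).to_list()
print(result)  # [3, 5, 7]
```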
@@ -160,6 +170,94 @@ def flatten_chunk(chunk: list[Iterable[U]]) -> list[U]:

return Pipeline._from_chunks(flatten_chunk(chunk) for chunk in self.generator)
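
Editor's note: only the inner `flatten_chunk` helper is visible in this hunk. Assuming the enclosing method is a `flatten()` that un-nests one level, its behavior would look like this sketch:

```python
# Hypothetical call: the enclosing method's name is not shown in this hunk.
nested = Pipeline([[1, 2], [3], [4, 5]])
flat = nested.flatten().to_list()
print(flat)  # [1, 2, 3, 4, 5]
```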

def concurrent(
self,
pipeline_func: Callable[["Pipeline[T]"], "Pipeline[U]"],
max_workers: int,
ordered: bool = True,
) -> "Pipeline[U]":
"""
Apply a pipeline function to each chunk in parallel.

This method processes chunks concurrently using a thread pool, which can
significantly speed up I/O-bound or GIL-releasing tasks. The provided
function is applied to a new mini-pipeline created from each chunk.

Args:
pipeline_func: A function that takes a `Pipeline` and returns a
transformed `Pipeline`.
max_workers: The maximum number of threads to use.
ordered: If True (default), the output chunks will be in the same
order as the input. Setting to False can improve performance
by yielding results as they complete.

Returns:
A new Pipeline containing the elements from the concurrently
processed chunks.
"""

def apply_to_chunk(chunk: list[T]) -> list[U]:
# Create a mini-pipeline for the chunk that treats it as a single unit
chunk_pipeline = Pipeline(chunk, chunk_size=max(len(chunk), 1))  # guard: an empty chunk would give chunk_size=0
# Apply the user's pipeline function and collect the results
processed_pipeline = pipeline_func(chunk_pipeline)
return processed_pipeline.to_list()

def ordered_generator() -> Generator[list[U], None, None]:
"""Yields results in the order they were submitted."""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
source_iterator = iter(self.generator)
futures = deque()

# Prime the executor with the initial set of tasks
for _ in range(max_workers):
try:
chunk = next(source_iterator)
futures.append(executor.submit(apply_to_chunk, chunk))
except StopIteration:
break # No more chunks in the source

# As tasks complete, yield results and submit new tasks
while futures:
completed_future = futures.popleft()
yield completed_future.result()

try:
chunk = next(source_iterator)
futures.append(executor.submit(apply_to_chunk, chunk))
except StopIteration:
continue # All chunks have been submitted

def unordered_generator() -> Generator[list[U], None, None]:
"""Yields results as soon as they complete."""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
source_iterator = iter(self.generator)
futures = set()

# Prime the executor with the initial set of tasks
for _ in range(max_workers):
try:
chunk = next(source_iterator)
futures.add(executor.submit(apply_to_chunk, chunk))
except StopIteration:
break

while futures:
# Wait for the first available result
done, futures = wait(futures, return_when=FIRST_COMPLETED)

for completed_future in done:
yield completed_future.result()
# Refill the pool with a new task
try:
chunk = next(source_iterator)
futures.add(executor.submit(apply_to_chunk, chunk))
except StopIteration:
continue

gen = ordered_generator() if ordered else unordered_generator()
return Pipeline._from_chunks(gen)
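
Editor's note: a usage sketch for the new method, assuming an I/O-bound per-element task; `enrich` and `slow_fetch` are illustrative names, not part of this PR.

```python
import time

def enrich(p: "Pipeline[int]") -> "Pipeline[str]":
    def slow_fetch(x: int) -> str:
        time.sleep(0.1)  # stands in for a blocking I/O call that releases the GIL
        return f"record-{x}"
    return p.map(slow_fetch)

# Up to four chunks of two elements are in flight at once; ordered=True
# (the default) preserves input order in the output.
records = Pipeline(range(8), chunk_size=2).concurrent(enrich, max_workers=4).to_list()
print(records)  # ['record-0', 'record-1', ..., 'record-7']
```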

@classmethod
def chain(cls, *pipelines: "Pipeline[T]") -> "Pipeline[T]":
"""
28 changes: 28 additions & 0 deletions tests/test_pipeline.py
@@ -654,3 +654,31 @@ def test_complex_pipeline_composition(self):

# Expected: [4, 6, 8, 5, 7] -> [16, 36, 64, 25, 49] -> 190
assert result.first() == 190


class TestPipelineConcurrency:
"""Test Pipeline concurrency functionality."""

def test_concurrent(self):
# Apply a doubling pipeline to each chunk concurrently

def _double_pipeline(p: Pipeline[int]):
return p.map(lambda x: x * 2)

result = Pipeline(range(0, 5), 1).concurrent(_double_pipeline, 5).to_list()

# Check that every number was doubled
assert result == [0, 2, 4, 6, 8]

def test_concurrent_consumer(self):
# Consume the pipeline purely for its side effects

items = []

def dummy_consumer(p: Pipeline[int]):
return p.tap(lambda x: items.append(x))

Pipeline(range(0, 5), 1).concurrent(dummy_consumer, 5).noop()

# Side effects run on worker threads, so their arrival order is not guaranteed
assert sorted(items) == [0, 1, 2, 3, 4]
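
Editor's note: one case the new tests do not cover is `ordered=False`, where chunks are yielded as they complete and output order is not guaranteed. A hypothetical test sketch:

```python
def test_concurrent_unordered(self):
    def _double_pipeline(p: Pipeline[int]):
        return p.map(lambda x: x * 2)

    result = Pipeline(range(0, 5), 1).concurrent(_double_pipeline, 5, ordered=False).to_list()

    # Completion order is nondeterministic, so compare sorted contents.
    assert sorted(result) == [0, 2, 4, 6, 8]
```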