diff --git a/efemel/pipeline.py b/efemel/pipeline.py
index 39a994f..cb22bd2 100644
--- a/efemel/pipeline.py
+++ b/efemel/pipeline.py
@@ -2,10 +2,14 @@
 Pipeline module for functional data processing with chunked processing.
 
 This module provides a Pipeline class that enables functional programming patterns
-for data transformation and processing, using internal chunking for performance.
+for data transformation and processing, using internal chunking and optional
+concurrent execution for performance.
 """
 
+from collections import deque
 from collections.abc import Callable, Generator, Iterable
+from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
+from itertools import chain
 from typing import Any, Self, TypeVar
 
 T = TypeVar("T")  # Type variable for the elements in the pipeline
@@ -19,7 +23,7 @@ class Pipeline[T]:
 
     The Pipeline class wraps an iterable and provides a fluent interface for
     applying transformations, filters, and reductions. Internally, it processes
-    data in chunks for improved performance.
+    data in chunks for improved performance and supports concurrent execution.
 
     Type Parameters:
         T: The type of elements in the pipeline
@@ -30,13 +34,13 @@ class Pipeline[T]:
 
     generator: Generator[list[T], None, None]
 
-    def __init__(self, source: Iterable[T], chunk_size: int = 1000) -> "Pipeline[T]":
+    def __init__(self, source: Iterable[T], chunk_size: int = 1000) -> None:
        """
        Initialize a new Pipeline with the given data source.

        Args:
            source: An iterable that provides the data for the pipeline.
-            If source is another Pipeline, it will be efficiently composed.
+                If source is another Pipeline, it will be efficiently composed.
            chunk_size: Number of elements per chunk (default: 1000)
        """
        if isinstance(source, Pipeline):
@@ -141,13 +145,19 @@ def each(self, function: Callable[[T], Any]) -> None:
             for item in chunk:
                 function(item)
 
+    def noop(self) -> None:
+        """Consume the pipeline without applying any operation."""
+        # Drain every chunk; chain.from_iterable flattens the chunked generator
+        for _ in chain.from_iterable(self.generator):
+            pass
+
     def passthrough(self) -> Self:
         """Return the pipeline unchanged (identity operation)."""
         return self
 
     def apply(self, *functions: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]":
         """Apply sequence of transformation functions."""
-        result: Pipeline[T] = self
+        result: Pipeline[Any] = self
         for function in functions:
             result = function(result)
         return result
@@ -160,6 +170,94 @@ def flatten_chunk(chunk: list[Iterable[U]]) -> list[U]:
 
         return Pipeline._from_chunks(flatten_chunk(chunk) for chunk in self.generator)
 
+    def concurrent(
+        self,
+        pipeline_func: Callable[["Pipeline[T]"], "Pipeline[U]"],
+        max_workers: int,
+        ordered: bool = True,
+    ) -> "Pipeline[U]":
+        """
+        Apply a pipeline function to each chunk in parallel.
+
+        This method processes chunks concurrently using a thread pool, which can
+        significantly speed up I/O-bound or GIL-releasing tasks. The provided
+        function is applied to a new mini-pipeline created from each chunk.
+
+        Args:
+            pipeline_func: A function that takes a `Pipeline` and returns a
+                transformed `Pipeline`.
+            max_workers: The maximum number of threads to use.
+            ordered: If True (default), the output chunks are yielded in the
+                same order as the input. Setting this to False can improve
+                throughput by yielding results as they complete.
+
+        Returns:
+            A new Pipeline containing the elements from the concurrently
+            processed chunks.
+ """ + + def apply_to_chunk(chunk: list[T]) -> list[U]: + # Create a mini-pipeline for the chunk that treats it as a single unit + chunk_pipeline = Pipeline(chunk, chunk_size=len(chunk)) + # Apply the user's pipeline function and collect the results + processed_pipeline = pipeline_func(chunk_pipeline) + return processed_pipeline.to_list() + + def ordered_generator() -> Generator[list[U], None, None]: + """Yields results in the order they were submitted.""" + with ThreadPoolExecutor(max_workers=max_workers) as executor: + source_iterator = iter(self.generator) + futures = deque() + + # Prime the executor with the initial set of tasks + for _ in range(max_workers): + try: + chunk = next(source_iterator) + futures.append(executor.submit(apply_to_chunk, chunk)) + except StopIteration: + break # No more chunks in the source + + # As tasks complete, yield results and submit new tasks + while futures: + completed_future = futures.popleft() + yield completed_future.result() + + try: + chunk = next(source_iterator) + futures.append(executor.submit(apply_to_chunk, chunk)) + except StopIteration: + continue # All chunks have been submitted + + def unordered_generator() -> Generator[list[U], None, None]: + """Yields results as soon as they complete.""" + with ThreadPoolExecutor(max_workers=max_workers) as executor: + source_iterator = iter(self.generator) + futures = set() + + # Prime the executor with the initial set of tasks + for _ in range(max_workers): + try: + chunk = next(source_iterator) + futures.add(executor.submit(apply_to_chunk, chunk)) + except StopIteration: + break + + while futures: + # Wait for the first available result + done, futures = wait(futures, return_when=FIRST_COMPLETED) + + for completed_future in done: + yield completed_future.result() + # Refill the pool with a new task + try: + chunk = next(source_iterator) + futures.add(executor.submit(apply_to_chunk, chunk)) + except StopIteration: + continue + + gen = ordered_generator() if ordered else unordered_generator() + return Pipeline._from_chunks(gen) + @classmethod def chain(cls, *pipelines: "Pipeline[T]") -> "Pipeline[T]": """ diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 64a01f1..7e85607 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -654,3 +654,31 @@ def test_complex_pipeline_composition(self): # Expected: [4, 6, 8, 5, 7] -> [16, 36, 64, 25, 49] -> 190 assert result.first() == 190 + + +class TestPipelineConcurrency: + """Test Pipeline concurrency functionality.""" + + def test_concurrent(self): + # Create a simple pipeline + + def _double_pipeline(p: Pipeline[int]): + return p.map(lambda x: x * 2) + + result = Pipeline(range(0, 5), 1).concurrent(_double_pipeline, 5).to_list() + + # Check if all numbers are doubled + assert result == [0, 2, 4, 6, 8] + + def test_concurrent_consumer(self): + # Create a simple pipeline + + items = [] + + def dummy_consumer(p: Pipeline[int]): + return p.tap(lambda x: items.append(x)) + + Pipeline(range(0, 5), 1).concurrent(dummy_consumer, 5).noop() + + # Check if all numbers are consumed + assert items == [0, 1, 2, 3, 4]