diff --git a/efemel/pipeline.py b/efemel/pipeline.py deleted file mode 100644 index aea2baa..0000000 --- a/efemel/pipeline.py +++ /dev/null @@ -1,310 +0,0 @@ -""" -Pipeline module for functional data processing with chunked processing. - -This module provides a Pipeline class that enables functional programming patterns -for data transformation and processing, using internal chunking and optional -concurrent execution for performance. -""" - -from collections import deque -from collections.abc import Callable -from collections.abc import Generator -from collections.abc import Iterable -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import wait -from typing import Any -from typing import Self -from typing import TypeVar -from typing import Union -from typing import overload - -T = TypeVar("T") # Type variable for the elements in the pipeline -U = TypeVar("U") # Type variable for transformed elements - - -class Pipeline[T]: - """ - A functional pipeline for data processing with internal chunked processing. - - The Pipeline class wraps an iterable and provides a fluent interface for - applying transformations, filters, and reductions. Internally, it processes - data in chunks for improved performance and supports concurrent execution. - - Type Parameters: - T: The type of elements in the pipeline - - Attributes: - generator: Generator yielding chunks (lists) of elements - """ - - generator: Generator[list[T], None, None] - - def __init__(self, source: Union[Iterable[T], "Pipeline[T]"], chunk_size: int = 1000) -> None: - """ - Initialize a new Pipeline with the given data source. - - Args: - source: An iterable that provides the data for the pipeline. - If source is another Pipeline, it will be efficiently composed. - chunk_size: Number of elements per chunk (default: 1000) - """ - match source: - case Pipeline(): - # If source is already a Pipeline, we can use its generator directly - self.generator = source.generator - case Iterable(): - # If source is an iterable, we will chunk it - self.generator = self._chunked(source, chunk_size) - - self.chunk_size = chunk_size - - @staticmethod - def _chunked(iterable: Iterable[T], size: int) -> Generator[list[T], None, None]: - """Break an iterable into chunks of specified size.""" - chunk = [] - for item in iterable: - chunk.append(item) - if len(chunk) == size: - yield chunk - chunk = [] - if chunk: - yield chunk - - @classmethod - def _from_chunks(cls, chunks: Iterable[list[T]], chunk_size: int = 1000) -> "Pipeline[T]": - """Create a pipeline directly from an iterable of chunks.""" - p = cls([]) - p.generator = (chunk for chunk in chunks) - p.chunk_size = chunk_size - return p - - @classmethod - def from_pipeline(cls, pipeline: "Pipeline[T]") -> "Pipeline[T]": - """ - Create a new Pipeline from another Pipeline. - - This method provides an explicit way to create a new Pipeline from an existing one, - preserving the chunked structure for optimal performance. 
- - Args: - pipeline: The source Pipeline to copy from - - Returns: - A new Pipeline that will process the same data as the source - """ - new_pipeline = cls([]) - new_pipeline.generator = pipeline.generator - return new_pipeline - - def __iter__(self) -> Generator[T, None, None]: - """Iterate over elements by flattening chunks.""" - return (item for chunk in self.generator for item in chunk) - - def to_list(self) -> list[T]: - """Convert the pipeline to a list by concatenating all chunks.""" - return [item for chunk in self.generator for item in chunk] - - def first(self) -> T: - """Get the first element from the pipeline.""" - item = next(self.generator, None) - - if item is None: - raise StopIteration("Pipeline is empty") - - return item.pop(0) - - def filter(self, predicate: Callable[[T], bool]) -> "Pipeline[T]": - """Filter elements using a predicate, applied per chunk.""" - - return Pipeline._from_chunks( - ([x for x in chunk if predicate(x)] for chunk in self.generator), - self.chunk_size, - ) - - def map(self, function: Callable[[T], U]) -> "Pipeline[U]": - """Transform elements using a function, applied per chunk.""" - return Pipeline._from_chunks( - ([function(x) for x in chunk] for chunk in self.generator), - self.chunk_size, - ) - - def reduce(self, function: Callable[[U, T], U], initial: U) -> "Pipeline[U]": - """Reduce elements to a single value using the given function.""" - acc = initial - for chunk in self.generator: - for item in chunk: - acc = function(acc, item) - return Pipeline([acc]) - - def tap(self, function: Callable[[T], Any]) -> Self: - """Apply side effect to each element without modifying data.""" - - def tap_chunk(chunk: list[T]) -> list[T]: - return [item for item in chunk if function(item) or True] - - return Pipeline._from_chunks(tap_chunk(chunk) for chunk in self.generator) - - def each(self, function: Callable[[T], Any]) -> None: - """Apply function to each element (terminal operation).""" - deque((function(item) for chunk in self.generator for item in chunk), maxlen=0) - - def noop(self) -> None: - """Consume the pipeline without any operation.""" - # Consume all elements in the pipeline without any operation - deque(self.generator, maxlen=0) - - def passthrough(self) -> Self: - """Return the pipeline unchanged (identity operation).""" - return self - - def apply(self, function: Callable[[Self], "Pipeline[U]"]) -> "Pipeline[U]": - """Apply sequence of transformation functions.""" - return function(self) - - @overload - def flatten(self: "Pipeline[list[U]]") -> "Pipeline[U]": ... - - @overload - def flatten(self: "Pipeline[tuple[U, ...]]") -> "Pipeline[U]": ... - - @overload - def flatten(self: "Pipeline[set[U]]") -> "Pipeline[U]": ... - - def flatten( - self: Union["Pipeline[list[U]]", "Pipeline[tuple[U, ...]]", "Pipeline[set[U]]"], - ) -> "Pipeline[Any]": - """Flatten iterable chunks into a single pipeline of elements. - - This method flattens each chunk of iterables and maintains the chunked - structure to avoid memory issues with large datasets. After flattening, - the data is re-chunked to maintain the original chunk_size. - - Example: - [[1, 2], [3, 4]] -> [1, 2, 3, 4] - [(1, 2), (3, 4)] -> [1, 2, 3, 4] - - Note: - We need to overload this method to handle different iterable types because - using Iterable[U] does not preserve the type information for the flattened elements. - It returns Pipeline[Any] instead of Pipeline[U], which is incorrect. 
- """ - - def flatten_generator() -> Generator[Any, None, None]: - """Generator that yields individual flattened items.""" - return (item for chunk in self.generator for iterable in chunk for item in iterable) - - # Re-chunk the flattened stream to maintain consistent chunk size - return Pipeline._from_chunks(self._chunked(flatten_generator(), self.chunk_size), self.chunk_size) - - def concurrent( - self, - pipeline_func: Callable[["Pipeline[T]"], "Pipeline[U]"], - max_workers: int, - ordered: bool = True, - ) -> "Pipeline[U]": - """ - Applies a pipeline function to each chunk in parallel. - - This method processes chunks concurrently using a thread pool, which can - significantly speed up I/O-bound or GIL-releasing tasks. The provided - function is applied to a new mini-pipeline created from each chunk. - - Args: - pipeline_func: A function that takes a `Pipeline` and returns a - transformed `Pipeline`. - max_workers: The maximum number of threads to use. - ordered: If True (default), the output chunks will be in the same - order as the input. Setting to False can improve performance - by yielding results as they complete. - - Returns: - A new Pipeline containing the elements from the concurrently - processed chunks. - """ - - def apply_to_chunk(chunk: list[T]) -> list[U]: - # Create a mini-pipeline for the chunk that treats it as a single unit - chunk_pipeline = Pipeline(chunk, chunk_size=len(chunk)) - # Apply the user's pipeline function and collect the results - processed_pipeline = pipeline_func(chunk_pipeline) - return processed_pipeline.to_list() - - def ordered_generator() -> Generator[list[U], None, None]: - """Yields results in the order they were submitted.""" - with ThreadPoolExecutor(max_workers=max_workers) as executor: - source_iterator = iter(self.generator) - futures = deque() - - # Prime the executor with the initial set of tasks - for _ in range(max_workers): - try: - chunk = next(source_iterator) - futures.append(executor.submit(apply_to_chunk, chunk)) - except StopIteration: - break # No more chunks in the source - - # As tasks complete, yield results and submit new tasks - while futures: - completed_future = futures.popleft() - yield completed_future.result() - - try: - chunk = next(source_iterator) - futures.append(executor.submit(apply_to_chunk, chunk)) - except StopIteration: - continue # All chunks have been submitted - - def unordered_generator() -> Generator[list[U], None, None]: - """Yields results as soon as they complete.""" - with ThreadPoolExecutor(max_workers=max_workers) as executor: - source_iterator = iter(self.generator) - futures = set() - - # Prime the executor with the initial set of tasks - for _ in range(max_workers): - try: - chunk = next(source_iterator) - futures.add(executor.submit(apply_to_chunk, chunk)) - except StopIteration: - break - - while futures: - # Wait for the first available result - done, futures = wait(futures, return_when=FIRST_COMPLETED) - - for completed_future in done: - yield completed_future.result() - # Refill the pool with a new task - try: - chunk = next(source_iterator) - futures.add(executor.submit(apply_to_chunk, chunk)) - except StopIteration: - continue - - gen = ordered_generator() if ordered else unordered_generator() - return Pipeline._from_chunks(gen, self.chunk_size) - - @classmethod - def chain(cls, *pipelines: "Pipeline[T]") -> "Pipeline[T]": - """ - Chain multiple pipelines together sequentially. 
-
-        Args:
-            *pipelines: Variable number of Pipeline instances to chain
-
-        Returns:
-            A new Pipeline that processes all input pipelines in sequence
-        """
-
-        def chain_generator():
-            for pipeline in pipelines:
-                yield from pipeline.generator
-
-        # chain preserves chunk structure exactly (just concatenates generators)
-        # Use chunk_size from the first pipeline, or default if no pipelines
-        chunk_size = pipelines[0].chunk_size if pipelines else 1000
-        new_pipeline = cls.__new__(cls)
-        new_pipeline.generator = chain_generator()
-        new_pipeline.chunk_size = chunk_size
-        return new_pipeline
diff --git a/efemel/pipeline/pipeline.py b/efemel/pipeline/pipeline.py
new file mode 100644
index 0000000..37cb288
--- /dev/null
+++ b/efemel/pipeline/pipeline.py
@@ -0,0 +1,82 @@
+from collections.abc import Callable
+from collections.abc import Iterable
+from collections.abc import Iterator
+import itertools
+from typing import Any
+from typing import TypedDict
+from typing import TypeVar
+
+from .transformers.transformer import Transformer
+
+# --- Type Aliases ---
+T = TypeVar("T")
+PipelineFunction = Callable[[T], Any]
+
+
+class PipelineContext(TypedDict):
+    """Global context available to all pipeline operations."""
+
+    pass
+
+
+class Pipeline[T]:
+    """
+    Manages a data source and applies transformers to it.
+    Provides terminal operations to consume the resulting data.
+    """
+
+    def __init__(self, *data: Iterable[T]):
+        # chain.from_iterable also covers the zero-source case, yielding an empty pipeline
+        self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) != 1 else data[0]
+        self.processed_data: Iterator = iter(self.data_source)
+
+    def apply[U](self, transformer: Transformer[T, U] | Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]":
+        """
+        Applies a transformer to the current data source.
+        Mutates this pipeline in place and returns it.
+        """
+        # The transformer is called with the current processed data, producing a new iterator
+        new_data = transformer(self.processed_data)
+        # Rebind this pipeline's data to the transformed iterator and return self
+        self.processed_data = new_data
+        return self  # type: ignore
+
+    def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
+        """
+        Shorthand method to apply a transformation using a lambda function.
+        Creates a Transformer under the hood and applies it to the pipeline.
+
+        Args:
+            t: A callable that takes an identity transformer and returns a composed transformer
+
+        Returns:
+            This pipeline, with the transformed data
+        """
+        # Create a new transformer and apply the transformation function
+        transformer = t(Transformer[T, T]())
+        return self.apply(transformer)
+
+    def __iter__(self) -> Iterator[T]:
+        """Allows the pipeline to be iterated over."""
+        yield from self.processed_data
+
+    def to_list(self) -> list[T]:
+        """Executes the pipeline and returns the results as a list."""
+        return list(self.processed_data)
+
+    def each(self, function: PipelineFunction[T]) -> None:
+        """Applies a function to each element (terminal operation)."""
+        # Context needs to be accessed from the function if it's context-aware,
+        # but the pipeline itself doesn't own a context. This is a design choice.
+        # For simplicity, we assume the function is not context-aware here
+        # or that context is handled within the Transformers.
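+        # A minimal illustration of the intended call pattern (hypothetical
+        # values; any side-effecting callable works in place of append):
+        #
+        #   collected: list[int] = []
+        #   Pipeline([1, 2, 3]).each(collected.append)  # collected == [1, 2, 3]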
+ for item in self.processed_data: + function(item) + + def first(self, n: int = 1) -> list[T]: + """Gets the first n elements of the pipeline (terminal operation).""" + assert n >= 1, "n must be at least 1" + return list(itertools.islice(self.processed_data, n)) + + def consume(self) -> None: + """Consumes the pipeline without returning results.""" + for _ in self.processed_data: + pass diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py new file mode 100644 index 0000000..6076587 --- /dev/null +++ b/efemel/pipeline/transformers/transformer.py @@ -0,0 +1,138 @@ +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator +from functools import reduce +import inspect +import itertools +from typing import Any +from typing import Self +from typing import Union +from typing import overload + +# --- Type Aliases --- +type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] +type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] + + +class PipelineContext(dict): + """Global context available to all pipeline operations.""" + + pass + + +class Transformer[In, Out]: + """ + Defines and composes data transformations (e.g., map, filter). + + Transformers are callable and return a generator, applying the composed + transformation logic to an iterable data source. + """ + + def __init__( + self, + chunk_size: int = 1000, + context: PipelineContext | None = None, + ): + self.chunk_size = chunk_size + self.context = context or PipelineContext() + self.transformer = lambda chunk: chunk + + @classmethod + def init[T](cls, _type_hint: type[T], chunk_size: int = 1000) -> "Transformer[T, T]": + """Create a new identity pipeline with explicit type hint.""" + return cls(chunk_size) # type: ignore + + def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]: + """Breaks an iterable into chunks of a specified size.""" + data_iter = iter(data) + while chunk := list(itertools.islice(data_iter, self.chunk_size)): + yield chunk + + def _pipe[U](self, operation: Callable[[list[Out]], list[U]]) -> "Transformer[In, U]": + """Composes the current transformer with a new chunk-wise operation.""" + prev_transformer = self.transformer + self.transformer = lambda chunk: operation(prev_transformer(chunk)) + return self # type: ignore + + def _create_context_aware_function(self, func: Callable) -> Callable: + """Creates a function that correctly handles the context parameter.""" + try: + sig = inspect.signature(func) + params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] + if len(params) >= 2: + return lambda value: func(value, self.context) + except (ValueError, TypeError): + pass + return func + + def _create_reduce_function(self, func: Callable) -> Callable: + """Creates a reduce function that correctly handles the context parameter.""" + try: + sig = inspect.signature(func) + params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] + if len(params) >= 3: + return lambda acc, value: func(acc, value, self.context) + except (ValueError, TypeError): + pass + return func + + def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]": + """Transforms elements in the pipeline.""" + std_function = self._create_context_aware_function(function) + return self._pipe(lambda chunk: [std_function(x) for x in chunk]) + + def 
filter(self, predicate: PipelineFunction[Out, bool]) -> "Transformer[In, Out]":
+        """Filters elements in the pipeline."""
+        std_predicate = self._create_context_aware_function(predicate)
+        return self._pipe(lambda chunk: [x for x in chunk if std_predicate(x)])
+
+    @overload
+    def flatten[T](self: "Transformer[In, list[T]]") -> "Transformer[In, T]": ...
+
+    @overload
+    def flatten[T](self: "Transformer[In, tuple[T, ...]]") -> "Transformer[In, T]": ...
+
+    @overload
+    def flatten[T](self: "Transformer[In, set[T]]") -> "Transformer[In, T]": ...
+
+    def flatten[T](
+        self: Union["Transformer[In, list[T]]", "Transformer[In, tuple[T, ...]]", "Transformer[In, set[T]]"],
+    ) -> "Transformer[In, T]":
+        """Flattens one level of nesting into individual elements."""
+        return self._pipe(lambda chunk: [item for sublist in chunk for item in sublist])  # type: ignore
+
+    def tap(self, function: PipelineFunction[Out, Any]) -> "Transformer[In, Out]":
+        """Applies a side-effect function without modifying the data."""
+        std_function = self._create_context_aware_function(function)
+
+        def tap_operation(chunk: list[Out]) -> list[Out]:
+            for item in chunk:
+                std_function(item)
+            return chunk
+
+        return self._pipe(tap_operation)
+
+    def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In, T]":
+        """Applies a transformer-building function to this transformer."""
+        return t(self)
+
+    # Terminal operations
+    # These operations execute the transformer on a data source and yield results.
+    # If you want to operate on the results, you need to use a Pipeline and apply
+    # a different transformer to it.
+
+    def __call__(self, data: Iterable[In]) -> Iterator[Out]:
+        """Executes the transformer on a data source (terminal operation)."""
+        for chunk in self._chunk_generator(data):
+            yield from self.transformer(chunk)
+
+    def reduce[U](self, function: PipelineReduceFunction[U, Out], initial: U) -> Callable[[Iterable[In]], Iterator[U]]:
+        """Reduces elements to a single value (terminal operation)."""
+        reducer = self._create_reduce_function(function)
+
+        def _reduce(data: Iterable[In]) -> Iterator[U]:
+            yield reduce(reducer, self(data), initial)
+
+        return _reduce
diff --git a/performance_test.py b/performance_test.py
new file mode 100644
index 0000000..5dd0374
--- /dev/null
+++ b/performance_test.py
@@ -0,0 +1,586 @@
+#!/usr/bin/env python3
+"""
+Performance test for Pipeline class comparing different data processing approaches.
+
+This test compares:
+1. Pipeline class operations
+2. Chained generator functions (for/yield)
+3. Built-in map/filter chaining
+
+Tests are run on 1 million items, 20 times each, to gather statistics.
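+
+Every approach implements the same four-step chain; as a sketch (not part of
+the benchmark itself), the chain collapses to one comprehension:
+
+    >>> [x * 2 + 1 for x in range(60) if x % 2 == 0 and x * 2 > 100]
+    [105, 109, 113, 117]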
+""" + +import itertools +from itertools import islice +import statistics +import time + +from efemel.pipeline import Pipeline + + +def generate_test_data(size: int = 1_000_000) -> list[int]: + """Generate test data of specified size.""" + return range(size) + + +def generator_approach(data: list[int]) -> list[int]: + """Process data using chained generator expressions.""" + + def step1(items): + """Filter even numbers.""" + for item in items: + if item % 2 == 0: + yield item + + def step2(items): + """Double the numbers.""" + for item in items: + yield item * 2 + + def step3(items): + """Filter > 100.""" + for item in items: + if item > 100: + yield item + + def step4(items): + """Add 1.""" + for item in items: + yield item + 1 + + return list(step4(step3(step2(step1(data))))) + + +def builtin_map_filter_approach(data: list[int]) -> list[int]: + """Process data using built-in map/filter chaining.""" + result = filter(lambda x: x % 2 == 0, data) # Keep even numbers + result = (x * 2 for x in result) # Double them + result = filter(lambda x: x > 100, result) # Keep only > 100 + result = (x + 1 for x in result) # Add 1 + return list(result) + + +def generator_expression_approach(data: list[int]) -> list[int]: + """Process data using generator expressions.""" + result = (x for x in data if x % 2 == 0) # Keep even numbers + result = (x * 2 for x in result) # Double them + result = (x for x in result if x > 100) # Keep only > 100 + result = (x + 1 for x in result) # Add 1 + return list(result) + + +def list_comprehension_approach(data: list[int]) -> list[int]: + """Process data using separate list comprehensions with intermediate lists.""" + # Create intermediate lists at each step to match other approaches + step1 = [x for x in data if x % 2 == 0] # Filter even numbers + step2 = [x * 2 for x in step1] # Double them + step3 = [x for x in step2 if x > 100] # Filter > 100 + step4 = [x + 1 for x in step3] # Add 1 + return step4 + + +def chunked_generator_listcomp_approach(data: list[int]) -> list[int]: + """Process data using chunked generators with intermediate lists per chunk.""" + + def chunk_generator(data, chunk_size=1000): + """Generate chunks using a generator.""" + for i in range(0, len(data), chunk_size): + yield data[i : i + chunk_size] + + # Process each chunk with intermediate lists and combine + results = [] + for chunk in chunk_generator(data): + # Create intermediate lists for each chunk + step1 = [x for x in chunk if x % 2 == 0] # Filter even numbers + step2 = [x * 2 for x in step1] # Double them + step3 = [x for x in step2 if x > 100] # Filter > 100 + step4 = [x + 1 for x in step3] # Add 1 + results.extend(step4) + return results + + +def mutated_chunked_generator_listcomp_approach(data: list[int]) -> list[int]: + """Process data using chunked generators with intermediate lists per chunk.""" + + def chunk_generator(data, chunk_size=1000): + """Generate chunks using a generator.""" + while True: + chunk = list(islice(data, chunk_size)) + if not chunk: + return + yield chunk + + # Process each chunk with intermediate lists and combine + results = [] + for chunk in chunk_generator(data): + # Create intermediate lists for each chunk + step = [x for x in chunk if x % 2 == 0] # Filter even numbers + step = [x * 2 for x in step] # Double them + step = [x for x in step if x > 100] # Filter > 100 + step = [x + 1 for x in step] # Add 1 + results.extend(step) + return results + + +class ChunkedPipeline: + """ + A chunked pipeline that composes operations into a single function, + inspired by 
a callback-chaining pattern. + """ + + def __init__(self, chunk_size=1000): + """Initialize the pipeline with data and chunk size.""" + self.chunk_size = chunk_size + # The transformer starts as an identity function that does nothing. + self.transformer = lambda chunk: chunk + + def _chunk_generator(self, data): + """Generate chunks from the data.""" + data_iter = iter(data) + while True: + chunk = list(itertools.islice(data_iter, self.chunk_size)) + if not chunk: + break + yield chunk + + def pipe(self, operation): + """ + Composes the current transformer with a new chunk-wise operation. + The operation should be a function that takes a list and returns a list. + """ + # Capture the current transformer + prev_transformer = self.transformer + # Create a new transformer that first calls the old one, then the new operation + self.transformer = lambda chunk: operation(prev_transformer(chunk)) + return self + + def filter(self, predicate): + """Adds a filter operation by wrapping the current transformer in a list comprehension.""" + return self.pipe(lambda chunk: [x for x in chunk if predicate(x)]) + + def map(self, func): + """Adds a map operation by wrapping the current transformer in a list comprehension.""" + return self.pipe(lambda chunk: [func(x) for x in chunk]) + + def run(self, data): + """ + Execute the pipeline by calling the single, composed transformer on each chunk. + """ + for chunk in self._chunk_generator(data): + # The transformer is a single function that contains all the nested operations. + yield from self.transformer(chunk) + + +CHUNKED_PIPELINE = ( + ChunkedPipeline() + .filter(lambda x: x % 2 == 0) # Filter even numbers + .map(lambda x: x * 2) # Double them + .filter(lambda x: x > 100) # Filter > 100 + .map(lambda x: x + 1) +) + + +class ChunkedPipelineGenerator: + """ + A chunked pipeline that composes operations into a single generator function, + using lazy evaluation throughout the pipeline. + """ + + def __init__(self, chunk_size=1000): + """Initialize the pipeline with chunk size.""" + self.chunk_size = chunk_size + # The transformer starts as an identity generator that yields items as-is + self.transformer = lambda chunk: (x for x in chunk) + + def _chunk_generator(self, data): + """Generate chunks from the data.""" + data_iter = iter(data) + while True: + chunk = list(itertools.islice(data_iter, self.chunk_size)) + if not chunk: + break + yield chunk + + def pipe(self, operation): + """ + Composes the current transformer with a new chunk-wise operation. + The operation should be a function that takes a generator and returns a generator. + """ + # Capture the current transformer + prev_transformer = self.transformer + # Create a new transformer that first calls the old one, then the new operation + self.transformer = lambda chunk: operation(prev_transformer(chunk)) + return self + + def filter(self, predicate): + """Adds a filter operation using generator expression.""" + return self.pipe(lambda gen: (x for x in gen if predicate(x))) + + def map(self, func): + """Adds a map operation using generator expression.""" + return self.pipe(lambda gen: (func(x) for x in gen)) + + def run(self, data): + """ + Execute the pipeline by calling the single, composed transformer on each chunk. + Yields items lazily from the generator pipeline. 
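+
+        A sketch of the expected behavior (hypothetical values):
+
+            >>> p = ChunkedPipelineGenerator(chunk_size=2).map(lambda x: x + 1)
+            >>> list(p.run([1, 2, 3]))
+            [2, 3, 4]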
+ """ + for chunk in self._chunk_generator(data): + # The transformer is a single function that contains all the nested generator operations + yield from self.transformer(chunk) + + +# Create a generator-based pipeline instance +PIPELINE_GENERATOR = ( + ChunkedPipelineGenerator() + .filter(lambda x: x % 2 == 0) # Filter even numbers + .map(lambda x: x * 2) # Double them + .filter(lambda x: x > 100) # Filter > 100 + .map(lambda x: x + 1) +) + + +def chunked_pipeline_approach(data: list[int]) -> list[int]: + """Process data using the ChunkedPipeline class.""" + return list(CHUNKED_PIPELINE.run(data)) + + +def chunked_pipeline_generator_approach(data: list[int]) -> list[int]: + """Process data using the ChunkedPipelineGenerator class.""" + return list(PIPELINE_GENERATOR.run(data)) + + +class ChunkedPipelineSimple: + """ + A simple chunked pipeline that stores operations in a list and applies them sequentially + using list comprehensions when executed. + """ + + def __init__(self, chunk_size=1000): + """Initialize the pipeline with chunk size.""" + self.chunk_size = chunk_size + self.operations = [] + + def _chunk_generator(self, data): + """Generate chunks from the data.""" + data_iter = iter(data) + while True: + chunk = list(itertools.islice(data_iter, self.chunk_size)) + if not chunk: + break + yield chunk + + def pipe(self, operation): + """ + Add an operation to the pipeline. + The operation should be a function that takes a list and returns a list. + """ + self.operations.append(operation) + return self + + def filter(self, predicate): + """Add a filter operation to the pipeline.""" + return self.pipe(lambda chunk: [x for x in chunk if predicate(x)]) + + def map(self, func): + """Add a map operation to the pipeline.""" + return self.pipe(lambda chunk: [func(x) for x in chunk]) + + def run(self, data): + """ + Execute the pipeline by applying all operations sequentially to each chunk. + """ + for chunk in self._chunk_generator(data): + # Apply all operations sequentially to the chunk + current_chunk = chunk + for operation in self.operations: + current_chunk = operation(current_chunk) + + # Yield each item from the processed chunk + yield from current_chunk + + +# Create a simple pipeline instance +PIPELINE_SIMPLE = ( + ChunkedPipelineSimple() + .filter(lambda x: x % 2 == 0) # Filter even numbers + .map(lambda x: x * 2) # Double them + .filter(lambda x: x > 100) # Filter > 100 + .map(lambda x: x + 1) +) + + +def chunked_pipeline_simple_approach(data: list[int]) -> list[int]: + """Process data using the ChunkedPipelineSimple class.""" + return list(PIPELINE_SIMPLE.run(data)) + + +# Sentinel value to indicate that an item has been filtered out. +_SKIPPED = object() + + +class ChunkedPipelinePerItem: + """ + A chunked pipeline that composes operations into a single function + operating on individual items, inspired by a callback-chaining pattern + to reduce the creation of intermediate lists. + """ + + def __init__(self, chunk_size=1000): + """Initialize the pipeline with a specified chunk size.""" + self.chunk_size = chunk_size + # The transformer starts as an identity function. + self.transformer = lambda item: item + + def _chunk_generator(self, data): + """A generator that yields chunks of data.""" + data_iter = iter(data) + while True: + chunk = list(itertools.islice(data_iter, self.chunk_size)) + if not chunk: + break + yield chunk + + def pipe(self, operation): + """ + Composes the current transformer with a new item-wise operation. 
+ This internal method is the core of the pipeline's composition logic. + """ + prev_transformer = self.transformer + + def new_transformer(item): + # Apply the existing chain of transformations. + processed_item = prev_transformer(item) + + # If a previous operation (like a filter) already skipped this item, + # we bypass the new operation entirely. + if processed_item is _SKIPPED: + return _SKIPPED + + # Apply the new operation to the result of the previous ones. + return operation(processed_item) + + self.transformer = new_transformer + return self + + def filter(self, predicate): + """ + Adds a filter operation to the pipeline. + + If the predicate returns `False`, the item is marked as skipped, + and no further operations in the chain will be executed on it. + """ + + def filter_operation(item): + return item if predicate(item) else _SKIPPED + + return self.pipe(filter_operation) + + def map(self, func): + """Adds a map operation to transform an item.""" + return self.pipe(func) + + def run(self, data): + """ + Executes the pipeline. + + The composed transformer function is applied to each item individually. + Results are yielded only if they haven't been marked as skipped. + """ + for chunk in self._chunk_generator(data): + yield from [result for item in chunk if (result := self.transformer(item)) is not _SKIPPED] + + +PIPELINE_PER_ITEM = ( + ChunkedPipelinePerItem() + .filter(lambda x: x % 2 == 0) # Filter even numbers + .map(lambda x: x * 2) # Double them + .filter(lambda x: x > 100) # Filter > 100 + .map(lambda x: x + 1) +) + + +def chunked_pipeline_per_item_approach(data) -> list[int]: + """Process data using the ChunkedPipelinePerItem class.""" + return list(PIPELINE_PER_ITEM.run(data)) + + +PIPELINE = ( + Pipeline.init(int) + .filter(lambda x: x % 2 == 0) # Filter even numbers + .map(lambda x: x * 2) # Double them + .filter(lambda x: x > 100) # Filter > 100 + .map(lambda x: x + 1) +) + + +def pipeline_approach(data: list[int]) -> list[int]: + """Process data using the Pipeline class.""" + return PIPELINE.to_list(data) + + +def time_function(func, *args, **kwargs) -> float: + """Time a function execution and return duration in seconds.""" + start_time = time.perf_counter() + func(*args, **kwargs) + end_time = time.perf_counter() + return end_time - start_time + + +def run_performance_test(): + """Run comprehensive performance test.""" + print("šŸš€ Starting Performance Test") + print("=" * 60) + + # Test configurations + approaches = { + # "Generators": generator_approach, + # "Map/Filter": builtin_map_filter_approach, + # "GeneratorExpression": generator_expression_approach, + "ChunkedGeneratorListComp": chunked_generator_listcomp_approach, + "ChunkedPipeline": chunked_pipeline_approach, + "Pipeline": pipeline_approach, + # "ChunkedPipelinePerItem": chunked_pipeline_per_item_approach, + # "ChunkedPipelineGenerator": chunked_pipeline_generator_approach, + # "ChunkedPipelineSimple": chunked_pipeline_simple_approach, + # "ListComprehension": list_comprehension_approach, + # "MutatedChunkedGeneratorListComp": mutated_chunked_generator_listcomp_approach, + } + + num_runs = 20 + results = {} + + for name, approach_func in approaches.items(): + print(f"\nšŸ”„ Testing {name} approach ({num_runs} runs)...") + times = [] + + # Warm up + approach_func(generate_test_data(1_000_000)) + + # Actual timing runs + for run in range(num_runs): + print(f" Run {run + 1}/{num_runs}") + + duration = time_function(approach_func, generate_test_data(1_000_000)) + times.append(duration) + + # Calculate 
statistics + results[name] = { + "times": times, + "min": min(times), + "max": max(times), + "avg": statistics.mean(times), + "p95": statistics.quantiles(times, n=20)[18], # 95th percentile + "p99": statistics.quantiles(times, n=100)[98], # 99th percentile + "median": statistics.median(times), + "stdev": statistics.stdev(times) if len(times) > 1 else 0, + } # Print results + print("\n" + "=" * 60) + print("šŸ“ˆ PERFORMANCE RESULTS") + print("=" * 60) + + # Header + header = ( + f"{'Approach':<12} {'Min (s)':<10} {'Max (s)':<10} {'Avg (s)':<10} " + f"{'P95 (s)':<10} {'P99 (s)':<10} {'Median (s)':<12} {'StdDev':<10}" + ) + print(header) + print("-" * 104) + + # Sort by average time + sorted_results = sorted(results.items(), key=lambda x: x[1]["avg"]) + + for name, stats in sorted_results: + print( + f"{name:<12} {stats['min']:<10.4f} {stats['max']:<10.4f} {stats['avg']:<10.4f} " + f"{stats['p95']:<10.4f} {stats['p99']:<10.4f} {stats['median']:<12.4f} {stats['stdev']:<10.4f}" + ) + + # Relative performance + print("\n" + "=" * 60) + print("šŸ† RELATIVE PERFORMANCE") + print("=" * 60) + + baseline = sorted_results[0][1]["avg"] # Fastest approach as baseline + + for name, stats in sorted_results: + ratio = stats["avg"] / baseline + percentage = (ratio - 1) * 100 + if ratio == 1.0: + print(f"{name:<12} 1.00x (baseline)") + else: + print(f"{name:<12} {ratio:.2f}x ({percentage:+.1f}% slower)") + + # Verify correctness + print("\n" + "=" * 60) + print("āœ… CORRECTNESS VERIFICATION") + print("=" * 60) + + # Use smaller dataset for verification + results_correctness = {} + + for name, approach_func in approaches.items(): + result = approach_func(generate_test_data(1_000)) + results_correctness[name] = result + print(f"{name:<12} Result length: {len(result)}") + + # Check if all approaches produce the same result + first_result = next(iter(results_correctness.values())) + all_same = all(result == first_result for result in results_correctness.values()) + + if all_same: + print("āœ… All approaches produce identical results") + print(f" Sample result (first 10): {first_result[:10]}") + else: + print("āŒ Approaches produce different results!") + for name, result in results_correctness.items(): + print(f" {name}: {result[:10]} (length: {len(result)})") + + +def run_memory_test(): + """Run a quick memory efficiency comparison.""" + print("\n" + "=" * 60) + print("šŸ’¾ MEMORY EFFICIENCY TEST") + print("=" * 60) + + import tracemalloc + + approaches = { + # "Generators": generator_approach, + # "MapFilter": builtin_map_filter_approach, + # "GeneratorExpression": generator_expression_approach, + "ChunkedGeneratorListComp": chunked_generator_listcomp_approach, + "ChunkedPipeline": chunked_pipeline_approach, + "Pipeline": pipeline_approach, + # "ChunkedPipelinePerItem": chunked_pipeline_per_item_approach, + # "ChunkedPipelineGenerator": chunked_pipeline_generator_approach, + # "ChunkedPipelineSimple": chunked_pipeline_simple_approach, + # "ListComprehension": list_comprehension_approach, + # "MutatedChunkedGeneratorListComp": mutated_chunked_generator_listcomp_approach, + } + + for name, approach_func in approaches.items(): + tracemalloc.start() + + result = approach_func(generate_test_data(100_000)) + + current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + print( + f"{name:<12} Peak memory: {peak / 1024 / 1024:.2f} MB, " + f"Current: {current / 1024 / 1024:.2f} MB, " + f"Result length: {len(result)}" + ) + + +if __name__ == "__main__": + try: + run_performance_test() + run_memory_test() + 
print("\nšŸŽ‰ ChunkedPipeline test completed successfully!") + except KeyboardInterrupt: + print("\nāš ļø Test interrupted by user") + except Exception as e: + print(f"\nāŒ Test failed with error: {e}") + raise diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..1cbe1fc --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,221 @@ +"""Integration tests for Pipeline and Transformer working together.""" + +from efemel.pipeline.pipeline import Pipeline +from efemel.pipeline.transformers.transformer import PipelineContext +from efemel.pipeline.transformers.transformer import Transformer + + +class TestPipelineTransformerIntegration: + """Test Pipeline and Transformer integration.""" + + def test_basic_integration(self): + """Test basic pipeline and transformer integration.""" + # Create a transformer that doubles and filters + transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 5) + + # Apply to pipeline + result = Pipeline([1, 2, 3, 4, 5]).apply(transformer).to_list() + assert result == [6, 8, 10] + + def test_context_sharing(self): + """Test that context is properly shared in transformations.""" + context = PipelineContext({"multiplier": 3, "threshold": 5}) + + transformer = ( + Transformer(context=context).map(lambda x, ctx: x * ctx["multiplier"]).filter(lambda x, ctx: x > ctx["threshold"]) + ) + + result = Pipeline([1, 2, 3]).apply(transformer).to_list() + assert result == [6, 9] # [3, 6, 9] filtered to [6, 9] + + def test_reduce_integration(self): + """Test reduce operation with pipeline.""" + transformer = Transformer.init(int).map(lambda x: x * 2) + reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) + + result = list(reducer([1, 2, 3, 4])) + assert result == [20] # [2, 4, 6, 8] summed = 20 + + def test_complex_chain_integration(self): + """Test complex chain of operations.""" + # Transform: double -> flatten -> filter -> tap + side_effects = [] + + transformer = ( + Transformer.init(int) + .map(lambda x: [x, x * 2]) # Create pairs + .flatten() # Flatten to single list + .filter(lambda x: x % 3 == 0) # Keep multiples of 3 + .tap(lambda x: side_effects.append(f"processed: {x}")) + ) + + result = Pipeline([1, 2, 3, 6]).apply(transformer).to_list() + + # Expected: [1,2,2,4,3,6,6,12] -> [3,6,6,12] (multiples of 3) + assert result == [3, 6, 6, 12] + assert side_effects == ["processed: 3", "processed: 6", "processed: 6", "processed: 12"] + + def test_multiple_transformer_applications(self): + """Test applying multiple transformers in sequence.""" + t1 = Transformer.init(int).map(lambda x: x * 2) + t2 = Transformer.init(int).filter(lambda x: x > 5) + t3 = Transformer.init(int).map(lambda x: x + 1) + + result = ( + Pipeline([1, 2, 3, 4, 5]) + .apply(t1) # [2, 4, 6, 8, 10] + .apply(t2) # [6, 8, 10] + .apply(t3) # [7, 9, 11] + .to_list() + ) + + assert result == [7, 9, 11] + + def test_transform_shorthand_integration(self): + """Test transform shorthand method integration.""" + result = ( + Pipeline([1, 2, 3, 4, 5]) + .transform(lambda t: t.map(lambda x: x * 3)) + .transform(lambda t: t.filter(lambda x: x > 6)) + .to_list() + ) + + assert result == [9, 12, 15] # [3, 6, 9, 12, 15] -> [9, 12, 15] + + def test_mixed_operations(self): + """Test mixing transform shorthand and apply methods.""" + transformer = Transformer.init(int).filter(lambda x: x < 10) + + result = ( + Pipeline([1, 2, 3, 4, 5]) + .transform(lambda t: t.map(lambda x: x * 2)) # [2, 4, 6, 8, 10] + .apply(transformer) # [2, 4, 6, 8] + 
.transform(lambda t: t.map(lambda x: x + 1)) # [3, 5, 7, 9] + .to_list() + ) + + assert result == [3, 5, 7, 9] + + +class TestPipelineDataProcessingPatterns: + """Test common data processing patterns.""" + + def test_etl_pattern(self): + """Test Extract-Transform-Load pattern.""" + # Simulate raw data extraction + raw_data = [ + {"name": "Alice", "age": 25, "salary": 50000}, + {"name": "Bob", "age": 30, "salary": 60000}, + {"name": "Charlie", "age": 35, "salary": 70000}, + {"name": "David", "age": 28, "salary": 55000}, + ] + + # Transform: extract names of people over 28 with salary > 55000 + result = ( + Pipeline(raw_data) + .transform(lambda t: t.filter(lambda x: x["age"] > 28 and x["salary"] > 55000)) + .transform(lambda t: t.map(lambda x: x["name"])) + .to_list() + ) + + assert result == ["Bob", "Charlie"] + + def test_data_aggregation_pattern(self): + """Test data aggregation pattern.""" + # Group and count pattern + data = ["apple", "banana", "apple", "cherry", "banana", "apple"] + + # Count occurrences + counts = {} + (Pipeline(data).transform(lambda t: t.tap(lambda x: counts.update({x: counts.get(x, 0) + 1}))).consume()) + + assert counts == {"apple": 3, "banana": 2, "cherry": 1} + + def test_map_reduce_pattern(self): + """Test map-reduce pattern.""" + # Map: square numbers, Reduce: sum them + transformer = Transformer.init(int).map(lambda x: x * x) # Square + + reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) # Sum + + result = list(reducer([1, 2, 3, 4, 5])) + assert result == [55] # 1 + 4 + 9 + 16 + 25 = 55 + + def test_filtering_and_transformation_pattern(self): + """Test filtering and transformation pattern.""" + # Process only even numbers, double them, then sum + even_numbers = [] + transformer = ( + Transformer.init(int).filter(lambda x: x % 2 == 0).tap(lambda x: even_numbers.append(x)).map(lambda x: x * 2) + ) + + result = Pipeline(range(1, 11)).apply(transformer).to_list() + + assert even_numbers == [2, 4, 6, 8, 10] + assert result == [4, 8, 12, 16, 20] + + def test_data_validation_pattern(self): + """Test data validation pattern.""" + # Validate and clean data + raw_data = [1, "2", 3.0, "invalid", 5, None, 7] + + valid_numbers = [] + + def validate_and_convert(x): + try: + num = float(x) if x is not None else None + if num is not None and not str(x).lower() == "invalid": + valid_numbers.append(num) + return int(num) + return None + except (ValueError, TypeError): + return None + + result = ( + Pipeline(raw_data) + .transform(lambda t: t.map(validate_and_convert)) + .transform(lambda t: t.filter(lambda x: x is not None)) + .to_list() + ) + + assert result == [1, 2, 3, 5, 7] + assert valid_numbers == [1.0, 2.0, 3.0, 5.0, 7.0] + + +class TestPipelinePerformanceIntegration: + """Test performance aspects of pipeline and transformer integration.""" + + def test_large_dataset_chunked_processing(self): + """Test chunked processing with large datasets.""" + # Process 10,000 numbers with small chunk size + transformer = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x % 1000 == 0) + + large_data = range(10000) + result = Pipeline(large_data).apply(transformer).to_list() + + # Should get multiples of 500 doubled (0, 1000, 2000, ..., 18000) + expected = [x * 2 for x in range(0, 10000, 500)] + assert result == expected + + def test_memory_efficient_processing(self): + """Test memory-efficient lazy processing.""" + # Process large dataset but only take first few results + transformer = Transformer.init(int).map(lambda x: x**2).filter(lambda 
x: x > 100) + + large_data = range(1000) + result = Pipeline(large_data).apply(transformer).first(5) + + # First 5 squares > 100: 121, 144, 169, 196, 225 + assert result == [121, 144, 169, 196, 225] + + def test_streaming_processing(self): + """Test streaming-like processing pattern.""" + # Simulate processing data as it arrives + batches = [[1, 2], [3, 4], [5, 6]] + results = [] + + for batch in batches: + batch_result = Pipeline(batch).transform(lambda t: t.map(lambda x: x * 3)).to_list() + results.extend(batch_result) + + assert results == [3, 6, 9, 12, 15, 18] diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 261d971..6de9a51 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,722 +1,242 @@ -""" -Test suite for the Pipeline class. +"""Tests for the Pipeline class.""" -This module contains comprehensive tests for the Pipeline class functionality -including all methods, edge cases, and error conditions. -""" - -import pytest - -from efemel.pipeline import Pipeline +from efemel.pipeline.pipeline import Pipeline +from efemel.pipeline.transformers.transformer import Transformer class TestPipelineBasics: - """Test basic Pipeline functionality.""" - - def test_pipeline_initialization(self): - """Test Pipeline initialization with various iterables.""" - # Test with list - pipeline = Pipeline([1, 2, 3, 4, 5]) - assert isinstance(pipeline, Pipeline) - # Generator should be a generator object, not the original list - from types import GeneratorType - - assert isinstance(pipeline.generator, GeneratorType) - - # Test with tuple - pipeline = Pipeline((1, 2, 3)) - assert list(pipeline) == [1, 2, 3] + """Test basic pipeline functionality.""" - # Test with generator - def gen(): - yield from range(1, 4) - - pipeline = Pipeline(gen()) - assert list(pipeline) == [1, 2, 3] - - # Test with empty list - pipeline = Pipeline([]) - assert list(pipeline) == [] - - def test_pipeline_iteration(self): - """Test Pipeline iteration behavior.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - - # Test iteration - result = [] - for item in pipeline: - result.append(item) - - assert result == [1, 2, 3, 4, 5] - - def test_to_list(self): - """Test Pipeline.to_list() method.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) + def test_pipeline_creation_from_single_iterable(self): + """Test creating pipeline from single iterable.""" + pipeline = Pipeline([1, 2, 3]) result = pipeline.to_list() + assert result == [1, 2, 3] + def test_pipeline_creation_from_multiple_iterables(self): + """Test creating pipeline from multiple iterables.""" + pipeline = Pipeline([1, 2], [3, 4], [5]) + result = pipeline.to_list() assert result == [1, 2, 3, 4, 5] - assert isinstance(result, list) - - # Test with empty pipeline - empty_pipeline = Pipeline([]) - assert empty_pipeline.to_list() == [] - - def test_first(self): - """Test Pipeline.first() method.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - assert pipeline.first() == 1 - - # Test with string - str_pipeline = Pipeline(["hello", "world"]) - assert str_pipeline.first() == "hello" - - def test_first_empty_pipeline(self): - """Test Pipeline.first() with empty pipeline.""" - empty_pipeline = Pipeline([]) - with pytest.raises(StopIteration): - empty_pipeline.first() - - -class TestPipelineFiltering: - """Test Pipeline filtering functionality.""" - - def test_filter_basic(self): - """Test basic filtering operations.""" - # Filter even numbers - pipeline1 = Pipeline([1, 2, 3, 4, 5]) - even_pipeline = pipeline1.filter(lambda x: x % 2 == 0) - assert even_pipeline.to_list() == 
[2, 4] - - # Filter numbers greater than 3 - use fresh pipeline - pipeline2 = Pipeline([1, 2, 3, 4, 5]) - gt3_pipeline = pipeline2.filter(lambda x: x > 3) - assert gt3_pipeline.to_list() == [4, 5] - - def test_filter_with_strings(self): - """Test filtering with string data.""" - # Filter strings longer than 4 characters - pipeline1 = Pipeline(["hello", "world", "python", "test"]) - long_strings = pipeline1.filter(lambda s: len(s) > 4) - assert long_strings.to_list() == ["hello", "world", "python"] - - # Filter strings starting with 'p' - use fresh pipeline - pipeline2 = Pipeline(["hello", "world", "python", "test"]) - p_strings = pipeline2.filter(lambda s: s.startswith("p")) - assert p_strings.to_list() == ["python"] - - def test_filter_empty_result(self): - """Test filter that results in empty pipeline.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - - # Filter that matches nothing - empty_result = pipeline.filter(lambda x: x > 10) - assert empty_result.to_list() == [] - - def test_filter_all_pass(self): - """Test filter where all items pass.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - - # Filter that passes everything - all_pass = pipeline.filter(lambda x: True) - assert all_pass.to_list() == [1, 2, 3, 4, 5] - - def test_filter_chaining(self): - """Test chaining multiple filters.""" - pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) - - # Chain filters: even numbers and greater than 5 - result = pipeline.filter(lambda x: x % 2 == 0).filter(lambda x: x > 5) - assert result.to_list() == [6, 8, 10] - - -class TestPipelineMapping: - """Test Pipeline mapping functionality.""" - - def test_map_basic(self): - """Test basic mapping operations.""" - # Double each number - pipeline1 = Pipeline([1, 2, 3, 4, 5]) - doubled = pipeline1.map(lambda x: x * 2) - assert doubled.to_list() == [2, 4, 6, 8, 10] - - # Square each number - use fresh pipeline - pipeline2 = Pipeline([1, 2, 3, 4, 5]) - squared = pipeline2.map(lambda x: x**2) - assert squared.to_list() == [1, 4, 9, 16, 25] - - def test_map_type_transformation(self): - """Test mapping with type transformation.""" - # Convert numbers to strings - pipeline1 = Pipeline([1, 2, 3, 4, 5]) - str_pipeline = pipeline1.map(str) - assert str_pipeline.to_list() == ["1", "2", "3", "4", "5"] - - # Convert to boolean (non-zero is True) - use fresh pipeline - pipeline2 = Pipeline([1, 2, 3, 4, 5]) - bool_pipeline = pipeline2.map(bool) - assert bool_pipeline.to_list() == [True, True, True, True, True] - - def test_map_with_strings(self): - """Test mapping with string data.""" - # Convert to uppercase - pipeline1 = Pipeline(["hello", "world", "python"]) - upper_pipeline = pipeline1.map(str.upper) - assert upper_pipeline.to_list() == ["HELLO", "WORLD", "PYTHON"] - - # Get string lengths - use fresh pipeline - pipeline2 = Pipeline(["hello", "world", "python"]) - len_pipeline = pipeline2.map(len) - assert len_pipeline.to_list() == [5, 5, 6] - - def test_map_chaining(self): - """Test chaining multiple map operations.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - - # Chain maps: double, then add 1 - result = pipeline.map(lambda x: x * 2).map(lambda x: x + 1) - assert result.to_list() == [3, 5, 7, 9, 11] - - def test_map_empty_pipeline(self): - """Test mapping on empty pipeline.""" - empty_pipeline = Pipeline([]) - - # Map should return empty result - result = empty_pipeline.map(lambda x: x * 2) - assert result.to_list() == [] - -class TestPipelineMapFilter: - """Test Pipeline map and filter combinations.""" - - def test_map_then_filter(self): - """Test mapping then 
filtering.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - - # Double numbers, then filter even results - result = pipeline.map(lambda x: x * 2).filter(lambda x: x % 2 == 0) - assert result.to_list() == [2, 4, 6, 8, 10] - - def test_filter_then_map(self): - """Test filtering then mapping.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - - # Filter even numbers, then double them - result = pipeline.filter(lambda x: x % 2 == 0).map(lambda x: x * 2) - assert result.to_list() == [4, 8] - - def test_complex_map_filter_chain(self): - """Test complex chaining of map and filter operations.""" - pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) - - # Complex chain: filter odd, multiply by 3, filter > 10 - result = ( - pipeline.filter(lambda x: x % 2 == 1) # [1, 3, 5, 7, 9] - .map(lambda x: x * 3) # [3, 9, 15, 21, 27] - .filter(lambda x: x > 10) - ) # [15, 21, 27] - - assert result.to_list() == [15, 21, 27] + def test_pipeline_iteration(self): + """Test that pipeline can be iterated over.""" + pipeline = Pipeline([1, 2, 3]) + result = list(pipeline) + assert result == [1, 2, 3] + def test_pipeline_to_list_multiple_calls(self): + """Test that to_list can be called multiple times (iterator consumption).""" + pipeline = Pipeline([1, 2, 3]) + first_result = pipeline.to_list() + second_result = pipeline.to_list() + # Note: This tests current behavior - iterator is consumed after first call + assert first_result == [1, 2, 3] + assert second_result == [] # Iterator is consumed -class TestPipelineReduce: - """Test Pipeline reduce functionality.""" - def test_reduce_basic(self): - """Test basic reduce operations.""" - # Sum all numbers - pipeline1 = Pipeline([1, 2, 3, 4, 5]) - sum_result = pipeline1.reduce(lambda acc, x: acc + x, 0) - assert sum_result.first() == 15 +class TestPipelineApply: + """Test apply method with transformers.""" - # Multiply all numbers - use fresh pipeline - pipeline2 = Pipeline([1, 2, 3, 4, 5]) - product_result = pipeline2.reduce(lambda acc, x: acc * x, 1) - assert product_result.first() == 120 + def test_apply_transformer(self): + """Test apply with chained transformer operations.""" + transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 4) + pipeline = Pipeline([1, 2, 3, 4]).apply(transformer) + result = pipeline.to_list() + assert result == [6, 8] - def test_reduce_with_strings(self): - """Test reduce with string data.""" - # Concatenate strings - pipeline1 = Pipeline(["hello", "world", "python"]) - concat_result = pipeline1.reduce(lambda acc, x: acc + " " + x, "") - assert concat_result.first() == " hello world python" + def test_apply_callable_function(self): + """Test apply with a callable function.""" - # Join with commas - use fresh pipeline - pipeline2 = Pipeline(["hello", "world", "python"]) - join_result = pipeline2.reduce(lambda acc, x: acc + "," + x if acc else x, "") - assert join_result.first() == "hello,world,python" + def double_generator(data): + for item in data: + yield item * 2 - def test_reduce_empty_pipeline(self): - """Test reduce on empty pipeline.""" - empty_pipeline = Pipeline([]) + pipeline = Pipeline([1, 2, 3]).apply(double_generator) + result = pipeline.to_list() + assert result == [2, 4, 6] - # Reduce should return initial value - result = empty_pipeline.reduce(lambda acc, x: acc + x, 10) - assert result.first() == 10 - def test_reduce_single_item(self): - """Test reduce with single item.""" - single_pipeline = Pipeline([42]) +class TestPipelineTransform: + """Test transform shorthand method.""" - # Should combine initial value with 
single item - result = single_pipeline.reduce(lambda acc, x: acc + x, 10) - assert result.first() == 52 + def test_transform_chain_operations(self): + """Test transform with chained operations.""" + pipeline = Pipeline([1, 2, 3, 4]).transform(lambda t: t.map(lambda x: x * 2).filter(lambda x: x > 4)) + result = pipeline.to_list() + assert result == [6, 8] + def test_transform_with_custom_transformer(self): + """Test transform with custom transformer class.""" -class TestPipelineTap: - """Test Pipeline tap functionality.""" + def custom_transform(transformer: Transformer[int, int]) -> Transformer[int, int]: + return transformer.map(lambda x: x + 10).filter(lambda x: x > 12) - def test_tap_basic(self): - """Test basic tap operations.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - side_effects = [] + pipeline = Pipeline([1, 2, 3]).transform(custom_transform) + result = pipeline.to_list() + assert result == [13] # [11, 12, 13] filtered to [13] - # Tap to collect side effects - result = pipeline.tap(side_effects.append) - result_list = result.to_list() - assert result_list == [1, 2, 3, 4, 5] - assert side_effects == [1, 2, 3, 4, 5] +class TestPipelineTerminalOperations: + """Test terminal operations that consume the pipeline.""" - def test_tap_with_print(self): - """Test tap with print (no assertion needed, just verify it works).""" + def test_each_applies_function_to_all_elements(self): + """Test each applies function to all elements.""" + results = [] pipeline = Pipeline([1, 2, 3]) + pipeline.each(lambda x: results.append(x * 2)) + assert results == [2, 4, 6] - # This should not raise any exceptions - result = pipeline.tap(lambda x: None) # Mock print - assert result.to_list() == [1, 2, 3] - - def test_tap_chaining(self): - """Test tap in a chain of operations.""" + def test_first_gets_first_n_elements(self): + """Test first gets first n elements.""" pipeline = Pipeline([1, 2, 3, 4, 5]) - side_effects = [] - - # Tap in middle of chain - result = pipeline.map(lambda x: x * 2).tap(side_effects.append).filter(lambda x: x > 5) + result = pipeline.first(3) + assert result == [1, 2, 3] - result_list = result.to_list() - - assert result_list == [6, 8, 10] - assert side_effects == [2, 4, 6, 8, 10] - - def test_tap_doesnt_modify_pipeline(self): - """Test that tap doesn't modify the pipeline data.""" + def test_first_default_gets_one_element(self): + """Test first with default argument gets one element.""" pipeline = Pipeline([1, 2, 3, 4, 5]) + result = pipeline.first() + assert result == [1] - # Tap with function that would modify if it could - result = pipeline.tap(lambda x: x * 1000) - - # Data should be unchanged - assert result.to_list() == [1, 2, 3, 4, 5] - - -class TestPipelineEach: - """Test Pipeline each functionality.""" + def test_first_with_small_dataset(self): + """Test first when requesting more elements than available.""" + pipeline = Pipeline([1, 2]) + result = pipeline.first(5) + assert result == [1, 2] - def test_each_basic(self): - """Test basic each operations.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) + def test_consume_processes_without_return(self): + """Test consume processes all elements without returning anything.""" side_effects = [] + transformer = Transformer.init(int).tap(lambda x: side_effects.append(x)) + pipeline = Pipeline([1, 2, 3]).apply(transformer) - # Each should execute function for each item - pipeline.each(side_effects.append) - - assert side_effects == [1, 2, 3, 4, 5] - - def test_each_returns_none(self): - """Test that each returns None.""" - pipeline = 
-
-        result = pipeline.each(lambda x: None)
+        result = pipeline.consume()
         assert result is None
+        assert side_effects == [1, 2, 3]
-    def test_each_with_empty_pipeline(self):
-        """Test each with empty pipeline."""
-        empty_pipeline = Pipeline([])
-        side_effects = []
-
-        # Should not execute function
-        empty_pipeline.each(side_effects.append)
-
-        assert side_effects == []
-
-    def test_each_terminal_operation(self):
-        """Test that each is a terminal operation."""
-        pipeline = Pipeline([1, 2, 3, 4, 5])
-        side_effects = []
-
-        # After each, pipeline should be consumed
-        pipeline.each(side_effects.append)
-
-        assert side_effects == [1, 2, 3, 4, 5]
-
-
-class TestPipelineUtility:
-    """Test Pipeline utility methods."""
-
-    def test_passthrough(self):
-        """Test passthrough method."""
-        pipeline = Pipeline([1, 2, 3, 4, 5])
-
-        # Passthrough should return the same pipeline
-        result = pipeline.passthrough()
-        assert result is pipeline
-
-        # Data should be unchanged
-        assert result.to_list() == [1, 2, 3, 4, 5]
-
-    def test_apply_function(self):
-        """Test apply with single function."""
+    def test_to_list_returns_all_elements(self):
+        """Test to_list returns all processed elements."""
         pipeline = Pipeline([1, 2, 3, 4, 5])
+        result = pipeline.to_list()
+        assert result == [1, 2, 3, 4, 5]
-
-        def double_pipeline(p):
-            return p.map(lambda x: x * 2)
-
-        result = pipeline.apply(double_pipeline)
-        assert result.to_list() == [2, 4, 6, 8, 10]
-
-    def test_flatten_basic(self):
-        """Test basic flatten operation."""
-        pipeline = Pipeline([[1, 2], [3, 4], [5]])
-
-        result = pipeline.flatten()
-        assert result.to_list() == [1, 2, 3, 4, 5]
-
-    def test_flatten_empty_lists(self):
-        """Test flatten with empty lists."""
-        pipeline = Pipeline([[], [1, 2], [], [3]])
-
-        result = pipeline.flatten()
-        assert result.to_list() == [1, 2, 3]
-
-    def test_flatten_nested_tuples(self):
-        """Test flatten with nested tuples."""
-        pipeline = Pipeline([(1, 2), (3, 4, 5), (6,)])
-
-        result = pipeline.flatten()
-        assert result.to_list() == [1, 2, 3, 4, 5, 6]
-
-    def test_flatten_chunked_processing(self):
-        """Test that flatten maintains chunked processing and doesn't consume entire pipeline."""
-
-        # Create a large dataset that would cause OOM if consumed all at once
-        def generate_nested_data():
-            for i in range(1000):
-                yield [i * 2, i * 2 + 1]
-
-        # Use small chunk size to verify chunked processing
-        pipeline = Pipeline(generate_nested_data(), chunk_size=10)
-        result = pipeline.flatten()
-
-        # Verify first few elements without consuming the entire pipeline
-        first_10 = []
-        count = 0
-        for item in result:
-            first_10.append(item)
-            count += 1
-            if count >= 10:
-                break
-
-        assert first_10 == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
-
-    def test_flatten_preserves_chunk_structure(self):
-        """Test that flatten preserves chunked structure."""
-        # Create pipeline with known chunk structure
-        data = [[1, 2], [3, 4], [5, 6], [7, 8]]
-        pipeline = Pipeline(data, chunk_size=2)  # 2 sublists per chunk
-
-        result = pipeline.flatten()
-
-        # Verify the result is correct
-        assert result.to_list() == [1, 2, 3, 4, 5, 6, 7, 8]
-
-    def test_flatten_maintains_chunk_size(self):
-        """Test that flatten maintains the original chunk_size setting."""
-        # Create test data with varying sizes of sublists
-        data = [[1, 2], [3, 4, 5], [6], [7, 8, 9, 10]]
-        chunk_size = 3
-        pipeline = Pipeline(data, chunk_size=chunk_size)
-
-        # Flatten the pipeline
-        flattened = pipeline.flatten()
-
-        # Verify chunk_size is preserved
-        assert flattened.chunk_size == chunk_size
-
-        # Collect chunks to verify structure
-        chunks = []
-        for chunk in flattened.generator:
-            chunks.append(chunk)
-        # Verify all chunks except the last have the expected size
-        for i, chunk in enumerate(chunks[:-1]):
-            assert len(chunk) == chunk_size, f"Chunk {i} has size {len(chunk)}, expected {chunk_size}"
+class TestPipelineChaining:
+    """Test chaining pipeline operations."""
-        # Last chunk can be smaller than chunk_size
-        if chunks:
-            assert len(chunks[-1]) <= chunk_size, f"Last chunk has size {len(chunks[-1])}, should be <= {chunk_size}"
+    def test_apply_then_terminal_operation(self):
+        """Test applying transformer then using terminal operation."""
+        transformer = Transformer.init(int).map(lambda x: x * 2)
+        pipeline = Pipeline([1, 2, 3]).apply(transformer)
+        result = pipeline.first(2)
+        assert result == [2, 4]
-        # Verify the final result is correct
-        result = []
-        for chunk in chunks:
-            result.extend(chunk)
-        assert result == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+    def test_multiple_transforms(self):
+        """Test applying multiple transforms."""
+        pipeline = (
+            Pipeline([1, 2, 3, 4]).transform(lambda t: t.map(lambda x: x * 2)).transform(lambda t: t.filter(lambda x: x > 4))
+        )
+        result = pipeline.to_list()
+        assert result == [6, 8]

+    def test_transform_then_apply(self):
+        """Test transform followed by apply."""
+        transformer = Transformer.init(int).filter(lambda x: x > 4)
+        pipeline = Pipeline([1, 2, 3, 4, 5]).transform(lambda t: t.map(lambda x: x * 2)).apply(transformer)
+        result = pipeline.to_list()
+        assert result == [6, 8, 10]

-class TestPipelineEdgeCases:
-    """Test Pipeline edge cases and error conditions."""
-    def test_pipeline_with_none_values(self):
-        """Test pipeline with None values."""
-        # Should handle None values properly
-        pipeline1 = Pipeline([1, None, 3, None, 5])
-        result = pipeline1.to_list()
-        assert result == [1, None, 3, None, 5]
+class TestPipelineDataTypes:
+    """Test pipeline with different data types."""
-        # Filter out None values - use fresh pipeline
-        pipeline2 = Pipeline([1, None, 3, None, 5])
-        no_none = pipeline2.filter(lambda x: x is not None)
-        assert no_none.to_list() == [1, 3, 5]
+    def test_pipeline_with_strings(self):
+        """Test pipeline with string data."""
+        pipeline = Pipeline(["hello", "world"]).transform(lambda t: t.map(lambda x: x.upper()))
+        result = pipeline.to_list()
+        assert result == ["HELLO", "WORLD"]
-    def test_pipeline_with_mixed_types(self):
+    def test_pipeline_with_mixed_data(self):
         """Test pipeline with mixed data types."""
-        # Should handle mixed types
-        pipeline1 = Pipeline([1, "hello", 3.14, True, None])
-        result = pipeline1.to_list()
-        assert result == [1, "hello", 3.14, True, None]
-
-        # Filter by type (excluding boolean which is a subclass of int) - use fresh pipeline
-        pipeline2 = Pipeline([1, "hello", 3.14, True, None])
-        numbers = pipeline2.filter(lambda x: isinstance(x, int | float) and not isinstance(x, bool) and x is not None)
-        assert numbers.to_list() == [1, 3.14]
-
-    def test_pipeline_reuse(self):
-        """Test that pipelines can be reused."""
-        original_data = [1, 2, 3, 4, 5]
-        pipeline = Pipeline(original_data)
-
-        # First use
-        result1 = pipeline.map(lambda x: x * 2).to_list()
-        assert result1 == [2, 4, 6, 8, 10]
-
-        # Create new pipeline from same data
-        pipeline2 = Pipeline(original_data)
-        result2 = pipeline2.filter(lambda x: x % 2 == 0).to_list()
-        assert result2 == [2, 4]
-
-    def test_pipeline_method_chaining_returns_new_instances(self):
-        """Test that pipeline methods return new instances."""
-        original = Pipeline([1, 2, 3, 4, 5])
-
-        # Methods should return new instances (except passthrough)
-        mapped = original.map(lambda x: x * 2)
-        filtered = original.filter(lambda x: x % 2 == 0)
-
-        assert mapped is not original
-        assert filtered is not original
-        assert mapped is not filtered
-
-    def test_pipeline_with_generator_exhaustion(self):
-        """Test pipeline behavior with generator exhaustion."""
-
-        def number_generator():
-            yield from range(3)
-
-        pipeline = Pipeline(number_generator())
-
-        # First consumption
-        result1 = pipeline.to_list()
-        assert result1 == [0, 1, 2]
-
-        # Generator should be exhausted now
-        # This behavior depends on the implementation
-
-
-class TestPipelineIntegration:
-    """Integration tests for Pipeline class."""
-
-    def test_data_processing_pipeline(self):
-        """Test a realistic data processing pipeline."""
-        # Simulate processing a list of user data
-        users = [
-            {"name": "Alice", "age": 30, "active": True},
-            {"name": "Bob", "age": 25, "active": False},
-            {"name": "Charlie", "age": 35, "active": True},
-            {"name": "Diana", "age": 28, "active": True},
-            {"name": "Eve", "age": 22, "active": False},
-        ]
-
-        pipeline = Pipeline(users)
-
-        # Process: filter active users, extract names, convert to uppercase
-        result = pipeline.filter(lambda user: user["active"]).map(lambda user: user["name"]).map(str.upper)
-
-        assert result.to_list() == ["ALICE", "CHARLIE", "DIANA"]
-
-    def test_number_processing_pipeline(self):
-        """Test a number processing pipeline."""
-        numbers = range(1, 21)  # 1 to 20
-
-        pipeline = Pipeline(numbers)
-
-        # Process: filter even numbers, square them, filter > 50, sum
-        result = (
-            pipeline.filter(lambda x: x % 2 == 0)  # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
-            .map(lambda x: x**2)  # [4, 16, 36, 64, 100, 144, 196, 256, 324, 400]
-            .filter(lambda x: x > 50)  # [64, 100, 144, 196, 256, 324, 400]
-            .reduce(lambda acc, x: acc + x, 0)
-        )  # 1484
-
-        assert result.first() == 1484
-
-    def test_text_processing_pipeline(self):
-        """Test a text processing pipeline."""
-        text = "Hello world! This is a test. Python is amazing."
-        words = text.split()
-
-        pipeline = Pipeline(words)
-
-        # Process: filter words > 3 chars, remove punctuation, lowercase, get unique
-        result = (
-            pipeline.filter(lambda word: len(word) > 3)
-            .map(lambda word: word.strip(".,!"))
-            .map(str.lower)
-            .filter(lambda word: word not in ["this"])
-        )  # Simple "unique" filter
-
-        expected = ["hello", "world", "test", "python", "amazing"]
-        assert result.to_list() == expected
-
-    def test_nested_data_processing(self):
-        """Test processing nested data structures."""
-        data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
-
-        pipeline = Pipeline(data)
-
-        # Flatten, filter odd numbers, square them
-        result = (
-            pipeline.flatten()  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
-            .filter(lambda x: x % 2 == 1)  # [1, 3, 5, 7, 9]
-            .map(lambda x: x**2)
-        )  # [1, 9, 25, 49, 81]
-
-        assert result.to_list() == [1, 9, 25, 49, 81]
-
-
-class TestPipelineComposition:
-    """Test Pipeline composition functionality."""
-
-    def test_pipeline_from_pipeline_init(self):
-        """Test creating a Pipeline from another Pipeline using __init__."""
-        # Create source pipeline
-        source_pipeline = Pipeline([1, 2, 3, 4, 5])
-
-        # Create new pipeline from the source
-        new_pipeline = Pipeline(source_pipeline)
-
-        # Test that new pipeline works (source gets exhausted, so test new one)
-        result = new_pipeline.to_list()
-        assert result == [1, 2, 3, 4, 5]
-
-    def test_from_pipeline_class_method(self):
-        """Test creating a Pipeline using from_pipeline class method."""
-        # Create source pipeline
-        source_pipeline = Pipeline([1, 2, 3, 4, 5])
-
-        # Create new pipeline using class method
-        new_pipeline = Pipeline.from_pipeline(source_pipeline)
-
-        # Test that they produce the same results
-        assert new_pipeline.to_list() == [1, 2, 3, 4, 5]
+        pipeline = Pipeline([1, "hello", 3.14]).transform(lambda t: t.map(lambda x: str(x)))
+        result = pipeline.to_list()
+        assert result == ["1", "hello", "3.14"]
-    def test_chain_method(self):
-        """Test chaining multiple pipelines."""
-        pipeline1 = Pipeline([1, 2, 3])
-        pipeline2 = Pipeline([4, 5, 6])
-        pipeline3 = Pipeline([7, 8, 9])
+    def test_pipeline_with_complex_objects(self):
+        """Test pipeline with complex objects."""
+        data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
+        pipeline = Pipeline(data).transform(lambda t: t.map(lambda x: x["name"]))
+        result = pipeline.to_list()
+        assert result == ["Alice", "Bob"]
-
-        # Chain them together
-        chained = Pipeline.chain(pipeline1, pipeline2, pipeline3)
-
-        # Should contain all elements in sequence
-        assert chained.to_list() == [1, 2, 3, 4, 5, 6, 7, 8, 9]
+class TestPipelineEdgeCases:
+    """Test edge cases for pipeline."""
-    def test_chain_empty_pipelines(self):
-        """Test chaining with empty pipelines."""
-        pipeline1 = Pipeline([1, 2])
-        empty_pipeline = Pipeline([])
-        pipeline2 = Pipeline([3, 4])
+    def test_empty_pipeline(self):
+        """Test pipeline with empty data."""
+        pipeline = Pipeline([])
+        result = pipeline.to_list()
+        assert result == []
+
+    def test_empty_pipeline_terminal_operations(self):
+        """Test terminal operations on empty pipeline."""
+        # Test each
+        results = []
+        pipeline_each = Pipeline([])
+        pipeline_each.each(lambda x: results.append(x))
+        assert results == []
+
+        # Test first
+        pipeline_first = Pipeline([])
+        result = pipeline_first.first(5)
+        assert result == []
+
+        # Test consume
+        pipeline_consume = Pipeline([])
+        result = pipeline_consume.consume()
+        assert result is None
-        chained = Pipeline.chain(pipeline1, empty_pipeline, pipeline2)
-        assert chained.to_list() == [1, 2, 3, 4]
+    def test_single_element_pipeline(self):
+        """Test pipeline with single element."""
+        pipeline = Pipeline([42])
+        result = pipeline.to_list()
+        assert result == [42]
-    def test_chain_single_pipeline(self):
-        """Test chaining with a single pipeline."""
+    def test_pipeline_type_preservation(self):
+        """Test that pipeline preserves and transforms types correctly."""
+        # Start with integers
         pipeline = Pipeline([1, 2, 3])
-        chained = Pipeline.chain(pipeline)
-
-        assert chained.to_list() == [1, 2, 3]
-
-    def test_pipeline_composition_with_operations(self):
-        """Test that pipeline composition works with transformations."""
-        # Create source pipeline with transformations
-        source = Pipeline([1, 2, 3, 4, 5]).map(lambda x: x * 2)
-
-        # Create new pipeline from transformed source
-        new_pipeline = Pipeline.from_pipeline(source)
-        filtered = new_pipeline.filter(lambda x: x > 5)
-
-        assert filtered.to_list() == [6, 8, 10]
-
-    def test_chain_with_different_types(self):
-        """Test chaining pipelines with different but compatible types."""
-        numbers = Pipeline([1, 2, 3])
-        strings = Pipeline(["4", "5", "6"])
-
-        # Chain and then transform to make types consistent
-        chained = Pipeline.chain(numbers, strings)
-        all_strings = chained.map(str)
-
-        assert all_strings.to_list() == ["1", "2", "3", "4", "5", "6"]
+        int_result = pipeline.to_list()
+        assert all(isinstance(x, int) for x in int_result)
-    def test_complex_pipeline_composition(self):
-        """Test complex pipeline composition scenario."""
-        # Create multiple source pipelines
-        evens = Pipeline([2, 4, 6, 8])
-        odds = Pipeline([1, 3, 5, 7])
+        # Transform to strings
+        pipeline = Pipeline([1, 2, 3]).transform(lambda t: t.map(lambda x: str(x)))
+        str_result = pipeline.to_list()
+        assert all(isinstance(x, str) for x in str_result)
+        assert str_result == ["1", "2", "3"]
-        # Chain them
-        all_numbers = Pipeline.chain(evens, odds)
-        # Apply transformations
-        result = all_numbers.filter(lambda x: x > 3).map(lambda x: x**2).reduce(lambda acc, x: acc + x, 0)
+class TestPipelinePerformance:
+    """Test pipeline performance characteristics."""
-        # Expected: [4, 6, 8, 5, 7] -> [16, 36, 64, 25, 49] -> 190
-        assert result.first() == 190
-
-
-class TestPipelineConcurrency:
-    """Test Pipeline concurrency functionality."""
-
-    def test_concurrent(self):
-        # Create a simple pipeline
-
-        def _double_pipeline(p: Pipeline[int]):
-            return p.map(lambda x: x * 2)
-
-        result = Pipeline(range(0, 5), 1).concurrent(_double_pipeline, 5).to_list()
-
-        # Check if all numbers are doubled
-        assert result == [0, 2, 4, 6, 8]
-
-    def test_concurrent_consumer(self):
-        # Create a simple pipeline
-
-        items = []
+    def test_large_dataset_processing(self):
+        """Test pipeline can handle large datasets."""
+        large_data = list(range(10000))
+        pipeline = Pipeline(large_data).transform(lambda t: t.map(lambda x: x * 2).filter(lambda x: x % 100 == 0))
+        result = pipeline.to_list()
-        def dummy_consumer(p: Pipeline[int]):
-            return p.tap(lambda x: items.append(x))
+        # Should have every 50th element doubled (0, 100, 200, ..., 19900)
+        expected = [x * 2 for x in range(0, 10000, 50)]
+        assert result == expected
-        Pipeline(range(0, 5), 1).concurrent(dummy_consumer, 5).noop()
+    def test_chunked_processing(self):
+        """Test that chunked processing works correctly."""
+        # Use small chunk size to test chunking behavior
+        transformer = Transformer.init(int, chunk_size=10).map(lambda x: x + 1)
+        pipeline = Pipeline(list(range(100))).apply(transformer)
+        result = pipeline.to_list()
-        # Check if all numbers are consumed
-        assert items == [0, 1, 2, 3, 4]
+        expected = list(range(1, 101))  # [1, 2, 3, ..., 100]
+        assert result == expected
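Between the removed and the added tests above, the shape of the reworked Pipeline API is visible: transformations arrive either as a prebuilt Transformer (or any callable over the element iterable) handed to apply, or as a builder function handed to transform; first(n) now returns a list of up to n elements, and consume drains the pipeline and returns None. A minimal sketch of those behaviors, using only calls asserted in the tests above (Pipeline and Transformer are imported from their post-refactor locations, which this diff does not show):

    from efemel.pipeline.transformers.transformer import Transformer

    # The two composition styles exercised by TestPipelineApply / TestPipelineTransform:
    via_apply = Pipeline([1, 2, 3]).apply(Transformer.init(int).map(lambda x: x * 2))
    via_transform = Pipeline([1, 2, 3]).transform(lambda t: t.map(lambda x: x * 2))
    assert via_apply.to_list() == [2, 4, 6]
    assert via_transform.to_list() == [2, 4, 6]

    # Terminal operations exercised by TestPipelineTerminalOperations:
    assert Pipeline([1, 2, 3, 4, 5]).first(2) == [1, 2]  # a list, not a scalar
    assert Pipeline([1, 2, 3]).consume() is None         # drains, returns None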
diff --git a/tests/test_transformer.py b/tests/test_transformer.py
new file mode 100644
index 0000000..b01109a
--- /dev/null
+++ b/tests/test_transformer.py
@@ -0,0 +1,211 @@
+"""Tests for the Transformer class."""
+
+from efemel.pipeline.transformers.transformer import PipelineContext
+from efemel.pipeline.transformers.transformer import Transformer
+
+
+class TestTransformerBasics:
+    """Test basic transformer functionality."""
+
+    def test_init_creates_identity_transformer(self):
+        """Test that init creates an identity transformer."""
+        transformer = Transformer.init(int)
+        result = list(transformer([1, 2, 3]))
+        assert result == [1, 2, 3]
+
+    def test_init_with_chunk_size(self):
+        """Test init with custom chunk size."""
+        transformer = Transformer.init(int, chunk_size=2)
+        assert transformer.chunk_size == 2
+        result = list(transformer([1, 2, 3, 4]))
+        assert result == [1, 2, 3, 4]
+
+    def test_init_with_context(self):
+        """Test init with custom context."""
+        context = PipelineContext({"key": "value"})
+        transformer = Transformer(context=context)
+        assert transformer.context == context
+
+    def test_call_executes_transformer(self):
+        """Test that calling transformer executes it on data."""
+        transformer = Transformer.init(int)
+        result = list(transformer([1, 2, 3]))
+        assert result == [1, 2, 3]
+
+
+class TestTransformerOperations:
+    """Test transformer operations like map, filter, etc."""
+
+    def test_map_transforms_elements(self):
+        """Test map transforms each element."""
+        transformer = Transformer.init(int).map(lambda x: x * 2)
+        result = list(transformer([1, 2, 3]))
+        assert result == [2, 4, 6]
+
+    def test_map_with_context_aware_function(self):
+        """Test map with context-aware function."""
+        context = PipelineContext({"multiplier": 3})
+        transformer = Transformer(context=context).map(lambda x, ctx: x * ctx["multiplier"])
+        result = list(transformer([1, 2, 3]))
+        assert result == [3, 6, 9]
+
+    def test_filter_keeps_matching_elements(self):
+        """Test filter keeps only matching elements."""
+        transformer = Transformer.init(int).filter(lambda x: x % 2 == 0)
+        result = list(transformer([1, 2, 3, 4, 5, 6]))
+        assert result == [2, 4, 6]
+
+    def test_filter_with_context_aware_function(self):
+        """Test filter with context-aware function."""
+        context = PipelineContext({"threshold": 3})
+        transformer = Transformer(context=context).filter(lambda x, ctx: x > ctx["threshold"])
+        result = list(transformer([1, 2, 3, 4, 5]))
+        assert result == [4, 5]
+
+    def test_flatten_list_of_lists(self):
+        """Test flatten with list of lists."""
+        transformer = Transformer.init(list).flatten()
+        result = list(transformer([[1, 2], [3, 4], [5]]))
+        assert result == [1, 2, 3, 4, 5]
+
+    def test_flatten_list_of_tuples(self):
+        """Test flatten with list of tuples."""
+        transformer = Transformer.init(tuple).flatten()
+        result = list(transformer([(1, 2), (3, 4), (5,)]))
+        assert result == [1, 2, 3, 4, 5]
+
+    def test_flatten_list_of_sets(self):
+        """Test flatten with list of sets."""
+        transformer = Transformer.init(set).flatten()
+        result = list(transformer([{1, 2}, {3, 4}, {5}]))
+        # Sets are unordered, so we sort for comparison
+        assert sorted(result) == [1, 2, 3, 4, 5]
+
+    def test_tap_applies_side_effect_without_modification(self):
+        """Test tap applies side effect without modifying data."""
+        side_effects = []
+        transformer = Transformer.init(int).tap(lambda x: side_effects.append(x))
+        result = list(transformer([1, 2, 3]))
+
+        assert result == [1, 2, 3]  # Data unchanged
+        assert side_effects == [1, 2, 3]  # Side effect applied
+
+    def test_tap_with_context_aware_function(self):
+        """Test tap with context-aware function."""
+        side_effects = []
+        context = PipelineContext({"prefix": "item:"})
+        transformer = Transformer(context=context).tap(lambda x, ctx: side_effects.append(f"{ctx['prefix']}{x}"))
+        result = list(transformer([1, 2, 3]))
+
+        assert result == [1, 2, 3]
+        assert side_effects == ["item:1", "item:2", "item:3"]
+
+    def test_apply_composes_transformers(self):
+        """Test apply composes transformers."""
+        transformer1 = Transformer.init(int).map(lambda x: x * 2)
+        transformer2 = transformer1.apply(lambda t: t.filter(lambda x: x > 4))
+        result = list(transformer2([1, 2, 3, 4]))
+        assert result == [6, 8]  # [2, 4, 6, 8] filtered to [6, 8]
+
+
+class TestTransformerChaining:
+    """Test chaining multiple transformer operations."""
+
+    def test_map_then_filter(self):
+        """Test map followed by filter."""
+        transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 4)
+        result = list(transformer([1, 2, 3, 4]))
+        assert result == [6, 8]
+
+    def test_filter_then_map(self):
+        """Test filter followed by map."""
+        transformer = Transformer.init(int).filter(lambda x: x % 2 == 0).map(lambda x: x * 3)
+        result = list(transformer([1, 2, 3, 4, 5, 6]))
+        assert result == [6, 12, 18]
+
+    def test_map_flatten_filter(self):
+        """Test map, flatten, then filter."""
+        transformer = Transformer.init(int).map(lambda x: [x, x * 2]).flatten().filter(lambda x: x > 3)
+        result = list(transformer([1, 2, 3]))
+        assert result == [4, 6]  # [[1,2], [2,4], [3,6]] -> [1,2,2,4,3,6] -> [4,6]
+
+    def test_complex_chain_with_tap(self):
+        """Test complex chain with tap for side effects."""
+        side_effects = []
+        transformer = (
+            Transformer.init(int)
+            .map(lambda x: x * 2)
+            .tap(lambda x: side_effects.append(f"doubled: {x}"))
+            .filter(lambda x: x > 4)
+            .tap(lambda x: side_effects.append(f"filtered: {x}"))
+        )
+
+        result = list(transformer([1, 2, 3, 4]))
+        assert result == [6, 8]
+        assert side_effects == ["doubled: 2", "doubled: 4", "doubled: 6", "doubled: 8", "filtered: 6", "filtered: 8"]
+
+
+class TestTransformerTerminalOperations:
+    """Test terminal operations like reduce."""
+
+    def test_reduce_sums_elements(self):
+        """Test reduce sums all elements."""
+        transformer = Transformer.init(int)
+        reducer = transformer.reduce(lambda acc, x: acc + x, initial=0)
+        result = list(reducer([1, 2, 3, 4]))
+        assert result == [10]
+
+    def test_reduce_with_context_aware_function(self):
+        """Test reduce with context-aware function."""
+        context = PipelineContext({"multiplier": 2})
+        transformer = Transformer(context=context)
+        reducer = transformer.reduce(lambda acc, x, ctx: acc + (x * ctx["multiplier"]), initial=0)
+        result = list(reducer([1, 2, 3]))
+        assert result == [12]  # (1*2) + (2*2) + (3*2) = 2 + 4 + 6 = 12
+
+    def test_reduce_with_map_chain(self):
+        """Test reduce after map transformation."""
+        transformer = Transformer.init(int).map(lambda x: x * 2)
+        reducer = transformer.reduce(lambda acc, x: acc + x, initial=0)
+        result = list(reducer([1, 2, 3]))
+        assert result == [12]  # [2, 4, 6] summed = 12
+
+
+class TestTransformerChunking:
+    """Test chunking behavior."""
+
+    def test_chunking_with_small_chunk_size(self):
+        """Test transformer works correctly with small chunk sizes."""
+        transformer = Transformer.init(int, chunk_size=2).map(lambda x: x * 2)
+        result = list(transformer([1, 2, 3, 4, 5]))
+        assert result == [2, 4, 6, 8, 10]
+
+    def test_chunking_with_large_data(self):
+        """Test transformer works correctly with large datasets."""
+        transformer = Transformer.init(int, chunk_size=100).map(lambda x: x + 1)
+        large_data = list(range(1000))
+        result = list(transformer(large_data))
+        expected = [x + 1 for x in large_data]
+        assert result == expected
+
+
+class TestTransformerEdgeCases:
+    """Test edge cases for transformer."""
+
+    def test_empty_data(self):
+        """Test transformer with empty data."""
+        transformer = Transformer.init(int).map(lambda x: x * 2)
+        result = list(transformer([]))
+        assert result == []
+
+    def test_single_element(self):
+        """Test transformer with single element."""
+        transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 0)
+        result = list(transformer([5]))
+        assert result == [10]
+
+    def test_filter_removes_all_elements(self):
+        """Test filter that removes all elements."""
+        transformer = Transformer.init(int).filter(lambda x: x > 100)
+        result = list(transformer([1, 2, 3]))
+        assert result == []
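Two conventions pinned down by this new suite are worth noting. First, the operation callbacks appear to be arity-overloaded: map, filter, tap, and reduce accept either a plain f(x) or a context-aware f(x, ctx), where ctx is the PipelineContext the transformer was constructed with. Second, reduce is not a scalar-returning terminal: it yields a transformer whose output stream holds the single accumulated value, which is why every reduce assertion compares list(reducer(...)) against a one-element list. A minimal sketch of both conventions, composed only from calls asserted above:

    from efemel.pipeline.transformers.transformer import PipelineContext
    from efemel.pipeline.transformers.transformer import Transformer

    # Context-aware callbacks receive the transformer's PipelineContext.
    ctx = PipelineContext({"multiplier": 3, "threshold": 10})
    scaled = (
        Transformer(context=ctx)
        .map(lambda x, ctx: x * ctx["multiplier"])      # plain lambda x: ... also works
        .filter(lambda x, ctx: x > ctx["threshold"])
    )
    assert list(scaled([1, 2, 3, 4, 5])) == [12, 15]

    # reduce produces a one-element stream rather than a bare value.
    summed = Transformer.init(int).reduce(lambda acc, x: acc + x, initial=0)
    assert list(summed([1, 2, 3, 4])) == [10]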