diff --git a/efemel/pipeline/helpers.py b/efemel/pipeline/helpers.py deleted file mode 100644 index b03f933..0000000 --- a/efemel/pipeline/helpers.py +++ /dev/null @@ -1,49 +0,0 @@ -from collections.abc import Callable -import inspect -from typing import Any -from typing import TypeGuard - - -# --- Type Aliases --- -class PipelineContext(dict): - """Generic, untyped context available to all pipeline operations.""" - - pass - - -# Define the specific callables for clarity -ContextAwareCallable = Callable[[Any, PipelineContext], Any] -ContextAwareReduceCallable = Callable[[Any, Any, PipelineContext], Any] - - -def get_function_param_count(func: Callable[..., Any]) -> int: - """ - Returns the number of parameters a function accepts, excluding `self` or `cls`. - This is useful for determining if a function is context-aware. - """ - try: - sig = inspect.signature(func) - params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] - return len(params) - except (ValueError, TypeError): - return 0 - - -def is_context_aware(func: Callable[..., Any]) -> TypeGuard[ContextAwareCallable]: - """ - Checks if a function is "context-aware" by inspecting its signature. - - This function uses a TypeGuard, allowing Mypy to narrow the type of - the checked function in conditional blocks. - """ - return get_function_param_count(func) >= 2 - - -def is_context_aware_reduce(func: Callable[..., Any]) -> TypeGuard[ContextAwareReduceCallable]: - """ - Checks if a function is "context-aware" by inspecting its signature. - - This function uses a TypeGuard, allowing Mypy to narrow the type of - the checked function in conditional blocks. - """ - return get_function_param_count(func) >= 3 diff --git a/efemel/pipeline/pipeline.py b/efemel/pipeline/pipeline.py deleted file mode 100644 index 01b1a8a..0000000 --- a/efemel/pipeline/pipeline.py +++ /dev/null @@ -1,123 +0,0 @@ -from collections.abc import Callable -from collections.abc import Iterable -from collections.abc import Iterator -import itertools -from typing import Any -from typing import TypedDict -from typing import TypeVar -from typing import overload - -from efemel.pipeline.helpers import is_context_aware - -from .transformers.transformer import Transformer - -# --- Type Aliases --- -T = TypeVar("T") -PipelineFunction = Callable[[T], Any] - - -class PipelineContext(TypedDict): - """Global context available to all pipeline operations.""" - - pass - - -class Pipeline[T]: - """ - Manages a data source and applies transformers to it. - Provides terminal operations to consume the resulting data. - """ - - def __init__(self, *data: Iterable[T]): - self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0] - self.processed_data: Iterator = iter(self.data_source) - self.ctx = PipelineContext() - - def context(self, ctx: PipelineContext) -> "Pipeline[T]": - """ - Sets the context for the pipeline. - """ - self.ctx = ctx - return self - - @overload - def apply[U](self, transformer: Transformer[T, U]) -> "Pipeline[U]": ... - - @overload - def apply[U](self, transformer: Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]": ... - - @overload - def apply[U]( - self, - transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]], - ) -> "Pipeline[U]": ... 
- - def apply[U]( - self, - transformer: Transformer[T, U] - | Callable[[Iterable[T]], Iterator[U]] - | Callable[[Iterable[T], PipelineContext], Iterator[U]], - ) -> "Pipeline[U]": - """ - Applies a transformer to the current data source. - """ - - match transformer: - case Transformer(): - # If a Transformer instance is provided, use its __call__ method - self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore - case _ if callable(transformer): - # If a callable function is provided, call it with the current data and context - - if is_context_aware(transformer): - processed_transformer = transformer - else: - processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731 - - self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore - case _: - raise TypeError("Transformer must be a Transformer instance or a callable function") - - return self # type: ignore - - def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]": - """ - Shorthand method to apply a transformation using a lambda function. - Creates a Transformer under the hood and applies it to the pipeline. - - Args: - t: A callable that takes a transformer and returns a transformed transformer - - Returns: - A new Pipeline with the transformed data - """ - # Create a new transformer and apply the transformation function - transformer = t(Transformer[T, T]()) - return self.apply(transformer) - - def __iter__(self) -> Iterator[T]: - """Allows the pipeline to be iterated over.""" - yield from self.processed_data - - def to_list(self) -> list[T]: - """Executes the pipeline and returns the results as a list.""" - return list(self.processed_data) - - def each(self, function: PipelineFunction[T]) -> None: - """Applies a function to each element (terminal operation).""" - # The pipeline context is not forwarded to this terminal operation; the - # function is called with each item only. Context-aware behaviour should - # be handled inside Transformers applied earlier in the chain.
- for item in self.processed_data: - function(item) - - def first(self, n: int = 1) -> list[T]: - """Gets the first n elements of the pipeline (terminal operation).""" - assert n >= 1, "n must be at least 1" - return list(itertools.islice(self.processed_data, n)) - - def consume(self) -> None: - """Consumes the pipeline without returning results.""" - for _ in self.processed_data: - pass diff --git a/efemel/pipeline/transformers/parallel.py b/efemel/pipeline/transformers/parallel.py deleted file mode 100644 index bf616be..0000000 --- a/efemel/pipeline/transformers/parallel.py +++ /dev/null @@ -1,147 +0,0 @@ -"""Parallel transformer implementation using multiple threads.""" - -from collections import deque -from collections.abc import Iterable -from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import Future -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import wait -import copy -from functools import partial -import itertools -import threading -from typing import TypedDict - -from .transformer import DEFAULT_CHUNK_SIZE -from .transformer import InternalTransformer -from .transformer import PipelineContext -from .transformer import Transformer - - -# --- Type Definitions --- -class ParallelPipelineContextType(TypedDict): - """A specific context type for parallel transformers that includes a lock.""" - - lock: threading.Lock - - -# --- Class Definition --- -class ParallelTransformer[In, Out](Transformer[In, Out]): - """ - A transformer that executes operations concurrently using multiple threads. - """ - - def __init__( - self, - max_workers: int = 4, - ordered: bool = True, - chunk_size: int = DEFAULT_CHUNK_SIZE, - transformer: InternalTransformer[In, Out] | None = None, - ): - """ - Initialize the parallel transformer. - - Args: - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. If False, results - are yielded as they complete. - chunk_size: Size of data chunks to process. - transformer: The transformation logic chain. - """ - super().__init__(chunk_size, transformer) - self.max_workers = max_workers - self.ordered = ordered - - @classmethod - def from_transformer[T, U]( - cls, - transformer: Transformer[T, U], - chunk_size: int | None = None, - max_workers: int = 4, - ordered: bool = True, - ) -> "ParallelTransformer[T, U]": - """ - Create a ParallelTransformer from an existing Transformer's logic. - - Args: - transformer: The base transformer to copy the transformation logic from. - chunk_size: Optional chunk size override. - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. - - Returns: - A new ParallelTransformer with the same transformation logic. - """ - return cls( - chunk_size=chunk_size or transformer.chunk_size, - transformer=copy.deepcopy(transformer.transformer), # type: ignore - max_workers=max_workers, - ordered=ordered, - ) - - def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: - """ - Executes the transformer on data concurrently. - - A new `threading.Lock` is created and added to the context for each call - to ensure execution runs are isolated and thread-safe. - """ - # Determine the context for this run, passing it by reference as requested. - run_context = context or self.context - # Add a per-call lock for thread safety. 
- run_context["lock"] = threading.Lock() - - def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]: - """ - Process a single chunk by passing the chunk and context explicitly - to the transformer chain. This is safer and avoids mutating self. - """ - return self.transformer(chunk, shared_context) - - # Create a partial function with the run_context "baked in". - process_chunk_with_context = partial(process_chunk, shared_context=run_context) - - def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results in their original order.""" - futures: deque[Future[list[Out]]] = deque() - for _ in range(self.max_workers + 1): - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - break - while futures: - yield futures.popleft().result() - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - continue - - def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results as they complete.""" - futures = { - executor.submit(process_chunk_with_context, chunk) - for chunk in itertools.islice(chunks_iter, self.max_workers + 1) - } - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - try: - chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - continue - - def result_iterator_manager() -> Iterator[Out]: - """Manage the thread pool and yield flattened results.""" - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - chunks_to_process = self._chunk_generator(data) - gen_func = _ordered_generator if self.ordered else _unordered_generator - processed_chunks_iterator = gen_func(chunks_to_process, executor) - for result_chunk in processed_chunks_iterator: - yield from result_chunk - - return result_iterator_manager() diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py deleted file mode 100644 index b08d9ec..0000000 --- a/efemel/pipeline/transformers/transformer.py +++ /dev/null @@ -1,151 +0,0 @@ -from collections.abc import Callable -from collections.abc import Iterable -from collections.abc import Iterator -import copy -from functools import reduce -import itertools -from typing import Any -from typing import Self -from typing import Union -from typing import overload - -from efemel.pipeline.helpers import PipelineContext -from efemel.pipeline.helpers import is_context_aware -from efemel.pipeline.helpers import is_context_aware_reduce - -DEFAULT_CHUNK_SIZE = 1000 - - -type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] -type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] - -# The internal transformer function signature is changed to explicitly accept a context. -type InternalTransformer[In, Out] = Callable[[list[In], PipelineContext], list[Out]] - - -class Transformer[In, Out]: - """ - Defines and composes data transformations by passing context explicitly. 
- """ - - def __init__( - self, - chunk_size: int = DEFAULT_CHUNK_SIZE, - transformer: InternalTransformer[In, Out] | None = None, - ): - self.chunk_size = chunk_size - self.context: PipelineContext = PipelineContext() - # The default transformer now accepts and ignores a context argument. - self.transformer: InternalTransformer[In, Out] = transformer or (lambda chunk, ctx: chunk) # type: ignore - - @classmethod - def init[T](cls, _type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]": - """Create a new identity pipeline with an explicit type hint.""" - return cls(chunk_size=chunk_size) # type: ignore - - @classmethod - def from_transformer[T, U]( - cls, - transformer: "Transformer[T, U]", - chunk_size: int | None = None, - ) -> "Transformer[T, U]": - """Create a new transformer from an existing one, copying its logic.""" - return cls( - chunk_size=chunk_size or transformer.chunk_size, - transformer=copy.deepcopy(transformer.transformer), # type: ignore - ) - - def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]: - """Breaks an iterable into chunks of a specified size.""" - data_iter = iter(data) - while chunk := list(itertools.islice(data_iter, self.chunk_size)): - yield chunk - - def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) -> "Transformer[In, U]": - """Composes the current transformer with a new context-aware operation.""" - prev_transformer = self.transformer - # The new transformer chain ensures the context `ctx` is passed at each step. - self.transformer = lambda chunk, ctx: operation(prev_transformer(chunk, ctx), ctx) # type: ignore - return self # type: ignore - - def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]": - """Transforms elements, passing context explicitly to the mapping function.""" - if is_context_aware(function): - return self._pipe(lambda chunk, ctx: [function(x, ctx) for x in chunk]) - - return self._pipe(lambda chunk, _ctx: [function(x) for x in chunk]) # type: ignore - - def filter(self, predicate: PipelineFunction[Out, bool]) -> "Transformer[In, Out]": - """Filters elements, passing context explicitly to the predicate function.""" - if is_context_aware(predicate): - return self._pipe(lambda chunk, ctx: [x for x in chunk if predicate(x, ctx)]) - - return self._pipe(lambda chunk, _ctx: [x for x in chunk if predicate(x)]) # type: ignore - - @overload - def flatten[T](self: "Transformer[In, list[T]]") -> "Transformer[In, T]": ... - @overload - def flatten[T](self: "Transformer[In, tuple[T, ...]]") -> "Transformer[In, T]": ... - @overload - def flatten[T](self: "Transformer[In, set[T]]") -> "Transformer[In, T]": ... 
- - def flatten[T]( - self: Union["Transformer[In, list[T]]", "Transformer[In, tuple[T, ...]]", "Transformer[In, set[T]]"], - ) -> "Transformer[In, T]": - """Flattens nested collections (lists, tuples, or sets); the context is passed through the operation.""" - return self._pipe(lambda chunk, ctx: [item for sublist in chunk for item in sublist]) # type: ignore - - def tap(self, function: PipelineFunction[Out, Any]) -> "Transformer[In, Out]": - """Applies a side-effect function without modifying the data.""" - - if is_context_aware(function): - return self._pipe(lambda chunk, ctx: [x for x in chunk if function(x, ctx) or True]) - - return self._pipe(lambda chunk, _ctx: [x for x in chunk if function(x) or True]) # type: ignore - - def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In, T]": - """Apply a transformer-building function to the current transformer.""" - return t(self) - - def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: - """ - Executes the transformer on a data source. - - It uses the provided `context` by reference. If none is provided, it uses - the transformer's internal context. - """ - # Use the provided context by reference, or default to the instance's context. - run_context = context or self.context - - for chunk in self._chunk_generator(data): - # The context is now passed explicitly through the transformer chain. - yield from self.transformer(chunk, run_context) - - def reduce[U](self, function: PipelineReduceFunction[U, Out], initial: U): - """Reduces elements to a single value (terminal operation).""" - - if is_context_aware_reduce(function): - - def _reduce_with_context(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]: - # The context for the run is determined here. - run_context = context or self.context - - data_iterator = self(data, run_context) - - def function_wrapper(acc: U, value: Out) -> U: - return function(acc, value, run_context) - - yield reduce(function_wrapper, data_iterator, initial) - - return _reduce_with_context - - # Not context-aware, so we adapt the function to ignore the context. - def _reduce(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]: - # The context for the run is determined here. - run_context = context or self.context - - data_iterator = self(data, run_context) - - yield reduce(function, data_iterator, initial) # type: ignore - - return _reduce diff --git a/performance_test.py b/performance_test.py deleted file mode 100644 index 08dae5d..0000000 --- a/performance_test.py +++ /dev/null @@ -1,587 +0,0 @@ -#!/usr/bin/env python3 -""" -Performance test for Pipeline class comparing different data processing approaches. - -This test compares: -1. Pipeline class operations -2. Chained generator expressions with for/yield -3. Built-in map/filter chaining - -Tests are run on 1 million items, 20 times each to get statistical data.
-""" - -import itertools -from itertools import islice -import statistics -import time - -from efemel.pipeline.pipeline import Pipeline -from efemel.pipeline.transformers.transformer import Transformer - - -def generate_test_data(size: int = 1_000_000) -> list[int]: - """Generate test data of specified size.""" - return range(size) # type: ignore - - -def generator_approach(data: list[int]) -> list[int]: - """Process data using chained generator expressions.""" - - def step1(items): - """Filter even numbers.""" - for item in items: - if item % 2 == 0: - yield item - - def step2(items): - """Double the numbers.""" - for item in items: - yield item * 2 - - def step3(items): - """Filter > 100.""" - for item in items: - if item > 100: - yield item - - def step4(items): - """Add 1.""" - for item in items: - yield item + 1 - - return list(step4(step3(step2(step1(data))))) - - -def builtin_map_filter_approach(data: list[int]) -> list[int]: - """Process data using built-in map/filter chaining.""" - result = filter(lambda x: x % 2 == 0, data) # Keep even numbers - result = (x * 2 for x in result) # type: ignore - result = filter(lambda x: x > 100, result) # Keep only > 100 - result = (x + 1 for x in result) # type: ignore - return list(result) - - -def generator_expression_approach(data: list[int]) -> list[int]: - """Process data using generator expressions.""" - result = (x for x in data if x % 2 == 0) # Keep even numbers - result = (x * 2 for x in result) # Double them - result = (x for x in result if x > 100) # Keep only > 100 - result = (x + 1 for x in result) # Add 1 - return list(result) - - -def list_comprehension_approach(data: list[int]) -> list[int]: - """Process data using separate list comprehensions with intermediate lists.""" - # Create intermediate lists at each step to match other approaches - step1 = [x for x in data if x % 2 == 0] # Filter even numbers - step2 = [x * 2 for x in step1] # Double them - step3 = [x for x in step2 if x > 100] # Filter > 100 - step4 = [x + 1 for x in step3] # Add 1 - return step4 - - -def chunked_generator_listcomp_approach(data: list[int]) -> list[int]: - """Process data using chunked generators with intermediate lists per chunk.""" - - def chunk_generator(data, chunk_size=1000): - """Generate chunks using a generator.""" - for i in range(0, len(data), chunk_size): - yield data[i : i + chunk_size] - - # Process each chunk with intermediate lists and combine - results = [] - for chunk in chunk_generator(data): - # Create intermediate lists for each chunk - step1 = [x for x in chunk if x % 2 == 0] # Filter even numbers - step2 = [x * 2 for x in step1] # Double them - step3 = [x for x in step2 if x > 100] # Filter > 100 - step4 = [x + 1 for x in step3] # Add 1 - results.extend(step4) - return results - - -def mutated_chunked_generator_listcomp_approach(data: list[int]) -> list[int]: - """Process data using chunked generators with intermediate lists per chunk.""" - - def chunk_generator(data, chunk_size=1000): - """Generate chunks using a generator.""" - while True: - chunk = list(islice(data, chunk_size)) - if not chunk: - return - yield chunk - - # Process each chunk with intermediate lists and combine - results = [] - for chunk in chunk_generator(data): - # Create intermediate lists for each chunk - step = [x for x in chunk if x % 2 == 0] # Filter even numbers - step = [x * 2 for x in step] # Double them - step = [x for x in step if x > 100] # Filter > 100 - step = [x + 1 for x in step] # Add 1 - results.extend(step) - return results - - -class 
ChunkedPipeline: - """ - A chunked pipeline that composes operations into a single function, - inspired by a callback-chaining pattern. - """ - - def __init__(self, chunk_size=1000): - """Initialize the pipeline with data and chunk size.""" - self.chunk_size = chunk_size - # The transformer starts as an identity function that does nothing. - self.transformer = lambda chunk: chunk - - def _chunk_generator(self, data): - """Generate chunks from the data.""" - data_iter = iter(data) - while True: - chunk = list(itertools.islice(data_iter, self.chunk_size)) - if not chunk: - break - yield chunk - - def pipe(self, operation): - """ - Composes the current transformer with a new chunk-wise operation. - The operation should be a function that takes a list and returns a list. - """ - # Capture the current transformer - prev_transformer = self.transformer - # Create a new transformer that first calls the old one, then the new operation - self.transformer = lambda chunk: operation(prev_transformer(chunk)) - return self - - def filter(self, predicate): - """Adds a filter operation by wrapping the current transformer in a list comprehension.""" - return self.pipe(lambda chunk: [x for x in chunk if predicate(x)]) - - def map(self, func): - """Adds a map operation by wrapping the current transformer in a list comprehension.""" - return self.pipe(lambda chunk: [func(x) for x in chunk]) - - def run(self, data): - """ - Execute the pipeline by calling the single, composed transformer on each chunk. - """ - for chunk in self._chunk_generator(data): - # The transformer is a single function that contains all the nested operations. - yield from self.transformer(chunk) - - -CHUNKED_PIPELINE = ( - ChunkedPipeline() - .filter(lambda x: x % 2 == 0) # Filter even numbers - .map(lambda x: x * 2) # Double them - .filter(lambda x: x > 100) # Filter > 100 - .map(lambda x: x + 1) -) - - -class ChunkedPipelineGenerator: - """ - A chunked pipeline that composes operations into a single generator function, - using lazy evaluation throughout the pipeline. - """ - - def __init__(self, chunk_size=1000): - """Initialize the pipeline with chunk size.""" - self.chunk_size = chunk_size - # The transformer starts as an identity generator that yields items as-is - self.transformer = lambda chunk: (x for x in chunk) - - def _chunk_generator(self, data): - """Generate chunks from the data.""" - data_iter = iter(data) - while True: - chunk = list(itertools.islice(data_iter, self.chunk_size)) - if not chunk: - break - yield chunk - - def pipe(self, operation): - """ - Composes the current transformer with a new chunk-wise operation. - The operation should be a function that takes a generator and returns a generator. - """ - # Capture the current transformer - prev_transformer = self.transformer - # Create a new transformer that first calls the old one, then the new operation - self.transformer = lambda chunk: operation(prev_transformer(chunk)) - return self - - def filter(self, predicate): - """Adds a filter operation using generator expression.""" - return self.pipe(lambda gen: (x for x in gen if predicate(x))) - - def map(self, func): - """Adds a map operation using generator expression.""" - return self.pipe(lambda gen: (func(x) for x in gen)) - - def run(self, data): - """ - Execute the pipeline by calling the single, composed transformer on each chunk. - Yields items lazily from the generator pipeline. 
- """ - for chunk in self._chunk_generator(data): - # The transformer is a single function that contains all the nested generator operations - yield from self.transformer(chunk) - - -# Create a generator-based pipeline instance -PIPELINE_GENERATOR = ( - ChunkedPipelineGenerator() - .filter(lambda x: x % 2 == 0) # Filter even numbers - .map(lambda x: x * 2) # Double them - .filter(lambda x: x > 100) # Filter > 100 - .map(lambda x: x + 1) -) - - -def chunked_pipeline_approach(data: list[int]) -> list[int]: - """Process data using the ChunkedPipeline class.""" - return list(CHUNKED_PIPELINE.run(data)) - - -def chunked_pipeline_generator_approach(data: list[int]) -> list[int]: - """Process data using the ChunkedPipelineGenerator class.""" - return list(PIPELINE_GENERATOR.run(data)) - - -class ChunkedPipelineSimple: - """ - A simple chunked pipeline that stores operations in a list and applies them sequentially - using list comprehensions when executed. - """ - - def __init__(self, chunk_size=1000): - """Initialize the pipeline with chunk size.""" - self.chunk_size = chunk_size - self.operations = [] - - def _chunk_generator(self, data): - """Generate chunks from the data.""" - data_iter = iter(data) - while True: - chunk = list(itertools.islice(data_iter, self.chunk_size)) - if not chunk: - break - yield chunk - - def pipe(self, operation): - """ - Add an operation to the pipeline. - The operation should be a function that takes a list and returns a list. - """ - self.operations.append(operation) - return self - - def filter(self, predicate): - """Add a filter operation to the pipeline.""" - return self.pipe(lambda chunk: [x for x in chunk if predicate(x)]) - - def map(self, func): - """Add a map operation to the pipeline.""" - return self.pipe(lambda chunk: [func(x) for x in chunk]) - - def run(self, data): - """ - Execute the pipeline by applying all operations sequentially to each chunk. - """ - for chunk in self._chunk_generator(data): - # Apply all operations sequentially to the chunk - current_chunk = chunk - for operation in self.operations: - current_chunk = operation(current_chunk) - - # Yield each item from the processed chunk - yield from current_chunk - - -# Create a simple pipeline instance -PIPELINE_SIMPLE = ( - ChunkedPipelineSimple() - .filter(lambda x: x % 2 == 0) # Filter even numbers - .map(lambda x: x * 2) # Double them - .filter(lambda x: x > 100) # Filter > 100 - .map(lambda x: x + 1) -) - - -def chunked_pipeline_simple_approach(data: list[int]) -> list[int]: - """Process data using the ChunkedPipelineSimple class.""" - return list(PIPELINE_SIMPLE.run(data)) - - -# Sentinel value to indicate that an item has been filtered out. -_SKIPPED = object() - - -class ChunkedPipelinePerItem: - """ - A chunked pipeline that composes operations into a single function - operating on individual items, inspired by a callback-chaining pattern - to reduce the creation of intermediate lists. - """ - - def __init__(self, chunk_size=1000): - """Initialize the pipeline with a specified chunk size.""" - self.chunk_size = chunk_size - # The transformer starts as an identity function. - self.transformer = lambda item: item - - def _chunk_generator(self, data): - """A generator that yields chunks of data.""" - data_iter = iter(data) - while True: - chunk = list(itertools.islice(data_iter, self.chunk_size)) - if not chunk: - break - yield chunk - - def pipe(self, operation): - """ - Composes the current transformer with a new item-wise operation. 
- This internal method is the core of the pipeline's composition logic. - """ - prev_transformer = self.transformer - - def new_transformer(item): - # Apply the existing chain of transformations. - processed_item = prev_transformer(item) - - # If a previous operation (like a filter) already skipped this item, - # we bypass the new operation entirely. - if processed_item is _SKIPPED: - return _SKIPPED - - # Apply the new operation to the result of the previous ones. - return operation(processed_item) - - self.transformer = new_transformer - return self - - def filter(self, predicate): - """ - Adds a filter operation to the pipeline. - - If the predicate returns `False`, the item is marked as skipped, - and no further operations in the chain will be executed on it. - """ - - def filter_operation(item): - return item if predicate(item) else _SKIPPED - - return self.pipe(filter_operation) - - def map(self, func): - """Adds a map operation to transform an item.""" - return self.pipe(func) - - def run(self, data): - """ - Executes the pipeline. - - The composed transformer function is applied to each item individually. - Results are yielded only if they haven't been marked as skipped. - """ - for chunk in self._chunk_generator(data): - yield from [result for item in chunk if (result := self.transformer(item)) is not _SKIPPED] - - -PIPELINE_PER_ITEM = ( - ChunkedPipelinePerItem() - .filter(lambda x: x % 2 == 0) # Filter even numbers - .map(lambda x: x * 2) # Double them - .filter(lambda x: x > 100) # Filter > 100 - .map(lambda x: x + 1) -) - - -def chunked_pipeline_per_item_approach(data) -> list[int]: - """Process data using the ChunkedPipelinePerItem class.""" - return list(PIPELINE_PER_ITEM.run(data)) - - -PIPELINE_TRANSFORMER: Transformer = ( - Transformer() - .filter(lambda x: x % 2 == 0) # Filter even numbers - .map(lambda x: x * 2) # Double them - .filter(lambda x: x > 100) # Filter > 100 - .map(lambda x: x + 1) -) - - -def pipeline_approach(data: list[int]) -> list[int]: - """Process data using the Pipeline class.""" - return Pipeline(data).apply(PIPELINE_TRANSFORMER).to_list() - - -def time_function(func, *args, **kwargs) -> float: - """Time a function execution and return duration in seconds.""" - start_time = time.perf_counter() - func(*args, **kwargs) - end_time = time.perf_counter() - return end_time - start_time - - -def run_performance_test(): - """Run comprehensive performance test.""" - print("šŸš€ Starting Performance Test") - print("=" * 60) - - # Test configurations - approaches = { - # "Generators": generator_approach, - # "Map/Filter": builtin_map_filter_approach, - # "GeneratorExpression": generator_expression_approach, - "ChunkedGeneratorListComp": chunked_generator_listcomp_approach, - "ChunkedPipeline": chunked_pipeline_approach, - "Pipeline": pipeline_approach, - # "ChunkedPipelinePerItem": chunked_pipeline_per_item_approach, - # "ChunkedPipelineGenerator": chunked_pipeline_generator_approach, - # "ChunkedPipelineSimple": chunked_pipeline_simple_approach, - # "ListComprehension": list_comprehension_approach, - # "MutatedChunkedGeneratorListComp": mutated_chunked_generator_listcomp_approach, - } - - num_runs = 20 - results = {} - - for name, approach_func in approaches.items(): - print(f"\nšŸ”„ Testing {name} approach ({num_runs} runs)...") - times = [] - - # Warm up - approach_func(generate_test_data(1_000_000)) - - # Actual timing runs - for run in range(num_runs): - print(f" Run {run + 1}/{num_runs}") - - duration = time_function(approach_func, 
generate_test_data(1_000_000)) - times.append(duration) - - # Calculate statistics - results[name] = { - "times": times, - "min": min(times), - "max": max(times), - "avg": statistics.mean(times), - "p95": statistics.quantiles(times, n=20)[18], # 95th percentile - "p99": statistics.quantiles(times, n=100)[98], # 99th percentile - "median": statistics.median(times), - "stdev": statistics.stdev(times) if len(times) > 1 else 0, - } # Print results - print("\n" + "=" * 60) - print("šŸ“ˆ PERFORMANCE RESULTS") - print("=" * 60) - - # Header - header = ( - f"{'Approach':<12} {'Min (s)':<10} {'Max (s)':<10} {'Avg (s)':<10} " - f"{'P95 (s)':<10} {'P99 (s)':<10} {'Median (s)':<12} {'StdDev':<10}" - ) - print(header) - print("-" * 104) - - # Sort by average time - sorted_results = sorted(results.items(), key=lambda x: x[1]["avg"]) - - for name, stats in sorted_results: - print( - f"{name:<12} {stats['min']:<10.4f} {stats['max']:<10.4f} {stats['avg']:<10.4f} " - f"{stats['p95']:<10.4f} {stats['p99']:<10.4f} {stats['median']:<12.4f} {stats['stdev']:<10.4f}" - ) - - # Relative performance - print("\n" + "=" * 60) - print("šŸ† RELATIVE PERFORMANCE") - print("=" * 60) - - baseline = sorted_results[0][1]["avg"] # Fastest approach as baseline - - for name, stats in sorted_results: - ratio = stats["avg"] / baseline - percentage = (ratio - 1) * 100 - if ratio == 1.0: - print(f"{name:<12} 1.00x (baseline)") - else: - print(f"{name:<12} {ratio:.2f}x ({percentage:+.1f}% slower)") - - # Verify correctness - print("\n" + "=" * 60) - print("āœ… CORRECTNESS VERIFICATION") - print("=" * 60) - - # Use smaller dataset for verification - results_correctness = {} - - for name, approach_func in approaches.items(): - result = approach_func(generate_test_data(1_000)) - results_correctness[name] = result - print(f"{name:<12} Result length: {len(result)}") - - # Check if all approaches produce the same result - first_result = next(iter(results_correctness.values())) - all_same = all(result == first_result for result in results_correctness.values()) - - if all_same: - print("āœ… All approaches produce identical results") - print(f" Sample result (first 10): {first_result[:10]}") - else: - print("āŒ Approaches produce different results!") - for name, result in results_correctness.items(): - print(f" {name}: {result[:10]} (length: {len(result)})") - - -def run_memory_test(): - """Run a quick memory efficiency comparison.""" - print("\n" + "=" * 60) - print("šŸ’¾ MEMORY EFFICIENCY TEST") - print("=" * 60) - - import tracemalloc - - approaches = { - # "Generators": generator_approach, - # "MapFilter": builtin_map_filter_approach, - # "GeneratorExpression": generator_expression_approach, - "ChunkedGeneratorListComp": chunked_generator_listcomp_approach, - "ChunkedPipeline": chunked_pipeline_approach, - "Pipeline": pipeline_approach, - # "ChunkedPipelinePerItem": chunked_pipeline_per_item_approach, - # "ChunkedPipelineGenerator": chunked_pipeline_generator_approach, - # "ChunkedPipelineSimple": chunked_pipeline_simple_approach, - # "ListComprehension": list_comprehension_approach, - # "MutatedChunkedGeneratorListComp": mutated_chunked_generator_listcomp_approach, - } - - for name, approach_func in approaches.items(): - tracemalloc.start() - - result = approach_func(generate_test_data(100_000)) - - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - - print( - f"{name:<12} Peak memory: {peak / 1024 / 1024:.2f} MB, " - f"Current: {current / 1024 / 1024:.2f} MB, " - f"Result length: {len(result)}" - ) - - -if 
__name__ == "__main__": - try: - run_performance_test() - run_memory_test() - print("\nšŸŽ‰ ChunkedPipeline test completed successfully!") - except KeyboardInterrupt: - print("\nāš ļø Test interrupted by user") - except Exception as e: - print(f"\nāŒ Test failed with error: {e}") - raise diff --git a/pyproject.toml b/pyproject.toml index ee38b38..56ceca4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ authors = [ ] dependencies = [ "click>=8.0.0", + "laygo>=0.1.1" ] [project.scripts] diff --git a/tests/test_integration.py b/tests/test_integration.py deleted file mode 100644 index 7fc2f6b..0000000 --- a/tests/test_integration.py +++ /dev/null @@ -1,380 +0,0 @@ -"""Integration tests for Pipeline and Transformer working together.""" - -from efemel.pipeline.pipeline import Pipeline -from efemel.pipeline.transformers.parallel import ParallelTransformer -from efemel.pipeline.transformers.transformer import PipelineContext -from efemel.pipeline.transformers.transformer import Transformer - - -class TestPipelineTransformerIntegration: - """Test Pipeline and Transformer integration.""" - - def test_basic_integration(self): - """Test basic pipeline and transformer integration.""" - # Create a transformer that doubles and filters - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 5) - - # Apply to pipeline - result = Pipeline([1, 2, 3, 4, 5]).apply(transformer).to_list() - assert result == [6, 8, 10] - - def test_context_sharing(self): - """Test that context is properly shared in transformations.""" - context = {"multiplier": 3, "threshold": 5} - - transformer = Transformer().map(lambda x, ctx: x * ctx["multiplier"]).filter(lambda x, ctx: x > ctx["threshold"]) - - result = Pipeline([1, 2, 3]).context(context).apply(transformer).to_list() - assert result == [6, 9] # [3, 6, 9] filtered to [6, 9] - - def test_reduce_integration(self): - """Test reduce operation with pipeline.""" - transformer = Transformer.init(int).map(lambda x: x * 2) - reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) - - result = list(reducer([1, 2, 3, 4])) - assert result == [20] # [2, 4, 6, 8] summed = 20 - - def test_complex_chain_integration(self): - """Test complex chain of operations.""" - # Transform: double -> flatten -> filter -> tap - side_effects = [] - - transformer = ( - Transformer.init(int) - .map(lambda x: [x, x * 2]) # Create pairs - .flatten() # Flatten to single list - .filter(lambda x: x % 3 == 0) # Keep multiples of 3 - .tap(lambda x: side_effects.append(f"processed: {x}")) - ) - - result = Pipeline([1, 2, 3, 6]).apply(transformer).to_list() - - # Expected: [1,2,2,4,3,6,6,12] -> [3,6,6,12] (multiples of 3) - assert result == [3, 6, 6, 12] - assert side_effects == ["processed: 3", "processed: 6", "processed: 6", "processed: 12"] - - def test_multiple_transformer_applications(self): - """Test applying multiple transformers in sequence.""" - t1 = Transformer.init(int).map(lambda x: x * 2) - t2 = Transformer.init(int).filter(lambda x: x > 5) - t3 = Transformer.init(int).map(lambda x: x + 1) - - result = ( - Pipeline([1, 2, 3, 4, 5]) - .apply(t1) # [2, 4, 6, 8, 10] - .apply(t2) # [6, 8, 10] - .apply(t3) # [7, 9, 11] - .to_list() - ) - - assert result == [7, 9, 11] - - def test_transform_shorthand_integration(self): - """Test transform shorthand method integration.""" - result = ( - Pipeline([1, 2, 3, 4, 5]) - .transform(lambda t: t.map(lambda x: x * 3)) - .transform(lambda t: t.filter(lambda x: x > 6)) - .to_list() - ) - - assert result == [9, 12, 15] # 
[3, 6, 9, 12, 15] -> [9, 12, 15] - - def test_mixed_operations(self): - """Test mixing transform shorthand and apply methods.""" - transformer = Transformer.init(int).filter(lambda x: x < 10) - - result = ( - Pipeline([1, 2, 3, 4, 5]) - .transform(lambda t: t.map(lambda x: x * 2)) # [2, 4, 6, 8, 10] - .apply(transformer) # [2, 4, 6, 8] - .transform(lambda t: t.map(lambda x: x + 1)) # [3, 5, 7, 9] - .to_list() - ) - - assert result == [3, 5, 7, 9] - - -class TestPipelineDataProcessingPatterns: - """Test common data processing patterns.""" - - def test_etl_pattern(self): - """Test Extract-Transform-Load pattern.""" - # Simulate raw data extraction - raw_data = [ - {"name": "Alice", "age": 25, "salary": 50000}, - {"name": "Bob", "age": 30, "salary": 60000}, - {"name": "Charlie", "age": 35, "salary": 70000}, - {"name": "David", "age": 28, "salary": 55000}, - ] - - # Transform: extract names of people over 28 with salary > 55000 - result = ( - Pipeline(raw_data) - .transform(lambda t: t.filter(lambda x: x["age"] > 28 and x["salary"] > 55000)) - .transform(lambda t: t.map(lambda x: x["name"])) - .to_list() - ) - - assert result == ["Bob", "Charlie"] - - def test_data_aggregation_pattern(self): - """Test data aggregation pattern.""" - # Group and count pattern - data = ["apple", "banana", "apple", "cherry", "banana", "apple"] - - # Count occurrences - counts = {} - (Pipeline(data).transform(lambda t: t.tap(lambda x: counts.update({x: counts.get(x, 0) + 1}))).consume()) - - assert counts == {"apple": 3, "banana": 2, "cherry": 1} - - def test_map_reduce_pattern(self): - """Test map-reduce pattern.""" - # Map: square numbers, Reduce: sum them - transformer = Transformer.init(int).map(lambda x: x * x) # Square - - reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) # Sum - - result = list(reducer([1, 2, 3, 4, 5])) - assert result == [55] # 1 + 4 + 9 + 16 + 25 = 55 - - def test_filtering_and_transformation_pattern(self): - """Test filtering and transformation pattern.""" - # Process only even numbers, double them, then sum - even_numbers = [] - transformer = ( - Transformer.init(int).filter(lambda x: x % 2 == 0).tap(lambda x: even_numbers.append(x)).map(lambda x: x * 2) - ) - - result = Pipeline(range(1, 11)).apply(transformer).to_list() - - assert even_numbers == [2, 4, 6, 8, 10] - assert result == [4, 8, 12, 16, 20] - - def test_data_validation_pattern(self): - """Test data validation pattern.""" - # Validate and clean data - raw_data = [1, "2", 3.0, "invalid", 5, None, 7] - - valid_numbers = [] - - def validate_and_convert(x): - try: - num = float(x) if x is not None else None - if num is not None and not str(x).lower() == "invalid": - valid_numbers.append(num) - return int(num) - return None - except (ValueError, TypeError): - return None - - result = ( - Pipeline(raw_data) - .transform(lambda t: t.map(validate_and_convert)) - .transform(lambda t: t.filter(lambda x: x is not None)) - .to_list() - ) - - assert result == [1, 2, 3, 5, 7] - assert valid_numbers == [1.0, 2.0, 3.0, 5.0, 7.0] - - -class TestPipelinePerformanceIntegration: - """Test performance aspects of pipeline and transformer integration.""" - - def test_large_dataset_chunked_processing(self): - """Test chunked processing with large datasets.""" - # Process 10,000 numbers with small chunk size - transformer = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x % 1000 == 0) - - large_data = range(10000) - result = Pipeline(large_data).apply(transformer).to_list() - - # Should get multiples of 500 
doubled (0, 1000, 2000, ..., 18000) - expected = [x * 2 for x in range(0, 10000, 500)] - assert result == expected - - def test_memory_efficient_processing(self): - """Test memory-efficient lazy processing.""" - # Process large dataset but only take first few results - transformer = Transformer.init(int).map(lambda x: x**2).filter(lambda x: x > 100) - - large_data = range(1000) - result = Pipeline(large_data).apply(transformer).first(5) - - # First 5 squares > 100: 121, 144, 169, 196, 225 - assert result == [121, 144, 169, 196, 225] - - def test_streaming_processing(self): - """Test streaming-like processing pattern.""" - # Simulate processing data as it arrives - batches = [[1, 2], [3, 4], [5, 6]] - results = [] - - for batch in batches: - batch_result = Pipeline(batch).transform(lambda t: t.map(lambda x: x * 3)).to_list() - results.extend(batch_result) - - assert results == [3, 6, 9, 12, 15, 18] - - -class TestPipelineParallelTransformerIntegration: - """Test Pipeline integration with ParallelTransformer and context modification.""" - - def test_pipeline_with_parallel_transformer_context_modification(self): - """Test pipeline calling parallel transformer that safely modifies context.""" - # Create context with statistics to be updated by parallel processing - context = PipelineContext({"processed_count": 0, "sum_total": 0, "max_value": 0}) - - def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: - """Safely increment context counters and transform the value.""" - with ctx["lock"]: - ctx["processed_count"] += 1 - ctx["sum_total"] += x - ctx["max_value"] = max(ctx["max_value"], x) - return x * 2 - - # Create parallel transformer that modifies context safely - parallel_transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) - parallel_transformer = parallel_transformer.map(safe_increment_and_transform) - - # Use pipeline to process data with parallel transformer - data = [1, 5, 3, 8, 2, 7, 4, 6] - result = Pipeline(data).context(context).apply(parallel_transformer).to_list() - - # Verify transformation results - expected_result = [x * 2 for x in data] - assert sorted(result) == sorted(expected_result) - - # Verify context was safely modified by parallel workers - assert context["processed_count"] == len(data) - assert context["sum_total"] == sum(data) - assert context["max_value"] == max(data) - - def test_pipeline_accesses_context_after_parallel_processing(self): - """Test that pipeline can access context data modified by parallel transformer.""" - # Create context with counters - context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0}) - - def count_and_transform(x: int, ctx: PipelineContext) -> int: - """Count even/odd numbers and transform.""" - with ctx["lock"]: - ctx["items_processed"] += 1 - if x % 2 == 0: - ctx["even_count"] += 1 - else: - ctx["odd_count"] += 1 - return x * 3 - - parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) - parallel_transformer = parallel_transformer.map(count_and_transform) - - data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - - # Process with pipeline - pipeline = Pipeline(data).context(context) - result = pipeline.apply(parallel_transformer).to_list() - - # Access the modified context from pipeline - final_context = pipeline.ctx - - # Verify results - expected_result = [x * 3 for x in data] - assert sorted(result) == sorted(expected_result) - - # Verify context statistics computed by parallel workers - assert final_context["items_processed"] == 10 - assert 
final_context["even_count"] == 5 # 2, 4, 6, 8, 10 - assert final_context["odd_count"] == 5 # 1, 3, 5, 7, 9 - - # Demonstrate context access after processing - total_processed = final_context["even_count"] + final_context["odd_count"] - assert total_processed == final_context["items_processed"] - - def test_multiple_parallel_transformers_sharing_context(self): - """Test multiple parallel transformers modifying the same context.""" - # Shared context for statistics across transformations - context = PipelineContext({"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0}) - - def stage1_processor(x: int, ctx: PipelineContext) -> int: - """First stage processing with context update.""" - with ctx["lock"]: - ctx["stage1_processed"] += 1 - ctx["total_sum"] += x - return x * 2 - - def stage2_processor(x: int, ctx: PipelineContext) -> int: - """Second stage processing with context update.""" - with ctx["lock"]: - ctx["stage2_processed"] += 1 - ctx["total_sum"] += x # Add transformed value too - return x + 10 - - # Create two parallel transformers - stage1 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage1_processor) - stage2 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage2_processor) - - data = [1, 2, 3, 4, 5] - - # Chain parallel transformers in pipeline - pipeline = Pipeline(data).context(context) - result = ( - pipeline.apply(stage1) # [2, 4, 6, 8, 10] - .apply(stage2) # [12, 14, 16, 18, 20] - .to_list() - ) - - # Verify final results - expected_stage1 = [x * 2 for x in data] # [2, 4, 6, 8, 10] - expected_final = [x + 10 for x in expected_stage1] # [12, 14, 16, 18, 20] - assert result == expected_final - - # Verify context reflects both stages - final_context = pipeline.ctx - assert final_context["stage1_processed"] == 5 - assert final_context["stage2_processed"] == 5 - - # Total sum should include original values + transformed values - original_sum = sum(data) # 1+2+3+4+5 = 15 - stage1_sum = sum(expected_stage1) # 2+4+6+8+10 = 30 - expected_total = original_sum + stage1_sum # 15 + 30 = 45 - assert final_context["total_sum"] == expected_total - - def test_pipeline_context_isolation_with_parallel_processing(self): - """Test that different pipeline instances have isolated contexts.""" - - # Create base context structure - def create_context(): - return PipelineContext({"count": 0}) - - def increment_counter(x: int, ctx: PipelineContext) -> int: - """Increment counter in context.""" - with ctx["lock"]: - ctx["count"] += 1 - return x * 2 - - parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - parallel_transformer = parallel_transformer.map(increment_counter) - - data = [1, 2, 3] - - # Create two separate pipeline instances with their own contexts - pipeline1 = Pipeline(data).context(create_context()) - pipeline2 = Pipeline(data).context(create_context()) - - # Process with both pipelines - result1 = pipeline1.apply(parallel_transformer).to_list() - result2 = pipeline2.apply(parallel_transformer).to_list() - - # Both should have same transformation results - assert result1 == [2, 4, 6] - assert result2 == [2, 4, 6] - - # But contexts should be isolated - assert pipeline1.ctx["count"] == 3 - assert pipeline2.ctx["count"] == 3 - - # Verify they are different context objects - assert pipeline1.ctx is not pipeline2.ctx diff --git a/tests/test_laygo.py b/tests/test_laygo.py new file mode 100644 index 0000000..664413e --- /dev/null +++ b/tests/test_laygo.py @@ -0,0 +1,7 @@ +from laygo import Pipeline + + +class TestLaygo: + 
def test_laygo(self): + p = Pipeline([1, 2, 3]).transform(lambda t: t.map(lambda x: x + 1)).to_list() + assert p == [2, 3, 4], "Test failed: Output does not match expected result." diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py deleted file mode 100644 index da98a4f..0000000 --- a/tests/test_parallel_transformer.py +++ /dev/null @@ -1,357 +0,0 @@ -"""Tests for the ParallelTransformer class.""" - -import threading -import time -from unittest.mock import patch - -from efemel.pipeline.transformers.parallel import ParallelTransformer -from efemel.pipeline.transformers.transformer import PipelineContext -from efemel.pipeline.transformers.transformer import Transformer - - -class TestParallelTransformerBasics: - """Test basic parallel transformer functionality.""" - - def test_init_creates_parallel_transformer(self): - """Test that init creates a parallel transformer with default values.""" - transformer = ParallelTransformer[int, int]() - assert transformer.max_workers == 4 - assert transformer.ordered is True - assert transformer.chunk_size == 1000 - - def test_init_with_custom_parameters(self): - """Test init with custom parameters.""" - transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500) - assert transformer.max_workers == 8 - assert transformer.ordered is False - assert transformer.chunk_size == 500 - - def test_call_executes_transformer_concurrently(self): - """Test that calling parallel transformer executes it on data.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) - result = list(transformer([1, 2, 3, 4, 5])) - assert result == [1, 2, 3, 4, 5] - - def test_from_transformer_class_method(self): - """Test creating ParallelTransformer from existing Transformer.""" - # Create a regular transformer with some operations - regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) - - # Convert to parallel using the base class method - parallel = ParallelTransformer.from_transformer(regular, max_workers=2, ordered=True) - - # Test both produce same results - data = [1, 2, 3, 4, 5, 6] - regular_results = list(regular(data)) - parallel_results = list(parallel(data)) - - assert regular_results == parallel_results - assert parallel.max_workers == 2 - assert parallel.ordered is True - assert parallel.chunk_size == 100 - - -class TestParallelTransformerOperations: - """Test parallel transformer operations.""" - - def test_map_transforms_elements_concurrently(self): - """Test map transforms each element using multiple threads.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) - result = list(transformer([1, 2, 3, 4])) - assert result == [2, 4, 6, 8] - - def test_filter_keeps_matching_elements_concurrently(self): - """Test filter keeps only matching elements using multiple threads.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0) - result = list(transformer([1, 2, 3, 4, 5, 6])) - assert result == [2, 4, 6] - - def test_chained_operations_concurrently(self): - """Test chained operations work correctly with concurrency.""" - transformer = ( - ParallelTransformer[int, int](max_workers=2, chunk_size=2) - .map(lambda x: x * 2) - .filter(lambda x: x > 4) - .map(lambda x: x + 1) - ) - result = list(transformer([1, 2, 3, 4, 5])) - assert result == [7, 9, 11] # [2,4,6,8,10] -> [6,8,10] -> [7,9,11] - - def test_map_with_context_aware_function_concurrently(self): - 
"""Test map with context-aware function in concurrent execution.""" - context = PipelineContext({"multiplier": 3}) - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) - result = list(transformer([1, 2, 3], context)) - assert result == [3, 6, 9] - - def test_flatten_concurrently(self): - """Test flatten operation works with concurrent execution.""" - transformer = ParallelTransformer[list, int](max_workers=2, chunk_size=2).flatten() - result = list(transformer([[1, 2], [3, 4], [5, 6]])) - assert result == [1, 2, 3, 4, 5, 6] - - def test_tap_applies_side_effects_concurrently(self): - """Test tap applies side effects correctly in concurrent execution.""" - side_effects = [] - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.tap(lambda x: side_effects.append(x)) - result = list(transformer([1, 2, 3, 4])) - - assert result == [1, 2, 3, 4] # Data unchanged - assert sorted(side_effects) == [1, 2, 3, 4] # Side effects applied (may be out of order) - - -class TestParallelTransformerOrdering: - """Test ordering behavior of parallel transformer.""" - - def test_ordered_execution_maintains_order(self): - """Test that ordered=True maintains element order.""" - - def slow_transform(x: int) -> int: - # Simulate variable processing time - time.sleep(0.01 * (5 - x)) # Later elements process faster - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=2) - transformer = transformer.map(slow_transform) - result = list(transformer([1, 2, 3, 4, 5])) - - assert result == [2, 4, 6, 8, 10] # Order maintained despite processing times - - def test_unordered_execution_allows_reordering(self): - """Test that ordered=False allows results in completion order.""" - transformer = ParallelTransformer[int, int](max_workers=2, ordered=False, chunk_size=1) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3, 4])) - - # Results should have the same elements, but order may vary - assert sorted(result) == [2, 4, 6, 8] - - def test_ordered_vs_unordered_same_results(self): - """Test that ordered and unordered produce same elements, just different order.""" - data = list(range(10)) - - ordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=3) - ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data)) - - unordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=False, chunk_size=3) - unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data)) - - assert sorted(ordered_result) == sorted(unordered_result) - assert ordered_result == [x * 2 for x in data] # Ordered maintains sequence - - -class TestParallelTransformerPerformance: - """Test performance aspects of parallel transformer.""" - - def test_concurrent_faster_than_sequential_for_slow_operations(self): - """Test that concurrent execution is faster for CPU-intensive operations.""" - - def slow_operation(x: int) -> int: - time.sleep(0.01) # 10ms delay - return x * 2 - - data = list(range(8)) # 8 items, 80ms total sequential time - - # Sequential execution - start_time = time.time() - sequential = Transformer[int, int](chunk_size=4) - seq_result = list(sequential.map(slow_operation)(data)) - seq_time = time.time() - start_time - - # Concurrent execution - start_time = time.time() - concurrent = ParallelTransformer[int, int](max_workers=4, chunk_size=4) - conc_result = 
list(concurrent.map(slow_operation)(data)) - conc_time = time.time() - start_time - - # Results should be the same - assert seq_result == conc_result - - # Concurrent should be faster (allowing some variance for thread overhead) - assert conc_time < seq_time * 0.8 # At least 20% faster - - def test_thread_pool_properly_managed(self): - """Test that thread pool is properly created and cleaned up.""" - with patch("efemel.pipeline.transformers.parallel.ThreadPoolExecutor") as mock_executor: - mock_executor.return_value.__enter__.return_value = mock_executor.return_value - mock_executor.return_value.__exit__.return_value = None - mock_executor.return_value.submit.return_value.result.return_value = [2, 4] - - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - list(transformer([1, 2])) - - # Verify ThreadPoolExecutor was created with correct max_workers - mock_executor.assert_called_with(max_workers=2) - # Verify context manager was used (enter/exit called) - mock_executor.return_value.__enter__.assert_called_once() - mock_executor.return_value.__exit__.assert_called_once() - - -class TestParallelTransformerEdgeCases: - """Test edge cases for parallel transformer.""" - - def test_empty_data_concurrent(self): - """Test parallel transformer with empty data.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) - result = list(transformer([])) - assert result == [] - - def test_single_element_concurrent(self): - """Test parallel transformer with single element.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(lambda x: x * 2).filter(lambda x: x > 0) - result = list(transformer([5])) - assert result == [10] - - def test_data_smaller_than_chunk_size(self): - """Test parallel transformer when data is smaller than chunk size.""" - transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=100) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) - assert result == [2, 4, 6] - - def test_more_workers_than_chunks(self): - """Test parallel transformer when workers exceed number of chunks.""" - transformer = ParallelTransformer[int, int](max_workers=10, chunk_size=2) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers - assert result == [2, 4, 6] - - def test_exception_handling_in_concurrent_execution(self): - """Test that exceptions in worker threads are properly propagated.""" - - def failing_function(x: int) -> int: - if x == 3: - raise ValueError("Test exception") - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(failing_function) - - try: - list(transformer([1, 2, 3, 4])) - raise AssertionError("Expected exception was not raised") - except ValueError as e: - assert "Test exception" in str(e) - - -class TestParallelTransformerChunking: - """Test chunking behavior with concurrent execution.""" - - def test_chunking_with_concurrent_execution(self): - """Test that chunking works correctly with concurrent execution.""" - # Use a function that can help us verify chunking - processed_chunks = [] - - def chunk_tracking_function(x: int) -> int: - # This will help us see which items are processed together - processed_chunks.append(x) - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) - transformer = transformer.map(chunk_tracking_function) - result = 
list(transformer([1, 2, 3, 4, 5, 6, 7])) - - assert result == [2, 4, 6, 8, 10, 12, 14] - # All items should have been processed - assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7] - - def test_large_chunk_size_concurrent(self): - """Test parallel transformer with large chunk size.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=1000) - transformer = transformer.map(lambda x: x + 1) - large_data = list(range(100)) # Much smaller than chunk size - result = list(transformer(large_data)) - expected = [x + 1 for x in large_data] - assert result == expected - - -class TestParallelTransformerContextModification: - """Test context modification behavior with parallel transformer.""" - - def test_context_modification_with_locking(self): - """Test that context modification with locking works correctly in concurrent execution.""" - # Create context with items counter and a lock for thread safety - context = PipelineContext({"items": 0, "_lock": threading.Lock()}) - - def increment_counter(x: int, ctx: PipelineContext) -> int: - """Increment the items counter in context thread-safely.""" - with ctx["_lock"]: - current_items = ctx["items"] - # Small delay to increase chance of race condition without locking - time.sleep(0.001) - ctx["items"] = current_items + 1 - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) - transformer = transformer.map(increment_counter) - - data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - result = list(transformer(data, context)) - - # Verify results are correct - expected_result = [x * 2 for x in data] - assert sorted(result) == sorted(expected_result) - - # Verify context was modified correctly - items should equal number of processed elements - assert context["items"] == len(data) - - def test_context_modification_without_locking_shows_race_condition(self): - """Test that context modification without locking can lead to race conditions.""" - # Create context without lock to demonstrate race condition - context = PipelineContext({"items": 0}) - - def unsafe_increment_counter(x: int, ctx: PipelineContext) -> int: - """Increment the items counter without thread safety.""" - current_items = ctx["items"] - # Delay to increase chance of race condition - time.sleep(0.001) - ctx["items"] = current_items + 1 - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) - transformer = transformer.map(unsafe_increment_counter) - - data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - result = list(transformer(data, context)) - - # Results should still be correct - expected_result = [x * 2 for x in data] - assert sorted(result) == sorted(expected_result) - - # Without locking, lost updates mean the final count usually ends up below len(data). - # The race is nondeterministic, so only loose bounds are asserted rather than an exact value. - assert context["items"] >= 1 - # The counter can never exceed the number of processed elements; - # under a race condition it is typically smaller. - assert context["items"] <= len(data) - - def test_multiple_context_values_with_locking(self): - """Test modifying multiple context values safely in concurrent execution.""" - context = PipelineContext({"total_sum": 0, "item_count": 0, "max_value": 0, "_lock": threading.Lock()}) - - def update_statistics(x: int, ctx: PipelineContext) -> int: - """Update multiple statistics in context thread-safely.""" - with ctx["_lock"]: - ctx["total_sum"] += 
x - ctx["item_count"] += 1 - ctx["max_value"] = max(ctx["max_value"], x) - return x * 3 - - transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) - transformer = transformer.map(update_statistics) - - data = [1, 5, 3, 8, 2, 7, 4, 6] - result = list(transformer(data, context)) - - # Verify transformation results - expected_result = [x * 3 for x in data] - assert sorted(result) == sorted(expected_result) - - # Verify context statistics were updated correctly - assert context["total_sum"] == sum(data) - assert context["item_count"] == len(data) - assert context["max_value"] == max(data) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py deleted file mode 100644 index 6de9a51..0000000 --- a/tests/test_pipeline.py +++ /dev/null @@ -1,242 +0,0 @@ -"""Tests for the Pipeline class.""" - -from efemel.pipeline.pipeline import Pipeline -from efemel.pipeline.transformers.transformer import Transformer - - -class TestPipelineBasics: - """Test basic pipeline functionality.""" - - def test_pipeline_creation_from_single_iterable(self): - """Test creating pipeline from single iterable.""" - pipeline = Pipeline([1, 2, 3]) - result = pipeline.to_list() - assert result == [1, 2, 3] - - def test_pipeline_creation_from_multiple_iterables(self): - """Test creating pipeline from multiple iterables.""" - pipeline = Pipeline([1, 2], [3, 4], [5]) - result = pipeline.to_list() - assert result == [1, 2, 3, 4, 5] - - def test_pipeline_iteration(self): - """Test that pipeline can be iterated over.""" - pipeline = Pipeline([1, 2, 3]) - result = list(pipeline) - assert result == [1, 2, 3] - - def test_pipeline_to_list_multiple_calls(self): - """Test that to_list can be called multiple times (iterator consumption).""" - pipeline = Pipeline([1, 2, 3]) - first_result = pipeline.to_list() - second_result = pipeline.to_list() - # Note: This tests current behavior - iterator is consumed after first call - assert first_result == [1, 2, 3] - assert second_result == [] # Iterator is consumed - - -class TestPipelineApply: - """Test apply method with transformers.""" - - def test_apply_transformer(self): - """Test apply with chained transformer operations.""" - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 4) - pipeline = Pipeline([1, 2, 3, 4]).apply(transformer) - result = pipeline.to_list() - assert result == [6, 8] - - def test_apply_callable_function(self): - """Test apply with a callable function.""" - - def double_generator(data): - for item in data: - yield item * 2 - - pipeline = Pipeline([1, 2, 3]).apply(double_generator) - result = pipeline.to_list() - assert result == [2, 4, 6] - - -class TestPipelineTransform: - """Test transform shorthand method.""" - - def test_transform_chain_operations(self): - """Test transform with chained operations.""" - pipeline = Pipeline([1, 2, 3, 4]).transform(lambda t: t.map(lambda x: x * 2).filter(lambda x: x > 4)) - result = pipeline.to_list() - assert result == [6, 8] - - def test_transform_with_custom_transformer(self): - """Test transform with custom transformer class.""" - - def custom_transform(transformer: Transformer[int, int]) -> Transformer[int, int]: - return transformer.map(lambda x: x + 10).filter(lambda x: x > 12) - - pipeline = Pipeline([1, 2, 3]).transform(custom_transform) - result = pipeline.to_list() - assert result == [13] # [11, 12, 13] filtered to [13] - - -class TestPipelineTerminalOperations: - """Test terminal operations that consume the pipeline.""" - - def 
test_each_applies_function_to_all_elements(self): - """Test each applies function to all elements.""" - results = [] - pipeline = Pipeline([1, 2, 3]) - pipeline.each(lambda x: results.append(x * 2)) - assert results == [2, 4, 6] - - def test_first_gets_first_n_elements(self): - """Test first gets first n elements.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - result = pipeline.first(3) - assert result == [1, 2, 3] - - def test_first_default_gets_one_element(self): - """Test first with default argument gets one element.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - result = pipeline.first() - assert result == [1] - - def test_first_with_small_dataset(self): - """Test first when requesting more elements than available.""" - pipeline = Pipeline([1, 2]) - result = pipeline.first(5) - assert result == [1, 2] - - def test_consume_processes_without_return(self): - """Test consume processes all elements without returning anything.""" - side_effects = [] - transformer = Transformer.init(int).tap(lambda x: side_effects.append(x)) - pipeline = Pipeline([1, 2, 3]).apply(transformer) - - result = pipeline.consume() - assert result is None - assert side_effects == [1, 2, 3] - - def test_to_list_returns_all_elements(self): - """Test to_list returns all processed elements.""" - pipeline = Pipeline([1, 2, 3, 4, 5]) - result = pipeline.to_list() - assert result == [1, 2, 3, 4, 5] - - -class TestPipelineChaining: - """Test chaining pipeline operations.""" - - def test_apply_then_terminal_operation(self): - """Test applying transformer then using terminal operation.""" - transformer = Transformer.init(int).map(lambda x: x * 2) - pipeline = Pipeline([1, 2, 3]).apply(transformer) - result = pipeline.first(2) - assert result == [2, 4] - - def test_multiple_transforms(self): - """Test applying multiple transforms.""" - pipeline = ( - Pipeline([1, 2, 3, 4]).transform(lambda t: t.map(lambda x: x * 2)).transform(lambda t: t.filter(lambda x: x > 4)) - ) - result = pipeline.to_list() - assert result == [6, 8] - - def test_transform_then_apply(self): - """Test transform followed by apply.""" - transformer = Transformer.init(int).filter(lambda x: x > 4) - pipeline = Pipeline([1, 2, 3, 4, 5]).transform(lambda t: t.map(lambda x: x * 2)).apply(transformer) - result = pipeline.to_list() - assert result == [6, 8, 10] - - -class TestPipelineDataTypes: - """Test pipeline with different data types.""" - - def test_pipeline_with_strings(self): - """Test pipeline with string data.""" - pipeline = Pipeline(["hello", "world"]).transform(lambda t: t.map(lambda x: x.upper())) - result = pipeline.to_list() - assert result == ["HELLO", "WORLD"] - - def test_pipeline_with_mixed_data(self): - """Test pipeline with mixed data types.""" - pipeline = Pipeline([1, "hello", 3.14]).transform(lambda t: t.map(lambda x: str(x))) - result = pipeline.to_list() - assert result == ["1", "hello", "3.14"] - - def test_pipeline_with_complex_objects(self): - """Test pipeline with complex objects.""" - data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}] - pipeline = Pipeline(data).transform(lambda t: t.map(lambda x: x["name"])) - result = pipeline.to_list() - assert result == ["Alice", "Bob"] - - -class TestPipelineEdgeCases: - """Test edge cases for pipeline.""" - - def test_empty_pipeline(self): - """Test pipeline with empty data.""" - pipeline = Pipeline([]) - result = pipeline.to_list() - assert result == [] - - def test_empty_pipeline_terminal_operations(self): - """Test terminal operations on empty pipeline.""" - # Test each - results = [] 
- pipeline_each = Pipeline([]) - pipeline_each.each(lambda x: results.append(x)) - assert results == [] - - # Test first - pipeline_first = Pipeline([]) - result = pipeline_first.first(5) - assert result == [] - - # Test consume - pipeline_consume = Pipeline([]) - result = pipeline_consume.consume() - assert result is None - - def test_single_element_pipeline(self): - """Test pipeline with single element.""" - pipeline = Pipeline([42]) - result = pipeline.to_list() - assert result == [42] - - def test_pipeline_type_preservation(self): - """Test that pipeline preserves and transforms types correctly.""" - # Start with integers - pipeline = Pipeline([1, 2, 3]) - int_result = pipeline.to_list() - assert all(isinstance(x, int) for x in int_result) - - # Transform to strings - pipeline = Pipeline([1, 2, 3]).transform(lambda t: t.map(lambda x: str(x))) - str_result = pipeline.to_list() - assert all(isinstance(x, str) for x in str_result) - assert str_result == ["1", "2", "3"] - - -class TestPipelinePerformance: - """Test pipeline performance characteristics.""" - - def test_large_dataset_processing(self): - """Test pipeline can handle large datasets.""" - large_data = list(range(10000)) - pipeline = Pipeline(large_data).transform(lambda t: t.map(lambda x: x * 2).filter(lambda x: x % 100 == 0)) - result = pipeline.to_list() - - # Should have every element divisible by 50, doubled (0, 100, 200, ..., 19900) - expected = [x * 2 for x in range(0, 10000, 50)] - assert result == expected - - def test_chunked_processing(self): - """Test that chunked processing works correctly.""" - # Use small chunk size to test chunking behavior - transformer = Transformer.init(int, chunk_size=10).map(lambda x: x + 1) - pipeline = Pipeline(list(range(100))).apply(transformer) - result = pipeline.to_list() - - expected = list(range(1, 101)) # [1, 2, 3, ..., 100] - assert result == expected diff --git a/tests/test_transformer.py b/tests/test_transformer.py deleted file mode 100644 index e970863..0000000 --- a/tests/test_transformer.py +++ /dev/null @@ -1,237 +0,0 @@ -"""Tests for the Transformer class.""" - -from efemel.pipeline.transformers.transformer import PipelineContext -from efemel.pipeline.transformers.transformer import Transformer - - -class TestTransformerBasics: - """Test basic transformer functionality.""" - - def test_init_creates_identity_transformer(self): - """Test that init creates an identity transformer.""" - transformer = Transformer.init(int) - result = list(transformer([1, 2, 3])) - assert result == [1, 2, 3] - - def test_init_with_chunk_size(self): - """Test init with custom chunk size.""" - transformer = Transformer.init(int, chunk_size=2) - assert transformer.chunk_size == 2 - result = list(transformer([1, 2, 3, 4])) - assert result == [1, 2, 3, 4] - - def test_call_executes_transformer(self): - """Test that calling transformer executes it on data.""" - transformer = Transformer.init(int) - result = list(transformer([1, 2, 3])) - assert result == [1, 2, 3] - - -class TestTransformerOperations: - """Test transformer operations like map, filter, etc.""" - - def test_map_transforms_elements(self): - """Test map transforms each element.""" - transformer = Transformer.init(int).map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) - assert result == [2, 4, 6] - - def test_map_with_context_aware_function(self): - """Test map with context-aware function.""" - context = PipelineContext({"multiplier": 3}) - transformer = Transformer().map(lambda x, ctx: x * ctx["multiplier"]) - result = list(transformer([1, 
2, 3], context)) - assert result == [3, 6, 9] - - def test_filter_keeps_matching_elements(self): - """Test filter keeps only matching elements.""" - transformer = Transformer.init(int).filter(lambda x: x % 2 == 0) - result = list(transformer([1, 2, 3, 4, 5, 6])) - assert result == [2, 4, 6] - - def test_filter_with_context_aware_function(self): - """Test filter with context-aware function.""" - context = PipelineContext({"threshold": 3}) - transformer = Transformer().filter(lambda x, ctx: x > ctx["threshold"]) - result = list(transformer([1, 2, 3, 4, 5], context)) - assert result == [4, 5] - - def test_flatten_list_of_lists(self): - """Test flatten with list of lists.""" - transformer = Transformer.init(list).flatten() - result = list(transformer([[1, 2], [3, 4], [5]])) - assert result == [1, 2, 3, 4, 5] - - def test_flatten_list_of_tuples(self): - """Test flatten with list of tuples.""" - transformer = Transformer.init(tuple).flatten() - result = list(transformer([(1, 2), (3, 4), (5,)])) - assert result == [1, 2, 3, 4, 5] - - def test_flatten_list_of_sets(self): - """Test flatten with list of sets.""" - transformer = Transformer.init(set).flatten() - result = list(transformer([{1, 2}, {3, 4}, {5}])) - # Sets are unordered, so we sort for comparison - assert sorted(result) == [1, 2, 3, 4, 5] - - def test_tap_applies_side_effect_without_modification(self): - """Test tap applies side effect without modifying data.""" - side_effects = [] - transformer = Transformer.init(int).tap(lambda x: side_effects.append(x)) - result = list(transformer([1, 2, 3])) - - assert result == [1, 2, 3] # Data unchanged - assert side_effects == [1, 2, 3] # Side effect applied - - def test_tap_with_context_aware_function(self): - """Test tap with context-aware function.""" - side_effects = [] - context = PipelineContext({"prefix": "item:"}) - transformer = Transformer().tap(lambda x, ctx: side_effects.append(f"{ctx['prefix']}{x}")) - result = list(transformer([1, 2, 3], context)) - - assert result == [1, 2, 3] - assert side_effects == ["item:1", "item:2", "item:3"] - - def test_apply_composes_transformers(self): - """Test apply composes transformers.""" - transformer1 = Transformer.init(int).map(lambda x: x * 2) - transformer2 = transformer1.apply(lambda t: t.filter(lambda x: x > 4)) - result = list(transformer2([1, 2, 3, 4])) - assert result == [6, 8] # [2, 4, 6, 8] filtered to [6, 8] - - -class TestTransformerChaining: - """Test chaining multiple transformer operations.""" - - def test_map_then_filter(self): - """Test map followed by filter.""" - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 4) - result = list(transformer([1, 2, 3, 4])) - assert result == [6, 8] - - def test_filter_then_map(self): - """Test filter followed by map.""" - transformer = Transformer.init(int).filter(lambda x: x % 2 == 0).map(lambda x: x * 3) - result = list(transformer([1, 2, 3, 4, 5, 6])) - assert result == [6, 12, 18] - - def test_map_flatten_filter(self): - """Test map, flatten, then filter.""" - transformer = Transformer.init(int).map(lambda x: [x, x * 2]).flatten().filter(lambda x: x > 3) - result = list(transformer([1, 2, 3])) - assert result == [4, 6] # [[1,2], [2,4], [3,6]] -> [1,2,2,4,3,6] -> [4,6] - - def test_complex_chain_with_tap(self): - """Test complex chain with tap for side effects.""" - side_effects = [] - transformer = ( - Transformer.init(int) - .map(lambda x: x * 2) - .tap(lambda x: side_effects.append(f"doubled: {x}")) - .filter(lambda x: x > 4) - .tap(lambda x: 
side_effects.append(f"filtered: {x}")) - ) - - result = list(transformer([1, 2, 3, 4])) - assert result == [6, 8] - assert side_effects == ["doubled: 2", "doubled: 4", "doubled: 6", "doubled: 8", "filtered: 6", "filtered: 8"] - - -class TestTransformerTerminalOperations: - """Test terminal operations like reduce.""" - - def test_reduce_sums_elements(self): - """Test reduce sums all elements.""" - transformer = Transformer.init(int) - reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) - result = list(reducer([1, 2, 3, 4])) - assert result == [10] - - def test_reduce_with_context_aware_function(self): - """Test reduce with context-aware function.""" - context = PipelineContext({"multiplier": 2}) - transformer = Transformer() - reducer = transformer.reduce(lambda acc, x, ctx: acc + (x * ctx["multiplier"]), initial=0) - result = list(reducer([1, 2, 3], context)) - assert result == [12] # (1*2) + (2*2) + (3*2) = 2 + 4 + 6 = 12 - - def test_reduce_with_map_chain(self): - """Test reduce after map transformation.""" - transformer = Transformer.init(int).map(lambda x: x * 2) - reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) - result = list(reducer([1, 2, 3])) - assert result == [12] # [2, 4, 6] summed = 12 - - -class TestTransformerChunking: - """Test chunking behavior.""" - - def test_chunking_with_small_chunk_size(self): - """Test transformer works correctly with small chunk sizes.""" - transformer = Transformer.init(int, chunk_size=2).map(lambda x: x * 2) - result = list(transformer([1, 2, 3, 4, 5])) - assert result == [2, 4, 6, 8, 10] - - def test_chunking_with_large_data(self): - """Test transformer works correctly with large datasets.""" - transformer = Transformer.init(int, chunk_size=100).map(lambda x: x + 1) - large_data = list(range(1000)) - result = list(transformer(large_data)) - expected = [x + 1 for x in large_data] - assert result == expected - - -class TestTransformerEdgeCases: - """Test edge cases for transformer.""" - - def test_empty_data(self): - """Test transformer with empty data.""" - transformer = Transformer.init(int).map(lambda x: x * 2) - result = list(transformer([])) - assert result == [] - - def test_single_element(self): - """Test transformer with single element.""" - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 0) - result = list(transformer([5])) - assert result == [10] - - def test_filter_removes_all_elements(self): - """Test filter that removes all elements.""" - transformer = Transformer.init(int).filter(lambda x: x > 100) - result = list(transformer([1, 2, 3])) - assert result == [] - - -class TestTransformerFromTransformer: - """Test the from_transformer class method.""" - - def test_from_transformer_copies_logic(self): - """Test that from_transformer copies transformation logic.""" - # Create a transformer with some operations - source = Transformer.init(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6) - - # Create new transformer from the source - target = Transformer.from_transformer(source) - - # Test both produce same results - data = [1, 2, 3, 4, 5] - source_result = list(source(data)) - target_result = list(target(data)) - - assert source_result == target_result - assert target.chunk_size == 50 # Chunk size should be copied - - def test_from_transformer_with_custom_parameters(self): - """Test from_transformer with custom parameters.""" - source = Transformer.init(int).map(lambda x: x * 2) - - target = Transformer.from_transformer(source, chunk_size=200) - - assert 
target.chunk_size == 200 # Custom chunk size - - # Should still have same transformation logic - data = [1, 2, 3] - assert list(source(data)) == list(target(data)) diff --git a/uv.lock b/uv.lock index 9476a50..f8ff7d8 100644 --- a/uv.lock +++ b/uv.lock @@ -135,6 +135,7 @@ name = "efemel" source = { editable = "." } dependencies = [ { name = "click" }, + { name = "laygo" }, ] [package.optional-dependencies] @@ -148,6 +149,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "click", specifier = ">=8.0.0" }, + { name = "laygo", specifier = ">=0.1.1" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0.0" }, { name = "pytest-xdist", extras = ["psutil"], marker = "extra == 'dev'", specifier = ">=3.8.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.1.0" }, @@ -253,6 +255,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d3/32/da7f44bcb1105d3e88a0b74ebdca50c59121d2ddf71c9e34ba47df7f3a56/keyring-25.6.0-py3-none-any.whl", hash = "sha256:552a3f7af126ece7ed5c89753650eec89c7eaae8617d0aa4d9ad2b75111266bd", size = 39085, upload-time = "2024-12-25T15:26:44.377Z" }, ] +[[package]] +name = "laygo" +version = "0.1.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/06/27/6656d1fb9d3bf5983902ed91604e2ed0be9beeeea5c0a38232b386029e45/laygo-0.1.1.tar.gz", hash = "sha256:b8a030cfc509f89e099317f5722c23ae3c3aa23ff6d6dbf38a8f857a72edb4ef", size = 44452, upload-time = "2025-07-16T18:20:47.478Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/74/0a/1ef73e31b24f10a68b69c2c891b619c2c0cebaa9338d08956c317131af55/laygo-0.1.1-py3-none-any.whl", hash = "sha256:28f804f8a873487f4b0e5e0502d0294a50a65a9285fe746560925571f941f7c3", size = 11933, upload-time = "2025-07-16T18:20:46.435Z" }, +] + [[package]] name = "markdown-it-py" version = "3.0.0"