diff --git a/efemel/pipeline/helpers.py b/efemel/pipeline/helpers.py
new file mode 100644
index 0000000..b03f933
--- /dev/null
+++ b/efemel/pipeline/helpers.py
@@ -0,0 +1,49 @@
+from collections.abc import Callable
+import inspect
+from typing import Any
+from typing import TypeGuard
+
+
+# --- Type Aliases ---
+class PipelineContext(dict):
+    """Generic, untyped context available to all pipeline operations."""
+
+    pass
+
+
+# Define the specific callables for clarity
+ContextAwareCallable = Callable[[Any, PipelineContext], Any]
+ContextAwareReduceCallable = Callable[[Any, Any, PipelineContext], Any]
+
+
+def get_function_param_count(func: Callable[..., Any]) -> int:
+    """
+    Returns the number of positional parameters a function accepts.
+    This is useful for determining whether a function is context-aware.
+    """
+    try:
+        sig = inspect.signature(func)
+        params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)]
+        return len(params)
+    except (ValueError, TypeError):
+        return 0
+
+
+def is_context_aware(func: Callable[..., Any]) -> TypeGuard[ContextAwareCallable]:
+    """
+    Checks if a function is "context-aware" by inspecting its signature.
+
+    This function uses a TypeGuard, allowing Mypy to narrow the type of
+    the checked function in conditional blocks.
+    """
+    return get_function_param_count(func) >= 2
+
+
+def is_context_aware_reduce(func: Callable[..., Any]) -> TypeGuard[ContextAwareReduceCallable]:
+    """
+    Checks if a reduce function is "context-aware" (accepts an accumulator, a value, and a context).
+
+    This function uses a TypeGuard, allowing Mypy to narrow the type of
+    the checked function in conditional blocks.
+    """
+    return get_function_param_count(func) >= 3
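Reviewer note: a quick sketch of how these guards classify callables (helper names below are illustrative). Since the check is purely arity-based, any function with two positional parameters passes `is_context_aware`, whether or not its second argument is really a context:

```python
from efemel.pipeline.helpers import PipelineContext, is_context_aware, is_context_aware_reduce

def double(x: int) -> int:  # one positional parameter -> plain
    return x * 2

def scale(x: int, ctx: PipelineContext) -> int:  # two -> treated as context-aware
    return x * ctx["factor"]

def add_scaled(acc: int, x: int, ctx: PipelineContext) -> int:  # three -> context-aware reducer
    return acc + x * ctx["factor"]

assert not is_context_aware(double)
assert is_context_aware(scale)
assert not is_context_aware_reduce(scale)
assert is_context_aware_reduce(add_scaled)
```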
""" - # The transformer is called with the current processed data, producing a new iterator - new_data = transformer(self.processed_data) - # Create a new pipeline with the transformed data - self.processed_data = new_data + + match transformer: + case Transformer(): + # If a Transformer instance is provided, use its __call__ method + self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore + case _ if callable(transformer): + # If a callable function is provided, call it with the current data and context + + if is_context_aware(transformer): + processed_transformer = transformer + else: + processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731 + + self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore + case _: + raise TypeError("Transformer must be a Transformer instance or a callable function") + return self # type: ignore def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]": diff --git a/efemel/pipeline/transformers/parallel.py b/efemel/pipeline/transformers/parallel.py new file mode 100644 index 0000000..bf616be --- /dev/null +++ b/efemel/pipeline/transformers/parallel.py @@ -0,0 +1,147 @@ +"""Parallel transformer implementation using multiple threads.""" + +from collections import deque +from collections.abc import Iterable +from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import Future +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait +import copy +from functools import partial +import itertools +import threading +from typing import TypedDict + +from .transformer import DEFAULT_CHUNK_SIZE +from .transformer import InternalTransformer +from .transformer import PipelineContext +from .transformer import Transformer + + +# --- Type Definitions --- +class ParallelPipelineContextType(TypedDict): + """A specific context type for parallel transformers that includes a lock.""" + + lock: threading.Lock + + +# --- Class Definition --- +class ParallelTransformer[In, Out](Transformer[In, Out]): + """ + A transformer that executes operations concurrently using multiple threads. + """ + + def __init__( + self, + max_workers: int = 4, + ordered: bool = True, + chunk_size: int = DEFAULT_CHUNK_SIZE, + transformer: InternalTransformer[In, Out] | None = None, + ): + """ + Initialize the parallel transformer. + + Args: + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. If False, results + are yielded as they complete. + chunk_size: Size of data chunks to process. + transformer: The transformation logic chain. + """ + super().__init__(chunk_size, transformer) + self.max_workers = max_workers + self.ordered = ordered + + @classmethod + def from_transformer[T, U]( + cls, + transformer: Transformer[T, U], + chunk_size: int | None = None, + max_workers: int = 4, + ordered: bool = True, + ) -> "ParallelTransformer[T, U]": + """ + Create a ParallelTransformer from an existing Transformer's logic. + + Args: + transformer: The base transformer to copy the transformation logic from. + chunk_size: Optional chunk size override. + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. + + Returns: + A new ParallelTransformer with the same transformation logic. 
+ """ + return cls( + chunk_size=chunk_size or transformer.chunk_size, + transformer=copy.deepcopy(transformer.transformer), # type: ignore + max_workers=max_workers, + ordered=ordered, + ) + + def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: + """ + Executes the transformer on data concurrently. + + A new `threading.Lock` is created and added to the context for each call + to ensure execution runs are isolated and thread-safe. + """ + # Determine the context for this run, passing it by reference as requested. + run_context = context or self.context + # Add a per-call lock for thread safety. + run_context["lock"] = threading.Lock() + + def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]: + """ + Process a single chunk by passing the chunk and context explicitly + to the transformer chain. This is safer and avoids mutating self. + """ + return self.transformer(chunk, shared_context) + + # Create a partial function with the run_context "baked in". + process_chunk_with_context = partial(process_chunk, shared_context=run_context) + + def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results in their original order.""" + futures: deque[Future[list[Out]]] = deque() + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + break + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + continue + + def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results as they complete.""" + futures = { + executor.submit(process_chunk_with_context, chunk) + for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + try: + chunk = next(chunks_iter) + futures.add(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + continue + + def result_iterator_manager() -> Iterator[Out]: + """Manage the thread pool and yield flattened results.""" + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = self._chunk_generator(data) + gen_func = _ordered_generator if self.ordered else _unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor) + for result_chunk in processed_chunks_iterator: + yield from result_chunk + + return result_iterator_manager() diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index 6076587..aab2dfa 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -1,46 +1,59 @@ from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator +import copy from functools import reduce -import inspect import itertools from typing import Any from typing import Self from typing import Union from typing import overload -# --- Type Aliases --- -type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] -type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] +from efemel.pipeline.helpers import PipelineContext +from 
diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py
index 6076587..aab2dfa 100644
--- a/efemel/pipeline/transformers/transformer.py
+++ b/efemel/pipeline/transformers/transformer.py
@@ -1,46 +1,59 @@
 from collections.abc import Callable
 from collections.abc import Iterable
 from collections.abc import Iterator
+import copy
 from functools import reduce
-import inspect
 import itertools
 from typing import Any
 from typing import Self
 from typing import Union
 from typing import overload
 
-# --- Type Aliases ---
-type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T]
-type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U]
+from efemel.pipeline.helpers import PipelineContext
+from efemel.pipeline.helpers import is_context_aware
+from efemel.pipeline.helpers import is_context_aware_reduce
+
+DEFAULT_CHUNK_SIZE = 1000
 
-class PipelineContext(dict):
-    """Global context available to all pipeline operations."""
+type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T]
+type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U]
 
-    pass
+# The internal transformer function signature is changed to explicitly accept a context.
+type InternalTransformer[In, Out] = Callable[[list[In], PipelineContext], list[Out]]
 
 
 class Transformer[In, Out]:
     """
-    Defines and composes data transformations (e.g., map, filter).
-
-    Transformers are callable and return a generator, applying the composed
-    transformation logic to an iterable data source.
+    Defines and composes data transformations by passing context explicitly.
     """
 
     def __init__(
        self,
-        chunk_size: int = 1000,
-        context: PipelineContext | None = None,
+        chunk_size: int = DEFAULT_CHUNK_SIZE,
+        transformer: InternalTransformer[In, Out] | None = None,
    ):
         self.chunk_size = chunk_size
-        self.context = context or PipelineContext()
-        self.transformer = lambda chunk: chunk
+        self.context: PipelineContext = PipelineContext()
+        # The default transformer now accepts and ignores a context argument.
+        self.transformer: InternalTransformer[In, Out] = transformer or (lambda chunk, ctx: chunk)  # type: ignore
 
     @classmethod
-    def init[T](cls, _type_hint: type[T], chunk_size: int = 1000) -> "Transformer[T, T]":
-        """Create a new identity pipeline with explicit type hint."""
-        return cls(chunk_size)  # type: ignore
+    def init[T](cls, _type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]":
+        """Create a new identity pipeline with an explicit type hint."""
+        return cls(chunk_size=chunk_size)  # type: ignore
+
+    @classmethod
+    def from_transformer[T, U](
+        cls,
+        transformer: "Transformer[T, U]",
+        chunk_size: int | None = None,
+    ) -> "Transformer[T, U]":
+        """Create a new transformer from an existing one, copying its logic."""
+        return cls(
+            chunk_size=chunk_size or transformer.chunk_size,
+            transformer=copy.deepcopy(transformer.transformer),  # type: ignore
+        )
 
     def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]:
         """Breaks an iterable into chunks of a specified size."""
@@ -48,91 +61,91 @@ def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]:
         while chunk := list(itertools.islice(data_iter, self.chunk_size)):
             yield chunk
 
-    def _pipe[U](self, operation: Callable[[list[Out]], list[U]]) -> "Transformer[In, U]":
-        """Composes the current transformer with a new chunk-wise operation."""
+    def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) -> "Transformer[In, U]":
+        """Composes the current transformer with a new context-aware operation."""
         prev_transformer = self.transformer
-        self.transformer = lambda chunk: operation(prev_transformer(chunk))
+        # The new transformer chain ensures the context `ctx` is passed at each step.
+        self.transformer = lambda chunk, ctx: operation(prev_transformer(chunk, ctx), ctx)  # type: ignore
         return self  # type: ignore
 
-    def _create_context_aware_function(self, func: Callable) -> Callable:
-        """Creates a function that correctly handles the context parameter."""
-        try:
-            sig = inspect.signature(func)
-            params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)]
-            if len(params) >= 2:
-                return lambda value: func(value, self.context)
-        except (ValueError, TypeError):
-            pass
-        return func
-
-    def _create_reduce_function(self, func: Callable) -> Callable:
-        """Creates a reduce function that correctly handles the context parameter."""
-        try:
-            sig = inspect.signature(func)
-            params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)]
-            if len(params) >= 3:
-                return lambda acc, value: func(acc, value, self.context)
-        except (ValueError, TypeError):
-            pass
-        return func
-
     def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]":
-        """Transforms elements in the pipeline."""
-        std_function = self._create_context_aware_function(function)
-        return self._pipe(lambda chunk: [std_function(x) for x in chunk])
+        """Transforms elements, passing context explicitly to the mapping function."""
+        if is_context_aware(function):
+            return self._pipe(lambda chunk, ctx: [function(x, ctx) for x in chunk])
+
+        return self._pipe(lambda chunk, _ctx: [function(x) for x in chunk])
 
     def filter(self, predicate: PipelineFunction[Out, bool]) -> "Transformer[In, Out]":
-        """Filters elements in the pipeline."""
-        std_predicate = self._create_context_aware_function(predicate)
-        return self._pipe(lambda chunk: [x for x in chunk if std_predicate(x)])
+        """Filters elements, passing context explicitly to the predicate function."""
+        if is_context_aware(predicate):
+            return self._pipe(lambda chunk, ctx: [x for x in chunk if predicate(x, ctx)])
+
+        return self._pipe(lambda chunk, _ctx: [x for x in chunk if predicate(x)])
 
     @overload
     def flatten[T](self: "Transformer[In, list[T]]") -> "Transformer[In, T]": ...
-
     @overload
     def flatten[T](self: "Transformer[In, tuple[T, ...]]") -> "Transformer[In, T]": ...
-
     @overload
     def flatten[T](self: "Transformer[In, set[T]]") -> "Transformer[In, T]": ...
     def flatten[T](
         self: Union["Transformer[In, list[T]]", "Transformer[In, tuple[T, ...]]", "Transformer[In, set[T]]"],
     ) -> "Transformer[In, T]":
-        """Flatten nested lists into a single list."""
-        return self._pipe(lambda chunk: [item for sublist in chunk for item in sublist])  # type: ignore
+        """Flattens nested lists; the context is passed through the operation."""
+        return self._pipe(lambda chunk, _ctx: [item for sublist in chunk for item in sublist])  # type: ignore
 
     def tap(self, function: PipelineFunction[Out, Any]) -> "Transformer[In, Out]":
         """Applies a side-effect function without modifying the data."""
-        std_function = self._create_context_aware_function(function)
 
-        def tap_operation(chunk: list[Out]) -> list[Out]:
-            for item in chunk:
-                std_function(item)
-            return chunk
+        if is_context_aware(function):
+            return self._pipe(lambda chunk, ctx: [x for x in chunk if function(x, ctx) or True])
 
-        return self._pipe(tap_operation)
+        return self._pipe(lambda chunk, _ctx: [x for x in chunk if function(x) or True])
 
     def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In, T]":
         """Apply another pipeline to the current one."""
         return t(self)
 
-    # Terminal operations
-    # These operations execute the transformer on a data source and yield results.
-    # If you want to operate on the results, you need to use a Pipeline and apply
-    # a different transformer to it.
-
-    def __call__(self, data: Iterable[In]) -> Iterator[Out]:
+    def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
         """
-        Executes the transformer on a data source (terminal operations).
+        Executes the transformer on a data source.
+
+        It uses the provided `context` by reference. If none is provided, it uses
+        the transformer's internal context.
         """
+        # Use the provided context by reference, or default to the instance's context.
+        run_context = context or self.context
+
         for chunk in self._chunk_generator(data):
-            yield from self.transformer(chunk)
+            # The context is now passed explicitly through the transformer chain.
+            yield from self.transformer(chunk, run_context)
 
-    def reduce[U](self, function: PipelineReduceFunction[Out, U], initial: U):
+    def reduce[U](self, function: PipelineReduceFunction[U, Out], initial: U):
         """Reduces elements to a single value (terminal operation)."""
-        reducer = self._create_reduce_function(function)
 
-        def _reduce(data: Iterable[In]) -> Iterator[U]:
-            yield reduce(reducer, self(data), initial)
+        if is_context_aware_reduce(function):
+
+            def _reduce_with_context(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]:
+                # The context for the run is determined here.
+                run_context = context or self.context
+
+                data_iterator = self(data, run_context)
+
+                def function_wrapper(acc: U, value: Out) -> U:
+                    return function(acc, value, run_context)
+
+                yield reduce(function_wrapper, data_iterator, initial)
+
+            return _reduce_with_context
+
+        # Not context-aware, so we adapt the function to ignore the context.
+        def _reduce(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]:
+            # The context for the run is determined here.
+            run_context = context or self.context
+
+            data_iterator = self(data, run_context)
+
+            yield reduce(function, data_iterator, initial)
 
         return _reduce
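Reviewer note: a sketch of the composed, per-call context flow — mixing a plain and a context-aware step in one chain, then a context-aware reduce. Context is now supplied at call time rather than at construction time; the key names are illustrative:

```python
from efemel.pipeline.transformers.transformer import PipelineContext, Transformer

ctx = PipelineContext({"multiplier": 10, "threshold": 25})

chain = (
    Transformer.init(int)
    .map(lambda x, c: x * c["multiplier"])  # context-aware: two parameters
    .filter(lambda x: x % 2 == 0)           # plain: one parameter
)
assert list(chain([1, 2, 3], ctx)) == [10, 20, 30]

# Three parameters -> is_context_aware_reduce picks the context-aware path.
summed = chain.reduce(lambda acc, x, c: acc + x if x > c["threshold"] else acc, initial=0)
assert list(summed([1, 2, 3], ctx)) == [30]
```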
diff --git a/performance_test.py b/performance_test.py
index 5dd0374..08dae5d 100644
--- a/performance_test.py
+++ b/performance_test.py
@@ -15,12 +15,13 @@
 import statistics
 import time
 
-from efemel.pipeline import Pipeline
+from efemel.pipeline.pipeline import Pipeline
+from efemel.pipeline.transformers.transformer import Transformer
 
 
 def generate_test_data(size: int = 1_000_000) -> list[int]:
     """Generate test data of specified size."""
-    return range(size)
+    return range(size)  # type: ignore
 
 
 def generator_approach(data: list[int]) -> list[int]:
@@ -54,9 +55,9 @@ def step4(items):
 def builtin_map_filter_approach(data: list[int]) -> list[int]:
     """Process data using built-in map/filter chaining."""
     result = filter(lambda x: x % 2 == 0, data)  # Keep even numbers
-    result = (x * 2 for x in result)  # Double them
+    result = (x * 2 for x in result)  # type: ignore
     result = filter(lambda x: x > 100, result)  # Keep only > 100
-    result = (x + 1 for x in result)  # Add 1
+    result = (x + 1 for x in result)  # type: ignore
     return list(result)
 
 
@@ -405,8 +406,8 @@ def chunked_pipeline_per_item_approach(data) -> list[int]:
     return list(PIPELINE_PER_ITEM.run(data))
 
 
-PIPELINE = (
-    Pipeline.init(int)
+PIPELINE_TRANSFORMER: Transformer = (
+    Transformer()
     .filter(lambda x: x % 2 == 0)  # Filter even numbers
     .map(lambda x: x * 2)  # Double them
     .filter(lambda x: x > 100)  # Filter > 100
@@ -416,7 +417,7 @@
 
 def pipeline_approach(data: list[int]) -> list[int]:
     """Process data using the Pipeline class."""
-    return PIPELINE.to_list(data)
+    return Pipeline(data).apply(PIPELINE_TRANSFORMER).to_list()
 
 
 def time_function(func, *args, **kwargs) -> float:
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..a2328d3
--- /dev/null
+++ b/test.py
@@ -0,0 +1,29 @@
+from efemel.pipeline.pipeline import Pipeline
+from efemel.pipeline.transformers.transformer import Transformer
+
+
+def generate_test_data(size: int = 1_000_000) -> list[int]:
+    """Generate test data of specified size."""
+    return range(size)  # type: ignore
+
+
+def run():
+    PIPELINE_TRANSFORMER = (
+        Transformer()
+        .filter(lambda x: x % 2 == 0)  # Filter even numbers
+        .map(lambda x: x * 2)  # Double them
+        .filter(lambda x: x > 100)  # Filter > 100
+        .map(lambda x: x + 1)
+    )
+
+    for i in range(20):
+        Pipeline(generate_test_data(1_000_000)).apply(PIPELINE_TRANSFORMER).to_list()
+        print(f"Finished run {i + 1}")
+
+
+if __name__ == "__main__":
+    try:
+        run()
+    except Exception as e:
+        print(f"\n❌ Test failed with error: {e}")
+        raise
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 1cbe1fc..7fc2f6b 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -1,6 +1,7 @@
 """Integration tests for Pipeline and Transformer working together."""
 
 from efemel.pipeline.pipeline import Pipeline
+from efemel.pipeline.transformers.parallel import ParallelTransformer
 from efemel.pipeline.transformers.transformer import PipelineContext
 from efemel.pipeline.transformers.transformer import Transformer
 
@@ -19,13 +20,11 @@ def test_basic_integration(self):
 
     def test_context_sharing(self):
         """Test that context is properly shared in transformations."""
-        context = PipelineContext({"multiplier": 3, "threshold": 5})
+        context = {"multiplier": 3, "threshold": 5}
ctx["multiplier"]).filter(lambda x, ctx: x > ctx["threshold"]) - ) + transformer = Transformer().map(lambda x, ctx: x * ctx["multiplier"]).filter(lambda x, ctx: x > ctx["threshold"]) - result = Pipeline([1, 2, 3]).apply(transformer).to_list() + result = Pipeline([1, 2, 3]).context(context).apply(transformer).to_list() assert result == [6, 9] # [3, 6, 9] filtered to [6, 9] def test_reduce_integration(self): @@ -219,3 +218,163 @@ def test_streaming_processing(self): results.extend(batch_result) assert results == [3, 6, 9, 12, 15, 18] + + +class TestPipelineParallelTransformerIntegration: + """Test Pipeline integration with ParallelTransformer and context modification.""" + + def test_pipeline_with_parallel_transformer_context_modification(self): + """Test pipeline calling parallel transformer that safely modifies context.""" + # Create context with statistics to be updated by parallel processing + context = PipelineContext({"processed_count": 0, "sum_total": 0, "max_value": 0}) + + def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: + """Safely increment context counters and transform the value.""" + with ctx["lock"]: + ctx["processed_count"] += 1 + ctx["sum_total"] += x + ctx["max_value"] = max(ctx["max_value"], x) + return x * 2 + + # Create parallel transformer that modifies context safely + parallel_transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) + parallel_transformer = parallel_transformer.map(safe_increment_and_transform) + + # Use pipeline to process data with parallel transformer + data = [1, 5, 3, 8, 2, 7, 4, 6] + result = Pipeline(data).context(context).apply(parallel_transformer).to_list() + + # Verify transformation results + expected_result = [x * 2 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context was safely modified by parallel workers + assert context["processed_count"] == len(data) + assert context["sum_total"] == sum(data) + assert context["max_value"] == max(data) + + def test_pipeline_accesses_context_after_parallel_processing(self): + """Test that pipeline can access context data modified by parallel transformer.""" + # Create context with counters + context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0}) + + def count_and_transform(x: int, ctx: PipelineContext) -> int: + """Count even/odd numbers and transform.""" + with ctx["lock"]: + ctx["items_processed"] += 1 + if x % 2 == 0: + ctx["even_count"] += 1 + else: + ctx["odd_count"] += 1 + return x * 3 + + parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) + parallel_transformer = parallel_transformer.map(count_and_transform) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + # Process with pipeline + pipeline = Pipeline(data).context(context) + result = pipeline.apply(parallel_transformer).to_list() + + # Access the modified context from pipeline + final_context = pipeline.ctx + + # Verify results + expected_result = [x * 3 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context statistics computed by parallel workers + assert final_context["items_processed"] == 10 + assert final_context["even_count"] == 5 # 2, 4, 6, 8, 10 + assert final_context["odd_count"] == 5 # 1, 3, 5, 7, 9 + + # Demonstrate context access after processing + total_processed = final_context["even_count"] + final_context["odd_count"] + assert total_processed == final_context["items_processed"] + + def test_multiple_parallel_transformers_sharing_context(self): + """Test multiple parallel 
+    def test_multiple_parallel_transformers_sharing_context(self):
+        """Test multiple parallel transformers modifying the same context."""
+        # Shared context for statistics across transformations
+        context = PipelineContext({"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0})
+
+        def stage1_processor(x: int, ctx: PipelineContext) -> int:
+            """First stage processing with context update."""
+            with ctx["lock"]:
+                ctx["stage1_processed"] += 1
+                ctx["total_sum"] += x
+            return x * 2
+
+        def stage2_processor(x: int, ctx: PipelineContext) -> int:
+            """Second stage processing with context update."""
+            with ctx["lock"]:
+                ctx["stage2_processed"] += 1
+                ctx["total_sum"] += x  # Add transformed value too
+            return x + 10
+
+        # Create two parallel transformers
+        stage1 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage1_processor)
+        stage2 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage2_processor)
+
+        data = [1, 2, 3, 4, 5]
+
+        # Chain parallel transformers in pipeline
+        pipeline = Pipeline(data).context(context)
+        result = (
+            pipeline.apply(stage1)  # [2, 4, 6, 8, 10]
+            .apply(stage2)  # [12, 14, 16, 18, 20]
+            .to_list()
+        )
+
+        # Verify final results
+        expected_stage1 = [x * 2 for x in data]  # [2, 4, 6, 8, 10]
+        expected_final = [x + 10 for x in expected_stage1]  # [12, 14, 16, 18, 20]
+        assert result == expected_final
+
+        # Verify context reflects both stages
+        final_context = pipeline.ctx
+        assert final_context["stage1_processed"] == 5
+        assert final_context["stage2_processed"] == 5
+
+        # Total sum should include original values + transformed values
+        original_sum = sum(data)  # 1+2+3+4+5 = 15
+        stage1_sum = sum(expected_stage1)  # 2+4+6+8+10 = 30
+        expected_total = original_sum + stage1_sum  # 15 + 30 = 45
+        assert final_context["total_sum"] == expected_total
+
+    def test_pipeline_context_isolation_with_parallel_processing(self):
+        """Test that different pipeline instances have isolated contexts."""
+
+        # Create base context structure
+        def create_context():
+            return PipelineContext({"count": 0})
+
+        def increment_counter(x: int, ctx: PipelineContext) -> int:
+            """Increment counter in context."""
+            with ctx["lock"]:
+                ctx["count"] += 1
+            return x * 2
+
+        parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+        parallel_transformer = parallel_transformer.map(increment_counter)
+
+        data = [1, 2, 3]
+
+        # Create two separate pipeline instances with their own contexts
+        pipeline1 = Pipeline(data).context(create_context())
+        pipeline2 = Pipeline(data).context(create_context())
+
+        # Process with both pipelines
+        result1 = pipeline1.apply(parallel_transformer).to_list()
+        result2 = pipeline2.apply(parallel_transformer).to_list()
+
+        # Both should have same transformation results
+        assert result1 == [2, 4, 6]
+        assert result2 == [2, 4, 6]
+
+        # But contexts should be isolated
+        assert pipeline1.ctx["count"] == 3
+        assert pipeline2.ctx["count"] == 3
+
+        # Verify they are different context objects
+        assert pipeline1.ctx is not pipeline2.ctx
diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py
new file mode 100644
index 0000000..da98a4f
--- /dev/null
+++ b/tests/test_parallel_transformer.py
@@ -0,0 +1,357 @@
+"""Tests for the ParallelTransformer class."""
+
+import threading
+import time
+from unittest.mock import patch
+
+from efemel.pipeline.transformers.parallel import ParallelTransformer
+from efemel.pipeline.transformers.transformer import PipelineContext
+from efemel.pipeline.transformers.transformer import Transformer
+
+
+class TestParallelTransformerBasics:
+    """Test basic parallel transformer functionality."""
+
+    def test_init_creates_parallel_transformer(self):
+        """Test that init creates a parallel transformer with default values."""
+        transformer = ParallelTransformer[int, int]()
+        assert transformer.max_workers == 4
+        assert transformer.ordered is True
+        assert transformer.chunk_size == 1000
+
+    def test_init_with_custom_parameters(self):
+        """Test init with custom parameters."""
+        transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500)
+        assert transformer.max_workers == 8
+        assert transformer.ordered is False
+        assert transformer.chunk_size == 500
+
+    def test_call_executes_transformer_concurrently(self):
+        """Test that calling parallel transformer executes it on data."""
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3)
+        result = list(transformer([1, 2, 3, 4, 5]))
+        assert result == [1, 2, 3, 4, 5]
+
+    def test_from_transformer_class_method(self):
+        """Test creating ParallelTransformer from existing Transformer."""
+        # Create a regular transformer with some operations
+        regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5)
+
+        # Convert to parallel using the base class method
+        parallel = ParallelTransformer.from_transformer(regular, max_workers=2, ordered=True)
+
+        # Test both produce same results
+        data = [1, 2, 3, 4, 5, 6]
+        regular_results = list(regular(data))
+        parallel_results = list(parallel(data))
+
+        assert regular_results == parallel_results
+        assert parallel.max_workers == 2
+        assert parallel.ordered is True
+        assert parallel.chunk_size == 100
+
+
+class TestParallelTransformerOperations:
+    """Test parallel transformer operations."""
+
+    def test_map_transforms_elements_concurrently(self):
+        """Test map transforms each element using multiple threads."""
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2)
+        result = list(transformer([1, 2, 3, 4]))
+        assert result == [2, 4, 6, 8]
+
+    def test_filter_keeps_matching_elements_concurrently(self):
+        """Test filter keeps only matching elements using multiple threads."""
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0)
+        result = list(transformer([1, 2, 3, 4, 5, 6]))
+        assert result == [2, 4, 6]
+
+    def test_chained_operations_concurrently(self):
+        """Test chained operations work correctly with concurrency."""
+        transformer = (
+            ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+            .map(lambda x: x * 2)
+            .filter(lambda x: x > 4)
+            .map(lambda x: x + 1)
+        )
+        result = list(transformer([1, 2, 3, 4, 5]))
+        assert result == [7, 9, 11]  # [2,4,6,8,10] -> [6,8,10] -> [7,9,11]
+
+    def test_map_with_context_aware_function_concurrently(self):
+        """Test map with context-aware function in concurrent execution."""
+        context = PipelineContext({"multiplier": 3})
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+        transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"])
+        result = list(transformer([1, 2, 3], context))
+        assert result == [3, 6, 9]
+
+    def test_flatten_concurrently(self):
+        """Test flatten operation works with concurrent execution."""
+        transformer = ParallelTransformer[list, int](max_workers=2, chunk_size=2).flatten()
+        result = list(transformer([[1, 2], [3, 4], [5, 6]]))
+        assert result == [1, 2, 3, 4, 5, 6]
+
+    def test_tap_applies_side_effects_concurrently(self):
+        """Test tap applies side effects correctly in concurrent execution."""
+        side_effects = []
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+        transformer = transformer.tap(lambda x: side_effects.append(x))
+        result = list(transformer([1, 2, 3, 4]))
+
+        assert result == [1, 2, 3, 4]  # Data unchanged
+        assert sorted(side_effects) == [1, 2, 3, 4]  # Side effects applied (may be out of order)
+
+
+class TestParallelTransformerOrdering:
+    """Test ordering behavior of parallel transformer."""
+
+    def test_ordered_execution_maintains_order(self):
+        """Test that ordered=True maintains element order."""
+
+        def slow_transform(x: int) -> int:
+            # Simulate variable processing time
+            time.sleep(0.01 * (5 - x))  # Later elements process faster
+            return x * 2
+
+        transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=2)
+        transformer = transformer.map(slow_transform)
+        result = list(transformer([1, 2, 3, 4, 5]))
+
+        assert result == [2, 4, 6, 8, 10]  # Order maintained despite processing times
+
+    def test_unordered_execution_allows_reordering(self):
+        """Test that ordered=False allows results in completion order."""
+        transformer = ParallelTransformer[int, int](max_workers=2, ordered=False, chunk_size=1)
+        transformer = transformer.map(lambda x: x * 2)
+        result = list(transformer([1, 2, 3, 4]))
+
+        # Results should have the same elements, but order may vary
+        assert sorted(result) == [2, 4, 6, 8]
+
+    def test_ordered_vs_unordered_same_results(self):
+        """Test that ordered and unordered produce same elements, just different order."""
+        data = list(range(10))
+
+        ordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=3)
+        ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data))
+
+        unordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=False, chunk_size=3)
+        unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data))
+
+        assert sorted(ordered_result) == sorted(unordered_result)
+        assert ordered_result == [x * 2 for x in data]  # Ordered maintains sequence
+
+
+class TestParallelTransformerPerformance:
+    """Test performance aspects of parallel transformer."""
+
+    def test_concurrent_faster_than_sequential_for_slow_operations(self):
+        """Test that concurrent execution is faster for slow, I/O-bound-style operations."""
+
+        def slow_operation(x: int) -> int:
+            time.sleep(0.01)  # 10ms delay
+            return x * 2
+
+        data = list(range(8))  # 8 items, 80ms total sequential time
+
+        # Sequential execution
+        start_time = time.time()
+        sequential = Transformer[int, int](chunk_size=4)
+        seq_result = list(sequential.map(slow_operation)(data))
+        seq_time = time.time() - start_time
+
+        # Concurrent execution
+        start_time = time.time()
+        concurrent = ParallelTransformer[int, int](max_workers=4, chunk_size=4)
+        conc_result = list(concurrent.map(slow_operation)(data))
+        conc_time = time.time() - start_time
+
+        # Results should be the same
+        assert seq_result == conc_result
+
+        # Concurrent should be faster (allowing some variance for thread overhead)
+        assert conc_time < seq_time * 0.8  # At least 20% faster
+
+    def test_thread_pool_properly_managed(self):
+        """Test that thread pool is properly created and cleaned up."""
+        with patch("efemel.pipeline.transformers.parallel.ThreadPoolExecutor") as mock_executor:
+            mock_executor.return_value.__enter__.return_value = mock_executor.return_value
+            mock_executor.return_value.__exit__.return_value = None
+            mock_executor.return_value.submit.return_value.result.return_value = [2, 4]
+
+            transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+            list(transformer([1, 2]))
+
+            # Verify ThreadPoolExecutor was created with correct max_workers
+            mock_executor.assert_called_with(max_workers=2)
+            # Verify context manager was used (enter/exit called)
+            mock_executor.return_value.__enter__.assert_called_once()
+            mock_executor.return_value.__exit__.assert_called_once()
+
+
+class TestParallelTransformerEdgeCases:
+    """Test edge cases for parallel transformer."""
+
+    def test_empty_data_concurrent(self):
+        """Test parallel transformer with empty data."""
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2)
+        result = list(transformer([]))
+        assert result == []
+
+    def test_single_element_concurrent(self):
+        """Test parallel transformer with single element."""
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+        transformer = transformer.map(lambda x: x * 2).filter(lambda x: x > 0)
+        result = list(transformer([5]))
+        assert result == [10]
+
+    def test_data_smaller_than_chunk_size(self):
+        """Test parallel transformer when data is smaller than chunk size."""
+        transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=100)
+        transformer = transformer.map(lambda x: x * 2)
+        result = list(transformer([1, 2, 3]))
+        assert result == [2, 4, 6]
+
+    def test_more_workers_than_chunks(self):
+        """Test parallel transformer when workers exceed number of chunks."""
+        transformer = ParallelTransformer[int, int](max_workers=10, chunk_size=2)
+        transformer = transformer.map(lambda x: x * 2)
+        result = list(transformer([1, 2, 3]))  # Only 2 chunks, but 10 workers
+        assert result == [2, 4, 6]
+
+    def test_exception_handling_in_concurrent_execution(self):
+        """Test that exceptions in worker threads are properly propagated."""
+
+        def failing_function(x: int) -> int:
+            if x == 3:
+                raise ValueError("Test exception")
+            return x * 2
+
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+        transformer = transformer.map(failing_function)
+
+        try:
+            list(transformer([1, 2, 3, 4]))
+            raise AssertionError("Expected exception was not raised")
+        except ValueError as e:
+            assert "Test exception" in str(e)
+
+
+class TestParallelTransformerChunking:
+    """Test chunking behavior with concurrent execution."""
+
+    def test_chunking_with_concurrent_execution(self):
+        """Test that chunking works correctly with concurrent execution."""
+        # Use a function that can help us verify chunking
+        processed_chunks = []
+
+        def chunk_tracking_function(x: int) -> int:
+            # This will help us see which items are processed together
+            processed_chunks.append(x)
+            return x * 2
+
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3)
+        transformer = transformer.map(chunk_tracking_function)
+        result = list(transformer([1, 2, 3, 4, 5, 6, 7]))
+
+        assert result == [2, 4, 6, 8, 10, 12, 14]
+        # All items should have been processed
+        assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7]
+
+    def test_large_chunk_size_concurrent(self):
+        """Test parallel transformer with large chunk size."""
+        transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=1000)
+        transformer = transformer.map(lambda x: x + 1)
+        large_data = list(range(100))  # Much smaller than chunk size
+        result = list(transformer(large_data))
+        expected = [x + 1 for x in large_data]
+        assert result == expected
+
+
+class TestParallelTransformerContextModification:
+    """Test context modification behavior with parallel transformer."""
+
"""Test that context modification with locking works correctly in concurrent execution.""" + # Create context with items counter and a lock for thread safety + context = PipelineContext({"items": 0, "_lock": threading.Lock()}) + + def increment_counter(x: int, ctx: PipelineContext) -> int: + """Increment the items counter in context thread-safely.""" + with ctx["_lock"]: + current_items = ctx["items"] + # Small delay to increase chance of race condition without locking + time.sleep(0.001) + ctx["items"] = current_items + 1 + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) + transformer = transformer.map(increment_counter) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + result = list(transformer(data, context)) + + # Verify results are correct + expected_result = [x * 2 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context was modified correctly - items should equal number of processed elements + assert context["items"] == len(data) + + def test_context_modification_without_locking_shows_race_condition(self): + """Test that context modification without locking can lead to race conditions.""" + # Create context without lock to demonstrate race condition + context = PipelineContext({"items": 0}) + + def unsafe_increment_counter(x: int, ctx: PipelineContext) -> int: + """Increment the items counter without thread safety.""" + current_items = ctx["items"] + # Delay to increase chance of race condition + time.sleep(0.001) + ctx["items"] = current_items + 1 + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) + transformer = transformer.map(unsafe_increment_counter) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + result = list(transformer(data, context)) + + # Results should still be correct + expected_result = [x * 2 for x in data] + assert sorted(result) == sorted(expected_result) + + # Without locking, the final count will likely be less than expected due to race conditions + # This test may occasionally pass by chance, but should usually fail + # We'll check it's at least 1 but likely less than the full count + assert context["items"] >= 1 + # In a race condition, this will typically be less than len(data) + # But we can't assert this reliably in a test, so we just verify it's reasonable + assert context["items"] <= len(data) + + def test_multiple_context_values_with_locking(self): + """Test modifying multiple context values safely in concurrent execution.""" + context = PipelineContext({"total_sum": 0, "item_count": 0, "max_value": 0, "_lock": threading.Lock()}) + + def update_statistics(x: int, ctx: PipelineContext) -> int: + """Update multiple statistics in context thread-safely.""" + with ctx["_lock"]: + ctx["total_sum"] += x + ctx["item_count"] += 1 + ctx["max_value"] = max(ctx["max_value"], x) + return x * 3 + + transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) + transformer = transformer.map(update_statistics) + + data = [1, 5, 3, 8, 2, 7, 4, 6] + result = list(transformer(data, context)) + + # Verify transformation results + expected_result = [x * 3 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context statistics were updated correctly + assert context["total_sum"] == sum(data) + assert context["item_count"] == len(data) + assert context["max_value"] == max(data) diff --git a/tests/test_transformer.py b/tests/test_transformer.py index b01109a..e970863 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ 
@@ -20,12 +20,6 @@ def test_init_with_chunk_size(self):
         result = list(transformer([1, 2, 3, 4]))
         assert result == [1, 2, 3, 4]
 
-    def test_init_with_context(self):
-        """Test init with custom context."""
-        context = PipelineContext({"key": "value"})
-        transformer = Transformer(context=context)
-        assert transformer.context == context
-
     def test_call_executes_transformer(self):
         """Test that calling transformer executes it on data."""
         transformer = Transformer.init(int)
@@ -45,8 +39,8 @@ def test_map_transforms_elements(self):
     def test_map_with_context_aware_function(self):
         """Test map with context-aware function."""
         context = PipelineContext({"multiplier": 3})
-        transformer = Transformer(context=context).map(lambda x, ctx: x * ctx["multiplier"])
-        result = list(transformer([1, 2, 3]))
+        transformer = Transformer().map(lambda x, ctx: x * ctx["multiplier"])
+        result = list(transformer([1, 2, 3], context))
         assert result == [3, 6, 9]
 
     def test_filter_keeps_matching_elements(self):
@@ -58,8 +52,8 @@ def test_filter_keeps_matching_elements(self):
     def test_filter_with_context_aware_function(self):
         """Test filter with context-aware function."""
         context = PipelineContext({"threshold": 3})
-        transformer = Transformer(context=context).filter(lambda x, ctx: x > ctx["threshold"])
-        result = list(transformer([1, 2, 3, 4, 5]))
+        transformer = Transformer().filter(lambda x, ctx: x > ctx["threshold"])
+        result = list(transformer([1, 2, 3, 4, 5], context))
         assert result == [4, 5]
 
     def test_flatten_list_of_lists(self):
@@ -94,8 +88,8 @@ def test_tap_with_context_aware_function(self):
         """Test tap with context-aware function."""
         side_effects = []
         context = PipelineContext({"prefix": "item:"})
-        transformer = Transformer(context=context).tap(lambda x, ctx: side_effects.append(f"{ctx['prefix']}{x}"))
-        result = list(transformer([1, 2, 3]))
+        transformer = Transformer().tap(lambda x, ctx: side_effects.append(f"{ctx['prefix']}{x}"))
+        result = list(transformer([1, 2, 3], context))
 
         assert result == [1, 2, 3]
         assert side_effects == ["item:1", "item:2", "item:3"]
@@ -158,9 +152,9 @@ def test_reduce_sums_elements(self):
     def test_reduce_with_context_aware_function(self):
         """Test reduce with context-aware function."""
         context = PipelineContext({"multiplier": 2})
-        transformer = Transformer(context=context)
+        transformer = Transformer()
         reducer = transformer.reduce(lambda acc, x, ctx: acc + (x * ctx["multiplier"]), initial=0)
-        result = list(reducer([1, 2, 3]))
+        result = list(reducer([1, 2, 3], context))
         assert result == [12]  # (1*2) + (2*2) + (3*2) = 2 + 4 + 6 = 12
 
     def test_reduce_with_map_chain(self):
@@ -209,3 +203,35 @@ def test_filter_removes_all_elements(self):
         transformer = Transformer.init(int).filter(lambda x: x > 100)
         result = list(transformer([1, 2, 3]))
         assert result == []
+
+
+class TestTransformerFromTransformer:
+    """Test the from_transformer class method."""
+
+    def test_from_transformer_copies_logic(self):
+        """Test that from_transformer copies transformation logic."""
+        # Create a transformer with some operations
+        source = Transformer.init(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6)
+
+        # Create new transformer from the source
+        target = Transformer.from_transformer(source)
+
+        # Test both produce same results
+        data = [1, 2, 3, 4, 5]
+        source_result = list(source(data))
+        target_result = list(target(data))
+
+        assert source_result == target_result
+        assert target.chunk_size == 50  # Chunk size should be copied
+
+    def test_from_transformer_with_custom_parameters(self):
+        """Test from_transformer with custom parameters."""
+        source = Transformer.init(int).map(lambda x: x * 2)
+
+        target = Transformer.from_transformer(source, chunk_size=200)
+
+        assert target.chunk_size == 200  # Custom chunk size
+
+        # Should still have same transformation logic
+        data = [1, 2, 3]
+        assert list(source(data)) == list(target(data))
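Reviewer note: one property the tests above leave implicit — extending a derived transformer should not mutate its source, since `_pipe` rebinds only the copy's chain. A minimal sketch of that intended independence (names illustrative):

```python
from efemel.pipeline.transformers.transformer import Transformer

base = Transformer.init(int).map(lambda x: x + 1)
fork = Transformer.from_transformer(base).map(lambda x: x * 100)

assert list(base([1, 2])) == [2, 3]      # base chain unchanged
assert list(fork([1, 2])) == [200, 300]  # fork got the extra step
```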