From 62d8c6df78b83498f197f759dd3bcb33f777e9cc Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 09:32:28 +0000 Subject: [PATCH 01/10] feat: concurrent transformer --- efemel/pipeline/transformers/transformer.py | 121 +++++++++ tests/test_transformer.py | 263 ++++++++++++++++++++ 2 files changed, 384 insertions(+) diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index 6076587..eeec58f 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -1,6 +1,11 @@ +from collections import deque from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait +import copy from functools import reduce import inspect import itertools @@ -136,3 +141,119 @@ def _reduce(data: Iterable[In]) -> Iterator[U]: yield reduce(reducer, self(data), initial) return _reduce + + +class ConcurrentTransformer[In, Out](Transformer[In, Out]): + """ + A transformer that executes operations concurrently using multiple threads. + + This transformer overrides the __call__ method to process data chunks + in parallel, yielding results as they become available. + """ + + def __init__( + self, + max_workers: int = 4, + ordered: bool = True, + chunk_size: int = 1000, + context: PipelineContext | None = None, + ): + """ + Initialize the concurrent transformer. + + Args: + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. If False, results + are yielded as they complete. + chunk_size: Size of data chunks to process. + context: Pipeline context for operations. + """ + super().__init__(chunk_size, context) + self.max_workers = max_workers + self.ordered = ordered + + @classmethod + def from_transformer[T, U]( + cls, transformer: Transformer[T, U], max_workers: int = 4, ordered: bool = True + ) -> "ConcurrentTransformer[T, U]": + """ + Create a ConcurrentTransformer from an existing Transformer. + + Args: + transformer: The base transformer to make concurrent. + max_workers: Maximum number of worker threads. + ordered: Whether to maintain order of results. + + Returns: + A new ConcurrentTransformer with the same transformation logic. + """ + concurrent = cls(max_workers, ordered, transformer.chunk_size, copy.deepcopy(transformer.context)) + concurrent.transformer = copy.deepcopy(transformer.transformer) + return concurrent # type: ignore + + def __call__(self, data: Iterable[In]) -> Iterator[Out]: + """ + Executes the transformer on data concurrently, yielding results as processed. + + Args: + data: Input data to process. + + Returns: + Iterator yielding processed results. 
+ """ + + def process_chunk(chunk: list[In]) -> list[Out]: + """Process a single chunk using the transformer.""" + return self.transformer(chunk) + + def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results in original order.""" + from concurrent.futures import Future + + futures: deque[Future[list[Out]]] = deque() + + # Submit initial batch of work + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk, chunk)) + except StopIteration: + break + + # Process remaining chunks, maintaining order + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk, chunk)) + except StopIteration: + continue + + def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results as they complete (unordered).""" + # Submit initial batch of work + futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, self.max_workers + 1)} + + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + try: + chunk = next(chunks_iter) + futures.add(executor.submit(process_chunk, chunk)) + except StopIteration: + continue + + def result_iterator_manager() -> Iterator[Out]: + """Manage the thread pool and flatten results.""" + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = self._chunk_generator(data) + + gen_func = _ordered_generator if self.ordered else _unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor) + + # Flatten the iterator of chunks into an iterator of items + for result_chunk in processed_chunks_iterator: + yield from result_chunk + + return result_iterator_manager() diff --git a/tests/test_transformer.py b/tests/test_transformer.py index b01109a..f7d12a2 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ -1,5 +1,9 @@ """Tests for the Transformer class.""" +import time +from unittest.mock import patch + +from efemel.pipeline.transformers.transformer import ConcurrentTransformer from efemel.pipeline.transformers.transformer import PipelineContext from efemel.pipeline.transformers.transformer import Transformer @@ -209,3 +213,262 @@ def test_filter_removes_all_elements(self): transformer = Transformer.init(int).filter(lambda x: x > 100) result = list(transformer([1, 2, 3])) assert result == [] + + +class TestConcurrentTransformerBasics: + """Test basic concurrent transformer functionality.""" + + def test_init_creates_concurrent_transformer(self): + """Test that init creates a concurrent transformer with default values.""" + transformer = ConcurrentTransformer[int, int]() + assert transformer.max_workers == 4 + assert transformer.ordered is True + assert transformer.chunk_size == 1000 + + def test_init_with_custom_parameters(self): + """Test init with custom parameters.""" + context = PipelineContext({"key": "value"}) + transformer = ConcurrentTransformer[int, int](max_workers=8, ordered=False, chunk_size=500, context=context) + assert transformer.max_workers == 8 + assert transformer.ordered is False + assert transformer.chunk_size == 500 + assert transformer.context == context + + def test_call_executes_transformer_concurrently(self): + """Test that calling concurrent transformer executes it on data.""" + transformer = 
ConcurrentTransformer[int, int](max_workers=2, chunk_size=3) + result = list(transformer([1, 2, 3, 4, 5])) + assert result == [1, 2, 3, 4, 5] + + def test_from_transformer_class_method(self): + """Test creating ConcurrentTransformer from existing Transformer.""" + # Create a regular transformer with some operations + regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) + + # Convert to concurrent + concurrent = ConcurrentTransformer.from_transformer(regular, max_workers=2, ordered=True) + + # Test both produce same results + data = [1, 2, 3, 4, 5, 6] + regular_results = list(regular(data)) + concurrent_results = list(concurrent(data)) + + assert regular_results == concurrent_results + assert concurrent.max_workers == 2 + assert concurrent.ordered is True + assert concurrent.chunk_size == 100 + + +class TestConcurrentTransformerOperations: + """Test concurrent transformer operations.""" + + def test_map_transforms_elements_concurrently(self): + """Test map transforms each element using multiple threads.""" + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + result = list(transformer([1, 2, 3, 4])) + assert result == [2, 4, 6, 8] + + def test_filter_keeps_matching_elements_concurrently(self): + """Test filter keeps only matching elements using multiple threads.""" + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0) + result = list(transformer([1, 2, 3, 4, 5, 6])) + assert result == [2, 4, 6] + + def test_chained_operations_concurrently(self): + """Test chained operations work correctly with concurrency.""" + transformer = ( + ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) + .map(lambda x: x * 2) + .filter(lambda x: x > 4) + .map(lambda x: x + 1) + ) + result = list(transformer([1, 2, 3, 4, 5])) + assert result == [7, 9, 11] # [2,4,6,8,10] -> [6,8,10] -> [7,9,11] + + def test_map_with_context_aware_function_concurrently(self): + """Test map with context-aware function in concurrent execution.""" + context = PipelineContext({"multiplier": 3}) + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2, context=context) + transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) + result = list(transformer([1, 2, 3])) + assert result == [3, 6, 9] + + def test_flatten_concurrently(self): + """Test flatten operation works with concurrent execution.""" + transformer = ConcurrentTransformer[list, int](max_workers=2, chunk_size=2).flatten() + result = list(transformer([[1, 2], [3, 4], [5, 6]])) + assert result == [1, 2, 3, 4, 5, 6] + + def test_tap_applies_side_effects_concurrently(self): + """Test tap applies side effects correctly in concurrent execution.""" + side_effects = [] + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.tap(lambda x: side_effects.append(x)) + result = list(transformer([1, 2, 3, 4])) + + assert result == [1, 2, 3, 4] # Data unchanged + assert sorted(side_effects) == [1, 2, 3, 4] # Side effects applied (may be out of order) + + +class TestConcurrentTransformerOrdering: + """Test ordering behavior of concurrent transformer.""" + + def test_ordered_execution_maintains_order(self): + """Test that ordered=True maintains element order.""" + + def slow_transform(x: int) -> int: + # Simulate variable processing time + time.sleep(0.01 * (5 - x)) # Later elements process faster + return x * 2 + + transformer = ConcurrentTransformer[int, 
int](max_workers=3, ordered=True, chunk_size=2)
+    transformer = transformer.map(slow_transform)
+    result = list(transformer([1, 2, 3, 4, 5]))
+
+    assert result == [2, 4, 6, 8, 10]  # Order maintained despite processing times
+
+  def test_unordered_execution_allows_reordering(self):
+    """Test that ordered=False allows results in completion order."""
+    transformer = ConcurrentTransformer[int, int](max_workers=2, ordered=False, chunk_size=1)
+    transformer = transformer.map(lambda x: x * 2)
+    result = list(transformer([1, 2, 3, 4]))
+
+    # Results should have the same elements, but order may vary
+    assert sorted(result) == [2, 4, 6, 8]
+
+  def test_ordered_vs_unordered_same_results(self):
+    """Test that ordered and unordered produce same elements, just different order."""
+    data = list(range(10))
+
+    ordered_transformer = ConcurrentTransformer[int, int](max_workers=3, ordered=True, chunk_size=3)
+    ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data))
+
+    unordered_transformer = ConcurrentTransformer[int, int](max_workers=3, ordered=False, chunk_size=3)
+    unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data))
+
+    assert sorted(ordered_result) == sorted(unordered_result)
+    assert ordered_result == [x * 2 for x in data]  # Ordered maintains sequence
+
+
+class TestConcurrentTransformerPerformance:
+  """Test performance aspects of concurrent transformer."""
+
+  def test_concurrent_faster_than_sequential_for_slow_operations(self):
+    """Test that concurrent execution is faster for CPU-intensive operations."""
+
+    def slow_operation(x: int) -> int:
+      time.sleep(0.01)  # 10ms delay
+      return x * 2
+
+    data = list(range(8))  # 8 items, 80ms total sequential time
+
+    # Sequential execution
+    start_time = time.time()
+    sequential = Transformer[int, int](chunk_size=4)
+    seq_result = list(sequential.map(slow_operation)(data))
+    seq_time = time.time() - start_time
+
+    # Concurrent execution
+    start_time = time.time()
+    concurrent = ConcurrentTransformer[int, int](max_workers=4, chunk_size=4)
+    conc_result = list(concurrent.map(slow_operation)(data))
+    conc_time = time.time() - start_time
+
+    # Results should be the same
+    assert seq_result == conc_result
+
+    # Concurrent should be faster (allowing some variance for thread overhead)
+    assert conc_time < seq_time * 0.8  # At least 20% faster
+
+  def test_thread_pool_properly_managed(self):
+    """Test that thread pool is properly created and cleaned up."""
+    with patch("efemel.pipeline.transformers.transformer.ThreadPoolExecutor") as mock_executor:
+      mock_executor.return_value.__enter__.return_value = mock_executor.return_value
+      mock_executor.return_value.__exit__.return_value = None
+      mock_executor.return_value.submit.return_value.result.return_value = [2, 4]
+
+      transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2)
+      list(transformer([1, 2]))
+
+      # Verify ThreadPoolExecutor was created with correct max_workers
+      mock_executor.assert_called_with(max_workers=2)
+      # Verify context manager was used (enter/exit called)
+      mock_executor.return_value.__enter__.assert_called_once()
+      mock_executor.return_value.__exit__.assert_called_once()
+
+
+class TestConcurrentTransformerEdgeCases:
+  """Test edge cases for concurrent transformer."""
+
+  def test_empty_data_concurrent(self):
+    """Test concurrent transformer with empty data."""
+    transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2)
+    result = list(transformer([]))
+    assert result == []
+
+  def 
test_single_element_concurrent(self): + """Test concurrent transformer with single element.""" + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.map(lambda x: x * 2).filter(lambda x: x > 0) + result = list(transformer([5])) + assert result == [10] + + def test_data_smaller_than_chunk_size(self): + """Test concurrent transformer when data is smaller than chunk size.""" + transformer = ConcurrentTransformer[int, int](max_workers=4, chunk_size=100) + transformer = transformer.map(lambda x: x * 2) + result = list(transformer([1, 2, 3])) + assert result == [2, 4, 6] + + def test_more_workers_than_chunks(self): + """Test concurrent transformer when workers exceed number of chunks.""" + transformer = ConcurrentTransformer[int, int](max_workers=10, chunk_size=2) + transformer = transformer.map(lambda x: x * 2) + result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers + assert result == [2, 4, 6] + + def test_exception_handling_in_concurrent_execution(self): + """Test that exceptions in worker threads are properly propagated.""" + + def failing_function(x: int) -> int: + if x == 3: + raise ValueError("Test exception") + return x * 2 + + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.map(failing_function) + + try: + list(transformer([1, 2, 3, 4])) + raise AssertionError("Expected exception was not raised") + except ValueError as e: + assert "Test exception" in str(e) + + +class TestConcurrentTransformerChunking: + """Test chunking behavior with concurrent execution.""" + + def test_chunking_with_concurrent_execution(self): + """Test that chunking works correctly with concurrent execution.""" + # Use a function that can help us verify chunking + processed_chunks = [] + + def chunk_tracking_function(x: int) -> int: + # This will help us see which items are processed together + processed_chunks.append(x) + return x * 2 + + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=3) + transformer = transformer.map(chunk_tracking_function) + result = list(transformer([1, 2, 3, 4, 5, 6, 7])) + + assert result == [2, 4, 6, 8, 10, 12, 14] + # All items should have been processed + assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7] + + def test_large_chunk_size_concurrent(self): + """Test concurrent transformer with large chunk size.""" + transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=1000) + transformer = transformer.map(lambda x: x + 1) + large_data = list(range(100)) # Much smaller than chunk size + result = list(transformer(large_data)) + expected = [x + 1 for x in large_data] + assert result == expected From d96e16d45d33d946b9907ab3628e9b6964dca23e Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 10:06:32 +0000 Subject: [PATCH 02/10] chore: refactored parallel transformer --- efemel/pipeline/transformers/parallel.py | 149 +++++++++++ efemel/pipeline/transformers/transformer.py | 161 +++--------- tests/test_parallel_transformer.py | 269 +++++++++++++++++++ tests/test_transformer.py | 275 ++------------------ 4 files changed, 477 insertions(+), 377 deletions(-) create mode 100644 efemel/pipeline/transformers/parallel.py create mode 100644 tests/test_parallel_transformer.py diff --git a/efemel/pipeline/transformers/parallel.py b/efemel/pipeline/transformers/parallel.py new file mode 100644 index 0000000..c0c84d6 --- /dev/null +++ b/efemel/pipeline/transformers/parallel.py @@ -0,0 +1,149 @@ +"""Parallel transformer 
implementation using multiple threads.""" + +from collections import deque +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait +import copy +import itertools + +from .transformer import DEFAULT_CHUNK_SIZE +from .transformer import PipelineContext +from .transformer import Transformer + + +class ParallelTransformer[In, Out](Transformer[In, Out]): + """ + A transformer that executes operations concurrently using multiple threads. + + This transformer overrides the __call__ method to process data chunks + in parallel, yielding results as they become available. + """ + + def __init__( + self, + max_workers: int = 4, + ordered: bool = True, + chunk_size: int = DEFAULT_CHUNK_SIZE, + context: PipelineContext | None = None, + transformer: Callable[[list[In]], list[Out]] = lambda chunk: chunk, # type: ignore + ): + """ + Initialize the parallel transformer. + + Args: + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. If False, results + are yielded as they complete. + chunk_size: Size of data chunks to process. + context: Pipeline context for operations. + """ + super().__init__(chunk_size, context) + self.max_workers = max_workers + self.ordered = ordered + self.transformer = transformer + + def __call__(self, data: Iterable[In]) -> Iterator[Out]: + """ + Executes the transformer on data concurrently, yielding results as processed. + + Args: + data: Input data to process. + + Returns: + Iterator yielding processed results. + """ + + def process_chunk(chunk: list[In]) -> list[Out]: + """Process a single chunk using the transformer.""" + return self.transformer(chunk) + + def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results in original order.""" + from concurrent.futures import Future + + futures: deque[Future[list[Out]]] = deque() + + # Submit initial batch of work + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk, chunk)) + except StopIteration: + break + + # Process remaining chunks, maintaining order + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk, chunk)) + except StopIteration: + continue + + def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results as they complete (unordered).""" + # Submit initial batch of work + futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, self.max_workers + 1)} + + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + try: + chunk = next(chunks_iter) + futures.add(executor.submit(process_chunk, chunk)) + except StopIteration: + continue + + def result_iterator_manager() -> Iterator[Out]: + """Manage the thread pool and flatten results.""" + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = self._chunk_generator(data) + + gen_func = _ordered_generator if self.ordered else _unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor) + + # Flatten the iterator of chunks into an iterator of items + for result_chunk in processed_chunks_iterator: + 
yield from result_chunk + + return result_iterator_manager() + + @classmethod + def from_transformer[T, U]( + cls, + transformer: Transformer[T, U], + max_workers: int = 4, + ordered: bool = True, + chunk_size: int | None = None, + context: PipelineContext | None = None, + **kwargs, + ) -> "ParallelTransformer[T, U]": + """ + Create a ParallelTransformer from an existing Transformer. + + This method uses the base class implementation but ensures the result + is properly typed as a ParallelTransformer. + + Args: + transformer: The base transformer to copy from. + max_workers: Maximum number of worker threads. + ordered: Whether to maintain order of results. + **kwargs: Additional arguments passed to the constructor. + + Returns: + A new ParallelTransformer with the same transformation logic. + """ + # Pass the ParallelTransformer-specific parameters via kwargs + + return cls( + chunk_size=chunk_size or transformer.chunk_size, + context=context or copy.deepcopy(transformer.context), + transformer=copy.deepcopy(transformer.transformer), # type: ignore + max_workers=max_workers, + ordered=ordered, + ) # type: ignore diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index eeec58f..fe67113 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -1,10 +1,6 @@ -from collections import deque from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import wait import copy from functools import reduce import inspect @@ -14,6 +10,8 @@ from typing import Union from typing import overload +DEFAULT_CHUNK_SIZE = 1000 + # --- Type Aliases --- type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] @@ -35,17 +33,46 @@ class Transformer[In, Out]: def __init__( self, - chunk_size: int = 1000, + chunk_size: int = DEFAULT_CHUNK_SIZE, context: PipelineContext | None = None, + transformer: Callable[[list[In]], list[Out]] = lambda chunk: chunk, # type: ignore ): self.chunk_size = chunk_size self.context = context or PipelineContext() - self.transformer = lambda chunk: chunk + self.transformer = transformer @classmethod - def init[T](cls, _type_hint: type[T], chunk_size: int = 1000) -> "Transformer[T, T]": + def init[T](cls, _type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]": """Create a new identity pipeline with explicit type hint.""" - return cls(chunk_size) # type: ignore + return cls(chunk_size=chunk_size) # type: ignore + + @classmethod + def from_transformer[T, U]( + cls, + transformer: "Transformer[T, U]", + chunk_size: int | None = None, + context: PipelineContext | None = None, + ) -> "Transformer[T, U]": + """ + Create a new transformer from an existing transformer. + + This method copies the transformation logic and context from an existing + transformer and applies it to a new transformer instance of the target class. + + Args: + transformer: The base transformer to copy from. + **kwargs: Additional arguments to pass to the new transformer constructor. + + Returns: + A new transformer instance with the same transformation logic. 
+ """ + + # Create new instance with the transformer's chunk_size and context as defaults + return cls( + chunk_size=chunk_size or transformer.chunk_size, + context=context or copy.deepcopy(transformer.context), + transformer=copy.deepcopy(transformer.transformer), # type: ignore + ) def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]: """Breaks an iterable into chunks of a specified size.""" @@ -56,7 +83,7 @@ def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]: def _pipe[U](self, operation: Callable[[list[Out]], list[U]]) -> "Transformer[In, U]": """Composes the current transformer with a new chunk-wise operation.""" prev_transformer = self.transformer - self.transformer = lambda chunk: operation(prev_transformer(chunk)) + self.transformer = lambda chunk: operation(prev_transformer(chunk)) # type: ignore return self # type: ignore def _create_context_aware_function(self, func: Callable) -> Callable: @@ -141,119 +168,3 @@ def _reduce(data: Iterable[In]) -> Iterator[U]: yield reduce(reducer, self(data), initial) return _reduce - - -class ConcurrentTransformer[In, Out](Transformer[In, Out]): - """ - A transformer that executes operations concurrently using multiple threads. - - This transformer overrides the __call__ method to process data chunks - in parallel, yielding results as they become available. - """ - - def __init__( - self, - max_workers: int = 4, - ordered: bool = True, - chunk_size: int = 1000, - context: PipelineContext | None = None, - ): - """ - Initialize the concurrent transformer. - - Args: - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. If False, results - are yielded as they complete. - chunk_size: Size of data chunks to process. - context: Pipeline context for operations. - """ - super().__init__(chunk_size, context) - self.max_workers = max_workers - self.ordered = ordered - - @classmethod - def from_transformer[T, U]( - cls, transformer: Transformer[T, U], max_workers: int = 4, ordered: bool = True - ) -> "ConcurrentTransformer[T, U]": - """ - Create a ConcurrentTransformer from an existing Transformer. - - Args: - transformer: The base transformer to make concurrent. - max_workers: Maximum number of worker threads. - ordered: Whether to maintain order of results. - - Returns: - A new ConcurrentTransformer with the same transformation logic. - """ - concurrent = cls(max_workers, ordered, transformer.chunk_size, copy.deepcopy(transformer.context)) - concurrent.transformer = copy.deepcopy(transformer.transformer) - return concurrent # type: ignore - - def __call__(self, data: Iterable[In]) -> Iterator[Out]: - """ - Executes the transformer on data concurrently, yielding results as processed. - - Args: - data: Input data to process. - - Returns: - Iterator yielding processed results. 
- """ - - def process_chunk(chunk: list[In]) -> list[Out]: - """Process a single chunk using the transformer.""" - return self.transformer(chunk) - - def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results in original order.""" - from concurrent.futures import Future - - futures: deque[Future[list[Out]]] = deque() - - # Submit initial batch of work - for _ in range(self.max_workers + 1): - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) - except StopIteration: - break - - # Process remaining chunks, maintaining order - while futures: - yield futures.popleft().result() - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) - except StopIteration: - continue - - def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results as they complete (unordered).""" - # Submit initial batch of work - futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, self.max_workers + 1)} - - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - try: - chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk, chunk)) - except StopIteration: - continue - - def result_iterator_manager() -> Iterator[Out]: - """Manage the thread pool and flatten results.""" - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - chunks_to_process = self._chunk_generator(data) - - gen_func = _ordered_generator if self.ordered else _unordered_generator - processed_chunks_iterator = gen_func(chunks_to_process, executor) - - # Flatten the iterator of chunks into an iterator of items - for result_chunk in processed_chunks_iterator: - yield from result_chunk - - return result_iterator_manager() diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py new file mode 100644 index 0000000..bdfea89 --- /dev/null +++ b/tests/test_parallel_transformer.py @@ -0,0 +1,269 @@ +"""Tests for the ParallelTransformer class.""" + +import time +from unittest.mock import patch + +from efemel.pipeline.transformers.parallel import ParallelTransformer +from efemel.pipeline.transformers.transformer import PipelineContext +from efemel.pipeline.transformers.transformer import Transformer + + +class TestParallelTransformerBasics: + """Test basic parallel transformer functionality.""" + + def test_init_creates_parallel_transformer(self): + """Test that init creates a parallel transformer with default values.""" + transformer = ParallelTransformer[int, int]() + assert transformer.max_workers == 4 + assert transformer.ordered is True + assert transformer.chunk_size == 1000 + + def test_init_with_custom_parameters(self): + """Test init with custom parameters.""" + context = PipelineContext({"key": "value"}) + transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500, context=context) + assert transformer.max_workers == 8 + assert transformer.ordered is False + assert transformer.chunk_size == 500 + assert transformer.context == context + + def test_call_executes_transformer_concurrently(self): + """Test that calling parallel transformer executes it on data.""" + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) + result = list(transformer([1, 2, 3, 4, 5])) + assert result == [1, 2, 3, 4, 5] + + def 
test_from_transformer_class_method(self): + """Test creating ParallelTransformer from existing Transformer.""" + # Create a regular transformer with some operations + regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) + + # Convert to parallel using the base class method + parallel = ParallelTransformer.from_transformer(regular, max_workers=2, ordered=True) + + # Test both produce same results + data = [1, 2, 3, 4, 5, 6] + regular_results = list(regular(data)) + parallel_results = list(parallel(data)) + + assert regular_results == parallel_results + assert parallel.max_workers == 2 + assert parallel.ordered is True + assert parallel.chunk_size == 100 + + +class TestParallelTransformerOperations: + """Test parallel transformer operations.""" + + def test_map_transforms_elements_concurrently(self): + """Test map transforms each element using multiple threads.""" + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + result = list(transformer([1, 2, 3, 4])) + assert result == [2, 4, 6, 8] + + def test_filter_keeps_matching_elements_concurrently(self): + """Test filter keeps only matching elements using multiple threads.""" + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0) + result = list(transformer([1, 2, 3, 4, 5, 6])) + assert result == [2, 4, 6] + + def test_chained_operations_concurrently(self): + """Test chained operations work correctly with concurrency.""" + transformer = ( + ParallelTransformer[int, int](max_workers=2, chunk_size=2) + .map(lambda x: x * 2) + .filter(lambda x: x > 4) + .map(lambda x: x + 1) + ) + result = list(transformer([1, 2, 3, 4, 5])) + assert result == [7, 9, 11] # [2,4,6,8,10] -> [6,8,10] -> [7,9,11] + + def test_map_with_context_aware_function_concurrently(self): + """Test map with context-aware function in concurrent execution.""" + context = PipelineContext({"multiplier": 3}) + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2, context=context) + transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) + result = list(transformer([1, 2, 3])) + assert result == [3, 6, 9] + + def test_flatten_concurrently(self): + """Test flatten operation works with concurrent execution.""" + transformer = ParallelTransformer[list, int](max_workers=2, chunk_size=2).flatten() + result = list(transformer([[1, 2], [3, 4], [5, 6]])) + assert result == [1, 2, 3, 4, 5, 6] + + def test_tap_applies_side_effects_concurrently(self): + """Test tap applies side effects correctly in concurrent execution.""" + side_effects = [] + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.tap(lambda x: side_effects.append(x)) + result = list(transformer([1, 2, 3, 4])) + + assert result == [1, 2, 3, 4] # Data unchanged + assert sorted(side_effects) == [1, 2, 3, 4] # Side effects applied (may be out of order) + + +class TestParallelTransformerOrdering: + """Test ordering behavior of parallel transformer.""" + + def test_ordered_execution_maintains_order(self): + """Test that ordered=True maintains element order.""" + + def slow_transform(x: int) -> int: + # Simulate variable processing time + time.sleep(0.01 * (5 - x)) # Later elements process faster + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=2) + transformer = transformer.map(slow_transform) + result = list(transformer([1, 2, 3, 4, 5])) + + assert result == [2, 4, 6, 8, 10] 
# Order maintained despite processing times + + def test_unordered_execution_allows_reordering(self): + """Test that ordered=False allows results in completion order.""" + transformer = ParallelTransformer[int, int](max_workers=2, ordered=False, chunk_size=1) + transformer = transformer.map(lambda x: x * 2) + result = list(transformer([1, 2, 3, 4])) + + # Results should have the same elements, but order may vary + assert sorted(result) == [2, 4, 6, 8] + + def test_ordered_vs_unordered_same_results(self): + """Test that ordered and unordered produce same elements, just different order.""" + data = list(range(10)) + + ordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=3) + ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data)) + + unordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=False, chunk_size=3) + unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data)) + + assert sorted(ordered_result) == sorted(unordered_result) + assert ordered_result == [x * 2 for x in data] # Ordered maintains sequence + + +class TestParallelTransformerPerformance: + """Test performance aspects of parallel transformer.""" + + def test_concurrent_faster_than_sequential_for_slow_operations(self): + """Test that concurrent execution is faster for CPU-intensive operations.""" + + def slow_operation(x: int) -> int: + time.sleep(0.01) # 10ms delay + return x * 2 + + data = list(range(8)) # 8 items, 80ms total sequential time + + # Sequential execution + start_time = time.time() + sequential = Transformer[int, int](chunk_size=4) + seq_result = list(sequential.map(slow_operation)(data)) + seq_time = time.time() - start_time + + # Concurrent execution + start_time = time.time() + concurrent = ParallelTransformer[int, int](max_workers=4, chunk_size=4) + conc_result = list(concurrent.map(slow_operation)(data)) + conc_time = time.time() - start_time + + # Results should be the same + assert seq_result == conc_result + + # Concurrent should be faster (allowing some variance for thread overhead) + assert conc_time < seq_time * 0.8 # At least 20% faster + + def test_thread_pool_properly_managed(self): + """Test that thread pool is properly created and cleaned up.""" + with patch("efemel.pipeline.transformers.parallel.ThreadPoolExecutor") as mock_executor: + mock_executor.return_value.__enter__.return_value = mock_executor.return_value + mock_executor.return_value.__exit__.return_value = None + mock_executor.return_value.submit.return_value.result.return_value = [2, 4] + + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + list(transformer([1, 2])) + + # Verify ThreadPoolExecutor was created with correct max_workers + mock_executor.assert_called_with(max_workers=2) + # Verify context manager was used (enter/exit called) + mock_executor.return_value.__enter__.assert_called_once() + mock_executor.return_value.__exit__.assert_called_once() + + +class TestParallelTransformerEdgeCases: + """Test edge cases for parallel transformer.""" + + def test_empty_data_concurrent(self): + """Test parallel transformer with empty data.""" + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + result = list(transformer([])) + assert result == [] + + def test_single_element_concurrent(self): + """Test parallel transformer with single element.""" + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.map(lambda x: x * 2).filter(lambda x: x 
> 0) + result = list(transformer([5])) + assert result == [10] + + def test_data_smaller_than_chunk_size(self): + """Test parallel transformer when data is smaller than chunk size.""" + transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=100) + transformer = transformer.map(lambda x: x * 2) + result = list(transformer([1, 2, 3])) + assert result == [2, 4, 6] + + def test_more_workers_than_chunks(self): + """Test parallel transformer when workers exceed number of chunks.""" + transformer = ParallelTransformer[int, int](max_workers=10, chunk_size=2) + transformer = transformer.map(lambda x: x * 2) + result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers + assert result == [2, 4, 6] + + def test_exception_handling_in_concurrent_execution(self): + """Test that exceptions in worker threads are properly propagated.""" + + def failing_function(x: int) -> int: + if x == 3: + raise ValueError("Test exception") + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.map(failing_function) + + try: + list(transformer([1, 2, 3, 4])) + raise AssertionError("Expected exception was not raised") + except ValueError as e: + assert "Test exception" in str(e) + + +class TestParallelTransformerChunking: + """Test chunking behavior with concurrent execution.""" + + def test_chunking_with_concurrent_execution(self): + """Test that chunking works correctly with concurrent execution.""" + # Use a function that can help us verify chunking + processed_chunks = [] + + def chunk_tracking_function(x: int) -> int: + # This will help us see which items are processed together + processed_chunks.append(x) + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) + transformer = transformer.map(chunk_tracking_function) + result = list(transformer([1, 2, 3, 4, 5, 6, 7])) + + assert result == [2, 4, 6, 8, 10, 12, 14] + # All items should have been processed + assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7] + + def test_large_chunk_size_concurrent(self): + """Test parallel transformer with large chunk size.""" + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=1000) + transformer = transformer.map(lambda x: x + 1) + large_data = list(range(100)) # Much smaller than chunk size + result = list(transformer(large_data)) + expected = [x + 1 for x in large_data] + assert result == expected diff --git a/tests/test_transformer.py b/tests/test_transformer.py index f7d12a2..b683bd4 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ -1,9 +1,5 @@ """Tests for the Transformer class.""" -import time -from unittest.mock import patch - -from efemel.pipeline.transformers.transformer import ConcurrentTransformer from efemel.pipeline.transformers.transformer import PipelineContext from efemel.pipeline.transformers.transformer import Transformer @@ -215,260 +211,35 @@ def test_filter_removes_all_elements(self): assert result == [] -class TestConcurrentTransformerBasics: - """Test basic concurrent transformer functionality.""" - - def test_init_creates_concurrent_transformer(self): - """Test that init creates a concurrent transformer with default values.""" - transformer = ConcurrentTransformer[int, int]() - assert transformer.max_workers == 4 - assert transformer.ordered is True - assert transformer.chunk_size == 1000 - - def test_init_with_custom_parameters(self): - """Test init with custom parameters.""" - context = PipelineContext({"key": "value"}) - transformer 
= ConcurrentTransformer[int, int](max_workers=8, ordered=False, chunk_size=500, context=context) - assert transformer.max_workers == 8 - assert transformer.ordered is False - assert transformer.chunk_size == 500 - assert transformer.context == context - - def test_call_executes_transformer_concurrently(self): - """Test that calling concurrent transformer executes it on data.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=3) - result = list(transformer([1, 2, 3, 4, 5])) - assert result == [1, 2, 3, 4, 5] +class TestTransformerFromTransformer: + """Test the from_transformer class method.""" - def test_from_transformer_class_method(self): - """Test creating ConcurrentTransformer from existing Transformer.""" - # Create a regular transformer with some operations - regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) + def test_from_transformer_copies_logic(self): + """Test that from_transformer copies transformation logic.""" + # Create a transformer with some operations + source = Transformer.init(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6) - # Convert to concurrent - concurrent = ConcurrentTransformer.from_transformer(regular, max_workers=2, ordered=True) + # Create new transformer from the source + target = Transformer.from_transformer(source) # Test both produce same results - data = [1, 2, 3, 4, 5, 6] - regular_results = list(regular(data)) - concurrent_results = list(concurrent(data)) - - assert regular_results == concurrent_results - assert concurrent.max_workers == 2 - assert concurrent.ordered is True - assert concurrent.chunk_size == 100 - - -class TestConcurrentTransformerOperations: - """Test concurrent transformer operations.""" - - def test_map_transforms_elements_concurrently(self): - """Test map transforms each element using multiple threads.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) - result = list(transformer([1, 2, 3, 4])) - assert result == [2, 4, 6, 8] - - def test_filter_keeps_matching_elements_concurrently(self): - """Test filter keeps only matching elements using multiple threads.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0) - result = list(transformer([1, 2, 3, 4, 5, 6])) - assert result == [2, 4, 6] - - def test_chained_operations_concurrently(self): - """Test chained operations work correctly with concurrency.""" - transformer = ( - ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) - .map(lambda x: x * 2) - .filter(lambda x: x > 4) - .map(lambda x: x + 1) - ) - result = list(transformer([1, 2, 3, 4, 5])) - assert result == [7, 9, 11] # [2,4,6,8,10] -> [6,8,10] -> [7,9,11] - - def test_map_with_context_aware_function_concurrently(self): - """Test map with context-aware function in concurrent execution.""" - context = PipelineContext({"multiplier": 3}) - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2, context=context) - transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) - result = list(transformer([1, 2, 3])) - assert result == [3, 6, 9] - - def test_flatten_concurrently(self): - """Test flatten operation works with concurrent execution.""" - transformer = ConcurrentTransformer[list, int](max_workers=2, chunk_size=2).flatten() - result = list(transformer([[1, 2], [3, 4], [5, 6]])) - assert result == [1, 2, 3, 4, 5, 6] - - def test_tap_applies_side_effects_concurrently(self): - """Test tap applies side 
effects correctly in concurrent execution.""" - side_effects = [] - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.tap(lambda x: side_effects.append(x)) - result = list(transformer([1, 2, 3, 4])) - - assert result == [1, 2, 3, 4] # Data unchanged - assert sorted(side_effects) == [1, 2, 3, 4] # Side effects applied (may be out of order) - - -class TestConcurrentTransformerOrdering: - """Test ordering behavior of concurrent transformer.""" - - def test_ordered_execution_maintains_order(self): - """Test that ordered=True maintains element order.""" - - def slow_transform(x: int) -> int: - # Simulate variable processing time - time.sleep(0.01 * (5 - x)) # Later elements process faster - return x * 2 - - transformer = ConcurrentTransformer[int, int](max_workers=3, ordered=True, chunk_size=2) - transformer = transformer.map(slow_transform) - result = list(transformer([1, 2, 3, 4, 5])) - - assert result == [2, 4, 6, 8, 10] # Order maintained despite processing times - - def test_unordered_execution_allows_reordering(self): - """Test that ordered=False allows results in completion order.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, ordered=False, chunk_size=1) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3, 4])) - - # Results should have the same elements, but order may vary - assert sorted(result) == [2, 4, 6, 8] - - def test_ordered_vs_unordered_same_results(self): - """Test that ordered and unordered produce same elements, just different order.""" - data = list(range(10)) - - ordered_transformer = ConcurrentTransformer[int, int](max_workers=3, ordered=True, chunk_size=3) - ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data)) - - unordered_transformer = ConcurrentTransformer[int, int](max_workers=3, ordered=False, chunk_size=3) - unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data)) - - assert sorted(ordered_result) == sorted(unordered_result) - assert ordered_result == [x * 2 for x in data] # Ordered maintains sequence - - -class TestConcurrentTransformerPerformance: - """Test performance aspects of concurrent transformer.""" - - def test_concurrent_faster_than_sequential_for_slow_operations(self): - """Test that concurrent execution is faster for CPU-intensive operations.""" - - def slow_operation(x: int) -> int: - time.sleep(0.01) # 10ms delay - return x * 2 + data = [1, 2, 3, 4, 5] + source_result = list(source(data)) + target_result = list(target(data)) - data = list(range(8)) # 8 items, 80ms total sequential time + assert source_result == target_result + assert target.chunk_size == 50 # Chunk size should be copied - # Sequential execution - start_time = time.time() - sequential = Transformer[int, int](chunk_size=4) - seq_result = list(sequential.map(slow_operation)(data)) - seq_time = time.time() - start_time + def test_from_transformer_with_custom_parameters(self): + """Test from_transformer with custom parameters.""" + source = Transformer.init(int).map(lambda x: x * 2) + context = PipelineContext({"custom": "value"}) - # Concurrent execution - start_time = time.time() - concurrent = ConcurrentTransformer[int, int](max_workers=4, chunk_size=4) - conc_result = list(concurrent.map(slow_operation)(data)) - conc_time = time.time() - start_time + target = Transformer.from_transformer(source, chunk_size=200, context=context) - # Results should be the same - assert seq_result == conc_result + assert target.chunk_size == 200 # Custom chunk size 
+ assert target.context == context # Custom context - # Concurrent should be faster (allowing some variance for thread overhead) - assert conc_time < seq_time * 0.8 # At least 20% faster def test_thread_pool_properly_managed(self): - """Test that thread pool is properly created and cleaned up.""" - with patch("efemel.pipeline.transformers.transformer.ThreadPoolExecutor") as mock_executor: - mock_executor.return_value.__enter__.return_value = mock_executor.return_value - mock_executor.return_value.__exit__.return_value = None - mock_executor.return_value.submit.return_value.result.return_value = [2, 4] - - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) - list(transformer([1, 2])) - - # Verify ThreadPoolExecutor was created with correct max_workers - mock_executor.assert_called_with(max_workers=2) - # Verify context manager was used (enter/exit called) - mock_executor.return_value.__enter__.assert_called_once() - mock_executor.return_value.__exit__.assert_called_once() - - -class TestConcurrentTransformerEdgeCases: - """Test edge cases for concurrent transformer.""" - - def test_empty_data_concurrent(self): - """Test concurrent transformer with empty data.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) - result = list(transformer([])) - assert result == [] - - def test_single_element_concurrent(self): - """Test concurrent transformer with single element.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(lambda x: x * 2).filter(lambda x: x > 0) - result = list(transformer([5])) - assert result == [10] - - def test_data_smaller_than_chunk_size(self): - """Test concurrent transformer when data is smaller than chunk size.""" - transformer = ConcurrentTransformer[int, int](max_workers=4, chunk_size=100) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) - assert result == [2, 4, 6] - - def test_more_workers_than_chunks(self): - """Test concurrent transformer when workers exceed number of chunks.""" - transformer = ConcurrentTransformer[int, int](max_workers=10, chunk_size=2) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers - assert result == [2, 4, 6] - - def test_exception_handling_in_concurrent_execution(self): - """Test that exceptions in worker threads are properly propagated.""" - - def failing_function(x: int) -> int: - if x == 3: - raise ValueError("Test exception") - return x * 2 - - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(failing_function) - - try: - list(transformer([1, 2, 3, 4])) - raise AssertionError("Expected exception was not raised") - except ValueError as e: - assert "Test exception" in str(e) - - -class TestConcurrentTransformerChunking: - """Test chunking behavior with concurrent execution.""" - - def test_chunking_with_concurrent_execution(self): - """Test that chunking works correctly with concurrent execution.""" - # Use a function that can help us verify chunking - processed_chunks = [] - - def chunk_tracking_function(x: int) -> int: - # This will help us see which items are processed together - processed_chunks.append(x) - return x * 2 - - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=3) - transformer = transformer.map(chunk_tracking_function) - result = list(transformer([1, 2, 3, 4, 5, 6, 7])) - - assert result == [2, 4, 6, 8, 10, 
12, 14] - # All items should have been processed - assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7] - - def test_large_chunk_size_concurrent(self): - """Test concurrent transformer with large chunk size.""" - transformer = ConcurrentTransformer[int, int](max_workers=2, chunk_size=1000) - transformer = transformer.map(lambda x: x + 1) - large_data = list(range(100)) # Much smaller than chunk size - result = list(transformer(large_data)) - expected = [x + 1 for x in large_data] - assert result == expected + # Should still have same transformation logic + data = [1, 2, 3] + assert list(source(data)) == list(target(data)) From eb43bef8cd8b58bb5397d410be5ce5eb4e6da9d6 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 10:13:26 +0000 Subject: [PATCH 03/10] fix: from_transformer type signature is invalid --- efemel/pipeline/transformers/parallel.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/efemel/pipeline/transformers/parallel.py b/efemel/pipeline/transformers/parallel.py index c0c84d6..f462b9f 100644 --- a/efemel/pipeline/transformers/parallel.py +++ b/efemel/pipeline/transformers/parallel.py @@ -117,11 +117,10 @@ def result_iterator_manager() -> Iterator[Out]: def from_transformer[T, U]( cls, transformer: Transformer[T, U], - max_workers: int = 4, - ordered: bool = True, chunk_size: int | None = None, context: PipelineContext | None = None, - **kwargs, + max_workers: int = 4, + ordered: bool = True, ) -> "ParallelTransformer[T, U]": """ Create a ParallelTransformer from an existing Transformer. @@ -131,19 +130,17 @@ def from_transformer[T, U]( Args: transformer: The base transformer to copy from. - max_workers: Maximum number of worker threads. - ordered: Whether to maintain order of results. - **kwargs: Additional arguments passed to the constructor. + chunk_size: Optional chunk size override. + context: Optional context override. + **kwargs: Additional arguments including max_workers and ordered. Returns: A new ParallelTransformer with the same transformation logic. """ - # Pass the ParallelTransformer-specific parameters via kwargs - return cls( chunk_size=chunk_size or transformer.chunk_size, context=context or copy.deepcopy(transformer.context), transformer=copy.deepcopy(transformer.transformer), # type: ignore max_workers=max_workers, ordered=ordered, - ) # type: ignore + ) From 3516da690793137ec8495181c4fd8565edca09e3 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 11:33:09 +0000 Subject: [PATCH 04/10] fix: context sharing and passing --- efemel/pipeline/pipeline.py | 15 ++++- efemel/pipeline/transformers/parallel.py | 67 ++++++++++++--------- efemel/pipeline/transformers/transformer.py | 40 +++++------- tests/test_integration.py | 9 +-- tests/test_parallel_transformer.py | 8 +-- tests/test_transformer.py | 26 +++----- 6 files changed, 83 insertions(+), 82 deletions(-) diff --git a/efemel/pipeline/pipeline.py b/efemel/pipeline/pipeline.py index 37cb288..631c197 100644 --- a/efemel/pipeline/pipeline.py +++ b/efemel/pipeline/pipeline.py @@ -28,13 +28,24 @@ class Pipeline[T]: def __init__(self, *data: Iterable[T]): self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0] self.processed_data: Iterator = iter(self.data_source) + self.ctx = PipelineContext() - def apply[U](self, transformer: Transformer[T, U] | Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]": + def context(self, ctx: PipelineContext) -> "Pipeline[T]": + """ + Sets the context for the pipeline. 
+ """ + self.ctx = ctx + return self + + def apply[U]( + self, transformer: Transformer[T, U] | Callable[[Iterable[T], PipelineContext], Iterator[U]] + ) -> "Pipeline[U]": """ Applies a transformer to the current data source. """ + # The transformer is called with the current processed data, producing a new iterator - new_data = transformer(self.processed_data) + new_data = transformer(self.processed_data, self.ctx) # type: ignore # Create a new pipeline with the transformed data self.processed_data = new_data return self # type: ignore diff --git a/efemel/pipeline/transformers/parallel.py b/efemel/pipeline/transformers/parallel.py index f462b9f..f7822d3 100644 --- a/efemel/pipeline/transformers/parallel.py +++ b/efemel/pipeline/transformers/parallel.py @@ -8,19 +8,25 @@ from concurrent.futures import ThreadPoolExecutor from concurrent.futures import wait import copy +from functools import partial import itertools +import threading +from typing import TypedDict from .transformer import DEFAULT_CHUNK_SIZE from .transformer import PipelineContext from .transformer import Transformer +class ParallelPipelineContextType(TypedDict): + """A specific context type for parallel transformers that includes a lock.""" + + lock: threading.Lock + + class ParallelTransformer[In, Out](Transformer[In, Out]): """ A transformer that executes operations concurrently using multiple threads. - - This transformer overrides the __call__ method to process data chunks - in parallel, yielding results as they become available. """ def __init__( @@ -28,7 +34,6 @@ def __init__( max_workers: int = 4, ordered: bool = True, chunk_size: int = DEFAULT_CHUNK_SIZE, - context: PipelineContext | None = None, transformer: Callable[[list[In]], list[Out]] = lambda chunk: chunk, # type: ignore ): """ @@ -37,57 +42,71 @@ def __init__( Args: max_workers: Maximum number of worker threads. ordered: If True, results are yielded in order. If False, results - are yielded as they complete. + are yielded as they complete. chunk_size: Size of data chunks to process. - context: Pipeline context for operations. """ - super().__init__(chunk_size, context) + super().__init__(chunk_size) self.max_workers = max_workers self.ordered = ordered self.transformer = transformer + # The lock is no longer created here. It will be created per-call. - def __call__(self, data: Iterable[In]) -> Iterator[Out]: + def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: """ Executes the transformer on data concurrently, yielding results as processed. + A new `threading.Lock` is created for each call to ensure execution runs + are isolated from each other. + Args: data: Input data to process. - - Returns: - Iterator yielding processed results. + context: A PipelineContext to be shared across all worker threads. + If provided, it will be merged with this transformer's context. """ + # Create a context scoped to this specific call to ensure thread safety + # and prevent state leakage between different runs. + if context: + self.context = context + + self.context["lock"] = threading.Lock() + + # A new lock is created for every call, ensuring each run is isolated. - def process_chunk(chunk: list[In]) -> list[Out]: + def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]: """Process a single chunk using the transformer.""" + self.context = shared_context return self.transformer(chunk) + # Use functools.partial to pass the call-specific context to every thread. 
+ process_chunk_with_context = partial(process_chunk, shared_context=self.context) + def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: """Generate results in original order.""" from concurrent.futures import Future futures: deque[Future[list[Out]]] = deque() - # Submit initial batch of work for _ in range(self.max_workers + 1): try: chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) + futures.append(executor.submit(process_chunk_with_context, chunk)) except StopIteration: break - # Process remaining chunks, maintaining order while futures: yield futures.popleft().result() try: chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) + futures.append(executor.submit(process_chunk_with_context, chunk)) except StopIteration: continue def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: """Generate results as they complete (unordered).""" - # Submit initial batch of work - futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, self.max_workers + 1)} + futures = { + executor.submit(process_chunk_with_context, chunk) + for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } while futures: done, futures = wait(futures, return_when=FIRST_COMPLETED) @@ -95,7 +114,7 @@ def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolEx yield future.result() try: chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk, chunk)) + futures.add(executor.submit(process_chunk_with_context, chunk)) except StopIteration: continue @@ -107,7 +126,6 @@ def result_iterator_manager() -> Iterator[Out]: gen_func = _ordered_generator if self.ordered else _unordered_generator processed_chunks_iterator = gen_func(chunks_to_process, executor) - # Flatten the iterator of chunks into an iterator of items for result_chunk in processed_chunks_iterator: yield from result_chunk @@ -118,28 +136,23 @@ def from_transformer[T, U]( cls, transformer: Transformer[T, U], chunk_size: int | None = None, - context: PipelineContext | None = None, max_workers: int = 4, ordered: bool = True, ) -> "ParallelTransformer[T, U]": """ Create a ParallelTransformer from an existing Transformer. - This method uses the base class implementation but ensures the result - is properly typed as a ParallelTransformer. - Args: transformer: The base transformer to copy from. chunk_size: Optional chunk size override. - context: Optional context override. - **kwargs: Additional arguments including max_workers and ordered. + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. Returns: A new ParallelTransformer with the same transformation logic. 
""" return cls( chunk_size=chunk_size or transformer.chunk_size, - context=context or copy.deepcopy(transformer.context), transformer=copy.deepcopy(transformer.transformer), # type: ignore max_workers=max_workers, ordered=ordered, diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index fe67113..c729a97 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -13,12 +13,12 @@ DEFAULT_CHUNK_SIZE = 1000 # --- Type Aliases --- -type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] -type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] +type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, "PipelineContext"], T] +type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, "PipelineContext"], U] class PipelineContext(dict): - """Global context available to all pipeline operations.""" + """Generic, untyped context available to all pipeline operations.""" pass @@ -34,11 +34,10 @@ class Transformer[In, Out]: def __init__( self, chunk_size: int = DEFAULT_CHUNK_SIZE, - context: PipelineContext | None = None, transformer: Callable[[list[In]], list[Out]] = lambda chunk: chunk, # type: ignore ): self.chunk_size = chunk_size - self.context = context or PipelineContext() + self.context: PipelineContext = PipelineContext() self.transformer = transformer @classmethod @@ -51,26 +50,16 @@ def from_transformer[T, U]( cls, transformer: "Transformer[T, U]", chunk_size: int | None = None, - context: PipelineContext | None = None, ) -> "Transformer[T, U]": """ Create a new transformer from an existing transformer. - This method copies the transformation logic and context from an existing - transformer and applies it to a new transformer instance of the target class. - - Args: - transformer: The base transformer to copy from. - **kwargs: Additional arguments to pass to the new transformer constructor. - - Returns: - A new transformer instance with the same transformation logic. + This method copies the transformation logic from an existing + transformer and applies it to a new transformer instance. The new + transformer will have its own fresh context. """ - - # Create new instance with the transformer's chunk_size and context as defaults return cls( chunk_size=chunk_size or transformer.chunk_size, - context=context or copy.deepcopy(transformer.context), transformer=copy.deepcopy(transformer.transformer), # type: ignore ) @@ -149,14 +138,15 @@ def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In return t(self) # Terminal operations - # These operations execute the transformer on a data source and yield results. - # If you want to operate on the results, you need to use a Pipeline and apply - # a different transformer to it. - - def __call__(self, data: Iterable[In]) -> Iterator[Out]: + def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: """ Executes the transformer on a data source (terminal operations). 
""" + if context: + self.context.update(context) + + print(self.context) + for chunk in self._chunk_generator(data): yield from self.transformer(chunk) @@ -164,7 +154,7 @@ def reduce[U](self, function: PipelineReduceFunction[Out, U], initial: U): """Reduces elements to a single value (terminal operation).""" reducer = self._create_reduce_function(function) - def _reduce(data: Iterable[In]) -> Iterator[U]: - yield reduce(reducer, self(data), initial) + def _reduce(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]: + yield reduce(reducer, self(data, context), initial) return _reduce diff --git a/tests/test_integration.py b/tests/test_integration.py index 1cbe1fc..944a4e2 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,7 +1,6 @@ """Integration tests for Pipeline and Transformer working together.""" from efemel.pipeline.pipeline import Pipeline -from efemel.pipeline.transformers.transformer import PipelineContext from efemel.pipeline.transformers.transformer import Transformer @@ -19,13 +18,11 @@ def test_basic_integration(self): def test_context_sharing(self): """Test that context is properly shared in transformations.""" - context = PipelineContext({"multiplier": 3, "threshold": 5}) + context = {"multiplier": 3, "threshold": 5} - transformer = ( - Transformer(context=context).map(lambda x, ctx: x * ctx["multiplier"]).filter(lambda x, ctx: x > ctx["threshold"]) - ) + transformer = Transformer().map(lambda x, ctx: x * ctx["multiplier"]).filter(lambda x, ctx: x > ctx["threshold"]) - result = Pipeline([1, 2, 3]).apply(transformer).to_list() + result = Pipeline([1, 2, 3]).context(context).apply(transformer).to_list() assert result == [6, 9] # [3, 6, 9] filtered to [6, 9] def test_reduce_integration(self): diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py index bdfea89..2ca7f7d 100644 --- a/tests/test_parallel_transformer.py +++ b/tests/test_parallel_transformer.py @@ -20,12 +20,10 @@ def test_init_creates_parallel_transformer(self): def test_init_with_custom_parameters(self): """Test init with custom parameters.""" - context = PipelineContext({"key": "value"}) - transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500, context=context) + transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500) assert transformer.max_workers == 8 assert transformer.ordered is False assert transformer.chunk_size == 500 - assert transformer.context == context def test_call_executes_transformer_concurrently(self): """Test that calling parallel transformer executes it on data.""" @@ -81,9 +79,9 @@ def test_chained_operations_concurrently(self): def test_map_with_context_aware_function_concurrently(self): """Test map with context-aware function in concurrent execution.""" context = PipelineContext({"multiplier": 3}) - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2, context=context) + transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) - result = list(transformer([1, 2, 3])) + result = list(transformer([1, 2, 3], context)) assert result == [3, 6, 9] def test_flatten_concurrently(self): diff --git a/tests/test_transformer.py b/tests/test_transformer.py index b683bd4..e970863 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ -20,12 +20,6 @@ def test_init_with_chunk_size(self): result = list(transformer([1, 2, 3, 4])) assert result == 
[1, 2, 3, 4] - def test_init_with_context(self): - """Test init with custom context.""" - context = PipelineContext({"key": "value"}) - transformer = Transformer(context=context) - assert transformer.context == context - def test_call_executes_transformer(self): """Test that calling transformer executes it on data.""" transformer = Transformer.init(int) @@ -45,8 +39,8 @@ def test_map_transforms_elements(self): def test_map_with_context_aware_function(self): """Test map with context-aware function.""" context = PipelineContext({"multiplier": 3}) - transformer = Transformer(context=context).map(lambda x, ctx: x * ctx["multiplier"]) - result = list(transformer([1, 2, 3])) + transformer = Transformer().map(lambda x, ctx: x * ctx["multiplier"]) + result = list(transformer([1, 2, 3], context)) assert result == [3, 6, 9] def test_filter_keeps_matching_elements(self): @@ -58,8 +52,8 @@ def test_filter_keeps_matching_elements(self): def test_filter_with_context_aware_function(self): """Test filter with context-aware function.""" context = PipelineContext({"threshold": 3}) - transformer = Transformer(context=context).filter(lambda x, ctx: x > ctx["threshold"]) - result = list(transformer([1, 2, 3, 4, 5])) + transformer = Transformer().filter(lambda x, ctx: x > ctx["threshold"]) + result = list(transformer([1, 2, 3, 4, 5], context)) assert result == [4, 5] def test_flatten_list_of_lists(self): @@ -94,8 +88,8 @@ def test_tap_with_context_aware_function(self): """Test tap with context-aware function.""" side_effects = [] context = PipelineContext({"prefix": "item:"}) - transformer = Transformer(context=context).tap(lambda x, ctx: side_effects.append(f"{ctx['prefix']}{x}")) - result = list(transformer([1, 2, 3])) + transformer = Transformer().tap(lambda x, ctx: side_effects.append(f"{ctx['prefix']}{x}")) + result = list(transformer([1, 2, 3], context)) assert result == [1, 2, 3] assert side_effects == ["item:1", "item:2", "item:3"] @@ -158,9 +152,9 @@ def test_reduce_sums_elements(self): def test_reduce_with_context_aware_function(self): """Test reduce with context-aware function.""" context = PipelineContext({"multiplier": 2}) - transformer = Transformer(context=context) + transformer = Transformer() reducer = transformer.reduce(lambda acc, x, ctx: acc + (x * ctx["multiplier"]), initial=0) - result = list(reducer([1, 2, 3])) + result = list(reducer([1, 2, 3], context)) assert result == [12] # (1*2) + (2*2) + (3*2) = 2 + 4 + 6 = 12 def test_reduce_with_map_chain(self): @@ -233,12 +227,10 @@ def test_from_transformer_copies_logic(self): def test_from_transformer_with_custom_parameters(self): """Test from_transformer with custom parameters.""" source = Transformer.init(int).map(lambda x: x * 2) - context = PipelineContext({"custom": "value"}) - target = Transformer.from_transformer(source, chunk_size=200, context=context) + target = Transformer.from_transformer(source, chunk_size=200) assert target.chunk_size == 200 # Custom chunk size - assert target.context == context # Custom context # Should still have same transformation logic data = [1, 2, 3] From a24a47c463830b8e173fe3f68484d0b04fd21c72 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 12:37:11 +0000 Subject: [PATCH 05/10] fix: context sharing across threads --- efemel/pipeline/helpers.py | 20 ++++ efemel/pipeline/pipeline.py | 35 +++++- efemel/pipeline/transformers/parallel.py | 110 ++++++++---------- efemel/pipeline/transformers/transformer.py | 117 ++++++++++---------- 4 files changed, 157 insertions(+), 125 deletions(-) create 
mode 100644 efemel/pipeline/helpers.py diff --git a/efemel/pipeline/helpers.py b/efemel/pipeline/helpers.py new file mode 100644 index 0000000..81d61c1 --- /dev/null +++ b/efemel/pipeline/helpers.py @@ -0,0 +1,20 @@ +from collections.abc import Callable +import inspect + + +def create_context_aware_function(func: Callable) -> Callable: + """ + Normalizes a user-provided function to always accept a context argument. + It no longer closes over `self.context`. + """ + try: + sig = inspect.signature(func) + params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] + # If the function already takes 2+ args, we assume it's context-aware. + if len(params) >= 2: + return func # type: ignore + except (ValueError, TypeError): + # This handles built-ins or other non-inspectable callables. + pass + # If the function takes only one argument, we adapt it to accept a context that it will ignore. + return lambda value, ctx: func(value) # type: ignore diff --git a/efemel/pipeline/pipeline.py b/efemel/pipeline/pipeline.py index 631c197..3afc79a 100644 --- a/efemel/pipeline/pipeline.py +++ b/efemel/pipeline/pipeline.py @@ -5,6 +5,9 @@ from typing import Any from typing import TypedDict from typing import TypeVar +from typing import overload + +from efemel.pipeline.helpers import create_context_aware_function from .transformers.transformer import Transformer @@ -37,17 +40,39 @@ def context(self, ctx: PipelineContext) -> "Pipeline[T]": self.ctx = ctx return self + @overload + def apply[U](self, transformer: Transformer[T, U]) -> "Pipeline[U]": ... + + @overload + def apply[U](self, transformer: Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]": ... + + @overload def apply[U]( - self, transformer: Transformer[T, U] | Callable[[Iterable[T], PipelineContext], Iterator[U]] + self, + transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]], + ) -> "Pipeline[U]": ... + + def apply[U]( + self, + transformer: Transformer[T, U] + | Callable[[Iterable[T]], Iterator[U]] + | Callable[[Iterable[T], PipelineContext], Iterator[U]], ) -> "Pipeline[U]": """ Applies a transformer to the current data source. 
""" - # The transformer is called with the current processed data, producing a new iterator - new_data = transformer(self.processed_data, self.ctx) # type: ignore - # Create a new pipeline with the transformed data - self.processed_data = new_data + match transformer: + case Transformer(): + # If a Transformer instance is provided, use its __call__ method + self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore + case _ if callable(transformer): + # If a callable function is provided, call it with the current data and context + processed_transformer = create_context_aware_function(transformer) # type: ignore + self.processed_data = processed_transformer(self.processed_data, self.ctx) + case _: + raise TypeError("Transformer must be a Transformer instance or a callable function") + return self # type: ignore def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]": diff --git a/efemel/pipeline/transformers/parallel.py b/efemel/pipeline/transformers/parallel.py index f7822d3..bf616be 100644 --- a/efemel/pipeline/transformers/parallel.py +++ b/efemel/pipeline/transformers/parallel.py @@ -1,10 +1,10 @@ """Parallel transformer implementation using multiple threads.""" from collections import deque -from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor from concurrent.futures import wait import copy @@ -14,16 +14,19 @@ from typing import TypedDict from .transformer import DEFAULT_CHUNK_SIZE +from .transformer import InternalTransformer from .transformer import PipelineContext from .transformer import Transformer +# --- Type Definitions --- class ParallelPipelineContextType(TypedDict): """A specific context type for parallel transformers that includes a lock.""" lock: threading.Lock +# --- Class Definition --- class ParallelTransformer[In, Out](Transformer[In, Out]): """ A transformer that executes operations concurrently using multiple threads. @@ -34,7 +37,7 @@ def __init__( max_workers: int = 4, ordered: bool = True, chunk_size: int = DEFAULT_CHUNK_SIZE, - transformer: Callable[[list[In]], list[Out]] = lambda chunk: chunk, # type: ignore + transformer: InternalTransformer[In, Out] | None = None, ): """ Initialize the parallel transformer. @@ -44,55 +47,70 @@ def __init__( ordered: If True, results are yielded in order. If False, results are yielded as they complete. chunk_size: Size of data chunks to process. + transformer: The transformation logic chain. """ - super().__init__(chunk_size) + super().__init__(chunk_size, transformer) self.max_workers = max_workers self.ordered = ordered - self.transformer = transformer - # The lock is no longer created here. It will be created per-call. - def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: + @classmethod + def from_transformer[T, U]( + cls, + transformer: Transformer[T, U], + chunk_size: int | None = None, + max_workers: int = 4, + ordered: bool = True, + ) -> "ParallelTransformer[T, U]": """ - Executes the transformer on data concurrently, yielding results as processed. - - A new `threading.Lock` is created for each call to ensure execution runs - are isolated from each other. + Create a ParallelTransformer from an existing Transformer's logic. Args: - data: Input data to process. - context: A PipelineContext to be shared across all worker threads. 
- If provided, it will be merged with this transformer's context. + transformer: The base transformer to copy the transformation logic from. + chunk_size: Optional chunk size override. + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. + + Returns: + A new ParallelTransformer with the same transformation logic. """ - # Create a context scoped to this specific call to ensure thread safety - # and prevent state leakage between different runs. - if context: - self.context = context + return cls( + chunk_size=chunk_size or transformer.chunk_size, + transformer=copy.deepcopy(transformer.transformer), # type: ignore + max_workers=max_workers, + ordered=ordered, + ) - self.context["lock"] = threading.Lock() + def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: + """ + Executes the transformer on data concurrently. - # A new lock is created for every call, ensuring each run is isolated. + A new `threading.Lock` is created and added to the context for each call + to ensure execution runs are isolated and thread-safe. + """ + # Determine the context for this run, passing it by reference as requested. + run_context = context or self.context + # Add a per-call lock for thread safety. + run_context["lock"] = threading.Lock() def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]: - """Process a single chunk using the transformer.""" - self.context = shared_context - return self.transformer(chunk) + """ + Process a single chunk by passing the chunk and context explicitly + to the transformer chain. This is safer and avoids mutating self. + """ + return self.transformer(chunk, shared_context) - # Use functools.partial to pass the call-specific context to every thread. - process_chunk_with_context = partial(process_chunk, shared_context=self.context) + # Create a partial function with the run_context "baked in". 
+ process_chunk_with_context = partial(process_chunk, shared_context=run_context) def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results in original order.""" - from concurrent.futures import Future - + """Generate results in their original order.""" futures: deque[Future[list[Out]]] = deque() - for _ in range(self.max_workers + 1): try: chunk = next(chunks_iter) futures.append(executor.submit(process_chunk_with_context, chunk)) except StopIteration: break - while futures: yield futures.popleft().result() try: @@ -102,12 +120,11 @@ def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExec continue def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results as they complete (unordered).""" + """Generate results as they complete.""" futures = { executor.submit(process_chunk_with_context, chunk) for chunk in itertools.islice(chunks_iter, self.max_workers + 1) } - while futures: done, futures = wait(futures, return_when=FIRST_COMPLETED) for future in done: @@ -119,41 +136,12 @@ def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolEx continue def result_iterator_manager() -> Iterator[Out]: - """Manage the thread pool and flatten results.""" + """Manage the thread pool and yield flattened results.""" with ThreadPoolExecutor(max_workers=self.max_workers) as executor: chunks_to_process = self._chunk_generator(data) - gen_func = _ordered_generator if self.ordered else _unordered_generator processed_chunks_iterator = gen_func(chunks_to_process, executor) - for result_chunk in processed_chunks_iterator: yield from result_chunk return result_iterator_manager() - - @classmethod - def from_transformer[T, U]( - cls, - transformer: Transformer[T, U], - chunk_size: int | None = None, - max_workers: int = 4, - ordered: bool = True, - ) -> "ParallelTransformer[T, U]": - """ - Create a ParallelTransformer from an existing Transformer. - - Args: - transformer: The base transformer to copy from. - chunk_size: Optional chunk size override. - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. - - Returns: - A new ParallelTransformer with the same transformation logic. 
- """ - return cls( - chunk_size=chunk_size or transformer.chunk_size, - transformer=copy.deepcopy(transformer.transformer), # type: ignore - max_workers=max_workers, - ordered=ordered, - ) diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index c729a97..66bfe3b 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -10,39 +10,43 @@ from typing import Union from typing import overload -DEFAULT_CHUNK_SIZE = 1000 +from efemel.pipeline.helpers import create_context_aware_function -# --- Type Aliases --- -type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, "PipelineContext"], T] -type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, "PipelineContext"], U] +DEFAULT_CHUNK_SIZE = 1000 +# --- Type Aliases --- class PipelineContext(dict): """Generic, untyped context available to all pipeline operations.""" pass +type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] +type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] + +# The internal transformer function signature is changed to explicitly accept a context. +type InternalTransformer[In, Out] = Callable[[list[In], PipelineContext], list[Out]] + + class Transformer[In, Out]: """ - Defines and composes data transformations (e.g., map, filter). - - Transformers are callable and return a generator, applying the composed - transformation logic to an iterable data source. + Defines and composes data transformations by passing context explicitly. """ def __init__( self, chunk_size: int = DEFAULT_CHUNK_SIZE, - transformer: Callable[[list[In]], list[Out]] = lambda chunk: chunk, # type: ignore + transformer: InternalTransformer[In, Out] | None = None, ): self.chunk_size = chunk_size self.context: PipelineContext = PipelineContext() - self.transformer = transformer + # The default transformer now accepts and ignores a context argument. + self.transformer: InternalTransformer[In, Out] = transformer or (lambda chunk, ctx: chunk) # type: ignore @classmethod def init[T](cls, _type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]": - """Create a new identity pipeline with explicit type hint.""" + """Create a new identity pipeline with an explicit type hint.""" return cls(chunk_size=chunk_size) # type: ignore @classmethod @@ -51,13 +55,7 @@ def from_transformer[T, U]( transformer: "Transformer[T, U]", chunk_size: int | None = None, ) -> "Transformer[T, U]": - """ - Create a new transformer from an existing transformer. - - This method copies the transformation logic from an existing - transformer and applies it to a new transformer instance. The new - transformer will have its own fresh context. 
- """ + """Create a new transformer from an existing one, copying its logic.""" return cls( chunk_size=chunk_size or transformer.chunk_size, transformer=copy.deepcopy(transformer.transformer), # type: ignore @@ -69,66 +67,56 @@ def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]: while chunk := list(itertools.islice(data_iter, self.chunk_size)): yield chunk - def _pipe[U](self, operation: Callable[[list[Out]], list[U]]) -> "Transformer[In, U]": - """Composes the current transformer with a new chunk-wise operation.""" + def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) -> "Transformer[In, U]": + """Composes the current transformer with a new context-aware operation.""" prev_transformer = self.transformer - self.transformer = lambda chunk: operation(prev_transformer(chunk)) # type: ignore + # The new transformer chain ensures the context `ctx` is passed at each step. + self.transformer = lambda chunk, ctx: operation(prev_transformer(chunk, ctx), ctx) # type: ignore return self # type: ignore - def _create_context_aware_function(self, func: Callable) -> Callable: - """Creates a function that correctly handles the context parameter.""" - try: - sig = inspect.signature(func) - params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] - if len(params) >= 2: - return lambda value: func(value, self.context) - except (ValueError, TypeError): - pass - return func - - def _create_reduce_function(self, func: Callable) -> Callable: - """Creates a reduce function that correctly handles the context parameter.""" + def _create_reduce_function(self, func: PipelineReduceFunction) -> Callable[[Any, Any, PipelineContext], Any]: + """Normalizes a user-provided reduce function to accept a context argument.""" try: sig = inspect.signature(func) params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] if len(params) >= 3: - return lambda acc, value: func(acc, value, self.context) + return func # type: ignore except (ValueError, TypeError): pass - return func + # Adapt a simple reducer (e.g., lambda acc, val: acc + val). + return lambda acc, value, ctx: func(acc, value) # type: ignore def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]": - """Transforms elements in the pipeline.""" - std_function = self._create_context_aware_function(function) - return self._pipe(lambda chunk: [std_function(x) for x in chunk]) + """Transforms elements, passing context explicitly to the mapping function.""" + std_function = create_context_aware_function(function) + # The operation passed to _pipe receives the chunk and context, and applies the function. + return self._pipe(lambda chunk, ctx: [std_function(x, ctx) for x in chunk]) def filter(self, predicate: PipelineFunction[Out, bool]) -> "Transformer[In, Out]": - """Filters elements in the pipeline.""" - std_predicate = self._create_context_aware_function(predicate) - return self._pipe(lambda chunk: [x for x in chunk if std_predicate(x)]) + """Filters elements, passing context explicitly to the predicate function.""" + std_predicate = create_context_aware_function(predicate) + return self._pipe(lambda chunk, ctx: [x for x in chunk if std_predicate(x, ctx)]) @overload def flatten[T](self: "Transformer[In, list[T]]") -> "Transformer[In, T]": ... - @overload def flatten[T](self: "Transformer[In, tuple[T, ...]]") -> "Transformer[In, T]": ... 
- @overload def flatten[T](self: "Transformer[In, set[T]]") -> "Transformer[In, T]": ... def flatten[T]( self: Union["Transformer[In, list[T]]", "Transformer[In, tuple[T, ...]]", "Transformer[In, set[T]]"], ) -> "Transformer[In, T]": - """Flatten nested lists into a single list.""" - return self._pipe(lambda chunk: [item for sublist in chunk for item in sublist]) # type: ignore + """Flattens nested lists; the context is passed through the operation.""" + return self._pipe(lambda chunk, ctx: [item for sublist in chunk for item in sublist]) # type: ignore def tap(self, function: PipelineFunction[Out, Any]) -> "Transformer[In, Out]": """Applies a side-effect function without modifying the data.""" - std_function = self._create_context_aware_function(function) + std_function = create_context_aware_function(function) - def tap_operation(chunk: list[Out]) -> list[Out]: + def tap_operation(chunk: list[Out], ctx: PipelineContext) -> list[Out]: for item in chunk: - std_function(item) + std_function(item, ctx) return chunk return self._pipe(tap_operation) @@ -137,24 +125,35 @@ def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In """Apply another pipeline to the current one.""" return t(self) - # Terminal operations def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: """ - Executes the transformer on a data source (terminal operations). - """ - if context: - self.context.update(context) + Executes the transformer on a data source. - print(self.context) + It uses the provided `context` by reference. If none is provided, it uses + the transformer's internal context. + """ + # Use the provided context by reference, or default to the instance's context. + run_context = context or self.context for chunk in self._chunk_generator(data): - yield from self.transformer(chunk) + # The context is now passed explicitly through the transformer chain. + yield from self.transformer(chunk, run_context) - def reduce[U](self, function: PipelineReduceFunction[Out, U], initial: U): + def reduce[U](self, function: PipelineReduceFunction[U, Out], initial: U): """Reduces elements to a single value (terminal operation).""" - reducer = self._create_reduce_function(function) + std_reducer = self._create_reduce_function(function) def _reduce(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]: - yield reduce(reducer, self(data, context), initial) + # The context for the run is determined here. + run_context = context or self.context + + # The generator now needs the context to pass to the transformer. + data_iterator = self(data, run_context) + + # We need a new reducer that curries the context for functools.reduce. 
+ def reducer_with_context(acc, value): + return std_reducer(acc, value, run_context) + + yield reduce(reducer_with_context, data_iterator, initial) return _reduce From e0078a713d39569194a29cac2076a9d39dea5ac6 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 12:47:12 +0000 Subject: [PATCH 06/10] chore: making sure context is correctly passed around --- tests/test_integration.py | 164 +++++++++++++++++++++++++++++ tests/test_parallel_transformer.py | 90 ++++++++++++++++ 2 files changed, 254 insertions(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index 944a4e2..d6f2123 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,6 +1,10 @@ """Integration tests for Pipeline and Transformer working together.""" +import threading + from efemel.pipeline.pipeline import Pipeline +from efemel.pipeline.transformers.parallel import ParallelTransformer +from efemel.pipeline.transformers.transformer import PipelineContext from efemel.pipeline.transformers.transformer import Transformer @@ -216,3 +220,163 @@ def test_streaming_processing(self): results.extend(batch_result) assert results == [3, 6, 9, 12, 15, 18] + + +class TestPipelineParallelTransformerIntegration: + """Test Pipeline integration with ParallelTransformer and context modification.""" + + def test_pipeline_with_parallel_transformer_context_modification(self): + """Test pipeline calling parallel transformer that safely modifies context.""" + # Create context with statistics to be updated by parallel processing + context = PipelineContext({"processed_count": 0, "sum_total": 0, "max_value": 0, "_lock": threading.Lock()}) + + def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: + """Safely increment context counters and transform the value.""" + with ctx["_lock"]: + ctx["processed_count"] += 1 + ctx["sum_total"] += x + ctx["max_value"] = max(ctx["max_value"], x) + return x * 2 + + # Create parallel transformer that modifies context safely + parallel_transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) + parallel_transformer = parallel_transformer.map(safe_increment_and_transform) + + # Use pipeline to process data with parallel transformer + data = [1, 5, 3, 8, 2, 7, 4, 6] + result = Pipeline(data).context(context).apply(parallel_transformer).to_list() + + # Verify transformation results + expected_result = [x * 2 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context was safely modified by parallel workers + assert context["processed_count"] == len(data) + assert context["sum_total"] == sum(data) + assert context["max_value"] == max(data) + + def test_pipeline_accesses_context_after_parallel_processing(self): + """Test that pipeline can access context data modified by parallel transformer.""" + # Create context with counters + context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0, "_lock": threading.Lock()}) + + def count_and_transform(x: int, ctx: PipelineContext) -> int: + """Count even/odd numbers and transform.""" + with ctx["_lock"]: + ctx["items_processed"] += 1 + if x % 2 == 0: + ctx["even_count"] += 1 + else: + ctx["odd_count"] += 1 + return x * 3 + + parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) + parallel_transformer = parallel_transformer.map(count_and_transform) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + # Process with pipeline + pipeline = Pipeline(data).context(context) + result = pipeline.apply(parallel_transformer).to_list() + + # 
Access the modified context from pipeline + final_context = pipeline.ctx + + # Verify results + expected_result = [x * 3 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context statistics computed by parallel workers + assert final_context["items_processed"] == 10 + assert final_context["even_count"] == 5 # 2, 4, 6, 8, 10 + assert final_context["odd_count"] == 5 # 1, 3, 5, 7, 9 + + # Demonstrate context access after processing + total_processed = final_context["even_count"] + final_context["odd_count"] + assert total_processed == final_context["items_processed"] + + def test_multiple_parallel_transformers_sharing_context(self): + """Test multiple parallel transformers modifying the same context.""" + # Shared context for statistics across transformations + context = PipelineContext({"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0, "_lock": threading.Lock()}) + + def stage1_processor(x: int, ctx: PipelineContext) -> int: + """First stage processing with context update.""" + with ctx["_lock"]: + ctx["stage1_processed"] += 1 + ctx["total_sum"] += x + return x * 2 + + def stage2_processor(x: int, ctx: PipelineContext) -> int: + """Second stage processing with context update.""" + with ctx["_lock"]: + ctx["stage2_processed"] += 1 + ctx["total_sum"] += x # Add transformed value too + return x + 10 + + # Create two parallel transformers + stage1 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage1_processor) + stage2 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage2_processor) + + data = [1, 2, 3, 4, 5] + + # Chain parallel transformers in pipeline + pipeline = Pipeline(data).context(context) + result = ( + pipeline.apply(stage1) # [2, 4, 6, 8, 10] + .apply(stage2) # [12, 14, 16, 18, 20] + .to_list() + ) + + # Verify final results + expected_stage1 = [x * 2 for x in data] # [2, 4, 6, 8, 10] + expected_final = [x + 10 for x in expected_stage1] # [12, 14, 16, 18, 20] + assert result == expected_final + + # Verify context reflects both stages + final_context = pipeline.ctx + assert final_context["stage1_processed"] == 5 + assert final_context["stage2_processed"] == 5 + + # Total sum should include original values + transformed values + original_sum = sum(data) # 1+2+3+4+5 = 15 + stage1_sum = sum(expected_stage1) # 2+4+6+8+10 = 30 + expected_total = original_sum + stage1_sum # 15 + 30 = 45 + assert final_context["total_sum"] == expected_total + + def test_pipeline_context_isolation_with_parallel_processing(self): + """Test that different pipeline instances have isolated contexts.""" + + # Create base context structure + def create_context(): + return PipelineContext({"count": 0, "_lock": threading.Lock()}) + + def increment_counter(x: int, ctx: PipelineContext) -> int: + """Increment counter in context.""" + with ctx["_lock"]: + ctx["count"] += 1 + return x * 2 + + parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + parallel_transformer = parallel_transformer.map(increment_counter) + + data = [1, 2, 3] + + # Create two separate pipeline instances with their own contexts + pipeline1 = Pipeline(data).context(create_context()) + pipeline2 = Pipeline(data).context(create_context()) + + # Process with both pipelines + result1 = pipeline1.apply(parallel_transformer).to_list() + result2 = pipeline2.apply(parallel_transformer).to_list() + + # Both should have same transformation results + assert result1 == [2, 4, 6] + assert result2 == [2, 4, 6] + + # But contexts should be isolated + assert 
pipeline1.ctx["count"] == 3 + assert pipeline2.ctx["count"] == 3 + + # Verify they are different context objects + assert pipeline1.ctx is not pipeline2.ctx diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py index 2ca7f7d..da98a4f 100644 --- a/tests/test_parallel_transformer.py +++ b/tests/test_parallel_transformer.py @@ -1,5 +1,6 @@ """Tests for the ParallelTransformer class.""" +import threading import time from unittest.mock import patch @@ -265,3 +266,92 @@ def test_large_chunk_size_concurrent(self): result = list(transformer(large_data)) expected = [x + 1 for x in large_data] assert result == expected + + +class TestParallelTransformerContextModification: + """Test context modification behavior with parallel transformer.""" + + def test_context_modification_with_locking(self): + """Test that context modification with locking works correctly in concurrent execution.""" + # Create context with items counter and a lock for thread safety + context = PipelineContext({"items": 0, "_lock": threading.Lock()}) + + def increment_counter(x: int, ctx: PipelineContext) -> int: + """Increment the items counter in context thread-safely.""" + with ctx["_lock"]: + current_items = ctx["items"] + # Small delay to increase chance of race condition without locking + time.sleep(0.001) + ctx["items"] = current_items + 1 + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) + transformer = transformer.map(increment_counter) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + result = list(transformer(data, context)) + + # Verify results are correct + expected_result = [x * 2 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context was modified correctly - items should equal number of processed elements + assert context["items"] == len(data) + + def test_context_modification_without_locking_shows_race_condition(self): + """Test that context modification without locking can lead to race conditions.""" + # Create context without lock to demonstrate race condition + context = PipelineContext({"items": 0}) + + def unsafe_increment_counter(x: int, ctx: PipelineContext) -> int: + """Increment the items counter without thread safety.""" + current_items = ctx["items"] + # Delay to increase chance of race condition + time.sleep(0.001) + ctx["items"] = current_items + 1 + return x * 2 + + transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) + transformer = transformer.map(unsafe_increment_counter) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + result = list(transformer(data, context)) + + # Results should still be correct + expected_result = [x * 2 for x in data] + assert sorted(result) == sorted(expected_result) + + # Without locking, the final count will likely be less than expected due to race conditions + # This test may occasionally pass by chance, but should usually fail + # We'll check it's at least 1 but likely less than the full count + assert context["items"] >= 1 + # In a race condition, this will typically be less than len(data) + # But we can't assert this reliably in a test, so we just verify it's reasonable + assert context["items"] <= len(data) + + def test_multiple_context_values_with_locking(self): + """Test modifying multiple context values safely in concurrent execution.""" + context = PipelineContext({"total_sum": 0, "item_count": 0, "max_value": 0, "_lock": threading.Lock()}) + + def update_statistics(x: int, ctx: PipelineContext) -> int: + """Update multiple statistics in context 
thread-safely.""" + with ctx["_lock"]: + ctx["total_sum"] += x + ctx["item_count"] += 1 + ctx["max_value"] = max(ctx["max_value"], x) + return x * 3 + + transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) + transformer = transformer.map(update_statistics) + + data = [1, 5, 3, 8, 2, 7, 4, 6] + result = list(transformer(data, context)) + + # Verify transformation results + expected_result = [x * 3 for x in data] + assert sorted(result) == sorted(expected_result) + + # Verify context statistics were updated correctly + assert context["total_sum"] == sum(data) + assert context["item_count"] == len(data) + assert context["max_value"] == max(data) From 02abc3797cfc85eee22bb41d6f3375ae04939a28 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 12:54:49 +0000 Subject: [PATCH 07/10] chore: don't set locks --- tests/test_integration.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/tests/test_integration.py b/tests/test_integration.py index d6f2123..7fc2f6b 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,7 +1,5 @@ """Integration tests for Pipeline and Transformer working together.""" -import threading - from efemel.pipeline.pipeline import Pipeline from efemel.pipeline.transformers.parallel import ParallelTransformer from efemel.pipeline.transformers.transformer import PipelineContext @@ -228,11 +226,11 @@ class TestPipelineParallelTransformerIntegration: def test_pipeline_with_parallel_transformer_context_modification(self): """Test pipeline calling parallel transformer that safely modifies context.""" # Create context with statistics to be updated by parallel processing - context = PipelineContext({"processed_count": 0, "sum_total": 0, "max_value": 0, "_lock": threading.Lock()}) + context = PipelineContext({"processed_count": 0, "sum_total": 0, "max_value": 0}) def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: """Safely increment context counters and transform the value.""" - with ctx["_lock"]: + with ctx["lock"]: ctx["processed_count"] += 1 ctx["sum_total"] += x ctx["max_value"] = max(ctx["max_value"], x) @@ -258,11 +256,11 @@ def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: def test_pipeline_accesses_context_after_parallel_processing(self): """Test that pipeline can access context data modified by parallel transformer.""" # Create context with counters - context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0, "_lock": threading.Lock()}) + context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0}) def count_and_transform(x: int, ctx: PipelineContext) -> int: """Count even/odd numbers and transform.""" - with ctx["_lock"]: + with ctx["lock"]: ctx["items_processed"] += 1 if x % 2 == 0: ctx["even_count"] += 1 @@ -298,18 +296,18 @@ def count_and_transform(x: int, ctx: PipelineContext) -> int: def test_multiple_parallel_transformers_sharing_context(self): """Test multiple parallel transformers modifying the same context.""" # Shared context for statistics across transformations - context = PipelineContext({"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0, "_lock": threading.Lock()}) + context = PipelineContext({"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0}) def stage1_processor(x: int, ctx: PipelineContext) -> int: """First stage processing with context update.""" - with ctx["_lock"]: + with ctx["lock"]: ctx["stage1_processed"] += 1 ctx["total_sum"] += x return x * 2 def 
stage2_processor(x: int, ctx: PipelineContext) -> int: """Second stage processing with context update.""" - with ctx["_lock"]: + with ctx["lock"]: ctx["stage2_processed"] += 1 ctx["total_sum"] += x # Add transformed value too return x + 10 @@ -349,11 +347,11 @@ def test_pipeline_context_isolation_with_parallel_processing(self): # Create base context structure def create_context(): - return PipelineContext({"count": 0, "_lock": threading.Lock()}) + return PipelineContext({"count": 0}) def increment_counter(x: int, ctx: PipelineContext) -> int: """Increment counter in context.""" - with ctx["_lock"]: + with ctx["lock"]: ctx["count"] += 1 return x * 2 From c536ad586faad8cb980fb5b8ae184bd751e286bf Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 13:09:22 +0000 Subject: [PATCH 08/10] chore: updated performance test --- performance_test.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/performance_test.py b/performance_test.py index 5dd0374..9f0a182 100644 --- a/performance_test.py +++ b/performance_test.py @@ -15,12 +15,13 @@ import statistics import time -from efemel.pipeline import Pipeline +from efemel.pipeline.pipeline import Pipeline +from efemel.pipeline.transformers.transformer import Transformer def generate_test_data(size: int = 1_000_000) -> list[int]: """Generate test data of specified size.""" - return range(size) + return range(size) # type: ignore def generator_approach(data: list[int]) -> list[int]: @@ -54,9 +55,9 @@ def step4(items): def builtin_map_filter_approach(data: list[int]) -> list[int]: """Process data using built-in map/filter chaining.""" result = filter(lambda x: x % 2 == 0, data) # Keep even numbers - result = (x * 2 for x in result) # Double them + result = (x * 2 for x in result) # type: ignore result = filter(lambda x: x > 100, result) # Keep only > 100 - result = (x + 1 for x in result) # Add 1 + result = (x + 1 for x in result) # type: ignore return list(result) @@ -405,8 +406,8 @@ def chunked_pipeline_per_item_approach(data) -> list[int]: return list(PIPELINE_PER_ITEM.run(data)) -PIPELINE = ( - Pipeline.init(int) +PIPELINE_TRANSFORMER = ( + Transformer() .filter(lambda x: x % 2 == 0) # Filter even numbers .map(lambda x: x * 2) # Double them .filter(lambda x: x > 100) # Filter > 100 @@ -416,7 +417,7 @@ def chunked_pipeline_per_item_approach(data) -> list[int]: def pipeline_approach(data: list[int]) -> list[int]: """Process data using the Pipeline class.""" - return PIPELINE.to_list(data) + return Pipeline(data).apply(PIPELINE_TRANSFORMER).to_list() def time_function(func, *args, **kwargs) -> float: From 9eb3e5ac94c24c835eeab4ec67c07070875d0fd9 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 13:48:59 +0000 Subject: [PATCH 09/10] fix: optimised function calls and the amount of wrappers --- efemel/pipeline/helpers.py | 35 +++++++++++++++++++++ efemel/pipeline/transformers/transformer.py | 24 +++++++------- test.py | 29 +++++++++++++++++ 3 files changed, 76 insertions(+), 12 deletions(-) create mode 100644 test.py diff --git a/efemel/pipeline/helpers.py b/efemel/pipeline/helpers.py index 81d61c1..3beb110 100644 --- a/efemel/pipeline/helpers.py +++ b/efemel/pipeline/helpers.py @@ -2,6 +2,41 @@ import inspect +def is_context_aware(func: Callable, min_params: int = 2) -> bool: + """ + Checks if a function is "context-aware" by inspecting its signature. + + A function is considered context-aware if it accepts a minimum number + of positional arguments. 
+ + - For standard maps/filters: `min_params=2` (e.g., `(value, context)`) + - For reduce functions: `min_params=3` (e.g., `(accumulator, value, context)`) + + Args: + func: The function or callable to inspect. + min_params: The minimum number of arguments required for the + function to be considered context-aware. + + Returns: + True if the function accepts the minimum number of arguments, + False otherwise. + """ + try: + # Get the function's signature. + sig = inspect.signature(func) + + # Filter for parameters that can be passed by position. + params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] + + # Check if the function meets the minimum parameter count. + return len(params) >= min_params + + except (ValueError, TypeError): + # This handles built-ins or other non-inspectable callables, + # which we assume are not context-aware. + return False + + def create_context_aware_function(func: Callable) -> Callable: """ Normalizes a user-provided function to always accept a context argument. diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index 66bfe3b..a348666 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -10,7 +10,7 @@ from typing import Union from typing import overload -from efemel.pipeline.helpers import create_context_aware_function +from efemel.pipeline.helpers import is_context_aware DEFAULT_CHUNK_SIZE = 1000 @@ -88,14 +88,17 @@ def _create_reduce_function(self, func: PipelineReduceFunction) -> Callable[[Any def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]": """Transforms elements, passing context explicitly to the mapping function.""" - std_function = create_context_aware_function(function) - # The operation passed to _pipe receives the chunk and context, and applies the function. - return self._pipe(lambda chunk, ctx: [std_function(x, ctx) for x in chunk]) + if is_context_aware(function): + return self._pipe(lambda chunk, ctx: [function(x, ctx) for x in chunk]) + + return self._pipe(lambda chunk, _ctx: [function(x) for x in chunk]) def filter(self, predicate: PipelineFunction[Out, bool]) -> "Transformer[In, Out]": """Filters elements, passing context explicitly to the predicate function.""" - std_predicate = create_context_aware_function(predicate) - return self._pipe(lambda chunk, ctx: [x for x in chunk if std_predicate(x, ctx)]) + if is_context_aware(predicate): + return self._pipe(lambda chunk, ctx: [x for x in chunk if predicate(x, ctx)]) + + return self._pipe(lambda chunk, _ctx: [x for x in chunk if predicate(x)]) @overload def flatten[T](self: "Transformer[In, list[T]]") -> "Transformer[In, T]": ... 
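For reference between these two hunks: the dispatch introduced above accepts both callable shapes, and the arity check in is_context_aware() now runs once when the step is composed rather than wrapping every element. The sketch below is not part of the patch; it mirrors the existing transformer tests, and the "multiplier" key is purely illustrative.

    from efemel.pipeline.transformers.transformer import PipelineContext
    from efemel.pipeline.transformers.transformer import Transformer

    doubler = Transformer().map(lambda x: x * 2)                      # one argument: context is ignored
    scaled = Transformer().map(lambda x, ctx: x * ctx["multiplier"])  # two arguments: context-aware

    assert list(doubler([1, 2, 3])) == [2, 4, 6]
    assert list(scaled([1, 2, 3], PipelineContext({"multiplier": 3}))) == [3, 6, 9]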
@@ -112,14 +115,11 @@ def flatten[T]( def tap(self, function: PipelineFunction[Out, Any]) -> "Transformer[In, Out]": """Applies a side-effect function without modifying the data.""" - std_function = create_context_aware_function(function) - def tap_operation(chunk: list[Out], ctx: PipelineContext) -> list[Out]: - for item in chunk: - std_function(item, ctx) - return chunk + if is_context_aware(function): + return self._pipe(lambda chunk, ctx: [x for x in chunk if function(x, ctx) or True]) - return self._pipe(tap_operation) + return self._pipe(lambda chunk, ctx: [x for x in chunk if function(x) or True]) def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In, T]": """Apply another pipeline to the current one.""" diff --git a/test.py b/test.py new file mode 100644 index 0000000..a2328d3 --- /dev/null +++ b/test.py @@ -0,0 +1,29 @@ +from efemel.pipeline.pipeline import Pipeline +from efemel.pipeline.transformers.transformer import Transformer + + +def generate_test_data(size: int = 1_000_000) -> list[int]: + """Generate test data of specified size.""" + return range(size) # type: ignore + + +def run(): + PIPELINE_TRANSFORMER = ( + Transformer() + .filter(lambda x: x % 2 == 0) # Filter even numbers + .map(lambda x: x * 2) # Double them + .filter(lambda x: x > 100) # Filter > 100 + .map(lambda x: x + 1) + ) + + for i in range(20): + Pipeline(generate_test_data(1_000_000)).apply(PIPELINE_TRANSFORMER).to_list() + print(f"Finished run {i + 1}") + + +if __name__ == "__main__": + try: + run() + except Exception as e: + print(f"\n❌ Test failed with error: {e}") + raise From 84edcecae8a12a0e55d77d0223ee6fc1e8634667 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 15 Jul 2025 14:56:44 +0000 Subject: [PATCH 10/10] fix: remove function wrappers --- efemel/pipeline/helpers.py | 68 ++++++++++----------- efemel/pipeline/pipeline.py | 11 +++- efemel/pipeline/transformers/transformer.py | 46 ++++++-------- performance_test.py | 2 +- 4 files changed, 59 insertions(+), 68 deletions(-) diff --git a/efemel/pipeline/helpers.py b/efemel/pipeline/helpers.py index 3beb110..b03f933 100644 --- a/efemel/pipeline/helpers.py +++ b/efemel/pipeline/helpers.py @@ -1,55 +1,49 @@ from collections.abc import Callable import inspect +from typing import Any +from typing import TypeGuard -def is_context_aware(func: Callable, min_params: int = 2) -> bool: - """ - Checks if a function is "context-aware" by inspecting its signature. +# --- Type Aliases --- +class PipelineContext(dict): + """Generic, untyped context available to all pipeline operations.""" + + pass - A function is considered context-aware if it accepts a minimum number - of positional arguments. - - For standard maps/filters: `min_params=2` (e.g., `(value, context)`) - - For reduce functions: `min_params=3` (e.g., `(accumulator, value, context)`) +# Define the specific callables for clarity +ContextAwareCallable = Callable[[Any, PipelineContext], Any] +ContextAwareReduceCallable = Callable[[Any, Any, PipelineContext], Any] - Args: - func: The function or callable to inspect. - min_params: The minimum number of arguments required for the - function to be considered context-aware. - Returns: - True if the function accepts the minimum number of arguments, - False otherwise. +def get_function_param_count(func: Callable[..., Any]) -> int: + """ + Returns the number of parameters a function accepts, excluding `self` or `cls`. + This is useful for determining if a function is context-aware. """ try: - # Get the function's signature. 
sig = inspect.signature(func) - - # Filter for parameters that can be passed by position. params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] + return len(params) + except (ValueError, TypeError): + return 0 - # Check if the function meets the minimum parameter count. - return len(params) >= min_params - except (ValueError, TypeError): - # This handles built-ins or other non-inspectable callables, - # which we assume are not context-aware. - return False +def is_context_aware(func: Callable[..., Any]) -> TypeGuard[ContextAwareCallable]: + """ + Checks if a function is "context-aware" by inspecting its signature. + + This function uses a TypeGuard, allowing Mypy to narrow the type of + the checked function in conditional blocks. + """ + return get_function_param_count(func) >= 2 -def create_context_aware_function(func: Callable) -> Callable: +def is_context_aware_reduce(func: Callable[..., Any]) -> TypeGuard[ContextAwareReduceCallable]: """ - Normalizes a user-provided function to always accept a context argument. - It no longer closes over `self.context`. + Checks if a function is "context-aware" by inspecting its signature. + + This function uses a TypeGuard, allowing Mypy to narrow the type of + the checked function in conditional blocks. """ - try: - sig = inspect.signature(func) - params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] - # If the function already takes 2+ args, we assume it's context-aware. - if len(params) >= 2: - return func # type: ignore - except (ValueError, TypeError): - # This handles built-ins or other non-inspectable callables. - pass - # If the function takes only one argument, we adapt it to accept a context that it will ignore. 
- return lambda value, ctx: func(value) # type: ignore + return get_function_param_count(func) >= 3 diff --git a/efemel/pipeline/pipeline.py b/efemel/pipeline/pipeline.py index 3afc79a..01b1a8a 100644 --- a/efemel/pipeline/pipeline.py +++ b/efemel/pipeline/pipeline.py @@ -7,7 +7,7 @@ from typing import TypeVar from typing import overload -from efemel.pipeline.helpers import create_context_aware_function +from efemel.pipeline.helpers import is_context_aware from .transformers.transformer import Transformer @@ -68,8 +68,13 @@ def apply[U]( self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore case _ if callable(transformer): # If a callable function is provided, call it with the current data and context - processed_transformer = create_context_aware_function(transformer) # type: ignore - self.processed_data = processed_transformer(self.processed_data, self.ctx) + + if is_context_aware(transformer): + processed_transformer = transformer + else: + processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731 + + self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore case _: raise TypeError("Transformer must be a Transformer instance or a callable function") diff --git a/efemel/pipeline/transformers/transformer.py b/efemel/pipeline/transformers/transformer.py index a348666..aab2dfa 100644 --- a/efemel/pipeline/transformers/transformer.py +++ b/efemel/pipeline/transformers/transformer.py @@ -3,25 +3,19 @@ from collections.abc import Iterator import copy from functools import reduce -import inspect import itertools from typing import Any from typing import Self from typing import Union from typing import overload +from efemel.pipeline.helpers import PipelineContext from efemel.pipeline.helpers import is_context_aware +from efemel.pipeline.helpers import is_context_aware_reduce DEFAULT_CHUNK_SIZE = 1000 -# --- Type Aliases --- -class PipelineContext(dict): - """Generic, untyped context available to all pipeline operations.""" - - pass - - type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, PipelineContext], T] type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, PipelineContext], U] @@ -74,18 +68,6 @@ def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) - self.transformer = lambda chunk, ctx: operation(prev_transformer(chunk, ctx), ctx) # type: ignore return self # type: ignore - def _create_reduce_function(self, func: PipelineReduceFunction) -> Callable[[Any, Any, PipelineContext], Any]: - """Normalizes a user-provided reduce function to accept a context argument.""" - try: - sig = inspect.signature(func) - params = [p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)] - if len(params) >= 3: - return func # type: ignore - except (ValueError, TypeError): - pass - # Adapt a simple reducer (e.g., lambda acc, val: acc + val). 
- return lambda acc, value, ctx: func(acc, value) # type: ignore - def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]": """Transforms elements, passing context explicitly to the mapping function.""" if is_context_aware(function): @@ -141,19 +123,29 @@ def __call__(self, data: Iterable[In], context: PipelineContext | None = None) - def reduce[U](self, function: PipelineReduceFunction[U, Out], initial: U): """Reduces elements to a single value (terminal operation).""" - std_reducer = self._create_reduce_function(function) + if is_context_aware_reduce(function): + + def _reduce_with_context(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]: + # The context for the run is determined here. + run_context = context or self.context + + data_iterator = self(data, run_context) + + def function_wrapper(acc: U, value: Out) -> U: + return function(acc, value, run_context) + + yield reduce(function_wrapper, data_iterator, initial) + + return _reduce_with_context + + # Not context-aware, so we adapt the function to ignore the context. def _reduce(data: Iterable[In], context: PipelineContext | None = None) -> Iterator[U]: # The context for the run is determined here. run_context = context or self.context - # The generator now needs the context to pass to the transformer. data_iterator = self(data, run_context) - # We need a new reducer that curries the context for functools.reduce. - def reducer_with_context(acc, value): - return std_reducer(acc, value, run_context) - - yield reduce(reducer_with_context, data_iterator, initial) + yield reduce(function, data_iterator, initial) return _reduce diff --git a/performance_test.py b/performance_test.py index 9f0a182..08dae5d 100644 --- a/performance_test.py +++ b/performance_test.py @@ -406,7 +406,7 @@ def chunked_pipeline_per_item_approach(data) -> list[int]: return list(PIPELINE_PER_ITEM.run(data)) -PIPELINE_TRANSFORMER = ( +PIPELINE_TRANSFORMER: Transformer = ( Transformer() .filter(lambda x: x % 2 == 0) # Filter even numbers .map(lambda x: x * 2) # Double them
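
The core idea behind "remove function wrappers" is to inspect the user function's arity once, when the pipeline step is built, and branch there, instead of wrapping every user function in a per-item adapter. A minimal standalone sketch of that dispatch pattern follows; the names param_count and build_map_step are illustrative only and are not part of the patch (the real helpers are get_function_param_count and is_context_aware in efemel/pipeline/helpers.py):

# sketch.py - illustrative only, not part of this patch series
from collections.abc import Callable
import inspect
from typing import Any


def param_count(func: Callable[..., Any]) -> int:
  """Count positional parameters; non-inspectable callables report 0."""
  try:
    sig = inspect.signature(func)
    return len([p for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)])
  except (ValueError, TypeError):
    return 0


def build_map_step(func: Callable[..., Any], ctx: dict) -> Callable[[list[Any]], list[Any]]:
  # Branch once, outside the loop: functions taking (value, ctx) get the context,
  # single-argument functions are called directly. No per-item adapter is created.
  if param_count(func) >= 2:
    return lambda chunk: [func(x, ctx) for x in chunk]
  return lambda chunk: [func(x) for x in chunk]


if __name__ == "__main__":
  ctx = {"offset": 10}
  plain = build_map_step(lambda x: x * 2, ctx)
  aware = build_map_step(lambda x, c: x + c["offset"], ctx)
  print(plain([1, 2, 3]))  # [2, 4, 6]
  print(aware([1, 2, 3]))  # [11, 12, 13]

Because the branch happens at construction time, the per-item hot path calls the user function directly, which is presumably the wrapper overhead the commit title refers to.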