From 51f101fbd9055068884f0745b159ca403a9ca0e2 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Fri, 18 Jul 2025 13:12:11 +0000 Subject: [PATCH 1/6] chore: created a threaded transformer (a copy) --- laygo/__init__.py | 2 + laygo/transformers/threaded.py | 201 ++++++++++++++++++ tests/test_threaded_transformer.py | 329 +++++++++++++++++++++++++++++ 3 files changed, 532 insertions(+) create mode 100644 laygo/transformers/threaded.py create mode 100644 tests/test_threaded_transformer.py diff --git a/laygo/__init__.py b/laygo/__init__.py index 611995c..cf3caba 100644 --- a/laygo/__init__.py +++ b/laygo/__init__.py @@ -7,11 +7,13 @@ from laygo.pipeline import Pipeline from laygo.transformers.http import HTTPTransformer from laygo.transformers.parallel import ParallelTransformer +from laygo.transformers.threaded import ThreadedTransformer from laygo.transformers.transformer import Transformer __all__ = [ "Pipeline", "Transformer", + "ThreadedTransformer", "ParallelTransformer", "HTTPTransformer", "PipelineContext", diff --git a/laygo/transformers/threaded.py b/laygo/transformers/threaded.py new file mode 100644 index 0000000..2e57c86 --- /dev/null +++ b/laygo/transformers/threaded.py @@ -0,0 +1,201 @@ +"""Parallel transformer implementation using multiple threads.""" + +from collections import deque +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import Future +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait +import copy +from functools import partial +import itertools +import threading +from typing import Any +from typing import Union +from typing import overload + +from laygo.errors import ErrorHandler +from laygo.helpers import PipelineContext +from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE +from laygo.transformers.transformer import ChunkErrorHandler +from laygo.transformers.transformer import InternalTransformer +from laygo.transformers.transformer import PipelineFunction +from laygo.transformers.transformer import Transformer + + +class ThreadedPipelineContextType(PipelineContext): + """A specific context type for threaded transformers that includes a lock.""" + + lock: threading.Lock + + +class ThreadedTransformer[In, Out](Transformer[In, Out]): + """ + A transformer that executes operations concurrently using multiple threads. + """ + + def __init__( + self, + max_workers: int = 4, + ordered: bool = True, + chunk_size: int = DEFAULT_CHUNK_SIZE, + transformer: InternalTransformer[In, Out] | None = None, + ): + """ + Initialize the threaded transformer. + + Args: + max_workers: Maximum number of worker threads. + ordered: If True, results are yielded in order. If False, results + are yielded as they complete. + chunk_size: Size of data chunks to process. + transformer: The transformation logic chain. + """ + super().__init__(chunk_size, transformer) + self.max_workers = max_workers + self.ordered = ordered + + @classmethod + def from_transformer[T, U]( + cls, + transformer: Transformer[T, U], + chunk_size: int | None = None, + max_workers: int = 4, + ordered: bool = True, + ) -> "ThreadedTransformer[T, U]": + """ + Create a ThreadedTransformer from an existing Transformer's logic. + + Args: + transformer: The base transformer to copy the transformation logic from. + chunk_size: Optional chunk size override. + max_workers: Maximum number of worker threads. 
+ ordered: If True, results are yielded in order. + + Returns: + A new ThreadedTransformer with the same transformation logic. + """ + return cls( + chunk_size=chunk_size or transformer.chunk_size, + transformer=copy.deepcopy(transformer.transformer), # type: ignore + max_workers=max_workers, + ordered=ordered, + ) + + def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: + """ + Executes the transformer on data concurrently. + + A new `threading.Lock` is created and added to the context for each call + to ensure execution runs are isolated and thread-safe. + """ + # Determine the context for this run, passing it by reference as requested. + run_context = context or self.context + # Add a per-call lock for thread safety. + run_context["lock"] = threading.Lock() + + def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]: + """ + Process a single chunk by passing the chunk and context explicitly + to the transformer chain. This is safer and avoids mutating self. + """ + return self.transformer(chunk, shared_context) + + # Create a partial function with the run_context "baked in". + process_chunk_with_context = partial(process_chunk, shared_context=run_context) + + def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results in their original order.""" + futures: deque[Future[list[Out]]] = deque() + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + break + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + continue + + def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results as they complete.""" + futures = { + executor.submit(process_chunk_with_context, chunk) + for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + try: + chunk = next(chunks_iter) + futures.add(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + continue + + def result_iterator_manager() -> Iterator[Out]: + """Manage the thread pool and yield flattened results.""" + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = self._chunk_generator(data) + gen_func = _ordered_generator if self.ordered else _unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor) + for result_chunk in processed_chunks_iterator: + yield from result_chunk + + return result_iterator_manager() + + # --- Overridden Chaining Methods to Preserve Type --- + + def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ThreadedTransformer[In, Out]": + super().on_error(handler) + return self + + def map[U](self, function: PipelineFunction[Out, U]) -> "ThreadedTransformer[In, U]": + super().map(function) + return self # type: ignore + + def filter(self, predicate: PipelineFunction[Out, bool]) -> "ThreadedTransformer[In, Out]": + super().filter(predicate) + return self + + @overload + def flatten[T](self: "ThreadedTransformer[In, list[T]]") -> "ThreadedTransformer[In, T]": ... 
+ @overload + def flatten[T](self: "ThreadedTransformer[In, tuple[T, ...]]") -> "ThreadedTransformer[In, T]": ... + @overload + def flatten[T](self: "ThreadedTransformer[In, set[T]]") -> "ThreadedTransformer[In, T]": ... + def flatten[T]( # type: ignore + self: Union[ + "ThreadedTransformer[In, list[T]]", "ThreadedTransformer[In, tuple[T, ...]]", "ThreadedTransformer[In, set[T]]" + ], + ) -> "ThreadedTransformer[In, T]": + super().flatten() # type: ignore + return self # type: ignore + + def tap(self, function: PipelineFunction[Out, Any]) -> "ThreadedTransformer[In, Out]": + super().tap(function) + return self + + def apply[T]( + self, t: Callable[["ThreadedTransformer[In, Out]"], "Transformer[In, T]"] + ) -> "ThreadedTransformer[In, T]": + super().apply(t) # type: ignore + return self # type: ignore + + def catch[U]( + self, + sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], + on_error: ChunkErrorHandler[Out, U] | None = None, + ) -> "ThreadedTransformer[In, U]": + super().catch(sub_pipeline_builder, on_error) + return self # type: ignore + + def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "ThreadedTransformer[In, Out]": + super().short_circuit(function) + return self diff --git a/tests/test_threaded_transformer.py b/tests/test_threaded_transformer.py new file mode 100644 index 0000000..9d9fce3 --- /dev/null +++ b/tests/test_threaded_transformer.py @@ -0,0 +1,329 @@ +"""Tests for the ThreadedTransformer class.""" + +import threading +import time +from unittest.mock import patch + +from laygo import ErrorHandler +from laygo import PipelineContext +from laygo import ThreadedTransformer +from laygo import Transformer + + +class TestThreadedTransformerBasics: + """Test core parallel transformer functionality.""" + + def test_initialization_defaults(self): + """Test parallel transformer initialization with default values.""" + transformer = ThreadedTransformer[int, int]() + assert transformer.max_workers == 4 + assert transformer.ordered is True + assert transformer.chunk_size == 1000 + + def test_initialization_custom_parameters(self): + """Test initialization with custom parameters.""" + transformer = ThreadedTransformer[int, int](max_workers=8, ordered=False, chunk_size=500) + assert transformer.max_workers == 8 + assert transformer.ordered is False + assert transformer.chunk_size == 500 + + def test_basic_execution(self): + """Test basic parallel transformer execution.""" + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=3) + result = list(transformer([1, 2, 3, 4, 5])) + assert result == [1, 2, 3, 4, 5] + + def test_from_transformer_creation(self): + """Test creating ThreadedTransformer from existing Transformer.""" + regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) + parallel = ThreadedTransformer.from_transformer(regular, max_workers=2, ordered=True) + + data = [1, 2, 3, 4, 5, 6] + regular_results = list(regular(data)) + parallel_results = list(parallel(data)) + + assert regular_results == parallel_results + assert parallel.max_workers == 2 + assert parallel.ordered is True + assert parallel.chunk_size == 100 + + +class TestThreadedTransformerOperations: + """Test parallel transformer operations like map, filter, etc.""" + + def test_map_concurrent_execution(self): + """Test map operation with concurrent execution.""" + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + result = list(transformer([1, 2, 3, 4])) + assert 
result == [2, 4, 6, 8] + + def test_filter_concurrent_execution(self): + """Test filter operation with concurrent execution.""" + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0) + result = list(transformer([1, 2, 3, 4, 5, 6])) + assert result == [2, 4, 6] + + def test_chained_operations(self): + """Test chained operations work correctly with concurrency.""" + transformer = ( + ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + .map(lambda x: x * 2) + .filter(lambda x: x > 4) + .map(lambda x: x + 1) + ) + result = list(transformer([1, 2, 3, 4, 5])) + assert result == [7, 9, 11] # [2,4,6,8,10] -> [6,8,10] -> [7,9,11] + + def test_flatten_operation(self): + """Test flatten operation with concurrent execution.""" + transformer = ThreadedTransformer[list[int], list[int]](max_workers=2, chunk_size=2).flatten() + result = list(transformer([[1, 2], [3, 4], [5, 6]])) + assert result == [1, 2, 3, 4, 5, 6] + + def test_tap_side_effects(self): + """Test tap applies side effects correctly in concurrent execution.""" + side_effects = [] + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.tap(lambda x: side_effects.append(x)) + result = list(transformer([1, 2, 3, 4])) + + assert result == [1, 2, 3, 4] # Data unchanged + assert sorted(side_effects) == [1, 2, 3, 4] # Side effects applied (may be out of order) + + +class TestThreadedTransformerContextSupport: + """Test context-aware parallel transformer operations.""" + + def test_map_with_context(self): + """Test map with context-aware function in concurrent execution.""" + context = PipelineContext({"multiplier": 3}) + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) + result = list(transformer([1, 2, 3], context)) + assert result == [3, 6, 9] + + def test_context_modification_with_locking(self): + """Test safe context modification with locking in concurrent execution.""" + context = PipelineContext({"items": 0, "_lock": threading.Lock()}) + + def safe_increment(x: int, ctx: PipelineContext) -> int: + with ctx["_lock"]: + current_items = ctx["items"] + time.sleep(0.001) # Increase chance of race condition + ctx["items"] = current_items + 1 + return x * 2 + + transformer = ThreadedTransformer[int, int](max_workers=4, chunk_size=1) + transformer = transformer.map(safe_increment) + + data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + result = list(transformer(data, context)) + + assert sorted(result) == sorted([x * 2 for x in data]) + assert context["items"] == len(data) + + def test_multiple_context_values_modification(self): + """Test modifying multiple context values safely.""" + context = PipelineContext({"total_sum": 0, "item_count": 0, "max_value": 0, "_lock": threading.Lock()}) + + def update_stats(x: int, ctx: PipelineContext) -> int: + with ctx["_lock"]: + ctx["total_sum"] += x + ctx["item_count"] += 1 + ctx["max_value"] = max(ctx["max_value"], x) + return x * 3 + + transformer = ThreadedTransformer[int, int](max_workers=3, chunk_size=2) + transformer = transformer.map(update_stats) + + data = [1, 5, 3, 8, 2, 7, 4, 6] + result = list(transformer(data, context)) + + assert sorted(result) == sorted([x * 3 for x in data]) + assert context["total_sum"] == sum(data) + assert context["item_count"] == len(data) + assert context["max_value"] == max(data) + + +class TestThreadedTransformerOrdering: + """Test ordering behavior of parallel transformer.""" + + def 
test_ordered_execution_maintains_sequence(self): + """Test that ordered=True maintains element order despite variable processing time.""" + + def variable_time_transform(x: int) -> int: + time.sleep(0.01 * (5 - x)) # Later elements process faster + return x * 2 + + transformer = ThreadedTransformer[int, int](max_workers=3, ordered=True, chunk_size=2) + transformer = transformer.map(variable_time_transform) + result = list(transformer([1, 2, 3, 4, 5])) + + assert result == [2, 4, 6, 8, 10] # Order maintained + + def test_unordered_vs_ordered_same_elements(self): + """Test that ordered and unordered produce same elements with different ordering.""" + data = list(range(10)) + + ordered_transformer = ThreadedTransformer[int, int](max_workers=3, ordered=True, chunk_size=3) + ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data)) + + unordered_transformer = ThreadedTransformer[int, int](max_workers=3, ordered=False, chunk_size=3) + unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data)) + + assert sorted(ordered_result) == sorted(unordered_result) + assert ordered_result == [x * 2 for x in data] # Ordered maintains sequence + + +class TestThreadedTransformerPerformance: + """Test performance aspects of parallel transformer.""" + + def test_concurrent_performance_improvement(self): + """Test that concurrent execution improves performance for slow operations.""" + + def slow_operation(x: int) -> int: + time.sleep(0.01) # 10ms delay + return x * 2 + + data = list(range(8)) # 8 items, 80ms total sequential time + + # Sequential execution + start_time = time.time() + sequential = Transformer[int, int](chunk_size=4) + seq_result = list(sequential.map(slow_operation)(data)) + seq_time = time.time() - start_time + + # Concurrent execution + start_time = time.time() + concurrent = ThreadedTransformer[int, int](max_workers=4, chunk_size=4) + conc_result = list(concurrent.map(slow_operation)(data)) + conc_time = time.time() - start_time + + assert seq_result == conc_result + assert conc_time < seq_time * 0.8 # At least 20% faster + + def test_thread_pool_management(self): + """Test that thread pool is properly created and cleaned up.""" + with patch("laygo.transformers.threaded.ThreadPoolExecutor") as mock_executor: + mock_executor.return_value.__enter__.return_value = mock_executor.return_value + mock_executor.return_value.__exit__.return_value = None + mock_executor.return_value.submit.return_value.result.return_value = [2, 4] + + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + list(transformer([1, 2])) + + mock_executor.assert_called_with(max_workers=2) + mock_executor.return_value.__enter__.assert_called_once() + mock_executor.return_value.__exit__.assert_called_once() + + +class TestThreadedTransformerChunking: + """Test chunking behavior with concurrent execution.""" + + def test_chunking_effectiveness(self): + """Test that chunking works correctly with concurrent execution.""" + processed_chunks = [] + + def track_processing(x: int) -> int: + processed_chunks.append(x) + return x * 2 + + transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=3) + transformer = transformer.map(track_processing) + result = list(transformer([1, 2, 3, 4, 5, 6, 7])) + + assert result == [2, 4, 6, 8, 10, 12, 14] + assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7] + + def test_large_chunk_size_handling(self): + """Test parallel transformer with large chunk size relative to data.""" + transformer = ThreadedTransformer[int, int](max_workers=2, 
chunk_size=1000)
+ transformer = transformer.map(lambda x: x + 1)
+ large_data = list(range(100)) # Much smaller than chunk size
+ result = list(transformer(large_data))
+ expected = [x + 1 for x in large_data]
+ assert result == expected
+
+
+class TestThreadedTransformerEdgeCases:
+ """Test edge cases and boundary conditions."""
+
+ def test_empty_data(self):
+ """Test the threaded transformer with empty data."""
+ transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2)
+ result = list(transformer([]))
+ assert result == []
+
+ def test_single_element(self):
+ """Test the threaded transformer with a single element."""
+ transformer = (
+ ThreadedTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 0)
+ )
+ result = list(transformer([5]))
+ assert result == [10]
+
+ def test_data_smaller_than_chunk_size(self):
+ """Test when data is smaller than chunk size."""
+ transformer = ThreadedTransformer[int, int](max_workers=4, chunk_size=100)
+ transformer = transformer.map(lambda x: x * 2)
+ result = list(transformer([1, 2, 3]))
+ assert result == [2, 4, 6]
+
+ def test_more_workers_than_chunks(self):
+ """Test when workers exceed number of chunks."""
+ transformer = ThreadedTransformer[int, int](max_workers=10, chunk_size=2)
+ transformer = transformer.map(lambda x: x * 2)
+ result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers
+ assert result == [2, 4, 6]
+
+ def test_exception_propagation(self):
+ """Test that exceptions in worker threads are properly propagated."""
+
+ def failing_function(x: int) -> int:
+ if x == 3:
+ raise ValueError("Test exception")
+ return x * 2
+
+ transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2)
+ transformer = transformer.map(failing_function)
+
+ try:
+ list(transformer([1, 2, 3, 4]))
+ raise AssertionError("Expected exception was not raised")
+ except ValueError as e:
+ assert "Test exception" in str(e)
+
+
+class TestThreadedTransformerErrorHandling:
+ """Test error handling with the threaded transformer."""
+
+ def test_safe_with_successful_operation(self):
+ """Test safe execution with successful transformation."""
+ transformer = ThreadedTransformer.init(int).catch(lambda t: t.map(lambda x: x * 2))
+ result = list(transformer([1, 2, 3]))
+ assert result == [2, 4, 6]
+
+ def test_safe_with_error_isolation(self):
+ """Test safe execution isolates errors to specific chunks."""
+ errored_chunks = []
+ transformer = ThreadedTransformer.init(int, chunk_size=1).catch(
+ lambda t: t.map(lambda x: x / 0), # Division by zero
+ on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore
+ )
+ result = list(transformer([1, 2, 3]))
+
+ assert result == [] # All operations failed
+ assert errored_chunks == [[1], [2], [3]] # Each chunk failed individually
+
+ def test_global_error_handler(self):
+ """Test global error handling through error handler."""
+ errored_chunks = []
+ error_handler = ErrorHandler()
+ error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk))
+
+ transformer = (
+ ThreadedTransformer.init(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0))
+ )
+
+ list(transformer([1, 2, 3]))
+ assert errored_chunks == [[1], [2], [3]]
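
The tests above pin down the intended call pattern for the new ThreadedTransformer. A minimal usage sketch, using only the API this patch exports from laygo/__init__.py (the expected output follows from the tests rather than from running the snippet):

from laygo import ThreadedTransformer

# Stages chain exactly as on the base Transformer; the input is split into
# chunks and each chunk is processed by a pool of worker threads.
transformer = (
    ThreadedTransformer[int, int](max_workers=2, chunk_size=2)
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
)

# Calling the transformer returns a lazy iterator. With ordered=True (the
# default), results come back in input order even when chunks finish early.
print(list(transformer([1, 2, 3, 4, 5])))  # -> [6, 8, 10]

With ordered=False the same elements are yielded as worker threads complete, which can reduce latency when per-chunk processing times vary.
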
From 1a3d782dcae38204a0d7a468486c722f7a263d43 Mon Sep 17 00:00:00 2001
From: ringoldsdev
Date: Fri, 18 Jul 2025 17:31:59 +0000
Subject: [PATCH 2/6] chore: transformer factory and tests

---
 laygo/__init__.py | 6 +
 laygo/transformers/http.py | 20 ++-
 laygo/transformers/parallel.py | 220 +++++++++++-----------
 laygo/transformers/threaded.py | 17 +-
 laygo/transformers/transformer.py | 14 +-
 pyproject.toml | 5 +-
 tests/test_integration.py | 71 ++++----
 tests/test_parallel_transformer.py | 259 +++++++++--------------------
 tests/test_pipeline.py | 8 +-
 tests/test_threaded_transformer.py | 10 +-
 tests/test_transformer.py | 47 +++---
 uv.lock | 23 +++
 12 files changed, 347 insertions(+), 353 deletions(-)

diff --git a/laygo/__init__.py b/laygo/__init__.py
index cf3caba..defa647 100644
--- a/laygo/__init__.py
+++ b/laygo/__init__.py
@@ -7,14 +7,20 @@ from laygo.pipeline import Pipeline
 from laygo.transformers.http import HTTPTransformer
 from laygo.transformers.parallel import ParallelTransformer
+from laygo.transformers.parallel import createParallelTransformer
 from laygo.transformers.threaded import ThreadedTransformer
+from laygo.transformers.threaded import createThreadedTransformer
 from laygo.transformers.transformer import Transformer
+from laygo.transformers.transformer import createTransformer
 __all__ = [
 "Pipeline",
 "Transformer",
+ "createTransformer",
 "ThreadedTransformer",
+ "createThreadedTransformer",
 "ParallelTransformer",
+ "createParallelTransformer",
 "HTTPTransformer",
 "PipelineContext",
 "ErrorHandler",
diff --git a/laygo/transformers/http.py b/laygo/transformers/http.py
index f181160..5a60934 100644
--- a/laygo/transformers/http.py
+++ b/laygo/transformers/http.py
@@ -30,14 +30,30 @@ U = TypeVar("U")
+def createHTTPTransformer[T](
+ _type_hint: type[T],
+ base_url: str,
+ chunk_size: int | None = None,
+ endpoint: str | None = None,
+ max_workers: int = 4,
+) -> "HTTPTransformer[T, T]":
+ """Create a new identity HTTP transformer with an explicit type hint."""
+ return HTTPTransformer[T, T](
+ base_url=base_url,
+ endpoint=endpoint,
+ max_workers=max_workers,
+ chunk_size=chunk_size,
+ )
+
+
 class HTTPTransformer(Transformer[In, Out]):
 """
 A self-sufficient, chainable transformer that manages its own
 distributed execution and worker endpoint definition.
""" - def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8): - super().__init__() + def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8, chunk_size: int | None = None): + super().__init__(chunk_size=chunk_size) self.base_url = base_url.rstrip("/") self.endpoint = endpoint self.max_workers = max_workers diff --git a/laygo/transformers/parallel.py b/laygo/transformers/parallel.py index c247ab5..d663868 100644 --- a/laygo/transformers/parallel.py +++ b/laygo/transformers/parallel.py @@ -1,58 +1,70 @@ -"""Parallel transformer implementation using multiple threads.""" +"""Parallel transformer implementation using multiple processes and loky.""" from collections import deque from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator +from collections.abc import MutableMapping from concurrent.futures import FIRST_COMPLETED from concurrent.futures import Future -from concurrent.futures import ThreadPoolExecutor from concurrent.futures import wait import copy -from functools import partial import itertools -import threading +import multiprocessing as mp from typing import Any from typing import Union from typing import overload +from loky import ProcessPoolExecutor + from laygo.errors import ErrorHandler from laygo.helpers import PipelineContext -from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE from laygo.transformers.transformer import ChunkErrorHandler from laygo.transformers.transformer import InternalTransformer from laygo.transformers.transformer import PipelineFunction from laygo.transformers.transformer import Transformer -class ParallelPipelineContextType(PipelineContext): - """A specific context type for parallel transformers that includes a lock.""" +def _process_chunk_for_multiprocessing[In, Out]( + transformer: InternalTransformer[In, Out], + shared_context: MutableMapping[str, Any], + chunk: list[In], +) -> list[Out]: + """ + Top-level function to process a single chunk. + 'loky' will use cloudpickle to serialize the 'transformer' object. + """ + return transformer(chunk, shared_context) # type: ignore + - lock: threading.Lock +def createParallelTransformer[T]( + _type_hint: type[T], + max_workers: int = 4, + ordered: bool = True, + chunk_size: int | None = None, +) -> "ParallelTransformer[T, T]": + """Create a new identity parallel transformer with an explicit type hint.""" + return ParallelTransformer[T, T]( + max_workers=max_workers, + ordered=ordered, + chunk_size=chunk_size, + transformer=None, + ) class ParallelTransformer[In, Out](Transformer[In, Out]): """ - A transformer that executes operations concurrently using multiple threads. + A transformer that executes operations concurrently using multiple processes. + It uses 'loky' to support dynamically created transformation logic. """ def __init__( self, max_workers: int = 4, ordered: bool = True, - chunk_size: int = DEFAULT_CHUNK_SIZE, + chunk_size: int | None = None, transformer: InternalTransformer[In, Out] | None = None, ): - """ - Initialize the parallel transformer. - - Args: - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. If False, results - are yielded as they complete. - chunk_size: Size of data chunks to process. - transformer: The transformation logic chain. 
- """ super().__init__(chunk_size, transformer) self.max_workers = max_workers self.ordered = ordered @@ -65,18 +77,6 @@ def from_transformer[T, U]( max_workers: int = 4, ordered: bool = True, ) -> "ParallelTransformer[T, U]": - """ - Create a ParallelTransformer from an existing Transformer's logic. - - Args: - transformer: The base transformer to copy the transformation logic from. - chunk_size: Optional chunk size override. - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. - - Returns: - A new ParallelTransformer with the same transformation logic. - """ return cls( chunk_size=chunk_size or transformer.chunk_size, transformer=copy.deepcopy(transformer.transformer), # type: ignore @@ -85,73 +85,101 @@ def from_transformer[T, U]( ) def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: - """ - Executes the transformer on data concurrently. - - A new `threading.Lock` is created and added to the context for each call - to ensure execution runs are isolated and thread-safe. - """ - # Determine the context for this run, passing it by reference as requested. - run_context = context or self.context - # Add a per-call lock for thread safety. - run_context["lock"] = threading.Lock() - - def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]: - """ - Process a single chunk by passing the chunk and context explicitly - to the transformer chain. This is safer and avoids mutating self. - """ - return self.transformer(chunk, shared_context) - - # Create a partial function with the run_context "baked in". - process_chunk_with_context = partial(process_chunk, shared_context=run_context) - - def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results in their original order.""" - futures: deque[Future[list[Out]]] = deque() - for _ in range(self.max_workers + 1): - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - break - while futures: - yield futures.popleft().result() + """Executes the transformer on data concurrently using processes.""" + with mp.Manager() as manager: + initial_ctx_data = context if context is not None else self.context + shared_context = manager.dict(initial_ctx_data) + + if "lock" not in shared_context: + shared_context["lock"] = manager.Lock() + + try: + with ProcessPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = self._chunk_generator(data) + gen_func = self._ordered_generator if self.ordered else self._unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor, shared_context) + + for result_chunk in processed_chunks_iterator: + yield from result_chunk + finally: + if context is not None: + final_context_state = dict(shared_context) + final_context_state.pop("lock", None) + # FIX 2: Do not clear the context, just update it. + # This allows chained transformers to merge their context results. + # context.clear() + context.update(final_context_state) + + # ... The rest of the file remains the same ... 
+ def _ordered_generator( + self, + chunks_iter: Iterator[list[In]], + executor: ProcessPoolExecutor, + shared_context: MutableMapping[str, Any], + ) -> Iterator[list[Out]]: + """Generate results in their original order.""" + futures: deque[Future[list[Out]]] = deque() + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append( + executor.submit( + _process_chunk_for_multiprocessing, + self.transformer, + shared_context, + chunk, + ) + ) + except StopIteration: + break + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append( + executor.submit( + _process_chunk_for_multiprocessing, + self.transformer, + shared_context, + chunk, + ) + ) + except StopIteration: + continue + + def _unordered_generator( + self, + chunks_iter: Iterator[list[In]], + executor: ProcessPoolExecutor, + shared_context: MutableMapping[str, Any], + ) -> Iterator[list[Out]]: + """Generate results as they complete.""" + futures = { + executor.submit( + _process_chunk_for_multiprocessing, + self.transformer, + shared_context, + chunk, + ) + for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() try: chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk_with_context, chunk)) + futures.add( + executor.submit( + _process_chunk_for_multiprocessing, + self.transformer, + shared_context, + chunk, + ) + ) except StopIteration: continue - def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results as they complete.""" - futures = { - executor.submit(process_chunk_with_context, chunk) - for chunk in itertools.islice(chunks_iter, self.max_workers + 1) - } - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - try: - chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - continue - - def result_iterator_manager() -> Iterator[Out]: - """Manage the thread pool and yield flattened results.""" - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - chunks_to_process = self._chunk_generator(data) - gen_func = _ordered_generator if self.ordered else _unordered_generator - processed_chunks_iterator = gen_func(chunks_to_process, executor) - for result_chunk in processed_chunks_iterator: - yield from result_chunk - - return result_iterator_manager() - - # --- Overridden Chaining Methods to Preserve Type --- - def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]": super().on_error(handler) return self @@ -172,7 +200,9 @@ def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTrans def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ... 
def flatten[T]( # type: ignore
 self: Union[
- "ParallelTransformer[In, list[T]]", "ParallelTransformer[In, tuple[T, ...]]", "ParallelTransformer[In, set[T]]"
+ "ParallelTransformer[In, list[T]]",
+ "ParallelTransformer[In, tuple[T, ...]]",
+ "ParallelTransformer[In, set[T]]",
 ],
 ) -> "ParallelTransformer[In, T]":
 super().flatten() # type: ignore
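
The executor is now process-based, so the transformer chain and its chunks must be serializable; loky ships the chain via cloudpickle (which is why lambdas still work), and the updated tests below also move shared helpers to module level. A minimal usage sketch of the rewritten ParallelTransformer, not part of the patch itself: it assumes loky is installed and, on spawn-based platforms, should run under an if __name__ == "__main__": guard.

from laygo import PipelineContext, createParallelTransformer

def double_and_count(x: int, ctx: PipelineContext) -> int:
    # "lock" is the manager lock that __call__ injects into the shared
    # context; use it to guard cross-process updates.
    with ctx["lock"]:
        ctx["items"] += 1
    return x * 2

transformer = createParallelTransformer(int, max_workers=2, chunk_size=2).map(double_and_count)

context = PipelineContext({"items": 0})
print(list(transformer([1, 2, 3, 4], context)))  # -> [2, 4, 6, 8]
print(context["items"])  # -> 4

Note the design choice visible in __call__ above: the caller's context is copied into a multiprocessing.Manager dict for the duration of the run, and the final state (minus the lock) is merged back, rather than mutated in place as in the threaded version.
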
diff --git a/laygo/transformers/threaded.py b/laygo/transformers/threaded.py
index 2e57c86..8bd784e 100644
--- a/laygo/transformers/threaded.py
+++ b/laygo/transformers/threaded.py
@@ -31,6 +31,21 @@ class ThreadedPipelineContextType(PipelineContext):
 lock: threading.Lock
+def createThreadedTransformer[T](
+ _type_hint: type[T],
+ max_workers: int = 4,
+ ordered: bool = True,
+ chunk_size: int = DEFAULT_CHUNK_SIZE,
+) -> "ThreadedTransformer[T, T]":
+ """Create a new identity threaded transformer with an explicit type hint."""
+ return ThreadedTransformer[T, T](
+ max_workers=max_workers,
+ ordered=ordered,
+ chunk_size=chunk_size,
+ transformer=None,
+ )
+
+
 class ThreadedTransformer[In, Out](Transformer[In, Out]):
 """
 A transformer that executes operations concurrently using multiple threads.
@@ -40,7 +55,7 @@ def __init__(
 self,
 max_workers: int = 4,
 ordered: bool = True,
- chunk_size: int = DEFAULT_CHUNK_SIZE,
+ chunk_size: int | None = None,
 transformer: InternalTransformer[In, Out] | None = None,
 ):
 """
diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py
index 6ed5df6..fe22cc0 100644
--- a/laygo/transformers/transformer.py
+++ b/laygo/transformers/transformer.py
@@ -25,6 +25,11 @@
 type ChunkErrorHandler[In, U] = Callable[[list[In], Exception, PipelineContext], list[U]]
+def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]":
+ """Create a new identity pipeline with an explicit type hint."""
+ return Transformer[T, T](chunk_size=chunk_size) # type: ignore
+
+
 class Transformer[In, Out]:
 """
 Defines and composes data transformations by passing context explicitly.
@@ -32,7 +37,7 @@ def __init__(
 self,
- chunk_size: int = DEFAULT_CHUNK_SIZE,
+ chunk_size: int | None = DEFAULT_CHUNK_SIZE,
 transformer: InternalTransformer[In, Out] | None = None,
 ):
 self.chunk_size = chunk_size
@@ -41,11 +46,6 @@ def __init__(
 self.transformer: InternalTransformer[In, Out] = transformer or (lambda chunk, ctx: chunk) # type: ignore
 self.error_handler = ErrorHandler()
- @classmethod
- def init[T](cls, _type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]":
- """Create a new identity pipeline with an explicit type hint."""
- return cls(chunk_size=chunk_size) # type: ignore
-
 @classmethod
 def from_transformer[T, U](
 cls,
@@ -178,7 +178,7 @@ def catch[U](
 self.on_error(on_error) # type: ignore
 # Create a blank transformer for the sub-pipeline
- temp_transformer = Transformer.init(_type_hint=..., chunk_size=self.chunk_size) # type: ignore
+ temp_transformer = createTransformer(_type_hint=..., chunk_size=self.chunk_size) # type: ignore
 # Build the sub-pipeline and get its internal transformer function
 sub_pipeline = sub_pipeline_builder(temp_transformer)
diff --git a/pyproject.toml b/pyproject.toml
index 70a709a..172f43b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,7 +29,10 @@ classifiers = [
 "Typing :: Typed",
 ]
-dependencies = ["requests>=2.32"]
+dependencies = [
+ "requests>=2.32",
+ "loky>=3.5.5",
+]
 [project.urls]
 Homepage = "https://github.com/ringoldsdev/laygo-python"
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 1c2a65a..9fd850a 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -1,11 +1,10 @@
 """Integration tests for Pipeline and Transformer working together."""
-import threading
-
 from laygo import ParallelTransformer
 from laygo import Pipeline
 from laygo import PipelineContext
 from laygo import Transformer
+from laygo import createTransformer
 class TestPipelineTransformerBasics:
@@ -13,7 +12,7 @@ class TestPipelineTransformerBasics:
 def test_basic_pipeline_transformer_integration(self):
 """Test basic pipeline and transformer integration."""
- transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 5)
+ transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 5)
 result = Pipeline([1, 2, 3, 4, 5]).apply(transformer).to_list()
 assert result == [6, 8, 10]
@@ -83,6 +82,39 @@ def validate_and_convert(x):
 assert valid_numbers == [1.0, 2.0, 3.0, 5.0, 7.0]
+def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int:
+ with ctx["lock"]:
+ ctx["processed_count"] += 1
+ ctx["sum_total"] += x
+ return x * 2
+
+
+def count_and_transform(x: int, ctx: PipelineContext) -> int:
+ with ctx["lock"]:
+ ctx["items_processed"] += 1
+ if x % 2 == 0:
+ ctx["even_count"] += 1
+ else:
+ ctx["odd_count"] += 1
+ return x * 3
+
+
+def stage1_processor(x: int, ctx: PipelineContext) -> int:
+ """First stage processing with context update."""
+ with ctx["lock"]:
+ ctx["stage1_processed"] += 1
+ ctx["total_sum"] += x
+ return x * 2
+
+
+def stage2_processor(x: int, ctx: PipelineContext) -> int:
+ """Second stage processing with context update."""
+ with ctx["lock"]:
+ ctx["stage2_processed"] += 1
+ ctx["total_sum"] += x # Add transformed value too
+ return x + 10
+
+
 class TestPipelineParallelTransformerIntegration:
 """Test Pipeline integration with ParallelTransformer and context modification."""
@@ -96,13 +128,7 @@ def test_parallel_transformer_basic_integration(self):
 def 
test_parallel_transformer_with_context_modification(self): """Test parallel transformer safely modifying shared context.""" - context = PipelineContext({"processed_count": 0, "sum_total": 0, "_lock": threading.Lock()}) - - def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: - with ctx["_lock"]: - ctx["processed_count"] += 1 - ctx["sum_total"] += x - return x * 2 + context = PipelineContext({"processed_count": 0, "sum_total": 0}) parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) parallel_transformer = parallel_transformer.map(safe_increment_and_transform) @@ -118,16 +144,7 @@ def safe_increment_and_transform(x: int, ctx: PipelineContext) -> int: def test_pipeline_accesses_modified_context(self): """Test that pipeline can access context data modified by parallel transformer.""" - context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0, "_lock": threading.Lock()}) - - def count_and_transform(x: int, ctx: PipelineContext) -> int: - with ctx["_lock"]: - ctx["items_processed"] += 1 - if x % 2 == 0: - ctx["even_count"] += 1 - else: - ctx["odd_count"] += 1 - return x * 3 + context = PipelineContext({"items_processed": 0, "even_count": 0, "odd_count": 0}) parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) parallel_transformer = parallel_transformer.map(count_and_transform) @@ -147,20 +164,6 @@ def test_multiple_parallel_transformers_chaining(self): # Shared context for statistics across transformations context = PipelineContext({"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0}) - def stage1_processor(x: int, ctx: PipelineContext) -> int: - """First stage processing with context update.""" - with ctx["lock"]: - ctx["stage1_processed"] += 1 - ctx["total_sum"] += x - return x * 2 - - def stage2_processor(x: int, ctx: PipelineContext) -> int: - """Second stage processing with context update.""" - with ctx["lock"]: - ctx["stage2_processed"] += 1 - ctx["total_sum"] += x # Add transformed value too - return x + 10 - # Create two parallel transformers stage1 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage1_processor) stage2 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage2_processor) diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py index dcaf3fe..18c81f0 100644 --- a/tests/test_parallel_transformer.py +++ b/tests/test_parallel_transformer.py @@ -1,25 +1,19 @@ """Tests for the ParallelTransformer class.""" -import threading +import multiprocessing as mp import time from unittest.mock import patch from laygo import ErrorHandler from laygo import ParallelTransformer from laygo import PipelineContext -from laygo import Transformer +from laygo.transformers.parallel import createParallelTransformer +from laygo.transformers.transformer import createTransformer class TestParallelTransformerBasics: """Test core parallel transformer functionality.""" - def test_initialization_defaults(self): - """Test parallel transformer initialization with default values.""" - transformer = ParallelTransformer[int, int]() - assert transformer.max_workers == 4 - assert transformer.ordered is True - assert transformer.chunk_size == 1000 - def test_initialization_custom_parameters(self): """Test initialization with custom parameters.""" transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500) @@ -35,8 +29,8 @@ def test_basic_execution(self): def test_from_transformer_creation(self): """Test creating 
ParallelTransformer from existing Transformer.""" - regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) - parallel = ParallelTransformer.from_transformer(regular, max_workers=2, ordered=True) + regular = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 5) + parallel = ParallelTransformer.from_transformer(regular, max_workers=2, chunk_size=10) data = [1, 2, 3, 4, 5, 6] regular_results = list(regular(data)) @@ -45,7 +39,7 @@ def test_from_transformer_creation(self): assert regular_results == parallel_results assert parallel.max_workers == 2 assert parallel.ordered is True - assert parallel.chunk_size == 100 + assert parallel.chunk_size == 10 class TestParallelTransformerOperations: @@ -53,42 +47,54 @@ class TestParallelTransformerOperations: def test_map_concurrent_execution(self): """Test map operation with concurrent execution.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + transformer = createParallelTransformer(int).map(lambda x: x * 2) result = list(transformer([1, 2, 3, 4])) assert result == [2, 4, 6, 8] def test_filter_concurrent_execution(self): """Test filter operation with concurrent execution.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0) + transformer = createParallelTransformer(int).filter(lambda x: x % 2 == 0) result = list(transformer([1, 2, 3, 4, 5, 6])) assert result == [2, 4, 6] def test_chained_operations(self): """Test chained operations work correctly with concurrency.""" transformer = ( - ParallelTransformer[int, int](max_workers=2, chunk_size=2) - .map(lambda x: x * 2) - .filter(lambda x: x > 4) - .map(lambda x: x + 1) + createParallelTransformer(int, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 4).map(lambda x: x + 1) ) result = list(transformer([1, 2, 3, 4, 5])) - assert result == [7, 9, 11] # [2,4,6,8,10] -> [6,8,10] -> [7,9,11] + assert result == [7, 9, 11] def test_flatten_operation(self): """Test flatten operation with concurrent execution.""" - transformer = ParallelTransformer[list[int], list[int]](max_workers=2, chunk_size=2).flatten() + # This defines a transformer that accepts iterables of lists and flattens them. 
+ transformer = createParallelTransformer(list[int]).flatten() result = list(transformer([[1, 2], [3, 4], [5, 6]])) assert result == [1, 2, 3, 4, 5, 6] def test_tap_side_effects(self): """Test tap applies side effects correctly in concurrent execution.""" - side_effects = [] - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.tap(lambda x: side_effects.append(x)) - result = list(transformer([1, 2, 3, 4])) + with mp.Manager() as manager: + side_effects = manager.list() + transformer = createParallelTransformer(int).tap(lambda x: side_effects.append(x)) + result = list(transformer([1, 2, 3, 4])) + + assert result == [1, 2, 3, 4] + assert sorted(side_effects) == [1, 2, 3, 4] + + +def safe_increment(x: int, ctx: PipelineContext) -> int: + current_items = ctx["items"] + time.sleep(0.001) + ctx["items"] = current_items + 1 + return x * 2 + - assert result == [1, 2, 3, 4] # Data unchanged - assert sorted(side_effects) == [1, 2, 3, 4] # Side effects applied (may be out of order) +def update_stats(x: int, ctx: PipelineContext) -> int: + ctx["total_sum"] += x + ctx["item_count"] += 1 + ctx["max_value"] = max(ctx["max_value"], x) + return x * 3 class TestParallelTransformerContextSupport: @@ -97,26 +103,16 @@ class TestParallelTransformerContextSupport: def test_map_with_context(self): """Test map with context-aware function in concurrent execution.""" context = PipelineContext({"multiplier": 3}) - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) + transformer = createParallelTransformer(int).map(lambda x, ctx: x * ctx["multiplier"]) result = list(transformer([1, 2, 3], context)) assert result == [3, 6, 9] def test_context_modification_with_locking(self): """Test safe context modification with locking in concurrent execution.""" - context = PipelineContext({"items": 0, "_lock": threading.Lock()}) + context = PipelineContext({"items": 0}) - def safe_increment(x: int, ctx: PipelineContext) -> int: - with ctx["_lock"]: - current_items = ctx["items"] - time.sleep(0.001) # Increase chance of race condition - ctx["items"] = current_items + 1 - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=1) - transformer = transformer.map(safe_increment) - - data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + transformer = createParallelTransformer(int, max_workers=4, chunk_size=1).map(safe_increment) + data = list(range(1, 11)) result = list(transformer(data, context)) assert sorted(result) == sorted([x * 2 for x in data]) @@ -124,18 +120,9 @@ def safe_increment(x: int, ctx: PipelineContext) -> int: def test_multiple_context_values_modification(self): """Test modifying multiple context values safely.""" - context = PipelineContext({"total_sum": 0, "item_count": 0, "max_value": 0, "_lock": threading.Lock()}) - - def update_stats(x: int, ctx: PipelineContext) -> int: - with ctx["_lock"]: - ctx["total_sum"] += x - ctx["item_count"] += 1 - ctx["max_value"] = max(ctx["max_value"], x) - return x * 3 - - transformer = ParallelTransformer[int, int](max_workers=3, chunk_size=2) - transformer = transformer.map(update_stats) + context = PipelineContext({"total_sum": 0, "item_count": 0, "max_value": 0}) + transformer = createParallelTransformer(int, max_workers=3, chunk_size=2).map(update_stats) data = [1, 5, 3, 8, 2, 7, 4, 6] result = list(transformer(data, context)) @@ -145,8 +132,8 @@ def update_stats(x: int, ctx: PipelineContext) -> int: assert 
context["max_value"] == max(data) -class TestParallelTransformerOrdering: - """Test ordering behavior of parallel transformer.""" +class TestParallelTransformerOrderingAndPerformance: + """Test ordering and performance aspects of the parallel transformer.""" def test_ordered_execution_maintains_sequence(self): """Test that ordered=True maintains element order despite variable processing time.""" @@ -155,60 +142,27 @@ def variable_time_transform(x: int) -> int: time.sleep(0.01 * (5 - x)) # Later elements process faster return x * 2 - transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=2) - transformer = transformer.map(variable_time_transform) + transformer = createParallelTransformer(int, max_workers=3, ordered=True).map(variable_time_transform) result = list(transformer([1, 2, 3, 4, 5])) - - assert result == [2, 4, 6, 8, 10] # Order maintained + assert result == [2, 4, 6, 8, 10] def test_unordered_vs_ordered_same_elements(self): """Test that ordered and unordered produce same elements with different ordering.""" data = list(range(10)) - - ordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=True, chunk_size=3) - ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data)) - - unordered_transformer = ParallelTransformer[int, int](max_workers=3, ordered=False, chunk_size=3) - unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data)) + ordered_transformer = createParallelTransformer(int, max_workers=3, ordered=True).map(lambda x: x * 2) + ordered_result = list(ordered_transformer(data)) + unordered_transformer = createParallelTransformer(int, max_workers=3, ordered=False).map(lambda x: x * 2) + unordered_result = list(unordered_transformer(data)) assert sorted(ordered_result) == sorted(unordered_result) - assert ordered_result == [x * 2 for x in data] # Ordered maintains sequence - - -class TestParallelTransformerPerformance: - """Test performance aspects of parallel transformer.""" - - def test_concurrent_performance_improvement(self): - """Test that concurrent execution improves performance for slow operations.""" - - def slow_operation(x: int) -> int: - time.sleep(0.01) # 10ms delay - return x * 2 - - data = list(range(8)) # 8 items, 80ms total sequential time - - # Sequential execution - start_time = time.time() - sequential = Transformer[int, int](chunk_size=4) - seq_result = list(sequential.map(slow_operation)(data)) - seq_time = time.time() - start_time + assert ordered_result == [x * 2 for x in data] - # Concurrent execution - start_time = time.time() - concurrent = ParallelTransformer[int, int](max_workers=4, chunk_size=4) - conc_result = list(concurrent.map(slow_operation)(data)) - conc_time = time.time() - start_time - - assert seq_result == conc_result - assert conc_time < seq_time * 0.8 # At least 20% faster - - def test_thread_pool_management(self): - """Test that thread pool is properly created and cleaned up.""" - with patch("laygo.transformers.parallel.ThreadPoolExecutor") as mock_executor: + def test_process_pool_management(self): + """Test that process pool is properly created and cleaned up.""" + with patch("laygo.transformers.parallel.ProcessPoolExecutor") as mock_executor: mock_executor.return_value.__enter__.return_value = mock_executor.return_value mock_executor.return_value.__exit__.return_value = None mock_executor.return_value.submit.return_value.result.return_value = [2, 4] - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) list(transformer([1, 2])) @@ -217,113 
+171,54 @@ def test_thread_pool_management(self): mock_executor.return_value.__exit__.assert_called_once() -class TestParallelTransformerChunking: - """Test chunking behavior with concurrent execution.""" - - def test_chunking_effectiveness(self): - """Test that chunking works correctly with concurrent execution.""" - processed_chunks = [] - - def track_processing(x: int) -> int: - processed_chunks.append(x) - return x * 2 - - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) - transformer = transformer.map(track_processing) - result = list(transformer([1, 2, 3, 4, 5, 6, 7])) - - assert result == [2, 4, 6, 8, 10, 12, 14] - assert sorted(processed_chunks) == [1, 2, 3, 4, 5, 6, 7] - - def test_large_chunk_size_handling(self): - """Test parallel transformer with large chunk size relative to data.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=1000) - transformer = transformer.map(lambda x: x + 1) - large_data = list(range(100)) # Much smaller than chunk size - result = list(transformer(large_data)) - expected = [x + 1 for x in large_data] - assert result == expected - - -class TestParallelTransformerEdgeCases: - """Test edge cases and boundary conditions.""" +class TestParallelTransformerChunkingAndEdgeCases: + """Test chunking behavior and edge cases.""" def test_empty_data(self): """Test parallel transformer with empty data.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + transformer = createParallelTransformer(int).map(lambda x: x * 2) result = list(transformer([])) assert result == [] - def test_single_element(self): - """Test parallel transformer with single element.""" - transformer = ( - ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 0) - ) - result = list(transformer([5])) - assert result == [10] - - def test_data_smaller_than_chunk_size(self): - """Test when data is smaller than chunk size.""" - transformer = ParallelTransformer[int, int](max_workers=4, chunk_size=100) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) - assert result == [2, 4, 6] - - def test_more_workers_than_chunks(self): - """Test when workers exceed number of chunks.""" - transformer = ParallelTransformer[int, int](max_workers=10, chunk_size=2) - transformer = transformer.map(lambda x: x * 2) - result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers - assert result == [2, 4, 6] - def test_exception_propagation(self): - """Test that exceptions in worker threads are properly propagated.""" + """Test that exceptions in worker processes are properly propagated.""" def failing_function(x: int) -> int: if x == 3: raise ValueError("Test exception") - return x * 2 + return x - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) - transformer = transformer.map(failing_function) + import pytest - try: + transformer = createParallelTransformer(int, chunk_size=1).map(failing_function) + with pytest.raises(ValueError, match="Test exception"): list(transformer([1, 2, 3, 4])) - raise AssertionError("Expected exception was not raised") - except ValueError as e: - assert "Test exception" in str(e) class TestParallelTransformerErrorHandling: """Test error handling with parallel transformer.""" - def test_safe_with_successful_operation(self): - """Test safe execution with successful transformation.""" - transformer = ParallelTransformer.init(int).catch(lambda t: t.map(lambda x: x * 2)) - result = 
list(transformer([1, 2, 3])) - assert result == [2, 4, 6] - def test_safe_with_error_isolation(self): """Test safe execution isolates errors to specific chunks.""" - errored_chunks = [] - transformer = ParallelTransformer.init(int, chunk_size=1).catch( - lambda t: t.map(lambda x: x / 0), # Division by zero - on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore - ) - result = list(transformer([1, 2, 3])) - - assert result == [] # All operations failed - assert errored_chunks == [[1], [2], [3]] # Each chunk failed individually + with mp.Manager() as manager: + errored_chunks = manager.list() + transformer = createParallelTransformer(int, chunk_size=1).catch( + lambda t: t.map(lambda x: x / 0), # Division by zero + on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore + ) + result = list(transformer([1, 2, 3])) + assert result == [] + assert sorted(map(tuple, errored_chunks)) == sorted(map(tuple, [[1], [2], [3]])) def test_global_error_handler(self): """Test global error handling through error handler.""" - errored_chunks = [] - error_handler = ErrorHandler() - error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) - - transformer = ( - ParallelTransformer.init(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) - ) - - list(transformer([1, 2, 3])) - assert errored_chunks == [[1], [2], [3]] + with mp.Manager() as manager: + errored_chunks = manager.list() + error_handler = ErrorHandler() + error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) + + transformer = ( + createParallelTransformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) + ) + list(transformer([1, 2, 3])) + assert sorted(map(tuple, errored_chunks)) == sorted(map(tuple, [[1], [2], [3]])) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 95a2306..150470f 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,7 +1,7 @@ """Tests for the Pipeline class.""" from laygo import Pipeline -from laygo import Transformer +from laygo.transformers.transformer import createTransformer class TestPipelineBasics: @@ -36,7 +36,7 @@ class TestPipelineTransformations: def test_apply_with_transformer(self): """Test apply with transformer object.""" - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 4) + transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 4) result = Pipeline([1, 2, 3, 4]).apply(transformer).to_list() assert result == [6, 8] @@ -93,7 +93,7 @@ def test_first_with_insufficient_data(self): def test_consume_processes_without_return(self): """Test consume processes all elements without returning anything.""" side_effects = [] - transformer = Transformer.init(int).tap(lambda x: side_effects.append(x)) + transformer = createTransformer(int).tap(lambda x: side_effects.append(x)) result = Pipeline([1, 2, 3]).apply(transformer).consume() assert result is None @@ -166,7 +166,7 @@ def test_large_dataset_processing(self): def test_chunked_processing_consistency(self): """Test that chunked processing produces consistent results.""" # Use small chunk size to test chunking behavior - transformer = Transformer.init(int, chunk_size=10).map(lambda x: x + 1) + transformer = createTransformer(int, chunk_size=10).map(lambda x: x + 1) result = Pipeline(list(range(100))).apply(transformer).to_list() expected = list(range(1, 101)) # [1, 2, 3, ..., 100] diff --git 
a/tests/test_threaded_transformer.py b/tests/test_threaded_transformer.py index 9d9fce3..e10069c 100644 --- a/tests/test_threaded_transformer.py +++ b/tests/test_threaded_transformer.py @@ -8,6 +8,8 @@ from laygo import PipelineContext from laygo import ThreadedTransformer from laygo import Transformer +from laygo.transformers.threaded import createThreadedTransformer +from laygo.transformers.transformer import createTransformer class TestThreadedTransformerBasics: @@ -35,7 +37,7 @@ def test_basic_execution(self): def test_from_transformer_creation(self): """Test creating ThreadedTransformer from existing Transformer.""" - regular = Transformer.init(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) + regular = createTransformer(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5) parallel = ThreadedTransformer.from_transformer(regular, max_workers=2, ordered=True) data = [1, 2, 3, 4, 5, 6] @@ -299,14 +301,14 @@ class TestThreadedTransformerErrorHandling: def test_safe_with_successful_operation(self): """Test safe execution with successful transformation.""" - transformer = ThreadedTransformer.init(int).catch(lambda t: t.map(lambda x: x * 2)) + transformer = createThreadedTransformer(int).catch(lambda t: t.map(lambda x: x * 2)) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_safe_with_error_isolation(self): """Test safe execution isolates errors to specific chunks.""" errored_chunks = [] - transformer = ThreadedTransformer.init(int, chunk_size=1).catch( + transformer = createThreadedTransformer(int, chunk_size=1).catch( lambda t: t.map(lambda x: x / 0), # Division by zero on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore ) @@ -322,7 +324,7 @@ def test_global_error_handler(self): error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) transformer = ( - ThreadedTransformer.init(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) + createThreadedTransformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) ) list(transformer([1, 2, 3])) diff --git a/tests/test_transformer.py b/tests/test_transformer.py index 610b0d5..7135349 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ -5,6 +5,7 @@ from laygo import ErrorHandler from laygo import PipelineContext from laygo import Transformer +from laygo.transformers.transformer import createTransformer class TestTransformerBasics: @@ -12,13 +13,13 @@ class TestTransformerBasics: def test_identity_transformer(self): """Test that init creates an identity transformer.""" - transformer = Transformer.init(int) + transformer = createTransformer(int) result = list(transformer([1, 2, 3])) assert result == [1, 2, 3] def test_custom_chunk_size(self): """Test transformer with custom chunk size.""" - transformer = Transformer.init(int, chunk_size=2) + transformer = createTransformer(int, chunk_size=2) assert transformer.chunk_size == 2 # Functionality should work regardless of chunk size result = list(transformer([1, 2, 3, 4])) @@ -30,27 +31,27 @@ class TestTransformerOperations: def test_map_transformation(self): """Test map transforms each element.""" - transformer = Transformer.init(int).map(lambda x: x * 2) + transformer = createTransformer(int).map(lambda x: x * 2) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_filter_operation(self): """Test filter keeps only matching elements.""" - transformer = Transformer.init(int).filter(lambda x: 
x % 2 == 0) + transformer = createTransformer(int).filter(lambda x: x % 2 == 0) result = list(transformer([1, 2, 3, 4, 5, 6])) assert result == [2, 4, 6] def test_flatten_operation(self): """Test flatten with various iterable types.""" # Test with lists - transformer = Transformer.init(list).flatten() + transformer = createTransformer(list).flatten() result = list(transformer([[1, 2], [3, 4], [5]])) assert result == [1, 2, 3, 4, 5] def test_tap_side_effects(self): """Test tap applies side effects without modifying data.""" side_effects = [] - transformer = Transformer.init(int).tap(lambda x: side_effects.append(x)) + transformer = createTransformer(int).tap(lambda x: side_effects.append(x)) result = list(transformer([1, 2, 3])) assert result == [1, 2, 3] # Data unchanged @@ -90,14 +91,14 @@ class TestTransformerChaining: def test_map_filter_chain(self): """Test map followed by filter.""" - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 4) + transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 4) result = list(transformer([1, 2, 3, 4])) assert result == [6, 8] def test_complex_operation_chain(self): """Test complex chain with multiple operations.""" transformer = ( - Transformer.init(int) + createTransformer(int) .map(lambda x: [x, x * 2]) # Create pairs .flatten() # Flatten to single list .filter(lambda x: x > 3) # Keep values > 3 @@ -107,7 +108,7 @@ def test_complex_operation_chain(self): def test_transformer_composition(self): """Test transformer composition with apply.""" - base_transformer = Transformer.init(int).map(lambda x: x * 2) + base_transformer = createTransformer(int).map(lambda x: x * 2) composed_transformer = base_transformer.apply(lambda t: t.filter(lambda x: x > 4)) result = list(composed_transformer([1, 2, 3, 4])) assert result == [6, 8] @@ -118,7 +119,7 @@ class TestTransformerReduceOperations: def test_basic_reduce(self): """Test reduce with sum operation.""" - transformer = Transformer.init(int) + transformer = createTransformer(int) reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) result = list(reducer([1, 2, 3, 4])) assert result == [10] @@ -133,7 +134,7 @@ def test_reduce_with_context(self): def test_reduce_after_transformation(self): """Test reduce after map transformation.""" - transformer = Transformer.init(int).map(lambda x: x * 2) + transformer = createTransformer(int).map(lambda x: x * 2) reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) result = list(reducer([1, 2, 3])) assert result == [12] # [2, 4, 6] summed = 12 @@ -144,19 +145,19 @@ class TestTransformerEdgeCases: def test_empty_data(self): """Test transformer with empty data.""" - transformer = Transformer.init(int).map(lambda x: x * 2) + transformer = createTransformer(int).map(lambda x: x * 2) result = list(transformer([])) assert result == [] def test_single_element(self): """Test transformer with single element.""" - transformer = Transformer.init(int).map(lambda x: x * 2).filter(lambda x: x > 0) + transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 0) result = list(transformer([5])) assert result == [10] def test_filter_removes_all_elements(self): """Test filter that removes all elements.""" - transformer = Transformer.init(int).filter(lambda x: x > 100) + transformer = createTransformer(int).filter(lambda x: x > 100) result = list(transformer([1, 2, 3])) assert result == [] @@ -165,11 +166,11 @@ def test_chunking_behavior(self): data = list(range(100)) # Small chunks - 
small_chunk_transformer = Transformer.init(int, chunk_size=5).map(lambda x: x * 2) + small_chunk_transformer = createTransformer(int, chunk_size=5).map(lambda x: x * 2) small_result = list(small_chunk_transformer(data)) # Large chunks - large_chunk_transformer = Transformer.init(int, chunk_size=50).map(lambda x: x * 2) + large_chunk_transformer = createTransformer(int, chunk_size=50).map(lambda x: x * 2) large_result = list(large_chunk_transformer(data)) # Results should be identical regardless of chunk size @@ -181,7 +182,7 @@ class TestTransformerFromTransformer: def test_copy_transformer_logic(self): """Test that from_transformer copies transformation logic.""" - source = Transformer.init(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6) + source = createTransformer(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6) target = Transformer.from_transformer(source) data = [1, 2, 3, 4, 5] @@ -193,7 +194,7 @@ def test_copy_transformer_logic(self): def test_copy_with_custom_parameters(self): """Test from_transformer with custom parameters.""" - source = Transformer.init(int).map(lambda x: x * 2) + source = createTransformer(int).map(lambda x: x * 2) target = Transformer.from_transformer(source, chunk_size=200) assert target.chunk_size == 200 @@ -207,14 +208,14 @@ class TestTransformerErrorHandling: def test_catch_with_successful_operation(self): """Test catch with successful transformation.""" - transformer = Transformer.init(int).catch(lambda t: t.map(lambda x: x * 2)) + transformer = createTransformer(int).catch(lambda t: t.map(lambda x: x * 2)) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_catch_with_error_isolation(self): """Test catch isolates errors to specific chunks.""" errored_chunks = [] - transformer = Transformer.init(int, chunk_size=1).catch( + transformer = createTransformer(int, chunk_size=1).catch( lambda t: t.map(lambda x: x / 0), # Division by zero on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore ) @@ -229,7 +230,7 @@ def test_global_error_handler(self): error_handler = ErrorHandler() error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) - transformer = Transformer.init(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) + transformer = createTransformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) list(transformer([1, 2, 3])) assert errored_chunks == [[1], [2], [3]] @@ -241,7 +242,7 @@ def set_error_flag(_chunk, _error, context): context["error_occurred"] = True transformer = ( - Transformer.init(int, chunk_size=1) + createTransformer(int, chunk_size=1) .catch( lambda t: t.map(lambda x: x / 0), on_error=set_error_flag, # type: ignore @@ -263,7 +264,7 @@ def raise_on_error(ctx): raise RuntimeError("Short-circuit condition met, stopping execution.") transformer = ( - Transformer.init(int, chunk_size=1) + createTransformer(int, chunk_size=1) .catch( lambda t: t.map(lambda x: x / 0), on_error=set_error_flag, # type: ignore diff --git a/uv.lock b/uv.lock index 15e5f9d..5b48b7e 100644 --- a/uv.lock +++ b/uv.lock @@ -71,6 +71,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/20/94/c5790835a017658cbfabd07f3bfb549140c3ac458cfc196323996b10095a/charset_normalizer-3.4.2-py3-none-any.whl", hash = "sha256:7f56930ab0abd1c45cd15be65cc741c28b1c9a34876ce8c17a2fa107810c0af0", size = 52626, upload-time = "2025-05-02T08:34:40.053Z" }, ] +[[package]] +name = "cloudpickle" +version = "3.1.1" 
+source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/52/39/069100b84d7418bc358d81669d5748efb14b9cceacd2f9c75f550424132f/cloudpickle-3.1.1.tar.gz", hash = "sha256:b216fa8ae4019d5482a8ac3c95d8f6346115d8835911fd4aefd1a445e4242c64", size = 22113, upload-time = "2025-01-14T17:02:05.085Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/e8/64c37fadfc2816a7701fa8a6ed8d87327c7d54eacfbfb6edab14a2f2be75/cloudpickle-3.1.1-py3-none-any.whl", hash = "sha256:c8c5a44295039331ee9dad40ba100a9c7297b6f988e50e87ccdf3765a668350e", size = 20992, upload-time = "2025-01-14T17:02:02.417Z" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -212,6 +221,7 @@ name = "laygo" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "loky" }, { name = "requests" }, ] @@ -225,6 +235,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "loky", specifier = ">=3.5.5" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0.0" }, { name = "requests", specifier = ">=2.32" }, { name = "requests-mock", marker = "extra == 'dev'", specifier = ">=1.12.1" }, @@ -233,6 +244,18 @@ requires-dist = [ ] provides-extras = ["dev"] +[[package]] +name = "loky" +version = "3.5.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cloudpickle" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e4/b1/059b0b3e5d98cb5085cb8611bf6e89bc36999dc056d5470a784bfdc74e49/loky-3.5.5.tar.gz", hash = "sha256:0730c7180a35972532f22fe09be600db9d69fed3552aad0410c570acbb47cf41", size = 101789, upload-time = "2025-05-23T08:58:51.242Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b2/3e/4c14113f8b9f8109a437f7148cca584147a611e1b0fc5012b87c7db9876b/loky-3.5.5-py3-none-any.whl", hash = "sha256:0cd7655df3579c4d2f5cf9c6c6f222f44a3cffe6a27e29edc10a573c138995af", size = 56038, upload-time = "2025-05-23T08:58:49.335Z" }, +] + [[package]] name = "markdown-it-py" version = "3.0.0" From 5cceb7ce94fc69c4b6ec82008ed4c697f40ecf54 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Fri, 18 Jul 2025 18:15:43 +0000 Subject: [PATCH 3/6] fix: updated test --- tests/test_parallel_transformer.py | 14 ++++++++------ tests/test_threaded_transformer.py | 7 ------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py index 18c81f0..df1c874 100644 --- a/tests/test_parallel_transformer.py +++ b/tests/test_parallel_transformer.py @@ -84,16 +84,18 @@ def test_tap_side_effects(self): def safe_increment(x: int, ctx: PipelineContext) -> int: - current_items = ctx["items"] - time.sleep(0.001) - ctx["items"] = current_items + 1 + with ctx["lock"]: + current_items = ctx["items"] + time.sleep(0.001) + ctx["items"] = current_items + 1 return x * 2 def update_stats(x: int, ctx: PipelineContext) -> int: - ctx["total_sum"] += x - ctx["item_count"] += 1 - ctx["max_value"] = max(ctx["max_value"], x) + with ctx["lock"]: + ctx["total_sum"] += x + ctx["item_count"] += 1 + ctx["max_value"] = max(ctx["max_value"], x) return x * 3 diff --git a/tests/test_threaded_transformer.py b/tests/test_threaded_transformer.py index e10069c..4c25fc8 100644 --- a/tests/test_threaded_transformer.py +++ b/tests/test_threaded_transformer.py @@ -15,13 +15,6 @@ class TestThreadedTransformerBasics: """Test core parallel transformer functionality.""" - def test_initialization_defaults(self): - """Test parallel transformer initialization with default values.""" - transformer = 
ThreadedTransformer[int, int]() - assert transformer.max_workers == 4 - assert transformer.ordered is True - assert transformer.chunk_size == 1000 - def test_initialization_custom_parameters(self): """Test initialization with custom parameters.""" transformer = ThreadedTransformer[int, int](max_workers=8, ordered=False, chunk_size=500) From ed0d59c553ad3f840b70e846c58a5c4d40c8aa2e Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Fri, 18 Jul 2025 20:08:06 +0000 Subject: [PATCH 4/6] fix: global pipeline context passing --- laygo/pipeline.py | 90 +++++++++++++++++++++++----------- laygo/transformers/parallel.py | 64 +++++++++++++++--------- 2 files changed, 101 insertions(+), 53 deletions(-) diff --git a/laygo/pipeline.py b/laygo/pipeline.py index 44d4a9c..ae2e953 100644 --- a/laygo/pipeline.py +++ b/laygo/pipeline.py @@ -1,7 +1,10 @@ +# pipeline.py + from collections.abc import Callable from collections.abc import Iterable from collections.abc import Iterator import itertools +import multiprocessing as mp from typing import Any from typing import TypeVar from typing import overload @@ -17,7 +20,7 @@ class Pipeline[T]: """ Manages a data source and applies transformers to it. - Provides terminal operations to consume the resulting data. + Always uses a multiprocessing-safe shared context. """ def __init__(self, *data: Iterable[T]): @@ -25,15 +28,63 @@ def __init__(self, *data: Iterable[T]): raise ValueError("At least one data source must be provided to Pipeline.") self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0] self.processed_data: Iterator = iter(self.data_source) - self.ctx = PipelineContext() + + # Always create a shared context with multiprocessing manager + self._manager = mp.Manager() + self.ctx = self._manager.dict() + # Add a shared lock to the context for safe concurrent updates + self.ctx["lock"] = self._manager.Lock() + + # Store reference to original context for final synchronization + self._original_context_ref: PipelineContext | None = None + + def __del__(self): + """Clean up the multiprocessing manager when the pipeline is destroyed.""" + try: + self._sync_context_back() + self._manager.shutdown() + except Exception: + pass # Ignore errors during cleanup def context(self, ctx: PipelineContext) -> "Pipeline[T]": """ - Sets the context for the pipeline. + Updates the pipeline context and stores a reference to the original context. + When the pipeline finishes processing, the original context will be updated + with the final pipeline context data. """ - self.ctx = ctx + # Store reference to the original context + self._original_context_ref = ctx + # Copy the context data to the pipeline's shared context + self.ctx.update(ctx) return self + def _sync_context_back(self) -> None: + """ + Synchronize the final pipeline context back to the original context reference. + This is called after processing is complete. + """ + if self._original_context_ref is not None: + # Copy the final context state back to the original context reference + final_context_state = dict(self.ctx) + final_context_state.pop("lock", None) # Remove non-serializable lock + self._original_context_ref.clear() + self._original_context_ref.update(final_context_state) + + def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]": + """ + Shorthand method to apply a transformation using a lambda function. + Creates a Transformer under the hood and applies it to the pipeline. 
+ + Args: + t: A callable that takes a transformer and returns a transformed transformer + + Returns: + A new Pipeline with the transformed data + """ + # Create a new transformer and apply the transformation function + transformer = t(Transformer[T, T]()) + return self.apply(transformer) + @overload def apply[U](self, transformer: Transformer[T, U]) -> "Pipeline[U]": ... @@ -41,10 +92,7 @@ def apply[U](self, transformer: Transformer[T, U]) -> "Pipeline[U]": ... def apply[U](self, transformer: Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]": ... @overload - def apply[U]( - self, - transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]], - ) -> "Pipeline[U]": ... + def apply[U](self, transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]]) -> "Pipeline[U]": ... def apply[U]( self, @@ -53,42 +101,26 @@ def apply[U]( | Callable[[Iterable[T], PipelineContext], Iterator[U]], ) -> "Pipeline[U]": """ - Applies a transformer to the current data source. + Applies a transformer to the current data source. The pipeline's + managed context is passed down. """ - match transformer: case Transformer(): - # If a Transformer instance is provided, use its __call__ method + # The transformer is called with self.ctx, which is the + # shared mp.Manager.dict proxy when inside a 'with' block. self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore case _ if callable(transformer): - # If a callable function is provided, call it with the current data and context - if is_context_aware(transformer): processed_transformer = transformer else: processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731 - self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore case _: raise TypeError("Transformer must be a Transformer instance or a callable function") return self # type: ignore - def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]": - """ - Shorthand method to apply a transformation using a lambda function. - Creates a Transformer under the hood and applies it to the pipeline. - - Args: - t: A callable that takes a transformer and returns a transformed transformer - - Returns: - A new Pipeline with the transformed data - """ - # Create a new transformer and apply the transformation function - transformer = t(Transformer[T, T]()) - return self.apply(transformer) - + # ... The rest of the Pipeline class (transform, __iter__, to_list, etc.) remains unchanged ... 
def __iter__(self) -> Iterator[T]: """Allows the pipeline to be iterated over.""" yield from self.processed_data diff --git a/laygo/transformers/parallel.py b/laygo/transformers/parallel.py index d663868..aacf977 100644 --- a/laygo/transformers/parallel.py +++ b/laygo/transformers/parallel.py @@ -11,6 +11,7 @@ import copy import itertools import multiprocessing as mp +from multiprocessing.managers import DictProxy from typing import Any from typing import Union from typing import overload @@ -85,30 +86,45 @@ def from_transformer[T, U]( ) def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]: - """Executes the transformer on data concurrently using processes.""" - with mp.Manager() as manager: - initial_ctx_data = context if context is not None else self.context - shared_context = manager.dict(initial_ctx_data) - - if "lock" not in shared_context: - shared_context["lock"] = manager.Lock() - - try: - with ProcessPoolExecutor(max_workers=self.max_workers) as executor: - chunks_to_process = self._chunk_generator(data) - gen_func = self._ordered_generator if self.ordered else self._unordered_generator - processed_chunks_iterator = gen_func(chunks_to_process, executor, shared_context) - - for result_chunk in processed_chunks_iterator: - yield from result_chunk - finally: - if context is not None: - final_context_state = dict(shared_context) - final_context_state.pop("lock", None) - # FIX 2: Do not clear the context, just update it. - # This allows chained transformers to merge their context results. - # context.clear() - context.update(final_context_state) + """ + Executes the transformer on data concurrently. It uses the shared + context provided by the Pipeline, if available. + """ + run_context = context if context is not None else self.context + + # Detect if the context is already managed by the Pipeline. + is_managed_context = isinstance(run_context, DictProxy) + + if is_managed_context: + # Use the existing shared context and lock from the Pipeline. + shared_context = run_context + yield from self._execute_with_context(data, shared_context) + # The context is live, so no need to update it here. + # The Pipeline's __exit__ will handle final state. + else: + # Fallback for standalone use: create a temporary manager. + with mp.Manager() as manager: + initial_ctx_data = dict(run_context) + shared_context = manager.dict(initial_ctx_data) + if "lock" not in shared_context: + shared_context["lock"] = manager.Lock() + + yield from self._execute_with_context(data, shared_context) + + # Copy results back to the original non-shared context. + final_context_state = dict(shared_context) + final_context_state.pop("lock", None) + run_context.update(final_context_state) + + def _execute_with_context(self, data: Iterable[In], shared_context: MutableMapping[str, Any]) -> Iterator[Out]: + """Helper to run the execution logic with a given context.""" + with ProcessPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = self._chunk_generator(data) + gen_func = self._ordered_generator if self.ordered else self._unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor, shared_context) + + for result_chunk in processed_chunks_iterator: + yield from result_chunk # ... The rest of the file remains the same ... 
def _ordered_generator( From 0b2cdb99dfe3599ffeb0c94ef6b9e4d060807dfe Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Fri, 18 Jul 2025 20:19:12 +0000 Subject: [PATCH 5/6] fix: reuse global executor --- laygo/transformers/parallel.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/laygo/transformers/parallel.py b/laygo/transformers/parallel.py index aacf977..73def84 100644 --- a/laygo/transformers/parallel.py +++ b/laygo/transformers/parallel.py @@ -17,6 +17,7 @@ from typing import overload from loky import ProcessPoolExecutor +from loky import get_reusable_executor from laygo.errors import ErrorHandler from laygo.helpers import PipelineContext @@ -119,6 +120,8 @@ def __call__(self, data: Iterable[In], context: PipelineContext | None = None) - def _execute_with_context(self, data: Iterable[In], shared_context: MutableMapping[str, Any]) -> Iterator[Out]: """Helper to run the execution logic with a given context.""" with ProcessPoolExecutor(max_workers=self.max_workers) as executor: + executor = get_reusable_executor(max_workers=self.max_workers) + chunks_to_process = self._chunk_generator(data) gen_func = self._ordered_generator if self.ordered else self._unordered_generator processed_chunks_iterator = gen_func(chunks_to_process, executor, shared_context) From ec2fe3d7fc368fbbd7115fdb493b28df589c20f0 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Fri, 18 Jul 2025 20:20:34 +0000 Subject: [PATCH 6/6] fix: expose createHTTPTransformer --- laygo/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/laygo/__init__.py b/laygo/__init__.py index defa647..dc5dc88 100644 --- a/laygo/__init__.py +++ b/laygo/__init__.py @@ -6,6 +6,7 @@ from laygo.helpers import PipelineContext from laygo.pipeline import Pipeline from laygo.transformers.http import HTTPTransformer +from laygo.transformers.http import createHTTPTransformer from laygo.transformers.parallel import ParallelTransformer from laygo.transformers.parallel import createParallelTransformer from laygo.transformers.threaded import ThreadedTransformer @@ -22,6 +23,7 @@ "ParallelTransformer", "createParallelTransformer", "HTTPTransformer", + "createHTTPTransformer", "PipelineContext", "ErrorHandler", ]
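
Throughout the test updates in this series, the `Transformer.init` / `ThreadedTransformer.init` / `ParallelTransformer.init` classmethods are replaced by module-level factory functions. A usage sketch matching the imports the tests use:

```python
# Usage sketch for the factory functions adopted by the tests; the imports
# mirror tests/test_transformer.py and tests/test_threaded_transformer.py.
from laygo.transformers.threaded import createThreadedTransformer
from laygo.transformers.transformer import createTransformer

doubled = createTransformer(int, chunk_size=2).map(lambda x: x * 2)
assert list(doubled([1, 2, 3])) == [2, 4, 6]

# ThreadedTransformer defaults to ordered output, which the tests rely on.
threaded = createThreadedTransformer(int, chunk_size=1).map(lambda x: x * 2)
assert list(threaded([1, 2, 3])) == [2, 4, 6]
```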
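
The parallel-transformer tests collect errored chunks through `mp.Manager().list()` rather than a plain list, and compare via `sorted(map(tuple, ...))` because chunk completion order across processes is nondeterministic. A plain list would stay empty, since each worker process appends to its own copy. A minimal standalone sketch of the pattern (the `worker` function and its values are illustrative, not laygo API):

```python
import multiprocessing as mp


def worker(shared_errors) -> None:
    try:
        1 / 0
    except ZeroDivisionError:
        # Appends go through the manager proxy and reach the parent;
        # a plain list would only mutate this process's private copy.
        shared_errors.append([1])


if __name__ == "__main__":
    with mp.Manager() as manager:
        errors = manager.list()
        p = mp.Process(target=worker, args=(errors,))
        p.start()
        p.join()
        # Snapshot the proxy before the manager shuts down.
        assert list(errors) == [[1]]
```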
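
In the same spirit, PATCH 3/6 wraps every read-modify-write on the shared context in `ctx["lock"]`, the per-run `threading.Lock` that `ThreadedTransformer.__call__` injects. Without the lock, two threads can read the same value and one increment is silently lost. A minimal sketch of the hazard and the guarded form:

```python
import threading
import time

ctx = {"items": 0, "lock": threading.Lock()}


def unsafe_increment(x: int, ctx: dict) -> int:
    current = ctx["items"]      # read
    time.sleep(0.001)           # another thread can interleave here
    ctx["items"] = current + 1  # writes back a stale value
    return x * 2


def safe_increment(x: int, ctx: dict) -> int:
    with ctx["lock"]:           # serializes the read-modify-write
        ctx["items"] = ctx["items"] + 1
    return x * 2
```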
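
PATCH 4/6 also introduces the `Pipeline.transform` shorthand, which builds a `Transformer` under the hood and applies it in one step. A usage sketch composed from operations the tests exercise:

```python
from laygo import Pipeline

# transform() creates a Transformer[T, T] internally and applies it,
# mirroring the map/filter chain from tests/test_transformer.py.
result = (
    Pipeline([1, 2, 3, 4])
    .transform(lambda t: t.map(lambda x: x * 2).filter(lambda x: x > 4))
    .to_list()
)
assert result == [6, 8]
```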
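
The context handling in that patch follows one hand-off pattern in both `Pipeline` and `ParallelTransformer`'s standalone fallback: run against a `multiprocessing.Manager` dict so child processes see updates, then copy the final state back into the caller's plain context, dropping the manager lock, which cannot outlive the manager. A condensed sketch (the `count` mutation stands in for transformer work):

```python
import multiprocessing as mp


def run_with_shared_context(original_ctx: dict) -> None:
    with mp.Manager() as manager:
        shared = manager.dict(original_ctx)  # proxy visible to child processes
        shared["lock"] = manager.Lock()      # per-run lock, as in the patch
        shared["count"] = shared.get("count", 0) + 1  # stand-in for real work
        final_state = dict(shared)
        final_state.pop("lock", None)  # the proxy lock dies with the manager
        original_ctx.update(final_state)
```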
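
Note that the PATCH 5/6 hunk calls `loky.get_reusable_executor` inside the `with ProcessPoolExecutor(...)` block, so a fresh pool is still constructed and torn down on every call before being shadowed. If the intent is to reuse loky's shared pool across calls, the direct form would drop the `with` block; a sketch under that assumption (it presumes the surrounding `ParallelTransformer` class, and the reusable executor must not be shut down by the caller):

```python
from loky import get_reusable_executor


def _execute_with_context(self, data, shared_context):
    # Reuse loky's process pool across calls instead of creating a
    # throwaway ProcessPoolExecutor that is immediately shadowed.
    executor = get_reusable_executor(max_workers=self.max_workers)
    chunks_to_process = self._chunk_generator(data)
    gen_func = self._ordered_generator if self.ordered else self._unordered_generator
    for result_chunk in gen_func(chunks_to_process, executor, shared_context):
        yield from result_chunk
```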