10 changes: 10 additions & 0 deletions laygo/__init__.py
@@ -6,14 +6,24 @@
from laygo.helpers import PipelineContext
from laygo.pipeline import Pipeline
from laygo.transformers.http import HTTPTransformer
from laygo.transformers.http import createHTTPTransformer
from laygo.transformers.parallel import ParallelTransformer
from laygo.transformers.parallel import createParallelTransformer
from laygo.transformers.threaded import ThreadedTransformer
from laygo.transformers.threaded import createThreadedTransformer
from laygo.transformers.transformer import Transformer
from laygo.transformers.transformer import createTransformer

__all__ = [
    "Pipeline",
    "Transformer",
    "createTransformer",
    "ThreadedTransformer",
    "createThreadedTransformer",
    "ParallelTransformer",
    "createParallelTransformer",
    "HTTPTransformer",
    "createHTTPTransformer",
    "PipelineContext",
    "ErrorHandler",
]
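
For orientation, a sketch of the call style these new factory exports enable. Only createHTTPTransformer's signature appears in this diff, so it is an assumption that createTransformer likewise takes an explicit element-type hint; the .map combinator is also an assumption, while to_list is referenced later in the diff.

    from laygo import Pipeline, createTransformer

    # Hypothetical: the type hint pins the element type, so the identity
    # transformer starts out as Transformer[int, int].
    doubler = createTransformer(int).map(lambda x: x * 2)  # .map is assumed

    result = Pipeline([1, 2, 3]).apply(doubler).to_list()  # [2, 4, 6]
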
90 changes: 61 additions & 29 deletions laygo/pipeline.py
@@ -1,7 +1,10 @@
# pipeline.py

from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
import itertools
import multiprocessing as mp
from typing import Any
from typing import TypeVar
from typing import overload
@@ -17,34 +20,79 @@
class Pipeline[T]:
"""
Manages a data source and applies transformers to it.
Provides terminal operations to consume the resulting data.
Always uses a multiprocessing-safe shared context.
"""

    def __init__(self, *data: Iterable[T]):
        if len(data) == 0:
            raise ValueError("At least one data source must be provided to Pipeline.")
        self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0]
        self.processed_data: Iterator = iter(self.data_source)
        self.ctx = PipelineContext()

        # Always create a shared context with multiprocessing manager
        self._manager = mp.Manager()
        self.ctx = self._manager.dict()
        # Add a shared lock to the context for safe concurrent updates
        self.ctx["lock"] = self._manager.Lock()

        # Store reference to original context for final synchronization
        self._original_context_ref: PipelineContext | None = None

    def __del__(self):
        """Clean up the multiprocessing manager when the pipeline is destroyed."""
        try:
            self._sync_context_back()
            self._manager.shutdown()
        except Exception:
            pass  # Ignore errors during cleanup

    def context(self, ctx: PipelineContext) -> "Pipeline[T]":
        """
        Sets the context for the pipeline.
        Updates the pipeline context and stores a reference to the original context.
        When the pipeline finishes processing, the original context will be updated
        with the final pipeline context data.
        """
        self.ctx = ctx
        # Store reference to the original context
        self._original_context_ref = ctx
        # Copy the context data to the pipeline's shared context
        self.ctx.update(ctx)
        return self

    def _sync_context_back(self) -> None:
        """
        Synchronize the final pipeline context back to the original context reference.
        This is called after processing is complete.
        """
        if self._original_context_ref is not None:
            # Copy the final context state back to the original context reference
            final_context_state = dict(self.ctx)
            final_context_state.pop("lock", None)  # Remove non-serializable lock
            self._original_context_ref.clear()
            self._original_context_ref.update(final_context_state)
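
Taken together, context() and _sync_context_back() form a round trip: the caller's context is copied into the manager-backed dict, transformers mutate that dict (guarded by the shared "lock" entry installed in __init__), and the final state minus the lock is written back to the caller's object. A minimal sketch, assuming PipelineContext behaves as a plain mutable mapping:

    from laygo import Pipeline
    from laygo.helpers import PipelineContext

    ctx = PipelineContext()
    ctx["seen"] = 0  # assumes PipelineContext supports item assignment

    def count(data, context):  # two-argument callable, picked up by is_context_aware
        for item in data:
            with context["lock"]:  # shared lock created in Pipeline.__init__
                context["seen"] += 1
            yield item

    p = Pipeline(range(4)).context(ctx).apply(count)
    list(p)                  # drain the pipeline
    p._sync_context_back()   # normally triggered from __del__
    assert ctx["seen"] == 4  # caller's context now holds the final state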

    def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
        """
        Shorthand method to apply a transformation using a lambda function.
        Creates a Transformer under the hood and applies it to the pipeline.

        Args:
            t: A callable that takes a transformer and returns a transformed transformer

        Returns:
            A new Pipeline with the transformed data
        """
        # Create a new transformer and apply the transformation function
        transformer = t(Transformer[T, T]())
        return self.apply(transformer)
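
A sketch of the shorthand in use: the lambda receives a fresh identity Transformer and returns the chained one. The map combinator is assumed here, since Transformer's methods are outside this diff.

    squares = Pipeline([1, 2, 3]).transform(lambda t: t.map(lambda x: x * x))
    print(list(squares))  # [1, 4, 9]; iteration drains processed_data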

    @overload
    def apply[U](self, transformer: Transformer[T, U]) -> "Pipeline[U]": ...

    @overload
    def apply[U](self, transformer: Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]": ...

    @overload
    def apply[U](
        self,
        transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]],
    ) -> "Pipeline[U]": ...
    def apply[U](self, transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]]) -> "Pipeline[U]": ...

    def apply[U](
        self,
@@ -53,42 +101,26 @@ def apply[U](
        | Callable[[Iterable[T], PipelineContext], Iterator[U]],
    ) -> "Pipeline[U]":
        """
        Applies a transformer to the current data source.
        Applies a transformer to the current data source. The pipeline's
        managed context is passed down.
        """

        match transformer:
            case Transformer():
                # If a Transformer instance is provided, use its __call__ method
                # The transformer is called with self.ctx, the shared
                # mp.Manager dict proxy created in __init__.
                self.processed_data = transformer(self.processed_data, self.ctx)  # type: ignore
            case _ if callable(transformer):
                # If a callable function is provided, call it with the current data and context

                if is_context_aware(transformer):
                    processed_transformer = transformer
                else:
                    processed_transformer = lambda data, ctx: transformer(data)  # type: ignore # noqa: E731

                self.processed_data = processed_transformer(self.processed_data, self.ctx)  # type: ignore
            case _:
                raise TypeError("Transformer must be a Transformer instance or a callable function")

        return self  # type: ignore
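
The match statement accepts a Transformer instance or two shapes of plain callable. A sketch of both callable forms: a one-argument function gets wrapped in a context-ignoring lambda, while a two-argument function passes the is_context_aware check and receives the shared dict directly. The functions and the "run_id" key are illustrative.

    def evens(data):  # context-unaware; wrapped as lambda data, ctx: evens(data)
        return (x for x in data if x % 2 == 0)

    def label(data, ctx):  # context-aware; called with the manager-backed dict
        for x in data:
            yield (ctx.get("run_id", "n/a"), x)  # "run_id" is a made-up key

    p = Pipeline(range(6)).apply(evens).apply(label)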

    def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
        """
        Shorthand method to apply a transformation using a lambda function.
        Creates a Transformer under the hood and applies it to the pipeline.

        Args:
            t: A callable that takes a transformer and returns a transformed transformer

        Returns:
            A new Pipeline with the transformed data
        """
        # Create a new transformer and apply the transformation function
        transformer = t(Transformer[T, T]())
        return self.apply(transformer)

    # ... The rest of the Pipeline class (transform, __iter__, to_list, etc.) remains unchanged ...
    def __iter__(self) -> Iterator[T]:
        """Allows the pipeline to be iterated over."""
        yield from self.processed_data
20 changes: 18 additions & 2 deletions laygo/transformers/http.py
@@ -30,14 +30,30 @@
U = TypeVar("U")


def createHTTPTransformer[T](
    _type_hint: type[T],
    base_url: str,
    chunk_size: int | None = None,
    endpoint: str | None = None,
    max_workers: int = 4,
) -> "HTTPTransformer[T, T]":
    """Create a new identity HTTP transformer with an explicit type hint."""
    return HTTPTransformer[T, T](
        base_url=base_url,
        endpoint=endpoint,
        max_workers=max_workers,
        chunk_size=chunk_size,
    )
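
A sketch of the factory call with the new chunk_size parameter, which now flows through to Transformer.__init__; the URL, endpoint, and sizes below are placeholders:

    stage = createHTTPTransformer(
        dict,  # _type_hint: elements are plain dicts
        base_url="http://workers.example:8080",  # placeholder
        endpoint="/process",  # placeholder
        max_workers=4,
        chunk_size=256,  # forwarded to Transformer(chunk_size=...)
    )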


class HTTPTransformer(Transformer[In, Out]):
    """
    A self-sufficient, chainable transformer that manages its own
    distributed execution and worker endpoint definition.
    """

    def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
        super().__init__()
    def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8, chunk_size: int | None = None):
        super().__init__(chunk_size=chunk_size)
        self.base_url = base_url.rstrip("/")
        self.endpoint = endpoint
        self.max_workers = max_workers