Merged
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
@@ -38,9 +38,9 @@ jobs:

      - name: Update version in __init__.py
        run: |
          sed -i 's|VERSION_NUMBER|${{ steps.version.outputs.VERSION }}|g' laygo/__init__.py
          sed -i 's|"0.1.0"|"${{ steps.version.outputs.VERSION }}"|g' pyproject.toml
          echo "Updated version to ${{ steps.version.outputs.VERSION }}"
          cat laygo/__init__.py
          cat pyproject.toml

      - name: Build package
        run: uv build
14 changes: 7 additions & 7 deletions laygo/__init__.py
@@ -2,18 +2,18 @@
Laygo - A lightweight Python library for building resilient, in-memory data pipelines
"""

__version__ = "VERSION_NUMBER"

from .errors import ErrorHandler
from .helpers import PipelineContext
from .pipeline import Pipeline
from .transformers.parallel import ParallelTransformer
from .transformers.transformer import Transformer
from laygo.errors import ErrorHandler
from laygo.helpers import PipelineContext
from laygo.pipeline import Pipeline
from laygo.transformers.http import HTTPTransformer
from laygo.transformers.parallel import ParallelTransformer
from laygo.transformers.transformer import Transformer

__all__ = [
  "Pipeline",
  "Transformer",
  "ParallelTransformer",
  "HTTPTransformer",
  "PipelineContext",
  "ErrorHandler",
]
3 changes: 1 addition & 2 deletions laygo/pipeline.py
@@ -8,8 +8,7 @@

from laygo.helpers import PipelineContext
from laygo.helpers import is_context_aware

from .transformers.transformer import Transformer
from laygo.transformers.transformer import Transformer

T = TypeVar("T")
PipelineFunction = Callable[[T], Any]
157 changes: 157 additions & 0 deletions laygo/transformers/http.py
@@ -0,0 +1,157 @@
"""
The final, self-sufficient HTTPTransformer with corrected typing.
"""

from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
import hashlib
import itertools
import pickle
from typing import Any
from typing import TypeVar
from typing import Union
from typing import overload

import requests

from laygo.errors import ErrorHandler
from laygo.helpers import PipelineContext
from laygo.transformers.transformer import ChunkErrorHandler
from laygo.transformers.transformer import PipelineFunction
from laygo.transformers.transformer import Transformer

In = TypeVar("In")
Out = TypeVar("Out")
T = TypeVar("T")
U = TypeVar("U")


class HTTPTransformer(Transformer[In, Out]):
  """
  A self-sufficient, chainable transformer that manages its own
  distributed execution and worker endpoint definition.
  """

  def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
    super().__init__()
    self.base_url = base_url.rstrip("/")
    self.endpoint = endpoint
    self.max_workers = max_workers
    self.session = requests.Session()
    self._worker_url: str | None = None

  def _finalize_config(self):
    """Determines the final worker URL, generating one if needed."""
    if hasattr(self, "_worker_url") and self._worker_url:
      return

    if self.endpoint:
      path = self.endpoint
    else:
      if not self.transformer:
        raise ValueError("Cannot determine endpoint for an empty transformer.")
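      # Hashing the pickled pipeline logic makes the auto-generated route
      # deterministic: identical pipelines resolve to the same endpoint.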
      serialized_logic = pickle.dumps(self.transformer)
      hash_id = hashlib.sha1(serialized_logic).hexdigest()[:16]
      path = f"/autogen/{hash_id}"

    self.endpoint = path.lstrip("/")
    self._worker_url = f"{self.base_url}/{self.endpoint}"

  # --- Original HTTPTransformer Methods ---

  def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
    """CLIENT-SIDE: Called by the Pipeline to start distributed processing."""
    self._finalize_config()

    def process_chunk(chunk: list) -> list:
      """Target for a thread: sends one chunk to the worker."""
      try:
        response = self.session.post(
          self._worker_url,  # type: ignore
          json=chunk,
          timeout=300,
        )
        response.raise_for_status()
        return response.json()
      except requests.RequestException as e:
        print(f"Error calling worker {self._worker_url}: {e}")
        return []

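    # Bounded-concurrency window: prime one future per worker, then replace
    # each completed future with the next chunk, so that at most max_workers
    # chunks are in flight (and buffered in memory) at any one time.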
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
      chunk_iterator = self._chunk_generator(data)
      futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)}
      while futures:
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        for future in done:
          yield from future.result()
          try:
            new_chunk = next(chunk_iterator)
            futures.add(executor.submit(process_chunk, new_chunk))
          except StopIteration:
            continue

  def get_route(self):
    """
    Returns the (path, view function) pair for the worker.
    This is used to register the worker in a Flask app or similar.
    """
    self._finalize_config()

    def worker_view_func(chunk: list, context: PipelineContext):
      """The actual worker logic for this transformer."""
      return self.transformer(chunk, context)

    return (f"/{self.endpoint}", worker_view_func)
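Not part of this diff: a minimal sketch of wiring get_route() into a worker process, assuming Flask is installed (the app name, route path, and port are illustrative, not prescribed by the library):

from flask import Flask
from flask import jsonify
from flask import request

from laygo import HTTPTransformer
from laygo import PipelineContext

app = Flask(__name__)

# The same chainable definition provides both the client and the worker logic.
transformer = HTTPTransformer(base_url="http://localhost:8000", endpoint="/double").map(lambda x: x * 2)
path, view_func = transformer.get_route()


@app.route(path, methods=["POST"])
def handle_chunk():
  # The client POSTs a JSON chunk; reply with the transformed chunk as JSON.
  chunk = request.get_json()
  return jsonify(list(view_func(chunk=chunk, context=PipelineContext())))


if __name__ == "__main__":
  app.run(port=8000)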

  # --- Overridden Chaining Methods to Preserve Type ---

  def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "HTTPTransformer[In, Out]":
    super().on_error(handler)
    return self

  def map[U](self, function: PipelineFunction[Out, U]) -> "HTTPTransformer[In, U]":
    super().map(function)
    return self  # type: ignore

  def filter(self, predicate: PipelineFunction[Out, bool]) -> "HTTPTransformer[In, Out]":
    super().filter(predicate)
    return self

  @overload
  def flatten[T](self: "HTTPTransformer[In, list[T]]") -> "HTTPTransformer[In, T]": ...
  @overload
  def flatten[T](self: "HTTPTransformer[In, tuple[T, ...]]") -> "HTTPTransformer[In, T]": ...
  @overload
  def flatten[T](self: "HTTPTransformer[In, set[T]]") -> "HTTPTransformer[In, T]": ...
  # Forgive me for I have sinned, but this is necessary to avoid type errors.
  # Since I'm setting the self type in the parent class, overriding it isn't allowed.
  def flatten[T](  # type: ignore
    self: Union["HTTPTransformer[In, list[T]]", "HTTPTransformer[In, tuple[T, ...]]", "HTTPTransformer[In, set[T]]"],
  ) -> "HTTPTransformer[In, T]":
    super().flatten()  # type: ignore
    return self  # type: ignore

  def tap(self, function: PipelineFunction[Out, Any]) -> "HTTPTransformer[In, Out]":
    super().tap(function)
    return self

  def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]"]) -> "HTTPTransformer[In, T]":
    # Note: the type hint for `t` is slightly adjusted to reflect that it receives an HTTPTransformer.
    super().apply(t)  # type: ignore
    return self  # type: ignore

  def catch[U](
    self,
    sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
    on_error: ChunkErrorHandler[Out, U] | None = None,
  ) -> "HTTPTransformer[In, U]":
    super().catch(sub_pipeline_builder, on_error)
    return self  # type: ignore

  def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "HTTPTransformer[In, Out]":
    super().short_circuit(function)
    return self
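These overrides change only the declared return types so that chaining keeps the HTTPTransformer interface visible to static checkers; a minimal sketch of the effect (the worker URL is illustrative):

from laygo import HTTPTransformer

t = HTTPTransformer(base_url="http://worker:8000", endpoint="/double").map(lambda x: x * 2).filter(lambda x: x > 10)
# Because map/filter return "HTTPTransformer[...]" rather than the base
# Transformer, HTTP-specific methods such as get_route() remain available.
path, view_func = t.get_route()
assert path == "/double"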
65 changes: 61 additions & 4 deletions laygo/transformers/parallel.py
@@ -1,6 +1,7 @@
"""Parallel transformer implementation using multiple threads."""

from collections import deque
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
@@ -11,11 +12,17 @@
from functools import partial
import itertools
import threading
from typing import Any
from typing import Union
from typing import overload

from .transformer import DEFAULT_CHUNK_SIZE
from .transformer import InternalTransformer
from .transformer import PipelineContext
from .transformer import Transformer
from laygo.errors import ErrorHandler
from laygo.helpers import PipelineContext
from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE
from laygo.transformers.transformer import ChunkErrorHandler
from laygo.transformers.transformer import InternalTransformer
from laygo.transformers.transformer import PipelineFunction
from laygo.transformers.transformer import Transformer


class ParallelPipelineContextType(PipelineContext):
@@ -142,3 +149,53 @@ def result_iterator_manager() -> Iterator[Out]:
        yield from result_chunk

    return result_iterator_manager()

  # --- Overridden Chaining Methods to Preserve Type ---

  def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]":
    super().on_error(handler)
    return self

  def map[U](self, function: PipelineFunction[Out, U]) -> "ParallelTransformer[In, U]":
    super().map(function)
    return self  # type: ignore

  def filter(self, predicate: PipelineFunction[Out, bool]) -> "ParallelTransformer[In, Out]":
    super().filter(predicate)
    return self

  @overload
  def flatten[T](self: "ParallelTransformer[In, list[T]]") -> "ParallelTransformer[In, T]": ...
  @overload
  def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTransformer[In, T]": ...
  @overload
  def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ...
  def flatten[T](  # type: ignore
    self: Union[
      "ParallelTransformer[In, list[T]]", "ParallelTransformer[In, tuple[T, ...]]", "ParallelTransformer[In, set[T]]"
    ],
  ) -> "ParallelTransformer[In, T]":
    super().flatten()  # type: ignore
    return self  # type: ignore

  def tap(self, function: PipelineFunction[Out, Any]) -> "ParallelTransformer[In, Out]":
    super().tap(function)
    return self

  def apply[T](
    self, t: Callable[["ParallelTransformer[In, Out]"], "Transformer[In, T]"]
  ) -> "ParallelTransformer[In, T]":
    super().apply(t)  # type: ignore
    return self  # type: ignore

  def catch[U](
    self,
    sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
    on_error: ChunkErrorHandler[Out, U] | None = None,
  ) -> "ParallelTransformer[In, U]":
    super().catch(sub_pipeline_builder, on_error)
    return self  # type: ignore

  def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "ParallelTransformer[In, Out]":
    super().short_circuit(function)
    return self
6 changes: 5 additions & 1 deletion pyproject.toml
@@ -7,7 +7,7 @@ path = "laygo/__init__.py"

[project]
name = "laygo"
dynamic = ["version"]
version = "0.1.0"
description = "A lightweight Python library for building resilient, in-memory data pipelines with elegant, chainable syntax"
readme = "README.md"
requires-python = ">=3.12"
@@ -29,17 +29,21 @@ classifiers = [
"Typing :: Typed",
]

dependencies = ["requests>=2.32"]

[project.urls]
Homepage = "https://github.com/ringoldsdev/laygo-python"
Documentation = "https://github.com/ringoldsdev/laygo-python/wiki"
Repository = "https://github.com/ringoldsdev/laygo-python.git"
Issues = "https://github.com/ringoldsdev/laygo-python/issues"


[project.optional-dependencies]
dev = [
  "pytest>=7.0.0",
  "ruff>=0.1.0",
  "twine>=4.0.0",
  "requests-mock>=1.12.1",
]

[tool.ruff]
59 changes: 59 additions & 0 deletions tests/test_http_transformer.py
@@ -0,0 +1,59 @@
# Tests for the HTTPTransformer class. Pipeline, PipelineContext, and
# HTTPTransformer are imported from the laygo package.
import requests_mock

from laygo import HTTPTransformer
from laygo import Pipeline
from laygo import PipelineContext


class TestHTTPTransformer:
  """
  Test suite for the HTTPTransformer class.
  """

  def test_distributed_transformer_with_mock(self):
    """
    Tests the HTTPTransformer by mocking the worker endpoint.
    This test validates that the client side of the transformer correctly
    calls the endpoint and processes the response from the (mocked) worker.
    """
    # 1. Define the transformer's properties.
    base_url = "http://mock-worker.com"
    endpoint = "/process/data"
    worker_url = f"{base_url}{endpoint}"

    # 2. Define the transformer and its logic using the chainable API.
    # This single instance holds both the client and server logic.
    http_transformer = (
      HTTPTransformer(base_url=base_url, endpoint=endpoint).map(lambda x: x * 2).filter(lambda x: x > 10)
    )

    # Set a small chunk_size to ensure the client makes multiple requests.
    http_transformer.chunk_size = 4

    # 3. Get the worker's logic from the transformer itself.
    # The `get_route` method provides the exact function the worker would run.
    _, worker_view_func = http_transformer.get_route()

    # 4. Configure the mock endpoint to use the real worker logic.
    def mock_response(request, context):
      """The behavior of the mocked Flask endpoint."""
      input_chunk = request.json()
      # Call the actual view-function logic obtained from get_route().
      # A fresh PipelineContext is passed; it is not used in this simple case.
      output_chunk = worker_view_func(chunk=input_chunk, context=PipelineContext())
      return output_chunk

    # Use the requests_mock context manager.
    with requests_mock.Mocker() as m:
      m.post(worker_url, json=mock_response)

      # 5. Run the standard Pipeline with the configured transformer.
      initial_data = list(range(10))  # [0, 1, 2, ..., 9]
      pipeline = Pipeline(initial_data).apply(http_transformer)
      result = pipeline.to_list()

      # 6. Assert the final result.
      expected_result = [12, 14, 16, 18]
      assert sorted(result) == sorted(expected_result)
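Beyond the mocked endpoint, the same instance can drive a live worker; a minimal sketch, assuming a worker process (for example, the Flask app sketched earlier) is serving the matching route at http://localhost:8000:

from laygo import HTTPTransformer
from laygo import Pipeline

transformer = HTTPTransformer(base_url="http://localhost:8000", endpoint="/double").map(lambda x: x * 2)
result = Pipeline(list(range(100))).apply(transformer).to_list()  # chunks fan out over HTTP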