Merged
17 commits
59f42f6
Sync main with master (#139)
wool-labs[bot] Mar 28, 2026
c0ba1f9
refactor: Accept cache key callable as a SubscriberMeta keyword argument
conradbzura Mar 28, 2026
b1bfb11
Accept cache key callable as a SubscriberMeta keyword argument — Clos…
conradbzura Mar 28, 2026
faaa344
ci: Split unit and integration test coverage in CI
conradbzura Mar 29, 2026
aa0eaa6
Split unit and integration test coverage in CI — Closes #141 (#142)
conradbzura Mar 29, 2026
7238aa1
feat: Add lazy proxy start to WorkerProxy
conradbzura Mar 30, 2026
d4e79d2
test: Cover lazy proxy start across unit and integration tests
conradbzura Mar 30, 2026
e1449fd
docs: Document lazy proxy startup and flag propagation
conradbzura Mar 30, 2026
9fd138e
refactor: Split WorkerProxy into enter/exit and start/stop layers
conradbzura Mar 31, 2026
566431f
test: Update proxy tests for enter/exit and start/stop split
conradbzura Mar 31, 2026
e758075
Make worker proxy start lazily on dispatch by default — Closes #54 (#…
conradbzura Mar 31, 2026
f2468e4
feat: Add noreentry descriptor for single-use method guards
conradbzura Mar 31, 2026
79b3de2
refactor: Guard WorkerProxy and WorkerPool against reentrant context …
conradbzura Mar 31, 2026
1ad0775
test: Improve guard test coverage for pool and proxy
conradbzura Mar 31, 2026
30c6570
docs: Document single-use context lifecycle for pools and proxies
conradbzura Mar 31, 2026
077199e
fix: Support Python 3.11 in noreentry decorator
conradbzura Mar 31, 2026
b9369d7
Guard WorkerPool and WorkerProxy against reentrant context usage — Cl…
conradbzura Apr 1, 2026
37 changes: 37 additions & 0 deletions .github/actions/run-tests/action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Run tests
description: Checkout, install dependencies, and run pytest for a given namespace.
inputs:
python-version:
required: true
type: string
namespace:
required: true
type: string
pytest-marker:
required: true
type: string
cov-fail-under:
required: true
type: string

runs:
using: composite
steps:
- name: Install uv and prepare python
uses: astral-sh/setup-uv@v5
with:
python-version: ${{ inputs.python-version }}

- name: Install packages
shell: bash
run: |
.github/scripts/install-python-packages.sh
uv pip install -e './${{ inputs.namespace }}[dev]'
uv pip freeze

- name: Run tests
shell: bash
run: |
ulimit -n 65536
cd ${{ inputs.namespace }}
uv run pytest -m "${{ inputs.pytest-marker }}" --cov-fail-under=${{ inputs.cov-fail-under }}
53 changes: 25 additions & 28 deletions .github/workflows/run-tests.yaml
@@ -34,38 +34,35 @@ jobs:
cd wool
uv run pyright

run-tests:
name: Namespace ${{ matrix.namespace }} / Python ${{ matrix.python-version }}
unit-tests:
name: Unit / Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
strategy:
matrix:
namespace:
- 'wool'
python-version:
- '3.11'
- '3.12'
- '3.13'
namespace: ['wool']
python-version: ['3.11', '3.12', '3.13']
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install uv and prepare python
uses: astral-sh/setup-uv@v5
- uses: actions/checkout@v4
- uses: ./.github/actions/run-tests
with:
python-version: ${{ matrix.python-version }}
namespace: ${{ matrix.namespace }}
pytest-marker: not integration
cov-fail-under: '98'

- name: Install packages
env:
NAMESPACE: ${{ matrix.namespace }}
run: |
.github/scripts/install-python-packages.sh
uv pip install -e './${{ env.NAMESPACE }}[dev]'
uv pip freeze

- name: Run tests
env:
NAMESPACE: ${{ matrix.namespace }}
run: |
ulimit -n 65536
cd ${{ env.NAMESPACE }}
uv run pytest
integration-tests:
name: Integration / Python ${{ matrix.python-version }}
needs: unit-tests
runs-on: ubuntu-latest
strategy:
matrix:
namespace: ['wool']
python-version: ['3.11', '3.12', '3.13']
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/run-tests
with:
python-version: ${{ matrix.python-version }}
namespace: ${{ matrix.namespace }}
pytest-marker: integration
cov-fail-under: '70'
1 change: 0 additions & 1 deletion wool/.coveragerc
@@ -9,4 +9,3 @@ omit =
[report]
show_missing = true
precision = 2
fail_under = 98
8 changes: 8 additions & 0 deletions wool/README.md
@@ -156,6 +156,14 @@ async with wool.WorkerPool(discovery=wool.LanDiscovery(), lease=10):
result = await my_routine()
```

`lazy` controls whether the pool's internal `WorkerProxy` defers startup until the first task is dispatched. Defaults to `True`. The pool propagates this flag to every `WorkerProxy` it constructs, and each task serializes the proxy (including the flag) so that workers receiving the task inherit the same laziness setting. With `lazy=True`, worker subprocesses that never invoke nested `@wool.routine` calls avoid the cost of discovery subscription and sentinel setup entirely. Set `lazy=False` to start proxies eagerly — useful when you want connections established before the first dispatch.

```python
# Eager proxy startup — connections established before first dispatch
async with wool.WorkerPool(spawn=4, lazy=False):
result = await my_routine()
```

## Workers

A worker is a separate OS process hosting a gRPC server with two RPCs: `dispatch` (bidirectional streaming for task execution) and `stop` (graceful shutdown). Tasks execute on a dedicated asyncio event loop in a separate daemon thread, so that long-running or CPU-intensive task code does not block the main gRPC event loop. This keeps the worker responsive to new dispatches, stop requests, and concurrent streaming interactions with in-flight tasks.
9 changes: 4 additions & 5 deletions wool/src/wool/runtime/discovery/lan.py
@@ -323,7 +323,10 @@ def _resolve_address(self, address: str) -> Tuple[bytes, int]:

return socket.inet_aton(socket.gethostbyname(host)), port

class Subscriber(metaclass=SubscriberMeta):
class Subscriber(
metaclass=SubscriberMeta,
key=lambda cls, service_type: (cls, service_type),
):
"""Subscriber for receiving worker discovery events.

Subscribes to worker :class:`discovery events
@@ -347,10 +350,6 @@ class Subscriber(metaclass=SubscriberMeta):
def __init__(self, service_type: str) -> None:
self.service_type = service_type

@classmethod
def _cache_key(cls, service_type: str) -> tuple:
return (cls, service_type)

async def _shutdown(self) -> None:
"""Clean up shared subscription state for this subscriber."""

13 changes: 8 additions & 5 deletions wool/src/wool/runtime/discovery/local.py
@@ -560,7 +560,14 @@ def _shared_memory_finalizer(self, shared_memory: SharedMemory):
pass # pragma: no cover
atexit.unregister(self._cleanups.pop(shared_memory.name))

class Subscriber(metaclass=SubscriberMeta):
class Subscriber(
metaclass=SubscriberMeta,
key=lambda cls, namespace, *, poll_interval=None: (
cls,
namespace,
poll_interval,
),
):
"""Subscriber for receiving worker discovery events.

Subscribes to worker :class:`discovery events <~wool.DiscoveryEvent>`
@@ -600,10 +607,6 @@ def __init__(
raise ValueError(f"Expected positive poll interval, got {poll_interval}")
self._poll_interval = poll_interval

@classmethod
def _cache_key(cls, namespace: str, *, poll_interval: float | None = None):
return (cls, namespace, poll_interval)

async def _shutdown(self) -> None:
"""Clean up shared subscription state for this subscriber."""

19 changes: 14 additions & 5 deletions wool/src/wool/runtime/discovery/pool.py
@@ -108,22 +108,31 @@ class SubscriberMeta(type):
:class:`~wool.runtime.resourcepool.Resource` is entered lazily
on first iteration.

Subscriber classes using this metaclass must define a
``_cache_key`` classmethod that returns a hashable key from the
constructor arguments.
Subscriber classes pass a ``key`` keyword argument at class
definition time. The callable receives ``(cls, *args, **kwargs)``
and must return a hashable cache key.

Example::

class Sub(metaclass=SubscriberMeta, key=lambda cls, ns: (cls, ns)):
def __init__(self, ns: str) -> None: ...
"""

def __new__(
mcs,
name: str,
bases: tuple[type, ...],
namespace: dict[str, Any],
**kwargs: Any,
) -> SubscriberMeta:
cls = super().__new__(mcs, name, bases, namespace)
key_fn: Callable[..., Any] | None = kwargs.pop("key", None)
cls = super().__new__(mcs, name, bases, namespace, **kwargs)
if key_fn is not None:
cls._cache_key_fn = key_fn # type: ignore[attr-defined]
original_init = cls.__init__ # type: ignore[misc]

def _subscriber_new(cls_arg: type, *args: Any, **kwargs: Any) -> Any:
key = cls_arg._cache_key(*args, **kwargs) # type: ignore[attr-defined]
key = cls_arg._cache_key_fn(cls_arg, *args, **kwargs) # type: ignore[attr-defined]
pool = __subscriber_pool__.get()
if pool is None:
pool = ResourcePool(
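The key-callable pattern introduced in this hunk can be sketched in isolation as follows. This is a deliberately simplified model: a plain dict cache stands in for wool's TTL-based `ResourcePool`, and the `CachingMeta` and `Sub` names are invented for illustration.

```python
from typing import Any, Callable


class CachingMeta(type):
    """Toy metaclass: instances are cached by a key computed from a
    ``key`` callable supplied at class definition time."""

    def __new__(mcs, name, bases, namespace, **kwargs):
        # Pop ``key`` before delegating, mirroring the SubscriberMeta change.
        key_fn: Callable[..., Any] | None = kwargs.pop("key", None)
        cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        if key_fn is not None:
            cls._cache_key_fn = key_fn
            cls._cache = {}
        return cls

    def __call__(cls, *args, **kwargs):
        # The key callable receives (cls, *args, **kwargs), as documented.
        key = cls._cache_key_fn(cls, *args, **kwargs)
        if key not in cls._cache:
            cls._cache[key] = super().__call__(*args, **kwargs)
        return cls._cache[key]


class Sub(metaclass=CachingMeta, key=lambda cls, ns: (cls, ns)):
    def __init__(self, ns: str) -> None:
        self.ns = ns


a, b = Sub("alpha"), Sub("alpha")
assert a is b               # same key: cached instance reused
assert Sub("beta") is not a  # different key: new instance
```

Unlike the toy dict, wool's pool enters resources lazily and finalizes them after a TTL, but the class-definition-time `key=` hook works the same way in both.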
29 changes: 28 additions & 1 deletion wool/src/wool/runtime/worker/README.md
@@ -193,7 +193,9 @@ Signal handlers map `SIGTERM` to timeout 0 (cancel immediately) and `SIGINT` to

### Nested routines

Worker subprocesses can dispatch tasks to other workers. Each subprocess is configured with a `ResourcePool` of `WorkerProxy` instances (via `wool.__proxy_pool__`), so `@wool.routine` calls within a task transparently route to the target pool. Spinning up a `WorkerProxy` is not free — it involves establishing a discovery subscription and opening gRPC connections — so the resource pool caches proxies with a configurable TTL (default 60 seconds, set via `proxy_pool_ttl` on `LocalWorker`). If the interval between dispatches for a given pool on a given worker is shorter than the TTL, the cached proxy is reused. If it exceeds the TTL, the proxy is finalized and must be recreated on the next dispatch. Tuning `proxy_pool_ttl` above the expected dispatch interval keeps proxies warm and avoids this cold-start overhead.
Worker subprocesses can dispatch tasks to other workers. Each subprocess is configured with a `ResourcePool` of `WorkerProxy` instances (via `wool.__proxy_pool__`), so `@wool.routine` calls within a task transparently route to the target pool. Spinning up a `WorkerProxy` is not free — it involves establishing a discovery subscription, starting a sentinel task, and opening gRPC connections — so the resource pool caches proxies with a configurable TTL (default 60 seconds, set via `proxy_pool_ttl` on `LocalWorker`). If the interval between dispatches for a given pool on a given worker is shorter than the TTL, the cached proxy is reused. If it exceeds the TTL, the proxy is finalized and must be recreated on the next dispatch. Tuning `proxy_pool_ttl` above the expected dispatch interval keeps proxies warm and avoids this cold-start overhead.

Proxies on worker subprocesses are lazy by default — the `WorkerPool` propagates its `lazy` flag to every `WorkerProxy` it constructs, and each task serializes the proxy (including the flag) so that workers receiving the task inherit the same laziness setting. A lazy proxy defers discovery subscription and sentinel setup until its first `dispatch()` call, so workers that never invoke nested routines pay no startup cost.

## Connections

@@ -207,6 +209,31 @@ Worker subprocesses can dispatch tasks to other workers. Each subprocess is conf
| Discovery | `discovery` | Accepts any `DiscoverySubscriberLike` or `Factory` thereof. |
| Static | `workers` | Takes a sequence of `WorkerMetadata` directly — no discovery needed. |

### Lazy startup

`WorkerProxy` accepts a `lazy` parameter (default `True`) that controls when the proxy actually starts — i.e., when it subscribes to discovery, launches the worker sentinel task, and initializes the load balancer context.

| `lazy` | `enter()` / `__aenter__` | `dispatch()` | `exit()` on un-started proxy |
| ------ | ------------------------ | ------------- | ----------------------------- |
| `True` | Sets context var only | Calls `start()` on first call, then dispatches | No-op (safe to call) |
| `False` | Sets context var, calls `start()` | Raises `RuntimeError` if not started | Raises `RuntimeError` |

When `lazy=True`, concurrent `dispatch()` calls use a double-checked lock to ensure the proxy starts exactly once. The `lazy` flag is preserved through `cloudpickle` serialization, so proxies sent to worker subprocesses as part of a task retain their laziness setting.
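The start-once behavior under concurrent dispatch can be sketched as follows. This is a simplified stand-in for `WorkerProxy`'s internals, not wool's actual implementation; the `LazyProxy` name and the `start_count` instrumentation are invented for the example.

```python
import asyncio


class LazyProxy:
    """Toy model of lazy, start-exactly-once semantics."""

    def __init__(self) -> None:
        self._started = False
        self._lock = asyncio.Lock()
        self.start_count = 0  # instrumentation for the example

    async def _start(self) -> None:
        # Stand-in for discovery subscription and sentinel setup.
        await asyncio.sleep(0)
        self.start_count += 1
        self._started = True

    async def dispatch(self, work):
        if not self._started:            # first check, lock-free fast path
            async with self._lock:
                if not self._started:    # second check, under the lock
                    await self._start()
        return await work


async def main() -> None:
    proxy = LazyProxy()
    results = await asyncio.gather(
        *(proxy.dispatch(asyncio.sleep(0, result=i)) for i in range(10))
    )
    assert results == list(range(10))
    assert proxy.start_count == 1  # started once despite 10 concurrent dispatches


asyncio.run(main())
```

The fast path skips the lock once `_started` is set, so only dispatches racing the very first call ever contend on it.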

### Context lifecycle

Both `WorkerPool` and `WorkerProxy` are **single-use** async context managers. Once entered and exited, the same instance cannot be entered again — create a new instance instead. Attempting to call `enter()` or `__aenter__()` a second time raises `RuntimeError`. This prevents silent state corruption from reentrant or repeated context usage (e.g., accidentally nesting `async with proxy:` blocks or calling `enter()` in a retry loop).

```python
# Correct — one instance per context
async with wool.WorkerPool(spawn=4):
await my_routine()

# Need another pool? Create a new instance.
async with wool.WorkerPool(spawn=4):
await my_routine()
```
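A minimal sketch of the idea behind such a guard follows. This is only an illustration of single-use enforcement; wool's actual `noreentry` utility is a descriptor with its own semantics, and the `Session` class here is invented.

```python
import asyncio
import functools


def noreentry(method):
    """Reject a second call to ``method`` on the same instance."""
    flag = f"_{method.__name__}_entered"

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        if getattr(self, flag, False):
            raise RuntimeError(
                f"{type(self).__name__} contexts are single-use; "
                "create a new instance instead of re-entering"
            )
        setattr(self, flag, True)
        return await method(self, *args, **kwargs)

    return wrapper


class Session:
    """Toy single-use async context manager."""

    @noreentry
    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        return None


async def demo() -> str:
    session = Session()
    async with session:
        pass
    try:
        async with session:  # second entry on the same instance
            pass
    except RuntimeError:
        return "re-entry rejected"
    return "unexpected"


assert asyncio.run(demo()) == "re-entry rejected"
```

Guarding `__aenter__` rather than `dispatch` keeps the check at the context boundary, which is where accidental reuse (nested `async with`, retry loops) actually happens.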

### Self-describing connections

Workers are self-describing: each worker advertises its gRPC transport configuration via `ChannelOptions` in its `WorkerMetadata`. When a client discovers a worker, it reads the advertised options and configures its channel to match — message sizes, keepalive intervals, concurrency limits, and compression are all set automatically. There is no separate client-side configuration step; the worker's metadata is the single source of truth for how to connect to it.
17 changes: 17 additions & 0 deletions wool/src/wool/runtime/worker/pool.py
@@ -26,6 +26,7 @@
from wool.runtime.worker.proxy import LoadBalancerLike
from wool.runtime.worker.proxy import RoundRobinLoadBalancer
from wool.runtime.worker.proxy import WorkerProxy
from wool.utilities.noreentry import noreentry


# public
@@ -180,6 +181,7 @@ def __init__(
LoadBalancerLike | Factory[LoadBalancerLike]
) = RoundRobinLoadBalancer,
credentials: WorkerCredentials | None = None,
lazy: bool = True,
):
"""
Create an ephemeral pool of workers, spawning the specified
@@ -197,6 +199,7 @@ def __init__(
LoadBalancerLike | Factory[LoadBalancerLike]
) = RoundRobinLoadBalancer,
credentials: WorkerCredentials | None = None,
lazy: bool = True,
):
"""
Connect to an existing pool of workers discovered by the
@@ -216,6 +219,7 @@ def __init__(
LoadBalancerLike | Factory[LoadBalancerLike]
) = RoundRobinLoadBalancer,
credentials: WorkerCredentials | None = None,
lazy: bool = True,
):
"""
Create a hybrid pool that spawns local workers and discovers
@@ -236,6 +240,7 @@ def __init__(
LoadBalancerLike | Factory[LoadBalancerLike]
) = RoundRobinLoadBalancer,
credentials: WorkerCredentials | None = None,
lazy: bool = True,
): ...

@overload
@@ -251,6 +256,7 @@ def __init__(
LoadBalancerLike | Factory[LoadBalancerLike]
) = RoundRobinLoadBalancer,
credentials: WorkerCredentials | None = None,
lazy: bool = True,
): ...

def __init__(
@@ -265,9 +271,11 @@ def __init__(
LoadBalancerLike | Factory[LoadBalancerLike]
) = RoundRobinLoadBalancer,
credentials: WorkerCredentials | None = None,
lazy: bool = True,
):
self._workers = {}
self._credentials = credentials
self._lazy = lazy

if size is not None and spawn is not None:
raise TypeError(
@@ -310,6 +318,7 @@ async def create_proxy():
loadbalancer=loadbalancer,
credentials=self._credentials,
lease=max_workers,
lazy=self._lazy,
):
yield
finally:
@@ -335,6 +344,7 @@ async def create_proxy():
loadbalancer=loadbalancer,
credentials=self._credentials,
lease=max_workers,
lazy=self._lazy,
):
yield

@@ -353,6 +363,7 @@ async def create_proxy():
loadbalancer=loadbalancer,
credentials=self._credentials,
lease=lease,
lazy=self._lazy,
):
yield
finally:
@@ -378,6 +389,7 @@ async def create_proxy():
loadbalancer=loadbalancer,
credentials=self._credentials,
lease=max_workers,
lazy=self._lazy,
):
yield

@@ -386,6 +398,7 @@ async def create_proxy():

self._proxy_factory = create_proxy

@noreentry
async def __aenter__(self) -> WorkerPool:
"""Starts the worker pool and its services, returning a session.

@@ -394,6 +407,10 @@ async def __aenter__(self) -> WorkerPool:

:returns:
The :class:`WorkerPool` instance itself for method chaining.
:raises RuntimeError:
If the pool has already been entered. ``WorkerPool``
contexts are single-use — create a new instance instead
of re-entering.
"""
self._proxy_context = self._proxy_factory()
await self._proxy_context.__aenter__()