From c0ba1f9ea9df9431614afc9778a2693ec643e917 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sat, 28 Mar 2026 09:37:47 -0400 Subject: [PATCH 01/12] refactor: Accept cache key callable as a SubscriberMeta keyword argument Replace the _cache_key classmethod convention with a key keyword argument passed at class definition time. The metaclass stores the callable and invokes it in the injected __new__, making the cache key requirement explicit at the declaration site. --- wool/src/wool/runtime/discovery/lan.py | 9 +- wool/src/wool/runtime/discovery/local.py | 13 ++- wool/src/wool/runtime/discovery/pool.py | 19 +++- wool/tests/runtime/discovery/test_pool.py | 110 +++++++++++++++++++--- 4 files changed, 121 insertions(+), 30 deletions(-) diff --git a/wool/src/wool/runtime/discovery/lan.py b/wool/src/wool/runtime/discovery/lan.py index 07a9b97..5dfcf83 100644 --- a/wool/src/wool/runtime/discovery/lan.py +++ b/wool/src/wool/runtime/discovery/lan.py @@ -323,7 +323,10 @@ def _resolve_address(self, address: str) -> Tuple[bytes, int]: return socket.inet_aton(socket.gethostbyname(host)), port - class Subscriber(metaclass=SubscriberMeta): + class Subscriber( + metaclass=SubscriberMeta, + key=lambda cls, service_type: (cls, service_type), + ): """Subscriber for receiving worker discovery events. 
Subscribes to worker :class:`discovery events @@ -347,10 +350,6 @@ class Subscriber(metaclass=SubscriberMeta): def __init__(self, service_type: str) -> None: self.service_type = service_type - @classmethod - def _cache_key(cls, service_type: str) -> tuple: - return (cls, service_type) - async def _shutdown(self) -> None: """Clean up shared subscription state for this subscriber.""" diff --git a/wool/src/wool/runtime/discovery/local.py b/wool/src/wool/runtime/discovery/local.py index d4c3484..ebb31fb 100644 --- a/wool/src/wool/runtime/discovery/local.py +++ b/wool/src/wool/runtime/discovery/local.py @@ -560,7 +560,14 @@ def _shared_memory_finalizer(self, shared_memory: SharedMemory): pass # pragma: no cover atexit.unregister(self._cleanups.pop(shared_memory.name)) - class Subscriber(metaclass=SubscriberMeta): + class Subscriber( + metaclass=SubscriberMeta, + key=lambda cls, namespace, *, poll_interval=None: ( + cls, + namespace, + poll_interval, + ), + ): """Subscriber for receiving worker discovery events. Subscribes to worker :class:`discovery events <~wool.DiscoveryEvent>` @@ -600,10 +607,6 @@ def __init__( raise ValueError(f"Expected positive poll interval, got {poll_interval}") self._poll_interval = poll_interval - @classmethod - def _cache_key(cls, namespace: str, *, poll_interval: float | None = None): - return (cls, namespace, poll_interval) - async def _shutdown(self) -> None: """Clean up shared subscription state for this subscriber.""" diff --git a/wool/src/wool/runtime/discovery/pool.py b/wool/src/wool/runtime/discovery/pool.py index 536fef5..56bb22f 100644 --- a/wool/src/wool/runtime/discovery/pool.py +++ b/wool/src/wool/runtime/discovery/pool.py @@ -108,9 +108,14 @@ class SubscriberMeta(type): :class:`~wool.runtime.resourcepool.Resource` is entered lazily on first iteration. - Subscriber classes using this metaclass must define a - ``_cache_key`` classmethod that returns a hashable key from the - constructor arguments. 
+ Subscriber classes pass a ``key`` keyword argument at class + definition time. The callable receives ``(cls, *args, **kwargs)`` + and must return a hashable cache key. + + Example:: + + class Sub(metaclass=SubscriberMeta, key=lambda cls, ns: (cls, ns)): + def __init__(self, ns: str) -> None: ... """ def __new__( @@ -118,12 +123,16 @@ def __new__( name: str, bases: tuple[type, ...], namespace: dict[str, Any], + **kwargs: Any, ) -> SubscriberMeta: - cls = super().__new__(mcs, name, bases, namespace) + key_fn: Callable[..., Any] | None = kwargs.pop("key", None) + cls = super().__new__(mcs, name, bases, namespace, **kwargs) + if key_fn is not None: + cls._cache_key_fn = key_fn # type: ignore[attr-defined] original_init = cls.__init__ # type: ignore[misc] def _subscriber_new(cls_arg: type, *args: Any, **kwargs: Any) -> Any: - key = cls_arg._cache_key(*args, **kwargs) # type: ignore[attr-defined] + key = cls_arg._cache_key_fn(cls_arg, *args, **kwargs) # type: ignore[attr-defined] pool = __subscriber_pool__.get() if pool is None: pool = ResourcePool( diff --git a/wool/tests/runtime/discovery/test_pool.py b/wool/tests/runtime/discovery/test_pool.py index 049d82e..49b362c 100644 --- a/wool/tests/runtime/discovery/test_pool.py +++ b/wool/tests/runtime/discovery/test_pool.py @@ -15,16 +15,15 @@ from wool.runtime.worker.metadata import WorkerMetadata -class _StubSubscriber(metaclass=SubscriberMeta): +class _StubSubscriber( + metaclass=SubscriberMeta, + key=lambda cls, key_value: (cls, key_value), +): """Minimal subscriber stub for testing SubscriberMeta.""" def __init__(self, key_value: str) -> None: self.key_value = key_value - @classmethod - def _cache_key(cls, key_value: str): - return (cls, key_value) - def __reduce__(self): return type(self), (self.key_value,) @@ -105,14 +104,13 @@ async def test___new___with_shared_fan_out(self): # Arrange events = [_make_event(address=f"127.0.0.1:{50051 + i}") for i in range(3)] - class _EventStub(metaclass=SubscriberMeta): + class 
_EventStub( + metaclass=SubscriberMeta, + key=lambda cls, tag: (cls, tag), + ): def __init__(self, tag): self.tag = tag - @classmethod - def _cache_key(cls, tag): - return (cls, tag) - async def _shutdown(self): pass @@ -157,15 +155,14 @@ async def test___new___with_different_keys(self): event_a = _make_event(address="10.0.0.1:1") event_b = _make_event(address="10.0.0.2:2") - class _IsoStub(metaclass=SubscriberMeta): + class _IsoStub( + metaclass=SubscriberMeta, + key=lambda cls, tag, event: (cls, tag), + ): def __init__(self, tag, event): self.tag = tag self._event = event - @classmethod - def _cache_key(cls, tag, event): - return (cls, tag) - async def _shutdown(self): pass @@ -206,6 +203,89 @@ def test___new___with_lazy_pool_init(self): # Assert assert __subscriber_pool__.get() is not None + def test___new___with_key_callable_receiving_cls(self): + """Test key callable receives the class as its first argument. + + Given: + A subscriber class defined with a key callable that + includes cls in the returned tuple. + When: + The class is instantiated. + Then: + It should produce a _SharedSubscription whose cache key + was computed using the class itself. + """ + # Arrange + captured = {} + + def capture_key(cls, value): + captured["cls"] = cls + captured["value"] = value + return (cls, value) + + class _CaptureSub( + metaclass=SubscriberMeta, + key=capture_key, + ): + def __init__(self, value): + self.value = value + + async def _shutdown(self): + pass + + def __aiter__(self): + return self._gen() + + async def _gen(self): + yield # pragma: no cover + + # Act + result = _CaptureSub("test-val") + + # Assert + assert isinstance(result, _SharedSubscription) + assert captured["cls"] is _CaptureSub + assert captured["value"] == "test-val" + + def test___new___with_inherited_key(self): + """Test subclass inherits key callable from parent. + + Given: + A parent subscriber class defined with a key callable + and a subclass that does not pass its own key. 
+ When: + The subclass is instantiated. + Then: + It should inherit the parent's key callable and return + a _SharedSubscription. + """ + + # Arrange + class _Parent( + metaclass=SubscriberMeta, + key=lambda cls, tag: (cls, tag), + ): + def __init__(self, tag): + self.tag = tag + + async def _shutdown(self): + pass + + def __aiter__(self): + return self._gen() + + async def _gen(self): + yield # pragma: no cover + + class _Child(_Parent): + pass + + # Act + result = _Child("child-tag") + + # Assert + assert isinstance(result, _SharedSubscription) + class TestSharedSubscription: """Tests for _SharedSubscription class. From faaa3440996176a7f2cd70c88d724a3f36f85078 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sat, 28 Mar 2026 21:01:08 -0400 Subject: [PATCH 02/12] ci: Split unit and integration test coverage in CI Replace the single run-tests job with separate unit-tests and integration-tests jobs. Each job filters by pytest marker and enforces its own coverage threshold via --cov-fail-under (98% for unit, 70% for integration). The integration job depends on unit-tests so it only runs after unit tests pass. Move the fail_under setting out of .coveragerc into per-job CLI args to allow independent thresholds. Extract the shared checkout, uv setup, install, and pytest steps into a run-tests composite action to avoid duplicating the setup sequence across both jobs. --- .github/actions/run-tests/action.yaml | 37 +++++++++++++++++++ .github/workflows/run-tests.yaml | 53 +++++++++++++-------------- wool/.coveragerc | 1 - 3 files changed, 62 insertions(+), 29 deletions(-) create mode 100644 .github/actions/run-tests/action.yaml diff --git a/.github/actions/run-tests/action.yaml b/.github/actions/run-tests/action.yaml new file mode 100644 index 0000000..b40d963 --- /dev/null +++ b/.github/actions/run-tests/action.yaml @@ -0,0 +1,37 @@ +name: Run tests +description: Checkout, install dependencies, and run pytest for a given namespace. 
+inputs: + python-version: + required: true + type: string + namespace: + required: true + type: string + pytest-marker: + required: true + type: string + cov-fail-under: + required: true + type: string + +runs: + using: composite + steps: + - name: Install uv and prepare python + uses: astral-sh/setup-uv@v5 + with: + python-version: ${{ inputs.python-version }} + + - name: Install packages + shell: bash + run: | + .github/scripts/install-python-packages.sh + uv pip install -e './${{ inputs.namespace }}[dev]' + uv pip freeze + + - name: Run tests + shell: bash + run: | + ulimit -n 65536 + cd ${{ inputs.namespace }} + uv run pytest -m "${{ inputs.pytest-marker }}" --cov-fail-under=${{ inputs.cov-fail-under }} diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 1f45cbc..65c46dc 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -34,38 +34,35 @@ jobs: cd wool uv run pyright - run-tests: - name: Namespace ${{ matrix.namespace }} / Python ${{ matrix.python-version }} + unit-tests: + name: Unit / Python ${{ matrix.python-version }} runs-on: ubuntu-latest strategy: matrix: - namespace: - - 'wool' - python-version: - - '3.11' - - '3.12' - - '3.13' + namespace: ['wool'] + python-version: ['3.11', '3.12', '3.13'] steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Install uv and prepare python - uses: astral-sh/setup-uv@v5 + - uses: actions/checkout@v4 + - uses: ./.github/actions/run-tests with: python-version: ${{ matrix.python-version }} + namespace: ${{ matrix.namespace }} + pytest-marker: not integration + cov-fail-under: '98' - - name: Install packages - env: - NAMESPACE: ${{ matrix.namespace }} - run: | - .github/scripts/install-python-packages.sh - uv pip install -e './${{ env.NAMESPACE }}[dev]' - uv pip freeze - - - name: Run tests - env: - NAMESPACE: ${{ matrix.namespace }} - run: | - ulimit -n 65536 - cd ${{ env.NAMESPACE }} - uv run pytest + integration-tests: + name: 
Integration / Python ${{ matrix.python-version }} + needs: unit-tests + runs-on: ubuntu-latest + strategy: + matrix: + namespace: ['wool'] + python-version: ['3.11', '3.12', '3.13'] + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/run-tests + with: + python-version: ${{ matrix.python-version }} + namespace: ${{ matrix.namespace }} + pytest-marker: integration + cov-fail-under: '70' diff --git a/wool/.coveragerc b/wool/.coveragerc index 799cb71..9c7a0b4 100644 --- a/wool/.coveragerc +++ b/wool/.coveragerc @@ -9,4 +9,3 @@ omit = [report] show_missing = true precision = 2 -fail_under = 98 From 7238aa15efb8d42dd168a8f8687ec7334eb6a152 Mon Sep 17 00:00:00 2001 From: Conrad Date: Mon, 30 Mar 2026 11:10:54 -0400 Subject: [PATCH 03/12] feat: Add lazy proxy start to WorkerProxy WorkerProxy now accepts a lazy flag (default True) that defers discovery subscription and sentinel setup until dispatch() is first called. Double-checked locking ensures start() runs exactly once under concurrent dispatch. Lazy proxies treat start() as a no-op and stop() as a no-op when never started. The flag is threaded through WorkerPool and preserved across pickle via __reduce__. The proxy factory calls start() indiscriminately since lazy proxies handle it as a no-op. 
--- wool/src/wool/runtime/worker/pool.py | 11 ++++ wool/src/wool/runtime/worker/process.py | 16 ++--- wool/src/wool/runtime/worker/proxy.py | 85 ++++++++++++++++++------- 3 files changed, 82 insertions(+), 30 deletions(-) diff --git a/wool/src/wool/runtime/worker/pool.py b/wool/src/wool/runtime/worker/pool.py index bcbf7be..d42d893 100644 --- a/wool/src/wool/runtime/worker/pool.py +++ b/wool/src/wool/runtime/worker/pool.py @@ -180,6 +180,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): """ Create an ephemeral pool of workers, spawning the specified @@ -197,6 +198,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): """ Connect to an existing pool of workers discovered by the @@ -216,6 +218,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): """ Create a hybrid pool that spawns local workers and discovers @@ -236,6 +239,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): ... @overload @@ -251,6 +255,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): ... 
def __init__( @@ -265,9 +270,11 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): self._workers = {} self._credentials = credentials + self._lazy = lazy if size is not None and spawn is not None: raise TypeError( @@ -310,6 +317,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=max_workers, + lazy=self._lazy, ): yield finally: @@ -335,6 +343,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=max_workers, + lazy=self._lazy, ): yield @@ -353,6 +362,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=lease, + lazy=self._lazy, ): yield finally: @@ -378,6 +388,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=max_workers, + lazy=self._lazy, ): yield diff --git a/wool/src/wool/runtime/worker/process.py b/wool/src/wool/runtime/worker/process.py index f6d963f..a2a08d1 100644 --- a/wool/src/wool/runtime/worker/process.py +++ b/wool/src/wool/runtime/worker/process.py @@ -390,17 +390,16 @@ def _sigint_handler(loop, service, signum, frame): async def _proxy_factory(proxy: WorkerProxy): """Factory function for WorkerProxy instances in ResourcePool. - Starts the proxy if not already started and returns it. - The proxy object itself is used as the cache key. + Calls ``start()`` on the proxy. Lazy proxies treat this as a + no-op and defer startup until first dispatch. Non-lazy proxies + start eagerly. The proxy object itself is used as the cache key. :param proxy: - The WorkerProxy instance to start (passed as key from - ResourcePool). + The WorkerProxy instance (passed as key from ResourcePool). :returns: - The started WorkerProxy instance. + The started (or lazy) WorkerProxy instance. 
""" - if not proxy.started: - await proxy.start() + await proxy.start() return proxy @@ -408,7 +407,8 @@ async def _proxy_finalizer(proxy: WorkerProxy): """Finalizer function for WorkerProxy instances in ResourcePool. Stops the proxy when it's being cleaned up from the resource pool. - Based on the cleanup logic from WorkerProxyCache._delayed_cleanup. + Lazy proxies that were never started are handled gracefully by + the proxy's own stop method. :param proxy: The WorkerProxy instance to clean up. diff --git a/wool/src/wool/runtime/worker/proxy.py b/wool/src/wool/runtime/worker/proxy.py index e6567ce..43ba196 100644 --- a/wool/src/wool/runtime/worker/proxy.py +++ b/wool/src/wool/runtime/worker/proxy.py @@ -35,6 +35,8 @@ from wool.runtime.worker.metadata import WorkerMetadata if TYPE_CHECKING: + from contextvars import Token + from wool.runtime.routine.task import Task T = TypeVar("T") @@ -208,6 +210,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): ... @overload @@ -220,6 +223,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): ... @overload @@ -232,6 +236,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): ... 
def __init__( @@ -247,6 +252,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): if not (pool_uri or discovery or workers): raise ValueError( @@ -259,8 +265,11 @@ def __init__( self._id: uuid.UUID = uuid.uuid4() self._started = False + self._lazy = lazy + self._start_lock = asyncio.Lock() if lazy else None self._loadbalancer = loadbalancer self._lease = lease + self._proxy_token: Token[WorkerProxy | None] | None = None if isinstance(loadbalancer, (ContextManager, AsyncContextManager)): warnings.warn( @@ -363,11 +372,12 @@ def __reduce__(self) -> tuple: f"{name}=my_cm())." ) - def _restore_proxy(discovery, loadbalancer, proxy_id, lease): + def _restore_proxy(discovery, loadbalancer, proxy_id, lease, lazy): proxy = WorkerProxy( discovery=discovery, loadbalancer=loadbalancer, lease=lease, + lazy=lazy, ) proxy._id = proxy_id return proxy @@ -379,6 +389,7 @@ def _restore_proxy(discovery, loadbalancer, proxy_id, lease): self._loadbalancer, self._id, self._lease, + self._lazy, ), ) @@ -390,6 +401,10 @@ def id(self) -> uuid.UUID: def started(self) -> bool: return self._started + @property + def lazy(self) -> bool: + return self._lazy + @property def workers(self) -> list[WorkerMetadata]: """A list of the currently discovered worker gRPC stubs.""" @@ -401,9 +416,21 @@ def workers(self) -> list[WorkerMetadata]: async def start(self) -> None: """Starts the proxy by initiating the worker discovery process. + Always sets this proxy as the active context variable. When + ``lazy=True``, defers discovery and load-balancer startup + until :meth:`dispatch` is first called. When ``lazy=False``, + starts eagerly. + :raises RuntimeError: - If the proxy has already been started. + If the proxy has already been started and ``lazy`` is + ``False``. 
""" + self._proxy_token = wool.__proxy__.set(self) + if self._lazy: + return + await self._start() + + async def _start(self) -> None: if self._started: raise RuntimeError("Proxy already started") @@ -421,7 +448,6 @@ async def start(self) -> None: if not isinstance(self._discovery_stream, DiscoverySubscriberLike): raise ValueError - self._proxy_token = wool.__proxy__.set(self) self._loadbalancer_context = LoadBalancerContext() self._sentinel_task = asyncio.create_task(self._worker_sentinel()) self._started = True @@ -429,25 +455,34 @@ async def start(self) -> None: async def stop(self, *args) -> None: """Stops the proxy, terminating discovery and clearing connections. + When ``lazy=True``, calling stop on an un-started proxy resets + only the context variable. When ``lazy=False``, raises + :class:`RuntimeError`. + :raises RuntimeError: - If the proxy was not started first. + If the proxy was not started first and ``lazy`` is + ``False``. """ + if self._proxy_token is not None: + wool.__proxy__.reset(self._proxy_token) + self._proxy_token = None if not self._started: - raise RuntimeError("Proxy not started - call start() first") - - await self._exit_context(self._discovery_context_manager, *args) - await self._exit_context(self._loadbalancer_context_manager, *args) - - wool.__proxy__.reset(self._proxy_token) - if self._sentinel_task: - self._sentinel_task.cancel() - try: - await self._sentinel_task - except asyncio.CancelledError: - pass - self._sentinel_task = None - self._loadbalancer_context = None - self._started = False + if not self._lazy: + raise RuntimeError("Proxy not started - call start() first") + return + else: + await self._exit_context(self._discovery_context_manager, *args) + await self._exit_context(self._loadbalancer_context_manager, *args) + + if self._sentinel_task: + self._sentinel_task.cancel() + try: + await self._sentinel_task + except asyncio.CancelledError: + pass + self._sentinel_task = None + self._loadbalancer_context = None + 
self._started = False async def dispatch( self, task: Task, *, timeout: float | None = None @@ -456,7 +491,8 @@ async def dispatch( This method selects a worker using a round-robin strategy. If no workers are available within the timeout period, it raises an - exception. + exception. When ``lazy=True``, the proxy is started automatically + on first dispatch. :param task: The :class:`Task` object to be dispatched. @@ -465,12 +501,17 @@ async def dispatch( :returns: A protobuf result object from the worker. :raises RuntimeError: - If the proxy is not started. + If the proxy is not started and ``lazy`` is ``False``. :raises asyncio.TimeoutError: If no worker is available within the timeout period. """ if not self._started: - raise RuntimeError("Proxy not started - call start() first") + if not self._lazy: + raise RuntimeError("Proxy not started - call start() first") + assert self._start_lock is not None + async with self._start_lock: + if not self._started: + await self._start() await asyncio.wait_for(self._await_workers(), 60) From d4e79d26db07a54299ed7708101366b8dbab5ca0 Mon Sep 17 00:00:00 2001 From: Conrad Date: Mon, 30 Mar 2026 11:11:02 -0400 Subject: [PATCH 04/12] test: Cover lazy proxy start across unit and integration tests Add unit tests for the lazy property, start() no-op, dispatch auto-start, concurrent dispatch locking, stop no-op, and pickle roundtrip. Update existing proxy tests to use lazy=False where they test eager start behavior. Add LazyMode dimension to the integration test pairwise covering array and Hypothesis strategy so both LAZY and EAGER paths are exercised end-to-end. 
--- wool/tests/integration/conftest.py | 34 +- wool/tests/integration/test_integration.py | 4 + .../integration/test_pool_composition.py | 13 +- wool/tests/integration/test_scenario.py | 6 +- wool/tests/runtime/worker/test_process.py | 157 ++++---- wool/tests/runtime/worker/test_proxy.py | 346 ++++++++++++++---- 6 files changed, 390 insertions(+), 170 deletions(-) diff --git a/wool/tests/integration/conftest.py b/wool/tests/integration/conftest.py index dae3195..05e3592 100644 --- a/wool/tests/integration/conftest.py +++ b/wool/tests/integration/conftest.py @@ -105,6 +105,11 @@ class RoutineBinding(Enum): STATICMETHOD = auto() +class LazyMode(Enum): + LAZY = auto() + EAGER = auto() + + @dataclass(frozen=True) class Scenario: """Composable scenario describing one integration test configuration. @@ -121,6 +126,7 @@ class Scenario: options: WorkerOptionsKind | None = None timeout: TimeoutKind | None = None binding: RoutineBinding | None = None + lazy: LazyMode | None = None def __or__(self, other: Scenario) -> Scenario: """Merge two partial scenarios. Right side wins on ``None`` fields. 
@@ -141,7 +147,7 @@ def __or__(self, other: Scenario) -> Scenario: @property def is_complete(self) -> bool: - """True when all 8 dimensions are set.""" + """True when all 9 dimensions are set.""" return all(getattr(self, f.name) is not None for f in fields(self)) def __str__(self) -> str: @@ -277,20 +283,24 @@ async def _lan_async_cm(): if scenario.timeout is TimeoutKind.VIA_RUNTIME_CONTEXT: runtime_ctx = RuntimeContext(dispatch_timeout=30.0) + lazy = scenario.lazy is LazyMode.LAZY + try: if runtime_ctx is not None: runtime_ctx.__enter__() try: if scenario.pool_mode is PoolMode.DURABLE: - async with _durable_pool_context(lb, creds, options) as pool: + async with _durable_pool_context(lb, creds, options, lazy) as pool: yield pool elif scenario.pool_mode is PoolMode.DURABLE_SHARED: - async with _durable_shared_pool_context(lb, creds, options) as pool: + async with _durable_shared_pool_context( + lb, creds, options, lazy + ) as pool: yield pool elif scenario.pool_mode is PoolMode.DURABLE_JOINED: async with _durable_joined_pool_context( - scenario.discovery, lb, creds, options + scenario.discovery, lb, creds, options, lazy ) as pool: yield pool else: @@ -298,6 +308,7 @@ async def _lan_async_cm(): "loadbalancer": lb, "credentials": creds, "worker": partial(LocalWorker, options=options), + "lazy": lazy, } match scenario.pool_mode: case PoolMode.DEFAULT: @@ -345,7 +356,7 @@ async def _lan_async_cm(): @asynccontextmanager -async def _durable_pool_context(lb, creds, options): +async def _durable_pool_context(lb, creds, options, lazy): """Manually start a worker, register it, then create a DURABLE pool. 
DURABLE pools don't spawn workers — they only discover external @@ -368,6 +379,7 @@ async def _durable_pool_context(lb, creds, options): discovery=_DirectDiscovery(discovery), loadbalancer=lb, credentials=creds, + lazy=lazy, ) async with pool: yield pool @@ -378,7 +390,7 @@ async def _durable_pool_context(lb, creds, options): @asynccontextmanager -async def _durable_shared_pool_context(lb, creds, options): +async def _durable_shared_pool_context(lb, creds, options, lazy): """Create two pools sharing the same LocalDiscovery subscriber. Exercises ``SubscriberMeta`` singleton caching and @@ -401,11 +413,13 @@ async def _durable_shared_pool_context(lb, creds, options): discovery=shared, loadbalancer=lb, credentials=creds, + lazy=lazy, ) pool_b = WorkerPool( discovery=shared, loadbalancer=lb, credentials=creds, + lazy=lazy, ) async with pool_a: async with pool_b: @@ -455,7 +469,7 @@ async def _acm(): @asynccontextmanager -async def _durable_joined_pool_context(discovery_factory, lb, creds, options): +async def _durable_joined_pool_context(discovery_factory, lb, creds, options, lazy): """Create a DURABLE pool that joins an externally owned namespace. 
Sets up an owner ``LocalDiscovery`` that creates workers and publishes @@ -480,6 +494,7 @@ async def _durable_joined_pool_context(discovery_factory, lb, creds, options): discovery=joiner, loadbalancer=lb, credentials=creds, + lazy=lazy, ) async with pool: yield pool @@ -704,6 +719,7 @@ def _pairwise_filter(row): options=row[5], timeout=row[6], binding=row[7], + lazy=row[8], ) for row in AllPairs( [ @@ -715,6 +731,7 @@ def _pairwise_filter(row): list(WorkerOptionsKind), list(TimeoutKind), list(RoutineBinding), + list(LazyMode), ], filter_func=_pairwise_filter, ) @@ -774,6 +791,8 @@ def scenarios_strategy(draw): else: binding = draw(st.sampled_from(RoutineBinding)) + lazy = draw(st.sampled_from(LazyMode)) + return Scenario( shape=shape, pool_mode=pool_mode, @@ -783,6 +802,7 @@ def scenarios_strategy(draw): options=options, timeout=timeout, binding=binding, + lazy=lazy, ) diff --git a/wool/tests/integration/test_integration.py b/wool/tests/integration/test_integration.py index b8c7b26..70d0852 100644 --- a/wool/tests/integration/test_integration.py +++ b/wool/tests/integration/test_integration.py @@ -11,6 +11,7 @@ from .conftest import PAIRWISE_SCENARIOS from .conftest import CredentialType from .conftest import DiscoveryFactory +from .conftest import LazyMode from .conftest import LbFactory from .conftest import PoolMode from .conftest import RoutineBinding @@ -68,6 +69,7 @@ async def body(): WorkerOptionsKind.DEFAULT, TimeoutKind.NONE, RoutineBinding.MODULE_FUNCTION, + LazyMode.LAZY, ) ) @example( @@ -80,6 +82,7 @@ async def body(): WorkerOptionsKind.DEFAULT, TimeoutKind.NONE, RoutineBinding.MODULE_FUNCTION, + LazyMode.LAZY, ) ) @example( @@ -92,6 +95,7 @@ async def body(): WorkerOptionsKind.KEEPALIVE, TimeoutKind.NONE, RoutineBinding.MODULE_FUNCTION, + LazyMode.LAZY, ) ) @given(scenario=scenarios_strategy()) diff --git a/wool/tests/integration/test_pool_composition.py b/wool/tests/integration/test_pool_composition.py index 397a9da..55bf2d5 100644 --- 
a/wool/tests/integration/test_pool_composition.py +++ b/wool/tests/integration/test_pool_composition.py @@ -4,6 +4,7 @@ from .conftest import CredentialType from .conftest import DiscoveryFactory +from .conftest import LazyMode from .conftest import LbFactory from .conftest import PoolMode from .conftest import RoutineBinding @@ -39,6 +40,7 @@ async def test_build_pool_from_scenario_with_default_mode(self, credentials_map) options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -53,7 +55,8 @@ async def test_build_pool_from_scenario_with_ephemeral_mode(self, credentials_ma """Test building a pool with EPHEMERAL mode and size=2. Given: - A complete scenario using EPHEMERAL pool mode with 2 workers. + A complete scenario using EPHEMERAL pool mode with 2 workers + and eager proxy start. When: A pool is built and a coroutine routine is dispatched. Then: @@ -69,6 +72,7 @@ async def test_build_pool_from_scenario_with_ephemeral_mode(self, credentials_ma options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.EAGER, ) # Act @@ -100,6 +104,7 @@ async def test_build_pool_from_scenario_with_durable_mode(self, credentials_map) options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -131,6 +136,7 @@ async def test_build_pool_from_scenario_with_hybrid_mode(self, credentials_map): options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -166,6 +172,7 @@ async def test_build_pool_from_scenario_with_durable_joined_local( options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -197,6 +204,7 @@ async def test_build_pool_from_scenario_with_restrictive_opts(self, credentials_ options=WorkerOptionsKind.RESTRICTIVE, 
timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -229,6 +237,7 @@ async def test_build_pool_from_scenario_with_keepalive_opts(self, credentials_ma options=WorkerOptionsKind.KEEPALIVE, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -260,6 +269,7 @@ async def test_build_pool_from_scenario_with_dispatch_timeout(self, credentials_ options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.VIA_RUNTIME_CONTEXT, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -294,6 +304,7 @@ async def test_build_pool_from_scenario_with_shared_discovery(self, credentials_ options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act diff --git a/wool/tests/integration/test_scenario.py b/wool/tests/integration/test_scenario.py index 2ad2cbc..81159b2 100644 --- a/wool/tests/integration/test_scenario.py +++ b/wool/tests/integration/test_scenario.py @@ -4,6 +4,7 @@ from .conftest import CredentialType from .conftest import DiscoveryFactory +from .conftest import LazyMode from .conftest import LbFactory from .conftest import PoolMode from .conftest import RoutineBinding @@ -102,7 +103,7 @@ def test_is_complete_with_all_fields(self): """Test that a fully populated scenario reports complete. Given: - A scenario with all 8 dimensions set. + A scenario with all 9 dimensions set. When: ``is_complete`` is checked. 
Then: @@ -118,6 +119,7 @@ def test_is_complete_with_all_fields(self): options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act & assert @@ -163,4 +165,4 @@ def test___str___with_partial_fields(self): result = str(scenario) # Assert - assert result == "COROUTINE-DEFAULT-_-_-_-_-_-_" + assert result == "COROUTINE-DEFAULT-_-_-_-_-_-_-_" diff --git a/wool/tests/runtime/worker/test_process.py b/wool/tests/runtime/worker/test_process.py index b5cc1d0..0e8b6b3 100644 --- a/wool/tests/runtime/worker/test_process.py +++ b/wool/tests/runtime/worker/test_process.py @@ -215,6 +215,73 @@ def mock_signal(sig, handler): assert signal_calls[3] == (signal.SIGINT, old_sigint) +@pytest.mark.asyncio +async def test__proxy_factory_with_proxy(mocker): + """Test _proxy_factory calls start and returns the proxy. + + Given: + A WorkerProxy instance. + When: + _proxy_factory() is called. + Then: + It should call start() and return the proxy. + """ + # Arrange + mock_proxy = mocker.MagicMock() + mock_proxy.start = mocker.AsyncMock() + + # Act + result = await _proxy_factory(mock_proxy) + + # Assert + mock_proxy.start.assert_called_once() + assert result is mock_proxy + + +@pytest.mark.asyncio +async def test__proxy_finalizer_with_proxy(mocker): + """Test _proxy_finalizer calls stop on the proxy. + + Given: + A proxy that stops successfully. + When: + _proxy_finalizer() is called. + Then: + It should call proxy.stop(). + """ + # Arrange + mock_proxy = mocker.MagicMock() + mock_proxy.stop = mocker.AsyncMock() + + # Act + await _proxy_finalizer(mock_proxy) + + # Assert + mock_proxy.stop.assert_called_once() + + +@pytest.mark.asyncio +async def test__proxy_finalizer_with_stop_exception(mocker): + """Test _proxy_finalizer handles exception from proxy.stop() gracefully. + + Given: + A proxy that raises an exception on stop. + When: + _proxy_finalizer() is called. 
+ Then: + It should catch the exception and complete without propagating it. + """ + # Arrange + mock_proxy = mocker.MagicMock() + mock_proxy.stop = mocker.AsyncMock(side_effect=Exception("Stop failed")) + + # Act — should not raise exception + await _proxy_finalizer(mock_proxy) + + # Assert + mock_proxy.stop.assert_called_once() + + class TestWorkerProcess: """Test suite for WorkerProcess.""" @@ -746,96 +813,6 @@ def test_start_deserializes_metadata_with_extra_from_pipe(self, mocker): assert isinstance(process.metadata.extra, MappingProxyType) assert process.metadata.tags == frozenset({"gpu"}) - @pytest.mark.asyncio - async def test__proxy_factory_starts_proxy_when_not_started(self, mocker): - """Test _proxy_factory starts proxy when not already started. - - Given: - A WorkerProcess and a proxy with started=False - When: - _proxy_factory() is called - Then: - It should call proxy.start() and return the proxy - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.started = False - mock_proxy.start = mocker.AsyncMock() - - # Act - result = await _proxy_factory(mock_proxy) - - # Assert - mock_proxy.start.assert_called_once() - assert result is mock_proxy - - @pytest.mark.asyncio - async def test__proxy_factory_does_not_start_proxy_when_already_started( - self, mocker - ): - """Test _proxy_factory does not start proxy when already started. - - Given: - A WorkerProcess and a proxy with started=True - When: - _proxy_factory() is called - Then: - It should not call proxy.start() but still return the proxy - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.started = True - mock_proxy.start = mocker.AsyncMock() - - # Act - result = await _proxy_factory(mock_proxy) - - # Assert - mock_proxy.start.assert_not_called() - assert result is mock_proxy - - @pytest.mark.asyncio - async def test__proxy_finalizer_successfully_stops_proxy(self, mocker): - """Test _proxy_finalizer successfully stops proxy. 
- - Given: - A WorkerProcess and a proxy that stops successfully - When: - _proxy_finalizer() is called - Then: - It should call proxy.stop() and complete without error - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.stop = mocker.AsyncMock() - - # Act - await _proxy_finalizer(mock_proxy) - - # Assert - mock_proxy.stop.assert_called_once() - - @pytest.mark.asyncio - async def test__proxy_finalizer_handles_exception_gracefully(self, mocker): - """Test _proxy_finalizer handles exception from proxy.stop() gracefully. - - Given: - A WorkerProcess and a proxy that raises exception on stop - When: - _proxy_finalizer() is called - Then: - It should catch the exception and complete without propagating it - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.stop = mocker.AsyncMock(side_effect=Exception("Stop failed")) - - # Act — should not raise exception - await _proxy_finalizer(mock_proxy) - - # Assert - mock_proxy.stop.assert_called_once() - def test_run_sets_up_proxy_pool_and_starts_server(self, mocker): """Test run method sets up proxy pool and starts gRPC server. diff --git a/wool/tests/runtime/worker/test_proxy.py b/wool/tests/runtime/worker/test_proxy.py index 97383a9..5601e54 100644 --- a/wool/tests/runtime/worker/test_proxy.py +++ b/wool/tests/runtime/worker/test_proxy.py @@ -647,19 +647,51 @@ def test___init___with_callable_loadbalancer_no_warning( user_warnings = [w for w in caught if issubclass(w.category, UserWarning)] assert user_warnings == [] + def test___init___with_default_lazy(self, mock_discovery_service): + """Test WorkerProxy defaults to lazy initialization. + + Given: + A discovery service. + When: + WorkerProxy is instantiated with default parameters. + Then: + It should have lazy set to True. + """ + # Act + proxy = WorkerProxy(discovery=mock_discovery_service) + + # Assert + assert proxy.lazy is True + + def test___init___with_lazy_false(self, mock_discovery_service): + """Test WorkerProxy accepts explicit lazy=False. 
+ + Given: + A discovery service and lazy=False. + When: + WorkerProxy is instantiated. + Then: + It should have lazy set to False. + """ + # Act + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) + + # Assert + assert proxy.lazy is False + @pytest.mark.asyncio async def test___aenter___lifecycle(self, mock_discovery_service): """Test it starts and stops correctly. Given: - A WorkerProxy configured with discovery service + A non-lazy WorkerProxy configured with discovery service When: The proxy is used as a context manager Then: It starts and stops correctly """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) entered = False exited = False @@ -708,14 +740,14 @@ async def test_start_sets_started_flag( """Test set the started flag to True. Given: - An unstarted WorkerProxy instance + A non-lazy unstarted WorkerProxy instance When: Start is called Then: It should set the started flag to True """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act await proxy.start() @@ -723,6 +755,26 @@ async def test_start_sets_started_flag( # Assert assert proxy.started + @pytest.mark.asyncio + async def test_start_with_lazy_proxy(self, mock_discovery_service): + """Test start is a no-op for lazy proxies. + + Given: + A lazy WorkerProxy that has not been started. + When: + start() is called. + Then: + It should remain un-started. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + + # Act + await proxy.start() + + # Assert + assert not proxy.started + @pytest.mark.asyncio async def test_stop_clears_state(self, mock_discovery_service, mock_proxy_session): """Test clear workers and reset the started flag to False. @@ -752,14 +804,14 @@ async def test_start_already_started_raises_error( """Test raise RuntimeError. 
Given: - A WorkerProxy that is already started + A non-lazy WorkerProxy that is already started When: Start is called again Then: It should raise RuntimeError """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) await proxy.start() # Act & assert @@ -771,19 +823,36 @@ async def test_stop_not_started_raises_error(self, mock_discovery_service): """Test raise RuntimeError. Given: - A WorkerProxy that is not started + A non-lazy WorkerProxy that is not started When: Stop is called Then: It should raise RuntimeError """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert with pytest.raises(RuntimeError, match="Proxy not started"): await proxy.stop() + @pytest.mark.asyncio + async def test_stop_with_unstarted_lazy_proxy(self, mock_discovery_service): + """Test stop is a no-op on an un-started lazy proxy. + + Given: + A lazy WorkerProxy that was never started. + When: + stop() is called. + Then: + It should return without raising. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + + # Act & assert — should not raise + await proxy.stop() + @pytest.mark.asyncio async def test___aenter___enter_starts_proxy( self, mock_discovery_service, mock_proxy_session @@ -791,14 +860,14 @@ async def test___aenter___enter_starts_proxy( """Test automatically start the proxy. Given: - An unstarted WorkerProxy + A non-lazy unstarted WorkerProxy When: The async context manager is entered Then: It should automatically start the proxy """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert async with proxy as p: @@ -811,14 +880,14 @@ async def test___aexit___exit_stops_proxy( """Test automatically stop the proxy. 
Given: - A WorkerProxy within async context + A non-lazy WorkerProxy within async context When: The async context manager exits Then: It should automatically stop the proxy """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act async with proxy: @@ -834,8 +903,8 @@ async def test_start_stop_with_sync_cm_loadbalancer( """Test start/stop with sync context manager load balancer. Given: - A WorkerProxy with a load balancer provided as a sync - context manager. + A non-lazy WorkerProxy with a load balancer provided as a + sync context manager. When: start() then stop() are called. Then: @@ -859,7 +928,9 @@ def __exit__(self, *args): self.exited = True cm = SyncCM() - proxy = WorkerProxy(discovery=mock_discovery_service, loadbalancer=cm) + proxy = WorkerProxy( + discovery=mock_discovery_service, loadbalancer=cm, lazy=False + ) # Act await proxy.start() @@ -879,8 +950,8 @@ async def test_start_stop_with_async_cm_loadbalancer( """Test start/stop with async context manager load balancer. Given: - A WorkerProxy with a load balancer provided as an async - context manager. + A non-lazy WorkerProxy with a load balancer provided as an + async context manager. When: start() then stop() are called. Then: @@ -904,7 +975,9 @@ async def __aexit__(self, *args): self.exited = True cm = AsyncCM() - proxy = WorkerProxy(discovery=mock_discovery_service, loadbalancer=cm) + proxy = WorkerProxy( + discovery=mock_discovery_service, loadbalancer=cm, lazy=False + ) # Act await proxy.start() @@ -924,8 +997,8 @@ async def test_start_with_awaitable_loadbalancer( """Test start with an awaitable load balancer. Given: - A WorkerProxy with a load balancer provided as a bare - awaitable (coroutine object). + A non-lazy WorkerProxy with a load balancer provided as a + bare awaitable (coroutine object). When: start() is called. 
Then: @@ -939,7 +1012,9 @@ async def test_start_with_awaitable_loadbalancer( async def make_lb(): return mock_lb - proxy = WorkerProxy(discovery=mock_discovery_service, loadbalancer=make_lb()) + proxy = WorkerProxy( + discovery=mock_discovery_service, loadbalancer=make_lb(), lazy=False + ) # Act await proxy.start() @@ -1055,7 +1130,7 @@ async def test_handles_worker_updated(self, mock_proxy_session): """Test the proxy handles worker-updated events. Given: - A started WorkerProxy with a discovered worker. + A non-lazy, started WorkerProxy with a discovered worker. When: A "worker-updated" event is received for that worker. Then: @@ -1074,7 +1149,7 @@ async def test_handles_worker_updated(self, mock_proxy_session): DiscoveryEvent("worker-updated", metadata=metadata), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1093,8 +1168,9 @@ async def test_sentinel_passes_metadata_options_to_connection( """Test sentinel creates WorkerConnection with metadata options. Given: - A WorkerProxy with a discovery stream yielding a worker-added - event whose metadata has options=ChannelOptions(keepalive_time_ms=60000) + A non-lazy WorkerProxy with a discovery stream yielding a + worker-added event whose metadata has + options=ChannelOptions(keepalive_time_ms=60000) When: The sentinel processes the event Then: @@ -1117,7 +1193,7 @@ async def test_sentinel_passes_metadata_options_to_connection( ) events = [DiscoveryEvent("worker-added", metadata=metadata)] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1140,8 +1216,8 @@ async def test_sentinel_passes_none_options_for_legacy_workers( """Test sentinel creates WorkerConnection with options=None for legacy workers. 
Given: - A WorkerProxy with a discovery stream yielding a worker-added - event whose metadata has options=None + A non-lazy WorkerProxy with a discovery stream yielding a + worker-added event whose metadata has options=None When: The sentinel processes the event Then: @@ -1161,7 +1237,7 @@ async def test_sentinel_passes_none_options_for_legacy_workers( ) events = [DiscoveryEvent("worker-added", metadata=metadata)] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1184,9 +1260,9 @@ async def test_sentinel_passes_updated_options_on_worker_updated( """Test sentinel creates WorkerConnection with updated metadata options. Given: - A WorkerProxy with a discovery stream yielding a worker-added - event followed by a worker-updated event whose metadata has - options=ChannelOptions(keepalive_time_ms=90000) + A non-lazy WorkerProxy with a discovery stream yielding a + worker-added event followed by a worker-updated event whose + metadata has options=ChannelOptions(keepalive_time_ms=90000) When: The sentinel processes the events Then: @@ -1221,7 +1297,7 @@ async def test_sentinel_passes_updated_options_on_worker_updated( DiscoveryEvent("worker-updated", metadata=updated_metadata), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1292,8 +1368,8 @@ async def test_start_caps_discovered_workers_at_lease( """Test sentinel respects lease cap on worker-added events. 
Given: - A WorkerProxy with lease=2 and a discovery stream with - 3 worker-added events + A non-lazy WorkerProxy with lease=2 and a discovery stream + with 3 worker-added events When: The sentinel processes all events Then: @@ -1312,7 +1388,7 @@ async def test_start_caps_discovered_workers_at_lease( ] events = [DiscoveryEvent("worker-added", metadata=w) for w in workers] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=2) + proxy = WorkerProxy(discovery=discovery, lease=2, lazy=False) # Act await proxy.start() @@ -1331,8 +1407,8 @@ async def test_start_accepts_all_workers_when_lease_none( """Test sentinel accepts all workers when lease is None. Given: - A WorkerProxy with lease=None and a discovery stream - with 3 worker-added events + A non-lazy WorkerProxy with lease=None and a discovery + stream with 3 worker-added events When: The sentinel processes all events Then: @@ -1351,7 +1427,7 @@ async def test_start_accepts_all_workers_when_lease_none( ] events = [DiscoveryEvent("worker-added", metadata=w) for w in workers] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=None) + proxy = WorkerProxy(discovery=discovery, lease=None, lazy=False) # Act await proxy.start() @@ -1370,7 +1446,7 @@ async def test_start_allows_worker_updated_at_capacity( """Test sentinel processes worker-updated events even at capacity. 
Given: - A WorkerProxy with lease=2, already at capacity, + A non-lazy WorkerProxy with lease=2, already at capacity, receiving a worker-updated event When: The sentinel processes the update event @@ -1397,7 +1473,7 @@ async def test_start_allows_worker_updated_at_capacity( DiscoveryEvent("worker-updated", metadata=worker1), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=2) + proxy = WorkerProxy(discovery=discovery, lease=2, lazy=False) # Act await proxy.start() @@ -1417,8 +1493,9 @@ async def test_start_accepts_worker_after_drop_frees_capacity( """Test drop restores capacity for a subsequent worker-added event. Given: - A WorkerProxy with lease=2, at capacity with 2 workers, - then one worker is dropped followed by a new worker-added + A non-lazy WorkerProxy with lease=2, at capacity with 2 + workers, then one worker is dropped followed by a new + worker-added When: The proxy processes all events Then: @@ -1451,7 +1528,7 @@ async def test_start_accepts_worker_after_drop_frees_capacity( DiscoveryEvent("worker-added", metadata=worker3), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=2) + proxy = WorkerProxy(discovery=discovery, lease=2, lazy=False) # Act await proxy.start() @@ -1472,8 +1549,8 @@ async def test_start_drops_update_for_rejected_worker( """Test worker-updated for a cap-rejected worker is silently dropped. 
Given: - A WorkerProxy with lease=1 and a discovery stream where - worker2 is rejected by the cap, then a worker-updated + A non-lazy WorkerProxy with lease=1 and a discovery stream + where worker2 is rejected by the cap, then a worker-updated event arrives for worker2 When: The proxy processes all events @@ -1500,7 +1577,7 @@ async def test_start_drops_update_for_rejected_worker( DiscoveryEvent("worker-updated", metadata=worker2), # dropped ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=1) + proxy = WorkerProxy(discovery=discovery, lease=1, lazy=False) # Act await proxy.start() @@ -1521,7 +1598,7 @@ async def test_cloudpickle_serialization_preserves_lease( """Test pickle round-trip preserves lease cap behavior. Given: - A WorkerProxy with lease=2 and a discovery stream + A non-lazy WorkerProxy with lease=2 and a discovery stream with 3 worker-added events When: The proxy is pickled, unpickled, started, and processes @@ -1547,6 +1624,7 @@ async def test_cloudpickle_serialization_preserves_lease( discovery=discovery, loadbalancer=wp.RoundRobinLoadBalancer, lease=2, + lazy=False, ) # Act — pickle round-trip, then start the restored proxy @@ -1559,6 +1637,29 @@ async def test_cloudpickle_serialization_preserves_lease( assert len(restored.workers) == 2 await restored.stop() + def test_cloudpickle_serialization_preserves_lazy(self, mock_discovery_service): + """Test pickle round-trip preserves the lazy flag. + + Given: + A WorkerProxy with lazy=False. + When: + The proxy is pickled and unpickled. + Then: + It should preserve lazy as False on the restored proxy. 
+ """ + # Arrange + proxy = WorkerProxy( + discovery=mock_discovery_service, + loadbalancer=wp.RoundRobinLoadBalancer, + lazy=False, + ) + + # Act + restored = cloudpickle.loads(cloudpickle.dumps(proxy)) + + # Assert + assert restored.lazy is False + @pytest.mark.asyncio async def test_dispatch_delegates_to_loadbalancer( self, @@ -1610,20 +1711,111 @@ async def test_dispatch_not_started_raises_error( """Test raise RuntimeError. Given: - A WorkerProxy that is not started + A non-lazy WorkerProxy that is not started When: Dispatch is called Then: It should raise RuntimeError """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert with pytest.raises(RuntimeError, match="Proxy not started"): async for _ in await proxy.dispatch(mock_wool_task): pass + @pytest.mark.asyncio + async def test_dispatch_with_lazy_auto_start( + self, + spy_loadbalancer_with_workers, + spy_discovery_with_events, + mock_worker_connection, + mock_wool_task, + mock_proxy_session, + mocker, + ): + """Test dispatch auto-starts a lazy proxy on first call. + + Given: + A lazy WorkerProxy that has not been started. + When: + dispatch() is called. + Then: + It should auto-start the proxy and dispatch the task. 
+ """ + # Arrange + discovery, metadata = spy_discovery_with_events + + proxy = WorkerProxy( + discovery=discovery, + loadbalancer=spy_loadbalancer_with_workers, + ) + + spy_loadbalancer_with_workers.worker_added_callback( + mock_worker_connection, metadata + ) + + assert not proxy.started + + # Act + result_iterator = await proxy.dispatch(mock_wool_task) + results = [result async for result in result_iterator] + + # Assert + assert proxy.started + assert results == ["test_result"] + await proxy.stop() + + @pytest.mark.asyncio + async def test_dispatch_with_lazy_concurrent_start( + self, + spy_loadbalancer_with_workers, + spy_discovery_with_events, + mock_worker_connection, + mock_proxy_session, + mocker, + ): + """Test concurrent dispatch calls start the proxy only once. + + Given: + A lazy WorkerProxy that has not been started. + When: + Two dispatch() calls are made concurrently. + Then: + It should start the proxy and both dispatches should succeed. + """ + # Arrange + discovery, metadata = spy_discovery_with_events + + proxy = WorkerProxy( + discovery=discovery, + loadbalancer=spy_loadbalancer_with_workers, + ) + + spy_loadbalancer_with_workers.worker_added_callback( + mock_worker_connection, metadata + ) + + task = mocker.MagicMock(spec=Task) + spy_loadbalancer_with_workers.dispatch = mocker.AsyncMock( + return_value=mocker.AsyncMock( + __aiter__=mocker.Mock(return_value=mocker.AsyncMock()), + __anext__=mocker.AsyncMock(side_effect=StopAsyncIteration), + ) + ) + + # Act + await asyncio.gather( + proxy.dispatch(task), + proxy.dispatch(task), + ) + + # Assert + assert proxy.started + assert spy_loadbalancer_with_workers.dispatch.call_count == 2 + await proxy.stop() + @pytest.mark.asyncio async def test_dispatch_propagates_loadbalancer_errors( self, @@ -1845,7 +2037,7 @@ async def test_proxy_with_static_workers_list(self): ), ] - proxy = WorkerProxy(workers=workers) + proxy = WorkerProxy(workers=workers, lazy=False) # Act & assert async with proxy as p: @@ 
-1860,14 +2052,14 @@ async def test_proxy_with_pool_uri(self): """Test it starts and stops correctly. Given: - A WorkerProxy configured with a pool URI + A non-lazy WorkerProxy configured with a pool URI When: The proxy is used as a context manager Then: It starts and stops correctly """ # Arrange - proxy = WorkerProxy("test://pool") + proxy = WorkerProxy("test://pool", lazy=False) # Act & assert async with proxy as p: @@ -1902,7 +2094,7 @@ async def test_workers_property_returns_workers_list( await mock_discovery_service.start() mock_discovery_service.inject_worker_added(metadata) - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act async with proxy: @@ -1996,7 +2188,9 @@ async def test_cloudpickle_serialization_with_services(self): # Arrange - Use real objects instead of mocks for cloudpickle test discovery_service = LocalDiscovery("test-pool").subscriber proxy = WorkerProxy( - discovery=discovery_service, loadbalancer=wp.RoundRobinLoadBalancer + discovery=discovery_service, + loadbalancer=wp.RoundRobinLoadBalancer, + lazy=False, ) # Act & assert @@ -2018,7 +2212,7 @@ async def test_cloudpickle_serialization_discovery_only(self): in an unstarted state. Given: - A started WorkerProxy with only discovery service + A non-lazy started WorkerProxy with only discovery service When: Cloudpickle serialization and deserialization are performed within context @@ -2028,7 +2222,7 @@ async def test_cloudpickle_serialization_discovery_only(self): """ # Arrange - Use real objects instead of mocks for cloudpickle test discovery_service = LocalDiscovery("test-pool").subscriber - proxy = WorkerProxy(discovery=discovery_service) + proxy = WorkerProxy(discovery=discovery_service, lazy=False) # Act & assert async with proxy: @@ -2049,7 +2243,7 @@ async def test_cloudpickle_serialization_uri_preserves_id(self): in an unstarted state and preserved ID. 
Given: - A started WorkerProxy created with only a URI + A non-lazy started WorkerProxy created with only a URI When: Cloudpickle serialization and deserialization are performed within context @@ -2058,7 +2252,7 @@ async def test_cloudpickle_serialization_uri_preserves_id(self): proxy in an unstarted state and preserved ID """ # Arrange - Use real objects - this creates a LocalDiscovery internally - proxy = WorkerProxy("pool-1") + proxy = WorkerProxy("pool-1", lazy=False) # Act & assert async with proxy: @@ -2189,6 +2383,7 @@ async def test_cloudpickle_serialization_resolves_credentials_from_context( # static workers — default credentials resolve from ContextVar. restored_proxy = WorkerProxy( workers=[secure_worker, insecure_worker], + lazy=False, ) # Assert — resolved credentials from ContextVar: only secure workers @@ -2324,7 +2519,8 @@ async def test_explicit_credentials_parameter_overrides_contextvar( Given: A WorkerCredentials context is active with mTLS credentials - and a mix of secure and insecure static workers. + and a non-lazy WorkerProxy with a mix of secure and insecure + static workers. When: WorkerProxy is created with explicit None credentials. Then: @@ -2353,6 +2549,7 @@ async def test_explicit_credentials_parameter_overrides_contextvar( proxy = WorkerProxy( workers=[secure_worker, insecure_worker], credentials=None, + lazy=False, ) await proxy.start() @@ -2370,8 +2567,9 @@ async def test_credentials_default_resolves_from_contextvar( """Test default credentials resolves from ContextVar. Given: - A WorkerCredentials context is active and a mix of secure - and insecure static workers. + A WorkerCredentials context is active and a non-lazy + WorkerProxy with a mix of secure and insecure static + workers. When: WorkerProxy is created without explicit credentials. 
Then: @@ -2399,6 +2597,7 @@ async def test_credentials_default_resolves_from_contextvar( with CredentialContext(worker_credentials): proxy = WorkerProxy( workers=[secure_worker, insecure_worker], + lazy=False, ) await proxy.start() @@ -2416,8 +2615,9 @@ async def test_credentials_none_without_contextvar( """Test credentials resolve to None when no context is set. Given: - No WorkerCredentials context is active and a mix of secure - and insecure static workers. + No WorkerCredentials context is active and a non-lazy + WorkerProxy with a mix of secure and insecure static + workers. When: WorkerProxy is created without explicit credentials. Then: @@ -2444,6 +2644,7 @@ async def test_credentials_none_without_contextvar( # Act proxy = WorkerProxy( workers=[secure_worker, insecure_worker], + lazy=False, ) await proxy.start() @@ -2461,7 +2662,8 @@ async def test_start_invalid_loadbalancer_type_raises_error( """Test raise ValueError. Given: - A WorkerProxy with a loadbalancer that doesn't implement LoadBalancerLike + A non-lazy WorkerProxy with a loadbalancer that doesn't + implement LoadBalancerLike When: Start() is called Then: @@ -2475,7 +2677,9 @@ class NotALoadBalancer: invalid_loadbalancer = NotALoadBalancer() proxy = WorkerProxy( - pool_uri="test-pool", loadbalancer=lambda: invalid_loadbalancer + pool_uri="test-pool", + loadbalancer=lambda: invalid_loadbalancer, + lazy=False, ) # Act & assert @@ -2489,7 +2693,8 @@ async def test_start_invalid_discovery_type_raises_error( """Test raise ValueError. 
Given: - A WorkerProxy with a discovery that doesn't implement AsyncIterator + A non-lazy WorkerProxy with a discovery that doesn't + implement AsyncIterator When: Start() is called Then: @@ -2498,7 +2703,7 @@ async def test_start_invalid_discovery_type_raises_error( # Arrange - use a simple string which is definitely not an AsyncIterator invalid_discovery = "not_an_async_iterator" - proxy = WorkerProxy(discovery=lambda: invalid_discovery) + proxy = WorkerProxy(discovery=lambda: invalid_discovery, lazy=False) # Act & assert with pytest.raises(ValueError): @@ -2511,8 +2716,8 @@ async def test_security_filter_with_credentials( """Test only secure workers are discovered with credentials. Given: - A WorkerProxy instantiated with credentials and a mix - of secure and insecure static workers. + A non-lazy WorkerProxy instantiated with credentials and a + mix of secure and insecure static workers. When: The proxy is started and discovery events are processed. Then: @@ -2538,6 +2743,7 @@ async def test_security_filter_with_credentials( proxy = WorkerProxy( workers=[secure_worker, insecure_worker], credentials=worker_credentials, + lazy=False, ) # Act From e1449fd7761989f77617e1d7f5cf12bd628eb256 Mon Sep 17 00:00:00 2001 From: Conrad Date: Mon, 30 Mar 2026 17:35:15 -0400 Subject: [PATCH 05/12] docs: Document lazy proxy startup and flag propagation Cover the lazy parameter in three places: the main README's worker pools section, the worker README's nested routines section, and a new "Lazy startup" subsection under connections. All three explain that WorkerPool propagates lazy to WorkerProxy, and that task serialization carries the flag to worker subprocesses. 
--- wool/README.md | 8 ++++++++ wool/src/wool/runtime/worker/README.md | 15 ++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/wool/README.md b/wool/README.md index 577de45..bfc2983 100644 --- a/wool/README.md +++ b/wool/README.md @@ -156,6 +156,14 @@ async with wool.WorkerPool(discovery=wool.LanDiscovery(), lease=10): result = await my_routine() ``` +`lazy` controls whether the pool's internal `WorkerProxy` defers startup until the first task is dispatched. Defaults to `True`. The pool propagates this flag to every `WorkerProxy` it constructs, and each task serializes the proxy (including the flag) so that workers receiving the task inherit the same laziness setting. With `lazy=True`, worker subprocesses that never invoke nested `@wool.routine` calls avoid the cost of discovery subscription and sentinel setup entirely. Set `lazy=False` to start proxies eagerly — useful when you want connections established before the first dispatch. + +```python +# Eager proxy startup — connections established before first dispatch +async with wool.WorkerPool(spawn=4, lazy=False): + result = await my_routine() +``` + ## Workers A worker is a separate OS process hosting a gRPC server with two RPCs: `dispatch` (bidirectional streaming for task execution) and `stop` (graceful shutdown). Tasks execute on a dedicated asyncio event loop in a separate daemon thread, so that long-running or CPU-intensive task code does not block the main gRPC event loop. This keeps the worker responsive to new dispatches, stop requests, and concurrent streaming interactions with in-flight tasks. diff --git a/wool/src/wool/runtime/worker/README.md b/wool/src/wool/runtime/worker/README.md index 999e43c..1715ce7 100644 --- a/wool/src/wool/runtime/worker/README.md +++ b/wool/src/wool/runtime/worker/README.md @@ -193,7 +193,9 @@ Signal handlers map `SIGTERM` to timeout 0 (cancel immediately) and `SIGINT` to ### Nested routines -Worker subprocesses can dispatch tasks to other workers. 
Each subprocess is configured with a `ResourcePool` of `WorkerProxy` instances (via `wool.__proxy_pool__`), so `@wool.routine` calls within a task transparently route to the target pool. Spinning up a `WorkerProxy` is not free — it involves establishing a discovery subscription and opening gRPC connections — so the resource pool caches proxies with a configurable TTL (default 60 seconds, set via `proxy_pool_ttl` on `LocalWorker`). If the interval between dispatches for a given pool on a given worker is shorter than the TTL, the cached proxy is reused. If it exceeds the TTL, the proxy is finalized and must be recreated on the next dispatch. Tuning `proxy_pool_ttl` above the expected dispatch interval keeps proxies warm and avoids this cold-start overhead. +Worker subprocesses can dispatch tasks to other workers. Each subprocess is configured with a `ResourcePool` of `WorkerProxy` instances (via `wool.__proxy_pool__`), so `@wool.routine` calls within a task transparently route to the target pool. Spinning up a `WorkerProxy` is not free — it involves establishing a discovery subscription, starting a sentinel task, and opening gRPC connections — so the resource pool caches proxies with a configurable TTL (default 60 seconds, set via `proxy_pool_ttl` on `LocalWorker`). If the interval between dispatches for a given pool on a given worker is shorter than the TTL, the cached proxy is reused. If it exceeds the TTL, the proxy is finalized and must be recreated on the next dispatch. Tuning `proxy_pool_ttl` above the expected dispatch interval keeps proxies warm and avoids this cold-start overhead. + +Proxies on worker subprocesses are lazy by default — the `WorkerPool` propagates its `lazy` flag to every `WorkerProxy` it constructs, and each task serializes the proxy (including the flag) so that workers receiving the task inherit the same laziness setting. 
A lazy proxy defers discovery subscription and sentinel setup until its first `dispatch()` call, so workers that never invoke nested routines pay no startup cost. ## Connections @@ -207,6 +209,17 @@ Worker subprocesses can dispatch tasks to other workers. Each subprocess is conf | Discovery | `discovery` | Accepts any `DiscoverySubscriberLike` or `Factory` thereof. | | Static | `workers` | Takes a sequence of `WorkerMetadata` directly — no discovery needed. | +### Lazy startup + +`WorkerProxy` accepts a `lazy` parameter (default `True`) that controls when the proxy actually starts — i.e., when it subscribes to discovery, launches the worker sentinel task, and initializes the load balancer context. + +| `lazy` | `start()` / `__aenter__` | `dispatch()` | `stop()` on un-started proxy | +| ------ | ------------------------ | ------------- | ---------------------------- | +| `True` | No-op | Starts the proxy on first call, then dispatches | No-op (safe to call) | +| `False` | Starts eagerly | Raises `RuntimeError` if not started | Raises `RuntimeError` | + +When `lazy=True`, concurrent `dispatch()` calls use a double-checked lock to ensure the proxy starts exactly once. The `lazy` flag is preserved through `cloudpickle` serialization, so proxies sent to worker subprocesses as part of a task retain their laziness setting. + ### Self-describing connections Workers are self-describing: each worker advertises its gRPC transport configuration via `ChannelOptions` in its `WorkerMetadata`. When a client discovers a worker, it reads the advertised options and configures its channel to match — message sizes, keepalive intervals, concurrency limits, and compression are all set automatically. There is no separate client-side configuration step; the worker's metadata is the single source of truth for how to connect to it. 
From 9fd138e9a85439365c0aa4247c7134df8c5c7516 Mon Sep 17 00:00:00 2001 From: Conrad Date: Mon, 30 Mar 2026 23:45:31 -0400 Subject: [PATCH 06/12] refactor: Split WorkerProxy into enter/exit and start/stop layers The old start() conflated context-variable setup with resource acquisition, making lazy behavior hard to reason about. Split into enter()/exit() for context management and start()/stop() for the actual discovery, load-balancer, and sentinel lifecycle. --- wool/src/wool/runtime/worker/README.md | 8 +-- wool/src/wool/runtime/worker/process.py | 18 +++--- wool/src/wool/runtime/worker/proxy.py | 80 +++++++++++++++---------- 3 files changed, 62 insertions(+), 44 deletions(-) diff --git a/wool/src/wool/runtime/worker/README.md b/wool/src/wool/runtime/worker/README.md index 1715ce7..c4a4d80 100644 --- a/wool/src/wool/runtime/worker/README.md +++ b/wool/src/wool/runtime/worker/README.md @@ -213,10 +213,10 @@ Proxies on worker subprocesses are lazy by default — the `WorkerPool` propagat `WorkerProxy` accepts a `lazy` parameter (default `True`) that controls when the proxy actually starts — i.e., when it subscribes to discovery, launches the worker sentinel task, and initializes the load balancer context. 
-| `lazy` | `start()` / `__aenter__` | `dispatch()` | `stop()` on un-started proxy | -| ------ | ------------------------ | ------------- | ---------------------------- | -| `True` | No-op | Starts the proxy on first call, then dispatches | No-op (safe to call) | -| `False` | Starts eagerly | Raises `RuntimeError` if not started | Raises `RuntimeError` | +| `lazy` | `enter()` / `__aenter__` | `dispatch()` | `exit()` on un-started proxy | +| ------ | ------------------------ | ------------- | ----------------------------- | +| `True` | Sets context var only | Calls `start()` on first call, then dispatches | No-op (safe to call) | +| `False` | Sets context var, calls `start()` | Raises `RuntimeError` if not started | Raises `RuntimeError` | When `lazy=True`, concurrent `dispatch()` calls use a double-checked lock to ensure the proxy starts exactly once. The `lazy` flag is preserved through `cloudpickle` serialization, so proxies sent to worker subprocesses as part of a task retain their laziness setting. diff --git a/wool/src/wool/runtime/worker/process.py b/wool/src/wool/runtime/worker/process.py index a2a08d1..4f12cb8 100644 --- a/wool/src/wool/runtime/worker/process.py +++ b/wool/src/wool/runtime/worker/process.py @@ -390,30 +390,30 @@ def _sigint_handler(loop, service, signum, frame): async def _proxy_factory(proxy: WorkerProxy): """Factory function for WorkerProxy instances in ResourcePool. - Calls ``start()`` on the proxy. Lazy proxies treat this as a - no-op and defer startup until first dispatch. Non-lazy proxies - start eagerly. The proxy object itself is used as the cache key. + Calls ``enter()`` on the proxy. Lazy proxies defer actual + startup until first dispatch; non-lazy proxies start eagerly. + The proxy object itself is used as the cache key. :param proxy: The WorkerProxy instance (passed as key from ResourcePool). :returns: - The started (or lazy) WorkerProxy instance. + The entered WorkerProxy instance. 
""" - await proxy.start() + await proxy.enter() return proxy async def _proxy_finalizer(proxy: WorkerProxy): """Finalizer function for WorkerProxy instances in ResourcePool. - Stops the proxy when it's being cleaned up from the resource pool. - Lazy proxies that were never started are handled gracefully by - the proxy's own stop method. + Exits the proxy context when it's being cleaned up from the + resource pool. Lazy proxies that were never started are handled + gracefully by the proxy's own exit method. :param proxy: The WorkerProxy instance to clean up. """ try: - await proxy.stop() + await proxy.exit() except Exception: pass diff --git a/wool/src/wool/runtime/worker/proxy.py b/wool/src/wool/runtime/worker/proxy.py index 43ba196..7be103e 100644 --- a/wool/src/wool/runtime/worker/proxy.py +++ b/wool/src/wool/runtime/worker/proxy.py @@ -331,13 +331,13 @@ def combined_filter(w): self._loadbalancer_context: LoadBalancerContext | None = None async def __aenter__(self): - """Starts the proxy and sets it as the active context.""" - await self.start() + """Enters the proxy context and sets it as the active proxy.""" + await self.enter() return self async def __aexit__(self, *args): - """Stops the proxy and resets the active context.""" - await self.stop(*args) + """Exits the proxy context and resets the active proxy.""" + await self.exit(*args) def __hash__(self) -> int: return hash(str(self.id)) @@ -413,13 +413,13 @@ def workers(self) -> list[WorkerMetadata]: else: return [] - async def start(self) -> None: - """Starts the proxy by initiating the worker discovery process. + async def enter(self) -> None: + """Enter the proxy context. - Always sets this proxy as the active context variable. When - ``lazy=True``, defers discovery and load-balancer startup - until :meth:`dispatch` is first called. When ``lazy=False``, - starts eagerly. + Sets this proxy as the active context variable. 
When + ``lazy=True``, defers resource acquisition until + :meth:`dispatch` is first called. When ``lazy=False``, + calls :meth:`start` eagerly. :raises RuntimeError: If the proxy has already been started and ``lazy`` is @@ -428,9 +428,17 @@ async def start(self) -> None: self._proxy_token = wool.__proxy__.set(self) if self._lazy: return - await self._start() + await self.start() + + async def start(self) -> None: + """Start the proxy by initiating discovery and load balancing. + + Subscribes to worker discovery, initializes the load-balancer + context, and launches the worker sentinel task. - async def _start(self) -> None: + :raises RuntimeError: + If the proxy has already been started. + """ if self._started: raise RuntimeError("Proxy already started") @@ -452,12 +460,12 @@ async def _start(self) -> None: self._sentinel_task = asyncio.create_task(self._worker_sentinel()) self._started = True - async def stop(self, *args) -> None: - """Stops the proxy, terminating discovery and clearing connections. + async def exit(self, *args) -> None: + """Exit the proxy context. - When ``lazy=True``, calling stop on an un-started proxy resets - only the context variable. When ``lazy=False``, raises - :class:`RuntimeError`. + Resets the context variable. If the proxy was started, + delegates to :meth:`stop` to release resources. Calling + ``exit()`` on an un-started lazy proxy is a safe no-op. 
:raises RuntimeError: If the proxy was not started first and ``lazy`` is @@ -470,19 +478,29 @@ async def stop(self, *args) -> None: if not self._lazy: raise RuntimeError("Proxy not started - call start() first") return - else: - await self._exit_context(self._discovery_context_manager, *args) - await self._exit_context(self._loadbalancer_context_manager, *args) - - if self._sentinel_task: - self._sentinel_task.cancel() - try: - await self._sentinel_task - except asyncio.CancelledError: - pass - self._sentinel_task = None - self._loadbalancer_context = None - self._started = False + await self.stop(*args) + + async def stop(self, *args) -> None: + """Stop the proxy, terminating discovery and clearing connections. + + :raises RuntimeError: + If the proxy was not started first. + """ + if not self._started: + raise RuntimeError("Proxy not started - call start() first") + + await self._exit_context(self._discovery_context_manager, *args) + await self._exit_context(self._loadbalancer_context_manager, *args) + + if self._sentinel_task: + self._sentinel_task.cancel() + try: + await self._sentinel_task + except asyncio.CancelledError: + pass + self._sentinel_task = None + self._loadbalancer_context = None + self._started = False async def dispatch( self, task: Task, *, timeout: float | None = None @@ -511,7 +529,7 @@ async def dispatch( assert self._start_lock is not None async with self._start_lock: if not self._started: - await self._start() + await self.start() await asyncio.wait_for(self._await_workers(), 60) From 566431fc5d0cba0c2ded129f1d8e5681f2b67be9 Mon Sep 17 00:00:00 2001 From: Conrad Date: Mon, 30 Mar 2026 23:45:38 -0400 Subject: [PATCH 07/12] test: Update proxy tests for enter/exit and start/stop split --- wool/tests/runtime/worker/test_process.py | 69 ------------ wool/tests/runtime/worker/test_proxy.py | 127 ++++++++++++++++++---- 2 files changed, 107 insertions(+), 89 deletions(-) diff --git a/wool/tests/runtime/worker/test_process.py 
b/wool/tests/runtime/worker/test_process.py index 0e8b6b3..98cf880 100644 --- a/wool/tests/runtime/worker/test_process.py +++ b/wool/tests/runtime/worker/test_process.py @@ -18,8 +18,6 @@ from wool.runtime.worker.base import WorkerOptions from wool.runtime.worker.metadata import WorkerMetadata from wool.runtime.worker.process import WorkerProcess -from wool.runtime.worker.process import _proxy_factory -from wool.runtime.worker.process import _proxy_finalizer from wool.runtime.worker.process import _sigint_handler from wool.runtime.worker.process import _signal_handlers from wool.runtime.worker.process import _sigterm_handler @@ -215,73 +213,6 @@ def mock_signal(sig, handler): assert signal_calls[3] == (signal.SIGINT, old_sigint) -@pytest.mark.asyncio -async def test__proxy_factory_with_proxy(mocker): - """Test _proxy_factory calls start and returns the proxy. - - Given: - A WorkerProxy instance. - When: - _proxy_factory() is called. - Then: - It should call start() and return the proxy. - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.start = mocker.AsyncMock() - - # Act - result = await _proxy_factory(mock_proxy) - - # Assert - mock_proxy.start.assert_called_once() - assert result is mock_proxy - - -@pytest.mark.asyncio -async def test__proxy_finalizer_with_proxy(mocker): - """Test _proxy_finalizer calls stop on the proxy. - - Given: - A proxy that stops successfully. - When: - _proxy_finalizer() is called. - Then: - It should call proxy.stop(). - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.stop = mocker.AsyncMock() - - # Act - await _proxy_finalizer(mock_proxy) - - # Assert - mock_proxy.stop.assert_called_once() - - -@pytest.mark.asyncio -async def test__proxy_finalizer_with_stop_exception(mocker): - """Test _proxy_finalizer handles exception from proxy.stop() gracefully. - - Given: - A proxy that raises an exception on stop. - When: - _proxy_finalizer() is called. 
- Then: - It should catch the exception and complete without propagating it. - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.stop = mocker.AsyncMock(side_effect=Exception("Stop failed")) - - # Act — should not raise exception - await _proxy_finalizer(mock_proxy) - - # Assert - mock_proxy.stop.assert_called_once() - - class TestWorkerProcess: """Test suite for WorkerProcess.""" diff --git a/wool/tests/runtime/worker/test_proxy.py b/wool/tests/runtime/worker/test_proxy.py index 5601e54..3f51a4a 100644 --- a/wool/tests/runtime/worker/test_proxy.py +++ b/wool/tests/runtime/worker/test_proxy.py @@ -756,13 +756,13 @@ async def test_start_sets_started_flag( assert proxy.started @pytest.mark.asyncio - async def test_start_with_lazy_proxy(self, mock_discovery_service): - """Test start is a no-op for lazy proxies. + async def test_enter_with_lazy_proxy(self, mock_discovery_service): + """Test enter defers startup for lazy proxies. Given: - A lazy WorkerProxy that has not been started. + A lazy WorkerProxy that has not been entered. When: - start() is called. + enter() is called. Then: It should remain un-started. """ @@ -770,24 +770,46 @@ async def test_start_with_lazy_proxy(self, mock_discovery_service): proxy = WorkerProxy(discovery=mock_discovery_service) # Act - await proxy.start() + await proxy.enter() # Assert assert not proxy.started + @pytest.mark.asyncio + async def test_enter_with_non_lazy_proxy( + self, mock_discovery_service, mock_proxy_session + ): + """Test enter eagerly starts a non-lazy proxy. + + Given: + A non-lazy WorkerProxy that has not been entered. + When: + enter() is called. + Then: + It should set started to True. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) + + # Act + await proxy.enter() + + # Assert + assert proxy.started + @pytest.mark.asyncio async def test_stop_clears_state(self, mock_discovery_service, mock_proxy_session): """Test clear workers and reset the started flag to False. 
Given: - A started WorkerProxy with registered workers + A started WorkerProxy with registered workers. When: - Stop is called + stop() is called. Then: - It should clear workers and reset the started flag to False + It should clear workers and reset the started flag to False. """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) await proxy.start() # Act @@ -819,31 +841,31 @@ async def test_start_already_started_raises_error( await proxy.start() @pytest.mark.asyncio - async def test_stop_not_started_raises_error(self, mock_discovery_service): + async def test_exit_not_started_raises_error(self, mock_discovery_service): """Test raise RuntimeError. Given: - A non-lazy WorkerProxy that is not started + A non-lazy WorkerProxy that is not started. When: - Stop is called + exit() is called. Then: - It should raise RuntimeError + It should raise RuntimeError. """ # Arrange proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert with pytest.raises(RuntimeError, match="Proxy not started"): - await proxy.stop() + await proxy.exit() @pytest.mark.asyncio - async def test_stop_with_unstarted_lazy_proxy(self, mock_discovery_service): - """Test stop is a no-op on an un-started lazy proxy. + async def test_exit_with_unstarted_lazy_proxy(self, mock_discovery_service): + """Test exit is a no-op on an un-started lazy proxy. Given: A lazy WorkerProxy that was never started. When: - stop() is called. + exit() is called. Then: It should return without raising. """ @@ -851,7 +873,32 @@ async def test_stop_with_unstarted_lazy_proxy(self, mock_discovery_service): proxy = WorkerProxy(discovery=mock_discovery_service) # Act & assert — should not raise - await proxy.stop() + await proxy.exit() + + @pytest.mark.asyncio + async def test_exit_stops_started_lazy_proxy( + self, mock_discovery_service, mock_proxy_session + ): + """Test exit stops a lazy proxy that was started. 
+ + Given: + A lazy WorkerProxy that was entered and subsequently started. + When: + exit() is called. + Then: + It should stop the proxy and set started to False. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + await proxy.enter() + await proxy.start() + assert proxy.started + + # Act + await proxy.exit() + + # Assert + assert not proxy.started @pytest.mark.asyncio async def test___aenter___enter_starts_proxy( @@ -1637,8 +1684,8 @@ async def test_cloudpickle_serialization_preserves_lease( assert len(restored.workers) == 2 await restored.stop() - def test_cloudpickle_serialization_preserves_lazy(self, mock_discovery_service): - """Test pickle round-trip preserves the lazy flag. + def test_cloudpickle_serialization_with_lazy_false(self, mock_discovery_service): + """Test pickle round-trip with explicit lazy=False. Given: A WorkerProxy with lazy=False. @@ -1660,6 +1707,46 @@ def test_cloudpickle_serialization_preserves_lazy(self, mock_discovery_service): # Assert assert restored.lazy is False + def test_cloudpickle_serialization_with_default_lazy(self, mock_discovery_service): + """Test pickle round-trip with default lazy=True. + + Given: + A WorkerProxy with default lazy=True. + When: + The proxy is pickled and unpickled. + Then: + It should preserve lazy as True on the restored proxy. + """ + # Arrange + proxy = WorkerProxy( + discovery=mock_discovery_service, + loadbalancer=wp.RoundRobinLoadBalancer, + ) + + # Act + restored = cloudpickle.loads(cloudpickle.dumps(proxy)) + + # Assert + assert restored.lazy is True + + @given(lazy=st.booleans()) + @settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) + def test___init___with_arbitrary_lazy_value(self, mock_discovery_service, lazy): + """Test instantiation with an arbitrary lazy value. + + Given: + An arbitrary boolean value for lazy. + When: + WorkerProxy is instantiated with that value. + Then: + It should set the lazy property to the given value. 
+ """ + # Act + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=lazy) + + # Assert + assert proxy.lazy is lazy + @pytest.mark.asyncio async def test_dispatch_delegates_to_loadbalancer( self, From f2468e4a7b2d079bafdf9578a079a30911a8ac3b Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 31 Mar 2026 19:12:53 -0400 Subject: [PATCH 08/12] feat: Add noreentry descriptor for single-use method guards MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a generic @noreentry descriptor class that prevents instance methods from being invoked more than once. Guard state is tracked via a weakref.WeakSet on the descriptor instance — when a method is called on an instance, that instance is added to the set. Any subsequent call raises RuntimeError. This approach keeps instance namespaces clean and auto-cleans references when instances are garbage collected. The descriptor works with both sync and async methods. Attempting to use it on bare functions or call it directly raises TypeError at runtime and is flagged as a static error via the Never return annotation on __call__. --- wool/src/wool/utilities/noreentry.py | 107 +++++++++++ wool/tests/utilities/test_noreentry.py | 239 +++++++++++++++++++++++++ 2 files changed, 346 insertions(+) create mode 100644 wool/src/wool/utilities/noreentry.py create mode 100644 wool/tests/utilities/test_noreentry.py diff --git a/wool/src/wool/utilities/noreentry.py b/wool/src/wool/utilities/noreentry.py new file mode 100644 index 0000000..36c95ef --- /dev/null +++ b/wool/src/wool/utilities/noreentry.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import asyncio.coroutines +import functools +import inspect +import sys +import weakref +from typing import Never + + +class _Token: + """Hashable, weakly-referenceable token for instance tracking.""" + + pass + + +class NoReentryBoundMethod: + """Descriptor implementing single-use guard for bound methods. 
+ + On the first call the decorated method executes normally. Any + subsequent call raises :class:`RuntimeError`. + + Guard state uses a per-instance token stored on the instance under + ``__noreentry_token__``. The token is unique, hashable, and tied to the + instance's lifetime. The descriptor tracks tokens in a WeakSet to auto-clean + when instances are garbage collected. + + Works with both synchronous and asynchronous methods. Only supports + bound methods; using @noreentry on bare functions raises TypeError. + """ + + def __init__(self, fn, /): + functools.update_wrapper(self, fn) + if inspect.iscoroutinefunction(fn): + if sys.version_info >= (3, 12): + inspect.markcoroutinefunction(self) + else: + self._is_coroutine = asyncio.coroutines._is_coroutine + self._fn = fn + self._invocations = weakref.WeakSet() + + def __call__(self, *args, **kwargs) -> Never: + raise TypeError("@noreentry only decorates methods, not bare functions") + + def __get__(self, obj, objtype=None): + if obj is None: + return self + + # Cache the wrapper on the instance to avoid recreating it on every access. + cache_key = f"__noreentry_wrapper_{id(self)}__" + return obj.__dict__.setdefault(cache_key, self._make_wrapper(obj)) + + def _make_wrapper(self, obj): + """Create the bound wrapper for an instance.""" + guard = self._guard + fn = self._fn + if inspect.iscoroutinefunction(fn): + + @functools.wraps(fn) + async def async_wrapper(*args, **kwargs): + guard(obj) + return await fn(obj, *args, **kwargs) + + return async_wrapper + else: + + @functools.wraps(fn) + def sync_wrapper(*args, **kwargs): + guard(obj) + return fn(obj, *args, **kwargs) + + return sync_wrapper + + def _guard(self, obj): + """Check and record invocation on the specified object.""" + # Get or create a unique token for this object. + token = obj.__dict__.setdefault("__noreentry_token__", _Token()) + + # Check if this descriptor was invoked on this object already. 
+ if token in self._invocations: + raise RuntimeError( + f"'{self._fn.__qualname__}' cannot be invoked more than once" + ) + + # Track invocation. + self._invocations.add(token) + + +def noreentry(fn): + """Mark a method as single-use. + + On the first call the decorated method executes normally. Any + subsequent call raises :class:`RuntimeError`. + + Guard state uses a per-instance token stored on the instance under + ``__noreentry_token__``. The token is unique, hashable, and tied to the + instance's lifetime. The descriptor tracks tokens in a :class:`WeakSet` + to auto-clean when instances are garbage collected. + + Works with both synchronous and asynchronous methods. Only supports + bound methods; using @noreentry on bare functions raises :class:`TypeError` + on the first invocation. + + :param fn: + The bound instance or class method to decorate. + """ + return NoReentryBoundMethod(fn) diff --git a/wool/tests/utilities/test_noreentry.py b/wool/tests/utilities/test_noreentry.py new file mode 100644 index 0000000..1e0b08c --- /dev/null +++ b/wool/tests/utilities/test_noreentry.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import asyncio + +import pytest + +from wool.utilities.noreentry import noreentry + + +# Test helpers (not fixtures) +class _SyncDummy: + """Class with a sync @noreentry method.""" + + @noreentry + def run(self): + return "ok" + + +class _AsyncDummy: + """Class with an async @noreentry method.""" + + @noreentry + async def run(self): + return "ok" + + +class _MultiMethodDummy: + """Class with multiple @noreentry methods.""" + + @noreentry + def first(self): + return "first" + + @noreentry + def second(self): + return "second" + + +class TestNoReentry: + """Tests for the noreentry decorator.""" + + def test_noreentry_sync_method_first_invocation(self): + """Test sync method executes normally on first invocation. + + Given: + A class with a sync @noreentry method. + When: + The method is called for the first time. 
+ Then: + It should return normally. + """ + # Arrange + obj = _SyncDummy() + + # Act + result = obj.run() + + # Assert + assert result == "ok" + + def test_noreentry_sync_method_second_invocation_raises(self): + """Test sync method raises RuntimeError on second invocation. + + Given: + A class with a sync @noreentry method called once. + When: + The method is called a second time on the same instance. + Then: + It should raise RuntimeError. + """ + # Arrange + obj = _SyncDummy() + obj.run() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + obj.run() + + @pytest.mark.asyncio + async def test_noreentry_async_method_first_invocation(self): + """Test async method executes normally on first invocation. + + Given: + A class with an async @noreentry method. + When: + The method is awaited for the first time. + Then: + It should return normally. + """ + # Arrange + obj = _AsyncDummy() + + # Act + result = await obj.run() + + # Assert + assert result == "ok" + + @pytest.mark.asyncio + async def test_noreentry_async_method_second_invocation_raises(self): + """Test async method raises RuntimeError on second invocation. + + Given: + A class with an async @noreentry method awaited once. + When: + The method is awaited a second time on the same instance. + Then: + It should raise RuntimeError. + """ + # Arrange + obj = _AsyncDummy() + await obj.run() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + await obj.run() + + def test_noreentry_separate_instances_independent(self): + """Test instances track guard state independently. + + Given: + Two instances of a class with a @noreentry method. + When: + The method is called on the first instance. + Then: + The method remains callable on the second instance. 
+ """ + # Arrange + a = _SyncDummy() + b = _SyncDummy() + a.run() + + # Act + result = b.run() + + # Assert + assert result == "ok" + + def test_noreentry_error_message_qualname(self): + """Test RuntimeError includes the method's qualified name. + + Given: + A class with a @noreentry method called once. + When: + The method is called a second time. + Then: + The RuntimeError message should include the method's __qualname__. + """ + # Arrange + obj = _SyncDummy() + obj.run() + + # Act & assert + with pytest.raises(RuntimeError, match="_SyncDummy.run"): + obj.run() + + def test_noreentry_preserves_coroutinefunction_check(self): + """Test decorator preserves async function detection. + + Given: + A class with an async @noreentry method. + When: + asyncio.iscoroutinefunction is called on the method. + Then: + It should return True. + """ + # Act & assert + assert asyncio.iscoroutinefunction(_AsyncDummy.run) + + def test_noreentry_preserves_wrapped_function_name(self): + """Test decorator preserves the original function name. + + Given: + A class with a @noreentry method. + When: + The decorated method's __name__ is inspected. + Then: + It should equal the original function name. + """ + # Act & assert + assert _SyncDummy.run.__name__ == "run" + + def test_noreentry_multiple_methods_independent(self): + """Test guard on one method does not affect other methods. + + Given: + A class with two @noreentry methods where the first + has been guarded (called twice). + When: + The second method is called. + Then: + The second method executes normally. + """ + # Arrange + obj = _MultiMethodDummy() + obj.first() + with pytest.raises(RuntimeError): + obj.first() + + # Act + result = obj.second() + + # Assert + assert result == "second" + + def test_noreentry_unbound_access_returns_descriptor(self): + """Test accessing decorated method via class returns descriptor. + + Given: + A class with a @noreentry method. + When: + The method is accessed through the class (unbound). 
+ Then: + It should return the descriptor itself. + """ + # Act + unbound = _SyncDummy.run + + # Assert + assert unbound is not None + + def test_noreentry_bare_function_raises_error(self): + """Test decorator rejects application to bare functions. + + Given: + A @noreentry-decorated function called without a bound instance. + When: + The descriptor is called directly. + Then: + It should raise TypeError. + """ + # Arrange + unbound = _SyncDummy.run + + # Act & assert + with pytest.raises( + TypeError, match="only decorates methods, not bare functions" + ): + unbound() From 79b3de277c8bf3463c8193119fb3ddd5af9f7175 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 31 Mar 2026 19:12:56 -0400 Subject: [PATCH 09/12] refactor: Guard WorkerProxy and WorkerPool against reentrant context usage Apply the @noreentry decorator to WorkerProxy.enter() and WorkerPool.__aenter__() to enforce single-use semantics. Both context managers now reject any attempt to re-enter (whether reentrant or after a full enter/exit cycle) with RuntimeError. Update docstrings to document the single-use contract and explain that users must create a new instance for a new context. --- wool/src/wool/runtime/worker/pool.py | 6 ++++++ wool/src/wool/runtime/worker/proxy.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/wool/src/wool/runtime/worker/pool.py b/wool/src/wool/runtime/worker/pool.py index d42d893..a5ced29 100644 --- a/wool/src/wool/runtime/worker/pool.py +++ b/wool/src/wool/runtime/worker/pool.py @@ -26,6 +26,7 @@ from wool.runtime.worker.proxy import LoadBalancerLike from wool.runtime.worker.proxy import RoundRobinLoadBalancer from wool.runtime.worker.proxy import WorkerProxy +from wool.utilities.noreentry import noreentry # public @@ -397,6 +398,7 @@ async def create_proxy(): self._proxy_factory = create_proxy + @noreentry async def __aenter__(self) -> WorkerPool: """Starts the worker pool and its services, returning a session. 
@@ -405,6 +407,10 @@ async def __aenter__(self) -> WorkerPool: :returns: The :class:`WorkerPool` instance itself for method chaining. + :raises RuntimeError: + If the pool has already been entered. ``WorkerPool`` + contexts are single-use — create a new instance instead + of re-entering. """ self._proxy_context = self._proxy_factory() await self._proxy_context.__aenter__() diff --git a/wool/src/wool/runtime/worker/proxy.py b/wool/src/wool/runtime/worker/proxy.py index 7be103e..a92636c 100644 --- a/wool/src/wool/runtime/worker/proxy.py +++ b/wool/src/wool/runtime/worker/proxy.py @@ -33,6 +33,7 @@ from wool.runtime.worker.auth import WorkerCredentials from wool.runtime.worker.connection import WorkerConnection from wool.runtime.worker.metadata import WorkerMetadata +from wool.utilities.noreentry import noreentry if TYPE_CHECKING: from contextvars import Token @@ -413,6 +414,7 @@ def workers(self) -> list[WorkerMetadata]: else: return [] + @noreentry async def enter(self) -> None: """Enter the proxy context. @@ -421,6 +423,10 @@ async def enter(self) -> None: :meth:`dispatch` is first called. When ``lazy=False``, calls :meth:`start` eagerly. + :raises RuntimeError: + If the proxy has already been entered. ``WorkerProxy`` + contexts are single-use — create a new instance instead + of re-entering. :raises RuntimeError: If the proxy has already been started and ``lazy`` is ``False``. From 1ad077521c6a92e74702543ee2339af5de199721 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 31 Mar 2026 19:12:59 -0400 Subject: [PATCH 10/12] test: Improve guard test coverage for pool and proxy Add and update tests to verify single-use enforcement on WorkerPool and WorkerProxy. 
New test cases cover: - Reentrant entry (async with pool within pool context) - Post-exit re-entry (entering pool again after full cycle) - RuntimeError is raised with appropriate message These tests validate that the noreentry guard works correctly across both context managers and complements the comprehensive unit tests for the noreentry descriptor itself. --- wool/tests/runtime/worker/test_pool.py | 41 +++++++++++++++++++++++++ wool/tests/runtime/worker/test_proxy.py | 39 +++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/wool/tests/runtime/worker/test_pool.py b/wool/tests/runtime/worker/test_pool.py index 102be55..4b39fc3 100644 --- a/wool/tests/runtime/worker/test_pool.py +++ b/wool/tests/runtime/worker/test_pool.py @@ -554,6 +554,47 @@ async def test___aexit___cleanup_on_error(self, mock_worker_factory): assert pool_created, "Pool should have been created before exception" assert exception_caught, "Exception should have been propagated" + @pytest.mark.asyncio + async def test___aenter___already_entered_raises_error(self, mock_worker_factory): + """Test pool raises on reentrant entry. + + Given: + A WorkerPool that is already entered via async with. + When: + The pool is entered a second time via async with. + Then: + It should raise RuntimeError. + """ + # Arrange + pool = WorkerPool(worker=mock_worker_factory, spawn=2) + + # Act & assert + async with pool: + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + async with pool: + pass + + @pytest.mark.asyncio + async def test___aenter___after_exit_raises_error(self, mock_worker_factory): + """Test pool raises when re-entered after exit. + + Given: + A WorkerPool that has been entered and exited. + When: + The pool is entered again via async with. + Then: + It should raise RuntimeError because the context is single-use. 
+ """ + # Arrange + pool = WorkerPool(worker=mock_worker_factory, spawn=2) + async with pool: + pass + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + async with pool: + pass + @pytest.mark.asyncio async def test___aenter___lifecycle_returns_pool_instance( self, diff --git a/wool/tests/runtime/worker/test_proxy.py b/wool/tests/runtime/worker/test_proxy.py index 3f51a4a..63f9d62 100644 --- a/wool/tests/runtime/worker/test_proxy.py +++ b/wool/tests/runtime/worker/test_proxy.py @@ -797,6 +797,45 @@ async def test_enter_with_non_lazy_proxy( # Assert assert proxy.started + @pytest.mark.asyncio + async def test_enter_already_entered_raises_error(self, mock_discovery_service): + """Test enter raises on reentrant call. + + Given: + A lazy WorkerProxy that has already been entered. + When: + enter() is called a second time. + Then: + It should raise RuntimeError. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + await proxy.enter() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + await proxy.enter() + + @pytest.mark.asyncio + async def test_enter_after_exit_raises_error(self, mock_discovery_service): + """Test enter raises after a full enter/exit cycle. + + Given: + A lazy WorkerProxy that has been entered and exited. + When: + enter() is called again. + Then: + It should raise RuntimeError because the context is single-use. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + await proxy.enter() + await proxy.exit() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + await proxy.enter() + @pytest.mark.asyncio async def test_stop_clears_state(self, mock_discovery_service, mock_proxy_session): """Test clear workers and reset the started flag to False. 
From 30c65701d1507482ac640993d4ef9cdd23376f7e Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 31 Mar 2026 19:13:03 -0400 Subject: [PATCH 11/12] docs: Document single-use context lifecycle for pools and proxies Add documentation to the worker README explaining the single-use contract for WorkerPool and WorkerProxy. Include examples of correct usage patterns and clarify that both context managers must be re-created for each context. This complements the updated docstrings in pool.py and proxy.py and ensures users understand the single-use semantics enforced by the @noreentry guard. --- wool/src/wool/runtime/worker/README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/wool/src/wool/runtime/worker/README.md b/wool/src/wool/runtime/worker/README.md index c4a4d80..13b3af2 100644 --- a/wool/src/wool/runtime/worker/README.md +++ b/wool/src/wool/runtime/worker/README.md @@ -220,6 +220,20 @@ Proxies on worker subprocesses are lazy by default — the `WorkerPool` propagat When `lazy=True`, concurrent `dispatch()` calls use a double-checked lock to ensure the proxy starts exactly once. The `lazy` flag is preserved through `cloudpickle` serialization, so proxies sent to worker subprocesses as part of a task retain their laziness setting. +### Context lifecycle + +Both `WorkerPool` and `WorkerProxy` are **single-use** async context managers. Once entered and exited, the same instance cannot be entered again — create a new instance instead. Attempting to call `enter()` or `__aenter__()` a second time raises `RuntimeError`. This prevents silent state corruption from reentrant or repeated context usage (e.g., accidentally nesting `async with proxy:` blocks or calling `enter()` in a retry loop). + +```python +# Correct — one instance per context +async with wool.WorkerPool(spawn=4): + await my_routine() + +# Need another pool? Create a new instance. 
+async with wool.WorkerPool(spawn=4):
+    await my_routine()
+```
+
 ### Self-describing connections
 
 Workers are self-describing: each worker advertises its gRPC transport configuration via `ChannelOptions` in its `WorkerMetadata`. When a client discovers a worker, it reads the advertised options and configures its channel to match — message sizes, keepalive intervals, concurrency limits, and compression are all set automatically. There is no separate client-side configuration step; the worker's metadata is the single source of truth for how to connect to it.

From 077199ea6dc312fd76c2118e571b8b5f9df1abd1 Mon Sep 17 00:00:00 2001
From: Conrad
Date: Tue, 31 Mar 2026 19:13:26 -0400
Subject: [PATCH 12/12] fix: Support Python 3.11 in noreentry decorator

Conditionally set the asyncio.coroutines._is_coroutine marker when the
Python version is 3.11 or earlier. This ensures that
asyncio.iscoroutinefunction() returns True for async noreentry methods
on Python 3.11 and earlier.
---
 wool/src/wool/utilities/noreentry.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/wool/src/wool/utilities/noreentry.py b/wool/src/wool/utilities/noreentry.py
index 36c95ef..f497e02 100644
--- a/wool/src/wool/utilities/noreentry.py
+++ b/wool/src/wool/utilities/noreentry.py
@@ -35,7 +35,7 @@ def __init__(self, fn, /):
         if sys.version_info >= (3, 12):
             inspect.markcoroutinefunction(self)
         else:
-            self._is_coroutine = asyncio.coroutines._is_coroutine
+            self._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore[attr-defined]
         self._fn = fn
         self._invocations = weakref.WeakSet()