diff --git a/.github/actions/run-tests/action.yaml b/.github/actions/run-tests/action.yaml new file mode 100644 index 0000000..b40d963 --- /dev/null +++ b/.github/actions/run-tests/action.yaml @@ -0,0 +1,37 @@ +name: Run tests +description: Checkout, install dependencies, and run pytest for a given namespace. +inputs: + python-version: + required: true + type: string + namespace: + required: true + type: string + pytest-marker: + required: true + type: string + cov-fail-under: + required: true + type: string + +runs: + using: composite + steps: + - name: Install uv and prepare python + uses: astral-sh/setup-uv@v5 + with: + python-version: ${{ inputs.python-version }} + + - name: Install packages + shell: bash + run: | + .github/scripts/install-python-packages.sh + uv pip install -e './${{ inputs.namespace }}[dev]' + uv pip freeze + + - name: Run tests + shell: bash + run: | + ulimit -n 65536 + cd ${{ inputs.namespace }} + uv run pytest -m "${{ inputs.pytest-marker }}" --cov-fail-under=${{ inputs.cov-fail-under }} diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 1f45cbc..65c46dc 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -34,38 +34,35 @@ jobs: cd wool uv run pyright - run-tests: - name: Namespace ${{ matrix.namespace }} / Python ${{ matrix.python-version }} + unit-tests: + name: Unit / Python ${{ matrix.python-version }} runs-on: ubuntu-latest strategy: matrix: - namespace: - - 'wool' - python-version: - - '3.11' - - '3.12' - - '3.13' + namespace: ['wool'] + python-version: ['3.11', '3.12', '3.13'] steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Install uv and prepare python - uses: astral-sh/setup-uv@v5 + - uses: actions/checkout@v4 + - uses: ./.github/actions/run-tests with: python-version: ${{ matrix.python-version }} + namespace: ${{ matrix.namespace }} + pytest-marker: not integration + cov-fail-under: '98' - - name: Install packages - env: - NAMESPACE: ${{ matrix.namespace }} - run: | - .github/scripts/install-python-packages.sh - uv pip install -e './${{ env.NAMESPACE }}[dev]' - uv pip freeze - - - name: Run tests - env: - NAMESPACE: ${{ matrix.namespace }} - run: | - ulimit -n 65536 - cd ${{ env.NAMESPACE }} - uv run pytest + integration-tests: + name: Integration / Python ${{ matrix.python-version }} + needs: unit-tests + runs-on: ubuntu-latest + strategy: + matrix: + namespace: ['wool'] + python-version: ['3.11', '3.12', '3.13'] + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/run-tests + with: + python-version: ${{ matrix.python-version }} + namespace: ${{ matrix.namespace }} + pytest-marker: integration + cov-fail-under: '70' diff --git a/wool/.coveragerc b/wool/.coveragerc index 799cb71..9c7a0b4 100644 --- a/wool/.coveragerc +++ b/wool/.coveragerc @@ -9,4 +9,3 @@ omit = [report] show_missing = true precision = 2 -fail_under = 98 diff --git a/wool/README.md b/wool/README.md index 577de45..bfc2983 100644 --- a/wool/README.md +++ b/wool/README.md @@ -156,6 +156,14 @@ async with wool.WorkerPool(discovery=wool.LanDiscovery(), lease=10): result = await my_routine() ``` +`lazy` controls whether the pool's internal `WorkerProxy` defers startup until the first task is dispatched. Defaults to `True`. The pool propagates this flag to every `WorkerProxy` it constructs, and each task serializes the proxy (including the flag) so that workers receiving the task inherit the same laziness setting. With `lazy=True`, worker subprocesses that never invoke nested `@wool.routine` calls avoid the cost of discovery subscription and sentinel setup entirely. Set `lazy=False` to start proxies eagerly — useful when you want connections established before the first dispatch. + +```python +# Eager proxy startup — connections established before first dispatch +async with wool.WorkerPool(spawn=4, lazy=False): + result = await my_routine() +``` + ## Workers A worker is a separate OS process hosting a gRPC server with two RPCs: `dispatch` (bidirectional streaming for task execution) and `stop` (graceful shutdown). Tasks execute on a dedicated asyncio event loop in a separate daemon thread, so that long-running or CPU-intensive task code does not block the main gRPC event loop. This keeps the worker responsive to new dispatches, stop requests, and concurrent streaming interactions with in-flight tasks. diff --git a/wool/src/wool/runtime/discovery/lan.py b/wool/src/wool/runtime/discovery/lan.py index 07a9b97..5dfcf83 100644 --- a/wool/src/wool/runtime/discovery/lan.py +++ b/wool/src/wool/runtime/discovery/lan.py @@ -323,7 +323,10 @@ def _resolve_address(self, address: str) -> Tuple[bytes, int]: return socket.inet_aton(socket.gethostbyname(host)), port - class Subscriber(metaclass=SubscriberMeta): + class Subscriber( + metaclass=SubscriberMeta, + key=lambda cls, service_type: (cls, service_type), + ): """Subscriber for receiving worker discovery events. Subscribes to worker :class:`discovery events @@ -347,10 +350,6 @@ class Subscriber(metaclass=SubscriberMeta): def __init__(self, service_type: str) -> None: self.service_type = service_type - @classmethod - def _cache_key(cls, service_type: str) -> tuple: - return (cls, service_type) - async def _shutdown(self) -> None: """Clean up shared subscription state for this subscriber.""" diff --git a/wool/src/wool/runtime/discovery/local.py b/wool/src/wool/runtime/discovery/local.py index d4c3484..ebb31fb 100644 --- a/wool/src/wool/runtime/discovery/local.py +++ b/wool/src/wool/runtime/discovery/local.py @@ -560,7 +560,14 @@ def _shared_memory_finalizer(self, shared_memory: SharedMemory): pass # pragma: no cover atexit.unregister(self._cleanups.pop(shared_memory.name)) - class Subscriber(metaclass=SubscriberMeta): + class Subscriber( + metaclass=SubscriberMeta, + key=lambda cls, namespace, *, poll_interval=None: ( + cls, + namespace, + poll_interval, + ), + ): """Subscriber for receiving worker discovery events. Subscribes to worker :class:`discovery events <~wool.DiscoveryEvent>` @@ -600,10 +607,6 @@ def __init__( raise ValueError(f"Expected positive poll interval, got {poll_interval}") self._poll_interval = poll_interval - @classmethod - def _cache_key(cls, namespace: str, *, poll_interval: float | None = None): - return (cls, namespace, poll_interval) - async def _shutdown(self) -> None: """Clean up shared subscription state for this subscriber.""" diff --git a/wool/src/wool/runtime/discovery/pool.py b/wool/src/wool/runtime/discovery/pool.py index 536fef5..56bb22f 100644 --- a/wool/src/wool/runtime/discovery/pool.py +++ b/wool/src/wool/runtime/discovery/pool.py @@ -108,9 +108,14 @@ class SubscriberMeta(type): :class:`~wool.runtime.resourcepool.Resource` is entered lazily on first iteration. - Subscriber classes using this metaclass must define a - ``_cache_key`` classmethod that returns a hashable key from the - constructor arguments. + Subscriber classes pass a ``key`` keyword argument at class + definition time. The callable receives ``(cls, *args, **kwargs)`` + and must return a hashable cache key. + + Example:: + + class Sub(metaclass=SubscriberMeta, key=lambda cls, ns: (cls, ns)): + def __init__(self, ns: str) -> None: ... """ def __new__( @@ -118,12 +123,16 @@ def __new__( name: str, bases: tuple[type, ...], namespace: dict[str, Any], + **kwargs: Any, ) -> SubscriberMeta: - cls = super().__new__(mcs, name, bases, namespace) + key_fn: Callable[..., Any] | None = kwargs.pop("key", None) + cls = super().__new__(mcs, name, bases, namespace, **kwargs) + if key_fn is not None: + cls._cache_key_fn = key_fn # type: ignore[attr-defined] original_init = cls.__init__ # type: ignore[misc] def _subscriber_new(cls_arg: type, *args: Any, **kwargs: Any) -> Any: - key = cls_arg._cache_key(*args, **kwargs) # type: ignore[attr-defined] + key = cls_arg._cache_key_fn(cls_arg, *args, **kwargs) # type: ignore[attr-defined] pool = __subscriber_pool__.get() if pool is None: pool = ResourcePool( diff --git a/wool/src/wool/runtime/worker/README.md b/wool/src/wool/runtime/worker/README.md index 999e43c..13b3af2 100644 --- a/wool/src/wool/runtime/worker/README.md +++ b/wool/src/wool/runtime/worker/README.md @@ -193,7 +193,9 @@ Signal handlers map `SIGTERM` to timeout 0 (cancel immediately) and `SIGINT` to ### Nested routines -Worker subprocesses can dispatch tasks to other workers. Each subprocess is configured with a `ResourcePool` of `WorkerProxy` instances (via `wool.__proxy_pool__`), so `@wool.routine` calls within a task transparently route to the target pool. Spinning up a `WorkerProxy` is not free — it involves establishing a discovery subscription and opening gRPC connections — so the resource pool caches proxies with a configurable TTL (default 60 seconds, set via `proxy_pool_ttl` on `LocalWorker`). If the interval between dispatches for a given pool on a given worker is shorter than the TTL, the cached proxy is reused. If it exceeds the TTL, the proxy is finalized and must be recreated on the next dispatch. Tuning `proxy_pool_ttl` above the expected dispatch interval keeps proxies warm and avoids this cold-start overhead. +Worker subprocesses can dispatch tasks to other workers. Each subprocess is configured with a `ResourcePool` of `WorkerProxy` instances (via `wool.__proxy_pool__`), so `@wool.routine` calls within a task transparently route to the target pool. Spinning up a `WorkerProxy` is not free — it involves establishing a discovery subscription, starting a sentinel task, and opening gRPC connections — so the resource pool caches proxies with a configurable TTL (default 60 seconds, set via `proxy_pool_ttl` on `LocalWorker`). If the interval between dispatches for a given pool on a given worker is shorter than the TTL, the cached proxy is reused. If it exceeds the TTL, the proxy is finalized and must be recreated on the next dispatch. Tuning `proxy_pool_ttl` above the expected dispatch interval keeps proxies warm and avoids this cold-start overhead. + +Proxies on worker subprocesses are lazy by default — the `WorkerPool` propagates its `lazy` flag to every `WorkerProxy` it constructs, and each task serializes the proxy (including the flag) so that workers receiving the task inherit the same laziness setting. A lazy proxy defers discovery subscription and sentinel setup until its first `dispatch()` call, so workers that never invoke nested routines pay no startup cost. ## Connections @@ -207,6 +209,31 @@ Worker subprocesses can dispatch tasks to other workers. Each subprocess is conf | Discovery | `discovery` | Accepts any `DiscoverySubscriberLike` or `Factory` thereof. | | Static | `workers` | Takes a sequence of `WorkerMetadata` directly — no discovery needed. | +### Lazy startup + +`WorkerProxy` accepts a `lazy` parameter (default `True`) that controls when the proxy actually starts — i.e., when it subscribes to discovery, launches the worker sentinel task, and initializes the load balancer context. + +| `lazy` | `enter()` / `__aenter__` | `dispatch()` | `exit()` on un-started proxy | +| ------ | ------------------------ | ------------- | ----------------------------- | +| `True` | Sets context var only | Calls `start()` on first call, then dispatches | No-op (safe to call) | +| `False` | Sets context var, calls `start()` | Raises `RuntimeError` if not started | Raises `RuntimeError` | + +When `lazy=True`, concurrent `dispatch()` calls use a double-checked lock to ensure the proxy starts exactly once. The `lazy` flag is preserved through `cloudpickle` serialization, so proxies sent to worker subprocesses as part of a task retain their laziness setting. + +### Context lifecycle + +Both `WorkerPool` and `WorkerProxy` are **single-use** async context managers. Once entered and exited, the same instance cannot be entered again — create a new instance instead. Attempting to call `enter()` or `__aenter__()` a second time raises `RuntimeError`. This prevents silent state corruption from reentrant or repeated context usage (e.g., accidentally nesting `async with proxy:` blocks or calling `enter()` in a retry loop). + +```python +# Correct — one instance per context +async with wool.WorkerPool(spawn=4): + await my_routine() + +# Need another pool? Create a new instance. +async with wool.WorkerPool(spawn=4): + await my_routine() +``` + ### Self-describing connections Workers are self-describing: each worker advertises its gRPC transport configuration via `ChannelOptions` in its `WorkerMetadata`. When a client discovers a worker, it reads the advertised options and configures its channel to match — message sizes, keepalive intervals, concurrency limits, and compression are all set automatically. There is no separate client-side configuration step; the worker's metadata is the single source of truth for how to connect to it. diff --git a/wool/src/wool/runtime/worker/pool.py b/wool/src/wool/runtime/worker/pool.py index bcbf7be..a5ced29 100644 --- a/wool/src/wool/runtime/worker/pool.py +++ b/wool/src/wool/runtime/worker/pool.py @@ -26,6 +26,7 @@ from wool.runtime.worker.proxy import LoadBalancerLike from wool.runtime.worker.proxy import RoundRobinLoadBalancer from wool.runtime.worker.proxy import WorkerProxy +from wool.utilities.noreentry import noreentry # public @@ -180,6 +181,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): """ Create an ephemeral pool of workers, spawning the specified @@ -197,6 +199,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): """ Connect to an existing pool of workers discovered by the @@ -216,6 +219,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): """ Create a hybrid pool that spawns local workers and discovers @@ -236,6 +240,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): ... @overload @@ -251,6 +256,7 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): ... def __init__( @@ -265,9 +271,11 @@ def __init__( LoadBalancerLike | Factory[LoadBalancerLike] ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None = None, + lazy: bool = True, ): self._workers = {} self._credentials = credentials + self._lazy = lazy if size is not None and spawn is not None: raise TypeError( @@ -310,6 +318,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=max_workers, + lazy=self._lazy, ): yield finally: @@ -335,6 +344,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=max_workers, + lazy=self._lazy, ): yield @@ -353,6 +363,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=lease, + lazy=self._lazy, ): yield finally: @@ -378,6 +389,7 @@ async def create_proxy(): loadbalancer=loadbalancer, credentials=self._credentials, lease=max_workers, + lazy=self._lazy, ): yield @@ -386,6 +398,7 @@ async def create_proxy(): self._proxy_factory = create_proxy + @noreentry async def __aenter__(self) -> WorkerPool: """Starts the worker pool and its services, returning a session. @@ -394,6 +407,10 @@ async def __aenter__(self) -> WorkerPool: :returns: The :class:`WorkerPool` instance itself for method chaining. + :raises RuntimeError: + If the pool has already been entered. ``WorkerPool`` + contexts are single-use — create a new instance instead + of re-entering. """ self._proxy_context = self._proxy_factory() await self._proxy_context.__aenter__() diff --git a/wool/src/wool/runtime/worker/process.py b/wool/src/wool/runtime/worker/process.py index f6d963f..4f12cb8 100644 --- a/wool/src/wool/runtime/worker/process.py +++ b/wool/src/wool/runtime/worker/process.py @@ -390,30 +390,30 @@ def _sigint_handler(loop, service, signum, frame): async def _proxy_factory(proxy: WorkerProxy): """Factory function for WorkerProxy instances in ResourcePool. - Starts the proxy if not already started and returns it. + Calls ``enter()`` on the proxy. Lazy proxies defer actual + startup until first dispatch; non-lazy proxies start eagerly. The proxy object itself is used as the cache key. :param proxy: - The WorkerProxy instance to start (passed as key from - ResourcePool). + The WorkerProxy instance (passed as key from ResourcePool). :returns: - The started WorkerProxy instance. + The entered WorkerProxy instance. """ - if not proxy.started: - await proxy.start() + await proxy.enter() return proxy async def _proxy_finalizer(proxy: WorkerProxy): """Finalizer function for WorkerProxy instances in ResourcePool. - Stops the proxy when it's being cleaned up from the resource pool. - Based on the cleanup logic from WorkerProxyCache._delayed_cleanup. + Exits the proxy context when it's being cleaned up from the + resource pool. Lazy proxies that were never started are handled + gracefully by the proxy's own exit method. :param proxy: The WorkerProxy instance to clean up. """ try: - await proxy.stop() + await proxy.exit() except Exception: pass diff --git a/wool/src/wool/runtime/worker/proxy.py b/wool/src/wool/runtime/worker/proxy.py index e6567ce..a92636c 100644 --- a/wool/src/wool/runtime/worker/proxy.py +++ b/wool/src/wool/runtime/worker/proxy.py @@ -33,8 +33,11 @@ from wool.runtime.worker.auth import WorkerCredentials from wool.runtime.worker.connection import WorkerConnection from wool.runtime.worker.metadata import WorkerMetadata +from wool.utilities.noreentry import noreentry if TYPE_CHECKING: + from contextvars import Token + from wool.runtime.routine.task import Task T = TypeVar("T") @@ -208,6 +211,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): ... @overload @@ -220,6 +224,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): ... @overload @@ -232,6 +237,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): ... def __init__( @@ -247,6 +253,7 @@ def __init__( ) = RoundRobinLoadBalancer, credentials: WorkerCredentials | None | UndefinedType = Undefined, lease: int | None = None, + lazy: bool = True, ): if not (pool_uri or discovery or workers): raise ValueError( @@ -259,8 +266,11 @@ def __init__( self._id: uuid.UUID = uuid.uuid4() self._started = False + self._lazy = lazy + self._start_lock = asyncio.Lock() if lazy else None self._loadbalancer = loadbalancer self._lease = lease + self._proxy_token: Token[WorkerProxy | None] | None = None if isinstance(loadbalancer, (ContextManager, AsyncContextManager)): warnings.warn( @@ -322,13 +332,13 @@ def combined_filter(w): self._loadbalancer_context: LoadBalancerContext | None = None async def __aenter__(self): - """Starts the proxy and sets it as the active context.""" - await self.start() + """Enters the proxy context and sets it as the active proxy.""" + await self.enter() return self async def __aexit__(self, *args): - """Stops the proxy and resets the active context.""" - await self.stop(*args) + """Exits the proxy context and resets the active proxy.""" + await self.exit(*args) def __hash__(self) -> int: return hash(str(self.id)) @@ -363,11 +373,12 @@ def __reduce__(self) -> tuple: f"{name}=my_cm())." ) - def _restore_proxy(discovery, loadbalancer, proxy_id, lease): + def _restore_proxy(discovery, loadbalancer, proxy_id, lease, lazy): proxy = WorkerProxy( discovery=discovery, loadbalancer=loadbalancer, lease=lease, + lazy=lazy, ) proxy._id = proxy_id return proxy @@ -379,6 +390,7 @@ def _restore_proxy(discovery, loadbalancer, proxy_id, lease): self._loadbalancer, self._id, self._lease, + self._lazy, ), ) @@ -390,6 +402,10 @@ def id(self) -> uuid.UUID: def started(self) -> bool: return self._started + @property + def lazy(self) -> bool: + return self._lazy + @property def workers(self) -> list[WorkerMetadata]: """A list of the currently discovered worker gRPC stubs.""" @@ -398,8 +414,33 @@ def workers(self) -> list[WorkerMetadata]: else: return [] + @noreentry + async def enter(self) -> None: + """Enter the proxy context. + + Sets this proxy as the active context variable. When + ``lazy=True``, defers resource acquisition until + :meth:`dispatch` is first called. When ``lazy=False``, + calls :meth:`start` eagerly. + + :raises RuntimeError: + If the proxy has already been entered. ``WorkerProxy`` + contexts are single-use — create a new instance instead + of re-entering. + :raises RuntimeError: + If the proxy has already been started and ``lazy`` is + ``False``. + """ + self._proxy_token = wool.__proxy__.set(self) + if self._lazy: + return + await self.start() + async def start(self) -> None: - """Starts the proxy by initiating the worker discovery process. + """Start the proxy by initiating discovery and load balancing. + + Subscribes to worker discovery, initializes the load-balancer + context, and launches the worker sentinel task. :raises RuntimeError: If the proxy has already been started. @@ -421,13 +462,32 @@ async def start(self) -> None: if not isinstance(self._discovery_stream, DiscoverySubscriberLike): raise ValueError - self._proxy_token = wool.__proxy__.set(self) self._loadbalancer_context = LoadBalancerContext() self._sentinel_task = asyncio.create_task(self._worker_sentinel()) self._started = True + async def exit(self, *args) -> None: + """Exit the proxy context. + + Resets the context variable. If the proxy was started, + delegates to :meth:`stop` to release resources. Calling + ``exit()`` on an un-started lazy proxy is a safe no-op. + + :raises RuntimeError: + If the proxy was not started first and ``lazy`` is + ``False``. + """ + if self._proxy_token is not None: + wool.__proxy__.reset(self._proxy_token) + self._proxy_token = None + if not self._started: + if not self._lazy: + raise RuntimeError("Proxy not started - call start() first") + return + await self.stop(*args) + async def stop(self, *args) -> None: - """Stops the proxy, terminating discovery and clearing connections. + """Stop the proxy, terminating discovery and clearing connections. :raises RuntimeError: If the proxy was not started first. @@ -438,7 +498,6 @@ async def stop(self, *args) -> None: await self._exit_context(self._discovery_context_manager, *args) await self._exit_context(self._loadbalancer_context_manager, *args) - wool.__proxy__.reset(self._proxy_token) if self._sentinel_task: self._sentinel_task.cancel() try: @@ -456,7 +515,8 @@ async def dispatch( This method selects a worker using a round-robin strategy. If no workers are available within the timeout period, it raises an - exception. + exception. When ``lazy=True``, the proxy is started automatically + on first dispatch. :param task: The :class:`Task` object to be dispatched. @@ -465,12 +525,17 @@ async def dispatch( :returns: A protobuf result object from the worker. :raises RuntimeError: - If the proxy is not started. + If the proxy is not started and ``lazy`` is ``False``. :raises asyncio.TimeoutError: If no worker is available within the timeout period. """ if not self._started: - raise RuntimeError("Proxy not started - call start() first") + if not self._lazy: + raise RuntimeError("Proxy not started - call start() first") + assert self._start_lock is not None + async with self._start_lock: + if not self._started: + await self.start() await asyncio.wait_for(self._await_workers(), 60) diff --git a/wool/src/wool/utilities/noreentry.py b/wool/src/wool/utilities/noreentry.py new file mode 100644 index 0000000..f497e02 --- /dev/null +++ b/wool/src/wool/utilities/noreentry.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import asyncio.coroutines +import functools +import inspect +import sys +import weakref +from typing import Never + + +class _Token: + """Hashable, weakly-referenceable token for instance tracking.""" + + pass + + +class NoReentryBoundMethod: + """Descriptor implementing single-use guard for bound methods. + + On the first call the decorated method executes normally. Any + subsequent call raises :class:`RuntimeError`. + + Guard state uses a per-instance token stored on the instance under + ``__noreentry_token__``. The token is unique, hashable, and tied to the + instance's lifetime. The descriptor tracks tokens in a WeakSet to auto-clean + when instances are garbage collected. + + Works with both synchronous and asynchronous methods. Only supports + bound methods; using @noreentry on bare functions raises TypeError. + """ + + def __init__(self, fn, /): + functools.update_wrapper(self, fn) + if inspect.iscoroutinefunction(fn): + if sys.version_info >= (3, 12): + inspect.markcoroutinefunction(self) + else: + self._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore[attr-defined] + self._fn = fn + self._invocations = weakref.WeakSet() + + def __call__(self, *args, **kwargs) -> Never: + raise TypeError("@noreentry only decorates methods, not bare functions") + + def __get__(self, obj, objtype=None): + if obj is None: + return self + + # Cache the wrapper on the instance to avoid recreating it on every access. + cache_key = f"__noreentry_wrapper_{id(self)}__" + return obj.__dict__.setdefault(cache_key, self._make_wrapper(obj)) + + def _make_wrapper(self, obj): + """Create the bound wrapper for an instance.""" + guard = self._guard + fn = self._fn + if inspect.iscoroutinefunction(fn): + + @functools.wraps(fn) + async def async_wrapper(*args, **kwargs): + guard(obj) + return await fn(obj, *args, **kwargs) + + return async_wrapper + else: + + @functools.wraps(fn) + def sync_wrapper(*args, **kwargs): + guard(obj) + return fn(obj, *args, **kwargs) + + return sync_wrapper + + def _guard(self, obj): + """Check and record invocation on the specified object.""" + # Get or create a unique token for this object. + token = obj.__dict__.setdefault("__noreentry_token__", _Token()) + + # Check if this descriptor was invoked on this object already. + if token in self._invocations: + raise RuntimeError( + f"'{self._fn.__qualname__}' cannot be invoked more than once" + ) + + # Track invocation. + self._invocations.add(token) + + +def noreentry(fn): + """Mark a method as single-use. + + On the first call the decorated method executes normally. Any + subsequent call raises :class:`RuntimeError`. + + Guard state uses a per-instance token stored on the instance under + ``__noreentry_token__``. The token is unique, hashable, and tied to the + instance's lifetime. The descriptor tracks tokens in a :class:`WeakSet` + to auto-clean when instances are garbage collected. + + Works with both synchronous and asynchronous methods. Only supports + bound methods; using @noreentry on bare functions raises :class:`TypeError` + on the first invocation. + + :param fn: + The bound instance or class method to decorate. + """ + return NoReentryBoundMethod(fn) diff --git a/wool/tests/integration/conftest.py b/wool/tests/integration/conftest.py index dae3195..05e3592 100644 --- a/wool/tests/integration/conftest.py +++ b/wool/tests/integration/conftest.py @@ -105,6 +105,11 @@ class RoutineBinding(Enum): STATICMETHOD = auto() +class LazyMode(Enum): + LAZY = auto() + EAGER = auto() + + @dataclass(frozen=True) class Scenario: """Composable scenario describing one integration test configuration. @@ -121,6 +126,7 @@ class Scenario: options: WorkerOptionsKind | None = None timeout: TimeoutKind | None = None binding: RoutineBinding | None = None + lazy: LazyMode | None = None def __or__(self, other: Scenario) -> Scenario: """Merge two partial scenarios. Right side wins on ``None`` fields. @@ -141,7 +147,7 @@ def __or__(self, other: Scenario) -> Scenario: @property def is_complete(self) -> bool: - """True when all 8 dimensions are set.""" + """True when all 9 dimensions are set.""" return all(getattr(self, f.name) is not None for f in fields(self)) def __str__(self) -> str: @@ -277,20 +283,24 @@ async def _lan_async_cm(): if scenario.timeout is TimeoutKind.VIA_RUNTIME_CONTEXT: runtime_ctx = RuntimeContext(dispatch_timeout=30.0) + lazy = scenario.lazy is LazyMode.LAZY + try: if runtime_ctx is not None: runtime_ctx.__enter__() try: if scenario.pool_mode is PoolMode.DURABLE: - async with _durable_pool_context(lb, creds, options) as pool: + async with _durable_pool_context(lb, creds, options, lazy) as pool: yield pool elif scenario.pool_mode is PoolMode.DURABLE_SHARED: - async with _durable_shared_pool_context(lb, creds, options) as pool: + async with _durable_shared_pool_context( + lb, creds, options, lazy + ) as pool: yield pool elif scenario.pool_mode is PoolMode.DURABLE_JOINED: async with _durable_joined_pool_context( - scenario.discovery, lb, creds, options + scenario.discovery, lb, creds, options, lazy ) as pool: yield pool else: @@ -298,6 +308,7 @@ async def _lan_async_cm(): "loadbalancer": lb, "credentials": creds, "worker": partial(LocalWorker, options=options), + "lazy": lazy, } match scenario.pool_mode: case PoolMode.DEFAULT: @@ -345,7 +356,7 @@ async def _lan_async_cm(): @asynccontextmanager -async def _durable_pool_context(lb, creds, options): +async def _durable_pool_context(lb, creds, options, lazy): """Manually start a worker, register it, then create a DURABLE pool. DURABLE pools don't spawn workers — they only discover external @@ -368,6 +379,7 @@ async def _durable_pool_context(lb, creds, options): discovery=_DirectDiscovery(discovery), loadbalancer=lb, credentials=creds, + lazy=lazy, ) async with pool: yield pool @@ -378,7 +390,7 @@ async def _durable_pool_context(lb, creds, options): @asynccontextmanager -async def _durable_shared_pool_context(lb, creds, options): +async def _durable_shared_pool_context(lb, creds, options, lazy): """Create two pools sharing the same LocalDiscovery subscriber. Exercises ``SubscriberMeta`` singleton caching and @@ -401,11 +413,13 @@ async def _durable_shared_pool_context(lb, creds, options): discovery=shared, loadbalancer=lb, credentials=creds, + lazy=lazy, ) pool_b = WorkerPool( discovery=shared, loadbalancer=lb, credentials=creds, + lazy=lazy, ) async with pool_a: async with pool_b: @@ -455,7 +469,7 @@ async def _acm(): @asynccontextmanager -async def _durable_joined_pool_context(discovery_factory, lb, creds, options): +async def _durable_joined_pool_context(discovery_factory, lb, creds, options, lazy): """Create a DURABLE pool that joins an externally owned namespace. Sets up an owner ``LocalDiscovery`` that creates workers and publishes @@ -480,6 +494,7 @@ async def _durable_joined_pool_context(discovery_factory, lb, creds, options): discovery=joiner, loadbalancer=lb, credentials=creds, + lazy=lazy, ) async with pool: yield pool @@ -704,6 +719,7 @@ def _pairwise_filter(row): options=row[5], timeout=row[6], binding=row[7], + lazy=row[8], ) for row in AllPairs( [ @@ -715,6 +731,7 @@ def _pairwise_filter(row): list(WorkerOptionsKind), list(TimeoutKind), list(RoutineBinding), + list(LazyMode), ], filter_func=_pairwise_filter, ) @@ -774,6 +791,8 @@ def scenarios_strategy(draw): else: binding = draw(st.sampled_from(RoutineBinding)) + lazy = draw(st.sampled_from(LazyMode)) + return Scenario( shape=shape, pool_mode=pool_mode, @@ -783,6 +802,7 @@ def scenarios_strategy(draw): options=options, timeout=timeout, binding=binding, + lazy=lazy, ) diff --git a/wool/tests/integration/test_integration.py b/wool/tests/integration/test_integration.py index b8c7b26..70d0852 100644 --- a/wool/tests/integration/test_integration.py +++ b/wool/tests/integration/test_integration.py @@ -11,6 +11,7 @@ from .conftest import PAIRWISE_SCENARIOS from .conftest import CredentialType from .conftest import DiscoveryFactory +from .conftest import LazyMode from .conftest import LbFactory from .conftest import PoolMode from .conftest import RoutineBinding @@ -68,6 +69,7 @@ async def body(): WorkerOptionsKind.DEFAULT, TimeoutKind.NONE, RoutineBinding.MODULE_FUNCTION, + LazyMode.LAZY, ) ) @example( @@ -80,6 +82,7 @@ async def body(): WorkerOptionsKind.DEFAULT, TimeoutKind.NONE, RoutineBinding.MODULE_FUNCTION, + LazyMode.LAZY, ) ) @example( @@ -92,6 +95,7 @@ async def body(): WorkerOptionsKind.KEEPALIVE, TimeoutKind.NONE, RoutineBinding.MODULE_FUNCTION, + LazyMode.LAZY, ) ) @given(scenario=scenarios_strategy()) diff --git a/wool/tests/integration/test_pool_composition.py b/wool/tests/integration/test_pool_composition.py index 397a9da..55bf2d5 100644 --- a/wool/tests/integration/test_pool_composition.py +++ b/wool/tests/integration/test_pool_composition.py @@ -4,6 +4,7 @@ from .conftest import CredentialType from .conftest import DiscoveryFactory +from .conftest import LazyMode from .conftest import LbFactory from .conftest import PoolMode from .conftest import RoutineBinding @@ -39,6 +40,7 @@ async def test_build_pool_from_scenario_with_default_mode(self, credentials_map) options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -53,7 +55,8 @@ async def test_build_pool_from_scenario_with_ephemeral_mode(self, credentials_ma """Test building a pool with EPHEMERAL mode and size=2. Given: - A complete scenario using EPHEMERAL pool mode with 2 workers. + A complete scenario using EPHEMERAL pool mode with 2 workers + and eager proxy start. When: A pool is built and a coroutine routine is dispatched. Then: @@ -69,6 +72,7 @@ async def test_build_pool_from_scenario_with_ephemeral_mode(self, credentials_ma options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.EAGER, ) # Act @@ -100,6 +104,7 @@ async def test_build_pool_from_scenario_with_durable_mode(self, credentials_map) options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -131,6 +136,7 @@ async def test_build_pool_from_scenario_with_hybrid_mode(self, credentials_map): options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -166,6 +172,7 @@ async def test_build_pool_from_scenario_with_durable_joined_local( options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -197,6 +204,7 @@ async def test_build_pool_from_scenario_with_restrictive_opts(self, credentials_ options=WorkerOptionsKind.RESTRICTIVE, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -229,6 +237,7 @@ async def test_build_pool_from_scenario_with_keepalive_opts(self, credentials_ma options=WorkerOptionsKind.KEEPALIVE, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -260,6 +269,7 @@ async def test_build_pool_from_scenario_with_dispatch_timeout(self, credentials_ options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.VIA_RUNTIME_CONTEXT, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act @@ -294,6 +304,7 @@ async def test_build_pool_from_scenario_with_shared_discovery(self, credentials_ options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act diff --git a/wool/tests/integration/test_scenario.py b/wool/tests/integration/test_scenario.py index 2ad2cbc..81159b2 100644 --- a/wool/tests/integration/test_scenario.py +++ b/wool/tests/integration/test_scenario.py @@ -4,6 +4,7 @@ from .conftest import CredentialType from .conftest import DiscoveryFactory +from .conftest import LazyMode from .conftest import LbFactory from .conftest import PoolMode from .conftest import RoutineBinding @@ -102,7 +103,7 @@ def test_is_complete_with_all_fields(self): """Test that a fully populated scenario reports complete. Given: - A scenario with all 8 dimensions set. + A scenario with all 9 dimensions set. When: ``is_complete`` is checked. Then: @@ -118,6 +119,7 @@ def test_is_complete_with_all_fields(self): options=WorkerOptionsKind.DEFAULT, timeout=TimeoutKind.NONE, binding=RoutineBinding.MODULE_FUNCTION, + lazy=LazyMode.LAZY, ) # Act & assert @@ -163,4 +165,4 @@ def test___str___with_partial_fields(self): result = str(scenario) # Assert - assert result == "COROUTINE-DEFAULT-_-_-_-_-_-_" + assert result == "COROUTINE-DEFAULT-_-_-_-_-_-_-_" diff --git a/wool/tests/runtime/discovery/test_pool.py b/wool/tests/runtime/discovery/test_pool.py index 049d82e..49b362c 100644 --- a/wool/tests/runtime/discovery/test_pool.py +++ b/wool/tests/runtime/discovery/test_pool.py @@ -15,16 +15,15 @@ from wool.runtime.worker.metadata import WorkerMetadata -class _StubSubscriber(metaclass=SubscriberMeta): +class _StubSubscriber( + metaclass=SubscriberMeta, + key=lambda cls, key_value: (cls, key_value), +): """Minimal subscriber stub for testing SubscriberMeta.""" def __init__(self, key_value: str) -> None: self.key_value = key_value - @classmethod - def _cache_key(cls, key_value: str): - return (cls, key_value) - def __reduce__(self): return type(self), (self.key_value,) @@ -105,14 +104,13 @@ async def test___new___with_shared_fan_out(self): # Arrange events = [_make_event(address=f"127.0.0.1:{50051 + i}") for i in range(3)] - class _EventStub(metaclass=SubscriberMeta): + class _EventStub( + metaclass=SubscriberMeta, + key=lambda cls, tag: (cls, tag), + ): def __init__(self, tag): self.tag = tag - @classmethod - def _cache_key(cls, tag): - return (cls, tag) - async def _shutdown(self): pass @@ -157,15 +155,14 @@ async def test___new___with_different_keys(self): event_a = _make_event(address="10.0.0.1:1") event_b = _make_event(address="10.0.0.2:2") - class _IsoStub(metaclass=SubscriberMeta): + class _IsoStub( + metaclass=SubscriberMeta, + key=lambda cls, tag, event: (cls, tag), + ): def __init__(self, tag, event): self.tag = tag self._event = event - @classmethod - def _cache_key(cls, tag, event): - return (cls, tag) - async def _shutdown(self): pass @@ -206,6 +203,89 @@ def test___new___with_lazy_pool_init(self): # Assert assert __subscriber_pool__.get() is not None + def test___new___with_key_callable_receiving_cls(self): + """Test key callable receives the class as its first argument. + + Given: + A subscriber class defined with a key callable that + includes cls in the returned tuple. + When: + The class is instantiated. + Then: + It should produce a _SharedSubscription whose cache key + was computed using the class itself. + """ + # Arrange + captured = {} + + def capture_key(cls, value): + captured["cls"] = cls + captured["value"] = value + return (cls, value) + + class _CaptureSub( + metaclass=SubscriberMeta, + key=capture_key, + ): + def __init__(self, value): + self.value = value + + async def _shutdown(self): + pass + + def __aiter__(self): + return self._gen() + + async def _gen(self): + yield # pragma: no cover + + # Act + result = _CaptureSub("test-val") + + # Assert + assert isinstance(result, _SharedSubscription) + assert captured["cls"] is _CaptureSub + assert captured["value"] == "test-val" + + def test___new___with_inherited_key(self): + """Test subclass inherits key callable from parent. + + Given: + A parent subscriber class defined with a key callable + and a subclass that does not pass its own key. + When: + The subclass is instantiated. + Then: + It should inherit the parent's key callable and return + a _SharedSubscription. + """ + + # Arrange + class _Parent( + metaclass=SubscriberMeta, + key=lambda cls, tag: (cls, tag), + ): + def __init__(self, tag): + self.tag = tag + + async def _shutdown(self): + pass + + def __aiter__(self): + return self._gen() + + async def _gen(self): + yield # pragma: no cover + + class _Child(_Parent): + pass + + # Act + result = _Child("child-tag") + + # Assert + assert isinstance(result, _SharedSubscription) + class TestSharedSubscription: """Tests for _SharedSubscription class. diff --git a/wool/tests/runtime/worker/test_pool.py b/wool/tests/runtime/worker/test_pool.py index 102be55..4b39fc3 100644 --- a/wool/tests/runtime/worker/test_pool.py +++ b/wool/tests/runtime/worker/test_pool.py @@ -554,6 +554,47 @@ async def test___aexit___cleanup_on_error(self, mock_worker_factory): assert pool_created, "Pool should have been created before exception" assert exception_caught, "Exception should have been propagated" + @pytest.mark.asyncio + async def test___aenter___already_entered_raises_error(self, mock_worker_factory): + """Test pool raises on reentrant entry. + + Given: + A WorkerPool that is already entered via async with. + When: + The pool is entered a second time via async with. + Then: + It should raise RuntimeError. + """ + # Arrange + pool = WorkerPool(worker=mock_worker_factory, spawn=2) + + # Act & assert + async with pool: + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + async with pool: + pass + + @pytest.mark.asyncio + async def test___aenter___after_exit_raises_error(self, mock_worker_factory): + """Test pool raises when re-entered after exit. + + Given: + A WorkerPool that has been entered and exited. + When: + The pool is entered again via async with. + Then: + It should raise RuntimeError because the context is single-use. + """ + # Arrange + pool = WorkerPool(worker=mock_worker_factory, spawn=2) + async with pool: + pass + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + async with pool: + pass + @pytest.mark.asyncio async def test___aenter___lifecycle_returns_pool_instance( self, diff --git a/wool/tests/runtime/worker/test_process.py b/wool/tests/runtime/worker/test_process.py index b5cc1d0..98cf880 100644 --- a/wool/tests/runtime/worker/test_process.py +++ b/wool/tests/runtime/worker/test_process.py @@ -18,8 +18,6 @@ from wool.runtime.worker.base import WorkerOptions from wool.runtime.worker.metadata import WorkerMetadata from wool.runtime.worker.process import WorkerProcess -from wool.runtime.worker.process import _proxy_factory -from wool.runtime.worker.process import _proxy_finalizer from wool.runtime.worker.process import _sigint_handler from wool.runtime.worker.process import _signal_handlers from wool.runtime.worker.process import _sigterm_handler @@ -746,96 +744,6 @@ def test_start_deserializes_metadata_with_extra_from_pipe(self, mocker): assert isinstance(process.metadata.extra, MappingProxyType) assert process.metadata.tags == frozenset({"gpu"}) - @pytest.mark.asyncio - async def test__proxy_factory_starts_proxy_when_not_started(self, mocker): - """Test _proxy_factory starts proxy when not already started. - - Given: - A WorkerProcess and a proxy with started=False - When: - _proxy_factory() is called - Then: - It should call proxy.start() and return the proxy - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.started = False - mock_proxy.start = mocker.AsyncMock() - - # Act - result = await _proxy_factory(mock_proxy) - - # Assert - mock_proxy.start.assert_called_once() - assert result is mock_proxy - - @pytest.mark.asyncio - async def test__proxy_factory_does_not_start_proxy_when_already_started( - self, mocker - ): - """Test _proxy_factory does not start proxy when already started. - - Given: - A WorkerProcess and a proxy with started=True - When: - _proxy_factory() is called - Then: - It should not call proxy.start() but still return the proxy - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.started = True - mock_proxy.start = mocker.AsyncMock() - - # Act - result = await _proxy_factory(mock_proxy) - - # Assert - mock_proxy.start.assert_not_called() - assert result is mock_proxy - - @pytest.mark.asyncio - async def test__proxy_finalizer_successfully_stops_proxy(self, mocker): - """Test _proxy_finalizer successfully stops proxy. - - Given: - A WorkerProcess and a proxy that stops successfully - When: - _proxy_finalizer() is called - Then: - It should call proxy.stop() and complete without error - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.stop = mocker.AsyncMock() - - # Act - await _proxy_finalizer(mock_proxy) - - # Assert - mock_proxy.stop.assert_called_once() - - @pytest.mark.asyncio - async def test__proxy_finalizer_handles_exception_gracefully(self, mocker): - """Test _proxy_finalizer handles exception from proxy.stop() gracefully. - - Given: - A WorkerProcess and a proxy that raises exception on stop - When: - _proxy_finalizer() is called - Then: - It should catch the exception and complete without propagating it - """ - # Arrange - mock_proxy = mocker.MagicMock() - mock_proxy.stop = mocker.AsyncMock(side_effect=Exception("Stop failed")) - - # Act — should not raise exception - await _proxy_finalizer(mock_proxy) - - # Assert - mock_proxy.stop.assert_called_once() - def test_run_sets_up_proxy_pool_and_starts_server(self, mocker): """Test run method sets up proxy pool and starts gRPC server. diff --git a/wool/tests/runtime/worker/test_proxy.py b/wool/tests/runtime/worker/test_proxy.py index 97383a9..63f9d62 100644 --- a/wool/tests/runtime/worker/test_proxy.py +++ b/wool/tests/runtime/worker/test_proxy.py @@ -647,19 +647,51 @@ def test___init___with_callable_loadbalancer_no_warning( user_warnings = [w for w in caught if issubclass(w.category, UserWarning)] assert user_warnings == [] + def test___init___with_default_lazy(self, mock_discovery_service): + """Test WorkerProxy defaults to lazy initialization. + + Given: + A discovery service. + When: + WorkerProxy is instantiated with default parameters. + Then: + It should have lazy set to True. + """ + # Act + proxy = WorkerProxy(discovery=mock_discovery_service) + + # Assert + assert proxy.lazy is True + + def test___init___with_lazy_false(self, mock_discovery_service): + """Test WorkerProxy accepts explicit lazy=False. + + Given: + A discovery service and lazy=False. + When: + WorkerProxy is instantiated. + Then: + It should have lazy set to False. + """ + # Act + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) + + # Assert + assert proxy.lazy is False + @pytest.mark.asyncio async def test___aenter___lifecycle(self, mock_discovery_service): """Test it starts and stops correctly. Given: - A WorkerProxy configured with discovery service + A non-lazy WorkerProxy configured with discovery service When: The proxy is used as a context manager Then: It starts and stops correctly """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) entered = False exited = False @@ -708,14 +740,14 @@ async def test_start_sets_started_flag( """Test set the started flag to True. Given: - An unstarted WorkerProxy instance + A non-lazy unstarted WorkerProxy instance When: Start is called Then: It should set the started flag to True """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act await proxy.start() @@ -723,19 +755,100 @@ async def test_start_sets_started_flag( # Assert assert proxy.started + @pytest.mark.asyncio + async def test_enter_with_lazy_proxy(self, mock_discovery_service): + """Test enter defers startup for lazy proxies. + + Given: + A lazy WorkerProxy that has not been entered. + When: + enter() is called. + Then: + It should remain un-started. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + + # Act + await proxy.enter() + + # Assert + assert not proxy.started + + @pytest.mark.asyncio + async def test_enter_with_non_lazy_proxy( + self, mock_discovery_service, mock_proxy_session + ): + """Test enter eagerly starts a non-lazy proxy. + + Given: + A non-lazy WorkerProxy that has not been entered. + When: + enter() is called. + Then: + It should set started to True. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) + + # Act + await proxy.enter() + + # Assert + assert proxy.started + + @pytest.mark.asyncio + async def test_enter_already_entered_raises_error(self, mock_discovery_service): + """Test enter raises on reentrant call. + + Given: + A lazy WorkerProxy that has already been entered. + When: + enter() is called a second time. + Then: + It should raise RuntimeError. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + await proxy.enter() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + await proxy.enter() + + @pytest.mark.asyncio + async def test_enter_after_exit_raises_error(self, mock_discovery_service): + """Test enter raises after a full enter/exit cycle. + + Given: + A lazy WorkerProxy that has been entered and exited. + When: + enter() is called again. + Then: + It should raise RuntimeError because the context is single-use. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + await proxy.enter() + await proxy.exit() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + await proxy.enter() + @pytest.mark.asyncio async def test_stop_clears_state(self, mock_discovery_service, mock_proxy_session): """Test clear workers and reset the started flag to False. Given: - A started WorkerProxy with registered workers + A started WorkerProxy with registered workers. When: - Stop is called + stop() is called. Then: - It should clear workers and reset the started flag to False + It should clear workers and reset the started flag to False. """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) await proxy.start() # Act @@ -752,14 +865,14 @@ async def test_start_already_started_raises_error( """Test raise RuntimeError. Given: - A WorkerProxy that is already started + A non-lazy WorkerProxy that is already started When: Start is called again Then: It should raise RuntimeError """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) await proxy.start() # Act & assert @@ -767,22 +880,64 @@ async def test_start_already_started_raises_error( await proxy.start() @pytest.mark.asyncio - async def test_stop_not_started_raises_error(self, mock_discovery_service): + async def test_exit_not_started_raises_error(self, mock_discovery_service): """Test raise RuntimeError. Given: - A WorkerProxy that is not started + A non-lazy WorkerProxy that is not started. When: - Stop is called + exit() is called. Then: - It should raise RuntimeError + It should raise RuntimeError. """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert with pytest.raises(RuntimeError, match="Proxy not started"): - await proxy.stop() + await proxy.exit() + + @pytest.mark.asyncio + async def test_exit_with_unstarted_lazy_proxy(self, mock_discovery_service): + """Test exit is a no-op on an un-started lazy proxy. + + Given: + A lazy WorkerProxy that was never started. + When: + exit() is called. + Then: + It should return without raising. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + + # Act & assert — should not raise + await proxy.exit() + + @pytest.mark.asyncio + async def test_exit_stops_started_lazy_proxy( + self, mock_discovery_service, mock_proxy_session + ): + """Test exit stops a lazy proxy that was started. + + Given: + A lazy WorkerProxy that was entered and subsequently started. + When: + exit() is called. + Then: + It should stop the proxy and set started to False. + """ + # Arrange + proxy = WorkerProxy(discovery=mock_discovery_service) + await proxy.enter() + await proxy.start() + assert proxy.started + + # Act + await proxy.exit() + + # Assert + assert not proxy.started @pytest.mark.asyncio async def test___aenter___enter_starts_proxy( @@ -791,14 +946,14 @@ async def test___aenter___enter_starts_proxy( """Test automatically start the proxy. Given: - An unstarted WorkerProxy + A non-lazy unstarted WorkerProxy When: The async context manager is entered Then: It should automatically start the proxy """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert async with proxy as p: @@ -811,14 +966,14 @@ async def test___aexit___exit_stops_proxy( """Test automatically stop the proxy. Given: - A WorkerProxy within async context + A non-lazy WorkerProxy within async context When: The async context manager exits Then: It should automatically stop the proxy """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act async with proxy: @@ -834,8 +989,8 @@ async def test_start_stop_with_sync_cm_loadbalancer( """Test start/stop with sync context manager load balancer. Given: - A WorkerProxy with a load balancer provided as a sync - context manager. + A non-lazy WorkerProxy with a load balancer provided as a + sync context manager. When: start() then stop() are called. Then: @@ -859,7 +1014,9 @@ def __exit__(self, *args): self.exited = True cm = SyncCM() - proxy = WorkerProxy(discovery=mock_discovery_service, loadbalancer=cm) + proxy = WorkerProxy( + discovery=mock_discovery_service, loadbalancer=cm, lazy=False + ) # Act await proxy.start() @@ -879,8 +1036,8 @@ async def test_start_stop_with_async_cm_loadbalancer( """Test start/stop with async context manager load balancer. Given: - A WorkerProxy with a load balancer provided as an async - context manager. + A non-lazy WorkerProxy with a load balancer provided as an + async context manager. When: start() then stop() are called. Then: @@ -904,7 +1061,9 @@ async def __aexit__(self, *args): self.exited = True cm = AsyncCM() - proxy = WorkerProxy(discovery=mock_discovery_service, loadbalancer=cm) + proxy = WorkerProxy( + discovery=mock_discovery_service, loadbalancer=cm, lazy=False + ) # Act await proxy.start() @@ -924,8 +1083,8 @@ async def test_start_with_awaitable_loadbalancer( """Test start with an awaitable load balancer. Given: - A WorkerProxy with a load balancer provided as a bare - awaitable (coroutine object). + A non-lazy WorkerProxy with a load balancer provided as a + bare awaitable (coroutine object). When: start() is called. Then: @@ -939,7 +1098,9 @@ async def test_start_with_awaitable_loadbalancer( async def make_lb(): return mock_lb - proxy = WorkerProxy(discovery=mock_discovery_service, loadbalancer=make_lb()) + proxy = WorkerProxy( + discovery=mock_discovery_service, loadbalancer=make_lb(), lazy=False + ) # Act await proxy.start() @@ -1055,7 +1216,7 @@ async def test_handles_worker_updated(self, mock_proxy_session): """Test the proxy handles worker-updated events. Given: - A started WorkerProxy with a discovered worker. + A non-lazy, started WorkerProxy with a discovered worker. When: A "worker-updated" event is received for that worker. Then: @@ -1074,7 +1235,7 @@ async def test_handles_worker_updated(self, mock_proxy_session): DiscoveryEvent("worker-updated", metadata=metadata), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1093,8 +1254,9 @@ async def test_sentinel_passes_metadata_options_to_connection( """Test sentinel creates WorkerConnection with metadata options. Given: - A WorkerProxy with a discovery stream yielding a worker-added - event whose metadata has options=ChannelOptions(keepalive_time_ms=60000) + A non-lazy WorkerProxy with a discovery stream yielding a + worker-added event whose metadata has + options=ChannelOptions(keepalive_time_ms=60000) When: The sentinel processes the event Then: @@ -1117,7 +1279,7 @@ async def test_sentinel_passes_metadata_options_to_connection( ) events = [DiscoveryEvent("worker-added", metadata=metadata)] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1140,8 +1302,8 @@ async def test_sentinel_passes_none_options_for_legacy_workers( """Test sentinel creates WorkerConnection with options=None for legacy workers. Given: - A WorkerProxy with a discovery stream yielding a worker-added - event whose metadata has options=None + A non-lazy WorkerProxy with a discovery stream yielding a + worker-added event whose metadata has options=None When: The sentinel processes the event Then: @@ -1161,7 +1323,7 @@ async def test_sentinel_passes_none_options_for_legacy_workers( ) events = [DiscoveryEvent("worker-added", metadata=metadata)] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1184,9 +1346,9 @@ async def test_sentinel_passes_updated_options_on_worker_updated( """Test sentinel creates WorkerConnection with updated metadata options. Given: - A WorkerProxy with a discovery stream yielding a worker-added - event followed by a worker-updated event whose metadata has - options=ChannelOptions(keepalive_time_ms=90000) + A non-lazy WorkerProxy with a discovery stream yielding a + worker-added event followed by a worker-updated event whose + metadata has options=ChannelOptions(keepalive_time_ms=90000) When: The sentinel processes the events Then: @@ -1221,7 +1383,7 @@ async def test_sentinel_passes_updated_options_on_worker_updated( DiscoveryEvent("worker-updated", metadata=updated_metadata), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery) + proxy = WorkerProxy(discovery=discovery, lazy=False) # Act await proxy.start() @@ -1292,8 +1454,8 @@ async def test_start_caps_discovered_workers_at_lease( """Test sentinel respects lease cap on worker-added events. Given: - A WorkerProxy with lease=2 and a discovery stream with - 3 worker-added events + A non-lazy WorkerProxy with lease=2 and a discovery stream + with 3 worker-added events When: The sentinel processes all events Then: @@ -1312,7 +1474,7 @@ async def test_start_caps_discovered_workers_at_lease( ] events = [DiscoveryEvent("worker-added", metadata=w) for w in workers] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=2) + proxy = WorkerProxy(discovery=discovery, lease=2, lazy=False) # Act await proxy.start() @@ -1331,8 +1493,8 @@ async def test_start_accepts_all_workers_when_lease_none( """Test sentinel accepts all workers when lease is None. Given: - A WorkerProxy with lease=None and a discovery stream - with 3 worker-added events + A non-lazy WorkerProxy with lease=None and a discovery + stream with 3 worker-added events When: The sentinel processes all events Then: @@ -1351,7 +1513,7 @@ async def test_start_accepts_all_workers_when_lease_none( ] events = [DiscoveryEvent("worker-added", metadata=w) for w in workers] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=None) + proxy = WorkerProxy(discovery=discovery, lease=None, lazy=False) # Act await proxy.start() @@ -1370,7 +1532,7 @@ async def test_start_allows_worker_updated_at_capacity( """Test sentinel processes worker-updated events even at capacity. Given: - A WorkerProxy with lease=2, already at capacity, + A non-lazy WorkerProxy with lease=2, already at capacity, receiving a worker-updated event When: The sentinel processes the update event @@ -1397,7 +1559,7 @@ async def test_start_allows_worker_updated_at_capacity( DiscoveryEvent("worker-updated", metadata=worker1), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=2) + proxy = WorkerProxy(discovery=discovery, lease=2, lazy=False) # Act await proxy.start() @@ -1417,8 +1579,9 @@ async def test_start_accepts_worker_after_drop_frees_capacity( """Test drop restores capacity for a subsequent worker-added event. Given: - A WorkerProxy with lease=2, at capacity with 2 workers, - then one worker is dropped followed by a new worker-added + A non-lazy WorkerProxy with lease=2, at capacity with 2 + workers, then one worker is dropped followed by a new + worker-added When: The proxy processes all events Then: @@ -1451,7 +1614,7 @@ async def test_start_accepts_worker_after_drop_frees_capacity( DiscoveryEvent("worker-added", metadata=worker3), ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=2) + proxy = WorkerProxy(discovery=discovery, lease=2, lazy=False) # Act await proxy.start() @@ -1472,8 +1635,8 @@ async def test_start_drops_update_for_rejected_worker( """Test worker-updated for a cap-rejected worker is silently dropped. Given: - A WorkerProxy with lease=1 and a discovery stream where - worker2 is rejected by the cap, then a worker-updated + A non-lazy WorkerProxy with lease=1 and a discovery stream + where worker2 is rejected by the cap, then a worker-updated event arrives for worker2 When: The proxy processes all events @@ -1500,7 +1663,7 @@ async def test_start_drops_update_for_rejected_worker( DiscoveryEvent("worker-updated", metadata=worker2), # dropped ] discovery = wp.ReducibleAsyncIterator(events) - proxy = WorkerProxy(discovery=discovery, lease=1) + proxy = WorkerProxy(discovery=discovery, lease=1, lazy=False) # Act await proxy.start() @@ -1521,7 +1684,7 @@ async def test_cloudpickle_serialization_preserves_lease( """Test pickle round-trip preserves lease cap behavior. Given: - A WorkerProxy with lease=2 and a discovery stream + A non-lazy WorkerProxy with lease=2 and a discovery stream with 3 worker-added events When: The proxy is pickled, unpickled, started, and processes @@ -1547,6 +1710,7 @@ async def test_cloudpickle_serialization_preserves_lease( discovery=discovery, loadbalancer=wp.RoundRobinLoadBalancer, lease=2, + lazy=False, ) # Act — pickle round-trip, then start the restored proxy @@ -1559,6 +1723,69 @@ async def test_cloudpickle_serialization_preserves_lease( assert len(restored.workers) == 2 await restored.stop() + def test_cloudpickle_serialization_with_lazy_false(self, mock_discovery_service): + """Test pickle round-trip with explicit lazy=False. + + Given: + A WorkerProxy with lazy=False. + When: + The proxy is pickled and unpickled. + Then: + It should preserve lazy as False on the restored proxy. + """ + # Arrange + proxy = WorkerProxy( + discovery=mock_discovery_service, + loadbalancer=wp.RoundRobinLoadBalancer, + lazy=False, + ) + + # Act + restored = cloudpickle.loads(cloudpickle.dumps(proxy)) + + # Assert + assert restored.lazy is False + + def test_cloudpickle_serialization_with_default_lazy(self, mock_discovery_service): + """Test pickle round-trip with default lazy=True. + + Given: + A WorkerProxy with default lazy=True. + When: + The proxy is pickled and unpickled. + Then: + It should preserve lazy as True on the restored proxy. + """ + # Arrange + proxy = WorkerProxy( + discovery=mock_discovery_service, + loadbalancer=wp.RoundRobinLoadBalancer, + ) + + # Act + restored = cloudpickle.loads(cloudpickle.dumps(proxy)) + + # Assert + assert restored.lazy is True + + @given(lazy=st.booleans()) + @settings(suppress_health_check=[HealthCheck.function_scoped_fixture]) + def test___init___with_arbitrary_lazy_value(self, mock_discovery_service, lazy): + """Test instantiation with an arbitrary lazy value. + + Given: + An arbitrary boolean value for lazy. + When: + WorkerProxy is instantiated with that value. + Then: + It should set the lazy property to the given value. + """ + # Act + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=lazy) + + # Assert + assert proxy.lazy is lazy + @pytest.mark.asyncio async def test_dispatch_delegates_to_loadbalancer( self, @@ -1610,20 +1837,111 @@ async def test_dispatch_not_started_raises_error( """Test raise RuntimeError. Given: - A WorkerProxy that is not started + A non-lazy WorkerProxy that is not started When: Dispatch is called Then: It should raise RuntimeError """ # Arrange - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act & assert with pytest.raises(RuntimeError, match="Proxy not started"): async for _ in await proxy.dispatch(mock_wool_task): pass + @pytest.mark.asyncio + async def test_dispatch_with_lazy_auto_start( + self, + spy_loadbalancer_with_workers, + spy_discovery_with_events, + mock_worker_connection, + mock_wool_task, + mock_proxy_session, + mocker, + ): + """Test dispatch auto-starts a lazy proxy on first call. + + Given: + A lazy WorkerProxy that has not been started. + When: + dispatch() is called. + Then: + It should auto-start the proxy and dispatch the task. + """ + # Arrange + discovery, metadata = spy_discovery_with_events + + proxy = WorkerProxy( + discovery=discovery, + loadbalancer=spy_loadbalancer_with_workers, + ) + + spy_loadbalancer_with_workers.worker_added_callback( + mock_worker_connection, metadata + ) + + assert not proxy.started + + # Act + result_iterator = await proxy.dispatch(mock_wool_task) + results = [result async for result in result_iterator] + + # Assert + assert proxy.started + assert results == ["test_result"] + await proxy.stop() + + @pytest.mark.asyncio + async def test_dispatch_with_lazy_concurrent_start( + self, + spy_loadbalancer_with_workers, + spy_discovery_with_events, + mock_worker_connection, + mock_proxy_session, + mocker, + ): + """Test concurrent dispatch calls start the proxy only once. + + Given: + A lazy WorkerProxy that has not been started. + When: + Two dispatch() calls are made concurrently. + Then: + It should start the proxy and both dispatches should succeed. + """ + # Arrange + discovery, metadata = spy_discovery_with_events + + proxy = WorkerProxy( + discovery=discovery, + loadbalancer=spy_loadbalancer_with_workers, + ) + + spy_loadbalancer_with_workers.worker_added_callback( + mock_worker_connection, metadata + ) + + task = mocker.MagicMock(spec=Task) + spy_loadbalancer_with_workers.dispatch = mocker.AsyncMock( + return_value=mocker.AsyncMock( + __aiter__=mocker.Mock(return_value=mocker.AsyncMock()), + __anext__=mocker.AsyncMock(side_effect=StopAsyncIteration), + ) + ) + + # Act + await asyncio.gather( + proxy.dispatch(task), + proxy.dispatch(task), + ) + + # Assert + assert proxy.started + assert spy_loadbalancer_with_workers.dispatch.call_count == 2 + await proxy.stop() + @pytest.mark.asyncio async def test_dispatch_propagates_loadbalancer_errors( self, @@ -1845,7 +2163,7 @@ async def test_proxy_with_static_workers_list(self): ), ] - proxy = WorkerProxy(workers=workers) + proxy = WorkerProxy(workers=workers, lazy=False) # Act & assert async with proxy as p: @@ -1860,14 +2178,14 @@ async def test_proxy_with_pool_uri(self): """Test it starts and stops correctly. Given: - A WorkerProxy configured with a pool URI + A non-lazy WorkerProxy configured with a pool URI When: The proxy is used as a context manager Then: It starts and stops correctly """ # Arrange - proxy = WorkerProxy("test://pool") + proxy = WorkerProxy("test://pool", lazy=False) # Act & assert async with proxy as p: @@ -1902,7 +2220,7 @@ async def test_workers_property_returns_workers_list( await mock_discovery_service.start() mock_discovery_service.inject_worker_added(metadata) - proxy = WorkerProxy(discovery=mock_discovery_service) + proxy = WorkerProxy(discovery=mock_discovery_service, lazy=False) # Act async with proxy: @@ -1996,7 +2314,9 @@ async def test_cloudpickle_serialization_with_services(self): # Arrange - Use real objects instead of mocks for cloudpickle test discovery_service = LocalDiscovery("test-pool").subscriber proxy = WorkerProxy( - discovery=discovery_service, loadbalancer=wp.RoundRobinLoadBalancer + discovery=discovery_service, + loadbalancer=wp.RoundRobinLoadBalancer, + lazy=False, ) # Act & assert @@ -2018,7 +2338,7 @@ async def test_cloudpickle_serialization_discovery_only(self): in an unstarted state. Given: - A started WorkerProxy with only discovery service + A non-lazy started WorkerProxy with only discovery service When: Cloudpickle serialization and deserialization are performed within context @@ -2028,7 +2348,7 @@ async def test_cloudpickle_serialization_discovery_only(self): """ # Arrange - Use real objects instead of mocks for cloudpickle test discovery_service = LocalDiscovery("test-pool").subscriber - proxy = WorkerProxy(discovery=discovery_service) + proxy = WorkerProxy(discovery=discovery_service, lazy=False) # Act & assert async with proxy: @@ -2049,7 +2369,7 @@ async def test_cloudpickle_serialization_uri_preserves_id(self): in an unstarted state and preserved ID. Given: - A started WorkerProxy created with only a URI + A non-lazy started WorkerProxy created with only a URI When: Cloudpickle serialization and deserialization are performed within context @@ -2058,7 +2378,7 @@ async def test_cloudpickle_serialization_uri_preserves_id(self): proxy in an unstarted state and preserved ID """ # Arrange - Use real objects - this creates a LocalDiscovery internally - proxy = WorkerProxy("pool-1") + proxy = WorkerProxy("pool-1", lazy=False) # Act & assert async with proxy: @@ -2189,6 +2509,7 @@ async def test_cloudpickle_serialization_resolves_credentials_from_context( # static workers — default credentials resolve from ContextVar. restored_proxy = WorkerProxy( workers=[secure_worker, insecure_worker], + lazy=False, ) # Assert — resolved credentials from ContextVar: only secure workers @@ -2324,7 +2645,8 @@ async def test_explicit_credentials_parameter_overrides_contextvar( Given: A WorkerCredentials context is active with mTLS credentials - and a mix of secure and insecure static workers. + and a non-lazy WorkerProxy with a mix of secure and insecure + static workers. When: WorkerProxy is created with explicit None credentials. Then: @@ -2353,6 +2675,7 @@ async def test_explicit_credentials_parameter_overrides_contextvar( proxy = WorkerProxy( workers=[secure_worker, insecure_worker], credentials=None, + lazy=False, ) await proxy.start() @@ -2370,8 +2693,9 @@ async def test_credentials_default_resolves_from_contextvar( """Test default credentials resolves from ContextVar. Given: - A WorkerCredentials context is active and a mix of secure - and insecure static workers. + A WorkerCredentials context is active and a non-lazy + WorkerProxy with a mix of secure and insecure static + workers. When: WorkerProxy is created without explicit credentials. Then: @@ -2399,6 +2723,7 @@ async def test_credentials_default_resolves_from_contextvar( with CredentialContext(worker_credentials): proxy = WorkerProxy( workers=[secure_worker, insecure_worker], + lazy=False, ) await proxy.start() @@ -2416,8 +2741,9 @@ async def test_credentials_none_without_contextvar( """Test credentials resolve to None when no context is set. Given: - No WorkerCredentials context is active and a mix of secure - and insecure static workers. + No WorkerCredentials context is active and a non-lazy + WorkerProxy with a mix of secure and insecure static + workers. When: WorkerProxy is created without explicit credentials. Then: @@ -2444,6 +2770,7 @@ async def test_credentials_none_without_contextvar( # Act proxy = WorkerProxy( workers=[secure_worker, insecure_worker], + lazy=False, ) await proxy.start() @@ -2461,7 +2788,8 @@ async def test_start_invalid_loadbalancer_type_raises_error( """Test raise ValueError. Given: - A WorkerProxy with a loadbalancer that doesn't implement LoadBalancerLike + A non-lazy WorkerProxy with a loadbalancer that doesn't + implement LoadBalancerLike When: Start() is called Then: @@ -2475,7 +2803,9 @@ class NotALoadBalancer: invalid_loadbalancer = NotALoadBalancer() proxy = WorkerProxy( - pool_uri="test-pool", loadbalancer=lambda: invalid_loadbalancer + pool_uri="test-pool", + loadbalancer=lambda: invalid_loadbalancer, + lazy=False, ) # Act & assert @@ -2489,7 +2819,8 @@ async def test_start_invalid_discovery_type_raises_error( """Test raise ValueError. Given: - A WorkerProxy with a discovery that doesn't implement AsyncIterator + A non-lazy WorkerProxy with a discovery that doesn't + implement AsyncIterator When: Start() is called Then: @@ -2498,7 +2829,7 @@ async def test_start_invalid_discovery_type_raises_error( # Arrange - use a simple string which is definitely not an AsyncIterator invalid_discovery = "not_an_async_iterator" - proxy = WorkerProxy(discovery=lambda: invalid_discovery) + proxy = WorkerProxy(discovery=lambda: invalid_discovery, lazy=False) # Act & assert with pytest.raises(ValueError): @@ -2511,8 +2842,8 @@ async def test_security_filter_with_credentials( """Test only secure workers are discovered with credentials. Given: - A WorkerProxy instantiated with credentials and a mix - of secure and insecure static workers. + A non-lazy WorkerProxy instantiated with credentials and a + mix of secure and insecure static workers. When: The proxy is started and discovery events are processed. Then: @@ -2538,6 +2869,7 @@ async def test_security_filter_with_credentials( proxy = WorkerProxy( workers=[secure_worker, insecure_worker], credentials=worker_credentials, + lazy=False, ) # Act diff --git a/wool/tests/utilities/test_noreentry.py b/wool/tests/utilities/test_noreentry.py new file mode 100644 index 0000000..1e0b08c --- /dev/null +++ b/wool/tests/utilities/test_noreentry.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import asyncio + +import pytest + +from wool.utilities.noreentry import noreentry + + +# Test helpers (not fixtures) +class _SyncDummy: + """Class with a sync @noreentry method.""" + + @noreentry + def run(self): + return "ok" + + +class _AsyncDummy: + """Class with an async @noreentry method.""" + + @noreentry + async def run(self): + return "ok" + + +class _MultiMethodDummy: + """Class with multiple @noreentry methods.""" + + @noreentry + def first(self): + return "first" + + @noreentry + def second(self): + return "second" + + +class TestNoReentry: + """Tests for the noreentry decorator.""" + + def test_noreentry_sync_method_first_invocation(self): + """Test sync method executes normally on first invocation. + + Given: + A class with a sync @noreentry method. + When: + The method is called for the first time. + Then: + It should return normally. + """ + # Arrange + obj = _SyncDummy() + + # Act + result = obj.run() + + # Assert + assert result == "ok" + + def test_noreentry_sync_method_second_invocation_raises(self): + """Test sync method raises RuntimeError on second invocation. + + Given: + A class with a sync @noreentry method called once. + When: + The method is called a second time on the same instance. + Then: + It should raise RuntimeError. + """ + # Arrange + obj = _SyncDummy() + obj.run() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + obj.run() + + @pytest.mark.asyncio + async def test_noreentry_async_method_first_invocation(self): + """Test async method executes normally on first invocation. + + Given: + A class with an async @noreentry method. + When: + The method is awaited for the first time. + Then: + It should return normally. + """ + # Arrange + obj = _AsyncDummy() + + # Act + result = await obj.run() + + # Assert + assert result == "ok" + + @pytest.mark.asyncio + async def test_noreentry_async_method_second_invocation_raises(self): + """Test async method raises RuntimeError on second invocation. + + Given: + A class with an async @noreentry method awaited once. + When: + The method is awaited a second time on the same instance. + Then: + It should raise RuntimeError. + """ + # Arrange + obj = _AsyncDummy() + await obj.run() + + # Act & assert + with pytest.raises(RuntimeError, match="cannot be invoked more than once"): + await obj.run() + + def test_noreentry_separate_instances_independent(self): + """Test instances track guard state independently. + + Given: + Two instances of a class with a @noreentry method. + When: + The method is called on the first instance. + Then: + The method remains callable on the second instance. + """ + # Arrange + a = _SyncDummy() + b = _SyncDummy() + a.run() + + # Act + result = b.run() + + # Assert + assert result == "ok" + + def test_noreentry_error_message_qualname(self): + """Test RuntimeError includes the method's qualified name. + + Given: + A class with a @noreentry method called once. + When: + The method is called a second time. + Then: + The RuntimeError message should include the method's __qualname__. + """ + # Arrange + obj = _SyncDummy() + obj.run() + + # Act & assert + with pytest.raises(RuntimeError, match="_SyncDummy.run"): + obj.run() + + def test_noreentry_preserves_coroutinefunction_check(self): + """Test decorator preserves async function detection. + + Given: + A class with an async @noreentry method. + When: + asyncio.iscoroutinefunction is called on the method. + Then: + It should return True. + """ + # Act & assert + assert asyncio.iscoroutinefunction(_AsyncDummy.run) + + def test_noreentry_preserves_wrapped_function_name(self): + """Test decorator preserves the original function name. + + Given: + A class with a @noreentry method. + When: + The decorated method's __name__ is inspected. + Then: + It should equal the original function name. + """ + # Act & assert + assert _SyncDummy.run.__name__ == "run" + + def test_noreentry_multiple_methods_independent(self): + """Test guard on one method does not affect other methods. + + Given: + A class with two @noreentry methods where the first + has been guarded (called twice). + When: + The second method is called. + Then: + The second method executes normally. + """ + # Arrange + obj = _MultiMethodDummy() + obj.first() + with pytest.raises(RuntimeError): + obj.first() + + # Act + result = obj.second() + + # Assert + assert result == "second" + + def test_noreentry_unbound_access_returns_descriptor(self): + """Test accessing decorated method via class returns descriptor. + + Given: + A class with a @noreentry method. + When: + The method is accessed through the class (unbound). + Then: + It should return the descriptor itself. + """ + # Act + unbound = _SyncDummy.run + + # Assert + assert unbound is not None + + def test_noreentry_bare_function_raises_error(self): + """Test decorator rejects application to bare functions. + + Given: + A @noreentry-decorated function called without a bound instance. + When: + The descriptor is called directly. + Then: + It should raise TypeError. + """ + # Arrange + unbound = _SyncDummy.run + + # Act & assert + with pytest.raises( + TypeError, match="only decorates methods, not bare functions" + ): + unbound()