Skip to content

Add admission control hook for worker backpressure — Closes #57#149

Open
conradbzura wants to merge 4 commits intowool-labs:mainfrom
conradbzura:57-admission-control-hook
Open

Add admission control hook for worker backpressure — Closes #57#149
conradbzura wants to merge 4 commits intowool-labs:mainfrom
conradbzura:57-admission-control-hook

Conversation

@conradbzura
Copy link
Copy Markdown
Contributor

Summary

Add a BackpressureLike protocol and BackpressureContext dataclass that let users configure a per-worker hook to reject incoming tasks based on workload-specific criteria. The hook receives a snapshot of the worker's active task count and the incoming task, returning True to reject (triggering gRPC RESOURCE_EXHAUSTED) or False to accept. The load balancer already treats RESOURCE_EXHAUSTED as transient and skips to the next worker — so no client-side changes are needed.

The default behavior (no hook) accepts all tasks, preserving current semantics. The existing WorkerConnection semaphore continues to cap per-worker concurrent dispatches on the client side; this hook adds server-side admission control.

Closes #57

Proposed changes

BackpressureLike protocol and BackpressureContext dataclass (service.py)

Introduce two new public types in wool.runtime.worker.service:

  • BackpressureContext(frozen=True) — immutable snapshot with active_task_count: int and task: Task
  • BackpressureLike(Protocol) — runtime-checkable protocol accepting both sync and async callables. Return True to reject, False to accept. The falsey default of None (no hook) naturally means "accept all".

Backpressure check in WorkerService.dispatch (service.py)

Insert the check after task deserialization but before the Ack response. If the hook returns truthy, the service calls context.abort(StatusCode.RESOURCE_EXHAUSTED, ...) — the task never enters the docket and no Ack is sent. Both sync and async hooks are supported via inspect.isawaitable().

Thread hook through WorkerProcess (process.py)

Accept backpressure in WorkerProcess.__init__ and serialize it with cloudpickle.dumps() to cross the multiprocessing.spawn boundary. This supports lambdas, closures, and callable class instances — not just top-level functions. Deserialize with cloudpickle.loads() in _serve() before passing to WorkerService.

User-facing API on LocalWorker (local.py)

Add backpressure: BackpressureLike | None = None parameter to LocalWorker.__init__, passed through to WorkerProcess. Usage with WorkerPool is supported via functools.partial:

from functools import partial

worker_factory = partial(LocalWorker, backpressure=my_hook)
async with WorkerPool(worker=worker_factory):
    ...

Public exports (__init__.py)

Export BackpressureContext and BackpressureLike from the top-level wool package.

Test cases

# Test Suite Given When Then Coverage Target
1 TestBackpressureContext Valid active_task_count and Task Instantiated Stores both fields correctly Field storage
2 TestBackpressureContext A BackpressureContext instance Attribute reassigned Raises AttributeError Frozen immutability
3 TestBackpressureContext Two instances with identical fields Compared with == Returns True Equality
4 TestBackpressureContext Two instances with different count Compared with == Returns False Inequality
5 TestBackpressureContext Any non-negative int (Hypothesis) Instantiated Field equals input Arbitrary task count
6 TestBackpressureLike Sync function isinstance check Returns True Sync protocol
7 TestBackpressureLike Async function isinstance check Returns True Async protocol
8 TestBackpressureLike Callable class instance isinstance check Returns True Class protocol
9 TestBackpressureLike A plain string isinstance check Returns False Negative case
10 TestWorkerServiceBackpressure Callable hook WorkerService(backpressure=hook) Initializes with events unset Init with hook
11 TestWorkerServiceBackpressure Sync hook returning False Dispatch RPC called Accepts task, returns result Sync accept
12 TestWorkerServiceBackpressure Sync hook returning True Dispatch RPC called RESOURCE_EXHAUSTED raised Sync reject
13 TestWorkerServiceBackpressure Async hook returning True Dispatch RPC called RESOURCE_EXHAUSTED raised Async reject
14 TestWorkerServiceBackpressure Async hook returning False Dispatch RPC called Accepts task, returns result Async accept
15 TestWorkerServiceBackpressure Hook capturing argument Dispatch RPC called Receives BackpressureContext with correct fields Context validation
16 TestWorkerServiceBackpressure One active task dispatched Second dispatch with hook Hook sees active_task_count == 1 Active count
17 TestLocalWorker Callable hook LocalWorker(backpressure=hook) Constructs without error Init with hook

@conradbzura conradbzura self-assigned this Apr 1, 2026
@conradbzura conradbzura added this to the v0.8.0 milestone Apr 1, 2026
Add BackpressureLike protocol and BackpressureContext dataclass that
let users configure a per-worker hook to reject incoming tasks. The
hook receives a snapshot of active task count and the incoming task,
returning True to reject (triggering gRPC RESOURCE_EXHAUSTED) or
False to accept. The load balancer already treats RESOURCE_EXHAUSTED
as transient and skips to the next worker.

The hook is threaded through LocalWorker -> WorkerProcess ->
WorkerService, serialized via cloudpickle across the spawn boundary
to support lambdas and closures.
Cover BackpressureContext (instantiation, frozen immutability, equality,
Hypothesis property test), BackpressureLike protocol (sync, async,
callable class, negative case), WorkerService dispatch with
backpressure (sync/async accept and reject, context field validation,
active task counting), and LocalWorker construction with the hook.
Update public API completeness test for the new exports.
@conradbzura conradbzura force-pushed the 57-admission-control-hook branch from 1fdf83e to 3b0a441 Compare April 1, 2026 17:04
@conradbzura conradbzura marked this pull request as ready for review April 1, 2026 17:04
LocalWorker, WorkerProcess, and WorkerService were missing docstring
entries for parameters introduced by the backpressure feature.
WorkerProcess was also missing uid, tags, and extra params that
predated this PR. Removes a stale :param tasks: entry from
WorkerService._cancel which takes no arguments.
Extends the pairwise covering array and Hypothesis strategy with a
BackpressureMode dimension (NONE, SYNC, ASYNC) to verify that
backpressure hooks survive cloudpickle serialization through real
subprocesses. Adds rejection composition tests validating the
end-to-end RESOURCE_EXHAUSTED path and load balancer fallback to an
accepting worker.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add admission control hook for worker backpressure

1 participant