feat: customization options for contrib, middleware, events, and queue config#28
Merged
pratyush618 merged 13 commits intomasterfrom Mar 14, 2026
Merged
feat: customization options for contrib, middleware, events, and queue config#28pratyush618 merged 13 commits intomasterfrom
pratyush618 merged 13 commits intomasterfrom
Conversation
OTel: span_name_fn, attribute_prefix, extra_attributes_fn, task_filter. Sentry: tag_prefix, transaction_name_fn, task_filter, extra_tags_fn. Prometheus: refactor to per-namespace instance-based metric stores with namespace, extra_labels_fn, disabled_metrics params on both middleware and stats collector. FastAPI: include_routes/exclude_routes, dependencies, sse_poll_interval, result_timeout, default_page_size/max_page_size, result_serializer. Flask: cli_group param, --format json|table on info command. Django: TASKITO_AUTODISCOVER_MODULE, TASKITO_ADMIN_PER_PAGE, TASKITO_ADMIN_TITLE/HEADER, TASKITO_WATCH_INTERVAL, TASKITO_DASHBOARD_HOST/PORT settings.
Add on_enqueue, on_dead_letter, on_timeout, on_cancel hooks to TaskMiddleware base class. on_enqueue receives a mutable options dict. Add WORKER_STARTED, WORKER_STOPPED, QUEUE_PAUSED, QUEUE_RESUMED event types to EventType enum. Add per-webhook max_retries, timeout, retry_backoff params to WebhookManager.add_webhook() with configurable exponential backoff.
Queue: event_workers, scheduler_poll_interval_ms, scheduler_reap_interval, scheduler_cleanup_interval params. Per-task: serializer, max_retry_delay, max_concurrent params on @queue.task(). Queue-level: set_queue_rate_limit() and set_queue_concurrency() methods. Wire on_enqueue middleware hook in enqueue() with mutable options dict. Emit WORKER_STARTED/WORKER_STOPPED events in run_worker(). Pass queue configs as JSON to Rust worker.
…tcomes
Rust core:
- SchedulerConfig fields (poll_interval, reap_interval, cleanup_interval)
now wired from Python via PyQueue constructor params.
- PyTaskConfig gains max_retry_delay and max_concurrent fields.
- max_retry_delay wired to RetryPolicy.max_delay_ms (was hardcoded 300s).
- Per-task concurrency via count_running_by_task() Storage method (all 3
backends) checked in scheduler poller before dispatch.
- QueueConfig struct with rate_limit and max_concurrent, checked in poller
before per-task checks. Queue rate limits use "queue:{name}" keys.
- ResultOutcome enum returned from handle_result() — Success, Retry,
DeadLettered, Cancelled variants enable Python-side middleware dispatch.
- dispatch_outcome() in worker.rs calls Python middleware hooks (on_retry,
on_dead_letter, on_cancel) and emits JOB_RETRYING, JOB_DEAD,
JOB_CANCELLED events from the result loop.
Add test_customizability.py with 23 tests covering middleware hooks, event system, webhook config, queue/task config, per-task serializer, queue-level limits, and Flask CLI options. Update test_contrib.py fixtures for OTel/Sentry/Prometheus middleware refactoring (new instance attributes). Update test_events.py for 4 new event types.
Update middleware guide with 7-hook table and on_enqueue example. Update events guide with 4 new event types and payload fields. Update tasks guide with max_retry_delay, max_concurrent, serializer. Update queue guide with set_queue_rate_limit/set_queue_concurrency. Update API reference with all new Queue/task params. Update all 6 integration pages with configuration sections. Add 0.6.0 changelog entries.
Add timed_out field to JobResult::Failure and propagate through ResultOutcome::Retry/DeadLettered variants. Maintenance reap_stale() sets timed_out: true; all other failure paths set false. dispatch_outcome() now calls on_timeout() middleware before on_retry() or on_dead_letter() when the timeout flag is set.
Call _emit_event() in pause() and resume() methods on QueueOperationsMixin so event bus listeners and webhooks receive queue lifecycle notifications.
Add _deserialize_payload() method on Queue that routes to the per-task serializer (or falls back to queue-level serializer). Both Rust worker paths (sync py_worker.rs and async task_executor.rs) now call this method instead of cloudpickle.loads() directly, completing the per-task serializer feature for both enqueue and worker sides.
Add cleanup workflow to delete PR branch caches on close. Avoid redundant cache saves in rust-test (reads from lint) and test on ubuntu (already saved by lint).
Run rust-test in parallel with lint instead of sequentially. Test jobs now only wait on lint, not rust-test. Reduces wall-clock time by ~2 minutes per run.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
on_enqueue,on_dead_letter,on_timeout,on_cancel) wired end-to-end from Rust result handler to Python middleware chainsWORKER_STARTED/STOPPED,QUEUE_PAUSED/RESUMED), all events now actually emitted includingJOB_RETRYING,JOB_DEAD,JOB_CANCELLEDpoll_interval,reap_interval,cleanup_interval), per-taskmax_retry_delay,max_concurrent,serializer(full round-trip), queue-levelset_queue_rate_limit()/set_queue_concurrency()max_retries,timeout,retry_backoffTest plan
cargo test --workspace— 43 Rust tests passcargo check --workspace --features postgres— compilescargo check --workspace --features redis— compilesuv run maturin develop— wheel buildsuv run python -m pytest tests/python/ -v— 345 Python tests pass (23 new intest_customizability.py)uv run ruff check py_src/— cleanuv run mypy py_src/taskito/ --no-incremental— clean