pratyush618/taskito

taskito

A Rust-powered task queue for Python. No broker required — just SQLite or Postgres.

pip install taskito                # SQLite (default)
pip install taskito[postgres]      # with Postgres backend

Quickstart

1. Define tasks in tasks.py:

from taskito import Queue

queue = Queue(db_path="tasks.db")

@queue.task()
def add(a: int, b: int) -> int:
    return a + b

2. Start a worker in one terminal:

taskito worker --app tasks:queue

3. Enqueue jobs from another terminal or script:

from tasks import add

job = add.delay(2, 3)
print(job.result(timeout=10))  # 5

Why taskito?

Most Python task queues require a separate broker (Redis, RabbitMQ) even for single-machine workloads. taskito embeds everything — storage, scheduling, and worker management — into a single pip install with no external dependencies beyond Python itself. For distributed setups, an optional Postgres backend enables multi-machine workers with the same API.

The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool with crossbeam channels, and Diesel ORM over SQLite in WAL mode. Python's GIL is only held during task execution.
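To make the "SQLite in WAL mode" storage idea concrete, here is a minimal standard-library sketch of a broker-free job table. It is an illustration only, not taskito's actual schema or Rust implementation: the table layout and the helper names `open_queue_db`, `enqueue`, and `claim_next` are hypothetical.

```python
import sqlite3


def open_queue_db(path: str) -> sqlite3.Connection:
    """Open (or create) a job database with write-ahead logging enabled."""
    # isolation_level=None puts the connection in autocommit mode so that
    # transactions can be controlled explicitly with BEGIN/COMMIT.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")  # readers no longer block the writer
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " priority INTEGER NOT NULL DEFAULT 0,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    return conn


def enqueue(conn: sqlite3.Connection, payload: str, priority: int = 0) -> int:
    """Insert a pending job and return its row id."""
    cur = conn.execute(
        "INSERT INTO jobs (payload, priority) VALUES (?, ?)", (payload, priority)
    )
    return cur.lastrowid


def claim_next(conn: sqlite3.Connection):
    """Atomically mark the highest-priority pending job as running."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT id, payload FROM jobs WHERE status = 'pending' "
            "ORDER BY priority DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE jobs SET status = 'running' WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return row
```

Taking the write lock with `BEGIN IMMEDIATE` before the `SELECT` is one way to keep two workers from claiming the same row; a real implementation may use a different strategy.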

Features

  • Priority queues: higher priority jobs run first
  • Retry with exponential backoff: automatic retries with jitter
  • Dead letter queue: inspect and replay failed jobs
  • Rate limiting: token bucket with "100/m" syntax
  • Task dependencies: depends_on for DAG workflows with cascade cancel
  • Task workflows: chain, group, chord primitives
  • Periodic tasks: cron scheduling with seconds granularity
  • Progress tracking: report and read progress from inside tasks
  • Job cancellation: cancel pending or running jobs
  • Unique tasks: deduplicate active jobs by key
  • Batch enqueue: task.map() for high-throughput bulk inserts
  • Named queues: route tasks to isolated queues
  • Hooks: before/after/success/failure middleware
  • Per-task middleware: TaskMiddleware with before/after/on_retry hooks
  • Pluggable serializers: CloudpickleSerializer (default), JsonSerializer, or custom
  • Cancel running tasks: cooperative cancellation with check_cancelled()
  • Soft timeouts: check_timeout() inside tasks
  • Worker heartbeat: monitor worker health via queue.workers()
  • Job expiration: expires parameter for time-sensitive jobs
  • Exception filtering: retry_on / dont_retry_on for selective retries
  • OpenTelemetry: optional tracing integration via pip install taskito[otel]
  • Async support: await job.aresult(), await queue.astats()
  • Web dashboard: taskito dashboard --app myapp:queue serves a built-in monitoring UI
  • FastAPI integration: TaskitoRouter for instant REST API over the queue
  • Postgres backend: optional multi-machine storage via PostgreSQL
  • Events system: subscribe to JOB_COMPLETED, JOB_FAILED, etc. with queue.on_event()
  • Webhooks: queue.add_webhook(url, events, secret) with HMAC-SHA256 signing
  • Job archival: queue.archive(older_than=86400), queue.list_archived()
  • Queue pause/resume: queue.pause(), queue.resume(), queue.paused_queues()
  • Circuit breakers: circuit_breaker={"threshold": 5, "window": 60, "cooldown": 300}
  • Structured logging: current_job.log("msg", level="info", extra={...})
  • CLI: taskito worker, taskito info --watch, taskito dashboard
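The webhook feature above mentions HMAC-SHA256 signing. A receiving service would typically recompute the signature over the raw request body and compare it in constant time. The sketch below shows that check with the standard library; how taskito encodes and transmits the signature (hex versus base64, header name) is not stated here, so the hex encoding and the helper names `sign_payload` and `verify_signature` are assumptions.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 of the raw body (hex encoding is an assumption)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected, signature)
```

Verify against the exact bytes received, before any JSON parsing, since re-serializing the payload can change whitespace and key order.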

Integrations

Install optional extras to unlock additional integrations:

| Extra | Install | What you get |
| --- | --- | --- |
| Flask | pip install taskito[flask] | Taskito(app) extension, flask taskito worker CLI |
| FastAPI | pip install taskito[fastapi] | TaskitoRouter for instant REST API over the queue |
| Django | pip install taskito[django] | Admin integration, management commands |
| OpenTelemetry | pip install taskito[otel] | Distributed tracing with span-per-task |
| Prometheus | pip install taskito[prometheus] | PrometheusMiddleware, queue depth gauges, /metrics server |
| Sentry | pip install taskito[sentry] | SentryMiddleware with auto error capture and task tags |
| Encryption | pip install taskito[encryption] | EncryptedSerializer for at-rest payload encryption |
| MsgPack | pip install taskito[msgpack] | MsgpackSerializer for compact binary serialization |
| Postgres | pip install taskito[postgres] | Multi-machine workers via PostgreSQL backend |
| Redis | pip install taskito[redis] | Redis storage backend |

Examples

Retry with Backoff

import requests

@queue.task(max_retries=5, retry_backoff=2.0)
def fetch_url(url: str) -> str:
    return requests.get(url).text

Priority Queues

urgent_report.apply_async(args=[data], priority=10)
bulk_report.delay(data)  # default priority 0

Rate Limiting

@queue.task(rate_limit="100/m")
def call_api(endpoint: str) -> dict:
    return requests.get(endpoint).json()
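The rate limit string above describes a token bucket. As a rough illustration of the technique, the sketch below parses a "count/unit" spec and refills tokens continuously. Only the "100/m" form appears in this README; the "s" and "h" units, the class name `TokenBucket`, and the injectable clock are assumptions, and this is not taskito's implementation.

```python
import time

# Seconds per unit; only "m" is confirmed by the README, the rest are assumed.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_rate(spec: str) -> tuple[int, int]:
    """Split a spec such as "100/m" into (count, period_in_seconds)."""
    count, unit = spec.split("/")
    return int(count), UNIT_SECONDS[unit]


class TokenBucket:
    def __init__(self, spec: str, clock=time.monotonic):
        self.capacity, period = parse_rate(spec)
        self.refill_per_sec = self.capacity / period
        self.tokens = float(self.capacity)  # start with a full bucket
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        """Refill based on elapsed time, then spend one token if available."""
        now = self.clock()
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A full bucket allows a short burst of up to `count` calls; after that, calls are admitted at the refill rate.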

Task Dependencies

download = fetch_file.delay("data.csv")
parsed = parse_file.apply_async(
    args=["data.csv"],
    depends_on=[download.id],
)
# parsed waits until download completes; if download fails, parsed is cancelled
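The cascade cancel behaviour can be pictured as a graph walk: when one job fails, everything downstream of it is cancelled. The sketch below is a conceptual model, assuming cancellation is transitive; the function name `cascade_cancel` and the plain-dict representation are hypothetical, not taskito's API.

```python
from collections import defaultdict, deque


def cascade_cancel(failed_id: str, depends_on: dict[str, list[str]]) -> set[str]:
    """Return every job that directly or indirectly depends on `failed_id`.

    `depends_on` maps a job id to the ids it waits for.
    """
    # Invert the mapping: parent id -> jobs waiting on it.
    dependents = defaultdict(list)
    for job, parents in depends_on.items():
        for parent in parents:
            dependents[parent].append(job)

    cancelled: set[str] = set()
    pending = deque([failed_id])
    while pending:  # breadth-first walk over downstream jobs
        current = pending.popleft()
        for child in dependents[current]:
            if child not in cancelled:
                cancelled.add(child)
                pending.append(child)
    return cancelled
```

For example, if `store` depends on `parse` and `parse` depends on `download`, a failed `download` cancels both `parse` and `store` under this model.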

Workflows

from taskito import chain, group, chord

# Sequential pipeline — each step receives the previous result
chain(fetch.s(url), parse.s(), store.s()).apply()

# Parallel fan-out
group(process.s(chunk) for chunk in chunks).apply()

# Parallel + callback when all complete
chord([download.s(u) for u in urls], merge.s()).apply()
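To clarify how data flows through the three primitives, here is an in-process model using plain functions: a chain feeds each result into the next step, a group maps one function over many inputs in parallel, and a chord passes the collected group results to a callback. The helpers `run_chain`, `run_group`, and `run_chord` are hypothetical and run locally in threads; they only mirror the semantics described in the comments above, not taskito's execution model.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_chain(first_arg, *steps):
    """Sequential pipeline: each step receives the previous step's result."""
    return reduce(lambda acc, step: step(acc), steps, first_arg)


def run_group(func, items):
    """Parallel fan-out: apply `func` to every item, keeping input order."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, items))


def run_chord(func, items, callback):
    """Fan-out, then call `callback` once with the list of all results."""
    return callback(run_group(func, items))
```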

Periodic Tasks

@queue.periodic(cron="0 0 */6 * * *")
def cleanup_temp_files():
    ...
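The cron expression above has six fields rather than the usual five, which fits the "seconds granularity" feature. Assuming the common seconds-first field order (second, minute, hour, day of month, month, day of week), it would fire at second 0 of minute 0 every sixth hour. The sketch below expands such an expression; it handles only `*`, `*/n`, and single integers, and `parse_cron` is a hypothetical helper, not part of taskito.

```python
FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")
RANGES = {
    "second": (0, 59),
    "minute": (0, 59),
    "hour": (0, 23),
    "day_of_month": (1, 31),
    "month": (1, 12),
    "day_of_week": (0, 6),
}


def expand_field(expr: str, lo: int, hi: int) -> list[int]:
    """Expand "*", "*/n", or a single integer into the matching values."""
    if expr == "*":
        return list(range(lo, hi + 1))
    if expr.startswith("*/"):
        return list(range(lo, hi + 1, int(expr[2:])))
    return [int(expr)]


def parse_cron(spec: str) -> dict[str, list[int]]:
    """Parse a six-field cron string (seconds-first order is an assumption)."""
    parts = spec.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return {name: expand_field(part, *RANGES[name]) for name, part in zip(FIELDS, parts)}
```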

Progress Tracking

from taskito import current_job

@queue.task()
def train_model(epochs: int):
    for i in range(epochs):
        ...
        current_job.update_progress(int((i + 1) / epochs * 100))

Hooks

@queue.on_failure
def alert_on_failure(task_name, args, kwargs, error):
    slack.post(f"Task {task_name} failed: {error}")

Exception Filtering

@queue.task(
    max_retries=5,
    retry_on=[ConnectionError, TimeoutError],
    dont_retry_on=[ValueError],
)
def fetch_data(url: str) -> dict:
    return requests.get(url).json()

Per-Task Middleware

import time

from taskito import TaskMiddleware

class TimingMiddleware(TaskMiddleware):
    def before(self, ctx):
        ctx._start = time.time()

    def after(self, ctx, result, error):
        elapsed = time.time() - ctx._start
        print(f"{ctx.task_name} took {elapsed:.2f}s")

@queue.task(middleware=[TimingMiddleware()])
def process(data):
    ...

Delayed Scheduling

# Run 30 minutes from now
reminder.apply_async(args=[user_id, msg], delay=1800)

Unique Tasks

report.apply_async(args=[user_id], unique_key=f"report:{user_id}")
# Second enqueue with same key is silently deduplicated while first is active
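Deduplication by key can be modelled as a lookup of currently active keys. The sketch below is a conceptual in-memory model: the class `UniqueRegistry` is hypothetical, and returning the existing job id for a duplicate enqueue is an assumption, since the README only states that the second enqueue is deduplicated while the first is active.

```python
class UniqueRegistry:
    """Tracks which unique keys currently have an active job."""

    def __init__(self):
        self._active: dict[str, int] = {}
        self._next_id = 1

    def enqueue(self, unique_key: str) -> tuple[int, bool]:
        """Return (job_id, created); a duplicate key reuses the active job."""
        if unique_key in self._active:
            return self._active[unique_key], False
        job_id = self._next_id
        self._next_id += 1
        self._active[unique_key] = job_id
        return job_id, True

    def finish(self, unique_key: str) -> None:
        """Release the key once the job completes, fails, or is cancelled."""
        self._active.pop(unique_key, None)
```

Once the active job finishes, the key is free again and a new enqueue with the same key creates a fresh job.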

FastAPI Integration

from fastapi import FastAPI
from taskito.contrib.fastapi import TaskitoRouter

app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")
# GET /tasks/stats, GET /tasks/jobs/{id}, GET /tasks/jobs/{id}/progress (SSE), ...

Batch Enqueue

jobs = send_email.map([("alice@x.com",), ("bob@x.com",), ("carol@x.com",)])

Async Support

async def main():
    # await is only valid inside a coroutine
    job = expensive_task.delay(data)
    result = await job.aresult(timeout=30)
    stats = await queue.astats()

Testing

taskito includes a built-in test mode — no worker needed:

def test_add():
    with queue.test_mode() as results:
        add.delay(2, 3)
        assert results[0].return_value == 5

Documentation

Full documentation with guides, API reference, architecture diagrams, and examples:

Read the docs →

Coming from Celery? See the Migration Guide.

Comparison

| Feature | taskito | Celery | RQ | Dramatiq | Huey |
| --- | --- | --- | --- | --- | --- |
| Broker required | No | Yes | Yes | Yes | Yes |
| Core language | Rust + Python | Python | Python | Python | Python |
| Priority queues | Yes | Yes | No | No | Yes |
| Rate limiting | Yes | Yes | No | Yes | No |
| Dead letter queue | Yes | No | Yes | No | No |
| Task dependencies | Yes | No | No | No | No |
| Task chaining | Yes | Yes | No | Yes | No |
| Built-in dashboard | Yes | No | No | No | No |
| FastAPI integration | Yes | No | No | No | No |
| Per-task middleware | Yes | No | No | Yes | No |
| Cancel running tasks | Yes | Yes | No | No | No |
| Custom serializers | Yes | Yes | No | No | No |
| Postgres backend | Yes | Yes | No | No | No |
| Setup | pip install | Broker + backend | Redis | Broker | Redis |

License

MIT
