Skip to content
/ qwerk Public

A type-safe job queue for Bun with multiple backends (Memory, BroadcastChannel, Redis)

License

Notifications You must be signed in to change notification settings

brielov/qwerk

Repository files navigation

qwerk

A lightweight, type-safe job queue for Bun.

Features

  • Zero dependencies - No external runtime dependencies
  • Multiple backends - Memory, BroadcastChannel, or Redis
  • Type-safe - Full TypeScript generics for job types and results
  • Retry with backoff - Exponential, linear, or fixed strategies with jitter
  • Job progress - Report and track job progress (0-100%)
  • Job results - Store and retrieve return values from handlers
  • Repeatable jobs - Cron expressions or fixed intervals
  • Rate limiting - Token bucket algorithm
  • Priority queues - Lower number = higher priority
  • Dead letter queue - Failed jobs are preserved for inspection/retry
  • Graceful shutdown - Wait for in-flight jobs with timeout
  • Job timeout - Automatic cancellation via AbortController
  • Visibility timeout - Stalled job recovery
  • Deduplication - Prevent duplicate jobs via custom IDs
  • Metrics - Throughput, processing time, queue depth

Installation

bun add qwerk

Quick Start

import { Queue, MemoryBackend } from "qwerk";

// Define your job types
type Jobs = {
  "send-email": { to: string; subject: string; body: string };
  "process-image": { url: string; width: number };
};

// Create queue with in-memory backend
const queue = new Queue<Jobs>(new MemoryBackend());

// Register handlers
queue.process("send-email", async (job, ctx) => {
  await ctx.updateProgress(50);
  console.log(`Sending email to ${job.data.to}`);
  await ctx.updateProgress(100);
  return { sent: true, timestamp: Date.now() };
});

// Add jobs
await queue.add("send-email", {
  to: "user@example.com",
  subject: "Hello",
  body: "World",
});

// Start processing
queue.start();

// Later: graceful shutdown
await queue.stop();

Backends

MemoryBackend

Single-process, non-persistent. Good for development and testing.

import { MemoryBackend } from "qwerk";

const backend = new MemoryBackend();

BroadcastBackend

Cross-tab/worker communication using the BroadcastChannel API. Works in browsers and Bun/Node worker threads.

import { BroadcastBackend } from "qwerk";

const backend = new BroadcastBackend("my-queue-channel");

RedisBackend

Distributed, persistent queue using Bun's built-in Redis client.

import { RedisBackend } from "qwerk";

const backend = new RedisBackend("redis://localhost:6379", {
  prefix: "myapp:queue",
});

API

Queue Options

const queue = new Queue<Jobs>(backend, {
  pollInterval: 1000, // How often to check for jobs (ms)
  concurrency: 5, // Max parallel jobs
  visibilityTimeout: 30000, // Job lock timeout (ms)
  stalledInterval: 5000, // How often to check for stalled jobs
  logger: silentLogger, // Custom logger (or consoleLogger)
  maxPayloadSize: 1024, // Max job data size (bytes)
  maxQueueSize: 10000, // Max pending jobs
  rateLimit: {
    max: 100, // Max jobs per interval
    duration: 1000, // Interval in ms
  },
});

Adding Jobs

// Simple add
const job = await queue.add("send-email", { to: "a@b.com", subject: "Hi", body: "Hello" });

// With options
await queue.add("send-email", data, {
  delay: 5000, // Delay execution by 5s
  maxAttempts: 5, // Retry up to 5 times
  priority: 1, // Lower = higher priority
  timeout: 60000, // Job timeout (ms)
  jobId: "unique-123", // Custom ID for deduplication
  backoff: {
    type: "exponential", // or "linear" | "fixed"
    delay: 1000,
    maxDelay: 30000,
  },
  repeat: {
    cron: "0 9 * * MON", // Every Monday at 9am
    // or: every: 3600000,  // Every hour
    limit: 10, // Max 10 repetitions
  },
});

// Bulk add
const jobs = await queue.addBulk([
  { name: "send-email", data: { to: "a@b.com", subject: "1", body: "..." } },
  { name: "send-email", data: { to: "b@c.com", subject: "2", body: "..." } },
]);

Processing Jobs

// Basic handler
queue.process("send-email", async (job) => {
  console.log(job.data.to);
});

// With progress and result
queue.process<"process-image", { thumbnail: string }>("process-image", async (job, ctx) => {
  await ctx.updateProgress(10);

  // ctx.signal is an AbortSignal for cancellation
  const result = await processImage(job.data.url, { signal: ctx.signal });

  await ctx.updateProgress(100);
  return { thumbnail: result.url };
});

Events

queue.on("added", (job) => console.log("Job added:", job.id));
queue.on("active", (job) => console.log("Job started:", job.id));
queue.on("progress", (job, progress) => console.log(`Job ${job.id}: ${progress}%`));
queue.on("completed", (job, result) => console.log("Job done:", result));
queue.on("failed", (job, error) => console.log("Job failed:", error.message));
queue.on("retry", (job, error) => console.log("Job retrying:", job.attempts));
queue.on("stalled", (job) => console.log("Job stalled:", job.id));
queue.on("timeout", (job) => console.log("Job timed out:", job.id));

// One-time listener
queue.once("completed", (job) => console.log("First job done!"));

// Remove listener
queue.off("completed", myHandler);

Queue Control

queue.start(); // Start processing
queue.pause(); // Stop picking up new jobs
queue.resume(); // Resume after pause
await queue.stop(); // Graceful shutdown (waits for in-flight)
await queue.stop(5000); // Shutdown with 5s timeout
await queue.drain(); // Wait for all current jobs to complete
await queue.close(); // Stop + close backend

Inspecting the Queue

await queue.size(); // Pending jobs count
await queue.activeCount(); // Currently processing
await queue.failedCount(); // Jobs in DLQ
await queue.completedCount(); // Completed jobs with results

// Get failed jobs
const failed = await queue.getFailed(100);
for (const { job, error, failedAt } of failed) {
  console.log(`${job.id} failed: ${error}`);
}

// Retry a failed job
await queue.retryFailed(jobId);

// Get completed jobs with results
const completed = await queue.getCompletedJobs(100);
for (const { job, result, completedAt } of completed) {
  console.log(`${job.id} returned:`, result);
}

// Get specific completed job
const entry = await queue.getCompleted(jobId);

// Remove a job
await queue.remove(jobId);

// Clear all jobs
await queue.clear();

Metrics

const metrics = await queue.getMetrics();
// {
//   waiting: 42,
//   active: 3,
//   failed: 1,
//   completed: 1337,
//   totalFailed: 5,
//   totalProcessingTime: 45230,
//   avgProcessingTime: 33.8,
//   throughput: 2.5  // jobs/second (last 60s)
// }

Custom Logger

import { Queue, MemoryBackend, silentLogger, consoleLogger, type Logger } from "qwerk";

// Built-in loggers
const queue1 = new Queue(backend, { logger: silentLogger });
const queue2 = new Queue(backend, { logger: consoleLogger });

// Custom logger
const myLogger: Logger = {
  debug: (msg, ...args) => myDebugFn(msg, ...args),
  info: (msg, ...args) => myInfoFn(msg, ...args),
  warn: (msg, ...args) => myWarnFn(msg, ...args),
  error: (msg, ...args) => myErrorFn(msg, ...args),
};

Cron Expressions

Standard 5-field cron syntax:

 ┌───────────── minute (0-59)
 │ ┌───────────── hour (0-23)
 │ │ ┌───────────── day of month (1-31)
 │ │ │ ┌───────────── month (1-12 or JAN-DEC)
 │ │ │ │ ┌───────────── day of week (0-7 or SUN-SAT, 0 and 7 are Sunday)
 │ │ │ │ │
 * * * * *

Examples:

  • 0 * * * * - Every hour at :00
  • */15 * * * * - Every 15 minutes
  • 0 9 * * MON-FRI - Weekdays at 9am
  • 0 0 1 * * - First day of every month at midnight

Implementing a Custom Backend

import type { Backend, Job } from "qwerk";

class MyBackend implements Backend {
  async push(job: Job): Promise<boolean> {
    /* ... */
  }
  async pushBulk(jobs: Job[]): Promise<number> {
    /* ... */
  }
  async pop(visibilityTimeout: number): Promise<Job | null> {
    /* ... */
  }
  async ack(jobId: string, result?: unknown): Promise<void> {
    /* ... */
  }
  async nack(job: Job, nextAttemptAt: number): Promise<void> {
    /* ... */
  }
  async fail(job: Job, error: Error): Promise<void> {
    /* ... */
  }
  async getStalled(): Promise<Job[]> {
    /* ... */
  }
  async updateProgress(jobId: string, progress: number): Promise<void> {
    /* ... */
  }
  async getCompleted(jobId: string): Promise<CompletedJob | null> {
    /* ... */
  }
  async completedCount(): Promise<number> {
    /* ... */
  }
  async getCompletedJobs(limit?: number): Promise<CompletedJob[]> {
    /* ... */
  }
  subscribe(callback: () => void): void {
    /* ... */
  }
  unsubscribe(): void {
    /* ... */
  }
  async size(): Promise<number> {
    /* ... */
  }
  async activeCount(): Promise<number> {
    /* ... */
  }
  async failedCount(): Promise<number> {
    /* ... */
  }
  async getFailed(limit?: number): Promise<FailedJob[]> {
    /* ... */
  }
  async retryFailed(jobId: string): Promise<boolean> {
    /* ... */
  }
  async remove(jobId: string): Promise<boolean> {
    /* ... */
  }
  async clear(): Promise<void> {
    /* ... */
  }
  async close?(): Promise<void> {
    /* ... */
  }
}

Comparison

Feature qwerk BullMQ Agenda Bee-Queue
Backend Pluggable Redis MongoDB Redis
Zero dependencies Yes No No No
TypeScript native Yes Yes Partial Partial
Bundle size ~15KB ~150KB ~80KB ~40KB
Job progress Yes Yes No Yes
Job results Yes Yes No Yes
Cron jobs Yes (built-in) Yes Yes No
Rate limiting Yes Yes No No
Priority queues Yes Yes Yes Yes
Multi-tab/worker Yes No No No
Web UI No Yes Yes No

License

MIT

About

A type-safe job queue for Bun with multiple backends (Memory, BroadcastChannel, Redis)

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published