From 3ed8144012f06f9beaf179ae142812672f5c8194 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 09:09:59 -0800 Subject: [PATCH 01/13] rabbitmq - feat: adding in tasks --- packages/rabbitmq/package.json | 3 +- packages/rabbitmq/src/index.ts | 9 + packages/rabbitmq/src/task.ts | 776 +++++++++++++++++ packages/rabbitmq/test/task.test.ts | 1194 +++++++++++++++++++++++++++ pnpm-lock.yaml | 3 + 5 files changed, 1984 insertions(+), 1 deletion(-) create mode 100644 packages/rabbitmq/src/task.ts create mode 100644 packages/rabbitmq/test/task.test.ts diff --git a/packages/rabbitmq/package.json b/packages/rabbitmq/package.json index 78fb5e8..52c38c4 100644 --- a/packages/rabbitmq/package.json +++ b/packages/rabbitmq/package.json @@ -33,7 +33,8 @@ "author": "Jared Wray ", "license": "MIT", "dependencies": { - "amqplib": "^0.10.9" + "amqplib": "^0.10.9", + "hookified": "^1.15.1" }, "peerDependencies": { "qified": "workspace:^" diff --git a/packages/rabbitmq/src/index.ts b/packages/rabbitmq/src/index.ts index 360c9f9..f4e41a9 100644 --- a/packages/rabbitmq/src/index.ts +++ b/packages/rabbitmq/src/index.ts @@ -7,6 +7,15 @@ import { type TopicHandler, } from "qified"; +export { + defaultPollInterval, + defaultRabbitMqTaskId, + defaultRetries as defaultTaskRetries, + defaultTimeout as defaultTaskTimeout, + RabbitMqTaskProvider, + type RabbitMqTaskProviderOptions, +} from "./task.js"; + /** * Configuration options for the RabbitMQ message provider. */ diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts new file mode 100644 index 0000000..53c9fcb --- /dev/null +++ b/packages/rabbitmq/src/task.ts @@ -0,0 +1,776 @@ +import { Buffer } from "node:buffer"; +import { randomUUID } from "node:crypto"; +import { + type Channel, + type ChannelModel, + type ConsumeMessage, + connect, +} from "amqplib"; +import { Hookified } from "hookified"; +import type { + EnqueueTask, + Task, + TaskContext, + TaskHandler, + TaskProvider, + TaskProviderOptions, +} from "qified"; + +/** + * Configuration options for the RabbitMQ task provider. + */ +export type RabbitMqTaskProviderOptions = TaskProviderOptions & { + /** RabbitMQ connection URI. Defaults to "amqp://localhost:5672" */ + uri?: string; + /** Unique identifier for this provider instance. Defaults to "@qified/rabbitmq-task" */ + id?: string; + /** Poll interval in milliseconds for checking scheduled tasks. Defaults to 1000 */ + pollInterval?: number; + /** Time in seconds to wait before reconnecting. Set to 0 to disable. Defaults to 5 */ + reconnectTimeInSeconds?: number; +}; + +/** Default RabbitMQ connection URI */ +export const defaultRabbitMqTaskUri = "amqp://localhost:5672"; + +/** Default RabbitMQ task provider identifier */ +export const defaultRabbitMqTaskId = "@qified/rabbitmq-task"; + +/** Default timeout for task processing (30 seconds) */ +export const defaultTimeout = 30_000; + +/** Default maximum retry attempts */ +export const defaultRetries = 3; + +/** Default poll interval (1 second) */ +export const defaultPollInterval = 1000; + +/** Default reconnect time in seconds */ +export const defaultReconnectTimeInSeconds = 5; + +/** + * RabbitMQ-based task provider for Qified. + * Uses RabbitMQ queues for reliable task queue processing + * with visibility timeout, retries, and dead-letter queues. + * Extends Hookified to emit events for errors and other lifecycle events. + */ +export class RabbitMqTaskProvider extends Hookified implements TaskProvider { + private _id: string; + private _timeout: number; + private _retries: number; + private _taskHandlers: Map; + + private _connection: ChannelModel | undefined; + private _channel: Channel | undefined; + private _uri: string; + private _reconnectTimeInSeconds: number; + private _reconnecting = false; + private _closing = false; + private _reconnectTimer: ReturnType | undefined; + private _connectionPromise: Promise | null = null; + + private _active = true; + private _pollInterval: number; + private readonly _pollTimers: Map> = + new Map(); + private readonly _consumerTags: Map = new Map(); + + // In-memory tracking for attempt counts: taskId -> count + private readonly _attemptCounts: Map = new Map(); + + // Scheduled tasks: queue -> ScheduledEntry[] + private readonly _scheduledTasks: Map< + string, + Array<{ task: Task; scheduledAt: number }> + > = new Map(); + + // Dead letter tasks for stats: queue -> Task[] + private readonly _deadLetterTasks: Map = new Map(); + + // Track queue -> set of taskIds for cleanup + private readonly _queueTaskIds: Map> = new Map(); + + /** + * Creates a new RabbitMQ task provider instance. + * @param options Configuration options for the provider + */ + constructor(options: RabbitMqTaskProviderOptions = {}) { + super(); + this._uri = options.uri ?? defaultRabbitMqTaskUri; + this._id = options.id ?? defaultRabbitMqTaskId; + this._timeout = options.timeout ?? defaultTimeout; + this._retries = options.retries ?? defaultRetries; + this._pollInterval = options.pollInterval ?? defaultPollInterval; + this._reconnectTimeInSeconds = + options.reconnectTimeInSeconds ?? defaultReconnectTimeInSeconds; + this._taskHandlers = new Map(); + } + + /** + * Gets the provider ID. + */ + public get id(): string { + return this._id; + } + + /** + * Sets the provider ID. + */ + public set id(id: string) { + this._id = id; + } + + /** + * Gets the default timeout for task processing. + */ + public get timeout(): number { + return this._timeout; + } + + /** + * Sets the default timeout for task processing. + */ + public set timeout(timeout: number) { + this._timeout = timeout; + } + + /** + * Gets the default maximum retry attempts. + */ + public get retries(): number { + return this._retries; + } + + /** + * Sets the default maximum retry attempts. + */ + public set retries(retries: number) { + this._retries = retries; + } + + /** + * Gets the task handlers map. + */ + public get taskHandlers(): Map { + return this._taskHandlers; + } + + /** + * Sets the task handlers map. + */ + public set taskHandlers(value: Map) { + this._taskHandlers = value; + } + + /** + * Connects to RabbitMQ. Can be called explicitly or will be called automatically on first use. + */ + async connect(): Promise { + if (!this._connectionPromise) { + this._connectionPromise = (async () => { + const connection = await connect(this._uri); + this._connection = connection; + this._channel = await connection.createChannel(); + + connection.on("error", () => { + // Connection error emitted — connection is already closing/closed. + // The 'close' handler will trigger reconnection. + }); + + connection.on("close", () => { + this._channel = undefined; + this._connection = undefined; + if (!this._closing) { + this._scheduleReconnect(); + } + }); + })(); + } + + return this._connectionPromise; + } + + /** + * Returns the connected channel, connecting if necessary. + */ + private async getChannel(): Promise { + if (!this._connection || !this._channel) { + await this.connect(); + } + + // biome-ignore lint/style/noNonNullAssertion: channel is set by connect + return this._channel!; + } + + /** + * Schedules a reconnection attempt after the configured delay. + */ + private _scheduleReconnect(): void { + if ( + this._reconnectTimeInSeconds <= 0 || + this._reconnecting || + this._closing + ) { + return; + } + + this._reconnectTimer = setTimeout(async () => { + this._reconnectTimer = undefined; + await this._attemptReconnect(); + }, this._reconnectTimeInSeconds * 1000); + } + + /** + * Attempts to reconnect to RabbitMQ and re-establish all consumers. + */ + private async _attemptReconnect(): Promise { + if (this._reconnecting || this._closing) { + return; + } + + this._reconnecting = true; + let failed = false; + try { + // Reset connection promise so connect() creates a new one + this._connectionPromise = null; + await this.connect(); + + // biome-ignore lint/style/noNonNullAssertion: channel is set by connect + const channel = this._channel!; + + // Re-establish consumers for all queues with handlers + const queues = [...this._taskHandlers.keys()]; + for (const queue of queues) { + this._consumerTags.delete(queue); + await this._setupConsumer(channel, queue); + } + } catch { + this._channel = undefined; + this._connection = undefined; + this._connectionPromise = null; + failed = true; + } finally { + this._reconnecting = false; + } + + if (failed) { + this._scheduleReconnect(); + } + } + + /** + * Generates a globally unique task ID. + */ + private generateTaskId(): string { + return `task-${randomUUID()}`; + } + + /** + * Publishes a task to a RabbitMQ queue. + */ + private async publishTask(queue: string, task: Task): Promise { + const channel = await this.getChannel(); + await channel.assertQueue(queue, { durable: true }); + + // Track task in memory + if (!this._queueTaskIds.has(queue)) { + this._queueTaskIds.set(queue, new Set()); + } + + this._queueTaskIds.get(queue)?.add(task.id); + + channel.sendToQueue(queue, Buffer.from(JSON.stringify(task)), { + persistent: true, + }); + } + + /** + * Moves a task to the dead-letter queue. + */ + private async moveToDeadLetter(queue: string, task: Task): Promise { + const channel = await this.getChannel(); + const dlqName = `${queue}:dead-letter`; + await channel.assertQueue(dlqName, { durable: true }); + channel.sendToQueue(dlqName, Buffer.from(JSON.stringify(task)), { + persistent: true, + }); + + if (!this._deadLetterTasks.has(queue)) { + this._deadLetterTasks.set(queue, []); + } + + this._deadLetterTasks.get(queue)?.push(task); + + // Clean up in-memory tracking + this._attemptCounts.delete(task.id); + this._queueTaskIds.get(queue)?.delete(task.id); + } + + /** + * Cleans up in-memory tracking for a task after acknowledgment. + */ + private cleanupTask(queue: string, taskId: string): void { + this._attemptCounts.delete(taskId); + this._queueTaskIds.get(queue)?.delete(taskId); + } + + /** + * Sets up a RabbitMQ consumer for a task queue. + */ + private async _setupConsumer(channel: Channel, queue: string): Promise { + await channel.assertQueue(queue, { durable: true }); + await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); + await channel.prefetch(1); + + const { consumerTag } = await channel.consume( + queue, + async (message_) => { + if (!message_) { + return; + } + + /* v8 ignore next 4 -- @preserve */ + if (!this._active) { + channel.nack(message_, false, true); + return; + } + + const task = JSON.parse(message_.content.toString()) as Task; + const handlers = this._taskHandlers.get(queue); + /* v8 ignore next 4 -- @preserve */ + if (!handlers || handlers.length === 0) { + channel.nack(message_, false, true); + return; + } + + for (const handler of handlers) { + await this.processTask(queue, task, message_, handler); + } + }, + { noAck: false }, + ); + + this._consumerTags.set(queue, consumerTag); + } + + /** + * Processes a single task with a handler. + */ + private async processTask( + queue: string, + task: Task, + amqpMessage: ConsumeMessage, + handler: TaskHandler, + ): Promise { + const maxRetries = task.maxRetries ?? this._retries; + const timeout = task.timeout ?? this._timeout; + + // Increment attempt count + const currentAttempt = (this._attemptCounts.get(task.id) ?? 0) + 1; + this._attemptCounts.set(task.id, currentAttempt); + + let acknowledged = false; + let rejected = false; + let timeoutHandle: ReturnType | undefined; + + const channel = await this.getChannel(); + + const context: TaskContext = { + ack: async () => { + if (acknowledged || rejected || !this._active) { + return; + } + + acknowledged = true; + try { + channel.ack(amqpMessage); + this.cleanupTask(queue, task.id); + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + }, + reject: async (requeue = true) => { + if (acknowledged || rejected || !this._active) { + return; + } + + rejected = true; + try { + channel.nack(amqpMessage, false, false); + + if (requeue && currentAttempt < maxRetries) { + await this.publishTask(queue, task); + } else { + await this.moveToDeadLetter(queue, task); + } + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + }, + extend: async (ttl: number) => { + if (acknowledged || rejected || !this._active) { + return; + } + + try { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + + timeoutHandle = setTimeout(() => { + if (!acknowledged && !rejected && this._active) { + void context.reject(true); + } + }, ttl); + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + }, + metadata: { + attempt: currentAttempt, + maxRetries, + }, + }; + + // Set timeout handler + timeoutHandle = setTimeout(() => { + if (!acknowledged && !rejected && this._active) { + void context.reject(true); + } + }, timeout); + + try { + await handler.handler(task, context); + + // Auto-ack if handler completes without explicit ack/reject + if (!acknowledged && !rejected) { + await context.ack(); + } + } catch { + // Auto-reject on error + if (!acknowledged && !rejected) { + await context.reject(true); + } + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } + } + + /** + * Checks for scheduled tasks that are ready to execute. + */ + private async checkScheduledTasks(queue: string): Promise { + /* v8 ignore next -- @preserve */ + if (!this._active) { + return; + } + + const scheduled = this._scheduledTasks.get(queue); + if (!scheduled || scheduled.length === 0) { + return; + } + + const now = Date.now(); + const ready: Array<{ task: Task; scheduledAt: number }> = []; + const remaining: Array<{ task: Task; scheduledAt: number }> = []; + + for (const entry of scheduled) { + /* v8 ignore next -- @preserve */ + if (!this._active) { + return; + } + + if (entry.scheduledAt <= now) { + ready.push(entry); + } else { + remaining.push(entry); + } + } + + this._scheduledTasks.set(queue, remaining); + + for (const entry of ready) { + /* v8 ignore next -- @preserve */ + if (!this._active) { + return; + } + + await this.publishTask(queue, entry.task); + } + } + + /** + * Starts the polling loop for a queue. + */ + private startPolling(queue: string): void { + const poll = async () => { + /* v8 ignore next -- @preserve */ + if (!this._active) { + return; + } + + try { + await this.checkScheduledTasks(queue); + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + + if (this._active && this._taskHandlers.has(queue)) { + this._pollTimers.set(queue, setTimeout(poll, this._pollInterval)); + } + }; + + this._pollTimers.set(queue, setTimeout(poll, this._pollInterval)); + } + + /** + * Enqueues a task to a specific queue. + * @param queue The queue name to enqueue to + * @param taskData The task data to enqueue + * @returns The ID of the enqueued task + */ + public async enqueue(queue: string, taskData: EnqueueTask): Promise { + if (!this._active) { + throw new Error("TaskProvider has been disconnected"); + } + + const task: Task = { + id: this.generateTaskId(), + timestamp: Date.now(), + ...taskData, + }; + + // If scheduled for the future, hold in memory + if (task.scheduledAt && task.scheduledAt > Date.now()) { + if (!this._scheduledTasks.has(queue)) { + this._scheduledTasks.set(queue, []); + } + + this._scheduledTasks.get(queue)?.push({ + task, + scheduledAt: task.scheduledAt, + }); + return task.id; + } + + // Publish to RabbitMQ queue immediately + await this.publishTask(queue, task); + return task.id; + } + + /** + * Registers a handler to process tasks from a queue. + * @param queue The queue name to dequeue from + * @param handler The handler configuration + */ + public async dequeue(queue: string, handler: TaskHandler): Promise { + if (!this._active) { + throw new Error("TaskProvider has been disconnected"); + } + + if (!this._taskHandlers.has(queue)) { + this._taskHandlers.set(queue, []); + } + + this._taskHandlers.get(queue)?.push(handler); + + // Set up consumer if not already + if (!this._consumerTags.has(queue)) { + const channel = await this.getChannel(); + await this._setupConsumer(channel, queue); + } + + // Start polling for scheduled tasks if not already + if (!this._pollTimers.has(queue)) { + this.startPolling(queue); + } + } + + /** + * Unsubscribes a handler from a queue. + * @param queue The queue name to unsubscribe from + * @param id Optional handler ID. If not provided, removes all handlers. + */ + public async unsubscribe(queue: string, id?: string): Promise { + if (id) { + const handlers = this._taskHandlers.get(queue); + if (handlers) { + this._taskHandlers.set( + queue, + handlers.filter((h) => h.id !== id), + ); + } + } else { + this._taskHandlers.delete(queue); + } + + // Stop polling and cancel consumer if no handlers left for this queue + if ( + !this._taskHandlers.has(queue) || + this._taskHandlers.get(queue)?.length === 0 + ) { + const consumerTag = this._consumerTags.get(queue); + if (consumerTag && this._channel) { + try { + await this._channel.cancel(consumerTag); + } catch { + /* ignore if channel closed */ + } + + this._consumerTags.delete(queue); + } + + const timer = this._pollTimers.get(queue); + if (timer) { + clearTimeout(timer); + this._pollTimers.delete(queue); + } + } + } + + /** + * Disconnects and cleans up the provider. + * @param force If true, skips graceful close. Defaults to false. + */ + public async disconnect(force = false): Promise { + this._active = false; + this._closing = true; + + // Clear reconnect timer + if (this._reconnectTimer) { + clearTimeout(this._reconnectTimer); + this._reconnectTimer = undefined; + } + + // Clear all poll timers + for (const timer of this._pollTimers.values()) { + clearTimeout(timer); + } + + this._pollTimers.clear(); + + // Clear handlers and in-memory state + this._taskHandlers.clear(); + this._scheduledTasks.clear(); + this._attemptCounts.clear(); + this._queueTaskIds.clear(); + + // Cancel consumers and close channel/connection + if (this._channel) { + if (!force) { + for (const tag of this._consumerTags.values()) { + try { + await this._channel.cancel(tag); + } catch { + /* ignore */ + } + } + } + + this._consumerTags.clear(); + + if (!force) { + try { + await this._channel.close(); + } catch { + /* ignore */ + } + } + + this._channel = undefined; + } + + if (this._connection) { + if (!force) { + try { + await this._connection.close(); + } catch { + /* ignore */ + } + } + + this._connection = undefined; + } + + this._connectionPromise = null; + this._closing = false; + } + + /** + * Gets all tasks in the dead-letter queue for a specific queue. + * @param queue The queue name + * @returns Array of tasks in the dead-letter queue + */ + public async getDeadLetterTasks(queue: string): Promise { + return this._deadLetterTasks.get(queue) ?? []; + } + + /** + * Gets the current state of a queue. + * @param queue The queue name + * @returns Queue statistics + */ + public async getQueueStats(queue: string): Promise<{ + waiting: number; + processing: number; + deadLetter: number; + scheduled: number; + }> { + let waiting = 0; + try { + const channel = await this.getChannel(); + const queueInfo = await channel.checkQueue(queue); + waiting = queueInfo.messageCount; + } catch { + // Queue may not exist yet + } + + const deadLetter = this._deadLetterTasks.get(queue)?.length ?? 0; + const scheduled = this._scheduledTasks.get(queue)?.length ?? 0; + + return { + waiting, + processing: 0, + deadLetter, + scheduled, + }; + } + + /** + * Clears all data for a queue. Useful for testing. + * @param queue The queue name to clear + */ + public async clearQueue(queue: string): Promise { + const channel = await this.getChannel(); + try { + await channel.purgeQueue(queue); + } catch { + /* queue may not exist */ + } + + try { + await channel.purgeQueue(`${queue}:dead-letter`); + } catch { + /* queue may not exist */ + } + + this._deadLetterTasks.delete(queue); + this._scheduledTasks.delete(queue); + + // Clear task data for this queue + const taskIds = this._queueTaskIds.get(queue); + if (taskIds) { + for (const taskId of taskIds) { + this._attemptCounts.delete(taskId); + } + + this._queueTaskIds.delete(queue); + } + } +} diff --git a/packages/rabbitmq/test/task.test.ts b/packages/rabbitmq/test/task.test.ts new file mode 100644 index 0000000..d10dbb0 --- /dev/null +++ b/packages/rabbitmq/test/task.test.ts @@ -0,0 +1,1194 @@ +// biome-ignore-all lint/suspicious/noExplicitAny: This is a test file and explicit any is acceptable here. + +import type { Task, TaskContext, TaskHandler } from "qified"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { + defaultRabbitMqTaskId, + defaultTaskRetries, + defaultTaskTimeout, + RabbitMqTaskProvider, +} from "../src/index.js"; + +async function waitFor( + condition: () => boolean | Promise, + timeoutMs = 5000, + intervalMs = 50, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (await condition()) { + return; + } + + await new Promise((resolve) => { + setTimeout(resolve, intervalMs); + }); + } + + throw new Error(`waitFor timed out after ${timeoutMs}ms`); +} + +describe("RabbitMqTaskProvider", () => { + let provider: RabbitMqTaskProvider; + const testQueue = `test-queue-${Date.now()}`; + + describe("hookified inheritance", () => { + test("should extend Hookified and support event emission", () => { + const p = new RabbitMqTaskProvider(); + expect(typeof p.on).toBe("function"); + expect(typeof p.emit).toBe("function"); + expect(typeof p.off).toBe("function"); + }); + + test("should emit error events when RabbitMQ operations fail in ack", async () => { + const customProvider = new RabbitMqTaskProvider(); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + const errors: Error[] = []; + customProvider.on("error", (error: Error) => { + errors.push(error); + }); + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, _ctx: TaskContext) => { + // Handler completes, auto-ack will happen + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + // Wait for task to be processed + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Clean up + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + + // Create a new provider to verify error listener support + const p2 = new RabbitMqTaskProvider(); + await p2.connect(); + await p2.clearQueue(testQueue); + + const errors2: Error[] = []; + p2.on("error", (error: Error) => { + errors2.push(error); + }); + + expect(typeof p2.on).toBe("function"); + + await p2.clearQueue(testQueue); + await p2.disconnect(); + }); + + test("should support onHook and removeHook from Hookified", () => { + const p = new RabbitMqTaskProvider(); + expect(typeof p.onHook).toBe("function"); + expect(typeof p.removeHook).toBe("function"); + }); + + test("should support once method from Hookified", () => { + const p = new RabbitMqTaskProvider(); + expect(typeof p.once).toBe("function"); + }); + }); + + beforeEach(async () => { + provider = new RabbitMqTaskProvider(); + await provider.connect(); + await provider.clearQueue(testQueue); + }); + + afterEach(async () => { + await provider.clearQueue(testQueue); + await provider.disconnect(); + }); + + describe("constructor and initialization", () => { + test("should initialize with default values", () => { + const p = new RabbitMqTaskProvider(); + expect(p.id).toBe(defaultRabbitMqTaskId); + expect(p.timeout).toBe(defaultTaskTimeout); + expect(p.retries).toBe(defaultTaskRetries); + expect(p.taskHandlers).toEqual(new Map()); + }); + + test("should initialize with custom id", () => { + const customProvider = new RabbitMqTaskProvider({ id: "custom-id" }); + expect(customProvider.id).toBe("custom-id"); + }); + + test("should initialize with custom timeout", () => { + const customProvider = new RabbitMqTaskProvider({ timeout: 5000 }); + expect(customProvider.timeout).toBe(5000); + }); + + test("should initialize with custom retries", () => { + const customProvider = new RabbitMqTaskProvider({ retries: 5 }); + expect(customProvider.retries).toBe(5); + }); + + test("should initialize with all custom options", () => { + const customProvider = new RabbitMqTaskProvider({ + id: "custom-id", + timeout: 5000, + retries: 5, + pollInterval: 500, + }); + expect(customProvider.id).toBe("custom-id"); + expect(customProvider.timeout).toBe(5000); + expect(customProvider.retries).toBe(5); + }); + + test("should fail to connect when RabbitMQ is not available", async () => { + const p = new RabbitMqTaskProvider({ uri: "amqp://localhost:9999" }); + await expect(p.connect()).rejects.toThrow(); + }); + }); + + describe("getters and setters", () => { + test("should set and get id", () => { + provider.id = "new-id"; + expect(provider.id).toBe("new-id"); + }); + + test("should set and get timeout", () => { + provider.timeout = 10000; + expect(provider.timeout).toBe(10000); + }); + + test("should set and get retries", () => { + provider.retries = 10; + expect(provider.retries).toBe(10); + }); + + test("should set and get taskHandlers", () => { + const handlers = new Map(); + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + handlers.set("test-queue", [handler]); + provider.taskHandlers = handlers; + expect(provider.taskHandlers).toBe(handlers); + }); + }); + + describe("enqueue", () => { + test("should enqueue a task with auto-generated id and timestamp", async () => { + const taskId = await provider.enqueue(testQueue, { + data: { message: "test" }, + }); + + expect(taskId).toBeDefined(); + expect(typeof taskId).toBe("string"); + // UUID format: task-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx + expect(taskId).toMatch( + /^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + }); + + test("should enqueue multiple tasks with unique ids", async () => { + const taskId1 = await provider.enqueue(testQueue, { + data: { message: "test1" }, + }); + const taskId2 = await provider.enqueue(testQueue, { + data: { message: "test2" }, + }); + + expect(taskId1).not.toBe(taskId2); + }); + + test("should enqueue task with all optional fields", async () => { + const taskId = await provider.enqueue(testQueue, { + data: { message: "test" }, + headers: { "x-custom": "value" }, + priority: 10, + maxRetries: 5, + timeout: 5000, + scheduledAt: Date.now() + 10000, + }); + + expect(taskId).toBeDefined(); + }); + + test("should throw error when disconnected", async () => { + await provider.disconnect(); + + await expect( + provider.enqueue(testQueue, { data: { message: "test" } }), + ).rejects.toThrow("TaskProvider has been disconnected"); + }); + }); + + describe("dequeue and task processing", () => { + test("should register a handler for a queue", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler); + + expect(provider.taskHandlers.has(testQueue)).toBe(true); + expect(provider.taskHandlers.get(testQueue)?.length).toBe(1); + expect(provider.taskHandlers.get(testQueue)?.[0]).toBe(handler); + }); + + test("should register multiple handlers for the same queue", async () => { + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(2); + }); + + test("should process enqueued task when handler is registered", async () => { + let processedTask: Task | undefined; + const handler: TaskHandler = { + id: "test-handler", + handler: async (task: Task) => { + processedTask = task; + }, + }; + + await provider.dequeue(testQueue, handler); + const taskId = await provider.enqueue(testQueue, { + data: { message: "test" }, + }); + + await waitFor(() => processedTask !== undefined); + + expect(processedTask).toBeDefined(); + expect(processedTask?.id).toBe(taskId); + expect(processedTask?.data).toEqual({ message: "test" }); + }); + + test("should auto-acknowledge task when handler completes successfully", async () => { + let processed = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + processed = true; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => processed); + + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + }); + + test("should throw error when disconnected", async () => { + await provider.disconnect(); + + await expect( + provider.dequeue(testQueue, { handler: async () => {} }), + ).rejects.toThrow("TaskProvider has been disconnected"); + }); + }); + + describe("task acknowledgment", () => { + test("should acknowledge task explicitly", async () => { + let context: TaskContext | undefined; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + context = ctx; + await ctx.ack(); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => context !== undefined); + + expect(context).toBeDefined(); + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + }); + + test("should not acknowledge twice", async () => { + let ackCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.ack(); + ackCount++; + await ctx.ack(); // Second ack should be no-op + ackCount++; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => ackCount >= 2); + + expect(ackCount).toBe(2); + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + }); + }); + + describe("task rejection and retry", () => { + test("should reject and requeue task on failure", async () => { + const customProvider = new RabbitMqTaskProvider({ pollInterval: 100 }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + let attemptCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + attemptCount++; + if (attemptCount === 1) { + await ctx.reject(true); // Requeue + } else { + await ctx.ack(); + } + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => attemptCount > 1); + + expect(attemptCount).toBeGreaterThan(1); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + + test("should move to dead-letter queue after max retries", async () => { + const customProvider = new RabbitMqTaskProvider({ + retries: 2, + pollInterval: 100, + }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + let attemptCount = 0; + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + attemptCount++; + await ctx.reject(true); // Always reject + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(testQueue); + return dlq.length === 1; + }); + + const deadLetterTasks = + await customProvider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(1); + expect(deadLetterTasks[0].data).toEqual({ message: "test" }); + expect(attemptCount).toBe(2); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + + test("should move to dead-letter queue when reject with requeue=false", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); // Don't requeue + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length === 1; + }); + + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(1); + }); + + test("should not reject twice", async () => { + let rejectCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + rejectCount++; + await ctx.reject(false); // Second reject should be no-op + rejectCount++; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => rejectCount >= 2); + + expect(rejectCount).toBe(2); + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(1); + }); + + test("should auto-reject task on handler error", async () => { + const customProvider = new RabbitMqTaskProvider({ + retries: 3, + pollInterval: 100, + }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + let attemptCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + attemptCount++; + throw new Error("Handler error"); + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(testQueue); + return dlq.length === 1; + }); + + expect(attemptCount).toBe(3); + const deadLetterTasks = + await customProvider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(1); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + }); + + describe("task timeout", () => { + test("should timeout task after configured timeout", async () => { + const customProvider = new RabbitMqTaskProvider({ + timeout: 100, + pollInterval: 50, + }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + // Simulate long-running task + await new Promise((resolve) => setTimeout(resolve, 500)); + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Task should have timed out and be requeued or in DLQ + const stats = await customProvider.getQueueStats(testQueue); + const dlq = await customProvider.getDeadLetterTasks(testQueue); + const taskTimedOut = stats.waiting > 0 || dlq.length > 0; + expect(taskTimedOut).toBe(true); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + + test("should use task-specific timeout over provider default", async () => { + const customProvider = new RabbitMqTaskProvider({ + timeout: 5000, + pollInterval: 50, + }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + // Simulate long-running task + await new Promise((resolve) => setTimeout(resolve, 200)); + await ctx.ack(); + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { + data: { message: "test" }, + timeout: 50, // Task-specific timeout (shorter than handler duration) + }); + + await waitFor(async () => { + const s = await customProvider.getQueueStats(testQueue); + const dlq = await customProvider.getDeadLetterTasks(testQueue); + return s.waiting + dlq.length > 0; + }); + + const stats = await customProvider.getQueueStats(testQueue); + const dlq = await customProvider.getDeadLetterTasks(testQueue); + expect(stats.waiting + dlq.length).toBeGreaterThan(0); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + }); + + describe("task context extend", () => { + test("should extend task deadline", async () => { + const customProvider = new RabbitMqTaskProvider({ + timeout: 200, + pollInterval: 50, + }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + let completed = false; + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + // Extend timeout before long operation + await ctx.extend(1000); + await new Promise((resolve) => setTimeout(resolve, 300)); + completed = true; + await ctx.ack(); + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => completed); + + expect(completed).toBe(true); + const stats = await customProvider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + + test("should not extend after acknowledgment", async () => { + let extendCalled = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.ack(); + await ctx.extend(1000); // Should be no-op + extendCalled = true; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => extendCalled); + + expect(extendCalled).toBe(true); + }); + }); + + describe("task context metadata", () => { + test("should provide attempt and maxRetries in context", async () => { + let contextMetadata: any; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + contextMetadata = ctx.metadata; + await ctx.ack(); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => contextMetadata !== undefined); + + expect(contextMetadata).toBeDefined(); + expect(contextMetadata.attempt).toBe(1); + expect(contextMetadata.maxRetries).toBe(defaultTaskRetries); + }); + + test("should increment attempt on retry", async () => { + const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + const attempts: number[] = []; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + attempts.push(ctx.metadata.attempt); + if (ctx.metadata.attempt < 2) { + await ctx.reject(true); + } else { + await ctx.ack(); + } + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => attempts.length > 1); + + expect(attempts.length).toBeGreaterThan(1); + expect(attempts[0]).toBe(1); + expect(attempts[1]).toBe(2); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + + test("should use task-specific maxRetries", async () => { + let contextMetadata: any; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + contextMetadata = ctx.metadata; + await ctx.ack(); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { + data: { message: "test" }, + maxRetries: 10, + }); + + await waitFor(() => contextMetadata !== undefined); + + expect(contextMetadata.maxRetries).toBe(10); + }); + }); + + describe("scheduled tasks", () => { + test("should not process task before scheduledAt time", async () => { + let processed = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + processed = true; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { + data: { message: "test" }, + scheduledAt: Date.now() + 2000, // Schedule 2 seconds in future + }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(processed).toBe(false); + const stats = await provider.getQueueStats(testQueue); + expect(stats.scheduled).toBe(1); + }); + + test("should process task after scheduledAt time via polling", async () => { + const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + let processed = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + processed = true; + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { + data: { message: "test" }, + scheduledAt: Date.now() + 100, // Schedule 100ms in future + }); + + // Task should not be processed yet + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(processed).toBe(false); + + // Wait for scheduled task to be processed by polling + await waitFor(() => processed); + + expect(processed).toBe(true); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + }); + + describe("unsubscribe", () => { + test("should unsubscribe specific handler by id", async () => { + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(2); + + await provider.unsubscribe(testQueue, "handler-1"); + + const handlers = provider.taskHandlers.get(testQueue); + expect(handlers?.length).toBe(1); + expect(handlers?.[0].id).toBe("handler-2"); + }); + + test("should unsubscribe all handlers when id not provided", async () => { + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(2); + + await provider.unsubscribe(testQueue); + + expect(provider.taskHandlers.has(testQueue)).toBe(false); + }); + + test("should handle unsubscribe for non-existent queue", async () => { + await expect( + provider.unsubscribe("non-existent-queue", "handler-1"), + ).resolves.not.toThrow(); + }); + + test("should handle unsubscribe for non-existent handler id", async () => { + const handler: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler); + + await expect( + provider.unsubscribe(testQueue, "non-existent-handler"), + ).resolves.not.toThrow(); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(1); + }); + }); + + describe("disconnect", () => { + test("should clear all handlers on disconnect", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + expect(provider.taskHandlers.size).toBeGreaterThan(0); + + await provider.disconnect(); + + expect(provider.taskHandlers.size).toBe(0); + }); + + test("should prevent operations after disconnect", async () => { + await provider.disconnect(); + + await expect( + provider.enqueue(testQueue, { data: { message: "test" } }), + ).rejects.toThrow("TaskProvider has been disconnected"); + + await expect( + provider.dequeue(testQueue, { handler: async () => {} }), + ).rejects.toThrow("TaskProvider has been disconnected"); + }); + + test("should force disconnect", async () => { + const p = new RabbitMqTaskProvider(); + await p.connect(); + + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + await p.dequeue(testQueue, handler); + + // Force disconnect should skip graceful close + await p.disconnect(true); + expect(p.taskHandlers.size).toBe(0); + }); + }); + + describe("getDeadLetterTasks", () => { + test("should return empty array for queue with no dead-letter tasks", async () => { + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks).toEqual([]); + }); + + test("should return dead-letter tasks for queue", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test1" } }); + await provider.enqueue(testQueue, { data: { message: "test2" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length === 2; + }); + + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(2); + }); + }); + + describe("getQueueStats", () => { + test("should return empty stats for non-existent queue", async () => { + const stats = await provider.getQueueStats("non-existent-queue"); + expect(stats).toEqual({ + waiting: 0, + processing: 0, + deadLetter: 0, + scheduled: 0, + }); + }); + + test("should return correct stats for queue with waiting tasks", async () => { + await provider.enqueue(testQueue, { data: { message: "test1" } }); + await provider.enqueue(testQueue, { data: { message: "test2" } }); + + // Small delay for RabbitMQ to process + await new Promise((resolve) => setTimeout(resolve, 50)); + + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(2); + expect(stats.processing).toBe(0); + expect(stats.deadLetter).toBe(0); + }); + + test("should return correct stats with scheduled tasks", async () => { + await provider.enqueue(testQueue, { + data: { message: "test" }, + scheduledAt: Date.now() + 10000, + }); + + const stats = await provider.getQueueStats(testQueue); + expect(stats.scheduled).toBe(1); + expect(stats.waiting).toBe(0); + }); + + test("should return correct stats with dead-letter tasks", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test1" } }); + await provider.enqueue(testQueue, { data: { message: "test2" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length === 2; + }); + + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + expect(stats.deadLetter).toBe(2); + }); + }); + + describe("multiple handlers", () => { + test("should deliver task to all registered handlers", async () => { + let handler1Called = false; + let handler2Called = false; + + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => { + handler1Called = true; + }, + }; + + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => { + handler2Called = true; + }, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => handler1Called && handler2Called); + + expect(handler1Called).toBe(true); + expect(handler2Called).toBe(true); + }); + }); + + describe("multiple queues", () => { + test("should maintain separate queues", async () => { + const queue1 = `${testQueue}-1`; + const queue2 = `${testQueue}-2`; + + let queue1Processed = false; + let queue2Processed = false; + + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => { + queue1Processed = true; + }, + }; + + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => { + queue2Processed = true; + }, + }; + + await provider.dequeue(queue1, handler1); + await provider.dequeue(queue2, handler2); + await provider.enqueue(queue1, { data: { message: "test1" } }); + await provider.enqueue(queue2, { data: { message: "test2" } }); + + await waitFor(() => queue1Processed && queue2Processed); + + expect(queue1Processed).toBe(true); + expect(queue2Processed).toBe(true); + + const stats1 = await provider.getQueueStats(queue1); + const stats2 = await provider.getQueueStats(queue2); + + expect(stats1.waiting).toBe(0); + expect(stats2.waiting).toBe(0); + + // Clean up + await provider.clearQueue(queue1); + await provider.clearQueue(queue2); + }); + }); + + describe("task fields", () => { + test("should preserve task data fields", async () => { + let receivedTask: Task | undefined; + + const handler: TaskHandler = { + id: "test-handler", + handler: async (task: Task) => { + receivedTask = task; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { + data: { message: "test", nested: { value: 123 } }, + headers: { "x-custom": "header-value" }, + priority: 5, + }); + + await waitFor(() => receivedTask !== undefined); + + expect(receivedTask).toBeDefined(); + expect(receivedTask?.data).toEqual({ + message: "test", + nested: { value: 123 }, + }); + expect(receivedTask?.headers).toEqual({ "x-custom": "header-value" }); + expect(receivedTask?.priority).toBe(5); + expect(receivedTask?.timestamp).toBeDefined(); + }); + }); + + describe("clearQueue", () => { + test("should clear all data for a queue", async () => { + // Add tasks to various states + await provider.enqueue(testQueue, { data: { message: "test1" } }); + await provider.enqueue(testQueue, { + data: { message: "test2" }, + scheduledAt: Date.now() + 10000, + }); + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test3" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length > 0; + }); + + // Clear queue + await provider.clearQueue(testQueue); + + // Verify data is cleared + const statsAfter = await provider.getQueueStats(testQueue); + expect(statsAfter.waiting).toBe(0); + expect(statsAfter.deadLetter).toBe(0); + expect(statsAfter.scheduled).toBe(0); + }); + }); + + describe("edge cases", () => { + test("should handle disconnect during task processing", async () => { + const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + // Long operation + await new Promise((resolve) => setTimeout(resolve, 200)); + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + // Wait just enough for processing to start + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Disconnect while potentially in the middle of processing + await customProvider.disconnect(); + + // Should handle gracefully + expect(customProvider.taskHandlers.size).toBe(0); + }); + + test("should handle polling loop exit when inactive", async () => { + const customProvider = new RabbitMqTaskProvider({ pollInterval: 20 }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + await customProvider.dequeue(testQueue, { + id: "test-handler", + handler: async () => {}, + }); + + // Let polling start + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Disconnect - this sets _active to false + await customProvider.disconnect(); + + // Wait a bit more - polling should have stopped + await new Promise((resolve) => setTimeout(resolve, 100)); + + // No errors should occur + expect(customProvider.taskHandlers.size).toBe(0); + }); + + test("should handle extended timeout expiration triggering reject", async () => { + const customProvider = new RabbitMqTaskProvider({ + timeout: 5000, + pollInterval: 50, + }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + let extendCalled = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + // Extend with a short timeout + await ctx.extend(50); + extendCalled = true; + // Wait longer than the extended timeout + await new Promise((resolve) => setTimeout(resolve, 200)); + // Task should have been auto-rejected by now via the extended timeout + }, + }; + + await customProvider.dequeue(testQueue, handler); + await customProvider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => extendCalled); + + // Wait for extended timeout to fire + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(extendCalled).toBe(true); + + await customProvider.clearQueue(testQueue); + await customProvider.disconnect(); + }); + + test("should handle disconnect during scheduled task processing", async () => { + const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + await customProvider.connect(); + await customProvider.clearQueue(testQueue); + + // Enqueue multiple scheduled tasks + for (let i = 0; i < 5; i++) { + await customProvider.enqueue(testQueue, { + data: { message: `scheduled-${i}` }, + scheduledAt: Date.now() + 10, // Very short delay + }); + } + + // Register a handler to start polling + await customProvider.dequeue(testQueue, { + id: "test-handler", + handler: async () => {}, + }); + + // Wait a tiny bit then disconnect during processing + await new Promise((resolve) => setTimeout(resolve, 30)); + await customProvider.disconnect(); + + // Should not throw - disconnect handled gracefully + expect(customProvider.taskHandlers.size).toBe(0); + }); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3da2c46..3c5894f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -59,6 +59,9 @@ importers: amqplib: specifier: ^0.10.9 version: 0.10.9 + hookified: + specifier: ^1.15.1 + version: 1.15.1 qified: specifier: workspace:^ version: link:../qified From 9c95f427add79c38dc8d601c333f138eb5ee1ce1 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 10:19:45 -0800 Subject: [PATCH 02/13] updating tests --- packages/rabbitmq/README.md | 142 +++++++++- packages/rabbitmq/src/task.ts | 61 ++-- packages/rabbitmq/test/task-edge.test.ts | 342 +++++++++++++++++++++++ packages/rabbitmq/test/task.test.ts | 288 +++++++++++-------- 4 files changed, 683 insertions(+), 150 deletions(-) create mode 100644 packages/rabbitmq/test/task-edge.test.ts diff --git a/packages/rabbitmq/README.md b/packages/rabbitmq/README.md index 8436b26..6b69e98 100644 --- a/packages/rabbitmq/README.md +++ b/packages/rabbitmq/README.md @@ -1,13 +1,15 @@ # @qified/rabbitmq -RabbitMQ message provider for [Qified](https://github.com/jaredwray/qified). +RabbitMQ message and task provider for [Qified](https://github.com/jaredwray/qified). -This package implements a message provider backed by RabbitMQ using queues for publish and subscribe operations. +This package implements a message provider and a task provider backed by RabbitMQ. The message provider uses queues for publish/subscribe operations, and the task provider adds reliable task queue processing with retries, timeouts, scheduled tasks, and dead-letter queues. ## Table of Contents - [Installation](#installation) - [Usage with Qified](#usage-with-qified) + - [Message Provider](#message-provider) + - [Task Provider](#task-provider) - [API](#api) - [RabbitMqMessageProviderOptions](#rabbitmqmessageprovideroptions) - [defaultRabbitMqUri](#defaultrabbitmquri) @@ -18,6 +20,17 @@ This package implements a message provider backed by RabbitMQ using queues for p - [unsubscribe](#unsubscribe) - [disconnect](#disconnect) - [createQified](#createqified) + - [RabbitMqTaskProviderOptions](#rabbitmqtaskprovideroptions) + - [RabbitMqTaskProvider](#rabbitmqtaskprovider) + - [constructor](#constructor-1) + - [connect](#connect) + - [enqueue](#enqueue) + - [dequeue](#dequeue) + - [unsubscribe](#unsubscribe-1) + - [disconnect](#disconnect-1) + - [getDeadLetterTasks](#getdeadlettertasks) + - [getQueueStats](#getqueuestats) + - [clearQueue](#clearqueue) - [Contributing](#contributing) - [License](#license) @@ -29,6 +42,8 @@ pnpm add @qified/rabbitmq ## Usage with Qified +### Message Provider + ```ts import { createQified } from "@qified/rabbitmq"; import type { Message } from "qified"; @@ -46,6 +61,52 @@ await qified.publish("example-topic", { id: "1", data: "Hello from RabbitMQ!" }) await qified.disconnect(); ``` +### Task Provider + +```ts +import { RabbitMqTaskProvider } from "@qified/rabbitmq"; + +const taskProvider = new RabbitMqTaskProvider({ uri: "amqp://localhost:5672" }); + +// Enqueue a task +await taskProvider.enqueue("my-queue", { + data: { action: "send-email", to: "user@example.com" }, +}); + +// Enqueue a scheduled task (processed after the given time) +await taskProvider.enqueue("my-queue", { + data: { action: "send-reminder" }, + scheduledAt: Date.now() + 60_000, // 1 minute from now +}); + +// Dequeue and process tasks +await taskProvider.dequeue("my-queue", { + id: "email-handler", + handler: async (task, ctx) => { + console.log("Processing task:", task.data); + + // Access attempt metadata + console.log(`Attempt ${ctx.metadata().attempt} of ${ctx.metadata().maxRetries}`); + + // Extend the deadline if needed + ctx.extend(10_000); + + // Acknowledge the task on success + ctx.ack(); + }, +}); + +// Get queue statistics +const stats = await taskProvider.getQueueStats("my-queue"); +console.log(stats); // { waiting, processing, deadLetter, scheduled } + +// Get dead-letter tasks for inspection +const deadLetters = await taskProvider.getDeadLetterTasks("my-queue"); + +// Clean up +await taskProvider.disconnect(); +``` + ## API ### RabbitMqMessageProviderOptions @@ -90,6 +151,83 @@ Cancels all subscriptions and closes the underlying RabbitMQ connection. Convenience factory that returns a `Qified` instance configured with `RabbitMqMessageProvider`. +### RabbitMqTaskProviderOptions + +Configuration options for the RabbitMQ task provider. Extends `TaskProviderOptions`. + +- `uri?`: RabbitMQ connection URI. Defaults to `"amqp://localhost:5672"`. +- `id?`: Unique identifier for this provider instance. Defaults to `"@qified/rabbitmq-task"`. +- `timeout?`: Default timeout in milliseconds for task processing. Defaults to `30000`. +- `retries?`: Default maximum retry attempts before a task is moved to the dead-letter queue. Defaults to `3`. +- `pollInterval?`: Interval in milliseconds to poll for scheduled tasks. Defaults to `1000`. +- `reconnectTimeInSeconds?`: Time in seconds to wait before reconnecting after connection loss. Set to `0` to disable. Defaults to `5`. + +### RabbitMqTaskProvider + +Implements the `TaskProvider` interface using RabbitMQ durable queues for reliable task processing. Extends `Hookified` for event emission. Features include: + +- Automatic retries with configurable max attempts +- Task timeouts with automatic rejection on expiry +- Scheduled tasks (processed after a given timestamp) +- Dead-letter queue for failed tasks +- Automatic reconnection on connection loss + +#### constructor(options?: RabbitMqTaskProviderOptions) + +Creates a new task provider instance. + +#### connect() + +Explicitly connects to RabbitMQ. Called automatically on first `enqueue` or `dequeue` if not called manually. + +#### enqueue(queue: string, taskData: EnqueueTask) + +Enqueues a task to the specified queue. Returns a `Promise`. + +Task data options: + +- `data`: The task payload (any serializable value). +- `id?`: Custom task ID. Auto-generated if omitted. +- `timeout?`: Per-task timeout override in milliseconds. +- `retries?`: Per-task max retry override. +- `scheduledAt?`: Unix timestamp (ms). Task will not be processed until this time. +- `priority?`: Task priority value. + +#### dequeue(queue: string, handler: TaskHandler) + +Registers a handler to process tasks from the specified queue. The handler receives a `Task` and a `TaskContext`. + +`TaskContext` methods: + +- `ack()`: Acknowledge the task (removes it from the queue). +- `reject(requeue?: boolean)`: Reject the task. If `requeue` is `true` (default), re-enqueues for retry. After max retries, moves to dead-letter queue. +- `extend(ms: number)`: Extend the processing deadline by the given milliseconds. +- `metadata()`: Returns `{ attempt, maxRetries }` for the current task. + +#### unsubscribe(queue: string, id?: string) + +Removes a handler by id, or all handlers for the queue if no id is provided. + +#### disconnect(force?: boolean) + +Disconnects from RabbitMQ and cleans up all consumers, timers, and in-memory state. If `force` is `true`, skips graceful channel close. + +#### getDeadLetterTasks(queue: string) + +Returns an array of tasks that have been moved to the dead-letter queue for the given queue. + +#### getQueueStats(queue: string) + +Returns statistics for the given queue: + +```ts +{ waiting: number; processing: number; deadLetter: number; scheduled: number } +``` + +#### clearQueue(queue: string) + +Purges all tasks from the queue and its dead-letter queue, and clears all in-memory tracking state. + ## Contributing Contributions are welcome! Please read the [CONTRIBUTING.md](../../CONTRIBUTING.md) and [CODE_OF_CONDUCT.md](../../CODE_OF_CONDUCT.md) for details on our process. diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 53c9fcb..330e2e9 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -1,11 +1,6 @@ import { Buffer } from "node:buffer"; import { randomUUID } from "node:crypto"; -import { - type Channel, - type ChannelModel, - type ConsumeMessage, - connect, -} from "amqplib"; +import { type Channel, type ChannelModel, connect } from "amqplib"; import { Hookified } from "hookified"; import type { EnqueueTask, @@ -78,7 +73,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { // In-memory tracking for attempt counts: taskId -> count private readonly _attemptCounts: Map = new Map(); - // Scheduled tasks: queue -> ScheduledEntry[] + // Scheduled tasks: queue -> entries private readonly _scheduledTasks: Map< string, Array<{ task: Task; scheduledAt: number }> @@ -343,8 +338,24 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { return; } + // Shared AMQP message state to prevent double ack/nack + let amqpHandled = false; + const ackAmqp = () => { + if (!amqpHandled) { + amqpHandled = true; + channel.ack(message_); + } + }; + + const nackAmqp = () => { + if (!amqpHandled) { + amqpHandled = true; + channel.nack(message_, false, false); + } + }; + for (const handler of handlers) { - await this.processTask(queue, task, message_, handler); + await this.processTask(queue, task, handler, ackAmqp, nackAmqp); } }, { noAck: false }, @@ -359,8 +370,9 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { private async processTask( queue: string, task: Task, - amqpMessage: ConsumeMessage, handler: TaskHandler, + ackAmqp: () => void, + nackAmqp: () => void, ): Promise { const maxRetries = task.maxRetries ?? this._retries; const timeout = task.timeout ?? this._timeout; @@ -373,8 +385,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { let rejected = false; let timeoutHandle: ReturnType | undefined; - const channel = await this.getChannel(); - const context: TaskContext = { ack: async () => { if (acknowledged || rejected || !this._active) { @@ -383,7 +393,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { acknowledged = true; try { - channel.ack(amqpMessage); + ackAmqp(); this.cleanupTask(queue, task.id); } catch (error) { /* v8 ignore next -- @preserve */ @@ -397,7 +407,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { rejected = true; try { - channel.nack(amqpMessage, false, false); + nackAmqp(); if (requeue && currentAttempt < maxRetries) { await this.publishTask(queue, task); @@ -713,6 +723,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { /** * Gets the current state of a queue. + * Uses assertQueue to safely check queue state without risking channel closure. * @param queue The queue name * @returns Queue statistics */ @@ -725,10 +736,13 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { let waiting = 0; try { const channel = await this.getChannel(); - const queueInfo = await channel.checkQueue(queue); + // Use assertQueue (idempotent) instead of checkQueue to avoid + // channel closure when queue doesn't exist + const queueInfo = await channel.assertQueue(queue, { durable: true }); waiting = queueInfo.messageCount; } catch { - // Queue may not exist yet + /* v8 ignore next -- @preserve */ + // Queue assertion failed } const deadLetter = this._deadLetterTasks.get(queue)?.length ?? 0; @@ -744,21 +758,18 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { /** * Clears all data for a queue. Useful for testing. + * Uses assertQueue before purgeQueue to avoid channel closure on non-existent queues. * @param queue The queue name to clear */ public async clearQueue(queue: string): Promise { const channel = await this.getChannel(); - try { - await channel.purgeQueue(queue); - } catch { - /* queue may not exist */ - } - try { - await channel.purgeQueue(`${queue}:dead-letter`); - } catch { - /* queue may not exist */ - } + // Assert queues first to ensure they exist (prevents channel error on purge) + await channel.assertQueue(queue, { durable: true }); + await channel.purgeQueue(queue); + + await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); + await channel.purgeQueue(`${queue}:dead-letter`); this._deadLetterTasks.delete(queue); this._scheduledTasks.delete(queue); diff --git a/packages/rabbitmq/test/task-edge.test.ts b/packages/rabbitmq/test/task-edge.test.ts new file mode 100644 index 0000000..59eb3b1 --- /dev/null +++ b/packages/rabbitmq/test/task-edge.test.ts @@ -0,0 +1,342 @@ +import { EventEmitter } from "node:events"; +import type { Channel, ChannelModel } from "amqplib"; +import { + afterEach, + beforeEach, + describe, + expect, + type Mock, + test, + vi, +} from "vitest"; +import { RabbitMqTaskProvider } from "../src/index.js"; + +// Mock amqplib +vi.mock("amqplib", () => ({ + connect: vi.fn(), +})); + +function createMockChannel(): Channel { + return { + assertQueue: vi + .fn() + .mockResolvedValue({ queue: "test", messageCount: 0, consumerCount: 0 }), + sendToQueue: vi.fn().mockReturnValue(true), + consume: vi.fn().mockResolvedValue({ consumerTag: `ctag-${Date.now()}` }), + cancel: vi.fn().mockResolvedValue({}), + ack: vi.fn(), + nack: vi.fn(), + prefetch: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + purgeQueue: vi.fn().mockResolvedValue({ messageCount: 0 }), + } as unknown as Channel; +} + +function createMockConnection(channel: Channel): ChannelModel & EventEmitter { + const emitter = new EventEmitter(); + return Object.assign(emitter, { + createChannel: vi.fn().mockResolvedValue(channel), + close: vi.fn().mockResolvedValue(undefined), + }) as unknown as ChannelModel & EventEmitter; +} + +let mockConnect: Mock; + +beforeEach(async () => { + const amqplib = await import("amqplib"); + mockConnect = amqplib.connect as unknown as Mock; + mockConnect.mockReset(); +}); + +afterEach(() => { + vi.useRealTimers(); +}); + +describe("RabbitMqTaskProvider (edge cases requiring mocks)", () => { + test("should schedule reconnect when connection closes unexpectedly", async () => { + vi.useFakeTimers(); + + const channel1 = createMockChannel(); + const connection1 = createMockConnection(channel1); + const channel2 = createMockChannel(); + const connection2 = createMockConnection(channel2); + + mockConnect + .mockResolvedValueOnce(connection1) + .mockResolvedValueOnce(connection2); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + expect(mockConnect).toHaveBeenCalledTimes(1); + + // Simulate unexpected connection loss — triggers _scheduleReconnect (line 179) + connection1.emit("close"); + + // Advance timer so reconnect fires (lines 212-215) + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(2); + + await provider.disconnect(true); + }); + + test("should retry reconnection on failure then succeed", async () => { + vi.useFakeTimers(); + + const channel1 = createMockChannel(); + const connection1 = createMockConnection(channel1); + const channel3 = createMockChannel(); + const connection3 = createMockConnection(channel3); + + mockConnect + .mockResolvedValueOnce(connection1) + .mockRejectedValueOnce(new Error("Connection refused")) + .mockResolvedValueOnce(connection3); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + expect(mockConnect).toHaveBeenCalledTimes(1); + + // Simulate connection loss + connection1.emit("close"); + + // First reconnect attempt — fails (line 242-246) + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(2); + + // Second reconnect attempt — succeeds (lines 228-241) + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(3); + + await provider.disconnect(true); + }); + + test("should not reconnect if reconnectTimeInSeconds is 0", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValueOnce(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 0, + }); + await provider.connect(); + + // Simulate connection loss — _scheduleReconnect should bail out (line 205) + connection.emit("close"); + + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(5000); + expect(mockConnect).not.toHaveBeenCalled(); + + await provider.disconnect(true); + }); + + test("should not reconnect if closing flag is set when timer fires", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValue(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Simulate connection loss — schedules reconnect + connection.emit("close"); + + // Set _closing before the timer fires (line 222-223) + (provider as unknown as { _closing: boolean })._closing = true; + + try { + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(1500); + expect(mockConnect).not.toHaveBeenCalled(); + } finally { + (provider as unknown as { _closing: boolean })._closing = false; + } + }); + + test("should not schedule reconnect if already reconnecting", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValue(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Set _reconnecting to true — _scheduleReconnect should bail out (line 206) + (provider as unknown as { _reconnecting: boolean })._reconnecting = true; + + connection.emit("close"); + + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(5000); + expect(mockConnect).not.toHaveBeenCalled(); + + (provider as unknown as { _reconnecting: boolean })._reconnecting = false; + await provider.disconnect(true); + }); + + test("should re-establish consumers on reconnect", async () => { + vi.useFakeTimers(); + + const channel1 = createMockChannel(); + const connection1 = createMockConnection(channel1); + const channel2 = createMockChannel(); + const connection2 = createMockConnection(channel2); + + mockConnect + .mockResolvedValueOnce(connection1) + .mockResolvedValueOnce(connection2); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Register a handler so there's something to re-establish (lines 237-241) + provider.taskHandlers.set("my-queue", [ + { id: "h1", handler: async () => {} }, + ]); + + // Simulate connection loss + connection1.emit("close"); + + // Reconnect fires and re-establishes consumers + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(2); + // Should have called consume on the new channel for the queue + expect(channel2.consume).toHaveBeenCalledWith( + "my-queue", + expect.any(Function), + { noAck: false }, + ); + + await provider.disconnect(true); + }); + + test("should handle null message in consumer callback", async () => { + const channel = createMockChannel(); + const connection = createMockConnection(channel); + mockConnect.mockResolvedValue(connection); + + let consumerCallback: (msg: unknown) => Promise; + (channel.consume as Mock).mockImplementation( + async (_queue: string, cb: (msg: unknown) => Promise) => { + consumerCallback = cb; + return { consumerTag: "ctag-test" }; + }, + ); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 0, + }); + await provider.connect(); + + // Register a handler and set up consumer + const tasks: unknown[] = []; + provider.taskHandlers.set("test-q", [ + { + id: "h1", + handler: async (task) => { + tasks.push(task); + }, + }, + ]); + + const ch = (provider as unknown as { _channel: Channel })._channel; + // Manually call _setupConsumer + await ( + provider as unknown as { + _setupConsumer(ch: Channel, q: string): Promise; + } + )._setupConsumer(ch, "test-q"); + + // Send null message — should return early (line 323-324) + // biome-ignore lint/style/noNonNullAssertion: set by mock + await consumerCallback!(null); + expect(tasks).toHaveLength(0); + expect(channel.ack).not.toHaveBeenCalled(); + expect(channel.nack).not.toHaveBeenCalled(); + + await provider.disconnect(true); + }); + + test("should clear reconnect timer on disconnect", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValue(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Simulate connection loss — schedules reconnect timer + connection.emit("close"); + + // Verify the timer is set (lines 657-658) + const internal = provider as unknown as { + _reconnectTimer: ReturnType | undefined; + }; + expect(internal._reconnectTimer).toBeDefined(); + + // Disconnect should clear the timer + await provider.disconnect(true); + expect(internal._reconnectTimer).toBeUndefined(); + + // No reconnect should have happened + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(5000); + expect(mockConnect).not.toHaveBeenCalled(); + }); + + test("should handle connection error event gracefully", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + const channel2 = createMockChannel(); + const connection2 = createMockConnection(channel2); + + mockConnect + .mockResolvedValueOnce(connection) + .mockResolvedValueOnce(connection2); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + expect(connection.listenerCount("error")).toBe(1); + expect(connection.listenerCount("close")).toBe(1); + + // Emit error — handler is a no-op but should not throw + connection.emit("error", new Error("Socket closed")); + + // Close event triggers reconnection + connection.emit("close"); + + await vi.advanceTimersByTimeAsync(1500); + expect(mockConnect).toHaveBeenCalledTimes(2); + + await provider.disconnect(true); + }); +}); diff --git a/packages/rabbitmq/test/task.test.ts b/packages/rabbitmq/test/task.test.ts index d10dbb0..8aeacd3 100644 --- a/packages/rabbitmq/test/task.test.ts +++ b/packages/rabbitmq/test/task.test.ts @@ -28,9 +28,26 @@ async function waitFor( throw new Error(`waitFor timed out after ${timeoutMs}ms`); } +let queueCounter = 0; +function uniqueQueue(): string { + queueCounter++; + return `test-queue-${Date.now()}-${queueCounter}`; +} + describe("RabbitMqTaskProvider", () => { let provider: RabbitMqTaskProvider; - const testQueue = `test-queue-${Date.now()}`; + const testQueue = uniqueQueue(); + + // Track custom providers for cleanup + const customProviders: RabbitMqTaskProvider[] = []; + + async function createCustomProvider( + options: ConstructorParameters[0] = {}, + ): Promise { + const p = new RabbitMqTaskProvider(options); + customProviders.push(p); + return p; + } describe("hookified inheritance", () => { test("should extend Hookified and support event emission", () => { @@ -41,9 +58,10 @@ describe("RabbitMqTaskProvider", () => { }); test("should emit error events when RabbitMQ operations fail in ack", async () => { - const customProvider = new RabbitMqTaskProvider(); + const customProvider = await createCustomProvider(); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + const q = uniqueQueue(); + await customProvider.clearQueue(q); const errors: Error[] = []; customProvider.on("error", (error: Error) => { @@ -57,20 +75,20 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); // Wait for task to be processed await new Promise((resolve) => setTimeout(resolve, 200)); // Clean up - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); await customProvider.disconnect(); // Create a new provider to verify error listener support - const p2 = new RabbitMqTaskProvider(); + const p2 = await createCustomProvider(); await p2.connect(); - await p2.clearQueue(testQueue); + await p2.clearQueue(q); const errors2: Error[] = []; p2.on("error", (error: Error) => { @@ -79,7 +97,7 @@ describe("RabbitMqTaskProvider", () => { expect(typeof p2.on).toBe("function"); - await p2.clearQueue(testQueue); + await p2.clearQueue(q); await p2.disconnect(); }); @@ -102,6 +120,17 @@ describe("RabbitMqTaskProvider", () => { }); afterEach(async () => { + // Clean up custom providers first + for (const p of customProviders) { + try { + await p.disconnect(true); + } catch { + /* ignore */ + } + } + + customProviders.length = 0; + await provider.clearQueue(testQueue); await provider.disconnect(); }); @@ -116,30 +145,30 @@ describe("RabbitMqTaskProvider", () => { }); test("should initialize with custom id", () => { - const customProvider = new RabbitMqTaskProvider({ id: "custom-id" }); - expect(customProvider.id).toBe("custom-id"); + const p = new RabbitMqTaskProvider({ id: "custom-id" }); + expect(p.id).toBe("custom-id"); }); test("should initialize with custom timeout", () => { - const customProvider = new RabbitMqTaskProvider({ timeout: 5000 }); - expect(customProvider.timeout).toBe(5000); + const p = new RabbitMqTaskProvider({ timeout: 5000 }); + expect(p.timeout).toBe(5000); }); test("should initialize with custom retries", () => { - const customProvider = new RabbitMqTaskProvider({ retries: 5 }); - expect(customProvider.retries).toBe(5); + const p = new RabbitMqTaskProvider({ retries: 5 }); + expect(p.retries).toBe(5); }); test("should initialize with all custom options", () => { - const customProvider = new RabbitMqTaskProvider({ + const p = new RabbitMqTaskProvider({ id: "custom-id", timeout: 5000, retries: 5, pollInterval: 500, }); - expect(customProvider.id).toBe("custom-id"); - expect(customProvider.timeout).toBe(5000); - expect(customProvider.retries).toBe(5); + expect(p.id).toBe("custom-id"); + expect(p.timeout).toBe(5000); + expect(p.retries).toBe(5); }); test("should fail to connect when RabbitMQ is not available", async () => { @@ -184,7 +213,6 @@ describe("RabbitMqTaskProvider", () => { expect(taskId).toBeDefined(); expect(typeof taskId).toBe("string"); - // UUID format: task-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx expect(taskId).toMatch( /^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, ); @@ -347,9 +375,10 @@ describe("RabbitMqTaskProvider", () => { describe("task rejection and retry", () => { test("should reject and requeue task on failure", async () => { - const customProvider = new RabbitMqTaskProvider({ pollInterval: 100 }); + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ pollInterval: 100 }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); let attemptCount = 0; const handler: TaskHandler = { @@ -357,65 +386,62 @@ describe("RabbitMqTaskProvider", () => { handler: async (_task: Task, ctx: TaskContext) => { attemptCount++; if (attemptCount === 1) { - await ctx.reject(true); // Requeue + await ctx.reject(true); } else { await ctx.ack(); } }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); await waitFor(() => attemptCount > 1); expect(attemptCount).toBeGreaterThan(1); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); test("should move to dead-letter queue after max retries", async () => { - const customProvider = new RabbitMqTaskProvider({ + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ retries: 2, pollInterval: 100, }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); let attemptCount = 0; - const handler: TaskHandler = { id: "test-handler", handler: async (_task: Task, ctx: TaskContext) => { attemptCount++; - await ctx.reject(true); // Always reject + await ctx.reject(true); }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); await waitFor(async () => { - const dlq = await customProvider.getDeadLetterTasks(testQueue); + const dlq = await customProvider.getDeadLetterTasks(q); return dlq.length === 1; }); - const deadLetterTasks = - await customProvider.getDeadLetterTasks(testQueue); + const deadLetterTasks = await customProvider.getDeadLetterTasks(q); expect(deadLetterTasks.length).toBe(1); expect(deadLetterTasks[0].data).toEqual({ message: "test" }); expect(attemptCount).toBe(2); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); test("should move to dead-letter queue when reject with requeue=false", async () => { const handler: TaskHandler = { id: "test-handler", handler: async (_task: Task, ctx: TaskContext) => { - await ctx.reject(false); // Don't requeue + await ctx.reject(false); }, }; @@ -454,12 +480,13 @@ describe("RabbitMqTaskProvider", () => { }); test("should auto-reject task on handler error", async () => { - const customProvider = new RabbitMqTaskProvider({ + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ retries: 3, pollInterval: 100, }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); let attemptCount = 0; const handler: TaskHandler = { @@ -470,102 +497,108 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); await waitFor(async () => { - const dlq = await customProvider.getDeadLetterTasks(testQueue); + const dlq = await customProvider.getDeadLetterTasks(q); return dlq.length === 1; }); expect(attemptCount).toBe(3); - const deadLetterTasks = - await customProvider.getDeadLetterTasks(testQueue); + const deadLetterTasks = await customProvider.getDeadLetterTasks(q); expect(deadLetterTasks.length).toBe(1); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); }); describe("task timeout", () => { test("should timeout task after configured timeout", async () => { - const customProvider = new RabbitMqTaskProvider({ + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ timeout: 100, + retries: 1, pollInterval: 50, }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); + let handlerCalled = false; const handler: TaskHandler = { id: "test-handler", handler: async () => { + handlerCalled = true; // Simulate long-running task await new Promise((resolve) => setTimeout(resolve, 500)); }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); - await new Promise((resolve) => setTimeout(resolve, 300)); + // Wait for the task to be processed and eventually end up in DLQ + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return dlq.length > 0; + }, 10_000); - // Task should have timed out and be requeued or in DLQ - const stats = await customProvider.getQueueStats(testQueue); - const dlq = await customProvider.getDeadLetterTasks(testQueue); - const taskTimedOut = stats.waiting > 0 || dlq.length > 0; - expect(taskTimedOut).toBe(true); + expect(handlerCalled).toBe(true); + const dlq = await customProvider.getDeadLetterTasks(q); + expect(dlq.length).toBeGreaterThan(0); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); test("should use task-specific timeout over provider default", async () => { - const customProvider = new RabbitMqTaskProvider({ + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ timeout: 5000, + retries: 1, pollInterval: 50, }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); + let handlerCalled = false; const handler: TaskHandler = { id: "test-handler", - handler: async (_task: Task, ctx: TaskContext) => { + handler: async () => { + handlerCalled = true; // Simulate long-running task - await new Promise((resolve) => setTimeout(resolve, 200)); - await ctx.ack(); + await new Promise((resolve) => setTimeout(resolve, 500)); }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" }, timeout: 50, // Task-specific timeout (shorter than handler duration) }); + // Wait for the task to end up in DLQ after retries exhaust await waitFor(async () => { - const s = await customProvider.getQueueStats(testQueue); - const dlq = await customProvider.getDeadLetterTasks(testQueue); - return s.waiting + dlq.length > 0; - }); + const dlq = await customProvider.getDeadLetterTasks(q); + return dlq.length > 0; + }, 10_000); - const stats = await customProvider.getQueueStats(testQueue); - const dlq = await customProvider.getDeadLetterTasks(testQueue); - expect(stats.waiting + dlq.length).toBeGreaterThan(0); + expect(handlerCalled).toBe(true); + const dlq = await customProvider.getDeadLetterTasks(q); + expect(dlq.length).toBeGreaterThan(0); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); }); describe("task context extend", () => { test("should extend task deadline", async () => { - const customProvider = new RabbitMqTaskProvider({ + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ timeout: 200, pollInterval: 50, }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); let completed = false; @@ -580,17 +613,16 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); await waitFor(() => completed); expect(completed).toBe(true); - const stats = await customProvider.getQueueStats(testQueue); + const stats = await customProvider.getQueueStats(q); expect(stats.waiting).toBe(0); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); test("should not extend after acknowledgment", async () => { @@ -635,9 +667,10 @@ describe("RabbitMqTaskProvider", () => { }); test("should increment attempt on retry", async () => { - const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ pollInterval: 50 }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); const attempts: number[] = []; const handler: TaskHandler = { @@ -652,8 +685,8 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); await waitFor(() => attempts.length > 1); @@ -661,8 +694,7 @@ describe("RabbitMqTaskProvider", () => { expect(attempts[0]).toBe(1); expect(attempts[1]).toBe(2); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); test("should use task-specific maxRetries", async () => { @@ -711,9 +743,10 @@ describe("RabbitMqTaskProvider", () => { }); test("should process task after scheduledAt time via polling", async () => { - const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ pollInterval: 50 }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); let processed = false; const handler: TaskHandler = { @@ -723,8 +756,8 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" }, scheduledAt: Date.now() + 100, // Schedule 100ms in future }); @@ -738,8 +771,7 @@ describe("RabbitMqTaskProvider", () => { expect(processed).toBe(true); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); }); @@ -838,14 +870,15 @@ describe("RabbitMqTaskProvider", () => { }); test("should force disconnect", async () => { - const p = new RabbitMqTaskProvider(); + const p = await createCustomProvider(); await p.connect(); const handler: TaskHandler = { id: "test-handler", handler: async () => {}, }; - await p.dequeue(testQueue, handler); + const q = uniqueQueue(); + await p.dequeue(q, handler); // Force disconnect should skip graceful close await p.disconnect(true); @@ -893,16 +926,20 @@ describe("RabbitMqTaskProvider", () => { }); test("should return correct stats for queue with waiting tasks", async () => { - await provider.enqueue(testQueue, { data: { message: "test1" } }); - await provider.enqueue(testQueue, { data: { message: "test2" } }); + // Use a fresh queue with no consumer so messages stay as "ready" + const q = uniqueQueue(); + await provider.enqueue(q, { data: { message: "test1" } }); + await provider.enqueue(q, { data: { message: "test2" } }); // Small delay for RabbitMQ to process await new Promise((resolve) => setTimeout(resolve, 50)); - const stats = await provider.getQueueStats(testQueue); + const stats = await provider.getQueueStats(q); expect(stats.waiting).toBe(2); expect(stats.processing).toBe(0); expect(stats.deadLetter).toBe(0); + + await provider.clearQueue(q); }); test("should return correct stats with scheduled tasks", async () => { @@ -971,8 +1008,8 @@ describe("RabbitMqTaskProvider", () => { describe("multiple queues", () => { test("should maintain separate queues", async () => { - const queue1 = `${testQueue}-1`; - const queue2 = `${testQueue}-2`; + const queue1 = uniqueQueue(); + const queue2 = uniqueQueue(); let queue1Processed = false; let queue2Processed = false; @@ -1081,9 +1118,10 @@ describe("RabbitMqTaskProvider", () => { describe("edge cases", () => { test("should handle disconnect during task processing", async () => { - const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ pollInterval: 50 }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); const handler: TaskHandler = { id: "test-handler", @@ -1093,8 +1131,8 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); // Wait just enough for processing to start await new Promise((resolve) => setTimeout(resolve, 50)); @@ -1107,11 +1145,12 @@ describe("RabbitMqTaskProvider", () => { }); test("should handle polling loop exit when inactive", async () => { - const customProvider = new RabbitMqTaskProvider({ pollInterval: 20 }); + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ pollInterval: 20 }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); - await customProvider.dequeue(testQueue, { + await customProvider.dequeue(q, { id: "test-handler", handler: async () => {}, }); @@ -1130,12 +1169,14 @@ describe("RabbitMqTaskProvider", () => { }); test("should handle extended timeout expiration triggering reject", async () => { - const customProvider = new RabbitMqTaskProvider({ + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ timeout: 5000, + retries: 1, pollInterval: 50, }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); let extendCalled = false; const handler: TaskHandler = { @@ -1150,35 +1191,36 @@ describe("RabbitMqTaskProvider", () => { }, }; - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); - await waitFor(() => extendCalled); - - // Wait for extended timeout to fire - await new Promise((resolve) => setTimeout(resolve, 300)); + // Wait for processing and extended timeout to eventually move to DLQ + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return extendCalled && dlq.length > 0; + }, 10_000); expect(extendCalled).toBe(true); - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); + await customProvider.clearQueue(q); }); test("should handle disconnect during scheduled task processing", async () => { - const customProvider = new RabbitMqTaskProvider({ pollInterval: 50 }); + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ pollInterval: 50 }); await customProvider.connect(); - await customProvider.clearQueue(testQueue); + await customProvider.clearQueue(q); // Enqueue multiple scheduled tasks for (let i = 0; i < 5; i++) { - await customProvider.enqueue(testQueue, { + await customProvider.enqueue(q, { data: { message: `scheduled-${i}` }, scheduledAt: Date.now() + 10, // Very short delay }); } // Register a handler to start polling - await customProvider.dequeue(testQueue, { + await customProvider.dequeue(q, { id: "test-handler", handler: async () => {}, }); From 5a7615487eeacc9e63006549225b1d32ac6b8eae Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 10:25:37 -0800 Subject: [PATCH 03/13] Update task.ts --- packages/rabbitmq/src/task.ts | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 330e2e9..13d5062 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -85,6 +85,9 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { // Track queue -> set of taskIds for cleanup private readonly _queueTaskIds: Map> = new Map(); + // Track currently processing tasks: queue -> set of taskIds + private readonly _processingTasks: Map> = new Map(); + /** * Creates a new RabbitMQ task provider instance. * @param options Configuration options for the provider @@ -338,6 +341,13 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { return; } + // Track as processing + if (!this._processingTasks.has(queue)) { + this._processingTasks.set(queue, new Set()); + } + + this._processingTasks.get(queue)?.add(task.id); + // Shared AMQP message state to prevent double ack/nack let amqpHandled = false; const ackAmqp = () => { @@ -357,6 +367,9 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { for (const handler of handlers) { await this.processTask(queue, task, handler, ackAmqp, nackAmqp); } + + // Remove from processing + this._processingTasks.get(queue)?.delete(task.id); }, { noAck: false }, ); @@ -670,6 +683,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { this._scheduledTasks.clear(); this._attemptCounts.clear(); this._queueTaskIds.clear(); + this._processingTasks.clear(); // Cancel consumers and close channel/connection if (this._channel) { @@ -725,7 +739,8 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { * Gets the current state of a queue. * Uses assertQueue to safely check queue state without risking channel closure. * @param queue The queue name - * @returns Queue statistics + * @returns Queue statistics: `waiting` (ready messages in RabbitMQ), `processing` (tasks + * currently being handled by this provider instance), `deadLetter`, and `scheduled`. */ public async getQueueStats(queue: string): Promise<{ waiting: number; @@ -745,12 +760,13 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { // Queue assertion failed } + const processing = this._processingTasks.get(queue)?.size ?? 0; const deadLetter = this._deadLetterTasks.get(queue)?.length ?? 0; const scheduled = this._scheduledTasks.get(queue)?.length ?? 0; return { waiting, - processing: 0, + processing, deadLetter, scheduled, }; @@ -773,6 +789,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { this._deadLetterTasks.delete(queue); this._scheduledTasks.delete(queue); + this._processingTasks.delete(queue); // Clear task data for this queue const taskIds = this._queueTaskIds.get(queue); From e9c0cee714ae2a756c12dc4410542f64e1a6a211 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 10:44:47 -0800 Subject: [PATCH 04/13] removing scheduled tasks for compatibility --- packages/qified/src/memory/task.ts | 8 -- packages/qified/src/types.ts | 7 -- packages/qified/test/memory-task.test.ts | 82 --------------- packages/rabbitmq/README.md | 15 +-- packages/rabbitmq/src/index.ts | 1 - packages/rabbitmq/src/task.ts | 123 +---------------------- packages/rabbitmq/test/task.test.ts | 117 +-------------------- packages/redis/src/task.ts | 80 ++------------- packages/redis/test/task.test.ts | 106 +------------------ 9 files changed, 21 insertions(+), 518 deletions(-) diff --git a/packages/qified/src/memory/task.ts b/packages/qified/src/memory/task.ts index b1e154d..fe623ef 100644 --- a/packages/qified/src/memory/task.ts +++ b/packages/qified/src/memory/task.ts @@ -223,14 +223,6 @@ export class MemoryTaskProvider implements TaskProvider { continue; } - // Check if task is scheduled for later - if ( - queuedTask.task.scheduledAt && - queuedTask.task.scheduledAt > Date.now() - ) { - continue; - } - // Mark as processing queuedTask.processing = true; processingSet.add(queuedTask.task.id); diff --git a/packages/qified/src/types.ts b/packages/qified/src/types.ts index 22d6180..13bbc4f 100644 --- a/packages/qified/src/types.ts +++ b/packages/qified/src/types.ts @@ -103,13 +103,6 @@ export type Task = { */ timestamp?: number; - /** - * Scheduled time for delayed task execution (milliseconds since epoch) - * If set, task won't be processed until this time - * @type {number} - */ - scheduledAt?: number; - /** * Headers for additional metadata * @type {Record} diff --git a/packages/qified/test/memory-task.test.ts b/packages/qified/test/memory-task.test.ts index 5e90542..8ea080c 100644 --- a/packages/qified/test/memory-task.test.ts +++ b/packages/qified/test/memory-task.test.ts @@ -107,7 +107,6 @@ describe("MemoryTaskProvider", () => { priority: 10, maxRetries: 5, timeout: 5000, - scheduledAt: Date.now() + 10000, }); expect(taskId).toBeDefined(); @@ -523,58 +522,6 @@ describe("MemoryTaskProvider", () => { }); }); - describe("scheduled tasks", () => { - test("should not process task before scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue("test-queue", handler); - await provider.enqueue("test-queue", { - data: { message: "test" }, - scheduledAt: Date.now() + 1000, // Schedule 1 second in future - }); - - await new Promise((resolve) => setTimeout(resolve, 100)); - - expect(processed).toBe(false); - const stats = provider.getQueueStats("test-queue"); - expect(stats.waiting).toBe(1); - }); - - test("should process task after scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue("test-queue", handler); - await provider.enqueue("test-queue", { - data: { message: "test" }, - scheduledAt: Date.now() + 50, // Schedule 50ms in future - }); - - // Task should not be processed yet - await new Promise((resolve) => setTimeout(resolve, 25)); - expect(processed).toBe(false); - - // Now enqueue another task to trigger processing of scheduled task - await provider.enqueue("test-queue", { data: { message: "trigger" } }); - - // Wait for scheduled task to be processed - await new Promise((resolve) => setTimeout(resolve, 100)); - - expect(processed).toBe(true); - }); - }); - describe("unsubscribe", () => { test("should unsubscribe specific handler by id", async () => { const handler1: TaskHandler = { @@ -798,35 +745,6 @@ describe("MemoryTaskProvider", () => { expect(stats.waiting).toBe(0); expect(stats.processing).toBe(0); }); - - test("should not process scheduled tasks after disconnect", async () => { - const customProvider = new MemoryTaskProvider(); - let taskProcessed = false; - - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - taskProcessed = true; - }, - }; - - await customProvider.dequeue("test-queue", handler); - - // Enqueue a task scheduled for the future - await customProvider.enqueue("test-queue", { - data: { message: "scheduled-task" }, - scheduledAt: Date.now() + 100, - }); - - // Disconnect before the scheduled time - await customProvider.disconnect(); - - // Wait past the scheduled time - await new Promise((resolve) => setTimeout(resolve, 150)); - - // Task should not have been processed - expect(taskProcessed).toBe(false); - }); }); describe("getDeadLetterTasks", () => { diff --git a/packages/rabbitmq/README.md b/packages/rabbitmq/README.md index 6b69e98..a4af112 100644 --- a/packages/rabbitmq/README.md +++ b/packages/rabbitmq/README.md @@ -2,7 +2,7 @@ RabbitMQ message and task provider for [Qified](https://github.com/jaredwray/qified). -This package implements a message provider and a task provider backed by RabbitMQ. The message provider uses queues for publish/subscribe operations, and the task provider adds reliable task queue processing with retries, timeouts, scheduled tasks, and dead-letter queues. +This package implements a message provider and a task provider backed by RabbitMQ. The message provider uses queues for publish/subscribe operations, and the task provider adds reliable task queue processing with retries, timeouts, and dead-letter queues. ## Table of Contents @@ -73,12 +73,6 @@ await taskProvider.enqueue("my-queue", { data: { action: "send-email", to: "user@example.com" }, }); -// Enqueue a scheduled task (processed after the given time) -await taskProvider.enqueue("my-queue", { - data: { action: "send-reminder" }, - scheduledAt: Date.now() + 60_000, // 1 minute from now -}); - // Dequeue and process tasks await taskProvider.dequeue("my-queue", { id: "email-handler", @@ -98,7 +92,7 @@ await taskProvider.dequeue("my-queue", { // Get queue statistics const stats = await taskProvider.getQueueStats("my-queue"); -console.log(stats); // { waiting, processing, deadLetter, scheduled } +console.log(stats); // { waiting, processing, deadLetter } // Get dead-letter tasks for inspection const deadLetters = await taskProvider.getDeadLetterTasks("my-queue"); @@ -159,7 +153,6 @@ Configuration options for the RabbitMQ task provider. Extends `TaskProviderOptio - `id?`: Unique identifier for this provider instance. Defaults to `"@qified/rabbitmq-task"`. - `timeout?`: Default timeout in milliseconds for task processing. Defaults to `30000`. - `retries?`: Default maximum retry attempts before a task is moved to the dead-letter queue. Defaults to `3`. -- `pollInterval?`: Interval in milliseconds to poll for scheduled tasks. Defaults to `1000`. - `reconnectTimeInSeconds?`: Time in seconds to wait before reconnecting after connection loss. Set to `0` to disable. Defaults to `5`. ### RabbitMqTaskProvider @@ -168,7 +161,6 @@ Implements the `TaskProvider` interface using RabbitMQ durable queues for reliab - Automatic retries with configurable max attempts - Task timeouts with automatic rejection on expiry -- Scheduled tasks (processed after a given timestamp) - Dead-letter queue for failed tasks - Automatic reconnection on connection loss @@ -190,7 +182,6 @@ Task data options: - `id?`: Custom task ID. Auto-generated if omitted. - `timeout?`: Per-task timeout override in milliseconds. - `retries?`: Per-task max retry override. -- `scheduledAt?`: Unix timestamp (ms). Task will not be processed until this time. - `priority?`: Task priority value. #### dequeue(queue: string, handler: TaskHandler) @@ -221,7 +212,7 @@ Returns an array of tasks that have been moved to the dead-letter queue for the Returns statistics for the given queue: ```ts -{ waiting: number; processing: number; deadLetter: number; scheduled: number } +{ waiting: number; processing: number; deadLetter: number } ``` #### clearQueue(queue: string) diff --git a/packages/rabbitmq/src/index.ts b/packages/rabbitmq/src/index.ts index f4e41a9..a21155b 100644 --- a/packages/rabbitmq/src/index.ts +++ b/packages/rabbitmq/src/index.ts @@ -8,7 +8,6 @@ import { } from "qified"; export { - defaultPollInterval, defaultRabbitMqTaskId, defaultRetries as defaultTaskRetries, defaultTimeout as defaultTaskTimeout, diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 13d5062..30f5f84 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -19,8 +19,6 @@ export type RabbitMqTaskProviderOptions = TaskProviderOptions & { uri?: string; /** Unique identifier for this provider instance. Defaults to "@qified/rabbitmq-task" */ id?: string; - /** Poll interval in milliseconds for checking scheduled tasks. Defaults to 1000 */ - pollInterval?: number; /** Time in seconds to wait before reconnecting. Set to 0 to disable. Defaults to 5 */ reconnectTimeInSeconds?: number; }; @@ -37,9 +35,6 @@ export const defaultTimeout = 30_000; /** Default maximum retry attempts */ export const defaultRetries = 3; -/** Default poll interval (1 second) */ -export const defaultPollInterval = 1000; - /** Default reconnect time in seconds */ export const defaultReconnectTimeInSeconds = 5; @@ -65,20 +60,11 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { private _connectionPromise: Promise | null = null; private _active = true; - private _pollInterval: number; - private readonly _pollTimers: Map> = - new Map(); private readonly _consumerTags: Map = new Map(); // In-memory tracking for attempt counts: taskId -> count private readonly _attemptCounts: Map = new Map(); - // Scheduled tasks: queue -> entries - private readonly _scheduledTasks: Map< - string, - Array<{ task: Task; scheduledAt: number }> - > = new Map(); - // Dead letter tasks for stats: queue -> Task[] private readonly _deadLetterTasks: Map = new Map(); @@ -98,7 +84,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { this._id = options.id ?? defaultRabbitMqTaskId; this._timeout = options.timeout ?? defaultTimeout; this._retries = options.retries ?? defaultRetries; - this._pollInterval = options.pollInterval ?? defaultPollInterval; this._reconnectTimeInSeconds = options.reconnectTimeInSeconds ?? defaultReconnectTimeInSeconds; this._taskHandlers = new Map(); @@ -484,74 +469,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { } } - /** - * Checks for scheduled tasks that are ready to execute. - */ - private async checkScheduledTasks(queue: string): Promise { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - - const scheduled = this._scheduledTasks.get(queue); - if (!scheduled || scheduled.length === 0) { - return; - } - - const now = Date.now(); - const ready: Array<{ task: Task; scheduledAt: number }> = []; - const remaining: Array<{ task: Task; scheduledAt: number }> = []; - - for (const entry of scheduled) { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - - if (entry.scheduledAt <= now) { - ready.push(entry); - } else { - remaining.push(entry); - } - } - - this._scheduledTasks.set(queue, remaining); - - for (const entry of ready) { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - - await this.publishTask(queue, entry.task); - } - } - - /** - * Starts the polling loop for a queue. - */ - private startPolling(queue: string): void { - const poll = async () => { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - - try { - await this.checkScheduledTasks(queue); - } catch (error) { - /* v8 ignore next -- @preserve */ - this.emit("error", error); - } - - if (this._active && this._taskHandlers.has(queue)) { - this._pollTimers.set(queue, setTimeout(poll, this._pollInterval)); - } - }; - - this._pollTimers.set(queue, setTimeout(poll, this._pollInterval)); - } - /** * Enqueues a task to a specific queue. * @param queue The queue name to enqueue to @@ -569,20 +486,7 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { ...taskData, }; - // If scheduled for the future, hold in memory - if (task.scheduledAt && task.scheduledAt > Date.now()) { - if (!this._scheduledTasks.has(queue)) { - this._scheduledTasks.set(queue, []); - } - - this._scheduledTasks.get(queue)?.push({ - task, - scheduledAt: task.scheduledAt, - }); - return task.id; - } - - // Publish to RabbitMQ queue immediately + // Publish to RabbitMQ queue await this.publishTask(queue, task); return task.id; } @@ -608,11 +512,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { const channel = await this.getChannel(); await this._setupConsumer(channel, queue); } - - // Start polling for scheduled tasks if not already - if (!this._pollTimers.has(queue)) { - this.startPolling(queue); - } } /** @@ -648,12 +547,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { this._consumerTags.delete(queue); } - - const timer = this._pollTimers.get(queue); - if (timer) { - clearTimeout(timer); - this._pollTimers.delete(queue); - } } } @@ -671,16 +564,8 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { this._reconnectTimer = undefined; } - // Clear all poll timers - for (const timer of this._pollTimers.values()) { - clearTimeout(timer); - } - - this._pollTimers.clear(); - // Clear handlers and in-memory state this._taskHandlers.clear(); - this._scheduledTasks.clear(); this._attemptCounts.clear(); this._queueTaskIds.clear(); this._processingTasks.clear(); @@ -740,13 +625,12 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { * Uses assertQueue to safely check queue state without risking channel closure. * @param queue The queue name * @returns Queue statistics: `waiting` (ready messages in RabbitMQ), `processing` (tasks - * currently being handled by this provider instance), `deadLetter`, and `scheduled`. + * currently being handled by this provider instance), and `deadLetter`. */ public async getQueueStats(queue: string): Promise<{ waiting: number; processing: number; deadLetter: number; - scheduled: number; }> { let waiting = 0; try { @@ -762,13 +646,11 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { const processing = this._processingTasks.get(queue)?.size ?? 0; const deadLetter = this._deadLetterTasks.get(queue)?.length ?? 0; - const scheduled = this._scheduledTasks.get(queue)?.length ?? 0; return { waiting, processing, deadLetter, - scheduled, }; } @@ -788,7 +670,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { await channel.purgeQueue(`${queue}:dead-letter`); this._deadLetterTasks.delete(queue); - this._scheduledTasks.delete(queue); this._processingTasks.delete(queue); // Clear task data for this queue diff --git a/packages/rabbitmq/test/task.test.ts b/packages/rabbitmq/test/task.test.ts index 8aeacd3..417eb0c 100644 --- a/packages/rabbitmq/test/task.test.ts +++ b/packages/rabbitmq/test/task.test.ts @@ -164,7 +164,6 @@ describe("RabbitMqTaskProvider", () => { id: "custom-id", timeout: 5000, retries: 5, - pollInterval: 500, }); expect(p.id).toBe("custom-id"); expect(p.timeout).toBe(5000); @@ -236,7 +235,6 @@ describe("RabbitMqTaskProvider", () => { priority: 10, maxRetries: 5, timeout: 5000, - scheduledAt: Date.now() + 10000, }); expect(taskId).toBeDefined(); @@ -376,7 +374,7 @@ describe("RabbitMqTaskProvider", () => { describe("task rejection and retry", () => { test("should reject and requeue task on failure", async () => { const q = uniqueQueue(); - const customProvider = await createCustomProvider({ pollInterval: 100 }); + const customProvider = await createCustomProvider({}); await customProvider.connect(); await customProvider.clearQueue(q); @@ -407,7 +405,6 @@ describe("RabbitMqTaskProvider", () => { const q = uniqueQueue(); const customProvider = await createCustomProvider({ retries: 2, - pollInterval: 100, }); await customProvider.connect(); await customProvider.clearQueue(q); @@ -483,7 +480,6 @@ describe("RabbitMqTaskProvider", () => { const q = uniqueQueue(); const customProvider = await createCustomProvider({ retries: 3, - pollInterval: 100, }); await customProvider.connect(); await customProvider.clearQueue(q); @@ -519,7 +515,6 @@ describe("RabbitMqTaskProvider", () => { const customProvider = await createCustomProvider({ timeout: 100, retries: 1, - pollInterval: 50, }); await customProvider.connect(); await customProvider.clearQueue(q); @@ -555,7 +550,6 @@ describe("RabbitMqTaskProvider", () => { const customProvider = await createCustomProvider({ timeout: 5000, retries: 1, - pollInterval: 50, }); await customProvider.connect(); await customProvider.clearQueue(q); @@ -595,7 +589,6 @@ describe("RabbitMqTaskProvider", () => { const q = uniqueQueue(); const customProvider = await createCustomProvider({ timeout: 200, - pollInterval: 50, }); await customProvider.connect(); await customProvider.clearQueue(q); @@ -668,7 +661,7 @@ describe("RabbitMqTaskProvider", () => { test("should increment attempt on retry", async () => { const q = uniqueQueue(); - const customProvider = await createCustomProvider({ pollInterval: 50 }); + const customProvider = await createCustomProvider({}); await customProvider.connect(); await customProvider.clearQueue(q); @@ -719,62 +712,6 @@ describe("RabbitMqTaskProvider", () => { }); }); - describe("scheduled tasks", () => { - test("should not process task before scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue(testQueue, handler); - await provider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 2000, // Schedule 2 seconds in future - }); - - await new Promise((resolve) => setTimeout(resolve, 200)); - - expect(processed).toBe(false); - const stats = await provider.getQueueStats(testQueue); - expect(stats.scheduled).toBe(1); - }); - - test("should process task after scheduledAt time via polling", async () => { - const q = uniqueQueue(); - const customProvider = await createCustomProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(q); - - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await customProvider.dequeue(q, handler); - await customProvider.enqueue(q, { - data: { message: "test" }, - scheduledAt: Date.now() + 100, // Schedule 100ms in future - }); - - // Task should not be processed yet - await new Promise((resolve) => setTimeout(resolve, 50)); - expect(processed).toBe(false); - - // Wait for scheduled task to be processed by polling - await waitFor(() => processed); - - expect(processed).toBe(true); - - await customProvider.clearQueue(q); - }); - }); - describe("unsubscribe", () => { test("should unsubscribe specific handler by id", async () => { const handler1: TaskHandler = { @@ -921,7 +858,6 @@ describe("RabbitMqTaskProvider", () => { waiting: 0, processing: 0, deadLetter: 0, - scheduled: 0, }); }); @@ -942,17 +878,6 @@ describe("RabbitMqTaskProvider", () => { await provider.clearQueue(q); }); - test("should return correct stats with scheduled tasks", async () => { - await provider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 10000, - }); - - const stats = await provider.getQueueStats(testQueue); - expect(stats.scheduled).toBe(1); - expect(stats.waiting).toBe(0); - }); - test("should return correct stats with dead-letter tasks", async () => { const handler: TaskHandler = { id: "test-handler", @@ -1085,10 +1010,6 @@ describe("RabbitMqTaskProvider", () => { test("should clear all data for a queue", async () => { // Add tasks to various states await provider.enqueue(testQueue, { data: { message: "test1" } }); - await provider.enqueue(testQueue, { - data: { message: "test2" }, - scheduledAt: Date.now() + 10000, - }); const handler: TaskHandler = { id: "test-handler", @@ -1112,14 +1033,13 @@ describe("RabbitMqTaskProvider", () => { const statsAfter = await provider.getQueueStats(testQueue); expect(statsAfter.waiting).toBe(0); expect(statsAfter.deadLetter).toBe(0); - expect(statsAfter.scheduled).toBe(0); }); }); describe("edge cases", () => { test("should handle disconnect during task processing", async () => { const q = uniqueQueue(); - const customProvider = await createCustomProvider({ pollInterval: 50 }); + const customProvider = await createCustomProvider({}); await customProvider.connect(); await customProvider.clearQueue(q); @@ -1146,7 +1066,7 @@ describe("RabbitMqTaskProvider", () => { test("should handle polling loop exit when inactive", async () => { const q = uniqueQueue(); - const customProvider = await createCustomProvider({ pollInterval: 20 }); + const customProvider = await createCustomProvider({}); await customProvider.connect(); await customProvider.clearQueue(q); @@ -1173,7 +1093,6 @@ describe("RabbitMqTaskProvider", () => { const customProvider = await createCustomProvider({ timeout: 5000, retries: 1, - pollInterval: 50, }); await customProvider.connect(); await customProvider.clearQueue(q); @@ -1204,33 +1123,5 @@ describe("RabbitMqTaskProvider", () => { await customProvider.clearQueue(q); }); - - test("should handle disconnect during scheduled task processing", async () => { - const q = uniqueQueue(); - const customProvider = await createCustomProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(q); - - // Enqueue multiple scheduled tasks - for (let i = 0; i < 5; i++) { - await customProvider.enqueue(q, { - data: { message: `scheduled-${i}` }, - scheduledAt: Date.now() + 10, // Very short delay - }); - } - - // Register a handler to start polling - await customProvider.dequeue(q, { - id: "test-handler", - handler: async () => {}, - }); - - // Wait a tiny bit then disconnect during processing - await new Promise((resolve) => setTimeout(resolve, 30)); - await customProvider.disconnect(); - - // Should not throw - disconnect handled gracefully - expect(customProvider.taskHandlers.size).toBe(0); - }); }); }); diff --git a/packages/redis/src/task.ts b/packages/redis/src/task.ts index cc3d48e..6bb689e 100644 --- a/packages/redis/src/task.ts +++ b/packages/redis/src/task.ts @@ -18,7 +18,7 @@ export type RedisTaskProviderOptions = TaskProviderOptions & { uri?: string; /** Unique identifier for this provider instance. Defaults to "@qified/redis-task" */ id?: string; - /** Poll interval in milliseconds for checking scheduled and timed-out tasks. Defaults to 1000 */ + /** Poll interval in milliseconds for checking timed-out tasks. Defaults to 1000 */ pollInterval?: number; }; @@ -161,13 +161,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { return `${queue}:tasks`; } - /** - * Gets the Redis key for scheduled tasks sorted set. - */ - private getScheduledKey(queue: string): string { - return `${queue}:scheduled`; - } - /** * Gets the Redis key for processing tasks sorted set. */ @@ -221,17 +214,8 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { // Initialize attempt count await client.set(this.getTaskAttemptKey(queue, task.id), "0"); - // Add to appropriate queue - if (task.scheduledAt && task.scheduledAt > Date.now()) { - // Add to scheduled sorted set with scheduledAt as score - await client.zAdd(this.getScheduledKey(queue), { - score: task.scheduledAt, - value: task.id, - }); - } else { - // Add to queue list (LPUSH for FIFO with RPOP) - await client.lPush(this.getQueueKey(queue), task.id); - } + // Add to queue list (LPUSH for FIFO with RPOP) + await client.lPush(this.getQueueKey(queue), task.id); // Process immediately if handlers are registered void this.processQueue(queue); @@ -275,7 +259,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { } try { - await this.checkScheduledTasks(queue); await this.checkTimedOutTasks(queue); await this.processQueue(queue); } catch (error) { @@ -291,36 +274,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { this._pollTimers.set(queue, setTimeout(poll, this._pollInterval)); } - /** - * Checks for scheduled tasks that are ready to execute. - */ - private async checkScheduledTasks(queue: string): Promise { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - - const client = await this.getClient(); - const now = Date.now(); - - // Get scheduled tasks with score <= now - const readyTasks = await client.zRangeByScore( - this.getScheduledKey(queue), - 0, - now, - ); - - for (const taskId of readyTasks) { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - // Move from scheduled to queue - await client.zRem(this.getScheduledKey(queue), taskId); - await client.lPush(this.getQueueKey(queue), taskId); - } - } - /** * Checks for tasks that have timed out during processing. */ @@ -683,22 +636,19 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { waiting: number; processing: number; deadLetter: number; - scheduled: number; }> { const client = await this.getClient(); - const [waiting, processing, deadLetter, scheduled] = await Promise.all([ + const [waiting, processing, deadLetter] = await Promise.all([ client.lLen(this.getQueueKey(queue)), client.zCard(this.getProcessingKey(queue)), client.lLen(this.getDeadLetterKey(queue)), - client.zCard(this.getScheduledKey(queue)), ]); return { waiting, processing, deadLetter, - scheduled, }; } @@ -710,20 +660,13 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { const client = await this.getClient(); // Get all task IDs from all locations - const [queueTasks, processingTasks, deadLetterTasks, scheduledTasks] = - await Promise.all([ - client.lRange(this.getQueueKey(queue), 0, -1), - client.zRange(this.getProcessingKey(queue), 0, -1), - client.lRange(this.getDeadLetterKey(queue), 0, -1), - client.zRange(this.getScheduledKey(queue), 0, -1), - ]); - - const allTaskIds = [ - ...queueTasks, - ...processingTasks, - ...deadLetterTasks, - ...scheduledTasks, - ]; + const [queueTasks, processingTasks, deadLetterTasks] = await Promise.all([ + client.lRange(this.getQueueKey(queue), 0, -1), + client.zRange(this.getProcessingKey(queue), 0, -1), + client.lRange(this.getDeadLetterKey(queue), 0, -1), + ]); + + const allTaskIds = [...queueTasks, ...processingTasks, ...deadLetterTasks]; // Delete all task data and attempt counters for (const taskId of allTaskIds) { @@ -735,6 +678,5 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { await client.del(this.getQueueKey(queue)); await client.del(this.getProcessingKey(queue)); await client.del(this.getDeadLetterKey(queue)); - await client.del(this.getScheduledKey(queue)); } } diff --git a/packages/redis/test/task.test.ts b/packages/redis/test/task.test.ts index 571242f..5598384 100644 --- a/packages/redis/test/task.test.ts +++ b/packages/redis/test/task.test.ts @@ -239,7 +239,6 @@ describe("RedisTaskProvider", () => { priority: 10, maxRetries: 5, timeout: 5000, - scheduledAt: Date.now() + 10000, }); expect(taskId).toBeDefined(); @@ -723,62 +722,6 @@ describe("RedisTaskProvider", () => { }); }); - describe("scheduled tasks", () => { - test("should not process task before scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue(testQueue, handler); - await provider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 2000, // Schedule 2 seconds in future - }); - - await new Promise((resolve) => setTimeout(resolve, 200)); - - expect(processed).toBe(false); - const stats = await provider.getQueueStats(testQueue); - expect(stats.scheduled).toBe(1); - }); - - test("should process task after scheduledAt time via polling", async () => { - const customProvider = new RedisTaskProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 100, // Schedule 100ms in future - }); - - // Task should not be processed yet - await new Promise((resolve) => setTimeout(resolve, 50)); - expect(processed).toBe(false); - - // Wait for scheduled task to be processed by polling - await new Promise((resolve) => setTimeout(resolve, 200)); - - expect(processed).toBe(true); - - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); - }); - }); - describe("unsubscribe", () => { test("should unsubscribe specific handler by id", async () => { const handler1: TaskHandler = { @@ -921,7 +864,6 @@ describe("RedisTaskProvider", () => { waiting: 0, processing: 0, deadLetter: 0, - scheduled: 0, }); }); @@ -935,17 +877,6 @@ describe("RedisTaskProvider", () => { expect(stats.deadLetter).toBe(0); }); - test("should return correct stats with scheduled tasks", async () => { - await provider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 10000, - }); - - const stats = await provider.getQueueStats(testQueue); - expect(stats.scheduled).toBe(1); - expect(stats.waiting).toBe(0); - }); - test("should return correct stats with dead-letter tasks", async () => { const handler: TaskHandler = { id: "test-handler", @@ -1075,10 +1006,6 @@ describe("RedisTaskProvider", () => { test("should clear all data for a queue", async () => { // Add tasks to various states await provider.enqueue(testQueue, { data: { message: "test1" } }); - await provider.enqueue(testQueue, { - data: { message: "test2" }, - scheduledAt: Date.now() + 10000, - }); const handler: TaskHandler = { id: "test-handler", @@ -1094,9 +1021,7 @@ describe("RedisTaskProvider", () => { // Verify data exists const statsBefore = await provider.getQueueStats(testQueue); - expect( - statsBefore.waiting + statsBefore.scheduled + statsBefore.deadLetter, - ).toBeGreaterThan(0); + expect(statsBefore.waiting + statsBefore.deadLetter).toBeGreaterThan(0); // Clear queue await provider.clearQueue(testQueue); @@ -1106,7 +1031,6 @@ describe("RedisTaskProvider", () => { expect(statsAfter.waiting).toBe(0); expect(statsAfter.processing).toBe(0); expect(statsAfter.deadLetter).toBe(0); - expect(statsAfter.scheduled).toBe(0); }); }); @@ -1145,34 +1069,6 @@ describe("RedisTaskProvider", () => { await customProvider.disconnect(); }); - test("should handle disconnect during scheduled task processing", async () => { - // This test covers early return in checkScheduledTasks (lines 293, 308) - const customProvider = new RedisTaskProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - // Enqueue multiple scheduled tasks - for (let i = 0; i < 5; i++) { - await customProvider.enqueue(testQueue, { - data: { message: `scheduled-${i}` }, - scheduledAt: Date.now() + 10, // Very short delay - }); - } - - // Register a handler to start polling - await customProvider.dequeue(testQueue, { - id: "test-handler", - handler: async () => {}, - }); - - // Wait a tiny bit then disconnect during processing - await new Promise((resolve) => setTimeout(resolve, 30)); - await customProvider.disconnect(); - - // Should not throw - disconnect handled gracefully - expect(customProvider.taskHandlers.size).toBe(0); - }); - test("should handle disconnect during timed out task processing", async () => { // This test covers early return in checkTimedOutTasks (lines 321, 335) const customProvider = new RedisTaskProvider({ From 8273d681912a74779074f149528989fca5c2baf7 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:04:15 -0800 Subject: [PATCH 05/13] Update task.test.ts --- packages/redis/test/task.test.ts | 90 -------------------------------- 1 file changed, 90 deletions(-) diff --git a/packages/redis/test/task.test.ts b/packages/redis/test/task.test.ts index 5598384..8ec12bc 100644 --- a/packages/redis/test/task.test.ts +++ b/packages/redis/test/task.test.ts @@ -9,25 +9,6 @@ import { RedisTaskProvider, } from "../src/index.js"; -async function waitFor( - condition: () => boolean | Promise, - timeoutMs = 5000, - intervalMs = 50, -): Promise { - const deadline = Date.now() + timeoutMs; - while (Date.now() < deadline) { - if (await condition()) { - return; - } - - await new Promise((resolve) => { - setTimeout(resolve, intervalMs); - }); - } - - throw new Error(`waitFor timed out after ${timeoutMs}ms`); -} - describe("RedisTaskProvider", () => { let provider: RedisTaskProvider; const testQueue = `test-queue-${Date.now()}`; @@ -550,45 +531,6 @@ describe("RedisTaskProvider", () => { await customProvider.clearQueue(testQueue); await customProvider.disconnect(); }); - - test("should use task-specific timeout over provider default", async () => { - const customProvider = new RedisTaskProvider({ - timeout: 5000, - pollInterval: 50, - }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - const handler: TaskHandler = { - id: "test-handler", - handler: async (_task: Task, ctx: TaskContext) => { - // Simulate long-running task - await new Promise((resolve) => setTimeout(resolve, 200)); - await ctx.ack(); - }, - }; - - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { - data: { message: "test" }, - timeout: 50, // Task-specific timeout (shorter than handler duration) - }); - - // Poll until the timeout has fired and the task has been requeued or dead-lettered - await waitFor(async () => { - const s = await customProvider.getQueueStats(testQueue); - return s.waiting + s.deadLetter > 0; - }); - - // Task should have timed out and been requeued or moved to dead letter - const stats = await customProvider.getQueueStats(testQueue); - expect( - stats.waiting + stats.deadLetter + stats.processing, - ).toBeGreaterThan(0); - - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); - }); }); describe("task context extend", () => { @@ -668,38 +610,6 @@ describe("RedisTaskProvider", () => { expect(contextMetadata.maxRetries).toBe(defaultTaskRetries); }); - test("should increment attempt on retry", async () => { - const customProvider = new RedisTaskProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - const attempts: number[] = []; - const handler: TaskHandler = { - id: "test-handler", - handler: async (_task: Task, ctx: TaskContext) => { - attempts.push(ctx.metadata.attempt); - if (ctx.metadata.attempt < 2) { - await ctx.reject(true); - } else { - await ctx.ack(); - } - }, - }; - - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); - - // Poll until both processing attempts have completed - await waitFor(() => attempts.length > 1); - - expect(attempts.length).toBeGreaterThan(1); - expect(attempts[0]).toBe(1); - expect(attempts[1]).toBe(2); - - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); - }); - test("should use task-specific maxRetries", async () => { let contextMetadata: any; const handler: TaskHandler = { From 63a623fb78749a8cb21f52450b093e7afe94b157 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:08:21 -0800 Subject: [PATCH 06/13] connection promise fix --- packages/rabbitmq/src/task.ts | 37 ++++++++++++++++++++--------------- packages/redis/src/task.ts | 7 ++++++- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 30f5f84..0445c5f 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -151,22 +151,27 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { async connect(): Promise { if (!this._connectionPromise) { this._connectionPromise = (async () => { - const connection = await connect(this._uri); - this._connection = connection; - this._channel = await connection.createChannel(); - - connection.on("error", () => { - // Connection error emitted — connection is already closing/closed. - // The 'close' handler will trigger reconnection. - }); - - connection.on("close", () => { - this._channel = undefined; - this._connection = undefined; - if (!this._closing) { - this._scheduleReconnect(); - } - }); + try { + const connection = await connect(this._uri); + this._connection = connection; + this._channel = await connection.createChannel(); + + connection.on("error", () => { + // Connection error emitted — connection is already closing/closed. + // The 'close' handler will trigger reconnection. + }); + + connection.on("close", () => { + this._channel = undefined; + this._connection = undefined; + if (!this._closing) { + this._scheduleReconnect(); + } + }); + } catch (error) { + this._connectionPromise = null; + throw error; + } })(); } diff --git a/packages/redis/src/task.ts b/packages/redis/src/task.ts index 6bb689e..8ebc976 100644 --- a/packages/redis/src/task.ts +++ b/packages/redis/src/task.ts @@ -132,7 +132,12 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { async connect(): Promise { if (!this._connectionPromise) { this._connectionPromise = (async () => { - await this._client.connect(); + try { + await this._client.connect(); + } catch (error) { + this._connectionPromise = null; + throw error; + } })(); } return this._connectionPromise; From c7426e4736f1cd439eac9d406cbc5e49a75d8eec Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:14:53 -0800 Subject: [PATCH 07/13] retry dead letter --- packages/rabbitmq/src/task.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 0445c5f..5cbeb42 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -410,13 +410,13 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { rejected = true; try { - nackAmqp(); - if (requeue && currentAttempt < maxRetries) { await this.publishTask(queue, task); } else { await this.moveToDeadLetter(queue, task); } + + nackAmqp(); } catch (error) { /* v8 ignore next -- @preserve */ this.emit("error", error); From 23930857d49a28ddaf1697b2d323a2a848c8b0ba Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:16:45 -0800 Subject: [PATCH 08/13] more fixes on deadletter --- packages/rabbitmq/src/task.ts | 43 +++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 5cbeb42..fa66d8d 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -65,9 +65,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { // In-memory tracking for attempt counts: taskId -> count private readonly _attemptCounts: Map = new Map(); - // Dead letter tasks for stats: queue -> Task[] - private readonly _deadLetterTasks: Map = new Map(); - // Track queue -> set of taskIds for cleanup private readonly _queueTaskIds: Map> = new Map(); @@ -283,12 +280,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { persistent: true, }); - if (!this._deadLetterTasks.has(queue)) { - this._deadLetterTasks.set(queue, []); - } - - this._deadLetterTasks.get(queue)?.push(task); - // Clean up in-memory tracking this._attemptCounts.delete(task.id); this._queueTaskIds.get(queue)?.delete(task.id); @@ -622,7 +613,27 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { * @returns Array of tasks in the dead-letter queue */ public async getDeadLetterTasks(queue: string): Promise { - return this._deadLetterTasks.get(queue) ?? []; + const channel = await this.getChannel(); + const dlqName = `${queue}:dead-letter`; + await channel.assertQueue(dlqName, { durable: true }); + + const tasks: Task[] = []; + // Drain DLQ messages using basic.get, then nack them back so they + // remain in the queue for future inspection. + let msg = await channel.get(dlqName, { noAck: false }); + while (msg) { + try { + const task = JSON.parse(msg.content.toString()) as Task; + tasks.push(task); + } catch { + // Skip malformed messages + } + + channel.nack(msg, false, true); + msg = await channel.get(dlqName, { noAck: false }); + } + + return tasks; } /** @@ -649,8 +660,17 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { // Queue assertion failed } + let deadLetter = 0; + try { + const channel = await this.getChannel(); + const dlqInfo = await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); + deadLetter = dlqInfo.messageCount; + } catch { + /* v8 ignore next -- @preserve */ + // DLQ assertion failed + } + const processing = this._processingTasks.get(queue)?.size ?? 0; - const deadLetter = this._deadLetterTasks.get(queue)?.length ?? 0; return { waiting, @@ -674,7 +694,6 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); await channel.purgeQueue(`${queue}:dead-letter`); - this._deadLetterTasks.delete(queue); this._processingTasks.delete(queue); // Clear task data for this queue From edec71da6bb63beedfbb37d333cd9161e60b9830 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:18:47 -0800 Subject: [PATCH 09/13] lint --- packages/rabbitmq/src/task.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index fa66d8d..9b072d7 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -663,7 +663,9 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { let deadLetter = 0; try { const channel = await this.getChannel(); - const dlqInfo = await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); + const dlqInfo = await channel.assertQueue(`${queue}:dead-letter`, { + durable: true, + }); deadLetter = dlqInfo.messageCount; } catch { /* v8 ignore next -- @preserve */ From a75ad549ed51bd916e22990c0a940a39e43e8905 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:24:59 -0800 Subject: [PATCH 10/13] Update AGENTS.md --- AGENTS.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index f468a4d..bc0d083 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,8 +24,8 @@ Qified is a task and message queue library with multiple providers, built with T ## Development Rules -1. **Always run `pnpm test` before committing** - All tests must pass -2. **Maintain 100% code coverage** - Add tests for any new code +1. **Always run `pnpm test` after every change** - You must test your changes every time, no exceptions. All tests must pass before committing. +2. **Maintain 100% code coverage** - Add tests for any new code. Every change must achieve 100% coverage. If coverage drops below 100%, add or update tests until it is restored. 3. **Follow existing code style** - Biome enforces formatting and linting ## Structure From 37d6db6901b7a6ebe2c699a3c91137d32da41061 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 11:29:19 -0800 Subject: [PATCH 11/13] fixing tests --- packages/rabbitmq/src/task.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts index 9b072d7..2a98716 100644 --- a/packages/rabbitmq/src/task.ts +++ b/packages/rabbitmq/src/task.ts @@ -618,10 +618,13 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { await channel.assertQueue(dlqName, { durable: true }); const tasks: Task[] = []; - // Drain DLQ messages using basic.get, then nack them back so they - // remain in the queue for future inspection. + const messages: import("amqplib").GetMessage[] = []; + + // Collect all messages without acking — unacked messages are held by + // the channel and won't be re-delivered to subsequent get calls. let msg = await channel.get(dlqName, { noAck: false }); while (msg) { + messages.push(msg); try { const task = JSON.parse(msg.content.toString()) as Task; tasks.push(task); @@ -629,10 +632,14 @@ export class RabbitMqTaskProvider extends Hookified implements TaskProvider { // Skip malformed messages } - channel.nack(msg, false, true); msg = await channel.get(dlqName, { noAck: false }); } + // Nack all messages back to the queue so they remain for future inspection + for (const m of messages) { + channel.nack(m, false, true); + } + return tasks; } From 1fbe8529222a426ae694d685f18b369a074120e4 Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 12:02:15 -0800 Subject: [PATCH 12/13] fixing readme --- packages/rabbitmq/README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/rabbitmq/README.md b/packages/rabbitmq/README.md index a4af112..69179aa 100644 --- a/packages/rabbitmq/README.md +++ b/packages/rabbitmq/README.md @@ -80,13 +80,13 @@ await taskProvider.dequeue("my-queue", { console.log("Processing task:", task.data); // Access attempt metadata - console.log(`Attempt ${ctx.metadata().attempt} of ${ctx.metadata().maxRetries}`); + console.log(`Attempt ${ctx.metadata.attempt} of ${ctx.metadata.maxRetries}`); // Extend the deadline if needed - ctx.extend(10_000); + await ctx.extend(10_000); // Acknowledge the task on success - ctx.ack(); + await ctx.ack(); }, }); @@ -174,14 +174,14 @@ Explicitly connects to RabbitMQ. Called automatically on first `enqueue` or `deq #### enqueue(queue: string, taskData: EnqueueTask) -Enqueues a task to the specified queue. Returns a `Promise`. +Enqueues a task to the specified queue. Returns a `Promise` with the generated task ID. Task data options: - `data`: The task payload (any serializable value). - `id?`: Custom task ID. Auto-generated if omitted. - `timeout?`: Per-task timeout override in milliseconds. -- `retries?`: Per-task max retry override. +- `maxRetries?`: Per-task max retry override. - `priority?`: Task priority value. #### dequeue(queue: string, handler: TaskHandler) @@ -193,7 +193,7 @@ Registers a handler to process tasks from the specified queue. The handler recei - `ack()`: Acknowledge the task (removes it from the queue). - `reject(requeue?: boolean)`: Reject the task. If `requeue` is `true` (default), re-enqueues for retry. After max retries, moves to dead-letter queue. - `extend(ms: number)`: Extend the processing deadline by the given milliseconds. -- `metadata()`: Returns `{ attempt, maxRetries }` for the current task. +- `metadata`: Object with `{ attempt, maxRetries }` for the current task. #### unsubscribe(queue: string, id?: string) From 5886ae0fc6a005abd1984c5c82c6baf0ee38b53c Mon Sep 17 00:00:00 2001 From: Jared Wray Date: Thu, 12 Feb 2026 12:10:40 -0800 Subject: [PATCH 13/13] readme fixes --- README.md | 2 +- packages/qified/README.md | 12 ++++++------ packages/zeromq/README.md | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 97ef7aa..ee085d4 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![site/logo.svg](site/logo.svg)](https://qified.org) [![tests](https://github.com/jaredwray/qified/actions/workflows/tests.yaml/badge.svg)](https://github.com/jaredwray/qified/actions/workflows/tests.yaml) -[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/master/LICENSE) +[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/main/LICENSE) [![codecov](https://codecov.io/gh/jaredwray/qified/graph/badge.svg?token=jcRdy8SkOG)](https://codecov.io/gh/jaredwray/qified) [![npm](https://img.shields.io/npm/dm/qified)](https://npmjs.com/package/qified) [![npm](https://img.shields.io/npm/v/qified)](https://npmjs.com/package/qified) diff --git a/packages/qified/README.md b/packages/qified/README.md index edb6255..000aa8f 100644 --- a/packages/qified/README.md +++ b/packages/qified/README.md @@ -1,7 +1,7 @@ [![logo.svg](https://qified.org/logo.svg)](https://qified.org) [![tests](https://github.com/jaredwray/qified/actions/workflows/tests.yaml/badge.svg)](https://github.com/jaredwray/qified/actions/workflows/tests.yaml) -[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/master/LICENSE) +[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/main/LICENSE) [![codecov](https://codecov.io/gh/jaredwray/qified/graph/badge.svg?token=jcRdy8SkOG)](https://codecov.io/gh/jaredwray/qified) [![npm](https://img.shields.io/npm/dm/qified)](https://npmjs.com/package/qified) [![npm](https://img.shields.io/npm/v/qified)](https://npmjs.com/package/qified) @@ -198,7 +198,7 @@ await qified.unsubscribe('user-events', 'userEventHandler'); await qified.unsubscribe('user-events'); ``` -## disconnect` +## disconnect Disconnect from all providers and clean up resources. @@ -473,10 +473,10 @@ qified.on(QifiedEvents.publish, async (data) => { There are multiple providers available to use: * Memory - this is built into the current `qified` library as `MemoryMessageProvider`. -* [@qified/redis](packages/redis/README.md) - Redis Provider -* [@qified/rabbitmq](packages/rabbitmq/README.md) - RabbitMQ Provider -* [@qified/nats](packages/nats/README.md) - NATS Provider -* [@qified/zeromq](packages/zeromq/README.md) - ZeroMQ Provider +* [@qified/redis](../redis/README.md) - Redis Provider +* [@qified/rabbitmq](../rabbitmq/README.md) - RabbitMQ Provider +* [@qified/nats](../nats/README.md) - NATS Provider +* [@qified/zeromq](../zeromq/README.md) - ZeroMQ Provider # Development and Testing diff --git a/packages/zeromq/README.md b/packages/zeromq/README.md index 9199e96..74b0c76 100644 --- a/packages/zeromq/README.md +++ b/packages/zeromq/README.md @@ -9,7 +9,7 @@ This package implements a message provider backed by ZeroMQ using queues for pub - [Installation](#installation) - [Usage with Qified](#usage-with-qified) - [API](#api) - - [ZmqMessageProviderOptions](#ZmqMessageProviderOptions) + - [ZmqMessageProviderOptions](#zmqmessageprovideroptions) - [defaultZmqUri](#defaultzmquri) - [ZmqMessageProvider](#zmqmessageprovider) - [constructor](#constructor)