diff --git a/AGENTS.md b/AGENTS.md index f468a4d..bc0d083 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,8 +24,8 @@ Qified is a task and message queue library with multiple providers, built with T ## Development Rules -1. **Always run `pnpm test` before committing** - All tests must pass -2. **Maintain 100% code coverage** - Add tests for any new code +1. **Always run `pnpm test` after every change** - You must test your changes every time, no exceptions. All tests must pass before committing. +2. **Maintain 100% code coverage** - Add tests for any new code. Every change must achieve 100% coverage. If coverage drops below 100%, add or update tests until it is restored. 3. **Follow existing code style** - Biome enforces formatting and linting ## Structure diff --git a/README.md b/README.md index 97ef7aa..ee085d4 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![site/logo.svg](site/logo.svg)](https://qified.org) [![tests](https://github.com/jaredwray/qified/actions/workflows/tests.yaml/badge.svg)](https://github.com/jaredwray/qified/actions/workflows/tests.yaml) -[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/master/LICENSE) +[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/main/LICENSE) [![codecov](https://codecov.io/gh/jaredwray/qified/graph/badge.svg?token=jcRdy8SkOG)](https://codecov.io/gh/jaredwray/qified) [![npm](https://img.shields.io/npm/dm/qified)](https://npmjs.com/package/qified) [![npm](https://img.shields.io/npm/v/qified)](https://npmjs.com/package/qified) diff --git a/packages/qified/README.md b/packages/qified/README.md index edb6255..000aa8f 100644 --- a/packages/qified/README.md +++ b/packages/qified/README.md @@ -1,7 +1,7 @@ [![logo.svg](https://qified.org/logo.svg)](https://qified.org) [![tests](https://github.com/jaredwray/qified/actions/workflows/tests.yaml/badge.svg)](https://github.com/jaredwray/qified/actions/workflows/tests.yaml) -[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/master/LICENSE) +[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/main/LICENSE) [![codecov](https://codecov.io/gh/jaredwray/qified/graph/badge.svg?token=jcRdy8SkOG)](https://codecov.io/gh/jaredwray/qified) [![npm](https://img.shields.io/npm/dm/qified)](https://npmjs.com/package/qified) [![npm](https://img.shields.io/npm/v/qified)](https://npmjs.com/package/qified) @@ -198,7 +198,7 @@ await qified.unsubscribe('user-events', 'userEventHandler'); await qified.unsubscribe('user-events'); ``` -## disconnect` +## disconnect Disconnect from all providers and clean up resources. @@ -473,10 +473,10 @@ qified.on(QifiedEvents.publish, async (data) => { There are multiple providers available to use: * Memory - this is built into the current `qified` library as `MemoryMessageProvider`. -* [@qified/redis](packages/redis/README.md) - Redis Provider -* [@qified/rabbitmq](packages/rabbitmq/README.md) - RabbitMQ Provider -* [@qified/nats](packages/nats/README.md) - NATS Provider -* [@qified/zeromq](packages/zeromq/README.md) - ZeroMQ Provider +* [@qified/redis](../redis/README.md) - Redis Provider +* [@qified/rabbitmq](../rabbitmq/README.md) - RabbitMQ Provider +* [@qified/nats](../nats/README.md) - NATS Provider +* [@qified/zeromq](../zeromq/README.md) - ZeroMQ Provider # Development and Testing diff --git a/packages/qified/src/memory/task.ts b/packages/qified/src/memory/task.ts index b1e154d..fe623ef 100644 --- a/packages/qified/src/memory/task.ts +++ b/packages/qified/src/memory/task.ts @@ -223,14 +223,6 @@ export class MemoryTaskProvider implements TaskProvider { continue; } - // Check if task is scheduled for later - if ( - queuedTask.task.scheduledAt && - queuedTask.task.scheduledAt > Date.now() - ) { - continue; - } - // Mark as processing queuedTask.processing = true; processingSet.add(queuedTask.task.id); diff --git a/packages/qified/src/types.ts b/packages/qified/src/types.ts index 22d6180..13bbc4f 100644 --- a/packages/qified/src/types.ts +++ b/packages/qified/src/types.ts @@ -103,13 +103,6 @@ export type Task = { */ timestamp?: number; - /** - * Scheduled time for delayed task execution (milliseconds since epoch) - * If set, task won't be processed until this time - * @type {number} - */ - scheduledAt?: number; - /** * Headers for additional metadata * @type {Record} diff --git a/packages/qified/test/memory-task.test.ts b/packages/qified/test/memory-task.test.ts index 5e90542..8ea080c 100644 --- a/packages/qified/test/memory-task.test.ts +++ b/packages/qified/test/memory-task.test.ts @@ -107,7 +107,6 @@ describe("MemoryTaskProvider", () => { priority: 10, maxRetries: 5, timeout: 5000, - scheduledAt: Date.now() + 10000, }); expect(taskId).toBeDefined(); @@ -523,58 +522,6 @@ describe("MemoryTaskProvider", () => { }); }); - describe("scheduled tasks", () => { - test("should not process task before scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue("test-queue", handler); - await provider.enqueue("test-queue", { - data: { message: "test" }, - scheduledAt: Date.now() + 1000, // Schedule 1 second in future - }); - - await new Promise((resolve) => setTimeout(resolve, 100)); - - expect(processed).toBe(false); - const stats = provider.getQueueStats("test-queue"); - expect(stats.waiting).toBe(1); - }); - - test("should process task after scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue("test-queue", handler); - await provider.enqueue("test-queue", { - data: { message: "test" }, - scheduledAt: Date.now() + 50, // Schedule 50ms in future - }); - - // Task should not be processed yet - await new Promise((resolve) => setTimeout(resolve, 25)); - expect(processed).toBe(false); - - // Now enqueue another task to trigger processing of scheduled task - await provider.enqueue("test-queue", { data: { message: "trigger" } }); - - // Wait for scheduled task to be processed - await new Promise((resolve) => setTimeout(resolve, 100)); - - expect(processed).toBe(true); - }); - }); - describe("unsubscribe", () => { test("should unsubscribe specific handler by id", async () => { const handler1: TaskHandler = { @@ -798,35 +745,6 @@ describe("MemoryTaskProvider", () => { expect(stats.waiting).toBe(0); expect(stats.processing).toBe(0); }); - - test("should not process scheduled tasks after disconnect", async () => { - const customProvider = new MemoryTaskProvider(); - let taskProcessed = false; - - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - taskProcessed = true; - }, - }; - - await customProvider.dequeue("test-queue", handler); - - // Enqueue a task scheduled for the future - await customProvider.enqueue("test-queue", { - data: { message: "scheduled-task" }, - scheduledAt: Date.now() + 100, - }); - - // Disconnect before the scheduled time - await customProvider.disconnect(); - - // Wait past the scheduled time - await new Promise((resolve) => setTimeout(resolve, 150)); - - // Task should not have been processed - expect(taskProcessed).toBe(false); - }); }); describe("getDeadLetterTasks", () => { diff --git a/packages/rabbitmq/README.md b/packages/rabbitmq/README.md index 8436b26..69179aa 100644 --- a/packages/rabbitmq/README.md +++ b/packages/rabbitmq/README.md @@ -1,13 +1,15 @@ # @qified/rabbitmq -RabbitMQ message provider for [Qified](https://github.com/jaredwray/qified). +RabbitMQ message and task provider for [Qified](https://github.com/jaredwray/qified). -This package implements a message provider backed by RabbitMQ using queues for publish and subscribe operations. +This package implements a message provider and a task provider backed by RabbitMQ. The message provider uses queues for publish/subscribe operations, and the task provider adds reliable task queue processing with retries, timeouts, and dead-letter queues. ## Table of Contents - [Installation](#installation) - [Usage with Qified](#usage-with-qified) + - [Message Provider](#message-provider) + - [Task Provider](#task-provider) - [API](#api) - [RabbitMqMessageProviderOptions](#rabbitmqmessageprovideroptions) - [defaultRabbitMqUri](#defaultrabbitmquri) @@ -18,6 +20,17 @@ This package implements a message provider backed by RabbitMQ using queues for p - [unsubscribe](#unsubscribe) - [disconnect](#disconnect) - [createQified](#createqified) + - [RabbitMqTaskProviderOptions](#rabbitmqtaskprovideroptions) + - [RabbitMqTaskProvider](#rabbitmqtaskprovider) + - [constructor](#constructor-1) + - [connect](#connect) + - [enqueue](#enqueue) + - [dequeue](#dequeue) + - [unsubscribe](#unsubscribe-1) + - [disconnect](#disconnect-1) + - [getDeadLetterTasks](#getdeadlettertasks) + - [getQueueStats](#getqueuestats) + - [clearQueue](#clearqueue) - [Contributing](#contributing) - [License](#license) @@ -29,6 +42,8 @@ pnpm add @qified/rabbitmq ## Usage with Qified +### Message Provider + ```ts import { createQified } from "@qified/rabbitmq"; import type { Message } from "qified"; @@ -46,6 +61,46 @@ await qified.publish("example-topic", { id: "1", data: "Hello from RabbitMQ!" }) await qified.disconnect(); ``` +### Task Provider + +```ts +import { RabbitMqTaskProvider } from "@qified/rabbitmq"; + +const taskProvider = new RabbitMqTaskProvider({ uri: "amqp://localhost:5672" }); + +// Enqueue a task +await taskProvider.enqueue("my-queue", { + data: { action: "send-email", to: "user@example.com" }, +}); + +// Dequeue and process tasks +await taskProvider.dequeue("my-queue", { + id: "email-handler", + handler: async (task, ctx) => { + console.log("Processing task:", task.data); + + // Access attempt metadata + console.log(`Attempt ${ctx.metadata.attempt} of ${ctx.metadata.maxRetries}`); + + // Extend the deadline if needed + await ctx.extend(10_000); + + // Acknowledge the task on success + await ctx.ack(); + }, +}); + +// Get queue statistics +const stats = await taskProvider.getQueueStats("my-queue"); +console.log(stats); // { waiting, processing, deadLetter } + +// Get dead-letter tasks for inspection +const deadLetters = await taskProvider.getDeadLetterTasks("my-queue"); + +// Clean up +await taskProvider.disconnect(); +``` + ## API ### RabbitMqMessageProviderOptions @@ -90,6 +145,80 @@ Cancels all subscriptions and closes the underlying RabbitMQ connection. Convenience factory that returns a `Qified` instance configured with `RabbitMqMessageProvider`. +### RabbitMqTaskProviderOptions + +Configuration options for the RabbitMQ task provider. Extends `TaskProviderOptions`. + +- `uri?`: RabbitMQ connection URI. Defaults to `"amqp://localhost:5672"`. +- `id?`: Unique identifier for this provider instance. Defaults to `"@qified/rabbitmq-task"`. +- `timeout?`: Default timeout in milliseconds for task processing. Defaults to `30000`. +- `retries?`: Default maximum retry attempts before a task is moved to the dead-letter queue. Defaults to `3`. +- `reconnectTimeInSeconds?`: Time in seconds to wait before reconnecting after connection loss. Set to `0` to disable. Defaults to `5`. + +### RabbitMqTaskProvider + +Implements the `TaskProvider` interface using RabbitMQ durable queues for reliable task processing. Extends `Hookified` for event emission. Features include: + +- Automatic retries with configurable max attempts +- Task timeouts with automatic rejection on expiry +- Dead-letter queue for failed tasks +- Automatic reconnection on connection loss + +#### constructor(options?: RabbitMqTaskProviderOptions) + +Creates a new task provider instance. + +#### connect() + +Explicitly connects to RabbitMQ. Called automatically on first `enqueue` or `dequeue` if not called manually. + +#### enqueue(queue: string, taskData: EnqueueTask) + +Enqueues a task to the specified queue. Returns a `Promise` with the generated task ID. + +Task data options: + +- `data`: The task payload (any serializable value). +- `id?`: Custom task ID. Auto-generated if omitted. +- `timeout?`: Per-task timeout override in milliseconds. +- `maxRetries?`: Per-task max retry override. +- `priority?`: Task priority value. + +#### dequeue(queue: string, handler: TaskHandler) + +Registers a handler to process tasks from the specified queue. The handler receives a `Task` and a `TaskContext`. + +`TaskContext` methods: + +- `ack()`: Acknowledge the task (removes it from the queue). +- `reject(requeue?: boolean)`: Reject the task. If `requeue` is `true` (default), re-enqueues for retry. After max retries, moves to dead-letter queue. +- `extend(ms: number)`: Extend the processing deadline by the given milliseconds. +- `metadata`: Object with `{ attempt, maxRetries }` for the current task. + +#### unsubscribe(queue: string, id?: string) + +Removes a handler by id, or all handlers for the queue if no id is provided. + +#### disconnect(force?: boolean) + +Disconnects from RabbitMQ and cleans up all consumers, timers, and in-memory state. If `force` is `true`, skips graceful channel close. + +#### getDeadLetterTasks(queue: string) + +Returns an array of tasks that have been moved to the dead-letter queue for the given queue. + +#### getQueueStats(queue: string) + +Returns statistics for the given queue: + +```ts +{ waiting: number; processing: number; deadLetter: number } +``` + +#### clearQueue(queue: string) + +Purges all tasks from the queue and its dead-letter queue, and clears all in-memory tracking state. + ## Contributing Contributions are welcome! Please read the [CONTRIBUTING.md](../../CONTRIBUTING.md) and [CODE_OF_CONDUCT.md](../../CODE_OF_CONDUCT.md) for details on our process. diff --git a/packages/rabbitmq/package.json b/packages/rabbitmq/package.json index 78fb5e8..52c38c4 100644 --- a/packages/rabbitmq/package.json +++ b/packages/rabbitmq/package.json @@ -33,7 +33,8 @@ "author": "Jared Wray ", "license": "MIT", "dependencies": { - "amqplib": "^0.10.9" + "amqplib": "^0.10.9", + "hookified": "^1.15.1" }, "peerDependencies": { "qified": "workspace:^" diff --git a/packages/rabbitmq/src/index.ts b/packages/rabbitmq/src/index.ts index 360c9f9..a21155b 100644 --- a/packages/rabbitmq/src/index.ts +++ b/packages/rabbitmq/src/index.ts @@ -7,6 +7,14 @@ import { type TopicHandler, } from "qified"; +export { + defaultRabbitMqTaskId, + defaultRetries as defaultTaskRetries, + defaultTimeout as defaultTaskTimeout, + RabbitMqTaskProvider, + type RabbitMqTaskProviderOptions, +} from "./task.js"; + /** * Configuration options for the RabbitMQ message provider. */ diff --git a/packages/rabbitmq/src/task.ts b/packages/rabbitmq/src/task.ts new file mode 100644 index 0000000..2a98716 --- /dev/null +++ b/packages/rabbitmq/src/task.ts @@ -0,0 +1,718 @@ +import { Buffer } from "node:buffer"; +import { randomUUID } from "node:crypto"; +import { type Channel, type ChannelModel, connect } from "amqplib"; +import { Hookified } from "hookified"; +import type { + EnqueueTask, + Task, + TaskContext, + TaskHandler, + TaskProvider, + TaskProviderOptions, +} from "qified"; + +/** + * Configuration options for the RabbitMQ task provider. + */ +export type RabbitMqTaskProviderOptions = TaskProviderOptions & { + /** RabbitMQ connection URI. Defaults to "amqp://localhost:5672" */ + uri?: string; + /** Unique identifier for this provider instance. Defaults to "@qified/rabbitmq-task" */ + id?: string; + /** Time in seconds to wait before reconnecting. Set to 0 to disable. Defaults to 5 */ + reconnectTimeInSeconds?: number; +}; + +/** Default RabbitMQ connection URI */ +export const defaultRabbitMqTaskUri = "amqp://localhost:5672"; + +/** Default RabbitMQ task provider identifier */ +export const defaultRabbitMqTaskId = "@qified/rabbitmq-task"; + +/** Default timeout for task processing (30 seconds) */ +export const defaultTimeout = 30_000; + +/** Default maximum retry attempts */ +export const defaultRetries = 3; + +/** Default reconnect time in seconds */ +export const defaultReconnectTimeInSeconds = 5; + +/** + * RabbitMQ-based task provider for Qified. + * Uses RabbitMQ queues for reliable task queue processing + * with visibility timeout, retries, and dead-letter queues. + * Extends Hookified to emit events for errors and other lifecycle events. + */ +export class RabbitMqTaskProvider extends Hookified implements TaskProvider { + private _id: string; + private _timeout: number; + private _retries: number; + private _taskHandlers: Map; + + private _connection: ChannelModel | undefined; + private _channel: Channel | undefined; + private _uri: string; + private _reconnectTimeInSeconds: number; + private _reconnecting = false; + private _closing = false; + private _reconnectTimer: ReturnType | undefined; + private _connectionPromise: Promise | null = null; + + private _active = true; + private readonly _consumerTags: Map = new Map(); + + // In-memory tracking for attempt counts: taskId -> count + private readonly _attemptCounts: Map = new Map(); + + // Track queue -> set of taskIds for cleanup + private readonly _queueTaskIds: Map> = new Map(); + + // Track currently processing tasks: queue -> set of taskIds + private readonly _processingTasks: Map> = new Map(); + + /** + * Creates a new RabbitMQ task provider instance. + * @param options Configuration options for the provider + */ + constructor(options: RabbitMqTaskProviderOptions = {}) { + super(); + this._uri = options.uri ?? defaultRabbitMqTaskUri; + this._id = options.id ?? defaultRabbitMqTaskId; + this._timeout = options.timeout ?? defaultTimeout; + this._retries = options.retries ?? defaultRetries; + this._reconnectTimeInSeconds = + options.reconnectTimeInSeconds ?? defaultReconnectTimeInSeconds; + this._taskHandlers = new Map(); + } + + /** + * Gets the provider ID. + */ + public get id(): string { + return this._id; + } + + /** + * Sets the provider ID. + */ + public set id(id: string) { + this._id = id; + } + + /** + * Gets the default timeout for task processing. + */ + public get timeout(): number { + return this._timeout; + } + + /** + * Sets the default timeout for task processing. + */ + public set timeout(timeout: number) { + this._timeout = timeout; + } + + /** + * Gets the default maximum retry attempts. + */ + public get retries(): number { + return this._retries; + } + + /** + * Sets the default maximum retry attempts. + */ + public set retries(retries: number) { + this._retries = retries; + } + + /** + * Gets the task handlers map. + */ + public get taskHandlers(): Map { + return this._taskHandlers; + } + + /** + * Sets the task handlers map. + */ + public set taskHandlers(value: Map) { + this._taskHandlers = value; + } + + /** + * Connects to RabbitMQ. Can be called explicitly or will be called automatically on first use. + */ + async connect(): Promise { + if (!this._connectionPromise) { + this._connectionPromise = (async () => { + try { + const connection = await connect(this._uri); + this._connection = connection; + this._channel = await connection.createChannel(); + + connection.on("error", () => { + // Connection error emitted — connection is already closing/closed. + // The 'close' handler will trigger reconnection. + }); + + connection.on("close", () => { + this._channel = undefined; + this._connection = undefined; + if (!this._closing) { + this._scheduleReconnect(); + } + }); + } catch (error) { + this._connectionPromise = null; + throw error; + } + })(); + } + + return this._connectionPromise; + } + + /** + * Returns the connected channel, connecting if necessary. + */ + private async getChannel(): Promise { + if (!this._connection || !this._channel) { + await this.connect(); + } + + // biome-ignore lint/style/noNonNullAssertion: channel is set by connect + return this._channel!; + } + + /** + * Schedules a reconnection attempt after the configured delay. + */ + private _scheduleReconnect(): void { + if ( + this._reconnectTimeInSeconds <= 0 || + this._reconnecting || + this._closing + ) { + return; + } + + this._reconnectTimer = setTimeout(async () => { + this._reconnectTimer = undefined; + await this._attemptReconnect(); + }, this._reconnectTimeInSeconds * 1000); + } + + /** + * Attempts to reconnect to RabbitMQ and re-establish all consumers. + */ + private async _attemptReconnect(): Promise { + if (this._reconnecting || this._closing) { + return; + } + + this._reconnecting = true; + let failed = false; + try { + // Reset connection promise so connect() creates a new one + this._connectionPromise = null; + await this.connect(); + + // biome-ignore lint/style/noNonNullAssertion: channel is set by connect + const channel = this._channel!; + + // Re-establish consumers for all queues with handlers + const queues = [...this._taskHandlers.keys()]; + for (const queue of queues) { + this._consumerTags.delete(queue); + await this._setupConsumer(channel, queue); + } + } catch { + this._channel = undefined; + this._connection = undefined; + this._connectionPromise = null; + failed = true; + } finally { + this._reconnecting = false; + } + + if (failed) { + this._scheduleReconnect(); + } + } + + /** + * Generates a globally unique task ID. + */ + private generateTaskId(): string { + return `task-${randomUUID()}`; + } + + /** + * Publishes a task to a RabbitMQ queue. + */ + private async publishTask(queue: string, task: Task): Promise { + const channel = await this.getChannel(); + await channel.assertQueue(queue, { durable: true }); + + // Track task in memory + if (!this._queueTaskIds.has(queue)) { + this._queueTaskIds.set(queue, new Set()); + } + + this._queueTaskIds.get(queue)?.add(task.id); + + channel.sendToQueue(queue, Buffer.from(JSON.stringify(task)), { + persistent: true, + }); + } + + /** + * Moves a task to the dead-letter queue. + */ + private async moveToDeadLetter(queue: string, task: Task): Promise { + const channel = await this.getChannel(); + const dlqName = `${queue}:dead-letter`; + await channel.assertQueue(dlqName, { durable: true }); + channel.sendToQueue(dlqName, Buffer.from(JSON.stringify(task)), { + persistent: true, + }); + + // Clean up in-memory tracking + this._attemptCounts.delete(task.id); + this._queueTaskIds.get(queue)?.delete(task.id); + } + + /** + * Cleans up in-memory tracking for a task after acknowledgment. + */ + private cleanupTask(queue: string, taskId: string): void { + this._attemptCounts.delete(taskId); + this._queueTaskIds.get(queue)?.delete(taskId); + } + + /** + * Sets up a RabbitMQ consumer for a task queue. + */ + private async _setupConsumer(channel: Channel, queue: string): Promise { + await channel.assertQueue(queue, { durable: true }); + await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); + await channel.prefetch(1); + + const { consumerTag } = await channel.consume( + queue, + async (message_) => { + if (!message_) { + return; + } + + /* v8 ignore next 4 -- @preserve */ + if (!this._active) { + channel.nack(message_, false, true); + return; + } + + const task = JSON.parse(message_.content.toString()) as Task; + const handlers = this._taskHandlers.get(queue); + /* v8 ignore next 4 -- @preserve */ + if (!handlers || handlers.length === 0) { + channel.nack(message_, false, true); + return; + } + + // Track as processing + if (!this._processingTasks.has(queue)) { + this._processingTasks.set(queue, new Set()); + } + + this._processingTasks.get(queue)?.add(task.id); + + // Shared AMQP message state to prevent double ack/nack + let amqpHandled = false; + const ackAmqp = () => { + if (!amqpHandled) { + amqpHandled = true; + channel.ack(message_); + } + }; + + const nackAmqp = () => { + if (!amqpHandled) { + amqpHandled = true; + channel.nack(message_, false, false); + } + }; + + for (const handler of handlers) { + await this.processTask(queue, task, handler, ackAmqp, nackAmqp); + } + + // Remove from processing + this._processingTasks.get(queue)?.delete(task.id); + }, + { noAck: false }, + ); + + this._consumerTags.set(queue, consumerTag); + } + + /** + * Processes a single task with a handler. + */ + private async processTask( + queue: string, + task: Task, + handler: TaskHandler, + ackAmqp: () => void, + nackAmqp: () => void, + ): Promise { + const maxRetries = task.maxRetries ?? this._retries; + const timeout = task.timeout ?? this._timeout; + + // Increment attempt count + const currentAttempt = (this._attemptCounts.get(task.id) ?? 0) + 1; + this._attemptCounts.set(task.id, currentAttempt); + + let acknowledged = false; + let rejected = false; + let timeoutHandle: ReturnType | undefined; + + const context: TaskContext = { + ack: async () => { + if (acknowledged || rejected || !this._active) { + return; + } + + acknowledged = true; + try { + ackAmqp(); + this.cleanupTask(queue, task.id); + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + }, + reject: async (requeue = true) => { + if (acknowledged || rejected || !this._active) { + return; + } + + rejected = true; + try { + if (requeue && currentAttempt < maxRetries) { + await this.publishTask(queue, task); + } else { + await this.moveToDeadLetter(queue, task); + } + + nackAmqp(); + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + }, + extend: async (ttl: number) => { + if (acknowledged || rejected || !this._active) { + return; + } + + try { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + + timeoutHandle = setTimeout(() => { + if (!acknowledged && !rejected && this._active) { + void context.reject(true); + } + }, ttl); + } catch (error) { + /* v8 ignore next -- @preserve */ + this.emit("error", error); + } + }, + metadata: { + attempt: currentAttempt, + maxRetries, + }, + }; + + // Set timeout handler + timeoutHandle = setTimeout(() => { + if (!acknowledged && !rejected && this._active) { + void context.reject(true); + } + }, timeout); + + try { + await handler.handler(task, context); + + // Auto-ack if handler completes without explicit ack/reject + if (!acknowledged && !rejected) { + await context.ack(); + } + } catch { + // Auto-reject on error + if (!acknowledged && !rejected) { + await context.reject(true); + } + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } + } + + /** + * Enqueues a task to a specific queue. + * @param queue The queue name to enqueue to + * @param taskData The task data to enqueue + * @returns The ID of the enqueued task + */ + public async enqueue(queue: string, taskData: EnqueueTask): Promise { + if (!this._active) { + throw new Error("TaskProvider has been disconnected"); + } + + const task: Task = { + id: this.generateTaskId(), + timestamp: Date.now(), + ...taskData, + }; + + // Publish to RabbitMQ queue + await this.publishTask(queue, task); + return task.id; + } + + /** + * Registers a handler to process tasks from a queue. + * @param queue The queue name to dequeue from + * @param handler The handler configuration + */ + public async dequeue(queue: string, handler: TaskHandler): Promise { + if (!this._active) { + throw new Error("TaskProvider has been disconnected"); + } + + if (!this._taskHandlers.has(queue)) { + this._taskHandlers.set(queue, []); + } + + this._taskHandlers.get(queue)?.push(handler); + + // Set up consumer if not already + if (!this._consumerTags.has(queue)) { + const channel = await this.getChannel(); + await this._setupConsumer(channel, queue); + } + } + + /** + * Unsubscribes a handler from a queue. + * @param queue The queue name to unsubscribe from + * @param id Optional handler ID. If not provided, removes all handlers. + */ + public async unsubscribe(queue: string, id?: string): Promise { + if (id) { + const handlers = this._taskHandlers.get(queue); + if (handlers) { + this._taskHandlers.set( + queue, + handlers.filter((h) => h.id !== id), + ); + } + } else { + this._taskHandlers.delete(queue); + } + + // Stop polling and cancel consumer if no handlers left for this queue + if ( + !this._taskHandlers.has(queue) || + this._taskHandlers.get(queue)?.length === 0 + ) { + const consumerTag = this._consumerTags.get(queue); + if (consumerTag && this._channel) { + try { + await this._channel.cancel(consumerTag); + } catch { + /* ignore if channel closed */ + } + + this._consumerTags.delete(queue); + } + } + } + + /** + * Disconnects and cleans up the provider. + * @param force If true, skips graceful close. Defaults to false. + */ + public async disconnect(force = false): Promise { + this._active = false; + this._closing = true; + + // Clear reconnect timer + if (this._reconnectTimer) { + clearTimeout(this._reconnectTimer); + this._reconnectTimer = undefined; + } + + // Clear handlers and in-memory state + this._taskHandlers.clear(); + this._attemptCounts.clear(); + this._queueTaskIds.clear(); + this._processingTasks.clear(); + + // Cancel consumers and close channel/connection + if (this._channel) { + if (!force) { + for (const tag of this._consumerTags.values()) { + try { + await this._channel.cancel(tag); + } catch { + /* ignore */ + } + } + } + + this._consumerTags.clear(); + + if (!force) { + try { + await this._channel.close(); + } catch { + /* ignore */ + } + } + + this._channel = undefined; + } + + if (this._connection) { + if (!force) { + try { + await this._connection.close(); + } catch { + /* ignore */ + } + } + + this._connection = undefined; + } + + this._connectionPromise = null; + this._closing = false; + } + + /** + * Gets all tasks in the dead-letter queue for a specific queue. + * @param queue The queue name + * @returns Array of tasks in the dead-letter queue + */ + public async getDeadLetterTasks(queue: string): Promise { + const channel = await this.getChannel(); + const dlqName = `${queue}:dead-letter`; + await channel.assertQueue(dlqName, { durable: true }); + + const tasks: Task[] = []; + const messages: import("amqplib").GetMessage[] = []; + + // Collect all messages without acking — unacked messages are held by + // the channel and won't be re-delivered to subsequent get calls. + let msg = await channel.get(dlqName, { noAck: false }); + while (msg) { + messages.push(msg); + try { + const task = JSON.parse(msg.content.toString()) as Task; + tasks.push(task); + } catch { + // Skip malformed messages + } + + msg = await channel.get(dlqName, { noAck: false }); + } + + // Nack all messages back to the queue so they remain for future inspection + for (const m of messages) { + channel.nack(m, false, true); + } + + return tasks; + } + + /** + * Gets the current state of a queue. + * Uses assertQueue to safely check queue state without risking channel closure. + * @param queue The queue name + * @returns Queue statistics: `waiting` (ready messages in RabbitMQ), `processing` (tasks + * currently being handled by this provider instance), and `deadLetter`. + */ + public async getQueueStats(queue: string): Promise<{ + waiting: number; + processing: number; + deadLetter: number; + }> { + let waiting = 0; + try { + const channel = await this.getChannel(); + // Use assertQueue (idempotent) instead of checkQueue to avoid + // channel closure when queue doesn't exist + const queueInfo = await channel.assertQueue(queue, { durable: true }); + waiting = queueInfo.messageCount; + } catch { + /* v8 ignore next -- @preserve */ + // Queue assertion failed + } + + let deadLetter = 0; + try { + const channel = await this.getChannel(); + const dlqInfo = await channel.assertQueue(`${queue}:dead-letter`, { + durable: true, + }); + deadLetter = dlqInfo.messageCount; + } catch { + /* v8 ignore next -- @preserve */ + // DLQ assertion failed + } + + const processing = this._processingTasks.get(queue)?.size ?? 0; + + return { + waiting, + processing, + deadLetter, + }; + } + + /** + * Clears all data for a queue. Useful for testing. + * Uses assertQueue before purgeQueue to avoid channel closure on non-existent queues. + * @param queue The queue name to clear + */ + public async clearQueue(queue: string): Promise { + const channel = await this.getChannel(); + + // Assert queues first to ensure they exist (prevents channel error on purge) + await channel.assertQueue(queue, { durable: true }); + await channel.purgeQueue(queue); + + await channel.assertQueue(`${queue}:dead-letter`, { durable: true }); + await channel.purgeQueue(`${queue}:dead-letter`); + + this._processingTasks.delete(queue); + + // Clear task data for this queue + const taskIds = this._queueTaskIds.get(queue); + if (taskIds) { + for (const taskId of taskIds) { + this._attemptCounts.delete(taskId); + } + + this._queueTaskIds.delete(queue); + } + } +} diff --git a/packages/rabbitmq/test/task-edge.test.ts b/packages/rabbitmq/test/task-edge.test.ts new file mode 100644 index 0000000..59eb3b1 --- /dev/null +++ b/packages/rabbitmq/test/task-edge.test.ts @@ -0,0 +1,342 @@ +import { EventEmitter } from "node:events"; +import type { Channel, ChannelModel } from "amqplib"; +import { + afterEach, + beforeEach, + describe, + expect, + type Mock, + test, + vi, +} from "vitest"; +import { RabbitMqTaskProvider } from "../src/index.js"; + +// Mock amqplib +vi.mock("amqplib", () => ({ + connect: vi.fn(), +})); + +function createMockChannel(): Channel { + return { + assertQueue: vi + .fn() + .mockResolvedValue({ queue: "test", messageCount: 0, consumerCount: 0 }), + sendToQueue: vi.fn().mockReturnValue(true), + consume: vi.fn().mockResolvedValue({ consumerTag: `ctag-${Date.now()}` }), + cancel: vi.fn().mockResolvedValue({}), + ack: vi.fn(), + nack: vi.fn(), + prefetch: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + purgeQueue: vi.fn().mockResolvedValue({ messageCount: 0 }), + } as unknown as Channel; +} + +function createMockConnection(channel: Channel): ChannelModel & EventEmitter { + const emitter = new EventEmitter(); + return Object.assign(emitter, { + createChannel: vi.fn().mockResolvedValue(channel), + close: vi.fn().mockResolvedValue(undefined), + }) as unknown as ChannelModel & EventEmitter; +} + +let mockConnect: Mock; + +beforeEach(async () => { + const amqplib = await import("amqplib"); + mockConnect = amqplib.connect as unknown as Mock; + mockConnect.mockReset(); +}); + +afterEach(() => { + vi.useRealTimers(); +}); + +describe("RabbitMqTaskProvider (edge cases requiring mocks)", () => { + test("should schedule reconnect when connection closes unexpectedly", async () => { + vi.useFakeTimers(); + + const channel1 = createMockChannel(); + const connection1 = createMockConnection(channel1); + const channel2 = createMockChannel(); + const connection2 = createMockConnection(channel2); + + mockConnect + .mockResolvedValueOnce(connection1) + .mockResolvedValueOnce(connection2); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + expect(mockConnect).toHaveBeenCalledTimes(1); + + // Simulate unexpected connection loss — triggers _scheduleReconnect (line 179) + connection1.emit("close"); + + // Advance timer so reconnect fires (lines 212-215) + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(2); + + await provider.disconnect(true); + }); + + test("should retry reconnection on failure then succeed", async () => { + vi.useFakeTimers(); + + const channel1 = createMockChannel(); + const connection1 = createMockConnection(channel1); + const channel3 = createMockChannel(); + const connection3 = createMockConnection(channel3); + + mockConnect + .mockResolvedValueOnce(connection1) + .mockRejectedValueOnce(new Error("Connection refused")) + .mockResolvedValueOnce(connection3); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + expect(mockConnect).toHaveBeenCalledTimes(1); + + // Simulate connection loss + connection1.emit("close"); + + // First reconnect attempt — fails (line 242-246) + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(2); + + // Second reconnect attempt — succeeds (lines 228-241) + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(3); + + await provider.disconnect(true); + }); + + test("should not reconnect if reconnectTimeInSeconds is 0", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValueOnce(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 0, + }); + await provider.connect(); + + // Simulate connection loss — _scheduleReconnect should bail out (line 205) + connection.emit("close"); + + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(5000); + expect(mockConnect).not.toHaveBeenCalled(); + + await provider.disconnect(true); + }); + + test("should not reconnect if closing flag is set when timer fires", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValue(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Simulate connection loss — schedules reconnect + connection.emit("close"); + + // Set _closing before the timer fires (line 222-223) + (provider as unknown as { _closing: boolean })._closing = true; + + try { + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(1500); + expect(mockConnect).not.toHaveBeenCalled(); + } finally { + (provider as unknown as { _closing: boolean })._closing = false; + } + }); + + test("should not schedule reconnect if already reconnecting", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValue(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Set _reconnecting to true — _scheduleReconnect should bail out (line 206) + (provider as unknown as { _reconnecting: boolean })._reconnecting = true; + + connection.emit("close"); + + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(5000); + expect(mockConnect).not.toHaveBeenCalled(); + + (provider as unknown as { _reconnecting: boolean })._reconnecting = false; + await provider.disconnect(true); + }); + + test("should re-establish consumers on reconnect", async () => { + vi.useFakeTimers(); + + const channel1 = createMockChannel(); + const connection1 = createMockConnection(channel1); + const channel2 = createMockChannel(); + const connection2 = createMockConnection(channel2); + + mockConnect + .mockResolvedValueOnce(connection1) + .mockResolvedValueOnce(connection2); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Register a handler so there's something to re-establish (lines 237-241) + provider.taskHandlers.set("my-queue", [ + { id: "h1", handler: async () => {} }, + ]); + + // Simulate connection loss + connection1.emit("close"); + + // Reconnect fires and re-establishes consumers + await vi.advanceTimersByTimeAsync(1100); + expect(mockConnect).toHaveBeenCalledTimes(2); + // Should have called consume on the new channel for the queue + expect(channel2.consume).toHaveBeenCalledWith( + "my-queue", + expect.any(Function), + { noAck: false }, + ); + + await provider.disconnect(true); + }); + + test("should handle null message in consumer callback", async () => { + const channel = createMockChannel(); + const connection = createMockConnection(channel); + mockConnect.mockResolvedValue(connection); + + let consumerCallback: (msg: unknown) => Promise; + (channel.consume as Mock).mockImplementation( + async (_queue: string, cb: (msg: unknown) => Promise) => { + consumerCallback = cb; + return { consumerTag: "ctag-test" }; + }, + ); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 0, + }); + await provider.connect(); + + // Register a handler and set up consumer + const tasks: unknown[] = []; + provider.taskHandlers.set("test-q", [ + { + id: "h1", + handler: async (task) => { + tasks.push(task); + }, + }, + ]); + + const ch = (provider as unknown as { _channel: Channel })._channel; + // Manually call _setupConsumer + await ( + provider as unknown as { + _setupConsumer(ch: Channel, q: string): Promise; + } + )._setupConsumer(ch, "test-q"); + + // Send null message — should return early (line 323-324) + // biome-ignore lint/style/noNonNullAssertion: set by mock + await consumerCallback!(null); + expect(tasks).toHaveLength(0); + expect(channel.ack).not.toHaveBeenCalled(); + expect(channel.nack).not.toHaveBeenCalled(); + + await provider.disconnect(true); + }); + + test("should clear reconnect timer on disconnect", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + + mockConnect.mockResolvedValue(connection); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + // Simulate connection loss — schedules reconnect timer + connection.emit("close"); + + // Verify the timer is set (lines 657-658) + const internal = provider as unknown as { + _reconnectTimer: ReturnType | undefined; + }; + expect(internal._reconnectTimer).toBeDefined(); + + // Disconnect should clear the timer + await provider.disconnect(true); + expect(internal._reconnectTimer).toBeUndefined(); + + // No reconnect should have happened + mockConnect.mockClear(); + await vi.advanceTimersByTimeAsync(5000); + expect(mockConnect).not.toHaveBeenCalled(); + }); + + test("should handle connection error event gracefully", async () => { + vi.useFakeTimers(); + + const channel = createMockChannel(); + const connection = createMockConnection(channel); + const channel2 = createMockChannel(); + const connection2 = createMockConnection(channel2); + + mockConnect + .mockResolvedValueOnce(connection) + .mockResolvedValueOnce(connection2); + + const provider = new RabbitMqTaskProvider({ + reconnectTimeInSeconds: 1, + }); + await provider.connect(); + + expect(connection.listenerCount("error")).toBe(1); + expect(connection.listenerCount("close")).toBe(1); + + // Emit error — handler is a no-op but should not throw + connection.emit("error", new Error("Socket closed")); + + // Close event triggers reconnection + connection.emit("close"); + + await vi.advanceTimersByTimeAsync(1500); + expect(mockConnect).toHaveBeenCalledTimes(2); + + await provider.disconnect(true); + }); +}); diff --git a/packages/rabbitmq/test/task.test.ts b/packages/rabbitmq/test/task.test.ts new file mode 100644 index 0000000..417eb0c --- /dev/null +++ b/packages/rabbitmq/test/task.test.ts @@ -0,0 +1,1127 @@ +// biome-ignore-all lint/suspicious/noExplicitAny: This is a test file and explicit any is acceptable here. + +import type { Task, TaskContext, TaskHandler } from "qified"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { + defaultRabbitMqTaskId, + defaultTaskRetries, + defaultTaskTimeout, + RabbitMqTaskProvider, +} from "../src/index.js"; + +async function waitFor( + condition: () => boolean | Promise, + timeoutMs = 5000, + intervalMs = 50, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (await condition()) { + return; + } + + await new Promise((resolve) => { + setTimeout(resolve, intervalMs); + }); + } + + throw new Error(`waitFor timed out after ${timeoutMs}ms`); +} + +let queueCounter = 0; +function uniqueQueue(): string { + queueCounter++; + return `test-queue-${Date.now()}-${queueCounter}`; +} + +describe("RabbitMqTaskProvider", () => { + let provider: RabbitMqTaskProvider; + const testQueue = uniqueQueue(); + + // Track custom providers for cleanup + const customProviders: RabbitMqTaskProvider[] = []; + + async function createCustomProvider( + options: ConstructorParameters[0] = {}, + ): Promise { + const p = new RabbitMqTaskProvider(options); + customProviders.push(p); + return p; + } + + describe("hookified inheritance", () => { + test("should extend Hookified and support event emission", () => { + const p = new RabbitMqTaskProvider(); + expect(typeof p.on).toBe("function"); + expect(typeof p.emit).toBe("function"); + expect(typeof p.off).toBe("function"); + }); + + test("should emit error events when RabbitMQ operations fail in ack", async () => { + const customProvider = await createCustomProvider(); + await customProvider.connect(); + const q = uniqueQueue(); + await customProvider.clearQueue(q); + + const errors: Error[] = []; + customProvider.on("error", (error: Error) => { + errors.push(error); + }); + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, _ctx: TaskContext) => { + // Handler completes, auto-ack will happen + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + // Wait for task to be processed + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Clean up + await customProvider.clearQueue(q); + await customProvider.disconnect(); + + // Create a new provider to verify error listener support + const p2 = await createCustomProvider(); + await p2.connect(); + await p2.clearQueue(q); + + const errors2: Error[] = []; + p2.on("error", (error: Error) => { + errors2.push(error); + }); + + expect(typeof p2.on).toBe("function"); + + await p2.clearQueue(q); + await p2.disconnect(); + }); + + test("should support onHook and removeHook from Hookified", () => { + const p = new RabbitMqTaskProvider(); + expect(typeof p.onHook).toBe("function"); + expect(typeof p.removeHook).toBe("function"); + }); + + test("should support once method from Hookified", () => { + const p = new RabbitMqTaskProvider(); + expect(typeof p.once).toBe("function"); + }); + }); + + beforeEach(async () => { + provider = new RabbitMqTaskProvider(); + await provider.connect(); + await provider.clearQueue(testQueue); + }); + + afterEach(async () => { + // Clean up custom providers first + for (const p of customProviders) { + try { + await p.disconnect(true); + } catch { + /* ignore */ + } + } + + customProviders.length = 0; + + await provider.clearQueue(testQueue); + await provider.disconnect(); + }); + + describe("constructor and initialization", () => { + test("should initialize with default values", () => { + const p = new RabbitMqTaskProvider(); + expect(p.id).toBe(defaultRabbitMqTaskId); + expect(p.timeout).toBe(defaultTaskTimeout); + expect(p.retries).toBe(defaultTaskRetries); + expect(p.taskHandlers).toEqual(new Map()); + }); + + test("should initialize with custom id", () => { + const p = new RabbitMqTaskProvider({ id: "custom-id" }); + expect(p.id).toBe("custom-id"); + }); + + test("should initialize with custom timeout", () => { + const p = new RabbitMqTaskProvider({ timeout: 5000 }); + expect(p.timeout).toBe(5000); + }); + + test("should initialize with custom retries", () => { + const p = new RabbitMqTaskProvider({ retries: 5 }); + expect(p.retries).toBe(5); + }); + + test("should initialize with all custom options", () => { + const p = new RabbitMqTaskProvider({ + id: "custom-id", + timeout: 5000, + retries: 5, + }); + expect(p.id).toBe("custom-id"); + expect(p.timeout).toBe(5000); + expect(p.retries).toBe(5); + }); + + test("should fail to connect when RabbitMQ is not available", async () => { + const p = new RabbitMqTaskProvider({ uri: "amqp://localhost:9999" }); + await expect(p.connect()).rejects.toThrow(); + }); + }); + + describe("getters and setters", () => { + test("should set and get id", () => { + provider.id = "new-id"; + expect(provider.id).toBe("new-id"); + }); + + test("should set and get timeout", () => { + provider.timeout = 10000; + expect(provider.timeout).toBe(10000); + }); + + test("should set and get retries", () => { + provider.retries = 10; + expect(provider.retries).toBe(10); + }); + + test("should set and get taskHandlers", () => { + const handlers = new Map(); + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + handlers.set("test-queue", [handler]); + provider.taskHandlers = handlers; + expect(provider.taskHandlers).toBe(handlers); + }); + }); + + describe("enqueue", () => { + test("should enqueue a task with auto-generated id and timestamp", async () => { + const taskId = await provider.enqueue(testQueue, { + data: { message: "test" }, + }); + + expect(taskId).toBeDefined(); + expect(typeof taskId).toBe("string"); + expect(taskId).toMatch( + /^task-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/, + ); + }); + + test("should enqueue multiple tasks with unique ids", async () => { + const taskId1 = await provider.enqueue(testQueue, { + data: { message: "test1" }, + }); + const taskId2 = await provider.enqueue(testQueue, { + data: { message: "test2" }, + }); + + expect(taskId1).not.toBe(taskId2); + }); + + test("should enqueue task with all optional fields", async () => { + const taskId = await provider.enqueue(testQueue, { + data: { message: "test" }, + headers: { "x-custom": "value" }, + priority: 10, + maxRetries: 5, + timeout: 5000, + }); + + expect(taskId).toBeDefined(); + }); + + test("should throw error when disconnected", async () => { + await provider.disconnect(); + + await expect( + provider.enqueue(testQueue, { data: { message: "test" } }), + ).rejects.toThrow("TaskProvider has been disconnected"); + }); + }); + + describe("dequeue and task processing", () => { + test("should register a handler for a queue", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler); + + expect(provider.taskHandlers.has(testQueue)).toBe(true); + expect(provider.taskHandlers.get(testQueue)?.length).toBe(1); + expect(provider.taskHandlers.get(testQueue)?.[0]).toBe(handler); + }); + + test("should register multiple handlers for the same queue", async () => { + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(2); + }); + + test("should process enqueued task when handler is registered", async () => { + let processedTask: Task | undefined; + const handler: TaskHandler = { + id: "test-handler", + handler: async (task: Task) => { + processedTask = task; + }, + }; + + await provider.dequeue(testQueue, handler); + const taskId = await provider.enqueue(testQueue, { + data: { message: "test" }, + }); + + await waitFor(() => processedTask !== undefined); + + expect(processedTask).toBeDefined(); + expect(processedTask?.id).toBe(taskId); + expect(processedTask?.data).toEqual({ message: "test" }); + }); + + test("should auto-acknowledge task when handler completes successfully", async () => { + let processed = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + processed = true; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => processed); + + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + }); + + test("should throw error when disconnected", async () => { + await provider.disconnect(); + + await expect( + provider.dequeue(testQueue, { handler: async () => {} }), + ).rejects.toThrow("TaskProvider has been disconnected"); + }); + }); + + describe("task acknowledgment", () => { + test("should acknowledge task explicitly", async () => { + let context: TaskContext | undefined; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + context = ctx; + await ctx.ack(); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => context !== undefined); + + expect(context).toBeDefined(); + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + }); + + test("should not acknowledge twice", async () => { + let ackCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.ack(); + ackCount++; + await ctx.ack(); // Second ack should be no-op + ackCount++; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => ackCount >= 2); + + expect(ackCount).toBe(2); + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + }); + }); + + describe("task rejection and retry", () => { + test("should reject and requeue task on failure", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({}); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let attemptCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + attemptCount++; + if (attemptCount === 1) { + await ctx.reject(true); + } else { + await ctx.ack(); + } + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + await waitFor(() => attemptCount > 1); + + expect(attemptCount).toBeGreaterThan(1); + + await customProvider.clearQueue(q); + }); + + test("should move to dead-letter queue after max retries", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ + retries: 2, + }); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let attemptCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + attemptCount++; + await ctx.reject(true); + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return dlq.length === 1; + }); + + const deadLetterTasks = await customProvider.getDeadLetterTasks(q); + expect(deadLetterTasks.length).toBe(1); + expect(deadLetterTasks[0].data).toEqual({ message: "test" }); + expect(attemptCount).toBe(2); + + await customProvider.clearQueue(q); + }); + + test("should move to dead-letter queue when reject with requeue=false", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length === 1; + }); + + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(1); + }); + + test("should not reject twice", async () => { + let rejectCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + rejectCount++; + await ctx.reject(false); // Second reject should be no-op + rejectCount++; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => rejectCount >= 2); + + expect(rejectCount).toBe(2); + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(1); + }); + + test("should auto-reject task on handler error", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ + retries: 3, + }); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let attemptCount = 0; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + attemptCount++; + throw new Error("Handler error"); + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return dlq.length === 1; + }); + + expect(attemptCount).toBe(3); + const deadLetterTasks = await customProvider.getDeadLetterTasks(q); + expect(deadLetterTasks.length).toBe(1); + + await customProvider.clearQueue(q); + }); + }); + + describe("task timeout", () => { + test("should timeout task after configured timeout", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ + timeout: 100, + retries: 1, + }); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let handlerCalled = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + handlerCalled = true; + // Simulate long-running task + await new Promise((resolve) => setTimeout(resolve, 500)); + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + // Wait for the task to be processed and eventually end up in DLQ + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return dlq.length > 0; + }, 10_000); + + expect(handlerCalled).toBe(true); + const dlq = await customProvider.getDeadLetterTasks(q); + expect(dlq.length).toBeGreaterThan(0); + + await customProvider.clearQueue(q); + }); + + test("should use task-specific timeout over provider default", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ + timeout: 5000, + retries: 1, + }); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let handlerCalled = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + handlerCalled = true; + // Simulate long-running task + await new Promise((resolve) => setTimeout(resolve, 500)); + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { + data: { message: "test" }, + timeout: 50, // Task-specific timeout (shorter than handler duration) + }); + + // Wait for the task to end up in DLQ after retries exhaust + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return dlq.length > 0; + }, 10_000); + + expect(handlerCalled).toBe(true); + const dlq = await customProvider.getDeadLetterTasks(q); + expect(dlq.length).toBeGreaterThan(0); + + await customProvider.clearQueue(q); + }); + }); + + describe("task context extend", () => { + test("should extend task deadline", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ + timeout: 200, + }); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let completed = false; + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + // Extend timeout before long operation + await ctx.extend(1000); + await new Promise((resolve) => setTimeout(resolve, 300)); + completed = true; + await ctx.ack(); + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + await waitFor(() => completed); + + expect(completed).toBe(true); + const stats = await customProvider.getQueueStats(q); + expect(stats.waiting).toBe(0); + + await customProvider.clearQueue(q); + }); + + test("should not extend after acknowledgment", async () => { + let extendCalled = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.ack(); + await ctx.extend(1000); // Should be no-op + extendCalled = true; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => extendCalled); + + expect(extendCalled).toBe(true); + }); + }); + + describe("task context metadata", () => { + test("should provide attempt and maxRetries in context", async () => { + let contextMetadata: any; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + contextMetadata = ctx.metadata; + await ctx.ack(); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => contextMetadata !== undefined); + + expect(contextMetadata).toBeDefined(); + expect(contextMetadata.attempt).toBe(1); + expect(contextMetadata.maxRetries).toBe(defaultTaskRetries); + }); + + test("should increment attempt on retry", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({}); + await customProvider.connect(); + await customProvider.clearQueue(q); + + const attempts: number[] = []; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + attempts.push(ctx.metadata.attempt); + if (ctx.metadata.attempt < 2) { + await ctx.reject(true); + } else { + await ctx.ack(); + } + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + await waitFor(() => attempts.length > 1); + + expect(attempts.length).toBeGreaterThan(1); + expect(attempts[0]).toBe(1); + expect(attempts[1]).toBe(2); + + await customProvider.clearQueue(q); + }); + + test("should use task-specific maxRetries", async () => { + let contextMetadata: any; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + contextMetadata = ctx.metadata; + await ctx.ack(); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { + data: { message: "test" }, + maxRetries: 10, + }); + + await waitFor(() => contextMetadata !== undefined); + + expect(contextMetadata.maxRetries).toBe(10); + }); + }); + + describe("unsubscribe", () => { + test("should unsubscribe specific handler by id", async () => { + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(2); + + await provider.unsubscribe(testQueue, "handler-1"); + + const handlers = provider.taskHandlers.get(testQueue); + expect(handlers?.length).toBe(1); + expect(handlers?.[0].id).toBe("handler-2"); + }); + + test("should unsubscribe all handlers when id not provided", async () => { + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(2); + + await provider.unsubscribe(testQueue); + + expect(provider.taskHandlers.has(testQueue)).toBe(false); + }); + + test("should handle unsubscribe for non-existent queue", async () => { + await expect( + provider.unsubscribe("non-existent-queue", "handler-1"), + ).resolves.not.toThrow(); + }); + + test("should handle unsubscribe for non-existent handler id", async () => { + const handler: TaskHandler = { + id: "handler-1", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler); + + await expect( + provider.unsubscribe(testQueue, "non-existent-handler"), + ).resolves.not.toThrow(); + + expect(provider.taskHandlers.get(testQueue)?.length).toBe(1); + }); + }); + + describe("disconnect", () => { + test("should clear all handlers on disconnect", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + expect(provider.taskHandlers.size).toBeGreaterThan(0); + + await provider.disconnect(); + + expect(provider.taskHandlers.size).toBe(0); + }); + + test("should prevent operations after disconnect", async () => { + await provider.disconnect(); + + await expect( + provider.enqueue(testQueue, { data: { message: "test" } }), + ).rejects.toThrow("TaskProvider has been disconnected"); + + await expect( + provider.dequeue(testQueue, { handler: async () => {} }), + ).rejects.toThrow("TaskProvider has been disconnected"); + }); + + test("should force disconnect", async () => { + const p = await createCustomProvider(); + await p.connect(); + + const handler: TaskHandler = { + id: "test-handler", + handler: async () => {}, + }; + const q = uniqueQueue(); + await p.dequeue(q, handler); + + // Force disconnect should skip graceful close + await p.disconnect(true); + expect(p.taskHandlers.size).toBe(0); + }); + }); + + describe("getDeadLetterTasks", () => { + test("should return empty array for queue with no dead-letter tasks", async () => { + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks).toEqual([]); + }); + + test("should return dead-letter tasks for queue", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test1" } }); + await provider.enqueue(testQueue, { data: { message: "test2" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length === 2; + }); + + const deadLetterTasks = await provider.getDeadLetterTasks(testQueue); + expect(deadLetterTasks.length).toBe(2); + }); + }); + + describe("getQueueStats", () => { + test("should return empty stats for non-existent queue", async () => { + const stats = await provider.getQueueStats("non-existent-queue"); + expect(stats).toEqual({ + waiting: 0, + processing: 0, + deadLetter: 0, + }); + }); + + test("should return correct stats for queue with waiting tasks", async () => { + // Use a fresh queue with no consumer so messages stay as "ready" + const q = uniqueQueue(); + await provider.enqueue(q, { data: { message: "test1" } }); + await provider.enqueue(q, { data: { message: "test2" } }); + + // Small delay for RabbitMQ to process + await new Promise((resolve) => setTimeout(resolve, 50)); + + const stats = await provider.getQueueStats(q); + expect(stats.waiting).toBe(2); + expect(stats.processing).toBe(0); + expect(stats.deadLetter).toBe(0); + + await provider.clearQueue(q); + }); + + test("should return correct stats with dead-letter tasks", async () => { + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test1" } }); + await provider.enqueue(testQueue, { data: { message: "test2" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length === 2; + }); + + const stats = await provider.getQueueStats(testQueue); + expect(stats.waiting).toBe(0); + expect(stats.deadLetter).toBe(2); + }); + }); + + describe("multiple handlers", () => { + test("should deliver task to all registered handlers", async () => { + let handler1Called = false; + let handler2Called = false; + + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => { + handler1Called = true; + }, + }; + + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => { + handler2Called = true; + }, + }; + + await provider.dequeue(testQueue, handler1); + await provider.dequeue(testQueue, handler2); + await provider.enqueue(testQueue, { data: { message: "test" } }); + + await waitFor(() => handler1Called && handler2Called); + + expect(handler1Called).toBe(true); + expect(handler2Called).toBe(true); + }); + }); + + describe("multiple queues", () => { + test("should maintain separate queues", async () => { + const queue1 = uniqueQueue(); + const queue2 = uniqueQueue(); + + let queue1Processed = false; + let queue2Processed = false; + + const handler1: TaskHandler = { + id: "handler-1", + handler: async () => { + queue1Processed = true; + }, + }; + + const handler2: TaskHandler = { + id: "handler-2", + handler: async () => { + queue2Processed = true; + }, + }; + + await provider.dequeue(queue1, handler1); + await provider.dequeue(queue2, handler2); + await provider.enqueue(queue1, { data: { message: "test1" } }); + await provider.enqueue(queue2, { data: { message: "test2" } }); + + await waitFor(() => queue1Processed && queue2Processed); + + expect(queue1Processed).toBe(true); + expect(queue2Processed).toBe(true); + + const stats1 = await provider.getQueueStats(queue1); + const stats2 = await provider.getQueueStats(queue2); + + expect(stats1.waiting).toBe(0); + expect(stats2.waiting).toBe(0); + + // Clean up + await provider.clearQueue(queue1); + await provider.clearQueue(queue2); + }); + }); + + describe("task fields", () => { + test("should preserve task data fields", async () => { + let receivedTask: Task | undefined; + + const handler: TaskHandler = { + id: "test-handler", + handler: async (task: Task) => { + receivedTask = task; + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { + data: { message: "test", nested: { value: 123 } }, + headers: { "x-custom": "header-value" }, + priority: 5, + }); + + await waitFor(() => receivedTask !== undefined); + + expect(receivedTask).toBeDefined(); + expect(receivedTask?.data).toEqual({ + message: "test", + nested: { value: 123 }, + }); + expect(receivedTask?.headers).toEqual({ "x-custom": "header-value" }); + expect(receivedTask?.priority).toBe(5); + expect(receivedTask?.timestamp).toBeDefined(); + }); + }); + + describe("clearQueue", () => { + test("should clear all data for a queue", async () => { + // Add tasks to various states + await provider.enqueue(testQueue, { data: { message: "test1" } }); + + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + await ctx.reject(false); + }, + }; + + await provider.dequeue(testQueue, handler); + await provider.enqueue(testQueue, { data: { message: "test3" } }); + + await waitFor(async () => { + const dlq = await provider.getDeadLetterTasks(testQueue); + return dlq.length > 0; + }); + + // Clear queue + await provider.clearQueue(testQueue); + + // Verify data is cleared + const statsAfter = await provider.getQueueStats(testQueue); + expect(statsAfter.waiting).toBe(0); + expect(statsAfter.deadLetter).toBe(0); + }); + }); + + describe("edge cases", () => { + test("should handle disconnect during task processing", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({}); + await customProvider.connect(); + await customProvider.clearQueue(q); + + const handler: TaskHandler = { + id: "test-handler", + handler: async () => { + // Long operation + await new Promise((resolve) => setTimeout(resolve, 200)); + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + // Wait just enough for processing to start + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Disconnect while potentially in the middle of processing + await customProvider.disconnect(); + + // Should handle gracefully + expect(customProvider.taskHandlers.size).toBe(0); + }); + + test("should handle polling loop exit when inactive", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({}); + await customProvider.connect(); + await customProvider.clearQueue(q); + + await customProvider.dequeue(q, { + id: "test-handler", + handler: async () => {}, + }); + + // Let polling start + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Disconnect - this sets _active to false + await customProvider.disconnect(); + + // Wait a bit more - polling should have stopped + await new Promise((resolve) => setTimeout(resolve, 100)); + + // No errors should occur + expect(customProvider.taskHandlers.size).toBe(0); + }); + + test("should handle extended timeout expiration triggering reject", async () => { + const q = uniqueQueue(); + const customProvider = await createCustomProvider({ + timeout: 5000, + retries: 1, + }); + await customProvider.connect(); + await customProvider.clearQueue(q); + + let extendCalled = false; + const handler: TaskHandler = { + id: "test-handler", + handler: async (_task: Task, ctx: TaskContext) => { + // Extend with a short timeout + await ctx.extend(50); + extendCalled = true; + // Wait longer than the extended timeout + await new Promise((resolve) => setTimeout(resolve, 200)); + // Task should have been auto-rejected by now via the extended timeout + }, + }; + + await customProvider.dequeue(q, handler); + await customProvider.enqueue(q, { data: { message: "test" } }); + + // Wait for processing and extended timeout to eventually move to DLQ + await waitFor(async () => { + const dlq = await customProvider.getDeadLetterTasks(q); + return extendCalled && dlq.length > 0; + }, 10_000); + + expect(extendCalled).toBe(true); + + await customProvider.clearQueue(q); + }); + }); +}); diff --git a/packages/redis/src/task.ts b/packages/redis/src/task.ts index cc3d48e..8ebc976 100644 --- a/packages/redis/src/task.ts +++ b/packages/redis/src/task.ts @@ -18,7 +18,7 @@ export type RedisTaskProviderOptions = TaskProviderOptions & { uri?: string; /** Unique identifier for this provider instance. Defaults to "@qified/redis-task" */ id?: string; - /** Poll interval in milliseconds for checking scheduled and timed-out tasks. Defaults to 1000 */ + /** Poll interval in milliseconds for checking timed-out tasks. Defaults to 1000 */ pollInterval?: number; }; @@ -132,7 +132,12 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { async connect(): Promise { if (!this._connectionPromise) { this._connectionPromise = (async () => { - await this._client.connect(); + try { + await this._client.connect(); + } catch (error) { + this._connectionPromise = null; + throw error; + } })(); } return this._connectionPromise; @@ -161,13 +166,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { return `${queue}:tasks`; } - /** - * Gets the Redis key for scheduled tasks sorted set. - */ - private getScheduledKey(queue: string): string { - return `${queue}:scheduled`; - } - /** * Gets the Redis key for processing tasks sorted set. */ @@ -221,17 +219,8 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { // Initialize attempt count await client.set(this.getTaskAttemptKey(queue, task.id), "0"); - // Add to appropriate queue - if (task.scheduledAt && task.scheduledAt > Date.now()) { - // Add to scheduled sorted set with scheduledAt as score - await client.zAdd(this.getScheduledKey(queue), { - score: task.scheduledAt, - value: task.id, - }); - } else { - // Add to queue list (LPUSH for FIFO with RPOP) - await client.lPush(this.getQueueKey(queue), task.id); - } + // Add to queue list (LPUSH for FIFO with RPOP) + await client.lPush(this.getQueueKey(queue), task.id); // Process immediately if handlers are registered void this.processQueue(queue); @@ -275,7 +264,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { } try { - await this.checkScheduledTasks(queue); await this.checkTimedOutTasks(queue); await this.processQueue(queue); } catch (error) { @@ -291,36 +279,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { this._pollTimers.set(queue, setTimeout(poll, this._pollInterval)); } - /** - * Checks for scheduled tasks that are ready to execute. - */ - private async checkScheduledTasks(queue: string): Promise { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - - const client = await this.getClient(); - const now = Date.now(); - - // Get scheduled tasks with score <= now - const readyTasks = await client.zRangeByScore( - this.getScheduledKey(queue), - 0, - now, - ); - - for (const taskId of readyTasks) { - /* v8 ignore next -- @preserve */ - if (!this._active) { - return; - } - // Move from scheduled to queue - await client.zRem(this.getScheduledKey(queue), taskId); - await client.lPush(this.getQueueKey(queue), taskId); - } - } - /** * Checks for tasks that have timed out during processing. */ @@ -683,22 +641,19 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { waiting: number; processing: number; deadLetter: number; - scheduled: number; }> { const client = await this.getClient(); - const [waiting, processing, deadLetter, scheduled] = await Promise.all([ + const [waiting, processing, deadLetter] = await Promise.all([ client.lLen(this.getQueueKey(queue)), client.zCard(this.getProcessingKey(queue)), client.lLen(this.getDeadLetterKey(queue)), - client.zCard(this.getScheduledKey(queue)), ]); return { waiting, processing, deadLetter, - scheduled, }; } @@ -710,20 +665,13 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { const client = await this.getClient(); // Get all task IDs from all locations - const [queueTasks, processingTasks, deadLetterTasks, scheduledTasks] = - await Promise.all([ - client.lRange(this.getQueueKey(queue), 0, -1), - client.zRange(this.getProcessingKey(queue), 0, -1), - client.lRange(this.getDeadLetterKey(queue), 0, -1), - client.zRange(this.getScheduledKey(queue), 0, -1), - ]); - - const allTaskIds = [ - ...queueTasks, - ...processingTasks, - ...deadLetterTasks, - ...scheduledTasks, - ]; + const [queueTasks, processingTasks, deadLetterTasks] = await Promise.all([ + client.lRange(this.getQueueKey(queue), 0, -1), + client.zRange(this.getProcessingKey(queue), 0, -1), + client.lRange(this.getDeadLetterKey(queue), 0, -1), + ]); + + const allTaskIds = [...queueTasks, ...processingTasks, ...deadLetterTasks]; // Delete all task data and attempt counters for (const taskId of allTaskIds) { @@ -735,6 +683,5 @@ export class RedisTaskProvider extends Hookified implements TaskProvider { await client.del(this.getQueueKey(queue)); await client.del(this.getProcessingKey(queue)); await client.del(this.getDeadLetterKey(queue)); - await client.del(this.getScheduledKey(queue)); } } diff --git a/packages/redis/test/task.test.ts b/packages/redis/test/task.test.ts index 571242f..8ec12bc 100644 --- a/packages/redis/test/task.test.ts +++ b/packages/redis/test/task.test.ts @@ -9,25 +9,6 @@ import { RedisTaskProvider, } from "../src/index.js"; -async function waitFor( - condition: () => boolean | Promise, - timeoutMs = 5000, - intervalMs = 50, -): Promise { - const deadline = Date.now() + timeoutMs; - while (Date.now() < deadline) { - if (await condition()) { - return; - } - - await new Promise((resolve) => { - setTimeout(resolve, intervalMs); - }); - } - - throw new Error(`waitFor timed out after ${timeoutMs}ms`); -} - describe("RedisTaskProvider", () => { let provider: RedisTaskProvider; const testQueue = `test-queue-${Date.now()}`; @@ -239,7 +220,6 @@ describe("RedisTaskProvider", () => { priority: 10, maxRetries: 5, timeout: 5000, - scheduledAt: Date.now() + 10000, }); expect(taskId).toBeDefined(); @@ -551,45 +531,6 @@ describe("RedisTaskProvider", () => { await customProvider.clearQueue(testQueue); await customProvider.disconnect(); }); - - test("should use task-specific timeout over provider default", async () => { - const customProvider = new RedisTaskProvider({ - timeout: 5000, - pollInterval: 50, - }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - const handler: TaskHandler = { - id: "test-handler", - handler: async (_task: Task, ctx: TaskContext) => { - // Simulate long-running task - await new Promise((resolve) => setTimeout(resolve, 200)); - await ctx.ack(); - }, - }; - - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { - data: { message: "test" }, - timeout: 50, // Task-specific timeout (shorter than handler duration) - }); - - // Poll until the timeout has fired and the task has been requeued or dead-lettered - await waitFor(async () => { - const s = await customProvider.getQueueStats(testQueue); - return s.waiting + s.deadLetter > 0; - }); - - // Task should have timed out and been requeued or moved to dead letter - const stats = await customProvider.getQueueStats(testQueue); - expect( - stats.waiting + stats.deadLetter + stats.processing, - ).toBeGreaterThan(0); - - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); - }); }); describe("task context extend", () => { @@ -669,38 +610,6 @@ describe("RedisTaskProvider", () => { expect(contextMetadata.maxRetries).toBe(defaultTaskRetries); }); - test("should increment attempt on retry", async () => { - const customProvider = new RedisTaskProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - const attempts: number[] = []; - const handler: TaskHandler = { - id: "test-handler", - handler: async (_task: Task, ctx: TaskContext) => { - attempts.push(ctx.metadata.attempt); - if (ctx.metadata.attempt < 2) { - await ctx.reject(true); - } else { - await ctx.ack(); - } - }, - }; - - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { data: { message: "test" } }); - - // Poll until both processing attempts have completed - await waitFor(() => attempts.length > 1); - - expect(attempts.length).toBeGreaterThan(1); - expect(attempts[0]).toBe(1); - expect(attempts[1]).toBe(2); - - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); - }); - test("should use task-specific maxRetries", async () => { let contextMetadata: any; const handler: TaskHandler = { @@ -723,62 +632,6 @@ describe("RedisTaskProvider", () => { }); }); - describe("scheduled tasks", () => { - test("should not process task before scheduledAt time", async () => { - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await provider.dequeue(testQueue, handler); - await provider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 2000, // Schedule 2 seconds in future - }); - - await new Promise((resolve) => setTimeout(resolve, 200)); - - expect(processed).toBe(false); - const stats = await provider.getQueueStats(testQueue); - expect(stats.scheduled).toBe(1); - }); - - test("should process task after scheduledAt time via polling", async () => { - const customProvider = new RedisTaskProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - let processed = false; - const handler: TaskHandler = { - id: "test-handler", - handler: async () => { - processed = true; - }, - }; - - await customProvider.dequeue(testQueue, handler); - await customProvider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 100, // Schedule 100ms in future - }); - - // Task should not be processed yet - await new Promise((resolve) => setTimeout(resolve, 50)); - expect(processed).toBe(false); - - // Wait for scheduled task to be processed by polling - await new Promise((resolve) => setTimeout(resolve, 200)); - - expect(processed).toBe(true); - - await customProvider.clearQueue(testQueue); - await customProvider.disconnect(); - }); - }); - describe("unsubscribe", () => { test("should unsubscribe specific handler by id", async () => { const handler1: TaskHandler = { @@ -921,7 +774,6 @@ describe("RedisTaskProvider", () => { waiting: 0, processing: 0, deadLetter: 0, - scheduled: 0, }); }); @@ -935,17 +787,6 @@ describe("RedisTaskProvider", () => { expect(stats.deadLetter).toBe(0); }); - test("should return correct stats with scheduled tasks", async () => { - await provider.enqueue(testQueue, { - data: { message: "test" }, - scheduledAt: Date.now() + 10000, - }); - - const stats = await provider.getQueueStats(testQueue); - expect(stats.scheduled).toBe(1); - expect(stats.waiting).toBe(0); - }); - test("should return correct stats with dead-letter tasks", async () => { const handler: TaskHandler = { id: "test-handler", @@ -1075,10 +916,6 @@ describe("RedisTaskProvider", () => { test("should clear all data for a queue", async () => { // Add tasks to various states await provider.enqueue(testQueue, { data: { message: "test1" } }); - await provider.enqueue(testQueue, { - data: { message: "test2" }, - scheduledAt: Date.now() + 10000, - }); const handler: TaskHandler = { id: "test-handler", @@ -1094,9 +931,7 @@ describe("RedisTaskProvider", () => { // Verify data exists const statsBefore = await provider.getQueueStats(testQueue); - expect( - statsBefore.waiting + statsBefore.scheduled + statsBefore.deadLetter, - ).toBeGreaterThan(0); + expect(statsBefore.waiting + statsBefore.deadLetter).toBeGreaterThan(0); // Clear queue await provider.clearQueue(testQueue); @@ -1106,7 +941,6 @@ describe("RedisTaskProvider", () => { expect(statsAfter.waiting).toBe(0); expect(statsAfter.processing).toBe(0); expect(statsAfter.deadLetter).toBe(0); - expect(statsAfter.scheduled).toBe(0); }); }); @@ -1145,34 +979,6 @@ describe("RedisTaskProvider", () => { await customProvider.disconnect(); }); - test("should handle disconnect during scheduled task processing", async () => { - // This test covers early return in checkScheduledTasks (lines 293, 308) - const customProvider = new RedisTaskProvider({ pollInterval: 50 }); - await customProvider.connect(); - await customProvider.clearQueue(testQueue); - - // Enqueue multiple scheduled tasks - for (let i = 0; i < 5; i++) { - await customProvider.enqueue(testQueue, { - data: { message: `scheduled-${i}` }, - scheduledAt: Date.now() + 10, // Very short delay - }); - } - - // Register a handler to start polling - await customProvider.dequeue(testQueue, { - id: "test-handler", - handler: async () => {}, - }); - - // Wait a tiny bit then disconnect during processing - await new Promise((resolve) => setTimeout(resolve, 30)); - await customProvider.disconnect(); - - // Should not throw - disconnect handled gracefully - expect(customProvider.taskHandlers.size).toBe(0); - }); - test("should handle disconnect during timed out task processing", async () => { // This test covers early return in checkTimedOutTasks (lines 321, 335) const customProvider = new RedisTaskProvider({ diff --git a/packages/zeromq/README.md b/packages/zeromq/README.md index 9199e96..74b0c76 100644 --- a/packages/zeromq/README.md +++ b/packages/zeromq/README.md @@ -9,7 +9,7 @@ This package implements a message provider backed by ZeroMQ using queues for pub - [Installation](#installation) - [Usage with Qified](#usage-with-qified) - [API](#api) - - [ZmqMessageProviderOptions](#ZmqMessageProviderOptions) + - [ZmqMessageProviderOptions](#zmqmessageprovideroptions) - [defaultZmqUri](#defaultzmquri) - [ZmqMessageProvider](#zmqmessageprovider) - [constructor](#constructor) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3da2c46..3c5894f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -59,6 +59,9 @@ importers: amqplib: specifier: ^0.10.9 version: 0.10.9 + hookified: + specifier: ^1.15.1 + version: 1.15.1 qified: specifier: workspace:^ version: link:../qified