Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ Qified is a task and message queue library with multiple providers, built with T

## Development Rules

1. **Always run `pnpm test` before committing** - All tests must pass
2. **Maintain 100% code coverage** - Add tests for any new code
1. **Always run `pnpm test` after every change** - You must test your changes every time, no exceptions. All tests must pass before committing.
2. **Maintain 100% code coverage** - Add tests for any new code. Every change must achieve 100% coverage. If coverage drops below 100%, add or update tests until it is restored.
3. **Follow existing code style** - Biome enforces formatting and linting

## Structure
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[![site/logo.svg](site/logo.svg)](https://qified.org)

[![tests](https://github.com/jaredwray/qified/actions/workflows/tests.yaml/badge.svg)](https://github.com/jaredwray/qified/actions/workflows/tests.yaml)
[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/master/LICENSE)
[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/main/LICENSE)
[![codecov](https://codecov.io/gh/jaredwray/qified/graph/badge.svg?token=jcRdy8SkOG)](https://codecov.io/gh/jaredwray/qified)
[![npm](https://img.shields.io/npm/dm/qified)](https://npmjs.com/package/qified)
[![npm](https://img.shields.io/npm/v/qified)](https://npmjs.com/package/qified)
Expand Down
12 changes: 6 additions & 6 deletions packages/qified/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[![logo.svg](https://qified.org/logo.svg)](https://qified.org)

[![tests](https://github.com/jaredwray/qified/actions/workflows/tests.yaml/badge.svg)](https://github.com/jaredwray/qified/actions/workflows/tests.yaml)
[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/master/LICENSE)
[![GitHub license](https://img.shields.io/github/license/jaredwray/qified)](https://github.com/jaredwray/qified/blob/main/LICENSE)
[![codecov](https://codecov.io/gh/jaredwray/qified/graph/badge.svg?token=jcRdy8SkOG)](https://codecov.io/gh/jaredwray/qified)
[![npm](https://img.shields.io/npm/dm/qified)](https://npmjs.com/package/qified)
[![npm](https://img.shields.io/npm/v/qified)](https://npmjs.com/package/qified)
Expand Down Expand Up @@ -198,7 +198,7 @@ await qified.unsubscribe('user-events', 'userEventHandler');
await qified.unsubscribe('user-events');
```

## disconnect`
## disconnect

Disconnect from all providers and clean up resources.

Expand Down Expand Up @@ -473,10 +473,10 @@ qified.on(QifiedEvents.publish, async (data) => {
There are multiple providers available to use:

* Memory - this is built into the current `qified` library as `MemoryMessageProvider`.
* [@qified/redis](packages/redis/README.md) - Redis Provider
* [@qified/rabbitmq](packages/rabbitmq/README.md) - RabbitMQ Provider
* [@qified/nats](packages/nats/README.md) - NATS Provider
* [@qified/zeromq](packages/zeromq/README.md) - ZeroMQ Provider
* [@qified/redis](../redis/README.md) - Redis Provider
* [@qified/rabbitmq](../rabbitmq/README.md) - RabbitMQ Provider
* [@qified/nats](../nats/README.md) - NATS Provider
* [@qified/zeromq](../zeromq/README.md) - ZeroMQ Provider

# Development and Testing

Expand Down
8 changes: 0 additions & 8 deletions packages/qified/src/memory/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,6 @@ export class MemoryTaskProvider implements TaskProvider {
continue;
}

// Check if task is scheduled for later
if (
queuedTask.task.scheduledAt &&
queuedTask.task.scheduledAt > Date.now()
) {
continue;
}

// Mark as processing
queuedTask.processing = true;
processingSet.add(queuedTask.task.id);
Expand Down
7 changes: 0 additions & 7 deletions packages/qified/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ export type Task<T = any> = {
*/
timestamp?: number;

/**
* Scheduled time for delayed task execution (milliseconds since epoch)
* If set, task won't be processed until this time
* @type {number}
*/
scheduledAt?: number;

/**
* Headers for additional metadata
* @type {Record<string, string>}
Expand Down
82 changes: 0 additions & 82 deletions packages/qified/test/memory-task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ describe("MemoryTaskProvider", () => {
priority: 10,
maxRetries: 5,
timeout: 5000,
scheduledAt: Date.now() + 10000,
});

expect(taskId).toBeDefined();
Expand Down Expand Up @@ -523,58 +522,6 @@ describe("MemoryTaskProvider", () => {
});
});

describe("scheduled tasks", () => {
test("should not process task before scheduledAt time", async () => {
let processed = false;
const handler: TaskHandler = {
id: "test-handler",
handler: async () => {
processed = true;
},
};

await provider.dequeue("test-queue", handler);
await provider.enqueue("test-queue", {
data: { message: "test" },
scheduledAt: Date.now() + 1000, // Schedule 1 second in future
});

await new Promise((resolve) => setTimeout(resolve, 100));

expect(processed).toBe(false);
const stats = provider.getQueueStats("test-queue");
expect(stats.waiting).toBe(1);
});

test("should process task after scheduledAt time", async () => {
let processed = false;
const handler: TaskHandler = {
id: "test-handler",
handler: async () => {
processed = true;
},
};

await provider.dequeue("test-queue", handler);
await provider.enqueue("test-queue", {
data: { message: "test" },
scheduledAt: Date.now() + 50, // Schedule 50ms in future
});

// Task should not be processed yet
await new Promise((resolve) => setTimeout(resolve, 25));
expect(processed).toBe(false);

// Now enqueue another task to trigger processing of scheduled task
await provider.enqueue("test-queue", { data: { message: "trigger" } });

// Wait for scheduled task to be processed
await new Promise((resolve) => setTimeout(resolve, 100));

expect(processed).toBe(true);
});
});

describe("unsubscribe", () => {
test("should unsubscribe specific handler by id", async () => {
const handler1: TaskHandler = {
Expand Down Expand Up @@ -798,35 +745,6 @@ describe("MemoryTaskProvider", () => {
expect(stats.waiting).toBe(0);
expect(stats.processing).toBe(0);
});

test("should not process scheduled tasks after disconnect", async () => {
const customProvider = new MemoryTaskProvider();
let taskProcessed = false;

const handler: TaskHandler = {
id: "test-handler",
handler: async () => {
taskProcessed = true;
},
};

await customProvider.dequeue("test-queue", handler);

// Enqueue a task scheduled for the future
await customProvider.enqueue("test-queue", {
data: { message: "scheduled-task" },
scheduledAt: Date.now() + 100,
});

// Disconnect before the scheduled time
await customProvider.disconnect();

// Wait past the scheduled time
await new Promise((resolve) => setTimeout(resolve, 150));

// Task should not have been processed
expect(taskProcessed).toBe(false);
});
});

describe("getDeadLetterTasks", () => {
Expand Down
133 changes: 131 additions & 2 deletions packages/rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# @qified/rabbitmq

RabbitMQ message provider for [Qified](https://github.com/jaredwray/qified).
RabbitMQ message and task provider for [Qified](https://github.com/jaredwray/qified).

This package implements a message provider backed by RabbitMQ using queues for publish and subscribe operations.
This package implements a message provider and a task provider backed by RabbitMQ. The message provider uses queues for publish/subscribe operations, and the task provider adds reliable task queue processing with retries, timeouts, and dead-letter queues.

## Table of Contents

- [Installation](#installation)
- [Usage with Qified](#usage-with-qified)
- [Message Provider](#message-provider)
- [Task Provider](#task-provider)
- [API](#api)
- [RabbitMqMessageProviderOptions](#rabbitmqmessageprovideroptions)
- [defaultRabbitMqUri](#defaultrabbitmquri)
Expand All @@ -18,6 +20,17 @@ This package implements a message provider backed by RabbitMQ using queues for p
- [unsubscribe](#unsubscribe)
- [disconnect](#disconnect)
- [createQified](#createqified)
- [RabbitMqTaskProviderOptions](#rabbitmqtaskprovideroptions)
- [RabbitMqTaskProvider](#rabbitmqtaskprovider)
- [constructor](#constructor-1)
- [connect](#connect)
- [enqueue](#enqueue)
- [dequeue](#dequeue)
- [unsubscribe](#unsubscribe-1)
- [disconnect](#disconnect-1)
- [getDeadLetterTasks](#getdeadlettertasks)
- [getQueueStats](#getqueuestats)
- [clearQueue](#clearqueue)
- [Contributing](#contributing)
- [License](#license)

Expand All @@ -29,6 +42,8 @@ pnpm add @qified/rabbitmq

## Usage with Qified

### Message Provider

```ts
import { createQified } from "@qified/rabbitmq";
import type { Message } from "qified";
Expand All @@ -46,6 +61,46 @@ await qified.publish("example-topic", { id: "1", data: "Hello from RabbitMQ!" })
await qified.disconnect();
```

### Task Provider

```ts
import { RabbitMqTaskProvider } from "@qified/rabbitmq";

const taskProvider = new RabbitMqTaskProvider({ uri: "amqp://localhost:5672" });

// Enqueue a task
await taskProvider.enqueue("my-queue", {
data: { action: "send-email", to: "user@example.com" },
});

// Dequeue and process tasks
await taskProvider.dequeue("my-queue", {
id: "email-handler",
handler: async (task, ctx) => {
console.log("Processing task:", task.data);

// Access attempt metadata
console.log(`Attempt ${ctx.metadata.attempt} of ${ctx.metadata.maxRetries}`);

// Extend the deadline if needed
await ctx.extend(10_000);

// Acknowledge the task on success
await ctx.ack();
},
});

// Get queue statistics
const stats = await taskProvider.getQueueStats("my-queue");
console.log(stats); // { waiting, processing, deadLetter }

// Get dead-letter tasks for inspection
const deadLetters = await taskProvider.getDeadLetterTasks("my-queue");

// Clean up
await taskProvider.disconnect();
```

## API

### RabbitMqMessageProviderOptions
Expand Down Expand Up @@ -90,6 +145,80 @@ Cancels all subscriptions and closes the underlying RabbitMQ connection.

Convenience factory that returns a `Qified` instance configured with `RabbitMqMessageProvider`.

### RabbitMqTaskProviderOptions

Configuration options for the RabbitMQ task provider. Extends `TaskProviderOptions`.

- `uri?`: RabbitMQ connection URI. Defaults to `"amqp://localhost:5672"`.
- `id?`: Unique identifier for this provider instance. Defaults to `"@qified/rabbitmq-task"`.
- `timeout?`: Default timeout in milliseconds for task processing. Defaults to `30000`.
- `retries?`: Default maximum retry attempts before a task is moved to the dead-letter queue. Defaults to `3`.
- `reconnectTimeInSeconds?`: Time in seconds to wait before reconnecting after connection loss. Set to `0` to disable. Defaults to `5`.

### RabbitMqTaskProvider

Implements the `TaskProvider` interface using RabbitMQ durable queues for reliable task processing. Extends `Hookified` for event emission. Features include:

- Automatic retries with configurable max attempts
- Task timeouts with automatic rejection on expiry
- Dead-letter queue for failed tasks
- Automatic reconnection on connection loss

#### constructor(options?: RabbitMqTaskProviderOptions)

Creates a new task provider instance.

#### connect()

Explicitly connects to RabbitMQ. Called automatically on first `enqueue` or `dequeue` if not called manually.

#### enqueue(queue: string, taskData: EnqueueTask)

Enqueues a task to the specified queue. Returns a `Promise<string>` with the generated task ID.

Task data options:

- `data`: The task payload (any serializable value).
- `id?`: Custom task ID. Auto-generated if omitted.
- `timeout?`: Per-task timeout override in milliseconds.
- `maxRetries?`: Per-task max retry override.
- `priority?`: Task priority value.

#### dequeue(queue: string, handler: TaskHandler)

Registers a handler to process tasks from the specified queue. The handler receives a `Task` and a `TaskContext`.

`TaskContext` methods:

- `ack()`: Acknowledge the task (removes it from the queue).
- `reject(requeue?: boolean)`: Reject the task. If `requeue` is `true` (default), re-enqueues for retry. After max retries, moves to dead-letter queue.
- `extend(ms: number)`: Extend the processing deadline by the given milliseconds.
- `metadata`: Object with `{ attempt, maxRetries }` for the current task.

#### unsubscribe(queue: string, id?: string)

Removes a handler by id, or all handlers for the queue if no id is provided.

#### disconnect(force?: boolean)

Disconnects from RabbitMQ and cleans up all consumers, timers, and in-memory state. If `force` is `true`, skips graceful channel close.

#### getDeadLetterTasks(queue: string)

Returns an array of tasks that have been moved to the dead-letter queue for the given queue.

#### getQueueStats(queue: string)

Returns statistics for the given queue:

```ts
{ waiting: number; processing: number; deadLetter: number }
```

#### clearQueue(queue: string)

Purges all tasks from the queue and its dead-letter queue, and clears all in-memory tracking state.

## Contributing

Contributions are welcome! Please read the [CONTRIBUTING.md](../../CONTRIBUTING.md) and [CODE_OF_CONDUCT.md](../../CODE_OF_CONDUCT.md) for details on our process.
Expand Down
3 changes: 2 additions & 1 deletion packages/rabbitmq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"author": "Jared Wray <me@jaredwray.com>",
"license": "MIT",
"dependencies": {
"amqplib": "^0.10.9"
"amqplib": "^0.10.9",
"hookified": "^1.15.1"
},
"peerDependencies": {
"qified": "workspace:^"
Expand Down
8 changes: 8 additions & 0 deletions packages/rabbitmq/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import {
type TopicHandler,
} from "qified";

export {
defaultRabbitMqTaskId,
defaultRetries as defaultTaskRetries,
defaultTimeout as defaultTaskTimeout,
RabbitMqTaskProvider,
type RabbitMqTaskProviderOptions,
} from "./task.js";

/**
* Configuration options for the RabbitMQ message provider.
*/
Expand Down
Loading
Loading