Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 105 additions & 37 deletions packages/qified/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ Task and Message Queues with Multiple Providers
* Customizable Compress / Decompress Handlers (Coming in v1.0.0)
* Provider Fail Over Support

# Table of Contents
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Constructor](#constructor)
- [Properties](#properties)
- [Methods](#methods)
- [subscribe](#subscribe)
- [publish](#publish)
- [unsubscribe](#unsubscribe)
- [disconnect](#disconnect)
- [Events](#events)
- [Available Events](#available-events)
- [Listening to Events](#listening-to-events)
- [Error Handling with Events](#error-handling-with-events)
- [Hooks](#hooks)
- [Available Hooks](#available-hooks)
- [Using Hooks](#using-hooks)
- [Modifying Data with Before Hooks](#modifying-data-with-before-hooks)
- [Modifying Topics with Before Hooks](#modifying-topics-with-before-hooks)
- [Multiple Hooks](#multiple-hooks)
- [Hooks vs Events](#hooks-vs-events)
- [Providers](#providers)
- [Development and Testing](#development-and-testing)
- [License](#license)

# Installation

Expand Down Expand Up @@ -326,7 +350,7 @@ The following hooks are available via the `QifiedHooks` enum:

## Using Hooks

Use the `onHook()` method to register a hook handler:
Use the `onHook()` method to register a hook handler. Hooks use the `IHook` object format from [Hookified](https://hookified.org):

```js
import { Qified, MemoryMessageProvider, QifiedHooks } from 'qified';
Expand All @@ -335,15 +359,30 @@ const qified = new Qified({
messageProviders: new MemoryMessageProvider()
});

// Register a before hook
qified.onHook(QifiedHooks.beforePublish, async (context) => {
console.log('About to publish to:', context.topic);
// Register a before hook using IHook object
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
console.log('About to publish to:', context.topic);
}
});

// Register an after hook
qified.onHook(QifiedHooks.afterPublish, async (context) => {
console.log('Published message:', context.message.id);
// Register an after hook with an id for later removal
qified.onHook({
id: 'publish-logger',
event: QifiedHooks.afterPublish,
handler: async (context) => {
console.log('Published message:', context.message.id);
}
});

// Register with options to control position
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
console.log('This runs first');
}
}, { position: 'Top' });
```

## Modifying Data with Before Hooks
Expand All @@ -358,28 +397,34 @@ const qified = new Qified({
});

// Add timestamp and headers to all messages
qified.onHook(QifiedHooks.beforePublish, async (context) => {
// Add timestamp if not present
context.message.timestamp = context.message.timestamp ?? Date.now();

// Add custom headers
context.message.headers = {
...context.message.headers,
'x-processed-by': 'qified',
'x-environment': process.env.NODE_ENV
};
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
// Add timestamp if not present
context.message.timestamp = context.message.timestamp ?? Date.now();

// Add custom headers
context.message.headers = {
...context.message.headers,
'x-processed-by': 'qified',
'x-environment': process.env.NODE_ENV
};
}
});

// Modify message data
qified.onHook(QifiedHooks.beforePublish, async (context) => {
// Add metadata to the message data
context.message.data = {
...context.message.data,
_meta: {
version: '1.0',
source: 'api'
}
};
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
// Add metadata to the message data
context.message.data = {
...context.message.data,
_meta: {
version: '1.0',
source: 'api'
}
};
}
});

// Subscribe to receive messages
Expand All @@ -406,8 +451,11 @@ You can also modify the topic in before hooks:

```js
// Route all messages to a prefixed topic
qified.onHook(QifiedHooks.beforePublish, async (context) => {
context.topic = `production/${context.topic}`;
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
context.topic = `production/${context.topic}`;
}
});

// Subscribe to the prefixed topic
Expand All @@ -430,20 +478,37 @@ await qified.publish('events', {
Multiple hooks for the same event execute in the order they were registered:

```js
// First hook - runs first
qified.onHook(QifiedHooks.beforePublish, async (context) => {
context.message.timestamp = Date.now();
// First hook - runs first (default position is 'Bottom')
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
context.message.timestamp = Date.now();
}
});

// Second hook - runs second, can see changes from first hook
qified.onHook(QifiedHooks.beforePublish, async (context) => {
context.message.headers = { 'x-timestamp': String(context.message.timestamp) };
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
context.message.headers = { 'x-timestamp': String(context.message.timestamp) };
}
});

// Third hook - runs third
qified.onHook(QifiedHooks.beforePublish, async (context) => {
console.log('Final message:', context.message);
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
console.log('Final message:', context.message);
}
});

// Use position option to insert at the top
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
console.log('This runs before all other hooks');
}
}, { position: 'Top' });
```

## Hooks vs Events
Expand All @@ -458,8 +523,11 @@ Both hooks and events are available, but they serve different purposes:

```js
// Hook - can modify the message before it's published
qified.onHook(QifiedHooks.beforePublish, async (context) => {
context.message.timestamp = Date.now();
qified.onHook({
event: QifiedHooks.beforePublish,
handler: async (context) => {
context.message.timestamp = Date.now();
}
});

// Event - notified after publish completes (cannot modify)
Expand Down
2 changes: 1 addition & 1 deletion packages/qified/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@
"LICENSE"
],
"dependencies": {
"hookified": "^1.15.1"
"hookified": "^2.1.0"
}
}
2 changes: 1 addition & 1 deletion packages/rabbitmq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"license": "MIT",
"dependencies": {
"amqplib": "^0.10.9",
"hookified": "^1.15.1"
"hookified": "^2.1.0"
},
"peerDependencies": {
"qified": "workspace:^"
Expand Down
2 changes: 1 addition & 1 deletion packages/redis/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"author": "Jared Wray <me@jaredwray.com>",
"license": "MIT",
"dependencies": {
"hookified": "^1.15.1",
"hookified": "^2.1.0",
"redis": "^5.10.0"
},
"peerDependencies": {
Expand Down
12 changes: 12 additions & 0 deletions packages/redis/test/task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ describe("RedisTaskProvider", () => {

beforeEach(async () => {
provider = new RedisTaskProvider();
provider.throwOnEmptyListeners = false;
await provider.connect();
await provider.clearQueue(testQueue);
});
Expand Down Expand Up @@ -468,6 +469,7 @@ describe("RedisTaskProvider", () => {
retries: 3,
pollInterval: 100,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -502,6 +504,7 @@ describe("RedisTaskProvider", () => {
timeout: 100,
pollInterval: 50,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -539,6 +542,7 @@ describe("RedisTaskProvider", () => {
timeout: 200,
pollInterval: 50,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -952,6 +956,7 @@ describe("RedisTaskProvider", () => {
retries: 2,
pollInterval: 30,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -985,6 +990,7 @@ describe("RedisTaskProvider", () => {
timeout: 20,
pollInterval: 30,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -1074,6 +1080,7 @@ describe("RedisTaskProvider", () => {
timeout: 5000,
pollInterval: 50,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -1111,6 +1118,7 @@ describe("RedisTaskProvider", () => {
test("should handle processQueue when _active becomes false after getClient", async () => {
// This test covers line 376 (early return after getClient when _active is false)
const customProvider = new RedisTaskProvider({ pollInterval: 30 });
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -1142,6 +1150,7 @@ describe("RedisTaskProvider", () => {
retries: 1,
pollInterval: 20,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -1211,6 +1220,7 @@ describe("RedisTaskProvider", () => {
retries: 5,
pollInterval: 20,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -1249,6 +1259,7 @@ describe("RedisTaskProvider", () => {
retries: 1, // Only 1 retry allowed
pollInterval: 15,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down Expand Up @@ -1296,6 +1307,7 @@ describe("RedisTaskProvider", () => {
retries: 5,
pollInterval: 15,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

Expand Down
17 changes: 11 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pnpm-workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ packages:

minimumReleaseAge: 2880

minimumReleaseAgeExclude:
- hookified

onlyBuiltDependencies:
- esbuild
- unrs-resolver
Expand Down
Loading