Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export class RedisMessageProvider implements MessageProvider {
const subClient = await this.getSubClient();
await subClient.subscribe(topic, async (raw) => {
const message = JSON.parse(raw) as Message;
/* v8 ignore next -- @preserve */
const handlers = this.subscriptions.get(topic) ?? [];
await Promise.all(handlers.map(async (sub) => sub.handler(message)));
});
Expand All @@ -151,6 +152,7 @@ export class RedisMessageProvider implements MessageProvider {
async unsubscribe(topic: string, id?: string): Promise<void> {
if (id) {
const current = this.subscriptions.get(topic);
/* v8 ignore next -- @preserve */
if (current) {
this.subscriptions.set(
topic,
Expand All @@ -170,6 +172,7 @@ export class RedisMessageProvider implements MessageProvider {
*/
async disconnect(force = false): Promise<void> {
// Only disconnect if we've connected
/* v8 ignore next -- @preserve */
if (this.connectionPromise) {
await this.connectionPromise;

Expand All @@ -180,6 +183,7 @@ export class RedisMessageProvider implements MessageProvider {

this.subscriptions.clear();

/* v8 ignore start -- @preserve */
if (force) {
if (this.pub.isOpen) {
this.pub.destroy();
Expand All @@ -197,6 +201,7 @@ export class RedisMessageProvider implements MessageProvider {
await this.sub.close();
}
}
/* v8 ignore stop */

this.connectionPromise = null;
}
Expand Down
7 changes: 6 additions & 1 deletion packages/redis/src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ export class RedisTaskProvider extends Hookified implements TaskProvider {
this._processingTasks.set(queue, new Set());
}

/* v8 ignore next -- @preserve */
const processingSet = this._processingTasks.get(queue) ?? new Set<string>();

// Get task from queue
Expand Down Expand Up @@ -482,6 +483,7 @@ export class RedisTaskProvider extends Hookified implements TaskProvider {
value: task.id,
});
// Reset timeout handle
/* v8 ignore start -- @preserve */
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
Expand All @@ -491,7 +493,6 @@ export class RedisTaskProvider extends Hookified implements TaskProvider {
}
}, ttl);
} catch (error) {
/* v8 ignore next -- @preserve */
this.emit("error", error);
}
},
Expand All @@ -500,6 +501,7 @@ export class RedisTaskProvider extends Hookified implements TaskProvider {
maxRetries,
},
};
/* v8 ignore stop */

// Set timeout handler
timeoutHandle = setTimeout(() => {
Expand All @@ -517,13 +519,15 @@ export class RedisTaskProvider extends Hookified implements TaskProvider {
}
} catch {
// Auto-reject on error
/* v8 ignore start -- @preserve */
if (!acknowledged && !rejected) {
await context.reject(true);
}
} finally {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
/* v8 ignore stop */
}
}

Expand Down Expand Up @@ -624,6 +628,7 @@ export class RedisTaskProvider extends Hookified implements TaskProvider {
const tasks: Task[] = [];
for (const taskId of taskIds) {
const taskDataStr = await client.get(this.getTaskDataKey(queue, taskId));
/* v8 ignore next -- @preserve */
if (taskDataStr) {
tasks.push(JSON.parse(taskDataStr) as Task);
}
Expand Down
171 changes: 168 additions & 3 deletions packages/redis/test/task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ describe("RedisTaskProvider", () => {
const stats = await customProvider.getQueueStats(testQueue);
expect(
stats.deadLetter + stats.waiting + stats.processing,
).toBeGreaterThanOrEqual(0);
).toBeGreaterThanOrEqual(1);

await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
Expand Down Expand Up @@ -1109,7 +1109,7 @@ describe("RedisTaskProvider", () => {
const stats = await customProvider.getQueueStats(testQueue);
expect(
stats.waiting + stats.deadLetter + stats.processing,
).toBeGreaterThanOrEqual(0);
).toBeGreaterThanOrEqual(1);

await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
Expand Down Expand Up @@ -1173,7 +1173,7 @@ describe("RedisTaskProvider", () => {
// Either it's in DLQ, still processing, or waiting for retry
expect(
stats.deadLetter + stats.processing + stats.waiting,
).toBeGreaterThanOrEqual(0);
).toBeGreaterThanOrEqual(1);

await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
Expand Down Expand Up @@ -1348,5 +1348,170 @@ describe("RedisTaskProvider", () => {
await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
});

test("should handle extend resetting existing timeout handle", async () => {
// Covers lines 485-489 (reset timeout in extend when timeoutHandle exists)
const customProvider = new RedisTaskProvider({
timeout: 200,
pollInterval: 50,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

let completed = false;
const handler: TaskHandler = {
id: "test-handler",
handler: async (_task: Task, ctx: TaskContext) => {
// First extend sets a new timeout
await ctx.extend(500);
// Second extend resets the existing timeout handle
await ctx.extend(500);
await new Promise((resolve) => setTimeout(resolve, 100));
completed = true;
await ctx.ack();
},
};

await customProvider.dequeue(testQueue, handler);
await customProvider.enqueue(testQueue, { data: { message: "test" } });

await new Promise((resolve) => setTimeout(resolve, 300));

expect(completed).toBe(true);

await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
});

test("should auto-reject and clear timeout on handler error", async () => {
// Covers lines 520-524 (catch block auto-reject + finally clearing timeout)
const customProvider = new RedisTaskProvider({
retries: 1,
pollInterval: 50,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

let handlerCalled = false;
const handler: TaskHandler = {
id: "test-handler",
handler: async () => {
handlerCalled = true;
throw new Error("Intentional handler error");
},
};

await customProvider.dequeue(testQueue, handler);
await customProvider.enqueue(testQueue, { data: { message: "test" } });

await new Promise((resolve) => setTimeout(resolve, 200));

expect(handlerCalled).toBe(true);

// Task should be in dead-letter queue after failing
const stats = await customProvider.getQueueStats(testQueue);
expect(stats).toEqual({
deadLetter: 1,
waiting: 0,
processing: 0,
});

await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
});

test("should force disconnect with destroy", async () => {
// Covers lines 601-604 (force disconnect path)
const customProvider = new RedisTaskProvider();
await customProvider.connect();
await customProvider.clearQueue(testQueue);

await customProvider.disconnect(true);

// Client should be destroyed
expect(customProvider.taskHandlers.size).toBe(0);
});

test("should handle disconnect when not connected", async () => {
// Covers line 598 (connectionPromise is null)
const customProvider = new RedisTaskProvider();

// Disconnect without ever connecting - should not throw
await customProvider.disconnect();

expect(customProvider.taskHandlers.size).toBe(0);
});

test("should handle force disconnect when client is already closed", async () => {
// Covers lines 602-604 (force disconnect when client not open)
const customProvider = new RedisTaskProvider();
await customProvider.connect();

// Close normally first
const client = (customProvider as any)._client;
if (client.isOpen) {
await client.close();
}

// Force disconnect when already closed - should not throw
await customProvider.disconnect(true);

expect(customProvider.taskHandlers.size).toBe(0);
});

test("should handle graceful disconnect when client is already closed", async () => {
// Covers lines 606-608 (graceful disconnect when client not open)
const customProvider = new RedisTaskProvider();
await customProvider.connect();

// Close the client directly
const client = (customProvider as any)._client;
if (client.isOpen) {
await client.close();
}

// Graceful disconnect when already closed - should not throw
await customProvider.disconnect(false);

expect(customProvider.taskHandlers.size).toBe(0);
});

test("should return tasks from dead-letter queue", async () => {
// Covers lines 620-627 (getDeadLetterTasks iteration)
const customProvider = new RedisTaskProvider({
retries: 1,
pollInterval: 50,
});
customProvider.throwOnEmptyListeners = false;
await customProvider.connect();
await customProvider.clearQueue(testQueue);

const handler: TaskHandler = {
id: "test-handler",
handler: async (_task: Task, ctx: TaskContext) => {
await ctx.reject(false); // Send directly to DLQ
},
};

await customProvider.dequeue(testQueue, handler);

// Enqueue multiple tasks that will all go to DLQ
await customProvider.enqueue(testQueue, { data: { message: "dlq-1" } });
await customProvider.enqueue(testQueue, { data: { message: "dlq-2" } });

await new Promise((resolve) => setTimeout(resolve, 300));

const deadLetterTasks =
await customProvider.getDeadLetterTasks(testQueue);
expect(deadLetterTasks.length).toBe(2);
const messages = deadLetterTasks.map((t) => (t.data as any).message);
expect(messages).toContain("dlq-1");
expect(messages).toContain("dlq-2");

await customProvider.clearQueue(testQueue);
await customProvider.disconnect();
});
});
});
Loading