From f21ef57c466effd0779db6f81ce29de16d8b2384 Mon Sep 17 00:00:00 2001 From: Andrei Borza Date: Fri, 10 Apr 2026 16:34:05 +0900 Subject: [PATCH 1/2] fix(deno): Handle `reader.closed` rejection from `releaseLock()` in streaming Replace `reader.closed.finally(() => onDone())` with `reader.closed.then(() => onDone(), () => onDone())` in `monitorStream`. Per the WHATWG Streams spec, `reader.releaseLock()` rejects `reader.closed` when the promise is still pending. `.finally()` propagates that rejection as an unhandled promise rejection, while `.then(f, f)` suppresses it by handling both the fulfilled and rejected cases. Closes: #20177 --- packages/deno/src/utils/streaming.ts | 3 +- packages/deno/test/streaming.test.ts | 88 ++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 packages/deno/test/streaming.test.ts diff --git a/packages/deno/src/utils/streaming.ts b/packages/deno/src/utils/streaming.ts index 045a104c5e93..1b2254e61d2d 100644 --- a/packages/deno/src/utils/streaming.ts +++ b/packages/deno/src/utils/streaming.ts @@ -81,8 +81,7 @@ function monitorStream( onDone: () => void, ): ReadableStream> { const reader = stream.getReader(); - // oxlint-disable-next-line typescript/no-floating-promises - reader.closed.finally(() => onDone()); + reader.closed.then(() => onDone(), () => onDone()); return new ReadableStream({ async start(controller) { let result: ReadableStreamReadResult>; diff --git a/packages/deno/test/streaming.test.ts b/packages/deno/test/streaming.test.ts new file mode 100644 index 000000000000..11b86541e956 --- /dev/null +++ b/packages/deno/test/streaming.test.ts @@ -0,0 +1,88 @@ +// + +import { assertEquals } from 'https://deno.land/std@0.212.0/assert/mod.ts'; + +/** + * Minimal reproduction of monitorStream to verify that reader.releaseLock() + * after stream completion does not cause an unhandled promise rejection. + * + * Per the WHATWG Streams spec, releaseLock() rejects reader.closed. + * Using .then(onDone, onDone) handles both the fulfilled and rejected cases + * so the rejection is suppressed. + */ +function monitorStream( + stream: ReadableStream, + onDone: () => void, +): ReadableStream { + const reader = stream.getReader(); + reader.closed.then(() => onDone(), () => onDone()); + return new ReadableStream({ + async start(controller) { + let result: ReadableStreamReadResult; + do { + result = await reader.read(); + if (result.value) { + try { + controller.enqueue(result.value); + } catch (er) { + controller.error(er); + reader.releaseLock(); + return; + } + } + } while (!result.done); + controller.close(); + reader.releaseLock(); + }, + }); +} + +Deno.test('monitorStream calls onDone and does not cause unhandled rejection after normal completion', async () => { + let doneCalled = false; + + const source = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + controller.enqueue(new TextEncoder().encode('chunk2')); + controller.close(); + }, + }); + + const monitored = monitorStream(source, () => { + doneCalled = true; + }); + + // Listen for unhandled rejections — the bug caused one here. + let unhandledRejection: PromiseRejectionEvent | undefined; + const handler = (e: PromiseRejectionEvent): void => { + e.preventDefault(); + unhandledRejection = e; + }; + globalThis.addEventListener('unhandledrejection', handler); + + try { + const reader = monitored.getReader(); + const chunks: string[] = []; + const decoder = new TextDecoder(); + + let result: ReadableStreamReadResult; + do { + result = await reader.read(); + if (result.value) { + chunks.push(decoder.decode(result.value)); + } + } while (!result.done); + reader.releaseLock(); + + assertEquals(chunks, ['chunk1', 'chunk2']); + + // Give microtasks a chance to settle so any unhandled rejection fires. + await new Promise(resolve => setTimeout(resolve, 50)); + + assertEquals(doneCalled, true, 'onDone callback should have been called'); + assertEquals(unhandledRejection, undefined, 'should not have caused an unhandled promise rejection'); + } finally { + globalThis.removeEventListener('unhandledrejection', handler); + } +}); + From dccdf31218f5b70f2f869dc45b58ae1726f086a5 Mon Sep 17 00:00:00 2001 From: Andrei Borza Date: Fri, 10 Apr 2026 17:08:39 +0900 Subject: [PATCH 2/2] Tests --- packages/deno/src/utils/streaming.ts | 5 +- packages/deno/test/streaming.test.ts | 92 +++++++++------------------- 2 files changed, 34 insertions(+), 63 deletions(-) diff --git a/packages/deno/src/utils/streaming.ts b/packages/deno/src/utils/streaming.ts index 1b2254e61d2d..35233f9b1a5f 100644 --- a/packages/deno/src/utils/streaming.ts +++ b/packages/deno/src/utils/streaming.ts @@ -81,7 +81,10 @@ function monitorStream( onDone: () => void, ): ReadableStream> { const reader = stream.getReader(); - reader.closed.then(() => onDone(), () => onDone()); + reader.closed.then( + () => onDone(), + () => onDone(), + ); return new ReadableStream({ async start(controller) { let result: ReadableStreamReadResult>; diff --git a/packages/deno/test/streaming.test.ts b/packages/deno/test/streaming.test.ts index 11b86541e956..d9849ece3c76 100644 --- a/packages/deno/test/streaming.test.ts +++ b/packages/deno/test/streaming.test.ts @@ -2,57 +2,42 @@ import { assertEquals } from 'https://deno.land/std@0.212.0/assert/mod.ts'; -/** - * Minimal reproduction of monitorStream to verify that reader.releaseLock() - * after stream completion does not cause an unhandled promise rejection. - * - * Per the WHATWG Streams spec, releaseLock() rejects reader.closed. - * Using .then(onDone, onDone) handles both the fulfilled and rejected cases - * so the rejection is suppressed. - */ -function monitorStream( - stream: ReadableStream, - onDone: () => void, -): ReadableStream { - const reader = stream.getReader(); - reader.closed.then(() => onDone(), () => onDone()); - return new ReadableStream({ - async start(controller) { - let result: ReadableStreamReadResult; - do { - result = await reader.read(); - if (result.value) { - try { - controller.enqueue(result.value); - } catch (er) { - controller.error(er); - reader.releaseLock(); - return; - } - } - } while (!result.done); - controller.close(); - reader.releaseLock(); - }, - }); -} +Deno.test('reader.closed.then(f, f) suppresses rejection when releaseLock is called on an open stream', async () => { + // Reproduces the bug from GitHub issue #20177: + // In monitorStream, reader.releaseLock() is called while the source stream + // is still open (e.g. the error path when controller.enqueue() throws). + // Per WHATWG Streams spec, this rejects reader.closed with a TypeError. + // Using .then(onDone, onDone) handles both cases; .finally() would propagate + // the rejection as unhandled. -Deno.test('monitorStream calls onDone and does not cause unhandled rejection after normal completion', async () => { - let doneCalled = false; + let onDoneCalled = false; - const source = new ReadableStream({ + const stream = new ReadableStream({ start(controller) { - controller.enqueue(new TextEncoder().encode('chunk1')); - controller.enqueue(new TextEncoder().encode('chunk2')); - controller.close(); + controller.enqueue(new TextEncoder().encode('data')); + // intentionally not closing — stream stays open }, }); - const monitored = monitorStream(source, () => { - doneCalled = true; - }); + const reader = stream.getReader(); + + // This is the exact pattern from monitorStream (line 84 in streaming.ts). + // With .finally(() => onDone()), this would propagate the rejection. + reader.closed.then( + () => { + onDoneCalled = true; + }, + () => { + onDoneCalled = true; + }, + ); + + await reader.read(); + + // This is what monitorStream does on the error path (line 98) when + // controller.enqueue() throws — releaseLock while the source is still open. + reader.releaseLock(); - // Listen for unhandled rejections — the bug caused one here. let unhandledRejection: PromiseRejectionEvent | undefined; const handler = (e: PromiseRejectionEvent): void => { e.preventDefault(); @@ -61,28 +46,11 @@ Deno.test('monitorStream calls onDone and does not cause unhandled rejection aft globalThis.addEventListener('unhandledrejection', handler); try { - const reader = monitored.getReader(); - const chunks: string[] = []; - const decoder = new TextDecoder(); - - let result: ReadableStreamReadResult; - do { - result = await reader.read(); - if (result.value) { - chunks.push(decoder.decode(result.value)); - } - } while (!result.done); - reader.releaseLock(); - - assertEquals(chunks, ['chunk1', 'chunk2']); - - // Give microtasks a chance to settle so any unhandled rejection fires. await new Promise(resolve => setTimeout(resolve, 50)); - assertEquals(doneCalled, true, 'onDone callback should have been called'); + assertEquals(onDoneCalled, true, 'onDone should have been called via the rejection handler'); assertEquals(unhandledRejection, undefined, 'should not have caused an unhandled promise rejection'); } finally { globalThis.removeEventListener('unhandledrejection', handler); } }); -