Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/deno/src/utils/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ function monitorStream(
onDone: () => void,
): ReadableStream<Uint8Array<ArrayBufferLike>> {
const reader = stream.getReader();
// oxlint-disable-next-line typescript/no-floating-promises
reader.closed.finally(() => onDone());
reader.closed.then(() => onDone(), () => onDone());
return new ReadableStream({
async start(controller) {
let result: ReadableStreamReadResult<Uint8Array<ArrayBufferLike>>;
Expand Down
88 changes: 88 additions & 0 deletions packages/deno/test/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// <reference lib="deno.ns" />

import { assertEquals } from 'https://deno.land/std@0.212.0/assert/mod.ts';

/**
* Minimal reproduction of monitorStream to verify that reader.releaseLock()
* after stream completion does not cause an unhandled promise rejection.
*
* Per the WHATWG Streams spec, releaseLock() rejects reader.closed.
* Using .then(onDone, onDone) handles both the fulfilled and rejected cases
* so the rejection is suppressed.
*/
function monitorStream(
stream: ReadableStream<Uint8Array>,
onDone: () => void,
): ReadableStream<Uint8Array> {
const reader = stream.getReader();
reader.closed.then(() => onDone(), () => onDone());
return new ReadableStream({
async start(controller) {
let result: ReadableStreamReadResult<Uint8Array>;
do {
result = await reader.read();
if (result.value) {
try {
controller.enqueue(result.value);
} catch (er) {
controller.error(er);
reader.releaseLock();
return;
}
}
} while (!result.done);
controller.close();
reader.releaseLock();
},
});
}

Deno.test('monitorStream calls onDone and does not cause unhandled rejection after normal completion', async () => {
let doneCalled = false;

const source = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
controller.enqueue(new TextEncoder().encode('chunk2'));
controller.close();
},
});

const monitored = monitorStream(source, () => {
doneCalled = true;
});

// Listen for unhandled rejections — the bug caused one here.
let unhandledRejection: PromiseRejectionEvent | undefined;
const handler = (e: PromiseRejectionEvent): void => {
e.preventDefault();
unhandledRejection = e;
};
globalThis.addEventListener('unhandledrejection', handler);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhandled rejection listener registered after triggering action

Low Severity

The unhandledrejection event listener is registered on line 46, after reader.releaseLock() on line 39 which is the action that would trigger the rejection. If the fix were ever reverted to .finally(), the unhandled rejection event could fire before the listener is attached, causing the assertEquals(unhandledRejection, undefined, ...) assertion to pass incorrectly — hiding the regression the test is meant to catch. The listener needs to be set up before reader.releaseLock() to reliably detect unhandled rejections. Also, the test uses setTimeout(resolve, 50) which is a sleep-in-test pattern; a more deterministic signal would be preferable.

Fix in Cursor Fix in Web

Triggered by project rule: PR Review Guidelines for Cursor Bot

Reviewed by Cursor Bugbot for commit 7f65c39. Configure here.


try {
const reader = monitored.getReader();
const chunks: string[] = [];
const decoder = new TextDecoder();

let result: ReadableStreamReadResult<Uint8Array>;
do {
result = await reader.read();
if (result.value) {
chunks.push(decoder.decode(result.value));
}
} while (!result.done);
reader.releaseLock();

assertEquals(chunks, ['chunk1', 'chunk2']);

// Give microtasks a chance to settle so any unhandled rejection fires.
await new Promise(resolve => setTimeout(resolve, 50));

assertEquals(doneCalled, true, 'onDone callback should have been called');
assertEquals(unhandledRejection, undefined, 'should not have caused an unhandled promise rejection');
} finally {
globalThis.removeEventListener('unhandledrejection', handler);
}
});
Comment thread
cursor[bot] marked this conversation as resolved.

Loading