Skip to content

Commit cd6e704

Browse files
committed
fix: executeStream uses pull() with cancel() — proper backpressure and reader cleanup
The RemoteExecutor.executeStream() ReadableStream had two issues: 1. Used start() to eagerly drain the upstream — no backpressure, reader never released on error or early consumer cancellation. 2. Missing cancel() handler — if consumer broke out of `for await`, the upstream reader kept reading with no way to stop. Now uses pull()-based consumption (reads only when consumer requests) and cancel() that calls reader.cancel() to release the upstream lock.
1 parent fdf96a4 commit cd6e704

File tree

1 file changed

+26
-17
lines changed

1 file changed

+26
-17
lines changed

src/index.ts

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -274,25 +274,22 @@ class RemoteExecutor implements QueryExecutor {
274274
const byteStream = await rpc.streamRpc(query);
275275

276276
// Parse length-prefixed columnar binary frames into Row objects
277-
return new ReadableStream<Row>({
278-
async start(controller) {
279-
const reader = byteStream.getReader();
280-
let pending: Uint8Array = new Uint8Array(0);
277+
const reader = byteStream.getReader();
278+
let pending: Uint8Array = new Uint8Array(0);
279+
let cancelled = false;
281280

282-
const concat = (a: Uint8Array, b: Uint8Array): Uint8Array => {
283-
const out = new Uint8Array(a.length + b.length);
284-
out.set(a, 0);
285-
out.set(b, a.length);
286-
return out;
287-
};
281+
const concat = (a: Uint8Array, b: Uint8Array): Uint8Array => {
282+
const out = new Uint8Array(a.length + b.length);
283+
out.set(a, 0);
284+
out.set(b, a.length);
285+
return out;
286+
};
288287

288+
return new ReadableStream<Row>({
289+
async pull(controller) {
289290
try {
290-
while (true) {
291-
const { done, value } = await reader.read();
292-
if (done) break;
293-
pending = pending.length > 0 ? concat(pending, value) : value;
294-
295-
// Process complete frames
291+
while (!cancelled) {
292+
// Process any complete frames in pending buffer first
296293
while (pending.length >= 4) {
297294
const frameLen = new DataView(pending.buffer as ArrayBuffer, pending.byteOffset).getUint32(0, true);
298295
if (pending.length < 4 + frameLen) break; // wait for more data
@@ -306,13 +303,25 @@ class RemoteExecutor implements QueryExecutor {
306303
for (const row of decodeColumnarRun(frameBuf)) {
307304
controller.enqueue(row);
308305
}
306+
return; // yield back to consumer after enqueuing
309307
}
308+
309+
// Need more data from upstream
310+
const { done, value } = await reader.read();
311+
if (done) {
312+
controller.close();
313+
return;
314+
}
315+
pending = pending.length > 0 ? concat(pending, value) : value;
310316
}
311-
controller.close();
312317
} catch (err) {
313318
controller.error(err);
314319
}
315320
},
321+
cancel() {
322+
cancelled = true;
323+
reader.cancel().catch(() => {});
324+
},
316325
});
317326
}
318327

0 commit comments

Comments
 (0)