Skip to content

Commit 45f6b7d

Browse files
committed
perf: cache file handles in FragmentSource — eliminate open/close per page read
readPage opened and closed a file handle for every page read, causing potentially 1000+ open/close syscalls per query on multi-page files. Now lazily caches the handle on first read and closes via the new FragmentSource.close() method, called by ScanOperator.close() which is invoked by drainPipeline's finally block.
1 parent 86c40ae commit 45f6b7d

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

src/local-executor.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -301,17 +301,17 @@ export class LocalExecutor implements QueryExecutor {
301301
};
302302
}
303303
const fsMod = await import("node:fs/promises");
304+
let cachedHandle: import("node:fs/promises").FileHandle | null = null;
304305
return {
305306
columns: projectedColumns,
306307
async readPage(_col: ColumnMeta, page: PageInfo): Promise<ArrayBuffer> {
307-
const handle = await fsMod.open(table, "r");
308-
try {
309-
const buf = Buffer.alloc(page.byteLength);
310-
await handle.read(buf, 0, page.byteLength, Number(page.byteOffset));
311-
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
312-
} finally {
313-
await handle.close();
314-
}
308+
if (!cachedHandle) cachedHandle = await fsMod.open(table, "r");
309+
const buf = Buffer.alloc(page.byteLength);
310+
await cachedHandle.read(buf, 0, page.byteLength, Number(page.byteOffset));
311+
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
312+
},
313+
async close(): Promise<void> {
314+
if (cachedHandle) { await cachedHandle.close(); cachedHandle = null; }
315315
},
316316
};
317317
}
@@ -899,17 +899,17 @@ export class LocalExecutor implements QueryExecutor {
899899
const projectedColumns = meta.columns.filter(c => neededCols.has(c.name));
900900
const filePath = meta.r2Key;
901901

902+
let cachedHandle: import("node:fs/promises").FileHandle | null = null;
902903
fragments.push({
903904
columns: projectedColumns,
904905
async readPage(_col: ColumnMeta, page: PageInfo): Promise<ArrayBuffer> {
905-
const handle = await fsMod.open(filePath, "r");
906-
try {
907-
const buf = Buffer.alloc(page.byteLength);
908-
await handle.read(buf, 0, page.byteLength, Number(page.byteOffset));
909-
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
910-
} finally {
911-
await handle.close();
912-
}
906+
if (!cachedHandle) cachedHandle = await fsMod.open(filePath, "r");
907+
const buf = Buffer.alloc(page.byteLength);
908+
await cachedHandle.read(buf, 0, page.byteLength, Number(page.byteOffset));
909+
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
910+
},
911+
async close(): Promise<void> {
912+
if (cachedHandle) { await cachedHandle.close(); cachedHandle = null; }
913913
},
914914
});
915915
}

src/operators.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ export interface FragmentSource {
9090
columns: ColumnMeta[];
9191
/** Read page data from the file. */
9292
readPage(col: ColumnMeta, page: PageInfo): Promise<ArrayBuffer>;
93+
/** Optional: release resources (e.g., file handles) when scanning is complete. */
94+
close?(): Promise<void>;
9395
}
9496

9597
/** Check if a page can be skipped by checking ALL filter columns' min/max stats.
@@ -383,7 +385,9 @@ export class ScanOperator implements Operator {
383385
return materializeRows(batch);
384386
}
385387

386-
async close(): Promise<void> { /* no-op */ }
388+
async close(): Promise<void> {
389+
for (const frag of this.fragments) await frag.close?.();
390+
}
387391
}
388392

389393
// ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)