Skip to content

Commit 85ddd99

Browse files
committed
fix: dataset directory support for count/explain/cursor, filter+projection correctness
- Extract getOrLoadDataset() for reuse across count, explain, cursor, execute - Fix getOrLoadMeta to handle Lance dataset directories (was EISDIR) - Fix detectFileFormat to return "lance" for directories - Fix filter+select: fetch filter/sort columns even when not in projection, strip them from output rows after filtering - Fix cursor on dataset directories: fall back to execute+chunk - Fix explain fragment count for multi-fragment datasets
1 parent 8cf2de3 commit 85ddd99

File tree

1 file changed

+142
-93
lines changed

1 file changed

+142
-93
lines changed

src/local-executor.ts

Lines changed: 142 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ export class LocalExecutor implements QueryExecutor {
217217
estimatedBytes,
218218
estimatedR2Reads: coalesced.length,
219219
estimatedRows,
220-
fragments: 1,
220+
fragments: this.datasetCache.get(query.table)?.fragmentMetas.size ?? 1,
221221
filters: query.filters.map(f => ({
222222
column: f.column,
223223
op: f.op,
@@ -240,6 +240,7 @@ export class LocalExecutor implements QueryExecutor {
240240
}
241241
const fs = await import("node:fs/promises");
242242
const stat = await fs.stat(table);
243+
if (stat.isDirectory()) return "lance"; // Lance dataset directories are always Lance format
243244
const fileSize = Number(stat.size);
244245
const tailSize = Math.min(fileSize, FOOTER_SIZE);
245246
const handle = await fs.open(table, "r");
@@ -254,17 +255,30 @@ export class LocalExecutor implements QueryExecutor {
254255
}
255256

256257
async *cursor(query: QueryDescriptor, batchSize: number): AsyncIterable<Row[]> {
258+
const isUrl = query.table.startsWith("http://") || query.table.startsWith("https://");
259+
260+
// For dataset directories or sorted queries, buffer via execute then chunk
261+
if (!isUrl) {
262+
const fs = await import("node:fs/promises");
263+
const stat = await fs.stat(query.table).catch(() => null);
264+
if (stat?.isDirectory()) {
265+
const result = await this.execute(query);
266+
for (let i = 0; i < result.rows.length; i += batchSize) {
267+
yield result.rows.slice(i, i + batchSize);
268+
}
269+
return;
270+
}
271+
}
272+
257273
const meta = await this.getOrLoadMeta(query.table);
258274
const { columns } = meta;
259275
const projectedColumns = query.projections.length > 0
260276
? columns.filter(c => query.projections.includes(c.name))
261277
: columns;
262278

263-
const isUrl = query.table.startsWith("http://") || query.table.startsWith("https://");
264279
const firstCol = projectedColumns[0];
265280
if (!firstCol) return;
266281

267-
// If sorted, must buffer all rows then chunk
268282
if (query.sortColumn) {
269283
const result = await this.execute(query);
270284
for (let i = 0; i < result.rows.length; i += batchSize) {
@@ -340,11 +354,27 @@ export class LocalExecutor implements QueryExecutor {
340354
}
341355
}
342356

343-
/** Get or load table metadata (footer + columns). Caches in-memory. */
357+
/** Get or load table metadata (footer + columns). Caches in-memory. Handles both files and dataset directories. */
344358
private async getOrLoadMeta(table: string): Promise<{ columns: ColumnMeta[]; fileSize: number }> {
345359
let cached = this.metaCache.get(table);
346360
if (cached) return cached;
347361
const isUrl = table.startsWith("http://") || table.startsWith("https://");
362+
if (!isUrl) {
363+
const fs = await import("node:fs/promises");
364+
const stat = await fs.stat(table).catch((err: unknown) => {
365+
throw QueryModeError.from(err, { table });
366+
});
367+
if (stat.isDirectory()) {
368+
// Lance dataset directory — load via executeDatasetQuery path to populate cache
369+
const dataset = await this.getOrLoadDataset(table);
370+
const firstMeta = dataset.fragmentMetas.values().next().value;
371+
const columns = firstMeta?.columns ?? [];
372+
const fileSize = Number(firstMeta?.fileSize ?? 0n);
373+
cached = { columns, fileSize };
374+
this.metaCache.set(table, cached);
375+
return cached;
376+
}
377+
}
348378
cached = isUrl ? await this.loadMetaFromUrl(table) : await this.loadMetaFromFile(table);
349379
this.metaCache.set(table, cached);
350380
if (this.metaCache.size > 1000) {
@@ -414,11 +444,11 @@ export class LocalExecutor implements QueryExecutor {
414444
throw new QueryModeError("COLUMN_NOT_FOUND", `Sort column "${query.sortColumn}" not found in ${query.table}. Available: ${[...columnNames].join(", ")}`);
415445
}
416446

417-
// Step 2: Determine projected columns
418-
const projectedColumns =
419-
query.projections.length > 0
420-
? columns.filter((c) => query.projections.includes(c.name))
421-
: columns;
447+
// Step 2: Determine columns to fetch (projection + filter + sort columns)
448+
const neededColumns = new Set(query.projections.length > 0 ? query.projections : columns.map(c => c.name));
449+
for (const f of query.filters) neededColumns.add(f.column);
450+
if (query.sortColumn) neededColumns.add(query.sortColumn);
451+
const projectedColumns = columns.filter(c => neededColumns.has(c.name));
422452

423453
// Step 3: Determine which pages to fetch using filter pushdown
424454
const pageRanges: { column: string; offset: bigint; length: number }[] = [];
@@ -508,113 +538,121 @@ export class LocalExecutor implements QueryExecutor {
508538
}
509539
}
510540

541+
// Strip columns that were fetched for filter/sort but not in user's projection
542+
const outputColumns = query.projections.length > 0 ? query.projections : projectedColumns.map(c => c.name);
543+
if (query.projections.length > 0) {
544+
const keep = new Set(query.projections);
545+
for (const row of rows) {
546+
for (const key of Object.keys(row)) {
547+
if (!keep.has(key)) delete row[key];
548+
}
549+
}
550+
}
551+
511552
const result: QueryResult = {
512553
rows,
513554
rowCount: rows.length,
514-
columns: projectedColumns.map((c) => c.name),
555+
columns: outputColumns,
515556
bytesRead,
516557
pagesSkipped,
517558
durationMs: Date.now() - startTime,
518559
cacheHit: false,
519560
};
520561

521-
// Store in result cache if TTL specified
522562
if (query.cacheTTL && !query.vectorSearch) {
523563
this.resultCache.setWithTTL(this.queryCacheKey(query), result, query.cacheTTL);
524564
}
525565

526566
return result;
527567
}
528568

529-
/** Execute a query against a multi-fragment Lance dataset directory. */
530-
private async executeDatasetQuery(query: QueryDescriptor, startTime: number): Promise<QueryResult> {
569+
/** Load or retrieve cached dataset metadata for a Lance directory. */
570+
private async getOrLoadDataset(table: string): Promise<DatasetMeta> {
571+
let dataset = this.datasetCache.get(table);
572+
if (dataset) return dataset;
573+
531574
const fs = await import("node:fs/promises");
532575
const pathMod = await import("node:path");
533576

534-
// Discover or reuse cached dataset metadata
535-
let dataset = this.datasetCache.get(query.table);
536-
if (!dataset) {
537-
// Find latest manifest
538-
const versionsDir = pathMod.join(query.table, "_versions");
539-
const entries = await fs.readdir(versionsDir).catch(() => [] as string[]);
540-
const manifests = entries.filter(e => e.endsWith(".manifest")).sort();
541-
if (manifests.length === 0) {
542-
throw new Error(`No manifests found in ${versionsDir}`);
543-
}
577+
const versionsDir = pathMod.join(table, "_versions");
578+
const entries = await fs.readdir(versionsDir).catch(() => [] as string[]);
579+
const manifests = entries.filter(e => e.endsWith(".manifest")).sort();
580+
if (manifests.length === 0) {
581+
throw new Error(`No manifests found in ${versionsDir}`);
582+
}
544583

545-
const latestManifest = manifests[manifests.length - 1];
546-
const manifestBuf = await fs.readFile(pathMod.join(versionsDir, latestManifest));
547-
const ab = manifestBuf.buffer.slice(manifestBuf.byteOffset, manifestBuf.byteOffset + manifestBuf.byteLength);
548-
const manifest = parseManifest(ab);
549-
if (!manifest) throw new Error(`Failed to parse manifest ${latestManifest}`);
550-
551-
// Read footer + column metadata for each fragment
552-
const fragmentMetas = new Map<number, TableMeta>();
553-
for (const frag of manifest.fragments) {
554-
// Lance stores relative paths without data/ prefix — try both
555-
let fragPath = pathMod.join(query.table, frag.filePath);
556-
try { await fs.stat(fragPath); } catch {
557-
fragPath = pathMod.join(query.table, "data", frag.filePath);
558-
}
559-
try {
560-
const cachedMeta = await this.loadMetaFromFile(fragPath);
561-
let { columns } = cachedMeta;
562-
563-
// Always try v2 parser with manifest schema for Lance files.
564-
// The schema provides correct column names and types that the
565-
// no-schema fallback in loadMetaFromFile cannot determine.
566-
const isLanceV2 = columns.some(c => c.name.startsWith("column_")) || columns.every(c => c.dtype === "int64");
567-
if (isLanceV2 || !columns.some(c => c.pages.length > 0)) {
568-
const fragBuf = await fs.readFile(fragPath);
569-
const fragAb = fragBuf.buffer.slice(fragBuf.byteOffset, fragBuf.byteOffset + fragBuf.byteLength);
570-
const v2Cols = parseLanceV2Columns(fragAb, manifest.schema, frag.physicalRows);
571-
if (v2Cols && v2Cols.length > 0) {
572-
columns = lanceV2ToColumnMeta(v2Cols);
573-
}
584+
const latestManifest = manifests[manifests.length - 1];
585+
const manifestBuf = await fs.readFile(pathMod.join(versionsDir, latestManifest));
586+
const ab = manifestBuf.buffer.slice(manifestBuf.byteOffset, manifestBuf.byteOffset + manifestBuf.byteLength);
587+
const manifest = parseManifest(ab);
588+
if (!manifest) throw new Error(`Failed to parse manifest ${latestManifest}`);
589+
590+
const fragmentMetas = new Map<number, TableMeta>();
591+
for (const frag of manifest.fragments) {
592+
let fragPath = pathMod.join(table, frag.filePath);
593+
try { await fs.stat(fragPath); } catch {
594+
fragPath = pathMod.join(table, "data", frag.filePath);
595+
}
596+
try {
597+
const cachedMeta = await this.loadMetaFromFile(fragPath);
598+
let { columns } = cachedMeta;
599+
600+
const isLanceV2 = columns.some(c => c.name.startsWith("column_")) || columns.every(c => c.dtype === "int64");
601+
if (isLanceV2 || !columns.some(c => c.pages.length > 0)) {
602+
const fragBuf = await fs.readFile(fragPath);
603+
const fragAb = fragBuf.buffer.slice(fragBuf.byteOffset, fragBuf.byteOffset + fragBuf.byteLength);
604+
const v2Cols = parseLanceV2Columns(fragAb, manifest.schema, frag.physicalRows);
605+
if (v2Cols && v2Cols.length > 0) {
606+
columns = lanceV2ToColumnMeta(v2Cols);
574607
}
608+
}
575609

576-
// Read the actual footer from the fragment file
577-
const fragStat = await fs.stat(fragPath);
578-
const fragHandle = await fs.open(fragPath, "r");
579-
let footer: import("./types.js").Footer;
580-
try {
581-
const footerBuf = Buffer.alloc(FOOTER_SIZE);
582-
await fragHandle.read(footerBuf, 0, FOOTER_SIZE, fragStat.size - FOOTER_SIZE);
583-
const footerAb = footerBuf.buffer.slice(footerBuf.byteOffset, footerBuf.byteOffset + footerBuf.byteLength);
584-
const parsed = parseFooter(footerAb);
585-
if (!parsed) continue;
586-
footer = parsed;
587-
} finally {
588-
await fragHandle.close();
589-
}
590-
fragmentMetas.set(frag.id, {
591-
name: frag.filePath,
592-
footer,
593-
columns,
594-
totalRows: frag.physicalRows,
595-
fileSize: BigInt(cachedMeta.fileSize),
596-
r2Key: fragPath,
597-
updatedAt: Date.now(),
598-
});
599-
} catch (err) {
600-
throw new Error(`Failed to load fragment ${frag.filePath}: ${err instanceof Error ? err.message : String(err)}`);
610+
const fragStat = await fs.stat(fragPath);
611+
const fragHandle = await fs.open(fragPath, "r");
612+
let footer: import("./types.js").Footer;
613+
try {
614+
const footerBuf = Buffer.alloc(FOOTER_SIZE);
615+
await fragHandle.read(footerBuf, 0, FOOTER_SIZE, fragStat.size - FOOTER_SIZE);
616+
const footerAb = footerBuf.buffer.slice(footerBuf.byteOffset, footerBuf.byteOffset + footerBuf.byteLength);
617+
const parsed = parseFooter(footerAb);
618+
if (!parsed) continue;
619+
footer = parsed;
620+
} finally {
621+
await fragHandle.close();
601622
}
623+
fragmentMetas.set(frag.id, {
624+
name: frag.filePath,
625+
footer,
626+
columns,
627+
totalRows: frag.physicalRows,
628+
fileSize: BigInt(cachedMeta.fileSize),
629+
r2Key: fragPath,
630+
updatedAt: Date.now(),
631+
});
632+
} catch (err) {
633+
throw new Error(`Failed to load fragment ${frag.filePath}: ${err instanceof Error ? err.message : String(err)}`);
602634
}
635+
}
603636

604-
dataset = {
605-
name: query.table,
606-
r2Prefix: query.table + "/",
607-
manifest,
608-
fragmentMetas,
609-
totalRows: manifest.totalRows,
610-
updatedAt: Date.now(),
611-
};
612-
this.datasetCache.set(query.table, dataset);
613-
if (this.datasetCache.size > 100) {
614-
const firstKey = this.datasetCache.keys().next().value;
615-
if (firstKey) this.datasetCache.delete(firstKey);
616-
}
637+
dataset = {
638+
name: table,
639+
r2Prefix: table + "/",
640+
manifest,
641+
fragmentMetas,
642+
totalRows: manifest.totalRows,
643+
updatedAt: Date.now(),
644+
};
645+
this.datasetCache.set(table, dataset);
646+
if (this.datasetCache.size > 100) {
647+
const firstKey = this.datasetCache.keys().next().value;
648+
if (firstKey) this.datasetCache.delete(firstKey);
617649
}
650+
return dataset;
651+
}
652+
653+
/** Execute a query against a multi-fragment Lance dataset directory. */
654+
private async executeDatasetQuery(query: QueryDescriptor, startTime: number): Promise<QueryResult> {
655+
const dataset = await this.getOrLoadDataset(query.table);
618656

619657
// Validate column references against schema
620658
const schemaColumnNames = new Set(dataset.manifest.schema.map(f => f.name));
@@ -639,9 +677,10 @@ export class LocalExecutor implements QueryExecutor {
639677
let totalPagesSkipped = 0;
640678

641679
for (const [, meta] of dataset.fragmentMetas) {
642-
const projectedColumns = query.projections.length > 0
643-
? meta.columns.filter(c => query.projections.includes(c.name))
644-
: meta.columns;
680+
const neededCols = new Set(query.projections.length > 0 ? query.projections : meta.columns.map(c => c.name));
681+
for (const f of query.filters) neededCols.add(f.column);
682+
if (query.sortColumn) neededCols.add(query.sortColumn);
683+
const projectedColumns = meta.columns.filter(c => neededCols.has(c.name));
645684

646685
const pageRanges: { column: string; offset: bigint; length: number }[] = [];
647686
for (const col of projectedColumns) {
@@ -702,6 +741,16 @@ export class LocalExecutor implements QueryExecutor {
702741
}
703742
}
704743

744+
// Strip columns fetched for filter/sort but not in user's projection
745+
if (query.projections.length > 0) {
746+
const keep = new Set(query.projections);
747+
for (const row of allRows) {
748+
for (const key of Object.keys(row)) {
749+
if (!keep.has(key)) delete row[key];
750+
}
751+
}
752+
}
753+
705754
return {
706755
rows: allRows,
707756
rowCount: allRows.length,

0 commit comments

Comments
 (0)