Skip to content

Commit ea9ae6d

Browse files
committed
feat: add lazy materialization, result caching, and query plan inspection
- Add .count(), .exists(), .first() short-circuit terminals that avoid full row materialization (count with no filters is zero I/O) - Add .explain() for query plan inspection without execution - Add .cache({ ttl }) for query result caching with TTL and auto- invalidation on table updates (VipCache extended with TTL support) - Add .cursor({ batchSize }) for lazy page-at-a-time batch iteration with early termination and backpressure support
1 parent 377ec35 commit ea9ae6d

File tree

5 files changed

+521
-17
lines changed

5 files changed

+521
-17
lines changed

src/client.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type {
22
AggregateOp,
33
AppendResult,
4+
ExplainResult,
45
FilterOp,
56
QueryResult,
67
Row,
@@ -29,6 +30,7 @@ export class TableQuery {
2930
private _vectorSearch?: VectorSearchParams;
3031
private _aggregates: AggregateOp[] = [];
3132
private _groupBy: string[] = [];
33+
private _cacheTTL?: number;
3234
private _executor: QueryExecutor;
3335

3436
constructor(table: string, executor: QueryExecutor) {
@@ -84,6 +86,56 @@ export class TableQuery {
8486
return this;
8587
}
8688

89+
/** Enable query result caching with a TTL in milliseconds. Remote executor only. */
90+
cache(opts: { ttl: number }): this {
91+
this._cacheTTL = opts.ttl;
92+
return this;
93+
}
94+
95+
/** Return the count of matching rows without full materialization. */
96+
async count(): Promise<number> {
97+
if (this._executor.count) return this._executor.count(this.toDescriptor());
98+
const desc = this.toDescriptor();
99+
desc.aggregates = [{ fn: "count", column: "*" }];
100+
const result = await this._executor.execute(desc);
101+
return (result.rows[0]?.["count_*"] as number) ?? 0;
102+
}
103+
104+
/** Return true if at least one row matches. */
105+
async exists(): Promise<boolean> {
106+
if (this._executor.exists) return this._executor.exists(this.toDescriptor());
107+
const desc = this.toDescriptor();
108+
desc.limit = 1;
109+
if (desc.projections.length === 0) desc.projections = [];
110+
const result = await this._executor.execute(desc);
111+
return result.rowCount > 0;
112+
}
113+
114+
/** Return the first matching row, or null. */
115+
async first(): Promise<Row | null> {
116+
if (this._executor.first) return this._executor.first(this.toDescriptor());
117+
const desc = this.toDescriptor();
118+
desc.limit = 1;
119+
const result = await this._executor.execute(desc);
120+
return result.rows[0] ?? null;
121+
}
122+
123+
/** Inspect the query plan without executing. No data I/O is performed. */
124+
async explain(): Promise<ExplainResult> {
125+
if (!this._executor.explain) {
126+
throw new Error("explain() requires an executor with plan inspection support");
127+
}
128+
return this._executor.explain(this.toDescriptor());
129+
}
130+
131+
/** Iterate over results in batches. Processes pages lazily — stops when consumer breaks. */
132+
cursor(opts?: { batchSize?: number }): AsyncIterable<Row[]> {
133+
if (!this._executor.cursor) {
134+
throw new Error("cursor() requires an executor with cursor support");
135+
}
136+
return this._executor.cursor(this.toDescriptor(), opts?.batchSize ?? 1000);
137+
}
138+
87139
/** Append rows to this table. Uses CAS coordination for safe concurrent writes. */
88140
async append(rows: Record<string, unknown>[]): Promise<AppendResult> {
89141
if (!this._executor.append) {
@@ -116,6 +168,7 @@ export class TableQuery {
116168
vectorSearch: this._vectorSearch,
117169
aggregates: this._aggregates.length > 0 ? this._aggregates : undefined,
118170
groupBy: this._groupBy.length > 0 ? this._groupBy : undefined,
171+
cacheTTL: this._cacheTTL,
119172
};
120173
}
121174
}
@@ -131,6 +184,7 @@ export interface QueryDescriptor {
131184
vectorSearch?: VectorSearchParams;
132185
aggregates?: AggregateOp[];
133186
groupBy?: string[];
187+
cacheTTL?: number;
134188
}
135189

136190
/** Interface for query execution backends (local, DO, browser) */
@@ -140,4 +194,14 @@ export interface QueryExecutor {
140194
append?(table: string, rows: Record<string, unknown>[]): Promise<AppendResult>;
141195
/** Optional: streaming execution (available on remote executors) */
142196
executeStream?(query: QueryDescriptor): Promise<ReadableStream<Row>>;
197+
/** Optional: count without full materialization */
198+
count?(query: QueryDescriptor): Promise<number>;
199+
/** Optional: existence check with limit(1) */
200+
exists?(query: QueryDescriptor): Promise<boolean>;
201+
/** Optional: return first matching row */
202+
first?(query: QueryDescriptor): Promise<Row | null>;
203+
/** Optional: plan inspection without execution */
204+
explain?(query: QueryDescriptor): Promise<ExplainResult>;
205+
/** Optional: lazy batch iteration */
206+
cursor?(query: QueryDescriptor, batchSize: number): AsyncIterable<Row[]>;
143207
}

src/index.ts

Lines changed: 211 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { TableQuery } from "./client.js";
22
import type { QueryDescriptor, QueryExecutor } from "./client.js";
3-
import type { AppendResult, ColumnMeta, Env, QueryResult, Row, TableMeta, DatasetMeta } from "./types.js";
3+
import type { AppendResult, ColumnMeta, DataType, Env, ExplainResult, QueryResult, Row, TableMeta, DatasetMeta } from "./types.js";
44
import { parseFooter, parseColumnMetaFromProtobuf, FOOTER_SIZE } from "./footer.js";
55
import { parseManifest, type ManifestInfo } from "./manifest.js";
66
import { detectFormat, getParquetFooterLength, parseParquetFooter, parquetMetaToTableMeta } from "./parquet.js";
77
import { assembleRows, canSkipPage, bigIntReplacer } from "./decode.js";
8+
import { coalesceRanges } from "./coalesce.js";
89
import { instantiateWasm, WasmEngine } from "./wasm-engine.js";
910

1011
export { MasterDO } from "./master-do.js";
@@ -31,6 +32,7 @@ export type {
3132
IcebergSchema,
3233
IcebergDatasetMeta,
3334
AppendResult,
35+
ExplainResult,
3436
VectorIndexInfo,
3537
} from "./types.js";
3638

@@ -203,6 +205,39 @@ class RemoteExecutor implements QueryExecutor {
203205
},
204206
});
205207
}
208+
209+
private async postQuery<T>(path: string, query: QueryDescriptor): Promise<T> {
210+
const queryDo = this.getQueryDo();
211+
const response = await queryDo.fetch(new Request(`http://internal${path}`, {
212+
method: "POST",
213+
body: JSON.stringify(query, bigIntReplacer),
214+
headers: { "content-type": "application/json" },
215+
}));
216+
if (!response.ok) {
217+
const error = await response.text();
218+
throw new Error(`QueryMode ${path} failed: ${error}`);
219+
}
220+
return response.json() as Promise<T>;
221+
}
222+
223+
async count(query: QueryDescriptor): Promise<number> {
224+
const body = await this.postQuery<{ count: number }>("/query/count", query);
225+
return body.count;
226+
}
227+
228+
async exists(query: QueryDescriptor): Promise<boolean> {
229+
const body = await this.postQuery<{ exists: boolean }>("/query/exists", query);
230+
return body.exists;
231+
}
232+
233+
async first(query: QueryDescriptor): Promise<Row | null> {
234+
const body = await this.postQuery<{ row: Row | null }>("/query/first", query);
235+
return body.row;
236+
}
237+
238+
async explain(query: QueryDescriptor): Promise<ExplainResult> {
239+
return this.postQuery<ExplainResult>("/query/explain", query);
240+
}
206241
}
207242

208243
/**
@@ -321,6 +356,181 @@ class LocalExecutor implements QueryExecutor {
321356
};
322357
}
323358

359+
/** Count matching rows. No-filter case uses metadata only (zero I/O). */
360+
async count(query: QueryDescriptor): Promise<number> {
361+
const meta = await this.getOrLoadMeta(query.table);
362+
if (query.filters.length === 0) {
363+
return meta.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
364+
}
365+
// With filters: fall through to aggregate path
366+
const desc = { ...query, aggregates: [{ fn: "count" as const, column: "*" }] };
367+
const result = await this.execute(desc);
368+
return (result.rows[0]?.["count_*"] as number) ?? 0;
369+
}
370+
371+
async exists(query: QueryDescriptor): Promise<boolean> {
372+
const desc = { ...query, limit: 1 };
373+
const result = await this.execute(desc);
374+
return result.rowCount > 0;
375+
}
376+
377+
async first(query: QueryDescriptor): Promise<Row | null> {
378+
const desc = { ...query, limit: 1 };
379+
const result = await this.execute(desc);
380+
return result.rows[0] ?? null;
381+
}
382+
383+
async explain(query: QueryDescriptor): Promise<ExplainResult> {
384+
const meta = await this.getOrLoadMeta(query.table);
385+
const { columns } = meta;
386+
const projectedColumns = query.projections.length > 0
387+
? columns.filter(c => query.projections.includes(c.name))
388+
: columns;
389+
390+
let pagesTotal = 0;
391+
let pagesSkipped = 0;
392+
const ranges: { column: string; offset: number; length: number }[] = [];
393+
const colDetails: ExplainResult["columns"] = [];
394+
395+
for (const col of projectedColumns) {
396+
let colBytes = 0;
397+
let colPages = 0;
398+
for (const page of col.pages) {
399+
pagesTotal++;
400+
if (!query.vectorSearch && canSkipPage(page, query.filters, col.name)) {
401+
pagesSkipped++;
402+
continue;
403+
}
404+
colPages++;
405+
colBytes += page.byteLength;
406+
ranges.push({ column: col.name, offset: Number(page.byteOffset), length: page.byteLength });
407+
}
408+
colDetails.push({ name: col.name, dtype: col.dtype as DataType, pages: colPages, bytes: colBytes });
409+
}
410+
411+
const coalesced = coalesceRanges(ranges, 64 * 1024);
412+
const estimatedBytes = ranges.reduce((s, r) => s + r.length, 0);
413+
const totalRows = columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
414+
415+
return {
416+
table: query.table,
417+
format: "lance",
418+
totalRows,
419+
columns: colDetails,
420+
pagesTotal,
421+
pagesSkipped,
422+
pagesScanned: pagesTotal - pagesSkipped,
423+
estimatedBytes,
424+
estimatedR2Reads: coalesced.length,
425+
fragments: 1,
426+
filters: query.filters.map(f => ({
427+
column: f.column,
428+
op: f.op,
429+
pushable: f.op !== "in" && f.op !== "neq",
430+
})),
431+
metaCached: this.metaCache.has(query.table),
432+
};
433+
}
434+
435+
async *cursor(query: QueryDescriptor, batchSize: number): AsyncIterable<Row[]> {
436+
const meta = await this.getOrLoadMeta(query.table);
437+
const { columns } = meta;
438+
const projectedColumns = query.projections.length > 0
439+
? columns.filter(c => query.projections.includes(c.name))
440+
: columns;
441+
442+
const isUrl = query.table.startsWith("http://") || query.table.startsWith("https://");
443+
const firstCol = projectedColumns[0];
444+
if (!firstCol) return;
445+
446+
// If sorted, must buffer all rows then chunk
447+
if (query.sortColumn) {
448+
const result = await this.execute(query);
449+
for (let i = 0; i < result.rows.length; i += batchSize) {
450+
yield result.rows.slice(i, i + batchSize);
451+
}
452+
return;
453+
}
454+
455+
const totalPages = firstCol.pages.length;
456+
let pageIdx = 0;
457+
let totalYielded = 0;
458+
459+
const fs = isUrl ? null : await import("node:fs/promises");
460+
const handle = isUrl ? null : await fs!.open(query.table, "r");
461+
const wasm = await this.getWasm();
462+
463+
try {
464+
while (pageIdx < totalPages) {
465+
let batchRows = 0;
466+
const batchStartPage = pageIdx;
467+
while (pageIdx < totalPages && batchRows < batchSize) {
468+
const page = firstCol.pages[pageIdx];
469+
if (!query.vectorSearch && canSkipPage(page, query.filters, firstCol.name)) {
470+
pageIdx++;
471+
continue;
472+
}
473+
batchRows += page.rowCount;
474+
pageIdx++;
475+
}
476+
477+
if (batchRows === 0) continue;
478+
479+
const columnData = new Map<string, ArrayBuffer[]>();
480+
for (const col of projectedColumns) {
481+
for (let pi = batchStartPage; pi < pageIdx; pi++) {
482+
const page = col.pages[pi];
483+
if (!page) continue;
484+
if (!query.vectorSearch && canSkipPage(page, query.filters, col.name)) continue;
485+
486+
let ab: ArrayBuffer;
487+
if (isUrl) {
488+
const start = Number(page.byteOffset);
489+
const end = start + page.byteLength - 1;
490+
const resp = await fetch(query.table, { headers: { Range: `bytes=${start}-${end}` } });
491+
ab = await resp.arrayBuffer();
492+
} else {
493+
const buf = Buffer.alloc(page.byteLength);
494+
await handle!.read(buf, 0, page.byteLength, Number(page.byteOffset));
495+
ab = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
496+
}
497+
498+
const arr = columnData.get(col.name) ?? [];
499+
arr.push(ab);
500+
columnData.set(col.name, arr);
501+
}
502+
}
503+
504+
const rows = assembleRows(columnData, projectedColumns, query, wasm);
505+
506+
if (rows.length > 0) {
507+
if (query.limit && totalYielded + rows.length > query.limit) {
508+
yield rows.slice(0, query.limit - totalYielded);
509+
return;
510+
}
511+
yield rows;
512+
totalYielded += rows.length;
513+
}
514+
}
515+
} finally {
516+
if (handle) await handle.close();
517+
}
518+
}
519+
520+
/** Get or load table metadata (footer + columns). Caches in-memory. */
521+
private async getOrLoadMeta(table: string): Promise<{ columns: ColumnMeta[]; fileSize: number }> {
522+
let cached = this.metaCache.get(table);
523+
if (cached) return cached;
524+
const isUrl = table.startsWith("http://") || table.startsWith("https://");
525+
cached = isUrl ? await this.loadMetaFromUrl(table) : await this.loadMetaFromFile(table);
526+
this.metaCache.set(table, cached);
527+
if (this.metaCache.size > 1000) {
528+
const firstKey = this.metaCache.keys().next().value;
529+
if (firstKey) this.metaCache.delete(firstKey);
530+
}
531+
return cached;
532+
}
533+
324534
async execute(query: QueryDescriptor): Promise<QueryResult> {
325535
const startTime = Date.now();
326536
const isUrl = query.table.startsWith("http://") || query.table.startsWith("https://");

0 commit comments

Comments
 (0)