Skip to content

Commit e144d30

Browse files
committed
feat: streaming execution engine for TB+ scale queries
Pull-based operator pipeline replaces full materialization:
- ScanOperator, FilterOperator, ProjectOperator, LimitOperator
- AggregateOperator (streaming partial aggregation, O(groups) memory)
- TopKOperator (ORDER BY + LIMIT via min-heap, O(K) memory)
- ExternalSortOperator (spill to NDJSON temp files, K-way merge)
- HashJoinOperator (.join() API, build/probe with hash map)
- WasmAggregateOperator (SIMD sum/min/max/avg on raw buffers)
- Lance v2 page stats (compute-and-cache min/max for page skipping)
1 parent 2fa9df9 commit e144d30

File tree

7 files changed

+1424
-220
lines changed

7 files changed

+1424
-220
lines changed

src/client.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type {
33
AppendResult,
44
ExplainResult,
55
FilterOp,
6+
JoinDescriptor,
67
QueryResult,
78
Row,
89
VectorSearchParams,
@@ -32,6 +33,7 @@ export class TableQuery<T extends Row = Row> {
3233
private _aggregates: AggregateOp[] = [];
3334
private _groupBy: string[] = [];
3435
private _cacheTTL?: number;
36+
private _join?: JoinDescriptor;
3537
private _executor: QueryExecutor;
3638

3739
constructor(table: string, executor: QueryExecutor) {
@@ -102,6 +104,24 @@ export class TableQuery<T extends Row = Row> {
102104
return this;
103105
}
104106

107+
/**
 * Attach a join against another table query to this query.
 *
 * The other query is captured as a descriptor (via `toDescriptor()`) at the moment
 * `join` is called and stored as the right side of the join. Per the execution
 * engine's design, the right side is the hashed build side and this (left) query
 * is the streaming probe side.
 *
 * Usage:
 *   const orders = qm.table("orders");
 *   const users = qm.table("users");
 *   const result = await orders.join(users, { left: "user_id", right: "id" }).exec();
 *
 * @param other - Query describing the right-hand table.
 * @param on - Join key column names: `left` on this query, `right` on `other`.
 * @param type - Join type; falls back to "inner" when omitted.
 * @returns This query, for chaining.
 */
join(other: TableQuery, on: { left: string; right: string }, type?: "inner" | "left"): this {
  // Snapshot the right-hand query first, then read the key names and join type.
  const right = other.toDescriptor();
  const { left: leftKey, right: rightKey } = on;
  const joinType = type ?? "inner";

  this._join = { right, leftKey, rightKey, type: joinType };
  return this;
}
124+
105125
/** Enable query result caching with a TTL in milliseconds. Remote executor only. */
106126
cache(opts: { ttl: number }): this {
107127
this._cacheTTL = opts.ttl;
@@ -186,6 +206,7 @@ export class TableQuery<T extends Row = Row> {
186206
aggregates: this._aggregates.length > 0 ? this._aggregates : undefined,
187207
groupBy: this._groupBy.length > 0 ? this._groupBy : undefined,
188208
cacheTTL: this._cacheTTL,
209+
join: this._join,
189210
};
190211
}
191212
}
@@ -203,6 +224,7 @@ export interface QueryDescriptor {
203224
aggregates?: AggregateOp[];
204225
groupBy?: string[];
205226
cacheTTL?: number;
227+
join?: JoinDescriptor;
206228
}
207229

208230
/** Interface for query execution backends (local, DO, browser) */

src/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export type {
2525
FragmentInfo,
2626
FilterOp,
2727
AggregateOp,
28+
JoinDescriptor,
2829
QueryResult,
2930
Row,
3031
VectorSearchParams,
@@ -74,8 +75,8 @@ export class QueryMode {
7475
}
7576

7677
/** Create a QueryMode client for local use (Node/Bun, reads files from disk or URLs). */
77-
static local(wasmModule?: WebAssembly.Module): QueryMode {
78-
const executor = new LocalExecutor(wasmModule);
78+
static local(opts?: { wasmModule?: WebAssembly.Module; memoryBudgetBytes?: number }): QueryMode {
79+
const executor = opts ? new LocalExecutor(opts) : new LocalExecutor();
7980
return new QueryMode(executor);
8081
}
8182

src/lance-v2.ts

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,150 @@ export function lanceV2ToColumnMeta(colInfos: LanceV2ColumnInfo[]): ColumnMeta[]
391391
return columns;
392392
}
393393

394+
/**
 * Lazily fill in min/max page statistics for numeric Lance v2 columns.
 *
 * Lance v2 protobuf doesn't carry page-level stats, so they are derived from the
 * page bytes on first access and written back onto the `ColumnMeta` pages.
 *
 * Only columns whose dtype is in the numeric set below are visited; the actual
 * min/max extraction is delegated to `computePageStats`, and a page is left
 * untouched when that helper yields no result. Pages that already have a
 * `minValue` are not read again. Pages are read one at a time, in order.
 *
 * Any error raised while reading or scanning a page is swallowed on purpose:
 * stats only enable page skipping and are never required for correctness.
 *
 * @param columns - ColumnMeta array to populate with stats (mutated in place)
 * @param readPage - Function to read a page's raw bytes given offset and length
 */
export async function computeLanceV2Stats(
  columns: ColumnMeta[],
  readPage: (byteOffset: number, byteLength: number) => Promise<ArrayBuffer>,
): Promise<void> {
  const statsDtypes = new Set([
    "int8", "int16", "int32", "int64",
    "uint8", "uint16", "uint32", "uint64",
    "float32", "float64",
  ]);

  for (const column of columns) {
    const eligible = statsDtypes.has(column.dtype);
    if (!eligible) continue;

    for (const pg of column.pages) {
      // A defined minValue means this page was handled by an earlier pass.
      const alreadyComputed = pg.minValue !== undefined;
      if (alreadyComputed) continue;

      try {
        const raw = await readPage(Number(pg.byteOffset), pg.byteLength);
        const range = computePageStats(raw, column.dtype, pg.rowCount, pg.nullCount, pg.dataOffsetInPage);
        if (!range) continue;
        pg.minValue = range.min;
        pg.maxValue = range.max;
      } catch {
        // Best-effort: a failed read or scan just leaves this page without stats.
      }
    }
  }
}
435+
436+
/**
 * Compute min/max from a raw page buffer for a numeric dtype.
 *
 * All values are read little-endian. Supported dtypes: int8/16/32/64,
 * uint8/16/32/64, float32, float64. 64-bit integer stats are returned as
 * `bigint`, everything else as `number`. NaN never becomes a min or max
 * because it fails both comparisons.
 *
 * Layout handling:
 * - `nullCount === 0`: values are read starting at byte 0 of `buf`.
 * - nulls and `dataOffsetInPage` given: values start at that byte offset with one
 *   slot per row (`rowCount` slots). The validity bitmap in the first
 *   `ceil(rowCount / 8)` bytes of `buf` (LSB-first, bit clear = null) selects
 *   which slots to skip.
 * - nulls and no `dataOffsetInPage`: the `ceil(rowCount / 8)`-byte bitmap is
 *   skipped and the following `rowCount - nullCount` values are scanned in full.
 *   They are treated as densely packed non-null values, so the row-position
 *   bitmap is NOT applied to them (doing so would drop valid values and narrow
 *   the range, which is unsafe for page skipping).
 *   NOTE(review): dense packing is inferred from the value count used for this
 *   case — confirm against the page writer.
 *
 * @param buf - Raw page bytes.
 * @param dtype - Column dtype name.
 * @param rowCount - Number of rows in the page.
 * @param nullCount - Number of null rows in the page.
 * @param dataOffsetInPage - Optional byte offset of the value data inside `buf`.
 * @returns The min/max pair, or `null` when the dtype is unsupported, the page has
 *   no values, or no non-null comparable value was found.
 * @throws RangeError if `buf` is too short for the values that must be read
 *   (raised by the underlying `DataView` accessors).
 */
function computePageStats(
  buf: ArrayBuffer,
  dtype: string,
  rowCount: number,
  nullCount: number,
  dataOffsetInPage?: number,
): { min: number | bigint; max: number | bigint } | null {
  const hasNulls = nullCount > 0 && rowCount > 0;
  const positional = dataOffsetInPage !== undefined;
  const numValues = positional ? rowCount : rowCount - nullCount;
  if (numValues <= 0) return null;

  const bitmapBytes = Math.ceil(rowCount / 8);

  // Values follow the null bitmap only when the page actually has nulls.
  const dataStart = hasNulls ? (dataOffsetInPage ?? bitmapBytes) : 0;
  if (dataStart < 0 || dataStart > buf.byteLength) return null;

  // View into the page without copying it.
  const view = new DataView(buf, dataStart);

  // The bitmap indexes row positions, so it only applies to the positional layout.
  const validity =
    hasNulls && positional && buf.byteLength >= bitmapBytes
      ? new DataView(buf, 0, bitmapBytes)
      : null;
  const isNull = (index: number): boolean =>
    validity !== null && ((validity.getUint8(index >> 3) >> (index & 7)) & 1) === 0;

  switch (dtype) {
    case "float64":
      return scanNumberRange(numValues, isNull, (i) => view.getFloat64(i * 8, true));
    case "float32":
      return scanNumberRange(numValues, isNull, (i) => view.getFloat32(i * 4, true));
    case "int64":
      return scanBigIntRange(numValues, isNull, (i) => view.getBigInt64(i * 8, true));
    case "uint64":
      return scanBigIntRange(numValues, isNull, (i) => view.getBigUint64(i * 8, true));
    case "int32":
      return scanNumberRange(numValues, isNull, (i) => view.getInt32(i * 4, true));
    case "uint32":
      return scanNumberRange(numValues, isNull, (i) => view.getUint32(i * 4, true));
    case "int16":
      return scanNumberRange(numValues, isNull, (i) => view.getInt16(i * 2, true));
    case "uint16":
      return scanNumberRange(numValues, isNull, (i) => view.getUint16(i * 2, true));
    case "int8":
      return scanNumberRange(numValues, isNull, (i) => view.getInt8(i));
    case "uint8":
      return scanNumberRange(numValues, isNull, (i) => view.getUint8(i));
    default:
      return null;
  }
}

/** Fold `count` number-valued slots into a min/max pair, skipping null slots. */
function scanNumberRange(
  count: number,
  isNull: (index: number) => boolean,
  read: (index: number) => number,
): { min: number; max: number } | null {
  let min = Infinity;
  let max = -Infinity;
  for (let i = 0; i < count; i++) {
    if (isNull(i)) continue;
    const v = read(i);
    if (v < min) min = v;
    if (v > max) max = v;
  }
  // min > max means nothing comparable was seen (all null and/or NaN).
  return min <= max ? { min, max } : null;
}

/** Fold `count` bigint-valued slots into a min/max pair, skipping null slots. */
function scanBigIntRange(
  count: number,
  isNull: (index: number) => boolean,
  read: (index: number) => bigint,
): { min: bigint; max: bigint } | null {
  let min: bigint | undefined;
  let max: bigint | undefined;
  for (let i = 0; i < count; i++) {
    if (isNull(i)) continue;
    const v = read(i);
    if (min === undefined || v < min) min = v;
    if (max === undefined || v > max) max = v;
  }
  return min !== undefined && max !== undefined ? { min, max } : null;
}
537+
394538
/**
395539
* Decode Lance v2 utf8 data from a buffer containing [offsets_array | padding | string_data].
396540
* The offsets array is (rowCount) i64 values representing cumulative end positions.

0 commit comments

Comments
 (0)