Skip to content

Commit 3477078

Browse files
committed
feat: implement lazy DataFrame with full query engine capabilities
Transform QueryMode from basic SQL into a composable lazy DataFrame API with 10 new capabilities:
- Phase 0: Distributed execution layer (WorkerPool + R2Partitioner tree fan-out)
- Phase 1: Immutable lazy DataFrame with computed columns, replacing mutable TableQuery
- Phase 2: Window functions (row_number, rank, dense_rank, lag, lead, aggregates)
- Phase 3: Set operations (union/intersect/except) and DISTINCT operator
- Phase 4: Extended aggregates (stddev, variance, median, percentile, count_distinct) + right/full/cross join types in HashJoinOperator
- Phase 5: Subqueries (filterIn) and materialized intermediates
- Phase 6: Pure TypeScript HNSW index for approximate nearest neighbor search
- Phase 7: Pluggable format readers (CSV, JSON/NDJSON, Arrow IPC) via ReaderRegistry
- Phase 8: MaterializationCache with page-level LRU and byte-budget eviction
- Phase 9: Time travel (version queries, diff, version listing)

All operators honor the pull-based pipeline contract (bounded memory, spill-to-disk). Backward compatible: TableQuery kept as type alias, .exec() as alias for .collect().
1 parent d487ecf commit 3477078

File tree

14 files changed

+4251
-109
lines changed

14 files changed

+4251
-109
lines changed

src/client.ts

Lines changed: 479 additions & 72 deletions
Large diffs are not rendered by default.

src/hnsw.ts

Lines changed: 622 additions & 0 deletions
Large diffs are not rendered by default.

src/index.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,25 @@
1-
import { TableQuery } from "./client.js";
1+
import { DataFrame, TableQuery } from "./client.js";
22
import type { QueryDescriptor, QueryExecutor } from "./client.js";
33
import type { AppendResult, ExplainResult, QueryResult, Row, QueryDORpc, MasterDORpc } from "./types.js";
44
import { LocalExecutor } from "./local-executor.js";
55

66
export { MasterDO } from "./master-do.js";
77
export { QueryDO } from "./query-do.js";
88
export { FragmentDO } from "./fragment-do.js";
9-
export { TableQuery } from "./client.js";
9+
export { WorkerPool, R2Partitioner } from "./worker-pool.js";
10+
export { WorkerDO } from "./worker-do.js";
11+
export type { R2Partition, WorkerDORpc } from "./worker-pool.js";
12+
export { ReaderRegistry, FileDataSource, UrlDataSource } from "./reader.js";
13+
export type { FormatReader, DataSource } from "./reader.js";
14+
export { DataFrame, TableQuery } from "./client.js";
15+
export { LazyResultHandle } from "./client.js";
1016
export { QueryModeError } from "./errors.js";
1117
export { LocalExecutor } from "./local-executor.js";
1218
export { bigIntReplacer } from "./decode.js";
19+
export { HnswIndex, cosineDistance, l2DistanceSq, dotDistance } from "./hnsw.js";
20+
export type { HnswOptions } from "./hnsw.js";
21+
export { MaterializationCache, queryHashKey } from "./lazy.js";
22+
export type { MaterializationCacheOptions } from "./lazy.js";
1323
export type { QueryExecutor, QueryDescriptor } from "./client.js";
1424
export type {
1525
Env,
@@ -25,6 +35,10 @@ export type {
2535
FilterOp,
2636
AggregateOp,
2737
JoinDescriptor,
38+
JoinKeys,
39+
JoinType,
40+
WindowSpec,
41+
VectorOpts,
2842
QueryResult,
2943
Row,
3044
VectorSearchParams,
@@ -33,6 +47,8 @@ export type {
3347
AppendResult,
3448
ExplainResult,
3549
VectorIndexInfo,
50+
VersionInfo,
51+
DiffResult,
3652
QueryDORpc,
3753
MasterDORpc,
3854
} from "./types.js";
@@ -81,9 +97,9 @@ export class QueryMode {
8197
return new QueryMode(executor);
8298
}
8399

84-
/** Start building a query against a table. */
85-
table(name: string): TableQuery {
86-
return new TableQuery(name, this.executor);
100+
/** Start building a query against a table. Returns a lazy DataFrame. */
101+
table(name: string): DataFrame {
102+
return new DataFrame(name, this.executor);
87103
}
88104

89105
/**

src/lazy.ts

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
import type { Row } from "./types.js";
2+
import type { QueryDescriptor } from "./client.js";
3+
4+
/** Options accepted by the MaterializationCache constructor. */
export interface MaterializationCacheOptions {
5+
/**
 * Byte budget for the cache. When omitted the cache falls back to
 * 64MB (64 * 1024 * 1024 bytes).
 */
6+
maxBytes?: number;
7+
}
8+
9+
/** One cached page of rows together with its bookkeeping data. */
interface CacheEntry {
10+
// The rows stored for this page, exactly as handed to the cache.
rows: Row[];
11+
// Estimated footprint of `rows`, charged against the cache's byte budget.
byteSize: number;
12+
// Recency marker for LRU eviction: the entry with the smallest value is evicted first.
lastAccess: number;
13+
}
14+
15+
/** Byte budget the cache uses when no `maxBytes` option is supplied. */
const DEFAULT_MAX_BYTES = 64 * 1024 * 1024; // 64MB
16+
17+
/**
 * Rough in-memory footprint of one Row, in bytes.
 *
 * Every row is charged a flat 64 bytes, every key 8 bytes, and each value
 * by type: number/bigint 8, boolean 4, string 2 bytes per UTF-16 code unit,
 * Float32Array its `byteLength`. `null`, `undefined` and any other value
 * contribute nothing beyond the per-key charge.
 */
function estimateRowBytes(row: Row): number {
  const ROW_OVERHEAD = 64;
  const KEY_OVERHEAD = 8;

  let total = ROW_OVERHEAD;
  for (const column in row) {
    const value = row[column];
    total += KEY_OVERHEAD;

    switch (typeof value) {
      case "number":
      case "bigint":
        total += 8;
        break;
      case "boolean":
        total += 4;
        break;
      case "string":
        total += value.length * 2;
        break;
      case "object":
        // `null` also lands here and, like any non-Float32Array object, adds nothing.
        if (value instanceof Float32Array) {
          total += value.byteLength;
        }
        break;
      default:
        break;
    }
  }
  return total;
}
42+
43+
/**
 * Sum of the per-row size estimates for a page; an empty page costs 0.
 */
function estimatePageBytes(rows: Row[]): number {
  return rows.reduce((sum, row) => sum + estimateRowBytes(row), 0);
}
53+
54+
/**
 * Page-level LRU cache for materialized query results.
 *
 * Pages are keyed by query hash + page offset. When storing a page would
 * push the estimated footprint past the byte budget, the least-recently-used
 * pages are evicted until it fits.
 *
 * Recency is tracked by the Map's insertion order (a hit re-inserts the
 * entry at the end) together with a logical tick, not by wall-clock time.
 * Clocks can be coarse or frozen during synchronous execution in some
 * runtimes, which makes timestamps tie and would silently turn a
 * timestamp-based LRU into FIFO.
 */
export class MaterializationCache {
  private readonly maxBytes: number;
  /** Ordered least-recently-used first; every write or hit moves the key to the end. */
  private readonly entries: Map<string, CacheEntry> = new Map();
  private _bytesUsed = 0;
  private _hits = 0;
  private _misses = 0;
  /** Logical clock; strictly increases on every insert and every hit. */
  private _tick = 0;

  /**
   * @param opts Optional settings; `maxBytes` defaults to DEFAULT_MAX_BYTES.
   */
  constructor(opts?: MaterializationCacheOptions) {
    this.maxBytes = opts?.maxBytes ?? DEFAULT_MAX_BYTES;
  }

  /**
   * Store a page of rows under a query hash + offset.
   *
   * Any page already stored under the same key is dropped first. A page
   * whose estimated size alone exceeds the budget is not cached at all.
   */
  set(queryHash: string, offset: number, rows: Row[]): void {
    const key = `${queryHash}:${offset}`;

    // Drop the previous page for this key so size accounting stays exact.
    const existing = this.entries.get(key);
    if (existing) {
      this._bytesUsed -= existing.byteSize;
      this.entries.delete(key);
    }

    const byteSize = estimatePageBytes(rows);

    // A page that can never fit is simply not cached.
    if (byteSize > this.maxBytes) return;

    // Evict least-recently-used pages until there is room.
    while (this._bytesUsed + byteSize > this.maxBytes && this.entries.size > 0) {
      this.evictLRU();
    }

    // Inserting appends to the Map's iteration order, i.e. most recently used.
    this.entries.set(key, {
      rows,
      byteSize,
      lastAccess: ++this._tick,
    });
    this._bytesUsed += byteSize;
  }

  /**
   * Retrieve a cached page. Returns null on miss.
   *
   * A get for offset=100, limit=50 succeeds if there is a cached page
   * starting at offset=100 that contains at least 50 rows. When the page
   * holds exactly `limit` rows the stored array itself is returned;
   * otherwise a copy of the first `limit` rows is returned.
   */
  get(queryHash: string, offset: number, limit: number): Row[] | null {
    const key = `${queryHash}:${offset}`;
    const entry = this.entries.get(key);
    if (!entry || entry.rows.length < limit) {
      this._misses++;
      return null;
    }

    // Mark as most recently used: bump the tick and move to the end of the Map.
    entry.lastAccess = ++this._tick;
    this.entries.delete(key);
    this.entries.set(key, entry);

    this._hits++;
    // Return exactly `limit` rows if the cached page has more.
    return entry.rows.length === limit ? entry.rows : entry.rows.slice(0, limit);
  }

  /** Invalidate all cached pages for a query. */
  invalidate(queryHash: string): void {
    const prefix = `${queryHash}:`;
    // Deleting the current key while iterating a Map is safe.
    for (const [key, entry] of this.entries) {
      if (key.startsWith(prefix)) {
        this._bytesUsed -= entry.byteSize;
        this.entries.delete(key);
      }
    }
  }

  /** Clear the entire cache. Hit/miss counters are left untouched. */
  clear(): void {
    this.entries.clear();
    this._bytesUsed = 0;
  }

  /** Cache statistics. */
  get stats(): { hits: number; misses: number; bytesUsed: number; entries: number } {
    return {
      hits: this._hits,
      misses: this._misses,
      bytesUsed: this._bytesUsed,
      entries: this.entries.size,
    };
  }

  /** Evict the least-recently-used entry, which is the first one in Map order. */
  private evictLRU(): void {
    const oldest = this.entries.entries().next();
    if (oldest.done) return;
    const [key, entry] = oldest.value;
    this._bytesUsed -= entry.byteSize;
    this.entries.delete(key);
  }
}
163+
164+
/**
 * Build a deterministic cache key from a QueryDescriptor, excluding
 * offset and limit (those select a page, not the query identity).
 *
 * The key is a `|`-joined list of tagged parts: table, filters (`f:`),
 * projections (`p:`), sort (`s:`), aggregates (`a:`), group-by (`g:`),
 * vector search (`v:`) and join (`j:`, which embeds the key of the right
 * side). Filters, projections and group-by columns are order-independent.
 *
 * NOTE(review): only the descriptor fields read below take part in the key.
 * If QueryDescriptor carries other result-affecting fields, queries that
 * differ only in those fields will share a key — confirm against the type.
 *
 * @param desc The query to fingerprint.
 * @returns A string that is equal for descriptors this function considers equivalent.
 */
export function queryHashKey(desc: QueryDescriptor): string {
  const parts: string[] = [desc.table];

  // Filters — render first, then sort the rendered parts. Sorting on the
  // whole part (not just the column) keeps the key stable when several
  // filters target the same column, and the default code-unit sort does not
  // depend on the runtime's locale.
  if (desc.filters.length > 0) {
    const filterParts = desc.filters.map(
      (f) => `f:${f.column}:${f.op}:${stringifyValue(f.value)}`,
    );
    filterParts.sort();
    parts.push(...filterParts);
  }

  // Projections — sorted
  if (desc.projections.length > 0) {
    parts.push(`p:${[...desc.projections].sort().join(",")}`);
  }

  // Sort
  if (desc.sortColumn) {
    parts.push(`s:${desc.sortColumn}:${desc.sortDirection ?? "asc"}`);
  }

  // Aggregates — kept in declaration order
  if (desc.aggregates && desc.aggregates.length > 0) {
    for (const agg of desc.aggregates) {
      parts.push(`a:${agg.fn}:${agg.column}${agg.alias ? `:${agg.alias}` : ""}`);
    }
  }

  // Group by — sorted
  if (desc.groupBy && desc.groupBy.length > 0) {
    parts.push(`g:${[...desc.groupBy].sort().join(",")}`);
  }

  // Vector search
  if (desc.vectorSearch) {
    const vs = desc.vectorSearch;
    parts.push(`v:${vs.column}:${vs.topK}:${Array.from(vs.queryVector).join(",")}`);
  }

  // Join — the right-hand side contributes its own key recursively
  if (desc.join) {
    const j = desc.join;
    parts.push(`j:${j.leftKey}:${j.rightKey}:${j.type ?? "inner"}:${queryHashKey(j.right as QueryDescriptor)}`);
  }

  return parts.join("|");
}
215+
216+
/**
 * Render a filter value as an unambiguous string for use inside a cache key.
 *
 * Distinct values must not render identically, otherwise two different
 * queries would share cached pages:
 * - strings are JSON-quoted, so `"1"` differs from `1`, `"null"` from
 *   `null`, and `["a,b"]` from `["a","b"]`;
 * - bigints get an `n` suffix, so `10n` differs from `10`;
 * - arrays are rendered element by element inside `[...]`;
 * - plain objects are rendered with sorted keys inside `{...}` instead of
 *   collapsing to `[object Object]`.
 * Everything else falls back to `String(value)`.
 */
function stringifyValue(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map((item) => stringifyValue(item)).join(",")}]`;
  }
  if (typeof value === "string") {
    return JSON.stringify(value);
  }
  if (typeof value === "bigint") {
    return `${value}n`;
  }
  if (value !== null && typeof value === "object") {
    // Only plain objects are expanded; class instances (Date, typed arrays, ...)
    // keep their own String() form.
    const proto: unknown = Object.getPrototypeOf(value);
    if (proto === Object.prototype || proto === null) {
      const record = value as Record<string, unknown>;
      const fields = Object.keys(record)
        .sort()
        .map((k) => `${JSON.stringify(k)}:${stringifyValue(record[k])}`);
      return `{${fields.join(",")}}`;
    }
  }
  return String(value);
}

0 commit comments

Comments
 (0)