Skip to content

Commit 5489350

Browse files
committed
perf: cached identity indices, eliminate .map().join() in GROUP BY/DISTINCT keys; fix 5 doc issues
Performance: - Identity Uint32Array: cache by size instead of allocating per batch per operator (7 sites) - GROUP BY key: replace .map().join() with direct concatenation (partial-agg.ts, 2 paths) - DISTINCT key: same .map().join() → direct concatenation (operators.ts, 2 paths) Docs: - README: fix Fragment DO pool max (20 → 100), test count (112 → 460+) - formats.mdx: add missing await on fromCSV - composability.mdx: add nextColumnar to Operator interface
1 parent 40797e7 commit 5489350

5 files changed

Lines changed: 59 additions & 23 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ npx tsx examples/nextjs-api-route.ts
251251
- **IVF-PQ vector search** — index-aware routing in Query DO, falls back to flat SIMD search when no index present
252252
- **Multi-format support** — Lance, Parquet, and Iceberg tables
253253
- **Local mode** — same API reads Lance/Parquet files from disk or HTTP (Node/Bun)
254-
- **Fragment DO pool** — fan-out parallel scanning for multi-fragment datasets (max 20 slots per datacenter)
255-
- **112 unit tests + 26 conformance tests** — unit tests cover footer parsing, column decoding, Parquet/Thrift, merging, aggregates, VIP cache, WASM integration; conformance tests validate every operator against DuckDB at 1M-5M row scale
254+
- **Fragment DO pool** — fan-out parallel scanning for multi-fragment datasets (max 100 slots per datacenter)
255+
- **460+ tests** — unit tests cover footer parsing, column decoding, Parquet/Thrift, merging, aggregates, VIP cache, WASM integration, SQL, partition catalog, materialized executor; 110+ conformance tests validate every operator against DuckDB at 1M-5M row scale
256256
- **CI benchmarks** — head-to-head QueryMode (Miniflare) vs DuckDB (native) on every push, results posted to [GitHub Actions summary](https://github.com/teamchong/querymode/actions/workflows/ci.yml)
257257

258258
## What doesn't exist yet

docs/src/content/docs/composability.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Every operator in QueryMode implements one interface:
1010
```typescript
1111
interface Operator {
1212
next(): Promise<RowBatch | null>
13+
nextColumnar?(): Promise<ColumnarBatch | null> // optional columnar mode
1314
close(): Promise<void>
1415
}
1516
```

docs/src/content/docs/formats.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ In-memory materialization for small datasets:
8181

8282
```typescript
8383
// From CSV string
84-
const qm = QueryMode.fromCSV(csvString, "my_table")
84+
const qm = await QueryMode.fromCSV(csvString, "my_table")
8585

8686
// From JSON array
8787
const qm = QueryMode.fromJSON(jsonArray, "my_table")

src/operators.ts

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,19 @@ import { decodeParquetColumnChunk } from "./parquet-decode.js";
1515

1616
const _textEncoder = new TextEncoder();
1717

18+
/** Cached identity index arrays to avoid repeated allocations on hot paths. */
19+
const _identityCache = new Map<number, Uint32Array>();
20+
function identityIndices(n: number): Uint32Array {
21+
let cached = _identityCache.get(n);
22+
if (cached) return cached;
23+
cached = new Uint32Array(n);
24+
for (let i = 0; i < n; i++) cached[i] = i;
25+
// Keep cache bounded — only cache common page sizes
26+
if (_identityCache.size > 16) _identityCache.clear();
27+
_identityCache.set(n, cached);
28+
return cached;
29+
}
30+
1831
export type DecodedValue = number | bigint | string | boolean | Float32Array | null;
1932
import {
2033
computePartialAgg,
@@ -54,7 +67,7 @@ export interface Operator {
5467
/** Materialize a ColumnarBatch into Row[] — used at pipeline boundaries. */
5568
export function materializeRows(batch: ColumnarBatch): Row[] {
5669
const rows: Row[] = [];
57-
const indices = batch.selection ?? Uint32Array.from({ length: batch.rowCount }, (_, i) => i);
70+
const indices = batch.selection ?? identityIndices(batch.rowCount);
5871
for (const idx of indices) {
5972
const row: Row = {};
6073
for (const [name, vals] of batch.columns) {
@@ -399,7 +412,7 @@ function scanFilterIndices(
399412
indices = indices ? wasmIntersect(indices, orResult, wasm) : orResult;
400413
}
401414

402-
return indices ?? Uint32Array.from({ length: rowCount }, (_, i) => i);
415+
return indices ?? identityIndices(rowCount);
403416
}
404417

405418
/** Apply AND-connected filters, returning matching row indices. */
@@ -467,7 +480,7 @@ function applyAndFilters(
467480
}
468481

469482
// JS fallback: evaluate filter on current index set
470-
const src = indices ?? Uint32Array.from({ length: rowCount }, (_, i) => i);
483+
const src = indices ?? identityIndices(rowCount);
471484
const kept: number[] = [];
472485
for (const idx of src) {
473486
const v = values[idx];
@@ -481,7 +494,7 @@ function applyAndFilters(
481494
if (indices.length === 0) return indices;
482495
}
483496

484-
return indices ?? Uint32Array.from({ length: rowCount }, (_, i) => i);
497+
return indices ?? identityIndices(rowCount);
485498
}
486499

487500
/** Run WASM SIMD filter on decoded numeric values (f64, i32, i64). */
@@ -1112,15 +1125,17 @@ export class DistinctOperator implements Operator {
11121125
const batch = await this.upstream.nextColumnar();
11131126
if (!batch) return null;
11141127

1115-
const indices = batch.selection ?? Uint32Array.from({ length: batch.rowCount }, (_, i) => i);
1128+
const indices = batch.selection ?? identityIndices(batch.rowCount);
11161129
const kept: number[] = [];
11171130
const cols = this.columns.length > 0 ? this.columns : Array.from(batch.columns.keys());
11181131

11191132
for (const idx of indices) {
1120-
const key = cols.map(c => {
1121-
const vals = batch.columns.get(c);
1122-
return String(vals ? (vals[idx] ?? "") : "");
1123-
}).join("\x00");
1133+
let key = "";
1134+
for (let g = 0; g < cols.length; g++) {
1135+
if (g > 0) key += "\x00";
1136+
const vals = batch.columns.get(cols[g]);
1137+
key += String(vals ? (vals[idx] ?? "") : "");
1138+
}
11241139
if (!this.seen.has(key)) {
11251140
this.seen.add(key);
11261141
kept.push(idx);
@@ -1139,9 +1154,12 @@ export class DistinctOperator implements Operator {
11391154

11401155
const unique: Row[] = [];
11411156
for (const row of batch) {
1142-
const key = this.columns.length > 0
1143-
? this.columns.map(c => String(row[c] ?? "")).join("\x00")
1144-
: Object.keys(row).map(k => String(row[k] ?? "")).join("\x00");
1157+
let key = "";
1158+
const keyCols = this.columns.length > 0 ? this.columns : Object.keys(row);
1159+
for (let g = 0; g < keyCols.length; g++) {
1160+
if (g > 0) key += "\x00";
1161+
key += String(row[keyCols[g]] ?? "");
1162+
}
11451163
if (!this.seen.has(key)) {
11461164
this.seen.add(key);
11471165
unique.push(row);
@@ -1354,7 +1372,7 @@ export class FilterOperator implements Operator {
13541372
if (!batch) return null;
13551373

13561374
// Apply filters on the columnar batch — narrow the selection vector
1357-
const srcIndices = batch.selection ?? Uint32Array.from({ length: batch.rowCount }, (_, i) => i);
1375+
const srcIndices = batch.selection ?? identityIndices(batch.rowCount);
13581376
const kept: number[] = [];
13591377

13601378
for (const idx of srcIndices) {
@@ -1681,7 +1699,7 @@ export class TopKOperator implements Operator {
16811699
while (true) {
16821700
const batch = await this.upstream.nextColumnar();
16831701
if (!batch) break;
1684-
const indices = batch.selection ?? Uint32Array.from({ length: batch.rowCount }, (_, i) => i);
1702+
const indices = batch.selection ?? identityIndices(batch.rowCount);
16851703
const colNames = Array.from(batch.columns.keys());
16861704
const sortVals = batch.columns.get(col);
16871705
for (const idx of indices) {

src/partial-agg.ts

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@ import type { Row } from "./types.js";
22
import type { QueryDescriptor } from "./client.js";
33
import type { ColumnarBatch, DecodedValue } from "./operators.js";
44

5+
const _identityCacheLocal = new Map<number, Uint32Array>();
6+
function identityIndicesLocal(n: number): Uint32Array {
7+
let cached = _identityCacheLocal.get(n);
8+
if (cached) return cached;
9+
cached = new Uint32Array(n);
10+
for (let i = 0; i < n; i++) cached[i] = i;
11+
if (_identityCacheLocal.size > 16) _identityCacheLocal.clear();
12+
_identityCacheLocal.set(n, cached);
13+
return cached;
14+
}
15+
516
export interface PartialAgg {
617
states: PartialAggState[];
718
groups?: Map<string, PartialAggState[]>;
@@ -143,7 +154,11 @@ export function computePartialAgg(
143154
const groupCols = query.groupBy;
144155

145156
for (const row of rows) {
146-
const key = groupCols.map((c) => String(row[c] ?? "")).join("\x00");
157+
let key = "";
158+
for (let g = 0; g < groupCols.length; g++) {
159+
if (g > 0) key += "\x00";
160+
key += String(row[groupCols[g]] ?? "");
161+
}
147162
let states = groups.get(key);
148163
if (!states) {
149164
states = aggregates.map((agg) =>
@@ -176,7 +191,7 @@ export function computePartialAggColumnar(
176191
query: QueryDescriptor,
177192
): PartialAgg {
178193
const aggregates = query.aggregates ?? [];
179-
const indices = batch.selection ?? Uint32Array.from({ length: batch.rowCount }, (_, i) => i);
194+
const indices = batch.selection ?? identityIndicesLocal(batch.rowCount);
180195

181196
if (!query.groupBy || query.groupBy.length === 0) {
182197
const states = aggregates.map((agg) =>
@@ -207,10 +222,12 @@ export function computePartialAggColumnar(
207222
const groupCols = query.groupBy;
208223

209224
for (const idx of indices) {
210-
const key = groupCols.map((c) => {
211-
const vals = batch.columns.get(c);
212-
return String(vals ? (vals[idx] ?? "") : "");
213-
}).join("\x00");
225+
let key = "";
226+
for (let g = 0; g < groupCols.length; g++) {
227+
if (g > 0) key += "\x00";
228+
const vals = batch.columns.get(groupCols[g]);
229+
key += String(vals ? (vals[idx] ?? "") : "");
230+
}
214231
let states = groups.get(key);
215232
if (!states) {
216233
states = aggregates.map((agg) =>

0 commit comments

Comments
 (0)