Skip to content

Commit d487ecf

Browse files
committed
fix: close 5 unbounded memory violations in operator pipeline
- AggregateOperator: incrementally merge partials instead of accumulating all in an array — O(groups) memory instead of O(batches × groups)
- mergeQueryResults: use k-way merge for sorted results without limit, limit-aware concat with early termination for bounded results
- LocalExecutor: always pass FsSpillBackend + memory budget to pipeline options and HashJoinOperator so spill is available for joins
- ProjectOperator: create new projected row objects instead of deleting keys (avoids V8 hidden class deoptimization)
- HashJoinOperator: sample first batch to estimate row size and go straight to partitioned spill if right side likely exceeds budget
1 parent 0d940c5 commit d487ecf

File tree

3 files changed

+137
-65
lines changed

3 files changed

+137
-65
lines changed

src/local-executor.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { instantiateWasm, type WasmEngine } from "./wasm-engine.js";
1616
import { VipCache } from "./vip-cache.js";
1717
import { QueryModeError } from "./errors.js";
1818
import { parseLanceV2Columns, lanceV2ToColumnMeta, computeLanceV2Stats } from "./lance-v2.js";
19-
import { buildPipeline, drainPipeline, type FragmentSource, type PipelineOptions } from "./operators.js";
19+
import { buildPipeline, drainPipeline, DEFAULT_MEMORY_BUDGET, type FragmentSource, type PipelineOptions } from "./operators.js";
2020

2121
/**
2222
* Executor for local mode (Node/Bun).
@@ -426,8 +426,11 @@ export class LocalExecutor implements QueryExecutor {
426426
// Step 3: Build streaming pipeline
427427
const wasm = await this.getWasm();
428428
const fragment = await this.makeFragmentSource(query.table, projectedColumns, isUrl);
429-
const pipeOpts: PipelineOptions | undefined = this.memoryBudgetBytes
430-
? { memoryBudgetBytes: this.memoryBudgetBytes } : undefined;
429+
const { FsSpillBackend } = await import("./operators.js");
430+
const pipeOpts: PipelineOptions = {
431+
memoryBudgetBytes: this.memoryBudgetBytes ?? DEFAULT_MEMORY_BUDGET,
432+
spill: new FsSpillBackend(),
433+
};
431434

432435
// If join is specified, build left + right pipelines and combine with HashJoinOperator
433436
if (query.join) {
@@ -581,13 +584,16 @@ export class LocalExecutor implements QueryExecutor {
581584
}
582585

583586
// Hash join: right is build side, left is probe side
584-
const { HashJoinOperator } = await import("./operators.js");
587+
const { HashJoinOperator, FsSpillBackend: FsSpillJoin } = await import("./operators.js");
588+
const joinSpill = pipeOpts?.spill ?? new FsSpillJoin();
589+
const joinBudget = pipeOpts?.memoryBudgetBytes ?? DEFAULT_MEMORY_BUDGET;
585590
let pipeline: import("./operators.js").Operator = new HashJoinOperator(
586591
leftPipeline, rightPipeline, join.leftKey, join.rightKey, join.type ?? "inner",
592+
joinBudget, joinSpill,
587593
);
588594

589595
// Apply post-join sort/limit/aggregates
590-
const { buildPipeline: _, TopKOperator, InMemorySortOperator, LimitOperator, ProjectOperator, AggregateOperator, ExternalSortOperator, DEFAULT_MEMORY_BUDGET } = await import("./operators.js");
596+
const { buildPipeline: _, TopKOperator, InMemorySortOperator, LimitOperator, ProjectOperator, AggregateOperator, ExternalSortOperator } = await import("./operators.js");
591597
const hasAgg = query.aggregates && query.aggregates.length > 0;
592598

593599
if (hasAgg) {
@@ -780,8 +786,11 @@ export class LocalExecutor implements QueryExecutor {
780786
? query.projections
781787
: (dataset.fragmentMetas.values().next().value?.columns.map((c: ColumnMeta) => c.name) ?? []);
782788

783-
const pipeOpts: PipelineOptions | undefined = this.memoryBudgetBytes
784-
? { memoryBudgetBytes: this.memoryBudgetBytes } : undefined;
789+
const { FsSpillBackend: FsSpill } = await import("./operators.js");
790+
const pipeOpts: PipelineOptions = {
791+
memoryBudgetBytes: this.memoryBudgetBytes ?? DEFAULT_MEMORY_BUDGET,
792+
spill: new FsSpill(),
793+
};
785794
const { pipeline, scan, wasmAgg } = buildPipeline(fragments, query, wasm, outputColumns, pipeOpts);
786795
const rows = await drainPipeline(pipeline);
787796

src/merge.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,13 @@ export function mergeQueryResults(
140140
};
141141
}
142142

143-
// Sort + limit path: k-way merge
144-
if (query.sortColumn && query.limit) {
143+
// Sort path: k-way merge (works with or without limit)
144+
if (query.sortColumn) {
145145
const rows = kWayMerge(
146146
partials.map((p) => p.rows),
147147
query.sortColumn,
148148
query.sortDirection ?? "asc",
149-
query.limit,
149+
query.limit ?? Infinity,
150150
);
151151
return {
152152
rows,
@@ -158,10 +158,19 @@ export function mergeQueryResults(
158158
};
159159
}
160160

161-
// Default: concat and apply limit
162-
let rows = partials.flatMap((p) => p.rows);
163-
if (query.limit && rows.length > query.limit) {
164-
rows = rows.slice(0, query.limit);
161+
// Unsorted: limit-aware concat with early termination
162+
let rows: Row[];
163+
if (query.limit) {
164+
rows = [];
165+
for (const p of partials) {
166+
for (const row of p.rows) {
167+
rows.push(row);
168+
if (rows.length >= query.limit) break;
169+
}
170+
if (rows.length >= query.limit) break;
171+
}
172+
} else {
173+
rows = partials.flatMap((p) => p.rows);
165174
}
166175

167176
return {

src/operators.ts

Lines changed: 105 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,12 @@ export class ProjectOperator implements Operator {
267267
const batch = await this.upstream.next();
268268
if (!batch) return null;
269269

270-
for (const row of batch) {
271-
for (const key of Object.keys(row)) {
272-
if (!this.keep.has(key)) delete row[key];
273-
}
274-
}
275-
return batch;
270+
// Create new projected row objects — no delete, no V8 hidden class deopt
271+
return batch.map(row => {
272+
const out: Row = {};
273+
for (const k of this.keep) if (k in row) out[k] = row[k];
274+
return out;
275+
});
276276
}
277277

278278
async close(): Promise<void> {
@@ -298,19 +298,18 @@ export class AggregateOperator implements Operator {
298298
if (this.consumed) return null;
299299
this.consumed = true;
300300

301-
// Stream all batches through partial aggregation
302-
const partials: PartialAgg[] = [];
301+
// Incrementally merge partials — O(groups) memory, not O(batches × groups)
302+
let merged: PartialAgg | null = null;
303303
while (true) {
304304
const batch = await this.upstream.next();
305305
if (!batch) break;
306-
partials.push(computePartialAgg(batch, this.query));
306+
const partial = computePartialAgg(batch, this.query);
307+
merged = merged ? mergePartialAggs([merged, partial]) : partial;
307308
}
308309

309-
if (partials.length === 0) {
310+
if (!merged) {
310311
return finalizePartialAgg({ states: [] }, this.query);
311312
}
312-
313-
const merged = partials.length === 1 ? partials[0] : mergePartialAggs(partials);
314313
return finalizePartialAgg(merged, this.query);
315314
}
316315

@@ -673,89 +672,144 @@ export class HashJoinOperator implements Operator {
673672
private async buildOrPartition(): Promise<void> {
674673
if (this.hashMap || this.partitionCount > 0) return;
675674

676-
const inMemoryRows: Row[] = [];
677-
let exceeds = false;
675+
// Sample first batch to estimate whether right side fits in memory.
676+
// If estimated total exceeds budget, go straight to partitioned path
677+
// instead of accumulating rows we'll have to re-spill.
678+
const firstBatch = await this.right.next();
679+
if (!firstBatch) {
680+
// Empty right side — build empty hash map
681+
this.hashMap = new Map<string, Row[]>();
682+
return;
683+
}
678684

679-
// Consume right side, tracking memory
680-
while (true) {
681-
const batch = await this.right.next();
682-
if (!batch) break;
683-
for (const row of batch) {
684-
const rowSize = estimateRowSize(row);
685-
this.buildSizeBytes += rowSize;
686-
inMemoryRows.push(row);
685+
let batchSizeBytes = 0;
686+
for (const row of firstBatch) batchSizeBytes += estimateRowSize(row);
687+
const avgRowSize = batchSizeBytes / firstBatch.length;
687688

688-
if (this.spill && this.buildSizeBytes > this.memoryBudget) {
689-
exceeds = true;
690-
break;
689+
// Heuristic: if first batch already exceeds half the budget, go partitioned
690+
const goPartitioned = this.spill && batchSizeBytes > this.memoryBudget / 2;
691+
692+
if (!goPartitioned) {
693+
// Optimistic in-memory path: consume right side, tracking memory
694+
const inMemoryRows: Row[] = [...firstBatch];
695+
this.buildSizeBytes = batchSizeBytes;
696+
let exceeds = false;
697+
698+
while (true) {
699+
const batch = await this.right.next();
700+
if (!batch) break;
701+
for (const row of batch) {
702+
const rowSize = estimateRowSize(row);
703+
this.buildSizeBytes += rowSize;
704+
inMemoryRows.push(row);
705+
706+
if (this.spill && this.buildSizeBytes > this.memoryBudget) {
707+
exceeds = true;
708+
break;
709+
}
691710
}
711+
if (exceeds) break;
692712
}
693-
if (exceeds) break;
694-
}
695713

696-
if (!exceeds) {
697-
// Fits in memory — build hash map directly
698-
this.hashMap = new Map<string, Row[]>();
699-
for (const row of inMemoryRows) {
700-
const key = this.toJoinKey(row[this.rightKey]);
701-
const bucket = this.hashMap.get(key);
702-
if (bucket) bucket.push(row);
703-
else this.hashMap.set(key, [row]);
714+
if (!exceeds) {
715+
// Fits in memory — build hash map directly
716+
this.hashMap = new Map<string, Row[]>();
717+
for (const row of inMemoryRows) {
718+
const key = this.toJoinKey(row[this.rightKey]);
719+
const bucket = this.hashMap.get(key);
720+
if (bucket) bucket.push(row);
721+
else this.hashMap.set(key, [row]);
722+
}
723+
return;
704724
}
725+
726+
// Fell through: exceeded budget mid-stream, partition what we have
727+
this.buildExceeded = true;
728+
this.partitionCount = Math.max(4, Math.ceil(this.buildSizeBytes / (this.memoryBudget / 2)));
729+
await this.partitionRightRows(inMemoryRows);
730+
inMemoryRows.length = 0;
731+
732+
// Continue consuming remaining right-side rows
733+
await this.consumeRemainingRight();
705734
return;
706735
}
707736

708-
// Switch to Grace hash join: partition both sides with bounded memory.
709-
// Use partitionCount that ensures each partition fits in memory budget.
737+
// Proactive partition path: first batch signals right side is large
710738
this.buildExceeded = true;
711-
this.partitionCount = Math.max(4, Math.ceil(this.buildSizeBytes / (this.memoryBudget / 2)));
712-
const bucketBudget = Math.floor(this.memoryBudget / this.partitionCount);
739+
this.buildSizeBytes = batchSizeBytes;
740+
// Estimate partition count from first batch size × expected batch count
741+
// Use conservative estimate: assume at least 4× more data coming
742+
const estimatedTotal = batchSizeBytes * 4;
743+
this.partitionCount = Math.max(4, Math.ceil(estimatedTotal / (this.memoryBudget / 2)));
744+
745+
await this.partitionRightRows(firstBatch);
746+
await this.consumeRemainingRight();
747+
}
713748

714-
// Spill right-side rows we already consumed, flushing per-bucket when full
749+
/** Partition an array of right-side rows into spill buckets. */
750+
private async partitionRightRows(rows: Row[]): Promise<void> {
751+
const bucketBudget = Math.floor(this.memoryBudget / this.partitionCount);
715752
const rightBuckets: Row[][] = Array.from({ length: this.partitionCount }, () => []);
716753
const rightBucketBytes: number[] = new Array(this.partitionCount).fill(0);
717-
this.rightPartitionIds = new Array(this.partitionCount).fill("");
754+
if (!this.rightPartitionIds.length) {
755+
this.rightPartitionIds = new Array(this.partitionCount).fill("");
756+
}
718757

719-
const flushRightBucket = async (bi: number): Promise<void> => {
758+
const flushBucket = async (bi: number): Promise<void> => {
720759
if (rightBuckets[bi].length === 0) return;
721760
const spillId = await this.spill!.writeRun(rightBuckets[bi]);
722-
// Append to existing partition by tracking multiple spill IDs per partition
723761
this.rightPartitionIds[bi] = this.rightPartitionIds[bi]
724762
? this.rightPartitionIds[bi] + "|" + spillId
725763
: spillId;
726764
rightBuckets[bi] = [];
727765
rightBucketBytes[bi] = 0;
728766
};
729767

730-
for (const row of inMemoryRows) {
768+
for (const row of rows) {
731769
const bi = this.hashPartition(row[this.rightKey], this.partitionCount);
732770
const rowSize = estimateRowSize(row);
733771
if (rightBucketBytes[bi] + rowSize > bucketBudget && rightBuckets[bi].length > 0) {
734-
await flushRightBucket(bi);
772+
await flushBucket(bi);
735773
}
736774
rightBuckets[bi].push(row);
737775
rightBucketBytes[bi] += rowSize;
738776
}
739-
// Free inMemoryRows — no longer needed
740-
inMemoryRows.length = 0;
741777

742-
// Continue consuming remaining right-side rows
778+
for (let bi = 0; bi < this.partitionCount; bi++) await flushBucket(bi);
779+
}
780+
781+
/** Consume remaining right-side batches into partitions. */
782+
private async consumeRemainingRight(): Promise<void> {
783+
const bucketBudget = Math.floor(this.memoryBudget / this.partitionCount);
784+
const rightBuckets: Row[][] = Array.from({ length: this.partitionCount }, () => []);
785+
const rightBucketBytes: number[] = new Array(this.partitionCount).fill(0);
786+
787+
const flushBucket = async (bi: number): Promise<void> => {
788+
if (rightBuckets[bi].length === 0) return;
789+
const spillId = await this.spill!.writeRun(rightBuckets[bi]);
790+
this.rightPartitionIds[bi] = this.rightPartitionIds[bi]
791+
? this.rightPartitionIds[bi] + "|" + spillId
792+
: spillId;
793+
rightBuckets[bi] = [];
794+
rightBucketBytes[bi] = 0;
795+
};
796+
743797
while (true) {
744798
const batch = await this.right.next();
745799
if (!batch) break;
746800
for (const row of batch) {
747801
const bi = this.hashPartition(row[this.rightKey], this.partitionCount);
748802
const rowSize = estimateRowSize(row);
803+
this.buildSizeBytes += rowSize;
749804
if (rightBucketBytes[bi] + rowSize > bucketBudget && rightBuckets[bi].length > 0) {
750-
await flushRightBucket(bi);
805+
await flushBucket(bi);
751806
}
752807
rightBuckets[bi].push(row);
753808
rightBucketBytes[bi] += rowSize;
754809
}
755810
}
756811

757-
// Flush remaining right buckets
758-
for (let bi = 0; bi < this.partitionCount; bi++) await flushRightBucket(bi);
812+
for (let bi = 0; bi < this.partitionCount; bi++) await flushBucket(bi);
759813

760814
// Consume and partition left side with same bounded approach
761815
const leftBuckets: Row[][] = Array.from({ length: this.partitionCount }, () => []);

0 commit comments

Comments (0)