Skip to content

Commit 5103d82

Browse files
committed
fix: deep-clone partial-agg states in mergePartialAggs, in-place merge in AggregateOperator
- mergePartialAggs shallow-copied states with {...s}, but values[] and distinctSet were shared references — mergeStates mutated the originals - Added cloneState() helper that deep-copies values/distinctSet - AggregateOperator now merges in-place (owns accumulated state) instead of calling mergePartialAggs per batch, avoiding O(n) Set clone per iteration - TopKOperator: extract column arrays outside inner loop (Map.get → direct access)
1 parent bed49cc commit 5103d82

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

src/operators.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
computePartialAgg,
3535
computePartialAggColumnar,
3636
mergePartialAggs,
37+
mergeStates,
3738
finalizePartialAgg,
3839
type PartialAgg,
3940
} from "./partial-agg.js";
@@ -1533,7 +1534,17 @@ export class AggregateOperator implements Operator {
15331534
const batch = await this.upstream.nextColumnar();
15341535
if (!batch) break;
15351536
const partial = computePartialAggColumnar(batch, this.query);
1536-
merged = merged ? mergePartialAggs([merged, partial]) : partial;
1537+
if (!merged) { merged = partial; continue; }
1538+
// Merge in-place — we own merged, no clone needed
1539+
if (merged.groups && partial.groups) {
1540+
for (const [key, states] of partial.groups) {
1541+
const existing = merged.groups.get(key);
1542+
if (!existing) merged.groups.set(key, states);
1543+
else mergeStates(existing, states);
1544+
}
1545+
} else {
1546+
mergeStates(merged.states, partial.states);
1547+
}
15371548
}
15381549
if (!merged) return finalizePartialAgg(computePartialAgg([], this.query), this.query);
15391550
return finalizePartialAgg(merged, this.query);
@@ -1545,7 +1556,16 @@ export class AggregateOperator implements Operator {
15451556
const batch = await this.upstream.next();
15461557
if (!batch) break;
15471558
const partial = computePartialAgg(batch, this.query);
1548-
merged = merged ? mergePartialAggs([merged, partial]) : partial;
1559+
if (!merged) { merged = partial; continue; }
1560+
if (merged.groups && partial.groups) {
1561+
for (const [key, states] of partial.groups) {
1562+
const existing = merged.groups.get(key);
1563+
if (!existing) merged.groups.set(key, states);
1564+
else mergeStates(existing, states);
1565+
}
1566+
} else {
1567+
mergeStates(merged.states, partial.states);
1568+
}
15491569
}
15501570

15511571
if (!merged) {
@@ -1633,7 +1653,7 @@ export class TopKOperator implements Operator {
16331653
const batch = await this.upstream.nextColumnar();
16341654
if (!batch) break;
16351655
const indices = batch.selection ?? identityIndices(batch.rowCount);
1636-
const colNames = Array.from(batch.columns.keys());
1656+
const colEntries = Array.from(batch.columns.entries());
16371657
const sortVals = batch.columns.get(col);
16381658
for (const idx of indices) {
16391659
// Fast reject: if heap is full and this value can't beat the root, skip materialization
@@ -1644,8 +1664,8 @@ export class TopKOperator implements Operator {
16441664
if (rv !== null && (desc ? nv <= rv : nv >= rv)) continue;
16451665
}
16461666
const row: Row = {};
1647-
for (const name of colNames) {
1648-
row[name] = (batch.columns.get(name)![idx] as Row[string]) ?? null;
1667+
for (let c = 0; c < colEntries.length; c++) {
1668+
row[colEntries[c][0]] = (colEntries[c][1][idx] as Row[string]) ?? null;
16491669
}
16501670
pushRow(row);
16511671
}

src/partial-agg.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ export function computePartialAggColumnar(
194194
return { states: [], groups };
195195
}
196196

197-
function mergeStates(
197+
export function mergeStates(
198198
target: PartialAggState[],
199199
source: PartialAggState[],
200200
): void {
@@ -230,13 +230,21 @@ function mergeStates(
230230
}
231231
}
232232

233+
function cloneState(s: PartialAggState): PartialAggState {
234+
return {
235+
...s,
236+
values: s.values ? [...s.values] : undefined,
237+
distinctSet: s.distinctSet ? new Set(s.distinctSet) : undefined,
238+
};
239+
}
240+
233241
export function mergePartialAggs(partials: PartialAgg[]): PartialAgg {
234242
if (partials.length === 0) return { states: [] };
235243

236244
const hasGroups = partials.some((p) => p.groups && p.groups.size > 0);
237245

238246
if (!hasGroups) {
239-
const merged = partials[0].states.map((s) => ({ ...s }));
247+
const merged = partials[0].states.map(cloneState);
240248
for (let i = 1; i < partials.length; i++) {
241249
mergeStates(merged, partials[i].states);
242250
}
@@ -249,7 +257,7 @@ export function mergePartialAggs(partials: PartialAgg[]): PartialAgg {
249257
for (const [key, states] of partial.groups) {
250258
const existing = mergedGroups.get(key);
251259
if (!existing) {
252-
mergedGroups.set(key, states.map((s) => ({ ...s })));
260+
mergedGroups.set(key, states.map(cloneState));
253261
} else {
254262
mergeStates(existing, states);
255263
}

0 commit comments

Comments
 (0)