Skip to content

Commit b6836a2

Browse files
committed
fix: HashJoinOperator spill leak + drainPipeline error leak + RIGHT/FULL join missing left columns
Three bugs found by background audit agents: 1. HashJoinOperator.close() never called spill.cleanup() — Grace hash join R2/fs spill objects were permanently leaked after every join. 2. drainPipeline() called pipeline.close() only on the success path. If any pipeline.next() threw, close() was never reached and all spill files (ExternalSort, HashJoin) were leaked. Now uses try/finally. 3. emitUnmatchedRight() and Grace hash processPartition() emitted unmatched right rows with only right-side columns present. RIGHT/FULL join output had inconsistent schemas — matched rows had all columns, unmatched right rows were missing left columns entirely instead of having them as null. Added nullFilledRight() helper + leftColumns tracking from first probe batch.
1 parent 5ce39a2 commit b6836a2

File tree

1 file changed

+45
-18
lines changed

1 file changed

+45
-18
lines changed

src/operators.ts

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,6 +2116,9 @@ export class HashJoinOperator implements Operator {
21162116
private crossRightIdx = 0;
21172117
private crossDone = false;
21182118

2119+
// Column name tracking for null-fill in right/full joins
2120+
private leftColumns: string[] | null = null;
2121+
21192122
// Partitioned spill state (Grace hash join)
21202123
private partitionCount = 0;
21212124
private leftPartitionIds: string[] = [];
@@ -2172,14 +2175,28 @@ export class HashJoinOperator implements Operator {
21722175
return `${key}\x00${idx}`;
21732176
}
21742177

2178+
/** Create a merged row with null-filled left columns for an unmatched right row. */
2179+
private nullFilledRight(rightRow: Row): Row {
2180+
const merged: Row = {};
2181+
if (this.leftColumns) {
2182+
for (const k of this.leftColumns) merged[k] = null;
2183+
}
2184+
for (const k in rightRow) {
2185+
if (k === this.rightKey) continue;
2186+
const outKey = k in merged ? `right_${k}` : k;
2187+
merged[outKey] = rightRow[k];
2188+
}
2189+
return merged;
2190+
}
2191+
21752192
/** Emit unmatched right rows with null-filled left columns. */
21762193
private emitUnmatchedRight(): Row[] {
21772194
if (!this.hashMap || !this.rightMatched) return [];
21782195
const result: Row[] = [];
21792196
for (const [key, rows] of this.hashMap) {
21802197
for (let i = 0; i < rows.length; i++) {
21812198
if (!this.rightMatched.has(this.rightRowId(key, i))) {
2182-
result.push({ ...rows[i] });
2199+
result.push(this.nullFilledRight(rows[i]));
21832200
}
21842201
}
21852202
}
@@ -2416,6 +2433,8 @@ export class HashJoinOperator implements Operator {
24162433
// Probe with left partition (skip NULL keys)
24172434
const result: Row[] = [];
24182435
for await (const leftRow of this.streamPartition(this.leftPartitionIds[partIdx])) {
2436+
// Capture left column names on first row (for null-fill in right/full unmatched rows)
2437+
if (!this.leftColumns) this.leftColumns = Object.keys(leftRow);
24192438
const leftVal = leftRow[this.leftKey];
24202439
if (leftVal === null || leftVal === undefined) {
24212440
if (this.joinType === "left" || this.joinType === "full") result.push({ ...leftRow });
@@ -2438,13 +2457,13 @@ export class HashJoinOperator implements Operator {
24382457
for (const [key, rows] of rightMap) {
24392458
for (let i = 0; i < rows.length; i++) {
24402459
if (!matched.has(`${key}\x00${i}`)) {
2441-
result.push({ ...rows[i] });
2460+
result.push(this.nullFilledRight(rows[i]));
24422461
}
24432462
}
24442463
}
24452464
// NULL-keyed right rows never match — always unmatched
24462465
for (const row of rightNullRows) {
2447-
result.push({ ...row });
2466+
result.push(this.nullFilledRight(row));
24482467
}
24492468
}
24502469

@@ -2499,6 +2518,11 @@ export class HashJoinOperator implements Operator {
24992518
return null;
25002519
}
25012520

2521+
// Capture left column names on first batch (for null-fill in right/full unmatched rows)
2522+
if (!this.leftColumns && batch.length > 0) {
2523+
this.leftColumns = Object.keys(batch[0]);
2524+
}
2525+
25022526
const result: Row[] = [];
25032527
for (const leftRow of batch) {
25042528
const leftVal = leftRow[this.leftKey];
@@ -2540,6 +2564,7 @@ export class HashJoinOperator implements Operator {
25402564
this.rightMatched = null;
25412565
this.crossRightBuffer = null;
25422566
this.crossLeftBatch = null;
2567+
if (this.spill) await this.spill.cleanup();
25432568
await this.left.close();
25442569
await this.right.close();
25452570
}
@@ -2961,25 +2986,27 @@ function assemblePipeline(
29612986
* Drain all rows from an operator pipeline.
29622987
*/
29632988
export async function drainPipeline(pipeline: Operator): Promise<Row[]> {
2964-
// Use columnar path if the pipeline supports it — avoids Row[] materialization in intermediate operators
2965-
if (pipeline.nextColumnar) {
2989+
try {
2990+
// Use columnar path if the pipeline supports it — avoids Row[] materialization in intermediate operators
2991+
if (pipeline.nextColumnar) {
2992+
const rows: Row[] = [];
2993+
while (true) {
2994+
const batch = await pipeline.nextColumnar();
2995+
if (!batch) break;
2996+
// Materialize only at the pipeline exit
2997+
for (const row of materializeRows(batch)) rows.push(row);
2998+
}
2999+
return rows;
3000+
}
3001+
29663002
const rows: Row[] = [];
29673003
while (true) {
2968-
const batch = await pipeline.nextColumnar();
3004+
const batch = await pipeline.next();
29693005
if (!batch) break;
2970-
// Materialize only at the pipeline exit
2971-
for (const row of materializeRows(batch)) rows.push(row);
3006+
for (const row of batch) rows.push(row);
29723007
}
2973-
await pipeline.close();
29743008
return rows;
3009+
} finally {
3010+
await pipeline.close();
29753011
}
2976-
2977-
const rows: Row[] = [];
2978-
while (true) {
2979-
const batch = await pipeline.next();
2980-
if (!batch) break;
2981-
for (const row of batch) rows.push(row);
2982-
}
2983-
await pipeline.close();
2984-
return rows;
29853012
}

0 commit comments

Comments
 (0)