Skip to content

Commit d44554a

Browse files
committed
fix: add WasmEngine.resetHeap() wrapper with safety invariant docs
The Zig WASM engine uses a bump allocator — resetHeap() invalidates ALL prior allocations. All call sites now go through WasmEngine.resetHeap() instead of reaching into .exports.resetHeap() directly. This creates a single choke point for future guards and documents the 5 safety invariants that prevent heap corruption (resetHeap() before each batch of alloc() calls, no await between reset and consumption, .slice() before next reset, per-DO ownership, decompress/aggregate helpers copy results out), which are sufficient because DOs are single-threaded. Verified all 14 call sites across operators.ts, query-do.ts, and fragment-do.ts — every path is synchronous between reset and consumption.
1 parent adcb63d commit d44554a

File tree

6 files changed

+49
-25
lines changed

6 files changed

+49
-25
lines changed

src/fragment-do.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ vi.mock("./wasm-engine.js", () => ({
1717
clearTables: () => {},
1818
},
1919
reset: () => {},
20+
resetHeap: () => {},
2021
clearTable: () => {},
2122
registerColumn: () => true,
2223
registerColumns: () => true,

src/fragment-do.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ export class FragmentDO extends DurableObject<Env> {
183183
// Zero-copy WASM path: register raw page data per-column, execute SQL in WASM
184184
const wasmStart = Date.now();
185185
const fragTable = `__frag_${r2Key}`;
186-
this.wasmEngine.exports.resetHeap();
186+
this.wasmEngine.resetHeap();
187187
const fragColEntries = cols
188188
.filter(col => columnData.get(col.name)?.length)
189189
.map(col => ({

src/operators.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ function wasmFilterNumeric(
559559
wasm: WasmEngine,
560560
): Uint32Array | null {
561561
try {
562-
wasm.exports.resetHeap();
562+
wasm.resetHeap();
563563

564564
if (dtype === "float64" || dtype === "float32") {
565565
const dataPtr = wasm.exports.alloc(rowCount * 8);
@@ -620,7 +620,7 @@ function wasmFilterRange(
620620
rowCount: number, wasm: WasmEngine, negate = false,
621621
): Uint32Array | null {
622622
try {
623-
wasm.exports.resetHeap();
623+
wasm.resetHeap();
624624

625625
if (dtype === "float64" || dtype === "float32") {
626626
const dataPtr = wasm.exports.alloc(rowCount * 8);
@@ -677,7 +677,7 @@ function wasmFilterLike(
677677
wasm: WasmEngine,
678678
): Uint32Array | null {
679679
try {
680-
wasm.exports.resetHeap();
680+
wasm.resetHeap();
681681
const encoder = _textEncoder;
682682

683683
// Build offsets array and packed string data
@@ -730,7 +730,7 @@ function wasmFilterLike(
730730
/** Intersect two sorted index arrays using WASM. */
731731
function wasmIntersect(a: Uint32Array, b: Uint32Array, wasm: WasmEngine): Uint32Array {
732732
try {
733-
wasm.exports.resetHeap();
733+
wasm.resetHeap();
734734
const aPtr = wasm.exports.alloc(a.byteLength);
735735
const bPtr = wasm.exports.alloc(b.byteLength);
736736
const outPtr = wasm.exports.alloc(Math.min(a.length, b.length) * 4);
@@ -758,7 +758,7 @@ function wasmIntersect(a: Uint32Array, b: Uint32Array, wasm: WasmEngine): Uint32
758758
/** Union two sorted index arrays using WASM. */
759759
function wasmUnion(a: Uint32Array, b: Uint32Array, wasm: WasmEngine): Uint32Array {
760760
try {
761-
wasm.exports.resetHeap();
761+
wasm.resetHeap();
762762
const aPtr = wasm.exports.alloc(a.byteLength);
763763
const bPtr = wasm.exports.alloc(b.byteLength);
764764
const outPtr = wasm.exports.alloc((a.length + b.length) * 4);
@@ -2015,7 +2015,7 @@ export class WasmAggregateOperator implements Operator {
20152015
let matchCount = -1; // -1 = no filter, use full buffer
20162016
let indicesPtr = 0;
20172017
if (hasFilters) {
2018-
this.wasm.exports.resetHeap();
2018+
this.wasm.resetHeap();
20192019

20202020
// Evaluate AND filters
20212021
let currentIndices: Uint32Array | null = null;
@@ -2040,7 +2040,7 @@ export class WasmAggregateOperator implements Operator {
20402040
if (!currentIndices || currentIndices.length === 0) continue;
20412041

20422042
// Copy indices to WASM for indexed aggregates
2043-
this.wasm.exports.resetHeap();
2043+
this.wasm.resetHeap();
20442044
indicesPtr = this.wasm.exports.alloc(currentIndices.byteLength);
20452045
if (indicesPtr) {
20462046
new Uint32Array(this.wasm.exports.memory.buffer, indicesPtr, currentIndices.length).set(currentIndices);
@@ -2142,7 +2142,7 @@ export class WasmAggregateOperator implements Operator {
21422142
const elemSize = col.dtype === "int32" ? 4 : 8;
21432143
const rowCount = buf.byteLength / elemSize;
21442144

2145-
this.wasm.exports.resetHeap();
2145+
this.wasm.resetHeap();
21462146
const dataPtr = this.wasm.exports.alloc(buf.byteLength);
21472147
if (!dataPtr) return new Uint32Array(0);
21482148
new Uint8Array(this.wasm.exports.memory.buffer, dataPtr, buf.byteLength).set(new Uint8Array(buf));

src/query-do.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ class EdgeScanOperator implements Operator {
290290
await this.fetchAllPages();
291291

292292
const wasmStart = Date.now();
293-
this.wasmEngine.exports.resetHeap();
293+
this.wasmEngine.resetHeap();
294294
const fragTable = `__edge_${this.meta.r2Key}`;
295295
const colEntries = this.cols
296296
.filter(col => this.columnData.get(col.name)?.length)
@@ -358,7 +358,7 @@ class EdgeScanOperator implements Operator {
358358
for (const v of decodedColumns.values()) if (v.length > numRows) numRows = v.length;
359359
if (numRows === 0) return this.next(); // skip empty pages
360360

361-
this.wasmEngine.exports.resetHeap();
361+
this.wasmEngine.resetHeap();
362362
const fragTable = `__edge_pq_${pi}`;
363363
const decodedEntries = cols
364364
.filter(col => decodedColumns.get(col.name)?.length)
@@ -885,7 +885,7 @@ export class QueryDO extends DurableObject<Env> {
885885
const r2ReadMs = Date.now() - r2Start;
886886

887887
const wasmStart = Date.now();
888-
this.wasmEngine.exports.resetHeap();
888+
this.wasmEngine.resetHeap();
889889

890890
// Load fragment and extract columns via fragment reader
891891
const dataPtr = this.wasmEngine.exports.alloc(fileData.byteLength);
@@ -1132,7 +1132,7 @@ export class QueryDO extends DurableObject<Env> {
11321132

11331133
// Try WASM SQL path: register decoded columns → executeQuery (SIMD filter/sort/agg)
11341134
const colNames = cols.map(c => c.name);
1135-
this.wasmEngine.exports.resetHeap();
1135+
this.wasmEngine.resetHeap();
11361136
const decodedEntries = cols
11371137
.filter(col => decodedColumns.get(col.name)?.length)
11381138
.map(col => ({ name: col.name, dtype: col.dtype, values: decodedColumns.get(col.name)! }));
@@ -1176,7 +1176,7 @@ export class QueryDO extends DurableObject<Env> {
11761176

11771177
// Lance path: zero-copy WASM registration + SQL execution
11781178
const wasmStart = Date.now();
1179-
this.wasmEngine.exports.resetHeap();
1179+
this.wasmEngine.resetHeap();
11801180
const lanceColEntries = cols
11811181
.filter(col => columnData.get(col.name)?.length)
11821182
.map(col => ({

src/wasm-engine.integration.test.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ beforeAll(async () => {
2828
describe.skipIf(!hasWasm)("WASM integration", () => {
2929
describe("int64 + float64 columns → SQL → rows", () => {
3030
it("registers columns and executes SELECT *", () => {
31-
wasm.exports.resetHeap();
31+
wasm.resetHeap();
3232

3333
const table = "test_nums";
3434
const rowCount = 3;
@@ -63,7 +63,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
6363

6464
describe("string column registration", () => {
6565
it("registers utf8 strings with length-prefixed encoding", () => {
66-
wasm.exports.resetHeap();
66+
wasm.resetHeap();
6767
const table = "test_strings";
6868

6969
// Build length-prefixed string data: "hello" (5), "world" (5)
@@ -94,7 +94,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
9494

9595
describe("type promotion", () => {
9696
it("promotes int32 to int64 for WASM registration", () => {
97-
wasm.exports.resetHeap();
97+
wasm.resetHeap();
9898
const table = "test_promo";
9999

100100
// int32 values
@@ -117,7 +117,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
117117

118118
describe("filter pushdown SQL", () => {
119119
it("filters with WHERE gt", () => {
120-
wasm.exports.resetHeap();
120+
wasm.resetHeap();
121121
const table = "test_filter";
122122

123123
const i64Buf = new BigInt64Array([10n, 20n, 30n, 40n, 50n]);
@@ -139,7 +139,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
139139
});
140140

141141
it("filters with WHERE eq", () => {
142-
wasm.exports.resetHeap();
142+
wasm.resetHeap();
143143
const table = "test_eq";
144144

145145
const f64Buf = new Float64Array([1.0, 2.0, 3.0, 2.0]);
@@ -160,7 +160,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
160160
});
161161

162162
it("filters with WHERE lt on float64", () => {
163-
wasm.exports.resetHeap();
163+
wasm.resetHeap();
164164
const table = "test_lt";
165165

166166
const f64Buf = new Float64Array([1.0, 2.0, 3.0, 4.0]);
@@ -181,7 +181,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
181181

182182
describe("vector search", () => {
183183
it("finds top-K nearest vectors via vectorSearchBuffer", () => {
184-
wasm.exports.resetHeap();
184+
wasm.resetHeap();
185185
const dim = 4;
186186
const numVectors = 5;
187187

@@ -231,7 +231,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
231231

232232
describe("LIMIT and ORDER BY", () => {
233233
it("applies LIMIT to results", () => {
234-
wasm.exports.resetHeap();
234+
wasm.resetHeap();
235235
const table = "test_limit";
236236

237237
const i64Buf = new BigInt64Array([1n, 2n, 3n, 4n, 5n]);
@@ -252,7 +252,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
252252

253253
describe("multi-column SELECT with projections", () => {
254254
it("returns only projected columns", () => {
255-
wasm.exports.resetHeap();
255+
wasm.resetHeap();
256256
const table = "test_proj";
257257

258258
const i64Buf = new BigInt64Array([1n, 2n, 3n]);
@@ -278,7 +278,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
278278

279279
describe("bool column", () => {
280280
it("registers and queries bool values", () => {
281-
wasm.exports.resetHeap();
281+
wasm.resetHeap();
282282
const table = "test_bool";
283283

284284
// Bool bitmap: [true, false, true] = 0b101 = 0x05
@@ -301,7 +301,7 @@ describe.skipIf(!hasWasm)("WASM integration", () => {
301301

302302
describe("aggregates", () => {
303303
it("computes SUM via WASM", () => {
304-
wasm.exports.resetHeap();
304+
wasm.resetHeap();
305305
const table = "test_sum";
306306

307307
const f64Buf = new Float64Array([10.0, 20.0, 30.0]);

src/wasm-engine.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,22 @@ export async function instantiateWasm(wasmModule: WebAssembly.Module, opts?: { o
169169
return engine;
170170
}
171171

172+
/**
173+
* WASM Heap Safety Invariants (bump allocator):
174+
*
175+
* The Zig engine uses a bump allocator — resetHeap() resets the pointer to zero,
176+
* invalidating ALL prior allocations. Correctness requires:
177+
*
178+
* 1. resetHeap() MUST be called before each batch of alloc() calls
179+
* 2. Between resetHeap() and result consumption: NO await (purely synchronous)
180+
* 3. Results MUST be copied to JS (.slice() or parseWasmResult) before next resetHeap()
181+
* 4. Each Durable Object owns its own WasmEngine — no cross-DO sharing
182+
* 5. Decompress/aggregate helpers call alloc() without a preceding resetHeap(); this is safe because they copy their results out (.slice())
183+
*
184+
* These invariants are sufficient because Cloudflare DOs are single-threaded:
185+
* no parallel JS execution within a DO, so other work can only interleave at an
186+
* await — which invariant 2 forbids between resetHeap() and result consumption.
187+
*/
172188
export class WasmEngine {
173189
readonly exports: WasmExports;
174190
constructor(exports: WasmExports) { this.exports = exports; }
@@ -179,6 +195,13 @@ export class WasmEngine {
179195
return this.exports.alloc(bytes);
180196
}
181197

198+
/**
199+
* Reset the bump allocator — invalidates ALL prior allocations.
200+
* Call before each batch of alloc()+execute operations.
201+
* The caller MUST consume results synchronously (no await) before the next reset.
202+
*/
203+
resetHeap(): void { this.exports.resetHeap(); }
204+
182205
reset(): void { this.exports.resetResult(); }
183206

184207
/** Decompress ZSTD data using the Zig std.compress.zstd implementation. */

0 commit comments

Comments
 (0)