Skip to content

Commit 32f3b72

Browse files
committed
fix: bigint/number filter comparison, page skip alignment, Parquet V2 decompression
- Fix bigint===number always false: coerce types in matchesFilter(), canSkipPage(), and query-do inline Parquet filters (int64 columns decode as bigint but JSON filter values are numbers)
- Fix page-buffer index misalignment: when canSkipPage() skips pages, track non-skipped page infos per column so buffer indices stay aligned with col.pages metadata (was using wrong encoding/rowCount for decode)
- Fix DATA_PAGE_V2 decompression: subtract rep/def level byte lengths from uncompressedSize before passing to decompressor (per Parquet spec, levels are uncompressed in V2 pages)
- Fix Snappy: return actual bytes written on early termination instead of zero-padded full buffer
- Fix RLE bitWidth=32: mask was 0 due to JS 32-bit shift overflow
- Add SQL buffer length check (64KB max) to prevent WASM memory corruption
- Add gzip retry with increasing capacity (4x, 16x, 64x) for high compression ratios
- Add bounds checking in parseWasmResult to prevent reading past buffer
1 parent bdf131d commit 32f3b72

File tree

4 files changed

+76
-32
lines changed

4 files changed

+76
-32
lines changed

src/decode.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@ export function canSkipPage(page: PageInfo, filters: QueryDescriptor["filters"],
99
if (filter.column !== columnName) continue;
1010
if (page.minValue === undefined || page.maxValue === undefined) continue;
1111

12-
const { minValue: min, maxValue: max } = page;
13-
const val = filter.value;
12+
let { minValue: min, maxValue: max } = page;
13+
let val = filter.value;
1414
if (typeof val === "object") continue;
1515

16+
// Coerce bigint↔number for cross-type comparisons
17+
if (typeof min === "bigint" && typeof val === "number") val = BigInt(Math.trunc(val as number));
18+
else if (typeof min === "number" && typeof val === "bigint") { min = BigInt(Math.trunc(min)); max = BigInt(Math.trunc(max as number)); }
19+
1620
switch (filter.op) {
1721
case "gt": if (max <= val) return true; break;
1822
case "gte": if (max < val) return true; break;
@@ -282,20 +286,27 @@ function vectorSearch(
282286

283287
// --- Filters ---
284288

289+
/** Coerce bigint↔number for cross-type comparisons (JSON filter values are numbers, int64 columns decode as bigint). */
290+
function coerceCompare(a: unknown, b: unknown): [unknown, unknown] {
291+
if (typeof a === "bigint" && typeof b === "number") return [a, BigInt(Math.trunc(b))];
292+
if (typeof a === "number" && typeof b === "bigint") return [BigInt(Math.trunc(a)), b];
293+
return [a, b];
294+
}
295+
285296
export function matchesFilter(
286297
val: number | bigint | string | boolean | Float32Array | null,
287298
filter: QueryDescriptor["filters"][0],
288299
): boolean {
289300
if (val === null) return false;
290301
const t = filter.value;
291302
switch (filter.op) {
292-
case "eq": return val === t;
293-
case "neq": return val !== t;
294-
case "gt": return val > (t as number | bigint | string);
295-
case "gte": return val >= (t as number | bigint | string);
296-
case "lt": return val < (t as number | bigint | string);
297-
case "lte": return val <= (t as number | bigint | string);
298-
case "in": return Array.isArray(t) && t.includes(val as number | bigint | string);
303+
case "eq": { const [a, b] = coerceCompare(val, t); return a === b; }
304+
case "neq": { const [a, b] = coerceCompare(val, t); return a !== b; }
305+
case "gt": { const [a, b] = coerceCompare(val, t); return (a as number | bigint | string) > (b as number | bigint | string); }
306+
case "gte": { const [a, b] = coerceCompare(val, t); return (a as number | bigint | string) >= (b as number | bigint | string); }
307+
case "lt": { const [a, b] = coerceCompare(val, t); return (a as number | bigint | string) < (b as number | bigint | string); }
308+
case "lte": { const [a, b] = coerceCompare(val, t); return (a as number | bigint | string) <= (b as number | bigint | string); }
309+
case "in": return Array.isArray(t) && t.some(v => { const [a, b] = coerceCompare(val, v); return a === b; });
299310
default: return true;
300311
}
301312
}

src/parquet-decode.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ export function decompressSnappy(input: Uint8Array): Uint8Array {
8080
}
8181
}
8282

83+
if (outPos < uncompressedLen) {
84+
// Decompression terminated early — return only the valid portion
85+
return output.subarray(0, outPos);
86+
}
8387
return output;
8488
}
8589

@@ -93,7 +97,7 @@ function decodeRleBitPacked(
9397
): { values: number[]; bytesRead: number } {
9498
const values: number[] = [];
9599
let pos = offset;
96-
const mask = (1 << bitWidth) - 1;
100+
const mask = bitWidth >= 32 ? 0xFFFFFFFF : (1 << bitWidth) - 1;
97101

98102
while (pos < bytes.length && values.length < maxValues) {
99103
const { value: header, bytesRead } = readVarint(bytes, pos);
@@ -433,9 +437,12 @@ export function decodeParquetColumnChunk(
433437
dataStart += header.defLevelsByteLength;
434438

435439
// Actual data (may be compressed)
440+
// V2 spec: uncompressed_page_size includes rep/def level bytes which are NOT compressed,
441+
// so subtract them to get the actual data payload uncompressed size.
436442
let dataPayload: Uint8Array = pageData.subarray(dataStart);
437443
if (header.isCompressed) {
438-
dataPayload = decompressPage(dataPayload, pageEncoding.compression, header.uncompressedSize, wasm);
444+
const dataUncompressedSize = header.uncompressedSize - header.repLevelsByteLength - header.defLevelsByteLength;
445+
dataPayload = decompressPage(dataPayload, pageEncoding.compression, dataUncompressedSize, wasm);
439446
}
440447

441448
// Decode def levels to find nulls

src/query-do.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -517,14 +517,19 @@ export class QueryDO implements DurableObject {
517517
}
518518
}
519519

520-
// Build per-page ranges, applying page-level skip
520+
// Build per-page ranges, applying page-level skip.
521+
// Track non-skipped page infos per column so buffer indices stay aligned.
521522
const ranges: { column: string; offset: number; length: number }[] = [];
523+
const columnPageInfos = new Map<string, typeof cols[0]["pages"]>();
522524
let pagesSkipped = 0;
523525
for (const col of cols) {
526+
const keptPages: typeof col.pages = [];
524527
for (const page of col.pages) {
525528
if (!query.vectorSearch && canSkipPage(page, query.filters, col.name)) { pagesSkipped++; continue; }
529+
keptPages.push(page);
526530
ranges.push({ column: col.name, offset: Number(page.byteOffset), length: page.byteLength });
527531
}
532+
columnPageInfos.set(col.name, keptPages);
528533
}
529534

530535
// Cache-before-fetch: check WASM buffer pool for each range
@@ -613,10 +618,13 @@ export class QueryDO implements DurableObject {
613618
const pages = columnData.get(col.name);
614619
if (!pages?.length) { decodedColumns.set(col.name, []); continue; }
615620

621+
// Use non-skipped page infos (aligned with columnData buffers, not col.pages)
622+
const keptPageInfos = columnPageInfos.get(col.name) ?? col.pages;
623+
616624
// Concatenate all page buffers for this column (may span multiple row groups)
617625
const allValues: (number | bigint | string | boolean | null)[] = [];
618626
for (let pi = 0; pi < pages.length; pi++) {
619-
const pageInfo = col.pages[pi];
627+
const pageInfo = keptPageInfos[pi];
620628
const encoding = pageInfo?.encoding ?? { compression: "UNCOMPRESSED" };
621629

622630
// Include dictionary page if present: fetch it from R2 and prepend
@@ -673,19 +681,23 @@ export class QueryDO implements DurableObject {
673681
rows.push(row);
674682
}
675683

676-
// Apply filters in JS
684+
// Apply filters in JS (coerce bigint↔number for cross-type comparison)
677685
if (query.filters.length > 0) {
678686
rows = rows.filter(row => {
679687
for (const f of query.filters) {
680688
const val = row[f.column];
681689
if (val == null) return false;
690+
// Coerce bigint↔number: int64 columns decode as bigint, JSON filter values are numbers
691+
let cv = val, cf = f.value;
692+
if (typeof cv === "bigint" && typeof cf === "number") cf = BigInt(Math.trunc(cf));
693+
else if (typeof cv === "number" && typeof cf === "bigint") cv = BigInt(Math.trunc(cv));
682694
switch (f.op) {
683-
case "eq": if (val !== f.value) return false; break;
684-
case "neq": if (val === f.value) return false; break;
685-
case "gt": if (!(val > f.value)) return false; break;
686-
case "gte": if (!(val >= f.value)) return false; break;
687-
case "lt": if (!(val < f.value)) return false; break;
688-
case "lte": if (!(val <= f.value)) return false; break;
695+
case "eq": if (cv !== cf) return false; break;
696+
case "neq": if (cv === cf) return false; break;
697+
case "gt": if (!(cv > (cf as number | bigint | string))) return false; break;
698+
case "gte": if (!(cv >= (cf as number | bigint | string))) return false; break;
699+
case "lt": if (!(cv < (cf as number | bigint | string))) return false; break;
700+
case "lte": if (!(cv <= (cf as number | bigint | string))) return false; break;
689701
}
690702
}
691703
return true;
@@ -722,7 +734,8 @@ export class QueryDO implements DurableObject {
722734
for (const col of cols) {
723735
const pages = columnData.get(col.name);
724736
if (!pages?.length) continue;
725-
if (!this.wasmEngine.registerColumn(query.table, col.name, col.dtype, pages, col.pages, col.listDimension)) {
737+
const keptPageInfos = columnPageInfos.get(col.name) ?? col.pages;
738+
if (!this.wasmEngine.registerColumn(query.table, col.name, col.dtype, pages, keptPageInfos, col.listDimension)) {
726739
throw new Error(`WASM OOM: failed to register column "${col.name}" for table "${query.table}"`);
727740
}
728741
}

src/wasm-engine.ts

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,19 +171,24 @@ export class WasmEngine {
171171
return new Uint8Array(this.exports.memory.buffer, outPtr, written).slice();
172172
}
173173

174-
/** Decompress GZIP data using the Zig std.compress.gzip implementation. */
174+
/** Decompress GZIP data using the Zig std.compress.gzip implementation. Retries with larger buffer if needed. */
175175
decompressGzip(compressed: Uint8Array): Uint8Array {
176176
const inPtr = this.exports.alloc(compressed.length);
177177
if (!inPtr) throw new Error("WASM OOM allocating gzip input");
178178
new Uint8Array(this.exports.memory.buffer, inPtr, compressed.length).set(compressed);
179179

180-
const capacity = compressed.length * 4;
181-
const outPtr = this.exports.alloc(capacity);
182-
if (!outPtr) throw new Error("WASM OOM allocating gzip output");
180+
// Try increasing capacities: 4x, 16x, 64x (handles high compression ratios)
181+
for (const multiplier of [4, 16, 64]) {
182+
const capacity = compressed.length * multiplier;
183+
const outPtr = this.exports.alloc(capacity);
184+
if (!outPtr) throw new Error("WASM OOM allocating gzip output");
183185

184-
const written = this.exports.gzip_decompress(inPtr, compressed.length, outPtr, capacity);
185-
if (written === 0 && compressed.length > 0) throw new Error("gzip decompression failed");
186-
return new Uint8Array(this.exports.memory.buffer, outPtr, written).slice();
186+
const written = this.exports.gzip_decompress(inPtr, compressed.length, outPtr, capacity);
187+
if (written > 0) return new Uint8Array(this.exports.memory.buffer, outPtr, written).slice();
188+
// written === 0 means the capacity may have been too small — retry with a larger buffer
189+
if (written === 0 && compressed.length === 0) return new Uint8Array(0);
190+
}
191+
throw new Error("gzip decompression failed (output exceeds 64x compressed size)");
187192
}
188193

189194
/** Decompress LZ4 block data (Parquet hadoop codec). */
@@ -485,7 +490,11 @@ export class WasmEngine {
485490
}
486491

487492
executeQuery(query: QueryDescriptor): Row[] | null {
493+
const MAX_SQL_LENGTH = 64 * 1024; // 64KB — WASM SQL input buffer is fixed-size
488494
const sqlBytes = textEncoder.encode(queryToSql(query));
495+
if (sqlBytes.length > MAX_SQL_LENGTH) {
496+
throw new Error(`SQL query too large (${sqlBytes.length} bytes, max ${MAX_SQL_LENGTH})`);
497+
}
489498
const sqlBufPtr = this.exports.getSqlInputBuffer();
490499
new Uint8Array(this.exports.memory.buffer, sqlBufPtr, sqlBytes.length).set(sqlBytes);
491500
this.exports.setSqlInputLength(sqlBytes.length);
@@ -855,17 +864,21 @@ function parseWasmResult(memoryBuffer: ArrayBuffer, ptr: number, size: number):
855864

856865
const rows: Row[] = Array.from({ length: numRows }, () => ({}));
857866
let dp = dataStart;
867+
const end = ptr + size;
858868

859869
for (const col of columns) {
860870
for (let r = 0; r < numRows; r++) {
871+
if (dp + 1 > end) return rows.slice(0, r > 0 ? r : 0); // truncated result
861872
switch (col.type) {
862-
case WasmColumnType.Int64: rows[r][col.name] = view.getBigInt64(dp, true); dp += 8; break;
863-
case WasmColumnType.Float64: rows[r][col.name] = view.getFloat64(dp, true); dp += 8; break;
864-
case WasmColumnType.Int32: rows[r][col.name] = view.getInt32(dp, true); dp += 4; break;
865-
case WasmColumnType.Float32: rows[r][col.name] = view.getFloat32(dp, true); dp += 4; break;
873+
case WasmColumnType.Int64: if (dp + 8 > end) return rows; rows[r][col.name] = view.getBigInt64(dp, true); dp += 8; break;
874+
case WasmColumnType.Float64: if (dp + 8 > end) return rows; rows[r][col.name] = view.getFloat64(dp, true); dp += 8; break;
875+
case WasmColumnType.Int32: if (dp + 4 > end) return rows; rows[r][col.name] = view.getInt32(dp, true); dp += 4; break;
876+
case WasmColumnType.Float32: if (dp + 4 > end) return rows; rows[r][col.name] = view.getFloat32(dp, true); dp += 4; break;
866877
case WasmColumnType.Bool: rows[r][col.name] = buf[dp] !== 0; dp += 1; break;
867878
case WasmColumnType.String: {
879+
if (dp + 4 > end) return rows;
868880
const len = view.getUint32(dp, true); dp += 4;
881+
if (dp + len > end) return rows;
869882
rows[r][col.name] = textDecoder.decode(buf.subarray(dp, dp + len)); dp += len;
870883
break;
871884
}

0 commit comments

Comments
 (0)