Skip to content

Commit d7c637b

Browse files
committed
refactor: improve code quality, validation, and test robustness
Query DO:
- Add table name validation in handleQuery (400 for missing/invalid)
- Fix unhandled promise in registerWithMaster (try/catch + error logging)
- Add datasetCache eviction (LRU by updatedAt, max 100 entries)
- Add filter/projection counts to error log context

Master DO:
- Validate r2Key in handleWrite/handleRefresh (reject empty or path traversal)

Conformance tests:
- Share fixture loading via beforeAll (was re-reading same files per test)
- Strengthen assertions: exact values instead of toBeGreaterThan(0)
- Fix unsafe type coercions (typeof check before bigint comparison)
- Add encoding assertions before decodeParquetColumnChunk calls
- Add edge case tests: empty filters, AND logic, non-existent column
- Skip generated-fixture tests when bench data not present (CI compat)
- Extract ROW_GROUP_SIZE and BENCH_CATEGORIES constants

Scripts:
- Fix Avro manifest encoding (proper zigzag varints, correct map count)
- Fix O(n²) offset tracking → O(1) running fileOffset
- Add percentile bounds checking in bench.ts
- Parallelize Parquet seed uploads (concurrency limit 4)
- Add generated fixtures to .gitignore
1 parent 4478cce commit d7c637b

7 files changed

Lines changed: 348 additions & 230 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ wasm/benchmarks/
1313
wasm/fixtures/
1414
wasm/deps/
1515
wasm/examples/
16+
wasm/tests/fixtures/generated/

scripts/bench.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ interface BenchResult {
3535

3636
function percentile(sorted: number[], p: number): number {
3737
const idx = Math.ceil((p / 100) * sorted.length) - 1;
38-
return sorted[Math.max(0, idx)];
38+
return sorted[Math.max(0, Math.min(idx, sorted.length - 1))];
3939
}
4040

4141
async function runQuery(body: unknown): Promise<{ latencyMs: number; result: Record<string, unknown> }> {
@@ -60,6 +60,7 @@ async function bench(name: string, query: unknown): Promise<BenchResult> {
6060
}
6161

6262
const latencies: number[] = [];
63+
// Reset each run — only the latest successful result's metrics are reported
6364
let lastResult: Record<string, unknown> = {};
6465

6566
for (let i = 0; i < BENCH_RUNS; i++) {

scripts/generate-bench-data.ts

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,10 @@ function writeParquet(columns: Column[], rowGroupSize = ROW_GROUP_SIZE): Buffer
154154
const numRows = columns[0].values.length;
155155
const numRowGroups = Math.ceil(numRows / rowGroupSize);
156156
const parts: Buffer[] = [];
157+
let fileOffset = 0;
157158

158-
parts.push(Buffer.from("PAR1"));
159+
const addPart = (buf: Buffer) => { parts.push(buf); fileOffset += buf.length; };
160+
addPart(Buffer.from("PAR1"));
159161

160162
// Track per-row-group column chunk metadata
161163
const rowGroupMetas: {
@@ -176,10 +178,11 @@ function writeParquet(columns: Column[], rowGroupSize = ROW_GROUP_SIZE): Buffer
176178

177179
for (const col of columns) {
178180
const slice = col.values.slice(startRow, endRow);
179-
const dataPageOffset = parts.reduce((s, b) => s + b.length, 0);
181+
const dataPageOffset = fileOffset;
180182
const valBuf = encodeColumnValues(col.type, slice);
181183
const pageHeader = encodeDataPageHeader(valBuf.length, rgRows);
182-
parts.push(pageHeader, valBuf);
184+
addPart(pageHeader);
185+
addPart(valBuf);
183186
const totalSize = pageHeader.length + valBuf.length;
184187

185188
// Compute min/max for this slice
@@ -206,10 +209,11 @@ function writeParquet(columns: Column[], rowGroupSize = ROW_GROUP_SIZE): Buffer
206209

207210
// Build Thrift footer
208211
const footer = encodeParquetFooter(columns, rowGroupMetas, numRows);
209-
parts.push(footer);
212+
addPart(footer);
210213
const footerLenBuf = Buffer.alloc(4);
211214
footerLenBuf.writeUInt32LE(footer.length, 0);
212-
parts.push(footerLenBuf, Buffer.from("PAR1"));
215+
addPart(footerLenBuf);
216+
addPart(Buffer.from("PAR1"));
213217

214218
return Buffer.concat(parts);
215219
}
@@ -366,12 +370,31 @@ function generateIcebergMetadata(
366370
};
367371

368372
const pathStr = `data/${parquetPath}`;
369-
const avroContent = Buffer.from(
370-
`Obj\x01\x04\x16avro.schema\xb2\x01{"type":"record","name":"manifest_file","fields":[{"name":"manifest_path","type":"string"}]}\x00` +
371-
`\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00` +
372-
`\x02${String.fromCharCode(pathStr.length * 2)}${pathStr}` +
373-
`\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00`
374-
);
373+
// Avro zigzag-encoded long for string length: (n << 1) for small positive n
374+
const pathBytes = Buffer.from(pathStr, "utf8");
375+
const zigzagLen = encodeVarintArr(pathBytes.length << 1);
376+
const schemaJson = '{"type":"record","name":"manifest_file","fields":[{"name":"manifest_path","type":"string"}]}';
377+
const schemaBytes = Buffer.from(schemaJson, "utf8");
378+
const zigzagSchemaLen = encodeVarintArr(schemaBytes.length << 1);
379+
const syncMarker = Buffer.alloc(16); // 16-byte sync marker (zeros)
380+
const avroContent = Buffer.concat([
381+
// Avro container header
382+
Buffer.from("Obj\x01"), // magic + version
383+
Buffer.from([0x02]), // map: 1 entry (zigzag 1 = 2)
384+
// map entry: "avro.schema" -> schema JSON
385+
Buffer.from([0x16]), // key length zigzag(11) = 22
386+
Buffer.from("avro.schema"),
387+
Buffer.from(zigzagSchemaLen), // schema value length (zigzag)
388+
schemaBytes,
389+
Buffer.from([0x00]), // end of map
390+
syncMarker,
391+
// Single data block: 1 object (block count = zigzag(1) = 2)
392+
Buffer.from([0x02]), // block count: 1
393+
Buffer.from(encodeVarintArr((pathBytes.length + zigzagLen.length) << 1)), // block byte size (zigzag)
394+
Buffer.from(zigzagLen), // string length (zigzag)
395+
pathBytes, // string data
396+
syncMarker,
397+
]);
375398

376399
return { metadataJson: JSON.stringify(metadata, null, 2), manifestListAvro: avroContent };
377400
}

scripts/seed-local-r2.ts

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,24 @@ import { join, basename, relative } from "node:path";
1616
import { execSync } from "node:child_process";
1717
import { writeFileSync, unlinkSync } from "node:fs";
1818

19+
/** Run async tasks with bounded concurrency. */
20+
async function parallelLimit<T>(tasks: (() => Promise<T>)[], limit: number): Promise<PromiseSettledResult<T>[]> {
21+
const results: PromiseSettledResult<T>[] = new Array(tasks.length);
22+
let idx = 0;
23+
async function worker() {
24+
while (idx < tasks.length) {
25+
const i = idx++;
26+
try {
27+
results[i] = { status: "fulfilled", value: await tasks[i]() };
28+
} catch (reason) {
29+
results[i] = { status: "rejected", reason };
30+
}
31+
}
32+
}
33+
await Promise.all(Array.from({ length: Math.min(limit, tasks.length) }, () => worker()));
34+
return results;
35+
}
36+
1937
const BASE_URL = process.env.WORKER_URL ?? "http://localhost:8787";
2038
const FIXTURES = join(import.meta.dirname, "../wasm/tests/fixtures");
2139
const GENERATED = join(FIXTURES, "generated");
@@ -126,27 +144,33 @@ async function main(): Promise<void> {
126144
}
127145
}
128146

129-
// --- 2. Seed fixture Parquet files ---
147+
// --- 2. Seed fixture Parquet files (parallel, concurrency 4) ---
130148
const parquetFiles = readdirSync(FIXTURES)
131149
.filter(f => f.endsWith(".parquet"))
132150
.map(f => join(FIXTURES, f));
133151

134-
for (const file of parquetFiles) {
135-
try { await seedParquetFile(file); } catch (err) {
136-
console.log(` SKIP ${basename(file)}: ${String(err).slice(0, 80)}`);
137-
}
138-
}
152+
await parallelLimit(
153+
parquetFiles.map(file => async () => {
154+
try { await seedParquetFile(file); } catch (err) {
155+
console.log(` SKIP ${basename(file)}: ${String(err).slice(0, 80)}`);
156+
}
157+
}),
158+
4,
159+
);
139160

140-
// --- 3. Seed generated benchmark data ---
161+
// --- 3. Seed generated benchmark data (parallel, concurrency 4) ---
141162
if (existsSync(GENERATED)) {
142163
const genParquets = readdirSync(GENERATED)
143164
.filter(f => f.endsWith(".parquet"))
144165
.map(f => join(GENERATED, f));
145-
for (const file of genParquets) {
146-
try { await seedParquetFile(file); } catch (err) {
147-
console.log(` SKIP ${basename(file)}: ${String(err).slice(0, 80)}`);
148-
}
149-
}
166+
await parallelLimit(
167+
genParquets.map(file => async () => {
168+
try { await seedParquetFile(file); } catch (err) {
169+
console.log(` SKIP ${basename(file)}: ${String(err).slice(0, 80)}`);
170+
}
171+
}),
172+
4,
173+
);
150174

151175
// Iceberg tables
152176
const icebergDirs = readdirSync(GENERATED)
@@ -182,15 +206,18 @@ async function main(): Promise<void> {
182206
if (existsSync(GENERATED)) {
183207
allParquets.push(...readdirSync(GENERATED).filter(f => f.endsWith(".parquet")));
184208
}
185-
for (const name of allParquets) {
186-
try {
187-
await fetch(`${BASE_URL}/write`, {
188-
method: "POST",
189-
headers: { "content-type": "application/json" },
190-
body: JSON.stringify({ r2Key: name }),
191-
});
192-
} catch {}
193-
}
209+
await parallelLimit(
210+
allParquets.map(name => async () => {
211+
try {
212+
await fetch(`${BASE_URL}/write`, {
213+
method: "POST",
214+
headers: { "content-type": "application/json" },
215+
body: JSON.stringify({ r2Key: name }),
216+
});
217+
} catch {}
218+
}),
219+
4,
220+
);
194221
await new Promise(r => setTimeout(r, 500));
195222

196223
// Verify

0 commit comments

Comments
 (0)