Skip to content

Commit ca66f84

Browse files
committed
fix: QueryModeError in master-do + concatColumnarBatches offsets guard
- master-do: all 8 throw sites use QueryModeError with proper codes (TABLE_NOT_FOUND, INVALID_FORMAT, QUERY_FAILED) and descriptive messages - columnar: concatColumnarBatches UTF8 path guards against missing offsets instead of non-null assertion crash
1 parent 7ba9261 commit ca66f84

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed

src/columnar.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -499,17 +499,30 @@ export function concatColumnarBatches(batches: ColumnarBatch[]): ColumnarBatch |
499499
let row = 0;
500500
for (const batch of batches) {
501501
const col = batch.columns[ci];
502-
const srcOffsets = col.offsets!;
503502
const srcData = new Uint8Array(col.data);
504-
for (let r = 0; r < batch.rowCount; r++) {
503+
if (col.offsets) {
504+
for (let r = 0; r < batch.rowCount; r++) {
505+
offsets[row] = strOffset;
506+
const start = col.offsets[r];
507+
const end = col.offsets[r + 1];
508+
if (end > start) {
509+
strBuf.set(srcData.subarray(start, end), strOffset);
510+
strOffset += end - start;
511+
}
512+
row++;
513+
}
514+
} else {
515+
// No offsets — treat entire data as a single string for row 0, empty for rest
505516
offsets[row] = strOffset;
506-
const start = srcOffsets[r];
507-
const end = srcOffsets[r + 1];
508-
if (end > start) {
509-
strBuf.set(srcData.subarray(start, end), strOffset);
510-
strOffset += end - start;
517+
if (srcData.byteLength > 0) {
518+
strBuf.set(srcData, strOffset);
519+
strOffset += srcData.byteLength;
511520
}
512521
row++;
522+
for (let r = 1; r < batch.rowCount; r++) {
523+
offsets[row] = strOffset;
524+
row++;
525+
}
513526
}
514527
}
515528
offsets[totalRows] = strOffset;

src/master-do.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type { QueryDORpc } from "./types.js";
88
import { instantiateWasm, rowsToColumnArrays, type WasmEngine } from "./wasm-engine.js";
99
import { resolveBucket } from "./bucket.js";
1010
import { withTimeout } from "./coalesce.js";
11+
import { QueryModeError } from "./errors.js";
1112
import wasmModule from "./wasm-module.js";
1213

1314
const textEncoder = new TextEncoder();
@@ -41,7 +42,7 @@ export class MasterDO extends DurableObject<Env> {
4142
async writeRpc(body: unknown): Promise<unknown> {
4243
const { r2Key } = body as { r2Key: string };
4344
if (!r2Key || typeof r2Key !== "string" || r2Key.includes("..")) {
44-
throw new Error("Invalid r2Key");
45+
throw new QueryModeError("QUERY_FAILED", "Invalid r2Key");
4546
}
4647

4748
// Check if this is a dataset directory (ends with / or .lance/)
@@ -50,7 +51,7 @@ export class MasterDO extends DurableObject<Env> {
5051
}
5152

5253
const result = await this.readFooterAndColumns(r2Key);
53-
if (!result) throw new Error("Failed to read footer");
54+
if (!result) throw new QueryModeError("INVALID_FORMAT", `Failed to read footer for "${r2Key}"`);
5455

5556
const tableName = r2Key.replace(/\.(lance|parquet)$/, "").split("/").pop() ?? r2Key;
5657
const totalRows = result.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
@@ -72,14 +73,14 @@ export class MasterDO extends DurableObject<Env> {
7273
const manifestKeys = listed.objects
7374
.filter(o => o.key.endsWith(".manifest"))
7475
.sort((a, b) => { const na = parseInt(a.key.split("/").pop()!, 10); const nb = parseInt(b.key.split("/").pop()!, 10); return na - nb; });
75-
if (manifestKeys.length === 0) throw new Error("No manifests found");
76+
if (manifestKeys.length === 0) throw new QueryModeError("TABLE_NOT_FOUND", `No manifests found in "${r2Prefix}"`);
7677

7778
const latestKey = manifestKeys[manifestKeys.length - 1].key;
7879
const manifestObj = await resolveBucket(this.env, latestKey).get(latestKey);
79-
if (!manifestObj) throw new Error("Failed to read manifest");
80+
if (!manifestObj) throw new QueryModeError("TABLE_NOT_FOUND", `Failed to read manifest "${latestKey}"`);
8081

8182
const manifest = parseManifest(await manifestObj.arrayBuffer());
82-
if (!manifest) throw new Error("Failed to parse manifest");
83+
if (!manifest) throw new QueryModeError("INVALID_FORMAT", `Failed to parse manifest "${latestKey}"`);
8384

8485
// Read first fragment's footer to broadcast (Query DOs will discover the rest)
8586
if (manifest.fragments.length > 0) {
@@ -111,7 +112,7 @@ export class MasterDO extends DurableObject<Env> {
111112

112113
/** Core append logic. Supports partitioned writes via options.partitionBy. */
113114
private async executeAppend(table: string, rows: Record<string, unknown>[], options?: AppendOptions): Promise<AppendResult> {
114-
if (!rows?.length) throw new Error("No rows provided");
115+
if (!rows?.length) throw new QueryModeError("QUERY_FAILED", "No rows provided for append");
115116

116117
// Partition-aware ingest: split rows by partition value, write separate fragments
117118
if (options?.partitionBy) {
@@ -245,7 +246,7 @@ export class MasterDO extends DurableObject<Env> {
245246
}
246247
}
247248

248-
throw new Error("CAS failed after max retries");
249+
throw new QueryModeError("QUERY_FAILED", `CAS failed after ${MAX_RETRIES} retries for table "${table}"`);
249250
}
250251

251252
/** Build a simple binary manifest for the _versions/ directory. */
@@ -317,10 +318,10 @@ export class MasterDO extends DurableObject<Env> {
317318
async refreshRpc(body: unknown): Promise<unknown> {
318319
const { r2Key } = body as { r2Key: string };
319320
if (!r2Key || typeof r2Key !== "string" || r2Key.includes("..")) {
320-
throw new Error("Invalid r2Key");
321+
throw new QueryModeError("QUERY_FAILED", "Invalid r2Key");
321322
}
322323
const result = await this.readFooterAndColumns(r2Key);
323-
if (!result) throw new Error("Failed to read footer");
324+
if (!result) throw new QueryModeError("INVALID_FORMAT", `Failed to read footer for "${r2Key}"`);
324325

325326
const tableName = r2Key.replace(/\.(lance|parquet)$/, "").split("/").pop() ?? r2Key;
326327
await this.broadcast(tableName, r2Key, result);

0 commit comments

Comments
 (0)