Skip to content

Commit 90d3644

Browse files
committed
refactor: extract countColumnRows helper + QueryModeError in fragment-do
- types.ts: new countColumnRows(columns) helper replaces 7 duplicated `columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0` patterns
- local-executor, query-do, master-do: all 7 call sites use the helper
- fragment-do: 2 plain Error throws → QueryModeError (MEMORY_EXCEEDED, QUERY_FAILED)
- master-do: 1 missed plain Error → QueryModeError (SCHEMA_MISMATCH)
1 parent 98a055f commit 90d3644

File tree

6 files changed

+20
-14
lines changed

6 files changed

+20
-14
lines changed

src/fragment-do.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
buildKeptPageIndices,
1212
} from "./operators.js";
1313
import { mergeQueryResults } from "./merge.js";
14+
import { QueryModeError } from "./errors.js";
1415
import { resolveBucket } from "./bucket.js";
1516
import { concatQMCBBatches, decodeColumnarBatch, columnarBatchToRows } from "./columnar.js";
1617
import wasmModule from "./wasm-module.js";
@@ -187,12 +188,12 @@ export class FragmentDO extends DurableObject<Env> {
187188
pageInfos: keptPageIndices.map(pi => col.pages[pi]).filter(Boolean),
188189
}));
189190
if (!this.wasmEngine.registerColumns(fragTable, fragColEntries)) {
190-
throw new Error(`WASM OOM: failed to register columns for fragment "${r2Key}"`);
191+
throw new QueryModeError("MEMORY_EXCEEDED", `WASM OOM: failed to register columns for fragment "${r2Key}"`);
191192
}
192193

193194
const fragQuery = { ...query, table: fragTable };
194195
const qmcb = this.wasmEngine.executeQueryColumnar(fragQuery);
195-
if (!qmcb) throw new Error(`WASM query execution failed for fragment "${r2Key}"`);
196+
if (!qmcb) throw new QueryModeError("QUERY_FAILED", `WASM query execution failed for fragment "${r2Key}"`);
196197
this.wasmEngine.clearTable(fragTable);
197198
allBatches.push(qmcb);
198199
totalWasmExecMs += Date.now() - wasmStart;

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { DataFrame, TableQuery } from "./client.js";
22
import type { QueryDescriptor, QueryExecutor } from "./client.js";
33
import type { AppendOptions, AppendResult, DropResult, ExplainResult, QueryResult, Row, QueryDORpc, MasterDORpc } from "./types.js";
4-
import { NULL_SENTINEL } from "./types.js";
4+
import { NULL_SENTINEL, countColumnRows } from "./types.js";
55
import { LocalExecutor } from "./local-executor.js";
66
import { createFromJSON, createFromCSV, createDemo } from "./convenience.js";
77
import { sqlToDescriptor, buildSqlDataFrame } from "./sql/index.js";

src/local-executor.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
*/
88
import type { QueryDescriptor, QueryExecutor } from "./client.js";
99
import type { AppendResult, ColumnMeta, DataType, DiffResult, ExplainResult, PageInfo, QueryResult, Row, TableMeta, DatasetMeta, VersionInfo } from "./types.js";
10-
import { queryReferencedColumns, queryCacheKey, NULL_SENTINEL } from "./types.js";
10+
import { queryReferencedColumns, queryCacheKey, countColumnRows, NULL_SENTINEL } from "./types.js";
1111
import { parseFooter, parseColumnMetaFromProtobuf, FOOTER_SIZE } from "./footer.js";
1212
import { parseManifest } from "./manifest.js";
1313
import { detectFormat, getParquetFooterLength, parseParquetFooter, parquetMetaToTableMeta } from "./parquet.js";
@@ -149,7 +149,7 @@ export class LocalExecutor implements QueryExecutor {
149149
async count(query: QueryDescriptor): Promise<number> {
150150
const meta = await this.getOrLoadMeta(query.table);
151151
if (query.filters.length === 0 && !query.filterGroups?.length && !query.aggregates?.length && !query.groupBy?.length && !query.distinct && !query.join && !query.vectorSearch && !query.setOperation && !query.subqueryIn && !query.computedColumns?.length) {
152-
return meta.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
152+
return countColumnRows(meta.columns);
153153
}
154154
// With filters: fall through to aggregate path
155155
const desc = { ...query, aggregates: [{ fn: "count" as const, column: "*" }] };
@@ -206,7 +206,7 @@ export class LocalExecutor implements QueryExecutor {
206206

207207
const coalesced = coalesceRanges(ranges);
208208
const estimatedBytes = ranges.reduce((s, r) => s + r.length, 0);
209-
const totalRows = columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
209+
const totalRows = countColumnRows(columns);
210210

211211
// Detect format from file
212212
const format = await this.detectFileFormat(query.table);

src/master-do.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DurableObject } from "cloudflare:workers";
22
import type { ColumnMeta, Env, Footer, TableMeta, DatasetMeta, AppendResult, AppendOptions, DropResult } from "./types.js";
3-
import { NULL_SENTINEL } from "./types.js";
3+
import { NULL_SENTINEL, countColumnRows } from "./types.js";
44
import { parseFooter, parseColumnMetaFromProtobuf, FOOTER_SIZE } from "./footer.js";
55
import { parseManifest } from "./manifest.js";
66
import { detectFormat, getParquetFooterLength, parseParquetFooter, parquetMetaToTableMeta } from "./parquet.js";
@@ -54,7 +54,7 @@ export class MasterDO extends DurableObject<Env> {
5454
if (!result) throw new QueryModeError("INVALID_FORMAT", `Failed to read footer for "${r2Key}"`);
5555

5656
const tableName = r2Key.replace(/\.(lance|parquet)$/, "").split("/").pop() ?? r2Key;
57-
const totalRows = result.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
57+
const totalRows = countColumnRows(result.columns);
5858
const meta: TableMeta = {
5959
name: tableName, footer: result.parsed, format: result.format, columns: result.columns,
6060
totalRows, fileSize: result.fileSize, r2Key, updatedAt: Date.now(),
@@ -150,7 +150,7 @@ export class MasterDO extends DurableObject<Env> {
150150

151151
// Convert row-major to column-major
152152
const columnArrays = rowsToColumnArrays(rows);
153-
if (columnArrays.length === 0) throw new Error("No valid columns found");
153+
if (columnArrays.length === 0) throw new QueryModeError("SCHEMA_MISMATCH", "No valid columns found in rows");
154154

155155
// Build Lance fragment via WASM
156156
const fragmentBytes = wasm.buildFragment(columnArrays);

src/query-do.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DurableObject } from "cloudflare:workers";
22
import type { ColumnMeta, DataType, Env, ExplainResult, FilterOp, Footer, Row, TableMeta, DatasetMeta, IcebergDatasetMeta, QueryResult } from "./types.js";
3-
import { queryReferencedColumns, queryCacheKey, NULL_SENTINEL } from "./types.js";
3+
import { queryReferencedColumns, queryCacheKey, countColumnRows, NULL_SENTINEL } from "./types.js";
44
import type { QueryDescriptor } from "./client.js";
55
import { parseFooter, parseColumnMetaFromProtobuf } from "./footer.js";
66
import { parseManifest, logicalTypeToDataType } from "./manifest.js";
@@ -589,7 +589,7 @@ export class QueryDO extends DurableObject<Env> {
589589
const columns = await this.readColumnMeta(r2Key, footer);
590590
meta = {
591591
name: table, footer, format: "lance", columns,
592-
totalRows: columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0,
592+
totalRows: countColumnRows(columns),
593593
fileSize, r2Key, updatedAt,
594594
};
595595
}
@@ -639,7 +639,7 @@ export class QueryDO extends DurableObject<Env> {
639639
}
640640

641641
// Use totalRows from broadcast if available and > 0, otherwise fall back to column computation
642-
const computedRows = columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
642+
const computedRows = countColumnRows(columns);
643643
const totalRows = (body.totalRows != null && body.totalRows > 0) ? body.totalRows : computedRows;
644644

645645
const meta: TableMeta = {
@@ -665,7 +665,7 @@ export class QueryDO extends DurableObject<Env> {
665665
if (query.filters.length === 0 && !query.filterGroups?.length && !query.aggregates?.length && !query.groupBy?.length && !query.distinct && !query.join && !query.vectorSearch && !query.setOperation && !query.subqueryIn && !query.computedColumns?.length) {
666666
const meta = this.footerCache.get(query.table)
667667
?? (await this.loadTableFromR2(query.table)) ?? undefined;
668-
if (meta) return meta.columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? meta.totalRows;
668+
if (meta) return countColumnRows(meta.columns) || meta.totalRows;
669669
}
670670
const countQuery = { ...query, aggregates: [{ fn: "count" as const, column: "*" }] };
671671
const result = await this.executeQuery(countQuery);
@@ -1455,7 +1455,7 @@ export class QueryDO extends DurableObject<Env> {
14551455
const columns = await this.readColumnMeta(r2Key, footer);
14561456
const meta: TableMeta = {
14571457
name: tableName, footer, format: "lance", columns,
1458-
totalRows: columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0,
1458+
totalRows: countColumnRows(columns),
14591459
fileSize, r2Key, updatedAt: Date.now(),
14601460
};
14611461
this.footerCache.set(tableName, meta);

src/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ export interface ColumnMeta {
5757
listDimension?: number;
5858
}
5959

60+
/** Returns the row count obtained by summing `rowCount` over the pages of the first column only (`columns[0]`); yields 0 when `columns` is empty. */
61+
export function countColumnRows(columns: ColumnMeta[]): number {
62+
return columns[0]?.pages.reduce((s, p) => s + p.rowCount, 0) ?? 0;
63+
}
64+
6065
/** Parquet-specific encoding info attached to pages */
6166
export interface PageEncoding {
6267
encoding?: string; // "PLAIN" | "RLE_DICTIONARY" | ...

0 commit comments

Comments
 (0)