Skip to content

Commit 9f814db

Browse files
committed
refactor: extract shared queryCacheKey to types.ts — deduplicate FNV-1a hash
local-executor.ts and query-do.ts each had identical 37-line private queryCacheKey methods. Extracted to shared function in types.ts with compatible type signature (accepts both QueryDescriptor and lighter inline types).
1 parent 4b170f9 commit 9f814db

File tree

3 files changed

+67
-84
lines changed

3 files changed

+67
-84
lines changed

src/local-executor.ts

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
*/
88
import type { QueryDescriptor, QueryExecutor } from "./client.js";
99
import type { AppendResult, ColumnMeta, DataType, DiffResult, ExplainResult, PageInfo, QueryResult, Row, TableMeta, DatasetMeta, VersionInfo } from "./types.js";
10-
import { queryReferencedColumns, NULL_SENTINEL } from "./types.js";
10+
import { queryReferencedColumns, queryCacheKey, NULL_SENTINEL } from "./types.js";
1111
import { parseFooter, parseColumnMetaFromProtobuf, FOOTER_SIZE } from "./footer.js";
1212
import { parseManifest } from "./manifest.js";
1313
import { detectFormat, getParquetFooterLength, parseParquetFooter, parquetMetaToTableMeta } from "./parquet.js";
@@ -346,52 +346,14 @@ export class LocalExecutor implements QueryExecutor {
346346
return cached;
347347
}
348348

349-
/** Build a cache key from query descriptor — no serialization. */
350-
private queryCacheKey(query: QueryDescriptor): string {
351-
let h = 0x811c9dc5;
352-
const feed = (s: string) => { for (let i = 0; i < s.length; i++) { h ^= s.charCodeAt(i); h = Math.imul(h, 0x01000193); } };
353-
feed(query.table); feed("\0");
354-
if (query.version !== undefined) { feed(`v${query.version}`); feed("\0"); }
355-
for (const f of [...query.filters].sort((a, b) => a.column.localeCompare(b.column) || a.op.localeCompare(b.op))) {
356-
feed(f.column); feed("\0"); feed(f.op); feed("\0"); feed(String(f.value)); feed("\0");
357-
}
358-
if (query.filterGroups) {
359-
for (const group of query.filterGroups) {
360-
feed("|");
361-
for (const f of [...group].sort((a, b) => a.column.localeCompare(b.column) || a.op.localeCompare(b.op))) {
362-
feed(f.column); feed("\0"); feed(f.op); feed("\0"); feed(String(f.value)); feed("\0");
363-
}
364-
}
365-
}
366-
for (const p of [...query.projections].sort()) { feed(p); feed("\0"); }
367-
if (query.sortColumn) { feed(query.sortColumn); feed("\0"); feed(query.sortDirection ?? "asc"); feed("\0"); }
368-
if (query.limit !== undefined) { feed(String(query.limit)); feed("\0"); }
369-
if (query.offset !== undefined) { feed(String(query.offset)); feed("\0"); }
370-
if (query.aggregates) for (const a of query.aggregates) { feed(a.fn); feed("\0"); feed(a.column); feed("\0"); if (a.alias) feed(a.alias); feed("\0"); }
371-
if (query.groupBy) for (const g of query.groupBy) { feed(g); feed("\0"); }
372-
if (query.distinct) for (const d of query.distinct) { feed(d); feed("\0"); }
373-
if (query.windows) for (const w of query.windows) {
374-
feed(w.fn); feed("\0"); feed(w.alias); feed("\0"); feed(w.column ?? NULL_SENTINEL); feed("\0");
375-
if (w.partitionBy) for (const p of w.partitionBy) { feed(p); feed("\0"); }
376-
if (w.orderBy) for (const o of w.orderBy) { feed(o.column); feed(o.direction); feed("\0"); }
377-
if (w.frame) { feed(w.frame.type); feed(String(w.frame.start)); feed(String(w.frame.end)); feed("\0"); }
378-
if (w.args?.offset !== undefined) { feed(String(w.args.offset)); feed("\0"); }
379-
if (w.args?.default_ !== undefined) { feed(String(w.args.default_)); feed("\0"); }
380-
}
381-
if (query.computedColumns) for (const cc of query.computedColumns) { feed(cc.alias); feed("\0"); if (cc.fn) { feed(cc.fn.toString()); feed("\0"); } }
382-
if (query.setOperation) { feed(query.setOperation.mode); feed("\0"); feed(this.queryCacheKey(query.setOperation.right)); feed("\0"); }
383-
if (query.subqueryIn) for (const sq of query.subqueryIn) { feed(sq.column); feed("\0"); for (const v of sq.valueSet) { feed(v); feed("\0"); } }
384-
if (query.join) { feed(query.join.type ?? "inner"); feed("\0"); feed(query.join.leftKey); feed("\0"); feed(query.join.rightKey); feed("\0"); feed(this.queryCacheKey(query.join.right)); feed("\0"); }
385-
return `qr:${query.table}:${(h >>> 0).toString(36)}`;
386-
}
387349

388350
async execute(query: QueryDescriptor): Promise<QueryResult> {
389351
const startTime = Date.now();
390352
const isUrl = query.table.startsWith("http://") || query.table.startsWith("https://");
391353

392354
// Check result cache (skip for vector search — non-deterministic with IVF-PQ)
393355
if (query.cacheTTL && !query.vectorSearch) {
394-
const cacheKey = this.queryCacheKey(query);
356+
const cacheKey = queryCacheKey(query);
395357
const cached = this.resultCache.get(cacheKey);
396358
if (cached) return { ...cached, cacheHit: true, durationMs: Date.now() - startTime };
397359
}
@@ -492,7 +454,7 @@ export class LocalExecutor implements QueryExecutor {
492454
};
493455

494456
if (query.cacheTTL) {
495-
this.resultCache.setWithTTL(this.queryCacheKey(query), result, query.cacheTTL);
457+
this.resultCache.setWithTTL(queryCacheKey(query), result, query.cacheTTL);
496458
}
497459

498460
return result;

src/query-do.ts

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DurableObject } from "cloudflare:workers";
22
import type { ColumnMeta, DataType, Env, ExplainResult, FilterOp, Footer, Row, TableMeta, DatasetMeta, IcebergDatasetMeta, QueryResult } from "./types.js";
3-
import { queryReferencedColumns, NULL_SENTINEL } from "./types.js";
3+
import { queryReferencedColumns, queryCacheKey, NULL_SENTINEL } from "./types.js";
44
import type { QueryDescriptor } from "./client.js";
55
import { parseFooter, parseColumnMetaFromProtobuf } from "./footer.js";
66
import { parseManifest, logicalTypeToDataType } from "./manifest.js";
@@ -652,45 +652,6 @@ export class QueryDO extends DurableObject<Env> {
652652
await this.ctx.storage.put(`table:${body.table}`, meta);
653653
}
654654

655-
private queryKey(query: QueryDescriptor): string {
656-
// FNV-1a hash over query components — no JSON serialization
657-
let h = 0x811c9dc5;
658-
const feed = (s: string) => { for (let i = 0; i < s.length; i++) { h ^= s.charCodeAt(i); h = Math.imul(h, 0x01000193); } };
659-
feed(query.table); feed("\0");
660-
if (query.version !== undefined) { feed(`v${query.version}`); feed("\0"); }
661-
for (const f of [...query.filters].sort((a, b) => a.column.localeCompare(b.column) || a.op.localeCompare(b.op))) {
662-
feed(f.column); feed("\0"); feed(f.op); feed("\0"); feed(String(f.value)); feed("\0");
663-
}
664-
if (query.filterGroups) {
665-
for (const group of query.filterGroups) {
666-
feed("|");
667-
for (const f of [...group].sort((a, b) => a.column.localeCompare(b.column) || a.op.localeCompare(b.op))) {
668-
feed(f.column); feed("\0"); feed(f.op); feed("\0"); feed(String(f.value)); feed("\0");
669-
}
670-
}
671-
}
672-
for (const p of [...query.projections].sort()) { feed(p); feed("\0"); }
673-
if (query.sortColumn) { feed(query.sortColumn); feed("\0"); feed(query.sortDirection ?? "asc"); feed("\0"); }
674-
if (query.limit !== undefined) { feed(String(query.limit)); feed("\0"); }
675-
if (query.offset !== undefined) { feed(String(query.offset)); feed("\0"); }
676-
if (query.aggregates) for (const a of query.aggregates) { feed(a.fn); feed("\0"); feed(a.column); feed("\0"); if (a.alias) feed(a.alias); feed("\0"); }
677-
if (query.groupBy) for (const g of query.groupBy) { feed(g); feed("\0"); }
678-
if (query.distinct) for (const d of query.distinct) { feed(d); feed("\0"); }
679-
if (query.windows) for (const w of query.windows) {
680-
feed(w.fn); feed("\0"); feed(w.alias); feed("\0"); feed(w.column ?? NULL_SENTINEL); feed("\0");
681-
if (w.partitionBy) for (const p of w.partitionBy) { feed(p); feed("\0"); }
682-
if (w.orderBy) for (const o of w.orderBy) { feed(o.column); feed(o.direction); feed("\0"); }
683-
if (w.frame) { feed(w.frame.type); feed(String(w.frame.start)); feed(String(w.frame.end)); feed("\0"); }
684-
if (w.args?.offset !== undefined) { feed(String(w.args.offset)); feed("\0"); }
685-
if (w.args?.default_ !== undefined) { feed(String(w.args.default_)); feed("\0"); }
686-
}
687-
if (query.computedColumns) for (const cc of query.computedColumns) { feed(cc.alias); feed("\0"); if (cc.fn) { feed(cc.fn.toString()); feed("\0"); } }
688-
if (query.setOperation) { feed(query.setOperation.mode); feed("\0"); feed(this.queryKey(query.setOperation.right)); feed("\0"); }
689-
if (query.subqueryIn) for (const sq of query.subqueryIn) { feed(sq.column); feed("\0"); for (const v of sq.valueSet) { feed(v); feed("\0"); } }
690-
if (query.join) { feed(query.join.type ?? "inner"); feed("\0"); feed(query.join.leftKey); feed("\0"); feed(query.join.rightKey); feed("\0"); feed(this.queryKey(query.join.right)); feed("\0"); }
691-
return `qr:${query.table}:${(h >>> 0).toString(36)}`;
692-
}
693-
694655
private parseQuery(body: unknown): QueryDescriptor {
695656
return parseAndValidateQuery(body) as QueryDescriptor;
696657
}
@@ -818,15 +779,15 @@ export class QueryDO extends DurableObject<Env> {
818779
// Result cache check (skip for vector search)
819780
const cacheable = !!(query.cacheTTL && !query.vectorSearch);
820781
if (cacheable) {
821-
const cached = this.resultCache.get(this.queryKey(query));
782+
const cached = this.resultCache.get(queryCacheKey(query));
822783
if (cached) return { ...cached, durationMs: 0 };
823784
}
824785

825786
// Join path: use operator pipeline with R2 spill
826787
if (query.join) {
827788
const result = await this.executeJoin(query, t0);
828789
if (cacheable) {
829-
this.resultCache.setWithTTL(this.queryKey(query), result, query.cacheTTL!);
790+
this.resultCache.setWithTTL(queryCacheKey(query), result, query.cacheTTL!);
830791
}
831792
return result;
832793
}
@@ -874,7 +835,7 @@ export class QueryDO extends DurableObject<Env> {
874835
}
875836

876837
if (cacheable) {
877-
this.resultCache.setWithTTL(this.queryKey(query), result, query.cacheTTL!);
838+
this.resultCache.setWithTTL(queryCacheKey(query), result, query.cacheTTL!);
878839
}
879840
return result;
880841
}

src/types.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,66 @@ export interface QueryDORpc {
435435
registerIcebergRpc(body: unknown): Promise<unknown>;
436436
}
437437

438+
/**
439+
* FNV-1a cache key for a query descriptor — deterministic hash over all query fields.
440+
* Shared by local-executor and query-do for result cache dedup.
441+
*/
442+
export function queryCacheKey(query: {
443+
table: string;
444+
version?: number;
445+
filters: FilterOp[];
446+
filterGroups?: FilterOp[][];
447+
projections: string[];
448+
sortColumn?: string;
449+
sortDirection?: string;
450+
limit?: number;
451+
offset?: number;
452+
aggregates?: AggregateOp[];
453+
groupBy?: string[];
454+
distinct?: string[];
455+
windows?: WindowSpec[];
456+
computedColumns?: { alias: string; fn?: ((...args: never[]) => unknown) }[];
457+
setOperation?: { mode: string; right: unknown };
458+
subqueryIn?: { column: string; valueSet: Set<string> | string[] }[];
459+
join?: { type?: string; leftKey: string; rightKey: string; right: unknown };
460+
}): string {
461+
let h = 0x811c9dc5;
462+
const feed = (s: string) => { for (let i = 0; i < s.length; i++) { h ^= s.charCodeAt(i); h = Math.imul(h, 0x01000193); } };
463+
feed(query.table); feed("\0");
464+
if (query.version !== undefined) { feed(`v${query.version}`); feed("\0"); }
465+
for (const f of [...query.filters].sort((a, b) => a.column.localeCompare(b.column) || a.op.localeCompare(b.op))) {
466+
feed(f.column); feed("\0"); feed(f.op); feed("\0"); feed(String(f.value)); feed("\0");
467+
}
468+
if (query.filterGroups) {
469+
for (const group of query.filterGroups) {
470+
feed("|");
471+
for (const f of [...group].sort((a, b) => a.column.localeCompare(b.column) || a.op.localeCompare(b.op))) {
472+
feed(f.column); feed("\0"); feed(f.op); feed("\0"); feed(String(f.value)); feed("\0");
473+
}
474+
}
475+
}
476+
for (const p of [...query.projections].sort()) { feed(p); feed("\0"); }
477+
if (query.sortColumn) { feed(query.sortColumn); feed("\0"); feed(query.sortDirection ?? "asc"); feed("\0"); }
478+
if (query.limit !== undefined) { feed(String(query.limit)); feed("\0"); }
479+
if (query.offset !== undefined) { feed(String(query.offset)); feed("\0"); }
480+
if (query.aggregates) for (const a of query.aggregates) { feed(a.fn); feed("\0"); feed(a.column); feed("\0"); if (a.alias) feed(a.alias); feed("\0"); }
481+
if (query.groupBy) for (const g of query.groupBy) { feed(g); feed("\0"); }
482+
if (query.distinct) for (const d of query.distinct) { feed(d); feed("\0"); }
483+
if (query.windows) for (const w of query.windows) {
484+
feed(w.fn); feed("\0"); feed(w.alias); feed("\0"); feed(w.column ?? NULL_SENTINEL); feed("\0");
485+
if (w.partitionBy) for (const p of w.partitionBy) { feed(p); feed("\0"); }
486+
if (w.orderBy) for (const o of w.orderBy) { feed(o.column); feed(o.direction); feed("\0"); }
487+
if (w.frame) { feed(w.frame.type); feed(String(w.frame.start)); feed(String(w.frame.end)); feed("\0"); }
488+
if (w.args?.offset !== undefined) { feed(String(w.args.offset)); feed("\0"); }
489+
if (w.args?.default_ !== undefined) { feed(String(w.args.default_)); feed("\0"); }
490+
}
491+
if (query.computedColumns) for (const cc of query.computedColumns) { feed(cc.alias); feed("\0"); if (cc.fn) { feed(cc.fn.toString()); feed("\0"); } }
492+
if (query.setOperation) { feed(query.setOperation.mode); feed("\0"); feed(queryCacheKey(query.setOperation.right as Parameters<typeof queryCacheKey>[0])); feed("\0"); }
493+
if (query.subqueryIn) for (const sq of query.subqueryIn) { feed(sq.column); feed("\0"); for (const v of sq.valueSet) { feed(v); feed("\0"); } }
494+
if (query.join) { feed(query.join.type ?? "inner"); feed("\0"); feed(query.join.leftKey); feed("\0"); feed(query.join.rightKey); feed("\0"); feed(queryCacheKey(query.join.right as Parameters<typeof queryCacheKey>[0])); feed("\0"); }
495+
return `qr:${query.table}:${(h >>> 0).toString(36)}`;
496+
}
497+
438498
/** RPC interface exposed by MasterDO for zero-serialization calls */
439499
export interface MasterDORpc {
440500
appendRpc(table: string, rows: Record<string, unknown>[], options?: AppendOptions): Promise<AppendResult>;

0 commit comments

Comments
 (0)