Skip to content

Commit d7f341f

Browse files
committed
feat: persist partition catalogs in DO storage + expose in explain()
- PartitionCatalog.serialize()/deserialize() for durable storage persistence - Catalogs saved to ctx.storage on build, restored on DO wake (ensureInitialized) - Survives DO hibernation — no rebuild needed across requests - ExplainResult.partitionCatalog shows column + partition value count - At PB scale: catalog persists across millions of queries without rebuild cost
1 parent 6bc13d7 commit d7f341f

File tree

3 files changed

+31
-0
lines changed

3 files changed

+31
-0
lines changed

src/partition-catalog.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,23 @@ export class PartitionCatalog {
142142
}
143143
}
144144

145+
/** Serialize to a plain object for DO durable storage. */
146+
serialize(): { column: string; index: Record<string, number[]>; allFragmentIds: number[] } {
147+
const index: Record<string, number[]> = {};
148+
for (const [key, ids] of this.index) index[key] = ids;
149+
return { column: this.column, index, allFragmentIds: this.allFragmentIds };
150+
}
151+
152+
/** Restore from serialized form. */
153+
static deserialize(data: { column: string; index: Record<string, number[]>; allFragmentIds: number[] }): PartitionCatalog {
154+
const catalog = new PartitionCatalog(data.column);
155+
for (const [key, ids] of Object.entries(data.index)) {
156+
catalog.index.set(key, ids);
157+
}
158+
catalog.allFragmentIds = data.allFragmentIds;
159+
return catalog;
160+
}
161+
145162
/** Stats for diagnostics. */
146163
stats(): { column: string; partitionValues: number; fragments: number; indexSizeBytes: number } {
147164
let indexSizeBytes = 0;

src/query-do.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ export class QueryDO extends DurableObject<Env> {
484484
if (partitionBy) {
485485
const catalog = PartitionCatalog.fromFragments(partitionBy, fragmentMetas);
486486
this.partitionCatalogs.set(tableName, catalog);
487+
this.ctx.storage.put(`pcatalog:${tableName}`, catalog.serialize());
487488
this.log("info", "partition_catalog_built", { ...catalog.stats(), source: "explicit" });
488489
return;
489490
}
@@ -516,6 +517,7 @@ export class QueryDO extends DurableObject<Env> {
516517
if (bestColumn) {
517518
const catalog = PartitionCatalog.fromFragments(bestColumn, fragmentMetas);
518519
this.partitionCatalogs.set(tableName, catalog);
520+
this.ctx.storage.put(`pcatalog:${tableName}`, catalog.serialize());
519521
this.log("info", "partition_catalog_built", catalog.stats());
520522
}
521523
}
@@ -540,6 +542,13 @@ export class QueryDO extends DurableObject<Env> {
540542
const stored = await this.ctx.storage.list<TableMeta>({ prefix: "table:" });
541543
for (const [key, meta] of stored) this.footerCache.set(key.replace("table:", ""), meta);
542544

545+
// Restore persisted partition catalogs
546+
const catalogs = await this.ctx.storage.list<ReturnType<PartitionCatalog["serialize"]>>({ prefix: "pcatalog:" });
547+
for (const [key, data] of catalogs) {
548+
const tableName = key.replace("pcatalog:", "");
549+
this.partitionCatalogs.set(tableName, PartitionCatalog.deserialize(data));
550+
}
551+
543552
this.wasmEngine = await instantiateWasm(wasmModule);
544553

545554
// Register with Master for invalidation broadcasts
@@ -773,6 +782,9 @@ export class QueryDO extends DurableObject<Env> {
773782
estimatedRows: meta.totalRows,
774783
fragments: totalFragments,
775784
fragmentsSkipped,
785+
partitionCatalog: this.partitionCatalogs.has(query.table)
786+
? { column: this.partitionCatalogs.get(query.table)!.column, partitionValues: this.partitionCatalogs.get(query.table)!.stats().partitionValues }
787+
: undefined,
776788
filters: [
777789
...query.filters.map(f => ({
778790
column: f.column,

src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ export interface ExplainResult {
309309
fragments: number;
310310
/** Fragments skipped by fragment-level min/max pruning */
311311
fragmentsSkipped?: number;
312+
/** Partition catalog info (present when catalog exists for this table) */
313+
partitionCatalog?: { column: string; partitionValues: number };
312314
filters: { column: string; op: string; pushable: boolean }[];
313315
metaCached: boolean;
314316
/** Estimated number of rows after filter pushdown */

0 commit comments

Comments
 (0)