Skip to content

Commit 1fcfe79

Browse files
committed
fix: worker-pool ID collision + query-do init race condition
- worker-pool: Date.now() in fan-out worker IDs can collide when multiple fan-outs execute within the same millisecond — use crypto.randomUUID() - query-do: ensureInitialized() set initialized=true before async work completed, allowing concurrent callers to proceed before WASM was ready — now uses promise-based serialization so all callers await the same init
1 parent 3a82b84 commit 1fcfe79

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

src/query-do.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ export class QueryDO extends DurableObject<Env> {
409409
private resultCache = new VipCache<string, QueryResult>(RESULT_CACHE_MAX, RESULT_VIP_THRESHOLD);
410410
private wasmEngine!: WasmEngine;
411411
private activeFragmentSlots = new Set<number>(); // slots currently scanning
412-
private initialized = false;
412+
private initPromise: Promise<void> | null = null;
413413
private registeredWithMaster = false;
414414

415415
constructor(ctx: DurableObjectState, env: Env) {
@@ -524,10 +524,14 @@ export class QueryDO extends DurableObject<Env> {
524524
}
525525
}
526526

527-
private async ensureInitialized(): Promise<void> {
528-
if (this.initialized) return;
529-
this.initialized = true;
527+
private ensureInitialized(): Promise<void> {
528+
if (!this.initPromise) {
529+
this.initPromise = this.doInit();
530+
}
531+
return this.initPromise;
532+
}
530533

534+
private async doInit(): Promise<void> {
531535
const stored = await this.ctx.storage.list<TableMeta>({ prefix: "table:" });
532536
for (const [key, meta] of stored) this.footerCache.set(key.replace("table:", ""), meta);
533537

src/worker-pool.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ export class WorkerPool {
159159
): Promise<R2Partition[]> {
160160
const results = await Promise.all(
161161
partitions.map(async (partition, i) => {
162-
const workerId = `worker-${taskName}-${i}-${Date.now()}`;
162+
const workerId = `worker-${taskName}-${i}-${crypto.randomUUID()}`;
163163
const id = this.doNamespace.idFromName(workerId);
164164
const handle = this.doNamespace.get(id) as unknown as WorkerDORpc;
165165
return handle.executeTask({
@@ -187,7 +187,7 @@ export class WorkerPool {
187187

188188
const results = await Promise.all(
189189
chunks.map(async (chunk, ci) => {
190-
const coordId = `coord-${taskName}-${ci}-${Date.now()}`;
190+
const coordId = `coord-${taskName}-${ci}-${crypto.randomUUID()}`;
191191
const id = this.doNamespace.idFromName(coordId);
192192
const handle = this.doNamespace.get(id) as unknown as WorkerDORpc;
193193
return handle.executeCoordinator({
@@ -222,7 +222,7 @@ export class WorkerPool {
222222

223223
const results = await Promise.all(
224224
superChunks.map(async (superChunk, sci) => {
225-
const superCoordId = `supercoord-${taskName}-${sci}-${Date.now()}`;
225+
const superCoordId = `supercoord-${taskName}-${sci}-${crypto.randomUUID()}`;
226226
const id = this.doNamespace.idFromName(superCoordId);
227227
const handle = this.doNamespace.get(id) as unknown as WorkerDORpc;
228228
return handle.executeSuperCoordinator({

0 commit comments

Comments
 (0)