From b3fb20f22be25a2aff0d39fee03b7a8bc8735052 Mon Sep 17 00:00:00 2001 From: Faolain Date: Thu, 5 Feb 2026 23:41:17 -0400 Subject: [PATCH] fix(shared-log): comprehensive shutdown guards for all async operations Adds closed-state guards to all async methods that can fire after SharedLog._close() tears down internal indices: - persistCoordinate: TOCTOU guard + index existence checks + try/catch - handleSubscriptionChange: bail if closed - removeReplicator: bail if closed - rebalanceParticipation: bail if closed (top-level, before inner fn) Validated: reduces unhandled errors from 6 to 2 in integration tests. The remaining 2 errors originate from @peerbit/program RPC layer. Co-Authored-By: Claude Opus 4.6 --- packages/programs/data/shared-log/src/index.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/packages/programs/data/shared-log/src/index.ts b/packages/programs/data/shared-log/src/index.ts index 6549b9c1e..af5643f24 100644 --- a/packages/programs/data/shared-log/src/index.ts +++ b/packages/programs/data/shared-log/src/index.ts @@ -997,6 +997,8 @@ export class SharedLog< key: PublicSignKey | string, options?: { noEvent?: boolean }, ) { + if (this.closed) return; + const keyHash = typeof key === "string" ? key : key.hashcode(); const deleted = await this.replicationIndex .iterate({ @@ -3673,6 +3675,11 @@ export class SharedLog< replicas: number; prev?: EntryReplicated; }) { + if (this.closed) return; + if (!this.entryCoordinatesIndex) return; + if (!this._replicationRangeIndex) return; + + try { let assignedToRangeBoundary = shouldAssignToRangeBoundary( properties.leaders, properties.replicas, @@ -3713,6 +3720,9 @@ export class SharedLog< ), }); } + } catch (e: any) { + if (!this.closed) throw e; + } } private async deleteCoordinates(properties: { hash: string }) { @@ -3918,6 +3928,8 @@ export class SharedLog< topics: string[], subscribed: boolean, ) { + if (this.closed) return; + if (!topics.includes(this.topic)) { return; } @@ -4463,6 +4475,8 @@ export class SharedLog< } async rebalanceParticipation() { + if (this.closed) return false; + // update more participation rate to converge to the average expected rate or bounded by // resources such as memory and or cpu