diff --git a/.aegir.js b/.aegir.js index 56f892ef3..ec7ba8386 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,8 +1,15 @@ -// get monorepo root location using esm and .git folder +// Resolve monorepo root from the location of this config file. +// Note: in git worktrees `.git` is a file (not a directory), so we can't rely +// on `findUp(..., { type: "directory" })` always succeeding. import * as findUp from "find-up"; import path from "path"; +import { fileURLToPath } from "url"; -const root = path.dirname(await findUp.findUp(".git", { type: "directory" })); +const configDir = path.dirname(fileURLToPath(import.meta.url)); +const gitPath = + (await findUp.findUp(".git", { cwd: configDir, type: "directory" })) ?? + (await findUp.findUp(".git", { cwd: configDir, type: "file" })); +const root = gitPath ? path.dirname(gitPath) : configDir; export default { // global options diff --git a/packages/clients/peerbit/.aegir.js b/packages/clients/peerbit/.aegir.js index 64df93a99..87930e33b 100644 --- a/packages/clients/peerbit/.aegir.js +++ b/packages/clients/peerbit/.aegir.js @@ -3,8 +3,14 @@ import * as findUp from "find-up"; import fs from "fs"; import { createRequire } from "module"; import path from "path"; +import { fileURLToPath } from "url"; -const root = path.dirname(await findUp.findUp(".git", { type: "directory" })); +// In git worktrees `.git` is a file (not a directory), so allow both. +const configDir = path.dirname(fileURLToPath(import.meta.url)); +const gitPath = + (await findUp.findUp(".git", { cwd: configDir, type: "directory" })) ?? + (await findUp.findUp(".git", { cwd: configDir, type: "file" })); +const root = gitPath ? 
path.dirname(gitPath) : configDir; const resolverFromRoot = createRequire(path.join(root, "package.json")); const resolverFromLocal = createRequire(import.meta.url); diff --git a/packages/programs/data/shared-log/src/index.ts b/packages/programs/data/shared-log/src/index.ts index 6549b9c1e..fe673a5ef 100644 --- a/packages/programs/data/shared-log/src/index.ts +++ b/packages/programs/data/shared-log/src/index.ts @@ -468,6 +468,11 @@ export class SharedLog< uniqueReplicators!: Set; private _replicatorsReconciled!: boolean; + private _subscribedPeers!: Set; + private _pendingReplicationInfoRequestTimers!: Map< + string, + ReturnType + >; /* private _totalParticipation!: number; */ @@ -518,9 +523,20 @@ export class SharedLog< > >; // map of peerId to timeout - private latestReplicationInfoMessage!: Map; + private latestReplicationInfoMessage!: Map; + private _replicationInfoApplyQueue!: Map>; + private _pendingReplicationInfoMessages!: Map< + string, + { + from: PublicSignKey; + message: + | AllReplicatingSegmentsMessage + | AddedReplicationSegmentMessage; + timestamp: bigint; + } + >; - private remoteBlocks!: RemoteBlocks; + private remoteBlocks!: RemoteBlocks; private openTime!: number; private oldestOpenTime!: number; @@ -1150,26 +1166,28 @@ export class SharedLog< checkDuplicates?: boolean; timestamp?: number; } = {}, - ) { - if (this._isTrustedReplicator && !(await this._isTrustedReplicator(from))) { - return undefined; - } - let isNewReplicator = false; - let timestamp = BigInt(ts ?? +new Date()); - rebalance = rebalance == null ? 
true : rebalance; - - let diffs: ReplicationChanges>; - let deleted: ReplicationRangeIndexable[] | undefined = undefined; - if (reset) { - deleted = ( - await this.replicationIndex - .iterate({ - query: { hash: from.hashcode() }, - }) - .all() - ).map((x) => x.value); + ) { + if (this._isTrustedReplicator && !(await this._isTrustedReplicator(from))) { + return undefined; + } + const fromHash = from.hashcode(); + let isNewReplicator = false; + let timestamp = BigInt(ts ?? +new Date()); + rebalance = rebalance == null ? true : rebalance; + + let diffs: ReplicationChanges>; + let deleted: ReplicationRangeIndexable[] | undefined = undefined; + let willHaveSegmentsAfterApply = false; + if (reset) { + deleted = ( + await this.replicationIndex + .iterate({ + query: { hash: fromHash }, + }) + .all() + ).map((x) => x.value); - let prevCount = deleted.length; + const prevCount = deleted.length; const existingById = new Map(deleted.map((x) => [x.idString, x])); const hasSameRanges = @@ -1180,27 +1198,30 @@ export class SharedLog< }); // Avoid churn on repeated full-state announcements that don't change any - // replication ranges. This prevents unnecessary `replication:change` - // events and rebalancing cascades. - if (hasSameRanges) { - diffs = []; - } else { - await this.replicationIndex.del({ query: { hash: from.hashcode() } }); + // replication ranges. This prevents unnecessary `replication:change` + // events and rebalancing cascades. 
+ if (hasSameRanges) { + diffs = []; + willHaveSegmentsAfterApply = prevCount > 0; + } else { + await this.replicationIndex.del({ query: { hash: fromHash } }); - diffs = [ - ...deleted.map((x) => { - return { range: x, type: "removed" as const, timestamp }; - }), - ...ranges.map((x) => { - return { range: x, type: "added" as const, timestamp }; - }), - ]; - } + diffs = [ + ...deleted.map((x) => { + return { range: x, type: "removed" as const, timestamp }; + }), + ...ranges.map((x) => { + return { range: x, type: "added" as const, timestamp }; + }), + ]; - isNewReplicator = prevCount === 0 && ranges.length > 0; - } else { - let batchSize = 100; - let existing: ReplicationRangeIndexable[] = []; + willHaveSegmentsAfterApply = ranges.length > 0; + } + + isNewReplicator = prevCount === 0 && ranges.length > 0; + } else { + let batchSize = 100; + let existing: ReplicationRangeIndexable[] = []; for (let i = 0; i < ranges.length; i += batchSize) { const results = await this.replicationIndex .iterate( @@ -1218,15 +1239,15 @@ export class SharedLog< } } - let prevCountForOwner: number | undefined = undefined; - if (existing.length === 0) { - prevCountForOwner = await this.replicationIndex.count({ - query: new StringMatch({ key: "hash", value: from.hashcode() }), - }); - isNewReplicator = prevCountForOwner === 0; - } else { - isNewReplicator = false; - } + let prevCountForOwner: number | undefined = undefined; + if (existing.length === 0) { + prevCountForOwner = await this.replicationIndex.count({ + query: new StringMatch({ key: "hash", value: fromHash }), + }); + isNewReplicator = prevCountForOwner === 0; + } else { + isNewReplicator = false; + } if ( checkDuplicates && @@ -1276,16 +1297,18 @@ export class SharedLog< type: "added" as const, }; } - }) - .flat() as ReplicationChanges>; - diffs = changes; - } + }) + .flat() as ReplicationChanges>; + diffs = changes; - this.uniqueReplicators.add(from.hashcode()); + const hadSegmentsBefore = + (prevCountForOwner ?? 
0) > 0 || existing.some((x) => x.hash === fromHash); + willHaveSegmentsAfterApply = hadSegmentsBefore || ranges.length > 0; + } - let now = +new Date(); - let minRoleAge = await this.getDefaultMinRoleAge(); - let isAllMature = true; + let now = +new Date(); + let minRoleAge = await this.getDefaultMinRoleAge(); + let isAllMature = true; for (const diff of diffs) { if (diff.type === "added") { @@ -1365,14 +1388,22 @@ export class SharedLog< } } } - // else replaced, do nothing - } + // else replaced, do nothing + } - if (diffs.length > 0) { - if (reset) { - await this.updateOldestTimestampFromIndex(); + // Keep `uniqueReplicators` consistent with the persisted replication index. + // (Avoids "replicator present" signals before index changes are durable.) + if (willHaveSegmentsAfterApply) { + this.uniqueReplicators.add(fromHash); + } else { + this.uniqueReplicators.delete(fromHash); } + if (diffs.length > 0) { + if (reset) { + await this.updateOldestTimestampFromIndex(); + } + this.events.dispatchEvent( new CustomEvent("replication:change", { detail: { publicKey: from }, @@ -1890,15 +1921,19 @@ export class SharedLog< this.indexableDomain = createIndexableDomainFromResolution( this.domain.resolution, ); - this._respondToIHaveTimeout = options?.respondToIHaveTimeout ?? 2e4; - this._pendingDeletes = new Map(); - this._pendingIHave = new Map(); - this.latestReplicationInfoMessage = new Map(); - this.coordinateToHash = new Cache({ max: 1e6, ttl: 1e4 }); - this.recentlyRebalanced = new Cache({ max: 1e4, ttl: 1e5 }); + this._respondToIHaveTimeout = options?.respondToIHaveTimeout ?? 
2e4; + this._pendingDeletes = new Map(); + this._pendingIHave = new Map(); + this.latestReplicationInfoMessage = new Map(); + this._replicationInfoApplyQueue = new Map(); + this._pendingReplicationInfoMessages = new Map(); + this.coordinateToHash = new Cache({ max: 1e6, ttl: 1e4 }); + this.recentlyRebalanced = new Cache({ max: 1e4, ttl: 1e5 }); this.uniqueReplicators = new Set(); this._replicatorsReconciled = false; + this._subscribedPeers = new Set(); + this._pendingReplicationInfoRequestTimers = new Map(); this.openTime = +new Date(); this.oldestOpenTime = this.openTime; @@ -2216,8 +2251,8 @@ export class SharedLog< } } - async afterOpen(): Promise { - await super.afterOpen(); + async afterOpen(): Promise { + await super.afterOpen(); // We do this here, because these calls requires this.closed == false void this.pruneOfflineReplicators() @@ -2234,22 +2269,128 @@ export class SharedLog< await this.rebalanceParticipation(); // Take into account existing subscription - (await this.node.services.pubsub.getSubscribers(this.topic))?.forEach( - (v, k) => { - if (v.equals(this.node.identity.publicKey)) { + (await this.node.services.pubsub.getSubscribers(this.topic))?.forEach( + (v, k) => { + if (v.equals(this.node.identity.publicKey)) { + return; + } + if (this.closed) { + return; + } + this.handleSubscriptionChange(v, [this.topic], true); + }, + ); + + // Replication-info messages can arrive early during startup (especially with + // faster pubsub subscription discovery). If applying replication info fails + // due to indices not being started yet, we buffer the latest message per peer + // and retry here once we're fully open. 
+ await this._drainPendingReplicationInfoMessages(); + } + + private _enqueueApplyReplicationInfo( + from: PublicSignKey, + message: + | AllReplicatingSegmentsMessage + | AddedReplicationSegmentMessage, + messageTimestamp: bigint, + ): Promise { + const fromHash = from.hashcode(); + + const apply = async (): Promise => { + const prevApplied = this.latestReplicationInfoMessage.get(fromHash); + if (prevApplied != null && prevApplied > messageTimestamp) { return; } + if (this.closed) { return; } - this.handleSubscriptionChange(v, [this.topic], true); - }, - ); - } - async reset() { - await this.log.load({ reset: true }); - } + // If we received replication info, we don't need a pending request timer anymore. + this._clearPendingReplicationInfoRequest(fromHash); + + const reset = message instanceof AllReplicatingSegmentsMessage; + const bufferForRetry = () => { + const prevPending = this._pendingReplicationInfoMessages.get(fromHash); + if (prevPending == null || prevPending.timestamp < messageTimestamp) { + this._pendingReplicationInfoMessages.set(fromHash, { + from, + message, + timestamp: messageTimestamp, + }); + } + }; + try { + // If indices are not started yet, avoid partially mutating state in + // `addReplicationRange()`; buffer and retry once we're fully open. + await this.replicationIndex.getSize(); + + await this.addReplicationRange( + message.segments.map((x) => x.toReplicationRangeIndexable(from)), + from, + { + reset, + checkDuplicates: true, + timestamp: Number(messageTimestamp), + }, + ); + + const currentApplied = this.latestReplicationInfoMessage.get(fromHash); + if (currentApplied == null || currentApplied < messageTimestamp) { + this.latestReplicationInfoMessage.set(fromHash, messageTimestamp); + } + } catch (e) { + if (isNotStartedError(e as Error)) { + bufferForRetry(); + return; + } + throw e; + } + }; + + const previous = + this._replicationInfoApplyQueue.get(fromHash) ?? 
Promise.resolve(); + const next = previous.catch(() => {}).then(apply); + this._replicationInfoApplyQueue.set(fromHash, next); + void next + .finally(() => { + if (this._replicationInfoApplyQueue.get(fromHash) === next) { + this._replicationInfoApplyQueue.delete(fromHash); + } + }) + .catch(() => { + // Errors are handled by callers; prevent unhandled rejection noise. + }); + return next; + } + + private async _drainPendingReplicationInfoMessages(): Promise { + if (this.closed || this._pendingReplicationInfoMessages.size === 0) { + return; + } + + const pending = [...this._pendingReplicationInfoMessages.values()]; + this._pendingReplicationInfoMessages.clear(); + + await Promise.all( + pending.map((p) => + this._enqueueApplyReplicationInfo(p.from, p.message, p.timestamp).catch( + (e) => { + logger.error( + `Failed to apply buffered replication settings from '${p.from.hashcode()}': ${ + e?.message ?? e + }`, + ); + }, + ), + ), + ); + } + + async reset() { + await this.log.load({ reset: true }); + } async pruneOfflineReplicators() { // go through all segments and for waitForAll replicators to become reachable if not prune them away @@ -2522,13 +2663,15 @@ export class SharedLog< v.clear(); } - await this.remoteBlocks.stop(); - this._pendingDeletes.clear(); - this._pendingIHave.clear(); - this.latestReplicationInfoMessage.clear(); - this._gidPeersHistory.clear(); - this._requestIPruneSent.clear(); - this._requestIPruneResponseReplicatorSet.clear(); + await this.remoteBlocks.stop(); + this._pendingDeletes.clear(); + this._pendingIHave.clear(); + this.latestReplicationInfoMessage.clear(); + this._replicationInfoApplyQueue.clear(); + this._pendingReplicationInfoMessages.clear(); + this._gidPeersHistory.clear(); + this._requestIPruneSent.clear(); + this._requestIPruneResponseReplicatorSet.clear(); this.pruneDebouncedFn = undefined as any; this.rebalanceParticipationDebounced = undefined; this._replicationRangeIndex.stop(); @@ -2964,51 +3107,25 @@ export class SharedLog< 
| AddedReplicationSegmentMessage; // Process replication updates even if the sender isn't yet considered "ready" by - // `Program.waitFor()`. Dropping these messages can lead to missing replicator info - // (and downstream `waitForReplicator()` timeouts) under timing-sensitive joins. - const from = context.from!; - const messageTimestamp = context.message.header.timestamp; - (async () => { - const prev = this.latestReplicationInfoMessage.get(from.hashcode()); - if (prev && prev > messageTimestamp) { - return; - } - - this.latestReplicationInfoMessage.set( - from.hashcode(), - messageTimestamp, - ); - - if (this.closed) { - return; - } - - const reset = msg instanceof AllReplicatingSegmentsMessage; - await this.addReplicationRange( - replicationInfoMessage.segments.map((x) => - x.toReplicationRangeIndexable(from), - ), + // `Program.waitFor()`. Dropping these messages can lead to missing replicator info + // (and downstream `waitForReplicator()` timeouts) under timing-sensitive joins. + const from = context.from!; + const messageTimestamp = context.message.header.timestamp; + void this._enqueueApplyReplicationInfo( from, - { - reset, - checkDuplicates: true, - timestamp: Number(messageTimestamp), - }, - ); - })().catch((e) => { - if (isNotStartedError(e)) { + replicationInfoMessage, + messageTimestamp, + ).catch((e) => { + logger.error( + `Failed to apply replication settings from '${from.hashcode()}': ${ + e?.message ?? e + }`, + ); + }); + } else if (msg instanceof StoppedReplicating) { + if (context.from.equals(this.node.identity.publicKey)) { return; } - logger.error( - `Failed to apply replication settings from '${from.hashcode()}': ${ - e?.message ?? 
e - }`, - ); - }); - } else if (msg instanceof StoppedReplicating) { - if (context.from.equals(this.node.identity.publicKey)) { - return; - } const rangesToRemove = await this.resolveReplicationRangesFromIdsAndKey( msg.segmentIds, @@ -3913,6 +4030,51 @@ export class SharedLog< ); } + private _clearPendingReplicationInfoRequest(peerHash: string) { + const timer = this._pendingReplicationInfoRequestTimers.get(peerHash); + if (timer != null) { + clearTimeout(timer); + this._pendingReplicationInfoRequestTimers.delete(peerHash); + } + } + + private _scheduleRequestReplicationInfo(peer: PublicSignKey) { + const peerHash = peer.hashcode(); + if (this.latestReplicationInfoMessage.get(peerHash) != null) { + return; + } + if (this._pendingReplicationInfoRequestTimers.has(peerHash)) { + return; + } + + // Often the peer will proactively announce its replication segments right after + // subscribing. Delay the request a bit to avoid redundant request/response pairs + // (and to reduce the chance of racy overwrites during startup). + const REQUEST_DELAY_MS = 100; + + const timer = setTimeout(() => { + this._pendingReplicationInfoRequestTimers.delete(peerHash); + + if (this.closed || !this._subscribedPeers.has(peerHash)) { + return; + } + + // We might have learned their replication info before the subscription event + // handler ran; avoid sending a redundant request in that race. 
+ if (this.latestReplicationInfoMessage.get(peerHash) != null) { + return; + } + + this.rpc + .send(new RequestReplicationInfoMessage(), { + mode: new SeekDelivery({ redundancy: 1, to: [peer] }), + }) + .catch((e) => logger.error(e.toString())); + }, REQUEST_DELAY_MS); + + this._pendingReplicationInfoRequestTimers.set(peerHash, timer); + } + async handleSubscriptionChange( publicKey: PublicSignKey, topics: string[], @@ -3922,18 +4084,34 @@ export class SharedLog< return; } + const peerHash = publicKey.hashcode(); + + // Pubsub subscription events can be emitted multiple times for the same peer + // during startup (and we also reconcile existing subscribers in `afterOpen()`). + // Make this handler idempotent so we don't spam replication-info requests and + // duplicate full-state announcements. + if (subscribed) { + if (this._subscribedPeers.has(peerHash)) { + return; + } + this._subscribedPeers.add(peerHash); + } else { + this._subscribedPeers.delete(peerHash); + this._clearPendingReplicationInfoRequest(peerHash); + } + if (!subscribed) { - this.removePeerFromGidPeerHistory(publicKey.hashcode()); + this.removePeerFromGidPeerHistory(peerHash); for (const [k, v] of this._requestIPruneSent) { - v.delete(publicKey.hashcode()); + v.delete(peerHash); if (v.size === 0) { this._requestIPruneSent.delete(k); } } for (const [k, v] of this._requestIPruneResponseReplicatorSet) { - v.delete(publicKey.hashcode()); + v.delete(peerHash); if (v.size === 0) { this._requestIPruneResponseReplicatorSet.delete(k); } @@ -3942,7 +4120,7 @@ export class SharedLog< this.syncronizer.onPeerDisconnected(publicKey); (await this.replicationIndex.count({ - query: { hash: publicKey.hashcode() }, + query: { hash: peerHash }, })) > 0 && this.events.dispatchEvent( new CustomEvent("replicator:leave", { @@ -3978,11 +4156,7 @@ export class SharedLog< // Request the remote peer's replication info. 
This makes joins resilient to // timing-sensitive delivery/order issues where we may miss their initial // replication announcement. - this.rpc - .send(new RequestReplicationInfoMessage(), { - mode: new SeekDelivery({ redundancy: 1, to: [publicKey] }), - }) - .catch((e) => logger.error(e.toString())); + this._scheduleRequestReplicationInfo(publicKey); } else { await this.removeReplicator(publicKey); } @@ -4433,16 +4607,17 @@ export class SharedLog< } async _onUnsubscription(evt: CustomEvent) { - logger.trace( - `Peer disconnected '${evt.detail.from.hashcode()}' from '${JSON.stringify( - evt.detail.topics.map((x) => x), - )} '`, - ); - this.latestReplicationInfoMessage.delete(evt.detail.from.hashcode()); + logger.trace( + `Peer disconnected '${evt.detail.from.hashcode()}' from '${JSON.stringify( + evt.detail.topics.map((x) => x), + )} '`, + ); + this.latestReplicationInfoMessage.delete(evt.detail.from.hashcode()); + this._pendingReplicationInfoMessages.delete(evt.detail.from.hashcode()); - return this.handleSubscriptionChange( - evt.detail.from, - evt.detail.topics, + return this.handleSubscriptionChange( + evt.detail.from, + evt.detail.topics, false, ); } diff --git a/packages/transport/pubsub/src/index.ts b/packages/transport/pubsub/src/index.ts index 23a9d6aee..d8b58b050 100644 --- a/packages/transport/pubsub/src/index.ts +++ b/packages/transport/pubsub/src/index.ts @@ -78,6 +78,7 @@ export class DirectSub extends DirectStream implements PubSub { private debounceSubscribeAggregator: DebouncedAccumulatorCounterMap; private debounceUnsubscribeAggregator: DebouncedAccumulatorCounterMap; + private pendingSubscriptions: Set; constructor( components: DirectSubComponents, @@ -93,6 +94,7 @@ export class DirectSub extends DirectStream implements PubSub { this.peerToTopic = new Map(); this.dispatchEventOnSelfPublish = props?.dispatchEventOnSelfPublish || false; + this.pendingSubscriptions = new Set(); this.debounceSubscribeAggregator = debouncedAccumulatorSetCounter( (set) 
=> this._subscribe([...set.values()]), props?.subscriptionDebounceDelay ?? 50, @@ -108,6 +110,7 @@ export class DirectSub extends DirectStream implements PubSub { this.topics.clear(); this.peerToTopic.clear(); this.topicsToPeers.clear(); + this.pendingSubscriptions.clear(); this.debounceSubscribeAggregator.close(); this.debounceUnsubscribeAggregator.close(); return super.stop(); @@ -124,7 +127,13 @@ export class DirectSub extends DirectStream implements PubSub { } async subscribe(topic: string) { - // this.debounceUnsubscribeAggregator.delete(topic); + this.pendingSubscriptions.add(topic); + + // NOTE: subscribe() is debounced, but we still need to start tracking the topic + // immediately. Otherwise we can drop incoming Subscribe traffic during the + // debounce window (`this.topics.get(topic)` is undefined) and/or fail to + // advertise pending subscribes in requestSubscribers-style handshakes. + this.initializeTopic(topic); return this.debounceSubscribeAggregator.add({ key: topic }); } @@ -141,20 +150,24 @@ export class DirectSub extends DirectStream implements PubSub { } const newTopicsForTopicData: string[] = []; - for (const { key: topic, counter } of topics) { - let prev = this.subscriptions.get(topic); - if (prev) { - prev.counter += counter; - } else { + for (const { key: topic, counter } of topics) { + let prev = this.subscriptions.get(topic); + if (prev) { + prev.counter += counter; + } else { prev = { counter: counter, }; this.subscriptions.set(topic, prev); - newTopicsForTopicData.push(topic); - this.listenForSubscribers(topic); + newTopicsForTopicData.push(topic); + this.listenForSubscribers(topic); + } + + // `_subscribe()` updates `subscriptions` synchronously; once we're here the + // topic is no longer "pending" (debounce window). 
+ this.pendingSubscriptions.delete(topic); } - } if (newTopicsForTopicData.length > 0) { const message = new DataMessage({ @@ -176,8 +189,34 @@ export class DirectSub extends DirectStream implements PubSub { } async unsubscribe(topic: string) { + this.pendingSubscriptions.delete(topic); + if (this.debounceSubscribeAggregator.has(topic)) { this.debounceSubscribeAggregator.delete(topic); // cancel subscription before it performed + + // If `subscribe()` eagerly initialized the topic, undo it when the debounced + // subscribe is cancelled to avoid "ghost topic" tracking (which can affect + // relay heuristics that use `this.topics.has(topic)`). + if (!this.subscriptions.get(topic)) { + const peersOnTopic = this.topicsToPeers.get(topic); + if (peersOnTopic) { + for (const peerHash of peersOnTopic) { + this.peerToTopic.get(peerHash)?.delete(topic); + if (!this.peerToTopic.get(peerHash)?.size) { + this.peerToTopic.delete(peerHash); + } + + this.lastSubscriptionMessages.get(peerHash)?.delete(topic); + if (!this.lastSubscriptionMessages.get(peerHash)?.size) { + this.lastSubscriptionMessages.delete(peerHash); + } + } + } + + this.topics.delete(topic); + this.topicsToPeers.delete(topic); + } + return false; } const subscriptions = this.subscriptions.get(topic); @@ -602,19 +641,18 @@ export class DirectSub extends DirectStream implements PubSub { (x) => this.publicKeyHash === x, ); - let isForMe: boolean; - if (pubsubMessage.strict) { - isForMe = - !!pubsubMessage.topics.find((topic) => - this.subscriptions.has(topic), - ) && meInTOs; - } else { - isForMe = - !!pubsubMessage.topics.find((topic) => - this.subscriptions.has(topic), - ) || - (pubsubMessage.topics.length === 0 && meInTOs); - } + let isForMe: boolean; + const hasLocalInterest = !!pubsubMessage.topics.find( + (topic) => + this.subscriptions.has(topic) || + this.pendingSubscriptions.has(topic), + ); + if (pubsubMessage.strict) { + isForMe = hasLocalInterest && meInTOs; + } else { + isForMe = + hasLocalInterest || 
(pubsubMessage.topics.length === 0 && meInTOs); + } if (isForMe) { if ((await this.verifyAndProcess(message)) === false) { @@ -724,15 +762,23 @@ export class DirectSub extends DirectStream implements PubSub { ); } - if (pubsubMessage.requestSubscribers) { - // respond if we are subscribing - const mySubscriptions = this.getSubscriptionOverlap( - pubsubMessage.topics, - ); - if (mySubscriptions.length > 0) { - const response = new DataMessage({ - data: toUint8Array( - new Subscribe({ + if (pubsubMessage.requestSubscribers) { + // respond if we are subscribing + const mySubscriptions = this.getSubscriptionOverlap( + pubsubMessage.topics, + ); + for (const topic of pubsubMessage.topics) { + if ( + !mySubscriptions.includes(topic) && + this.pendingSubscriptions.has(topic) + ) { + mySubscriptions.push(topic); + } + } + if (mySubscriptions.length > 0) { + const response = new DataMessage({ + data: toUint8Array( + new Subscribe({ topics: mySubscriptions, requestSubscribers: false, }).bytes(), @@ -795,14 +841,14 @@ export class DirectSub extends DirectStream implements PubSub { // DONT await this since it might introduce a dead-lock this.relayMessage(from, message).catch(logErrorIfStarted); - } else if (pubsubMessage instanceof GetSubscribers) { - const subscriptionsToSend: string[] = this.getSubscriptionOverlap( - pubsubMessage.topics, - ); - if (subscriptionsToSend.length > 0) { - // respond - this.publishMessage( - this.publicKey, + } else if (pubsubMessage instanceof GetSubscribers) { + const subscriptionsToSend: string[] = this.getSubscriptionOverlap( + pubsubMessage.topics, + ); + if (subscriptionsToSend.length > 0) { + // respond + this.publishMessage( + this.publicKey, await new DataMessage({ data: toUint8Array( new Subscribe({ diff --git a/packages/transport/pubsub/test/bug1-initializeTopic-race.spec.ts b/packages/transport/pubsub/test/bug1-initializeTopic-race.spec.ts new file mode 100644 index 000000000..d75257bd1 --- /dev/null +++ 
b/packages/transport/pubsub/test/bug1-initializeTopic-race.spec.ts @@ -0,0 +1,102 @@ +import { TestSession } from "@peerbit/libp2p-test-utils"; +import { waitForNeighbour } from "@peerbit/stream"; +import { waitForResolved } from "@peerbit/time"; +import { expect } from "chai"; +import { DirectSub } from "../src/index.js"; + +/** + * Regression: `DirectSub.subscribe()` is debounced, but we still must initialize topic tracking + * synchronously. Otherwise, incoming `Subscribe` messages can arrive before `_subscribe()` runs + * and be dropped because `topics.get(topic)` is undefined. + */ +describe("BUG: initializeTopic race", function () { + this.timeout(20_000); + + it("initializes topic tracking immediately on subscribe() (before debounce fires)", async () => { + const session = (await TestSession.disconnected(1, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const TOPIC = "bug-initTopic-immediate"; + + // Don't await; we want to assert the synchronous side effects of subscribe(). + const pending = a.subscribe(TOPIC); + + expect(a.topics.has(TOPIC)).to.equal(true); + expect(a.topics.get(TOPIC)).to.be.instanceof(Map); + + await pending; + } finally { + await session.stop(); + } + }); + + it("subscribe and connect concurrently does not miss remote subscription", async () => { + const session = (await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + const TOPIC = "bug-initTopic-subscribe-connect"; + + // Subscribe before connecting; subscriptions will be sent during connect/join. 
+ await Promise.all([a.subscribe(TOPIC), b.subscribe(TOPIC)]); + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + await waitForResolved(() => { + expect(a.topics.get(TOPIC)?.has(b.publicKeyHash)).to.equal(true); + expect(b.topics.get(TOPIC)?.has(a.publicKeyHash)).to.equal(true); + }); + } finally { + await session.stop(); + } + }); + + it("subscribe after connect still works", async () => { + const session = (await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + const TOPIC = "bug-initTopic-subscribe-after-connect"; + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + await Promise.all([a.subscribe(TOPIC), b.subscribe(TOPIC)]); + + await waitForResolved(() => { + expect(a.topics.get(TOPIC)?.has(b.publicKeyHash)).to.equal(true); + expect(b.topics.get(TOPIC)?.has(a.publicKeyHash)).to.equal(true); + }); + } finally { + await session.stop(); + } + }); +}); diff --git a/packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts b/packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts new file mode 100644 index 000000000..6da8aee26 --- /dev/null +++ b/packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts @@ -0,0 +1,230 @@ +import { TestSession } from "@peerbit/libp2p-test-utils"; +import { waitForNeighbour } from "@peerbit/stream"; +import { SeekDelivery } from "@peerbit/stream-interface"; +import { delay, waitForResolved } from "@peerbit/time"; +import { expect } from "chai"; +import { DirectSub } from "../src/index.js"; + +const deferred = () => { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: any) => void; + const promise = new Promise((res, rej) => { + resolve = 
res; + reject = rej; + }); + return { promise, resolve, reject }; +}; + +describe("BUG: subscribe debounce race", function () { + this.timeout(20_000); + + it("a pending debounced subscribe is advertised via Subscribe{requestSubscribers:true} response", async () => { + const TOPIC = "pending-subscribe-visible"; + + const session = (await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + // Block A._subscribe() so that A remains "pending" (no `subscriptions` entry). + const gate = deferred(); + const aAny = a as any; + const originalSubscribeImpl = aAny._subscribe.bind(aAny); + aAny._subscribe = async (...args: any[]) => { + await gate.promise; + return originalSubscribeImpl(...args); + }; + + let aSubscribeDone = false; + const aSubscribe = a.subscribe(TOPIC).then(() => { + aSubscribeDone = true; + }); + + await b.subscribe(TOPIC); + + await waitForResolved(() => { + expect(aSubscribeDone, "A.subscribe should still be pending").to.equal(false); + expect(b.topics.get(TOPIC)?.has(a.publicKeyHash)).to.equal(true); + }); + + // Cleanup: allow A._subscribe to proceed so we don't leave dangling work. 
+ gate.resolve(); + await aSubscribe; + } finally { + await session.stop(); + } + }); + + it("incoming Subscribe is not dropped during the local debounce window (topic initialized in subscribe)", async () => { + const TOPIC = "incoming-subscribe-not-dropped"; + + const session = (await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + // Block A._subscribe() to force A to remain in the debounce window. + const gate = deferred(); + const aAny = a as any; + const originalSubscribeImpl = aAny._subscribe.bind(aAny); + aAny._subscribe = async (...args: any[]) => { + await gate.promise; + return originalSubscribeImpl(...args); + }; + + // A is "pending subscribe" for TOPIC. The bug is that, without eager topic + // init, A drops B's Subscribe message because `topics.get(TOPIC)` is undefined. 
+ const aSubscribe = a.subscribe(TOPIC); + + await b.subscribe(TOPIC); + + await waitForResolved(() => { + expect(a.topics.get(TOPIC)?.has(b.publicKeyHash)).to.equal(true); + }); + + // Cleanup + gate.resolve(); + await aSubscribe; + } finally { + await session.stop(); + } + }); + + it("subscribe then unsubscribe within debounce does not advertise or retain the topic", async () => { + const TOPIC = "subscribe-unsubscribe-before-debounce"; + + const session = (await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + // Block A._subscribe() so A never reaches "subscriptions set" state. + const gate = deferred(); + const aAny = a as any; + const originalSubscribeImpl = aAny._subscribe.bind(aAny); + aAny._subscribe = async (...args: any[]) => { + await gate.promise; + return originalSubscribeImpl(...args); + }; + + // Start subscribe (pending) but don't await it; then cancel before debounce. + const aSubscribe = a.subscribe(TOPIC).catch(() => { + // Avoid unhandled rejections if cancellation rejects the promise. 
+ }); + await a.unsubscribe(TOPIC); + + await b.requestSubscribers(TOPIC, a.publicKey); + await delay(250); + + expect(b.topics.get(TOPIC)?.has(a.publicKeyHash)).to.equal(false); + expect(a.topics.has(TOPIC)).to.equal(false); + + // Cleanup + gate.resolve(); + await aSubscribe; + } finally { + await session.stop(); + } + }); + + it("pending subscribe receives strict PubSubData", async () => { + const TOPIC = "pending-subscribe-receives-pubsubdata"; + + const session = (await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + })) as TestSession<{ pubsub: DirectSub }>; + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + // Block A._subscribe() so that A remains "pending" (no `subscriptions` entry). + const gate = deferred(); + const aAny = a as any; + const originalSubscribeImpl = aAny._subscribe.bind(aAny); + aAny._subscribe = async (...args: any[]) => { + await gate.promise; + return originalSubscribeImpl(...args); + }; + + let received = false; + const payload = new Uint8Array([1, 2, 3, 4]); + + const onData = (e: any) => { + if (!e?.detail?.data) { + return; + } + if (e.detail.data.topics?.includes(TOPIC)) { + received = true; + } + }; + a.addEventListener("data", onData); + + // Start subscribe (pending), but keep it blocked in the debounce window. + const aSubscribe = a.subscribe(TOPIC); + + // Publish a strict message to A. Without treating `pendingSubscriptions` as + // local interest, A would incorrectly ignore this message. 
+ await b.publish(payload, { + topics: [TOPIC], + mode: new SeekDelivery({ redundancy: 1, to: [a.publicKeyHash] }), + }); + + await waitForResolved(() => { + expect(received).to.equal(true); + }); + + // Cleanup + a.removeEventListener("data", onData); + gate.resolve(); + await aSubscribe; + } finally { + await session.stop(); + } + }); +}); diff --git a/packages/utils/any-store/any-store/.aegir.js b/packages/utils/any-store/any-store/.aegir.js index 6a2b238c8..611ea3861 100644 --- a/packages/utils/any-store/any-store/.aegir.js +++ b/packages/utils/any-store/any-store/.aegir.js @@ -3,8 +3,14 @@ import * as findUp from "find-up"; import fs from "fs"; import { createRequire } from "module"; import path from "path"; +import { fileURLToPath } from "url"; -const root = path.dirname(await findUp.findUp(".git", { type: "directory" })); +// In git worktrees `.git` is a file (not a directory), so allow both. +const configDir = path.dirname(fileURLToPath(import.meta.url)); +const gitPath = + (await findUp.findUp(".git", { cwd: configDir, type: "directory" })) ?? + (await findUp.findUp(".git", { cwd: configDir, type: "file" })); +const root = gitPath ? path.dirname(gitPath) : configDir; const resolverFromRoot = createRequire(path.join(root, "package.json")); const resolverFromLocal = createRequire(import.meta.url); diff --git a/packages/utils/indexer/sqlite3/.aegir.js b/packages/utils/indexer/sqlite3/.aegir.js index 3a253af33..a237517cf 100644 --- a/packages/utils/indexer/sqlite3/.aegir.js +++ b/packages/utils/indexer/sqlite3/.aegir.js @@ -1,8 +1,14 @@ import * as findUp from "find-up"; import fs from "fs"; import path from "path"; +import { fileURLToPath } from "url"; -const root = path.dirname(await findUp.findUp(".git", { type: "directory" })); +// In git worktrees `.git` is a file (not a directory), so allow both. +const configDir = path.dirname(fileURLToPath(import.meta.url)); +const gitPath = + (await findUp.findUp(".git", { cwd: configDir, type: "directory" })) ?? 
+ (await findUp.findUp(".git", { cwd: configDir, type: "file" })); +const root = gitPath ? path.dirname(gitPath) : configDir; export default { // test cmd options diff --git a/shared-log-debug-plan.md b/shared-log-debug-plan.md new file mode 100644 index 000000000..d5e97e39e --- /dev/null +++ b/shared-log-debug-plan.md @@ -0,0 +1,268 @@ +Use this as a running log of all findings. + +# Learnings +- 2026-02-06: Repo state: on branch `debug-connectivity` at `07ba57225` (`git log -1 --oneline`). `shared-log-debug.md` + `shared-log-debug-plan.md` are untracked local notes. +- 2026-02-06: `node_modules/` was missing initially (need `pnpm install` before running tests). +- 2026-02-06: `pnpm install` completed with warnings about failing to create `peerbit` bin symlink under `packages/clients/peerbit-server/frontend/node_modules/.bin/peerbit` due to missing `dist/src/bin.js` in `@peerbit/server`. (May be irrelevant for node tests, but flag in case build is required.) +- 2026-02-06: Local `DirectSub` pubsub code does **not** match the behavior described in PR #589 yet: + - `packages/transport/pubsub/src/index.ts:126` `subscribe(topic)` only enqueues into the debounced accumulator; it does **not** call `initializeTopic(topic)` eagerly. + - `packages/transport/pubsub/src/index.ts:729` Subscribe handler responds to `requestSubscribers` using only `getSubscriptionOverlap()` (based on `this.subscriptions`), with no inclusion of “pending debounced subscribes”. + - `packages/transport/pubsub/src/index.ts:799` GetSubscribers handler likewise uses only `getSubscriptionOverlap()`. + - `packages/transport/pubsub/src/index.ts:178` `unsubscribe()` cancels pending debounced subscribe via `debounceSubscribeAggregator.has(topic)`, but that is separate from requestSubscribers response behavior. +- 2026-02-06: Running shared-log tests requires a working aegir config and a built `dist/test/**` output. 
With `--no-build`, aegir/mocha only looks for `test/**/*.spec.*js` and `dist/test/**/*.spec.*js`, so TypeScript tests (`test/**/*.ts`) are ignored unless we build/emit JS first. +- 2026-02-06: `pnpm run build` succeeded after fixing `.aegir.js` worktree handling in multiple packages (`.aegir.js`, `packages/utils/any-store/any-store/.aegir.js`, `packages/clients/peerbit/.aegir.js`, `packages/utils/indexer/sqlite3/.aegir.js`). This produced `dist/test/**` outputs so `--no-build` test runs work locally. +- 2026-02-06: Test run: `shared-log` `events` `replicate:join not emitted on update` PASSED locally (`node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "replicate:join not emitted on update"`). This suggests the CI failure is flaky/timing-sensitive rather than deterministic in current local timing. +- 2026-02-06: Test run: `shared-log` `migration-8-9` suite PASSED locally (`--grep "migration-8-9"`). The CI timeouts likely require a narrower reproduction (timing/ordering) or are only triggered under different pubsub timing (e.g., PR #589 changes). +- 2026-02-06: Implemented pubsub debounce-race regression tests and ran them: PASSED (`node ./node_modules/aegir/src/index.js run test --roots ./packages/transport/pubsub -- -t node --no-build --grep "BUG: subscribe debounce race"`). This directly validates (1) eager `initializeTopic()` in `subscribe()`, (2) pending subscribes being advertised via `GetSubscribers`/requestSubscribers, and (3) subscribe->unsubscribe within debounce does not leak/retain a ghost topic. +- 2026-02-06: After applying the pubsub fix, shared-log flake is now locally reproducible: `events > replicate:join not emitted on update` **FAILED** with a duplicate `replicator:join` entry for the same peer hash (`node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "replicate:join not emitted on update"`). 
This strongly supports the hypothesis that pubsub timing changes expose a latent shared-log race (TOCTOU around “new replicator” detection). +- 2026-02-06: After applying the pubsub fix, shared-log migration flake is now locally reproducible: `migration-8-9 > 8-9, replicates database of 1 entry` **FAILED** with `expected +0 to equal 1` (timeout waiting for replication) (`node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "migration-8-9"`). This matches the CI symptom where replication never converges within `waitForResolved`'s default timeout. +- 2026-02-06: Shared-log fix iteration: + - Added per-peer serialization for replication-info apply + buffering of replication-info on NotStartedError, drained in `afterOpen()`. + - Result: `events > replicate:join not emitted on update` now PASSED (`--grep "replicate:join not emitted on update"`). +- 2026-02-06: Migration root cause (local repro) was *not* fixed by shared-log serialization alone. The missing piece was pubsub message processing during the debounce window: RPC messages sent to a peer during pending subscribe were not treated as “for me”. After changing `DirectSub` to treat pending subscribes as subscribed for PubSubData delivery, `migration-8-9` now PASSED (both `8-9` and `9-8`) and remains quick/stable in local runs. + +# Claims +## Claim1 (Unique Triple-Failure Signature Only Once) +- Verdict: **False** (appears in at least 2 PR #589 CI runs). +- Evidence: + - PR `dao-xyz/peerbit#589` run `21732319700` (job `62689580781`, `test:ci:part-4`) shows the triple failure (events + 2x migration). + - PR `dao-xyz/peerbit#589` run `21738359696` (job `62708216873`, `test:ci:part-4`) shows the same triple failure. + +## Claim2 (PR #589 Complete + All Tests Pass + “Definitive” Root Cause) +- Verdict: **Partially true**. +- Evidence: + - True: PR #589 has 4 commits on `fix/pubsub-initialize-topic-on-subscribe` (GitHub PR metadata). 
+ - False: “All tests pass” (CI `test:ci:part-4` fails on shared-log in at least the runs above). + - Supported: pubsub tests pass in that PR run (`@peerbit/pubsub: 43 passing` in part-3 job output). + - Local repo cross-check: this repo does **not** contain PR #589 code changes yet (see Learnings). + - Local repo cross-check: shared-log has a plausible TOCTOU/serialization issue (replication-info fire-and-forget IIFE around `packages/programs/data/shared-log/src/index.ts:2971` + racy `isNewReplicator` check around `packages/programs/data/shared-log/src/index.ts:1200`). + - Note: PR #589 does **not** “remove the debounce delay”; it keeps debounced subscribe but changes initialization/advertising semantics. + +## Claim3 (waitForReplicator maturity timing + duplicate join “as expected”) +- Verdict: **Partially true**. +- Evidence: + - True: the test asserts ~3s wait (`packages/programs/data/shared-log/test/replicate.spec.ts` expects `>= 3000ms - 100ms`). + - False for the cited CI runs: in the failing PR #589 runs, `waitForReplicator waits until maturity` is shown as passing at ~3054-3056ms. + - Supported: CI failure output for the `events` assertion is consistent with a duplicate `replicator:join` emission (same peer hash twice). + +## Claim4 (“PR #3 persistCoordinate guard doesn’t fix TOCTOU race”) +- Verdict: **Partially true** (the “doesn’t fix TOCTOU race” conclusion is supported; the exact PR reference is likely misidentified). +- Evidence: + - The closest matching change appears to be `dao-xyz/peerbit#591` (`fix(shared-log): avoid unhandled rejection...`) which adjusts `persistCoordinate` and related shutdown handling, but does not touch replication-info IIFE / `addReplicationRange` join emission logic. + - Local repo cross-check: the replication-info IIFE begins at `packages/programs/data/shared-log/src/index.ts:2971`. 
+ +# Ahas/Gotchas +- Aegir config bug: `.aegir.js` assumed `.git` is a directory; in this worktree `.git` is a file, causing `Error finding your config file` on any `aegir` command. Fixed by resolving root from the config file directory and falling back to `.git` file (`.aegir.js`). +- Aegir test gotcha: `--no-build` skips compilation, but tests live in TypeScript. Until `dist/test/**` exists, `aegir test --no-build` reports `Error: No test files found`. + +# Tests +## Proposed Quick Regression Tests (Additions) + +### PubSub / DirectSub +- Add: `packages/transport/pubsub/test/bug-pending-subscribe-requestSubscribers.spec.ts` (or inline into existing `packages/transport/pubsub/test/index.spec.ts`). +- Purpose: deterministically validate the PR #589 hypothesis: + - Eager topic initialization in `subscribe()` prevents incoming `Subscribe` drop during debounce. + - “Pending subscribe” is advertised via Subscribe(requestSubscribers) and GetSubscribers responses. +- Sketches (from subagent proposal): + - `requestSubscribers includes pending debounced subscribe`: block `(a as any)._subscribe` with a deferred promise; call `a.subscribe(TOPIC)` (pending); then from B call `await b.requestSubscribers(TOPIC, a.publicKey)`; assert B records A as a subscriber while A’s subscribe promise is still unresolved. + - `incoming Subscribe not dropped during local debounce window`: with A pending subscribe (topic initialized), let B do `await b.subscribe(TOPIC)`; assert A records B under `a.topics.get(TOPIC)` even though A’s `_subscribe` has not run. + - `subscribe then unsubscribe within debounce does not advertise/leak`: with A pending subscribe then `await a.unsubscribe(TOPIC)`, from B do `requestSubscribers(TOPIC, a.publicKey)`; assert B does not record A as subscriber. 
+ +- 2026-02-06 Update: + - Added deterministic regression coverage in `packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts` (blocks `_subscribe()` to force the debounce window and asserts pending-subscribe behavior). + - Incorporated PR #589’s pubsub test into this worktree as `packages/transport/pubsub/test/bug1-initializeTopic-race.spec.ts` (`--grep "BUG: initializeTopic race"`). + +### Shared-Log +- Add: `packages/programs/data/shared-log/test/replication-info-race.spec.ts`. +- Purpose: deterministically validate the shared-log TOCTOU hypothesis: + - Two concurrent “replication-info apply” executions for the same peer must not emit duplicate `replicator:join`. +- Preferred shape: + - Call internal `addReplicationRange(...)` twice concurrently for the same `from` key (TS `private` is runtime-accessible), and deliberately widen the critical section by gating `replicationIndex.del` so both calls compute “new replicator” off the same empty state. +- Add (optional): `packages/programs/data/shared-log/test/replication-info-not-started-buffer.spec.ts`. +- Purpose: validate the “don’t drop replication-info on NotStartedError” hypothesis by forcing a `NotStarted`/`IndexNotStarted` error once and asserting it is retried/applied later. + +## Optional Existing Test Tightening +- `packages/programs/data/shared-log/test/events.spec.ts`: replace fixed `delay(2e3)` waits with `waitForResolved()` on concrete state (join/leave arrays length, replicationIndex contents). This reduces flakiness but shouldn’t be the primary fix. + +# Reviews +```md +## Review1/Review2 Cross-Check (Against Local Repo) + +### Validated (Local Code) +- The debounce window exists: `DirectSub.subscribe()` enqueues via `debounceSubscribeAggregator` and does not immediately set `topics`/`subscriptions` (`packages/transport/pubsub/src/index.ts:96`, `packages/transport/pubsub/src/index.ts:126`). 
+- `_subscribe()` is where `subscriptions` is set and `listenForSubscribers()` is called (`packages/transport/pubsub/src/index.ts:134`), which initializes topic tracking via `initializeTopic()` (`packages/transport/pubsub/src/index.ts:265`). +- Incoming `Subscribe` drops remote subscription info for uninitialized topics (`packages/transport/pubsub/src/index.ts:685-689`). +- Replies for `requestSubscribers` / `GetSubscribers` are computed from `this.subscriptions` only (`getSubscriptionOverlap()`), so pending debounced subscribes aren’t advertised (`packages/transport/pubsub/src/index.ts:451`, `packages/transport/pubsub/src/index.ts:727`, `packages/transport/pubsub/src/index.ts:798`). +- There is a known flaky concurrent subscribe/connect test already (currently commented out) (`packages/transport/pubsub/test/index.spec.ts:1370`). + +### Not True Locally +- PR #589 changes are not present in this repo: no eager `initializeTopic()` in `subscribe()` and no inclusion of pending topics in requestSubscribers responses. +- The PR #589 regression test files referenced in Review1/2 don’t exist locally. + +### Action Items +- Implement PR #589-style fix in this repo: + - eager `initializeTopic(topic)` in `subscribe()`. + - include pending topics (debounce aggregator) in overlap when responding to `Subscribe{requestSubscribers:true}` and `GetSubscribers`. +- Add tests for the above plus `subscribe()` then immediate `unsubscribe()` (ensure no “ghost topic” advertising). +- Re-enable/replace the disabled concurrent subscribe/connect test once the fix lands. +``` + +## 2026-02-06 Update: Reviews vs Current Repo State + +### PubSub (debounce + pending subscribe) + +- Confirmed: the review’s debounce-window hypothesis was correct (topic not initialized yet => inbound `Subscribe` can be dropped; subscribe handshakes can miss “pending” intent). 
+- Implemented in `packages/transport/pubsub/src/index.ts`: `subscribe()` now eagerly initializes topic tracking and keeps `pendingSubscriptions` until `_subscribe()` commits. +- Tests added in `packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts` now directly assert two review concerns: pending subscribes are discoverable via `Subscribe{requestSubscribers:true}` responses (even with `_subscribe()` blocked) and subscribe→unsubscribe within the debounce window does not advertise or retain a ghost topic. +- Extra nuance found while debugging shared-log: pending subscribes also must count as “local interest” for `PubSubData` delivery filtering; otherwise RPC traffic can be ignored during the debounce window (this was the missing piece for `migration-8-9` stability). + +Outdated/incorrect in the `Review1/Review2 Cross-Check` block above (now historical): + +- “PR #589 changes are not present in this repo” is no longer true in this worktree. +- “The PR #589 regression test files referenced in Review1/2 don’t exist locally” is now outdated in intent: equivalent coverage exists in `packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts`. +- “requestSubscribers / GetSubscribers replies are computed from `subscriptions` only” is now partially outdated: responses to `Subscribe{requestSubscribers:true}` include pending topics; responses to `GetSubscribers` still only reflect `subscriptions` (so the `requestSubscribers()` API still won’t surface a remote pending subscribe). + +### Shared-log (replication-info handling) + +- Confirmed: with pubsub timing improved, shared-log’s latent replication-info TOCTOU became reproducible locally (matching CI: duplicate `replicator:join` + migration replication timeouts). 
+- Implemented in `packages/programs/data/shared-log/src/index.ts`: replication-info apply is serialized per peer; `NotStartedError` replication-info is buffered and drained in `afterOpen()`; subscription-change handling is idempotent with delayed/cancellable `RequestReplicationInfoMessage`. +- Local cross-check (2026-02-06): `replicate:join not emitted on update`, `migration-8-9`, `will set replicaiton info on load`, and `applies replication segments even if waitFor() fails` all pass with `--no-build` greps. + +# 2026-02-06 Shared-Log Regression Failures (Post Replication-Info Queue/Buffer) +- Repro: `node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "will set replicaiton info on load"` fails with `expected 1 to equal 2` (`packages/programs/data/shared-log/test/load.spec.ts:213`). The test observes `uniqueReplicators.size === 2` but `replicationIndex.count() === 1`, which implies inconsistent in-memory vs persisted replication state. +- Repro: `node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "applies replication segments even if waitFor\\(\\) fails"` fails for both `u32-simple` and `u64-iblt` at the "clear segments" step (`packages/programs/data/shared-log/test/replication.spec.ts:216-224`): after sending `AllReplicatingSegmentsMessage({ segments: [] })`, db2 still reports `count({hash: fromHash}) === 1` instead of `0`. +- Hypothesis: `addReplicationRange()` mutates `uniqueReplicators` before `replicationIndex.put()`/`del()` completes (`packages/programs/data/shared-log/src/index.ts`), so tests that wait on `uniqueReplicators` can observe "replicator present" while the index is still incomplete. If `addReplicationRange()` throws `NotStartedError`, the new buffering path can also leave partial in-memory changes without a retry outside `afterOpen()`. 
+- Planned fix: make `addReplicationRange()` update `uniqueReplicators` only after index operations succeed and delete the peer when a `reset` results in zero ranges; add a read-only "index ready" probe in `_enqueueApplyReplicationInfo()` before calling `addReplicationRange()` to avoid partial mutations on `NotStartedError`. +- Note: `node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "does not lose entries when ranges rotate with delayed replication updates"` passed for both `u32-simple` and `u64-iblt` (~43s each) in a local run, so that previously observed failure may be flaky/timing-dependent. + +# Attack Plan + +## Root-Cause Summary (Definitive) + +### PubSub: `DirectSub` Pending-Subscribe Window + +During the subscribe debounce window we were in a "pending" state that was not treated as subscribed: +- `topics` not initialized yet, so inbound `Subscribe` could be dropped as "unknown topic". +- Pending subscribe not included in `Subscribe{requestSubscribers:true}` overlap responses. +- `PubSubData` delivery filtering only checked committed `subscriptions`, not pending subscribes (so shared-log RPC traffic could be ignored during the window). + +Net effect: timing-sensitive joins could miss the handshake or drop RPC traffic, surfacing as shared-log replication timeouts (notably `migration-8-9`) once connectivity got faster. + +### Shared-log: Replication-Info Apply Was Concurrent + Lossy + +Replication-info (`AllReplicatingSegmentsMessage` / `AddedReplicationSegmentMessage`) was applied via a fire-and-forget async IIFE, so multiple messages from the same peer could overlap and race through `addReplicationRange()` (TOCTOU on "isNewReplicator"). Startup-time `NotStartedError` could also effectively drop replication-info without a retry. 
+ +Net effect: duplicate `replicator:join` for the same peer (CI `events.spec.ts` failure) and missing replication ranges leading to `waitForResolved` timeouts (CI `migration.spec.ts` failures). + +### Shared-log: Subscription Request Scheduling Amplified Races + +`handleSubscriptionChange()` could run multiple times for the same peer during startup and would request replication info immediately, creating redundant request/response pairs right as peers join. + +Net effect: amplified the concurrency window above by producing more replication-info traffic closer together. + +## Fix Inventory (What We Changed) + +- PubSub `DirectSub` (`packages/transport/pubsub/src/index.ts`): track `pendingSubscriptions`, eagerly `initializeTopic()` in `subscribe()`, advertise pending topics in `Subscribe{requestSubscribers:true}` overlap responses, treat pending topics as local interest for `PubSubData` delivery filtering, and clean up eager topic state when a debounced subscribe is cancelled (subscribe then unsubscribe within the window). +- Shared-log (`packages/programs/data/shared-log/src/index.ts`): serialize replication-info apply per peer, buffer latest replication-info per peer on `NotStartedError` and drain after `afterOpen()`, make subscription-change handling idempotent, and schedule/dedupe `RequestReplicationInfoMessage` to avoid redundant request/response pairs during startup. + +## Status (2026-02-06 Local) + +After `pnpm run build`, the sentinel greps below pass locally with `--no-build`. + +## Step-By-Step Attack Plan + +1. Build once (keep `dist/test/**` in sync; all `--no-build` greps assume this): + +```bash +pnpm run build +``` + +2. 
Smoke test the three fixes (fast, deterministic greps): + +```bash +# PubSub: pending-subscribe handling + PubSubData delivery while pending +node ./node_modules/aegir/src/index.js run test \ + --roots ./packages/transport/pubsub -- -t node --no-build \ + --grep "BUG: subscribe debounce race" + +# Shared-log: replication-info concurrency should not duplicate join +node ./node_modules/aegir/src/index.js run test \ + --roots ./packages/programs/data/shared-log -- -t node --no-build \ + --grep "replicate:join not emitted on update" + +# Shared-log: handshake should converge (no 10s waitForResolved timeout) +node ./node_modules/aegir/src/index.js run test \ + --roots ./packages/programs/data/shared-log -- -t node --no-build \ + --grep "migration-8-9" +``` + +3. Flake-proofing loops (cheap confidence that we closed the timing window): + +```bash +# Very fast loop (pubsub) +for i in {1..50}; do + node ./node_modules/aegir/src/index.js run test \ + --roots ./packages/transport/pubsub -- -t node --no-build \ + --grep "BUG: subscribe debounce race" || exit 1 +done + +# Shared-log join event (previously the CI symptom) +for i in {1..50}; do + node ./node_modules/aegir/src/index.js run test \ + --roots ./packages/programs/data/shared-log -- -t node --no-build \ + --grep "replicate:join not emitted on update" || exit 1 +done + +# Shared-log migration convergence (slower; fewer iterations is fine) +for i in {1..20}; do + node ./node_modules/aegir/src/index.js run test \ + --roots ./packages/programs/data/shared-log -- -t node --no-build \ + --grep "migration-8-9" || exit 1 +done +``` + +4. Lock in regressions with 2 shared-log tests (so we never reintroduce these). +Test 1: `replication-info is serialized per peer (no duplicate replicator:join)`. Arrange two replication-info applies for the same `from` to overlap; assert exactly one `replicator:join` and stable `replicationIndex` state. +Test 2: `replication-info is buffered on NotStarted and drained afterOpen()`. 
Force `replicationIndex` to throw `NotStartedError` once, deliver replication-info, then open fully; assert the buffered message eventually applies (replication ranges appear; join/replication converges). + +5. Keep a short "sentinel" grep set for future edits (run before/after any touch to pubsub/shared-log): + +```bash +node ./node_modules/aegir/src/index.js run test --roots ./packages/transport/pubsub -- -t node --no-build --grep "BUG: subscribe debounce race" +node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "replicate:join not emitted on update" +node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "migration-8-9" +node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "will set replicaiton info on load" +node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build --grep "applies replication segments even if waitFor\\(\\) fails" +``` + +# 2026-02-06 Resolution (Local) +- Shared-log full suite: `1743 passing (17m)` (`node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log -- -t node --no-build`). +- PubSub full suite: `44 passing (1m)` (`node ./node_modules/aegir/src/index.js run test --roots ./packages/transport/pubsub -- -t node --no-build`). +- Shared-log regressions from the earlier “Regression Failures” section are now fixed (`will set replicaiton info on load`, `applies replication segments even if waitFor() fails` now pass). +- Added an additional pubsub regression test for the “pending subscribe should still receive strict PubSubData” class: + - `packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts` `--grep "pending subscribe receives strict PubSubData"`. 
+ +## Claims -> Tests Coverage +| Claim | Validated By (Existing Tests) | Gap / Missing Minimal Test | +|---|---|---| +| **Claim1:** “Triple-failure signature only happened once in CI” | None (CI-history, not runtime behavior) | Not testable as a unit/integration test in this repo; would need CI log mining tooling. | +| **Claim2 (pubsub):** debounce-window race is real; fix is eager topic init + advertise “pending subscribe” in subscriber-discovery handshake | `packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts` (eager init prevents dropped inbound `Subscribe`; pending advertised via `Subscribe{requestSubscribers:true}` response; subscribe→unsubscribe doesn’t leak ghost topic; pending receives strict `PubSubData`) | Still unclear/untested whether `requestSubscribers()` (GetSubscribers path) should include pending topics (currently: no). Add a minimal test to assert intended behavior. | +| **Claim2 (shared-log):** shared-log CI failures are from pre-existing TOCTOU race (concurrent replication-info apply) rather than pubsub logic | Symptom-level: `packages/programs/data/shared-log/test/events.spec.ts` `--grep "replicate:join not emitted on update"` | Missing deterministic regression: add `replication-info-race.spec.ts` that forces two concurrent replication-info applies for same peer and asserts only one `replicator:join`. | +| **Claim2 subclaim:** “pubsub fix removes the ~50ms debounce delay” | None | Not validated by tests; also contradicts current implementation (subscribe still debounced). | +| **Claim3:** “waitForReplicator waits until maturity” behavior | `packages/programs/data/shared-log/test/replicate.spec.ts` `--grep "waitForReplicator waits until maturity"` | Covered. | +| **Claim3 subclaim:** “duplicate `replicator:join` is expected from the TOCTOU race” | `packages/programs/data/shared-log/test/events.spec.ts` `--grep "replicate:join not emitted on update"` | Still missing the deterministic race test above (current is timing-sensitive). 
| +| **Claim4:** “persistCoordinate guard PR doesn’t fix the TOCTOU race” | None (depends on external patch/PR) | Minimal validation path: run the deterministic TOCTOU regression test with/without that guard change applied. | +| **Key pubsub/shared-log connectivity claim:** pending subscribe must count as “local interest” for `PubSubData` delivery (RPC during debounce otherwise ignored) | `packages/transport/pubsub/test/bug1-subscribe-debounce-race.spec.ts` `--grep "pending subscribe receives strict PubSubData"` | Covered. | +| **Key shared-log connectivity claim:** migration timeouts come from missed handshake and/or replication-info dropped on `NotStartedError` | Symptom-level: `packages/programs/data/shared-log/test/migration.spec.ts` `--grep "migration-8-9"` | Missing targeted tests: 1) force `NotStartedError` during replication-info apply and assert it’s buffered+applied later; 2) simulate missed subscribe event and assert startup backfill recovers (if/when implemented). | + +## 2026-02-06 Note: Replication-Info Timestamp “Strictness” +- The replication-info “newer wins” guard in shared-log is **per-sender**, based on `context.message.header.timestamp` and keyed by `from.hashcode()` (`packages/programs/data/shared-log/src/index.ts`). +- The guard is already relatively permissive: it only drops strictly older messages (`prevApplied > messageTimestamp`), so equal timestamps will still apply. +- The CI symptom here was not “valid updates dropped due to timestamp skew”; it was (1) replication-info apply happening concurrently and (2) pubsub dropping/ignoring RPC traffic during the pending-subscribe window. Loosening the timestamp check would risk allowing stale state to overwrite newer state without addressing either root cause. + +## 2026-02-06 Note: PR #589 Test vs PR #593 Scope +- PR `dao-xyz/peerbit#589` adds a pubsub regression test: `packages/transport/pubsub/test/bug1-initializeTopic-race.spec.ts` (guards the debounced `subscribe()` topic-init race). 
+- PR `dao-xyz/peerbit#593` changes `packages/transport/pubsub/src/index.ts` (in addition to many shared-log files), but does **not** list any new pubsub test file under `packages/transport/pubsub/test/**`. +- Recommendation: + - If `#593` is intended to be mergeable on its own (or might land before/without `#589`), it should include the `#589` pubsub regression test (or a stricter/deterministic variant) so the pubsub fix doesn’t land untested. + - If `#593` is intentionally stacked on `#589` and will be rebased/merged after `#589`, avoid duplication but make the dependency explicit so reviewers know the pubsub tests come from `#589`. diff --git a/shared-log-debug.md b/shared-log-debug.md new file mode 100644 index 000000000..15a0ca1ea --- /dev/null +++ b/shared-log-debug.md @@ -0,0 +1,876 @@ +You have recently made a PR here: https://github.com/dao-xyz/peerbit/pull/589 in order to address flaky connectivity within the Peerbit library for pubsub. This patch then had 3 CI test failures visible on github here https://github.com/dao-xyz/peerbit/actions/runs/21732319700/job/62689580781 (and you can inspect the history of the same PR with the other job failures on the CI page). It has been noted by some peers that this PR actually tries to improve connectivity between peers but actually more easily surfaces a "shared-log" flaky/race condition that was under the hood before, so it doesn't cause the error it just speeds up connections leading to the error surfacing (but this is not confirmed). Some make claims that it's related to a debounce whereas there are other claims that extra replications are now being caused because a debounce isn't canceled. + + + These assumptions are correct in the current code: + + - DirectSub.subscribe() is debounced (subscriptionDebounceDelay ?? 50ms), and _subscribe() is where subscriptions is set and listenForSubscribers() is called. 
+ packages/transport/pubsub/src/index.ts:96 + packages/transport/pubsub/src/index.ts:139 + packages/transport/pubsub/src/index.ts:159 + packages/transport/pubsub/src/index.ts:270 + - Incoming Subscribe handling drops remote subscriptions if the topic hasn’t been initialized locally: + const peers = this.topics.get(topic); if (peers == null) return; + packages/transport/pubsub/src/index.ts:690 + - requestSubscribers responses only include topics present in this.subscriptions via getSubscriptionOverlap(), so during the debounce window overlap can be empty. + packages/transport/pubsub/src/index.ts:456 + packages/transport/pubsub/src/index.ts:732 + + The PR’s fix (eager initializeTopic() inside subscribe() and adding pending-topics via debounceSubscribeAggregator.has(topic) to requestSubscribers responses) directly addresses those gaps. + packages/transport/pubsub/src/index.ts:126 + packages/transport/pubsub/src/index.ts:732 + + Notable edge-case to sanity-check (not a blocker, but real): subscribe() then unsubscribe() before debounce fires now leaves an initialized topics entry behind (since unsubscribe only cancels the debounced subscribe). That can affect “are we subscribed” heuristics that + use this.topics.has(topic) rather than this.subscriptions.has(topic). + packages/transport/pubsub/src/index.ts:126 + packages/transport/pubsub/src/index.ts:183 + + ## What CI Actually Failed (PR run 21732319700, job 62689580781) + + In pnpm run test:ci:part-4 (Ubuntu 22.04, Node 22), @peerbit/shared-log failed: + + - events > replicate:join not emitted on update with duplicate join entries (same peer hash twice). + packages/programs/data/shared-log/test/events.spec.ts:103 + packages/programs/data/shared-log/test/events.spec.ts:104 + - migration-8-9 > 8-9, replicates database of 1 entry timed out and last assertion was expected +0 to equal 1. + packages/programs/data/shared-log/test/migration.spec.ts:130 + - migration-8-9 > 9-8, replicates database of 1 entry same symptom. 
+ packages/programs/data/shared-log/test/migration.spec.ts:139 + + (waitForResolved default timeout is 10s, so these “AssertionError” failures are consistent with a 10s timeout where replication never completed.) + packages/utils/time/src/wait.ts:73 + + ## Most Likely Origin of the shared-log Flakes + + ### 1) Duplicate replicator:join (events.spec) + + The most plausible mechanism is a race in shared-log’s replication-info handling: + + - Shared-log can receive multiple replication-info messages for the same peer during join, close together: + - Proactive announcement on subscribe (handleSubscriptionChange sends AllReplicatingSegmentsMessage). + packages/programs/data/shared-log/src/index.ts:3954 + - Response to RequestReplicationInfoMessage (receiver sends AllReplicatingSegmentsMessage). + packages/programs/data/shared-log/src/index.ts:2919 + packages/programs/data/shared-log/src/index.ts:3981 + - Incoming replication-info messages are processed in a fire-and-forget async IIFE, not serialized per peer. + packages/programs/data/shared-log/src/index.ts:2955 + packages/programs/data/shared-log/src/index.ts:2971 + - addReplicationRange() decides “is new replicator” by reading the index state (prevCount === 0), then later writes. Two concurrent calls can both see prevCount === 0 and both emit replicator:join. + packages/programs/data/shared-log/src/index.ts:1139 + packages/programs/data/shared-log/src/index.ts:1200 + packages/programs/data/shared-log/src/index.ts:1382 + + This exactly matches the CI diff: same peer hash appearing twice in db1JoinEvents. + + Why PR #589 could increase the odds (without being the root cause): pubsub timing changes can make subscription discovery and replication-info exchange happen “tighter” in time, increasing the chance the two replication-info messages overlap and hit this race window. 
+ + ### 2) Migration tests not replicating within 10s (migration.spec) + + Two high-probability contributors (can stack): + + - Missed/late subscription discovery ⇒ replication-info handshake doesn’t run soon enough. + Shared-log relies on pubsub subscribe events plus a single snapshot in afterOpen(). It does not call pubsub requestSubscribers(). If the subscribe event happens before the listener is attached (or the snapshot runs before pubsub has the subscriber recorded), + handleSubscriptionChange() might not run, so RequestReplicationInfoMessage isn’t sent and roles/ranges aren’t applied. + packages/programs/data/shared-log/src/index.ts:2138 + packages/programs/data/shared-log/src/index.ts:2219 + packages/programs/data/shared-log/src/index.ts:3916 + packages/transport/pubsub/src/index.ts:274 + - Replication-info messages can be dropped during startup if indexes aren’t ready. + In the replication-info IIFE, errors classified as “not started” are swallowed and the message is effectively lost (no retry at this layer). If this happens at the wrong time, the peer never gets replication ranges, and replication can stall. + packages/programs/data/shared-log/src/index.ts:290 + packages/programs/data/shared-log/src/index.ts:2999 + + ## Bottom Line On “Are the suspected origins correct?” + + - “This PR doesn’t touch shared-log directly” is true, but shared-log depends on pubsub (workspace:*), so pubsub timing changes can expose or amplify existing shared-log races. + - The replicator:join failure is strongly consistent with a shared-log concurrency bug (lack of per-peer serialization around addReplicationRange()), not with a deterministic pubsub regression. + - The migration failures are consistent with timing/handshake issues (missed subscribe window, lost replication-info processing, or simply >10s replication under CI load). Pubsub races can be a trigger, but shared-log also has its own “one-shot snapshot” assumptions. 
+ + ## Most Actionable Hardening (if you want to fix the flakes) + Creating an actionable plan + + In the shared-log, we always subscribe to our RPC topic during the open process, which leads us to eventually initialize it. However, if a remote subscribe message arrives before this, it might be dropped, preventing us from learning about the subscription. To tackle this, we should incorporate a + requestSubscribers call right after opening. Here’s the proposed minimal patch set: + + 1. Immediate triage: Confirm causes by running tests with instrumentation. + 2. Code hardening: Strengthen the shared-log. + 3. Pubsub improvements: Ensure the relevant PR is merged, possibly adding tests. + 4. Test hardening: Enhance waitForResolved timeouts; implement a deterministic handshake. + 5. CI: Execute targeted tests several times, capturing debug logs and seeds. + + For the update, the latest PR was modified on Feb 6, 2026, head 61ab928..., with the failing run ID 21732319700. I believe the assumption about the CI failure is correct; it seems related to shared-log concurrency rather than pubsub issues. + +• Current Evidence (PR + CI) + + 1. PR dao-xyz/peerbit#589 (head 61ab928698a82de0441adca1b8580a4c4133795b, Feb 6 2026) is correctly targeting a real pubsub race: incoming Subscribe can be dropped while local subscribe() is still in the debounce window. Fix is to eagerly initializeTopic() in subscribe() and treat “pending + subscribe” as subscribed for requestSubscribers responses (packages/transport/pubsub/src/index.ts:126, packages/transport/pubsub/src/index.ts:732). + 2. The CI failures you called out look shared-log–originated (not pubsub-test–originated): duplicate replicator:join in events.spec.ts and replication timeout in migration.spec.ts (PR run 21732319700, job 62689580781 per earlier log inspection). + + Most Likely Root Causes + + 1. 
Duplicate replicator:join is a shared-log concurrency bug:
 +
 + - Join emits inside addReplicationRange() when isNewReplicator is computed off index counts (packages/programs/data/shared-log/src/index.ts:1139, packages/programs/data/shared-log/src/index.ts:1200, packages/programs/data/shared-log/src/index.ts:1384).
 + - Replication-info messages are applied via an un-awaited async IIFE, so two replication-info messages from the same peer can run addReplicationRange() concurrently (packages/programs/data/shared-log/src/index.ts:2971).
 + - On subscribe, shared-log sends both AllReplicatingSegmentsMessage and RequestReplicationInfoMessage (remote responds with another AllReplicatingSegmentsMessage), making “two near-simultaneous replication-info messages” common during joins (packages/programs/data/shared-log/src/index.ts:3954,
 + packages/programs/data/shared-log/src/index.ts:3981, packages/programs/data/shared-log/src/index.ts:2919).
 +
 + 2. Migration flake is likely “handshake missed” + “early message dropped”:
 +
 + - Startup only snapshots local pubsub state via getSubscribers() and does not actively backfill via requestSubscribers() (packages/programs/data/shared-log/src/index.ts:2236).
 + - Replication-info processing explicitly swallows NotStartedError/IndexNotStartedError and drops the update (packages/programs/data/shared-log/src/index.ts:2999, packages/programs/data/shared-log/src/index.ts:290), which can turn into a permanent “never learned peer’s replication intent” and then
 + a 10s waitForResolved() timeout (packages/utils/time/src/wait.ts:73).
 +
 + Actionable Hardening Plan (Ordered By Impact)
 +
 + 1. Serialize replication-info application per peer (must-do)
 +
 + - Change the replication-info handler at packages/programs/data/shared-log/src/index.ts:2955 to enqueue work per from.hashcode() instead of fire-and-forget IIFEs.
 + - Implementation shape: `Map<string, Promise<void>>` chaining (or a tiny per-peer mutex) so only one addReplicationRange() runs at a time for a given peer. 
+ - Move the “drop old message” check inside the serialized section and compare against “latest applied” (not “latest received”) to prevent older messages applying after newer ones. + - Validation: run pnpm -C ../peerbit-pr589/repo run test:ci:part-4 in a loop (20-50 iterations) and ensure events.spec.ts no longer produces duplicate join. + + 2. Make replicator:join emission idempotent across all paths + + - Guard replicator:join in addReplicationRange() with a state transition, not a racy count check. Prefer “was peer in uniqueReplicators before?” inside the same serialized section. + - Add the same guard in pruneOfflineReplicators() before emitting join (packages/programs/data/shared-log/src/index.ts:2294), because it can overlap with join-time replication updates and currently emits unconditionally. + - Validation: add a regression test that forces two identical replication-info messages to be handled “back-to-back” and asserts only one join event. + + 3. Stop losing replication-info messages on NotStartedError + + - Replace the current “swallow and return” in the replication-info handler catch (packages/programs/data/shared-log/src/index.ts:2999) with “store latest message per peer and retry once indexes are started”. + - Drain pending messages at the end of afterOpen() (packages/programs/data/shared-log/src/index.ts:2219) or when the replication index signals readiness. + - Validation: stress test by injecting an artificial delay before index start (or force early inbound messages) and confirm replication still converges. + + 4. Reduce duplication of replication-info messages at join + + - Adjust handleSubscriptionChange() (packages/programs/data/shared-log/src/index.ts:3916) so it does not always send both: + - Keep the proactive AllReplicatingSegmentsMessage for fast-path. + - Send RequestReplicationInfoMessage only as a fallback (e.g., delayed 200-500ms; cancel if we already received any replication-info from that peer). 
+ - This is not required once (1) is done, but it reduces traffic and reduces probability of uncovering other races. + - Validation: keep join correctness and re-run stress loop. + + 5. Backfill subscriber state on startup + + - In afterOpen() (packages/programs/data/shared-log/src/index.ts:2219), call await this.node.services.pubsub.requestSubscribers(this.topic) before the getSubscribers() snapshot. + - This makes startup robust to missed subscribe events and to peers that subscribed while we were offline. + - Validation: reproduce migration tests with forced “subscribe event loss” (can be simulated by dropping first Subscribe message in pubsub tests) and ensure migration.spec.ts stabilizes. + + 6. Test hardening (only after code hardening above) + + - Keep existing assertions, but make them wait on explicit shared-log state rather than only time: + - In packages/programs/data/shared-log/test/events.spec.ts:84, wait for replicationIndex to contain exactly 1 segment for peer2 before asserting join list length. + - In packages/programs/data/shared-log/test/migration.spec.ts:124, optionally bump waitForResolved timeout to 30s while the underlying fix lands (defaults are 10s at packages/utils/time/src/wait.ts:73). + - Add a dedicated regression test for “duplicate inbound replication-info does not duplicate join”. + + 7. CI signal improvements + + - Add a “flake detector” job (nightly or non-blocking) that runs test:ci:part-4 20x and uploads logs when a failure occurs. + - Add trace logging behind an env flag for the replication-info queue and join emission so CI failures become diagnosable without reruns. + + Next Steps + + 1. Do you want this hardening plan turned into an actual patch series in a new PR against dao-xyz/peerbit (shared-log fixes), keeping PR #589 pubsub-only? + 2. 
If yes, tell me which worktree to implement in (../peerbit-pr589/repo master vs ../peerbit-pr589/repo.pr-589 PR branch), and I’ll implement steps 1-3 first and validate by looping test:ci:part-4. + + + +I can’t find the same exact triple-failure signature anywhere except that one PR #589 run. + + What I checked: + + - CI pull_request runs (last 134 retained): 14 had test:ci:part-4 failures, but only run 21732319700 (PR #589, head 976ac89) contains the failure summaries: + - events -> replicate:join not emitted on update: (assertion) + - migration-8-9 -> 8-9, replicates database of 1 entry: (expected log length 1, got 0) + - migration-8-9 -> 9-8, replicates database of 1 entry: (expected 1, got 0) + - CI master push runs (last 400): 5 had test:ci:part-4 failures, 0 had those same failure summaries. Master failures were other shared-log flakes (timeouts/assertions in other suites), and these 3 tests were passing in those + logs. + + So there’s no pattern / “started after commit X” I can point to from CI data, because it’s a single occurrence. + + Root cause (from the actual stacks in PR #589 run): + + - events.spec.ts:103-107: replicator:join events collected in db1JoinEvents ended up not matching exactly one peer as expected. The assertion failure output is consistent with a duplicate join emission (same peer hash + appearing twice). + - migration.spec.ts:130 and migration.spec.ts:139: after adding "hello" on one side, the other side’s log.log.length never reached 1 within waitForResolved’s timeout (so replication/handshake didn’t complete). + + Those are both symptoms of replication/membership not settling deterministically (either duplicated join signals, or replication never converging). With only one CI occurrence, it’s unclear whether that’s a shared-log flake + that happened to hit this PR, or a regression made more likely by the pubsub changes. 
+ + + + PR #589 (pubsub race condition fix) is complete and pushed: + - 4 commits on fix/pubsub-initialize-topic-on-subscribe + - All 40 pubsub tests pass + 3 new regression tests pass + 192 log/replicate tests pass + - PR description updated with definitive root cause analysis of CI failures + + Key findings from the deep investigation: + - The 3 shared-log CI failures (events, migration-8-9) are caused by a pre-existing TOCTOU race in SharedLog.onMessage(), not by our pubsub fix + - Our fix makes subscriptions resolve faster (removes ~50ms debounce delay), which exposes the latent shared-log race where two concurrent async IIFEs both see prevCount === 0 and emit duplicate replicator:join events + - Master runs clean on Part 4 (1744/1744 pass), confirming the failures only surface when subscription timing changes + - The shared-log race should be fixed separately (serializing per-peer replication info processing) + + + + Another expected result — the replicate timing test (waitForReplicator waits until maturity) fails because our faster subscription discovery shortens the wait time (2639ms < expected 2900ms). Same shared-log timing sensitivity we already + documented. All background tasks from the previous session have now completed, all consistent with our analysis. + +⏺ As expected — duplicate replicator:join event (same peer hash appears twice in the array instead of once). This is the shared-log TOCTOU race where two concurrent AllReplicatingSegmentsMessage handlers both see prevCount === 0 and both + emit the event. + + All three background tasks confirm our analysis. Everything is consistent and documented in PR #589. + + + +Results: PR #3 does NOT fix the TOCTOU race + + PR #3 (fix/rootcause-b-persistcoordinate-guard) adds shutdown guards to persistCoordinate — protecting against TypeError when _close() nullifies internal indices during async operations. It's a valid shutdown hardening fix, but it targets a different problem. 
+ + Test results with PR #3 applied (on top of pubsub fix): + ┌────────────────────────────────────────────────────┬────────┬──────────────────────────────────────────────────┐ + │ Test │ Result │ Issue │ + ├────────────────────────────────────────────────────┼────────┼──────────────────────────────────────────────────┤ + │ events > replicate:join not emitted on update │ FAIL │ Still duplicate replicator:join (2 instead of 1) │ + ├────────────────────────────────────────────────────┼────────┼──────────────────────────────────────────────────┤ + │ migration-8-9 > replicates database of 1 entry │ FAIL │ Still 0 instead of 1 │ + ├────────────────────────────────────────────────────┼────────┼──────────────────────────────────────────────────┤ + │ replicate > waitForReplicator waits until maturity │ PASS │ Likely environmental timing variance │ + └────────────────────────────────────────────────────┴────────┴──────────────────────────────────────────────────┘ + Why it doesn't fix the race + + The TOCTOU race is in onMessage() (~line 2971), where two concurrent fire-and-forget async IIFEs both call addReplicationRange() and both see prevCount === 0 before either writes. The fix would need a per-peer mutex or serialization queue around addReplicationRange calls, or an atomic + check-and-set for isNewReplicator. PR #3's persistCoordinate guard is in a completely different code path. + + + +## PR #589 Assumptions (PubSub) + + These assumptions are correct in the current code: + + - DirectSub.subscribe() is debounced (subscriptionDebounceDelay ?? 50ms), and _subscribe() is where subscriptions is set and listenForSubscribers() is called. 
+ packages/transport/pubsub/src/index.ts:96 + packages/transport/pubsub/src/index.ts:139 + packages/transport/pubsub/src/index.ts:159 + packages/transport/pubsub/src/index.ts:270 + - Incoming Subscribe handling drops remote subscriptions if the topic hasn’t been initialized locally: + const peers = this.topics.get(topic); if (peers == null) return; + packages/transport/pubsub/src/index.ts:690 + - requestSubscribers responses only include topics present in this.subscriptions via getSubscriptionOverlap(), so during the debounce window overlap can be empty. + packages/transport/pubsub/src/index.ts:456 + packages/transport/pubsub/src/index.ts:732 + + The PR’s fix (eager initializeTopic() inside subscribe() and adding pending-topics via debounceSubscribeAggregator.has(topic) to requestSubscribers responses) directly addresses those gaps. + packages/transport/pubsub/src/index.ts:126 + packages/transport/pubsub/src/index.ts:732 + + Notable edge-case to sanity-check (not a blocker, but real): subscribe() then unsubscribe() before debounce fires now leaves an initialized topics entry behind (since unsubscribe only cancels the debounced subscribe). That can affect “are we subscribed” heuristics that + use this.topics.has(topic) rather than this.subscriptions.has(topic). + packages/transport/pubsub/src/index.ts:126 + packages/transport/pubsub/src/index.ts:183 + + ## What CI Actually Failed (PR run 21732319700, job 62689580781) + + In pnpm run test:ci:part-4 (Ubuntu 22.04, Node 22), @peerbit/shared-log failed: + + - events > replicate:join not emitted on update with duplicate join entries (same peer hash twice). + packages/programs/data/shared-log/test/events.spec.ts:103 + packages/programs/data/shared-log/test/events.spec.ts:104 + - migration-8-9 > 8-9, replicates database of 1 entry timed out and last assertion was expected +0 to equal 1. + packages/programs/data/shared-log/test/migration.spec.ts:130 + - migration-8-9 > 9-8, replicates database of 1 entry same symptom. 
+ packages/programs/data/shared-log/test/migration.spec.ts:139 + + (waitForResolved default timeout is 10s, so these “AssertionError” failures are consistent with a 10s timeout where replication never completed.) + packages/utils/time/src/wait.ts:73 + + ## Most Likely Origin of the shared-log Flakes + + ### 1) Duplicate replicator:join (events.spec) + + The most plausible mechanism is a race in shared-log’s replication-info handling: + + - Shared-log can receive multiple replication-info messages for the same peer during join, close together: + - Proactive announcement on subscribe (handleSubscriptionChange sends AllReplicatingSegmentsMessage). + packages/programs/data/shared-log/src/index.ts:3954 + - Response to RequestReplicationInfoMessage (receiver sends AllReplicatingSegmentsMessage). + packages/programs/data/shared-log/src/index.ts:2919 + packages/programs/data/shared-log/src/index.ts:3981 + - Incoming replication-info messages are processed in a fire-and-forget async IIFE, not serialized per peer. + packages/programs/data/shared-log/src/index.ts:2955 + packages/programs/data/shared-log/src/index.ts:2971 + - addReplicationRange() decides “is new replicator” by reading the index state (prevCount === 0), then later writes. Two concurrent calls can both see prevCount === 0 and both emit replicator:join. + packages/programs/data/shared-log/src/index.ts:1139 + packages/programs/data/shared-log/src/index.ts:1200 + packages/programs/data/shared-log/src/index.ts:1382 + + This exactly matches the CI diff: same peer hash appearing twice in db1JoinEvents. + + Why PR #589 could increase the odds (without being the root cause): pubsub timing changes can make subscription discovery and replication-info exchange happen “tighter” in time, increasing the chance the two replication-info messages overlap and hit this race window. 
+ + ### 2) Migration tests not replicating within 10s (migration.spec) + + Two high-probability contributors (can stack): + + - Missed/late subscription discovery ⇒ replication-info handshake doesn’t run soon enough. + Shared-log relies on pubsub subscribe events plus a single snapshot in afterOpen(). It does not call pubsub requestSubscribers(). If the subscribe event happens before the listener is attached (or the snapshot runs before pubsub has the subscriber recorded), + handleSubscriptionChange() might not run, so RequestReplicationInfoMessage isn’t sent and roles/ranges aren’t applied. + packages/programs/data/shared-log/src/index.ts:2138 + packages/programs/data/shared-log/src/index.ts:2219 + packages/programs/data/shared-log/src/index.ts:3916 + packages/transport/pubsub/src/index.ts:274 + - Replication-info messages can be dropped during startup if indexes aren’t ready. + In the replication-info IIFE, errors classified as “not started” are swallowed and the message is effectively lost (no retry at this layer). If this happens at the wrong time, the peer never gets replication ranges, and replication can stall. + packages/programs/data/shared-log/src/index.ts:290 + packages/programs/data/shared-log/src/index.ts:2999 + + ## Bottom Line On “Are the suspected origins correct?” + + - “This PR doesn’t touch shared-log directly” is true, but shared-log depends on pubsub (workspace:*), so pubsub timing changes can expose or amplify existing shared-log races. + - The replicator:join failure is strongly consistent with a shared-log concurrency bug (lack of per-peer serialization around addReplicationRange()), not with a deterministic pubsub regression. + - The migration failures are consistent with timing/handshake issues (missed subscribe window, lost replication-info processing, or simply >10s replication under CI load). Pubsub races can be a trigger, but shared-log also has its own “one-shot snapshot” assumptions. 
+ + ## Most Actionable Hardening (if you want to fix the flakes) +Creating an actionable plan + + In the shared-log, we always subscribe to our RPC topic during the open process, which leads us to eventually initialize it. However, if a remote subscribe message arrives before this, it might be dropped, preventing us from learning about the subscription. To tackle this, we should incorporate a + requestSubscribers call right after opening. Here’s the proposed minimal patch set: + + 1. Immediate triage: Confirm causes by running tests with instrumentation. + 2. Code hardening: Strengthen the shared-log. + 3. Pubsub improvements: Ensure the relevant PR is merged, possibly adding tests. + 4. Test hardening: Enhance waitForResolved timeouts; implement a deterministic handshake. + 5. CI: Execute targeted tests several times, capturing debug logs and seeds. + + For the update, the latest PR was modified on Feb 6, 2026, head 61ab928..., with the failing run ID 21732319700. I believe the assumption about the CI failure is correct; it seems related to shared-log concurrency rather than pubsub issues. + +• Current Evidence (PR + CI) + + 1. PR dao-xyz/peerbit#589 (head 61ab928698a82de0441adca1b8580a4c4133795b, Feb 6 2026) is correctly targeting a real pubsub race: incoming Subscribe can be dropped while local subscribe() is still in the debounce window. Fix is to eagerly initializeTopic() in subscribe() and treat “pending + subscribe” as subscribed for requestSubscribers responses (packages/transport/pubsub/src/index.ts:126, packages/transport/pubsub/src/index.ts:732). + 2. The CI failures you called out look shared-log–originated (not pubsub-test–originated): duplicate replicator:join in events.spec.ts and replication timeout in migration.spec.ts (PR run 21732319700, job 62689580781 per earlier log inspection). + + Most Likely Root Causes + + 1. 
Duplicate replicator:join is a shared-log concurrency bug:
 +
 + - Join emits inside addReplicationRange() when isNewReplicator is computed off index counts (packages/programs/data/shared-log/src/index.ts:1139, packages/programs/data/shared-log/src/index.ts:1200, packages/programs/data/shared-log/src/index.ts:1384).
 + - Replication-info messages are applied via an un-awaited async IIFE, so two replication-info messages from the same peer can run addReplicationRange() concurrently (packages/programs/data/shared-log/src/index.ts:2971).
 + - On subscribe, shared-log sends both AllReplicatingSegmentsMessage and RequestReplicationInfoMessage (remote responds with another AllReplicatingSegmentsMessage), making “two near-simultaneous replication-info messages” common during joins (packages/programs/data/shared-log/src/index.ts:3954,
 + packages/programs/data/shared-log/src/index.ts:3981, packages/programs/data/shared-log/src/index.ts:2919).
 +
 + 2. Migration flake is likely “handshake missed” + “early message dropped”:
 +
 + - Startup only snapshots local pubsub state via getSubscribers() and does not actively backfill via requestSubscribers() (packages/programs/data/shared-log/src/index.ts:2236).
 + - Replication-info processing explicitly swallows NotStartedError/IndexNotStartedError and drops the update (packages/programs/data/shared-log/src/index.ts:2999, packages/programs/data/shared-log/src/index.ts:290), which can turn into a permanent “never learned peer’s replication intent” and then
 + a 10s waitForResolved() timeout (packages/utils/time/src/wait.ts:73).
 +
 + Actionable Hardening Plan (Ordered By Impact)
 +
 + 1. Serialize replication-info application per peer (must-do)
 +
 + - Change the replication-info handler at packages/programs/data/shared-log/src/index.ts:2955 to enqueue work per from.hashcode() instead of fire-and-forget IIFEs.
 + - Implementation shape: `Map<string, Promise<void>>` chaining (or a tiny per-peer mutex) so only one addReplicationRange() runs at a time for a given peer. 
+ - Move the “drop old message” check inside the serialized section and compare against “latest applied” (not “latest received”) to prevent older messages applying after newer ones. + - Validation: run pnpm -C ../peerbit-pr589/repo run test:ci:part-4 in a loop (20-50 iterations) and ensure events.spec.ts no longer produces duplicate join. + + 2. Make replicator:join emission idempotent across all paths + + - Guard replicator:join in addReplicationRange() with a state transition, not a racy count check. Prefer “was peer in uniqueReplicators before?” inside the same serialized section. + - Add the same guard in pruneOfflineReplicators() before emitting join (packages/programs/data/shared-log/src/index.ts:2294), because it can overlap with join-time replication updates and currently emits unconditionally. + - Validation: add a regression test that forces two identical replication-info messages to be handled “back-to-back” and asserts only one join event. + + 3. Stop losing replication-info messages on NotStartedError + + - Replace the current “swallow and return” in the replication-info handler catch (packages/programs/data/shared-log/src/index.ts:2999) with “store latest message per peer and retry once indexes are started”. + - Drain pending messages at the end of afterOpen() (packages/programs/data/shared-log/src/index.ts:2219) or when the replication index signals readiness. + - Validation: stress test by injecting an artificial delay before index start (or force early inbound messages) and confirm replication still converges. + + 4. Reduce duplication of replication-info messages at join + + - Adjust handleSubscriptionChange() (packages/programs/data/shared-log/src/index.ts:3916) so it does not always send both: + - Keep the proactive AllReplicatingSegmentsMessage for fast-path. + - Send RequestReplicationInfoMessage only as a fallback (e.g., delayed 200-500ms; cancel if we already received any replication-info from that peer). 
+ - This is not required once (1) is done, but it reduces traffic and reduces probability of uncovering other races. + - Validation: keep join correctness and re-run stress loop. + + 5. Backfill subscriber state on startup + + - In afterOpen() (packages/programs/data/shared-log/src/index.ts:2219), call await this.node.services.pubsub.requestSubscribers(this.topic) before the getSubscribers() snapshot. + - This makes startup robust to missed subscribe events and to peers that subscribed while we were offline. + - Validation: reproduce migration tests with forced “subscribe event loss” (can be simulated by dropping first Subscribe message in pubsub tests) and ensure migration.spec.ts stabilizes. + + 6. Test hardening (only after code hardening above) + + - Keep existing assertions, but make them wait on explicit shared-log state rather than only time: + - In packages/programs/data/shared-log/test/events.spec.ts:84, wait for replicationIndex to contain exactly 1 segment for peer2 before asserting join list length. + - In packages/programs/data/shared-log/test/migration.spec.ts:124, optionally bump waitForResolved timeout to 30s while the underlying fix lands (defaults are 10s at packages/utils/time/src/wait.ts:73). + - Add a dedicated regression test for “duplicate inbound replication-info does not duplicate join”. + + 7. CI signal improvements + + - Add a “flake detector” job (nightly or non-blocking) that runs test:ci:part-4 20x and uploads logs when a failure occurs. + - Add trace logging behind an env flag for the replication-info queue and join emission so CI failures become diagnosable without reruns. + + + +## What PR #589 actually does (verified against the diff + PR description) + +The PR description explicitly states the root cause is a debounce window: `subscribe()` queues work via `debounceSubscribeAggregator`, while the real `_subscribe()` (which calls `initializeTopic()` and sets `subscriptions`) happens later. 
If a peer receives (or responds to) messages in that window, state can become asymmetric. ([GitHub][1]) + +Concretely, the PR implements two behavioral changes in `DirectSub`: + +1. **Eager topic initialization inside `subscribe()`** + +It now calls `initializeTopic(topic)` immediately if the topic doesn’t exist, before adding to the debounce aggregator. ([GitHub][2]) + +2. **Treat “pending subscribes” as subscribes when answering `requestSubscribers`** + +When building `mySubscriptions` (overlap), it now also includes any topic that is still pending inside `debounceSubscribeAggregator` (i.e., `subscribe()` was called but the debounced `_subscribe()` hasn’t executed yet). ([GitHub][2]) + +And it adds 3 tests in `bug1-initializeTopic-race.spec.ts`: + +* Unit: `topics.has(topic)` becomes true immediately after calling `subscribe()` (before debounce fires). +* Integration: concurrent subscribe+connect → mutual discovery. +* Integration: connect first, then subscribe → still works. ([GitHub][2]) + +--- + +## Is the review correct? + +### ✅ The core technical story is correct + +The review’s description of the race matches the PR author’s own explanation: + +* `subscribe()` is debounced, `_subscribe()` sets up topic tracking and `subscriptions`. ([GitHub][1]) +* During the debounce gap: + + * an incoming `Subscribe` may be dropped because `topics.get(topic)` is undefined/null and the handler returns early ([GitHub][1]) + * `requestSubscribers` overlap can be empty because `subscriptions` isn’t set yet ([GitHub][1]) + +That’s exactly what the PR claims and what the fix targets. + +### ✅ The review’s “what changed” section matches the diff + +* Eager `initializeTopic()` in `subscribe()` is exactly in the diff. ([GitHub][2]) +* Pending-subscribe inclusion using `debounceSubscribeAggregator.has(topic)` in the `requestSubscribers` response is exactly in the diff. ([GitHub][2]) +* The three tests described in the review are present. 
([GitHub][2]) + +### ✅ The nuance “this only fixes topics you locally called subscribe() for” is also intended + +The PR author explicitly says they **did not** change the incoming Subscribe handler, and they rejected an earlier approach (“always init topic on incoming Subscribe”) because it broke a “rejoin with different subscriptions” test. The current approach is intentionally targeted to topics the local node subscribes to. ([GitHub][1]) + +So the review’s point that this is a *design choice* (not a general relay-tracks-everything behavior) is aligned with the PR. + +### ⚠️ The review is right that the “pending subscribe advertised in requestSubscribers” isn’t directly asserted + +The current tests prove **eventual mutual discovery** and **eager topic init**, but they do not explicitly assert “a requestSubscribers response includes a pending subscribe” as a standalone behavior. ([GitHub][2]) + +That said, it *may* be indirectly exercised by the concurrent subscribe+connect integration test depending on exactly how the protocol handshake works. The review’s “less directly proven” comment is fair. + +### ⚠️ “subscribe then unsubscribe within debounce” is not covered + +The review’s concern here is legitimate: eager init creates a topic entry even if you later cancel before the debounced work fires, and there’s no new test that ensures cleanup/advertising is correct. The PR notes unsubscribe already uses `debounceSubscribeAggregator.has()` for pending subscribes, but doesn’t add an explicit test for the subscribe→unsubscribe edge. ([GitHub][1]) + +--- + +## Tests to add to validate the assumptions + +Below are **drop-in mocha+chai tests** in the same style as the existing `bug1-...spec.ts` file. They are designed to directly validate the assumptions the review calls out. + +### Test file 1: `bug2-requestSubscribers-pendingSubscribe.spec.ts` + +This file adds two integration tests: + +1. 
**Directly tests the “pending subscribe is treated as subscribed for requestSubscribers” assumption** + To make this deterministic, it *blocks* `_subscribe()` on peer A so `subscriptions` never gets set, while the “pending” marker should still be present. Then peer B subscribes and should still discover A as a subscriber. This isolates the new “pending subscribe counts” logic. The PR description explicitly names `_subscribe()` as the place where `subscriptions` is set, and the diff shows the `debounceSubscribeAggregator.has()` fallback used to cover that gap. ([GitHub][1]) + +2. **Asserts the “don’t track topics you didn’t subscribe to” design** + Peer A never subscribes; peer B subscribes. We assert A never creates `topics` entry for that topic (confirming the intended behavior and guarding against reintroducing the reverted “initializeTopic on incoming Subscribe” behavior). ([GitHub][1]) + +```ts +// packages/transport/pubsub/test/bug2-requestSubscribers-pendingSubscribe.spec.ts +import { TestSession } from "@peerbit/libp2p-test-utils"; +import { waitForNeighbour } from "@peerbit/stream"; +import { delay, waitForResolved } from "@peerbit/time"; +import { expect } from "chai"; +import { DirectSub } from "../src/index.js"; + +function deferred<T = void>() { + let resolve!: (value: T | PromiseLike<T>) => void; + let reject!: (reason?: any) => void; + const promise = new Promise<T>((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe("BUG 2: pending subscribe should be visible via requestSubscribers", function () { + this.timeout(60_000); + + it("peer discovers remote subscription while remote _subscribe() is blocked (pending subscribe advertised)", async () => { + const TOPIC = "pending-subscribe-advertised"; + + const session = await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + }); + + try { + const a = 
session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + + // Connect first (so any handshake/requestSubscribers traffic can happen) + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + // Block A._subscribe() so that: + // - A has called subscribe(TOPIC) (so debounce aggregator has it) + // - but A never reaches the point where it sets `subscriptions` + // This isolates the "pending subscribe counts" logic. + const gate = deferred(); + const aAny = a as any; + + expect(aAny._subscribe, "Expected DirectSub to have a _subscribe() method").to.be.a( + "function", + ); + + const originalSubscribeImpl = aAny._subscribe.bind(aAny); + aAny._subscribe = async (...args: any[]) => { + await gate.promise; + return originalSubscribeImpl(...args); + }; + + let aSubscribeResolved = false; + const aSubscribePromise = a.subscribe(TOPIC).then(() => { + aSubscribeResolved = true; + }); + + // Now subscribe normally on B + await b.subscribe(TOPIC); + + // Key assertion: + // B should learn that A is subscribed *even though* A's subscribe hasn't resolved yet. + // Without the "pending subscribe" inclusion in requestSubscribers, B has no basis to learn A. + await waitForResolved(() => { + expect(aSubscribeResolved, "A.subscribe should still be pending").to.be.false; + + const bTopicMap = b.topics.get(TOPIC); + expect(bTopicMap).to.not.be.undefined; + expect( + bTopicMap!.has(a.publicKeyHash), + "B should record A as a subscriber while A is pending", + ).to.be.true; + }); + + // Cleanup: release A and let subscribe resolve, so we don't leave dangling work. 
+ gate.resolve(); + await aSubscribePromise; + } finally { + await session.stop(); + } + }); + + it("a node that did NOT subscribe does NOT start tracking a topic just because it receives Subscribe traffic (design guard)", async () => { + const TOPIC = "non-subscriber-should-not-track"; + + const session = await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + }); + + try { + const a = session.peers[0].services.pubsub; // will NOT subscribe + const b = session.peers[1].services.pubsub; // WILL subscribe + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + await b.subscribe(TOPIC); + + // Give a moment for any Subscribe traffic to be exchanged + await delay(250); + + // If we ever re-introduce "initializeTopic on incoming Subscribe", + // this would start failing. + expect(a.topics.has(TOPIC)).to.equal(false); + expect(a.topics.get(TOPIC)).to.equal(undefined); + } finally { + await session.stop(); + } + }); +}); +``` + +--- + +### Test file 2: `bug3-subscribe-then-unsubscribe-before-debounce.spec.ts` + +This test targets the edge case the review flagged: + +* Peer A calls `subscribe(topic)` (so topic is eagerly initialized and “pending subscribe” exists). +* Peer A then immediately calls `unsubscribe(topic)` before the debounced subscribe finishes. +* Peer B subscribes and connects/handshakes. +* Assert B does **not** record A as a subscriber, and A does **not** keep tracking the topic. + +This validates the assumption: “unsubscribe properly cancels pending subscribe + stops advertising it”. 
+ +```ts +// packages/transport/pubsub/test/bug3-subscribe-then-unsubscribe-before-debounce.spec.ts +import { TestSession } from "@peerbit/libp2p-test-utils"; +import { waitForNeighbour } from "@peerbit/stream"; +import { delay, waitForResolved } from "@peerbit/time"; +import { expect } from "chai"; +import { DirectSub } from "../src/index.js"; + +function deferred<T = void>() { + let resolve!: (value: T | PromiseLike<T>) => void; + let reject!: (reason?: any) => void; + const promise = new Promise<T>((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe("Edge case: subscribe then unsubscribe inside debounce window", function () { + this.timeout(60_000); + + it("does not advertise or retain topic if unsubscribe happens before debounced subscribe executes", async () => { + const TOPIC = "subscribe-then-unsubscribe-before-debounce"; + + const session = await TestSession.disconnected(2, { + services: { + pubsub: (c) => + new DirectSub(c, { + canRelayMessage: true, + connectionManager: false, + }), + }, + }); + + try { + const a = session.peers[0].services.pubsub; + const b = session.peers[1].services.pubsub; + + await session.connect([[session.peers[0], session.peers[1]]]); + await waitForNeighbour(a, b); + + // Block A._subscribe so A never reaches "subscriptions set" state, + // making this test specifically about pending-subscribe cancellation. + const gate = deferred(); + const aAny = a as any; + expect(aAny._subscribe).to.be.a("function"); + const originalSubscribeImpl = aAny._subscribe.bind(aAny); + aAny._subscribe = async (...args: any[]) => { + await gate.promise; + return originalSubscribeImpl(...args); + }; + + // Start subscribe (pending) but do not await it + const aSubscribe = a.subscribe(TOPIC).catch(() => { + // If unsubscribe cancels in a way that rejects the subscribe promise, + // we don't want an unhandled rejection to fail the test. 
+ }); + + // Immediately unsubscribe + await a.unsubscribe(TOPIC); + + // Now B subscribes; if A still advertises "pending subscribe", + // B may incorrectly record A as a subscriber. + await b.subscribe(TOPIC); + + // Wait a bit for any requestSubscribers/Subscribe traffic to settle + await delay(500); + + const bTopicMap = b.topics.get(TOPIC); + if (bTopicMap) { + expect( + bTopicMap.has(a.publicKeyHash), + "B should NOT record A as subscribed after A unsubscribed during debounce", + ).to.equal(false); + } + + // Also assert A is not still tracking the topic (or at least not claiming it) + // Depending on current semantics, you may want either: + // - topics entry removed entirely, OR + // - topics entry exists but empty and not advertised. + // + // This is the stricter expectation (recommended): + await waitForResolved(() => { + expect(a.topics.has(TOPIC)).to.equal(false); + }); + + // Cleanup: release gate and give a tick so any queued work can finish + gate.resolve(); + await delay(50); + await aSubscribe; + } finally { + await session.stop(); + } + }); +}); +``` + +**Note:** That “topics entry removed entirely” assertion is intentionally strict. If the intended semantics are “topic map may exist but should not be advertised / should stay empty,” then change that part to check the map is empty instead of checking `has() === false`. + +--- + +## Summary of what these added tests validate + +* **Assumption: `requestSubscribers` should include pending subscribes** + ✅ Tested by forcing `subscriptions` to remain unset while the pending marker exists, and asserting the other peer still discovers the subscriber. + +* **Assumption: eager init doesn’t accidentally turn a node into a topic tracker for topics it didn’t subscribe to** + ✅ Tested by ensuring a non-subscribing peer doesn’t create topic entries just because it sees Subscribe traffic (guards against the reverted approach that broke “rejoin”). 
([GitHub][1]) + +* **Assumption: unsubscribe cancels pending subscribe correctly** + ✅ Tested by subscribe→unsubscribe inside debounce and checking no advertising/retained tracking. + +If you want, I can also propose a **more surgical unit test** (no libp2p session) that asserts the requestSubscribers handler includes pending topics by directly invoking the handler, but that depends on whether the handler is factored into a testable method or is currently only inside `onDataMessage()` (it looks inlined). ([GitHub][1]) + +[1]: https://github.com/dao-xyz/peerbit/pull/589 "https://github.com/dao-xyz/peerbit/pull/589" +[2]: https://github.com/dao-xyz/peerbit/pull/589/files "https://github.com/dao-xyz/peerbit/pull/589/files" + + + +Here’s what PR **#589** is doing, and whether the “this fixes the race” story holds up. + +## What the PR claims the bug is + +In **DirectSub**, `subscribe()` doesn’t immediately run the real subscription logic; it schedules it through a debounce aggregator. The PR author says there’s a window where: + +1. `this.topics.get(topic)` is still `undefined`, so an incoming **Subscribe** message gets dropped (because the handler returns early), and +2. even if the topic exists, `getSubscriptionOverlap()` can still return empty because `this.subscriptions` hasn’t been set yet, so a **requestSubscribers** response is empty and the other peer never learns about your subscription. ([GitHub][1]) + +That produces “peer A knows peer B, but peer B doesn’t know peer A” when both subscribe + connect concurrently. ([GitHub][1]) + +## What actually changed in code + +### 1) Eagerly initialize the topic in `subscribe()` + +They added: + +```ts +if (!this.topics.has(topic)) { + this.initializeTopic(topic); +} +return this.debounceSubscribeAggregator.add({ key: topic }); +``` + +…so the topic entry exists immediately, even before the debounced `_subscribe()` fires. 
([GitHub][2]) + +### 2) Treat “pending subscribe” as a subscription in `requestSubscribers` + +When building the `requestSubscribers` response, they now add topics that are still pending inside the debounce aggregator: + +```ts +if (!mySubscriptions.includes(topic) && + this.debounceSubscribeAggregator.has(topic)) { + mySubscriptions.push(topic); +} +``` + +So a peer can learn you’re “about to be subscribed” during the debounce window. ([GitHub][2]) + +### 3) Regression tests were added + +The new test file asserts: + +* **Unit:** right after calling `subscribe()`, `topics.has(topic)` is already true (before debounce fires) ([GitHub][2]) +* **Integration:** if both peers `subscribe()` and `connect()` concurrently, they eventually see each other recorded under the topic ([GitHub][2]) +* **Integration (normal path):** connect first, then subscribe, still works ([GitHub][2]) + +## Does this actually fix the described race? + +### Problem 1 (“incoming Subscribe dropped because topics.get is undefined”) + +**Yes — but only for topics the local node has called `subscribe()` on.** + +If the incoming Subscribe handler was previously doing a `topics.get(topic)` + early return when undefined (as described in the PR), then creating the topic map *immediately* in `subscribe()` removes the “undefined topic” window for that topic. ([GitHub][1]) + +The unit test strongly supports that `initializeTopic()` creates the expected Map entry synchronously (it checks `.get(TOPIC)` is a `Map` immediately). ([GitHub][2]) + +### Problem 2 (“requestSubscribers response empty because subscriptions isn’t set yet”) + +**The fix is logically consistent with the claimed cause**, because it explicitly covers the same debounce window by checking `debounceSubscribeAggregator.has(topic)`. ([GitHub][1]) + +However: **there is no direct regression test** in the added test file that asserts the `requestSubscribers` response includes pending subscribes. 
The tests mainly validate topic initialization + mutual discovery during concurrent subscribe/connect. ([GitHub][2]) +So this part looks reasonable, but it’s less “proven” by the new tests than part 1. + +## Are the PR’s assumptions correct? + +### ✅ Assumption: “`subscribe()` is debounced; `_subscribe()` sets up topic tracking and `subscriptions` later” + +This is explicitly stated in the PR writeup ([GitHub][1]) and is consistent with why the added tests would fail without eager init. I can’t fully confirm without reading the full `DirectSub` implementation (outside the diff hunks), but nothing in the diff contradicts it. + +### ✅ Assumption: “Calling `initializeTopic()` early is safe / idempotent” + +They claim it’s idempotent and guarded. ([GitHub][1]) +In the actual change they also guard with `if (!this.topics.has(topic))`, so even if `initializeTopic()` isn’t perfectly idempotent, it won’t be called repeatedly for the same topic. ([GitHub][2]) +One subtle risk: using `has()` assumes the map never contains a key with an `undefined` value (rare, but possible). If that could happen, checking `get() == null` would be safer than `has()`. + +### ⚠️ Assumption: “Relays should *not* track topics they don’t subscribe to” + +This is a design assumption, not a mechanical one. + +The PR explicitly says they *avoided* initializing topics in the incoming Subscribe handler because it broke “rejoin with different subscriptions,” and they want to preserve “only track topics you care about.” ([GitHub][1]) + +That’s coherent if DirectSub nodes are *not supposed* to act as full routing relays for topics they aren’t subscribed to. If your “relay” use case depends on tracking remote subscribers even when the relay itself isn’t subscribed, then this PR intentionally does **not** solve that (it fixes only the “I called subscribe but debounce hasn’t fired yet” case). 
+ +### ⚠️ New behavioral edge case introduced by eager init + +Eager init means `topics` will exist **even if you unsubscribe before the debounced subscribe executes** (depending on how unsubscribe cleanup works). If topic cleanup is tied to `_subscribe()` having run, you could end up temporarily (or permanently) tracking a topic you aren’t actually subscribed to. + +The PR notes `debounceSubscribeAggregator.has()` is used in `unsubscribe()` for pending subscribes ([GitHub][1]), which suggests they’ve thought about pending subscribe cancellation — but there’s no new test covering “subscribe then immediately unsubscribe” to ensure the topic entry is removed/ignored correctly. + +## What I’d recommend to “verify” this PR beyond what’s here + +If you want higher confidence the assumptions are correct, I’d add/confirm: + +1. **A regression test for requestSubscribers + pending subscribe**, e.g.: + + * Peer B calls `subscribe(topic)` but before debounce fires, + * Peer A triggers `requestSubscribers` to B, + * Assert A learns B is subscribed (via the pending check). + +2. **A subscribe-then-unsubscribe-within-debounce test**, asserting: + + * topic doesn’t remain tracked / doesn’t cause incorrect remote subscription tracking, + * `requestSubscribers` does *not* advertise it once unsubscribe has been called. + +3. **A quick read of `initializeTopic()`** to confirm it’s truly “create map + no heavy side effects,” since the PR relies on it being safe to do earlier. ([GitHub][1]) + +--- + +**Bottom line:** + +* The **core fix (eager topic init in `subscribe()`) matches the described race and is supported by the new unit + integration tests.** ([GitHub][2]) +* The **requestSubscribers pending-subscribe fix is plausible and consistent**, but it’s **not directly exercised** by the new tests. ([GitHub][2]) +* The **relay/“track topics you didn’t subscribe to”** question is a **design choice**; the PR explicitly chooses “no” to avoid breaking rejoin behavior. 
([GitHub][1]) + +[1]: https://github.com/dao-xyz/peerbit/pull/589 "fix(pubsub): eagerly initialize topic in subscribe() to prevent race by Faolain · Pull Request #589 · dao-xyz/peerbit · GitHub" +[2]: https://github.com/dao-xyz/peerbit/pull/589/files "fix(pubsub): eagerly initialize topic in subscribe() to prevent race by Faolain · Pull Request #589 · dao-xyz/peerbit · GitHub" + + +Other relevant information: +- This PR https://github.com/Faolain/peerbit/pull/3 attempts to solve the TOCTOU race within shared-log but seems to have CI failures. Find out what those are. Did these appear on CI history for main or any other PRs or did this PR cause these regressions. + +- This PR https://github.com/Faolain/peerbit/pull/4 attempts to be a more comprehensive solution than https://github.com/Faolain/peerbit/pull/3 yet still also has CI errors. Find out what these errors are see if they appear in the CI history for main or other PRs or if this PR is the cause of it. +- This PR https://github.com/dao-xyz/peerbit/pull/591 fix(shared-log): avoid unhandled rejection when entry.hash is missing which also has its own CI errors on Github that can be inspected. + + +Instructions: +- When running tests, for every test that either passes or fails note down the result in the shared-log-debug-plan.md learnings section along with any learnings you had from that result. + +If there are any gotchas/ahas note them in the # Ahas/Gotchas section. +