PubSub pendingSubscriptions + SharedLog replication handshake hardening#596
Open
Faolain wants to merge 10 commits intodao-xyz:masterfrom
Open
PubSub pendingSubscriptions + SharedLog replication handshake hardening#596Faolain wants to merge 10 commits intodao-xyz:masterfrom
Faolain wants to merge 10 commits intodao-xyz:masterfrom
Conversation
Context PR dao-xyz#589 (pubsub debounce hardening) tightened peer subscription timing and surfaced latent shared-log races/timeouts in CI (duplicate replicator:join + migration-8-9 replication stalls). What Was Implemented (see shared-log-debug.md for the full investigation log) Pubsub (packages/transport/pubsub/src/index.ts) - Eagerly initialize per-topic state in subscribe() so early remote Subscribe messages aren't dropped. - Include topics pending in the debounce window when responding to requestSubscribers overlap queries. - If subscribe() is cancelled via unsubscribe() before the debounce fires, clean up the eager topic state to avoid stale topic entries. Shared-log (packages/programs/data/shared-log/src/index.ts) - Serialize replication-info application per peer (promise chain) to eliminate TOCTOU around addReplicationRange() and replicator:join emission. - Track latest replication-info timestamp per peer and ignore older messages. - If replication-info can't be applied yet (NotStartedError / index-not-ready), keep the latest per-peer message and flush after open instead of dropping it. - After open, requestSubscribers(topic) and backfill handleSubscriptionChange() from the current subscriber snapshot to avoid missed subscribe windows. - Make replicator:join idempotent: emit on the transition "not known replicator -> has segments", including the restart/full-state announce case. Migration + Role fixes - v8 compat: always respond with a role; getRole() is best-effort when multiple segments exist, and request handlers correctly await getRole(). - ResponseRoleMessage -> replication-info conversion denormalizes factor/offset into u32 coordinate space. - Fix RoleReplicationSegment offset encoding bug (offset nominator incorrectly used factor). Tests (packages/programs/data/shared-log/test/*) - Fix migration v8 mock to respond using the actually opened instance (db1.log.rpc). - Make the maturity timing assertion robust by using remaining maturity time based on the observed segment timestamp. Additional flake hardening (this commit) - checkReplicas() now uses a slightly longer waitForResolved timeout with lower polling rate to avoid false negatives under heavy load (u32-simple sharding convergence could exceed the default 10s). File: packages/programs/data/shared-log/test/utils.ts Docs - shared-log-debug-plan.md updated with current verification status, the observed sharding flake, and the test hardening applied. Verification - pnpm run build: PASS - Targeted shared-log tests: PASS - replicate:join not emitted on update - 8-9, replicates database of 1 entry - 9-8, replicates database of 1 entry - waitForReplicator waits until maturity - handles peer joining and leaving multiple times - Full CI Part 4 suite: PASS - node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/data/shared-log ./packages/programs/data/shared-log/proxy -- -t node --no-build - @peerbit/shared-log: 1743 passing - @peerbit/shared-log-proxy: 1 passing
Contains terminal transcript / scratch notes from the shared-log + pubsub investigation (with account/session redacted).
Contributor
Author
Test Results for Master vs #596
What these tests mean
|
Contributor
Author
Contributor
Author
The sequential replication-info processing and bounded sends introduced in this PR can take longer under CI load. Increase the test-level timeout to 300s and inner waitForResolved to 180s with 500ms polling to avoid flaky convergence failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor
Author
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.













Summary
This PR is an integration of:
It addresses a class of races caused by debounced
pubsub.subscribe(). During the debounce window, a node has expressed local intent to subscribe, but historically it would:subscriptions).Those gaps can cascade into shared-log replication handshake flakes and incorrect/unstable replication state under suite load.
This PR combines the best parts of #593 (shared-log correctness) with the stronger PubSub model from PR #7 (pending-subscription semantics + regressions).
What This PR Fixes
1) PubSub: Pending Subscriptions Are First-Class
Problem:
DirectSub.subscribe()is debounced. Between the call tosubscribe(topic)and the eventual debounced_subscribe([topic]), the node has local interest but was treated as "not subscribed".Fix: Introduce explicit
pendingSubscriptionstracking and treat it as local interest.Key changes (
packages/transport/pubsub/src/index.ts):pendingSubscriptions: Set<string>.subscribe(topic):topictopendingSubscriptionsinitializeTopic(topic)(so we track the topic immediately and don't drop inbound subscription traffic)_subscribe():topicfrompendingSubscriptionsonce the real subscription is appliedunsubscribe(topic):topicfrompendingSubscriptionspeerToTopiclastSubscriptionMessagestopics/topicsToPeersPubSubDatawithstrict: true, treat pending topics as local interest so strict deliveries are not dropped during the debounce window.Subscribe{requestSubscribers:true}, include pending topics in the overlap response.Why this matters:
2) Shared-log: Replication Handshake More Deterministic Under Load
This PR keeps #593's shared-log correctness work (queueing/buffering replication-info, backfill, v8 conversion/role fixes, etc.) and adds a small but important ordering/hardening layer.
Key changes:
packages/programs/data/shared-log/src/index.tsafterOpen():pubsub.requestSubscribers(this.topic)for backfillfor...of+await this.handleSubscriptionChange(...)(instead of fire-and-forget), then flushes buffered replication-info.handleSubscriptionChange(...):AllReplicatingSegmentsMessageResponseRoleMessageRequestReplicationInfoMessageawait Promise.race([ rpc.send(...), delay(10s) ])This specifically stabilizes the existing shared-log test:
replication.spec.ts: "applies replication segments even if waitFor() fails"3) Test Flake Mitigations (Targeted)
Two tests were observed to be stable in isolation but flaky in long-suite execution due to timing sensitivity. This PR increases only the tightest waits.
packages/programs/data/shared-log/test/leader.spec.ts:waitForResolvedtimeouts with the test's 120s timeout forreachableOnlycover stabilitypackages/programs/data/shared-log/test/sharding.spec.ts:calculateTotalParticipation()to{ timeout: 30_000, delayInterval: 200 }4) Tooling: Run Tests In Git Worktrees
.aegir.jsupdated to handle git worktrees where.gitis a file (not a directory).This is not a runtime change, but it enables running the new regressions in secondary worktrees (which is how these comparisons were validated).
Why This Is Better Than #593 Alone Or PR Faolain#7 Alone
Compared to #593 alone
Compared to Faolain PR Faolain#7 alone
Tests Added / Updated
PubSub regression suite (added)
packages/transport/pubsub/test/bug1-initializeTopic-race.spec.tstopics.has(topic)is true immediately aftersubscribe(topic)(before debounce fires)packages/transport/pubsub/test/bug2-requestSubscribers-pendingSubscribe.spec.tspackages/transport/pubsub/test/bug3-subscribe-then-unsubscribe-before-debounce.spec.tspackages/transport/pubsub/test/bug4-pending-subscribe-receives-strict-pubsubdata.spec.tsSeekDeliveryPubSubData is delivered during the pending-subscribe debounce window (BUG4)Shared-log
Evidence / Repro (Master vs #593 vs This PR)
1) Master + regressions (proves BUG1 exists)
pnpm --filter @peerbit/pubsub testBUG 1 ... topics.has(topic) is true immediately after subscribe()(deterministic)2) #593 + regressions (proves BUG4 remains)
pnpm --filter @peerbit/pubsub testBUG 4 ... pending subscribe receives strict PubSubData3) This PR
pnpm --filter @peerbit/pubsub test46 passing, 1 pending)pnpm run test:ci:part-4(shared-log + shared-log-proxy)@peerbit/shared-log:1743 passing@peerbit/shared-log-proxy:1 passingHow To Run
pnpm --filter @peerbit/pubsub testpnpm run test:ci:part-4Behavioral Changes / Risk Notes
Strict deliveries during pending subscribe:
subscribe()(local intent) and is covered by BUG4 regression.Cancel path cleanup is deeper:
unsubscribe()cancels a pending (debounced) subscribe, we now fully undo eager topic initialization to prevent ghost topic state.Shared-log RPC send awaits are bounded:
Claims-to-Tests Matrix
subscribe()tracks topic immediately (no debounce window race)bug1-initializeTopic-race.spec.ts(master fails, this PR passes)bug3-subscribe-then-unsubscribe-before-debounce.spec.tsbug4-pending-subscribe-receives-strict-pubsubdata.spec.ts(#593 fails, this PR passes)bug2-requestSubscribers-pendingSubscribe.spec.ts(design guard)waitFor()rejectsreplication.spec.ts(existing) +pnpm run test:ci:part-4passpnpm run test:ci:part-4pass twice