Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import * as findUp from "find-up";
import path from "path";

const root = path.dirname(await findUp.findUp(".git", { type: "directory" }));
const gitDir = await findUp.findUp(".git", { type: "directory" });
const gitFile = gitDir ? null : await findUp.findUp(".git", { type: "file" });
const root = path.dirname(gitDir ?? gitFile);

export default {
// global options
Expand Down
4 changes: 3 additions & 1 deletion packages/clients/peerbit/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import fs from "fs";
import { createRequire } from "module";
import path from "path";

const root = path.dirname(await findUp.findUp(".git", { type: "directory" }));
const gitDir = await findUp.findUp(".git", { type: "directory" });
const gitFile = gitDir ? null : await findUp.findUp(".git", { type: "file" });
const root = path.dirname(gitDir ?? gitFile);
const resolverFromRoot = createRequire(path.join(root, "package.json"));
const resolverFromLocal = createRequire(import.meta.url);

Expand Down
137 changes: 103 additions & 34 deletions packages/programs/data/shared-log/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,24 @@ export class SharedLog<

private latestReplicationInfoMessage!: Map<string, bigint>;

// Per-peer queue to serialize replication-info processing (prevents TOCTOU race
// where two concurrent async handlers both see prevCount === 0 and emit duplicate
// replicator:join events).
private _replicationInfoQueue!: Map<string, Promise<void>>;

// Stores the latest replication-info message per peer when addReplicationRange()
// throws NotStartedError (indexes not ready). Drained in afterOpen().
private _pendingReplicationInfo!: Map<
string,
{
ranges: ReplicationRangeIndexable<any>[];
from: PublicSignKey;
reset: boolean;
timestamp: number;
messageTimestamp: bigint;
}
>;

private remoteBlocks!: RemoteBlocks;

private openTime!: number;
Expand Down Expand Up @@ -1281,6 +1299,7 @@ export class SharedLog<
diffs = changes;
}

const wasAlreadyReplicator = this.uniqueReplicators.has(from.hashcode());
this.uniqueReplicators.add(from.hashcode());

let now = +new Date();
Expand Down Expand Up @@ -1379,7 +1398,7 @@ export class SharedLog<
}),
);

if (isNewReplicator) {
if (isNewReplicator && !wasAlreadyReplicator) {
this.events.dispatchEvent(
new CustomEvent<ReplicatorJoinEvent>("replicator:join", {
detail: { publicKey: from },
Expand Down Expand Up @@ -1894,6 +1913,8 @@ export class SharedLog<
this._pendingDeletes = new Map();
this._pendingIHave = new Map();
this.latestReplicationInfoMessage = new Map();
this._replicationInfoQueue = new Map();
this._pendingReplicationInfo = new Map();
this.coordinateToHash = new Cache<string>({ max: 1e6, ttl: 1e4 });
this.recentlyRebalanced = new Cache<string>({ max: 1e4, ttl: 1e5 });

Expand Down Expand Up @@ -2245,6 +2266,27 @@ export class SharedLog<
this.handleSubscriptionChange(v, [this.topic], true);
},
);

// Drain replication-info messages that arrived before indexes were ready
if (this._pendingReplicationInfo.size > 0) {
const pending = new Map(this._pendingReplicationInfo);
this._pendingReplicationInfo.clear();
for (const [, info] of pending) {
try {
await this.addReplicationRange(info.ranges, info.from, {
reset: info.reset,
checkDuplicates: true,
timestamp: info.timestamp,
});
} catch (e: any) {
if (!isNotStartedError(e)) {
logger.error(
`Failed to apply pending replication info: ${e?.message ?? e}`,
);
}
}
}
}
}

async reset() {
Expand Down Expand Up @@ -2526,6 +2568,8 @@ export class SharedLog<
this._pendingDeletes.clear();
this._pendingIHave.clear();
this.latestReplicationInfoMessage.clear();
this._replicationInfoQueue.clear();
this._pendingReplicationInfo.clear();
this._gidPeersHistory.clear();
this._requestIPruneSent.clear();
this._requestIPruneResponseReplicatorSet.clear();
Expand Down Expand Up @@ -2968,43 +3012,68 @@ export class SharedLog<
// (and downstream `waitForReplicator()` timeouts) under timing-sensitive joins.
const from = context.from!;
const messageTimestamp = context.message.header.timestamp;
(async () => {
const prev = this.latestReplicationInfoMessage.get(from.hashcode());
if (prev && prev > messageTimestamp) {
return;
}

this.latestReplicationInfoMessage.set(
from.hashcode(),
messageTimestamp,
);
// Serialize replication-info processing per peer to prevent TOCTOU
// races where concurrent handlers both see prevCount === 0 and emit
// duplicate replicator:join events.
const peerHash = from.hashcode();
const prevWork =
this._replicationInfoQueue.get(peerHash) ?? Promise.resolve();
const work = prevWork
.then(async () => {
const prevTs =
this.latestReplicationInfoMessage.get(peerHash);
if (prevTs && prevTs > messageTimestamp) {
return;
}

if (this.closed) {
return;
}
this.latestReplicationInfoMessage.set(
peerHash,
messageTimestamp,
);

const reset = msg instanceof AllReplicatingSegmentsMessage;
await this.addReplicationRange(
replicationInfoMessage.segments.map((x) =>
if (this.closed) {
return;
}

const reset = msg instanceof AllReplicatingSegmentsMessage;
const ranges = replicationInfoMessage.segments.map((x) =>
x.toReplicationRangeIndexable(from),
),
from,
{
reset,
checkDuplicates: true,
timestamp: Number(messageTimestamp),
},
);
})().catch((e) => {
if (isNotStartedError(e)) {
return;
}
logger.error(
`Failed to apply replication settings from '${from.hashcode()}': ${
e?.message ?? e
}`,
);
});
);
try {
await this.addReplicationRange(ranges, from, {
reset,
checkDuplicates: true,
timestamp: Number(messageTimestamp),
});
// Successful — clear any stored pending info for this peer
this._pendingReplicationInfo.delete(peerHash);
} catch (e: any) {
if (isNotStartedError(e)) {
// Store for retry after indexes are ready
this._pendingReplicationInfo.set(peerHash, {
ranges,
from,
reset,
timestamp: Number(messageTimestamp),
messageTimestamp,
});
return;
}
throw e;
}
})
.catch((e) => {
if (isNotStartedError(e)) {
return;
}
logger.error(
`Failed to apply replication settings from '${peerHash}': ${
e?.message ?? e
}`,
);
});
this._replicationInfoQueue.set(peerHash, work);
} else if (msg instanceof StoppedReplicating) {
if (context.from.equals(this.node.identity.publicKey)) {
return;
Expand Down
5 changes: 5 additions & 0 deletions packages/transport/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
async unsubscribe(topic: string) {
if (this.debounceSubscribeAggregator.has(topic)) {
this.debounceSubscribeAggregator.delete(topic); // cancel the pending subscription before it is performed
// Clean up eagerly-initialized topic tracking from subscribe().
// Without this, the topics entry persists as an orphaned empty Map
// because _unsubscribe() is never called (nothing in subscriptions).
this.topics.delete(topic);
this.topicsToPeers.delete(topic);
return false;
}
const subscriptions = this.subscriptions.get(topic);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// packages/transport/pubsub/test/bug2-requestSubscribers-pendingSubscribe.spec.ts
import { TestSession } from "@peerbit/libp2p-test-utils";
import { waitForNeighbour } from "@peerbit/stream";
import { delay, waitForResolved } from "@peerbit/time";
import { expect } from "chai";
import { DirectSub } from "../src/index.js";

/**
 * Builds a promise whose settle functions are exposed to the caller, so a
 * test can resolve or reject it at a moment of its choosing (a classic
 * "deferred" used here to gate internal async methods).
 *
 * @returns the promise plus its `resolve` and `reject` functions.
 */
function deferred<T = void>() {
	// Definite-assignment (!) is safe: the Promise executor runs synchronously,
	// so both functions are captured before the constructor returns.
	let settle!: (value: T | PromiseLike<T>) => void;
	let fail!: (reason?: any) => void;
	const promise = new Promise<T>((resolveFn, rejectFn) => {
		settle = resolveFn;
		fail = rejectFn;
	});
	return { promise, resolve: settle, reject: fail };
}

describe("BUG 2: pending subscribe should be visible via requestSubscribers", function () {
	// Generous timeout: each test spins up a real libp2p TestSession and
	// exchanges network traffic, which can be slow on CI.
	this.timeout(60_000);

	// Skip: DirectSub.requestSubscribers does not yet include pending subscribes.
	// This test documents the gap as a design probe -- when pending subscribes are
	// included in requestSubscribers responses, this test should be unskipped.
	it.skip("peer discovers remote subscription while remote _subscribe() is blocked (pending subscribe advertised)", async () => {
		const TOPIC = "pending-subscribe-advertised";

		// Two disconnected peers, each running DirectSub; connected explicitly below.
		const session = await TestSession.disconnected<{ pubsub: DirectSub }>(2, {
			services: {
				pubsub: (c) =>
					new DirectSub(c, {
						canRelayMessage: true,
						connectionManager: false,
					}),
			},
		});

		try {
			const a = session.peers[0].services.pubsub;
			const b = session.peers[1].services.pubsub;

			// Connect first (so any handshake/requestSubscribers traffic can happen)
			await session.connect([[session.peers[0], session.peers[1]]]);
			await waitForNeighbour(a, b);

			// Block A._subscribe() so that:
			// - A has called subscribe(TOPIC) (so debounce aggregator has it)
			// - but A never reaches the point where it sets `subscriptions`
			// This isolates the "pending subscribe counts" logic.
			const gate = deferred<void>();
			// NOTE(review): `as any` is deliberate here — the test patches the
			// private _subscribe() method, which is not on the public type.
			const aAny = a as any;

			expect(aAny._subscribe, "Expected DirectSub to have a _subscribe() method").to.be.a(
				"function",
			);

			// Wrap _subscribe so it parks on the gate before running the real impl.
			const originalSubscribeImpl = aAny._subscribe.bind(aAny);
			aAny._subscribe = async (...args: any[]) => {
				await gate.promise;
				return originalSubscribeImpl(...args);
			};

			// Kick off A's subscribe without awaiting it; track when it settles.
			let aSubscribeResolved = false;
			const aSubscribePromise = a.subscribe(TOPIC).then(() => {
				aSubscribeResolved = true;
			});

			// Now subscribe normally on B
			await b.subscribe(TOPIC);

			// Key assertion:
			// B should learn that A is subscribed *even though* A's subscribe hasn't resolved yet.
			// Without the "pending subscribe" inclusion in requestSubscribers, B has no basis to learn A.
			// NOTE(review): assumes waitForResolved() retries the callback until it
			// stops throwing — confirm against @peerbit/time semantics.
			await waitForResolved(() => {
				expect(aSubscribeResolved, "A.subscribe should still be pending").to.be.false;

				const bTopicMap = b.topics.get(TOPIC);
				expect(bTopicMap).to.not.be.undefined;
				expect(
					bTopicMap!.has(a.publicKeyHash),
					"B should record A as a subscriber while A is pending",
				).to.be.true;
			});

			// Cleanup: release A and let subscribe resolve, so we don't leave dangling work.
			gate.resolve();
			await aSubscribePromise;
		} finally {
			await session.stop();
		}
	});

	it("a node that did NOT subscribe does NOT start tracking a topic just because it receives Subscribe traffic (design guard)", async () => {
		const TOPIC = "non-subscriber-should-not-track";

		const session = await TestSession.disconnected<{ pubsub: DirectSub }>(2, {
			services: {
				pubsub: (c) =>
					new DirectSub(c, {
						canRelayMessage: true,
						connectionManager: false,
					}),
			},
		});

		try {
			const a = session.peers[0].services.pubsub; // will NOT subscribe
			const b = session.peers[1].services.pubsub; // WILL subscribe

			await session.connect([[session.peers[0], session.peers[1]]]);
			await waitForNeighbour(a, b);

			await b.subscribe(TOPIC);

			// Give a moment for any Subscribe traffic to be exchanged
			// NOTE(review): fixed 250ms delay is a race window, not a guarantee —
			// a slow host could pass this test spuriously.
			await delay(250);

			// If we ever re-introduce "initializeTopic on incoming Subscribe",
			// this would start failing.
			expect(a.topics.has(TOPIC)).to.equal(false);
			expect(a.topics.get(TOPIC)).to.equal(undefined);
		} finally {
			await session.stop();
		}
	});
});
Loading
Loading