diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index 8611274a78..d5386ba76d 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -199,7 +199,11 @@ export class Registrar implements RegistrarInterface { await Promise.all( [...topologies.values()].map(async topology => { - if (topology.filter?.has(remotePeer) === false) { + // If the topology has a filter, only call onDisconnect if the peer + // was previously added to the filter (which happens on onConnect). + // This ensures limited connections that were never notified via + // onConnect don't trigger onDisconnect. + if (topology.filter != null && topology.filter.has(remotePeer) !== true) { return } @@ -237,7 +241,11 @@ export class Registrar implements RegistrarInterface { await Promise.all( [...topologies.values()].map(async topology => { - if (topology.filter?.has(peer.id) === false) { + // If the topology has a filter, only call onDisconnect if the peer + // was previously added to the filter (which happens on onConnect). + // This ensures limited connections that were never notified via + // onConnect don't trigger onDisconnect. + if (topology.filter != null && topology.filter.has(peer.id) !== true) { return } diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index df93b4409e..30d4107b43 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -5,9 +5,10 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { expect } from 'aegir/chai' import { TypedEventEmitter } from 'main-event' import pDefer from 'p-defer' +import sinon from 'sinon' import { stubInterface } from 'sinon-ts' import { Registrar } from '../../src/registrar.js' -import type { Libp2pEvents, PeerId, PeerStore, Topology, Peer, Connection } from '@libp2p/interface' +import type { Libp2pEvents, PeerId, PeerStore, Topology, TopologyFilter, Peer, Connection } from '@libp2p/interface' import type { TypedEventTarget } from 'main-event' import type { StubbedInstance } from 'sinon-ts' @@ -229,6 +230,15 @@ describe('registrar topologies', () => { // register topology for protocol await registrar.register(protocol, topology) + // Peer data is in the peer store + peerStore.get.withArgs(remotePeerId).resolves({ + id: remotePeerId, + addresses: [], + protocols: [protocol], + metadata: new Map(), + tags: new Map() + }) + // remote peer connects events.safeDispatchEvent('peer:identify', { detail: { @@ -238,19 +248,35 @@ describe('registrar topologies', () => { } }) + // wait a bit to ensure onConnect is not called await expect(Promise.any([ onConnectDefer.promise, + new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 100) + }) + ])).to.eventually.not.be.rejected() + + // now simulate disconnect + events.safeDispatchEvent('peer:disconnect', { + detail: remotePeerId + }) + + // wait to ensure onDisconnect is not called + await expect(Promise.any([ onDisconnectDefer.promise, new Promise((resolve) => { setTimeout(() => { resolve() - }, 1000) + }, 100) }) ])).to.eventually.not.be.rejected() }) it('should call topology onConnect handler for limited connection when explicitly requested', async () => { const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() // setup connections before registrar const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) @@ -266,12 +292,24 @@ describe('registrar topologies', () => { notifyOnLimitedConnection: true, onConnect: () => { onConnectDefer.resolve() + }, + onDisconnect: () => { + onDisconnectDefer.resolve() } } // register topology for protocol await registrar.register(protocol, topology) + // Peer data is in the peer store + peerStore.get.withArgs(remotePeerId).resolves({ + id: remotePeerId, + addresses: [], + protocols: [protocol], + metadata: new Map(), + tags: new Map() + }) + // remote peer connects events.safeDispatchEvent('peer:identify', { detail: { @@ -282,6 +320,146 @@ describe('registrar topologies', () => { }) await expect(onConnectDefer.promise).to.eventually.be.undefined() + + // now simulate disconnect - this should also be called + events.safeDispatchEvent('peer:disconnect', { + detail: remotePeerId + }) + + await expect(onDisconnectDefer.promise).to.eventually.be.undefined() + }) + + it('should not call topology onDisconnect when peer was filtered out during connect', async () => { + const onDisconnectDefer = pDefer() + + // setup peer + const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + + // topology WITH filter - this is required to track which peers were notified + const filter = stubInterface({ + has: sinon.stub().returns(false), + add: sinon.stub(), + remove: sinon.stub() + }) + + const topology: Topology = { + filter, + onDisconnect: () => { + onDisconnectDefer.reject(new Error('Topology onDisconnect called for peer that was never onConnect\'d')) + } + } + + // register topology for protocol + await registrar.register(protocol, topology) + + // Peer data is in the peer store + peerStore.get.withArgs(remotePeerId).resolves({ + id: remotePeerId, + addresses: [], + protocols: [protocol], + metadata: new Map(), + tags: new Map() + }) + + // simulate disconnect without the peer ever being in the filter + // (this happens when a limited connection connects and disconnects + // but the topology has notifyOnLimitedConnection: false) + events.safeDispatchEvent('peer:disconnect', { + detail: remotePeerId + }) + + // wait to ensure onDisconnect is not called + await expect(Promise.any([ + onDisconnectDefer.promise, + new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 100) + }) + ])).to.eventually.not.be.rejected() + }) + + it('should not call topology onDisconnect on peer update when peer was filtered out during connect', async () => { + const onDisconnectDefer = pDefer() + + // setup peer + const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + + // connection is limited + const conn = stubInterface({ + remotePeer: remotePeerId, + limits: { + bytes: 100n + } + }) + + // topology WITH filter - this is required to track which peers were notified + const filter = stubInterface({ + has: sinon.stub().returns(false), + add: sinon.stub(), + remove: sinon.stub() + }) + + const topology: Topology = { + filter, + // notifyOnLimitedConnection is NOT set (defaults to false) + onDisconnect: () => { + onDisconnectDefer.reject(new Error('Topology onDisconnect called for peer that was never onConnect\'d')) + } + } + + // register topology for protocol + await registrar.register(protocol, topology) + + // Peer data is in the peer store with the protocol + peerStore.get.withArgs(remotePeerId).resolves({ + id: remotePeerId, + addresses: [], + protocols: [protocol], + metadata: new Map(), + tags: new Map() + }) + + // remote peer identifies with limited connection + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } + }) + + // wait a bit to ensure onConnect is not called (because connection is limited) + await new Promise(resolve => setTimeout(resolve, 100)) + + // now simulate peer update removing the protocol + // (this triggers onDisconnect in _onPeerUpdate) + events.safeDispatchEvent('peer:update', { + detail: { + peer: { + id: remotePeerId, + protocols: [], // protocol removed + addresses: [], + metadata: new Map() + }, + previous: { + id: remotePeerId, + protocols: [protocol], // had protocol before + addresses: [], + metadata: new Map() + } + } + }) + + // wait to ensure onDisconnect is not called + await expect(Promise.any([ + onDisconnectDefer.promise, + new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 100) + }) + ])).to.eventually.not.be.rejected() }) it('should call topology handlers for non-limited connection opened after limited connection', async () => {