Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ export class Registrar implements RegistrarInterface {

await Promise.all(
[...topologies.values()].map(async topology => {
if (topology.filter?.has(remotePeer) === false) {
// If the topology has a filter, only call onDisconnect if the peer
// was previously added to the filter (which happens on onConnect).
// This ensures limited connections that were never notified via
// onConnect don't trigger onDisconnect.
if (topology.filter != null && topology.filter.has(remotePeer) !== true) {
return
}

Expand Down Expand Up @@ -237,7 +241,11 @@ export class Registrar implements RegistrarInterface {

await Promise.all(
[...topologies.values()].map(async topology => {
if (topology.filter?.has(peer.id) === false) {
// If the topology has a filter, only call onDisconnect if the peer
// was previously added to the filter (which happens on onConnect).
// This ensures limited connections that were never notified via
// onConnect don't trigger onDisconnect.
if (topology.filter != null && topology.filter.has(peer.id) !== true) {
return
}

Expand Down
182 changes: 180 additions & 2 deletions packages/libp2p/test/registrar/registrar.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import { TypedEventEmitter } from 'main-event'
import pDefer from 'p-defer'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { Registrar } from '../../src/registrar.js'
import type { Libp2pEvents, PeerId, PeerStore, Topology, Peer, Connection } from '@libp2p/interface'
import type { Libp2pEvents, PeerId, PeerStore, Topology, TopologyFilter, Peer, Connection } from '@libp2p/interface'
import type { TypedEventTarget } from 'main-event'
import type { StubbedInstance } from 'sinon-ts'

Expand Down Expand Up @@ -229,6 +230,15 @@ describe('registrar topologies', () => {
// register topology for protocol
await registrar.register(protocol, topology)

// Peer data is in the peer store
peerStore.get.withArgs(remotePeerId).resolves({
id: remotePeerId,
addresses: [],
protocols: [protocol],
metadata: new Map(),
tags: new Map()
})

// remote peer connects
events.safeDispatchEvent('peer:identify', {
detail: {
Expand All @@ -238,19 +248,35 @@ describe('registrar topologies', () => {
}
})

// wait a bit to ensure onConnect is not called
await expect(Promise.any([
onConnectDefer.promise,
new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 100)
})
])).to.eventually.not.be.rejected()

// now simulate disconnect
events.safeDispatchEvent('peer:disconnect', {
detail: remotePeerId
})

// wait to ensure onDisconnect is not called
await expect(Promise.any([
onDisconnectDefer.promise,
new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 1000)
}, 100)
})
])).to.eventually.not.be.rejected()
})

it('should call topology onConnect handler for limited connection when explicitly requested', async () => {
const onConnectDefer = pDefer()
const onDisconnectDefer = pDefer()

// setup connections before registrar
const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
Expand All @@ -266,12 +292,24 @@ describe('registrar topologies', () => {
notifyOnLimitedConnection: true,
onConnect: () => {
onConnectDefer.resolve()
},
onDisconnect: () => {
onDisconnectDefer.resolve()
}
}

// register topology for protocol
await registrar.register(protocol, topology)

// Peer data is in the peer store
peerStore.get.withArgs(remotePeerId).resolves({
id: remotePeerId,
addresses: [],
protocols: [protocol],
metadata: new Map(),
tags: new Map()
})

// remote peer connects
events.safeDispatchEvent('peer:identify', {
detail: {
Expand All @@ -282,6 +320,146 @@ describe('registrar topologies', () => {
})

await expect(onConnectDefer.promise).to.eventually.be.undefined()

// now simulate disconnect - this should also be called
events.safeDispatchEvent('peer:disconnect', {
detail: remotePeerId
})

await expect(onDisconnectDefer.promise).to.eventually.be.undefined()
})

it('should not call topology onDisconnect when peer was filtered out during connect', async () => {
const onDisconnectDefer = pDefer()

// setup peer
const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))

// topology WITH filter - this is required to track which peers were notified
const filter = stubInterface<TopologyFilter>({
has: sinon.stub().returns(false),
add: sinon.stub(),
remove: sinon.stub()
})

const topology: Topology = {
filter,
onDisconnect: () => {
onDisconnectDefer.reject(new Error('Topology onDisconnect called for peer that was never onConnect\'d'))
}
}

// register topology for protocol
await registrar.register(protocol, topology)

// Peer data is in the peer store
peerStore.get.withArgs(remotePeerId).resolves({
id: remotePeerId,
addresses: [],
protocols: [protocol],
metadata: new Map(),
tags: new Map()
})

// simulate disconnect without the peer ever being in the filter
// (this happens when a limited connection connects and disconnects
// but the topology has notifyOnLimitedConnection: false)
events.safeDispatchEvent('peer:disconnect', {
detail: remotePeerId
})

// wait to ensure onDisconnect is not called
await expect(Promise.any([
onDisconnectDefer.promise,
new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 100)
})
])).to.eventually.not.be.rejected()
})

it('should not call topology onDisconnect on peer update when peer was filtered out during connect', async () => {
const onDisconnectDefer = pDefer()

// setup peer
const remotePeerId = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))

// connection is limited
const conn = stubInterface<Connection>({
remotePeer: remotePeerId,
limits: {
bytes: 100n
}
})

// topology WITH filter - this is required to track which peers were notified
const filter = stubInterface<TopologyFilter>({
has: sinon.stub().returns(false),
add: sinon.stub(),
remove: sinon.stub()
})

const topology: Topology = {
filter,
// notifyOnLimitedConnection is NOT set (defaults to false)
onDisconnect: () => {
onDisconnectDefer.reject(new Error('Topology onDisconnect called for peer that was never onConnect\'d'))
}
}

// register topology for protocol
await registrar.register(protocol, topology)

// Peer data is in the peer store with the protocol
peerStore.get.withArgs(remotePeerId).resolves({
id: remotePeerId,
addresses: [],
protocols: [protocol],
metadata: new Map(),
tags: new Map()
})

// remote peer identifies with limited connection
events.safeDispatchEvent('peer:identify', {
detail: {
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

// wait a bit to ensure onConnect is not called (because connection is limited)
await new Promise(resolve => setTimeout(resolve, 100))

// now simulate peer update removing the protocol
// (this triggers onDisconnect in _onPeerUpdate)
events.safeDispatchEvent('peer:update', {
detail: {
peer: {
id: remotePeerId,
protocols: [], // protocol removed
addresses: [],
metadata: new Map()
},
previous: {
id: remotePeerId,
protocols: [protocol], // had protocol before
addresses: [],
metadata: new Map()
}
}
})

// wait to ensure onDisconnect is not called
await expect(Promise.any([
onDisconnectDefer.promise,
new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 100)
})
])).to.eventually.not.be.rejected()
})

it('should call topology handlers for non-limited connection opened after limited connection', async () => {
Expand Down