Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/p2p/libp2p/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,10 +1067,9 @@ func TestTopologyOverSaturated(t *testing.T) {
addr := serviceUnderlayAddress(t, s1)

// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.Connect(ctx, addr)
if err == nil {
t.Fatal("expected connect to fail but it didn't")
}
// Connect might return nil if the handshake completes before the server processes the rejection (protocol race).
// We verify that the peer is eventually disconnected.
_, _ = s2.Connect(ctx, addr)

expectPeers(t, s1)
expectPeersEventually(t, s2)
Expand Down Expand Up @@ -1171,9 +1170,10 @@ func TestWithBlocklistStreams(t *testing.T) {
expectPeersEventually(t, s2)
expectPeersEventually(t, s1)

if _, err := s2.Connect(ctx, s1_underlay); err == nil {
t.Fatal("expected error when connecting to blocklisted peer")
}
// s2 connects to s1, but because of blocklist it should fail
// Connect might return nil if the handshake completes before the server processes the blocklist (protocol race).
// We verify that the peer is eventually disconnected.
_, _ = s2.Connect(ctx, s1_underlay)

expectPeersEventually(t, s2)
expectPeersEventually(t, s1)
Expand Down
12 changes: 10 additions & 2 deletions pkg/p2p/libp2p/internal/handshake/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -94,6 +95,7 @@ type Service struct {
libp2pID libp2ppeer.ID
metrics metrics
picker p2p.Picker
mu sync.RWMutex
hostAddresser Addresser
}

Expand Down Expand Up @@ -136,6 +138,8 @@ func New(signer crypto.Signer, advertisableAddresser AdvertisableAddressResolver
}

func (s *Service) SetPicker(n p2p.Picker) {
s.mu.Lock()
defer s.mu.Unlock()
s.picker = n
}

Expand Down Expand Up @@ -351,8 +355,12 @@ func (s *Service) Handle(ctx context.Context, stream p2p.Stream, peerMultiaddrs

overlay := swarm.NewAddress(ack.Address.Overlay)

if s.picker != nil {
if !s.picker.Pick(p2p.Peer{Address: overlay, FullNode: ack.FullNode}) {
s.mu.RLock()
picker := s.picker
s.mu.RUnlock()

if picker != nil {
if !picker.Pick(p2p.Peer{Address: overlay, FullNode: ack.FullNode}) {
return nil, ErrPicker
}
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,15 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return nil, fmt.Errorf("autonat: %w", err)
}

blocklist := blocklist.NewBlocklist(storer)

handshakeService, err := handshake.New(signer, newCompositeAddressResolver(tcpResolver, wssResolver), overlay, networkID, o.FullNode, o.Nonce, newHostAddresser(h), o.WelcomeMessage, o.ValidateOverlay, h.ID(), logger)
if err != nil {
return nil, fmt.Errorf("handshake service: %w", err)
}

handshakeService.SetPicker(&blocklistPicker{blocklist: blocklist})

// Create a new dialer for libp2p ping protocol. This ensures that the protocol
// uses a different set of keys to do ping. It prevents inconsistencies in peerstore as
// the addresses used are not dialable and hence should be cleaned up. We should create
Expand All @@ -534,7 +538,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
networkID: networkID,
peers: peerRegistry,
addressbook: ab,
blocklist: blocklist.NewBlocklist(storer),
blocklist: blocklist,
logger: logger,
tracer: tracer,
connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options
Expand Down Expand Up @@ -647,6 +651,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
handshakeStream := newStream(stream, s.metrics)

peerMultiaddrs, err := s.peerMultiaddrs(s.ctx, stream.Conn().RemoteMultiaddr(), peerID)

if err != nil {
s.logger.Debug("stream handler: handshake: build remote multiaddrs", "peer_id", peerID, "error", err)
s.logger.Error(nil, "stream handler: handshake: build remote multiaddrs", "peer_id", peerID)
Expand Down Expand Up @@ -1681,3 +1686,15 @@ func waitPeerAddrs(ctx context.Context, s peerstore.Peerstore, peerID libp2ppeer
return s.Addrs(peerID)
}
}

type blocklistPicker struct {
blocklist *blocklist.Blocklist
}

func (b *blocklistPicker) Pick(peer p2p.Peer) bool {
blocked, err := b.blocklist.Exists(peer.Address)
if err != nil {
return false
}
return !blocked
}
Loading