diff --git a/pkg/bzz/address.go b/pkg/bzz/address.go index 1a4021e9eff..89914ef66bf 100644 --- a/pkg/bzz/address.go +++ b/pkg/bzz/address.go @@ -15,14 +15,12 @@ import ( "errors" "fmt" "slices" - "sort" "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/swarm" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" ) var ErrInvalidAddress = errors.New("invalid address") @@ -222,35 +220,3 @@ func parseMultiaddrs(addrs []string) ([]ma.Multiaddr, error) { } return multiAddrs, nil } - -func SelectBestAdvertisedAddress(addrs []ma.Multiaddr, fallback ma.Multiaddr) ma.Multiaddr { - if len(addrs) == 0 { - return fallback - } - - hasTCPProtocol := func(addr ma.Multiaddr) bool { - _, err := addr.ValueForProtocol(ma.P_TCP) - return err == nil - } - - // Sort addresses to prioritize TCP over other protocols - sort.SliceStable(addrs, func(i, j int) bool { - iTCP := hasTCPProtocol(addrs[i]) - jTCP := hasTCPProtocol(addrs[j]) - return iTCP && !jTCP - }) - - for _, addr := range addrs { - if manet.IsPublicAddr(addr) { - return addr - } - } - - for _, addr := range addrs { - if !manet.IsPrivateAddr(addr) { - return addr - } - } - - return addrs[0] -} diff --git a/pkg/bzz/address_test.go b/pkg/bzz/address_test.go index 0c50b08de64..8d038010e25 100644 --- a/pkg/bzz/address_test.go +++ b/pkg/bzz/address_test.go @@ -64,134 +64,6 @@ func TestBzzAddress(t *testing.T) { } } -func TestSelectBestAdvertisedAddress(t *testing.T) { - t.Parallel() - - mustMultiaddr := func(s string) multiaddr.Multiaddr { - addr, err := multiaddr.NewMultiaddr(s) - if err != nil { - t.Fatalf("failed to create multiaddr %s: %v", s, err) - } - return addr - } - - tests := []struct { - name string - addrs []multiaddr.Multiaddr - fallback multiaddr.Multiaddr - expected multiaddr.Multiaddr - }{ - { - name: "empty addresses returns fallback", - addrs: []multiaddr.Multiaddr{}, - fallback: 
mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - }, - { - name: "nil addresses returns fallback", - addrs: nil, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - }, - { - name: "prefers public addresses", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private - mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), // public - mustMultiaddr("/ip4/10.0.0.1/tcp/8080"), // private - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), - }, - { - name: "prefers first public address when multiple exist", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private - mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), // public - mustMultiaddr("/ip4/1.1.1.1/tcp/8080"), // public - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), - }, - { - name: "prefers non-private when no public addresses", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), // loopback - mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private but not loopback - mustMultiaddr("/ip4/10.0.0.1/tcp/8080"), // private but not loopback - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - }, - { - name: "returns first address when all are loopback", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - mustMultiaddr("/ip4/127.0.0.1/tcp/8081"), - mustMultiaddr("/ip6/::1/tcp/8080"), - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), - }, - { - name: "sorts TCP addresses first", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/192.168.1.1/udp/8080"), // UDP - mustMultiaddr("/ip4/1.1.1.1/udp/8080"), // UDP public - mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), // TCP public - }, - fallback: 
mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), - }, - { - name: "handles IPv6 addresses", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip6/::1/tcp/8080"), // loopback - mustMultiaddr("/ip6/2001:db8::1/tcp/8080"), // public IPv6 - mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private IPv4 - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip6/2001:db8::1/tcp/8080"), - }, - { - name: "handles mixed protocols with preference order", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/192.168.1.1/udp/8080"), // private UDP - mustMultiaddr("/ip4/192.168.1.2/tcp/8080"), // private TCP - mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), // loopback TCP - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip4/192.168.1.2/tcp/8080"), // first TCP, and it's non-loopback - }, - { - name: "single address", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), - }, - { - name: "websocket addresses", - addrs: []multiaddr.Multiaddr{ - mustMultiaddr("/ip4/127.0.0.1/tcp/8080/ws"), - mustMultiaddr("/ip4/8.8.8.8/tcp/8080/ws"), // public with websocket - }, - fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), - expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080/ws"), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := bzz.SelectBestAdvertisedAddress(tt.addrs, tt.fallback) - if !result.Equal(tt.expected) { - t.Errorf("SelectBestAdvertisedAddress() = %v, want %v", result, tt.expected) - } - }) - } -} - func TestAreUnderlaysEqual(t *testing.T) { // --- Test Data Initialization --- addr1 := mustNewMultiaddr(t, "/ip4/127.0.0.1/tcp/8001") diff --git a/pkg/bzz/transport.go b/pkg/bzz/transport.go new file mode 100644 index 00000000000..579a1807936 --- /dev/null +++ b/pkg/bzz/transport.go @@ -0,0 +1,108 @@ 
+// Copyright 2026 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package bzz
+
+import (
+	"slices"
+
+	ma "github.com/multiformats/go-multiaddr"
+	manet "github.com/multiformats/go-multiaddr/net"
+)
+
+// TransportType represents the transport protocol of a multiaddress.
+type TransportType int
+
+const (
+	// TransportUnknown indicates an unrecognized transport.
+	TransportUnknown TransportType = iota
+	// TransportTCP indicates plain TCP without WebSocket.
+	TransportTCP
+	// TransportWS indicates WebSocket without TLS.
+	TransportWS
+	// TransportWSS indicates WebSocket with TLS (secure).
+	TransportWSS
+)
+
+// String returns a string representation of the transport type.
+func (t TransportType) String() string {
+	switch t {
+	case TransportTCP:
+		return "tcp"
+	case TransportWS:
+		return "ws"
+	case TransportWSS:
+		return "wss"
+	default:
+		return "unknown"
+	}
+}
+
+// Priority returns the sorting priority for the transport type.
+// Lower value = higher priority: TCP (0) > WS (1) > WSS (2) > Unknown (3).
+func (t TransportType) Priority() int {
+	switch t {
+	case TransportTCP:
+		return 0
+	case TransportWS:
+		return 1
+	case TransportWSS:
+		return 2
+	default:
+		return 3
+	}
+}
+
+// ClassifyTransport returns the transport type of a multiaddress.
+// It distinguishes between plain TCP, WebSocket (WS), and secure WebSocket (WSS).
+func ClassifyTransport(addr ma.Multiaddr) TransportType {
+	if addr == nil {
+		return TransportUnknown
+	}
+
+	hasProtocol := func(p int) bool {
+		_, err := addr.ValueForProtocol(p)
+		return err == nil
+	}
+
+	hasWS := hasProtocol(ma.P_WS)
+	hasTLS := hasProtocol(ma.P_TLS)
+	hasTCP := hasProtocol(ma.P_TCP)
+
+	switch {
+	case hasWS && hasTLS:
+		return TransportWSS
+	case hasWS:
+		return TransportWS
+	case hasTCP:
+		return TransportTCP
+	default:
+		return TransportUnknown
+	}
+}
+
+// SelectBestAdvertisedAddress returns the preferred address to advertise: public addresses win over non-private ones, TCP is preferred over WS over WSS, and fallback is returned when addrs is empty.
+func SelectBestAdvertisedAddress(addrs []ma.Multiaddr, fallback ma.Multiaddr) ma.Multiaddr {
+	if len(addrs) == 0 {
+		return fallback
+	}
+	// Sort a clone by transport priority (TCP > WS > WSS, stable) so the caller's slice is not reordered.
+	addrs = slices.Clone(addrs)
+	slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
+		return ClassifyTransport(a).Priority() - ClassifyTransport(b).Priority()
+	})
+
+	for _, addr := range addrs {
+		if manet.IsPublicAddr(addr) {
+			return addr
+		}
+	}
+
+	for _, addr := range addrs {
+		if !manet.IsPrivateAddr(addr) {
+			return addr
+		}
+	}
+
+	return addrs[0]
+}
diff --git a/pkg/bzz/transport_test.go b/pkg/bzz/transport_test.go
new file mode 100644
index 00000000000..fab5f90246f
--- /dev/null
+++ b/pkg/bzz/transport_test.go
@@ -0,0 +1,338 @@
+// Copyright 2026 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+ +package bzz_test + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/bzz" + + "github.com/multiformats/go-multiaddr" +) + +func TestSelectBestAdvertisedAddress(t *testing.T) { + t.Parallel() + + mustMultiaddr := func(s string) multiaddr.Multiaddr { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + t.Fatalf("failed to create multiaddr %s: %v", s, err) + } + return addr + } + + tests := []struct { + name string + addrs []multiaddr.Multiaddr + fallback multiaddr.Multiaddr + expected multiaddr.Multiaddr + }{ + { + name: "empty addresses returns fallback", + addrs: []multiaddr.Multiaddr{}, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + }, + { + name: "nil addresses returns fallback", + addrs: nil, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + }, + { + name: "prefers public addresses", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private + mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), // public + mustMultiaddr("/ip4/10.0.0.1/tcp/8080"), // private + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), + }, + { + name: "prefers first public address when multiple exist", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private + mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), // public + mustMultiaddr("/ip4/1.1.1.1/tcp/8080"), // public + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), + }, + { + name: "prefers non-private when no public addresses", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), // loopback + mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private but not loopback + mustMultiaddr("/ip4/10.0.0.1/tcp/8080"), // private but not loopback + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: 
mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + }, + { + name: "returns first address when all are loopback", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + mustMultiaddr("/ip4/127.0.0.1/tcp/8081"), + mustMultiaddr("/ip6/::1/tcp/8080"), + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), + }, + { + name: "sorts TCP addresses first", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/192.168.1.1/udp/8080"), // UDP + mustMultiaddr("/ip4/1.1.1.1/udp/8080"), // UDP public + mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), // TCP public + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080"), + }, + { + name: "handles IPv6 addresses", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip6/::1/tcp/8080"), // loopback + mustMultiaddr("/ip6/2001:db8::1/tcp/8080"), // public IPv6 + mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), // private IPv4 + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: mustMultiaddr("/ip6/2001:db8::1/tcp/8080"), + }, + { + name: "handles mixed protocols with preference order", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/192.168.1.1/udp/8080"), // private UDP + mustMultiaddr("/ip4/192.168.1.2/tcp/8080"), // private TCP + mustMultiaddr("/ip4/127.0.0.1/tcp/8080"), // loopback TCP + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: mustMultiaddr("/ip4/192.168.1.2/tcp/8080"), // first TCP, and it's non-loopback + }, + { + name: "single address", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), + }, + fallback: mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: mustMultiaddr("/ip4/192.168.1.1/tcp/8080"), + }, + { + name: "websocket addresses", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/127.0.0.1/tcp/8080/ws"), + mustMultiaddr("/ip4/8.8.8.8/tcp/8080/ws"), // public with websocket + }, + fallback: 
mustMultiaddr("/ip4/127.0.0.1/tcp/9999"), + expected: mustMultiaddr("/ip4/8.8.8.8/tcp/8080/ws"), + }, + // Full underlay addresses tests + { + name: "full underlay addresses: prefers public TCP over private TCP", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/10.233.99.120/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // private + mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public + mustMultiaddr("/ip4/127.0.0.1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // loopback + }, + fallback: nil, + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: prefers public TCP over WSS addresses", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/10.233.99.120/tcp/1635/tls/sni/10-233-99-120.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // private WSS + mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public TCP + mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public WSS + }, + fallback: nil, + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: full node underlay list selects public TCP", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/10.233.99.120/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/10.233.99.120/tcp/1635/tls/sni/10-233-99-120.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), 
+ mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/127.0.0.1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/127.0.0.1/tcp/1635/tls/sni/127-0-0-1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip6/::1/tcp/1635/tls/sni/0--1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + fallback: nil, + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: WSS only list selects public WSS", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/10.233.99.120/tcp/1635/tls/sni/10-233-99-120.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/127.0.0.1/tcp/1635/tls/sni/127-0-0-1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip6/::1/tcp/1635/tls/sni/0--1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + fallback: nil, + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay 
addresses: TCP only list selects public TCP", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/10.233.99.120/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip4/127.0.0.1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + fallback: nil, + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: IPv6 loopback list with no public addresses", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + mustMultiaddr("/ip6/::1/tcp/1635/tls/sni/0--1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + fallback: nil, + expected: mustMultiaddr("/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: WSS before TCP in input - TCP is still selected due to transport priority", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public WSS (first in input) + mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public TCP (second in input) + }, + fallback: nil, + // TCP is selected because: + // 1. Transport priority: TCP (0) > WS (1) > WSS (2) + // 2. Sorting puts TCP addresses before WSS addresses + // 3. 
Both are public, so the first public after sorting (TCP) is returned + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: TCP before WSS in input - TCP is selected", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public TCP (first) + mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public WSS (second) + }, + fallback: nil, + // TCP is selected because it has higher transport priority + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + { + name: "full underlay addresses: private TCP vs public WSS - public WSS is selected", + addrs: []multiaddr.Multiaddr{ + mustMultiaddr("/ip4/10.233.99.120/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // private TCP + mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), // public WSS + }, + fallback: nil, + // Public WSS is selected because there is no public TCP address + // Priority: public addresses > private addresses, then by transport type + expected: mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := bzz.SelectBestAdvertisedAddress(tt.addrs, tt.fallback) + if !result.Equal(tt.expected) { + t.Errorf("SelectBestAdvertisedAddress() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestClassifyTransport(t *testing.T) { + t.Parallel() + + 
mustMultiaddr := func(s string) multiaddr.Multiaddr { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + t.Fatalf("failed to create multiaddr %s: %v", s, err) + } + return addr + } + + tests := []struct { + name string + addr multiaddr.Multiaddr + expected bzz.TransportType + }{ + { + name: "plain TCP address", + addr: mustMultiaddr("/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + expected: bzz.TransportTCP, + }, + { + name: "plain TCP IPv6 address", + addr: mustMultiaddr("/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + expected: bzz.TransportTCP, + }, + { + name: "plain WS address", + addr: mustMultiaddr("/ip4/127.0.0.1/tcp/8080/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + expected: bzz.TransportWS, + }, + { + name: "WSS address with TLS and SNI", + addr: mustMultiaddr("/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + expected: bzz.TransportWSS, + }, + { + name: "WSS IPv6 address", + addr: mustMultiaddr("/ip6/::1/tcp/1635/tls/sni/0--1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj"), + expected: bzz.TransportWSS, + }, + { + name: "UDP address returns unknown", + addr: mustMultiaddr("/ip4/127.0.0.1/udp/8080"), + expected: bzz.TransportUnknown, + }, + { + name: "nil address returns unknown", + addr: nil, + expected: bzz.TransportUnknown, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := bzz.ClassifyTransport(tt.addr) + if result != tt.expected { + t.Errorf("ClassifyTransport() = %v (%s), want %v (%s)", result, result.String(), tt.expected, tt.expected.String()) + } + }) + } +} + +func TestTransportTypePriority(t *testing.T) { + t.Parallel() + + tests := []struct { + transport bzz.TransportType + priority int + }{ + {bzz.TransportTCP, 
0}, + {bzz.TransportWS, 1}, + {bzz.TransportWSS, 2}, + {bzz.TransportUnknown, 3}, + } + + for _, tt := range tests { + t.Run(tt.transport.String(), func(t *testing.T) { + if got := tt.transport.Priority(); got != tt.priority { + t.Errorf("TransportType(%v).Priority() = %d, want %d", tt.transport, got, tt.priority) + } + }) + } + + // Verify priority ordering: TCP < WS < WSS < Unknown + if bzz.TransportTCP.Priority() >= bzz.TransportWS.Priority() { + t.Error("TCP priority should be lower (better) than WS") + } + if bzz.TransportWS.Priority() >= bzz.TransportWSS.Priority() { + t.Error("WS priority should be lower (better) than WSS") + } + if bzz.TransportWSS.Priority() >= bzz.TransportUnknown.Priority() { + t.Error("WSS priority should be lower (better) than Unknown") + } +} diff --git a/pkg/p2p/error.go b/pkg/p2p/error.go index 5ce54d12e10..4ec8e2cfea3 100644 --- a/pkg/p2p/error.go +++ b/pkg/p2p/error.go @@ -20,6 +20,8 @@ var ( ErrDialLightNode = errors.New("target peer is a light node") // ErrPeerBlocklisted is returned if peer is on blocklist ErrPeerBlocklisted = errors.New("peer blocklisted") + // ErrUnsupportedAddresses is returned when all peer addresses use unsupported transports + ErrUnsupportedAddresses = errors.New("no supported addresses") ) const ( diff --git a/pkg/p2p/libp2p/export_test.go b/pkg/p2p/libp2p/export_test.go index c85a85b54ae..7d7db8d5ee4 100644 --- a/pkg/p2p/libp2p/export_test.go +++ b/pkg/p2p/libp2p/export_test.go @@ -7,11 +7,13 @@ package libp2p import ( "context" + "github.com/ethersphere/bee/v2/pkg/bzz" handshake "github.com/ethersphere/bee/v2/pkg/p2p/libp2p/internal/handshake" libp2pm "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" libp2ppeer "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" ) func (s *Service) HandshakeService() *handshake.Service { @@ -56,3 +58,15 @@ func SetAutoTLSCertManager(o *Options, m autoTLSCertManager) { type 
AutoTLSCertManager = autoTLSCertManager var NewCompositeAddressResolver = newCompositeAddressResolver + +func (s *Service) FilterSupportedAddresses(addrs []ma.Multiaddr) []ma.Multiaddr { + return s.filterSupportedAddresses(addrs) +} + +func (s *Service) SetTransportFlags(hasTCP, hasWS, hasWSS bool) { + s.enabledTransports = map[bzz.TransportType]bool{ + bzz.TransportTCP: hasTCP, + bzz.TransportWS: hasWS, + bzz.TransportWSS: hasWSS, + } +} diff --git a/pkg/p2p/libp2p/filter_addresses_test.go b/pkg/p2p/libp2p/filter_addresses_test.go new file mode 100644 index 00000000000..a86b09e0b05 --- /dev/null +++ b/pkg/p2p/libp2p/filter_addresses_test.go @@ -0,0 +1,218 @@ +// Copyright 2020 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package libp2p_test + +import ( + "testing" + + ma "github.com/multiformats/go-multiaddr" +) + +func mustMultiaddr(t *testing.T, s string) ma.Multiaddr { + t.Helper() + addr, err := ma.NewMultiaddr(s) + if err != nil { + t.Fatalf("failed to create multiaddr from %q: %v", s, err) + } + return addr +} + +func TestFilterSupportedAddresses(t *testing.T) { + t.Parallel() + + // Plain TCP addresses (IPv4) + tcpPrivate := "/ip4/10.233.99.120/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + tcpPublic := "/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + tcpLoopback := "/ip4/127.0.0.1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + tcpIPv6Loopback := "/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + + // WSS addresses with TLS and SNI (full underlay format) + wssPrivate := "/ip4/10.233.99.120/tcp/1635/tls/sni/10-233-99-120.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + wssPublic := 
"/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + wssLoopback := "/ip4/127.0.0.1/tcp/1635/tls/sni/127-0-0-1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + wssIPv6Loopback := "/ip6/::1/tcp/1635/tls/sni/0--1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + + // Plain WS address (no TLS) + wsPlain := "/ip4/127.0.0.1/tcp/1635/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj" + + // All TCP addresses + allTCP := []string{tcpPrivate, tcpPublic, tcpLoopback, tcpIPv6Loopback} + // All WSS addresses + allWSS := []string{wssPrivate, wssPublic, wssLoopback, wssIPv6Loopback} + + tests := []struct { + name string + hasTCP bool + hasWS bool + hasWSS bool + inputAddrs []string + expectedCount int + }{ + { + name: "TCP only transport accepts all TCP addresses", + hasTCP: true, + hasWS: false, + hasWSS: false, + inputAddrs: allTCP, + expectedCount: 4, + }, + { + name: "TCP only transport rejects WSS addresses", + hasTCP: true, + hasWS: false, + hasWSS: false, + inputAddrs: allWSS, + expectedCount: 0, + }, + { + name: "WSS only transport accepts all WSS addresses", + hasTCP: false, + hasWS: false, + hasWSS: true, + inputAddrs: allWSS, + expectedCount: 4, + }, + { + name: "WSS only transport rejects TCP addresses", + hasTCP: false, + hasWS: false, + hasWSS: true, + inputAddrs: allTCP, + expectedCount: 0, + }, + { + name: "TCP and WSS transports accept mixed addresses", + hasTCP: true, + hasWS: false, + hasWSS: true, + inputAddrs: append(allTCP, allWSS...), + expectedCount: 8, + }, + { + name: "WS transport accepts plain WS but not WSS", + hasTCP: false, + hasWS: true, + hasWSS: false, + inputAddrs: []string{wsPlain, wssPublic}, + expectedCount: 1, + }, + { + name: "WSS transport does not accept 
plain WS", + hasTCP: false, + hasWS: false, + hasWSS: true, + inputAddrs: []string{wsPlain}, + expectedCount: 0, + }, + { + name: "No transports reject all addresses", + hasTCP: false, + hasWS: false, + hasWSS: false, + inputAddrs: append(allTCP, allWSS...), + expectedCount: 0, + }, + { + name: "Empty input returns empty output", + hasTCP: true, + hasWS: true, + hasWSS: true, + inputAddrs: []string{}, + expectedCount: 0, + }, + { + name: "Real node addresses with TCP and WSS", + hasTCP: true, + hasWS: false, + hasWSS: true, + inputAddrs: []string{tcpPrivate, wssPrivate, tcpPublic, wssPublic, tcpLoopback, wssLoopback, tcpIPv6Loopback, wssIPv6Loopback}, + expectedCount: 8, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Create a service for testing + s, _ := newService(t, 1, libp2pServiceOpts{}) + + // Set transport flags + s.SetTransportFlags(tc.hasTCP, tc.hasWS, tc.hasWSS) + + // Create multiaddrs from strings + addrs := make([]ma.Multiaddr, len(tc.inputAddrs)) + for i, addrStr := range tc.inputAddrs { + addrs[i] = mustMultiaddr(t, addrStr) + } + + // Filter addresses + filtered := s.FilterSupportedAddresses(addrs) + + // Verify count + if len(filtered) != tc.expectedCount { + t.Errorf("expected %d addresses, got %d", tc.expectedCount, len(filtered)) + } + }) + } +} + +func TestFilterSupportedAddresses_FullUnderlayAddresses(t *testing.T) { + t.Parallel() + + // Complete set of underlay addresses from a real full underlay node + fullAddrs := []string{ + "/ip4/10.233.99.120/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + "/ip4/10.233.99.120/tcp/1635/tls/sni/10-233-99-120.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + "/ip4/104.28.194.73/tcp/32002/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + 
"/ip4/104.28.194.73/tcp/32532/tls/sni/104-28-194-73.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + "/ip4/127.0.0.1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + "/ip4/127.0.0.1/tcp/1635/tls/sni/127-0-0-1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + "/ip6/::1/tcp/1634/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + "/ip6/::1/tcp/1635/tls/sni/0--1.k2k4r8pr3m3aug5nudg2y039qfj2gxw6wnlx0e0ghzxufcn38soyp9z4.libp2p.direct/ws/p2p/QmfSx1ujzboapD5h2CiqTJqUy46FeTDwXBszB3XUCfKEEj", + } + + // Create multiaddrs + addrs := make([]ma.Multiaddr, len(fullAddrs)) + for i, addrStr := range fullAddrs { + addrs[i] = mustMultiaddr(t, addrStr) + } + + t.Run("TCP and WSS enabled accepts all full underlay addresses", func(t *testing.T) { + t.Parallel() + s, _ := newService(t, 1, libp2pServiceOpts{}) + s.SetTransportFlags(true, false, true) + + filtered := s.FilterSupportedAddresses(addrs) + // 4 TCP addresses + 4 WSS addresses = 8 + if len(filtered) != 8 { + t.Errorf("expected 8 addresses, got %d", len(filtered)) + } + }) + + t.Run("TCP only accepts half of full underlay addresses", func(t *testing.T) { + t.Parallel() + s, _ := newService(t, 1, libp2pServiceOpts{}) + s.SetTransportFlags(true, false, false) + + filtered := s.FilterSupportedAddresses(addrs) + // Only 4 TCP addresses + if len(filtered) != 4 { + t.Errorf("expected 4 addresses (TCP only), got %d", len(filtered)) + } + }) + + t.Run("WSS only accepts half of full underlay addresses", func(t *testing.T) { + t.Parallel() + s, _ := newService(t, 1, libp2pServiceOpts{}) + s.SetTransportFlags(false, false, true) + + filtered := s.FilterSupportedAddresses(addrs) + // Only 4 WSS addresses + if len(filtered) != 4 { + t.Errorf("expected 4 addresses (WSS only), got %d", len(filtered)) + } + }) +} diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go 
b/pkg/p2p/libp2p/internal/reacher/reacher.go index aa2ec9cf3e5..eac2cc016f9 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -77,7 +77,6 @@ func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log lo } func (r *reacher) manage() { - defer r.wg.Done() c := make(chan *peer) @@ -180,6 +179,10 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { // Connected adds a new peer to the queue for testing reachability. func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) { + if addr == nil { + return + } + r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 9765268754b..95587e17f19 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -121,9 +121,9 @@ type Service struct { networkStatus atomic.Int32 HeadersRWTimeout time.Duration autoNAT autonat.AutoNAT - enableWS bool autoTLSCertManager autoTLSCertManager zapLogger *zap.Logger + enabledTransports map[bzz.TransportType]bool } type lightnodes interface { @@ -543,9 +543,13 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay lightNodes: lightNodes, HeadersRWTimeout: o.HeadersRWTimeout, autoNAT: autoNAT, - enableWS: o.EnableWS, autoTLSCertManager: certManager, zapLogger: zapLogger, + enabledTransports: map[bzz.TransportType]bool{ + bzz.TransportTCP: true, // TCP transport is always included + bzz.TransportWS: o.EnableWS, + bzz.TransportWSS: o.EnableWSS, + }, } peerRegistry.setDisconnecter(s) @@ -789,7 +793,7 @@ func (s *Service) handleIncoming(stream network.Stream) { return } - s.notifyReacherConnected(overlay, peerID) + s.notifyReacherConnected(overlay, peerMultiaddrs) peerUserAgent := appendSpace(s.peerUserAgent(s.ctx, peerID)) s.networkStatus.Store(int32(p2p.NetworkStatusAvailable)) @@ -798,23 +802,44 @@ func (s *Service) handleIncoming(stream network.Stream) { s.logger.Debug("stream handler: successfully connected to 
peer (inbound)", "address", i.BzzAddress.Overlay, "light", i.LightString(), "user_agent", peerUserAgent) } -func (s *Service) notifyReacherConnected(overlay swarm.Address, peerID libp2ppeer.ID) { - if s.reacher == nil { - return +// isTransportSupported checks if the given transport type is supported by this service. +func (s *Service) isTransportSupported(t bzz.TransportType) bool { + return s.enabledTransports[t] +} + +// filterSupportedAddresses filters multiaddresses to only include those +// that are supported by the available transports (TCP, WS, WSS). +func (s *Service) filterSupportedAddresses(addrs []ma.Multiaddr) []ma.Multiaddr { + if len(addrs) == 0 { + return addrs } - peerAddrs := s.host.Peerstore().Addrs(peerID) - bestAddr := bzz.SelectBestAdvertisedAddress(peerAddrs, nil) + filtered := make([]ma.Multiaddr, 0, len(addrs)) + for _, addr := range addrs { + if s.isTransportSupported(bzz.ClassifyTransport(addr)) { + filtered = append(filtered, addr) + } + } - s.logger.Debug("selected reacher address", "peer_id", peerID, "selected_addr", bestAddr.String(), "advertised_count", len(peerAddrs)) + return filtered +} - underlay, err := buildFullMA(bestAddr, peerID) - if err != nil { - s.logger.Error(err, "stream handler: unable to build complete peer multiaddr", "peer", overlay, "multiaddr", bestAddr, "peer_id", peerID) - _ = s.Disconnect(overlay, "unable to build complete peer multiaddr") +func (s *Service) notifyReacherConnected(overlay swarm.Address, underlays []ma.Multiaddr) { + if s.reacher == nil { + return + } + + filteredAddrs := s.filterSupportedAddresses(underlays) + if len(filteredAddrs) == 0 { + s.logger.Debug("no supported addresses for reacher", "overlay", overlay, "total", len(underlays)) return } - s.reacher.Connected(overlay, underlay) + + bestAddr := bzz.SelectBestAdvertisedAddress(filteredAddrs, nil) + + s.logger.Debug("selected reacher address", "overlay", overlay, "address", bestAddr, "filtered", len(filteredAddrs), "total", len(underlays)) 
+ + s.reacher.Connected(overlay, bestAddr) } func (s *Service) SetPickyNotifier(n p2p.PickyNotifier) { @@ -988,19 +1013,19 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b err = s.determineCurrentNetworkStatus(err) }() + filteredAddrs := s.filterSupportedAddresses(addrs) + if len(filteredAddrs) == 0 { + s.logger.Debug("no supported addresses to connect", "total_addrs", len(addrs)) + return nil, p2p.ErrUnsupportedAddresses + } + var info *libp2ppeer.AddrInfo var peerID libp2ppeer.ID var connectErr error skippedSelf := false // Try to connect to each underlay address one by one. - // - // TODO: investigate the issue when AddrInfo with multiple underlay - // addresses for the same peer is passed to the host.Host.Connect function - // and reachabiltiy Private is emitted on libp2p EventBus(), which results - // in weaker connectivity and failures in some integration tests. - - for _, addr := range addrs { + for _, addr := range filteredAddrs { // Extract the peer ID from the multiaddr. 
ai, err := libp2ppeer.AddrInfoFromP2pAddr(addr) if err != nil { @@ -1059,7 +1084,7 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b } if info == nil { - return nil, fmt.Errorf("unable to identify peer from addresses: %v", addrs) + return nil, fmt.Errorf("unable to identify peer from addresses: %v", filteredAddrs) } stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName) @@ -1161,7 +1186,7 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b s.metrics.CreatedConnectionCount.Inc() - s.notifyReacherConnected(overlay, peerID) + s.notifyReacherConnected(overlay, peerMultiaddrs) peerUA := appendSpace(s.peerUserAgent(ctx, peerID)) loggerV1.Debug("successfully connected to peer (outbound)", "addresses", i.BzzAddress.ShortString(), "light", i.LightString(), "user_agent", peerUA) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index d4b28e8eb1b..7cfabd1ff27 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -430,21 +430,24 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, switch err = k.connect(ctx, peer.addr, bzzAddr.Underlays); { case errors.Is(err, p2p.ErrNetworkUnavailable): - k.logger.Debug("network unavailable when reaching peer", "peer_overlay_address", peer.addr, "peer_underlay_addresses", bzzAddr.Underlays) + k.logger.Debug("network unavailable when reaching peer", "peer_address", peer.addr, "peer_underlays", bzzAddr.Underlays) return case errors.Is(err, errPruneEntry): - k.logger.Debug("dial to light node", "peer_overlay_address", peer.addr, "peer_underlay_addresses", bzzAddr.Underlays) + k.logger.Debug("dial to light node", "peer_address", peer.addr, "peer_underlays", bzzAddr.Underlays) remove(peer) return case errors.Is(err, errOverlayMismatch): - k.logger.Debug("overlay mismatch has occurred", "peer_overlay_address", 
peer.addr, "peer_underlay_addresses", bzzAddr.Underlays) + k.logger.Debug("overlay mismatch has occurred", "peer_address", peer.addr, "peer_underlays", bzzAddr.Underlays) remove(peer) return case errors.Is(err, p2p.ErrPeerBlocklisted): - k.logger.Debug("peer still in blocklist", "peer_address", bzzAddr) + k.logger.Debug("peer still in blocklist", "peer_address", peer.addr, "peer_underlays", bzzAddr.Underlays) + return + case errors.Is(err, p2p.ErrUnsupportedAddresses): + k.logger.Debug("peer has no supported addresses", "peer_address", peer.addr, "peer_underlays", bzzAddr.Underlays) return case err != nil: - k.logger.Debug("peer not reachable from kademlia", "peer_address", bzzAddr, "error", err) + k.logger.Debug("peer not reachable from kademlia", "peer_address", peer.addr, "peer_underlays", bzzAddr.Underlays, "error", err) return } @@ -981,6 +984,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma []ma.Multiaddr return err case errors.Is(err, p2p.ErrPeerBlocklisted): return err + case errors.Is(err, p2p.ErrUnsupportedAddresses): + return err case err != nil: k.logger.Info("could not connect to peer", "peer_address", peer, "error", err)