Merged
33 changes: 33 additions & 0 deletions pkg/p2p/libp2p/internal/reacher/heap.go
@@ -0,0 +1,33 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package reacher

// peerHeap is a min-heap of peers ordered by retryAfter time.
type peerHeap []*peer

func (h peerHeap) Len() int { return len(h) }
func (h peerHeap) Less(i, j int) bool { return h[i].retryAfter.Before(h[j].retryAfter) }
func (h peerHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}

func (h *peerHeap) Push(x any) {
n := len(*h)
p := x.(*peer)
p.index = n
*h = append(*h, p)
}

func (h *peerHeap) Pop() any {
old := *h
n := len(old)
p := old[n-1]
old[n-1] = nil // avoid memory leak
p.index = -1 // for safety
*h = old[0 : n-1]
return p
}
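Since peerHeap implements container/heap's heap.Interface (Len, Less and Swap on the value receiver; Push and Pop on the pointer receiver), the standard library maintains the min-heap invariant. A minimal usage sketch, not part of the PR, assuming it runs inside the reacher package with container/heap and time imported (exampleHeapUsage and the variable names are hypothetical):

func exampleHeapUsage() {
	h := make(peerHeap, 0)
	now := time.Now()
	late := &peer{retryAfter: now.Add(3 * time.Minute)}
	soon := &peer{retryAfter: now.Add(time.Minute)}
	heap.Push(&h, late) // calls peerHeap.Push, then sifts up to keep the minimum at h[0]
	heap.Push(&h, soon)
	_ = h[0] == soon // true: soon has the earliest retryAfter
	soon.retryAfter = now.Add(5 * time.Minute)
	heap.Fix(&h, soon.index) // restore ordering after mutating the sort key
	_ = h[0] == late // true: late is now the earliest
}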
81 changes: 46 additions & 35 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
@@ -7,6 +7,7 @@
package reacher

import (
"container/heap"
"context"
"sync"
"time"
@@ -27,11 +28,13 @@ type peer struct {
overlay swarm.Address
addr ma.Multiaddr
retryAfter time.Time
index int // index in the heap
}

type reacher struct {
mu sync.Mutex
peers map[string]*peer
mu sync.Mutex
peerHeap peerHeap // min-heap ordered by retryAfter
peerIndex map[string]*peer // lookup by overlay for O(1) access

newPeer chan struct{}
quit chan struct{}
@@ -53,12 +56,13 @@ type Options struct {

func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log log.Logger) *reacher {
r := &reacher{
newPeer: make(chan struct{}, 1),
quit: make(chan struct{}),
pinger: streamer,
peers: make(map[string]*peer),
notifier: notifier,
logger: log.WithName("reacher").Register(),
newPeer: make(chan struct{}, 1),
quit: make(chan struct{}),
pinger: streamer,
peerHeap: make(peerHeap, 0),
peerIndex: make(map[string]*peer),
notifier: notifier,
logger: log.WithName("reacher").Register(),
}

if o == nil {
@@ -80,7 +84,7 @@ func (r *reacher) manage() {

defer r.wg.Done()

c := make(chan *peer)
c := make(chan peer)
Contributor:

Here and also below, why not use a pointer to a peer anymore?

Member Author:

By sending a value copy through the channel, each ping goroutine gets its own independent snapshot of the peer data, which is safe to read concurrently.
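An illustrative sketch of the value-copy pattern described in the reply, not code from the PR (names are hypothetical):

c := make(chan peer)
go func() {
	for p := range c {
		_ = p.addr // p is this goroutine's private copy; reading it needs no lock
	}
}()
// Producer side, while holding the mutex:
// c <- *p // sends a snapshot of *p; later mutations of *p cannot race with the reader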

defer close(c)

ctx, cancel := context.WithCancel(context.Background())
@@ -93,7 +97,7 @@

for {

p, tryAfter := r.tryAcquirePeer()
p, ok, tryAfter := r.tryAcquirePeer()

// if no peer is returned,
// wait until either more work or the closest retry-after time.
@@ -111,7 +115,7 @@
}

// wait for work
if p == nil {
if !ok {
select {
case <-r.quit:
return
@@ -129,7 +133,7 @@
}
}

func (r *reacher) ping(c chan *peer, ctx context.Context) {
func (r *reacher) ping(c chan peer, ctx context.Context) {
defer r.wg.Done()
for p := range c {
func() {
@@ -147,44 +151,47 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
}
}

func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
func (r *reacher) tryAcquirePeer() (peer, bool, time.Duration) {
r.mu.Lock()
defer r.mu.Unlock()

var (
now = time.Now()
nextClosest time.Time
)
if len(r.peerHeap) == 0 {
return peer{}, false, 0
}

for _, p := range r.peers {
now := time.Now()

// retry after has expired, retry
if now.After(p.retryAfter) {
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
return p, 0
}
// Peek at the peer with the earliest retryAfter
p := r.peerHeap[0]

// here, we find the peer with the earliest retry after
if nextClosest.IsZero() || p.retryAfter.Before(nextClosest) {
nextClosest = p.retryAfter
}
// If retryAfter has not expired, return time to wait
if now.Before(p.retryAfter) {
return peer{}, false, time.Until(p.retryAfter)
}

if nextClosest.IsZero() {
return nil, 0
}
// Update retryAfter and fix heap position
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
heap.Fix(&r.peerHeap, p.index)

// return the time to wait until the closest retry after
return nil, time.Until(nextClosest)
// Return a copy so callers can read fields without holding the lock.
return *p, true, 0
}

// Connected adds a new peer to the queue for testing reachability.
// If the peer already exists, its address is updated.
func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.peers[overlay.ByteString()]; !ok {
r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr}
key := overlay.ByteString()
if existing, ok := r.peerIndex[key]; ok {
existing.addr = addr // Update address for reconnecting peer
existing.retryAfter = time.Time{} // Reset to trigger immediate re-ping
heap.Fix(&r.peerHeap, existing.index)
} else {
p := &peer{overlay: overlay, addr: addr}
r.peerIndex[key] = p
heap.Push(&r.peerHeap, p)
}

select {
@@ -198,7 +205,11 @@ func (r *reacher) Disconnected(overlay swarm.Address) {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.peers, overlay.ByteString())
key := overlay.ByteString()
if p, ok := r.peerIndex[key]; ok {
heap.Remove(&r.peerHeap, p.index)
delete(r.peerIndex, key)
}
}

// Close stops the worker. Must be called once.
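For readability, a condensed paraphrase of the manage() loop after this change, stitched together from the hunks above (not the verbatim diff):

for {
	p, ok, tryAfter := r.tryAcquirePeer()

	if !ok && tryAfter > 0 {
		// every peer is backing off: wait out the earliest retryAfter,
		// unless quit fires or a new peer arrives first
		select {
		case <-r.quit:
			return
		case <-r.newPeer:
		case <-time.After(tryAfter):
		}
		continue
	}

	if !ok {
		// heap is empty: block until Connected signals new work
		select {
		case <-r.quit:
			return
		case <-r.newPeer:
		}
		continue
	}

	// hand the peer snapshot to a ping worker
	select {
	case <-r.quit:
		return
	case c <- p:
	}
}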
162 changes: 162 additions & 0 deletions pkg/p2p/libp2p/internal/reacher/reacher_test.go
@@ -7,6 +7,7 @@ package reacher_test
import (
"context"
"errors"
"sync"
"testing"
"time"

@@ -127,6 +128,167 @@ func TestDisconnected(t *testing.T) {
})
}

func TestAddressUpdateOnReconnect(t *testing.T) {
t.Parallel()

synctest.Test(t, func(t *testing.T) {
// Use 1 worker and a known retry duration to make timing deterministic.
options := reacher.Options{
PingTimeout: time.Second * 5,
Workers: 1,
RetryAfterDuration: time.Minute,
}

overlay := swarm.RandAddress(t)
oldAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb")
newAddr, _ := ma.NewMultiaddr("/ip4/192.168.1.1/tcp/7072/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb")

var pingsMu sync.Mutex
var pings []ma.Multiaddr
pinged := make(chan struct{}, 8)

pingFunc := func(_ context.Context, a ma.Multiaddr) (time.Duration, error) {
pingsMu.Lock()
pings = append(pings, a)
pingsMu.Unlock()
pinged <- struct{}{}
return 0, nil
}

reachableFunc := func(addr swarm.Address, status p2p.ReachabilityStatus) {}

mock := newMock(pingFunc, reachableFunc)

r := reacher.New(mock, mock, &options, log.Noop)
testutil.CleanupCloser(t, r)

// First connection with old address – triggers initial ping.
r.Connected(overlay, oldAddr)

select {
case <-time.After(time.Second * 10):
t.Fatal("timed out waiting for initial ping")
case <-pinged:
}

// Verify old address was pinged first.
pingsMu.Lock()
if len(pings) != 1 {
t.Fatalf("expected 1 ping after initial connect, got %d", len(pings))
}
if !pings[0].Equal(oldAddr) {
t.Fatalf("first ping should use old address, got %s", pings[0])
}
pingsMu.Unlock()

// Reconnect with a new address — should trigger immediate re-ping.
r.Connected(overlay, newAddr)

select {
case <-time.After(time.Second * 10):
t.Fatal("timed out waiting for reconnect ping")
case <-pinged:
}

// Verify the reconnect pinged the new address.
pingsMu.Lock()
if len(pings) != 2 {
t.Fatalf("expected 2 pings after reconnect, got %d", len(pings))
}
if !pings[1].Equal(newAddr) {
t.Fatalf("reconnect ping should use new address, got %s", pings[1])
}
pingsMu.Unlock()

// Advance time past the retry duration — should trigger a scheduled re-ping.
time.Sleep(options.RetryAfterDuration + time.Second)

select {
case <-time.After(time.Second * 10):
t.Fatal("timed out waiting for scheduled re-ping")
case <-pinged:
}

// Verify the scheduled re-ping used the new address.
pingsMu.Lock()
if len(pings) != 3 {
t.Fatalf("expected 3 pings after retry duration, got %d", len(pings))
}
if !pings[2].Equal(newAddr) {
t.Fatalf("scheduled re-ping should use new address, got %s", pings[2])
}
pingsMu.Unlock()
})
}

func TestHeapOrdering(t *testing.T) {
t.Parallel()

synctest.Test(t, func(t *testing.T) {
// Use single worker to ensure sequential processing
options := reacher.Options{
PingTimeout: time.Second * 5,
Workers: 1,
RetryAfterDuration: time.Second * 10,
}

var pingOrder []swarm.Address
var pingOrderMu sync.Mutex
allPinged := make(chan struct{})

overlay1 := swarm.RandAddress(t)
overlay2 := swarm.RandAddress(t)
overlay3 := swarm.RandAddress(t)
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb")

pingFunc := func(_ context.Context, _ ma.Multiaddr) (time.Duration, error) {
return 0, nil
}

reachableFunc := func(overlay swarm.Address, status p2p.ReachabilityStatus) {
pingOrderMu.Lock()
pingOrder = append(pingOrder, overlay)
if len(pingOrder) == 3 {
close(allPinged)
}
pingOrderMu.Unlock()
}

mock := newMock(pingFunc, reachableFunc)

r := reacher.New(mock, mock, &options, log.Noop)
testutil.CleanupCloser(t, r)

// Add peers - they should all be pinged since retryAfter starts at zero
r.Connected(overlay1, addr)
r.Connected(overlay2, addr)
r.Connected(overlay3, addr)

select {
case <-time.After(time.Second * 5):
t.Fatalf("test timed out, only %d peers pinged", len(pingOrder))
case <-allPinged:
}

// Verify all three peers were pinged
pingOrderMu.Lock()
defer pingOrderMu.Unlock()

if len(pingOrder) != 3 {
t.Fatalf("expected 3 peers pinged, got %d", len(pingOrder))
}

// Verify all overlays are present (order may vary due to heap with same retryAfter)
seen := make(map[string]bool)
for _, o := range pingOrder {
seen[o.String()] = true
}
if !seen[overlay1.String()] || !seen[overlay2.String()] || !seen[overlay3.String()] {
t.Fatalf("not all peers were pinged")
}
})
}

type mock struct {
pingFunc func(context.Context, ma.Multiaddr) (time.Duration, error)
reachableFunc func(swarm.Address, p2p.ReachabilityStatus)