diff --git a/pkg/p2p/libp2p/internal/reacher/heap.go b/pkg/p2p/libp2p/internal/reacher/heap.go new file mode 100644 index 00000000000..10277b3df47 --- /dev/null +++ b/pkg/p2p/libp2p/internal/reacher/heap.go @@ -0,0 +1,33 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package reacher + +// peerHeap is a min-heap of peers ordered by retryAfter time. +type peerHeap []*peer + +func (h peerHeap) Len() int { return len(h) } +func (h peerHeap) Less(i, j int) bool { return h[i].retryAfter.Before(h[j].retryAfter) } +func (h peerHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *peerHeap) Push(x any) { + n := len(*h) + p := x.(*peer) + p.index = n + *h = append(*h, p) +} + +func (h *peerHeap) Pop() any { + old := *h + n := len(old) + p := old[n-1] + old[n-1] = nil // avoid memory leak + p.index = -1 // for safety + *h = old[0 : n-1] + return p +} diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index aa2ec9cf3e5..d51a7aef377 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -7,6 +7,7 @@ package reacher import ( + "container/heap" "context" "sync" "time" @@ -27,11 +28,13 @@ type peer struct { overlay swarm.Address addr ma.Multiaddr retryAfter time.Time + index int // index in the heap } type reacher struct { - mu sync.Mutex - peers map[string]*peer + mu sync.Mutex + peerHeap peerHeap // min-heap ordered by retryAfter + peerIndex map[string]*peer // lookup by overlay for O(1) access newPeer chan struct{} quit chan struct{} @@ -53,12 +56,13 @@ type Options struct { func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log log.Logger) *reacher { r := &reacher{ - newPeer: make(chan struct{}, 1), - quit: make(chan struct{}), - pinger: streamer, - peers: make(map[string]*peer), - notifier: notifier, - logger: log.WithName("reacher").Register(), + newPeer: make(chan struct{}, 1), + quit: make(chan struct{}), + pinger: streamer, + peerHeap: make(peerHeap, 0), + peerIndex: make(map[string]*peer), + notifier: notifier, + logger: log.WithName("reacher").Register(), } if o == nil { @@ -80,7 +84,7 @@ func (r *reacher) manage() { defer r.wg.Done() - c := make(chan *peer) + c := make(chan peer) defer close(c) ctx, cancel := context.WithCancel(context.Background()) @@ -93,7 +97,7 @@ func (r *reacher) manage() { for { - p, tryAfter := r.tryAcquirePeer() + p, ok, tryAfter := r.tryAcquirePeer() // if no peer is returned, // wait until either more work or the closest retry-after time. @@ -111,7 +115,7 @@ func (r *reacher) manage() { } // wait for work - if p == nil { + if !ok { select { case <-r.quit: return @@ -129,7 +133,7 @@ func (r *reacher) manage() { } } -func (r *reacher) ping(c chan *peer, ctx context.Context) { +func (r *reacher) ping(c chan peer, ctx context.Context) { defer r.wg.Done() for p := range c { func() { @@ -147,44 +151,47 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { } } -func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { +func (r *reacher) tryAcquirePeer() (peer, bool, time.Duration) { r.mu.Lock() defer r.mu.Unlock() - var ( - now = time.Now() - nextClosest time.Time - ) + if len(r.peerHeap) == 0 { + return peer{}, false, 0 + } - for _, p := range r.peers { + now := time.Now() - // retry after has expired, retry - if now.After(p.retryAfter) { - p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) - return p, 0 - } + // Peek at the peer with the earliest retryAfter + p := r.peerHeap[0] - // here, we find the peer with the earliest retry after - if nextClosest.IsZero() || p.retryAfter.Before(nextClosest) { - nextClosest = p.retryAfter - } + // If retryAfter has not expired, return time to wait + if now.Before(p.retryAfter) { + return peer{}, false, time.Until(p.retryAfter) } - if nextClosest.IsZero() { - return nil, 0 - } + // Update retryAfter and fix heap position + p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) + heap.Fix(&r.peerHeap, p.index) - // return the time to wait until the closest retry after - return nil, time.Until(nextClosest) + // Return a copy so callers can read fields without holding the lock. + return *p, true, 0 } // Connected adds a new peer to the queue for testing reachability. +// If the peer already exists, its address is updated. func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) { r.mu.Lock() defer r.mu.Unlock() - if _, ok := r.peers[overlay.ByteString()]; !ok { - r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr} + key := overlay.ByteString() + if existing, ok := r.peerIndex[key]; ok { + existing.addr = addr // Update address for reconnecting peer + existing.retryAfter = time.Time{} // Reset to trigger immediate re-ping + heap.Fix(&r.peerHeap, existing.index) + } else { + p := &peer{overlay: overlay, addr: addr} + r.peerIndex[key] = p + heap.Push(&r.peerHeap, p) } select { @@ -198,7 +205,11 @@ func (r *reacher) Disconnected(overlay swarm.Address) { r.mu.Lock() defer r.mu.Unlock() - delete(r.peers, overlay.ByteString()) + key := overlay.ByteString() + if p, ok := r.peerIndex[key]; ok { + heap.Remove(&r.peerHeap, p.index) + delete(r.peerIndex, key) + } } // Close stops the worker. Must be called once. diff --git a/pkg/p2p/libp2p/internal/reacher/reacher_test.go b/pkg/p2p/libp2p/internal/reacher/reacher_test.go index 515e05eab88..e86442ed402 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher_test.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher_test.go @@ -7,6 +7,7 @@ package reacher_test import ( "context" "errors" + "sync" "testing" "time" @@ -127,6 +128,167 @@ func TestDisconnected(t *testing.T) { }) } +func TestAddressUpdateOnReconnect(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + // Use 1 worker and a known retry duration to make timing deterministic. + options := reacher.Options{ + PingTimeout: time.Second * 5, + Workers: 1, + RetryAfterDuration: time.Minute, + } + + overlay := swarm.RandAddress(t) + oldAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + newAddr, _ := ma.NewMultiaddr("/ip4/192.168.1.1/tcp/7072/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + + var pingsMu sync.Mutex + var pings []ma.Multiaddr + pinged := make(chan struct{}, 8) + + pingFunc := func(_ context.Context, a ma.Multiaddr) (time.Duration, error) { + pingsMu.Lock() + pings = append(pings, a) + pingsMu.Unlock() + pinged <- struct{}{} + return 0, nil + } + + reachableFunc := func(addr swarm.Address, status p2p.ReachabilityStatus) {} + + mock := newMock(pingFunc, reachableFunc) + + r := reacher.New(mock, mock, &options, log.Noop) + testutil.CleanupCloser(t, r) + + // First connection with old address – triggers initial ping. + r.Connected(overlay, oldAddr) + + select { + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for initial ping") + case <-pinged: + } + + // Verify old address was pinged first. + pingsMu.Lock() + if len(pings) != 1 { + t.Fatalf("expected 1 ping after initial connect, got %d", len(pings)) + } + if !pings[0].Equal(oldAddr) { + t.Fatalf("first ping should use old address, got %s", pings[0]) + } + pingsMu.Unlock() + + // Reconnect with a new address — should trigger immediate re-ping. + r.Connected(overlay, newAddr) + + select { + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for reconnect ping") + case <-pinged: + } + + // Verify the reconnect pinged the new address. + pingsMu.Lock() + if len(pings) != 2 { + t.Fatalf("expected 2 pings after reconnect, got %d", len(pings)) + } + if !pings[1].Equal(newAddr) { + t.Fatalf("reconnect ping should use new address, got %s", pings[1]) + } + pingsMu.Unlock() + + // Advance time past the retry duration — should trigger a scheduled re-ping. + time.Sleep(options.RetryAfterDuration + time.Second) + + select { + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for scheduled re-ping") + case <-pinged: + } + + // Verify the scheduled re-ping used the new address. + pingsMu.Lock() + if len(pings) != 3 { + t.Fatalf("expected 3 pings after retry duration, got %d", len(pings)) + } + if !pings[2].Equal(newAddr) { + t.Fatalf("scheduled re-ping should use new address, got %s", pings[2]) + } + pingsMu.Unlock() + }) +} + +func TestHeapOrdering(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + // Use single worker to ensure sequential processing + options := reacher.Options{ + PingTimeout: time.Second * 5, + Workers: 1, + RetryAfterDuration: time.Second * 10, + } + + var pingOrder []swarm.Address + var pingOrderMu sync.Mutex + allPinged := make(chan struct{}) + + overlay1 := swarm.RandAddress(t) + overlay2 := swarm.RandAddress(t) + overlay3 := swarm.RandAddress(t) + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + + pingFunc := func(_ context.Context, _ ma.Multiaddr) (time.Duration, error) { + return 0, nil + } + + reachableFunc := func(overlay swarm.Address, status p2p.ReachabilityStatus) { + pingOrderMu.Lock() + pingOrder = append(pingOrder, overlay) + if len(pingOrder) == 3 { + close(allPinged) + } + pingOrderMu.Unlock() + } + + mock := newMock(pingFunc, reachableFunc) + + r := reacher.New(mock, mock, &options, log.Noop) + testutil.CleanupCloser(t, r) + + // Add peers - they should all be pinged since retryAfter starts at zero + r.Connected(overlay1, addr) + r.Connected(overlay2, addr) + r.Connected(overlay3, addr) + + select { + case <-time.After(time.Second * 5): + t.Fatalf("test timed out, only %d peers pinged", len(pingOrder)) + case <-allPinged: + } + + // Verify all three peers were pinged + pingOrderMu.Lock() + defer pingOrderMu.Unlock() + + if len(pingOrder) != 3 { + t.Fatalf("expected 3 peers pinged, got %d", len(pingOrder)) + } + + // Verify all overlays are present (order may vary due to heap with same retryAfter) + seen := make(map[string]bool) + for _, o := range pingOrder { + seen[o.String()] = true + } + if !seen[overlay1.String()] || !seen[overlay2.String()] || !seen[overlay3.String()] { + t.Fatalf("not all peers were pinged") + } + }) +} + type mock struct { pingFunc func(context.Context, ma.Multiaddr) (time.Duration, error) reachableFunc func(swarm.Address, p2p.ReachabilityStatus)