From b376d4f50fcc02e987dc7d5973f44b8eb4ac7229 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 5 Feb 2026 13:30:21 +0100 Subject: [PATCH 1/6] fix(p2p): update peer address on reconnect and optimize reacher --- pkg/p2p/libp2p/internal/reacher/reacher.go | 96 ++++++++++----- .../libp2p/internal/reacher/reacher_test.go | 110 ++++++++++++++++++ 2 files changed, 176 insertions(+), 30 deletions(-) diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index aa2ec9cf3e5..c4007805509 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -7,6 +7,7 @@ package reacher import ( + "container/heap" "context" "sync" "time" @@ -27,11 +28,41 @@ type peer struct { overlay swarm.Address addr ma.Multiaddr retryAfter time.Time + index int // index in the heap +} + +// peerHeap is a min-heap of peers ordered by retryAfter time. +type peerHeap []*peer + +func (h peerHeap) Len() int { return len(h) } +func (h peerHeap) Less(i, j int) bool { return h[i].retryAfter.Before(h[j].retryAfter) } +func (h peerHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *peerHeap) Push(x any) { + n := len(*h) + p := x.(*peer) + p.index = n + *h = append(*h, p) +} + +func (h *peerHeap) Pop() any { + old := *h + n := len(old) + p := old[n-1] + old[n-1] = nil // avoid memory leak + p.index = -1 // for safety + *h = old[0 : n-1] + return p } type reacher struct { - mu sync.Mutex - peers map[string]*peer + mu sync.Mutex + peerHeap peerHeap // min-heap ordered by retryAfter + peerIndex map[string]*peer // lookup by overlay for O(1) access newPeer chan struct{} quit chan struct{} @@ -53,12 +84,13 @@ type Options struct { func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log log.Logger) *reacher { r := &reacher{ - newPeer: make(chan struct{}, 1), - quit: make(chan struct{}), - pinger: streamer, - peers: make(map[string]*peer), - notifier: notifier, - logger: log.WithName("reacher").Register(), + newPeer: make(chan struct{}, 1), + quit: make(chan struct{}), + pinger: streamer, + peerHeap: make(peerHeap, 0), + peerIndex: make(map[string]*peer), + notifier: notifier, + logger: log.WithName("reacher").Register(), } if o == nil { @@ -151,40 +183,40 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { r.mu.Lock() defer r.mu.Unlock() - var ( - now = time.Now() - nextClosest time.Time - ) + if len(r.peerHeap) == 0 { + return nil, 0 + } - for _, p := range r.peers { + now := time.Now() - // retry after has expired, retry - if now.After(p.retryAfter) { - p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) - return p, 0 - } + // Peek at the peer with the earliest retryAfter + p := r.peerHeap[0] - // here, we find the peer with the earliest retry after - if nextClosest.IsZero() || p.retryAfter.Before(nextClosest) { - nextClosest = p.retryAfter - } + // If retryAfter has not expired, return time to wait + if now.Before(p.retryAfter) { + return nil, time.Until(p.retryAfter) } - if nextClosest.IsZero() { - return nil, 0 - } + // Update retryAfter and fix heap position + p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) + heap.Fix(&r.peerHeap, p.index) - // return the time to wait until the closest retry after - return nil, time.Until(nextClosest) + return p, 0 } // Connected adds a new peer to the queue for testing reachability. +// If the peer already exists, its address is updated. func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) { r.mu.Lock() defer r.mu.Unlock() - if _, ok := r.peers[overlay.ByteString()]; !ok { - r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr} + key := overlay.ByteString() + if existing, ok := r.peerIndex[key]; ok { + existing.addr = addr // Update address for reconnecting peer + } else { + p := &peer{overlay: overlay, addr: addr} + r.peerIndex[key] = p + heap.Push(&r.peerHeap, p) } select { @@ -198,7 +230,11 @@ func (r *reacher) Disconnected(overlay swarm.Address) { r.mu.Lock() defer r.mu.Unlock() - delete(r.peers, overlay.ByteString()) + key := overlay.ByteString() + if p, ok := r.peerIndex[key]; ok { + heap.Remove(&r.peerHeap, p.index) + delete(r.peerIndex, key) + } } // Close stops the worker. Must be called once. diff --git a/pkg/p2p/libp2p/internal/reacher/reacher_test.go b/pkg/p2p/libp2p/internal/reacher/reacher_test.go index 515e05eab88..29e6aa293f9 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher_test.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher_test.go @@ -7,6 +7,7 @@ package reacher_test import ( "context" "errors" + "sync" "testing" "time" @@ -127,6 +128,115 @@ func TestDisconnected(t *testing.T) { }) } +func TestAddressUpdateOnReconnect(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + done := make(chan struct{}) + + overlay := swarm.RandAddress(t) + oldAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + newAddr, _ := ma.NewMultiaddr("/ip4/192.168.1.1/tcp/7072/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + + pingFunc := func(_ context.Context, a ma.Multiaddr) (time.Duration, error) { + // Verify that the new address is being pinged, not the old one + if a.Equal(oldAddr) { + t.Fatalf("ping should use updated address, got old address") + } + if a.Equal(newAddr) { + done <- struct{}{} + } + return 0, nil + } + + reachableFunc := func(addr swarm.Address, status p2p.ReachabilityStatus) {} + + mock := newMock(pingFunc, reachableFunc) + + r := reacher.New(mock, mock, &defaultOptions, log.Noop) + testutil.CleanupCloser(t, r) + + // First connection with old address + r.Connected(overlay, oldAddr) + // Immediate reconnection with new address (simulates peer reconnecting with different IP) + r.Connected(overlay, newAddr) + + select { + case <-time.After(time.Second * 5): + t.Fatalf("test timed out") + case <-done: + } + }) +} + +func TestHeapOrdering(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + // Use single worker to ensure sequential processing + options := reacher.Options{ + PingTimeout: time.Second * 5, + Workers: 1, + RetryAfterDuration: time.Second * 10, + } + + var pingOrder []swarm.Address + var pingOrderMu sync.Mutex + allPinged := make(chan struct{}) + + overlay1 := swarm.RandAddress(t) + overlay2 := swarm.RandAddress(t) + overlay3 := swarm.RandAddress(t) + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + + pingFunc := func(_ context.Context, _ ma.Multiaddr) (time.Duration, error) { + return 0, nil + } + + reachableFunc := func(overlay swarm.Address, status p2p.ReachabilityStatus) { + pingOrderMu.Lock() + pingOrder = append(pingOrder, overlay) + if len(pingOrder) == 3 { + close(allPinged) + } + pingOrderMu.Unlock() + } + + mock := newMock(pingFunc, reachableFunc) + + r := reacher.New(mock, mock, &options, log.Noop) + testutil.CleanupCloser(t, r) + + // Add peers - they should all be pinged since retryAfter starts at zero + r.Connected(overlay1, addr) + r.Connected(overlay2, addr) + r.Connected(overlay3, addr) + + select { + case <-time.After(time.Second * 5): + t.Fatalf("test timed out, only %d peers pinged", len(pingOrder)) + case <-allPinged: + } + + // Verify all three peers were pinged + pingOrderMu.Lock() + defer pingOrderMu.Unlock() + + if len(pingOrder) != 3 { + t.Fatalf("expected 3 peers pinged, got %d", len(pingOrder)) + } + + // Verify all overlays are present (order may vary due to heap with same retryAfter) + seen := make(map[string]bool) + for _, o := range pingOrder { + seen[o.String()] = true + } + if !seen[overlay1.String()] || !seen[overlay2.String()] || !seen[overlay3.String()] { + t.Fatalf("not all peers were pinged") + } + }) +} + type mock struct { pingFunc func(context.Context, ma.Multiaddr) (time.Duration, error) reachableFunc func(swarm.Address, p2p.ReachabilityStatus) From b22d2342e67dbf356b0b3e5e9f3ba3dd619ca03a Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 9 Feb 2026 13:55:29 +0100 Subject: [PATCH 2/6] fix(p2p): update peer underlay on reconnect and optimize reacher scheduling --- pkg/p2p/libp2p/internal/reacher/reacher.go | 4 +- .../libp2p/internal/reacher/reacher_test.go | 80 +++++++++++++++---- 2 files changed, 69 insertions(+), 15 deletions(-) diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index c4007805509..230c3e42d4b 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -212,7 +212,9 @@ func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) { key := overlay.ByteString() if existing, ok := r.peerIndex[key]; ok { - existing.addr = addr // Update address for reconnecting peer + existing.addr = addr // Update address for reconnecting peer + existing.retryAfter = time.Time{} // Reset to trigger immediate re-ping + heap.Fix(&r.peerHeap, existing.index) } else { p := &peer{overlay: overlay, addr: addr} r.peerIndex[key] = p diff --git a/pkg/p2p/libp2p/internal/reacher/reacher_test.go b/pkg/p2p/libp2p/internal/reacher/reacher_test.go index 29e6aa293f9..e86442ed402 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher_test.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher_test.go @@ -132,20 +132,26 @@ func TestAddressUpdateOnReconnect(t *testing.T) { t.Parallel() synctest.Test(t, func(t *testing.T) { - done := make(chan struct{}) + // Use 1 worker and a known retry duration to make timing deterministic. + options := reacher.Options{ + PingTimeout: time.Second * 5, + Workers: 1, + RetryAfterDuration: time.Minute, + } overlay := swarm.RandAddress(t) oldAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") newAddr, _ := ma.NewMultiaddr("/ip4/192.168.1.1/tcp/7072/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb") + var pingsMu sync.Mutex + var pings []ma.Multiaddr + pinged := make(chan struct{}, 8) + pingFunc := func(_ context.Context, a ma.Multiaddr) (time.Duration, error) { - // Verify that the new address is being pinged, not the old one - if a.Equal(oldAddr) { - t.Fatalf("ping should use updated address, got old address") - } - if a.Equal(newAddr) { - done <- struct{}{} - } + pingsMu.Lock() + pings = append(pings, a) + pingsMu.Unlock() + pinged <- struct{}{} return 0, nil } @@ -153,19 +159,65 @@ func TestAddressUpdateOnReconnect(t *testing.T) { mock := newMock(pingFunc, reachableFunc) - r := reacher.New(mock, mock, &defaultOptions, log.Noop) + r := reacher.New(mock, mock, &options, log.Noop) testutil.CleanupCloser(t, r) - // First connection with old address + // First connection with old address – triggers initial ping. r.Connected(overlay, oldAddr) - // Immediate reconnection with new address (simulates peer reconnecting with different IP) + + select { + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for initial ping") + case <-pinged: + } + + // Verify old address was pinged first. + pingsMu.Lock() + if len(pings) != 1 { + t.Fatalf("expected 1 ping after initial connect, got %d", len(pings)) + } + if !pings[0].Equal(oldAddr) { + t.Fatalf("first ping should use old address, got %s", pings[0]) + } + pingsMu.Unlock() + + // Reconnect with a new address — should trigger immediate re-ping. r.Connected(overlay, newAddr) select { - case <-time.After(time.Second * 5): - t.Fatalf("test timed out") - case <-done: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for reconnect ping") + case <-pinged: + } + + // Verify the reconnect pinged the new address. + pingsMu.Lock() + if len(pings) != 2 { + t.Fatalf("expected 2 pings after reconnect, got %d", len(pings)) + } + if !pings[1].Equal(newAddr) { + t.Fatalf("reconnect ping should use new address, got %s", pings[1]) + } + pingsMu.Unlock() + + // Advance time past the retry duration — should trigger a scheduled re-ping. + time.Sleep(options.RetryAfterDuration + time.Second) + + select { + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for scheduled re-ping") + case <-pinged: + } + + // Verify the scheduled re-ping used the new address. + pingsMu.Lock() + if len(pings) != 3 { + t.Fatalf("expected 3 pings after retry duration, got %d", len(pings)) + } + if !pings[2].Equal(newAddr) { + t.Fatalf("scheduled re-ping should use new address, got %s", pings[2]) } + pingsMu.Unlock() }) } From 8f309fabe626bf37add48ea1522501b55771ecde Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 9 Feb 2026 14:21:14 +0100 Subject: [PATCH 3/6] fix(p2p): update peer address on reconnect and optimize reacher --- pkg/p2p/libp2p/internal/reacher/reacher.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index 230c3e42d4b..5e915a7c121 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -112,7 +112,7 @@ func (r *reacher) manage() { defer r.wg.Done() - c := make(chan *peer) + c := make(chan peer) defer close(c) ctx, cancel := context.WithCancel(context.Background()) @@ -125,7 +125,7 @@ func (r *reacher) manage() { for { - p, tryAfter := r.tryAcquirePeer() + p, ok, tryAfter := r.tryAcquirePeer() // if no peer is returned, // wait until either more work or the closest retry-after time. @@ -143,7 +143,7 @@ func (r *reacher) manage() { } // wait for work - if p == nil { + if !ok { select { case <-r.quit: return @@ -161,7 +161,7 @@ func (r *reacher) manage() { } } -func (r *reacher) ping(c chan *peer, ctx context.Context) { +func (r *reacher) ping(c chan peer, ctx context.Context) { defer r.wg.Done() for p := range c { func() { @@ -179,12 +179,12 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { } } -func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { +func (r *reacher) tryAcquirePeer() (peer, bool, time.Duration) { r.mu.Lock() defer r.mu.Unlock() if len(r.peerHeap) == 0 { - return nil, 0 + return peer{}, false, 0 } now := time.Now() @@ -194,14 +194,15 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { // If retryAfter has not expired, return time to wait if now.Before(p.retryAfter) { - return nil, time.Until(p.retryAfter) + return peer{}, false, time.Until(p.retryAfter) } // Update retryAfter and fix heap position p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) heap.Fix(&r.peerHeap, p.index) - return p, 0 + // Return a copy so callers can read fields without holding the lock. + return *p, true, 0 } // Connected adds a new peer to the queue for testing reachability. From 64e0bc4c00a3ea3b22fef03889ebaa7be5d6ce45 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 13 Feb 2026 17:47:12 +0100 Subject: [PATCH 4/6] refactor(p2p): move peer heap to separate file in reacher package --- pkg/p2p/libp2p/internal/reacher/heap.go | 29 ++++++++++++++++++++++ pkg/p2p/libp2p/internal/reacher/reacher.go | 28 --------------------- 2 files changed, 29 insertions(+), 28 deletions(-) create mode 100644 pkg/p2p/libp2p/internal/reacher/heap.go diff --git a/pkg/p2p/libp2p/internal/reacher/heap.go b/pkg/p2p/libp2p/internal/reacher/heap.go new file mode 100644 index 00000000000..9cb6ef2a9a5 --- /dev/null +++ b/pkg/p2p/libp2p/internal/reacher/heap.go @@ -0,0 +1,29 @@ +package reacher + +// peerHeap is a min-heap of peers ordered by retryAfter time. +type peerHeap []*peer + +func (h peerHeap) Len() int { return len(h) } +func (h peerHeap) Less(i, j int) bool { return h[i].retryAfter.Before(h[j].retryAfter) } +func (h peerHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *peerHeap) Push(x any) { + n := len(*h) + p := x.(*peer) + p.index = n + *h = append(*h, p) +} + +func (h *peerHeap) Pop() any { + old := *h + n := len(old) + p := old[n-1] + old[n-1] = nil // avoid memory leak + p.index = -1 // for safety + *h = old[0 : n-1] + return p +} diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index 5e915a7c121..d51a7aef377 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -31,34 +31,6 @@ type peer struct { index int // index in the heap } -// peerHeap is a min-heap of peers ordered by retryAfter time. -type peerHeap []*peer - -func (h peerHeap) Len() int { return len(h) } -func (h peerHeap) Less(i, j int) bool { return h[i].retryAfter.Before(h[j].retryAfter) } -func (h peerHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] - h[i].index = i - h[j].index = j -} - -func (h *peerHeap) Push(x any) { - n := len(*h) - p := x.(*peer) - p.index = n - *h = append(*h, p) -} - -func (h *peerHeap) Pop() any { - old := *h - n := len(old) - p := old[n-1] - old[n-1] = nil // avoid memory leak - p.index = -1 // for safety - *h = old[0 : n-1] - return p -} - type reacher struct { mu sync.Mutex peerHeap peerHeap // min-heap ordered by retryAfter From 73e102a62718dcf7ae461f6b057436d8ca58d13e Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 13 Feb 2026 17:56:10 +0100 Subject: [PATCH 5/6] fix(p2p): add header to heap.go in reacher package --- pkg/p2p/libp2p/internal/reacher/heap.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/p2p/libp2p/internal/reacher/heap.go b/pkg/p2p/libp2p/internal/reacher/heap.go index 9cb6ef2a9a5..60727c84933 100644 --- a/pkg/p2p/libp2p/internal/reacher/heap.go +++ b/pkg/p2p/libp2p/internal/reacher/heap.go @@ -1,3 +1,9 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package reacher runs a background worker that will ping peers +// from an internal queue and report back the reachability to the notifier. package reacher // peerHeap is a min-heap of peers ordered by retryAfter time. From 59da0bfd17b1816ecf11da69e79f3ce8a398507c Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 13 Feb 2026 17:56:41 +0100 Subject: [PATCH 6/6] fix(p2p): add header to heap.go in reacher package --- pkg/p2p/libp2p/internal/reacher/heap.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/p2p/libp2p/internal/reacher/heap.go b/pkg/p2p/libp2p/internal/reacher/heap.go index 60727c84933..10277b3df47 100644 --- a/pkg/p2p/libp2p/internal/reacher/heap.go +++ b/pkg/p2p/libp2p/internal/reacher/heap.go @@ -2,8 +2,6 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Package reacher runs a background worker that will ping peers -// from an internal queue and report back the reachability to the notifier. package reacher // peerHeap is a min-heap of peers ordered by retryAfter time.