From 98b5f1267530ed2eba9c9f538054528464bd11cc Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 19 Aug 2025 14:41:28 +0200 Subject: [PATCH 01/23] feat: uniqueness depth and pull syncing --- pkg/puller/puller.go | 77 ++++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 27 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 2fdbc24e9cf..b893fdd2515 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -40,8 +40,6 @@ const ( recalcPeersDur = time.Minute * 5 maxChunksPerSecond = 1000 // roughly 4 MB/s - - maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing. ) type Options struct { @@ -157,10 +155,14 @@ func (p *Puller) manage(ctx context.Context) { // peersDisconnected is used to mark and prune peers that are no longer connected. peersDisconnected := maps.Clone(p.syncPeers) + var neighbors []swarm.Address _ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { if _, ok := p.syncPeers[addr.ByteString()]; !ok { p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po) } + if po >= newRadius { + neighbors = append(neighbors, addr) + } delete(peersDisconnected, addr.ByteString()) return false, false, nil }, topology.Select{}) @@ -169,6 +171,28 @@ func (p *Puller) manage(ctx context.Context) { p.disconnectPeer(peer.address) } + // minUd := uint8(255) + for i, target := range neighbors { + maxUd := uint8(0) + + // find the uniqueness depth, within which they are the only peer in the set + for j, neighbor := range neighbors { + if i == j { + continue + } + + ud := swarm.Proximity(target.Bytes(), neighbor.Bytes()) + 1 + if ud > maxUd { + maxUd = ud + } + } + p.syncPeers[target.ByteString()].ud = int8(maxUd) + + // if maxUd < minUd { + // minUd = maxUd + // } + } + p.recalcPeers(ctx, newRadius) } @@ -216,14 +240,23 @@ func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) { p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err) } }(peer) + // TODO: sync bins above or equal to storageRadius not covered by syncpeer } wg.Wait() } func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error { + if peer.ud == -1 { // not neighbor + peer.stop() + return nil + } peer.mtx.Lock() defer peer.mtx.Unlock() + // If the peer's epoch has changed (indicating a reserve reset or storage change on the peer): + // - Cancel all ongoing bin syncs for this peer. + // - Reset all previously synced intervals for this peer (to force a fresh sync). + // This guarantees that sync state is consistent with the peer's current reserve, and avoids pulling stale or irrelevant data. if peer.cursors == nil { cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address) if err != nil { @@ -257,6 +290,11 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin return errCursorsLength } + // sync PO bin only + if !peer.isBinSyncing(peer.po) { + p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po]) + } + /* The syncing behavior diverges for peers outside and within the storage radius. For neighbor peers, we sync ALL bins greater than or equal to the storage radius. @@ -265,33 +303,16 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin no syncing is done. 
*/ - if peer.po >= storageRadius { - - // cancel all bins lower than the storage radius - for bin := uint8(0); bin < storageRadius; bin++ { - peer.cancelBin(bin) - } - - // sync all bins >= storage radius - for bin, cur := range peer.cursors { - if bin >= int(storageRadius) && !peer.isBinSyncing(uint8(bin)) { - p.syncPeerBin(ctx, peer, uint8(bin), cur) - } - } + // cancel all bins lower than the storage radius + for bin := uint8(0); bin < storageRadius; bin++ { + peer.cancelBin(bin) + } - } else if storageRadius-peer.po <= maxPODelta { - // cancel all non-po bins, if any - for bin := uint8(0); bin < p.bins; bin++ { - if bin != peer.po { - peer.cancelBin(bin) - } + // sync all bins >= uniqueness depth or peer PO equals to bin + for bin, cur := range peer.cursors { + if (bin >= int(peer.ud) || bin == int(peer.po)) && !peer.isBinSyncing(uint8(bin)) { + p.syncPeerBin(ctx, peer, uint8(bin), cur) } - // sync PO bin only - if !peer.isBinSyncing(peer.po) { - p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po]) - } - } else { - peer.stop() } return nil @@ -540,6 +561,7 @@ type syncPeer struct { address swarm.Address binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. index is bin po uint8 + ud int8 // uniqueness depth (-1 if not neighbor) cursors []uint64 mtx sync.Mutex @@ -551,6 +573,7 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer { address: addr, binCancelFuncs: make(map[uint8]func(), bins), po: po, + ud: -1, // calculated later when all neighbors are collected } } From 792b4241bdb1b6b9eff6df5e757f453f7db5a90c Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 19 Aug 2025 18:16:21 +0200 Subject: [PATCH 02/23] fix: potential race issue because of calling stop --- pkg/puller/puller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index b893fdd2515..9b8c9b4965f 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -246,13 +246,13 @@ func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) { } func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error { + peer.mtx.Lock() + defer peer.mtx.Unlock() + if peer.ud == -1 { // not neighbor peer.stop() return nil } - peer.mtx.Lock() - defer peer.mtx.Unlock() - // If the peer's epoch has changed (indicating a reserve reset or storage change on the peer): // - Cancel all ongoing bin syncs for this peer. // - Reset all previously synced intervals for this peer (to force a fresh sync). 
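The loop above is the core of the first two patches: for every neighbor, the uniqueness depth (ud) is one more than the highest proximity it has to any other neighbor, i.e. the shallowest prefix length at which that peer is alone in the neighborhood. Below is a minimal, self-contained sketch of that calculation; proximity here is a simplified stand-in for swarm.Proximity, and uniquenessDepths is an illustrative helper, not code from the patch (later in the series the starting value is floored at the storage radius).

package main

import "fmt"

// proximity is a stand-in for swarm.Proximity: the number of leading bits
// that two equal-length addresses share.
func proximity(a, b []byte) uint8 {
	for i := range a {
		x := a[i] ^ b[i]
		if x == 0 {
			continue
		}
		po := uint8(i * 8)
		for x&0x80 == 0 {
			x <<= 1
			po++
		}
		return po
	}
	return uint8(len(a) * 8)
}

// uniquenessDepths mirrors the loop in manage(): for each neighbor it returns
// 1 + the highest proximity to any other neighbor, i.e. the shallowest prefix
// length at which the peer is the only one in the set.
func uniquenessDepths(neighbors [][]byte) []uint8 {
	uds := make([]uint8, len(neighbors))
	for i, target := range neighbors {
		var maxUd uint8 // patch 07 later starts this at the storage radius
		for j, other := range neighbors {
			if i == j {
				continue
			}
			if ud := proximity(target, other) + 1; ud > maxUd {
				maxUd = ud
			}
		}
		uds[i] = maxUd
	}
	return uds
}

func main() {
	neighbors := [][]byte{{0b11000000}, {0b11100000}, {0b00000000}}
	fmt.Println(uniquenessDepths(neighbors)) // [3 3 1]
}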
From dcb4797ab0392c48e6cbf2ebcd5b7689efb7e48f Mon Sep 17 00:00:00 2001 From: nugaon Date: Fri, 22 Aug 2025 18:46:45 +0200 Subject: [PATCH 03/23] fix: reset ud on top change --- pkg/puller/puller.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 9b8c9b4965f..2630e123a1c 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -157,8 +157,10 @@ func (p *Puller) manage(ctx context.Context) { var neighbors []swarm.Address _ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { - if _, ok := p.syncPeers[addr.ByteString()]; !ok { + if syncPeer, ok := p.syncPeers[addr.ByteString()]; !ok { p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po) + } else { + syncPeer.ud = -1 } if po >= newRadius { neighbors = append(neighbors, addr) From e8ab11286ad5ab8d0ccfbccc8531a38f3054e356 Mon Sep 17 00:00:00 2001 From: nugaon Date: Fri, 22 Aug 2025 18:47:15 +0200 Subject: [PATCH 04/23] test: should not sync if not neighbor --- pkg/puller/puller_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 29687138b9c..191b379dc3c 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -462,10 +462,7 @@ func TestRadiusIncrease(t *testing.T) { rs.SetStorageRadius(2) kad.Trigger() time.Sleep(100 * time.Millisecond) - if !p.IsBinSyncing(addr, 1) { - t.Fatalf("peer is not syncing but should") - } - if p.IsBinSyncing(addr, 2) { + if p.IsBinSyncing(addr, 1) || p.IsBinSyncing(addr, 2) || p.IsBinSyncing(addr, 3) { t.Fatalf("peer is syncing but shouldn't") } } From fce1ee0433a888c450186737e80c88feaa518060 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 26 Aug 2025 14:54:11 +0200 Subject: [PATCH 05/23] test: check whether peer is syncing for real --- pkg/puller/export_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/puller/export_test.go b/pkg/puller/export_test.go index b57ae396b3f..e980791618d 100644 --- a/pkg/puller/export_test.go +++ b/pkg/puller/export_test.go @@ -11,8 +11,16 @@ var PeerIntervalKey = peerIntervalKey func (p *Puller) IsSyncing(addr swarm.Address) bool { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock() - _, ok := p.syncPeers[addr.ByteString()] - return ok + peer, ok := p.syncPeers[addr.ByteString()] + if !ok { + return false + } + for bin := range p.bins { + if peer.isBinSyncing(bin) { + return true + } + } + return false } func (p *Puller) IsBinSyncing(addr swarm.Address, bin uint8) bool { From 3a57f37a74970186ecdf839e5b696c14d14ad29d Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 26 Aug 2025 14:55:22 +0200 Subject: [PATCH 06/23] test: puller adjustments --- pkg/puller/puller_test.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 191b379dc3c..c3cb4ab5149 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -71,7 +71,7 @@ func TestSyncOutsideDepth(t *testing.T) { } ) - _, _, kad, pullsync := newPuller(t, opts{ + p, _, kad, pullsync := newPuller(t, opts{ kad: []kadMock.Option{ kadMock.WithEachPeerRevCalls( kadMock.AddrTuple{Addr: addr, PO: 2}, @@ -86,11 +86,16 @@ func TestSyncOutsideDepth(t *testing.T) { time.Sleep(100 * time.Millisecond) kad.Trigger() + if !p.IsSyncing(addr) { + t.Fatalf("peer is not syncing but should") + } + if p.IsSyncing(addr2) { + t.Fatalf("peer is syncing but shouldn't") // 
because not neighbor + } + waitCursorsCalled(t, pullsync, addr) - waitCursorsCalled(t, pullsync, addr2) waitSyncCalledBins(t, pullsync, addr, 2, 3) - waitSyncCalledBins(t, pullsync, addr2, 0) } func TestSyncIntervals(t *testing.T) { @@ -212,7 +217,7 @@ func TestPeerDisconnected(t *testing.T) { p, _, kad, pullsync := newPuller(t, opts{ kad: []kadMock.Option{ kadMock.WithEachPeerRevCalls( - kadMock.AddrTuple{Addr: addr, PO: 1}, + kadMock.AddrTuple{Addr: addr, PO: 2}, ), }, pullSync: []mockps.Option{mockps.WithCursors(cursors, 0)}, @@ -222,10 +227,10 @@ func TestPeerDisconnected(t *testing.T) { time.Sleep(100 * time.Millisecond) kad.Trigger() - waitCursorsCalled(t, pullsync, addr) if !p.IsSyncing(addr) { t.Fatalf("peer is not syncing but should") } + waitCursorsCalled(t, pullsync, addr) kad.ResetPeers() kad.Trigger() time.Sleep(50 * time.Millisecond) @@ -298,10 +303,10 @@ func TestBinReset(t *testing.T) { cursors = []uint64{1000, 1000, 1000} ) - _, s, kad, pullsync := newPuller(t, opts{ + p, s, kad, pullsync := newPuller(t, opts{ kad: []kadMock.Option{ kadMock.WithEachPeerRevCalls( - kadMock.AddrTuple{Addr: addr, PO: 1}, + kadMock.AddrTuple{Addr: addr, PO: 2}, ), }, pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(mockps.SyncReply{Bin: 1, Start: 1, Topmost: 1, Peer: addr})}, @@ -313,6 +318,9 @@ func TestBinReset(t *testing.T) { kad.Trigger() + if !p.IsSyncing(addr) { + t.Fatalf("peer is not syncing but should") + } waitCursorsCalled(t, pullsync, addr) waitSync(t, pullsync, addr) @@ -463,7 +471,7 @@ func TestRadiusIncrease(t *testing.T) { kad.Trigger() time.Sleep(100 * time.Millisecond) if p.IsBinSyncing(addr, 1) || p.IsBinSyncing(addr, 2) || p.IsBinSyncing(addr, 3) { - t.Fatalf("peer is syncing but shouldn't") + t.Fatalf("peer is syncing but shouldn't because it is not a neighbor") } } @@ -535,8 +543,8 @@ func TestPeerGone(t *testing.T) { beforeCalls := pullsync.SyncCalls(addr) - if len(beforeCalls) != 1 { - t.Fatalf("unexpected amount of calls, got %d, want 1", len(beforeCalls)) + if len(beforeCalls) != 2 { // sync both bins because UD is 0 and that is the only peer + t.Fatalf("unexpected amount of calls, got %d, want 2", len(beforeCalls)) } kad.ResetPeers() From cc530ac9d4ae48e0fa51f2d7a6ea6728e1f0d3e2 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 26 Aug 2025 15:27:56 +0200 Subject: [PATCH 07/23] fix: ud at least radius --- pkg/puller/puller.go | 2 +- pkg/puller/puller_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 2630e123a1c..1042eefab69 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -175,7 +175,7 @@ func (p *Puller) manage(ctx context.Context) { // minUd := uint8(255) for i, target := range neighbors { - maxUd := uint8(0) + maxUd := uint8(newRadius) // do not want to sync below radius // find the uniqueness depth, within which they are the only peer in the set for j, neighbor := range neighbors { diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index c3cb4ab5149..38203e4371b 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -309,7 +309,7 @@ func TestBinReset(t *testing.T) { kadMock.AddrTuple{Addr: addr, PO: 2}, ), }, - pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(mockps.SyncReply{Bin: 1, Start: 1, Topmost: 1, Peer: addr})}, + pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(mockps.SyncReply{Bin: 2, Start: 1, Topmost: 1, Peer: addr})}, // bin must be at least 
radius to sync bins: 3, rs: resMock.NewReserve(resMock.WithRadius(2)), }) @@ -543,8 +543,8 @@ func TestPeerGone(t *testing.T) { beforeCalls := pullsync.SyncCalls(addr) - if len(beforeCalls) != 2 { // sync both bins because UD is 0 and that is the only peer - t.Fatalf("unexpected amount of calls, got %d, want 2", len(beforeCalls)) + if len(beforeCalls) != 1 { + t.Fatalf("unexpected amount of calls, got %d, want 1", len(beforeCalls)) } kad.ResetPeers() From 65e6c4deac11271e2bf35472c20a7ee4ed0f9743 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 26 Aug 2025 17:51:17 +0200 Subject: [PATCH 08/23] refactor: lint --- pkg/puller/puller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 1042eefab69..d2d76203c4f 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -175,7 +175,7 @@ func (p *Puller) manage(ctx context.Context) { // minUd := uint8(255) for i, target := range neighbors { - maxUd := uint8(newRadius) // do not want to sync below radius + maxUd := newRadius // do not want to sync below radius // find the uniqueness depth, within which they are the only peer in the set for j, neighbor := range neighbors { From d32cc322efcf3622032c6173752f3c16c3e37f02 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 26 Aug 2025 20:16:28 +0200 Subject: [PATCH 09/23] refactor: remove unnecessary po sync --- pkg/puller/puller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index d2d76203c4f..96f8dcd1bbd 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -310,9 +310,9 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin peer.cancelBin(bin) } - // sync all bins >= uniqueness depth or peer PO equals to bin + // sync all bins >= uniqueness depth for bin, cur := range peer.cursors { - if (bin >= int(peer.ud) || bin == int(peer.po)) && !peer.isBinSyncing(uint8(bin)) { + if bin >= int(peer.ud) && !peer.isBinSyncing(uint8(bin)) { p.syncPeerBin(ctx, peer, uint8(bin), cur) } } @@ -563,8 +563,8 @@ type syncPeer struct { address swarm.Address binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. 
index is bin po uint8 - ud int8 // uniqueness depth (-1 if not neighbor) - cursors []uint64 + ud int8 // uniqueness depth (-1 if not neighbor) + cursors []uint64 // index is bin, value is cursor mtx sync.Mutex wg sync.WaitGroup From f91d049c4a5839be04ce29341be530d890b55098 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 26 Aug 2025 20:29:42 +0200 Subject: [PATCH 10/23] test: not sync between ud and storagedepth --- pkg/puller/puller_test.go | 56 +++++++++++++++++++++++++- pkg/topology/kademlia/mock/kademlia.go | 12 ++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 38203e4371b..bb83667f1c7 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -90,7 +90,7 @@ func TestSyncOutsideDepth(t *testing.T) { t.Fatalf("peer is not syncing but should") } if p.IsSyncing(addr2) { - t.Fatalf("peer is syncing but shouldn't") // because not neighbor + t.Fatalf("peer is syncing but shouldn't because not neighbor") } waitCursorsCalled(t, pullsync, addr) @@ -98,6 +98,60 @@ func TestSyncOutsideDepth(t *testing.T) { waitSyncCalledBins(t, pullsync, addr, 2, 3) } +// test that addresses not cover full range of radius +func TestAddressesNotCoverRange(t *testing.T) { + t.Parallel() + + // calculate addresses where PO is greater than storage radius for all neighbors + storageRadius := uint8(0) + base := swarm.RandAddress(t) + firstBitDiffer := storageRadius + 1 // common prefix in neighbor addresses, their POs must be this number. + addr1Bytes := base.Clone().Bytes() + addr1Bytes[firstBitDiffer/8] ^= 1 << (7 - (firstBitDiffer % 8)) + addr1 := swarm.NewAddress(addr1Bytes) + udIndex := firstBitDiffer + 2 // their UDs will be this number + 1 + addr2Bytes := addr1.Clone().Bytes() + addr2Bytes[udIndex/8] ^= (1 << (7 - (udIndex % 8))) + addr2 := swarm.NewAddress(addr2Bytes) + + var ( + cursors = []uint64{0, 0, 0, 0, 0} + replies = []mockps.SyncReply{ + {Bin: storageRadius, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: storageRadius, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: storageRadius + 1, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: storageRadius + 1, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: storageRadius + 2, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: storageRadius + 2, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: storageRadius + 3, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: storageRadius + 3, Start: 1, Topmost: 1, Peer: addr2}, + } + revCalls = kadMock.MakeAddrTupleForRevCalls(base, addr1, addr2) + ) + + p, _, kad, _ := newPuller(t, opts{ + kad: []kadMock.Option{kadMock.WithEachPeerRevCalls( + revCalls..., + )}, + pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(replies...)}, + bins: 5, + rs: resMock.NewReserve(resMock.WithRadius(storageRadius)), + }) + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + + if !p.IsBinSyncing(addr1, udIndex+1) || !p.IsBinSyncing(addr2, udIndex+1) { + t.Fatalf("peers should sync bin = uniqueness depth") + } + if !p.IsBinSyncing(addr1, udIndex) || !p.IsBinSyncing(addr2, udIndex) { + t.Fatalf("peers should sync bin = uniqueness depth - 1 > storage radius") + } + if !p.IsBinSyncing(addr1, storageRadius) && !p.IsBinSyncing(addr2, storageRadius) { + t.Fatalf("no peer sync bin = storage radius") + } +} + func TestSyncIntervals(t *testing.T) { t.Parallel() diff --git a/pkg/topology/kademlia/mock/kademlia.go b/pkg/topology/kademlia/mock/kademlia.go index 81fa3fe69d6..e78f3c67c75 100644 --- a/pkg/topology/kademlia/mock/kademlia.go +++ 
b/pkg/topology/kademlia/mock/kademlia.go @@ -6,6 +6,7 @@ package mock import ( "context" + "sort" "sync" "time" @@ -19,6 +20,17 @@ type AddrTuple struct { PO uint8 // the po } +func MakeAddrTupleForRevCalls(base swarm.Address, addrs ...swarm.Address) []AddrTuple { + var tuples []AddrTuple + for _, addr := range addrs { + tuples = append(tuples, AddrTuple{Addr: addr, PO: swarm.Proximity(base.Bytes(), addr.Bytes())}) + } + sort.Slice(tuples, func(i, j int) bool { + return tuples[i].PO > tuples[j].PO + }) + return tuples +} + func WithEachPeerRevCalls(addrs ...AddrTuple) Option { return optionFunc(func(m *Mock) { m.eachPeerRev = append(m.eachPeerRev, addrs...) From 2e5275f2198d2b6e38c641f880ef293328ed133b Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 3 Sep 2025 18:57:33 +0200 Subject: [PATCH 11/23] feat: comptacted leaf binary tree --- pkg/puller/tree.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 pkg/puller/tree.go diff --git a/pkg/puller/tree.go b/pkg/puller/tree.go new file mode 100644 index 00000000000..166bc87d85b --- /dev/null +++ b/pkg/puller/tree.go @@ -0,0 +1,104 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package puller + +import ( + "bytes" + "errors" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +var errShouldNeverHappen = errors.New("should never happen in case of keys have the same length and method was called from root") + +// TreeNode is a leaf compacted binary tree +// representing the address space of the neighborhood +type TreeNode[T any] struct { + K []byte + V *T + L uint8 + C [2]*TreeNode[T] +} + +// Put will override the value if the key is already present +func (t *TreeNode[T]) Put(key []byte, p *T) *TreeNode[T] { + bitIndex, err := bitOfBytes(key, t.L) + if err != nil { + // cannot go further in the binary representation, override leaf + if t.V != nil && !bytes.Equal(t.K, key) { + panic(errShouldNeverHappen) + } + t.V = p + t.K = key + return t + } + + c := t.C[bitIndex] + if c != nil { + return c.Put(key, p) + } + + if t.C[1-bitIndex] == nil { + // both children are nil, we are on a leaf. + if t.V == nil || bytes.Equal(t.K, key) { + t.V = p + t.K = key + return t + } + + // create as many parent tree nodes as needed + po := swarm.Proximity(t.K, key) + parent := t + ci := bitIndex + for i := uint8(0); i < po-t.L; i++ { + parent.C[ci] = newTreeNode[T](nil, nil, parent.L+1) + parent = parent.C[ci] + ci, err = bitOfBytes(key, parent.L) + if err != nil { + panic(errShouldNeverHappen) + } + } + + // move the old leaf value to the new parent + parent.C[1-ci] = newTreeNode(t.K, t.V, parent.L+1) + t.V = nil + t.K = nil + + // insert p to the new parent + parent.C[ci] = newTreeNode(key, p, parent.L+1) + return parent.C[ci] + } + + // child slot is free on the node so peer can be inserted. + c = newTreeNode(key, p, t.L+1) + t.C[bitIndex] = c + return c +} + +// traverse assigns bins to each peer for syncing by traversing the tree +// func (t *TreeNode[T]) traverse() { +// // TODO +// } + +func newTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { + return &TreeNode[T]{ + K: key, + V: p, + L: level, + C: [2]*TreeNode[T]{}, + } +} + +// bitOfBytes extracts the bit at the specified index from a byte slice. +// Returns 0 or 1 based on the bit value at the given position. 
+func bitOfBytes(bytes []byte, bitIndex uint8) (uint8, error) { + if bitIndex >= uint8(len(bytes)*8) { + return 0, errors.New("bit index out of range") + } + byteIndex := bitIndex / 8 + bitPosition := 7 - (bitIndex % 8) // MSB first (big-endian) + + b := bytes[byteIndex] + return (b >> bitPosition) & 1, nil +} From 22fc8d99b3863fe255c2e63ca780a48ac58c66da Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 3 Sep 2025 19:00:21 +0200 Subject: [PATCH 12/23] feat: binSync init instead of ud calculations --- pkg/puller/puller.go | 84 ++++++++++++++++---------------------------- 1 file changed, 30 insertions(+), 54 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 96f8dcd1bbd..7ecb6ec9468 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -150,52 +150,37 @@ func (p *Puller) manage(ctx context.Context) { } p.logger.Debug("radius decrease", "old_radius", prevRadius, "new_radius", newRadius) } + // TODO check neighborhood or radius have changed or not prevRadius = newRadius // peersDisconnected is used to mark and prune peers that are no longer connected. peersDisconnected := maps.Clone(p.syncPeers) - var neighbors []swarm.Address + // make pullsync binary tree of neighbors by their address + bt := newTreeNode[syncPeer](nil, nil, newRadius) _ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { - if syncPeer, ok := p.syncPeers[addr.ByteString()]; !ok { - p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po) + syncPeer, ok := p.syncPeers[addr.ByteString()] + if !ok { + syncPeer = newSyncPeer(addr, p.bins, po) + p.syncPeers[addr.ByteString()] = syncPeer } else { - syncPeer.ud = -1 + syncPeer.syncBins = make([]bool, p.bins) } if po >= newRadius { - neighbors = append(neighbors, addr) + _ = bt.Put(addr.Bytes(), syncPeer) // all peers are unique and have the same key length, panic should not happen } delete(peersDisconnected, addr.ByteString()) return false, false, nil }, topology.Select{}) + // assign bins to each peer for syncing if peerset or radius changed + // bt.traverse() + for _, peer := range peersDisconnected { p.disconnectPeer(peer.address) } - // minUd := uint8(255) - for i, target := range neighbors { - maxUd := newRadius // do not want to sync below radius - - // find the uniqueness depth, within which they are the only peer in the set - for j, neighbor := range neighbors { - if i == j { - continue - } - - ud := swarm.Proximity(target.Bytes(), neighbor.Bytes()) + 1 - if ud > maxUd { - maxUd = ud - } - } - p.syncPeers[target.ByteString()].ud = int8(maxUd) - - // if maxUd < minUd { - // minUd = maxUd - // } - } - - p.recalcPeers(ctx, newRadius) + p.recalcPeers(ctx) } tick := time.NewTicker(recalcPeersDur) @@ -230,7 +215,7 @@ func (p *Puller) disconnectPeer(addr swarm.Address) { // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. // Must be called under lock. 
-func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
+func (p *Puller) recalcPeers(ctx context.Context) {
 	var wg sync.WaitGroup
 	for _, peer := range p.syncPeers {
 		wg.Add(1)
@@ -238,20 +223,19 @@ func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
 		go func(peer *syncPeer) {
 			defer p.wg.Done()
 			defer wg.Done()
-			if err := p.syncPeer(ctx, peer, storageRadius); err != nil {
+			if err := p.syncPeer(ctx, peer); err != nil {
 				p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
 			}
 		}(peer)
-		// TODO: sync bins above or equal to storageRadius not covered by syncpeer
 	}
 	wg.Wait()
 }

-func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
+func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer) error {
 	peer.mtx.Lock()
 	defer peer.mtx.Unlock()

-	if peer.ud == -1 { // not neighbor
+	if len(peer.syncBins) == 0 { // no bin is assigned for syncing
 		peer.stop()
 		return nil
 	}
@@ -292,28 +276,20 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
 		return errCursorsLength
 	}

-	// sync PO bin only
-	if !peer.isBinSyncing(peer.po) {
-		p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
-	}
-
 	/*
-		The syncing behavior diverges for peers outside and within the storage radius.
-		For neighbor peers, we sync ALL bins greater than or equal to the storage radius.
-		For peers with PO lower than the storage radius, we must sync ONLY the bin that is the PO.
-		For peers peer with PO lower than the storage radius and even lower than the allowed minimum threshold,
-		no syncing is done.
+		All chunks above the storage radius need to be synchronized only once.
+		For that, the puller must be aware of how the neighbors' bins together cover the chunks.
+		Based on that information, the puller assigns bins to each peer for syncing.
+		So at this point, all of a peer's bins need to be known.
 	*/

-	// cancel all bins lower than the storage radius
-	for bin := uint8(0); bin < storageRadius; bin++ {
-		peer.cancelBin(bin)
-	}
-
-	// sync all bins >= uniqueness depth
-	for bin, cur := range peer.cursors {
-		if bin >= int(peer.ud) && !peer.isBinSyncing(uint8(bin)) {
-			p.syncPeerBin(ctx, peer, uint8(bin), cur)
+	for bin, forSync := range peer.syncBins {
+		if forSync {
+			if !peer.isBinSyncing(uint8(bin)) {
+				p.syncPeerBin(ctx, peer, uint8(bin), peer.cursors[bin])
+			}
+		} else {
+			peer.cancelBin(uint8(bin))
 		}
 	}

@@ -563,7 +539,7 @@ type syncPeer struct {
 	address        swarm.Address
 	binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. index is bin
 	po             uint8
-	ud             int8 // uniqueness depth (-1 if not neighbor)
+	syncBins       []bool // index is bin, value is syncing or not.
will be set during folding in the puller neighborhood tree cursors []uint64 // index is bin, value is cursor mtx sync.Mutex @@ -575,7 +551,7 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer { address: addr, binCancelFuncs: make(map[uint8]func(), bins), po: po, - ud: -1, // calculated later when all neighbors are collected + syncBins: make([]bool, bins), } } From 5bbcd9a049943844c26e4bca02f0e6eb256203be Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 3 Sep 2025 19:00:59 +0200 Subject: [PATCH 13/23] test: binary tree --- pkg/puller/export_test.go | 5 ++ pkg/puller/tree_test.go | 128 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 pkg/puller/tree_test.go diff --git a/pkg/puller/export_test.go b/pkg/puller/export_test.go index e980791618d..258fc1964a9 100644 --- a/pkg/puller/export_test.go +++ b/pkg/puller/export_test.go @@ -8,6 +8,11 @@ import "github.com/ethersphere/bee/v2/pkg/swarm" var PeerIntervalKey = peerIntervalKey +// NewTreeNode is a wrapper for the generic newTreeNode function for testing +func NewTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { + return newTreeNode(key, p, level) +} + func (p *Puller) IsSyncing(addr swarm.Address) bool { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock() diff --git a/pkg/puller/tree_test.go b/pkg/puller/tree_test.go new file mode 100644 index 00000000000..c472c43bd0d --- /dev/null +++ b/pkg/puller/tree_test.go @@ -0,0 +1,128 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package puller_test + +import ( + "bytes" + "testing" + + "github.com/ethersphere/bee/v2/pkg/puller" +) + +func TestTreeNodePut(t *testing.T) { + tree := puller.NewTreeNode[int](nil, nil, 0) + keys := [][]byte{ + {0b11000000}, + {0b11100000}, + {0b00000000}, + {0b00000001}, + {0b00010001}, + } + for i, k := range keys { + _ = tree.Put(k, &i) + } + + noKeyAndValue := func(c *puller.TreeNode[int]) { + if c.V != nil || c.K != nil { + t.Fatalf("expected no key and value") + } + } + onlyRightChild := func(c *puller.TreeNode[int]) { + if c.C[0] != nil || c.C[1] == nil { + t.Fatalf("expected to have a right child only") + } + } + onlyLeftChild := func(c *puller.TreeNode[int]) { + if c.C[0] == nil || c.C[1] != nil { + t.Fatalf("expected to have a left child only") + } + } + noChildren := func(c *puller.TreeNode[int]) { + if c.C[0] != nil || c.C[1] != nil { + t.Fatalf("expected no children") + } + } + bothChildren := func(c *puller.TreeNode[int]) { + if c.C[0] == nil || c.C[1] == nil { + t.Fatalf("expected both left and right branches to exist") + } + } + keyEqualsTo := func(c *puller.TreeNode[int], k []byte) { + if !bytes.Equal(c.K, k) { + t.Fatalf("expected key to be %b, got %b", k, c.K) + } + } + valueEqualsTo := func(c *puller.TreeNode[int], v int) { + if c.V == nil || *c.V != v { + t.Fatalf("expected value to be %b, got %b", v, c.V) + } + } + lengthEqualsTo := func(c *puller.TreeNode[int], l uint8) { + if c.L != l { + t.Fatalf("expected length to be %d, got %d", l, c.L) + } + } + + cursor := tree + // Simple verification that values were stored + noKeyAndValue(cursor) + bothChildren(cursor) + //check right side of the tree + cursor = tree.C[1] // prefix: 1 + noKeyAndValue(cursor) + onlyRightChild(cursor) + lengthEqualsTo(cursor, 1) + cursor = cursor.C[1] // prefix 11 + noKeyAndValue(cursor) + bothChildren(cursor) + cursor0 := cursor.C[0] + noChildren(cursor0) + keyEqualsTo(cursor0, 
[]byte{0b11000000}) + valueEqualsTo(cursor0, 0) + lengthEqualsTo(cursor0, 3) + cursor1 := cursor.C[1] + noChildren(cursor1) + keyEqualsTo(cursor1, []byte{0b11100000}) + valueEqualsTo(cursor1, 1) + lengthEqualsTo(cursor1, 3) + + //check right left of the tree + cursor = tree.C[0] // prefix: 0 + noKeyAndValue(cursor) + onlyLeftChild(cursor) + lengthEqualsTo(cursor, 1) + cursor = cursor.C[0] // prefix: 00 + noKeyAndValue(cursor) + onlyLeftChild(cursor) + lengthEqualsTo(cursor, 2) + cursor = cursor.C[0] // prefix: 000 + noKeyAndValue(cursor) + bothChildren(cursor) + lengthEqualsTo(cursor, 3) + cursor1 = cursor.C[1] // prefix: 0001 + noChildren(cursor1) + keyEqualsTo(cursor1, []byte{0b00010001}) + valueEqualsTo(cursor1, 4) + lengthEqualsTo(cursor1, 4) + cursor = cursor.C[0] // prefix: 0000 + noKeyAndValue(cursor) + onlyLeftChild(cursor) + cursor = cursor.C[0] // prefix: 00000 + noKeyAndValue(cursor) + onlyLeftChild(cursor) + cursor = cursor.C[0] // prefix: 000000 + noKeyAndValue(cursor) + onlyLeftChild(cursor) + cursor = cursor.C[0] // prefix: 0000000 + noKeyAndValue(cursor) + bothChildren(cursor) + cursor0 = cursor.C[0] + noChildren(cursor0) + keyEqualsTo(cursor0, []byte{0b00000000}) + valueEqualsTo(cursor0, 2) + cursor1 = cursor.C[1] + noChildren(cursor1) + keyEqualsTo(cursor1, []byte{0b00000001}) + valueEqualsTo(cursor1, 3) +} From 2f47a58a3b814046f1e3bd5d098fbaec95be8491 Mon Sep 17 00:00:00 2001 From: nugaon Date: Sun, 7 Sep 2025 14:46:33 +0200 Subject: [PATCH 14/23] feat: peerTreeNode --- pkg/puller/tree.go | 83 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 78 insertions(+), 5 deletions(-) diff --git a/pkg/puller/tree.go b/pkg/puller/tree.go index 166bc87d85b..c685e5d5529 100644 --- a/pkg/puller/tree.go +++ b/pkg/puller/tree.go @@ -10,7 +10,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/swarm" ) -var errShouldNeverHappen = errors.New("should never happen in case of keys have the same length and method was called from root") +var errShouldNeverHappen = errors.New("should never happen in case of keys have the same length, bits of those are matching until the starting level and method was called from root") // TreeNode is a leaf compacted binary tree // representing the address space of the neighborhood @@ -76,10 +76,9 @@ func (t *TreeNode[T]) Put(key []byte, p *T) *TreeNode[T] { return c } -// traverse assigns bins to each peer for syncing by traversing the tree -// func (t *TreeNode[T]) traverse() { -// // TODO -// } +func (p TreeNode[T]) isLeaf() bool { + return p.V != nil +} func newTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { return &TreeNode[T]{ @@ -102,3 +101,77 @@ func bitOfBytes(bytes []byte, bitIndex uint8) (uint8, error) { b := bytes[byteIndex] return (b >> bitPosition) & 1, nil } + +func newPeerTreeNode(key []byte, p *peerTreeNodeValue, level uint8) *peerTreeNode { + return &peerTreeNode{ + TreeNode: &TreeNode[peerTreeNodeValue]{ + K: key, + V: p, + L: level, + C: [2]*TreeNode[peerTreeNodeValue]{}, + }, + } +} + +// All properties of a peer that are needed for bin assignment +type peerTreeNodeValue struct { + SyncBins []bool +} + +// peerTreeNode is a specialized TreeNode for managing sync peers +type peerTreeNode struct { + *TreeNode[peerTreeNodeValue] +} + +// BinAssignment assigns bins to each peer for syncing by traversing the tree +func (t *peerTreeNode) BinAssignment() (peers []*peerTreeNodeValue) { + if t.isLeaf() { + bl := uint8(len(t.V.SyncBins)) + for i := t.L; i < bl; i++ { + t.V.SyncBins[i] = true + } + + return 
[]*peerTreeNodeValue{t.V} + } + // handle compactible nodes and nodes with both children + for i := 1; i >= 0; i-- { + if t.C[1-i] != nil { + l := (&peerTreeNode{t.C[1-i]}).BinAssignment() + peers = append(peers, l...) + if t.C[i] == nil { + // choose one of the leaves to add level bin + p := selectSyncPeer(peers) + p.SyncBins[t.L] = true + } + } + } + return peers +} + +// selectSyncPeer selects a peer from many to sync the bin +// that should include the same chunks among the peers +// current strategy: it returns the peer with the least number of bins assigned +// assumes peers array has one element at least +func selectSyncPeer(peers []*peerTreeNodeValue) *peerTreeNodeValue { + minPeer := peers[0] + minCount := countTrue(minPeer.SyncBins) + for i := 1; i < len(peers); i++ { + count := countTrue(peers[i].SyncBins) + if count < minCount { + minPeer = peers[i] + minCount = count + } + } + return minPeer +} + +// countTrue returns the number of true values in a boolean slice. +func countTrue(arr []bool) int { + count := 0 + for _, v := range arr { + if v { + count++ + } + } + return count +} From 3f59c8e54d453f07d01da10684f123d071a3b41a Mon Sep 17 00:00:00 2001 From: nugaon Date: Sun, 7 Sep 2025 14:47:26 +0200 Subject: [PATCH 15/23] test: bin assignments and refactor --- pkg/puller/export_test.go | 9 +- pkg/puller/tree_test.go | 333 +++++++++++++++++++++++++++++--------- 2 files changed, 261 insertions(+), 81 deletions(-) diff --git a/pkg/puller/export_test.go b/pkg/puller/export_test.go index 258fc1964a9..603d897d56f 100644 --- a/pkg/puller/export_test.go +++ b/pkg/puller/export_test.go @@ -6,7 +6,14 @@ package puller import "github.com/ethersphere/bee/v2/pkg/swarm" -var PeerIntervalKey = peerIntervalKey +type ( + PeerTreeNodeValue = peerTreeNodeValue +) + +var ( + PeerIntervalKey = peerIntervalKey + NewPeerTreeNode = newPeerTreeNode +) // NewTreeNode is a wrapper for the generic newTreeNode function for testing func NewTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { diff --git a/pkg/puller/tree_test.go b/pkg/puller/tree_test.go index c472c43bd0d..77ac80adbbc 100644 --- a/pkg/puller/tree_test.go +++ b/pkg/puller/tree_test.go @@ -11,118 +11,291 @@ import ( ) func TestTreeNodePut(t *testing.T) { - tree := puller.NewTreeNode[int](nil, nil, 0) - keys := [][]byte{ - {0b11000000}, - {0b11100000}, - {0b00000000}, - {0b00000001}, - {0b00010001}, - } - for i, k := range keys { - _ = tree.Put(k, &i) - } + t.Parallel() - noKeyAndValue := func(c *puller.TreeNode[int]) { + noKeyAndValue := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() if c.V != nil || c.K != nil { t.Fatalf("expected no key and value") } } - onlyRightChild := func(c *puller.TreeNode[int]) { + + onlyRightChild := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() if c.C[0] != nil || c.C[1] == nil { t.Fatalf("expected to have a right child only") } } - onlyLeftChild := func(c *puller.TreeNode[int]) { + + onlyLeftChild := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() if c.C[0] == nil || c.C[1] != nil { t.Fatalf("expected to have a left child only") } } - noChildren := func(c *puller.TreeNode[int]) { + + noChildren := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() if c.C[0] != nil || c.C[1] != nil { t.Fatalf("expected no children") } } - bothChildren := func(c *puller.TreeNode[int]) { + + bothChildren := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() if c.C[0] == nil || c.C[1] == nil { t.Fatalf("expected both left and right branches to exist") } } - keyEqualsTo := 
func(c *puller.TreeNode[int], k []byte) { + + keyEqualsTo := func(t *testing.T, c *puller.TreeNode[int], k []byte) { + t.Helper() if !bytes.Equal(c.K, k) { t.Fatalf("expected key to be %b, got %b", k, c.K) } } - valueEqualsTo := func(c *puller.TreeNode[int], v int) { + + valueEqualsTo := func(t *testing.T, c *puller.TreeNode[int], v int) { + t.Helper() if c.V == nil || *c.V != v { t.Fatalf("expected value to be %b, got %b", v, c.V) } } - lengthEqualsTo := func(c *puller.TreeNode[int], l uint8) { + + lengthEqualsTo := func(t *testing.T, c *puller.TreeNode[int], l uint8) { + t.Helper() if c.L != l { t.Fatalf("expected length to be %d, got %d", l, c.L) } } - cursor := tree - // Simple verification that values were stored - noKeyAndValue(cursor) - bothChildren(cursor) - //check right side of the tree - cursor = tree.C[1] // prefix: 1 - noKeyAndValue(cursor) - onlyRightChild(cursor) - lengthEqualsTo(cursor, 1) - cursor = cursor.C[1] // prefix 11 - noKeyAndValue(cursor) - bothChildren(cursor) - cursor0 := cursor.C[0] - noChildren(cursor0) - keyEqualsTo(cursor0, []byte{0b11000000}) - valueEqualsTo(cursor0, 0) - lengthEqualsTo(cursor0, 3) - cursor1 := cursor.C[1] - noChildren(cursor1) - keyEqualsTo(cursor1, []byte{0b11100000}) - valueEqualsTo(cursor1, 1) - lengthEqualsTo(cursor1, 3) - - //check right left of the tree - cursor = tree.C[0] // prefix: 0 - noKeyAndValue(cursor) - onlyLeftChild(cursor) - lengthEqualsTo(cursor, 1) - cursor = cursor.C[0] // prefix: 00 - noKeyAndValue(cursor) - onlyLeftChild(cursor) - lengthEqualsTo(cursor, 2) - cursor = cursor.C[0] // prefix: 000 - noKeyAndValue(cursor) - bothChildren(cursor) - lengthEqualsTo(cursor, 3) - cursor1 = cursor.C[1] // prefix: 0001 - noChildren(cursor1) - keyEqualsTo(cursor1, []byte{0b00010001}) - valueEqualsTo(cursor1, 4) - lengthEqualsTo(cursor1, 4) - cursor = cursor.C[0] // prefix: 0000 - noKeyAndValue(cursor) - onlyLeftChild(cursor) - cursor = cursor.C[0] // prefix: 00000 - noKeyAndValue(cursor) - onlyLeftChild(cursor) - cursor = cursor.C[0] // prefix: 000000 - noKeyAndValue(cursor) - onlyLeftChild(cursor) - cursor = cursor.C[0] // prefix: 0000000 - noKeyAndValue(cursor) - bothChildren(cursor) - cursor0 = cursor.C[0] - noChildren(cursor0) - keyEqualsTo(cursor0, []byte{0b00000000}) - valueEqualsTo(cursor0, 2) - cursor1 = cursor.C[1] - noChildren(cursor1) - keyEqualsTo(cursor1, []byte{0b00000001}) - valueEqualsTo(cursor1, 3) + t.Run("TestWithEdgeCases", func(t *testing.T) { + t.Parallel() + + tree := puller.NewTreeNode[int](nil, nil, 0) + keys := [][]byte{ + {0b11000000}, + {0b11100000}, + {0b00000000}, + {0b00000001}, + {0b00010001}, + } + for i, k := range keys { + _ = tree.Put(k, &i) + } + + cursor := tree + // Simple verification that values were stored + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + //check right side of the tree + cursor = tree.C[1] // prefix: 1 + noKeyAndValue(t, cursor) + onlyRightChild(t, cursor) + lengthEqualsTo(t, cursor, 1) + cursor = cursor.C[1] // prefix 11 + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + cursor0 := cursor.C[0] + noChildren(t, cursor0) + keyEqualsTo(t, cursor0, []byte{0b11000000}) + valueEqualsTo(t, cursor0, 0) + lengthEqualsTo(t, cursor0, 3) + cursor1 := cursor.C[1] + noChildren(t, cursor1) + keyEqualsTo(t, cursor1, []byte{0b11100000}) + valueEqualsTo(t, cursor1, 1) + lengthEqualsTo(t, cursor1, 3) + + //check right left of the tree + cursor = tree.C[0] // prefix: 0 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + lengthEqualsTo(t, cursor, 1) + cursor = cursor.C[0] // 
prefix: 00 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + lengthEqualsTo(t, cursor, 2) + cursor = cursor.C[0] // prefix: 000 + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + lengthEqualsTo(t, cursor, 3) + cursor1 = cursor.C[1] // prefix: 0001 + noChildren(t, cursor1) + keyEqualsTo(t, cursor1, []byte{0b00010001}) + valueEqualsTo(t, cursor1, 4) + lengthEqualsTo(t, cursor1, 4) + cursor = cursor.C[0] // prefix: 0000 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + cursor = cursor.C[0] // prefix: 00000 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + cursor = cursor.C[0] // prefix: 000000 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + cursor = cursor.C[0] // prefix: 0000000 + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + cursor0 = cursor.C[0] + noChildren(t, cursor0) + keyEqualsTo(t, cursor0, []byte{0b00000000}) + valueEqualsTo(t, cursor0, 2) + cursor1 = cursor.C[1] + noChildren(t, cursor1) + keyEqualsTo(t, cursor1, []byte{0b00000001}) + valueEqualsTo(t, cursor1, 3) + }) + + t.Run("TestWithStartingLevel", func(t *testing.T) { + tree := puller.NewTreeNode[int](nil, nil, 2) + keys := [][]byte{ + {0b00000000}, + {0b00100000}, + } + for i, k := range keys { + _ = tree.Put(k, &i) + } + cursor := tree + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + lengthEqualsTo(t, cursor, 2) + cursor0 := cursor.C[0] + keyEqualsTo(t, cursor0, keys[0]) + valueEqualsTo(t, cursor0, 0) + noChildren(t, cursor0) + lengthEqualsTo(t, cursor0, 3) + cursor1 := cursor.C[1] + keyEqualsTo(t, cursor1, keys[1]) + valueEqualsTo(t, cursor1, 1) + noChildren(t, cursor1) + lengthEqualsTo(t, cursor1, 3) + }) +} + +func TestBinAssignment(t *testing.T) { + t.Parallel() + + allTrue := func(t *testing.T, from uint8, arr []bool) { + t.Helper() + for i := from; i < uint8(len(arr)); i++ { + if !arr[i] { + t.Fatalf("expected true at index %d", i) + } + } + } + + allFalse := func(t *testing.T, index uint8, arrs ...[]bool) { + t.Helper() + for i, arr := range arrs { + if arr[index] { + t.Fatalf("expected false at index %d for array %d", index, i) + } + } + } + + t.Run("TestEmpty", func(t *testing.T) { + t.Parallel() + + tree := puller.NewPeerTreeNode(nil, nil, 0) + neighbors := tree.BinAssignment() + if len(neighbors) != 0 { + t.Errorf("expected no neighbors, got %d", len(neighbors)) + } + }) + + t.Run("TestOnePeer", func(t *testing.T) { + t.Parallel() + + tree := puller.NewPeerTreeNode(nil, nil, 0) + syncBins := make([]bool, 4) + tree.Put([]byte{0b00000000}, &puller.PeerTreeNodeValue{SyncBins: syncBins}) + neighbors := tree.BinAssignment() + if len(neighbors) != 1 { + t.Errorf("expected one peer, got %d", len(neighbors)) + } + allTrue(t, 0, neighbors[0].SyncBins) + }) + + t.Run("TestWithStartingLevel", func(t *testing.T) { + t.Parallel() + + tree := puller.NewPeerTreeNode(nil, nil, 2) + syncBins := make([]bool, 6) + tree.Put([]byte{0b00000000}, &puller.PeerTreeNodeValue{SyncBins: syncBins}) + neighbors := tree.BinAssignment() + if len(neighbors) != 1 { + t.Errorf("expected one peer, got %d", len(neighbors)) + } + allTrue(t, 2, neighbors[0].SyncBins) + allFalse(t, 1, neighbors[0].SyncBins) + allFalse(t, 0, neighbors[0].SyncBins) + }) + + t.Run("TestMultiplePeers", func(t *testing.T) { + t.Parallel() + + onlyOneTrue := func(t *testing.T, index uint8, arrs ...[]bool) { + t.Helper() + hadOneTrue := false + for _, arr := range arrs { + if arr[index] { + if hadOneTrue { + t.Fatalf("expected only one true") + } + hadOneTrue = true + } + } + if !hadOneTrue { + t.Fatalf("expected exactly one true") + } 
+ } + + tree := puller.NewPeerTreeNode(nil, nil, 0) + maxBins := 9 // bin0-bin8 + sb := make([][]bool, 0, 7) + putSyncPeer := func(key []byte) { + syncBins := make([]bool, maxBins) + sb = append(sb, syncBins) + tree.Put(key, &puller.PeerTreeNodeValue{SyncBins: syncBins}) + } + putSyncPeer([]byte{0b00000001}) + putSyncPeer([]byte{0b00000000}) + putSyncPeer([]byte{0b00010001}) + putSyncPeer([]byte{0b00010000}) + putSyncPeer([]byte{0b11100000}) + putSyncPeer([]byte{0b11000000}) + putSyncPeer([]byte{0b11110000}) + + neighbors := tree.BinAssignment() + if len(neighbors) != 7 { + t.Errorf("expected 7 neighbors, got %d", len(neighbors)) + } + + // check bins >= ud + allTrue(t, 8, sb[0]) + allTrue(t, 8, sb[1]) + allTrue(t, 8, sb[2]) + allTrue(t, 8, sb[3]) + allTrue(t, 4, sb[4]) + allTrue(t, 3, sb[5]) + allTrue(t, 4, sb[6]) + + // check intermediate bins + // left branch + allFalse(t, 7, sb[0], sb[1]) + onlyOneTrue(t, 6, sb[0], sb[1]) + onlyOneTrue(t, 5, sb[0], sb[1]) + onlyOneTrue(t, 4, sb[0], sb[1]) + allFalse(t, 7, sb[2], sb[3]) + onlyOneTrue(t, 6, sb[2], sb[3]) + onlyOneTrue(t, 5, sb[2], sb[3]) + onlyOneTrue(t, 4, sb[2], sb[3]) + allFalse(t, 3, sb[0], sb[1], sb[2], sb[3]) + // right branch + allFalse(t, 3, sb[4], sb[6]) + allFalse(t, 2, sb[4], sb[5], sb[6]) + onlyOneTrue(t, 1, sb[4], sb[5], sb[6]) + allFalse(t, 0, sb[0], sb[1], sb[2], sb[3], sb[4], sb[5], sb[6]) + }) } From 5486ef06e4f1d368b2884b0ec1cfdca41b254f4c Mon Sep 17 00:00:00 2001 From: nugaon Date: Sun, 7 Sep 2025 14:50:33 +0200 Subject: [PATCH 16/23] feat: bin assignment in puller --- pkg/puller/puller.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 7ecb6ec9468..1def8e8f25e 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -132,7 +132,7 @@ func (p *Puller) manage(ctx context.Context) { var prevRadius uint8 - onChange := func() { + changeCheck := func() { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock() @@ -150,14 +150,14 @@ func (p *Puller) manage(ctx context.Context) { } p.logger.Debug("radius decrease", "old_radius", prevRadius, "new_radius", newRadius) } - // TODO check neighborhood or radius have changed or not + changed := newRadius != prevRadius prevRadius = newRadius // peersDisconnected is used to mark and prune peers that are no longer connected. 
peersDisconnected := maps.Clone(p.syncPeers) // make pullsync binary tree of neighbors by their address - bt := newTreeNode[syncPeer](nil, nil, newRadius) + bt := newPeerTreeNode(nil, nil, newRadius) _ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { syncPeer, ok := p.syncPeers[addr.ByteString()] if !ok { @@ -167,18 +167,25 @@ func (p *Puller) manage(ctx context.Context) { syncPeer.syncBins = make([]bool, p.bins) } if po >= newRadius { - _ = bt.Put(addr.Bytes(), syncPeer) // all peers are unique and have the same key length, panic should not happen + if !ok { + changed = true + } + _ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins}) } delete(peersDisconnected, addr.ByteString()) return false, false, nil }, topology.Select{}) - // assign bins to each peer for syncing if peerset or radius changed - // bt.traverse() - for _, peer := range peersDisconnected { + if peer.po >= newRadius { + changed = true + } p.disconnectPeer(peer.address) } + // assign bins to each peer for syncing if peerset or radius changed + if changed { + _ = bt.BinAssignment() + } p.recalcPeers(ctx) } @@ -188,7 +195,7 @@ func (p *Puller) manage(ctx context.Context) { for { - onChange() + changeCheck() select { case <-ctx.Done(): @@ -235,7 +242,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer) error { peer.mtx.Lock() defer peer.mtx.Unlock() - if len(peer.syncBins) == 0 { // no bin is assigned for syncing + if peer.po < p.radius.StorageRadius() { // no bin is assigned for syncing peer.stop() return nil } @@ -555,6 +562,10 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer { } } +func (p *syncPeer) GetSyncBins() []bool { + return p.syncBins +} + // called when peer disconnects or on shutdown, cleans up ongoing sync operations func (p *syncPeer) stop() { for bin, c := range p.binCancelFuncs { From 82cb9ad40c530f90061fa0bcf7a79c238c99da12 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Sep 2025 12:40:44 +0200 Subject: [PATCH 17/23] chore: lint --- pkg/topology/kademlia/mock/kademlia.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/topology/kademlia/mock/kademlia.go b/pkg/topology/kademlia/mock/kademlia.go index e78f3c67c75..3e752a74dae 100644 --- a/pkg/topology/kademlia/mock/kademlia.go +++ b/pkg/topology/kademlia/mock/kademlia.go @@ -21,14 +21,14 @@ type AddrTuple struct { } func MakeAddrTupleForRevCalls(base swarm.Address, addrs ...swarm.Address) []AddrTuple { - var tuples []AddrTuple + var at []AddrTuple for _, addr := range addrs { - tuples = append(tuples, AddrTuple{Addr: addr, PO: swarm.Proximity(base.Bytes(), addr.Bytes())}) + at = append(at, AddrTuple{Addr: addr, PO: swarm.Proximity(base.Bytes(), addr.Bytes())}) } - sort.Slice(tuples, func(i, j int) bool { - return tuples[i].PO > tuples[j].PO + sort.Slice(at, func(i, j int) bool { + return at[i].PO > at[j].PO }) - return tuples + return at } func WithEachPeerRevCalls(addrs ...AddrTuple) Option { From bd7cb9a2f2383404cf69a1f0ca1006664082e287 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Sep 2025 14:30:24 +0200 Subject: [PATCH 18/23] fix: call recalcPeers only if something has changed --- pkg/puller/export_test.go | 1 + pkg/puller/puller.go | 16 +++++++--------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/puller/export_test.go b/pkg/puller/export_test.go index 603d897d56f..bc24f0c10fd 100644 --- a/pkg/puller/export_test.go +++ b/pkg/puller/export_test.go @@ -20,6 +20,7 @@ func NewTreeNode[T 
any](key []byte, p *T, level uint8) *TreeNode[T] { return newTreeNode(key, p, level) } +// IsSyncing returns true if any of the bins is syncing func (p *Puller) IsSyncing(addr swarm.Address) bool { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock() diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 1def8e8f25e..d27a1f63917 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -163,21 +163,20 @@ func (p *Puller) manage(ctx context.Context) { if !ok { syncPeer = newSyncPeer(addr, p.bins, po) p.syncPeers[addr.ByteString()] = syncPeer - } else { - syncPeer.syncBins = make([]bool, p.bins) - } - if po >= newRadius { - if !ok { + if po >= newRadius { changed = true + _ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins}) } - _ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins}) + } else { + syncPeer.syncBins = make([]bool, p.bins) } delete(peersDisconnected, addr.ByteString()) return false, false, nil }, topology.Select{}) for _, peer := range peersDisconnected { - if peer.po >= newRadius { + // if prevRadius was smaller, we need to take out it from the tree + if peer.po >= min(newRadius, prevRadius) { changed = true } p.disconnectPeer(peer.address) @@ -185,9 +184,8 @@ func (p *Puller) manage(ctx context.Context) { // assign bins to each peer for syncing if peerset or radius changed if changed { _ = bt.BinAssignment() + p.recalcPeers(ctx) } - - p.recalcPeers(ctx) } tick := time.NewTicker(recalcPeersDur) From ee651860abbb904d83cd9a05dc938d937853e9d1 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Sep 2025 15:30:29 +0200 Subject: [PATCH 19/23] fix: prevent overflow --- pkg/puller/tree.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/puller/tree.go b/pkg/puller/tree.go index c685e5d5529..18d2741ea00 100644 --- a/pkg/puller/tree.go +++ b/pkg/puller/tree.go @@ -92,7 +92,7 @@ func newTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { // bitOfBytes extracts the bit at the specified index from a byte slice. // Returns 0 or 1 based on the bit value at the given position. 
 func bitOfBytes(bytes []byte, bitIndex uint8) (uint8, error) {
-	if bitIndex >= uint8(len(bytes)*8) {
+	if int(bitIndex) >= len(bytes)*8 {
 		return 0, errors.New("bit index out of range")
 	}
 	byteIndex := bitIndex / 8

From 237c510b268e9a510e1895f182d8941ae4ecd867 Mon Sep 17 00:00:00 2001
From: nugaon
Date: Mon, 8 Sep 2025 16:42:42 +0200
Subject: [PATCH 20/23] fix: put peer under the tree if it is a neighbor

---
 pkg/puller/puller.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index d27a1f63917..15e2127f3b9 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -165,11 +165,13 @@ func (p *Puller) manage(ctx context.Context) {
 			p.syncPeers[addr.ByteString()] = syncPeer
 			if po >= newRadius {
 				changed = true
-				_ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins})
 			}
 		} else {
 			syncPeer.syncBins = make([]bool, p.bins)
 		}
+		if po >= newRadius {
+			_ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins})
+		}
 		delete(peersDisconnected, addr.ByteString())
 		return false, false, nil
 	}, topology.Select{})

From 07ea3279f0a7e474d0b7d79f496ddffa5caf5977 Mon Sep 17 00:00:00 2001
From: nugaon
Date: Mon, 8 Sep 2025 16:43:25 +0200
Subject: [PATCH 21/23] test: puller addresses coverage

---
 pkg/puller/puller_test.go | 82 +++++++++++++++++++++++++++------------
 1 file changed, 52 insertions(+), 30 deletions(-)

diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go
index bb83667f1c7..b191d65545c 100644
--- a/pkg/puller/puller_test.go
+++ b/pkg/puller/puller_test.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"testing"
 	"time"

@@ -98,33 +99,29 @@ func TestSyncOutsideDepth(t *testing.T) {
 	waitSyncCalledBins(t, pullsync, addr, 2, 3)
 }

-// test that addresses not cover full range of radius
-func TestAddressesNotCoverRange(t *testing.T) {
+// test that addresses cover the full range of the radius
+func TestAddressesCoverage(t *testing.T) {
 	t.Parallel()

-	// calculate addresses where PO is greater than storage radius for all neighbors
 	storageRadius := uint8(0)
-	base := swarm.RandAddress(t)
-	firstBitDiffer := storageRadius + 1 // common prefix in neighbor addresses, their POs must be this number.
- addr1Bytes := base.Clone().Bytes() - addr1Bytes[firstBitDiffer/8] ^= 1 << (7 - (firstBitDiffer % 8)) - addr1 := swarm.NewAddress(addr1Bytes) - udIndex := firstBitDiffer + 2 // their UDs will be this number + 1 - addr2Bytes := addr1.Clone().Bytes() - addr2Bytes[udIndex/8] ^= (1 << (7 - (udIndex % 8))) - addr2 := swarm.NewAddress(addr2Bytes) + base := swarm.NewAddress([]byte{byte(rand.Intn(256))}) + addr1 := swarm.NewAddress([]byte{0b01100000}) + addr2 := swarm.NewAddress([]byte{0b01110001}) + supportBins := uint8(6) var ( - cursors = []uint64{0, 0, 0, 0, 0} + cursors = make([]uint64, supportBins) replies = []mockps.SyncReply{ - {Bin: storageRadius, Start: 1, Topmost: 1, Peer: addr1}, - {Bin: storageRadius, Start: 1, Topmost: 1, Peer: addr2}, - {Bin: storageRadius + 1, Start: 1, Topmost: 1, Peer: addr1}, - {Bin: storageRadius + 1, Start: 1, Topmost: 1, Peer: addr2}, - {Bin: storageRadius + 2, Start: 1, Topmost: 1, Peer: addr1}, - {Bin: storageRadius + 2, Start: 1, Topmost: 1, Peer: addr2}, - {Bin: storageRadius + 3, Start: 1, Topmost: 1, Peer: addr1}, - {Bin: storageRadius + 3, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 0, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 0, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 1, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 1, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 2, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 2, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 3, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 3, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 4, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 4, Start: 1, Topmost: 1, Peer: addr2}, } revCalls = kadMock.MakeAddrTupleForRevCalls(base, addr1, addr2) ) @@ -134,22 +131,47 @@ func TestAddressesNotCoverRange(t *testing.T) { revCalls..., )}, pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(replies...)}, - bins: 5, + bins: supportBins, rs: resMock.NewReserve(resMock.WithRadius(storageRadius)), }) + onlyOneTrue := func(bin uint8, addrs ...swarm.Address) { + hadOneTrue := false + for _, addr := range addrs { + if p.IsBinSyncing(addr, bin) { + if hadOneTrue { + t.Fatalf("expected only one true") + } + hadOneTrue = true + } + } + if !hadOneTrue { + t.Fatalf("expected exactly one true") + } + } + + allFalse := func(bin uint8, addrs ...swarm.Address) { + for _, addr := range addrs { + if p.IsBinSyncing(addr, bin) { + t.Fatalf("expected node %x node do not sync bin %d", addr, bin) + } + } + } + time.Sleep(100 * time.Millisecond) kad.Trigger() - if !p.IsBinSyncing(addr1, udIndex+1) || !p.IsBinSyncing(addr2, udIndex+1) { - t.Fatalf("peers should sync bin = uniqueness depth") - } - if !p.IsBinSyncing(addr1, udIndex) || !p.IsBinSyncing(addr2, udIndex) { - t.Fatalf("peers should sync bin = uniqueness depth - 1 > storage radius") - } - if !p.IsBinSyncing(addr1, storageRadius) && !p.IsBinSyncing(addr2, storageRadius) { - t.Fatalf("no peer sync bin = storage radius") + // all bin syncing >= ud + for bin := uint8(4); bin < supportBins; bin++ { + if !p.IsBinSyncing(addr1, bin) || !p.IsBinSyncing(addr2, bin) { + t.Fatalf("peers should sync their unique bin = %d", bin) + } } + + allFalse(3, addr1, addr2) // PO of addr1, addr2 + onlyOneTrue(2, addr1, addr2) + onlyOneTrue(1, addr1, addr2) + onlyOneTrue(0, addr1, addr2) } func TestSyncIntervals(t *testing.T) { From ea8609f284971dd61a495619c6dea29cbbf32a53 Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Sep 2025 17:58:52 +0200 Subject: [PATCH 22/23] chore: linting --- pkg/topology/kademlia/mock/kademlia.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/pkg/topology/kademlia/mock/kademlia.go b/pkg/topology/kademlia/mock/kademlia.go index 3e752a74dae..3a116e151cb 100644 --- a/pkg/topology/kademlia/mock/kademlia.go +++ b/pkg/topology/kademlia/mock/kademlia.go @@ -21,7 +21,7 @@ type AddrTuple struct { } func MakeAddrTupleForRevCalls(base swarm.Address, addrs ...swarm.Address) []AddrTuple { - var at []AddrTuple + at := make([]AddrTuple, 0, len(addrs)) for _, addr := range addrs { at = append(at, AddrTuple{Addr: addr, PO: swarm.Proximity(base.Bytes(), addr.Bytes())}) } From 1b4a6f35e5352116031748d3330978800817996f Mon Sep 17 00:00:00 2001 From: nugaon Date: Thu, 25 Sep 2025 08:56:00 +0200 Subject: [PATCH 23/23] docs: add clarification to changeCheck function --- pkg/puller/puller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 15e2127f3b9..98b4ead636a 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -132,6 +132,7 @@ func (p *Puller) manage(ctx context.Context) { var prevRadius uint8 + // change in radius or in neighborhood peerset changeCheck := func() { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock()
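Taken together, the later patches replace the per-peer uniqueness-depth arithmetic with the compacted binary tree: manage() inserts every neighbor at the storage-radius level and BinAssignment marks the bins each peer has to pull, so every bin at or above the radius is covered exactly once. A compressed usage sketch in the style of the tests above, relying on the test-only exports from export_test.go; the radius, bin count and addresses are invented for illustration and are not part of the patch series.

package puller_test

import (
	"testing"

	"github.com/ethersphere/bee/v2/pkg/puller"
)

// Sketch only: how the neighborhood tree is meant to be used.
func TestBinAssignmentUsageSketch(t *testing.T) {
	radius := uint8(1) // the tree is rooted at the storage radius
	bins := 4

	tree := puller.NewPeerTreeNode(nil, nil, radius)

	// Two neighbors (PO >= radius) whose addresses fork at bit 2.
	a := []byte{0b01000000}
	b := []byte{0b01100000}
	va := &puller.PeerTreeNodeValue{SyncBins: make([]bool, bins)}
	vb := &puller.PeerTreeNodeValue{SyncBins: make([]bool, bins)}
	tree.Put(a, va)
	tree.Put(b, vb)

	tree.BinAssignment()

	// Each peer pulls the bin where it is unique (its leaf level).
	if !va.SyncBins[3] || !vb.SyncBins[3] {
		t.Fatalf("both peers should sync bin 3")
	}
	// The fork level is pulled from neither peer: chunks at that proximity
	// are covered by the sibling branch's deeper bins.
	if va.SyncBins[2] || vb.SyncBins[2] {
		t.Fatalf("bin 2 should not be assigned")
	}
	// The remaining bin down to the radius goes to exactly one peer.
	if va.SyncBins[1] == vb.SyncBins[1] {
		t.Fatalf("bin 1 should be assigned to exactly one peer")
	}
}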