diff --git a/pkg/puller/export_test.go b/pkg/puller/export_test.go index b57ae396b3f..bc24f0c10fd 100644 --- a/pkg/puller/export_test.go +++ b/pkg/puller/export_test.go @@ -6,13 +6,34 @@ package puller import "github.com/ethersphere/bee/v2/pkg/swarm" -var PeerIntervalKey = peerIntervalKey +type ( + PeerTreeNodeValue = peerTreeNodeValue +) +var ( + PeerIntervalKey = peerIntervalKey + NewPeerTreeNode = newPeerTreeNode +) + +// NewTreeNode is a wrapper for the generic newTreeNode function for testing +func NewTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { + return newTreeNode(key, p, level) +} + +// IsSyncing returns true if any of the bins is syncing func (p *Puller) IsSyncing(addr swarm.Address) bool { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock() - _, ok := p.syncPeers[addr.ByteString()] - return ok + peer, ok := p.syncPeers[addr.ByteString()] + if !ok { + return false + } + for bin := range p.bins { + if peer.isBinSyncing(bin) { + return true + } + } + return false } func (p *Puller) IsBinSyncing(addr swarm.Address, bin uint8) bool { diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 2fdbc24e9cf..98b4ead636a 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -40,8 +40,6 @@ const ( recalcPeersDur = time.Minute * 5 maxChunksPerSecond = 1000 // roughly 4 MB/s - - maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing. 
) type Options struct { @@ -134,7 +132,8 @@ func (p *Puller) manage(ctx context.Context) { var prevRadius uint8 - onChange := func() { + // change in radius or in neighborhood peerset + changeCheck := func() { p.syncPeersMtx.Lock() defer p.syncPeersMtx.Unlock() @@ -152,24 +151,44 @@ func (p *Puller) manage(ctx context.Context) { } p.logger.Debug("radius decrease", "old_radius", prevRadius, "new_radius", newRadius) } + changed := newRadius != prevRadius prevRadius = newRadius // peersDisconnected is used to mark and prune peers that are no longer connected. peersDisconnected := maps.Clone(p.syncPeers) + // make pullsync binary tree of neighbors by their address + bt := newPeerTreeNode(nil, nil, newRadius) _ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { - if _, ok := p.syncPeers[addr.ByteString()]; !ok { - p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po) + syncPeer, ok := p.syncPeers[addr.ByteString()] + if !ok { + syncPeer = newSyncPeer(addr, p.bins, po) + p.syncPeers[addr.ByteString()] = syncPeer + if po >= newRadius { + changed = true + } + } else { + syncPeer.syncBins = make([]bool, p.bins) + } + if po >= newRadius { + _ = bt.Put(addr.Bytes(), &peerTreeNodeValue{SyncBins: syncPeer.syncBins}) } delete(peersDisconnected, addr.ByteString()) return false, false, nil }, topology.Select{}) for _, peer := range peersDisconnected { + // if prevRadius was smaller, we need to take it out of the tree + if peer.po >= min(newRadius, prevRadius) { + changed = true + } p.disconnectPeer(peer.address) } - - p.recalcPeers(ctx, newRadius) + // assign bins to each peer for syncing if peerset or radius changed + if changed { + _ = bt.BinAssignment() + p.recalcPeers(ctx) + } } tick := time.NewTicker(recalcPeersDur) @@ -177,7 +196,7 @@ func (p *Puller) manage(ctx context.Context) { for { - onChange() + changeCheck() select { case <-ctx.Done(): @@ -204,7 +223,7 @@ func (p *Puller) disconnectPeer(addr 
swarm.Address) { // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. // Must be called under lock. -func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) { +func (p *Puller) recalcPeers(ctx context.Context) { var wg sync.WaitGroup for _, peer := range p.syncPeers { wg.Add(1) @@ -212,7 +231,7 @@ func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) { go func(peer *syncPeer) { defer p.wg.Done() defer wg.Done() - if err := p.syncPeer(ctx, peer, storageRadius); err != nil { + if err := p.syncPeer(ctx, peer); err != nil { p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err) } }(peer) @@ -220,10 +239,18 @@ func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) { wg.Wait() } -func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error { +func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer) error { peer.mtx.Lock() defer peer.mtx.Unlock() + if peer.po < p.radius.StorageRadius() { // no bin is assigned for syncing + peer.stop() + return nil + } + // If the peer's epoch has changed (indicating a reserve reset or storage change on the peer): + // - Cancel all ongoing bin syncs for this peer. + // - Reset all previously synced intervals for this peer (to force a fresh sync). + // This guarantees that sync state is consistent with the peer's current reserve, and avoids pulling stale or irrelevant data. if peer.cursors == nil { cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address) if err != nil { @@ -258,40 +285,20 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin } /* - The syncing behavior diverges for peers outside and within the storage radius. - For neighbor peers, we sync ALL bins greater than or equal to the storage radius. - For peers with PO lower than the storage radius, we must sync ONLY the bin that is the PO. 
- For peers peer with PO lower than the storage radius and even lower than the allowed minimum threshold, - no syncing is done. + All chunks above storage radius need to be synchronized only once. + For that, the puller must be aware of how neighbors cover the chunks by their bins together. + Based on that information, the puller will assign bins to each peer for sync. + So at this point all bins need to be known for a peer for syncing. */ - if peer.po >= storageRadius { - - // cancel all bins lower than the storage radius - for bin := uint8(0); bin < storageRadius; bin++ { - peer.cancelBin(bin) - } - - // sync all bins >= storage radius - for bin, cur := range peer.cursors { - if bin >= int(storageRadius) && !peer.isBinSyncing(uint8(bin)) { - p.syncPeerBin(ctx, peer, uint8(bin), cur) - } - } - - } else if storageRadius-peer.po <= maxPODelta { - // cancel all non-po bins, if any - for bin := uint8(0); bin < p.bins; bin++ { - if bin != peer.po { - peer.cancelBin(bin) + for bin, forSync := range peer.syncBins { + if forSync { + if !peer.isBinSyncing(uint8(bin)) { + p.syncPeerBin(ctx, peer, uint8(bin), peer.cursors[bin]) } + } else { + peer.cancelBin(uint8(bin)) } - // sync PO bin only - if !peer.isBinSyncing(peer.po) { - p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po]) - } - } else { - peer.stop() } return nil @@ -540,7 +547,8 @@ type syncPeer struct { address swarm.Address binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. index is bin po uint8 - cursors []uint64 + syncBins []bool // index is bin, value is syncing or not. 
will be set during folding in the puller neighborhood tree + cursors []uint64 // index is bin, value is cursor mtx sync.Mutex wg sync.WaitGroup @@ -551,9 +559,14 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer { address: addr, binCancelFuncs: make(map[uint8]func(), bins), po: po, + syncBins: make([]bool, bins), } } +func (p *syncPeer) GetSyncBins() []bool { + return p.syncBins +} + // called when peer disconnects or on shutdown, cleans up ongoing sync operations func (p *syncPeer) stop() { for bin, c := range p.binCancelFuncs { diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 29687138b9c..b191d65545c 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "math/rand" "testing" "time" @@ -71,7 +72,7 @@ func TestSyncOutsideDepth(t *testing.T) { } ) - _, _, kad, pullsync := newPuller(t, opts{ + p, _, kad, pullsync := newPuller(t, opts{ kad: []kadMock.Option{ kadMock.WithEachPeerRevCalls( kadMock.AddrTuple{Addr: addr, PO: 2}, @@ -86,11 +87,91 @@ func TestSyncOutsideDepth(t *testing.T) { time.Sleep(100 * time.Millisecond) kad.Trigger() + if !p.IsSyncing(addr) { + t.Fatalf("peer is not syncing but should") + } + if p.IsSyncing(addr2) { + t.Fatalf("peer is syncing but shouldn't because not neighbor") + } + waitCursorsCalled(t, pullsync, addr) - waitCursorsCalled(t, pullsync, addr2) waitSyncCalledBins(t, pullsync, addr, 2, 3) - waitSyncCalledBins(t, pullsync, addr2, 0) +} + +// test that addresses cover full range of radius +func TestAddressesCoverage(t *testing.T) { + t.Parallel() + + storageRadius := uint8(0) + base := swarm.NewAddress([]byte{byte(rand.Intn(256))}) + addr1 := swarm.NewAddress([]byte{0b01100000}) + addr2 := swarm.NewAddress([]byte{0b01110001}) + supportBins := uint8(6) + + var ( + cursors = make([]uint64, supportBins) + replies = []mockps.SyncReply{ + {Bin: 0, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 0, Start: 1, Topmost: 1, Peer: addr2}, + 
{Bin: 1, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 1, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 2, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 2, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 3, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 3, Start: 1, Topmost: 1, Peer: addr2}, + {Bin: 4, Start: 1, Topmost: 1, Peer: addr1}, + {Bin: 4, Start: 1, Topmost: 1, Peer: addr2}, + } + revCalls = kadMock.MakeAddrTupleForRevCalls(base, addr1, addr2) + ) + + p, _, kad, _ := newPuller(t, opts{ + kad: []kadMock.Option{kadMock.WithEachPeerRevCalls( + revCalls..., + )}, + pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(replies...)}, + bins: supportBins, + rs: resMock.NewReserve(resMock.WithRadius(storageRadius)), + }) + + onlyOneTrue := func(bin uint8, addrs ...swarm.Address) { + hadOneTrue := false + for _, addr := range addrs { + if p.IsBinSyncing(addr, bin) { + if hadOneTrue { + t.Fatalf("expected only one true") + } + hadOneTrue = true + } + } + if !hadOneTrue { + t.Fatalf("expected exactly one true") + } + } + + allFalse := func(bin uint8, addrs ...swarm.Address) { + for _, addr := range addrs { + if p.IsBinSyncing(addr, bin) { + t.Fatalf("expected node %x node do not sync bin %d", addr, bin) + } + } + } + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + + // all bin syncing >= ud + for bin := uint8(4); bin < supportBins; bin++ { + if !p.IsBinSyncing(addr1, bin) || !p.IsBinSyncing(addr2, bin) { + t.Fatalf("peers should sync their unique bin = %d", bin) + } + } + + allFalse(3, addr1, addr2) // PO of addr1, addr2 + onlyOneTrue(2, addr1, addr2) + onlyOneTrue(1, addr1, addr2) + onlyOneTrue(0, addr1, addr2) } func TestSyncIntervals(t *testing.T) { @@ -212,7 +293,7 @@ func TestPeerDisconnected(t *testing.T) { p, _, kad, pullsync := newPuller(t, opts{ kad: []kadMock.Option{ kadMock.WithEachPeerRevCalls( - kadMock.AddrTuple{Addr: addr, PO: 1}, + kadMock.AddrTuple{Addr: addr, PO: 2}, ), }, pullSync: []mockps.Option{mockps.WithCursors(cursors, 0)}, 
@@ -222,10 +303,10 @@ func TestPeerDisconnected(t *testing.T) { time.Sleep(100 * time.Millisecond) kad.Trigger() - waitCursorsCalled(t, pullsync, addr) if !p.IsSyncing(addr) { t.Fatalf("peer is not syncing but should") } + waitCursorsCalled(t, pullsync, addr) kad.ResetPeers() kad.Trigger() time.Sleep(50 * time.Millisecond) @@ -298,13 +379,13 @@ func TestBinReset(t *testing.T) { cursors = []uint64{1000, 1000, 1000} ) - _, s, kad, pullsync := newPuller(t, opts{ + p, s, kad, pullsync := newPuller(t, opts{ kad: []kadMock.Option{ kadMock.WithEachPeerRevCalls( - kadMock.AddrTuple{Addr: addr, PO: 1}, + kadMock.AddrTuple{Addr: addr, PO: 2}, ), }, - pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(mockps.SyncReply{Bin: 1, Start: 1, Topmost: 1, Peer: addr})}, + pullSync: []mockps.Option{mockps.WithCursors(cursors, 0), mockps.WithReplies(mockps.SyncReply{Bin: 2, Start: 1, Topmost: 1, Peer: addr})}, // bin must be at least radius to sync bins: 3, rs: resMock.NewReserve(resMock.WithRadius(2)), }) @@ -313,6 +394,9 @@ func TestBinReset(t *testing.T) { kad.Trigger() + if !p.IsSyncing(addr) { + t.Fatalf("peer is not syncing but should") + } waitCursorsCalled(t, pullsync, addr) waitSync(t, pullsync, addr) @@ -462,11 +546,8 @@ func TestRadiusIncrease(t *testing.T) { rs.SetStorageRadius(2) kad.Trigger() time.Sleep(100 * time.Millisecond) - if !p.IsBinSyncing(addr, 1) { - t.Fatalf("peer is not syncing but should") - } - if p.IsBinSyncing(addr, 2) { - t.Fatalf("peer is syncing but shouldn't") + if p.IsBinSyncing(addr, 1) || p.IsBinSyncing(addr, 2) || p.IsBinSyncing(addr, 3) { + t.Fatalf("peer is syncing but shouldn't because it is not a neighbor") } } diff --git a/pkg/puller/tree.go b/pkg/puller/tree.go new file mode 100644 index 00000000000..18d2741ea00 --- /dev/null +++ b/pkg/puller/tree.go @@ -0,0 +1,177 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package puller + +import ( + "bytes" + "errors" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +var errShouldNeverHappen = errors.New("should never happen in case of keys have the same length, bits of those are matching until the starting level and method was called from root") + +// TreeNode is a leaf compacted binary tree +// representing the address space of the neighborhood +type TreeNode[T any] struct { + K []byte + V *T + L uint8 + C [2]*TreeNode[T] +} + +// Put will override the value if the key is already present +func (t *TreeNode[T]) Put(key []byte, p *T) *TreeNode[T] { + bitIndex, err := bitOfBytes(key, t.L) + if err != nil { + // cannot go further in the binary representation, override leaf + if t.V != nil && !bytes.Equal(t.K, key) { + panic(errShouldNeverHappen) + } + t.V = p + t.K = key + return t + } + + c := t.C[bitIndex] + if c != nil { + return c.Put(key, p) + } + + if t.C[1-bitIndex] == nil { + // both children are nil, we are on a leaf. + if t.V == nil || bytes.Equal(t.K, key) { + t.V = p + t.K = key + return t + } + + // create as many parent tree nodes as needed + po := swarm.Proximity(t.K, key) + parent := t + ci := bitIndex + for i := uint8(0); i < po-t.L; i++ { + parent.C[ci] = newTreeNode[T](nil, nil, parent.L+1) + parent = parent.C[ci] + ci, err = bitOfBytes(key, parent.L) + if err != nil { + panic(errShouldNeverHappen) + } + } + + // move the old leaf value to the new parent + parent.C[1-ci] = newTreeNode(t.K, t.V, parent.L+1) + t.V = nil + t.K = nil + + // insert p to the new parent + parent.C[ci] = newTreeNode(key, p, parent.L+1) + return parent.C[ci] + } + + // child slot is free on the node so peer can be inserted. 
+ c = newTreeNode(key, p, t.L+1) + t.C[bitIndex] = c + return c +} + +func (p TreeNode[T]) isLeaf() bool { + return p.V != nil +} + +func newTreeNode[T any](key []byte, p *T, level uint8) *TreeNode[T] { + return &TreeNode[T]{ + K: key, + V: p, + L: level, + C: [2]*TreeNode[T]{}, + } +} + +// bitOfBytes extracts the bit at the specified index from a byte slice. +// Returns 0 or 1 based on the bit value at the given position. +func bitOfBytes(bytes []byte, bitIndex uint8) (uint8, error) { + if int(bitIndex) >= len(bytes)*8 { + return 0, errors.New("bit index out of range") + } + byteIndex := bitIndex / 8 + bitPosition := 7 - (bitIndex % 8) // MSB first (big-endian) + + b := bytes[byteIndex] + return (b >> bitPosition) & 1, nil +} + +func newPeerTreeNode(key []byte, p *peerTreeNodeValue, level uint8) *peerTreeNode { + return &peerTreeNode{ + TreeNode: &TreeNode[peerTreeNodeValue]{ + K: key, + V: p, + L: level, + C: [2]*TreeNode[peerTreeNodeValue]{}, + }, + } +} + +// All properties of a peer that are needed for bin assignment +type peerTreeNodeValue struct { + SyncBins []bool +} + +// peerTreeNode is a specialized TreeNode for managing sync peers +type peerTreeNode struct { + *TreeNode[peerTreeNodeValue] +} + +// BinAssignment assigns bins to each peer for syncing by traversing the tree +func (t *peerTreeNode) BinAssignment() (peers []*peerTreeNodeValue) { + if t.isLeaf() { + bl := uint8(len(t.V.SyncBins)) + for i := t.L; i < bl; i++ { + t.V.SyncBins[i] = true + } + + return []*peerTreeNodeValue{t.V} + } + // handle compactible nodes and nodes with both children + for i := 1; i >= 0; i-- { + if t.C[1-i] != nil { + l := (&peerTreeNode{t.C[1-i]}).BinAssignment() + peers = append(peers, l...) 
+ if t.C[i] == nil { + // choose one of the leaves to add level bin + p := selectSyncPeer(peers) + p.SyncBins[t.L] = true + } + } + } + return peers +} + +// selectSyncPeer selects a peer from many to sync the bin +// that should include the same chunks among the peers +// current strategy: it returns the peer with the least number of bins assigned +// assumes peers array has one element at least +func selectSyncPeer(peers []*peerTreeNodeValue) *peerTreeNodeValue { + minPeer := peers[0] + minCount := countTrue(minPeer.SyncBins) + for i := 1; i < len(peers); i++ { + count := countTrue(peers[i].SyncBins) + if count < minCount { + minPeer = peers[i] + minCount = count + } + } + return minPeer +} + +// countTrue returns the number of true values in a boolean slice. +func countTrue(arr []bool) int { + count := 0 + for _, v := range arr { + if v { + count++ + } + } + return count +} diff --git a/pkg/puller/tree_test.go b/pkg/puller/tree_test.go new file mode 100644 index 00000000000..77ac80adbbc --- /dev/null +++ b/pkg/puller/tree_test.go @@ -0,0 +1,301 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+package puller_test + +import ( + "bytes" + "testing" + + "github.com/ethersphere/bee/v2/pkg/puller" +) + +func TestTreeNodePut(t *testing.T) { + t.Parallel() + + noKeyAndValue := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() + if c.V != nil || c.K != nil { + t.Fatalf("expected no key and value") + } + } + + onlyRightChild := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() + if c.C[0] != nil || c.C[1] == nil { + t.Fatalf("expected to have a right child only") + } + } + + onlyLeftChild := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() + if c.C[0] == nil || c.C[1] != nil { + t.Fatalf("expected to have a left child only") + } + } + + noChildren := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() + if c.C[0] != nil || c.C[1] != nil { + t.Fatalf("expected no children") + } + } + + bothChildren := func(t *testing.T, c *puller.TreeNode[int]) { + t.Helper() + if c.C[0] == nil || c.C[1] == nil { + t.Fatalf("expected both left and right branches to exist") + } + } + + keyEqualsTo := func(t *testing.T, c *puller.TreeNode[int], k []byte) { + t.Helper() + if !bytes.Equal(c.K, k) { + t.Fatalf("expected key to be %b, got %b", k, c.K) + } + } + + valueEqualsTo := func(t *testing.T, c *puller.TreeNode[int], v int) { + t.Helper() + if c.V == nil || *c.V != v { + t.Fatalf("expected value to be %b, got %b", v, c.V) + } + } + + lengthEqualsTo := func(t *testing.T, c *puller.TreeNode[int], l uint8) { + t.Helper() + if c.L != l { + t.Fatalf("expected length to be %d, got %d", l, c.L) + } + } + + t.Run("TestWithEdgeCases", func(t *testing.T) { + t.Parallel() + + tree := puller.NewTreeNode[int](nil, nil, 0) + keys := [][]byte{ + {0b11000000}, + {0b11100000}, + {0b00000000}, + {0b00000001}, + {0b00010001}, + } + for i, k := range keys { + _ = tree.Put(k, &i) + } + + cursor := tree + // Simple verification that values were stored + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + //check right side of the tree + cursor = tree.C[1] // prefix: 
1 + noKeyAndValue(t, cursor) + onlyRightChild(t, cursor) + lengthEqualsTo(t, cursor, 1) + cursor = cursor.C[1] // prefix 11 + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + cursor0 := cursor.C[0] + noChildren(t, cursor0) + keyEqualsTo(t, cursor0, []byte{0b11000000}) + valueEqualsTo(t, cursor0, 0) + lengthEqualsTo(t, cursor0, 3) + cursor1 := cursor.C[1] + noChildren(t, cursor1) + keyEqualsTo(t, cursor1, []byte{0b11100000}) + valueEqualsTo(t, cursor1, 1) + lengthEqualsTo(t, cursor1, 3) + + //check left side of the tree + cursor = tree.C[0] // prefix: 0 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + lengthEqualsTo(t, cursor, 1) + cursor = cursor.C[0] // prefix: 00 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + lengthEqualsTo(t, cursor, 2) + cursor = cursor.C[0] // prefix: 000 + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + lengthEqualsTo(t, cursor, 3) + cursor1 = cursor.C[1] // prefix: 0001 + noChildren(t, cursor1) + keyEqualsTo(t, cursor1, []byte{0b00010001}) + valueEqualsTo(t, cursor1, 4) + lengthEqualsTo(t, cursor1, 4) + cursor = cursor.C[0] // prefix: 0000 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + cursor = cursor.C[0] // prefix: 00000 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + cursor = cursor.C[0] // prefix: 000000 + noKeyAndValue(t, cursor) + onlyLeftChild(t, cursor) + cursor = cursor.C[0] // prefix: 0000000 + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + cursor0 = cursor.C[0] + noChildren(t, cursor0) + keyEqualsTo(t, cursor0, []byte{0b00000000}) + valueEqualsTo(t, cursor0, 2) + cursor1 = cursor.C[1] + noChildren(t, cursor1) + keyEqualsTo(t, cursor1, []byte{0b00000001}) + valueEqualsTo(t, cursor1, 3) + }) + + t.Run("TestWithStartingLevel", func(t *testing.T) { + tree := puller.NewTreeNode[int](nil, nil, 2) + keys := [][]byte{ + {0b00000000}, + {0b00100000}, + } + for i, k := range keys { + _ = tree.Put(k, &i) + } + cursor := tree + noKeyAndValue(t, cursor) + bothChildren(t, cursor) + 
lengthEqualsTo(t, cursor, 2) + cursor0 := cursor.C[0] + keyEqualsTo(t, cursor0, keys[0]) + valueEqualsTo(t, cursor0, 0) + noChildren(t, cursor0) + lengthEqualsTo(t, cursor0, 3) + cursor1 := cursor.C[1] + keyEqualsTo(t, cursor1, keys[1]) + valueEqualsTo(t, cursor1, 1) + noChildren(t, cursor1) + lengthEqualsTo(t, cursor1, 3) + }) +} + +func TestBinAssignment(t *testing.T) { + t.Parallel() + + allTrue := func(t *testing.T, from uint8, arr []bool) { + t.Helper() + for i := from; i < uint8(len(arr)); i++ { + if !arr[i] { + t.Fatalf("expected true at index %d", i) + } + } + } + + allFalse := func(t *testing.T, index uint8, arrs ...[]bool) { + t.Helper() + for i, arr := range arrs { + if arr[index] { + t.Fatalf("expected false at index %d for array %d", index, i) + } + } + } + + t.Run("TestEmpty", func(t *testing.T) { + t.Parallel() + + tree := puller.NewPeerTreeNode(nil, nil, 0) + neighbors := tree.BinAssignment() + if len(neighbors) != 0 { + t.Errorf("expected no neighbors, got %d", len(neighbors)) + } + }) + + t.Run("TestOnePeer", func(t *testing.T) { + t.Parallel() + + tree := puller.NewPeerTreeNode(nil, nil, 0) + syncBins := make([]bool, 4) + tree.Put([]byte{0b00000000}, &puller.PeerTreeNodeValue{SyncBins: syncBins}) + neighbors := tree.BinAssignment() + if len(neighbors) != 1 { + t.Errorf("expected one peer, got %d", len(neighbors)) + } + allTrue(t, 0, neighbors[0].SyncBins) + }) + + t.Run("TestWithStartingLevel", func(t *testing.T) { + t.Parallel() + + tree := puller.NewPeerTreeNode(nil, nil, 2) + syncBins := make([]bool, 6) + tree.Put([]byte{0b00000000}, &puller.PeerTreeNodeValue{SyncBins: syncBins}) + neighbors := tree.BinAssignment() + if len(neighbors) != 1 { + t.Errorf("expected one peer, got %d", len(neighbors)) + } + allTrue(t, 2, neighbors[0].SyncBins) + allFalse(t, 1, neighbors[0].SyncBins) + allFalse(t, 0, neighbors[0].SyncBins) + }) + + t.Run("TestMultiplePeers", func(t *testing.T) { + t.Parallel() + + onlyOneTrue := func(t *testing.T, index uint8, arrs 
...[]bool) { + t.Helper() + hadOneTrue := false + for _, arr := range arrs { + if arr[index] { + if hadOneTrue { + t.Fatalf("expected only one true") + } + hadOneTrue = true + } + } + if !hadOneTrue { + t.Fatalf("expected exactly one true") + } + } + + tree := puller.NewPeerTreeNode(nil, nil, 0) + maxBins := 9 // bin0-bin8 + sb := make([][]bool, 0, 7) + putSyncPeer := func(key []byte) { + syncBins := make([]bool, maxBins) + sb = append(sb, syncBins) + tree.Put(key, &puller.PeerTreeNodeValue{SyncBins: syncBins}) + } + putSyncPeer([]byte{0b00000001}) + putSyncPeer([]byte{0b00000000}) + putSyncPeer([]byte{0b00010001}) + putSyncPeer([]byte{0b00010000}) + putSyncPeer([]byte{0b11100000}) + putSyncPeer([]byte{0b11000000}) + putSyncPeer([]byte{0b11110000}) + + neighbors := tree.BinAssignment() + if len(neighbors) != 7 { + t.Errorf("expected 7 neighbors, got %d", len(neighbors)) + } + + // check bins >= ud + allTrue(t, 8, sb[0]) + allTrue(t, 8, sb[1]) + allTrue(t, 8, sb[2]) + allTrue(t, 8, sb[3]) + allTrue(t, 4, sb[4]) + allTrue(t, 3, sb[5]) + allTrue(t, 4, sb[6]) + + // check intermediate bins + // left branch + allFalse(t, 7, sb[0], sb[1]) + onlyOneTrue(t, 6, sb[0], sb[1]) + onlyOneTrue(t, 5, sb[0], sb[1]) + onlyOneTrue(t, 4, sb[0], sb[1]) + allFalse(t, 7, sb[2], sb[3]) + onlyOneTrue(t, 6, sb[2], sb[3]) + onlyOneTrue(t, 5, sb[2], sb[3]) + onlyOneTrue(t, 4, sb[2], sb[3]) + allFalse(t, 3, sb[0], sb[1], sb[2], sb[3]) + // right branch + allFalse(t, 3, sb[4], sb[6]) + allFalse(t, 2, sb[4], sb[5], sb[6]) + onlyOneTrue(t, 1, sb[4], sb[5], sb[6]) + allFalse(t, 0, sb[0], sb[1], sb[2], sb[3], sb[4], sb[5], sb[6]) + }) +} diff --git a/pkg/topology/kademlia/mock/kademlia.go b/pkg/topology/kademlia/mock/kademlia.go index 81fa3fe69d6..3a116e151cb 100644 --- a/pkg/topology/kademlia/mock/kademlia.go +++ b/pkg/topology/kademlia/mock/kademlia.go @@ -6,6 +6,7 @@ package mock import ( "context" + "sort" "sync" "time" @@ -19,6 +20,17 @@ type AddrTuple struct { PO uint8 // the po } +func 
MakeAddrTupleForRevCalls(base swarm.Address, addrs ...swarm.Address) []AddrTuple { + at := make([]AddrTuple, 0, len(addrs)) + for _, addr := range addrs { + at = append(at, AddrTuple{Addr: addr, PO: swarm.Proximity(base.Bytes(), addr.Bytes())}) + } + sort.Slice(at, func(i, j int) bool { + return at[i].PO > at[j].PO + }) + return at +} + func WithEachPeerRevCalls(addrs ...AddrTuple) Option { return optionFunc(func(m *Mock) { m.eachPeerRev = append(m.eachPeerRev, addrs...)