diff --git a/.gx/lastpubver b/.gx/lastpubver index 86add2a3..f3fdab6b 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.15: QmVDDgboX5nPUE4pBcK2xC1b9XbStA4t2KrUWBRMr9AiFd +1.1.0: QmQPWVDYeWvxN75cP4MGrbMVpADm2XqpM4KxgvbxkYk16u diff --git a/bitswap.go b/bitswap.go index f6a42fc7..b8dd498c 100644 --- a/bitswap.go +++ b/bitswap.go @@ -96,8 +96,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, network: network, findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, - newBlocks: make(chan *cid.Cid, HasBlockBufferSize), - provideKeys: make(chan *cid.Cid, provideKeysBufferSize), + newBlocks: make(chan cid.Cid, HasBlockBufferSize), + provideKeys: make(chan cid.Cid, provideKeysBufferSize), wm: NewWantManager(ctx, network), counters: new(counters), @@ -146,9 +146,9 @@ type Bitswap struct { // newBlocks is a channel for newly added blocks to be provided to the // network. blocks pushed down this channel get buffered and fed to the // provideKeys channel later on to avoid too much network activity - newBlocks chan *cid.Cid + newBlocks chan cid.Cid // provideKeys directly feeds provide workers - provideKeys chan *cid.Cid + provideKeys chan cid.Cid process process.Process @@ -179,18 +179,18 @@ type counters struct { } type blockRequest struct { - Cid *cid.Cid + Cid cid.Cid Ctx context.Context } // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. -func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { +func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) { return getBlock(parent, k, bs.GetBlocks) } -func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid { - var out []*cid.Cid +func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid { + var out []cid.Cid for _, e := range bs.engine.WantlistForPeer(p) { out = append(out, e.Cid) } @@ -208,7 +208,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt { // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) -func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { +func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) { if len(keys) == 0 { out := make(chan blocks.Block) close(out) @@ -259,7 +259,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block return } - bs.CancelWants([]*cid.Cid{blk.Cid()}, mses) + bs.CancelWants([]cid.Cid{blk.Cid()}, mses) remaining.Remove(blk.Cid()) select { case out <- blk: @@ -288,7 +288,7 @@ func (bs *Bitswap) getNextSessionID() uint64 { } // CancelWant removes a given key from the wantlist -func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { +func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) { if len(cids) == 0 { return } @@ -326,7 +326,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { bs.notifications.Publish(blk) k := blk.Cid() - ks := []*cid.Cid{k} + ks := []cid.Cid{k} for _, s := range bs.SessionsForBlock(k) { s.receiveBlockFrom(from, blk) bs.CancelWants(ks, s.id) @@ -344,7 +344,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { } // SessionsForBlock returns a slice of all sessions that may be interested in the given cid -func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { +func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session { bs.sessLk.Lock() defer bs.sessLk.Unlock() @@ -440,9 +440,9 @@ func (bs *Bitswap) Close() error { return bs.process.Close() } -func (bs *Bitswap) GetWantlist() []*cid.Cid { +func (bs *Bitswap) GetWantlist() []cid.Cid { entries := bs.wm.wl.Entries() - out := make([]*cid.Cid, 0, len(entries)) + out := make([]cid.Cid, 0, len(entries)) for _, e := range entries { out = append(out, e.Cid) } diff --git a/bitswap_test.go b/bitswap_test.go index 34885996..715958eb 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -179,7 +179,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } } - var blkeys []*cid.Cid + var blkeys []cid.Cid first := instances[0] for _, b := range blocks { blkeys = append(blkeys, b.Cid()) @@ -253,7 +253,7 @@ func TestSendToWantingPeer(t *testing.T) { // peerA requests and waits for block alpha ctx, cancel := context.WithTimeout(context.Background(), waitTime) defer cancel() - alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []*cid.Cid{alpha.Cid()}) + alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []cid.Cid{alpha.Cid()}) if err != nil { t.Fatal(err) } @@ -285,7 +285,7 @@ func TestEmptyKey(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, err := bs.GetBlock(ctx, nil) + _, err := bs.GetBlock(ctx, cid.Cid{}) if err != blockstore.ErrNotFound { t.Error("empty str key should return ErrNotFound") } @@ -393,7 +393,7 @@ func TestDoubleGet(t *testing.T) { // through before the peers even get connected. This is okay, bitswap // *should* be able to handle this. ctx1, cancel1 := context.WithCancel(context.Background()) - blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()}) + blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []cid.Cid{blocks[0].Cid()}) if err != nil { t.Fatal(err) } @@ -401,7 +401,7 @@ func TestDoubleGet(t *testing.T) { ctx2, cancel2 := context.WithCancel(context.Background()) defer cancel2() - blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []*cid.Cid{blocks[0].Cid()}) + blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []cid.Cid{blocks[0].Cid()}) if err != nil { t.Fatal(err) } @@ -456,7 +456,7 @@ func TestWantlistCleanup(t *testing.T) { bswap := instances.Exchange blocks := bg.Blocks(20) - var keys []*cid.Cid + var keys []cid.Cid for _, b := range blocks { keys = append(keys, b.Cid()) } diff --git a/decision/ledger.go b/decision/ledger.go index f38460ec..2c449763 100644 --- a/decision/ledger.go +++ b/decision/ledger.go @@ -76,16 +76,16 @@ func (l *ledger) ReceivedBytes(n int) { l.Accounting.BytesRecv += uint64(n) } -func (l *ledger) Wants(k *cid.Cid, priority int) { +func (l *ledger) Wants(k cid.Cid, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) l.wantList.Add(k, priority) } -func (l *ledger) CancelWant(k *cid.Cid) { +func (l *ledger) CancelWant(k cid.Cid) { l.wantList.Remove(k) } -func (l *ledger) WantListContains(k *cid.Cid) (*wl.Entry, bool) { +func (l *ledger) WantListContains(k cid.Cid) (*wl.Entry, bool) { return l.wantList.Contains(k) } diff --git a/decision/peer_request_queue.go b/decision/peer_request_queue.go index b9e34763..78113f75 100644 --- a/decision/peer_request_queue.go +++ b/decision/peer_request_queue.go @@ -15,7 +15,7 @@ type peerRequestQueue interface { // Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty. Pop() *peerRequestTask Push(entry *wantlist.Entry, to peer.ID) - Remove(k *cid.Cid, p peer.ID) + Remove(k cid.Cid, p peer.ID) // NB: cannot expose simply expose taskQueue.Len because trashed elements // may exist. These trashed elements should not contribute to the count. @@ -114,7 +114,7 @@ func (tl *prq) Pop() *peerRequestTask { } // Remove removes a task from the queue -func (tl *prq) Remove(k *cid.Cid, p peer.ID) { +func (tl *prq) Remove(k cid.Cid, p peer.ID) { tl.lock.Lock() t, ok := tl.taskMap[taskKey(p, k)] if ok { @@ -195,7 +195,7 @@ func (t *peerRequestTask) SetIndex(i int) { } // taskKey returns a key that uniquely identifies a task. -func taskKey(p peer.ID, k *cid.Cid) string { +func taskKey(p peer.ID, k cid.Cid) string { return string(p) + k.KeyString() } @@ -281,7 +281,7 @@ func partnerCompare(a, b pq.Elem) bool { } // StartTask signals that a task was started for this partner -func (p *activePartner) StartTask(k *cid.Cid) { +func (p *activePartner) StartTask(k cid.Cid) { p.activelk.Lock() p.activeBlocks.Add(k) p.active++ @@ -289,7 +289,7 @@ func (p *activePartner) StartTask(k *cid.Cid) { } // TaskDone signals that a task was completed for this partner -func (p *activePartner) TaskDone(k *cid.Cid) { +func (p *activePartner) TaskDone(k cid.Cid) { p.activelk.Lock() p.activeBlocks.Remove(k) p.active-- diff --git a/get.go b/get.go index be5cf3cb..8578277e 100644 --- a/get.go +++ b/get.go @@ -11,11 +11,11 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" ) -type getBlocksFunc func(context.Context, []*cid.Cid) (<-chan blocks.Block, error) +type getBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error) -func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, error) { - if k == nil { - log.Error("nil cid in GetBlock") +func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, error) { + if !k.Defined() { + log.Error("undefined cid in GetBlock") return nil, blockstore.ErrNotFound } @@ -28,7 +28,7 @@ func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, er ctx, cancel := context.WithCancel(p) defer cancel() - promise, err := gb(ctx, []*cid.Cid{k}) + promise, err := gb(ctx, []cid.Cid{k}) if err != nil { return nil, err } @@ -49,9 +49,9 @@ func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, er } } -type wantFunc func(context.Context, []*cid.Cid) +type wantFunc func(context.Context, []cid.Cid) -func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]*cid.Cid)) (<-chan blocks.Block, error) { +func getBlocksImpl(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) { if len(keys) == 0 { out := make(chan blocks.Block) close(out) @@ -72,7 +72,7 @@ func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.Pub return out, nil } -func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]*cid.Cid)) { +func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) { ctx, cancel := context.WithCancel(ctx) defer func() { cancel() diff --git a/message/message.go b/message/message.go index 9aba444b..92f0259c 100644 --- a/message/message.go +++ b/message/message.go @@ -25,9 +25,9 @@ type BitSwapMessage interface { Blocks() []blocks.Block // AddEntry adds an entry to the Wantlist. - AddEntry(key *cid.Cid, priority int) + AddEntry(key cid.Cid, priority int) - Cancel(key *cid.Cid) + Cancel(key cid.Cid) Empty() bool @@ -134,16 +134,16 @@ func (m *impl) Blocks() []blocks.Block { return bs } -func (m *impl) Cancel(k *cid.Cid) { +func (m *impl) Cancel(k cid.Cid) { delete(m.wantlist, k.KeyString()) m.addEntry(k, 0, true) } -func (m *impl) AddEntry(k *cid.Cid, priority int) { +func (m *impl) AddEntry(k cid.Cid, priority int) { m.addEntry(k, priority, false) } -func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) { +func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) { k := c.KeyString() e, exists := m.wantlist[k] if exists { diff --git a/message/message_test.go b/message/message_test.go index 539d212e..a3e1cd8f 100644 --- a/message/message_test.go +++ b/message/message_test.go @@ -11,7 +11,7 @@ import ( u "github.com/ipfs/go-ipfs-util" ) -func mkFakeCid(s string) *cid.Cid { +func mkFakeCid(s string) cid.Cid { return cid.NewCidV0(u.Hash([]byte(s))) } @@ -67,7 +67,7 @@ func TestAppendBlock(t *testing.T) { } func TestWantlist(t *testing.T) { - keystrs := []*cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")} + keystrs := []cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")} m := New(true) for _, s := range keystrs { m.AddEntry(s, 1) @@ -163,7 +163,7 @@ func TestToAndFromNetMessage(t *testing.T) { } } -func wantlistContains(wantlist *pb.Message_Wantlist, c *cid.Cid) bool { +func wantlistContains(wantlist *pb.Message_Wantlist, c cid.Cid) bool { for _, e := range wantlist.GetEntries() { if bytes.Equal(e.GetBlock(), c.Bytes()) { return true diff --git a/network/interface.go b/network/interface.go index 03a37980..fd5622c1 100644 --- a/network/interface.go +++ b/network/interface.go @@ -63,8 +63,8 @@ type Receiver interface { type Routing interface { // FindProvidersAsync returns a channel of providers for the given key - FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID + FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID // Provide provides the key to the network - Provide(context.Context, *cid.Cid) error + Provide(context.Context, cid.Cid) error } diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index aa142d87..cd0670ae 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -138,7 +138,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { } // FindProvidersAsync returns a channel of providers for the given key -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { +func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { // Since routing queries are expensive, give bitswap the peers to which we // have open connections. Note that this may cause issues if bitswap starts @@ -174,7 +174,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) } // Provide provides the key to the network -func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error { +func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error { return bsnet.routing.Provide(ctx, k, true) } diff --git a/notifications/notifications.go b/notifications/notifications.go index d2027010..81ba3949 100644 --- a/notifications/notifications.go +++ b/notifications/notifications.go @@ -13,7 +13,7 @@ const bufferSize = 16 type PubSub interface { Publish(block blocks.Block) - Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block + Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block Shutdown() } @@ -61,7 +61,7 @@ func (ps *impl) Shutdown() { // Subscribe returns a channel of blocks for the given |keys|. |blockChannel| // is closed if the |ctx| times out or is cancelled, or after sending len(keys) // blocks. -func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block { blocksCh := make(chan blocks.Block, len(keys)) valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking @@ -121,7 +121,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.B return blocksCh } -func toStrings(keys []*cid.Cid) []string { +func toStrings(keys []cid.Cid) []string { strs := make([]string, 0, len(keys)) for _, key := range keys { strs = append(strs, key.KeyString()) diff --git a/notifications/notifications_test.go b/notifications/notifications_test.go index e377f319..38ab6f9a 100644 --- a/notifications/notifications_test.go +++ b/notifications/notifications_test.go @@ -151,8 +151,8 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { t.Log("generate a large number of blocks. exceed default buffer") bs := g.Blocks(1000) - ks := func() []*cid.Cid { - var keys []*cid.Cid + ks := func() []cid.Cid { + var keys []cid.Cid for _, b := range bs { keys = append(keys, b.Cid()) } diff --git a/package.json b/package.json index d78c27e1..852954f3 100644 --- a/package.json +++ b/package.json @@ -15,9 +15,9 @@ }, { "author": "hsanjuan", - "hash": "QmRuUsZEg2WLCuidJGHVmE1NreHDmXWKLS466PKyDpXMhN", + "hash": "QmSbZCrt5cSiCNcXFZKoGjukcEf4DRdTzexqzEWATZDdz6", "name": "go-ipfs-routing", - "version": "0.0.25" + "version": "0.1.0" }, { "author": "whyrusleeping", @@ -33,9 +33,9 @@ }, { "author": "whyrusleeping", - "hash": "QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb", + "hash": "QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7", "name": "go-cid", - "version": "0.8.0" + "version": "0.9.0" }, { "author": "whyrusleeping", @@ -45,9 +45,9 @@ }, { "author": "stebalien", - "hash": "QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3", + "hash": "QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM", "name": "go-block-format", - "version": "0.1.11" + "version": "0.2.0" }, { "author": "ipfs", @@ -69,15 +69,15 @@ }, { "author": "hsanjuan", - "hash": "Qmc3bqcjeTjbvgbMW5MvW8SBXnEmTquEx9CXSjT2myoFmi", + "hash": "QmUf9xAmbHndjh31YNRugNeyNX6mDTUDuWN1bVTmUriwdZ", "name": "go-ipfs-blocksutil", - "version": "0.0.6" + "version": "0.1.0" }, { "author": "hsanjuan", - "hash": "Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i", + "hash": "QmeMussyD8s3fQ3pM19ZsfbxvomEqPV9FvczLMWyBDYSnS", "name": "go-ipfs-blockstore", - "version": "0.0.20" + "version": "0.1.0" }, { "author": "hector", @@ -87,9 +87,9 @@ }, { "author": "hsanjuan", - "hash": "QmWw71Mz9PXKgYG8ZfTYN7Ax2Zm48Eurbne3wC2y7CKmLz", + "hash": "QmR1nncPsZR14A4hWr39mq8Lm7BGgS68bHVT9nop8NpWEM", "name": "go-ipfs-exchange-interface", - "version": "0.0.6" + "version": "0.1.0" }, { "author": "whyrusleeping", @@ -175,9 +175,9 @@ "version": "3.1.0" }, { - "hash": "QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU", + "hash": "QmdKS5YtmuSWKuLLgbHG176mS3VX3AKiyVmaaiAfvgcuch", "name": "go-libp2p-routing", - "version": "2.4.10" + "version": "2.5.0" }, { "author": "jbenet", @@ -191,6 +191,6 @@ "license": "", "name": "go-bitswap", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.15" + "version": "1.1.0" } diff --git a/session.go b/session.go index d652dac1..a3b6005b 100644 --- a/session.go +++ b/session.go @@ -28,8 +28,8 @@ type Session struct { bs *Bitswap incoming chan blkRecv - newReqs chan []*cid.Cid - cancelKeys chan []*cid.Cid + newReqs chan []cid.Cid + cancelKeys chan []cid.Cid interestReqs chan interestReq interest *lru.Cache @@ -55,8 +55,8 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { s := &Session{ activePeers: make(map[peer.ID]struct{}), liveWants: make(map[string]time.Time), - newReqs: make(chan []*cid.Cid), - cancelKeys: make(chan []*cid.Cid), + newReqs: make(chan []cid.Cid), + cancelKeys: make(chan []cid.Cid), tofetch: newCidQueue(), interestReqs: make(chan interestReq), ctx: ctx, @@ -85,7 +85,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { func (bs *Bitswap) removeSession(s *Session) { s.notif.Shutdown() - live := make([]*cid.Cid, 0, len(s.liveWants)) + live := make([]cid.Cid, 0, len(s.liveWants)) for c := range s.liveWants { cs, _ := cid.Cast([]byte(c)) live = append(live, cs) @@ -116,7 +116,7 @@ func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) { } type interestReq struct { - c *cid.Cid + c cid.Cid resp chan bool } @@ -127,7 +127,7 @@ type interestReq struct { // note that in the average case (where this session *is* interested in the // block we received) this function will not be called, as the cid will likely // still be in the interest cache. -func (s *Session) isLiveWant(c *cid.Cid) bool { +func (s *Session) isLiveWant(c cid.Cid) bool { resp := make(chan bool, 1) select { case s.interestReqs <- interestReq{ @@ -146,7 +146,7 @@ func (s *Session) isLiveWant(c *cid.Cid) bool { } } -func (s *Session) interestedIn(c *cid.Cid) bool { +func (s *Session) interestedIn(c cid.Cid) bool { return s.interest.Contains(c.KeyString()) || s.isLiveWant(c) } @@ -208,7 +208,7 @@ func (s *Session) run(ctx context.Context) { s.cancel(keys) case <-s.tick.C: - live := make([]*cid.Cid, 0, len(s.liveWants)) + live := make([]cid.Cid, 0, len(s.liveWants)) now := time.Now() for c := range s.liveWants { cs, _ := cid.Cast([]byte(c)) @@ -220,7 +220,7 @@ func (s *Session) run(ctx context.Context) { s.bs.wm.WantBlocks(ctx, live, nil, s.id) if len(live) > 0 { - go func(k *cid.Cid) { + go func(k cid.Cid) { // TODO: have a task queue setup for this to: // - rate limit // - manage timeouts @@ -249,7 +249,7 @@ func (s *Session) run(ctx context.Context) { } } -func (s *Session) cidIsWanted(c *cid.Cid) bool { +func (s *Session) cidIsWanted(c cid.Cid) bool { _, ok := s.liveWants[c.KeyString()] if !ok { ok = s.tofetch.Has(c) @@ -272,13 +272,13 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { s.fetchcnt++ s.notif.Publish(blk) - if next := s.tofetch.Pop(); next != nil { - s.wantBlocks(ctx, []*cid.Cid{next}) + if next := s.tofetch.Pop(); next.Defined() { + s.wantBlocks(ctx, []cid.Cid{next}) } } } -func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { +func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { now := time.Now() for _, c := range ks { s.liveWants[c.KeyString()] = now @@ -286,20 +286,20 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) } -func (s *Session) cancel(keys []*cid.Cid) { +func (s *Session) cancel(keys []cid.Cid) { for _, c := range keys { s.tofetch.Remove(c) } } -func (s *Session) cancelWants(keys []*cid.Cid) { +func (s *Session) cancelWants(keys []cid.Cid) { select { case s.cancelKeys <- keys: case <-s.ctx.Done(): } } -func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { +func (s *Session) fetch(ctx context.Context, keys []cid.Cid) { select { case s.newReqs <- keys: case <-ctx.Done(): @@ -310,18 +310,18 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { // GetBlocks fetches a set of blocks within the context of this session and // returns a channel that found blocks will be returned on. No order is // guaranteed on the returned blocks. -func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { +func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) { ctx = logging.ContextWithLoggable(ctx, s.uuid) return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants) } // GetBlock fetches a single block -func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { +func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) { return getBlock(parent, k, s.GetBlocks) } type cidQueue struct { - elems []*cid.Cid + elems []cid.Cid eset *cid.Set } @@ -329,10 +329,10 @@ func newCidQueue() *cidQueue { return &cidQueue{eset: cid.NewSet()} } -func (cq *cidQueue) Pop() *cid.Cid { +func (cq *cidQueue) Pop() cid.Cid { for { if len(cq.elems) == 0 { - return nil + return cid.Cid{} } out := cq.elems[0] @@ -345,17 +345,17 @@ func (cq *cidQueue) Pop() *cid.Cid { } } -func (cq *cidQueue) Push(c *cid.Cid) { +func (cq *cidQueue) Push(c cid.Cid) { if cq.eset.Visit(c) { cq.elems = append(cq.elems, c) } } -func (cq *cidQueue) Remove(c *cid.Cid) { +func (cq *cidQueue) Remove(c cid.Cid) { cq.eset.Remove(c) } -func (cq *cidQueue) Has(c *cid.Cid) bool { +func (cq *cidQueue) Has(c cid.Cid) bool { return cq.eset.Has(c) } diff --git a/session_test.go b/session_test.go index 97b7a31a..8769d891 100644 --- a/session_test.go +++ b/session_test.go @@ -76,7 +76,7 @@ func TestSessionBetweenPeers(t *testing.T) { t.Fatal(err) } - var cids []*cid.Cid + var cids []cid.Cid for _, blk := range blks { cids = append(cids, blk.Cid()) } @@ -127,7 +127,7 @@ func TestSessionSplitFetch(t *testing.T) { } } - var cids []*cid.Cid + var cids []cid.Cid for _, blk := range blks { cids = append(cids, blk.Cid()) } @@ -167,12 +167,12 @@ func TestInterestCacheOverflow(t *testing.T) { b := inst[1] ses := a.Exchange.NewSession(ctx) - zeroch, err := ses.GetBlocks(ctx, []*cid.Cid{blks[0].Cid()}) + zeroch, err := ses.GetBlocks(ctx, []cid.Cid{blks[0].Cid()}) if err != nil { t.Fatal(err) } - var restcids []*cid.Cid + var restcids []cid.Cid for _, blk := range blks[1:] { restcids = append(restcids, blk.Cid()) } @@ -219,7 +219,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) { ses := a.Exchange.NewSession(ctx) - var allcids []*cid.Cid + var allcids []cid.Cid for _, blk := range blks[1:] { allcids = append(allcids, blk.Cid()) } @@ -261,14 +261,14 @@ func TestMultipleSessions(t *testing.T) { ctx1, cancel1 := context.WithCancel(ctx) ses := a.Exchange.NewSession(ctx1) - blkch, err := ses.GetBlocks(ctx, []*cid.Cid{blk.Cid()}) + blkch, err := ses.GetBlocks(ctx, []cid.Cid{blk.Cid()}) if err != nil { t.Fatal(err) } cancel1() ses2 := a.Exchange.NewSession(ctx) - blkch2, err := ses2.GetBlocks(ctx, []*cid.Cid{blk.Cid()}) + blkch2, err := ses2.GetBlocks(ctx, []cid.Cid{blk.Cid()}) if err != nil { t.Fatal(err) } @@ -296,7 +296,7 @@ func TestWantlistClearsOnCancel(t *testing.T) { bgen := blocksutil.NewBlockGenerator() blks := bgen.Blocks(10) - var cids []*cid.Cid + var cids []cid.Cid for _, blk := range blks { cids = append(cids, blk.Cid()) } diff --git a/stat.go b/stat.go index 99dbbd32..d01d1717 100644 --- a/stat.go +++ b/stat.go @@ -8,7 +8,7 @@ import ( type Stat struct { ProvideBufLen int - Wantlist []*cid.Cid + Wantlist []cid.Cid Peers []string BlocksReceived uint64 DataReceived uint64 diff --git a/testnet/virtual.go b/testnet/virtual.go index 2a1e9377..004dd66c 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -131,7 +131,7 @@ func (nc *networkClient) SendMessage( } // FindProvidersAsync returns a channel of providers for the given key -func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { +func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { // NB: this function duplicates the PeerInfo -> ID transformation in the // bitswap network adapter. Not to worry. This network client will be @@ -185,7 +185,7 @@ func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet. } // Provide provides the key to the network -func (nc *networkClient) Provide(ctx context.Context, k *cid.Cid) error { +func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error { return nc.routing.Provide(ctx, k, true) } diff --git a/wantlist/wantlist.go b/wantlist/wantlist.go index beb4ac75..22819240 100644 --- a/wantlist/wantlist.go +++ b/wantlist/wantlist.go @@ -20,14 +20,14 @@ type Wantlist struct { } type Entry struct { - Cid *cid.Cid + Cid cid.Cid Priority int SesTrk map[uint64]struct{} } // NewRefEntry creates a new reference tracked wantlist entry -func NewRefEntry(c *cid.Cid, p int) *Entry { +func NewRefEntry(c cid.Cid, p int) *Entry { return &Entry{ Cid: c, Priority: p, @@ -61,7 +61,7 @@ func New() *Wantlist { // TODO: think through priority changes here // Add returns true if the cid did not exist in the wantlist before this call // (even if it was under a different session) -func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool { +func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() k := c.KeyString() @@ -97,7 +97,7 @@ func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool { // 'true' is returned if this call to Remove removed the final session ID // tracking the cid. (meaning true will be returned iff this call caused the // value of 'Contains(c)' to change from true to false) -func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { +func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool { w.lk.Lock() defer w.lk.Unlock() k := c.KeyString() @@ -116,7 +116,7 @@ func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool { // Contains returns true if the given cid is in the wantlist tracked by one or // more sessions -func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) { +func (w *ThreadSafe) Contains(k cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() e, ok := w.set[k.KeyString()] @@ -149,7 +149,7 @@ func (w *Wantlist) Len() int { return len(w.set) } -func (w *Wantlist) Add(c *cid.Cid, priority int) bool { +func (w *Wantlist) Add(c cid.Cid, priority int) bool { k := c.KeyString() if _, ok := w.set[k]; ok { return false @@ -172,7 +172,7 @@ func (w *Wantlist) AddEntry(e *Entry) bool { return true } -func (w *Wantlist) Remove(c *cid.Cid) bool { +func (w *Wantlist) Remove(c cid.Cid) bool { k := c.KeyString() _, ok := w.set[k] if !ok { @@ -183,7 +183,7 @@ func (w *Wantlist) Remove(c *cid.Cid) bool { return true } -func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) { +func (w *Wantlist) Contains(k cid.Cid) (*Entry, bool) { e, ok := w.set[k.KeyString()] return e, ok } diff --git a/wantlist/wantlist_test.go b/wantlist/wantlist_test.go index 0d4c696a..4ce31949 100644 --- a/wantlist/wantlist_test.go +++ b/wantlist/wantlist_test.go @@ -6,7 +6,7 @@ import ( cid "github.com/ipfs/go-cid" ) -var testcids []*cid.Cid +var testcids []cid.Cid func init() { strs := []string{ @@ -25,10 +25,10 @@ func init() { } type wli interface { - Contains(*cid.Cid) (*Entry, bool) + Contains(cid.Cid) (*Entry, bool) } -func assertHasCid(t *testing.T, w wli, c *cid.Cid) { +func assertHasCid(t *testing.T, w wli, c cid.Cid) { e, ok := w.Contains(c) if !ok { t.Fatal("expected to have ", c) @@ -38,7 +38,7 @@ func assertHasCid(t *testing.T, w wli, c *cid.Cid) { } } -func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) { +func assertNotHasCid(t *testing.T, w wli, c cid.Cid) { _, ok := w.Contains(c) if ok { t.Fatal("expected not to have ", c) diff --git a/wantmanager.go b/wantmanager.go index 380d8538..87efb860 100644 --- a/wantmanager.go +++ b/wantmanager.go @@ -77,13 +77,13 @@ type msgQueue struct { } // WantBlocks adds the given cids to the wantlist, tracked by the given session -func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { log.Infof("want blocks: %s", ks) pm.addEntries(ctx, ks, peers, false, ses) } // CancelWants removes the given cids from the wantlist, tracked by the given session -func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) { +func (pm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) { pm.addEntries(context.Background(), ks, peers, true, ses) } @@ -93,7 +93,7 @@ type wantSet struct { from uint64 } -func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) { +func (pm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) { entries := make([]*bsmsg.Entry, 0, len(ks)) for i, k := range ks { entries = append(entries, &bsmsg.Entry{ diff --git a/workers.go b/workers.go index 8f5e6edd..41ede8e9 100644 --- a/workers.go +++ b/workers.go @@ -91,7 +91,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { limit := make(chan struct{}, provideWorkerMax) - limitedGoProvide := func(k *cid.Cid, wid int) { + limitedGoProvide := func(k cid.Cid, wid int) { defer func() { // replace token when done <-limit @@ -135,9 +135,9 @@ func (bs *Bitswap) provideWorker(px process.Process) { func (bs *Bitswap) provideCollector(ctx context.Context) { defer close(bs.provideKeys) - var toProvide []*cid.Cid - var nextKey *cid.Cid - var keysOut chan *cid.Cid + var toProvide []cid.Cid + var nextKey cid.Cid + var keysOut chan cid.Cid for { select {