From 4b8ade1c13cae2f0224c3b7869439a1bb0cb606b Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Wed, 14 Feb 2024 16:59:05 +0100
Subject: [PATCH 1/5] Revert "blockservice: make ContextWithSession shortcut
 grabSessionFromContext inside newSession"

Superseded by #570

This reverts commit ea04c77d44386b10e1d0aa8b691021f84f04c0c8.
---
 blockservice/blockservice.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go
index 353be00f8..463c866bb 100644
--- a/blockservice/blockservice.go
+++ b/blockservice/blockservice.go
@@ -483,7 +483,7 @@ func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
 	if grabSessionFromContext(ctx, bs) != nil {
 		return ctx
 	}
-	return EmbedSessionInContext(ctx, newSession(ctx, bs))
+	return EmbedSessionInContext(ctx, NewSession(ctx, bs))
 }
 
 // EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.

From d8d7675544ccd6a4bfaf85e5b7c09aae90efc9dd Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Wed, 14 Feb 2024 17:07:56 +0100
Subject: [PATCH 2/5] blockservice: remove session embedding in context

This brings us back to the state before #549. Back then I also did some
cleanup in the session code, which I have kept; only the
sessions-in-context feature is removed here.
---
 CHANGELOG.md                      |  1 -
 blockservice/blockservice.go      | 55 --------------------------
 blockservice/blockservice_test.go | 65 -------------------------------
 gateway/blocks_backend.go         |  6 ---
 4 files changed, 127 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d0a64b4e7..2360bd711 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,7 +16,6 @@ The following emojis are used to highlight certain changes:
 
 ### Added
 
-- `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context.
 - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously.
 - `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically.
 - `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries.

diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go
index 463c866bb..87bc8db6b 100644
--- a/blockservice/blockservice.go
+++ b/blockservice/blockservice.go
@@ -140,16 +140,6 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
 // directly.
 // Sessions are lazily setup, this is cheap.
 func NewSession(ctx context.Context, bs BlockService) *Session {
-	ses := grabSessionFromContext(ctx, bs)
-	if ses != nil {
-		return ses
-	}
-
-	return newSession(ctx, bs)
-}
-
-// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
-func newSession(ctx context.Context, bs BlockService) *Session {
 	return &Session{bs: bs, sesctx: ctx}
 }
 
@@ -232,10 +222,6 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
 
 // GetBlock retrieves a particular block from the service,
 // Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - if ses := grabSessionFromContext(ctx, s); ses != nil { - return ses.GetBlock(ctx, c) - } - ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() @@ -295,10 +281,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func // the returned channel. // NB: No guarantees are made about order. func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { - if ses := grabSessionFromContext(ctx, s); ses != nil { - return ses.GetBlocks(ctx, ks) - } - ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() @@ -474,43 +456,6 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo var _ BlockGetter = (*Session)(nil) -// ContextWithSession is a helper which creates a context with an embded session, -// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService] -// will be redirected to this same session instead. -// Sessions are lazily setup, this is cheap. -// It wont make a new session if one exists already in the context. -func ContextWithSession(ctx context.Context, bs BlockService) context.Context { - if grabSessionFromContext(ctx, bs) != nil { - return ctx - } - return EmbedSessionInContext(ctx, NewSession(ctx, bs)) -} - -// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. -func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { - // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice. - return context.WithValue(ctx, ses.bs, ses) -} - -// grabSessionFromContext returns nil if the session was not found -// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, -// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. -// By having this private we allow consumers to follow the trace of where the blockservice is passed and used. 
-func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { - s := ctx.Value(bs) - if s == nil { - return nil - } - - ss, ok := s.(*Session) - if !ok { - // idk what to do here, that kinda sucks, giveup - return nil - } - - return ss -} - // grabAllowlistFromBlockservice never returns nil func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { if bbs, ok := bs.(BoundedBlockService); ok { diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 53fd725f3..f4668fe93 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -288,68 +288,3 @@ func TestAllowlist(t *testing.T) { check(blockservice.GetBlock) check(NewSession(ctx, blockservice).GetBlock) } - -type fakeIsNewSessionCreateExchange struct { - ses exchange.Fetcher - newSessionWasCalled bool -} - -var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil) - -func (*fakeIsNewSessionCreateExchange) Close() error { - return nil -} - -func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { - panic("should call on the session") -} - -func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) { - panic("should call on the session") -} - -func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher { - f.newSessionWasCalled = true - return f.ses -} - -func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error { - return nil -} - -func TestContextSession(t *testing.T) { - t.Parallel() - a := assert.New(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - bgen := butil.NewBlockGenerator() - block1 := bgen.Next() - block2 := bgen.Next() - - bs := blockstore.NewBlockstore(ds.NewMapDatastore()) - a.NoError(bs.Put(ctx, block1)) - a.NoError(bs.Put(ctx, block2)) - sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)} - - service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) - - ctx = ContextWithSession(ctx, service) - - b, err := service.GetBlock(ctx, block1.Cid()) - a.NoError(err) - a.Equal(b.RawData(), block1.RawData()) - a.True(sesEx.newSessionWasCalled, "new session from context should be created") - sesEx.newSessionWasCalled = false - - bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()}) - a.Equal((<-bchan).RawData(), block2.RawData()) - a.False(sesEx.newSessionWasCalled, "session should be reused in context") - - a.Equal( - NewSession(ctx, service), - NewSession(ContextWithSession(ctx, service), service), - "session must be deduped in all invocations on the same context", - ) -} diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go index d85c2846b..99a762c80 100644 --- a/gateway/blocks_backend.go +++ b/gateway/blocks_backend.go @@ -689,12 +689,6 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool { return has } -var _ WithContextHint = (*BlocksBackend)(nil) - -func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context { - return blockservice.ContextWithSession(ctx, bb.blockService) -} - func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { roots, lastSeg, remainder, err := bb.getPathRoots(ctx, path) if err != nil { From ff3ac2612843e41241866f3a9c4af1f0efee1c42 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 28 Dec 2023 19:44:06 +0100 Subject: [PATCH 3/5] blockservice: add WithProvider option This 
allows recreating the behavior of advertising added blocks that the
bitswap server used to perform.
---
 CHANGELOG.md                      |  1 +
 blockservice/blockservice.go      | 59 ++++++++++++++++++++++++++
 blockservice/blockservice_test.go | 69 +++++++++++++++++++++++++++++++
 examples/go.mod                   |  1 +
 examples/go.sum                   |  2 +
 5 files changed, 132 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2360bd711..9c9e39536 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes:
 - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously.
 - `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically.
 - `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries.
+- `blockservice` now has a `WithProvider` option, which allows recreating the behavior of advertising added blocks that the bitswap server used to perform.
 
 ### Changed
 
diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go
index 87bc8db6b..6fff661cb 100644
--- a/blockservice/blockservice.go
+++ b/blockservice/blockservice.go
@@ -13,6 +13,7 @@ import (
 
 	"github.com/ipfs/boxo/blockstore"
 	"github.com/ipfs/boxo/exchange"
+	"github.com/ipfs/boxo/provider"
 	"github.com/ipfs/boxo/verifcid"
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
@@ -73,10 +74,21 @@ type BoundedBlockService interface {
 
 var _ BoundedBlockService = (*blockService)(nil)
 
+// ProvidingBlockService is a BlockService which provides new blocks to a provider.
+type ProvidingBlockService interface {
+	BlockService
+
+	// Provider may return nil, in which case no provider is used.
+	Provider() provider.Provider
+}
+
+var _ ProvidingBlockService = (*blockService)(nil)
+
 type blockService struct {
 	allowlist  verifcid.Allowlist
 	blockstore blockstore.Blockstore
 	exchange   exchange.Interface
+	provider   provider.Provider
 	// If checkFirst is true then first check that a block doesn't
 	// already exist to avoid republishing the block on the exchange.
 	checkFirst bool
@@ -99,6 +111,13 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
 	}
 }
 
+// WithProvider allows advertising anything that is added through the blockservice.
+func WithProvider(prov provider.Provider) Option {
+	return func(bs *blockService) {
+		bs.provider = prov
+	}
+}
+
 // New creates a BlockService with given datastore instance.
 func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
 	if exchange == nil {
@@ -133,6 +152,10 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
 	return s.allowlist
 }
 
+func (s *blockService) Provider() provider.Provider {
+	return s.provider
+}
+
 // NewSession creates a new session that allows for
 // controlled exchange of wantlists to decrease the bandwidth overhead.
 // If the current exchange is a SessionExchange, a new exchange
@@ -170,6 +193,11 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
 			logger.Errorf("NotifyNewBlocks: %s", err.Error())
 		}
 	}
+	if s.provider != nil {
+		if err := s.provider.Provide(o.Cid()); err != nil {
+			logger.Errorf("Provide: %s", err.Error())
+		}
+	}
 	return nil
 }
 
@@ -216,6 +244,14 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
 			logger.Errorf("NotifyNewBlocks: %s", err.Error())
 		}
 	}
+	if s.provider != nil {
+		for _, o := range toput {
+			if err := s.provider.Provide(o.Cid()); err != nil {
+				logger.Errorf("Provide: %s", err.Error())
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -273,6 +309,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
 			return nil, err
 		}
 	}
+	if provider := grabProviderFromBlockservice(bs); provider != nil {
+		err = provider.Provide(blk.Cid())
+		if err != nil {
+			return nil, err
+		}
+	}
 	logger.Debugf("BlockService.BlockFetched %s", c)
 	return blk, nil
 }
@@ -346,6 +388,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
 	}
 
 	ex := blockservice.Exchange()
+	provider := grabProviderFromBlockservice(blockservice)
 	var cache [1]blocks.Block // preallocate once for all iterations
 	for {
 		var b blocks.Block
@@ -377,6 +420,14 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
 			cache[0] = nil // early gc
 		}
 
+		if provider != nil {
+			err = provider.Provide(b.Cid())
+			if err != nil {
+				logger.Errorf("could not tell the provider about new blocks: %s", err)
+				return
+			}
+		}
+
 		select {
 		case out <- b:
 		case <-ctx.Done():
@@ -463,3 +514,11 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
 	}
 	return verifcid.DefaultAllowlist
 }
+
+// grabProviderFromBlockservice can return nil if no provider is used.
+func grabProviderFromBlockservice(bs BlockService) provider.Provider {
+	if bbs, ok := bs.(ProvidingBlockService); ok {
+		return bbs.Provider()
+	}
+	return nil
+}
diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go
index f4668fe93..2a2cb831b 100644
--- a/blockservice/blockservice_test.go
+++ b/blockservice/blockservice_test.go
@@ -288,3 +288,72 @@ func TestAllowlist(t *testing.T) {
 	check(blockservice.GetBlock)
 	check(NewSession(ctx, blockservice).GetBlock)
 }
+
+type mockProvider []cid.Cid
+
+func (p *mockProvider) Provide(c cid.Cid) error {
+	*p = append(*p, c)
+	return nil
+}
+func TestProviding(t *testing.T) {
+	t.Parallel()
+	a := assert.New(t)
+
+	bgen := butil.NewBlockGenerator()
+	blocks := bgen.Blocks(9)
+
+	exchange := blockstore.NewBlockstore(ds.NewMapDatastore())
+
+	prov := mockProvider{}
+	blockservice := New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(exchange), WithProvider(&prov))
+	var added []cid.Cid
+
+	// Adding one block provides it.
+	a.NoError(blockservice.AddBlock(context.Background(), blocks[0]))
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Adding multiple blocks provides them.
+	a.NoError(blockservice.AddBlocks(context.Background(), blocks[0:2]))
+	added = append(added, blocks[0].Cid(), blocks[1].Cid())
+	blocks = blocks[2:]
+
+	// Downloading one block provides it.
+	a.NoError(exchange.Put(context.Background(), blocks[0]))
+	_, err := blockservice.GetBlock(context.Background(), blocks[0].Cid())
+	a.NoError(err)
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Downloading multiple blocks provides them.
+	a.NoError(exchange.PutMany(context.Background(), blocks[0:2]))
+	cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
+	var got []cid.Cid
+	for b := range blockservice.GetBlocks(context.Background(), cids) {
+		got = append(got, b.Cid())
+	}
+	added = append(added, cids...)
+	a.ElementsMatch(cids, got)
+	blocks = blocks[2:]
+
+	session := NewSession(context.Background(), blockservice)
+
+	// Downloading one block over a session provides it.
+	a.NoError(exchange.Put(context.Background(), blocks[0]))
+	_, err = session.GetBlock(context.Background(), blocks[0].Cid())
+	a.NoError(err)
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Downloading multiple blocks over a session provides them.
+	a.NoError(exchange.PutMany(context.Background(), blocks[0:2]))
+	cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
+	got = nil
+	for b := range session.GetBlocks(context.Background(), cids) {
+		got = append(got, b.Cid())
+	}
+	a.ElementsMatch(cids, got)
+	added = append(added, cids...)
+
+	a.ElementsMatch(added, []cid.Cid(prov))
+}
diff --git a/examples/go.mod b/examples/go.mod
index 6c0630543..7b91ade2a 100644
--- a/examples/go.mod
+++ b/examples/go.mod
@@ -60,6 +60,7 @@ require (
 	github.com/huin/goupnp v1.3.0 // indirect
 	github.com/ipfs/bbloom v0.0.4 // indirect
 	github.com/ipfs/go-bitfield v1.1.0 // indirect
+	github.com/ipfs/go-cidutil v0.1.0 // indirect
 	github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
 	github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
 	github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
diff --git a/examples/go.sum b/examples/go.sum
index cfb0944d0..725c2e244 100644
--- a/examples/go.sum
+++ b/examples/go.sum
@@ -167,6 +167,8 @@ github.com/ipfs/go-blockservice v0.5.0 h1:B2mwhhhVQl2ntW2EIpaWPwSCxSuqr5fFA93Ms4
 github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
 github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
 github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
+github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q=
+github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA=
 github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk=
 github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
 github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=

From 047fce01696f82617ff1545ea307374b3fd89f52 Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Wed, 27 Dec 2023 20:03:08 +0100
Subject: [PATCH 4/5] bitswap/server: remove provide

We always had a very weird relationship between bitswap and providing.
Bitswap took care of doing the initial provide and then the reprovider
did it later. The Bitswap server had a complicated providing workflow
where it slurped things into memory. The reprovider accepts provides and
is able to queue them in a database, such as on disk, which is much
better. I'll add options to hook the initial provide logic from the
blockservice into the reprovider queue so consumers don't have to do
this themselves.
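
For illustration, a minimal, hypothetical sketch of the replacement
wiring (not part of this diff): queueProvider stands in for a real
queue-backed provider such as the one in the `provider` package; the
blockservice only needs the Provide(cid.Cid) error method introduced by
the previous patch.

    package main

    import (
    	"context"

    	"github.com/ipfs/boxo/blockservice"
    	"github.com/ipfs/boxo/blockstore"
    	"github.com/ipfs/boxo/exchange/offline"
    	blocks "github.com/ipfs/go-block-format"
    	"github.com/ipfs/go-cid"
    	ds "github.com/ipfs/go-datastore"
    )

    // queueProvider is a hypothetical stand-in for a queue-backed provider.
    type queueProvider struct{ queued []cid.Cid }

    func (p *queueProvider) Provide(c cid.Cid) error {
    	// A real implementation would persist this to a datastore-backed
    	// queue and batch announcements via ProvideMany.
    	p.queued = append(p.queued, c)
    	return nil
    }

    func main() {
    	bstore := blockstore.NewBlockstore(ds.NewMapDatastore())
    	prov := &queueProvider{}

    	// Bitswap no longer announces blocks; the blockservice advertises
    	// added and fetched blocks through the provider instead.
    	bsvc := blockservice.New(bstore, offline.Exchange(bstore), blockservice.WithProvider(prov))

    	blk := blocks.NewBlock([]byte("hello"))
    	if err := bsvc.AddBlock(context.Background(), blk); err != nil {
    		panic(err)
    	}
    	// prov.queued now contains blk.Cid().
    }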
---
 CHANGELOG.md                                 |   1 +
 bitswap/bitswap.go                           |  12 +-
 bitswap/bitswap_test.go                      |   2 +-
 bitswap/client/bitswap_with_sessions_test.go |   4 +
 bitswap/internal/defaults/defaults.go        |   5 -
 bitswap/options.go                           |   4 -
 bitswap/server/server.go                     | 166 +------------------
 7 files changed, 16 insertions(+), 178 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9c9e39536..ad9097666 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@ The following emojis are used to highlight certain changes:
 
 - 🛠 `boxo/gateway`: when making a trustless CAR request with the "entity-bytes" parameter, using a negative index greater than the underlying entity length could trigger reading more data than intended
 - 🛠 `boxo/gateway`: the header configuration `Config.Headers` and `AddAccessControlHeaders` has been replaced by the new middleware provided by `NewHeaders`.
+- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers; instead you can use the `provider` package, which uses a datastore queue and batches calls to ProvideMany.
 
 ### Security
 
diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go
index 393ab96ad..90c8690b7 100644
--- a/bitswap/bitswap.go
+++ b/bitswap/bitswap.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 
 	"github.com/ipfs/boxo/bitswap/client"
-	"github.com/ipfs/boxo/bitswap/internal/defaults"
 	"github.com/ipfs/boxo/bitswap/message"
 	"github.com/ipfs/boxo/bitswap/network"
 	"github.com/ipfs/boxo/bitswap/server"
@@ -45,9 +44,8 @@ type bitswap interface {
 }
 
 var (
-	_ exchange.SessionExchange = (*Bitswap)(nil)
-	_ bitswap                  = (*Bitswap)(nil)
-	HasBlockBufferSize         = defaults.HasBlockBufferSize
+	_ exchange.SessionExchange = (*Bitswap)(nil)
+	_ bitswap                  = (*Bitswap)(nil)
 )
 
 type Bitswap struct {
@@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
 		serverOptions = append(serverOptions, server.WithTracer(tracer))
 	}
 
-	if HasBlockBufferSize != defaults.HasBlockBufferSize {
-		serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
-	}
-
 	ctx = metrics.CtxSubScope(ctx, "bitswap")
 
 	bs.Server = server.New(ctx, net, bstore, serverOptions...)
@@ -115,7 +109,6 @@ type Stat struct { MessagesReceived uint64 BlocksSent uint64 DataSent uint64 - ProvideBufLen int } func (bs *Bitswap) Stat() (*Stat, error) { @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) { Peers: ss.Peers, BlocksSent: ss.BlocksSent, DataSent: ss.DataSent, - ProvideBufLen: ss.ProvideBufLen, }, nil } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 505871d6e..7d2b1f924 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)} + bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) defer ig.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index 0baede658..a3174d0a4 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -37,6 +37,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk if err != nil { t.Fatal(err) } + err = inst.Adapter.Provide(ctx, blk.Cid()) + if err != nil { + t.Fatal(err) + } } func TestBasicSessions(t *testing.T) { diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index f5511cc7a..a06f5c8b9 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -20,11 +20,6 @@ const ( BitswapMaxOutstandingBytesPerPeer = 1 << 20 // the number of bytes we attempt to make each outgoing bitswap message BitswapEngineTargetMessageSize = 16 * 1024 - // HasBlockBufferSize is the buffer size of the channel for new blocks - // that need to be provided. They should get pulled over by the - // provideCollector even before they are actually provided. - // TODO: Does this need to be this large givent that? - HasBlockBufferSize = 256 // Maximum size of the wantlist we are willing to keep in memory. MaxQueuedWantlistEntiresPerPeer = 1024 diff --git a/bitswap/options.go b/bitswap/options.go index da759dfe2..9bea0b637 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option { return Option{server.TaskWorkerCount(count)} } -func ProvideEnabled(enabled bool) Option { - return Option{server.ProvideEnabled(enabled)} -} - func SetSendDontHaves(send bool) Option { return Option{server.SetSendDontHaves(send)} } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 7feffd093..913fb83fb 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -21,20 +21,15 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" ) -var provideKeysBufferSize = 2048 - var ( log = logging.Logger("bitswap-server") sflog = log.Desugar() ) -const provideWorkerMax = 6 - type Option func(*Server) type Server struct { @@ -59,20 +54,8 @@ type Server struct { process process.Process - // newBlocks is a channel for newly added blocks to be provided to the - // network. 
blocks pushed down this channel get buffered and fed to the - // provideKeys channel later on to avoid too much network activity - newBlocks chan cid.Cid - // provideKeys directly feeds provide workers - provideKeys chan cid.Cid - // Extra options to pass to the decision manager engineOptions []decision.Option - - // the size of channel buffer to use - hasBlockBufferSize int - // whether or not to make provide announcements - provideEnabled bool } func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl }() s := &Server{ - sentHistogram: bmetrics.SentHist(ctx), - sendTimeHistogram: bmetrics.SendTimeHist(ctx), - taskWorkerCount: defaults.BitswapTaskWorkerCount, - network: network, - process: px, - provideEnabled: true, - hasBlockBufferSize: defaults.HasBlockBufferSize, - provideKeys: make(chan cid.Cid, provideKeysBufferSize), + sentHistogram: bmetrics.SentHist(ctx), + sendTimeHistogram: bmetrics.SendTimeHist(ctx), + taskWorkerCount: defaults.BitswapTaskWorkerCount, + network: network, + process: px, } - s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize) for _, o := range options { o(s) @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option { } } -// ProvideEnabled is an option for enabling/disabling provide announcements -func ProvideEnabled(enabled bool) Option { - return func(bs *Server) { - bs.provideEnabled = enabled - } -} - func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option { o := decision.WithPeerBlockRequestFilter(pbrf) return func(bs *Server) { @@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option { } } -// HasBlockBufferSize configure how big the new blocks buffer should be. -func HasBlockBufferSize(count int) Option { - if count < 0 { - panic("cannot have negative buffer size") - } - return func(bs *Server) { - bs.hasBlockBufferSize = count - } -} - // WantlistForPeer returns the currently understood list of blocks requested by a // given peer. 
func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid { @@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) { bs.taskWorker(ctx, i) }) } - - if bs.provideEnabled { - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) - - // Spawn up multiple workers to handle incoming blocks - // consider increasing number if providing blocks bottlenecks - // file transfers - px.Go(bs.provideWorker) - } } func (bs *Server) taskWorker(ctx context.Context, id int) { @@ -382,10 +332,9 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) { } type Stat struct { - Peers []string - ProvideBufLen int - BlocksSent uint64 - DataSent uint64 + Peers []string + BlocksSent uint64 + DataSent uint64 } // Stat returns aggregated statistics about bitswap operations @@ -393,7 +342,6 @@ func (bs *Server) Stat() (Stat, error) { bs.counterLk.Lock() s := bs.counters bs.counterLk.Unlock() - s.ProvideBufLen = len(bs.newBlocks) peers := bs.engine.Peers() peersStr := make([]string, len(peers)) @@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err // Send wanted blocks to decision engine bs.engine.NotifyNewBlocks(blks) - // If the reprovider is enabled, send block to reprovider - if bs.provideEnabled { - for _, blk := range blks { - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() - } - } - } - return nil } -func (bs *Server) provideCollector(ctx context.Context) { - defer close(bs.provideKeys) - var toProvide []cid.Cid - var nextKey cid.Cid - var keysOut chan cid.Cid - - for { - select { - case blkey, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - - if keysOut == nil { - nextKey = blkey - keysOut = bs.provideKeys - } else { - toProvide = append(toProvide, blkey) - } - case keysOut <- nextKey: - if len(toProvide) > 0 { - nextKey = toProvide[0] - toProvide = toProvide[1:] - } else { - keysOut = nil - } - case <-ctx.Done(): - return - } - } -} - -func (bs *Server) provideWorker(px process.Process) { - // FIXME: OnClosingContext returns a _custom_ context type. - // Unfortunately, deriving a new cancelable context from this custom - // type fires off a goroutine. To work around this, we create a single - // cancelable context up-front and derive all sub-contexts from that. - // - // See: https://github.com/ipfs/go-ipfs/issues/5810 - ctx := procctx.OnClosingContext(px) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - limit := make(chan struct{}, provideWorkerMax) - - limitedGoProvide := func(k cid.Cid, wid int) { - defer func() { - // replace token when done - <-limit - }() - - log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k) - defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k) - - ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx - defer cancel() - - if err := bs.network.Provide(ctx, k); err != nil { - log.Warn(err) - } - } - - // worker spawner, reads from bs.provideKeys until it closes, spawning a - // _ratelimited_ number of workers to handle each key. 
-	for wid := 2; ; wid++ {
-		log.Debug("Bitswap.ProvideWorker.Loop")
-
-		select {
-		case <-px.Closing():
-			return
-		case k, ok := <-bs.provideKeys:
-			if !ok {
-				log.Debug("provideKeys channel closed")
-				return
-			}
-			select {
-			case <-px.Closing():
-				return
-			case limit <- struct{}{}:
-				go limitedGoProvide(k, wid)
-			}
-		}
-	}
-}
-
 func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.

From 4060154040ef0ae67ad1157bb413a74afa98cea0 Mon Sep 17 00:00:00 2001
From: Jorropo
Date: Fri, 16 Feb 2024 14:46:21 +0100
Subject: [PATCH 5/5] blockservice: add session workaround to work with
 wrapped blockservices

---
 blockservice/blockservice.go         | 23 ++++++++----
 blockservice/blockservice_test.go    | 56 ++++++++++++++++++++++------
 blockservice/providing_blockstore.go | 37 ++++++++++++++++++
 3 files changed, 96 insertions(+), 20 deletions(-)
 create mode 100644 blockservice/providing_blockstore.go

diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go
index 6fff661cb..aac14ba84 100644
--- a/blockservice/blockservice.go
+++ b/blockservice/blockservice.go
@@ -140,6 +140,11 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option)
 
 // Blockstore returns the blockstore behind this blockservice.
 func (s *blockService) Blockstore() blockstore.Blockstore {
+	if s.provider != nil {
+		// FIXME: this is a hack, remove once ipfs/boxo#567 is solved.
+		return providingBlockstore{s.blockstore, s.provider}
+	}
+
 	return s.blockstore
 }
 
@@ -275,7 +280,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
 		return nil, err
 	}
 
-	blockstore := bs.Blockstore()
+	provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs)
 
 	block, err := blockstore.Get(ctx, c)
 	switch {
@@ -309,7 +314,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
 			return nil, err
 		}
 	}
-	if provider := grabProviderFromBlockservice(bs); provider != nil {
+	if provider != nil {
 		err = provider.Provide(blk.Cid())
 		if err != nil {
 			return nil, err
@@ -360,7 +365,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
 		ks = ks2
 	}
 
-	bs := blockservice.Blockstore()
+	provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice)
 
 	var misses []cid.Cid
 	for _, c := range ks {
@@ -388,7 +393,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
 	}
 
 	ex := blockservice.Exchange()
-	provider := grabProviderFromBlockservice(blockservice)
 	var cache [1]blocks.Block // preallocate once for all iterations
 	for {
 		var b blocks.Block
@@ -515,10 +519,13 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
 	return verifcid.DefaultAllowlist
 }
 
-// grabProviderFromBlockservice can return nil if no provider is used.
-func grabProviderFromBlockservice(bs BlockService) provider.Provider {
+// grabProviderAndBlockstoreFromBlockservice can return a nil provider if no provider is used.
+func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) {
+	if bbs, ok := bs.(*blockService); ok {
+		return bbs.provider, bbs.blockstore
+	}
 	if bbs, ok := bs.(ProvidingBlockService); ok {
-		return bbs.Provider()
+		return bbs.Provider(), bbs.Blockstore()
 	}
-	return nil
+	return nil, bs.Blockstore()
 }
diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go
index 2a2cb831b..b04c3df8d 100644
--- a/blockservice/blockservice_test.go
+++ b/blockservice/blockservice_test.go
@@ -289,18 +289,26 @@ func TestAllowlist(t *testing.T) {
 	check(NewSession(ctx, blockservice).GetBlock)
 }
 
+type wrappedBlockservice struct {
+	BlockService
+}
+
 type mockProvider []cid.Cid
 
 func (p *mockProvider) Provide(c cid.Cid) error {
 	*p = append(*p, c)
 	return nil
 }
+
 func TestProviding(t *testing.T) {
 	t.Parallel()
 	a := assert.New(t)
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	bgen := butil.NewBlockGenerator()
-	blocks := bgen.Blocks(9)
+	blocks := bgen.Blocks(12)
 
 	exchange := blockstore.NewBlockstore(ds.NewMapDatastore())
 
@@ -309,51 +317,75 @@ func TestProviding(t *testing.T) {
 	var added []cid.Cid
 
 	// Adding one block provides it.
-	a.NoError(blockservice.AddBlock(context.Background(), blocks[0]))
+	a.NoError(blockservice.AddBlock(ctx, blocks[0]))
 	added = append(added, blocks[0].Cid())
 	blocks = blocks[1:]
 
 	// Adding multiple blocks provides them.
-	a.NoError(blockservice.AddBlocks(context.Background(), blocks[0:2]))
+	a.NoError(blockservice.AddBlocks(ctx, blocks[0:2]))
 	added = append(added, blocks[0].Cid(), blocks[1].Cid())
 	blocks = blocks[2:]
 
 	// Downloading one block provides it.
-	a.NoError(exchange.Put(context.Background(), blocks[0]))
-	_, err := blockservice.GetBlock(context.Background(), blocks[0].Cid())
+	a.NoError(exchange.Put(ctx, blocks[0]))
+	_, err := blockservice.GetBlock(ctx, blocks[0].Cid())
 	a.NoError(err)
 	added = append(added, blocks[0].Cid())
 	blocks = blocks[1:]
 
 	// Downloading multiple blocks provides them.
-	a.NoError(exchange.PutMany(context.Background(), blocks[0:2]))
+	a.NoError(exchange.PutMany(ctx, blocks[0:2]))
 	cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
 	var got []cid.Cid
-	for b := range blockservice.GetBlocks(context.Background(), cids) {
+	for b := range blockservice.GetBlocks(ctx, cids) {
 		got = append(got, b.Cid())
 	}
 	added = append(added, cids...)
 	a.ElementsMatch(cids, got)
 	blocks = blocks[2:]
 
-	session := NewSession(context.Background(), blockservice)
+	session := NewSession(ctx, blockservice)
 
 	// Downloading one block over a session provides it.
-	a.NoError(exchange.Put(context.Background(), blocks[0]))
-	_, err = session.GetBlock(context.Background(), blocks[0].Cid())
+	a.NoError(exchange.Put(ctx, blocks[0]))
+	_, err = session.GetBlock(ctx, blocks[0].Cid())
 	a.NoError(err)
 	added = append(added, blocks[0].Cid())
 	blocks = blocks[1:]
 
 	// Downloading multiple blocks over a session provides them.
-	a.NoError(exchange.PutMany(context.Background(), blocks[0:2]))
+	a.NoError(exchange.PutMany(ctx, blocks[0:2]))
 	cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
 	got = nil
-	for b := range session.GetBlocks(context.Background(), cids) {
+	for b := range session.GetBlocks(ctx, cids) {
 		got = append(got, b.Cid())
 	}
 	a.ElementsMatch(cids, got)
 	added = append(added, cids...)
+	blocks = blocks[2:]
+
+	// Test wrapping the blockservice like nopfs does.
+	session = NewSession(ctx, wrappedBlockservice{blockservice})
+
+	// Downloading one block over a wrapped blockservice session provides it.
+	a.NoError(exchange.Put(ctx, blocks[0]))
+	_, err = session.GetBlock(ctx, blocks[0].Cid())
+	a.NoError(err)
+	added = append(added, blocks[0].Cid())
+	blocks = blocks[1:]
+
+	// Downloading multiple blocks over a wrapped blockservice session provides them.
+	a.NoError(exchange.PutMany(ctx, blocks[0:2]))
 	cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()}
 	got = nil
 	for b := range session.GetBlocks(ctx, cids) {
 		got = append(got, b.Cid())
 	}
 	a.ElementsMatch(cids, got)
 	added = append(added, cids...)
+	blocks = blocks[2:]
+
+	a.Empty(blocks)
 
 	a.ElementsMatch(added, []cid.Cid(prov))
 }
diff --git a/blockservice/providing_blockstore.go b/blockservice/providing_blockstore.go
new file mode 100644
index 000000000..7435f8ae2
--- /dev/null
+++ b/blockservice/providing_blockstore.go
@@ -0,0 +1,37 @@
+package blockservice
+
+import (
+	"context"
+
+	"github.com/ipfs/boxo/blockstore"
+	"github.com/ipfs/boxo/provider"
+	blocks "github.com/ipfs/go-block-format"
+)
+
+var _ blockstore.Blockstore = providingBlockstore{}
+
+type providingBlockstore struct {
+	blockstore.Blockstore
+	provider provider.Provider
+}
+
+func (pbs providingBlockstore) Put(ctx context.Context, b blocks.Block) error {
+	if err := pbs.Blockstore.Put(ctx, b); err != nil {
+		return err
+	}
+
+	return pbs.provider.Provide(b.Cid())
+}
+
+func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) error {
+	if err := pbs.Blockstore.PutMany(ctx, b); err != nil {
+		return err // unclear semantics: were some blocks put? assume PutMany is atomic
+	}
+
+	for _, blk := range b {
+		if err := pbs.provider.Provide(blk.Cid()); err != nil {
+			return err // this can only error if the whole provider has failed
+		}
+	}
+	return nil
+}
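
As an end-to-end illustration of where this series lands, here is a
minimal, hypothetical sketch (not part of the diffs above) mirroring
TestProviding: sliceProvider plays the role of the test's mockProvider,
and wrapped hides the concrete blockservice the way wrappers such as
nopfs do. Providing still happens through the wrapper because
Blockstore() returns the providing blockstore and fetched blocks are
written through it.

    package main

    import (
    	"context"
    	"fmt"

    	"github.com/ipfs/boxo/blockservice"
    	"github.com/ipfs/boxo/blockstore"
    	"github.com/ipfs/boxo/exchange/offline"
    	blocks "github.com/ipfs/go-block-format"
    	"github.com/ipfs/go-cid"
    	ds "github.com/ipfs/go-datastore"
    )

    // sliceProvider records provided CIDs, like the test's mockProvider.
    type sliceProvider []cid.Cid

    func (p *sliceProvider) Provide(c cid.Cid) error {
    	*p = append(*p, c)
    	return nil
    }

    // wrapped hides the concrete blockservice behind the BlockService
    // interface, the way wrappers such as nopfs do.
    type wrapped struct{ blockservice.BlockService }

    func main() {
    	ctx := context.Background()

    	// The "remote" copy of the block lives behind an offline exchange.
    	remote := blockstore.NewBlockstore(ds.NewMapDatastore())
    	blk := blocks.NewBlock([]byte("wrapped"))
    	if err := remote.Put(ctx, blk); err != nil {
    		panic(err)
    	}

    	prov := &sliceProvider{}
    	bsvc := blockservice.New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(remote), blockservice.WithProvider(prov))

    	// The session only sees the wrapper, so it cannot reach the
    	// provider through type assertions; the fetched block is still
    	// provided via the write-through providing blockstore.
    	session := blockservice.NewSession(ctx, wrapped{bsvc})
    	if _, err := session.GetBlock(ctx, blk.Cid()); err != nil {
    		panic(err)
    	}
    	fmt.Println("provided:", len(*prov)) // 1
    }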