diff --git a/Makefile b/Makefile index 047f5d4f8be..ccfbcc98f64 100644 --- a/Makefile +++ b/Makefile @@ -129,9 +129,9 @@ endif .PHONY: test-ci-race test-ci-race: ifdef cover - $(GO) test -race -run "[^FLAKY]$$" -coverprofile=cover.out ./... + @bash -c '$(GO) test -count=1 -race -run "[^FLAKY]$$" -coverprofile=cover.out ./... 2>&1 | grep -v "malformed LC_DYSYMTAB" || true; exit $${PIPESTATUS[0]}' else - $(GO) test -race -run "[^FLAKY]$$" ./... + @bash -c '$(GO) test -count=1 -race -run "[^FLAKY]$$" ./... 2>&1 | grep -v "malformed LC_DYSYMTAB" || true; exit $${PIPESTATUS[0]}' endif .PHONY: test-ci-flaky diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index cc89f921397..a4450575e12 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -81,6 +81,14 @@ func (g *decoderCache) createRemoveCallback(key string) func(error) { } } +// getFromCache safely retrieves a decoder from cache with lock +func (g *decoderCache) getFromCache(key string) (storage.Getter, bool) { + g.mu.Lock() + defer g.mu.Unlock() + d, ok := g.cache[key] + return d, ok +} + // GetOrCreate returns a decoder for the given chunk address func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter { // since a recovery decoder is not allowed, simply return the underlying netstore @@ -93,9 +101,8 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. } key := fingerprint(addrs) - g.mu.Lock() - defer g.mu.Unlock() - d, ok := g.cache[key] + d, ok := g.getFromCache(key) + if ok { if d == nil { // The nil value indicates a previous successful recovery @@ -105,15 +112,20 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. // Create a factory function that will instantiate the decoder only when needed recovery := func() storage.Getter { g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key) + + if d, ok := g.getFromCache(key); ok && d != nil { + return d + } + + newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config) + g.mu.Lock() defer g.mu.Unlock() - d, ok := g.cache[key] - if ok && d != nil { + if d, ok := g.cache[key]; ok && d != nil { return d } - d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config) - g.cache[key] = d - return d + g.cache[key] = newGetter + return newGetter } return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger) @@ -122,9 +134,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. } removeCallback := g.createRemoveCallback(key) - d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config) - g.cache[key] = d - return d + newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config) + + // ensure no other goroutine created the same getter + g.mu.Lock() + defer g.mu.Unlock() + if d, ok := g.cache[key]; ok { + return d + } + g.cache[key] = newGetter + return newGetter } // New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities. @@ -211,8 +230,8 @@ func (j *joiner) ReadAt(buffer []byte, off int64) (read int, err error) { readLen := min(int64(cap(buffer)), j.span-off) var bytesRead int64 - var eg errgroup.Group - j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, &eg) + eg, ectx := errgroup.WithContext(j.ctx) + j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, eg, ectx) err = eg.Wait() if err != nil { @@ -230,6 +249,7 @@ func (j *joiner) readAtOffset( bytesRead *int64, parity int, eg *errgroup.Group, + ectx context.Context, ) { // we are at a leaf data chunk if subTrieSize <= int64(len(data)) { @@ -280,7 +300,12 @@ func (j *joiner) readAtOffset( func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) { eg.Go(func() error { - ch, err := g.Get(j.ctx, addr) + select { + case <-ectx.Done(): + return ectx.Err() + default: + } + ch, err := g.Get(ectx, addr) if err != nil { return err } @@ -293,7 +318,7 @@ func (j *joiner) readAtOffset( return ErrMalformedTrie } - j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg) + j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg, ectx) return nil }) }(addr, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit)