diff --git a/log.go b/log.go index b352b32..fefe369 100644 --- a/log.go +++ b/log.go @@ -259,6 +259,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { } hash := index.KeyHashEncoded(index.KeyHash(key)) + tctx := time.Now().UnixMicro() l.readersMu.RLock() defer l.readersMu.RUnlock() @@ -266,7 +267,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { for i := len(l.readers) - 1; i >= 0; i-- { rdr := l.readers[i] - switch msg, err := rdr.GetByKey(key, hash); { + switch msg, err := rdr.GetByKey(key, hash, tctx); { case err == nil: return msg, nil case err == index.ErrKeyNotFound: @@ -294,6 +295,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { } ts := start.UnixMicro() + tctx := time.Now().UnixMicro() l.readersMu.RLock() defer l.readersMu.RUnlock() @@ -301,7 +303,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { for i := len(l.readers) - 1; i >= 0; i-- { rdr := l.readers[i] - switch msg, err := rdr.GetByTime(ts); { + switch msg, err := rdr.GetByTime(ts, tctx); { case err == nil: return msg, nil case err == index.ErrTimeBeforeStart: diff --git a/reader.go b/reader.go index 5d94c3f..c6c1418 100644 --- a/reader.go +++ b/reader.go @@ -20,11 +20,11 @@ type reader struct { messages *message.Reader messagesMu sync.RWMutex - messagesInuse int64 + messagesInuse atomic.Int64 index indexer indexMu sync.RWMutex - indexLastAccess int64 + indexLastAccess atomic.Int64 } type indexer interface { @@ -75,7 +75,7 @@ func (r *reader) GetOffset() int64 { } func (r *reader) GetNextOffset() (int64, error) { - index, err := r.getIndex() + index, err := r.getIndexNow() if err != nil { return 0, err } @@ -83,7 +83,7 @@ func (r *reader) GetNextOffset() (int64, error) { } func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, error) { - index, err := r.getIndex() + index, err := r.getIndexNow() if err != nil { return OffsetInvalid, nil, err } @@ -108,7 +108,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro if err != nil { return OffsetInvalid, nil, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) msgs, err := messages.Consume(position, maxPosition, maxCount) if err != nil { @@ -118,7 +118,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro } func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) { - ix, err := r.getIndex() + ix, err := r.getIndexNow() if err != nil { return OffsetInvalid, nil, err } @@ -149,7 +149,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 if err != nil { return OffsetInvalid, nil, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) var msgs []message.Message for i := 0; i < len(positions); i++ { @@ -180,7 +180,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 } func (r *reader) Get(offset int64) (message.Message, error) { - index, err := r.getIndex() + index, err := r.getIndexNow() if err != nil { return message.Invalid, err } @@ -194,13 +194,13 @@ func (r *reader) Get(offset int64) (message.Message, error) { if err != nil { return message.Invalid, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) return messages.Get(position) } -func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { - ix, err := r.getIndex() +func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, error) { + ix, err := r.getIndexAt(tctx) if err != nil { return message.Invalid, err } @@ -214,7 +214,7 @@ func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { if err != nil { return message.Invalid, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) for i := len(positions) - 1; i >= 0; i-- { msg, err := messages.Get(positions[i]) @@ -229,8 +229,8 @@ func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) { return message.Invalid, index.ErrKeyNotFound } -func (r *reader) GetByTime(ts int64) (message.Message, error) { - index, err := r.getIndex() +func (r *reader) GetByTime(ts int64, tctx int64) (message.Message, error) { + index, err := r.getIndexAt(tctx) if err != nil { return message.Invalid, err } @@ -244,7 +244,7 @@ func (r *reader) GetByTime(ts int64) (message.Message, error) { if err != nil { return message.Invalid, err } - defer atomic.AddInt64(&r.messagesInuse, -1) + defer r.messagesInuse.Add(-1) return messages.Get(position) } @@ -297,9 +297,17 @@ func (r *reader) Delete(rs *segment.RewriteSegment) (*reader, error) { return r, nil } -func (r *reader) getIndex() (indexer, error) { - atomic.StoreInt64(&r.indexLastAccess, time.Now().UnixMicro()) +func (r *reader) getIndexNow() (indexer, error) { + r.indexLastAccess.Store(time.Now().UnixMicro()) + return r.getIndexMarked() +} + +func (r *reader) getIndexAt(tctx int64) (indexer, error) { + r.indexLastAccess.Store(tctx) + return r.getIndexMarked() +} +func (r *reader) getIndexMarked() (indexer, error) { r.indexMu.RLock() if ix := r.index; ix != nil { defer r.indexMu.RUnlock() @@ -326,7 +334,7 @@ func (r *reader) getIndex() (indexer, error) { func (r *reader) getMessages() (*message.Reader, error) { r.messagesMu.RLock() if msgs := r.messages; msgs != nil { - atomic.AddInt64(&r.messagesInuse, 1) + r.messagesInuse.Add(1) r.messagesMu.RUnlock() return msgs, nil } @@ -336,7 +344,7 @@ func (r *reader) getMessages() (*message.Reader, error) { defer r.messagesMu.Unlock() if msgs := r.messages; msgs != nil { - atomic.AddInt64(&r.messagesInuse, 1) + r.messagesInuse.Add(1) return msgs, nil } @@ -346,7 +354,7 @@ func (r *reader) getMessages() (*message.Reader, error) { } r.messages = msgs - atomic.AddInt64(&r.messagesInuse, 1) + r.messagesInuse.Add(1) return msgs, nil } @@ -363,7 +371,7 @@ func (r *reader) GC(unusedFor time.Duration) error { return nil } - indexLastAccess := time.UnixMicro(atomic.LoadInt64(&r.indexLastAccess)) + indexLastAccess := time.UnixMicro(r.indexLastAccess.Load()) if time.Since(indexLastAccess) < unusedFor { // only unload segments unused for defined time return nil @@ -374,7 +382,7 @@ func (r *reader) GC(unusedFor time.Duration) error { r.messagesMu.Lock() defer r.messagesMu.Unlock() - if r.messages == nil || atomic.LoadInt64(&r.messagesInuse) > 0 { + if r.messages == nil || r.messagesInuse.Load() > 0 { return nil }