From 886c1795f65a0e769b1cc7dc1f4cc48bf10621c0 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 10 Mar 2025 18:46:16 -0400 Subject: [PATCH 1/8] cleanup docs and some errors --- blocking.go | 11 +++++++++-- index/format.go | 35 +++++++++++++++-------------------- log.go | 2 +- message/format.go | 43 +++++++++++++++++++------------------------ message/message.go | 9 --------- reader.go | 16 ++++++++-------- segment/index.go | 6 ++---- segment/segment.go | 45 +++++++++++++++++++++------------------------ segment/segments.go | 8 ++++---- segment/utils.go | 20 ++++++++++---------- typed.go | 17 +++++++++++++++++ typed_blocking.go | 9 +++++++++ typed_codec.go | 5 +++++ writer.go | 3 +-- 14 files changed, 121 insertions(+), 108 deletions(-) diff --git a/blocking.go b/blocking.go index 5513421..b632a07 100644 --- a/blocking.go +++ b/blocking.go @@ -2,16 +2,18 @@ package klevdb import "context" +// BlockingLog enhances log adding blocking consume type BlockingLog interface { Log - // ConsumeBlocking is similar to Consume, but if offset is equal to the next offsetit will block until next event is produced + // ConsumeBlocking is similar to Consume, but if offset is equal to the next offset it will block until next message is produced ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) // ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error) } +// OpenBlocking opens log and wraps it with support for blocking consume func OpenBlocking(dir string, opts Options) (BlockingLog, error) { l, err := Open(dir, opts) if err != nil { @@ -20,6 +22,7 @@ func OpenBlocking(dir string, opts Options) (BlockingLog, error) { return WrapBlocking(l) } +// WrapBlocking wraps log with support for blocking consume func WrapBlocking(l Log) (BlockingLog, error) { next, err := l.NextOffset() if err != nil { @@ -35,8 +38,12 @@ type blockingLog struct { func (l *blockingLog) Publish(messages []Message) (int64, error) { nextOffset, err := l.Log.Publish(messages) + if err != nil { + return OffsetInvalid, err + } + l.notify.Set(nextOffset) - return nextOffset, err + return nextOffset, nil } func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) { diff --git a/index/format.go b/index/format.go index eff23c4..e698864 100644 --- a/index/format.go +++ b/index/format.go @@ -22,20 +22,18 @@ type Writer struct { func OpenWriter(path string, opts Params) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("could not open index: %w", err) + return nil, kleverr.Newf("index open: %w", err) } - pos := int64(0) - if stat, err := f.Stat(); err != nil { - return nil, kleverr.Newf("could not stat index: %w", err) - } else { - pos = stat.Size() + stat, err := f.Stat() + if err != nil { + return nil, kleverr.Newf("index stat: %w", err) } return &Writer{ opts: opts, f: f, - pos: pos, + pos: stat.Size(), keyOffset: opts.keyOffset(), }, nil } @@ -57,7 +55,7 @@ func (w *Writer) Write(it Item) error { } if n, err := w.f.Write(w.buff); err != nil { - return kleverr.Newf("failed to write index: %w", err) + return kleverr.Newf("index write: %w", err) } else { w.pos += int64(n) } @@ -71,26 +69,23 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync index: %w", err) + return kleverr.Newf("index sync: %w", err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close index: %w", err) + return kleverr.Newf("index close: %w", err) } return nil } func (w *Writer) SyncAndClose() error { - if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync index: %w", err) - } - if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close index: %w", err) + if err := w.Sync(); err != nil { + return err } - return nil + return w.Close() } func Write(path string, opts Params, index []Item) error { @@ -112,24 +107,24 @@ func Write(path string, opts Params, index []Item) error { func Read(path string, opts Params) ([]Item, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("could not open index: %w", err) + return nil, kleverr.Newf("index open: %w", err) } defer f.Close() stat, err := os.Stat(path) if err != nil { - return nil, kleverr.Newf("could not stat index: %w", err) + return nil, kleverr.Newf("index stat: %w", err) } dataSize := stat.Size() itemSize := opts.Size() if dataSize%itemSize > 0 { - return nil, kleverr.Newf("%w: unexpected data len: %d", ErrCorrupted, dataSize) + return nil, kleverr.Newf("%w: unexpected data len %d", ErrCorrupted, dataSize) } data := make([]byte, dataSize) if _, err = io.ReadFull(f, data); err != nil { - return nil, kleverr.Newf("could not read index: %w", err) + return nil, kleverr.Newf("index read: %w", err) } var keyOffset = opts.keyOffset() diff --git a/log.go b/log.go index a4afdce..024ccb7 100644 --- a/log.go +++ b/log.go @@ -371,7 +371,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err newWriter, newReader, err := l.writer.Delete(rs) switch { - case errors.Is(err, errSegmentChanged): + case err == errSegmentChanged: return nil, 0, nil case err != nil: return nil, 0, err diff --git a/message/format.go b/message/format.go index da853bd..8b5e3e8 100644 --- a/message/format.go +++ b/message/format.go @@ -13,7 +13,7 @@ import ( "github.com/klev-dev/kleverr" ) -var ErrCorrupted = errors.New("message corrupted") +var ErrCorrupted = errors.New("log corrupted") var crc32cTable = crc32.MakeTable(crc32.Castagnoli) @@ -31,17 +31,15 @@ type Writer struct { func OpenWriter(path string) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, kleverr.Newf("log open: %w", err) } - pos := int64(0) - if stat, err := f.Stat(); err != nil { - return nil, kleverr.Newf("could not stat log: %w", err) - } else { - pos = stat.Size() + stat, err := f.Stat() + if err != nil { + return nil, kleverr.Newf("log stat: %w", err) } - return &Writer{Path: path, f: f, pos: pos}, nil + return &Writer{Path: path, f: f, pos: stat.Size()}, nil } func (w *Writer) Write(m Message) (int64, error) { @@ -71,7 +69,7 @@ func (w *Writer) Write(m Message) (int64, error) { pos := w.pos if n, err := w.f.Write(w.buff); err != nil { - return 0, kleverr.Newf("log failed write: %w", err) + return 0, kleverr.Newf("log write: %w", err) } else { w.pos += int64(n) } @@ -84,26 +82,23 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync log: %w", err) + return kleverr.Newf("log sync: %w", err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return kleverr.Newf("log close: %w", err) } return nil } func (w *Writer) SyncAndClose() error { - if err := w.f.Sync(); err != nil { - return kleverr.Newf("could not sync log: %w", err) + if err := w.Sync(); err != nil { + return err } - if err := w.f.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) - } - return nil + return w.Close() } type Reader struct { @@ -115,7 +110,7 @@ type Reader struct { func OpenReader(path string) (*Reader, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, kleverr.Newf("log open: %w", err) } return &Reader{ @@ -127,7 +122,7 @@ func OpenReader(path string) (*Reader, error) { func OpenReaderMem(path string) (*Reader, error) { f, err := mmap.Open(path) if err != nil { - return nil, kleverr.Newf("could not open log: %w", err) + return nil, kleverr.Newf("log open: %w", err) } return &Reader{ @@ -176,7 +171,7 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case errors.Is(err, io.ErrUnexpectedEOF): return -1, kleverr.Newf("%w: short header", ErrCorrupted) default: - return -1, kleverr.Newf("could not read header: %w", err) + return -1, kleverr.Newf("header read: %w", err) } msg.Offset = int64(binary.BigEndian.Uint64(headerBytes[0:])) @@ -200,12 +195,12 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case errors.Is(err, io.EOF): return -1, kleverr.Newf("%w: no message", ErrCorrupted) default: - return -1, kleverr.Newf("could not read message: %w", err) + return -1, kleverr.Newf("message read: %w", err) } actualCRC := crc32.Checksum(messageBytes, crc32cTable) if expectedCRC != actualCRC { - return -1, kleverr.Newf("%w: invalid crc: expected=%d; actual=%d", ErrCorrupted, expectedCRC, actualCRC) + return -1, kleverr.Newf("%w: checksum mismatch", ErrCorrupted) } if keySize > 0 { @@ -222,11 +217,11 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err func (r *Reader) Close() error { if r.ra != nil { if err := r.ra.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return kleverr.Newf("log close: %w", err) } } else { if err := r.r.Close(); err != nil { - return kleverr.Newf("could not close log: %w", err) + return kleverr.Newf("log close: %w", err) } } return nil diff --git a/message/message.go b/message/message.go index 3af2c55..2d7c3c2 100644 --- a/message/message.go +++ b/message/message.go @@ -5,8 +5,6 @@ import ( "fmt" "strings" "time" - - "github.com/klev-dev/kleverr" ) const ( @@ -23,13 +21,6 @@ const ( var ErrInvalidOffset = errors.New("invalid offset") var ErrNotFound = errors.New("not found") -func ValidateOffset(offset int64) error { - if offset < OffsetOldest { - return kleverr.Newf("%w: %d is not a valid offset", ErrInvalidOffset, offset) - } - return nil -} - type Message struct { Offset int64 Time time.Time diff --git a/reader.go b/reader.go index 2f71553..208e518 100644 --- a/reader.go +++ b/reader.go @@ -2,7 +2,6 @@ package klevdb import ( "bytes" - "errors" "sync" "sync/atomic" "time" @@ -133,14 +132,15 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 } positions, err := index.Keys(keyHash) - if err != nil { - if errors.Is(err, message.ErrNotFound) { - nextOffset, err := index.GetNextOffset() - if err != nil { - return OffsetInvalid, nil, err - } - return nextOffset, nil, nil + switch { + case err == nil: + case err == message.ErrNotFound: + nextOffset, err := index.GetNextOffset() + if err != nil { + return OffsetInvalid, nil, err } + return nextOffset, nil, nil + default: return OffsetInvalid, nil, err } diff --git a/segment/index.go b/segment/index.go index dadcc3d..7458241 100644 --- a/segment/index.go +++ b/segment/index.go @@ -2,7 +2,6 @@ package segment import ( "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/kleverr" ) type Offsetter interface { @@ -61,11 +60,10 @@ func Get[S ~[]O, O Offsetter](segments S, offset int64) (O, int, error) { switch { case offset < beginSegment.GetOffset(): var v O - err := message.ErrNotFound if beginSegment.GetOffset() == 0 { - err = message.ErrInvalidOffset + return v, -1, message.ErrInvalidOffset } - return v, -1, kleverr.Newf("%w: %d is before beginning", err, offset) + return v, -1, message.ErrNotFound case offset == beginSegment.GetOffset(): return beginSegment, 0, nil } diff --git a/segment/segment.go b/segment/segment.go index 6bdbf33..c4b8e54 100644 --- a/segment/segment.go +++ b/segment/segment.go @@ -46,12 +46,12 @@ type Stats struct { func (s Segment) Stat(params index.Params) (Stats, error) { logStat, err := os.Stat(s.Log) if err != nil { - return Stats{}, kleverr.Newf("could not stat log: %w", err) + return Stats{}, kleverr.Newf("log stat: %w", err) } indexStat, err := os.Stat(s.Index) if err != nil { - return Stats{}, kleverr.Newf("could not stat index: %w", err) + return Stats{}, kleverr.Newf("index stat: %w", err) } return Stats{ @@ -75,7 +75,7 @@ func (s Segment) Check(params index.Params) error { if errors.Is(err, io.EOF) { break } else if err != nil { - return kleverr.Newf("%s: %w", s.Log, err) + return err } item := params.NewItem(msg, position, indexTime) @@ -91,11 +91,11 @@ func (s Segment) Check(params index.Params) error { case err != nil: return err case len(logIndex) != len(items): - return kleverr.Newf("%s: incorrect index size: %w", s.Index, index.ErrCorrupted) + return kleverr.Newf("segment %d incorrect index size: %w", s.Offset, index.ErrCorrupted) default: for i, item := range logIndex { if item != items[i] { - return kleverr.Newf("%s: incorrect index item: %w", s.Index, index.ErrCorrupted) + return kleverr.Newf("segment %d mismatch index item: %w", s.Offset, index.ErrCorrupted) } } } @@ -144,20 +144,17 @@ func (s Segment) Recover(params index.Params) error { if err := log.Close(); err != nil { return err } - if err := restore.Sync(); err != nil { - return err - } - if err := restore.Close(); err != nil { + if err := restore.SyncAndClose(); err != nil { return err } if corrupted { if err := os.Rename(restore.Path, log.Path); err != nil { - return kleverr.Newf("could not rename restore: %w", err) + return kleverr.Newf("restore rename: %w", err) } } else { if err := os.Remove(restore.Path); err != nil { - return kleverr.Newf("could not delete restore: %w", err) + return kleverr.Newf("restore delete: %w", err) } } @@ -182,7 +179,7 @@ func (s Segment) Recover(params index.Params) error { if corruptedIndex { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("could not remove corrupted index: %w", err) + return kleverr.Newf("restore remove corrupted index: %w", err) } } @@ -194,7 +191,7 @@ func (s Segment) NeedsReindex() (bool, error) { case os.IsNotExist(err): return true, nil case err != nil: - return false, kleverr.Newf("could not stat index: %w", err) + return false, kleverr.Newf("index stat: %w", err) case info.Size() == 0: return true, nil default: @@ -250,20 +247,20 @@ func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]inde func (s Segment) Backup(targetDir string) error { logName, err := filepath.Rel(s.Dir, s.Log) if err != nil { - return kleverr.Newf("could not rel log: %w", err) + return kleverr.Newf("backup log rel: %w", err) } targetLog := filepath.Join(targetDir, logName) if err := copyFile(s.Log, targetLog); err != nil { - return kleverr.Newf("could not copy log: %w", err) + return kleverr.Newf("backup log copy: %w", err) } indexName, err := filepath.Rel(s.Dir, s.Index) if err != nil { - return kleverr.Newf("could not rel index: %w", err) + return kleverr.Newf("backup index rel: %w", err) } targetIndex := filepath.Join(targetDir, indexName) if err := copyFile(s.Index, targetIndex); err != nil { - return kleverr.Newf("could not copy index: %w", err) + return kleverr.Newf("backup index copy: %w", err) } return nil @@ -282,11 +279,11 @@ func (s Segment) ForRewrite() (Segment, error) { func (olds Segment) Rename(news Segment) error { if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("could not rename log: %w", err) + return kleverr.Newf("log rename: %w", err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("could not rename index: %w", err) + return kleverr.Newf("index rename: %w", err) } return nil @@ -295,15 +292,15 @@ func (olds Segment) Rename(news Segment) error { func (olds Segment) Override(news Segment) error { // remove index segment so we don't have invalid index if err := os.Remove(news.Index); err != nil { - return kleverr.Newf("could not delete index: %w", err) + return kleverr.Newf("index remove: %w", err) } if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("could not rename log: %w", err) + return kleverr.Newf("log rename: %w", err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("could not rename index: %w", err) + return kleverr.Newf("index rename: %w", err) } return nil @@ -311,10 +308,10 @@ func (olds Segment) Override(news Segment) error { func (s Segment) Remove() error { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("could not delete index: %w", err) + return kleverr.Newf("index remove: %w", err) } if err := os.Remove(s.Log); err != nil { - return kleverr.Newf("could not delete log: %w", err) + return kleverr.Newf("log remove: %w", err) } return nil } diff --git a/segment/segments.go b/segment/segments.go index c16eceb..9fea6cc 100644 --- a/segment/segments.go +++ b/segment/segments.go @@ -13,7 +13,7 @@ import ( func Find(dir string) ([]Segment, error) { files, err := os.ReadDir(dir) if err != nil { - return nil, kleverr.Newf("could not list dir: %w", err) + return nil, kleverr.Newf("read dir: %w", err) } var segments []Segment @@ -23,7 +23,7 @@ func Find(dir string) ([]Segment, error) { offset, err := strconv.ParseInt(offsetStr, 10, 64) if err != nil { - return nil, kleverr.Newf("parse offset failed: %w", err) + return nil, kleverr.Newf("parse offset: %w", err) } segments = append(segments, New(dir, offset)) @@ -97,7 +97,7 @@ func BackupDir(dir, target string) error { } if err := os.MkdirAll(target, 0700); err != nil { - return kleverr.Newf("could not create backup dir: %w", err) + return kleverr.Newf("backup create dir: %w", err) } return Backup(segments, target) @@ -106,7 +106,7 @@ func BackupDir(dir, target string) error { func Backup(segments []Segment, dir string) error { for _, seg := range segments { if err := seg.Backup(dir); err != nil { - return kleverr.Newf("could not backup segment %d: %w", seg.Offset, err) + return err } } diff --git a/segment/utils.go b/segment/utils.go index 654e24a..f65e382 100644 --- a/segment/utils.go +++ b/segment/utils.go @@ -12,7 +12,7 @@ import ( func randStr(length int) (string, error) { k := make([]byte, length) if _, err := io.ReadFull(rand.Reader, k); err != nil { - return "", kleverr.Ret(err) + return "", kleverr.Newf("rand read: %w", err) } return base58.Encode(k), nil } @@ -20,20 +20,20 @@ func randStr(length int) (string, error) { func copyFile(src, dst string) error { fsrc, err := os.Open(src) if err != nil { - return kleverr.Newf("could not open src: %w", err) + return kleverr.Newf("src open: %w", err) } defer fsrc.Close() stat, err := fsrc.Stat() if err != nil { - return kleverr.Newf("could not stat src: %w", err) + return kleverr.Newf("src stat: %w", err) } fdst, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if os.IsExist(err) { switch dstStat, err := os.Stat(dst); { case err != nil: - return kleverr.Newf("could not stat dst: %w", err) + return kleverr.Newf("dst stat: %w", err) case stat.Size() == dstStat.Size() && stat.ModTime().Equal(dstStat.ModTime()): // TODO do we need a safer version of this? return nil @@ -41,25 +41,25 @@ func copyFile(src, dst string) error { fdst, err = os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) } if err != nil { - return kleverr.Newf("could not open dst: %w", err) + return kleverr.Newf("dst open: %w", err) } defer fdst.Close() switch n, err := io.Copy(fdst, fsrc); { case err != nil: - return kleverr.Newf("could not copy: %w", err) + return kleverr.Newf("src/dst copy: %w", err) case n < stat.Size(): - return kleverr.Newf("could not copy all data (%d/%d)", n, stat.Size()) + return kleverr.Newf("src/dst copy: partial (%d/%d)", n, stat.Size()) } if err := fdst.Sync(); err != nil { - return kleverr.Newf("could not sync dst: %w", err) + return kleverr.Newf("dst sync: %w", err) } if err := fdst.Close(); err != nil { - return kleverr.Newf("could not close dst: %w", err) + return kleverr.Newf("dst close: %w", err) } if err := os.Chtimes(dst, stat.ModTime(), stat.ModTime()); err != nil { - return kleverr.Newf("could not set dst time: %w", err) + return kleverr.Newf("dst chtimes: %w", err) } return nil diff --git a/typed.go b/typed.go index 8dd956b..8fb3607 100644 --- a/typed.go +++ b/typed.go @@ -13,37 +13,53 @@ type TMessage[K any, V any] struct { // TLog is a typed log type TLog[K any, V any] interface { + // Publish see Log.Publish Publish(messages []TMessage[K, V]) (nextOffset int64, err error) + // NextOffset see Log.NextOffset NextOffset() (nextOffset int64, err error) + // Consume see Log.Consume Consume(offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // ConsumeByKey see Log.ConsumeByKey ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // Get see Log.Get Get(offset int64) (message TMessage[K, V], err error) + // GetByKey see Log.GetByKey GetByKey(key K, empty bool) (message TMessage[K, V], err error) + // GetByTime see Log.GetByTime GetByTime(start time.Time) (message TMessage[K, V], err error) + // Delete see Log.Delete Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) + // Size see Log.Size Size(m Message) int64 + // Stat see Log.Stat Stat() (Stats, error) + // Backup see Log.Backup Backup(dir string) error + // Sync see Log.Sync Sync() (nextOffset int64, err error) + // GC see Log.GC GC(unusedFor time.Duration) error + // Close see Log.Close Close() error + // Raw returns the wrapped in log Raw() Log } +// OpenT opens a typed log with specified key/value codecs func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error) { l, err := Open(dir, opts) if err != nil { @@ -52,6 +68,7 @@ func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec return &tlog[K, V]{l, keyCodec, valueCodec}, nil } +// WrapT wraps a log with specified key/value codecs func WrapT[K any, V any](l Log, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error) { return &tlog[K, V]{l, keyCodec, valueCodec}, nil } diff --git a/typed_blocking.go b/typed_blocking.go index a97e56e..17b9c51 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -2,14 +2,18 @@ package klevdb import "context" +// TBlockingLog enhances tlog adding blocking consume type TBlockingLog[K any, V any] interface { TLog[K, V] + // ConsumeBlocking see BlockingLog.ConsumeBlocking ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) + // ConsumeByKeyBlocking see BlockingLog.ConsumeByKeyBlocking ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) } +// OpenBlocking opens tlog and wraps it with support for blocking consume func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) { l, err := OpenT(dir, opts, keyCodec, valueCodec) if err != nil { @@ -18,6 +22,7 @@ func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], va return WrapTBlocking(l) } +// WrapBlocking wraps tlog with support for blocking consume func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) { next, err := l.NextOffset() if err != nil { @@ -33,6 +38,10 @@ type tlogBlocking[K any, V any] struct { func (l *tlogBlocking[K, V]) Publish(tmessages []TMessage[K, V]) (int64, error) { nextOffset, err := l.TLog.Publish(tmessages) + if err != nil { + return OffsetInvalid, err + } + l.notify.Set(nextOffset) return nextOffset, err } diff --git a/typed_codec.go b/typed_codec.go index 30ebf2a..fcc7441 100644 --- a/typed_codec.go +++ b/typed_codec.go @@ -7,11 +7,13 @@ import ( "github.com/klev-dev/kleverr" ) +// Codec is interface satisfied by all codecs type Codec[T any] interface { Encode(t T, empty bool) (b []byte, err error) Decode(b []byte) (t T, empty bool, err error) } +// JsonCodec supports coding values as a JSON type JsonCodec[T any] struct{} func (c JsonCodec[T]) Encode(t T, empty bool) ([]byte, error) { @@ -48,6 +50,7 @@ func (c stringOptCodec) Decode(b []byte) (string, bool, error) { return s, false, err } +// StringOptCodec supports coding an optional string, e.g. differantiates between "" and nil strings var StringOptCodec = stringOptCodec{} type stringCodec struct{} @@ -60,6 +63,7 @@ func (c stringCodec) Decode(b []byte) (string, bool, error) { return string(b), false, nil } +// StringCodec supports coding a string var StringCodec = stringCodec{} type varintCodec struct{} @@ -82,4 +86,5 @@ func (c varintCodec) Decode(b []byte) (int64, bool, error) { return t, false, nil } +// VarintCodec supports coding integers as varint var VarintCodec = varintCodec{} diff --git a/writer.go b/writer.go index 39d718e..eda6765 100644 --- a/writer.go +++ b/writer.go @@ -11,7 +11,6 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) type writer struct { @@ -113,7 +112,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { if err := rs.Segment.Remove(); err != nil { return nil, nil, err } - return nil, nil, kleverr.Newf("delete failed: %w", errSegmentChanged) + return nil, nil, errSegmentChanged } if err := w.Close(); err != nil { From 232061e2012aa17048fac91deaebd1229727ec4f Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 08:35:59 -0400 Subject: [PATCH 2/8] move to sentinel errors --- message/format.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/message/format.go b/message/format.go index 8b5e3e8..a91deab 100644 --- a/message/format.go +++ b/message/format.go @@ -3,6 +3,7 @@ package message import ( "encoding/binary" "errors" + "fmt" "hash/crc32" "io" "os" @@ -14,6 +15,10 @@ import ( ) var ErrCorrupted = errors.New("log corrupted") +var errShortHeader = fmt.Errorf("%w: short header", ErrCorrupted) +var errShortMessage = fmt.Errorf("%w: short message", ErrCorrupted) +var errNoMessage = fmt.Errorf("%w: no message", ErrCorrupted) +var errCrcFailed = fmt.Errorf("%w: crc failed", ErrCorrupted) var crc32cTable = crc32.MakeTable(crc32.Castagnoli) @@ -169,7 +174,7 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case err == nil: // all good, continue case errors.Is(err, io.ErrUnexpectedEOF): - return -1, kleverr.Newf("%w: short header", ErrCorrupted) + return -1, errShortHeader default: return -1, kleverr.Newf("header read: %w", err) } @@ -191,16 +196,16 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case err == nil: // all good, continue case errors.Is(err, io.ErrUnexpectedEOF): - return -1, kleverr.Newf("%w: short message", ErrCorrupted) + return -1, errShortMessage case errors.Is(err, io.EOF): - return -1, kleverr.Newf("%w: no message", ErrCorrupted) + return -1, errNoMessage default: return -1, kleverr.Newf("message read: %w", err) } actualCRC := crc32.Checksum(messageBytes, crc32cTable) if expectedCRC != actualCRC { - return -1, kleverr.Newf("%w: checksum mismatch", ErrCorrupted) + return -1, errCrcFailed } if keySize > 0 { From c834d2739be1970a83ef95e81109d6dcb5cca827 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 08:41:49 -0400 Subject: [PATCH 3/8] move to sentinel errors --- index/format.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/index/format.go b/index/format.go index e698864..a366058 100644 --- a/index/format.go +++ b/index/format.go @@ -3,6 +3,7 @@ package index import ( "encoding/binary" "errors" + "fmt" "io" "os" @@ -10,6 +11,7 @@ import ( ) var ErrCorrupted = errors.New("index corrupted") +var errIndexSize = fmt.Errorf("%w: unaligned index size", ErrCorrupted) type Writer struct { opts Params @@ -119,7 +121,7 @@ func Read(path string, opts Params) ([]Item, error) { itemSize := opts.Size() if dataSize%itemSize > 0 { - return nil, kleverr.Newf("%w: unexpected data len %d", ErrCorrupted, dataSize) + return nil, errIndexSize } data := make([]byte, dataSize) From 057f4161c05f2cbd56593fcf4b4445dfdae61281 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 08:57:51 -0400 Subject: [PATCH 4/8] cleanup kleverr usage --- index/format.go | 18 ++++++++---------- log_test.go | 15 +++------------ message/format.go | 24 +++++++++++------------- segment/utils.go | 22 +++++++++++----------- 4 files changed, 33 insertions(+), 46 deletions(-) diff --git a/index/format.go b/index/format.go index a366058..fc6b7d8 100644 --- a/index/format.go +++ b/index/format.go @@ -6,8 +6,6 @@ import ( "fmt" "io" "os" - - "github.com/klev-dev/kleverr" ) var ErrCorrupted = errors.New("index corrupted") @@ -24,12 +22,12 @@ type Writer struct { func OpenWriter(path string, opts Params) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("index open: %w", err) + return nil, fmt.Errorf("write index open: %w", err) } stat, err := f.Stat() if err != nil { - return nil, kleverr.Newf("index stat: %w", err) + return nil, fmt.Errorf("write index stat: %w", err) } return &Writer{ @@ -57,7 +55,7 @@ func (w *Writer) Write(it Item) error { } if n, err := w.f.Write(w.buff); err != nil { - return kleverr.Newf("index write: %w", err) + return fmt.Errorf("write index: %w", err) } else { w.pos += int64(n) } @@ -71,14 +69,14 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("index sync: %w", err) + return fmt.Errorf("write index sync: %w", err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("index close: %w", err) + return fmt.Errorf("write index close: %w", err) } return nil } @@ -109,13 +107,13 @@ func Write(path string, opts Params, index []Item) error { func Read(path string, opts Params) ([]Item, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("index open: %w", err) + return nil, fmt.Errorf("read index open: %w", err) } defer f.Close() stat, err := os.Stat(path) if err != nil { - return nil, kleverr.Newf("index stat: %w", err) + return nil, fmt.Errorf("read index stat: %w", err) } dataSize := stat.Size() @@ -126,7 +124,7 @@ func Read(path string, opts Params) ([]Item, error) { data := make([]byte, dataSize) if _, err = io.ReadFull(f, data); err != nil { - return nil, kleverr.Newf("index read: %w", err) + return nil, fmt.Errorf("read index: %w", err) } var keyOffset = opts.keyOffset() diff --git a/log_test.go b/log_test.go index fa13567..3409cca 100644 --- a/log_test.go +++ b/log_test.go @@ -16,7 +16,6 @@ import ( "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) { @@ -1556,7 +1555,7 @@ func testConcurrentPubsubRecent(t *testing.T) { for ctx.Err() == nil { next, msgs, err := s.Consume(offset, 32) if err != nil { - return kleverr.Newf("could not consume offset %d: %w", offset, err) + return fmt.Errorf("could not consume offset %d: %w", offset, err) } if offset == next { @@ -1591,11 +1590,7 @@ func testConcurrentPubsubRecent(t *testing.T) { return nil }) - err = g.Wait() - if serr := kleverr.Get(err); serr != nil { - fmt.Println(serr.Print()) - } - require.NoError(t, err) + require.NoError(t, g.Wait()) } func testConcurrentConsume(t *testing.T) { @@ -1742,9 +1737,5 @@ func testConcurrentGC(t *testing.T) { return nil }) - err = g.Wait() - if serr := kleverr.Get(err); serr != nil { - fmt.Println(serr.Print()) - } - require.NoError(t, err) + require.NoError(t, g.Wait()) } diff --git a/message/format.go b/message/format.go index a91deab..028677c 100644 --- a/message/format.go +++ b/message/format.go @@ -10,8 +10,6 @@ import ( "time" "golang.org/x/exp/mmap" - - "github.com/klev-dev/kleverr" ) var ErrCorrupted = errors.New("log corrupted") @@ -36,12 +34,12 @@ type Writer struct { func OpenWriter(path string) (*Writer, error) { f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - return nil, kleverr.Newf("log open: %w", err) + return nil, fmt.Errorf("write log open: %w", err) } stat, err := f.Stat() if err != nil { - return nil, kleverr.Newf("log stat: %w", err) + return nil, fmt.Errorf("write log stat: %w", err) } return &Writer{Path: path, f: f, pos: stat.Size()}, nil @@ -74,7 +72,7 @@ func (w *Writer) Write(m Message) (int64, error) { pos := w.pos if n, err := w.f.Write(w.buff); err != nil { - return 0, kleverr.Newf("log write: %w", err) + return 0, fmt.Errorf("write log: %w", err) } else { w.pos += int64(n) } @@ -87,14 +85,14 @@ func (w *Writer) Size() int64 { func (w *Writer) Sync() error { if err := w.f.Sync(); err != nil { - return kleverr.Newf("log sync: %w", err) + return fmt.Errorf("write log sync: %w", err) } return nil } func (w *Writer) Close() error { if err := w.f.Close(); err != nil { - return kleverr.Newf("log close: %w", err) + return fmt.Errorf("write log close: %w", err) } return nil } @@ -115,7 +113,7 @@ type Reader struct { func OpenReader(path string) (*Reader, error) { f, err := os.Open(path) if err != nil { - return nil, kleverr.Newf("log open: %w", err) + return nil, fmt.Errorf("read log open: %w", err) } return &Reader{ @@ -127,7 +125,7 @@ func OpenReader(path string) (*Reader, error) { func OpenReaderMem(path string) (*Reader, error) { f, err := mmap.Open(path) if err != nil { - return nil, kleverr.Newf("log open: %w", err) + return nil, fmt.Errorf("read mem log open: %w", err) } return &Reader{ @@ -176,7 +174,7 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case errors.Is(err, io.ErrUnexpectedEOF): return -1, errShortHeader default: - return -1, kleverr.Newf("header read: %w", err) + return -1, fmt.Errorf("read header: %w", err) } msg.Offset = int64(binary.BigEndian.Uint64(headerBytes[0:])) @@ -200,7 +198,7 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err case errors.Is(err, io.EOF): return -1, errNoMessage default: - return -1, kleverr.Newf("message read: %w", err) + return -1, fmt.Errorf("read message: %w", err) } actualCRC := crc32.Checksum(messageBytes, crc32cTable) @@ -222,11 +220,11 @@ func (r *Reader) read(position int64, msg *Message) (nextPosition int64, err err func (r *Reader) Close() error { if r.ra != nil { if err := r.ra.Close(); err != nil { - return kleverr.Newf("log close: %w", err) + return fmt.Errorf("write mem log close: %w", err) } } else { if err := r.r.Close(); err != nil { - return kleverr.Newf("log close: %w", err) + return fmt.Errorf("write log close: %w", err) } } return nil diff --git a/segment/utils.go b/segment/utils.go index f65e382..d2dd92e 100644 --- a/segment/utils.go +++ b/segment/utils.go @@ -2,17 +2,17 @@ package segment import ( "crypto/rand" + "fmt" "io" "os" - "github.com/klev-dev/kleverr" "github.com/mr-tron/base58" ) func randStr(length int) (string, error) { k := make([]byte, length) if _, err := io.ReadFull(rand.Reader, k); err != nil { - return "", kleverr.Newf("rand read: %w", err) + return "", fmt.Errorf("rand read: %w", err) } return base58.Encode(k), nil } @@ -20,20 +20,20 @@ func randStr(length int) (string, error) { func copyFile(src, dst string) error { fsrc, err := os.Open(src) if err != nil { - return kleverr.Newf("src open: %w", err) + return fmt.Errorf("copy src open: %w", err) } defer fsrc.Close() stat, err := fsrc.Stat() if err != nil { - return kleverr.Newf("src stat: %w", err) + return fmt.Errorf("copy src stat: %w", err) } fdst, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if os.IsExist(err) { switch dstStat, err := os.Stat(dst); { case err != nil: - return kleverr.Newf("dst stat: %w", err) + return fmt.Errorf("copy dst stat: %w", err) case stat.Size() == dstStat.Size() && stat.ModTime().Equal(dstStat.ModTime()): // TODO do we need a safer version of this? return nil @@ -41,25 +41,25 @@ func copyFile(src, dst string) error { fdst, err = os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) } if err != nil { - return kleverr.Newf("dst open: %w", err) + return fmt.Errorf("copy dst open: %w", err) } defer fdst.Close() switch n, err := io.Copy(fdst, fsrc); { case err != nil: - return kleverr.Newf("src/dst copy: %w", err) + return fmt.Errorf("copy: %w", err) case n < stat.Size(): - return kleverr.Newf("src/dst copy: partial (%d/%d)", n, stat.Size()) + return fmt.Errorf("partial copy (%d/%d)", n, stat.Size()) } if err := fdst.Sync(); err != nil { - return kleverr.Newf("dst sync: %w", err) + return fmt.Errorf("copy dst sync: %w", err) } if err := fdst.Close(); err != nil { - return kleverr.Newf("dst close: %w", err) + return fmt.Errorf("copy dst close: %w", err) } if err := os.Chtimes(dst, stat.ModTime(), stat.ModTime()); err != nil { - return kleverr.Newf("dst chtimes: %w", err) + return fmt.Errorf("copy dst chtimes: %w", err) } return nil From a678986148f6f310c0b394b6e95aa6f8442241c8 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 09:27:12 -0400 Subject: [PATCH 5/8] more sentinel errors and kleverr cleanup --- segment/segment.go | 44 +++++++++++++++++++++++--------------------- segment/segments.go | 37 +++++++++++++++++++++---------------- typed_codec.go | 16 +++++++++++----- 3 files changed, 55 insertions(+), 42 deletions(-) diff --git a/segment/segment.go b/segment/segment.go index c4b8e54..ba06b64 100644 --- a/segment/segment.go +++ b/segment/segment.go @@ -12,7 +12,6 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/kleverr" ) type Segment struct { @@ -46,12 +45,12 @@ type Stats struct { func (s Segment) Stat(params index.Params) (Stats, error) { logStat, err := os.Stat(s.Log) if err != nil { - return Stats{}, kleverr.Newf("log stat: %w", err) + return Stats{}, fmt.Errorf("stat log: %w", err) } indexStat, err := os.Stat(s.Index) if err != nil { - return Stats{}, kleverr.Newf("index stat: %w", err) + return Stats{}, fmt.Errorf("stat index: %w", err) } return Stats{ @@ -61,6 +60,9 @@ func (s Segment) Stat(params index.Params) (Stats, error) { }, nil } +var errIndexSize = fmt.Errorf("%w: incorrect size", index.ErrCorrupted) +var errIndexItem = fmt.Errorf("%w: incorrect item", index.ErrCorrupted) + func (s Segment) Check(params index.Params) error { log, err := message.OpenReader(s.Log) if err != nil { @@ -91,11 +93,11 @@ func (s Segment) Check(params index.Params) error { case err != nil: return err case len(logIndex) != len(items): - return kleverr.Newf("segment %d incorrect index size: %w", s.Offset, index.ErrCorrupted) + return errIndexSize default: for i, item := range logIndex { if item != items[i] { - return kleverr.Newf("segment %d mismatch index item: %w", s.Offset, index.ErrCorrupted) + return errIndexItem } } } @@ -150,11 +152,11 @@ func (s Segment) Recover(params index.Params) error { if corrupted { if err := os.Rename(restore.Path, log.Path); err != nil { - return kleverr.Newf("restore rename: %w", err) + return fmt.Errorf("restore log rename: %w", err) } } else { if err := os.Remove(restore.Path); err != nil { - return kleverr.Newf("restore delete: %w", err) + return fmt.Errorf("restore log delete: %w", err) } } @@ -179,7 +181,7 @@ func (s Segment) Recover(params index.Params) error { if corruptedIndex { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("restore remove corrupted index: %w", err) + return fmt.Errorf("restore index delete: %w", err) } } @@ -191,7 +193,7 @@ func (s Segment) NeedsReindex() (bool, error) { case os.IsNotExist(err): return true, nil case err != nil: - return false, kleverr.Newf("index stat: %w", err) + return false, fmt.Errorf("needs reindex stat: %w", err) case info.Size() == 0: return true, nil default: @@ -247,20 +249,20 @@ func (s Segment) ReindexReader(params index.Params, log *message.Reader) ([]inde func (s Segment) Backup(targetDir string) error { logName, err := filepath.Rel(s.Dir, s.Log) if err != nil { - return kleverr.Newf("backup log rel: %w", err) + return fmt.Errorf("backup log rel: %w", err) } targetLog := filepath.Join(targetDir, logName) if err := copyFile(s.Log, targetLog); err != nil { - return kleverr.Newf("backup log copy: %w", err) + return fmt.Errorf("backup log copy: %w", err) } indexName, err := filepath.Rel(s.Dir, s.Index) if err != nil { - return kleverr.Newf("backup index rel: %w", err) + return fmt.Errorf("backup index rel: %w", err) } targetIndex := filepath.Join(targetDir, indexName) if err := copyFile(s.Index, targetIndex); err != nil { - return kleverr.Newf("backup index copy: %w", err) + return fmt.Errorf("backup index copy: %w", err) } return nil @@ -269,7 +271,7 @@ func (s Segment) Backup(targetDir string) error { func (s Segment) ForRewrite() (Segment, error) { randStr, err := randStr(8) if err != nil { - return Segment{}, nil + return Segment{}, err } s.Log = fmt.Sprintf("%s.rewrite.%s", s.Log, randStr) @@ -279,11 +281,11 @@ func (s Segment) ForRewrite() (Segment, error) { func (olds Segment) Rename(news Segment) error { if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("log rename: %w", err) + return fmt.Errorf("rename log rename: %w", err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("index rename: %w", err) + return fmt.Errorf("rename index rename: %w", err) } return nil @@ -292,15 +294,15 @@ func (olds Segment) Rename(news Segment) error { func (olds Segment) Override(news Segment) error { // remove index segment so we don't have invalid index if err := os.Remove(news.Index); err != nil { - return kleverr.Newf("index remove: %w", err) + return fmt.Errorf("override index delete: %w", err) } if err := os.Rename(olds.Log, news.Log); err != nil { - return kleverr.Newf("log rename: %w", err) + return fmt.Errorf("override log rename: %w", err) } if err := os.Rename(olds.Index, news.Index); err != nil { - return kleverr.Newf("index rename: %w", err) + return fmt.Errorf("override index rename: %w", err) } return nil @@ -308,10 +310,10 @@ func (olds Segment) Override(news Segment) error { func (s Segment) Remove() error { if err := os.Remove(s.Index); err != nil { - return kleverr.Newf("index remove: %w", err) + return fmt.Errorf("remove index delete: %w", err) } if err := os.Remove(s.Log); err != nil { - return kleverr.Newf("log remove: %w", err) + return fmt.Errorf("remove log delete: %w", err) } return nil } diff --git a/segment/segments.go b/segment/segments.go index 9fea6cc..e29e4d0 100644 --- a/segment/segments.go +++ b/segment/segments.go @@ -2,18 +2,18 @@ package segment import ( "errors" + "fmt" "os" "strconv" "strings" "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/kleverr" ) func Find(dir string) ([]Segment, error) { files, err := os.ReadDir(dir) if err != nil { - return nil, kleverr.Newf("read dir: %w", err) + return nil, fmt.Errorf("find read dir: %w", err) } var segments []Segment @@ -23,7 +23,7 @@ func Find(dir string) ([]Segment, error) { offset, err := strconv.ParseInt(offsetStr, 10, 64) if err != nil { - return nil, kleverr.Newf("parse offset: %w", err) + return nil, fmt.Errorf("find parse offset: %w", err) } segments = append(segments, New(dir, offset)) @@ -49,7 +49,7 @@ func Stat(segments []Segment, params index.Params) (Stats, error) { for _, seg := range segments { segStat, err := seg.Stat(params) if err != nil { - return Stats{}, err + return Stats{}, fmt.Errorf("stat %d: %w", seg.Offset, err) } total.Segments += segStat.Segments @@ -68,8 +68,11 @@ func CheckDir(dir string, params index.Params) error { case len(segments) == 0: return nil default: - s := segments[len(segments)-1] - return s.Check(params) + seg := segments[len(segments)-1] + if err := seg.Check(params); err != nil { + return fmt.Errorf("check %d: %w", seg.Offset, err) + } + return nil } } @@ -82,31 +85,33 @@ func RecoverDir(dir string, params index.Params) error { case len(segments) == 0: return nil default: - s := segments[len(segments)-1] - return s.Recover(params) + seg := segments[len(segments)-1] + if err := seg.Recover(params); err != nil { + return fmt.Errorf("recover %d: %w", seg.Offset, err) + } + return nil } } func BackupDir(dir, target string) error { - segments, err := Find(dir) - switch { + switch segments, err := Find(dir); { case errors.Is(err, os.ErrNotExist): return nil case err != nil: return err - } + default: + if err := os.MkdirAll(target, 0700); err != nil { + return fmt.Errorf("backup dir create: %w", err) + } - if err := os.MkdirAll(target, 0700); err != nil { - return kleverr.Newf("backup create dir: %w", err) + return Backup(segments, target) } - - return Backup(segments, target) } func Backup(segments []Segment, dir string) error { for _, seg := range segments { if err := seg.Backup(dir); err != nil { - return err + return fmt.Errorf("backup %d: %w", seg.Offset, err) } } diff --git a/typed_codec.go b/typed_codec.go index fcc7441..cfa4e5d 100644 --- a/typed_codec.go +++ b/typed_codec.go @@ -3,8 +3,7 @@ package klevdb import ( "encoding/binary" "encoding/json" - - "github.com/klev-dev/kleverr" + "errors" ) // Codec is interface satisfied by all codecs @@ -75,15 +74,22 @@ func (c varintCodec) Encode(t int64, empty bool) ([]byte, error) { return binary.AppendVarint(nil, t), nil } +var errShortBuffer = errors.New("varint: short buffer") +var errOverflow = errors.New("varint: overflow") + func (c varintCodec) Decode(b []byte) (int64, bool, error) { if b == nil { return 0, true, nil } t, n := binary.Varint(b) - if n <= 0 { - return 0, true, kleverr.Newf("invalid varint: %d", n) + switch { + case n == 0: + return 0, true, errShortBuffer + case n < 0: + return 0, true, errOverflow + default: + return t, false, nil } - return t, false, nil } // VarintCodec supports coding integers as varint From 7978137c25ca18fc556b486cddbe328af658a9e9 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 09:52:50 -0400 Subject: [PATCH 6/8] remove last usages of kleverr --- go.mod | 1 - go.sum | 2 -- log.go | 34 ++++++++++++++++++++-------------- notify.go | 8 +++----- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 8784725..96ffefc 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ toolchain go1.23.1 require ( github.com/gofrs/flock v0.12.1 - github.com/klev-dev/kleverr v0.1.0 github.com/mr-tron/base58 v1.2.0 github.com/plar/go-adaptive-radix-tree/v2 v2.0.3 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index de81f44..ef297f7 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E= github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0= -github.com/klev-dev/kleverr v0.1.0 h1:UnBDKFlHFy6bnN5M/fQ3uCI4G91ciCf1jX3dj1EqL9k= -github.com/klev-dev/kleverr v0.1.0/go.mod h1:DV1tEcfsgAzKraeb/7nux27wOJs8w9P8fLB6GT7DmGM= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/log.go b/log.go index 024ccb7..f237fae 100644 --- a/log.go +++ b/log.go @@ -2,6 +2,7 @@ package klevdb import ( "errors" + "fmt" "os" "path/filepath" "sync" @@ -14,9 +15,14 @@ import ( "github.com/klev-dev/klevdb/index" "github.com/klev-dev/klevdb/message" "github.com/klev-dev/klevdb/segment" - "github.com/klev-dev/kleverr" ) +var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex) +var errKeyNotFound = fmt.Errorf("key %w", ErrNotFound) +var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex) +var errTimeNotFound = fmt.Errorf("time %w", ErrNotFound) +var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset) + // Open create a log based on a dir and set of options func Open(dir string, opts Options) (result Log, err error) { if opts.Rollover == 0 { @@ -25,7 +31,7 @@ func Open(dir string, opts Options) (result Log, err error) { if opts.CreateDirs { if err := os.MkdirAll(dir, 0700); err != nil { - return nil, kleverr.Newf("could not create log dirs: %w", err) + return nil, fmt.Errorf("open create dirs: %w", err) } } @@ -33,22 +39,22 @@ func Open(dir string, opts Options) (result Log, err error) { if opts.Readonly { switch ok, err := lock.TryRLock(); { case err != nil: - return nil, kleverr.Newf("could not lock: %w", err) + return nil, fmt.Errorf("open read lock: %w", err) case !ok: - return nil, kleverr.Newf("log already writing locked") + return nil, fmt.Errorf("open already writing locked") } } else { switch ok, err := lock.TryLock(); { case err != nil: - return nil, kleverr.Newf("could not lock: %w", err) + return nil, fmt.Errorf("open lock: %w", err) case !ok: - return nil, kleverr.Newf("log already locked") + return nil, fmt.Errorf("open already locked") } } defer func() { if err != nil { if lerr := lock.Unlock(); lerr != nil { - err = kleverr.Newf("%w: could not release lock: %w", err, lerr) + err = fmt.Errorf("%w: open release lock: %w", err, lerr) } } }() @@ -206,7 +212,7 @@ func (l *log) Consume(offset int64, maxCount int64) (int64, []message.Message, e func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []message.Message, error) { if !l.opts.KeyIndex { - return OffsetInvalid, nil, kleverr.Newf("%w by key", ErrNoIndex) + return OffsetInvalid, nil, errNoKeyIndex } hash := index.KeyHashEncoded(index.KeyHash(key)) @@ -247,7 +253,7 @@ func (l *log) Get(offset int64) (message.Message, error) { func (l *log) GetByKey(key []byte) (message.Message, error) { if !l.opts.KeyIndex { - return message.Invalid, kleverr.Newf("%w by key", ErrNoIndex) + return message.Invalid, errNoKeyIndex } hash := index.KeyHashEncoded(index.KeyHash(key)) @@ -269,7 +275,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) { } // not in any segment, so just return the error - return message.Invalid, kleverr.Newf("key %w", message.ErrNotFound) + return message.Invalid, errKeyNotFound } func (l *log) OffsetByKey(key []byte) (int64, error) { @@ -282,7 +288,7 @@ func (l *log) OffsetByKey(key []byte) (int64, error) { func (l *log) GetByTime(start time.Time) (message.Message, error) { if !l.opts.TimeIndex { - return message.Invalid, kleverr.Newf("%w by time", ErrNoIndex) + return message.Invalid, errNoTimeIndex } ts := start.UnixMicro() @@ -314,7 +320,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) { } } - return message.Invalid, kleverr.Newf("time %w", message.ErrNotFound) + return message.Invalid, errTimeNotFound } func (l *log) OffsetByTime(start time.Time) (int64, time.Time, error) { @@ -419,7 +425,7 @@ func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) { lowestOffset := orderedOffsets[0] if lowestOffset < 0 { - return nil, kleverr.Newf("%w: cannot delete relative offsets", message.ErrInvalidOffset) + return nil, errDeleteRelative } l.readersMu.RLock() @@ -544,7 +550,7 @@ func (l *log) Close() error { } if err := l.lock.Unlock(); err != nil { - return kleverr.Newf("could not release lock: %w", err) + return fmt.Errorf("close unlock: %w", err) } return nil diff --git a/notify.go b/notify.go index 74f69e7..361801a 100644 --- a/notify.go +++ b/notify.go @@ -4,8 +4,6 @@ import ( "context" "errors" "sync/atomic" - - "github.com/klev-dev/kleverr" ) var ErrOffsetNotifyClosed = errors.New("offset notify already closed") @@ -36,7 +34,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { b, ok := <-w.barrier if !ok { // already closed, return error - return kleverr.Ret(ErrOffsetNotifyClosed) + return ErrOffsetNotifyClosed } // probe the current offset @@ -55,7 +53,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { case <-b: return nil case <-ctx.Done(): - return kleverr.Ret(ctx.Err()) + return ctx.Err() } } @@ -84,7 +82,7 @@ func (w *OffsetNotify) Close() error { b, ok := <-w.barrier if !ok { // already closed, return an error - return kleverr.Ret(ErrOffsetNotifyClosed) + return ErrOffsetNotifyClosed } // close the current barrier, e.g. broadcasting update From 7074a0e7f75180aef874a53948bbd08069789295 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 18:05:22 -0400 Subject: [PATCH 7/8] update libs --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 0296b4f..06f2b97 100644 --- a/Makefile +++ b/Makefile @@ -32,5 +32,5 @@ bench-multi: .PHONY: update-libs update-libs: - go get -u github.com/klev-dev/kleverr@main + go get -u ./... go mod tidy From f087b4e91c478bbc9829e2d6b4c83b0582ff754e Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 11 Mar 2025 18:19:12 -0400 Subject: [PATCH 8/8] explicit break on no error --- reader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/reader.go b/reader.go index 208e518..fd8749f 100644 --- a/reader.go +++ b/reader.go @@ -134,6 +134,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6 positions, err := index.Keys(keyHash) switch { case err == nil: + break case err == message.ErrNotFound: nextOffset, err := index.GetNextOffset() if err != nil {