From 0f1f04bd7acfb53effb841378874e6d085a290a8 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Wed, 11 Feb 2026 07:06:23 -0500 Subject: [PATCH] expose more typed funcs --- typed.go | 42 ++++++++++++++++++++++++++++-------------- typed_blocking.go | 4 ++-- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/typed.go b/typed.go index 8fb3607..d63a2d3 100644 --- a/typed.go +++ b/typed.go @@ -13,46 +13,52 @@ type TMessage[K any, V any] struct { // TLog is a typed log type TLog[K any, V any] interface { - // Publish see Log.Publish + // Publish see [Log.Publish] Publish(messages []TMessage[K, V]) (nextOffset int64, err error) - // NextOffset see Log.NextOffset + // NextOffset see [Log.NextOffset] NextOffset() (nextOffset int64, err error) - // Consume see Log.Consume + // Consume see [Log.Consume] Consume(offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) - // ConsumeByKey see Log.ConsumeByKey + // ConsumeByKey see [Log.ConsumeByKey] ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) - // Get see Log.Get + // Get see [Log.Get] Get(offset int64) (message TMessage[K, V], err error) - // GetByKey see Log.GetByKey + // GetByKey see [Log.GetByKey] GetByKey(key K, empty bool) (message TMessage[K, V], err error) - // GetByTime see Log.GetByTime + // OffsetByKey see [Lot.OffsetByKey] + OffsetByKey(key K, empty bool) (int64, error) + + // GetByTime see [Log.GetByTime] GetByTime(start time.Time) (message TMessage[K, V], err error) - // Delete see Log.Delete + // OffsetByTime see [Lot.OffsetByTime] + OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error) + + // Delete see [Log.Delete] Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) - // Size see Log.Size + // Size see [Log.Size] Size(m Message) int64 - // Stat see Log.Stat + // Stat see [Log.Stat] Stat() (Stats, error) - // Backup see Log.Backup + // Backup see [Log.Backup] Backup(dir string) error - // Sync see Log.Sync + // Sync see [Log.Sync] Sync() (nextOffset int64, err error) - // GC see Log.GC + // GC see [Log.GC] GC(unusedFor time.Duration) error - // Close see Log.Close + // Close see [Log.Close] Close() error // Raw returns the wrapped in log @@ -156,6 +162,14 @@ func (l *tlog[K, V]) GetByKey(key K, empty bool) (TMessage[K, V], error) { return l.decode(msg) } +func (l *tlog[K, V]) OffsetByKey(key K, empty bool) (int64, error) { + kbytes, err := l.keyCodec.Encode(key, empty) + if err != nil { + return OffsetInvalid, err + } + return l.Log.OffsetByKey(kbytes) +} + func (l *tlog[K, V]) GetByTime(start time.Time) (TMessage[K, V], error) { msg, err := l.Log.GetByTime(start) if err != nil { diff --git a/typed_blocking.go b/typed_blocking.go index 974fdf8..7df734e 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -6,10 +6,10 @@ import "context" type TBlockingLog[K any, V any] interface { TLog[K, V] - // ConsumeBlocking see BlockingLog.ConsumeBlocking + // ConsumeBlocking see [BlockingLog.ConsumeBlocking] ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) - // ConsumeByKeyBlocking see BlockingLog.ConsumeByKeyBlocking + // ConsumeByKeyBlocking see [BlockingLog.ConsumeByKeyBlocking] ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error) }