Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,52 @@ type TMessage[K any, V any] struct {

// TLog is a typed log
type TLog[K any, V any] interface {
// Publish see Log.Publish
// Publish see [Log.Publish]
Publish(messages []TMessage[K, V]) (nextOffset int64, err error)

// NextOffset see Log.NextOffset
// NextOffset see [Log.NextOffset]
NextOffset() (nextOffset int64, err error)

// Consume see Log.Consume
// Consume see [Log.Consume]
Consume(offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

// ConsumeByKey see Log.ConsumeByKey
// ConsumeByKey see [Log.ConsumeByKey]
ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

// Get see Log.Get
// Get see [Log.Get]
Get(offset int64) (message TMessage[K, V], err error)

// GetByKey see Log.GetByKey
// GetByKey see [Log.GetByKey]
GetByKey(key K, empty bool) (message TMessage[K, V], err error)

// GetByTime see Log.GetByTime
// OffsetByKey see [Lot.OffsetByKey]
OffsetByKey(key K, empty bool) (int64, error)

// GetByTime see [Log.GetByTime]
GetByTime(start time.Time) (message TMessage[K, V], err error)

// Delete see Log.Delete
// OffsetByTime see [Lot.OffsetByTime]
OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error)

// Delete see [Log.Delete]
Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error)

// Size see Log.Size
// Size see [Log.Size]
Size(m Message) int64

// Stat see Log.Stat
// Stat see [Log.Stat]
Stat() (Stats, error)

// Backup see Log.Backup
// Backup see [Log.Backup]
Backup(dir string) error

// Sync see Log.Sync
// Sync see [Log.Sync]
Sync() (nextOffset int64, err error)

// GC see Log.GC
// GC see [Log.GC]
GC(unusedFor time.Duration) error

// Close see Log.Close
// Close see [Log.Close]
Close() error

// Raw returns the wrapped in log
Expand Down Expand Up @@ -156,6 +162,14 @@ func (l *tlog[K, V]) GetByKey(key K, empty bool) (TMessage[K, V], error) {
return l.decode(msg)
}

func (l *tlog[K, V]) OffsetByKey(key K, empty bool) (int64, error) {
kbytes, err := l.keyCodec.Encode(key, empty)
if err != nil {
return OffsetInvalid, err
}
return l.Log.OffsetByKey(kbytes)
}

func (l *tlog[K, V]) GetByTime(start time.Time) (TMessage[K, V], error) {
msg, err := l.Log.GetByTime(start)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions typed_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import "context"
type TBlockingLog[K any, V any] interface {
TLog[K, V]

// ConsumeBlocking see BlockingLog.ConsumeBlocking
// ConsumeBlocking see [BlockingLog.ConsumeBlocking]
ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

// ConsumeByKeyBlocking see BlockingLog.ConsumeByKeyBlocking
// ConsumeByKeyBlocking see [BlockingLog.ConsumeByKeyBlocking]
ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)
}

Expand Down