Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package klevdb

import "context"

// BlockingLog enhances log adding blocking consume
// BlockingLog enhances [Log] adding blocking consume
type BlockingLog interface {
Log

// ConsumeBlocking is similar to Consume, but if offset is equal to the next offset it will block until next message is produced
// ConsumeBlocking is similar to [Consume], but if offset is equal to the next offset it will block until next message is produced
ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

// ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key
// ConsumeByKeyBlocking is similar to [ConsumeBlocking], but only returns messages matching the key
ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
}

// OpenBlocking opens log and wraps it with support for blocking consume
// OpenBlocking opens a [Log] and wraps it with support for blocking consume
func OpenBlocking(dir string, opts Options) (BlockingLog, error) {
l, err := Open(dir, opts)
if err != nil {
Expand All @@ -22,7 +22,7 @@ func OpenBlocking(dir string, opts Options) (BlockingLog, error) {
return WrapBlocking(l)
}

// WrapBlocking wraps log with support for blocking consume
// WrapBlocking wraps a [Log] with support for blocking consume
func WrapBlocking(l Log) (BlockingLog, error) {
next, err := l.NextOffset()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion compact/deletes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package compact

import (
"context"
"slices"
"testing"
"time"

"github.com/klev-dev/klevdb"
"github.com/klev-dev/klevdb/message"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)

func TestDeletes(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion compact/updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package compact

import (
"context"
"slices"
"testing"
"time"

"github.com/klev-dev/klevdb"
"github.com/klev-dev/klevdb/message"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)

func TestUpdates(t *testing.T) {
Expand Down
19 changes: 7 additions & 12 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package klevdb

import (
"context"
"maps"
"time"

"golang.org/x/exp/maps"
)

// DeleteMultiBackoff is call on each iteration of
// DeleteMulti to give applications opportunity to not overload
// the target log with deletes
// DeleteMultiBackoff is call on each iteration of [DeleteMulti] to give applications
// opportunity to not overload the target log with deletes
type DeleteMultiBackoff func(context.Context) error

// DeleteMultiWithWait returns a backoff func that sleeps/waits
Expand All @@ -27,16 +25,13 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff {
}

// DeleteMulti tries to delete all messages with offsets
//
// from the log and returns the amount of storage deleted
// from the log and returns the amount of storage deleted
//
// If error is encountered, it will return the deleted offsets
// and size, together with the error
//
// and size, together with the error
//
// DeleteMultiBackoff is called on each iteration to give
//
// others a chanse to work with the log, while being deleted
// [DeleteMultiBackoff] is called on each iteration to give
// others a chanse to work with the log, while being deleted
func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
var deletedOffsets = map[int64]struct{}{}
var deletedSize int64
Expand Down
8 changes: 4 additions & 4 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package klevdb
import (
"errors"
"fmt"
"maps"
"os"
"path/filepath"
"slices"
"sync"
"time"

"github.com/gofrs/flock"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

"github.com/klev-dev/klevdb/index"
"github.com/klev-dev/klevdb/message"
Expand All @@ -23,7 +23,7 @@ var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex)
var errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound)
var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset)

// Open create a log based on a dir and set of options
// Open opens or creates a [Log] based on a dir and set of options
func Open(dir string, opts Options) (result Log, err error) {
if opts.Rollover == 0 {
opts.Rollover = 1024 * 1024
Expand Down Expand Up @@ -423,7 +423,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
}

func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) {
orderedOffsets := maps.Keys(offsets)
orderedOffsets := slices.Collect(maps.Keys(offsets))
slices.Sort(orderedOffsets)
lowestOffset := orderedOffsets[0]

Expand Down
7 changes: 3 additions & 4 deletions segment/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import (
"errors"
"fmt"
"io"
"maps"
"os"
"path/filepath"

"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"slices"

"github.com/klev-dev/klevdb/index"
"github.com/klev-dev/klevdb/message"
Expand Down Expand Up @@ -328,7 +327,7 @@ type RewriteSegment struct {
}

func (r *RewriteSegment) GetNewSegment() Segment {
orderedOffsets := maps.Keys(r.SurviveOffsets)
orderedOffsets := slices.Collect(maps.Keys(r.SurviveOffsets))
slices.Sort(orderedOffsets)
lowestOffset := orderedOffsets[0]
return New(r.Segment.Dir, lowestOffset)
Expand Down
3 changes: 2 additions & 1 deletion typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package klevdb

import "time"

// TMessage represents a typed [Message]
type TMessage[K any, V any] struct {
Offset int64
Time time.Time
Expand All @@ -11,7 +12,7 @@ type TMessage[K any, V any] struct {
ValueEmpty bool
}

// TLog is a typed log
// TLog is a typed [Log] which encodes/decodes keys and values to bytes.
type TLog[K any, V any] interface {
// Publish see [Log.Publish]
Publish(messages []TMessage[K, V]) (nextOffset int64, err error)
Expand Down
6 changes: 3 additions & 3 deletions typed_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package klevdb

import "context"

// TBlockingLog enhances tlog adding blocking consume
// TBlockingLog enhances [TLog] adding blocking consume
type TBlockingLog[K any, V any] interface {
TLog[K, V]

Expand All @@ -13,7 +13,7 @@ type TBlockingLog[K any, V any] interface {
ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)
}

// OpenBlocking opens tlog and wraps it with support for blocking consume
// OpenTBlocking opens tlog and wraps it with support for blocking consume
func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error) {
l, err := OpenT(dir, opts, keyCodec, valueCodec)
if err != nil {
Expand All @@ -22,7 +22,7 @@ func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], va
return WrapTBlocking(l)
}

// WrapBlocking wraps tlog with support for blocking consume
// WrapTBlocking wraps tlog with support for blocking consume
func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) {
next, err := l.NextOffset()
if err != nil {
Expand Down