diff --git a/blocking.go b/blocking.go index b18eb26..51701c6 100644 --- a/blocking.go +++ b/blocking.go @@ -1,6 +1,10 @@ package klevdb -import "context" +import ( + "context" + + "github.com/klev-dev/klevdb/notify" +) // BlockingLog enhances [Log] adding blocking consume type BlockingLog interface { @@ -28,12 +32,12 @@ func WrapBlocking(l Log) (BlockingLog, error) { if err != nil { return nil, err } - return &blockingLog{l, NewOffsetNotify(next)}, nil + return &blockingLog{l, notify.NewOffset(next)}, nil } type blockingLog struct { Log - notify *OffsetNotify + notify *notify.Offset } func (l *blockingLog) Publish(messages []Message) (int64, error) { diff --git a/notify.go b/notify/notify.go similarity index 83% rename from notify.go rename to notify/notify.go index 361801a..fdd7992 100644 --- a/notify.go +++ b/notify/notify.go @@ -1,4 +1,4 @@ -package klevdb +package notify import ( "context" @@ -8,13 +8,13 @@ import ( var ErrOffsetNotifyClosed = errors.New("offset notify already closed") -type OffsetNotify struct { +type Offset struct { nextOffset atomic.Int64 barrier chan chan struct{} } -func NewOffsetNotify(nextOffset int64) *OffsetNotify { - w := &OffsetNotify{ +func NewOffset(nextOffset int64) *Offset { + w := &Offset{ barrier: make(chan chan struct{}, 1), } @@ -24,7 +24,7 @@ func NewOffsetNotify(nextOffset int64) *OffsetNotify { return w } -func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { +func (w *Offset) Wait(ctx context.Context, offset int64) error { // quick path, just load and check if w.nextOffset.Load() > offset { return nil @@ -57,7 +57,7 @@ func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error { } } -func (w *OffsetNotify) Set(nextOffset int64) { +func (w *Offset) Set(nextOffset int64) { // acquire current barrier b, ok := <-w.barrier if !ok { @@ -77,7 +77,7 @@ func (w *OffsetNotify) Set(nextOffset int64) { w.barrier <- make(chan struct{}) } -func (w *OffsetNotify) Close() error { +func (w *Offset) Close() error { // acquire current barrier b, ok := <-w.barrier if !ok { diff --git a/notify_test.go b/notify/notify_test.go similarity index 90% rename from notify_test.go rename to notify/notify_test.go index ef298a1..3124d7f 100644 --- a/notify_test.go +++ b/notify/notify_test.go @@ -1,4 +1,4 @@ -package klevdb +package notify import ( "context" @@ -11,14 +11,14 @@ import ( func TestNotify(t *testing.T) { t.Run("unblock", func(t *testing.T) { - n := NewOffsetNotify(10) + n := NewOffset(10) err := n.Wait(context.TODO(), 5) require.NoError(t, err) }) t.Run("blocked", func(t *testing.T) { - n := NewOffsetNotify(10) + n := NewOffset(10) ch := make(chan struct{}) var wg sync.WaitGroup @@ -38,7 +38,7 @@ func TestNotify(t *testing.T) { }) t.Run("cancel", func(t *testing.T) { - n := NewOffsetNotify(10) + n := NewOffset(10) ch := make(chan struct{}) var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.TODO()) @@ -60,7 +60,7 @@ func TestNotify(t *testing.T) { }) t.Run("close", func(t *testing.T) { - n := NewOffsetNotify(10) + n := NewOffset(10) ch := make(chan struct{}) var wg sync.WaitGroup @@ -80,7 +80,7 @@ func TestNotify(t *testing.T) { }) t.Run("close_err", func(t *testing.T) { - n := NewOffsetNotify(10) + n := NewOffset(10) err := n.Close() require.NoError(t, err) diff --git a/typed_blocking.go b/typed_blocking.go index 8db0b08..9f3d7f7 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -1,6 +1,10 @@ package klevdb -import "context" +import ( + "context" + + "github.com/klev-dev/klevdb/notify" +) // TBlockingLog enhances [TLog] adding blocking consume type TBlockingLog[K any, V any] interface { @@ -28,12 +32,12 @@ func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error) { if err != nil { return nil, err } - return &tlogBlocking[K, V]{l, NewOffsetNotify(next)}, nil + return &tlogBlocking[K, V]{l, notify.NewOffset(next)}, nil } type tlogBlocking[K any, V any] struct { TLog[K, V] - notify *OffsetNotify + notify *notify.Offset } func (l *tlogBlocking[K, V]) Publish(tmessages []TMessage[K, V]) (int64, error) {