Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion database/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
)

type (
dbClient struct {
dbOperationReporter func(context.Context, operationType, error)
dbClient struct {
db *connector.DB
operationReporter dbOperationReporter
rollbackableEvents *xsync.Map[eventHash, *databaseRollbackRequest]
relayPrivateKey string
relayURL string
Expand Down Expand Up @@ -53,6 +55,8 @@ func openDatabase(ctx context.Context, writeURLs []string, readURLs []string, ru
client := &dbClient{
rollbackableEvents: xsync.NewMap[eventHash, *databaseRollbackRequest](),
hasReadURLs: len(readURLs) > 0,
operationReporter: func(context.Context, operationType, error) {
},
}
options := []connector.Option{
connector.WithFieldNameMapper(func(in string) string {
Expand Down Expand Up @@ -83,6 +87,11 @@ func openDatabase(ctx context.Context, writeURLs []string, readURLs []string, ru
return client
}

// WithOperationReporter installs the callback that receives the outcome of
// tracked read and write operations, and returns the client to allow chaining.
//
// A nil reporter disables reporting: it is replaced with a no-op callback so
// the client can keep invoking operationReporter unconditionally instead of
// panicking on a nil function value.
func (client *dbClient) WithOperationReporter(reporter dbOperationReporter) *dbClient {
	if reporter == nil {
		reporter = func(context.Context, operationType, error) {}
	}
	client.operationReporter = reporter

	return client
}

func (client *dbClient) Close() (err error) {
if client.db != nil {
err = errors.Join(err, client.db.Close())
Expand Down
15 changes: 13 additions & 2 deletions database/query/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (

var (
globalDB struct {
Client *dbClient
Once sync.Once
Client *dbClient
Tracker *statusTracker
Once sync.Once
}
UsedDatabaseStorage atomic.Uint64
)
Expand Down Expand Up @@ -134,8 +135,10 @@ func MustInit(ctx context.Context, opts ...Option) {
log.Warn().Msg("database DDL execution is disabled")
}

globalDB.Tracker = newStatusTracker()
globalDB.Client = openDatabase(ctx, conf.WriteURLs, conf.ReadURLs, conf.RunDDL).
WithPrivateKey(conf.PrivateKey).
WithOperationReporter(globalDB.Tracker.Submit).
WithRelayURL(conf.RelayURL)

if !conf.DisableSelfTest {
Expand All @@ -147,6 +150,7 @@ func MustInit(ctx context.Context, opts ...Option) {
}
}

globalDB.Tracker.Start(ctx)
if !globalDB.Client.hasReadURLs {
go globalDB.Client.StartExpiredEventsCleanup(ctx)
} else {
Expand Down Expand Up @@ -275,3 +279,10 @@ func (db *dbClient) StartCollectingUsedDatabaseStorage(ctx context.Context) {
// CollectDeviceRegistrationEvents returns the iterator produced by the global
// database client's collectDeviceRegistrationEvents for the given context.
func CollectDeviceRegistrationEvents(ctx context.Context) EventIterator {
	client := globalDB.Client

	return client.collectDeviceRegistrationEvents(ctx)
}

// GetStatusReport returns the current status snapshot from the global status
// tracker. It returns an error when the global tracker has not been created,
// and otherwise forwards whatever the tracker's Get returns.
func GetStatusReport(ctx context.Context) (*Status, error) {
	if tracker := globalDB.Tracker; tracker != nil {
		return tracker.Get(ctx)
	}

	return nil, errors.New("database status tracker is not initialized")
}
13 changes: 13 additions & 0 deletions database/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ func (db *dbClient) executeBatch(ctx context.Context, req *databaseBatchRequest)
if err == nil && (!eventsToRollback.Empty() || len(eventsToRollback.ReplaceableEvents) > 0) {
db.rollbackableEvents.Store(*req.EventsHash, &eventsToRollback)
}
db.trackEventWriteError(ctx, err)
return err
}

Expand Down Expand Up @@ -1238,6 +1239,7 @@ func (db *dbClient) SelectEvents(ctx context.Context, filters ...model.Filter) E
if errors.Is(err, connector.ErrNotFound) {
err = nil
}
db.trackEventReadError(ctx, err)
yield(nil, errors.Wrap(err, "failed to select events"))
return
}
Expand Down Expand Up @@ -1697,3 +1699,14 @@ func startPeriodicSelfTest(ctx context.Context, writeURLs []string, readURLs []s
}
}()
}

// trackEventReadError reports the outcome of an event read to the configured
// operation reporter. An error matching ErrRaceCondition is reported as a
// success (nil); every other value, including nil, is passed through as is.
func (db *dbClient) trackEventReadError(ctx context.Context, err error) {
	reported := err
	if errors.Is(err, ErrRaceCondition) {
		reported = nil
	}

	db.operationReporter(ctx, operationTypeRead, reported)
}

// trackEventWriteError reports the outcome of an event write to the configured
// operation reporter. The error is forwarded unchanged; nil means success.
func (db *dbClient) trackEventWriteError(ctx context.Context, err error) {
	report := db.operationReporter
	report(ctx, operationTypeWrite, err)
}
159 changes: 159 additions & 0 deletions database/query/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// SPDX-License-Identifier: ice License 1.0

package query

import (
"context"
"time"

"github.com/rs/zerolog/log"

"github.com/ice-blockchain/subzero/tracing/statefsm"
)

// Parameters handed to statefsm.New for both the read and the write state machine.
const (
// Threshold for consecutive failed operations before marking the database as unhealthy.
// NOTE(review): the exact trip/recover rule lives in statefsm — confirm there.
consecutiveOperationThreshold = 3
// Time window to consider operations as consecutive.
consecutiveWindow = time.Minute
)

type (
// Status is the snapshot of the tracker state that the worker sends in reply
// to a request made through statusTracker.Get.
Status struct {
LastWrite time.Time // Timestamp of the last event write operation.
LastRead time.Time // Timestamp of the last event read operation.
InReadErrorState bool // Indicates if the last N read operations failed.
InWriteErrorState bool // Indicates if the last N write operations failed.
}

// operationType tells the worker whether a report describes a read or a write.
operationType int

// operationStatus is a single operation report queued for the worker.
operationStatus struct {
// Timestamp is when the report was created (set in submitOp, in UTC).
Timestamp time.Time
// Err is the operation result; the worker treats any non-nil value as a failure.
Err error
// Type selects which state machine and which "last" timestamp the report updates.
Type operationType
// Ack is optional. When non-nil, the worker closes it after processing the report.
Ack chan struct{}
}
// statusTracker feeds operation reports and status requests to a single worker goroutine.
statusTracker struct {
// In carries operation reports to the worker.
In chan operationStatus
// Req carries reply channels; the worker answers each with one Status value.
Req chan chan Status
}
)

// Known operation types. Values start at 1, so the zero operationType matches
// neither constant and is logged by the worker as an unknown operation type.
const (
// operationTypeRead marks a report about a read operation.
operationTypeRead operationType = iota + 1
// operationTypeWrite marks a report about a write operation.
operationTypeWrite
)

// newStatusTracker allocates a tracker with buffered report and request
// queues. No goroutine is started here; call Start to begin processing.
func newStatusTracker() *statusTracker {
	const (
		reportQueueSize  = 100 // capacity of the operation report queue
		requestQueueSize = 10  // capacity of the status request queue
	)

	tracker := new(statusTracker)
	tracker.In = make(chan operationStatus, reportQueueSize)
	tracker.Req = make(chan chan Status, requestQueueSize)

	return tracker
}

// Start launches the tracker worker in its own goroutine and returns
// immediately. The worker exits once ctx is cancelled.
func (st *statusTracker) Start(ctx context.Context) {
	go func() {
		st.worker(ctx)
	}()
}

// worker is the tracker's event loop. It owns all mutable state (the last
// read/write timestamps and the two state machines), so no locking is used:
// reports arriving on In update the state, and every reply channel arriving
// on Req is answered with a snapshot of it. The loop ends when ctx is cancelled.
func (st *statusTracker) worker(ctx context.Context) {
	var (
		lastRead, lastWrite time.Time

		reads  = statefsm.New(consecutiveWindow, consecutiveOperationThreshold)
		writes = statefsm.New(consecutiveWindow, consecutiveOperationThreshold)
	)

	// record applies a single report to the tracked state and then releases
	// a synchronous submitter, if one is waiting on the Ack channel.
	record := func(op operationStatus) {
		failed := op.Err != nil

		switch op.Type {
		case operationTypeRead:
			lastRead = op.Timestamp
			reads.Push(failed, op.Timestamp)

		case operationTypeWrite:
			lastWrite = op.Timestamp
			writes.Push(failed, op.Timestamp)

		default:
			log.Warn().Str("context", "db-tracker").Int("operation_type", int(op.Type)).Msg("unknown operation type")
		}

		if op.Ack != nil {
			close(op.Ack)
		}
	}

	// snapshot captures the current state as a Status value.
	snapshot := func() Status {
		return Status{
			LastRead:          lastRead,
			LastWrite:         lastWrite,
			InReadErrorState:  reads.InError(),
			InWriteErrorState: writes.InError(),
		}
	}

	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return

		case op := <-st.In:
			record(op)

		case reply := <-st.Req:
			// Do not get stuck on a requester that never reads its reply.
			select {
			case reply <- snapshot():
			case <-ctx.Done():
				return
			}
		}
	}
}

// Submit queues an operation report without waiting for the worker to process
// it. The report is silently dropped when it cannot be queued immediately.
func (st *statusTracker) Submit(ctx context.Context, opType operationType, err error) {
	const async = true

	_ = st.submitOp(ctx, opType, async, err)
}

// SubmitSync queues an operation report and waits until the worker has
// processed it. It returns false when ctx is cancelled before that happens.
func (st *statusTracker) SubmitSync(ctx context.Context, opType operationType, err error) bool {
	const async = false

	return st.submitOp(ctx, opType, async, err)
}

// submitOp builds a report stamped with the current UTC time and hands it to
// the worker.
//
// In async mode the send never blocks: the result is true only when the report
// was queued, and false when the queue is full or the cancelled-context case
// was selected. When ctx is already done and the queue has room, both cases
// are ready and select picks one of them at random.
//
// In sync mode the call blocks until the worker has closed the report's Ack
// channel (true) or ctx is cancelled at either the queueing or the waiting
// stage (false).
func (st *statusTracker) submitOp(ctx context.Context, opType operationType, async bool, err error) bool {
	op := operationStatus{
		Timestamp: time.Now().UTC(),
		Err:       err,
		Type:      opType,
	}

	if !async {
		// The worker closes Ack once the report has been applied.
		op.Ack = make(chan struct{})

		select {
		case <-ctx.Done():
			return false
		case st.In <- op:
		}

		select {
		case <-ctx.Done():
			return false
		case <-op.Ack:
			return true
		}
	}

	select {
	case st.In <- op:
		return true
	case <-ctx.Done():
		return false
	default:
		// Drop the status update if the channel is full, meaning we already have plenty of data to process.
		return false
	}
}

// Get asks the worker for the current status snapshot and waits for the reply.
// It returns ctx.Err() when ctx is cancelled before the request could be
// queued or before the reply arrived.
func (st *statusTracker) Get(ctx context.Context) (*Status, error) {
	// Capacity of one lets the worker deliver the reply without waiting for us.
	reply := make(chan Status, 1)

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case st.Req <- reply:
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case snapshot := <-reply:
		return &snapshot, nil
	}
}
95 changes: 95 additions & 0 deletions database/query/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// SPDX-License-Identifier: ice License 1.0

package query

import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/nbd-wtf/go-nostr"
"github.com/stretchr/testify/require"

"github.com/ice-blockchain/subzero/model"
)

// TestStatusTracker checks the tracker end to end: it starts healthy, stays
// healthy after successful inserts and after a single failure per operation
// type, trips independently for reads and writes after a run of failures, and
// recovers after a run of successes.
func TestStatusTracker(t *testing.T) {
	t.Parallel()

	tracker := newStatusTracker()
	db := helperNewDatabase(t).WithOperationReporter(tracker.Submit)
	defer db.Close()

	workerCtx, cancel := context.WithCancel(t.Context())
	tracker.Start(workerCtx)

	// report synchronously submits the same outcome the given number of times.
	report := func(t *testing.T, op operationType, err error, times int) {
		t.Helper()

		for range times {
			require.True(t, tracker.SubmitSync(t.Context(), op, err))
		}
	}

	// expectHealth fetches the current status and compares both health flags.
	expectHealth := func(t *testing.T, wantReadOK, wantWriteOK bool) {
		t.Helper()

		status, err := tracker.Get(t.Context())
		require.NoError(t, err)

		require.Equal(t, wantReadOK, !status.InReadErrorState)
		require.Equal(t, wantWriteOK, !status.InWriteErrorState)
	}

	t.Run("Initial status is healthy", func(t *testing.T) {
		expectHealth(t, true, true)
	})

	t.Run("Insert events successfully", func(t *testing.T) {
		const inserts = consecutiveOperationThreshold * 10

		for range inserts {
			var ev model.Event

			ev.Kind = nostr.KindTextNote
			ev.CreatedAt = nostr.Now()

			require.NoError(t, ev.SignWithAlg(model.GeneratePrivateKey(), model.SignAlgEDDSA, model.KeyAlgCurve25519))
			require.NoError(t, db.AcceptEvents(t.Context(), &ev))
		}
	})

	t.Run("Single error does not trip FSM", func(t *testing.T) {
		report(t, operationTypeRead, errors.New("simulated read error"), 1)
		report(t, operationTypeWrite, errors.New("simulated write error"), 1)

		expectHealth(t, true, true)
	})

	t.Run("Simulate read errors to trip FSM", func(t *testing.T) {
		report(t, operationTypeRead, errors.New("simulated read error"), consecutiveOperationThreshold)
		expectHealth(t, false, true)

		t.Run("Recover from read errors", func(t *testing.T) {
			report(t, operationTypeRead, nil, consecutiveOperationThreshold)
			expectHealth(t, true, true)
		})
	})

	t.Run("Simulate write errors to trip FSM", func(t *testing.T) {
		report(t, operationTypeWrite, errors.New("simulated write error"), consecutiveOperationThreshold)
		expectHealth(t, true, false)

		t.Run("Recover from write errors", func(t *testing.T) {
			report(t, operationTypeWrite, nil, consecutiveOperationThreshold)
			expectHealth(t, true, true)
		})
	})

	cancel()
}
Loading
Loading