Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ import (

// App is main application structure
type App struct {
dcs dcs.DCS
dcsDivergeTime time.Time
replFailTime time.Time
critical atomic.Value
ctx context.Context
nodeFailTime map[string]time.Time
dcs dcs.DCS
config *config.Config
splitTime map[string]time.Time
dcsDivergeTime time.Time
replFailTime time.Time
logger *slog.Logger
config *config.Config
nodeFailTime map[string]time.Time
shard *valkey.Shard
cache *valkey.SentiCacheNode
daemonLock *flock.Flock
timings *TimingReporter
mode appMode
aofMode aofMode
state appState
Expand Down Expand Up @@ -95,6 +96,7 @@ func NewApp(configFile, logLevel string) (*App, error) {
config: conf,
}
app.critical.Store(false)
app.timings = newTimingReporter(conf, logger)
return app, nil
}

Expand Down Expand Up @@ -130,6 +132,9 @@ func (app *App) unlockDaemonFile() {
func (app *App) Run() int {
app.lockDaemonFile()
defer app.unlockDaemonFile()

defer app.timings.Close()

err := app.connectDCS()
if err != nil {
app.logger.Error("Unable to connect to dcs", slog.Any("error", err))
Expand All @@ -154,9 +159,14 @@ func (app *App) Run() int {
go app.healthChecker()
go app.stateFileHandler()

sighup := make(chan os.Signal, 1)
signal.Notify(sighup, syscall.SIGHUP)

ticker := time.NewTicker(app.config.TickInterval)
for {
select {
case <-sighup:
app.timings.Reopen()
case <-ticker.C:
for {
app.logger.Info(fmt.Sprintf("Rdsync state: %s", app.state))
Expand Down
105 changes: 105 additions & 0 deletions internal/app/event_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package app

import (
"log/slog"
"os"
"sync"
"time"

"github.com/yandex/rdsync/internal/config"
)

// TimingReporter handles reporting event durations to a separate log file.
// A nil *TimingReporter is valid and means the feature is disabled: every
// method checks for a nil receiver and becomes a no-op.
type TimingReporter struct {
logger *slog.Logger // writes timing records; swapped by Reopen, guarded by mu
appLogger *slog.Logger // main application logger, used for the reporter's own diagnostics
file *os.File // current timing log handle; nil after Close or a failed Reopen
path string // path the timing log is (re)opened at, for rotation support
mu sync.Mutex // serializes record writes against Reopen/Close swapping file/logger
}

func openTimingLog(path string) (*os.File, *slog.Logger, error) {
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, nil, err
}
logger := slog.New(slog.NewTextHandler(f, &slog.HandlerOptions{Level: slog.LevelInfo}))
return f, logger, nil
}

// newTimingReporter builds a TimingReporter for the configured event timing
// log file. It returns nil — a valid, no-op reporter — when the feature is
// not configured (empty path) or when the file cannot be opened; an open
// failure is reported on appLogger rather than aborting startup.
func newTimingReporter(conf *config.Config, appLogger *slog.Logger) *TimingReporter {
	path := conf.EventTimingLogFile
	if path == "" {
		return nil
	}

	file, logger, err := openTimingLog(path)
	if err != nil {
		appLogger.Error("Failed to open event timing log file", slog.String("path", path), slog.Any("error", err))
		return nil
	}

	return &TimingReporter{
		logger:    logger,
		appLogger: appLogger,
		file:      file,
		path:      path,
	}
}

// reportTiming writes one event-duration record to the timing log.
// The duration is rounded down to whole milliseconds.
// A nil receiver means timing is not configured; the call is a no-op.
func (r *TimingReporter) reportTiming(event string, d time.Duration) {
	if r == nil {
		return
	}

	// The lock guards r.logger, which Reopen may swap concurrently.
	r.mu.Lock()
	defer r.mu.Unlock()

	r.logger.Info("event_timing",
		slog.String("event", event),
		slog.Int64("duration_ms", d.Milliseconds()))
}

// Reopen closes the current log file and opens it again at the same path.
// This supports log rotation: an external tool renames the file, then sends
// SIGHUP, and the reporter starts writing to a new file at the original path.
// If reopening fails, the reporter falls back to writing timing records to
// stderr (instead of dropping them) until the next successful Reopen.
// If the reporter is nil (not configured), this is a no-op.
func (r *TimingReporter) Reopen() {
	if r == nil {
		return
	}

	r.appLogger.Info("Reopening timing log file")

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.file != nil {
		// Fix: the close error was previously swallowed; surface it.
		// It is diagnostic only — we still attempt the reopen below.
		if err := r.file.Close(); err != nil {
			r.appLogger.Error("Failed to close event timing log file", slog.String("path", r.path), slog.Any("error", err))
		}
	}

	f, logger, err := openTimingLog(r.path)
	if err != nil {
		r.appLogger.Error("Failed to reopen event timing log file", slog.String("path", r.path), slog.Any("error", err))
		// Keep the reporter usable: route records to stderr so timings are
		// not silently lost while the file is unavailable.
		r.file = nil
		r.logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
		return
	}

	r.file = f
	r.logger = logger
}

// Close shuts down the reporter and closes the log file.
// Close is idempotent: after the first call r.file is nil and subsequent
// calls do nothing. If the reporter is nil (not configured), this is a no-op.
func (r *TimingReporter) Close() {
	if r == nil {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.file != nil {
		// Fix: the close error was previously swallowed; report it on the
		// application logger so failures are visible.
		if err := r.file.Close(); err != nil {
			r.appLogger.Error("Failed to close event timing log file", slog.String("path", r.path), slog.Any("error", err))
		}
		r.file = nil
	}
}
127 changes: 127 additions & 0 deletions internal/app/event_reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package app

import (
"log/slog"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/yandex/rdsync/internal/config"
)

// A blank EventTimingLogFile disables timing: the constructor must return
// a nil reporter, whose methods are all nil-safe no-ops.
func TestNewTimingReporterNil(t *testing.T) {
	conf := &config.Config{EventTimingLogFile: ""}

	reporter := newTimingReporter(conf, slog.Default())

	require.Nil(t, reporter, "newTimingReporter should return nil when EventTimingLogFile is empty")
}

// A nil reporter is the "feature disabled" state; every method must
// tolerate it without panicking.
func TestReportTimingNilSafe(t *testing.T) {
	var reporter *TimingReporter

	require.NotPanics(t, func() {
		reporter.reportTiming("test_event", 100*time.Millisecond)
	}, "reportTiming should be nil-safe and not panic")

	require.NotPanics(t, reporter.Close, "Close should be nil-safe and not panic")
}

func TestReportTimingWritesToFile(t *testing.T) {
tmpFile, err := os.CreateTemp("", "rdsync_timing_test_*.log")
require.NoError(t, err)
tmpPath := tmpFile.Name()
tmpFile.Close()
defer os.Remove(tmpPath)

conf := &config.Config{
EventTimingLogFile: tmpPath,
}
logger := slog.Default()

reporter := newTimingReporter(conf, logger)
require.NotNil(t, reporter)

reporter.reportTiming("switchover_complete", 1523*time.Millisecond)
reporter.reportTiming("failover_complete", 45002*time.Millisecond)
reporter.Close()

content, err := os.ReadFile(tmpPath)
require.NoError(t, err)

output := string(content)
require.Contains(t, output, "event=switchover_complete")
require.Contains(t, output, "duration_ms=1523")
require.Contains(t, output, "event=failover_complete")
require.Contains(t, output, "duration_ms=45002")
require.Contains(t, output, "msg=event_timing")
}

// An unopenable path (nonexistent parent directory) must also yield a nil
// reporter instead of an error or panic.
func TestNewTimingReporterInvalidPath(t *testing.T) {
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
	conf := &config.Config{EventTimingLogFile: "/nonexistent/directory/timing.log"}

	require.Nil(t, newTimingReporter(conf, logger),
		"newTimingReporter should return nil when log file cannot be opened")
}

func TestReportTimingReopen(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "rdsync_reopen_test_*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

logPath := tmpDir + "/events.log"

conf := &config.Config{
EventTimingLogFile: logPath,
}
logger := slog.Default()

reporter := newTimingReporter(conf, logger)
require.NotNil(t, reporter)
defer reporter.Close()

// Write first event
reporter.reportTiming("switchover_complete", 1000*time.Millisecond)

// Simulate logrotate: rename the file
rotatedPath := logPath + ".1"
err = os.Rename(logPath, rotatedPath)
require.NoError(t, err)

// Reopen the log (creates a new file at the original path)
reporter.Reopen()

// Write second event
reporter.reportTiming("failover_complete", 2000*time.Millisecond)

// Verify rotated file contains only the first event
rotatedContent, err := os.ReadFile(rotatedPath)
require.NoError(t, err)
require.Contains(t, string(rotatedContent), "event=switchover_complete")
require.Contains(t, string(rotatedContent), "duration_ms=1000")
require.NotContains(t, string(rotatedContent), "event=failover_complete")

// Verify new file contains only the second event
newContent, err := os.ReadFile(logPath)
require.NoError(t, err)
require.Contains(t, string(newContent), "event=failover_complete")
require.Contains(t, string(newContent), "duration_ms=2000")
require.NotContains(t, string(newContent), "event=switchover_complete")
}

// Reopen on a nil (unconfigured) reporter must be a harmless no-op.
func TestReopenNilSafe(t *testing.T) {
	var reporter *TimingReporter
	require.NotPanics(t, reporter.Reopen, "Reopen should be nil-safe and not panic")
}
10 changes: 10 additions & 0 deletions internal/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ func (app *App) stateManager() appState {
err = app.approveFailover(shardState, activeNodes, master)
if err == nil {
app.logger.Info("Failover approved")
// Report master unavailability duration before performing failover
if failTime, ok := app.nodeFailTime[master]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("master_unavailable", dur)
}
err = app.performFailover(master)
if err != nil {
app.logger.Error("Unable to perform failover", slog.Any("error", err))
Expand Down Expand Up @@ -226,6 +231,11 @@ func (app *App) stateManager() appState {
}
return stateCandidate
}
// Report master unavailability duration when master recovers
if failTime, ok := app.nodeFailTime[master]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("master_unavailable", dur)
}
delete(app.nodeFailTime, master)
delete(app.splitTime, master)
app.repairShard(shardState, activeNodes, master)
Expand Down
11 changes: 11 additions & 0 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func (app *App) repairLocalNode(master string) bool {
}
}
} else if !offline {
// Report node_offline duration when node comes back online naturally
if failTime, ok := app.nodeFailTime[local.FQDN()]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("node_offline", dur)
}
delete(app.nodeFailTime, local.FQDN())
}

Expand Down Expand Up @@ -312,6 +317,12 @@ func (app *App) repairLocalNode(master string) bool {
return false
}
}
// Report node_offline duration when node is brought back online after repair
if failTime, ok := app.nodeFailTime[local.FQDN()]; ok {
dur := time.Since(failTime)
app.timings.reportTiming("node_offline", dur)
delete(app.nodeFailTime, local.FQDN())
}
err = local.SetOnline(app.ctx)
if err != nil {
app.logger.Error("Unable to set local node online", slog.Any("error", err))
Expand Down
17 changes: 17 additions & 0 deletions internal/app/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ func (app *App) failSwitchover(switchover *Switchover, err error) error {
switchover.Result.Ok = false
switchover.Result.Error = err.Error()
switchover.Result.FinishedAt = time.Now()

// Report failure timing
dur := time.Since(switchover.StartedAt)
eventName := "switchover_failed"
if switchover.Cause == CauseAuto {
eventName = "failover_failed"
}
app.timings.reportTiming(eventName, dur)

return app.dcs.Set(pathCurrentSwitch, switchover)
}

Expand Down Expand Up @@ -99,6 +108,14 @@ func (app *App) finishSwitchover(switchover *Switchover, switchErr error) error

if switchErr != nil {
switchover.Result.Error = switchErr.Error()
} else {
// Report success timing
dur := time.Since(switchover.StartedAt)
eventName := "switchover_complete"
if switchover.Cause == CauseAuto {
eventName = "failover_complete"
}
app.timings.reportTiming(eventName, dur)
}

err := app.dcs.Delete(pathCurrentSwitch)
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
MaintenanceFile string `yaml:"maintenance_file"`
DaemonLockFile string `yaml:"daemon_lock_file"`
PprofAddr string `yaml:"pprof_addr"`
EventTimingLogFile string `yaml:"event_timing_log_file"`
SentinelMode SentinelModeConfig `yaml:"sentinel_mode"`
Zookeeper dcs.ZookeeperConfig `yaml:"zookeeper"`
Valkey ValkeyConfig `yaml:"valkey"`
Expand Down Expand Up @@ -168,6 +169,7 @@ func DefaultConfig() (Config, error) {
InfoFile: "/var/run/rdsync/rdsync.info",
DaemonLockFile: "/var/run/rdsync/rdsync.lock",
MaintenanceFile: "/var/run/rdsync/rdsync.maintenance",
EventTimingLogFile: "",
PingStable: 3,
TickInterval: 5 * time.Second,
InactivationDelay: 30 * time.Second,
Expand Down
4 changes: 4 additions & 0 deletions tests/features/02_cluster_switchover_from.feature
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ Feature: Cluster mode switchover from old master
When I wait for "30" seconds
Then path "/var/lib/valkey/appendonlydir" exists on "valkey1"
Then path "/var/lib/valkey/appendonlydir" does not exist on "{{.new_master}}"
And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds
"""
event=switchover_complete duration_ms=\d+
"""

Scenario: Cluster mode switchover (from) with unhealthy replicas is rejected
Given clustered shard is up and running
Expand Down
Loading
Loading