diff --git a/internal/app/app.go b/internal/app/app.go index a460dd2..8a1b98e 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -19,18 +19,19 @@ import ( // App is main application structure type App struct { - dcs dcs.DCS + dcsDivergeTime time.Time + replFailTime time.Time critical atomic.Value ctx context.Context - nodeFailTime map[string]time.Time + dcs dcs.DCS + config *config.Config splitTime map[string]time.Time - dcsDivergeTime time.Time - replFailTime time.Time logger *slog.Logger - config *config.Config + nodeFailTime map[string]time.Time shard *valkey.Shard cache *valkey.SentiCacheNode daemonLock *flock.Flock + timings *TimingReporter mode appMode aofMode aofMode state appState @@ -95,6 +96,7 @@ func NewApp(configFile, logLevel string) (*App, error) { config: conf, } app.critical.Store(false) + app.timings = newTimingReporter(conf, logger) return app, nil } @@ -130,6 +132,9 @@ func (app *App) unlockDaemonFile() { func (app *App) Run() int { app.lockDaemonFile() defer app.unlockDaemonFile() + + defer app.timings.Close() + err := app.connectDCS() if err != nil { app.logger.Error("Unable to connect to dcs", slog.Any("error", err)) @@ -154,9 +159,14 @@ func (app *App) Run() int { go app.healthChecker() go app.stateFileHandler() + sighup := make(chan os.Signal, 1) + signal.Notify(sighup, syscall.SIGHUP) + ticker := time.NewTicker(app.config.TickInterval) for { select { + case <-sighup: + app.timings.Reopen() case <-ticker.C: for { app.logger.Info(fmt.Sprintf("Rdsync state: %s", app.state)) diff --git a/internal/app/event_reporter.go b/internal/app/event_reporter.go new file mode 100644 index 0000000..d09ec90 --- /dev/null +++ b/internal/app/event_reporter.go @@ -0,0 +1,105 @@ +package app + +import ( + "log/slog" + "os" + "sync" + "time" + + "github.com/yandex/rdsync/internal/config" +) + +// TimingReporter handles reporting event durations to a separate log file +type TimingReporter struct { + logger *slog.Logger + appLogger *slog.Logger + file 
*os.File + path string + mu sync.Mutex +} + +func openTimingLog(path string) (*os.File, *slog.Logger, error) { + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, nil, err + } + logger := slog.New(slog.NewTextHandler(f, &slog.HandlerOptions{Level: slog.LevelInfo})) + return f, logger, nil +} + +func newTimingReporter(conf *config.Config, appLogger *slog.Logger) *TimingReporter { + if conf.EventTimingLogFile == "" { + return nil + } + + f, logger, err := openTimingLog(conf.EventTimingLogFile) + if err != nil { + appLogger.Error("Failed to open event timing log file", slog.String("path", conf.EventTimingLogFile), slog.Any("error", err)) + return nil + } + + return &TimingReporter{ + logger: logger, + appLogger: appLogger, + file: f, + path: conf.EventTimingLogFile, + } +} + +// reportTiming logs an event duration to the timing log file. +// If the reporter is nil (not configured), this is a no-op. +func (r *TimingReporter) reportTiming(eventType string, duration time.Duration) { + if r == nil { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + r.logger.Info("event_timing", slog.String("event", eventType), slog.Int64("duration_ms", duration.Milliseconds())) +} + +// Reopen closes the current log file and opens it again at the same path. +// This supports log rotation: an external tool renames the file, then sends +// SIGHUP, and the reporter starts writing to a new file at the original path. +// If the reporter is nil (not configured), this is a no-op. 
+func (r *TimingReporter) Reopen() { + if r == nil { + return + } + + r.appLogger.Info("Reopening timing log file") + + r.mu.Lock() + defer r.mu.Unlock() + + if r.file != nil { + r.file.Close() + } + + f, logger, err := openTimingLog(r.path) + if err != nil { + r.appLogger.Error("Failed to reopen event timing log file", slog.String("path", r.path), slog.Any("error", err)) + r.file = nil + r.logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) + return + } + + r.file = f + r.logger = logger +} + +// Close shuts down the reporter and closes the log file +func (r *TimingReporter) Close() { + if r == nil { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.file != nil { + r.file.Close() + r.file = nil + } +} diff --git a/internal/app/event_reporter_test.go b/internal/app/event_reporter_test.go new file mode 100644 index 0000000..b774802 --- /dev/null +++ b/internal/app/event_reporter_test.go @@ -0,0 +1,127 @@ +package app + +import ( + "log/slog" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/yandex/rdsync/internal/config" +) + +func TestNewTimingReporterNil(t *testing.T) { + conf := &config.Config{ + EventTimingLogFile: "", // empty path + } + logger := slog.Default() + + reporter := newTimingReporter(conf, logger) + require.Nil(t, reporter, "newTimingReporter should return nil when EventTimingLogFile is empty") +} + +func TestReportTimingNilSafe(t *testing.T) { + var reporter *TimingReporter // nil reporter + + // Should not panic when calling reportTiming on nil reporter + require.NotPanics(t, func() { + reporter.reportTiming("test_event", 100*time.Millisecond) + }, "reportTiming should be nil-safe and not panic") + + // Should not panic when calling Close on nil reporter + require.NotPanics(t, func() { + reporter.Close() + }, "Close should be nil-safe and not panic") +} + +func TestReportTimingWritesToFile(t *testing.T) { + tmpFile, err := os.CreateTemp("", 
"rdsync_timing_test_*.log") + require.NoError(t, err) + tmpPath := tmpFile.Name() + tmpFile.Close() + defer os.Remove(tmpPath) + + conf := &config.Config{ + EventTimingLogFile: tmpPath, + } + logger := slog.Default() + + reporter := newTimingReporter(conf, logger) + require.NotNil(t, reporter) + + reporter.reportTiming("switchover_complete", 1523*time.Millisecond) + reporter.reportTiming("failover_complete", 45002*time.Millisecond) + reporter.Close() + + content, err := os.ReadFile(tmpPath) + require.NoError(t, err) + + output := string(content) + require.Contains(t, output, "event=switchover_complete") + require.Contains(t, output, "duration_ms=1523") + require.Contains(t, output, "event=failover_complete") + require.Contains(t, output, "duration_ms=45002") + require.Contains(t, output, "msg=event_timing") +} + +func TestNewTimingReporterInvalidPath(t *testing.T) { + conf := &config.Config{ + EventTimingLogFile: "/nonexistent/directory/timing.log", + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + + reporter := newTimingReporter(conf, logger) + require.Nil(t, reporter, "newTimingReporter should return nil when log file cannot be opened") +} + +func TestReportTimingReopen(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "rdsync_reopen_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + logPath := tmpDir + "/events.log" + + conf := &config.Config{ + EventTimingLogFile: logPath, + } + logger := slog.Default() + + reporter := newTimingReporter(conf, logger) + require.NotNil(t, reporter) + defer reporter.Close() + + // Write first event + reporter.reportTiming("switchover_complete", 1000*time.Millisecond) + + // Simulate logrotate: rename the file + rotatedPath := logPath + ".1" + err = os.Rename(logPath, rotatedPath) + require.NoError(t, err) + + // Reopen the log (creates a new file at the original path) + reporter.Reopen() + + // Write second event + reporter.reportTiming("failover_complete", 
2000*time.Millisecond) + + // Verify rotated file contains only the first event + rotatedContent, err := os.ReadFile(rotatedPath) + require.NoError(t, err) + require.Contains(t, string(rotatedContent), "event=switchover_complete") + require.Contains(t, string(rotatedContent), "duration_ms=1000") + require.NotContains(t, string(rotatedContent), "event=failover_complete") + + // Verify new file contains only the second event + newContent, err := os.ReadFile(logPath) + require.NoError(t, err) + require.Contains(t, string(newContent), "event=failover_complete") + require.Contains(t, string(newContent), "duration_ms=2000") + require.NotContains(t, string(newContent), "event=switchover_complete") +} + +func TestReopenNilSafe(t *testing.T) { + var reporter *TimingReporter + require.NotPanics(t, func() { + reporter.Reopen() + }, "Reopen should be nil-safe and not panic") +} diff --git a/internal/app/manager.go b/internal/app/manager.go index d6af8bf..365eb04 100644 --- a/internal/app/manager.go +++ b/internal/app/manager.go @@ -154,6 +154,11 @@ func (app *App) stateManager() appState { err = app.approveFailover(shardState, activeNodes, master) if err == nil { app.logger.Info("Failover approved") + // Report master unavailability duration before performing failover + if failTime, ok := app.nodeFailTime[master]; ok { + dur := time.Since(failTime) + app.timings.reportTiming("master_unavailable", dur) + } err = app.performFailover(master) if err != nil { app.logger.Error("Unable to perform failover", slog.Any("error", err)) @@ -226,6 +231,11 @@ func (app *App) stateManager() appState { } return stateCandidate } + // Report master unavailability duration when master recovers + if failTime, ok := app.nodeFailTime[master]; ok { + dur := time.Since(failTime) + app.timings.reportTiming("master_unavailable", dur) + } delete(app.nodeFailTime, master) delete(app.splitTime, master) app.repairShard(shardState, activeNodes, master) diff --git a/internal/app/repair.go 
b/internal/app/repair.go
index 6eb131e..603b5a5 100644
--- a/internal/app/repair.go
+++ b/internal/app/repair.go
@@ -217,6 +217,11 @@ func (app *App) repairLocalNode(master string) bool {
 			}
 		}
 	} else if !offline {
+		// Report node_offline duration when node comes back online naturally
+		if failTime, ok := app.nodeFailTime[local.FQDN()]; ok {
+			dur := time.Since(failTime)
+			app.timings.reportTiming("node_offline", dur)
+		}
 		delete(app.nodeFailTime, local.FQDN())
 	}
 
@@ -312,6 +317,15 @@ func (app *App) repairLocalNode(master string) bool {
 			return false
 		}
 	}
 	err = local.SetOnline(app.ctx)
+	// Report node_offline duration only once the node is successfully back online.
+	// Reporting (and dropping the fail time) before SetOnline succeeds would emit a
+	// premature duration and lose the event entirely if SetOnline fails and the
+	// repair is retried on a later tick.
+	if err == nil {
+		if failTime, ok := app.nodeFailTime[local.FQDN()]; ok {
+			app.timings.reportTiming("node_offline", time.Since(failTime))
+			delete(app.nodeFailTime, local.FQDN())
+		}
+	}
 	if err != nil {
 		app.logger.Error("Unable to set local node online", slog.Any("error", err))
diff --git a/internal/app/switchover.go b/internal/app/switchover.go
index a0505f3..a4e15a0 100644
--- a/internal/app/switchover.go
+++ b/internal/app/switchover.go
@@ -70,6 +70,15 @@ func (app *App) failSwitchover(switchover *Switchover, err error) error {
 	switchover.Result.Ok = false
 	switchover.Result.Error = err.Error()
 	switchover.Result.FinishedAt = time.Now()
+
+	// Report failure timing
+	dur := time.Since(switchover.StartedAt)
+	eventName := "switchover_failed"
+	if switchover.Cause == CauseAuto {
+		eventName = "failover_failed"
+	}
+	app.timings.reportTiming(eventName, dur)
+
 	return app.dcs.Set(pathCurrentSwitch, switchover)
 }
 
@@ -99,6 +108,14 @@ func (app *App) finishSwitchover(switchover *Switchover, switchErr error) error
 	if switchErr != nil {
 		switchover.Result.Error = switchErr.Error()
+	} else {
+		// Report success timing
+		dur := time.Since(switchover.StartedAt)
+		eventName := "switchover_complete"
+		if switchover.Cause == CauseAuto {
+			eventName = "failover_complete"
+		}
+		app.timings.reportTiming(eventName, dur)
 	}
 
 	err :=
app.dcs.Delete(pathCurrentSwitch) diff --git a/internal/config/config.go b/internal/config/config.go index b63233c..7da5eb5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -71,6 +71,7 @@ type Config struct { MaintenanceFile string `yaml:"maintenance_file"` DaemonLockFile string `yaml:"daemon_lock_file"` PprofAddr string `yaml:"pprof_addr"` + EventTimingLogFile string `yaml:"event_timing_log_file"` SentinelMode SentinelModeConfig `yaml:"sentinel_mode"` Zookeeper dcs.ZookeeperConfig `yaml:"zookeeper"` Valkey ValkeyConfig `yaml:"valkey"` @@ -168,6 +169,7 @@ func DefaultConfig() (Config, error) { InfoFile: "/var/run/rdsync/rdsync.info", DaemonLockFile: "/var/run/rdsync/rdsync.lock", MaintenanceFile: "/var/run/rdsync/rdsync.maintenance", + EventTimingLogFile: "", PingStable: 3, TickInterval: 5 * time.Second, InactivationDelay: 30 * time.Second, diff --git a/tests/features/02_cluster_switchover_from.feature b/tests/features/02_cluster_switchover_from.feature index d238a70..6f0d4cc 100644 --- a/tests/features/02_cluster_switchover_from.feature +++ b/tests/features/02_cluster_switchover_from.feature @@ -48,6 +48,10 @@ Feature: Cluster mode switchover from old master When I wait for "30" seconds Then path "/var/lib/valkey/appendonlydir" exists on "valkey1" Then path "/var/lib/valkey/appendonlydir" does not exist on "{{.new_master}}" + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + event=switchover_complete duration_ms=\d+ + """ Scenario: Cluster mode switchover (from) with unhealthy replicas is rejected Given clustered shard is up and running diff --git a/tests/features/02_sentinel_switchover_from.feature b/tests/features/02_sentinel_switchover_from.feature index 8bde22d..d12152a 100644 --- a/tests/features/02_sentinel_switchover_from.feature +++ b/tests/features/02_sentinel_switchover_from.feature @@ -51,6 +51,10 @@ Feature: Sentinel mode switchover from old master 
When I wait for "30" seconds Then path "/var/lib/valkey/appendonlydir" exists on "valkey1" Then path "/var/lib/valkey/appendonlydir" does not exist on "{{.new_master}}" + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + event=switchover_complete duration_ms=\d+ + """ Scenario: Sentinel mode switchover with unhealthy replicas is rejected Given sentinel shard is up and running diff --git a/tests/features/04_cluster_failover.feature b/tests/features/04_cluster_failover.feature index ddfab9d..992caca 100644 --- a/tests/features/04_cluster_failover.feature +++ b/tests/features/04_cluster_failover.feature @@ -43,6 +43,10 @@ Feature: Cluster mode failover from dead master When I get zookeeper node "/test/master" And I save zookeeper query result as "new_master" Then valkey host "{{.new_master}}" should be master + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + event=failover_complete duration_ms=\d+ + """ When host "valkey1" is started Then valkey host "valkey1" should become available within "20" seconds And valkey host "valkey1" should become replica of "{{.new_master}}" within "30" seconds diff --git a/tests/features/04_sentinel_failover.feature b/tests/features/04_sentinel_failover.feature index 2d5b9bb..37210e3 100644 --- a/tests/features/04_sentinel_failover.feature +++ b/tests/features/04_sentinel_failover.feature @@ -43,6 +43,10 @@ Feature: Sentinel mode failover from dead master When I get zookeeper node "/test/master" And I save zookeeper query result as "new_master" Then valkey host "{{.new_master}}" should be master + And file "/var/log/rdsync_events.log" on any of hosts "valkey1,valkey2,valkey3" should match regexp within "30" seconds + """ + event=failover_complete duration_ms=\d+ + """ When host "valkey1" is started Then valkey host "valkey1" should become available within "20" seconds And valkey 
host "valkey1" should become replica of "{{.new_master}}" within "30" seconds diff --git a/tests/images/valkey/rdsync_cluster.yaml b/tests/images/valkey/rdsync_cluster.yaml index 7c30d0f..08e2039 100644 --- a/tests/images/valkey/rdsync_cluster.yaml +++ b/tests/images/valkey/rdsync_cluster.yaml @@ -5,6 +5,7 @@ pprof_addr: ":8081" info_file: /var/run/rdsync.info maintenance_file: /var/run/rdsync.maintenance daemon_lock_file: /var/run/rdsync.lock +event_timing_log_file: "/var/log/rdsync_events.log" valkey: auth_password: functestpassword restart_command: supervisorctl restart valkey diff --git a/tests/images/valkey/rdsync_sentinel.yaml b/tests/images/valkey/rdsync_sentinel.yaml index eea26af..6fe3f3c 100644 --- a/tests/images/valkey/rdsync_sentinel.yaml +++ b/tests/images/valkey/rdsync_sentinel.yaml @@ -5,6 +5,7 @@ pprof_addr: ":8081" info_file: /var/run/rdsync.info maintenance_file: /var/run/rdsync.maintenance daemon_lock_file: /var/run/rdsync.lock +event_timing_log_file: "/var/log/rdsync_events.log" valkey: auth_password: functestpassword restart_command: supervisorctl restart valkey diff --git a/tests/rdsync_test.go b/tests/rdsync_test.go index 2f00e67..8e6c9fa 100644 --- a/tests/rdsync_test.go +++ b/tests/rdsync_test.go @@ -1052,6 +1052,60 @@ func (tctx *testContext) stepInfoFileOnHostMatch(filepath, host, matcher string, return err } +func (tctx *testContext) stepFileOnHostsShouldMatchRegexpWithin(filepath, hostsStr string, timeout int, body *godog.DocString) error { + pattern := strings.TrimSpace(body.Content) + matcher, err := matchers.GetMatcher("regexp") + if err != nil { + return err + } + + hosts := strings.Split(hostsStr, ",") + for i := range hosts { + hosts[i] = strings.TrimSpace(hosts[i]) + } + + var lastError error + var outputs map[string]string + found := false + + testutil.Retry(func() bool { + outputs = make(map[string]string) + + for _, host := range hosts { + cmd := fmt.Sprintf("cat '%s'", filepath) + _, output, cmdErr := 
tctx.composer.RunCommand(host, cmd, 10*time.Second) + if cmdErr != nil { + outputs[host] = fmt.Sprintf("error: %s", cmdErr) + continue + } + outputs[host] = output + + // Check if this host's output matches + if matchErr := matcher(output, pattern); matchErr == nil { + found = true + return true // Success! Found a match + } + } + return false // No match yet, keep retrying + }, time.Duration(timeout)*time.Second, time.Second) + + if found { + return nil + } + + // If we get here, no host matched within the timeout + var details strings.Builder + details.WriteString(fmt.Sprintf("file %s did not match pattern on any host after %d seconds.\n", filepath, timeout)) + details.WriteString("Pattern:\n") + details.WriteString(fmt.Sprintf(" %s\n", pattern)) + details.WriteString("Outputs from each host:\n") + for host, output := range outputs { + details.WriteString(fmt.Sprintf(" %s: %s\n", host, output)) + } + lastError = fmt.Errorf("%s", details.String()) + return lastError +} + func InitializeScenario(s *godog.ScenarioContext) { tctx, err := newTestContext() if err != nil { @@ -1168,6 +1222,7 @@ func InitializeScenario(s *godog.ScenarioContext) { // misc s.Step(`^I wait for "(\d+)" seconds$`, tctx.stepIWaitFor) s.Step(`^info file "([^"]*)" on "([^"]*)" match (\w+)$`, tctx.stepInfoFileOnHostMatch) + s.Step(`^file "([^"]*)" on any of hosts "([^"]*)" should match regexp within "(\d+)" seconds$`, tctx.stepFileOnHostsShouldMatchRegexpWithin) } func TestRdsync(t *testing.T) {