From df0be5f9c7baa8d59e717912d6e11374efd4095b Mon Sep 17 00:00:00 2001 From: Mohammed Sayed <49954985+cersho@users.noreply.github.com> Date: Sat, 21 Mar 2026 13:29:34 +0200 Subject: [PATCH 1/2] feat: add health monitoring and health check management - Implemented a health monitoring system with a new Monitor struct to manage health checks for database connections. - Added functionality to probe connections for various database types, including PostgreSQL, MySQL, Convex, and Cloudflare D1. - Created a health check section in the UI to display the status of health checks, including the ability to configure settings and view notifications. - Enhanced the app's HTML template to include health check indicators and a detailed view of affected databases. --- cmd/anchordb/main.go | 6 +- internal/api/handler.go | 240 +++++++- internal/api/handler_integration_test.go | 127 ++++- internal/config/config.go | 16 + internal/health/monitor.go | 144 +++++ internal/health/probe.go | 175 ++++++ internal/metadata/db.go | 2 + internal/models/models.go | 34 ++ internal/notifications/dispatcher.go | 90 +++ internal/repository/repository.go | 451 ++++++++++++++- .../repository/repository_integration_test.go | 154 ++++++ internal/ui/handler.go | 521 ++++++++++++------ internal/ui/templates/app.html | 276 +++++++++- 13 files changed, 2051 insertions(+), 185 deletions(-) create mode 100644 internal/health/monitor.go create mode 100644 internal/health/probe.go diff --git a/cmd/anchordb/main.go b/cmd/anchordb/main.go index 91f49c3..dccc5ff 100644 --- a/cmd/anchordb/main.go +++ b/cmd/anchordb/main.go @@ -12,6 +12,7 @@ import ( "anchordb/internal/api" "anchordb/internal/config" "anchordb/internal/crypto" + "anchordb/internal/health" "anchordb/internal/metadata" "anchordb/internal/repository" "anchordb/internal/scheduler" @@ -36,14 +37,16 @@ func main() { repo := repository.New(db, cryptoSvc) exec := scheduler.NewExecutor(cfg) sch := scheduler.New(repo, exec, cfg) + hmon := 
health.NewMonitor(cfg, repo) ctx := context.Background() if err := sch.LoadAll(ctx); err != nil { log.Fatalf("load schedules: %v", err) } sch.Start() + hmon.Start(ctx) - apiHandler := api.NewHandler(repo, sch) + apiHandler := api.NewHandler(repo, sch, cfg) uiHandler := ui.NewHandler(repo, sch, cfg) r := chi.NewRouter() @@ -71,6 +74,7 @@ func main() { log.Println("shutting down") sch.Stop() + hmon.Stop() shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() diff --git a/internal/api/handler.go b/internal/api/handler.go index 6cf3a58..351eea6 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "anchordb/internal/config" + "anchordb/internal/health" "anchordb/internal/models" "anchordb/internal/notifications" "anchordb/internal/repository" @@ -23,10 +25,11 @@ type Handler struct { repo *repository.Repository scheduler *scheduler.Scheduler notifier *notifications.Dispatcher + cfg config.Config } -func NewHandler(repo *repository.Repository, scheduler *scheduler.Scheduler) *Handler { - return &Handler{repo: repo, scheduler: scheduler, notifier: notifications.NewDispatcher(repo)} +func NewHandler(repo *repository.Repository, scheduler *scheduler.Scheduler, cfg config.Config) *Handler { + return &Handler{repo: repo, scheduler: scheduler, notifier: notifications.NewDispatcher(repo), cfg: cfg} } func (h *Handler) Router() http.Handler { @@ -74,6 +77,15 @@ func (h *Handler) Router() http.Handler { r.Put("/{id}/notifications", h.setBackupNotifications) }) + r.Route("/health-checks", func(r chi.Router) { + r.Get("/", h.listHealthChecks) + r.Get("/{id}", h.getHealthCheck) + r.Patch("/{id}", h.updateHealthCheck) + r.Post("/{id}/run", h.runHealthCheckNow) + r.Get("/{id}/notifications", h.listHealthCheckNotifications) + r.Put("/{id}/notifications", h.setHealthCheckNotifications) + }) + return r } @@ -112,6 +124,15 @@ func (h *Handler) createConnection(w http.ResponseWriter, r 
*http.Request) { writeError(w, http.StatusInternalServerError, err.Error()) return } + if _, err := h.repo.UpsertHealthCheckForConnection(r.Context(), req.ID, repository.HealthCheckDefaults{ + IntervalSecond: int(h.cfg.DefaultHealthEvery / time.Second), + TimeoutSecond: int(h.cfg.DefaultHealthTimeout / time.Second), + FailureThreshold: h.cfg.DefaultFailThreshold, + SuccessThreshold: h.cfg.DefaultPassThreshold, + }); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } redactConnection(&req) writeJSON(w, http.StatusCreated, req) } @@ -729,6 +750,221 @@ func (h *Handler) setBackupNotifications(w http.ResponseWriter, r *http.Request) writeJSON(w, http.StatusOK, items) } +func (h *Handler) listHealthChecks(w http.ResponseWriter, r *http.Request) { + items, err := h.repo.ListHealthChecks(r.Context()) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + for i := range items { + redactConnection(&items[i].Connection) + } + writeJSON(w, http.StatusOK, items) +} + +func (h *Handler) getHealthCheck(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + item, err := h.repo.GetHealthCheck(r.Context(), id) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + writeError(w, http.StatusNotFound, "health check not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + redactConnection(&item.Connection) + writeJSON(w, http.StatusOK, item) +} + +func (h *Handler) updateHealthCheck(w http.ResponseWriter, r *http.Request) { + type patchRequest struct { + Enabled *bool `json:"enabled"` + CheckIntervalSecond *int `json:"check_interval_second"` + TimeoutSecond *int `json:"timeout_second"` + FailureThreshold *int `json:"failure_threshold"` + SuccessThreshold *int `json:"success_threshold"` + } + + id := chi.URLParam(r, "id") + var req patchRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, 
http.StatusBadRequest, "invalid JSON") + return + } + + item, err := h.repo.UpdateHealthCheck(r.Context(), id, repository.HealthCheckPatch{ + Enabled: req.Enabled, + CheckIntervalSecond: req.CheckIntervalSecond, + TimeoutSecond: req.TimeoutSecond, + FailureThreshold: req.FailureThreshold, + SuccessThreshold: req.SuccessThreshold, + }) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + writeError(w, http.StatusNotFound, "health check not found") + return + } + writeError(w, http.StatusBadRequest, err.Error()) + return + } + redactConnection(&item.Connection) + writeJSON(w, http.StatusOK, item) +} + +func (h *Handler) runHealthCheckNow(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + item, err := h.repo.GetHealthCheck(r.Context(), id) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + writeError(w, http.StatusNotFound, "health check not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + if item.LastCheckedAt != nil && h.cfg.HealthManualCooldown > 0 { + nextAllowedAt := item.LastCheckedAt.UTC().Add(h.cfg.HealthManualCooldown) + if time.Now().UTC().Before(nextAllowedAt) { + retryAfter := int(time.Until(nextAllowedAt).Seconds()) + if retryAfter < 1 { + retryAfter = 1 + } + w.Header().Set("Retry-After", strconv.Itoa(retryAfter)) + writeJSON(w, http.StatusTooManyRequests, map[string]any{ + "status": "cooldown", + "error": "manual health check is on cooldown", + "retry_after_seconds": retryAfter, + }) + return + } + } + + timeoutSecond := item.TimeoutSecond + if timeoutSecond <= 0 { + timeoutSecond = int(h.cfg.DefaultHealthTimeout / time.Second) + } + if timeoutSecond <= 0 { + timeoutSecond = 5 + } + timeout := time.Duration(timeoutSecond) * time.Second + + result, probeErr := health.ProbeConnection(r.Context(), item.Connection, h.cfg, timeout) + healthy := probeErr == nil + errText := "" + if probeErr != nil { + errText = strings.TrimSpace(probeErr.Error()) + } + + 
updated, event, saveErr := h.repo.SaveHealthCheckProbeResult(r.Context(), item.ID, time.Now().UTC(), healthy, errText) + if saveErr != nil { + writeError(w, http.StatusInternalServerError, saveErr.Error()) + return + } + if event != "" { + if notifyErr := h.notifier.NotifyHealthCheckEvent(r.Context(), updated, event); notifyErr != nil { + writeError(w, http.StatusBadGateway, "health check updated but notification failed: "+notifyErr.Error()) + return + } + _ = h.repo.MarkHealthCheckNotified(r.Context(), updated.ID, time.Now().UTC()) + } + + redactConnection(&updated.Connection) + if probeErr != nil { + writeJSON(w, http.StatusBadGateway, map[string]any{"status": "failed", "error": probeErr.Error(), "health_check": updated}) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "result": result, "health_check": updated}) +} + +func (h *Handler) listHealthCheckNotifications(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if _, err := h.repo.GetHealthCheck(r.Context(), id); err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + writeError(w, http.StatusNotFound, "health check not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + items, err := h.repo.ListHealthCheckNotifications(r.Context(), id) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + for i := range items { + redactNotification(&items[i].Notification) + } + writeJSON(w, http.StatusOK, items) +} + +func (h *Handler) setHealthCheckNotifications(w http.ResponseWriter, r *http.Request) { + type notificationBindingRequest struct { + NotificationID string `json:"notification_id"` + OnDown *bool `json:"on_down"` + OnRecovered *bool `json:"on_recovered"` + Enabled *bool `json:"enabled"` + } + type bindingsRequest struct { + Notifications []notificationBindingRequest `json:"notifications"` + } + + id := chi.URLParam(r, "id") + if _, err := h.repo.GetHealthCheck(r.Context(), 
id); err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + writeError(w, http.StatusNotFound, "health check not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + var req bindingsRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid JSON") + return + } + + bindings := make([]models.HealthCheckNotification, 0, len(req.Notifications)) + for _, item := range req.Notifications { + onDown := true + if item.OnDown != nil { + onDown = *item.OnDown + } + onRecovered := true + if item.OnRecovered != nil { + onRecovered = *item.OnRecovered + } + enabled := true + if item.Enabled != nil { + enabled = *item.Enabled + } + + bindings = append(bindings, models.HealthCheckNotification{ + NotificationID: strings.TrimSpace(item.NotificationID), + OnDown: onDown, + OnRecovered: onRecovered, + Enabled: enabled, + }) + } + + items, err := h.repo.SetHealthCheckNotifications(r.Context(), id, bindings) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + writeError(w, http.StatusNotFound, "health check not found") + return + } + writeError(w, http.StatusBadRequest, err.Error()) + return + } + for i := range items { + redactNotification(&items[i].Notification) + } + writeJSON(w, http.StatusOK, items) +} + func (h *Handler) validateBackupRequest(r *http.Request, b models.Backup, create bool) error { if create && (b.Name == "" || b.ConnectionID == "" || b.CronExpr == "" || b.TargetType == "") { return errors.New("name, connection_id, cron_expr, target_type are required") diff --git a/internal/api/handler_integration_test.go b/internal/api/handler_integration_test.go index 4d014b7..cbdb081 100644 --- a/internal/api/handler_integration_test.go +++ b/internal/api/handler_integration_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "anchordb/internal/api" "anchordb/internal/models" @@ -15,7 +16,7 @@ import ( func 
TestHealthEndpoint(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) res, err := http.Get(server.URL + "/health") @@ -43,7 +44,7 @@ func TestHealthEndpoint(t *testing.T) { func TestConnectionsEndpointsRedactSecrets(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) createdRes := doJSON(t, http.MethodPost, server.URL+"/connections", map[string]any{ @@ -99,9 +100,117 @@ func TestConnectionsEndpointsRedactSecrets(t *testing.T) { } } +func TestCreateConnectionUsesConfiguredHealthCheckDefaults(t *testing.T) { + stack := testutil.NewStack(t) + stack.Config.DefaultHealthEvery = 90 * time.Second + stack.Config.DefaultHealthTimeout = 7 * time.Second + stack.Config.DefaultFailThreshold = 4 + stack.Config.DefaultPassThreshold = 2 + + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) + t.Cleanup(server.Close) + + createRes := doJSON(t, http.MethodPost, server.URL+"/connections", map[string]any{ + "name": "defaults-check", + "type": "postgres", + "host": "localhost", + "port": 5432, + "database": "appdb", + "username": "postgres", + "password": "secret", + "ssl_mode": "disable", + }) + defer func() { _ = createRes.Body.Close() }() + if createRes.StatusCode != http.StatusCreated { + t.Fatalf("expected 201, got %d", createRes.StatusCode) + } + + listRes := doJSON(t, http.MethodGet, server.URL+"/health-checks", nil) + defer func() { _ = listRes.Body.Close() }() + if listRes.StatusCode != http.StatusOK { + t.Fatalf("expected 200 from list health checks, got %d", listRes.StatusCode) + } + + var checks []models.HealthCheck + decodeJSON(t, 
listRes.Body, &checks) + if len(checks) != 1 { + t.Fatalf("expected 1 health check, got %d", len(checks)) + } + if checks[0].CheckIntervalSecond != 90 { + t.Fatalf("expected interval 90, got %d", checks[0].CheckIntervalSecond) + } + if checks[0].TimeoutSecond != 7 { + t.Fatalf("expected timeout 7, got %d", checks[0].TimeoutSecond) + } + if checks[0].FailureThreshold != 4 { + t.Fatalf("expected failure threshold 4, got %d", checks[0].FailureThreshold) + } + if checks[0].SuccessThreshold != 2 { + t.Fatalf("expected success threshold 2, got %d", checks[0].SuccessThreshold) + } +} + +func TestRunHealthCheckNowRespectsManualCooldown(t *testing.T) { + stack := testutil.NewStack(t) + stack.Config.DefaultHealthTimeout = 1 * time.Second + stack.Config.HealthManualCooldown = 1 * time.Minute + + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) + t.Cleanup(server.Close) + + createRes := doJSON(t, http.MethodPost, server.URL+"/connections", map[string]any{ + "name": "cooldown-check", + "type": "postgres", + "host": "127.0.0.1", + "port": 1, + "database": "appdb", + "username": "postgres", + "password": "secret", + "ssl_mode": "disable", + }) + defer func() { _ = createRes.Body.Close() }() + if createRes.StatusCode != http.StatusCreated { + t.Fatalf("expected 201, got %d", createRes.StatusCode) + } + + checksRes := doJSON(t, http.MethodGet, server.URL+"/health-checks", nil) + defer func() { _ = checksRes.Body.Close() }() + if checksRes.StatusCode != http.StatusOK { + t.Fatalf("expected 200 from list health checks, got %d", checksRes.StatusCode) + } + + var checks []models.HealthCheck + decodeJSON(t, checksRes.Body, &checks) + if len(checks) != 1 { + t.Fatalf("expected 1 health check, got %d", len(checks)) + } + + firstRunRes := doJSON(t, http.MethodPost, server.URL+"/health-checks/"+checks[0].ID+"/run", nil) + defer func() { _ = firstRunRes.Body.Close() }() + if firstRunRes.StatusCode != http.StatusOK && firstRunRes.StatusCode != 
http.StatusBadGateway { + t.Fatalf("expected 200 or 502 from first run, got %d", firstRunRes.StatusCode) + } + + secondRunRes := doJSON(t, http.MethodPost, server.URL+"/health-checks/"+checks[0].ID+"/run", nil) + defer func() { _ = secondRunRes.Body.Close() }() + if secondRunRes.StatusCode != http.StatusTooManyRequests { + t.Fatalf("expected 429 from second run due to cooldown, got %d", secondRunRes.StatusCode) + } + + if secondRunRes.Header.Get("Retry-After") == "" { + t.Fatal("expected Retry-After header") + } + + var body map[string]any + decodeJSON(t, secondRunRes.Body, &body) + if body["status"] != "cooldown" { + t.Fatalf("expected cooldown status, got %v", body["status"]) + } +} + func TestBackupsCreateAndToggleIntegration(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) conn := testutil.MustCreateConnection(t, stack.Repo, "api-backup-source") @@ -162,7 +271,7 @@ func TestBackupsCreateAndToggleIntegration(t *testing.T) { func TestCreateBackupValidationError(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) res := doJSON(t, http.MethodPost, server.URL+"/backups", map[string]any{ @@ -181,7 +290,7 @@ func TestCreateBackupValidationError(t *testing.T) { func TestCreateConvexConnectionWithMinimalFields(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) res := doJSON(t, http.MethodPost, server.URL+"/connections", map[string]any{ @@ -214,7 +323,7 @@ func 
TestCreateConvexConnectionWithMinimalFields(t *testing.T) { func TestCreateConvexBackupForcesNoCompression(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) connRes := doJSON(t, http.MethodPost, server.URL+"/connections", map[string]any{ @@ -260,7 +369,7 @@ func TestCreateConvexBackupForcesNoCompression(t *testing.T) { func TestCreateD1ConnectionWithMinimalFields(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) res := doJSON(t, http.MethodPost, server.URL+"/connections", map[string]any{ @@ -294,7 +403,7 @@ func TestCreateD1ConnectionWithMinimalFields(t *testing.T) { func TestNotificationsEndpointsAndBackupBindings(t *testing.T) { stack := testutil.NewStack(t) - server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + server := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(server.Close) createNotificationRes := doJSON(t, http.MethodPost, server.URL+"/notifications", map[string]any{ @@ -350,7 +459,7 @@ func TestNotificationsEndpointsAndBackupBindings(t *testing.T) { func TestNotificationTestEndpointSendsDiscordWebhook(t *testing.T) { stack := testutil.NewStack(t) - apiServer := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler).Router()) + apiServer := httptest.NewServer(api.NewHandler(stack.Repo, stack.Scheduler, stack.Config).Router()) t.Cleanup(apiServer.Close) called := false diff --git a/internal/config/config.go b/internal/config/config.go index 798eba9..656497a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,6 +19,14 @@ type Config struct { 
MaxConcurrentBackups int BackupCommandTimeout time.Duration DefaultLocalBasePath string + HealthPollInterval time.Duration + HealthDueBatchSize int + MaxConcurrentHealth int + DefaultHealthEvery time.Duration + DefaultHealthTimeout time.Duration + DefaultFailThreshold int + DefaultPassThreshold int + HealthManualCooldown time.Duration } func Load() Config { @@ -35,6 +43,14 @@ func Load() Config { MaxConcurrentBackups: getenvInt("MAX_CONCURRENT_BACKUPS", 2), BackupCommandTimeout: getenvDuration("BACKUP_COMMAND_TIMEOUT", 2*time.Hour), DefaultLocalBasePath: getenv("DEFAULT_LOCAL_BACKUP_PATH", "./backups"), + HealthPollInterval: getenvDuration("HEALTH_POLL_INTERVAL", 15*time.Second), + HealthDueBatchSize: getenvInt("HEALTH_DUE_BATCH_SIZE", 50), + MaxConcurrentHealth: getenvInt("MAX_CONCURRENT_HEALTH_CHECKS", 2), + DefaultHealthEvery: getenvDuration("DEFAULT_HEALTH_CHECK_INTERVAL", 1*time.Minute), + DefaultHealthTimeout: getenvDuration("DEFAULT_HEALTH_CHECK_TIMEOUT", 5*time.Second), + DefaultFailThreshold: getenvInt("DEFAULT_HEALTH_FAILURE_THRESHOLD", 3), + DefaultPassThreshold: getenvInt("DEFAULT_HEALTH_SUCCESS_THRESHOLD", 1), + HealthManualCooldown: getenvDuration("HEALTH_MANUAL_RUN_COOLDOWN", 10*time.Second), } } diff --git a/internal/health/monitor.go b/internal/health/monitor.go new file mode 100644 index 0000000..5346e62 --- /dev/null +++ b/internal/health/monitor.go @@ -0,0 +1,144 @@ +package health + +import ( + "context" + "log" + "strings" + "time" + + "anchordb/internal/config" + "anchordb/internal/models" + "anchordb/internal/notifications" + "anchordb/internal/repository" +) + +type Monitor struct { + cfg config.Config + repo *repository.Repository + notifier *notifications.Dispatcher + sem chan struct{} + stopCh chan struct{} + doneCh chan struct{} +} + +func NewMonitor(cfg config.Config, repo *repository.Repository) *Monitor { + concurrency := cfg.MaxConcurrentHealth + if concurrency <= 0 { + concurrency = 2 + } + return &Monitor{ + cfg: cfg, + repo: 
repo, + notifier: notifications.NewDispatcher(repo), + sem: make(chan struct{}, concurrency), + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + } +} + +func (m *Monitor) Start(ctx context.Context) { + if err := m.repo.EnsureHealthChecksForConnections(ctx, repository.HealthCheckDefaults{ + IntervalSecond: int(m.cfg.DefaultHealthEvery / time.Second), + TimeoutSecond: int(m.cfg.DefaultHealthTimeout / time.Second), + FailureThreshold: m.cfg.DefaultFailThreshold, + SuccessThreshold: m.cfg.DefaultPassThreshold, + }); err != nil { + log.Printf("health monitor: ensure default checks failed: %v", err) + } + + interval := m.cfg.HealthPollInterval + if interval <= 0 { + interval = 15 * time.Second + } + ticker := time.NewTicker(interval) + + go func() { + defer close(m.doneCh) + defer ticker.Stop() + m.processDue(context.Background()) + for { + select { + case <-ticker.C: + m.processDue(context.Background()) + case <-m.stopCh: + return + } + } + }() +} + +func (m *Monitor) Stop() { + close(m.stopCh) + <-m.doneCh +} + +func (m *Monitor) processDue(ctx context.Context) { + batchSize := m.cfg.HealthDueBatchSize + if batchSize <= 0 { + batchSize = 50 + } + + now := time.Now().UTC() + items, err := m.repo.ListDueHealthChecks(ctx, now, batchSize) + if err != nil { + log.Printf("health monitor: list due checks failed: %v", err) + return + } + + for _, item := range items { + itemCopy := item + timeoutSec := itemCopy.TimeoutSecond + if timeoutSec <= 0 { + timeoutSec = int(m.cfg.DefaultHealthTimeout / time.Second) + } + if timeoutSec <= 0 { + timeoutSec = 5 + } + lease := time.Duration(timeoutSec+2) * time.Second + claimed, claimErr := m.repo.ClaimHealthCheckRun(ctx, itemCopy.ID, time.Now().UTC(), lease) + if claimErr != nil { + log.Printf("health monitor: claim check failed for %s: %v", itemCopy.ID, claimErr) + continue + } + if !claimed { + continue + } + m.sem <- struct{}{} + go func() { + defer func() { <-m.sem }() + m.runCheck(context.Background(), itemCopy) + }() + 
} +} + +func (m *Monitor) runCheck(ctx context.Context, checkItem models.HealthCheck) { + timeoutSec := checkItem.TimeoutSecond + if timeoutSec <= 0 { + timeoutSec = int(m.cfg.DefaultHealthTimeout / time.Second) + } + if timeoutSec <= 0 { + timeoutSec = 5 + } + timeout := time.Duration(timeoutSec) * time.Second + + _, probeErr := ProbeConnection(ctx, checkItem.Connection, m.cfg, timeout) + healthy := probeErr == nil + errText := "" + if probeErr != nil { + errText = strings.TrimSpace(probeErr.Error()) + } + + updated, event, err := m.repo.SaveHealthCheckProbeResult(ctx, checkItem.ID, time.Now().UTC(), healthy, errText) + if err != nil { + log.Printf("health monitor: save probe result failed for %s: %v", checkItem.ID, err) + return + } + if event == "" { + return + } + if notifyErr := m.notifier.NotifyHealthCheckEvent(ctx, updated, event); notifyErr != nil { + log.Printf("health monitor: notify failed for %s (%s): %v", updated.ID, event, notifyErr) + return + } + _ = m.repo.MarkHealthCheckNotified(ctx, updated.ID, time.Now().UTC()) +} diff --git a/internal/health/probe.go b/internal/health/probe.go new file mode 100644 index 0000000..b1d5129 --- /dev/null +++ b/internal/health/probe.go @@ -0,0 +1,175 @@ +package health + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "anchordb/internal/config" + "anchordb/internal/models" +) + +func ProbeConnection(ctx context.Context, item models.Connection, cfg config.Config, timeout time.Duration) (string, error) { + typeName := strings.ToLower(strings.TrimSpace(item.Type)) + host := strings.TrimSpace(item.Host) + if host == "" { + return "", errors.New("host is required") + } + if timeout <= 0 { + timeout = 5 * time.Second + } + dialTimeout := timeout + if dialTimeout > time.Second { + dialTimeout -= time.Second + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + switch typeName { + case "postgres", 
"postgresql", "mysql": + applyConnectionDefaults(&item) + if item.Port <= 0 { + return "", errors.New("port is required") + } + addr := host + if _, _, err := net.SplitHostPort(host); err != nil { + addr = net.JoinHostPort(host, strconv.Itoa(item.Port)) + } + dialer := net.Dialer{Timeout: dialTimeout} + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + return "", err + } + _ = conn.Close() + return "TCP reachability OK (" + addr + ")", nil + case "convex": + raw := strings.TrimSpace(item.Host) + if !strings.Contains(raw, "://") { + raw = "https://" + raw + } + u, err := url.Parse(raw) + if err != nil || u.Scheme == "" || u.Host == "" { + return "", errors.New("invalid Convex URL") + } + port := u.Port() + if port == "" { + if strings.EqualFold(u.Scheme, "http") { + port = "80" + } else { + port = "443" + } + } + addr := net.JoinHostPort(u.Hostname(), port) + dialer := net.Dialer{Timeout: dialTimeout} + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + return "", err + } + _ = conn.Close() + return "Convex endpoint reachable (" + addr + ")", nil + case "d1": + accountID := strings.TrimSpace(item.Host) + if accountID == "" { + accountID = strings.TrimSpace(cfg.CloudflareAccountID) + } + databaseID := strings.TrimSpace(item.Database) + if databaseID == "" { + databaseID = strings.TrimSpace(cfg.CloudflareDatabaseID) + } + apiKey := strings.TrimSpace(item.Password) + if apiKey == "" { + apiKey = strings.TrimSpace(cfg.CloudflareAPIKey) + } + if accountID == "" || databaseID == "" || apiKey == "" { + return "", errors.New("d1 test requires account id, database id, and api key") + } + if err := probeD1Query(ctx, cfg, accountID, databaseID, apiKey); err != nil { + return "", err + } + return "Cloudflare D1 API reachable", nil + default: + return "", fmt.Errorf("unsupported connection type: %s", item.Type) + } +} + +func applyConnectionDefaults(item *models.Connection) { + if item.Port != 0 { + return + } + + switch 
strings.ToLower(strings.TrimSpace(item.Type)) { + case "postgres", "postgresql": + item.Port = 5432 + case "mysql": + item.Port = 3306 + } +} + +func probeD1Query(ctx context.Context, cfg config.Config, accountID, databaseID, apiKey string) error { + endpoint := strings.TrimSuffix(strings.TrimSpace(cfg.D1APIBaseURL), "/") + if endpoint == "" { + endpoint = "https://api.cloudflare.com/client/v4" + } + endpoint = endpoint + "/accounts/" + url.PathEscape(accountID) + "/d1/database/" + url.PathEscape(databaseID) + "/query" + + payload, err := json.Marshal(map[string]any{"sql": "SELECT 1 AS ok"}) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKey) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer func() { _ = res.Body.Close() }() + + body, err := io.ReadAll(res.Body) + if err != nil { + return err + } + + var envelope struct { + Success bool `json:"success"` + Errors []struct { + Message string `json:"message"` + } `json:"errors"` + } + if err := json.Unmarshal(body, &envelope); err != nil { + if res.StatusCode >= http.StatusBadRequest { + return fmt.Errorf("d1 query failed (status %d)", res.StatusCode) + } + return err + } + + if res.StatusCode >= http.StatusBadRequest { + if len(envelope.Errors) > 0 && strings.TrimSpace(envelope.Errors[0].Message) != "" { + return errors.New(envelope.Errors[0].Message) + } + return fmt.Errorf("d1 query failed (status %d)", res.StatusCode) + } + if !envelope.Success { + if len(envelope.Errors) > 0 && strings.TrimSpace(envelope.Errors[0].Message) != "" { + return errors.New(envelope.Errors[0].Message) + } + return errors.New("d1 query returned unsuccessful response") + } + + return nil +} diff --git a/internal/metadata/db.go b/internal/metadata/db.go index 3168d6c..b513dc6 100644 
--- a/internal/metadata/db.go +++ b/internal/metadata/db.go @@ -30,5 +30,7 @@ func Migrate(db *gorm.DB) error { &models.BackupRun{}, &models.NotificationDestination{}, &models.BackupNotification{}, + &models.HealthCheck{}, + &models.HealthCheckNotification{}, ) } diff --git a/internal/models/models.go b/internal/models/models.go index 4a8bb2e..405cf36 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -93,3 +93,37 @@ type BackupNotification struct { Backup Backup `gorm:"foreignKey:BackupID" json:"backup,omitempty"` Notification NotificationDestination `gorm:"foreignKey:NotificationID" json:"notification,omitempty"` } + +type HealthCheck struct { + ID string `gorm:"primaryKey;size:36" json:"id"` + ConnectionID string `gorm:"size:36;not null;uniqueIndex" json:"connection_id"` + Enabled bool `gorm:"not null;default:true" json:"enabled"` + CheckIntervalSecond int `gorm:"not null;default:60" json:"check_interval_second"` + TimeoutSecond int `gorm:"not null;default:5" json:"timeout_second"` + FailureThreshold int `gorm:"not null;default:3" json:"failure_threshold"` + SuccessThreshold int `gorm:"not null;default:1" json:"success_threshold"` + Status string `gorm:"size:32;not null;default:unknown" json:"status"` + ConsecutiveFailures int `gorm:"not null;default:0" json:"consecutive_failures"` + ConsecutiveSuccess int `gorm:"not null;default:0" json:"consecutive_success"` + LastCheckedAt *time.Time `json:"last_checked_at"` + LastError string `gorm:"size:2048" json:"last_error"` + NextCheckAt *time.Time `gorm:"index" json:"next_check_at"` + LastNotifiedAt *time.Time `json:"last_notified_at"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + + Connection Connection `gorm:"foreignKey:ConnectionID" json:"connection,omitempty"` +} + +type HealthCheckNotification struct { + ID string `gorm:"primaryKey;size:36" json:"id"` + HealthCheckID string `gorm:"size:36;not null;index:idx_health_notification_unique,unique" 
json:"health_check_id"` + NotificationID string `gorm:"size:36;not null;index:idx_health_notification_unique,unique;index" json:"notification_id"` + OnDown bool `gorm:"not null;default:true" json:"on_down"` + OnRecovered bool `gorm:"not null;default:true" json:"on_recovered"` + Enabled bool `gorm:"not null;default:true" json:"enabled"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + HealthCheck HealthCheck `gorm:"foreignKey:HealthCheckID" json:"health_check,omitempty"` + Notification NotificationDestination `gorm:"foreignKey:NotificationID" json:"notification,omitempty"` +} diff --git a/internal/notifications/dispatcher.go b/internal/notifications/dispatcher.go index c073299..41c5576 100644 --- a/internal/notifications/dispatcher.go +++ b/internal/notifications/dispatcher.go @@ -72,6 +72,49 @@ func (d *Dispatcher) NotifyBackupRun(ctx context.Context, backup models.Backup, return nil } +func (d *Dispatcher) NotifyHealthCheckEvent(ctx context.Context, check models.HealthCheck, event string) error { + eventName := strings.ToLower(strings.TrimSpace(event)) + if eventName != "down" && eventName != "recovered" { + return nil + } + + destinations, err := d.repo.ListHealthNotificationDestinationsForEvent(ctx, check.ID, eventName) + if err != nil { + return err + } + if len(destinations) == 0 { + return nil + } + + subject := buildHealthSMTPSubject(check, eventName) + body := formatHealthMessage(check, eventName) + errs := make([]string, 0) + for _, destination := range destinations { + if strings.TrimSpace(destination.Type) == "" { + continue + } + + var sendErr error + switch strings.ToLower(strings.TrimSpace(destination.Type)) { + case "discord": + sendErr = d.sendDiscordMessage(ctx, destination, body) + case "smtp": + sendErr = d.sendSMTPMessage(ctx, destination, subject, body) + default: + sendErr = fmt.Errorf("unsupported notification type: %s", destination.Type) + } + + if sendErr != nil { + errs = append(errs, fmt.Sprintf("%s: 
%v", destination.Name, sendErr)) + } + } + + if len(errs) > 0 { + return fmt.Errorf(strings.Join(errs, "; ")) + } + return nil +} + func (d *Dispatcher) SendTestNotification(ctx context.Context, destination models.NotificationDestination) error { kind := strings.ToLower(strings.TrimSpace(destination.Type)) message := "AnchorDB test notification\n- Result: Delivery channel is configured correctly" @@ -244,6 +287,53 @@ func buildSMTPSubject(backup models.Backup, run models.BackupRun) string { return fmt.Sprintf("[AnchorDB] %s - %s", status, backup.Name) } +func buildHealthSMTPSubject(check models.HealthCheck, event string) string { + state := "DOWN" + if event == "recovered" { + state = "RECOVERED" + } + connectionName := strings.TrimSpace(check.Connection.Name) + if connectionName == "" { + connectionName = check.ConnectionID + } + return fmt.Sprintf("[AnchorDB] HEALTH %s - %s", state, connectionName) +} + +func formatHealthMessage(check models.HealthCheck, event string) string { + state := "DOWN" + if event == "recovered" { + state = "RECOVERED" + } + b := strings.Builder{} + b.WriteString("AnchorDB health check ") + b.WriteString(state) + b.WriteString("\n") + b.WriteString("- Connection: ") + b.WriteString(check.Connection.Name) + b.WriteString("\n") + b.WriteString("- Type: ") + b.WriteString(check.Connection.Type) + b.WriteString("\n") + b.WriteString("- Host: ") + b.WriteString(check.Connection.Host) + b.WriteString("\n") + b.WriteString("- Status: ") + b.WriteString(strings.ToUpper(strings.TrimSpace(check.Status))) + if check.LastCheckedAt != nil { + b.WriteString("\n- Checked: ") + b.WriteString(check.LastCheckedAt.UTC().Format(time.RFC3339)) + } + if check.ConsecutiveFailures > 0 { + b.WriteString("\n- Consecutive failures: ") + b.WriteString(strconv.Itoa(check.ConsecutiveFailures)) + } + if strings.TrimSpace(check.LastError) != "" { + b.WriteString("\n- Error: ") + b.WriteString(strings.TrimSpace(check.LastError)) + } + return b.String() +} + func 
buildSMTPMessage(from string, to []string, subject, body string) string { headers := []string{ "From: " + from, diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 20b46aa..8b612be 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -33,6 +33,21 @@ type NotificationPatch struct { SMTPSecurity *string } +type HealthCheckDefaults struct { + IntervalSecond int + TimeoutSecond int + FailureThreshold int + SuccessThreshold int +} + +type HealthCheckPatch struct { + Enabled *bool + CheckIntervalSecond *int + TimeoutSecond *int + FailureThreshold *int + SuccessThreshold *int +} + func New(db *gorm.DB, cryptoSvc *crypto.Service) *Repository { return &Repository{db: db, crypto: cryptoSvc} } @@ -103,14 +118,29 @@ func (r *Repository) UpdateConnection(ctx context.Context, id string, c *models. } func (r *Repository) DeleteConnection(ctx context.Context, id string) error { - res := r.db.WithContext(ctx).Delete(&models.Connection{}, "id = ?", id) - if res.Error != nil { - return res.Error - } - if res.RowsAffected == 0 { - return gorm.ErrRecordNotFound - } - return nil + return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var checkIDs []string + if err := tx.Model(&models.HealthCheck{}).Where("connection_id = ?", id).Pluck("id", &checkIDs).Error; err != nil { + return err + } + if len(checkIDs) > 0 { + if err := tx.Where("health_check_id IN ?", checkIDs).Delete(&models.HealthCheckNotification{}).Error; err != nil { + return err + } + } + if err := tx.Where("connection_id = ?", id).Delete(&models.HealthCheck{}).Error; err != nil { + return err + } + + res := tx.Delete(&models.Connection{}, "id = ?", id) + if res.Error != nil { + return res.Error + } + if res.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil + }) } func (r *Repository) ListConnections(ctx context.Context) ([]models.Connection, error) { @@ -280,6 +310,9 @@ func (r *Repository) CreateNotification(ctx 
context.Context, n *models.Notificat if err := r.db.WithContext(ctx).Create(&copy).Error; err != nil { return err } + if err := r.attachNotificationToAllHealthChecks(ctx, copy.ID); err != nil { + return err + } *n = copy return r.decryptNotification(n) } @@ -353,6 +386,9 @@ func (r *Repository) DeleteNotification(ctx context.Context, id string) error { if err := tx.Delete(&models.BackupNotification{}, "notification_id = ?", id).Error; err != nil { return err } + if err := tx.Delete(&models.HealthCheckNotification{}, "notification_id = ?", id).Error; err != nil { + return err + } res := tx.Delete(&models.NotificationDestination{}, "id = ?", id) if res.Error != nil { return res.Error @@ -504,6 +540,405 @@ func (r *Repository) ListNotificationDestinationsForEvent(ctx context.Context, b return items, nil } +func (r *Repository) EnsureHealthChecksForConnections(ctx context.Context, defaults HealthCheckDefaults) error { + conns, err := r.ListConnections(ctx) + if err != nil { + return err + } + for _, conn := range conns { + if _, err := r.UpsertHealthCheckForConnection(ctx, conn.ID, defaults); err != nil { + return err + } + } + return nil +} + +func (r *Repository) UpsertHealthCheckForConnection(ctx context.Context, connectionID string, defaults HealthCheckDefaults) (models.HealthCheck, error) { + var item models.HealthCheck + err := r.db.WithContext(ctx).Where("connection_id = ?", strings.TrimSpace(connectionID)).First(&item).Error + if err == nil { + if seedErr := r.seedDefaultHealthCheckNotifications(ctx, item.ID); seedErr != nil { + return models.HealthCheck{}, seedErr + } + return r.GetHealthCheck(ctx, item.ID) + } + if !errors.Is(err, gorm.ErrRecordNotFound) { + return models.HealthCheck{}, err + } + + now := time.Now().UTC() + item = models.HealthCheck{ + ID: uuid.NewString(), + ConnectionID: strings.TrimSpace(connectionID), + Enabled: true, + CheckIntervalSecond: normalizePositive(defaults.IntervalSecond, 60), + TimeoutSecond:
normalizePositive(defaults.TimeoutSecond, 5), + FailureThreshold: normalizePositive(defaults.FailureThreshold, 3), + SuccessThreshold: normalizePositive(defaults.SuccessThreshold, 1), + Status: "unknown", + NextCheckAt: &now, + } + if err := r.db.WithContext(ctx).Create(&item).Error; err != nil { + return models.HealthCheck{}, err + } + if err := r.seedDefaultHealthCheckNotifications(ctx, item.ID); err != nil { + return models.HealthCheck{}, err + } + return r.GetHealthCheck(ctx, item.ID) +} + +func (r *Repository) ListHealthChecks(ctx context.Context) ([]models.HealthCheck, error) { + var items []models.HealthCheck + err := r.db.WithContext(ctx). + Preload("Connection"). + Order("created_at desc"). + Find(&items).Error + if err != nil { + return nil, err + } + for i := range items { + if decErr := r.decryptConnection(&items[i].Connection); decErr != nil { + return nil, decErr + } + } + return items, nil +} + +func (r *Repository) GetHealthCheck(ctx context.Context, id string) (models.HealthCheck, error) { + var item models.HealthCheck + err := r.db.WithContext(ctx). + Preload("Connection"). 
+ First(&item, "id = ?", id).Error + if err != nil { + return models.HealthCheck{}, err + } + if err := r.decryptConnection(&item.Connection); err != nil { + return models.HealthCheck{}, err + } + return item, nil +} + +func (r *Repository) UpdateHealthCheck(ctx context.Context, id string, patch HealthCheckPatch) (models.HealthCheck, error) { + var item models.HealthCheck + if err := r.db.WithContext(ctx).First(&item, "id = ?", id).Error; err != nil { + return models.HealthCheck{}, err + } + + if patch.Enabled != nil { + item.Enabled = *patch.Enabled + } + if patch.CheckIntervalSecond != nil { + item.CheckIntervalSecond = normalizePositive(*patch.CheckIntervalSecond, item.CheckIntervalSecond) + } + if patch.TimeoutSecond != nil { + item.TimeoutSecond = normalizePositive(*patch.TimeoutSecond, item.TimeoutSecond) + } + if patch.FailureThreshold != nil { + item.FailureThreshold = normalizePositive(*patch.FailureThreshold, item.FailureThreshold) + } + if patch.SuccessThreshold != nil { + item.SuccessThreshold = normalizePositive(*patch.SuccessThreshold, item.SuccessThreshold) + } + if item.CheckIntervalSecond <= 0 { + item.CheckIntervalSecond = 60 + } + if item.TimeoutSecond <= 0 { + item.TimeoutSecond = 5 + } + if item.FailureThreshold <= 0 { + item.FailureThreshold = 3 + } + if item.SuccessThreshold <= 0 { + item.SuccessThreshold = 1 + } + if err := r.db.WithContext(ctx).Save(&item).Error; err != nil { + return models.HealthCheck{}, err + } + return r.GetHealthCheck(ctx, item.ID) +} + +func (r *Repository) ListDueHealthChecks(ctx context.Context, now time.Time, limit int) ([]models.HealthCheck, error) { + if limit <= 0 { + limit = 50 + } + var items []models.HealthCheck + err := r.db.WithContext(ctx). + Where("enabled = ?", true). + Where("next_check_at IS NULL OR next_check_at <= ?", now.UTC()). + Preload("Connection"). + Order("next_check_at asc"). + Limit(limit). 
+ Find(&items).Error + if err != nil { + return nil, err + } + for i := range items { + if decErr := r.decryptConnection(&items[i].Connection); decErr != nil { + return nil, decErr + } + } + return items, nil +} + +func (r *Repository) ClaimHealthCheckRun(ctx context.Context, id string, now time.Time, lease time.Duration) (bool, error) { + if lease <= 0 { + lease = 10 * time.Second + } + nowUTC := now.UTC() + next := nowUTC.Add(lease) + + res := r.db.WithContext(ctx). + Model(&models.HealthCheck{}). + Where("id = ?", id). + Where("enabled = ?", true). + Where("next_check_at IS NULL OR next_check_at <= ?", nowUTC). + Updates(map[string]any{"next_check_at": &next}) + if res.Error != nil { + return false, res.Error + } + return res.RowsAffected > 0, nil +} + +func (r *Repository) SaveHealthCheckProbeResult(ctx context.Context, id string, checkedAt time.Time, healthy bool, errorText string) (models.HealthCheck, string, error) { + var item models.HealthCheck + if err := r.db.WithContext(ctx).First(&item, "id = ?", id).Error; err != nil { + return models.HealthCheck{}, "", err + } + + oldStatus := strings.ToLower(strings.TrimSpace(item.Status)) + if oldStatus == "" { + oldStatus = "unknown" + } + + if healthy { + item.ConsecutiveSuccess++ + item.ConsecutiveFailures = 0 + item.LastError = "" + if item.ConsecutiveSuccess >= normalizePositive(item.SuccessThreshold, 1) { + item.Status = "up" + } + } else { + item.ConsecutiveFailures++ + item.ConsecutiveSuccess = 0 + item.LastError = strings.TrimSpace(errorText) + if item.ConsecutiveFailures >= normalizePositive(item.FailureThreshold, 3) { + item.Status = "down" + } + } + + if strings.TrimSpace(item.Status) == "" { + item.Status = "unknown" + } + + checked := checkedAt.UTC() + item.LastCheckedAt = &checked + next := checked.Add(time.Duration(normalizePositive(item.CheckIntervalSecond, 60)) * time.Second) + item.NextCheckAt = &next + + if err := r.db.WithContext(ctx).Save(&item).Error; err != nil { + return 
models.HealthCheck{}, "", err + } + + newStatus := strings.ToLower(strings.TrimSpace(item.Status)) + event := "" + if oldStatus != "down" && newStatus == "down" { + event = "down" + } + if oldStatus == "down" && newStatus == "up" { + event = "recovered" + } + + fresh, err := r.GetHealthCheck(ctx, item.ID) + if err != nil { + return models.HealthCheck{}, "", err + } + return fresh, event, nil +} + +func (r *Repository) MarkHealthCheckNotified(ctx context.Context, id string, at time.Time) error { + t := at.UTC() + return r.db.WithContext(ctx). + Model(&models.HealthCheck{}). + Where("id = ?", id). + Update("last_notified_at", &t).Error +} + +func (r *Repository) ListHealthCheckNotifications(ctx context.Context, healthCheckID string) ([]models.HealthCheckNotification, error) { + var items []models.HealthCheckNotification + err := r.db.WithContext(ctx). + Where("health_check_id = ?", healthCheckID). + Preload("Notification"). + Order("created_at asc"). + Find(&items).Error + if err != nil { + return nil, err + } + for i := range items { + if items[i].Notification.ID != "" { + if decErr := r.decryptNotification(&items[i].Notification); decErr != nil { + return nil, decErr + } + } + } + return items, nil +} + +func (r *Repository) SetHealthCheckNotifications(ctx context.Context, healthCheckID string, bindings []models.HealthCheckNotification) ([]models.HealthCheckNotification, error) { + err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var check models.HealthCheck + if err := tx.First(&check, "id = ?", healthCheckID).Error; err != nil { + return err + } + + if err := tx.Delete(&models.HealthCheckNotification{}, "health_check_id = ?", healthCheckID).Error; err != nil { + return err + } + + seen := make(map[string]struct{}, len(bindings)) + for _, binding := range bindings { + notificationID := strings.TrimSpace(binding.NotificationID) + if notificationID == "" { + return errors.New("notification_id is required") + } + if _, ok := seen[notificationID]; 
ok { + return fmt.Errorf("duplicate notification_id: %s", notificationID) + } + seen[notificationID] = struct{}{} + if !binding.OnDown && !binding.OnRecovered { + return fmt.Errorf("notification %s must enable on_down or on_recovered", notificationID) + } + + var destination models.NotificationDestination + if err := tx.First(&destination, "id = ?", notificationID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return fmt.Errorf("notification_id not found: %s", notificationID) + } + return err + } + + now := time.Now().UTC() + item := map[string]any{ + "id": uuid.NewString(), + "health_check_id": healthCheckID, + "notification_id": notificationID, + "on_down": binding.OnDown, + "on_recovered": binding.OnRecovered, + "enabled": binding.Enabled, + "created_at": now, + "updated_at": now, + } + if err := tx.Table("health_check_notifications").Create(item).Error; err != nil { + return err + } + } + + return nil + }) + if err != nil { + return nil, err + } + return r.ListHealthCheckNotifications(ctx, healthCheckID) +} + +func (r *Repository) ListHealthNotificationDestinationsForEvent(ctx context.Context, healthCheckID, event string) ([]models.NotificationDestination, error) { + column := "on_down" + if strings.EqualFold(strings.TrimSpace(event), "recovered") { + column = "on_recovered" + } + + var bindings []models.HealthCheckNotification + err := r.db.WithContext(ctx). + Where("health_check_id = ?", healthCheckID). + Where("enabled = ?", true). + Where(column+" = ?", true). + Preload("Notification", "enabled = ?", true). 
+ Find(&bindings).Error + if err != nil { + return nil, err + } + + items := make([]models.NotificationDestination, 0, len(bindings)) + for _, binding := range bindings { + if binding.Notification.ID == "" { + continue + } + if decErr := r.decryptNotification(&binding.Notification); decErr != nil { + return nil, decErr + } + items = append(items, binding.Notification) + } + return items, nil +} + +func normalizePositive(value, fallback int) int { + if value > 0 { + return value + } + if fallback > 0 { + return fallback + } + return 1 +} + +func (r *Repository) seedDefaultHealthCheckNotifications(ctx context.Context, healthCheckID string) error { + var count int64 + if err := r.db.WithContext(ctx).Model(&models.HealthCheckNotification{}).Where("health_check_id = ?", healthCheckID).Count(&count).Error; err != nil { + return err + } + if count > 0 { + return nil + } + + var destinations []models.NotificationDestination + if err := r.db.WithContext(ctx).Where("enabled = ?", true).Find(&destinations).Error; err != nil { + return err + } + if len(destinations) == 0 { + return nil + } + + now := time.Now().UTC() + items := make([]map[string]any, 0, len(destinations)) + for _, destination := range destinations { + items = append(items, map[string]any{ + "id": uuid.NewString(), + "health_check_id": healthCheckID, + "notification_id": destination.ID, + "on_down": true, + "on_recovered": true, + "enabled": true, + "created_at": now, + "updated_at": now, + }) + } + return r.db.WithContext(ctx).Table("health_check_notifications").Create(items).Error +} + +func (r *Repository) attachNotificationToAllHealthChecks(ctx context.Context, notificationID string) error { + var checks []models.HealthCheck + if err := r.db.WithContext(ctx).Select("id").Find(&checks).Error; err != nil { + return err + } + if len(checks) == 0 { + return nil + } + now := time.Now().UTC() + items := make([]map[string]any, 0, len(checks)) + for _, check := range checks { + items = append(items, map[string]any{ 
+ "id": uuid.NewString(), + "health_check_id": check.ID, + "notification_id": notificationID, + "on_down": true, + "on_recovered": true, + "enabled": true, + "created_at": now, + "updated_at": now, + }) + } + return r.db.WithContext(ctx).Table("health_check_notifications").Create(items).Error +} + func validateNotificationDestination(n models.NotificationDestination) error { if strings.TrimSpace(n.Name) == "" { return errors.New("name is required") diff --git a/internal/repository/repository_integration_test.go b/internal/repository/repository_integration_test.go index d839ee4..f2b8728 100644 --- a/internal/repository/repository_integration_test.go +++ b/internal/repository/repository_integration_test.go @@ -4,6 +4,7 @@ import ( "context" "strings" "testing" + "time" "anchordb/internal/models" "anchordb/internal/repository" @@ -235,3 +236,156 @@ func TestNotificationLifecycleAndBindings(t *testing.T) { func ptrString(v string) *string { return &v } + +func TestDeleteConnectionRemovesHealthChecksAndBindings(t *testing.T) { + stack := testutil.NewStack(t) + ctx := context.Background() + + notification := models.NotificationDestination{ + Name: "health-alerts", + Type: "discord", + Enabled: true, + DiscordWebhookURL: "https://discord.com/api/webhooks/test-id/test-token", + } + if err := stack.Repo.CreateNotification(ctx, &notification); err != nil { + t.Fatalf("create notification: %v", err) + } + + conn := testutil.MustCreateConnection(t, stack.Repo, "health-source") + check, err := stack.Repo.UpsertHealthCheckForConnection(ctx, conn.ID, repository.HealthCheckDefaults{ + IntervalSecond: 60, + TimeoutSecond: 5, + FailureThreshold: 3, + SuccessThreshold: 1, + }) + if err != nil { + t.Fatalf("upsert health check: %v", err) + } + + bindings, err := stack.Repo.ListHealthCheckNotifications(ctx, check.ID) + if err != nil { + t.Fatalf("list health check bindings: %v", err) + } + if len(bindings) == 0 { + t.Fatal("expected seeded health check notification bindings") + } + + if
err := stack.Repo.DeleteConnection(ctx, conn.ID); err != nil { + t.Fatalf("delete connection: %v", err) + } + + var checkCount int64 + if err := stack.DB.WithContext(ctx).Model(&models.HealthCheck{}).Where("connection_id = ?", conn.ID).Count(&checkCount).Error; err != nil { + t.Fatalf("count health checks: %v", err) + } + if checkCount != 0 { + t.Fatalf("expected health checks deleted, got %d", checkCount) + } + + var bindingCount int64 + if err := stack.DB.WithContext(ctx).Model(&models.HealthCheckNotification{}).Where("health_check_id = ?", check.ID).Count(&bindingCount).Error; err != nil { + t.Fatalf("count health check bindings: %v", err) + } + if bindingCount != 0 { + t.Fatalf("expected health check bindings deleted, got %d", bindingCount) + } +} + +func TestDeleteNotificationRemovesHealthBindings(t *testing.T) { + stack := testutil.NewStack(t) + ctx := context.Background() + + conn := testutil.MustCreateConnection(t, stack.Repo, "health-binding-source") + if _, err := stack.Repo.UpsertHealthCheckForConnection(ctx, conn.ID, repository.HealthCheckDefaults{ + IntervalSecond: 60, + TimeoutSecond: 5, + FailureThreshold: 3, + SuccessThreshold: 1, + }); err != nil { + t.Fatalf("upsert health check: %v", err) + } + + notification := models.NotificationDestination{ + Name: "health-alerts-delete", + Type: "discord", + Enabled: true, + DiscordWebhookURL: "https://discord.com/api/webhooks/test-id/test-token", + } + if err := stack.Repo.CreateNotification(ctx, &notification); err != nil { + t.Fatalf("create notification: %v", err) + } + + checks, err := stack.Repo.ListHealthChecks(ctx) + if err != nil { + t.Fatalf("list health checks: %v", err) + } + if len(checks) == 0 { + t.Fatal("expected at least one health check") + } + + var beforeCount int64 + if err := stack.DB.WithContext(ctx).Model(&models.HealthCheckNotification{}).Where("notification_id = ?", notification.ID).Count(&beforeCount).Error; err != nil { + t.Fatalf("count bindings before delete: %v", err) + } + if
beforeCount == 0 { + t.Fatal("expected health notification bindings before delete") + } + + if err := stack.Repo.DeleteNotification(ctx, notification.ID); err != nil { + t.Fatalf("delete notification: %v", err) + } + + var afterCount int64 + if err := stack.DB.WithContext(ctx).Model(&models.HealthCheckNotification{}).Where("notification_id = ?", notification.ID).Count(&afterCount).Error; err != nil { + t.Fatalf("count bindings after delete: %v", err) + } + if afterCount != 0 { + t.Fatalf("expected health notification bindings deleted, got %d", afterCount) + } +} + +func TestClaimHealthCheckRunPreventsDuplicateClaims(t *testing.T) { + stack := testutil.NewStack(t) + ctx := context.Background() + + conn := testutil.MustCreateConnection(t, stack.Repo, "claim-source") + check, err := stack.Repo.UpsertHealthCheckForConnection(ctx, conn.ID, repository.HealthCheckDefaults{ + IntervalSecond: 60, + TimeoutSecond: 5, + FailureThreshold: 3, + SuccessThreshold: 1, + }) + if err != nil { + t.Fatalf("upsert health check: %v", err) + } + + now := time.Now().UTC() + claimed, err := stack.Repo.ClaimHealthCheckRun(ctx, check.ID, now, 5*time.Second) + if err != nil { + t.Fatalf("first claim: %v", err) + } + if !claimed { + t.Fatal("expected first claim to succeed") + } + + claimed, err = stack.Repo.ClaimHealthCheckRun(ctx, check.ID, now.Add(1*time.Second), 5*time.Second) + if err != nil { + t.Fatalf("second claim: %v", err) + } + if claimed { + t.Fatal("expected second claim to be rejected while lease is active") + } + + past := time.Now().UTC().Add(-1 * time.Second) + if err := stack.DB.WithContext(ctx).Model(&models.HealthCheck{}).Where("id = ?", check.ID).Update("next_check_at", &past).Error; err != nil { + t.Fatalf("set next_check_at to past: %v", err) + } + + claimed, err = stack.Repo.ClaimHealthCheckRun(ctx, check.ID, time.Now().UTC(), 5*time.Second) + if err != nil { + t.Fatalf("third claim: %v", err) + } + if !claimed { + t.Fatal("expected claim to succeed after lease 
expires") + } +} diff --git a/internal/ui/handler.go b/internal/ui/handler.go index c4c8aab..f493274 100644 --- a/internal/ui/handler.go +++ b/internal/ui/handler.go @@ -1,17 +1,13 @@ package ui import ( - "bytes" "context" "embed" - "encoding/json" "errors" "fmt" "html/template" "io" - "net" "net/http" - "net/url" "os" "path/filepath" "strconv" @@ -19,6 +15,7 @@ import ( "time" "anchordb/internal/config" + "anchordb/internal/health" "anchordb/internal/models" "anchordb/internal/notifications" "anchordb/internal/repository" @@ -48,8 +45,11 @@ type pageData struct { Connections []models.Connection Remotes []models.Remote Backups []models.Backup + HealthChecks []models.HealthCheck + DownHealthChecks []models.HealthCheck Notifications []models.NotificationDestination BackupBindings []models.BackupNotification + HealthCheckBindings []models.HealthCheckNotification Runs []models.BackupRun RunItems []runListItem RunLog []runLogLine @@ -57,7 +57,12 @@ type pageData struct { SelectedRun runListItem HasSelectedRun bool SelectedBackupID string + SelectedHealthCheckID string BackupNotificationCounts map[string]int + HealthNotificationCounts map[string]int + TotalHealthChecks int + DownHealthCheckCount int + HealthSummaryLabel string CurrentPage string Dashboard dashboardStats @@ -67,6 +72,8 @@ type pageData struct { RemotesErr string BackupsMsg string BackupsErr string + HealthChecksMsg string + HealthChecksErr string NotificationsMsg string NotificationsErr string RunsErr string @@ -166,6 +173,30 @@ func NewHandler(repo *repository.Repository, scheduler *scheduler.Scheduler, cfg } return false }, + "healthBindingEnabled": func(bindings []models.HealthCheckNotification, notificationID string) bool { + for _, item := range bindings { + if item.NotificationID == notificationID && item.Enabled { + return true + } + } + return false + }, + "healthBindingOnDown": func(bindings []models.HealthCheckNotification, notificationID string) bool { + for _, item := range bindings { + if 
item.NotificationID == notificationID && item.Enabled { + return item.OnDown + } + } + return false + }, + "healthBindingOnRecovered": func(bindings []models.HealthCheckNotification, notificationID string) bool { + for _, item := range bindings { + if item.NotificationID == notificationID && item.Enabled { + return item.OnRecovered + } + } + return false + }, }).ParseFS(templateFS, "templates/*.html")) return &Handler{repo: repo, scheduler: scheduler, notifier: notifications.NewDispatcher(repo), cfg: cfg, tmpl: tmpl} @@ -191,6 +222,11 @@ func (h *Handler) Router() http.Handler { r.Post("/notifications/{id}/test", h.testNotification) r.Post("/notifications/{id}/delete", h.deleteNotification) r.Post("/notifications/bindings", h.saveNotificationBindings) + r.Get("/health-checks", h.healthChecksPage) + r.Get("/health-checks/section", h.healthChecksSection) + r.Post("/health-checks/{id}/settings", h.updateHealthCheckSettings) + r.Post("/health-checks/{id}/run", h.runHealthCheckNow) + r.Post("/health-checks/bindings", h.saveHealthCheckBindings) r.Get("/backups", h.backupsPage) r.Get("/schedules", h.backupsPage) r.Get("/backups/section", h.backupsSection) @@ -218,7 +254,7 @@ func (h *Handler) dashboardPage(w http.ResponseWriter, r *http.Request) { return } data.CurrentPage = "dashboard" - h.render(w, "page", data) + h.renderPage(w, r, data) } func (h *Handler) connectionsPage(w http.ResponseWriter, r *http.Request) { @@ -228,7 +264,7 @@ func (h *Handler) connectionsPage(w http.ResponseWriter, r *http.Request) { return } redactConnections(items) - h.render(w, "page", pageData{CurrentPage: "connections", Connections: items}) + h.renderPage(w, r, pageData{CurrentPage: "connections", Connections: items}) } func (h *Handler) connectionsSection(w http.ResponseWriter, r *http.Request) { @@ -258,7 +294,7 @@ func (h *Handler) remotesPage(w http.ResponseWriter, r *http.Request) { return } redactRemotes(items) - h.render(w, "page", pageData{CurrentPage: "remotes", Remotes: items}) + 
h.renderPage(w, r, pageData{CurrentPage: "remotes", Remotes: items}) } func (h *Handler) notificationsPage(w http.ResponseWriter, r *http.Request) { @@ -269,7 +305,7 @@ func (h *Handler) notificationsPage(w http.ResponseWriter, r *http.Request) { return } data.CurrentPage = "notifications" - h.render(w, "page", data) + h.renderPage(w, r, data) } func (h *Handler) notificationsSection(w http.ResponseWriter, r *http.Request) { @@ -282,6 +318,27 @@ func (h *Handler) notificationsSection(w http.ResponseWriter, r *http.Request) { h.render(w, "notifications_section", data) } +func (h *Handler) healthChecksPage(w http.ResponseWriter, r *http.Request) { + selectedHealthCheckID := strings.TrimSpace(r.URL.Query().Get("health_check_id")) + data, err := h.loadHealthChecksData(r.Context(), selectedHealthCheckID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + data.CurrentPage = "health-checks" + h.renderPage(w, r, data) +} + +func (h *Handler) healthChecksSection(w http.ResponseWriter, r *http.Request) { + selectedHealthCheckID := strings.TrimSpace(r.URL.Query().Get("health_check_id")) + data, err := h.loadHealthChecksData(r.Context(), selectedHealthCheckID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + h.render(w, "health_checks_section", data) +} + func (h *Handler) backupsSection(w http.ResponseWriter, r *http.Request) { data, err := h.loadBackupsData(r.Context()) if err != nil { @@ -299,7 +356,7 @@ func (h *Handler) backupsPage(w http.ResponseWriter, r *http.Request) { return } data.CurrentPage = "schedules" - h.render(w, "page", data) + h.renderPage(w, r, data) } func (h *Handler) backupRuns(w http.ResponseWriter, r *http.Request) { @@ -314,7 +371,7 @@ func (h *Handler) runsPage(w http.ResponseWriter, r *http.Request) { return } data.CurrentPage = "runs" - h.render(w, "page", data) + h.renderPage(w, r, data) } func (h *Handler) runsSection(w http.ResponseWriter, r *http.Request) { 
@@ -613,6 +670,15 @@ func (h *Handler) createConnection(w http.ResponseWriter, r *http.Request) { h.renderConnections(w, "", err.Error()) return } + if _, err := h.repo.UpsertHealthCheckForConnection(r.Context(), item.ID, repository.HealthCheckDefaults{ + IntervalSecond: int(h.cfg.DefaultHealthEvery / time.Second), + TimeoutSecond: int(h.cfg.DefaultHealthTimeout / time.Second), + FailureThreshold: h.cfg.DefaultFailThreshold, + SuccessThreshold: h.cfg.DefaultPassThreshold, + }); err != nil { + h.renderConnections(w, "", err.Error()) + return + } h.renderConnections(w, "Connection created", "") } @@ -644,7 +710,6 @@ func (h *Handler) testConnection(w http.ResponseWriter, r *http.Request) { h.renderConnections(w, "", "type and host are required for connection test") return } - applyConnectionDefaults(&item) result, err := h.probeConnection(r.Context(), item) if err != nil { @@ -707,159 +772,8 @@ func (h *Handler) testAllConnections(w http.ResponseWriter, r *http.Request) { h.renderConnections(w, fmt.Sprintf("%d/%d connections passed", passed, len(items)), strings.Join(failures, " | ")) } -func applyConnectionDefaults(item *models.Connection) { - if item.Port != 0 { - return - } - - switch strings.ToLower(strings.TrimSpace(item.Type)) { - case "postgres", "postgresql": - item.Port = 5432 - case "mysql": - item.Port = 3306 - case "mssql": - item.Port = 1433 - case "redis": - item.Port = 6379 - case "mongo", "mongodb": - item.Port = 27017 - } -} - func (h *Handler) probeConnection(ctx context.Context, item models.Connection) (string, error) { - typeName := strings.ToLower(strings.TrimSpace(item.Type)) - host := strings.TrimSpace(item.Host) - if host == "" { - return "", errors.New("host is required") - } - - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - switch typeName { - case "postgres", "postgresql", "mysql", "mssql", "redis", "mongo", "mongodb": - applyConnectionDefaults(&item) - if item.Port <= 0 { - return "", errors.New("port is 
required") - } - addr := host - if _, _, err := net.SplitHostPort(host); err != nil { - addr = net.JoinHostPort(host, strconv.Itoa(item.Port)) - } - dialer := net.Dialer{Timeout: 4 * time.Second} - conn, err := dialer.DialContext(ctx, "tcp", addr) - if err != nil { - return "", err - } - _ = conn.Close() - return "TCP reachability OK (" + addr + ")", nil - case "convex": - raw := strings.TrimSpace(item.Host) - if !strings.Contains(raw, "://") { - raw = "https://" + raw - } - u, err := url.Parse(raw) - if err != nil || u.Scheme == "" || u.Host == "" { - return "", errors.New("invalid Convex URL") - } - port := u.Port() - if port == "" { - if strings.EqualFold(u.Scheme, "http") { - port = "80" - } else { - port = "443" - } - } - addr := net.JoinHostPort(u.Hostname(), port) - dialer := net.Dialer{Timeout: 4 * time.Second} - conn, err := dialer.DialContext(ctx, "tcp", addr) - if err != nil { - return "", err - } - _ = conn.Close() - return "Convex endpoint reachable (" + addr + ")", nil - case "d1": - accountID := strings.TrimSpace(item.Host) - if accountID == "" { - accountID = strings.TrimSpace(h.cfg.CloudflareAccountID) - } - databaseID := strings.TrimSpace(item.Database) - if databaseID == "" { - databaseID = strings.TrimSpace(h.cfg.CloudflareDatabaseID) - } - apiKey := strings.TrimSpace(item.Password) - if apiKey == "" { - apiKey = strings.TrimSpace(h.cfg.CloudflareAPIKey) - } - if accountID == "" || databaseID == "" || apiKey == "" { - return "", errors.New("d1 test requires account id, database id, and api key") - } - if err := h.testD1Query(ctx, accountID, databaseID, apiKey); err != nil { - return "", err - } - return "Cloudflare D1 API reachable", nil - default: - return "", fmt.Errorf("unsupported connection type: %s", item.Type) - } -} - -func (h *Handler) testD1Query(ctx context.Context, accountID, databaseID, apiKey string) error { - endpoint := strings.TrimSuffix(strings.TrimSpace(h.cfg.D1APIBaseURL), "/") - if endpoint == "" { - endpoint = 
"https://api.cloudflare.com/client/v4" - } - endpoint = endpoint + "/accounts/" + url.PathEscape(accountID) + "/d1/database/" + url.PathEscape(databaseID) + "/query" - - payload, err := json.Marshal(map[string]any{"sql": "SELECT 1 AS ok"}) - if err != nil { - return err - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+apiKey) - - res, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer func() { _ = res.Body.Close() }() - - body, err := io.ReadAll(res.Body) - if err != nil { - return err - } - - var envelope struct { - Success bool `json:"success"` - Errors []struct { - Message string `json:"message"` - } `json:"errors"` - } - if err := json.Unmarshal(body, &envelope); err != nil { - if res.StatusCode >= http.StatusBadRequest { - return fmt.Errorf("d1 query failed (status %d)", res.StatusCode) - } - return err - } - - if res.StatusCode >= http.StatusBadRequest { - if len(envelope.Errors) > 0 && strings.TrimSpace(envelope.Errors[0].Message) != "" { - return errors.New(envelope.Errors[0].Message) - } - return fmt.Errorf("d1 query failed (status %d)", res.StatusCode) - } - if !envelope.Success { - if len(envelope.Errors) > 0 && strings.TrimSpace(envelope.Errors[0].Message) != "" { - return errors.New(envelope.Errors[0].Message) - } - return errors.New("d1 query returned unsuccessful response") - } - - return nil + return health.ProbeConnection(ctx, item, h.cfg, 5*time.Second) } func (h *Handler) createRemote(w http.ResponseWriter, r *http.Request) { @@ -1030,6 +944,171 @@ func (h *Handler) saveNotificationBindings(w http.ResponseWriter, r *http.Reques h.renderNotifications(w, "Schedule notifications updated", "", selectedBackupID) } +func (h *Handler) updateHealthCheckSettings(w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + 
h.renderHealthChecks(w, "", "Invalid form data", strings.TrimSpace(r.FormValue("health_check_id"))) + return + } + + id := strings.TrimSpace(chi.URLParam(r, "id")) + selectedHealthCheckID := strings.TrimSpace(r.FormValue("health_check_id")) + if selectedHealthCheckID == "" { + selectedHealthCheckID = id + } + if id == "" { + h.renderHealthChecks(w, "", "health check id is required", selectedHealthCheckID) + return + } + + intervalSecond, err := strconv.Atoi(strings.TrimSpace(r.FormValue("check_interval_second"))) + if err != nil || intervalSecond <= 0 { + h.renderHealthChecks(w, "", "check_interval_second must be a positive number", selectedHealthCheckID) + return + } + timeoutSecond, err := strconv.Atoi(strings.TrimSpace(r.FormValue("timeout_second"))) + if err != nil || timeoutSecond <= 0 { + h.renderHealthChecks(w, "", "timeout_second must be a positive number", selectedHealthCheckID) + return + } + failureThreshold, err := strconv.Atoi(strings.TrimSpace(r.FormValue("failure_threshold"))) + if err != nil || failureThreshold <= 0 { + h.renderHealthChecks(w, "", "failure_threshold must be a positive number", selectedHealthCheckID) + return + } + successThreshold, err := strconv.Atoi(strings.TrimSpace(r.FormValue("success_threshold"))) + if err != nil || successThreshold <= 0 { + h.renderHealthChecks(w, "", "success_threshold must be a positive number", selectedHealthCheckID) + return + } + + enabled := r.FormValue("enabled") == "on" + if _, err := h.repo.UpdateHealthCheck(r.Context(), id, repository.HealthCheckPatch{ + Enabled: &enabled, + CheckIntervalSecond: &intervalSecond, + TimeoutSecond: &timeoutSecond, + FailureThreshold: &failureThreshold, + SuccessThreshold: &successThreshold, + }); err != nil { + h.renderHealthChecks(w, "", err.Error(), selectedHealthCheckID) + return + } + + h.renderHealthChecks(w, "Health check settings updated", "", selectedHealthCheckID) +} + +func (h *Handler) runHealthCheckNow(w http.ResponseWriter, r *http.Request) { + id := 
strings.TrimSpace(chi.URLParam(r, "id")) + selectedHealthCheckID := strings.TrimSpace(r.URL.Query().Get("health_check_id")) + if selectedHealthCheckID == "" { + selectedHealthCheckID = id + } + if id == "" { + h.renderHealthChecks(w, "", "health check id is required", selectedHealthCheckID) + return + } + + checkItem, err := h.repo.GetHealthCheck(r.Context(), id) + if err != nil { + h.renderHealthChecks(w, "", "health check not found", selectedHealthCheckID) + return + } + if checkItem.LastCheckedAt != nil && h.cfg.HealthManualCooldown > 0 { + nextAllowedAt := checkItem.LastCheckedAt.UTC().Add(h.cfg.HealthManualCooldown) + if time.Now().UTC().Before(nextAllowedAt) { + retryAfter := int(time.Until(nextAllowedAt).Seconds()) + if retryAfter < 1 { + retryAfter = 1 + } + h.renderHealthChecks(w, "", "Manual health check is on cooldown. Retry in "+strconv.Itoa(retryAfter)+"s", selectedHealthCheckID) + return + } + } + + timeoutSecond := checkItem.TimeoutSecond + if timeoutSecond <= 0 { + timeoutSecond = int(h.cfg.DefaultHealthTimeout / time.Second) + } + if timeoutSecond <= 0 { + timeoutSecond = 5 + } + timeout := time.Duration(timeoutSecond) * time.Second + + result, probeErr := health.ProbeConnection(r.Context(), checkItem.Connection, h.cfg, timeout) + healthy := probeErr == nil + errText := "" + if probeErr != nil { + errText = strings.TrimSpace(probeErr.Error()) + } + + updated, event, saveErr := h.repo.SaveHealthCheckProbeResult(r.Context(), checkItem.ID, time.Now().UTC(), healthy, errText) + if saveErr != nil { + h.renderHealthChecks(w, "", saveErr.Error(), selectedHealthCheckID) + return + } + if event != "" { + if notifyErr := h.notifier.NotifyHealthCheckEvent(r.Context(), updated, event); notifyErr != nil { + h.renderHealthChecks(w, "", "health check updated but notification failed: "+notifyErr.Error(), selectedHealthCheckID) + return + } + _ = h.repo.MarkHealthCheckNotified(r.Context(), updated.ID, time.Now().UTC()) + } + + if probeErr != nil { + 
h.renderHealthChecks(w, "", "Health check failed: "+probeErr.Error(), selectedHealthCheckID) + return + } + h.renderHealthChecks(w, "Health check passed: "+result, "", selectedHealthCheckID) +} + +func (h *Handler) saveHealthCheckBindings(w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + h.renderHealthChecks(w, "", "Invalid form data", strings.TrimSpace(r.FormValue("health_check_id"))) + return + } + + selectedHealthCheckID := strings.TrimSpace(r.FormValue("health_check_id")) + if selectedHealthCheckID == "" { + h.renderHealthChecks(w, "", "health_check_id is required", selectedHealthCheckID) + return + } + + notifications, err := h.repo.ListNotifications(r.Context()) + if err != nil { + h.renderHealthChecks(w, "", err.Error(), selectedHealthCheckID) + return + } + + bindings := make([]models.HealthCheckNotification, 0, len(notifications)) + for _, notification := range notifications { + key := notification.ID + enabled := r.FormValue("binding_enabled_"+key) == "on" + if !enabled { + continue + } + + onDown := r.FormValue("binding_down_"+key) == "on" + onRecovered := r.FormValue("binding_recovered_"+key) == "on" + if !onDown && !onRecovered { + h.renderHealthChecks(w, "", "each enabled notification must have down or recovered selected", selectedHealthCheckID) + return + } + + bindings = append(bindings, models.HealthCheckNotification{ + NotificationID: notification.ID, + Enabled: true, + OnDown: onDown, + OnRecovered: onRecovered, + }) + } + + if _, err := h.repo.SetHealthCheckNotifications(r.Context(), selectedHealthCheckID, bindings); err != nil { + h.renderHealthChecks(w, "", err.Error(), selectedHealthCheckID) + return + } + + h.renderHealthChecks(w, "Health check notifications updated", "", selectedHealthCheckID) +} + func (h *Handler) createBackup(w http.ResponseWriter, r *http.Request) { if err := r.ParseForm(); err != nil { h.renderBackups(w, "", "Invalid form data") @@ -1197,6 +1276,14 @@ func (h *Handler) 
renderNotifications(w http.ResponseWriter, msg, errMsg, select h.render(w, "notifications_section", data) } +func (h *Handler) renderHealthChecks(w http.ResponseWriter, msg, errMsg, selectedHealthCheckID string) { + data, _ := h.loadHealthChecksData(context.Background(), selectedHealthCheckID) + data.CurrentPage = "health-checks" + data.HealthChecksMsg = msg + data.HealthChecksErr = errMsg + h.render(w, "health_checks_section", data) +} + func (h *Handler) render(w http.ResponseWriter, name string, data pageData) { w.Header().Set("Content-Type", "text/html; charset=utf-8") if err := h.tmpl.ExecuteTemplate(w, name, data); err != nil { @@ -1204,6 +1291,14 @@ func (h *Handler) render(w http.ResponseWriter, name string, data pageData) { } } +func (h *Handler) renderPage(w http.ResponseWriter, r *http.Request, data pageData) { + if err := h.decorateHeaderHealth(r.Context(), &data); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + h.render(w, "page", data) +} + func (h *Handler) loadCoreData(ctx context.Context) ([]models.Connection, []models.Remote, []models.Backup, error) { connections, err := h.repo.ListConnections(ctx) if err != nil { @@ -1254,6 +1349,35 @@ func (h *Handler) loadBackupsData(ctx context.Context) (pageData, error) { }, nil } +func (h *Handler) decorateHeaderHealth(ctx context.Context, data *pageData) error { + checks, err := h.repo.ListHealthChecks(ctx) + if err != nil { + return err + } + redactHealthChecks(checks) + + down := make([]models.HealthCheck, 0) + for _, item := range checks { + if strings.EqualFold(strings.TrimSpace(item.Status), "down") { + down = append(down, item) + } + } + + data.TotalHealthChecks = len(checks) + data.DownHealthChecks = down + data.DownHealthCheckCount = len(down) + if len(checks) == 0 { + data.HealthSummaryLabel = "No health checks configured" + return nil + } + if len(down) > 0 { + data.HealthSummaryLabel = fmt.Sprintf("%d database(s) down", len(down)) + return nil + } + 
data.HealthSummaryLabel = "All systems operational" + return nil +} + func (h *Handler) loadNotificationsData(ctx context.Context, selectedBackupID string) (pageData, error) { connections, remotes, backups, err := h.loadCoreData(ctx) if err != nil { @@ -1305,6 +1429,64 @@ func (h *Handler) loadNotificationsData(ctx context.Context, selectedBackupID st }, nil } +func (h *Handler) loadHealthChecksData(ctx context.Context, selectedHealthCheckID string) (pageData, error) { + connections, remotes, backups, err := h.loadCoreData(ctx) + if err != nil { + return pageData{}, err + } + + healthChecks, err := h.repo.ListHealthChecks(ctx) + if err != nil { + return pageData{}, err + } + redactHealthChecks(healthChecks) + + notifications, err := h.repo.ListNotifications(ctx) + if err != nil { + return pageData{}, err + } + redactNotifications(notifications) + + counts := make(map[string]int, len(healthChecks)) + for _, check := range healthChecks { + bindings, bindErr := h.repo.ListHealthCheckNotifications(ctx, check.ID) + if bindErr != nil { + return pageData{}, bindErr + } + active := 0 + for _, binding := range bindings { + if binding.Enabled { + active++ + } + } + counts[check.ID] = active + } + + if strings.TrimSpace(selectedHealthCheckID) == "" && len(healthChecks) > 0 { + selectedHealthCheckID = healthChecks[0].ID + } + + bindings := make([]models.HealthCheckNotification, 0) + if strings.TrimSpace(selectedHealthCheckID) != "" { + bindings, err = h.repo.ListHealthCheckNotifications(ctx, selectedHealthCheckID) + if err != nil { + return pageData{}, err + } + redactHealthCheckNotifications(bindings) + } + + return pageData{ + Connections: connections, + Remotes: remotes, + Backups: backups, + HealthChecks: healthChecks, + Notifications: notifications, + HealthCheckBindings: bindings, + SelectedHealthCheckID: selectedHealthCheckID, + HealthNotificationCounts: counts, + }, nil +} + func (h *Handler) loadDashboardData(ctx context.Context) (pageData, error) { connections, 
remotes, backups, err := h.loadCoreData(ctx) if err != nil { @@ -1464,6 +1646,19 @@ func redactBackupNotifications(items []models.BackupNotification) { } } +func redactHealthChecks(items []models.HealthCheck) { + for i := range items { + items[i].Connection.Password = "" + } +} + +func redactHealthCheckNotifications(items []models.HealthCheckNotification) { + for i := range items { + items[i].Notification.DiscordWebhookURL = "" + items[i].Notification.SMTPPassword = "" + } +} + func connectionTypeLabel(v string) string { switch strings.ToLower(strings.TrimSpace(v)) { case "postgres", "postgresql": diff --git a/internal/ui/templates/app.html b/internal/ui/templates/app.html index aa3e9c4..0584c63 100644 --- a/internal/ui/templates/app.html +++ b/internal/ui/templates/app.html @@ -114,10 +114,99 @@ width: 8px; height: 8px; border-radius: 999px; + } + + .status-dot-up { background: var(--green); box-shadow: 0 0 0 4px rgba(34, 197, 94, 0.1); } + .status-dot-down { + background: var(--red); + box-shadow: 0 0 0 4px rgba(239, 68, 68, 0.15); + } + + .health-popover { + position: relative; + } + + .health-popover summary { + list-style: none; + cursor: pointer; + display: inline-flex; + align-items: center; + gap: 6px; + color: inherit; + user-select: none; + } + + .health-popover summary::-webkit-details-marker { + display: none; + } + + .health-popover-menu { + position: absolute; + right: 0; + top: calc(100% + 8px); + width: 340px; + max-width: min(340px, 88vw); + background: var(--bg-elevated); + border: 1px solid var(--border-sub); + border-radius: var(--radius-md); + box-shadow: 0 14px 30px rgba(0, 0, 0, 0.35); + padding: 10px; + z-index: 120; + } + + .health-popover-title { + font-size: 11px; + color: var(--text-muted); + text-transform: uppercase; + letter-spacing: 0.06em; + margin-bottom: 8px; + } + + .health-popover-item { + border: 1px solid var(--border-dim); + border-radius: var(--radius-sm); + background: var(--bg-surface); + padding: 8px; + margin-bottom: 
6px; + } + + .health-popover-link { + display: block; + text-decoration: none; + color: inherit; + } + + .health-popover-link:hover .health-popover-item { + border-color: var(--border-em); + background: var(--bg-hover); + } + + .health-popover-item:last-child { + margin-bottom: 0; + } + + .health-popover-name { + color: var(--text-primary); + font-weight: 600; + } + + .health-popover-meta { + color: var(--text-secondary); + margin-top: 2px; + } + + .health-popover-error { + color: #fca5a5; + margin-top: 4px; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + } + .version { font-family: var(--font-mono); font-size: 11px; @@ -654,8 +743,29 @@ AnchorDB
| Connection | +Status | +Failures | +Last Check | +Bindings | +Actions | +
|---|---|---|---|---|---|
|
+ {{.Connection.Name}}
+ {{typeLabel .Connection.Type}} · {{connectionAddress .Connection}}
+ |
+ + {{if eq .Status "up"}}up{{end}} + {{if eq .Status "down"}}down{{end}} + {{if and (ne .Status "up") (ne .Status "down")}}{{.Status}}{{end}} + | +{{.ConsecutiveFailures}} | +{{if .LastCheckedAt}}{{ago .LastCheckedAt.UTC}}{{else}}-{{end}} | +{{index $.HealthNotificationCounts .ID}} bound | +
+
+
+ Configure
+
+ |
+
| No health checks yet. Add a connection first. | +|||||