Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/anchordb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"anchordb/internal/api"
"anchordb/internal/config"
"anchordb/internal/crypto"
"anchordb/internal/health"
"anchordb/internal/metadata"
"anchordb/internal/repository"
"anchordb/internal/scheduler"
Expand All @@ -36,14 +37,16 @@ func main() {
repo := repository.New(db, cryptoSvc)
exec := scheduler.NewExecutor(cfg)
sch := scheduler.New(repo, exec, cfg)
hmon := health.NewMonitor(cfg, repo)

ctx := context.Background()
if err := sch.LoadAll(ctx); err != nil {
log.Fatalf("load schedules: %v", err)
}
sch.Start()
hmon.Start(ctx)

apiHandler := api.NewHandler(repo, sch)
apiHandler := api.NewHandler(repo, sch, cfg)
uiHandler := ui.NewHandler(repo, sch, cfg)

r := chi.NewRouter()
Expand Down Expand Up @@ -71,6 +74,7 @@ func main() {

log.Println("shutting down")
sch.Stop()
hmon.Stop()

shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
Expand Down
240 changes: 238 additions & 2 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"anchordb/internal/config"
"anchordb/internal/health"
"anchordb/internal/models"
"anchordb/internal/notifications"
"anchordb/internal/repository"
Expand All @@ -23,10 +25,11 @@ type Handler struct {
repo *repository.Repository
scheduler *scheduler.Scheduler
notifier *notifications.Dispatcher
cfg config.Config
}

func NewHandler(repo *repository.Repository, scheduler *scheduler.Scheduler) *Handler {
return &Handler{repo: repo, scheduler: scheduler, notifier: notifications.NewDispatcher(repo)}
// NewHandler builds the API handler around the given repository and scheduler.
// The configuration is stored by value, and the notification dispatcher is
// created from the same repository.
func NewHandler(repo *repository.Repository, scheduler *scheduler.Scheduler, cfg config.Config) *Handler {
	dispatcher := notifications.NewDispatcher(repo)
	handler := &Handler{
		repo:      repo,
		scheduler: scheduler,
		notifier:  dispatcher,
		cfg:       cfg,
	}
	return handler
}

func (h *Handler) Router() http.Handler {
Expand Down Expand Up @@ -74,6 +77,15 @@ func (h *Handler) Router() http.Handler {
r.Put("/{id}/notifications", h.setBackupNotifications)
})

r.Route("/health-checks", func(r chi.Router) {
r.Get("/", h.listHealthChecks)
r.Get("/{id}", h.getHealthCheck)
r.Patch("/{id}", h.updateHealthCheck)
r.Post("/{id}/run", h.runHealthCheckNow)
r.Get("/{id}/notifications", h.listHealthCheckNotifications)
r.Put("/{id}/notifications", h.setHealthCheckNotifications)
})

return r
}

Expand Down Expand Up @@ -112,6 +124,15 @@ func (h *Handler) createConnection(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if _, err := h.repo.UpsertHealthCheckForConnection(r.Context(), req.ID, repository.HealthCheckDefaults{
IntervalSecond: int(h.cfg.DefaultHealthEvery / time.Second),
TimeoutSecond: int(h.cfg.DefaultHealthTimeout / time.Second),
FailureThreshold: h.cfg.DefaultFailThreshold,
SuccessThreshold: h.cfg.DefaultPassThreshold,
}); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
redactConnection(&req)
writeJSON(w, http.StatusCreated, req)
}
Expand Down Expand Up @@ -729,6 +750,221 @@ func (h *Handler) setBackupNotifications(w http.ResponseWriter, r *http.Request)
writeJSON(w, http.StatusOK, items)
}

// listHealthChecks writes every health check known to the repository as a JSON
// array. The connection embedded in each entry is passed through
// redactConnection before the response is written. A repository failure is
// answered with 500 and the error text.
func (h *Handler) listHealthChecks(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	checks, listErr := h.repo.ListHealthChecks(ctx)
	if listErr != nil {
		writeError(w, http.StatusInternalServerError, listErr.Error())
		return
	}

	for idx := 0; idx < len(checks); idx++ {
		redactConnection(&checks[idx].Connection)
	}

	writeJSON(w, http.StatusOK, checks)
}

// getHealthCheck writes the health check addressed by the "id" URL parameter.
//
// A gorm.ErrRecordNotFound from the repository becomes 404; any other
// repository error becomes 500 with the error text. On success the embedded
// connection is redacted and the item is returned with 200.
func (h *Handler) getHealthCheck(w http.ResponseWriter, r *http.Request) {
	checkID := chi.URLParam(r, "id")
	check, lookupErr := h.repo.GetHealthCheck(r.Context(), checkID)

	switch {
	case lookupErr == nil:
		redactConnection(&check.Connection)
		writeJSON(w, http.StatusOK, check)
	case errors.Is(lookupErr, gorm.ErrRecordNotFound):
		writeError(w, http.StatusNotFound, "health check not found")
	default:
		writeError(w, http.StatusInternalServerError, lookupErr.Error())
	}
}

// updateHealthCheck applies a partial update to the health check addressed by
// the "id" URL parameter.
//
// Every field of the JSON body is optional; a field left out stays nil and is
// forwarded to the repository as nil. A body that fails to decode is answered
// with 400 "invalid JSON". A gorm.ErrRecordNotFound from the repository
// becomes 404, and any other repository error becomes 400 with the error
// text. On success the updated item is returned with its connection redacted.
func (h *Handler) updateHealthCheck(w http.ResponseWriter, r *http.Request) {
	var body struct {
		Enabled             *bool `json:"enabled"`
		CheckIntervalSecond *int  `json:"check_interval_second"`
		TimeoutSecond       *int  `json:"timeout_second"`
		FailureThreshold    *int  `json:"failure_threshold"`
		SuccessThreshold    *int  `json:"success_threshold"`
	}

	checkID := chi.URLParam(r, "id")
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid JSON")
		return
	}

	patch := repository.HealthCheckPatch{
		Enabled:             body.Enabled,
		CheckIntervalSecond: body.CheckIntervalSecond,
		TimeoutSecond:       body.TimeoutSecond,
		FailureThreshold:    body.FailureThreshold,
		SuccessThreshold:    body.SuccessThreshold,
	}

	updated, updateErr := h.repo.UpdateHealthCheck(r.Context(), checkID, patch)
	switch {
	case updateErr == nil:
		redactConnection(&updated.Connection)
		writeJSON(w, http.StatusOK, updated)
	case errors.Is(updateErr, gorm.ErrRecordNotFound):
		writeError(w, http.StatusNotFound, "health check not found")
	default:
		writeError(w, http.StatusBadRequest, updateErr.Error())
	}
}

// runHealthCheckNow probes the connection behind the health check addressed by
// the "id" URL parameter right away, stores the outcome, and sends a
// notification when storing the outcome reports an event.
//
// Responses:
//   - 404 when the repository reports gorm.ErrRecordNotFound.
//   - 429 with a Retry-After header while cfg.HealthManualCooldown has not yet
//     elapsed since the item's LastCheckedAt.
//   - 500 when loading the item or saving the probe result fails.
//   - 502 when the result was saved but the notification failed, or when the
//     probe itself failed (the body then carries the updated health check).
//   - 200 with the probe result and the updated health check otherwise.
func (h *Handler) runHealthCheckNow(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "id")
	item, err := h.repo.GetHealthCheck(r.Context(), id)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			writeError(w, http.StatusNotFound, "health check not found")
			return
		}
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}

	if item.LastCheckedAt != nil && h.cfg.HealthManualCooldown > 0 {
		nextAllowedAt := item.LastCheckedAt.UTC().Add(h.cfg.HealthManualCooldown)
		// Read the clock once so the cooldown decision and the advertised delay
		// are derived from the same instant.
		if remaining := nextAllowedAt.Sub(time.Now().UTC()); remaining > 0 {
			// Round up to whole seconds. Truncating would let a client that
			// honours Retry-After come back while the cooldown is still active.
			// remaining > 0 guarantees the result is at least 1.
			retryAfter := int(remaining / time.Second)
			if remaining%time.Second != 0 {
				retryAfter++
			}
			w.Header().Set("Retry-After", strconv.Itoa(retryAfter))
			writeJSON(w, http.StatusTooManyRequests, map[string]any{
				"status":              "cooldown",
				"error":               "manual health check is on cooldown",
				"retry_after_seconds": retryAfter,
			})
			return
		}
	}

	// Timeout precedence: per-check value, then the configured default, then a
	// hard-coded 5 seconds when neither is positive.
	timeoutSecond := item.TimeoutSecond
	if timeoutSecond <= 0 {
		timeoutSecond = int(h.cfg.DefaultHealthTimeout / time.Second)
	}
	if timeoutSecond <= 0 {
		timeoutSecond = 5
	}
	timeout := time.Duration(timeoutSecond) * time.Second

	result, probeErr := health.ProbeConnection(r.Context(), item.Connection, h.cfg, timeout)
	healthy := probeErr == nil
	errText := ""
	if probeErr != nil {
		errText = strings.TrimSpace(probeErr.Error())
	}

	updated, event, saveErr := h.repo.SaveHealthCheckProbeResult(r.Context(), item.ID, time.Now().UTC(), healthy, errText)
	if saveErr != nil {
		writeError(w, http.StatusInternalServerError, saveErr.Error())
		return
	}
	if event != "" {
		if notifyErr := h.notifier.NotifyHealthCheckEvent(r.Context(), updated, event); notifyErr != nil {
			writeError(w, http.StatusBadGateway, "health check updated but notification failed: "+notifyErr.Error())
			return
		}
		// Best-effort bookkeeping: the notification has already been sent and
		// the probe result is already saved, so a failure to record the
		// notification time is deliberately not surfaced to the caller.
		_ = h.repo.MarkHealthCheckNotified(r.Context(), updated.ID, time.Now().UTC())
	}

	redactConnection(&updated.Connection)
	if probeErr != nil {
		writeJSON(w, http.StatusBadGateway, map[string]any{"status": "failed", "error": probeErr.Error(), "health_check": updated})
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "result": result, "health_check": updated})
}

// listHealthCheckNotifications writes the notification bindings attached to
// the health check addressed by the "id" URL parameter.
//
// The health check is looked up first so that an unknown id yields 404 instead
// of an empty list. Any other repository failure yields 500 with the error
// text. Each returned binding has its Notification redacted before the
// response is written.
func (h *Handler) listHealthCheckNotifications(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	checkID := chi.URLParam(r, "id")

	_, lookupErr := h.repo.GetHealthCheck(ctx, checkID)
	switch {
	case lookupErr == nil:
		// The health check exists; continue to its bindings.
	case errors.Is(lookupErr, gorm.ErrRecordNotFound):
		writeError(w, http.StatusNotFound, "health check not found")
		return
	default:
		writeError(w, http.StatusInternalServerError, lookupErr.Error())
		return
	}

	bindings, listErr := h.repo.ListHealthCheckNotifications(ctx, checkID)
	if listErr != nil {
		writeError(w, http.StatusInternalServerError, listErr.Error())
		return
	}

	for idx := 0; idx < len(bindings); idx++ {
		redactNotification(&bindings[idx].Notification)
	}
	writeJSON(w, http.StatusOK, bindings)
}

// setHealthCheckNotifications hands the repository the set of notification
// bindings to store for the health check addressed by the "id" URL parameter.
//
// The body has the shape {"notifications": [{"notification_id": ...,
// "on_down": ..., "on_recovered": ..., "enabled": ...}]}. The three boolean
// flags are optional and default to true when omitted; notification_id is
// trimmed of surrounding whitespace.
//
// The health check is looked up before the body is decoded, so an unknown id
// yields 404 even when the body is malformed. A body that fails to decode
// yields 400 "invalid JSON". From the repository write, gorm.ErrRecordNotFound
// yields 404 and any other error yields 400 with the error text. On success
// the stored bindings are returned with each Notification redacted.
func (h *Handler) setHealthCheckNotifications(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	checkID := chi.URLParam(r, "id")

	_, lookupErr := h.repo.GetHealthCheck(ctx, checkID)
	switch {
	case lookupErr == nil:
		// The health check exists; continue to the request body.
	case errors.Is(lookupErr, gorm.ErrRecordNotFound):
		writeError(w, http.StatusNotFound, "health check not found")
		return
	default:
		writeError(w, http.StatusInternalServerError, lookupErr.Error())
		return
	}

	var body struct {
		Notifications []struct {
			NotificationID string `json:"notification_id"`
			OnDown         *bool  `json:"on_down"`
			OnRecovered    *bool  `json:"on_recovered"`
			Enabled        *bool  `json:"enabled"`
		} `json:"notifications"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid JSON")
		return
	}

	// An omitted flag counts as true; a present flag is taken as given.
	trueUnlessSet := func(flag *bool) bool {
		return flag == nil || *flag
	}

	bindings := make([]models.HealthCheckNotification, 0, len(body.Notifications))
	for idx := range body.Notifications {
		entry := body.Notifications[idx]
		bindings = append(bindings, models.HealthCheckNotification{
			NotificationID: strings.TrimSpace(entry.NotificationID),
			OnDown:         trueUnlessSet(entry.OnDown),
			OnRecovered:    trueUnlessSet(entry.OnRecovered),
			Enabled:        trueUnlessSet(entry.Enabled),
		})
	}

	stored, storeErr := h.repo.SetHealthCheckNotifications(ctx, checkID, bindings)
	switch {
	case storeErr == nil:
		// Stored; fall through to the response below.
	case errors.Is(storeErr, gorm.ErrRecordNotFound):
		writeError(w, http.StatusNotFound, "health check not found")
		return
	default:
		writeError(w, http.StatusBadRequest, storeErr.Error())
		return
	}

	for idx := 0; idx < len(stored); idx++ {
		redactNotification(&stored[idx].Notification)
	}
	writeJSON(w, http.StatusOK, stored)
}

func (h *Handler) validateBackupRequest(r *http.Request, b models.Backup, create bool) error {
if create && (b.Name == "" || b.ConnectionID == "" || b.CronExpr == "" || b.TargetType == "") {
return errors.New("name, connection_id, cron_expr, target_type are required")
Expand Down
Loading
Loading