diff --git a/internal/app/migrate_env_test.go b/internal/app/migrate_env_test.go index 48845e6b..85438a2c 100644 --- a/internal/app/migrate_env_test.go +++ b/internal/app/migrate_env_test.go @@ -110,11 +110,14 @@ func TestMigrateAttachmentEnvFile(t *testing.T) { require.NoError(t, err) containerName := fmt.Sprintf("gitea-postgres-%d", time.Now().UnixNano()) + envFileName := "gordon-" + containerName + ".env" + // extractContainerNameFromAttachmentFile strips the .env suffix, so the + // stored name includes the "gordon-" prefix from the filename. + storedContainerName := extractContainerNameFromAttachmentFile(envFileName) keys := []string{"POSTGRES_USER", "POSTGRES_PASSWORD"} - defer cleanupPassAttachment(t, containerName, keys) + defer cleanupPassAttachment(t, storedContainerName, keys) envContent := "POSTGRES_USER=gitea\nPOSTGRES_PASSWORD=secret123\n" - envFileName := "gordon-" + containerName + ".env" envFilePath := filepath.Join(tmpDir, envFileName) err = os.WriteFile(envFilePath, []byte(envContent), 0600) require.NoError(t, err) @@ -129,7 +132,7 @@ func TestMigrateAttachmentEnvFile(t *testing.T) { _, err = os.Stat(migratedPath) assert.NoError(t, err, ".env.migrated file should exist") - values, err := store.GetAllAttachment(containerName) + values, err := store.GetAllAttachment(storedContainerName) require.NoError(t, err) assert.Equal(t, "gitea", values["POSTGRES_USER"]) assert.Equal(t, "secret123", values["POSTGRES_PASSWORD"]) diff --git a/internal/app/run.go b/internal/app/run.go index f5569c98..83b5174a 100644 --- a/internal/app/run.go +++ b/internal/app/run.go @@ -1750,7 +1750,7 @@ func runServers(ctx context.Context, v *viper.Viper, cfg Config, registryHandler syncAndAutoStart(ctx, svc, log) waitForShutdown(ctx, errChan, reloadChan, deployChan, eventBus, log) - gracefulShutdown(registrySrv, proxySrv, tlsSrv, containerSvc, log) + gracefulShutdown(registrySrv, proxySrv, tlsSrv, containerSvc, svc.proxySvc, log) return nil } @@ -1950,13 +1950,14 @@ func waitForShutdown(ctx context.Context, errChan <-chan error, reloadChan, depl // gracefulShutdown stops HTTP servers with a 30s timeout, then shuts down // the container service and cleans up runtime files. -func gracefulShutdown(registrySrv, proxySrv, tlsSrv *http.Server, containerSvc *container.Service, log zerowrap.Logger) { +func gracefulShutdown(registrySrv, proxySrv, tlsSrv *http.Server, containerSvc *container.Service, proxySvc *proxy.Service, log zerowrap.Logger) { log.Info().Msg("shutting down Gordon...") shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) defer shutdownCancel() - for _, srv := range []*http.Server{registrySrv, proxySrv, tlsSrv} { + // Phase 1: Stop ingress frontends (TLS, then proxy) — no new traffic accepted + for _, srv := range []*http.Server{tlsSrv, proxySrv} { if srv == nil { continue } @@ -1965,6 +1966,21 @@ func gracefulShutdown(registrySrv, proxySrv, tlsSrv *http.Server, containerSvc * } } + // Phase 2: Drain in-flight registry push sessions before stopping the backend + if proxySvc != nil { + log.Info().Msg("draining in-flight registry requests...") + if drained := proxySvc.DrainRegistryInFlight(25 * time.Second); !drained { + log.Warn().Int64("in_flight", proxySvc.RegistryInFlight()).Msg("registry drain timed out; some in-flight pushes may be interrupted") + } + } + + // Phase 3: Stop the registry backend + if registrySrv != nil { + if err := registrySrv.Shutdown(shutdownCtx); err != nil { + log.Warn().Err(err).Str("addr", registrySrv.Addr).Msg("server shutdown error") + } + } + containerSvc.StopMonitor() if err := containerSvc.Shutdown(shutdownCtx); err != nil { diff --git a/internal/usecase/proxy/service.go b/internal/usecase/proxy/service.go index 99bdead4..b8ff120e 100644 --- a/internal/usecase/proxy/service.go +++ b/internal/usecase/proxy/service.go @@ -70,15 +70,16 @@ type Config struct { // Service implements the ProxyService interface. type Service struct { - runtime out.ContainerRuntime - containerSvc in.ContainerService - configSvc in.ConfigService - config Config - targets map[string]*domain.ProxyTarget - mu sync.RWMutex - activeConns atomic.Int64 // Atomic counter for concurrent connection limiting - inFlight map[string]int - inFlightMu sync.Mutex + runtime out.ContainerRuntime + containerSvc in.ContainerService + configSvc in.ConfigService + config Config + targets map[string]*domain.ProxyTarget + mu sync.RWMutex + activeConns atomic.Int64 // Atomic counter for concurrent connection limiting + inFlight map[string]int + inFlightMu sync.Mutex + registryInFlight atomic.Int64 // active registry proxy requests, for graceful drain } // NewService creates a new proxy service. @@ -574,6 +575,26 @@ func (s *Service) trackInFlight(containerID string) func() { } } +// RegistryInFlight returns the current count of active registry proxy requests. +func (s *Service) RegistryInFlight() int64 { + return s.registryInFlight.Load() +} + +// DrainRegistryInFlight blocks until all in-flight registry proxy requests +// complete or the timeout elapses. Returns true if drained cleanly, false if +// timed out with requests still in flight. Call this before shutting down the +// registry server. +func (s *Service) DrainRegistryInFlight(timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if s.registryInFlight.Load() == 0 { + return true + } + time.Sleep(10 * time.Millisecond) + } + return false +} + // proxyToRegistry forwards requests to the local registry HTTP server. // SECURITY: This uses http://localhost:{port} which is safe because the registry // runs on the same host and traffic never leaves the loopback interface. If Gordon @@ -585,6 +606,9 @@ func (s *Service) proxyToRegistry(w http.ResponseWriter, r *http.Request) { cfg := s.config s.mu.RUnlock() + s.registryInFlight.Add(1) + defer s.registryInFlight.Add(-1) + log := zerowrap.FromCtx(r.Context()) targetURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", cfg.RegistryPort)) diff --git a/internal/usecase/proxy/service_test.go b/internal/usecase/proxy/service_test.go index 838669e4..841bb907 100644 --- a/internal/usecase/proxy/service_test.go +++ b/internal/usecase/proxy/service_test.go @@ -4,6 +4,7 @@ import ( "context" "net/http/httptest" "testing" + "time" "github.com/bnema/zerowrap" "github.com/stretchr/testify/assert" @@ -510,3 +511,66 @@ func TestServeHTTP_MaxBodySize(t *testing.T) { }) } } + +func TestRegistryInFlightTracking(t *testing.T) { + svc := &Service{ + inFlight: make(map[string]int), + } + + if got := svc.registryInFlight.Load(); got != 0 { + t.Fatalf("expected 0 in-flight, got %d", got) + } + + svc.registryInFlight.Add(1) + if got := svc.registryInFlight.Load(); got != 1 { + t.Fatalf("expected 1 in-flight after Add, got %d", got) + } + + svc.registryInFlight.Add(-1) + if got := svc.registryInFlight.Load(); got != 0 { + t.Fatalf("expected 0 in-flight after release, got %d", got) + } +} + +func TestDrainRegistryInFlight(t *testing.T) { + svc := &Service{ + inFlight: make(map[string]int), + } + + svc.registryInFlight.Add(2) + + done := make(chan struct{}) + go func() { + drained := svc.DrainRegistryInFlight(50 * time.Millisecond) + if !drained { + // Signal failure via done channel by leaving it open — test will time out + return + } + close(done) + }() + + time.Sleep(5 * time.Millisecond) + svc.registryInFlight.Add(-1) + svc.registryInFlight.Add(-1) + + select { + case <-done: + // good — drained cleanly + case <-time.After(500 * time.Millisecond): + t.Fatal("DrainRegistryInFlight did not return true after requests completed") + } +} + +func TestDrainRegistryInFlightTimeout(t *testing.T) { + svc := &Service{ + inFlight: make(map[string]int), + } + + // Add a request and never release it + svc.registryInFlight.Add(1) + + drained := svc.DrainRegistryInFlight(30 * time.Millisecond) + if drained { + t.Fatal("expected DrainRegistryInFlight to return false on timeout, got true") + } +}