Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/app/migrate_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ func TestMigrateAttachmentEnvFile(t *testing.T) {
require.NoError(t, err)

containerName := fmt.Sprintf("gitea-postgres-%d", time.Now().UnixNano())
envFileName := "gordon-" + containerName + ".env"
// extractContainerNameFromAttachmentFile strips the .env suffix, so the
// stored name includes the "gordon-" prefix from the filename.
storedContainerName := extractContainerNameFromAttachmentFile(envFileName)
keys := []string{"POSTGRES_USER", "POSTGRES_PASSWORD"}
defer cleanupPassAttachment(t, containerName, keys)
defer cleanupPassAttachment(t, storedContainerName, keys)

envContent := "POSTGRES_USER=gitea\nPOSTGRES_PASSWORD=secret123\n"
envFileName := "gordon-" + containerName + ".env"
envFilePath := filepath.Join(tmpDir, envFileName)
err = os.WriteFile(envFilePath, []byte(envContent), 0600)
require.NoError(t, err)
Expand All @@ -129,7 +132,7 @@ func TestMigrateAttachmentEnvFile(t *testing.T) {
_, err = os.Stat(migratedPath)
assert.NoError(t, err, ".env.migrated file should exist")

values, err := store.GetAllAttachment(containerName)
values, err := store.GetAllAttachment(storedContainerName)
require.NoError(t, err)
assert.Equal(t, "gitea", values["POSTGRES_USER"])
assert.Equal(t, "secret123", values["POSTGRES_PASSWORD"])
Expand Down
22 changes: 19 additions & 3 deletions internal/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ func runServers(ctx context.Context, v *viper.Viper, cfg Config, registryHandler
syncAndAutoStart(ctx, svc, log)

waitForShutdown(ctx, errChan, reloadChan, deployChan, eventBus, log)
gracefulShutdown(registrySrv, proxySrv, tlsSrv, containerSvc, log)
gracefulShutdown(registrySrv, proxySrv, tlsSrv, containerSvc, svc.proxySvc, log)
return nil
}

Expand Down Expand Up @@ -1950,13 +1950,14 @@ func waitForShutdown(ctx context.Context, errChan <-chan error, reloadChan, depl

// gracefulShutdown stops HTTP servers with a 30s timeout, then shuts down
// the container service and cleans up runtime files.
func gracefulShutdown(registrySrv, proxySrv, tlsSrv *http.Server, containerSvc *container.Service, log zerowrap.Logger) {
func gracefulShutdown(registrySrv, proxySrv, tlsSrv *http.Server, containerSvc *container.Service, proxySvc *proxy.Service, log zerowrap.Logger) {
log.Info().Msg("shutting down Gordon...")

shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()

for _, srv := range []*http.Server{registrySrv, proxySrv, tlsSrv} {
// Phase 1: Stop ingress frontends (TLS, then proxy) — no new traffic accepted
for _, srv := range []*http.Server{tlsSrv, proxySrv} {
if srv == nil {
continue
}
Expand All @@ -1965,6 +1966,21 @@ func gracefulShutdown(registrySrv, proxySrv, tlsSrv *http.Server, containerSvc *
}
}

// Phase 2: Drain in-flight registry push sessions before stopping the backend
if proxySvc != nil {
log.Info().Msg("draining in-flight registry requests...")
if drained := proxySvc.DrainRegistryInFlight(25 * time.Second); !drained {
log.Warn().Int64("in_flight", proxySvc.RegistryInFlight()).Msg("registry drain timed out; some in-flight pushes may be interrupted")
}
}

// Phase 3: Stop the registry backend
if registrySrv != nil {
if err := registrySrv.Shutdown(shutdownCtx); err != nil {
log.Warn().Err(err).Str("addr", registrySrv.Addr).Msg("server shutdown error")
}
}

containerSvc.StopMonitor()

if err := containerSvc.Shutdown(shutdownCtx); err != nil {
Expand Down
42 changes: 33 additions & 9 deletions internal/usecase/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ type Config struct {

// Service implements the ProxyService interface.
type Service struct {
runtime out.ContainerRuntime
containerSvc in.ContainerService
configSvc in.ConfigService
config Config
targets map[string]*domain.ProxyTarget
mu sync.RWMutex
activeConns atomic.Int64 // Atomic counter for concurrent connection limiting
inFlight map[string]int
inFlightMu sync.Mutex
runtime out.ContainerRuntime
containerSvc in.ContainerService
configSvc in.ConfigService
config Config
targets map[string]*domain.ProxyTarget
mu sync.RWMutex
activeConns atomic.Int64 // Atomic counter for concurrent connection limiting
inFlight map[string]int
inFlightMu sync.Mutex
registryInFlight atomic.Int64 // active registry proxy requests, for graceful drain
}

// NewService creates a new proxy service.
Expand Down Expand Up @@ -574,6 +575,26 @@ func (s *Service) trackInFlight(containerID string) func() {
}
}

// RegistryInFlight reports how many registry proxy requests are being served
// at the moment of the call. The value is read atomically.
func (s *Service) RegistryInFlight() int64 {
	active := s.registryInFlight.Load()
	return active
}

// DrainRegistryInFlight blocks until all in-flight registry proxy requests
// complete or the timeout elapses. It returns true if the in-flight count
// reached zero, false if the timeout expired with requests still in flight.
// Call this before shutting down the registry server.
//
// The count is always checked at least once, so a zero or negative timeout
// acts as a non-blocking "is it already drained?" probe, and a drain that
// completes right at the deadline is still reported as successful.
func (s *Service) DrainRegistryInFlight(timeout time.Duration) bool {
	const pollInterval = 10 * time.Millisecond

	deadline := time.Now().Add(timeout)
	for {
		// Check before consulting the deadline so the final state is never
		// missed because the last sleep ran past the deadline.
		if s.registryInFlight.Load() == 0 {
			return true
		}

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return false
		}

		// Never sleep beyond the deadline; the caller budgets shutdown time
		// around the timeout it passed in.
		wait := pollInterval
		if remaining < wait {
			wait = remaining
		}
		time.Sleep(wait)
	}
}

// proxyToRegistry forwards requests to the local registry HTTP server.
// SECURITY: This uses http://localhost:{port} which is safe because the registry
// runs on the same host and traffic never leaves the loopback interface. If Gordon
Expand All @@ -585,6 +606,9 @@ func (s *Service) proxyToRegistry(w http.ResponseWriter, r *http.Request) {
cfg := s.config
s.mu.RUnlock()

s.registryInFlight.Add(1)
defer s.registryInFlight.Add(-1)

log := zerowrap.FromCtx(r.Context())

targetURL, err := url.Parse(fmt.Sprintf("http://localhost:%d", cfg.RegistryPort))
Expand Down
64 changes: 64 additions & 0 deletions internal/usecase/proxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http/httptest"
"testing"
"time"

"github.com/bnema/zerowrap"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -510,3 +511,66 @@ func TestServeHTTP_MaxBodySize(t *testing.T) {
})
}
}

// TestRegistryInFlightTracking verifies that the registry in-flight counter
// starts at zero and follows each increment and decrement.
func TestRegistryInFlightTracking(t *testing.T) {
	svc := &Service{inFlight: map[string]int{}}
	counter := &svc.registryInFlight

	// A freshly constructed service has no registry requests in flight.
	if n := counter.Load(); n != 0 {
		t.Fatalf("expected 0 in-flight, got %d", n)
	}

	// Simulate a request entering the registry proxy path.
	counter.Add(1)
	if n := counter.Load(); n != 1 {
		t.Fatalf("expected 1 in-flight after Add, got %d", n)
	}

	// Simulate that same request finishing.
	counter.Add(-1)
	if n := counter.Load(); n != 0 {
		t.Fatalf("expected 0 in-flight after release, got %d", n)
	}
}

// TestDrainRegistryInFlight verifies that DrainRegistryInFlight returns true
// once every outstanding registry request has been released before the
// drain timeout expires.
func TestDrainRegistryInFlight(t *testing.T) {
	svc := &Service{
		inFlight: make(map[string]int),
	}

	// Two requests are in flight when draining starts.
	svc.registryInFlight.Add(2)

	// Buffered so the goroutine can always deliver its result and exit, even
	// if the test has already given up waiting. Sending the boolean (rather
	// than leaving a channel open on failure) lets a false result fail the
	// test immediately with a precise message.
	result := make(chan bool, 1)
	go func() {
		result <- svc.DrainRegistryInFlight(50 * time.Millisecond)
	}()

	// Give the drain loop a moment to start polling, then release both
	// requests well inside the 50ms drain timeout.
	time.Sleep(5 * time.Millisecond)
	svc.registryInFlight.Add(-1)
	svc.registryInFlight.Add(-1)

	select {
	case drained := <-result:
		if !drained {
			t.Fatal("DrainRegistryInFlight returned false even though all requests completed before the timeout")
		}
	case <-time.After(500 * time.Millisecond):
		t.Fatal("DrainRegistryInFlight did not return within 500ms")
	}
}
Comment on lines +535 to +562
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider using t.Errorf in the goroutine for faster failure feedback.

The current approach leaves the done channel open on failure, causing the test to wait for the full 500ms timeout. Using a result channel or t.Errorf would provide faster feedback on failures.

♻️ Optional: faster failure signaling
 	done := make(chan struct{})
+	result := make(chan bool, 1)
 	go func() {
 		drained := svc.DrainRegistryInFlight(50 * time.Millisecond)
-		if !drained {
-			// Signal failure via done channel by leaving it open — test will time out
-			return
-		}
-		close(done)
+		result <- drained
+		if drained {
+			close(done)
+		}
 	}()
 
 	time.Sleep(5 * time.Millisecond)
 	svc.registryInFlight.Add(-1)
 	svc.registryInFlight.Add(-1)
 
 	select {
 	case <-done:
 		// good — drained cleanly
 	case <-time.After(500 * time.Millisecond):
-		t.Fatal("DrainRegistryInFlight did not return true after requests completed")
+		if r := <-result; !r {
+			t.Fatal("DrainRegistryInFlight returned false unexpectedly")
+		} else {
+			t.Fatal("DrainRegistryInFlight did not return in time")
+		}
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func TestDrainRegistryInFlight(t *testing.T) {
svc := &Service{
inFlight: make(map[string]int),
}
svc.registryInFlight.Add(2)
done := make(chan struct{})
go func() {
drained := svc.DrainRegistryInFlight(50 * time.Millisecond)
if !drained {
// Signal failure via done channel by leaving it open — test will time out
return
}
close(done)
}()
time.Sleep(5 * time.Millisecond)
svc.registryInFlight.Add(-1)
svc.registryInFlight.Add(-1)
select {
case <-done:
// good — drained cleanly
case <-time.After(500 * time.Millisecond):
t.Fatal("DrainRegistryInFlight did not return true after requests completed")
}
}
func TestDrainRegistryInFlight(t *testing.T) {
svc := &Service{
inFlight: make(map[string]int),
}
svc.registryInFlight.Add(2)
done := make(chan struct{})
result := make(chan bool, 1)
go func() {
drained := svc.DrainRegistryInFlight(50 * time.Millisecond)
result <- drained
if drained {
close(done)
}
}()
time.Sleep(5 * time.Millisecond)
svc.registryInFlight.Add(-1)
svc.registryInFlight.Add(-1)
select {
case <-done:
// good — drained cleanly
case <-time.After(500 * time.Millisecond):
if r := <-result; !r {
t.Fatal("DrainRegistryInFlight returned false unexpectedly")
} else {
t.Fatal("DrainRegistryInFlight did not return in time")
}
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/usecase/proxy/service_test.go` around lines 535 - 562, In
TestDrainRegistryInFlight the goroutine leaves the done channel open on failure
causing a long timeout; change it to signal failures immediately by using a
result channel (e.g., result := make(chan bool)) or by calling t.Errorf from
inside the goroutine, then always send a boolean/close the result channel when
DrainRegistryInFlight returns; update the select to check the result and call
t.Fatalf/t.Errorf on false so failures in the goroutine surface immediately —
adjust the TestDrainRegistryInFlight function and the anonymous goroutine that
calls svc.DrainRegistryInFlight (referenced symbols: TestDrainRegistryInFlight,
DrainRegistryInFlight, svc, registryInFlight).


// TestDrainRegistryInFlightTimeout checks that draining reports failure when
// a registry request is still outstanding once the timeout has elapsed.
func TestDrainRegistryInFlightTimeout(t *testing.T) {
	const drainTimeout = 30 * time.Millisecond

	svc := &Service{inFlight: map[string]int{}}

	// One request is registered and deliberately never released, so the
	// in-flight counter cannot reach zero.
	svc.registryInFlight.Add(1)

	if svc.DrainRegistryInFlight(drainTimeout) {
		t.Fatal("expected DrainRegistryInFlight to return false on timeout, got true")
	}
}
Loading