From 69cc4e4ddff60132cd304e12aedc10f77fa62264 Mon Sep 17 00:00:00 2001 From: Jefferson Rodrigues Date: Sat, 21 Mar 2026 16:17:39 -0300 Subject: [PATCH 1/2] feat(tmmongo): add WithSettingsCheckInterval for tenant config revalidation (#376) Ports revalidation logic from tmpostgres.Manager to tmmongo.Manager. Periodically checks tenant config and evicts cached connections for suspended/purged tenants. Default interval: 30s. Includes 11 new tests. X-Lerian-Ref: 0x1 --- commons/tenant-manager/mongo/manager.go | 126 +++++- commons/tenant-manager/mongo/manager_test.go | 447 +++++++++++++++++++ 2 files changed, 564 insertions(+), 9 deletions(-) diff --git a/commons/tenant-manager/mongo/manager.go b/commons/tenant-manager/mongo/manager.go index 75694b3..e3dace6 100644 --- a/commons/tenant-manager/mongo/manager.go +++ b/commons/tenant-manager/mongo/manager.go @@ -31,6 +31,16 @@ import ( // mongoPingTimeout is the maximum duration for MongoDB connection health check pings. const mongoPingTimeout = 3 * time.Second +// defaultSettingsCheckInterval is the default interval between periodic +// connection pool settings revalidation checks. When a cached connection is +// returned by GetConnection and this interval has elapsed since the last check, +// fresh config is fetched from the Tenant Manager asynchronously. +const defaultSettingsCheckInterval = 30 * time.Second + +// settingsRevalidationTimeout is the maximum duration for the HTTP call +// to the Tenant Manager during async settings revalidation. +const settingsRevalidationTimeout = 5 * time.Second + // DefaultMaxConnections is the default max connections for MongoDB. 
const DefaultMaxConnections uint64 = 100 @@ -69,6 +79,14 @@ type Manager struct { maxConnections int // soft limit for pool size (0 = unlimited) idleTimeout time.Duration // how long before a connection is eligible for eviction lastAccessed map[string]time.Time // LRU tracking per tenant + + lastSettingsCheck map[string]time.Time // tracks per-tenant last settings revalidation time + settingsCheckInterval time.Duration // configurable interval between settings revalidation checks + + // revalidateWG tracks in-flight revalidatePoolSettings goroutines so Close() + // can wait for them to finish before returning. Without this, goroutines + // spawned by GetConnection may access Manager state after Close() returns. + revalidateWG sync.WaitGroup } type MongoConnection struct { @@ -173,6 +191,21 @@ func WithMaxTenantPools(maxSize int) Option { } } +// WithSettingsCheckInterval sets the interval between periodic connection pool settings +// revalidation checks. When GetConnection returns a cached connection and this interval +// has elapsed since the last check for that tenant, fresh config is fetched from the +// Tenant Manager asynchronously. For MongoDB, the driver does not support runtime pool +// resize, but revalidation detects suspended/purged tenants and evicts their connections. +// +// If d <= 0, revalidation is DISABLED (settingsCheckInterval is set to 0). +// When disabled, no async revalidation checks are performed on cache hits. +// Default: 30 seconds (defaultSettingsCheckInterval). +func WithSettingsCheckInterval(d time.Duration) Option { + return func(p *Manager) { + p.settingsCheckInterval = max(d, 0) + } +} + // WithIdleTimeout sets the duration after which an unused tenant connection becomes // eligible for eviction. Only connections idle longer than this duration will be evicted // when the pool exceeds the soft limit (maxConnections). 
If all connections are active @@ -187,12 +220,14 @@ func WithIdleTimeout(d time.Duration) Option { // NewManager creates a new MongoDB connection manager. func NewManager(c *client.Client, service string, opts ...Option) *Manager { p := &Manager{ - client: c, - service: service, - logger: logcompat.New(nil), - connections: make(map[string]*MongoConnection), - databaseNames: make(map[string]string), - lastAccessed: make(map[string]time.Time), + client: c, + service: service, + logger: logcompat.New(nil), + connections: make(map[string]*MongoConnection), + databaseNames: make(map[string]string), + lastAccessed: make(map[string]time.Time), + lastSettingsCheck: make(map[string]time.Time), + settingsCheckInterval: defaultSettingsCheckInterval, } for _, opt := range opts { @@ -250,11 +285,28 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl // but re-check that the connection was not evicted while we were // pinging (another goroutine may have called CloseConnection, // Close, or evictLRU in the meantime). + now := time.Now() + p.mu.Lock() if _, stillExists := p.connections[tenantID]; stillExists { - p.lastAccessed[tenantID] = time.Now() + p.lastAccessed[tenantID] = now + + // Only revalidate if settingsCheckInterval > 0 (means revalidation is enabled) + shouldRevalidate := p.client != nil && p.settingsCheckInterval > 0 && time.Since(p.lastSettingsCheck[tenantID]) > p.settingsCheckInterval + if shouldRevalidate { + // Update timestamp BEFORE spawning goroutine to prevent multiple + // concurrent revalidation checks for the same tenant. 
+ p.lastSettingsCheck[tenantID] = now + } + p.mu.Unlock() + if shouldRevalidate { + p.revalidateWG.Go(func() { //#nosec G118 -- intentional: revalidatePoolSettings creates its own timeout context; must not use request-scoped context as this outlives the request + p.revalidatePoolSettings(tenantID) + }) + } + return conn.DB, nil } @@ -274,6 +326,51 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl return p.createConnection(ctx, tenantID) } +// revalidatePoolSettings fetches fresh config from the Tenant Manager and detects +// whether the tenant has been suspended or purged. For MongoDB, the driver does not +// support changing pool size after client creation, so this method only checks for +// tenant status changes and evicts the cached connection if the tenant is suspended. +// This runs asynchronously (in a goroutine) and must never block GetConnection. +// If the fetch fails, a warning is logged but the connection remains usable. +func (p *Manager) revalidatePoolSettings(tenantID string) { + // Guard: recover from any panic to avoid crashing the process. + // This goroutine runs asynchronously and must never bring down the service. + defer func() { + if r := recover(); r != nil { + if p.logger != nil { + p.logger.Warnf("recovered from panic during settings revalidation for tenant %s: %v", tenantID, r) + } + } + }() + + revalidateCtx, cancel := context.WithTimeout(context.Background(), settingsRevalidationTimeout) + defer cancel() + + _, err := p.client.GetTenantConfig(revalidateCtx, tenantID, p.service) + if err != nil { + // If tenant service was suspended/purged, evict the cached connection immediately. + // The next request for this tenant will call createConnection, which fetches fresh + // config from the Tenant Manager and receives the 403 error directly. 
+ if core.IsTenantSuspendedError(err) { + if p.logger != nil { + p.logger.Warnf("tenant %s service suspended, evicting cached connection", tenantID) + } + + _ = p.CloseConnection(context.Background(), tenantID) + + return + } + + if p.logger != nil { + p.logger.Warnf("failed to revalidate connection settings for tenant %s: %v", tenantID, err) + } + + return + } + + p.ApplyConnectionSettings(tenantID, nil) +} + // createConnection fetches config from Tenant Manager and creates a MongoDB client. func (p *Manager) createConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { if p.client == nil { @@ -395,6 +492,7 @@ func (p *Manager) removeStaleCacheEntry(tenantID string, cachedConn *MongoConnec delete(p.connections, tenantID) delete(p.databaseNames, tenantID) delete(p.lastAccessed, tenantID) + delete(p.lastSettingsCheck, tenantID) } } @@ -564,6 +662,7 @@ func (p *Manager) evictLRU(ctx context.Context, logger log.Logger) { delete(p.connections, candidateID) delete(p.databaseNames, candidateID) delete(p.lastAccessed, candidateID) + delete(p.lastSettingsCheck, candidateID) } } @@ -642,11 +741,13 @@ func (p *Manager) GetDatabaseForTenant(ctx context.Context, tenantID string) (*m } // Close closes all MongoDB connections. +// It waits for any in-flight revalidatePoolSettings goroutines to finish +// before returning, preventing goroutine leaks and use-after-close races. // // Uses snapshot-then-cleanup to avoid holding the mutex during network I/O // (Disconnect calls), which could block other goroutines on slow networks. func (p *Manager) Close(ctx context.Context) error { - // Step 1: Under lock — mark closed, snapshot all connections, clear maps. + // Phase 1: Under lock — mark closed, snapshot all connections, clear maps. 
p.mu.Lock() p.closed = true @@ -659,10 +760,11 @@ func (p *Manager) Close(ctx context.Context) error { clear(p.connections) clear(p.databaseNames) clear(p.lastAccessed) + clear(p.lastSettingsCheck) p.mu.Unlock() - // Step 2: Outside lock — disconnect each snapshotted connection. + // Phase 2: Outside lock — disconnect each snapshotted connection. var errs []error for _, conn := range snapshot { @@ -673,6 +775,11 @@ func (p *Manager) Close(ctx context.Context) error { } } + // Phase 3: Wait for in-flight revalidatePoolSettings goroutines OUTSIDE the lock. + // revalidatePoolSettings acquires p.mu internally (via CloseConnection), + // so waiting with the lock held would deadlock. + p.revalidateWG.Wait() + return errors.Join(errs...) } @@ -693,6 +800,7 @@ func (p *Manager) CloseConnection(ctx context.Context, tenantID string) error { delete(p.connections, tenantID) delete(p.databaseNames, tenantID) delete(p.lastAccessed, tenantID) + delete(p.lastSettingsCheck, tenantID) p.mu.Unlock() diff --git a/commons/tenant-manager/mongo/manager_test.go b/commons/tenant-manager/mongo/manager_test.go index c4659c3..77c5bfb 100644 --- a/commons/tenant-manager/mongo/manager_test.go +++ b/commons/tenant-manager/mongo/manager_test.go @@ -10,18 +10,34 @@ import ( "encoding/pem" "fmt" "math/big" + "net/http" + "net/http/httptest" "os" "path/filepath" + "sync/atomic" "testing" "time" "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/client" "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/core" + "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/logcompat" "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// mustNewTestClient creates a test client or fails the test immediately. +// Centralises the repeated client.NewClient + error-check boilerplate. +// Tests use httptest servers (http://), so WithAllowInsecureHTTP is applied. 
+func mustNewTestClient(t testing.TB, baseURL string) *client.Client { + t.Helper() + + c, err := client.NewClient(baseURL, testutil.NewMockLogger(), client.WithAllowInsecureHTTP(), client.WithServiceAPIKey("test-key")) + require.NoError(t, err) + + return c +} + func TestNewManager(t *testing.T) { t.Run("creates manager with client and service", func(t *testing.T) { c := &client.Client{} @@ -1123,3 +1139,434 @@ func TestBuildTLSConfigFromFiles(t *testing.T) { assert.Contains(t, err.Error(), "failed to parse CA certificate") }) } + +func TestManager_WithSettingsCheckInterval_Option(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + interval time.Duration + expectedInterval time.Duration + }{ + { + name: "sets custom settings check interval", + interval: 1 * time.Minute, + expectedInterval: 1 * time.Minute, + }, + { + name: "sets short settings check interval", + interval: 5 * time.Second, + expectedInterval: 5 * time.Second, + }, + { + name: "disables revalidation with zero duration", + interval: 0, + expectedInterval: 0, + }, + { + name: "disables revalidation with negative duration", + interval: -1 * time.Second, + expectedInterval: 0, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger", + WithSettingsCheckInterval(tt.interval), + ) + + assert.Equal(t, tt.expectedInterval, manager.settingsCheckInterval) + }) + } +} + +func TestManager_DefaultSettingsCheckInterval(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger") + + assert.Equal(t, defaultSettingsCheckInterval, manager.settingsCheckInterval, + "default settings check interval should be set from named constant") + assert.NotNil(t, manager.lastSettingsCheck, + "lastSettingsCheck map should be initialized") +} + +func TestManager_GetConnection_RevalidatesSettingsAfterInterval(t 
*testing.T) { + t.Parallel() + + var callCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&callCount, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "id": "tenant-123", + "tenantSlug": "test-tenant", + "databases": { + "onboarding": { + "mongodb": {"host": "localhost", "port": 27017, "database": "testdb", "username": "user", "password": "pass"} + } + } + }`)) + })) + defer server.Close() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "ledger", + WithLogger(testutil.NewMockLogger()), + WithModule("onboarding"), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + // Pre-populate cache with a healthy connection (nil DB to avoid real MongoDB). + // GetConnection with a nil DB triggers reconnect, so we need to test via + // the revalidation path specifically. We use a connection that has a non-nil + // DB field but is a mock. Since we can't mock mongo.Client's Ping, we test + // the revalidation path directly via revalidatePoolSettings. 
+ cachedConn := &MongoConnection{DB: nil} + manager.connections["tenant-123"] = cachedConn + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour) + + // Trigger revalidation directly (mirrors postgres test pattern for EvictsSuspendedTenant) + manager.revalidatePoolSettings("tenant-123") + + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&callCount) > 0 + }, 500*time.Millisecond, 20*time.Millisecond, "should have fetched fresh config from Tenant Manager") +} + +func TestManager_GetConnection_DisabledRevalidation_WithZero(t *testing.T) { + t.Parallel() + + var callCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&callCount, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "id": "tenant-123", + "tenantSlug": "test-tenant", + "databases": { + "onboarding": { + "mongodb": {"host": "localhost", "port": 27017, "database": "testdb"} + } + } + }`)) + })) + defer server.Close() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "ledger", + WithLogger(testutil.NewMockLogger()), + WithModule("onboarding"), + WithSettingsCheckInterval(0), + ) + + // Verify revalidation is disabled + assert.Equal(t, time.Duration(0), manager.settingsCheckInterval) + + // Pre-populate cache with a connection (nil DB) + cachedConn := &MongoConnection{DB: nil} + manager.connections["tenant-123"] = cachedConn + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour) + + // Simulate the revalidation check logic (same as in GetConnection) + manager.mu.Lock() + shouldRevalidate := manager.client != nil && manager.settingsCheckInterval > 0 && time.Since(manager.lastSettingsCheck["tenant-123"]) > manager.settingsCheckInterval + manager.mu.Unlock() + + assert.False(t, shouldRevalidate, 
"should NOT trigger revalidation when interval is zero") + + // Wait to ensure no async goroutine fires + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, int32(0), atomic.LoadInt32(&callCount), "should NOT have fetched config - revalidation is disabled") +} + +func TestManager_GetConnection_DisabledRevalidation_WithNegative(t *testing.T) { + t.Parallel() + + var callCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&callCount, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) + })) + defer server.Close() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "payment", + WithLogger(testutil.NewMockLogger()), + WithModule("payment"), + WithSettingsCheckInterval(-5*time.Second), + ) + + // Verify negative was clamped to zero + assert.Equal(t, time.Duration(0), manager.settingsCheckInterval) + + // Pre-populate cache + cachedConn := &MongoConnection{DB: nil} + manager.connections["tenant-456"] = cachedConn + manager.lastAccessed["tenant-456"] = time.Now() + manager.lastSettingsCheck["tenant-456"] = time.Now().Add(-1 * time.Hour) + + // Simulate the revalidation check logic + manager.mu.Lock() + shouldRevalidate := manager.client != nil && manager.settingsCheckInterval > 0 && time.Since(manager.lastSettingsCheck["tenant-456"]) > manager.settingsCheckInterval + manager.mu.Unlock() + + assert.False(t, shouldRevalidate, "should NOT trigger revalidation when interval is negative (clamped to zero)") + + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, int32(0), atomic.LoadInt32(&callCount), "should NOT have fetched config - revalidation is disabled via negative interval") +} + +func TestManager_RevalidateSettings_EvictsSuspendedTenant(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + responseStatus int + responseBody string + expectEviction bool + expectLogSubstring string + }{ + 
{ + name: "evicts_cached_connection_when_tenant_is_suspended", + responseStatus: http.StatusForbidden, + responseBody: `{"code":"TS-SUSPENDED","error":"service suspended","status":"suspended"}`, + expectEviction: true, + expectLogSubstring: "tenant tenant-suspended service suspended, evicting cached connection", + }, + { + name: "evicts_cached_connection_when_tenant_is_purged", + responseStatus: http.StatusForbidden, + responseBody: `{"code":"TS-SUSPENDED","error":"service purged","status":"purged"}`, + expectEviction: true, + expectLogSubstring: "tenant tenant-suspended service suspended, evicting cached connection", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.responseStatus) + w.Write([]byte(tt.responseBody)) + })) + defer server.Close() + + capLogger := testutil.NewCapturingLogger() + tmClient, err := client.NewClient(server.URL, capLogger, client.WithAllowInsecureHTTP(), client.WithServiceAPIKey("test-key")) + require.NoError(t, err) + manager := NewManager(tmClient, "ledger", + WithLogger(capLogger), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + // Pre-populate a cached connection for the tenant (nil DB to avoid real MongoDB) + manager.connections["tenant-suspended"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-suspended"] = time.Now() + manager.lastSettingsCheck["tenant-suspended"] = time.Now() + + // Verify the connection exists before revalidation + statsBefore := manager.Stats() + assert.Equal(t, 1, statsBefore.TotalConnections, + "should have 1 connection before revalidation") + + // Trigger revalidatePoolSettings directly + manager.revalidatePoolSettings("tenant-suspended") + + if tt.expectEviction { + // Verify the connection was evicted + statsAfter := manager.Stats() + assert.Equal(t, 0, 
statsAfter.TotalConnections, + "connection should be evicted after suspended tenant detected") + + // Verify lastAccessed and lastSettingsCheck were cleaned up + manager.mu.RLock() + _, accessExists := manager.lastAccessed["tenant-suspended"] + _, settingsExists := manager.lastSettingsCheck["tenant-suspended"] + manager.mu.RUnlock() + + assert.False(t, accessExists, + "lastAccessed should be removed for evicted tenant") + assert.False(t, settingsExists, + "lastSettingsCheck should be removed for evicted tenant") + } + + // Verify the appropriate log message was produced + assert.True(t, capLogger.ContainsSubstring(tt.expectLogSubstring), + "expected log message containing %q, got: %v", + tt.expectLogSubstring, capLogger.GetMessages()) + }) + } +} + +func TestManager_RevalidateSettings_FailedDoesNotBreakConnection(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + capLogger := testutil.NewCapturingLogger() + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "ledger", + WithLogger(capLogger), + WithModule("onboarding"), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + // Pre-populate cache + manager.connections["tenant-123"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour) + + // Trigger revalidation directly - should fail but not evict + manager.revalidatePoolSettings("tenant-123") + + // Connection should still exist (not evicted on transient failure) + stats := manager.Stats() + assert.Equal(t, 1, stats.TotalConnections, + "connection should NOT be evicted after transient revalidation failure") + + // Verify a warning was logged + assert.True(t, capLogger.ContainsSubstring("failed to revalidate connection settings"), + "should log a warning when revalidation fails") +} + 
+func TestManager_RevalidateSettings_RecoverFromPanic(t *testing.T) { + t.Parallel() + + capLogger := testutil.NewCapturingLogger() + + // Create a manager with nil client to trigger a panic path + manager := &Manager{ + logger: logcompat.New(capLogger), + connections: make(map[string]*MongoConnection), + databaseNames: make(map[string]string), + lastAccessed: make(map[string]time.Time), + lastSettingsCheck: make(map[string]time.Time), + settingsCheckInterval: 1 * time.Millisecond, + } + + // Should not panic -- the recovery handler should catch it + assert.NotPanics(t, func() { + manager.revalidatePoolSettings("tenant-panic") + }) +} + +func TestManager_CloseConnection_CleansUpLastSettingsCheck(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger", + WithLogger(testutil.NewMockLogger()), + ) + + // Pre-populate cache + manager.connections["tenant-123"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now() + + err := manager.CloseConnection(context.Background(), "tenant-123") + + require.NoError(t, err) + + manager.mu.RLock() + _, connExists := manager.connections["tenant-123"] + _, accessExists := manager.lastAccessed["tenant-123"] + _, settingsCheckExists := manager.lastSettingsCheck["tenant-123"] + manager.mu.RUnlock() + + assert.False(t, connExists, "connection should be removed after CloseConnection") + assert.False(t, accessExists, "lastAccessed should be removed after CloseConnection") + assert.False(t, settingsCheckExists, "lastSettingsCheck should be removed after CloseConnection") +} + +func TestManager_Close_CleansUpLastSettingsCheck(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger", + WithLogger(testutil.NewMockLogger()), + ) + + // Pre-populate cache with multiple tenants + for _, id := range []string{"tenant-1", "tenant-2"} { + 
manager.connections[id] = &MongoConnection{DB: nil} + manager.lastAccessed[id] = time.Now() + manager.lastSettingsCheck[id] = time.Now() + } + + err := manager.Close(context.Background()) + + require.NoError(t, err) + + assert.Empty(t, manager.connections, "all connections should be removed after Close") + assert.Empty(t, manager.lastAccessed, "all lastAccessed should be removed after Close") + assert.Empty(t, manager.lastSettingsCheck, "all lastSettingsCheck should be removed after Close") +} + +func TestManager_Close_WaitsForRevalidateSettings(t *testing.T) { + t.Parallel() + + // Create a slow HTTP server that simulates a Tenant Manager responding after a delay. + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + time.Sleep(300 * time.Millisecond) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "id": "tenant-slow", + "tenantSlug": "slow-tenant", + "databases": { + "onboarding": { + "mongodb": {"host": "localhost", "port": 27017, "database": "testdb", "username": "user", "password": "pass"} + } + } + }`)) + })) + defer server.Close() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "test-service", + WithLogger(testutil.NewMockLogger()), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + // Pre-populate cache + manager.connections["tenant-slow"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-slow"] = time.Now() + manager.lastSettingsCheck["tenant-slow"] = time.Time{} + + // Spawn the revalidation goroutine via the WaitGroup + manager.revalidateWG.Go(func() { + manager.revalidatePoolSettings("tenant-slow") + }) + + // Close immediately -- the revalidation goroutine is still blocked on the + // slow HTTP server. With the fix, Close() waits for it to finish. + err := manager.Close(context.Background()) + require.NoError(t, err) + + // If Close() properly waited, no goroutines should be leaked. 
+ // We verify by checking the manager is fully closed and maps are cleared. + assert.True(t, manager.closed, "manager should be closed") + assert.Empty(t, manager.connections, "connections should be cleared after Close") +} From 728fe42fedb42be3280e8395e409da952bb015a1 Mon Sep 17 00:00:00 2001 From: Jefferson Rodrigues Date: Sat, 21 Mar 2026 16:46:26 -0300 Subject: [PATCH 2/2] =?UTF-8?q?fix(tmmongo):=20address=20code=20review=20f?= =?UTF-8?q?indings=20=E2=80=94=20race,=20context=20timeout,=20stale=20chec?= =?UTF-8?q?k,=20goleak?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Register revalidation goroutine in WaitGroup before releasing mutex (race fix). 2. Use bounded context for CloseConnection in eviction (prevents hang). 3. Check connection identity before revalidation (stale connection fix). 4. Tests use GetConnection instead of calling revalidatePoolSettings directly. 5. Added goleak verification to detect goroutine leaks. X-Lerian-Ref: 0x1 --- commons/tenant-manager/mongo/manager.go | 19 +- commons/tenant-manager/mongo/manager_test.go | 188 ++++++++++++++----- 2 files changed, 157 insertions(+), 50 deletions(-) diff --git a/commons/tenant-manager/mongo/manager.go b/commons/tenant-manager/mongo/manager.go index e3dace6..f2c7751 100644 --- a/commons/tenant-manager/mongo/manager.go +++ b/commons/tenant-manager/mongo/manager.go @@ -241,7 +241,7 @@ func NewManager(c *client.Client, service string, opts ...Option) *Manager { // If a cached client fails a health check (e.g., due to credential rotation // after a tenant purge+re-associate), the stale client is evicted and a new // one is created with fresh credentials from the Tenant Manager. 
-func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { +func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { //nolint:gocognit // complexity from connection lifecycle (ping, revalidate, evict) is inherent if ctx == nil { ctx = context.Background() } @@ -288,23 +288,23 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl now := time.Now() p.mu.Lock() - if _, stillExists := p.connections[tenantID]; stillExists { + if current, stillExists := p.connections[tenantID]; stillExists && current == conn { p.lastAccessed[tenantID] = now - // Only revalidate if settingsCheckInterval > 0 (means revalidation is enabled) shouldRevalidate := p.client != nil && p.settingsCheckInterval > 0 && time.Since(p.lastSettingsCheck[tenantID]) > p.settingsCheckInterval if shouldRevalidate { - // Update timestamp BEFORE spawning goroutine to prevent multiple - // concurrent revalidation checks for the same tenant. 
p.lastSettingsCheck[tenantID] = now + p.revalidateWG.Add(1) } p.mu.Unlock() if shouldRevalidate { - p.revalidateWG.Go(func() { //#nosec G118 -- intentional: revalidatePoolSettings creates its own timeout context; must not use request-scoped context as this outlives the request + go func() { //#nosec G118 -- intentional: revalidatePoolSettings creates its own timeout context; must not use request-scoped context as this outlives the request + defer p.revalidateWG.Done() + p.revalidatePoolSettings(tenantID) - }) + }() } return conn.DB, nil @@ -356,7 +356,10 @@ func (p *Manager) revalidatePoolSettings(tenantID string) { p.logger.Warnf("tenant %s service suspended, evicting cached connection", tenantID) } - _ = p.CloseConnection(context.Background(), tenantID) + evictCtx, evictCancel := context.WithTimeout(context.Background(), settingsRevalidationTimeout) + defer evictCancel() + + _ = p.CloseConnection(evictCtx, tenantID) return } diff --git a/commons/tenant-manager/mongo/manager_test.go b/commons/tenant-manager/mongo/manager_test.go index 77c5bfb..b768c96 100644 --- a/commons/tenant-manager/mongo/manager_test.go +++ b/commons/tenant-manager/mongo/manager_test.go @@ -7,9 +7,12 @@ import ( "crypto/rand" "crypto/x509" "crypto/x509/pkix" + "encoding/binary" "encoding/pem" "fmt" + "io" "math/big" + "net" "net/http" "net/http/httptest" "os" @@ -24,8 +27,106 @@ import ( "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/goleak" ) +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, + goleak.IgnoreTopFunction("github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/cache.(*InMemoryCache).cleanupLoop"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), 
+ goleak.IgnoreTopFunction("net/http.(*persistConn).readLoop"), + ) +} + +func startFakeMongoServer(t *testing.T) (*mongo.Client, func()) { + t.Helper() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + go func() { + for { + conn, acceptErr := ln.Accept() + if acceptErr != nil { + return + } + + go serveFakeMongoConn(conn) + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + addr := ln.Addr().String() + + mongoClient, err := mongo.Connect(ctx, options.Client(). + ApplyURI(fmt.Sprintf("mongodb://%s/?directConnection=true", addr)). + SetServerSelectionTimeout(2*time.Second)) + require.NoError(t, err) + + require.NoError(t, mongoClient.Ping(ctx, nil)) + + cleanup := func() { + _ = mongoClient.Disconnect(context.Background()) + ln.Close() + } + + return mongoClient, cleanup +} + +func serveFakeMongoConn(conn net.Conn) { + defer conn.Close() + + for { + header := make([]byte, 16) + if _, err := io.ReadFull(conn, header); err != nil { + return + } + + msgLen := int(binary.LittleEndian.Uint32(header[0:4])) + reqID := binary.LittleEndian.Uint32(header[4:8]) + + body := make([]byte, msgLen-16) + if _, err := io.ReadFull(conn, body); err != nil { + return + } + + resp := bson.D{ + {Key: "ismaster", Value: true}, + {Key: "ok", Value: 1.0}, + {Key: "maxWireVersion", Value: int32(21)}, + {Key: "minWireVersion", Value: int32(0)}, + {Key: "maxBsonObjectSize", Value: int32(16777216)}, + {Key: "maxMessageSizeBytes", Value: int32(48000000)}, + {Key: "maxWriteBatchSize", Value: int32(100000)}, + {Key: "localTime", Value: time.Now()}, + {Key: "connectionId", Value: int32(1)}, + } + + respBytes, _ := bson.Marshal(resp) + + var payload []byte + payload = append(payload, 0, 0, 0, 0) + payload = append(payload, 0) + payload = append(payload, respBytes...) 
+ + totalLen := uint32(16 + len(payload)) + respHeader := make([]byte, 16) + binary.LittleEndian.PutUint32(respHeader[0:4], totalLen) + binary.LittleEndian.PutUint32(respHeader[4:8], reqID+1) + binary.LittleEndian.PutUint32(respHeader[8:12], reqID) + binary.LittleEndian.PutUint32(respHeader[12:16], 2013) + + _, _ = conn.Write(respHeader) + _, _ = conn.Write(payload) + } +} + // mustNewTestClient creates a test client or fails the test immediately. // Centralises the repeated client.NewClient + error-check boilerplate. // Tests use httptest servers (http://), so WithAllowInsecureHTTP is applied. @@ -1217,6 +1318,9 @@ func TestManager_GetConnection_RevalidatesSettingsAfterInterval(t *testing.T) { })) defer server.Close() + fakeDB, cleanupFake := startFakeMongoServer(t) + defer cleanupFake() + tmClient := mustNewTestClient(t, server.URL) manager := NewManager(tmClient, "ledger", WithLogger(testutil.NewMockLogger()), @@ -1224,22 +1328,27 @@ func TestManager_GetConnection_RevalidatesSettingsAfterInterval(t *testing.T) { WithSettingsCheckInterval(1*time.Millisecond), ) - // Pre-populate cache with a healthy connection (nil DB to avoid real MongoDB). - // GetConnection with a nil DB triggers reconnect, so we need to test via - // the revalidation path specifically. We use a connection that has a non-nil - // DB field but is a mock. Since we can't mock mongo.Client's Ping, we test - // the revalidation path directly via revalidatePoolSettings. 
-	cachedConn := &MongoConnection{DB: nil}
+	cachedConn := &MongoConnection{DB: fakeDB}
 	manager.connections["tenant-123"] = cachedConn
 	manager.lastAccessed["tenant-123"] = time.Now()
 	manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour)
 
-	// Trigger revalidation directly (mirrors postgres test pattern for EvictsSuspendedTenant)
-	manager.revalidatePoolSettings("tenant-123")
+	db, err := manager.GetConnection(context.Background(), "tenant-123")
+	require.NoError(t, err)
+	assert.Equal(t, fakeDB, db)
 
 	assert.Eventually(t, func() bool {
 		return atomic.LoadInt32(&callCount) > 0
-	}, 500*time.Millisecond, 20*time.Millisecond, "should have fetched fresh config from Tenant Manager")
+	}, 500*time.Millisecond, 10*time.Millisecond, "should have fetched fresh config from Tenant Manager")
+
+	manager.mu.RLock()
+	lastCheck := manager.lastSettingsCheck["tenant-123"]
+	manager.mu.RUnlock()
+
+	assert.True(t, lastCheck.After(time.Now().Add(-time.Minute)), "lastSettingsCheck should have been refreshed, not left at the stale pre-seeded value")
+
+	manager.revalidateWG.Wait()
+	require.NoError(t, manager.Close(context.Background()))
 }
 
 func TestManager_GetConnection_DisabledRevalidation_WithZero(t *testing.T) {
@@ -1262,6 +1371,9 @@ func TestManager_GetConnection_DisabledRevalidation_WithZero(t *testing.T) {
 	}))
 	defer server.Close()
 
+	fakeDB, cleanupFake := startFakeMongoServer(t)
+	defer cleanupFake()
+
 	tmClient := mustNewTestClient(t, server.URL)
 	manager := NewManager(tmClient, "ledger",
 		WithLogger(testutil.NewMockLogger()),
@@ -1269,26 +1381,22 @@ func TestManager_GetConnection_DisabledRevalidation_WithZero(t *testing.T) {
 		WithSettingsCheckInterval(0),
 	)
 
-	// Verify revalidation is disabled
 	assert.Equal(t, time.Duration(0), manager.settingsCheckInterval)
 
-	// Pre-populate cache with a connection (nil DB)
-	cachedConn := &MongoConnection{DB: nil}
+	cachedConn := &MongoConnection{DB: fakeDB}
 	manager.connections["tenant-123"] = cachedConn
 	manager.lastAccessed["tenant-123"] = time.Now()
 	manager.lastSettingsCheck["tenant-123"] =
time.Now().Add(-1 * time.Hour) - // Simulate the revalidation check logic (same as in GetConnection) - manager.mu.Lock() - shouldRevalidate := manager.client != nil && manager.settingsCheckInterval > 0 && time.Since(manager.lastSettingsCheck["tenant-123"]) > manager.settingsCheckInterval - manager.mu.Unlock() - - assert.False(t, shouldRevalidate, "should NOT trigger revalidation when interval is zero") + db, err := manager.GetConnection(context.Background(), "tenant-123") + require.NoError(t, err) + assert.Equal(t, fakeDB, db) - // Wait to ensure no async goroutine fires - time.Sleep(100 * time.Millisecond) + time.Sleep(50 * time.Millisecond) assert.Equal(t, int32(0), atomic.LoadInt32(&callCount), "should NOT have fetched config - revalidation is disabled") + + require.NoError(t, manager.Close(context.Background())) } func TestManager_GetConnection_DisabledRevalidation_WithNegative(t *testing.T) { @@ -1303,6 +1411,9 @@ func TestManager_GetConnection_DisabledRevalidation_WithNegative(t *testing.T) { })) defer server.Close() + fakeDB, cleanupFake := startFakeMongoServer(t) + defer cleanupFake() + tmClient := mustNewTestClient(t, server.URL) manager := NewManager(tmClient, "payment", WithLogger(testutil.NewMockLogger()), @@ -1310,25 +1421,22 @@ func TestManager_GetConnection_DisabledRevalidation_WithNegative(t *testing.T) { WithSettingsCheckInterval(-5*time.Second), ) - // Verify negative was clamped to zero assert.Equal(t, time.Duration(0), manager.settingsCheckInterval) - // Pre-populate cache - cachedConn := &MongoConnection{DB: nil} + cachedConn := &MongoConnection{DB: fakeDB} manager.connections["tenant-456"] = cachedConn manager.lastAccessed["tenant-456"] = time.Now() manager.lastSettingsCheck["tenant-456"] = time.Now().Add(-1 * time.Hour) - // Simulate the revalidation check logic - manager.mu.Lock() - shouldRevalidate := manager.client != nil && manager.settingsCheckInterval > 0 && time.Since(manager.lastSettingsCheck["tenant-456"]) > 
manager.settingsCheckInterval
-	manager.mu.Unlock()
-
-	assert.False(t, shouldRevalidate, "should NOT trigger revalidation when interval is negative (clamped to zero)")
+	db, err := manager.GetConnection(context.Background(), "tenant-456")
+	require.NoError(t, err)
+	assert.Equal(t, fakeDB, db)
 
-	time.Sleep(100 * time.Millisecond)
+	time.Sleep(50 * time.Millisecond)
 
 	assert.Equal(t, int32(0), atomic.LoadInt32(&callCount),
 		"should NOT have fetched config - revalidation is disabled via negative interval")
+
+	require.NoError(t, manager.Close(context.Background()))
 }
 
 func TestManager_RevalidateSettings_EvictsSuspendedTenant(t *testing.T) {
@@ -1526,7 +1634,6 @@ func TestManager_Close_CleansUpLastSettingsCheck(t *testing.T) {
 
 func TestManager_Close_WaitsForRevalidateSettings(t *testing.T) {
 	t.Parallel()
 
-	// Create a slow HTTP server that simulates a Tenant Manager responding after a delay.
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 		time.Sleep(300 * time.Millisecond)
@@ -1544,29 +1651,32 @@ func TestManager_Close_WaitsForRevalidateSettings(t *testing.T) {
 	}))
 	defer server.Close()
 
+	fakeDB, cleanupFake := startFakeMongoServer(t)
+	defer cleanupFake()
+
 	tmClient := mustNewTestClient(t, server.URL)
 	manager := NewManager(tmClient, "test-service",
 		WithLogger(testutil.NewMockLogger()),
 		WithSettingsCheckInterval(1*time.Millisecond),
 	)
 
-	// Pre-populate cache
-	manager.connections["tenant-slow"] = &MongoConnection{DB: nil}
+	cachedConn := &MongoConnection{DB: fakeDB}
+	manager.connections["tenant-slow"] = cachedConn
 	manager.lastAccessed["tenant-slow"] = time.Now()
 	manager.lastSettingsCheck["tenant-slow"] = time.Time{}
 
-	// Spawn the revalidation goroutine via the WaitGroup
-	manager.revalidateWG.Go(func() {
-		manager.revalidatePoolSettings("tenant-slow")
-	})
+	_, err := manager.GetConnection(context.Background(), "tenant-slow")
+	require.NoError(t, err)
 
-	// Close immediately -- the revalidation goroutine is still blocked on the
-	// slow HTTP server. With the fix, Close() waits for it to finish.
-	err := manager.Close(context.Background())
+	// Close while the revalidation goroutine is still blocked on the slow
+	// HTTP server; Close must wait for it via revalidateWG.
+	start := time.Now()
+	err = manager.Close(context.Background())
 	require.NoError(t, err)
 
+	assert.GreaterOrEqual(t, time.Since(start), 250*time.Millisecond,
+		"Close should block until the in-flight revalidation (stalled ~300ms) completes")
+
 	assert.True(t, manager.closed, "manager should be closed")
 	assert.Empty(t, manager.connections, "connections should be cleared after Close")
 }