diff --git a/commons/tenant-manager/mongo/manager.go b/commons/tenant-manager/mongo/manager.go index 75694b3..f2c7751 100644 --- a/commons/tenant-manager/mongo/manager.go +++ b/commons/tenant-manager/mongo/manager.go @@ -31,6 +31,16 @@ import ( // mongoPingTimeout is the maximum duration for MongoDB connection health check pings. const mongoPingTimeout = 3 * time.Second +// defaultSettingsCheckInterval is the default interval between periodic +// connection pool settings revalidation checks. When a cached connection is +// returned by GetConnection and this interval has elapsed since the last check, +// fresh config is fetched from the Tenant Manager asynchronously. +const defaultSettingsCheckInterval = 30 * time.Second + +// settingsRevalidationTimeout is the maximum duration for the HTTP call +// to the Tenant Manager during async settings revalidation. +const settingsRevalidationTimeout = 5 * time.Second + // DefaultMaxConnections is the default max connections for MongoDB. const DefaultMaxConnections uint64 = 100 @@ -69,6 +79,14 @@ type Manager struct { maxConnections int // soft limit for pool size (0 = unlimited) idleTimeout time.Duration // how long before a connection is eligible for eviction lastAccessed map[string]time.Time // LRU tracking per tenant + + lastSettingsCheck map[string]time.Time // tracks per-tenant last settings revalidation time + settingsCheckInterval time.Duration // configurable interval between settings revalidation checks + + // revalidateWG tracks in-flight revalidatePoolSettings goroutines so Close() + // can wait for them to finish before returning. Without this, goroutines + // spawned by GetConnection may access Manager state after Close() returns. + revalidateWG sync.WaitGroup } type MongoConnection struct { @@ -173,6 +191,21 @@ func WithMaxTenantPools(maxSize int) Option { } } +// WithSettingsCheckInterval sets the interval between periodic connection pool settings +// revalidation checks. When GetConnection returns a cached connection and this interval +// has elapsed since the last check for that tenant, fresh config is fetched from the +// Tenant Manager asynchronously. For MongoDB, the driver does not support runtime pool +// resize, but revalidation detects suspended/purged tenants and evicts their connections. +// +// If d <= 0, revalidation is DISABLED (settingsCheckInterval is set to 0). +// When disabled, no async revalidation checks are performed on cache hits. +// Default: 30 seconds (defaultSettingsCheckInterval). +func WithSettingsCheckInterval(d time.Duration) Option { + return func(p *Manager) { + p.settingsCheckInterval = max(d, 0) + } +} + // WithIdleTimeout sets the duration after which an unused tenant connection becomes // eligible for eviction. Only connections idle longer than this duration will be evicted // when the pool exceeds the soft limit (maxConnections). If all connections are active @@ -187,12 +220,14 @@ func WithIdleTimeout(d time.Duration) Option { // NewManager creates a new MongoDB connection manager. func NewManager(c *client.Client, service string, opts ...Option) *Manager { p := &Manager{ - client: c, - service: service, - logger: logcompat.New(nil), - connections: make(map[string]*MongoConnection), - databaseNames: make(map[string]string), - lastAccessed: make(map[string]time.Time), + client: c, + service: service, + logger: logcompat.New(nil), + connections: make(map[string]*MongoConnection), + databaseNames: make(map[string]string), + lastAccessed: make(map[string]time.Time), + lastSettingsCheck: make(map[string]time.Time), + settingsCheckInterval: defaultSettingsCheckInterval, } for _, opt := range opts { @@ -206,7 +241,7 @@ func NewManager(c *client.Client, service string, opts ...Option) *Manager { // If a cached client fails a health check (e.g., due to credential rotation // after a tenant purge+re-associate), the stale client is evicted and a new // one is created with fresh credentials from the Tenant Manager. -func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { +func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { //nolint:gocognit // complexity from connection lifecycle (ping, revalidate, evict) is inherent if ctx == nil { ctx = context.Background() } @@ -250,11 +285,28 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl // but re-check that the connection was not evicted while we were // pinging (another goroutine may have called CloseConnection, // Close, or evictLRU in the meantime). + now := time.Now() + p.mu.Lock() - if _, stillExists := p.connections[tenantID]; stillExists { - p.lastAccessed[tenantID] = time.Now() + if current, stillExists := p.connections[tenantID]; stillExists && current == conn { + p.lastAccessed[tenantID] = now + + shouldRevalidate := p.client != nil && p.settingsCheckInterval > 0 && time.Since(p.lastSettingsCheck[tenantID]) > p.settingsCheckInterval + if shouldRevalidate { + p.lastSettingsCheck[tenantID] = now + p.revalidateWG.Add(1) + } + p.mu.Unlock() + if shouldRevalidate { + go func() { //#nosec G118 -- intentional: revalidatePoolSettings creates its own timeout context; must not use request-scoped context as this outlives the request + defer p.revalidateWG.Done() + + p.revalidatePoolSettings(tenantID) + }() + } + return conn.DB, nil } @@ -274,6 +326,54 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl return p.createConnection(ctx, tenantID) } +// revalidatePoolSettings fetches fresh config from the Tenant Manager and detects +// whether the tenant has been suspended or purged. For MongoDB, the driver does not +// support changing pool size after client creation, so this method only checks for +// tenant status changes and evicts the cached connection if the tenant is suspended. +// This runs asynchronously (in a goroutine) and must never block GetConnection. +// If the fetch fails, a warning is logged but the connection remains usable. +func (p *Manager) revalidatePoolSettings(tenantID string) { + // Guard: recover from any panic to avoid crashing the process. + // This goroutine runs asynchronously and must never bring down the service. + defer func() { + if r := recover(); r != nil { + if p.logger != nil { + p.logger.Warnf("recovered from panic during settings revalidation for tenant %s: %v", tenantID, r) + } + } + }() + + revalidateCtx, cancel := context.WithTimeout(context.Background(), settingsRevalidationTimeout) + defer cancel() + + _, err := p.client.GetTenantConfig(revalidateCtx, tenantID, p.service) + if err != nil { + // If tenant service was suspended/purged, evict the cached connection immediately. + // The next request for this tenant will call createConnection, which fetches fresh + // config from the Tenant Manager and receives the 403 error directly. + if core.IsTenantSuspendedError(err) { + if p.logger != nil { + p.logger.Warnf("tenant %s service suspended, evicting cached connection", tenantID) + } + + evictCtx, evictCancel := context.WithTimeout(context.Background(), settingsRevalidationTimeout) + defer evictCancel() + + _ = p.CloseConnection(evictCtx, tenantID) + + return + } + + if p.logger != nil { + p.logger.Warnf("failed to revalidate connection settings for tenant %s: %v", tenantID, err) + } + + return + } + + p.ApplyConnectionSettings(tenantID, nil) +} + // createConnection fetches config from Tenant Manager and creates a MongoDB client. func (p *Manager) createConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { if p.client == nil { @@ -395,6 +495,7 @@ func (p *Manager) removeStaleCacheEntry(tenantID string, cachedConn *MongoConnec delete(p.connections, tenantID) delete(p.databaseNames, tenantID) delete(p.lastAccessed, tenantID) + delete(p.lastSettingsCheck, tenantID) } } @@ -564,6 +665,7 @@ func (p *Manager) evictLRU(ctx context.Context, logger log.Logger) { delete(p.connections, candidateID) delete(p.databaseNames, candidateID) delete(p.lastAccessed, candidateID) + delete(p.lastSettingsCheck, candidateID) } } @@ -642,11 +744,13 @@ func (p *Manager) GetDatabaseForTenant(ctx context.Context, tenantID string) (*m } // Close closes all MongoDB connections. +// It waits for any in-flight revalidatePoolSettings goroutines to finish +// before returning, preventing goroutine leaks and use-after-close races. // // Uses snapshot-then-cleanup to avoid holding the mutex during network I/O // (Disconnect calls), which could block other goroutines on slow networks. func (p *Manager) Close(ctx context.Context) error { - // Step 1: Under lock — mark closed, snapshot all connections, clear maps. + // Phase 1: Under lock — mark closed, snapshot all connections, clear maps. p.mu.Lock() p.closed = true @@ -659,10 +763,11 @@ func (p *Manager) Close(ctx context.Context) error { clear(p.connections) clear(p.databaseNames) clear(p.lastAccessed) + clear(p.lastSettingsCheck) p.mu.Unlock() - // Step 2: Outside lock — disconnect each snapshotted connection. + // Phase 2: Outside lock — disconnect each snapshotted connection. var errs []error for _, conn := range snapshot { @@ -673,6 +778,11 @@ func (p *Manager) Close(ctx context.Context) error { } } + // Phase 3: Wait for in-flight revalidatePoolSettings goroutines OUTSIDE the lock. + // revalidatePoolSettings acquires p.mu internally (via CloseConnection), + // so waiting with the lock held would deadlock. + p.revalidateWG.Wait() + return errors.Join(errs...) } @@ -693,6 +803,7 @@ func (p *Manager) CloseConnection(ctx context.Context, tenantID string) error { delete(p.connections, tenantID) delete(p.databaseNames, tenantID) delete(p.lastAccessed, tenantID) + delete(p.lastSettingsCheck, tenantID) p.mu.Unlock() diff --git a/commons/tenant-manager/mongo/manager_test.go b/commons/tenant-manager/mongo/manager_test.go index c4659c3..b768c96 100644 --- a/commons/tenant-manager/mongo/manager_test.go +++ b/commons/tenant-manager/mongo/manager_test.go @@ -7,21 +7,138 @@ import ( "crypto/rand" "crypto/x509" "crypto/x509/pkix" + "encoding/binary" "encoding/pem" "fmt" + "io" "math/big" + "net" + "net/http" + "net/http/httptest" "os" "path/filepath" + "sync/atomic" "testing" "time" "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/client" "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/core" + "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/logcompat" "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/goleak" ) +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, + goleak.IgnoreTopFunction("github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/cache.(*InMemoryCache).cleanupLoop"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.IgnoreTopFunction("net/http.(*persistConn).readLoop"), + ) +} + +func startFakeMongoServer(t *testing.T) (*mongo.Client, func()) { + t.Helper() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + go func() { + for { + conn, acceptErr := ln.Accept() + if acceptErr != nil { + return + } + + go serveFakeMongoConn(conn) + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + addr := ln.Addr().String() + + mongoClient, err := mongo.Connect(ctx, options.Client(). + ApplyURI(fmt.Sprintf("mongodb://%s/?directConnection=true", addr)). + SetServerSelectionTimeout(2*time.Second)) + require.NoError(t, err) + + require.NoError(t, mongoClient.Ping(ctx, nil)) + + cleanup := func() { + _ = mongoClient.Disconnect(context.Background()) + ln.Close() + } + + return mongoClient, cleanup +} + +func serveFakeMongoConn(conn net.Conn) { + defer conn.Close() + + for { + header := make([]byte, 16) + if _, err := io.ReadFull(conn, header); err != nil { + return + } + + msgLen := int(binary.LittleEndian.Uint32(header[0:4])) + reqID := binary.LittleEndian.Uint32(header[4:8]) + + body := make([]byte, msgLen-16) + if _, err := io.ReadFull(conn, body); err != nil { + return + } + + resp := bson.D{ + {Key: "ismaster", Value: true}, + {Key: "ok", Value: 1.0}, + {Key: "maxWireVersion", Value: int32(21)}, + {Key: "minWireVersion", Value: int32(0)}, + {Key: "maxBsonObjectSize", Value: int32(16777216)}, + {Key: "maxMessageSizeBytes", Value: int32(48000000)}, + {Key: "maxWriteBatchSize", Value: int32(100000)}, + {Key: "localTime", Value: time.Now()}, + {Key: "connectionId", Value: int32(1)}, + } + + respBytes, _ := bson.Marshal(resp) + + var payload []byte + payload = append(payload, 0, 0, 0, 0) + payload = append(payload, 0) + payload = append(payload, respBytes...) + + totalLen := uint32(16 + len(payload)) + respHeader := make([]byte, 16) + binary.LittleEndian.PutUint32(respHeader[0:4], totalLen) + binary.LittleEndian.PutUint32(respHeader[4:8], reqID+1) + binary.LittleEndian.PutUint32(respHeader[8:12], reqID) + binary.LittleEndian.PutUint32(respHeader[12:16], 2013) + + _, _ = conn.Write(respHeader) + _, _ = conn.Write(payload) + } +} + +// mustNewTestClient creates a test client or fails the test immediately. +// Centralises the repeated client.NewClient + error-check boilerplate. +// Tests use httptest servers (http://), so WithAllowInsecureHTTP is applied. +func mustNewTestClient(t testing.TB, baseURL string) *client.Client { + t.Helper() + + c, err := client.NewClient(baseURL, testutil.NewMockLogger(), client.WithAllowInsecureHTTP(), client.WithServiceAPIKey("test-key")) + require.NoError(t, err) + + return c +} + func TestNewManager(t *testing.T) { t.Run("creates manager with client and service", func(t *testing.T) { c := &client.Client{} @@ -1123,3 +1240,437 @@ func TestBuildTLSConfigFromFiles(t *testing.T) { assert.Contains(t, err.Error(), "failed to parse CA certificate") }) } + +func TestManager_WithSettingsCheckInterval_Option(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + interval time.Duration + expectedInterval time.Duration + }{ + { + name: "sets custom settings check interval", + interval: 1 * time.Minute, + expectedInterval: 1 * time.Minute, + }, + { + name: "sets short settings check interval", + interval: 5 * time.Second, + expectedInterval: 5 * time.Second, + }, + { + name: "disables revalidation with zero duration", + interval: 0, + expectedInterval: 0, + }, + { + name: "disables revalidation with negative duration", + interval: -1 * time.Second, + expectedInterval: 0, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger", + WithSettingsCheckInterval(tt.interval), + ) + + assert.Equal(t, tt.expectedInterval, manager.settingsCheckInterval) + }) + } +} + +func TestManager_DefaultSettingsCheckInterval(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger") + + assert.Equal(t, defaultSettingsCheckInterval, manager.settingsCheckInterval, + "default settings check interval should be set from named constant") + assert.NotNil(t, manager.lastSettingsCheck, + "lastSettingsCheck map should be initialized") +} + +func TestManager_GetConnection_RevalidatesSettingsAfterInterval(t *testing.T) { + t.Parallel() + + var callCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&callCount, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "id": "tenant-123", + "tenantSlug": "test-tenant", + "databases": { + "onboarding": { + "mongodb": {"host": "localhost", "port": 27017, "database": "testdb", "username": "user", "password": "pass"} + } + } + }`)) + })) + defer server.Close() + + fakeDB, cleanupFake := startFakeMongoServer(t) + defer cleanupFake() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "ledger", + WithLogger(testutil.NewMockLogger()), + WithModule("onboarding"), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + cachedConn := &MongoConnection{DB: fakeDB} + manager.connections["tenant-123"] = cachedConn + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour) + + db, err := manager.GetConnection(context.Background(), "tenant-123") + require.NoError(t, err) + assert.Equal(t, fakeDB, db) + + assert.Eventually(t, func() bool { + return atomic.LoadInt32(&callCount) > 0 + }, 500*time.Millisecond, 10*time.Millisecond, "should have fetched fresh config from Tenant Manager") + + manager.mu.RLock() + lastCheck := manager.lastSettingsCheck["tenant-123"] + manager.mu.RUnlock() + + assert.False(t, lastCheck.IsZero(), "lastSettingsCheck should have been updated") + + manager.revalidateWG.Wait() + require.NoError(t, manager.Close(context.Background())) +} + +func TestManager_GetConnection_DisabledRevalidation_WithZero(t *testing.T) { + t.Parallel() + + var callCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&callCount, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "id": "tenant-123", + "tenantSlug": "test-tenant", + "databases": { + "onboarding": { + "mongodb": {"host": "localhost", "port": 27017, "database": "testdb"} + } + } + }`)) + })) + defer server.Close() + + fakeDB, cleanupFake := startFakeMongoServer(t) + defer cleanupFake() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "ledger", + WithLogger(testutil.NewMockLogger()), + WithModule("onboarding"), + WithSettingsCheckInterval(0), + ) + + assert.Equal(t, time.Duration(0), manager.settingsCheckInterval) + + cachedConn := &MongoConnection{DB: fakeDB} + manager.connections["tenant-123"] = cachedConn + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour) + + db, err := manager.GetConnection(context.Background(), "tenant-123") + require.NoError(t, err) + assert.Equal(t, fakeDB, db) + + time.Sleep(50 * time.Millisecond) + + assert.Equal(t, int32(0), atomic.LoadInt32(&callCount), "should NOT have fetched config - revalidation is disabled") + + require.NoError(t, manager.Close(context.Background())) +} + +func TestManager_GetConnection_DisabledRevalidation_WithNegative(t *testing.T) { + t.Parallel() + + var callCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&callCount, 1) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) + })) + defer server.Close() + + fakeDB, cleanupFake := startFakeMongoServer(t) + defer cleanupFake() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "payment", + WithLogger(testutil.NewMockLogger()), + WithModule("payment"), + WithSettingsCheckInterval(-5*time.Second), + ) + + assert.Equal(t, time.Duration(0), manager.settingsCheckInterval) + + cachedConn := &MongoConnection{DB: fakeDB} + manager.connections["tenant-456"] = cachedConn + manager.lastAccessed["tenant-456"] = time.Now() + manager.lastSettingsCheck["tenant-456"] = time.Now().Add(-1 * time.Hour) + + db, err := manager.GetConnection(context.Background(), "tenant-456") + require.NoError(t, err) + assert.Equal(t, fakeDB, db) + + time.Sleep(50 * time.Millisecond) + + assert.Equal(t, int32(0), atomic.LoadInt32(&callCount), "should NOT have fetched config - revalidation is disabled via negative interval") + + require.NoError(t, manager.Close(context.Background())) +} + +func TestManager_RevalidateSettings_EvictsSuspendedTenant(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + responseStatus int + responseBody string + expectEviction bool + expectLogSubstring string + }{ + { + name: "evicts_cached_connection_when_tenant_is_suspended", + responseStatus: http.StatusForbidden, + responseBody: `{"code":"TS-SUSPENDED","error":"service suspended","status":"suspended"}`, + expectEviction: true, + expectLogSubstring: "tenant tenant-suspended service suspended, evicting cached connection", + }, + { + name: "evicts_cached_connection_when_tenant_is_purged", + responseStatus: http.StatusForbidden, + responseBody: `{"code":"TS-SUSPENDED","error":"service purged","status":"purged"}`, + expectEviction: true, + expectLogSubstring: "tenant tenant-suspended service suspended, evicting cached connection", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.responseStatus) + w.Write([]byte(tt.responseBody)) + })) + defer server.Close() + + capLogger := testutil.NewCapturingLogger() + tmClient, err := client.NewClient(server.URL, capLogger, client.WithAllowInsecureHTTP(), client.WithServiceAPIKey("test-key")) + require.NoError(t, err) + manager := NewManager(tmClient, "ledger", + WithLogger(capLogger), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + // Pre-populate a cached connection for the tenant (nil DB to avoid real MongoDB) + manager.connections["tenant-suspended"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-suspended"] = time.Now() + manager.lastSettingsCheck["tenant-suspended"] = time.Now() + + // Verify the connection exists before revalidation + statsBefore := manager.Stats() + assert.Equal(t, 1, statsBefore.TotalConnections, + "should have 1 connection before revalidation") + + // Trigger revalidatePoolSettings directly + manager.revalidatePoolSettings("tenant-suspended") + + if tt.expectEviction { + // Verify the connection was evicted + statsAfter := manager.Stats() + assert.Equal(t, 0, statsAfter.TotalConnections, + "connection should be evicted after suspended tenant detected") + + // Verify lastAccessed and lastSettingsCheck were cleaned up + manager.mu.RLock() + _, accessExists := manager.lastAccessed["tenant-suspended"] + _, settingsExists := manager.lastSettingsCheck["tenant-suspended"] + manager.mu.RUnlock() + + assert.False(t, accessExists, + "lastAccessed should be removed for evicted tenant") + assert.False(t, settingsExists, + "lastSettingsCheck should be removed for evicted tenant") + } + + // Verify the appropriate log message was produced + assert.True(t, capLogger.ContainsSubstring(tt.expectLogSubstring), + "expected log message containing %q, got: %v", + tt.expectLogSubstring, capLogger.GetMessages()) + }) + } +} + +func TestManager_RevalidateSettings_FailedDoesNotBreakConnection(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + capLogger := testutil.NewCapturingLogger() + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "ledger", + WithLogger(capLogger), + WithModule("onboarding"), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + // Pre-populate cache + manager.connections["tenant-123"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now().Add(-1 * time.Hour) + + // Trigger revalidation directly - should fail but not evict + manager.revalidatePoolSettings("tenant-123") + + // Connection should still exist (not evicted on transient failure) + stats := manager.Stats() + assert.Equal(t, 1, stats.TotalConnections, + "connection should NOT be evicted after transient revalidation failure") + + // Verify a warning was logged + assert.True(t, capLogger.ContainsSubstring("failed to revalidate connection settings"), + "should log a warning when revalidation fails") +} + +func TestManager_RevalidateSettings_RecoverFromPanic(t *testing.T) { + t.Parallel() + + capLogger := testutil.NewCapturingLogger() + + // Create a manager with nil client to trigger a panic path + manager := &Manager{ + logger: logcompat.New(capLogger), + connections: make(map[string]*MongoConnection), + databaseNames: make(map[string]string), + lastAccessed: make(map[string]time.Time), + lastSettingsCheck: make(map[string]time.Time), + settingsCheckInterval: 1 * time.Millisecond, + } + + // Should not panic -- the recovery handler should catch it + assert.NotPanics(t, func() { + manager.revalidatePoolSettings("tenant-panic") + }) +} + +func TestManager_CloseConnection_CleansUpLastSettingsCheck(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger", + WithLogger(testutil.NewMockLogger()), + ) + + // Pre-populate cache + manager.connections["tenant-123"] = &MongoConnection{DB: nil} + manager.lastAccessed["tenant-123"] = time.Now() + manager.lastSettingsCheck["tenant-123"] = time.Now() + + err := manager.CloseConnection(context.Background(), "tenant-123") + + require.NoError(t, err) + + manager.mu.RLock() + _, connExists := manager.connections["tenant-123"] + _, accessExists := manager.lastAccessed["tenant-123"] + _, settingsCheckExists := manager.lastSettingsCheck["tenant-123"] + manager.mu.RUnlock() + + assert.False(t, connExists, "connection should be removed after CloseConnection") + assert.False(t, accessExists, "lastAccessed should be removed after CloseConnection") + assert.False(t, settingsCheckExists, "lastSettingsCheck should be removed after CloseConnection") +} + +func TestManager_Close_CleansUpLastSettingsCheck(t *testing.T) { + t.Parallel() + + c := mustNewTestClient(t, "http://localhost:8080") + manager := NewManager(c, "ledger", + WithLogger(testutil.NewMockLogger()), + ) + + // Pre-populate cache with multiple tenants + for _, id := range []string{"tenant-1", "tenant-2"} { + manager.connections[id] = &MongoConnection{DB: nil} + manager.lastAccessed[id] = time.Now() + manager.lastSettingsCheck[id] = time.Now() + } + + err := manager.Close(context.Background()) + + require.NoError(t, err) + + assert.Empty(t, manager.connections, "all connections should be removed after Close") + assert.Empty(t, manager.lastAccessed, "all lastAccessed should be removed after Close") + assert.Empty(t, manager.lastSettingsCheck, "all lastSettingsCheck should be removed after Close") +} + +func TestManager_Close_WaitsForRevalidateSettings(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + time.Sleep(300 * time.Millisecond) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "id": "tenant-slow", + "tenantSlug": "slow-tenant", + "databases": { + "onboarding": { + "mongodb": {"host": "localhost", "port": 27017, "database": "testdb", "username": "user", "password": "pass"} + } + } + }`)) + })) + defer server.Close() + + fakeDB, cleanupFake := startFakeMongoServer(t) + defer cleanupFake() + + tmClient := mustNewTestClient(t, server.URL) + manager := NewManager(tmClient, "test-service", + WithLogger(testutil.NewMockLogger()), + WithSettingsCheckInterval(1*time.Millisecond), + ) + + cachedConn := &MongoConnection{DB: fakeDB} + manager.connections["tenant-slow"] = cachedConn + manager.lastAccessed["tenant-slow"] = time.Now() + manager.lastSettingsCheck["tenant-slow"] = time.Time{} + + _, err := manager.GetConnection(context.Background(), "tenant-slow") + require.NoError(t, err) + + err = manager.Close(context.Background()) + require.NoError(t, err) + + assert.True(t, manager.closed, "manager should be closed") + assert.Empty(t, manager.connections, "connections should be cleared after Close") +}