Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 122 additions & 11 deletions commons/tenant-manager/mongo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ import (
// Timing knobs for connection health checks and settings revalidation.
const (
	// mongoPingTimeout caps how long a MongoDB connection health-check ping may take.
	mongoPingTimeout = 3 * time.Second

	// defaultSettingsCheckInterval is the default spacing between per-tenant
	// settings revalidation checks. GetConnection triggers an asynchronous
	// fetch of fresh config from the Tenant Manager when it returns a cached
	// connection and at least this much time has passed since the last check.
	defaultSettingsCheckInterval = 30 * time.Second

	// settingsRevalidationTimeout caps the Tenant Manager HTTP call made
	// during asynchronous settings revalidation.
	settingsRevalidationTimeout = 5 * time.Second
)

// DefaultMaxConnections is the default max connections for MongoDB.
const DefaultMaxConnections uint64 = 100

Expand Down Expand Up @@ -69,6 +79,14 @@ type Manager struct {
maxConnections int // soft limit for pool size (0 = unlimited)
idleTimeout time.Duration // how long before a connection is eligible for eviction
lastAccessed map[string]time.Time // LRU tracking per tenant

lastSettingsCheck map[string]time.Time // tracks per-tenant last settings revalidation time
settingsCheckInterval time.Duration // configurable interval between settings revalidation checks

// revalidateWG tracks in-flight revalidatePoolSettings goroutines so Close()
// can wait for them to finish before returning. Without this, goroutines
// spawned by GetConnection may access Manager state after Close() returns.
revalidateWG sync.WaitGroup
}

type MongoConnection struct {
Expand Down Expand Up @@ -173,6 +191,21 @@ func WithMaxTenantPools(maxSize int) Option {
}
}

// WithSettingsCheckInterval configures how often cached connections have their
// settings revalidated against the Tenant Manager. On a GetConnection cache hit,
// if at least this interval has passed since the tenant's last check, fresh
// config is fetched asynchronously. The MongoDB driver cannot resize a pool at
// runtime, so the practical effect of revalidation is detecting suspended/purged
// tenants and evicting their connections.
//
// A value of d <= 0 DISABLES revalidation: settingsCheckInterval is stored as 0
// and no async checks are started on cache hits.
// Default: 30 seconds (defaultSettingsCheckInterval).
func WithSettingsCheckInterval(d time.Duration) Option {
	// Clamp negatives to zero so "disabled" has a single representation.
	interval := d
	if interval < 0 {
		interval = 0
	}

	return func(m *Manager) {
		m.settingsCheckInterval = interval
	}
}

// WithIdleTimeout sets the duration after which an unused tenant connection becomes
// eligible for eviction. Only connections idle longer than this duration will be evicted
// when the pool exceeds the soft limit (maxConnections). If all connections are active
Expand All @@ -187,12 +220,14 @@ func WithIdleTimeout(d time.Duration) Option {
// NewManager creates a new MongoDB connection manager.
func NewManager(c *client.Client, service string, opts ...Option) *Manager {
p := &Manager{
client: c,
service: service,
logger: logcompat.New(nil),
connections: make(map[string]*MongoConnection),
databaseNames: make(map[string]string),
lastAccessed: make(map[string]time.Time),
client: c,
service: service,
logger: logcompat.New(nil),
connections: make(map[string]*MongoConnection),
databaseNames: make(map[string]string),
lastAccessed: make(map[string]time.Time),
lastSettingsCheck: make(map[string]time.Time),
settingsCheckInterval: defaultSettingsCheckInterval,
}

for _, opt := range opts {
Expand All @@ -206,7 +241,7 @@ func NewManager(c *client.Client, service string, opts ...Option) *Manager {
// If a cached client fails a health check (e.g., due to credential rotation
// after a tenant purge+re-associate), the stale client is evicted and a new
// one is created with fresh credentials from the Tenant Manager.
func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Client, error) {
func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Client, error) { //nolint:gocognit // complexity from connection lifecycle (ping, revalidate, evict) is inherent
if ctx == nil {
ctx = context.Background()
}
Expand Down Expand Up @@ -250,11 +285,28 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl
// but re-check that the connection was not evicted while we were
// pinging (another goroutine may have called CloseConnection,
// Close, or evictLRU in the meantime).
now := time.Now()

p.mu.Lock()
if _, stillExists := p.connections[tenantID]; stillExists {
p.lastAccessed[tenantID] = time.Now()
if current, stillExists := p.connections[tenantID]; stillExists && current == conn {
p.lastAccessed[tenantID] = now

shouldRevalidate := p.client != nil && p.settingsCheckInterval > 0 && time.Since(p.lastSettingsCheck[tenantID]) > p.settingsCheckInterval
if shouldRevalidate {
p.lastSettingsCheck[tenantID] = now
p.revalidateWG.Add(1)
}

p.mu.Unlock()

if shouldRevalidate {
go func() { //#nosec G118 -- intentional: revalidatePoolSettings creates its own timeout context; must not use request-scoped context as this outlives the request
defer p.revalidateWG.Done()

p.revalidatePoolSettings(tenantID)
}()
}

return conn.DB, nil
}

Expand All @@ -274,6 +326,54 @@ func (p *Manager) GetConnection(ctx context.Context, tenantID string) (*mongo.Cl
return p.createConnection(ctx, tenantID)
}

// revalidatePoolSettings fetches fresh config from the Tenant Manager and detects
// whether the tenant has been suspended or purged. For MongoDB, the driver does not
// support changing pool size after client creation, so this method only checks for
// tenant status changes and evicts the cached connection if the tenant is suspended.
//
// It is meant to run asynchronously (GetConnection launches it in a goroutine)
// and must never block GetConnection. It builds its own timeout contexts from
// context.Background() rather than a request-scoped context, because it can
// outlive the request that triggered it.
//
// Failure handling is best-effort and never propagates to the caller:
//   - a suspended-tenant error evicts the cached connection; a failure of that
//     eviction is logged as a warning;
//   - any other fetch error is logged as a warning and the connection stays usable;
//   - a panic is recovered and logged.
func (p *Manager) revalidatePoolSettings(tenantID string) {
	// Guard: recover from any panic to avoid crashing the process.
	// This goroutine runs asynchronously and must never bring down the service.
	defer func() {
		if r := recover(); r != nil {
			if p.logger != nil {
				p.logger.Warnf("recovered from panic during settings revalidation for tenant %s: %v", tenantID, r)
			}
		}
	}()

	revalidateCtx, cancel := context.WithTimeout(context.Background(), settingsRevalidationTimeout)
	defer cancel()

	_, err := p.client.GetTenantConfig(revalidateCtx, tenantID, p.service)
	if err != nil {
		// If tenant service was suspended/purged, evict the cached connection immediately.
		// The next request for this tenant will call createConnection, which fetches fresh
		// config from the Tenant Manager and receives the 403 error directly.
		if core.IsTenantSuspendedError(err) {
			if p.logger != nil {
				p.logger.Warnf("tenant %s service suspended, evicting cached connection", tenantID)
			}

			// Use a fresh timeout: revalidateCtx may be nearly exhausted by the fetch above.
			evictCtx, evictCancel := context.WithTimeout(context.Background(), settingsRevalidationTimeout)
			defer evictCancel()

			// Eviction stays best-effort, but a failure is surfaced in the logs
			// instead of being silently dropped.
			if closeErr := p.CloseConnection(evictCtx, tenantID); closeErr != nil && p.logger != nil {
				p.logger.Warnf("failed to evict cached connection for suspended tenant %s: %v", tenantID, closeErr)
			}

			return
		}

		if p.logger != nil {
			p.logger.Warnf("failed to revalidate connection settings for tenant %s: %v", tenantID, err)
		}

		return
	}

	// NOTE(review): the fetched config is discarded and nil is passed here —
	// presumably ApplyConnectionSettings has nothing to apply for MongoDB; confirm.
	p.ApplyConnectionSettings(tenantID, nil)
}

// createConnection fetches config from Tenant Manager and creates a MongoDB client.
func (p *Manager) createConnection(ctx context.Context, tenantID string) (*mongo.Client, error) {
if p.client == nil {
Expand Down Expand Up @@ -395,6 +495,7 @@ func (p *Manager) removeStaleCacheEntry(tenantID string, cachedConn *MongoConnec
delete(p.connections, tenantID)
delete(p.databaseNames, tenantID)
delete(p.lastAccessed, tenantID)
delete(p.lastSettingsCheck, tenantID)
}
}

Expand Down Expand Up @@ -564,6 +665,7 @@ func (p *Manager) evictLRU(ctx context.Context, logger log.Logger) {
delete(p.connections, candidateID)
delete(p.databaseNames, candidateID)
delete(p.lastAccessed, candidateID)
delete(p.lastSettingsCheck, candidateID)
}
}

Expand Down Expand Up @@ -642,11 +744,13 @@ func (p *Manager) GetDatabaseForTenant(ctx context.Context, tenantID string) (*m
}

// Close closes all MongoDB connections.
// It waits for any in-flight revalidatePoolSettings goroutines to finish
// before returning, preventing goroutine leaks and use-after-close races.
//
// Uses snapshot-then-cleanup to avoid holding the mutex during network I/O
// (Disconnect calls), which could block other goroutines on slow networks.
func (p *Manager) Close(ctx context.Context) error {
// Step 1: Under lock — mark closed, snapshot all connections, clear maps.
// Phase 1: Under lock — mark closed, snapshot all connections, clear maps.
p.mu.Lock()
p.closed = true

Expand All @@ -659,10 +763,11 @@ func (p *Manager) Close(ctx context.Context) error {
clear(p.connections)
clear(p.databaseNames)
clear(p.lastAccessed)
clear(p.lastSettingsCheck)

p.mu.Unlock()

// Step 2: Outside lock — disconnect each snapshotted connection.
// Phase 2: Outside lock — disconnect each snapshotted connection.
var errs []error

for _, conn := range snapshot {
Expand All @@ -673,6 +778,11 @@ func (p *Manager) Close(ctx context.Context) error {
}
}

// Phase 3: Wait for in-flight revalidatePoolSettings goroutines OUTSIDE the lock.
// revalidatePoolSettings acquires p.mu internally (via CloseConnection),
// so waiting with the lock held would deadlock.
p.revalidateWG.Wait()

return errors.Join(errs...)
}

Expand All @@ -693,6 +803,7 @@ func (p *Manager) CloseConnection(ctx context.Context, tenantID string) error {
delete(p.connections, tenantID)
delete(p.databaseNames, tenantID)
delete(p.lastAccessed, tenantID)
delete(p.lastSettingsCheck, tenantID)

p.mu.Unlock()

Expand Down
Loading
Loading