From 6bbf97c7a012bc35ed465c61df11f72d3137d5ff Mon Sep 17 00:00:00 2001 From: Gabe Ruttner Date: Sat, 14 Mar 2026 08:55:25 -0400 Subject: [PATCH] fix: prevent connection pool exhaustion from concurrent is-active checks --- pkg/repository/scheduler_concurrency.go | 6 +++++- pkg/scheduling/v1/concurrency.go | 15 ++++++++++++++- pkg/scheduling/v1/pool.go | 5 +++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/repository/scheduler_concurrency.go b/pkg/repository/scheduler_concurrency.go index 635a0dfa67..8b532314a2 100644 --- a/pkg/repository/scheduler_concurrency.go +++ b/pkg/repository/scheduler_concurrency.go @@ -69,12 +69,16 @@ func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( defer rollback() - err = c.queries.AdvisoryLock(ctx, tx, strategy.ID) + acquired, err := c.queries.TryAdvisoryLock(ctx, tx, strategy.ID) if err != nil { return err } + if !acquired { + return nil + } + isActive, err := c.queries.CheckStrategyActive(ctx, tx, sqlcv1.CheckStrategyActiveParams{ Tenantid: tenantId, Workflowid: strategy.WorkflowID, diff --git a/pkg/scheduling/v1/concurrency.go b/pkg/scheduling/v1/concurrency.go index 279627d88a..43d36885b4 100644 --- a/pkg/scheduling/v1/concurrency.go +++ b/pkg/scheduling/v1/concurrency.go @@ -44,6 +44,8 @@ type ConcurrencyManager struct { minPollingInterval time.Duration maxPollingInterval time.Duration + + isActiveSemaphore chan struct{} } func newConcurrencyManager(conf *sharedConfig, tenantId uuid.UUID, strategy *sqlcv1.V1StepConcurrency, resultsCh chan<- *ConcurrencyResults) *ConcurrencyManager { @@ -62,6 +64,7 @@ func newConcurrencyManager(conf *sharedConfig, tenantId uuid.UUID, strategy *sql rateLimiter: newConcurrencyRateLimiter(conf.schedulerConcurrencyRateLimit), minPollingInterval: conf.schedulerConcurrencyPollingMinInterval, maxPollingInterval: conf.schedulerConcurrencyPollingMaxInterval, + isActiveSemaphore: conf.isActiveSemaphore, } ctx, cancel := context.WithCancel(context.Background()) @@ -156,7 +159,8 @@ func (c *ConcurrencyManager) loopConcurrency(ctx context.Context) { } func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) + ticker := randomticker.NewRandomTicker(5*time.Second, 10*time.Second) + defer ticker.Stop() for { select { @@ -165,6 +169,13 @@ func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) { case <-ticker.C: } + // Limit the number of concurrent is-active checks to avoid saturating the connection pool. + select { + case <-ctx.Done(): + return + case c.isActiveSemaphore <- struct{}{}: + } + ctx, span := telemetry.NewSpan(ctx, "concurrency-check-active") telemetry.WithAttributes(span, @@ -176,6 +187,8 @@ func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) { err := c.repo.UpdateConcurrencyStrategyIsActive(ctx, c.tenantId, c.strategy) + <-c.isActiveSemaphore + if err != nil { span.End() c.l.Error().Err(err).Msg("error updating concurrency strategy is_active") diff --git a/pkg/scheduling/v1/pool.go b/pkg/scheduling/v1/pool.go index a1b5f2c550..59e699a10c 100644 --- a/pkg/scheduling/v1/pool.go +++ b/pkg/scheduling/v1/pool.go @@ -26,6 +26,10 @@ type sharedConfig struct { schedulerConcurrencyPollingMinInterval time.Duration schedulerConcurrencyPollingMaxInterval time.Duration + + // Limits concurrent is-active checks across all ConcurrencyManagers to avoid + // saturating the database connection pool. + isActiveSemaphore chan struct{} } // SchedulingPool is responsible for managing a pool of tenantManagers. @@ -68,6 +72,7 @@ func NewSchedulingPool( schedulerConcurrencyRateLimit: schedulerConcurrencyRateLimit, schedulerConcurrencyPollingMinInterval: schedulerConcurrencyPollingMinInterval, schedulerConcurrencyPollingMaxInterval: schedulerConcurrencyPollingMaxInterval, + isActiveSemaphore: make(chan struct{}, 10), }, resultsCh: resultsCh, concurrencyResultsCh: concurrencyResultsCh,