Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/repository/scheduler_concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(

defer rollback()

err = c.queries.AdvisoryLock(ctx, tx, strategy.ID)
acquired, err := c.queries.TryAdvisoryLock(ctx, tx, strategy.ID)

if err != nil {
return err
}

if !acquired {
return nil
}

isActive, err := c.queries.CheckStrategyActive(ctx, tx, sqlcv1.CheckStrategyActiveParams{
Tenantid: tenantId,
Workflowid: strategy.WorkflowID,
Expand Down
15 changes: 14 additions & 1 deletion pkg/scheduling/v1/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type ConcurrencyManager struct {
minPollingInterval time.Duration

maxPollingInterval time.Duration

isActiveSemaphore chan struct{}
}

func newConcurrencyManager(conf *sharedConfig, tenantId uuid.UUID, strategy *sqlcv1.V1StepConcurrency, resultsCh chan<- *ConcurrencyResults) *ConcurrencyManager {
Expand All @@ -62,6 +64,7 @@ func newConcurrencyManager(conf *sharedConfig, tenantId uuid.UUID, strategy *sql
rateLimiter: newConcurrencyRateLimiter(conf.schedulerConcurrencyRateLimit),
minPollingInterval: conf.schedulerConcurrencyPollingMinInterval,
maxPollingInterval: conf.schedulerConcurrencyPollingMaxInterval,
isActiveSemaphore: conf.isActiveSemaphore,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -156,7 +159,8 @@ func (c *ConcurrencyManager) loopConcurrency(ctx context.Context) {
}

func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
ticker := randomticker.NewRandomTicker(5*time.Second, 10*time.Second)
defer ticker.Stop()

for {
select {
Expand All @@ -165,6 +169,13 @@ func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) {
case <-ticker.C:
}

// Limit the number of concurrent is-active checks to avoid saturating the connection pool.
select {
case <-ctx.Done():
return
case c.isActiveSemaphore <- struct{}{}:
}
Comment on lines +172 to +177

ctx, span := telemetry.NewSpan(ctx, "concurrency-check-active")

telemetry.WithAttributes(span,
Expand All @@ -176,6 +187,8 @@ func (c *ConcurrencyManager) loopCheckActive(ctx context.Context) {

err := c.repo.UpdateConcurrencyStrategyIsActive(ctx, c.tenantId, c.strategy)

<-c.isActiveSemaphore

if err != nil {
span.End()
c.l.Error().Err(err).Msg("error updating concurrency strategy is_active")
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduling/v1/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type sharedConfig struct {
schedulerConcurrencyPollingMinInterval time.Duration

schedulerConcurrencyPollingMaxInterval time.Duration

// Limits concurrent is-active checks across all ConcurrencyManagers to avoid
// saturating the database connection pool.
isActiveSemaphore chan struct{}
}

// SchedulingPool is responsible for managing a pool of tenantManagers.
Expand Down Expand Up @@ -68,6 +72,7 @@ func NewSchedulingPool(
schedulerConcurrencyRateLimit: schedulerConcurrencyRateLimit,
schedulerConcurrencyPollingMinInterval: schedulerConcurrencyPollingMinInterval,
schedulerConcurrencyPollingMaxInterval: schedulerConcurrencyPollingMaxInterval,
isActiveSemaphore: make(chan struct{}, 10),
},
Comment on lines 72 to 76
resultsCh: resultsCh,
concurrencyResultsCh: concurrencyResultsCh,
Expand Down
Loading