From e2a6c4ad86ad3cd35af2cc0b22f31c1b9ac547e6 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 13:03:06 +0100 Subject: [PATCH 1/7] Improve the performance of creating FairSchedulingAlgoContext When creating the fairshare scheduling context, we loop through all jobs of the jobdb and then filter out the ones relevant to the current pool We then use these jobs to calculate relevant info about the pool - Which jobs are on which nodes - The demand of each queue (and subsequently the fairshare/adjusted fairshare) This is simple but quite inefficient if you have a lot (millions) of jobs in the system and many pools (most of which only a small fraction of jobs interact with) - As a result most of the scheduling time can be taken up by this data shuffling Now instead of loading all jobs, we'll load only relevant jobs: - All leased jobs (needed to calculate which jobs are on which nodes) - Possibly this could be further improved by storing them by node but for now this is simple - All queued jobs against pools we are processing (current pool + away pools) This should have a significant impact, especially in the case we have 1 pool with most jobs queued against it and many smaller pools There are larger reworks we could do here to make it even more efficient, however for now this should give us most of the benefit for a minor change - Example would be to calculate demand for each pool + jobs on each node upfront. 
Incrementally update this as pools are processed rather than recalculate it entirely Signed-off-by: JamesMurkin --- internal/common/slices/slices.go | 18 +++++ internal/common/slices/slices_test.go | 81 +++++++++++++++++++ internal/scheduler/jobdb/job.go | 5 ++ internal/scheduler/jobdb/jobdb.go | 57 ++++++++++++- .../scheduler/scheduling/scheduling_algo.go | 10 ++- 5 files changed, 169 insertions(+), 2 deletions(-) diff --git a/internal/common/slices/slices.go b/internal/common/slices/slices.go index 0eb84145472..a68169e58af 100644 --- a/internal/common/slices/slices.go +++ b/internal/common/slices/slices.go @@ -87,6 +87,24 @@ func Unique[S ~[]E, E comparable](s S) S { return rv } +// UniqueBy returns a copy of s with duplicate elements removed based on the result of keyFunc(e), +// keeping only the first occurrence of each unique key. +func UniqueBy[S ~[]E, E any, K comparable](s S, keyFunc func(E) K) S { + if s == nil { + return nil + } + rv := make(S, 0) + seen := make(map[K]bool) + for _, v := range s { + key := keyFunc(v) + if !seen[key] { + rv = append(rv, v) + seen[key] = true + } + } + return rv +} + // GroupByFunc groups the elements e_1, ..., e_n of s into separate slices by keyFunc(e). 
func GroupByFunc[S ~[]E, E any, K comparable](s S, keyFunc func(E) K) map[K]S { rv := make(map[K]S) diff --git a/internal/common/slices/slices_test.go b/internal/common/slices/slices_test.go index dd733ad98bf..ef05180c41c 100644 --- a/internal/common/slices/slices_test.go +++ b/internal/common/slices/slices_test.go @@ -322,6 +322,87 @@ func TestUnique(t *testing.T) { } } +func TestUniqueBy(t *testing.T) { + type item struct { + Id string + Name string + } + + tests := map[string]struct { + input []item + keyFunc func(i item) string + expected []item + }{ + "nil": { + input: nil, + keyFunc: func(i item) string { return i.Id }, + expected: nil, + }, + "empty": { + input: []item{}, + keyFunc: func(i item) string { return i.Name }, + expected: []item{}, + }, + "no duplicates": { + input: []item{ + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bob"}, + {Id: "3", Name: "Charlie"}, + }, + keyFunc: func(i item) string { return i.Id }, + expected: []item{ + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bob"}, + {Id: "3", Name: "Charlie"}, + }, + }, + "consecutive duplicates": { + input: []item{ + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bob"}, + {Id: "2", Name: "Bobby"}, + }, + keyFunc: func(i item) string { return i.Id }, + expected: []item{ + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bob"}, + }, + }, + "non-consecutive duplicates": { + input: []item{ + {Id: "2", Name: "Bob"}, + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bobby"}, + }, + keyFunc: func(i item) string { return i.Id }, + expected: []item{ + {Id: "2", Name: "Bob"}, + {Id: "1", Name: "Alice"}, + }, + }, + "duplicate based on custom key function Name": { + input: []item{ + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bob"}, + {Id: "2", Name: "Bobby"}, + }, + keyFunc: func(i item) string { return i.Name }, + expected: []item{ + {Id: "1", Name: "Alice"}, + {Id: "2", Name: "Bob"}, + {Id: "2", Name: "Bobby"}, + }, + }, + } + + // Run all test cases + for name, tc := range tests { + t.Run(name, func(t 
*testing.T) { + assert.Equal(t, tc.expected, UniqueBy(tc.input, tc.keyFunc)) + }) + } +} + func TestFilter(t *testing.T) { includeOver5 := func(val int) bool { return val > 5 } input := []int{1, 3, 5, 7, 9} diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index ebfbe170519..80f6771681f 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -893,6 +893,11 @@ func (job *Job) WithValidated(validated bool) *Job { return j } +// Leased returns true if the job is currently leased +func (job *Job) Leased() bool { + return !job.Queued() && !job.InTerminalState() +} + // Validated returns true if the job has been validated func (job *Job) Validated() bool { return job.validated diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 6f8b93148a9..2fff80db76c 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -69,6 +69,7 @@ type JobDb struct { jobsByGangKey map[gangKey]immutable.Set[string] jobsByQueue map[string]immutable.SortedSet[*Job] jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job] + leasedJobs *immutable.Set[*Job] unvalidatedJobs *immutable.Set[*Job] // Configured priority classes. 
priorityClasses map[string]types.PriorityClass @@ -128,12 +129,14 @@ func NewJobDbWithSchedulingKeyGenerator( panic(fmt.Sprintf("unknown default priority class %s", defaultPriorityClassName)) } unvalidatedJobs := immutable.NewSet[*Job](JobHasher{}) + leasedJobs := immutable.NewSet[*Job](JobHasher{}) return &JobDb{ jobsById: immutable.NewMap[string, *Job](nil), jobsByRunId: immutable.NewMap[string, string](nil), jobsByGangKey: map[gangKey]immutable.Set[string]{}, jobsByQueue: map[string]immutable.SortedSet[*Job]{}, jobsByPoolAndQueue: map[string]map[string]immutable.SortedSet[*Job]{}, + leasedJobs: &leasedJobs, unvalidatedJobs: &unvalidatedJobs, priorityClasses: priorityClasses, defaultPriorityClass: defaultPriorityClass, @@ -161,6 +164,7 @@ func (jobDb *JobDb) Clone() *JobDb { jobsByGangKey: maps.Clone(jobDb.jobsByGangKey), jobsByQueue: maps.Clone(jobDb.jobsByQueue), jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue), + leasedJobs: jobDb.leasedJobs, unvalidatedJobs: jobDb.unvalidatedJobs, priorityClasses: jobDb.priorityClasses, defaultPriorityClass: jobDb.defaultPriorityClass, @@ -318,6 +322,7 @@ func (jobDb *JobDb) ReadTxn() *Txn { jobsByGangKey: jobDb.jobsByGangKey, jobsByQueue: jobDb.jobsByQueue, jobsByPoolAndQueue: jobDb.jobsByPoolAndQueue, + leasedJobs: jobDb.leasedJobs, unvalidatedJobs: jobDb.unvalidatedJobs, active: true, jobDb: jobDb, @@ -338,6 +343,7 @@ func (jobDb *JobDb) WriteTxn() *Txn { jobsByGangKey: maps.Clone(jobDb.jobsByGangKey), jobsByQueue: maps.Clone(jobDb.jobsByQueue), jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue), + leasedJobs: jobDb.leasedJobs, unvalidatedJobs: jobDb.unvalidatedJobs, active: true, jobDb: jobDb, @@ -376,6 +382,8 @@ type Txn struct { // Queued jobs for each queue and pool. // Stored as a set and needs sorting to determine the order they should be scheduled in. 
jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job] + // Jobs that are currently leased + leasedJobs *immutable.Set[*Job] // Jobs that require submit checking unvalidatedJobs *immutable.Set[*Job] // The jobDb from which this transaction was created. @@ -396,6 +404,7 @@ func (txn *Txn) Commit() { txn.jobDb.jobsByGangKey = txn.jobsByGangKey txn.jobDb.jobsByQueue = txn.jobsByQueue txn.jobDb.jobsByPoolAndQueue = txn.jobsByPoolAndQueue + txn.jobDb.leasedJobs = txn.leasedJobs txn.jobDb.unvalidatedJobs = txn.unvalidatedJobs txn.active = false @@ -526,6 +535,11 @@ func (txn *Txn) Upsert(jobs []*Job) error { txn.jobsByPoolAndQueue[pool][job.queue] = existingJobs.Delete(existingJob) } + if existingJob.Leased() { + newLeasedJobs := txn.leasedJobs.Delete(existingJob) + txn.leasedJobs = &newLeasedJobs + } + if !existingJob.Validated() { newUnvalidatedJobs := txn.unvalidatedJobs.Delete(existingJob) txn.unvalidatedJobs = &newUnvalidatedJobs @@ -536,7 +550,7 @@ func (txn *Txn) Upsert(jobs []*Job) error { // Now need to insert jobs, runs and queuedJobs. This can be done in parallel. wg := sync.WaitGroup{} - wg.Add(5) + wg.Add(6) // jobs go func() { @@ -675,6 +689,30 @@ func (txn *Txn) Upsert(jobs []*Job) error { } }() + // Leased jobs + go func() { + defer wg.Done() + if hasJobs { + for _, job := range jobs { + if job.Leased() { + leasedJobs := txn.leasedJobs.Add(job) + txn.leasedJobs = &leasedJobs + } + } + } else { + leasedJobs := map[*Job]bool{} + + for _, job := range jobs { + if job.Leased() { + leasedJobs[job] = true + } + } + + leasedJobsImmutable := immutable.NewSet[*Job](JobHasher{}, maps.Keys(leasedJobs)...) 
+ txn.leasedJobs = &leasedJobsImmutable + } + }() + // Unvalidated jobs go func() { defer wg.Done() @@ -811,6 +849,11 @@ func (txn *Txn) UnvalidatedJobs() *immutable.SetIterator[*Job] { return txn.unvalidatedJobs.Iterator() } +// GetAllLeasedJobs returns all leased jobs in the database +func (txn *Txn) GetAllLeasedJobs() []*Job { + return txn.leasedJobs.Items() +} + // GetAll returns all jobs in the database. func (txn *Txn) GetAll() []*Job { allJobs := make([]*Job, 0, txn.jobsById.Len()) @@ -822,6 +865,15 @@ func (txn *Txn) GetAll() []*Job { return allJobs } +// GetQueuedJobsByPool returns all queued jobs against a given pool +func (txn *Txn) GetQueuedJobsByPool(pool string) []*Job { + allJobs := make([]*Job, 0, txn.jobsById.Len()) + for _, jobs := range txn.jobsByPoolAndQueue[pool] { + allJobs = append(allJobs, jobs.Items()...) + } + return allJobs +} + // BatchDelete deletes the jobs with the given ids from the database. // Any ids not in the database are ignored. func (txn *Txn) BatchDelete(jobIds []string) error { @@ -871,6 +923,9 @@ func (txn *Txn) delete(jobId string) { } } } + newLeasedJobs := txn.leasedJobs.Delete(job) + txn.unvalidatedJobs = &newLeasedJobs + newUnvalidatedJobs := txn.unvalidatedJobs.Delete(job) txn.unvalidatedJobs = &newUnvalidatedJobs } diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 3ca82c858a1..1c7b3162b9e 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -371,12 +371,20 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con allPools = append(allPools, currentPool.AwayPools...) allPools = append(allPools, awayAllocationPools...) + allJobs := txn.GetAllLeasedJobs() + allQueuedJobs := []*jobdb.Job{} + for _, pool := range allPools { + allQueuedJobs = append(allQueuedJobs, txn.GetQueuedJobsByPool(pool)...) 
+ } + allUniqueQueuedJobs := armadaslices.UniqueBy(allQueuedJobs, func(job *jobdb.Job) string { return job.Id() }) + allJobs = append(allJobs, allUniqueQueuedJobs...) + jobSchedulingInfo, err := l.calculateJobSchedulingInfo(ctx, armadamaps.FromSlice(executors, func(ex *schedulerobjects.Executor) string { return ex.Id }, func(_ *schedulerobjects.Executor) bool { return true }), queueByName, - txn.GetAll(), + allJobs, currentPool.Name, awayAllocationPools, allPools) From 1faafd04a55a4afdcdda7d358c812462bdc795c6 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 15:44:17 +0100 Subject: [PATCH 2/7] Testing Signed-off-by: JamesMurkin --- internal/scheduler/jobdb/job.go | 2 +- internal/scheduler/jobdb/job_test.go | 16 +++++ internal/scheduler/jobdb/jobdb.go | 2 +- internal/scheduler/jobdb/jobdb_test.go | 92 ++++++++++++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index 80f6771681f..9145b433613 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -895,7 +895,7 @@ func (job *Job) WithValidated(validated bool) *Job { // Leased returns true if the job is currently leased func (job *Job) Leased() bool { - return !job.Queued() && !job.InTerminalState() + return !job.queued && !job.InTerminalState() && job.LatestRun() != nil } // Validated returns true if the job has been validated diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index 4db9dc11e96..147c3ae862c 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -288,6 +288,22 @@ func TestJob_TestWithNewRun(t *testing.T) { ) } +func TestJob_Leased(t *testing.T) { + leasedJob := baseJob.WithQueued(false).WithUpdatedRun(baseRun) + assert.True(t, leasedJob.Leased()) + + queuedJob := leasedJob.WithQueued(true) + assert.False(t, queuedJob.Leased()) + + // Terminal jobs + assert.False(t, 
leasedJob.WithSucceeded(true).Leased()) + assert.False(t, leasedJob.WithFailed(true).Leased()) + assert.False(t, leasedJob.WithCancelled(true).Leased()) + + jobWithoutRun := baseJob.WithQueued(false) + assert.False(t, jobWithoutRun.Leased()) +} + func TestJob_TestWithUpdatedRun_NewRun(t *testing.T) { jobWithRun := baseJob.WithUpdatedRun(baseRun) assert.Equal(t, true, jobWithRun.HasRuns()) diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 2fff80db76c..66c0ac90a42 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -924,7 +924,7 @@ func (txn *Txn) delete(jobId string) { } } newLeasedJobs := txn.leasedJobs.Delete(job) - txn.unvalidatedJobs = &newLeasedJobs + txn.leasedJobs = &newLeasedJobs newUnvalidatedJobs := txn.unvalidatedJobs.Delete(job) txn.unvalidatedJobs = &newUnvalidatedJobs diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index 971095e0374..ffd464f7034 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -74,6 +74,76 @@ func TestJobDb_TestGetById(t *testing.T) { assert.Nil(t, txn.GetById(util.NewULID())) } +func TestJobDb_TestGetLeased(t *testing.T) { + jobDb := NewTestJobDb() + job1 := newJob().WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + job2 := newJob().WithQueued(true) + job3 := newJob().WithQueued(false).WithSucceeded(true) + job4 := newJob().WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + txn := jobDb.WriteTxn() + + err := txn.Upsert([]*Job{job1, job2, job3, job4}) + require.NoError(t, err) + + expected := []*Job{job1, job4} + actual := txn.GetAllLeasedJobs() + sort.SliceStable(actual, func(i, j int) bool { return actual[i].id < actual[j].id }) + sort.SliceStable(expected, func(i, j int) bool { return expected[i].id < expected[j].id }) + assert.Equal(t, expected, actual) +} + +func TestJobDb_Leased_Lifecycle(t *testing.T) { + 
jobDb := NewTestJobDb() + + upsert := func(jobDb *JobDb, job *Job) { + txn := jobDb.WriteTxn() + err := txn.Upsert([]*Job{job}) + require.NoError(t, err) + txn.Commit() + } + + job1 := newJob().WithQueued(true) + upsert(jobDb, job1) + assert.Empty(t, jobDb.ReadTxn().GetAllLeasedJobs()) + + // leased + job1 = job1.WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + upsert(jobDb, job1) + assert.NotEmpty(t, jobDb.ReadTxn().GetAllLeasedJobs()) + + // requeued + job1 = job1.WithQueued(true) + upsert(jobDb, job1) + assert.Empty(t, jobDb.ReadTxn().GetAllLeasedJobs()) + + // leased + job1 = job1.WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + upsert(jobDb, job1) + assert.NotEmpty(t, jobDb.ReadTxn().GetAllLeasedJobs()) + + // finished + job1 = job1.WithSucceeded(true) + upsert(jobDb, job1) + assert.Empty(t, jobDb.ReadTxn().GetAllLeasedJobs()) +} + +func TestJobDb_Leased_Deleted(t *testing.T) { + jobDb := NewTestJobDb() + job1 := newJob().WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + txn := jobDb.WriteTxn() + + err := txn.Upsert([]*Job{job1}) + require.NoError(t, err) + + expected := []*Job{job1} + actual := txn.GetAllLeasedJobs() + assert.Equal(t, expected, actual) + + err = txn.BatchDelete([]string{job1.Id()}) + require.NoError(t, err) + assert.Empty(t, txn.GetAllLeasedJobs()) +} + func TestJobDb_TestGetUnvalidated(t *testing.T) { jobDb := NewTestJobDb() job1 := newJob().WithValidated(false) @@ -163,6 +233,28 @@ func TestJobDb_TestGetByRunId(t *testing.T) { assert.Nil(t, txn.GetByRunId(job1.LatestRun().id)) } +func TestJobDb_TestGetQueuedJobsByPool(t *testing.T) { + jobDb := NewTestJobDb() + job1 := newJob().WithQueued(true).WithPools([]string{"pool-1", "pool-2", "pool-3"}) + job2 := newJob().WithQueued(true).WithPools([]string{"pool-1", "pool-2"}) + job3 := newJob().WithQueued(true).WithPools([]string{"pool-1"}) + txn := jobDb.WriteTxn() + + err := txn.Upsert([]*Job{job1, job2, job3}) + 
require.NoError(t, err) + + assertEqual := func(expected []*Job, actual []*Job) { + sort.SliceStable(actual, func(i, j int) bool { return actual[i].id < actual[j].id }) + sort.SliceStable(expected, func(i, j int) bool { return expected[i].id < expected[j].id }) + assert.Equal(t, expected, actual) + } + + assertEqual([]*Job{job1, job2, job3}, txn.GetQueuedJobsByPool("pool-1")) + assertEqual([]*Job{job1, job2}, txn.GetQueuedJobsByPool("pool-2")) + assertEqual([]*Job{job1}, txn.GetQueuedJobsByPool("pool-3")) + assertEqual([]*Job{}, txn.GetQueuedJobsByPool("pool-4")) +} + func TestJobDb_TestHasQueuedJobs(t *testing.T) { jobDb := NewTestJobDb() job1 := newJob().WithNewRun("executor", "nodeId", "nodeName", "pool", 5) From 0d37b877578a3944fcd3bb0a1022a47d9734e5ea Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 15:56:51 +0100 Subject: [PATCH 3/7] Better default Signed-off-by: JamesMurkin --- internal/scheduler/jobdb/jobdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 66c0ac90a42..a3b39f95526 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -867,7 +867,7 @@ func (txn *Txn) GetAll() []*Job { // GetQueuedJobsByPool returns all queued jobs against a given pool func (txn *Txn) GetQueuedJobsByPool(pool string) []*Job { - allJobs := make([]*Job, 0, txn.jobsById.Len()) + allJobs := make([]*Job, 0) for _, jobs := range txn.jobsByPoolAndQueue[pool] { allJobs = append(allJobs, jobs.Items()...) 
} From d0edb647a35d0e8a6758f087797077a9b8ff191c Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 16:19:15 +0100 Subject: [PATCH 4/7] Track terminal jobs Signed-off-by: JamesMurkin --- internal/scheduler/jobdb/jobdb.go | 48 +++++++++++++- internal/scheduler/jobdb/jobdb_test.go | 66 ++++++++++++++++++- .../scheduler/scheduling/scheduling_algo.go | 1 + 3 files changed, 112 insertions(+), 3 deletions(-) diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index a3b39f95526..6ed2716391c 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -70,6 +70,7 @@ type JobDb struct { jobsByQueue map[string]immutable.SortedSet[*Job] jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job] leasedJobs *immutable.Set[*Job] + terminalJobs *immutable.Set[*Job] unvalidatedJobs *immutable.Set[*Job] // Configured priority classes. priorityClasses map[string]types.PriorityClass @@ -130,6 +131,7 @@ func NewJobDbWithSchedulingKeyGenerator( } unvalidatedJobs := immutable.NewSet[*Job](JobHasher{}) leasedJobs := immutable.NewSet[*Job](JobHasher{}) + terminalJobs := immutable.NewSet[*Job](JobHasher{}) return &JobDb{ jobsById: immutable.NewMap[string, *Job](nil), jobsByRunId: immutable.NewMap[string, string](nil), @@ -137,6 +139,7 @@ func NewJobDbWithSchedulingKeyGenerator( jobsByQueue: map[string]immutable.SortedSet[*Job]{}, jobsByPoolAndQueue: map[string]map[string]immutable.SortedSet[*Job]{}, leasedJobs: &leasedJobs, + terminalJobs: &terminalJobs, unvalidatedJobs: &unvalidatedJobs, priorityClasses: priorityClasses, defaultPriorityClass: defaultPriorityClass, @@ -165,6 +168,7 @@ func (jobDb *JobDb) Clone() *JobDb { jobsByQueue: maps.Clone(jobDb.jobsByQueue), jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue), leasedJobs: jobDb.leasedJobs, + terminalJobs: jobDb.terminalJobs, unvalidatedJobs: jobDb.unvalidatedJobs, priorityClasses: jobDb.priorityClasses, defaultPriorityClass: 
jobDb.defaultPriorityClass, @@ -323,6 +327,7 @@ func (jobDb *JobDb) ReadTxn() *Txn { jobsByQueue: jobDb.jobsByQueue, jobsByPoolAndQueue: jobDb.jobsByPoolAndQueue, leasedJobs: jobDb.leasedJobs, + terminalJobs: jobDb.terminalJobs, unvalidatedJobs: jobDb.unvalidatedJobs, active: true, jobDb: jobDb, @@ -344,6 +349,7 @@ func (jobDb *JobDb) WriteTxn() *Txn { jobsByQueue: maps.Clone(jobDb.jobsByQueue), jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue), leasedJobs: jobDb.leasedJobs, + terminalJobs: jobDb.terminalJobs, unvalidatedJobs: jobDb.unvalidatedJobs, active: true, jobDb: jobDb, @@ -384,6 +390,8 @@ type Txn struct { jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job] // Jobs that are currently leased leasedJobs *immutable.Set[*Job] + // Jobs that are currently in a terminal state + terminalJobs *immutable.Set[*Job] // Jobs that require submit checking unvalidatedJobs *immutable.Set[*Job] // The jobDb from which this transaction was created. @@ -405,6 +413,7 @@ func (txn *Txn) Commit() { txn.jobDb.jobsByQueue = txn.jobsByQueue txn.jobDb.jobsByPoolAndQueue = txn.jobsByPoolAndQueue txn.jobDb.leasedJobs = txn.leasedJobs + txn.jobDb.terminalJobs = txn.terminalJobs txn.jobDb.unvalidatedJobs = txn.unvalidatedJobs txn.active = false @@ -540,6 +549,11 @@ func (txn *Txn) Upsert(jobs []*Job) error { txn.leasedJobs = &newLeasedJobs } + if existingJob.InTerminalState() { + newTerminalJobs := txn.terminalJobs.Delete(existingJob) + txn.terminalJobs = &newTerminalJobs + } + if !existingJob.Validated() { newUnvalidatedJobs := txn.unvalidatedJobs.Delete(existingJob) txn.unvalidatedJobs = &newUnvalidatedJobs @@ -550,7 +564,7 @@ func (txn *Txn) Upsert(jobs []*Job) error { // Now need to insert jobs, runs and queuedJobs. This can be done in parallel. 
wg := sync.WaitGroup{} - wg.Add(6) + wg.Add(7) // jobs go func() { @@ -713,6 +727,30 @@ func (txn *Txn) Upsert(jobs []*Job) error { } }() + // Terminal jobs + go func() { + defer wg.Done() + if hasJobs { + for _, job := range jobs { + if job.InTerminalState() { + terminalJobs := txn.terminalJobs.Add(job) + txn.terminalJobs = &terminalJobs + } + } + } else { + terminalJobs := map[*Job]bool{} + + for _, job := range jobs { + if job.InTerminalState() { + terminalJobs[job] = true + } + } + + terminalJobsImmutable := immutable.NewSet[*Job](JobHasher{}, maps.Keys(terminalJobs)...) + txn.terminalJobs = &terminalJobsImmutable + } + }() + // Unvalidated jobs go func() { defer wg.Done() @@ -854,6 +892,11 @@ func (txn *Txn) GetAllLeasedJobs() []*Job { return txn.leasedJobs.Items() } +// GetAllTerminalJobs returns all terminal jobs in the database +func (txn *Txn) GetAllTerminalJobs() []*Job { + return txn.terminalJobs.Items() +} + // GetAll returns all jobs in the database. func (txn *Txn) GetAll() []*Job { allJobs := make([]*Job, 0, txn.jobsById.Len()) @@ -926,6 +969,9 @@ func (txn *Txn) delete(jobId string) { newLeasedJobs := txn.leasedJobs.Delete(job) txn.leasedJobs = &newLeasedJobs + newTerminalJobs := txn.terminalJobs.Delete(job) + txn.terminalJobs = &newTerminalJobs + newUnvalidatedJobs := txn.unvalidatedJobs.Delete(job) txn.unvalidatedJobs = &newUnvalidatedJobs } diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index ffd464f7034..9add27bd512 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -92,7 +92,7 @@ func TestJobDb_TestGetLeased(t *testing.T) { assert.Equal(t, expected, actual) } -func TestJobDb_Leased_Lifecycle(t *testing.T) { +func TestJobDb_LeasedJobs_Lifecycle(t *testing.T) { jobDb := NewTestJobDb() upsert := func(jobDb *JobDb, job *Job) { @@ -127,7 +127,7 @@ func TestJobDb_Leased_Lifecycle(t *testing.T) { assert.Empty(t, jobDb.ReadTxn().GetAllLeasedJobs()) } -func 
TestJobDb_Leased_Deleted(t *testing.T) { +func TestJobDb_LeasedJobs_Deleted(t *testing.T) { jobDb := NewTestJobDb() job1 := newJob().WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) txn := jobDb.WriteTxn() @@ -144,6 +144,68 @@ func TestJobDb_Leased_Deleted(t *testing.T) { assert.Empty(t, txn.GetAllLeasedJobs()) } +func TestJobDb_TestGetTerminalJobs(t *testing.T) { + jobDb := NewTestJobDb() + job1 := newJob().WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + job2 := newJob().WithQueued(true) + job3 := newJob().WithQueued(false).WithSucceeded(true) + job4 := newJob().WithQueued(false).WithCancelled(true) + job5 := newJob().WithQueued(false).WithFailed(true) + job6 := newJob().WithQueued(true).WithFailed(true) + txn := jobDb.WriteTxn() + + err := txn.Upsert([]*Job{job1, job2, job3, job4, job5, job6}) + require.NoError(t, err) + + expected := []*Job{job3, job4, job5, job6} + actual := txn.GetAllTerminalJobs() + sort.SliceStable(actual, func(i, j int) bool { return actual[i].id < actual[j].id }) + sort.SliceStable(expected, func(i, j int) bool { return expected[i].id < expected[j].id }) + assert.Equal(t, expected, actual) +} + +func TestJobDb_TerminalJobs_Lifecycle(t *testing.T) { + jobDb := NewTestJobDb() + + upsert := func(jobDb *JobDb, job *Job) { + txn := jobDb.WriteTxn() + err := txn.Upsert([]*Job{job}) + require.NoError(t, err) + txn.Commit() + } + + job1 := newJob().WithQueued(true) + upsert(jobDb, job1) + assert.Empty(t, jobDb.ReadTxn().GetAllTerminalJobs()) + + // leased + job1 = job1.WithQueued(false).WithNewRun("executor", "nodeId", "nodeName", "pool", 5) + upsert(jobDb, job1) + assert.Empty(t, jobDb.ReadTxn().GetAllTerminalJobs()) + + // finished + job1 = job1.WithSucceeded(true) + upsert(jobDb, job1) + assert.NotEmpty(t, jobDb.ReadTxn().GetAllTerminalJobs()) +} + +func TestJobDb_TerminalJobs_Deleted(t *testing.T) { + jobDb := NewTestJobDb() + job1 := newJob().WithFailed(true) + txn := jobDb.WriteTxn() + + 
err := txn.Upsert([]*Job{job1}) + require.NoError(t, err) + + expected := []*Job{job1} + actual := txn.GetAllTerminalJobs() + assert.Equal(t, expected, actual) + + err = txn.BatchDelete([]string{job1.Id()}) + require.NoError(t, err) + assert.Empty(t, txn.GetAllTerminalJobs()) +} + func TestJobDb_TestGetUnvalidated(t *testing.T) { jobDb := NewTestJobDb() job1 := newJob().WithValidated(false) diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 1c7b3162b9e..ad1141d7162 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -372,6 +372,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con allPools = append(allPools, awayAllocationPools...) allJobs := txn.GetAllLeasedJobs() + allJobs = append(allJobs, txn.GetAllTerminalJobs()...) allQueuedJobs := []*jobdb.Job{} for _, pool := range allPools { allQueuedJobs = append(allQueuedJobs, txn.GetQueuedJobsByPool(pool)...) From 5310f7de2330fbf0fb6301ad45ab45b9e8010b45 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 16:30:56 +0100 Subject: [PATCH 5/7] Comment + better unique checks Signed-off-by: JamesMurkin --- internal/scheduler/scheduling/scheduling_algo.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index ad1141d7162..71a600728c5 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -370,15 +370,24 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con allPools := []string{currentPool.Name} allPools = append(allPools, currentPool.AwayPools...) allPools = append(allPools, awayAllocationPools...) 
- + allPools = armadaslices.Unique(allPools) + + // We must include jobs in the following states: + // - Jobs active on the nodes of this pool + // - These are used to populate the jobdb, calculate demand/fairshare + // - This may include nodes from other pools, especially if the nodes pool has changed + // - Terminal jobs of this pool + // - For calculating short job penalty + // - Jobs queued against all pools + // - This is to calculate demand on both home and away pools allJobs := txn.GetAllLeasedJobs() allJobs = append(allJobs, txn.GetAllTerminalJobs()...) allQueuedJobs := []*jobdb.Job{} for _, pool := range allPools { allQueuedJobs = append(allQueuedJobs, txn.GetQueuedJobsByPool(pool)...) } - allUniqueQueuedJobs := armadaslices.UniqueBy(allQueuedJobs, func(job *jobdb.Job) string { return job.Id() }) - allJobs = append(allJobs, allUniqueQueuedJobs...) + allJobs = append(allJobs, allQueuedJobs...) + allJobs = armadaslices.UniqueBy(allJobs, func(job *jobdb.Job) string { return job.Id() }) jobSchedulingInfo, err := l.calculateJobSchedulingInfo(ctx, armadamaps.FromSlice(executors, From 2fb76ec25fbf7c62f692df388309fbf49a7a384f Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 16:47:56 +0100 Subject: [PATCH 6/7] Simplify Signed-off-by: JamesMurkin --- internal/scheduler/scheduling/scheduling_algo.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 71a600728c5..f2bd74db0d6 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -382,11 +382,9 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con // - This is to calculate demand on both home and away pools allJobs := txn.GetAllLeasedJobs() allJobs = append(allJobs, txn.GetAllTerminalJobs()...) 
- allQueuedJobs := []*jobdb.Job{} for _, pool := range allPools { - allQueuedJobs = append(allQueuedJobs, txn.GetQueuedJobsByPool(pool)...) + allJobs = append(allJobs, txn.GetQueuedJobsByPool(pool)...) } - allJobs = append(allJobs, allQueuedJobs...) allJobs = armadaslices.UniqueBy(allJobs, func(job *jobdb.Job) string { return job.Id() }) jobSchedulingInfo, err := l.calculateJobSchedulingInfo(ctx, From ac00b008a6c54ae799fca713bcba0448d61a0df9 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 1 Apr 2026 16:58:18 +0100 Subject: [PATCH 7/7] Better comment Signed-off-by: JamesMurkin --- internal/scheduler/scheduling/scheduling_algo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index f2bd74db0d6..b19dacf5872 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -378,7 +378,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con // - This may include nodes from other pools, especially if the nodes pool has changed // - Terminal jobs of this pool // - For calculating short job penalty - // - Jobs queued against all pools + // - Jobs queued against home/away pools relevant to the pool being computed // - This is to calculate demand on both home and away pools allJobs := txn.GetAllLeasedJobs() allJobs = append(allJobs, txn.GetAllTerminalJobs()...)