Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/common/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ func Unique[S ~[]E, E comparable](s S) S {
return rv
}

// UniqueBy returns a copy of s with duplicate elements removed based on the result of keyFunc(e),
// keeping only the first occurrence of each unique key.
func UniqueBy[S ~[]E, E any, K comparable](s S, keyFunc func(E) K) S {
if s == nil {
return nil
}
rv := make(S, 0)
seen := make(map[K]bool)
for _, v := range s {
key := keyFunc(v)
if !seen[key] {
rv = append(rv, v)
seen[key] = true
}
}
return rv
}

// GroupByFunc groups the elements e_1, ..., e_n of s into separate slices by keyFunc(e).
func GroupByFunc[S ~[]E, E any, K comparable](s S, keyFunc func(E) K) map[K]S {
rv := make(map[K]S)
Expand Down
81 changes: 81 additions & 0 deletions internal/common/slices/slices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,87 @@ func TestUnique(t *testing.T) {
}
}

func TestUniqueBy(t *testing.T) {
type item struct {
Id string
Name string
}

tests := map[string]struct {
input []item
keyFunc func(i item) string
expected []item
}{
"nil": {
input: nil,
keyFunc: func(i item) string { return i.Id },
expected: nil,
},
"empty": {
input: []item{},
keyFunc: func(i item) string { return i.Name },
expected: []item{},
},
"no duplicates": {
input: []item{
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bob"},
{Id: "3", Name: "Charlie"},
},
keyFunc: func(i item) string { return i.Id },
expected: []item{
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bob"},
{Id: "3", Name: "Charlie"},
},
},
"consecutive duplicates": {
input: []item{
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bob"},
{Id: "2", Name: "Bobby"},
},
keyFunc: func(i item) string { return i.Id },
expected: []item{
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bob"},
},
},
"non-consecutive duplicates": {
input: []item{
{Id: "2", Name: "Bob"},
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bobby"},
},
keyFunc: func(i item) string { return i.Id },
expected: []item{
{Id: "2", Name: "Bob"},
{Id: "1", Name: "Alice"},
},
},
"duplicate based on custom key function Name": {
input: []item{
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bob"},
{Id: "2", Name: "Bobby"},
},
keyFunc: func(i item) string { return i.Name },
expected: []item{
{Id: "1", Name: "Alice"},
{Id: "2", Name: "Bob"},
{Id: "2", Name: "Bobby"},
},
},
}

// Run all test cases
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, UniqueBy(tc.input, tc.keyFunc))
})
}
}

func TestFilter(t *testing.T) {
includeOver5 := func(val int) bool { return val > 5 }
input := []int{1, 3, 5, 7, 9}
Expand Down
5 changes: 5 additions & 0 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,11 @@ func (job *Job) WithValidated(validated bool) *Job {
return j
}

// Leased returns true if the job is currently leased
func (job *Job) Leased() bool {
return !job.queued && !job.InTerminalState() && job.LatestRun() != nil
}

// Validated returns true if the job has been validated
func (job *Job) Validated() bool {
return job.validated
Expand Down
16 changes: 16 additions & 0 deletions internal/scheduler/jobdb/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,22 @@ func TestJob_TestWithNewRun(t *testing.T) {
)
}

func TestJob_Leased(t *testing.T) {
leasedJob := baseJob.WithQueued(false).WithUpdatedRun(baseRun)
assert.True(t, leasedJob.Leased())

queuedJob := leasedJob.WithQueued(true)
assert.False(t, queuedJob.Leased())

// Terminal jobs
assert.False(t, leasedJob.WithSucceeded(true).Leased())
assert.False(t, leasedJob.WithFailed(true).Leased())
assert.False(t, leasedJob.WithCancelled(true).Leased())

jobWithoutRun := baseJob.WithQueued(false)
assert.False(t, jobWithoutRun.Leased())
}

func TestJob_TestWithUpdatedRun_NewRun(t *testing.T) {
jobWithRun := baseJob.WithUpdatedRun(baseRun)
assert.Equal(t, true, jobWithRun.HasRuns())
Expand Down
103 changes: 102 additions & 1 deletion internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type JobDb struct {
jobsByGangKey map[gangKey]immutable.Set[string]
jobsByQueue map[string]immutable.SortedSet[*Job]
jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job]
leasedJobs *immutable.Set[*Job]
terminalJobs *immutable.Set[*Job]
unvalidatedJobs *immutable.Set[*Job]
// Configured priority classes.
priorityClasses map[string]types.PriorityClass
Expand Down Expand Up @@ -128,12 +130,16 @@ func NewJobDbWithSchedulingKeyGenerator(
panic(fmt.Sprintf("unknown default priority class %s", defaultPriorityClassName))
}
unvalidatedJobs := immutable.NewSet[*Job](JobHasher{})
leasedJobs := immutable.NewSet[*Job](JobHasher{})
terminalJobs := immutable.NewSet[*Job](JobHasher{})
return &JobDb{
jobsById: immutable.NewMap[string, *Job](nil),
jobsByRunId: immutable.NewMap[string, string](nil),
jobsByGangKey: map[gangKey]immutable.Set[string]{},
jobsByQueue: map[string]immutable.SortedSet[*Job]{},
jobsByPoolAndQueue: map[string]map[string]immutable.SortedSet[*Job]{},
leasedJobs: &leasedJobs,
terminalJobs: &terminalJobs,
unvalidatedJobs: &unvalidatedJobs,
priorityClasses: priorityClasses,
defaultPriorityClass: defaultPriorityClass,
Expand Down Expand Up @@ -161,6 +167,8 @@ func (jobDb *JobDb) Clone() *JobDb {
jobsByGangKey: maps.Clone(jobDb.jobsByGangKey),
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue),
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
priorityClasses: jobDb.priorityClasses,
defaultPriorityClass: jobDb.defaultPriorityClass,
Expand Down Expand Up @@ -318,6 +326,8 @@ func (jobDb *JobDb) ReadTxn() *Txn {
jobsByGangKey: jobDb.jobsByGangKey,
jobsByQueue: jobDb.jobsByQueue,
jobsByPoolAndQueue: jobDb.jobsByPoolAndQueue,
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
active: true,
jobDb: jobDb,
Expand All @@ -338,6 +348,8 @@ func (jobDb *JobDb) WriteTxn() *Txn {
jobsByGangKey: maps.Clone(jobDb.jobsByGangKey),
jobsByQueue: maps.Clone(jobDb.jobsByQueue),
jobsByPoolAndQueue: deepClone(jobDb.jobsByPoolAndQueue),
leasedJobs: jobDb.leasedJobs,
terminalJobs: jobDb.terminalJobs,
unvalidatedJobs: jobDb.unvalidatedJobs,
active: true,
jobDb: jobDb,
Expand Down Expand Up @@ -376,6 +388,10 @@ type Txn struct {
// Queued jobs for each queue and pool.
// Stored as a set and needs sorting to determine the order they should be scheduled in.
jobsByPoolAndQueue map[string]map[string]immutable.SortedSet[*Job]
// Jobs that are currently leased
leasedJobs *immutable.Set[*Job]
// Jobs that are currently in a terminal state
terminalJobs *immutable.Set[*Job]
// Jobs that require submit checking
unvalidatedJobs *immutable.Set[*Job]
// The jobDb from which this transaction was created.
Expand All @@ -396,6 +412,8 @@ func (txn *Txn) Commit() {
txn.jobDb.jobsByGangKey = txn.jobsByGangKey
txn.jobDb.jobsByQueue = txn.jobsByQueue
txn.jobDb.jobsByPoolAndQueue = txn.jobsByPoolAndQueue
txn.jobDb.leasedJobs = txn.leasedJobs
txn.jobDb.terminalJobs = txn.terminalJobs
txn.jobDb.unvalidatedJobs = txn.unvalidatedJobs

txn.active = false
Expand Down Expand Up @@ -526,6 +544,16 @@ func (txn *Txn) Upsert(jobs []*Job) error {
txn.jobsByPoolAndQueue[pool][job.queue] = existingJobs.Delete(existingJob)
}

if existingJob.Leased() {
newLeasedJobs := txn.leasedJobs.Delete(existingJob)
txn.leasedJobs = &newLeasedJobs
}

if existingJob.InTerminalState() {
newTerminalJobs := txn.terminalJobs.Delete(existingJob)
txn.terminalJobs = &newTerminalJobs
}

if !existingJob.Validated() {
newUnvalidatedJobs := txn.unvalidatedJobs.Delete(existingJob)
txn.unvalidatedJobs = &newUnvalidatedJobs
Expand All @@ -536,7 +564,7 @@ func (txn *Txn) Upsert(jobs []*Job) error {

// Now need to insert jobs, runs and queuedJobs. This can be done in parallel.
wg := sync.WaitGroup{}
wg.Add(5)
wg.Add(7)

// jobs
go func() {
Expand Down Expand Up @@ -675,6 +703,54 @@ func (txn *Txn) Upsert(jobs []*Job) error {
}
}()

// Leased jobs
go func() {
defer wg.Done()
if hasJobs {
for _, job := range jobs {
if job.Leased() {
leasedJobs := txn.leasedJobs.Add(job)
txn.leasedJobs = &leasedJobs
}
}
} else {
leasedJobs := map[*Job]bool{}

for _, job := range jobs {
if job.Leased() {
leasedJobs[job] = true
}
}

leasedJobsImmutable := immutable.NewSet[*Job](JobHasher{}, maps.Keys(leasedJobs)...)
txn.leasedJobs = &leasedJobsImmutable
}
}()

// Terminal jobs
go func() {
defer wg.Done()
if hasJobs {
for _, job := range jobs {
if job.InTerminalState() {
terminalJobs := txn.terminalJobs.Add(job)
txn.terminalJobs = &terminalJobs
}
}
} else {
terminalJobs := map[*Job]bool{}

for _, job := range jobs {
if job.InTerminalState() {
terminalJobs[job] = true
}
}

terminalJobsImmutable := immutable.NewSet[*Job](JobHasher{}, maps.Keys(terminalJobs)...)
txn.terminalJobs = &terminalJobsImmutable
}
}()

// Unvalidated jobs
go func() {
defer wg.Done()
Expand Down Expand Up @@ -811,6 +887,16 @@ func (txn *Txn) UnvalidatedJobs() *immutable.SetIterator[*Job] {
return txn.unvalidatedJobs.Iterator()
}

// GetAllLeasedJobs returns all leased jobs in the database
func (txn *Txn) GetAllLeasedJobs() []*Job {
return txn.leasedJobs.Items()
}

// GetAllTerminalJobs returns all terminal jobs in the database
func (txn *Txn) GetAllTerminalJobs() []*Job {
return txn.terminalJobs.Items()
}

// GetAll returns all jobs in the database.
func (txn *Txn) GetAll() []*Job {
allJobs := make([]*Job, 0, txn.jobsById.Len())
Expand All @@ -822,6 +908,15 @@ func (txn *Txn) GetAll() []*Job {
return allJobs
}

// GetQueuedJobsByPool returns all queued jobs against a given pool
func (txn *Txn) GetQueuedJobsByPool(pool string) []*Job {
allJobs := make([]*Job, 0)
for _, jobs := range txn.jobsByPoolAndQueue[pool] {
allJobs = append(allJobs, jobs.Items()...)
}
return allJobs
}

// BatchDelete deletes the jobs with the given ids from the database.
// Any ids not in the database are ignored.
func (txn *Txn) BatchDelete(jobIds []string) error {
Expand Down Expand Up @@ -871,6 +966,12 @@ func (txn *Txn) delete(jobId string) {
}
}
}
newLeasedJobs := txn.leasedJobs.Delete(job)
txn.leasedJobs = &newLeasedJobs

newTerminalJobs := txn.terminalJobs.Delete(job)
txn.terminalJobs = &newTerminalJobs

newUnvalidatedJobs := txn.unvalidatedJobs.Delete(job)
txn.unvalidatedJobs = &newUnvalidatedJobs
}
Expand Down
Loading
Loading