Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
### Changed

### Fixed
- The `windowSize` field in the `SchedulingShard` CR now accepts the Prometheus duration format (e.g. `1w`, `7d`). Previously, setting `windowSize: 1w` as shown in the documentation caused the kai-operator to crash-loop with `time: unknown unit "w" in duration "1w"`.
- Race condition where `SyncForGpuGroup` could prematurely delete reservation pods when the informer cache had not yet propagated GPU group labels on recently-bound fraction pods. The binder now checks for active BindRequests referencing the GPU group before deleting a reservation pod.

## [v0.14.0] - 2026-03-30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ spec:
type: string
windowSize:
description: Window size of the usage. Default is 1 week.
pattern: ^(0|(([0-9]+)y)?(([0-9]+)w)?(([0-9]+)d)?(([0-9]+)h)?(([0-9]+)m)?(([0-9]+)s)?(([0-9]+)ms)?)$
type: string
windowType:
description: Window type for time-series aggregation. If not
Expand Down
4 changes: 3 additions & 1 deletion pkg/env-tests/timeaware/timeaware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"time"

monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/prometheus/common/model"
"github.com/xyproto/randomstring"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -120,7 +122,7 @@ func setupControllers(backgroundCtx context.Context, cfg *rest.Config,
ClientType: "fake-with-history",
ConnectionString: "fake-connection",
UsageParams: &api.UsageParams{
WindowSize: &metav1.Duration{Duration: time.Second * time.Duration(*windowSize)},
WindowSize: monitoringv1.DurationPointer(model.Duration(time.Second * time.Duration(*windowSize)).String()),
FetchInterval: &metav1.Duration{Duration: time.Millisecond},
HalfLifePeriod: &metav1.Duration{Duration: time.Second * time.Duration(*halfLifePeriod)},
},
Expand Down
7 changes: 4 additions & 3 deletions pkg/operator/operands/scheduler/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/spf13/pflag"

"github.com/kai-scheduler/KAI-scheduler/cmd/scheduler/app/options"
Expand Down Expand Up @@ -691,7 +692,7 @@ tiers:
ConnectionString: "http://prometheus-operated.kai-scheduler.svc.cluster.local:9090",
UsageParams: &usagedbapi.UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 10 * time.Minute},
WindowSize: &metav1.Duration{Duration: 10 * time.Minute},
WindowSize: monitoringv1.DurationPointer("10m"),
WindowType: ptr.To(usagedbapi.SlidingWindow),
},
},
Expand Down Expand Up @@ -1151,7 +1152,7 @@ func TestGetUsageDBConfig(t *testing.T) {
ConnectionString: "http://prometheus:9090",
UsageParams: &usagedbapi.UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 10 * time.Minute},
WindowSize: &metav1.Duration{Duration: 20 * time.Minute},
WindowSize: monitoringv1.DurationPointer("20m"),
},
},
},
Expand All @@ -1162,7 +1163,7 @@ func TestGetUsageDBConfig(t *testing.T) {
assert.NotNil(t, result)
assert.NotNil(t, result.UsageParams)
assert.Equal(t, 10*time.Minute, result.UsageParams.HalfLifePeriod.Duration)
assert.Equal(t, 20*time.Minute, result.UsageParams.WindowSize.Duration)
assert.Equal(t, monitoringv1.Duration("20m"), *result.UsageParams.WindowSize)
assert.Equal(t, "http://prometheus:9090", result.ConnectionString)
},
},
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/cache/usagedb/api/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"time"

monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -17,7 +18,8 @@ func (p *UsageParams) SetDefaults() {
p.HalfLifePeriod = nil
}
if p.WindowSize == nil {
p.WindowSize = &metav1.Duration{Duration: time.Hour * 24 * 7}
windowSize := monitoringv1.Duration("1w")
p.WindowSize = &windowSize
}
if p.WindowType == nil {
windowType := SlidingWindow
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/cache/usagedb/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package api

import (
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/queue_info"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -44,7 +45,7 @@ type UsageParams struct {
// Half life period of the usage. If not set, or set to 0, the usage will not be decayed.
HalfLifePeriod *metav1.Duration `yaml:"halfLifePeriod,omitempty" json:"halfLifePeriod,omitempty"`
// Window size of the usage. Default is 1 week.
WindowSize *metav1.Duration `yaml:"windowSize,omitempty" json:"windowSize,omitempty"`
WindowSize *monitoringv1.Duration `yaml:"windowSize,omitempty" json:"windowSize,omitempty"`
// Window type for time-series aggregation. If not set, defaults to sliding.
WindowType *WindowType `yaml:"windowType,omitempty" json:"windowType,omitempty"`
// The start timestamp of the tumbling window. If not set, defaults to the current time.
Expand Down
39 changes: 20 additions & 19 deletions pkg/scheduler/cache/usagedb/api/usage_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -23,7 +24,7 @@ func TestUsageParams_SetDefaults(t *testing.T) {
input: &UsageParams{},
expected: &UsageParams{
HalfLifePeriod: nil, // should remain nil (disabled by default)
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -34,7 +35,7 @@ func TestUsageParams_SetDefaults(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: nil,
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -45,7 +46,7 @@ func TestUsageParams_SetDefaults(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: nil,
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -56,18 +57,18 @@ func TestUsageParams_SetDefaults(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute},
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
{
name: "params with window size set should preserve it",
input: &UsageParams{
WindowSize: &metav1.Duration{Duration: 2 * time.Hour},
WindowSize: monitoringv1.DurationPointer("2h"),
},
expected: &UsageParams{
HalfLifePeriod: nil,
WindowSize: &metav1.Duration{Duration: 2 * time.Hour},
WindowSize: monitoringv1.DurationPointer("2h"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -78,20 +79,20 @@ func TestUsageParams_SetDefaults(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: nil,
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{TumblingWindow}[0],
},
},
{
name: "all params set should preserve all",
input: &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute},
WindowSize: &metav1.Duration{Duration: 3 * time.Hour},
WindowSize: monitoringv1.DurationPointer("3h"),
WindowType: &[]WindowType{TumblingWindow}[0],
},
expected: &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute},
WindowSize: &metav1.Duration{Duration: 3 * time.Hour},
WindowSize: monitoringv1.DurationPointer("3h"),
WindowType: &[]WindowType{TumblingWindow}[0],
},
},
Expand All @@ -109,7 +110,7 @@ func TestUsageParams_SetDefaults(t *testing.T) {
}

require.NotNil(t, tt.input.WindowSize)
assert.Equal(t, tt.expected.WindowSize.Duration, tt.input.WindowSize.Duration)
assert.Equal(t, *tt.expected.WindowSize, *tt.input.WindowSize)

require.NotNil(t, tt.input.WindowType)
assert.Equal(t, *tt.expected.WindowType, *tt.input.WindowType)
Expand Down Expand Up @@ -163,7 +164,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: nil,
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -176,7 +177,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: nil,
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -191,7 +192,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) {
},
expected: &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 30 * time.Minute},
WindowSize: &metav1.Duration{Duration: time.Hour * 24 * 7},
WindowSize: monitoringv1.DurationPointer("1w"),
WindowType: &[]WindowType{SlidingWindow}[0],
},
},
Expand All @@ -202,13 +203,13 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) {
ConnectionString: "http://localhost:9090",
UsageParams: &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute},
WindowSize: &metav1.Duration{Duration: 2 * time.Hour},
WindowSize: monitoringv1.DurationPointer("2h"),
WindowType: &[]WindowType{TumblingWindow}[0],
},
},
expected: &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: 45 * time.Minute},
WindowSize: &metav1.Duration{Duration: 2 * time.Hour},
WindowSize: monitoringv1.DurationPointer("2h"),
WindowType: &[]WindowType{TumblingWindow}[0],
},
},
Expand All @@ -227,7 +228,7 @@ func TestUsageDBConfig_GetUsageParams(t *testing.T) {
}

require.NotNil(t, result.WindowSize)
assert.Equal(t, tt.expected.WindowSize.Duration, result.WindowSize.Duration)
assert.Equal(t, *tt.expected.WindowSize, *result.WindowSize)

require.NotNil(t, result.WindowType)
assert.Equal(t, *tt.expected.WindowType, *result.WindowType)
Expand All @@ -250,7 +251,7 @@ func TestUsageDBConfig_GetUsageParams_ImmutableOriginal(t *testing.T) {
result := config.GetUsageParams()

// Modify the result
result.WindowSize = &metav1.Duration{Duration: 5 * time.Hour}
result.WindowSize = monitoringv1.DurationPointer("5h")

// Original should remain unchanged
assert.Nil(t, originalParams.WindowSize)
Expand Down Expand Up @@ -307,7 +308,7 @@ func TestUsageParams_ZeroValues(t *testing.T) {
// Test behavior with zero duration values
params := &UsageParams{
HalfLifePeriod: &metav1.Duration{Duration: time.Duration(0)},
WindowSize: &metav1.Duration{Duration: time.Duration(0)},
WindowSize: monitoringv1.DurationPointer("0s"),
}

params.SetDefaults()
Expand All @@ -316,7 +317,7 @@ func TestUsageParams_ZeroValues(t *testing.T) {

// Zero values should be preserved, not replaced with defaults
require.NotNil(t, params.WindowSize)
assert.Equal(t, time.Duration(0), params.WindowSize.Duration)
assert.Equal(t, monitoringv1.Duration("0s"), *params.WindowSize)

require.NotNil(t, params.WindowType)
assert.Equal(t, SlidingWindow, *params.WindowType)
Expand Down
10 changes: 9 additions & 1 deletion pkg/scheduler/cache/usagedb/fake/fake_with_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package fake

import (
"fmt"
"math"
"sync"
"time"

"github.com/prometheus/common/model"

"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/queue_info"
Expand Down Expand Up @@ -49,7 +53,11 @@ func (f *FakeUsageDBClient) GetResourceUsage() (*queue_info.ClusterUsage, error)
usage := queue_info.NewClusterUsage()

var windowStart, windowEnd int
size := f.usageParams.WindowSize.Duration.Seconds()
windowSizeDuration, err := model.ParseDuration(string(*f.usageParams.WindowSize))
if err != nil {
return nil, fmt.Errorf("failed to parse windowSize %q: %w", *f.usageParams.WindowSize, err)
}
size := time.Duration(windowSizeDuration).Seconds()
l := len(f.allocationHistory)
if l == 0 {
return usage, nil
Expand Down
12 changes: 9 additions & 3 deletions pkg/scheduler/cache/usagedb/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (p *PrometheusClient) queryResourceUsage(
func (p *PrometheusClient) querySlidingTimeWindow(ctx context.Context, decayedAllocationMetric string) (model.Value, promv1.Warnings, error) {
usageQuery := fmt.Sprintf("sum_over_time((%s)[%s:%s])",
decayedAllocationMetric,
p.usageParams.WindowSize.Duration.String(),
string(*p.usageParams.WindowSize),
p.queryResolution.String(),
)

Expand Down Expand Up @@ -275,13 +275,19 @@ func (p *PrometheusClient) getLatestUsageResetTime_TumblingWindow(now time.Time)
return now
}

windowSize, err := model.ParseDuration(string(*p.usageParams.WindowSize))
if err != nil {
log.InfraLogger.V(3).Warnf("Failed to parse windowSize %q: %v, using current time as reset time", *p.usageParams.WindowSize, err)
return now
}

previousResetTime := startTime
currentResetTime := startTime.Add(p.usageParams.WindowSize.Duration)
currentResetTime := startTime.Add(time.Duration(windowSize))

// Keep finding the next reset time until it's after or equal to the current time
for currentResetTime.Before(now) {
previousResetTime = currentResetTime
currentResetTime = currentResetTime.Add(p.usageParams.WindowSize.Duration)
currentResetTime = currentResetTime.Add(time.Duration(windowSize))
}

return previousResetTime
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/cache/usagedb/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/aptible/supercronic/cronexpr"
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -390,7 +392,7 @@ func TestGetLatestUsageResetTime_TumblingWindow(t *testing.T) {
client := &PrometheusClient{
tumblingWindowStartTime: tt.startTime,
usageParams: &api.UsageParams{
WindowSize: &metav1.Duration{Duration: tt.windowSize},
WindowSize: monitoringv1.DurationPointer(model.Duration(tt.windowSize).String()),
},
}

Expand Down
5 changes: 4 additions & 1 deletion test/e2e/suites/timeaware/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"

kaiv1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1"
kaiprometheus "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1/prometheus"
usagedbapi "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/cache/usagedb/api"
"github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/configurations"
testcontext "github.com/kai-scheduler/KAI-scheduler/test/e2e/modules/context"
"github.com/prometheus/common/model"
)

// timeAwareConfig holds configuration for time-aware fairness tests
Expand Down Expand Up @@ -66,7 +69,7 @@ func configureTimeAwareFairness(ctx context.Context, testCtx *testcontext.TestCo
shard.Spec.UsageDBConfig = &usagedbapi.UsageDBConfig{
ClientType: "prometheus",
UsageParams: &usagedbapi.UsageParams{
WindowSize: &metav1.Duration{Duration: config.WindowSize},
WindowSize: monitoringv1.DurationPointer(model.Duration(config.WindowSize).String()),
HalfLifePeriod: &metav1.Duration{Duration: config.HalfLifePeriod},
FetchInterval: &metav1.Duration{Duration: config.FetchInterval},
WindowType: &windowType,
Expand Down
Loading