Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## [v0.13.0] - 2026-03-02
### Added
- Added `minSubGroup` field to PodGroup and SubGroup API to support specifying the minimum number of child SubGroups required for elastic gang scheduling, along with validation to prevent simultaneous use of `minSubGroup` and `minMember` fields (#TBD) by [KAI Dev Agent](https://github.com/run-ai/KAI-Agents)
- Added `global.nodeSelector` propagation from Helm values to Config CR, ensuring operator-created sub-component deployments (admission, binder, scheduler, pod-grouper, etc.) receive the configured nodeSelector [#1102](https://github.com/NVIDIA/KAI-Scheduler/pull/1102) [yuanchen8911](https://github.com/yuanchen8911)
- Added `plugins` and `actions` fields to SchedulingShard spec, allowing per-shard customization of scheduler plugin/action enablement, priority, and arguments [gshaibi](https://github.com/gshaibi)
- Added support for Kubeflow Trainer v2 TrainJob workloads via skipTopOwner grouper pattern
Expand Down
18 changes: 18 additions & 0 deletions deployments/kai-scheduler/crds/scheduling.run.ai_podgroups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ spec:
description: |-
MinMember defines the minimal number of members to run the PodGroup;
if there are not enough resources to start all required members, the scheduler will not start anyone.
Mutually exclusive with MinSubGroup.
format: int32
minimum: 1
type: integer
minSubGroup:
description: |-
MinSubGroup defines the minimal number of direct child SubGroups required for this PodGroup to be schedulable.
Only applicable when SubGroups are defined.
Mutually exclusive with MinMember.
format: int32
minimum: 1
type: integer
Expand Down Expand Up @@ -94,6 +103,15 @@ spec:
description: |-
MinMember defines the minimal number of members to run this SubGroup;
if there are not enough resources to start all required members, the scheduler will not start anyone.
Mutually exclusive with MinSubGroup.
format: int32
minimum: 0
type: integer
minSubGroup:
description: |-
MinSubGroup defines the minimal number of direct child SubGroups required for this SubGroup to be schedulable.
Only applicable when this SubGroup has child SubGroups.
Mutually exclusive with MinMember.
format: int32
minimum: 0
type: integer
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/scheduling/v2alpha2/podgroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ import (
type PodGroupSpec struct {
// MinMember defines the minimal number of members to run the PodGroup;
// if there are not enough resources to start all required members, the scheduler will not start anyone.
// Mutually exclusive with MinSubGroup.
// +kubebuilder:validation:Nullable
// +kubebuilder:validation:Minimum=1
MinMember *int32 `json:"minMember,omitempty" protobuf:"varint,1,opt,name=minMember"`

// MinSubGroup defines the minimal number of direct child SubGroups required for this PodGroup to be schedulable.
// Only applicable when SubGroups are defined.
// Mutually exclusive with MinMember.
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Minimum=1
MinSubGroup *int32 `json:"minSubGroup,omitempty"`

// Queue defines the queue to allocate resource for PodGroup; if queue does not exist,
// the PodGroup will not be scheduled.
Queue string `json:"queue,omitempty" protobuf:"bytes,2,opt,name=queue"`
Expand Down Expand Up @@ -108,10 +116,18 @@ type SubGroup struct {

// MinMember defines the minimal number of members to run this SubGroup;
// if there are not enough resources to start all required members, the scheduler will not start anyone.
// Mutually exclusive with MinSubGroup.
// +kubebuilder:validation:Nullable
// +kubebuilder:validation:Minimum=0
MinMember *int32 `json:"minMember,omitempty" protobuf:"varint,2,opt,name=minMember"`

// MinSubGroup defines the minimal number of direct child SubGroups required for this SubGroup to be schedulable.
// Only applicable when this SubGroup has child SubGroups.
// Mutually exclusive with MinMember.
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Minimum=0
MinSubGroup *int32 `json:"minSubGroup,omitempty"`

// Parent is an optional attribute that specifies the name of the parent SubGroup.
// Must be a valid DNS label (RFC 1123).
// +kubebuilder:validation:Optional
Expand Down
226 changes: 182 additions & 44 deletions pkg/apis/scheduling/v2alpha2/podgroup_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"sort"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -29,11 +30,18 @@ func (_ *PodGroup) ValidateCreate(ctx context.Context, obj runtime.Object) (admi
}
logger.Info("validate create", "namespace", podGroup.Namespace, "name", podGroup.Name)

if err := validateSubGroups(podGroup.Spec.SubGroups); err != nil {
logger.Info("Subgroups validation failed",
"namespace", podGroup.Namespace, "name", podGroup.Name, "error", err)
return nil, err
validationErrors := validatePodGroupSpec(&podGroup.Spec)

if validationErrors.structuralError != nil {
logger.Info("PodGroup spec validation failed on structural error",
"namespace", podGroup.Namespace, "name", podGroup.Name, "error", validationErrors.structuralError)
return nil, validationErrors.structuralError
}
if len(validationErrors.minDefinitionErrors) > 0 {
return handleMinDefinitionErrors(ctx, validationErrors.minDefinitionErrors, podGroup,
[]error{&minSubGroupExceedsChildCountError{}})
}

return nil, nil
}

Expand All @@ -46,14 +54,52 @@ func (_ *PodGroup) ValidateUpdate(ctx context.Context, _ runtime.Object, newObj
}
logger.Info("validate update", "namespace", podGroup.Namespace, "name", podGroup.Name)

if err := validateSubGroups(podGroup.Spec.SubGroups); err != nil {
logger.Info("Subgroups validation failed",
"namespace", podGroup.Namespace, "name", podGroup.Name, "error", err)
return nil, err
validationErrors := validatePodGroupSpec(&podGroup.Spec)

if validationErrors.structuralError != nil {
logger.Info("PodGroup spec validation failed on structural error",
"namespace", podGroup.Namespace, "name", podGroup.Name, "error", validationErrors.structuralError)
return nil, validationErrors.structuralError
}
if len(validationErrors.minDefinitionErrors) > 0 {
return handleMinDefinitionErrors(ctx, validationErrors.minDefinitionErrors, podGroup,
[]error{&parentMinMemberError{}, &minSubGroupExceedsChildCountError{}})
}

return nil, nil
}

// handleMinDefinitionErrors partitions minDefinitionErrors into admission warnings and
// blocking errors, logs the outcome, and returns both.
//
// An error is downgraded to a warning (its Error() text is appended to the returned
// warnings) when errors.Is reports a match against any entry of validationWarningTypes;
// every other error is treated as blocking. When at least one blocking error exists the
// returned error is errors.Join of all of them; otherwise the returned error is nil.
// The collected warnings are returned in both cases.
//
// NOTE(review): callers pass freshly allocated pointers (e.g. &parentMinMemberError{}) as
// targets, so matching presumably relies on those error types implementing an
// Is(error) bool method — confirm against their definitions.
func handleMinDefinitionErrors(ctx context.Context,
	minDefinitionErrors []error, podGroup *PodGroup, validationWarningTypes []error) (admission.Warnings, error) {
	// downgradable reports whether err matches one of the warning-only targets.
	downgradable := func(err error) bool {
		for _, target := range validationWarningTypes {
			if errors.Is(err, target) {
				return true
			}
		}
		return false
	}

	var (
		warnings admission.Warnings
		blocking []error
	)
	for _, err := range minDefinitionErrors {
		if downgradable(err) {
			warnings = append(warnings, err.Error())
			continue
		}
		blocking = append(blocking, err)
	}

	logger := log.FromContext(ctx)
	if len(blocking) == 0 {
		// Only warnings were found: admit the object but surface them to the client.
		logger.Info("PodGroup spec validation warning", "namespace", podGroup.Namespace, "name", podGroup.Name,
			"warnings", warnings)
		return warnings, nil
	}

	logger.Info("PodGroup spec validation failed on min definition errors",
		"namespace", podGroup.Namespace, "name", podGroup.Name, "errors", blocking,
		"warnings", warnings)
	return warnings, errors.Join(blocking...)
}

func (_ *PodGroup) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
logger := log.FromContext(ctx)
podGroup, ok := obj.(*PodGroup)
Expand All @@ -64,80 +110,172 @@ func (_ *PodGroup) ValidateDelete(ctx context.Context, obj runtime.Object) (admi
return nil, nil
}

func validateSubGroups(subGroups []SubGroup) error {
// validatePodGroupSpec checks the top-level minMember/minSubGroup settings of a PodGroup
// spec together with its SubGroups.
//
// The checks run in this order:
//  1. minMember and minSubGroup both set -> structural error, nothing else is evaluated.
//  2. validateSubGroups on spec.SubGroups -> its result is returned as-is when it carries
//     a structural error.
//  3. When minSubGroup is set: a value below 1 adds an invalidMinSubGroupError and stops;
//     a value larger than the number of parentless SubGroups adds a
//     minSubGroupExceedsChildCountError.
//
// The returned value is never nil; min-definition findings are appended to whatever
// validateSubGroups already collected.
func validatePodGroupSpec(spec *PodGroupSpec) *validationErrors {
	if spec.MinMember != nil && spec.MinSubGroup != nil {
		msg := fmt.Sprintf(
			"minMember and minSubGroup are mutually exclusive: "+
				"set minMember (%d) to schedule a fixed number of pods, or set minSubGroup to require a minimum number of child SubGroups, but not both",
			*spec.MinMember)
		return &validationErrors{structuralError: &mutuallyExclusiveFieldsError{msg: msg}}
	}

	result := validateSubGroups(spec.SubGroups)
	if result.structuralError != nil || spec.MinSubGroup == nil {
		// Either the subgroup graph is broken, or there is no top-level minSubGroup to check.
		return result
	}

	minSubGroup := *spec.MinSubGroup
	if minSubGroup < 1 {
		result.minDefinitionErrors = append(result.minDefinitionErrors,
			&invalidMinSubGroupError{msg: "minSubGroup at the podgroup level must be equal to or greater than 1"})
		return result
	}

	if rootCount := countRootSubGroups(spec.SubGroups); int(minSubGroup) > rootCount {
		result.minDefinitionErrors = append(result.minDefinitionErrors,
			&minSubGroupExceedsChildCountError{msg: fmt.Sprintf(
				"minSubGroup (%d) exceeds the number of direct child SubGroups (%d)",
				minSubGroup, rootCount)})
	}

	return result
}

// validateSubGroups validates the subgroup list.
// Returns collected per-subgroup min-field errors and a hard structural error (duplicate name, missing parent, cycle).
func validateSubGroups(subGroups []SubGroup) *validationErrors {
subGroupMap := map[string]*SubGroup{}
for _, subGroup := range subGroups {
for i := range subGroups {
subGroup := &subGroups[i]
if subGroupMap[subGroup.Name] != nil {
return fmt.Errorf("duplicate subgroup name %s", subGroup.Name)
return &validationErrors{structuralError: &subGroupGraphError{msg: fmt.Sprintf("duplicate subgroup name %s", subGroup.Name)}}
}
subGroupMap[subGroup.Name] = &subGroup
subGroupMap[subGroup.Name] = subGroup
}

if err := validateParent(subGroupMap); err != nil {
return err
return &validationErrors{structuralError: err}
}

childrenByParent := subgroupChildrenByParent(subGroupMap)
childrenMap := buildChildrenMap(subGroupMap)

if err := validateLeafMinMembers(subGroupMap, childrenByParent); err != nil {
return err
if detectCycle(subGroupMap, childrenMap) {
return &validationErrors{structuralError: &subGroupGraphError{msg: "cycle detected in subgroups"}}
}

if detectCycle(subGroupMap, childrenByParent) {
return errors.New("cycle detected in subgroups")
// Sort SubGroup names for deterministic error reporting across API calls.
subGroupNames := make([]string, 0, len(subGroupMap))
for name := range subGroupMap {
subGroupNames = append(subGroupNames, name)
}
return nil
sort.Strings(subGroupNames)

subgroupsErrors := &validationErrors{}
for _, name := range subGroupNames {
subgroupValidationErrors := validateSubGroupMinFields(subGroupMap[name], childrenMap)
subgroupsErrors.minDefinitionErrors = append(subgroupsErrors.minDefinitionErrors,
subgroupValidationErrors.minDefinitionErrors...)
if subgroupValidationErrors.structuralError != nil {
subgroupsErrors.structuralError = subgroupValidationErrors.structuralError
return subgroupsErrors
}
}

return subgroupsErrors
}

func validateParent(subGroupMap map[string]*SubGroup) error {
for _, subGroup := range subGroupMap {
if subGroup.Parent == nil {
continue
// validateSubGroupMinFields returns all validation errors for minMember/minSubGroup on a single SubGroup.
func validateSubGroupMinFields(subGroup *SubGroup, childrenMap map[string][]string) validationErrors {
var minFieldsErrors []error

if subGroup.MinMember != nil && subGroup.MinSubGroup != nil {
return validationErrors{structuralError: &mutuallyExclusiveFieldsError{msg: fmt.Sprintf(
"subgroup %q: minMember and minSubGroup are mutually exclusive", subGroup.Name)}}
}

children := childrenMap[subGroup.Name]
isLeaf := len(children) == 0

if isLeaf {
if subGroup.MinSubGroup != nil {
minFieldsErrors = append(minFieldsErrors,
&invalidMinSubGroupError{msg: fmt.Sprintf("subgroup %q: minSubGroup cannot be set on a leaf SubGroup (no child SubGroups)", subGroup.Name)})
}
if _, exists := subGroupMap[*subGroup.Parent]; !exists {
return fmt.Errorf("parent %s of %s was not found", *subGroup.Parent, subGroup.Name)
if subGroup.MinMember == nil {
minFieldsErrors = append(minFieldsErrors, &missingMinMemberError{msg: fmt.Sprintf(
"subgroup %s: minMember is required", subGroup.Name)})
}
} else {
if subGroup.MinMember != nil {
minFieldsErrors = append(minFieldsErrors, &parentMinMemberError{msg: fmt.Sprintf(
"subgroup %q: minMember cannot be set on a mid-level SubGroup (has child SubGroups); use minSubGroup instead", subGroup.Name)})
}
if subGroup.MinSubGroup != nil {
if *subGroup.MinSubGroup <= 0 {
minFieldsErrors = append(minFieldsErrors, &invalidMinSubGroupError{msg: fmt.Sprintf(
"subgroup %q: minSubGroup must be greater than 0", subGroup.Name)})
} else if int(*subGroup.MinSubGroup) > len(children) {
minFieldsErrors = append(minFieldsErrors, &minSubGroupExceedsChildCountError{msg: fmt.Sprintf(
"subgroup %q: minSubGroup (%d) exceeds the number of direct child SubGroups (%d)",
subGroup.Name, *subGroup.MinSubGroup, len(children))})
}
}
}
return nil

return validationErrors{minDefinitionErrors: minFieldsErrors}
}

func subgroupChildrenByParent(subGroupMap map[string]*SubGroup) map[string][]string {
childrenByParent := make(map[string][]string, len(subGroupMap))
for _, subGroup := range subGroupMap {
parent := ""
if subGroup.Parent != nil {
parent = *subGroup.Parent
// buildChildrenMap returns a map from parent name to list of child SubGroup names.
func buildChildrenMap(subGroupMap map[string]*SubGroup) map[string][]string {
childrenMap := map[string][]string{}
for _, sg := range subGroupMap {
if sg.Parent != nil {
childrenMap[*sg.Parent] = append(childrenMap[*sg.Parent], sg.Name)
}
childrenByParent[parent] = append(childrenByParent[parent], subGroup.Name)
}
return childrenByParent
return childrenMap
}

func validateLeafMinMembers(subGroupMap map[string]*SubGroup, childrenByParent map[string][]string) error {
for name, subGroup := range subGroupMap {
if len(childrenByParent[name]) > 0 {
// countRootSubGroups reports how many entries of subGroups have a nil Parent, i.e. hang
// directly off the PodGroup rather than off another SubGroup.
func countRootSubGroups(subGroups []SubGroup) int {
	roots := 0
	for i := range subGroups {
		if subGroups[i].Parent != nil {
			continue
		}
		roots++
	}
	return roots
}

func validateParent(subGroupMap map[string]*SubGroup) error {
for _, subGroup := range subGroupMap {
if subGroup.Parent == nil {
continue
}
if subGroup.MinMember == nil {
return fmt.Errorf("subgroup %s: minMember is required", name)
if _, exists := subGroupMap[*subGroup.Parent]; !exists {
return &subGroupGraphError{msg: fmt.Sprintf(
"parent %s of %s was not found", *subGroup.Parent, subGroup.Name)}
}
}
return nil
}

func detectCycle(subGroupMap map[string]*SubGroup, childrenByParent map[string][]string) bool {
func detectCycle(subGroupMap map[string]*SubGroup, childrenMap map[string][]string) bool {
visited := map[string]bool{}
recStack := map[string]bool{}

for name := range subGroupMap {
if dfsCycleCheck(name, childrenByParent, visited, recStack) {
if dfsCycleCheck(name, childrenMap, visited, recStack) {
return true
}
}
return false
}

func dfsCycleCheck(node string, childrenByParent map[string][]string, visited, recStack map[string]bool) bool {
func dfsCycleCheck(node string, childrenMap map[string][]string, visited, recStack map[string]bool) bool {
if recStack[node] {
return true // cycle detected
}
Expand All @@ -147,9 +285,9 @@ func dfsCycleCheck(node string, childrenByParent map[string][]string, visited, r
visited[node] = true
recStack[node] = true

children := childrenByParent[node]
children := childrenMap[node]
for _, child := range children {
if dfsCycleCheck(child, childrenByParent, visited, recStack) {
if dfsCycleCheck(child, childrenMap, visited, recStack) {
return true
}
}
Expand Down
Loading
Loading