Merged
23 changes: 23 additions & 0 deletions docs/contributor/validator.md
Original file line number Diff line number Diff line change
@@ -136,6 +136,8 @@ The validator engine mounts snapshot and recipe data as ConfigMaps:
| `AICR_SNAPSHOT_PATH` | Override snapshot mount path |
| `AICR_RECIPE_PATH` | Override recipe mount path |
| `AICR_VALIDATOR_IMAGE_REGISTRY` | Override image registry prefix (set by user) |
| `AICR_NODE_SELECTOR` | User-provided node selector override for inner workloads (comma-separated `key=value` pairs). Set by the `--node-selector` CLI flag. Use `ctx.NodeSelector` to access the parsed value. |
| `AICR_TOLERATIONS` | User-provided toleration override for inner workloads (comma-separated `key=value:effect` entries). Set by the `--toleration` CLI flag. Use `ctx.Tolerations` to access the parsed value. |

## Context API

@@ -151,11 +153,32 @@ type Context struct {
Snapshot *snapshotter.Snapshot // Captured cluster state
Recipe *recipe.RecipeResult // Recipe with validation config
Namespace string // Validation namespace
NodeSelector map[string]string // User-provided node selector override (nil = use defaults)
Tolerations []corev1.Toleration // User-provided toleration override (nil = use defaults)
}
```

`LoadContext()` builds this from the container environment: it reads the mounted ConfigMaps, creates in-cluster Kubernetes clients, and sets a timeout from `defaults.CheckExecutionTimeout`.
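For illustration, the `AICR_NODE_SELECTOR` value (comma-separated `key=value` pairs) can be decoded with a few lines of standard-library Go. The real implementation lives in `snapshotter.ParseNodeSelectors`; the helper name and error text below are a sketch, not the actual API:

```go
package main

import (
	"fmt"
	"strings"
)

// parseNodeSelector decodes the AICR_NODE_SELECTOR format: comma-separated
// key=value pairs, e.g. "gpu-type=h100,pool=a". It returns nil for an empty
// input so callers can treat "unset" and "no override" identically.
func parseNodeSelector(raw string) (map[string]string, error) {
	if raw == "" {
		return nil, nil
	}
	out := make(map[string]string)
	for _, pair := range strings.Split(raw, ",") {
		k, v, ok := strings.Cut(pair, "=")
		if !ok || k == "" {
			return nil, fmt.Errorf("invalid node selector entry %q (want key=value)", pair)
		}
		out[k] = v
	}
	return out, nil
}

func main() {
	ns, err := parseNodeSelector("my-org/gpu-pool=true,gpu-type=h100")
	fmt.Println(ns, err)
}
```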

### Scheduling Overrides

When creating inner workloads (pods, Jobs, TrainJobs), check `ctx.NodeSelector` and `ctx.Tolerations` before applying hardcoded platform selectors. If non-nil, these override the default scheduling constraints to support clusters with non-standard GPU node labels or taints.

```go
// Apply scheduling overrides when creating inner workload pods.
nodeSelector := map[string]string{"cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb"}
if ctx.NodeSelector != nil {
nodeSelector = ctx.NodeSelector // user override replaces platform default
}

tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}}
if ctx.Tolerations != nil {
tolerations = ctx.Tolerations // user override replaces default tolerate-all
}
```

Validators that use `nodeName` pinning (e.g., nvidia-smi, DRA isolation) bypass the scheduler entirely and should not apply `ctx.NodeSelector`.
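The override-versus-default decision, including the `nodeName` exception, can be factored into a small helper. `effectiveNodeSelector` below is a hypothetical name used for illustration, not part of the Context API:

```go
package main

import "fmt"

// effectiveNodeSelector picks the node selector for an inner workload.
// pinned reports whether the validator sets spec.nodeName directly, in which
// case the scheduler is bypassed and no selector should be applied at all.
func effectiveNodeSelector(pinned bool, override, platformDefault map[string]string) map[string]string {
	if pinned {
		return nil // nodeName pinning bypasses the scheduler entirely
	}
	if override != nil {
		return override // user override replaces the platform default
	}
	return platformDefault
}

func main() {
	def := map[string]string{"cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb"}
	fmt.Println(effectiveNodeSelector(false, map[string]string{"gpu-type": "h100"}, def))
}
```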

### Helper Methods

**`ctx.Timeout(d)`** — Create a child context with a specific timeout:
39 changes: 37 additions & 2 deletions docs/user/cli-reference.md
@@ -564,8 +564,8 @@ aicr validate [flags]
| `--image-pull-secret` | | string[] | | Image pull secrets for private registries (repeatable) |
| `--job-name` | | string | aicr-validate | Name for the validation Job |
| `--service-account-name` | | string | aicr | ServiceAccount name for validation Job |
| `--node-selector` | | string[] | | Node selector for validation scheduling (key=value, repeatable) |
| `--toleration` | | string[] | | Tolerations for validation scheduling (key=value:effect, repeatable) |
| `--node-selector` | | string[] | | Override GPU node selection for validation workloads. Replaces platform-specific selectors (e.g., `cloud.google.com/gke-accelerator`, `node.kubernetes.io/instance-type`) on inner workloads like NCCL benchmark pods. Use when GPU nodes have non-standard labels. Does not affect the validator orchestrator Job. (format: key=value, repeatable) |
| `--toleration` | | string[] | | Override tolerations for validation workloads. Replaces the default tolerate-all policy on inner workloads like NCCL benchmark pods and conformance test pods. Does not affect the validator orchestrator Job. (format: key=value:effect, repeatable) |
| `--timeout` | | duration | 5m | Timeout for validation Job completion |
| `--no-cleanup` | | bool | false | Skip removal of Job and RBAC resources on completion |
| `--require-gpu` | | bool | false | Require GPU resources on the validation pod |
@@ -667,8 +667,43 @@ aicr validate \
--recipe recipe.yaml \
--snapshot cm://gpu-operator/aicr-snapshot \
--kubeconfig ~/.kube/prod-cluster

# Validate on a cluster with custom GPU node labels (non-standard labels that AICR doesn't
# recognize by default, e.g., using a custom node pool label instead of cloud-provider defaults)
aicr validate \
--recipe recipe.yaml \
--node-selector my-org/gpu-pool=true \
--phase performance

# Override both node selector and tolerations for a non-standard taint setup
aicr validate \
--recipe recipe.yaml \
--node-selector gpu-type=h100 \
--toleration gpu-type=h100:NoSchedule
```

**Workload Scheduling:**

The `--node-selector` and `--toleration` flags control scheduling for **validation
workloads** — the inner pods that validators create to test cluster functionality
(e.g., NCCL benchmark workers, conformance test pods). They do **not** affect the
validator orchestrator Job, which runs lightweight check logic and is placed on
CPU-preferred nodes automatically.

When `--node-selector` is provided, it replaces the platform-specific selectors
that validators use by default:

| Platform | Default Selector (replaced) | Use Case |
|----------|-----------------------------|----------|
| GKE | `cloud.google.com/gke-accelerator: nvidia-h100-mega-80gb` | Non-standard GPU node pool labels |
| EKS | `node.kubernetes.io/instance-type: <discovered>` | Custom node pool labels |

When `--toleration` is provided, it replaces the default tolerate-all policy
(`operator: Exists`) on workloads that need to land on tainted GPU nodes.
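As a reference for the accepted `--toleration` syntax, each entry can be decoded roughly as follows. This is a stdlib sketch with a simplified struct; the real parser is `snapshotter.ParseTolerations` and returns `corev1.Toleration` values:

```go
package main

import (
	"fmt"
	"strings"
)

// Toleration mirrors only the corev1.Toleration fields the CLI format uses.
type Toleration struct {
	Key, Value, Operator, Effect string
}

// parseToleration decodes one --toleration entry:
//
//	"*"              → tolerate everything (operator: Exists)
//	"key:Effect"     → exists-style toleration for a key
//	"key=val:Effect" → equality toleration
func parseToleration(raw string) (Toleration, error) {
	if raw == "*" {
		return Toleration{Operator: "Exists"}, nil
	}
	body, effect, ok := strings.Cut(raw, ":")
	if !ok || body == "" {
		return Toleration{}, fmt.Errorf("invalid toleration %q (want key[=value]:effect)", raw)
	}
	if key, val, hasVal := strings.Cut(body, "="); hasVal {
		return Toleration{Key: key, Value: val, Operator: "Equal", Effect: effect}, nil
	}
	return Toleration{Key: body, Operator: "Exists", Effect: effect}, nil
}

func main() {
	t, err := parseToleration("gpu-type=h100:NoSchedule")
	fmt.Println(t, err)
}
```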

Validators that use `nodeName` pinning (nvidia-smi, DRA isolation test) or
DRA ResourceClaims for placement (gang scheduling) are not affected by these flags.

**Output Structure ([CTRF](https://ctrf.io/) JSON):**

Results are output in CTRF (Common Test Report Format) — an industry-standard schema for test reporting.
18 changes: 13 additions & 5 deletions pkg/cli/validate.go
@@ -210,7 +210,8 @@ type validationConfig struct {
noCluster bool

// Scheduling
tolerations []corev1.Toleration
nodeSelector map[string]string
tolerations []corev1.Toleration

// Behavior
failOnError bool
@@ -236,6 +237,7 @@ func runValidation(
validator.WithImagePullSecrets(cfg.imagePullSecrets),
validator.WithNoCluster(cfg.noCluster),
validator.WithTolerations(cfg.tolerations),
validator.WithNodeSelector(cfg.nodeSelector),
)

results, err := v.ValidatePhases(ctx, cfg.phases, rec, snap)
@@ -379,13 +381,13 @@ func validateCmdFlags() []cli.Flag {
},
&cli.StringSliceFlag{
Name: "node-selector",
Usage: "Node selector for snapshot agent Job scheduling (format: key=value, can be repeated).",
Category: "Agent Deployment",
Usage: "Override GPU node selection for validation workloads (format: key=value, can be repeated). Replaces platform-specific selectors on inner workloads (e.g., NCCL benchmark pods). Use when GPU nodes have non-standard labels. Does not affect the validator orchestrator Job.",
Category: "Scheduling",
},
&cli.StringSliceFlag{
Name: "toleration",
Usage: "Toleration for snapshot agent and validation Job scheduling (format: key=value:effect). By default, all taints are tolerated.",
Category: "Agent Deployment",
Usage: "Override tolerations for validation workloads (format: key=value:effect, can be repeated). Replaces the default tolerate-all policy on inner workloads. Does not affect the validator orchestrator Job.",
Category: "Scheduling",
},
&cli.DurationFlag{
Name: "timeout",
@@ -540,6 +542,11 @@ Run validation without failing on check errors (informational mode):
return errors.Wrap(errors.ErrCodeInvalidRequest, "invalid toleration", tolErr)
}

nodeSelector, nsErr := snapshotter.ParseNodeSelectors(cmd.StringSlice("node-selector"))
if nsErr != nil {
return errors.Wrap(errors.ErrCodeInvalidRequest, "invalid node-selector", nsErr)
}

// Validate that requested phases are defined in the recipe.
if err := validatePhasesAgainstRecipe(phases, rec); err != nil {
return err
@@ -561,6 +568,7 @@
cleanup: !noCleanup,
imagePullSecrets: cmd.StringSlice("image-pull-secret"),
noCluster: cmd.Bool("no-cluster"),
nodeSelector: nodeSelector,
tolerations: tolerations,
evidenceDir: evidenceDir,
})
4 changes: 4 additions & 0 deletions pkg/defaults/timeouts.go
@@ -227,6 +227,10 @@ const (
// to reach the Established condition after installation.
TrainerCRDEstablishedTimeout = 2 * time.Minute

// TrainerControllerReadyTimeout is the time to wait for the Kubeflow Trainer
// controller-manager Deployment to have at least one ready replica after installation.
TrainerControllerReadyTimeout = 2 * time.Minute

// NCCLTrainJobTimeout is the maximum time to wait for the NCCL all-reduce TrainJob to complete.
NCCLTrainJobTimeout = 30 * time.Minute

3 changes: 3 additions & 0 deletions pkg/defaults/timeouts_test.go
@@ -62,6 +62,9 @@ func TestTimeoutConstants(t *testing.T) {
// Gang scheduling co-scheduling window
{"CoScheduleWindow", CoScheduleWindow, 10 * time.Second, 60 * time.Second},

// Trainer timeouts
{"TrainerControllerReadyTimeout", TrainerControllerReadyTimeout, 1 * time.Minute, 5 * time.Minute},

// Validator timeouts
{"ValidatorWaitBuffer", ValidatorWaitBuffer, 10 * time.Second, 60 * time.Second},
{"ValidatorDefaultTimeout", ValidatorDefaultTimeout, 1 * time.Minute, 15 * time.Minute},
5 changes: 5 additions & 0 deletions pkg/snapshotter/agent.go
@@ -282,6 +282,11 @@ func ParseTolerations(tolerations []string) ([]corev1.Toleration, error) {

result := make([]corev1.Toleration, 0, len(tolerations))
for _, t := range tolerations {
if t == "*" {
result = append(result, corev1.Toleration{Operator: corev1.TolerationOpExists})
continue
}

// Format: key=value:effect or key:effect (for exists operator)
var key, value, effect string

14 changes: 14 additions & 0 deletions pkg/snapshotter/agent_test.go
@@ -349,6 +349,12 @@ func TestParseTolerations(t *testing.T) {
wantLen: 0,
wantErr: true,
},
{
name: "wildcard toleration",
tolerations: []string{"*"},
wantLen: 1,
wantErr: false,
},
}

for _, tt := range tests {
@@ -391,6 +397,14 @@ func TestParseTolerationsOperator(t *testing.T) {
wantValue: "",
wantEffect: corev1.TaintEffectNoExecute,
},
{
name: "wildcard toleration produces Exists with empty key",
toleration: "*",
wantOperator: corev1.TolerationOpExists,
wantKey: "",
wantValue: "",
wantEffect: "",
},
}

for _, tt := range tests {
107 changes: 65 additions & 42 deletions pkg/validator/job/deployer.go
@@ -20,6 +20,7 @@ import (
"encoding/hex"
"fmt"
"log/slog"
"slices"
"strings"
"time"

@@ -49,6 +50,7 @@ type Deployer struct {
jobName string // Unique name generated client-side (set by DeployJob)
imagePullSecrets []string
tolerations []corev1.Toleration
nodeSelector map[string]string // passed through to inner workloads via AICR_NODE_SELECTOR env var
}

// NewDeployer creates a Deployer for a single validator catalog entry.
@@ -60,6 +62,7 @@ func NewDeployer(
entry catalog.ValidatorEntry,
imagePullSecrets []string,
tolerations []corev1.Toleration,
nodeSelector map[string]string,
) *Deployer {

return &Deployer{
Expand All @@ -70,6 +73,7 @@ func NewDeployer(
entry: entry,
imagePullSecrets: imagePullSecrets,
tolerations: tolerations,
nodeSelector: nodeSelector,
}
}

@@ -149,13 +153,7 @@ func (d *Deployer) buildApplyConfig() *applybatchv1.JobApplyConfiguration {
labels.Component: labels.ValueValidation,
labels.Validator: d.entry.Name,
}).
WithSpec(applycorev1.PodSpec().
WithServiceAccountName(ServiceAccountName).
WithRestartPolicy(corev1.RestartPolicyNever).
WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())).
WithImagePullSecrets(d.buildImagePullSecretsApply()...).
WithTolerations(d.buildTolerationsApply()...).
WithAffinity(preferCPUNodeAffinityApply()).
WithSpec(d.buildPodSpecApply().
WithContainers(applycorev1.Container().
WithName("validator").
WithImage(d.entry.Image).
@@ -191,7 +189,7 @@ func (d *Deployer) buildApplyConfig() *applybatchv1.JobApplyConfiguration {
}

func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration {
orchestratorEnvCount := 6
orchestratorEnvCount := 8
env := make([]*applycorev1.EnvVarApplyConfiguration, 0, orchestratorEnvCount+len(d.entry.Env))
env = append(env,
applycorev1.EnvVar().WithName("AICR_SNAPSHOT_PATH").WithValue("/data/snapshot/snapshot.yaml"),
@@ -203,28 +201,62 @@ func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration {
WithValueFrom(applycorev1.EnvVarSource().
WithFieldRef(applycorev1.ObjectFieldSelector().WithFieldPath("metadata.namespace"))),
)
// Pass scheduling overrides to the validator container so it can apply them
// to the inner workloads it creates (e.g., NCCL benchmark pods). These env
// vars are NOT used to schedule the orchestrator Job itself.
if len(d.nodeSelector) > 0 {
env = append(env, applycorev1.EnvVar().WithName("AICR_NODE_SELECTOR").WithValue(serializeNodeSelector(d.nodeSelector)))
}
if len(d.tolerations) > 0 {
env = append(env, applycorev1.EnvVar().WithName("AICR_TOLERATIONS").WithValue(serializeTolerations(d.tolerations)))
}
for _, e := range d.entry.Env {
env = append(env, applycorev1.EnvVar().WithName(e.Name).WithValue(e.Value))
}
return env
}

// imagePullPolicy returns the appropriate pull policy based on the image reference.
// Local images (ko.local, kind.local, localhost) always use IfNotPresent since they
// are side-loaded into the cluster and cannot be pulled from a registry.
// Remote images with :latest tag use Always to avoid stale cached images.
func (d *Deployer) imagePullPolicy() corev1.PullPolicy {
img := d.entry.Image
// Local images side-loaded into kind/nvkind — never pull from registry.
if strings.HasPrefix(img, "ko.local") ||
strings.HasPrefix(img, "kind.local") ||
strings.HasPrefix(img, "localhost/") ||
strings.HasPrefix(img, "localhost:") {

return corev1.PullIfNotPresent
// serializeNodeSelector encodes a nodeSelector map as a comma-separated key=value string.
// Keys are sorted for deterministic output. This matches the format expected by
// snapshotter.ParseNodeSelectors on the receiving end.
func serializeNodeSelector(ns map[string]string) string {
keys := make([]string, 0, len(ns))
for k := range ns {
keys = append(keys, k)
}
slices.Sort(keys)
pairs := make([]string, 0, len(ns))
for _, k := range keys {
pairs = append(pairs, k+"="+ns[k])
}
return strings.Join(pairs, ",")
}

if strings.HasSuffix(img, ":latest") {
// serializeTolerations encodes tolerations as a comma-separated list.
// Format per toleration: key=value:Effect or key:Effect (for tolerations without value).
// This matches the format expected by snapshotter.ParseTolerations on the receiving end.
func serializeTolerations(tols []corev1.Toleration) string {
parts := make([]string, 0, len(tols))
for _, t := range tols {
var part string
switch {
case t.Key == "" && t.Operator == corev1.TolerationOpExists:
part = "*"
case t.Value != "":
part = t.Key + "=" + t.Value + ":" + string(t.Effect)
default:
part = t.Key + ":" + string(t.Effect)
}
parts = append(parts, part)
}
return strings.Join(parts, ",")
}

// imagePullPolicy returns Always when the image uses :latest tag (dev builds),
// PullIfNotPresent otherwise. This ensures dev builds always pull fresh images
// and avoids exec format errors from stale cached images on cluster nodes.
func (d *Deployer) imagePullPolicy() corev1.PullPolicy {
if strings.HasSuffix(d.entry.Image, ":latest") {
return corev1.PullAlways
}
return corev1.PullIfNotPresent
Expand All @@ -238,26 +270,17 @@ func (d *Deployer) buildImagePullSecretsApply() []*applycorev1.LocalObjectRefere
return refs
}

func (d *Deployer) buildTolerationsApply() []*applycorev1.TolerationApplyConfiguration {
tols := make([]*applycorev1.TolerationApplyConfiguration, 0, len(d.tolerations))
for i := range d.tolerations {
t := &d.tolerations[i]
tol := applycorev1.Toleration().WithOperator(t.Operator)
if t.Key != "" {
tol = tol.WithKey(t.Key)
}
if t.Value != "" {
tol = tol.WithValue(t.Value)
}
if t.Effect != "" {
tol = tol.WithEffect(t.Effect)
}
if t.TolerationSeconds != nil {
tol = tol.WithTolerationSeconds(*t.TolerationSeconds)
}
tols = append(tols, tol)
}
return tols
func (d *Deployer) buildPodSpecApply() *applycorev1.PodSpecApplyConfiguration {
// The orchestrator Job always tolerates all taints so it can schedule on any
// available CPU node. User-provided tolerations (--toleration flag) are forwarded
// to inner workloads via AICR_TOLERATIONS and do not affect orchestrator scheduling.
return applycorev1.PodSpec().
WithServiceAccountName(ServiceAccountName).
WithRestartPolicy(corev1.RestartPolicyNever).
WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())).
WithImagePullSecrets(d.buildImagePullSecretsApply()...).
WithTolerations(applycorev1.Toleration().WithOperator(corev1.TolerationOpExists)).
WithAffinity(preferCPUNodeAffinityApply())
}

// WaitForCompletion watches the Job until it reaches a terminal state