diff --git a/docs/contributor/validator.md b/docs/contributor/validator.md index a663261e1..68684f927 100644 --- a/docs/contributor/validator.md +++ b/docs/contributor/validator.md @@ -136,6 +136,8 @@ The validator engine mounts snapshot and recipe data as ConfigMaps: | `AICR_SNAPSHOT_PATH` | Override snapshot mount path | | `AICR_RECIPE_PATH` | Override recipe mount path | | `AICR_VALIDATOR_IMAGE_REGISTRY` | Override image registry prefix (set by user) | +| `AICR_NODE_SELECTOR` | User-provided node selector override for inner workloads (comma-separated `key=value` pairs). Set by the `--node-selector` CLI flag. Use `ctx.NodeSelector` to access the parsed value. | +| `AICR_TOLERATIONS` | User-provided toleration override for inner workloads (comma-separated `key=value:effect` entries). Set by the `--toleration` CLI flag. Use `ctx.Tolerations` to access the parsed value. | ## Context API @@ -151,11 +153,32 @@ type Context struct { Snapshot *snapshotter.Snapshot // Captured cluster state Recipe *recipe.RecipeResult // Recipe with validation config Namespace string // Validation namespace + NodeSelector map[string]string // User-provided node selector override (nil = use defaults) + Tolerations []corev1.Toleration // User-provided toleration override (nil = use defaults) } ``` `LoadContext()` builds this from the container environment: reads mounted ConfigMaps, creates in-cluster K8s clients, and sets a timeout from `defaults.CheckExecutionTimeout`. +### Scheduling Overrides + +When creating inner workloads (pods, Jobs, TrainJobs), check `ctx.NodeSelector` and `ctx.Tolerations` before applying hardcoded platform selectors. If non-nil, these override the default scheduling constraints to support clusters with non-standard GPU node labels or taints. + +```go +// Apply scheduling overrides when creating inner workload pods. 
+nodeSelector := map[string]string{"cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb"} +if ctx.NodeSelector != nil { + nodeSelector = ctx.NodeSelector // user override replaces platform default +} + +tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}} +if ctx.Tolerations != nil { + tolerations = ctx.Tolerations // user override replaces default tolerate-all +} +``` + +Validators that use `nodeName` pinning (e.g., nvidia-smi, DRA isolation) bypass the scheduler entirely and should not apply `ctx.NodeSelector`. + ### Helper Methods **`ctx.Timeout(d)`** — Create a child context with a specific timeout: diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 9dc5d3ff7..4112262c0 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -564,8 +564,8 @@ aicr validate [flags] | `--image-pull-secret` | | string[] | | Image pull secrets for private registries (repeatable) | | `--job-name` | | string | aicr-validate | Name for the validation Job | | `--service-account-name` | | string | aicr | ServiceAccount name for validation Job | -| `--node-selector` | | string[] | | Node selector for validation scheduling (key=value, repeatable) | -| `--toleration` | | string[] | | Tolerations for validation scheduling (key=value:effect, repeatable) | +| `--node-selector` | | string[] | | Override GPU node selection for validation workloads. Replaces platform-specific selectors (e.g., `cloud.google.com/gke-accelerator`, `node.kubernetes.io/instance-type`) on inner workloads like NCCL benchmark pods. Use when GPU nodes have non-standard labels. Does not affect the validator orchestrator Job. (format: key=value, repeatable) | +| `--toleration` | | string[] | | Override tolerations for validation workloads. Replaces the default tolerate-all policy on inner workloads like NCCL benchmark pods and conformance test pods. Does not affect the validator orchestrator Job. 
(format: key=value:effect, repeatable) | | `--timeout` | | duration | 5m | Timeout for validation Job completion | | `--no-cleanup` | | bool | false | Skip removal of Job and RBAC resources on completion | | `--require-gpu` | | bool | false | Require GPU resources on the validation pod | @@ -667,8 +667,43 @@ aicr validate \ --recipe recipe.yaml \ --snapshot cm://gpu-operator/aicr-snapshot \ --kubeconfig ~/.kube/prod-cluster + +# Validate on a cluster with custom GPU node labels (non-standard labels that AICR doesn't +# recognize by default, e.g., using a custom node pool label instead of cloud-provider defaults) +aicr validate \ + --recipe recipe.yaml \ + --node-selector my-org/gpu-pool=true \ + --phase performance + +# Override both node selector and tolerations for a non-standard taint setup +aicr validate \ + --recipe recipe.yaml \ + --node-selector gpu-type=h100 \ + --toleration gpu-type=h100:NoSchedule ``` +**Workload Scheduling:** + +The `--node-selector` and `--toleration` flags control scheduling for **validation +workloads** — the inner pods that validators create to test cluster functionality +(e.g., NCCL benchmark workers, conformance test pods). They do **not** affect the +validator orchestrator Job, which runs lightweight check logic and is placed on +CPU-preferred nodes automatically. + +When `--node-selector` is provided, it replaces the platform-specific selectors +that validators use by default: + +| Platform | Default Selector (replaced) | Use Case | +|----------|-----------------------------|----------| +| GKE | `cloud.google.com/gke-accelerator: nvidia-h100-mega-80gb` | Non-standard GPU node pool labels | +| EKS | `node.kubernetes.io/instance-type: ` | Custom node pool labels | + +When `--toleration` is provided, it replaces the default tolerate-all policy +(`operator: Exists`) on workloads that need to land on tainted GPU nodes. 
+ +Validators that use `nodeName` pinning (nvidia-smi, DRA isolation test) or +DRA ResourceClaims for placement (gang scheduling) are not affected by these flags. + **Output Structure ([CTRF](https://ctrf.io/) JSON):** Results are output in CTRF (Common Test Report Format) — an industry-standard schema for test reporting. diff --git a/pkg/cli/validate.go b/pkg/cli/validate.go index 66c682056..3064e0e1e 100644 --- a/pkg/cli/validate.go +++ b/pkg/cli/validate.go @@ -210,7 +210,8 @@ type validationConfig struct { noCluster bool // Scheduling - tolerations []corev1.Toleration + nodeSelector map[string]string + tolerations []corev1.Toleration // Behavior failOnError bool @@ -236,6 +237,7 @@ func runValidation( validator.WithImagePullSecrets(cfg.imagePullSecrets), validator.WithNoCluster(cfg.noCluster), validator.WithTolerations(cfg.tolerations), + validator.WithNodeSelector(cfg.nodeSelector), ) results, err := v.ValidatePhases(ctx, cfg.phases, rec, snap) @@ -379,13 +381,13 @@ func validateCmdFlags() []cli.Flag { }, &cli.StringSliceFlag{ Name: "node-selector", - Usage: "Node selector for snapshot agent Job scheduling (format: key=value, can be repeated).", - Category: "Agent Deployment", + Usage: "Override GPU node selection for validation workloads (format: key=value, can be repeated). Replaces platform-specific selectors on inner workloads (e.g., NCCL benchmark pods). Use when GPU nodes have non-standard labels. Does not affect the validator orchestrator Job.", + Category: "Scheduling", }, &cli.StringSliceFlag{ Name: "toleration", - Usage: "Toleration for snapshot agent and validation Job scheduling (format: key=value:effect). By default, all taints are tolerated.", - Category: "Agent Deployment", + Usage: "Override tolerations for validation workloads (format: key=value:effect, can be repeated). Replaces the default tolerate-all policy on inner workloads. 
Does not affect the validator orchestrator Job.", + Category: "Scheduling", }, &cli.DurationFlag{ Name: "timeout", @@ -540,6 +542,11 @@ Run validation without failing on check errors (informational mode): return errors.Wrap(errors.ErrCodeInvalidRequest, "invalid toleration", tolErr) } + nodeSelector, nsErr := snapshotter.ParseNodeSelectors(cmd.StringSlice("node-selector")) + if nsErr != nil { + return errors.Wrap(errors.ErrCodeInvalidRequest, "invalid node-selector", nsErr) + } + // Validate that requested phases are defined in the recipe. if err := validatePhasesAgainstRecipe(phases, rec); err != nil { return err @@ -561,6 +568,7 @@ Run validation without failing on check errors (informational mode): cleanup: !noCleanup, imagePullSecrets: cmd.StringSlice("image-pull-secret"), noCluster: cmd.Bool("no-cluster"), + nodeSelector: nodeSelector, tolerations: tolerations, evidenceDir: evidenceDir, }) diff --git a/pkg/defaults/timeouts.go b/pkg/defaults/timeouts.go index 558c042e4..3ed90afdb 100644 --- a/pkg/defaults/timeouts.go +++ b/pkg/defaults/timeouts.go @@ -227,6 +227,10 @@ const ( // to reach the Established condition after installation. TrainerCRDEstablishedTimeout = 2 * time.Minute + // TrainerControllerReadyTimeout is the time to wait for the Kubeflow Trainer + // controller-manager Deployment to have at least one ready replica after installation. + TrainerControllerReadyTimeout = 2 * time.Minute + // NCCLTrainJobTimeout is the maximum time to wait for the NCCL all-reduce TrainJob to complete. 
NCCLTrainJobTimeout = 30 * time.Minute diff --git a/pkg/defaults/timeouts_test.go b/pkg/defaults/timeouts_test.go index 8b545bd92..cbfe59d82 100644 --- a/pkg/defaults/timeouts_test.go +++ b/pkg/defaults/timeouts_test.go @@ -62,6 +62,9 @@ func TestTimeoutConstants(t *testing.T) { // Gang scheduling co-scheduling window {"CoScheduleWindow", CoScheduleWindow, 10 * time.Second, 60 * time.Second}, + // Trainer timeouts + {"TrainerControllerReadyTimeout", TrainerControllerReadyTimeout, 1 * time.Minute, 5 * time.Minute}, + // Validator timeouts {"ValidatorWaitBuffer", ValidatorWaitBuffer, 10 * time.Second, 60 * time.Second}, {"ValidatorDefaultTimeout", ValidatorDefaultTimeout, 1 * time.Minute, 15 * time.Minute}, diff --git a/pkg/snapshotter/agent.go b/pkg/snapshotter/agent.go index ccd087c92..08c57ac07 100644 --- a/pkg/snapshotter/agent.go +++ b/pkg/snapshotter/agent.go @@ -282,6 +282,11 @@ func ParseTolerations(tolerations []string) ([]corev1.Toleration, error) { result := make([]corev1.Toleration, 0, len(tolerations)) for _, t := range tolerations { + if t == "*" { + result = append(result, corev1.Toleration{Operator: corev1.TolerationOpExists}) + continue + } + // Format: key=value:effect or key:effect (for exists operator) var key, value, effect string diff --git a/pkg/snapshotter/agent_test.go b/pkg/snapshotter/agent_test.go index bb5f4895f..b818e0262 100644 --- a/pkg/snapshotter/agent_test.go +++ b/pkg/snapshotter/agent_test.go @@ -349,6 +349,12 @@ func TestParseTolerations(t *testing.T) { wantLen: 0, wantErr: true, }, + { + name: "wildcard toleration", + tolerations: []string{"*"}, + wantLen: 1, + wantErr: false, + }, } for _, tt := range tests { @@ -391,6 +397,14 @@ func TestParseTolerationsOperator(t *testing.T) { wantValue: "", wantEffect: corev1.TaintEffectNoExecute, }, + { + name: "wildcard toleration produces Exists with empty key", + toleration: "*", + wantOperator: corev1.TolerationOpExists, + wantKey: "", + wantValue: "", + wantEffect: "", + }, } for _, tt 
:= range tests { diff --git a/pkg/validator/job/deployer.go b/pkg/validator/job/deployer.go index 5b2343d57..e63a75696 100644 --- a/pkg/validator/job/deployer.go +++ b/pkg/validator/job/deployer.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "log/slog" + "slices" "strings" "time" @@ -49,6 +50,7 @@ type Deployer struct { jobName string // Unique name generated client-side (set by DeployJob) imagePullSecrets []string tolerations []corev1.Toleration + nodeSelector map[string]string // passed through to inner workloads via AICR_NODE_SELECTOR env var } // NewDeployer creates a Deployer for a single validator catalog entry. @@ -60,6 +62,7 @@ func NewDeployer( entry catalog.ValidatorEntry, imagePullSecrets []string, tolerations []corev1.Toleration, + nodeSelector map[string]string, ) *Deployer { return &Deployer{ @@ -70,6 +73,7 @@ func NewDeployer( entry: entry, imagePullSecrets: imagePullSecrets, tolerations: tolerations, + nodeSelector: nodeSelector, } } @@ -149,13 +153,7 @@ func (d *Deployer) buildApplyConfig() *applybatchv1.JobApplyConfiguration { labels.Component: labels.ValueValidation, labels.Validator: d.entry.Name, }). - WithSpec(applycorev1.PodSpec(). - WithServiceAccountName(ServiceAccountName). - WithRestartPolicy(corev1.RestartPolicyNever). - WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())). - WithImagePullSecrets(d.buildImagePullSecretsApply()...). - WithTolerations(d.buildTolerationsApply()...). - WithAffinity(preferCPUNodeAffinityApply()). + WithSpec(d.buildPodSpecApply(). WithContainers(applycorev1.Container(). WithName("validator"). WithImage(d.entry.Image). 
@@ -191,7 +189,7 @@ func (d *Deployer) buildApplyConfig() *applybatchv1.JobApplyConfiguration { } func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration { - orchestratorEnvCount := 6 + orchestratorEnvCount := 8 env := make([]*applycorev1.EnvVarApplyConfiguration, 0, orchestratorEnvCount+len(d.entry.Env)) env = append(env, applycorev1.EnvVar().WithName("AICR_SNAPSHOT_PATH").WithValue("/data/snapshot/snapshot.yaml"), @@ -203,28 +201,62 @@ func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration { WithValueFrom(applycorev1.EnvVarSource(). WithFieldRef(applycorev1.ObjectFieldSelector().WithFieldPath("metadata.namespace"))), ) + // Pass scheduling overrides to the validator container so it can apply them + // to the inner workloads it creates (e.g., NCCL benchmark pods). These env + // vars are NOT used to schedule the orchestrator Job itself. + if len(d.nodeSelector) > 0 { + env = append(env, applycorev1.EnvVar().WithName("AICR_NODE_SELECTOR").WithValue(serializeNodeSelector(d.nodeSelector))) + } + if len(d.tolerations) > 0 { + env = append(env, applycorev1.EnvVar().WithName("AICR_TOLERATIONS").WithValue(serializeTolerations(d.tolerations))) + } for _, e := range d.entry.Env { env = append(env, applycorev1.EnvVar().WithName(e.Name).WithValue(e.Value)) } return env } -// imagePullPolicy returns the appropriate pull policy based on the image reference. -// Local images (ko.local, kind.local, localhost) always use IfNotPresent since they -// are side-loaded into the cluster and cannot be pulled from a registry. -// Remote images with :latest tag use Always to avoid stale cached images. -func (d *Deployer) imagePullPolicy() corev1.PullPolicy { - img := d.entry.Image - // Local images side-loaded into kind/nvkind — never pull from registry. 
- if strings.HasPrefix(img, "ko.local") || - strings.HasPrefix(img, "kind.local") || - strings.HasPrefix(img, "localhost/") || - strings.HasPrefix(img, "localhost:") { - - return corev1.PullIfNotPresent +// serializeNodeSelector encodes a nodeSelector map as a comma-separated key=value string. +// Keys are sorted for deterministic output. This matches the format expected by +// snapshotter.ParseNodeSelectors on the receiving end. +func serializeNodeSelector(ns map[string]string) string { + keys := make([]string, 0, len(ns)) + for k := range ns { + keys = append(keys, k) + } + slices.Sort(keys) + pairs := make([]string, 0, len(ns)) + for _, k := range keys { + pairs = append(pairs, k+"="+ns[k]) } + return strings.Join(pairs, ",") +} - if strings.HasSuffix(img, ":latest") { +// serializeTolerations encodes tolerations as a comma-separated list. +// Format per toleration: key=value:Effect or key:Effect (for tolerations without value). +// This matches the format expected by snapshotter.ParseTolerations on the receiving end. +func serializeTolerations(tols []corev1.Toleration) string { + parts := make([]string, 0, len(tols)) + for _, t := range tols { + var part string + switch { + case t.Key == "" && t.Operator == corev1.TolerationOpExists: + part = "*" + case t.Value != "": + part = t.Key + "=" + t.Value + ":" + string(t.Effect) + default: + part = t.Key + ":" + string(t.Effect) + } + parts = append(parts, part) + } + return strings.Join(parts, ",") +} + +// imagePullPolicy returns Always when the image uses :latest tag (dev builds), +// PullIfNotPresent otherwise. This ensures dev builds always pull fresh images +// and avoids exec format errors from stale cached images on cluster nodes. 
+func (d *Deployer) imagePullPolicy() corev1.PullPolicy { + if strings.HasSuffix(d.entry.Image, ":latest") { return corev1.PullAlways } return corev1.PullIfNotPresent @@ -238,26 +270,17 @@ func (d *Deployer) buildImagePullSecretsApply() []*applycorev1.LocalObjectRefere return refs } -func (d *Deployer) buildTolerationsApply() []*applycorev1.TolerationApplyConfiguration { - tols := make([]*applycorev1.TolerationApplyConfiguration, 0, len(d.tolerations)) - for i := range d.tolerations { - t := &d.tolerations[i] - tol := applycorev1.Toleration().WithOperator(t.Operator) - if t.Key != "" { - tol = tol.WithKey(t.Key) - } - if t.Value != "" { - tol = tol.WithValue(t.Value) - } - if t.Effect != "" { - tol = tol.WithEffect(t.Effect) - } - if t.TolerationSeconds != nil { - tol = tol.WithTolerationSeconds(*t.TolerationSeconds) - } - tols = append(tols, tol) - } - return tols +func (d *Deployer) buildPodSpecApply() *applycorev1.PodSpecApplyConfiguration { + // The orchestrator Job always tolerates all taints so it can schedule on any + // available CPU node. User-provided tolerations (--toleration flag) are forwarded + // to inner workloads via AICR_TOLERATIONS and do not affect orchestrator scheduling. + return applycorev1.PodSpec(). + WithServiceAccountName(ServiceAccountName). + WithRestartPolicy(corev1.RestartPolicyNever). + WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())). + WithImagePullSecrets(d.buildImagePullSecretsApply()...). + WithTolerations(applycorev1.Toleration().WithOperator(corev1.TolerationOpExists)). 
+ WithAffinity(preferCPUNodeAffinityApply()) } // WaitForCompletion watches the Job until it reaches a terminal state diff --git a/pkg/validator/job/deployer_test.go b/pkg/validator/job/deployer_test.go index 3fecfbc76..6d84672e5 100644 --- a/pkg/validator/job/deployer_test.go +++ b/pkg/validator/job/deployer_test.go @@ -55,7 +55,7 @@ func deployAndGet(t *testing.T, d *Deployer) *batchv1.Job { } func TestJobNameEmptyBeforeDeploy(t *testing.T) { - d := NewDeployer(nil, nil, "default", "run123", testEntry(), nil, nil) + d := NewDeployer(nil, nil, "default", "run123", testEntry(), nil, nil, nil) if d.JobName() != "" { t.Errorf("JobName() before deploy = %q, want empty", d.JobName()) } @@ -63,7 +63,7 @@ func TestJobNameEmptyBeforeDeploy(t *testing.T) { func TestGenerateJobName(t *testing.T) { ns := createUniqueNamespace(t) - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) job := deployAndGet(t, d) if !strings.HasPrefix(job.Name, "aicr-gpu-operator-health-") { @@ -78,8 +78,8 @@ func TestDeployJobUniqueNames(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d1 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) - d2 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d1 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) + d2 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d1.DeployJob(ctx); err != nil { t.Fatalf("first DeployJob() failed: %v", err) @@ -95,7 +95,7 @@ func TestDeployJobUniqueNames(t *testing.T) { func TestDeployJobSSAFieldManager(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, 
testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) found := false for _, ref := range job.ManagedFields { @@ -111,7 +111,7 @@ func TestDeployJobSSAFieldManager(t *testing.T) { func TestDeployJobLabels(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) expectedLabels := map[string]string{ "app.kubernetes.io/name": "aicr", @@ -138,7 +138,7 @@ func TestDeployJobLabels(t *testing.T) { func TestDeployJobTimeouts(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) if job.Spec.ActiveDeadlineSeconds == nil || *job.Spec.ActiveDeadlineSeconds != 120 { t.Errorf("ActiveDeadlineSeconds = %v, want 120", job.Spec.ActiveDeadlineSeconds) @@ -163,7 +163,7 @@ func TestDeployJobDefaultTimeout(t *testing.T) { ns := createUniqueNamespace(t) entry := testEntry() entry.Timeout = 0 - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil, nil)) expected := int64(defaults.ValidatorDefaultTimeout.Seconds()) if job.Spec.ActiveDeadlineSeconds == nil || *job.Spec.ActiveDeadlineSeconds != expected { @@ -173,7 +173,7 @@ func TestDeployJobDefaultTimeout(t *testing.T) { func TestDeployJobContainer(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) containers := job.Spec.Template.Spec.Containers if 
len(containers) != 1 { @@ -203,7 +203,7 @@ func TestDeployJobContainer(t *testing.T) { func TestDeployJobResources(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) c := job.Spec.Template.Spec.Containers[0] if c.Resources.Requests.Cpu().String() != "1" { @@ -222,7 +222,7 @@ func TestDeployJobResources(t *testing.T) { func TestDeployJobEnvVars(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) env := job.Spec.Template.Spec.Containers[0].Env envMap := make(map[string]corev1.EnvVar) @@ -258,7 +258,7 @@ func TestDeployJobEnvVars(t *testing.T) { func TestDeployJobVolumes(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) volumes := job.Spec.Template.Spec.Volumes if len(volumes) != 2 { @@ -283,7 +283,7 @@ func TestDeployJobVolumes(t *testing.T) { func TestDeployJobAffinity(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) affinity := job.Spec.Template.Spec.Affinity if affinity == nil || affinity.NodeAffinity == nil { @@ -307,7 +307,7 @@ func TestDeployJobAffinity(t *testing.T) { func TestDeployJobImagePullSecrets(t *testing.T) { ns := createUniqueNamespace(t) secrets := []string{"registry-creds", "other-secret"} - job 
:= deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), secrets, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), secrets, nil, nil)) ips := job.Spec.Template.Spec.ImagePullSecrets if len(ips) != 2 { @@ -318,20 +318,86 @@ func TestDeployJobImagePullSecrets(t *testing.T) { } } -func TestDeployJobTolerations(t *testing.T) { +func TestDeployJobOrchestratorToleratesTolerateAll(t *testing.T) { + // The orchestrator Job must always have tolerate-all so it can schedule on + // any CPU node, regardless of what tolerations are passed for inner workloads. + tests := []struct { + name string + tolerations []corev1.Toleration + }{ + {"nil tolerations", nil}, + {"narrow GPU toleration", []corev1.Toleration{{Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}}}, + {"explicit tolerate-all", []corev1.Toleration{{Operator: corev1.TolerationOpExists}}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ns := createUniqueNamespace(t) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tt.tolerations, nil)) + tols := job.Spec.Template.Spec.Tolerations + if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists || tols[0].Key != "" { + t.Errorf("orchestrator tolerations = %v, want single tolerate-all {Operator: Exists}", tols) + } + }) + } +} + +func TestDeployJobNodeSelectorEnvVar(t *testing.T) { + ns := createUniqueNamespace(t) + // Use a single-key selector to avoid map ordering issues in serialization. 
+ nodeSelector := map[string]string{"my-org/gpu-pool": "true"} + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nodeSelector)) + + env := job.Spec.Template.Spec.Containers[0].Env + envMap := make(map[string]corev1.EnvVar) + for _, e := range env { + envMap[e.Name] = e + } + + // AICR_NODE_SELECTOR must be set so the validator container can apply it to inner workloads. + if envMap["AICR_NODE_SELECTOR"].Value != "my-org/gpu-pool=true" { + t.Errorf("AICR_NODE_SELECTOR = %q, want %q", envMap["AICR_NODE_SELECTOR"].Value, "my-org/gpu-pool=true") + } + + // The orchestrator Job pod spec must NOT have a nodeSelector — scheduling of the + // orchestrator is handled by preferCPUNodeAffinityApply(), not the user flag. + if len(job.Spec.Template.Spec.NodeSelector) != 0 { + t.Errorf("orchestrator pod spec nodeSelector should be empty, got %v", job.Spec.Template.Spec.NodeSelector) + } +} + +func TestDeployJobNodeSelectorEnvVarAbsent(t *testing.T) { + ns := createUniqueNamespace(t) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) + + for _, e := range job.Spec.Template.Spec.Containers[0].Env { + if e.Name == "AICR_NODE_SELECTOR" { + t.Errorf("AICR_NODE_SELECTOR should be absent when nodeSelector is nil, got %q", e.Value) + } + } +} + +func TestDeployJobTolerationsEnvVar(t *testing.T) { ns := createUniqueNamespace(t) - tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}} - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tolerations)) + tolerations := []corev1.Toleration{ + {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + } + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tolerations, nil)) + + env := job.Spec.Template.Spec.Containers[0].Env + envMap := 
make(map[string]corev1.EnvVar) + for _, e := range env { + envMap[e.Name] = e + } - tols := job.Spec.Template.Spec.Tolerations - if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists { - t.Errorf("tolerations = %v, want tolerate-all", tols) + // AICR_TOLERATIONS must be set so validators can apply it to inner workloads. + if envMap["AICR_TOLERATIONS"].Value != "gpu-type=h100:NoSchedule" { + t.Errorf("AICR_TOLERATIONS = %q, want %q", envMap["AICR_TOLERATIONS"].Value, "gpu-type=h100:NoSchedule") } } func TestDeployJobPodSpec(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) podSpec := job.Spec.Template.Spec if podSpec.ServiceAccountName != ServiceAccountName { @@ -346,7 +412,7 @@ func TestCleanupJob(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(ctx); err != nil { t.Fatalf("DeployJob() failed: %v", err) } @@ -367,7 +433,7 @@ func TestCleanupJob(t *testing.T) { } func TestCleanupJobNotFound(t *testing.T) { - d := NewDeployer(testClientset, nil, "default", "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, nil, "default", "run1", testEntry(), nil, nil, nil) // jobName is empty — CleanupJob should return nil if err := d.CleanupJob(context.Background()); err != nil { t.Fatalf("CleanupJob() on empty jobName should not error, got: %v", err) @@ -443,7 +509,7 @@ func TestWaitForCompletionFastPath(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, 
ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(ctx); err != nil { t.Fatalf("DeployJob() failed: %v", err) } @@ -481,7 +547,7 @@ func TestWaitForCompletionFastPathFailed(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(ctx); err != nil { t.Fatalf("DeployJob() failed: %v", err) } @@ -507,7 +573,7 @@ func TestWaitForCompletionFastPathFailed(t *testing.T) { func TestWaitForCompletionJobNotFound(t *testing.T) { ns := createUniqueNamespace(t) - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) d.jobName = "nonexistent-job" err := d.WaitForCompletion(context.Background(), 1*time.Minute) @@ -524,11 +590,6 @@ func TestImagePullPolicy(t *testing.T) { }{ {"latest tag uses Always", "ghcr.io/nvidia/aicr-validators/conformance:latest", corev1.PullAlways}, {"versioned tag uses IfNotPresent", "ghcr.io/nvidia/aicr-validators/conformance:v1.0.0", corev1.PullIfNotPresent}, - {"ko.local uses IfNotPresent", "ko.local:smoke-test", corev1.PullIfNotPresent}, - {"ko.local latest uses IfNotPresent", "ko.local:latest", corev1.PullIfNotPresent}, - {"kind.local uses IfNotPresent", "kind.local/validator:latest", corev1.PullIfNotPresent}, - {"localhost registry uses IfNotPresent", "localhost:5000/validator:latest", corev1.PullIfNotPresent}, - {"localhost path uses IfNotPresent", "localhost/validator:latest", corev1.PullIfNotPresent}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -546,7 +607,7 @@ func TestImagePullPolicy(t *testing.T) { func TestWaitForCompletionTimeout(t *testing.T) { ns := createUniqueNamespace(t) - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, 
nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(context.Background()); err != nil { t.Fatalf("DeployJob() failed: %v", err) } diff --git a/pkg/validator/job/result_test.go b/pkg/validator/job/result_test.go index 82e3c233b..5306bd64f 100644 --- a/pkg/validator/job/result_test.go +++ b/pkg/validator/job/result_test.go @@ -60,7 +60,7 @@ func createPodForJob(t *testing.T, ns, jobName string, status corev1.PodStatus) // deployTestJob deploys a Job via envtest and returns the Deployer. func deployTestJob(t *testing.T, ns string, entry catalog.ValidatorEntry) *Deployer { t.Helper() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil, nil) if err := d.DeployJob(context.Background()); err != nil { t.Fatalf("DeployJob() failed: %v", err) } diff --git a/pkg/validator/options.go b/pkg/validator/options.go index 55975d1c1..02c4f7f0c 100644 --- a/pkg/validator/options.go +++ b/pkg/validator/options.go @@ -65,10 +65,20 @@ func WithNoCluster(noCluster bool) Option { } } -// WithTolerations sets tolerations for validator Jobs. -// Default: tolerate-all. +// WithTolerations sets tolerations to override inner workload scheduling. +// When set, validators pass these tolerations to the workloads they create (e.g., NCCL +// benchmark pods), replacing default tolerate-all policy. Does not affect the orchestrator Job. func WithTolerations(tolerations []corev1.Toleration) Option { return func(v *Validator) { v.Tolerations = tolerations } } + +// WithNodeSelector sets node selector labels to override inner workload scheduling. +// When set, validators pass these selectors to the workloads they create (e.g., NCCL +// benchmark pods), replacing platform-specific defaults. Does not affect the orchestrator Job. 
+func WithNodeSelector(nodeSelector map[string]string) Option { + return func(v *Validator) { + v.NodeSelector = nodeSelector + } +} diff --git a/pkg/validator/types.go b/pkg/validator/types.go index 8f467c3f0..72856af51 100644 --- a/pkg/validator/types.go +++ b/pkg/validator/types.go @@ -46,8 +46,15 @@ type Validator struct { // NoCluster controls whether to skip cluster operations (dry-run mode). NoCluster bool - // Tolerations are applied to validator Jobs for scheduling. + // Tolerations are passed to validation workloads (e.g., NCCL benchmark pods) + // to override their default scheduling constraints. Does not affect the + // orchestrator Job itself. Tolerations []corev1.Toleration + + // NodeSelector is passed to validation workloads (e.g., NCCL benchmark pods) + // to override platform-specific node selectors. Use when GPU nodes have + // non-standard labels. Does not affect the orchestrator Job itself. + NodeSelector map[string]string } // PhaseResult is the outcome of running all validators in a single phase. diff --git a/pkg/validator/validator.go b/pkg/validator/validator.go index 3c53444c9..d01aa2698 100644 --- a/pkg/validator/validator.go +++ b/pkg/validator/validator.go @@ -322,7 +322,7 @@ func (v *Validator) runPhase( deployer := job.NewDeployer( clientset, factory, v.Namespace, v.RunID, entry, - v.ImagePullSecrets, v.Tolerations, + v.ImagePullSecrets, v.Tolerations, v.NodeSelector, ) // Deploy diff --git a/recipes/overlays/b200-any-training.yaml b/recipes/overlays/b200-any-training.yaml new file mode 100644 index 000000000..44e5bb5d9 --- /dev/null +++ b/recipes/overlays/b200-any-training.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kind: RecipeMetadata +apiVersion: aicr.nvidia.com/v1alpha1 +metadata: + name: b200-any-training + +spec: + base: base + + criteria: + service: any + accelerator: b200 + intent: training + + validation: + performance: + checks: + - nccl-all-reduce-bw + constraints: + - name: nccl-all-reduce-bw + value: ">= 350" diff --git a/recipes/overlays/gb200-any-training.yaml b/recipes/overlays/gb200-any-training.yaml new file mode 100644 index 000000000..862a54e43 --- /dev/null +++ b/recipes/overlays/gb200-any-training.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +kind: RecipeMetadata +apiVersion: aicr.nvidia.com/v1alpha1 +metadata: + name: gb200-any-training + +spec: + base: base + + criteria: + service: any + accelerator: gb200 + intent: training + + validation: + performance: + checks: + - nccl-all-reduce-bw + constraints: + - name: nccl-all-reduce-bw + value: ">= 720" diff --git a/validators/conformance/dra_support_check.go b/validators/conformance/dra_support_check.go index 0cfc84cc8..2a10cfe36 100644 --- a/validators/conformance/dra_support_check.go +++ b/validators/conformance/dra_support_check.go @@ -148,7 +148,7 @@ func validateDRAAllocation(ctx *validators.Context, dynClient dynamic.Interface) fmt.Sprintf("Created Namespace=%s ResourceClaim=%s Pod=%s via Kubernetes API", draTestNamespace, run.claimName, run.podName)) - if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run); err != nil { + if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run, ctx.Tolerations); err != nil { return err } defer func() { //nolint:contextcheck // Fresh context: parent may be canceled during cleanup diff --git a/validators/conformance/gang_scheduling_check.go b/validators/conformance/gang_scheduling_check.go index 28d49b0c7..2bfda2612 100644 --- a/validators/conformance/gang_scheduling_check.go +++ b/validators/conformance/gang_scheduling_check.go @@ -195,7 +195,7 @@ func CheckGangScheduling(ctx *validators.Context) error { fmt.Sprintf("Created PodGroup=%s ResourceClaims=%s,%s Pods=%s,%s in namespace=%s", run.groupName, run.claims[0], run.claims[1], run.pods[0], run.pods[1], gangTestNamespace)) - if err = deployGangTestResources(ctx.Ctx, ctx.Clientset, dynClient, run); err != nil { + if err = deployGangTestResources(ctx.Ctx, ctx.Clientset, dynClient, run, ctx.Tolerations); err != nil { return err } @@ -275,7 +275,8 @@ func collectGangTestArtifacts(ctx *validators.Context, dynClient dynamic.Interfa } // deployGangTestResources creates the namespace, PodGroup, ResourceClaims, and Pods. 
-func deployGangTestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *gangTestRun) error { +// tolerations, when non-nil, replace the default tolerate-all policy on test pods. +func deployGangTestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *gangTestRun, tolerations []corev1.Toleration) error { // 1. Create namespace (idempotent). ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{Name: gangTestNamespace}, @@ -300,7 +301,7 @@ func deployGangTestResources(ctx context.Context, clientset kubernetes.Interface fmt.Sprintf("failed to create ResourceClaim %s", run.claims[i]), err) } - pod := buildGangTestPod(run, i) + pod := buildGangTestPod(run, i, tolerations) if _, err := clientset.CoreV1().Pods(gangTestNamespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { return errors.Wrap(errors.ErrCodeInternal, fmt.Sprintf("failed to create gang test pod %s", run.pods[i]), err) @@ -505,7 +506,11 @@ func buildGangResourceClaim(run *gangTestRun, index int) *unstructured.Unstructu } // buildGangTestPod returns the Pod spec for a gang scheduling test worker. -func buildGangTestPod(run *gangTestRun, index int) *corev1.Pod { +// tolerations, when non-nil, replace the default tolerate-all policy. 
+func buildGangTestPod(run *gangTestRun, index int, tolerations []corev1.Toleration) *corev1.Pod { + if tolerations == nil { + tolerations = []corev1.Toleration{{Operator: corev1.TolerationOpExists}} + } return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: run.pods[index], @@ -518,9 +523,7 @@ func buildGangTestPod(run *gangTestRun, index int) *corev1.Pod { Spec: corev1.PodSpec{ SchedulerName: "kai-scheduler", RestartPolicy: corev1.RestartPolicyNever, - Tolerations: []corev1.Toleration{ - {Operator: corev1.TolerationOpExists}, - }, + Tolerations: tolerations, ResourceClaims: []corev1.PodResourceClaim{ { Name: "gpu", diff --git a/validators/conformance/secure_access_check.go b/validators/conformance/secure_access_check.go index 9ceadb41a..bed3ad454 100644 --- a/validators/conformance/secure_access_check.go +++ b/validators/conformance/secure_access_check.go @@ -112,7 +112,7 @@ func CheckSecureAcceleratorAccess(ctx *validators.Context) error { } // Deploy DRA test resources and ensure cleanup. - if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run); err != nil { + if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run, ctx.Tolerations); err != nil { return err } defer func() { //nolint:contextcheck // Fresh context: parent may be canceled during cleanup @@ -240,7 +240,8 @@ func collectSecureAccessBaselineArtifacts(ctx *validators.Context, dynClient dyn } // deployDRATestResources creates the namespace, ResourceClaim, and Pod for the DRA test. -func deployDRATestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *draTestRun) error { +// tolerations, when non-nil, replace the default tolerate-all policy on the test pod. +func deployDRATestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *draTestRun, tolerations []corev1.Toleration) error { // 1. Create namespace (idempotent). 
ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{Name: draTestNamespace}, @@ -257,7 +258,7 @@ func deployDRATestResources(ctx context.Context, clientset kubernetes.Interface, } // 3. Create Pod with unique name. - pod := buildDRATestPod(run) + pod := buildDRATestPod(run, tolerations) if _, err := clientset.CoreV1().Pods(draTestNamespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { return errors.Wrap(errors.ErrCodeInternal, "failed to create DRA test pod", err) } @@ -509,7 +510,11 @@ func cleanupDRATestResources(ctx context.Context, clientset kubernetes.Interface } // buildDRATestPod returns the Pod spec for the DRA GPU allocation test. -func buildDRATestPod(run *draTestRun) *corev1.Pod { +// tolerations, when non-nil, replace the default tolerate-all policy. +func buildDRATestPod(run *draTestRun, tolerations []corev1.Toleration) *corev1.Pod { + if tolerations == nil { + tolerations = []corev1.Toleration{{Operator: corev1.TolerationOpExists}} + } return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: run.podName, @@ -517,9 +522,7 @@ func buildDRATestPod(run *draTestRun) *corev1.Pod { }, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, - Tolerations: []corev1.Toleration{ - {Operator: corev1.TolerationOpExists}, - }, + Tolerations: tolerations, ResourceClaims: []corev1.PodResourceClaim{ { Name: "gpu", diff --git a/validators/context.go b/validators/context.go index bccc9a410..c265546b5 100644 --- a/validators/context.go +++ b/validators/context.go @@ -27,6 +27,7 @@ import ( "github.com/NVIDIA/aicr/pkg/recipe" "github.com/NVIDIA/aicr/pkg/serializer" "github.com/NVIDIA/aicr/pkg/snapshotter" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -57,6 +58,16 @@ type Context struct { // Namespace is the validation namespace. Namespace string + + // NodeSelector overrides platform-specific node selectors on inner workloads + // (e.g., NCCL benchmark worker pods). 
Nil means use the validator's default selectors. + // Set from the AICR_NODE_SELECTOR env var (comma-separated key=value pairs). + NodeSelector map[string]string + + // Tolerations overrides the default tolerate-all policy on inner workloads. + // Nil means use the validator's default tolerations. + // Set from the AICR_TOLERATIONS env var (comma-separated key=value:effect entries). + Tolerations []corev1.Toleration } // LoadContext creates a Context from the v2 container environment. @@ -103,6 +114,18 @@ func LoadContext() (*Context, error) { } } + // Parse optional scheduling overrides for inner workloads. + nodeSelector, err := parseNodeSelectorEnv() + if err != nil { + cancel() + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to parse AICR_NODE_SELECTOR", err) + } + tolerations, err := parseTolerationEnv() + if err != nil { + cancel() + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to parse AICR_TOLERATIONS", err) + } + return &Context{ Ctx: ctx, Cancel: cancel, @@ -112,9 +135,35 @@ func LoadContext() (*Context, error) { Snapshot: snap, Recipe: rec, Namespace: namespace, + NodeSelector: nodeSelector, + Tolerations: tolerations, }, nil } +// parseNodeSelectorEnv reads AICR_NODE_SELECTOR and parses it into a map. +// Returns nil (no override) if the env var is unset or empty. +func parseNodeSelectorEnv() (map[string]string, error) { + raw := os.Getenv("AICR_NODE_SELECTOR") + if raw == "" { + return nil, nil //nolint:nilnil // nil signals "not set" — callers check len to distinguish from empty + } + entries := strings.Split(raw, ",") + return snapshotter.ParseNodeSelectors(entries) +} + +// parseTolerationEnv reads AICR_TOLERATIONS and parses it into a slice of Tolerations. +// Returns nil (no override) if the env var is unset or empty. 
+func parseTolerationEnv() ([]corev1.Toleration, error) { + raw := os.Getenv("AICR_TOLERATIONS") + if raw == "" { + return nil, nil //nolint:nilnil // nil signals "not set" — callers check len to distinguish from empty + } + entries := strings.Split(raw, ",") + // Use ParseTolerations but we must not pass empty slice (it returns DefaultTolerations). + // Since we guard against empty raw above, entries will always be non-empty here. + return snapshotter.ParseTolerations(entries) +} + // Timeout returns a child context with the specified timeout. // The caller is responsible for calling the returned CancelFunc. func (c *Context) Timeout(d time.Duration) (context.Context, context.CancelFunc) { diff --git a/validators/context_test.go b/validators/context_test.go index 87c9ec894..d3b59b33a 100644 --- a/validators/context_test.go +++ b/validators/context_test.go @@ -16,6 +16,8 @@ package validators import ( "testing" + + corev1 "k8s.io/api/core/v1" ) func TestResolveNamespace(t *testing.T) { @@ -53,6 +55,114 @@ func TestResolveNamespace(t *testing.T) { } } +func TestParseNodeSelectorEnv(t *testing.T) { + tests := []struct { + name string + envVal string + expected map[string]string + wantErr bool + }{ + { + name: "env var unset returns nil", + envVal: "", + expected: nil, + }, + { + name: "single key=value pair", + envVal: "my-org/gpu-pool=true", + expected: map[string]string{"my-org/gpu-pool": "true"}, + }, + { + name: "multiple key=value pairs", + envVal: "accelerator=h100,pool=gpu", + expected: map[string]string{"accelerator": "h100", "pool": "gpu"}, + }, + { + name: "invalid format missing equals", + envVal: "invalidkey", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("AICR_NODE_SELECTOR", tt.envVal) + got, err := parseNodeSelectorEnv() + if (err != nil) != tt.wantErr { + t.Errorf("parseNodeSelectorEnv() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + if len(got) != len(tt.expected) 
{ + t.Errorf("parseNodeSelectorEnv() = %v, want %v", got, tt.expected) + return + } + for k, want := range tt.expected { + if got[k] != want { + t.Errorf("parseNodeSelectorEnv()[%q] = %q, want %q", k, got[k], want) + } + } + } + }) + } +} + +func TestParseTolerationEnv(t *testing.T) { + tests := []struct { + name string + envVal string + expected []corev1.Toleration + wantErr bool + }{ + { + name: "env var unset returns nil", + envVal: "", + expected: nil, + }, + { + name: "key=value:effect toleration", + envVal: "gpu-type=h100:NoSchedule", + expected: []corev1.Toleration{ + {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + }, + }, + { + name: "key:effect toleration (exists operator)", + envVal: "dedicated:NoExecute", + expected: []corev1.Toleration{ + {Key: "dedicated", Effect: corev1.TaintEffectNoExecute, Operator: corev1.TolerationOpExists}, + }, + }, + { + name: "invalid format", + envVal: "badformat", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("AICR_TOLERATIONS", tt.envVal) + got, err := parseTolerationEnv() + if (err != nil) != tt.wantErr { + t.Errorf("parseTolerationEnv() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + if len(got) != len(tt.expected) { + t.Errorf("parseTolerationEnv() = %v, want %v", got, tt.expected) + return + } + for i, want := range tt.expected { + if got[i].Key != want.Key || got[i].Value != want.Value || + got[i].Effect != want.Effect || got[i].Operator != want.Operator { + + t.Errorf("parseTolerationEnv()[%d] = %+v, want %+v", i, got[i], want) + } + } + } + }) + } +} + func TestEnvOrDefault(t *testing.T) { tests := []struct { name string diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index b9f02aa3a..5777c3e6e 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ 
b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -92,6 +92,7 @@ func templatePath(accelerator recipe.CriteriaAcceleratorType, service recipe.Cri var supportedNCCLCombinations = map[recipe.CriteriaServiceType][]recipe.CriteriaAcceleratorType{ recipe.CriteriaServiceEKS: {recipe.CriteriaAcceleratorH100}, recipe.CriteriaServiceGKE: {recipe.CriteriaAcceleratorH100}, + recipe.CriteriaServiceAny: {recipe.CriteriaAcceleratorB200, recipe.CriteriaAcceleratorGB200}, } // validateNcclAllReduceBw validates NCCL All Reduce bandwidth using Kubeflow TrainJob + MPI. @@ -309,6 +310,8 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface "MAX_MESSAGE_SIZE": maxMessageSize, } + var instanceType string + // For GKE, discover GPU NIC network names (cluster-specific prefixes). if service == recipe.CriteriaServiceGKE { gpuNICs, err := discoverGKEGPUNICNetworks(ctx.Ctx, dynamicClient) @@ -328,11 +331,11 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface // EFA count of 0 is valid — NCCL falls back to TCP (slower but functional). if service == recipe.CriteriaServiceEKS { warnIfHeterogeneousNodes(config.Nodes) - instanceType, efaCount, err := discoverEKSNodeConfig(config.Nodes[0]) + it, efaCount, err := discoverEKSNodeConfig(config.Nodes[0]) if err != nil { return err } - templateData["INSTANCE_TYPE"] = instanceType + instanceType = it // Indentation matches the resource block position in runtime.yaml. 
const efaIndent = " " templateData["EFA_RESOURCE_LIMITS"] = buildEFAResourceLine(efaCount, efaIndent) @@ -346,8 +349,33 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface } } - // Apply per-platform runtime: testdata/{accelerator}/{service}/runtime.yaml - if err := applyYAMLWithDynamicClient(ctx.Ctx, dynamicClient, trainingRuntimeGVR, config.Namespace, templatePath(accelerator, service, "runtime.yaml"), templateData); err != nil { + // Build effective worker scheduling: user override takes precedence over platform default. + defaultNodeSelector, defaultTolerations := platformWorkerScheduling(service, instanceType) + effectiveNodeSelector := defaultNodeSelector + if ctx.NodeSelector != nil { + effectiveNodeSelector = ctx.NodeSelector + slog.Info("Using user-provided node selector override for NCCL workers", "selector", ctx.NodeSelector) + } + effectiveTolerations := defaultTolerations + if ctx.Tolerations != nil { + effectiveTolerations = ctx.Tolerations + slog.Info("Using user-provided toleration override for NCCL workers", "count", len(ctx.Tolerations)) + } + + if service == recipe.CriteriaServiceAny && len(effectiveNodeSelector) == 0 { + return aicrErrors.New(aicrErrors.ErrCodeInvalidRequest, + "self-managed clusters (service=any) require --node-selector to identify GPU nodes "+ + "(e.g., --node-selector nvidia.com/gpu.present=true)") + } + + runtimeObj, err := parseYAMLTemplate(templatePath(accelerator, service, "runtime.yaml"), templateData) + if err != nil { + return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse training runtime template", err) + } + if err := applyNCCLWorkerScheduling(runtimeObj, effectiveNodeSelector, effectiveTolerations); err != nil { + return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply NCCL worker scheduling", err) + } + if err := createUnstructured(ctx.Ctx, dynamicClient, trainingRuntimeGVR, config.Namespace, runtimeObj); err != nil { return 
aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply training runtime", err) } slog.Info("Applied TrainingRuntime", "service", service) @@ -370,37 +398,150 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface } // applyYAMLWithDynamicClient reads a YAML template, performs substitution, and applies it using dynamic client -func applyYAMLWithDynamicClient(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace, templatePath string, data map[string]string) error { - content, err := os.ReadFile(templatePath) +func applyYAMLWithDynamicClient(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace, path string, data map[string]string) error { + obj, err := parseYAMLTemplate(path, data) if err != nil { - return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to read template", err) + return err } + return createUnstructured(ctx, dynamicClient, gvr, namespace, obj) +} - // Perform template substitution +// parseYAMLTemplate reads a YAML template file, performs ${KEY} substitution, +// and unmarshals it into an unstructured object. 
+func parseYAMLTemplate(path string, data map[string]string) (*unstructured.Unstructured, error) { + content, err := os.ReadFile(path) + if err != nil { + return nil, aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to read template", err) + } yamlContent := string(content) for key, value := range data { yamlContent = strings.ReplaceAll(yamlContent, "${"+key+"}", value) } - - // Parse YAML to unstructured object obj := &unstructured.Unstructured{} - if unmarshalErr := yaml.Unmarshal([]byte(yamlContent), obj); unmarshalErr != nil { - return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse YAML", unmarshalErr) + if err := yaml.Unmarshal([]byte(yamlContent), obj); err != nil { + return nil, aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse YAML", err) } + return obj, nil +} - // Apply with timeout +// createUnstructured creates a namespaced resource from an unstructured object with a timeout. +func createUnstructured(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string, obj *unstructured.Unstructured) error { applyCtx, cancel := context.WithTimeout(ctx, defaults.DiagnosticTimeout) defer cancel() - - // Create the resource - _, err = dynamicClient.Resource(gvr).Namespace(namespace).Create(applyCtx, obj, metav1.CreateOptions{}) + _, err := dynamicClient.Resource(gvr).Namespace(namespace).Create(applyCtx, obj, metav1.CreateOptions{}) if err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to create resource", err) } - return nil } +// platformWorkerScheduling returns the default nodeSelector and tolerations +// for NCCL worker pods on the given service. instanceType is only used for EKS. 
+func platformWorkerScheduling(service recipe.CriteriaServiceType, instanceType string) (map[string]string, []v1.Toleration) { + switch service { + case recipe.CriteriaServiceEKS: + return map[string]string{ + "node.kubernetes.io/instance-type": instanceType, + }, []v1.Toleration{{Operator: v1.TolerationOpExists}} + case recipe.CriteriaServiceGKE: + return map[string]string{ + "cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb", + }, []v1.Toleration{ + {Operator: v1.TolerationOpExists}, + {Key: "nvidia.com/gpu", Operator: v1.TolerationOpEqual, Value: "present", Effect: v1.TaintEffectNoSchedule}, + } + case recipe.CriteriaServiceAny, recipe.CriteriaServiceAKS, recipe.CriteriaServiceOKE, recipe.CriteriaServiceKind: + return nil, nil + default: + return nil, nil + } +} + +// applyNCCLWorkerScheduling sets the nodeSelector and tolerations on the "node" +// (worker) replicatedJob within a TrainingRuntime unstructured object. +func applyNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[string]string, tolerations []v1.Toleration) error { + replicatedJobs, found, err := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + if err != nil || !found { + return aicrErrors.New(aicrErrors.ErrCodeInternal, "replicatedJobs not found in TrainingRuntime") + } + + nodeJobFound := false + for i, jobRaw := range replicatedJobs { + jobMap, ok := jobRaw.(map[string]interface{}) + if !ok { + continue + } + name, _, _ := unstructured.NestedString(jobMap, "name") + if name != "node" { + continue + } + nodeJobFound = true + + // Navigate deep into the worker pod spec. 
+ workerPodSpec, found := nestedMap(jobMap, "template", "spec", "template", "spec") + if !found { + return aicrErrors.New(aicrErrors.ErrCodeInternal, "worker pod spec not found in TrainingRuntime node job") + } + + if len(nodeSelector) > 0 { + ns := make(map[string]interface{}, len(nodeSelector)) + for k, v := range nodeSelector { + ns[k] = v + } + workerPodSpec["nodeSelector"] = ns + slog.Info("Applying NCCL worker nodeSelector", "selector", nodeSelector) + } + + if len(tolerations) > 0 { + tolList := make([]interface{}, 0, len(tolerations)) + for _, t := range tolerations { + tolMap := map[string]interface{}{ + "operator": string(t.Operator), + } + if t.Key != "" { + tolMap["key"] = t.Key + } + if t.Value != "" { + tolMap["value"] = t.Value + } + if t.Effect != "" { + tolMap["effect"] = string(t.Effect) + } + tolList = append(tolList, tolMap) + } + workerPodSpec["tolerations"] = tolList + slog.Info("Applying NCCL worker tolerations", "count", len(tolerations)) + } + + replicatedJobs[i] = jobMap + break + } + + if !nodeJobFound { + return aicrErrors.New(aicrErrors.ErrCodeInternal, `replicatedJob "node" not found in TrainingRuntime`) + } + + return unstructured.SetNestedSlice(obj.Object, replicatedJobs, "spec", "template", "spec", "replicatedJobs") +} + +// nestedMap navigates a chain of string keys through nested map[string]interface{} values. +// Returns the target map and true if found, nil and false otherwise. 
+func nestedMap(m map[string]interface{}, keys ...string) (map[string]interface{}, bool) { + current := m + for _, key := range keys { + next, ok := current[key] + if !ok { + return nil, false + } + nextMap, ok := next.(map[string]interface{}) + if !ok { + return nil, false + } + current = nextMap + } + return current, true +} + // waitForLauncherPodAndGetLogs waits for the launcher pod to be created and retrieves logs func waitForLauncherPodAndGetLogs(ctx *validators.Context, podHelper *helper.PodLifecycle) (string, error) { slog.Info("Waiting for launcher pod to be created...") diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index 55a38f3f1..187beea68 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -21,9 +21,231 @@ import ( "testing" "github.com/NVIDIA/aicr/pkg/recipe" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) +const testWorkerJobName = "node" + +func TestApplyNCCLWorkerScheduling_NodeSelector(t *testing.T) { + // Build a minimal TrainingRuntime-like unstructured object matching the real template structure. 
+ workerPodSpec := map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "node.kubernetes.io/instance-type": "p5.48xlarge", + }, + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, + } + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{"name": "launcher"}, + map[string]interface{}{ + "name": testWorkerJobName, + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": workerPodSpec, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + nodeSelector := map[string]string{"my-org/gpu-pool": "true"} + if err := applyNCCLWorkerScheduling(obj, nodeSelector, nil); err != nil { + t.Fatalf("applyNCCLWorkerScheduling() error = %v", err) + } + + // Verify the nodeSelector was replaced in the worker spec. + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + for _, j := range jobs { + jm, _ := j.(map[string]interface{}) + name, _, _ := unstructured.NestedString(jm, "name") + if name != testWorkerJobName { + continue + } + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["my-org/gpu-pool"] != "true" { + t.Errorf("worker nodeSelector = %v, want my-org/gpu-pool=true", ns) + } + if _, hasOld := ns["node.kubernetes.io/instance-type"]; hasOld { + t.Error("old instance-type selector should have been replaced") + } + } +} + +func TestApplyNCCLWorkerScheduling_Tolerations(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{"name": "launcher"}, + map[string]interface{}{ + "name": 
testWorkerJobName, + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb", + }, + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + tolerations := []corev1.Toleration{ + {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + } + if err := applyNCCLWorkerScheduling(obj, nil, tolerations); err != nil { + t.Fatalf("applyNCCLWorkerScheduling() error = %v", err) + } + + // nodeSelector should be unchanged (only tolerations overridden). + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + for _, j := range jobs { + jm, _ := j.(map[string]interface{}) + name, _, _ := unstructured.NestedString(jm, "name") + if name != testWorkerJobName { + continue + } + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["cloud.google.com/gke-accelerator"] != "nvidia-h100-mega-80gb" { + t.Errorf("nodeSelector should be unchanged, got %v", ns) + } + tolsRaw, _, _ := unstructured.NestedSlice(jm, "template", "spec", "template", "spec", "tolerations") + if len(tolsRaw) != 1 { + t.Fatalf("tolerations count = %d, want 1", len(tolsRaw)) + } + tol, _ := tolsRaw[0].(map[string]interface{}) + if tol["key"] != "gpu-type" || tol["value"] != "h100" || tol["effect"] != "NoSchedule" { + t.Errorf("toleration = %v, want gpu-type=h100:NoSchedule", tol) + } + } +} + +func TestApplyNCCLWorkerScheduling_Both(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{"name": 
"launcher"}, + map[string]interface{}{ + "name": testWorkerJobName, + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "node.kubernetes.io/instance-type": "p5.48xlarge", + }, + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + nodeSelector := map[string]string{"custom-label/gpu": "a100"} + tolerations := []corev1.Toleration{ + {Key: "custom-taint", Value: "true", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + } + if err := applyNCCLWorkerScheduling(obj, nodeSelector, tolerations); err != nil { + t.Fatalf("applyNCCLWorkerScheduling() error = %v", err) + } + + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + for _, j := range jobs { + jm, _ := j.(map[string]interface{}) + name, _, _ := unstructured.NestedString(jm, "name") + if name != testWorkerJobName { + continue + } + // Verify nodeSelector was replaced. + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["custom-label/gpu"] != "a100" { + t.Errorf("worker nodeSelector = %v, want custom-label/gpu=a100", ns) + } + if _, hasOld := ns["node.kubernetes.io/instance-type"]; hasOld { + t.Error("old instance-type selector should have been replaced") + } + // Verify tolerations were replaced. 
+ tolsRaw, _, _ := unstructured.NestedSlice(jm, "template", "spec", "template", "spec", "tolerations") + if len(tolsRaw) != 1 { + t.Fatalf("tolerations count = %d, want 1", len(tolsRaw)) + } + tol, _ := tolsRaw[0].(map[string]interface{}) + if tol["key"] != "custom-taint" || tol["value"] != "true" || tol["effect"] != "NoSchedule" { + t.Errorf("toleration = %v, want custom-taint=true:NoSchedule", tol) + } + } +} + +func TestPlatformWorkerScheduling(t *testing.T) { + t.Run("EKS returns instance-type selector", func(t *testing.T) { + ns, tols := platformWorkerScheduling(recipe.CriteriaServiceEKS, "p5.48xlarge") + if ns["node.kubernetes.io/instance-type"] != "p5.48xlarge" { + t.Errorf("EKS nodeSelector = %v, want instance-type=p5.48xlarge", ns) + } + if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists { + t.Errorf("EKS tolerations = %v, want tolerate-all", tols) + } + }) + t.Run("GKE returns gke-accelerator selector", func(t *testing.T) { + ns, tols := platformWorkerScheduling(recipe.CriteriaServiceGKE, "") + if ns["cloud.google.com/gke-accelerator"] != "nvidia-h100-mega-80gb" { + t.Errorf("GKE nodeSelector = %v, want gke-accelerator=nvidia-h100-mega-80gb", ns) + } + if len(tols) != 2 { + t.Errorf("GKE tolerations count = %d, want 2", len(tols)) + } + }) + t.Run("unknown service returns nil", func(t *testing.T) { + ns, tols := platformWorkerScheduling("unknown", "") + if ns != nil || tols != nil { + t.Errorf("unknown service should return nil, got ns=%v tols=%v", ns, tols) + } + }) + t.Run("any service returns nil", func(t *testing.T) { + ns, tols := platformWorkerScheduling(recipe.CriteriaServiceAny, "") + if ns != nil || tols != nil { + t.Errorf("any service should return nil, got ns=%v tols=%v", ns, tols) + } + }) +} + func TestTemplatePath(t *testing.T) { tests := []struct { name string @@ -53,6 +275,20 @@ func TestTemplatePath(t *testing.T) { filename: "runtime.yaml", expected: filepath.Join("testdata", "gb200", "gke", "runtime.yaml"), }, + { + 
name: "b200 any runtime", + accelerator: recipe.CriteriaAcceleratorB200, + service: recipe.CriteriaServiceAny, + filename: "runtime.yaml", + expected: filepath.Join("testdata", "b200", "any", "runtime.yaml"), + }, + { + name: "gb200 any runtime", + accelerator: recipe.CriteriaAcceleratorGB200, + service: recipe.CriteriaServiceAny, + filename: "runtime.yaml", + expected: filepath.Join("testdata", "gb200", "any", "runtime.yaml"), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/validators/performance/testdata/b200/any/runtime.yaml b/validators/performance/testdata/b200/any/runtime.yaml new file mode 100644 index 000000000..5d38929d5 --- /dev/null +++ b/validators/performance/testdata/b200/any/runtime.yaml @@ -0,0 +1,181 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Self-managed (service=any) NCCL All-Reduce TrainingRuntime — InfiniBand/RDMA. +# +# Simplified runtime for bare-metal / self-managed clusters using InfiniBand. +# No EFA, no TCPXO, no cloud-specific sidecars or resources. +# Requires --node-selector to identify GPU nodes (no standard label exists). +# +# Must stay in sync with ncclTrainingRuntimeName in nccl_all_reduce_bw_constraint.go. 
+ +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: TrainingRuntime +metadata: + name: nccl-all-reduce-runtime + namespace: ${NAMESPACE} + labels: + trainer.kubeflow.org/framework: mpi +spec: + mlPolicy: + mpi: + mpiImplementation: OpenMPI + numProcPerNode: ${GPU_COUNT_PER_NODE} + runLauncherAsNode: false + sshAuthMountPath: /tmp/mpi-keys + template: + spec: + network: + enableDNSHostnames: true + publishNotReadyAddresses: true + replicatedJobs: + - name: launcher + replicas: 1 + template: + spec: + template: + spec: + tolerations: + - operator: Exists + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: + - /bin/sh + - -c + - | + mkdir -p /root/.ssh + cp /tmp/mpi-keys/id_rsa /root/.ssh/id_rsa + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys + chmod 700 /root/.ssh + chmod 600 /root/.ssh/id_rsa /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + env: + - name: LD_LIBRARY_PATH + value: "/usr/local/nvidia/lib64:/usr/local/cuda/lib64" + command: + - /usr/local/mpi/bin/mpirun + args: + - -np + - "${GPU_COUNT}" + - --allow-run-as-root + - --mca + - plm_rsh_args + - -o StrictHostKeyChecking=no -o ConnectionAttempts=10 + - --mca + - btl + - ^openib + - --mca + - btl_tcp_if_include + - eth0 + - --mca + - oob_tcp_if_include + - eth0 + - -x + - LD_LIBRARY_PATH + - -x + - NCCL_DEBUG=WARN + - -x + - NCCL_SOCKET_IFNAME=eth0 + - /usr/local/bin/${TEST_TYPE}_mpi + - -b + - ${MIN_MESSAGE_SIZE} + - -e + - ${MAX_MESSAGE_SIZE} + - -f + - "2" + - -g + - "1" + resources: + limits: + cpu: "2" + memory: 128Mi + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + volumes: + - name: ssh-config + emptyDir: {} + - name: node + template: + spec: + template: + spec: + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: 
+ - /bin/sh + - -c + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: ["sh", "-c"] + args: + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/* /root/.ssh/ && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys && + /usr/sbin/sshd -De + resources: + limits: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + requests: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + securityContext: + capabilities: + add: ["IPC_LOCK"] + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + - name: dshm + mountPath: /dev/shm + volumes: + - name: ssh-config + emptyDir: {} + - name: dshm + emptyDir: + medium: Memory + successPolicy: + operator: All + targetReplicatedJobs: + - launcher diff --git a/validators/performance/testdata/gb200/any/runtime.yaml b/validators/performance/testdata/gb200/any/runtime.yaml new file mode 100644 index 000000000..5d38929d5 --- /dev/null +++ b/validators/performance/testdata/gb200/any/runtime.yaml @@ -0,0 +1,181 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Self-managed (service=any) NCCL All-Reduce TrainingRuntime — InfiniBand/RDMA. +# +# Simplified runtime for bare-metal / self-managed clusters using InfiniBand. +# No EFA, no TCPXO, no cloud-specific sidecars or resources. +# Requires --node-selector to identify GPU nodes (no standard label exists). +# +# Must stay in sync with ncclTrainingRuntimeName in nccl_all_reduce_bw_constraint.go. + +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: TrainingRuntime +metadata: + name: nccl-all-reduce-runtime + namespace: ${NAMESPACE} + labels: + trainer.kubeflow.org/framework: mpi +spec: + mlPolicy: + mpi: + mpiImplementation: OpenMPI + numProcPerNode: ${GPU_COUNT_PER_NODE} + runLauncherAsNode: false + sshAuthMountPath: /tmp/mpi-keys + template: + spec: + network: + enableDNSHostnames: true + publishNotReadyAddresses: true + replicatedJobs: + - name: launcher + replicas: 1 + template: + spec: + template: + spec: + tolerations: + - operator: Exists + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: + - /bin/sh + - -c + - | + mkdir -p /root/.ssh + cp /tmp/mpi-keys/id_rsa /root/.ssh/id_rsa + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys + chmod 700 /root/.ssh + chmod 600 /root/.ssh/id_rsa /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + env: + - name: LD_LIBRARY_PATH + value: "/usr/local/nvidia/lib64:/usr/local/cuda/lib64" + command: + 
- /usr/local/mpi/bin/mpirun + args: + - -np + - "${GPU_COUNT}" + - --allow-run-as-root + - --mca + - plm_rsh_args + - -o StrictHostKeyChecking=no -o ConnectionAttempts=10 + - --mca + - btl + - ^openib + - --mca + - btl_tcp_if_include + - eth0 + - --mca + - oob_tcp_if_include + - eth0 + - -x + - LD_LIBRARY_PATH + - -x + - NCCL_DEBUG=WARN + - -x + - NCCL_SOCKET_IFNAME=eth0 + - /usr/local/bin/${TEST_TYPE}_mpi + - -b + - ${MIN_MESSAGE_SIZE} + - -e + - ${MAX_MESSAGE_SIZE} + - -f + - "2" + - -g + - "1" + resources: + limits: + cpu: "2" + memory: 128Mi + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + volumes: + - name: ssh-config + emptyDir: {} + - name: node + template: + spec: + template: + spec: + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: + - /bin/sh + - -c + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: ["sh", "-c"] + args: + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/* /root/.ssh/ && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys && + /usr/sbin/sshd -De + resources: + limits: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + requests: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + securityContext: + capabilities: + add: ["IPC_LOCK"] + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + - name: dshm + mountPath: /dev/shm + volumes: + - name: ssh-config + emptyDir: {} + - name: dshm + 
emptyDir: + medium: Memory + successPolicy: + operator: All + targetReplicatedJobs: + - launcher diff --git a/validators/performance/testdata/h100/eks/runtime.yaml b/validators/performance/testdata/h100/eks/runtime.yaml index 7552dd8af..0462509b6 100644 --- a/validators/performance/testdata/h100/eks/runtime.yaml +++ b/validators/performance/testdata/h100/eks/runtime.yaml @@ -145,10 +145,6 @@ spec: spec: template: spec: - nodeSelector: - node.kubernetes.io/instance-type: ${INSTANCE_TYPE} - tolerations: - - operator: Exists initContainers: # Fix authorized_keys permissions so sshd accepts incoming connections. # The MPI plugin mounts the secret at /tmp/mpi-keys with 0644; sshd diff --git a/validators/performance/testdata/h100/gke/runtime.yaml b/validators/performance/testdata/h100/gke/runtime.yaml index b34942b0d..feedb9755 100644 --- a/validators/performance/testdata/h100/gke/runtime.yaml +++ b/validators/performance/testdata/h100/gke/runtime.yaml @@ -152,14 +152,6 @@ spec: networking.gke.io/default-interface: eth0 networking.gke.io/interfaces: '${GKE_NETWORK_INTERFACES}' spec: - nodeSelector: - cloud.google.com/gke-accelerator: nvidia-h100-mega-80gb - tolerations: - - operator: Exists - - key: nvidia.com/gpu - operator: Equal - value: present - effect: NoSchedule initContainers: # TCPXO FastRak native sidecar — runs fastrak_gpumem_manager # alongside the worker. Uses NRI device injection (devices.gke.io diff --git a/validators/performance/trainer_lifecycle.go b/validators/performance/trainer_lifecycle.go index d26aebafa..73395aa2f 100644 --- a/validators/performance/trainer_lifecycle.go +++ b/validators/performance/trainer_lifecycle.go @@ -27,6 +27,7 @@ import ( "path/filepath" "slices" "strings" + "time" "github.com/NVIDIA/aicr/pkg/defaults" aicrErrors "github.com/NVIDIA/aicr/pkg/errors" @@ -55,6 +56,12 @@ const ( // trainerCRDName is the CRD that signals the Trainer operator is installed. 
trainerCRDName = "trainjobs.trainer.kubeflow.org" + // trainerControllerDeployment is the Deployment name for the Trainer controller-manager. + trainerControllerDeployment = "kubeflow-trainer-controller-manager" + + // trainerNamespace is the namespace where the Trainer operator is installed. + trainerNamespace = "kubeflow-system" + + // maxExtractedFileSize caps individual file sizes during tar extraction (50 MB). + maxExtractedFileSize = 50 * 1024 * 1024 ) @@ -178,6 +185,12 @@ func installTrainer(ctx context.Context, dynamicClient dynamic.Interface, discov return applied, aicrErrors.Wrap(aicrErrors.ErrCodeTimeout, "Trainer CRDs not ready after install", err) } + // Wait for the controller-manager to be ready so the Service backing the + // ValidatingWebhookConfiguration can serve admission requests before the caller creates TrainingRuntime resources. + if err := waitForTrainerControllerReady(ctx, dynamicClient); err != nil { + return applied, aicrErrors.Wrap(aicrErrors.ErrCodeTimeout, "Trainer controller not ready after install", err) + } + return applied, nil } @@ -225,6 +238,40 @@ return nil } +// waitForTrainerControllerReady polls the controller-manager Deployment until at +// least one replica is ready, ensuring the ValidatingWebhookConfiguration can +// serve admission requests before the caller creates Trainer custom resources. +func waitForTrainerControllerReady(ctx context.Context, dynamicClient dynamic.Interface) error { + slog.Info("Waiting for Trainer controller-manager to become ready", + "deployment", trainerControllerDeployment, "namespace", trainerNamespace) + + deployGVR := schema.GroupVersionResource{ + Group: "apps", Version: "v1", Resource: "deployments", + } + + waitCtx, cancel := context.WithTimeout(ctx, defaults.TrainerControllerReadyTimeout) + defer cancel() + + for { + deploy, err := dynamicClient.Resource(deployGVR).Namespace(trainerNamespace). 
+ Get(waitCtx, trainerControllerDeployment, metav1.GetOptions{}) + if err == nil { + readyReplicas, _, _ := unstructured.NestedInt64(deploy.Object, "status", "readyReplicas") + if readyReplicas >= 1 { + slog.Info("Trainer controller-manager is ready", "readyReplicas", readyReplicas) + return nil + } + } + + select { + case <-waitCtx.Done(): + return aicrErrors.Wrap(aicrErrors.ErrCodeTimeout, + "timed out waiting for Trainer controller-manager to become ready", waitCtx.Err()) + case <-time.After(2 * time.Second): + } + } +} + // waitForCRDEstablished watches a CRD until its Established condition is True. // It checks the current state first so the fast path (already established) returns // immediately without starting a watch.