From a1679a25011522f29b22fc4980bd69b570d73e35 Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Thu, 19 Mar 2026 19:43:07 -0400 Subject: [PATCH 1/9] feat(validator): pass node selector to validation Jobs for scheduling parity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validation Jobs previously ignored the --node-selector CLI flag, creating friction for users operating in constrained environments with specific node placement requirements. AICR recipes and bundles already expose nodeSelector, tolerations, and affinity for workload scheduling, but validators did not honor the same inputs. This change threads --node-selector through the full validation pipeline: CLI → Validator → Deployer → K8s Job PodSpec. When set, validator Jobs are constrained to nodes matching the specified labels, achieving configuration parity between recipe/bundle deployment and validation phases. Changes: - Add NodeSelector field to Validator struct and WithNodeSelector option - Accept nodeSelector in job.NewDeployer and apply via PodSpec.NodeSelector - Wire --node-selector through validationConfig to validator construction - Move --node-selector and --toleration flags to "Scheduling" category - Update flag description to clarify it applies to both agent and validators - Add TestDeployJobNodeSelector and TestDeployJobNodeSelectorEmpty tests Closes #443 --- pkg/cli/validate.go | 16 +++++-- pkg/validator/job/deployer.go | 27 ++++++++--- pkg/validator/job/deployer_test.go | 74 +++++++++++++++++++++--------- pkg/validator/job/result_test.go | 2 +- pkg/validator/options.go | 8 ++++ pkg/validator/types.go | 3 ++ pkg/validator/validator.go | 2 +- 7 files changed, 97 insertions(+), 35 deletions(-) diff --git a/pkg/cli/validate.go b/pkg/cli/validate.go index 66c682056..4cd696e49 100644 --- a/pkg/cli/validate.go +++ b/pkg/cli/validate.go @@ -210,7 +210,8 @@ type validationConfig struct { noCluster bool // Scheduling - tolerations []corev1.Toleration + 
nodeSelector map[string]string + tolerations []corev1.Toleration // Behavior failOnError bool @@ -236,6 +237,7 @@ func runValidation( validator.WithImagePullSecrets(cfg.imagePullSecrets), validator.WithNoCluster(cfg.noCluster), validator.WithTolerations(cfg.tolerations), + validator.WithNodeSelector(cfg.nodeSelector), ) results, err := v.ValidatePhases(ctx, cfg.phases, rec, snap) @@ -379,13 +381,13 @@ func validateCmdFlags() []cli.Flag { }, &cli.StringSliceFlag{ Name: "node-selector", - Usage: "Node selector for snapshot agent Job scheduling (format: key=value, can be repeated).", - Category: "Agent Deployment", + Usage: "Node selector for snapshot agent and validation Job scheduling (format: key=value, can be repeated).", + Category: "Scheduling", }, &cli.StringSliceFlag{ Name: "toleration", Usage: "Toleration for snapshot agent and validation Job scheduling (format: key=value:effect). By default, all taints are tolerated.", - Category: "Agent Deployment", + Category: "Scheduling", }, &cli.DurationFlag{ Name: "timeout", @@ -540,6 +542,11 @@ Run validation without failing on check errors (informational mode): return errors.Wrap(errors.ErrCodeInvalidRequest, "invalid toleration", tolErr) } + nodeSelector, nsErr := snapshotter.ParseNodeSelectors(cmd.StringSlice("node-selector")) + if nsErr != nil { + return errors.Wrap(errors.ErrCodeInvalidRequest, "invalid node-selector", nsErr) + } + // Validate that requested phases are defined in the recipe. 
if err := validatePhasesAgainstRecipe(phases, rec); err != nil { return err @@ -561,6 +568,7 @@ Run validation without failing on check errors (informational mode): cleanup: !noCleanup, imagePullSecrets: cmd.StringSlice("image-pull-secret"), noCluster: cmd.Bool("no-cluster"), + nodeSelector: nodeSelector, tolerations: tolerations, evidenceDir: evidenceDir, }) diff --git a/pkg/validator/job/deployer.go b/pkg/validator/job/deployer.go index 5b2343d57..42561c04d 100644 --- a/pkg/validator/job/deployer.go +++ b/pkg/validator/job/deployer.go @@ -49,6 +49,7 @@ type Deployer struct { jobName string // Unique name generated client-side (set by DeployJob) imagePullSecrets []string tolerations []corev1.Toleration + nodeSelector map[string]string } // NewDeployer creates a Deployer for a single validator catalog entry. @@ -60,6 +61,7 @@ func NewDeployer( entry catalog.ValidatorEntry, imagePullSecrets []string, tolerations []corev1.Toleration, + nodeSelector map[string]string, ) *Deployer { return &Deployer{ @@ -70,6 +72,7 @@ func NewDeployer( entry: entry, imagePullSecrets: imagePullSecrets, tolerations: tolerations, + nodeSelector: nodeSelector, } } @@ -149,13 +152,7 @@ func (d *Deployer) buildApplyConfig() *applybatchv1.JobApplyConfiguration { labels.Component: labels.ValueValidation, labels.Validator: d.entry.Name, }). - WithSpec(applycorev1.PodSpec(). - WithServiceAccountName(ServiceAccountName). - WithRestartPolicy(corev1.RestartPolicyNever). - WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())). - WithImagePullSecrets(d.buildImagePullSecretsApply()...). - WithTolerations(d.buildTolerationsApply()...). - WithAffinity(preferCPUNodeAffinityApply()). + WithSpec(d.buildPodSpecApply(). WithContainers(applycorev1.Container(). WithName("validator"). WithImage(d.entry.Image). 
@@ -238,6 +235,22 @@ func (d *Deployer) buildImagePullSecretsApply() []*applycorev1.LocalObjectRefere return refs } +func (d *Deployer) buildPodSpecApply() *applycorev1.PodSpecApplyConfiguration { + spec := applycorev1.PodSpec(). + WithServiceAccountName(ServiceAccountName). + WithRestartPolicy(corev1.RestartPolicyNever). + WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())). + WithImagePullSecrets(d.buildImagePullSecretsApply()...). + WithTolerations(d.buildTolerationsApply()...). + WithAffinity(preferCPUNodeAffinityApply()) + + if len(d.nodeSelector) > 0 { + spec = spec.WithNodeSelector(d.nodeSelector) + } + + return spec +} + func (d *Deployer) buildTolerationsApply() []*applycorev1.TolerationApplyConfiguration { tols := make([]*applycorev1.TolerationApplyConfiguration, 0, len(d.tolerations)) for i := range d.tolerations { diff --git a/pkg/validator/job/deployer_test.go b/pkg/validator/job/deployer_test.go index 3fecfbc76..a45e8b65e 100644 --- a/pkg/validator/job/deployer_test.go +++ b/pkg/validator/job/deployer_test.go @@ -55,7 +55,7 @@ func deployAndGet(t *testing.T, d *Deployer) *batchv1.Job { } func TestJobNameEmptyBeforeDeploy(t *testing.T) { - d := NewDeployer(nil, nil, "default", "run123", testEntry(), nil, nil) + d := NewDeployer(nil, nil, "default", "run123", testEntry(), nil, nil, nil) if d.JobName() != "" { t.Errorf("JobName() before deploy = %q, want empty", d.JobName()) } @@ -63,7 +63,7 @@ func TestJobNameEmptyBeforeDeploy(t *testing.T) { func TestGenerateJobName(t *testing.T) { ns := createUniqueNamespace(t) - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) job := deployAndGet(t, d) if !strings.HasPrefix(job.Name, "aicr-gpu-operator-health-") { @@ -78,8 +78,8 @@ func TestDeployJobUniqueNames(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d1 := 
NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) - d2 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d1 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) + d2 := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d1.DeployJob(ctx); err != nil { t.Fatalf("first DeployJob() failed: %v", err) @@ -95,7 +95,7 @@ func TestDeployJobUniqueNames(t *testing.T) { func TestDeployJobSSAFieldManager(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) found := false for _, ref := range job.ManagedFields { @@ -111,7 +111,7 @@ func TestDeployJobSSAFieldManager(t *testing.T) { func TestDeployJobLabels(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) expectedLabels := map[string]string{ "app.kubernetes.io/name": "aicr", @@ -138,7 +138,7 @@ func TestDeployJobLabels(t *testing.T) { func TestDeployJobTimeouts(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) if job.Spec.ActiveDeadlineSeconds == nil || *job.Spec.ActiveDeadlineSeconds != 120 { t.Errorf("ActiveDeadlineSeconds = %v, want 120", job.Spec.ActiveDeadlineSeconds) @@ -163,7 +163,7 @@ func TestDeployJobDefaultTimeout(t *testing.T) { ns := createUniqueNamespace(t) entry := testEntry() entry.Timeout = 0 - job := deployAndGet(t, 
NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil, nil)) expected := int64(defaults.ValidatorDefaultTimeout.Seconds()) if job.Spec.ActiveDeadlineSeconds == nil || *job.Spec.ActiveDeadlineSeconds != expected { @@ -173,7 +173,7 @@ func TestDeployJobDefaultTimeout(t *testing.T) { func TestDeployJobContainer(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) containers := job.Spec.Template.Spec.Containers if len(containers) != 1 { @@ -203,7 +203,7 @@ func TestDeployJobContainer(t *testing.T) { func TestDeployJobResources(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) c := job.Spec.Template.Spec.Containers[0] if c.Resources.Requests.Cpu().String() != "1" { @@ -222,7 +222,7 @@ func TestDeployJobResources(t *testing.T) { func TestDeployJobEnvVars(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) env := job.Spec.Template.Spec.Containers[0].Env envMap := make(map[string]corev1.EnvVar) @@ -258,7 +258,7 @@ func TestDeployJobEnvVars(t *testing.T) { func TestDeployJobVolumes(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", 
testEntry(), nil, nil, nil)) volumes := job.Spec.Template.Spec.Volumes if len(volumes) != 2 { @@ -283,7 +283,7 @@ func TestDeployJobVolumes(t *testing.T) { func TestDeployJobAffinity(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) affinity := job.Spec.Template.Spec.Affinity if affinity == nil || affinity.NodeAffinity == nil { @@ -307,7 +307,7 @@ func TestDeployJobAffinity(t *testing.T) { func TestDeployJobImagePullSecrets(t *testing.T) { ns := createUniqueNamespace(t) secrets := []string{"registry-creds", "other-secret"} - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), secrets, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), secrets, nil, nil)) ips := job.Spec.Template.Spec.ImagePullSecrets if len(ips) != 2 { @@ -321,7 +321,7 @@ func TestDeployJobImagePullSecrets(t *testing.T) { func TestDeployJobTolerations(t *testing.T) { ns := createUniqueNamespace(t) tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}} - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tolerations)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tolerations, nil)) tols := job.Spec.Template.Spec.Tolerations if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists { @@ -329,9 +329,39 @@ func TestDeployJobTolerations(t *testing.T) { } } +func TestDeployJobNodeSelector(t *testing.T) { + ns := createUniqueNamespace(t) + nodeSelector := map[string]string{ + "dedicated": "system-workload", + "kubernetes.io/os": "linux", + } + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nodeSelector)) + + 
podSpec := job.Spec.Template.Spec + if len(podSpec.NodeSelector) != 2 { + t.Fatalf("nodeSelector count = %d, want 2", len(podSpec.NodeSelector)) + } + if podSpec.NodeSelector["dedicated"] != "system-workload" { + t.Errorf("nodeSelector[dedicated] = %q, want %q", podSpec.NodeSelector["dedicated"], "system-workload") + } + if podSpec.NodeSelector["kubernetes.io/os"] != "linux" { + t.Errorf("nodeSelector[kubernetes.io/os] = %q, want %q", podSpec.NodeSelector["kubernetes.io/os"], "linux") + } +} + +func TestDeployJobNodeSelectorEmpty(t *testing.T) { + ns := createUniqueNamespace(t) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) + + podSpec := job.Spec.Template.Spec + if len(podSpec.NodeSelector) != 0 { + t.Errorf("nodeSelector should be empty when nil, got %v", podSpec.NodeSelector) + } +} + func TestDeployJobPodSpec(t *testing.T) { ns := createUniqueNamespace(t) - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil)) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) podSpec := job.Spec.Template.Spec if podSpec.ServiceAccountName != ServiceAccountName { @@ -346,7 +376,7 @@ func TestCleanupJob(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(ctx); err != nil { t.Fatalf("DeployJob() failed: %v", err) } @@ -367,7 +397,7 @@ func TestCleanupJob(t *testing.T) { } func TestCleanupJobNotFound(t *testing.T) { - d := NewDeployer(testClientset, nil, "default", "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, nil, "default", "run1", testEntry(), nil, nil, nil) // jobName is empty — CleanupJob should return nil if err := d.CleanupJob(context.Background()); err 
!= nil { t.Fatalf("CleanupJob() on empty jobName should not error, got: %v", err) @@ -443,7 +473,7 @@ func TestWaitForCompletionFastPath(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(ctx); err != nil { t.Fatalf("DeployJob() failed: %v", err) } @@ -481,7 +511,7 @@ func TestWaitForCompletionFastPathFailed(t *testing.T) { ns := createUniqueNamespace(t) ctx := context.Background() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(ctx); err != nil { t.Fatalf("DeployJob() failed: %v", err) } @@ -507,7 +537,7 @@ func TestWaitForCompletionFastPathFailed(t *testing.T) { func TestWaitForCompletionJobNotFound(t *testing.T) { ns := createUniqueNamespace(t) - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) d.jobName = "nonexistent-job" err := d.WaitForCompletion(context.Background(), 1*time.Minute) @@ -546,7 +576,7 @@ func TestImagePullPolicy(t *testing.T) { func TestWaitForCompletionTimeout(t *testing.T) { ns := createUniqueNamespace(t) - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil) if err := d.DeployJob(context.Background()); err != nil { t.Fatalf("DeployJob() failed: %v", err) } diff --git a/pkg/validator/job/result_test.go b/pkg/validator/job/result_test.go index 82e3c233b..5306bd64f 100644 --- a/pkg/validator/job/result_test.go +++ b/pkg/validator/job/result_test.go @@ -60,7 +60,7 @@ func createPodForJob(t *testing.T, ns, 
jobName string, status corev1.PodStatus) // deployTestJob deploys a Job via envtest and returns the Deployer. func deployTestJob(t *testing.T, ns string, entry catalog.ValidatorEntry) *Deployer { t.Helper() - d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil) + d := NewDeployer(testClientset, testFactory(t, ns), ns, "run1", entry, nil, nil, nil) if err := d.DeployJob(context.Background()); err != nil { t.Fatalf("DeployJob() failed: %v", err) } diff --git a/pkg/validator/options.go b/pkg/validator/options.go index 55975d1c1..298a37722 100644 --- a/pkg/validator/options.go +++ b/pkg/validator/options.go @@ -72,3 +72,11 @@ func WithTolerations(tolerations []corev1.Toleration) Option { v.Tolerations = tolerations } } + +// WithNodeSelector sets node selector labels for validator Jobs. +// When set, validator Jobs will only be scheduled on nodes matching these labels. +func WithNodeSelector(nodeSelector map[string]string) Option { + return func(v *Validator) { + v.NodeSelector = nodeSelector + } +} diff --git a/pkg/validator/types.go b/pkg/validator/types.go index 8f467c3f0..b20bf8b16 100644 --- a/pkg/validator/types.go +++ b/pkg/validator/types.go @@ -48,6 +48,9 @@ type Validator struct { // Tolerations are applied to validator Jobs for scheduling. Tolerations []corev1.Toleration + + // NodeSelector constrains validator Jobs to nodes matching these labels. + NodeSelector map[string]string } // PhaseResult is the outcome of running all validators in a single phase. 
diff --git a/pkg/validator/validator.go b/pkg/validator/validator.go index 3c53444c9..d01aa2698 100644 --- a/pkg/validator/validator.go +++ b/pkg/validator/validator.go @@ -322,7 +322,7 @@ func (v *Validator) runPhase( deployer := job.NewDeployer( clientset, factory, v.Namespace, v.RunID, entry, - v.ImagePullSecrets, v.Tolerations, + v.ImagePullSecrets, v.Tolerations, v.NodeSelector, ) // Deploy From 90fa106094a5934f3799f7240e2dac60cf7f312f Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Sat, 21 Mar 2026 17:41:44 -0400 Subject: [PATCH 2/9] feat(validator): redesign --node-selector to override inner workload scheduling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The --node-selector and --toleration flags now target the inner validation workloads (NCCL benchmark workers, conformance test pods) rather than the validator orchestrator Job itself. This is the correct layer: the orchestrator is a lightweight CPU pod already handled by preferCPUNodeAffinityApply(). The real need is for clusters with non-standard GPU node labels where hardcoded platform selectors don't match. 
Data flow: CLI flags → Validator.NodeSelector/Tolerations → AICR_NODE_SELECTOR / AICR_TOLERATIONS env vars on orchestrator pod → validators.Context.NodeSelector / .Tolerations (parsed in LoadContext) → override nodeSelector/tolerations on NCCL worker pods and conformance test pods (gang scheduling, DRA) Changes: - pkg/validator/job/deployer: remove nodeSelector from orchestrator pod spec; serialize both fields as AICR_NODE_SELECTOR and AICR_TOLERATIONS env vars - validators/context: add NodeSelector and Tolerations fields, parse from env vars - validators/performance/nccl: post-process TrainingRuntime unstructured object to replace worker nodeSelector/tolerations when ctx overrides are set; refactor applyYAMLWithDynamicClient into parseYAMLTemplate + createUnstructured helpers - validators/conformance: thread tolerations through deployGangTestResources and deployDRATestResources to buildGangTestPod and buildDRATestPod - Tests: replace pod-spec nodeSelector tests with env var assertions; add context env var parsing tests; add overrideNCCLWorkerScheduling unit tests - Docs: rewrite --node-selector and --toleration descriptions in cli-reference.md and validate.go; add Workload Scheduling section; document AICR_NODE_SELECTOR and AICR_TOLERATIONS env vars and ctx fields in contributor/validator.md --- docs/contributor/validator.md | 23 +++ docs/user/cli-reference.md | 39 ++++- pkg/cli/validate.go | 4 +- pkg/validator/job/deployer.go | 49 +++++- pkg/validator/job/deployer_test.go | 56 ++++-- pkg/validator/options.go | 10 +- pkg/validator/types.go | 8 +- validators/conformance/dra_support_check.go | 2 +- .../conformance/gang_scheduling_check.go | 17 +- validators/conformance/secure_access_check.go | 17 +- validators/context.go | 49 ++++++ validators/context_test.go | 109 ++++++++++++ .../nccl_all_reduce_bw_constraint.go | 130 ++++++++++++-- validators/performance/nccl_test.go | 161 ++++++++++++++++++ 14 files changed, 610 insertions(+), 64 deletions(-) diff --git 
a/docs/contributor/validator.md b/docs/contributor/validator.md index a663261e1..68684f927 100644 --- a/docs/contributor/validator.md +++ b/docs/contributor/validator.md @@ -136,6 +136,8 @@ The validator engine mounts snapshot and recipe data as ConfigMaps: | `AICR_SNAPSHOT_PATH` | Override snapshot mount path | | `AICR_RECIPE_PATH` | Override recipe mount path | | `AICR_VALIDATOR_IMAGE_REGISTRY` | Override image registry prefix (set by user) | +| `AICR_NODE_SELECTOR` | User-provided node selector override for inner workloads (comma-separated `key=value` pairs). Set by the `--node-selector` CLI flag. Use `ctx.NodeSelector` to access the parsed value. | +| `AICR_TOLERATIONS` | User-provided toleration override for inner workloads (comma-separated `key=value:effect` entries). Set by the `--toleration` CLI flag. Use `ctx.Tolerations` to access the parsed value. | ## Context API @@ -151,11 +153,32 @@ type Context struct { Snapshot *snapshotter.Snapshot // Captured cluster state Recipe *recipe.RecipeResult // Recipe with validation config Namespace string // Validation namespace + NodeSelector map[string]string // User-provided node selector override (nil = use defaults) + Tolerations []corev1.Toleration // User-provided toleration override (nil = use defaults) } ``` `LoadContext()` builds this from the container environment: reads mounted ConfigMaps, creates in-cluster K8s clients, and sets a timeout from `defaults.CheckExecutionTimeout`. +### Scheduling Overrides + +When creating inner workloads (pods, Jobs, TrainJobs), check `ctx.NodeSelector` and `ctx.Tolerations` before applying hardcoded platform selectors. If non-nil, these override the default scheduling constraints to support clusters with non-standard GPU node labels or taints. + +```go +// Apply scheduling overrides when creating inner workload pods. 
+nodeSelector := map[string]string{"cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb"} +if ctx.NodeSelector != nil { + nodeSelector = ctx.NodeSelector // user override replaces platform default +} + +tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}} +if ctx.Tolerations != nil { + tolerations = ctx.Tolerations // user override replaces default tolerate-all +} +``` + +Validators that use `nodeName` pinning (e.g., nvidia-smi, DRA isolation) bypass the scheduler entirely and should not apply `ctx.NodeSelector`. + ### Helper Methods **`ctx.Timeout(d)`** — Create a child context with a specific timeout: diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 9dc5d3ff7..4112262c0 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -564,8 +564,8 @@ aicr validate [flags] | `--image-pull-secret` | | string[] | | Image pull secrets for private registries (repeatable) | | `--job-name` | | string | aicr-validate | Name for the validation Job | | `--service-account-name` | | string | aicr | ServiceAccount name for validation Job | -| `--node-selector` | | string[] | | Node selector for validation scheduling (key=value, repeatable) | -| `--toleration` | | string[] | | Tolerations for validation scheduling (key=value:effect, repeatable) | +| `--node-selector` | | string[] | | Override GPU node selection for validation workloads. Replaces platform-specific selectors (e.g., `cloud.google.com/gke-accelerator`, `node.kubernetes.io/instance-type`) on inner workloads like NCCL benchmark pods. Use when GPU nodes have non-standard labels. Does not affect the validator orchestrator Job. (format: key=value, repeatable) | +| `--toleration` | | string[] | | Override tolerations for validation workloads. Replaces the default tolerate-all policy on inner workloads like NCCL benchmark pods and conformance test pods. Does not affect the validator orchestrator Job. 
(format: key=value:effect, repeatable) | | `--timeout` | | duration | 5m | Timeout for validation Job completion | | `--no-cleanup` | | bool | false | Skip removal of Job and RBAC resources on completion | | `--require-gpu` | | bool | false | Require GPU resources on the validation pod | @@ -667,8 +667,43 @@ aicr validate \ --recipe recipe.yaml \ --snapshot cm://gpu-operator/aicr-snapshot \ --kubeconfig ~/.kube/prod-cluster + +# Validate on a cluster with custom GPU node labels (non-standard labels that AICR doesn't +# recognize by default, e.g., using a custom node pool label instead of cloud-provider defaults) +aicr validate \ + --recipe recipe.yaml \ + --node-selector my-org/gpu-pool=true \ + --phase performance + +# Override both node selector and tolerations for a non-standard taint setup +aicr validate \ + --recipe recipe.yaml \ + --node-selector gpu-type=h100 \ + --toleration gpu-type=h100:NoSchedule ``` +**Workload Scheduling:** + +The `--node-selector` and `--toleration` flags control scheduling for **validation +workloads** — the inner pods that validators create to test cluster functionality +(e.g., NCCL benchmark workers, conformance test pods). They do **not** affect the +validator orchestrator Job, which runs lightweight check logic and is placed on +CPU-preferred nodes automatically. + +When `--node-selector` is provided, it replaces the platform-specific selectors +that validators use by default: + +| Platform | Default Selector (replaced) | Use Case | +|----------|-----------------------------|----------| +| GKE | `cloud.google.com/gke-accelerator: nvidia-h100-mega-80gb` | Non-standard GPU node pool labels | +| EKS | `node.kubernetes.io/instance-type: <instance-type>` | Custom node pool labels | + +When `--toleration` is provided, it replaces the default tolerate-all policy +(`operator: Exists`) on workloads that need to land on tainted GPU nodes.
+ +Validators that use `nodeName` pinning (nvidia-smi, DRA isolation test) or +DRA ResourceClaims for placement (gang scheduling) are not affected by `--node-selector`. + **Output Structure ([CTRF](https://ctrf.io/) JSON):** Results are output in CTRF (Common Test Report Format) — an industry-standard schema for test reporting. diff --git a/pkg/cli/validate.go b/pkg/cli/validate.go index 4cd696e49..3064e0e1e 100644 --- a/pkg/cli/validate.go +++ b/pkg/cli/validate.go @@ -381,12 +381,12 @@ func validateCmdFlags() []cli.Flag { }, &cli.StringSliceFlag{ Name: "node-selector", - Usage: "Node selector for snapshot agent and validation Job scheduling (format: key=value, can be repeated).", + Usage: "Override GPU node selection for validation workloads (format: key=value, can be repeated). Replaces platform-specific selectors on inner workloads (e.g., NCCL benchmark pods). Use when GPU nodes have non-standard labels. Does not affect the validator orchestrator Job.", Category: "Scheduling", }, &cli.StringSliceFlag{ Name: "toleration", - Usage: "Toleration for snapshot agent and validation Job scheduling (format: key=value:effect). By default, all taints are tolerated.", + Usage: "Override tolerations for validation workloads (format: key=value:effect, can be repeated). Replaces the default tolerate-all policy on inner workloads. Does not affect the validator orchestrator Job.", Category: "Scheduling", }, &cli.DurationFlag{ diff --git a/pkg/validator/job/deployer.go b/pkg/validator/job/deployer.go index 42561c04d..4171d0733 100644 --- a/pkg/validator/job/deployer.go +++ b/pkg/validator/job/deployer.go @@ -49,7 +49,7 @@ type Deployer struct { jobName string // Unique name generated client-side (set by DeployJob) imagePullSecrets []string tolerations []corev1.Toleration - nodeSelector map[string]string + nodeSelector map[string]string // passed through to inner workloads via AICR_NODE_SELECTOR env var } // NewDeployer creates a Deployer for a single validator catalog entry.
@@ -200,12 +200,51 @@ func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration { WithValueFrom(applycorev1.EnvVarSource(). WithFieldRef(applycorev1.ObjectFieldSelector().WithFieldPath("metadata.namespace"))), ) + // Pass scheduling overrides to the validator container so it can apply them + // to the inner workloads it creates (e.g., NCCL benchmark pods). These env + // vars are NOT used to schedule the orchestrator Job itself. + if len(d.nodeSelector) > 0 { + env = append(env, applycorev1.EnvVar().WithName("AICR_NODE_SELECTOR").WithValue(serializeNodeSelector(d.nodeSelector))) + } + if len(d.tolerations) > 0 { + env = append(env, applycorev1.EnvVar().WithName("AICR_TOLERATIONS").WithValue(serializeTolerations(d.tolerations))) + } for _, e := range d.entry.Env { env = append(env, applycorev1.EnvVar().WithName(e.Name).WithValue(e.Value)) } return env } +// serializeNodeSelector encodes a nodeSelector map as a comma-separated key=value string. +// This matches the format expected by snapshotter.ParseNodeSelectors on the receiving end. +func serializeNodeSelector(ns map[string]string) string { + pairs := make([]string, 0, len(ns)) + for k, v := range ns { + pairs = append(pairs, k+"="+v) + } + return strings.Join(pairs, ",") +} + +// serializeTolerations encodes tolerations as a comma-separated list. +// Format per toleration: key=value:Effect or key:Effect (for tolerations without value). +// This matches the format expected by snapshotter.ParseTolerations on the receiving end. 
+func serializeTolerations(tols []corev1.Toleration) string { + parts := make([]string, 0, len(tols)) + for _, t := range tols { + var part string + if t.Key == "" { + // Universal toleration (operator: Exists, no key) + part = ":" + } else if t.Value != "" { + part = t.Key + "=" + t.Value + ":" + string(t.Effect) + } else { + part = t.Key + ":" + string(t.Effect) + } + parts = append(parts, part) + } + return strings.Join(parts, ",") +} + // imagePullPolicy returns the appropriate pull policy based on the image reference. // Local images (ko.local, kind.local, localhost) always use IfNotPresent since they // are side-loaded into the cluster and cannot be pulled from a registry. @@ -236,19 +275,13 @@ func (d *Deployer) buildImagePullSecretsApply() []*applycorev1.LocalObjectRefere } func (d *Deployer) buildPodSpecApply() *applycorev1.PodSpecApplyConfiguration { - spec := applycorev1.PodSpec(). + return applycorev1.PodSpec(). WithServiceAccountName(ServiceAccountName). WithRestartPolicy(corev1.RestartPolicyNever). WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())). WithImagePullSecrets(d.buildImagePullSecretsApply()...). WithTolerations(d.buildTolerationsApply()...). 
WithAffinity(preferCPUNodeAffinityApply()) - - if len(d.nodeSelector) > 0 { - spec = spec.WithNodeSelector(d.nodeSelector) - } - - return spec } func (d *Deployer) buildTolerationsApply() []*applycorev1.TolerationApplyConfiguration { diff --git a/pkg/validator/job/deployer_test.go b/pkg/validator/job/deployer_test.go index a45e8b65e..fbc61650b 100644 --- a/pkg/validator/job/deployer_test.go +++ b/pkg/validator/job/deployer_test.go @@ -329,33 +329,57 @@ func TestDeployJobTolerations(t *testing.T) { } } -func TestDeployJobNodeSelector(t *testing.T) { +func TestDeployJobNodeSelectorEnvVar(t *testing.T) { ns := createUniqueNamespace(t) - nodeSelector := map[string]string{ - "dedicated": "system-workload", - "kubernetes.io/os": "linux", - } + // Use a single-key selector to avoid map ordering issues in serialization. + nodeSelector := map[string]string{"my-org/gpu-pool": "true"} job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nodeSelector)) - podSpec := job.Spec.Template.Spec - if len(podSpec.NodeSelector) != 2 { - t.Fatalf("nodeSelector count = %d, want 2", len(podSpec.NodeSelector)) + env := job.Spec.Template.Spec.Containers[0].Env + envMap := make(map[string]corev1.EnvVar) + for _, e := range env { + envMap[e.Name] = e } - if podSpec.NodeSelector["dedicated"] != "system-workload" { - t.Errorf("nodeSelector[dedicated] = %q, want %q", podSpec.NodeSelector["dedicated"], "system-workload") + + // AICR_NODE_SELECTOR must be set so the validator container can apply it to inner workloads. 
+ if envMap["AICR_NODE_SELECTOR"].Value != "my-org/gpu-pool=true" { + t.Errorf("AICR_NODE_SELECTOR = %q, want %q", envMap["AICR_NODE_SELECTOR"].Value, "my-org/gpu-pool=true") } - if podSpec.NodeSelector["kubernetes.io/os"] != "linux" { - t.Errorf("nodeSelector[kubernetes.io/os] = %q, want %q", podSpec.NodeSelector["kubernetes.io/os"], "linux") + + // The orchestrator Job pod spec must NOT have a nodeSelector — scheduling of the + // orchestrator is handled by preferCPUNodeAffinityApply(), not the user flag. + if len(job.Spec.Template.Spec.NodeSelector) != 0 { + t.Errorf("orchestrator pod spec nodeSelector should be empty, got %v", job.Spec.Template.Spec.NodeSelector) } } -func TestDeployJobNodeSelectorEmpty(t *testing.T) { +func TestDeployJobNodeSelectorEnvVarAbsent(t *testing.T) { ns := createUniqueNamespace(t) job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, nil, nil)) - podSpec := job.Spec.Template.Spec - if len(podSpec.NodeSelector) != 0 { - t.Errorf("nodeSelector should be empty when nil, got %v", podSpec.NodeSelector) + for _, e := range job.Spec.Template.Spec.Containers[0].Env { + if e.Name == "AICR_NODE_SELECTOR" { + t.Errorf("AICR_NODE_SELECTOR should be absent when nodeSelector is nil, got %q", e.Value) + } + } +} + +func TestDeployJobTolerationsEnvVar(t *testing.T) { + ns := createUniqueNamespace(t) + tolerations := []corev1.Toleration{ + {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + } + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tolerations, nil)) + + env := job.Spec.Template.Spec.Containers[0].Env + envMap := make(map[string]corev1.EnvVar) + for _, e := range env { + envMap[e.Name] = e + } + + // AICR_TOLERATIONS must be set so validators can apply it to inner workloads. 
+ if envMap["AICR_TOLERATIONS"].Value != "gpu-type=h100:NoSchedule" { + t.Errorf("AICR_TOLERATIONS = %q, want %q", envMap["AICR_TOLERATIONS"].Value, "gpu-type=h100:NoSchedule") } } diff --git a/pkg/validator/options.go b/pkg/validator/options.go index 298a37722..02c4f7f0c 100644 --- a/pkg/validator/options.go +++ b/pkg/validator/options.go @@ -65,16 +65,18 @@ func WithNoCluster(noCluster bool) Option { } } -// WithTolerations sets tolerations for validator Jobs. -// Default: tolerate-all. +// WithTolerations sets tolerations to override inner workload scheduling. +// When set, validators pass these tolerations to the workloads they create (e.g., NCCL +// benchmark pods), replacing default tolerate-all policy. Does not affect the orchestrator Job. func WithTolerations(tolerations []corev1.Toleration) Option { return func(v *Validator) { v.Tolerations = tolerations } } -// WithNodeSelector sets node selector labels for validator Jobs. -// When set, validator Jobs will only be scheduled on nodes matching these labels. +// WithNodeSelector sets node selector labels to override inner workload scheduling. +// When set, validators pass these selectors to the workloads they create (e.g., NCCL +// benchmark pods), replacing platform-specific defaults. Does not affect the orchestrator Job. func WithNodeSelector(nodeSelector map[string]string) Option { return func(v *Validator) { v.NodeSelector = nodeSelector diff --git a/pkg/validator/types.go b/pkg/validator/types.go index b20bf8b16..72856af51 100644 --- a/pkg/validator/types.go +++ b/pkg/validator/types.go @@ -46,10 +46,14 @@ type Validator struct { // NoCluster controls whether to skip cluster operations (dry-run mode). NoCluster bool - // Tolerations are applied to validator Jobs for scheduling. + // Tolerations are passed to validation workloads (e.g., NCCL benchmark pods) + // to override their default scheduling constraints. Does not affect the + // orchestrator Job itself. 
Tolerations []corev1.Toleration - // NodeSelector constrains validator Jobs to nodes matching these labels. + // NodeSelector is passed to validation workloads (e.g., NCCL benchmark pods) + // to override platform-specific node selectors. Use when GPU nodes have + // non-standard labels. Does not affect the orchestrator Job itself. NodeSelector map[string]string } diff --git a/validators/conformance/dra_support_check.go b/validators/conformance/dra_support_check.go index 0cfc84cc8..2a10cfe36 100644 --- a/validators/conformance/dra_support_check.go +++ b/validators/conformance/dra_support_check.go @@ -148,7 +148,7 @@ func validateDRAAllocation(ctx *validators.Context, dynClient dynamic.Interface) fmt.Sprintf("Created Namespace=%s ResourceClaim=%s Pod=%s via Kubernetes API", draTestNamespace, run.claimName, run.podName)) - if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run); err != nil { + if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run, ctx.Tolerations); err != nil { return err } defer func() { //nolint:contextcheck // Fresh context: parent may be canceled during cleanup diff --git a/validators/conformance/gang_scheduling_check.go b/validators/conformance/gang_scheduling_check.go index 28d49b0c7..2bfda2612 100644 --- a/validators/conformance/gang_scheduling_check.go +++ b/validators/conformance/gang_scheduling_check.go @@ -195,7 +195,7 @@ func CheckGangScheduling(ctx *validators.Context) error { fmt.Sprintf("Created PodGroup=%s ResourceClaims=%s,%s Pods=%s,%s in namespace=%s", run.groupName, run.claims[0], run.claims[1], run.pods[0], run.pods[1], gangTestNamespace)) - if err = deployGangTestResources(ctx.Ctx, ctx.Clientset, dynClient, run); err != nil { + if err = deployGangTestResources(ctx.Ctx, ctx.Clientset, dynClient, run, ctx.Tolerations); err != nil { return err } @@ -275,7 +275,8 @@ func collectGangTestArtifacts(ctx *validators.Context, dynClient dynamic.Interfa } // deployGangTestResources creates the namespace, 
PodGroup, ResourceClaims, and Pods. -func deployGangTestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *gangTestRun) error { +// tolerations, when non-nil, replace the default tolerate-all policy on test pods. +func deployGangTestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *gangTestRun, tolerations []corev1.Toleration) error { // 1. Create namespace (idempotent). ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{Name: gangTestNamespace}, @@ -300,7 +301,7 @@ func deployGangTestResources(ctx context.Context, clientset kubernetes.Interface fmt.Sprintf("failed to create ResourceClaim %s", run.claims[i]), err) } - pod := buildGangTestPod(run, i) + pod := buildGangTestPod(run, i, tolerations) if _, err := clientset.CoreV1().Pods(gangTestNamespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { return errors.Wrap(errors.ErrCodeInternal, fmt.Sprintf("failed to create gang test pod %s", run.pods[i]), err) @@ -505,7 +506,11 @@ func buildGangResourceClaim(run *gangTestRun, index int) *unstructured.Unstructu } // buildGangTestPod returns the Pod spec for a gang scheduling test worker. -func buildGangTestPod(run *gangTestRun, index int) *corev1.Pod { +// tolerations, when non-nil, replace the default tolerate-all policy. 
+func buildGangTestPod(run *gangTestRun, index int, tolerations []corev1.Toleration) *corev1.Pod { + if tolerations == nil { + tolerations = []corev1.Toleration{{Operator: corev1.TolerationOpExists}} + } return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: run.pods[index], @@ -518,9 +523,7 @@ func buildGangTestPod(run *gangTestRun, index int) *corev1.Pod { Spec: corev1.PodSpec{ SchedulerName: "kai-scheduler", RestartPolicy: corev1.RestartPolicyNever, - Tolerations: []corev1.Toleration{ - {Operator: corev1.TolerationOpExists}, - }, + Tolerations: tolerations, ResourceClaims: []corev1.PodResourceClaim{ { Name: "gpu", diff --git a/validators/conformance/secure_access_check.go b/validators/conformance/secure_access_check.go index 9ceadb41a..bed3ad454 100644 --- a/validators/conformance/secure_access_check.go +++ b/validators/conformance/secure_access_check.go @@ -112,7 +112,7 @@ func CheckSecureAcceleratorAccess(ctx *validators.Context) error { } // Deploy DRA test resources and ensure cleanup. - if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run); err != nil { + if err = deployDRATestResources(ctx.Ctx, ctx.Clientset, dynClient, run, ctx.Tolerations); err != nil { return err } defer func() { //nolint:contextcheck // Fresh context: parent may be canceled during cleanup @@ -240,7 +240,8 @@ func collectSecureAccessBaselineArtifacts(ctx *validators.Context, dynClient dyn } // deployDRATestResources creates the namespace, ResourceClaim, and Pod for the DRA test. -func deployDRATestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *draTestRun) error { +// tolerations, when non-nil, replace the default tolerate-all policy on the test pod. +func deployDRATestResources(ctx context.Context, clientset kubernetes.Interface, dynClient dynamic.Interface, run *draTestRun, tolerations []corev1.Toleration) error { // 1. Create namespace (idempotent). 
ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{Name: draTestNamespace}, @@ -257,7 +258,7 @@ func deployDRATestResources(ctx context.Context, clientset kubernetes.Interface, } // 3. Create Pod with unique name. - pod := buildDRATestPod(run) + pod := buildDRATestPod(run, tolerations) if _, err := clientset.CoreV1().Pods(draTestNamespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { return errors.Wrap(errors.ErrCodeInternal, "failed to create DRA test pod", err) } @@ -509,7 +510,11 @@ func cleanupDRATestResources(ctx context.Context, clientset kubernetes.Interface } // buildDRATestPod returns the Pod spec for the DRA GPU allocation test. -func buildDRATestPod(run *draTestRun) *corev1.Pod { +// tolerations, when non-nil, replace the default tolerate-all policy. +func buildDRATestPod(run *draTestRun, tolerations []corev1.Toleration) *corev1.Pod { + if tolerations == nil { + tolerations = []corev1.Toleration{{Operator: corev1.TolerationOpExists}} + } return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: run.podName, @@ -517,9 +522,7 @@ func buildDRATestPod(run *draTestRun) *corev1.Pod { }, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, - Tolerations: []corev1.Toleration{ - {Operator: corev1.TolerationOpExists}, - }, + Tolerations: tolerations, ResourceClaims: []corev1.PodResourceClaim{ { Name: "gpu", diff --git a/validators/context.go b/validators/context.go index bccc9a410..498a77b88 100644 --- a/validators/context.go +++ b/validators/context.go @@ -27,6 +27,7 @@ import ( "github.com/NVIDIA/aicr/pkg/recipe" "github.com/NVIDIA/aicr/pkg/serializer" "github.com/NVIDIA/aicr/pkg/snapshotter" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -57,6 +58,16 @@ type Context struct { // Namespace is the validation namespace. Namespace string + + // NodeSelector overrides platform-specific node selectors on inner workloads + // (e.g., NCCL benchmark worker pods). 
Nil means use the validator's default selectors. + // Set from the AICR_NODE_SELECTOR env var (comma-separated key=value pairs). + NodeSelector map[string]string + + // Tolerations overrides the default tolerate-all policy on inner workloads. + // Nil means use the validator's default tolerations. + // Set from the AICR_TOLERATIONS env var (comma-separated key=value:effect entries). + Tolerations []corev1.Toleration } // LoadContext creates a Context from the v2 container environment. @@ -103,6 +114,18 @@ func LoadContext() (*Context, error) { } } + // Parse optional scheduling overrides for inner workloads. + nodeSelector, err := parseNodeSelectorEnv() + if err != nil { + cancel() + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to parse AICR_NODE_SELECTOR", err) + } + tolerations, err := parseTolerationEnv() + if err != nil { + cancel() + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to parse AICR_TOLERATIONS", err) + } + return &Context{ Ctx: ctx, Cancel: cancel, @@ -112,9 +135,35 @@ func LoadContext() (*Context, error) { Snapshot: snap, Recipe: rec, Namespace: namespace, + NodeSelector: nodeSelector, + Tolerations: tolerations, }, nil } +// parseNodeSelectorEnv reads AICR_NODE_SELECTOR and parses it into a map. +// Returns nil (no override) if the env var is unset or empty. +func parseNodeSelectorEnv() (map[string]string, error) { + raw := os.Getenv("AICR_NODE_SELECTOR") + if raw == "" { + return nil, nil + } + entries := strings.Split(raw, ",") + return snapshotter.ParseNodeSelectors(entries) +} + +// parseTolerationEnv reads AICR_TOLERATIONS and parses it into a slice of Tolerations. +// Returns nil (no override) if the env var is unset or empty. +func parseTolerationEnv() ([]corev1.Toleration, error) { + raw := os.Getenv("AICR_TOLERATIONS") + if raw == "" { + return nil, nil + } + entries := strings.Split(raw, ",") + // Use ParseTolerations but we must not pass empty slice (it returns DefaultTolerations). 
+ // Since we guard against empty raw above, entries will always be non-empty here. + return snapshotter.ParseTolerations(entries) +} + // Timeout returns a child context with the specified timeout. // The caller is responsible for calling the returned CancelFunc. func (c *Context) Timeout(d time.Duration) (context.Context, context.CancelFunc) { diff --git a/validators/context_test.go b/validators/context_test.go index 87c9ec894..b0acb780d 100644 --- a/validators/context_test.go +++ b/validators/context_test.go @@ -16,6 +16,8 @@ package validators import ( "testing" + + corev1 "k8s.io/api/core/v1" ) func TestResolveNamespace(t *testing.T) { @@ -53,6 +55,113 @@ func TestResolveNamespace(t *testing.T) { } } +func TestParseNodeSelectorEnv(t *testing.T) { + tests := []struct { + name string + envVal string + expected map[string]string + wantErr bool + }{ + { + name: "env var unset returns nil", + envVal: "", + expected: nil, + }, + { + name: "single key=value pair", + envVal: "my-org/gpu-pool=true", + expected: map[string]string{"my-org/gpu-pool": "true"}, + }, + { + name: "multiple key=value pairs", + envVal: "accelerator=h100,pool=gpu", + expected: map[string]string{"accelerator": "h100", "pool": "gpu"}, + }, + { + name: "invalid format missing equals", + envVal: "invalidkey", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("AICR_NODE_SELECTOR", tt.envVal) + got, err := parseNodeSelectorEnv() + if (err != nil) != tt.wantErr { + t.Errorf("parseNodeSelectorEnv() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + if len(got) != len(tt.expected) { + t.Errorf("parseNodeSelectorEnv() = %v, want %v", got, tt.expected) + return + } + for k, want := range tt.expected { + if got[k] != want { + t.Errorf("parseNodeSelectorEnv()[%q] = %q, want %q", k, got[k], want) + } + } + } + }) + } +} + +func TestParseTolerationEnv(t *testing.T) { + tests := []struct { + name string + envVal string + expected 
[]corev1.Toleration + wantErr bool + }{ + { + name: "env var unset returns nil", + envVal: "", + expected: nil, + }, + { + name: "key=value:effect toleration", + envVal: "gpu-type=h100:NoSchedule", + expected: []corev1.Toleration{ + {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + }, + }, + { + name: "key:effect toleration (exists operator)", + envVal: "dedicated:NoExecute", + expected: []corev1.Toleration{ + {Key: "dedicated", Effect: corev1.TaintEffectNoExecute, Operator: corev1.TolerationOpExists}, + }, + }, + { + name: "invalid format", + envVal: "badformat", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv("AICR_TOLERATIONS", tt.envVal) + got, err := parseTolerationEnv() + if (err != nil) != tt.wantErr { + t.Errorf("parseTolerationEnv() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + if len(got) != len(tt.expected) { + t.Errorf("parseTolerationEnv() = %v, want %v", got, tt.expected) + return + } + for i, want := range tt.expected { + if got[i].Key != want.Key || got[i].Value != want.Value || + got[i].Effect != want.Effect || got[i].Operator != want.Operator { + t.Errorf("parseTolerationEnv()[%d] = %+v, want %+v", i, got[i], want) + } + } + } + }) + } +} + func TestEnvOrDefault(t *testing.T) { tests := []struct { name string diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index b9f02aa3a..06ecbb15b 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -346,8 +346,18 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface } } - // Apply per-platform runtime: testdata/{accelerator}/{service}/runtime.yaml - if err := applyYAMLWithDynamicClient(ctx.Ctx, dynamicClient, trainingRuntimeGVR, config.Namespace, templatePath(accelerator, service, 
"runtime.yaml"), templateData); err != nil { + // Parse the per-platform runtime template and apply scheduling overrides + // before creating the resource. This replaces the hardcoded platform-specific + // nodeSelector (e.g., instance-type, gke-accelerator) when the user has + // provided --node-selector or --toleration flags for non-standard clusters. + runtimeObj, err := parseYAMLTemplate(templatePath(accelerator, service, "runtime.yaml"), templateData) + if err != nil { + return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse training runtime template", err) + } + if err := overrideNCCLWorkerScheduling(runtimeObj, ctx.NodeSelector, ctx.Tolerations); err != nil { + return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to override NCCL worker scheduling", err) + } + if err := createUnstructured(ctx.Ctx, dynamicClient, trainingRuntimeGVR, config.Namespace, runtimeObj); err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply training runtime", err) } slog.Info("Applied TrainingRuntime", "service", service) @@ -370,37 +380,127 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface } // applyYAMLWithDynamicClient reads a YAML template, performs substitution, and applies it using dynamic client -func applyYAMLWithDynamicClient(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace, templatePath string, data map[string]string) error { - content, err := os.ReadFile(templatePath) +func applyYAMLWithDynamicClient(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace, path string, data map[string]string) error { + obj, err := parseYAMLTemplate(path, data) if err != nil { - return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to read template", err) + return err } + return createUnstructured(ctx, dynamicClient, gvr, namespace, obj) +} - // Perform template substitution +// parseYAMLTemplate reads a YAML template 
file, performs ${KEY} substitution, +// and unmarshals it into an unstructured object. +func parseYAMLTemplate(path string, data map[string]string) (*unstructured.Unstructured, error) { + content, err := os.ReadFile(path) + if err != nil { + return nil, aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to read template", err) + } yamlContent := string(content) for key, value := range data { yamlContent = strings.ReplaceAll(yamlContent, "${"+key+"}", value) } - - // Parse YAML to unstructured object obj := &unstructured.Unstructured{} - if unmarshalErr := yaml.Unmarshal([]byte(yamlContent), obj); unmarshalErr != nil { - return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse YAML", unmarshalErr) + if err := yaml.Unmarshal([]byte(yamlContent), obj); err != nil { + return nil, aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse YAML", err) } + return obj, nil +} - // Apply with timeout +// createUnstructured creates a namespaced resource from an unstructured object with a timeout. +func createUnstructured(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string, obj *unstructured.Unstructured) error { applyCtx, cancel := context.WithTimeout(ctx, defaults.DiagnosticTimeout) defer cancel() - - // Create the resource - _, err = dynamicClient.Resource(gvr).Namespace(namespace).Create(applyCtx, obj, metav1.CreateOptions{}) + _, err := dynamicClient.Resource(gvr).Namespace(namespace).Create(applyCtx, obj, metav1.CreateOptions{}) if err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to create resource", err) } - return nil } +// overrideNCCLWorkerScheduling replaces the nodeSelector and/or tolerations +// on the "node" (worker) replicatedJob within a TrainingRuntime unstructured object. +// Only overrides when the provided slice/map is non-nil and non-empty. 
+func overrideNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[string]string, tolerations []v1.Toleration) error { + if len(nodeSelector) == 0 && len(tolerations) == 0 { + return nil + } + + replicatedJobs, found, err := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + if err != nil || !found { + return aicrErrors.New(aicrErrors.ErrCodeInternal, "replicatedJobs not found in TrainingRuntime") + } + + for i, jobRaw := range replicatedJobs { + jobMap, ok := jobRaw.(map[string]interface{}) + if !ok { + continue + } + name, _, _ := unstructured.NestedString(jobMap, "name") + if name != "node" { + continue + } + + // Navigate deep into the worker pod spec. + workerPodSpec, found := nestedMap(jobMap, "template", "spec", "template", "spec") + if !found { + return aicrErrors.New(aicrErrors.ErrCodeInternal, "worker pod spec not found in TrainingRuntime node job") + } + + if len(nodeSelector) > 0 { + ns := make(map[string]interface{}, len(nodeSelector)) + for k, v := range nodeSelector { + ns[k] = v + } + workerPodSpec["nodeSelector"] = ns + slog.Info("Overriding NCCL worker nodeSelector", "selector", nodeSelector) + } + + if len(tolerations) > 0 { + tolList := make([]interface{}, 0, len(tolerations)) + for _, t := range tolerations { + tolMap := map[string]interface{}{ + "operator": string(t.Operator), + } + if t.Key != "" { + tolMap["key"] = t.Key + } + if t.Value != "" { + tolMap["value"] = t.Value + } + if t.Effect != "" { + tolMap["effect"] = string(t.Effect) + } + tolList = append(tolList, tolMap) + } + workerPodSpec["tolerations"] = tolList + slog.Info("Overriding NCCL worker tolerations", "count", len(tolerations)) + } + + replicatedJobs[i] = jobMap + break + } + + return unstructured.SetNestedSlice(obj.Object, replicatedJobs, "spec", "template", "spec", "replicatedJobs") +} + +// nestedMap navigates a chain of string keys through nested map[string]interface{} values. 
+// Returns the target map and true if found, nil and false otherwise. +func nestedMap(m map[string]interface{}, keys ...string) (map[string]interface{}, bool) { + current := m + for _, key := range keys { + next, ok := current[key] + if !ok { + return nil, false + } + nextMap, ok := next.(map[string]interface{}) + if !ok { + return nil, false + } + current = nextMap + } + return current, true +} + // waitForLauncherPodAndGetLogs waits for the launcher pod to be created and retrieves logs func waitForLauncherPodAndGetLogs(ctx *validators.Context, podHelper *helper.PodLifecycle) (string, error) { slog.Info("Waiting for launcher pod to be created...") diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index 55a38f3f1..936a3bff0 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -21,9 +21,170 @@ import ( "testing" "github.com/NVIDIA/aicr/pkg/recipe" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) +func TestOverrideNCCLWorkerScheduling_NodeSelector(t *testing.T) { + // Build a minimal TrainingRuntime-like unstructured object matching the real template structure. 
+ workerPodSpec := map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "node.kubernetes.io/instance-type": "p5.48xlarge", + }, + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, + } + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{"name": "launcher"}, + map[string]interface{}{ + "name": "node", + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": workerPodSpec, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + nodeSelector := map[string]string{"my-org/gpu-pool": "true"} + if err := overrideNCCLWorkerScheduling(obj, nodeSelector, nil); err != nil { + t.Fatalf("overrideNCCLWorkerScheduling() error = %v", err) + } + + // Verify the nodeSelector was replaced in the worker spec. + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + for _, j := range jobs { + jm, _ := j.(map[string]interface{}) + name, _, _ := unstructured.NestedString(jm, "name") + if name != "node" { + continue + } + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["my-org/gpu-pool"] != "true" { + t.Errorf("worker nodeSelector = %v, want my-org/gpu-pool=true", ns) + } + if _, hasOld := ns["node.kubernetes.io/instance-type"]; hasOld { + t.Error("old instance-type selector should have been replaced") + } + } +} + +func TestOverrideNCCLWorkerScheduling_Tolerations(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{"name": "launcher"}, + map[string]interface{}{ + "name": "node", + "template": 
map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb", + }, + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + tolerations := []corev1.Toleration{ + {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + } + if err := overrideNCCLWorkerScheduling(obj, nil, tolerations); err != nil { + t.Fatalf("overrideNCCLWorkerScheduling() error = %v", err) + } + + // nodeSelector should be unchanged (only tolerations overridden). + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + for _, j := range jobs { + jm, _ := j.(map[string]interface{}) + name, _, _ := unstructured.NestedString(jm, "name") + if name != "node" { + continue + } + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["cloud.google.com/gke-accelerator"] != "nvidia-h100-mega-80gb" { + t.Errorf("nodeSelector should be unchanged, got %v", ns) + } + tolsRaw, _, _ := unstructured.NestedSlice(jm, "template", "spec", "template", "spec", "tolerations") + if len(tolsRaw) != 1 { + t.Fatalf("tolerations count = %d, want 1", len(tolsRaw)) + } + tol, _ := tolsRaw[0].(map[string]interface{}) + if tol["key"] != "gpu-type" || tol["value"] != "h100" || tol["effect"] != "NoSchedule" { + t.Errorf("toleration = %v, want gpu-type=h100:NoSchedule", tol) + } + } +} + +func TestOverrideNCCLWorkerScheduling_NoOp(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{ + "name": "node", + "template": 
map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "original-key": "original-value", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + // Passing nil for both should leave the object unchanged. + if err := overrideNCCLWorkerScheduling(obj, nil, nil); err != nil { + t.Fatalf("overrideNCCLWorkerScheduling() error = %v", err) + } + + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + jm, _ := jobs[0].(map[string]interface{}) + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["original-key"] != "original-value" { + t.Errorf("nodeSelector should be unchanged, got %v", ns) + } +} + func TestTemplatePath(t *testing.T) { tests := []struct { name string From bd57fafcfb68d74e5b8f94b4b4503e3e80835898 Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Sat, 21 Mar 2026 18:01:58 -0400 Subject: [PATCH 3/9] refactor(nccl): unify worker scheduling into a single code path Pull platform-specific nodeSelectors and tolerations (EKS instance-type, GKE gke-accelerator+gpu taint) out of runtime.yaml templates and into platformWorkerScheduling() in Go. The effective scheduling (platform default or user override) is now always applied via applyNCCLWorkerScheduling, eliminating the conditional post-processing path. Also removes ${INSTANCE_TYPE} from EKS templateData since it's no longer needed in the YAML template. 
--- .../nccl_all_reduce_bw_constraint.go | 58 ++++++++++---- validators/performance/nccl_test.go | 76 ++++++++----------- .../testdata/h100/eks/runtime.yaml | 4 - .../testdata/h100/gke/runtime.yaml | 8 -- 4 files changed, 73 insertions(+), 73 deletions(-) diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index 06ecbb15b..9a7f4f903 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -309,6 +309,8 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface "MAX_MESSAGE_SIZE": maxMessageSize, } + var instanceType string + // For GKE, discover GPU NIC network names (cluster-specific prefixes). if service == recipe.CriteriaServiceGKE { gpuNICs, err := discoverGKEGPUNICNetworks(ctx.Ctx, dynamicClient) @@ -328,11 +330,11 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface // EFA count of 0 is valid — NCCL falls back to TCP (slower but functional). if service == recipe.CriteriaServiceEKS { warnIfHeterogeneousNodes(config.Nodes) - instanceType, efaCount, err := discoverEKSNodeConfig(config.Nodes[0]) + it, efaCount, err := discoverEKSNodeConfig(config.Nodes[0]) if err != nil { return err } - templateData["INSTANCE_TYPE"] = instanceType + instanceType = it // Indentation matches the resource block position in runtime.yaml. const efaIndent = " " templateData["EFA_RESOURCE_LIMITS"] = buildEFAResourceLine(efaCount, efaIndent) @@ -346,16 +348,25 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface } } - // Parse the per-platform runtime template and apply scheduling overrides - // before creating the resource. This replaces the hardcoded platform-specific - // nodeSelector (e.g., instance-type, gke-accelerator) when the user has - // provided --node-selector or --toleration flags for non-standard clusters. 
+ // Build effective worker scheduling: user override takes precedence over platform default. + defaultNodeSelector, defaultTolerations := platformWorkerScheduling(service, instanceType) + effectiveNodeSelector := defaultNodeSelector + if len(ctx.NodeSelector) > 0 { + effectiveNodeSelector = ctx.NodeSelector + slog.Info("Using user-provided node selector override for NCCL workers", "selector", ctx.NodeSelector) + } + effectiveTolerations := defaultTolerations + if len(ctx.Tolerations) > 0 { + effectiveTolerations = ctx.Tolerations + slog.Info("Using user-provided toleration override for NCCL workers", "count", len(ctx.Tolerations)) + } + runtimeObj, err := parseYAMLTemplate(templatePath(accelerator, service, "runtime.yaml"), templateData) if err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse training runtime template", err) } - if err := overrideNCCLWorkerScheduling(runtimeObj, ctx.NodeSelector, ctx.Tolerations); err != nil { - return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to override NCCL worker scheduling", err) + if err := applyNCCLWorkerScheduling(runtimeObj, effectiveNodeSelector, effectiveTolerations); err != nil { + return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply NCCL worker scheduling", err) } if err := createUnstructured(ctx.Ctx, dynamicClient, trainingRuntimeGVR, config.Namespace, runtimeObj); err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to apply training runtime", err) @@ -417,14 +428,29 @@ func createUnstructured(ctx context.Context, dynamicClient dynamic.Interface, gv return nil } -// overrideNCCLWorkerScheduling replaces the nodeSelector and/or tolerations -// on the "node" (worker) replicatedJob within a TrainingRuntime unstructured object. -// Only overrides when the provided slice/map is non-nil and non-empty. 
-func overrideNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[string]string, tolerations []v1.Toleration) error { - if len(nodeSelector) == 0 && len(tolerations) == 0 { - return nil +// platformWorkerScheduling returns the default nodeSelector and tolerations +// for NCCL worker pods on the given service. instanceType is only used for EKS. +func platformWorkerScheduling(service recipe.CriteriaServiceType, instanceType string) (map[string]string, []v1.Toleration) { + switch service { + case recipe.CriteriaServiceEKS: + return map[string]string{ + "node.kubernetes.io/instance-type": instanceType, + }, []v1.Toleration{{Operator: v1.TolerationOpExists}} + case recipe.CriteriaServiceGKE: + return map[string]string{ + "cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb", + }, []v1.Toleration{ + {Operator: v1.TolerationOpExists}, + {Key: "nvidia.com/gpu", Operator: v1.TolerationOpEqual, Value: "present", Effect: v1.TaintEffectNoSchedule}, + } + default: + return nil, nil } +} +// applyNCCLWorkerScheduling sets the nodeSelector and tolerations on the "node" +// (worker) replicatedJob within a TrainingRuntime unstructured object. 
+func applyNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[string]string, tolerations []v1.Toleration) error { replicatedJobs, found, err := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") if err != nil || !found { return aicrErrors.New(aicrErrors.ErrCodeInternal, "replicatedJobs not found in TrainingRuntime") @@ -452,7 +478,7 @@ func overrideNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector m ns[k] = v } workerPodSpec["nodeSelector"] = ns - slog.Info("Overriding NCCL worker nodeSelector", "selector", nodeSelector) + slog.Info("Applying NCCL worker nodeSelector", "selector", nodeSelector) } if len(tolerations) > 0 { @@ -473,7 +499,7 @@ func overrideNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector m tolList = append(tolList, tolMap) } workerPodSpec["tolerations"] = tolList - slog.Info("Overriding NCCL worker tolerations", "count", len(tolerations)) + slog.Info("Applying NCCL worker tolerations", "count", len(tolerations)) } replicatedJobs[i] = jobMap diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index 936a3bff0..ee05d120f 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) -func TestOverrideNCCLWorkerScheduling_NodeSelector(t *testing.T) { +func TestApplyNCCLWorkerScheduling_NodeSelector(t *testing.T) { // Build a minimal TrainingRuntime-like unstructured object matching the real template structure. 
workerPodSpec := map[string]interface{}{ "nodeSelector": map[string]interface{}{ @@ -60,8 +60,8 @@ func TestOverrideNCCLWorkerScheduling_NodeSelector(t *testing.T) { } nodeSelector := map[string]string{"my-org/gpu-pool": "true"} - if err := overrideNCCLWorkerScheduling(obj, nodeSelector, nil); err != nil { - t.Fatalf("overrideNCCLWorkerScheduling() error = %v", err) + if err := applyNCCLWorkerScheduling(obj, nodeSelector, nil); err != nil { + t.Fatalf("applyNCCLWorkerScheduling() error = %v", err) } // Verify the nodeSelector was replaced in the worker spec. @@ -82,7 +82,7 @@ func TestOverrideNCCLWorkerScheduling_NodeSelector(t *testing.T) { } } -func TestOverrideNCCLWorkerScheduling_Tolerations(t *testing.T) { +func TestApplyNCCLWorkerScheduling_Tolerations(t *testing.T) { obj := &unstructured.Unstructured{ Object: map[string]interface{}{ "spec": map[string]interface{}{ @@ -117,8 +117,8 @@ func TestOverrideNCCLWorkerScheduling_Tolerations(t *testing.T) { tolerations := []corev1.Toleration{ {Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, } - if err := overrideNCCLWorkerScheduling(obj, nil, tolerations); err != nil { - t.Fatalf("overrideNCCLWorkerScheduling() error = %v", err) + if err := applyNCCLWorkerScheduling(obj, nil, tolerations); err != nil { + t.Fatalf("applyNCCLWorkerScheduling() error = %v", err) } // nodeSelector should be unchanged (only tolerations overridden). 
@@ -144,45 +144,31 @@ func TestOverrideNCCLWorkerScheduling_Tolerations(t *testing.T) { } } -func TestOverrideNCCLWorkerScheduling_NoOp(t *testing.T) { - obj := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "spec": map[string]interface{}{ - "template": map[string]interface{}{ - "spec": map[string]interface{}{ - "replicatedJobs": []interface{}{ - map[string]interface{}{ - "name": "node", - "template": map[string]interface{}{ - "spec": map[string]interface{}{ - "template": map[string]interface{}{ - "spec": map[string]interface{}{ - "nodeSelector": map[string]interface{}{ - "original-key": "original-value", - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - // Passing nil for both should leave the object unchanged. - if err := overrideNCCLWorkerScheduling(obj, nil, nil); err != nil { - t.Fatalf("overrideNCCLWorkerScheduling() error = %v", err) - } - - jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") - jm, _ := jobs[0].(map[string]interface{}) - ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") - if ns["original-key"] != "original-value" { - t.Errorf("nodeSelector should be unchanged, got %v", ns) - } +func TestPlatformWorkerScheduling(t *testing.T) { + t.Run("EKS returns instance-type selector", func(t *testing.T) { + ns, tols := platformWorkerScheduling(recipe.CriteriaServiceEKS, "p5.48xlarge") + if ns["node.kubernetes.io/instance-type"] != "p5.48xlarge" { + t.Errorf("EKS nodeSelector = %v, want instance-type=p5.48xlarge", ns) + } + if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists { + t.Errorf("EKS tolerations = %v, want tolerate-all", tols) + } + }) + t.Run("GKE returns gke-accelerator selector", func(t *testing.T) { + ns, tols := platformWorkerScheduling(recipe.CriteriaServiceGKE, "") + if ns["cloud.google.com/gke-accelerator"] != "nvidia-h100-mega-80gb" { + t.Errorf("GKE nodeSelector = %v, want 
gke-accelerator=nvidia-h100-mega-80gb", ns) + } + if len(tols) != 2 { + t.Errorf("GKE tolerations count = %d, want 2", len(tols)) + } + }) + t.Run("unknown service returns nil", func(t *testing.T) { + ns, tols := platformWorkerScheduling("unknown", "") + if ns != nil || tols != nil { + t.Errorf("unknown service should return nil, got ns=%v tols=%v", ns, tols) + } + }) } func TestTemplatePath(t *testing.T) { diff --git a/validators/performance/testdata/h100/eks/runtime.yaml b/validators/performance/testdata/h100/eks/runtime.yaml index 7552dd8af..0462509b6 100644 --- a/validators/performance/testdata/h100/eks/runtime.yaml +++ b/validators/performance/testdata/h100/eks/runtime.yaml @@ -145,10 +145,6 @@ spec: spec: template: spec: - nodeSelector: - node.kubernetes.io/instance-type: ${INSTANCE_TYPE} - tolerations: - - operator: Exists initContainers: # Fix authorized_keys permissions so sshd accepts incoming connections. # The MPI plugin mounts the secret at /tmp/mpi-keys with 0644; sshd diff --git a/validators/performance/testdata/h100/gke/runtime.yaml b/validators/performance/testdata/h100/gke/runtime.yaml index b34942b0d..feedb9755 100644 --- a/validators/performance/testdata/h100/gke/runtime.yaml +++ b/validators/performance/testdata/h100/gke/runtime.yaml @@ -152,14 +152,6 @@ spec: networking.gke.io/default-interface: eth0 networking.gke.io/interfaces: '${GKE_NETWORK_INTERFACES}' spec: - nodeSelector: - cloud.google.com/gke-accelerator: nvidia-h100-mega-80gb - tolerations: - - operator: Exists - - key: nvidia.com/gpu - operator: Equal - value: present - effect: NoSchedule initContainers: # TCPXO FastRak native sidecar — runs fastrak_gpumem_manager # alongside the worker. 
Uses NRI device injection (devices.gke.io From 42f6de74a6edbec79edfd900e5fd8cfcd6c69764 Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Sat, 21 Mar 2026 18:10:15 -0400 Subject: [PATCH 4/9] test(nccl): add combined nodeSelector+tolerations scheduling test --- validators/performance/nccl_test.go | 67 +++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index ee05d120f..324b00ba8 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -144,6 +144,73 @@ func TestApplyNCCLWorkerScheduling_Tolerations(t *testing.T) { } } +func TestApplyNCCLWorkerScheduling_Both(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "replicatedJobs": []interface{}{ + map[string]interface{}{"name": "launcher"}, + map[string]interface{}{ + "name": "node", + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "template": map[string]interface{}{ + "spec": map[string]interface{}{ + "nodeSelector": map[string]interface{}{ + "node.kubernetes.io/instance-type": "p5.48xlarge", + }, + "tolerations": []interface{}{ + map[string]interface{}{"operator": "Exists"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + nodeSelector := map[string]string{"custom-label/gpu": "a100"} + tolerations := []corev1.Toleration{ + {Key: "custom-taint", Value: "true", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}, + } + if err := applyNCCLWorkerScheduling(obj, nodeSelector, tolerations); err != nil { + t.Fatalf("applyNCCLWorkerScheduling() error = %v", err) + } + + jobs, _, _ := unstructured.NestedSlice(obj.Object, "spec", "template", "spec", "replicatedJobs") + for _, j := range jobs { + jm, _ := j.(map[string]interface{}) + name, _, _ := unstructured.NestedString(jm, "name") + if name 
!= "node" { + continue + } + // Verify nodeSelector was replaced. + ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") + if ns["custom-label/gpu"] != "a100" { + t.Errorf("worker nodeSelector = %v, want custom-label/gpu=a100", ns) + } + if _, hasOld := ns["node.kubernetes.io/instance-type"]; hasOld { + t.Error("old instance-type selector should have been replaced") + } + // Verify tolerations were replaced. + tolsRaw, _, _ := unstructured.NestedSlice(jm, "template", "spec", "template", "spec", "tolerations") + if len(tolsRaw) != 1 { + t.Fatalf("tolerations count = %d, want 1", len(tolsRaw)) + } + tol, _ := tolsRaw[0].(map[string]interface{}) + if tol["key"] != "custom-taint" || tol["value"] != "true" || tol["effect"] != "NoSchedule" { + t.Errorf("toleration = %v, want custom-taint=true:NoSchedule", tol) + } + } +} + func TestPlatformWorkerScheduling(t *testing.T) { t.Run("EKS returns instance-type selector", func(t *testing.T) { ns, tols := platformWorkerScheduling(recipe.CriteriaServiceEKS, "p5.48xlarge") From c92a765f036c0664fa3bda0e79443285d9c8470c Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Sat, 21 Mar 2026 18:39:13 -0400 Subject: [PATCH 5/9] fix: resolve lint issues across validator scheduling changes --- pkg/validator/job/deployer.go | 7 ++++--- validators/context.go | 4 ++-- validators/context_test.go | 1 + .../performance/nccl_all_reduce_bw_constraint.go | 12 +++++++----- validators/performance/nccl_test.go | 14 ++++++++------ 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/pkg/validator/job/deployer.go b/pkg/validator/job/deployer.go index 4171d0733..67b2df944 100644 --- a/pkg/validator/job/deployer.go +++ b/pkg/validator/job/deployer.go @@ -232,12 +232,13 @@ func serializeTolerations(tols []corev1.Toleration) string { parts := make([]string, 0, len(tols)) for _, t := range tols { var part string - if t.Key == "" { + switch { + case t.Key == "": // Universal toleration (operator: 
Exists, no key) part = ":" - } else if t.Value != "" { + case t.Value != "": part = t.Key + "=" + t.Value + ":" + string(t.Effect) - } else { + default: part = t.Key + ":" + string(t.Effect) } parts = append(parts, part) diff --git a/validators/context.go b/validators/context.go index 498a77b88..c265546b5 100644 --- a/validators/context.go +++ b/validators/context.go @@ -145,7 +145,7 @@ func LoadContext() (*Context, error) { func parseNodeSelectorEnv() (map[string]string, error) { raw := os.Getenv("AICR_NODE_SELECTOR") if raw == "" { - return nil, nil + return nil, nil //nolint:nilnil // nil signals "not set" — callers check len to distinguish from empty } entries := strings.Split(raw, ",") return snapshotter.ParseNodeSelectors(entries) @@ -156,7 +156,7 @@ func parseNodeSelectorEnv() (map[string]string, error) { func parseTolerationEnv() ([]corev1.Toleration, error) { raw := os.Getenv("AICR_TOLERATIONS") if raw == "" { - return nil, nil + return nil, nil //nolint:nilnil // nil signals "not set" — callers check len to distinguish from empty } entries := strings.Split(raw, ",") // Use ParseTolerations but we must not pass empty slice (it returns DefaultTolerations). 
diff --git a/validators/context_test.go b/validators/context_test.go index b0acb780d..d3b59b33a 100644 --- a/validators/context_test.go +++ b/validators/context_test.go @@ -154,6 +154,7 @@ func TestParseTolerationEnv(t *testing.T) { for i, want := range tt.expected { if got[i].Key != want.Key || got[i].Value != want.Value || got[i].Effect != want.Effect || got[i].Operator != want.Operator { + t.Errorf("parseTolerationEnv()[%d] = %+v, want %+v", i, got[i], want) } } diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index 9a7f4f903..5349a7bfa 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -438,11 +438,13 @@ func platformWorkerScheduling(service recipe.CriteriaServiceType, instanceType s }, []v1.Toleration{{Operator: v1.TolerationOpExists}} case recipe.CriteriaServiceGKE: return map[string]string{ - "cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb", - }, []v1.Toleration{ - {Operator: v1.TolerationOpExists}, - {Key: "nvidia.com/gpu", Operator: v1.TolerationOpEqual, Value: "present", Effect: v1.TaintEffectNoSchedule}, - } + "cloud.google.com/gke-accelerator": "nvidia-h100-mega-80gb", + }, []v1.Toleration{ + {Operator: v1.TolerationOpExists}, + {Key: "nvidia.com/gpu", Operator: v1.TolerationOpEqual, Value: "present", Effect: v1.TaintEffectNoSchedule}, + } + case recipe.CriteriaServiceAny, recipe.CriteriaServiceAKS, recipe.CriteriaServiceOKE, recipe.CriteriaServiceKind: + return nil, nil default: return nil, nil } diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index 324b00ba8..c8744c4be 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) +const testWorkerJobName = "node" + func TestApplyNCCLWorkerScheduling_NodeSelector(t *testing.T) { // Build 
a minimal TrainingRuntime-like unstructured object matching the real template structure. workerPodSpec := map[string]interface{}{ @@ -43,7 +45,7 @@ func TestApplyNCCLWorkerScheduling_NodeSelector(t *testing.T) { "replicatedJobs": []interface{}{ map[string]interface{}{"name": "launcher"}, map[string]interface{}{ - "name": "node", + "name": testWorkerJobName, "template": map[string]interface{}{ "spec": map[string]interface{}{ "template": map[string]interface{}{ @@ -69,7 +71,7 @@ func TestApplyNCCLWorkerScheduling_NodeSelector(t *testing.T) { for _, j := range jobs { jm, _ := j.(map[string]interface{}) name, _, _ := unstructured.NestedString(jm, "name") - if name != "node" { + if name != testWorkerJobName { continue } ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") @@ -91,7 +93,7 @@ func TestApplyNCCLWorkerScheduling_Tolerations(t *testing.T) { "replicatedJobs": []interface{}{ map[string]interface{}{"name": "launcher"}, map[string]interface{}{ - "name": "node", + "name": testWorkerJobName, "template": map[string]interface{}{ "spec": map[string]interface{}{ "template": map[string]interface{}{ @@ -126,7 +128,7 @@ func TestApplyNCCLWorkerScheduling_Tolerations(t *testing.T) { for _, j := range jobs { jm, _ := j.(map[string]interface{}) name, _, _ := unstructured.NestedString(jm, "name") - if name != "node" { + if name != testWorkerJobName { continue } ns, _, _ := unstructured.NestedStringMap(jm, "template", "spec", "template", "spec", "nodeSelector") @@ -153,7 +155,7 @@ func TestApplyNCCLWorkerScheduling_Both(t *testing.T) { "replicatedJobs": []interface{}{ map[string]interface{}{"name": "launcher"}, map[string]interface{}{ - "name": "node", + "name": testWorkerJobName, "template": map[string]interface{}{ "spec": map[string]interface{}{ "template": map[string]interface{}{ @@ -188,7 +190,7 @@ func TestApplyNCCLWorkerScheduling_Both(t *testing.T) { for _, j := range jobs { jm, _ := j.(map[string]interface{}) name, _, _ 
:= unstructured.NestedString(jm, "name") - if name != "node" { + if name != testWorkerJobName { continue } // Verify nodeSelector was replaced. From c6adee11f14934dc08eedcdec9538d8d671663bd Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Sat, 21 Mar 2026 23:07:05 -0400 Subject: [PATCH 6/9] feat(nccl): add B200 and GB200 support for self-managed clusters (service=any) --- recipes/overlays/b200-any-training.yaml | 34 ++++ recipes/overlays/gb200-any-training.yaml | 34 ++++ .../nccl_all_reduce_bw_constraint.go | 7 + validators/performance/nccl_test.go | 20 ++ .../testdata/b200/any/runtime.yaml | 181 ++++++++++++++++++ .../testdata/gb200/any/runtime.yaml | 181 ++++++++++++++++++ 6 files changed, 457 insertions(+) create mode 100644 recipes/overlays/b200-any-training.yaml create mode 100644 recipes/overlays/gb200-any-training.yaml create mode 100644 validators/performance/testdata/b200/any/runtime.yaml create mode 100644 validators/performance/testdata/gb200/any/runtime.yaml diff --git a/recipes/overlays/b200-any-training.yaml b/recipes/overlays/b200-any-training.yaml new file mode 100644 index 000000000..44e5bb5d9 --- /dev/null +++ b/recipes/overlays/b200-any-training.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +kind: RecipeMetadata +apiVersion: aicr.nvidia.com/v1alpha1 +metadata: + name: b200-any-training + +spec: + base: base + + criteria: + service: any + accelerator: b200 + intent: training + + validation: + performance: + checks: + - nccl-all-reduce-bw + constraints: + - name: nccl-all-reduce-bw + value: ">= 350" diff --git a/recipes/overlays/gb200-any-training.yaml b/recipes/overlays/gb200-any-training.yaml new file mode 100644 index 000000000..862a54e43 --- /dev/null +++ b/recipes/overlays/gb200-any-training.yaml @@ -0,0 +1,34 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +kind: RecipeMetadata +apiVersion: aicr.nvidia.com/v1alpha1 +metadata: + name: gb200-any-training + +spec: + base: base + + criteria: + service: any + accelerator: gb200 + intent: training + + validation: + performance: + checks: + - nccl-all-reduce-bw + constraints: + - name: nccl-all-reduce-bw + value: ">= 720" diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index 5349a7bfa..ce506d899 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -92,6 +92,7 @@ func templatePath(accelerator recipe.CriteriaAcceleratorType, service recipe.Cri var supportedNCCLCombinations = map[recipe.CriteriaServiceType][]recipe.CriteriaAcceleratorType{ recipe.CriteriaServiceEKS: {recipe.CriteriaAcceleratorH100}, recipe.CriteriaServiceGKE: {recipe.CriteriaAcceleratorH100}, + recipe.CriteriaServiceAny: {recipe.CriteriaAcceleratorB200, recipe.CriteriaAcceleratorGB200}, } // validateNcclAllReduceBw validates NCCL All Reduce bandwidth using Kubeflow TrainJob + MPI. 
@@ -361,6 +362,12 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient dynamic.Interface slog.Info("Using user-provided toleration override for NCCL workers", "count", len(ctx.Tolerations)) } + if service == recipe.CriteriaServiceAny && len(effectiveNodeSelector) == 0 { + return aicrErrors.New(aicrErrors.ErrCodeInvalidRequest, + "self-managed clusters (service=any) require --node-selector to identify GPU nodes "+ + "(e.g., --node-selector nvidia.com/gpu.present=true)") + } + runtimeObj, err := parseYAMLTemplate(templatePath(accelerator, service, "runtime.yaml"), templateData) if err != nil { return aicrErrors.Wrap(aicrErrors.ErrCodeInternal, "failed to parse training runtime template", err) diff --git a/validators/performance/nccl_test.go b/validators/performance/nccl_test.go index c8744c4be..187beea68 100644 --- a/validators/performance/nccl_test.go +++ b/validators/performance/nccl_test.go @@ -238,6 +238,12 @@ func TestPlatformWorkerScheduling(t *testing.T) { t.Errorf("unknown service should return nil, got ns=%v tols=%v", ns, tols) } }) + t.Run("any service returns nil", func(t *testing.T) { + ns, tols := platformWorkerScheduling(recipe.CriteriaServiceAny, "") + if ns != nil || tols != nil { + t.Errorf("any service should return nil, got ns=%v tols=%v", ns, tols) + } + }) } func TestTemplatePath(t *testing.T) { @@ -269,6 +275,20 @@ func TestTemplatePath(t *testing.T) { filename: "runtime.yaml", expected: filepath.Join("testdata", "gb200", "gke", "runtime.yaml"), }, + { + name: "b200 any runtime", + accelerator: recipe.CriteriaAcceleratorB200, + service: recipe.CriteriaServiceAny, + filename: "runtime.yaml", + expected: filepath.Join("testdata", "b200", "any", "runtime.yaml"), + }, + { + name: "gb200 any runtime", + accelerator: recipe.CriteriaAcceleratorGB200, + service: recipe.CriteriaServiceAny, + filename: "runtime.yaml", + expected: filepath.Join("testdata", "gb200", "any", "runtime.yaml"), + }, } for _, tt := range tests { t.Run(tt.name, func(t 
*testing.T) { diff --git a/validators/performance/testdata/b200/any/runtime.yaml b/validators/performance/testdata/b200/any/runtime.yaml new file mode 100644 index 000000000..5d38929d5 --- /dev/null +++ b/validators/performance/testdata/b200/any/runtime.yaml @@ -0,0 +1,181 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Self-managed (service=any) NCCL All-Reduce TrainingRuntime — InfiniBand/RDMA. +# +# Simplified runtime for bare-metal / self-managed clusters using InfiniBand. +# No EFA, no TCPXO, no cloud-specific sidecars or resources. +# Requires --node-selector to identify GPU nodes (no standard label exists). +# +# Must stay in sync with ncclTrainingRuntimeName in nccl_all_reduce_bw_constraint.go. 
+ +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: TrainingRuntime +metadata: + name: nccl-all-reduce-runtime + namespace: ${NAMESPACE} + labels: + trainer.kubeflow.org/framework: mpi +spec: + mlPolicy: + mpi: + mpiImplementation: OpenMPI + numProcPerNode: ${GPU_COUNT_PER_NODE} + runLauncherAsNode: false + sshAuthMountPath: /tmp/mpi-keys + template: + spec: + network: + enableDNSHostnames: true + publishNotReadyAddresses: true + replicatedJobs: + - name: launcher + replicas: 1 + template: + spec: + template: + spec: + tolerations: + - operator: Exists + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: + - /bin/sh + - -c + - | + mkdir -p /root/.ssh + cp /tmp/mpi-keys/id_rsa /root/.ssh/id_rsa + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys + chmod 700 /root/.ssh + chmod 600 /root/.ssh/id_rsa /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + env: + - name: LD_LIBRARY_PATH + value: "/usr/local/nvidia/lib64:/usr/local/cuda/lib64" + command: + - /usr/local/mpi/bin/mpirun + args: + - -np + - "${GPU_COUNT}" + - --allow-run-as-root + - --mca + - plm_rsh_args + - -o StrictHostKeyChecking=no -o ConnectionAttempts=10 + - --mca + - btl + - ^openib + - --mca + - btl_tcp_if_include + - eth0 + - --mca + - oob_tcp_if_include + - eth0 + - -x + - LD_LIBRARY_PATH + - -x + - NCCL_DEBUG=WARN + - -x + - NCCL_SOCKET_IFNAME=eth0 + - /usr/local/bin/${TEST_TYPE}_mpi + - -b + - ${MIN_MESSAGE_SIZE} + - -e + - ${MAX_MESSAGE_SIZE} + - -f + - "2" + - -g + - "1" + resources: + limits: + cpu: "2" + memory: 128Mi + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + volumes: + - name: ssh-config + emptyDir: {} + - name: node + template: + spec: + template: + spec: + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: 
+ - /bin/sh + - -c + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: ["sh", "-c"] + args: + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/* /root/.ssh/ && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys && + /usr/sbin/sshd -De + resources: + limits: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + requests: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + securityContext: + capabilities: + add: ["IPC_LOCK"] + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + - name: dshm + mountPath: /dev/shm + volumes: + - name: ssh-config + emptyDir: {} + - name: dshm + emptyDir: + medium: Memory + successPolicy: + operator: All + targetReplicatedJobs: + - launcher diff --git a/validators/performance/testdata/gb200/any/runtime.yaml b/validators/performance/testdata/gb200/any/runtime.yaml new file mode 100644 index 000000000..5d38929d5 --- /dev/null +++ b/validators/performance/testdata/gb200/any/runtime.yaml @@ -0,0 +1,181 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Self-managed (service=any) NCCL All-Reduce TrainingRuntime — InfiniBand/RDMA. +# +# Simplified runtime for bare-metal / self-managed clusters using InfiniBand. +# No EFA, no TCPXO, no cloud-specific sidecars or resources. +# Requires --node-selector to identify GPU nodes (no standard label exists). +# +# Must stay in sync with ncclTrainingRuntimeName in nccl_all_reduce_bw_constraint.go. + +apiVersion: trainer.kubeflow.org/v1alpha1 +kind: TrainingRuntime +metadata: + name: nccl-all-reduce-runtime + namespace: ${NAMESPACE} + labels: + trainer.kubeflow.org/framework: mpi +spec: + mlPolicy: + mpi: + mpiImplementation: OpenMPI + numProcPerNode: ${GPU_COUNT_PER_NODE} + runLauncherAsNode: false + sshAuthMountPath: /tmp/mpi-keys + template: + spec: + network: + enableDNSHostnames: true + publishNotReadyAddresses: true + replicatedJobs: + - name: launcher + replicas: 1 + template: + spec: + template: + spec: + tolerations: + - operator: Exists + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: + - /bin/sh + - -c + - | + mkdir -p /root/.ssh + cp /tmp/mpi-keys/id_rsa /root/.ssh/id_rsa + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys + chmod 700 /root/.ssh + chmod 600 /root/.ssh/id_rsa /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + env: + - name: LD_LIBRARY_PATH + value: "/usr/local/nvidia/lib64:/usr/local/cuda/lib64" + command: + 
- /usr/local/mpi/bin/mpirun + args: + - -np + - "${GPU_COUNT}" + - --allow-run-as-root + - --mca + - plm_rsh_args + - -o StrictHostKeyChecking=no -o ConnectionAttempts=10 + - --mca + - btl + - ^openib + - --mca + - btl_tcp_if_include + - eth0 + - --mca + - oob_tcp_if_include + - eth0 + - -x + - LD_LIBRARY_PATH + - -x + - NCCL_DEBUG=WARN + - -x + - NCCL_SOCKET_IFNAME=eth0 + - /usr/local/bin/${TEST_TYPE}_mpi + - -b + - ${MIN_MESSAGE_SIZE} + - -e + - ${MAX_MESSAGE_SIZE} + - -f + - "2" + - -g + - "1" + resources: + limits: + cpu: "2" + memory: 128Mi + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + volumes: + - name: ssh-config + emptyDir: {} + - name: node + template: + spec: + template: + spec: + initContainers: + - name: fix-ssh-perms + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: + - /bin/sh + - -c + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/authorized_keys /root/.ssh/authorized_keys && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys + volumeMounts: + - name: mpi-ssh-auth + mountPath: /tmp/mpi-keys + readOnly: true + - name: ssh-config + mountPath: /root/.ssh + containers: + - name: node + image: nvcr.io/nvidia/pytorch:25.06-py3 + command: ["sh", "-c"] + args: + - | + apt-get update && + apt-get install -y --no-install-recommends openssh-server && + mkdir -p /var/run/sshd && + chmod 0755 /var/run/sshd && + mkdir -p /root/.ssh && + cp /tmp/mpi-keys/* /root/.ssh/ && + chmod 700 /root/.ssh && + chmod 600 /root/.ssh/authorized_keys && + /usr/sbin/sshd -De + resources: + limits: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + requests: + nvidia.com/gpu: ${GPU_COUNT_PER_NODE} + securityContext: + capabilities: + add: ["IPC_LOCK"] + volumeMounts: + - name: ssh-config + mountPath: /root/.ssh + - name: dshm + mountPath: /dev/shm + volumes: + - name: ssh-config + emptyDir: {} + - name: dshm + 
emptyDir: + medium: Memory + successPolicy: + operator: All + targetReplicatedJobs: + - launcher From 1626d631e3c351696de29f3fa47d109a64504564 Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Mon, 23 Mar 2026 11:04:18 -0400 Subject: [PATCH 7/9] fix(validator): use sentinel for universal toleration serialization The default "tolerate all" toleration ({Operator: Exists}) was serialized as ":" which failed to parse in the validator container due to an empty taint effect string. Use "*" as a sentinel value that both sides recognize. --- pkg/snapshotter/agent.go | 5 +++++ pkg/validator/job/deployer.go | 5 ++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/snapshotter/agent.go b/pkg/snapshotter/agent.go index ccd087c92..08c57ac07 100644 --- a/pkg/snapshotter/agent.go +++ b/pkg/snapshotter/agent.go @@ -282,6 +282,11 @@ func ParseTolerations(tolerations []string) ([]corev1.Toleration, error) { result := make([]corev1.Toleration, 0, len(tolerations)) for _, t := range tolerations { + if t == "*" { + result = append(result, corev1.Toleration{Operator: corev1.TolerationOpExists}) + continue + } + // Format: key=value:effect or key:effect (for exists operator) var key, value, effect string diff --git a/pkg/validator/job/deployer.go b/pkg/validator/job/deployer.go index 67b2df944..e77090e08 100644 --- a/pkg/validator/job/deployer.go +++ b/pkg/validator/job/deployer.go @@ -233,9 +233,8 @@ func serializeTolerations(tols []corev1.Toleration) string { for _, t := range tols { var part string switch { - case t.Key == "": - // Universal toleration (operator: Exists, no key) - part = ":" + case t.Key == "" && t.Operator == corev1.TolerationOpExists: + part = "*" case t.Value != "": part = t.Key + "=" + t.Value + ":" + string(t.Effect) default: From b73b135e0c74f88d94ac9a2fd87af515077b35ed Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Mon, 23 Mar 2026 11:04:39 -0400 Subject: [PATCH 8/9] fix(nccl): wait for Trainer controller-manager readiness after install 
After installing the Kubeflow Trainer operator, the ValidatingWebhookConfiguration is registered before the controller-manager pod is ready to serve admission requests. Poll the controller-manager Deployment until at least one replica is ready before creating TrainingRuntime resources. --- pkg/defaults/timeouts.go | 4 ++ pkg/defaults/timeouts_test.go | 3 ++ validators/performance/trainer_lifecycle.go | 47 +++++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/pkg/defaults/timeouts.go b/pkg/defaults/timeouts.go index 558c042e4..3ed90afdb 100644 --- a/pkg/defaults/timeouts.go +++ b/pkg/defaults/timeouts.go @@ -227,6 +227,10 @@ const ( // to reach the Established condition after installation. TrainerCRDEstablishedTimeout = 2 * time.Minute + // TrainerControllerReadyTimeout is the time to wait for the Kubeflow Trainer + // controller-manager Deployment to have at least one ready replica after installation. + TrainerControllerReadyTimeout = 2 * time.Minute + // NCCLTrainJobTimeout is the maximum time to wait for the NCCL all-reduce TrainJob to complete. 
NCCLTrainJobTimeout = 30 * time.Minute diff --git a/pkg/defaults/timeouts_test.go b/pkg/defaults/timeouts_test.go index 8b545bd92..cbfe59d82 100644 --- a/pkg/defaults/timeouts_test.go +++ b/pkg/defaults/timeouts_test.go @@ -62,6 +62,9 @@ func TestTimeoutConstants(t *testing.T) { // Gang scheduling co-scheduling window {"CoScheduleWindow", CoScheduleWindow, 10 * time.Second, 60 * time.Second}, + // Trainer timeouts + {"TrainerControllerReadyTimeout", TrainerControllerReadyTimeout, 1 * time.Minute, 5 * time.Minute}, + // Validator timeouts {"ValidatorWaitBuffer", ValidatorWaitBuffer, 10 * time.Second, 60 * time.Second}, {"ValidatorDefaultTimeout", ValidatorDefaultTimeout, 1 * time.Minute, 15 * time.Minute}, diff --git a/validators/performance/trainer_lifecycle.go b/validators/performance/trainer_lifecycle.go index d26aebafa..024915a3e 100644 --- a/validators/performance/trainer_lifecycle.go +++ b/validators/performance/trainer_lifecycle.go @@ -23,6 +23,7 @@ import ( "io" "log/slog" "net/http" + "time" "os" "path/filepath" "slices" @@ -55,6 +56,12 @@ const ( // trainerCRDName is the CRD that signals the Trainer operator is installed. trainerCRDName = "trainjobs.trainer.kubeflow.org" + // trainerControllerDeployment is the Deployment name for the Trainer controller-manager. + trainerControllerDeployment = "kubeflow-trainer-controller-manager" + + // trainerNamespace is the namespace where the Trainer operator is installed. + trainerNamespace = "kubeflow-system" + // maxExtractedFileSize caps individual file sizes during tar extraction (50 MB). maxExtractedFileSize = 50 * 1024 * 1024 ) @@ -178,6 +185,12 @@ func installTrainer(ctx context.Context, dynamicClient dynamic.Interface, discov return applied, aicrErrors.Wrap(aicrErrors.ErrCodeTimeout, "Trainer CRDs not ready after install", err) } + // Wait for the controller-manager to be ready so the ValidatingWebhookConfiguration + // backing Service can serve requests before the caller creates TrainingRuntime resources. 
+ if err := waitForTrainerControllerReady(ctx, dynamicClient); err != nil { + return applied, aicrErrors.Wrap(aicrErrors.ErrCodeTimeout, "Trainer controller not ready after install", err) + } + return applied, nil } @@ -225,6 +238,40 @@ func waitForTrainerCRDsEstablished(ctx context.Context, dynamicClient dynamic.In return nil } +// waitForTrainerControllerReady polls the controller-manager Deployment until at +// least one replica is ready, ensuring the ValidatingWebhookConfiguration can +// serve admission requests before the caller creates Trainer custom resources. +func waitForTrainerControllerReady(ctx context.Context, dynamicClient dynamic.Interface) error { + slog.Info("Waiting for Trainer controller-manager to become ready", + "deployment", trainerControllerDeployment, "namespace", trainerNamespace) + + deployGVR := schema.GroupVersionResource{ + Group: "apps", Version: "v1", Resource: "deployments", + } + + waitCtx, cancel := context.WithTimeout(ctx, defaults.TrainerControllerReadyTimeout) + defer cancel() + + for { + deploy, err := dynamicClient.Resource(deployGVR).Namespace(trainerNamespace). + Get(waitCtx, trainerControllerDeployment, metav1.GetOptions{}) + if err == nil { + readyReplicas, _, _ := unstructured.NestedInt64(deploy.Object, "status", "readyReplicas") + if readyReplicas >= 1 { + slog.Info("Trainer controller-manager is ready", "readyReplicas", readyReplicas) + return nil + } + } + + select { + case <-waitCtx.Done(): + return aicrErrors.Wrap(aicrErrors.ErrCodeTimeout, + "timed out waiting for Trainer controller-manager to become ready", waitCtx.Err()) + case <-time.After(2 * time.Second): + } + } +} + // waitForCRDEstablished watches a CRD until its Established condition is True. // It checks the current state first so the fast path (already established) returns // immediately without starting a watch. 
From f57df201d7b370ca418f9990924af86052df3987 Mon Sep 17 00:00:00 2001 From: Atif Mahmood Date: Wed, 1 Apr 2026 19:00:03 -0400 Subject: [PATCH 9/9] fix(validator): address post-review feedback on scheduling flags - buildPodSpecApply: hardcode tolerate-all on orchestrator Job so user --toleration flags (targeting inner workloads) cannot prevent the orchestrator from scheduling on CPU nodes; remove buildTolerationsApply - serializeNodeSelector: sort keys for deterministic AICR_NODE_SELECTOR output - orchestratorEnvCount: update capacity hint from 6 to 8 - applyNCCLWorkerScheduling: return ErrCodeInternal when "node" replicatedJob is not found instead of silently succeeding - applyNCCLResources: use ctx.NodeSelector != nil / ctx.Tolerations != nil instead of len() > 0 for consistency with conformance validators - trainer_lifecycle.go: fix stdlib import ordering (time after strings) - agent_test.go: add wildcard "*" test cases for ParseTolerations --- pkg/snapshotter/agent_test.go | 14 +++++ pkg/validator/job/deployer.go | 61 ++++++------------- pkg/validator/job/deployer_test.go | 33 ++++++---- .../nccl_all_reduce_bw_constraint.go | 10 ++- validators/performance/trainer_lifecycle.go | 2 +- 5 files changed, 62 insertions(+), 58 deletions(-) diff --git a/pkg/snapshotter/agent_test.go b/pkg/snapshotter/agent_test.go index bb5f4895f..b818e0262 100644 --- a/pkg/snapshotter/agent_test.go +++ b/pkg/snapshotter/agent_test.go @@ -349,6 +349,12 @@ func TestParseTolerations(t *testing.T) { wantLen: 0, wantErr: true, }, + { + name: "wildcard toleration", + tolerations: []string{"*"}, + wantLen: 1, + wantErr: false, + }, } for _, tt := range tests { @@ -391,6 +397,14 @@ func TestParseTolerationsOperator(t *testing.T) { wantValue: "", wantEffect: corev1.TaintEffectNoExecute, }, + { + name: "wildcard toleration produces Exists with empty key", + toleration: "*", + wantOperator: corev1.TolerationOpExists, + wantKey: "", + wantValue: "", + wantEffect: "", + }, } for _, tt := range 
tests { diff --git a/pkg/validator/job/deployer.go b/pkg/validator/job/deployer.go index e77090e08..e63a75696 100644 --- a/pkg/validator/job/deployer.go +++ b/pkg/validator/job/deployer.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "log/slog" + "slices" "strings" "time" @@ -188,7 +189,7 @@ func (d *Deployer) buildApplyConfig() *applybatchv1.JobApplyConfiguration { } func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration { - orchestratorEnvCount := 6 + orchestratorEnvCount := 8 env := make([]*applycorev1.EnvVarApplyConfiguration, 0, orchestratorEnvCount+len(d.entry.Env)) env = append(env, applycorev1.EnvVar().WithName("AICR_SNAPSHOT_PATH").WithValue("/data/snapshot/snapshot.yaml"), @@ -216,11 +217,17 @@ func (d *Deployer) buildEnvApply() []*applycorev1.EnvVarApplyConfiguration { } // serializeNodeSelector encodes a nodeSelector map as a comma-separated key=value string. -// This matches the format expected by snapshotter.ParseNodeSelectors on the receiving end. +// Keys are sorted for deterministic output. This matches the format expected by +// snapshotter.ParseNodeSelectors on the receiving end. func serializeNodeSelector(ns map[string]string) string { + keys := make([]string, 0, len(ns)) + for k := range ns { + keys = append(keys, k) + } + slices.Sort(keys) pairs := make([]string, 0, len(ns)) - for k, v := range ns { - pairs = append(pairs, k+"="+v) + for _, k := range keys { + pairs = append(pairs, k+"="+ns[k]) } return strings.Join(pairs, ",") } @@ -245,22 +252,11 @@ func serializeTolerations(tols []corev1.Toleration) string { return strings.Join(parts, ",") } -// imagePullPolicy returns the appropriate pull policy based on the image reference. -// Local images (ko.local, kind.local, localhost) always use IfNotPresent since they -// are side-loaded into the cluster and cannot be pulled from a registry. -// Remote images with :latest tag use Always to avoid stale cached images. 
+// imagePullPolicy returns Always when the image uses :latest tag (dev builds), +// PullIfNotPresent otherwise. This ensures dev builds always pull fresh images +// and avoids exec format errors from stale cached images on cluster nodes. func (d *Deployer) imagePullPolicy() corev1.PullPolicy { - img := d.entry.Image - // Local images side-loaded into kind/nvkind — never pull from registry. - if strings.HasPrefix(img, "ko.local") || - strings.HasPrefix(img, "kind.local") || - strings.HasPrefix(img, "localhost/") || - strings.HasPrefix(img, "localhost:") { - - return corev1.PullIfNotPresent - } - - if strings.HasSuffix(img, ":latest") { + if strings.HasSuffix(d.entry.Image, ":latest") { return corev1.PullAlways } return corev1.PullIfNotPresent @@ -275,37 +271,18 @@ func (d *Deployer) buildImagePullSecretsApply() []*applycorev1.LocalObjectRefere } func (d *Deployer) buildPodSpecApply() *applycorev1.PodSpecApplyConfiguration { + // The orchestrator Job always tolerates all taints so it can schedule on any + // available CPU node. User-provided tolerations (--toleration flag) are forwarded + // to inner workloads via AICR_TOLERATIONS and do not affect orchestrator scheduling. return applycorev1.PodSpec(). WithServiceAccountName(ServiceAccountName). WithRestartPolicy(corev1.RestartPolicyNever). WithTerminationGracePeriodSeconds(int64(defaults.ValidatorTerminationGracePeriod.Seconds())). WithImagePullSecrets(d.buildImagePullSecretsApply()...). - WithTolerations(d.buildTolerationsApply()...). + WithTolerations(applycorev1.Toleration().WithOperator(corev1.TolerationOpExists)). 
WithAffinity(preferCPUNodeAffinityApply()) } -func (d *Deployer) buildTolerationsApply() []*applycorev1.TolerationApplyConfiguration { - tols := make([]*applycorev1.TolerationApplyConfiguration, 0, len(d.tolerations)) - for i := range d.tolerations { - t := &d.tolerations[i] - tol := applycorev1.Toleration().WithOperator(t.Operator) - if t.Key != "" { - tol = tol.WithKey(t.Key) - } - if t.Value != "" { - tol = tol.WithValue(t.Value) - } - if t.Effect != "" { - tol = tol.WithEffect(t.Effect) - } - if t.TolerationSeconds != nil { - tol = tol.WithTolerationSeconds(*t.TolerationSeconds) - } - tols = append(tols, tol) - } - return tols -} - // WaitForCompletion watches the Job until it reaches a terminal state // (Complete or Failed). Returns nil for both — the caller uses ExtractResult // to determine pass/fail/skip from the exit code. diff --git a/pkg/validator/job/deployer_test.go b/pkg/validator/job/deployer_test.go index fbc61650b..6d84672e5 100644 --- a/pkg/validator/job/deployer_test.go +++ b/pkg/validator/job/deployer_test.go @@ -318,14 +318,26 @@ func TestDeployJobImagePullSecrets(t *testing.T) { } } -func TestDeployJobTolerations(t *testing.T) { - ns := createUniqueNamespace(t) - tolerations := []corev1.Toleration{{Operator: corev1.TolerationOpExists}} - job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tolerations, nil)) - - tols := job.Spec.Template.Spec.Tolerations - if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists { - t.Errorf("tolerations = %v, want tolerate-all", tols) +func TestDeployJobOrchestratorToleratesTolerateAll(t *testing.T) { + // The orchestrator Job must always have tolerate-all so it can schedule on + // any CPU node, regardless of what tolerations are passed for inner workloads. 
+ tests := []struct { + name string + tolerations []corev1.Toleration + }{ + {"nil tolerations", nil}, + {"narrow GPU toleration", []corev1.Toleration{{Key: "gpu-type", Value: "h100", Effect: corev1.TaintEffectNoSchedule, Operator: corev1.TolerationOpEqual}}}, + {"explicit tolerate-all", []corev1.Toleration{{Operator: corev1.TolerationOpExists}}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ns := createUniqueNamespace(t) + job := deployAndGet(t, NewDeployer(testClientset, testFactory(t, ns), ns, "run1", testEntry(), nil, tt.tolerations, nil)) + tols := job.Spec.Template.Spec.Tolerations + if len(tols) != 1 || tols[0].Operator != corev1.TolerationOpExists || tols[0].Key != "" { + t.Errorf("orchestrator tolerations = %v, want single tolerate-all {Operator: Exists}", tols) + } + }) } } @@ -578,11 +590,6 @@ func TestImagePullPolicy(t *testing.T) { }{ {"latest tag uses Always", "ghcr.io/nvidia/aicr-validators/conformance:latest", corev1.PullAlways}, {"versioned tag uses IfNotPresent", "ghcr.io/nvidia/aicr-validators/conformance:v1.0.0", corev1.PullIfNotPresent}, - {"ko.local uses IfNotPresent", "ko.local:smoke-test", corev1.PullIfNotPresent}, - {"ko.local latest uses IfNotPresent", "ko.local:latest", corev1.PullIfNotPresent}, - {"kind.local uses IfNotPresent", "kind.local/validator:latest", corev1.PullIfNotPresent}, - {"localhost registry uses IfNotPresent", "localhost:5000/validator:latest", corev1.PullIfNotPresent}, - {"localhost path uses IfNotPresent", "localhost/validator:latest", corev1.PullIfNotPresent}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/validators/performance/nccl_all_reduce_bw_constraint.go b/validators/performance/nccl_all_reduce_bw_constraint.go index ce506d899..5777c3e6e 100644 --- a/validators/performance/nccl_all_reduce_bw_constraint.go +++ b/validators/performance/nccl_all_reduce_bw_constraint.go @@ -352,12 +352,12 @@ func applyNCCLResources(ctx *validators.Context, dynamicClient 
dynamic.Interface // Build effective worker scheduling: user override takes precedence over platform default. defaultNodeSelector, defaultTolerations := platformWorkerScheduling(service, instanceType) effectiveNodeSelector := defaultNodeSelector - if len(ctx.NodeSelector) > 0 { + if ctx.NodeSelector != nil { effectiveNodeSelector = ctx.NodeSelector slog.Info("Using user-provided node selector override for NCCL workers", "selector", ctx.NodeSelector) } effectiveTolerations := defaultTolerations - if len(ctx.Tolerations) > 0 { + if ctx.Tolerations != nil { effectiveTolerations = ctx.Tolerations slog.Info("Using user-provided toleration override for NCCL workers", "count", len(ctx.Tolerations)) } @@ -465,6 +465,7 @@ func applyNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[ return aicrErrors.New(aicrErrors.ErrCodeInternal, "replicatedJobs not found in TrainingRuntime") } + nodeJobFound := false for i, jobRaw := range replicatedJobs { jobMap, ok := jobRaw.(map[string]interface{}) if !ok { @@ -474,6 +475,7 @@ func applyNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[ if name != "node" { continue } + nodeJobFound = true // Navigate deep into the worker pod spec. 
workerPodSpec, found := nestedMap(jobMap, "template", "spec", "template", "spec") @@ -515,6 +517,10 @@ func applyNCCLWorkerScheduling(obj *unstructured.Unstructured, nodeSelector map[ break } + if !nodeJobFound { + return aicrErrors.New(aicrErrors.ErrCodeInternal, `replicatedJob "node" not found in TrainingRuntime`) + } + return unstructured.SetNestedSlice(obj.Object, replicatedJobs, "spec", "template", "spec", "replicatedJobs") } diff --git a/validators/performance/trainer_lifecycle.go b/validators/performance/trainer_lifecycle.go index 024915a3e..73395aa2f 100644 --- a/validators/performance/trainer_lifecycle.go +++ b/validators/performance/trainer_lifecycle.go @@ -23,11 +23,11 @@ import ( "io" "log/slog" "net/http" - "time" "os" "path/filepath" "slices" "strings" + "time" "github.com/NVIDIA/aicr/pkg/defaults" aicrErrors "github.com/NVIDIA/aicr/pkg/errors"