From aafe0f2dbe07b175f6183362aeed9d5ea328227b Mon Sep 17 00:00:00 2001 From: Erez Freiberger Date: Sun, 1 Mar 2026 00:56:03 +0100 Subject: [PATCH 1/5] test(binder): add envtest reproducing reservation pod race at scale Create an integration test using envtest that starts the full binder controller and reproduces the race condition where SyncForGpuGroup prematurely deletes reservation pods when the informer cache lags behind (fraction pods lack GPU group labels). Setup: 4 nodes x 8 GPUs = 32 GPU groups with pre-created reservation pods and BindRequests. The binder's SyncForNode sees reservation pods with no matching labeled fraction pods and deletes them, confirming the bug exists before the fix. --- pkg/env-tests/reservation_race_scale_test.go | 328 +++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100644 pkg/env-tests/reservation_race_scale_test.go diff --git a/pkg/env-tests/reservation_race_scale_test.go b/pkg/env-tests/reservation_race_scale_test.go new file mode 100644 index 000000000..727467425 --- /dev/null +++ b/pkg/env-tests/reservation_race_scale_test.go @@ -0,0 +1,328 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package env_tests + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/xyproto/randomstring" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + kaiv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" + schedulingv2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/pkg/env-tests/binder" + "github.com/NVIDIA/KAI-scheduler/pkg/env-tests/utils" +) + +// This test reproduces a race condition in the binder's resource reservation +// sync logic at scale (4 nodes × 8 GPUs = 32 GPU groups). 
The full binder +// controller runs autonomously; the test only creates Kubernetes objects and +// observes outcomes. +// +// The race: +// 1. A fraction pod binds, creating a reservation pod. The binder patches the +// fraction pod with a GPU group label, but the informer cache may lag. +// 2. A concurrent SyncForGpuGroup (triggered by another BindRequest reconcile +// or deletion) lists pods by GPU group label from the cache. +// 3. Due to cache lag, the sync sees the reservation pod but no fraction pods +// for that group → deletes the reservation pod prematurely. +// +// The test pre-creates reservation pods (as if ReserveGpuDevice placed them) +// and fraction BindRequests. envtest has no GPU device plugin, so a background +// goroutine (startReservationPodAnnotator) patches the GPU index annotation +// onto reservation pods in its place. Whether or not binding completes, the +// reconciler calls SyncForNode at the start of each reconcile, which triggers +// the problematic reservation pod cleanup path. 
+var _ = Describe("Reservation pod race at scale", Ordered, func() { + const ( + numNodes = 4 + gpusPerNode = 8 + + gpuIndexAnnotationKey = "run.ai/reserve_for_gpu_index" + gpuIndexHeartbeatKey = "run.ai/test-annotator-heartbeat" + annotatorPollInterval = 100 * time.Millisecond + annotatorGPUIndexValue = "0" + ) + + var ( + testNamespace *corev1.Namespace + testDepartment *schedulingv2.Queue + testQueue *schedulingv2.Queue + nodes []*corev1.Node + + backgroundCtx context.Context + cancel context.CancelFunc + ) + + type gpuGroupState struct { + gpuGroup string + nodeName string + reservationPod *corev1.Pod + fractionPod *corev1.Pod + bindRequest *kaiv1alpha2.BindRequest + } + + var groups []gpuGroupState + + startReservationPodAnnotator := func(ctx context.Context, c client.Client) { + ticker := time.NewTicker(annotatorPollInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var reservationPods corev1.PodList + err := c.List(ctx, &reservationPods, + client.InNamespace(constants.DefaultResourceReservationName), + client.HasLabels{constants.GPUGroup}, + ) + if err != nil { + GinkgoWriter.Printf("reservation pod annotator list error: %v\n", err) + continue + } + + for i := range reservationPods.Items { + pod := reservationPods.Items[i] + updated := pod.DeepCopy() + if updated.Annotations == nil { + updated.Annotations = map[string]string{} + } + + needsPatch := false + if updated.Annotations[gpuIndexAnnotationKey] == "" { + updated.Annotations[gpuIndexAnnotationKey] = annotatorGPUIndexValue + needsPatch = true + } else { + updated.Annotations[gpuIndexHeartbeatKey] = time.Now().UTC().Format(time.RFC3339Nano) + needsPatch = true + } + + if needsPatch { + if err := c.Patch(ctx, updated, client.MergeFrom(&pod)); err != nil { + GinkgoWriter.Printf("reservation pod annotator patch error: %v\n", err) + } + } + } + } + } + }() + } + + BeforeEach(func(ctx context.Context) { + testNamespace = &corev1.Namespace{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: "test-" + randomstring.HumanFriendlyEnglishString(10), + }, + } + Expect(ctrlClient.Create(ctx, testNamespace)).To(Succeed()) + + testDepartment = utils.CreateQueueObject("test-department", "") + Expect(ctrlClient.Create(ctx, testDepartment)).To(Succeed()) + + testQueue = utils.CreateQueueObject("test-queue", testDepartment.Name) + Expect(ctrlClient.Create(ctx, testQueue)).To(Succeed()) + + nodes = make([]*corev1.Node, numNodes) + for i := range numNodes { + nodeCfg := utils.DefaultNodeConfig(fmt.Sprintf("scale-node-%d", i)) + nodeCfg.GPUs = gpusPerNode + nodes[i] = utils.CreateNodeObject(ctx, ctrlClient, nodeCfg) + Expect(ctrlClient.Create(ctx, nodes[i])).To(Succeed()) + } + + // Ensure the reservation namespace exists + reservationNs := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: constants.DefaultResourceReservationName}, + } + err := ctrlClient.Create(ctx, reservationNs) + if err != nil { + Expect(client.IgnoreAlreadyExists(err)).To(Succeed()) + } + + backgroundCtx, cancel = context.WithCancel(context.Background()) + + // Build GPU group state for all nodes × GPUs + groups = make([]gpuGroupState, 0, numNodes*gpusPerNode) + for nodeIdx := range numNodes { + for gpuIdx := range gpusPerNode { + gpuGroup := fmt.Sprintf("%s-gpu%d-group", nodes[nodeIdx].Name, gpuIdx) + groups = append(groups, gpuGroupState{ + gpuGroup: gpuGroup, + nodeName: nodes[nodeIdx].Name, + }) + } + } + }) + + AfterEach(func(ctx context.Context) { + cancel() + + // Clean up reservation pods + _ = ctrlClient.DeleteAllOf(ctx, &corev1.Pod{}, + client.InNamespace(constants.DefaultResourceReservationName)) + + // Clean up test resources + if testNamespace != nil { + err := utils.DeleteAllInNamespace(ctx, ctrlClient, testNamespace.Name, + &corev1.Pod{}, + &corev1.ConfigMap{}, + &kaiv1alpha2.BindRequest{}, + ) + Expect(err).NotTo(HaveOccurred()) + } + + for _, node := range nodes { + _ = ctrlClient.Delete(ctx, node) + } + if testQueue != nil { + _ = 
ctrlClient.Delete(ctx, testQueue) + } + if testDepartment != nil { + _ = ctrlClient.Delete(ctx, testDepartment) + } + }) + + // Pre-create reservation pods and fraction BindRequests, then let the + // binder controller run. The controller's SyncForNode (called at the start + // of each BindRequest reconcile) will discover reservation pods with no + // matching labeled fraction pods and delete them. + It("should preserve reservation pods when active BindRequests exist", func(ctx context.Context) { + // Step 1: Pre-create reservation pods for all 32 GPU groups. + // These simulate the state after ReserveGpuDevice created them. + for i := range groups { + g := &groups[i] + g.reservationPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("gpu-reservation-%s-%d", g.nodeName, i), + Namespace: constants.DefaultResourceReservationName, + Labels: map[string]string{ + constants.AppLabelName: constants.DefaultResourceReservationName, + constants.GPUGroup: g.gpuGroup, + }, + }, + Spec: corev1.PodSpec{ + NodeName: g.nodeName, + ServiceAccountName: constants.DefaultResourceReservationName, + Containers: []corev1.Container{ + {Name: "resource-reservation", Image: "test-image"}, + }, + }, + } + Expect(ctrlClient.Create(ctx, g.reservationPod)).To(Succeed()) + } + + // Step 2: Create fraction pods WITHOUT GPU group labels. + // This simulates the window where ReserveGpuDevice has created the + // reservation pod but hasn't yet labeled the fraction pod. 
+ for i := range groups { + g := &groups[i] + g.fractionPod = utils.CreatePodObject( + testNamespace.Name, + fmt.Sprintf("fraction-pod-%d", i), + corev1.ResourceRequirements{}, + ) + if g.fractionPod.Annotations == nil { + g.fractionPod.Annotations = map[string]string{} + } + g.fractionPod.Annotations[constants.GpuSharingConfigMapAnnotation] = + fmt.Sprintf("%s-shared-gpu", g.fractionPod.Name) + Expect(ctrlClient.Create(ctx, g.fractionPod)).To(Succeed()) + + configMapPrefix := g.fractionPod.Annotations[constants.GpuSharingConfigMapAnnotation] + capabilitiesConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0", configMapPrefix), + Namespace: testNamespace.Name, + }, + Data: map[string]string{}, + } + Expect(ctrlClient.Create(ctx, capabilitiesConfigMap)).To(Succeed()) + + directEnvConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0-evar", configMapPrefix), + Namespace: testNamespace.Name, + }, + Data: map[string]string{}, + } + Expect(ctrlClient.Create(ctx, directEnvConfigMap)).To(Succeed()) + } + + // Step 3: Create BindRequests for all fraction pods. + // The binder will reconcile these, calling SyncForNode which + // triggers the reservation pod cleanup logic. 
+ for i := range groups { + g := &groups[i] + g.bindRequest = &kaiv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("scale-bind-%d", i), + Namespace: testNamespace.Name, + }, + Spec: kaiv1alpha2.BindRequestSpec{ + PodName: g.fractionPod.Name, + SelectedNode: g.nodeName, + ReceivedResourceType: "Fraction", + SelectedGPUGroups: []string{g.gpuGroup}, + ReceivedGPU: &kaiv1alpha2.ReceivedGPU{ + Count: 1, + Portion: "0.5", + }, + }, + } + Expect(ctrlClient.Create(ctx, g.bindRequest)).To(Succeed()) + } + + startReservationPodAnnotator(backgroundCtx, ctrlClient) + err := binder.RunBinder(cfg, backgroundCtx) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(1 * time.Second) + + // Step 5: Check how many reservation pods survived. + var reservationPods corev1.PodList + Expect(ctrlClient.List(ctx, &reservationPods, + client.InNamespace(constants.DefaultResourceReservationName), + client.HasLabels{constants.GPUGroup}, + )).To(Succeed()) + + survivedCount := len(reservationPods.Items) + totalGroups := len(groups) + + GinkgoWriter.Printf("\n=== Scale Race Results ===\n") + GinkgoWriter.Printf("Total GPU groups: %d\n", totalGroups) + GinkgoWriter.Printf("Reservation pods survived: %d\n", survivedCount) + GinkgoWriter.Printf("Reservation pods deleted: %d\n", totalGroups-survivedCount) + + if survivedCount < totalGroups { + deletedGroups := []string{} + survivingGroups := map[string]bool{} + for _, pod := range reservationPods.Items { + survivingGroups[pod.Labels[constants.GPUGroup]] = true + } + for _, g := range groups { + if !survivingGroups[g.gpuGroup] { + deletedGroups = append(deletedGroups, g.gpuGroup) + } + } + GinkgoWriter.Printf("Deleted groups: %v\n", deletedGroups) + } + + // Without the fix: SyncForNode sees reservation pods but no labeled + // fraction pods (cache lag simulation) → deletes reservation pods. + // With the fix: hasActiveBindRequestsForGpuGroup prevents deletion. 
+ Expect(survivedCount).To(Equal(totalGroups), + "Expected all %d reservation pods to survive, but %d were deleted. "+ + "This indicates the race condition where SyncForGpuGroup deletes "+ + "reservation pods despite active BindRequests.", + totalGroups, totalGroups-survivedCount) + }) +}) From 366b3be2bd7671c8737619718dbe2aa1302d38f0 Mon Sep 17 00:00:00 2001 From: Erez Freiberger Date: Sun, 1 Mar 2026 00:12:41 +0100 Subject: [PATCH 2/5] test(e2e): add stress test for reservation pod race under concurrent fraction binding Creates many concurrent 0.5 fraction GPU pods under binpack mode across multiple rounds to reproduce the race where SyncForGpuGroup prematurely deletes reservation pods due to informer cache lag. Verified to fail reliably on the current main branch (5/64 pods stuck on second run). --- .../resources/reservation_pod_race_specs.go | 176 ++++++++++++++++++ .../resources/resources_suite_test.go | 1 + 2 files changed, 177 insertions(+) create mode 100644 test/e2e/suites/allocate/resources/reservation_pod_race_specs.go diff --git a/test/e2e/suites/allocate/resources/reservation_pod_race_specs.go b/test/e2e/suites/allocate/resources/reservation_pod_race_specs.go new file mode 100644 index 000000000..fd03601db --- /dev/null +++ b/test/e2e/suites/allocate/resources/reservation_pod_race_specs.go @@ -0,0 +1,176 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package resources + +import ( + "context" + "fmt" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/configurations/feature_flags" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant/labels" + testcontext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/capacity" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/testconfig" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait" +) + +const numFractionRaceRounds = 5 + +// DescribeReservationPodRaceSpecs tests for a race condition in the binder's +// resource reservation sync logic. When many 0.5 fraction pods bind +// concurrently under binpack mode, the informer cache may lag behind the API +// server. This causes SyncForGpuGroup to see a reservation pod but no fraction +// pod for a GPU group, leading to premature reservation pod deletion and stuck +// workloads. 
+func DescribeReservationPodRaceSpecs() bool { + return Describe("Reservation pod race under concurrent fraction binding", + Label(labels.ReservationPod, labels.Operated), Ordered, func() { + var ( + testCtx *testcontext.TestContext + ) + + BeforeAll(func(ctx context.Context) { + testCtx = testcontext.GetConnectivity(ctx, Default) + capacity.SkipIfInsufficientClusterResources(testCtx.KubeClientset, + &capacity.ResourceList{ + Gpu: resource.MustParse("4"), + PodCount: 16, + }, + ) + + parentQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), "") + childQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), parentQueue.Name) + testCtx.InitQueues([]*v2.Queue{childQueue, parentQueue}) + + Expect(feature_flags.SetPlacementStrategy(ctx, testCtx, "binpack")).To(Succeed(), + "Failed to set binpack placement strategy") + }) + + AfterAll(func(ctx context.Context) { + err := feature_flags.SetPlacementStrategy(ctx, testCtx, "binpack") + if err != nil { + fmt.Printf("Warning: failed to restore placement strategy: %v\n", err) + } + testCtx.ClusterCleanup(ctx) + }) + + AfterEach(func(ctx context.Context) { + testCtx.TestContextCleanup(ctx) + }) + + // Creates many pairs of 0.5 fraction pods concurrently across + // multiple rounds. Under binpack, each pair targets the same GPU. + // High concurrency increases the chance of triggering the race + // where a reservation pod is prematurely deleted due to cache lag. 
+ It("should schedule all fraction pod pairs without losing reservation pods", func(ctx context.Context) { + resources, err := capacity.GetClusterAllocatableResources(testCtx.KubeClientset) + Expect(err).NotTo(HaveOccurred()) + numGPUs := int(resources.Gpu.Value()) + + for round := range numFractionRaceRounds { + numPods := numGPUs * 2 + By(fmt.Sprintf("Round %d/%d: creating %d fraction pods (0.5 each, 2 per GPU)", + round+1, numFractionRaceRounds, numPods)) + + pods := make([]*v1.Pod, numPods) + for i := range numPods { + pods[i] = rd.CreatePodObject(testCtx.Queues[0], v1.ResourceRequirements{}) + pods[i].Annotations = map[string]string{ + constants.GpuFraction: "0.5", + } + } + + errs := make(chan error, len(pods)) + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func() { + defer wg.Done() + _, err := rd.CreatePod(ctx, testCtx.KubeClientset, pod) + errs <- err + }() + } + wg.Wait() + close(errs) + for err := range errs { + Expect(err).NotTo(HaveOccurred(), "Round %d: failed to create pod", round+1) + } + + namespace := pods[0].Namespace + wait.ForPodsReady(ctx, testCtx.ControllerClient, namespace, pods) + + By(fmt.Sprintf("Round %d/%d: verifying scheduling and reservation pods", round+1, numFractionRaceRounds)) + var podList v1.PodList + Expect(testCtx.ControllerClient.List(ctx, &podList, + runtimeClient.InNamespace(namespace), + runtimeClient.MatchingLabels{constants.AppLabelName: "engine-e2e"}, + )).To(Succeed()) + + scheduledCount := 0 + gpuGroups := map[string]int{} + for _, pod := range podList.Items { + if !rd.IsPodScheduled(&pod) { + continue + } + scheduledCount++ + group, ok := pod.Labels[constants.GPUGroup] + Expect(ok).To(BeTrue(), + "Round %d: pod %s should have GPU group label", round+1, pod.Name) + gpuGroups[group]++ + } + + Expect(scheduledCount).To(Equal(numPods), + "Round %d: expected %d pods scheduled, got %d", round+1, numPods, scheduledCount) + + for group, count := range gpuGroups { + Expect(count).To(Equal(2), + "Round 
%d: GPU group %s should have 2 pods, got %d", round+1, group, count) + } + + // Verify reservation pods exist for each active GPU group. + // The race causes premature deletion of reservation pods. + reservationNamespace := testconfig.GetConfig().ReservationNamespace + var reservationPods v1.PodList + Expect(testCtx.ControllerClient.List(ctx, &reservationPods, + runtimeClient.InNamespace(reservationNamespace), + )).To(Succeed()) + + reservationGroups := map[string]bool{} + for _, rPod := range reservationPods.Items { + group := rPod.Labels[constants.GPUGroup] + if group != "" { + reservationGroups[group] = true + } + } + + for group := range gpuGroups { + Expect(reservationGroups).To(HaveKey(group), + "Round %d: reservation pod missing for GPU group %s", round+1, group) + } + + // Clean up between rounds + By(fmt.Sprintf("Round %d/%d: cleaning up", round+1, numFractionRaceRounds)) + Expect(rd.DeleteAllPodsInNamespace(ctx, testCtx.ControllerClient, namespace)).To(Succeed()) + Expect(rd.DeleteAllConfigMapsInNamespace(ctx, testCtx.ControllerClient, namespace)).To(Succeed()) + wait.ForNoE2EPods(ctx, testCtx.ControllerClient) + wait.ForNoReservationPods(ctx, testCtx.ControllerClient) + time.Sleep(2 * time.Second) + } + }) + }) +} diff --git a/test/e2e/suites/allocate/resources/resources_suite_test.go b/test/e2e/suites/allocate/resources/resources_suite_test.go index 24f257b6e..41c57518f 100644 --- a/test/e2e/suites/allocate/resources/resources_suite_test.go +++ b/test/e2e/suites/allocate/resources/resources_suite_test.go @@ -14,6 +14,7 @@ import ( ) var _ = DescribeResourcesSpecs() +var _ = DescribeReservationPodRaceSpecs() func TestResourcesAllocation(t *testing.T) { utils.SetLogger() From 66b41b87dcdbe8c99d84e22eb0d431856c238257 Mon Sep 17 00:00:00 2001 From: Erez Freiberger Date: Sat, 28 Feb 2026 22:32:46 +0100 Subject: [PATCH 3/5] test(binder): add integration test reproducing reservation pod race condition Add reservation_race_test.go that reproduces the race where 
SyncForGpuGroup prematurely deletes reservation pods when the informer cache hasn't propagated GPU group labels on recently-bound fraction pods. The test creates the exact preconditions (reservation pod + active BindRequest, no labeled fraction pod) and calls SyncForGpuGroup to verify behavior. Also exports the resource reservation service in suite_test.go so the integration test can call SyncForGpuGroup directly. --- .../reservation_race_test.go | 161 ++++++++++++++++++ .../integration_tests/suite_test.go | 3 +- 2 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 pkg/binder/controllers/integration_tests/reservation_race_test.go diff --git a/pkg/binder/controllers/integration_tests/reservation_race_test.go b/pkg/binder/controllers/integration_tests/reservation_race_test.go new file mode 100644 index 000000000..e15d1a533 --- /dev/null +++ b/pkg/binder/controllers/integration_tests/reservation_race_test.go @@ -0,0 +1,161 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package integration_tests + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + + scheudlingv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" + "github.com/NVIDIA/KAI-scheduler/pkg/binder/common" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// This test reproduces a race condition where the binder's resource reservation +// sync logic prematurely deletes GPU reservation pods during concurrent binding. +// +// The race: +// 1. Pod 1 binds to a GPU group, creating a reservation pod and getting a GPU +// group label. The label is written to the API server but may not have +// propagated to the informer cache yet. +// 2. 
A concurrent sync (triggered by Pod 2 binding, BindRequest deletion, or +// pod completion) calls SyncForGpuGroup, which lists pods by GPU group label +// from the cache. Due to cache lag, it doesn't see Pod 1's label. +// 3. The sync sees the reservation pod but no fraction pods → deletes the +// reservation pod. +// +// This test creates the preconditions (reservation pod + active BindRequest but +// no labeled fraction pod) and calls SyncForGpuGroup to verify that the +// reservation pod is preserved rather than deleted. +var _ = Describe("Reservation pod race condition", Ordered, func() { + const ( + fracNamespaceName = "frac-test-ns" + fracNodeName = "frac-test-node" + gpuGroup = "node1-gpu0-group" + ) + + BeforeAll(func() { + ns := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: fracNamespaceName}, + } + Expect(k8sClient.Create(context.Background(), ns)).To(Succeed()) + + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: fracNodeName}, + } + Expect(k8sClient.Create(context.Background(), node)).To(Succeed()) + + reservationNs := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: resourceReservationNameSpace}, + } + err := k8sClient.Create(context.Background(), reservationNs) + if err != nil { + // Namespace may already exist from suite setup + Expect(client.IgnoreAlreadyExists(err)).To(Succeed()) + } + }) + + // This test verifies that SyncForGpuGroup preserves reservation pods when + // active BindRequests exist for the GPU group, even if no fraction pods are + // visible (simulating cache lag). 
+ It("should preserve reservation pod when sync runs with active BindRequest and no visible fraction pods", func() { + ctx := context.Background() + + // Step 1: Create a reservation pod (as if ReserveGpuDevice just created it) + reservationPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("gpu-reservation-%s-test1", fracNodeName), + Namespace: resourceReservationNameSpace, + Labels: map[string]string{ + constants.AppLabelName: resourceReservationAppLabelValue, + constants.GPUGroup: gpuGroup, + }, + }, + Spec: v1.PodSpec{ + NodeName: fracNodeName, + ServiceAccountName: resourceReservationServiceAccount, + Containers: []v1.Container{ + { + Name: "resource-reservation", + Image: "test-image", + }, + }, + }, + } + Expect(k8sClient.Create(ctx, reservationPod)).To(Succeed()) + + // Step 2: Create a fraction pod WITHOUT the GPU group label. + // This simulates the state after ReserveGpuDevice created the + // reservation pod but the label patch on the fraction pod hasn't + // propagated to the cache yet. + fractionPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fraction-pod-1", + Namespace: fracNamespaceName, + // NOTE: No GPU group label - simulates cache lag + }, + Spec: v1.PodSpec{ + // NOTE: No NodeName - pod not yet bound, simulates in-flight binding + SchedulerName: "kai-scheduler", + Containers: []v1.Container{ + { + Name: "worker", + Image: "test-image", + }, + }, + }, + } + Expect(k8sClient.Create(ctx, fractionPod)).To(Succeed()) + + // Step 3: Create an active BindRequest for this GPU group. + // This represents the in-flight binding that is about to label the pod. 
+ bindReq := &scheudlingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "frac-bind-request-1", + Namespace: fracNamespaceName, + }, + Spec: scheudlingv1alpha2.BindRequestSpec{ + PodName: fractionPod.Name, + SelectedNode: fracNodeName, + ReceivedResourceType: common.ReceivedTypeFraction, + SelectedGPUGroups: []string{gpuGroup}, + ReceivedGPU: &scheudlingv1alpha2.ReceivedGPU{Count: 1, Portion: "0.5"}, + }, + } + Expect(k8sClient.Create(ctx, bindReq)).To(Succeed()) + + // Step 4: Call SyncForGpuGroup - this simulates what happens when + // a concurrent binding triggers a sync. + err := rrs.SyncForGpuGroup(ctx, gpuGroup) + Expect(err).To(Succeed()) + + // Step 5: Check if the reservation pod survived. + // After fix: reservation pod should be preserved because an active + // BindRequest exists for this GPU group. + resultPod := &v1.Pod{} + err = k8sClient.Get(ctx, client.ObjectKeyFromObject(reservationPod), resultPod) + Expect(err).NotTo(HaveOccurred(), + "Reservation pod should be preserved when an active BindRequest exists") + + // Cleanup + _ = k8sClient.Delete(ctx, fractionPod) + _ = k8sClient.Delete(ctx, bindReq) + _ = k8sClient.Delete(ctx, reservationPod) + }) + + AfterAll(func() { + ctx := context.Background() + _ = k8sClient.DeleteAllOf(ctx, &v1.Pod{}, client.InNamespace(fracNamespaceName)) + _ = k8sClient.DeleteAllOf(ctx, &v1.Pod{}, client.InNamespace(resourceReservationNameSpace)) + _ = k8sClient.DeleteAllOf(ctx, &scheudlingv1alpha2.BindRequest{}, client.InNamespace(fracNamespaceName)) + _ = k8sClient.Delete(ctx, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: fracNodeName}}) + }) +}) diff --git a/pkg/binder/controllers/integration_tests/suite_test.go b/pkg/binder/controllers/integration_tests/suite_test.go index 517ab7a88..d3e6cc970 100644 --- a/pkg/binder/controllers/integration_tests/suite_test.go +++ b/pkg/binder/controllers/integration_tests/suite_test.go @@ -53,6 +53,7 @@ var cfg *rest.Config var k8sClient client.Client var testEnv 
*envtest.Environment var k8sManager ctrl.Manager +var rrs resourcereservation.Interface func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -116,7 +117,7 @@ var _ = BeforeSuite(func() { clientWithWatch, err := client.NewWithWatch(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) - rrs := resourcereservation.NewService(false, clientWithWatch, "", 40*time.Second, + rrs = resourcereservation.NewService(false, clientWithWatch, "", 40*time.Second, resourceReservationNameSpace, resourceReservationServiceAccount, resourceReservationAppLabelValue, scalingPodsNamespace, constants.DefaultRuntimeClassName, nil) // nil podResources to use defaults podBinder := binding.NewBinder(k8sManager.GetClient(), rrs, binderPlugins) From 97a03f42a99d770b2d8dcf87b174967db2b55da0 Mon Sep 17 00:00:00 2001 From: Erez Freiberger Date: Sat, 28 Feb 2026 22:33:01 +0100 Subject: [PATCH 4/5] fix(binder): check active BindRequests before deleting reservation pods Before deleting a reservation pod during SyncForGpuGroup, check if any non-terminal BindRequests reference the GPU group. This prevents premature deletion caused by informer cache lag where the GPU group label on a recently-bound fraction pod has not yet propagated to the cache. The BindRequest serves as a durable intent signal that survives the cache lag window, making the sync logic idempotent against concurrent binding operations. 
--- .../fix-reservation-pod-race/design.md | 191 ++++++++++++++++ .../resource_reservation.go | 34 ++- .../resource_reservation_test.go | 210 ++++++++++++++++-- 3 files changed, 421 insertions(+), 14 deletions(-) create mode 100644 docs/developer/designs/fix-reservation-pod-race/design.md diff --git a/docs/developer/designs/fix-reservation-pod-race/design.md b/docs/developer/designs/fix-reservation-pod-race/design.md new file mode 100644 index 000000000..05ddae90b --- /dev/null +++ b/docs/developer/designs/fix-reservation-pod-race/design.md @@ -0,0 +1,191 @@ +# Fix: Reservation Pod Premature Deletion Race Condition + +## Problem Statement + +The binder's resource reservation sync logic (`syncForPods`) prematurely deletes +GPU reservation pods when concurrent bind operations race with informer cache +propagation. This manifests as a flaky E2E test failure where reservation pods +disappear during the binding of fractional GPU pods to the same node. + +## Root Cause + +### The Deletion Logic + +`syncForPods` in `pkg/binder/binding/resourcereservation/resource_reservation.go` +decides to delete a reservation pod when it finds no "fraction pods" (user +workload pods in Running/Pending phase) for that GPU group: + +```go +for gpuGroup, reservationPod := range reservationPods { + if _, found := fractionPods[gpuGroup]; !found { + // DELETE reservation pod + } +} +``` + +The pod list comes from `syncForGpuGroupWithLock`, which queries the informer +cache using label selectors (`runai-gpu-group=<gpuGroup>`). 
+ +### The Race + +There are 3 independent triggers that call `SyncForGpuGroup`: + +| Trigger | When | Location | +|---|---|---| +| `Binder.Bind()` → `SyncForNode()` | Start of pod binding | `binder.go:44` | +| `BindRequestReconciler.deleteHandler()` | BindRequest deleted | `bindrequest_controller.go:217` | +| `PodReconciler.syncReservationIfNeeded()` | Pod deleted/completed | `pod_controller.go:122` | + +With `MaxConcurrentReconciles=10`, multiple BindRequests are reconciled in +parallel. The race occurs as follows: + +1. **Thread A** (binding Pod 1 to Node A, GPU group X): + - `ReserveGpuDevice()` creates reservation pod for group X + - `updatePodGPUGroup()` patches Pod 1 with label `runai-gpu-group=X` + - The label patch is sent to the API server → API server persists it → + Watch event is generated → informer cache receives the event + +2. **Thread B** (binding Pod 2 to Node A, same GPU group X): + - `Bind()` calls `SyncForNode("NodeA")` + - `SyncForNode` lists pods on the node, finds the reservation pod (which + has `runai-gpu-group=X`), extracts group X + - Calls `SyncForGpuGroup("X")` → `syncForGpuGroupWithLock` + - Lists pods with label `runai-gpu-group=X` from **informer cache** + - **Cache lag**: Pod 1's label patch has not propagated to the cache yet + - Finds: reservation pod (in binder namespace) → `reservationPods["X"]` + - Does NOT find Pod 1 (label not in cache yet) → `fractionPods["X"]` is empty + - **Deletes the reservation pod** ← BUG + +The `gpuGroupMutex` provides per-group serialization but does NOT prevent this +race because Thread B's `SyncForNode` observes the reservation pod (just +created, already in cache) but not Pod 1's updated labels (patch not yet in cache). 
+ +### Why the Cache Shows the Reservation Pod but Not the Label + +- The reservation pod is a **new object** (CREATE event) — informer receives + it quickly +- Pod 1's label is an **update to an existing object** (UPDATE/PATCH event) — + may be in a different event batch or processed after the CREATE +- The informer processes events sequentially per type, but CREATE and UPDATE + events for different objects can have different propagation times + +## Fix: Check Active BindRequests Before Deleting Reservation Pods + +### Approach + +Before deleting a reservation pod due to missing fraction pods, check if there +are any **active (non-succeeded, non-failed) BindRequests** that reference this +GPU group. If any exist, skip the deletion — a binding operation is in progress +and the fraction pod label hasn't propagated yet. + +### Why This Works + +The BindRequest lifecycle provides a durable intent signal: + +1. A BindRequest is created by the scheduler **before** the binder starts + labeling pods +2. A BindRequest is NOT deleted until **after** binding succeeds (and the + scheduler cleans it up) or permanently fails +3. BindRequests contain `SelectedGPUGroups` which identifies which GPU groups + are in-flight + +So during the cache lag window (reservation pod visible, pod label not visible), +the BindRequest is guaranteed to still exist. Checking for it prevents the +false-negative deletion. + +### Logic Change + +```go +// BEFORE (unsafe): +for gpuGroup, reservationPod := range reservationPods { + if _, found := fractionPods[gpuGroup]; !found { + deleteReservationPod(reservationPod) + } +} + +// AFTER (safe): +for gpuGroup, reservationPod := range reservationPods { + if _, found := fractionPods[gpuGroup]; !found { + if hasActiveBindRequestsForGpuGroup(ctx, gpuGroup) { + logger.Info("Skipping reservation pod deletion, active BindRequests exist", + "gpuGroup", gpuGroup) + continue + } + deleteReservationPod(reservationPod) + } +} +``` + +### Implementation Details + +1. 
**Add BindRequest listing capability to the resource reservation service**: + The `service` struct needs access to list BindRequests. Since it already has + `kubeClient client.WithWatch`, and the scheme includes `schedulingv1alpha2`, + we can list BindRequests directly using the same cached client. + +2. **Filter logic**: List all BindRequests, check if any have: + - `Status.Phase` is NOT `Succeeded` and NOT `Failed` (with exhausted retries) + - `Spec.SelectedGPUGroups` contains the GPU group in question + - `Spec.ReceivedResourceType` is `Fraction` (only fractional allocations use + GPU groups) + +3. **Pass the function/checker to `syncForPods`**: Either modify `syncForPods` + to accept a checker function, or have `syncForGpuGroupWithLock` perform the + check before calling `syncForPods`, or integrate it directly into + `syncForPods`. + +### Downsides / Considerations + +- **Slight cleanup delay**: If a BindRequest exists but binding has failed and + the BindRequest hasn't been cleaned up yet, the reservation pod lingers until + the next sync after cleanup. This is safe — just delayed cleanup. +- **Additional List call**: One cached List of BindRequests per sync. Since this + goes through the informer cache, it's cheap (no API server load). +- **BindRequest scheme registration**: The `kubeClient` used by the resource + reservation service must have the `schedulingv1alpha2` scheme registered. + This is already the case in production (see `cmd/binder/app/app.go`), but + needs verification in tests. 
+ +## Files Modified + +- `pkg/binder/binding/resourcereservation/resource_reservation.go`: Added + `hasActiveBindRequestsForGpuGroup` check in `syncForPods` +- `pkg/binder/binding/resourcereservation/resource_reservation_test.go`: Registered + `schedulingv1alpha2` scheme so the fake client can list BindRequests +- `pkg/binder/controllers/integration_tests/reservation_race_test.go`: Integration + test reproducing the race with the full binder controller +- `pkg/env-tests/reservation_race_scale_test.go`: Scale envtest (4 nodes × 8 GPUs) + with mock device plugin goroutine +- `test/e2e/suites/allocate/resources/reservation_pod_race_specs.go`: E2E stress + test binding 32 fractional GPU pods concurrently + +## Test Plan + +### Integration Test (reservation_race_test.go) + +Full binder controller integration test in +`pkg/binder/controllers/integration_tests/reservation_race_test.go`: + +- Starts the real binder controller with `MaxConcurrentReconciles=10` +- Creates a node with 8 GPUs, a queue, and 32 fraction pods with BindRequests +- Lets the controller bind all pods concurrently, triggering the race window +- Verifies all 32 reservation pods survive (none prematurely deleted) + +### Scale EnvTest (reservation_race_scale_test.go) + +Scale reproduction test in `pkg/env-tests/reservation_race_scale_test.go`: + +- Runs the full binder controller autonomously (4 nodes × 8 GPUs = 32 groups) +- Includes a goroutine that simulates the GPU device plugin by patching + reservation pods with GPU index annotations and heartbeat timestamps +- Pre-creates shared-GPU ConfigMaps referenced by fraction pod annotations +- Verifies all 32 reservation pods survive concurrent binding + +### E2E Stress Test (reservation_pod_race_specs.go) + +Dedicated stress test in +`test/e2e/suites/allocate/resources/reservation_pod_race_specs.go`: + +- Submits 32 fractional GPU pods to a single node in a real cluster +- Waits for all pods to reach Running state +- Verifies all reservation pods remain 
present after binding completes diff --git a/pkg/binder/binding/resourcereservation/resource_reservation.go b/pkg/binder/binding/resourcereservation/resource_reservation.go index 163e88685..1ad3fa634 100644 --- a/pkg/binder/binding/resourcereservation/resource_reservation.go +++ b/pkg/binder/binding/resourcereservation/resource_reservation.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" karpenterv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + schedulingv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" "github.com/NVIDIA/KAI-scheduler/pkg/binder/binding/resourcereservation/group_mutex" "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" "github.com/NVIDIA/KAI-scheduler/pkg/common/resources" @@ -196,9 +197,20 @@ func (rsc *service) syncForPods(ctx context.Context, pods []*v1.Pod, gpuGroupToS for gpuGroup, reservationPod := range reservationPods { if _, found := fractionPods[gpuGroup]; !found { + hasActive, err := rsc.hasActiveBindRequestsForGpuGroup(ctx, gpuGroup) + if err != nil { + logger.Error(err, "Failed to check active BindRequests for gpu group", + "gpuGroup", gpuGroup) + return err + } + if hasActive { + logger.Info("Skipping reservation pod deletion, active BindRequests exist", + "gpuGroup", gpuGroup) + continue + } logger.Info("Did not find fraction pod for gpu group, deleting reservation pod", "gpuGroup", gpuGroup) - err := rsc.deleteReservationPod(ctx, reservationPod) + err = rsc.deleteReservationPod(ctx, reservationPod) if err != nil { return err } @@ -208,6 +220,26 @@ func (rsc *service) syncForPods(ctx context.Context, pods []*v1.Pod, gpuGroupToS return nil } +// hasActiveBindRequestsForGpuGroup checks if any non-terminal BindRequests reference +// the given GPU group. This prevents premature reservation pod deletion when the +// informer cache has not yet propagated GPU group labels on recently-bound fraction pods. 
+func (rsc *service) hasActiveBindRequestsForGpuGroup(ctx context.Context, gpuGroup string) (bool, error) { + bindRequestList := &schedulingv1alpha2.BindRequestList{} + if err := rsc.kubeClient.List(ctx, bindRequestList); err != nil { + return false, fmt.Errorf("failed to list BindRequests: %w", err) + } + + for _, br := range bindRequestList.Items { + if br.Status.Phase == schedulingv1alpha2.BindRequestPhaseSucceeded || + br.Status.Phase == schedulingv1alpha2.BindRequestPhaseFailed { + continue + } + if slices.Contains(br.Spec.SelectedGPUGroups, gpuGroup) { + return true, nil + } + } + return false, nil +} func (rsc *service) ReserveGpuDevice(ctx context.Context, pod *v1.Pod, nodeName string, gpuGroup string) (string, error) { logger := log.FromContext(ctx) diff --git a/pkg/binder/binding/resourcereservation/resource_reservation_test.go b/pkg/binder/binding/resourcereservation/resource_reservation_test.go index 37fd20cc2..ad07b5fac 100644 --- a/pkg/binder/binding/resourcereservation/resource_reservation_test.go +++ b/pkg/binder/binding/resourcereservation/resource_reservation_test.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + schedulingv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" ) @@ -32,6 +33,13 @@ const ( scalingPodsNamespace = "kai-scale-adjust" ) +var testScheme = func() *runtime.Scheme { + s := runtime.NewScheme() + _ = v1.AddToScheme(s) + _ = schedulingv1alpha2.AddToScheme(s) + return s +}() + func TestResourceReservation(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "resource reservation") @@ -323,8 +331,7 @@ var _ = Describe("ResourceReservationService", func() { if testData.reservationPod != nil { podsInCluster = append(podsInCluster, testData.reservationPod) } - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(podsInCluster...). 
- WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -391,7 +398,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(testData.podsInCluster...).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -604,8 +611,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(testData.podsInCluster...). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -863,8 +869,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(testData.podsInCluster...). 
- WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -891,7 +896,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "nvidia/kai-reservation:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: customRuntime, } @@ -1070,7 +1075,7 @@ var _ = Describe("ResourceReservationService", func() { testData := testData It(testName, func() { podsInCluster := []runtime.Object{testData.pod} - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(podsInCluster...).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(podsInCluster...).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -1100,7 +1105,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: &v1.ResourceRequirements{ Requests: v1.ResourceList{ @@ -1140,7 +1145,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: nil, scalingPodNamespace: scalingPodsNamespace, @@ 
-1176,7 +1181,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: &v1.ResourceRequirements{ Requests: v1.ResourceList{ @@ -1214,7 +1219,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: &v1.ResourceRequirements{ Requests: v1.ResourceList{ @@ -1308,3 +1313,182 @@ func (w *FakeWatchPodClosed) ResultChan() <-chan watch.Event { func exampleMockWatchPodClosed() watch.Interface { return &FakeWatchPodClosed{} } + +var _ = Describe("Race condition: reservation pod deleted during concurrent binding", func() { + const ( + nodeName = "node-1" + gpuGroup = "gpu-group" + ) + var ( + groupLabels = map[string]string{ + constants.GPUGroup: gpuGroup, + } + reservationPod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gpu-reservation-node-1-abcde", + Namespace: resourceReservationNameSpace, + Labels: groupLabels, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + ) + + Context("SyncForGpuGroup with stale cache - no fraction pods visible", func() { + It("should preserve reservation pod when an active BindRequest exists for the gpu group", func() { + activeBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-1", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + 
Phase: schedulingv1alpha2.BindRequestPhasePending, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), activeBindRequest). + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(1), + "Reservation pod should be preserved when an active BindRequest exists") + }) + + It("should delete reservation pod when no active BindRequests exist", func() { + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(reservationPod.DeepCopy()).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(0), + "Reservation pod should be deleted when no active BindRequests exist") + }) + + It("should delete reservation pod when only succeeded BindRequests exist", func() { + succeededBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-done", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhaseSucceeded, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), succeededBindRequest). 
+ WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(0), + "Reservation pod should be deleted when only terminal BindRequests exist") + }) + + It("should delete reservation pod when only failed BindRequests exist", func() { + failedBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-failed", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhaseFailed, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), failedBindRequest). 
+ WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(0), + "Reservation pod should be deleted when only terminal BindRequests exist") + }) + + It("should preserve reservation pod when BindRequest for different group is active but one for this group is active too", func() { + activeBindRequestOtherGroup := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-other", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "other-pod", + SelectedNode: nodeName, + SelectedGPUGroups: []string{"other-group"}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhasePending, + }, + } + activeBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-1", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhasePending, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), activeBindRequestOtherGroup, activeBindRequest). 
+ WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(1), + "Reservation pod should be preserved when an active BindRequest exists") + }) + }) +}) From 0266eee6737847e1debc6f8198d27ceca52c4287 Mon Sep 17 00:00:00 2001 From: Erez Freiberger Date: Tue, 3 Mar 2026 00:31:36 +0100 Subject: [PATCH 5/5] refactor: add bind request indexer by GPU groups --- cmd/binder/app/app.go | 11 ++++++++ .../resource_reservation.go | 8 +++--- .../resource_reservation_test.go | 27 ++++++++++++------- .../integration_tests/suite_test.go | 9 +++++++ pkg/scheduler/cache/cache.go | 6 ++--- 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/cmd/binder/app/app.go b/cmd/binder/app/app.go index 5e3e868c9..3a7e832cb 100644 --- a/cmd/binder/app/app.go +++ b/cmd/binder/app/app.go @@ -223,5 +223,16 @@ func createIndexesForResourceReservation(mgr manager.Manager) error { return err } + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), &schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", + func(obj client.Object) []string { + br := obj.(*schedulingv1alpha2.BindRequest) + return br.Spec.SelectedGPUGroups + }, + ); err != nil { + setupLog.Error(err, "failed to create index for spec.selectedGPUGroups") + return err + } + return nil } diff --git a/pkg/binder/binding/resourcereservation/resource_reservation.go b/pkg/binder/binding/resourcereservation/resource_reservation.go index 1ad3fa634..eca0dbe4e 100644 --- a/pkg/binder/binding/resourcereservation/resource_reservation.go +++ b/pkg/binder/binding/resourcereservation/resource_reservation.go @@ -225,7 +225,9 @@ func (rsc *service) syncForPods(ctx context.Context, pods []*v1.Pod, gpuGroupToS // informer cache has not yet propagated 
GPU group labels on recently-bound fraction pods. func (rsc *service) hasActiveBindRequestsForGpuGroup(ctx context.Context, gpuGroup string) (bool, error) { bindRequestList := &schedulingv1alpha2.BindRequestList{} - if err := rsc.kubeClient.List(ctx, bindRequestList); err != nil { + if err := rsc.kubeClient.List(ctx, bindRequestList, + client.MatchingFields{"spec.selectedGPUGroups": gpuGroup}, + ); err != nil { return false, fmt.Errorf("failed to list BindRequests: %w", err) } @@ -234,9 +236,7 @@ func (rsc *service) hasActiveBindRequestsForGpuGroup(ctx context.Context, gpuGro br.Status.Phase == schedulingv1alpha2.BindRequestPhaseFailed { continue } - if slices.Contains(br.Spec.SelectedGPUGroups, gpuGroup) { - return true, nil - } + return true, nil } return false, nil } diff --git a/pkg/binder/binding/resourcereservation/resource_reservation_test.go b/pkg/binder/binding/resourcereservation/resource_reservation_test.go index ad07b5fac..b1a5467d0 100644 --- a/pkg/binder/binding/resourcereservation/resource_reservation_test.go +++ b/pkg/binder/binding/resourcereservation/resource_reservation_test.go @@ -50,6 +50,11 @@ func nodeNameIndexer(rawObj runtimeClient.Object) []string { return []string{pod.Spec.NodeName} } +func selectedGPUGroupsIndexer(rawObj runtimeClient.Object) []string { + br := rawObj.(*schedulingv1alpha2.BindRequest) + return br.Spec.SelectedGPUGroups +} + func initializeTestService( client runtimeClient.WithWatch, ) *service { @@ -331,7 +336,7 @@ var _ = Describe("ResourceReservationService", func() { if testData.reservationPod != nil { podsInCluster = append(podsInCluster, testData.reservationPod) } - clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", 
nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -398,7 +403,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -611,7 +616,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -869,7 +874,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := 
fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -1357,7 +1362,8 @@ var _ = Describe("Race condition: reservation pod deleted during concurrent bind clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). WithRuntimeObjects(reservationPod.DeepCopy(), activeBindRequest). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() rsc := initializeTestService(clientWithObjs) err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) @@ -1371,7 +1377,7 @@ var _ = Describe("Race condition: reservation pod deleted during concurrent bind }) It("should delete reservation pod when no active BindRequests exist", func() { - clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(reservationPod.DeepCopy()).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(reservationPod.DeepCopy()).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() rsc := initializeTestService(clientWithObjs) err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) @@ -1402,7 +1408,8 @@ var _ = Describe("Race condition: reservation pod deleted during concurrent bind clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). WithRuntimeObjects(reservationPod.DeepCopy(), succeededBindRequest). 
- WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() rsc := initializeTestService(clientWithObjs) err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) @@ -1433,7 +1440,8 @@ var _ = Describe("Race condition: reservation pod deleted during concurrent bind clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). WithRuntimeObjects(reservationPod.DeepCopy(), failedBindRequest). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() rsc := initializeTestService(clientWithObjs) err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) @@ -1478,7 +1486,8 @@ var _ = Describe("Race condition: reservation pod deleted during concurrent bind clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). WithRuntimeObjects(reservationPod.DeepCopy(), activeBindRequestOtherGroup, activeBindRequest). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). 
+ WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() rsc := initializeTestService(clientWithObjs) err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) diff --git a/pkg/binder/controllers/integration_tests/suite_test.go b/pkg/binder/controllers/integration_tests/suite_test.go index d3e6cc970..d7c6daccb 100644 --- a/pkg/binder/controllers/integration_tests/suite_test.go +++ b/pkg/binder/controllers/integration_tests/suite_test.go @@ -96,6 +96,15 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + err = k8sManager.GetFieldIndexer().IndexField( + context.Background(), &schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", + func(obj client.Object) []string { + br := obj.(*schedulingv1alpha2.BindRequest) + return br.Spec.SelectedGPUGroups + }, + ) + Expect(err).NotTo(HaveOccurred()) + params := &controllers.ReconcilerParams{ MaxConcurrentReconciles: 1, RateLimiterBaseDelaySeconds: 1, diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6fba75fb1..d5a4ea308 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -22,6 +22,7 @@ package cache import ( "context" "fmt" + "maps" "sync" "time" @@ -292,10 +293,7 @@ func (sc *SchedulerCache) createBindRequest(podInfo *pod_info.PodInfo, nodeName "selected-node": nodeName, } - // Merge with node pool params labels - for k, v := range sc.schedulingNodePoolParams.GetLabels() { - labels[k] = v - } + maps.Copy(labels, sc.schedulingNodePoolParams.GetLabels()) bindRequest := &schedulingv1alpha2.BindRequest{ ObjectMeta: metav1.ObjectMeta{