diff --git a/cmd/binder/app/app.go b/cmd/binder/app/app.go index 5e3e868c9..3a7e832cb 100644 --- a/cmd/binder/app/app.go +++ b/cmd/binder/app/app.go @@ -223,5 +223,16 @@ func createIndexesForResourceReservation(mgr manager.Manager) error { return err } + if err := mgr.GetFieldIndexer().IndexField( + context.Background(), &schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", + func(obj client.Object) []string { + br := obj.(*schedulingv1alpha2.BindRequest) + return br.Spec.SelectedGPUGroups + }, + ); err != nil { + setupLog.Error(err, "failed to create index for spec.selectedGPUGroups") + return err + } + return nil } diff --git a/docs/developer/designs/fix-reservation-pod-race/design.md b/docs/developer/designs/fix-reservation-pod-race/design.md new file mode 100644 index 000000000..05ddae90b --- /dev/null +++ b/docs/developer/designs/fix-reservation-pod-race/design.md @@ -0,0 +1,191 @@ +# Fix: Reservation Pod Premature Deletion Race Condition + +## Problem Statement + +The binder's resource reservation sync logic (`syncForPods`) prematurely deletes +GPU reservation pods when concurrent bind operations race with informer cache +propagation. This manifests as a flaky E2E test failure where reservation pods +disappear during the binding of fractional GPU pods to the same node. + +## Root Cause + +### The Deletion Logic + +`syncForPods` in `pkg/binder/binding/resourcereservation/resource_reservation.go` +decides to delete a reservation pod when it finds no "fraction pods" (user +workload pods in Running/Pending phase) for that GPU group: + +```go +for gpuGroup, reservationPod := range reservationPods { + if _, found := fractionPods[gpuGroup]; !found { + // DELETE reservation pod + } +} +``` + +The pod list comes from `syncForGpuGroupWithLock`, which queries the informer +cache using label selectors (`runai-gpu-group=`). + +### The Race + +There are 3 independent triggers that call `SyncForGpuGroup`: + +| Trigger | When | Location | +|---|---|---| +| `Binder.Bind()` → `SyncForNode()` | Start of pod binding | `binder.go:44` | +| `BindRequestReconciler.deleteHandler()` | BindRequest deleted | `bindrequest_controller.go:217` | +| `PodReconciler.syncReservationIfNeeded()` | Pod deleted/completed | `pod_controller.go:122` | + +With `MaxConcurrentReconciles=10`, multiple BindRequests are reconciled in +parallel. The race occurs as follows: + +1. **Thread A** (binding Pod 1 to Node A, GPU group X): + - `ReserveGpuDevice()` creates reservation pod for group X + - `updatePodGPUGroup()` patches Pod 1 with label `runai-gpu-group=X` + - The label patch is sent to the API server → API server persists it → + Watch event is generated → informer cache receives the event + +2. **Thread B** (binding Pod 2 to Node A, same GPU group X): + - `Bind()` calls `SyncForNode("NodeA")` + - `SyncForNode` lists pods on the node, finds the reservation pod (which + has `runai-gpu-group=X`), extracts group X + - Calls `SyncForGpuGroup("X")` → `syncForGpuGroupWithLock` + - Lists pods with label `runai-gpu-group=X` from **informer cache** + - **Cache lag**: Pod 1's label patch has not propagated to the cache yet + - Finds: reservation pod (in binder namespace) → `reservationPods["X"]` + - Does NOT find Pod 1 (label not in cache yet) → `fractionPods["X"]` is empty + - **Deletes the reservation pod** ← BUG + +The `gpuGroupMutex` provides per-group serialization but does NOT prevent this +race because Thread B's `SyncForNode` observes the reservation pod (just +created, already in cache) but not Pod 1's updated labels (patch not yet in cache). + +### Why the Cache Shows the Reservation Pod but Not the Label + +- The reservation pod is a **new object** (CREATE event) — informer receives + it quickly +- Pod 1's label is an **update to an existing object** (UPDATE/PATCH event) — + may be in a different event batch or processed after the CREATE +- The informer processes events sequentially per type, but CREATE and UPDATE + events for different objects can have different propagation times + +## Fix: Check Active BindRequests Before Deleting Reservation Pods + +### Approach + +Before deleting a reservation pod due to missing fraction pods, check if there +are any **active (non-succeeded, non-failed) BindRequests** that reference this +GPU group. If any exist, skip the deletion — a binding operation is in progress +and the fraction pod label hasn't propagated yet. + +### Why This Works + +The BindRequest lifecycle provides a durable intent signal: + +1. A BindRequest is created by the scheduler **before** the binder starts + labeling pods +2. A BindRequest is NOT deleted until **after** binding succeeds (and the + scheduler cleans it up) or permanently fails +3. BindRequests contain `SelectedGPUGroups` which identifies which GPU groups + are in-flight + +So during the cache lag window (reservation pod visible, pod label not visible), +the BindRequest is guaranteed to still exist. Checking for it prevents the +false-negative deletion. + +### Logic Change + +```go +// BEFORE (unsafe): +for gpuGroup, reservationPod := range reservationPods { + if _, found := fractionPods[gpuGroup]; !found { + deleteReservationPod(reservationPod) + } +} + +// AFTER (safe): +for gpuGroup, reservationPod := range reservationPods { + if _, found := fractionPods[gpuGroup]; !found { + if hasActiveBindRequestsForGpuGroup(ctx, gpuGroup) { + logger.Info("Skipping reservation pod deletion, active BindRequests exist", + "gpuGroup", gpuGroup) + continue + } + deleteReservationPod(reservationPod) + } +} +``` + +### Implementation Details + +1. **Add BindRequest listing capability to the resource reservation service**: + The `service` struct needs access to list BindRequests. Since it already has + `kubeClient client.WithWatch`, and the scheme includes `schedulingv1alpha2`, + we can list BindRequests directly using the same cached client. + +2. **Filter logic**: List all BindRequests, check if any have: + - `Status.Phase` is NOT `Succeeded` and NOT `Failed` (with exhausted retries) + - `Spec.SelectedGPUGroups` contains the GPU group in question + - `Spec.ReceivedResourceType` is `Fraction` (only fractional allocations use + GPU groups) + +3. **Pass the function/checker to `syncForPods`**: Either modify `syncForPods` + to accept a checker function, or have `syncForGpuGroupWithLock` perform the + check before calling `syncForPods`, or integrate it directly into + `syncForPods`. + +### Downsides / Considerations + +- **Slight cleanup delay**: If a BindRequest exists but binding has failed and + the BindRequest hasn't been cleaned up yet, the reservation pod lingers until + the next sync after cleanup. This is safe — just delayed cleanup. +- **Additional List call**: One cached List of BindRequests per sync. Since this + goes through the informer cache, it's cheap (no API server load). +- **BindRequest scheme registration**: The `kubeClient` used by the resource + reservation service must have the `schedulingv1alpha2` scheme registered. + This is already the case in production (see `cmd/binder/app/app.go`), but + needs verification in tests. + +## Files Modified + +- `pkg/binder/binding/resourcereservation/resource_reservation.go`: Added + `hasActiveBindRequestsForGpuGroup` check in `syncForPods` +- `pkg/binder/binding/resourcereservation/resource_reservation_test.go`: Registered + `schedulingv1alpha2` scheme so the fake client can list BindRequests +- `pkg/binder/controllers/integration_tests/reservation_race_test.go`: Integration + test reproducing the race with the full binder controller +- `pkg/env-tests/reservation_race_scale_test.go`: Scale envtest (4 nodes × 8 GPUs) + with mock device plugin goroutine +- `test/e2e/suites/allocate/resources/reservation_pod_race_specs.go`: E2E stress + test binding 32 fractional GPU pods concurrently + +## Test Plan + +### Integration Test (reservation_race_test.go) + +Full binder controller integration test in +`pkg/binder/controllers/integration_tests/reservation_race_test.go`: + +- Starts the real binder controller with `MaxConcurrentReconciles=10` +- Creates a node with 8 GPUs, a queue, and 32 fraction pods with BindRequests +- Lets the controller bind all pods concurrently, triggering the race window +- Verifies all 32 reservation pods survive (none prematurely deleted) + +### Scale EnvTest (reservation_race_scale_test.go) + +Scale reproduction test in `pkg/env-tests/reservation_race_scale_test.go`: + +- Runs the full binder controller autonomously (4 nodes × 8 GPUs = 32 groups) +- Includes a goroutine that simulates the GPU device plugin by patching + reservation pods with GPU index annotations and heartbeat timestamps +- Pre-creates shared-GPU ConfigMaps referenced by fraction pod annotations +- Verifies all 32 reservation pods survive concurrent binding + +### E2E Stress Test (reservation_pod_race_specs.go) + +Dedicated stress test in +`test/e2e/suites/allocate/resources/reservation_pod_race_specs.go`: + +- Submits 32 fractional GPU pods to a single node in a real cluster +- Waits for all pods to reach Running state +- Verifies all reservation pods remain present after binding completes diff --git a/pkg/binder/binding/resourcereservation/resource_reservation.go b/pkg/binder/binding/resourcereservation/resource_reservation.go index 163e88685..eca0dbe4e 100644 --- a/pkg/binder/binding/resourcereservation/resource_reservation.go +++ b/pkg/binder/binding/resourcereservation/resource_reservation.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" karpenterv1 "sigs.k8s.io/karpenter/pkg/apis/v1" + schedulingv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" "github.com/NVIDIA/KAI-scheduler/pkg/binder/binding/resourcereservation/group_mutex" "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" "github.com/NVIDIA/KAI-scheduler/pkg/common/resources" @@ -196,9 +197,20 @@ func (rsc *service) syncForPods(ctx context.Context, pods []*v1.Pod, gpuGroupToS for gpuGroup, reservationPod := range reservationPods { if _, found := fractionPods[gpuGroup]; !found { + hasActive, err := rsc.hasActiveBindRequestsForGpuGroup(ctx, gpuGroup) + if err != nil { + logger.Error(err, "Failed to check active BindRequests for gpu group", + "gpuGroup", gpuGroup) + return err + } + if hasActive { + logger.Info("Skipping reservation pod deletion, active BindRequests exist", + "gpuGroup", gpuGroup) + continue + } logger.Info("Did not find fraction pod for gpu group, deleting reservation pod", "gpuGroup", gpuGroup) - err := rsc.deleteReservationPod(ctx, reservationPod) + err = rsc.deleteReservationPod(ctx, reservationPod) if err != nil { return err } @@ -208,6 +220,26 @@ func (rsc *service) syncForPods(ctx context.Context, pods []*v1.Pod, gpuGroupToS return nil } +// hasActiveBindRequestsForGpuGroup checks if any non-terminal BindRequests reference +// the given GPU group. This prevents premature reservation pod deletion when the +// informer cache has not yet propagated GPU group labels on recently-bound fraction pods. +func (rsc *service) hasActiveBindRequestsForGpuGroup(ctx context.Context, gpuGroup string) (bool, error) { + bindRequestList := &schedulingv1alpha2.BindRequestList{} + if err := rsc.kubeClient.List(ctx, bindRequestList, + client.MatchingFields{"spec.selectedGPUGroups": gpuGroup}, + ); err != nil { + return false, fmt.Errorf("failed to list BindRequests: %w", err) + } + + for _, br := range bindRequestList.Items { + if br.Status.Phase == schedulingv1alpha2.BindRequestPhaseSucceeded || + br.Status.Phase == schedulingv1alpha2.BindRequestPhaseFailed { + continue + } + return true, nil + } + return false, nil +} func (rsc *service) ReserveGpuDevice(ctx context.Context, pod *v1.Pod, nodeName string, gpuGroup string) (string, error) { logger := log.FromContext(ctx) diff --git a/pkg/binder/binding/resourcereservation/resource_reservation_test.go b/pkg/binder/binding/resourcereservation/resource_reservation_test.go index 37fd20cc2..b1a5467d0 100644 --- a/pkg/binder/binding/resourcereservation/resource_reservation_test.go +++ b/pkg/binder/binding/resourcereservation/resource_reservation_test.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + schedulingv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" ) @@ -32,6 +33,13 @@ const ( scalingPodsNamespace = "kai-scale-adjust" ) +var testScheme = func() *runtime.Scheme { + s := runtime.NewScheme() + _ = v1.AddToScheme(s) + _ = schedulingv1alpha2.AddToScheme(s) + return s +}() + func TestResourceReservation(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "resource reservation") @@ -42,6 +50,11 @@ func nodeNameIndexer(rawObj runtimeClient.Object) []string { return []string{pod.Spec.NodeName} } +func selectedGPUGroupsIndexer(rawObj runtimeClient.Object) []string { + br := rawObj.(*schedulingv1alpha2.BindRequest) + return br.Spec.SelectedGPUGroups +} + func initializeTestService( client runtimeClient.WithWatch, ) *service { @@ -323,8 +336,7 @@ var _ = Describe("ResourceReservationService", func() { if testData.reservationPod != nil { podsInCluster = append(podsInCluster, testData.reservationPod) } - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(podsInCluster...). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -391,7 +403,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(testData.podsInCluster...).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -604,8 +616,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(testData.podsInCluster...). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -863,8 +874,7 @@ var _ = Describe("ResourceReservationService", func() { testName := testName testData := testData It(testName, func() { - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(testData.podsInCluster...). - WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(testData.podsInCluster...).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -891,7 +901,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "nvidia/kai-reservation:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: customRuntime, } @@ -1070,7 +1080,7 @@ var _ = Describe("ResourceReservationService", func() { testData := testData It(testName, func() { podsInCluster := []runtime.Object{testData.pod} - clientWithObjs := fake.NewClientBuilder().WithRuntimeObjects(podsInCluster...).Build() + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(podsInCluster...).Build() fakeClient := interceptor.NewClient(clientWithObjs, testData.clientInterceptFuncs) rsc := initializeTestService(fakeClient) @@ -1100,7 +1110,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: &v1.ResourceRequirements{ Requests: v1.ResourceList{ @@ -1140,7 +1150,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: nil, scalingPodNamespace: scalingPodsNamespace, @@ -1176,7 +1186,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: &v1.ResourceRequirements{ Requests: v1.ResourceList{ @@ -1214,7 +1224,7 @@ var _ = Describe("ResourceReservationService", func() { appLabelValue: "kai-reservation", serviceAccountName: "kai-sa", reservationPodImage: "test-image:latest", - kubeClient: fake.NewClientBuilder().Build(), + kubeClient: fake.NewClientBuilder().WithScheme(testScheme).Build(), runtimeClassName: "nvidia", podResources: &v1.ResourceRequirements{ Requests: v1.ResourceList{ @@ -1308,3 +1318,186 @@ func (w *FakeWatchPodClosed) ResultChan() <-chan watch.Event { func exampleMockWatchPodClosed() watch.Interface { return &FakeWatchPodClosed{} } + +var _ = Describe("Race condition: reservation pod deleted during concurrent binding", func() { + const ( + nodeName = "node-1" + gpuGroup = "gpu-group" + ) + var ( + groupLabels = map[string]string{ + constants.GPUGroup: gpuGroup, + } + reservationPod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gpu-reservation-node-1-abcde", + Namespace: resourceReservationNameSpace, + Labels: groupLabels, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + ) + + Context("SyncForGpuGroup with stale cache - no fraction pods visible", func() { + It("should preserve reservation pod when an active BindRequest exists for the gpu group", func() { + activeBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-1", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhasePending, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), activeBindRequest). + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(1), + "Reservation pod should be preserved when an active BindRequest exists") + }) + + It("should delete reservation pod when no active BindRequests exist", func() { + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme).WithRuntimeObjects(reservationPod.DeepCopy()).WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer).WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(0), + "Reservation pod should be deleted when no active BindRequests exist") + }) + + It("should delete reservation pod when only succeeded BindRequests exist", func() { + succeededBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-done", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhaseSucceeded, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), succeededBindRequest). + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(0), + "Reservation pod should be deleted when only terminal BindRequests exist") + }) + + It("should delete reservation pod when only failed BindRequests exist", func() { + failedBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-failed", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhaseFailed, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), failedBindRequest). + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(0), + "Reservation pod should be deleted when only terminal BindRequests exist") + }) + + It("should preserve reservation pod when BindRequest for different group is active but one for this group is active too", func() { + activeBindRequestOtherGroup := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-other", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "other-pod", + SelectedNode: nodeName, + SelectedGPUGroups: []string{"other-group"}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhasePending, + }, + } + activeBindRequest := &schedulingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bind-request-1", + Namespace: "team-a", + }, + Spec: schedulingv1alpha2.BindRequestSpec{ + PodName: "fraction-pod-1", + SelectedNode: nodeName, + SelectedGPUGroups: []string{gpuGroup}, + }, + Status: schedulingv1alpha2.BindRequestStatus{ + Phase: schedulingv1alpha2.BindRequestPhasePending, + }, + } + + clientWithObjs := fake.NewClientBuilder().WithScheme(testScheme). + WithRuntimeObjects(reservationPod.DeepCopy(), activeBindRequestOtherGroup, activeBindRequest). + WithIndex(&v1.Pod{}, "spec.nodeName", nodeNameIndexer). + WithIndex(&schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", selectedGPUGroupsIndexer).Build() + rsc := initializeTestService(clientWithObjs) + + err := rsc.SyncForGpuGroup(context.TODO(), gpuGroup) + Expect(err).To(Succeed()) + + pods := &v1.PodList{} + err = clientWithObjs.List(context.Background(), pods) + Expect(err).To(Succeed()) + Expect(len(pods.Items)).To(Equal(1), + "Reservation pod should be preserved when an active BindRequest exists") + }) + }) +}) diff --git a/pkg/binder/controllers/integration_tests/reservation_race_test.go b/pkg/binder/controllers/integration_tests/reservation_race_test.go new file mode 100644 index 000000000..e15d1a533 --- /dev/null +++ b/pkg/binder/controllers/integration_tests/reservation_race_test.go @@ -0,0 +1,161 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package integration_tests + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + + scheudlingv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" + "github.com/NVIDIA/KAI-scheduler/pkg/binder/common" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// This test reproduces a race condition where the binder's resource reservation +// sync logic prematurely deletes GPU reservation pods during concurrent binding. +// +// The race: +// 1. Pod 1 binds to a GPU group, creating a reservation pod and getting a GPU +// group label. The label is written to the API server but may not have +// propagated to the informer cache yet. +// 2. A concurrent sync (triggered by Pod 2 binding, BindRequest deletion, or +// pod completion) calls SyncForGpuGroup, which lists pods by GPU group label +// from the cache. Due to cache lag, it doesn't see Pod 1's label. +// 3. The sync sees the reservation pod but no fraction pods → deletes the +// reservation pod. +// +// This test creates the preconditions (reservation pod + active BindRequest but +// no labeled fraction pod) and calls SyncForGpuGroup to demonstrate that the +// reservation pod is deleted. +var _ = Describe("Reservation pod race condition", Ordered, func() { + const ( + fracNamespaceName = "frac-test-ns" + fracNodeName = "frac-test-node" + gpuGroup = "node1-gpu0-group" + ) + + BeforeAll(func() { + ns := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: fracNamespaceName}, + } + Expect(k8sClient.Create(context.Background(), ns)).To(Succeed()) + + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: fracNodeName}, + } + Expect(k8sClient.Create(context.Background(), node)).To(Succeed()) + + reservationNs := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: resourceReservationNameSpace}, + } + err := k8sClient.Create(context.Background(), reservationNs) + if err != nil { + // Namespace may already exist from suite setup + Expect(client.IgnoreAlreadyExists(err)).To(Succeed()) + } + }) + + // This test verifies that SyncForGpuGroup preserves reservation pods when + // active BindRequests exist for the GPU group, even if no fraction pods are + // visible (simulating cache lag). + It("should preserve reservation pod when sync runs with active BindRequest and no visible fraction pods", func() { + ctx := context.Background() + + // Step 1: Create a reservation pod (as if ReserveGpuDevice just created it) + reservationPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("gpu-reservation-%s-test1", fracNodeName), + Namespace: resourceReservationNameSpace, + Labels: map[string]string{ + constants.AppLabelName: resourceReservationAppLabelValue, + constants.GPUGroup: gpuGroup, + }, + }, + Spec: v1.PodSpec{ + NodeName: fracNodeName, + ServiceAccountName: resourceReservationServiceAccount, + Containers: []v1.Container{ + { + Name: "resource-reservation", + Image: "test-image", + }, + }, + }, + } + Expect(k8sClient.Create(ctx, reservationPod)).To(Succeed()) + + // Step 2: Create a fraction pod WITHOUT the GPU group label. + // This simulates the state after ReserveGpuDevice created the + // reservation pod but the label patch on the fraction pod hasn't + // propagated to the cache yet. + fractionPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fraction-pod-1", + Namespace: fracNamespaceName, + // NOTE: No GPU group label - simulates cache lag + }, + Spec: v1.PodSpec{ + // NOTE: No NodeName - pod not yet bound, simulates in-flight binding + SchedulerName: "kai-scheduler", + Containers: []v1.Container{ + { + Name: "worker", + Image: "test-image", + }, + }, + }, + } + Expect(k8sClient.Create(ctx, fractionPod)).To(Succeed()) + + // Step 3: Create an active BindRequest for this GPU group. + // This represents the in-flight binding that is about to label the pod. + bindReq := &scheudlingv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "frac-bind-request-1", + Namespace: fracNamespaceName, + }, + Spec: scheudlingv1alpha2.BindRequestSpec{ + PodName: fractionPod.Name, + SelectedNode: fracNodeName, + ReceivedResourceType: common.ReceivedTypeFraction, + SelectedGPUGroups: []string{gpuGroup}, + ReceivedGPU: &scheudlingv1alpha2.ReceivedGPU{Count: 1, Portion: "0.5"}, + }, + } + Expect(k8sClient.Create(ctx, bindReq)).To(Succeed()) + + // Step 4: Call SyncForGpuGroup - this simulates what happens when + // a concurrent binding triggers a sync. + err := rrs.SyncForGpuGroup(ctx, gpuGroup) + Expect(err).To(Succeed()) + + // Step 5: Check if the reservation pod survived. + // After fix: reservation pod should be preserved because an active + // BindRequest exists for this GPU group. + resultPod := &v1.Pod{} + err = k8sClient.Get(ctx, client.ObjectKeyFromObject(reservationPod), resultPod) + Expect(err).NotTo(HaveOccurred(), + "Reservation pod should be preserved when an active BindRequest exists") + + // Cleanup + _ = k8sClient.Delete(ctx, fractionPod) + _ = k8sClient.Delete(ctx, bindReq) + _ = k8sClient.Delete(ctx, reservationPod) + }) + + AfterAll(func() { + ctx := context.Background() + _ = k8sClient.DeleteAllOf(ctx, &v1.Pod{}, client.InNamespace(fracNamespaceName)) + _ = k8sClient.DeleteAllOf(ctx, &v1.Pod{}, client.InNamespace(resourceReservationNameSpace)) + _ = k8sClient.DeleteAllOf(ctx, &scheudlingv1alpha2.BindRequest{}, client.InNamespace(fracNamespaceName)) + _ = k8sClient.Delete(ctx, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: fracNodeName}}) + }) +}) diff --git a/pkg/binder/controllers/integration_tests/suite_test.go b/pkg/binder/controllers/integration_tests/suite_test.go index 517ab7a88..d7c6daccb 100644 --- a/pkg/binder/controllers/integration_tests/suite_test.go +++ b/pkg/binder/controllers/integration_tests/suite_test.go @@ -53,6 +53,7 @@ var cfg *rest.Config var k8sClient client.Client var testEnv *envtest.Environment var k8sManager ctrl.Manager +var rrs resourcereservation.Interface func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -95,6 +96,15 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + err = k8sManager.GetFieldIndexer().IndexField( + context.Background(), &schedulingv1alpha2.BindRequest{}, "spec.selectedGPUGroups", + func(obj client.Object) []string { + br := obj.(*schedulingv1alpha2.BindRequest) + return br.Spec.SelectedGPUGroups + }, + ) + Expect(err).NotTo(HaveOccurred()) + params := &controllers.ReconcilerParams{ MaxConcurrentReconciles: 1, RateLimiterBaseDelaySeconds: 1, @@ -116,7 +126,7 @@ var _ = BeforeSuite(func() { clientWithWatch, err := client.NewWithWatch(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) - rrs := resourcereservation.NewService(false, clientWithWatch, "", 40*time.Second, + rrs = resourcereservation.NewService(false, clientWithWatch, "", 40*time.Second, resourceReservationNameSpace, resourceReservationServiceAccount, resourceReservationAppLabelValue, scalingPodsNamespace, constants.DefaultRuntimeClassName, nil) // nil podResources to use defaults podBinder := binding.NewBinder(k8sManager.GetClient(), rrs, binderPlugins) diff --git a/pkg/env-tests/reservation_race_scale_test.go b/pkg/env-tests/reservation_race_scale_test.go new file mode 100644 index 000000000..727467425 --- /dev/null +++ b/pkg/env-tests/reservation_race_scale_test.go @@ -0,0 +1,328 @@ +// Copyright 2025 NVIDIA CORPORATION +// SPDX-License-Identifier: Apache-2.0 + +package env_tests + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/xyproto/randomstring" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + kaiv1alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v1alpha2" + schedulingv2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/pkg/env-tests/binder" + "github.com/NVIDIA/KAI-scheduler/pkg/env-tests/utils" +) + +// This test reproduces a race condition in the binder's resource reservation +// sync logic at scale (4 nodes × 8 GPUs = 32 GPU groups). The full binder +// controller runs autonomously; the test only creates Kubernetes objects and +// observes outcomes. +// +// The race: +// 1. A fraction pod binds, creating a reservation pod. The binder patches the +// fraction pod with a GPU group label, but the informer cache may lag. +// 2. A concurrent SyncForGpuGroup (triggered by another BindRequest reconcile +// or deletion) lists pods by GPU group label from the cache. +// 3. Due to cache lag, the sync sees the reservation pod but no fraction pods +// for that group → deletes the reservation pod prematurely. +// +// The test pre-creates reservation pods (as if ReserveGpuDevice placed them) +// and fraction BindRequests. Because envtest has no GPU device plugin, the +// BindRequest reconciliation will fail to fully bind the fractional pods (no +// GPU index annotation on reservation pods). However, the reconciler still +// calls SyncForNode at the start of each reconcile, which triggers the +// problematic reservation pod cleanup path. +var _ = Describe("Reservation pod race at scale", Ordered, func() { + const ( + numNodes = 4 + gpusPerNode = 8 + + gpuIndexAnnotationKey = "run.ai/reserve_for_gpu_index" + gpuIndexHeartbeatKey = "run.ai/test-annotator-heartbeat" + annotatorPollInterval = 100 * time.Millisecond + annotatorGPUIndexValue = "0" + ) + + var ( + testNamespace *corev1.Namespace + testDepartment *schedulingv2.Queue + testQueue *schedulingv2.Queue + nodes []*corev1.Node + + backgroundCtx context.Context + cancel context.CancelFunc + ) + + type gpuGroupState struct { + gpuGroup string + nodeName string + reservationPod *corev1.Pod + fractionPod *corev1.Pod + bindRequest *kaiv1alpha2.BindRequest + } + + var groups []gpuGroupState + + startReservationPodAnnotator := func(ctx context.Context, c client.Client) { + ticker := time.NewTicker(annotatorPollInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var reservationPods corev1.PodList + err := c.List(ctx, &reservationPods, + client.InNamespace(constants.DefaultResourceReservationName), + client.HasLabels{constants.GPUGroup}, + ) + if err != nil { + GinkgoWriter.Printf("reservation pod annotator list error: %v\n", err) + continue + } + + for i := range reservationPods.Items { + pod := reservationPods.Items[i] + updated := pod.DeepCopy() + if updated.Annotations == nil { + updated.Annotations = map[string]string{} + } + + needsPatch := false + if updated.Annotations[gpuIndexAnnotationKey] == "" { + updated.Annotations[gpuIndexAnnotationKey] = annotatorGPUIndexValue + needsPatch = true + } else { + updated.Annotations[gpuIndexHeartbeatKey] = time.Now().UTC().Format(time.RFC3339Nano) + needsPatch = true + } + + if needsPatch { + if err := c.Patch(ctx, updated, client.MergeFrom(&pod)); err != nil { + GinkgoWriter.Printf("reservation pod annotator patch error: %v\n", err) + } + } + } + } + } + }() + } + + BeforeEach(func(ctx context.Context) { + testNamespace = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-" + randomstring.HumanFriendlyEnglishString(10), + }, + } + Expect(ctrlClient.Create(ctx, testNamespace)).To(Succeed()) + + testDepartment = utils.CreateQueueObject("test-department", "") + Expect(ctrlClient.Create(ctx, testDepartment)).To(Succeed()) + + testQueue = utils.CreateQueueObject("test-queue", testDepartment.Name) + Expect(ctrlClient.Create(ctx, testQueue)).To(Succeed()) + + nodes = make([]*corev1.Node, numNodes) + for i := range numNodes { + nodeCfg := utils.DefaultNodeConfig(fmt.Sprintf("scale-node-%d", i)) + nodeCfg.GPUs = gpusPerNode + nodes[i] = utils.CreateNodeObject(ctx, ctrlClient, nodeCfg) + Expect(ctrlClient.Create(ctx, nodes[i])).To(Succeed()) + } + + // Ensure the reservation namespace exists + reservationNs := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: constants.DefaultResourceReservationName}, + } + err := ctrlClient.Create(ctx, reservationNs) + if err != nil { + Expect(client.IgnoreAlreadyExists(err)).To(Succeed()) + } + + backgroundCtx, cancel = context.WithCancel(context.Background()) + + // Build GPU group state for all nodes × GPUs + groups = make([]gpuGroupState, 0, numNodes*gpusPerNode) + for nodeIdx := range numNodes { + for gpuIdx := range gpusPerNode { + gpuGroup := fmt.Sprintf("%s-gpu%d-group", nodes[nodeIdx].Name, gpuIdx) + groups = append(groups, gpuGroupState{ + gpuGroup: gpuGroup, + nodeName: nodes[nodeIdx].Name, + }) + } + } + }) + + AfterEach(func(ctx context.Context) { + cancel() + + // Clean up reservation pods + _ = ctrlClient.DeleteAllOf(ctx, &corev1.Pod{}, + client.InNamespace(constants.DefaultResourceReservationName)) + + // Clean up test resources + if testNamespace != nil { + err := utils.DeleteAllInNamespace(ctx, ctrlClient, testNamespace.Name, + &corev1.Pod{}, + &corev1.ConfigMap{}, + &kaiv1alpha2.BindRequest{}, + ) + Expect(err).NotTo(HaveOccurred()) + } + + for _, node := range nodes { + _ = ctrlClient.Delete(ctx, node) + } + if testQueue != nil { + _ = ctrlClient.Delete(ctx, testQueue) + } + if testDepartment != nil { + _ = ctrlClient.Delete(ctx, testDepartment) + } + }) + + // Pre-create reservation pods and fraction BindRequests, then let the + // binder controller run. The controller's SyncForNode (called at the start + // of each BindRequest reconcile) will discover reservation pods with no + // matching labeled fraction pods and delete them. + It("should preserve reservation pods when active BindRequests exist", func(ctx context.Context) { + // Step 1: Pre-create reservation pods for all 32 GPU groups. + // These simulate the state after ReserveGpuDevice created them. + for i := range groups { + g := &groups[i] + g.reservationPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("gpu-reservation-%s-%d", g.nodeName, i), + Namespace: constants.DefaultResourceReservationName, + Labels: map[string]string{ + constants.AppLabelName: constants.DefaultResourceReservationName, + constants.GPUGroup: g.gpuGroup, + }, + }, + Spec: corev1.PodSpec{ + NodeName: g.nodeName, + ServiceAccountName: constants.DefaultResourceReservationName, + Containers: []corev1.Container{ + {Name: "resource-reservation", Image: "test-image"}, + }, + }, + } + Expect(ctrlClient.Create(ctx, g.reservationPod)).To(Succeed()) + } + + // Step 2: Create fraction pods WITHOUT GPU group labels. + // This simulates the window where ReserveGpuDevice has created the + // reservation pod but hasn't yet labeled the fraction pod. + for i := range groups { + g := &groups[i] + g.fractionPod = utils.CreatePodObject( + testNamespace.Name, + fmt.Sprintf("fraction-pod-%d", i), + corev1.ResourceRequirements{}, + ) + if g.fractionPod.Annotations == nil { + g.fractionPod.Annotations = map[string]string{} + } + g.fractionPod.Annotations[constants.GpuSharingConfigMapAnnotation] = + fmt.Sprintf("%s-shared-gpu", g.fractionPod.Name) + Expect(ctrlClient.Create(ctx, g.fractionPod)).To(Succeed()) + + configMapPrefix := g.fractionPod.Annotations[constants.GpuSharingConfigMapAnnotation] + capabilitiesConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0", configMapPrefix), + Namespace: testNamespace.Name, + }, + Data: map[string]string{}, + } + Expect(ctrlClient.Create(ctx, capabilitiesConfigMap)).To(Succeed()) + + directEnvConfigMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0-evar", configMapPrefix), + Namespace: testNamespace.Name, + }, + Data: map[string]string{}, + } + Expect(ctrlClient.Create(ctx, directEnvConfigMap)).To(Succeed()) + } + + // Step 3: Create BindRequests for all fraction pods. + // The binder will reconcile these, calling SyncForNode which + // triggers the reservation pod cleanup logic. + for i := range groups { + g := &groups[i] + g.bindRequest = &kaiv1alpha2.BindRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("scale-bind-%d", i), + Namespace: testNamespace.Name, + }, + Spec: kaiv1alpha2.BindRequestSpec{ + PodName: g.fractionPod.Name, + SelectedNode: g.nodeName, + ReceivedResourceType: "Fraction", + SelectedGPUGroups: []string{g.gpuGroup}, + ReceivedGPU: &kaiv1alpha2.ReceivedGPU{ + Count: 1, + Portion: "0.5", + }, + }, + } + Expect(ctrlClient.Create(ctx, g.bindRequest)).To(Succeed()) + } + + startReservationPodAnnotator(backgroundCtx, ctrlClient) + err := binder.RunBinder(cfg, backgroundCtx) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(1 * time.Second) + + // Step 5: Check how many reservation pods survived. + var reservationPods corev1.PodList + Expect(ctrlClient.List(ctx, &reservationPods, + client.InNamespace(constants.DefaultResourceReservationName), + client.HasLabels{constants.GPUGroup}, + )).To(Succeed()) + + survivedCount := len(reservationPods.Items) + totalGroups := len(groups) + + GinkgoWriter.Printf("\n=== Scale Race Results ===\n") + GinkgoWriter.Printf("Total GPU groups: %d\n", totalGroups) + GinkgoWriter.Printf("Reservation pods survived: %d\n", survivedCount) + GinkgoWriter.Printf("Reservation pods deleted: %d\n", totalGroups-survivedCount) + + if survivedCount < totalGroups { + deletedGroups := []string{} + survivingGroups := map[string]bool{} + for _, pod := range reservationPods.Items { + survivingGroups[pod.Labels[constants.GPUGroup]] = true + } + for _, g := range groups { + if !survivingGroups[g.gpuGroup] { + deletedGroups = append(deletedGroups, g.gpuGroup) + } + } + GinkgoWriter.Printf("Deleted groups: %v\n", deletedGroups) + } + + // Without the fix: SyncForNode sees reservation pods but no labeled + // fraction pods (cache lag simulation) → deletes reservation pods. + // With the fix: hasActiveBindRequestsForGpuGroup prevents deletion. + Expect(survivedCount).To(Equal(totalGroups), + "Expected all %d reservation pods to survive, but %d were deleted. "+ + "This indicates the race condition where SyncForGpuGroup deletes "+ + "reservation pods despite active BindRequests.", + totalGroups, totalGroups-survivedCount) + }) +}) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6fba75fb1..d5a4ea308 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -22,6 +22,7 @@ package cache import ( "context" "fmt" + "maps" "sync" "time" @@ -292,10 +293,7 @@ func (sc *SchedulerCache) createBindRequest(podInfo *pod_info.PodInfo, nodeName "selected-node": nodeName, } - // Merge with node pool params labels - for k, v := range sc.schedulingNodePoolParams.GetLabels() { - labels[k] = v - } + maps.Copy(labels, sc.schedulingNodePoolParams.GetLabels()) bindRequest := &schedulingv1alpha2.BindRequest{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/e2e/suites/allocate/resources/reservation_pod_race_specs.go b/test/e2e/suites/allocate/resources/reservation_pod_race_specs.go new file mode 100644 index 000000000..fd03601db --- /dev/null +++ b/test/e2e/suites/allocate/resources/reservation_pod_race_specs.go @@ -0,0 +1,176 @@ +/* +Copyright 2025 NVIDIA CORPORATION +SPDX-License-Identifier: Apache-2.0 +*/ +package resources + +import ( + "context" + "fmt" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + + v2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2" + "github.com/NVIDIA/KAI-scheduler/pkg/common/constants" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/configurations/feature_flags" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/constant/labels" + testcontext "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/context" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/capacity" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/resources/rd/queue" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/testconfig" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/utils" + "github.com/NVIDIA/KAI-scheduler/test/e2e/modules/wait" +) + +const numFractionRaceRounds = 5 + +// DescribeReservationPodRaceSpecs tests for a race condition in the binder's +// resource reservation sync logic. When many 0.5 fraction pods bind +// concurrently under binpack mode, the informer cache may lag behind the API +// server. This causes SyncForGpuGroup to see a reservation pod but no fraction +// pod for a GPU group, leading to premature reservation pod deletion and stuck +// workloads. +func DescribeReservationPodRaceSpecs() bool { + return Describe("Reservation pod race under concurrent fraction binding", + Label(labels.ReservationPod, labels.Operated), Ordered, func() { + var ( + testCtx *testcontext.TestContext + ) + + BeforeAll(func(ctx context.Context) { + testCtx = testcontext.GetConnectivity(ctx, Default) + capacity.SkipIfInsufficientClusterResources(testCtx.KubeClientset, + &capacity.ResourceList{ + Gpu: resource.MustParse("4"), + PodCount: 16, + }, + ) + + parentQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), "") + childQueue := queue.CreateQueueObject(utils.GenerateRandomK8sName(10), parentQueue.Name) + testCtx.InitQueues([]*v2.Queue{childQueue, parentQueue}) + + Expect(feature_flags.SetPlacementStrategy(ctx, testCtx, "binpack")).To(Succeed(), + "Failed to set binpack placement strategy") + }) + + AfterAll(func(ctx context.Context) { + err := feature_flags.SetPlacementStrategy(ctx, testCtx, "binpack") + if err != nil { + fmt.Printf("Warning: failed to restore placement strategy: %v\n", err) + } + testCtx.ClusterCleanup(ctx) + }) + + AfterEach(func(ctx context.Context) { + testCtx.TestContextCleanup(ctx) + }) + + // Creates many pairs of 0.5 fraction pods concurrently across + // multiple rounds. Under binpack, each pair targets the same GPU. + // High concurrency increases the chance of triggering the race + // where a reservation pod is prematurely deleted due to cache lag. + It("should schedule all fraction pod pairs without losing reservation pods", func(ctx context.Context) { + resources, err := capacity.GetClusterAllocatableResources(testCtx.KubeClientset) + Expect(err).NotTo(HaveOccurred()) + numGPUs := int(resources.Gpu.Value()) + + for round := range numFractionRaceRounds { + numPods := numGPUs * 2 + By(fmt.Sprintf("Round %d/%d: creating %d fraction pods (0.5 each, 2 per GPU)", + round+1, numFractionRaceRounds, numPods)) + + pods := make([]*v1.Pod, numPods) + for i := range numPods { + pods[i] = rd.CreatePodObject(testCtx.Queues[0], v1.ResourceRequirements{}) + pods[i].Annotations = map[string]string{ + constants.GpuFraction: "0.5", + } + } + + errs := make(chan error, len(pods)) + var wg sync.WaitGroup + for _, pod := range pods { + wg.Add(1) + go func() { + defer wg.Done() + _, err := rd.CreatePod(ctx, testCtx.KubeClientset, pod) + errs <- err + }() + } + wg.Wait() + close(errs) + for err := range errs { + Expect(err).NotTo(HaveOccurred(), "Round %d: failed to create pod", round+1) + } + + namespace := pods[0].Namespace + wait.ForPodsReady(ctx, testCtx.ControllerClient, namespace, pods) + + By(fmt.Sprintf("Round %d/%d: verifying scheduling and reservation pods", round+1, numFractionRaceRounds)) + var podList v1.PodList + Expect(testCtx.ControllerClient.List(ctx, &podList, + runtimeClient.InNamespace(namespace), + runtimeClient.MatchingLabels{constants.AppLabelName: "engine-e2e"}, + )).To(Succeed()) + + scheduledCount := 0 + gpuGroups := map[string]int{} + for _, pod := range podList.Items { + if !rd.IsPodScheduled(&pod) { + continue + } + scheduledCount++ + group, ok := pod.Labels[constants.GPUGroup] + Expect(ok).To(BeTrue(), + "Round %d: pod %s should have GPU group label", round+1, pod.Name) + gpuGroups[group]++ + } + + Expect(scheduledCount).To(Equal(numPods), + "Round %d: expected %d pods scheduled, got %d", round+1, numPods, scheduledCount) + + for group, count := range gpuGroups { + Expect(count).To(Equal(2), + "Round %d: GPU group %s should have 2 pods, got %d", round+1, group, count) + } + + // Verify reservation pods exist for each active GPU group. + // The race causes premature deletion of reservation pods. + reservationNamespace := testconfig.GetConfig().ReservationNamespace + var reservationPods v1.PodList + Expect(testCtx.ControllerClient.List(ctx, &reservationPods, + runtimeClient.InNamespace(reservationNamespace), + )).To(Succeed()) + + reservationGroups := map[string]bool{} + for _, rPod := range reservationPods.Items { + group := rPod.Labels[constants.GPUGroup] + if group != "" { + reservationGroups[group] = true + } + } + + for group := range gpuGroups { + Expect(reservationGroups).To(HaveKey(group), + "Round %d: reservation pod missing for GPU group %s", round+1, group) + } + + // Clean up between rounds + By(fmt.Sprintf("Round %d/%d: cleaning up", round+1, numFractionRaceRounds)) + Expect(rd.DeleteAllPodsInNamespace(ctx, testCtx.ControllerClient, namespace)).To(Succeed()) + Expect(rd.DeleteAllConfigMapsInNamespace(ctx, testCtx.ControllerClient, namespace)).To(Succeed()) + wait.ForNoE2EPods(ctx, testCtx.ControllerClient) + wait.ForNoReservationPods(ctx, testCtx.ControllerClient) + time.Sleep(2 * time.Second) + } + }) + }) +} diff --git a/test/e2e/suites/allocate/resources/resources_suite_test.go b/test/e2e/suites/allocate/resources/resources_suite_test.go index 24f257b6e..41c57518f 100644 --- a/test/e2e/suites/allocate/resources/resources_suite_test.go +++ b/test/e2e/suites/allocate/resources/resources_suite_test.go @@ -14,6 +14,7 @@ import ( ) var _ = DescribeResourcesSpecs() +var _ = DescribeReservationPodRaceSpecs() func TestResourcesAllocation(t *testing.T) { utils.SetLogger()