Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 63 additions & 21 deletions internal/controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,37 +241,79 @@ func (r *RuleReadinessController) hasTaintBySpec(node *corev1.Node, taintSpec co
}

// addTaintBySpec ensures taintSpec is present on the given node.
//
// Every attempt re-reads the node by name, returns early when hasTaintBySpec
// already reports the taint, and otherwise appends it and patches the node.
// We use client.MergeFromWithOptimisticLock because patching a list with a
// JSON merge patch can cause races due to the fact that it fully replaces
// the list on a change. Optimistic locking ensures the patch fails with a
// conflict error if the node was modified concurrently, allowing
// retry.RetryOnConflict to run the attempt again with fresh state.
//
// After a successful patch a "TaintAdded" event is recorded and *node is
// overwritten with the patched object. When the taint is already present no
// patch is sent, no event is recorded and node is left untouched. ruleName is
// only used in the event message.
func (r *RuleReadinessController) addTaintBySpec(ctx context.Context, node *corev1.Node, taintSpec corev1.Taint, ruleName string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Fetch latest node state so the patch is computed against the
		// current taint list rather than the caller's possibly stale copy.
		latestNode := &corev1.Node{}
		if err := r.Get(ctx, client.ObjectKey{Name: node.Name}, latestNode); err != nil {
			return err
		}

		// Check if taint already exists; nothing to do in that case.
		if r.hasTaintBySpec(latestNode, taintSpec) {
			return nil
		}

		// Keep an unmodified snapshot as the patch base.
		stored := latestNode.DeepCopy()
		latestNode.Spec.Taints = append(latestNode.Spec.Taints, taintSpec)
		if err := r.Patch(ctx, latestNode, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
			// Returned unwrapped so RetryOnConflict can recognise a conflict.
			return err
		}

		message := fmt.Sprintf("Taint '%s:%s' added by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
		r.EventRecorder.Event(latestNode, corev1.EventTypeNormal, "TaintAdded", message)

		// Update the original node reference with the latest state
		*node = *latestNode

		return nil
	})
}

// removeTaintBySpec removes a taint from a node.
// We use client.MergeFromWithOptimisticLock because patching a list with a
// JSON merge patch can cause races due to the fact that it fully replaces
// the list on a change. Optimistic locking ensures the patch fails with a
// conflict error if the node was modified concurrently, allowing the
// controller to retry with fresh state.
func (r *RuleReadinessController) removeTaintBySpec(ctx context.Context, node *corev1.Node, taintSpec corev1.Taint, ruleName string) error {
patch := client.StrategicMergeFrom(node.DeepCopy())
var newTaints []corev1.Taint
for _, taint := range node.Spec.Taints {
if taint.Key != taintSpec.Key || taint.Effect != taintSpec.Effect {
newTaints = append(newTaints, taint)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Fetch latest node state
latestNode := &corev1.Node{}
if err := r.Get(ctx, client.ObjectKey{Name: node.Name}, latestNode); err != nil {
return err
}
}
node.Spec.Taints = newTaints
if err := r.Patch(ctx, node, patch); err != nil {
return err
}

message := fmt.Sprintf("Taint '%s:%s' removed by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
r.EventRecorder.Event(node, corev1.EventTypeNormal, "TaintRemoved", message)
// Check if taint is already absent
if !r.hasTaintBySpec(latestNode, taintSpec) {
return nil
}

return nil
stored := latestNode.DeepCopy()
var newTaints []corev1.Taint
for _, taint := range latestNode.Spec.Taints {
if taint.Key != taintSpec.Key || taint.Effect != taintSpec.Effect {
newTaints = append(newTaints, taint)
}
}
latestNode.Spec.Taints = newTaints
if err := r.Patch(ctx, latestNode, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return err
}

message := fmt.Sprintf("Taint '%s:%s' removed by rule '%s'", taintSpec.Key, taintSpec.Effect, ruleName)
r.EventRecorder.Event(latestNode, corev1.EventTypeNormal, "TaintRemoved", message)

// Update the original node reference with the latest state
*node = *latestNode

return nil
})
}

// Bootstrap completion tracking.
Expand Down
248 changes: 248 additions & 0 deletions internal/controller/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ package controller

import (
"context"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodereadinessiov1alpha1 "sigs.k8s.io/node-readiness-controller/api/v1alpha1"
Expand Down Expand Up @@ -689,4 +694,247 @@ var _ = Describe("Node Controller", func() {
}, time.Second*5).Should(BeTrue(), "NodeEvaluation should be updated with new condition and taint status")
})
})

// These tests use the controller-runtime fake client (not envtest's
// k8sClient) with interceptors to simulate concurrent node modifications.
// The fake client enforces resourceVersion checks, so when
// MergeFromWithOptimisticLock is used and another write bumps the
// resourceVersion, the patch fails with a Conflict error — the same
// behavior a real API server would produce.
Context("optimistic locking on taint operations", func() {
// Shared fixtures, rebuilt before every spec in this Context.
var ctx context.Context
var testScheme *runtime.Scheme

BeforeEach(func() {
	// A fresh scheme that only knows the core/v1 types is enough for the
	// fake client to serve Node objects.
	testScheme = runtime.NewScheme()
	Expect(corev1.AddToScheme(testScheme)).To(Succeed())

	ctx = context.Background()
})

// Exercises the conflict path of removeTaintBySpec: the first Patch races
// with a simulated concurrent writer, the retry must succeed, and the
// concurrent writer's taint must survive.
It("should retry and succeed when removeTaintBySpec encounters a conflict", func() {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "ol-remove-conflict"},
		Spec: corev1.NodeSpec{
			Taints: []corev1.Taint{
				{Key: "readiness.k8s.io/test", Effect: corev1.TaintEffectNoSchedule},
				{Key: "other-controller/taint", Effect: corev1.TaintEffectNoSchedule},
			},
		},
	}

	var patchCount atomic.Int32

	// The interceptor simulates a concurrent modification: on the
	// first Patch call it updates the node (bumping resourceVersion)
	// before delegating to the real Patch. Because
	// MergeFromWithOptimisticLock embeds the original resourceVersion,
	// the fake client detects the mismatch and returns a Conflict.
	// The retry logic should handle this and succeed on the second attempt.
	fc := fakeclient.NewClientBuilder().
		WithScheme(testScheme).
		WithObjects(node).
		WithInterceptorFuncs(interceptor.Funcs{
			Patch: func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
				if obj.GetName() == "ol-remove-conflict" && patchCount.Add(1) == 1 {
					// Simulate concurrent modification by another controller.
					current := &corev1.Node{}
					Expect(c.Get(ctx, types.NamespacedName{Name: obj.GetName()}, current)).To(Succeed())
					current.Spec.Taints = append(current.Spec.Taints, corev1.Taint{
						Key: "concurrent-controller/new-taint", Effect: corev1.TaintEffectNoSchedule,
					})
					Expect(c.Update(ctx, current)).To(Succeed())
				}
				return c.Patch(ctx, obj, patch, opts...)
			},
		}).
		Build()

	controller := &RuleReadinessController{
		Client:        fc,
		Scheme:        testScheme,
		clientset:     fake.NewSimpleClientset(),
		ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
		EventRecorder: record.NewFakeRecorder(10),
	}

	Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

	err := controller.removeTaintBySpec(ctx, node, corev1.Taint{
		Key:    "readiness.k8s.io/test",
		Effect: corev1.TaintEffectNoSchedule,
	}, "test-rule")

	// Should succeed after retry
	Expect(err).NotTo(HaveOccurred())

	// Verify the taint was removed and the concurrent modification was
	// preserved: exactly the two foreign taints must remain.
	updated := &corev1.Node{}
	Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, updated)).To(Succeed())

	taintKeys := make([]string, 0, len(updated.Spec.Taints))
	for _, taint := range updated.Spec.Taints {
		taintKeys = append(taintKeys, taint.Key)
	}
	Expect(taintKeys).To(ConsistOf("other-controller/taint", "concurrent-controller/new-taint"))

	// The caller's node must have been refreshed with the patched state.
	Expect(node.ResourceVersion).To(Equal(updated.ResourceVersion))

	// Exactly two patch attempts: the first conflicts, the second succeeds.
	// Anything more would mean the retry loop is spinning unnecessarily.
	Expect(patchCount.Load()).To(Equal(int32(2)))
})

// Exercises the conflict path of addTaintBySpec: the first Patch races with
// a simulated concurrent writer, the retry must succeed, and both the new
// taint and the concurrent writer's taint must end up on the node.
It("should retry and succeed when addTaintBySpec encounters a conflict", func() {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "ol-add-conflict"},
		Spec: corev1.NodeSpec{
			Taints: []corev1.Taint{
				{Key: "other-controller/taint", Effect: corev1.TaintEffectNoSchedule},
			},
		},
	}

	var patchCount atomic.Int32

	// The interceptor simulates a concurrent modification on the first
	// patch attempt, which should trigger a retry that succeeds.
	fc := fakeclient.NewClientBuilder().
		WithScheme(testScheme).
		WithObjects(node).
		WithInterceptorFuncs(interceptor.Funcs{
			Patch: func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
				if obj.GetName() == "ol-add-conflict" && patchCount.Add(1) == 1 {
					// Bump resourceVersion behind the controller's back.
					current := &corev1.Node{}
					Expect(c.Get(ctx, types.NamespacedName{Name: obj.GetName()}, current)).To(Succeed())
					current.Spec.Taints = append(current.Spec.Taints, corev1.Taint{
						Key: "concurrent-controller/new-taint", Effect: corev1.TaintEffectNoSchedule,
					})
					Expect(c.Update(ctx, current)).To(Succeed())
				}
				return c.Patch(ctx, obj, patch, opts...)
			},
		}).
		Build()

	controller := &RuleReadinessController{
		Client:        fc,
		Scheme:        testScheme,
		clientset:     fake.NewSimpleClientset(),
		ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
		EventRecorder: record.NewFakeRecorder(10),
	}

	Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

	err := controller.addTaintBySpec(ctx, node, corev1.Taint{
		Key:    "readiness.k8s.io/test",
		Effect: corev1.TaintEffectNoSchedule,
	}, "test-rule")

	// Should succeed after retry
	Expect(err).NotTo(HaveOccurred())

	// Verify all three taints are present: the pre-existing one, ours and
	// the one added concurrently.
	updated := &corev1.Node{}
	Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, updated)).To(Succeed())

	taintKeys := make([]string, 0, len(updated.Spec.Taints))
	for _, taint := range updated.Spec.Taints {
		taintKeys = append(taintKeys, taint.Key)
	}
	Expect(taintKeys).To(ConsistOf(
		"readiness.k8s.io/test",
		"other-controller/taint",
		"concurrent-controller/new-taint",
	))

	// The caller's node must have been refreshed with the patched state.
	Expect(node.ResourceVersion).To(Equal(updated.ResourceVersion))

	// Exactly two patch attempts: the first conflicts, the second succeeds.
	// Anything more would mean the retry loop is spinning unnecessarily.
	Expect(patchCount.Load()).To(Equal(int32(2)))
})

// Happy path: with nobody else touching the node, removeTaintBySpec strips
// the requested taint and leaves the unrelated one in place.
It("should succeed when no concurrent modification occurs", func() {
	target := corev1.Taint{
		Key:    "readiness.k8s.io/test",
		Effect: corev1.TaintEffectNoSchedule,
	}

	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "ol-no-conflict"},
		Spec: corev1.NodeSpec{
			Taints: []corev1.Taint{
				{Key: "readiness.k8s.io/test", Effect: corev1.TaintEffectNoSchedule},
				{Key: "other/taint", Effect: corev1.TaintEffectNoSchedule},
			},
		},
	}
	nodeKey := types.NamespacedName{Name: node.Name}

	// Plain fake client: no interceptors, so no conflict is injected.
	builder := fakeclient.NewClientBuilder().WithScheme(testScheme).WithObjects(node)
	fc := builder.Build()

	reconciler := &RuleReadinessController{
		Client:        fc,
		Scheme:        testScheme,
		clientset:     fake.NewSimpleClientset(),
		ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
		EventRecorder: record.NewFakeRecorder(10),
	}

	// Load the stored object so node carries the server-side state.
	Expect(fc.Get(ctx, nodeKey, node)).To(Succeed())

	Expect(reconciler.removeTaintBySpec(ctx, node, target, "test-rule")).NotTo(HaveOccurred())

	// Only the unrelated taint may remain on the stored node.
	got := &corev1.Node{}
	Expect(fc.Get(ctx, nodeKey, got)).To(Succeed())
	remaining := got.Spec.Taints
	Expect(remaining).To(HaveLen(1))
	Expect(remaining[0].Key).To(Equal("other/taint"))
})

// No-op path: asking to remove a taint the node does not carry must return
// without issuing any Patch for that node.
It("should skip patch when removing a taint that does not exist", func() {
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "ol-noop"},
		Spec: corev1.NodeSpec{
			Taints: []corev1.Taint{
				{Key: "other/taint", Effect: corev1.TaintEffectNoSchedule},
			},
		},
	}

	// Flipped by the interceptor if a Patch for this node is ever attempted.
	var patchCalled atomic.Bool

	recordPatch := func(ctx context.Context, c client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
		if obj.GetName() == "ol-noop" {
			patchCalled.Store(true)
		}
		return c.Patch(ctx, obj, patch, opts...)
	}

	fc := fakeclient.NewClientBuilder().
		WithScheme(testScheme).
		WithObjects(node).
		WithInterceptorFuncs(interceptor.Funcs{Patch: recordPatch}).
		Build()

	reconciler := &RuleReadinessController{
		Client:        fc,
		Scheme:        testScheme,
		clientset:     fake.NewSimpleClientset(),
		ruleCache:     make(map[string]*nodereadinessiov1alpha1.NodeReadinessRule),
		EventRecorder: record.NewFakeRecorder(10),
	}

	// Load the stored object so node carries the server-side state.
	Expect(fc.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed())

	missing := corev1.Taint{
		Key:    "readiness.k8s.io/nonexistent",
		Effect: corev1.TaintEffectNoSchedule,
	}
	err := reconciler.removeTaintBySpec(ctx, node, missing, "test-rule")

	Expect(err).NotTo(HaveOccurred())
	Expect(patchCalled.Load()).To(BeFalse(),
		"Patch should not be called when taint removal is a no-op")
})
})
})
Loading