From 7929fdf1c42e114dbaf817ceee518217d4c5bac9 Mon Sep 17 00:00:00 2001 From: "yang.qiu" Date: Tue, 3 Feb 2026 15:52:06 -0800 Subject: [PATCH 1/2] support external access through nodeport --- .dockerignore | 1 + api/v1alpha1/valkeycluster_types.go | 4 + .../crd/bases/valkey.io_valkeyclusters.yaml | 4 + config/rbac/role.yaml | 3 + internal/controller/deployment.go | 46 ++- internal/controller/deployment_test.go | 30 +- internal/controller/scripts/start-valkey.sh | 25 ++ .../controller/valkeycluster_controller.go | 318 ++++++++++++++++-- .../valkeycluster_controller_test.go | 26 +- 9 files changed, 397 insertions(+), 60 deletions(-) create mode 100644 internal/controller/scripts/start-valkey.sh diff --git a/.dockerignore b/.dockerignore index c67434c..85df0c8 100644 --- a/.dockerignore +++ b/.dockerignore @@ -13,3 +13,4 @@ # Re-include scripts !internal/controller/scripts/readiness-check.sh !internal/controller/scripts/liveness-check.sh +!internal/controller/scripts/start-valkey.sh \ No newline at end of file diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index f5dfad5..b63845c 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -69,6 +69,10 @@ type ValkeyClusterSpec struct { // +optional Affinity *corev1.Affinity `json:"affinity,omitempty"` + // Allow external access to each Valkey pod via a NodePort Service + // +optional + AllowExternalAccess bool `json:"allowExternalAccess,omitempty"` + // Metrics exporter options // +kubebuilder:default:={enabled:true} // +optional diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index f8a188c..52fd9aa 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -973,6 +973,10 @@ spec: x-kubernetes-list-type: atomic type: object type: object + allowExternalAccess: + description: Allow external access to each Valkey pod via a NodePort + Service + type: boolean exporter: default: enabled: true diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 578d6db..d2a3d52 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -24,11 +24,14 @@ rules: verbs: - get - list + - patch + - update - watch - apiGroups: - apps resources: - deployments + - statefulsets verbs: - create - delete diff --git a/internal/controller/deployment.go b/internal/controller/deployment.go index 4f87a75..9843f7c 100644 --- a/internal/controller/deployment.go +++ b/internal/controller/deployment.go @@ -34,8 +34,29 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con Image: image, Resources: cluster.Spec.Resources, Command: []string{ - "valkey-server", - "/config/valkey.conf", + "/scripts/start-valkey.sh", + }, + Env: []corev1.EnvVar{ + { + Name: "VALKEY_NODE_IP", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.hostIP", + }, + }, + }, + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "VALKEY_BASE_NODE_PORT", + Value: "30000", + }, }, Ports: []corev1.ContainerPort{ { @@ -105,6 +126,10 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con MountPath: "/config", ReadOnly: true, }, + { + Name: "data", + MountPath: "/data", + }, }, }, } @@ -116,16 +141,18 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con return containers } -func 
createClusterDeployment(cluster *valkeyiov1alpha1.ValkeyCluster) *appsv1.Deployment { +func createClusterStatefulSet(cluster *valkeyiov1alpha1.ValkeyCluster, replicas int32) *appsv1.StatefulSet { containers := generateContainersDef(cluster) - deployment := &appsv1.Deployment{ + + return &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: cluster.Name + "-", - Namespace: cluster.Namespace, - Labels: labels(cluster), + Name: cluster.Name, + Namespace: cluster.Namespace, + Labels: labels(cluster), }, - Spec: appsv1.DeploymentSpec{ - Replicas: func(i int32) *int32 { return &i }(1), + Spec: appsv1.StatefulSetSpec{ + ServiceName: cluster.Name, + Replicas: func(i int32) *int32 { return &i }(replicas), Selector: &metav1.LabelSelector{ MatchLabels: labels(cluster), }, @@ -165,5 +192,4 @@ func createClusterDeployment(cluster *valkeyiov1alpha1.ValkeyCluster) *appsv1.De }, }, } - return deployment } diff --git a/internal/controller/deployment_test.go b/internal/controller/deployment_test.go index 778c39a..6f34cb3 100644 --- a/internal/controller/deployment_test.go +++ b/internal/controller/deployment_test.go @@ -31,7 +31,7 @@ import ( valkeyv1 "valkey.io/valkey-operator/api/v1alpha1" ) -func TestCreateClusterDeployment(t *testing.T) { +func TestCreateClusterStatefulSet(t *testing.T) { cluster := &valkeyv1.ValkeyCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "mycluster", @@ -40,25 +40,25 @@ func TestCreateClusterDeployment(t *testing.T) { Image: "container:version", }, } - d := createClusterDeployment(cluster) - if d.Name != "" { - t.Errorf("Expected empty name field, got %v", d.Name) + s := createClusterStatefulSet(cluster, 3) + if s.Name != "mycluster" { + t.Errorf("Expected %v, got %v", "mycluster", s.Name) } - if d.GenerateName != "mycluster-" { - t.Errorf("Expected %v, got %v", "mycluster-", d.GenerateName) + if s.Spec.ServiceName != "mycluster" { + t.Errorf("Expected %v, got %v", "mycluster", s.Spec.ServiceName) } - if *d.Spec.Replicas != 1 { - t.Errorf("Expected %v, got %v", 1, d.Spec.Replicas) + if *s.Spec.Replicas != 3 { + t.Errorf("Expected %v, got %v", 3, s.Spec.Replicas) } - if len(d.Spec.Template.Spec.Containers) != 1 { - t.Errorf("Expected %v, got %v", 1, len(d.Spec.Template.Spec.Containers)) + if len(s.Spec.Template.Spec.Containers) != 1 { + t.Errorf("Expected %v, got %v", 1, len(s.Spec.Template.Spec.Containers)) } - if d.Spec.Template.Spec.Containers[0].Image != "container:version" { - t.Errorf("Expected %v, got %v", "container:version", d.Spec.Template.Spec.Containers[0].Image) + if s.Spec.Template.Spec.Containers[0].Image != "container:version" { + t.Errorf("Expected %v, got %v", "container:version", s.Spec.Template.Spec.Containers[0].Image) } } -func TestCreateClusterDeployment_SetsPodAntiAffinity(t *testing.T) { +func TestCreateClusterStatefulSet_SetsPodAntiAffinity(t *testing.T) { antiAffinity := &corev1.Affinity{ PodAntiAffinity: &corev1.PodAntiAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ @@ -81,9 +81,9 @@ func TestCreateClusterDeployment_SetsPodAntiAffinity(t *testing.T) { }, } - d := createClusterDeployment(cluster) + s := createClusterStatefulSet(cluster, 1) - got := d.Spec.Template.Spec.Affinity + got := s.Spec.Template.Spec.Affinity if diff := cmp.Diff(antiAffinity, got, cmpopts.EquateEmpty()); diff != "" { t.Fatalf("affinity mismatch (-want +got):\n%s", diff) } diff --git a/internal/controller/scripts/start-valkey.sh b/internal/controller/scripts/start-valkey.sh new file mode 100644 index 0000000..b62e113 --- /dev/null 
+++ b/internal/controller/scripts/start-valkey.sh @@ -0,0 +1,25 @@ +#!/bin/bash +set -e + +announce_file="/data/announce.conf" +mkdir -p /data +cat /dev/null > "$announce_file" + +base_node_port="${VALKEY_BASE_NODE_PORT:-30000}" +if [ -n "$POD_NAME" ] && echo "$POD_NAME" | grep -q -- '-'; then + ordinal="${POD_NAME##*-}" + if echo "$ordinal" | grep -qE '^[0-9]+$'; then + VALKEY_NODE_PORT="$((base_node_port + ordinal * 2))" + VALKEY_NODE_BUS_PORT="$((base_node_port + ordinal * 2 + 1))" + fi +fi + +if [ -n "$VALKEY_NODE_IP" ] && [ -n "$VALKEY_NODE_PORT" ] && [ -n "$VALKEY_NODE_BUS_PORT" ]; then + cat <<EOF > "$announce_file" +cluster-announce-ip $VALKEY_NODE_IP +cluster-announce-port $VALKEY_NODE_PORT +cluster-announce-bus-port $VALKEY_NODE_BUS_PORT +EOF +fi + +exec valkey-server /config/valkey.conf diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index d7a91b7..86df21a 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -21,7 +21,9 @@ import ( "embed" "errors" "slices" + "sort" "strconv" + "strings" "time" appsv1 "k8s.io/api/apps/v1" @@ -30,7 +32,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/events" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -40,11 +42,17 @@ import ( ) const ( - DefaultPort = 6379 - DefaultClusterBusPort = 16379 - DefaultImage = "valkey/valkey:9.0.0" - DefaultExporterImage = "oliver006/redis_exporter:v1.80.0" - DefaultExporterPort = 9121 + DefaultPort = 6379 + DefaultClusterBusPort = 16379 + DefaultImage = "valkey/valkey:9.0.0" + DefaultExporterImage = "oliver006/redis_exporter:v1.80.0" + DefaultExporterPort = 9121 + ExternalAccessLabelKey = "valkey.io/external-access" + ExternalAccessLabelValue = "nodeport" + + ShardIndexLabelKey = "valkey.io/shard-index" + ReplicaIndexLabelKey = "valkey.io/replica-index" + RoleLabelKey = "valkey.io/role" // Error messages statusUpdateFailedMsg = "failed to update status" @@ -65,9 +73,10 @@ var scripts embed.FS // +kubebuilder:rbac:groups=valkey.io,resources=valkeyclusters/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch +// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state.
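For reference, the announce ports above follow one scheme shared by start-valkey.sh and the NodePort Services created in the controller below: the pod with ordinal n exposes data port 30000 + 2n and cluster-bus port 30000 + 2n + 1. A minimal Go sketch of that arithmetic, with externalPorts as a hypothetical helper name used only for illustration:

// externalPorts mirrors the patch's port scheme: ordinal n maps to data
// port base+2n and cluster-bus port base+2n+1 (base defaults to 30000).
func externalPorts(ordinal, base int32) (dataPort, busPort int32) {
	return base + ordinal*2, base + ordinal*2 + 1
}

For example, the pod with ordinal 2 announces 30004 for data and 30005 for the cluster bus.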
@@ -95,12 +104,26 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - if err := r.upsertDeployments(ctx, cluster); err != nil { + if err := r.upsertStatefulSet(ctx, cluster); err != nil { setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) _ = r.updateStatus(ctx, cluster, nil) return ctrl.Result{}, err } + if cluster.Spec.AllowExternalAccess { + if err := r.upsertNodePortServices(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonServiceError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + } else { + if err := r.deleteNodePortServices(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonServiceError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + } + // Get all pods and their current Valkey Cluster state pods := &corev1.PodList{} if err := r.List(ctx, pods, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { @@ -112,6 +135,8 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques state := r.getValkeyClusterState(ctx, pods) defer state.CloseClients() + r.syncPodRoleLabels(ctx, cluster, state, pods) + // Check if we need to forget stale non-existing nodes r.forgetStaleNodes(ctx, cluster, state, pods) @@ -236,6 +261,10 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, cluster * if err != nil { return err } + startValkey, err := scripts.ReadFile("scripts/start-valkey.sh") + if err != nil { + return err + } cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -246,10 +275,12 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, cluster * Data: map[string]string{ "readiness-check.sh": string(readiness), "liveness-check.sh": string(liveness), + "start-valkey.sh": string(startValkey), "valkey.conf": ` cluster-enabled yes protected-mode no -cluster-node-timeout 2000`, +cluster-node-timeout 2000 +include /data/announce.conf`, }, } if err := controllerutil.SetControllerReference(cluster, cm, r.Scheme); err != nil { @@ -271,36 +302,279 @@ cluster-node-timeout 2000`, return nil } -// Create Valkey instances, one Deployment and Pod each -func (r *ValkeyClusterReconciler) upsertDeployments(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + +// Create Valkey instances with a single StatefulSet +func (r *ValkeyClusterReconciler) upsertStatefulSet(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { log := logf.FromContext(ctx) - existing := &appsv1.DeploymentList{} + existing := &appsv1.StatefulSetList{} if err := r.List(ctx, existing, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { - log.Error(err, "failed to list Deployments") + log.Error(err, "failed to list StatefulSets") return err } expected := int(cluster.Spec.Shards * (1 + cluster.Spec.Replicas)) - // Create missing deployments - for i := len(existing.Items); i < expected; i++ { - deployment := createClusterDeployment(cluster) - if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil { + if len(existing.Items) == 0 { + statefulSet := createClusterStatefulSet(cluster, int32(expected)) + if err := controllerutil.SetControllerReference(cluster, 
statefulSet, r.Scheme); err != nil { return err } - if err := r.Create(ctx, deployment); err != nil { - r.Recorder.Eventf(cluster, deployment, corev1.EventTypeWarning, "DeploymentCreationFailed", "CreateDeployment", "Failed to create deployment: %v", err) + if err := r.Create(ctx, statefulSet); err != nil { + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "StatefulSetCreationFailed", "Failed to create StatefulSet: %v", err) + return err + } + r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "StatefulSetCreated", "Created StatefulSet with %d replicas", expected) + return nil + } + + // Update existing StatefulSet replica count if needed + statefulSet := &existing.Items[0] + updated := false + if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas != int32(expected) { + statefulSet.Spec.Replicas = func(i int32) *int32 { return &i }(int32(expected)) + updated = true + } + if updated { + if err := r.Update(ctx, statefulSet); err != nil { + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "StatefulSetUpdateFailed", "Failed to update StatefulSet: %v", err) return err } - r.Recorder.Eventf(cluster, deployment, corev1.EventTypeNormal, "DeploymentCreated", "CreateDeployment", "Created deployment %d of %d", i+1, expected) } - // TODO: update existing + return nil +} + +// Create or update a NodePort service for each pod. +func (r *ValkeyClusterReconciler) upsertNodePortServices(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + + pods := &corev1.PodList{} + if err := r.List(ctx, pods, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Pods") + return err + } + + services := &corev1.ServiceList{} + if err := r.List(ctx, services); err != nil { + log.Error(err, "failed to list Services") + return err + } + + externalServices := map[string]*corev1.Service{} + for i := range services.Items { + service := &services.Items[i] + if service.Namespace == cluster.Namespace { + externalServices[service.Name] = service + } + } + + for i := range pods.Items { + pod := &pods.Items[i] + svcName := pod.Name + "-nodeport" + var nodePort int32 + var busNodePort int32 + existingSvc, hasExistingService := externalServices[svcName] + + ordinal, err := parsePodOrdinal(pod.Name) + if err != nil { + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceCreationFailed", "Failed to parse pod ordinal: %v", err) + return err + } + base := int32(30000) + nodePort = base + int32(ordinal*2) + busNodePort = base + int32(ordinal*2+1) + + desiredPorts := []corev1.ServicePort{ + { + Name: "valkey", + Port: DefaultPort, + TargetPort: intstr.FromInt(DefaultPort), + NodePort: nodePort, + }, + { + Name: "cluster-bus", + Port: DefaultClusterBusPort, + TargetPort: intstr.FromInt(DefaultClusterBusPort), + NodePort: busNodePort, + }, + } + + if hasExistingService { + existingSvc.Spec.Type = corev1.ServiceTypeNodePort + existingSvc.Spec.Selector = map[string]string{"statefulset.kubernetes.io/pod-name": pod.Name} + existingSvc.Spec.Ports = desiredPorts + if existingSvc.Labels == nil { + existingSvc.Labels = map[string]string{} + } + for key, value := range labels(cluster) { + existingSvc.Labels[key] = value + } + existingSvc.Labels[ExternalAccessLabelKey] = ExternalAccessLabelValue + if err := controllerutil.SetControllerReference(cluster, existingSvc, r.Scheme); err != nil { + return err + } + if err := r.Update(ctx, existingSvc); err != nil { + r.Recorder.Eventf(cluster, 
corev1.EventTypeWarning, "ServiceUpdateFailed", "Failed to update external Service: %v", err) + return err + } + for _, port := range existingSvc.Spec.Ports { + switch port.Name { + case "valkey": + nodePort = port.NodePort + case "cluster-bus": + busNodePort = port.NodePort + } + } + } else { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: cluster.Namespace, + Labels: labels(cluster), + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Selector: map[string]string{"statefulset.kubernetes.io/pod-name": pod.Name}, + Ports: desiredPorts, + }, + } + svc.Labels[ExternalAccessLabelKey] = ExternalAccessLabelValue + if err := controllerutil.SetControllerReference(cluster, svc, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, svc); err != nil { + if !apierrors.IsAlreadyExists(err) { + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceCreationFailed", "Failed to create external Service: %v", err) + return err + } + } else { + r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "ServiceCreated", "Created NodePort Service %s", svcName) + } + } + + } + return nil +} +func (r *ValkeyClusterReconciler) deleteNodePortServices(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + services := &corev1.ServiceList{} + if err := r.List(ctx, services, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Services") + return err + } + for i := range services.Items { + service := &services.Items[i] + if service.Labels[ExternalAccessLabelKey] != ExternalAccessLabelValue { + continue + } + if err := r.Delete(ctx, service); err != nil && !apierrors.IsNotFound(err) { + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceDeletionFailed", "Failed to delete external Service: %v", err) + return err + } + } return nil } +func parsePodOrdinal(podName string) (int, error) { + lastDash := strings.LastIndex(podName, "-") + if lastDash == -1 || lastDash == len(podName)-1 { + return 0, errors.New("pod name missing ordinal suffix") + } + ordinal, err := strconv.Atoi(podName[lastDash+1:]) + if err != nil { + return 0, err + } + return ordinal, nil +} + +func (r *ValkeyClusterReconciler) syncPodRoleLabels(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState, pods *corev1.PodList) { + if state == nil { + return + } + + podsByIP := map[string]*corev1.Pod{} + for i := range pods.Items { + pod := &pods.Items[i] + if pod.Status.PodIP == "" { + continue + } + podsByIP[pod.Status.PodIP] = pod + } + + type shardEntry struct { + shard *valkey.ShardState + key int + id string + } + shardEntries := make([]shardEntry, 0, len(state.Shards)) + for _, shard := range state.Shards { + key := 1<<31 - 1 + if len(shard.Slots) > 0 { + key = shard.Slots[0].Start + } + shardEntries = append(shardEntries, shardEntry{shard: shard, key: key, id: shard.Id}) + } + sort.Slice(shardEntries, func(i, j int) bool { + if shardEntries[i].key != shardEntries[j].key { + return shardEntries[i].key < shardEntries[j].key + } + return shardEntries[i].id < shardEntries[j].id + }) + + shardIndexByID := map[string]int{} + for i, entry := range shardEntries { + shardIndexByID[entry.id] = i + } + + for _, entry := range shardEntries { + shard := entry.shard + nodes := make([]*valkey.NodeState, 0, len(shard.Nodes)) + nodes = append(nodes, shard.Nodes...) 
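+ // Order each shard's nodes primary-first, then by node Id, so the replica-index label assigned below stays stable across reconciles.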
+ sort.Slice(nodes, func(i, j int) bool { + if nodes[i].IsPrimary() != nodes[j].IsPrimary() { + return nodes[i].IsPrimary() + } + return nodes[i].Id < nodes[j].Id + }) + for idx, node := range nodes { + pod, ok := podsByIP[node.Address] + if !ok { + continue + } + labelsUpdated := false + if pod.Labels == nil { + pod.Labels = map[string]string{} + labelsUpdated = true + } + shardIndex := shardIndexByID[shard.Id] + role := "replica" + if node.IsPrimary() { + role = "primary" + } + if pod.Labels[ShardIndexLabelKey] != strconv.Itoa(shardIndex) { + pod.Labels[ShardIndexLabelKey] = strconv.Itoa(shardIndex) + labelsUpdated = true + } + if pod.Labels[ReplicaIndexLabelKey] != strconv.Itoa(idx) { + pod.Labels[ReplicaIndexLabelKey] = strconv.Itoa(idx) + labelsUpdated = true + } + if pod.Labels[RoleLabelKey] != role { + pod.Labels[RoleLabelKey] = role + labelsUpdated = true + } + if labelsUpdated { + if err := r.Update(ctx, pod); err != nil { + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "PodUpdateFailed", "Failed to update pod labels: %v", err) + return + } + } + } + } +} + func (r *ValkeyClusterReconciler) getValkeyClusterState(ctx context.Context, pods *corev1.PodList) *valkey.ClusterState { // Create a list of addresses to possible Valkey nodes ips := []string{} diff --git a/internal/controller/valkeycluster_controller_test.go b/internal/controller/valkeycluster_controller_test.go index b682546..50470c5 100644 --- a/internal/controller/valkeycluster_controller_test.go +++ b/internal/controller/valkeycluster_controller_test.go @@ -108,7 +108,7 @@ var _ = Describe("ValkeyCluster Controller", func() { Expect(events).ToNot(BeEmpty()) Expect(events).To(ContainElement(ContainSubstring("ServiceCreated"))) Expect(events).To(ContainElement(ContainSubstring("ConfigMapCreated"))) - Expect(events).To(ContainElement(ContainSubstring("DeploymentCreated"))) + Expect(events).To(ContainElement(ContainSubstring("StatefulSetCreated"))) }) }) @@ -279,7 +279,7 @@ var _ = Describe("EventRecorder", func() { Expect(events).To(ContainElement(ContainSubstring("Created ConfigMap with configuration"))) }) - It("should emit DeploymentCreated event on successful deployment creation", func() { + It("should emit StatefulSetCreated event on successful StatefulSet creation", func() { cluster := &valkeyiov1alpha1.ValkeyCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "event-test-cluster", @@ -293,15 +293,15 @@ var _ = Describe("EventRecorder", func() { Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) defer func() { _ = k8sClient.Delete(ctx, cluster) }() - err := r.upsertDeployments(ctx, cluster) + err := r.upsertStatefulSet(ctx, cluster) Expect(err).NotTo(HaveOccurred()) events := collectEvents(fakeRecorder) - Expect(events).To(ContainElement(ContainSubstring("DeploymentCreated"))) + Expect(events).To(ContainElement(ContainSubstring("StatefulSetCreated"))) Expect(events).To(ContainElement(ContainSubstring("Normal"))) - deploymentEvents := filterEventsByType(events, "DeploymentCreated") - Expect(len(deploymentEvents)).To(BeNumerically(">", 1)) - Expect(deploymentEvents[0]).To(ContainSubstring("Normal")) + statefulSetEvents := filterEventsByType(events, "StatefulSetCreated") + Expect(len(statefulSetEvents)).To(BeNumerically(">", 0)) + Expect(statefulSetEvents[0]).To(ContainSubstring("Normal")) }) }) @@ -364,16 +364,16 @@ var _ = Describe("EventRecorder", func() { Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) defer func() { _ = k8sClient.Delete(ctx, cluster) }() - // Trigger deployment creation to verify formatted 
message - err := r.upsertDeployments(ctx, cluster) + // Trigger StatefulSet creation to verify formatted message + err := r.upsertStatefulSet(ctx, cluster) Expect(err).NotTo(HaveOccurred()) events := collectEvents(fakeRecorder) - deploymentEvents := filterEvents(events, "DeploymentCreated") - Expect(deploymentEvents).ToNot(BeEmpty()) + statefulSetEvents := filterEvents(events, "StatefulSetCreated") + Expect(statefulSetEvents).ToNot(BeEmpty()) - for _, event := range deploymentEvents { - Expect(event).To(MatchRegexp(`Created deployment \d+ of \d+`)) + for _, event := range statefulSetEvents { + Expect(event).To(MatchRegexp(`Created StatefulSet with \d+ replicas`)) } }) }) From d7ed74101c73033045555384a6d20bd19ad70c77 Mon Sep 17 00:00:00 2001 From: "yang.qiu" Date: Wed, 4 Feb 2026 13:29:22 -0800 Subject: [PATCH 2/2] use singleton statefulsets(one statefulset per valkey pod) Signed-off-by: yang.qiu --- api/v1alpha1/valkeycluster_types.go | 4 + .../crd/bases/valkey.io_valkeyclusters.yaml | 3 + config/rbac/role.yaml | 14 +- internal/controller/deployment.go | 49 +- internal/controller/deployment_test.go | 74 +-- internal/controller/replicas.go | 153 ++++++ internal/controller/scripts/start-valkey.sh | 29 +- internal/controller/statefulset.go | 123 +++++ internal/controller/statefulset_test.go | 103 ++++ .../controller/valkeycluster_controller.go | 463 +++++++++++++++--- .../valkeycluster_controller_test.go | 4 +- 11 files changed, 859 insertions(+), 160 deletions(-) create mode 100644 internal/controller/replicas.go create mode 100644 internal/controller/statefulset.go create mode 100644 internal/controller/statefulset_test.go diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index b63845c..2f31aee 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -73,6 +73,10 @@ type ValkeyClusterSpec struct { // +optional AllowExternalAccess bool `json:"allowExternalAccess,omitempty"` + // Use a one-statefulset-per-pod model instead of one-deployment-per-pod. + // +optional + UseStatefulSetPerPod bool `json:"useStatefulSetPerPod,omitempty"` + // Metrics exporter options // +kubebuilder:default:={enabled:true} // +optional diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index 52fd9aa..81b340e 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -1168,6 +1168,9 @@ spec: type: string type: object type: array + useStatefulSetPerPod: + description: Use a one-statefulset-per-pod model instead of one-deployment-per-pod. + type: boolean type: object status: default: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index d2a3d52..d47b220 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -17,6 +17,13 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: @@ -40,13 +47,6 @@ rules: - patch - update - watch -- apiGroups: - - events.k8s.io - resources: - - events - verbs: - - create - - patch - apiGroups: - valkey.io resources: diff --git a/internal/controller/deployment.go b/internal/controller/deployment.go index 9843f7c..9fad67c 100644 --- a/internal/controller/deployment.go +++ b/internal/controller/deployment.go @@ -17,6 +17,8 @@ limitations under the License. 
package controller import ( + "strconv" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,6 +47,26 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con }, }, }, + { + Name: "VALKEY_ENABLE_EXTERNAL_ANNOUNCE", + Value: strconv.FormatBool(cluster.Spec.AllowExternalAccess), + }, + { + Name: "VALKEY_NODE_PORT", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.annotations['" + ExternalAccessNodePortAnnotationKey + "']", + }, + }, + }, + { + Name: "VALKEY_NODE_BUS_PORT", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.annotations['" + ExternalAccessBusPortAnnotationKey + "']", + }, + }, + }, { Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{ @@ -57,6 +79,10 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con Name: "VALKEY_BASE_NODE_PORT", Value: "30000", }, + { + Name: "VALKEY_REPLICA_COUNT", + Value: strconv.Itoa(int(cluster.Spec.Replicas)), + }, }, Ports: []corev1.ContainerPort{ { @@ -141,24 +167,25 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con return containers } -func createClusterStatefulSet(cluster *valkeyiov1alpha1.ValkeyCluster, replicas int32) *appsv1.StatefulSet { +func createClusterDeployment(cluster *valkeyiov1alpha1.ValkeyCluster, replicas int32, name string) *appsv1.Deployment { containers := generateContainersDef(cluster) + labelsWithTarget := copyMap(labels(cluster)) + labelsWithTarget[ExternalAccessTargetKey] = name - return &appsv1.StatefulSet{ + return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: cluster.Name, + Name: name, Namespace: cluster.Namespace, Labels: labels(cluster), }, - Spec: appsv1.StatefulSetSpec{ - ServiceName: cluster.Name, - Replicas: func(i int32) *int32 { return &i }(replicas), + Spec: appsv1.DeploymentSpec{ + Replicas: func(i int32) *int32 { return &i }(replicas), Selector: &metav1.LabelSelector{ - MatchLabels: labels(cluster), + MatchLabels: labelsWithTarget, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: labels(cluster), + Labels: labelsWithTarget, }, Spec: corev1.PodSpec{ Containers: containers, @@ -187,6 +214,12 @@ func createClusterStatefulSet(cluster *valkeyiov1alpha1.ValkeyCluster, replicas }, }, }, + { + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, }, }, }, diff --git a/internal/controller/deployment_test.go b/internal/controller/deployment_test.go index 6f34cb3..df44633 100644 --- a/internal/controller/deployment_test.go +++ b/internal/controller/deployment_test.go @@ -22,8 +22,6 @@ import ( "path/filepath" "testing" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -31,7 +29,7 @@ import ( valkeyv1 "valkey.io/valkey-operator/api/v1alpha1" ) -func TestCreateClusterStatefulSet(t *testing.T) { +func TestCreateClusterDeployment(t *testing.T) { cluster := &valkeyv1.ValkeyCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "mycluster", @@ -40,72 +38,24 @@ func TestCreateClusterStatefulSet(t *testing.T) { Image: "container:version", }, } - s := createClusterStatefulSet(cluster, 3) - if s.Name != "mycluster" { - t.Errorf("Expected %v, got %v", "mycluster", s.Name) + d := createClusterDeployment(cluster, 2, "mycluster-0-0") + if d.Name != "mycluster-0-0" { + 
t.Errorf("Expected %v, got %v", "mycluster-0-0", d.Name) } - if s.Spec.ServiceName != "mycluster" { - t.Errorf("Expected %v, got %v", "mycluster", s.Spec.ServiceName) + if *d.Spec.Replicas != 2 { + t.Errorf("Expected %v, got %v", 2, d.Spec.Replicas) } - if *s.Spec.Replicas != 3 { - t.Errorf("Expected %v, got %v", 3, s.Spec.Replicas) + if len(d.Spec.Template.Spec.Containers) != 1 { + t.Errorf("Expected %v, got %v", 1, len(d.Spec.Template.Spec.Containers)) } - if len(s.Spec.Template.Spec.Containers) != 1 { - t.Errorf("Expected %v, got %v", 1, len(s.Spec.Template.Spec.Containers)) + if d.Spec.Template.Spec.Containers[0].Image != "container:version" { + t.Errorf("Expected %v, got %v", "container:version", d.Spec.Template.Spec.Containers[0].Image) } - if s.Spec.Template.Spec.Containers[0].Image != "container:version" { - t.Errorf("Expected %v, got %v", "container:version", s.Spec.Template.Spec.Containers[0].Image) + if d.Spec.Template.Labels[ExternalAccessTargetKey] != "mycluster-0-0" { + t.Errorf("Expected %v, got %v", "mycluster-0-0", d.Spec.Template.Labels[ExternalAccessTargetKey]) } } -func TestCreateClusterStatefulSet_SetsPodAntiAffinity(t *testing.T) { - antiAffinity := &corev1.Affinity{ - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app.kubernetes.io/instance": "mycluster", - }, - }, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - } - cluster := &valkeyv1.ValkeyCluster{ - ObjectMeta: metav1.ObjectMeta{Name: "mycluster"}, - Spec: valkeyv1.ValkeyClusterSpec{ - Image: "container:version", - Affinity: antiAffinity, - }, - } - - s := createClusterStatefulSet(cluster, 1) - - got := s.Spec.Template.Spec.Affinity - if diff := cmp.Diff(antiAffinity, got, cmpopts.EquateEmpty()); diff != "" { - t.Fatalf("affinity mismatch (-want +got):\n%s", diff) - } -} - -func TestCreateClusterDeployment_SetsNodeSelector(t *testing.T) { - nodeSelector := map[string]string{ - "disktype": "ssd", - } - cluster := &valkeyv1.ValkeyCluster{ - ObjectMeta: metav1.ObjectMeta{Name: "mycluster"}, - Spec: valkeyv1.ValkeyClusterSpec{ - Image: "container:version", - NodeSelector: nodeSelector, - }, - } - - d := createClusterDeployment(cluster) - - assert.Equal(t, nodeSelector, d.Spec.Template.Spec.NodeSelector, "node selector should match spec") -} - func TestGenerateContainersDef(t *testing.T) { t.Run("should return only valkey-server when exporter is disabled", func(t *testing.T) { cluster := &valkeyv1.ValkeyCluster{ diff --git a/internal/controller/replicas.go b/internal/controller/replicas.go new file mode 100644 index 0000000..2cd19df --- /dev/null +++ b/internal/controller/replicas.go @@ -0,0 +1,153 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "context" + "slices" + "sort" + "strings" + + corev1 "k8s.io/api/core/v1" + logf "sigs.k8s.io/controller-runtime/pkg/log" + valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" + "valkey.io/valkey-operator/internal/valkey" +) + +func (r *ValkeyClusterReconciler) attachReplica(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState) (bool, error) { + if state == nil { + return false, nil + } + + replicasRequired := int(cluster.Spec.Replicas) + if replicasRequired == 0 { + return false, nil + } + + needsReplica := []*valkey.NodeState{} + emptyMasters := []*valkey.NodeState{} + for _, shard := range state.Shards { + primary := shard.GetPrimaryNode() + if primary == nil { + continue + } + if countSlots(shard.Slots) == 0 { + emptyMasters = append(emptyMasters, primary) + continue + } + // Ask the primary itself how many replicas it sees. + // This is the most authoritative view because CLUSTER REPLICATE + // notifies the primary directly, so it converges fastest. + replicaCount := countReplicasForPrimary(primary.ClusterNodes, primary.Id) + if replicaCount < replicasRequired { + needsReplica = append(needsReplica, primary) + } + } + + if len(needsReplica) == 0 { + return false, nil + } + + pendingCandidates := []*valkey.NodeState{} + for _, node := range state.PendingNodes { + if node.IsPrimary() && len(node.GetSlots()) == 0 { + pendingCandidates = append(pendingCandidates, node) + } + } + + if len(emptyMasters) == 0 && len(pendingCandidates) == 0 { + return false, nil + } + + sort.Slice(needsReplica, func(i, j int) bool { + return needsReplica[i].Address < needsReplica[j].Address + }) + sort.Slice(emptyMasters, func(i, j int) bool { + return emptyMasters[i].Address < emptyMasters[j].Address + }) + sort.Slice(pendingCandidates, func(i, j int) bool { + return pendingCandidates[i].Address < pendingCandidates[j].Address + }) + + log := logf.FromContext(ctx) + candidates := append(emptyMasters, pendingCandidates...) 
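+ // Candidates are tried in order (empty masters first, then brand-new pending nodes); at most one CLUSTER MEET or CLUSTER REPLICATE is issued per call.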
+ for _, candidate := range candidates { + for _, target := range needsReplica { + if candidate.Id == target.Id { + continue + } + knowsPrimary := false + for line := range strings.SplitSeq(candidate.ClusterNodes, "\n") { + fields := strings.Fields(line) + if len(fields) < 2 { + continue + } + if fields[0] == target.Id { + knowsPrimary = true + break + } + } + if !knowsPrimary { + log.V(1).Info("meeting primary before replicate", "primary address", target.Address, "replica address", candidate.Address) + if err := candidate.Client.Do(ctx, candidate.Client.B().ClusterMeet().Ip(target.Address).Port(int64(target.Port)).Build()).Error(); err != nil { + log.Error(err, "command failed: CLUSTER MEET", "from", candidate.Address, "to", target.Address) + r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, "ClusterMeetFailed", "ClusterMeet", "CLUSTER MEET failed: %v", err) + return false, err + } + r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, "ClusterMeet", "ClusterMeet", "Node %v met node %v", candidate.Address, target.Address) + return true, nil + } + log.V(1).Info("add a new replica", "primary address", target.Address, "primary Id", target.Id, "replica address", candidate.Address) + if err := candidate.Client.Do(ctx, candidate.Client.B().ClusterReplicate().NodeId(target.Id).Build()).Error(); err != nil { + log.Error(err, "command failed: CLUSTER REPLICATE", "nodeId", target.Id) + r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, "ReplicaCreationFailed", "CreateReplica", "Failed to create replica: %v", err) + return false, err + } + r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, "ReplicaCreated", "CreateReplica", "Created replica for primary %v", target.Id) + return true, nil + } + } + + return false, nil +} + +// countReplicasForPrimary counts how many non-failed replicas the primary +// sees for itself in its own CLUSTER NODES output. 
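+// A replica line in that output looks roughly like "<node-id> 10.0.0.5:6379@16379 slave <primary-id> 0 1700000000000 3 connected": the third field carries the node flags and the fourth the primary's id.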
+func countReplicasForPrimary(clusterNodes string, primaryId string) int { + count := 0 + for line := range strings.SplitSeq(clusterNodes, "\n") { + fields := strings.Fields(line) + if len(fields) < 4 { + continue + } + flags := strings.Split(fields[2], ",") + if slices.Contains(flags, "slave") && !slices.Contains(flags, "fail") && !slices.Contains(flags, "noaddr") { + if fields[3] == primaryId { + count++ + } + } + } + return count +} + +func countSlots(ranges []valkey.SlotsRange) int { + count := 0 + for _, slot := range ranges { + count += slot.End - slot.Start + 1 + } + return count +} diff --git a/internal/controller/scripts/start-valkey.sh b/internal/controller/scripts/start-valkey.sh index b62e113..0829c32 100644 --- a/internal/controller/scripts/start-valkey.sh +++ b/internal/controller/scripts/start-valkey.sh @@ -5,21 +5,32 @@ announce_file="/data/announce.conf" mkdir -p /data cat /dev/null > "$announce_file" -base_node_port="${VALKEY_BASE_NODE_PORT:-30000}" -if [ -n "$POD_NAME" ] && echo "$POD_NAME" | grep -q -- '-'; then - ordinal="${POD_NAME##*-}" - if echo "$ordinal" | grep -qE '^[0-9]+$'; then - VALKEY_NODE_PORT="$((base_node_port + ordinal * 2))" - VALKEY_NODE_BUS_PORT="$((base_node_port + ordinal * 2 + 1))" +if [ "$VALKEY_ENABLE_EXTERNAL_ANNOUNCE" = "true" ]; then + if [ -z "$VALKEY_NODE_PORT" ] || [ -z "$VALKEY_NODE_BUS_PORT" ]; then + base_node_port="${VALKEY_BASE_NODE_PORT:-30000}" + replica_count="${VALKEY_REPLICA_COUNT:-0}" + if [ -n "$POD_NAME" ]; then + trimmed="${POD_NAME%-0}" + if [ "$trimmed" != "$POD_NAME" ]; then + replica="${trimmed##*-}" + tmp="${trimmed%-*}" + shard="${tmp##*-}" + if echo "$replica" | grep -qE '^[0-9]+$' && echo "$shard" | grep -qE '^[0-9]+$'; then + ordinal="$((shard * (replica_count + 1) + replica))" + VALKEY_NODE_PORT="$((base_node_port + ordinal * 2))" + VALKEY_NODE_BUS_PORT="$((base_node_port + ordinal * 2 + 1))" + fi + fi + fi fi -fi -if [ -n "$VALKEY_NODE_IP" ] && [ -n "$VALKEY_NODE_PORT" ] && [ -n "$VALKEY_NODE_BUS_PORT" ]; then - cat <<EOF > "$announce_file" + if [ -n "$VALKEY_NODE_IP" ] && [ -n "$VALKEY_NODE_PORT" ] && [ -n "$VALKEY_NODE_BUS_PORT" ]; then + cat <<EOF > "$announce_file" cluster-announce-ip $VALKEY_NODE_IP cluster-announce-port $VALKEY_NODE_PORT cluster-announce-bus-port $VALKEY_NODE_BUS_PORT EOF + fi fi exec valkey-server /config/valkey.conf diff --git a/internal/controller/statefulset.go b/internal/controller/statefulset.go new file mode 100644 index 0000000..6f9054e --- /dev/null +++ b/internal/controller/statefulset.go @@ -0,0 +1,123 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package controller + +import ( + "errors" + "strconv" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" +) + +func createClusterStatefulSet(cluster *valkeyiov1alpha1.ValkeyCluster, replicas int32, name string) *appsv1.StatefulSet { + containers := generateContainersDef(cluster) + + labelsWithIdentity := labels(cluster) + if shardIndex, replicaIndex, err := parseShardReplicaFromStatefulSetName(name); err == nil { + labelsWithIdentity = copyMap(labelsWithIdentity) + labelsWithIdentity[ShardIndexLabelKey] = strconv.Itoa(shardIndex) + labelsWithIdentity[ReplicaIndexLabelKey] = strconv.Itoa(replicaIndex) + } + + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: cluster.Namespace, + Labels: labels(cluster), + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: cluster.Name, + Replicas: func(i int32) *int32 { return &i }(replicas), + Selector: &metav1.LabelSelector{ + MatchLabels: labels(cluster), + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labelsWithIdentity, + }, + Spec: corev1.PodSpec{ + Containers: containers, + Affinity: cluster.Spec.Affinity, + NodeSelector: cluster.Spec.NodeSelector, + Tolerations: cluster.Spec.Tolerations, + Volumes: []corev1.Volume{ + { + Name: "scripts", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + DefaultMode: func(i int32) *int32 { return &i }(0755), + }, + }, + }, + { + Name: "valkey-conf", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + }, + }, + }, + { + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + }, + } +} + +func parseShardReplicaFromStatefulSetName(name string) (int, int, error) { + lastDash := strings.LastIndex(name, "-") + if lastDash == -1 || lastDash == len(name)-1 { + return 0, 0, errors.New("statefulset name missing replica suffix") + } + replica, err := strconv.Atoi(name[lastDash+1:]) + if err != nil { + return 0, 0, err + } + rest := name[:lastDash] + secondDash := strings.LastIndex(rest, "-") + if secondDash == -1 || secondDash == len(rest)-1 { + return 0, 0, errors.New("statefulset name missing shard suffix") + } + shard, err := strconv.Atoi(rest[secondDash+1:]) + if err != nil { + return 0, 0, err + } + return shard, replica, nil +} + +func copyMap(src map[string]string) map[string]string { + dst := map[string]string{} + for key, value := range src { + dst[key] = value + } + return dst +} diff --git a/internal/controller/statefulset_test.go b/internal/controller/statefulset_test.go new file mode 100644 index 0000000..bf6ca80 --- /dev/null +++ b/internal/controller/statefulset_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + valkeyv1 "valkey.io/valkey-operator/api/v1alpha1" +) + +func TestCreateClusterStatefulSet(t *testing.T) { + cluster := &valkeyv1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mycluster", + }, + Spec: valkeyv1.ValkeyClusterSpec{ + Image: "container:version", + }, + } + s := createClusterStatefulSet(cluster, 3, "mycluster-0-0") + if s.Name != "mycluster-0-0" { + t.Errorf("Expected %v, got %v", "mycluster-0-0", s.Name) + } + if s.Spec.ServiceName != "mycluster" { + t.Errorf("Expected %v, got %v", "mycluster", s.Spec.ServiceName) + } + if *s.Spec.Replicas != 3 { + t.Errorf("Expected %v, got %v", 3, s.Spec.Replicas) + } + if len(s.Spec.Template.Spec.Containers) != 1 { + t.Errorf("Expected %v, got %v", 1, len(s.Spec.Template.Spec.Containers)) + } + if s.Spec.Template.Spec.Containers[0].Image != "container:version" { + t.Errorf("Expected %v, got %v", "container:version", s.Spec.Template.Spec.Containers[0].Image) + } +} + +func TestCreateClusterStatefulSet_SetsPodAntiAffinity(t *testing.T) { + antiAffinity := &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/instance": "mycluster", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + } + cluster := &valkeyv1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "mycluster"}, + Spec: valkeyv1.ValkeyClusterSpec{ + Image: "container:version", + Affinity: antiAffinity, + }, + } + + s := createClusterStatefulSet(cluster, 1, "mycluster-0-0") + + got := s.Spec.Template.Spec.Affinity + if diff := cmp.Diff(antiAffinity, got, cmpopts.EquateEmpty()); diff != "" { + t.Fatalf("affinity mismatch (-want +got):\n%s", diff) + } +} + +func TestCreateClusterStatefulSet_SetsNodeSelector(t *testing.T) { + nodeSelector := map[string]string{ + "disktype": "ssd", + } + cluster := &valkeyv1.ValkeyCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "mycluster"}, + Spec: valkeyv1.ValkeyClusterSpec{ + Image: "container:version", + NodeSelector: nodeSelector, + }, + } + + s := createClusterStatefulSet(cluster, 1, "mycluster-0-0") + + assert.Equal(t, nodeSelector, s.Spec.Template.Spec.NodeSelector, "node selector should match spec") +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 86df21a..cf17deb 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -20,6 +20,8 @@ import ( "context" "embed" "errors" + "fmt" + "reflect" "slices" "sort" "strconv" @@ -33,6 +35,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/events" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -42,13 +45,16 @@ import ( ) const ( - DefaultPort = 6379 - DefaultClusterBusPort = 16379 - DefaultImage = "valkey/valkey:9.0.0" - DefaultExporterImage = "oliver006/redis_exporter:v1.80.0" - DefaultExporterPort = 9121 - 
ExternalAccessLabelKey = "valkey.io/external-access" - ExternalAccessLabelValue = "nodeport" + DefaultPort = 6379 + DefaultClusterBusPort = 16379 + DefaultImage = "valkey/valkey:9.0.0" + DefaultExporterImage = "oliver006/redis_exporter:v1.80.0" + DefaultExporterPort = 9121 + ExternalAccessLabelKey = "valkey.io/external-access" + ExternalAccessLabelValue = "nodeport" + ExternalAccessTargetKey = "valkey.io/external-access-target" + ExternalAccessNodePortAnnotationKey = "valkey.io/external-node-port" + ExternalAccessBusPortAnnotationKey = "valkey.io/external-bus-port" ShardIndexLabelKey = "valkey.io/shard-index" ReplicaIndexLabelKey = "valkey.io/replica-index" @@ -104,10 +110,28 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - if err := r.upsertStatefulSet(ctx, cluster); err != nil { - setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) - _ = r.updateStatus(ctx, cluster, nil) - return ctrl.Result{}, err + if cluster.Spec.UseStatefulSetPerPod { + if err := r.upsertStatefulSets(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + if err := r.deleteDeployments(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + } else { + if err := r.upsertDeployments(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + if err := r.deleteStatefulSets(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } } if cluster.Spec.AllowExternalAccess { @@ -302,9 +326,122 @@ include /data/announce.conf`, return nil } +// Create Valkey instances with one Deployment per pod. 
+func (r *ValkeyClusterReconciler) upsertDeployments(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + + existing := &appsv1.DeploymentList{} + if err := r.List(ctx, existing, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Deployments") + return err + } + + replicaSpan := int(cluster.Spec.Replicas) + 1 + expectedNames := map[string]struct{}{} + + for shard := int32(0); shard < cluster.Spec.Shards; shard++ { + for replica := 0; replica < replicaSpan; replica++ { + name := fmt.Sprintf("%s-%d-%d", cluster.Name, shard, replica) + expectedNames[name] = struct{}{} + } + } + + existingByName := map[string]*appsv1.Deployment{} + for i := range existing.Items { + deploy := &existing.Items[i] + existingByName[deploy.Name] = deploy + } + + for name := range expectedNames { + if deploy, ok := existingByName[name]; ok { + updated := false + if deploy.Spec.Replicas == nil || *deploy.Spec.Replicas != 1 { + deploy.Spec.Replicas = func(i int32) *int32 { return &i }(1) + updated = true + } + desired := createClusterDeployment(cluster, 1, name) + if deploy.Spec.Selector == nil || !reflect.DeepEqual(deploy.Spec.Selector.MatchLabels, desired.Spec.Selector.MatchLabels) { + if err := r.Delete(ctx, deploy); err != nil && !apierrors.IsNotFound(err) { + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeWarning, "DeploymentDeletionFailed", "DeleteDeployment", "Failed to delete Deployment: %v", err) + return err + } + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeNormal, "DeploymentRecreated", "DeleteDeployment", "Recreating Deployment %s with updated selector", deploy.Name) + continue + } + if !reflect.DeepEqual(deploy.Spec.Template.Labels, desired.Spec.Template.Labels) { + deploy.Spec.Template.Labels = desired.Spec.Template.Labels + updated = true + } + if updated { + if err := r.Update(ctx, deploy); err != nil { + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeWarning, "DeploymentUpdateFailed", "UpdateDeployment", "Failed to update Deployment: %v", err) + return err + } + } + continue + } + + deployment := createClusterDeployment(cluster, 1, name) + if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, deployment); err != nil { + r.Recorder.Eventf(cluster, deployment, corev1.EventTypeWarning, "DeploymentCreationFailed", "CreateDeployment", "Failed to create Deployment: %v", err) + return err + } + r.Recorder.Eventf(cluster, deployment, corev1.EventTypeNormal, "DeploymentCreated", "CreateDeployment", "Created Deployment %s", deployment.Name) + } + + for i := range existing.Items { + deploy := &existing.Items[i] + if _, ok := expectedNames[deploy.Name]; ok { + continue + } + if err := r.Delete(ctx, deploy); err != nil && !apierrors.IsNotFound(err) { + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeWarning, "DeploymentDeletionFailed", "DeleteDeployment", "Failed to delete Deployment: %v", err) + return err + } + } + + return nil +} + +func (r *ValkeyClusterReconciler) deleteDeployments(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + existing := &appsv1.DeploymentList{} + if err := r.List(ctx, existing, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Deployments") + return err + } + for i := range existing.Items { + deploy := &existing.Items[i] + if err := 
r.Delete(ctx, deploy); err != nil && !apierrors.IsNotFound(err) { + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeWarning, "DeploymentDeletionFailed", "DeleteDeployment", "Failed to delete Deployment: %v", err) + return err + } + } + return nil +} + +func (r *ValkeyClusterReconciler) deleteStatefulSets(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + existing := &appsv1.StatefulSetList{} + if err := r.List(ctx, existing, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list StatefulSets") + return err + } + for i := range existing.Items { + sts := &existing.Items[i] + if err := r.Delete(ctx, sts); err != nil && !apierrors.IsNotFound(err) { + r.Recorder.Eventf(cluster, sts, corev1.EventTypeWarning, "StatefulSetDeletionFailed", "DeleteStatefulSet", "Failed to delete StatefulSet: %v", err) + return err + } + } + return nil +} -// Create Valkey instances with a single StatefulSet -func (r *ValkeyClusterReconciler) upsertStatefulSet(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { +// Create Valkey instances with one StatefulSet per pod. +func (r *ValkeyClusterReconciler) upsertStatefulSets(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { log := logf.FromContext(ctx) existing := &appsv1.StatefulSetList{} @@ -313,31 +450,61 @@ func (r *ValkeyClusterReconciler) upsertStatefulSet(ctx context.Context, cluster return err } - expected := int(cluster.Spec.Shards * (1 + cluster.Spec.Replicas)) + replicaSpan := int(cluster.Spec.Replicas) + 1 + expectedNames := map[string]struct{}{} + + for shard := int32(0); shard < cluster.Spec.Shards; shard++ { + for replica := 0; replica < replicaSpan; replica++ { + name := fmt.Sprintf("%s-%d-%d", cluster.Name, shard, replica) + expectedNames[name] = struct{}{} + } + } + + existingByName := map[string]*appsv1.StatefulSet{} + for i := range existing.Items { + sts := &existing.Items[i] + existingByName[sts.Name] = sts + } + + for name := range expectedNames { + if sts, ok := existingByName[name]; ok { + updated := false + if sts.Spec.Replicas == nil || *sts.Spec.Replicas != 1 { + sts.Spec.Replicas = func(i int32) *int32 { return &i }(1) + updated = true + } + desired := createClusterStatefulSet(cluster, 1, name) + if !reflect.DeepEqual(sts.Spec.Template.Labels, desired.Spec.Template.Labels) { + sts.Spec.Template.Labels = desired.Spec.Template.Labels + updated = true + } + if updated { + if err := r.Update(ctx, sts); err != nil { + r.Recorder.Eventf(cluster, sts, corev1.EventTypeWarning, "StatefulSetUpdateFailed", "UpdateStatefulSet", "Failed to update StatefulSet: %v", err) + return err + } + } + continue + } - if len(existing.Items) == 0 { - statefulSet := createClusterStatefulSet(cluster, int32(expected)) + statefulSet := createClusterStatefulSet(cluster, 1, name) if err := controllerutil.SetControllerReference(cluster, statefulSet, r.Scheme); err != nil { return err } if err := r.Create(ctx, statefulSet); err != nil { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "StatefulSetCreationFailed", "Failed to create StatefulSet: %v", err) + r.Recorder.Eventf(cluster, statefulSet, corev1.EventTypeWarning, "StatefulSetCreationFailed", "CreateStatefulSet", "Failed to create StatefulSet: %v", err) return err } - r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "StatefulSetCreated", "Created StatefulSet with %d replicas", expected) - return nil + r.Recorder.Eventf(cluster, statefulSet, 
corev1.EventTypeNormal, "StatefulSetCreated", "CreateStatefulSet", "Created StatefulSet %s", statefulSet.Name) } - // Update existing StatefulSet replica count if needed - statefulSet := &existing.Items[0] - updated := false - if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas != int32(expected) { - statefulSet.Spec.Replicas = func(i int32) *int32 { return &i }(int32(expected)) - updated = true - } - if updated { - if err := r.Update(ctx, statefulSet); err != nil { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "StatefulSetUpdateFailed", "Failed to update StatefulSet: %v", err) + for i := range existing.Items { + sts := &existing.Items[i] + if _, ok := expectedNames[sts.Name]; ok { + continue + } + if err := r.Delete(ctx, sts); err != nil && !apierrors.IsNotFound(err) { + r.Recorder.Eventf(cluster, sts, corev1.EventTypeWarning, "StatefulSetDeletionFailed", "DeleteStatefulSet", "Failed to delete StatefulSet: %v", err) return err } } @@ -349,12 +516,6 @@ func (r *ValkeyClusterReconciler) upsertStatefulSet(ctx context.Context, cluster func (r *ValkeyClusterReconciler) upsertNodePortServices(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { log := logf.FromContext(ctx) - pods := &corev1.PodList{} - if err := r.List(ctx, pods, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { - log.Error(err, "failed to list Pods") - return err - } - services := &corev1.ServiceList{} if err := r.List(ctx, services); err != nil { log.Error(err, "failed to list Services") @@ -369,40 +530,128 @@ func (r *ValkeyClusterReconciler) upsertNodePortServices(ctx context.Context, cl } } - for i := range pods.Items { - pod := &pods.Items[i] - svcName := pod.Name + "-nodeport" - var nodePort int32 - var busNodePort int32 - existingSvc, hasExistingService := externalServices[svcName] - - ordinal, err := parsePodOrdinal(pod.Name) - if err != nil { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceCreationFailed", "Failed to parse pod ordinal: %v", err) + if cluster.Spec.UseStatefulSetPerPod { + pods := &corev1.PodList{} + if err := r.List(ctx, pods, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Pods") return err } - base := int32(30000) - nodePort = base + int32(ordinal*2) - busNodePort = base + int32(ordinal*2+1) + + for i := range pods.Items { + pod := &pods.Items[i] + svcName := pod.Name + "-nodeport" + var nodePort int32 + var busNodePort int32 + existingSvc, hasExistingService := externalServices[svcName] + + shardIndex, replicaIndex, err := parsePodShardReplica(pod.Name) + if err != nil { + r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, "ServiceCreationFailed", "ParsePodName", "Failed to parse pod shard/replica: %v", err) + return err + } + ordinal := shardIndex*(int(cluster.Spec.Replicas)+1) + replicaIndex + base := int32(30000) + nodePort = base + int32(ordinal*2) + busNodePort = base + int32(ordinal*2+1) + + desiredPorts := []corev1.ServicePort{ + { + Name: "valkey", + Port: DefaultPort, + TargetPort: intstr.FromInt(DefaultPort), + NodePort: nodePort, + }, + { + Name: "cluster-bus", + Port: DefaultClusterBusPort, + TargetPort: intstr.FromInt(DefaultClusterBusPort), + NodePort: busNodePort, + }, + } + + if hasExistingService { + existingSvc.Spec.Type = corev1.ServiceTypeNodePort + existingSvc.Spec.Selector = map[string]string{"statefulset.kubernetes.io/pod-name": pod.Name} + existingSvc.Spec.Ports = desiredPorts + if 
existingSvc.Labels == nil { + existingSvc.Labels = map[string]string{} + } + for key, value := range labels(cluster) { + existingSvc.Labels[key] = value + } + existingSvc.Labels[ExternalAccessLabelKey] = ExternalAccessLabelValue + if err := controllerutil.SetControllerReference(cluster, existingSvc, r.Scheme); err != nil { + return err + } + if err := r.Update(ctx, existingSvc); err != nil { + r.Recorder.Eventf(cluster, existingSvc, corev1.EventTypeWarning, "ServiceUpdateFailed", "UpdateService", "Failed to update external Service: %v", err) + return err + } + } else { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: svcName, + Namespace: cluster.Namespace, + Labels: labels(cluster), + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeNodePort, + Selector: map[string]string{"statefulset.kubernetes.io/pod-name": pod.Name}, + Ports: desiredPorts, + }, + } + svc.Labels[ExternalAccessLabelKey] = ExternalAccessLabelValue + if err := controllerutil.SetControllerReference(cluster, svc, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, svc); err != nil { + if !apierrors.IsAlreadyExists(err) { + r.Recorder.Eventf(cluster, svc, corev1.EventTypeWarning, "ServiceCreationFailed", "CreateService", "Failed to create external Service: %v", err) + return err + } + } else { + r.Recorder.Eventf(cluster, svc, corev1.EventTypeNormal, "ServiceCreated", "CreateService", "Created NodePort Service %s", svcName) + } + } + } + + return nil + } + + deployments := &appsv1.DeploymentList{} + if err := r.List(ctx, deployments, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Deployments") + return err + } + + for i := range deployments.Items { + deploy := &deployments.Items[i] + svcName := deploy.Name + "-nodeport" + existingSvc, hasExistingService := externalServices[svcName] desiredPorts := []corev1.ServicePort{ { Name: "valkey", Port: DefaultPort, TargetPort: intstr.FromInt(DefaultPort), - NodePort: nodePort, }, { Name: "cluster-bus", Port: DefaultClusterBusPort, TargetPort: intstr.FromInt(DefaultClusterBusPort), - NodePort: busNodePort, }, } if hasExistingService { + for i := range desiredPorts { + for _, port := range existingSvc.Spec.Ports { + if desiredPorts[i].Name == port.Name && port.NodePort != 0 { + desiredPorts[i].NodePort = port.NodePort + } + } + } existingSvc.Spec.Type = corev1.ServiceTypeNodePort - existingSvc.Spec.Selector = map[string]string{"statefulset.kubernetes.io/pod-name": pod.Name} + existingSvc.Spec.Selector = map[string]string{ExternalAccessTargetKey: deploy.Name} existingSvc.Spec.Ports = desiredPorts if existingSvc.Labels == nil { existingSvc.Labels = map[string]string{} @@ -415,17 +664,9 @@ func (r *ValkeyClusterReconciler) upsertNodePortServices(ctx context.Context, cl return err } if err := r.Update(ctx, existingSvc); err != nil { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceUpdateFailed", "Failed to update external Service: %v", err) + r.Recorder.Eventf(cluster, existingSvc, corev1.EventTypeWarning, "ServiceUpdateFailed", "UpdateService", "Failed to update external Service: %v", err) return err } - for _, port := range existingSvc.Spec.Ports { - switch port.Name { - case "valkey": - nodePort = port.NodePort - case "cluster-bus": - busNodePort = port.NodePort - } - } } else { svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -435,7 +676,7 @@ func (r *ValkeyClusterReconciler) upsertNodePortServices(ctx context.Context, cl }, Spec: 
corev1.ServiceSpec{ Type: corev1.ServiceTypeNodePort, - Selector: map[string]string{"statefulset.kubernetes.io/pod-name": pod.Name}, + Selector: map[string]string{ExternalAccessTargetKey: deploy.Name}, Ports: desiredPorts, }, } @@ -445,15 +686,52 @@ func (r *ValkeyClusterReconciler) upsertNodePortServices(ctx context.Context, cl } if err := r.Create(ctx, svc); err != nil { if !apierrors.IsAlreadyExists(err) { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceCreationFailed", "Failed to create external Service: %v", err) + r.Recorder.Eventf(cluster, svc, corev1.EventTypeWarning, "ServiceCreationFailed", "CreateService", "Failed to create external Service: %v", err) return err } } else { - r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "ServiceCreated", "Created NodePort Service %s", svcName) + r.Recorder.Eventf(cluster, svc, corev1.EventTypeNormal, "ServiceCreated", "CreateService", "Created NodePort Service %s", svcName) } } + if !hasExistingService { + continue + } + + var nodePort int32 + var busNodePort int32 + for _, port := range existingSvc.Spec.Ports { + switch port.Name { + case "valkey": + nodePort = port.NodePort + case "cluster-bus": + busNodePort = port.NodePort + } + } + if nodePort == 0 || busNodePort == 0 { + continue + } + + if deploy.Spec.Template.Annotations == nil { + deploy.Spec.Template.Annotations = map[string]string{} + } + updated := false + if deploy.Spec.Template.Annotations[ExternalAccessNodePortAnnotationKey] != strconv.Itoa(int(nodePort)) { + deploy.Spec.Template.Annotations[ExternalAccessNodePortAnnotationKey] = strconv.Itoa(int(nodePort)) + updated = true + } + if deploy.Spec.Template.Annotations[ExternalAccessBusPortAnnotationKey] != strconv.Itoa(int(busNodePort)) { + deploy.Spec.Template.Annotations[ExternalAccessBusPortAnnotationKey] = strconv.Itoa(int(busNodePort)) + updated = true + } + if updated { + if err := r.Update(ctx, deploy); err != nil { + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeWarning, "DeploymentUpdateFailed", "UpdateDeployment", "Failed to update Deployment annotations: %v", err) + return err + } + } } + return nil } @@ -470,23 +748,64 @@ func (r *ValkeyClusterReconciler) deleteNodePortServices(ctx context.Context, cl continue } if err := r.Delete(ctx, service); err != nil && !apierrors.IsNotFound(err) { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "ServiceDeletionFailed", "Failed to delete external Service: %v", err) + r.Recorder.Eventf(cluster, service, corev1.EventTypeWarning, "ServiceDeletionFailed", "DeleteService", "Failed to delete external Service: %v", err) return err } } + if !cluster.Spec.UseStatefulSetPerPod { + deployments := &appsv1.DeploymentList{} + if err := r.List(ctx, deployments, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list Deployments") + return err + } + for i := range deployments.Items { + deploy := &deployments.Items[i] + if deploy.Spec.Template.Annotations == nil { + continue + } + updated := false + if _, ok := deploy.Spec.Template.Annotations[ExternalAccessNodePortAnnotationKey]; ok { + delete(deploy.Spec.Template.Annotations, ExternalAccessNodePortAnnotationKey) + updated = true + } + if _, ok := deploy.Spec.Template.Annotations[ExternalAccessBusPortAnnotationKey]; ok { + delete(deploy.Spec.Template.Annotations, ExternalAccessBusPortAnnotationKey) + updated = true + } + if updated { + if err := r.Update(ctx, deploy); err != nil { + r.Recorder.Eventf(cluster, deploy, corev1.EventTypeWarning, 
"DeploymentUpdateFailed", "UpdateDeployment", "Failed to clear Deployment annotations: %v", err) + return err + } + } + } + } return nil } -func parsePodOrdinal(podName string) (int, error) { - lastDash := strings.LastIndex(podName, "-") - if lastDash == -1 || lastDash == len(podName)-1 { - return 0, errors.New("pod name missing ordinal suffix") +func parsePodShardReplica(podName string) (int, int, error) { + trimmed := strings.TrimSuffix(podName, "-0") + if trimmed == podName { + return 0, 0, errors.New("pod name missing statefulset ordinal suffix") + } + lastDash := strings.LastIndex(trimmed, "-") + if lastDash == -1 || lastDash == len(trimmed)-1 { + return 0, 0, errors.New("pod name missing replica suffix") + } + replica, err := strconv.Atoi(trimmed[lastDash+1:]) + if err != nil { + return 0, 0, err + } + rest := trimmed[:lastDash] + secondDash := strings.LastIndex(rest, "-") + if secondDash == -1 || secondDash == len(rest)-1 { + return 0, 0, errors.New("pod name missing shard suffix") } - ordinal, err := strconv.Atoi(podName[lastDash+1:]) + shard, err := strconv.Atoi(rest[secondDash+1:]) if err != nil { - return 0, err + return 0, 0, err } - return ordinal, nil + return shard, replica, nil } func (r *ValkeyClusterReconciler) syncPodRoleLabels(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster, state *valkey.ClusterState, pods *corev1.PodList) { @@ -567,7 +886,7 @@ func (r *ValkeyClusterReconciler) syncPodRoleLabels(ctx context.Context, cluster } if labelsUpdated { if err := r.Update(ctx, pod); err != nil { - r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "PodUpdateFailed", "Failed to update pod labels: %v", err) + r.Recorder.Eventf(cluster, pod, corev1.EventTypeWarning, "PodUpdateFailed", "UpdatePodLabels", "Failed to update pod labels: %v", err) return } } diff --git a/internal/controller/valkeycluster_controller_test.go b/internal/controller/valkeycluster_controller_test.go index 50470c5..717e32e 100644 --- a/internal/controller/valkeycluster_controller_test.go +++ b/internal/controller/valkeycluster_controller_test.go @@ -293,7 +293,7 @@ var _ = Describe("EventRecorder", func() { Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) defer func() { _ = k8sClient.Delete(ctx, cluster) }() - err := r.upsertStatefulSet(ctx, cluster) + err := r.upsertStatefulSets(ctx, cluster) Expect(err).NotTo(HaveOccurred()) events := collectEvents(fakeRecorder) @@ -365,7 +365,7 @@ var _ = Describe("EventRecorder", func() { defer func() { _ = k8sClient.Delete(ctx, cluster) }() // Trigger StatefulSet creation to verify formatted message - err := r.upsertStatefulSet(ctx, cluster) + err := r.upsertStatefulSets(ctx, cluster) Expect(err).NotTo(HaveOccurred()) events := collectEvents(fakeRecorder)