From c549bf31ee2701367200dbc3620aa84292a08757 Mon Sep 17 00:00:00 2001 From: Umar Riswan A P Date: Mon, 16 Feb 2026 23:52:12 +0530 Subject: [PATCH 1/2] feat: Add persistent volume support for ValkeyCluster - Add StorageSpec to ValkeyCluster API with enabled, size, storageClassName, and accessModes fields - Add VolumePermissions field to enable init container for volume ownership - Implement StatefulSet support for persistent storage with PVC templates - Keep Deployment support for ephemeral storage (emptyDir) - Update valkey.conf with dir /data and AOF persistence settings - Add security contexts (non-root user 1001, drop ALL capabilities) - Add init container to chown /data directory when volumePermissions=true - Update RBAC with StatefulSet and PVC permissions - Add two sample configurations: persistent and ephemeral - Update README with storage features and examples - Tested on EKS with gp3 storage class Signed-off-by: Umar Riswan A P --- README.md | 128 ++++++++++- api/v1alpha1/valkeycluster_types.go | 32 +++ api/v1alpha1/zz_generated.deepcopy.go | 26 +++ .../crd/bases/valkey.io_valkeyclusters.yaml | 31 +++ config/manager/manager.yaml | 2 +- config/rbac/role.yaml | 2 + config/samples/kustomization.yaml | 1 + config/samples/v1alpha1_valkeycluster.yaml | 8 + .../v1alpha1_valkeycluster_ephemeral.yaml | 21 ++ internal/controller/deployment.go | 215 +++++++++++++++--- .../controller/valkeycluster_controller.go | 55 ++++- 11 files changed, 487 insertions(+), 34 deletions(-) create mode 100644 config/samples/v1alpha1_valkeycluster_ephemeral.yaml diff --git a/README.md b/README.md index deb3266..b8d5c09 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,23 @@ A Kubernetes operator for deploying Valkey, Valkey Clusters and managing its lif ## Description -Valkey Operator is a Kubernetes operator that automates the deployment and management of Valkey, a secure and scalable key management solution. The operator simplifies the process of deploying Valkey on Kubernetes clusters, ensuring that it is configured correctly and operates efficiently. It provides features such as automated installation, configuration management, and lifecycle management of Valkey instances. +Valkey Operator is a Kubernetes operator that automates the deployment and management of Valkey clusters with optional persistent storage support. The operator simplifies the process of deploying Valkey on Kubernetes clusters, ensuring that it is configured correctly and operates efficiently. + +### Key Features + +- 🚀 **Automated Cluster Management** - Deploy and manage Valkey clusters with configurable shards and replicas +- 💾 **Persistent Storage Support** - Optional PersistentVolume support for data durability +- 🔒 **Security Hardened** - Non-root containers, security contexts, and volume permissions +- 📊 **Metrics Export** - Built-in Prometheus metrics exporter +- 🔄 **High Availability** - Multi-shard clusters with replica support +- ⚙️ **Flexible Configuration** - Resource limits, tolerations, node selectors, and affinity rules + +### Storage Modes + +The operator supports two storage modes: + +- **Ephemeral Storage (Default)** - Uses emptyDir for temporary storage, suitable for caching workloads +- **Persistent Storage** - Uses PersistentVolumeClaims with configurable size and storage class for production workloads > **⚠️ EARLY DEVELOPMENT NOTICE** > @@ -62,7 +78,115 @@ make deploy IMG=/valkey-operator:tag You can apply the samples (examples) from the config/sample: ```sh -kubectl apply -k config/samples/ +# Deploy with persistent storage (production) +kubectl apply -f config/samples/v1alpha1_valkeycluster.yaml + +# OR deploy with ephemeral storage (dev/test) +kubectl apply -f config/samples/v1alpha1_valkeycluster_ephemeral.yaml +``` + +### Example Configurations + +#### Persistent Storage (Production) + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-prod +spec: + shards: 3 + replicas: 2 + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "1Gi" + cpu: "500m" + storage: + enabled: true + size: "10Gi" + storageClassName: "gp3" # AWS EBS gp3 + accessModes: + - ReadWriteOnce + volumePermissions: true # Recommended for persistent storage +``` + +#### Ephemeral Storage (Dev/Test) + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-dev +spec: + shards: 3 + replicas: 1 + resources: + requests: + memory: "256Mi" + cpu: "100m" + # storage section omitted - uses emptyDir +``` + +### Deploying to AWS EKS + +1. **Configure AWS credentials and kubectl context:** + +```sh +aws eks update-kubeconfig --region us-east-1 --name your-cluster-name +``` + +2. **Build and push the operator image:** + +```sh +# Set your Docker registry +export IMG=/valkey-operator:latest + +# Build for AMD64 (EKS) +make docker-buildx PLATFORMS=linux/amd64 + +# Or build and push separately +docker buildx build --platform linux/amd64 -t ${IMG} --push . +``` + +3. **Install CRDs and deploy the operator:** + +```sh +make install +make deploy IMG=${IMG} +``` + +4. **Create a ValkeyCluster with EBS storage:** + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-eks +spec: + shards: 3 + replicas: 2 + storage: + enabled: true + size: "20Gi" + storageClassName: "gp3" # EBS gp3 storage class + volumePermissions: true +``` + +5. **Verify deployment:** + +```sh +# Check operator pod +kubectl get pods -n valkey-operator-system + +# Check ValkeyCluster status +kubectl get valkeycluster +kubectl describe valkeycluster valkey-eks + +# Check pods and PVCs +kubectl get pods,pvc -l app.kubernetes.io/name=valkey ``` > **NOTE**: Ensure that the samples has default values to test it out. diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index f5dfad5..ddabf03 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -73,6 +73,38 @@ type ValkeyClusterSpec struct { // +kubebuilder:default:={enabled:true} // +optional Exporter ExporterSpec `json:"exporter,omitempty"` + + // PersistentVolume configuration for data persistence + // +optional + Storage StorageSpec `json:"storage,omitempty"` + + // VolumePermissions enables an init container to set proper ownership on the data volume + // Required when using persistent storage with non-root security contexts + // +kubebuilder:default=false + // +optional + VolumePermissions bool `json:"volumePermissions,omitempty"` +} + +type StorageSpec struct { + // Enable persistent storage for Valkey data + // +kubebuilder:default=false + // +optional + Enabled bool `json:"enabled,omitempty"` + + // StorageClassName is the name of the StorageClass to use for PersistentVolumeClaims + // If not specified, the default StorageClass will be used + // +optional + StorageClassName *string `json:"storageClassName,omitempty"` + + // Size of the persistent volume claim + // +kubebuilder:default="1Gi" + // +optional + Size string `json:"size,omitempty"` + + // AccessModes contains the desired access modes for the volume + // +kubebuilder:default={"ReadWriteOnce"} + // +optional + AccessModes []corev1.PersistentVolumeAccessMode `json:"accessModes,omitempty"` } type ExporterSpec struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b239c97..99dbc8c 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -42,6 +42,31 @@ func (in *ExporterSpec) DeepCopy() *ExporterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StorageSpec) DeepCopyInto(out *StorageSpec) { + *out = *in + if in.StorageClassName != nil { + in, out := &in.StorageClassName, &out.StorageClassName + *out = new(string) + **out = **in + } + if in.AccessModes != nil { + in, out := &in.AccessModes, &out.AccessModes + *out = make([]v1.PersistentVolumeAccessMode, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageSpec. +func (in *StorageSpec) DeepCopy() *StorageSpec { + if in == nil { + return nil + } + out := new(StorageSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ValkeyCluster) DeepCopyInto(out *ValkeyCluster) { *out = *in @@ -125,6 +150,7 @@ func (in *ValkeyClusterSpec) DeepCopyInto(out *ValkeyClusterSpec) { (*in).DeepCopyInto(*out) } in.Exporter.DeepCopyInto(&out.Exporter) + in.Storage.DeepCopyInto(&out.Storage) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeyClusterSpec. diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index f8a188c..513d782 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -1124,6 +1124,31 @@ spec: format: int32 minimum: 1 type: integer + storage: + description: PersistentVolume configuration for data persistence + properties: + accessModes: + default: + - ReadWriteOnce + description: AccessModes contains the desired access modes for + the volume + items: + type: string + type: array + enabled: + default: false + description: Enable persistent storage for Valkey data + type: boolean + size: + default: 1Gi + description: Size of the persistent volume claim + type: string + storageClassName: + description: |- + StorageClassName is the name of the StorageClass to use for PersistentVolumeClaims + If not specified, the default StorageClass will be used + type: string + type: object tolerations: description: Tolerations to apply to the pods items: @@ -1164,6 +1189,12 @@ spec: type: string type: object type: array + volumePermissions: + default: false + description: |- + VolumePermissions enables an init container to set proper ownership on the data volume + Required when using persistent storage with non-root security contexts + type: boolean type: object status: default: diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 6ef54ed..e8f6d91 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -63,7 +63,7 @@ spec: args: - --leader-elect - --health-probe-bind-address=:8081 - image: controller:latest + image: umarriswan/valkey-operator:latest name: manager ports: [] securityContext: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 578d6db..be43712 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -8,6 +8,7 @@ rules: - "" resources: - configmaps + - persistentvolumeclaims - services verbs: - create @@ -29,6 +30,7 @@ rules: - apps resources: - deployments + - statefulsets verbs: - create - delete diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index 62b8d67..c62e6e2 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -1,4 +1,5 @@ ## Append samples of your project ## resources: - v1alpha1_valkeycluster.yaml +- v1alpha1_valkeycluster_ephemeral.yaml # +kubebuilder:scaffold:manifestskustomizesamples diff --git a/config/samples/v1alpha1_valkeycluster.yaml b/config/samples/v1alpha1_valkeycluster.yaml index 540a4d1..dcf24eb 100644 --- a/config/samples/v1alpha1_valkeycluster.yaml +++ b/config/samples/v1alpha1_valkeycluster.yaml @@ -12,3 +12,11 @@ spec: limits: memory: "512Mi" cpu: "500m" + storage: + enabled: true + size: "2Gi" + # storageClassName: standard # Uncomment to specify a StorageClass + accessModes: + - ReadWriteOnce + # Enable init container to fix volume permissions (recommended for persistent storage) + volumePermissions: true diff --git a/config/samples/v1alpha1_valkeycluster_ephemeral.yaml b/config/samples/v1alpha1_valkeycluster_ephemeral.yaml new file mode 100644 index 0000000..409b1c1 --- /dev/null +++ b/config/samples/v1alpha1_valkeycluster_ephemeral.yaml @@ -0,0 +1,21 @@ +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkeycluster-ephemeral +spec: + shards: 3 + replicas: 1 + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + # Storage section omitted - uses ephemeral storage (emptyDir) + # This is suitable for: + # - Development/testing environments + # - Cache-only workloads where data loss is acceptable + # - Temporary clusters + # + # Note: Data will be lost when pods are deleted or restarted diff --git a/internal/controller/deployment.go b/internal/controller/deployment.go index 4f87a75..75b97f5 100644 --- a/internal/controller/deployment.go +++ b/internal/controller/deployment.go @@ -19,6 +19,7 @@ package controller import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" ) @@ -30,13 +31,22 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con } containers := []corev1.Container{ { - Name: "valkey-server", - Image: image, - Resources: cluster.Spec.Resources, + Name: "valkey-server", + Image: image, + ImagePullPolicy: corev1.PullIfNotPresent, + Resources: cluster.Spec.Resources, Command: []string{ "valkey-server", "/config/valkey.conf", }, + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: func(b bool) *bool { return &b }(false), + RunAsNonRoot: func(b bool) *bool { return &b }(true), + RunAsUser: func(i int64) *int64 { return &i }(1001), + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + }, Ports: []corev1.ContainerPort{ { Name: "client", @@ -105,6 +115,10 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con MountPath: "/config", ReadOnly: true, }, + { + Name: "data", + MountPath: "/data", + }, }, }, } @@ -118,6 +132,41 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con func createClusterDeployment(cluster *valkeyiov1alpha1.ValkeyCluster) *appsv1.Deployment { containers := generateContainersDef(cluster) + + volumes := []corev1.Volume{ + { + Name: "scripts", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + DefaultMode: func(i int32) *int32 { return &i }(0755), + }, + }, + }, + { + Name: "valkey-conf", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + }, + }, + }, + } + + // Add emptyDir volume for data if persistent storage is not enabled + if !cluster.Spec.Storage.Enabled { + volumes = append(volumes, corev1.Volume{ + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }) + } + deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ GenerateName: cluster.Name + "-", @@ -138,32 +187,146 @@ func createClusterDeployment(cluster *valkeyiov1alpha1.ValkeyCluster) *appsv1.De Affinity: cluster.Spec.Affinity, NodeSelector: cluster.Spec.NodeSelector, Tolerations: cluster.Spec.Tolerations, - Volumes: []corev1.Volume{ - { - Name: "scripts", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: cluster.Name, - }, - DefaultMode: func(i int32) *int32 { return &i }(0755), - }, - }, - }, - { - Name: "valkey-conf", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: cluster.Name, - }, - }, - }, - }, - }, + Volumes: volumes, }, }, }, } return deployment } + +func createClusterStatefulSet(cluster *valkeyiov1alpha1.ValkeyCluster) *appsv1.StatefulSet { + containers := generateContainersDef(cluster) + + // Optional init container to fix volume permissions + initContainers := []corev1.Container{} + if cluster.Spec.VolumePermissions && cluster.Spec.Storage.Enabled { + image := DefaultImage + if cluster.Spec.Image != "" { + image = cluster.Spec.Image + } + initContainers = append(initContainers, corev1.Container{ + Name: "volume-permissions", + Image: image, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "/bin/chown", + "-R", + "1001:1001", + "/data", + }, + SecurityContext: &corev1.SecurityContext{ + RunAsUser: func(i int64) *int64 { return &i }(0), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "data", + MountPath: "/data", + }, + }, + }) + } + + // Define volumeClaimTemplates for persistent storage + volumeClaimTemplates := []corev1.PersistentVolumeClaim{} + if cluster.Spec.Storage.Enabled { + size := "1Gi" + if cluster.Spec.Storage.Size != "" { + size = cluster.Spec.Storage.Size + } + + accessModes := []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce} + if len(cluster.Spec.Storage.AccessModes) > 0 { + accessModes = cluster.Spec.Storage.AccessModes + } + + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "data", + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: accessModes, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(size), + }, + }, + }, + } + + if cluster.Spec.Storage.StorageClassName != nil { + pvc.Spec.StorageClassName = cluster.Spec.Storage.StorageClassName + } + + volumeClaimTemplates = append(volumeClaimTemplates, pvc) + } + + volumes := []corev1.Volume{ + { + Name: "scripts", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + DefaultMode: func(i int32) *int32 { return &i }(0755), + }, + }, + }, + { + Name: "valkey-conf", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + }, + }, + }, + } + + // Add emptyDir volume for data if persistent storage is not enabled + if !cluster.Spec.Storage.Enabled { + volumes = append(volumes, corev1.Volume{ + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }) + } + + statefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: cluster.Name + "-", + Namespace: cluster.Namespace, + Labels: labels(cluster), + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: cluster.Name, + Replicas: func(i int32) *int32 { return &i }(1), + Selector: &metav1.LabelSelector{ + MatchLabels: labels(cluster), + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels(cluster), + }, + Spec: corev1.PodSpec{ + InitContainers: initContainers, + Containers: containers, + Affinity: cluster.Spec.Affinity, + NodeSelector: cluster.Spec.NodeSelector, + Tolerations: cluster.Spec.Tolerations, + Volumes: volumes, + SecurityContext: &corev1.PodSecurityContext{ + FSGroup: func(i int64) *int64 { return &i }(1001), + RunAsUser: func(i int64) *int64 { return &i }(1001), + RunAsGroup: func(i int64) *int64 { return &i }(1001), + }, + }, + }, + VolumeClaimTemplates: volumeClaimTemplates, + }, + } + return statefulSet +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 9ed9338..602a1d3 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -67,6 +67,8 @@ var scripts embed.FS // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -95,10 +97,19 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - if err := r.upsertDeployments(ctx, cluster); err != nil { - setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) - _ = r.updateStatus(ctx, cluster, nil) - return ctrl.Result{}, err + // Use StatefulSets if persistent storage is enabled, otherwise use Deployments + if cluster.Spec.Storage.Enabled { + if err := r.upsertStatefulSets(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + } else { + if err := r.upsertDeployments(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } } // Get all pods and their current Valkey Cluster state @@ -249,7 +260,11 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, cluster * "valkey.conf": ` cluster-enabled yes protected-mode no -cluster-node-timeout 2000`, +cluster-node-timeout 2000 +dir /data +appendonly yes +appendfilename "appendonly.aof" +appendfsync everysec`, }, } if err := controllerutil.SetControllerReference(cluster, cm, r.Scheme); err != nil { @@ -301,6 +316,36 @@ func (r *ValkeyClusterReconciler) upsertDeployments(ctx context.Context, cluster return nil } +// Create Valkey instances using StatefulSets with persistent volumes +func (r *ValkeyClusterReconciler) upsertStatefulSets(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + + existing := &appsv1.StatefulSetList{} + if err := r.List(ctx, existing, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list StatefulSets") + return err + } + + expected := int(cluster.Spec.Shards * (1 + cluster.Spec.Replicas)) + + // Create missing statefulsets + for i := len(existing.Items); i < expected; i++ { + statefulSet := createClusterStatefulSet(cluster) + if err := controllerutil.SetControllerReference(cluster, statefulSet, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, statefulSet); err != nil { + r.Recorder.Eventf(cluster, statefulSet, corev1.EventTypeWarning, "StatefulSetCreationFailed", "CreateStatefulSet", "Failed to create statefulset: %v", err) + return err + } + r.Recorder.Eventf(cluster, statefulSet, corev1.EventTypeNormal, "StatefulSetCreated", "CreateStatefulSet", "Created statefulset %d of %d", i+1, expected) + } + + // TODO: update existing + + return nil +} + func (r *ValkeyClusterReconciler) getValkeyClusterState(ctx context.Context, pods *corev1.PodList) *valkey.ClusterState { // Create a list of addresses to possible Valkey nodes ips := []string{} From 75b6d18c13dc7d5d245882958a5308a3e78762f3 Mon Sep 17 00:00:00 2001 From: Umar Riswan A P Date: Tue, 17 Feb 2026 21:08:01 +0530 Subject: [PATCH 2/2] feat: Add online volume expansion support for persistent storage Implements automatic PVC expansion when storage size is increased in ValkeyCluster spec. Features: - Detects storage size changes and patches PVCs automatically - Validates StorageClass supports allowVolumeExpansion - Monitors expansion progress with status conditions - Zero downtime - cluster remains operational during expansion - Emits events for expansion stages (VolumeExpanding/VolumeExpanded) - Requeues every 10s during expansion to monitor progress Changes: - Added internal/controller/volume_expansion.go with expansion logic - Updated valkeycluster_controller.go to integrate expansion checks - Added ReasonVolumeExpanding and ReasonVolumeExpanded status reasons - Added StorageClass RBAC permissions (storage.k8s.io) - Created comprehensive documentation and example manifests Tested: - Successfully expanded 12 PVCs from 2Gi to 5Gi on EKS cluster - AWS EBS gp3 volumes with allowVolumeExpansion enabled - Verified zero downtime and data integrity Closes: Volume expansion feature request Signed-off-by: Umar Riswan A P --- api/v1alpha1/valkeycluster_types.go | 2 + config/rbac/role.yaml | 8 + ...alpha1_valkeycluster_volume_expansion.yaml | 45 ++++ docs/volume-expansion-example.md | 215 ++++++++++++++++++ .../controller/valkeycluster_controller.go | 21 ++ internal/controller/volume_expansion.go | 208 +++++++++++++++++ 6 files changed, 499 insertions(+) create mode 100644 config/samples/v1alpha1_valkeycluster_volume_expansion.yaml create mode 100644 docs/volume-expansion-example.md create mode 100644 internal/controller/volume_expansion.go diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index ddabf03..c4efca2 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -186,6 +186,8 @@ const ( ReasonSlotsUnassigned = "SlotsUnassigned" ReasonPrimaryLost = "PrimaryLost" ReasonNoSlots = "NoSlotsAvailable" + ReasonVolumeExpanding = "VolumeExpanding" + ReasonVolumeExpanded = "VolumeExpanded" ) // +kubebuilder:object:root=true diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index be43712..2804b76 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -46,6 +46,14 @@ rules: verbs: - create - patch +- apiGroups: + - storage.k8s.io + resources: + - storageclasses + verbs: + - get + - list + - watch - apiGroups: - valkey.io resources: diff --git a/config/samples/v1alpha1_valkeycluster_volume_expansion.yaml b/config/samples/v1alpha1_valkeycluster_volume_expansion.yaml new file mode 100644 index 0000000..b7a0bfc --- /dev/null +++ b/config/samples/v1alpha1_valkeycluster_volume_expansion.yaml @@ -0,0 +1,45 @@ +--- +# Initial cluster with 5Gi storage +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-volume-expansion-demo + namespace: default +spec: + shards: 2 + replicas: 1 + storage: + enabled: true + size: "5Gi" + # Adjust this to your storage class that supports expansion + # Run: kubectl get sc + storageClassName: "standard" + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "200m" +--- +# To expand the storage, apply this updated manifest +# Change the size from 5Gi to 10Gi +# apiVersion: valkey.io/v1alpha1 +# kind: ValkeyCluster +# metadata: +# name: valkey-volume-expansion-demo +# namespace: default +# spec: +# shards: 2 +# replicas: 1 +# storage: +# enabled: true +# size: "10Gi" # <-- Changed from 5Gi to 10Gi +# storageClassName: "standard" +# resources: +# requests: +# memory: "256Mi" +# cpu: "100m" +# limits: +# memory: "512Mi" +# cpu: "200m" diff --git a/docs/volume-expansion-example.md b/docs/volume-expansion-example.md new file mode 100644 index 0000000..6ac2b5b --- /dev/null +++ b/docs/volume-expansion-example.md @@ -0,0 +1,215 @@ +# Volume Expansion Example + +This example demonstrates how to expand the persistent storage for a ValkeyCluster using online volume expansion. + +## Prerequisites + +Before attempting volume expansion, ensure: + +1. **Storage Class supports expansion**: Your StorageClass must have `allowVolumeExpansion: true` + ```bash + kubectl get storageclass -o yaml + ``` + +2. **Backup your data**: Always backup your data before performing volume expansion + +3. **Check underlying storage provider**: Ensure your storage provider (e.g., AWS EBS, GCE PD, Azure Disk) supports online volume expansion + +## Initial Cluster with 5Gi Storage + +Create a ValkeyCluster with 5Gi of storage: + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-expandable + namespace: default +spec: + shards: 3 + replicas: 1 + storage: + enabled: true + size: "5Gi" + storageClassName: "standard" # Use your storage class that supports expansion +``` + +Apply the manifest: +```bash +kubectl apply -f valkey-cluster-5gi.yaml +``` + +Wait for the cluster to be ready: +```bash +kubectl wait --for=condition=Ready valkeyCluster/valkey-expandable --timeout=5m +``` + +## Expanding Storage to 10Gi + +To expand the storage, simply update the `spec.storage.size` field: + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-expandable + namespace: default +spec: + shards: 3 + replicas: 1 + storage: + enabled: true + size: "10Gi" # Changed from 5Gi to 10Gi + storageClassName: "standard" +``` + +Apply the updated manifest: +```bash +kubectl apply -f valkey-cluster-10gi.yaml +``` + +## Monitoring the Expansion + +### Check Operator Events +```bash +kubectl get events --sort-by='.lastTimestamp' | grep VolumeExpan +``` + +You should see events like: +- `VolumeExpanding`: Expansion has been initiated +- `VolumeExpanded`: Expansion completed successfully + +### Check PVC Status +```bash +kubectl get pvc -l valkey.io/cluster=valkey-expandable +``` + +Look for PVCs in `Resizing` state or check conditions: +```bash +kubectl describe pvc +``` + +### Check Cluster Status +```bash +kubectl get valkeyCluster valkey-expandable -o jsonpath='{.status.conditions[?(@.type=="Progressing")]}' +``` + +During expansion, you'll see a condition with reason `VolumeExpanding`. + +### Watch Cluster State +```bash +watch kubectl get valkeyCluster valkey-expandable +``` + +## Important Notes + +### ✅ Supported Operations +- **Expanding volumes**: Increase storage size from smaller to larger values +- **Online expansion**: No downtime required (depends on storage provider) +- **Mixed units**: Can change from `5G` to `5Gi` if the size is equivalent + +### ❌ Unsupported Operations +- **Shrinking volumes**: Reducing storage size is **not supported** by Kubernetes +- **Changing storage class**: Cannot change the StorageClass of existing PVCs + +### Best Practices + +1. **One change at a time**: Don't apply other cluster changes simultaneously with volume expansion + +2. **Consistent size units**: Use consistent size definitions (`Gi` or `G`) + - `1Gi` = 1024³ bytes (binary) + - `1G` = 1000³ bytes (decimal) + +3. **Monitor the process**: Watch cluster events and PVC status during expansion + +4. **Plan capacity**: Expand proactively before running out of space + +5. **Test first**: Test volume expansion in a non-production environment first + +## Troubleshooting + +### Expansion Fails + +If expansion fails, check: + +1. **StorageClass configuration**: + ```bash + kubectl get sc -o yaml | grep allowVolumeExpansion + ``` + Should be `true`. + +2. **Storage provider support**: Verify your cloud provider or storage system supports expansion + +3. **PVC conditions**: + ```bash + kubectl describe pvc + ``` + Look for error messages in the conditions. + +4. **Operator logs**: + ```bash + kubectl logs -n valkey-operator-system deployment/valkey-operator-controller-manager -f + ``` + +### Expansion Stuck + +If expansion appears stuck: + +1. Check if filesystem resize is pending: + ```bash + kubectl get pvc -o jsonpath='{.status.conditions[?(@.type=="FileSystemResizePending")]}' + ``` + +2. Check pod status: The pod may need to be restarted for filesystem expansion: + ```bash + kubectl get pods -l valkey.io/cluster=valkey-expandable + ``` + +3. Check storage provider: Some providers require manual intervention for certain volume types + +## Example: Complete Workflow + +```bash +# 1. Create cluster with 5Gi +cat < 0 { + needsExpansion = true + pvcToExpand = append(pvcToExpand, pvc) + log.Info("PVC needs expansion", "pvc", pvc.Name, "currentSize", (¤tSize).String(), "desiredSize", desiredSize) + } else if desiredQuantity.Cmp(currentSize) < 0 { + // Size reduction is not supported + log.Info("Storage size reduction is not supported", "pvc", pvc.Name, "currentSize", (¤tSize).String(), "requestedSize", desiredSize) + r.Recorder.Eventf(cluster, &pvc, corev1.EventTypeWarning, "VolumeExpansionNotSupported", "VolumeExpansion", + "Cannot reduce storage size from %s to %s for PVC %s - shrinking is not supported", + (¤tSize).String(), desiredSize, pvc.Name) + } + } + + if !needsExpansion { + // All PVCs are already at the desired size + return nil + } + + // Verify that the storage class supports volume expansion + if len(pvcToExpand) > 0 { + storageClassName := pvcToExpand[0].Spec.StorageClassName + if storageClassName != nil && *storageClassName != "" { + if err := r.verifyStorageClassSupportsExpansion(ctx, *storageClassName); err != nil { + log.Error(err, "storage class does not support volume expansion", "storageClass", *storageClassName) + r.Recorder.Eventf(cluster, &pvcToExpand[0], corev1.EventTypeWarning, "VolumeExpansionNotSupported", "VolumeExpansion", + "Storage class %s does not support volume expansion: %v", *storageClassName, err) + return err + } + } + } + + // Perform expansion on all PVCs that need it + expandedCount := 0 + for i := range pvcToExpand { + pvc := &pvcToExpand[i] + currentSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage] + + log.Info("Expanding PVC", "pvc", pvc.Name, "newSize", desiredSize) + r.Recorder.Eventf(cluster, pvc, corev1.EventTypeNormal, "VolumeExpanding", "VolumeExpansion", + "Expanding PVC %s from %s to %s", + pvc.Name, currentSize.String(), desiredSize) + + // Update the PVC with the new size + pvc.Spec.Resources.Requests[corev1.ResourceStorage] = desiredQuantity + if err := r.Update(ctx, pvc); err != nil { + log.Error(err, "failed to update PVC", "pvc", pvc.Name) + r.Recorder.Eventf(cluster, pvc, corev1.EventTypeWarning, "VolumeExpansionFailed", "VolumeExpansion", + "Failed to expand PVC %s: %v", pvc.Name, err) + return err + } + + expandedCount++ + r.Recorder.Eventf(cluster, pvc, corev1.EventTypeNormal, "VolumeExpanded", "VolumeExpansion", + "Successfully initiated expansion of PVC %s to %s", pvc.Name, desiredSize) + } + + if expandedCount > 0 { + log.Info("Volume expansion initiated", "pvcCount", expandedCount, "newSize", desiredSize) + setCondition(cluster, valkeyiov1alpha1.ConditionProgressing, valkeyiov1alpha1.ReasonVolumeExpanding, + fmt.Sprintf("Expanding %d volumes to %s", expandedCount, desiredSize), metav1.ConditionTrue) + } + + return nil +} + +// verifyStorageClassSupportsExpansion checks if the storage class supports volume expansion +func (r *ValkeyClusterReconciler) verifyStorageClassSupportsExpansion(ctx context.Context, storageClassName string) error { + log := logf.FromContext(ctx) + + storageClass := &storagev1.StorageClass{} + if err := r.Get(ctx, client.ObjectKey{Name: storageClassName}, storageClass); err != nil { + return fmt.Errorf("failed to get storage class: %w", err) + } + + if storageClass.AllowVolumeExpansion == nil || !*storageClass.AllowVolumeExpansion { + log.Info("Storage class does not support volume expansion", "storageClass", storageClassName) + return fmt.Errorf("storage class %s does not have allowVolumeExpansion enabled", storageClassName) + } + + log.V(1).Info("Storage class supports volume expansion", "storageClass", storageClassName) + return nil +} + +// checkVolumeExpansionStatus monitors the status of ongoing volume expansions +func (r *ValkeyClusterReconciler) checkVolumeExpansionStatus(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) (bool, error) { + log := logf.FromContext(ctx) + + // Only proceed if storage is enabled + if !cluster.Spec.Storage.Enabled { + return true, nil + } + + // Get all PVCs for this cluster + pvcList := &corev1.PersistentVolumeClaimList{} + if err := r.List(ctx, pvcList, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list PVCs") + return false, err + } + + allCompleted := true + expandingCount := 0 + + for _, pvc := range pvcList.Items { + // Check if PVC is being resized + for _, condition := range pvc.Status.Conditions { + if condition.Type == corev1.PersistentVolumeClaimResizing { + if condition.Status == corev1.ConditionTrue { + allCompleted = false + expandingCount++ + log.V(1).Info("PVC expansion in progress", "pvc", pvc.Name) + } + } + if condition.Type == corev1.PersistentVolumeClaimFileSystemResizePending { + if condition.Status == corev1.ConditionTrue { + allCompleted = false + expandingCount++ + log.V(1).Info("PVC filesystem resize pending", "pvc", pvc.Name) + } + } + } + + // Compare spec vs status to detect pending expansions + specSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage] + statusSize := pvc.Status.Capacity[corev1.ResourceStorage] + if specSize.Cmp(statusSize) > 0 { + allCompleted = false + expandingCount++ + } + } + + if expandingCount > 0 { + log.Info("Volume expansions still in progress", "count", expandingCount) + } + + return allCompleted, nil +}