diff --git a/README.md b/README.md index deb3266..b8d5c09 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,23 @@ A Kubernetes operator for deploying Valkey, Valkey Clusters and managing its lif ## Description -Valkey Operator is a Kubernetes operator that automates the deployment and management of Valkey, a secure and scalable key management solution. The operator simplifies the process of deploying Valkey on Kubernetes clusters, ensuring that it is configured correctly and operates efficiently. It provides features such as automated installation, configuration management, and lifecycle management of Valkey instances. +Valkey Operator is a Kubernetes operator that automates the deployment and management of Valkey clusters with optional persistent storage support. The operator simplifies the process of deploying Valkey on Kubernetes clusters, ensuring that it is configured correctly and operates efficiently. + +### Key Features + +- 🚀 **Automated Cluster Management** - Deploy and manage Valkey clusters with configurable shards and replicas +- 💾 **Persistent Storage Support** - Optional PersistentVolume support for data durability +- 🔒 **Security Hardened** - Non-root containers, security contexts, and volume permissions +- 📊 **Metrics Export** - Built-in Prometheus metrics exporter +- 🔄 **High Availability** - Multi-shard clusters with replica support +- ⚙️ **Flexible Configuration** - Resource limits, tolerations, node selectors, and affinity rules + +### Storage Modes + +The operator supports two storage modes: + +- **Ephemeral Storage (Default)** - Uses emptyDir for temporary storage, suitable for caching workloads +- **Persistent Storage** - Uses PersistentVolumeClaims with configurable size and storage class for production workloads > **⚠️ EARLY DEVELOPMENT NOTICE** > @@ -62,7 +78,115 @@ make deploy IMG=/valkey-operator:tag You can apply the samples (examples) from the config/sample: ```sh -kubectl apply -k config/samples/ +# Deploy with persistent storage 
(production) +kubectl apply -f config/samples/v1alpha1_valkeycluster.yaml + +# OR deploy with ephemeral storage (dev/test) +kubectl apply -f config/samples/v1alpha1_valkeycluster_ephemeral.yaml +``` + +### Example Configurations + +#### Persistent Storage (Production) + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-prod +spec: + shards: 3 + replicas: 2 + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "1Gi" + cpu: "500m" + storage: + enabled: true + size: "10Gi" + storageClassName: "gp3" # AWS EBS gp3 + accessModes: + - ReadWriteOnce + volumePermissions: true # Recommended for persistent storage +``` + +#### Ephemeral Storage (Dev/Test) + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-dev +spec: + shards: 3 + replicas: 1 + resources: + requests: + memory: "256Mi" + cpu: "100m" + # storage section omitted - uses emptyDir +``` + +### Deploying to AWS EKS + +1. **Configure AWS credentials and kubectl context:** + +```sh +aws eks update-kubeconfig --region us-east-1 --name your-cluster-name +``` + +2. **Build and push the operator image:** + +```sh +# Set your Docker registry +export IMG=<your-registry>/valkey-operator:latest + +# Build for AMD64 (EKS) +make docker-buildx PLATFORMS=linux/amd64 + +# Or build and push separately +docker buildx build --platform linux/amd64 -t ${IMG} --push . +``` + +3. **Install CRDs and deploy the operator:** + +```sh +make install +make deploy IMG=${IMG} +``` + +4. **Create a ValkeyCluster with EBS storage:** + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-eks +spec: + shards: 3 + replicas: 2 + storage: + enabled: true + size: "20Gi" + storageClassName: "gp3" # EBS gp3 storage class + volumePermissions: true +``` + +5. 
**Verify deployment:** + +```sh +# Check operator pod +kubectl get pods -n valkey-operator-system + +# Check ValkeyCluster status +kubectl get valkeycluster +kubectl describe valkeycluster valkey-eks + +# Check pods and PVCs +kubectl get pods,pvc -l app.kubernetes.io/name=valkey ``` > **NOTE**: Ensure that the samples has default values to test it out. diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index f5dfad5..c4efca2 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -73,6 +73,38 @@ type ValkeyClusterSpec struct { // +kubebuilder:default:={enabled:true} // +optional Exporter ExporterSpec `json:"exporter,omitempty"` + + // PersistentVolume configuration for data persistence + // +optional + Storage StorageSpec `json:"storage,omitempty"` + + // VolumePermissions enables an init container to set proper ownership on the data volume + // Required when using persistent storage with non-root security contexts + // +kubebuilder:default=false + // +optional + VolumePermissions bool `json:"volumePermissions,omitempty"` +} + +type StorageSpec struct { + // Enable persistent storage for Valkey data + // +kubebuilder:default=false + // +optional + Enabled bool `json:"enabled,omitempty"` + + // StorageClassName is the name of the StorageClass to use for PersistentVolumeClaims + // If not specified, the default StorageClass will be used + // +optional + StorageClassName *string `json:"storageClassName,omitempty"` + + // Size of the persistent volume claim + // +kubebuilder:default="1Gi" + // +optional + Size string `json:"size,omitempty"` + + // AccessModes contains the desired access modes for the volume + // +kubebuilder:default={"ReadWriteOnce"} + // +optional + AccessModes []corev1.PersistentVolumeAccessMode `json:"accessModes,omitempty"` } type ExporterSpec struct { @@ -154,6 +186,8 @@ const ( ReasonSlotsUnassigned = "SlotsUnassigned" ReasonPrimaryLost = "PrimaryLost" ReasonNoSlots = 
"NoSlotsAvailable" + ReasonVolumeExpanding = "VolumeExpanding" + ReasonVolumeExpanded = "VolumeExpanded" ) // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b239c97..99dbc8c 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -42,6 +42,31 @@ func (in *ExporterSpec) DeepCopy() *ExporterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StorageSpec) DeepCopyInto(out *StorageSpec) { + *out = *in + if in.StorageClassName != nil { + in, out := &in.StorageClassName, &out.StorageClassName + *out = new(string) + **out = **in + } + if in.AccessModes != nil { + in, out := &in.AccessModes, &out.AccessModes + *out = make([]v1.PersistentVolumeAccessMode, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageSpec. +func (in *StorageSpec) DeepCopy() *StorageSpec { + if in == nil { + return nil + } + out := new(StorageSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ValkeyCluster) DeepCopyInto(out *ValkeyCluster) { *out = *in @@ -125,6 +150,7 @@ func (in *ValkeyClusterSpec) DeepCopyInto(out *ValkeyClusterSpec) { (*in).DeepCopyInto(*out) } in.Exporter.DeepCopyInto(&out.Exporter) + in.Storage.DeepCopyInto(&out.Storage) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeyClusterSpec. 
diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index f8a188c..513d782 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -1124,6 +1124,31 @@ spec: format: int32 minimum: 1 type: integer + storage: + description: PersistentVolume configuration for data persistence + properties: + accessModes: + default: + - ReadWriteOnce + description: AccessModes contains the desired access modes for + the volume + items: + type: string + type: array + enabled: + default: false + description: Enable persistent storage for Valkey data + type: boolean + size: + default: 1Gi + description: Size of the persistent volume claim + type: string + storageClassName: + description: |- + StorageClassName is the name of the StorageClass to use for PersistentVolumeClaims + If not specified, the default StorageClass will be used + type: string + type: object tolerations: description: Tolerations to apply to the pods items: @@ -1164,6 +1189,12 @@ spec: type: string type: object type: array + volumePermissions: + default: false + description: |- + VolumePermissions enables an init container to set proper ownership on the data volume + Required when using persistent storage with non-root security contexts + type: boolean type: object status: default: diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 6ef54ed..e8f6d91 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -63,7 +63,7 @@ spec: args: - --leader-elect - --health-probe-bind-address=:8081 - image: controller:latest + image: umarriswan/valkey-operator:latest name: manager ports: [] securityContext: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 578d6db..2804b76 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -8,6 +8,7 @@ rules: - "" resources: - configmaps + - persistentvolumeclaims - services verbs: - create @@ -29,6 +30,7 @@ 
rules: - apps resources: - deployments + - statefulsets verbs: - create - delete @@ -44,6 +46,14 @@ rules: verbs: - create - patch +- apiGroups: + - storage.k8s.io + resources: + - storageclasses + verbs: + - get + - list + - watch - apiGroups: - valkey.io resources: diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index 62b8d67..c62e6e2 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -1,4 +1,5 @@ ## Append samples of your project ## resources: - v1alpha1_valkeycluster.yaml +- v1alpha1_valkeycluster_ephemeral.yaml # +kubebuilder:scaffold:manifestskustomizesamples diff --git a/config/samples/v1alpha1_valkeycluster.yaml b/config/samples/v1alpha1_valkeycluster.yaml index 540a4d1..dcf24eb 100644 --- a/config/samples/v1alpha1_valkeycluster.yaml +++ b/config/samples/v1alpha1_valkeycluster.yaml @@ -12,3 +12,11 @@ spec: limits: memory: "512Mi" cpu: "500m" + storage: + enabled: true + size: "2Gi" + # storageClassName: standard # Uncomment to specify a StorageClass + accessModes: + - ReadWriteOnce + # Enable init container to fix volume permissions (recommended for persistent storage) + volumePermissions: true diff --git a/config/samples/v1alpha1_valkeycluster_ephemeral.yaml b/config/samples/v1alpha1_valkeycluster_ephemeral.yaml new file mode 100644 index 0000000..409b1c1 --- /dev/null +++ b/config/samples/v1alpha1_valkeycluster_ephemeral.yaml @@ -0,0 +1,21 @@ +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkeycluster-ephemeral +spec: + shards: 3 + replicas: 1 + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + # Storage section omitted - uses ephemeral storage (emptyDir) + # This is suitable for: + # - Development/testing environments + # - Cache-only workloads where data loss is acceptable + # - Temporary clusters + # + # Note: Data will be lost when pods are deleted or restarted diff --git 
a/config/samples/v1alpha1_valkeycluster_volume_expansion.yaml b/config/samples/v1alpha1_valkeycluster_volume_expansion.yaml new file mode 100644 index 0000000..b7a0bfc --- /dev/null +++ b/config/samples/v1alpha1_valkeycluster_volume_expansion.yaml @@ -0,0 +1,45 @@ +--- +# Initial cluster with 5Gi storage +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-volume-expansion-demo + namespace: default +spec: + shards: 2 + replicas: 1 + storage: + enabled: true + size: "5Gi" + # Adjust this to your storage class that supports expansion + # Run: kubectl get sc + storageClassName: "standard" + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "200m" +--- +# To expand the storage, apply this updated manifest +# Change the size from 5Gi to 10Gi +# apiVersion: valkey.io/v1alpha1 +# kind: ValkeyCluster +# metadata: +# name: valkey-volume-expansion-demo +# namespace: default +# spec: +# shards: 2 +# replicas: 1 +# storage: +# enabled: true +# size: "10Gi" # <-- Changed from 5Gi to 10Gi +# storageClassName: "standard" +# resources: +# requests: +# memory: "256Mi" +# cpu: "100m" +# limits: +# memory: "512Mi" +# cpu: "200m" diff --git a/docs/volume-expansion-example.md b/docs/volume-expansion-example.md new file mode 100644 index 0000000..6ac2b5b --- /dev/null +++ b/docs/volume-expansion-example.md @@ -0,0 +1,215 @@ +# Volume Expansion Example + +This example demonstrates how to expand the persistent storage for a ValkeyCluster using online volume expansion. + +## Prerequisites + +Before attempting volume expansion, ensure: + +1. **Storage Class supports expansion**: Your StorageClass must have `allowVolumeExpansion: true` + ```bash + kubectl get storageclass -o yaml + ``` + +2. **Backup your data**: Always backup your data before performing volume expansion + +3. 
**Check underlying storage provider**: Ensure your storage provider (e.g., AWS EBS, GCE PD, Azure Disk) supports online volume expansion + +## Initial Cluster with 5Gi Storage + +Create a ValkeyCluster with 5Gi of storage: + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-expandable + namespace: default +spec: + shards: 3 + replicas: 1 + storage: + enabled: true + size: "5Gi" + storageClassName: "standard" # Use your storage class that supports expansion +``` + +Apply the manifest: +```bash +kubectl apply -f valkey-cluster-5gi.yaml +``` + +Wait for the cluster to be ready: +```bash +kubectl wait --for=condition=Ready valkeyCluster/valkey-expandable --timeout=5m +``` + +## Expanding Storage to 10Gi + +To expand the storage, simply update the `spec.storage.size` field: + +```yaml +apiVersion: valkey.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkey-expandable + namespace: default +spec: + shards: 3 + replicas: 1 + storage: + enabled: true + size: "10Gi" # Changed from 5Gi to 10Gi + storageClassName: "standard" +``` + +Apply the updated manifest: +```bash +kubectl apply -f valkey-cluster-10gi.yaml +``` + +## Monitoring the Expansion + +### Check Operator Events +```bash +kubectl get events --sort-by='.lastTimestamp' | grep VolumeExpan +``` + +You should see events like: +- `VolumeExpanding`: Expansion has been initiated +- `VolumeExpanded`: Expansion completed successfully + +### Check PVC Status +```bash +kubectl get pvc -l valkey.io/cluster=valkey-expandable +``` + +Look for PVCs in `Resizing` state or check conditions: +```bash +kubectl describe pvc +``` + +### Check Cluster Status +```bash +kubectl get valkeyCluster valkey-expandable -o jsonpath='{.status.conditions[?(@.type=="Progressing")]}' +``` + +During expansion, you'll see a condition with reason `VolumeExpanding`. 
+ +### Watch Cluster State +```bash +watch kubectl get valkeyCluster valkey-expandable +``` + +## Important Notes + +### ✅ Supported Operations +- **Expanding volumes**: Increase storage size from smaller to larger values +- **Online expansion**: No downtime required (depends on storage provider) +- **Mixed units**: Sizes are compared by value, not by text, so the unit can change (e.g. `5G` to `5Gi`) as long as the new size is not smaller + +### ❌ Unsupported Operations +- **Shrinking volumes**: Reducing storage size is **not supported** by Kubernetes +- **Changing storage class**: Cannot change the StorageClass of existing PVCs + +### Best Practices + +1. **One change at a time**: Don't apply other cluster changes simultaneously with volume expansion + +2. **Consistent size units**: Use consistent size definitions (`Gi` or `G`) + - `1Gi` = 1024³ bytes (binary) + - `1G` = 1000³ bytes (decimal) + +3. **Monitor the process**: Watch cluster events and PVC status during expansion + +4. **Plan capacity**: Expand proactively before running out of space + +5. **Test first**: Test volume expansion in a non-production environment first + +## Troubleshooting + +### Expansion Fails + +If expansion fails, check: + +1. **StorageClass configuration**: + ```bash + kubectl get sc <storage-class-name> -o yaml | grep allowVolumeExpansion + ``` + Should be `true`. + +2. **Storage provider support**: Verify your cloud provider or storage system supports expansion + +3. **PVC conditions**: + ```bash + kubectl describe pvc <pvc-name> + ``` + Look for error messages in the conditions. + +4. **Operator logs**: + ```bash + kubectl logs -n valkey-operator-system deployment/valkey-operator-controller-manager -f + ``` + +### Expansion Stuck + +If expansion appears stuck: + +1. Check if filesystem resize is pending: + ```bash + kubectl get pvc <pvc-name> -o jsonpath='{.status.conditions[?(@.type=="FileSystemResizePending")]}' + ``` + +2. Check pod status: The pod may need to be restarted for filesystem expansion: + ```bash + kubectl get pods -l valkey.io/cluster=valkey-expandable + ``` + +3. 
Check storage provider: Some providers require manual intervention for certain volume types + +## Example: Complete Workflow + +```bash +# 1. Create cluster with 5Gi +cat < 0 { + accessModes = cluster.Spec.Storage.AccessModes + } + + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "data", + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: accessModes, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(size), + }, + }, + }, + } + + if cluster.Spec.Storage.StorageClassName != nil { + pvc.Spec.StorageClassName = cluster.Spec.Storage.StorageClassName + } + + volumeClaimTemplates = append(volumeClaimTemplates, pvc) + } + + volumes := []corev1.Volume{ + { + Name: "scripts", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + DefaultMode: func(i int32) *int32 { return &i }(0755), + }, + }, + }, + { + Name: "valkey-conf", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cluster.Name, + }, + }, + }, + }, + } + + // Add emptyDir volume for data if persistent storage is not enabled + if !cluster.Spec.Storage.Enabled { + volumes = append(volumes, corev1.Volume{ + Name: "data", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }) + } + + statefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: cluster.Name + "-", + Namespace: cluster.Namespace, + Labels: labels(cluster), + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: cluster.Name, + Replicas: func(i int32) *int32 { return &i }(1), + Selector: &metav1.LabelSelector{ + MatchLabels: labels(cluster), + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels(cluster), + }, + Spec: corev1.PodSpec{ + InitContainers: 
initContainers, + Containers: containers, + Affinity: cluster.Spec.Affinity, + NodeSelector: cluster.Spec.NodeSelector, + Tolerations: cluster.Spec.Tolerations, + Volumes: volumes, + SecurityContext: &corev1.PodSecurityContext{ + FSGroup: func(i int64) *int64 { return &i }(1001), + RunAsUser: func(i int64) *int64 { return &i }(1001), + RunAsGroup: func(i int64) *int64 { return &i }(1001), + }, + }, + }, + VolumeClaimTemplates: volumeClaimTemplates, + }, + } + return statefulSet +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 12641e4..a329ec4 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -67,6 +67,9 @@ var scripts embed.FS // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="storage.k8s.io",resources=storageclasses,verbs=get;list;watch // +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;patch // Reconcile is the main reconciliation loop. 
On each invocation it drives the @@ -112,10 +115,39 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } - if err := r.upsertDeployments(ctx, cluster); err != nil { - setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) - _ = r.updateStatus(ctx, cluster, nil) - return ctrl.Result{}, err + // Use StatefulSets if persistent storage is enabled, otherwise use Deployments + if cluster.Spec.Storage.Enabled { + if err := r.upsertStatefulSets(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } + + // Handle volume expansion if storage size has changed + if err := r.handleVolumeExpansion(ctx, cluster); err != nil { + log.Error(err, "volume expansion failed") + r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, "VolumeExpansionFailed", "VolumeExpansion", "Volume expansion failed: %v", err) + // Don't return error - allow reconciliation to continue + // The expansion will be retried on the next reconciliation + } + + // Check if volume expansion is in progress + expansionComplete, err := r.checkVolumeExpansionStatus(ctx, cluster) + if err != nil { + log.Error(err, "failed to check volume expansion status") + } else if !expansionComplete { + log.V(1).Info("volume expansion in progress, will recheck") + setCondition(cluster, valkeyiov1alpha1.ConditionProgressing, valkeyiov1alpha1.ReasonVolumeExpanding, "Volume expansion in progress", metav1.ConditionTrue) + _ = r.updateStatus(ctx, cluster, nil) + // Requeue more frequently to monitor expansion progress + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + } else { + if err := r.upsertDeployments(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, 
valkeyiov1alpha1.ReasonDeploymentError, err.Error(), metav1.ConditionFalse) + _ = r.updateStatus(ctx, cluster, nil) + return ctrl.Result{}, err + } } // Get all pods and their current Valkey Cluster state @@ -276,7 +308,11 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, cluster * "valkey.conf": ` cluster-enabled yes protected-mode no -cluster-node-timeout 2000`, +cluster-node-timeout 2000 +dir /data +appendonly yes +appendfilename "appendonly.aof" +appendfsync everysec`, }, } if err := controllerutil.SetControllerReference(cluster, cm, r.Scheme); err != nil { @@ -354,6 +390,36 @@ func (r *ValkeyClusterReconciler) ensureDeployment(ctx context.Context, cluster return nil } +// Create Valkey instances using StatefulSets with persistent volumes +func (r *ValkeyClusterReconciler) upsertStatefulSets(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + log := logf.FromContext(ctx) + + existing := &appsv1.StatefulSetList{} + if err := r.List(ctx, existing, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil { + log.Error(err, "failed to list StatefulSets") + return err + } + + expected := int(cluster.Spec.Shards * (1 + cluster.Spec.Replicas)) + + // Create missing statefulsets + for i := len(existing.Items); i < expected; i++ { + statefulSet := createClusterStatefulSet(cluster) + if err := controllerutil.SetControllerReference(cluster, statefulSet, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, statefulSet); err != nil { + r.Recorder.Eventf(cluster, statefulSet, corev1.EventTypeWarning, "StatefulSetCreationFailed", "CreateStatefulSet", "Failed to create statefulset: %v", err) + return err + } + r.Recorder.Eventf(cluster, statefulSet, corev1.EventTypeNormal, "StatefulSetCreated", "CreateStatefulSet", "Created statefulset %d of %d", i+1, expected) + } + + // TODO: update existing + + return nil +} + func (r *ValkeyClusterReconciler) getValkeyClusterState(ctx 
context.Context, pods *corev1.PodList) *valkey.ClusterState {
 	// Create a list of addresses to possible Valkey nodes
 	ips := []string{}
diff --git a/internal/controller/volume_expansion.go b/internal/controller/volume_expansion.go
new file mode 100644
index 0000000..60badc8
--- /dev/null
+++ b/internal/controller/volume_expansion.go
@@ -0,0 +1,222 @@
+/*
+Copyright 2025 Valkey Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1"
+)
+
+// handleVolumeExpansion grows the cluster's PVCs to the size requested in
+// spec.storage.size (1Gi when the field is empty).
+//
+// PVCs matching the cluster labels that request less than the desired size are
+// updated in place. PVCs that already request that size are left alone, and a
+// PVC that requests more only triggers a warning event, because shrinking is
+// not supported. Every StorageClass named by a PVC that needs expansion must
+// have allowVolumeExpansion enabled before any PVC is modified.
+//
+// It returns an error when the size cannot be parsed, the PVCs cannot be
+// listed, a StorageClass check fails, or a PVC update is rejected.
+func (r *ValkeyClusterReconciler) handleVolumeExpansion(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error {
+	log := logf.FromContext(ctx)
+
+	// Only proceed if storage is enabled
+	if !cluster.Spec.Storage.Enabled {
+		return nil
+	}
+
+	// Get the desired storage size from the cluster spec
+	desiredSize := "1Gi"
+	if cluster.Spec.Storage.Size != "" {
+		desiredSize = cluster.Spec.Storage.Size
+	}
+	desiredQuantity, err := resource.ParseQuantity(desiredSize)
+	if err != nil {
+		return fmt.Errorf("invalid storage size format %q: %w", desiredSize, err)
+	}
+
+	// Get all PVCs for this cluster
+	pvcList := &corev1.PersistentVolumeClaimList{}
+	if err := r.List(ctx, pvcList, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil {
+		return fmt.Errorf("listing PVCs: %w", err)
+	}
+
+	// Keep pointers into the list rather than copies of the range variable, so
+	// the objects passed to the recorder and to Update are the listed PVCs.
+	var pvcsToExpand []*corev1.PersistentVolumeClaim
+	for i := range pvcList.Items {
+		pvc := &pvcList.Items[i]
+		currentSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
+
+		switch cmp := desiredQuantity.Cmp(currentSize); {
+		case cmp > 0:
+			pvcsToExpand = append(pvcsToExpand, pvc)
+			log.Info("PVC needs expansion", "pvc", pvc.Name, "currentSize", currentSize.String(), "desiredSize", desiredSize)
+		case cmp < 0:
+			// Size reduction is not supported
+			log.Info("Storage size reduction is not supported", "pvc", pvc.Name, "currentSize", currentSize.String(), "requestedSize", desiredSize)
+			r.Recorder.Eventf(cluster, pvc, corev1.EventTypeWarning, "VolumeExpansionNotSupported", "VolumeExpansion",
+				"Cannot reduce storage size from %s to %s for PVC %s - shrinking is not supported",
+				currentSize.String(), desiredSize, pvc.Name)
+		}
+	}
+
+	if len(pvcsToExpand) == 0 {
+		// No PVCs yet, or none of them is smaller than the desired size
+		return nil
+	}
+
+	// Verify every distinct storage class once before touching any PVC
+	verified := map[string]struct{}{}
+	for _, pvc := range pvcsToExpand {
+		className := pvc.Spec.StorageClassName
+		if className == nil || *className == "" {
+			continue
+		}
+		if _, done := verified[*className]; done {
+			continue
+		}
+		if err := r.verifyStorageClassSupportsExpansion(ctx, *className); err != nil {
+			r.Recorder.Eventf(cluster, pvc, corev1.EventTypeWarning, "VolumeExpansionNotSupported", "VolumeExpansion",
+				"Storage class %s does not support volume expansion: %v", *className, err)
+			return err
+		}
+		verified[*className] = struct{}{}
+	}
+
+	// Perform expansion on all PVCs that need it
+	for _, pvc := range pvcsToExpand {
+		currentSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
+
+		log.Info("Expanding PVC", "pvc", pvc.Name, "newSize", desiredSize)
+		r.Recorder.Eventf(cluster, pvc, corev1.EventTypeNormal, "VolumeExpanding", "VolumeExpansion",
+			"Expanding PVC %s from %s to %s",
+			pvc.Name, currentSize.String(), desiredSize)
+
+		// Writing to a nil map panics, so allocate the request list if the PVC
+		// came back without one.
+		if pvc.Spec.Resources.Requests == nil {
+			pvc.Spec.Resources.Requests = corev1.ResourceList{}
+		}
+		pvc.Spec.Resources.Requests[corev1.ResourceStorage] = desiredQuantity
+		if err := r.Update(ctx, pvc); err != nil {
+			r.Recorder.Eventf(cluster, pvc, corev1.EventTypeWarning, "VolumeExpansionFailed", "VolumeExpansion",
+				"Failed to expand PVC %s: %v", pvc.Name, err)
+			return fmt.Errorf("updating PVC %s: %w", pvc.Name, err)
+		}
+
+		// NOTE(review): this event is recorded as soon as the resize request is
+		// accepted, not when the volume has actually grown.
+		r.Recorder.Eventf(cluster, pvc, corev1.EventTypeNormal, "VolumeExpanded", "VolumeExpansion",
+			"Successfully initiated expansion of PVC %s to %s", pvc.Name, desiredSize)
+	}
+
+	expandedCount := len(pvcsToExpand)
+	log.Info("Volume expansion initiated", "pvcCount", expandedCount, "newSize", desiredSize)
+	setCondition(cluster, valkeyiov1alpha1.ConditionProgressing, valkeyiov1alpha1.ReasonVolumeExpanding,
+		fmt.Sprintf("Expanding %d volumes to %s", expandedCount, desiredSize), metav1.ConditionTrue)
+
+	return nil
+}
+
+// verifyStorageClassSupportsExpansion returns nil when the named StorageClass
+// can be fetched and has allowVolumeExpansion set to true, and an error otherwise.
+func (r *ValkeyClusterReconciler) verifyStorageClassSupportsExpansion(ctx context.Context, storageClassName string) error {
+	log := logf.FromContext(ctx)
+
+	// The lookup key carries a name only, no namespace
+	storageClass := &storagev1.StorageClass{}
+	if err := r.Get(ctx, client.ObjectKey{Name: storageClassName}, storageClass); err != nil {
+		return fmt.Errorf("failed to get storage class: %w", err)
+	}
+
+	if storageClass.AllowVolumeExpansion == nil || !*storageClass.AllowVolumeExpansion {
+		log.Info("Storage class does not support volume expansion", "storageClass", storageClassName)
+		return fmt.Errorf("storage class %s does not have allowVolumeExpansion enabled", storageClassName)
+	}
+
+	log.V(1).Info("Storage class supports volume expansion", "storageClass", storageClassName)
+	return nil
+}
+
+// checkVolumeExpansionStatus reports whether all volume expansions of the
+// cluster have finished. It returns true when storage is disabled or when no
+// bound PVC is still being resized, and false with an error when the PVCs
+// cannot be listed.
+func (r *ValkeyClusterReconciler) checkVolumeExpansionStatus(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) (bool, error) {
+	log := logf.FromContext(ctx)
+
+	// Only proceed if storage is enabled
+	if !cluster.Spec.Storage.Enabled {
+		return true, nil
+	}
+
+	// Get all PVCs for this cluster
+	pvcList := &corev1.PersistentVolumeClaimList{}
+	if err := r.List(ctx, pvcList, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels(cluster))); err != nil {
+		return false, fmt.Errorf("listing PVCs: %w", err)
+	}
+
+	// Count each PVC at most once, however many signals it shows
+	expandingCount := 0
+	for i := range pvcList.Items {
+		pvc := &pvcList.Items[i]
+		if pvcExpansionPending(pvc) {
+			expandingCount++
+			log.V(1).Info("PVC expansion in progress", "pvc", pvc.Name)
+		}
+	}
+
+	if expandingCount > 0 {
+		log.Info("Volume expansions still in progress", "count", expandingCount)
+	}
+
+	return expandingCount == 0, nil
+}
+
+// pvcExpansionPending reports whether a single PVC is still being resized.
+func pvcExpansionPending(pvc *corev1.PersistentVolumeClaim) bool {
+	// A claim that is not bound yet has no capacity recorded in its status,
+	// which would otherwise look like a pending expansion in the comparison below.
+	if pvc.Status.Phase != corev1.ClaimBound {
+		return false
+	}
+
+	for _, condition := range pvc.Status.Conditions {
+		if condition.Status != corev1.ConditionTrue {
+			continue
+		}
+		switch condition.Type {
+		case corev1.PersistentVolumeClaimResizing, corev1.PersistentVolumeClaimFileSystemResizePending:
+			return true
+		}
+	}
+
+	// Compare spec vs status to detect pending expansions
+	specSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
+	statusSize := pvc.Status.Capacity[corev1.ResourceStorage]
+	return specSize.Cmp(statusSize) > 0
+}