Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ jobs:
needs: build
steps:
- name: Create Release
uses: softprops/action-gh-release@v1
uses: softprops/action-gh-release@v2
with:
generate_release_notes: true
5 changes: 4 additions & 1 deletion api/rds/v1alpha1/valkey_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ type ValkeySpec struct {
// for detailed settings, please refer to https://github.com/valkey-io/valkey/blob/unstable/valkey.conf
CustomConfigs map[string]string `json:"customConfigs,omitempty"`

// Modules defines the module settings for Valkey
// Modules defines a list of modules to be loaded into the valkey instance.
// Each module is specified by its name and version.
// Modules are loaded at startup and can extend Redis functionality.
// +optional
Modules []core.ValkeyModule `json:"modules,omitempty"`

// Storage defines the storage settings for Valkey
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
ClusterResourceCleanFinalizer = "buf.red/cluster-resource-clean"
)

type ShardConfig struct {
// Slots is the slot range for the shard, eg: 0-1000,1002,1005-1100
//+kubebuilder:validation:Pattern:=`^(\d{1,5}|(\d{1,5}-\d{1,5}))(,(\d{1,5}|(\d{1,5}-\d{1,5})))*$`
Expand Down
51 changes: 21 additions & 30 deletions cmd/helper/commands/cluster/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,21 @@ import (
"k8s.io/client-go/kubernetes"
)

// ExposeNodePort
func ExposeNodePort(ctx context.Context, client *kubernetes.Clientset, namespace, podName, ipfamily string, serviceType corev1.ServiceType, logger logr.Logger) error {
logger.Info("Info", "serviceType", serviceType, "ipfamily", ipfamily)
// Access
func Access(ctx context.Context, client *kubernetes.Clientset, namespace, podName, ipfamily string, logger logr.Logger) error {
podSvc, err := commands.RetryGetService(ctx, client, namespace, podName, 20, logger)
if errors.IsNotFound(err) {
if podSvc, err = commands.RetryGetService(ctx, client, namespace, podName, 20, logger); err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
} else if err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
serviceType := podSvc.Spec.Type

logger.Info("check pod service type", "ipfamily", ipfamily, "serviceType", serviceType, "podName", podName)
pod, err := commands.GetPod(ctx, client, namespace, podName, logger)
if err != nil {
logger.Error(err, "get pods failed", "namespace", namespace, "name", podName)
Expand All @@ -54,16 +66,6 @@ func ExposeNodePort(ctx context.Context, client *kubernetes.Clientset, namespace
announceIPort int32 = 16379
)
if serviceType == corev1.ServiceTypeNodePort {
podSvc, err := commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeNodePort, 20, logger)
if errors.IsNotFound(err) {
if podSvc, err = commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeNodePort, 20, logger); err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
} else if err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
for _, v := range podSvc.Spec.Ports {
if v.Name == "client" {
announcePort = v.NodePort
Expand Down Expand Up @@ -121,17 +123,6 @@ func ExposeNodePort(ctx context.Context, client *kubernetes.Clientset, namespace
return err
}
} else if serviceType == corev1.ServiceTypeLoadBalancer {
podSvc, err := commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeLoadBalancer, 20, logger)
if errors.IsNotFound(err) {
if podSvc, err = commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeLoadBalancer, 20, logger); err != nil {
logger.Error(err, "retry get lb service failed")
return err
}
} else if err != nil {
logger.Error(err, "get lb service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}

for _, v := range podSvc.Status.LoadBalancer.Ingress {
if v.IP != "" {
ip, err := netip.ParseAddr(v.IP)
Expand Down Expand Up @@ -165,14 +156,14 @@ func ExposeNodePort(ctx context.Context, client *kubernetes.Clientset, namespace
}
}

format_announceIp := strings.Replace(announceIp, ":", "-", -1)
if strings.HasSuffix(format_announceIp, "-") {
format_announceIp = fmt.Sprintf("%s0", format_announceIp)
fAnnounceIp := strings.ReplaceAll(announceIp, ":", "-")
if strings.HasSuffix(fAnnounceIp, "-") {
fAnnounceIp = fmt.Sprintf("%s0", fAnnounceIp)
}
labelPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/labels/%s","value":"%s"},{"op":"add","path":"/metadata/labels/%s","value":"%s"},{"op":"add","path":"/metadata/labels/%s","value":"%s"}]`,
strings.Replace(builder.AnnounceIPLabelKey, "/", "~1", -1), format_announceIp,
strings.Replace(builder.AnnouncePortLabelKey, "/", "~1", -1), strconv.Itoa(int(announcePort)),
strings.Replace(builder.AnnounceIPortLabelKey, "/", "~1", -1), strconv.Itoa(int(announceIPort)))
strings.ReplaceAll(builder.AnnounceIPLabelKey, "/", "~1"), fAnnounceIp,
strings.ReplaceAll(builder.AnnouncePortLabelKey, "/", "~1"), strconv.Itoa(int(announcePort)),
strings.ReplaceAll(builder.AnnounceIPortLabelKey, "/", "~1"), strconv.Itoa(int(announceIPort)))

logger.Info(labelPatch)
_, err = client.CoreV1().Pods(pod.Namespace).Patch(ctx, podName, types.JSONPatchType, []byte(labelPatch), metav1.PatchOptions{})
Expand Down
16 changes: 4 additions & 12 deletions cmd/helper/commands/cluster/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/chideat/valkey-operator/cmd/helper/commands"
"github.com/urfave/cli/v2"
corev1 "k8s.io/api/core/v1"
)

func NewCommand(ctx context.Context) *cli.Command {
Expand Down Expand Up @@ -99,12 +98,6 @@ func NewCommand(ctx context.Context) *cli.Command {
Usage: "IP_FAMILY for expose",
EnvVars: []string{"IP_FAMILY_PREFER"},
},
&cli.StringFlag{
Name: "service-type",
Usage: "Service type for sentinel service",
EnvVars: []string{"SERVICE_TYPE"},
Value: "ClusterIP",
},
},
Subcommands: []*cli.Command{
{
Expand All @@ -113,10 +106,9 @@ func NewCommand(ctx context.Context) *cli.Command {
Flags: []cli.Flag{},
Action: func(c *cli.Context) error {
var (
namespace = c.String("namespace")
podName = c.String("pod-name")
ipFamily = c.String("ip-family")
serviceType = corev1.ServiceType(c.String("service-type"))
namespace = c.String("namespace")
podName = c.String("pod-name")
ipFamily = c.String("ip-family")
)
if namespace == "" {
return cli.Exit("require namespace", 1)
Expand All @@ -133,7 +125,7 @@ func NewCommand(ctx context.Context) *cli.Command {
return cli.Exit(err, 1)
}

if err := ExposeNodePort(ctx, client, namespace, podName, ipFamily, serviceType, logger); err != nil {
if err := Access(ctx, client, namespace, podName, ipFamily, logger); err != nil {
logger.Error(err, "expose node port failed")
return cli.Exit(err, 1)
}
Expand Down
48 changes: 19 additions & 29 deletions cmd/helper/commands/failover/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,25 @@ import (
"k8s.io/client-go/kubernetes"
)

func Access(ctx context.Context, client *kubernetes.Clientset, namespace, podName, ipfamily string, serviceType corev1.ServiceType, logger logr.Logger) error {
logger.Info("service access", "serviceType", serviceType, "ipfamily", ipfamily, "podName", podName)
func Access(ctx context.Context, client *kubernetes.Clientset, namespace, podName, ipfamily string, logger logr.Logger) error {
podSvc, err := commands.RetryGetService(ctx, client, namespace, podName, 20, logger)
if errors.IsNotFound(err) {
if podSvc, err = commands.RetryGetService(ctx, client, namespace, podName, 20, logger); err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
} else if err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
serviceType := podSvc.Spec.Type

logger.Info("check pod service type", "ipfamily", ipfamily, "serviceType", serviceType, "podName", podName)
pod, err := commands.GetPod(ctx, client, namespace, podName, logger)
if err != nil {
logger.Error(err, "get pods failed", "namespace", namespace, "name", podName)
return err
}

if pod.Status.HostIP == "" {
return fmt.Errorf("pod not found or pod with invalid hostIP")
}
Expand All @@ -51,16 +62,6 @@ func Access(ctx context.Context, client *kubernetes.Clientset, namespace, podNam
announcePort int32 = 6379
)
if serviceType == corev1.ServiceTypeNodePort {
podSvc, err := commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeNodePort, 20, logger)
if errors.IsNotFound(err) {
if podSvc, err = commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeNodePort, 20, logger); err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
} else if err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}
for _, v := range podSvc.Spec.Ports {
if v.Name == "client" {
announcePort = v.NodePort
Expand Down Expand Up @@ -115,17 +116,6 @@ func Access(ctx context.Context, client *kubernetes.Clientset, namespace, podNam
return err
}
} else if serviceType == corev1.ServiceTypeLoadBalancer {
podSvc, err := commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeLoadBalancer, 20, logger)
if errors.IsNotFound(err) {
if podSvc, err = commands.RetryGetService(ctx, client, namespace, podName, corev1.ServiceTypeLoadBalancer, 20, logger); err != nil {
logger.Error(err, "retry get lb service failed")
return err
}
} else if err != nil {
logger.Error(err, "get lb service failed", "target", fmt.Sprintf("%s/%s", namespace, podName))
return err
}

for _, v := range podSvc.Status.LoadBalancer.Ingress {
if v.IP == "" {
continue
Expand Down Expand Up @@ -161,13 +151,13 @@ func Access(ctx context.Context, client *kubernetes.Clientset, namespace, podNam
}
}

format_announceIp := strings.Replace(announceIp, ":", "-", -1)
if strings.HasSuffix(format_announceIp, "-") {
format_announceIp = fmt.Sprintf("%s0", format_announceIp)
fAnnounceIp := strings.ReplaceAll(announceIp, ":", "-")
if strings.HasSuffix(fAnnounceIp, "-") {
fAnnounceIp = fmt.Sprintf("%s0", fAnnounceIp)
}
labelPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/labels/%s","value":"%s"},{"op":"add","path":"/metadata/labels/%s","value":"%s"}]`,
strings.Replace(builder.AnnounceIPLabelKey, "/", "~1", -1), format_announceIp,
strings.Replace(builder.AnnouncePortLabelKey, "/", "~1", -1), strconv.Itoa(int(announcePort)))
strings.ReplaceAll(builder.AnnounceIPLabelKey, "/", "~1"), fAnnounceIp,
strings.ReplaceAll(builder.AnnouncePortLabelKey, "/", "~1"), strconv.Itoa(int(announcePort)))

logger.Info(labelPatch)
_, err = client.CoreV1().Pods(pod.Namespace).Patch(ctx, podName, types.JSONPatchType, []byte(labelPatch), metav1.PatchOptions{})
Expand Down
21 changes: 4 additions & 17 deletions cmd/helper/commands/failover/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/chideat/valkey-operator/cmd/helper/commands"
"github.com/urfave/cli/v2"
corev1 "k8s.io/api/core/v1"
)

func NewCommand(ctx context.Context) *cli.Command {
Expand Down Expand Up @@ -49,11 +48,6 @@ func NewCommand(ctx context.Context) *cli.Command {
Usage: "The id of current pod",
EnvVars: []string{"POD_UID"},
},
&cli.StringFlag{
Name: "service-name",
Usage: "Service name of the statefulset",
EnvVars: []string{"SERVICE_NAME"},
},
&cli.StringFlag{
Name: "operator-username",
Usage: "Operator username",
Expand Down Expand Up @@ -108,12 +102,6 @@ func NewCommand(ctx context.Context) *cli.Command {
Usage: "IP_FAMILY for servie access",
EnvVars: []string{"IP_FAMILY_PREFER"},
},
&cli.StringFlag{
Name: "service-type",
Usage: "Service type for sentinel service",
EnvVars: []string{"SERVICE_TYPE"},
Value: "ClusterIP",
},
},
Subcommands: []*cli.Command{
{
Expand All @@ -122,10 +110,9 @@ func NewCommand(ctx context.Context) *cli.Command {
Flags: []cli.Flag{},
Action: func(c *cli.Context) error {
var (
namespace = c.String("namespace")
podName = c.String("pod-name")
ipFamily = c.String("ip-family")
serviceType = corev1.ServiceType(c.String("service-type"))
namespace = c.String("namespace")
podName = c.String("pod-name")
ipFamily = c.String("ip-family")
)
if namespace == "" {
return cli.Exit("require namespace", 1)
Expand All @@ -142,7 +129,7 @@ func NewCommand(ctx context.Context) *cli.Command {
return cli.Exit(err, 1)
}

if err := Access(ctx, client, namespace, podName, ipFamily, serviceType, logger); err != nil {
if err := Access(ctx, client, namespace, podName, ipFamily, logger); err != nil {
logger.Error(err, "enable nodeport service access failed")
return cli.Exit(err, 1)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/helper/commands/failover/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func loadAnnounceAddress(filepath string, logger logr.Logger) string {
}
data, err := os.ReadFile(filepath)
if err != nil {
if os.IsNotExist(err) {
return ""
}
logger.Error(err, "read announce file failed", "path", filepath)
return ""
}
Expand Down
14 changes: 4 additions & 10 deletions cmd/helper/commands/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,15 @@ func GetPod(ctx context.Context, client *kubernetes.Clientset, namespace, name s
return pod, nil
}

func RetryGetService(ctx context.Context, clientset *kubernetes.Clientset, svcNamespace, svcName string, typ corev1.ServiceType,
count int, logger logr.Logger) (*corev1.Service, error) {

serviceChecker := func(svc *corev1.Service, typ corev1.ServiceType) error {
func RetryGetService(ctx context.Context, clientset *kubernetes.Clientset, svcNamespace, svcName string, count int, logger logr.Logger) (*corev1.Service, error) {
serviceChecker := func(svc *corev1.Service) error {
if svc == nil {
return fmt.Errorf("service not found")
}
if len(svc.Spec.Ports) < 1 {
return fmt.Errorf("service port not found")
}

if svc.Spec.Type != typ {
return fmt.Errorf("service type not match")
}

switch svc.Spec.Type {
case corev1.ServiceTypeNodePort:
for _, port := range svc.Spec.Ports {
Expand All @@ -233,13 +227,13 @@ func RetryGetService(ctx context.Context, clientset *kubernetes.Clientset, svcNa
}

logger.Info("retry get service", "target", fmt.Sprintf("%s/%s", svcNamespace, svcName), "count", count)
for i := 0; i < count+1; i++ {
for range count + 1 {
svc, err := clientset.CoreV1().Services(svcNamespace).Get(ctx, svcName, metav1.GetOptions{})
if err != nil {
logger.Error(err, "get service failed", "target", fmt.Sprintf("%s/%s", svcNamespace, svcName))
return nil, err
}
if serviceChecker(svc, typ) != nil {
if serviceChecker(svc) != nil {
logger.Error(err, "service check failed", "target", fmt.Sprintf("%s/%s", svcNamespace, svcName))
} else {
return svc, nil
Expand Down
Loading
Loading