diff --git a/Gopkg.lock b/Gopkg.lock index c3e6c861f..2c5a06810 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -423,6 +423,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "a4c2d88d2135361c634b51caae45b70cf3fab630b766b68d53d3a3cde2a00db4" + inputs-digest = "e98c554fe619d67147695aed720852084b76d9957a3db2a04080f76db50ae477" solver-name = "gps-cdcl" solver-version = 1 diff --git a/docs/cassandra.rst b/docs/cassandra.rst index bf8e038e7..a5d6c77ea 100644 --- a/docs/cassandra.rst +++ b/docs/cassandra.rst @@ -191,6 +191,7 @@ Supported Configuration Changes Navigator supports the following changes to a Cassandra cluster: * :ref:`create-cluster-cassandra`: Add all initially configured node pools and nodes. + * :ref:`minor-upgrade-cassandra`: Trigger a rolling upgrade of Cassandra nodes by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``. * :ref:`scale-out-cassandra`: Increase ``CassandraCluster.Spec.NodePools[0].Replicas`` to add more C* nodes to a ``nodepool``. Navigator does not currently support any other changes to the Cassandra cluster configuration. @@ -200,7 +201,6 @@ Unsupported Configuration Changes The following configuration changes are not currently supported but will be supported in the near future: - * Minor Upgrade: Trigger a rolling Cassandra upgrade by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``. * Scale In: Decrease ``CassandraCluster.Spec.NodePools[0].Replicas`` to remove C* nodes from a ``nodepool``. The following configuration changes are not currently supported: @@ -220,6 +220,19 @@ in order of ``NodePool`` and according to the process described in :ref:`scale-o The order of node creation is determined by the order of the entries in the ``CassandraCluster.Spec.NodePools`` list. You can look at ``CassandraCluster.Status.NodePools`` to see the current state. +.. _minor-upgrade-cassandra: + +Minor Upgrade +~~~~~~~~~~~~~ + +If you increment the minor or patch number in ``CassandraCluster.Spec.Version``, Navigator will trigger a rolling update of the existing C* nodes. + +C* nodes are upgraded serially, in order of NodePool and Pod ordinal, starting with the pod with the highest ordinal in the first NodePool. + +`StatefulSet Rolling Updates `_ describes the update process in more detail. + +.. note:: Major version upgrades are not yet supported. + .. _scale-out-cassandra: Scale Out diff --git a/hack/e2e.sh b/hack/e2e.sh index bba2a9972..3d680e73c 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -273,12 +273,23 @@ function test_cassandracluster() { stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ get pilots \ - --output 'jsonpath={.items[0].status.cassandra.version}' + --selector "navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \ + --output 'jsonpath={.items[*].status.cassandra.version}' then kubectl --namespace "${namespace}" get pilots -o yaml fail_test "Pilots failed to report the expected version" fi + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get cassandracluster "${CASS_NAME}" \ + --output 'jsonpath={.status.nodePools.*.version}' + then + kubectl --namespace "${namespace}" get cassandracluster -o yaml + fail_test "NodePools failed to report the expected version" + fi + # Wait 5 minutes for cassandra to start and listen for CQL queries. if ! retry TIMEOUT=300 cql_connect \ "${namespace}" \ @@ -310,6 +321,43 @@ function test_cassandracluster() { --debug \ --execute="INSERT INTO space1.testtable1(key, value) VALUES('testkey1', 'testvalue1')" + # Upgrade to newer patch version + export CASS_VERSION="3.11.2" + kubectl apply \ + --namespace "${namespace}" \ + --filename \ + <(envsubst \ + '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \ + < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") + + # The cluster is upgraded + if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \ + "navigator-controller:CassandraCluster:Normal:UpdateVersion" + then + fail_test "An UpdateVersion event was not recorded" + fi + + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get pilots \ + --selector "navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \ + --output 'jsonpath={.items[*].status.cassandra.version}' + then + kubectl --namespace "${namespace}" get pilots -o yaml + fail_test "Pilots failed to report the expected version" + fi + + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get cassandracluster "${CASS_NAME}" \ + --output 'jsonpath={.status.nodePools.*.version}' + then + kubectl --namespace "${namespace}" get cassandracluster -o yaml + fail_test "NodePools failed to report the expected version" + fi + # Delete the Cassandra pod and wait for the CQL service to become # unavailable (readiness probe fails) diff --git a/hack/testdata/values.yaml b/hack/testdata/values.yaml index 2844eed1b..917270e01 100644 --- a/hack/testdata/values.yaml +++ b/hack/testdata/values.yaml @@ -31,3 +31,4 @@ controller: repository: quay.io/jetstack/navigator-controller tag: build pullPolicy: Never + logLevel: 4 diff --git a/internal/test/util/generate/generate.go b/internal/test/util/generate/generate.go index d202a8d1e..53686407e 100644 --- a/internal/test/util/generate/generate.go +++ b/internal/test/util/generate/generate.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/cassandra/version" ) type PilotConfig struct { @@ -121,6 +122,7 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet { type CassandraClusterConfig struct { Name, Namespace string + Version version.Version } func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster { @@ -129,6 +131,9 @@ func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster { Name: c.Name, Namespace: c.Namespace, }, + Spec: v1alpha1.CassandraClusterSpec{ + Version: c.Version, + }, } } diff --git a/pkg/apis/navigator/types.go b/pkg/apis/navigator/types.go index 9f63052e7..cffb57533 100644 --- a/pkg/apis/navigator/types.go +++ b/pkg/apis/navigator/types.go @@ -51,6 +51,7 @@ type CassandraClusterStatus struct { type CassandraClusterNodePoolStatus struct { ReadyReplicas int32 + Version *version.Version } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index d09825d02..06fdd445f 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -82,6 +82,12 @@ type CassandraClusterStatus struct { type CassandraClusterNodePoolStatus struct { ReadyReplicas int32 `json:"readyReplicas"` + // The lowest version of Cassandra found to be running in this nodepool, + // as reported by the Cassandra process. + // nil or empty if the lowest version can not be determined, + // or if the lowest version has not yet been determined + // +optional + Version *version.Version `json:"version,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go index a290cd478..8361f9d7f 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go @@ -202,6 +202,7 @@ func Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNode func autoConvert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClusterNodePoolStatus(in *CassandraClusterNodePoolStatus, out *navigator.CassandraClusterNodePoolStatus, s conversion.Scope) error { out.ReadyReplicas = in.ReadyReplicas + out.Version = (*version.Version)(unsafe.Pointer(in.Version)) return nil } @@ -212,6 +213,7 @@ func Convert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClust func autoConvert_navigator_CassandraClusterNodePoolStatus_To_v1alpha1_CassandraClusterNodePoolStatus(in *navigator.CassandraClusterNodePoolStatus, out *CassandraClusterNodePoolStatus, s conversion.Scope) error { out.ReadyReplicas = in.ReadyReplicas + out.Version = (*version.Version)(unsafe.Pointer(in.Version)) return nil } diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go index 0f15c581b..e158198fa 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go @@ -116,6 +116,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) { *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + if *in == nil { + *out = nil + } else { + *out = new(version.Version) + **out = **in + } + } return } @@ -170,7 +179,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) { in, out := &in.NodePools, &out.NodePools *out = make(map[string]CassandraClusterNodePoolStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + newVal := new(CassandraClusterNodePoolStatus) + val.DeepCopyInto(newVal) + (*out)[key] = *newVal } } return diff --git a/pkg/apis/navigator/validation/cassandra.go b/pkg/apis/navigator/validation/cassandra.go index 559fc4cbe..7ecea7bc1 100644 --- a/pkg/apis/navigator/validation/cassandra.go +++ b/pkg/apis/navigator/validation/cassandra.go @@ -1,6 +1,7 @@ package validation import ( + "fmt" "reflect" apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" @@ -10,6 +11,8 @@ import ( "github.com/jetstack/navigator/pkg/apis/navigator" ) +var supportedMajorVersions = sets.NewInt64(3) + func ValidateCassandraClusterNodePool(np *navigator.CassandraClusterNodePool, fldPath *field.Path) field.ErrorList { // TODO: call k8s.io/kubernetes/pkg/apis/core/validation.ValidateResourceRequirements on np.Resources // this will require vendoring kubernetes/kubernetes. @@ -27,6 +30,26 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. fldPath := field.NewPath("spec") + if new.Spec.Version.LessThan(&old.Spec.Version) { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + "cannot perform version downgrades", + ), + ) + } + + if new.Spec.Version.Major != old.Spec.Version.Major { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + "cannot perform major version upgrades", + ), + ) + } + npPath := fldPath.Child("nodePools") for i, newNp := range new.Spec.NodePools { idxPath := npPath.Index(i) @@ -60,6 +83,21 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. func ValidateCassandraClusterSpec(spec *navigator.CassandraClusterSpec, fldPath *field.Path) field.ErrorList { allErrs := ValidateNavigatorClusterConfig(&spec.NavigatorClusterConfig, fldPath) + + if !supportedMajorVersions.Has(spec.Version.Major) { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + fmt.Sprintf( + "%s is not supported. Supported major versions are: %v", + spec.Version, + supportedMajorVersions.List(), + ), + ), + ) + } + npPath := fldPath.Child("nodePools") allNames := sets.String{} for i, np := range spec.NodePools { diff --git a/pkg/apis/navigator/validation/cassandra_test.go b/pkg/apis/navigator/validation/cassandra_test.go index 7ba419c4c..2ca093061 100644 --- a/pkg/apis/navigator/validation/cassandra_test.go +++ b/pkg/apis/navigator/validation/cassandra_test.go @@ -17,7 +17,7 @@ var ( Namespace: "bar", }, Spec: navigator.CassandraClusterSpec{ - Version: *version.New("5.6.2"), + Version: *version.New("3.11.2"), Image: &validImageSpec, NavigatorClusterConfig: validNavigatorClusterConfig, NodePools: []navigator.CassandraClusterNodePool{ @@ -128,6 +128,15 @@ func TestValidateCassandraClusterUpdate(t *testing.T) { return c } + setVersion := func( + c *navigator.CassandraCluster, + v *version.Version, + ) *navigator.CassandraCluster { + c = c.DeepCopy() + c.Spec.Version = *v + return c + } + tests := map[string]testT{ "unchanged cluster": { old: validCassCluster, @@ -147,6 +156,24 @@ func TestValidateCassandraClusterUpdate(t *testing.T) { old: setPersistence(validCassCluster, navigator.PersistenceConfig{Enabled: false}), new: validCassCluster, }, + "decrease version": { + old: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()), + new: validCassCluster, + errorExpected: true, + }, + "major version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMajor()), + errorExpected: true, + }, + "minor version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()), + }, + "patch version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpPatch()), + }, } for title, persistence := range persistenceErrorCases { diff --git a/pkg/apis/navigator/zz_generated.deepcopy.go b/pkg/apis/navigator/zz_generated.deepcopy.go index e187c402e..256e9661f 100644 --- a/pkg/apis/navigator/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/zz_generated.deepcopy.go @@ -116,6 +116,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) { *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + if *in == nil { + *out = nil + } else { + *out = new(version.Version) + **out = **in + } + } return } @@ -170,7 +179,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) { in, out := &in.NodePools, &out.NodePools *out = make(map[string]CassandraClusterNodePoolStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + newVal := new(CassandraClusterNodePoolStatus) + val.DeepCopyInto(newVal) + (*out)[key] = *newVal } } return diff --git a/pkg/cassandra/version/version.go b/pkg/cassandra/version/version.go index 330d9e48a..5ea13a6d4 100644 --- a/pkg/cassandra/version/version.go +++ b/pkg/cassandra/version/version.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/coreos/go-semver/semver" + utilerror "k8s.io/apimachinery/pkg/util/errors" ) // Version represents a Cassandra database server version. @@ -26,6 +27,9 @@ import ( type Version struct { versionString string semver semver.Version + Major int64 + Minor int64 + Patch int64 } func New(s string) *Version { @@ -41,6 +45,11 @@ func (v *Version) Equal(versionB *Version) bool { return v.semver.Equal(versionB.semver) } +// TODO: Add tests for this +func (v *Version) LessThan(versionB *Version) bool { + return v.semver.LessThan(versionB.semver) +} + func (v *Version) UnmarshalJSON(data []byte) error { s, err := strconv.Unquote(string(data)) if err != nil { @@ -51,35 +60,32 @@ func (v *Version) UnmarshalJSON(data []byte) error { func (v *Version) set(cassVersionString string) error { var versionsTried []string - var errorsEncountered []string - - errorWhileParsingOriginalVersion := v.semver.Set(cassVersionString) - if errorWhileParsingOriginalVersion == nil { - v.versionString = cassVersionString - return nil - } + var errs []error - versionsTried = append(versionsTried, cassVersionString) - errorsEncountered = append(errorsEncountered, errorWhileParsingOriginalVersion.Error()) - - semverString := maybeAddMissingPatchVersion(cassVersionString) - if semverString != cassVersionString { - errorWhileParsingSemverVersion := v.semver.Set(semverString) - if errorWhileParsingSemverVersion == nil { - v.versionString = cassVersionString - return nil + err := v.semver.Set(cassVersionString) + if err != nil { + versionsTried = append(versionsTried, cassVersionString) + errs = append(errs, err) + + semverString := maybeAddMissingPatchVersion(cassVersionString) + if semverString != cassVersionString { + err = v.semver.Set(semverString) + versionsTried = append(versionsTried, semverString) + errs = append(errs, err) } - versionsTried = append(versionsTried, semverString) - errorsEncountered = append(errorsEncountered, errorWhileParsingSemverVersion.Error()) } - - return fmt.Errorf( - "unable to parse Cassandra version as semver. "+ - "Versions tried: '%s'. "+ - "Errors encountered: '%s'.", - strings.Join(versionsTried, "','"), - strings.Join(errorsEncountered, "','"), - ) + if err != nil { + return fmt.Errorf( + "unable to parse Cassandra version as semver. Versions tried: '%s'. Errors: %s.", + strings.Join(versionsTried, "','"), + utilerror.NewAggregate(errs), + ) + } + v.versionString = cassVersionString + v.Major = v.semver.Major + v.Minor = v.semver.Minor + v.Patch = v.semver.Patch + return nil } var _ json.Unmarshaler = &Version{} @@ -99,12 +105,28 @@ func (v Version) String() string { return v.versionString } -func (v Version) MarshalJSON() ([]byte, error) { +func (v *Version) MarshalJSON() ([]byte, error) { return []byte(strconv.Quote(v.String())), nil } var _ json.Marshaler = &Version{} -func (v Version) Semver() string { +func (v *Version) Semver() string { return v.semver.String() } + +func (v *Version) BumpPatch() *Version { + sv := semver.New(v.Semver()) + sv.BumpPatch() + return New(sv.String()) +} +func (v *Version) BumpMinor() *Version { + sv := semver.New(v.Semver()) + sv.BumpMinor() + return New(sv.String()) +} +func (v *Version) BumpMajor() *Version { + sv := semver.New(v.Semver()) + sv.BumpMajor() + return New(sv.String()) +} diff --git a/pkg/controllers/cassandra/actions/create_nodepool.go b/pkg/controllers/cassandra/actions/create_nodepool.go index 0f9cd0333..3aeadc409 100644 --- a/pkg/controllers/cassandra/actions/create_nodepool.go +++ b/pkg/controllers/cassandra/actions/create_nodepool.go @@ -4,6 +4,8 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/pkg/errors" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" @@ -27,7 +29,7 @@ func (a *CreateNodePool) Execute(s *controllers.State) error { return nil } if err != nil { - return err + return errors.Wrap(err, "unable to create statefulset") } s.Recorder.Eventf( a.Cluster, diff --git a/pkg/controllers/cassandra/actions/scaleout.go b/pkg/controllers/cassandra/actions/scaleout.go index c095be9ac..760fc7438 100644 --- a/pkg/controllers/cassandra/actions/scaleout.go +++ b/pkg/controllers/cassandra/actions/scaleout.go @@ -4,6 +4,7 @@ import ( corev1 "k8s.io/api/core/v1" "github.com/golang/glog" + "github.com/pkg/errors" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" @@ -27,32 +28,49 @@ func (a *ScaleOut) Execute(s *controllers.State) error { existingSet, err := s.StatefulSetLister. StatefulSets(baseSet.Namespace).Get(baseSet.Name) if err != nil { - return err + return errors.Wrap(err, "unable to get existing statefulset") } - newSet := existingSet.DeepCopy() if *existingSet.Spec.Replicas == a.NodePool.Replicas { + glog.V(4).Infof( + "The StatefulSet %s/%s already has the desired replicas value %d", + existingSet.Namespace, existingSet.Name, a.NodePool.Replicas, + ) return nil } if *existingSet.Spec.Replicas > a.NodePool.Replicas { glog.Errorf( "ScaleOut error:"+ - "The StatefulSet.Spec.Replicas value (%d) "+ + "The StatefulSet %s/%s replicas value (%d) "+ "is greater than the desired value (%d)", + existingSet.Namespace, existingSet.Name, *existingSet.Spec.Replicas, a.NodePool.Replicas, ) return nil } + if *existingSet.Spec.Replicas != existingSet.Status.ReadyReplicas { + glog.V(4).Infof( + "Waiting for all pods in nodepool %s/%s/%s (statefulset %s/%s) to become ready "+ + "before adding more nodes. "+ + "Replicas: %d, ReadyReplicas: %d", + a.Cluster.Namespace, a.Cluster.Name, a.NodePool.Name, + existingSet.Namespace, existingSet.Name, + *existingSet.Spec.Replicas, existingSet.Status.ReadyReplicas, + ) + return nil + } + newSet := existingSet.DeepCopy() newSet.Spec.Replicas = ptr.Int32(*newSet.Spec.Replicas + 1) _, err = s.Clientset.AppsV1beta1(). StatefulSets(newSet.Namespace).Update(newSet) if err != nil { - return err + return errors.Wrap(err, "unable to update statefulset") } s.Recorder.Eventf( a.Cluster, corev1.EventTypeNormal, a.Name(), - "ScaleOut: NodePool=%q, ReplicaCount=%d, TargetReplicaCount=%d", + "ScaleOut: Cluster=%s/%s, NodePool=%q, ReplicaCount=%d, TargetReplicaCount=%d", + a.Cluster.Namespace, a.Cluster.Name, a.NodePool.Name, *newSet.Spec.Replicas, a.NodePool.Replicas, ) return nil diff --git a/pkg/controllers/cassandra/actions/scaleout_test.go b/pkg/controllers/cassandra/actions/scaleout_test.go index 610041342..1286f1ea3 100644 --- a/pkg/controllers/cassandra/actions/scaleout_test.go +++ b/pkg/controllers/cassandra/actions/scaleout_test.go @@ -38,9 +38,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(122), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + ReadyReplicas: 122, }, ), }, @@ -67,9 +68,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(124), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(124), + ReadyReplicas: 124, }, ), }, @@ -88,13 +90,39 @@ func TestScaleOut(t *testing.T) { }, expectedErr: false, }, + "No update until all existing pods are ready": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + ReadyReplicas: 121, + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 123, + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + }, + }, "Idempotent: No error if ReplicaCount already matches the actual ReplicaCount": { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(124), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(124), + ReadyReplicas: 124, }, ), }, @@ -117,9 +145,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(122), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + ReadyReplicas: 122, }, ), }, @@ -141,9 +170,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(2), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(2), + ReadyReplicas: 2, }, ), }, @@ -203,7 +233,7 @@ func TestScaleOut(t *testing.T) { if *test.expectedStatefulSet.Replicas != *actualStatefulSet.Spec.Replicas { t.Errorf( "Unexpected replica count. Expected: %d. Actual: %d", - test.expectedStatefulSet.Replicas, actualStatefulSet.Spec.Replicas, + *test.expectedStatefulSet.Replicas, *actualStatefulSet.Spec.Replicas, ) } } diff --git a/pkg/controllers/cassandra/actions/update_version.go b/pkg/controllers/cassandra/actions/update_version.go new file mode 100644 index 000000000..9d47e6a73 --- /dev/null +++ b/pkg/controllers/cassandra/actions/update_version.go @@ -0,0 +1,57 @@ +package actions + +import ( + "github.com/golang/glog" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" +) + +type UpdateVersion struct { + Cluster *v1alpha1.CassandraCluster + NodePool *v1alpha1.CassandraClusterNodePool +} + +var _ controllers.Action = &UpdateVersion{} + +func (a *UpdateVersion) Name() string { + return "UpdateVersion" +} + +func (a *UpdateVersion) Execute(s *controllers.State) error { + baseSet := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool) + existingSet, err := s.StatefulSetLister.StatefulSets(baseSet.Namespace).Get(baseSet.Name) + if err != nil { + return errors.Wrap(err, "unable to get statefulset") + } + newImage := baseSet.Spec.Template.Spec.Containers[0].Image + oldImage := existingSet.Spec.Template.Spec.Containers[0].Image + if newImage == oldImage { + glog.V(4).Infof( + "StatefulSet %q already has the desired image %q", + existingSet.Name, newImage, + ) + return nil + } + glog.V(4).Infof( + "Replacing StatefulSet %q image %q with %q", + existingSet.Name, oldImage, newImage, + ) + newSet := existingSet.DeepCopy() + newSet.Spec.Template.Spec.Containers[0].Image = newImage + _, err = s.Clientset.AppsV1beta1().StatefulSets(newSet.Namespace).Update(newSet) + if err != nil { + return errors.Wrap(err, "unable to update statefulset") + } + s.Recorder.Eventf( + a.Cluster, + corev1.EventTypeNormal, + a.Name(), + "UpdateVersion: NodePool=%q, Version=%q, Image=%q", + a.NodePool.Name, a.Cluster.Spec.Version, newImage, + ) + return nil +} diff --git a/pkg/controllers/cassandra/actions/update_version_test.go b/pkg/controllers/cassandra/actions/update_version_test.go new file mode 100644 index 000000000..71827415b --- /dev/null +++ b/pkg/controllers/cassandra/actions/update_version_test.go @@ -0,0 +1,162 @@ +package actions_test + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + "github.com/jetstack/navigator/internal/test/util/generate" + "github.com/jetstack/navigator/pkg/cassandra/version" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" +) + +func TestUpdateVersion(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster generate.CassandraClusterConfig + nodePool generate.CassandraClusterNodePoolConfig + expectedStatefulSet *generate.StatefulSetConfig + expectedErr bool + mutator func(*framework.StateFixture) + } + tests := map[string]testT{ + "Error if StatefulSet not listed": { + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedErr: true, + }, + "Error if clientset.Update fails (e.g. listed but not found)": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedErr: true, + mutator: func(f *framework.StateFixture) { + err := f.KubeClient(). + AppsV1beta1(). + StatefulSets("ns1"). + Delete("cass-cluster1-pool1", &metav1.DeleteOptions{}) + if err != nil { + f.T.Fatal(err) + } + }, + }, + "Idempotent: No error if the image already matches the actual image": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + Version: *version.New("3.11.2"), + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + expectedErr: false, + }, + "The image is updated": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.1", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + Version: *version.New("3.11.2"), + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + }, + } + + for name, test := range tests { + t.Run( + name, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + if test.mutator != nil { + test.mutator(fixture) + } + a := &actions.UpdateVersion{ + Cluster: generate.CassandraCluster(test.cluster), + NodePool: generate.CassandraClusterNodePool(test.nodePool), + } + err := a.Execute(state) + if err != nil { + t.Logf("The error returned by Execute was: %s", err) + } + if !test.expectedErr && err != nil { + t.Errorf("Unexpected error: %s", err) + } + if test.expectedErr && err == nil { + t.Errorf("Expected an error") + } + if test.expectedStatefulSet != nil { + actualStatefulSet, err := fixture.KubeClient(). + AppsV1beta1(). + StatefulSets(test.expectedStatefulSet.Namespace). + Get(test.expectedStatefulSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error retrieving statefulset: %v", err) + } + actualImage := actualStatefulSet.Spec.Template.Spec.Containers[0].Image + if test.expectedStatefulSet.Image != actualImage { + t.Errorf( + "Unexpected image. Expected: %s. Actual: %s", + test.expectedStatefulSet.Image, actualImage, + ) + } + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/cassandra.go b/pkg/controllers/cassandra/cassandra.go index 43d177644..c154b8f2a 100644 --- a/pkg/controllers/cassandra/cassandra.go +++ b/pkg/controllers/cassandra/cassandra.go @@ -95,6 +95,12 @@ func NewCassandra( WorkFunc: cc.handleObject, }, ) + // An event handler to trigger status updates when pilots change + pilots.Informer().AddEventHandler( + &controllers.BlockingEventHandler{ + WorkFunc: cc.handleObject, + }, + ) cc.cassLister = cassClusters.Lister() cc.statefulSetLister = statefulSets.Lister() cc.cassListerSynced = cassClusters.Informer().HasSynced diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index 2b8bcc92b..61f190424 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -2,7 +2,9 @@ package cassandra import ( "github.com/golang/glog" + "github.com/pkg/errors" apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" @@ -172,6 +174,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro a := NextAction(c) if a != nil { + glog.V(4).Infof("Executing action: %#v", a) err = a.Execute(e.state) if err != nil { e.recorder.Eventf( @@ -181,10 +184,9 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSync, err, ) - return err + return errors.Wrap(err, "failure while executing action") } } - e.recorder.Event( c, apiv1.EventTypeNormal, @@ -206,11 +208,40 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { } for _, np := range c.Spec.NodePools { nps := c.Status.NodePools[np.Name] - if np.Replicas > nps.ReadyReplicas { + switch { + case np.Replicas == nps.ReadyReplicas: + continue + case np.Replicas > nps.ReadyReplicas: return &actions.ScaleOut{ Cluster: c, NodePool: &np, } + default: + glog.Errorf( + "Unsupported scale change on NodePool %q from %d to %d", + np.Name, nps.ReadyReplicas, np.Replicas, + ) + return nil + } + } + for _, np := range c.Spec.NodePools { + nps := c.Status.NodePools[np.Name] + if nps.Version == nil { + return nil + } + if c.Spec.Version.LessThan(nps.Version) { + glog.Error("Version downgrades are not supported") + return nil + } + if nps.Version.Major != c.Spec.Version.Major { + glog.Error("Major version upgrades are not supported") + return nil + } + if nps.Version.LessThan(&c.Spec.Version) { + return &actions.UpdateVersion{ + Cluster: c, + NodePool: &np, + } } } return nil diff --git a/pkg/controllers/cassandra/cluster_control_test.go b/pkg/controllers/cassandra/cluster_control_test.go index 13b375f03..14d6cff8d 100644 --- a/pkg/controllers/cassandra/cluster_control_test.go +++ b/pkg/controllers/cassandra/cluster_control_test.go @@ -1,60 +1,27 @@ package cassandra_test import ( - "fmt" "math/rand" "reflect" - "strings" "testing" "testing/quick" + "github.com/kr/pretty" + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers/cassandra" "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" ) -func CassandraClusterSummary(c *v1alpha1.CassandraCluster) string { - return fmt.Sprintf( - "%s/%s {Spec: %s, Status: %s}", - c.Namespace, c.Name, - CassandraClusterSpecSummary(c), - CassandraClusterStatusSummary(c), - ) -} - -func CassandraClusterSpecSummary(c *v1alpha1.CassandraCluster) string { - nodepools := make([]string, len(c.Spec.NodePools)) - for i, np := range c.Spec.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d", np.Name, np.Replicas) - } - return fmt.Sprintf( - "{nodepools: %s}", - strings.Join(nodepools, ", "), - ) -} - -func CassandraClusterStatusSummary(c *v1alpha1.CassandraCluster) string { - nodepools := make([]string, len(c.Status.NodePools)) - i := 0 - for title, nps := range c.Status.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d", title, nps.ReadyReplicas) - i++ - } - return fmt.Sprintf( - "{nodepools: %s}", strings.Join(nodepools, ", "), - ) -} - func TestNextAction(t *testing.T) { - f := func(c *v1alpha1.CassandraCluster) bool { - t.Log(CassandraClusterSummary(c)) + f := func(c *v1alpha1.CassandraCluster) (ret bool) { + defer func() { + if !ret { + t.Logf(pretty.Sprint(c)) + } + }() a := cassandra.NextAction(c) - if a != nil { - t.Log("Action:", a.Name()) - } else { - t.Log("No action") - } switch action := a.(type) { case *actions.CreateNodePool: _, found := c.Status.NodePools[action.NodePool.Name] @@ -72,16 +39,38 @@ func TestNextAction(t *testing.T) { t.Errorf("Unexpected attempt to scale up a nodepool with >= ready replicas") return false } + if nps.Version == nil { + t.Errorf("unexpected scaleout before version is known") + return false + } + case *actions.UpdateVersion: + nps, found := c.Status.NodePools[action.NodePool.Name] + if !found { + t.Errorf("Unexpected UpdateVersion before status reported") + return false + } + if nps.Version == nil { + t.Errorf("Unexpected UpdateVersion before version reported") + return false + } + if nps.Version.Major != c.Spec.Version.Major { + t.Errorf("Unexpected UpdateVersion for major version change") + return false + } + if nps.ReadyReplicas != action.NodePool.Replicas { + t.Errorf("Unexpected UpdateVersion before scale") + return false + } } return true } config := &quick.Config{ - MaxCount: 100, + MaxCount: 1000, Values: func(values []reflect.Value, rnd *rand.Rand) { cluster := &v1alpha1.CassandraCluster{} cluster.SetName("cluster1") cluster.SetNamespace("ns1") - casstesting.FuzzCassandraClusterNodePools(cluster, rnd, 0) + casstesting.FuzzCassandraCluster(cluster, rnd, 0) values[0] = reflect.ValueOf(cluster) }, } diff --git a/pkg/controllers/cassandra/nodepool/resource.go b/pkg/controllers/cassandra/nodepool/resource.go index b61eb7481..106b1f707 100644 --- a/pkg/controllers/cassandra/nodepool/resource.go +++ b/pkg/controllers/cassandra/nodepool/resource.go @@ -37,7 +37,7 @@ func StatefulSetForCluster( statefulSetName := util.NodePoolResourceName(cluster, np) nodePoolLabels := util.NodePoolLabels(cluster, np.Name) - image := cassImageToUse(&cluster.Spec) + image := CassImageToUse(&cluster.Spec) set := &apps.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/cassandra/nodepool/util.go b/pkg/controllers/cassandra/nodepool/util.go index 70b8246fe..4c4f1f8ff 100644 --- a/pkg/controllers/cassandra/nodepool/util.go +++ b/pkg/controllers/cassandra/nodepool/util.go @@ -7,7 +7,7 @@ import ( "github.com/jetstack/navigator/pkg/cassandra/version" ) -func cassImageToUse(spec *v1alpha1.CassandraClusterSpec) *v1alpha1.ImageSpec { +func CassImageToUse(spec *v1alpha1.CassandraClusterSpec) *v1alpha1.ImageSpec { if spec.Image == nil { return defaultCassandraImageForVersion(spec.Version) } diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go index 418d4762f..41c01cc43 100644 --- a/pkg/controllers/cassandra/pilot/pilot.go +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -9,17 +9,16 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" + "github.com/golang/glog" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/cassandra/version" navigator "github.com/jetstack/navigator/pkg/client/clientset/versioned" navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" ) -const ( - HashAnnotationKey = "navigator.jetstack.io/cassandra-pilot-hash" -) - type Interface interface { Sync(*v1alpha1.CassandraCluster) error } @@ -105,17 +104,65 @@ func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error { return err } +func (c *pilotControl) updateDiscoveredVersions(cluster *v1alpha1.CassandraCluster) error { + glog.V(4).Infof("updateDiscoveredVersions for cluster: %s", cluster.Name) + selector, err := util.SelectorForCluster(cluster) + if err != nil { + return err + } + pilots, err := c.pilots.List(selector) + if err != nil { + return err + } + if len(pilots) < 1 { + glog.V(4).Infof("No pilots found matching selector: %s", selector) + } + for _, pilot := range pilots { + nodePoolNameForPilot, nodePoolNameFound := pilot.Labels[util.NodePoolNameLabelKey] + if !nodePoolNameFound { + glog.Warningf("Skipping pilot without NodePoolNameLabelKey: %s", pilot.Name) + continue + } + nodePoolStatus := cluster.Status.NodePools[nodePoolNameForPilot] + switch { + case pilot.Status.Cassandra == nil: + glog.V(4).Infof( + "Pilot %s/%s has no status. Setting nodepool version to nil", + pilot.Namespace, pilot.Name, + ) + nodePoolStatus.Version = nil + case pilot.Status.Cassandra.Version == nil: + glog.V(4).Infof( + "Pilot %s/%s has not reported its version. Setting nodepool version to nil", + pilot.Namespace, pilot.Name, + ) + nodePoolStatus.Version = nil + case nodePoolStatus.Version == nil: + nodePoolStatus.Version = nil + case pilot.Status.Cassandra.Version.LessThan(nodePoolStatus.Version): + glog.V(4).Infof( + "Found lower pilot version: %s, %s", + nodePoolNameForPilot, pilotVersionStatus, + ) + nodePoolStatus.Version = pilotVersionStatus + } + cluster.Status.NodePools[nodePoolNameForPilot] = nodePoolStatus + } + return nil +} + func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { err := c.syncPilots(cluster) if err != nil { return err } // TODO: Housekeeping. Remove pilots that don't have a corresponding pod. - return nil + + return c.updateDiscoveredVersions(cluster) } func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot { - return &v1alpha1.Pilot{ + o := &v1alpha1.Pilot{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, Namespace: pod.Namespace, @@ -123,4 +170,67 @@ func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1. OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, }, } + o.Labels[util.NodePoolNameLabelKey] = pod.Labels[util.NodePoolNameLabelKey] + return o +} + +func UpdateLabels( + o metav1.Object, + newLabels map[string]string, +) { + labels := o.GetLabels() + if labels == nil { + labels = map[string]string{} + } + for key, val := range newLabels { + labels[key] = val + } + o.SetLabels(labels) +} + +type PilotBuilder struct { + pilot *v1alpha1.Pilot +} + +func NewPilotBuilder() *PilotBuilder { + return &PilotBuilder{ + pilot: &v1alpha1.Pilot{}, + } +} + +func (pb *PilotBuilder) ForCluster(cluster metav1.Object) *PilotBuilder { + UpdateLabels(pb.pilot, util.ClusterLabels(cluster)) + pb.pilot.SetNamespace(cluster.GetNamespace()) + pb.pilot.SetOwnerReferences( + append( + pb.pilot.GetOwnerReferences(), + util.NewControllerRef(cluster), + ), + ) + return pb +} + +func (pb *PilotBuilder) ForNodePool(np *v1alpha1.CassandraClusterNodePool) *PilotBuilder { + UpdateLabels( + pb.pilot, + map[string]string{ + util.NodePoolNameLabelKey: np.Name, + }, + ) + return pb +} + +func (pb *PilotBuilder) WithCassandraStatus() *PilotBuilder { + pb.pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} + return pb +} + +func (pb *PilotBuilder) WithDiscoveredCassandraVersion(v string) *PilotBuilder { + pb.WithCassandraStatus() + pb.pilot.Status.Cassandra.Version = version.New(v) + return pb +} + +func (pb *PilotBuilder) Build() *v1alpha1.Pilot { + return pb.pilot } diff --git a/pkg/controllers/cassandra/pilot/pilot_test.go b/pkg/controllers/cassandra/pilot/pilot_test.go index 1363e4ef5..20b4d5517 100644 --- a/pkg/controllers/cassandra/pilot/pilot_test.go +++ b/pkg/controllers/cassandra/pilot/pilot_test.go @@ -1,6 +1,7 @@ package pilot_test import ( + "reflect" "testing" "k8s.io/api/core/v1" @@ -11,6 +12,11 @@ import ( "github.com/jetstack/navigator/internal/test/unit/framework" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" + + "github.com/kr/pretty" + + "github.com/jetstack/navigator/pkg/cassandra/version" + "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" @@ -139,3 +145,127 @@ func TestPilotSync(t *testing.T) { ) } } + +func AssertClusterEqual(t *testing.T, c1, c2 *v1alpha1.CassandraCluster) { + if !reflect.DeepEqual(c1, c2) { + t.Errorf("Clusters are not equal: %s", pretty.Diff(c1, c2)) + } +} + +func TestStatusUpdate(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster *v1alpha1.CassandraCluster + assertions func(t *testing.T, original, updated *v1alpha1.CassandraCluster) + expectErr bool + } + cluster := casstesting.ClusterForTest() + tests := map[string]testT{ + "no matching pilots": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder().Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "nil cassandra status": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder().ForCluster(cluster).Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "nil cassandra version": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + WithCassandraStatus(). + Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "missing nodepool label": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "set version if missing": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + ForNodePool(&cluster.Spec.NodePools[0]). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: func(t *testing.T, inCluster, outCluster *v1alpha1.CassandraCluster) { + expectedVersion := version.New("3.11.2") + actualVersion := outCluster.Status.NodePools["RingNodes"].Version + if !expectedVersion.Equal(actualVersion) { + t.Errorf("Version mismatch. Expected %s != %s", expectedVersion, actualVersion) + } + }, + }, + "set version if lower": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + ForNodePool(&cluster.Spec.NodePools[0]). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: func(t *testing.T, inCluster, outCluster *v1alpha1.CassandraCluster) { + expectedVersion := version.New("3.11.2") + actualVersion := outCluster.Status.NodePools["RingNodes"].Version + if !expectedVersion.Equal(actualVersion) { + t.Errorf("Version mismatch. Expected %s != %s", expectedVersion, actualVersion) + } + }, + }, + } + + for title, test := range tests { + t.Run( + title, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + c := pilot.NewControl( + state.NavigatorClientset, + state.PilotLister, + state.PodLister, + state.StatefulSetLister, + state.Recorder, + ) + cluster = test.cluster.DeepCopy() + err := c.Sync(cluster) + if err != nil { + if !test.expectErr { + t.Errorf("Unexpected error: %s", err) + } + } else { + if test.expectErr { + t.Error("Missing error") + } + } + if test.assertions != nil { + test.assertions(t, test.cluster, cluster) + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/testing/gen.go b/pkg/controllers/cassandra/testing/gen.go index eccdff9f0..3f036cb69 100644 --- a/pkg/controllers/cassandra/testing/gen.go +++ b/pkg/controllers/cassandra/testing/gen.go @@ -5,8 +5,34 @@ import ( "math/rand" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/cassandra/version" ) +var versions = []*version.Version{ + version.New("2.0.0"), + version.New("3.11"), + version.New("3.11.1"), + version.New("3.11.2"), + version.New("4.0.0"), +} + +func FuzzCassandraCluster(cluster *v1alpha1.CassandraCluster, rand *rand.Rand, size int) { + cluster.Spec.Version = *versions[rand.Intn(len(versions))] + FuzzCassandraClusterNodePools(cluster, rand, size) + // 20% chance of patch upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpPatch() + } + // 20% chance of minor upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpMinor() + } + // 20% chance of major upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpMajor() + } +} + func FuzzCassandraNodePool(np *v1alpha1.CassandraClusterNodePool, rand *rand.Rand, size int) { np.Replicas = rand.Int31n(5) } @@ -24,8 +50,17 @@ func FuzzCassandraClusterNodePools(cluster *v1alpha1.CassandraCluster, rand *ran } FuzzCassandraNodePool(&np, rand, size) nps := v1alpha1.CassandraClusterNodePoolStatus{ + Version: version.New(cluster.Spec.Version.String()), ReadyReplicas: np.Replicas, } + // 20% chance of too new version + if rand.Intn(4) == 0 { + nps.Version = nps.Version.BumpMajor() + } + // 20% chance of unreported version + if rand.Intn(4) == 0 { + nps.Version = nil + } // 20% chance of ScaleOut if rand.Intn(4) == 0 { np.Replicas++ diff --git a/pkg/controllers/cassandra/util/util.go b/pkg/controllers/cassandra/util/util.go index 4001ac8b8..522386d32 100644 --- a/pkg/controllers/cassandra/util/util.go +++ b/pkg/controllers/cassandra/util/util.go @@ -22,7 +22,7 @@ const ( NodePoolNameLabelKey = "navigator.jetstack.io/cassandra-node-pool-name" ) -func NewControllerRef(c *v1alpha1.CassandraCluster) metav1.OwnerReference { +func NewControllerRef(c metav1.Object) metav1.OwnerReference { return *metav1.NewControllerRef(c, schema.GroupVersionKind{ Group: navigator.GroupName, Version: "v1alpha1", @@ -71,10 +71,10 @@ func PilotRBACRoleName(c *v1alpha1.CassandraCluster) string { return fmt.Sprintf("%s-pilot", ResourceBaseName(c)) } -func ClusterLabels(c *v1alpha1.CassandraCluster) map[string]string { +func ClusterLabels(c metav1.Object) map[string]string { return map[string]string{ "app": "cassandracluster", - ClusterNameLabelKey: c.Name, + ClusterNameLabelKey: c.GetName(), } } diff --git a/pkg/pilot/cassandra/v3/pilot.go b/pkg/pilot/cassandra/v3/pilot.go index 2123b307b..b78c67380 100644 --- a/pkg/pilot/cassandra/v3/pilot.go +++ b/pkg/pilot/cassandra/v3/pilot.go @@ -10,6 +10,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/golang/glog" + "github.com/pkg/errors" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/cassandra/nodetool" @@ -85,11 +86,12 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { if pilot.Status.Cassandra == nil { pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} } - version, err := p.nodeTool.Version() - if err != nil { + if err == nil { + glog.V(4).Infof("Got Cassandra version: %s", version) + } else { + glog.Errorf("Error while getting Cassandra version: %s", err) pilot.Status.Cassandra.Version = nil - glog.Errorf("error while getting Cassandra version: %s", err) } pilot.Status.Cassandra.Version = version return nil @@ -98,7 +100,7 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { func localNodeUpAndNormal(nodeTool nodetool.Interface) error { nodes, err := nodeTool.Status() if err != nil { - return err + return errors.Wrap(err, "unable to get cluster status") } localNode := nodes.LocalNode() if localNode == nil { diff --git a/pkg/pilot/genericpilot/controller.go b/pkg/pilot/genericpilot/controller.go index 50c2bad55..cb3828109 100644 --- a/pkg/pilot/genericpilot/controller.go +++ b/pkg/pilot/genericpilot/controller.go @@ -2,6 +2,7 @@ package genericpilot import ( "github.com/golang/glog" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -125,7 +126,7 @@ func (g *GenericPilot) updatePilotStatus(pilot *v1alpha1.Pilot) error { // perform update in API _, err := g.client.NavigatorV1alpha1().Pilots(pilot.Namespace).UpdateStatus(pilot) - return err + return errors.Wrap(err, "unable to update pilot status") } func (g *GenericPilot) constructProcess(pilot *v1alpha1.Pilot) error {