From 5942ef2b416ad89bee0f95e3d52a1e6a7f6cd9e3 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 7 Mar 2018 15:49:35 +0000 Subject: [PATCH 01/21] Add a Cassandra upgrade mechanism * Add a NodePool.Status.Version attribute which has the lowest reported version from all the pilots in the pool. * Or nil, if the pilot failed to query its Cassandra database for its version. * Calculate an UpdateVersion action if the desired Cassandra version is higher than the version reported for a NodePool. * NodePools are upgraded one at a time. * Pods within a Nodepool are upgraded one at a time, using a rolling update strategy without any partitioning. * Upgrades are performed before any extra Nodepools are created and before nodepool scale out. Fixes: #257 --- hack/e2e.sh | 19 +++ pkg/apis/navigator/types.go | 1 + pkg/apis/navigator/v1alpha1/types.go | 3 +- .../v1alpha1/zz_generated.conversion.go | 2 + .../v1alpha1/zz_generated.deepcopy.go | 13 +- pkg/apis/navigator/zz_generated.deepcopy.go | 13 +- pkg/cassandra/version/version.go | 5 + .../cassandra/actions/update_version.go | 43 ++++++ pkg/controllers/cassandra/cassandra.go | 6 + pkg/controllers/cassandra/cluster_control.go | 11 ++ .../cassandra/nodepool/resource.go | 2 +- pkg/controllers/cassandra/nodepool/util.go | 2 +- pkg/controllers/cassandra/pilot/pilot.go | 114 ++++++++++++++- pkg/controllers/cassandra/pilot/pilot_test.go | 130 ++++++++++++++++++ pkg/controllers/cassandra/util/util.go | 6 +- pkg/pilot/cassandra/v3/pilot.go | 1 - 16 files changed, 357 insertions(+), 14 deletions(-) create mode 100644 pkg/controllers/cassandra/actions/update_version.go diff --git a/hack/e2e.sh b/hack/e2e.sh index bba2a9972..216c58694 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -310,6 +310,25 @@ function test_cassandracluster() { --debug \ --execute="INSERT INTO space1.testtable1(key, value) VALUES('testkey1', 'testvalue1')" + # Upgrade to newer patch version + export CASS_VERSION="3.11.2" + kubectl apply \ + --namespace "${namespace}" \ + --filename \ + <(envsubst \ + '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \ + < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") + + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get pilots \ + --output 'jsonpath={.items[0].status.cassandra.version}' + then + kubectl --namespace "${namespace}" get pilots -o yaml + fail_test "Pilots failed to report the expected version" + fi + # Delete the Cassandra pod and wait for the CQL service to become # unavailable (readiness probe fails) diff --git a/pkg/apis/navigator/types.go b/pkg/apis/navigator/types.go index 9f63052e7..cffb57533 100644 --- a/pkg/apis/navigator/types.go +++ b/pkg/apis/navigator/types.go @@ -51,6 +51,7 @@ type CassandraClusterStatus struct { type CassandraClusterNodePoolStatus struct { ReadyReplicas int32 + Version *version.Version } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index d09825d02..0513cc014 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -81,7 +81,8 @@ type CassandraClusterStatus struct { } type CassandraClusterNodePoolStatus struct { - ReadyReplicas int32 `json:"readyReplicas"` + ReadyReplicas int32 `json:"readyReplicas"` + Version *version.Version `json:"version"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go index a290cd478..8361f9d7f 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go @@ -202,6 +202,7 @@ func Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNode func autoConvert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClusterNodePoolStatus(in *CassandraClusterNodePoolStatus, out *navigator.CassandraClusterNodePoolStatus, s conversion.Scope) error { out.ReadyReplicas = in.ReadyReplicas + out.Version = (*version.Version)(unsafe.Pointer(in.Version)) return nil } @@ -212,6 +213,7 @@ func Convert_v1alpha1_CassandraClusterNodePoolStatus_To_navigator_CassandraClust func autoConvert_navigator_CassandraClusterNodePoolStatus_To_v1alpha1_CassandraClusterNodePoolStatus(in *navigator.CassandraClusterNodePoolStatus, out *CassandraClusterNodePoolStatus, s conversion.Scope) error { out.ReadyReplicas = in.ReadyReplicas + out.Version = (*version.Version)(unsafe.Pointer(in.Version)) return nil } diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go index 0f15c581b..e158198fa 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go @@ -116,6 +116,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) { *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + if *in == nil { + *out = nil + } else { + *out = new(version.Version) + **out = **in + } + } return } @@ -170,7 +179,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) { in, out := &in.NodePools, &out.NodePools *out = make(map[string]CassandraClusterNodePoolStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + newVal := new(CassandraClusterNodePoolStatus) + val.DeepCopyInto(newVal) + (*out)[key] = *newVal } } return diff --git a/pkg/apis/navigator/zz_generated.deepcopy.go b/pkg/apis/navigator/zz_generated.deepcopy.go index e187c402e..256e9661f 100644 --- a/pkg/apis/navigator/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/zz_generated.deepcopy.go @@ -116,6 +116,15 @@ func (in *CassandraClusterNodePool) DeepCopy() *CassandraClusterNodePool { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CassandraClusterNodePoolStatus) DeepCopyInto(out *CassandraClusterNodePoolStatus) { *out = *in + if in.Version != nil { + in, out := &in.Version, &out.Version + if *in == nil { + *out = nil + } else { + *out = new(version.Version) + **out = **in + } + } return } @@ -170,7 +179,9 @@ func (in *CassandraClusterStatus) DeepCopyInto(out *CassandraClusterStatus) { in, out := &in.NodePools, &out.NodePools *out = make(map[string]CassandraClusterNodePoolStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + newVal := new(CassandraClusterNodePoolStatus) + val.DeepCopyInto(newVal) + (*out)[key] = *newVal } } return diff --git a/pkg/cassandra/version/version.go b/pkg/cassandra/version/version.go index 330d9e48a..8b2c24a09 100644 --- a/pkg/cassandra/version/version.go +++ b/pkg/cassandra/version/version.go @@ -41,6 +41,11 @@ func (v *Version) Equal(versionB *Version) bool { return v.semver.Equal(versionB.semver) } +// TODO: Add tests for this +func (v *Version) LessThan(versionB *Version) bool { + return v.semver.LessThan(versionB.semver) +} + func (v *Version) UnmarshalJSON(data []byte) error { s, err := strconv.Unquote(string(data)) if err != nil { diff --git a/pkg/controllers/cassandra/actions/update_version.go b/pkg/controllers/cassandra/actions/update_version.go new file mode 100644 index 000000000..3df714daa --- /dev/null +++ b/pkg/controllers/cassandra/actions/update_version.go @@ -0,0 +1,43 @@ +package actions + +import ( + "fmt" + + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" + "github.com/jetstack/navigator/pkg/controllers/cassandra/util" +) + +type UpdateVersion struct { + Cluster *v1alpha1.CassandraCluster + NodePool *v1alpha1.CassandraClusterNodePool +} + +var _ controllers.Action = &UpdateVersion{} + +func (c *UpdateVersion) Name() string { + return "UpdateVersion" +} + +func (c *UpdateVersion) Execute(state *controllers.State) error { + statefulSetName := util.NodePoolResourceName(c.Cluster, c.NodePool) + statefulSet, err := state.StatefulSetLister.StatefulSets(c.Cluster.Namespace).Get(statefulSetName) + if err != nil { + return err + } + statefulSet = statefulSet.DeepCopy() + newImage := nodepool.CassImageToUse(&c.Cluster.Spec) + newImageString := fmt.Sprintf( + "%s:%s", + newImage.Repository, + newImage.Tag, + ) + oldImageString := statefulSet.Spec.Template.Spec.Containers[0].Image + if newImageString == oldImageString { + return nil + } + statefulSet.Spec.Template.Spec.Containers[0].Image = newImageString + _, err = state.Clientset.AppsV1beta1().StatefulSets(statefulSet.Namespace).Update(statefulSet) + return err +} diff --git a/pkg/controllers/cassandra/cassandra.go b/pkg/controllers/cassandra/cassandra.go index 43d177644..c154b8f2a 100644 --- a/pkg/controllers/cassandra/cassandra.go +++ b/pkg/controllers/cassandra/cassandra.go @@ -95,6 +95,12 @@ func NewCassandra( WorkFunc: cc.handleObject, }, ) + // An event handler to trigger status updates when pilots change + pilots.Informer().AddEventHandler( + &controllers.BlockingEventHandler{ + WorkFunc: cc.handleObject, + }, + ) cc.cassLister = cassClusters.Lister() cc.statefulSetLister = statefulSets.Lister() cc.cassListerSynced = cassClusters.Informer().HasSynced diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index 2b8bcc92b..fb841a95f 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -213,5 +213,16 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { } } } + for _, np := range c.Spec.NodePools { + nps := c.Status.NodePools[np.Name] + if nps.Version != nil { + if nps.Version.LessThan(&c.Spec.Version) { + return &actions.UpdateVersion{ + Cluster: c, + NodePool: &np, + } + } + } + } return nil } diff --git a/pkg/controllers/cassandra/nodepool/resource.go b/pkg/controllers/cassandra/nodepool/resource.go index b61eb7481..106b1f707 100644 --- a/pkg/controllers/cassandra/nodepool/resource.go +++ b/pkg/controllers/cassandra/nodepool/resource.go @@ -37,7 +37,7 @@ func StatefulSetForCluster( statefulSetName := util.NodePoolResourceName(cluster, np) nodePoolLabels := util.NodePoolLabels(cluster, np.Name) - image := cassImageToUse(&cluster.Spec) + image := CassImageToUse(&cluster.Spec) set := &apps.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/cassandra/nodepool/util.go b/pkg/controllers/cassandra/nodepool/util.go index 70b8246fe..4c4f1f8ff 100644 --- a/pkg/controllers/cassandra/nodepool/util.go +++ b/pkg/controllers/cassandra/nodepool/util.go @@ -7,7 +7,7 @@ import ( "github.com/jetstack/navigator/pkg/cassandra/version" ) -func cassImageToUse(spec *v1alpha1.CassandraClusterSpec) *v1alpha1.ImageSpec { +func CassImageToUse(spec *v1alpha1.CassandraClusterSpec) *v1alpha1.ImageSpec { if spec.Image == nil { return defaultCassandraImageForVersion(spec.Version) } diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go index 418d4762f..adc6fb085 100644 --- a/pkg/controllers/cassandra/pilot/pilot.go +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -9,17 +9,16 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" + "github.com/golang/glog" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/cassandra/version" navigator "github.com/jetstack/navigator/pkg/client/clientset/versioned" navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" ) -const ( - HashAnnotationKey = "navigator.jetstack.io/cassandra-pilot-hash" -) - type Interface interface { Sync(*v1alpha1.CassandraCluster) error } @@ -105,13 +104,57 @@ func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error { return err } +func (c *pilotControl) updateDiscoveredVersions(cluster *v1alpha1.CassandraCluster) error { + glog.V(4).Infof("updateDiscoveredVersions for cluster: %s", cluster.Name) + selector, err := util.SelectorForCluster(cluster) + if err != nil { + return err + } + pilots, err := c.pilots.List(selector) + if err != nil { + return err + } + if len(pilots) < 1 { + glog.V(4).Infof("No pilots found matching selector: %s", selector) + } + for _, pilot := range pilots { + if pilot.Status.Cassandra == nil { + glog.V(4).Infof("Skipping pilot with nil status: %s", pilot.Name) + continue + } + version := pilot.Status.Cassandra.Version + if version == nil { + glog.V(4).Infof("Skipping pilot with nil version: %s", pilot.Name) + continue + } + nodePoolNameForPilot, nodePoolNameFound := pilot.Labels[util.NodePoolNameLabelKey] + if !nodePoolNameFound { + glog.V(4).Infof("Skipping pilot without NodePoolNameLabelKey: %s", pilot.Name) + continue + } + if cluster.Status.NodePools == nil { + glog.V(4).Infof("Initialising Status.NodePools for: %s", cluster.Name) + cluster.Status.NodePools = map[string]v1alpha1.CassandraClusterNodePoolStatus{} + } + nodePoolStatus := cluster.Status.NodePools[nodePoolNameForPilot] + if nodePoolStatus.Version == nil || version.LessThan(nodePoolStatus.Version) { + glog.V(4).Infof("Found lower pilot version: %s, %s", nodePoolNameForPilot, version) + nodePoolStatus.Version = version + cluster.Status.NodePools[nodePoolNameForPilot] = nodePoolStatus + continue + } + } + return nil +} + func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { err := c.syncPilots(cluster) if err != nil { return err } // TODO: Housekeeping. Remove pilots that don't have a corresponding pod. - return nil + + return c.updateDiscoveredVersions(cluster) } func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot { @@ -124,3 +167,64 @@ func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1. }, } } + +func UpdateLabels( + o metav1.Object, + newLabels map[string]string, +) { + labels := o.GetLabels() + if labels == nil { + labels = map[string]string{} + } + for key, val := range newLabels { + labels[key] = val + } + o.SetLabels(labels) +} + +type PilotBuilder struct { + pilot *v1alpha1.Pilot +} + +func NewPilotBuilder() *PilotBuilder { + return &PilotBuilder{ + pilot: &v1alpha1.Pilot{}, + } +} + +func (pb *PilotBuilder) ForCluster(cluster metav1.Object) *PilotBuilder { + UpdateLabels(pb.pilot, util.ClusterLabels(cluster)) + pb.pilot.SetNamespace(cluster.GetNamespace()) + pb.pilot.SetOwnerReferences( + append( + pb.pilot.GetOwnerReferences(), + util.NewControllerRef(cluster), + ), + ) + return pb +} + +func (pb *PilotBuilder) ForNodePool(np *v1alpha1.CassandraClusterNodePool) *PilotBuilder { + UpdateLabels( + pb.pilot, + map[string]string{ + util.NodePoolNameLabelKey: np.Name, + }, + ) + return pb +} + +func (pb *PilotBuilder) WithCassandraStatus() *PilotBuilder { + pb.pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} + return pb +} + +func (pb *PilotBuilder) WithDiscoveredCassandraVersion(v string) *PilotBuilder { + pb.WithCassandraStatus() + pb.pilot.Status.Cassandra.Version = version.New(v) + return pb +} + +func (pb *PilotBuilder) Build() *v1alpha1.Pilot { + return pb.pilot +} diff --git a/pkg/controllers/cassandra/pilot/pilot_test.go b/pkg/controllers/cassandra/pilot/pilot_test.go index 1363e4ef5..20b4d5517 100644 --- a/pkg/controllers/cassandra/pilot/pilot_test.go +++ b/pkg/controllers/cassandra/pilot/pilot_test.go @@ -1,6 +1,7 @@ package pilot_test import ( + "reflect" "testing" "k8s.io/api/core/v1" @@ -11,6 +12,11 @@ import ( "github.com/jetstack/navigator/internal/test/unit/framework" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" + + "github.com/kr/pretty" + + "github.com/jetstack/navigator/pkg/cassandra/version" + "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" @@ -139,3 +145,127 @@ func TestPilotSync(t *testing.T) { ) } } + +func AssertClusterEqual(t *testing.T, c1, c2 *v1alpha1.CassandraCluster) { + if !reflect.DeepEqual(c1, c2) { + t.Errorf("Clusters are not equal: %s", pretty.Diff(c1, c2)) + } +} + +func TestStatusUpdate(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster *v1alpha1.CassandraCluster + assertions func(t *testing.T, original, updated *v1alpha1.CassandraCluster) + expectErr bool + } + cluster := casstesting.ClusterForTest() + tests := map[string]testT{ + "no matching pilots": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder().Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "nil cassandra status": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder().ForCluster(cluster).Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "nil cassandra version": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + WithCassandraStatus(). + Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "missing nodepool label": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: AssertClusterEqual, + }, + "set version if missing": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + ForNodePool(&cluster.Spec.NodePools[0]). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: func(t *testing.T, inCluster, outCluster *v1alpha1.CassandraCluster) { + expectedVersion := version.New("3.11.2") + actualVersion := outCluster.Status.NodePools["RingNodes"].Version + if !expectedVersion.Equal(actualVersion) { + t.Errorf("Version mismatch. Expected %s != %s", expectedVersion, actualVersion) + } + }, + }, + "set version if lower": { + navObjects: []runtime.Object{ + pilot.NewPilotBuilder(). + ForCluster(cluster). + ForNodePool(&cluster.Spec.NodePools[0]). + WithDiscoveredCassandraVersion("3.11.2"). + Build(), + }, + cluster: cluster, + assertions: func(t *testing.T, inCluster, outCluster *v1alpha1.CassandraCluster) { + expectedVersion := version.New("3.11.2") + actualVersion := outCluster.Status.NodePools["RingNodes"].Version + if !expectedVersion.Equal(actualVersion) { + t.Errorf("Version mismatch. Expected %s != %s", expectedVersion, actualVersion) + } + }, + }, + } + + for title, test := range tests { + t.Run( + title, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + c := pilot.NewControl( + state.NavigatorClientset, + state.PilotLister, + state.PodLister, + state.StatefulSetLister, + state.Recorder, + ) + cluster = test.cluster.DeepCopy() + err := c.Sync(cluster) + if err != nil { + if !test.expectErr { + t.Errorf("Unexpected error: %s", err) + } + } else { + if test.expectErr { + t.Error("Missing error") + } + } + if test.assertions != nil { + test.assertions(t, test.cluster, cluster) + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/util/util.go b/pkg/controllers/cassandra/util/util.go index 4001ac8b8..522386d32 100644 --- a/pkg/controllers/cassandra/util/util.go +++ b/pkg/controllers/cassandra/util/util.go @@ -22,7 +22,7 @@ const ( NodePoolNameLabelKey = "navigator.jetstack.io/cassandra-node-pool-name" ) -func NewControllerRef(c *v1alpha1.CassandraCluster) metav1.OwnerReference { +func NewControllerRef(c metav1.Object) metav1.OwnerReference { return *metav1.NewControllerRef(c, schema.GroupVersionKind{ Group: navigator.GroupName, Version: "v1alpha1", @@ -71,10 +71,10 @@ func PilotRBACRoleName(c *v1alpha1.CassandraCluster) string { return fmt.Sprintf("%s-pilot", ResourceBaseName(c)) } -func ClusterLabels(c *v1alpha1.CassandraCluster) map[string]string { +func ClusterLabels(c metav1.Object) map[string]string { return map[string]string{ "app": "cassandracluster", - ClusterNameLabelKey: c.Name, + ClusterNameLabelKey: c.GetName(), } } diff --git a/pkg/pilot/cassandra/v3/pilot.go b/pkg/pilot/cassandra/v3/pilot.go index 2123b307b..62bf806a8 100644 --- a/pkg/pilot/cassandra/v3/pilot.go +++ b/pkg/pilot/cassandra/v3/pilot.go @@ -85,7 +85,6 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { if pilot.Status.Cassandra == nil { pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} } - version, err := p.nodeTool.Version() if err != nil { pilot.Status.Cassandra.Version = nil From 19a3571eee5197fd391231eb6ca74d93939276b8 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 11:58:33 +0100 Subject: [PATCH 02/21] Add documentation --- docs/cassandra.rst | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/cassandra.rst b/docs/cassandra.rst index bf8e038e7..a5d6c77ea 100644 --- a/docs/cassandra.rst +++ b/docs/cassandra.rst @@ -191,6 +191,7 @@ Supported Configuration Changes Navigator supports the following changes to a Cassandra cluster: * :ref:`create-cluster-cassandra`: Add all initially configured node pools and nodes. + * :ref:`minor-upgrade-cassandra`: Trigger a rolling upgrade of Cassandra nodes by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``. * :ref:`scale-out-cassandra`: Increase ``CassandraCluster.Spec.NodePools[0].Replicas`` to add more C* nodes to a ``nodepool``. Navigator does not currently support any other changes to the Cassandra cluster configuration. @@ -200,7 +201,6 @@ Unsupported Configuration Changes The following configuration changes are not currently supported but will be supported in the near future: - * Minor Upgrade: Trigger a rolling Cassandra upgrade by increasing the minor and / or patch components of ``CassandraCluster.Spec.Version``. * Scale In: Decrease ``CassandraCluster.Spec.NodePools[0].Replicas`` to remove C* nodes from a ``nodepool``. The following configuration changes are not currently supported: @@ -220,6 +220,19 @@ in order of ``NodePool`` and according to the process described in :ref:`scale-o The order of node creation is determined by the order of the entries in the ``CassandraCluster.Spec.NodePools`` list. You can look at ``CassandraCluster.Status.NodePools`` to see the current state. +.. _minor-upgrade-cassandra: + +Minor Upgrade +~~~~~~~~~~~~~ + +If you increment the minor or patch number in ``CassandraCluster.Spec.Version``, Navigator will trigger a rolling update of the existing C* nodes. + +C* nodes are upgraded serially, in order of NodePool and Pod ordinal, starting with the pod with the highest ordinal in the first NodePool. + +`StatefulSet Rolling Updates `_ describes the update process in more detail. + +.. note:: Major version upgrades are not yet supported. + .. _scale-out-cassandra: Scale Out From 9986345522b5be013e78d0d61d44c37409cba29b Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 13:58:53 +0100 Subject: [PATCH 03/21] Attempt to get more debug output --- hack/testdata/values.yaml | 1 + pkg/controllers/cassandra/actions/scaleout.go | 5 ++- .../cassandra/actions/update_version.go | 44 ++++++++++--------- pkg/controllers/cassandra/cluster_control.go | 6 ++- pkg/controllers/cassandra/pilot/pilot.go | 6 +-- pkg/pilot/cassandra/v3/pilot.go | 9 ++-- pkg/pilot/genericpilot/controller.go | 3 +- 7 files changed, 41 insertions(+), 33 deletions(-) diff --git a/hack/testdata/values.yaml b/hack/testdata/values.yaml index 2844eed1b..917270e01 100644 --- a/hack/testdata/values.yaml +++ b/hack/testdata/values.yaml @@ -31,3 +31,4 @@ controller: repository: quay.io/jetstack/navigator-controller tag: build pullPolicy: Never + logLevel: 4 diff --git a/pkg/controllers/cassandra/actions/scaleout.go b/pkg/controllers/cassandra/actions/scaleout.go index c095be9ac..d394eb980 100644 --- a/pkg/controllers/cassandra/actions/scaleout.go +++ b/pkg/controllers/cassandra/actions/scaleout.go @@ -4,6 +4,7 @@ import ( corev1 "k8s.io/api/core/v1" "github.com/golang/glog" + "github.com/pkg/errors" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" @@ -27,7 +28,7 @@ func (a *ScaleOut) Execute(s *controllers.State) error { existingSet, err := s.StatefulSetLister. StatefulSets(baseSet.Namespace).Get(baseSet.Name) if err != nil { - return err + return errors.Wrap(err, "unable to get existing statefulset") } newSet := existingSet.DeepCopy() if *existingSet.Spec.Replicas == a.NodePool.Replicas { @@ -46,7 +47,7 @@ func (a *ScaleOut) Execute(s *controllers.State) error { _, err = s.Clientset.AppsV1beta1(). StatefulSets(newSet.Namespace).Update(newSet) if err != nil { - return err + return errors.Wrap(err, "unable to update statefulset") } s.Recorder.Eventf( a.Cluster, diff --git a/pkg/controllers/cassandra/actions/update_version.go b/pkg/controllers/cassandra/actions/update_version.go index 3df714daa..07657f8d1 100644 --- a/pkg/controllers/cassandra/actions/update_version.go +++ b/pkg/controllers/cassandra/actions/update_version.go @@ -1,12 +1,11 @@ package actions import ( - "fmt" - "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" - "github.com/jetstack/navigator/pkg/controllers/cassandra/util" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" ) type UpdateVersion struct { @@ -16,28 +15,33 @@ type UpdateVersion struct { var _ controllers.Action = &UpdateVersion{} -func (c *UpdateVersion) Name() string { +func (a *UpdateVersion) Name() string { return "UpdateVersion" } -func (c *UpdateVersion) Execute(state *controllers.State) error { - statefulSetName := util.NodePoolResourceName(c.Cluster, c.NodePool) - statefulSet, err := state.StatefulSetLister.StatefulSets(c.Cluster.Namespace).Get(statefulSetName) +func (a *UpdateVersion) Execute(s *controllers.State) error { + baseSet := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool) + existingSet, err := s.StatefulSetLister.StatefulSets(baseSet.Namespace).Get(baseSet.Name) if err != nil { - return err + return errors.Wrap(err, "unable to get statefulset") } - statefulSet = statefulSet.DeepCopy() - newImage := nodepool.CassImageToUse(&c.Cluster.Spec) - newImageString := fmt.Sprintf( - "%s:%s", - newImage.Repository, - newImage.Tag, - ) - oldImageString := statefulSet.Spec.Template.Spec.Containers[0].Image - if newImageString == oldImageString { + newImage := baseSet.Spec.Template.Spec.Containers[0].Image + oldImage := existingSet.Spec.Template.Spec.Containers[0].Image + if newImage == oldImage { return nil } - statefulSet.Spec.Template.Spec.Containers[0].Image = newImageString - _, err = state.Clientset.AppsV1beta1().StatefulSets(statefulSet.Namespace).Update(statefulSet) - return err + newSet := existingSet.DeepCopy() + newSet.Spec.Template.Spec.Containers[0].Image = newImage + _, err = s.Clientset.AppsV1beta1().StatefulSets(newSet.Namespace).Update(newSet) + if err != nil { + return errors.Wrap(err, "unable to update statefulset") + } + s.Recorder.Eventf( + a.Cluster, + corev1.EventTypeNormal, + a.Name(), + "UpdateVersion: NodePool=%q, Version=%q, Image=%q", + a.NodePool.Name, a.Cluster.Spec.Version, newImage, + ) + return nil } diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index fb841a95f..69e0f3c76 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -2,7 +2,9 @@ package cassandra import ( "github.com/golang/glog" + "github.com/pkg/errors" apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" @@ -172,6 +174,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro a := NextAction(c) if a != nil { + glog.V(4).Infof("Executing action: %#v") err = a.Execute(e.state) if err != nil { e.recorder.Eventf( @@ -181,10 +184,9 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSync, err, ) - return err + return errors.Wrap(err, "failure while executing action") } } - e.recorder.Event( c, apiv1.EventTypeNormal, diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go index adc6fb085..a3ed807ca 100644 --- a/pkg/controllers/cassandra/pilot/pilot.go +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -122,11 +122,6 @@ func (c *pilotControl) updateDiscoveredVersions(cluster *v1alpha1.CassandraClust glog.V(4).Infof("Skipping pilot with nil status: %s", pilot.Name) continue } - version := pilot.Status.Cassandra.Version - if version == nil { - glog.V(4).Infof("Skipping pilot with nil version: %s", pilot.Name) - continue - } nodePoolNameForPilot, nodePoolNameFound := pilot.Labels[util.NodePoolNameLabelKey] if !nodePoolNameFound { glog.V(4).Infof("Skipping pilot without NodePoolNameLabelKey: %s", pilot.Name) @@ -136,6 +131,7 @@ func (c *pilotControl) updateDiscoveredVersions(cluster *v1alpha1.CassandraClust glog.V(4).Infof("Initialising Status.NodePools for: %s", cluster.Name) cluster.Status.NodePools = map[string]v1alpha1.CassandraClusterNodePoolStatus{} } + version := pilot.Status.Cassandra.Version nodePoolStatus := cluster.Status.NodePools[nodePoolNameForPilot] if nodePoolStatus.Version == nil || version.LessThan(nodePoolStatus.Version) { glog.V(4).Infof("Found lower pilot version: %s, %s", nodePoolNameForPilot, version) diff --git a/pkg/pilot/cassandra/v3/pilot.go b/pkg/pilot/cassandra/v3/pilot.go index 62bf806a8..b78c67380 100644 --- a/pkg/pilot/cassandra/v3/pilot.go +++ b/pkg/pilot/cassandra/v3/pilot.go @@ -10,6 +10,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/golang/glog" + "github.com/pkg/errors" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/cassandra/nodetool" @@ -86,9 +87,11 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { pilot.Status.Cassandra = &v1alpha1.CassandraPilotStatus{} } version, err := p.nodeTool.Version() - if err != nil { + if err == nil { + glog.V(4).Infof("Got Cassandra version: %s", version) + } else { + glog.Errorf("Error while getting Cassandra version: %s", err) pilot.Status.Cassandra.Version = nil - glog.Errorf("error while getting Cassandra version: %s", err) } pilot.Status.Cassandra.Version = version return nil @@ -97,7 +100,7 @@ func (p *Pilot) syncFunc(pilot *v1alpha1.Pilot) error { func localNodeUpAndNormal(nodeTool nodetool.Interface) error { nodes, err := nodeTool.Status() if err != nil { - return err + return errors.Wrap(err, "unable to get cluster status") } localNode := nodes.LocalNode() if localNode == nil { diff --git a/pkg/pilot/genericpilot/controller.go b/pkg/pilot/genericpilot/controller.go index 50c2bad55..cb3828109 100644 --- a/pkg/pilot/genericpilot/controller.go +++ b/pkg/pilot/genericpilot/controller.go @@ -2,6 +2,7 @@ package genericpilot import ( "github.com/golang/glog" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -125,7 +126,7 @@ func (g *GenericPilot) updatePilotStatus(pilot *v1alpha1.Pilot) error { // perform update in API _, err := g.client.NavigatorV1alpha1().Pilots(pilot.Namespace).UpdateStatus(pilot) - return err + return errors.Wrap(err, "unable to update pilot status") } func (g *GenericPilot) constructProcess(pilot *v1alpha1.Pilot) error { From f66a620d0b063d00fbf161c20fbbd1da0bb964b2 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 13:59:10 +0100 Subject: [PATCH 04/21] ./hack/update-lint.sh --- pkg/controllers/cassandra/actions/update_version.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/cassandra/actions/update_version.go b/pkg/controllers/cassandra/actions/update_version.go index 07657f8d1..a641703a2 100644 --- a/pkg/controllers/cassandra/actions/update_version.go +++ b/pkg/controllers/cassandra/actions/update_version.go @@ -1,11 +1,12 @@ package actions import ( + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" ) type UpdateVersion struct { From d737e77f30e7b0f04a7910eb5d97b023cfd6456b Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 14:01:27 +0100 Subject: [PATCH 05/21] dep ensure -no-vendor -v --- Gopkg.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gopkg.lock b/Gopkg.lock index c3e6c861f..2c5a06810 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -423,6 +423,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "a4c2d88d2135361c634b51caae45b70cf3fab630b766b68d53d3a3cde2a00db4" + inputs-digest = "e98c554fe619d67147695aed720852084b76d9957a3db2a04080f76db50ae477" solver-name = "gps-cdcl" solver-version = 1 From 585d596e4fabf6a8abe0331a801d59c4e6edb10b Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 14:05:26 +0100 Subject: [PATCH 06/21] Wrap errors in createnodepool too --- pkg/controllers/cassandra/actions/create_nodepool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/cassandra/actions/create_nodepool.go b/pkg/controllers/cassandra/actions/create_nodepool.go index 0f9cd0333..3aeadc409 100644 --- a/pkg/controllers/cassandra/actions/create_nodepool.go +++ b/pkg/controllers/cassandra/actions/create_nodepool.go @@ -4,6 +4,8 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" + "github.com/pkg/errors" + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" @@ -27,7 +29,7 @@ func (a *CreateNodePool) Execute(s *controllers.State) error { return nil } if err != nil { - return err + return errors.Wrap(err, "unable to create statefulset") } s.Recorder.Eventf( a.Cluster, From 49eaaf55ba7fbf5f13fda5bc6917faab95618303 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 14:29:26 +0100 Subject: [PATCH 07/21] test for an upgrade event --- hack/e2e.sh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/hack/e2e.sh b/hack/e2e.sh index 216c58694..c8b460bc5 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -319,6 +319,13 @@ function test_cassandracluster() { '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \ < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") + # The cluster is upgraded + if ! retry TIMEOUT=300 kube_event_exists "${namespace}" \ + "navigator-controller:CassandraCluster:Normal:UpdateVersion" + then + fail_test "An UpdateVersion event was not recorded" + fi + if ! retry TIMEOUT=300 \ stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ From c3b518c0fe446c9287ea9b1c3375ee30ebcf7540 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 15:00:28 +0100 Subject: [PATCH 08/21] Log the action. --- pkg/controllers/cassandra/cluster_control.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index 69e0f3c76..a564740c2 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -174,7 +174,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro a := NextAction(c) if a != nil { - glog.V(4).Infof("Executing action: %#v") + glog.V(4).Infof("Executing action: %#v", a) err = a.Execute(e.state) if err != nil { e.recorder.Eventf( From 3142361d5308241b5ff18b0d3dba4c7a493e1301 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 15:00:42 +0100 Subject: [PATCH 09/21] Set the NodePool label on pilots. --- pkg/controllers/cassandra/pilot/pilot.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go index a3ed807ca..323486e53 100644 --- a/pkg/controllers/cassandra/pilot/pilot.go +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -154,7 +154,7 @@ func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { } func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot { - return &v1alpha1.Pilot{ + o := &v1alpha1.Pilot{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, Namespace: pod.Namespace, @@ -162,6 +162,8 @@ func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1. OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, }, } + o.Labels[util.NodePoolNameLabelKey] = pod.Labels[util.NodePoolNameLabelKey] + return o } func UpdateLabels( From 9fdecd7dcc570808384078515c4d9984b83eeb1d Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 17:23:06 +0100 Subject: [PATCH 10/21] Add tests for upgrades --- pkg/cassandra/version/version.go | 73 ++++++++++++------- pkg/controllers/cassandra/cluster_control.go | 19 +++-- .../cassandra/cluster_control_test.go | 36 ++++++--- pkg/controllers/cassandra/testing/gen.go | 35 +++++++++ 4 files changed, 118 insertions(+), 45 deletions(-) diff --git a/pkg/cassandra/version/version.go b/pkg/cassandra/version/version.go index 8b2c24a09..5ea13a6d4 100644 --- a/pkg/cassandra/version/version.go +++ b/pkg/cassandra/version/version.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/coreos/go-semver/semver" + utilerror "k8s.io/apimachinery/pkg/util/errors" ) // Version represents a Cassandra database server version. @@ -26,6 +27,9 @@ import ( type Version struct { versionString string semver semver.Version + Major int64 + Minor int64 + Patch int64 } func New(s string) *Version { @@ -56,35 +60,32 @@ func (v *Version) UnmarshalJSON(data []byte) error { func (v *Version) set(cassVersionString string) error { var versionsTried []string - var errorsEncountered []string + var errs []error - errorWhileParsingOriginalVersion := v.semver.Set(cassVersionString) - if errorWhileParsingOriginalVersion == nil { - v.versionString = cassVersionString - return nil - } - - versionsTried = append(versionsTried, cassVersionString) - errorsEncountered = append(errorsEncountered, errorWhileParsingOriginalVersion.Error()) - - semverString := maybeAddMissingPatchVersion(cassVersionString) - if semverString != cassVersionString { - errorWhileParsingSemverVersion := v.semver.Set(semverString) - if errorWhileParsingSemverVersion == nil { - v.versionString = cassVersionString - return nil + err := v.semver.Set(cassVersionString) + if err != nil { + versionsTried = append(versionsTried, cassVersionString) + errs = append(errs, err) + + semverString := maybeAddMissingPatchVersion(cassVersionString) + if semverString != cassVersionString { + err = v.semver.Set(semverString) + versionsTried = append(versionsTried, semverString) + errs = append(errs, err) } - versionsTried = append(versionsTried, semverString) - errorsEncountered = append(errorsEncountered, errorWhileParsingSemverVersion.Error()) } - - return fmt.Errorf( - "unable to parse Cassandra version as semver. "+ - "Versions tried: '%s'. "+ - "Errors encountered: '%s'.", - strings.Join(versionsTried, "','"), - strings.Join(errorsEncountered, "','"), - ) + if err != nil { + return fmt.Errorf( + "unable to parse Cassandra version as semver. Versions tried: '%s'. Errors: %s.", + strings.Join(versionsTried, "','"), + utilerror.NewAggregate(errs), + ) + } + v.versionString = cassVersionString + v.Major = v.semver.Major + v.Minor = v.semver.Minor + v.Patch = v.semver.Patch + return nil } var _ json.Unmarshaler = &Version{} @@ -104,12 +105,28 @@ func (v Version) String() string { return v.versionString } -func (v Version) MarshalJSON() ([]byte, error) { +func (v *Version) MarshalJSON() ([]byte, error) { return []byte(strconv.Quote(v.String())), nil } var _ json.Marshaler = &Version{} -func (v Version) Semver() string { +func (v *Version) Semver() string { return v.semver.String() } + +func (v *Version) BumpPatch() *Version { + sv := semver.New(v.Semver()) + sv.BumpPatch() + return New(sv.String()) +} +func (v *Version) BumpMinor() *Version { + sv := semver.New(v.Semver()) + sv.BumpMinor() + return New(sv.String()) +} +func (v *Version) BumpMajor() *Version { + sv := semver.New(v.Semver()) + sv.BumpMajor() + return New(sv.String()) +} diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index a564740c2..6b73b1f71 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -217,13 +217,20 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { } for _, np := range c.Spec.NodePools { nps := c.Status.NodePools[np.Name] - if nps.Version != nil { - if nps.Version.LessThan(&c.Spec.Version) { - return &actions.UpdateVersion{ - Cluster: c, - NodePool: &np, - } + if nps.Version == nil { + return nil + } + if nps.Version.LessThan(&c.Spec.Version) { + // if nps.Version.Major != c.Spec.Version.Major { + // glog.Error("Major version upgrades are not supported") + // return nil + // } + return &actions.UpdateVersion{ + Cluster: c, + NodePool: &np, } + } else { + glog.Error("Version downgrades are not supported") } } return nil diff --git a/pkg/controllers/cassandra/cluster_control_test.go b/pkg/controllers/cassandra/cluster_control_test.go index 13b375f03..57221a3f8 100644 --- a/pkg/controllers/cassandra/cluster_control_test.go +++ b/pkg/controllers/cassandra/cluster_control_test.go @@ -29,7 +29,8 @@ func CassandraClusterSpecSummary(c *v1alpha1.CassandraCluster) string { nodepools[i] = fmt.Sprintf("%s:%d", np.Name, np.Replicas) } return fmt.Sprintf( - "{nodepools: %s}", + "{version: %s, nodepools: %s}", + c.Spec.Version, strings.Join(nodepools, ", "), ) } @@ -38,7 +39,7 @@ func CassandraClusterStatusSummary(c *v1alpha1.CassandraCluster) string { nodepools := make([]string, len(c.Status.NodePools)) i := 0 for title, nps := range c.Status.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d", title, nps.ReadyReplicas) + nodepools[i] = fmt.Sprintf("%s:%d:%s", title, nps.ReadyReplicas, nps.Version) i++ } return fmt.Sprintf( @@ -47,14 +48,13 @@ func CassandraClusterStatusSummary(c *v1alpha1.CassandraCluster) string { } func TestNextAction(t *testing.T) { - f := func(c *v1alpha1.CassandraCluster) bool { - t.Log(CassandraClusterSummary(c)) + f := func(c *v1alpha1.CassandraCluster) (ret bool) { + defer func() { + if !ret { + t.Log(CassandraClusterSummary(c)) + } + }() a := cassandra.NextAction(c) - if a != nil { - t.Log("Action:", a.Name()) - } else { - t.Log("No action") - } switch action := a.(type) { case *actions.CreateNodePool: _, found := c.Status.NodePools[action.NodePool.Name] @@ -62,6 +62,20 @@ func TestNextAction(t *testing.T) { t.Errorf("Unexpected attempt to create a nodepool when there's an existing status") return false } + case *actions.UpdateVersion: + nps, found := c.Status.NodePools[action.NodePool.Name] + if !found { + t.Errorf("Unexpected updateversion before status reported") + return false + } + if nps.Version == nil { + t.Errorf("Unexpected updateversion before version reported") + return false + } + if nps.Version.Major != c.Spec.Version.Major { + t.Errorf("Unexpected updateversion for major version change") + return false + } case *actions.ScaleOut: nps, found := c.Status.NodePools[action.NodePool.Name] if !found { @@ -76,12 +90,12 @@ func TestNextAction(t *testing.T) { return true } config := &quick.Config{ - MaxCount: 100, + MaxCount: 1000, Values: func(values []reflect.Value, rnd *rand.Rand) { cluster := &v1alpha1.CassandraCluster{} cluster.SetName("cluster1") cluster.SetNamespace("ns1") - casstesting.FuzzCassandraClusterNodePools(cluster, rnd, 0) + casstesting.FuzzCassandraCluster(cluster, rnd, 0) values[0] = reflect.ValueOf(cluster) }, } diff --git a/pkg/controllers/cassandra/testing/gen.go b/pkg/controllers/cassandra/testing/gen.go index eccdff9f0..3f036cb69 100644 --- a/pkg/controllers/cassandra/testing/gen.go +++ b/pkg/controllers/cassandra/testing/gen.go @@ -5,8 +5,34 @@ import ( "math/rand" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/cassandra/version" ) +var versions = []*version.Version{ + version.New("2.0.0"), + version.New("3.11"), + version.New("3.11.1"), + version.New("3.11.2"), + version.New("4.0.0"), +} + +func FuzzCassandraCluster(cluster *v1alpha1.CassandraCluster, rand *rand.Rand, size int) { + cluster.Spec.Version = *versions[rand.Intn(len(versions))] + FuzzCassandraClusterNodePools(cluster, rand, size) + // 20% chance of patch upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpPatch() + } + // 20% chance of minor upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpMinor() + } + // 20% chance of major upgrade + if rand.Intn(4) == 0 { + cluster.Spec.Version = *cluster.Spec.Version.BumpMajor() + } +} + func FuzzCassandraNodePool(np *v1alpha1.CassandraClusterNodePool, rand *rand.Rand, size int) { np.Replicas = rand.Int31n(5) } @@ -24,8 +50,17 @@ func FuzzCassandraClusterNodePools(cluster *v1alpha1.CassandraCluster, rand *ran } FuzzCassandraNodePool(&np, rand, size) nps := v1alpha1.CassandraClusterNodePoolStatus{ + Version: version.New(cluster.Spec.Version.String()), ReadyReplicas: np.Replicas, } + // 20% chance of too new version + if rand.Intn(4) == 0 { + nps.Version = nps.Version.BumpMajor() + } + // 20% chance of unreported version + if rand.Intn(4) == 0 { + nps.Version = nil + } // 20% chance of ScaleOut if rand.Intn(4) == 0 { np.Replicas++ From 7531157dc147e7db2181ae403778ff504349546a Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 4 Apr 2018 17:50:51 +0100 Subject: [PATCH 11/21] Prevent major version upgrades --- pkg/controllers/cassandra/cluster_control.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index 6b73b1f71..b42bb2b98 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -221,17 +221,16 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { return nil } if nps.Version.LessThan(&c.Spec.Version) { - // if nps.Version.Major != c.Spec.Version.Major { - // glog.Error("Major version upgrades are not supported") - // return nil - // } + if nps.Version.Major != c.Spec.Version.Major { + glog.Error("Major version upgrades are not supported") + return nil + } return &actions.UpdateVersion{ Cluster: c, NodePool: &np, } - } else { - glog.Error("Version downgrades are not supported") } + glog.Error("Version downgrades are not supported") } return nil } From 0d64ffb3af6f316725c2f3eb85b00ae1ab096e6d Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 10:04:07 +0100 Subject: [PATCH 12/21] Add unit tests for update_version action --- internal/test/util/generate/generate.go | 5 + .../cassandra/actions/update_version.go | 9 + .../cassandra/actions/update_version_test.go | 162 ++++++++++++++++++ 3 files changed, 176 insertions(+) create mode 100644 pkg/controllers/cassandra/actions/update_version_test.go diff --git a/internal/test/util/generate/generate.go b/internal/test/util/generate/generate.go index d202a8d1e..53686407e 100644 --- a/internal/test/util/generate/generate.go +++ b/internal/test/util/generate/generate.go @@ -7,6 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/cassandra/version" ) type PilotConfig struct { @@ -121,6 +122,7 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet { type CassandraClusterConfig struct { Name, Namespace string + Version version.Version } func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster { @@ -129,6 +131,9 @@ func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster { Name: c.Name, Namespace: c.Namespace, }, + Spec: v1alpha1.CassandraClusterSpec{ + Version: c.Version, + }, } } diff --git a/pkg/controllers/cassandra/actions/update_version.go b/pkg/controllers/cassandra/actions/update_version.go index a641703a2..9d47e6a73 100644 --- a/pkg/controllers/cassandra/actions/update_version.go +++ b/pkg/controllers/cassandra/actions/update_version.go @@ -1,6 +1,7 @@ package actions import ( + "github.com/golang/glog" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -29,8 +30,16 @@ func (a *UpdateVersion) Execute(s *controllers.State) error { newImage := baseSet.Spec.Template.Spec.Containers[0].Image oldImage := existingSet.Spec.Template.Spec.Containers[0].Image if newImage == oldImage { + glog.V(4).Infof( + "StatefulSet %q already has the desired image %q", + existingSet.Name, newImage, + ) return nil } + glog.V(4).Infof( + "Replacing StatefulSet %q image %q with %q", + existingSet.Name, oldImage, newImage, + ) newSet := existingSet.DeepCopy() newSet.Spec.Template.Spec.Containers[0].Image = newImage _, err = s.Clientset.AppsV1beta1().StatefulSets(newSet.Namespace).Update(newSet) diff --git a/pkg/controllers/cassandra/actions/update_version_test.go b/pkg/controllers/cassandra/actions/update_version_test.go new file mode 100644 index 000000000..71827415b --- /dev/null +++ b/pkg/controllers/cassandra/actions/update_version_test.go @@ -0,0 +1,162 @@ +package actions_test + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + "github.com/jetstack/navigator/internal/test/util/generate" + "github.com/jetstack/navigator/pkg/cassandra/version" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" +) + +func TestUpdateVersion(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster generate.CassandraClusterConfig + nodePool generate.CassandraClusterNodePoolConfig + expectedStatefulSet *generate.StatefulSetConfig + expectedErr bool + mutator func(*framework.StateFixture) + } + tests := map[string]testT{ + "Error if StatefulSet not listed": { + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedErr: true, + }, + "Error if clientset.Update fails (e.g. listed but not found)": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedErr: true, + mutator: func(f *framework.StateFixture) { + err := f.KubeClient(). + AppsV1beta1(). + StatefulSets("ns1"). + Delete("cass-cluster1-pool1", &metav1.DeleteOptions{}) + if err != nil { + f.T.Fatal(err) + } + }, + }, + "Idempotent: No error if the image already matches the actual image": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + Version: *version.New("3.11.2"), + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + expectedErr: false, + }, + "The image is updated": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.1", + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + Version: *version.New("3.11.2"), + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Image: "docker.io/cassandra:3.11.2", + }, + }, + } + + for name, test := range tests { + t.Run( + name, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + if test.mutator != nil { + test.mutator(fixture) + } + a := &actions.UpdateVersion{ + Cluster: generate.CassandraCluster(test.cluster), + NodePool: generate.CassandraClusterNodePool(test.nodePool), + } + err := a.Execute(state) + if err != nil { + t.Logf("The error returned by Execute was: %s", err) + } + if !test.expectedErr && err != nil { + t.Errorf("Unexpected error: %s", err) + } + if test.expectedErr && err == nil { + t.Errorf("Expected an error") + } + if test.expectedStatefulSet != nil { + actualStatefulSet, err := fixture.KubeClient(). + AppsV1beta1(). + StatefulSets(test.expectedStatefulSet.Namespace). + Get(test.expectedStatefulSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error retrieving statefulset: %v", err) + } + actualImage := actualStatefulSet.Spec.Template.Spec.Containers[0].Image + if test.expectedStatefulSet.Image != actualImage { + t.Errorf( + "Unexpected image. Expected: %s. Actual: %s", + test.expectedStatefulSet.Image, actualImage, + ) + } + } + }, + ) + } +} From dbc264038473726ab93e9790d34b06077466aaee Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 10:28:40 +0100 Subject: [PATCH 13/21] Add E2E tests for nodepool status version reporting. --- hack/e2e.sh | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/hack/e2e.sh b/hack/e2e.sh index c8b460bc5..ea54418fb 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -273,12 +273,23 @@ function test_cassandracluster() { stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ get pilots \ - --output 'jsonpath={.items[0].status.cassandra.version}' + --selector "navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \ + --output 'jsonpath={.items[*].status.cassandra.version}' then kubectl --namespace "${namespace}" get pilots -o yaml fail_test "Pilots failed to report the expected version" fi + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get cassandracluster "${CASS_NAME}" \ + --output 'jsonpath={.status.nodePools[*].version}' + then + kubectl --namespace "${namespace}" get cassandracluster -o yaml + fail_test "NodePools failed to report the expected version" + fi + # Wait 5 minutes for cassandra to start and listen for CQL queries. if ! retry TIMEOUT=300 cql_connect \ "${namespace}" \ @@ -330,12 +341,23 @@ function test_cassandracluster() { stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ get pilots \ - --output 'jsonpath={.items[0].status.cassandra.version}' + --selector "navigator.jetstack.io/cassandra-cluster-name=${CASS_NAME}" \ + --output 'jsonpath={.items[*].status.cassandra.version}' then kubectl --namespace "${namespace}" get pilots -o yaml fail_test "Pilots failed to report the expected version" fi + if ! retry TIMEOUT=300 \ + stdout_equals "${CASS_VERSION}" \ + kubectl --namespace "${namespace}" \ + get cassandracluster "${CASS_NAME}" \ + --output 'jsonpath={.status.nodePools[*].version}' + then + kubectl --namespace "${namespace}" get cassandracluster -o yaml + fail_test "NodePools failed to report the expected version" + fi + # Delete the Cassandra pod and wait for the CQL service to become # unavailable (readiness probe fails) From d3f73ea991ee123d831785360c508812c4a92891 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 11:00:06 +0100 Subject: [PATCH 14/21] Add version change validation --- pkg/apis/navigator/validation/cassandra.go | 20 ++++++++++++++ .../navigator/validation/cassandra_test.go | 27 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/pkg/apis/navigator/validation/cassandra.go b/pkg/apis/navigator/validation/cassandra.go index 559fc4cbe..ae4ed5fbc 100644 --- a/pkg/apis/navigator/validation/cassandra.go +++ b/pkg/apis/navigator/validation/cassandra.go @@ -27,6 +27,26 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. fldPath := field.NewPath("spec") + if new.Spec.Version.LessThan(&old.Spec.Version) { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + "cannot perform version downgrades", + ), + ) + } + + if new.Spec.Version.Major != old.Spec.Version.Major { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + "cannot perform major version upgrades", + ), + ) + } + npPath := fldPath.Child("nodePools") for i, newNp := range new.Spec.NodePools { idxPath := npPath.Index(i) diff --git a/pkg/apis/navigator/validation/cassandra_test.go b/pkg/apis/navigator/validation/cassandra_test.go index 7ba419c4c..742c3f9ad 100644 --- a/pkg/apis/navigator/validation/cassandra_test.go +++ b/pkg/apis/navigator/validation/cassandra_test.go @@ -128,6 +128,15 @@ func TestValidateCassandraClusterUpdate(t *testing.T) { return c } + setVersion := func( + c *navigator.CassandraCluster, + v *version.Version, + ) *navigator.CassandraCluster { + c = c.DeepCopy() + c.Spec.Version = *v + return c + } + tests := map[string]testT{ "unchanged cluster": { old: validCassCluster, @@ -147,6 +156,24 @@ func TestValidateCassandraClusterUpdate(t *testing.T) { old: setPersistence(validCassCluster, navigator.PersistenceConfig{Enabled: false}), new: validCassCluster, }, + "decrease version": { + old: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()), + new: validCassCluster, + errorExpected: true, + }, + "major version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMajor()), + errorExpected: true, + }, + "minor version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpMinor()), + }, + "patch version upgrade": { + old: validCassCluster, + new: setVersion(validCassCluster, validCassCluster.Spec.Version.BumpPatch()), + }, } for title, persistence := range persistenceErrorCases { From 7578d31b51ee9fc8b84d5c30bc03d8658e1a6a93 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 11:53:11 +0100 Subject: [PATCH 15/21] Prevent major versions other than 3 --- pkg/apis/navigator/validation/cassandra.go | 18 ++++++++++++++++++ .../navigator/validation/cassandra_test.go | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/pkg/apis/navigator/validation/cassandra.go b/pkg/apis/navigator/validation/cassandra.go index ae4ed5fbc..7ecea7bc1 100644 --- a/pkg/apis/navigator/validation/cassandra.go +++ b/pkg/apis/navigator/validation/cassandra.go @@ -1,6 +1,7 @@ package validation import ( + "fmt" "reflect" apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" @@ -10,6 +11,8 @@ import ( "github.com/jetstack/navigator/pkg/apis/navigator" ) +var supportedMajorVersions = sets.NewInt64(3) + func ValidateCassandraClusterNodePool(np *navigator.CassandraClusterNodePool, fldPath *field.Path) field.ErrorList { // TODO: call k8s.io/kubernetes/pkg/apis/core/validation.ValidateResourceRequirements on np.Resources // this will require vendoring kubernetes/kubernetes. @@ -80,6 +83,21 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. func ValidateCassandraClusterSpec(spec *navigator.CassandraClusterSpec, fldPath *field.Path) field.ErrorList { allErrs := ValidateNavigatorClusterConfig(&spec.NavigatorClusterConfig, fldPath) + + if !supportedMajorVersions.Has(spec.Version.Major) { + allErrs = append( + allErrs, + field.Forbidden( + fldPath.Child("version"), + fmt.Sprintf( + "%s is not supported. Supported major versions are: %v", + spec.Version, + supportedMajorVersions.List(), + ), + ), + ) + } + npPath := fldPath.Child("nodePools") allNames := sets.String{} for i, np := range spec.NodePools { diff --git a/pkg/apis/navigator/validation/cassandra_test.go b/pkg/apis/navigator/validation/cassandra_test.go index 742c3f9ad..2ca093061 100644 --- a/pkg/apis/navigator/validation/cassandra_test.go +++ b/pkg/apis/navigator/validation/cassandra_test.go @@ -17,7 +17,7 @@ var ( Namespace: "bar", }, Spec: navigator.CassandraClusterSpec{ - Version: *version.New("5.6.2"), + Version: *version.New("3.11.2"), Image: &validImageSpec, NavigatorClusterConfig: validNavigatorClusterConfig, NodePools: []navigator.CassandraClusterNodePool{ From 037189ba242ec9e370f734c56cfe3e32d3d83893 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 13:17:29 +0100 Subject: [PATCH 16/21] Fix jsonpath --- hack/e2e.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hack/e2e.sh b/hack/e2e.sh index ea54418fb..3d680e73c 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -284,7 +284,7 @@ function test_cassandracluster() { stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ get cassandracluster "${CASS_NAME}" \ - --output 'jsonpath={.status.nodePools[*].version}' + --output 'jsonpath={.status.nodePools.*.version}' then kubectl --namespace "${namespace}" get cassandracluster -o yaml fail_test "NodePools failed to report the expected version" @@ -352,7 +352,7 @@ function test_cassandracluster() { stdout_equals "${CASS_VERSION}" \ kubectl --namespace "${namespace}" \ get cassandracluster "${CASS_NAME}" \ - --output 'jsonpath={.status.nodePools[*].version}' + --output 'jsonpath={.status.nodePools.*.version}' then kubectl --namespace "${namespace}" get cassandracluster -o yaml fail_test "NodePools failed to report the expected version" From 01dc131cb97874a51414d89bfe957757010cd66c Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 15:27:17 +0100 Subject: [PATCH 17/21] Document the Version status field --- pkg/apis/navigator/v1alpha1/types.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index 0513cc014..06fdd445f 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -81,8 +81,13 @@ type CassandraClusterStatus struct { } type CassandraClusterNodePoolStatus struct { - ReadyReplicas int32 `json:"readyReplicas"` - Version *version.Version `json:"version"` + ReadyReplicas int32 `json:"readyReplicas"` + // The lowest version of Cassandra found to be running in this nodepool, + // as reported by the Cassandra process. + // nil or empty if the lowest version can not be determined, + // or if the lowest version has not yet been determined + // +optional + Version *version.Version `json:"version,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object From 8cb77bc9b576d536be57ac2bcf63a8cb820b6ced Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 15:45:36 +0100 Subject: [PATCH 18/21] Remove dodgy cluster summary printing and replace with pretty printed output --- .../cassandra/cluster_control_test.go | 39 ++----------------- 1 file changed, 3 insertions(+), 36 deletions(-) diff --git a/pkg/controllers/cassandra/cluster_control_test.go b/pkg/controllers/cassandra/cluster_control_test.go index 57221a3f8..a44c211bd 100644 --- a/pkg/controllers/cassandra/cluster_control_test.go +++ b/pkg/controllers/cassandra/cluster_control_test.go @@ -1,57 +1,24 @@ package cassandra_test import ( - "fmt" "math/rand" "reflect" - "strings" "testing" "testing/quick" + "github.com/kr/pretty" + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers/cassandra" "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" ) -func CassandraClusterSummary(c *v1alpha1.CassandraCluster) string { - return fmt.Sprintf( - "%s/%s {Spec: %s, Status: %s}", - c.Namespace, c.Name, - CassandraClusterSpecSummary(c), - CassandraClusterStatusSummary(c), - ) -} - -func CassandraClusterSpecSummary(c *v1alpha1.CassandraCluster) string { - nodepools := make([]string, len(c.Spec.NodePools)) - for i, np := range c.Spec.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d", np.Name, np.Replicas) - } - return fmt.Sprintf( - "{version: %s, nodepools: %s}", - c.Spec.Version, - strings.Join(nodepools, ", "), - ) -} - -func CassandraClusterStatusSummary(c *v1alpha1.CassandraCluster) string { - nodepools := make([]string, len(c.Status.NodePools)) - i := 0 - for title, nps := range c.Status.NodePools { - nodepools[i] = fmt.Sprintf("%s:%d:%s", title, nps.ReadyReplicas, nps.Version) - i++ - } - return fmt.Sprintf( - "{nodepools: %s}", strings.Join(nodepools, ", "), - ) -} - func TestNextAction(t *testing.T) { f := func(c *v1alpha1.CassandraCluster) (ret bool) { defer func() { if !ret { - t.Log(CassandraClusterSummary(c)) + t.Logf(pretty.Sprint(c)) } }() a := cassandra.NextAction(c) From 0df952553c24c7bac7121a31ee6d47467578ccf8 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 17:28:46 +0100 Subject: [PATCH 19/21] Check that all nodes are ready before adding more nodes --- pkg/controllers/cassandra/actions/scaleout.go | 23 ++++++- .../cassandra/actions/scaleout_test.go | 62 ++++++++++++++----- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/pkg/controllers/cassandra/actions/scaleout.go b/pkg/controllers/cassandra/actions/scaleout.go index d394eb980..760fc7438 100644 --- a/pkg/controllers/cassandra/actions/scaleout.go +++ b/pkg/controllers/cassandra/actions/scaleout.go @@ -30,19 +30,35 @@ func (a *ScaleOut) Execute(s *controllers.State) error { if err != nil { return errors.Wrap(err, "unable to get existing statefulset") } - newSet := existingSet.DeepCopy() if *existingSet.Spec.Replicas == a.NodePool.Replicas { + glog.V(4).Infof( + "The StatefulSet %s/%s already has the desired replicas value %d", + existingSet.Namespace, existingSet.Name, a.NodePool.Replicas, + ) return nil } if *existingSet.Spec.Replicas > a.NodePool.Replicas { glog.Errorf( "ScaleOut error:"+ - "The StatefulSet.Spec.Replicas value (%d) "+ + "The StatefulSet %s/%s replicas value (%d) "+ "is greater than the desired value (%d)", + existingSet.Namespace, existingSet.Name, *existingSet.Spec.Replicas, a.NodePool.Replicas, ) return nil } + if *existingSet.Spec.Replicas != existingSet.Status.ReadyReplicas { + glog.V(4).Infof( + "Waiting for all pods in nodepool %s/%s/%s (statefulset %s/%s) to become ready "+ + "before adding more nodes. "+ + "Replicas: %d, ReadyReplicas: %d", + a.Cluster.Namespace, a.Cluster.Name, a.NodePool.Name, + existingSet.Namespace, existingSet.Name, + *existingSet.Spec.Replicas, existingSet.Status.ReadyReplicas, + ) + return nil + } + newSet := existingSet.DeepCopy() newSet.Spec.Replicas = ptr.Int32(*newSet.Spec.Replicas + 1) _, err = s.Clientset.AppsV1beta1(). StatefulSets(newSet.Namespace).Update(newSet) @@ -53,7 +69,8 @@ func (a *ScaleOut) Execute(s *controllers.State) error { a.Cluster, corev1.EventTypeNormal, a.Name(), - "ScaleOut: NodePool=%q, ReplicaCount=%d, TargetReplicaCount=%d", + "ScaleOut: Cluster=%s/%s, NodePool=%q, ReplicaCount=%d, TargetReplicaCount=%d", + a.Cluster.Namespace, a.Cluster.Name, a.NodePool.Name, *newSet.Spec.Replicas, a.NodePool.Replicas, ) return nil diff --git a/pkg/controllers/cassandra/actions/scaleout_test.go b/pkg/controllers/cassandra/actions/scaleout_test.go index 610041342..1286f1ea3 100644 --- a/pkg/controllers/cassandra/actions/scaleout_test.go +++ b/pkg/controllers/cassandra/actions/scaleout_test.go @@ -38,9 +38,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(122), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + ReadyReplicas: 122, }, ), }, @@ -67,9 +68,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(124), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(124), + ReadyReplicas: 124, }, ), }, @@ -88,13 +90,39 @@ func TestScaleOut(t *testing.T) { }, expectedErr: false, }, + "No update until all existing pods are ready": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + ReadyReplicas: 121, + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 123, + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + }, + }, "Idempotent: No error if ReplicaCount already matches the actual ReplicaCount": { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(124), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(124), + ReadyReplicas: 124, }, ), }, @@ -117,9 +145,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(122), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(122), + ReadyReplicas: 122, }, ), }, @@ -141,9 +170,10 @@ func TestScaleOut(t *testing.T) { kubeObjects: []runtime.Object{ generate.StatefulSet( generate.StatefulSetConfig{ - Name: "cass-cluster1-pool1", - Namespace: "ns1", - Replicas: ptr.Int32(2), + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: ptr.Int32(2), + ReadyReplicas: 2, }, ), }, @@ -203,7 +233,7 @@ func TestScaleOut(t *testing.T) { if *test.expectedStatefulSet.Replicas != *actualStatefulSet.Spec.Replicas { t.Errorf( "Unexpected replica count. Expected: %d. Actual: %d", - test.expectedStatefulSet.Replicas, actualStatefulSet.Spec.Replicas, + *test.expectedStatefulSet.Replicas, *actualStatefulSet.Spec.Replicas, ) } } From 5cd91b2f594476ba48e17c88e30b93711779e324 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 5 Apr 2018 17:30:33 +0100 Subject: [PATCH 20/21] Trying to make the ScaleOut decision clearer. --- pkg/controllers/cassandra/cluster_control.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index b42bb2b98..afb15da3b 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -208,11 +208,20 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { } for _, np := range c.Spec.NodePools { nps := c.Status.NodePools[np.Name] - if np.Replicas > nps.ReadyReplicas { + switch { + case np.Replicas == nps.ReadyReplicas: + continue + case np.Replicas > nps.ReadyReplicas: return &actions.ScaleOut{ Cluster: c, NodePool: &np, } + default: + glog.Errorf( + "Unsupported scale change on NodePool %q from %d to %d", + np.Name, nps.ReadyReplicas, np.Replicas, + ) + return nil } } for _, np := range c.Spec.NodePools { From c4fd2e714460d7dd9fb8a656840934bdd92d9417 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Fri, 6 Apr 2018 15:46:21 +0100 Subject: [PATCH 21/21] Random stuff --- pkg/controllers/cassandra/cluster_control.go | 13 ++++--- .../cassandra/cluster_control_test.go | 28 +++++++++----- pkg/controllers/cassandra/pilot/pilot.go | 38 +++++++++++-------- 3 files changed, 49 insertions(+), 30 deletions(-) diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index afb15da3b..61f190424 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -229,17 +229,20 @@ func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { if nps.Version == nil { return nil } + if c.Spec.Version.LessThan(nps.Version) { + glog.Error("Version downgrades are not supported") + return nil + } + if nps.Version.Major != c.Spec.Version.Major { + glog.Error("Major version upgrades are not supported") + return nil + } if nps.Version.LessThan(&c.Spec.Version) { - if nps.Version.Major != c.Spec.Version.Major { - glog.Error("Major version upgrades are not supported") - return nil - } return &actions.UpdateVersion{ Cluster: c, NodePool: &np, } } - glog.Error("Version downgrades are not supported") } return nil } diff --git a/pkg/controllers/cassandra/cluster_control_test.go b/pkg/controllers/cassandra/cluster_control_test.go index a44c211bd..14d6cff8d 100644 --- a/pkg/controllers/cassandra/cluster_control_test.go +++ b/pkg/controllers/cassandra/cluster_control_test.go @@ -29,28 +29,36 @@ func TestNextAction(t *testing.T) { t.Errorf("Unexpected attempt to create a nodepool when there's an existing status") return false } - case *actions.UpdateVersion: + case *actions.ScaleOut: nps, found := c.Status.NodePools[action.NodePool.Name] if !found { - t.Errorf("Unexpected updateversion before status reported") + t.Errorf("Unexpected attempt to scale up a nodepool without a status") return false } - if nps.Version == nil { - t.Errorf("Unexpected updateversion before version reported") + if action.NodePool.Replicas <= nps.ReadyReplicas { + t.Errorf("Unexpected attempt to scale up a nodepool with >= ready replicas") return false } - if nps.Version.Major != c.Spec.Version.Major { - t.Errorf("Unexpected updateversion for major version change") + if nps.Version == nil { + t.Errorf("unexpected scaleout before version is known") return false } - case *actions.ScaleOut: + case *actions.UpdateVersion: nps, found := c.Status.NodePools[action.NodePool.Name] if !found { - t.Errorf("Unexpected attempt to scale up a nodepool without a status") + t.Errorf("Unexpected UpdateVersion before status reported") return false } - if action.NodePool.Replicas <= nps.ReadyReplicas { - t.Errorf("Unexpected attempt to scale up a nodepool with >= ready replicas") + if nps.Version == nil { + t.Errorf("Unexpected UpdateVersion before version reported") + return false + } + if nps.Version.Major != c.Spec.Version.Major { + t.Errorf("Unexpected UpdateVersion for major version change") + return false + } + if nps.ReadyReplicas != action.NodePool.Replicas { + t.Errorf("Unexpected UpdateVersion before scale") return false } } diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go index 323486e53..41c01cc43 100644 --- a/pkg/controllers/cassandra/pilot/pilot.go +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -118,27 +118,35 @@ func (c *pilotControl) updateDiscoveredVersions(cluster *v1alpha1.CassandraClust glog.V(4).Infof("No pilots found matching selector: %s", selector) } for _, pilot := range pilots { - if pilot.Status.Cassandra == nil { - glog.V(4).Infof("Skipping pilot with nil status: %s", pilot.Name) - continue - } nodePoolNameForPilot, nodePoolNameFound := pilot.Labels[util.NodePoolNameLabelKey] if !nodePoolNameFound { - glog.V(4).Infof("Skipping pilot without NodePoolNameLabelKey: %s", pilot.Name) + glog.Warningf("Skipping pilot without NodePoolNameLabelKey: %s", pilot.Name) continue } - if cluster.Status.NodePools == nil { - glog.V(4).Infof("Initialising Status.NodePools for: %s", cluster.Name) - cluster.Status.NodePools = map[string]v1alpha1.CassandraClusterNodePoolStatus{} - } - version := pilot.Status.Cassandra.Version nodePoolStatus := cluster.Status.NodePools[nodePoolNameForPilot] - if nodePoolStatus.Version == nil || version.LessThan(nodePoolStatus.Version) { - glog.V(4).Infof("Found lower pilot version: %s, %s", nodePoolNameForPilot, version) - nodePoolStatus.Version = version - cluster.Status.NodePools[nodePoolNameForPilot] = nodePoolStatus - continue + switch { + case pilot.Status.Cassandra == nil: + glog.V(4).Infof( + "Pilot %s/%s has no status. Setting nodepool version to nil", + pilot.Namespace, pilot.Name, + ) + nodePoolStatus.Version = nil + case pilot.Status.Cassandra.Version == nil: + glog.V(4).Infof( + "Pilot %s/%s has not reported its version. Setting nodepool version to nil", + pilot.Namespace, pilot.Name, + ) + nodePoolStatus.Version = nil + case nodePoolStatus.Version == nil: + nodePoolStatus.Version = nil + case pilot.Status.Cassandra.Version.LessThan(nodePoolStatus.Version): + glog.V(4).Infof( + "Found lower pilot version: %s, %s", + nodePoolNameForPilot, pilotVersionStatus, + ) + nodePoolStatus.Version = pilotVersionStatus } + cluster.Status.NodePools[nodePoolNameForPilot] = nodePoolStatus } return nil }