From 42dcc8560abf0b1ee1a12de2049ea61ff9c25407 Mon Sep 17 00:00:00 2001 From: Alay Patel Date: Sun, 11 Oct 2020 02:49:11 -0400 Subject: [PATCH 1/2] add cronjob_controllerv2.go --- cmd/kube-controller-manager/app/batch.go | 14 +- .../app/options/cronjobcontroller.go | 56 ++ .../app/options/options.go | 9 + .../app/options/options_test.go | 9 + .../cronjob/cronjob_controller_test.go | 8 + .../cronjob/cronjob_controllerv2.go | 673 ++++++++++++++++++ .../cronjob/cronjob_controllerv2_test.go | 451 ++++++++++++ pkg/controller/cronjob/utils.go | 60 ++ pkg/controller/cronjob/utils_test.go | 146 ++++ pkg/features/kube_features.go | 11 + test/e2e/apps/cronjob.go | 26 + 11 files changed, 1462 insertions(+), 1 deletion(-) create mode 100644 cmd/kube-controller-manager/app/options/cronjobcontroller.go create mode 100644 pkg/controller/cronjob/cronjob_controllerv2.go create mode 100644 pkg/controller/cronjob/cronjob_controllerv2_test.go diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index e4528a99500d0..f42dfca418e81 100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -22,12 +22,13 @@ package app import ( "fmt" - "net/http" "k8s.io/apimachinery/pkg/runtime/schema" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/controller/cronjob" "k8s.io/kubernetes/pkg/controller/job" + kubefeatures "k8s.io/kubernetes/pkg/features" ) func startJobController(ctx ControllerContext) (http.Handler, bool, error) { @@ -46,6 +47,17 @@ func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { return nil, false, nil } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) { + cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(), + ctx.InformerFactory.Batch().V1beta1().CronJobs(), + 
ctx.ClientBuilder.ClientOrDie("cronjob-controller"), + ) + if err != nil { + return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) + } + go cj2c.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop) + return nil, true, nil + } cjc, err := cronjob.NewController( ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ) diff --git a/cmd/kube-controller-manager/app/options/cronjobcontroller.go b/cmd/kube-controller-manager/app/options/cronjobcontroller.go new file mode 100644 index 0000000000000..48f03ca67c0e8 --- /dev/null +++ b/cmd/kube-controller-manager/app/options/cronjobcontroller.go @@ -0,0 +1,56 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "github.com/spf13/pflag" + + cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config" +) + +// CronJobControllerOptions holds the CronJobController options. +type CronJobControllerOptions struct { + *cronjobconfig.CronJobControllerConfiguration +} + +// AddFlags adds flags related to JobController for controller manager to the specified FlagSet. +func (o *CronJobControllerOptions) AddFlags(fs *pflag.FlagSet) { + if o == nil { + return + } +} + +// ApplyTo fills up JobController config with options. 
+func (o *CronJobControllerOptions) ApplyTo(cfg *cronjobconfig.CronJobControllerConfiguration) error { + if o == nil { + return nil + } + + cfg.ConcurrentCronJobSyncs = o.ConcurrentCronJobSyncs + + return nil +} + +// Validate checks validation of CronJobControllerOptions. +func (o *CronJobControllerOptions) Validate() []error { + if o == nil { + return nil + } + + errs := []error{} + return errs +} diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index ee6058cf6263c..32fa0561233cb 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -71,6 +71,7 @@ type KubeControllerManagerOptions struct { GarbageCollectorController *GarbageCollectorControllerOptions HPAController *HPAControllerOptions JobController *JobControllerOptions + CronJobController *CronJobControllerOptions NamespaceController *NamespaceControllerOptions NodeIPAMController *NodeIPAMControllerOptions NodeLifecycleController *NodeLifecycleControllerOptions @@ -144,6 +145,9 @@ func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) { JobController: &JobControllerOptions{ &componentConfig.JobController, }, + CronJobController: &CronJobControllerOptions{ + &componentConfig.CronJobController, + }, NamespaceController: &NamespaceControllerOptions{ &componentConfig.NamespaceController, }, @@ -244,6 +248,7 @@ func (s *KubeControllerManagerOptions) Flags(allControllers []string, disabledBy s.GarbageCollectorController.AddFlags(fss.FlagSet("garbagecollector controller")) s.HPAController.AddFlags(fss.FlagSet("horizontalpodautoscaling controller")) s.JobController.AddFlags(fss.FlagSet("job controller")) + s.CronJobController.AddFlags(fss.FlagSet("cronjob controller")) s.NamespaceController.AddFlags(fss.FlagSet("namespace controller")) s.NodeIPAMController.AddFlags(fss.FlagSet("nodeipam controller")) s.NodeLifecycleController.AddFlags(fss.FlagSet("nodelifecycle 
controller")) @@ -309,6 +314,9 @@ func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config) e if err := s.JobController.ApplyTo(&c.ComponentConfig.JobController); err != nil { return err } + if err := s.CronJobController.ApplyTo(&c.ComponentConfig.CronJobController); err != nil { + return err + } if err := s.NamespaceController.ApplyTo(&c.ComponentConfig.NamespaceController); err != nil { return err } @@ -383,6 +391,7 @@ func (s *KubeControllerManagerOptions) Validate(allControllers []string, disable errs = append(errs, s.GarbageCollectorController.Validate()...) errs = append(errs, s.HPAController.Validate()...) errs = append(errs, s.JobController.Validate()...) + errs = append(errs, s.CronJobController.Validate()...) errs = append(errs, s.NamespaceController.Validate()...) errs = append(errs, s.NodeIPAMController.Validate()...) errs = append(errs, s.NodeLifecycleController.Validate()...) diff --git a/cmd/kube-controller-manager/app/options/options_test.go b/cmd/kube-controller-manager/app/options/options_test.go index 872415def16ff..845433a58bc30 100644 --- a/cmd/kube-controller-manager/app/options/options_test.go +++ b/cmd/kube-controller-manager/app/options/options_test.go @@ -35,6 +35,7 @@ import ( kubecontrollerconfig "k8s.io/kubernetes/cmd/kube-controller-manager/app/config" kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config" csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config" + cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config" daemonconfig "k8s.io/kubernetes/pkg/controller/daemon/config" deploymentconfig "k8s.io/kubernetes/pkg/controller/deployment/config" endpointconfig "k8s.io/kubernetes/pkg/controller/endpoint/config" @@ -312,6 +313,11 @@ func TestAddFlags(t *testing.T) { ConcurrentJobSyncs: 5, }, }, + CronJobController: &CronJobControllerOptions{ + &cronjobconfig.CronJobControllerConfiguration{ + ConcurrentCronJobSyncs: 5, + }, + }, NamespaceController: 
&NamespaceControllerOptions{ &namespaceconfig.NamespaceControllerConfiguration{ NamespaceSyncPeriod: metav1.Duration{Duration: 10 * time.Minute}, @@ -560,6 +566,9 @@ func TestApplyTo(t *testing.T) { JobController: jobconfig.JobControllerConfiguration{ ConcurrentJobSyncs: 5, }, + CronJobController: cronjobconfig.CronJobControllerConfiguration{ + ConcurrentCronJobSyncs: 5, + }, NamespaceController: namespaceconfig.NamespaceControllerConfiguration{ NamespaceSyncPeriod: metav1.Duration{Duration: 10 * time.Minute}, ConcurrentNamespaceSyncs: 20, diff --git a/pkg/controller/cronjob/cronjob_controller_test.go b/pkg/controller/cronjob/cronjob_controller_test.go index 66467ce71e198..c3a7450dfe1df 100644 --- a/pkg/controller/cronjob/cronjob_controller_test.go +++ b/pkg/controller/cronjob/cronjob_controller_test.go @@ -48,6 +48,14 @@ func justBeforeTheHour() time.Time { return T1 } +func justASecondBeforeTheHour() time.Time { + T1, err := time.Parse(time.RFC3339, "2016-05-19T09:59:59Z") + if err != nil { + panic("test setup error") + } + return T1 +} + func topOfTheHour() time.Time { T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") if err != nil { diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go new file mode 100644 index 0000000000000..1586847bc04fc --- /dev/null +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -0,0 +1,673 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cronjob + +import ( + "fmt" + "reflect" + "time" + + "github.com/robfig/cron" + + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + batchv1informers "k8s.io/client-go/informers/batch/v1" + batchv1beta1informers "k8s.io/client-go/informers/batch/v1beta1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + batchv1listers "k8s.io/client-go/listers/batch/v1" + "k8s.io/client-go/listers/batch/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/component-base/metrics/prometheus/ratelimiter" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller" +) + +var ( + delta100ms = 100 * time.Millisecond + retriesForEnqueingAllCronJobs = 5 +) + +// Refactored Cronjob controller that uses DelayingQueue and informers + +// ControllerV2 is a controller for CronJobs. +type ControllerV2 struct { + queue workqueue.DelayingInterface + recorder record.EventRecorder + + jobControl jobControlInterface + cronJobControl cjControlInterface + + jobLister batchv1listers.JobLister + cronJobLister v1beta1.CronJobLister + + jobListerSynced cache.InformerSynced + cronJobListerSynced cache.InformerSynced + + // now is a function that returns current time, done to facilitate unit tests + now func() time.Time +} + +// NewController creates and initializes a new Controller. 
+func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1beta1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + + if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { + if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { + return nil, err + } + } + + jm := &ControllerV2{ + queue: workqueue.NewNamedDelayingQueue("cronjob"), + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}), + + jobControl: realJobControl{KubeClient: kubeClient}, + cronJobControl: &realCJControl{KubeClient: kubeClient}, + + jobLister: jobInformer.Lister(), + cronJobLister: cronJobsInformer.Lister(), + + jobListerSynced: jobInformer.Informer().HasSynced, + cronJobListerSynced: cronJobsInformer.Informer().HasSynced, + now: time.Now, + } + + jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: jm.addJob, + UpdateFunc: jm.updateJob, + DeleteFunc: jm.deleteJob, + }) + + cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + jm.enqueueController(obj) + }, + UpdateFunc: jm.updateCronJob, + DeleteFunc: func(obj interface{}) { + jm.enqueueController(obj) + }, + }) + + return jm, nil +} + +// Run starts the main goroutine responsible for watching and syncing jobs. 
+func (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer jm.queue.ShutDown() + + klog.Infof("Starting cronjob controller v2") + defer klog.Infof("Shutting down cronjob controller v2") + + go jm.enqueueAllCronJobsWithRetries(retriesForEnqueingAllCronJobs) + + if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(jm.worker, time.Second, stopCh) + } + + <-stopCh +} + +// enqueueAllCronJobsWithRetries enqueues all the cronjobs and is thread-safe. +func (jm *ControllerV2) enqueueAllCronJobsWithRetries(retries int) { + cronjobs := jm.listCronJobsWithRetries(retries) + for _, cronjob := range cronjobs { + jm.enqueueController(cronjob) + } + return +} + +// listCronJobsWithRetries tries to list the cronjobs with retries upon hitting an error +func (jm *ControllerV2) listCronJobsWithRetries(retries int) []*batchv1beta1.CronJob { + cronjobs := []*batchv1beta1.CronJob{} + var err error + for i := 0; i < retries; i++ { + cronjobs, err = jm.cronJobLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to list cronjobs at the start of cronjob controller, retrying %d more time/s, error: %v\n", retries-i-1, err)) + continue + } + } + return cronjobs +} + +func (jm *ControllerV2) worker() { + for jm.processNextWorkItem() { + } +} + +func (jm *ControllerV2) processNextWorkItem() bool { + key, quit := jm.queue.Get() + if quit { + return false + } + defer jm.queue.Done(key) + err, requeueAfter := jm.sync(key.(string)) + switch { + case err != nil: + utilruntime.HandleError(fmt.Errorf("Error syncing CronJobController %v, requeuing: %v", key.(string), err)) + jm.queue.Add(key) + case requeueAfter != nil: + jm.queue.AddAfter(key, *requeueAfter) + default: + jm.queue.Done(key) + } + return true +} + +func (jm *ControllerV2) sync(cronJobKey string) (error, *time.Duration) { + ns, name, err := 
cache.SplitMetaNamespaceKey(cronJobKey) + if err != nil { + return err, nil + } + + cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name) + switch { + case errors.IsNotFound(err): + // may be cronjob is deleted, dont need to requeue this key + klog.V(2).InfoS("cronjob not found, may be it is deleted", "cronjob", klog.KRef(ns, name), "err", err) + return nil, nil + case err != nil: + // for other transient apiserver error requeue with exponential backoff + return err, nil + } + + jobList := []*batchv1.Job{} + if len(cronJob.Spec.JobTemplate.Labels) == 0 { + jobList, err = jm.jobLister.Jobs(ns).List(labels.Everything()) + } else { + jobList, err = jm.jobLister.Jobs(ns).List(labels.Set(cronJob.Spec.JobTemplate.Labels).AsSelector()) + } + if err != nil { + return err, nil + } + + jobsToBeReconciled := []batchv1.Job{} + + for _, job := range jobList { + // If it has a ControllerRef, that's all that matters. + if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == name { + // this job is needs to be reconciled + jobsToBeReconciled = append(jobsToBeReconciled, *job) + } + } + + err, requeueAfter := syncOne2(cronJob, jobsToBeReconciled, time.Now(), jm.jobControl, jm.cronJobControl, jm.recorder) + if err != nil { + klog.V(2).InfoS("error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) + return err, nil + } + + err = cleanupFinishedJobs2(cronJob, jobsToBeReconciled, jm.jobControl, jm.cronJobControl, jm.recorder) + if err != nil { + klog.V(2).InfoS("error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err) + return err, nil + } + + if requeueAfter != nil { + klog.V(4).InfoS("re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter) + return nil, requeueAfter + } + // this marks the key done, currently only happens when the cronjob is 
suspended or spec has invalid schedule format + return nil, nil +} + +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the correct Kind. +func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batchv1beta1.CronJob { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { + return nil + } + cronJob, err := jm.cronJobLister.CronJobs(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if cronJob.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return cronJob +} + +// When a job is created, enqueue the controller that manages it and update it's expectations. +func (jm *ControllerV2) addJob(obj interface{}) { + job := obj.(*batchv1.Job) + if job.DeletionTimestamp != nil { + // on a restart of the controller controller, it's possible a new job shows up in a state that + // is already pending deletion. Prevent the job from being a creation observation. + jm.deleteJob(job) + return + } + + // If it has a ControllerRef, that's all that matters. + if controllerRef := metav1.GetControllerOf(job); controllerRef != nil { + cronJob := jm.resolveControllerRef(job.Namespace, controllerRef) + if cronJob == nil { + return + } + jm.enqueueController(cronJob) + return + } +} + +// updateJob figures out what CronJob(s) manage a Job when the Job +// is updated and wake them up. If the anything of the Job have changed, we need to +// awaken both the old and new CronJob. old and cur must be *batchv1.Job +// types. 
+func (jm *ControllerV2) updateJob(old, cur interface{}) { + curJob := cur.(*batchv1.Job) + oldJob := old.(*batchv1.Job) + if curJob.ResourceVersion == oldJob.ResourceVersion { + // Periodic resync will send update events for all known jobs. + // Two different versions of the same jobs will always have different RVs. + return + } + + curControllerRef := metav1.GetControllerOf(curJob) + oldControllerRef := metav1.GetControllerOf(oldJob) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if cronJob := jm.resolveControllerRef(oldJob.Namespace, oldControllerRef); cronJob != nil { + jm.enqueueController(cronJob) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + cronJob := jm.resolveControllerRef(curJob.Namespace, curControllerRef) + if cronJob == nil { + return + } + jm.enqueueController(cronJob) + return + } +} + +func (jm *ControllerV2) deleteJob(obj interface{}) { + job, ok := obj.(*batchv1.Job) + + // When a delete is dropped, the relist will notice a job in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + job, ok = tombstone.Obj.(*batchv1.Job) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj)) + return + } + } + + controllerRef := metav1.GetControllerOf(job) + if controllerRef == nil { + // No controller should care about orphans being deleted. 
+ return + } + cronJob := jm.resolveControllerRef(job.Namespace, controllerRef) + if cronJob == nil { + return + } + jm.enqueueController(cronJob) +} + +func (jm *ControllerV2) enqueueController(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) + return + } + + jm.queue.Add(key) +} + +func (jm *ControllerV2) enqueueControllerWithTime(obj interface{}, t time.Duration) { + key, err := controller.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) + return + } + + jm.queue.AddAfter(key, t) +} + +// updateCronJob re-queues the CronJob for next scheduled time if there is a +// change in spec.schedule otherwise it re-queues it now +func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) { + oldCJ, okOld := old.(*batchv1beta1.CronJob) + newCJ, okNew := curr.(*batchv1beta1.CronJob) + + if !okOld || !okNew { + // typecasting of one failed, handle this better, may be log entry + return + } + // if the change in schedule results in next requeue having to be sooner than it already was, + // it will be handled here by the queue. If the next requeue is further than previous schedule, + // the sync loop will essentially be a no-op for the already queued key with old schedule. 
+ if oldCJ.Spec.Schedule != newCJ.Spec.Schedule { + // schedule changed, change the requeue time + sched, err := cron.ParseStandard(newCJ.Spec.Schedule) + if err != nil { + // this is likely a user error in defining the spec value + // we should log the error and not reconcile this cronjob until an update to spec + klog.V(2).InfoS("unparseable schedule for cronjob", "cronjob", klog.KRef(newCJ.GetNamespace(), newCJ.GetName()), "schedule", newCJ.Spec.Schedule, "err", err) + return + } + now := jm.now() + t := nextScheduledTimeDurationWithDelta(sched, now) + + jm.enqueueControllerWithTime(curr, *t) + return + } + + // other parameters changed, requeue this now and if this gets triggered + // within deadline, sync loop will work on the CJ otherwise updates will be handled + // during the next schedule + // TODO: need to handle the change of spec.JobTemplate.metadata.labels explicitly + // to cleanup jobs with old labels + jm.enqueueController(curr) +} + +// TODO: @alpatel we need to return errors from this function, in order to allow +// for errors to propagate up the reconcile function and kick in queues exponential +// backed off retries more details on inline code comments in the func + +// syncOne reconciles a CronJob with a list of any Jobs that it created. +// All known jobs created by "cj" should be included in "js". +// The current time is passed in to facilitate testing. +// It has no receiver, to facilitate testing. 
+func syncOne2( + cj *batchv1beta1.CronJob, + js []batchv1.Job, + now time.Time, + jc jobControlInterface, + cjc cjControlInterface, + recorder record.EventRecorder) (error, *time.Duration) { + + childrenJobs := make(map[types.UID]bool) + for _, j := range js { + childrenJobs[j.ObjectMeta.UID] = true + found := inActiveList(*cj, j.ObjectMeta.UID) + if !found && !IsJobFinished(&j) { + recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) + // We found an unfinished job that has us as the parent, but it is not in our Active list. + // This could happen if we crashed right after creating the Job and before updating the status, + // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created + // a job that they wanted us to adopt. + + // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't + // stop users from creating jobs if they have permission. It is assumed that if a + // user has permission to create a job within a namespace, then they have permission to make any cronJob + // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way. + // TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about? + } else if found && IsJobFinished(&j) { + _, status := getFinishedStatus(&j) + deleteFromActiveList(cj, j.ObjectMeta.UID) + recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) + } + } + + // Remove any job reference from the active list if the corresponding job does not exist any more. + // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching + // job running. 
+ for _, j := range cj.Status.Active { + if found := childrenJobs[j.UID]; !found { + recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) + deleteFromActiveList(cj, j.UID) + } + } + + // TODO: @alpatel explore if cached client can be used as realCJControl + updatedCJ, err := cjc.UpdateStatus(cj) + if err != nil { + klog.V(2).InfoS("Unable to update status for cronjon", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) + return err, nil + } + *cj = *updatedCJ + + if cj.DeletionTimestamp != nil { + // The CronJob is being deleted. + // Don't do anything other than updating status. + return fmt.Errorf("cronjob %s/%s is being deleted", cj.Namespace, cj.Name), nil + } + + if cj.Spec.Suspend != nil && *cj.Spec.Suspend { + klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + return nil, nil + } + + // TODO: alpatel: this is here now just to pass the unit tests, + // move this to the start of the function to return on error early + sched, err := cron.ParseStandard(cj.Spec.Schedule) + if err != nil { + // this is likely a user error in defining the spec value + // we should log the error and not reconcile this cronjob until an update to spec + klog.V(2).InfoS("unparseable schedule", "cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err) + recorder.Eventf(cj, v1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %s : %s", cj.Spec.Schedule, err) + return nil, nil + } + times, err := getRecentUnmetScheduleTimes2(*cj, now, sched) + switch { + case err != nil && len(times) == 0: + // too many missed jobs, schedule the next one on time and return + // TODO: @alpatel, with revised workflow this probably needs be reworked + // the thought process is we will always miss the schedule time and + // controller will reconcile after scheduled time + delta time 
spent + // in reconciliation loop. With that if a job misses 100 schedule times + // with this block, it will always return here. + recorder.Eventf(cj, v1.EventTypeWarning, "TooManyMissedTimes", "Too many missed times for the cronjob, will schedule the next one", err) + klog.ErrorS(err, "too many missed times", "cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), err) + // schedule for next period + t := nextScheduledTimeDurationWithDelta(sched, now) + + // in order to unwedge the cronjob from always returning from this block + // TODO: @alpatel, in the future we should add a .status.nextScheduleTime + // and refactor getRecentUnmetScheduleTimes to give us 101th time after + // 100 missed times + cj.Status.LastScheduleTime = &metav1.Time{Time: now} + if _, err := cjc.UpdateStatus(cj); err != nil { + klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) + return fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err), nil + } + return nil, t + case len(times) == 0: + // no unmet start time, return. + // The only time this should happen is if queue is filled after restart. 
+ // Otherwise, the queue is always suppose to trigger sync function at the time of + // the scheduled time, that will give atleast 1 unmet time schedule + klog.V(4).InfoS("No unmet start times", "cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + t := nextScheduledTimeDurationWithDelta(sched, now) + return nil, t + } + + scheduledTime := times[len(times)-1] + tooLate := false + if cj.Spec.StartingDeadlineSeconds != nil { + tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now) + } + if tooLate { + klog.V(4).InfoS("Missed starting window", "cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z)) + // TODO: @alpatel: confirm with @soltysh the following TODO is not true anymore. We will now + // only requeue the for the next scheduled time, instead of hitting this error again + // and again. With the new workflow we might only hit this if controller got wedged + // and on restart we miss the schedule time by more than deadline. In that case + // schedule for next time + // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing + // the miss every cycle. In order to avoid sending multiple events, and to avoid processing + // the cj again and again, we could set a Status.LastMissedTime when we notice a miss. + // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp, + // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate + // and event the next time we process it, and also so the user looking at the status + // can see easily that there was a missed execution. 
+ t := nextScheduledTimeDurationWithDelta(sched, now) + return nil, t + } + if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 { + // Regardless which source of information we use for the set of active jobs, + // there is some risk that we won't see an active job when there is one. + // (because we haven't seen the status update to the SJ or the created pod). + // So it is theoretically possible to have concurrency with Forbid. + // As long the as the invocations are "far enough apart in time", this usually won't happen. + // + // TODO: @alpatel confirm we @soltysh we can probably set a deterministic job name per + // schedule time. The formula: last_run=UTCInSeconds(now)-UTCInSeconds(creationTime)/intervalInSeconds + // might give use last scheduled counter. May be save it in status and reconcile on that + // TODO: for Forbid, we could use the same name for every execution, as a lock. + // With replace, we could use a name that is deterministic per execution time. + // But that would mean that you could not inspect prior successes or failures of Forbid jobs. 
+ klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + t := nextScheduledTimeDurationWithDelta(sched, now) + return nil, t + } + if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { + for _, j := range cj.Status.Active { + klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name)) + + job, err := jc.GetJob(j.Namespace, j.Name) + if err != nil { + recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) + return err, nil + } + if !deleteJob(cj, job, jc, recorder) { + return fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name), nil + } + } + } + + jobReq, err := getJobFromTemplate(cj, scheduledTime) + if err != nil { + klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + return err, nil + } + jobResp, err := jc.CreateJob(cj.Namespace, jobReq) + switch { + case errors.HasStatusCause(err, v1.NamespaceTerminatingCause): + // TODO: @alpatel log, event? + case errors.IsAlreadyExists(err): + // TODO: @alpatel handle this, we tried to create a job that already exists. may be update/patch? + case err != nil: + // default error handling + recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) + return err, nil + } + klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) + recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) + + // ------------------------------------------------------------------ // + + // If this process restarts at this point (after posting a job, but + // before updating the status), then we might try to start the job on + // the next time. 
Actually, if we re-list the SJs and Jobs on the next + // iteration of syncAll, we might not see our own status update, and + // then post one again. So, we need to use the job name as a lock to + // prevent us from making the job twice (name the job with hash of its + // scheduled time). + + // Add the just-started job to the status list. + jobRef, err := getRef(jobResp) + if err != nil { + klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "err", err) + return fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cj.GetNamespace(), cj.GetName())), nil + } else { + cj.Status.Active = append(cj.Status.Active, *jobRef) + } + cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} + if _, err := cjc.UpdateStatus(cj); err != nil { + klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) + return fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err), nil + } + + t := nextScheduledTimeDurationWithDelta(sched, now) + return nil, t +} + +// nextScheduledTimeDurationWithDelta returns the time duration to requeue based on +// the schedule and current time. It adds a 100ms padding to the next requeue to account +// for Network Time Protocol(NTP) time skews. If the time drifts are adjusted which in most +// realistic cases would be around 100s, scheduled cron will still be executed without missing +// the schedule. 
+func nextScheduledTimeDurationWithDelta(sched cron.Schedule, now time.Time) *time.Duration { + t := sched.Next(now).Add(delta100ms).Sub(now) + return &t +} + +// cleanupFinishedJobs2 cleans up finished jobs created by a CronJob +func cleanupFinishedJobs2(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, + cjc cjControlInterface, recorder record.EventRecorder) error { + // If neither limit is set, there is no need to do anything. + if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { + return nil + } + + failedJobs := []batchv1.Job{} + successfulJobs := []batchv1.Job{} + + for _, job := range js { + isFinished, finishedStatus := getFinishedStatus(&job) + if isFinished && finishedStatus == batchv1.JobComplete { + successfulJobs = append(successfulJobs, job) + } else if isFinished && finishedStatus == batchv1.JobFailed { + failedJobs = append(failedJobs, job) + } + } + + if cj.Spec.SuccessfulJobsHistoryLimit != nil { + removeOldestJobs(cj, + successfulJobs, + jc, + *cj.Spec.SuccessfulJobsHistoryLimit, + recorder) + } + + if cj.Spec.FailedJobsHistoryLimit != nil { + removeOldestJobs(cj, + failedJobs, + jc, + *cj.Spec.FailedJobsHistoryLimit, + recorder) + } + + // Update the CronJob, in case jobs were removed from the list. 
+ _, err := cjc.UpdateStatus(cj) + return err +} diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go new file mode 100644 index 0000000000000..38f32a2515e4d --- /dev/null +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -0,0 +1,451 @@ +package cronjob + +import ( + "fmt" + "k8s.io/apimachinery/pkg/labels" + "reflect" + "strings" + "sync" + "testing" + "time" + + "github.com/robfig/cron" + batchv1 "k8s.io/api/batch/v1" + batchV1beta1 "k8s.io/api/batch/v1beta1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/listers/batch/v1beta1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + // For the cronjob controller to do conversions. + _ "k8s.io/kubernetes/pkg/apis/batch/install" + _ "k8s.io/kubernetes/pkg/apis/core/install" +) + +func Test_syncOne2(t *testing.T) { + // Check expectations on deadline parameters + if shortDead/60/60 >= 1 { + t.Errorf("shortDead should be less than one hour") + } + + if mediumDead/60/60 < 1 || mediumDead/60/60 >= 24 { + t.Errorf("mediumDead should be between one hour and one day") + } + + if longDead/60/60/24 < 10 { + t.Errorf("longDead should be at least ten days") + } + + testCases := map[string]struct { + // cj spec + concurrencyPolicy batchV1beta1.ConcurrencyPolicy + suspend bool + schedule string + deadline int64 + + // cj status + ranPreviously bool + stillActive bool + + jobCreationTime time.Time + + // environment + now time.Time + + // expectations + expectCreate bool + expectDelete bool + expectActive int + expectedWarnings int + expectErr bool + expectRequeueAfter bool + }{ + "never ran, not valid schedule, A": {A, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F}, + "never ran, not valid schedule, F": {f, F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F}, + "never ran, not valid schedule, R": {f, 
F, errorSchedule, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 1, F, F}, + "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, F}, + "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, T}, + "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + + "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + "prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + "prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, 
justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, F}, + "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), justAfterTheHour(), F, F, 0, 0, F, T}, + "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterThePriorHour(), justAfterTheHour(), T, F, 1, 0, F, T}, + + "still active, not time, A": {A, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T}, + "still active, not time, F": {f, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T}, + "still active, not time, R": {R, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justBeforeTheHour(), F, F, 1, 0, F, T}, + "still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, F, 2, 0, F, T}, + "still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, T}, + "still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, T, 1, 0, F, T}, + "still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, F}, + "still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterThePriorHour(), justAfterTheHour(), F, F, 1, 0, F, T}, + "still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterThePriorHour(), justAfterTheHour(), T, F, 2, 0, F, T}, + + // Controller should fail to schedule these, as there are too many missed starting times + // and either no deadline or a too long deadline. 
+ "prev ran but done, long overdue, not past deadline, A": {A, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), F, F, 0, 1, F, T}, + "prev ran but done, long overdue, not past deadline, R": {R, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), F, F, 0, 1, F, T}, + "prev ran but done, long overdue, not past deadline, F": {f, F, onTheHour, longDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), F, F, 0, 1, F, T}, + "prev ran but done, long overdue, no deadline, A": {A, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), F, F, 0, 1, F, T}, + "prev ran but done, long overdue, no deadline, R": {R, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), F, F, 0, 1, F, T}, + "prev ran but done, long overdue, no deadline, F": {f, F, onTheHour, noDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), F, F, 0, 1, F, T}, + + "prev ran but done, long overdue, past medium deadline, A": {A, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T}, + "prev ran but done, long overdue, past short deadline, A": {A, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T}, + + "prev ran but done, long overdue, past medium deadline, R": {R, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T}, + "prev ran but done, long overdue, past short deadline, R": {R, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T}, + + "prev ran but done, long overdue, past medium deadline, F": {f, F, onTheHour, mediumDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T}, + "prev ran but done, long overdue, past short deadline, F": {f, F, onTheHour, shortDead, T, F, justAfterThePriorHour(), weekAfterTheHour(), T, F, 1, 0, F, T}, + + // Tests for time skews + "this ran but done, time drifted back, F": {f, F, onTheHour, noDead, T, F, 
justAfterTheHour(), justBeforeTheHour(), F, F, 0, 0, F, T}, + } + for name, tc := range testCases { + name := name + tc := tc + t.Run(name, func(t *testing.T) { + if name == "this ran but done, time drifted back, F" { + println("hello") + } + cj := cronJob() + cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy + cj.Spec.Suspend = &tc.suspend + cj.Spec.Schedule = tc.schedule + if tc.deadline != noDead { + cj.Spec.StartingDeadlineSeconds = &tc.deadline + } + + var ( + job *batchv1.Job + err error + ) + js := []batchv1.Job{} + if tc.ranPreviously { + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()} + cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()} + job, err = getJobFromTemplate(&cj, tc.jobCreationTime) + if err != nil { + t.Fatalf("%s: unexpected error creating a job from template: %v", name, err) + } + job.UID = "1234" + job.Namespace = "" + if tc.stillActive { + cj.Status.Active = []v1.ObjectReference{{UID: job.UID}} + js = append(js, *job) + } + } else { + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()} + if tc.stillActive { + t.Errorf("%s: test setup error: this case makes no sense", name) + } + } + + jc := &fakeJobControl{Job: job} + cjc := &fakeCJControl{} + recorder := record.NewFakeRecorder(10) + + err, requeueAfter := syncOne2(&cj, js, tc.now, jc, cjc, recorder) + if tc.expectErr && err == nil { + t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter) + } + if tc.expectRequeueAfter { + sched, err := cron.ParseStandard(tc.schedule) + if err != nil { + t.Errorf("%s: test setup error: the schedule %s is unparseable: %#v", name, tc.schedule, err) + } + expectedRequeueAfter := nextScheduledTimeDurationWithDelta(sched, tc.now) + if !reflect.DeepEqual(requeueAfter, expectedRequeueAfter) { + t.Errorf("%s: expected requeueAfter: %+v, got requeueAfter time: %+v", name, expectedRequeueAfter, requeueAfter) + } + } + expectedCreates := 0 + if tc.expectCreate { + 
expectedCreates = 1 + } + if len(jc.Jobs) != expectedCreates { + t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs)) + } + for i := range jc.Jobs { + job := &jc.Jobs[i] + controllerRef := metav1.GetControllerOf(job) + if controllerRef == nil { + t.Errorf("%s: expected job to have ControllerRef: %#v", name, job) + } else { + if got, want := controllerRef.APIVersion, "batch/v1beta1"; got != want { + t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want) + } + if got, want := controllerRef.Kind, "CronJob"; got != want { + t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want) + } + if got, want := controllerRef.Name, cj.Name; got != want { + t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want) + } + if got, want := controllerRef.UID, cj.UID; got != want { + t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("%s: controllerRef.Controller is not set to true", name) + } + } + } + + expectedDeletes := 0 + if tc.expectDelete { + expectedDeletes = 1 + } + if len(jc.DeleteJobName) != expectedDeletes { + t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName)) + } + + // Status update happens once when ranging through job list, and another one if create jobs. 
+ expectUpdates := 1 + expectedEvents := 0 + if tc.expectCreate { + expectedEvents++ + expectUpdates++ + } + if tc.expectDelete { + expectedEvents++ + } + expectedEvents += tc.expectedWarnings + + if len(recorder.Events) != expectedEvents { + t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events)) + } + + numWarnings := 0 + for i := 1; i <= len(recorder.Events); i++ { + e := <-recorder.Events + if strings.HasPrefix(e, v1.EventTypeWarning) { + numWarnings++ + } + } + if numWarnings != tc.expectedWarnings { + t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings) + } + + if tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) { + t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active)) + } + }) + } + +} + +// this test will take around 61 seconds to complete +func TestController2_updateCronJob(t *testing.T) { + cjc := &fakeCJControl{} + jc := &fakeJobControl{} + type fields struct { + queue workqueue.DelayingInterface + recorder record.EventRecorder + jobControl jobControlInterface + cronJobControl cjControlInterface + } + type args struct { + oldJobTemplate *batchV1beta1.JobTemplateSpec + newJobTemplate *batchV1beta1.JobTemplateSpec + oldJobSchedule string + newJobSchedule string + } + tests := []struct { + name string + fields fields + args args + deltaTimeForQueue time.Duration + roundOffTimeDuration time.Duration + }{ + { + name: "spec.template changed", + fields: fields{ + queue: workqueue.NewDelayingQueue(), + recorder: record.NewFakeRecorder(10), + jobControl: jc, + cronJobControl: cjc, + }, + args: args{ + oldJobTemplate: &batchV1beta1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"x": "y"}, + }, + Spec: jobSpec(), + }, + newJobTemplate: &batchV1beta1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"a": "foo"}, 
+ Annotations: map[string]string{"x": "y"}, + }, + Spec: jobSpec(), + }, + }, + deltaTimeForQueue: 0 * time.Second, + roundOffTimeDuration: 500*time.Millisecond + delta100ms, + }, + { + name: "spec.schedule changed", + fields: fields{ + queue: workqueue.NewDelayingQueue(), + recorder: record.NewFakeRecorder(10), + jobControl: jc, + cronJobControl: cjc, + }, + args: args{ + oldJobSchedule: "30 * * * *", + newJobSchedule: "1 * * * *", + }, + deltaTimeForQueue: 61*time.Second + delta100ms, + roundOffTimeDuration: 750 * time.Millisecond, + }, + // TODO: Add more test cases for updating scheduling. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cj := cronJob() + newCj := cronJob() + if tt.args.oldJobTemplate != nil { + cj.Spec.JobTemplate = *tt.args.oldJobTemplate + } + if tt.args.newJobTemplate != nil { + newCj.Spec.JobTemplate = *tt.args.newJobTemplate + } + if tt.args.oldJobSchedule != "" { + cj.Spec.Schedule = tt.args.oldJobSchedule + } + if tt.args.newJobSchedule != "" { + newCj.Spec.Schedule = tt.args.newJobSchedule + } + jm := &ControllerV2{ + queue: tt.fields.queue, + recorder: tt.fields.recorder, + jobControl: tt.fields.jobControl, + cronJobControl: tt.fields.cronJobControl, + } + jm.now = justASecondBeforeTheHour + now := time.Now() + then := time.Now() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + now = time.Now() + jm.queue.Get() + then = time.Now() + wg.Done() + return + }() + jm.updateCronJob(&cj, &newCj) + wg.Wait() + d := then.Sub(now) + if d.Round(tt.roundOffTimeDuration).Seconds() != tt.deltaTimeForQueue.Round(tt.roundOffTimeDuration).Seconds() { + t.Errorf("Expected %#v got %#v", tt.deltaTimeForQueue.Round(tt.roundOffTimeDuration).String(), d.Round(tt.roundOffTimeDuration).String()) + } + }) + } +} + +type FakeCronJobLister struct { + cronjobs []*batchV1beta1.CronJob + errCount int + listCalled int +} + +func (f FakeCronJobLister) List(selector labels.Selector) ([]*batchV1beta1.CronJob, error) { + if f.listCalled <= 
f.errCount { + return nil, fmt.Errorf("fake error") + } + ret := []*batchV1beta1.CronJob{} + for _, cj := range f.cronjobs { + if selector.Matches(labels.Set(cj.GetLabels())) { + ret = append(ret, cj) + } + } + return ret, nil +} + +func (f FakeCronJobLister) CronJobs(namespace string) v1beta1.CronJobNamespaceLister { + panic("implement me") +} + +func TestControllerV2_enqueueAllExistingCronJobsWithRetries(t *testing.T) { + type fields struct { + queue workqueue.DelayingInterface + cronJobLister FakeCronJobLister + } + type args struct { + retries int + } + tests := []struct { + name string + fields fields + args args + error bool + }{ + { + name: "test without errors", + fields: fields{cronJobLister: FakeCronJobLister{ + cronjobs: []*batchV1beta1.CronJob{ + &batchV1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "bar", + Name: "foo", + }, + }, + }, + errCount: -1, + listCalled: 0, + }, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-enqueue-all"), + }, + args: args{retries: 5}, + error: false, + }, + { + name: "test without errors", + fields: fields{cronJobLister: FakeCronJobLister{ + cronjobs: []*batchV1beta1.CronJob{ + &batchV1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "bar", + Name: "foo", + }, + }, + }, + errCount: 6, + listCalled: 0, + }, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-enqueue-all"), + }, + args: args{retries: 5}, + error: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jm := &ControllerV2{ + queue: tt.fields.queue, + cronJobLister: tt.fields.cronJobLister, + } + jm.enqueueAllCronJobsWithRetries(tt.args.retries) + if !tt.error && jm.queue.Len() != len(tt.fields.cronJobLister.cronjobs) { + t.Errorf("expected %d, got %d\n", len(tt.fields.cronJobLister.cronjobs), jm.queue.Len()) + } + if tt.error && jm.queue.Len() != 0 { + t.Errorf("expected 0, got %d", jm.queue.Len()) + } + }) + } +} diff 
--git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index 1ffbd5927dd3d..769ed66269703 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -45,6 +45,8 @@ func deleteFromActiveList(cj *batchv1beta1.CronJob, uid types.UID) { if cj == nil { return } + // TODO: @alpatel the memory footprint could maybe be reduced here by + // cj.Status.Active = append(cj.Status.Active[:indexToRemove], cj.Status.Active[indexToRemove+1:]...) newActive := []v1.ObjectReference{} for _, j := range cj.Status.Active { if j.UID != uid { @@ -147,6 +149,64 @@ func getRecentUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time) ([]time return starts, nil } +// getRecentUnmetScheduleTimes2 gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not. +// +// If there are too many (>100) unstarted times, just give up and return an empty slice. +// If there were missed times prior to the last known start time, then those are not returned. +func getRecentUnmetScheduleTimes2(cj batchv1beta1.CronJob, now time.Time, schedule cron.Schedule) ([]time.Time, error) { + starts := []time.Time{} + + var earliestTime time.Time + if cj.Status.LastScheduleTime != nil { + earliestTime = cj.Status.LastScheduleTime.Time + } else { + // If none found, then this is either a recently created cronJob, + // or the active/completed info was somehow lost (contract for status + // in kubernetes says it may need to be recreated), or that we have + // started a job, but have not noticed it yet (distributed systems can + // have arbitrary delays). In any case, use the creation time of the + // CronJob as last known start time. 
+ earliestTime = cj.ObjectMeta.CreationTimestamp.Time + } + if cj.Spec.StartingDeadlineSeconds != nil { + // Controller is not going to schedule anything below this point + schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)) + + if schedulingDeadline.After(earliestTime) { + earliestTime = schedulingDeadline + } + } + if earliestTime.After(now) { + return []time.Time{}, nil + } + + for t := schedule.Next(earliestTime); !t.After(now); t = schedule.Next(t) { + starts = append(starts, t) + // An object might miss several starts. For example, if + // controller gets wedged on friday at 5:01pm when everyone has + // gone home, and someone comes in on tuesday AM and discovers + // the problem and restarts the controller, then all the hourly + // jobs, more than 80 of them for one hourly cronJob, should + // all start running with no further intervention (if the cronJob + // allows concurrency and late starts). + // + // However, if there is a bug somewhere, or incorrect clock + // on controller's server or apiservers (for setting creationTimestamp) + // then there could be so many missed start times (it could be off + // by decades or more), that it would eat up all the CPU and memory + // of this controller. In that case, we want to not try to list + // all the missed start times. + // + // I've somewhat arbitrarily picked 100, as more than 80, + // but less than "lots". + if len(starts) > 100 { + // We can't get the most recent times so just return an empty slice + return []time.Time{}, fmt.Errorf("too many missed start time (> 100). 
Set or decrease .spec.startingDeadlineSeconds or check clock skew") + } + } + return starts, nil +} + // getJobFromTemplate makes a Job from a CronJob func getJobFromTemplate(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) { labels := copyLabels(&cj.Spec.JobTemplate) diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go index 8df0f3fe2233e..31cd160c00e5d 100644 --- a/pkg/controller/cronjob/utils_test.go +++ b/pkg/controller/cronjob/utils_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + cron "github.com/robfig/cron" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/api/core/v1" @@ -242,6 +243,151 @@ func TestGroupJobsByParent(t *testing.T) { } } +func TestGetRecentUnmetScheduleTimes2(t *testing.T) { + // schedule is hourly on the hour + schedule := "0 * * * ?" + + PraseSchedule := func(schedule string) cron.Schedule { + sched, err := cron.ParseStandard(schedule) + if err != nil { + t.Errorf("Error parsing schedule: %#v", err) + return nil + } + return sched + } + // T1 is a scheduled start time of that schedule + T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z") + if err != nil { + t.Errorf("test setup error: %v", err) + } + // T2 is a scheduled start time of that schedule after T1 + T2, err := time.Parse(time.RFC3339, "2016-05-19T11:00:00Z") + if err != nil { + t.Errorf("test setup error: %v", err) + } + + cj := batchv1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mycronjob", + Namespace: metav1.NamespaceDefault, + UID: types.UID("1a2b3c"), + }, + Spec: batchv1beta1.CronJobSpec{ + Schedule: schedule, + ConcurrencyPolicy: batchv1beta1.AllowConcurrent, + JobTemplate: batchv1beta1.JobTemplateSpec{}, + }, + } + { + // Case 1: no known start times, and none needed yet. + // Creation time is before T1. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} + // Current time is more than creation time, but less than T1. 
+ now := T1.Add(-7 * time.Minute) + times, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 0 { + t.Errorf("expected no start times, got: %v", times) + } + } + { + // Case 2: no known start times, and one needed. + // Creation time is before T1. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} + // Current time is after T1 + now := T1.Add(2 * time.Second) + times, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 1 { + t.Errorf("expected 1 start time, got: %v", times) + } else if !times[0].Equal(T1) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + } + { + // Case 3: known LastScheduleTime, no start needed. + // Creation time is before T1. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} + // Status shows a start at the expected time. + cj.Status.LastScheduleTime = &metav1.Time{Time: T1} + // Current time is after T1 + now := T1.Add(2 * time.Minute) + times, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 0 { + t.Errorf("expected 0 start times, got: %v", times) + } + } + { + // Case 4: known LastScheduleTime, a start needed + // Creation time is before T1. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} + // Status shows a start at the expected time. 
+ cj.Status.LastScheduleTime = &metav1.Time{Time: T1} + // Current time is after T1 and after T2 + now := T2.Add(5 * time.Minute) + times, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 1 { + t.Errorf("expected 1 start times, got: %v", times) + } else if !times[0].Equal(T2) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + } + { + // Case 5: known LastScheduleTime, two starts needed + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} + cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} + // Current time is after T1 and after T2 + now := T2.Add(5 * time.Minute) + times, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(times) != 2 { + t.Errorf("expected 2 start times, got: %v", times) + } else { + if !times[0].Equal(T1) { + t.Errorf("expected: %v, got: %v", T1, times[0]) + } + if !times[1].Equal(T2) { + t.Errorf("expected: %v, got: %v", T2, times[1]) + } + } + } + { + // Case 6: now is way way ahead of last start time, and there is no deadline. + cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} + cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} + now := T2.Add(10 * 24 * time.Hour) + _, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err == nil { + t.Errorf("expected an error") + } + } + { + // Case 7: now is way way ahead of last start time, but there is a short deadline. 
+ cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} + cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} + now := T2.Add(10 * 24 * time.Hour) + // Deadline is short + deadline := int64(2 * 60 * 60) + cj.Spec.StartingDeadlineSeconds = &deadline + _, err := getRecentUnmetScheduleTimes2(cj, now, PraseSchedule(cj.Spec.Schedule)) + if err != nil { + t.Errorf("unexpected error") + } + } +} + func TestGetRecentUnmetScheduleTimes(t *testing.T) { // schedule is hourly on the hour schedule := "0 * * * ?" diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 2ab0b3bc3660c..86aa70fb60698 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -570,6 +570,16 @@ const ( // Enable all logic related to the PodDisruptionBudget API object in policy PodDisruptionBudget featuregate.Feature = "PodDisruptionBudget" + // owner: @alaypatel07, @soltysh + // alpha: v1.20 + // beta: v1.21 + // + // CronJobControllerV2 controls whether the controller manager starts old cronjob + // controller or new one which is implemented with informers and delaying queue + // + // This feature is deprecated, and will be removed in v1.22. 
+ CronJobControllerV2 featuregate.Feature = "CronJobControllerV2" + // owner: @m1093782566 // alpha: v1.17 // @@ -723,6 +733,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.21 AllowInsecureBackendProxy: {Default: true, PreRelease: featuregate.Beta}, PodDisruptionBudget: {Default: true, PreRelease: featuregate.Beta}, + CronJobControllerV2: {Default: false, PreRelease: featuregate.Alpha}, ServiceTopology: {Default: false, PreRelease: featuregate.Alpha}, ServiceAppProtocol: {Default: true, PreRelease: featuregate.Beta}, ImmutableEphemeralVolumes: {Default: true, PreRelease: featuregate.Beta}, diff --git a/test/e2e/apps/cronjob.go b/test/e2e/apps/cronjob.go index 228bf6944ff47..a5fea77ce4c6e 100644 --- a/test/e2e/apps/cronjob.go +++ b/test/e2e/apps/cronjob.go @@ -170,6 +170,32 @@ var _ = SIGDescribe("CronJob", func() { framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name) }) + ginkgo.It("should be able to schedule after more than 100 missed schedule", func() { + ginkgo.By("Creating a cronjob") + cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batchv1beta1.ForbidConcurrent, + sleepCommand, nil, nil) + creationTime := time.Now().Add(-99 * 24 * time.Hour) + lastScheduleTime := creationTime.Add(-1 * 24 * time.Hour) + cronJob.CreationTimestamp = metav1.Time{Time: creationTime} + cronJob.Status.LastScheduleTime = &metav1.Time{Time: lastScheduleTime} + cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) + framework.ExpectNoError(err, "Failed to create CronJob in namespace %s", f.Namespace.Name) + + ginkgo.By("Ensuring one job is running") + err = waitForActiveJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, 1) + framework.ExpectNoError(err, "Failed to wait for active jobs in CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name) + + ginkgo.By("Ensuring at least 
one running jobs exists by listing jobs explicitly") + jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err, "Failed to list the CronJobs in namespace %s", f.Namespace.Name) + activeJobs, _ := filterActiveJobs(jobs) + gomega.Expect(len(activeJobs)).To(gomega.BeNumerically(">=", 1)) + + ginkgo.By("Removing cronjob") + err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name) + framework.ExpectNoError(err, "Failed to delete CronJob %s in namespace %s", cronJob.Name, f.Namespace.Name) + }) + // shouldn't give us unexpected warnings ginkgo.It("should not emit unexpected warnings", func() { ginkgo.By("Creating a cronjob") From c337a845d8cfc7c2a53b2126fdd89e6a1103bb26 Mon Sep 17 00:00:00 2001 From: Alay Patel Date: Sun, 11 Oct 2020 02:51:37 -0400 Subject: [PATCH 2/2] update violation_exceptions.list and make generated --- api/api-rules/violation_exceptions.list | 2 + pkg/controller/apis/config/BUILD | 1 + pkg/controller/apis/config/types.go | 4 + pkg/controller/apis/config/v1alpha1/BUILD | 1 + .../apis/config/v1alpha1/defaults.go | 3 + .../v1alpha1/zz_generated.conversion.go | 7 ++ .../apis/config/zz_generated.deepcopy.go | 1 + pkg/controller/cronjob/BUILD | 16 +++- pkg/controller/cronjob/config/BUILD | 29 ++++++ pkg/controller/cronjob/config/OWNERS | 14 +++ pkg/controller/cronjob/config/doc.go | 19 ++++ pkg/controller/cronjob/config/types.go | 26 ++++++ pkg/controller/cronjob/config/v1alpha1/BUILD | 36 ++++++++ .../cronjob/config/v1alpha1/conversion.go | 40 ++++++++ .../cronjob/config/v1alpha1/defaults.go | 36 ++++++++ pkg/controller/cronjob/config/v1alpha1/doc.go | 21 +++++ .../cronjob/config/v1alpha1/register.go | 31 +++++++ .../v1alpha1/zz_generated.conversion.go | 91 +++++++++++++++++++ .../config/v1alpha1/zz_generated.deepcopy.go | 21 +++++ .../cronjob/config/zz_generated.deepcopy.go | 37 ++++++++ .../config/v1alpha1/types.go | 12 +++ .../config/v1alpha1/zz_generated.deepcopy.go | 
17 ++++ 22 files changed, 464 insertions(+), 1 deletion(-) create mode 100644 pkg/controller/cronjob/config/BUILD create mode 100644 pkg/controller/cronjob/config/OWNERS create mode 100644 pkg/controller/cronjob/config/doc.go create mode 100644 pkg/controller/cronjob/config/types.go create mode 100644 pkg/controller/cronjob/config/v1alpha1/BUILD create mode 100644 pkg/controller/cronjob/config/v1alpha1/conversion.go create mode 100644 pkg/controller/cronjob/config/v1alpha1/defaults.go create mode 100644 pkg/controller/cronjob/config/v1alpha1/doc.go create mode 100644 pkg/controller/cronjob/config/v1alpha1/register.go create mode 100644 pkg/controller/cronjob/config/v1alpha1/zz_generated.conversion.go create mode 100644 pkg/controller/cronjob/config/v1alpha1/zz_generated.deepcopy.go create mode 100644 pkg/controller/cronjob/config/zz_generated.deepcopy.go diff --git a/api/api-rules/violation_exceptions.list b/api/api-rules/violation_exceptions.list index d2fc81b01e98a..3344d1079be55 100644 --- a/api/api-rules/violation_exceptions.list +++ b/api/api-rules/violation_exceptions.list @@ -480,6 +480,7 @@ API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,C API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,CSRSigningControllerConfiguration,LegacyUnknownSignerConfiguration API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,CloudProviderConfiguration,CloudConfigFile API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,CloudProviderConfiguration,Name +API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,CronJobControllerConfiguration,ConcurrentCronJobSyncs API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,DaemonSetControllerConfiguration,ConcurrentDaemonSetSyncs API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,DeploymentControllerConfiguration,ConcurrentDeploymentSyncs API rule 
violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,DeploymentControllerConfiguration,DeploymentControllerSyncPeriod @@ -530,6 +531,7 @@ API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,K API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeCloudSharedConfiguration,UseServiceAccountCredentials API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeControllerManagerConfiguration,AttachDetachController API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeControllerManagerConfiguration,CSRSigningController +API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeControllerManagerConfiguration,CronJobController API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeControllerManagerConfiguration,DaemonSetController API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeControllerManagerConfiguration,DeploymentController API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,KubeControllerManagerConfiguration,DeprecatedController diff --git a/pkg/controller/apis/config/BUILD b/pkg/controller/apis/config/BUILD index f5340ed02bda8..b4478db964482 100644 --- a/pkg/controller/apis/config/BUILD +++ b/pkg/controller/apis/config/BUILD @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/controller/certificates/signer/config:go_default_library", + "//pkg/controller/cronjob/config:go_default_library", "//pkg/controller/daemon/config:go_default_library", "//pkg/controller/deployment/config:go_default_library", "//pkg/controller/endpoint/config:go_default_library", diff --git a/pkg/controller/apis/config/types.go b/pkg/controller/apis/config/types.go index 1548cfc1c246d..aaecb7092d09c 100644 --- a/pkg/controller/apis/config/types.go +++ b/pkg/controller/apis/config/types.go @@ -20,6 +20,7 @@ import ( metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" componentbaseconfig "k8s.io/component-base/config" csrsigningconfig "k8s.io/kubernetes/pkg/controller/certificates/signer/config" + cronjobconfig "k8s.io/kubernetes/pkg/controller/cronjob/config" daemonconfig "k8s.io/kubernetes/pkg/controller/daemon/config" deploymentconfig "k8s.io/kubernetes/pkg/controller/deployment/config" endpointconfig "k8s.io/kubernetes/pkg/controller/endpoint/config" @@ -89,6 +90,9 @@ type KubeControllerManagerConfiguration struct { HPAController poautosclerconfig.HPAControllerConfiguration // JobControllerConfiguration holds configuration for JobController related features. JobController jobconfig.JobControllerConfiguration + // CronJobControllerConfiguration holds configuration for CronJobController + // related features. + CronJobController cronjobconfig.CronJobControllerConfiguration // NamespaceControllerConfiguration holds configuration for NamespaceController // related features. NamespaceController namespaceconfig.NamespaceControllerConfiguration diff --git a/pkg/controller/apis/config/v1alpha1/BUILD b/pkg/controller/apis/config/v1alpha1/BUILD index 82d2e12570ccf..ffc1c9242377f 100644 --- a/pkg/controller/apis/config/v1alpha1/BUILD +++ b/pkg/controller/apis/config/v1alpha1/BUILD @@ -16,6 +16,7 @@ go_library( deps = [ "//pkg/controller/apis/config:go_default_library", "//pkg/controller/certificates/signer/config/v1alpha1:go_default_library", + "//pkg/controller/cronjob/config/v1alpha1:go_default_library", "//pkg/controller/daemon/config/v1alpha1:go_default_library", "//pkg/controller/deployment/config/v1alpha1:go_default_library", "//pkg/controller/endpoint/config/v1alpha1:go_default_library", diff --git a/pkg/controller/apis/config/v1alpha1/defaults.go b/pkg/controller/apis/config/v1alpha1/defaults.go index eea3ac3e7be8e..28012182ea471 100644 --- a/pkg/controller/apis/config/v1alpha1/defaults.go +++ b/pkg/controller/apis/config/v1alpha1/defaults.go @@ -24,6 +24,7 @@ import ( 
componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" kubectrlmgrconfigv1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1" csrsigningconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/certificates/signer/config/v1alpha1" + cronjobconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/cronjob/config/v1alpha1" daemonconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/daemon/config/v1alpha1" deploymentconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/deployment/config/v1alpha1" endpointconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/endpoint/config/v1alpha1" @@ -88,6 +89,8 @@ func SetDefaults_KubeControllerManagerConfiguration(obj *kubectrlmgrconfigv1alph garbagecollectorconfigv1alpha1.RecommendedDefaultGarbageCollectorControllerConfiguration(&obj.GarbageCollectorController) // Use the default RecommendedDefaultJobControllerConfiguration options jobconfigv1alpha1.RecommendedDefaultJobControllerConfiguration(&obj.JobController) + // Use the default RecommendedDefaultCronJobControllerConfiguration options + cronjobconfigv1alpha1.RecommendedDefaultCronJobControllerConfiguration(&obj.CronJobController) // Use the default RecommendedDefaultNamespaceControllerConfiguration options namespaceconfigv1alpha1.RecommendedDefaultNamespaceControllerConfiguration(&obj.NamespaceController) // Use the default RecommendedDefaultNodeIPAMControllerConfiguration options diff --git a/pkg/controller/apis/config/v1alpha1/zz_generated.conversion.go b/pkg/controller/apis/config/v1alpha1/zz_generated.conversion.go index a93b332a4c945..b5ad141a75f21 100644 --- a/pkg/controller/apis/config/v1alpha1/zz_generated.conversion.go +++ b/pkg/controller/apis/config/v1alpha1/zz_generated.conversion.go @@ -30,6 +30,7 @@ import ( v1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1" config "k8s.io/kubernetes/pkg/controller/apis/config" signerconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/certificates/signer/config/v1alpha1" + cronjobconfigv1alpha1 
"k8s.io/kubernetes/pkg/controller/cronjob/config/v1alpha1" daemonconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/daemon/config/v1alpha1" deploymentconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/deployment/config/v1alpha1" endpointconfigv1alpha1 "k8s.io/kubernetes/pkg/controller/endpoint/config/v1alpha1" @@ -310,6 +311,9 @@ func autoConvert_v1alpha1_KubeControllerManagerConfiguration_To_config_KubeContr if err := jobconfigv1alpha1.Convert_v1alpha1_JobControllerConfiguration_To_config_JobControllerConfiguration(&in.JobController, &out.JobController, s); err != nil { return err } + if err := cronjobconfigv1alpha1.Convert_v1alpha1_CronJobControllerConfiguration_To_config_CronJobControllerConfiguration(&in.CronJobController, &out.CronJobController, s); err != nil { + return err + } if err := namespaceconfigv1alpha1.Convert_v1alpha1_NamespaceControllerConfiguration_To_config_NamespaceControllerConfiguration(&in.NamespaceController, &out.NamespaceController, s); err != nil { return err } @@ -394,6 +398,9 @@ func autoConvert_config_KubeControllerManagerConfiguration_To_v1alpha1_KubeContr if err := jobconfigv1alpha1.Convert_config_JobControllerConfiguration_To_v1alpha1_JobControllerConfiguration(&in.JobController, &out.JobController, s); err != nil { return err } + if err := cronjobconfigv1alpha1.Convert_config_CronJobControllerConfiguration_To_v1alpha1_CronJobControllerConfiguration(&in.CronJobController, &out.CronJobController, s); err != nil { + return err + } if err := namespaceconfigv1alpha1.Convert_config_NamespaceControllerConfiguration_To_v1alpha1_NamespaceControllerConfiguration(&in.NamespaceController, &out.NamespaceController, s); err != nil { return err } diff --git a/pkg/controller/apis/config/zz_generated.deepcopy.go b/pkg/controller/apis/config/zz_generated.deepcopy.go index cba4ff2b8d06e..c565e6537f99e 100644 --- a/pkg/controller/apis/config/zz_generated.deepcopy.go +++ b/pkg/controller/apis/config/zz_generated.deepcopy.go @@ -120,6 +120,7 @@ func (in 
*KubeControllerManagerConfiguration) DeepCopyInto(out *KubeControllerMa in.GarbageCollectorController.DeepCopyInto(&out.GarbageCollectorController) out.HPAController = in.HPAController out.JobController = in.JobController + out.CronJobController = in.CronJobController out.NamespaceController = in.NamespaceController out.NodeIPAMController = in.NodeIPAMController out.NodeLifecycleController = in.NodeLifecycleController diff --git a/pkg/controller/cronjob/BUILD b/pkg/controller/cronjob/BUILD index e5f3e37bc11aa..db8ea629545ff 100644 --- a/pkg/controller/cronjob/BUILD +++ b/pkg/controller/cronjob/BUILD @@ -10,12 +10,14 @@ go_library( name = "go_default_library", srcs = [ "cronjob_controller.go", + "cronjob_controllerv2.go", "doc.go", "injection.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/controller/cronjob", deps = [ + "//pkg/controller:go_default_library", "//staging/src/k8s.io/api/batch/v1:go_default_library", "//staging/src/k8s.io/api/batch/v1beta1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -26,12 +28,18 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/informers/batch/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/batch/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/batch/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/batch/v1beta1:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/pager:go_default_library", 
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/reference:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library", "//vendor/github.com/robfig/cron:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", @@ -42,6 +50,7 @@ go_test( name = "go_default_test", srcs = [ "cronjob_controller_test.go", + "cronjob_controllerv2_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -55,6 +64,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//vendor/github.com/robfig/cron:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], ) @@ -68,6 +79,9 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//pkg/controller/cronjob/config:all-srcs", + ], tags = ["automanaged"], ) diff --git a/pkg/controller/cronjob/config/BUILD b/pkg/controller/cronjob/config/BUILD new file mode 100644 index 0000000000000..39558cfd452b9 --- /dev/null +++ b/pkg/controller/cronjob/config/BUILD @@ -0,0 +1,29 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "types.go", + "zz_generated.deepcopy.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/cronjob/config", + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/cronjob/config/v1alpha1:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], 
+) diff --git a/pkg/controller/cronjob/config/OWNERS b/pkg/controller/cronjob/config/OWNERS new file mode 100644 index 0000000000000..e3149dc3c06ad --- /dev/null +++ b/pkg/controller/cronjob/config/OWNERS @@ -0,0 +1,14 @@ +approvers: +- api-approvers +- deads2k +- luxas +- mtaufen +- sttts +- stewart-yu +reviewers: +- api-reviewers +- deads2k +- luxas +- mtaufen +- sttts +- stewart-yu diff --git a/pkg/controller/cronjob/config/doc.go b/pkg/controller/cronjob/config/doc.go new file mode 100644 index 0000000000000..f5d5ba187351c --- /dev/null +++ b/pkg/controller/cronjob/config/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package + +package config // import "k8s.io/kubernetes/pkg/controller/cronjob/config" diff --git a/pkg/controller/cronjob/config/types.go b/pkg/controller/cronjob/config/types.go new file mode 100644 index 0000000000000..3db0b535c9966 --- /dev/null +++ b/pkg/controller/cronjob/config/types.go @@ -0,0 +1,26 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +// CronJobControllerConfiguration contains elements describing the +// CronJobControllerV2. +type CronJobControllerConfiguration struct { + // ConcurrentCronJobSyncs is the number of cron job objects that are + // allowed to sync concurrently. Larger number = more responsive jobs, + // but more CPU (and network) load. + ConcurrentCronJobSyncs int32 +} diff --git a/pkg/controller/cronjob/config/v1alpha1/BUILD b/pkg/controller/cronjob/config/v1alpha1/BUILD new file mode 100644 index 0000000000000..58d019105fb68 --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/BUILD @@ -0,0 +1,36 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "conversion.go", + "defaults.go", + "doc.go", + "register.go", + "zz_generated.conversion.go", + "zz_generated.deepcopy.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/cronjob/config/v1alpha1", + visibility = ["//visibility:public"], + deps = [ + "//pkg/controller/cronjob/config:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/kube-controller-manager/config/v1alpha1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = 
["//visibility:public"], +) diff --git a/pkg/controller/cronjob/config/v1alpha1/conversion.go b/pkg/controller/cronjob/config/v1alpha1/conversion.go new file mode 100644 index 0000000000000..a032a0df8e6f4 --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/conversion.go @@ -0,0 +1,40 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/kube-controller-manager/config/v1alpha1" + "k8s.io/kubernetes/pkg/controller/cronjob/config" +) + +// Important! The public back-and-forth conversion functions for the types in this package +// with CronJobControllerConfiguration types need to be manually exposed like this in order for +// other packages that reference this package to be able to call these conversion functions +// in an autogenerated manner. +// TODO: Fix the bug in conversion-gen so it automatically discovers these Convert_* functions +// in autogenerated code as well. + +// Convert_v1alpha1_CronJobControllerConfiguration_To_config_CronJobControllerConfiguration is an autogenerated conversion function. 
+func Convert_v1alpha1_CronJobControllerConfiguration_To_config_CronJobControllerConfiguration(in *v1alpha1.CronJobControllerConfiguration, out *config.CronJobControllerConfiguration, s conversion.Scope) error { + return autoConvert_v1alpha1_CronJobControllerConfiguration_To_config_CronJobControllerConfiguration(in, out, s) +} + +// Convert_config_CronJobControllerConfiguration_To_v1alpha1_CronJobControllerConfiguration is an autogenerated conversion function. +func Convert_config_CronJobControllerConfiguration_To_v1alpha1_CronJobControllerConfiguration(in *config.CronJobControllerConfiguration, out *v1alpha1.CronJobControllerConfiguration, s conversion.Scope) error { + return autoConvert_config_CronJobControllerConfiguration_To_v1alpha1_CronJobControllerConfiguration(in, out, s) +} diff --git a/pkg/controller/cronjob/config/v1alpha1/defaults.go b/pkg/controller/cronjob/config/v1alpha1/defaults.go new file mode 100644 index 0000000000000..6ce1aa29402a4 --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/defaults.go @@ -0,0 +1,36 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + kubectrlmgrconfigv1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1" +) + +// RecommendedDefaultCronJobControllerConfiguration defaults a pointer to a +// CronJobControllerConfiguration struct. This will set the recommended default +// values, but they may be subject to change between API versions. 
This function +// is intentionally not registered in the scheme as a "normal" `SetDefaults_Foo` +// function to allow consumers of this type to set whatever defaults for their +// embedded configs. Forcing consumers to use these defaults would be problematic +// as defaulting in the scheme is done as part of the conversion, and there would +// be no easy way to opt-out. Instead, if you want to use this defaulting method +// run it in your wrapper struct of this type in its `SetDefaults_` method. +func RecommendedDefaultCronJobControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.CronJobControllerConfiguration) { + if obj.ConcurrentCronJobSyncs == 0 { + obj.ConcurrentCronJobSyncs = 5 + } +} diff --git a/pkg/controller/cronjob/config/v1alpha1/doc.go b/pkg/controller/cronjob/config/v1alpha1/doc.go new file mode 100644 index 0000000000000..7a66b80104e8b --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// +k8s:deepcopy-gen=package +// +k8s:conversion-gen=k8s.io/kubernetes/pkg/controller/cronjob/config +// +k8s:conversion-gen-external-types=k8s.io/kube-controller-manager/config/v1alpha1 + +package v1alpha1 // import "k8s.io/kubernetes/pkg/controller/cronjob/config/v1alpha1" diff --git a/pkg/controller/cronjob/config/v1alpha1/register.go b/pkg/controller/cronjob/config/v1alpha1/register.go new file mode 100644 index 0000000000000..47e07078220e1 --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/register.go @@ -0,0 +1,31 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" +) + +var ( + // SchemeBuilder is the scheme builder with scheme init functions to run for this API package + SchemeBuilder runtime.SchemeBuilder + // localSchemeBuilder extends the SchemeBuilder instance with the external types. In this package, + // defaulting and conversion init funcs are registered as well. 
+ localSchemeBuilder = &SchemeBuilder + // AddToScheme is a global function that registers this API group & version to a scheme + AddToScheme = localSchemeBuilder.AddToScheme +) diff --git a/pkg/controller/cronjob/config/v1alpha1/zz_generated.conversion.go b/pkg/controller/cronjob/config/v1alpha1/zz_generated.conversion.go new file mode 100644 index 0000000000000..1fb123d540838 --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/zz_generated.conversion.go @@ -0,0 +1,91 @@ +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by conversion-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" + v1alpha1 "k8s.io/kube-controller-manager/config/v1alpha1" + config "k8s.io/kubernetes/pkg/controller/cronjob/config" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. 
+func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*v1alpha1.GroupResource)(nil), (*v1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_GroupResource_To_v1_GroupResource(a.(*v1alpha1.GroupResource), b.(*v1.GroupResource), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1.GroupResource)(nil), (*v1alpha1.GroupResource)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_GroupResource_To_v1alpha1_GroupResource(a.(*v1.GroupResource), b.(*v1alpha1.GroupResource), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*config.CronJobControllerConfiguration)(nil), (*v1alpha1.CronJobControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_CronJobControllerConfiguration_To_v1alpha1_CronJobControllerConfiguration(a.(*config.CronJobControllerConfiguration), b.(*v1alpha1.CronJobControllerConfiguration), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*v1alpha1.CronJobControllerConfiguration)(nil), (*config.CronJobControllerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_CronJobControllerConfiguration_To_config_CronJobControllerConfiguration(a.(*v1alpha1.CronJobControllerConfiguration), b.(*config.CronJobControllerConfiguration), scope) + }); err != nil { + return err + } + return nil +} + +func autoConvert_v1alpha1_CronJobControllerConfiguration_To_config_CronJobControllerConfiguration(in *v1alpha1.CronJobControllerConfiguration, out *config.CronJobControllerConfiguration, s conversion.Scope) error { + out.ConcurrentCronJobSyncs = in.ConcurrentCronJobSyncs + return nil +} + +func autoConvert_config_CronJobControllerConfiguration_To_v1alpha1_CronJobControllerConfiguration(in *config.CronJobControllerConfiguration, out *v1alpha1.CronJobControllerConfiguration, s 
conversion.Scope) error { + out.ConcurrentCronJobSyncs = in.ConcurrentCronJobSyncs + return nil +} + +func autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error { + out.Group = in.Group + out.Resource = in.Resource + return nil +} + +// Convert_v1alpha1_GroupResource_To_v1_GroupResource is an autogenerated conversion function. +func Convert_v1alpha1_GroupResource_To_v1_GroupResource(in *v1alpha1.GroupResource, out *v1.GroupResource, s conversion.Scope) error { + return autoConvert_v1alpha1_GroupResource_To_v1_GroupResource(in, out, s) +} + +func autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error { + out.Group = in.Group + out.Resource = in.Resource + return nil +} + +// Convert_v1_GroupResource_To_v1alpha1_GroupResource is an autogenerated conversion function. +func Convert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, out *v1alpha1.GroupResource, s conversion.Scope) error { + return autoConvert_v1_GroupResource_To_v1alpha1_GroupResource(in, out, s) +} diff --git a/pkg/controller/cronjob/config/v1alpha1/zz_generated.deepcopy.go b/pkg/controller/cronjob/config/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 0000000000000..0ec19467c4048 --- /dev/null +++ b/pkg/controller/cronjob/config/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,21 @@ +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 diff --git a/pkg/controller/cronjob/config/zz_generated.deepcopy.go b/pkg/controller/cronjob/config/zz_generated.deepcopy.go new file mode 100644 index 0000000000000..2d08f74c1455e --- /dev/null +++ b/pkg/controller/cronjob/config/zz_generated.deepcopy.go @@ -0,0 +1,37 @@ +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package config + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CronJobControllerConfiguration) DeepCopyInto(out *CronJobControllerConfiguration) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CronJobControllerConfiguration. 
+func (in *CronJobControllerConfiguration) DeepCopy() *CronJobControllerConfiguration {
+	if in == nil {
+		return nil
+	}
+	out := new(CronJobControllerConfiguration)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go
index c693f30d95a50..74e91fb52b6c1 100644
--- a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go
+++ b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go
@@ -124,6 +124,8 @@ type KubeControllerManagerConfiguration struct {
 	HPAController HPAControllerConfiguration
 	// JobControllerConfiguration holds configuration for JobController related features.
 	JobController JobControllerConfiguration
+	// CronJobControllerConfiguration holds configuration for CronJobController related features.
+	CronJobController CronJobControllerConfiguration
 	// NamespaceControllerConfiguration holds configuration for NamespaceController
 	// related features.
 	NamespaceController NamespaceControllerConfiguration
@@ -413,6 +415,14 @@ type JobControllerConfiguration struct {
 	ConcurrentJobSyncs int32
 }
 
+// CronJobControllerConfiguration contains elements describing CronJobControllerV2.
+type CronJobControllerConfiguration struct {
+	// concurrentCronJobSyncs is the number of cron job objects that are
+	// allowed to sync concurrently. Larger number = more responsive jobs,
+	// but more CPU (and network) load.
+	ConcurrentCronJobSyncs int32
+}
+
 // NamespaceControllerConfiguration contains elements describing NamespaceController.
type NamespaceControllerConfiguration struct { // namespaceSyncPeriod is the period for syncing namespace life-cycle diff --git a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go index 453675db20768..592f2f7cc7788 100644 --- a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/zz_generated.deepcopy.go @@ -94,6 +94,22 @@ func (in *CloudProviderConfiguration) DeepCopy() *CloudProviderConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CronJobControllerConfiguration) DeepCopyInto(out *CronJobControllerConfiguration) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CronJobControllerConfiguration. +func (in *CronJobControllerConfiguration) DeepCopy() *CronJobControllerConfiguration { + if in == nil { + return nil + } + out := new(CronJobControllerConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DaemonSetControllerConfiguration) DeepCopyInto(out *DaemonSetControllerConfiguration) { *out = *in @@ -348,6 +364,7 @@ func (in *KubeControllerManagerConfiguration) DeepCopyInto(out *KubeControllerMa in.GarbageCollectorController.DeepCopyInto(&out.GarbageCollectorController) in.HPAController.DeepCopyInto(&out.HPAController) out.JobController = in.JobController + out.CronJobController = in.CronJobController out.NamespaceController = in.NamespaceController out.NodeIPAMController = in.NodeIPAMController in.NodeLifecycleController.DeepCopyInto(&out.NodeLifecycleController)