From 62f825931147569928d115c5aa3420ece5daab8c Mon Sep 17 00:00:00 2001 From: Caleb Ice Date: Fri, 17 Apr 2020 10:30:09 -0700 Subject: [PATCH] Adding igneous specific patches --- README.md | 8 ++++ cmd/kubelet/app/plugins.go | 2 + cmd/kubelet/app/plugins_disabled.go | 44 +++++++++++++++++++ pkg/kubelet/container/helpers.go | 17 ++++++- pkg/kubelet/kubelet.go | 3 ++ pkg/kubelet/kubelet_pods.go | 6 +++ pkg/kubelet/kuberuntime/helpers.go | 8 +++- .../kuberuntime/kuberuntime_container.go | 5 +++ .../kuberuntime/kuberuntime_manager.go | 20 ++++++--- pkg/kubelet/pod_workers.go | 28 ++++++++++-- pkg/kubelet/pod_workers_test.go | 32 +++++++++++++- 11 files changed, 160 insertions(+), 13 deletions(-) create mode 100644 cmd/kubelet/app/plugins_disabled.go diff --git a/README.md b/README.md index dc85376c65f3d..69cae0cbc2f22 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,11 @@ +This is the Igneous fork of Kubernetes. This repo contains features we've +backported as well as customizations that are not (yet) upstream. We vendor +this repo into our product codebase and in that repo link in our equivalent +replacements for etcd, the apiserver, and other minor components. We provide +no support for this repo. + +You can find the official project at [kubernetes.io]. + # Kubernetes [![GoDoc Widget]][GoDoc] [![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/569/badge)](https://bestpractices.coreinfrastructure.org/projects/569) diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index af9f708dc5258..d054d9d7405b6 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -1,3 +1,5 @@ +// +build kubernetes_plugins + /* Copyright 2014 The Kubernetes Authors. 
diff --git a/cmd/kubelet/app/plugins_disabled.go b/cmd/kubelet/app/plugins_disabled.go new file mode 100644 index 0000000000000..9d613283e6a6c --- /dev/null +++ b/cmd/kubelet/app/plugins_disabled.go @@ -0,0 +1,44 @@ +// +build !kubernetes_plugins + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +// This file exists to force no plugin implementations to be linked. +import ( + // Network plugins + "k8s.io/kubernetes/pkg/kubelet/dockershim/network" + // Volume plugins + "k8s.io/kubernetes/pkg/volume" +) + +// ProbeVolumePlugins collects all volume plugins into an easy to use list. +func ProbeVolumePlugins() []volume.VolumePlugin { + return []volume.VolumePlugin{} +} + +// GetDynamicPluginProber gets the probers of dynamically discoverable plugins +// for kubelet. +// Currently only Flexvolume plugins are dynamically discoverable. 
+func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber { + return nil +} + +// ProbeNetworkPlugins collects all compiled-in plugins +func ProbeNetworkPlugins(cniConfDir, cniBinDir string) []network.NetworkPlugin { + return []network.NetworkPlugin{} +} diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index b15be2d7500bc..b46d593d2010b 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -21,6 +21,7 @@ import ( "fmt" "hash/fnv" "strings" + "time" "k8s.io/klog" @@ -58,6 +59,13 @@ type RuntimeHelper interface { GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 } +// InCreatedStateGracePeriod returns true if the container in created state was +// created within last one minute. This container is expected to transition +// out of created state soon. +func InCreatedStateGracePeriod(cs *ContainerStatus) bool { + return cs != nil && cs.State == ContainerStateCreated && time.Since(cs.CreatedAt) < time.Minute +} + // ShouldContainerBeRestarted checks whether a container needs to be restarted. // TODO(yifan): Think about how to refactor this. func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool { @@ -76,10 +84,15 @@ func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus if status.State == ContainerStateRunning { return false } - // Always restart container in the unknown, or in the created state. - if status.State == ContainerStateUnknown || status.State == ContainerStateCreated { + // Always restart container in the unknown state. + if status.State == ContainerStateUnknown { return true } + // Only restart created container if it hasn't been started in over + // a minute. 
+ if status.State == ContainerStateCreated { + return !InCreatedStateGracePeriod(status) + } // Check RestartPolicy for dead container if pod.Spec.RestartPolicy == v1.RestartPolicyNever { klog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, format.Pod(pod)) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 08df1031a34a2..68dc2d34b27d9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1749,6 +1749,9 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { } kl.podWorkers.ForgetWorker(pod.UID) + // Wait until the worker unblocks on any pod cache update and exits. + kl.podWorkers.WaitWorkerExit(pod.UID) + // Runtime cache may not have been updated to with the pod, but it's okay // because the periodic cleanup routine will attempt to delete again later. runningPods, err := kl.runtimeCache.GetPods() diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 9d45549bf64fb..b4bb6aa18852e 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1487,6 +1487,12 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon case kubecontainer.ContainerStateRunning: status.State.Running = &v1.ContainerStateRunning{StartedAt: metav1.NewTime(cs.StartedAt)} case kubecontainer.ContainerStateCreated: + // Don't treat the container as exited if it's still + // in the creation grace period. + if kubecontainer.InCreatedStateGracePeriod(cs) { + status.State.Waiting = &v1.ContainerStateWaiting{Reason: "ContainerCreating"} + break + } // Treat containers in the "created" state as if they are exited. // The pod workers are supposed start all containers it creates in // one sync (syncPod) iteration. 
There should not be any normal diff --git a/pkg/kubelet/kuberuntime/helpers.go b/pkg/kubelet/kuberuntime/helpers.go index 2d970333b3bd2..aace3958f876f 100644 --- a/pkg/kubelet/kuberuntime/helpers.go +++ b/pkg/kubelet/kuberuntime/helpers.go @@ -22,7 +22,7 @@ import ( "strconv" "strings" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog" @@ -141,6 +141,8 @@ func (m *kubeGenericRuntimeManager) getImageUser(image string) (*int64, string, // isInitContainerFailed returns true if container has exited and exitcode is not zero // or is in unknown state. +// or it has been in created state for over a minute. + func isInitContainerFailed(status *kubecontainer.ContainerStatus) bool { if status.State == kubecontainer.ContainerStateExited && status.ExitCode != 0 { return true @@ -150,6 +152,10 @@ func isInitContainerFailed(status *kubecontainer.ContainerStatus) bool { return true } + if status.State == kubecontainer.ContainerStateCreated { + return !kubecontainer.InCreatedStateGracePeriod(status) + } + return false } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 89be5366b4f9a..43ec220b47639 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -792,6 +792,11 @@ func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) return nil, nil, false } + // If the container was created within the last minute, return not done. 
+ if status.State == kubecontainer.ContainerStateCreated && kubecontainer.InCreatedStateGracePeriod(status) { + return nil, nil, false + } + if status.State == kubecontainer.ContainerStateExited { // all init containers successful if i == (len(pod.Spec.InitContainers) - 1) { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 9035f6a9be019..ba62c764843b2 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -539,7 +539,11 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku changes.KillPod = true } else { // Always try to stop containers in unknown state first. - if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown { + // A created state container is only returned if + // it has been in that state for over a minute. + // So always attempt to stop it. + if initLastStatus != nil && (initLastStatus.State == kubecontainer.ContainerStateUnknown || + initLastStatus.State == kubecontainer.ContainerStateCreated) { changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{ name: next.Name, container: next, @@ -564,24 +568,28 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku // Call internal container post-stop lifecycle hook for any non-running container so that any // allocated cpus are released immediately. If the container is restarted, cpus will be re-allocated // to it. 
- if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning { + if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning && !kubecontainer.InCreatedStateGracePeriod(containerStatus) { if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil { klog.Errorf("internal container post-stop lifecycle hook failed for container %v in pod %v with error %v", container.Name, pod.Name, err) } } - // If container does not exist, or is not running, check whether we + // If container does not exist, or is not running, or has been + // in created state for over a minute, check whether we // need to restart it. - if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning { + if containerStatus == nil || (containerStatus.State != kubecontainer.ContainerStateRunning && !kubecontainer.InCreatedStateGracePeriod(containerStatus)) { if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) { message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container) klog.V(3).Infof(message) changes.ContainersToStart = append(changes.ContainersToStart, idx) - if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown { + if containerStatus != nil && (containerStatus.State == kubecontainer.ContainerStateUnknown || containerStatus.State == kubecontainer.ContainerStateCreated) { // If container is in unknown state, we don't know whether it // is actually running or not, always try killing it before // restart to avoid having 2 running instances of the same container. + // Similarly, kill created containers + // that haven't started for over a + // minute. 
changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{ name: containerStatus.Name, container: &pod.Spec.Containers[idx], @@ -592,7 +600,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku } continue } - // The container is running, but kill the container if any of the following condition is met. + // The container is running or is in created state, but kill the container if any of the following condition is met. var message string restart := shouldRestartOnFailure(pod) if _, _, changed := containerChanged(&container, containerStatus); changed { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 29dbe8ef5fdf6..f727225f755a5 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -22,7 +22,7 @@ import ( "sync" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -75,6 +75,7 @@ type PodWorkers interface { UpdatePod(options *UpdatePodOptions) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) ForgetWorker(uid types.UID) + WaitWorkerExit(uid types.UID) } // syncPodOptions provides the arguments to a SyncPod operation. @@ -120,6 +121,9 @@ type podWorkers struct { // undelivered if it comes in while the worker is working. lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions + // Track the worker stop + workerDone map[types.UID]chan struct{} + workQueue queue.WorkQueue // This function is run to sync the desired stated of pod. 
@@ -146,6 +150,7 @@ func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQ podUpdates: map[types.UID]chan UpdatePodOptions{}, isWorking: map[types.UID]bool{}, lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{}, + workerDone: map[types.UID]chan struct{}{}, syncPodFn: syncPodFn, recorder: recorder, workQueue: workQueue, @@ -213,14 +218,18 @@ func (p *podWorkers) UpdatePod(options *UpdatePodOptions) { podUpdates = make(chan UpdatePodOptions, 1) p.podUpdates[uid] = podUpdates + // Setup the worker exit channel + p.workerDone[uid] = make(chan struct{}) + // Creating a new pod worker either means this is a new pod, or that the // kubelet just restarted. In either case the kubelet is willing to believe // the status of the pod for the first pod worker sync. See corresponding // comment in syncPod. - go func() { + go func(doneCh chan<- struct{}) { defer runtime.HandleCrash() p.managePodLoop(podUpdates) - }() + close(doneCh) + }(p.workerDone[uid]) } if !p.isWorking[pod.UID] { p.isWorking[pod.UID] = true @@ -260,6 +269,19 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets. } } +// WaitWorkerExit waits for an exited worker to stop +func (p *podWorkers) WaitWorkerExit(uid types.UID) { + p.podLock.Lock() + doneCh, ok := p.workerDone[uid] + if !ok { + p.podLock.Unlock() + return + } + delete(p.workerDone, uid) + p.podLock.Unlock() + <-doneCh +} + func (p *podWorkers) wrapUp(uid types.UID, syncErr error) { // Requeue the last update if the last sync returned error. 
switch { diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 3584b54fa4921..74498917baa0c 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" @@ -62,6 +62,8 @@ func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]s func (f *fakePodWorkers) ForgetWorker(uid types.UID) {} +func (f *fakePodWorkers) WaitWorkerExit(uid types.UID) {} + type TestingInterface interface { Errorf(format string, args ...interface{}) } @@ -341,3 +343,31 @@ func TestKillPodNowFunc(t *testing.T) { t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill) } } + +func TestWaitWorkerExit(t *testing.T) { + fakeRecorder := &record.FakeRecorder{} + fakeRuntime := &containertest.FakeRuntime{} + fakeCache := containertest.NewFakeCache(fakeRuntime) + + kubeletForRealWorkers := &simpleFakeKubelet{} + realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache) + + pod := podWithUIDNameNs("dummyuid", "foo", "bar") + kubeletForRealWorkers.wg.Add(1) + realPodWorkers.UpdatePod(&UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodUpdate, + }) + kubeletForRealWorkers.wg.Wait() + + realPodWorkers.ForgetWorker(pod.GetUID()) + realPodWorkers.WaitWorkerExit(pod.GetUID()) + + realPodWorkers.podLock.Lock() + _, updateExists := realPodWorkers.podUpdates[pod.GetUID()] + _, doneExists := realPodWorkers.workerDone[pod.GetUID()] + realPodWorkers.podLock.Unlock() + if updateExists || doneExists { + t.Errorf("Pod worker not existed") + } +}