Skip to content
This repository was archived by the owner on Sep 9, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
This is the Igneous fork of Kubernetes. This repo contains features we've
backported as well as customizations that are not (yet) upstream. We vendor
this repo into our product codebase and in that repo link in our equivalent
replacements for etcd, the apiserver, and other minor components. We provide
no support for this repo.

You can find the official project at [kubernetes.io].

# Kubernetes

[![GoDoc Widget]][GoDoc] [![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/569/badge)](https://bestpractices.coreinfrastructure.org/projects/569)
Expand Down
2 changes: 2 additions & 0 deletions cmd/kubelet/app/plugins.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build kubernetes_plugins

/*
Copyright 2014 The Kubernetes Authors.

Expand Down
44 changes: 44 additions & 0 deletions cmd/kubelet/app/plugins_disabled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// +build !kubernetes_plugins

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

// This file exists to force no plugin implementations to be linked.
import (
// Network plugins
"k8s.io/kubernetes/pkg/kubelet/dockershim/network"
// Volume plugins
"k8s.io/kubernetes/pkg/volume"
)

// ProbeVolumePlugins collects all volume plugins into an easy to use list.
// In this plugin-free build no volume plugins are linked in, so the result
// is always an empty (non-nil) list.
func ProbeVolumePlugins() []volume.VolumePlugin {
	plugins := []volume.VolumePlugin{}
	return plugins
}

// GetDynamicPluginProber gets the probers of dynamically discoverable plugins
// for kubelet.
// Currently only Flexvolume plugins are dynamically discoverable; with
// plugins disabled there is nothing to probe, so a nil prober is returned
// regardless of pluginDir.
func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber {
	var prober volume.DynamicPluginProber
	return prober
}

// ProbeNetworkPlugins collects all compiled-in plugins. No network plugins
// are compiled into this build, so the CNI directories are ignored and the
// result is always an empty (non-nil) list.
func ProbeNetworkPlugins(cniConfDir, cniBinDir string) []network.NetworkPlugin {
	plugins := []network.NetworkPlugin{}
	return plugins
}
17 changes: 15 additions & 2 deletions pkg/kubelet/container/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"hash/fnv"
"strings"
"time"

"k8s.io/klog"

Expand Down Expand Up @@ -58,6 +59,13 @@ type RuntimeHelper interface {
GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
}

// InCreatedStateGracePeriod reports whether cs describes a container in the
// created state that was created less than one minute ago. Such a container
// is expected to transition out of the created state soon, so callers treat
// it leniently rather than restarting or killing it.
func InCreatedStateGracePeriod(cs *ContainerStatus) bool {
	if cs == nil {
		return false
	}
	if cs.State != ContainerStateCreated {
		return false
	}
	return time.Since(cs.CreatedAt) < time.Minute
}

// ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this.
func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
Expand All @@ -76,10 +84,15 @@ func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus
if status.State == ContainerStateRunning {
return false
}
// Always restart container in the unknown, or in the created state.
if status.State == ContainerStateUnknown || status.State == ContainerStateCreated {
// Always restart a container in the unknown state.
if status.State == ContainerStateUnknown {
return true
}
// Only restart created container if it hasn't been started in over
// a minute.
if status.State == ContainerStateCreated {
return !InCreatedStateGracePeriod(status)
}
// Check RestartPolicy for dead container
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
klog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1749,6 +1749,9 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
}
kl.podWorkers.ForgetWorker(pod.UID)

// Wait until the worker unblocks on any pod cache update and exits.
kl.podWorkers.WaitWorkerExit(pod.UID)

// Runtime cache may not have been updated to with the pod, but it's okay
// because the periodic cleanup routine will attempt to delete again later.
runningPods, err := kl.runtimeCache.GetPods()
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,12 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
case kubecontainer.ContainerStateRunning:
status.State.Running = &v1.ContainerStateRunning{StartedAt: metav1.NewTime(cs.StartedAt)}
case kubecontainer.ContainerStateCreated:
// Don't treat the container as exited if it's still
// in the creation grace period.
if kubecontainer.InCreatedStateGracePeriod(cs) {
status.State.Waiting = &v1.ContainerStateWaiting{Reason: "ContainerCreating"}
break
}
// Treat containers in the "created" state as if they are exited.
// The pod workers are supposed to start all containers they create in
// one sync (syncPod) iteration. There should not be any normal
Expand Down
8 changes: 7 additions & 1 deletion pkg/kubelet/kuberuntime/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"strconv"
"strings"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog"
Expand Down Expand Up @@ -141,6 +141,8 @@ func (m *kubeGenericRuntimeManager) getImageUser(image string) (*int64, string,

// isInitContainerFailed returns true if the container has exited with a
// nonzero exit code, is in an unknown state, or has been in the created
// state for over a minute.
func isInitContainerFailed(status *kubecontainer.ContainerStatus) bool {
if status.State == kubecontainer.ContainerStateExited && status.ExitCode != 0 {
return true
Expand All @@ -150,6 +152,10 @@ func isInitContainerFailed(status *kubecontainer.ContainerStatus) bool {
return true
}

if status.State == kubecontainer.ContainerStateCreated {
return !kubecontainer.InCreatedStateGracePeriod(status)
}

return false
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,11 @@ func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus)
return nil, nil, false
}

// If the container was created within the last minute it is still in its
// grace period, so report not done.
if status.State == kubecontainer.ContainerStateCreated && kubecontainer.InCreatedStateGracePeriod(status) {
return nil, nil, false
}

if status.State == kubecontainer.ContainerStateExited {
// all init containers successful
if i == (len(pod.Spec.InitContainers) - 1) {
Expand Down
20 changes: 14 additions & 6 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,11 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku
changes.KillPod = true
} else {
// Always try to stop containers in unknown state first.
if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
// A created state container is only returned if
// it has been in that state for over a minute.
// So always attempt to stop it.
if initLastStatus != nil && (initLastStatus.State == kubecontainer.ContainerStateUnknown ||
initLastStatus.State == kubecontainer.ContainerStateCreated) {
changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
name: next.Name,
container: next,
Expand All @@ -564,24 +568,28 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku
// Call internal container post-stop lifecycle hook for any non-running container so that any
// allocated cpus are released immediately. If the container is restarted, cpus will be re-allocated
// to it.
if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {
if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning && !kubecontainer.InCreatedStateGracePeriod(containerStatus) {
if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {
klog.Errorf("internal container post-stop lifecycle hook failed for container %v in pod %v with error %v",
container.Name, pod.Name, err)
}
}

// If container does not exist, or is not running, check whether we
// If container does not exist, or is not running, or has been
// in created state for over a minute, check whether we
// need to restart it.
if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
if containerStatus == nil || (containerStatus.State != kubecontainer.ContainerStateRunning && !kubecontainer.InCreatedStateGracePeriod(containerStatus)) {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
klog.V(3).Infof(message)
changes.ContainersToStart = append(changes.ContainersToStart, idx)
if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {
if containerStatus != nil && (containerStatus.State == kubecontainer.ContainerStateUnknown || containerStatus.State == kubecontainer.ContainerStateCreated) {
// If container is in unknown state, we don't know whether it
// is actually running or not, always try killing it before
// restart to avoid having 2 running instances of the same container.
// Similarly, kill created containers
// that haven't started for over a
// minute.
changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
name: containerStatus.Name,
container: &pod.Spec.Containers[idx],
Expand All @@ -592,7 +600,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku
}
continue
}
// The container is running, but kill the container if any of the following condition is met.
// The container is running or is in created state, but kill the container if any of the following condition is met.
var message string
restart := shouldRestartOnFailure(pod)
if _, _, changed := containerChanged(&container, containerStatus); changed {
Expand Down
28 changes: 25 additions & 3 deletions pkg/kubelet/pod_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sync"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -75,6 +75,7 @@ type PodWorkers interface {
UpdatePod(options *UpdatePodOptions)
ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty)
ForgetWorker(uid types.UID)
WaitWorkerExit(uid types.UID)
}

// syncPodOptions provides the arguments to a SyncPod operation.
Expand Down Expand Up @@ -120,6 +121,9 @@ type podWorkers struct {
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions

// Track the worker stop
workerDone map[types.UID]chan struct{}

workQueue queue.WorkQueue

// This function is run to sync the desired stated of pod.
Expand All @@ -146,6 +150,7 @@ func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQ
podUpdates: map[types.UID]chan UpdatePodOptions{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
workerDone: map[types.UID]chan struct{}{},
syncPodFn: syncPodFn,
recorder: recorder,
workQueue: workQueue,
Expand Down Expand Up @@ -213,14 +218,18 @@ func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates

// Setup the worker exit channel
p.workerDone[uid] = make(chan struct{})

// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
go func(doneCh chan<- struct{}) {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
close(doneCh)
}(p.workerDone[uid])
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
Expand Down Expand Up @@ -260,6 +269,19 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.
}
}

// WaitWorkerExit blocks until the pod worker goroutine for uid has exited,
// removing its done-channel bookkeeping in the process. It returns
// immediately when no worker is tracked for uid.
func (p *podWorkers) WaitWorkerExit(uid types.UID) {
	p.podLock.Lock()
	doneCh, exists := p.workerDone[uid]
	if exists {
		delete(p.workerDone, uid)
	}
	p.podLock.Unlock()

	if exists {
		<-doneCh
	}
}

func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
// Requeue the last update if the last sync returned error.
switch {
Expand Down
32 changes: 31 additions & 1 deletion pkg/kubelet/pod_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
Expand Down Expand Up @@ -62,6 +62,8 @@ func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]s

func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}

func (f *fakePodWorkers) WaitWorkerExit(uid types.UID) {}

type TestingInterface interface {
Errorf(format string, args ...interface{})
}
Expand Down Expand Up @@ -341,3 +343,31 @@ func TestKillPodNowFunc(t *testing.T) {
t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
}
}

// TestWaitWorkerExit verifies that after a worker is forgotten,
// WaitWorkerExit returns and all per-pod bookkeeping (the update channel
// and the worker-done channel) has been cleaned up.
func TestWaitWorkerExit(t *testing.T) {
	fakeRecorder := &record.FakeRecorder{}
	fakeRuntime := &containertest.FakeRuntime{}
	fakeCache := containertest.NewFakeCache(fakeRuntime)

	kubeletForRealWorkers := &simpleFakeKubelet{}
	realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)

	pod := podWithUIDNameNs("dummyuid", "foo", "bar")
	kubeletForRealWorkers.wg.Add(1)
	realPodWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		UpdateType: kubetypes.SyncPodUpdate,
	})
	// Wait for the worker to have processed the update before forgetting it.
	kubeletForRealWorkers.wg.Wait()

	realPodWorkers.ForgetWorker(pod.GetUID())
	realPodWorkers.WaitWorkerExit(pod.GetUID())

	realPodWorkers.podLock.Lock()
	_, updateExists := realPodWorkers.podUpdates[pod.GetUID()]
	_, doneExists := realPodWorkers.workerDone[pod.GetUID()]
	realPodWorkers.podLock.Unlock()
	// Report each leaked map entry separately so a failure pinpoints which
	// piece of bookkeeping was not cleaned up.
	if updateExists {
		t.Errorf("pod update channel for %q still exists after WaitWorkerExit", pod.GetUID())
	}
	if doneExists {
		t.Errorf("worker done channel for %q still exists after WaitWorkerExit", pod.GetUID())
	}
}