Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions pkg/gameservers/missing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ import (
"k8s.io/client-go/tools/record"
)

// MissingPodController makes sure that any GameServer
// that isn't in a Scheduled or Unhealthy state and is missing a Pod is
// moved to Unhealthy.
// MissingPodController makes sure that any GameServer that isn't in a
// Scheduled or Unhealthy state and is either missing a Pod, or has a Pod in a
// Failed state, is moved to Unhealthy.
//
// It's possible that a GameServer is missing its associated pod due to
// unexpected controller downtime or if the Pod is deleted with no subsequent Delete event.
// Similarly, a Pod can reach the Failed phase (e.g. non-zero exit with SidecarContainers
// enabled) without a corresponding Unhealthy transition being triggered.
//
// Since resync on the controller is every 30 seconds, even if there is some time in which a GameServer
// is in a broken state, it will eventually move to Unhealthy, and get replaced (if in a Fleet).
Expand Down Expand Up @@ -94,9 +96,9 @@ func NewMissingPodController(health healthcheck.Handler,
if _, isDev := gs.GetDevAddress(); !isDev && !isBeforePodCreated(gs) && !gs.IsBeingDeleted() &&
!(gs.Status.State == agonesv1.GameServerStateUnhealthy) && !(gs.Status.State == agonesv1.GameServerStateError) {

// Only queue the Pod if there is an issue retrieving it. If it exists, don't queue it, since we know it's not missing.
// Enqueue if the pod is missing, not a game server pod, or has failed.
// If there was an error accessing the Kubernetes control plane then enqueue it, so it can be rechecked when the control plane comes back up.
if pod, err := c.podLister.Pods(gs.ObjectMeta.Namespace).Get(gs.ObjectMeta.Name); err != nil || !isGameServerPod(pod) {
if pod, err := c.podLister.Pods(gs.ObjectMeta.Namespace).Get(gs.ObjectMeta.Name); err != nil || !isGameServerPod(pod) || pod.Status.Phase == corev1.PodFailed {
c.workerqueue.Enqueue(gs)
}
}
Expand Down Expand Up @@ -132,16 +134,24 @@ func (c *MissingPodController) syncGameServer(ctx context.Context, key string) e
return nil
}

// check if the pod exists
// check if the pod exists and is healthy
podFailed := false
if pod, err := c.podLister.Pods(namespace).Get(name); err != nil {
if !k8serrors.IsNotFound(err) {
return errors.Wrapf(err, "error retrieving Pod %s from namespace %s", name, namespace)
}
} else if isGameServerPod(pod) {
// if the pod exists, all is well, and we can continue on our merry way.
return nil
if pod.Status.Phase != corev1.PodFailed {
// pod exists and is not failed - all is well
return nil
}
podFailed = true
}
if podFailed {
c.loggerForGameServerKey(key).Debug("Pod has failed. Moving GameServer to Unhealthy.")
} else {
c.loggerForGameServerKey(key).Debug("Pod is missing. Moving GameServer to Unhealthy.")
}
c.loggerForGameServerKey(key).Debug("Pod is missing. Moving GameServer to Unhealthy.")

gs, err := c.gameServerLister.GameServers(namespace).Get(name)
if err != nil {
Expand All @@ -165,6 +175,10 @@ func (c *MissingPodController) syncGameServer(ctx context.Context, key string) e
return errors.Wrap(err, "error updating GameServer to Unhealthy")
}

c.recorder.Event(gs, corev1.EventTypeWarning, string(gs.Status.State), "Pod is missing")
eventMessage := "Pod is missing"
if podFailed {
eventMessage = "Pod has failed"
}
c.recorder.Event(gs, corev1.EventTypeWarning, string(gs.Status.State), eventMessage)
return nil
}
15 changes: 15 additions & 0 deletions pkg/gameservers/missing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ func TestMissingPodControllerSyncGameServer(t *testing.T) {
},
},
},
"pod exists but is in Failed state": {
setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) {
pod.Status.Phase = corev1.PodFailed
return gs, pod
},
expected: expected{
updated: true,
updateTests: func(t *testing.T, gs *agonesv1.GameServer) {
assert.Equal(t, agonesv1.GameServerStateUnhealthy, gs.Status.State)
},
postTests: func(t *testing.T, m agtesting.Mocks) {
agtesting.AssertEventContains(t, m.FakeRecorder.Events, "Warning Unhealthy Pod has failed")
},
},
},
}

for k, v := range fixtures {
Expand Down
18 changes: 18 additions & 0 deletions pkg/gameservers/succeeded.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ func NewSucceededController(health healthcheck.Handler,
},
})

// Recovery path: if a pod Succeeded event was missed (e.g. during controller restart),
// the GameServer informer resync will re-check the pod phase and re-enqueue.
_, _ = gameServers.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, newObj interface{}) {
gs := newObj.(*agonesv1.GameServer)
if _, isDev := gs.GetDevAddress(); isDev {
return
}
if gs.IsBeingDeleted() || agonesv1.TerminalGameServerStates[gs.Status.State] || isBeforePodCreated(gs) {
return
}
pod, err := c.podLister.Pods(gs.ObjectMeta.Namespace).Get(gs.ObjectMeta.Name)
if err == nil && isGameServerPod(pod) && pod.Status.Phase == corev1.PodSucceeded {
c.workerqueue.Enqueue(pod)
}
},
})

return c
}

Expand Down
103 changes: 103 additions & 0 deletions pkg/gameservers/succeeded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
package gameservers

import (
"context"
"testing"
"time"

agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
agtesting "agones.dev/agones/pkg/testing"
"github.com/heptiolabs/healthcheck"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)

func TestSucceededControllerSyncGameServer(t *testing.T) {
Expand Down Expand Up @@ -249,3 +253,102 @@ func TestSucceededControllerRun(t *testing.T) {
require.True(t, value)
}
}

// TestSucceededControllerRunGameServerResync verifies the recovery path: if a pod Succeeded
// event was missed, the GameServer informer resync will detect the pod phase and re-enqueue.
func TestSucceededControllerRunGameServerResync(t *testing.T) {
m := agtesting.NewMocks()
c := NewSucceededController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)

gs := &agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
Spec: newSingleContainerSpec(), Status: agonesv1.GameServerStatus{}}
gs.ApplyDefaults()
gs.Status.State = agonesv1.GameServerStateReady

pod, err := gs.Pod(agtesting.FakeAPIHooks{})
require.NoError(t, err)

received := make(chan string, 10)
c.workerqueue.SyncHandler = func(_ context.Context, name string) error {
received <- name
return nil
}

gsWatch := watch.NewFake()
podWatch := watch.NewFake()
m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil))
m.KubeClient.AddWatchReactor("pods", k8stesting.DefaultWatchReactor(podWatch, nil))

// Pod starts in Running state so the initial pod AddFunc does not enqueue
m.KubeClient.AddReactor("list", "pods", func(_ k8stesting.Action) (bool, runtime.Object, error) {
return true, &corev1.PodList{Items: []corev1.Pod{*pod}}, nil
})
m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, runtime.Object, error) {
return true, &agonesv1.GameServerList{Items: []agonesv1.GameServer{*gs}}, nil
})

ctx, cancel := agtesting.StartInformers(m, c.gameServerSynced, c.podSynced)
defer cancel()

go func() {
err := c.Run(ctx, 1)
assert.Nil(t, err)
}()

noChange := func(reason string) {
require.True(t, cache.WaitForCacheSync(ctx.Done(), c.gameServerSynced, c.podSynced))
select {
case <-received:
require.FailNow(t, "should not have run sync: "+reason)
default:
}
}

result := func() {
select {
case res := <-received:
require.Equal(t, "default/test", res)
case <-time.After(2 * time.Second):
require.FailNow(t, "timed out waiting for sync")
}
}

// Update pod to Succeeded via pod watch — updates the lister cache and triggers
// the existing pod UpdateFunc path (consumed here, not the focus of this test)
pod.Status.Phase = corev1.PodSucceeded
podWatch.Modify(pod.DeepCopy())
result()

// Pod lister now has a Succeeded pod. Test the GS resync path for various states:

// before pod is created — should not enqueue
gs.Status.State = agonesv1.GameServerStateStarting
gsWatch.Modify(gs.DeepCopy())
noChange("starting state")

// Ready — should enqueue
gs.Status.State = agonesv1.GameServerStateReady
gsWatch.Modify(gs.DeepCopy())
result()

// Allocated — should enqueue
gs.Status.State = agonesv1.GameServerStateAllocated
gsWatch.Modify(gs.DeepCopy())
result()

// terminal: Unhealthy — should not enqueue
gs.Status.State = agonesv1.GameServerStateUnhealthy
gsWatch.Modify(gs.DeepCopy())
noChange("unhealthy state")

// terminal: Shutdown — should not enqueue
gs.Status.State = agonesv1.GameServerStateShutdown
gsWatch.Modify(gs.DeepCopy())
noChange("shutdown state")

// dev GameServer — should not enqueue
gs.Status.State = agonesv1.GameServerStateReady
gs.ObjectMeta.Annotations[agonesv1.DevAddressAnnotation] = ipFixture
gsWatch.Modify(gs.DeepCopy())
noChange("dev server")
}
Loading