Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 76 additions & 20 deletions app/pipeline/runner/engine/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,26 @@ func (e *kube) StartStep(rCtx context.Context, pCtx *pipeline.RunnerContext, ste
log := zerolog.Ctx(rCtx)
log.Trace().Msg("engine: starting step")

var backoff = wait.Backoff{
backoff := wait.Backoff{
Steps: 15,
Duration: 500 * time.Millisecond,
Factor: 1.0,
Jitter: 0.5,
}

//flag to determine if the step container found
// flag to determine if the step container found
found := false

err := retry.RetryOnConflict(backoff, func() error {
//TODO: Lock mutex?
// TODO: Lock mutex?
pod, err := e.client.CoreV1().Pods(e.nameSpace).Get(rCtx, pCtx.RunnerName, metav1.GetOptions{})
if err != nil {
return err
}

for i, container := range pod.Spec.Containers {
if container.Name != step.Name {
//TODO: what if container not found?
// TODO: what if container not found?
continue
}

Expand Down Expand Up @@ -205,31 +205,87 @@ func (e *kube) WaitStep(ctx context.Context, pCtx *pipeline.RunnerContext, step
return nil, err
}

pod, err := e.client.CoreV1().Pods(e.nameSpace).Get(ctx, pCtx.RunnerName, metav1.GetOptions{})
// Wait for the container to reach a terminated state
state, err := e.waitForContainerTermination(ctx, pCtx.RunnerName, step.Name)
if err != nil {
return nil, err
}

if len(pod.Status.ContainerStatuses) == 0 {
return nil, fmt.Errorf("no container status found in pod %s", pCtx.RunnerName)
return state, nil
}

// waitForContainerTermination waits for a container to reach terminated state
func (e *kube) waitForContainerTermination(ctx context.Context, podName string, containerName string) (*engine.State, error) {
log := zerolog.Ctx(ctx)

backoff := wait.Backoff{
Steps: 60, // More retry attempts for long image pulls
Duration: 2 * time.Second, // Start with 2 second intervals
Factor: 1.5, // Gradual increase
Jitter: 0.1,
Cap: 30 * time.Second, // Max 30 seconds between retries
}

for _, cs := range pod.Status.ContainerStatuses {
if cs.Name != step.Name {
continue
var finalState *engine.State

err := retry.RetryOnConflict(backoff, func() error {
pod, err := e.client.CoreV1().Pods(e.nameSpace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return err
}

if len(pod.Status.ContainerStatuses) == 0 {
log.Trace().Str("pod", podName).Msg("no container statuses yet, retrying...")
return fmt.Errorf("no container status found in pod %s", podName)
}

if cs.State.Terminated == nil {
return nil, fmt.Errorf("no terminated state found in container %s/%s", pCtx.RunnerName, step.Name)
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name != containerName {
continue
}

// Check for image pull errors - these are permanent failures
if isImagePullBackOffStateContainer(cs) || isInvalidImageNameContainer(cs) {
return wait.ErrorInterrupted(fmt.Errorf("could not pull image for container %s/%s", podName, containerName))
}

// Container is still waiting (e.g., pulling image)
if cs.State.Waiting != nil {
log.Trace().
Str("pod", podName).
Str("container", containerName).
Str("reason", cs.State.Waiting.Reason).
Msg("container still waiting, retrying...")
return fmt.Errorf("container %s/%s is still waiting: %s", podName, containerName, cs.State.Waiting.Reason)
}

// Container is running - keep waiting
if cs.State.Running != nil {
log.Trace().
Str("pod", podName).
Str("container", containerName).
Msg("container still running, retrying...")
return fmt.Errorf("container %s/%s is still running", podName, containerName)
}

// Container has terminated - success!
if cs.State.Terminated != nil {
finalState = &engine.State{
ExitCode: int(cs.State.Terminated.ExitCode),
Exited: true,
OOMKilled: cs.State.Terminated.Reason == "OOMKilled",
}
return nil
}
}
return &engine.State{
ExitCode: int(cs.State.Terminated.ExitCode),
Exited: true,
OOMKilled: false,
}, nil

return fmt.Errorf("no status found for container %s/%s", podName, containerName)
})
if err != nil {
return nil, err
}

return nil, fmt.Errorf("no status found for container %s/%s", pCtx.RunnerName, step.Name)
return finalState, nil
}

func (e *kube) Destroy(ctx context.Context, pCtx *pipeline.RunnerContext) error {
Expand All @@ -239,7 +295,7 @@ func (e *kube) Destroy(ctx context.Context, pCtx *pipeline.RunnerContext) error
}
}()

//delete secret
// delete secret
err := e.client.CoreV1().Secrets(e.nameSpace).Delete(ctx, pCtx.RunnerName, metav1.DeleteOptions{})
if err != nil {
return err
Expand All @@ -251,7 +307,7 @@ func (e *kube) Destroy(ctx context.Context, pCtx *pipeline.RunnerContext) error
func (e *kube) tailPod(rCtx context.Context, podName string) error {
log := zerolog.Ctx(rCtx)

var backoff = wait.Backoff{
backoff := wait.Backoff{
Steps: 10,
Duration: 1 * time.Second,
Factor: 2.0,
Expand Down
Loading