
Implement Job Timeout and Lifecycle Management System #42

@prodigy

Description


User Story

As a platform administrator, I want to enforce job timeouts and automatic cleanup so that runaway jobs don't consume resources indefinitely and the system remains stable.

Architecture Reference

  • Container Layer: docs/02-system-components.md:824 (activeDeadlineSeconds: 7200)
  • Container Layer: docs/02-system-components.md:822 (ttlSecondsAfterFinished: 3600)
  • Job Manager: docs/02-system-components.md:173-220 (Job lifecycle states)

Key excerpt from docs/02-system-components.md:822-824:

ttlSecondsAfterFinished: 3600
backoffLimit: 3
activeDeadlineSeconds: 7200  # 2 hour timeout

Acceptance Criteria

  • Configurable job timeouts at multiple levels (global, org, job); an example policy sketch follows this list
  • Graceful shutdown with cleanup opportunity before hard kill
  • Automatic cleanup of completed job resources
  • Dead job detection and recovery
  • Timeout escalation (soft → hard → force)
  • Job lifecycle hooks for custom cleanup logic
  • Orphaned resource detection and cleanup
  • Audit trail for all timeout and cleanup actions
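
A minimal sketch of how the multi-level timeout and escalation criteria could be expressed, written as if placed in the timeout package defined in section 1 below. All durations, map keys, and messages here are illustrative assumptions, not values mandated by the architecture docs:

package timeout

import "time"

// examplePolicy shows one possible TimeoutPolicy covering global, org-level,
// and job-type timeouts plus the warn → soft → hard → force escalation chain.
func examplePolicy() TimeoutPolicy {
    return TimeoutPolicy{
        GlobalDefault: 2 * time.Hour, // mirrors activeDeadlineSeconds: 7200
        OrganizationMax: map[string]time.Duration{
            "org-free-tier": 30 * time.Minute, // hypothetical org-level cap
        },
        JobTypeOverrides: map[string]time.Duration{
            "claude-batch": 4 * time.Hour, // hypothetical job-type override
        },
        GracePeriod: 5 * time.Minute,
        EscalationSteps: []EscalationStep{
            {After: 90 * time.Minute, Action: ActionWarn, Message: "job approaching timeout"},
            {After: 2 * time.Hour, Action: ActionSoftKill, Message: "timeout reached, sending SIGTERM"},
            {After: 2*time.Hour + 5*time.Minute, Action: ActionHardKill, Message: "grace period expired, sending SIGKILL"},
            {After: 2*time.Hour + 10*time.Minute, Action: ActionForceDelete, Message: "force deleting job resources"},
        },
    }
}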

Technical Implementation

1. Job Timeout Manager

package timeout

import (
    "context"
    "fmt"
    "sync"
    "time"

    log "github.com/sirupsen/logrus" // structured logger assumed for the log.* calls below
    batch "k8s.io/api/batch/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

type TimeoutManager struct {
    k8sClient    kubernetes.Interface
    jobTracker   *JobTracker
    cleanupQueue chan CleanupTask
    mu           sync.RWMutex
    timeoutRules TimeoutPolicy // single effective policy; per-org and per-type overrides live inside it
}

type TimeoutPolicy struct {
    GlobalDefault    time.Duration
    OrganizationMax  map[string]time.Duration
    JobTypeOverrides map[string]time.Duration
    GracePeriod      time.Duration
    EscalationSteps  []EscalationStep
}

type EscalationStep struct {
    After    time.Duration
    Action   TimeoutAction
    Message  string
}

type TimeoutAction string

const (
    ActionWarn        TimeoutAction = "warn"
    ActionSoftKill    TimeoutAction = "soft_kill"
    ActionHardKill    TimeoutAction = "hard_kill"
    ActionForceDelete TimeoutAction = "force_delete"
)

func (tm *TimeoutManager) StartTimeoutMonitor(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            tm.checkJobTimeouts(ctx)
        }
    }
}

func (tm *TimeoutManager) checkJobTimeouts(ctx context.Context) {
    jobs, err := tm.getActiveJobs(ctx)
    if err != nil {
        log.Errorf("Failed to get active jobs: %v", err)
        return
    }

    now := time.Now()
    
    for _, job := range jobs {
        jobInfo := tm.jobTracker.GetJobInfo(job.Name)
        if jobInfo == nil {
            continue
        }

        // Calculate effective timeout
        timeout := tm.calculateEffectiveTimeout(jobInfo)
        elapsed := now.Sub(jobInfo.StartTime)

        if elapsed > timeout {
            tm.handleTimeout(ctx, jobInfo, elapsed, timeout)
        } else {
            // Check for escalation steps
            tm.checkEscalation(ctx, jobInfo, elapsed, timeout)
        }
    }
}

func (tm *TimeoutManager) calculateEffectiveTimeout(job *JobInfo) time.Duration {
    // Priority order: Job-specific > Job Type > Organization > Global
    if job.TimeoutOverride > 0 {
        return job.TimeoutOverride
    }

    if timeout, ok := tm.timeoutRules.JobTypeOverrides[job.Type]; ok {
        return timeout
    }

    if timeout, ok := tm.timeoutRules.OrganizationMax[job.OrganizationID]; ok {
        return timeout
    }

    return tm.timeoutRules.GlobalDefault
}

func (tm *TimeoutManager) handleTimeout(ctx context.Context, job *JobInfo, elapsed, timeout time.Duration) {
    log.Warnf("Job %s exceeded timeout: elapsed=%v, timeout=%v", job.ID, elapsed, timeout)

    // Create timeout event
    event := TimeoutEvent{
        JobID:     job.ID,
        Elapsed:   elapsed,
        Timeout:   timeout,
        Timestamp: time.Now(),
    }

    // Determine action based on how much we've exceeded
    excess := elapsed - timeout
    
    if excess < tm.timeoutRules.GracePeriod {
        // Soft termination - send SIGTERM
        tm.softTerminateJob(ctx, job, event)
    } else if excess < tm.timeoutRules.GracePeriod*2 {
        // Hard termination - send SIGKILL
        tm.hardTerminateJob(ctx, job, event)
    } else {
        // Force delete - remove from Kubernetes
        tm.forceDeleteJob(ctx, job, event)
    }
}

func (tm *TimeoutManager) softTerminateJob(ctx context.Context, job *JobInfo, event TimeoutEvent) error {
    log.Infof("Soft terminating job %s", job.ID)
    
    // Send termination signal to container
    pods, err := tm.getJobPods(ctx, job.ID)
    if err != nil {
        return err
    }

    for _, pod := range pods {
        // Send SIGTERM to main container
        err := tm.execInPod(ctx, pod.Name, pod.Namespace, []string{
            "sh", "-c", "kill -TERM 1",
        })
        if err != nil {
            log.Warnf("Failed to send SIGTERM to pod %s: %v", pod.Name, err)
        }
    }

    // Update job status
    tm.updateJobStatus(ctx, job.ID, "terminating", "Exceeded timeout - graceful shutdown initiated")
    
    // Record event
    tm.recordTimeoutEvent(event, ActionSoftKill)
    
    return nil
}

func (tm *TimeoutManager) hardTerminateJob(ctx context.Context, job *JobInfo, event TimeoutEvent) error {
    log.Warnf("Hard terminating job %s", job.ID)
    
    // Delete pods with zero grace period
    pods, err := tm.getJobPods(ctx, job.ID)
    if err != nil {
        return err
    }

    // Zero grace period: delete the pods immediately (SIGKILL).
    zero := int64(0)
    deleteOptions := metav1.DeleteOptions{
        GracePeriodSeconds: &zero,
    }

    for _, pod := range pods {
        err := tm.k8sClient.CoreV1().Pods(pod.Namespace).Delete(
            ctx, pod.Name, deleteOptions,
        )
        if err != nil {
            log.Errorf("Failed to delete pod %s: %v", pod.Name, err)
        }
    }

    // Update job status
    tm.updateJobStatus(ctx, job.ID, "killed", "Exceeded timeout - forcefully terminated")
    
    // Record event
    tm.recordTimeoutEvent(event, ActionHardKill)
    
    // Schedule cleanup
    tm.scheduleCleanup(job.ID, 5*time.Minute)
    
    return nil
}

// Graceful shutdown handler for containers
func (tm *TimeoutManager) installShutdownHandler() string {
    return `
#!/bin/bash
# Graceful shutdown handler

cleanup() {
    echo "Received termination signal, starting cleanup..."
    
    # Save current work
    if [ -d /workspace/repo ]; then
        cd /workspace/repo
        git add -A
        git commit -m "WIP: Job terminated - saving progress" || true
        git push origin HEAD:refs/heads/flowforge-wip-$JOB_ID || true
    fi
    
    # Upload partial artifacts
    if [ -d /artifacts ]; then
        tar czf /tmp/partial-artifacts.tar.gz /artifacts/
        # Upload to S3
        aws s3 cp /tmp/partial-artifacts.tar.gz \
            s3://$ARTIFACT_BUCKET/partial/$JOB_ID/artifacts.tar.gz
    fi
    
    # Signal cleanup complete
    echo "Cleanup completed"
    exit 0
}

trap cleanup SIGTERM SIGINT

# Run main process
"$@" &
PID=$!

# Wait for process or signal
wait $PID
EXIT_CODE=$?

# Normal exit
exit $EXIT_CODE
`
}
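
A minimal sketch of wiring the manager into a long-running controller process, assuming an in-cluster client. NewTimeoutManager and the module path are hypothetical; they are not defined above:

package main

import (
    "context"
    "os/signal"
    "syscall"
    "time"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"

    "example.com/flowforge/internal/timeout" // hypothetical module path for the package above
)

func main() {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Stop monitoring cleanly when the controller itself is asked to shut down.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
    defer stop()

    policy := timeout.TimeoutPolicy{
        GlobalDefault: 2 * time.Hour,
        GracePeriod:   5 * time.Minute,
    }

    // NewTimeoutManager is assumed; it would populate the client, job tracker,
    // cleanup queue, and policy fields of TimeoutManager.
    tm := timeout.NewTimeoutManager(client, policy)
    tm.StartTimeoutMonitor(ctx) // blocks, checking active jobs every 30s
}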

2. Job Lifecycle Manager

package lifecycle

import (
    "context"
    "database/sql"
    "fmt"
    "sync"
    "time"

    log "github.com/sirupsen/logrus"
    batch "k8s.io/api/batch/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

type LifecycleManager struct {
    k8sClient        kubernetes.Interface
    db               *sql.DB
    cleanupWorkers   int
    retentionPolicy  RetentionPolicy
    hooks            map[LifecycleStage][]LifecycleHook
}

type LifecycleStage string

const (
    StagePreStart     LifecycleStage = "pre_start"
    StageRunning      LifecycleStage = "running"
    StageCompleting   LifecycleStage = "completing"
    StageCompleted    LifecycleStage = "completed"
    StageFailed       LifecycleStage = "failed"
    StageTerminating  LifecycleStage = "terminating"
    StageCleanup      LifecycleStage = "cleanup"
)

type LifecycleHook func(context.Context, *Job) error

type RetentionPolicy struct {
    SuccessfulJobs   time.Duration
    FailedJobs       time.Duration
    TerminatedJobs   time.Duration
    ArtifactRetention map[string]time.Duration
}

func (lm *LifecycleManager) ManageJobLifecycle(ctx context.Context, jobID string) error {
    job, err := lm.getJob(jobID)
    if err != nil {
        return err
    }

    // Execute pre-start hooks
    if err := lm.executeHooks(ctx, job, StagePreStart); err != nil {
        return fmt.Errorf("pre-start hooks failed: %w", err)
    }

    // Monitor job throughout lifecycle
    go lm.monitorJob(ctx, job)

    return nil
}

func (lm *LifecycleManager) monitorJob(ctx context.Context, job *Job) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    var lastStage LifecycleStage
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            currentStage := lm.determineJobStage(ctx, job)
            
            if currentStage != lastStage {
                log.Infof("Job %s transitioned from %s to %s", 
                    job.ID, lastStage, currentStage)
                
                // Execute stage transition hooks
                if err := lm.executeHooks(ctx, job, currentStage); err != nil {
                    log.Errorf("Hook execution failed for job %s: %v", job.ID, err)
                }
                
                lastStage = currentStage
            }
            
            // Check if job reached terminal state
            if lm.isTerminalStage(currentStage) {
                lm.scheduleCleanup(job, currentStage)
                return
            }
        }
    }
}

func (lm *LifecycleManager) scheduleCleanup(job *Job, finalStage LifecycleStage) {
    // Determine retention period based on final stage
    var retention time.Duration
    
    switch finalStage {
    case StageCompleted:
        retention = lm.retentionPolicy.SuccessfulJobs
    case StageFailed:
        retention = lm.retentionPolicy.FailedJobs
    case StageTerminating:
        retention = lm.retentionPolicy.TerminatedJobs
    default:
        retention = 24 * time.Hour // Default
    }

    cleanupTime := time.Now().Add(retention)
    
    task := CleanupTask{
        JobID:        job.ID,
        ScheduledAt:  cleanupTime,
        ResourceType: "job",
        Stage:        finalStage,
    }
    
    if err := lm.scheduleCleanupTask(task); err != nil {
        log.Errorf("Failed to schedule cleanup for job %s: %v", job.ID, err)
    }
}

// TTL-based cleanup implementation
func (lm *LifecycleManager) RunTTLCleanup(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            lm.cleanupExpiredJobs(ctx)
        }
    }
}

func (lm *LifecycleManager) cleanupExpiredJobs(ctx context.Context) {
    // List all jobs and filter on completion conditions below; Kubernetes field
    // selectors only support equality operators, so "status.successful>0" would be rejected.
    jobs, err := lm.k8sClient.BatchV1().Jobs("").List(ctx, metav1.ListOptions{})
    if err != nil {
        log.Errorf("Failed to list jobs: %v", err)
        return
    }

    now := time.Now()
    
    for _, job := range jobs.Items {
        // Check if job has TTL annotation
        ttlStr, ok := job.Annotations["flowforge.io/ttl-seconds-after-finished"]
        if !ok {
            continue
        }

        ttl, err := time.ParseDuration(ttlStr + "s")
        if err != nil {
            continue
        }

        // Check completion time
        var completionTime time.Time
        for _, condition := range job.Status.Conditions {
            if condition.Type == batch.JobComplete || condition.Type == batch.JobFailed {
                completionTime = condition.LastTransitionTime.Time
                break
            }
        }

        if completionTime.IsZero() {
            continue
        }

        if now.Sub(completionTime) > ttl {
            log.Infof("Cleaning up expired job %s (TTL: %v)", job.Name, ttl)
            
            // Delete the job
            deletePolicy := metav1.DeletePropagationBackground
            err := lm.k8sClient.BatchV1().Jobs(job.Namespace).Delete(
                ctx, job.Name, metav1.DeleteOptions{
                    PropagationPolicy: &deletePolicy,
                },
            )
            if err != nil {
                log.Errorf("Failed to delete job %s: %v", job.Name, err)
            }
        }
    }
}
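
executeHooks is referenced above but not defined; a minimal sketch of hook registration and sequential execution, placed alongside the lifecycle package above (RegisterHook is a hypothetical helper):

// RegisterHook attaches custom cleanup logic to a lifecycle stage
// (acceptance criterion: "job lifecycle hooks for custom cleanup logic").
func (lm *LifecycleManager) RegisterHook(stage LifecycleStage, hook LifecycleHook) {
    if lm.hooks == nil {
        lm.hooks = make(map[LifecycleStage][]LifecycleHook)
    }
    lm.hooks[stage] = append(lm.hooks[stage], hook)
}

// executeHooks runs the hooks registered for a stage in order; the first error
// aborts the remaining hooks for that stage.
func (lm *LifecycleManager) executeHooks(ctx context.Context, job *Job, stage LifecycleStage) error {
    for i, hook := range lm.hooks[stage] {
        if err := hook(ctx, job); err != nil {
            return fmt.Errorf("hook %d for stage %q: %w", i, stage, err)
        }
    }
    return nil
}

For example, a hook that flushes partial artifacts when a job enters the terminating stage (uploadPartialArtifacts is a hypothetical helper mirroring the container-side shutdown handler in section 1):

lm.RegisterHook(StageTerminating, func(ctx context.Context, job *Job) error {
    return uploadPartialArtifacts(ctx, job.ID)
})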

3. Orphaned Resource Detector

package cleanup

import (
    "context"
    "database/sql"
    "fmt"
    "strings"
    "time"

    log "github.com/sirupsen/logrus"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

type OrphanedResourceDetector struct {
    k8sClient       kubernetes.Interface
    storageClient   StorageClient
    db              *sql.DB
    checkInterval   time.Duration
    resourceTypes   []ResourceType
}

type ResourceType struct {
    Name      string
    Detector  func(context.Context) ([]OrphanedResource, error)
    Cleaner   func(context.Context, OrphanedResource) error
}

type OrphanedResource struct {
    Type       string
    Name       string
    Namespace  string
    JobID      string
    CreatedAt  time.Time
    LastSeen   time.Time
    Reason     string
}

func (ord *OrphanedResourceDetector) StartDetection(ctx context.Context) {
    ticker := time.NewTicker(ord.checkInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            ord.detectAndCleanOrphans(ctx)
        }
    }
}

func (ord *OrphanedResourceDetector) detectAndCleanOrphans(ctx context.Context) {
    for _, resourceType := range ord.resourceTypes {
        orphans, err := resourceType.Detector(ctx)
        if err != nil {
            log.Errorf("Failed to detect orphaned %s: %v", resourceType.Name, err)
            continue
        }

        for _, orphan := range orphans {
            log.Warnf("Found orphaned %s: %s (job: %s, reason: %s)",
                orphan.Type, orphan.Name, orphan.JobID, orphan.Reason)

            // Clean up orphan
            if err := resourceType.Cleaner(ctx, orphan); err != nil {
                log.Errorf("Failed to clean orphaned %s %s: %v",
                    orphan.Type, orphan.Name, err)
            } else {
                ord.recordCleanup(orphan)
            }
        }
    }
}

func (ord *OrphanedResourceDetector) initializeDetectors() {
    ord.resourceTypes = []ResourceType{
        {
            Name:     "pods",
            Detector: ord.detectOrphanedPods,
            Cleaner:  ord.cleanOrphanedPod,
        },
        {
            Name:     "pvcs",
            Detector: ord.detectOrphanedPVCs,
            Cleaner:  ord.cleanOrphanedPVC,
        },
        {
            Name:     "secrets",
            Detector: ord.detectOrphanedSecrets,
            Cleaner:  ord.cleanOrphanedSecret,
        },
        {
            Name:     "configmaps",
            Detector: ord.detectOrphanedConfigMaps,
            Cleaner:  ord.cleanOrphanedConfigMap,
        },
        {
            Name:     "s3-artifacts",
            Detector: ord.detectOrphanedS3Objects,
            Cleaner:  ord.cleanOrphanedS3Object,
        },
    }
}

func (ord *OrphanedResourceDetector) detectOrphanedPods(ctx context.Context) ([]OrphanedResource, error) {
    var orphans []OrphanedResource
    
    // List all pods in job namespaces
    pods, err := ord.k8sClient.CoreV1().Pods("flowforge-jobs").List(ctx, metav1.ListOptions{})
    if err != nil {
        return nil, err
    }

    // Get all active jobs
    activeJobs, err := ord.getActiveJobIDs(ctx)
    if err != nil {
        return nil, err
    }

    for _, pod := range pods.Items {
        jobID, ok := pod.Labels["job-id"]
        if !ok {
            continue
        }

        // Check if job exists
        if _, exists := activeJobs[jobID]; !exists {
            // Check age of pod
            age := time.Since(pod.CreationTimestamp.Time)
            
            if age > 1*time.Hour {
                orphans = append(orphans, OrphanedResource{
                    Type:      "pod",
                    Name:      pod.Name,
                    Namespace: pod.Namespace,
                    JobID:     jobID,
                    CreatedAt: pod.CreationTimestamp.Time,
                    LastSeen:  time.Now(),
                    Reason:    "Job no longer exists",
                })
            }
        }
    }

    return orphans, nil
}

func (ord *OrphanedResourceDetector) detectOrphanedS3Objects(ctx context.Context) ([]OrphanedResource, error) {
    var orphans []OrphanedResource
    
    // List all objects with job prefix
    objects, err := ord.storageClient.ListObjectsWithPrefix(ctx, "jobs/")
    if err != nil {
        return nil, err
    }

    // Get all known jobs from database
    knownJobs, err := ord.getAllJobIDsFromDB(ctx)
    if err != nil {
        return nil, err
    }

    for _, obj := range objects {
        // Extract job ID from object key
        parts := strings.Split(obj.Key, "/")
        if len(parts) < 3 {
            continue
        }
        
        jobID := parts[2]
        
        if !knownJobs[jobID] {
            // Check object age
            if time.Since(obj.LastModified) > 7*24*time.Hour {
                orphans = append(orphans, OrphanedResource{
                    Type:      "s3-object",
                    Name:      obj.Key,
                    JobID:     jobID,
                    CreatedAt: obj.LastModified,
                    LastSeen:  time.Now(),
                    Reason:    "Job not found in database",
                })
            }
        }
    }

    return orphans, nil
}
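
The cleaner counterparts wired in initializeDetectors are not shown; for pods a straight delete is usually enough. A sketch, assuming "k8s.io/apimachinery/pkg/api/errors" is imported as errors:

func (ord *OrphanedResourceDetector) cleanOrphanedPod(ctx context.Context, orphan OrphanedResource) error {
    // Background propagation lets the garbage collector remove any dependents asynchronously.
    policy := metav1.DeletePropagationBackground
    err := ord.k8sClient.CoreV1().Pods(orphan.Namespace).Delete(ctx, orphan.Name, metav1.DeleteOptions{
        PropagationPolicy: &policy,
    })
    if errors.IsNotFound(err) {
        // Already gone; treat as successfully cleaned.
        return nil
    }
    return err
}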

4. Cleanup Coordinator

package cleanup

import (
    "context"
    "database/sql"
    "fmt"
    "sync"
    "time"

    log "github.com/sirupsen/logrus"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

type CleanupCoordinator struct {
    k8sClient       kubernetes.Interface
    storageClient   StorageClient
    db              *sql.DB
    workers         int
    taskQueue       chan CleanupTask
    inProgress      sync.Map
}

type CleanupTask struct {
    ID           string
    JobID        string
    ResourceType string
    ResourceName string
    Namespace    string
    ScheduledAt  time.Time
    Priority     int
    RetryCount   int
    MaxRetries   int
}

func (cc *CleanupCoordinator) StartWorkers(ctx context.Context) {
    for i := 0; i < cc.workers; i++ {
        go cc.worker(ctx, i)
    }

    // Start task scheduler
    go cc.taskScheduler(ctx)
}

func (cc *CleanupCoordinator) worker(ctx context.Context, workerID int) {
    log.Infof("Cleanup worker %d started", workerID)
    
    for {
        select {
        case <-ctx.Done():
            return
        case task := <-cc.taskQueue:
            cc.processTask(ctx, task, workerID)
        }
    }
}

func (cc *CleanupCoordinator) processTask(ctx context.Context, task CleanupTask, workerID int) {
    log.Debugf("Worker %d processing cleanup task %s", workerID, task.ID)
    
    // Mark as in progress
    cc.inProgress.Store(task.ID, true)
    defer cc.inProgress.Delete(task.ID)
    
    // Execute cleanup based on resource type
    var err error
    
    switch task.ResourceType {
    case "job":
        err = cc.cleanupJob(ctx, task)
    case "pod":
        err = cc.cleanupPod(ctx, task)
    case "pvc":
        err = cc.cleanupPVC(ctx, task)
    case "secret":
        err = cc.cleanupSecret(ctx, task)
    case "artifacts":
        err = cc.cleanupArtifacts(ctx, task)
    default:
        log.Warnf("Unknown resource type: %s", task.ResourceType)
        return
    }

    if err != nil {
        log.Errorf("Cleanup failed for task %s: %v", task.ID, err)
        
        // Retry if applicable
        if task.RetryCount < task.MaxRetries {
            task.RetryCount++
            task.ScheduledAt = time.Now().Add(time.Duration(task.RetryCount) * 5 * time.Minute)
            cc.scheduleTask(task)
        }
    } else {
        log.Infof("Successfully cleaned up %s %s", task.ResourceType, task.ResourceName)
        cc.recordCleanupSuccess(task)
    }
}

func (cc *CleanupCoordinator) cleanupJob(ctx context.Context, task CleanupTask) error {
    // Delete Kubernetes job
    deletePolicy := metav1.DeletePropagationForeground
    err := cc.k8sClient.BatchV1().Jobs(task.Namespace).Delete(
        ctx, task.ResourceName, metav1.DeleteOptions{
            PropagationPolicy: &deletePolicy,
        },
    )
    if err != nil && !errors.IsNotFound(err) {
        return err
    }

    // Clean up associated resources
    if err := cc.cleanupJobResources(ctx, task.JobID); err != nil {
        log.Warnf("Failed to cleanup job resources: %v", err)
    }

    // Update database
    if err := cc.markJobCleaned(ctx, task.JobID); err != nil {
        log.Warnf("Failed to update job status: %v", err)
    }

    return nil
}

func (cc *CleanupCoordinator) cleanupJobResources(ctx context.Context, jobID string) error {
    var wg sync.WaitGroup
    errChan := make(chan error, 4)

    // Clean PVCs
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := cc.cleanupJobPVCs(ctx, jobID); err != nil {
            errChan <- fmt.Errorf("PVC cleanup failed: %w", err)
        }
    }()

    // Clean secrets
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := cc.cleanupJobSecrets(ctx, jobID); err != nil {
            errChan <- fmt.Errorf("secret cleanup failed: %w", err)
        }
    }()

    // Clean artifacts
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := cc.cleanupJobArtifacts(ctx, jobID); err != nil {
            errChan <- fmt.Errorf("artifact cleanup failed: %w", err)
        }
    }()

    // Clean logs
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := cc.cleanupJobLogs(ctx, jobID); err != nil {
            errChan <- fmt.Errorf("log cleanup failed: %w", err)
        }
    }()

    wg.Wait()
    close(errChan)

    // Collect errors
    var errs []error
    for err := range errChan {
        errs = append(errs, err)
    }

    if len(errs) > 0 {
        return fmt.Errorf("multiple cleanup errors: %v", errs)
    }

    return nil
}
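
scheduleTask and taskScheduler are referenced above but not shown; one simple approach persists tasks and periodically feeds due ones onto the worker queue. A sketch, where loadDueTasks is a hypothetical DB helper:

// taskScheduler periodically pushes cleanup tasks whose ScheduledAt time has
// passed onto the worker queue.
func (cc *CleanupCoordinator) taskScheduler(ctx context.Context) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            due, err := cc.loadDueTasks(ctx, time.Now()) // hypothetical: SELECT ... WHERE scheduled_at <= now
            if err != nil {
                log.Errorf("Failed to load due cleanup tasks: %v", err)
                continue
            }
            for _, task := range due {
                // Skip tasks a worker is already processing.
                if _, busy := cc.inProgress.Load(task.ID); busy {
                    continue
                }
                select {
                case cc.taskQueue <- task:
                case <-ctx.Done():
                    return
                }
            }
        }
    }
}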

5. Dead Job Recovery

package recovery

import (
    "context"
    "database/sql"
    "time"

    log "github.com/sirupsen/logrus"
    "k8s.io/client-go/kubernetes"
)

type DeadJobRecovery struct {
    k8sClient      kubernetes.Interface
    db             *sql.DB
    jobManager     *JobManager
    checkInterval  time.Duration
    recoveryPolicy RecoveryPolicy
}

type RecoveryPolicy struct {
    MaxRecoveryAttempts int
    RecoveryTimeout     time.Duration
    BackoffMultiplier   float64
}

func (djr *DeadJobRecovery) StartRecoveryMonitor(ctx context.Context) {
    ticker := time.NewTicker(djr.checkInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            djr.checkDeadJobs(ctx)
        }
    }
}

func (djr *DeadJobRecovery) checkDeadJobs(ctx context.Context) {
    // Get jobs that should be running but haven't reported
    deadJobs, err := djr.findDeadJobs(ctx)
    if err != nil {
        log.Errorf("Failed to find dead jobs: %v", err)
        return
    }

    for _, job := range deadJobs {
        log.Warnf("Found dead job %s (last heartbeat: %v ago)",
            job.ID, time.Since(job.LastHeartbeat))

        if djr.shouldRecover(job) {
            if err := djr.recoverJob(ctx, job); err != nil {
                log.Errorf("Failed to recover job %s: %v", job.ID, err)
            }
        } else {
            // Mark as permanently failed
            djr.markJobFailed(ctx, job, "Job unresponsive - exceeded recovery attempts")
        }
    }
}

func (djr *DeadJobRecovery) recoverJob(ctx context.Context, job *DeadJob) error {
    log.Infof("Attempting to recover job %s (attempt %d/%d)",
        job.ID, job.RecoveryAttempts+1, djr.recoveryPolicy.MaxRecoveryAttempts)

    // Check if Kubernetes job still exists
    k8sJob, err := djr.getKubernetesJob(ctx, job.ID)
    if err != nil {
        // Job doesn't exist, recreate it
        return djr.recreateJob(ctx, job)
    }

    // Check pod status
    pods, err := djr.getJobPods(ctx, job.ID)
    if err != nil {
        return err
    }

    if len(pods) == 0 {
        // No pods, job might be stuck
        return djr.restartJob(ctx, k8sJob)
    }

    // Analyze pod issues
    for _, pod := range pods {
        if issue := djr.analyzePodIssue(pod); issue != nil {
            log.Warnf("Pod %s has issue: %v", pod.Name, issue)
            
            switch issue.Type {
            case "ImagePullBackOff":
                return djr.handleImagePullError(ctx, job, pod)
            case "CrashLoopBackOff":
                return djr.handleCrashLoop(ctx, job, pod)
            case "Pending":
                return djr.handlePendingPod(ctx, job, pod)
            }
        }
    }

    return nil
}
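
findDeadJobs is referenced above but not defined; one way to detect unresponsive jobs is a heartbeat cutoff in the database. A sketch assuming a jobs table with status, last_heartbeat, and recovery_attempts columns (hypothetical schema):

// findDeadJobs treats any job marked running whose heartbeat is older than the
// cutoff as dead.
func (djr *DeadJobRecovery) findDeadJobs(ctx context.Context) ([]*DeadJob, error) {
    cutoff := time.Now().Add(-5 * time.Minute) // tune to the workers' heartbeat interval

    rows, err := djr.db.QueryContext(ctx, `
        SELECT id, last_heartbeat, recovery_attempts
        FROM jobs
        WHERE status = 'running' AND last_heartbeat < $1`, cutoff)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var dead []*DeadJob
    for rows.Next() {
        var j DeadJob
        if err := rows.Scan(&j.ID, &j.LastHeartbeat, &j.RecoveryAttempts); err != nil {
            return nil, err
        }
        dead = append(dead, &j)
    }
    return dead, rows.Err()
}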

6. Audit Trail for Cleanup Actions

type CleanupAuditor struct {
    db *sql.DB
}

func (ca *CleanupAuditor) RecordCleanupAction(action CleanupAction) error {
    _, err := ca.db.Exec(`
        INSERT INTO cleanup_audit_log (
            id, timestamp, job_id, resource_type, resource_name,
            action, reason, initiated_by, success, error_message
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    `, 
        action.ID,
        action.Timestamp,
        action.JobID,
        action.ResourceType,
        action.ResourceName,
        action.Action,
        action.Reason,
        action.InitiatedBy,
        action.Success,
        action.ErrorMessage,
    )
    return err
}

type CleanupAction struct {
    ID           string
    Timestamp    time.Time
    JobID        string
    ResourceType string
    ResourceName string
    Action       string
    Reason       string
    InitiatedBy  string
    Success      bool
    ErrorMessage string
}
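
The INSERT above implies the following table shape; a hedged sketch of the Postgres DDL, expressed as a Go migration constant (column types are assumptions):

// cleanupAuditLogSchema is an assumed schema matching the column list used by
// RecordCleanupAction above.
const cleanupAuditLogSchema = `
CREATE TABLE IF NOT EXISTS cleanup_audit_log (
    id            TEXT PRIMARY KEY,
    timestamp     TIMESTAMPTZ NOT NULL,
    job_id        TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_name TEXT NOT NULL,
    action        TEXT NOT NULL,
    reason        TEXT,
    initiated_by  TEXT NOT NULL,
    success       BOOLEAN NOT NULL,
    error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_cleanup_audit_job ON cleanup_audit_log (job_id);
`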

7. Job Configuration with Timeouts

apiVersion: batch/v1
kind: Job
metadata:
  name: claude-job-{id}
  annotations:
    flowforge.io/timeout-seconds: "7200"
    flowforge.io/grace-period-seconds: "300"
    flowforge.io/ttl-seconds-after-finished: "3600"
spec:
  ttlSecondsAfterFinished: 3600
  activeDeadlineSeconds: 7200
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure
      containers:
      - name: worker
        image: flowforge/worker:latest
        env:
        - name: FLOWFORGE_TIMEOUT
          value: "7200"
        - name: FLOWFORGE_GRACE_PERIOD
          value: "300"
        lifecycle:
          preStop:
            exec:
              command:
              - /bin/sh
              - -c
              - |
                # Graceful shutdown script
                echo "Received termination signal"
                # Save state
                /app/save-state.sh
                # Wait for cleanup
                sleep 10
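
Inside the worker container, the FLOWFORGE_TIMEOUT value above can be turned into a context deadline so the process can wind down on its own before Kubernetes enforces activeDeadlineSeconds. A minimal sketch; runJob is a hypothetical job entrypoint that honors context cancellation:

package main

import (
    "context"
    "os"
    "strconv"
    "time"
)

// timeoutFromEnv converts FLOWFORGE_TIMEOUT (seconds) into a duration, falling
// back to the 2-hour default from the architecture docs.
func timeoutFromEnv() time.Duration {
    secs, err := strconv.Atoi(os.Getenv("FLOWFORGE_TIMEOUT"))
    if err != nil || secs <= 0 {
        return 2 * time.Hour
    }
    return time.Duration(secs) * time.Second
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), timeoutFromEnv())
    defer cancel()
    runJob(ctx) // hypothetical; should stop work and flush state when ctx is done
}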

Dependencies

  • Kubernetes Job API with TTL controller
  • Time-based scheduling system
  • Persistent storage for audit logs
  • Monitoring system for dead job detection

Definition of Done

  • Configurable timeouts implemented at all levels
  • Graceful shutdown working with state preservation
  • TTL-based cleanup functioning correctly
  • Dead job detection and recovery tested
  • Orphaned resource cleanup automated
  • Audit trail complete for all actions
  • Performance impact on cluster minimal
  • Documentation includes timeout configuration guide

Effort Estimate

Story Points: 13 (Complex state management and edge case handling)

Labels

  • epic/container-execution
  • priority/high
  • component/lifecycle
  • size/xl
  • reliability
