From 3835fbfdc736a6d4e0cd3861b7d3ea1ef8c3df23 Mon Sep 17 00:00:00 2001 From: "pingshan.wj" Date: Wed, 11 Mar 2026 15:36:03 +0800 Subject: [PATCH] feat(k8s): Add lifecycle feat in task schedule --- .../sandbox/v1alpha1/batchsandbox_types.go | 58 +++ .../sandbox/v1alpha1/zz_generated.deepcopy.go | 75 ++++ ...sandbox.opensandbox.io_batchsandboxes.yaml | 5 + kubernetes/config/manager/kustomization.yaml | 4 +- ...ndbox_v1alpha1_batchsandbox-lifecycle.yaml | 60 ++++ .../controller/batchsandbox_controller.go | 13 +- .../task_scheduling_strategy_default.go | 65 +++- .../internal/scheduler/default_scheduler.go | 13 + kubernetes/internal/scheduler/mock/types.go | 14 + kubernetes/internal/scheduler/types.go | 4 + .../task-executor/manager/task_manager.go | 74 +++- .../internal/task-executor/runtime/process.go | 150 +++++++- .../task-executor/runtime/process_test.go | 305 ++++++++++++++++ .../internal/task-executor/types/task.go | 40 +++ kubernetes/pkg/task-executor/types.go | 37 ++ kubernetes/test/e2e/e2e_test.go | 330 ++++++++++++++++++ ...tchsandbox-with-failing-prestart-task.yaml | 18 + .../batchsandbox-with-poststop-task.yaml | 17 + .../batchsandbox-with-prestart-task.yaml | 18 + ...tchsandbox-with-timeout-prestart-task.yaml | 19 + kubernetes/test/e2e_task/task_e2e_test.go | 260 ++++++++++++++ 21 files changed, 1555 insertions(+), 24 deletions(-) create mode 100644 kubernetes/config/samples/sandbox_v1alpha1_batchsandbox-lifecycle.yaml create mode 100644 kubernetes/test/e2e/testdata/batchsandbox-with-failing-prestart-task.yaml create mode 100644 kubernetes/test/e2e/testdata/batchsandbox-with-poststop-task.yaml create mode 100644 kubernetes/test/e2e/testdata/batchsandbox-with-prestart-task.yaml create mode 100644 kubernetes/test/e2e/testdata/batchsandbox-with-timeout-prestart-task.yaml diff --git a/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go b/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go index 234c1006..5cbcfca2 100644 --- 
a/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go +++ b/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go @@ -100,6 +100,10 @@ type BatchSandboxStatus struct { TaskPending int32 `json:"taskPending"` // TaskUnknown is the number of Unknown task TaskUnknown int32 `json:"taskUnknown"` + // TaskLastErrorMessage holds the most recent error message from a failed task. + // This includes lifecycle hook failures (e.g., ossfs mount timeout or error output). + // +optional + TaskLastErrorMessage string `json:"taskLastErrorMessage,omitempty"` } // +genclient @@ -145,6 +149,17 @@ type TaskTemplateSpec struct { Spec TaskSpec `json:"spec,omitempty"` } +// ExecMode defines where a process should be executed. +// +kubebuilder:validation:Enum=Local;Remote +type ExecMode string + +const ( + // ExecModeLocal executes in the task-executor container. + ExecModeLocal ExecMode = "Local" + // ExecModeRemote executes in the main container via nsenter. + ExecModeRemote ExecMode = "Remote" +) + type TaskSpec struct { // +optional Process *ProcessTask `json:"process,omitempty"` @@ -169,6 +184,49 @@ type ProcessTask struct { // WorkingDir task working directory. // +optional WorkingDir string `json:"workingDir,omitempty"` + // ExecMode controls where the process runs. + // Local: inside task-executor container. + // Remote: inside main container (via nsenter). + // +optional + // +kubebuilder:default=Local + ExecMode ExecMode `json:"execMode,omitempty"` + // Lifecycle defines actions to be executed before and after the main process. + // +optional + Lifecycle *ProcessLifecycle `json:"lifecycle,omitempty"` +} + +// ProcessLifecycle defines lifecycle hooks for a process. +type ProcessLifecycle struct { + // PreStart is executed before the main process starts. + // +optional + PreStart *LifecycleHandler `json:"preStart,omitempty"` + // PostStop is executed after the main process stops. 
+ // +optional + PostStop *LifecycleHandler `json:"postStop,omitempty"` +} + +// LifecycleHandler defines a lifecycle action. +type LifecycleHandler struct { + // Exec specifies the action to take. + // +optional + Exec *ExecAction `json:"exec,omitempty"` + // ExecMode controls where the action runs. + // +optional + // +kubebuilder:default=Local + ExecMode ExecMode `json:"execMode,omitempty"` + // TimeoutSeconds is the maximum number of seconds the hook may run. + // If the hook does not complete within this time, it is killed and the + // enclosing operation (Start or Stop) is treated as failed. + // If not set or zero, there is no timeout. + // +optional + TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` +} + +// ExecAction describes a "run in container" action. +type ExecAction struct { + // Command is the command line to execute inside the container. + // +kubebuilder:validation:Required + Command []string `json:"command"` } // TaskStatus task status diff --git a/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go b/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go index 82cae84c..9d376241 100644 --- a/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go +++ b/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go @@ -165,6 +165,51 @@ func (in *CapacitySpec) DeepCopy() *CapacitySpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExecAction) DeepCopyInto(out *ExecAction) { + *out = *in + if in.Command != nil { + in, out := &in.Command, &out.Command + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecAction. 
+func (in *ExecAction) DeepCopy() *ExecAction { + if in == nil { + return nil + } + out := new(ExecAction) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LifecycleHandler) DeepCopyInto(out *LifecycleHandler) { + *out = *in + if in.Exec != nil { + in, out := &in.Exec, &out.Exec + *out = new(ExecAction) + (*in).DeepCopyInto(*out) + } + if in.TimeoutSeconds != nil { + in, out := &in.TimeoutSeconds, &out.TimeoutSeconds + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifecycleHandler. +func (in *LifecycleHandler) DeepCopy() *LifecycleHandler { + if in == nil { + return nil + } + out := new(LifecycleHandler) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Pool) DeepCopyInto(out *Pool) { *out = *in @@ -260,6 +305,31 @@ func (in *PoolStatus) DeepCopy() *PoolStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ProcessLifecycle) DeepCopyInto(out *ProcessLifecycle) { + *out = *in + if in.PreStart != nil { + in, out := &in.PreStart, &out.PreStart + *out = new(LifecycleHandler) + (*in).DeepCopyInto(*out) + } + if in.PostStop != nil { + in, out := &in.PostStop, &out.PostStop + *out = new(LifecycleHandler) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProcessLifecycle. +func (in *ProcessLifecycle) DeepCopy() *ProcessLifecycle { + if in == nil { + return nil + } + out := new(ProcessLifecycle) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ProcessTask) DeepCopyInto(out *ProcessTask) { *out = *in @@ -280,6 +350,11 @@ func (in *ProcessTask) DeepCopyInto(out *ProcessTask) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Lifecycle != nil { + in, out := &in.Lifecycle, &out.Lifecycle + *out = new(ProcessLifecycle) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProcessTask. diff --git a/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml b/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml index 72c43bdd..bd8cb41f 100644 --- a/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml +++ b/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml @@ -156,6 +156,11 @@ spec: description: TaskFailed is the number of Failed task format: int32 type: integer + taskLastErrorMessage: + description: |- + TaskLastErrorMessage holds the most recent error message from a failed task. + This includes lifecycle hook failures (e.g., ossfs mount timeout or error output). 
+ type: string taskPending: description: TaskPending is the number of Pending task which is unassigned format: int32 diff --git a/kubernetes/config/manager/kustomization.yaml b/kubernetes/config/manager/kustomization.yaml index ae9eb8f4..fdcb79fa 100644 --- a/kubernetes/config/manager/kustomization.yaml +++ b/kubernetes/config/manager/kustomization.yaml @@ -4,8 +4,8 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: controller - newTag: dev + newName: example.com/sandbox-k8s + newTag: v0.0.1 - name: manager newName: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/controller newTag: v0.0.1 diff --git a/kubernetes/config/samples/sandbox_v1alpha1_batchsandbox-lifecycle.yaml b/kubernetes/config/samples/sandbox_v1alpha1_batchsandbox-lifecycle.yaml new file mode 100644 index 00000000..3576a295 --- /dev/null +++ b/kubernetes/config/samples/sandbox_v1alpha1_batchsandbox-lifecycle.yaml @@ -0,0 +1,60 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: BatchSandbox +metadata: + labels: + app.kubernetes.io/name: opensandbox + app.kubernetes.io/managed-by: kustomize + name: batchsandbox-lifecycle-sample + namespace: opensandbox +spec: + replicas: 1 + template: + metadata: + labels: + app: lifecycle-example + spec: + containers: + - name: main + image: registry.k8s.io/e2e-test-images/busybox:1.29-4 + command: + - tail + - -f + - /dev/null + expireTime: "2025-12-31T23:59:59Z" + taskTemplate: + spec: + process: + command: + - sleep + args: + - "30" + env: + - name: TASK_NAME + value: lifecycle-task + lifecycle: + # preStart hook runs before the main task process starts. + # Useful for preparing the environment, checking dependencies, warming up data, etc. + preStart: + exec: + command: + - /bin/sh + - -c + - | + echo "[preStart] Starting environment preparation..." + mkdir -p /tmp/workdir + echo "$(date): Task starting" > /tmp/workdir/prestart-marker + echo "[preStart] Environment ready!" 
+ timeoutSeconds: 30 + # postStop hook runs after the main task process exits. + # Useful for cleaning up resources, uploading logs, sending notifications, etc. + postStop: + exec: + command: + - /bin/sh + - -c + - | + echo "[postStop] Starting cleanup..." + echo "$(date): Task finished" >> /tmp/workdir/poststop-marker + echo "Task logs:" && cat /tmp/workdir/* + echo "[postStop] Cleanup complete!" + timeoutSeconds: 30 diff --git a/kubernetes/internal/controller/batchsandbox_controller.go b/kubernetes/internal/controller/batchsandbox_controller.go index 6008d3cd..542d0dc4 100644 --- a/kubernetes/internal/controller/batchsandbox_controller.go +++ b/kubernetes/internal/controller/batchsandbox_controller.go @@ -368,6 +368,7 @@ func (r *BatchSandboxReconciler) scheduleTasks(ctx context.Context, tSch tasksch var ( running, failed, succeed, unknown int32 pending int32 + lastErrorMessage string ) for i := range len(tasks) { task := tasks[i] @@ -385,6 +386,10 @@ func (r *BatchSandboxReconciler) scheduleTasks(ctx context.Context, tSch tasksch succeed++ case taskscheduler.FailedTaskState: failed++ + // Capture the most recent error message to surface in status. + if msg := task.GetTerminatedMessage(); msg != "" { + lastErrorMessage = msg + } case taskscheduler.UnknownTaskState: unknown++ } @@ -405,8 +410,14 @@ func (r *BatchSandboxReconciler) scheduleTasks(ctx context.Context, tSch tasksch newStatus.TaskSucceed = succeed newStatus.TaskUnknown = unknown newStatus.TaskPending = pending + // Persist error message from the most recently failed task (e.g. lifecycle hook stderr). + // Only update when a new non-empty message is available, to avoid clearing a previously + // recorded error on subsequent reconcile cycles where the task may have been released. 
+ if lastErrorMessage != "" { + newStatus.TaskLastErrorMessage = lastErrorMessage + } if !reflect.DeepEqual(newStatus, oldStatus) { - log.Info("To update BatchSandbox status", "replicas", newStatus.Replicas, "task_running", newStatus.TaskRunning, "task_succeed", newStatus.TaskSucceed, "task_failed", newStatus.TaskFailed, "task_unknown", newStatus.TaskUnknown, "task_pending", newStatus.TaskPending) + log.Info("To update BatchSandbox status", "replicas", newStatus.Replicas, "task_running", newStatus.TaskRunning, "task_succeed", newStatus.TaskSucceed, "task_failed", newStatus.TaskFailed, "task_unknown", newStatus.TaskUnknown, "task_pending", newStatus.TaskPending, "last_error", newStatus.TaskLastErrorMessage) if err := r.updateStatus(batchSbx, newStatus); err != nil { return err } diff --git a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go index 4cf02b63..8e655834 100644 --- a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go +++ b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go @@ -72,21 +72,58 @@ func (s *DefaultTaskSchedulingStrategy) getTaskSpec(idx int) (*api.Task, error) if err = json.Unmarshal(modified, newTaskTemplate); err != nil { return nil, fmt.Errorf("batchsandbox: failed to unmarshal %s to TaskTemplateSpec, idx %d, err %w", modified, idx, err) } - task.Process = &api.Process{ - Command: newTaskTemplate.Spec.Process.Command, - Args: newTaskTemplate.Spec.Process.Args, - Env: newTaskTemplate.Spec.Process.Env, - WorkingDir: newTaskTemplate.Spec.Process.WorkingDir, - TimeoutSeconds: s.Spec.TaskTemplate.Spec.TimeoutSeconds, - } + task.Process = convertProcessSpec(newTaskTemplate.Spec.Process, s.Spec.TaskTemplate.Spec.TimeoutSeconds) } else if s.Spec.TaskTemplate != nil && s.Spec.TaskTemplate.Spec.Process != nil { - task.Process = &api.Process{ - Command: s.Spec.TaskTemplate.Spec.Process.Command, - 
Args: s.Spec.TaskTemplate.Spec.Process.Args, - Env: s.Spec.TaskTemplate.Spec.Process.Env, - WorkingDir: s.Spec.TaskTemplate.Spec.Process.WorkingDir, - TimeoutSeconds: s.Spec.TaskTemplate.Spec.TimeoutSeconds, - } + task.Process = convertProcessSpec(s.Spec.TaskTemplate.Spec.Process, s.Spec.TaskTemplate.Spec.TimeoutSeconds) } return task, nil } + +// convertProcessSpec converts sandboxv1alpha1.ProcessTask to api.Process. +func convertProcessSpec(src *sandboxv1alpha1.ProcessTask, timeoutSeconds *int64) *api.Process { + if src == nil { + return nil + } + return &api.Process{ + Command: src.Command, + Args: src.Args, + Env: src.Env, + WorkingDir: src.WorkingDir, + TimeoutSeconds: timeoutSeconds, + ExecMode: api.ExecMode(src.ExecMode), + Lifecycle: convertLifecycle(src.Lifecycle), + } +} + +// convertLifecycle converts sandboxv1alpha1.ProcessLifecycle to api.ProcessLifecycle. +func convertLifecycle(src *sandboxv1alpha1.ProcessLifecycle) *api.ProcessLifecycle { + if src == nil { + return nil + } + return &api.ProcessLifecycle{ + PreStart: convertLifecycleHandler(src.PreStart), + PostStop: convertLifecycleHandler(src.PostStop), + } +} + +// convertLifecycleHandler converts sandboxv1alpha1.LifecycleHandler to api.LifecycleHandler. +func convertLifecycleHandler(src *sandboxv1alpha1.LifecycleHandler) *api.LifecycleHandler { + if src == nil { + return nil + } + return &api.LifecycleHandler{ + Exec: convertExecAction(src.Exec), + ExecMode: api.ExecMode(src.ExecMode), + TimeoutSeconds: src.TimeoutSeconds, + } +} + +// convertExecAction converts sandboxv1alpha1.ExecAction to api.ExecAction. 
+func convertExecAction(src *sandboxv1alpha1.ExecAction) *api.ExecAction { + if src == nil { + return nil + } + return &api.ExecAction{ + Command: src.Command, + } +} diff --git a/kubernetes/internal/scheduler/default_scheduler.go b/kubernetes/internal/scheduler/default_scheduler.go index a4bf818f..b3c78cc7 100644 --- a/kubernetes/internal/scheduler/default_scheduler.go +++ b/kubernetes/internal/scheduler/default_scheduler.go @@ -73,6 +73,19 @@ func (t *taskNode) IsResourceReleased() bool { return t.sState == stateReleased } +// GetTerminatedMessage returns the failure message from the task status, including +// lifecycle hook stderr output. It is used by the controller to populate +// BatchSandboxStatus.TaskLastErrorMessage. +func (t *taskNode) GetTerminatedMessage() string { + if t.Status == nil { + return "" + } + if t.Status.ProcessStatus != nil && t.Status.ProcessStatus.Terminated != nil { + return t.Status.ProcessStatus.Terminated.Message + } + return "" +} + func (t *taskNode) isTaskCompleted() bool { return t.tState == SucceedTaskState || t.tState == FailedTaskState } diff --git a/kubernetes/internal/scheduler/mock/types.go b/kubernetes/internal/scheduler/mock/types.go index 03940884..dc9a8125 100644 --- a/kubernetes/internal/scheduler/mock/types.go +++ b/kubernetes/internal/scheduler/mock/types.go @@ -90,3 +90,17 @@ func (mr *MockTaskMockRecorder) IsResourceReleased() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsResourceReleased", reflect.TypeOf((*MockTask)(nil).IsResourceReleased)) } + +// GetTerminatedMessage mocks base method. +func (m *MockTask) GetTerminatedMessage() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTerminatedMessage") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetTerminatedMessage indicates an expected call of GetTerminatedMessage. 
+func (mr *MockTaskMockRecorder) GetTerminatedMessage() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTerminatedMessage", reflect.TypeOf((*MockTask)(nil).GetTerminatedMessage)) +} diff --git a/kubernetes/internal/scheduler/types.go b/kubernetes/internal/scheduler/types.go index 6e7ef952..45a02058 100644 --- a/kubernetes/internal/scheduler/types.go +++ b/kubernetes/internal/scheduler/types.go @@ -21,6 +21,10 @@ type Task interface { // IsResourceReleased task resource is released // TODO func name is strange IsResourceReleased() bool + // GetTerminatedMessage returns a human-readable message when the task has + // reached a terminal failure state (e.g., a lifecycle hook error with stderr). + // Returns an empty string if there is no message or the task is not failed. + GetTerminatedMessage() string } type TaskState string diff --git a/kubernetes/internal/task-executor/manager/task_manager.go b/kubernetes/internal/task-executor/manager/task_manager.go index 9a00a772..05ba3424 100644 --- a/kubernetes/internal/task-executor/manager/task_manager.go +++ b/kubernetes/internal/task-executor/manager/task_manager.go @@ -118,9 +118,27 @@ func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task } if err := m.executor.Start(ctx, task); err != nil { - if delErr := m.store.Delete(ctx, task.Name); delErr != nil { - klog.ErrorS(delErr, "failed to rollback task creation", "name", task.Name) + // Persist the task in Failed state so the scheduler can observe the failure + // and surface it in BatchSandbox status, rather than silently discarding it. 
+ reason := "StartFailed" + var startErr *types.StartError + if errors.As(err, &startErr) { + reason = startErr.Reason } + now := time.Now() + task.Status = types.Status{ + State: types.TaskStateFailed, + SubStatuses: []types.SubStatus{{ + Reason: reason, + Message: err.Error(), + ExitCode: 1, + FinishedAt: &now, + }}, + } + if updateErr := m.store.Update(ctx, task); updateErr != nil { + klog.ErrorS(updateErr, "failed to persist failed task status after Start error", "name", task.Name) + } + m.tasks[task.Name] = task return nil, fmt.Errorf("failed to start task: %w", err) } @@ -282,7 +300,25 @@ func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) er } if err := m.executor.Start(ctx, task); err != nil { - m.store.Delete(ctx, task.Name) + reason := "StartFailed" + var startErr *types.StartError + if errors.As(err, &startErr) { + reason = startErr.Reason + } + now := time.Now() + task.Status = types.Status{ + State: types.TaskStateFailed, + SubStatuses: []types.SubStatus{{ + Reason: reason, + Message: err.Error(), + ExitCode: 1, + FinishedAt: &now, + }}, + } + if updateErr := m.store.Update(ctx, task); updateErr != nil { + klog.ErrorS(updateErr, "failed to persist failed task status after Start error", "name", task.Name) + } + m.tasks[task.Name] = task return fmt.Errorf("failed to start task: %w", err) } @@ -380,6 +416,16 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { } state := status.State + // If the task is already in a terminal state (e.g., preStart hook failure set it + // to Failed) but Inspect returns non-terminal (e.g., Pending because no process + // was ever started), preserve the existing terminal state. Without this, the + // reconcile loop would overwrite the Failed status with Pending and prevent + // deletion of tasks that failed before the process launched. 
+ if isTerminalState(task.Status.State) && !isTerminalState(state) { + state = task.Status.State + status = &task.Status + } + shouldStop := false stopReason := "" @@ -407,12 +453,32 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { klog.V(1).InfoS("task stop initiated", "name", taskName, "reason", stopReason) if err := m.executor.Stop(ctx, t); err != nil { klog.ErrorS(err, "failed to stop task", "name", taskName) + // Extract structured reason from StopError and annotate task status + reason := "StopFailed" + var stopErr *types.StopError + if errors.As(err, &stopErr) { + reason = stopErr.Reason + } + m.mu.Lock() + if existingTask, ok := m.tasks[taskName]; ok { + now := time.Now() + existingTask.Status.State = types.TaskStateFailed + existingTask.Status.SubStatuses = append(existingTask.Status.SubStatuses, types.SubStatus{ + Reason: reason, + Message: err.Error(), + FinishedAt: &now, + }) + if updateErr := m.store.Update(ctx, existingTask); updateErr != nil { + klog.ErrorS(updateErr, "failed to persist stop error status", "name", taskName) + } + } + m.mu.Unlock() } klog.InfoS("task stopped", "name", taskName) }(task, name) } - if task.DeletionTimestamp != nil && isTerminalState(state) { + if task.DeletionTimestamp != nil && isTerminalState(state) && !m.stopping[name] { klog.InfoS("task terminated, finalizing deletion", "name", name) tasksToDelete = append(tasksToDelete, name) } diff --git a/kubernetes/internal/task-executor/runtime/process.go b/kubernetes/internal/task-executor/runtime/process.go index 607854f5..ac8b0cf2 100644 --- a/kubernetes/internal/task-executor/runtime/process.go +++ b/kubernetes/internal/task-executor/runtime/process.go @@ -30,6 +30,7 @@ import ( "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/config" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/types" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/utils" + api 
"github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) const ( @@ -54,6 +55,19 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error { if task == nil { return fmt.Errorf("task cannot be nil") } + + // Execute preStart lifecycle hook if present + if task.Process != nil && task.Process.Lifecycle != nil && task.Process.Lifecycle.PreStart != nil { + klog.InfoS("Executing preStart lifecycle hook", "task", task.Name) + if err := e.execLifecycleHook(ctx, task, task.Process.Lifecycle.PreStart); err != nil { + return &types.StartError{ + Reason: types.ReasonPreStartHookFailed, + Message: fmt.Sprintf("preStart hook failed: %v", err), + } + } + klog.InfoS("preStart lifecycle hook completed", "task", task.Name) + } + taskDir, err := utils.SafeJoin(e.rootDir, task.Name) if err != nil { return fmt.Errorf("invalid task name: %w", err) @@ -77,7 +91,15 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error { var cmd *exec.Cmd - if e.config.EnableSidecarMode { + // Determine if we should use nsenter based on ExecMode or global config + useNsenter := e.config.EnableSidecarMode + if task.Process != nil && task.Process.ExecMode == api.ExecModeLocal { + useNsenter = false + } else if task.Process != nil && task.Process.ExecMode == api.ExecModeRemote { + useNsenter = true + } + + if useNsenter { targetPID, err := e.findPidByEnvVar("SANDBOX_MAIN_CONTAINER", e.config.MainContainerName) if err != nil { return fmt.Errorf("failed to resolve target PID: %w", err) @@ -112,7 +134,8 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error { return e.executeCommand(task, cmd, pidPath) } -// executeCommand handles log setup and process starting +// executeCommand handles log setup and process starting. +// Returns *types.StartError with ReasonProcessStartFailed on failure. 
func (e *processExecutor) executeCommand(task *types.Task, cmd *exec.Cmd, pidPath string) error { if task == nil || cmd == nil { return fmt.Errorf("task and cmd cannot be nil") @@ -157,7 +180,10 @@ func (e *processExecutor) executeCommand(task *types.Task, cmd *exec.Cmd, pidPat klog.ErrorS(err, "failed to start command", "name", task.Name) stdoutFile.Close() stderrFile.Close() - return fmt.Errorf("failed to start cmd: %w", err) + return &types.StartError{ + Reason: types.ReasonProcessStartFailed, + Message: fmt.Sprintf("failed to start cmd: %v", err), + } } // Write PID to file immediately (Host-side PID) @@ -286,6 +312,39 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types } func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { + var processErr, postStopErr error + + // Stop main process first + if err := e.stopMainProcess(ctx, task); err != nil { + klog.ErrorS(err, "Failed to stop main process", "task", task.Name) + processErr = &types.StopError{ + Reason: types.ReasonProcessStopFailed, + Message: fmt.Sprintf("failed to stop main process: %v", err), + } + } + + // Execute postStop lifecycle hook if present + if task.Process != nil && task.Process.Lifecycle != nil && task.Process.Lifecycle.PostStop != nil { + klog.InfoS("Executing postStop lifecycle hook", "task", task.Name) + if err := e.execLifecycleHook(ctx, task, task.Process.Lifecycle.PostStop); err != nil { + klog.ErrorS(err, "postStop hook failed", "task", task.Name) + postStopErr = &types.StopError{ + Reason: types.ReasonPostStopHookFailed, + Message: fmt.Sprintf("postStop hook failed: %v", err), + } + } else { + klog.InfoS("postStop lifecycle hook completed", "task", task.Name) + } + } + + // Return the first error encountered (process stop takes priority) + if processErr != nil { + return processErr + } + return postStopErr +} + +func (e *processExecutor) stopMainProcess(ctx context.Context, task *types.Task) error { taskDir, err := 
utils.SafeJoin(e.rootDir, task.Name) if err != nil { return fmt.Errorf("invalid task name: %w", err) @@ -373,6 +432,91 @@ func isProcessRunning(pid int) bool { return process.Signal(syscall.Signal(0)) == nil } +// execLifecycleHook executes a lifecycle hook (preStart or postStop). +// It returns a descriptive error that includes the stderr output on failure. +func (e *processExecutor) execLifecycleHook(ctx context.Context, task *types.Task, hook *api.LifecycleHandler) error { + if hook == nil || hook.Exec == nil { + return nil + } + + cmdList := hook.Exec.Command + if len(cmdList) == 0 { + return fmt.Errorf("empty command in lifecycle hook") + } + + safeCmdStr := shellEscape(cmdList) + + // Apply per-hook timeout when specified. + hookCtx := ctx + if hook.TimeoutSeconds != nil && *hook.TimeoutSeconds > 0 { + var cancel context.CancelFunc + hookCtx, cancel = context.WithTimeout(ctx, time.Duration(*hook.TimeoutSeconds)*time.Second) + defer cancel() + } + + // Determine execution mode + useNsenter := e.config.EnableSidecarMode + if hook.ExecMode == api.ExecModeLocal { + useNsenter = false + } else if hook.ExecMode == api.ExecModeRemote { + useNsenter = true + } + + var cmd *exec.Cmd + if useNsenter { + targetPID, err := e.findPidByEnvVar("SANDBOX_MAIN_CONTAINER", e.config.MainContainerName) + if err != nil { + return fmt.Errorf("failed to resolve target PID for lifecycle hook: %w", err) + } + + targetEnv, err := getProcEnviron(targetPID) + if err != nil { + return fmt.Errorf("failed to read target process environment: %w", err) + } + + nsenterArgs := []string{ + "-t", strconv.Itoa(targetPID), + "--mount", "--uts", "--ipc", "--net", "--pid", + "--", + "/bin/sh", "-c", safeCmdStr, + } + cmd = exec.CommandContext(hookCtx, "nsenter", nsenterArgs...) 
+ cmd.Env = targetEnv + klog.InfoS("Executing lifecycle hook via nsenter", "task", task.Name, "targetPID", targetPID, "cmd", safeCmdStr) + } else { + cmd = exec.CommandContext(hookCtx, "/bin/sh", "-c", safeCmdStr) + cmd.Env = os.Environ() + klog.InfoS("Executing lifecycle hook locally", "task", task.Name, "cmd", safeCmdStr) + } + + // Create a new process group so that on timeout cancellation we can kill + // the entire tree (shell + children like sleep). Without this, + // CombinedOutput blocks waiting for orphaned child I/O pipes to close. + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Cancel = func() error { + return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + } + cmd.WaitDelay = 3 * time.Second + + output, err := cmd.CombinedOutput() + if err != nil { + stderr := strings.TrimSpace(string(output)) + // When a per-hook timeout is configured and the hook context has been + // canceled (either DeadlineExceeded or Canceled propagated from the + // timeout), treat it as a timeout error. We avoid comparing with + // context.DeadlineExceeded directly because the Go exec package may + // wrap or race with the context error after cmd.Cancel fires. 
+ if hook.TimeoutSeconds != nil && *hook.TimeoutSeconds > 0 && hookCtx.Err() != nil { + return fmt.Errorf("lifecycle hook timed out after %ds: %w; output: %s", + *hook.TimeoutSeconds, context.DeadlineExceeded, stderr) + } + return fmt.Errorf("lifecycle hook failed: %w; output: %s", err, stderr) + } + + klog.InfoS("Lifecycle hook executed successfully", "task", task.Name, "output", strings.TrimSpace(string(output))) + return nil +} + // shellEscape quotes arguments for safe shell execution func shellEscape(args []string) string { quoted := make([]string, len(args)) diff --git a/kubernetes/internal/task-executor/runtime/process_test.go b/kubernetes/internal/task-executor/runtime/process_test.go index 4040cb48..f2775d0a 100644 --- a/kubernetes/internal/task-executor/runtime/process_test.go +++ b/kubernetes/internal/task-executor/runtime/process_test.go @@ -16,6 +16,7 @@ package runtime import ( "context" + "errors" "os" "os/exec" "path/filepath" @@ -374,6 +375,196 @@ func TestProcessExecutor_TimeoutNotExceeded(t *testing.T) { assert.Equal(t, types.TaskStateSucceeded, status.State, "Task should be Succeeded, not Timeout") } +func TestProcessExecutor_PreStartHook(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + // Create a marker file path to verify preStart ran + markerFile := filepath.Join(pExecutor.rootDir, "prestart-marker") + + task := &types.Task{ + Name: "prestart-test", + Process: &api.Process{ + Command: []string{"echo", "main-process"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo prestart-executed > " + markerFile}, + }, + }, + }, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + // Start should execute preStart hook first, then main 
process + if err := executor.Start(ctx, task); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Wait for completion + time.Sleep(300 * time.Millisecond) + + // Verify preStart hook created the marker file + data, err := os.ReadFile(markerFile) + assert.Nil(t, err, "preStart hook should have created marker file") + assert.Contains(t, string(data), "prestart-executed") + + // Verify main process ran successfully + status, err := executor.Inspect(ctx, task) + assert.Nil(t, err) + assert.Equal(t, types.TaskStateSucceeded, status.State) +} + +func TestProcessExecutor_PreStartHookFailure(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + task := &types.Task{ + Name: "prestart-fail-test", + Process: &api.Process{ + Command: []string{"echo", "should-not-run"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "exit 1"}, + }, + }, + }, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + // Start should fail because preStart hook fails + err = executor.Start(ctx, task) + assert.NotNil(t, err, "Start should fail when preStart hook fails") + assert.Contains(t, err.Error(), "preStart hook failed") + + // Verify the error is a StartError with correct Reason + var startErr *types.StartError + assert.True(t, errors.As(err, &startErr), "error should be *types.StartError") + assert.Equal(t, types.ReasonPreStartHookFailed, startErr.Reason) + + // Main process should not have started (no pid file) + pidPath := filepath.Join(taskDir, PidFile) + _, err = os.ReadFile(pidPath) + assert.NotNil(t, err, "PID file should not exist when preStart hook fails") +} + +func TestProcessExecutor_PostStopHook(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + 
t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + // Create a marker file path to verify postStop ran + markerFile := filepath.Join(pExecutor.rootDir, "poststop-marker") + + task := &types.Task{ + Name: "poststop-test", + Process: &api.Process{ + Command: []string{"/bin/sh", "-c", "sleep 10"}, + Lifecycle: &api.ProcessLifecycle{ + PostStop: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo poststop-executed > " + markerFile}, + }, + }, + }, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + // Start the task + if err := executor.Start(ctx, task); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Verify it's running + time.Sleep(100 * time.Millisecond) + status, err := executor.Inspect(ctx, task) + assert.Nil(t, err) + assert.Equal(t, types.TaskStateRunning, status.State) + + // Stop should execute postStop hook after main process stops + if err := executor.Stop(ctx, task); err != nil { + t.Fatalf("Stop failed: %v", err) + } + + // Verify postStop hook created the marker file + time.Sleep(200 * time.Millisecond) + data, err := os.ReadFile(markerFile) + assert.Nil(t, err, "postStop hook should have created marker file") + assert.Contains(t, string(data), "poststop-executed") +} + +func TestProcessExecutor_LifecycleExecModeLocal(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + // ExecModeLocal should run locally even with EnableSidecarMode=true; this test only covers EnableSidecarMode=false + dataDir := t.TempDir() + cfg := &config.Config{ + DataDir: dataDir, + EnableSidecarMode: false, // Can't test nsenter without /proc, but verify Local works + } + executor, err := NewProcessExecutor(cfg) + assert.Nil(t, err) + ctx := context.Background() + + markerFile := filepath.Join(dataDir, "local-hook-marker") + + task := &types.Task{ + Name:
"execmode-local-test", + Process: &api.Process{ + Command: []string{"echo", "main"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo local-hook > " + markerFile}, + }, + ExecMode: api.ExecModeLocal, + }, + }, + }, + } + + taskDir, err := utils.SafeJoin(dataDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + if err := executor.Start(ctx, task); err != nil { + t.Fatalf("Start failed: %v", err) + } + + time.Sleep(300 * time.Millisecond) + + // Verify hook ran locally + data, err := os.ReadFile(markerFile) + assert.Nil(t, err) + assert.Contains(t, string(data), "local-hook") +} + func TestProcessExecutor_NoTimeout(t *testing.T) { if _, err := exec.LookPath("sh"); err != nil { t.Skip("sh not found") @@ -410,3 +601,117 @@ func TestProcessExecutor_NoTimeout(t *testing.T) { // Cleanup executor.Stop(ctx, task) } + +func TestProcessExecutor_LifecycleHookTimeout(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + timeoutSec := int64(1) + task := &types.Task{ + Name: "hook-timeout-test", + Process: &api.Process{ + Command: []string{"echo", "main"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "sleep 30"}, + }, + TimeoutSeconds: &timeoutSec, + }, + }, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + start := time.Now() + err = executor.Start(ctx, task) + elapsed := time.Since(start) + + assert.NotNil(t, err, "Start should fail when preStart hook times out") + assert.Contains(t, err.Error(), "timed out", "Error should mention timeout") + // Should not take much longer than the 1s timeout + assert.Less(t, elapsed, 5*time.Second, "Should fail within a 
reasonable time after timeout") + + // Verify it's a StartError with PreStartHookFailed reason + var startErr *types.StartError + assert.True(t, errors.As(err, &startErr)) + assert.Equal(t, types.ReasonPreStartHookFailed, startErr.Reason) +} + +func TestProcessExecutor_LifecycleHookStderrCaptured(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + task := &types.Task{ + Name: "hook-stderr-test", + Process: &api.Process{ + Command: []string{"echo", "main"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo 'mount error: device busy' >&2; exit 1"}, + }, + }, + }, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + err = executor.Start(ctx, task) + assert.NotNil(t, err) + // The stderr output should be included in the error message + assert.Contains(t, err.Error(), "mount error: device busy", + "stderr from failed hook should be included in error message") +} + +func TestProcessExecutor_LifecycleHookTimeoutZero_NoDeadline(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + // TimeoutSeconds = 0 means no timeout — hook that finishes quickly should succeed. 
+ zero := int64(0) + markerFile := filepath.Join(pExecutor.rootDir, "zero-timeout-marker") + task := &types.Task{ + Name: "hook-zero-timeout", + Process: &api.Process{ + Command: []string{"echo", "main"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo ok > " + markerFile}, + }, + TimeoutSeconds: &zero, + }, + }, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + assert.Nil(t, executor.Start(ctx, task)) + + time.Sleep(300 * time.Millisecond) + data, err := os.ReadFile(markerFile) + assert.Nil(t, err) + assert.Contains(t, string(data), "ok") +} diff --git a/kubernetes/internal/task-executor/types/task.go b/kubernetes/internal/task-executor/types/task.go index a4bb1ad4..bb2e4248 100644 --- a/kubernetes/internal/task-executor/types/task.go +++ b/kubernetes/internal/task-executor/types/task.go @@ -61,3 +61,43 @@ type Task struct { // Status is now a first-class citizen and persisted. Status Status `json:"status"` } + +// StartError is returned by Executor.Start() to carry a structured Reason so +// that the manager can persist an accurate SubStatus without guessing. +type StartError struct { + // Reason is a machine-readable token such as "PreStartHookFailed" or + // "ProcessStartFailed". + Reason string + // Message is a human-readable description, typically including stderr. + Message string +} + +func (e *StartError) Error() string { + return e.Message +} + +// Predefined Reason constants for StartError. +const ( + ReasonPreStartHookFailed = "PreStartHookFailed" + ReasonProcessStartFailed = "ProcessStartFailed" +) + +// StopError is returned by Executor.Stop() to carry a structured Reason so +// that the manager can persist an accurate SubStatus without guessing. +type StopError struct { + // Reason is a machine-readable token such as "ProcessStopFailed" or + // "PostStopHookFailed". 
+ Reason string + // Message is a human-readable description, typically including stderr. + Message string +} + +func (e *StopError) Error() string { + return e.Message +} + +// Predefined Reason constants for StopError. +const ( + ReasonProcessStopFailed = "ProcessStopFailed" + ReasonPostStopHookFailed = "PostStopHookFailed" +) diff --git a/kubernetes/pkg/task-executor/types.go b/kubernetes/pkg/task-executor/types.go index 5a3ba5ab..0c70ed1e 100644 --- a/kubernetes/pkg/task-executor/types.go +++ b/kubernetes/pkg/task-executor/types.go @@ -32,6 +32,16 @@ type Task struct { PodStatus *corev1.PodStatus `json:"podStatus,omitempty"` } +// ExecMode defines where a process should be executed. +type ExecMode string + +const ( + // ExecModeLocal executes in the task-executor container. + ExecModeLocal ExecMode = "Local" + // ExecModeRemote executes in the main container via nsenter. + ExecModeRemote ExecMode = "Remote" +) + type Process struct { // Command command Command []string `json:"command"` @@ -43,6 +53,33 @@ type Process struct { WorkingDir string `json:"workingDir,omitempty"` // TimeoutSeconds process timeout seconds. TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` + // ExecMode controls where the process runs. + ExecMode ExecMode `json:"execMode,omitempty"` + // Lifecycle defines actions to be executed before and after the main process. + Lifecycle *ProcessLifecycle `json:"lifecycle,omitempty"` +} + +// ProcessLifecycle defines lifecycle hooks for a process. +type ProcessLifecycle struct { + PreStart *LifecycleHandler `json:"preStart,omitempty"` + PostStop *LifecycleHandler `json:"postStop,omitempty"` +} + +// LifecycleHandler defines a lifecycle action. +type LifecycleHandler struct { + Exec *ExecAction `json:"exec,omitempty"` + ExecMode ExecMode `json:"execMode,omitempty"` + // TimeoutSeconds is the maximum number of seconds the hook may run. 
+ // If the hook does not complete within this time, it is killed and the + // enclosing operation (Start or Stop) is treated as failed. + // If not set or zero, there is no timeout. + // +optional + TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` +} + +// ExecAction describes a "run in container" action. +type ExecAction struct { + Command []string `json:"command"` } // ProcessStatus holds a possible state of process. diff --git a/kubernetes/test/e2e/e2e_test.go b/kubernetes/test/e2e/e2e_test.go index 298e5a28..c5f32ce1 100644 --- a/kubernetes/test/e2e/e2e_test.go +++ b/kubernetes/test/e2e/e2e_test.go @@ -1291,6 +1291,336 @@ var _ = Describe("Manager", Ordered, func() { os.Remove(poolFile) os.Remove(batchSandboxFile) }) + + It("should succeed with a preStart lifecycle hook", func() { + const poolName = "test-pool-prestart" + const batchSandboxName = "test-bs-prestart-ok" + const testNamespace = "default" + const replicas = 1 + + By("creating a Pool with task-executor sidecar") + poolYAML, err := renderTemplate("testdata/pool-with-task-executor.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-prestart.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for Pool to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).NotTo(BeEmpty()) + }, 2*time.Minute).Should(Succeed()) + + By("creating a BatchSandbox with a preStart hook") + bsYAML, err := 
renderTemplate("testdata/batchsandbox-with-prestart-task.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": replicas, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-prestart-ok.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying task succeeds (preStart runs, then main process succeeds)") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskSucceed}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal(fmt.Sprintf("%d", replicas))) + }, 2*time.Minute).Should(Succeed()) + + By("verifying no task failures") + cmd = exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskFailed}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + Expect(output).To(Equal("0")) + + By("cleaning up") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, _ = utils.Run(cmd) + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, _ = utils.Run(cmd) + }) + + It("should report preStart failure in BatchSandbox status with error message", func() { + const poolName = "test-pool-prestart-fail" + const batchSandboxName = "test-bs-prestart-fail" + const testNamespace = "default" + const replicas = 1 + + By("creating a Pool with task-executor sidecar") + poolYAML, err := renderTemplate("testdata/pool-with-task-executor.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + }) + 
Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-prestart-fail.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for Pool to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).NotTo(BeEmpty()) + }, 2*time.Minute).Should(Succeed()) + + By("creating a BatchSandbox with a failing preStart hook") + bsYAML, err := renderTemplate("testdata/batchsandbox-with-failing-prestart-task.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": replicas, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-prestart-fail.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying task fails with PreStartHookFailed") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskFailed}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal(fmt.Sprintf("%d", replicas)), + "Task should be counted as failed") + }, 2*time.Minute).Should(Succeed()) + + By("verifying taskLastErrorMessage contains stderr output") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskLastErrorMessage}") + output, err := 
utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(ContainSubstring("mount failed: device busy"), + "taskLastErrorMessage should contain the stderr from the failing hook") + }, 30*time.Second).Should(Succeed()) + + By("cleaning up") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, _ = utils.Run(cmd) + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, _ = utils.Run(cmd) + }) + + It("should report preStart timeout in BatchSandbox status", func() { + const poolName = "test-pool-prestart-timeout" + const batchSandboxName = "test-bs-prestart-timeout" + const testNamespace = "default" + const replicas = 1 + + By("creating a Pool with task-executor sidecar") + poolYAML, err := renderTemplate("testdata/pool-with-task-executor.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-prestart-timeout.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for Pool to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).NotTo(BeEmpty()) + }, 2*time.Minute).Should(Succeed()) + + By("creating a BatchSandbox with a preStart hook that times out") + bsYAML, err := renderTemplate("testdata/batchsandbox-with-timeout-prestart-task.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": replicas, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + 
bsFile := filepath.Join("/tmp", "test-bs-prestart-timeout.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying task fails") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskFailed}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal(fmt.Sprintf("%d", replicas))) + }, 2*time.Minute).Should(Succeed()) + + By("verifying taskLastErrorMessage contains timeout info") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskLastErrorMessage}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(ContainSubstring("timed out"), + "taskLastErrorMessage should indicate timeout") + }, 30*time.Second).Should(Succeed()) + + By("cleaning up") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, _ = utils.Run(cmd) + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, _ = utils.Run(cmd) + }) + + It("should execute postStop hook when task is stopped", func() { + const poolName = "test-pool-poststop" + const batchSandboxName = "test-bs-poststop" + const testNamespace = "default" + const replicas = 1 + + By("creating a Pool with task-executor sidecar") + poolYAML, err := renderTemplate("testdata/pool-with-task-executor.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-poststop.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + 
Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for Pool to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).NotTo(BeEmpty()) + }, 2*time.Minute).Should(Succeed()) + + By("creating a BatchSandbox with a long-running task and postStop hook") + bsYAML, err := renderTemplate("testdata/batchsandbox-with-poststop-task.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": replicas, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-poststop.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying task is running") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.status.taskRunning}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal(fmt.Sprintf("%d", replicas))) + }, 2*time.Minute).Should(Succeed()) + + By("finding the allocated pod") + var allocatedPod string + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}") + allocStatusJSON, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatusJSON).NotTo(BeEmpty()) + + var allocStatus struct { + Pods []string `json:"pods"` + } + err = 
json.Unmarshal([]byte(allocStatusJSON), &allocStatus) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatus.Pods).NotTo(BeEmpty()) + allocatedPod = allocStatus.Pods[0] + }, 30*time.Second).Should(Succeed()) + + By("deleting the BatchSandbox to trigger task stop and postStop hook") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying BatchSandbox is deleted") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, err := utils.Run(cmd) + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(ContainSubstring("not found")) + }, 2*time.Minute).Should(Succeed()) + + By("verifying postStop hook executed by checking marker file in the task-executor container") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "exec", allocatedPod, "-n", testNamespace, + "-c", "task-executor", "--", "cat", "/tmp/poststop-marker") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred(), "postStop marker file should exist") + g.Expect(output).To(ContainSubstring("poststop-executed")) + }, 30*time.Second).Should(Succeed()) + + By("cleaning up Pool") + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, _ = utils.Run(cmd) + }) }) }) diff --git a/kubernetes/test/e2e/testdata/batchsandbox-with-failing-prestart-task.yaml b/kubernetes/test/e2e/testdata/batchsandbox-with-failing-prestart-task.yaml new file mode 100644 index 00000000..8a24d767 --- /dev/null +++ b/kubernetes/test/e2e/testdata/batchsandbox-with-failing-prestart-task.yaml @@ -0,0 +1,18 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: BatchSandbox +metadata: + name: {{.BatchSandboxName}} + namespace: {{.Namespace}} +spec: + replicas: {{.Replicas}} + poolRef: {{.PoolName}} + taskTemplate: + spec: + process: + command: ["echo"] + args: ["should not run"] + lifecycle: + 
preStart: + exec: + command: ["/bin/sh", "-c", "echo 'mount failed: device busy' >&2; exit 1"] + execMode: Local diff --git a/kubernetes/test/e2e/testdata/batchsandbox-with-poststop-task.yaml b/kubernetes/test/e2e/testdata/batchsandbox-with-poststop-task.yaml new file mode 100644 index 00000000..4e9e9d93 --- /dev/null +++ b/kubernetes/test/e2e/testdata/batchsandbox-with-poststop-task.yaml @@ -0,0 +1,17 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: BatchSandbox +metadata: + name: {{.BatchSandboxName}} + namespace: {{.Namespace}} +spec: + replicas: {{.Replicas}} + poolRef: {{.PoolName}} + taskTemplate: + spec: + process: + command: ["sleep", "3600"] + lifecycle: + postStop: + exec: + command: ["/bin/sh", "-c", "echo poststop-executed > /tmp/poststop-marker"] + execMode: Local diff --git a/kubernetes/test/e2e/testdata/batchsandbox-with-prestart-task.yaml b/kubernetes/test/e2e/testdata/batchsandbox-with-prestart-task.yaml new file mode 100644 index 00000000..706936b3 --- /dev/null +++ b/kubernetes/test/e2e/testdata/batchsandbox-with-prestart-task.yaml @@ -0,0 +1,18 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: BatchSandbox +metadata: + name: {{.BatchSandboxName}} + namespace: {{.Namespace}} +spec: + replicas: {{.Replicas}} + poolRef: {{.PoolName}} + taskTemplate: + spec: + process: + command: ["echo"] + args: ["Hello from task with preStart"] + lifecycle: + preStart: + exec: + command: ["/bin/sh", "-c", "echo prestart-ok"] + execMode: Local diff --git a/kubernetes/test/e2e/testdata/batchsandbox-with-timeout-prestart-task.yaml b/kubernetes/test/e2e/testdata/batchsandbox-with-timeout-prestart-task.yaml new file mode 100644 index 00000000..c90db19d --- /dev/null +++ b/kubernetes/test/e2e/testdata/batchsandbox-with-timeout-prestart-task.yaml @@ -0,0 +1,19 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: BatchSandbox +metadata: + name: {{.BatchSandboxName}} + namespace: {{.Namespace}} +spec: + replicas: {{.Replicas}} + poolRef: {{.PoolName}} + 
taskTemplate: + spec: + process: + command: ["echo"] + args: ["should not run"] + lifecycle: + preStart: + exec: + command: ["/bin/sh", "-c", "sleep 60"] + execMode: Local + timeoutSeconds: 3 diff --git a/kubernetes/test/e2e_task/task_e2e_test.go b/kubernetes/test/e2e_task/task_e2e_test.go index d2eea840..dddd1131 100644 --- a/kubernetes/test/e2e_task/task_e2e_test.go +++ b/kubernetes/test/e2e_task/task_e2e_test.go @@ -308,4 +308,264 @@ var _ = Describe("Task Executor E2E", Ordered, func() { }, 5*time.Second, 500*time.Millisecond).Should(BeNil()) }) }) + + // ===== Lifecycle Hook E2E Tests ===== + + Context("When creating a task with a successful preStart hook", func() { + taskName := "e2e-prestart-ok" + + It("should execute preStart before main process and succeed", func() { + By("Creating task with preStart that writes a marker file to shared volume") + task := &api.Task{ + Name: taskName, + Process: &api.Process{ + // Main process reads the marker created by preStart via shared volume + Command: []string{"cat", "/tmp/tasks/prestart-marker"}, + Lifecycle: &api.ProcessLifecycle{ + PreStart: &api.LifecycleHandler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/sh", "-c", "echo prestart-ok > /tmp/tasks/prestart-marker"}, + }, + ExecMode: api.ExecModeLocal, + }, + }, + }, + } + _, err := client.Set(context.Background(), task) + Expect(err).NotTo(HaveOccurred()) + + By("Waiting for task to succeed") + Eventually(func(g Gomega) { + got, err := client.Get(context.Background()) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(got).NotTo(BeNil()) + g.Expect(got.ProcessStatus).NotTo(BeNil()) + g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus) + g.Expect(got.ProcessStatus.Terminated.ExitCode).To(BeZero(), + "Main process should succeed because preStart created the marker file") + }, 15*time.Second, 1*time.Second).Should(Succeed()) + }) + + It("should be deletable", func() { + _, err := client.Set(context.Background(), nil) + 
Expect(err).NotTo(HaveOccurred())
+			Eventually(func() *api.Task {
+				got, _ := client.Get(context.Background())
+				return got
+			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
+		})
+	})
+
+	// A preStart hook that exits non-zero must terminate the task with the
+	// PreStartHookFailed reason and surface the hook's stderr in the message.
+	Context("When creating a task with a failing preStart hook", func() {
+		taskName := "e2e-prestart-fail"
+
+		It("should fail with PreStartHookFailed reason and include stderr", func() {
+			By("Creating task with preStart that exits with error")
+			task := &api.Task{
+				Name: taskName,
+				Process: &api.Process{
+					Command: []string{"echo", "should-not-run"},
+					Lifecycle: &api.ProcessLifecycle{
+						PreStart: &api.LifecycleHandler{
+							Exec: &api.ExecAction{
+								Command: []string{"/bin/sh", "-c", "echo 'mount failed: device busy' >&2; exit 1"},
+							},
+							ExecMode: api.ExecModeLocal,
+						},
+					},
+				},
+			}
+			_, err := client.Set(context.Background(), task)
+			// Set may return error since the task fails immediately, or it may
+			// accept the task and report failure via status — both are valid.
+			if err != nil {
+				Expect(err.Error()).To(ContainSubstring("preStart hook failed"))
+			}
+
+			By("Waiting for task to report failure with error details")
+			Eventually(func(g Gomega) {
+				got, err := client.Get(context.Background())
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(got).NotTo(BeNil())
+				g.Expect(got.ProcessStatus).NotTo(BeNil())
+				g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
+				g.Expect(got.ProcessStatus.Terminated.ExitCode).NotTo(BeZero(),
+					"Task should have failed")
+				g.Expect(got.ProcessStatus.Terminated.Reason).To(Equal("PreStartHookFailed"),
+					"Reason should indicate preStart failure")
+				g.Expect(got.ProcessStatus.Terminated.Message).To(ContainSubstring("mount failed: device busy"),
+					"Message should contain stderr from the hook")
+			}, 10*time.Second, 1*time.Second).Should(Succeed())
+		})
+
+		It("should be deletable", func() {
+			_, err := client.Set(context.Background(), nil)
+			Expect(err).NotTo(HaveOccurred())
+			Eventually(func() *api.Task {
+				got, _ := client.Get(context.Background())
+				return got
+			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
+		})
+	})
+
+	// A preStart hook that outlives its TimeoutSeconds must fail the task with
+	// a "timed out" message well before the hook's own 60s sleep would end.
+	Context("When creating a task with a preStart hook that times out", func() {
+		taskName := "e2e-prestart-timeout"
+
+		It("should fail with timeout error", func() {
+			By("Creating task with preStart that hangs and a 2s timeout")
+			timeoutSec := int64(2)
+			task := &api.Task{
+				Name: taskName,
+				Process: &api.Process{
+					Command: []string{"echo", "should-not-run"},
+					Lifecycle: &api.ProcessLifecycle{
+						PreStart: &api.LifecycleHandler{
+							Exec: &api.ExecAction{
+								Command: []string{"/bin/sh", "-c", "sleep 60"},
+							},
+							ExecMode:       api.ExecModeLocal,
+							TimeoutSeconds: &timeoutSec,
+						},
+					},
+				},
+			}
+
+			start := time.Now()
+			// The failure may surface inline from Set or only through the task
+			// status; the status assertions below are authoritative, so the
+			// inline result is deliberately discarded.
+			_, _ = client.Set(context.Background(), task)
+
+			By("Waiting for task to report timeout failure")
+			Eventually(func(g Gomega) {
+				got, err := client.Get(context.Background())
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(got).NotTo(BeNil())
+				g.Expect(got.ProcessStatus).NotTo(BeNil())
+				g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
+				g.Expect(got.ProcessStatus.Terminated.Reason).To(Equal("PreStartHookFailed"))
+				g.Expect(got.ProcessStatus.Terminated.Message).To(ContainSubstring("timed out"))
+			}, 15*time.Second, 1*time.Second).Should(Succeed())
+
+			// Measured from just before Set, so it covers the hook run plus polling.
+			elapsed := time.Since(start)
+			Expect(elapsed).To(BeNumerically("<", 10*time.Second),
+				"Should not wait much longer than the 2s hook timeout")
+		})
+
+		It("should be deletable", func() {
+			_, err := client.Set(context.Background(), nil)
+			Expect(err).NotTo(HaveOccurred())
+			Eventually(func() *api.Task {
+				got, _ := client.Get(context.Background())
+				return got
+			}, 5*time.Second, 500*time.Millisecond).Should(BeNil())
+		})
+	})
+
+	// Deleting a running task must run its postStop hook; the hook proves it
+	// ran by writing a marker file inside the executor container.
+	Context("When creating a task with a postStop hook", func() {
+		taskName := "e2e-poststop-ok"
+
+		It("should execute postStop when task is deleted", func() {
+			By("Removing any stale marker file left by a previous run")
+			// Without this, a marker from an earlier run would satisfy the final
+			// check even if the hook never executed. rm -f succeeds when the
+			// file is absent.
+			out, err := exec.Command("docker", "exec", ExecutorContainer, "rm", "-f", "/tmp/tasks/poststop-marker").CombinedOutput()
+			Expect(err).NotTo(HaveOccurred(), "failed to remove stale postStop marker: %s", string(out))
+
+			By("Creating a long-running task with postStop that writes a marker file")
+			task := &api.Task{
+				Name: taskName,
+				Process: &api.Process{
+					Command: []string{"sleep", "60"},
+					Lifecycle: &api.ProcessLifecycle{
+						PostStop: &api.LifecycleHandler{
+							Exec: &api.ExecAction{
+								Command: []string{"/bin/sh", "-c", "echo poststop-ok > /tmp/tasks/poststop-marker"},
+							},
+							ExecMode: api.ExecModeLocal,
+						},
+					},
+				},
+			}
+			_, err = client.Set(context.Background(), task)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Waiting for task to be running")
+			Eventually(func(g Gomega) {
+				got, err := client.Get(context.Background())
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(got).NotTo(BeNil())
+				g.Expect(got.ProcessStatus).NotTo(BeNil())
+				g.Expect(got.ProcessStatus.Running).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
+			}, 10*time.Second, 1*time.Second).Should(Succeed())
+
+			By("Deleting the task to trigger postStop")
+			_, err = client.Set(context.Background(), nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Waiting for task to be fully deleted")
+			Eventually(func() *api.Task {
+				got, _ := client.Get(context.Background())
+				return got
+			}, 10*time.Second, 500*time.Millisecond).Should(BeNil())
+
+			By("Verifying postStop hook executed by checking marker file in executor container")
+			out, err = exec.Command("docker", "exec", ExecutorContainer, "cat", "/tmp/tasks/poststop-marker").CombinedOutput()
+			Expect(err).NotTo(HaveOccurred(), "postStop marker file should exist: %s", string(out))
+			Expect(string(out)).To(ContainSubstring("poststop-ok"))
+		})
+	})
+
+	// With both hooks set, each stage writes one line to a shared log file so
+	// the execution order can be checked from the file contents.
+	Context("When creating a task with both preStart and postStop hooks", func() {
+		taskName := "e2e-lifecycle-both"
+
+		It("should run preStart → main → postStop in order", func() {
+			By("Creating a long-running task where each stage appends to a log file")
+			// preStart uses '>' so the log is truncated at the start of each run;
+			// the main process and postStop append with '>>'.
+			task := &api.Task{
+				Name: taskName,
+				Process: &api.Process{
+					Command: []string{"/bin/sh", "-c", "echo step2-main >> /tmp/tasks/lifecycle-order.log; sleep 60"},
+					Lifecycle: &api.ProcessLifecycle{
+						PreStart: &api.LifecycleHandler{
+							Exec: &api.ExecAction{
+								Command: []string{"/bin/sh", "-c", "echo step1-prestart > /tmp/tasks/lifecycle-order.log"},
+							},
+							ExecMode: api.ExecModeLocal,
+						},
+						PostStop: &api.LifecycleHandler{
+							Exec: &api.ExecAction{
+								Command: []string{"/bin/sh", "-c", "echo step3-poststop >> /tmp/tasks/lifecycle-order.log"},
+							},
+							ExecMode: api.ExecModeLocal,
+						},
+					},
+				},
+			}
+			_, err := client.Set(context.Background(), task)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Waiting for task to be running (preStart completed)")
+			Eventually(func(g Gomega) {
+				got, err := client.Get(context.Background())
+				g.Expect(err).NotTo(HaveOccurred())
+				g.Expect(got).NotTo(BeNil())
+				g.Expect(got.ProcessStatus).NotTo(BeNil())
+				g.Expect(got.ProcessStatus.Running).NotTo(BeNil(), "Task status: %v", got.ProcessStatus)
+			}, 10*time.Second, 1*time.Second).Should(Succeed())
+
+			By("Verifying preStart and main have executed")
+			// Running may be reported before the shell has run its echo, so poll
+			// the log rather than reading it once.
+			Eventually(func(g Gomega) {
+				out, err := exec.Command("docker", "exec", ExecutorContainer, "cat", "/tmp/tasks/lifecycle-order.log").CombinedOutput()
+				g.Expect(err).NotTo(HaveOccurred(), "lifecycle log should exist: %s", string(out))
+				g.Expect(string(out)).To(MatchRegexp(`(?s)step1-prestart.*step2-main`),
+					"preStart must be logged before the main process")
+			}, 5*time.Second, 500*time.Millisecond).Should(Succeed())
+
+			By("Deleting the task to trigger postStop")
+			_, err = client.Set(context.Background(), nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Waiting for task to be fully deleted")
+			Eventually(func() *api.Task {
+				got, _ := client.Get(context.Background())
+				return got
+			}, 10*time.Second, 500*time.Millisecond).Should(BeNil())
+
+			By("Verifying postStop hook executed")
+			out, err := exec.Command("docker", "exec", ExecutorContainer, "cat", "/tmp/tasks/lifecycle-order.log").CombinedOutput()
+			Expect(err).NotTo(HaveOccurred())
+			// (?s) lets '.' span newlines, so one pattern checks all three
+			// stages are present and appear in the expected order.
+			Expect(string(out)).To(MatchRegexp(`(?s)step1-prestart.*step2-main.*step3-poststop`),
+				"Stages should be logged in preStart → main → postStop order")
+		})
+	})
+})