From 83d2daedda7cd05fcc196e28648a4502d57402fb Mon Sep 17 00:00:00 2001 From: Gunju Kim Date: Thu, 26 Feb 2026 15:08:09 +0000 Subject: [PATCH] Use Kubernetes CronJob for cron-based TaskSpawners Replace the custom cron polling logic with native Kubernetes CronJob scheduling. Previously, cron-based TaskSpawners ran a long-lived Deployment that polled on pollInterval and computed missed cron ticks. This was confusing because pollInterval and cron schedule served overlapping purposes (issue #138). Now, when spec.when.cron is set, the controller creates a CronJob instead of a Deployment. The CronJob runs on the cron schedule itself, executing the spawner binary in one-shot mode (--one-shot flag) which runs a single discovery cycle and exits. This eliminates the need for pollInterval for cron use cases and delegates scheduling to Kubernetes. Key changes: - Add --one-shot flag to kelos-spawner binary for single-cycle execution - Add BuildCronJob method to DeploymentBuilder for creating CronJobs - Split controller reconciliation into reconcileDeployment (GitHub/Jira) and reconcileCronJob (cron) paths - Add status.cronJobName field to TaskSpawnerStatus - Add RBAC for batch/cronjobs and watch CronJobs in SetupWithManager - Add tests for CronJob builder, schedule updates, and suspend toggle Closes #430 Related: #138 Co-Authored-By: Claude Opus 4.6 --- api/v1alpha1/taskspawner_types.go | 6 + cmd/kelos-spawner/main.go | 18 +- docs/reference.md | 3 +- install-crd.yaml | 10 +- install.yaml | 1 + internal/controller/taskspawner_controller.go | 292 ++++++- .../taskspawner_deployment_builder.go | 118 ++- .../taskspawner_deployment_builder_test.go | 771 ++++++++++++++++++ internal/manifests/install-crd.yaml | 10 +- internal/manifests/install.yaml | 1 + test/e2e/framework/framework.go | 8 + test/e2e/taskspawner_test.go | 6 +- test/integration/taskspawner_test.go | 47 +- 13 files changed, 1231 insertions(+), 60 deletions(-) diff --git a/api/v1alpha1/taskspawner_types.go
b/api/v1alpha1/taskspawner_types.go index c88b86e..210bcb0 100644 --- a/api/v1alpha1/taskspawner_types.go +++ b/api/v1alpha1/taskspawner_types.go @@ -256,9 +256,15 @@ type TaskSpawnerStatus struct { Phase TaskSpawnerPhase `json:"phase,omitempty"` // DeploymentName is the name of the Deployment running the spawner. + // Set for polling-based sources (GitHub Issues, Jira). // +optional DeploymentName string `json:"deploymentName,omitempty"` + // CronJobName is the name of the CronJob running the spawner. + // Set for cron-based sources. + // +optional + CronJobName string `json:"cronJobName,omitempty"` + // TotalDiscovered is the total number of work items discovered. // +optional TotalDiscovered int `json:"totalDiscovered,omitempty"` diff --git a/cmd/kelos-spawner/main.go b/cmd/kelos-spawner/main.go index 4306126..b53ee3e 100644 --- a/cmd/kelos-spawner/main.go +++ b/cmd/kelos-spawner/main.go @@ -41,6 +41,7 @@ func main() { var jiraBaseURL string var jiraProject string var jiraJQL string + var oneShot bool flag.StringVar(&name, "taskspawner-name", "", "Name of the TaskSpawner to manage") flag.StringVar(&namespace, "taskspawner-namespace", "", "Namespace of the TaskSpawner") @@ -51,6 +52,7 @@ func main() { flag.StringVar(&jiraBaseURL, "jira-base-url", "", "Jira instance base URL (e.g. 
https://mycompany.atlassian.net)") flag.StringVar(&jiraProject, "jira-project", "", "Jira project key") flag.StringVar(&jiraJQL, "jira-jql", "", "Optional JQL filter for Jira issues") + flag.BoolVar(&oneShot, "one-shot", false, "Run a single discovery cycle and exit (used by CronJob)") opts := zap.Options{Development: true} opts.BindFlags(flag.CommandLine) @@ -80,23 +82,31 @@ func main() { ctx := ctrl.SetupSignalHandler() key := types.NamespacedName{Name: name, Namespace: namespace} - log.Info("starting spawner", "taskspawner", key) + log.Info("Starting spawner", "taskspawner", key, "oneShot", oneShot) + + if oneShot { + if err := runCycle(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL); err != nil { + log.Error(err, "Discovery cycle failed") + os.Exit(1) + } + return + } for { if err := runCycle(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL); err != nil { - log.Error(err, "discovery cycle failed") + log.Error(err, "Discovery cycle failed") } // Re-read the TaskSpawner to get the current poll interval var ts kelosv1alpha1.TaskSpawner if err := cl.Get(ctx, key, &ts); err != nil { - log.Error(err, "unable to fetch TaskSpawner for poll interval") + log.Error(err, "Unable to fetch TaskSpawner for poll interval") sleepOrDone(ctx, 5*time.Minute) continue } interval := parsePollInterval(ts.Spec.PollInterval) - log.Info("sleeping until next cycle", "interval", interval) + log.Info("Sleeping until next cycle", "interval", interval) if done := sleepOrDone(ctx, interval); done { return } diff --git a/docs/reference.md b/docs/reference.md index ff3eaca..74728a6 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -130,7 +130,8 @@ The `promptTemplate` field uses Go `text/template` syntax. 
Available variables d | Field | Description | |-------|-------------| | `status.phase` | Current phase: `Pending`, `Running`, `Suspended`, or `Failed` | -| `status.deploymentName` | Name of the Deployment running the spawner | +| `status.deploymentName` | Name of the Deployment running the spawner (polling-based sources) | +| `status.cronJobName` | Name of the CronJob running the spawner (cron-based sources) | | `status.totalDiscovered` | Total number of items discovered from the source | | `status.totalTasksCreated` | Total number of Tasks created by this spawner | | `status.activeTasks` | Number of currently active (non-terminal) Tasks | diff --git a/install-crd.yaml b/install-crd.yaml index cb8b059..1f98cad 100644 --- a/install-crd.yaml +++ b/install-crd.yaml @@ -1233,9 +1233,15 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + cronJobName: + description: |- + CronJobName is the name of the CronJob running the spawner. + Set for cron-based sources. + type: string deploymentName: - description: DeploymentName is the name of the Deployment running - the spawner. + description: |- + DeploymentName is the name of the Deployment running the spawner. + Set for polling-based sources (GitHub Issues, Jira). type: string lastDiscoveryTime: description: LastDiscoveryTime is the last time the source was polled. 
diff --git a/install.yaml b/install.yaml index facfec6..fd67d92 100644 --- a/install.yaml +++ b/install.yaml @@ -74,6 +74,7 @@ rules: - apiGroups: - batch resources: + - cronjobs - jobs verbs: - create diff --git a/internal/controller/taskspawner_controller.go b/internal/controller/taskspawner_controller.go index 9f4e639..7d05163 100644 --- a/internal/controller/taskspawner_controller.go +++ b/internal/controller/taskspawner_controller.go @@ -6,6 +6,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -42,11 +43,17 @@ type TaskSpawnerReconciler struct { // +kubebuilder:rbac:groups=kelos.dev,resources=taskspawners/finalizers,verbs=update // +kubebuilder:rbac:groups=kelos.dev,resources=workspaces,verbs=get;list;watch // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create // +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=rolebindings,verbs=get;list;watch;create // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// isCronBased returns true if the TaskSpawner uses a cron schedule. +func isCronBased(ts *kelosv1alpha1.TaskSpawner) bool { + return ts.Spec.When.Cron != nil +} + // Reconcile handles TaskSpawner reconciliation. 
func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -76,6 +83,31 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{Requeue: true}, nil } + // Ensure ServiceAccount and RoleBinding exist in the namespace + if err := r.ensureSpawnerRBAC(ctx, ts.Namespace); err != nil { + logger.Error(err, "unable to ensure spawner RBAC") + return ctrl.Result{}, err + } + + isSuspended := ts.Spec.Suspend != nil && *ts.Spec.Suspend + + // Cron-based TaskSpawners use a CronJob instead of a Deployment. + if isCronBased(&ts) { + return r.reconcileCronJob(ctx, req, &ts, isSuspended) + } + + return r.reconcileDeployment(ctx, req, &ts, isSuspended) +} + +// reconcileDeployment handles the Deployment lifecycle for polling-based TaskSpawners. +func (r *TaskSpawnerReconciler) reconcileDeployment(ctx context.Context, req ctrl.Request, ts *kelosv1alpha1.TaskSpawner, isSuspended bool) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Clean up any existing CronJob from a previous cron-based configuration. 
+ if err := r.deleteStaleResource(ctx, req.NamespacedName, &batchv1.CronJob{}, "CronJob"); err != nil { + return ctrl.Result{}, err + } + // Check if Deployment already exists var deploy appsv1.Deployment deployExists := true @@ -88,12 +120,6 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } - // Ensure ServiceAccount and RoleBinding exist in the namespace - if err := r.ensureSpawnerRBAC(ctx, ts.Namespace); err != nil { - logger.Error(err, "unable to ensure spawner RBAC") - return ctrl.Result{}, err - } - // Resolve workspace if workspaceRef is set in taskTemplate var workspace *kelosv1alpha1.WorkspaceSpec var isGitHubApp bool @@ -106,7 +132,7 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) }, &ws); err != nil { if apierrors.IsNotFound(err) { logger.Info("Workspace not found yet, requeuing", "workspace", workspaceRefName) - r.recordEvent(&ts, corev1.EventTypeNormal, "WorkspaceNotFound", "Workspace %s not found, requeuing", workspaceRefName) + r.recordEvent(ts, corev1.EventTypeNormal, "WorkspaceNotFound", "Workspace %s not found, requeuing", workspaceRefName) return ctrl.Result{RequeueAfter: 2 * time.Second}, nil } logger.Error(err, "Unable to fetch Workspace for TaskSpawner", "workspace", workspaceRefName) @@ -135,7 +161,6 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Determine desired replica count based on suspend state - isSuspended := ts.Spec.Suspend != nil && *ts.Spec.Suspend desiredReplicas := int32(1) if isSuspended { desiredReplicas = 0 @@ -143,11 +168,11 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Create Deployment if it doesn't exist if !deployExists { - return r.createDeployment(ctx, &ts, workspace, isGitHubApp, desiredReplicas) + return r.createDeployment(ctx, ts, workspace, isGitHubApp, desiredReplicas) } // Update Deployment if spec changed - if err := r.updateDeployment(ctx, &ts, &deploy, 
workspace, isGitHubApp, desiredReplicas); err != nil { + if err := r.updateDeployment(ctx, ts, &deploy, workspace, isGitHubApp, desiredReplicas); err != nil { logger.Error(err, "unable to update Deployment") return ctrl.Result{}, err } @@ -161,13 +186,14 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Update status with deployment name or phase if needed - needsStatusUpdate := ts.Status.DeploymentName != deploy.Name || ts.Status.Phase != desiredPhase + needsStatusUpdate := ts.Status.DeploymentName != deploy.Name || ts.Status.CronJobName != "" || ts.Status.Phase != desiredPhase if needsStatusUpdate { if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if getErr := r.Get(ctx, req.NamespacedName, &ts); getErr != nil { + if getErr := r.Get(ctx, req.NamespacedName, ts); getErr != nil { return getErr } ts.Status.DeploymentName = deploy.Name + ts.Status.CronJobName = "" if isSuspended { ts.Status.Phase = kelosv1alpha1.TaskSpawnerPhaseSuspended ts.Status.Message = "Suspended by user" @@ -177,7 +203,110 @@ func (r *TaskSpawnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) } else if ts.Status.Phase == "" { ts.Status.Phase = kelosv1alpha1.TaskSpawnerPhasePending } - return r.Status().Update(ctx, &ts) + return r.Status().Update(ctx, ts) + }); err != nil { + logger.Error(err, "Unable to update TaskSpawner status") + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +// reconcileCronJob handles the CronJob lifecycle for cron-based TaskSpawners. +func (r *TaskSpawnerReconciler) reconcileCronJob(ctx context.Context, req ctrl.Request, ts *kelosv1alpha1.TaskSpawner, isSuspended bool) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Clean up any existing Deployment from a previous polling-based configuration. 
+ if err := r.deleteStaleResource(ctx, req.NamespacedName, &appsv1.Deployment{}, "Deployment"); err != nil { + return ctrl.Result{}, err + } + + var cronJob batchv1.CronJob + cronJobExists := true + if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil { + if apierrors.IsNotFound(err) { + cronJobExists = false + } else { + logger.Error(err, "Unable to fetch CronJob") + return ctrl.Result{}, err + } + } + + // Resolve workspace if workspaceRef is set in taskTemplate + var workspace *kelosv1alpha1.WorkspaceSpec + var isGitHubApp bool + if ts.Spec.TaskTemplate.WorkspaceRef != nil { + workspaceRefName := ts.Spec.TaskTemplate.WorkspaceRef.Name + var ws kelosv1alpha1.Workspace + if err := r.Get(ctx, client.ObjectKey{ + Namespace: ts.Namespace, + Name: workspaceRefName, + }, &ws); err != nil { + if apierrors.IsNotFound(err) { + logger.Info("Workspace not found yet, requeuing", "workspace", workspaceRefName) + r.recordEvent(ts, corev1.EventTypeNormal, "WorkspaceNotFound", "Workspace %s not found, requeuing", workspaceRefName) + return ctrl.Result{RequeueAfter: 2 * time.Second}, nil + } + logger.Error(err, "Unable to fetch Workspace for TaskSpawner", "workspace", workspaceRefName) + return ctrl.Result{}, err + } + workspace = &ws.Spec + + // Detect GitHub App auth + if workspace.SecretRef != nil { + var secret corev1.Secret + if err := r.Get(ctx, client.ObjectKey{ + Namespace: ts.Namespace, + Name: workspace.SecretRef.Name, + }, &secret); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "Unable to fetch workspace secret", "secret", workspace.SecretRef.Name) + return ctrl.Result{}, err + } + } else { + isGitHubApp = githubapp.IsGitHubApp(secret.Data) + if isGitHubApp { + logger.Info("Detected GitHub App secret for TaskSpawner", "secret", workspace.SecretRef.Name) + } + } + } + } + + if !cronJobExists { + return r.createCronJob(ctx, ts, workspace, isGitHubApp, isSuspended) + } + + if err := r.updateCronJob(ctx, ts, &cronJob, workspace, isGitHubApp, 
isSuspended); err != nil { + logger.Error(err, "Unable to update CronJob") + return ctrl.Result{}, err + } + + // Determine the desired phase based on current state. + // CronJobs are considered Running once they exist and are not suspended. + desiredPhase := ts.Status.Phase + if isSuspended { + desiredPhase = kelosv1alpha1.TaskSpawnerPhaseSuspended + } else if ts.Status.Phase != kelosv1alpha1.TaskSpawnerPhaseRunning { + desiredPhase = kelosv1alpha1.TaskSpawnerPhaseRunning + } + + needsStatusUpdate := ts.Status.CronJobName != cronJob.Name || ts.Status.DeploymentName != "" || ts.Status.Phase != desiredPhase + if needsStatusUpdate { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if getErr := r.Get(ctx, req.NamespacedName, ts); getErr != nil { + return getErr + } + ts.Status.CronJobName = cronJob.Name + ts.Status.DeploymentName = "" + if isSuspended { + ts.Status.Phase = kelosv1alpha1.TaskSpawnerPhaseSuspended + ts.Status.Message = "Suspended by user" + } else { + ts.Status.Phase = kelosv1alpha1.TaskSpawnerPhaseRunning + ts.Status.Message = "" + } + return r.Status().Update(ctx, ts) }); err != nil { logger.Error(err, "Unable to update TaskSpawner status") return ctrl.Result{}, err @@ -192,7 +321,7 @@ func (r *TaskSpawnerReconciler) handleDeletion(ctx context.Context, ts *kelosv1a logger := log.FromContext(ctx) if controllerutil.ContainsFinalizer(ts, taskSpawnerFinalizer) { - // The Deployment will be garbage collected via owner reference, + // The Deployment or CronJob will be garbage collected via owner reference, // but we remove the finalizer to allow the TaskSpawner to be deleted. 
controllerutil.RemoveFinalizer(ts, taskSpawnerFinalizer) if err := r.Update(ctx, ts); err != nil { @@ -240,6 +369,7 @@ func (r *TaskSpawnerReconciler) createDeployment(ctx context.Context, ts *kelosv ts.Status.Phase = kelosv1alpha1.TaskSpawnerPhasePending } ts.Status.DeploymentName = deploy.Name + ts.Status.CronJobName = "" return r.Status().Update(ctx, ts) }); err != nil { logger.Error(err, "Unable to update TaskSpawner status") @@ -305,6 +435,139 @@ func (r *TaskSpawnerReconciler) updateDeployment(ctx context.Context, ts *kelosv return nil } +// createCronJob creates a CronJob for a cron-based TaskSpawner. +func (r *TaskSpawnerReconciler) createCronJob(ctx context.Context, ts *kelosv1alpha1.TaskSpawner, workspace *kelosv1alpha1.WorkspaceSpec, isGitHubApp bool, isSuspended bool) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + cronJob := r.DeploymentBuilder.BuildCronJob(ts, workspace, isGitHubApp) + cronJob.Spec.Suspend = &isSuspended + + // Set owner reference + if err := controllerutil.SetControllerReference(ts, cronJob, r.Scheme); err != nil { + logger.Error(err, "Unable to set owner reference on CronJob") + return ctrl.Result{}, err + } + + if err := r.Create(ctx, cronJob); err != nil { + if apierrors.IsAlreadyExists(err) { + return ctrl.Result{Requeue: true}, nil + } + logger.Error(err, "Unable to create CronJob") + return ctrl.Result{}, err + } + + logger.Info("Created CronJob", "cronJob", cronJob.Name, "schedule", ts.Spec.When.Cron.Schedule) + r.recordEvent(ts, corev1.EventTypeNormal, "CronJobCreated", "Created spawner CronJob %s with schedule %s", cronJob.Name, ts.Spec.When.Cron.Schedule) + + // Update status + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if getErr := r.Get(ctx, client.ObjectKeyFromObject(ts), ts); getErr != nil { + return getErr + } + if isSuspended { + ts.Status.Phase = kelosv1alpha1.TaskSpawnerPhaseSuspended + ts.Status.Message = "Suspended by user" + } else { + ts.Status.Phase = 
kelosv1alpha1.TaskSpawnerPhaseRunning + } + ts.Status.CronJobName = cronJob.Name + ts.Status.DeploymentName = "" + return r.Status().Update(ctx, ts) + }); err != nil { + logger.Error(err, "Unable to update TaskSpawner status") + return ctrl.Result{}, err + } + + return ctrl.Result{Requeue: true}, nil +} + +// updateCronJob updates the CronJob if the schedule or suspend state changed. +func (r *TaskSpawnerReconciler) updateCronJob(ctx context.Context, ts *kelosv1alpha1.TaskSpawner, cronJob *batchv1.CronJob, workspace *kelosv1alpha1.WorkspaceSpec, isGitHubApp bool, isSuspended bool) error { + logger := log.FromContext(ctx) + + desired := r.DeploymentBuilder.BuildCronJob(ts, workspace, isGitHubApp) + needsUpdate := false + + if cronJob.Spec.Schedule != desired.Spec.Schedule { + cronJob.Spec.Schedule = desired.Spec.Schedule + needsUpdate = true + } + + if cronJob.Spec.Suspend == nil || *cronJob.Spec.Suspend != isSuspended { + cronJob.Spec.Suspend = &isSuspended + needsUpdate = true + } + + currentPodSpec := &cronJob.Spec.JobTemplate.Spec.Template.Spec + desiredPodSpec := &desired.Spec.JobTemplate.Spec.Template.Spec + + // Update container spec if changed (image, args, env, volumeMounts) + if len(currentPodSpec.Containers) > 0 { + current := currentPodSpec.Containers[0] + target := desiredPodSpec.Containers[0] + + if current.Image != target.Image || + !equalStringSlices(current.Args, target.Args) || + !equalEnvVars(current.Env, target.Env) || + !reflect.DeepEqual(current.VolumeMounts, target.VolumeMounts) { + currentPodSpec.Containers[0].Image = target.Image + currentPodSpec.Containers[0].Args = target.Args + currentPodSpec.Containers[0].Env = target.Env + currentPodSpec.Containers[0].VolumeMounts = target.VolumeMounts + needsUpdate = true + } + } + + // Update init containers if changed + if !reflect.DeepEqual(currentPodSpec.InitContainers, desiredPodSpec.InitContainers) { + currentPodSpec.InitContainers = desiredPodSpec.InitContainers + needsUpdate = true + } + + // 
Update volumes if changed + if !reflect.DeepEqual(currentPodSpec.Volumes, desiredPodSpec.Volumes) { + currentPodSpec.Volumes = desiredPodSpec.Volumes + needsUpdate = true + } + + if !needsUpdate { + return nil + } + + if err := r.Update(ctx, cronJob); err != nil { + return err + } + + logger.Info("Updated CronJob", "cronJob", cronJob.Name, "schedule", cronJob.Spec.Schedule, "suspended", isSuspended) + r.recordEvent(ts, corev1.EventTypeNormal, "CronJobUpdated", "Updated spawner CronJob %s", cronJob.Name) + return nil +} + +// deleteStaleResource deletes a resource by NamespacedName if it exists. +// This is used to clean up the old resource type when switching between +// Deployment-based and CronJob-based TaskSpawners. +func (r *TaskSpawnerReconciler) deleteStaleResource(ctx context.Context, key types.NamespacedName, obj client.Object, kind string) error { + logger := log.FromContext(ctx) + + if err := r.Get(ctx, key, obj); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + if err := r.Delete(ctx, obj); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + logger.Error(err, "Unable to delete stale "+kind, "name", key.Name) + return err + } + + logger.Info("Deleted stale "+kind+" after switching TaskSpawner type", "name", key.Name) + return nil +} + func equalStringSlices(a, b []string) bool { if len(a) != len(b) { return false @@ -420,6 +683,7 @@ func (r *TaskSpawnerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&kelosv1alpha1.TaskSpawner{}). Owns(&appsv1.Deployment{}). + Owns(&batchv1.CronJob{}). Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.findTaskSpawnersForSecret)). Watches(&kelosv1alpha1.Workspace{}, handler.EnqueueRequestsFromMapFunc(r.findTaskSpawnersForWorkspace)). 
Complete(r) diff --git a/internal/controller/taskspawner_deployment_builder.go b/internal/controller/taskspawner_deployment_builder.go index 99292c7..55b8c25 100644 --- a/internal/controller/taskspawner_deployment_builder.go +++ b/internal/controller/taskspawner_deployment_builder.go @@ -7,6 +7,7 @@ import ( "strings" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,14 +44,19 @@ func NewDeploymentBuilder() *DeploymentBuilder { } } -// Build creates a Deployment for the given TaskSpawner. -// The workspace parameter provides the repository URL and optional secretRef -// for GitHub API authentication. The isGitHubApp parameter indicates whether -// the workspace secret contains GitHub App credentials, which requires a -// token refresher sidecar. -func (b *DeploymentBuilder) Build(ts *kelosv1alpha1.TaskSpawner, workspace *kelosv1alpha1.WorkspaceSpec, isGitHubApp bool) *appsv1.Deployment { - replicas := int32(1) +// spawnerPodParts holds the components needed to build a spawner pod template. +type spawnerPodParts struct { + args []string + envVars []corev1.EnvVar + volumes []corev1.Volume + volumeMounts []corev1.VolumeMount + initContainers []corev1.Container + labels map[string]string +} +// buildPodParts computes the args, env, volumes, and labels that are shared +// between a Deployment pod and a CronJob pod for the given TaskSpawner. 
+func (b *DeploymentBuilder) buildPodParts(ts *kelosv1alpha1.TaskSpawner, workspace *kelosv1alpha1.WorkspaceSpec, isGitHubApp bool) spawnerPodParts { args := []string{ "--taskspawner-name=" + ts.Name, "--taskspawner-namespace=" + ts.Namespace, @@ -220,35 +226,54 @@ func (b *DeploymentBuilder) Build(ts *kelosv1alpha1.TaskSpawner, workspace *kelo "kelos.dev/taskspawner": ts.Name, } + return spawnerPodParts{ + args: args, + envVars: envVars, + volumes: volumes, + volumeMounts: volumeMounts, + initContainers: initContainers, + labels: labels, + } +} + +// Build creates a Deployment for the given TaskSpawner. +// The workspace parameter provides the repository URL and optional secretRef +// for GitHub API authentication. The isGitHubApp parameter indicates whether +// the workspace secret contains GitHub App credentials, which requires a +// token refresher sidecar. +func (b *DeploymentBuilder) Build(ts *kelosv1alpha1.TaskSpawner, workspace *kelosv1alpha1.WorkspaceSpec, isGitHubApp bool) *appsv1.Deployment { + replicas := int32(1) + p := b.buildPodParts(ts, workspace, isGitHubApp) + spawnerContainer := corev1.Container{ Name: "spawner", Image: b.SpawnerImage, ImagePullPolicy: b.SpawnerImagePullPolicy, - Args: args, - Env: envVars, - VolumeMounts: volumeMounts, + Args: p.args, + Env: p.envVars, + VolumeMounts: p.volumeMounts, } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: ts.Name, Namespace: ts.Namespace, - Labels: labels, + Labels: p.labels, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ - MatchLabels: labels, + MatchLabels: p.labels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: labels, + Labels: p.labels, }, Spec: corev1.PodSpec{ ServiceAccountName: SpawnerServiceAccount, RestartPolicy: corev1.RestartPolicyAlways, - Volumes: volumes, - InitContainers: initContainers, + Volumes: p.volumes, + InitContainers: p.initContainers, Containers: []corev1.Container{spawnerContainer}, }, }, 
@@ -256,6 +281,69 @@ func (b *DeploymentBuilder) Build(ts *kelosv1alpha1.TaskSpawner, workspace *kelo } } +// BuildCronJob creates a CronJob for a cron-based TaskSpawner. +// Instead of running a long-lived Deployment with pollInterval, the CronJob +// runs the spawner in one-shot mode on the cron schedule itself. +// The workspace and isGitHubApp parameters are passed through to buildPodParts +// so that CronJob pods get the same GitHub auth and repo args as Deployments. +func (b *DeploymentBuilder) BuildCronJob(ts *kelosv1alpha1.TaskSpawner, workspace *kelosv1alpha1.WorkspaceSpec, isGitHubApp bool) *batchv1.CronJob { + p := b.buildPodParts(ts, workspace, isGitHubApp) + + // Add --one-shot flag so the spawner runs a single cycle and exits. + // Copy to avoid mutating the shared slice from buildPodParts. + args := make([]string, len(p.args), len(p.args)+1) + copy(args, p.args) + args = append(args, "--one-shot") + + spawnerContainer := corev1.Container{ + Name: "spawner", + Image: b.SpawnerImage, + ImagePullPolicy: b.SpawnerImagePullPolicy, + Args: args, + Env: p.envVars, + VolumeMounts: p.volumeMounts, + } + + backoffLimit := int32(0) + // Keep the last 3 successful and 1 failed jobs for debugging. 
+ successfulJobsHistoryLimit := int32(3) + failedJobsHistoryLimit := int32(1) + + return &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: ts.Name, + Namespace: ts.Namespace, + Labels: p.labels, + }, + Spec: batchv1.CronJobSpec{ + Schedule: ts.Spec.When.Cron.Schedule, + ConcurrencyPolicy: batchv1.ForbidConcurrent, + SuccessfulJobsHistoryLimit: &successfulJobsHistoryLimit, + FailedJobsHistoryLimit: &failedJobsHistoryLimit, + JobTemplate: batchv1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: p.labels, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: &backoffLimit, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: p.labels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: SpawnerServiceAccount, + RestartPolicy: corev1.RestartPolicyNever, + Volumes: p.volumes, + InitContainers: p.initContainers, + Containers: []corev1.Container{spawnerContainer}, + }, + }, + }, + }, + }, + } +} + // httpsRepoRe matches HTTPS-style repository URLs: https://host/owner/repo var httpsRepoRe = regexp.MustCompile(`https?://([^/]+)/([^/]+)/([^/.]+)`) diff --git a/internal/controller/taskspawner_deployment_builder_test.go b/internal/controller/taskspawner_deployment_builder_test.go index 6fda831..7eb3c0f 100644 --- a/internal/controller/taskspawner_deployment_builder_test.go +++ b/internal/controller/taskspawner_deployment_builder_test.go @@ -7,11 +7,15 @@ import ( kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -1234,3 +1238,770 @@ func 
TestFindTaskSpawnersForWorkspace(t *testing.T) { t.Error("should not have request for spawner-3") } } + +func TestBuildCronJob_BasicSchedule(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "weekly-update", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeAPIKey, + SecretRef: kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + }, + } + + cronJob := builder.BuildCronJob(ts, nil, false) + + // Verify CronJob metadata + if cronJob.Name != "weekly-update" { + t.Errorf("expected name %q, got %q", "weekly-update", cronJob.Name) + } + if cronJob.Namespace != "default" { + t.Errorf("expected namespace %q, got %q", "default", cronJob.Namespace) + } + + // Verify schedule + if cronJob.Spec.Schedule != "0 9 * * 1" { + t.Errorf("expected schedule %q, got %q", "0 9 * * 1", cronJob.Spec.Schedule) + } + + // Verify concurrency policy + if cronJob.Spec.ConcurrencyPolicy != "Forbid" { + t.Errorf("expected concurrency policy %q, got %q", "Forbid", cronJob.Spec.ConcurrencyPolicy) + } + + // Verify container args include --one-shot + if len(cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers) != 1 { + t.Fatalf("expected 1 container, got %d", len(cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers)) + } + + spawner := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0] + if spawner.Name != "spawner" { + t.Errorf("container name = %q, want %q", spawner.Name, "spawner") + } + + foundOneShot := false + foundName := false + foundNamespace := false + for _, arg := range spawner.Args { + if arg == "--one-shot" { + foundOneShot = true + } + if arg == "--taskspawner-name=weekly-update" { + foundName = true + } + if arg == 
"--taskspawner-namespace=default" { + foundNamespace = true + } + } + if !foundOneShot { + t.Errorf("expected --one-shot flag in args, got: %v", spawner.Args) + } + if !foundName { + t.Errorf("expected --taskspawner-name arg, got: %v", spawner.Args) + } + if !foundNamespace { + t.Errorf("expected --taskspawner-namespace arg, got: %v", spawner.Args) + } + + // Verify restart policy is Never (for Job pods) + if cronJob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever { + t.Errorf("expected restart policy %q, got %q", corev1.RestartPolicyNever, cronJob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy) + } + + // Verify service account + if cronJob.Spec.JobTemplate.Spec.Template.Spec.ServiceAccountName != SpawnerServiceAccount { + t.Errorf("expected service account %q, got %q", SpawnerServiceAccount, cronJob.Spec.JobTemplate.Spec.Template.Spec.ServiceAccountName) + } + + // Verify labels + if cronJob.Labels["kelos.dev/taskspawner"] != "weekly-update" { + t.Errorf("expected label kelos.dev/taskspawner=weekly-update, got %q", cronJob.Labels["kelos.dev/taskspawner"]) + } + + // Verify history limits + if cronJob.Spec.SuccessfulJobsHistoryLimit == nil || *cronJob.Spec.SuccessfulJobsHistoryLimit != 3 { + t.Errorf("expected SuccessfulJobsHistoryLimit=3, got %v", cronJob.Spec.SuccessfulJobsHistoryLimit) + } + if cronJob.Spec.FailedJobsHistoryLimit == nil || *cronJob.Spec.FailedJobsHistoryLimit != 1 { + t.Errorf("expected FailedJobsHistoryLimit=1, got %v", cronJob.Spec.FailedJobsHistoryLimit) + } +} + +func TestBuildCronJob_BackoffLimit(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cron", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "*/5 * * * *", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + cronJob := 
builder.BuildCronJob(ts, nil, false) + + // Backoff limit should be 0 (no retries for one-shot) + if cronJob.Spec.JobTemplate.Spec.BackoffLimit == nil || *cronJob.Spec.JobTemplate.Spec.BackoffLimit != 0 { + t.Errorf("expected BackoffLimit=0, got %v", cronJob.Spec.JobTemplate.Spec.BackoffLimit) + } +} + +func TestIsCronBased(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cron-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeAPIKey, + SecretRef: kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + }, + } + + // Verify isCronBased works correctly + if !isCronBased(ts) { + t.Error("Expected isCronBased to return true for cron TaskSpawner") + } + + // Verify non-cron TaskSpawner returns false + nonCronTS := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + GitHubIssues: &kelosv1alpha1.GitHubIssues{}, + }, + }, + } + if isCronBased(nonCronTS) { + t.Error("Expected isCronBased to return false for GitHub TaskSpawner") + } + + _ = builder // Use the builder (tests the compilation) +} + +func TestUpdateCronJob_ScheduleChange(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cron-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 10 * * 1", // Changed from 9 to 10 + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + // Build the original CronJob with old schedule + oldTS := ts.DeepCopy() + oldTS.Spec.When.Cron.Schedule = "0 9 * * 1" + cronJob := 
builder.BuildCronJob(oldTS, nil, false) + + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(appsv1.AddToScheme(scheme)) + utilruntime.Must(kelosv1alpha1.AddToScheme(scheme)) + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts, cronJob). + WithStatusSubresource(ts). + Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + ctx := context.Background() + if err := r.updateCronJob(ctx, ts, cronJob, nil, false, false); err != nil { + t.Fatalf("updateCronJob error: %v", err) + } + + // Verify schedule was updated + if cronJob.Spec.Schedule != "0 10 * * 1" { + t.Errorf("expected schedule %q, got %q", "0 10 * * 1", cronJob.Spec.Schedule) + } +} + +func TestUpdateCronJob_SuspendToggle(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cron-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + Suspend: boolPtr(true), + }, + } + + cronJob := builder.BuildCronJob(ts, nil, false) + notSuspended := false + cronJob.Spec.Suspend = ¬Suspended // Currently not suspended + + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(appsv1.AddToScheme(scheme)) + utilruntime.Must(kelosv1alpha1.AddToScheme(scheme)) + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts, cronJob). + WithStatusSubresource(ts). 
+ Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + ctx := context.Background() + if err := r.updateCronJob(ctx, ts, cronJob, nil, false, true); err != nil { + t.Fatalf("updateCronJob error: %v", err) + } + + // Verify suspend was set to true + if cronJob.Spec.Suspend == nil || !*cronJob.Spec.Suspend { + t.Error("expected CronJob to be suspended") + } +} + +func TestUpdateCronJob_PodSpecChanges(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cron-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + cronJob := builder.BuildCronJob(ts, nil, false) + + scheme := newTestScheme() + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts, cronJob). + WithStatusSubresource(ts). 
+ Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + // Mutate the CronJob to simulate drift: wrong image, extra volume, extra init container + podSpec := &cronJob.Spec.JobTemplate.Spec.Template.Spec + podSpec.Containers[0].Image = "old-image:v1" + podSpec.Containers[0].VolumeMounts = []corev1.VolumeMount{{Name: "stale", MountPath: "/stale"}} + podSpec.InitContainers = []corev1.Container{{Name: "stale-init", Image: "stale:v1"}} + podSpec.Volumes = []corev1.Volume{{Name: "stale", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}} + + ctx := context.Background() + if err := r.updateCronJob(ctx, ts, cronJob, nil, false, false); err != nil { + t.Fatalf("updateCronJob error: %v", err) + } + + // Verify image was corrected + if podSpec.Containers[0].Image != DefaultSpawnerImage { + t.Errorf("expected image %q, got %q", DefaultSpawnerImage, podSpec.Containers[0].Image) + } + + // Verify stale volume mounts were removed (cron with no workspace has none) + if len(podSpec.Containers[0].VolumeMounts) != 0 { + t.Errorf("expected 0 volume mounts, got %d", len(podSpec.Containers[0].VolumeMounts)) + } + + // Verify stale init containers were removed + if len(podSpec.InitContainers) != 0 { + t.Errorf("expected 0 init containers, got %d", len(podSpec.InitContainers)) + } + + // Verify stale volumes were removed + if len(podSpec.Volumes) != 0 { + t.Errorf("expected 0 volumes, got %d", len(podSpec.Volumes)) + } +} + +func newTestScheme() *runtime.Scheme { + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(appsv1.AddToScheme(scheme)) + utilruntime.Must(batchv1.AddToScheme(scheme)) + utilruntime.Must(kelosv1alpha1.AddToScheme(scheme)) + return scheme +} + +func TestReconcileCronJob_DeletesStaleDeployment(t *testing.T) { + builder := NewDeploymentBuilder() + scheme := newTestScheme() + + // A TaskSpawner that was previously polling-based but is 
now cron-based. + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-spawner", + Namespace: "default", + Finalizers: []string{taskSpawnerFinalizer}, + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + // The stale Deployment left over from the previous polling configuration. + staleDeploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-spawner", + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "spawner"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "spawner"}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "spawner", Image: "test"}}}, + }, + }, + } + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts, staleDeploy). + WithStatusSubresource(ts). + Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + ctx := context.Background() + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "my-spawner", Namespace: "default"}} + _, err := r.reconcileCronJob(ctx, req, ts, false) + if err != nil { + t.Fatalf("reconcileCronJob error: %v", err) + } + + // Verify the stale Deployment was deleted. + var deploy appsv1.Deployment + err = cl.Get(ctx, types.NamespacedName{Name: "my-spawner", Namespace: "default"}, &deploy) + if !apierrors.IsNotFound(err) { + t.Errorf("expected stale Deployment to be deleted, but Get returned: %v", err) + } + + // Verify a CronJob was created. 
+ var cronJob batchv1.CronJob + if err := cl.Get(ctx, types.NamespacedName{Name: "my-spawner", Namespace: "default"}, &cronJob); err != nil { + t.Errorf("expected CronJob to be created, got error: %v", err) + } +} + +func TestReconcileDeployment_DeletesStaleCronJob(t *testing.T) { + builder := NewDeploymentBuilder() + scheme := newTestScheme() + + // A TaskSpawner that was previously cron-based but is now polling-based. + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-spawner", + Namespace: "default", + Finalizers: []string{taskSpawnerFinalizer}, + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + GitHubIssues: &kelosv1alpha1.GitHubIssues{}, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + // The stale CronJob left over from the previous cron configuration. + staleCronJob := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-spawner", + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "0 9 * * 1", + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "spawner", Image: "test"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + } + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts, staleCronJob). + WithStatusSubresource(ts). + Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + ctx := context.Background() + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "my-spawner", Namespace: "default"}} + _, err := r.reconcileDeployment(ctx, req, ts, false) + if err != nil { + t.Fatalf("reconcileDeployment error: %v", err) + } + + // Verify the stale CronJob was deleted. 
+ var cronJob batchv1.CronJob + err = cl.Get(ctx, types.NamespacedName{Name: "my-spawner", Namespace: "default"}, &cronJob) + if !apierrors.IsNotFound(err) { + t.Errorf("expected stale CronJob to be deleted, but Get returned: %v", err) + } + + // Verify a Deployment was created. + var deploy appsv1.Deployment + if err := cl.Get(ctx, types.NamespacedName{Name: "my-spawner", Namespace: "default"}, &deploy); err != nil { + t.Errorf("expected Deployment to be created, got error: %v", err) + } +} + +func TestBuildCronJob_WithWorkspacePAT(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cron-with-workspace", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + WorkspaceRef: &kelosv1alpha1.WorkspaceReference{ + Name: "my-workspace", + }, + }, + }, + } + + workspace := &kelosv1alpha1.WorkspaceSpec{ + Repo: "https://github.com/myorg/myrepo", + SecretRef: &kelosv1alpha1.SecretReference{ + Name: "gh-pat-secret", + }, + } + + cronJob := builder.BuildCronJob(ts, workspace, false) + + // Verify GitHub args are present + spawner := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0] + foundOwner := false + foundRepo := false + foundOneShot := false + for _, arg := range spawner.Args { + if arg == "--github-owner=myorg" { + foundOwner = true + } + if arg == "--github-repo=myrepo" { + foundRepo = true + } + if arg == "--one-shot" { + foundOneShot = true + } + } + if !foundOwner { + t.Errorf("expected --github-owner=myorg in args, got: %v", spawner.Args) + } + if !foundRepo { + t.Errorf("expected --github-repo=myrepo in args, got: %v", spawner.Args) + } + if !foundOneShot { + t.Errorf("expected --one-shot in args, got: %v", spawner.Args) + } + + // Verify GITHUB_TOKEN env var is injected from PAT secret + foundTokenEnv := 
false + for _, env := range spawner.Env { + if env.Name == "GITHUB_TOKEN" && env.ValueFrom != nil && + env.ValueFrom.SecretKeyRef != nil && + env.ValueFrom.SecretKeyRef.Name == "gh-pat-secret" { + foundTokenEnv = true + } + } + if !foundTokenEnv { + t.Errorf("expected GITHUB_TOKEN env from secret gh-pat-secret, got: %v", spawner.Env) + } +} + +func TestBuildCronJob_WithWorkspaceGitHubApp(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cron-github-app", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + WorkspaceRef: &kelosv1alpha1.WorkspaceReference{ + Name: "my-workspace", + }, + }, + }, + } + + workspace := &kelosv1alpha1.WorkspaceSpec{ + Repo: "https://github.com/myorg/myrepo", + SecretRef: &kelosv1alpha1.SecretReference{ + Name: "gh-app-secret", + }, + } + + cronJob := builder.BuildCronJob(ts, workspace, true) + podSpec := cronJob.Spec.JobTemplate.Spec.Template.Spec + + // Verify token-refresher init container is present + if len(podSpec.InitContainers) != 1 { + t.Fatalf("expected 1 init container (token-refresher), got %d", len(podSpec.InitContainers)) + } + if podSpec.InitContainers[0].Name != "token-refresher" { + t.Errorf("expected init container name %q, got %q", "token-refresher", podSpec.InitContainers[0].Name) + } + + // Verify volumes for GitHub App are present + foundTokenVol := false + foundSecretVol := false + for _, vol := range podSpec.Volumes { + if vol.Name == "github-token" { + foundTokenVol = true + } + if vol.Name == "github-app-secret" { + foundSecretVol = true + } + } + if !foundTokenVol { + t.Error("expected github-token volume") + } + if !foundSecretVol { + t.Error("expected github-app-secret volume") + } + + // Verify spawner container has token file arg and volume mount + spawner 
:= podSpec.Containers[0] + foundTokenFileArg := false + for _, arg := range spawner.Args { + if arg == "--github-token-file=/shared/token/GITHUB_TOKEN" { + foundTokenFileArg = true + } + } + if !foundTokenFileArg { + t.Errorf("expected --github-token-file arg, got: %v", spawner.Args) + } + + foundTokenMount := false + for _, vm := range spawner.VolumeMounts { + if vm.Name == "github-token" && vm.MountPath == "/shared/token" { + foundTokenMount = true + } + } + if !foundTokenMount { + t.Errorf("expected github-token volume mount, got: %v", spawner.VolumeMounts) + } +} + +func TestReconcileCronJob_ClearsStaleDeploymentName(t *testing.T) { + builder := NewDeploymentBuilder() + scheme := newTestScheme() + + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-spawner", + Namespace: "default", + Finalizers: []string{taskSpawnerFinalizer}, + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Cron: &kelosv1alpha1.Cron{ + Schedule: "0 9 * * 1", + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + Status: kelosv1alpha1.TaskSpawnerStatus{ + DeploymentName: "my-spawner", + Phase: kelosv1alpha1.TaskSpawnerPhaseRunning, + }, + } + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts). + WithStatusSubresource(ts). 
+ Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + ctx := context.Background() + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "my-spawner", Namespace: "default"}} + _, err := r.reconcileCronJob(ctx, req, ts, false) + if err != nil { + t.Fatalf("reconcileCronJob error: %v", err) + } + + // Re-fetch to see status updates + var updated kelosv1alpha1.TaskSpawner + if err := cl.Get(ctx, req.NamespacedName, &updated); err != nil { + t.Fatalf("failed to get updated TaskSpawner: %v", err) + } + + if updated.Status.CronJobName != "my-spawner" { + t.Errorf("expected CronJobName=%q, got %q", "my-spawner", updated.Status.CronJobName) + } + if updated.Status.DeploymentName != "" { + t.Errorf("expected DeploymentName to be cleared, got %q", updated.Status.DeploymentName) + } +} + +func TestReconcileDeployment_ClearsStaleCronJobName(t *testing.T) { + builder := NewDeploymentBuilder() + scheme := newTestScheme() + + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-spawner", + Namespace: "default", + Finalizers: []string{taskSpawnerFinalizer}, + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + GitHubIssues: &kelosv1alpha1.GitHubIssues{}, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + Status: kelosv1alpha1.TaskSpawnerStatus{ + CronJobName: "my-spawner", + Phase: kelosv1alpha1.TaskSpawnerPhaseRunning, + }, + } + + cl := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ts). + WithStatusSubresource(ts). 
+ Build() + + r := &TaskSpawnerReconciler{ + Client: cl, + Scheme: scheme, + DeploymentBuilder: builder, + } + + ctx := context.Background() + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "my-spawner", Namespace: "default"}} + _, err := r.reconcileDeployment(ctx, req, ts, false) + if err != nil { + t.Fatalf("reconcileDeployment error: %v", err) + } + + // Re-fetch to see status updates + var updated kelosv1alpha1.TaskSpawner + if err := cl.Get(ctx, req.NamespacedName, &updated); err != nil { + t.Fatalf("failed to get updated TaskSpawner: %v", err) + } + + if updated.Status.DeploymentName != "my-spawner" { + t.Errorf("expected DeploymentName=%q, got %q", "my-spawner", updated.Status.DeploymentName) + } + if updated.Status.CronJobName != "" { + t.Errorf("expected CronJobName to be cleared, got %q", updated.Status.CronJobName) + } +} diff --git a/internal/manifests/install-crd.yaml b/internal/manifests/install-crd.yaml index cb8b059..1f98cad 100644 --- a/internal/manifests/install-crd.yaml +++ b/internal/manifests/install-crd.yaml @@ -1233,9 +1233,15 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + cronJobName: + description: |- + CronJobName is the name of the CronJob running the spawner. + Set for cron-based sources. + type: string deploymentName: - description: DeploymentName is the name of the Deployment running - the spawner. + description: |- + DeploymentName is the name of the Deployment running the spawner. + Set for polling-based sources (GitHub Issues, Jira). type: string lastDiscoveryTime: description: LastDiscoveryTime is the last time the source was polled. 
diff --git a/internal/manifests/install.yaml b/internal/manifests/install.yaml index facfec6..fd67d92 100644 --- a/internal/manifests/install.yaml +++ b/internal/manifests/install.yaml @@ -74,6 +74,7 @@ rules: - apiGroups: - batch resources: + - cronjobs - jobs verbs: - create diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 26eb4ac..283ab3b 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -302,6 +302,14 @@ func (f *Framework) WaitForDeploymentAvailable(name string) { }, 2*time.Minute, 10*time.Second).Should(BeTrue()) } +// WaitForCronJobCreated waits for a CronJob with the given name to appear. +func (f *Framework) WaitForCronJobCreated(name string) { + Eventually(func() error { + _, err := f.Clientset.BatchV1().CronJobs(f.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) + return err + }, 2*time.Minute, 10*time.Second).Should(Succeed()) +} + // GetTaskPhase returns the phase of a Task. func (f *Framework) GetTaskPhase(name string) string { task, err := f.KelosClientset.ApiV1alpha1().Tasks(f.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) diff --git a/test/e2e/taskspawner_test.go b/test/e2e/taskspawner_test.go index f28ec66..471b963 100644 --- a/test/e2e/taskspawner_test.go +++ b/test/e2e/taskspawner_test.go @@ -153,7 +153,7 @@ var _ = Describe("TaskSpawner", func() { var _ = Describe("Cron TaskSpawner", func() { f := framework.NewFramework("cron") - It("should create a spawner Deployment and discover cron ticks", func() { + It("should create a CronJob and discover cron ticks", func() { By("creating OAuth credentials secret") f.CreateSecret("claude-credentials", "CLAUDE_CODE_OAUTH_TOKEN="+oauthToken) @@ -182,8 +182,8 @@ var _ = Describe("Cron TaskSpawner", func() { }, }) - By("waiting for Deployment to become available") - f.WaitForDeploymentAvailable("cron-spawner") + By("waiting for CronJob to be created") + f.WaitForCronJobCreated("cron-spawner") By("waiting for 
TaskSpawner phase to become Running") Eventually(func() string { diff --git a/test/integration/taskspawner_test.go b/test/integration/taskspawner_test.go index 38e4fa8..d7a8f30 100644 --- a/test/integration/taskspawner_test.go +++ b/test/integration/taskspawner_test.go @@ -10,6 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -631,7 +632,7 @@ var _ = Describe("TaskSpawner Controller", func() { }) Context("When creating a TaskSpawner with Cron source", func() { - It("Should create a Deployment and update status", func() { + It("Should create a CronJob and update status", func() { By("Creating a namespace") ns := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ @@ -683,47 +684,55 @@ var _ = Describe("TaskSpawner Controller", func() { return false }, timeout, interval).Should(BeTrue()) - By("Verifying a Deployment is created") - deployLookupKey := types.NamespacedName{Name: ts.Name, Namespace: ns.Name} - createdDeploy := &appsv1.Deployment{} + By("Verifying a CronJob is created") + cronJobLookupKey := types.NamespacedName{Name: ts.Name, Namespace: ns.Name} + createdCronJob := &batchv1.CronJob{} Eventually(func() bool { - err := k8sClient.Get(ctx, deployLookupKey, createdDeploy) + err := k8sClient.Get(ctx, cronJobLookupKey, createdCronJob) return err == nil }, timeout, interval).Should(BeTrue()) - By("Verifying the Deployment labels") - Expect(createdDeploy.Labels["kelos.dev/taskspawner"]).To(Equal(ts.Name)) + By("Verifying the CronJob labels") + Expect(createdCronJob.Labels["kelos.dev/taskspawner"]).To(Equal(ts.Name)) - By("Verifying the Deployment spec") - Expect(createdDeploy.Spec.Template.Spec.Containers).To(HaveLen(1)) - container := createdDeploy.Spec.Template.Spec.Containers[0] + By("Verifying the CronJob schedule") + Expect(createdCronJob.Spec.Schedule).To(Equal("0 9 * 
* 1")) + + By("Verifying the CronJob concurrency policy") + Expect(createdCronJob.Spec.ConcurrencyPolicy).To(Equal(batchv1.ForbidConcurrent)) + + By("Verifying the CronJob pod spec") + podSpec := createdCronJob.Spec.JobTemplate.Spec.Template.Spec + Expect(podSpec.Containers).To(HaveLen(1)) + container := podSpec.Containers[0] Expect(container.Name).To(Equal("spawner")) Expect(container.Image).To(Equal(controller.DefaultSpawnerImage)) Expect(container.Args).To(ConsistOf( "--taskspawner-name="+ts.Name, "--taskspawner-namespace="+ns.Name, + "--one-shot", )) - By("Verifying the Deployment has no env vars (cron needs no secrets)") + By("Verifying the CronJob has no env vars (cron needs no secrets)") Expect(container.Env).To(BeEmpty()) - By("Verifying the Deployment has owner reference") - Expect(createdDeploy.OwnerReferences).To(HaveLen(1)) - Expect(createdDeploy.OwnerReferences[0].Name).To(Equal(ts.Name)) - Expect(createdDeploy.OwnerReferences[0].Kind).To(Equal("TaskSpawner")) + By("Verifying the CronJob has owner reference") + Expect(createdCronJob.OwnerReferences).To(HaveLen(1)) + Expect(createdCronJob.OwnerReferences[0].Name).To(Equal(ts.Name)) + Expect(createdCronJob.OwnerReferences[0].Kind).To(Equal("TaskSpawner")) - By("Verifying TaskSpawner status has deploymentName") + By("Verifying TaskSpawner status has cronJobName") Eventually(func() string { err := k8sClient.Get(ctx, tsLookupKey, createdTS) if err != nil { return "" } - return createdTS.Status.DeploymentName + return createdTS.Status.CronJobName }, timeout, interval).Should(Equal(ts.Name)) - By("Verifying TaskSpawner phase is Pending") - Expect(createdTS.Status.Phase).To(Equal(kelosv1alpha1.TaskSpawnerPhasePending)) + By("Verifying TaskSpawner phase is Running") + Expect(createdTS.Status.Phase).To(Equal(kelosv1alpha1.TaskSpawnerPhaseRunning)) }) })