Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions cmd/kelos-spawner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,44 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa
return fmt.Errorf("listing existing Tasks: %w", err)
}

existingTasks := make(map[string]bool)
existingTaskMap := make(map[string]*kelosv1alpha1.Task)
activeTasks := 0
for _, t := range existingTaskList.Items {
existingTasks[t.Name] = true
for i := range existingTaskList.Items {
t := &existingTaskList.Items[i]
existingTaskMap[t.Name] = t
if t.Status.Phase != kelosv1alpha1.TaskPhaseSucceeded && t.Status.Phase != kelosv1alpha1.TaskPhaseFailed {
activeTasks++
}
}

// Determine whether the source supports retrigger via TriggerComment.
hasTriggerComment := ts.Spec.When.GitHubIssues != nil && ts.Spec.When.GitHubIssues.TriggerComment != ""

var newItems []source.WorkItem
for _, item := range items {
taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID)
if !existingTasks[taskName] {
existing, found := existingTaskMap[taskName]
if !found {
newItems = append(newItems, item)
continue
}

// Retrigger: when TriggerComment is configured and the existing task
// is completed, check whether a trigger comment was posted after the
// task finished. If so, delete the completed task so a new one can be
// created. Note: if creation is later blocked by maxConcurrency or
// maxTotalTasks, the item will be picked up as new on the next cycle
// since the old task no longer exists.
if hasTriggerComment && !item.TriggerTime.IsZero() &&
(existing.Status.Phase == kelosv1alpha1.TaskPhaseSucceeded || existing.Status.Phase == kelosv1alpha1.TaskPhaseFailed) &&
existing.Status.CompletionTime != nil &&
item.TriggerTime.After(existing.Status.CompletionTime.Time) {

if err := cl.Delete(ctx, existing); err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "Deleting completed task for retrigger", "task", taskName)
continue
}
log.Info("Deleted completed task for retrigger", "task", taskName)
newItems = append(newItems, item)
}
}
Expand Down
241 changes: 241 additions & 0 deletions cmd/kelos-spawner/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -995,3 +996,243 @@ func TestRunCycleWithSource_CommentFieldsPassedToSource(t *testing.T) {
t.Errorf("ExcludeComments = %v, want %v", ghSrc.ExcludeComments, []string{"/kelos needs-input"})
}
}

func newCompletedTask(name, namespace, spawnerName string, phase kelosv1alpha1.TaskPhase, completionTime time.Time) kelosv1alpha1.Task {
ct := metav1.NewTime(completionTime)
return kelosv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"kelos.dev/taskspawner": spawnerName,
},
},
Spec: kelosv1alpha1.TaskSpec{
Type: "claude-code",
Prompt: "test",
Credentials: kelosv1alpha1.Credentials{
Type: kelosv1alpha1.CredentialTypeOAuth,
SecretRef: kelosv1alpha1.SecretReference{Name: "creds"},
},
},
Status: kelosv1alpha1.TaskStatus{
Phase: phase,
CompletionTime: &ct,
},
}
}

func TestRunCycleWithSource_RetriggerCompletedTask(t *testing.T) {
ts := newTaskSpawner("spawner", "default", nil)
ts.Spec.When.GitHubIssues.TriggerComment = "/kelos pick-up"

completionTime := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
triggerTime := time.Date(2026, 1, 2, 12, 0, 0, 0, time.UTC) // after completion

existingTasks := []kelosv1alpha1.Task{
newCompletedTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded, completionTime),
}
cl, key := setupTest(t, ts, existingTasks...)

src := &fakeSource{
items: []source.WorkItem{
{ID: "1", Title: "Retriggered item", TriggerTime: triggerTime},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// The old completed task should be deleted and a new one created
var taskList kelosv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (old deleted, new created), got %d", len(taskList.Items))
}
// The new task should not have a completion time (it's freshly created)
if taskList.Items[0].Status.CompletionTime != nil {
t.Error("Expected new task to have no CompletionTime")
}
}

func TestRunCycleWithSource_RetriggerSkippedWhenTriggerBeforeCompletion(t *testing.T) {
ts := newTaskSpawner("spawner", "default", nil)
ts.Spec.When.GitHubIssues.TriggerComment = "/kelos pick-up"

completionTime := time.Date(2026, 1, 2, 12, 0, 0, 0, time.UTC)
triggerTime := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC) // before completion

existingTasks := []kelosv1alpha1.Task{
newCompletedTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded, completionTime),
}
cl, key := setupTest(t, ts, existingTasks...)

src := &fakeSource{
items: []source.WorkItem{
{ID: "1", Title: "Old trigger", TriggerTime: triggerTime},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// The completed task should remain and no new task should be created
var taskList kelosv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (no retrigger), got %d", len(taskList.Items))
}
if taskList.Items[0].Status.CompletionTime == nil {
t.Error("Expected the original completed task to remain")
}
}

func TestRunCycleWithSource_RetriggerFailedTask(t *testing.T) {
ts := newTaskSpawner("spawner", "default", nil)
ts.Spec.When.GitHubIssues.TriggerComment = "/kelos pick-up"

completionTime := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
triggerTime := time.Date(2026, 1, 2, 12, 0, 0, 0, time.UTC) // after completion

existingTasks := []kelosv1alpha1.Task{
newCompletedTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseFailed, completionTime),
}
cl, key := setupTest(t, ts, existingTasks...)

src := &fakeSource{
items: []source.WorkItem{
{ID: "1", Title: "Retriggered after failure", TriggerTime: triggerTime},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// The failed task should be deleted and a new one created
var taskList kelosv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (old deleted, new created), got %d", len(taskList.Items))
}
if taskList.Items[0].Status.CompletionTime != nil {
t.Error("Expected new task to have no CompletionTime")
}
}

func TestRunCycleWithSource_RetriggerSkippedWithoutTriggerComment(t *testing.T) {
// When TriggerComment is not configured, retrigger should not happen
ts := newTaskSpawner("spawner", "default", nil)

completionTime := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
triggerTime := time.Date(2026, 1, 2, 12, 0, 0, 0, time.UTC)

existingTasks := []kelosv1alpha1.Task{
newCompletedTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded, completionTime),
}
cl, key := setupTest(t, ts, existingTasks...)

src := &fakeSource{
items: []source.WorkItem{
{ID: "1", Title: "Item with trigger time but no config", TriggerTime: triggerTime},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Completed task should remain — no retrigger without TriggerComment config
var taskList kelosv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (no retrigger), got %d", len(taskList.Items))
}
if taskList.Items[0].Status.CompletionTime == nil {
t.Error("Expected the original completed task to remain")
}
}

func TestRunCycleWithSource_RetriggerSkippedForRunningTask(t *testing.T) {
// Active (running) tasks should never be retriggered
ts := newTaskSpawner("spawner", "default", nil)
ts.Spec.When.GitHubIssues.TriggerComment = "/kelos pick-up"

existingTasks := []kelosv1alpha1.Task{
newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning),
}
cl, key := setupTest(t, ts, existingTasks...)

src := &fakeSource{
items: []source.WorkItem{
{ID: "1", Title: "Item", TriggerTime: time.Now()},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Running task should remain and no new task should be created
var taskList kelosv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (no retrigger for running), got %d", len(taskList.Items))
}
}

func TestRunCycleWithSource_RetriggerRespectsMaxConcurrency(t *testing.T) {
ts := newTaskSpawner("spawner", "default", int32Ptr(1))
ts.Spec.When.GitHubIssues.TriggerComment = "/kelos pick-up"

completionTime := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
triggerTime := time.Date(2026, 1, 2, 12, 0, 0, 0, time.UTC)

// One running task already at the concurrency limit, plus one completed task to retrigger
existingTasks := []kelosv1alpha1.Task{
newTask("spawner-running", "default", "spawner", kelosv1alpha1.TaskPhaseRunning),
newCompletedTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded, completionTime),
}
cl, key := setupTest(t, ts, existingTasks...)

src := &fakeSource{
items: []source.WorkItem{
{ID: "running", Title: "Running"},
{ID: "1", Title: "Retriggered", TriggerTime: triggerTime},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// The completed task should be deleted (retrigger), but the new task
// should not be created because maxConcurrency=1 is already reached
// by the running task.
var taskList kelosv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}

// The completed task was deleted during the retrigger phase, but the new
// task was not created because maxConcurrency=1 is already filled by the
// running task. The item will be picked up as new on the next cycle.
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (running only, retrigger blocked by concurrency), got %d", len(taskList.Items))
}
if taskList.Items[0].Name != "spawner-running" {
t.Errorf("Expected spawner-running to remain, got %q", taskList.Items[0].Name)
}
}
Loading
Loading