diff --git a/cmd/axon-spawner/main.go b/cmd/axon-spawner/main.go index 02243c3..8fbf64f 100644 --- a/cmd/axon-spawner/main.go +++ b/cmd/axon-spawner/main.go @@ -174,9 +174,13 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa } existingTasks := make(map[string]bool) + failedTasks := make(map[string]bool) activeTasks := 0 for _, t := range existingTaskList.Items { existingTasks[t.Name] = true + if t.Status.Phase == axonv1alpha1.TaskPhaseFailed { + failedTasks[t.Name] = true + } if t.Status.Phase != axonv1alpha1.TaskPhaseSucceeded && t.Status.Phase != axonv1alpha1.TaskPhaseFailed { activeTasks++ } @@ -185,7 +189,7 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa var newItems []source.WorkItem for _, item := range items { taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID) - if !existingTasks[taskName] { + if !existingTasks[taskName] || failedTasks[taskName] { newItems = append(newItems, item) } } @@ -216,6 +220,21 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID) + // Delete failed Task so it can be recreated for retry + if failedTasks[taskName] { + old := &axonv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: taskName, + Namespace: ts.Namespace, + }, + } + if err := cl.Delete(ctx, old); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "Deleting failed Task for retry", "task", taskName) + continue + } + log.Info("Deleted failed Task for retry", "task", taskName) + } + prompt, err := source.RenderPrompt(ts.Spec.TaskTemplate.PromptTemplate, item) if err != nil { log.Error(err, "rendering prompt", "item", item.ID) diff --git a/cmd/axon-spawner/main_test.go b/cmd/axon-spawner/main_test.go index b1b8251..80b4ada 100644 --- a/cmd/axon-spawner/main_test.go +++ b/cmd/axon-spawner/main_test.go @@ -286,7 +286,7 @@ func TestRunCycleWithSource_CompletedTasksDontCountTowardsLimit(t *testing.T) { ts := newTaskSpawner("spawner", "default", int32Ptr(2)) existingTasks := []axonv1alpha1.Task{ newTask("spawner-done1", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded), - newTask("spawner-done2", "default", "spawner", axonv1alpha1.TaskPhaseFailed), + newTask("spawner-done2", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded), } cl, key := setupTest(t, ts, existingTasks...) @@ -304,13 +304,13 @@ func TestRunCycleWithSource_CompletedTasksDontCountTowardsLimit(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - // 2 completed tasks don't count, so 2 new can be created (maxConcurrency=2) + // 2 succeeded tasks don't count towards concurrency, so 2 new can be created (maxConcurrency=2) var taskList axonv1alpha1.TaskList if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { t.Fatalf("Listing tasks: %v", err) } if len(taskList.Items) != 4 { - t.Errorf("Expected 4 tasks (2 completed + 2 new), got %d", len(taskList.Items)) + t.Errorf("Expected 4 tasks (2 succeeded + 2 new), got %d", len(taskList.Items)) } } @@ -859,3 +859,104 @@ func TestRunCycleWithSource_NotSuspendedConditionCleared(t *testing.T) { } } } + +func TestRunCycleWithSource_FailedTaskRetriedImmediately(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + failedTask := newTask("spawner-42", "default", "spawner", axonv1alpha1.TaskPhaseFailed) + cl, key := setupTest(t, ts, failedTask) + + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "42", Number: 42, Title: "Fix bug"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // The failed task should be deleted and a new one created + var taskList axonv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 1 { + t.Fatalf("Expected 1 task (failed deleted, new created), got %d", len(taskList.Items)) + } + + // The recreated task should be in Pending phase (not Failed) + task := taskList.Items[0] + if task.Name != "spawner-42" { + t.Errorf("Expected task name %q, got %q", "spawner-42", task.Name) + } + if task.Status.Phase == axonv1alpha1.TaskPhaseFailed { + t.Error("Expected recreated task to not be in Failed phase") + } +} + +func TestRunCycleWithSource_SucceededTaskNotRetried(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + succeededTask := newTask("spawner-42", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded) + cl, key := setupTest(t, ts, succeededTask) + + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "42", Number: 42, Title: "Fix bug"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Succeeded task should NOT be retried + var taskList axonv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 1 { + t.Fatalf("Expected 1 task (succeeded not retried), got %d", len(taskList.Items)) + } + if taskList.Items[0].Status.Phase != axonv1alpha1.TaskPhaseSucceeded { + t.Error("Expected succeeded task to remain unchanged") + } +} + +func TestRunCycleWithSource_FailedTaskRetryRespectsMaxConcurrency(t *testing.T) { + ts := newTaskSpawner("spawner", "default", int32Ptr(1)) + existingTasks := []axonv1alpha1.Task{ + newTask("spawner-running", "default", "spawner", axonv1alpha1.TaskPhaseRunning), + newTask("spawner-failed", "default", "spawner", axonv1alpha1.TaskPhaseFailed), + } + cl, key := setupTest(t, ts, existingTasks...) + + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "running", Title: "Running"}, + {ID: "failed", Title: "Failed"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // The failed task is eligible for retry but maxConcurrency=1 is already + // occupied by the running task, so no new task should be created. + // The failed task should still be deleted since it was selected for retry. + var taskList axonv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + + // Count active tasks - should still be 1 (the running task) + active := 0 + for _, task := range taskList.Items { + if task.Status.Phase != axonv1alpha1.TaskPhaseSucceeded && task.Status.Phase != axonv1alpha1.TaskPhaseFailed { + active++ + } + } + if active > 1 { + t.Errorf("Expected at most 1 active task (maxConcurrency=1), got %d", active) + } +}