From cf6c3bf4a60dcd1ebde8104c9aba1aa20637e213 Mon Sep 17 00:00:00 2001 From: Gunju Kim Date: Wed, 25 Feb 2026 12:25:17 +0000 Subject: [PATCH] Auto-retry failed Tasks in spawner dedup logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a Task created by a TaskSpawner fails, the spawner's dedup logic previously treated it the same as a succeeded task — the task name existed, so the work item was skipped. This meant failed cron tasks would block retries until TTL cleanup (up to 10 days for the strategist spawner). Now the spawner detects failed Tasks during dedup, deletes them, and recreates them for immediate retry. Succeeded tasks remain deduplicated as before. The retry still respects maxConcurrency and maxTotalTasks limits. Partially addresses #287 (problem 1: cron dedup prevents retry after failure). Co-Authored-By: Claude Opus 4.6 --- cmd/axon-spawner/main.go | 21 ++++++- cmd/axon-spawner/main_test.go | 107 +++++++++++++++++++++++++++++++++- 2 files changed, 124 insertions(+), 4 deletions(-) diff --git a/cmd/axon-spawner/main.go b/cmd/axon-spawner/main.go index 02243c3..8fbf64f 100644 --- a/cmd/axon-spawner/main.go +++ b/cmd/axon-spawner/main.go @@ -174,9 +174,13 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa } existingTasks := make(map[string]bool) + failedTasks := make(map[string]bool) activeTasks := 0 for _, t := range existingTaskList.Items { existingTasks[t.Name] = true + if t.Status.Phase == axonv1alpha1.TaskPhaseFailed { + failedTasks[t.Name] = true + } if t.Status.Phase != axonv1alpha1.TaskPhaseSucceeded && t.Status.Phase != axonv1alpha1.TaskPhaseFailed { activeTasks++ } @@ -185,7 +189,7 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa var newItems []source.WorkItem for _, item := range items { taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID) - if !existingTasks[taskName] { + if !existingTasks[taskName] || failedTasks[taskName] { newItems = append(newItems, item) } } @@ -216,6 +220,21 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID) + // Delete failed Task so it can be recreated for retry + if failedTasks[taskName] { + old := &axonv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: taskName, + Namespace: ts.Namespace, + }, + } + if err := cl.Delete(ctx, old); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "Deleting failed Task for retry", "task", taskName) + continue + } + log.Info("Deleted failed Task for retry", "task", taskName) + } + prompt, err := source.RenderPrompt(ts.Spec.TaskTemplate.PromptTemplate, item) if err != nil { log.Error(err, "rendering prompt", "item", item.ID) diff --git a/cmd/axon-spawner/main_test.go b/cmd/axon-spawner/main_test.go index b1b8251..80b4ada 100644 --- a/cmd/axon-spawner/main_test.go +++ b/cmd/axon-spawner/main_test.go @@ -286,7 +286,7 @@ func TestRunCycleWithSource_CompletedTasksDontCountTowardsLimit(t *testing.T) { ts := newTaskSpawner("spawner", "default", int32Ptr(2)) existingTasks := []axonv1alpha1.Task{ newTask("spawner-done1", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded), - newTask("spawner-done2", "default", "spawner", axonv1alpha1.TaskPhaseFailed), + newTask("spawner-done2", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded), } cl, key := setupTest(t, ts, existingTasks...) @@ -304,13 +304,13 @@ func TestRunCycleWithSource_CompletedTasksDontCountTowardsLimit(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - // 2 completed tasks don't count, so 2 new can be created (maxConcurrency=2) + // 2 succeeded tasks don't count towards concurrency, so 2 new can be created (maxConcurrency=2) var taskList axonv1alpha1.TaskList if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { t.Fatalf("Listing tasks: %v", err) } if len(taskList.Items) != 4 { - t.Errorf("Expected 4 tasks (2 completed + 2 new), got %d", len(taskList.Items)) + t.Errorf("Expected 4 tasks (2 succeeded + 2 new), got %d", len(taskList.Items)) } } @@ -859,3 +859,104 @@ func TestRunCycleWithSource_NotSuspendedConditionCleared(t *testing.T) { } } } + +func TestRunCycleWithSource_FailedTaskRetriedImmediately(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + failedTask := newTask("spawner-42", "default", "spawner", axonv1alpha1.TaskPhaseFailed) + cl, key := setupTest(t, ts, failedTask) + + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "42", Number: 42, Title: "Fix bug"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // The failed task should be deleted and a new one created + var taskList axonv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 1 { + t.Fatalf("Expected 1 task (failed deleted, new created), got %d", len(taskList.Items)) + } + + // The recreated task should be in Pending phase (not Failed) + task := taskList.Items[0] + if task.Name != "spawner-42" { + t.Errorf("Expected task name %q, got %q", "spawner-42", task.Name) + } + if task.Status.Phase == axonv1alpha1.TaskPhaseFailed { + t.Error("Expected recreated task to not be in Failed phase") + } +} + +func TestRunCycleWithSource_SucceededTaskNotRetried(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + succeededTask := newTask("spawner-42", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded) + cl, key := setupTest(t, ts, succeededTask) + + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "42", Number: 42, Title: "Fix bug"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Succeeded task should NOT be retried + var taskList axonv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + if len(taskList.Items) != 1 { + t.Fatalf("Expected 1 task (succeeded not retried), got %d", len(taskList.Items)) + } + if taskList.Items[0].Status.Phase != axonv1alpha1.TaskPhaseSucceeded { + t.Error("Expected succeeded task to remain unchanged") + } +} + +func TestRunCycleWithSource_FailedTaskRetryRespectsMaxConcurrency(t *testing.T) { + ts := newTaskSpawner("spawner", "default", int32Ptr(1)) + existingTasks := []axonv1alpha1.Task{ + newTask("spawner-running", "default", "spawner", axonv1alpha1.TaskPhaseRunning), + newTask("spawner-failed", "default", "spawner", axonv1alpha1.TaskPhaseFailed), + } + cl, key := setupTest(t, ts, existingTasks...) + + src := &fakeSource{ + items: []source.WorkItem{ + {ID: "running", Title: "Running"}, + {ID: "failed", Title: "Failed"}, + }, + } + + if err := runCycleWithSource(context.Background(), cl, key, src); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // The failed task is eligible for retry but maxConcurrency=1 is already + // occupied by the running task, so no new task should be created. + // The failed task should still be deleted since it was selected for retry. + var taskList axonv1alpha1.TaskList + if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil { + t.Fatalf("Listing tasks: %v", err) + } + + // Count active tasks - should still be 1 (the running task) + active := 0 + for _, task := range taskList.Items { + if task.Status.Phase != axonv1alpha1.TaskPhaseSucceeded && task.Status.Phase != axonv1alpha1.TaskPhaseFailed { + active++ + } + } + if active > 1 { + t.Errorf("Expected at most 1 active task (maxConcurrency=1), got %d", active) + } +}