Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion cmd/axon-spawner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,13 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa
}

existingTasks := make(map[string]bool)
failedTasks := make(map[string]bool)
activeTasks := 0
for _, t := range existingTaskList.Items {
existingTasks[t.Name] = true
if t.Status.Phase == axonv1alpha1.TaskPhaseFailed {
failedTasks[t.Name] = true
}
if t.Status.Phase != axonv1alpha1.TaskPhaseSucceeded && t.Status.Phase != axonv1alpha1.TaskPhaseFailed {
activeTasks++
}
Expand All @@ -185,7 +189,7 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa
var newItems []source.WorkItem
for _, item := range items {
taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID)
if !existingTasks[taskName] {
if !existingTasks[taskName] || failedTasks[taskName] {
newItems = append(newItems, item)
}
}
Expand Down Expand Up @@ -216,6 +220,21 @@ func runCycleWithSource(ctx context.Context, cl client.Client, key types.Namespa

taskName := fmt.Sprintf("%s-%s", ts.Name, item.ID)

// Delete failed Task so it can be recreated for retry
if failedTasks[taskName] {
old := &axonv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: taskName,
Namespace: ts.Namespace,
},
}
if err := cl.Delete(ctx, old); err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "Deleting failed Task for retry", "task", taskName)
continue
}
log.Info("Deleted failed Task for retry", "task", taskName)
}

prompt, err := source.RenderPrompt(ts.Spec.TaskTemplate.PromptTemplate, item)
if err != nil {
log.Error(err, "rendering prompt", "item", item.ID)
Expand Down
107 changes: 104 additions & 3 deletions cmd/axon-spawner/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestRunCycleWithSource_CompletedTasksDontCountTowardsLimit(t *testing.T) {
ts := newTaskSpawner("spawner", "default", int32Ptr(2))
existingTasks := []axonv1alpha1.Task{
newTask("spawner-done1", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded),
newTask("spawner-done2", "default", "spawner", axonv1alpha1.TaskPhaseFailed),
newTask("spawner-done2", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded),
}
cl, key := setupTest(t, ts, existingTasks...)

Expand All @@ -304,13 +304,13 @@ func TestRunCycleWithSource_CompletedTasksDontCountTowardsLimit(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

// 2 completed tasks don't count, so 2 new can be created (maxConcurrency=2)
// 2 succeeded tasks don't count towards concurrency, so 2 new can be created (maxConcurrency=2)
var taskList axonv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 4 {
t.Errorf("Expected 4 tasks (2 completed + 2 new), got %d", len(taskList.Items))
t.Errorf("Expected 4 tasks (2 succeeded + 2 new), got %d", len(taskList.Items))
}
}

Expand Down Expand Up @@ -859,3 +859,104 @@ func TestRunCycleWithSource_NotSuspendedConditionCleared(t *testing.T) {
}
}
}

func TestRunCycleWithSource_FailedTaskRetriedImmediately(t *testing.T) {
ts := newTaskSpawner("spawner", "default", nil)
failedTask := newTask("spawner-42", "default", "spawner", axonv1alpha1.TaskPhaseFailed)
cl, key := setupTest(t, ts, failedTask)

src := &fakeSource{
items: []source.WorkItem{
{ID: "42", Number: 42, Title: "Fix bug"},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// The failed task should be deleted and a new one created
var taskList axonv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (failed deleted, new created), got %d", len(taskList.Items))
}

// The recreated task should be in Pending phase (not Failed)
task := taskList.Items[0]
if task.Name != "spawner-42" {
t.Errorf("Expected task name %q, got %q", "spawner-42", task.Name)
}
if task.Status.Phase == axonv1alpha1.TaskPhaseFailed {
t.Error("Expected recreated task to not be in Failed phase")
}
}

func TestRunCycleWithSource_SucceededTaskNotRetried(t *testing.T) {
ts := newTaskSpawner("spawner", "default", nil)
succeededTask := newTask("spawner-42", "default", "spawner", axonv1alpha1.TaskPhaseSucceeded)
cl, key := setupTest(t, ts, succeededTask)

src := &fakeSource{
items: []source.WorkItem{
{ID: "42", Number: 42, Title: "Fix bug"},
},
}

if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Succeeded task should NOT be retried
var taskList axonv1alpha1.TaskList
if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
t.Fatalf("Listing tasks: %v", err)
}
if len(taskList.Items) != 1 {
t.Fatalf("Expected 1 task (succeeded not retried), got %d", len(taskList.Items))
}
if taskList.Items[0].Status.Phase != axonv1alpha1.TaskPhaseSucceeded {
t.Error("Expected succeeded task to remain unchanged")
}
}

// TestRunCycleWithSource_FailedTaskRetryRespectsMaxConcurrency checks that a
// failed Task which is eligible for retry is not recreated as an active Task
// while the single concurrency slot (maxConcurrency=1) is held by a running
// Task, and that the running Task itself is left in place.
func TestRunCycleWithSource_FailedTaskRetryRespectsMaxConcurrency(t *testing.T) {
	const (
		runningName = "spawner-running"
		failedName  = "spawner-failed"
	)

	ts := newTaskSpawner("spawner", "default", int32Ptr(1))
	existingTasks := []axonv1alpha1.Task{
		newTask(runningName, "default", "spawner", axonv1alpha1.TaskPhaseRunning),
		newTask(failedName, "default", "spawner", axonv1alpha1.TaskPhaseFailed),
	}
	cl, key := setupTest(t, ts, existingTasks...)

	src := &fakeSource{
		items: []source.WorkItem{
			{ID: "running", Title: "Running"},
			{ID: "failed", Title: "Failed"},
		},
	}

	if err := runCycleWithSource(context.Background(), cl, key, src); err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// The failed task is eligible for retry, but maxConcurrency=1 is already
	// occupied by the running task, so no new active task may appear.
	// NOTE(review): whether the failed Task object is deleted in this
	// situation is intentionally not asserted here; it depends on where the
	// concurrency check sits relative to the delete in runCycleWithSource.
	var taskList axonv1alpha1.TaskList
	if err := cl.List(context.Background(), &taskList, client.InNamespace("default")); err != nil {
		t.Fatalf("Listing tasks: %v", err)
	}

	active := 0
	runningSeen := false
	for _, task := range taskList.Items {
		if task.Name == runningName {
			runningSeen = true
		}
		if task.Status.Phase == axonv1alpha1.TaskPhaseSucceeded || task.Status.Phase == axonv1alpha1.TaskPhaseFailed {
			continue
		}
		active++
		// A non-terminal Task carrying the failed item's name means the retry
		// was started despite the concurrency limit.
		if task.Name == failedName {
			t.Errorf("Expected %q not to be recreated while maxConcurrency is reached, found it in phase %v", failedName, task.Status.Phase)
		}
	}

	if !runningSeen {
		t.Errorf("Expected running task %q to still exist", runningName)
	}
	// Exactly the pre-existing running task should be active: more means the
	// limit was exceeded, fewer means the running task was disturbed.
	if active != 1 {
		t.Errorf("Expected exactly 1 active task (maxConcurrency=1), got %d", active)
	}
}
Loading