diff --git a/oncetask/fs_model.go b/oncetask/fs_model.go index e101e21..99d963b 100644 --- a/oncetask/fs_model.go +++ b/oncetask/fs_model.go @@ -43,6 +43,61 @@ type Recurrence struct { ExDates []string `json:"exdates" firestore:"exdates"` // Exception dates to skip } +// ResetStatus represents the outcome of a reset operation for a single task. +type ResetStatus string + +const ( + // ResetStatusSuccess indicates the task was successfully reset to pending state. + ResetStatusSuccess ResetStatus = "reset" + // ResetStatusNotFound indicates the task ID does not exist. + ResetStatusNotFound ResetStatus = "not_found" + // ResetStatusDifferentEnv indicates the task exists in a different environment. + ResetStatusDifferentEnv ResetStatus = "different_env" + // ResetStatusNotTerminal indicates the task is not in a terminal state (already pending/running). + // This is an idempotent case and not an error. + ResetStatusNotTerminal ResetStatus = "not_terminal" + // ResetStatusError indicates an error occurred while resetting the task. + ResetStatusError ResetStatus = "error" +) + +// ResetResult contains the result of resetting a single task. +type ResetResult struct { + TaskID string // ID of the task + Status ResetStatus // Status of the reset operation + Error error // Error if Status is ResetStatusError +} + +// ResetTasksResult contains the results of resetting multiple tasks. +type ResetTasksResult struct { + Results []ResetResult // Result for each task +} + +// ResetCount returns the number of tasks that were successfully reset. +func (r *ResetTasksResult) ResetCount() int { + count := 0 + for _, result := range r.Results { + if result.Status == ResetStatusSuccess { + count++ + } + } + return count +} + +// Errors returns all errors encountered during the reset operation. +// Returns nil if there were no errors. +func (r *ResetTasksResult) Errors() []error { + var errs []error + for _, result := range r.Results { + if result.Error != nil { + errs = append(errs, result.Error) + } + } + if len(errs) == 0 { + return nil + } + return errs +} + // getTaskEnv returns the task environment from the `EnvVariable` environment variable. // If not set, returns `DefaultEnv`. func getTaskEnv() string { diff --git a/oncetask/interface.go b/oncetask/interface.go index ea42eba..eb86e97 100644 --- a/oncetask/interface.go +++ b/oncetask/interface.go @@ -143,14 +143,15 @@ type Manager[TaskKind ~string] interface { // ResetTask resets a single task back to pending state for re-execution. // Only applies to tasks in terminal states (doneAt != ""). - // Idempotent: no-op if task is already pending/running. + // Returns error if task doesn't exist or is in a different environment. + // Idempotent: returns nil if task is already pending/running. // Clears all execution state (Attempts, Errors, Result) and cancellation state (IsCancelled). // Sets WaitUntil=NoWait for immediate execution. ResetTask(ctx context.Context, taskID string) error // ResetTasksByIds resets multiple tasks back to pending state (bulk operation via BulkWriter). - // Returns count of tasks reset. Partial failures return both count and aggregated error. + // Returns ResetTasksResult containing the status for each task. // Only resets tasks in terminal states (doneAt != ""). - // Idempotent: Tasks already in non-terminal states are skipped (no-op). - ResetTasksByIds(ctx context.Context, taskIDs []string) (int, error) + // Possible statuses: Success, NotFound, DifferentEnv, NotTerminal (idempotent), Error. + ResetTasksByIds(ctx context.Context, taskIDs []string) ResetTasksResult } diff --git a/oncetask/reset.go b/oncetask/reset.go index 2a70679..8eca3d3 100644 --- a/oncetask/reset.go +++ b/oncetask/reset.go @@ -23,16 +23,46 @@ import ( // - COMPLETED tasks (e.g., to reprocess data after bug fix) // - FAILED tasks (e.g., to retry after fixing root cause) // - CANCELLED tasks (e.g., to resume after cancellation was reverted) +// +// Returns an error if: +// - Task does not exist (ResetStatusNotFound) +// - Task exists in a different environment (ResetStatusDifferentEnv) +// - Error occurred during reset operation (ResetStatusError) +// +// Returns nil (success) if: +// - Task was successfully reset (ResetStatusSuccess) +// - Task is already pending/running (ResetStatusNotTerminal, idempotent) func (m *firestoreOnceTaskManager[TaskKind]) ResetTask( ctx context.Context, taskID string, ) error { - _, err := m.ResetTasksByIds(ctx, []string{taskID}) - return err + result := m.ResetTasksByIds(ctx, []string{taskID}) + + if len(result.Results) == 0 { + return fmt.Errorf("task %s: no result returned", taskID) + } + + taskResult := result.Results[0] + + switch taskResult.Status { + case ResetStatusSuccess: + return nil + case ResetStatusNotTerminal: + // Idempotent case - task is already pending/running + return nil + case ResetStatusNotFound: + return fmt.Errorf("task %s does not exist", taskID) + case ResetStatusDifferentEnv: + return fmt.Errorf("task %s exists in a different environment", taskID) + case ResetStatusError: + return fmt.Errorf("task %s: %w", taskID, taskResult.Error) + default: + return fmt.Errorf("task %s: unknown reset status %s", taskID, taskResult.Status) + } } // ResetTasksByIds resets multiple tasks back to pending state (bulk operation via BulkWriter). -// Returns count of tasks reset. Partial failures return both count and aggregated error. +// Returns ResetTasksResult containing the result for each task. // Only resets tasks in terminal states (doneAt != ""). // // Reset clears all execution and cancellation state: @@ -45,49 +75,91 @@ func (m *firestoreOnceTaskManager[TaskKind]) ResetTask( // - CancelledAt = "" // - Result = nil // -// Idempotent: Tasks already in non-terminal states are skipped (no-op). +// Possible statuses for each task: +// - ResetStatusSuccess: Task was successfully reset +// - ResetStatusNotFound: Task does not exist +// - ResetStatusDifferentEnv: Task exists in a different environment +// - ResetStatusNotTerminal: Task is already pending/running (idempotent) +// - ResetStatusError: Error occurred during reset func (m *firestoreOnceTaskManager[TaskKind]) ResetTasksByIds( ctx context.Context, taskIDs []string, -) (int, error) { +) ResetTasksResult { + results := make([]ResetResult, 0, len(taskIDs)) + if len(taskIDs) == 0 { - return 0, nil + return ResetTasksResult{Results: results} } // Fetch all tasks first docRefs := make([]*firestore.DocumentRef, len(taskIDs)) + taskIDToIndex := make(map[string]int, len(taskIDs)) for i, id := range taskIDs { docRefs[i] = m.queryBuilder.doc(id) + taskIDToIndex[id] = i + // Pre-populate results with not found status + results = append(results, ResetResult{ + TaskID: id, + Status: ResetStatusNotFound, + }) } docSnaps, err := m.client.GetAll(ctx, docRefs) if err != nil { - return 0, fmt.Errorf("failed to fetch tasks: %w", err) + // If we can't fetch tasks at all, mark all as errors + for i := range results { + results[i].Status = ResetStatusError + results[i].Error = fmt.Errorf("failed to fetch tasks: %w", err) + } + return ResetTasksResult{Results: results} } bw := m.client.BulkWriter(ctx) - jobs := make([]*firestore.BulkWriterJob, 0, len(docSnaps)) + type jobInfo struct { + job *firestore.BulkWriterJob + taskID string + taskType TaskKind + } + jobInfos := make([]jobInfo, 0, len(docSnaps)) env := getTaskEnv() - var errs []error taskTypes := make(map[TaskKind]struct{}) for _, docSnap := range docSnaps { + taskID := docSnap.Ref.ID + idx := taskIDToIndex[taskID] + if !docSnap.Exists() { + results[idx] = ResetResult{ + TaskID: taskID, + Status: ResetStatusNotFound, + } continue } var task OnceTask[TaskKind] if err := docSnap.DataTo(&task); err != nil { - errs = append(errs, fmt.Errorf("failed to parse task %s: %w", docSnap.Ref.ID, err)) + results[idx] = ResetResult{ + TaskID: taskID, + Status: ResetStatusError, + Error: fmt.Errorf("failed to parse task: %w", err), + } continue } if task.Env != env { - continue // Environment isolation + results[idx] = ResetResult{ + TaskID: taskID, + Status: ResetStatusDifferentEnv, + } + continue } if task.DoneAt == "" { - continue // Not in terminal state - already pending/running + results[idx] = ResetResult{ + TaskID: taskID, + Status: ResetStatusNotTerminal, + } + continue } // Reset all execution and cancellation state @@ -104,22 +176,37 @@ func (m *firestoreOnceTaskManager[TaskKind]) ResetTasksByIds( job, err := bw.Update(docSnap.Ref, updates) if err != nil { - errs = append(errs, fmt.Errorf("failed to create update job for task %s: %w", docSnap.Ref.ID, err)) + results[idx] = ResetResult{ + TaskID: taskID, + Status: ResetStatusError, + Error: fmt.Errorf("failed to create update job: %w", err), + } } else { - jobs = append(jobs, job) + jobInfos = append(jobInfos, jobInfo{ + job: job, + taskID: taskID, + taskType: task.Type, + }) taskTypes[task.Type] = struct{}{} } } bw.End() - resetCount := 0 - - for _, job := range jobs { - if _, err := job.Results(); err == nil { - resetCount++ + // Process job results + for _, info := range jobInfos { + idx := taskIDToIndex[info.taskID] + if _, err := info.job.Results(); err == nil { + results[idx] = ResetResult{ + TaskID: info.taskID, + Status: ResetStatusSuccess, + } } else { - errs = append(errs, err) + results[idx] = ResetResult{ + TaskID: info.taskID, + Status: ResetStatusError, + Error: err, + } } } @@ -128,12 +215,10 @@ func (m *firestoreOnceTaskManager[TaskKind]) ResetTasksByIds( m.evaluateNow(taskType) } - if len(errs) > 0 { - return resetCount, fmt.Errorf("partial reset: %d succeeded, %d failed: %w", - resetCount, len(errs), errors.Join(errs...)) - } - - slog.InfoContext(ctx, "Reset tasks by IDs", "count", resetCount, "total", len(taskIDs)) + result := ResetTasksResult{Results: results} + slog.InfoContext(ctx, "Reset tasks by IDs", + "resetCount", result.ResetCount(), + "total", len(taskIDs)) - return resetCount, nil + return result } diff --git a/oncetask/reset_test.go b/oncetask/reset_test.go index aab4c00..1925828 100644 --- a/oncetask/reset_test.go +++ b/oncetask/reset_test.go @@ -1,6 +1,7 @@ package oncetask import ( + "fmt" "testing" "time" ) @@ -342,3 +343,138 @@ func TestResetPreservesStructuralMetadata(t *testing.T) { } }) } + +func TestResetTaskValidation(t *testing.T) { + // These tests document the expected validation behavior for ResetTask(). + // ResetTask() should return an error when: + // 1. Task does not exist (ResetStatusNotFound) + // 2. Task exists in a different environment (ResetStatusDifferentEnv) + // 3. Error occurred during reset (ResetStatusError) + // ResetTask() should succeed (return nil) when: + // 4. Task is successfully reset (ResetStatusSuccess) + // 5. Task is already pending (ResetStatusNotTerminal, idempotent no-op) + + t.Run("Validation: Non-existent task returns NotFound status", func(t *testing.T) { + // When a task doesn't exist, ResetTasksByIds should return + // ResetStatusNotFound for that task + + taskID := "non-existent-task" + expectedStatus := ResetStatusNotFound + + t.Logf("Expected: ResetTasksByIds should return %s for non-existent task %s", + expectedStatus, taskID) + }) + + t.Run("Validation: Different environment returns DifferentEnv status", func(t *testing.T) { + // When a task exists in a different environment, ResetTasksByIds should return + // ResetStatusDifferentEnv for that task + + taskID := "task-in-different-env" + expectedStatus := ResetStatusDifferentEnv + + t.Logf("Expected: ResetTasksByIds should return %s for task in different environment %s", + expectedStatus, taskID) + }) + + t.Run("Validation: Pending task returns NotTerminal status (idempotent)", func(t *testing.T) { + // When a task is already pending, ResetTasksByIds should return + // ResetStatusNotTerminal (idempotent case) + + task := createPendingTask("pending-task") + expectedStatus := ResetStatusNotTerminal + + if task.DoneAt != "" { + t.Fatalf("Test setup error: task should be pending (doneAt empty)") + } + + t.Logf("Expected: ResetTasksByIds should return %s for already-pending task %s", + expectedStatus, task.Id) + }) + + t.Run("Validation: Terminal task returns Success status", func(t *testing.T) { + // When a task is in a terminal state and in the current environment, + // ResetTasksByIds should return ResetStatusSuccess + + task := createCompletedTask("completed-task") + currentEnv := getTaskEnv() + expectedStatus := ResetStatusSuccess + + if task.DoneAt == "" { + t.Fatalf("Test setup error: task should be in terminal state (doneAt set)") + } + if task.Env != currentEnv { + t.Fatalf("Test setup error: task should be in current environment") + } + + t.Logf("Expected: ResetTasksByIds should return %s for terminal task %s in current environment", + expectedStatus, task.Id) + }) +} + +func TestResetTasksByIdsValidation(t *testing.T) { + // These tests document the expected validation behavior for ResetTasksByIds(). + // ResetTasksByIds returns ResetTasksResult with detailed status for each task. + + t.Run("Bulk reset: Returns detailed result for each task", func(t *testing.T) { + // When ResetTasksByIds() is called with multiple task IDs, + // it should return a ResetTasksResult with a result for each task. + // Each result contains: TaskID, Status, and Error (if applicable) + + taskIDs := []string{"completed-1", "non-existent", "different-env", "already-pending"} + + // Test expectation: ResetTasksByIds(ctx, taskIDs) should return: + // ResetTasksResult with 4 results: + // - completed-1: ResetStatusSuccess + // - non-existent: ResetStatusNotFound + // - different-env: ResetStatusDifferentEnv + // - already-pending: ResetStatusNotTerminal + + t.Logf("Expected: ResetTasksByIds should return ResetTasksResult with %d results, each with appropriate status", + len(taskIDs)) + }) + + t.Run("Bulk reset: ResetCount helper returns successful resets", func(t *testing.T) { + // ResetTasksResult.ResetCount() should return the count of tasks + // with ResetStatusSuccess + + // Example result with mixed statuses: + result := ResetTasksResult{ + Results: []ResetResult{ + {TaskID: "task-1", Status: ResetStatusSuccess}, + {TaskID: "task-2", Status: ResetStatusNotFound}, + {TaskID: "task-3", Status: ResetStatusSuccess}, + {TaskID: "task-4", Status: ResetStatusNotTerminal}, + }, + } + + expectedCount := 2 // two successful resets + actualCount := result.ResetCount() + + if actualCount != expectedCount { + t.Errorf("ResetCount() = %d, want %d", actualCount, expectedCount) + } + }) + + t.Run("Bulk reset: Errors helper returns all errors", func(t *testing.T) { + // ResetTasksResult.Errors() should return all non-nil errors + // from results with ResetStatusError + + testErr1 := fmt.Errorf("parse error") + testErr2 := fmt.Errorf("update error") + + result := ResetTasksResult{ + Results: []ResetResult{ + {TaskID: "task-1", Status: ResetStatusSuccess}, + {TaskID: "task-2", Status: ResetStatusError, Error: testErr1}, + {TaskID: "task-3", Status: ResetStatusError, Error: testErr2}, + {TaskID: "task-4", Status: ResetStatusNotFound}, + }, + } + + errors := result.Errors() + + if len(errors) != 2 { + t.Errorf("Errors() returned %d errors, want 2", len(errors)) + } + }) +}