Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions oncetask/fs_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,61 @@ type Recurrence struct {
ExDates []string `json:"exdates" firestore:"exdates"` // Exception dates to skip
}

// ResetStatus classifies what happened to one task during a reset request.
type ResetStatus string

// Per-task reset outcomes reported through ResetStatus.
const (
	// ResetStatusSuccess means the task was put back into the pending state.
	ResetStatusSuccess = ResetStatus("reset")

	// ResetStatusNotFound means no task exists for the requested ID.
	ResetStatusNotFound = ResetStatus("not_found")

	// ResetStatusDifferentEnv means the task exists, but belongs to another environment.
	ResetStatusDifferentEnv = ResetStatus("different_env")

	// ResetStatusNotTerminal means the task has not reached a terminal state
	// (it is already pending/running). Treated as an idempotent no-op, not a failure.
	ResetStatusNotTerminal = ResetStatus("not_terminal")

	// ResetStatusError means the reset attempt itself failed.
	ResetStatusError = ResetStatus("error")
)

// ResetResult reports how the reset of one individual task turned out.
type ResetResult struct {
	// TaskID identifies the task this result belongs to.
	TaskID string

	// Status is the outcome recorded for the task.
	Status ResetStatus

	// Error holds the failure detail when Status is ResetStatusError.
	Error error
}

// ResetTasksResult aggregates the per-task outcomes of a bulk reset.
type ResetTasksResult struct {
	// Results holds one ResetResult per task.
	Results []ResetResult
}

// ResetCount returns the number of entries in Results whose Status is
// ResetStatusSuccess.
//
// It uses a value receiver so it can be called directly on a
// ResetTasksResult returned by value (for example
// mgr.ResetTasksByIds(ctx, ids).ResetCount()); a pointer receiver would
// reject that call because a function result is not addressable.
func (r ResetTasksResult) ResetCount() int {
	count := 0
	for i := range r.Results {
		if r.Results[i].Status == ResetStatusSuccess {
			count++
		}
	}
	return count
}

// Errors returns every non-nil Error found in Results, in Results order.
// It returns nil when no entry carries an error.
//
// Like ResetCount, it uses a value receiver so it is callable on a
// non-addressable ResetTasksResult.
func (r ResetTasksResult) Errors() []error {
	// errs stays nil unless at least one error is appended, which yields the
	// documented nil return without a separate emptiness check.
	var errs []error
	for i := range r.Results {
		if err := r.Results[i].Error; err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

// getTaskEnv returns the task environment from the `EnvVariable` environment variable.
// If not set, returns `DefaultEnv`.
func getTaskEnv() string {
Expand Down
9 changes: 5 additions & 4 deletions oncetask/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ type Manager[TaskKind ~string] interface {

// ResetTask resets a single task back to pending state for re-execution.
// Only applies to tasks in terminal states (doneAt != "").
// Idempotent: no-op if task is already pending/running.
// Returns error if task doesn't exist or is in a different environment.
// Idempotent: returns nil if task is already pending/running.
// Clears all execution state (Attempts, Errors, Result) and cancellation state (IsCancelled).
// Sets WaitUntil=NoWait for immediate execution.
ResetTask(ctx context.Context, taskID string) error

// ResetTasksByIds resets multiple tasks back to pending state (bulk operation via BulkWriter).
// Returns count of tasks reset. Partial failures return both count and aggregated error.
// Returns ResetTasksResult containing the status for each task.
// Only resets tasks in terminal states (doneAt != "").
// Idempotent: Tasks already in non-terminal states are skipped (no-op).
ResetTasksByIds(ctx context.Context, taskIDs []string) (int, error)
// Possible statuses: Success, NotFound, DifferentEnv, NotTerminal (idempotent), Error.
ResetTasksByIds(ctx context.Context, taskIDs []string) ResetTasksResult
}
139 changes: 112 additions & 27 deletions oncetask/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import (
"context"
"errors"

Check failure on line 5 in oncetask/reset.go

View workflow job for this annotation

GitHub Actions / Lint

"errors" imported and not used (typecheck)

Check failure on line 5 in oncetask/reset.go

View workflow job for this annotation

GitHub Actions / Lint

"errors" imported and not used) (typecheck)

Check failure on line 5 in oncetask/reset.go

View workflow job for this annotation

GitHub Actions / Build

"errors" imported and not used

Check failure on line 5 in oncetask/reset.go

View workflow job for this annotation

GitHub Actions / Test (1.25.x)

"errors" imported and not used
"fmt"
"log/slog"

Expand All @@ -23,16 +23,46 @@
// - COMPLETED tasks (e.g., to reprocess data after bug fix)
// - FAILED tasks (e.g., to retry after fixing root cause)
// - CANCELLED tasks (e.g., to resume after cancellation was reverted)
//
// Returns an error if:
// - Task does not exist (ResetStatusNotFound)
// - Task exists in a different environment (ResetStatusDifferentEnv)
// - Error occurred during reset operation (ResetStatusError)
//
// Returns nil (success) if:
// - Task was successfully reset (ResetStatusSuccess)
// - Task is already pending/running (ResetStatusNotTerminal, idempotent)
func (m *firestoreOnceTaskManager[TaskKind]) ResetTask(
	ctx context.Context,
	taskID string,
) error {
	// Delegate to the bulk path with a single ID and translate that task's
	// status into an error (or nil) for the caller.
	result := m.ResetTasksByIds(ctx, []string{taskID})

	// Guard before indexing: an empty result set is reported as an error
	// instead of panicking on Results[0].
	if len(result.Results) == 0 {
		return fmt.Errorf("task %s: no result returned", taskID)
	}

	taskResult := result.Results[0]

	switch taskResult.Status {
	case ResetStatusSuccess, ResetStatusNotTerminal:
		// Either the task was reset, or it is already pending/running
		// (the idempotent case) - both count as success.
		return nil
	case ResetStatusNotFound:
		return fmt.Errorf("task %s does not exist", taskID)
	case ResetStatusDifferentEnv:
		return fmt.Errorf("task %s exists in a different environment", taskID)
	case ResetStatusError:
		// %w keeps the underlying error available to errors.Is / errors.As.
		return fmt.Errorf("task %s: %w", taskID, taskResult.Error)
	default:
		return fmt.Errorf("task %s: unknown reset status %s", taskID, taskResult.Status)
	}
}

// ResetTasksByIds resets multiple tasks back to pending state (bulk operation via BulkWriter).
// Returns count of tasks reset. Partial failures return both count and aggregated error.
// Returns ResetTasksResult containing the result for each task.
// Only resets tasks in terminal states (doneAt != "").
//
// Reset clears all execution and cancellation state:
Expand All @@ -45,49 +75,91 @@
// - CancelledAt = ""
// - Result = nil
//
// Idempotent: Tasks already in non-terminal states are skipped (no-op).
// Possible statuses for each task:
// - ResetStatusSuccess: Task was successfully reset
// - ResetStatusNotFound: Task does not exist
// - ResetStatusDifferentEnv: Task exists in a different environment
// - ResetStatusNotTerminal: Task is already pending/running (idempotent)
// - ResetStatusError: Error occurred during reset
func (m *firestoreOnceTaskManager[TaskKind]) ResetTasksByIds(
ctx context.Context,
taskIDs []string,
) (int, error) {
) ResetTasksResult {
results := make([]ResetResult, 0, len(taskIDs))

if len(taskIDs) == 0 {
return 0, nil
return ResetTasksResult{Results: results}
}

// Fetch all tasks first
docRefs := make([]*firestore.DocumentRef, len(taskIDs))
taskIDToIndex := make(map[string]int, len(taskIDs))
for i, id := range taskIDs {
docRefs[i] = m.queryBuilder.doc(id)
taskIDToIndex[id] = i
// Pre-populate results with not found status
results = append(results, ResetResult{
TaskID: id,
Status: ResetStatusNotFound,
})
}

docSnaps, err := m.client.GetAll(ctx, docRefs)
if err != nil {
return 0, fmt.Errorf("failed to fetch tasks: %w", err)
// If we can't fetch tasks at all, mark all as errors
for i := range results {
results[i].Status = ResetStatusError
results[i].Error = fmt.Errorf("failed to fetch tasks: %w", err)
}
return ResetTasksResult{Results: results}
}

bw := m.client.BulkWriter(ctx)
jobs := make([]*firestore.BulkWriterJob, 0, len(docSnaps))
type jobInfo struct {
job *firestore.BulkWriterJob
taskID string
taskType TaskKind
}
jobInfos := make([]jobInfo, 0, len(docSnaps))
env := getTaskEnv()
var errs []error
taskTypes := make(map[TaskKind]struct{})

for _, docSnap := range docSnaps {
taskID := docSnap.Ref.ID
idx := taskIDToIndex[taskID]

if !docSnap.Exists() {
results[idx] = ResetResult{
TaskID: taskID,
Status: ResetStatusNotFound,
}
continue
}

var task OnceTask[TaskKind]
if err := docSnap.DataTo(&task); err != nil {
errs = append(errs, fmt.Errorf("failed to parse task %s: %w", docSnap.Ref.ID, err))
results[idx] = ResetResult{
TaskID: taskID,
Status: ResetStatusError,
Error: fmt.Errorf("failed to parse task: %w", err),
}
continue
}

if task.Env != env {
continue // Environment isolation
results[idx] = ResetResult{
TaskID: taskID,
Status: ResetStatusDifferentEnv,
}
continue
}

if task.DoneAt == "" {
continue // Not in terminal state - already pending/running
results[idx] = ResetResult{
TaskID: taskID,
Status: ResetStatusNotTerminal,
}
continue
}

// Reset all execution and cancellation state
Expand All @@ -104,22 +176,37 @@

job, err := bw.Update(docSnap.Ref, updates)
if err != nil {
errs = append(errs, fmt.Errorf("failed to create update job for task %s: %w", docSnap.Ref.ID, err))
results[idx] = ResetResult{
TaskID: taskID,
Status: ResetStatusError,
Error: fmt.Errorf("failed to create update job: %w", err),
}
} else {
jobs = append(jobs, job)
jobInfos = append(jobInfos, jobInfo{
job: job,
taskID: taskID,
taskType: task.Type,
})
taskTypes[task.Type] = struct{}{}
}
}

bw.End()

resetCount := 0

for _, job := range jobs {
if _, err := job.Results(); err == nil {
resetCount++
// Process job results
for _, info := range jobInfos {
idx := taskIDToIndex[info.taskID]
if _, err := info.job.Results(); err == nil {
results[idx] = ResetResult{
TaskID: info.taskID,
Status: ResetStatusSuccess,
}
} else {
errs = append(errs, err)
results[idx] = ResetResult{
TaskID: info.taskID,
Status: ResetStatusError,
Error: err,
}
}
}

Expand All @@ -128,12 +215,10 @@
m.evaluateNow(taskType)
}

if len(errs) > 0 {
return resetCount, fmt.Errorf("partial reset: %d succeeded, %d failed: %w",
resetCount, len(errs), errors.Join(errs...))
}

slog.InfoContext(ctx, "Reset tasks by IDs", "count", resetCount, "total", len(taskIDs))
result := ResetTasksResult{Results: results}
slog.InfoContext(ctx, "Reset tasks by IDs",
"resetCount", result.ResetCount(),
"total", len(taskIDs))

return resetCount, nil
return result
}
Loading
Loading