Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 17 additions & 40 deletions internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,52 +194,30 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
// if we get the poison pill, we should see if any job tasks are failed, if so we must exist
// if all jobs are either succeeded or skipped, we can exit
case struct{}:
remainingTasks := map[string]bool{}
pendingTasks := map[string]bool{}
for _, x := range taskNames {
remainingTasks[x] = true
pendingTasks[x] = true
}

for _, node := range subgraph.Nodes {
// Check if task should cause immediate exit
if node.Phase == "failed" && node.Task.GetRestartPolicy() == "Never" {
logger.Printf("🚫 exiting because task %q failed and should not be restarted", node.Name)
cancel()
continue
// if all requests tasks are succeeded, we can exit
{
pendingTasks := map[string]bool{}
for _, x := range taskNames {
pendingTasks[x] = true
}

// Check if task is complete and should be removed from tracking
isComplete := false
switch node.Phase {
case "succeeded", "skipped":
isComplete = true
case "running", "stalled":
// Services are complete when running (ready to serve)
// Jobs are only complete when succeeded
isComplete = node.Task.GetType() == types.TaskTypeService
}

if isComplete {
delete(remainingTasks, node.Name)
if node.Task.GetRestartPolicy() != "Always" {
for _, node := range subgraph.Nodes {
if (node.Phase == "succeeded" || node.Phase == "skipped") && node.Task.GetRestartPolicy() != "Always" {
delete(pendingTasks, node.Name)
}
}

if len(pendingTasks) == 0 {
logger.Println("exiting because all requested tasks completed and none should be restarted")
cancel()
}
}

if len(pendingTasks) == 0 {
logger.Println("βœ… exiting because all requested tasks completed and none should be restarted")
cancel()
} else if len(remainingTasks) == 0 {
logger.Println("πŸ”΅ all requested tasks are running:")
// print a list of running tasks, and their ports
for _, node := range subgraph.Nodes {
if (node.Phase == "running" || node.Phase == "stalled") && node.Task.Ports != nil {
for _, port := range node.Task.Ports {
logger.Printf(" - %s: http://localhost:%d\n", node.Name, port.HostPort)
}
}
// if a task that should not be restarted failed, we must exit
for _, node := range subgraph.Nodes {
if node.Phase == "failed" && node.Task.GetRestartPolicy() == "Never" {
logger.Printf("exiting because task %q should not be restarted, and it failed", node.Name)
cancel()
}
}

Expand Down Expand Up @@ -301,7 +279,6 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
stallTimers[node.Name].Reset(node.Task.GetStalledTimeout())
logger.Println(node.Message)
statusEvents <- node
events <- poisonPill
}

setNodeStatus(node, "waiting", "")
Expand Down