Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
common_metrics "github.com/armadaproject/armada/internal/common/metrics"
"github.com/armadaproject/armada/internal/common/task"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/executor/categorizer"
"github.com/armadaproject/armada/internal/executor/configuration"
executor_context "github.com/armadaproject/armada/internal/executor/context"
"github.com/armadaproject/armada/internal/executor/job"
Expand Down Expand Up @@ -202,6 +203,11 @@ func setupExecutorApiComponents(
ctx.Fatalf("Config error in failed pod checks: %s", err)
}

classifier, err := categorizer.NewClassifier(config.Application.ErrorCategories)
if err != nil {
ctx.Fatalf("Config error in error categories: %s", err)
}

eventReporter, stopReporter := reporter.NewJobEventReporter(eventSender, clock.RealClock{}, 200)

submitter := job.NewSubmitter(
Expand Down Expand Up @@ -240,6 +246,7 @@ func setupExecutorApiComponents(
pendingPodChecker,
failedPodChecker,
config.Kubernetes.StuckTerminatingPodExpiry,
classifier,
)
if err != nil {
ctx.Fatalf("Failed to create pod issue service: %s", err)
Expand All @@ -249,6 +256,7 @@ func setupExecutorApiComponents(
clusterContext,
eventReporter,
podIssueService,
classifier,
)
if err != nil {
ctx.Fatalf("Failed to create job state reporter: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/executor/job/processors/preempt_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (j *RunPreemptedProcessor) reportPodPreempted(run *job.RunState, pod *v1.Po
if err != nil {
return fmt.Errorf("failed creating preempted event because - %s", err)
}
failedEvent, err := reporter.CreateSimpleJobFailedEvent(pod, "Run preempted", "", j.clusterContext.GetClusterId(), armadaevents.KubernetesReason_AppError)
failedEvent, err := reporter.CreateSimpleJobFailedEvent(pod, "Run preempted", "", j.clusterContext.GetClusterId(), armadaevents.KubernetesReason_AppError, nil)
if err != nil {
return fmt.Errorf("failed creating failed event because - %s", err)
}
Expand Down
14 changes: 9 additions & 5 deletions internal/executor/reporter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
v1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"

"github.com/armadaproject/armada/internal/executor/categorizer"
"github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/executor/util"
"github.com/armadaproject/armada/pkg/armadaevents"
)

func CreateEventForCurrentState(pod *v1.Pod, clusterId string) (*armadaevents.EventSequence, error) {
func CreateEventForCurrentState(pod *v1.Pod, clusterId string, classifier *categorizer.Classifier) (*armadaevents.EventSequence, error) {
phase := pod.Status.Phase
sequence := createEmptySequence(pod)
jobId, runId, err := extractIds(pod)
Expand Down Expand Up @@ -80,13 +81,15 @@ func CreateEventForCurrentState(pod *v1.Pod, clusterId string) (*armadaevents.Ev
})
return sequence, nil
case v1.PodFailed:
failureInfo := util.ExtractFailureInfo(pod, classifier.Classify(pod))
return CreateJobFailedEvent(
pod,
util.ExtractPodFailedReason(pod),
util.ExtractPodFailureCause(pod),
"",
util.ExtractFailedPodContainerStatuses(pod, clusterId),
clusterId)
clusterId,
failureInfo)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
case v1.PodSucceeded:
sequence.Events = append(sequence.Events, &armadaevents.EventSequence_Event{
Created: now,
Expand Down Expand Up @@ -203,12 +206,12 @@ func CreateSimpleJobPreemptedEvent(pod *v1.Pod) (*armadaevents.EventSequence, er
return sequence, nil
}

func CreateSimpleJobFailedEvent(pod *v1.Pod, reason string, debugMessage string, clusterId string, cause armadaevents.KubernetesReason) (*armadaevents.EventSequence, error) {
return CreateJobFailedEvent(pod, reason, cause, debugMessage, []*armadaevents.ContainerError{}, clusterId)
func CreateSimpleJobFailedEvent(pod *v1.Pod, reason string, debugMessage string, clusterId string, cause armadaevents.KubernetesReason, failureInfo *armadaevents.FailureInfo) (*armadaevents.EventSequence, error) {
return CreateJobFailedEvent(pod, reason, cause, debugMessage, []*armadaevents.ContainerError{}, clusterId, failureInfo)
}

func CreateJobFailedEvent(pod *v1.Pod, reason string, cause armadaevents.KubernetesReason, debugMessage string,
containerStatuses []*armadaevents.ContainerError, clusterId string,
containerStatuses []*armadaevents.ContainerError, clusterId string, failureInfo *armadaevents.FailureInfo,
) (*armadaevents.EventSequence, error) {
sequence := createEmptySequence(pod)
jobId, runId, err := extractIds(pod)
Expand Down Expand Up @@ -241,6 +244,7 @@ func CreateJobFailedEvent(pod *v1.Pod, reason string, cause armadaevents.Kuberne
DebugMessage: debugMessage,
},
},
FailureInfo: failureInfo,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion internal/executor/reporter/event_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func generateEventMessages(t *testing.T, count int) []EventMessage {

for i := 0; i < count; i++ {
pod := makeTestPod(v1.PodRunning)
event, err := CreateEventForCurrentState(pod, "cluster-1")
event, err := CreateEventForCurrentState(pod, "cluster-1", nil)
require.NoError(t, err)
result = append(result, EventMessage{Event: event, JobRunId: uuid.New().String()})
}
Expand Down
84 changes: 78 additions & 6 deletions internal/executor/reporter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"

"github.com/armadaproject/armada/internal/common/errormatch"
"github.com/armadaproject/armada/internal/executor/categorizer"
"github.com/armadaproject/armada/pkg/armadaevents"
)

func TestCreateEventForCurrentState_WhenPodPending(t *testing.T) {
pod := makeTestPod(v1.PodPending)

result, err := CreateEventForCurrentState(pod, "cluster1")
result, err := CreateEventForCurrentState(pod, "cluster1", nil)
assert.Nil(t, err)

assert.Len(t, result.Events, 1)
Expand All @@ -25,7 +28,7 @@ func TestCreateEventForCurrentState_WhenPodPending(t *testing.T) {
func TestCreateEventForCurrentState_WhenPodRunning(t *testing.T) {
pod := makeTestPod(v1.PodRunning)

result, err := CreateEventForCurrentState(pod, "cluster1")
result, err := CreateEventForCurrentState(pod, "cluster1", nil)
assert.Nil(t, err)

assert.Len(t, result.Events, 1)
Expand All @@ -37,20 +40,89 @@ func TestCreateEventForCurrentState_WhenPodRunning(t *testing.T) {
func TestCreateEventForCurrentState_WhenPodFailed(t *testing.T) {
pod := makeTestPod(v1.PodFailed)

result, err := CreateEventForCurrentState(pod, "cluster1")
result, err := CreateEventForCurrentState(pod, "cluster1", nil)
assert.Nil(t, err)

assert.Len(t, result.Events, 1)
event, ok := result.Events[0].Event.(*armadaevents.EventSequence_Event_JobRunErrors)
assert.True(t, ok)
assert.Len(t, event.JobRunErrors.Errors, 1)
assert.True(t, event.JobRunErrors.Errors[0].GetPodError() != nil)
assert.NotNil(t, event.JobRunErrors.Errors[0].GetPodError())
assert.NotNil(t, event.JobRunErrors.Errors[0].GetFailureInfo(), "FailureInfo should always be set on failed events")
}

func TestCreateEventForCurrentState_WhenPodFailed_WithClassifier(t *testing.T) {
pod := makeTestPod(v1.PodFailed)
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{
Name: "main",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 74,
Reason: "Error",
Message: "custom error",
},
},
},
}

classifier, err := categorizer.NewClassifier([]categorizer.CategoryConfig{
{
Name: "custom-error",
Rules: []categorizer.CategoryRule{
{OnExitCodes: &errormatch.ExitCodeMatcher{Operator: errormatch.ExitCodeOperatorIn, Values: []int32{74}}},
},
},
})
require.NoError(t, err)

result, err := CreateEventForCurrentState(pod, "cluster1", classifier)
assert.NoError(t, err)

assert.Len(t, result.Events, 1)
event, ok := result.Events[0].Event.(*armadaevents.EventSequence_Event_JobRunErrors)
assert.True(t, ok)
assert.Len(t, event.JobRunErrors.Errors, 1)

failureInfo := event.JobRunErrors.Errors[0].GetFailureInfo()
require.NotNil(t, failureInfo)
assert.Equal(t, int32(74), failureInfo.ExitCode)
assert.Equal(t, "custom error", failureInfo.TerminationMessage)
assert.Equal(t, []string{"custom-error"}, failureInfo.Categories)
}

func TestCreateEventForCurrentState_WhenPodFailed_NilClassifier(t *testing.T) {
pod := makeTestPod(v1.PodFailed)
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{
Name: "main",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 1,
Reason: "Error",
},
},
},
}

result, err := CreateEventForCurrentState(pod, "cluster1", nil)
assert.NoError(t, err)
require.Len(t, result.Events, 1)

event, ok := result.Events[0].Event.(*armadaevents.EventSequence_Event_JobRunErrors)
require.True(t, ok)
require.Len(t, event.JobRunErrors.Errors, 1)

failureInfo := event.JobRunErrors.Errors[0].GetFailureInfo()
require.NotNil(t, failureInfo)
assert.Equal(t, int32(1), failureInfo.ExitCode)
assert.Empty(t, failureInfo.Categories)
}

func TestCreateEventForCurrentState_WhenPodSucceeded(t *testing.T) {
pod := makeTestPod(v1.PodSucceeded)

result, err := CreateEventForCurrentState(pod, "cluster1")
result, err := CreateEventForCurrentState(pod, "cluster1", nil)
assert.Nil(t, err)

assert.Len(t, result.Events, 1)
Expand All @@ -61,7 +133,7 @@ func TestCreateEventForCurrentState_WhenPodSucceeded(t *testing.T) {
func TestCreateEventForCurrentState_ShouldError_WhenPodPhaseUnknown(t *testing.T) {
pod := makeTestPod(v1.PodUnknown)

_, err := CreateEventForCurrentState(pod, "cluster1")
_, err := CreateEventForCurrentState(pod, "cluster1", nil)
assert.Error(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/executor/reporter/job_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestJobEventReporter_SendsAllEventsInBuffer_EachBatchTickInterval(t *testin
}

func createFailedEvent(t *testing.T, pod *v1.Pod) *armadaevents.EventSequence {
event, err := CreateSimpleJobFailedEvent(pod, "failed", "", "cluster1", armadaevents.KubernetesReason_AppError)
event, err := CreateSimpleJobFailedEvent(pod, "failed", "", "cluster1", armadaevents.KubernetesReason_AppError, nil)
require.NoError(t, err)
return event
}
Expand Down
2 changes: 1 addition & 1 deletion internal/executor/service/cluster_allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (allocationService *ClusterAllocationService) sendReturnLeaseEvent(details
}

func (allocationService *ClusterAllocationService) sendFailedEvent(details *job.FailedSubmissionDetails, message string) error {
failEvent, err := reporter.CreateSimpleJobFailedEvent(details.Pod, message, "", allocationService.clusterId.GetClusterId(), armadaevents.KubernetesReason_AppError)
failEvent, err := reporter.CreateSimpleJobFailedEvent(details.Pod, message, "", allocationService.clusterId.GetClusterId(), armadaevents.KubernetesReason_AppError, nil)
if err != nil {
return fmt.Errorf("failed to create return lease event %s", err)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/executor/service/job_state_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"k8s.io/client-go/tools/cache"

log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/executor/categorizer"
clusterContext "github.com/armadaproject/armada/internal/executor/context"
domain2 "github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/executor/reporter"
Expand All @@ -17,17 +18,20 @@ type JobStateReporter struct {
eventReporter reporter.EventReporter
clusterContext clusterContext.ClusterContext
podIssueHandler IssueHandler
classifier *categorizer.Classifier
}

func NewJobStateReporter(
clusterContext clusterContext.ClusterContext,
eventReporter reporter.EventReporter,
podIssueHandler IssueHandler,
classifier *categorizer.Classifier,
) (*JobStateReporter, error) {
stateReporter := &JobStateReporter{
eventReporter: eventReporter,
clusterContext: clusterContext,
podIssueHandler: podIssueHandler,
classifier: classifier,
}

_, err := clusterContext.AddPodEventHandler(stateReporter.podEventHandler())
Expand Down Expand Up @@ -86,7 +90,7 @@ func (stateReporter *JobStateReporter) reportCurrentStatus(pod *v1.Pod) {
return
}

event, err := reporter.CreateEventForCurrentState(pod, stateReporter.clusterContext.GetClusterId())
event, err := reporter.CreateEventForCurrentState(pod, stateReporter.clusterContext.GetClusterId(), stateReporter.classifier)
if err != nil {
log.Errorf("Failed to report event: %v", err)
return
Expand Down
2 changes: 1 addition & 1 deletion internal/executor/service/job_state_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func setUpJobStateReporterTest(t *testing.T) (*JobStateReporter, *stubIssueHandl
fakeClusterContext := fakecontext.NewSyncFakeClusterContext()
eventReporter := mocks.NewFakeEventReporter()
issueHandler := &stubIssueHandler{detectAndRegisterFailedPodIssueResult: false, detectAndRegisterFailedPodIssueError: nil}
jobStateReporter, err := NewJobStateReporter(fakeClusterContext, eventReporter, issueHandler)
jobStateReporter, err := NewJobStateReporter(fakeClusterContext, eventReporter, issueHandler, nil)
require.NoError(t, err)
return jobStateReporter, issueHandler, eventReporter, fakeClusterContext
}
Expand Down
8 changes: 7 additions & 1 deletion internal/executor/service/pod_issue_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/executor/categorizer"
"github.com/armadaproject/armada/internal/executor/configuration"
executorContext "github.com/armadaproject/armada/internal/executor/context"
"github.com/armadaproject/armada/internal/executor/job"
Expand Down Expand Up @@ -75,6 +76,7 @@ type PodIssueHandler struct {
pendingPodChecker podchecks.PodChecker
failedPodChecker failedpodchecks.RetryChecker
stateChecksConfig configuration.StateChecksConfiguration
classifier *categorizer.Classifier

stuckTerminatingPodExpiry time.Duration

Expand All @@ -93,6 +95,7 @@ func NewPodIssuerHandler(
pendingPodChecker podchecks.PodChecker,
failedPodChecker failedpodchecks.RetryChecker,
stuckTerminatingPodExpiry time.Duration,
classifier *categorizer.Classifier,
) (*PodIssueHandler, error) {
issueHandler := &PodIssueHandler{
jobRunState: jobRunState,
Expand All @@ -101,6 +104,7 @@ func NewPodIssuerHandler(
pendingPodChecker: pendingPodChecker,
failedPodChecker: failedPodChecker,
stateChecksConfig: stateChecksConfig,
classifier: classifier,
stuckTerminatingPodExpiry: stuckTerminatingPodExpiry,
knownPodIssues: map[string]*runIssue{},
podIssueMutex: sync.Mutex{},
Expand Down Expand Up @@ -420,7 +424,9 @@ func (p *PodIssueHandler) handleNonRetryableJobIssue(issue *issue) {
log.Infof("Handling non-retryable issue detected for job %s run %s", issue.RunIssue.JobId, issue.RunIssue.RunId)
podIssue := issue.RunIssue.PodIssue

failedEvent, err := reporter.CreateSimpleJobFailedEvent(podIssue.OriginalPodState, podIssue.Message, podIssue.DebugMessage, p.clusterContext.GetClusterId(), podIssue.Cause)
failureInfo := util.ExtractFailureInfo(podIssue.OriginalPodState, p.classifier.Classify(podIssue.OriginalPodState))

failedEvent, err := reporter.CreateSimpleJobFailedEvent(podIssue.OriginalPodState, podIssue.Message, podIssue.DebugMessage, p.clusterContext.GetClusterId(), podIssue.Cause, failureInfo)
if err != nil {
log.Errorf("Failed to create failed event for job %s because %s", issue.RunIssue.JobId, err)
return
Expand Down
Loading
Loading