diff --git a/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala b/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala
new file mode 100644
index 000000000000..e2cb2d7bf8b3
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/task/TaskErrorLogger.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.task
+
+import org.apache.spark.{TaskContext, TaskKilledException}
+import org.apache.spark.internal.Logging
+
+/** Utility object for logging task errors in a consistent manner. */
+object TaskErrorLogger extends Logging {
+
+  /**
+   * Logs task failure errors with appropriate filtering.
+   *
+   * @param context
+   *   The TaskContext of the failed task
+   * @param error
+   *   The error that caused the task failure
+   */
+  def logTaskFailure(context: TaskContext, error: Throwable): Unit = {
+    error match {
+      case e: TaskKilledException if e.reason == "another attempt succeeded" =>
+        // This is an expected scenario in speculative execution, no need to log as error
+        logDebug(s"Task ${context.taskAttemptId()} was killed because another attempt succeeded")
+      case _ =>
+        // Log genuine errors for debugging
+        logError(s"Task ${context.taskAttemptId()} failed with error: ", error)
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
index bf7c8cd42277..2c386e52ff16 100644
--- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala
@@ -18,9 +18,9 @@ package org.apache.spark.task
 
 import org.apache.gluten.config.GlutenCoreConfig
 import org.apache.gluten.memory.SimpleMemoryUsageRecorder
-import org.apache.gluten.task.TaskListener
+import org.apache.gluten.task.{TaskErrorLogger, TaskListener}
 
-import org.apache.spark.{TaskContext, TaskFailedReason, TaskKilledException, UnknownReason}
+import org.apache.spark.{TaskContext, TaskFailedReason, UnknownReason}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SparkTaskUtil, TaskCompletionListener, TaskFailureListener}
@@ -204,16 +204,14 @@ object TaskResources extends TaskListener with Logging {
     }
     val registry = new TaskResourceRegistry
     RESOURCE_REGISTRIES.put(tc, registry)
+    // TODO: Propose upstream Spark changes for resilient error logging when
+    // CompletionListener crashes. Using TaskErrorLogger as workaround
     tc.addTaskFailureListener(
       // in case of crashing in task completion listener, errors may be swallowed
       new TaskFailureListener {
         override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
-          // TODO:
-          // The general duty of printing error message should not reside in memory module
-          error match {
-            case e: TaskKilledException if e.reason == "another attempt succeeded" =>
-            case _ => logError(s"Task ${context.taskAttemptId()} failed by error: ", error)
-          }
+          // Delegate error logging to TaskErrorLogger utility
+          TaskErrorLogger.logTaskFailure(context, error)
         }
       })
     tc.addTaskCompletionListener(new TaskCompletionListener {