Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 8 additions & 47 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -502,35 +502,18 @@ index a206e97c353..79813d8e259 100644

test("SPARK-35884: Explain Formatted") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 93275487f29..510e3087e0f 100644
index 93275487f29..ca79ad8b6d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}

import scala.collection.mutable

+import org.apache.comet.CometConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, Path}

@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.FilePartition
@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
case "" => "_LEGACY_ERROR_TEMP_2062"
case _ => "_LEGACY_ERROR_TEMP_2055"
}
+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
checkErrorMatchPVals(
exception = intercept[SparkException] {
testIgnoreMissingFiles(options)
@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -639,7 +640,8 @@ class FileBasedDataSourceSuite extends QueryTest
}

Seq("parquet", "orc").foreach { format =>
Expand All @@ -540,31 +523,31 @@ index 93275487f29..510e3087e0f 100644
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -955,6 +957,7 @@ class FileBasedDataSourceSuite extends QueryTest
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
+ case smJoin: CometSortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1015,6 +1018,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1056,6 +1060,7 @@ class FileBasedDataSourceSuite extends QueryTest

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
@@ -1240,6 +1245,9 @@ class FileBasedDataSourceSuite extends QueryTest
val filters = df.queryExecution.executedPlan.collect {
case f: FileSourceScanLike => f.dataFilters
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
Expand Down Expand Up @@ -965,7 +948,7 @@ index 3cf2bfd17ab..49728c35c42 100644
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index fa1a64460fc..1d2e215d6a3 100644
index fa1a64460fc..134f0db1fb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -17,6 +17,8 @@
Expand Down Expand Up @@ -1422,28 +1405,6 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index a1147c16cc8..c7a29496328 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException}
import org.apache.spark.sql._
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -968,7 +969,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}

- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
withTable("t") {
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
withView("v1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index eec396b2e39..bf3f1c769d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
Expand Down
44 changes: 31 additions & 13 deletions native/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
// Fall back to plain SparkError (no context)
throw_spark_error_as_json(env, spark_error)
} else {
// Not a SparkError, use generic exception
// Check for file-not-found errors from object store
let error_msg = e.to_string();
if error_msg.contains("not found")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FileNotFound detection is substring-based and doesn’t ensure the message matches the Object at location ... not found shape that the JVM shim later parses, so a format change could yield a FileNotFound with an unextractable/garbled path. Consider tightening the signal or carrying a structured path so the shim doesn’t depend on message parsing.

Severity: medium

Other Locations
  • native/core/src/errors.rs:441

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Augment AI reviewer is correct! The detection of the error type is fragile and it could match a totally unrelated error. It looks like a best effort detection that in the worst case would lead to a wrongly formatted "full" error. It will be improved with the time when the users provide examples of errors which fail be to detected.

&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else {
Comment on lines +416 to +423
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

File-not-found detection is too narrow for multi-backend object stores.

Requiring both substrings ("not found" and "No such file or directory") is fragile and can miss valid not-found errors (then falling back to generic exception flow). Centralize and broaden matching in one helper.

💡 Proposed fix
+fn is_file_not_found_message(msg: &str) -> bool {
+    let m = msg.to_ascii_lowercase();
+    (m.contains("object at location") && m.contains("not found"))
+        || m.contains("no such file or directory")
+        || m.contains("no such key")
+        || m.contains("status code: 404")
+}
+
 fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<String>) {
@@
-                    if error_msg.contains("not found")
-                        && error_msg.contains("No such file or directory")
-                    {
+                    if is_file_not_found_message(&error_msg) {
                         let spark_error = SparkError::FileNotFound { message: error_msg };
                         throw_spark_error_as_json(env, &spark_error)
@@
-                if error_msg.contains("not found")
-                    && error_msg.contains("No such file or directory")
-                {
+                if is_file_not_found_message(&error_msg) {
                     let spark_error = SparkError::FileNotFound { message: error_msg };
                     throw_spark_error_as_json(env, &spark_error)

Also applies to: 439-446

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@native/core/src/errors.rs` around lines 416 - 423, The current not-found
detection logic in the error handling branch is too strict (it requires both
"not found" and "No such file or directory") and will miss backend-specific
messages; refactor by adding a small helper function (e.g. is_not_found_error)
that centralizes broader matching (check for any of several substrings like "not
found", "No such file or directory", "NotFound", "404", backend-specific
phrases) and use it in the existing branches that construct
SparkError::FileNotFound and call throw_spark_error_as_json; update both places
noted (around the SparkError::FileNotFound creation and the similar block at
439-446) to call the helper instead of the dual-substring check so all
object-store backends are covered.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The detection of the error type is fragile and it could match a totally unrelated error. It looks like a best effort detection that in the worst case would lead to a wrongly formatted "full" error. It will be improved with the time when the users provide examples of errors which fail be to detected.

// Not a SparkError, use generic exception
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
exception.class,
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
),
_ => env.throw_new(exception.class, exception.msg),
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated file-not-found detection and fallback logic

Low Severity

The file-not-found detection logic (contains("not found") && contains("No such file or directory")) along with the SparkError::FileNotFound construction and the entire generic-exception fallback block are copy-pasted between the DataFusionError::External arm and the catch-all _ arm. The only difference is the source of error_msg (e.to_string() vs error.to_string()). If the detection heuristic or fallback behavior ever needs updating, it must be changed in both places, risking inconsistency.

Additional Locations (1)
Fix in Cursor Fix in Web

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! The detection of the error type is fragile and it could match a totally unrelated error. It looks like a best effort detection that in the worst case would lead to a wrongly formatted "full" error. It will be improved with the time when the users provide examples of errors which fail be to detected.

}
}
// Handle direct SparkError - serialize to JSON
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
_ => {
// Check for file-not-found errors that may arrive through other wrapping paths
let error_msg = error.to_string();
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else {
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
Expand All @@ -424,18 +454,6 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
}
}
}
// Handle direct SparkError - serialize to JSON
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
_ => {
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
exception.class,
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
),
_ => env.throw_new(exception.class, exception.msg),
}
}
}
.expect("Thrown exception")
}
Expand Down
15 changes: 15 additions & 0 deletions native/spark-expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ pub enum SparkError {
#[error("[SCALAR_SUBQUERY_TOO_MANY_ROWS] Scalar subquery returned more than one row.")]
ScalarSubqueryTooManyRows,

#[error("{message}")]
FileNotFound { message: String },

#[error("ArrowError: {0}.")]
Arrow(Arc<ArrowError>),

Expand Down Expand Up @@ -236,6 +239,7 @@ impl SparkError {
SparkError::InvalidRegexGroupIndex { .. } => "InvalidRegexGroupIndex",
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
SparkError::FileNotFound { .. } => "FileNotFound",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
}
Expand Down Expand Up @@ -421,6 +425,11 @@ impl SparkError {
"dataType": data_type,
})
}
SparkError::FileNotFound { message } => {
serde_json::json!({
"message": message,
})
}
SparkError::Arrow(e) => {
serde_json::json!({
"message": e.to_string(),
Expand Down Expand Up @@ -487,6 +496,9 @@ impl SparkError {
SparkError::DatatypeCannotOrder { .. }
| SparkError::InvalidUtf8String { .. } => "org/apache/spark/SparkIllegalArgumentException",

// FileNotFound - will be converted to SparkFileNotFoundException by the shim
SparkError::FileNotFound { .. } => "org/apache/spark/SparkException",

// Generic errors
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
}
Expand Down Expand Up @@ -559,6 +571,9 @@ impl SparkError {
// Subquery errors
SparkError::ScalarSubqueryTooManyRows => Some("SCALAR_SUBQUERY_TOO_MANY_ROWS"),

// File not found
SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),

// Generic errors (no error class)
SparkError::Arrow(_) | SparkError::Internal(_) => None,
}
Expand Down
15 changes: 1 addition & 14 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

package org.apache.comet

import java.io.FileNotFoundException
import java.lang.management.ManagementFactory

import scala.util.matching.Regex

import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
Expand Down Expand Up @@ -163,19 +160,9 @@ class CometExecIterator(
// threw the exception, so we log the exception with taskAttemptId here
logError(s"Native execution for task $taskAttemptId failed", e)

val fileNotFoundPattern: Regex =
("""^External: Object at location (.+?) not found: No such file or directory """ +
"""\(os error \d+\)$""").r
val parquetError: Regex =
val parquetError: scala.util.matching.Regex =
"""^Parquet error: (?:.*)$""".r
e.getMessage match {
case fileNotFoundPattern(filePath) =>
// See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError
throw new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2055",
messageParameters = Map("message" -> e.getMessage),
cause = new FileNotFoundException(filePath)
) // Can't use SparkFileNotFoundException because it's private.
case parquetError() =>
// See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
// See org.apache.parquet.hadoop.ParquetFileReader for error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.spark.sql.comet.shims

import java.io.FileNotFoundException

import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand Down Expand Up @@ -243,6 +245,18 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = "Object at location (.+?) not found".r
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
Some(
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))

case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.spark.sql.comet.shims

import java.io.FileNotFoundException

import org.apache.spark.{QueryContext, SparkException}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand Down Expand Up @@ -239,6 +241,18 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = "Object at location (.+?) not found".r
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the native message doesn’t match the regex, getOrElse(msg) treats the entire native error text as a file path, which can lead to confusing File <full message> does not exist output. Since this behavior is format-dependent, a small regression test for errorType=FileNotFound conversion would help catch future message changes.

Severity: low

Other Locations
  • spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala:255
  • spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala:262

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Augment AI reviewer is correct! The detection of the error type is fragile and it could match a totally unrelated error. It looks like a best effort detection that in the worst case would lead to a wrongly formatted "full" error. It will be improved with the time when the users provide examples of errors which fail be to detected.

Some(
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))

Comment on lines +244 to +255
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid double-wrapping already formatted file-not-found messages.

If Line 245 already contains File <path> does not exist, the fallback at Line 251 produces File File <path> does not exist does not exist. Normalize the message before wrapping.

💡 Proposed fix
       case "FileNotFound" =>
         val msg = params("message").toString
-        // Extract file path from native error message and format like Hadoop's
-        // FileNotFoundException: "File <path> does not exist"
-        val path = "Object at location (.+?) not found".r
-          .findFirstMatchIn(msg)
-          .map(_.group(1))
-          .getOrElse(msg)
+        // Normalize to Hadoop-style: "File <path> does not exist"
+        val normalizedMsg =
+          if (msg.startsWith("File ") && msg.endsWith(" does not exist")) {
+            msg
+          } else {
+            val path = "Object at location (.+?) not found".r
+              .findFirstMatchIn(msg)
+              .map(_.group(1))
+              .getOrElse(msg)
+            s"File $path does not exist"
+          }
         Some(
           QueryExecutionErrors.readCurrentFileNotFoundError(
-            new FileNotFoundException(s"File $path does not exist")))
+            new FileNotFoundException(normalizedMsg)))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = "Object at location (.+?) not found".r
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
Some(
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(s"File $path does not exist")))
case "FileNotFound" =>
val msg = params("message").toString
// Normalize to Hadoop-style: "File <path> does not exist"
val normalizedMsg =
if (msg.startsWith("File ") && msg.endsWith(" does not exist")) {
msg
} else {
val path = "Object at location (.+?) not found".r
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
s"File $path does not exist"
}
Some(
QueryExecutionErrors.readCurrentFileNotFoundError(
new FileNotFoundException(normalizedMsg)))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala`
around lines 244 - 255, The FileNotFound case in ShimSparkErrorConverter.scala
currently blindly wraps the native message, causing duplicates like "File File
<path> does not exist does not exist"; change the extraction logic in the case
"FileNotFound" block so that you first check if msg already matches the
Hadoop-style pattern "File (.+) does not exist" and, if so, extract only the
path group, otherwise fall back to the existing "Object at location (.+?) not
found" capture or use msg as path; then pass new FileNotFoundException(s"File
$path does not exist") into QueryExecutionErrors.readCurrentFileNotFoundError to
ensure you never produce "File File ..." duplicates (refer to variables msg,
path and the call to QueryExecutionErrors.readCurrentFileNotFoundError).

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The detection of the error type is fragile and it could match a totally unrelated error. It looks like a best effort detection that in the worst case would lead to a wrongly formatted "full" error. It will be improved with the time when the users provide examples of errors which fail be to detected.

case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.spark.sql.comet.shims

import org.apache.spark.QueryContext
import org.apache.spark.SparkException
import org.apache.spark.SparkFileNotFoundException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -251,6 +252,20 @@ trait ShimSparkErrorConverter {
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
context.headOption.orNull))

case "FileNotFound" =>
val msg = params("message").toString
// Extract file path from native error message and format like Hadoop's
// FileNotFoundException: "File <path> does not exist"
val path = "Object at location (.+?) not found".r
.findFirstMatchIn(msg)
.map(_.group(1))
.getOrElse(msg)
// readCurrentFileNotFoundError was removed in Spark 4.0; construct directly
Some(
new SparkFileNotFoundException(
errorClass = "_LEGACY_ERROR_TEMP_2055",
messageParameters = Map("message" -> s"File $path does not exist")))

case _ =>
// Unknown error type - return None to trigger fallback
None
Expand Down
Loading