Skip to content

Commit 88d3a1e

Browse files
Merge branch 'apache:main' into main
2 parents 7c5eb20 + 7878f0d commit 88d3a1e

File tree

12 files changed

+496
-127
lines changed

12 files changed

+496
-127
lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ jobs:
384384
- name: Java test steps
385385
uses: ./.github/actions/java-test
386386
with:
387-
artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
387+
artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}-${{ matrix.profile.scan_impl }}
388388
suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }}
389389
maven_opts: ${{ matrix.profile.maven_opts }}
390390
scan_impl: ${{ matrix.profile.scan_impl }}

dev/diffs/3.4.3.diff

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ index a6b295578d6..91acca4306f 100644
523523

524524
test("SPARK-35884: Explain Formatted") {
525525
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
526-
index 2796b1cf154..d628f44e4ee 100644
526+
index 2796b1cf154..53dcfde932e 100644
527527
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
528528
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
529529
@@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
@@ -534,41 +534,70 @@ index 2796b1cf154..d628f44e4ee 100644
534534
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
535535
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
536536
import org.apache.spark.sql.execution.datasources.FilePartition
537-
@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest
538-
}
537+
@@ -516,21 +517,24 @@ class FileBasedDataSourceSuite extends QueryTest
538+
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
539+
540+
// RuntimeException is triggered at executor side, which is then wrapped as
541+
- // SparkException at driver side
542+
- val e1 = intercept[SparkException] {
543+
- sql(s"select b from $tableName").collect()
544+
+ // SparkException at driver side. Comet native readers throw RuntimeException
545+
+ // directly without the SparkException wrapper.
546+
+ def getDuplicateFieldError(query: String): RuntimeException = {
547+
+ try {
548+
+ sql(query).collect()
549+
+ fail("Expected an exception").asInstanceOf[RuntimeException]
550+
+ } catch {
551+
+ case e: SparkException =>
552+
+ e.getCause.asInstanceOf[RuntimeException]
553+
+ case e: RuntimeException => e
554+
+ }
555+
}
556+
- assert(
557+
- e1.getCause.isInstanceOf[RuntimeException] &&
558+
- e1.getCause.getMessage.contains(
559+
- """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
560+
- val e2 = intercept[SparkException] {
561+
- sql(s"select B from $tableName").collect()
562+
- }
563+
- assert(
564+
- e2.getCause.isInstanceOf[RuntimeException] &&
565+
- e2.getCause.getMessage.contains(
566+
- """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
567+
+ val e1 = getDuplicateFieldError(s"select b from $tableName")
568+
+ assert(e1.getMessage.contains(
569+
+ """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
570+
+ val e2 = getDuplicateFieldError(s"select B from $tableName")
571+
+ assert(e2.getMessage.contains(
572+
+ """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
573+
}
539574

540-
Seq("parquet", "orc").foreach { format =>
541-
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
542-
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
543-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
544-
withTempDir { dir =>
545-
val tableName = s"spark_25132_${format}_native"
546-
val tableDir = dir.getCanonicalPath + s"/$tableName"
547-
@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest
575+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
576+
@@ -815,6 +819,7 @@ class FileBasedDataSourceSuite extends QueryTest
548577
assert(bJoinExec.isEmpty)
549578
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
550579
case smJoin: SortMergeJoinExec => smJoin
551580
+ case smJoin: CometSortMergeJoinExec => smJoin
552581
}
553582
assert(smJoinExec.nonEmpty)
554583
}
555-
@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest
584+
@@ -875,6 +880,7 @@ class FileBasedDataSourceSuite extends QueryTest
556585

557586
val fileScan = df.queryExecution.executedPlan collectFirst {
558587
case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
559588
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _, _) => f
560589
}
561590
assert(fileScan.nonEmpty)
562591
assert(fileScan.get.partitionFilters.nonEmpty)
563-
@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest
592+
@@ -916,6 +922,7 @@ class FileBasedDataSourceSuite extends QueryTest
564593

565594
val fileScan = df.queryExecution.executedPlan collectFirst {
566595
case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
567596
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _, _) => f
568597
}
569598
assert(fileScan.nonEmpty)
570599
assert(fileScan.get.partitionFilters.isEmpty)
571-
@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest
600+
@@ -1100,6 +1107,9 @@ class FileBasedDataSourceSuite extends QueryTest
572601
val filters = df.queryExecution.executedPlan.collect {
573602
case f: FileSourceScanLike => f.dataFilters
574603
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -2003,7 +2032,7 @@ index 07e2849ce6f..3e73645b638 100644
20032032
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
20042033
)
20052034
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
2006-
index 104b4e416cd..d865077684f 100644
2035+
index 104b4e416cd..b8af360fa14 100644
20072036
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
20082037
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
20092038
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
@@ -2083,17 +2112,32 @@ index 104b4e416cd..d865077684f 100644
20832112
val schema = StructType(Seq(
20842113
StructField("a", IntegerType, nullable = false)
20852114
))
2086-
@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2087-
}
2088-
}
2115+
@@ -1950,11 +1966,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2116+
""".stripMargin)
2117+
2118+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
2119+
- val e = intercept[SparkException] {
2120+
+ // Spark native readers wrap the error in SparkException.
2121+
+ // Comet native readers throw RuntimeException directly.
2122+
+ val msg = try {
2123+
sql(s"select a from $tableName where b > 0").collect()
2124+
+ fail("Expected an exception")
2125+
+ } catch {
2126+
+ case e: SparkException =>
2127+
+ assert(e.getCause.isInstanceOf[RuntimeException])
2128+
+ e.getCause.getMessage
2129+
+ case e: RuntimeException =>
2130+
+ e.getMessage
2131+
}
2132+
- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
2133+
- """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
2134+
+ assert(msg.contains(
2135+
+ """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""),
2136+
+ s"Unexpected error message: $msg")
2137+
}
20892138

2090-
- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
2091-
+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
2092-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
2093-
withTempPath { dir =>
2094-
val count = 10
2095-
val tableName = "spark_25207"
2096-
@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2139+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
2140+
@@ -1985,7 +2011,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20972141
}
20982142
}
20992143

@@ -2103,7 +2147,7 @@ index 104b4e416cd..d865077684f 100644
21032147
// block 1:
21042148
// null count min max
21052149
// page-0 0 0 99
2106-
@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2150+
@@ -2045,7 +2072,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
21072151
}
21082152
}
21092153

@@ -2113,7 +2157,7 @@ index 104b4e416cd..d865077684f 100644
21132157
withTempPath { dir =>
21142158
val path = dir.getCanonicalPath
21152159
spark.range(100).selectExpr("id * 2 AS id")
2116-
@@ -2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
2160+
@@ -2277,7 +2305,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
21172161
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21182162
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21192163

@@ -2126,7 +2170,7 @@ index 104b4e416cd..d865077684f 100644
21262170
} else {
21272171
assert(selectedFilters.isEmpty, "There is filter pushed down")
21282172
}
2129-
@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
2173+
@@ -2337,7 +2369,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
21302174
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21312175
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21322176

dev/diffs/3.5.8.diff

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ index a206e97c353..fea1149b67d 100644
494494

495495
test("SPARK-35884: Explain Formatted") {
496496
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
497-
index 93275487f29..601cb6647fe 100644
497+
index 93275487f29..77a27d1c40a 100644
498498
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
499499
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
500500
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
@@ -522,41 +522,64 @@ index 93275487f29..601cb6647fe 100644
522522
checkErrorMatchPVals(
523523
exception = intercept[SparkException] {
524524
testIgnoreMissingFiles(options)
525-
@@ -639,7 +643,8 @@ class FileBasedDataSourceSuite extends QueryTest
526-
}
527-
528-
Seq("parquet", "orc").foreach { format =>
529-
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
530-
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
531-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
532-
withTempDir { dir =>
533-
val tableName = s"spark_25132_${format}_native"
534-
val tableDir = dir.getCanonicalPath + s"/$tableName"
535-
@@ -955,6 +960,7 @@ class FileBasedDataSourceSuite extends QueryTest
525+
@@ -656,18 +660,25 @@ class FileBasedDataSourceSuite extends QueryTest
526+
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
527+
528+
// RuntimeException is triggered at executor side, which is then wrapped as
529+
- // SparkException at driver side
530+
+ // SparkException at driver side. Comet native readers throw
531+
+ // SparkRuntimeException directly without the SparkException wrapper.
532+
+ def getDuplicateFieldError(query: String): SparkRuntimeException = {
533+
+ try {
534+
+ sql(query).collect()
535+
+ fail("Expected an exception").asInstanceOf[SparkRuntimeException]
536+
+ } catch {
537+
+ case e: SparkException =>
538+
+ e.getCause.asInstanceOf[SparkRuntimeException]
539+
+ case e: SparkRuntimeException => e
540+
+ }
541+
+ }
542+
checkError(
543+
- exception = intercept[SparkException] {
544+
- sql(s"select b from $tableName").collect()
545+
- }.getCause.asInstanceOf[SparkRuntimeException],
546+
+ exception = getDuplicateFieldError(s"select b from $tableName"),
547+
errorClass = "_LEGACY_ERROR_TEMP_2093",
548+
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
549+
)
550+
checkError(
551+
- exception = intercept[SparkException] {
552+
- sql(s"select B from $tableName").collect()
553+
- }.getCause.asInstanceOf[SparkRuntimeException],
554+
+ exception = getDuplicateFieldError(s"select B from $tableName"),
555+
errorClass = "_LEGACY_ERROR_TEMP_2093",
556+
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
557+
)
558+
@@ -955,6 +966,7 @@ class FileBasedDataSourceSuite extends QueryTest
536559
assert(bJoinExec.isEmpty)
537560
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
538561
case smJoin: SortMergeJoinExec => smJoin
539562
+ case smJoin: CometSortMergeJoinExec => smJoin
540563
}
541564
assert(smJoinExec.nonEmpty)
542565
}
543-
@@ -1015,6 +1021,7 @@ class FileBasedDataSourceSuite extends QueryTest
566+
@@ -1015,6 +1027,7 @@ class FileBasedDataSourceSuite extends QueryTest
544567

545568
val fileScan = df.queryExecution.executedPlan collectFirst {
546569
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
547570
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
548571
}
549572
assert(fileScan.nonEmpty)
550573
assert(fileScan.get.partitionFilters.nonEmpty)
551-
@@ -1056,6 +1063,7 @@ class FileBasedDataSourceSuite extends QueryTest
574+
@@ -1056,6 +1069,7 @@ class FileBasedDataSourceSuite extends QueryTest
552575

553576
val fileScan = df.queryExecution.executedPlan collectFirst {
554577
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
555578
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
556579
}
557580
assert(fileScan.nonEmpty)
558581
assert(fileScan.get.partitionFilters.isEmpty)
559-
@@ -1240,6 +1248,9 @@ class FileBasedDataSourceSuite extends QueryTest
582+
@@ -1240,6 +1254,9 @@ class FileBasedDataSourceSuite extends QueryTest
560583
val filters = df.queryExecution.executedPlan.collect {
561584
case f: FileSourceScanLike => f.dataFilters
562585
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -1959,7 +1982,7 @@ index 07e2849ce6f..3e73645b638 100644
19591982
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
19601983
)
19611984
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
1962-
index 8e88049f51e..f19c12c98e6 100644
1985+
index 8e88049f51e..f9d515edee1 100644
19631986
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19641987
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19651988
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2030,17 +2053,25 @@ index 8e88049f51e..f19c12c98e6 100644
20302053
val schema = StructType(Seq(
20312054
StructField("a", IntegerType, nullable = false)
20322055
))
2033-
@@ -1952,8 +1966,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2034-
val e = intercept[SparkException] {
2056+
@@ -1949,11 +1965,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2057+
""".stripMargin)
2058+
2059+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
2060+
- val e = intercept[SparkException] {
2061+
+ // Spark native readers wrap the error in SparkException(FAILED_READ_FILE).
2062+
+ // Comet native readers throw SparkRuntimeException directly.
2063+
+ val msg = try {
20352064
sql(s"select a from $tableName where b > 0").collect()
2065+
+ fail("Expected an exception")
2066+
+ } catch {
2067+
+ case e: SparkException =>
2068+
+ assert(e.getCause.isInstanceOf[RuntimeException])
2069+
+ e.getCause.getMessage
2070+
+ case e: RuntimeException =>
2071+
+ e.getMessage
20362072
}
20372073
- assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
20382074
- """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
2039-
+ assert(e.getCause.isInstanceOf[RuntimeException])
2040-
+ val msg = e.getCause.getMessage
2041-
+ // native_datafusion converts DataFusion's "Unable to get field named" error
2042-
+ // to _LEGACY_ERROR_TEMP_2093 but with a lowercase field name ("b" vs "B")
2043-
+ // because DataFusion resolves field names case-insensitively
20442075
+ assert(
20452076
+ msg.contains(
20462077
+ """Found duplicate field(s) "B": [B, b] in case-insensitive mode""") ||
@@ -2050,7 +2081,7 @@ index 8e88049f51e..f19c12c98e6 100644
20502081
}
20512082

20522083
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
2053-
@@ -1984,7 +2007,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2084+
@@ -1984,7 +2013,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20542085
}
20552086
}
20562087

@@ -2060,7 +2091,7 @@ index 8e88049f51e..f19c12c98e6 100644
20602091
// block 1:
20612092
// null count min max
20622093
// page-0 0 0 99
2063-
@@ -2044,7 +2068,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2094+
@@ -2044,7 +2074,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20642095
}
20652096
}
20662097

@@ -2070,7 +2101,7 @@ index 8e88049f51e..f19c12c98e6 100644
20702101
withTempPath { dir =>
20712102
val path = dir.getCanonicalPath
20722103
spark.range(100).selectExpr("id * 2 AS id")
2073-
@@ -2276,7 +2301,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
2104+
@@ -2276,7 +2307,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
20742105
assert(pushedParquetFilters.exists(_.getClass === filterClass),
20752106
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
20762107

@@ -2083,7 +2114,7 @@ index 8e88049f51e..f19c12c98e6 100644
20832114
} else {
20842115
assert(selectedFilters.isEmpty, "There is filter pushed down")
20852116
}
2086-
@@ -2336,7 +2365,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
2117+
@@ -2336,7 +2371,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
20872118
assert(pushedParquetFilters.exists(_.getClass === filterClass),
20882119
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
20892120

0 commit comments

Comments
 (0)