Skip to content

Commit 3360ca5

Browse files
Merge branch 'apache:main' into main
2 parents 498c989 + 941c300 commit 3360ca5

File tree

16 files changed

+1055
-203
lines changed

16 files changed

+1055
-203
lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ jobs:
129129
org.apache.comet.exec.CometAggregateSuite
130130
org.apache.comet.exec.CometExec3_4PlusSuite
131131
org.apache.comet.exec.CometExecSuite
132+
org.apache.comet.exec.CometWindowExecSuite
132133
org.apache.comet.exec.CometJoinSuite
133134
org.apache.comet.CometArrayExpressionSuite
134135
org.apache.comet.CometCastSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ jobs:
9494
org.apache.comet.exec.CometAggregateSuite
9595
org.apache.comet.exec.CometExec3_4PlusSuite
9696
org.apache.comet.exec.CometExecSuite
97+
org.apache.comet.exec.CometWindowExecSuite
9798
org.apache.comet.exec.CometJoinSuite
9899
org.apache.comet.CometArrayExpressionSuite
99100
org.apache.comet.CometCastSuite

docs/source/contributor-guide/spark-sql-tests.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ git apply ../datafusion-comet/dev/diffs/3.4.3.diff
5656

5757
### Use the following commands to run the Spark SQL test suite locally.
5858

59+
Optionally, enable Comet fallback logging so that all fallback reasons are logged at the `WARN` level.
60+
61+
```shell
62+
export ENABLE_COMET_LOG_FALLBACK_REASONS=true
63+
```
64+
5965
```shell
6066
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt catalyst/test
6167
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"
@@ -68,7 +74,7 @@ ENABLE_COMET=true ENABLE_COMET_ONHEAP=true build/sbt "hive/testOnly * -- -n org.
6874
### Steps to run individual test suites through SBT
6975
1. Open SBT with Comet enabled
7076
```shell
71-
ENABLE_COMET=true sbt -J-Xmx4096m -Dspark.test.includeSlowTests=true
77+
ENABLE_COMET=true ENABLE_COMET_ONHEAP=true sbt -J-Xmx4096m -Dspark.test.includeSlowTests=true
7278
```
7379
2. Run individual tests (Below code runs test named `SPARK-35568` in the `spark-sql` module)
7480
```shell

docs/source/user-guide/latest/compatibility.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ and sorting on floating-point data can be enabled by setting `spark.comet.expres
5656
## Incompatible Expressions
5757

5858
Expressions that are not 100% Spark-compatible will fall back to Spark by default and can be enabled by setting
59-
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark expression class name. See
60-
the [Comet Supported Expressions Guide](expressions.md) for more information on this configuration setting.
59+
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark expression class name. See
60+
the [Comet Supported Expressions Guide](expressions.md) for more information on this configuration setting.
6161

6262
It is also possible to specify `spark.comet.expression.allowIncompatible=true` to enable all
6363
incompatible expressions.

fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments)
3131
opt[String](required = true, descr = "Folder with Spark produced results in Parquet format")
3232
val inputCometFolder: ScallopOption[String] =
3333
opt[String](required = true, descr = "Folder with Comet produced results in Parquet format")
34+
val tolerance: ScallopOption[Double] =
35+
opt[Double](default = Some(0.000002), descr = "Tolerance for floating point comparisons")
3436
}
3537
addSubcommand(compareParquet)
3638
verify()
@@ -49,7 +51,8 @@ object ComparisonTool {
4951
compareParquetFolders(
5052
spark,
5153
conf.compareParquet.inputSparkFolder(),
52-
conf.compareParquet.inputCometFolder())
54+
conf.compareParquet.inputCometFolder(),
55+
conf.compareParquet.tolerance())
5356

5457
case _ =>
5558
// scalastyle:off println
@@ -62,7 +65,8 @@ object ComparisonTool {
6265
private def compareParquetFolders(
6366
spark: SparkSession,
6467
sparkFolderPath: String,
65-
cometFolderPath: String): Unit = {
68+
cometFolderPath: String,
69+
tolerance: Double): Unit = {
6670

6771
val output = QueryRunner.createOutputMdFile()
6872

@@ -115,7 +119,7 @@ object ComparisonTool {
115119
val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect()
116120

117121
// Compare the results
118-
if (QueryComparison.assertSameRows(sparkRows, cometRows, output)) {
122+
if (QueryComparison.assertSameRows(sparkRows, cometRows, output, tolerance)) {
119123
output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
120124
} else {
121125
// Output schema if dataframes are not equal

fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ object QueryComparison {
148148
def assertSameRows(
149149
sparkRows: Array[Row],
150150
cometRows: Array[Row],
151-
output: BufferedWriter): Boolean = {
151+
output: BufferedWriter,
152+
tolerance: Double = 0.000001): Boolean = {
152153
if (sparkRows.length == cometRows.length) {
153154
var i = 0
154155
while (i < sparkRows.length) {
@@ -164,7 +165,7 @@ object QueryComparison {
164165

165166
assert(l.length == r.length)
166167
for (j <- 0 until l.length) {
167-
if (!same(l(j), r(j))) {
168+
if (!same(l(j), r(j), tolerance)) {
168169
output.write(s"First difference at row $i:\n")
169170
output.write("Spark: `" + formatRow(l) + "`\n")
170171
output.write("Comet: `" + formatRow(r) + "`\n")
@@ -186,7 +187,7 @@ object QueryComparison {
186187
true
187188
}
188189

189-
private def same(l: Any, r: Any): Boolean = {
190+
private def same(l: Any, r: Any, tolerance: Double): Boolean = {
190191
if (l == null || r == null) {
191192
return l == null && r == null
192193
}
@@ -195,20 +196,20 @@ object QueryComparison {
195196
case (a: Float, b: Float) if a.isNegInfinity => b.isNegInfinity
196197
case (a: Float, b: Float) if a.isInfinity => b.isInfinity
197198
case (a: Float, b: Float) if a.isNaN => b.isNaN
198-
case (a: Float, b: Float) => (a - b).abs <= 0.000001f
199+
case (a: Float, b: Float) => (a - b).abs <= tolerance
199200
case (a: Double, b: Double) if a.isPosInfinity => b.isPosInfinity
200201
case (a: Double, b: Double) if a.isNegInfinity => b.isNegInfinity
201202
case (a: Double, b: Double) if a.isInfinity => b.isInfinity
202203
case (a: Double, b: Double) if a.isNaN => b.isNaN
203-
case (a: Double, b: Double) => (a - b).abs <= 0.000001
204+
case (a: Double, b: Double) => (a - b).abs <= tolerance
204205
case (a: Array[_], b: Array[_]) =>
205-
a.length == b.length && a.zip(b).forall(x => same(x._1, x._2))
206+
a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance))
206207
case (a: mutable.WrappedArray[_], b: mutable.WrappedArray[_]) =>
207-
a.length == b.length && a.zip(b).forall(x => same(x._1, x._2))
208+
a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance))
208209
case (a: Row, b: Row) =>
209210
val aa = a.toSeq
210211
val bb = b.toSeq
211-
aa.length == bb.length && aa.zip(bb).forall(x => same(x._1, x._2))
212+
aa.length == bb.length && aa.zip(bb).forall(x => same(x._1, x._2, tolerance))
212213
case (a, b) => a == b
213214
}
214215
}

spark/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ under the License.
326326
</goals>
327327
<configuration>
328328
<mainClass>org.apache.comet.GenerateDocs</mainClass>
329+
<arguments>${project.parent.basedir}/docs/source/user-guide/latest/</arguments>
329330
<classpathScope>compile</classpathScope>
330331
</configuration>
331332
</execution>

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,16 @@ import org.apache.comet.serde.{Compatible, Incompatible, QueryPlanSerde}
3636
*/
3737
object GenerateDocs {
3838

39-
private def userGuideLocation = "docs/source/user-guide/latest/"
40-
41-
val publicConfigs: Set[ConfigEntry[_]] = CometConf.allConfs.filter(_.isPublic).toSet
39+
private val publicConfigs: Set[ConfigEntry[_]] = CometConf.allConfs.filter(_.isPublic).toSet
4240

4341
def main(args: Array[String]): Unit = {
44-
generateConfigReference()
45-
generateCompatibilityGuide()
42+
val userGuideLocation = args(0)
43+
generateConfigReference(s"$userGuideLocation/configs.md")
44+
generateCompatibilityGuide(s"$userGuideLocation/compatibility.md")
4645
}
4746

48-
private def generateConfigReference(): Unit = {
47+
private def generateConfigReference(filename: String): Unit = {
4948
val pattern = "<!--BEGIN:CONFIG_TABLE\\[(.*)]-->".r
50-
val filename = s"$userGuideLocation/configs.md"
5149
val lines = readFile(filename)
5250
val w = new BufferedOutputStream(new FileOutputStream(filename))
5351
for (line <- lines) {
@@ -95,8 +93,7 @@ object GenerateDocs {
9593
w.close()
9694
}
9795

98-
private def generateCompatibilityGuide(): Unit = {
99-
val filename = s"$userGuideLocation/compatibility.md"
96+
private def generateCompatibilityGuide(filename: String): Unit = {
10097
val lines = readFile(filename)
10198
val w = new BufferedOutputStream(new FileOutputStream(filename))
10299
for (line <- lines) {

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
887887
var supported = true
888888
for (o <- orderings) {
889889
if (QueryPlanSerde.exprToProto(o, inputs).isEmpty) {
890-
withInfo(s, s"unsupported range partitioning sort order: $o")
890+
withInfo(s, s"unsupported range partitioning sort order: $o", o)
891891
supported = false
892892
// We don't short-circuit in case there is more than one unsupported expression
893893
// to provide info for.

spark/src/main/scala/org/apache/comet/serde/literals.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.lang
2323

2424
import org.apache.spark.internal.Logging
2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
26-
import org.apache.spark.sql.catalyst.util.GenericArrayData
26+
import org.apache.spark.sql.catalyst.util.ArrayData
2727
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampNTZType, TimestampType}
2828
import org.apache.spark.unsafe.types.UTF8String
2929

@@ -92,7 +92,7 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
9292

9393
case arr: ArrayType =>
9494
val listLiteralBuilder: ListLiteral.Builder =
95-
makeListLiteral(value.asInstanceOf[GenericArrayData].array, arr)
95+
makeListLiteral(value.asInstanceOf[ArrayData].array, arr)
9696
exprBuilder.setListVal(listLiteralBuilder.build())
9797
exprBuilder.setDatatype(serializeDataType(dataType).get)
9898
case dt =>
@@ -198,7 +198,7 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
198198
})
199199
case a: ArrayType =>
200200
array.foreach(v => {
201-
val casted = v.asInstanceOf[GenericArrayData]
201+
val casted = v.asInstanceOf[ArrayData]
202202
listLiteralBuilder.addListValues(if (casted != null) {
203203
makeListLiteral(casted.array, a)
204204
} else ListLiteral.newBuilder())

0 commit comments

Comments
 (0)