diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
new file mode 100644
index 000000000000..950c2269d7b7
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaExcludedTestMixin, DeltaSQLCommandTest}
+
+import org.scalatest.Ignore
+
+// spotless:off
+class DeleteSQLSuite extends DeleteSuiteBase
+  with DeltaExcludedTestMixin
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  override protected def executeDelete(target: String, where: String = null): Unit = {
+    val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("")
+    sql(s"DELETE FROM $target $whereClause")
+  }
+
+  override def excluded: Seq[String] = super.excluded ++
+    Seq(
+      // FIXME: Excluded by Gluten as results mismatch.
+      "test delete on temp view - nontrivial projection - SQL TempView",
+      "test delete on temp view - nontrivial projection - Dataset TempView"
+    )
+
+  // For EXPLAIN, which is not supported in OSS
+  test("explain") {
+    append(Seq((2, 2)).toDF("key", "value"))
+    val df = sql(s"EXPLAIN DELETE FROM delta.`$tempPath` WHERE key = 2")
+    val outputs = df.collect().map(_.mkString).mkString
+    assert(outputs.contains("Delta"))
+    assert(!outputs.contains("index") && !outputs.contains("ActionLog"))
+    // no change should be made by explain
+    checkAnswer(readDeltaTable(tempPath), Row(2, 2))
+  }
+
+  test("delete from a temp view") {
+    withTable("tab") {
+      withTempView("v") {
+        Seq((1, 1), (0, 3), (1, 5)).toDF("key", "value").write.format("delta").saveAsTable("tab")
+        spark.table("tab").as("name").createTempView("v")
+        sql("DELETE FROM v WHERE key = 1")
+        checkAnswer(spark.table("tab"), Row(0, 3))
+      }
+    }
+  }
+
+  test("delete from a SQL temp view") {
+    withTable("tab") {
+      withTempView("v") {
+        Seq((1, 1), (0, 3), (1, 5)).toDF("key", "value").write.format("delta").saveAsTable("tab")
+        sql("CREATE TEMP VIEW v AS SELECT * FROM tab")
+        sql("DELETE FROM v WHERE key = 1 AND VALUE = 5")
+        checkAnswer(spark.table("tab"), Seq(Row(1, 1), Row(0, 3)))
+      }
+    }
+  }
+
+  Seq(true, false).foreach { partitioned =>
+    test(s"User defined _change_type column doesn't get dropped - partitioned=$partitioned") {
+      withTable("tab") {
+        sql(
+          s"""CREATE TABLE tab USING DELTA
+             |${if (partitioned) "PARTITIONED BY (part) " else ""}
+             |TBLPROPERTIES (delta.enableChangeDataFeed = false)
+             |AS SELECT id, int(id / 10) AS part, 'foo' as _change_type
+             |FROM RANGE(1000)
+             |""".stripMargin)
+        val rowsToDelete = (1 to 1000 by 42).mkString("(", ", ", ")")
+        executeDelete("tab", s"id in $rowsToDelete")
+        sql("SELECT id, _change_type FROM tab").collect().foreach { row =>
+          val _change_type = row.getString(1)
+          assert(_change_type === "foo", s"Invalid _change_type for id=${row.get(0)}")
+        }
+      }
+    }
+  }
+}
+
+// FIXME: Enable the test.
+// Skipping as function input_file_name doesn't get correctly resolved.
+@Ignore
+class DeleteSQLNameColumnMappingSuite extends DeleteSQLSuite
+  with DeltaColumnMappingEnableNameMode {
+
+  protected override def runOnlyTests: Seq[String] = Seq(true, false).map { isPartitioned =>
+    s"basic case - delete from a Delta table by name - Partition=$isPartitioned"
+  } ++ Seq(true, false).flatMap { isPartitioned =>
+    Seq(
+      s"where key columns - Partition=$isPartitioned",
+      s"where data columns - Partition=$isPartitioned")
+  }
+
+}
+
+class DeleteSQLWithDeletionVectorsSuite extends DeleteSQLSuite
+  with DeltaExcludedTestMixin
+  with DeletionVectorsTestUtils {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    enableDeletionVectors(spark, delete = true)
+    spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "false")
+  }
+
+  override def excluded: Seq[String] = super.excluded ++
+    Seq(
+      // The following two tests must fail when DV is used. Covered by another test case:
+      // "throw error when non-pinned TahoeFileIndex snapshot is used".
+      "data and partition columns - Partition=true Skipping=false",
+      "data and partition columns - Partition=false Skipping=false",
+      // The scan schema contains additional row index filter columns.
+      "nested schema pruning on data condition",
+      // The number of records is not recomputed when using DVs
+      "delete throws error if number of records increases",
+      "delete logs error if number of records are missing in stats",
+      // FIXME: Excluded by Gluten as results mismatch.
+      "test delete on temp view - nontrivial projection - SQL TempView",
+      "test delete on temp view - nontrivial projection - Dataset TempView"
+    )
+
+  // This works correctly with DVs, but fails in classic DELETE.
+  override def testSuperSetColsTempView(): Unit = {
+    testComplexTempViews("superset cols")(
+      text = "SELECT key, value, 1 FROM tab",
+      expectResult = Row(0, 3, 1) :: Nil)
+  }
+}
+
+class DeleteSQLWithDeletionVectorsAndPredicatePushdownSuite
+  extends DeleteSQLWithDeletionVectorsSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "true")
+  }
+}
+// spotless:on
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
new file mode 100644
index 000000000000..8ab9510ff31f
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.sql.delta + +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.functions.{lit, struct} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType + +import shims.DeltaExcludedBySparkVersionTestMixinShims + +// spotless:off +abstract class DeleteSuiteBase extends QueryTest + with SharedSparkSession + with DeltaDMLTestUtils + with DeltaTestUtilsForTempViews + with DeltaExcludedBySparkVersionTestMixinShims { + + import testImplicits._ + + protected def executeDelete(target: String, where: String = null): Unit + + protected def checkDelete( + condition: Option[String], + expectedResults: Seq[Row], + tableName: Option[String] = None): Unit = { + executeDelete(target = tableName.getOrElse(s"delta.`$tempPath`"), where = condition.orNull) + checkAnswer(readDeltaTable(tempPath), expectedResults) + } + + Seq(true, false).foreach { isPartitioned => + test(s"basic case - Partition=$isPartitioned") { + val partitions = if (isPartitioned) "key" :: Nil else Nil + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions) + + checkDelete(condition = None, Nil) + } + } + + Seq(true, false).foreach { isPartitioned => + test(s"basic case - delete from a Delta table by path - Partition=$isPartitioned") { + withTable("deltaTable") { + val partitions = if (isPartitioned) "key" :: Nil else Nil + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + append(input, partitions) + + checkDelete(Some("value = 4 and key = 3"), + Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 4 and key = 1"), + Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 2 or key = 1"), + Row(0, 3) :: Nil) + checkDelete(Some("key = 0 or value = 99"), Nil) + } + } + } + + Seq(true, false).foreach { isPartitioned => + test(s"basic case - delete from a Delta table by name - Partition=$isPartitioned") { + withTable("delta_table") { + val partitionByClause = if (isPartitioned) "PARTITIONED BY (key)" else "" + sql( + s""" + |CREATE TABLE delta_table(key INT, value INT) + |USING delta + |OPTIONS('path'='$tempPath') + |$partitionByClause + """.stripMargin) + + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + append(input) + + checkDelete(Some("value = 4 and key = 3"), + Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil, + Some("delta_table")) + checkDelete(Some("value = 4 and key = 1"), + Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil, + Some("delta_table")) + checkDelete(Some("value = 2 or key = 1"), + Row(0, 3) :: Nil, + Some("delta_table")) + checkDelete(Some("key = 0 or value = 99"), + Nil, + Some("delta_table")) + } + } + } + + Seq(true, false).foreach { isPartitioned => + test(s"basic key columns - Partition=$isPartitioned") { + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + val partitions = if (isPartitioned) "key" :: Nil else Nil + append(input, partitions) + + checkDelete(Some("key > 2"), Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("key < 2"), Row(2, 2) :: Nil) + checkDelete(Some("key = 2"), Nil) + } + } + + Seq(true, false).foreach { isPartitioned => + test(s"where key columns - Partition=$isPartitioned") { + val partitions = if (isPartitioned) "key" :: Nil else Nil + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), 
partitions) + + checkDelete(Some("key = 1"), Row(2, 2) :: Row(0, 3) :: Nil) + checkDelete(Some("key = 2"), Row(0, 3) :: Nil) + checkDelete(Some("key = 0"), Nil) + } + } + + Seq(true, false).foreach { isPartitioned => + test(s"where data columns - Partition=$isPartitioned") { + val partitions = if (isPartitioned) "key" :: Nil else Nil + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions) + + checkDelete(Some("value <= 2"), Row(1, 4) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 3"), Row(1, 4) :: Nil) + checkDelete(Some("value != 0"), Nil) + } + } + + test("where data columns and partition columns") { + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + append(input, Seq("key")) + + checkDelete(Some("value = 4 and key = 3"), + Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 4 and key = 1"), + Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 2 or key = 1"), + Row(0, 3) :: Nil) + checkDelete(Some("key = 0 or value = 99"), + Nil) + } + + Seq(true, false).foreach { skippingEnabled => + Seq(true, false).foreach { isPartitioned => + test(s"data and partition columns - Partition=$isPartitioned Skipping=$skippingEnabled") { + withSQLConf(DeltaSQLConf.DELTA_STATS_SKIPPING.key -> skippingEnabled.toString) { + val partitions = if (isPartitioned) "key" :: Nil else Nil + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + append(input, partitions) + + checkDelete(Some("value = 4 and key = 3"), + Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 4 and key = 1"), + Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil) + checkDelete(Some("value = 2 or key = 1"), + Row(0, 3) :: Nil) + checkDelete(Some("key = 0 or value = 99"), + Nil) + } + } + } + } + + test("Negative case - non-Delta target") { + Seq((1, 1), (0, 3), (1, 5)).toDF("key1", "value") + .write.format("parquet").mode("append").save(tempPath) + val e = intercept[DeltaAnalysisException] { + executeDelete(target = s"delta.`$tempPath`") + }.getMessage + assert(e.contains("DELETE destination only supports Delta sources") || + e.contains("is not a Delta table") || e.contains("doesn't exist") || + e.contains("Incompatible format")) + } + + test("Negative case - non-deterministic condition") { + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")) + val e = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath`", where = "rand() > 0.5") + }.getMessage + assert(e.contains("nondeterministic expressions are only allowed in") || + e.contains("The operator expects a deterministic expression")) + } + + test("Negative case - DELETE the child directory") { + append(Seq((2, 2), (3, 2)).toDF("key", "value"), partitionBy = "key" :: Nil) + val e = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath/key=2`", where = "value = 2") + }.getMessage + assert(e.contains("Expect a full scan of Delta sources, but found a partial scan")) + } + + test("delete cached table by name") { + withTable("cached_delta_table") { + Seq((2, 2), (1, 4)).toDF("key", "value") + .write.format("delta").saveAsTable("cached_delta_table") + + spark.table("cached_delta_table").cache() + spark.table("cached_delta_table").collect() + executeDelete(target = "cached_delta_table", where = "key = 2") + checkAnswer(spark.table("cached_delta_table"), Row(1, 4) :: Nil) + } + } + + test("delete cached table by path") { + Seq((2, 2), (1, 4)).toDF("key", "value") + 
.write.mode("overwrite").format("delta").save(tempPath) + spark.read.format("delta").load(tempPath).cache() + spark.read.format("delta").load(tempPath).collect() + executeDelete(s"delta.`$tempPath`", where = "key = 2") + checkAnswer(spark.read.format("delta").load(tempPath), Row(1, 4) :: Nil) + } + + Seq(true, false).foreach { isPartitioned => + test(s"condition having current_date - Partition=$isPartitioned") { + val partitions = if (isPartitioned) "key" :: Nil else Nil + append( + Seq((java.sql.Date.valueOf("1969-12-31"), 2), + (java.sql.Date.valueOf("2099-12-31"), 4)) + .toDF("key", "value"), partitions) + + checkDelete(Some("CURRENT_DATE > key"), + Row(java.sql.Date.valueOf("2099-12-31"), 4) :: Nil) + checkDelete(Some("CURRENT_DATE <= key"), Nil) + } + } + + test("condition having current_timestamp - Partition by Timestamp") { + append( + Seq((java.sql.Timestamp.valueOf("2012-12-31 16:00:10.011"), 2), + (java.sql.Timestamp.valueOf("2099-12-31 16:00:10.011"), 4)) + .toDF("key", "value"), Seq("key")) + + checkDelete(Some("CURRENT_TIMESTAMP > key"), + Row(java.sql.Timestamp.valueOf("2099-12-31 16:00:10.011"), 4) :: Nil) + checkDelete(Some("CURRENT_TIMESTAMP <= key"), Nil) + } + + Seq(true, false).foreach { isPartitioned => + test(s"foldable condition - Partition=$isPartitioned") { + val partitions = if (isPartitioned) "key" :: Nil else Nil + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions) + + val allRows = Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil + + checkDelete(Some("false"), allRows) + checkDelete(Some("1 <> 1"), allRows) + checkDelete(Some("1 > null"), allRows) + checkDelete(Some("true"), Nil) + checkDelete(Some("1 = 1"), Nil) + } + } + + test("SC-12232: should not delete the rows where condition evaluates to null") { + append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key", "value").coalesce(1)) + + // "null = null" evaluates to null + checkDelete(Some("value = null"), + Row("a", null) :: Row("b", null) :: Row("c", "v") :: Row("d", "vv") :: Nil) + + // these expressions evaluate to null when value is null + checkDelete(Some("value = 'v'"), + Row("a", null) :: Row("b", null) :: Row("d", "vv") :: Nil) + checkDelete(Some("value <> 'v'"), + Row("a", null) :: Row("b", null) :: Nil) + } + + test("SC-12232: delete rows with null values using isNull") { + append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key", "value").coalesce(1)) + + // when value is null, this expression evaluates to true + checkDelete(Some("value is null"), + Row("c", "v") :: Row("d", "vv") :: Nil) + } + + test("SC-12232: delete rows with null values using EqualNullSafe") { + append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key", "value").coalesce(1)) + + // when value is null, this expression evaluates to true + checkDelete(Some("value <=> null"), + Row("c", "v") :: Row("d", "vv") :: Nil) + } + + test("do not support subquery test") { + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")) + Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("c", "d").createOrReplaceTempView("source") + + // basic subquery + val e0 = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath`", "key < (SELECT max(c) FROM source)") + }.getMessage + assert(e0.contains("Subqueries are not supported")) + + // subquery with EXISTS + val e1 = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath`", "EXISTS (SELECT max(c) FROM source)") + }.getMessage + assert(e1.contains("Subqueries are not supported")) + + 
// subquery with NOT EXISTS + val e2 = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath`", "NOT EXISTS (SELECT max(c) FROM source)") + }.getMessage + assert(e2.contains("Subqueries are not supported")) + + // subquery with IN + val e3 = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath`", "key IN (SELECT max(c) FROM source)") + }.getMessage + assert(e3.contains("Subqueries are not supported")) + + // subquery with NOT IN + val e4 = intercept[AnalysisException] { + executeDelete(target = s"delta.`$tempPath`", "key NOT IN (SELECT max(c) FROM source)") + }.getMessage + assert(e4.contains("Subqueries are not supported")) + } + + test("schema pruning on data condition") { + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + append(input, Nil) + // Start from a cached snapshot state + deltaLog.update().stateDF + + val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) { + checkDelete(Some("key = 2"), + Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil) + } + + val scans = executedPlans.flatMap(_.collect { + case f: FileSourceScanExec => f + }) + + // The first scan is for finding files to delete. We only are matching against the key + // so that should be the only field in the schema + assert(scans.head.schema.findNestedField(Seq("key")).nonEmpty) + assert(scans.head.schema.findNestedField(Seq("value")).isEmpty) + } + + + test("nested schema pruning on data condition") { + val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value") + .select(struct("key", "value").alias("nested")) + append(input, Nil) + // Start from a cached snapshot state + deltaLog.update().stateDF + + val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) { + checkDelete(Some("nested.key = 2"), + Row(Row(1, 4)) :: Row(Row(1, 1)) :: Row(Row(0, 3)) :: Nil) + } + + val scans = executedPlans.flatMap(_.collect { + case f: FileSourceScanExec => f + }) + + assert(scans.head.schema == StructType.fromDDL("nested STRUCT")) + } + + /** + * @param function the unsupported function. + * @param functionType The type of the unsupported expression to be tested. + * @param data the data in the table. + * @param where the where clause containing the unsupported expression. + * @param expectException whether an exception is expected to be thrown + * @param customErrorRegex customized error regex. 
+ */ + def testUnsupportedExpression( + function: String, + functionType: String, + data: => DataFrame, + where: String, + expectException: Boolean, + customErrorRegex: Option[String] = None): Unit = { + test(s"$functionType functions in delete - expect exception: $expectException") { + withTable("deltaTable") { + data.write.format("delta").saveAsTable("deltaTable") + + val expectedErrorRegex = "(?s).*(?i)unsupported.*(?i).*Invalid expressions.*" + + var catchException = true + + var errorRegex = if (functionType.equals("Generate")) { + ".*Subqueries are not supported in the DELETE.*" + } else customErrorRegex.getOrElse(expectedErrorRegex) + + + if (catchException) { + val dataBeforeException = spark.read.format("delta").table("deltaTable").collect() + val e = intercept[Exception] { + executeDelete(target = "deltaTable", where = where) + } + val message = if (e.getCause != null) { + e.getCause.getMessage + } else e.getMessage + assert(message.matches(errorRegex)) + checkAnswer(spark.read.format("delta").table("deltaTable"), dataBeforeException) + } else { + executeDelete(target = "deltaTable", where = where) + } + } + } + } + + testUnsupportedExpression( + function = "row_number", + functionType = "Window", + data = Seq((2, 2), (1, 4)).toDF("key", "value"), + where = "row_number() over (order by value) > 1", + expectException = true + ) + + testUnsupportedExpression( + function = "max", + functionType = "Aggregate", + data = Seq((2, 2), (1, 4)).toDF("key", "value"), + where = "key > max(value)", + expectException = true + ) + + // Explode functions are supported in where if only one row generated. + testUnsupportedExpression( + function = "explode", + functionType = "Generate", + data = Seq((2, List(2))).toDF("key", "value"), + where = "key = (select explode(value) from deltaTable)", + expectException = false // generate only one row, no exception. + ) + + // Explode functions are supported in where but if there's more than one row generated, + // it will throw an exception. + testUnsupportedExpression( + function = "explode", + functionType = "Generate", + data = Seq((2, List(2)), (1, List(4, 5))).toDF("key", "value"), + where = "key = (select explode(value) from deltaTable)", + expectException = true, // generate more than one row. Exception expected. 
+ customErrorRegex = + Some(".*More than one row returned by a subquery used as an expression(?s).*") + ) + + Seq(true, false).foreach { isPartitioned => + val name = s"test delete on temp view - basic - Partition=$isPartitioned" + testWithTempView(name) { isSQLTempView => + val partitions = if (isPartitioned) "key" :: Nil else Nil + append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions) + createTempViewFromTable(s"delta.`$tempPath`", isSQLTempView) + checkDelete( + condition = Some("key <= 1"), + expectedResults = Row(2, 2) :: Nil, + tableName = Some("v")) + } + } + + protected def testInvalidTempViews(name: String)( + text: String, + expectedErrorMsgForSQLTempView: String = null, + expectedErrorMsgForDataSetTempView: String = null, + expectedErrorClassForSQLTempView: String = null, + expectedErrorClassForDataSetTempView: String = null): Unit = { + testWithTempView(s"test delete on temp view - $name") { isSQLTempView => + withTable("tab") { + Seq((0, 3), (1, 2)).toDF("key", "value").write.format("delta").saveAsTable("tab") + if (isSQLTempView) { + sql(s"CREATE TEMP VIEW v AS $text") + } else { + sql(text).createOrReplaceTempView("v") + } + val ex = intercept[AnalysisException] { + executeDelete( + "v", + "key >= 1 and value < 3" + ) + } + testErrorMessageAndClass( + isSQLTempView, + ex, + expectedErrorMsgForSQLTempView, + expectedErrorMsgForDataSetTempView, + expectedErrorClassForSQLTempView, + expectedErrorClassForDataSetTempView) + } + } + } + testInvalidTempViews("subset cols")( + text = "SELECT key FROM tab", + expectedErrorClassForSQLTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + expectedErrorClassForDataSetTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION" + ) + + // Need to be able to override this, because it works in some configurations. + protected def testSuperSetColsTempView(): Unit = { + testInvalidTempViews("superset cols")( + text = "SELECT key, value, 1 FROM tab", + // The analyzer can't tell whether the table originally had the extra column or not. 
+ expectedErrorMsgForSQLTempView = "Can't resolve column 1 in root", + expectedErrorMsgForDataSetTempView = "Can't resolve column 1 in root" + ) + } + + testSuperSetColsTempView() + + protected def testComplexTempViews(name: String)( + text: String, + expectResult: Seq[Row]): Unit = { + testWithTempView(s"test delete on temp view - $name") { isSQLTempView => + withTable("tab") { + Seq((0, 3), (1, 2)).toDF("key", "value").write.format("delta").saveAsTable("tab") + createTempViewFromSelect(text, isSQLTempView) + executeDelete( + "v", + "key >= 1 and value < 3" + ) + checkAnswer(spark.read.format("delta").table("v"), expectResult) + } + } + } + + testComplexTempViews("nontrivial projection")( + text = "SELECT value as key, key as value FROM tab", + expectResult = Row(3, 0) :: Nil + ) + + testComplexTempViews("view with too many internal aliases")( + text = "SELECT * FROM (SELECT * FROM tab AS t1) AS t2", + expectResult = Row(0, 3) :: Nil + ) + + testSparkMasterOnly("Variant type") { + val dstDf = sql( + """SELECT parse_json(cast(id as string)) v, id i + FROM range(3)""") + append(dstDf) + + executeDelete(target = s"delta.`$tempPath`", where = "to_json(v) = '1'") + + checkAnswer(readDeltaTable(tempPath).selectExpr("i", "to_json(v)"), + Seq(Row(0, "0"), Row(2, "2"))) + } + + test("delete on partitioned table with special chars") { + val partValue = "part%one" + spark.range(0, 3, 1, 1).toDF("key").withColumn("value", lit(partValue)) + .write.format("delta").partitionBy("value").save(tempPath) + checkDelete( + condition = Some(s"value = '$partValue' and key = 1"), + expectedResults = Row(0, partValue) :: Row(2, partValue) :: Nil) + checkDelete( + condition = Some(s"value = '$partValue' and key = 2"), + expectedResults = Row(0, partValue) :: Nil) + checkDelete( + condition = Some(s"value = '$partValue'"), + expectedResults = Nil) + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala new file mode 100644 index 000000000000..5bb022c12d70 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.delta + +import org.apache.spark.sql.{DataFrame, QueryTest, RuntimeConfig, SparkSession} +import org.apache.spark.sql.delta.DeltaOperations.Truncate +import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, RemoveFile} +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore +import org.apache.spark.sql.delta.test.DeltaSQLTestUtils +import org.apache.spark.sql.delta.util.PathWithFileSystem +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.test.SharedSparkSession + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path + +import java.io.File +import java.util.UUID + +// spotless:off +/** Collection of test utilities related with persistent Deletion Vectors. */ +trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession with DeltaSQLTestUtils { + + def enableDeletionVectors( + spark: SparkSession, + delete: Boolean = false, + update: Boolean = false, + merge: Boolean = false): Unit = { + val global = delete || update || merge + spark.conf + .set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, global.toString) + spark.conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, delete.toString) + spark.conf.set(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key, update.toString) + spark.conf.set(DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key, merge.toString) + } + + def enableDeletionVectorsForAllSupportedOperations(spark: SparkSession): Unit = + enableDeletionVectors(spark, delete = true, update = true) + + def testWithDVs(testName: String, testTags: org.scalatest.Tag*)(thunk: => Unit): Unit = { + test(testName, testTags : _*) { + withDeletionVectorsEnabled() { + thunk + } + } + } + + /** Run a thunk with Deletion Vectors enabled/disabled. */ + def withDeletionVectorsEnabled(enabled: Boolean = true)(thunk: => Unit): Unit = { + val enabledStr = enabled.toString + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> enabledStr, + DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr, + DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr, + DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr) { + thunk + } + } + + /** Helper to run 'fn' with a temporary Delta table. */ + def withTempDeltaTable( + dataDF: DataFrame, + partitionBy: Seq[String] = Seq.empty, + enableDVs: Boolean = true, + conf: Seq[(String, String)] = Nil) + (fn: (() => io.delta.tables.DeltaTable, DeltaLog) => Unit): Unit = { + withTempPath { path => + val tablePath = new Path(path.getAbsolutePath) + withSQLConf(conf: _*) { + dataDF.write + .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, enableDVs.toString) + .partitionBy(partitionBy: _*) + .format("delta") + .save(tablePath.toString) + } + // DeltaTable hangs on to the DataFrame it is created with for the entire object lifetime. + // That means subsequent `targetTable.toDF` calls will return the same snapshot. + // The DV tests are generally written assuming `targetTable.toDF` would return a new snapshot. + // So create a function here instead of a n instance, so `targetTable().toDF` + // will actually provide a new snapshot. 
+ val targetTable = + () => io.delta.tables.DeltaTable.forPath(tablePath.toString) + val targetLog = DeltaLog.forTable(spark, tablePath) + fn(targetTable, targetLog) + } + } + + /** Create a temp path which contains special characters. */ + override def withTempPath(f: File => Unit): Unit = { + super.withTempPath(prefix = "s p a r k %2a")(f) + } + + /** Create a temp path which contains special characters. */ + override protected def withTempDir(f: File => Unit): Unit = { + super.withTempDir(prefix = "s p a r k %2a")(f) + } + + /** Helper that verifies whether a defined number of DVs exist */ + def verifyDVsExist(targetLog: DeltaLog, filesWithDVsSize: Int): Unit = { + val filesWithDVs = getFilesWithDeletionVectors(targetLog) + assert(filesWithDVs.size === filesWithDVsSize) + assertDeletionVectorsExist(targetLog, filesWithDVs) + } + + /** Returns all [[AddFile]] actions of a Delta table that contain Deletion Vectors. */ + def getFilesWithDeletionVectors(log: DeltaLog): Seq[AddFile] = + log.update().allFiles.collect().filter(_.deletionVector != null).toSeq + + /** Lists the Deletion Vectors files of a table. */ + def listDeletionVectors(log: DeltaLog): Seq[File] = { + val dir = new File(log.dataPath.toUri.getPath) + dir.listFiles().filter(_.getName.startsWith( + DeletionVectorDescriptor.DELETION_VECTOR_FILE_NAME_CORE)) + } + + /** Helper to check that the Deletion Vectors of the provided file actions exist on disk. */ + def assertDeletionVectorsExist(log: DeltaLog, filesWithDVs: Seq[AddFile]): Unit = { + val tablePath = new Path(log.dataPath.toUri.getPath) + for (file <- filesWithDVs) { + val dv = file.deletionVector + assert(dv != null) + assert(dv.isOnDisk && !dv.isInline) + assert(dv.offset.isDefined) + + // Check that DV exists. + val dvPath = dv.absolutePath(tablePath) + assert(new File(dvPath.toString).exists(), s"DV not found $dvPath") + + // Check that cardinality is correct. + val bitmap = newDVStore.read(dvPath, dv.offset.get, dv.sizeInBytes) + assert(dv.cardinality === bitmap.cardinality) + } + } + + /** Enable persistent deletion vectors in new Delta tables. */ + def enableDeletionVectorsInNewTables(conf: RuntimeConfig): Unit = + conf.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, "true") + + /** Enable persistent Deletion Vectors in a Delta table. */ + def enableDeletionVectorsInTable(tablePath: Path, enable: Boolean): Unit = + spark.sql( + s"""ALTER TABLE delta.`$tablePath` + |SET TBLPROPERTIES ('${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = '$enable') + |""".stripMargin) + + /** Enable persistent Deletion Vectors in a Delta table. */ + def enableDeletionVectorsInTable(deltaLog: DeltaLog, enable: Boolean = true): Unit = + enableDeletionVectorsInTable(deltaLog.dataPath, enable) + + /** Enable persistent deletion vectors in new tables and DELETE DML commands. 
*/ + def enableDeletionVectors(conf: RuntimeConfig): Unit = { + enableDeletionVectorsInNewTables(conf) + conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, "true") + } + + // ======== HELPER METHODS TO WRITE DVs ========== + /** Helper method to remove the specified rows in the given file using DVs */ + protected def removeRowsFromFileUsingDV( + log: DeltaLog, + addFile: AddFile, + rowIds: Seq[Long]): Seq[Action] = { + val dv = RoaringBitmapArray(rowIds: _*) + writeFileWithDV(log, addFile, dv) + } + + /** Utility method to remove a ratio of rows from the given file */ + protected def deleteRows( + log: DeltaLog, file: AddFile, approxPhyRows: Long, ratioOfRowsToDelete: Double): Unit = { + val numRowsToDelete = + Math.ceil(ratioOfRowsToDelete * file.numPhysicalRecords.getOrElse(approxPhyRows)).toInt + removeRowsFromFile(log, file, Seq.range(0, numRowsToDelete)) + } + + /** Utility method to remove the given rows from the given file using DVs */ + protected def removeRowsFromFile( + log: DeltaLog, addFile: AddFile, rowIndexesToRemove: Seq[Long]): Unit = { + val txn = log.startTransaction() + val actions = removeRowsFromFileUsingDV(log, addFile, rowIndexesToRemove) + txn.commit(actions, Truncate()) + } + + protected def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile], Seq[RemoveFile]) = { + val version = log.update().version + val allFiles = log.getChanges(version).toSeq.head._2 + val add = allFiles.collect { case a: AddFile => a } + val remove = allFiles.collect { case r: RemoveFile => r } + (add, remove) + } + + protected def serializeRoaringBitmapArrayWithDefaultFormat( + dv: RoaringBitmapArray): Array[Byte] = { + val serializationFormat = RoaringBitmapArrayFormat.Portable + dv.serializeAsByteArray(serializationFormat) + } + + /** + * Produce a new [[AddFile]] that will store `dv` in the log using default settings for choosing + * inline or on-disk storage. + * + * Also returns the corresponding [[RemoveFile]] action for `currentFile`. + * + * TODO: Always on-disk for now. Inline support comes later. + */ + protected def writeFileWithDV( + log: DeltaLog, + currentFile: AddFile, + dv: RoaringBitmapArray): Seq[Action] = { + writeFileWithDVOnDisk(log, currentFile, dv) + } + + /** Name of the partition column used by [[createTestDF()]]. */ + val PARTITION_COL = "partitionColumn" + + def createTestDF( + start: Long, + end: Long, + numFiles: Int, + partitionColumn: Option[Int] = None): DataFrame = { + val df = spark.range(start, end, 1, numFiles).withColumn("v", col("id")) + if (partitionColumn.isEmpty) { + df + } else { + df.withColumn(PARTITION_COL, lit(partitionColumn.get)) + } + } + + /** + * Produce a new [[AddFile]] that will reference the `dv` in the log while storing it on-disk. + * + * Also returns the corresponding [[RemoveFile]] action for `currentFile`. 
+ */ + protected def writeFileWithDVOnDisk( + log: DeltaLog, + currentFile: AddFile, + dv: RoaringBitmapArray): Seq[Action] = writeFilesWithDVsOnDisk(log, Seq((currentFile, dv))) + + protected def withDVWriter[T]( + log: DeltaLog, + dvFileID: UUID)(fn: DeletionVectorStore.Writer => T): T = { + val dvStore = newDVStore + // scalastyle:off deltahadoopconfiguration + val conf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val tableWithFS = PathWithFileSystem.withConf(log.dataPath, conf) + val dvPath = + DeletionVectorStore.assembleDeletionVectorPathWithFileSystem(tableWithFS, dvFileID) + val writer = dvStore.createWriter(dvPath) + try { + fn(writer) + } finally { + writer.close() + } + } + + /** + * Produce new [[AddFile]] actions that will reference associated DVs in the log while storing + * all DVs in the same file on-disk. + * + * Also returns the corresponding [[RemoveFile]] actions for the original file entries. + */ + protected def writeFilesWithDVsOnDisk( + log: DeltaLog, + filesWithDVs: Seq[(AddFile, RoaringBitmapArray)]): Seq[Action] = { + val dvFileId = UUID.randomUUID() + withDVWriter(log, dvFileId) { writer => + filesWithDVs.flatMap { case (currentFile, dv) => + val range = writer.write(serializeRoaringBitmapArrayWithDefaultFormat(dv)) + val dvData = DeletionVectorDescriptor.onDiskWithRelativePath( + id = dvFileId, + sizeInBytes = range.length, + cardinality = dv.cardinality, + offset = Some(range.offset)) + val (add, remove) = currentFile.removeRows( + dvData, + updateStats = true + ) + Seq(add, remove) + } + } + } + + /** + * Removes the `numRowsToRemovePerFile` from each file via DV. + * Returns the total number of rows removed. + */ + protected def removeRowsFromAllFilesInLog( + log: DeltaLog, + numRowsToRemovePerFile: Long): Long = { + var numFiles: Option[Int] = None + // This is needed to make the manual commit work correctly, since we are not actually + // running a command that produces metrics. + withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "false") { + val txn = log.startTransaction() + val allAddFiles = txn.snapshot.allFiles.collect() + numFiles = Some(allAddFiles.length) + val bitmap = RoaringBitmapArray(0L until numRowsToRemovePerFile: _*) + val actions = allAddFiles.flatMap { file => + if (file.numPhysicalRecords.isDefined) { + // Only when stats are enabled. Can't check when stats are disabled + assert(file.numPhysicalRecords.get > numRowsToRemovePerFile) + } + writeFileWithDV(log, file, bitmap) + } + txn.commit(actions, DeltaOperations.Delete(predicate = Seq.empty)) + } + numFiles.get * numRowsToRemovePerFile + } + + def newDVStore(): DeletionVectorStore = { + // scalastyle:off deltahadoopconfiguration + DeletionVectorStore.createInstance(spark.sessionState.newHadoopConf()) + // scalastyle:on deltahadoopconfiguration + } + + /** + * Updates an [[AddFile]] with a [[DeletionVectorDescriptor]]. + */ + protected def updateFileDV( + addFile: AddFile, + dvDescriptor: DeletionVectorDescriptor): (AddFile, RemoveFile) = { + addFile.removeRows( + dvDescriptor, + updateStats = true + ) + } + + /** Delete the DV file in the given [[AddFile]]. Assumes the [[AddFile]] has a valid DV. 
*/ + protected def deleteDVFile(tablePath: String, addFile: AddFile): Unit = { + assert(addFile.deletionVector != null) + val dvPath = addFile.deletionVector.absolutePath(new Path(tablePath)) + FileUtils.delete(new File(dvPath.toString)) + } + + /** + * Creates a [[DeletionVectorDescriptor]] from an [[RoaringBitmapArray]] + */ + protected def writeDV( + log: DeltaLog, + bitmapArray: RoaringBitmapArray): DeletionVectorDescriptor = { + val dvFileId = UUID.randomUUID() + withDVWriter(log, dvFileId) { writer => + val range = writer.write(serializeRoaringBitmapArrayWithDefaultFormat(bitmapArray)) + DeletionVectorDescriptor.onDiskWithRelativePath( + id = dvFileId, + sizeInBytes = range.length, + cardinality = bitmapArray.cardinality, + offset = Some(range.offset)) + } + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala new file mode 100644 index 000000000000..68c47b42bb04 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.delta + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Column, Dataset} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} + +import org.apache.hadoop.fs.Path + +import java.io.File + +import scala.collection.mutable + +// spotless:off +trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession { + + import testImplicits._ + + protected def columnMappingMode: String = NoMapping.name + + private val PHYSICAL_NAME_REGEX = + "col-[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r + + implicit class PhysicalNameString(s: String) { + def phy(deltaLog: DeltaLog): String = { + PHYSICAL_NAME_REGEX + .findFirstIn(s) + .getOrElse(getPhysicalName(s, deltaLog)) + } + } + + protected def columnMappingEnabled: Boolean = { + columnMappingModeString != "none" + } + + protected def columnMappingModeString: String = { + spark.conf.getOption(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey) + .getOrElse("none") + } + + /** + * Check if two schemas are equal ignoring column mapping metadata + * @param schema1 Schema + * @param schema2 Schema + */ + protected def assertEqual(schema1: StructType, schema2: StructType): Unit = { + if (columnMappingEnabled) { + assert( + DeltaColumnMapping.dropColumnMappingMetadata(schema1) == + DeltaColumnMapping.dropColumnMappingMetadata(schema2) + ) + } else { + assert(schema1 == schema2) + } + } + + /** + * Check if two table configurations are equal ignoring column mapping metadata + * @param config1 Table config + * @param config2 Table config + */ + protected def assertEqual( + config1: Map[String, String], + config2: Map[String, String]): Unit = { + if (columnMappingEnabled) { + assert(dropColumnMappingConfigurations(config1) == dropColumnMappingConfigurations(config2)) + } else { + assert(config1 == config2) + } + } + + /** + * Check if a partition with specific values exists. + * Handles both column mapped and non-mapped cases + * @param partCol Partition column name + * @param partValue Partition value + * @param deltaLog DeltaLog + */ + protected def assertPartitionWithValueExists( + partCol: String, + partValue: String, + deltaLog: DeltaLog): Unit = { + assert(getPartitionFilePathsWithValue(partCol, partValue, deltaLog).nonEmpty) + } + + /** + * Assert partition exists in an array of set of partition names/paths + * @param partCol Partition column name + * @param deltaLog Delta log + * @param inputFiles Input files to scan for DF + */ + protected def assertPartitionExists( + partCol: String, + deltaLog: DeltaLog, + inputFiles: Array[String]): Unit = { + val physicalName = partCol.phy(deltaLog) + val allFiles = deltaLog.update().allFiles.collect() + // NOTE: inputFiles are *not* URL-encoded. 
+ val filesWithPartitions = inputFiles.map { f => + allFiles.filter { af => + f.contains(af.toPath.toString) + }.flatMap(_.partitionValues.keys).toSet + } + assert(filesWithPartitions.forall(p => p.count(_ == physicalName) > 0)) + // for non-column mapped mode, we can check the file paths as well + if (!columnMappingEnabled) { + assert(inputFiles.forall(path => path.contains(s"$physicalName=")), + s"${inputFiles.toSeq.mkString("\n")}\ndidn't contain partition columns $physicalName") + } + } + + /** + * Load Deltalog from path + * @param pathOrIdentifier Location + * @param isIdentifier Whether the previous argument is a metastore identifier + * @return + */ + protected def loadDeltaLog(pathOrIdentifier: String, isIdentifier: Boolean = false): DeltaLog = { + if (isIdentifier) { + DeltaLog.forTable(spark, TableIdentifier(pathOrIdentifier)) + } else { + DeltaLog.forTable(spark, pathOrIdentifier) + } + } + + /** + * Convert a (nested) column string to sequence of name parts + * @param col Column string + * @return Sequence of parts + */ + protected def columnNameToParts(col: String): Seq[String] = { + UnresolvedAttribute.parseAttributeName(col) + } + + /** + * Get partition file paths for a specific partition value + * @param partCol Logical or physical partition name + * @param partValue Partition value + * @param deltaLog DeltaLog + * @return List of paths + */ + protected def getPartitionFilePathsWithValue( + partCol: String, + partValue: String, + deltaLog: DeltaLog): Array[String] = { + getPartitionFilePaths(partCol, deltaLog).getOrElse(partValue, Array.empty) + } + + /** + * Get the partition value for null + */ + protected def nullPartitionValue: String = { + if (columnMappingEnabled) { + null + } else { + ExternalCatalogUtils.DEFAULT_PARTITION_NAME + } + } + + /** + * Get partition file paths grouped by partition value + * @param partCol Logical or physical partition name + * @param deltaLog DeltaLog + * @return Partition value to paths + */ + protected def getPartitionFilePaths( + partCol: String, + deltaLog: DeltaLog): Map[String, Array[String]] = { + if (columnMappingEnabled) { + val colName = partCol.phy(deltaLog) + deltaLog.update().allFiles.collect() + .groupBy(_.partitionValues(colName)) + .mapValues(_.map(deltaLog.dataPath.toUri.getPath + "/" + _.path)).toMap + } else { + val partColEscaped = s"${ExternalCatalogUtils.escapePathName(partCol)}" + val dataPath = new File(deltaLog.dataPath.toUri.getPath) + dataPath.listFiles().filter(_.getName.startsWith(s"$partColEscaped=")) + .groupBy(_.getName.split("=").last).mapValues(_.map(_.getPath)).toMap + } + } + + /** + * Group a list of input file paths by partition key-value pair w.r.t. 
delta log + * @param inputFiles Input file paths + * @param deltaLog Delta log + * @return A mapped array each with the corresponding partition keys + */ + protected def groupInputFilesByPartition( + inputFiles: Array[String], + deltaLog: DeltaLog): Map[(String, String), Array[String]] = { + if (columnMappingEnabled) { + val allFiles = deltaLog.update().allFiles.collect() + val grouped = inputFiles.flatMap { f => + allFiles.find { + af => f.contains(af.toPath.toString) + }.head.partitionValues.map(entry => (f, entry)) + }.groupBy(_._2) + grouped.mapValues(_.map(_._1)).toMap + } else { + inputFiles.groupBy(p => { + val nameParts = new Path(p).getParent.getName.split("=") + (nameParts(0), nameParts(1)) + }) + } + } + + /** + * Drop column mapping configurations from Map + * @param configuration Table configuration + * @return Configuration + */ + protected def dropColumnMappingConfigurations( + configuration: Map[String, String]): Map[String, String] = { + configuration - DeltaConfigs.COLUMN_MAPPING_MODE.key - DeltaConfigs.COLUMN_MAPPING_MAX_ID.key + } + + /** + * Drop column mapping configurations from Dataset (e.g. sql("SHOW TBLPROPERTIES t1") + * @param configs Table configuration + * @return Configuration Dataset + */ + protected def dropColumnMappingConfigurations( + configs: Dataset[(String, String)]): Dataset[(String, String)] = { + spark.createDataset(configs.collect().filter(p => + !Seq( + DeltaConfigs.COLUMN_MAPPING_MAX_ID.key, + DeltaConfigs.COLUMN_MAPPING_MODE.key + ).contains(p._1) + )) + } + + /** Return KV pairs of Protocol-related stuff for checking the result of DESCRIBE TABLE. */ + protected def buildProtocolProps(snapshot: Snapshot): Seq[(String, String)] = { + val mergedConf = + DeltaConfigs.mergeGlobalConfigs(spark.sessionState.conf, snapshot.metadata.configuration) + val metadata = snapshot.metadata.copy(configuration = mergedConf) + var props = Seq( + (Protocol.MIN_READER_VERSION_PROP, + Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString), + (Protocol.MIN_WRITER_VERSION_PROP, + Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString)) + if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) { + props ++= + Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures( + spark, metadata, snapshot.protocol) + ._3 + .map(f => ( + s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}${f.name}", + TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)) + } + props + } + + /** + * Convert (nested) column name string into physical name with reference from DeltaLog + * If target field does not have physical name, display name is returned + * @param col Logical column name + * @param deltaLog Reference DeltaLog + * @return Physical column name + */ + protected def getPhysicalName(col: String, deltaLog: DeltaLog): String = { + val nameParts = UnresolvedAttribute.parseAttributeName(col) + val realSchema = deltaLog.update().schema + getPhysicalName(nameParts, realSchema) + } + + protected def getPhysicalName(col: String, schema: StructType): String = { + val nameParts = UnresolvedAttribute.parseAttributeName(col) + getPhysicalName(nameParts, schema) + } + + protected def getPhysicalName(nameParts: Seq[String], schema: StructType): String = { + SchemaUtils.findNestedFieldIgnoreCase(schema, nameParts, includeCollections = true) + .map(DeltaColumnMapping.getPhysicalName) + .get + } + + protected def withColumnMappingConf(mode: String)(f: => Any): Any = { + withSQLConf(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey 
-> mode) { + f + } + } + + protected def withMaxColumnIdConf(maxId: String)(f: => Any): Any = { + withSQLConf(DeltaConfigs.COLUMN_MAPPING_MAX_ID.defaultTablePropertyKey -> maxId) { + f + } + } + + /** + * Gets the physical names of a path. This is used for converting column paths in stats schema, + * so it's ok to not support MapType and ArrayType. + */ + def getPhysicalPathForStats(path: Seq[String], schema: StructType): Option[Seq[String]] = { + if (path.isEmpty) return Some(Seq.empty) + val field = schema.fields.find(_.name.equalsIgnoreCase(path.head)) + field match { + case Some(f @ StructField(_, _: AtomicType, _, _ )) => + if (path.size == 1) Some(Seq(DeltaColumnMapping.getPhysicalName(f))) else None + case Some(f @ StructField(_, st: StructType, _, _)) => + val tail = getPhysicalPathForStats(path.tail, st) + tail.map(DeltaColumnMapping.getPhysicalName(f) +: _) + case _ => + None + } + } + + /** + * Convert (nested) column name string into physical name. + * Ignore parts of special paths starting with: + * 1. stats columns: minValues, maxValues, numRecords + * 2. stats df: stats_parsed + * 3. partition values: partitionValues_parsed, partitionValues + * @param col Logical column name (e.g. a.b.c) + * @param schema Reference schema with metadata + * @return Unresolved attribute with physical name paths + */ + protected def convertColumnNameToAttributeWithPhysicalName( + col: String, + schema: StructType): UnresolvedAttribute = { + val parts = UnresolvedAttribute.parseAttributeName(col) + val shouldIgnoreFirstPart = Set( + "minValues", + "maxValues", + "numRecords", + Checkpoints.STRUCT_PARTITIONS_COL_NAME, + "partitionValues") + val shouldIgnoreSecondPart = Set(Checkpoints.STRUCT_STATS_COL_NAME, "stats") + val physical = if (shouldIgnoreFirstPart.contains(parts.head)) { + parts.head +: getPhysicalPathForStats(parts.tail, schema).getOrElse(parts.tail) + } else if (shouldIgnoreSecondPart.contains(parts.head)) { + parts.take(2) ++ getPhysicalPathForStats(parts.slice(2, parts.length), schema) + .getOrElse(parts.slice(2, parts.length)) + } else { + getPhysicalPathForStats(parts, schema).getOrElse(parts) + } + UnresolvedAttribute(physical) + } + + /** + * Convert a list of (nested) stats columns into physical name with reference from DeltaLog + * @param columns Logical columns + * @param deltaLog Reference DeltaLog + * @return Physical columns + */ + protected def convertToPhysicalColumns( + columns: Seq[Column], + deltaLog: DeltaLog): Seq[Column] = { + val schema = deltaLog.update().schema + columns.map { col => + val newExpr = col.expr.transform { + case a: Attribute => + convertColumnNameToAttributeWithPhysicalName(a.name, schema) + } + Column(newExpr) + } + } + + /** + * Standard CONVERT TO DELTA + * @param tableOrPath String + */ + protected def convertToDelta(tableOrPath: String): Unit = { + sql(s"CONVERT TO DELTA $tableOrPath") + } + + /** + * Force enable streaming read (with possible data loss) on column mapping enabled table with + * drop / rename schema changes. 
+ */ + protected def withStreamingReadOnColumnMappingTableEnabled(f: => Unit): Unit = { + if (columnMappingEnabled) { + withSQLConf(DeltaSQLConf + .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key -> "true") { + f + } + } else { + f + } + } + +} + +trait DeltaColumnMappingTestUtils extends DeltaColumnMappingTestUtilsBase + +/** + * Include this trait to enable Id column mapping mode for a suite + */ +trait DeltaColumnMappingEnableIdMode extends SharedSparkSession + with DeltaColumnMappingTestUtils + with DeltaColumnMappingSelectedTestMixin { + + protected override def columnMappingMode: String = IdMapping.name + + protected override def sparkConf: SparkConf = + super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, "id") + + /** + * CONVERT TO DELTA blocked in id mode + */ + protected override def convertToDelta(tableOrPath: String): Unit = + throw DeltaErrors.convertToDeltaWithColumnMappingNotSupported( + DeltaColumnMappingMode(columnMappingModeString) + ) +} + +/** + * Include this trait to enable Name column mapping mode for a suite + */ +trait DeltaColumnMappingEnableNameMode extends SharedSparkSession + with DeltaColumnMappingTestUtils + with DeltaColumnMappingSelectedTestMixin { + + protected override def columnMappingMode: String = NameMapping.name + + protected override def sparkConf: SparkConf = + super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, columnMappingMode) + + /** + * CONVERT TO DELTA can be possible under name mode in tests + */ + protected override def convertToDelta(tableOrPath: String): Unit = { + withColumnMappingConf("none") { + super.convertToDelta(tableOrPath) + } + + val (deltaPath, deltaLog) = + if (tableOrPath.contains("parquet") && tableOrPath.contains("`")) { + // parquet.`PATH` + val plainPath = tableOrPath.split('.').last.drop(1).dropRight(1) + (s"delta.`$plainPath`", DeltaLog.forTable(spark, plainPath)) + } else { + (tableOrPath, DeltaLog.forTable(spark, TableIdentifier(tableOrPath))) + } + + val tableReaderVersion = deltaLog.unsafeVolatileSnapshot.protocol.minReaderVersion + val tableWriterVersion = deltaLog.unsafeVolatileSnapshot.protocol.minWriterVersion + val requiredReaderVersion = if (tableWriterVersion >= + TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) { + // If the writer version of the table supports table features, we need to + // bump the reader version to table features to enable column mapping. 
+ TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION + } else { + ColumnMappingTableFeature.minReaderVersion + } + val readerVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION).max( + requiredReaderVersion) + val writerVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION).max( + ColumnMappingTableFeature.minWriterVersion) + + val properties = mutable.ListBuffer(DeltaConfigs.COLUMN_MAPPING_MODE.key -> "name") + if (tableReaderVersion < readerVersion) { + properties += DeltaConfigs.MIN_READER_VERSION.key -> readerVersion.toString + } + if (tableWriterVersion < writerVersion) { + properties += DeltaConfigs.MIN_WRITER_VERSION.key -> writerVersion.toString + } + val propertiesStr = properties.map(kv => s"'${kv._1}' = '${kv._2}'").mkString(", ") + sql(s"ALTER TABLE $deltaPath SET TBLPROPERTIES ($propertiesStr)") + } + +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala new file mode 100644 index 000000000000..4e7326c8c15c --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.delta + +import org.apache.spark.{SparkContext, SparkFunSuite, SparkThrowable} +import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd, SparkListenerJobStart} +import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode} +import org.apache.spark.sql.delta.DeltaTestUtils.Plans +import org.apache.spark.sql.delta.actions._ +import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLTestUtils +import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.QueryExecutionListener +import org.apache.spark.util.Utils + +import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord} +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.delta.tables.{DeltaTable => IODeltaTable} +import org.apache.hadoop.fs.{FileStatus, Path} +import org.scalatest.BeforeAndAfterEach + +import java.io.{BufferedReader, File, InputStreamReader} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Locale +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.concurrent +import scala.reflect.ClassTag +import scala.util.matching.Regex + +// spotless:off +trait DeltaTestUtilsBase { + import DeltaTestUtils.TableIdentifierOrPath + + final val BOOLEAN_DOMAIN: Seq[Boolean] = Seq(true, false) + + class PlanCapturingListener() extends QueryExecutionListener { + + private[this] var capturedPlans = List.empty[Plans] + + def plans: Seq[Plans] = capturedPlans.reverse + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + capturedPlans ::= Plans( + qe.analyzed, + qe.optimizedPlan, + qe.sparkPlan, + qe.executedPlan) + } + + override def onFailure( + funcName: String, qe: QueryExecution, error: Exception): Unit = {} + } + + /** + * Run a thunk with logical plans for all queries captured and passed into a provided buffer. + */ + def withLogicalPlansCaptured[T]( + spark: SparkSession, + optimizedPlan: Boolean)( + thunk: => Unit): Seq[LogicalPlan] = { + val planCapturingListener = new PlanCapturingListener + + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + spark.listenerManager.register(planCapturingListener) + try { + thunk + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + planCapturingListener.plans.map { plans => + if (optimizedPlan) plans.optimized else plans.analyzed + } + } finally { + spark.listenerManager.unregister(planCapturingListener) + } + } + + /** + * Run a thunk with physical plans for all queries captured and passed into a provided buffer.
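+ *
+ * Illustrative example (assuming a Delta table named `tab` exists):
+ * {{{
+ *   val plans = withPhysicalPlansCaptured(spark) {
+ *     spark.sql("DELETE FROM tab WHERE key = 1")
+ *   }
+ *   assert(plans.nonEmpty)
+ * }}}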
+ */ + def withPhysicalPlansCaptured[T]( + spark: SparkSession)( + thunk: => Unit): Seq[SparkPlan] = { + val planCapturingListener = new PlanCapturingListener + + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + spark.listenerManager.register(planCapturingListener) + try { + thunk + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + planCapturingListener.plans.map(_.sparkPlan) + } finally { + spark.listenerManager.unregister(planCapturingListener) + } + } + + /** + * Run a thunk with logical and physical plans for all queries captured and passed + * into a provided buffer. + */ + def withAllPlansCaptured[T]( + spark: SparkSession)( + thunk: => Unit): Seq[Plans] = { + val planCapturingListener = new PlanCapturingListener + + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + spark.listenerManager.register(planCapturingListener) + try { + thunk + spark.sparkContext.listenerBus.waitUntilEmpty(15000) + planCapturingListener.plans + } finally { + spark.listenerManager.unregister(planCapturingListener) + } + } + + def countSparkJobs(sc: SparkContext, f: => Unit): Int = { + val jobs: concurrent.Map[Int, Long] = new ConcurrentHashMap[Int, Long]().asScala + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobs.put(jobStart.jobId, jobStart.stageInfos.map(_.numTasks).sum) + } + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match { + case JobFailed(_) => jobs.remove(jobEnd.jobId) + case _ => // On success, do nothing. + } + } + sc.addSparkListener(listener) + try { + sc.listenerBus.waitUntilEmpty(15000) + f + sc.listenerBus.waitUntilEmpty(15000) + } finally { + sc.removeSparkListener(listener) + } + // Spark will always log a job start/end event even when the job does not launch any task. + jobs.values.count(_ > 0) + } + + /** Filter `usageRecords` by the `opType` tag or field. */ + def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = { + usageRecords.filter { r => + r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType) + } + } + + def collectUsageLogs(opType: String)(f: => Unit): collection.Seq[UsageRecord] = { + Log4jUsageLogger.track(f).filter { r => + r.metric == "tahoeEvent" && + r.tags.get("opType").contains(opType) + } + } + + /** + * Remove protocol and metadata fields from checksum file of json format + */ + def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit = { + // scalastyle:off deltahadoopconfiguration + val fs = checksumFilePath.getFileSystem( + SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get + ) + // scalastyle:on deltahadoopconfiguration + if (!fs.exists(checksumFilePath)) return + val stream = fs.open(checksumFilePath) + val reader = new BufferedReader(new InputStreamReader(stream, UTF_8)) + val content = reader.readLine() + stream.close() + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val map = mapper.readValue(content, classOf[Map[String, String]]) + val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n" + val output = fs.create(checksumFilePath, true) + output.write(partialContent.getBytes(UTF_8)) + output.close() + } + + protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = { + // The expected plan for touched file computation is of the format below. + // The data column should be pruned from both leaves. 
+ // HashAggregate(output=[count#3463L]) + // +- HashAggregate(output=[count#3466L]) + // +- Project + // +- Filter (isnotnull(count#3454L) AND (count#3454L > 1)) + // +- HashAggregate(output=[count#3454L]) + // +- HashAggregate(output=[_row_id_#3418L, sum#3468L]) + // +- Project [_row_id_#3418L, UDF(_file_name_#3422) AS one#3448] + // +- BroadcastHashJoin [id#3342L], [id#3412L], Inner, BuildLeft + // :- Project [id#3342L] + // : +- Filter isnotnull(id#3342L) + // : +- FileScan parquet [id#3342L,part#3343L] + // +- Filter isnotnull(id#3412L) + // +- Project [...] + // +- Project [...] + // +- FileScan parquet [id#3412L,part#3413L] + // Note: It can be RDDScanExec instead of FileScan if the source was materialized. + // We pick the first plan starting from FileScan and ending in HashAggregate as a + // stable heuristic for the one we want. + plans.map(_.executedPlan) + .filter { + case WholeStageCodegenExec(hash: HashAggregateExec) => + hash.collectLeaves().size == 2 && + hash.collectLeaves() + .forall { s => + s.isInstanceOf[FileSourceScanExec] || + s.isInstanceOf[RDDScanExec] + } + case _ => false + }.head + } + + /** + * Separate name- from path-based SQL table identifiers. + */ + def getTableIdentifierOrPath(sqlIdentifier: String): TableIdentifierOrPath = { + // Match: delta.`path`[[ as] alias] or tahoe.`path`[[ as] alias] + val pathMatcher: Regex = raw"(?:delta|tahoe)\.`([^`]+)`(?:(?: as)? (.+))?".r + // Match: db.table[[ as] alias] + val qualifiedDbMatcher: Regex = raw"`?([^\.` ]+)`?\.`?([^\.` ]+)`?(?:(?: as)? (.+))?".r + // Match: table[[ as] alias] + val unqualifiedNameMatcher: Regex = raw"([^ ]+)(?:(?: as)? (.+))?".r + sqlIdentifier match { + case pathMatcher(path, alias) => + TableIdentifierOrPath.Path(path, Option(alias)) + case qualifiedDbMatcher(dbName, tableName, alias) => + TableIdentifierOrPath.Identifier(TableIdentifier(tableName, Some(dbName)), Option(alias)) + case unqualifiedNameMatcher(tableName, alias) => + TableIdentifierOrPath.Identifier(TableIdentifier(tableName), Option(alias)) + } + } + + /** + * Produce a DeltaTable instance given a `TableIdentifierOrPath` instance. + */ + def getDeltaTableForIdentifierOrPath( + spark: SparkSession, + identifierOrPath: TableIdentifierOrPath): IODeltaTable = { + identifierOrPath match { + case TableIdentifierOrPath.Identifier(id, optionalAlias) => + val table = IODeltaTable.forName(spark, id.unquotedString) + optionalAlias.map(table.as(_)).getOrElse(table) + case TableIdentifierOrPath.Path(path, optionalAlias) => + val table = IODeltaTable.forPath(spark, path) + optionalAlias.map(table.as(_)).getOrElse(table) + } + } + + @deprecated("Use checkError() instead") + protected def errorContains(errMsg: String, str: String): Unit = { + assert(errMsg.toLowerCase(Locale.ROOT).contains(str.toLowerCase(Locale.ROOT))) + } + + /** + * Helper types to define the expected result of a test case. + * Either: + * - Success: include an expected value to check, e.g. expected schema or result as a DF or rows. + * - Failure: an exception is thrown and the caller passes a function to check that it matches an + * expected error, typ. `checkError()` or `checkErrorMatchPVals()`. 
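+ *
+ * For instance (illustrative):
+ * {{{
+ *   val success = ExpectedResult.Success(new StructType().add("id", "long"))
+ *   val failure = ExpectedResult.Failure[StructType](e => assert(e.getErrorClass != null))
+ * }}}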
+ */ + sealed trait ExpectedResult[-T] + object ExpectedResult { + case class Success[T](expected: T) extends ExpectedResult[T] + case class Failure[T](checkError: SparkThrowable => Unit) extends ExpectedResult[T] + } + + /** Utility method to check exception `e` is of type `E` or a cause of it is of type `E` */ + def findIfResponsible[E <: Throwable: ClassTag](e: Throwable): Option[E] = e match { + case culprit: E => Some(culprit) + case _ => + val children = Option(e.getCause).iterator ++ e.getSuppressed.iterator + children + .map(findIfResponsible[E](_)) + .collectFirst { case Some(culprit) => culprit } + } + + def verifyBackfilled(file: FileStatus): Unit = { + val unbackfilled = file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString) + assert(!unbackfilled, s"File $file was not backfilled") + } + + def verifyUnbackfilled(file: FileStatus): Unit = { + val unbackfilled = file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString) + assert(unbackfilled, s"File $file was backfilled") + } +} + +trait DeltaCheckpointTestUtils + extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession => + + def testDifferentCheckpoints(testName: String, quiet: Boolean = false) + (f: (CheckpointPolicy.Policy, Option[V2Checkpoint.Format]) => Unit): Unit = { + test(s"$testName [Checkpoint V1]") { + def testFunc(): Unit = { + withSQLConf(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> + CheckpointPolicy.Classic.name) { + f(CheckpointPolicy.Classic, None) + } + } + if (quiet) quietly { testFunc() } else testFunc() + } + for (checkpointFormat <- V2Checkpoint.Format.ALL) + test(s"$testName [Checkpoint V2, format: ${checkpointFormat.name}]") { + def testFunc(): Unit = { + withSQLConf( + DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name, + DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> checkpointFormat.name + ) { + f(CheckpointPolicy.V2, Some(checkpointFormat)) + } + } + if (quiet) quietly { testFunc() } else testFunc() + } + } + + /** + * Helper method to get the dataframe corresponding to the files which has the file actions for a + * given checkpoint. 
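+ *
+ * Illustrative call, assuming `checkpointFile` is an existing checkpoint of `log` that
+ * contains AddFile actions:
+ * {{{
+ *   val checkpointDf = getCheckpointDfForFilesContainingFileActions(log, checkpointFile)
+ *   assert(checkpointDf.where("add IS NOT NULL").count() > 0)
+ * }}}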
+ */ + def getCheckpointDfForFilesContainingFileActions( + log: DeltaLog, + checkpointFile: Path): DataFrame = { + val ci = CheckpointInstance.apply(checkpointFile) + val allCheckpointFiles = log + .listFrom(ci.version) + .filter(FileNames.isCheckpointFile) + .filter(f => CheckpointInstance(f.getPath) == ci) + .toSeq + val fileActionsFileIndex = ci.format match { + case CheckpointInstance.Format.V2 => + val incompleteCheckpointProvider = ci.getCheckpointProvider(log, allCheckpointFiles) + val df = log.loadIndex(incompleteCheckpointProvider.topLevelFileIndex.get, Action.logSchema) + val sidecarFileStatuses = df.as[SingleAction].collect().map(_.unwrap).collect { + case sf: SidecarFile => sf + }.map(sf => sf.toFileStatus(log.logPath)) + DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, sidecarFileStatuses) + case CheckpointInstance.Format.SINGLE | CheckpointInstance.Format.WITH_PARTS => + DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, + allCheckpointFiles.toArray) + case _ => + throw new Exception(s"Unexpected checkpoint format for file $checkpointFile") + } + fileActionsFileIndex.files + .map(fileStatus => spark.read.parquet(fileStatus.getPath.toString)) + .reduce(_.union(_)) + } +} + +object DeltaTestUtils extends DeltaTestUtilsBase { + + sealed trait TableIdentifierOrPath + object TableIdentifierOrPath { + case class Identifier(id: TableIdentifier, alias: Option[String]) + extends TableIdentifierOrPath + case class Path(path: String, alias: Option[String]) extends TableIdentifierOrPath + } + + case class Plans( + analyzed: LogicalPlan, + optimized: LogicalPlan, + sparkPlan: SparkPlan, + executedPlan: SparkPlan) + + /** + * Creates an AddFile that can be used for tests where the exact parameters do not matter. + */ + def createTestAddFile( + encodedPath: String = "foo", + partitionValues: Map[String, String] = Map.empty, + size: Long = 1L, + modificationTime: Long = 1L, + dataChange: Boolean = true, + stats: String = "{\"numRecords\": 1}"): AddFile = { + AddFile(encodedPath, partitionValues, size, modificationTime, dataChange, stats) + } + + /** + * Extracts the table name and alias (if any) from the given string. Correctly handles whitespaces + * in table name but doesn't support whitespaces in alias. + */ + def parseTableAndAlias(table: String): (String, Option[String]) = { + // Matches 'delta.`path` AS alias' (case insensitive). + val deltaPathWithAsAlias = raw"(?i)(delta\.`.+`)(?: AS) (\S+)".r + // Matches 'delta.`path` alias'. + val deltaPathWithAlias = raw"(delta\.`.+`) (\S+)".r + // Matches 'delta.`path`'. + val deltaPath = raw"(delta\.`.+`)".r + // Matches 'tableName AS alias' (case insensitive). + val tableNameWithAsAlias = raw"(?i)(.+)(?: AS) (\S+)".r + // Matches 'tableName alias'. + val tableNameWithAlias = raw"(.+) (.+)".r + + table match { + case deltaPathWithAsAlias(tableName, alias) => tableName -> Some(alias) + case deltaPathWithAlias(tableName, alias) => tableName -> Some(alias) + case deltaPath(tableName) => tableName -> None + case tableNameWithAsAlias(tableName, alias) => tableName -> Some(alias) + case tableNameWithAlias(tableName, alias) => tableName -> Some(alias) + case tableName => tableName -> None + } + } + + /** + * Implements an ordering where `x < y` iff both reader and writer versions of + * `x` are strictly less than those of `y`. + * + * Can be used to conveniently check that this relationship holds in tests/assertions + * without having to write out the conjunction of the two subconditions every time. 
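+ *
+ * For example (illustrative):
+ * {{{
+ *   // (1, 2) is strictly below (2, 5) on both versions.
+ *   assert(StrictProtocolOrdering.tryCompare(Protocol(1, 2), Protocol(2, 5)) == Some(-1))
+ *   // (2, 2) and (1, 5) are not comparable: reader is higher, writer is lower.
+ *   assert(StrictProtocolOrdering.tryCompare(Protocol(2, 2), Protocol(1, 5)).isEmpty)
+ * }}}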
*/ + case object StrictProtocolOrdering extends PartialOrdering[Protocol] { + override def tryCompare(x: Protocol, y: Protocol): Option[Int] = { + if (x.minReaderVersion == y.minReaderVersion && + x.minWriterVersion == y.minWriterVersion) { + Some(0) + } else if (x.minReaderVersion < y.minReaderVersion && + x.minWriterVersion < y.minWriterVersion) { + Some(-1) + } else if (x.minReaderVersion > y.minReaderVersion && + x.minWriterVersion > y.minWriterVersion) { + Some(1) + } else { + None + } + } + + override def lteq(x: Protocol, y: Protocol): Boolean = + x.minReaderVersion <= y.minReaderVersion && x.minWriterVersion <= y.minWriterVersion + + // Just a more readable version of `lteq`. + def fulfillsVersionRequirements(actual: Protocol, requirement: Protocol): Boolean = + lteq(requirement, actual) + } +} + +trait DeltaTestUtilsForTempViews + extends SharedSparkSession + with DeltaTestUtilsBase { + + def testWithTempView(testName: String)(testFun: Boolean => Any): Unit = { + Seq(true, false).foreach { isSQLTempView => + val tempViewUsed = if (isSQLTempView) "SQL TempView" else "Dataset TempView" + test(s"$testName - $tempViewUsed") { + withTempView("v") { + testFun(isSQLTempView) + } + } + } + } + + def testQuietlyWithTempView(testName: String)(testFun: Boolean => Any): Unit = { + Seq(true, false).foreach { isSQLTempView => + val tempViewUsed = if (isSQLTempView) "SQL TempView" else "Dataset TempView" + testQuietly(s"$testName - $tempViewUsed") { + withTempView("v") { + testFun(isSQLTempView) + } + } + } + } + + def createTempViewFromTable( + tableName: String, + isSQLTempView: Boolean, + format: Option[String] = None): Unit = { + if (isSQLTempView) { + sql(s"CREATE OR REPLACE TEMP VIEW v AS SELECT * from $tableName") + } else { + spark.read.format(format.getOrElse("delta")).table(tableName).createOrReplaceTempView("v") + } + } + + def createTempViewFromSelect(text: String, isSQLTempView: Boolean): Unit = { + if (isSQLTempView) { + sql(s"CREATE OR REPLACE TEMP VIEW v AS $text") + } else { + sql(text).createOrReplaceTempView("v") + } + } + + def testErrorMessageAndClass( + isSQLTempView: Boolean, + ex: AnalysisException, + expectedErrorMsgForSQLTempView: String = null, + expectedErrorMsgForDataSetTempView: String = null, + expectedErrorClassForSQLTempView: String = null, + expectedErrorClassForDataSetTempView: String = null): Unit = { + if (isSQLTempView) { + if (expectedErrorMsgForSQLTempView != null) { + errorContains(ex.getMessage, expectedErrorMsgForSQLTempView) + } + if (expectedErrorClassForSQLTempView != null) { + assert(ex.getErrorClass == expectedErrorClassForSQLTempView) + } + } else { + if (expectedErrorMsgForDataSetTempView != null) { + errorContains(ex.getMessage, expectedErrorMsgForDataSetTempView) + } + if (expectedErrorClassForDataSetTempView != null) { + assert(ex.getErrorClass == expectedErrorClassForDataSetTempView, ex.getMessage) + } + } + } +} + +/** + * Trait collecting helper methods for DML tests, e.g. creating a test table for each test and + * cleaning it up after each test. + */ +trait DeltaDMLTestUtils + extends DeltaSQLTestUtils + with DeltaTestUtilsBase + with BeforeAndAfterEach { + self: SharedSparkSession => + + import testImplicits._ + + protected var tempDir: File = _ + + protected var deltaLog: DeltaLog = _ + + protected def tempPath: String = tempDir.getCanonicalPath + + override protected def beforeEach(): Unit = { + super.beforeEach() + // Using a space in path to provide coverage for special characters.
+ tempDir = Utils.createTempDir(namePrefix = "spark test") + deltaLog = DeltaLog.forTable(spark, new Path(tempPath)) + } + + override protected def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + DeltaLog.clearCache() + } finally { + super.afterEach() + } + } + + protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = { + val dfw = df.write.format("delta").mode("append") + if (partitionBy.nonEmpty) { + dfw.partitionBy(partitionBy: _*) + } + dfw.save(tempPath) + } + + protected def withKeyValueData( + source: Seq[(Int, Int)], + target: Seq[(Int, Int)], + isKeyPartitioned: Boolean = false, + sourceKeyValueNames: (String, String) = ("key", "value"), + targetKeyValueNames: (String, String) = ("key", "value"))( + thunk: (String, String) => Unit = null): Unit = { + + import testImplicits._ + + append(target.toDF(targetKeyValueNames._1, targetKeyValueNames._2).coalesce(2), + if (isKeyPartitioned) Seq(targetKeyValueNames._1) else Nil) + withTempView("source") { + source.toDF(sourceKeyValueNames._1, sourceKeyValueNames._2).createOrReplaceTempView("source") + thunk("source", s"delta.`$tempPath`") + } + } + + /** + * Parse the input JSON data into a dataframe, one row per input element. + * Throws an exception on malformed inputs or records that don't comply with the provided schema. + */ + protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = { + if (schema != null) { + spark.read + .schema(schema) + .option("mode", FailFastMode.name) + .json(data.toDS) + } else { + spark.read + .option("mode", FailFastMode.name) + .json(data.toDS) + } + } + + protected def readDeltaTable(path: String): DataFrame = { + spark.read.format("delta").load(path) + } + + protected def getDeltaFileStmt(path: String): String = s"SELECT * FROM delta.`$path`" + + /** + * Finds the latest operation of the given type that ran on the test table and returns the + * dataframe with the changes of the corresponding table version. + * + * @param operation Delta operation name, see [[DeltaOperations]]. + */ + protected def getCDCForLatestOperation(deltaLog: DeltaLog, operation: String): DataFrame = { + val latestOperation = deltaLog.history + .getHistory(None) + .find(_.operation == operation) + assert(latestOperation.nonEmpty, s"Couldn't find a ${operation} operation to check CDF") + + val latestOperationVersion = latestOperation.get.version + assert(latestOperationVersion.nonEmpty, + s"Latest ${operation} operation doesn't have a version associated with it") + + CDCReader + .changesToBatchDF( + deltaLog, + latestOperationVersion.get, + latestOperationVersion.get, + spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP) + .drop(CDCReader.CDC_COMMIT_VERSION) + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala new file mode 100644 index 000000000000..135dd97bfae2 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.test + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.delta.DeltaColumnMappingTestUtils +import org.apache.spark.sql.delta.DeltaConfigs + +import org.scalactic.source.Position +import org.scalatest.Tag +import org.scalatest.exceptions.TestFailedException + +import scala.collection.mutable + +// spotless:off +/** + * A trait for selective enabling certain tests to run for column mapping modes + */ +trait DeltaColumnMappingSelectedTestMixin extends SparkFunSuite + with DeltaSQLTestUtils with DeltaColumnMappingTestUtils { + + protected def runOnlyTests: Seq[String] = Seq() + + /** + * If true, will run all tests. + * Requires that `runOnlyTests` is empty. + */ + protected def runAllTests: Boolean = false + + private val testsRun: mutable.Set[String] = mutable.Set.empty + + override protected def test( + testName: String, + testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { + require(!runAllTests || runOnlyTests.isEmpty, + "If `runAllTests` is true then `runOnlyTests` must be empty") + + if (runAllTests || runOnlyTests.contains(testName)) { + super.test(s"$testName - column mapping $columnMappingMode mode", testTags: _*) { + testsRun.add(testName) + withSQLConf( + DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> columnMappingMode) { + testFun + } + } + } else { + super.ignore(s"$testName - ignored by DeltaColumnMappingSelectedTestMixin")(testFun) + } + } + + override def afterAll(): Unit = { + super.afterAll() + val missingTests = runOnlyTests.toSet diff testsRun + if (missingTests.nonEmpty) { + throw new TestFailedException( + Some("Not all selected column mapping tests were run. Missing: " + + missingTests.mkString(", ")), None, 0) + } + } + +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala new file mode 100644 index 000000000000..b1666972843b --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.test + +import org.apache.spark.sql.QueryTest + +import org.scalactic.source.Position +import org.scalatest.Tag + +// spotless:off +trait DeltaExcludedTestMixin extends QueryTest { + + /** Tests to be ignored by the runner. */ + override def excluded: Seq[String] = Seq.empty + + protected override def test(testName: String, testTags: Tag*) + (testFun: => Any) + (implicit pos: Position): Unit = { + if (excluded.contains(testName)) { + super.ignore(testName, testTags: _*)(testFun) + } else { + super.test(testName, testTags: _*)(testFun) + } + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala new file mode 100644 index 000000000000..3d94d2bde33f --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.test + +import org.apache.spark.SparkConf +import org.apache.spark.sql.delta.catalog.DeltaCatalog +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.test.SharedSparkSession + +import io.delta.sql.DeltaSparkSessionExtension + +// spotless:off +/** + * A trait for tests that are testing a fully set up SparkSession with all of Delta's requirements, + * such as the configuration of the DeltaCatalog and the addition of all Delta extensions. + */ +trait DeltaSQLCommandTest extends SharedSparkSession { + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + + // Delta. + conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, + classOf[DeltaSparkSessionExtension].getName) + .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, + classOf[DeltaCatalog].getName) + + // Gluten. 
+ conf.set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.default.parallelism", "1") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala new file mode 100644 index 000000000000..22f4e9fa1137 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.test + +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.Utils + +import java.io.File + +// spotless:off +trait DeltaSQLTestUtils extends SQLTestUtils { + /** + * Override the temp dir/path creation methods from [[SQLTestUtils]] to: + * 1. Drop the call to `waitForTasksToFinish` which is a source of flakiness due to timeouts + * without clear benefits. + * 2. Allow creating paths with special characters for better test coverage. + */ + + protected val defaultTempDirPrefix: String = "spark%dir%prefix" + + override protected def withTempDir(f: File => Unit): Unit = { + withTempDir(prefix = defaultTempDirPrefix)(f) + } + + override protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { + withTempPaths(numPaths, prefix = defaultTempDirPrefix)(f) + } + + override def withTempPath(f: File => Unit): Unit = { + withTempPath(prefix = defaultTempDirPrefix)(f) + } + + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + */ + def withTempDir(prefix: String)(f: File => Unit): Unit = { + val path = Utils.createTempDir(namePrefix = prefix) + try f(path) finally Utils.deleteRecursively(path) + } + + /** + * Generates a temporary directory path without creating the actual directory, which is then + * passed to `f` and will be deleted after `f` returns. + */ + def withTempPath(prefix: String)(f: File => Unit): Unit = { + val path = Utils.createTempDir(namePrefix = prefix) + path.delete() + try f(path) finally Utils.deleteRecursively(path) + } + + /** + * Generates the specified number of temporary directory paths without creating the actual + * directories, which are then passed to `f` and will be deleted after `f` returns. 
+ */ + protected def withTempPaths(numPaths: Int, prefix: String)(f: Seq[File] => Unit): Unit = { + val files = + Seq.fill[File](numPaths)(Utils.createTempDir(namePrefix = prefix).getCanonicalFile) + files.foreach(_.delete()) + try f(files) finally { + files.foreach(Utils.deleteRecursively) + } + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala new file mode 100644 index 000000000000..f2e7acc695fa --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.test + +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransaction, Snapshot} +import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Operation, Write} +import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol} +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics +import org.apache.spark.sql.delta.coordinatedcommits.TableCommitCoordinatorClient +import org.apache.spark.sql.delta.hooks.AutoCompact +import org.apache.spark.sql.delta.stats.StatisticsCollection +import org.apache.spark.util.Clock + +import io.delta.storage.commit.{CommitResponse, GetCommitsResponse, UpdatedActions} +import org.apache.hadoop.fs.Path + +import java.io.File + +// spotless:off +/** + * Additional method definitions for Delta classes that are intended for use only in testing. + */ +object DeltaTestImplicits { + implicit class OptimisticTxnTestHelper(txn: OptimisticTransaction) { + + /** Ensure that the initial commit of a Delta table always contains a Metadata action */ + def commitActions(op: Operation, actions: Action*): Long = { + if (txn.readVersion == -1) { + val metadataOpt = actions.collectFirst { case m: Metadata => m } + val protocolOpt = actions.collectFirst { case p: Protocol => p } + val otherActions = + actions.filterNot(a => a.isInstanceOf[Metadata] || a.isInstanceOf[Protocol]) + (metadataOpt, protocolOpt) match { + case (Some(metadata), Some(protocol)) => + // When both metadata and protocol are explicitly passed, use them. + txn.updateProtocol(protocol) + // This will auto upgrade any required table features in the passed protocol as per + // given metadata. + txn.updateMetadataForNewTable(metadata) + case (Some(metadata), None) => + // When just metadata is passed, use it. 
+ // This will auto generate protocol as per metadata. + txn.updateMetadataForNewTable(metadata) + case (None, Some(protocol)) => + txn.updateProtocol(protocol) + txn.updateMetadataForNewTable(Metadata()) + case (None, None) => + // If neither metadata nor protocol is explicitly passed, then use default Metadata and + // with the maximum protocol. + txn.updateMetadataForNewTable(Metadata()) + txn.updateProtocol(Action.supportedProtocolVersion()) + } + txn.commit(otherActions, op) + } else { + txn.commit(actions, op) + } + } + + def commitManually(actions: Action*): Long = { + commitActions(ManualUpdate, actions: _*) + } + + def commitWriteAppend(actions: Action*): Long = { + commitActions(Write(SaveMode.Append), actions: _*) + } + } + + /** Add test-only File overloads for DeltaTable.forPath */ + implicit class DeltaLogObjectTestHelper(deltaLog: DeltaLog.type) { + def forTable(spark: SparkSession, dataPath: File): DeltaLog = { + DeltaLog.forTable(spark, new Path(dataPath.getCanonicalPath)) + } + + def forTable(spark: SparkSession, dataPath: File, clock: Clock): DeltaLog = { + DeltaLog.forTable(spark, new Path(dataPath.getCanonicalPath), clock) + } + } + + /** Helper class for working with [[TableCommitCoordinatorClient]] */ + implicit class TableCommitCoordinatorClientTestHelper( + tableCommitCoordinatorClient: TableCommitCoordinatorClient) { + + def commit( + commitVersion: Long, + actions: Iterator[String], + updatedActions: UpdatedActions): CommitResponse = { + tableCommitCoordinatorClient.commit( + commitVersion, actions, updatedActions, tableIdentifierOpt = None) + } + + def getCommits( + startVersion: Option[Long] = None, + endVersion: Option[Long] = None): GetCommitsResponse = { + tableCommitCoordinatorClient.getCommits(tableIdentifierOpt = None, startVersion, endVersion) + } + + def backfillToVersion( + version: Long, + lastKnownBackfilledVersion: Option[Long] = None): Unit = { + tableCommitCoordinatorClient.backfillToVersion( + tableIdentifierOpt = None, version, lastKnownBackfilledVersion) + } + } + + + /** Helper class for working with [[Snapshot]] */ + implicit class SnapshotTestHelper(snapshot: Snapshot) { + def ensureCommitFilesBackfilled(): Unit = { + snapshot.ensureCommitFilesBackfilled(catalogTableOpt = None) + } + } + + /** + * Helper class for working with the most recent snapshot in the deltaLog + */ + implicit class DeltaLogTestHelper(deltaLog: DeltaLog) { + def snapshot: Snapshot = { + deltaLog.unsafeVolatileSnapshot + } + + def checkpoint(): Unit = { + deltaLog.checkpoint(snapshot) + } + + def checkpointInterval(): Int = { + deltaLog.checkpointInterval(snapshot.metadata) + } + + def deltaRetentionMillis(): Long = { + deltaLog.deltaRetentionMillis(snapshot.metadata) + } + + def enableExpiredLogCleanup(): Boolean = { + deltaLog.enableExpiredLogCleanup(snapshot.metadata) + } + + def upgradeProtocol(newVersion: Protocol): Unit = { + upgradeProtocol(deltaLog.unsafeVolatileSnapshot, newVersion) + } + + def upgradeProtocol(snapshot: Snapshot, newVersion: Protocol): Unit = { + deltaLog.upgradeProtocol(None, snapshot, newVersion) + } + } + + implicit class DeltaTableV2ObjectTestHelper(dt: DeltaTableV2.type) { + /** Convenience overload that omits the cmd arg (which is not helpful in tests). 
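+ * For example (illustrative): `DeltaTableV2(spark, TableIdentifier("tab"))` resolves the table
+ * with a fixed "test" command string.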
*/ + def apply(spark: SparkSession, id: TableIdentifier): DeltaTableV2 = + dt.apply(spark, id, "test") + } + + implicit class DeltaTableV2TestHelper(deltaTable: DeltaTableV2) { + /** For backward compatibility with existing unit tests */ + def snapshot: Snapshot = deltaTable.initialSnapshot + } + + implicit class AutoCompactObjectTestHelper(ac: AutoCompact.type) { + private[delta] def compact( + spark: SparkSession, + deltaLog: DeltaLog, + partitionPredicates: Seq[Expression] = Nil, + opType: String = AutoCompact.OP_TYPE): Seq[OptimizeMetrics] = { + AutoCompact.compact( + spark, deltaLog, catalogTable = None, + partitionPredicates, opType) + } + } + + implicit class StatisticsCollectionObjectTestHelper(sc: StatisticsCollection.type) { + + /** + * This is an implicit helper required for backward compatibility with existing + * unit tests. It allows to call [[StatisticsCollection.recompute]] without a + * catalog table and in the actual call, sets it to [[None]]. + */ + def recompute( + spark: SparkSession, + deltaLog: DeltaLog, + predicates: Seq[Expression] = Seq(Literal(true)), + fileFilter: AddFile => Boolean = af => true): Unit = { + StatisticsCollection.recompute( + spark, deltaLog, catalogTable = None, predicates, fileFilter) + } + } +} +// spotless:on diff --git a/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala b/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala new file mode 100644 index 000000000000..26c1a69481f0 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package shims + +import org.apache.spark.sql.QueryTest + +// spotless:off +trait DeltaExcludedBySparkVersionTestMixinShims extends QueryTest { + /** + * Tests that are meant for Delta compiled against Spark Latest Release only. Executed since this + * is the Spark Latest Release shim. + */ + protected def testSparkLatestOnly( + testName: String, testTags: org.scalatest.Tag*) + (testFun: => Any) + (implicit pos: org.scalactic.source.Position): Unit = { + test(testName, testTags: _*)(testFun)(pos) + } + + /** + * Tests that are meant for Delta compiled against Spark Master Release only. Ignored since this + * is the Spark Latest Release shim. + */ + protected def testSparkMasterOnly( + testName: String, testTags: org.scalatest.Tag*) + (testFun: => Any) + (implicit pos: org.scalactic.source.Position): Unit = { + ignore(testName, testTags: _*)(testFun)(pos) + } +} +// spotless:on