diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala index 013df1e47693..50b5c1171c7f 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala @@ -35,5 +35,4 @@ object VeloxIcebergAppendDataExec { original.write ) } - } diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala new file mode 100644 index 000000000000..fc78642f3366 --- /dev/null +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.spark.sql.connector.write.Write +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.OverwritePartitionsDynamicExec + +case class VeloxIcebergOverwritePartitionsDynamicExec( + query: SparkPlan, + refreshCache: () => Unit, + write: Write) + extends AbstractIcebergWriteExec { + + override protected def withNewChildInternal(newChild: SparkPlan): IcebergWriteExec = + copy(query = newChild) +} + +object VeloxIcebergOverwritePartitionsDynamicExec { + def apply(original: OverwritePartitionsDynamicExec): IcebergWriteExec = { + VeloxIcebergOverwritePartitionsDynamicExec( + original.query, + original.refreshCache, + original.write + ) + } +} diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala index 98135fae497a..67b701f771e4 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala @@ -17,7 +17,7 @@ package org.apache.gluten.extension import org.apache.gluten.config.GlutenConfig -import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec} +import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec} import org.apache.gluten.extension.columnar.enumerated.RasOffload import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform import org.apache.gluten.extension.columnar.offload.OffloadSingleNode @@ -25,9 +25,9 @@ import org.apache.gluten.extension.columnar.validator.Validators import org.apache.gluten.extension.injector.Injector import org.apache.spark.sql.execution.SparkPlan -import 
org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec, ReplaceDataExec} +import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec} -case class OffloadIcebergWrite() extends OffloadSingleNode { +case class OffloadIcebergAppend() extends OffloadSingleNode { override def offload(plan: SparkPlan): SparkPlan = plan match { case a: AppendDataExec => VeloxIcebergAppendDataExec(a) @@ -35,7 +35,7 @@ case class OffloadIcebergWrite() extends OffloadSingleNode { } } -case class OffloadIcebergDelete() extends OffloadSingleNode { +case class OffloadIcebergReplaceData() extends OffloadSingleNode { override def offload(plan: SparkPlan): SparkPlan = plan match { case r: ReplaceDataExec => VeloxIcebergReplaceDataExec(r) @@ -51,12 +51,24 @@ case class OffloadIcebergOverwrite() extends OffloadSingleNode { } } +case class OffloadIcebergOverwritePartitionsDynamic() extends OffloadSingleNode { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case r: OverwritePartitionsDynamicExec => + VeloxIcebergOverwritePartitionsDynamicExec(r) + case other => other + } +} + object OffloadIcebergWrite { def inject(injector: Injector): Unit = { // Inject legacy rule. 
injector.gluten.legacy.injectTransform { c => - val offload = Seq(OffloadIcebergWrite(), OffloadIcebergDelete(), OffloadIcebergOverwrite()) + val offload = Seq( + OffloadIcebergAppend(), + OffloadIcebergReplaceData(), + OffloadIcebergOverwrite(), + OffloadIcebergOverwritePartitionsDynamic()) HeuristicTransform.Simple( Validators.newValidator(new GlutenConfig(c.sqlConf), offload), offload @@ -64,9 +76,10 @@ object OffloadIcebergWrite { } val offloads: Seq[RasOffload] = Seq( - RasOffload.from[AppendDataExec](OffloadIcebergWrite()), - RasOffload.from[ReplaceDataExec](OffloadIcebergDelete()), - RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite()) + RasOffload.from[AppendDataExec](OffloadIcebergAppend()), + RasOffload.from[ReplaceDataExec](OffloadIcebergReplaceData()), + RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite()), + RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic()) ) offloads.foreach( offload => diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala index b7270e69a0a1..6702c778856d 100644 --- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala +++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.execution.enhanced -import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec} +import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec} import org.apache.gluten.tags.EnhancedFeaturesTest import org.apache.spark.sql.{DataFrame, Row} 
@@ -259,4 +259,28 @@ class VeloxIcebergSuite extends IcebergSuite { checkColumnarToRow(df, 0) } } + + test("iceberg dynamic insert overwrite partition") { + withTable("iceberg_tbl") { + spark.sql(""" + |create table if not exists iceberg_tbl (a int, pt int) using iceberg + |partitioned by (pt) + |""".stripMargin) + + spark.sql("insert into table iceberg_tbl values (1, 1), (2, 2)") + + withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "dynamic") { + val df = spark.sql("insert overwrite table iceberg_tbl values (11, 1)") + assert( + df.queryExecution.executedPlan + .asInstanceOf[CommandResultExec] + .commandPhysicalPlan + .isInstanceOf[VeloxIcebergOverwritePartitionsDynamicExec]) + checkAnswer( + spark.sql("select * from iceberg_tbl order by pt"), + Seq(Row(11, 1), Row(2, 2)) + ) + } + } + } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index a410f15f89e0..cb5cb9165d29 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -559,4 +559,7 @@ object VeloxBackendSettings extends BackendSettingsApi { override def supportOverwriteByExpression(): Boolean = GlutenConfig.get.enableOverwriteByExpression && enableEnhancedFeatures() + + override def supportOverwritePartitionsDynamic(): Boolean = + GlutenConfig.get.enableOverwritePartitionsDynamic && enableEnhancedFeatures() } diff --git a/docs/Configuration.md b/docs/Configuration.md index 9aacdb377743..c3d0c396fd6d 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -74,6 +74,7 @@ nav_order: 15 | spark.gluten.sql.columnar.limit | true | | spark.gluten.sql.columnar.maxBatchSize | 4096 | | spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. 
| +| spark.gluten.sql.columnar.overwritePartitionsDynamic | true | Enable or disable columnar v2 command overwrite partitions dynamic. | | spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | | spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | | spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala index 5565ccbf35c8..a3346468df4c 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala @@ -66,8 +66,9 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec { if (IcebergWriteUtil.hasUnsupportedDataType(write)) { return ValidationResult.failed("Contains UUID ot FIXED data type") } - if (BackendsApiManager.getValidatorApiInstance.doSchemaValidate(query.schema).isDefined) { - return ValidationResult.failed("Contains unsupported data type") + BackendsApiManager.getValidatorApiInstance.doSchemaValidate(query.schema) match { + case Some(reason) => return ValidationResult.failed(reason) + case None => } val spec = IcebergWriteUtil.getTable(write).spec() if (spec.isPartitioned) { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 72caa1203e63..eea34e8c447b 100644 --- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -161,4 +161,6 @@ trait BackendSettingsApi { def supportReplaceDataExec(): Boolean = false def supportOverwriteByExpression(): Boolean = false + + def supportOverwritePartitionsDynamic(): Boolean = false } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 6e686210ef2a..aeecbdb1987c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -90,6 +90,9 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def enableOverwriteByExpression: Boolean = getConf(COLUMNAR_OVERWRIET_BY_EXPRESSION_ENABLED) + def enableOverwritePartitionsDynamic: Boolean = + getConf(COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED) + def enableColumnarShuffledHashJoin: Boolean = getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED) def shuffledHashJoinOptimizeBuildSide: Boolean = @@ -864,6 +867,12 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED = + buildConf("spark.gluten.sql.columnar.overwritePartitionsDynamic") + .doc("Enable or disable columnar v2 command overwrite partitions dynamic.") + .booleanConf + .createWithDefault(true) + val COLUMNAR_PREFER_STREAMING_AGGREGATE = buildConf("spark.gluten.sql.columnar.preferStreamingAggregate") .doc( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 2eda0ee12c43..14d4a7ffcfe4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec -import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, ReplaceDataExec} +import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.window.WindowExec @@ -138,6 +138,8 @@ object Validators { case p: AppendDataExec if !settings.supportAppendDataExec() => fail(p) case p: ReplaceDataExec if !settings.supportReplaceDataExec() => fail(p) case p: OverwriteByExpressionExec if !settings.supportOverwriteByExpression() => fail(p) + case p: OverwritePartitionsDynamicExec if !settings.supportOverwritePartitionsDynamic() => + fail(p) case _ => pass() } } @@ -160,6 +162,8 @@ object Validators { case p: AppendDataExec if !glutenConf.enableAppendData => fail(p) case p: ReplaceDataExec if !glutenConf.enableReplaceData => fail(p) case p: OverwriteByExpressionExec if !glutenConf.enableOverwriteByExpression => fail(p) + case p: OverwritePartitionsDynamicExec if !glutenConf.enableOverwritePartitionsDynamic => + fail(p) case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !glutenConf.enableColumnarLimit => fail(p) case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)