From 5efc78aee5e7cd06d3bd151aec7ff67511fd99fa Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 9 Oct 2025 12:25:38 +0800 Subject: [PATCH 1/2] Revert "[GLUTEN-8966][VL] Propagate HashAggregate's ignoreNullKeys when possible (#8967)" This reverts commit e368a32ac7a7b832def212596363361e2c245a90. --- .../backendsapi/velox/VeloxRuleApi.scala | 2 - .../apache/gluten/config/VeloxConfig.scala | 12 --- .../HashAggregateExecTransformer.scala | 24 ++--- .../HashAggregateIgnoreNullKeysRule.scala | 89 ------------------- .../VeloxAggregateFunctionsSuite.scala | 24 +---- cpp/velox/substrait/SubstraitToVeloxPlan.cc | 5 -- .../HashAggregateExecBaseTransformer.scala | 7 +- 7 files changed, 10 insertions(+), 153 deletions(-) delete mode 100644 backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 57d86a18d48d..485bf48f1523 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -120,7 +120,6 @@ object VeloxRuleApi { injector.injectPostTransform(_ => PullOutDuplicateProject) injector.injectPostTransform(_ => CollapseProjectExecTransformer) injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session)) - injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session)) injector.injectPostTransform(_ => CollectLimitTransformerRule()) injector.injectPostTransform(_ => CollectTailTransformerRule()) injector.injectPostTransform(_ => V2WritePostRule()) @@ -222,7 +221,6 @@ object VeloxRuleApi { injector.injectPostTransform(_ => PullOutDuplicateProject) injector.injectPostTransform(_ => CollapseProjectExecTransformer) injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session)) - injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session)) injector.injectPostTransform(_ => CollectLimitTransformerRule()) injector.injectPostTransform(_ => CollectTailTransformerRule()) injector.injectPostTransform(_ => V2WritePostRule()) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index b4f4556fe1fb..1be6749e08e2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -64,9 +64,6 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def veloxOrcScanEnabled: Boolean = getConf(VELOX_ORC_SCAN_ENABLED) - def enablePropagateIgnoreNullKeys: Boolean = - getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED) - def floatingPointMode: String = getConf(FLOATING_POINT_MODE) def enableRewriteCastArrayToString: Boolean = @@ -588,15 +585,6 @@ object VeloxConfig extends ConfigRegistry { .stringConf .createWithDefault("") - val VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED = - buildConf("spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys") - .doc( - "If enabled, we will identify aggregation followed by an inner join " + - "on the grouping keys, and mark the ignoreNullKeys flag to true to " + - "avoid unnecessary aggregation on null keys.") - .booleanConf - .createWithDefault(true) - val FLOATING_POINT_MODE = buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala index 9bb72acb1a52..e46d5340d0c3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashAggregateExecTransformer.scala @@ -49,8 +49,7 @@ abstract class HashAggregateExecTransformer( aggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], - child: SparkPlan, - ignoreNullKeys: Boolean) + child: SparkPlan) extends HashAggregateExecBaseTransformer( requiredChildDistributionExpressions, groupingExpressions, @@ -58,9 +57,7 @@ abstract class HashAggregateExecTransformer( aggregateAttributes, initialInputBufferOffset, resultExpressions, - child, - ignoreNullKeys - ) { + child) { override def output: Seq[Attribute] = { // TODO: We should have a check to make sure the returned schema actually matches the output @@ -186,8 +183,7 @@ abstract class HashAggregateExecTransformer( private def formatExtOptimizationString(isStreaming: Boolean): String = { val isStreamingStr = if (isStreaming) "1" else "0" val allowFlushStr = if (allowFlush) "1" else "0" - val ignoreNullKeysStr = if (ignoreNullKeys) "1" else "0" - s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\nignoreNullKeys=$ignoreNullKeysStr\n" + s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\n" } // Create aggregate function node and add to list. @@ -661,8 +657,7 @@ case class RegularHashAggregateExecTransformer( aggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], - child: SparkPlan, - ignoreNullKeys: Boolean = false) + child: SparkPlan) extends HashAggregateExecTransformer( requiredChildDistributionExpressions, groupingExpressions, @@ -670,9 +665,7 @@ case class RegularHashAggregateExecTransformer( aggregateAttributes, initialInputBufferOffset, resultExpressions, - child, - ignoreNullKeys - ) { + child) { override protected def allowFlush: Boolean = false @@ -696,8 +689,7 @@ case class FlushableHashAggregateExecTransformer( aggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], - child: SparkPlan, - ignoreNullKeys: Boolean = false) + child: SparkPlan) extends HashAggregateExecTransformer( requiredChildDistributionExpressions, groupingExpressions, @@ -705,9 +697,7 @@ case class FlushableHashAggregateExecTransformer( aggregateAttributes, initialInputBufferOffset, resultExpressions, - child, - ignoreNullKeys - ) { + child) { override protected def allowFlush: Boolean = true diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala deleted file mode 100644 index 1f57d93de045..000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/HashAggregateIgnoreNullKeysRule.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.extension - -import org.apache.gluten.config.VeloxConfig -import org.apache.gluten.execution._ - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec -import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike -import org.apache.spark.sql.execution.joins.BaseJoinExec - -/** - * To identify aggregates that the groupby key is used as inner join keys. In this case, we can set - * ignoreNullKeys to true when convert to velox's AggregateNode. - */ -case class HashAggregateIgnoreNullKeysRule(session: SparkSession) extends Rule[SparkPlan] { - override def apply(plan: SparkPlan): SparkPlan = { - if (!VeloxConfig.get.enablePropagateIgnoreNullKeys) { - return plan - } - plan.transformUp { - case join: BaseJoinExec if join.joinType == Inner => - val newLeftChild = setIgnoreKeysIfAggregateOnJoinKeys(join.left, join.leftKeys) - val newRightChild = setIgnoreKeysIfAggregateOnJoinKeys(join.right, join.rightKeys) - if (newLeftChild.fastEquals(join.left) && newRightChild.fastEquals(join.right)) { - join - } else { - join.withNewChildren(Seq(newLeftChild, newRightChild)) - } - case p => p - } - } - - private def setIgnoreKeysIfAggregateOnJoinKeys( - plan: SparkPlan, - joinKeys: Seq[Expression]): SparkPlan = plan match { - case agg: HashAggregateExecTransformer => - val newChild = setIgnoreKeysIfAggregateOnJoinKeys(agg.child, joinKeys) - val canIgnoreNullKeysRule = semanticEquals(agg.groupingExpressions, joinKeys) - agg match { - case f: FlushableHashAggregateExecTransformer => - f.copy(ignoreNullKeys = canIgnoreNullKeysRule, child = newChild) - case r: RegularHashAggregateExecTransformer => - r.copy(ignoreNullKeys = canIgnoreNullKeysRule, child = newChild) - case _ => agg - } - case s: ShuffleQueryStageExec => - s.copy(plan = setIgnoreKeysIfAggregateOnJoinKeys(s.plan, joinKeys)) - case p if !canPropagate(p) => p - case other => - other.withNewChildren( - other.children.map(c => setIgnoreKeysIfAggregateOnJoinKeys(c, joinKeys))) - } - - private def canPropagate(plan: SparkPlan): Boolean = plan match { - case _: ProjectExecTransformer => true - case _: WholeStageTransformer => true - case _: VeloxResizeBatchesExec => true - case _: ShuffleExchangeLike => true - case _: VeloxColumnarToRowExec => true - case _: SortExecTransformer => true - case _ => false - } - - private def semanticEquals(aggExpression: Seq[Expression], joinKeys: Seq[Expression]): Boolean = { - aggExpression.size == joinKeys.size && aggExpression.zip(joinKeys).forall { - case (e1: Expression, e2: Expression) => e1.semanticEquals(e2) - } - } -} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala index b5ebcf1785ec..e90171cece73 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.execution -import org.apache.gluten.config.{GlutenConfig, VeloxConfig} +import org.apache.gluten.config.VeloxConfig import org.apache.gluten.extension.columnar.validator.FallbackInjects import org.apache.spark.SparkConf @@ -1193,28 +1193,6 @@ class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite { } } } - - test("aggregate on join keys can set ignoreNullKeys") { - val s = - """ - |select count(1) from - | (select l_orderkey, max(l_partkey) from lineitem group by l_orderkey) a - |inner join - | (select l_orderkey from lineitem) b - |on a.l_orderkey = b.l_orderkey - |""".stripMargin - withSQLConf(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true") { - runQueryAndCompare(s) { - df => - val executedPlan = getExecutedPlan(df) - assert(executedPlan.exists { - case a: RegularHashAggregateExecTransformer if a.ignoreNullKeys => true - case a: FlushableHashAggregateExecTransformer if a.ignoreNullKeys => true - case _ => false - }) - } - } - } } class VeloxAggregateFunctionsFlushSuite extends VeloxAggregateFunctionsSuite { diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index ba7a70756848..7e4cd1f51476 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -462,11 +462,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: preGroupingExprs.insert(preGroupingExprs.begin(), veloxGroupingExprs.begin(), veloxGroupingExprs.end()); } - if (aggRel.has_advanced_extension() && - SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "ignoreNullKeys=")) { - ignoreNullKeys = true; - } - // Get the output names of Aggregation. std::vector aggOutNames; aggOutNames.reserve(aggRel.measures().size()); diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala index 6e2d638b8228..a4bcc6081e43 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala @@ -39,8 +39,7 @@ abstract class HashAggregateExecBaseTransformer( aggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], - child: SparkPlan, - ignoreNullKeys: Boolean = false) + child: SparkPlan) extends BaseAggregateExec with UnaryTransformSupport { @@ -87,13 +86,11 @@ abstract class HashAggregateExecBaseTransformer( s"HashAggregateTransformer(keys=$keyString, " + s"functions=$functionString, " + s"isStreamingAgg=$isCapableForStreamingAggregation, " + - s"ignoreNullKeys=$ignoreNullKeys, " + s"output=$outputString)" } else { s"HashAggregateTransformer(keys=$keyString, " + s"functions=$functionString, " + - s"isStreamingAgg=$isCapableForStreamingAggregation, " + - s"ignoreNullKeys=$ignoreNullKeys)" + s"isStreamingAgg=$isCapableForStreamingAggregation)" } } From 0dbce157a202f483d74879be4401db92566cbc6e Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 9 Oct 2025 12:55:06 +0800 Subject: [PATCH 2/2] fix --- .../apache/gluten/execution/VeloxAggregateFunctionsSuite.scala | 2 +- docs/velox-configuration.md | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala index e90171cece73..06081d63088b 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.execution -import org.apache.gluten.config.VeloxConfig +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.gluten.extension.columnar.validator.FallbackInjects import org.apache.spark.SparkConf diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 19db11c1d522..160f0c88ae5e 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -49,7 +49,6 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | -| spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys | true | If enabled, we will identify aggregation followed by an inner join on the grouping keys, and mark the ignoreNullKeys flag to true to avoid unnecessary aggregation on null keys. | | spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | | spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | | spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize |