From d15bcd9ab13423ad2c4908fbc21de968ff070a37 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Wed, 25 Feb 2026 10:54:53 +0800 Subject: [PATCH 01/16] feat:support gluten-level approx_percentile --- .../backendsapi/velox/VeloxRuleApi.scala | 1 + .../velox/VeloxSparkPlanExecApi.scala | 3 +- .../aggregate/VeloxApproxPercentile.scala | 113 ++++++++++++++++++ .../extension/PercentileRewriteRule.scala | 54 +++++++++ .../SubstraitToVeloxPlanValidator.cc | 3 +- .../apache/gluten/config/GlutenConfig.scala | 11 ++ 6 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala create mode 100644 backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 1d805362903a..68c304b3e3c7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -64,6 +64,7 @@ object VeloxRuleApi { injector.injectOptimizerRule(CollectRewriteRule.apply) injector.injectOptimizerRule(HLLRewriteRule.apply) injector.injectOptimizerRule(CollapseGetJsonObjectExpressionRule.apply) + injector.injectOptimizerRule(ApproxPercentileRewriteRule.apply) injector.injectOptimizerRule(RewriteCastFromArray.apply) injector.injectOptimizerRule(RewriteUnboundedWindow.apply) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 338bef20dfe5..2eea282f33b4 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -21,7 +21,7 @@ import org.apache.gluten.config.{GlutenConfig, HashShuffleWriterType, ReservedKe import org.apache.gluten.exception.{GlutenExceptionUtil, GlutenNotSupportException} import org.apache.gluten.execution._ import org.apache.gluten.expression._ -import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} +import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxApproximatePercentile, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.JoinKeysTag import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer @@ -1091,6 +1091,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { Sig[CollectList](ExpressionNames.COLLECT_LIST), Sig[VeloxCollectSet](ExpressionNames.COLLECT_SET), Sig[CollectSet](ExpressionNames.COLLECT_SET), + Sig[VeloxApproximatePercentile](ExpressionNames.APPROX_PERCENTILE), Sig[VeloxBloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN), Sig[VeloxBloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG), Sig[MapFilter](ExpressionNames.MAP_FILTER), diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala new file mode 100644 index 000000000000..e9c5ba7b4946 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.expression.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, ImperativeAggregate} +import org.apache.spark.sql.catalyst.trees.TernaryLike +import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, DoubleType, IntegerType, LongType, StructField, StructType} + +/** + * We add this wrapper to 1) align the intermediate type between Gluten and Velox. 2) switch between + * different velox algorithms 3) unify the input signature to velox + */ +case class VeloxApproximatePercentile( + child: Expression, + percentageExpression: Expression, + accuracyExpression: Expression, + override val mutableAggBufferOffset: Int, + override val inputAggBufferOffset: Int) + extends ImperativeAggregate + with TernaryLike[Expression] { + + private val delegate = ApproximatePercentile( + child, + percentageExpression, + accuracyExpression, + mutableAggBufferOffset, + inputAggBufferOffset + ) + + // the intermediate result type is kept as double. 
+ private lazy val aggBufferDataType: DataType = { + val childType = child.dataType + StructType( + Array( + StructField("col1", ArrayType(DoubleType)), + StructField("col2", BooleanType, false), + StructField("col3", DoubleType, false), + StructField("col4", IntegerType, false), + StructField("col5", LongType, false), + StructField("col6", childType, false), + StructField("col7", childType, false), + StructField("col8", ArrayType(childType)), + StructField("col9", ArrayType(IntegerType)) + ) + ) + } + + override lazy val aggBufferAttributes: Seq[AttributeReference] = + List(AttributeReference("buffer", aggBufferDataType)()) + + final override lazy val aggBufferSchema: StructType = + StructType( + aggBufferAttributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)) + ) + + final override lazy val inputAggBufferAttributes: Seq[AttributeReference] = + aggBufferAttributes.map(_.newInstance()) + + override def first: Expression = child + override def second: Expression = percentageExpression + override def third: Expression = accuracyExpression + + override def checkInputDataTypes(): TypeCheckResult = delegate.checkInputDataTypes() + override def nullable: Boolean = delegate.nullable + override def dataType: DataType = delegate.dataType + override def prettyName: String = "velox_approx_percentile" + + // explicitly throw exceptions on fallback. 
+ override def eval(input: InternalRow): Any = { + throw new UnsupportedOperationException("eval") + } + + override def initialize(mutableAggBuffer: InternalRow): Unit = { + throw new UnsupportedOperationException("initialize") + } + + override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = { + throw new UnsupportedOperationException("merge") + } + + override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = { + throw new UnsupportedOperationException("update") + } + + override def withNewMutableAggBufferOffset(newOffset: Int): ImperativeAggregate = + copy(mutableAggBufferOffset = newOffset) + + override def withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate = + copy(inputAggBufferOffset = newOffset) + + override protected def withNewChildrenInternal( + newFirst: Expression, + newSecond: Expression, + newThird: Expression): VeloxApproximatePercentile = + copy(child = newFirst, percentageExpression = newSecond, accuracyExpression = newThird) +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala new file mode 100644 index 000000000000..a90a16f8fced --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.expression.aggregate.VeloxApproximatePercentile + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXPRESSION} + +/** + * Rewrite Spark native ApproximatePercentile to VeloxApproximatePercentile: + * - Accuracy argument must be DOUBLE type (Velox requirement) + * - Only rewrite when all expressions are resolved + */ +case class ApproxPercentileRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + if (GlutenConfig.get.glutenPercentileFunctionFallback) { + return plan + } + plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { + case a: Aggregate => + a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { + case aggExpr @ AggregateExpression(sparkPercentile: ApproximatePercentile, _, _, _, _) => + val veloxPercentile = VeloxApproximatePercentile( + child = sparkPercentile.child, + percentageExpression = sparkPercentile.percentageExpression, + accuracyExpression = sparkPercentile.accuracyExpression, + mutableAggBufferOffset = sparkPercentile.mutableAggBufferOffset, + inputAggBufferOffset = sparkPercentile.inputAggBufferOffset + ) + + aggExpr.copy(aggregateFunction = veloxPercentile) + } + } + } 
+} diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc index 38d81320f9d9..bbb175936eec 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc @@ -60,7 +60,7 @@ const std::unordered_set kRegexFunctions = { "split"}; const std::unordered_set kBlackList = - {"split_part", "sequence", "approx_percentile", "map_from_arrays"}; + {"split_part", "sequence", "map_from_arrays"}; } // namespace bool SubstraitToVeloxPlanValidator::parseVeloxType( @@ -1290,6 +1290,7 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::AggregateRel& ag "bloom_filter_agg", "var_samp", "var_pop", + "approx_percentile", "bit_and", "bit_or", "bit_xor", diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 8d71e15964ea..ce7dd155324d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -384,6 +384,7 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def maxBroadcastTableSize: Long = JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB")) + def glutenPercentileFunctionFallback: Boolean = getConf(GLUTEN_PERCENTILE_FUNCTION_FALLBACK) } object GlutenConfig extends ConfigRegistry { @@ -1595,4 +1596,14 @@ object GlutenConfig extends ConfigRegistry { "total size of small files is below this threshold.") .doubleConf .createWithDefault(0.5) + + val GLUTEN_PERCENTILE_FUNCTION_FALLBACK = + buildConf("spark.gluten.sql.glutenPercentileFunctionStrategy") + .doc( + "Chooses whether percentile algorithm falls back " + + "for the percentile function. 
kll is provided by open-source velox, but " + "we can always disable this function and fallback.") .internal() .booleanConf .createWithDefault(false) } From 5cb45ad05dfe262b46e79bdaf6ef7593aba85f39 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Wed, 25 Feb 2026 11:05:36 +0800 Subject: [PATCH 02/16] copied clickhouse related changes --- .../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index cdf2eae418b4..a3e817d57b55 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -41,7 +41,7 @@ import org.apache.spark.shuffle.utils.CHShuffleUtil import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate, CollectList, CollectSet} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, BloomFilterAggregate, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning} @@ -597,6 +597,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { List( Sig[CollectList](ExpressionNames.COLLECT_LIST), Sig[CollectSet](ExpressionNames.COLLECT_SET), + Sig[ApproximatePercentile](ExpressionNames.APPROX_PERCENTILE), Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID), CHFlattenedExpression.sigAnd, CHFlattenedExpression.sigOr From 67b419961350d008fb8a38e9578c66b141d44b80 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Wed, 25 Feb 2026 19:53:38 +0800 Subject: [PATCH 03/16] address comments --- .../gluten/extension/PercentileRewriteRule.scala | 4 ---- ep/build-velox/src/get-velox.sh | 2 +- .../scala/org/apache/gluten/config/GlutenConfig.scala | 11 ----------- 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala index a90a16f8fced..71e8b32371c7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.extension -import org.apache.gluten.config.GlutenConfig import org.apache.gluten.expression.aggregate.VeloxApproximatePercentile import org.apache.spark.sql.SparkSession @@ -32,9 +31,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXP */ case class ApproxPercentileRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { - if (GlutenConfig.get.glutenPercentileFunctionFallback) { - return plan - } 
plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { case a: Aggregate => a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 558f988e5c93..b4903890fcd4 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -25,7 +25,7 @@ RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF # Developer use only for testing Velox PR. -UPSTREAM_VELOX_PR_ID="" +UPSTREAM_VELOX_PR_ID="16320" OS=`uname -s` diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index ce7dd155324d..8d71e15964ea 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -384,7 +384,6 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def maxBroadcastTableSize: Long = JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB")) - def glutenPercentileFunctionFallback: Boolean = getConf(GLUTEN_PERCENTILE_FUNCTION_FALLBACK) } object GlutenConfig extends ConfigRegistry { @@ -1596,14 +1595,4 @@ object GlutenConfig extends ConfigRegistry { "total size of small files is below this threshold.") .doubleConf .createWithDefault(0.5) - - val GLUTEN_PERCENTILE_FUNCTION_FALLBACK = - buildConf("spark.gluten.sql.glutenPercentileFunctionStrategy") - .doc( - "Chooses whether percentile algorithm falls back " + - "for the percentile function. 
kll is provided by open-source velox, but " + - "we can always disable this function and fallback.") - .internal() - .booleanConf - .createWithDefault(false) } From 65fb782e1c1ae2b59457352b88641f3e7f51548f Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Thu, 26 Feb 2026 10:46:06 +0800 Subject: [PATCH 04/16] remove optimizer tricks so that signature is consistent with vanilla spark --- .../backendsapi/velox/VeloxRuleApi.scala | 1 - .../velox/VeloxSparkPlanExecApi.scala | 3 +- .../aggregate/VeloxApproxPercentile.scala | 113 ------------------ .../extension/PercentileRewriteRule.scala | 50 -------- 4 files changed, 1 insertion(+), 166 deletions(-) delete mode 100644 backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala delete mode 100644 backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 68c304b3e3c7..1d805362903a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -64,7 +64,6 @@ object VeloxRuleApi { injector.injectOptimizerRule(CollectRewriteRule.apply) injector.injectOptimizerRule(HLLRewriteRule.apply) injector.injectOptimizerRule(CollapseGetJsonObjectExpressionRule.apply) - injector.injectOptimizerRule(ApproxPercentileRewriteRule.apply) injector.injectOptimizerRule(RewriteCastFromArray.apply) injector.injectOptimizerRule(RewriteUnboundedWindow.apply) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 2eea282f33b4..338bef20dfe5 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -21,7 +21,7 @@ import org.apache.gluten.config.{GlutenConfig, HashShuffleWriterType, ReservedKe import org.apache.gluten.exception.{GlutenExceptionUtil, GlutenNotSupportException} import org.apache.gluten.execution._ import org.apache.gluten.expression._ -import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxApproximatePercentile, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} +import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.JoinKeysTag import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer @@ -1091,7 +1091,6 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { Sig[CollectList](ExpressionNames.COLLECT_LIST), Sig[VeloxCollectSet](ExpressionNames.COLLECT_SET), Sig[CollectSet](ExpressionNames.COLLECT_SET), - Sig[VeloxApproximatePercentile](ExpressionNames.APPROX_PERCENTILE), Sig[VeloxBloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN), Sig[VeloxBloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG), Sig[MapFilter](ExpressionNames.MAP_FILTER), diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala deleted file mode 100644 index e9c5ba7b4946..000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.expression.aggregate - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, ImperativeAggregate} -import org.apache.spark.sql.catalyst.trees.TernaryLike -import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, DoubleType, IntegerType, LongType, StructField, StructType} - -/** - * We add this wrapper to 1) align the intermediate type between Gluten and Velox. 2) switch between - * different velox algorithms 3) unify the input signature to velox - */ -case class VeloxApproximatePercentile( - child: Expression, - percentageExpression: Expression, - accuracyExpression: Expression, - override val mutableAggBufferOffset: Int, - override val inputAggBufferOffset: Int) - extends ImperativeAggregate - with TernaryLike[Expression] { - - private val delegate = ApproximatePercentile( - child, - percentageExpression, - accuracyExpression, - mutableAggBufferOffset, - inputAggBufferOffset - ) - - // the intermediate result type is kept as double. 
- private lazy val aggBufferDataType: DataType = { - val childType = child.dataType - StructType( - Array( - StructField("col1", ArrayType(DoubleType)), - StructField("col2", BooleanType, false), - StructField("col3", DoubleType, false), - StructField("col4", IntegerType, false), - StructField("col5", LongType, false), - StructField("col6", childType, false), - StructField("col7", childType, false), - StructField("col8", ArrayType(childType)), - StructField("col9", ArrayType(IntegerType)) - ) - ) - } - - override lazy val aggBufferAttributes: Seq[AttributeReference] = - List(AttributeReference("buffer", aggBufferDataType)()) - - final override lazy val aggBufferSchema: StructType = - StructType( - aggBufferAttributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)) - ) - - final override lazy val inputAggBufferAttributes: Seq[AttributeReference] = - aggBufferAttributes.map(_.newInstance()) - - override def first: Expression = child - override def second: Expression = percentageExpression - override def third: Expression = accuracyExpression - - override def checkInputDataTypes(): TypeCheckResult = delegate.checkInputDataTypes() - override def nullable: Boolean = delegate.nullable - override def dataType: DataType = delegate.dataType - override def prettyName: String = "velox_approx_percentile" - - // explicitly throw exceptions on fallback. 
- override def eval(input: InternalRow): Any = { - throw new UnsupportedOperationException("eval") - } - - override def initialize(mutableAggBuffer: InternalRow): Unit = { - throw new UnsupportedOperationException("initialize") - } - - override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = { - throw new UnsupportedOperationException("merge") - } - - override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = { - throw new UnsupportedOperationException("update") - } - - override def withNewMutableAggBufferOffset(newOffset: Int): ImperativeAggregate = - copy(mutableAggBufferOffset = newOffset) - - override def withNewInputAggBufferOffset(newOffset: Int): ImperativeAggregate = - copy(inputAggBufferOffset = newOffset) - - override protected def withNewChildrenInternal( - newFirst: Expression, - newSecond: Expression, - newThird: Expression): VeloxApproximatePercentile = - copy(child = newFirst, percentageExpression = newSecond, accuracyExpression = newThird) -} diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala deleted file mode 100644 index 71e8b32371c7..000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.extension - -import org.apache.gluten.expression.aggregate.VeloxApproximatePercentile - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXPRESSION} - -/** - * Rewrite Spark native ApproximatePercentile to VeloxApproximatePercentile: - * - Accuracy argument must be DOUBLE type (Velox requirement) - * - Only rewrite when all expressions are resolved - */ -case class ApproxPercentileRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = { - plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { - case a: Aggregate => - a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { - case aggExpr @ AggregateExpression(sparkPercentile: ApproximatePercentile, _, _, _, _) => - val veloxPercentile = VeloxApproximatePercentile( - child = sparkPercentile.child, - percentageExpression = sparkPercentile.percentageExpression, - accuracyExpression = sparkPercentile.accuracyExpression, - mutableAggBufferOffset = sparkPercentile.mutableAggBufferOffset, - inputAggBufferOffset = sparkPercentile.inputAggBufferOffset - ) - - aggExpr.copy(aggregateFunction = veloxPercentile) - } - } - } -} From 413c7007a25c968f0b64fa6cc5b96c5627e9bac5 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Mon, 2 Mar 2026 11:21:10 
+0800 Subject: [PATCH 05/16] pick approx_percentile changes from 47fc0fdf 5e6e1e9c 4d459e1f: convert VeloxApproximatePercentile to DeclarativeAggregate with KLL sketch --- .../backendsapi/velox/VeloxRuleApi.scala | 1 + .../velox/VeloxSparkPlanExecApi.scala | 3 +- .../aggregate/VeloxApproxPercentile.scala | 189 ++++++++ .../VeloxApproxPercentileExpressions.scala | 455 ++++++++++++++++++ .../extension/PercentileRewriteRule.scala | 52 ++ .../apache/gluten/config/GlutenConfig.scala | 12 + 6 files changed, 711 insertions(+), 1 deletion(-) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala create mode 100644 backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala create mode 100644 backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 1d805362903a..68c304b3e3c7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -64,6 +64,7 @@ object VeloxRuleApi { injector.injectOptimizerRule(CollectRewriteRule.apply) injector.injectOptimizerRule(HLLRewriteRule.apply) injector.injectOptimizerRule(CollapseGetJsonObjectExpressionRule.apply) + injector.injectOptimizerRule(ApproxPercentileRewriteRule.apply) injector.injectOptimizerRule(RewriteCastFromArray.apply) injector.injectOptimizerRule(RewriteUnboundedWindow.apply) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 338bef20dfe5..2eea282f33b4 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -21,7 +21,7 @@ import org.apache.gluten.config.{GlutenConfig, HashShuffleWriterType, ReservedKe import org.apache.gluten.exception.{GlutenExceptionUtil, GlutenNotSupportException} import org.apache.gluten.execution._ import org.apache.gluten.expression._ -import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} +import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxApproximatePercentile, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.JoinKeysTag import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer @@ -1091,6 +1091,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { Sig[CollectList](ExpressionNames.COLLECT_LIST), Sig[VeloxCollectSet](ExpressionNames.COLLECT_SET), Sig[CollectSet](ExpressionNames.COLLECT_SET), + Sig[VeloxApproximatePercentile](ExpressionNames.APPROX_PERCENTILE), Sig[VeloxBloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN), Sig[VeloxBloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG), Sig[MapFilter](ExpressionNames.MAP_FILTER), diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala new file mode 100644 index 000000000000..c3f4d8b9bc11 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.expression.aggregate + +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, DeclarativeAggregate} +import org.apache.spark.sql.catalyst.trees.TernaryLike +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ + +/** + * Velox-compatible DeclarativeAggregate for approx_percentile. 
+ * + * Unlike Spark's ApproximatePercentile (which uses QuantileSummaries/GK algorithm with BinaryType + * intermediate data), this implementation uses KLL sketch with a 9-field StructType intermediate + * that is fully compatible with Velox's approx_percentile accumulator layout: + * + * 0: percentiles - Array(Double) 1: percentilesIsArray - Boolean 2: accuracy - Double 3: k - + * Integer (KLL parameter) 4: n - Long (total count) 5: minValue - childType 6: maxValue - childType + * 7: items - Array(childType) 8: levels - Array(Integer) + * + * Because aggBufferAttributes has 9 fields (> 1), the existing VeloxIntermediateData.Type default + * branch (aggBufferAttributes.size > 1) will match automatically, meaning: + * - No special handling needed in HashAggregateExecTransformer + * - extractStruct / rowConstruct projections work out of the box + * - Partial fallback (Velox partial -> Spark final) is supported + * + * This follows the same pattern as VeloxCollectList/VeloxCollectSet. + */ +case class VeloxApproximatePercentile( + child: Expression, + percentageExpression: Expression, + accuracyExpression: Expression) + extends DeclarativeAggregate + with TernaryLike[Expression] { + + override def first: Expression = child + override def second: Expression = percentageExpression + override def third: Expression = accuracyExpression + + override def prettyName: String = "velox_approx_percentile" + + // Mark as lazy so that expressions are not evaluated during tree transformation. 
+ private lazy val accuracy: Double = accuracyExpression.eval() match { + case null => ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY.toDouble + case num: Number => num.doubleValue() + } + + private lazy val (returnPercentileArray, percentages): (Boolean, Array[Double]) = + percentageExpression.eval() match { + case null => (false, null) + case num: Double => (false, Array(num)) + case arrayData: ArrayData => (true, arrayData.toDoubleArray()) + } + + override def checkInputDataTypes(): TypeCheckResult = { + // Delegate to Spark's ApproximatePercentile for validation + ApproximatePercentile(child, percentageExpression, accuracyExpression) + .checkInputDataTypes() + } + + override def nullable: Boolean = true + + override def dataType: DataType = { + if (returnPercentileArray) ArrayType(child.dataType, containsNull = false) + else child.dataType + } + + // --- The 9 aggBuffer attributes matching Velox KLL sketch intermediate type --- + + private lazy val percentilesBuf: AttributeReference = + AttributeReference("percentiles", ArrayType(DoubleType))() + private lazy val percentilesIsArrayBuf: AttributeReference = + AttributeReference("percentilesIsArray", BooleanType)() + private lazy val accuracyBuf: AttributeReference = + AttributeReference("accuracy", DoubleType)() + private lazy val kBuf: AttributeReference = + AttributeReference("k", IntegerType)() + private lazy val nBuf: AttributeReference = + AttributeReference("n", LongType)() + private lazy val minValueBuf: AttributeReference = + AttributeReference("minValue", child.dataType)() + private lazy val maxValueBuf: AttributeReference = + AttributeReference("maxValue", child.dataType)() + private lazy val itemsBuf: AttributeReference = + AttributeReference("items", ArrayType(child.dataType))() + private lazy val levelsBuf: AttributeReference = + AttributeReference("levels", ArrayType(IntegerType))() + + override def aggBufferAttributes: Seq[AttributeReference] = Seq( + percentilesBuf, + percentilesIsArrayBuf, + 
accuracyBuf, + kBuf, + nBuf, + minValueBuf, + maxValueBuf, + itemsBuf, + levelsBuf + ) + + // --- Initial values: create an empty KLL sketch --- + + private lazy val percentilesLiteral: Literal = { + if (percentages == null) Literal.create(null, ArrayType(DoubleType)) + else + Literal.create( + new org.apache.spark.sql.catalyst.util.GenericArrayData( + percentages.map(_.asInstanceOf[Any])), + ArrayType(DoubleType)) + } + + override lazy val initialValues: Seq[Expression] = Seq( + percentilesLiteral, // percentiles + Literal.create(returnPercentileArray, BooleanType), // percentilesIsArray + Literal.create(accuracy, DoubleType), // accuracy + Literal.create(KllSketchFieldIndex.DEFAULT_K, IntegerType), // k + Literal.create(0L, LongType), // n + Literal.create(null, child.dataType), // minValue + Literal.create(null, child.dataType), // maxValue + Literal.create( + new org.apache.spark.sql.catalyst.util.GenericArrayData(Array.empty[Any]), + ArrayType(child.dataType) + ), // items + Literal.create( + new org.apache.spark.sql.catalyst.util.GenericArrayData(Array(0, 0)), + ArrayType(IntegerType) + ) // levels + ) + + // --- Update expressions: add a value to the sketch --- + + override lazy val updateExpressions: Seq[Expression] = { + // When input is null, keep buffer unchanged; otherwise call KllSketchAdd + val structExpr = CreateStruct(aggBufferAttributes) + val updated = If( + IsNull(child), + structExpr, + KllSketchAdd(structExpr, child, child.dataType) + ) + // Extract fields from the updated struct back to individual buffer attributes + aggBufferAttributes.zipWithIndex.map { + case (attr, idx) => + GetStructField(updated, idx, Some(attr.name)) + } + } + + // --- Merge expressions: merge two sketches --- + + override lazy val mergeExpressions: Seq[Expression] = { + val leftStruct = CreateStruct(aggBufferAttributes.map(_.left)) + val rightStruct = CreateStruct(aggBufferAttributes.map(_.right)) + val merged = KllSketchMerge(leftStruct, rightStruct, child.dataType) + 
aggBufferAttributes.zipWithIndex.map { + case (attr, idx) => + GetStructField(merged, idx, Some(attr.name)) + } + } + + // --- Evaluate expression: extract percentiles from the sketch --- + + override lazy val evaluateExpression: Expression = { + val structExpr = CreateStruct(aggBufferAttributes) + KllSketchEval(structExpr, returnPercentileArray, dataType, child.dataType) + } + + override def defaultResult: Option[Literal] = Option(Literal.create(null, dataType)) + + override protected def withNewChildrenInternal( + newFirst: Expression, + newSecond: Expression, + newThird: Expression): VeloxApproximatePercentile = + copy(child = newFirst, percentageExpression = newSecond, accuracyExpression = newThird) +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala new file mode 100644 index 000000000000..af2810d36886 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.expression.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ + +import java.util + +/** + * KLL sketch field indices matching Velox's ApproxPercentileIntermediateTypeChildIndex. + * + * The intermediate StructType has 9 fields: 0: percentiles - Array(Double) 1: percentilesIsArray - + * Boolean 2: accuracy - Double 3: k - Integer (KLL parameter) 4: n - Long (total count) 5: minValue + * \- childType 6: maxValue - childType 7: items - Array(childType) 8: levels - Array(Integer) + */ +object KllSketchFieldIndex { + val PERCENTILES = 0 + val PERCENTILES_IS_ARRAY = 1 + val ACCURACY = 2 + val K = 3 + val N = 4 + val MIN_VALUE = 5 + val MAX_VALUE = 6 + val ITEMS = 7 + val LEVELS = 8 + val NUM_FIELDS = 9 + + /** Build the StructType for KLL sketch intermediate data. */ + def intermediateStructType(childType: DataType): StructType = StructType( + Array( + StructField("percentiles", ArrayType(DoubleType), nullable = true), + StructField("percentilesIsArray", BooleanType, nullable = true), + StructField("accuracy", DoubleType, nullable = true), + StructField("k", IntegerType, nullable = true), + StructField("n", LongType, nullable = true), + StructField("minValue", childType, nullable = true), + StructField("maxValue", childType, nullable = true), + StructField("items", ArrayType(childType), nullable = true), + StructField("levels", ArrayType(IntegerType), nullable = true) + )) + + /** Default KLL k parameter (same as Velox default). */ + val DEFAULT_K: Int = 200 +} + +/** + * Helper object encapsulating the core KLL sketch algorithm logic. + * + * This is a simplified implementation that runs on the Spark side during fallback. 
The struct + * layout is fully compatible with Velox's KLL sketch intermediate type, enabling partial fallback + * (Velox partial -> Spark final). + * + * The algorithm stores all inserted values in the items array (level 0). When the items array grows + * too large, a compaction step is performed to reduce memory usage while maintaining approximate + * quantile guarantees. + */ +object KllSketchHelper { + + /** + * Create an empty KLL sketch as an InternalRow (struct). + * + * @param percentiles + * Array of percentile values + * @param isArray + * Whether the percentile argument is an array + * @param accuracy + * The accuracy parameter (maps to relativeError = 1/accuracy) + * @param childType + * The data type of values being aggregated + */ + def createEmpty( + percentiles: ArrayData, + isArray: Boolean, + accuracy: Double, + childType: DataType): InternalRow = { + val k = KllSketchFieldIndex.DEFAULT_K + InternalRow( + percentiles, // percentiles + isArray, // percentilesIsArray + accuracy, // accuracy + k, // k + 0L, // n + null, // minValue + null, // maxValue + new GenericArrayData(Array.empty[Any]), // items + new GenericArrayData(Array(0, 0)) // levels: [0, 0] means 1 level with 0 items + ) + } + + /** + * Add a value to the KLL sketch. Returns a new InternalRow representing the updated sketch. 
+ * + * @param sketch + * The current sketch as InternalRow + * @param value + * The value to add + * @param childType + * The data type of the value + */ + def add(sketch: InternalRow, value: Any, childType: DataType): InternalRow = { + if (value == null) return sketch + + val n = sketch.getLong(KllSketchFieldIndex.N) + val k = sketch.getInt(KllSketchFieldIndex.K) + val items = sketch.getArray(KllSketchFieldIndex.ITEMS) + val levels = sketch.getArray(KllSketchFieldIndex.LEVELS) + + val doubleValue = toDouble(value, childType) + + // Update min/max + val oldMin = if (sketch.isNullAt(KllSketchFieldIndex.MIN_VALUE)) { + doubleValue + } else { + math.min( + toDouble(sketch.get(KllSketchFieldIndex.MIN_VALUE, childType), childType), + doubleValue) + } + val oldMax = if (sketch.isNullAt(KllSketchFieldIndex.MAX_VALUE)) { + doubleValue + } else { + math.max( + toDouble(sketch.get(KllSketchFieldIndex.MAX_VALUE, childType), childType), + doubleValue) + } + + // Append value to items (level 0) + val newItemsArr = new Array[Any](items.numElements() + 1) + var i = 0 + while (i < items.numElements()) { + newItemsArr(i) = items.get(i, childType) + i += 1 + } + newItemsArr(items.numElements()) = fromDouble(doubleValue, childType) + + // Update levels: increment the last element (end of level 0) + val newLevelsArr = new Array[Int](levels.numElements()) + i = 0 + while (i < levels.numElements()) { + newLevelsArr(i) = levels.getInt(i) + i += 1 + } + newLevelsArr(newLevelsArr.length - 1) += 1 + + // Compaction: if level 0 is too large (> 2*k), compact + var finalItems = newItemsArr + var finalLevels = newLevelsArr + val level0Size = newLevelsArr(newLevelsArr.length - 1) - newLevelsArr(0) + if (level0Size > 2 * k) { + val compacted = compactLevel0(finalItems, finalLevels, k, childType) + finalItems = compacted._1 + finalLevels = compacted._2 + } + + InternalRow( + sketch.getArray(KllSketchFieldIndex.PERCENTILES), + sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), + 
sketch.getDouble(KllSketchFieldIndex.ACCURACY), + k, + n + 1, + fromDouble(oldMin, childType), + fromDouble(oldMax, childType), + new GenericArrayData(finalItems), + new GenericArrayData(finalLevels.map(_.asInstanceOf[Any])) + ) + } + + /** Merge two KLL sketches. Returns a new InternalRow representing the merged sketch. */ + def merge(left: InternalRow, right: InternalRow, childType: DataType): InternalRow = { + if (left == null || left.getLong(KllSketchFieldIndex.N) == 0) return right + if (right == null || right.getLong(KllSketchFieldIndex.N) == 0) return left + + val leftN = left.getLong(KllSketchFieldIndex.N) + val rightN = right.getLong(KllSketchFieldIndex.N) + val k = math.max(left.getInt(KllSketchFieldIndex.K), right.getInt(KllSketchFieldIndex.K)) + + // Merge min/max + val leftMin = toDouble(left.get(KllSketchFieldIndex.MIN_VALUE, childType), childType) + val rightMin = toDouble(right.get(KllSketchFieldIndex.MIN_VALUE, childType), childType) + val leftMax = toDouble(left.get(KllSketchFieldIndex.MAX_VALUE, childType), childType) + val rightMax = toDouble(right.get(KllSketchFieldIndex.MAX_VALUE, childType), childType) + val mergedMin = math.min(leftMin, rightMin) + val mergedMax = math.max(leftMax, rightMax) + + // Merge items: concatenate all items from both sketches + val leftItems = left.getArray(KllSketchFieldIndex.ITEMS) + val rightItems = right.getArray(KllSketchFieldIndex.ITEMS) + val leftLevels = left.getArray(KllSketchFieldIndex.LEVELS) + val rightLevels = right.getArray(KllSketchFieldIndex.LEVELS) + + val totalItemsSize = leftItems.numElements() + rightItems.numElements() + val mergedItems = new Array[Any](totalItemsSize) + var idx = 0 + var i = 0 + while (i < leftItems.numElements()) { + mergedItems(idx) = leftItems.get(i, childType) + idx += 1 + i += 1 + } + i = 0 + while (i < rightItems.numElements()) { + mergedItems(idx) = rightItems.get(i, childType) + idx += 1 + i += 1 + } + + // Merge levels: combine level structures + // Simple approach: 
put all items into a single level + val mergedLevels = Array(0, totalItemsSize) + + // Compact if necessary + var finalItems = mergedItems + var finalLevels = mergedLevels + if (totalItemsSize > 2 * k) { + val compacted = compactLevel0(finalItems, finalLevels, k, childType) + finalItems = compacted._1 + finalLevels = compacted._2 + } + + // Use left's percentiles/accuracy settings + InternalRow( + left.getArray(KllSketchFieldIndex.PERCENTILES), + left.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), + left.getDouble(KllSketchFieldIndex.ACCURACY), + k, + leftN + rightN, + fromDouble(mergedMin, childType), + fromDouble(mergedMax, childType), + new GenericArrayData(finalItems), + new GenericArrayData(finalLevels.map(_.asInstanceOf[Any])) + ) + } + + /** + * Evaluate percentiles from a KLL sketch. + * + * @param sketch + * The sketch as InternalRow + * @param childType + * The data type of values + * @return + * The percentile value(s) - either a single value or an ArrayData + */ + def eval(sketch: InternalRow, childType: DataType): Any = { + val n = sketch.getLong(KllSketchFieldIndex.N) + if (n == 0) return null + + val percentiles = sketch.getArray(KllSketchFieldIndex.PERCENTILES) + val isArray = sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY) + val items = sketch.getArray(KllSketchFieldIndex.ITEMS) + + // Collect all items and sort them as doubles for quantile estimation + val numItems = items.numElements() + val doubles = new Array[Double](numItems) + var i = 0 + while (i < numItems) { + doubles(i) = toDouble(items.get(i, childType), childType) + i += 1 + } + util.Arrays.sort(doubles) + + // Compute percentiles + val numPercentiles = percentiles.numElements() + val results = new Array[Any](numPercentiles) + i = 0 + while (i < numPercentiles) { + val p = percentiles.getDouble(i) + val rank = math.min((p * numItems).toInt, numItems - 1) + results(i) = fromDouble(doubles(math.max(0, rank)), childType) + i += 1 + } + + if (results.isEmpty) { + null + } 
else if (isArray) { + new GenericArrayData(results) + } else { + results(0) + } + } + + /** Compact level 0 by sorting and taking every other element. */ + private def compactLevel0( + items: Array[Any], + levels: Array[Int], + k: Int, + childType: DataType): (Array[Any], Array[Int]) = { + // Sort the items by their double values + val doubles = items.map(v => toDouble(v, childType)) + val indices = doubles.indices.toArray + util.Arrays.sort(indices, (a: Int, b: Int) => java.lang.Double.compare(doubles(a), doubles(b))) + + // Keep every other element (simple compaction) + val half = (items.length + 1) / 2 + val compactedItems = new Array[Any](half) + var i = 0 + while (i < half) { + compactedItems(i) = items(indices(i * 2)) + i += 1 + } + + (compactedItems, Array(0, half)) + } + + /** Convert a value to Double for comparison/sorting. */ + private def toDouble(value: Any, dataType: DataType): Double = { + if (value == null) return Double.NaN + dataType match { + case DoubleType => value.asInstanceOf[Double] + case FloatType => value.asInstanceOf[Float].toDouble + case IntegerType | DateType => value.asInstanceOf[Int].toDouble + case LongType => value.asInstanceOf[Long].toDouble + case ShortType => value.asInstanceOf[Short].toDouble + case ByteType => value.asInstanceOf[Byte].toDouble + case _: DecimalType => + value.asInstanceOf[Decimal].toDouble + case _ => value.asInstanceOf[Number].doubleValue() + } + } + + /** Convert a Double back to the original data type. */ + private def fromDouble(value: Double, dataType: DataType): Any = { + dataType match { + case DoubleType => value + case FloatType => value.toFloat + case IntegerType | DateType => value.toInt + case LongType => value.toLong + case ShortType => value.toShort + case ByteType => value.toByte + case dt: DecimalType => Decimal(value, dt.precision, dt.scale) + case _ => value + } + } +} + +/** + * Expression that adds a value to a KLL sketch. 
Used as the update expression in + * VeloxApproximatePercentile's DeclarativeAggregate. + * + * @param sketch + * The current sketch (struct expression) + * @param value + * The value to add + * @param childType + * The data type of the value being aggregated + */ +case class KllSketchAdd(sketch: Expression, value: Expression, childType: DataType) + extends BinaryExpression { + + override def left: Expression = sketch + override def right: Expression = value + override def dataType: DataType = sketch.dataType + override def nullable: Boolean = false + + override protected def withNewChildrenInternal( + newLeft: Expression, + newRight: Expression): Expression = + copy(sketch = newLeft, value = newRight) + + override def eval(input: InternalRow): Any = { + val sketchRow = left.eval(input).asInstanceOf[InternalRow] + val v = right.eval(input) + if (v == null) return sketchRow + KllSketchHelper.add(sketchRow, v, childType) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException("KllSketchAdd does not support codegen") +} + +/** + * Expression that merges two KLL sketches. Used as the merge expression in + * VeloxApproximatePercentile's DeclarativeAggregate. 
+ * + * @param left + * The left sketch + * @param right + * The right sketch + * @param childType + * The data type of the values being aggregated + */ +case class KllSketchMerge(left: Expression, right: Expression, childType: DataType) + extends BinaryExpression { + + override def dataType: DataType = left.dataType + override def nullable: Boolean = false + + override protected def withNewChildrenInternal( + newLeft: Expression, + newRight: Expression): Expression = + copy(left = newLeft, right = newRight) + + override def eval(input: InternalRow): Any = { + val leftRow = left.eval(input).asInstanceOf[InternalRow] + val rightRow = right.eval(input).asInstanceOf[InternalRow] + if (leftRow == null) return rightRow + if (rightRow == null) return leftRow + KllSketchHelper.merge(leftRow, rightRow, childType) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException("KllSketchMerge does not support codegen") +} + +/** + * Expression that evaluates percentiles from a KLL sketch. Used as the evaluate expression in + * VeloxApproximatePercentile's DeclarativeAggregate. 
+ * + * @param sketch + * The sketch expression + * @param returnArray + * Whether to return an array of percentiles + * @param resultType + * The result data type + */ +case class KllSketchEval( + sketch: Expression, + returnArray: Boolean, + resultType: DataType, + childType: DataType) + extends UnaryExpression { + + override def child: Expression = sketch + override def dataType: DataType = resultType + override def nullable: Boolean = true + + override protected def withNewChildInternal(newChild: Expression): Expression = + copy(sketch = newChild) + + override def eval(input: InternalRow): Any = { + val sketchRow = child.eval(input).asInstanceOf[InternalRow] + if (sketchRow == null) return null + KllSketchHelper.eval(sketchRow, childType) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException("KllSketchEval does not support codegen") +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala new file mode 100644 index 000000000000..f8ea4dc0b8fe --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.expression.aggregate.VeloxApproximatePercentile + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXPRESSION} + +/** + * Rewrite Spark native ApproximatePercentile to VeloxApproximatePercentile: + * - Accuracy argument must be DOUBLE type (Velox requirement) + * - Only rewrite when all expressions are resolved + */ +case class ApproxPercentileRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + if (GlutenConfig.get.glutenPercentileFunctionFallback) { + return plan + } + plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { + case a: Aggregate => + a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { + case aggExpr @ AggregateExpression(sparkPercentile: ApproximatePercentile, _, _, _, _) => + val veloxPercentile = VeloxApproximatePercentile( + child = sparkPercentile.child, + percentageExpression = sparkPercentile.percentageExpression, + accuracyExpression = sparkPercentile.accuracyExpression + ) + + aggExpr.copy(aggregateFunction = veloxPercentile) + } + } + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 8d71e15964ea..6da5c69cabcd 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -384,6 +384,8 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) { def maxBroadcastTableSize: Long = JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB")) + + def glutenPercentileFunctionFallback: Boolean = getConf(GLUTEN_PERCENTILE_FUNCTION_FALLBACK) } object GlutenConfig extends ConfigRegistry { @@ -1595,4 +1597,14 @@ object GlutenConfig extends ConfigRegistry { "total size of small files is below this threshold.") .doubleConf .createWithDefault(0.5) + + val GLUTEN_PERCENTILE_FUNCTION_FALLBACK = + buildConf("spark.gluten.sql.glutenPercentileFunctionStrategy") + .doc( + "Chooses whether percentile algorithm falls back " + + "for the percentile function. kll is provided by open-source velox, but " + + "we can always disable this function and fallback.") + .internal() + .booleanConf + .createWithDefault(false) } From 34adbb47c9c20d5936125315a68838ce30dfd9ff Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Mon, 2 Mar 2026 17:29:14 +0800 Subject: [PATCH 06/16] =?UTF-8?q?1=EF=BC=89add=20partial=20fallback=20nati?= =?UTF-8?q?ve=20spark=20implementation=202)add=20corresponding=20ut?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../aggregate/VeloxApproxPercentile.scala | 447 +++++++++++++++++- .../VeloxAggregateFunctionsSuite.scala | 50 ++ 2 files changed, 493 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala index c3f4d8b9bc11..70a5e3b3713e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -16,13 +16,18 @@ */ package org.apache.gluten.expression.aggregate +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import 
org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, DeclarativeAggregate} +import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile +import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.trees.TernaryLike -import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} import org.apache.spark.sql.types._ +import java.util + /** * Velox-compatible DeclarativeAggregate for approx_percentile. * @@ -117,12 +122,14 @@ case class VeloxApproximatePercentile( // --- Initial values: create an empty KLL sketch --- private lazy val percentilesLiteral: Literal = { - if (percentages == null) Literal.create(null, ArrayType(DoubleType)) - else + if (percentages == null) { + Literal.create(null, ArrayType(DoubleType)) + } else { Literal.create( new org.apache.spark.sql.catalyst.util.GenericArrayData( percentages.map(_.asInstanceOf[Any])), ArrayType(DoubleType)) + } } override lazy val initialValues: Seq[Expression] = Seq( @@ -187,3 +194,435 @@ case class VeloxApproximatePercentile( newThird: Expression): VeloxApproximatePercentile = copy(child = newFirst, percentageExpression = newSecond, accuracyExpression = newThird) } + +/** + * KLL sketch field indices matching Velox's ApproxPercentileIntermediateTypeChildIndex. 
+ * + * The intermediate StructType has 9 fields: 0: percentiles - Array(Double) 1: percentilesIsArray - + * Boolean 2: accuracy - Double 3: k - Integer (KLL parameter) 4: n - Long (total count) 5: minValue + * \- childType 6: maxValue - childType 7: items - Array(childType) 8: levels - Array(Integer) + */ +object KllSketchFieldIndex { + val PERCENTILES = 0 + val PERCENTILES_IS_ARRAY = 1 + val ACCURACY = 2 + val K = 3 + val N = 4 + val MIN_VALUE = 5 + val MAX_VALUE = 6 + val ITEMS = 7 + val LEVELS = 8 + val NUM_FIELDS = 9 + + /** Build the StructType for KLL sketch intermediate data. */ + def intermediateStructType(childType: DataType): StructType = StructType( + Array( + StructField("percentiles", ArrayType(DoubleType), nullable = true), + StructField("percentilesIsArray", BooleanType, nullable = true), + StructField("accuracy", DoubleType, nullable = true), + StructField("k", IntegerType, nullable = true), + StructField("n", LongType, nullable = true), + StructField("minValue", childType, nullable = true), + StructField("maxValue", childType, nullable = true), + StructField("items", ArrayType(childType), nullable = true), + StructField("levels", ArrayType(IntegerType), nullable = true) + )) + + /** Default KLL k parameter (same as Velox default). */ + val DEFAULT_K: Int = 200 +} + +/** + * Helper object encapsulating the core KLL sketch algorithm logic. + * + * This is a simplified implementation that runs on the Spark side during fallback. The struct + * layout is fully compatible with Velox's KLL sketch intermediate type, enabling partial fallback + * (Velox partial -> Spark final). + * + * The algorithm stores all inserted values in the items array (level 0). When the items array grows + * too large, a compaction step is performed to reduce memory usage while maintaining approximate + * quantile guarantees. + */ +object KllSketchHelper { + + /** + * Create an empty KLL sketch as an InternalRow (struct). 
+ * + * @param percentiles + * Array of percentile values + * @param isArray + * Whether the percentile argument is an array + * @param accuracy + * The accuracy parameter (maps to relativeError = 1/accuracy) + * @param childType + * The data type of values being aggregated + */ + def createEmpty( + percentiles: ArrayData, + isArray: Boolean, + accuracy: Double, + childType: DataType): InternalRow = { + val k = KllSketchFieldIndex.DEFAULT_K + InternalRow( + percentiles, // percentiles + isArray, // percentilesIsArray + accuracy, // accuracy + k, // k + 0L, // n + null, // minValue + null, // maxValue + new GenericArrayData(Array.empty[Any]), // items + new GenericArrayData(Array(0, 0)) // levels: [0, 0] means 1 level with 0 items + ) + } + + /** + * Add a value to the KLL sketch. Returns a new InternalRow representing the updated sketch. + * + * @param sketch + * The current sketch as InternalRow + * @param value + * The value to add + * @param childType + * The data type of the value + */ + def add(sketch: InternalRow, value: Any, childType: DataType): InternalRow = { + if (value == null) return sketch + + val n = sketch.getLong(KllSketchFieldIndex.N) + val k = sketch.getInt(KllSketchFieldIndex.K) + val items = sketch.getArray(KllSketchFieldIndex.ITEMS) + val levels = sketch.getArray(KllSketchFieldIndex.LEVELS) + + val doubleValue = toDouble(value, childType) + + // Update min/max + val oldMin = if (sketch.isNullAt(KllSketchFieldIndex.MIN_VALUE)) { + doubleValue + } else { + math.min( + toDouble(sketch.get(KllSketchFieldIndex.MIN_VALUE, childType), childType), + doubleValue) + } + val oldMax = if (sketch.isNullAt(KllSketchFieldIndex.MAX_VALUE)) { + doubleValue + } else { + math.max( + toDouble(sketch.get(KllSketchFieldIndex.MAX_VALUE, childType), childType), + doubleValue) + } + + // Append value to items (level 0) + val newItemsArr = new Array[Any](items.numElements() + 1) + var i = 0 + while (i < items.numElements()) { + newItemsArr(i) = items.get(i, childType) 
+ i += 1 + } + newItemsArr(items.numElements()) = fromDouble(doubleValue, childType) + + // Update levels: increment the last element (end of level 0) + val newLevelsArr = new Array[Int](levels.numElements()) + i = 0 + while (i < levels.numElements()) { + newLevelsArr(i) = levels.getInt(i) + i += 1 + } + newLevelsArr(newLevelsArr.length - 1) += 1 + + // Compaction: if level 0 is too large (> 2*k), compact + var finalItems = newItemsArr + var finalLevels = newLevelsArr + val level0Size = newLevelsArr(newLevelsArr.length - 1) - newLevelsArr(0) + if (level0Size > 2 * k) { + val compacted = compactLevel0(finalItems, finalLevels, k, childType) + finalItems = compacted._1 + finalLevels = compacted._2 + } + + InternalRow( + sketch.getArray(KllSketchFieldIndex.PERCENTILES), + sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), + sketch.getDouble(KllSketchFieldIndex.ACCURACY), + k, + n + 1, + fromDouble(oldMin, childType), + fromDouble(oldMax, childType), + new GenericArrayData(finalItems), + new GenericArrayData(finalLevels.map(_.asInstanceOf[Any])) + ) + } + + /** Merge two KLL sketches. Returns a new InternalRow representing the merged sketch. 
*/ + def merge(left: InternalRow, right: InternalRow, childType: DataType): InternalRow = { + if (left == null || left.getLong(KllSketchFieldIndex.N) == 0) return right + if (right == null || right.getLong(KllSketchFieldIndex.N) == 0) return left + + val leftN = left.getLong(KllSketchFieldIndex.N) + val rightN = right.getLong(KllSketchFieldIndex.N) + val k = math.max(left.getInt(KllSketchFieldIndex.K), right.getInt(KllSketchFieldIndex.K)) + + // Merge min/max + val leftMin = toDouble(left.get(KllSketchFieldIndex.MIN_VALUE, childType), childType) + val rightMin = toDouble(right.get(KllSketchFieldIndex.MIN_VALUE, childType), childType) + val leftMax = toDouble(left.get(KllSketchFieldIndex.MAX_VALUE, childType), childType) + val rightMax = toDouble(right.get(KllSketchFieldIndex.MAX_VALUE, childType), childType) + val mergedMin = math.min(leftMin, rightMin) + val mergedMax = math.max(leftMax, rightMax) + + // Merge items: concatenate all items from both sketches + val leftItems = left.getArray(KllSketchFieldIndex.ITEMS) + val rightItems = right.getArray(KllSketchFieldIndex.ITEMS) + val leftLevels = left.getArray(KllSketchFieldIndex.LEVELS) + val rightLevels = right.getArray(KllSketchFieldIndex.LEVELS) + + val totalItemsSize = leftItems.numElements() + rightItems.numElements() + val mergedItems = new Array[Any](totalItemsSize) + var idx = 0 + var i = 0 + while (i < leftItems.numElements()) { + mergedItems(idx) = leftItems.get(i, childType) + idx += 1 + i += 1 + } + i = 0 + while (i < rightItems.numElements()) { + mergedItems(idx) = rightItems.get(i, childType) + idx += 1 + i += 1 + } + + // Merge levels: combine level structures + // Simple approach: put all items into a single level + val mergedLevels = Array(0, totalItemsSize) + + // Compact if necessary + var finalItems = mergedItems + var finalLevels = mergedLevels + if (totalItemsSize > 2 * k) { + val compacted = compactLevel0(finalItems, finalLevels, k, childType) + finalItems = compacted._1 + finalLevels = 
compacted._2 + } + + // Use left's percentiles/accuracy settings + InternalRow( + left.getArray(KllSketchFieldIndex.PERCENTILES), + left.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), + left.getDouble(KllSketchFieldIndex.ACCURACY), + k, + leftN + rightN, + fromDouble(mergedMin, childType), + fromDouble(mergedMax, childType), + new GenericArrayData(finalItems), + new GenericArrayData(finalLevels.map(_.asInstanceOf[Any])) + ) + } + + /** + * Evaluate percentiles from a KLL sketch. + * + * @param sketch + * The sketch as InternalRow + * @param childType + * The data type of values + * @return + * The percentile value(s) - either a single value or an ArrayData + */ + def eval(sketch: InternalRow, childType: DataType): Any = { + val n = sketch.getLong(KllSketchFieldIndex.N) + if (n == 0) return null + + val percentiles = sketch.getArray(KllSketchFieldIndex.PERCENTILES) + val isArray = sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY) + val items = sketch.getArray(KllSketchFieldIndex.ITEMS) + + // Collect all items and sort them as doubles for quantile estimation + val numItems = items.numElements() + val doubles = new Array[Double](numItems) + var i = 0 + while (i < numItems) { + doubles(i) = toDouble(items.get(i, childType), childType) + i += 1 + } + util.Arrays.sort(doubles) + + // Compute percentiles + val numPercentiles = percentiles.numElements() + val results = new Array[Any](numPercentiles) + i = 0 + while (i < numPercentiles) { + val p = percentiles.getDouble(i) + val rank = math.min((p * numItems).toInt, numItems - 1) + results(i) = fromDouble(doubles(math.max(0, rank)), childType) + i += 1 + } + + if (results.isEmpty) { + null + } else if (isArray) { + new GenericArrayData(results) + } else { + results(0) + } + } + + /** Compact level 0 by sorting and taking every other element. 
*/ + private def compactLevel0( + items: Array[Any], + levels: Array[Int], + k: Int, + childType: DataType): (Array[Any], Array[Int]) = { + // Sort the items by their double values + val doubles = items.map(v => toDouble(v, childType)) + val indices = doubles.indices.toArray.map(Integer.valueOf) + util.Arrays.sort( + indices, + (a: Integer, b: Integer) => java.lang.Double.compare(doubles(a), doubles(b))) + + // Keep every other element (simple compaction) + val half = (items.length + 1) / 2 + val compactedItems = new Array[Any](half) + var i = 0 + while (i < half) { + compactedItems(i) = items(indices(i * 2)) + i += 1 + } + + (compactedItems, Array(0, half)) + } + + /** Convert a value to Double for comparison/sorting. */ + private def toDouble(value: Any, dataType: DataType): Double = { + if (value == null) return Double.NaN + dataType match { + case DoubleType => value.asInstanceOf[Double] + case FloatType => value.asInstanceOf[Float].toDouble + case IntegerType | DateType => value.asInstanceOf[Int].toDouble + case LongType => value.asInstanceOf[Long].toDouble + case ShortType => value.asInstanceOf[Short].toDouble + case ByteType => value.asInstanceOf[Byte].toDouble + case _: DecimalType => + value.asInstanceOf[Decimal].toDouble + case _ => value.asInstanceOf[Number].doubleValue() + } + } + + /** Convert a Double back to the original data type. */ + private def fromDouble(value: Double, dataType: DataType): Any = { + dataType match { + case DoubleType => value + case FloatType => value.toFloat + case IntegerType | DateType => value.toInt + case LongType => value.toLong + case ShortType => value.toShort + case ByteType => value.toByte + case dt: DecimalType => Decimal(value, dt.precision, dt.scale) + case _ => value + } + } +} + +/** + * Expression that adds a value to a KLL sketch. Used as the update expression in + * VeloxApproximatePercentile's DeclarativeAggregate. 
+ * + * @param sketch + * The current sketch (struct expression) + * @param value + * The value to add + * @param childType + * The data type of the value being aggregated + */ +case class KllSketchAdd(sketch: Expression, value: Expression, childType: DataType) + extends BinaryExpression { + + override def left: Expression = sketch + override def right: Expression = value + override def dataType: DataType = sketch.dataType + override def nullable: Boolean = false + + override protected def withNewChildrenInternal( + newLeft: Expression, + newRight: Expression): Expression = + copy(sketch = newLeft, value = newRight) + + override def eval(input: InternalRow): Any = { + val sketchRow = left.eval(input).asInstanceOf[InternalRow] + val v = right.eval(input) + if (v == null) return sketchRow + KllSketchHelper.add(sketchRow, v, childType) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException("KllSketchAdd does not support codegen") +} + +/** + * Expression that merges two KLL sketches. Used as the merge expression in + * VeloxApproximatePercentile's DeclarativeAggregate. 
+ * + * @param left + * The left sketch + * @param right + * The right sketch + * @param childType + * The data type of the values being aggregated + */ +case class KllSketchMerge(left: Expression, right: Expression, childType: DataType) + extends BinaryExpression { + + override def dataType: DataType = left.dataType + override def nullable: Boolean = false + + override protected def withNewChildrenInternal( + newLeft: Expression, + newRight: Expression): Expression = + copy(left = newLeft, right = newRight) + + override def eval(input: InternalRow): Any = { + val leftRow = left.eval(input).asInstanceOf[InternalRow] + val rightRow = right.eval(input).asInstanceOf[InternalRow] + if (leftRow == null) return rightRow + if (rightRow == null) return leftRow + KllSketchHelper.merge(leftRow, rightRow, childType) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException("KllSketchMerge does not support codegen") +} + +/** + * Expression that evaluates percentiles from a KLL sketch. Used as the evaluate expression in + * VeloxApproximatePercentile's DeclarativeAggregate. 
+ * + * @param sketch + * The sketch expression + * @param returnArray + * Whether to return an array of percentiles + * @param resultType + * The result data type + */ +case class KllSketchEval( + sketch: Expression, + returnArray: Boolean, + resultType: DataType, + childType: DataType) + extends UnaryExpression { + + override def child: Expression = sketch + override def dataType: DataType = resultType + override def nullable: Boolean = true + + override protected def withNewChildInternal(newChild: Expression): Expression = + copy(sketch = newChild) + + override def eval(input: InternalRow): Any = { + val sketchRow = child.eval(input).asInstanceOf[InternalRow] + if (sketchRow == null) return null + KllSketchHelper.eval(sketchRow, childType) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new UnsupportedOperationException("KllSketchEval does not support codegen") +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala index e1c89f986967..3eb43bf19e8a 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala @@ -1068,6 +1068,56 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu } } + // Test approx_percentile with all fallback modes. 
+ List(Offload, FallbackPartial, FallbackFinal, FallbackAll).foreach { + mode => + test(s"test fallback approx_percentile, $mode") { + mode match { + case Offload => doApproxPercentileTest() + case FallbackPartial => + FallbackInjects.fallbackOn { + case agg: BaseAggregateExec => + agg.aggregateExpressions.exists(_.mode == Partial) + } { + doApproxPercentileTest() + } + case FallbackFinal => + FallbackInjects.fallbackOn { + case agg: BaseAggregateExec => + agg.aggregateExpressions.exists(_.mode == Final) + } { + doApproxPercentileTest() + } + case FallbackAll => + FallbackInjects.fallbackOn { case _: BaseAggregateExec => true } { + doApproxPercentileTest() + } + } + + def doApproxPercentileTest(): Unit = { + withTempView("approx_pct_tmp") { + Seq(0, 6, 7, 9, 10) + .toDF("col") + .createOrReplaceTempView("approx_pct_tmp") + + // single percentile + runQueryAndCompare("SELECT approx_percentile(col, 0.5) FROM approx_pct_tmp") { _ => } + + // array percentile + runQueryAndCompare( + "SELECT approx_percentile(col, array(0.25, 0.5, 0.75)) FROM approx_pct_tmp") { _ => } + + // with group by + Seq((1, 10), (1, 20), (1, 30), (2, 5), (2, 15)) + .toDF("grp", "val") + .createOrReplaceTempView("approx_pct_grp") + runQueryAndCompare( + "SELECT grp, approx_percentile(val, 0.5) FROM approx_pct_grp GROUP BY grp") { _ => } + } + } + } + } + test("count(1)") { runQueryAndCompare( """ From a31fd5bcd7b26f511343da2eb2dd30e011d47edc Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Mon, 2 Mar 2026 17:32:02 +0800 Subject: [PATCH 07/16] remove extra config --- .../gluten/extension/PercentileRewriteRule.scala | 4 ---- .../org/apache/gluten/config/GlutenConfig.scala | 12 ------------ 2 files changed, 16 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala index f8ea4dc0b8fe..121384b42327 100644 --- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.extension -import org.apache.gluten.config.GlutenConfig import org.apache.gluten.expression.aggregate.VeloxApproximatePercentile import org.apache.spark.sql.SparkSession @@ -32,9 +31,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXP */ case class ApproxPercentileRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { - if (GlutenConfig.get.glutenPercentileFunctionFallback) { - return plan - } plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { case a: Aggregate => a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 6da5c69cabcd..8d71e15964ea 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -384,8 +384,6 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def maxBroadcastTableSize: Long = JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB")) - - def glutenPercentileFunctionFallback: Boolean = getConf(GLUTEN_PERCENTILE_FUNCTION_FALLBACK) } object GlutenConfig extends ConfigRegistry { @@ -1597,14 +1595,4 @@ object GlutenConfig extends ConfigRegistry { "total size of small files is below this threshold.") .doubleConf .createWithDefault(0.5) - - val GLUTEN_PERCENTILE_FUNCTION_FALLBACK = - buildConf("spark.gluten.sql.glutenPercentileFunctionStrategy") - .doc( - "Chooses whether percentile algorithm falls back " + - "for the percentile function. 
kll is provided by open-source velox, but " + - "we can always disable this function and fallback.") - .internal() - .booleanConf - .createWithDefault(false) } From 0e9d7b47b0b2d6402691c7d8a67618c2912ad804 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Mon, 2 Mar 2026 17:36:16 +0800 Subject: [PATCH 08/16] remove extra file --- .../VeloxApproxPercentileExpressions.scala | 455 ------------------ 1 file changed, 455 deletions(-) delete mode 100644 backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala deleted file mode 100644 index af2810d36886..000000000000 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentileExpressions.scala +++ /dev/null @@ -1,455 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gluten.expression.aggregate - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} -import org.apache.spark.sql.types._ - -import java.util - -/** - * KLL sketch field indices matching Velox's ApproxPercentileIntermediateTypeChildIndex. - * - * The intermediate StructType has 9 fields: 0: percentiles - Array(Double) 1: percentilesIsArray - - * Boolean 2: accuracy - Double 3: k - Integer (KLL parameter) 4: n - Long (total count) 5: minValue - * \- childType 6: maxValue - childType 7: items - Array(childType) 8: levels - Array(Integer) - */ -object KllSketchFieldIndex { - val PERCENTILES = 0 - val PERCENTILES_IS_ARRAY = 1 - val ACCURACY = 2 - val K = 3 - val N = 4 - val MIN_VALUE = 5 - val MAX_VALUE = 6 - val ITEMS = 7 - val LEVELS = 8 - val NUM_FIELDS = 9 - - /** Build the StructType for KLL sketch intermediate data. */ - def intermediateStructType(childType: DataType): StructType = StructType( - Array( - StructField("percentiles", ArrayType(DoubleType), nullable = true), - StructField("percentilesIsArray", BooleanType, nullable = true), - StructField("accuracy", DoubleType, nullable = true), - StructField("k", IntegerType, nullable = true), - StructField("n", LongType, nullable = true), - StructField("minValue", childType, nullable = true), - StructField("maxValue", childType, nullable = true), - StructField("items", ArrayType(childType), nullable = true), - StructField("levels", ArrayType(IntegerType), nullable = true) - )) - - /** Default KLL k parameter (same as Velox default). */ - val DEFAULT_K: Int = 200 -} - -/** - * Helper object encapsulating the core KLL sketch algorithm logic. - * - * This is a simplified implementation that runs on the Spark side during fallback. 
The struct - * layout is fully compatible with Velox's KLL sketch intermediate type, enabling partial fallback - * (Velox partial -> Spark final). - * - * The algorithm stores all inserted values in the items array (level 0). When the items array grows - * too large, a compaction step is performed to reduce memory usage while maintaining approximate - * quantile guarantees. - */ -object KllSketchHelper { - - /** - * Create an empty KLL sketch as an InternalRow (struct). - * - * @param percentiles - * Array of percentile values - * @param isArray - * Whether the percentile argument is an array - * @param accuracy - * The accuracy parameter (maps to relativeError = 1/accuracy) - * @param childType - * The data type of values being aggregated - */ - def createEmpty( - percentiles: ArrayData, - isArray: Boolean, - accuracy: Double, - childType: DataType): InternalRow = { - val k = KllSketchFieldIndex.DEFAULT_K - InternalRow( - percentiles, // percentiles - isArray, // percentilesIsArray - accuracy, // accuracy - k, // k - 0L, // n - null, // minValue - null, // maxValue - new GenericArrayData(Array.empty[Any]), // items - new GenericArrayData(Array(0, 0)) // levels: [0, 0] means 1 level with 0 items - ) - } - - /** - * Add a value to the KLL sketch. Returns a new InternalRow representing the updated sketch. 
- * - * @param sketch - * The current sketch as InternalRow - * @param value - * The value to add - * @param childType - * The data type of the value - */ - def add(sketch: InternalRow, value: Any, childType: DataType): InternalRow = { - if (value == null) return sketch - - val n = sketch.getLong(KllSketchFieldIndex.N) - val k = sketch.getInt(KllSketchFieldIndex.K) - val items = sketch.getArray(KllSketchFieldIndex.ITEMS) - val levels = sketch.getArray(KllSketchFieldIndex.LEVELS) - - val doubleValue = toDouble(value, childType) - - // Update min/max - val oldMin = if (sketch.isNullAt(KllSketchFieldIndex.MIN_VALUE)) { - doubleValue - } else { - math.min( - toDouble(sketch.get(KllSketchFieldIndex.MIN_VALUE, childType), childType), - doubleValue) - } - val oldMax = if (sketch.isNullAt(KllSketchFieldIndex.MAX_VALUE)) { - doubleValue - } else { - math.max( - toDouble(sketch.get(KllSketchFieldIndex.MAX_VALUE, childType), childType), - doubleValue) - } - - // Append value to items (level 0) - val newItemsArr = new Array[Any](items.numElements() + 1) - var i = 0 - while (i < items.numElements()) { - newItemsArr(i) = items.get(i, childType) - i += 1 - } - newItemsArr(items.numElements()) = fromDouble(doubleValue, childType) - - // Update levels: increment the last element (end of level 0) - val newLevelsArr = new Array[Int](levels.numElements()) - i = 0 - while (i < levels.numElements()) { - newLevelsArr(i) = levels.getInt(i) - i += 1 - } - newLevelsArr(newLevelsArr.length - 1) += 1 - - // Compaction: if level 0 is too large (> 2*k), compact - var finalItems = newItemsArr - var finalLevels = newLevelsArr - val level0Size = newLevelsArr(newLevelsArr.length - 1) - newLevelsArr(0) - if (level0Size > 2 * k) { - val compacted = compactLevel0(finalItems, finalLevels, k, childType) - finalItems = compacted._1 - finalLevels = compacted._2 - } - - InternalRow( - sketch.getArray(KllSketchFieldIndex.PERCENTILES), - sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), - 
sketch.getDouble(KllSketchFieldIndex.ACCURACY), - k, - n + 1, - fromDouble(oldMin, childType), - fromDouble(oldMax, childType), - new GenericArrayData(finalItems), - new GenericArrayData(finalLevels.map(_.asInstanceOf[Any])) - ) - } - - /** Merge two KLL sketches. Returns a new InternalRow representing the merged sketch. */ - def merge(left: InternalRow, right: InternalRow, childType: DataType): InternalRow = { - if (left == null || left.getLong(KllSketchFieldIndex.N) == 0) return right - if (right == null || right.getLong(KllSketchFieldIndex.N) == 0) return left - - val leftN = left.getLong(KllSketchFieldIndex.N) - val rightN = right.getLong(KllSketchFieldIndex.N) - val k = math.max(left.getInt(KllSketchFieldIndex.K), right.getInt(KllSketchFieldIndex.K)) - - // Merge min/max - val leftMin = toDouble(left.get(KllSketchFieldIndex.MIN_VALUE, childType), childType) - val rightMin = toDouble(right.get(KllSketchFieldIndex.MIN_VALUE, childType), childType) - val leftMax = toDouble(left.get(KllSketchFieldIndex.MAX_VALUE, childType), childType) - val rightMax = toDouble(right.get(KllSketchFieldIndex.MAX_VALUE, childType), childType) - val mergedMin = math.min(leftMin, rightMin) - val mergedMax = math.max(leftMax, rightMax) - - // Merge items: concatenate all items from both sketches - val leftItems = left.getArray(KllSketchFieldIndex.ITEMS) - val rightItems = right.getArray(KllSketchFieldIndex.ITEMS) - val leftLevels = left.getArray(KllSketchFieldIndex.LEVELS) - val rightLevels = right.getArray(KllSketchFieldIndex.LEVELS) - - val totalItemsSize = leftItems.numElements() + rightItems.numElements() - val mergedItems = new Array[Any](totalItemsSize) - var idx = 0 - var i = 0 - while (i < leftItems.numElements()) { - mergedItems(idx) = leftItems.get(i, childType) - idx += 1 - i += 1 - } - i = 0 - while (i < rightItems.numElements()) { - mergedItems(idx) = rightItems.get(i, childType) - idx += 1 - i += 1 - } - - // Merge levels: combine level structures - // Simple approach: 
put all items into a single level - val mergedLevels = Array(0, totalItemsSize) - - // Compact if necessary - var finalItems = mergedItems - var finalLevels = mergedLevels - if (totalItemsSize > 2 * k) { - val compacted = compactLevel0(finalItems, finalLevels, k, childType) - finalItems = compacted._1 - finalLevels = compacted._2 - } - - // Use left's percentiles/accuracy settings - InternalRow( - left.getArray(KllSketchFieldIndex.PERCENTILES), - left.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), - left.getDouble(KllSketchFieldIndex.ACCURACY), - k, - leftN + rightN, - fromDouble(mergedMin, childType), - fromDouble(mergedMax, childType), - new GenericArrayData(finalItems), - new GenericArrayData(finalLevels.map(_.asInstanceOf[Any])) - ) - } - - /** - * Evaluate percentiles from a KLL sketch. - * - * @param sketch - * The sketch as InternalRow - * @param childType - * The data type of values - * @return - * The percentile value(s) - either a single value or an ArrayData - */ - def eval(sketch: InternalRow, childType: DataType): Any = { - val n = sketch.getLong(KllSketchFieldIndex.N) - if (n == 0) return null - - val percentiles = sketch.getArray(KllSketchFieldIndex.PERCENTILES) - val isArray = sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY) - val items = sketch.getArray(KllSketchFieldIndex.ITEMS) - - // Collect all items and sort them as doubles for quantile estimation - val numItems = items.numElements() - val doubles = new Array[Double](numItems) - var i = 0 - while (i < numItems) { - doubles(i) = toDouble(items.get(i, childType), childType) - i += 1 - } - util.Arrays.sort(doubles) - - // Compute percentiles - val numPercentiles = percentiles.numElements() - val results = new Array[Any](numPercentiles) - i = 0 - while (i < numPercentiles) { - val p = percentiles.getDouble(i) - val rank = math.min((p * numItems).toInt, numItems - 1) - results(i) = fromDouble(doubles(math.max(0, rank)), childType) - i += 1 - } - - if (results.isEmpty) { - null - } 
else if (isArray) { - new GenericArrayData(results) - } else { - results(0) - } - } - - /** Compact level 0 by sorting and taking every other element. */ - private def compactLevel0( - items: Array[Any], - levels: Array[Int], - k: Int, - childType: DataType): (Array[Any], Array[Int]) = { - // Sort the items by their double values - val doubles = items.map(v => toDouble(v, childType)) - val indices = doubles.indices.toArray - util.Arrays.sort(indices, (a: Int, b: Int) => java.lang.Double.compare(doubles(a), doubles(b))) - - // Keep every other element (simple compaction) - val half = (items.length + 1) / 2 - val compactedItems = new Array[Any](half) - var i = 0 - while (i < half) { - compactedItems(i) = items(indices(i * 2)) - i += 1 - } - - (compactedItems, Array(0, half)) - } - - /** Convert a value to Double for comparison/sorting. */ - private def toDouble(value: Any, dataType: DataType): Double = { - if (value == null) return Double.NaN - dataType match { - case DoubleType => value.asInstanceOf[Double] - case FloatType => value.asInstanceOf[Float].toDouble - case IntegerType | DateType => value.asInstanceOf[Int].toDouble - case LongType => value.asInstanceOf[Long].toDouble - case ShortType => value.asInstanceOf[Short].toDouble - case ByteType => value.asInstanceOf[Byte].toDouble - case _: DecimalType => - value.asInstanceOf[Decimal].toDouble - case _ => value.asInstanceOf[Number].doubleValue() - } - } - - /** Convert a Double back to the original data type. */ - private def fromDouble(value: Double, dataType: DataType): Any = { - dataType match { - case DoubleType => value - case FloatType => value.toFloat - case IntegerType | DateType => value.toInt - case LongType => value.toLong - case ShortType => value.toShort - case ByteType => value.toByte - case dt: DecimalType => Decimal(value, dt.precision, dt.scale) - case _ => value - } - } -} - -/** - * Expression that adds a value to a KLL sketch. 
Used as the update expression in - * VeloxApproximatePercentile's DeclarativeAggregate. - * - * @param sketch - * The current sketch (struct expression) - * @param value - * The value to add - * @param childType - * The data type of the value being aggregated - */ -case class KllSketchAdd(sketch: Expression, value: Expression, childType: DataType) - extends BinaryExpression { - - override def left: Expression = sketch - override def right: Expression = value - override def dataType: DataType = sketch.dataType - override def nullable: Boolean = false - - override protected def withNewChildrenInternal( - newLeft: Expression, - newRight: Expression): Expression = - copy(sketch = newLeft, value = newRight) - - override def eval(input: InternalRow): Any = { - val sketchRow = left.eval(input).asInstanceOf[InternalRow] - val v = right.eval(input) - if (v == null) return sketchRow - KllSketchHelper.add(sketchRow, v, childType) - } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - throw new UnsupportedOperationException("KllSketchAdd does not support codegen") -} - -/** - * Expression that merges two KLL sketches. Used as the merge expression in - * VeloxApproximatePercentile's DeclarativeAggregate. 
- * - * @param left - * The left sketch - * @param right - * The right sketch - * @param childType - * The data type of the values being aggregated - */ -case class KllSketchMerge(left: Expression, right: Expression, childType: DataType) - extends BinaryExpression { - - override def dataType: DataType = left.dataType - override def nullable: Boolean = false - - override protected def withNewChildrenInternal( - newLeft: Expression, - newRight: Expression): Expression = - copy(left = newLeft, right = newRight) - - override def eval(input: InternalRow): Any = { - val leftRow = left.eval(input).asInstanceOf[InternalRow] - val rightRow = right.eval(input).asInstanceOf[InternalRow] - if (leftRow == null) return rightRow - if (rightRow == null) return leftRow - KllSketchHelper.merge(leftRow, rightRow, childType) - } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - throw new UnsupportedOperationException("KllSketchMerge does not support codegen") -} - -/** - * Expression that evaluates percentiles from a KLL sketch. Used as the evaluate expression in - * VeloxApproximatePercentile's DeclarativeAggregate. 
- * - * @param sketch - * The sketch expression - * @param returnArray - * Whether to return an array of percentiles - * @param resultType - * The result data type - */ -case class KllSketchEval( - sketch: Expression, - returnArray: Boolean, - resultType: DataType, - childType: DataType) - extends UnaryExpression { - - override def child: Expression = sketch - override def dataType: DataType = resultType - override def nullable: Boolean = true - - override protected def withNewChildInternal(newChild: Expression): Expression = - copy(sketch = newChild) - - override def eval(input: InternalRow): Any = { - val sketchRow = child.eval(input).asInstanceOf[InternalRow] - if (sketchRow == null) return null - KllSketchHelper.eval(sketchRow, childType) - } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - throw new UnsupportedOperationException("KllSketchEval does not support codegen") -} From e20a3c6341b4461f85cbcf5351c6e9ce388128b7 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Mon, 2 Mar 2026 17:46:20 +0800 Subject: [PATCH 09/16] remove extra file --- .../gluten/expression/aggregate/VeloxApproxPercentile.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala index 70a5e3b3713e..5c3dc2744b04 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -75,7 +75,7 @@ case class VeloxApproximatePercentile( override def checkInputDataTypes(): TypeCheckResult = { // Delegate to Spark's ApproximatePercentile for validation - ApproximatePercentile(child, percentageExpression, accuracyExpression) + new ApproximatePercentile(child, percentageExpression, accuracyExpression) 
.checkInputDataTypes() } From eb706393710aab3cf1a6655d7cbe3a14ecd2ab1f Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Tue, 3 Mar 2026 11:01:53 +0800 Subject: [PATCH 10/16] remove extra file --- .../aggregate/VeloxApproxPercentile.scala | 30 +++++----- .../extension/PercentileRewriteRule.scala | 60 +++++++++++++++---- 2 files changed, 61 insertions(+), 29 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala index 5c3dc2744b04..8b80036afaa1 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -35,7 +35,7 @@ import java.util * intermediate data), this implementation uses KLL sketch with a 9-field StructType intermediate * that is fully compatible with Velox's approx_percentile accumulator layout: * - * 0: percentiles - Array(Double) 1: percentilesIsArray - Boolean 2: accuracy - Double 3: k - + * 0: percentiles - Array(Double) 1: percentilesIsArray - Boolean 2: accuracy - Integer (Spark accuracy, e.g. 10000; Velox internally computes epsilon = 1.0/accuracy) 3: k - * Integer (KLL parameter) 4: n - Long (total count) 5: minValue - childType 6: maxValue - childType * 7: items - Array(childType) 8: levels - Array(Integer) * @@ -61,9 +61,9 @@ case class VeloxApproximatePercentile( override def prettyName: String = "velox_approx_percentile" // Mark as lazy so that expressions are not evaluated during tree transformation. 
- private lazy val accuracy: Double = accuracyExpression.eval() match { - case null => ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY.toDouble - case num: Number => num.doubleValue() + private lazy val accuracy: Int = accuracyExpression.eval() match { + case null => ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY.toInt + case num: Number => num.intValue() } private lazy val (returnPercentileArray, percentages): (Boolean, Array[Double]) = @@ -93,7 +93,7 @@ case class VeloxApproximatePercentile( private lazy val percentilesIsArrayBuf: AttributeReference = AttributeReference("percentilesIsArray", BooleanType)() private lazy val accuracyBuf: AttributeReference = - AttributeReference("accuracy", DoubleType)() + AttributeReference("accuracy", IntegerType)() private lazy val kBuf: AttributeReference = AttributeReference("k", IntegerType)() private lazy val nBuf: AttributeReference = @@ -126,7 +126,7 @@ case class VeloxApproximatePercentile( Literal.create(null, ArrayType(DoubleType)) } else { Literal.create( - new org.apache.spark.sql.catalyst.util.GenericArrayData( + new GenericArrayData( percentages.map(_.asInstanceOf[Any])), ArrayType(DoubleType)) } @@ -135,17 +135,17 @@ case class VeloxApproximatePercentile( override lazy val initialValues: Seq[Expression] = Seq( percentilesLiteral, // percentiles Literal.create(returnPercentileArray, BooleanType), // percentilesIsArray - Literal.create(accuracy, DoubleType), // accuracy + Literal.create(accuracy, IntegerType), // accuracy Literal.create(KllSketchFieldIndex.DEFAULT_K, IntegerType), // k Literal.create(0L, LongType), // n Literal.create(null, child.dataType), // minValue Literal.create(null, child.dataType), // maxValue Literal.create( - new org.apache.spark.sql.catalyst.util.GenericArrayData(Array.empty[Any]), + new GenericArrayData(Array.empty[Any]), ArrayType(child.dataType) ), // items Literal.create( - new org.apache.spark.sql.catalyst.util.GenericArrayData(Array(0, 0)), + new GenericArrayData(Array(0, 0)), 
ArrayType(IntegerType) ) // levels ) @@ -199,7 +199,7 @@ case class VeloxApproximatePercentile( * KLL sketch field indices matching Velox's ApproxPercentileIntermediateTypeChildIndex. * * The intermediate StructType has 9 fields: 0: percentiles - Array(Double) 1: percentilesIsArray - - * Boolean 2: accuracy - Double 3: k - Integer (KLL parameter) 4: n - Long (total count) 5: minValue + * Boolean 2: accuracy - Integer (Spark accuracy) 3: k - Integer (KLL parameter) 4: n - Long (total count) 5: minValue * \- childType 6: maxValue - childType 7: items - Array(childType) 8: levels - Array(Integer) */ object KllSketchFieldIndex { @@ -219,7 +219,7 @@ object KllSketchFieldIndex { Array( StructField("percentiles", ArrayType(DoubleType), nullable = true), StructField("percentilesIsArray", BooleanType, nullable = true), - StructField("accuracy", DoubleType, nullable = true), + StructField("accuracy", IntegerType, nullable = true), StructField("k", IntegerType, nullable = true), StructField("n", LongType, nullable = true), StructField("minValue", childType, nullable = true), @@ -260,7 +260,7 @@ object KllSketchHelper { def createEmpty( percentiles: ArrayData, isArray: Boolean, - accuracy: Double, + accuracy: Int, childType: DataType): InternalRow = { val k = KllSketchFieldIndex.DEFAULT_K InternalRow( @@ -343,7 +343,7 @@ object KllSketchHelper { InternalRow( sketch.getArray(KllSketchFieldIndex.PERCENTILES), sketch.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), - sketch.getDouble(KllSketchFieldIndex.ACCURACY), + sketch.getInt(KllSketchFieldIndex.ACCURACY), k, n + 1, fromDouble(oldMin, childType), @@ -373,8 +373,6 @@ object KllSketchHelper { // Merge items: concatenate all items from both sketches val leftItems = left.getArray(KllSketchFieldIndex.ITEMS) val rightItems = right.getArray(KllSketchFieldIndex.ITEMS) - val leftLevels = left.getArray(KllSketchFieldIndex.LEVELS) - val rightLevels = right.getArray(KllSketchFieldIndex.LEVELS) val totalItemsSize = 
leftItems.numElements() + rightItems.numElements() val mergedItems = new Array[Any](totalItemsSize) @@ -409,7 +407,7 @@ object KllSketchHelper { InternalRow( left.getArray(KllSketchFieldIndex.PERCENTILES), left.getBoolean(KllSketchFieldIndex.PERCENTILES_IS_ARRAY), - left.getDouble(KllSketchFieldIndex.ACCURACY), + left.getInt(KllSketchFieldIndex.ACCURACY), k, leftN + rightN, fromDouble(mergedMin, childType), diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala index 121384b42327..f87eae463ff0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PercentileRewriteRule.scala @@ -16,33 +16,67 @@ */ package org.apache.gluten.extension +import org.apache.gluten.expression.ExpressionMappings import org.apache.gluten.expression.aggregate.VeloxApproximatePercentile import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, AGGREGATE_EXPRESSION} +import scala.reflect.{classTag, ClassTag} + /** * Rewrite Spark native ApproximatePercentile to VeloxApproximatePercentile: - * - Accuracy argument must be DOUBLE type (Velox requirement) - * - Only rewrite when all expressions are resolved + * - Velox uses a 9-field StructType intermediate (KLL sketch), incompatible with Spark's + * TypedImperativeAggregate (single BinaryType buffer). + * - Accuracy is passed as-is (Spark's original integer value, e.g. 10000). Velox C++ + * SparkAccuracyPolicy internally computes epsilon = 1.0 / accuracy. 
*/ case class ApproxPercentileRewriteRule(spark: SparkSession) extends Rule[LogicalPlan] { + import ApproxPercentileRewriteRule._ override def apply(plan: LogicalPlan): LogicalPlan = { - plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { - case a: Aggregate => - a.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { - case aggExpr @ AggregateExpression(sparkPercentile: ApproximatePercentile, _, _, _, _) => - val veloxPercentile = VeloxApproximatePercentile( - child = sparkPercentile.child, - percentageExpression = sparkPercentile.percentageExpression, - accuracyExpression = sparkPercentile.accuracyExpression - ) - - aggExpr.copy(aggregateFunction = veloxPercentile) + if (!has[VeloxApproximatePercentile]) { + return plan + } + + val newPlan = plan.transformUpWithPruning(_.containsPattern(AGGREGATE)) { + case node => + replaceApproxPercentile(node) + } + if (newPlan.fastEquals(plan)) { + return plan + } + newPlan + } + + private def replaceApproxPercentile(node: LogicalPlan): LogicalPlan = { + node match { + case agg: Aggregate => + agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { + case ToVeloxApproxPercentile(newAggExpr) => + newAggExpr } + case other => other + } + } +} + +object ApproxPercentileRewriteRule { + private object ToVeloxApproxPercentile { + def unapply(expr: Expression): Option[Expression] = expr match { + case aggExpr @ AggregateExpression(ap: ApproximatePercentile, _, _, _, _) + if has[VeloxApproximatePercentile] => + val newAggExpr = aggExpr.copy( + aggregateFunction = + VeloxApproximatePercentile(ap.child, ap.percentageExpression, ap.accuracyExpression)) + Some(newAggExpr) + case _ => None } } + + private def has[T <: Expression: ClassTag]: Boolean = + ExpressionMappings.expressionsMap.contains(classTag[T].runtimeClass) } From d7101522c6a388f1cc97b19d06a0f95c4174a96f Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Tue, 3 Mar 2026 11:08:41 +0800 Subject: [PATCH 11/16] checkstyle --- 
.../aggregate/VeloxApproxPercentile.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala index 8b80036afaa1..c9011fd3d272 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -35,9 +35,10 @@ import java.util * intermediate data), this implementation uses KLL sketch with a 9-field StructType intermediate * that is fully compatible with Velox's approx_percentile accumulator layout: * - * 0: percentiles - Array(Double) 1: percentilesIsArray - Boolean 2: accuracy - Integer (Spark accuracy, e.g. 10000; Velox internally computes epsilon = 1.0/accuracy) 3: k - - * Integer (KLL parameter) 4: n - Long (total count) 5: minValue - childType 6: maxValue - childType - * 7: items - Array(childType) 8: levels - Array(Integer) + * 0: percentiles - Array(Double) 1: percentilesIsArray - Boolean 2: accuracy - Integer (Spark + * accuracy, e.g. 
10000; Velox internally computes epsilon = 1.0/accuracy) 3: k - Integer (KLL + * parameter) 4: n - Long (total count) 5: minValue - childType 6: maxValue - childType 7: items - + * Array(childType) 8: levels - Array(Integer) * * Because aggBufferAttributes has 9 fields (> 1), the existing VeloxIntermediateData.Type default * branch (aggBufferAttributes.size > 1) will match automatically, meaning: @@ -126,8 +127,7 @@ case class VeloxApproximatePercentile( Literal.create(null, ArrayType(DoubleType)) } else { Literal.create( - new GenericArrayData( - percentages.map(_.asInstanceOf[Any])), + new GenericArrayData(percentages.map(_.asInstanceOf[Any])), ArrayType(DoubleType)) } } @@ -199,8 +199,9 @@ case class VeloxApproximatePercentile( * KLL sketch field indices matching Velox's ApproxPercentileIntermediateTypeChildIndex. * * The intermediate StructType has 9 fields: 0: percentiles - Array(Double) 1: percentilesIsArray - - * Boolean 2: accuracy - Integer (Spark accuracy) 3: k - Integer (KLL parameter) 4: n - Long (total count) 5: minValue - * \- childType 6: maxValue - childType 7: items - Array(childType) 8: levels - Array(Integer) + * Boolean 2: accuracy - Integer (Spark accuracy) 3: k - Integer (KLL parameter) 4: n - Long (total + * count) 5: minValue \- childType 6: maxValue - childType 7: items - Array(childType) 8: levels - + * Array(Integer) */ object KllSketchFieldIndex { val PERCENTILES = 0 From 85c9fc5e60f5c1e53214fc0d4331b51f5d31bcaf Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Tue, 3 Mar 2026 11:46:47 +0800 Subject: [PATCH 12/16] remove optimizer tricks so that signature is consistent with vanilla spark --- .../aggregate/VeloxApproxPercentile.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala index 
c9011fd3d272..309dbc76b148 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxApproxPercentile.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.trees.TernaryLike import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} import org.apache.spark.sql.types._ @@ -452,8 +452,10 @@ object KllSketchHelper { i = 0 while (i < numPercentiles) { val p = percentiles.getDouble(i) - val rank = math.min((p * numItems).toInt, numItems - 1) - results(i) = fromDouble(doubles(math.max(0, rank)), childType) + // Use "nearest rank" method to compute the percentile index, aligning with Spark's + // GK algorithm results: rank = ceil(p * n) - 1 (0-indexed), clamped to valid range + val rank = math.max(0, math.min(math.ceil(p * numItems).toInt - 1, numItems - 1)) + results(i) = fromDouble(doubles(rank), childType) i += 1 } @@ -534,7 +536,8 @@ object KllSketchHelper { * The data type of the value being aggregated */ case class KllSketchAdd(sketch: Expression, value: Expression, childType: DataType) - extends BinaryExpression { + extends BinaryExpression + with CodegenFallback { override def left: Expression = sketch override def right: Expression = value @@ -552,9 +555,6 @@ case class KllSketchAdd(sketch: Expression, value: Expression, childType: DataTy if (v == null) return sketchRow KllSketchHelper.add(sketchRow, v, childType) } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = - throw new UnsupportedOperationException("KllSketchAdd does not support codegen") } /** @@ -569,7 +569,8 @@ case class KllSketchAdd(sketch: Expression, value: Expression, childType: DataTy * The data type of the values being aggregated */ case class KllSketchMerge(left: Expression, right: Expression, childType: DataType) - extends BinaryExpression { + extends BinaryExpression + with CodegenFallback { override def dataType: DataType = left.dataType override def nullable: Boolean = false @@ -586,9 +587,6 @@ case class KllSketchMerge(left: Expression, right: Expression, childType: DataTy if (rightRow == null) return leftRow KllSketchHelper.merge(leftRow, rightRow, childType) } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - throw new UnsupportedOperationException("KllSketchMerge does not support codegen") } /** @@ -601,13 +599,16 @@ case class KllSketchMerge(left: Expression, right: Expression, childType: DataTy * Whether to return an array of percentiles * @param resultType * The result data type + * @param childType + * The data type of values being aggregated */ case class KllSketchEval( sketch: Expression, returnArray: Boolean, resultType: DataType, childType: DataType) - extends UnaryExpression { + extends UnaryExpression + with CodegenFallback { override def child: Expression = sketch override def dataType: DataType = resultType @@ -621,7 +622,4 @@ case class KllSketchEval( if (sketchRow == null) return null KllSketchHelper.eval(sketchRow, childType) } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - throw new UnsupportedOperationException("KllSketchEval does not support codegen") } From f757475caaeefd979d806664bbaa75382d3e16ee Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Fri, 6 Mar 2026 11:44:38 +0800 Subject: [PATCH 13/16] resolve conflict markers in CHSparkPlanExecApi.scala and remove CH-backend ApproximatePercentile registration (Velox PR only) --- 
.../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index a3e817d57b55..cdf2eae418b4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -41,13 +41,7 @@ import org.apache.spark.shuffle.utils.CHShuffleUtil import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, CollectList, CollectSet} ->>>>>>> 46c41c8ef... copied clickhouse related changes -======= -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, BloomFilterAggregate, CollectList, CollectSet} -======= -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, CollectList, CollectSet} ->>>>>>> 46c41c8ef... 
copied clickhouse related changes +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate, CollectList, CollectSet} import org.apache.spark.sql.catalyst.optimizer.BuildSide import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning} @@ -603,7 +597,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { List( Sig[CollectList](ExpressionNames.COLLECT_LIST), Sig[CollectSet](ExpressionNames.COLLECT_SET), - Sig[ApproximatePercentile](ExpressionNames.APPROX_PERCENTILE), Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID), CHFlattenedExpression.sigAnd, CHFlattenedExpression.sigOr From d14b740f36400804a99919182de346531352f082 Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Fri, 6 Mar 2026 11:45:42 +0800 Subject: [PATCH 14/16] remove ck api change --- .../gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index cdf2eae418b4..fa91e12e0fc5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -41,7 +41,7 @@ import org.apache.spark.shuffle.utils.CHShuffleUtil import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate, CollectList, CollectSet} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
ApproximatePercentile, BloomFilterAggregate, CollectList, CollectSet} import org.apache.spark.sql.catalyst.optimizer.BuildSide import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning} @@ -595,6 +595,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { /** Define backend specfic expression mappings. */ override def extraExpressionMappings: Seq[Sig] = { List( + Sig[ApproximatePercentile](ExpressionNames.APPROX_PERCENTILE), Sig[CollectList](ExpressionNames.COLLECT_LIST), Sig[CollectSet](ExpressionNames.COLLECT_SET), Sig[MonotonicallyIncreasingID](MONOTONICALLY_INCREASING_ID), From cdf32ccf36bb38df7552cd061947da82e8a6396b Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Tue, 10 Mar 2026 16:32:17 +0800 Subject: [PATCH 15/16] Clear UPSTREAM_VELOX_PR_ID for developer testing --- ep/build-velox/src/get-velox.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index b4903890fcd4..558f988e5c93 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -25,7 +25,7 @@ RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF # Developer use only for testing Velox PR. 
-UPSTREAM_VELOX_PR_ID="16320" +UPSTREAM_VELOX_PR_ID="" OS=`uname -s` From 7eb59aef4f33f6faffb0b86dc6fa19240d572c1a Mon Sep 17 00:00:00 2001 From: yizhouyang Date: Tue, 10 Mar 2026 19:15:23 +0800 Subject: [PATCH 16/16] fix approx_percentile test: add more data points to make median deterministic across Spark and Velox implementations --- .../apache/gluten/execution/VeloxAggregateFunctionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala index 3eb43bf19e8a..008909890388 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala @@ -1108,7 +1108,7 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu "SELECT approx_percentile(col, array(0.25, 0.5, 0.75)) FROM approx_pct_tmp") { _ => } // with group by - Seq((1, 10), (1, 20), (1, 30), (2, 5), (2, 15)) + Seq((1, 10), (1, 20), (1, 30), (2, 5), (2, 10), (2, 15)) .toDF("grp", "val") .createOrReplaceTempView("approx_pct_grp") runQueryAndCompare(