Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(_ => V2WritePostRule())
Expand Down Expand Up @@ -222,7 +221,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(_ => V2WritePostRule())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxOrcScanEnabled: Boolean =
getConf(VELOX_ORC_SCAN_ENABLED)

def enablePropagateIgnoreNullKeys: Boolean =
getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)

def floatingPointMode: String = getConf(FLOATING_POINT_MODE)

def enableRewriteCastArrayToString: Boolean =
Expand Down Expand Up @@ -588,15 +585,6 @@ object VeloxConfig extends ConfigRegistry {
.stringConf
.createWithDefault("")

val VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys")
.doc(
"If enabled, we will identify aggregation followed by an inner join " +
"on the grouping keys, and mark the ignoreNullKeys flag to true to " +
"avoid unnecessary aggregation on null keys.")
.booleanConf
.createWithDefault(true)

val FLOATING_POINT_MODE =
buildConf("spark.gluten.sql.columnar.backend.velox.floatingPointMode")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,15 @@ abstract class HashAggregateExecTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan,
ignoreNullKeys: Boolean)
child: SparkPlan)
extends HashAggregateExecBaseTransformer(
requiredChildDistributionExpressions,
groupingExpressions,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
child,
ignoreNullKeys
) {
child) {

override def output: Seq[Attribute] = {
// TODO: We should have a check to make sure the returned schema actually matches the output
Expand Down Expand Up @@ -186,8 +183,7 @@ abstract class HashAggregateExecTransformer(
private def formatExtOptimizationString(isStreaming: Boolean): String = {
val isStreamingStr = if (isStreaming) "1" else "0"
val allowFlushStr = if (allowFlush) "1" else "0"
val ignoreNullKeysStr = if (ignoreNullKeys) "1" else "0"
s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\nignoreNullKeys=$ignoreNullKeysStr\n"
s"isStreaming=$isStreamingStr\nallowFlush=$allowFlushStr\n"
}

// Create aggregate function node and add to list.
Expand Down Expand Up @@ -661,18 +657,15 @@ case class RegularHashAggregateExecTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan,
ignoreNullKeys: Boolean = false)
child: SparkPlan)
extends HashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
child,
ignoreNullKeys
) {
child) {

override protected def allowFlush: Boolean = false

Expand All @@ -696,18 +689,15 @@ case class FlushableHashAggregateExecTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan,
ignoreNullKeys: Boolean = false)
child: SparkPlan)
extends HashAggregateExecTransformer(
requiredChildDistributionExpressions,
groupingExpressions,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
child,
ignoreNullKeys
) {
child) {

override protected def allowFlush: Boolean = true

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1193,28 +1193,6 @@ class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite {
}
}
}

test("aggregate on join keys can set ignoreNullKeys") {
val s =
"""
|select count(1) from
| (select l_orderkey, max(l_partkey) from lineitem group by l_orderkey) a
|inner join
| (select l_orderkey from lineitem) b
|on a.l_orderkey = b.l_orderkey
|""".stripMargin
withSQLConf(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "true") {
runQueryAndCompare(s) {
df =>
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists {
case a: RegularHashAggregateExecTransformer if a.ignoreNullKeys => true
case a: FlushableHashAggregateExecTransformer if a.ignoreNullKeys => true
case _ => false
})
}
}
}
}

class VeloxAggregateFunctionsFlushSuite extends VeloxAggregateFunctionsSuite {
Expand Down
5 changes: 0 additions & 5 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
preGroupingExprs.insert(preGroupingExprs.begin(), veloxGroupingExprs.begin(), veloxGroupingExprs.end());
}

if (aggRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(aggRel.advanced_extension(), "ignoreNullKeys=")) {
ignoreNullKeys = true;
}

// Get the output names of Aggregation.
std::vector<std::string> aggOutNames;
aggOutNames.reserve(aggRel.measures().size());
Expand Down
1 change: 0 additions & 1 deletion docs/velox-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. |
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. |
| spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan |
| spark.gluten.sql.columnar.backend.velox.propagateIgnoreNullKeys | true | If enabled, we will identify aggregation followed by an inner join on the grouping keys, and mark the ignoreNullKeys flag to true to avoid unnecessary aggregation on null keys. |
| spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. |
| spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. |
| spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ abstract class HashAggregateExecBaseTransformer(
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan,
ignoreNullKeys: Boolean = false)
child: SparkPlan)
extends BaseAggregateExec
with UnaryTransformSupport {

Expand Down Expand Up @@ -87,13 +86,11 @@ abstract class HashAggregateExecBaseTransformer(
s"HashAggregateTransformer(keys=$keyString, " +
s"functions=$functionString, " +
s"isStreamingAgg=$isCapableForStreamingAggregation, " +
s"ignoreNullKeys=$ignoreNullKeys, " +
s"output=$outputString)"
} else {
s"HashAggregateTransformer(keys=$keyString, " +
s"functions=$functionString, " +
s"isStreamingAgg=$isCapableForStreamingAggregation, " +
s"ignoreNullKeys=$ignoreNullKeys)"
s"isStreamingAgg=$isCapableForStreamingAggregation)"
}
}

Expand Down