From 04aace931aa8d3a5a4b82ae0d1b01981be19f913 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sat, 6 Feb 2021 18:13:47 +0900 Subject: [PATCH 1/2] Add adaptive bucketSpec feature --- .../hyperspace/index/IndexConstants.scala | 8 ++ .../index/rules/FilterIndexRule.scala | 9 +- .../hyperspace/util/HyperspaceConf.scala | 8 ++ .../hyperspace/util/LogicalPlanUtils.scala | 131 +++++++++++++++++- .../index/E2EHyperspaceRulesTest.scala | 129 ++++++++++++++++- .../hyperspace/index/HybridScanSuite.scala | 1 + .../index/plananalysis/ExplainTest.scala | 3 +- 7 files changed, 281 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index e4e930358..75ce15662 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -52,6 +52,14 @@ object IndexConstants { val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec" val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false" + // If this config is true, Hyperspace generates a plan with bucketSpec first and check + // the selectivity of the filter query by creating the physical plan in advance. + // If less than half number of buckets are selected, Filter Rule uses the plan with bucketSpec. + // Otherwise, newly generated bucketSpec is not used for Filter Rule. + val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED = + "spark.hyperspace.index.filterRule.bucketCheck.enabled" + val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true" + // Identifier injected to HadoopFsRelation as an option if an index is applied. // Currently, the identifier is added to options field of HadoopFsRelation. // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan. diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala index 7bcb4e58a..d8f66443b 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala @@ -29,6 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.index.rankers.FilterIndexRanker import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent} import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} +import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector /** * FilterIndex rule looks for opportunities in a logical plan to replace @@ -56,6 +57,11 @@ object FilterIndexRule findCoveringIndexes(filter, outputColumns, filterColumns) FilterIndexRanker.rank(spark, filter, candidateIndexes) match { case Some(index) => + val useBucketSpec = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) { + BucketSelector(plan, index.bucketSpec).isDefined + } else { + HyperspaceConf.useBucketSpecForFilterRule(spark) + } // As FilterIndexRule is not intended to support bucketed scan, we set // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause // unnecessary shuffle for appended data to apply BucketUnion for merging data. 
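
The hunks above introduce the bucket-check flag and wire it into FilterIndexRule: when the flag is enabled (the default), the rule asks BucketSelector whether the filter predicate can prune buckets of the candidate index and uses bucketSpec only if it can; when the flag is disabled, the pre-existing static useBucketSpec config decides as before. A minimal sketch of toggling the two configs, assuming a SparkSession named spark with Hyperspace on the classpath (the session and the surrounding code are illustrative, not part of this patch):

    // Adaptive mode (default in this patch): probe bucket pruning before choosing the bucketed plan.
    spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "true")

    // Static mode: skip the probe and let the pre-existing flag decide unconditionally.
    spark.conf.set("spark.hyperspace.index.filterRule.bucketCheck.enabled", "false")
    spark.conf.set("spark.hyperspace.index.filterRule.useBucketSpec", "true")
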
@@ -64,7 +70,7 @@ object FilterIndexRule spark, index, originalPlan, - useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark), + useBucketSpec = useBucketSpec, useBucketUnionForAppended = false) logEvent( HyperspaceIndexUsageEvent( @@ -136,7 +142,6 @@ object FilterIndexRule * @param filterColumns List of columns in filter predicate. * @param indexedColumns List of indexed columns (e.g. from an index being checked) * @param includedColumns List of included columns (e.g. from an index being checked) - * @param fileFormat FileFormat for input relation in original logical plan. * @return 'true' if * 1. Index fully covers output and filter columns, and * 2. Filter predicate contains first column in index's 'indexed' columns. diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index f587544b3..b899d0438 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -68,6 +68,14 @@ object HyperspaceConf { .toBoolean } + def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = { + spark.conf + .get( + IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED, + IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT) + .toBoolean + } + def numBucketsForIndex(spark: SparkSession): Int = { getConfStringWithMultipleKeys( spark, diff --git a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala index 0487fd0c5..b07e4cc4e 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala @@ -16,8 +16,18 @@ package com.microsoft.hyperspace.util +import org.apache.hadoop.fs.Path +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{DoubleType, FloatType} +import org.apache.spark.util.collection.BitSet + +import com.microsoft.hyperspace.index.IndexLogEntry /** * Utility functions for logical plan. @@ -35,4 +45,123 @@ object LogicalPlanUtils { case _ => false } } + + /** + * BucketSelector returns the selected buckets if bucket pruning is applicable for the given + * query plan. The logic is extracted from [[FileSourceScanStrategy]] in Spark. 
+ */ + object BucketSelector { + // should prune buckets iff num buckets is greater than 1 and there is only one bucket column + private def shouldPruneBuckets(spec: BucketSpec): Boolean = { + spec.bucketColumnNames.length == 1 && spec.numBuckets > 1 + } + + private def getExpressionBuckets( + expr: Expression, + bucketColumnName: String, + numBuckets: Int): BitSet = { + + def getBucketNumber(attr: Attribute, v: Any): Int = { + BucketingUtils.getBucketIdFromValue(attr, numBuckets, v) + } + + def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = { + val matchedBuckets = new BitSet(numBuckets) + iter + .map(v => getBucketNumber(attr, v)) + .foreach(bucketNum => matchedBuckets.set(bucketNum)) + matchedBuckets + } + + def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = { + val matchedBuckets = new BitSet(numBuckets) + matchedBuckets.set(getBucketNumber(attr, v)) + matchedBuckets + } + + expr match { + case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName => + getBucketSetFromValue(a, v) + case expressions.In(a: Attribute, list) + if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => + getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow))) + case expressions.InSet(a: Attribute, hset) + if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => + // NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372 + // Bucket pruning is not applied for InSet without the fix. + getBucketSetFromIterable(a, hset) + case expressions.IsNull(a: Attribute) if a.name == bucketColumnName => + getBucketSetFromValue(a, null) + case expressions.IsNaN(a: Attribute) + if a.name == bucketColumnName && a.dataType == FloatType => + getBucketSetFromValue(a, Float.NaN) + case expressions.IsNaN(a: Attribute) + if a.name == bucketColumnName && a.dataType == DoubleType => + getBucketSetFromValue(a, Double.NaN) + case expressions.And(left, right) => + getExpressionBuckets(left, bucketColumnName, numBuckets) & + getExpressionBuckets(right, bucketColumnName, numBuckets) + case expressions.Or(left, right) => + getExpressionBuckets(left, bucketColumnName, numBuckets) | + getExpressionBuckets(right, bucketColumnName, numBuckets) + case _ => + val matchedBuckets = new BitSet(numBuckets) + matchedBuckets.setUntil(numBuckets) + matchedBuckets + } + } + + private def genBucketSet( + normalizedFilters: Seq[Expression], + bucketSpec: BucketSpec): Option[BitSet] = { + if (normalizedFilters.isEmpty) { + return None + } + + val bucketColumnName = bucketSpec.bucketColumnNames.head + val numBuckets = bucketSpec.numBuckets + + val normalizedFiltersAndExpr = normalizedFilters + .reduce(expressions.And) + val matchedBuckets = + getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets) + + val numBucketsSelected = matchedBuckets.cardinality() + + // None means all the buckets need to be scanned + if (numBucketsSelected == numBuckets) { + None + } else { + Some(matchedBuckets) + } + } + + def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match { + case PhysicalOperation( + projects, + filters, + l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we do not need to + // worry about case sensitivity anymore. 
+ val normalizedFilters = filters.map { e => + e transform { + case a: AttributeReference => + a.withName(l.output.find(_.semanticEquals(a)).get.name) + } + } + // subquery expressions are filtered out because they can't be used to prune buckets or + // pushed down as data filters, yet they would be executed + val normalizedFiltersWithoutSubqueries = + normalizedFilters.filterNot(SubqueryExpression.hasSubquery) + + val bucketSet = if (shouldPruneBuckets(bucketSpec)) { + genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec) + } else { + None + } + bucketSet + case _ => None + } + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index 69b1a4e47..02729267a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -20,8 +20,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.SortExec -import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec} +import org.apache.spark.sql.execution.datasources.{BucketingUtils, FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils} @@ -30,6 +30,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags._ import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule} +import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector import com.microsoft.hyperspace.util.PathUtils class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { @@ -165,7 +166,8 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { def query(): DataFrame = spark.sql("SELECT * from t where c4 = 1") // Verify no Project node is present in the query plan, as a result of using SELECT * - assert(query().queryExecution.optimizedPlan.collect { case p: Project => p }.isEmpty) + val queryPlan = query().queryExecution.optimizedPlan + assert(queryPlan.collect { case p: Project => p }.isEmpty) verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName)) } @@ -581,6 +583,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { } } + test("Verify adaptive bucket spec application for FilterIndexRule.") { + withTempPathAsString { testPath => + // Setup. Create data. + val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4")) + import spark.implicits._ + SampleData.testData + .toDF("c1", "c2", "c3", "c4", "c5") + .limit(10) + .write + .json(testPath) + val df = spark.read.json(testPath) + + // Create index. + hyperspace.createIndex(df, indexConfig) + spark.enableHyperspace() + + def query(): DataFrame = + df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3") + + withIndex("index") { + withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") { + // Check bucketSpec is applied. 
+ val execPlan = query.queryExecution.executedPlan + val foundPrunedBuckets = execPlan.collect { + case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) => + optionalBucketSet.get.cardinality() + } + assert(foundPrunedBuckets.length == 1) + assert(foundPrunedBuckets.head == 3) + + verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0))) + } + + // TODO: because of SPARK-33372, bucket pruning is not applied for InSet operator. + // As indexes are bucketed, supporting bucket pruning can improve the performance of + // queries with high selectivity. Will add a new FileSourceScanStrategy soon. + withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") { + val execPlan = query().queryExecution.executedPlan + // Check bucketSpec is not applied. + val foundPrunedBuckets = execPlan.collect { + case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) + if (optionalBucketSet.isDefined) => + optionalBucketSet.get.cardinality() + } + assert(foundPrunedBuckets.isEmpty) + verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0))) + } + + // Append to original data. + SampleData.testData + .toDF("c1", "c2", "c3", "c4", "c5") + .limit(3) + .write + .mode("append") + .json(testPath) + + val df2 = spark.read.json(testPath) + val inputFiles = df.inputFiles + val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_)) + def query2(): DataFrame = { + df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3") + } + + withSQLConf(TestConfig.HybridScanEnabled: _*) { + withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") { + // Check bucketSpec is applied. + val execPlan = query2().queryExecution.executedPlan + val foundPrunedBuckets = execPlan.collect { + case _ @UnionExec(children) => + val p = children.head.collect { + case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) + if optionalBucketSet.isDefined => + optionalBucketSet.get.cardinality() + } + p.head + } + assert(foundPrunedBuckets.length == 1) + assert(foundPrunedBuckets.head == 3) + verifyIndexUsage( + query2, + getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq) + } + + withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") { + val execPlan = query2().queryExecution.executedPlan + // Check bucketSpec is not applied. 
+ val foundPrunedBuckets = execPlan.collect { + case _ @ UnionExec(children) => + val b = children.head.collect { + case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) + if optionalBucketSet.isDefined => + optionalBucketSet.get.cardinality() + } + assert(b.isEmpty) + true + } + assert(foundPrunedBuckets.length == 1) + verifyIndexUsage( + query2, + getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq) + } + } + } + } + } + test( "Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " + "gets deleted and some appended to source data.") { @@ -733,7 +841,7 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { withIndex(indexConfig.indexName) { spark.enableHyperspace() val df = spark.read.parquet(testPath) - def query(df: DataFrame): DataFrame = df.filter("c3 == 'facebook'").select("c3", "c4") + def query(df: DataFrame): DataFrame = df.filter("c3 != 'facebook'").select("c3", "c4") val indexManager = Hyperspace .getContext(spark) @@ -980,6 +1088,19 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { }.flatten } + private def getIndexFilesPathWithBucketSelector( + plan: LogicalPlan, + indexName: String, + versions: Seq[Int] = Seq(0)): Seq[Path] = { + val paths = getIndexFilesPath(indexName, versions) + BucketSelector(plan, TestUtils.latestIndexLogEntry(systemPath, indexName).bucketSpec) match { + case Some(buckets) => + paths.filter(f => buckets.get(BucketingUtils.getBucketId(f.getName).get)) + case None => + paths + } + } + private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = { versions.flatMap { v => Content diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala index 3e2771882..84ebbe31c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala @@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite { withSQLConf( TestConfig.HybridScanEnabledAppendOnly :+ + IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+ IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) { val filter = filterQuery val planWithHybridScan = filter.queryExecution.optimizedPlan diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala index a62b78428..76e553b06 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala @@ -485,7 +485,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append("Batched: true, Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]")) .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ") - .append("ReadSchema: struct" + displayMode.highlightTag.close) + .append("ReadSchema: struct, SelectedBucketsCount: 1 out of 200") + .append(displayMode.highlightTag.close) .append(displayMode.newLine) .append(displayMode.newLine) .append("=============================================================") From 7baa1a104e5b40b2414cfa18c29169f355be213a Mon Sep 17 00:00:00 2001 From: sezruby Date: Tue, 9 Feb 2021 10:59:15 -0800 Subject: [PATCH 2/2] Review commit --- .../hyperspace/index/IndexConstants.scala | 13 +- 
.../index/rules/FilterIndexRule.scala | 12 +- .../hyperspace/util/BucketSelector.scala | 145 ++++++++++++++++++ .../hyperspace/util/HyperspaceConf.scala | 8 +- .../hyperspace/util/LogicalPlanUtils.scala | 131 +--------------- .../index/E2EHyperspaceRulesTest.scala | 3 +- .../hyperspace/index/HybridScanSuite.scala | 2 +- 7 files changed, 166 insertions(+), 148 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/util/BucketSelector.scala diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 75ce15662..a3cfd9ecd 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -52,13 +52,12 @@ object IndexConstants { val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec" val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false" - // If this config is true, Hyperspace generates a plan with bucketSpec first and check - // the selectivity of the filter query by creating the physical plan in advance. - // If less than half number of buckets are selected, Filter Rule uses the plan with bucketSpec. - // Otherwise, newly generated bucketSpec is not used for Filter Rule. - val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED = - "spark.hyperspace.index.filterRule.bucketCheck.enabled" - val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true" + // If INDEX_FILTER_RULE_USE_BUCKET_SPEC config is false, Hyperspace applies bucketing if + // the given filter query is applicable for bucket pruning AND the number of selected buckets + // is under this threshold. + val INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD = + "spark.hyperspace.index.filterRule.autoBucketing.threshold" + val INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD_DEFAULT = "0.8" // Identifier injected to HadoopFsRelation as an option if an index is applied. // Currently, the identifier is added to options field of HadoopFsRelation. diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala index d8f66443b..c876d6b74 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala @@ -29,7 +29,7 @@ import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.index.rankers.FilterIndexRanker import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent} import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} -import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector +import com.microsoft.hyperspace.util.BucketSelector /** * FilterIndex rule looks for opportunities in a logical plan to replace @@ -57,10 +57,14 @@ object FilterIndexRule findCoveringIndexes(filter, outputColumns, filterColumns) FilterIndexRanker.rank(spark, filter, candidateIndexes) match { case Some(index) => - val useBucketSpec = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) { - BucketSelector(plan, index.bucketSpec).isDefined + val useBucketSpec = if (HyperspaceConf.useBucketSpecForFilterRule(spark)) { + true } else { - HyperspaceConf.useBucketSpecForFilterRule(spark) + // Check bucket pruning is applicable and threshold condition. 
+ val selectedBuckets = BucketSelector(plan, index.bucketSpec).map(_.cardinality()) + selectedBuckets.isDefined && + selectedBuckets.get <= index.bucketSpec.numBuckets * HyperspaceConf + .prunedBucketRatioToAutoEnableBucketRead(spark) } // As FilterIndexRule is not intended to support bucketed scan, we set // useBucketUnionForAppended as false. If it's true, Hybrid Scan can cause diff --git a/src/main/scala/com/microsoft/hyperspace/util/BucketSelector.scala b/src/main/scala/com/microsoft/hyperspace/util/BucketSelector.scala new file mode 100644 index 000000000..57c1249bc --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/util/BucketSelector.scala @@ -0,0 +1,145 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.util + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{DoubleType, FloatType} +import org.apache.spark.util.collection.BitSet + +/** + * BucketSelector returns the selected buckets if bucket pruning is applicable for the given + * query plan. The logic is extracted from [[FileSourceScanStrategy]] in Spark. 
+ */ +object BucketSelector { + // should prune buckets iff num buckets is greater than 1 and there is only one bucket column + private def shouldPruneBuckets(spec: BucketSpec): Boolean = { + spec.bucketColumnNames.length == 1 && spec.numBuckets > 1 + } + + private def getExpressionBuckets( + expr: Expression, + bucketColumnName: String, + numBuckets: Int): BitSet = { + + def getBucketNumber(attr: Attribute, v: Any): Int = { + BucketingUtils.getBucketIdFromValue(attr, numBuckets, v) + } + + def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = { + val matchedBuckets = new BitSet(numBuckets) + iter + .map(v => getBucketNumber(attr, v)) + .foreach(bucketNum => matchedBuckets.set(bucketNum)) + matchedBuckets + } + + def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = { + val matchedBuckets = new BitSet(numBuckets) + matchedBuckets.set(getBucketNumber(attr, v)) + matchedBuckets + } + + expr match { + case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName => + getBucketSetFromValue(a, v) + case expressions.In(a: Attribute, list) + if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => + getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow))) + case expressions.InSet(a: Attribute, hset) + if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => + // NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372 + // Bucket pruning is not applied for InSet without the fix. + getBucketSetFromIterable(a, hset) + case expressions.IsNull(a: Attribute) if a.name == bucketColumnName => + getBucketSetFromValue(a, null) + case expressions.IsNaN(a: Attribute) + if a.name == bucketColumnName && a.dataType == FloatType => + getBucketSetFromValue(a, Float.NaN) + case expressions.IsNaN(a: Attribute) + if a.name == bucketColumnName && a.dataType == DoubleType => + getBucketSetFromValue(a, Double.NaN) + case expressions.And(left, right) => + getExpressionBuckets(left, bucketColumnName, numBuckets) & + getExpressionBuckets(right, bucketColumnName, numBuckets) + case expressions.Or(left, right) => + getExpressionBuckets(left, bucketColumnName, numBuckets) | + getExpressionBuckets(right, bucketColumnName, numBuckets) + case _ => + val matchedBuckets = new BitSet(numBuckets) + matchedBuckets.setUntil(numBuckets) + matchedBuckets + } + } + + private def genBucketSet( + normalizedFilters: Seq[Expression], + bucketSpec: BucketSpec): Option[BitSet] = { + if (normalizedFilters.isEmpty) { + return None + } + + val bucketColumnName = bucketSpec.bucketColumnNames.head + val numBuckets = bucketSpec.numBuckets + + val normalizedFiltersAndExpr = normalizedFilters + .reduce(expressions.And) + val matchedBuckets = + getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets) + + val numBucketsSelected = matchedBuckets.cardinality() + + // None means all the buckets need to be scanned + if (numBucketsSelected == numBuckets) { + None + } else { + Some(matchedBuckets) + } + } + + def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match { + case PhysicalOperation( + projects, + filters, + l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // The attribute name of predicate could be different than the one in schema in case of + // case insensitive, we should change them to match the one in schema, so we do not need to + // worry about case sensitivity anymore. 
+ val normalizedFilters = filters.map { e => + e transform { + case a: AttributeReference => + a.withName(l.output.find(_.semanticEquals(a)).get.name) + } + } + // subquery expressions are filtered out because they can't be used to prune buckets or + // pushed down as data filters, yet they would be executed + val normalizedFiltersWithoutSubqueries = + normalizedFilters.filterNot(SubqueryExpression.hasSubquery) + + val bucketSet = if (shouldPruneBuckets(bucketSpec)) { + genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec) + } else { + None + } + bucketSet + case _ => None + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index b899d0438..07f6eb7c1 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -68,12 +68,12 @@ object HyperspaceConf { .toBoolean } - def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = { + def prunedBucketRatioToAutoEnableBucketRead(spark: SparkSession): Double = { spark.conf .get( - IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED, - IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT) - .toBoolean + IndexConstants.INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD, + IndexConstants.INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD_DEFAULT) + .toDouble } def numBucketsForIndex(spark: SparkSession): Int = { diff --git a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala index b07e4cc4e..0487fd0c5 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/LogicalPlanUtils.scala @@ -16,18 +16,8 @@ package com.microsoft.hyperspace.util -import org.apache.hadoop.fs.Path -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, Literal, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.{BucketingUtils, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.types.{DoubleType, FloatType} -import org.apache.spark.util.collection.BitSet - -import com.microsoft.hyperspace.index.IndexLogEntry +import org.apache.spark.sql.execution.datasources.LogicalRelation /** * Utility functions for logical plan. @@ -45,123 +35,4 @@ object LogicalPlanUtils { case _ => false } } - - /** - * BucketSelector returns the selected buckets if bucket pruning is applicable for the given - * query plan. The logic is extracted from [[FileSourceScanStrategy]] in Spark. 
- */ - object BucketSelector { - // should prune buckets iff num buckets is greater than 1 and there is only one bucket column - private def shouldPruneBuckets(spec: BucketSpec): Boolean = { - spec.bucketColumnNames.length == 1 && spec.numBuckets > 1 - } - - private def getExpressionBuckets( - expr: Expression, - bucketColumnName: String, - numBuckets: Int): BitSet = { - - def getBucketNumber(attr: Attribute, v: Any): Int = { - BucketingUtils.getBucketIdFromValue(attr, numBuckets, v) - } - - def getBucketSetFromIterable(attr: Attribute, iter: Iterable[Any]): BitSet = { - val matchedBuckets = new BitSet(numBuckets) - iter - .map(v => getBucketNumber(attr, v)) - .foreach(bucketNum => matchedBuckets.set(bucketNum)) - matchedBuckets - } - - def getBucketSetFromValue(attr: Attribute, v: Any): BitSet = { - val matchedBuckets = new BitSet(numBuckets) - matchedBuckets.set(getBucketNumber(attr, v)) - matchedBuckets - } - - expr match { - case expressions.Equality(a: Attribute, Literal(v, _)) if a.name == bucketColumnName => - getBucketSetFromValue(a, v) - case expressions.In(a: Attribute, list) - if list.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => - getBucketSetFromIterable(a, list.map(e => e.eval(EmptyRow))) - case expressions.InSet(a: Attribute, hset) - if hset.forall(_.isInstanceOf[Literal]) && a.name == bucketColumnName => - // NOTE: Spark bug - https://issues.apache.org/jira/browse/SPARK-33372 - // Bucket pruning is not applied for InSet without the fix. - getBucketSetFromIterable(a, hset) - case expressions.IsNull(a: Attribute) if a.name == bucketColumnName => - getBucketSetFromValue(a, null) - case expressions.IsNaN(a: Attribute) - if a.name == bucketColumnName && a.dataType == FloatType => - getBucketSetFromValue(a, Float.NaN) - case expressions.IsNaN(a: Attribute) - if a.name == bucketColumnName && a.dataType == DoubleType => - getBucketSetFromValue(a, Double.NaN) - case expressions.And(left, right) => - getExpressionBuckets(left, bucketColumnName, numBuckets) & - getExpressionBuckets(right, bucketColumnName, numBuckets) - case expressions.Or(left, right) => - getExpressionBuckets(left, bucketColumnName, numBuckets) | - getExpressionBuckets(right, bucketColumnName, numBuckets) - case _ => - val matchedBuckets = new BitSet(numBuckets) - matchedBuckets.setUntil(numBuckets) - matchedBuckets - } - } - - private def genBucketSet( - normalizedFilters: Seq[Expression], - bucketSpec: BucketSpec): Option[BitSet] = { - if (normalizedFilters.isEmpty) { - return None - } - - val bucketColumnName = bucketSpec.bucketColumnNames.head - val numBuckets = bucketSpec.numBuckets - - val normalizedFiltersAndExpr = normalizedFilters - .reduce(expressions.And) - val matchedBuckets = - getExpressionBuckets(normalizedFiltersAndExpr, bucketColumnName, numBuckets) - - val numBucketsSelected = matchedBuckets.cardinality() - - // None means all the buckets need to be scanned - if (numBucketsSelected == numBuckets) { - None - } else { - Some(matchedBuckets) - } - } - - def apply(plan: LogicalPlan, bucketSpec: BucketSpec): Option[BitSet] = plan match { - case PhysicalOperation( - projects, - filters, - l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => - // The attribute name of predicate could be different than the one in schema in case of - // case insensitive, we should change them to match the one in schema, so we do not need to - // worry about case sensitivity anymore. 
- val normalizedFilters = filters.map { e => - e transform { - case a: AttributeReference => - a.withName(l.output.find(_.semanticEquals(a)).get.name) - } - } - // subquery expressions are filtered out because they can't be used to prune buckets or - // pushed down as data filters, yet they would be executed - val normalizedFiltersWithoutSubqueries = - normalizedFilters.filterNot(SubqueryExpression.hasSubquery) - - val bucketSet = if (shouldPruneBuckets(bucketSpec)) { - genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec) - } else { - None - } - bucketSet - case _ => None - } - } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index 02729267a..88ca15498 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -28,9 +28,8 @@ import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK} import com.microsoft.hyperspace.index.IndexLogEntryTags._ import com.microsoft.hyperspace.index.execution.BucketUnionStrategy -import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule} -import com.microsoft.hyperspace.util.LogicalPlanUtils.BucketSelector +import com.microsoft.hyperspace.util.BucketSelector import com.microsoft.hyperspace.util.PathUtils class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala index 84ebbe31c..cba259b39 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala @@ -502,7 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite { withSQLConf( TestConfig.HybridScanEnabledAppendOnly :+ - IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+ + IndexConstants.INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD -> "0.0" :+ IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) { val filter = filterQuery val planWithHybridScan = filter.queryExecution.optimizedPlan
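
With the review commit, the boolean bucket-check flag becomes a ratio threshold: FilterIndexRule applies bucketSpec either when spark.hyperspace.index.filterRule.useBucketSpec is true, or when BucketSelector shows the filter selects at most threshold * numBuckets buckets. A minimal sketch of that decision with values hard-coded for illustration (a 200-bucket index and the 0.8 default, matching the counts used in the tests above); the variable names are illustrative, not part of the patch:

    val numBuckets = 200                        // index.bucketSpec.numBuckets
    val threshold = 0.8                         // spark.hyperspace.index.filterRule.autoBucketing.threshold (default)
    val selectedBuckets: Option[Int] = Some(3)  // BucketSelector(plan, bucketSpec).map(_.cardinality())
    val useBucketSpec =
      selectedBuckets.exists(_ <= numBuckets * threshold)  // 3 <= 160, so the bucketed plan is kept

Setting the threshold to 0.0 (as HybridScanSuite now does) effectively disables the adaptive path, while 1.0 applies bucketSpec whenever any bucket at all can be pruned, since BucketSelector returns None when every bucket would still be scanned.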