diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index 68b6fd98a..e4e930358 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -47,6 +47,11 @@ object IndexConstants {
     "spark.hyperspace.index.hybridscan.maxAppendedRatio"
   val INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD_DEFAULT = "0.3"
 
+  // Config used to set bucketSpec for the Filter rule. If bucketSpec is used, Spark can prune
+  // buckets that are not applicable, so it reads fewer files for a highly selective query.
+  val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
+  val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
+
   // Identifier injected to HadoopFsRelation as an option if an index is applied.
   // Currently, the identifier is added to options field of HadoopFsRelation.
   // In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
index fa026674c..7bcb4e58a 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
@@ -28,7 +28,7 @@ import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index.IndexLogEntry
 import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
-import com.microsoft.hyperspace.util.ResolverUtils
+import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
 
 /**
  * FilterIndex rule looks for opportunities in a logical plan to replace
@@ -56,13 +56,16 @@ object FilterIndexRule
           findCoveringIndexes(filter, outputColumns, filterColumns)
         FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
           case Some(index) =>
-            // Do not set BucketSpec to avoid limiting Spark's degree of parallelism.
+            // As FilterIndexRule is not intended to support bucketed scan, we set
+            // useBucketUnionForAppended to false. If it were true, Hybrid Scan could
+            // shuffle the appended data unnecessarily to apply BucketUnion when merging.
             val transformedPlan =
               RuleUtils.transformPlanToUseIndex(
                 spark,
                 index,
                 originalPlan,
-                useBucketSpec = false)
+                useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
+                useBucketUnionForAppended = false)
             logEvent(
               HyperspaceIndexUsageEvent(
                 AppInfo(
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
index 5d689e5bd..635434724 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala
@@ -63,10 +63,18 @@ object JoinIndexRule
           val updatedPlan =
             join
               .copy(
-                left =
-                  RuleUtils.transformPlanToUseIndex(spark, lIndex, l, useBucketSpec = true),
-                right =
-                  RuleUtils.transformPlanToUseIndex(spark, rIndex, r, useBucketSpec = true))
+                left = RuleUtils.transformPlanToUseIndex(
+                  spark,
+                  lIndex,
+                  l,
+                  useBucketSpec = true,
+                  useBucketUnionForAppended = true),
+                right = RuleUtils.transformPlanToUseIndex(
+                  spark,
+                  rIndex,
+                  r,
+                  useBucketSpec = true,
+                  useBucketUnionForAppended = true))
 
           logEvent(
             HyperspaceIndexUsageEvent(
@@ -325,11 +333,7 @@ object JoinIndexRule
     compatibleIndexPairs.map(
       indexPairs =>
         JoinIndexRanker
-          .rank(
-            spark,
-            leftRel,
-            rightRel,
-            indexPairs)
+          .rank(spark, leftRel, rightRel, indexPairs)
           .head)
   }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
index 65dcbb398..e7547987e 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -202,13 +202,15 @@ object RuleUtils {
    * @param index Index used in transformation of plan.
    * @param plan Current logical plan.
    * @param useBucketSpec Option whether to use BucketSpec for reading index data.
+   * @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
    * @return Transformed plan.
    */
   def transformPlanToUseIndex(
       spark: SparkSession,
       index: IndexLogEntry,
       plan: LogicalPlan,
-      useBucketSpec: Boolean): LogicalPlan = {
+      useBucketSpec: Boolean,
+      useBucketUnionForAppended: Boolean): LogicalPlan = {
     // Check pre-requisite.
     val logicalRelation = getLogicalRelation(plan)
     assert(logicalRelation.isDefined)
@@ -225,7 +227,7 @@ object RuleUtils {
     lazy val isSourceUpdated = index.hasSourceUpdate
 
     val transformed = if (hybridScanRequired || isSourceUpdated) {
-      transformPlanToUseHybridScan(spark, index, plan, useBucketSpec)
+      transformPlanToUseHybridScan(spark, index, plan, useBucketSpec, useBucketUnionForAppended)
     } else {
       transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec)
     }
@@ -302,13 +304,15 @@ object RuleUtils {
    * @param index Index used in transformation of plan.
    * @param plan Current logical plan.
    * @param useBucketSpec Option whether to use BucketSpec for reading index data.
+   * @param useBucketUnionForAppended Option whether to use BucketUnion to merge appended data.
    * @return Transformed logical plan that leverages an index and merges appended data.
    */
   private def transformPlanToUseHybridScan(
       spark: SparkSession,
       index: IndexLogEntry,
       plan: LogicalPlan,
-      useBucketSpec: Boolean): LogicalPlan = {
+      useBucketSpec: Boolean,
+      useBucketUnionForAppended: Boolean): LogicalPlan = {
     var unhandledAppendedFiles: Seq[Path] = Nil
 
     // Get transformed plan with index data and appended files if applicable.
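For context, a minimal end-to-end usage sketch of the new knob (the dataset path and the index assumed to exist are hypothetical; only the config key, its default, and `enableHyperspace` come from this codebase):

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConstants

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
spark.enableHyperspace() // injects Hyperspace rules, including FilterIndexRule

// Default is "false": index data is read without BucketSpec so that
// non-selective filter queries keep Spark's full degree of parallelism.
spark.conf.set(IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC, "true")

// Hypothetical data; assumes a covering index on "clicks" was already built.
val df = spark.read.parquet("/data/clicks")
// With the flag on, a highly selective filter on the indexed column can
// prune buckets and read fewer index files.
df.filter(df("clicks") === 1000).select(df("query")).show()
```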
@@ -426,7 +430,7 @@ object RuleUtils {
     val planForAppended =
       transformPlanToReadAppendedFiles(spark, index, plan, unhandledAppendedFiles)
-    if (useBucketSpec) {
+    if (useBucketUnionForAppended && useBucketSpec) {
       // If Bucketing information of the index is used to read the index data, we need to
       // shuffle the appended data in the same way to correctly merge with bucketed index data.
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index 7fa6778bb..f587544b3 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -60,6 +60,14 @@ object HyperspaceConf {
       .toDouble
   }
 
+  def useBucketSpecForFilterRule(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC,
+        IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT)
+      .toBoolean
+  }
+
   def numBucketsForIndex(spark: SparkSession): Int = {
     getConfStringWithMultipleKeys(
       spark,
diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanForDeltaLakeTest.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanForDeltaLakeTest.scala
index 4d37647d1..9ff3432a7 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanForDeltaLakeTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanForDeltaLakeTest.scala
@@ -81,7 +81,7 @@ class HybridScanForDeltaLakeTest extends HybridScanSuite {
 
   test(
     "Append-only: filter rule & parquet format, " +
-      "index relation should include appended file paths") {
+      "index relation should include appended file paths.") {
     // This flag is for testing plan transformation if appended files could be load with index
     // data scan node. Currently, it's applied for a very specific case: FilterIndexRule,
     // Parquet source format, no partitioning, no deleted files.
diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNonPartitionedDataTest.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNonPartitionedDataTest.scala
index bf9962ee4..4a60d3065 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNonPartitionedDataTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNonPartitionedDataTest.scala
@@ -36,7 +36,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
 
   test(
     "Append-only: filter rule & parquet format, " +
-      "index relation should include appended file paths") {
+      "index relation should include appended file paths.") {
     // This flag is for testing plan transformation if appended files could be load with index
     // data scan node. Currently, it's applied for a very specific case: FilterIndexRule,
     // Parquet source format, no partitioning, no deleted files.
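The guard above is the crux of the change; a condensed, illustrative-only sketch of the merge behavior it produces (`shuffleOnIndexedColumns` and `bucketUnion` are hypothetical stand-ins for the plan constructors RuleUtils actually uses):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}

// Hypothetical stand-ins for Hyperspace's shuffle/BucketUnion plan builders.
def shuffleOnIndexedColumns(plan: LogicalPlan): LogicalPlan = ???
def bucketUnion(index: LogicalPlan, appended: LogicalPlan): LogicalPlan = ???

def mergeAppended(
    indexPlan: LogicalPlan,
    appendedPlan: LogicalPlan,
    useBucketSpec: Boolean,
    useBucketUnionForAppended: Boolean): LogicalPlan =
  if (useBucketUnionForAppended && useBucketSpec) {
    // JoinIndexRule path: appended rows must land in the same buckets as the
    // index data, so shuffle them on the indexed columns and BucketUnion.
    bucketUnion(indexPlan, shuffleOnIndexedColumns(appendedPlan))
  } else {
    // FilterIndexRule path: the merged output need not be bucketed, so a
    // plain Union suffices and the extra shuffle is skipped even when
    // useBucketSpec is true (bucket pruning still applies to the index scan).
    Union(indexPlan, appendedPlan)
  }
```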
@@ -83,7 +83,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
     }
   }
 
-  test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column") {
+  test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column.") {
     val indexConfig = IndexConfig("index_ParquetDelete2", Seq("clicks"), Seq("query"))
     Seq(("indexWithoutLineage", "false", false), ("indexWithLineage", "true", true)) foreach {
       case (indexName, lineageColumnConfig, transformationExpected) =>
@@ -119,7 +119,7 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
     }
   }
 
-  test("Delete-only: filter rule, number of delete files threshold") {
+  test("Delete-only: filter rule, number of delete files threshold.") {
     withTempPathAsString { testPath =>
       val indexName = "IndexDeleteCntTest"
       withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
@@ -140,8 +140,6 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
       val afterDeleteSize = FileUtils.getDirectorySize(new Path(testPath))
       val deletedRatio = 1 - (afterDeleteSize / sourceSize.toFloat)
-      // scalastyle:off
-      println(deletedRatio)
 
       withSQLConf(TestConfig.HybridScanEnabled: _*) {
         withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD ->
@@ -159,5 +157,4 @@ class HybridScanForNonPartitionedDataTest extends HybridScanSuite {
       }
     }
   }
-
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
index 2e97ffa7b..3e2771882 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala
@@ -370,7 +370,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
 
   test(
     "Append-only: join rule, appended data should be shuffled with indexed columns " +
-      "and merged by BucketUnion") {
+      "and merged by BucketUnion.") {
     withTempPathAsString { testPath =>
       val appendPath1 = testPath + "/append1"
       val appendPath2 = testPath + "/append2"
@@ -400,11 +400,12 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
       val basePlan = baseQuery.queryExecution.optimizedPlan
 
       withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
-        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+        withSQLConf(
+          SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
           IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
-            val join = joinQuery()
-            checkAnswer(join, baseQuery)
-            assert(basePlan.equals(join.queryExecution.optimizedPlan))
+          val join = joinQuery()
+          checkAnswer(join, baseQuery)
+          assert(basePlan.equals(join.queryExecution.optimizedPlan))
         }
 
         withSQLConf(TestConfig.HybridScanEnabled: _*) {
@@ -428,7 +429,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
 
   test(
     "Append-only: filter rule and non-parquet format," +
-      "appended data should be shuffled and merged by Union") {
+      "appended data should be shuffled and merged by Union.") {
     // Note: for delta lake, this test is also eligible as the dataset is partitioned.
     withTempPathAsString { testPath =>
       val (appendedFiles, deletedFiles) = setupIndexAndChangeData(
@@ -460,12 +461,76 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
           appendedFiles,
           deletedFiles,
           Seq(" <= 2000"))
+
+        // Check bucketSpec is not used.
+        val bucketSpec = planWithHybridScan collect {
+          case LogicalRelation(HadoopFsRelation(_, _, _, bucketSpec, _, _), _, _, _) =>
+            bucketSpec
+        }
+        assert(bucketSpec.length == 2)
+
+        // bucketSpec.head is for the index plan, bucketSpec.last is for the plan
+        // for appended files.
+        assert(bucketSpec.head.isEmpty && bucketSpec.last.isEmpty)
+
+        checkAnswer(baseQuery, filter)
+      }
+    }
+  }
+
+  test(
+    "Append-only: filter rule and non-parquet format, " +
+      "appended data should be shuffled and merged by Union even with bucketSpec.") {
+    withTempPathAsString { testPath =>
+      val (appendedFiles, deletedFiles) = setupIndexAndChangeData(
+        fileFormat2,
+        testPath,
+        indexConfig1.copy(indexName = "index_Format2Append"),
+        appendCnt = 1,
+        deleteCnt = 0)
+
+      val df = spark.read.format(fileFormat2).load(testPath)
+      def filterQuery: DataFrame = df.filter(df("clicks") <= 2000).select(df("query"))
+
+      val baseQuery = filterQuery
+      val basePlan = baseQuery.queryExecution.optimizedPlan
+
+      withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") {
+        val filter = filterQuery
+        assert(basePlan.equals(filter.queryExecution.optimizedPlan))
+      }
+
+      withSQLConf(
+        TestConfig.HybridScanEnabledAppendOnly :+
+          IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
+        val filter = filterQuery
+        val planWithHybridScan = filter.queryExecution.optimizedPlan
+        assert(!basePlan.equals(planWithHybridScan))
+
+        checkFilterIndexHybridScanUnion(
+          planWithHybridScan,
+          "index_Format2Append",
+          appendedFiles,
+          deletedFiles,
+          Seq(" <= 2000"))
+
+        // Check bucketSpec is used.
+        val bucketSpec = planWithHybridScan collect {
+          case LogicalRelation(HadoopFsRelation(_, _, _, bucketSpec, _, _), _, _, _) =>
+            bucketSpec
+        }
+        assert(bucketSpec.length == 2)
+        // bucketSpec.head is for the index plan, bucketSpec.last is for the plan
+        // for appended files.
+        assert(bucketSpec.head.isDefined && bucketSpec.last.isEmpty)
+        assert(bucketSpec.head.get.bucketColumnNames.toSet === indexConfig1.indexedColumns.toSet)
+        checkAnswer(baseQuery, filter)
       }
     }
   }
 
-  test("Delete-only: index relation should have additional filter for deleted files") {
+  test("Delete-only: index relation should have additional filter for deleted files.") {
     val testSet = Seq(("index_ParquetDelete", fileFormat), ("index_JsonDelete", fileFormat2))
     testSet foreach {
       case (indexName, dataFormat) =>
@@ -640,8 +705,9 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
         assert(basePlan.equals(join.queryExecution.optimizedPlan))
       }
 
-      withSQLConf(TestConfig.HybridScanEnabled :+
-        "spark.sql.optimizer.inSetConversionThreshold" -> "1": _*) {
+      withSQLConf(
+        TestConfig.HybridScanEnabled :+
+          "spark.sql.optimizer.inSetConversionThreshold" -> "1": _*) {
         // Changed inSetConversionThreshould to check InSet optimization.
         val join = joinQuery()
         val planWithHybridScan = join.queryExecution.optimizedPlan
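The plan inspection added in these tests also works as a quick sanity check outside the suite; a small sketch (the DataFrame passed in is assumed to be a filter query over data with an applied index):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Collect the BucketSpec of every file-based relation in the optimized plan.
// With INDEX_FILTER_RULE_USE_BUCKET_SPEC on and Hybrid Scan applied, the index
// relation should report Some(spec) whose bucket columns are the indexed
// columns, while the relation reading appended files should report None.
def bucketSpecs(df: DataFrame): Seq[Option[BucketSpec]] =
  df.queryExecution.optimizedPlan.collect {
    case LogicalRelation(fs: HadoopFsRelation, _, _, _) => fs.bucketSpec
  }
```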