From 6ef50cc2c3dc7919408b87dd24c22c56086a0614 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Thu, 11 Mar 2021 13:42:26 +0200 Subject: [PATCH 1/5] Support filter with indexes on nested fields --- .../index/rules/FilterIndexRule.scala | 69 ++++-- .../hyperspace/index/rules/PlanUtils.scala | 174 +++++++++++++++ .../hyperspace/index/rules/RuleUtils.scala | 91 +++++++- .../FileBasedSourceProviderManager.scala | 46 +++- .../hyperspace/util/ResolverUtils.scala | 208 ++++++++++-------- 5 files changed, 471 insertions(+), 117 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala index 23b1f5838..023bf84cc 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala @@ -17,8 +17,8 @@ package com.microsoft.hyperspace.index.rules import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.analysis.CleanupAliases -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, Resolver, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule @@ -26,6 +26,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.index.rankers.FilterIndexRanker +import com.microsoft.hyperspace.index.rules.PlanUtils._ import com.microsoft.hyperspace.index.sources.FileBasedRelation import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent} import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} @@ -53,7 +54,7 @@ object FilterIndexRule case ExtractFilterNode(originalPlan, filter, outputColumns, filterColumns) => try { val candidateIndexes = - findCoveringIndexes(filter, outputColumns, filterColumns) + findCoveringIndexes(filter, outputColumns, filterColumns, plan) FilterIndexRanker.rank(spark, filter, candidateIndexes) match { case Some(index) => // As FilterIndexRule is not intended to support bucketed scan, we set @@ -94,12 +95,14 @@ object FilterIndexRule * @param filter Filter node in the subplan that is being optimized. * @param outputColumns List of output columns in subplan. * @param filterColumns List of columns in filter predicate. + * @param plan The Logical Plan that contains information about the field. * @return List of available candidate indexes on fsRelation for the given columns. 
*/ private def findCoveringIndexes( filter: Filter, outputColumns: Seq[String], - filterColumns: Seq[String]): Seq[IndexLogEntry] = { + filterColumns: Seq[String], + plan: LogicalPlan): Seq[IndexLogEntry] = { RuleUtils.getRelation(spark, filter) match { case Some(r) => val indexManager = Hyperspace @@ -111,20 +114,41 @@ object FilterIndexRule // See https://github.com/microsoft/hyperspace/issues/65 val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) - val candidateIndexes = allIndexes.filter { index => - indexCoversPlan( - outputColumns, - filterColumns, - index.indexedColumns, - index.includedColumns) + def resolveWithChildren(fieldName: String, plan: LogicalPlan, resolver: Resolver) = { + plan.resolveChildren(UnresolvedAttribute.parseAttributeName(fieldName), resolver) } - // Get candidate via file-level metadata validation. This is performed after pruning - // by column schema, as this might be expensive when there are numerous files in the - // relation or many indexes to be checked. - RuleUtils.getCandidateIndexes(spark, candidateIndexes, r) - - case None => Nil // There is zero or more than one supported relations in Filter's sub-plan. + val resolvedOutputColumnsOpt = ResolverUtils.resolve( + spark, + outputColumns, + plan, + resolveWithChildren, + throwIfNotInSchema = false) + val resolvedFilterColumnsOpt = ResolverUtils.resolve( + spark, + filterColumns, + plan, + resolveWithChildren, + throwIfNotInSchema = false) + + (resolvedOutputColumnsOpt, resolvedFilterColumnsOpt) match { + case (Some(resolvedOutputColumns), Some(resolvedFilterColumns)) => + val candidateIndexes = allIndexes.filter { index => + indexCoversPlan( + resolvedOutputColumns.map(_.name), + resolvedFilterColumns.map(_.name), + index.indexedColumns, + index.includedColumns) + } + + // Get candidate via file-level metadata validation. This is performed after pruning + // by column schema, as this might be expensive when there are numerous files in the + // relation or many indexes to be checked. + RuleUtils.getCandidateIndexes(spark, candidateIndexes, r) + + case _ => Nil + } + case _ => Nil // There is zero or more than one supported relations in Filter's sub-plan. } } @@ -136,7 +160,6 @@ object FilterIndexRule * @param filterColumns List of columns in filter predicate. * @param indexedColumns List of indexed columns (e.g. from an index being checked) * @param includedColumns List of included columns (e.g. from an index being checked) - * @param fileFormat FileFormat for input relation in original logical plan. * @return 'true' if * 1. Index fully covers output and filter columns, and * 2. Filter predicate contains first column in index's 'indexed' columns. 
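   * For example (illustrative, column names taken from the nested sample data used in the
   * tests): an index with indexed columns ["nested.leaf.id"] and included columns ["Date"]
   * covers a sub-plan that outputs "Date" and filters on "nested.leaf.id", because the
   * filter predicate references "nested.leaf.id", the first (and only) indexed column.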
@@ -168,9 +191,17 @@ object ExtractFilterNode { val projectColumnNames = CleanupAliases(project) .asInstanceOf[Project] .projectList - .map(_.references.map(_.asInstanceOf[AttributeReference].name)) + .map(extractNamesFromExpression) .flatMap(_.toSeq) - val filterColumnNames = condition.references.map(_.name).toSeq + val filterColumnNames = extractNamesFromExpression(condition).toSeq + .sortBy(-_.length) + .foldLeft(Seq.empty[String]) { (acc, e) => + if (!acc.exists(i => i.startsWith(e))) { + acc :+ e + } else { + acc + } + } Some(project, filter, projectColumnNames, filterColumnNames) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala new file mode 100644 index 000000000..ce17c5605 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala @@ -0,0 +1,174 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import scala.util.Try + +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetStructField} +import org.apache.spark.sql.types.{DataType, StructType} + +import com.microsoft.hyperspace.util.ResolverUtils + +object PlanUtils { + + /** + * The method extract field names from a Spark Catalyst [[Expression]]. + * + * @param exp The Spark Catalyst expression from which to extract names. + * @return A set of distinct field names. + */ + def extractNamesFromExpression(exp: Expression): Set[String] = { + exp match { + case AttributeReference(name, _, _, _) => + Set(s"$name") + case Alias(child, _) => + extractNamesFromExpression(child) + case otherExp => + otherExp.containsChild.flatMap { + case g: GetStructField => + Some(s"${getChildNameFromStruct(g)}") + case e: Expression => + Some(extractNamesFromExpression(e).filter(_.nonEmpty).mkString(".")) + case _ => + None + } + } + } + + /** + * Given a [[GetStructField]] expression for a nested field (aka a struct) + * the method will extract the full field `.` (dot) separated name. + * + * @param field The [[GetStructField]] field from which we want to extract + * the name. + * @return A field name `.` (dot) separated if nested. + */ + def getChildNameFromStruct(field: GetStructField): String = { + field.child match { + case f: GetStructField => + s"${getChildNameFromStruct(f)}.${field.name.get}" + case a: AttributeReference => + s"${a.name}.${field.name.get}" + case _ => + s"${field.name.get}" + } + } + + /** + * Given an Spark Catalyst [[Expression]] and a field name the method extracts + * the parent search expression and the expression that contains the field name + * + * @param exp The Spark Catalyst [[Expression]] to extract from. + * @param name The field name to search for. + * @return A tuple with the parent expression and the leaf expression that + * contains the given name. 
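   * For example (illustrative, field names assumed): given the condition
   * `nested.leaf.cnt > 10` and the name "nested.leaf.cnt", the parent is the
   * `GreaterThan` expression and the matched expression is the `GetStructField`
   * chain that reads `nested.leaf.cnt`.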
+ */ + def extractSearchQuery(exp: Expression, name: String): (Expression, Expression) = { + val splits = name.split(".") + val expFound = exp.find { + case a: AttributeReference if splits.forall(s => a.name.contains(s)) => true + case f: GetStructField if splits.forall(s => f.toString().contains(s)) => true + case _ => false + }.get + val parent = exp.find { + case e: Expression if e.containsChild.contains(expFound) => true + case _ => false + }.get + (parent, expFound) + } + + /** + * Given an Spark Catalyst [[Expression]], a needle [[Expression]] and a replace + * [[Expression]] the method will replace the needle with the replacement into + * the parent expression. + * + * @param parent The parent Spark Catalyst [[Expression]] into which to replace. + * @param needle The Spark Catalyst [[Expression]] needle to search for. + * @param repl The replacement Spark Catalyst [[Expression]]. + * @return A new Spark Catalyst [[Expression]]. + */ + def replaceInSearchQuery( + parent: Expression, + needle: Expression, + repl: Expression): Expression = { + parent.mapChildren { c => + if (c == needle) { + repl + } else { + c + } + } + } + + /** + * Given an Spark Catalyst [[Expression]] and a field name the method + * extracts the [[AttributeReference]] for that field name. + * + * @param exp The Spark Catalyst [[Expression]] to extract from. + * @param name The field name for which to extract the attribute reference. + * @return A Spark Catalyst [[AttributeReference]] pointing to the field name. + */ + def extractAttributeRef(exp: Expression, name: String): AttributeReference = { + val splits = name.split(".") + val elem = exp.find { + case a: AttributeReference if splits.contains(a.name) => true + case _ => false + } + elem.get.asInstanceOf[AttributeReference] + } + + /** + * Given a Spark Catalyst [[Expression]] and a field name the method + * extracts the type of the field as a Spark SQL [[DataType]]. + * + * @param exp The Spark Catalyst [[Expression]] from which to extract the type. + * @param name The field name for which we need to get the type. + * @return A Spark SQL [[DataType]] of the given field name. 
+ */ + def extractTypeFromExpression(exp: Expression, name: String): DataType = { + val splits = name.split(".") + val elem = exp.flatMap { + case a: AttributeReference => + if (splits.forall(s => a.name == s)) { + Some((name, a.dataType)) + } else { + Try({ + val h :: t = splits.toList + if (a.name == h && a.dataType.isInstanceOf[StructType]) { + val itFields = t.flatMap { i => + a.dataType + .asInstanceOf[StructType] + .find(_.name.equalsIgnoreCase(i)) + .map(j => (i, j.dataType)) + } + Some(itFields.last) + } else { + None + } + }).getOrElse(None) + } + case f: GetStructField if splits.forall(s => f.toString().contains(s)) => + Some((name, f.dataType)) + case _ => None + } + elem.find(e => e._1 == name || e._1 == splits.last).get._2 + } + + def prefixNestedField(fieldName: String): String = { + ResolverUtils.ResolvedColumn(fieldName, fieldName.contains(".")).normalizedName + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 98f5625f3..6c25a48f2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, GetStructField, In, Literal, NamedExpression, Not} import org.apache.spark.sql.catalyst.optimizer.OptimizeIn import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ @@ -32,8 +32,9 @@ import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONFIGS, IS_HYBRIDSCAN_CANDIDATE} import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation} +import com.microsoft.hyperspace.index.rules.PlanUtils._ import com.microsoft.hyperspace.index.sources.FileBasedRelation -import com.microsoft.hyperspace.util.HyperspaceConf +import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils} object RuleUtils { @@ -256,12 +257,15 @@ object RuleUtils { plan: LogicalPlan, useBucketSpec: Boolean): LogicalPlan = { val provider = Hyperspace.getContext(spark).sourceProviderManager - // Note that we transform *only* the base relation and not other portions of the plan - // (e.g., filters). For instance, given the following input plan: + // Note that depending on the case we transform only the base relation + // and sometimes other portions of the plan (e.g., filters). For instance, + // given the following input plan: // Project(A,B) -> Filter(C = 10) -> Scan (A,B,C,D,E) - // in the presence of a suitable index, the getIndexPlan() method will emit: + // in the presence of a suitable index, we will transform to: // Project(A,B) -> Filter(C = 10) -> Index Scan (A,B,C) - plan transformDown { + // In the case of nested fields we will transform the project and + // filter nodes too. 
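    // For example (illustrative, assuming an index on the nested field "nested.leaf.cnt"
    // with "Date" included):
    //   Project(Date, nested.leaf.cnt) -> Filter(nested.leaf.cnt > 10) -> Scan(Date, ..., nested)
    // becomes
    //   Project(Date, __hs_nested.nested.leaf.cnt)
    //     -> Filter(__hs_nested.nested.leaf.cnt > 10)
    //       -> Index Scan(__hs_nested.nested.leaf.cnt, Date, ...)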
+ plan transformUp { case l: LeafNode if provider.isSupportedRelation(l) => val relation = provider.getRelation(l) val location = index.withCachedTag(IndexLogEntryTags.INMEMORYFILEINDEX_INDEX_ONLY) { @@ -276,10 +280,29 @@ object RuleUtils { new ParquetFileFormat, Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index) - val updatedOutput = relation.plan.output - .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name)) - .map(_.asInstanceOf[AttributeReference]) + val resolvedFields = + ResolverUtils.resolve(spark, index.indexedColumns ++ index.includedColumns, relation.plan) + val updatedOutput = + if (resolvedFields.isDefined && resolvedFields.get.exists(_.isNested)) { + indexFsRelation.schema.flatMap { s => + relation.plan.output.find(a => s.name.contains(a.name)).map { a => + AttributeReference(s.name, s.dataType, a.nullable, a.metadata)( + NamedExpression.newExprId, + a.qualifier) + } + } + } else { + relation.plan.output + .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name)) + .map(_.asInstanceOf[AttributeReference]) + } relation.createLogicalRelation(indexFsRelation, updatedOutput) + + case p: Project if provider.isSupportedProject(p, index) => + transformProject(p) + + case f: Filter if provider.isSupportedFilter(f, index) => + transformFilter(f) } } @@ -566,4 +589,54 @@ object RuleUtils { assert(shuffleInjected) shuffled } + + private def transformProject(project: Project): Project = { + val projectedFields = project.projectList.map { exp => + val fieldName = extractNamesFromExpression(exp).head + val escapedFieldName = PlanUtils.prefixNestedField(fieldName) + val attr = extractAttributeRef(exp, fieldName) + val fieldType = extractTypeFromExpression(exp, fieldName) + getExprId(project, escapedFieldName) match { + case Some(exprId) => + attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)( + exprId, + attr.qualifier) + case _ => + attr + } + } + project.copy(projectList = projectedFields) + } + + private def transformFilter(filter: Filter): Filter = { + val fieldNames = extractNamesFromExpression(filter.condition) + var mutableFilter = filter + fieldNames.foreach { fieldName => + val escapedFieldName = PlanUtils.prefixNestedField(fieldName) + getExprId(filter, escapedFieldName) match { + case Some(exprId) => + val (parentExpresion, exp) = + extractSearchQuery(filter.condition, fieldName) + val fieldType = extractTypeFromExpression(exp, fieldName) + val attr = extractAttributeRef(exp, fieldName) + val newAttr = attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)( + exprId, + attr.qualifier) + val newExp = exp match { + case _: GetStructField => newAttr + case other: Expression => other + } + val newParentExpression = + replaceInSearchQuery(parentExpresion, exp, newExp) + mutableFilter = filter.copy(condition = newParentExpression) + case _ => + mutableFilter = filter + } + } + mutableFilter + } + + private def getExprId(plan: LogicalPlan, fieldName: String): Option[ExprId] = { + plan.output.find(a => a.name.equalsIgnoreCase(fieldName)).map(_.exprId) + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 49dbf0a3e..01e6fdfd5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -19,12 +19,13 @@ package 
com.microsoft.hyperspace.index.sources import scala.util.{Success, Try} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.util.hyperspace.Utils import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.index.Relation -import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf} +import com.microsoft.hyperspace.index.{IndexLogEntry, Relation} +import com.microsoft.hyperspace.index.rules.PlanUtils._ +import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf, ResolverUtils} /** * [[FileBasedSourceProviderManager]] is responsible for loading source providers which implements @@ -109,6 +110,45 @@ class FileBasedSourceProviderManager(spark: SparkSession) { } } + /** + * Returns true if the given project is a supported project. If all of the registered + * providers return None, this returns false. + * + * @param project Project to check if it's supported. + * @return True if the given project is a supported relation. + */ + def isSupportedProject(project: Project, index: IndexLogEntry): Boolean = { + val indexCols = index.indexedColumns ++ index.includedColumns + val resolvedIndexCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)) + val hasNestedCols = resolvedIndexCols.exists(_.isNested) + val projectListFields = project.projectList.flatMap(extractNamesFromExpression) + val containsNestedFields = projectListFields.exists(i => indexCols.contains(i)) + var containsNestedChildren = false + project.child.foreach { + case f: Filter => + val filterSupported = isSupportedFilter(f, index) + containsNestedChildren = containsNestedChildren || filterSupported + case _ => + } + hasNestedCols && (containsNestedFields || containsNestedChildren) + } + + /** + * Returns true if the given filter is a supported filter. If all of the registered + * providers return None, this returns false. + * + * @param filter Filter to check if it's supported. + * @return True if the given project is a supported relation. + */ + def isSupportedFilter(filter: Filter, index: IndexLogEntry): Boolean = { + val indexCols = index.indexedColumns ++ index.includedColumns + val resolvedIndexCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)) + val hasNestedCols = resolvedIndexCols.exists(_.isNested) + val filterFields = extractNamesFromExpression(filter.condition).toSeq + val supported = filterFields.exists(i => indexCols.contains(i)) + hasNestedCols && supported + } + /** * Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns * [[Option]] for each provider built. This function ensures that only one provider returns diff --git a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala index b198711b7..5fc228fa4 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala @@ -31,76 +31,6 @@ import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn.NESTED_FIELD_P */ object ResolverUtils { - /** - * [[ResolvedColumn]] stores information when a column name is resolved against the - * analyzed plan and its schema. - * - * Outside unit tests, this object should not be created directly, but via the `resolve` function, - * or `ResolvedColumn.apply` with a normalized name. 
- * - * @param name The column name resolved from an analyzed plan. - * @param isNested Flag to denote if this column is nested or not. - */ - private[hyperspace] case class ResolvedColumn(name: String, isNested: Boolean) { - assert(!isNested || (name.contains(".") && !name.startsWith(NESTED_FIELD_PREFIX))) - // Quotes will be removed from `resolve` and nested columns with quotes (e.g., "a.`b.c`.d") - // are not supported. - assert(!name.contains("`")) - - // For nested fields, the column name is prefixed with `NESTED_FIELD_PREFIX`. - lazy val normalizedName = { - if (isNested) { - s"$NESTED_FIELD_PREFIX$name" - } else { - name - } - } - - /** - * Create a column using the resolved name. Top level column names are quoted, and - * nested column names are aliased with normalized names. - * - * @return [[Column]] object created using the resolved name. - */ - def toColumn: Column = { - if (isNested) { - // No need to quote the string for "as" even if it contains dots. - col(name).as(normalizedName) - } else { - col(quote(name)) - } - } - - /** - * Create a column using the normalized name. Since the normalized name is already flattened - * with "dots", it is quoted. - * - * @return [[Column]] object create using the normalized name. - */ - def toNormalizedColumn: Column = col(quote(normalizedName)) - - private def quote(name: String) = s"`$name`" - } - - private[hyperspace] object ResolvedColumn { - private val NESTED_FIELD_PREFIX = "__hs_nested." - - /** - * Given a normalized column name, create [[ResolvedColumn]] after handling the prefix - * for nested columns. - * - * @param normalizedColumnName Normalized column name. - * @return [[ResolvedColumn]] created from the given normalized column name. - */ - def apply(normalizedColumnName: String): ResolvedColumn = { - if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) { - ResolvedColumn(normalizedColumnName.substring(NESTED_FIELD_PREFIX.length), isNested = true) - } else { - ResolvedColumn(normalizedColumnName, isNested = false) - } - } - } - /** * Return available string if required string can be resolved with it, based on spark resolver. * @@ -162,16 +92,19 @@ object ResolverUtils { def resolve( spark: SparkSession, requiredStrings: Seq[String], - plan: LogicalPlan): Option[Seq[ResolvedColumn]] = { + plan: LogicalPlan, + resolverMethod: (String, LogicalPlan, Resolver) => Option[Expression] = + defaultResolverMethod, + throwIfNotInSchema: Boolean = true): Option[Seq[ResolvedColumn]] = { val schema = plan.schema val resolver = spark.sessionState.conf.resolver val resolved = requiredStrings.map { requiredField => - plan - .resolveQuoted(requiredField, resolver) + resolverMethod(requiredField, plan, resolver) .map { expr => val resolvedColNameParts = extractColumnName(expr) validateResolvedColumnName(requiredField, resolvedColNameParts) - val origColNameParts = getColumnNameFromSchema(schema, resolvedColNameParts, resolver) + val origColNameParts = + getColumnNameFromSchema(schema, resolvedColNameParts, resolver, throwIfNotInSchema) ResolvedColumn(origColNameParts.mkString("."), origColNameParts.length > 1) } .getOrElse { return None } @@ -179,6 +112,22 @@ object ResolverUtils { Some(resolved) } + /** + * The default way of resolving field names. + * + * @param fieldName The field name to resolve. + * @param plan The Logical Plan that contains information about the field. + * @param resolver The resolver to use to resolve the field. + * @return A optional [[Expression]] that is the resolved field name. 
+ * @return + */ + protected[hyperspace] def defaultResolverMethod( + fieldName: String, + plan: LogicalPlan, + resolver: Resolver): Option[Expression] = { + plan.resolveQuoted(fieldName, resolver) + } + // Extracts the parts of a nested field access path from an expression. private def extractColumnName(expr: Expression): Seq[String] = { expr match { @@ -212,19 +161,106 @@ object ResolverUtils { private def getColumnNameFromSchema( schema: StructType, resolvedColNameParts: Seq[String], - resolver: Resolver): Seq[String] = resolvedColNameParts match { + resolver: Resolver, + throwIfNotInSchema: Boolean = true): Seq[String] = resolvedColNameParts match { case h :: tail => - val field = schema.find(f => resolver(f.name, h)).get - field match { - case StructField(name, s: StructType, _, _) => - name +: getColumnNameFromSchema(s, tail, resolver) - case StructField(_, _: ArrayType, _, _) => - // TODO: Nested arrays will be supported later - throw HyperspaceException("Array types are not supported.") - case StructField(_, _: MapType, _, _) => - // TODO: Nested maps will be supported later - throw HyperspaceException("Map types are not supported") - case f => Seq(f.name) + val fieldOpt = schema.find(f => resolver(f.name, h)) + fieldOpt match { + case Some(field) => + field match { + case StructField(name, s: StructType, _, _) => + name +: getColumnNameFromSchema(s, tail, resolver) + case StructField(_, _: ArrayType, _, _) => + // TODO: Nested arrays will be supported later + throw HyperspaceException("Array types are not supported.") + case StructField(_, _: MapType, _, _) => + // TODO: Nested maps will be supported later + throw HyperspaceException("Map types are not supported") + case f => Seq(f.name) + } + case _ => + if (throwIfNotInSchema) { + throw HyperspaceException(s"Hyperspace cannot not find $h in schema") + } else { + if (tail.nonEmpty) { + h +: getColumnNameFromSchema(schema, tail, resolver, throwIfNotInSchema) + } else { + Seq(h) + } + } + } + + } + + /** + * [[ResolvedColumn]] stores information when a column name is resolved against the + * analyzed plan and its schema. + * + * Outside unit tests, this object should not be created directly, but via the `resolve` function, + * or `ResolvedColumn.apply` with a normalized name. + * + * @param name The column name resolved from an analyzed plan. + * @param isNested Flag to denote if this column is nested or not. + */ + private[hyperspace] case class ResolvedColumn(name: String, isNested: Boolean) { + assert(!isNested || (name.contains(".") && !name.startsWith(NESTED_FIELD_PREFIX))) + // Quotes will be removed from `resolve` and nested columns with quotes (e.g., "a.`b.c`.d") + // are not supported. + assert(!name.contains("`")) + + // For nested fields, the column name is prefixed with `NESTED_FIELD_PREFIX`. + lazy val normalizedName = { + if (isNested) { + s"$NESTED_FIELD_PREFIX$name" + } else { + name + } + } + + /** + * Create a column using the resolved name. Top level column names are quoted, and + * nested column names are aliased with normalized names. + * + * @return [[Column]] object created using the resolved name. + */ + def toColumn: Column = { + if (isNested) { + // No need to quote the string for "as" even if it contains dots. + col(name).as(normalizedName) + } else { + col(quote(name)) + } + } + + /** + * Create a column using the normalized name. Since the normalized name is already flattened + * with "dots", it is quoted. + * + * @return [[Column]] object create using the normalized name. 
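   * For example (illustrative): a nested column resolved as "nested.leaf.cnt" has the
   * normalized name "__hs_nested.nested.leaf.cnt", so this returns
   * col("`__hs_nested.nested.leaf.cnt`").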
+ */ + def toNormalizedColumn: Column = col(quote(normalizedName)) + + private def quote(name: String) = s"`$name`" + } + + private[hyperspace] object ResolvedColumn { + private val NESTED_FIELD_PREFIX = "__hs_nested." + + /** + * Given a normalized column name, create [[ResolvedColumn]] after handling the prefix + * for nested columns. + * + * @param normalizedColumnName Normalized column name. + * @return [[ResolvedColumn]] created from the given normalized column name. + */ + def apply(normalizedColumnName: String): ResolvedColumn = { + if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) { + ResolvedColumn( + normalizedColumnName.substring(NESTED_FIELD_PREFIX.length), + isNested = true) + } else { + ResolvedColumn(normalizedColumnName, isNested = false) } + } } } From f329d7a90dd7ee329a6d14601d5bd47f7ead44b7 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Thu, 8 Apr 2021 11:00:28 +0300 Subject: [PATCH 2/5] Integrate feedback --- .../hyperspace/index/rules/PlanUtils.scala | 4 +- .../hyperspace/index/rules/RuleUtils.scala | 6 +- .../FileBasedSourceProviderManager.scala | 44 +++--- .../hyperspace/util/ResolverUtils.scala | 144 +++++++++--------- 4 files changed, 101 insertions(+), 97 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala index ce17c5605..ab11854dc 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala @@ -70,7 +70,7 @@ object PlanUtils { /** * Given an Spark Catalyst [[Expression]] and a field name the method extracts - * the parent search expression and the expression that contains the field name + * the parent search expression and the expression that contains the field name. * * @param exp The Spark Catalyst [[Expression]] to extract from. * @param name The field name to search for. @@ -101,7 +101,7 @@ object PlanUtils { * @param repl The replacement Spark Catalyst [[Expression]]. * @return A new Spark Catalyst [[Expression]]. 
*/ - def replaceInSearchQuery( + def replaceExpression( parent: Expression, needle: Expression, repl: Expression): Expression = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 6c25a48f2..ed3fea888 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -298,10 +298,10 @@ object RuleUtils { } relation.createLogicalRelation(indexFsRelation, updatedOutput) - case p: Project if provider.isSupportedProject(p, index) => + case p: Project if provider.hasNestedColumns(p, index) => transformProject(p) - case f: Filter if provider.isSupportedFilter(f, index) => + case f: Filter if provider.hasNestedColumns(f, index) => transformFilter(f) } } @@ -627,7 +627,7 @@ object RuleUtils { case other: Expression => other } val newParentExpression = - replaceInSearchQuery(parentExpresion, exp, newExp) + replaceExpression(parentExpresion, exp, newExp) mutableFilter = filter.copy(condition = newParentExpression) case _ => mutableFilter = filter diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 01e6fdfd5..036f5cd70 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -117,36 +117,40 @@ class FileBasedSourceProviderManager(spark: SparkSession) { * @param project Project to check if it's supported. * @return True if the given project is a supported relation. */ - def isSupportedProject(project: Project, index: IndexLogEntry): Boolean = { + def hasNestedColumns(project: Project, index: IndexLogEntry): Boolean = { val indexCols = index.indexedColumns ++ index.includedColumns - val resolvedIndexCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)) - val hasNestedCols = resolvedIndexCols.exists(_.isNested) - val projectListFields = project.projectList.flatMap(extractNamesFromExpression) - val containsNestedFields = projectListFields.exists(i => indexCols.contains(i)) - var containsNestedChildren = false - project.child.foreach { - case f: Filter => - val filterSupported = isSupportedFilter(f, index) - containsNestedChildren = containsNestedChildren || filterSupported - case _ => + val hasNestedCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)).exists(_.isNested) + if (hasNestedCols) { + val projectListFields = project.projectList.flatMap(extractNamesFromExpression) + val containsNestedFields = projectListFields.exists(i => indexCols.contains(i)) + var containsNestedChildren = false + project.child.foreach { + case f: Filter => + val filterSupported = hasNestedColumns(f, index) + containsNestedChildren = containsNestedChildren || filterSupported + case _ => + } + containsNestedFields || containsNestedChildren + } else { + false } - hasNestedCols && (containsNestedFields || containsNestedChildren) } /** - * Returns true if the given filter is a supported filter. If all of the registered - * providers return None, this returns false. + * Returns true if the given filter has nested columns. * * @param filter Filter to check if it's supported. * @return True if the given project is a supported relation. 
*/ - def isSupportedFilter(filter: Filter, index: IndexLogEntry): Boolean = { + def hasNestedColumns(filter: Filter, index: IndexLogEntry): Boolean = { val indexCols = index.indexedColumns ++ index.includedColumns - val resolvedIndexCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)) - val hasNestedCols = resolvedIndexCols.exists(_.isNested) - val filterFields = extractNamesFromExpression(filter.condition).toSeq - val supported = filterFields.exists(i => indexCols.contains(i)) - hasNestedCols && supported + val hasNestedCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)).exists(_.isNested) + if (hasNestedCols) { + val filterFields = extractNamesFromExpression(filter.condition).toSeq + filterFields.exists(i => indexCols.contains(i)) + } else { + false + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala index 5fc228fa4..69fb47177 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala @@ -31,6 +31,78 @@ import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn.NESTED_FIELD_P */ object ResolverUtils { + /** + * [[ResolvedColumn]] stores information when a column name is resolved against the + * analyzed plan and its schema. + * + * Outside unit tests, this object should not be created directly, but via the `resolve` function, + * or `ResolvedColumn.apply` with a normalized name. + * + * @param name The column name resolved from an analyzed plan. + * @param isNested Flag to denote if this column is nested or not. + */ + private[hyperspace] case class ResolvedColumn(name: String, isNested: Boolean) { + assert(!isNested || (name.contains(".") && !name.startsWith(NESTED_FIELD_PREFIX))) + // Quotes will be removed from `resolve` and nested columns with quotes (e.g., "a.`b.c`.d") + // are not supported. + assert(!name.contains("`")) + + // For nested fields, the column name is prefixed with `NESTED_FIELD_PREFIX`. + lazy val normalizedName = { + if (isNested) { + s"$NESTED_FIELD_PREFIX$name" + } else { + name + } + } + + /** + * Create a column using the resolved name. Top level column names are quoted, and + * nested column names are aliased with normalized names. + * + * @return [[Column]] object created using the resolved name. + */ + def toColumn: Column = { + if (isNested) { + // No need to quote the string for "as" even if it contains dots. + col(name).as(normalizedName) + } else { + col(quote(name)) + } + } + + /** + * Create a column using the normalized name. Since the normalized name is already flattened + * with "dots", it is quoted. + * + * @return [[Column]] object create using the normalized name. + */ + def toNormalizedColumn: Column = col(quote(normalizedName)) + + private def quote(name: String) = s"`$name`" + } + + private[hyperspace] object ResolvedColumn { + private val NESTED_FIELD_PREFIX = "__hs_nested." + + /** + * Given a normalized column name, create [[ResolvedColumn]] after handling the prefix + * for nested columns. + * + * @param normalizedColumnName Normalized column name. + * @return [[ResolvedColumn]] created from the given normalized column name. 
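   * For example (illustrative): ResolvedColumn("__hs_nested.nested.leaf.cnt") yields
   * ResolvedColumn("nested.leaf.cnt", isNested = true), while ResolvedColumn("Date")
   * yields ResolvedColumn("Date", isNested = false).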
+ */ + def apply(normalizedColumnName: String): ResolvedColumn = { + if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) { + ResolvedColumn( + normalizedColumnName.substring(NESTED_FIELD_PREFIX.length), + isNested = true) + } else { + ResolvedColumn(normalizedColumnName, isNested = false) + } + } + } + /** * Return available string if required string can be resolved with it, based on spark resolver. * @@ -191,76 +263,4 @@ object ResolverUtils { } } - - /** - * [[ResolvedColumn]] stores information when a column name is resolved against the - * analyzed plan and its schema. - * - * Outside unit tests, this object should not be created directly, but via the `resolve` function, - * or `ResolvedColumn.apply` with a normalized name. - * - * @param name The column name resolved from an analyzed plan. - * @param isNested Flag to denote if this column is nested or not. - */ - private[hyperspace] case class ResolvedColumn(name: String, isNested: Boolean) { - assert(!isNested || (name.contains(".") && !name.startsWith(NESTED_FIELD_PREFIX))) - // Quotes will be removed from `resolve` and nested columns with quotes (e.g., "a.`b.c`.d") - // are not supported. - assert(!name.contains("`")) - - // For nested fields, the column name is prefixed with `NESTED_FIELD_PREFIX`. - lazy val normalizedName = { - if (isNested) { - s"$NESTED_FIELD_PREFIX$name" - } else { - name - } - } - - /** - * Create a column using the resolved name. Top level column names are quoted, and - * nested column names are aliased with normalized names. - * - * @return [[Column]] object created using the resolved name. - */ - def toColumn: Column = { - if (isNested) { - // No need to quote the string for "as" even if it contains dots. - col(name).as(normalizedName) - } else { - col(quote(name)) - } - } - - /** - * Create a column using the normalized name. Since the normalized name is already flattened - * with "dots", it is quoted. - * - * @return [[Column]] object create using the normalized name. - */ - def toNormalizedColumn: Column = col(quote(normalizedName)) - - private def quote(name: String) = s"`$name`" - } - - private[hyperspace] object ResolvedColumn { - private val NESTED_FIELD_PREFIX = "__hs_nested." - - /** - * Given a normalized column name, create [[ResolvedColumn]] after handling the prefix - * for nested columns. - * - * @param normalizedColumnName Normalized column name. - * @return [[ResolvedColumn]] created from the given normalized column name. 
- */ - def apply(normalizedColumnName: String): ResolvedColumn = { - if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) { - ResolvedColumn( - normalizedColumnName.substring(NESTED_FIELD_PREFIX.length), - isNested = true) - } else { - ResolvedColumn(normalizedColumnName, isNested = false) - } - } - } } From 41d37f8e145930bf54325101705ead86746db8e8 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Tue, 13 Apr 2021 00:34:34 +0300 Subject: [PATCH 3/5] Integrate feedback (2) --- .../index/rules/FilterIndexRule.scala | 39 +++++++--- .../hyperspace/index/rules/PlanUtils.scala | 67 +++++++++++------ .../hyperspace/index/rules/RuleUtils.scala | 74 +++++++++++-------- .../FileBasedSourceProviderManager.scala | 20 +++-- .../hyperspace/util/ResolverUtils.scala | 4 + .../index/E2EHyperspaceRulesTest.scala | 41 +++++++++- 6 files changed, 171 insertions(+), 74 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala index 023bf84cc..b01b9fe12 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala @@ -135,8 +135,8 @@ object FilterIndexRule case (Some(resolvedOutputColumns), Some(resolvedFilterColumns)) => val candidateIndexes = allIndexes.filter { index => indexCoversPlan( - resolvedOutputColumns.map(_.name), - resolvedFilterColumns.map(_.name), + resolvedOutputColumns.map(_.normalizedName), + resolvedFilterColumns.map(_.normalizedName), index.indexedColumns, index.includedColumns) } @@ -188,20 +188,35 @@ object ExtractFilterNode { def unapply(plan: LogicalPlan): Option[returnType] = plan match { case project @ Project(_, filter @ Filter(condition: Expression, ExtractRelation(relation))) if !RuleUtils.isIndexApplied(relation) => + /** + * Due to the fact that there is a different way to get their full nested access path, + * we need to go through project columns and filter solumns to extract the columns + * that are required in the query to be part of the index in order to properly work. 
+ * + * For example, given the following simple plan: + * {{{ + * Project [id#100, name#101, nested#102.nst.field2] + * +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1)) + * +- Relation[id#100,name#101,nested#102] parquet + * }}} + * + * It should be transformed to + * {{{ + * Project [id#1, name#2, __hs_nested.nested.nst.field2#3] + * +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && + * (__hs_nested.nested.nst.field1#0 = wa1)) + * +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2, + * __hs_nested.nested.nst.field2#3] + * Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1) + * + * }}} + */ val projectColumnNames = CleanupAliases(project) .asInstanceOf[Project] .projectList - .map(extractNamesFromExpression) + .map(i => extractNamesFromExpression(i).toKeep) .flatMap(_.toSeq) - val filterColumnNames = extractNamesFromExpression(condition).toSeq - .sortBy(-_.length) - .foldLeft(Seq.empty[String]) { (acc, e) => - if (!acc.exists(i => i.startsWith(e))) { - acc :+ e - } else { - acc - } - } + val filterColumnNames = extractNamesFromExpression(condition).toKeep.toSeq Some(project, filter, projectColumnNames, filterColumnNames) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala index ab11854dc..0c1f0a5c3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala @@ -18,35 +18,59 @@ package com.microsoft.hyperspace.index.rules import scala.util.Try -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetStructField} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BinaryExpression, Expression, GetStructField, IsNotNull, UnaryExpression} import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.util.ResolverUtils object PlanUtils { + private[hyperspace] case class ExtractedNames(toKeep: Set[String], toDiscard: Set[String]) + /** * The method extract field names from a Spark Catalyst [[Expression]]. * * @param exp The Spark Catalyst expression from which to extract names. * @return A set of distinct field names. 
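   * For example (illustrative, field names assumed): for the condition
   * `nested.leaf.cnt > 10 AND isnotnull(nested)`, "nested.leaf.cnt" is returned in
   * `toKeep`, while the bare "nested" coming only from the null check is returned in
   * `toDiscard`, since it is already covered by the longer nested access.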
*/ - def extractNamesFromExpression(exp: Expression): Set[String] = { - exp match { - case AttributeReference(name, _, _, _) => - Set(s"$name") - case Alias(child, _) => - extractNamesFromExpression(child) - case otherExp => - otherExp.containsChild.flatMap { - case g: GetStructField => - Some(s"${getChildNameFromStruct(g)}") - case e: Expression => - Some(extractNamesFromExpression(e).filter(_.nonEmpty).mkString(".")) - case _ => - None - } + def extractNamesFromExpression(exp: Expression): ExtractedNames = { + + def extractNames( + e: Expression, + prevExpStrTypes: Seq[String] = Seq.empty): Set[(String, Seq[String])] = { + e match { + case g: GetStructField => + Set((s"${getChildNameFromStruct(g)}", prevExpStrTypes :+ "getStructField")) + case AttributeReference(name, _, _, _) => + Set((s"$name", prevExpStrTypes :+ "attrRef")) + case Alias(child, _) => + extractNames(child, prevExpStrTypes :+ "alias") + case b: BinaryExpression => + val leftFields = extractNames(b.left, prevExpStrTypes :+ "binaryLeft") + val rightFields = extractNames(b.right, prevExpStrTypes :+ "binaryRight") + leftFields ++ rightFields + case u: IsNotNull => + extractNames(u.child, prevExpStrTypes :+ "isNotNull") + case u: UnaryExpression => + extractNames(u.child, prevExpStrTypes :+ "unary") + case _ => + Set.empty[(String, Seq[String])] + } } + + var toRemove = Seq.empty[String] + val toKeep = extractNames(exp).toSeq + .sortBy(-_._1.length) + .foldLeft(Seq.empty[String]) { (acc, e) => + val (fieldName, expStrType) = e + if (expStrType.contains("isNotNull") && acc.exists(i => i.startsWith(fieldName))) { + toRemove :+= fieldName + acc + } else { + acc :+ fieldName + } + } + ExtractedNames(toKeep.toSet, toRemove.toSet) } /** @@ -78,7 +102,7 @@ object PlanUtils { * contains the given name. */ def extractSearchQuery(exp: Expression, name: String): (Expression, Expression) = { - val splits = name.split(".") + val splits = name.split("\\.") val expFound = exp.find { case a: AttributeReference if splits.forall(s => a.name.contains(s)) => true case f: GetStructField if splits.forall(s => f.toString().contains(s)) => true @@ -101,10 +125,7 @@ object PlanUtils { * @param repl The replacement Spark Catalyst [[Expression]]. * @return A new Spark Catalyst [[Expression]]. */ - def replaceExpression( - parent: Expression, - needle: Expression, - repl: Expression): Expression = { + def replaceExpression(parent: Expression, needle: Expression, repl: Expression): Expression = { parent.mapChildren { c => if (c == needle) { repl @@ -123,7 +144,7 @@ object PlanUtils { * @return A Spark Catalyst [[AttributeReference]] pointing to the field name. */ def extractAttributeRef(exp: Expression, name: String): AttributeReference = { - val splits = name.split(".") + val splits = name.split("\\.") val elem = exp.find { case a: AttributeReference if splits.contains(a.name) => true case _ => false @@ -140,7 +161,7 @@ object PlanUtils { * @return A Spark SQL [[DataType]] of the given field name. 
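   * Note (illustrative): `String.split` takes a regular expression, so nested names
   * are split with "\\." rather than "."; an unescaped dot would match every
   * character and produce no usable name parts.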
*/ def extractTypeFromExpression(exp: Expression, name: String): DataType = { - val splits = name.split(".") + val splits = name.split("\\.") val elem = exp.flatMap { case a: AttributeReference => if (splits.forall(s => a.name == s)) { diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index ed3fea888..b3cf9d579 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, GetStructField, In, Literal, NamedExpression, Not} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryOperator, ExprId, GetStructField, In, IsNotNull, Literal, NamedExpression, Not} import org.apache.spark.sql.catalyst.optimizer.OptimizeIn import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ @@ -272,16 +272,21 @@ object RuleUtils { new InMemoryFileIndex(spark, index.content.files, Map(), None) } + val newSchema = StructType( + index.schema.filter(i => relation.plan.schema.exists(j => i.name.contains(j.name)))) + val indexFsRelation = new IndexHadoopFsRelation( location, new StructType(), - StructType(index.schema.filter(relation.plan.schema.contains(_))), + StructType(newSchema), if (useBucketSpec) Some(index.bucketSpec) else None, new ParquetFileFormat, Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index) - val resolvedFields = - ResolverUtils.resolve(spark, index.indexedColumns ++ index.includedColumns, relation.plan) + val resolvedFields = ResolverUtils.resolve( + spark, + (index.indexedColumns ++ index.includedColumns).map(ResolverUtils.ResolvedColumn(_).name), + relation.plan) val updatedOutput = if (resolvedFields.isDefined && resolvedFields.get.exists(_.isNested)) { indexFsRelation.schema.flatMap { s => @@ -592,11 +597,11 @@ object RuleUtils { private def transformProject(project: Project): Project = { val projectedFields = project.projectList.map { exp => - val fieldName = extractNamesFromExpression(exp).head + val fieldName = extractNamesFromExpression(exp).toKeep.head val escapedFieldName = PlanUtils.prefixNestedField(fieldName) val attr = extractAttributeRef(exp, fieldName) val fieldType = extractTypeFromExpression(exp, fieldName) - getExprId(project, escapedFieldName) match { + getExprId(project.child, escapedFieldName) match { case Some(exprId) => attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)( exprId, @@ -609,31 +614,42 @@ object RuleUtils { } private def transformFilter(filter: Filter): Filter = { - val fieldNames = extractNamesFromExpression(filter.condition) - var mutableFilter = filter - fieldNames.foreach { fieldName => - val escapedFieldName = PlanUtils.prefixNestedField(fieldName) - getExprId(filter, escapedFieldName) match { - case Some(exprId) => - val (parentExpresion, exp) = - extractSearchQuery(filter.condition, fieldName) - val fieldType = extractTypeFromExpression(exp, fieldName) - val attr = extractAttributeRef(exp, fieldName) - val newAttr = attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)( - exprId, - attr.qualifier) - val newExp = exp match { - case _: GetStructField => newAttr - case 
other: Expression => other + val names = extractNamesFromExpression(filter.condition) + val transformedCondition = filter.condition.transformDown { + case bo @ BinaryOperator(IsNotNull(AttributeReference(name, _, _, _)), other) => + if (names.toDiscard.contains(name)) { + other + } else { + bo + } + case bo @ BinaryOperator(other, IsNotNull(AttributeReference(name, _, _, _))) => + if (names.toDiscard.contains(name)) { + other + } else { + bo + } + case g: GetStructField => + val n = getChildNameFromStruct(g) + if (names.toKeep.contains(n)) { + val escapedFieldName = PlanUtils.prefixNestedField(n) + getExprId(filter, escapedFieldName) match { + case Some(exprId) => + val fieldType = extractTypeFromExpression(g, n) + val attr = extractAttributeRef(g, n) + attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)( + exprId, + attr.qualifier + ) + case _ => + g } - val newParentExpression = - replaceExpression(parentExpresion, exp, newExp) - mutableFilter = filter.copy(condition = newParentExpression) - case _ => - mutableFilter = filter - } + } else { + g + } + case o => + o } - mutableFilter + filter.copy(condition = transformedCondition) } private def getExprId(plan: LogicalPlan, fieldName: String): Option[ExprId] = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 036f5cd70..c24d87cbb 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -118,11 +118,13 @@ class FileBasedSourceProviderManager(spark: SparkSession) { * @return True if the given project is a supported relation. */ def hasNestedColumns(project: Project, index: IndexLogEntry): Boolean = { - val indexCols = index.indexedColumns ++ index.includedColumns - val hasNestedCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)).exists(_.isNested) + val indexCols = + (index.indexedColumns ++ index.includedColumns).map(i => ResolverUtils.ResolvedColumn(i)) + val hasNestedCols = indexCols.exists(_.isNested) if (hasNestedCols) { - val projectListFields = project.projectList.flatMap(extractNamesFromExpression) - val containsNestedFields = projectListFields.exists(i => indexCols.contains(i)) + val projectListFields = project.projectList.flatMap(extractNamesFromExpression(_).toKeep) + val containsNestedFields = + projectListFields.exists(i => indexCols.exists(j => j.isNested && j.name == i)) var containsNestedChildren = false project.child.foreach { case f: Filter => @@ -143,11 +145,13 @@ class FileBasedSourceProviderManager(spark: SparkSession) { * @return True if the given project is a supported relation. 
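   * For example (illustrative): a filter on "nested.leaf.id" matches an index whose
   * indexed columns contain "__hs_nested.nested.leaf.id", since both sides resolve
   * to the same underlying field name.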
*/ def hasNestedColumns(filter: Filter, index: IndexLogEntry): Boolean = { - val indexCols = index.indexedColumns ++ index.includedColumns - val hasNestedCols = indexCols.map(i => ResolverUtils.ResolvedColumn(i)).exists(_.isNested) + val indexCols = + (index.indexedColumns ++ index.includedColumns).map(i => ResolverUtils.ResolvedColumn(i)) + val hasNestedCols = indexCols.exists(_.isNested) if (hasNestedCols) { - val filterFields = extractNamesFromExpression(filter.condition).toSeq - filterFields.exists(i => indexCols.contains(i)) + val filterFields = extractNamesFromExpression(filter.condition).toKeep.toSeq + val resolvedFilterFields = filterFields.map(ResolverUtils.ResolvedColumn(_)) + resolvedFilterFields.exists(i => indexCols.exists(j => j == i || j.name == i.name)) } else { false } diff --git a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala index 69fb47177..a6c0dd638 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala @@ -209,9 +209,11 @@ object ResolverUtils { extractColumnName(child) :+ name case _: GetArrayStructFields => // TODO: Nested arrays will be supported later + // See https://github.com/microsoft/hyperspace/issues/372 throw HyperspaceException("Array types are not supported.") case _: GetMapValue => // TODO: Nested maps will be supported later + // See https://github.com/microsoft/hyperspace/issues/411 throw HyperspaceException("Map types are not supported.") case Alias(nested: ExtractValue, _) => extractColumnName(nested) @@ -244,9 +246,11 @@ object ResolverUtils { name +: getColumnNameFromSchema(s, tail, resolver) case StructField(_, _: ArrayType, _, _) => // TODO: Nested arrays will be supported later + // See https://github.com/microsoft/hyperspace/issues/372 throw HyperspaceException("Array types are not supported.") case StructField(_, _: MapType, _, _) => // TODO: Nested maps will be supported later + // See https://github.com/microsoft/hyperspace/issues/411 throw HyperspaceException("Map types are not supported") case f => Seq(f.name) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index caf3a0986..526a23a75 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -24,11 +24,10 @@ import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig, TestUtils} +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, SampleNestedData, TestConfig, TestUtils} import com.microsoft.hyperspace.index.IndexConstants.{GLOBBING_PATTERN_KEY, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK} import com.microsoft.hyperspace.index.IndexLogEntryTags._ import com.microsoft.hyperspace.index.execution.BucketUnionStrategy -import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule} import com.microsoft.hyperspace.util.PathUtils @@ -119,6 +118,44 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { } } + test( + "E2E test 
for filter query on nested columns") { + val loc = testDir + "samplenestedparquet" + val dataColumns = Seq("Date", "RGUID", "Query", "imprs", "clicks", "nested") + SampleNestedData.save(spark, loc, dataColumns) + + Seq(true, false).foreach { enableLineage => + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> enableLineage.toString) { + withIndex("filterNestedIndex") { + val df = spark.read.parquet(loc) + val indexConfig = IndexConfig("filterNestedIndex", + Seq("nested.leaf.id", "nested.leaf.cnt"), Seq("Date")) + + hyperspace.createIndex(df, indexConfig) + + def query(): DataFrame = + df.filter("nested.leaf.cnt > 10 and nested.leaf.id == 'leaf_id9'") + .select("Date", "nested.leaf.cnt", "nested.leaf.id") + + verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName)) + + val res = query().collect().sortWith((r1, r2) => r1.getInt(1) < r2.getInt(1)) + + assert(res.length == 3) + assert(res(0).getString(0) == "2019-10-03") + assert(res(0).getInt(1) == 12) + assert(res(0).getString(2) == "leaf_id9") + assert(res(1).getString(0) == "2019-10-03") + assert(res(1).getInt(1) == 21) + assert(res(1).getString(2) == "leaf_id9") + assert(res(2).getString(0) == "2019-10-03") + assert(res(2).getInt(1) == 22) + assert(res(2).getString(2) == "leaf_id9") + } + } + } + } + test("E2E test for case insensitive filter query utilizing indexes.") { val df = spark.read.parquet(nonPartitionedDataPath) val indexConfig = IndexConfig("filterIndex", Seq("C3"), Seq("C1")) From 51e5a7576fa73a1f6ebbb1d364a94dd80697650d Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Tue, 13 Apr 2021 13:02:04 +0300 Subject: [PATCH 4/5] Integrate feedback (3) --- .../hyperspace/index/rules/PlanUtils.scala | 2 +- .../hyperspace/index/rules/RuleUtils.scala | 155 ++++++++++++++++-- .../FileBasedSourceProviderManager.scala | 47 ------ 3 files changed, 139 insertions(+), 65 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala index 0c1f0a5c3..762d8042e 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala @@ -146,7 +146,7 @@ object PlanUtils { def extractAttributeRef(exp: Expression, name: String): AttributeReference = { val splits = name.split("\\.") val elem = exp.find { - case a: AttributeReference if splits.contains(a.name) => true + case a: AttributeReference if name == a.name || splits.contains(a.name) => true case _ => false } elem.get.asInstanceOf[AttributeReference] diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index b3cf9d579..d332ea928 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -278,35 +278,44 @@ object RuleUtils { val indexFsRelation = new IndexHadoopFsRelation( location, new StructType(), - StructType(newSchema), + newSchema, if (useBucketSpec) Some(index.bucketSpec) else None, new ParquetFileFormat, Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index) val resolvedFields = ResolverUtils.resolve( spark, - (index.indexedColumns ++ index.includedColumns).map(ResolverUtils.ResolvedColumn(_).name), + (index.indexedColumns ++ index.includedColumns) + .map(ResolverUtils.ResolvedColumn(_).name), relation.plan) val updatedOutput = if (resolvedFields.isDefined && 
            resolvedFields.get.exists(_.isNested)) {
           indexFsRelation.schema.flatMap { s =>
-            relation.plan.output.find(a => s.name.contains(a.name)).map { a =>
-              AttributeReference(s.name, s.dataType, a.nullable, a.metadata)(
-                NamedExpression.newExprId,
-                a.qualifier)
-            }
+            relation.plan.output
+              .find { a =>
+                ResolverUtils.ResolvedColumn(s.name).name.startsWith(a.name)
+              }
+              .map { a =>
+                AttributeReference(s.name, s.dataType, a.nullable, a.metadata)(
+                  NamedExpression.newExprId,
+                  a.qualifier)
+              }
           }
         } else {
           relation.plan.output
-          .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
-          .map(_.asInstanceOf[AttributeReference])
+            .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name))
+            .map(_.asInstanceOf[AttributeReference])
         }
 
       relation.createLogicalRelation(indexFsRelation, updatedOutput)
 
-    case p: Project if provider.hasNestedColumns(p, index) =>
+    // Since the index may store a nested field as a top-level column,
+    // the projection needs to be transformed to use that index field.
+    case p: Project if hasNestedColumns(p, index) =>
       transformProject(p)
 
-    case f: Filter if provider.hasNestedColumns(f, index) =>
+    // Since the index may store a nested field as a top-level column,
+    // the filter needs to be transformed to use that index field.
+    case f: Filter if hasNestedColumns(f, index) =>
       transformFilter(f)
   }
 }
@@ -595,12 +604,37 @@ object RuleUtils {
     shuffled
   }
 
-  private def transformProject(project: Project): Project = {
+  /**
+   * The method transforms the project part of a plan to support indexes on
+   * nested fields.
+   *
+   * For example, given the following query:
+   * {{{
+   *   df
+   *     .filter("nested.leaf.cnt > 10 and nested.leaf.id == 'leaf_id9'")
+   *     .select("Date", "nested.leaf.cnt")
+   * }}}
+   *
+   * Having this simple projection:
+   * {{{
+   *   Project [Date#100, nested#102.leaf.cnt]
+   * }}}
+   *
+   * The projection part should become:
+   * {{{
+   *   Project [Date#330, __hs_nested.nested.leaf.cnt#335]
+   * }}}
+   *
+   * @param project The project that needs to be transformed.
+   * @return The transformed project with support for nested indexed fields.
+   */
+  private[rules] def transformProject(project: Project): Project = {
     val projectedFields = project.projectList.map { exp =>
       val fieldName = extractNamesFromExpression(exp).toKeep.head
       val escapedFieldName = PlanUtils.prefixNestedField(fieldName)
       val attr = extractAttributeRef(exp, fieldName)
       val fieldType = extractTypeFromExpression(exp, fieldName)
+      // Try to find the field in the already transformed child of the project.
       getExprId(project.child, escapedFieldName) match {
         case Some(exprId) =>
           attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)(
             exprId,
@@ -613,7 +647,38 @@ object RuleUtils {
     project.copy(projectList = projectedFields)
   }
 
-  private def transformFilter(filter: Filter): Filter = {
+  /**
+   * The method transforms the filter part of a plan to support indexes on
+   * nested fields. The transformation walks all expression nodes and does
+   * the following:
+   *  - Replace accesses to nested values with the corresponding index columns.
+   *  - In some specific cases remove the `isnotnull` check because it
+   *    is applied to the root of the nested field (i.e. `isnotnull(nested#102)`
+   *    does not make any sense when using the index field).
+   *
+   * For example, given the following query:
+   * {{{
+   *   df
+   *     .filter("nested.leaf.cnt > 10 and nested.leaf.id == 'leaf_id9'")
+   *     .select("Date", "nested.leaf.cnt")
+   * }}}
+   *
+   * Having this simple filter:
+   * {{{
+   *   Filter (isnotnull(nested#102) && (nested#102.leaf.cnt > 10) &&
+   *           (nested#102.leaf.id = leaf_id9))
+   * }}}
+   *
+   * The filter part should become:
+   * {{{
+   *   Filter ((__hs_nested.nested.leaf.cnt#335 > 10) &&
+   *           (__hs_nested.nested.leaf.id#336 = leaf_id9))
+   * }}}
+   *
+   * @param filter The filter that needs to be transformed.
+   * @return The transformed filter with support for nested indexed fields.
+   */
+  private[rules] def transformFilter(filter: Filter): Filter = {
     val names = extractNamesFromExpression(filter.condition)
     val transformedCondition = filter.condition.transformDown {
       case bo @ BinaryOperator(IsNotNull(AttributeReference(name, _, _, _)), other) =>
@@ -638,8 +703,7 @@
             val attr = extractAttributeRef(g, n)
             attr.copy(escapedFieldName, fieldType, attr.nullable, attr.metadata)(
               exprId,
-              attr.qualifier
-            )
+              attr.qualifier)
           case _ =>
             g
         }
@@ -647,12 +711,69 @@
           g
         }
       case o =>
-          o
+        o
     }
     filter.copy(condition = transformedCondition)
   }
 
-  private def getExprId(plan: LogicalPlan, fieldName: String): Option[ExprId] = {
+  /**
+   * The method retrieves the expression id for a given field name.
+   *
+   * This method is mainly meant to be used while transforming plans, when the
+   * leaves have already been transformed.
+   *
+   * @param plan The logical plan from which to get the expression id.
+   * @param fieldName The name of the field to search for.
+   * @return The [[ExprId]] if the field can be found in the plan, otherwise [[None]].
+   */
+  private[rules] def getExprId(plan: LogicalPlan, fieldName: String): Option[ExprId] = {
     plan.output.find(a => a.name.equalsIgnoreCase(fieldName)).map(_.exprId)
   }
+
+  /**
+   * Returns true if the given project references nested columns that are covered by the
+   * given index, either directly in its project list or through a child filter.
+   *
+   * @param project Project to check for nested column usage.
+   * @return True if the project (or a filter below it) uses nested columns from the index.
+   */
+  private[rules] def hasNestedColumns(project: Project, index: IndexLogEntry): Boolean = {
+    val indexCols =
+      (index.indexedColumns ++ index.includedColumns).map(i => ResolverUtils.ResolvedColumn(i))
+    val hasNestedCols = indexCols.exists(_.isNested)
+    if (hasNestedCols) {
+      val projectListFields = project.projectList.flatMap(extractNamesFromExpression(_).toKeep)
+      val containsNestedFields =
+        projectListFields.exists(i => indexCols.exists(j => j.isNested && j.name == i))
+      var containsNestedChildren = false
+      project.child.foreach {
+        case f: Filter =>
+          val filterSupported = hasNestedColumns(f, index)
+          containsNestedChildren = containsNestedChildren || filterSupported
+        case _ =>
+      }
+      containsNestedFields || containsNestedChildren
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Returns true if the given filter references nested columns that are covered by the
+   * given index.
+   *
+   * @param filter Filter to check for nested column usage.
+   * @return True if the filter condition uses nested columns from the index.
+ */ + private[rules] def hasNestedColumns(filter: Filter, index: IndexLogEntry): Boolean = { + val indexCols = + (index.indexedColumns ++ index.includedColumns).map(i => ResolverUtils.ResolvedColumn(i)) + val hasNestedCols = indexCols.exists(_.isNested) + if (hasNestedCols) { + val filterFields = extractNamesFromExpression(filter.condition).toKeep.toSeq + val resolvedFilterFields = filterFields.map(ResolverUtils.ResolvedColumn(_)) + resolvedFilterFields.exists(i => indexCols.exists(j => j == i || j.name == i.name)) + } else { + false + } + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index c24d87cbb..9a7054295 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -110,53 +110,6 @@ class FileBasedSourceProviderManager(spark: SparkSession) { } } - /** - * Returns true if the given project is a supported project. If all of the registered - * providers return None, this returns false. - * - * @param project Project to check if it's supported. - * @return True if the given project is a supported relation. - */ - def hasNestedColumns(project: Project, index: IndexLogEntry): Boolean = { - val indexCols = - (index.indexedColumns ++ index.includedColumns).map(i => ResolverUtils.ResolvedColumn(i)) - val hasNestedCols = indexCols.exists(_.isNested) - if (hasNestedCols) { - val projectListFields = project.projectList.flatMap(extractNamesFromExpression(_).toKeep) - val containsNestedFields = - projectListFields.exists(i => indexCols.exists(j => j.isNested && j.name == i)) - var containsNestedChildren = false - project.child.foreach { - case f: Filter => - val filterSupported = hasNestedColumns(f, index) - containsNestedChildren = containsNestedChildren || filterSupported - case _ => - } - containsNestedFields || containsNestedChildren - } else { - false - } - } - - /** - * Returns true if the given filter has nested columns. - * - * @param filter Filter to check if it's supported. - * @return True if the given project is a supported relation. - */ - def hasNestedColumns(filter: Filter, index: IndexLogEntry): Boolean = { - val indexCols = - (index.indexedColumns ++ index.includedColumns).map(i => ResolverUtils.ResolvedColumn(i)) - val hasNestedCols = indexCols.exists(_.isNested) - if (hasNestedCols) { - val filterFields = extractNamesFromExpression(filter.condition).toKeep.toSeq - val resolvedFilterFields = filterFields.map(ResolverUtils.ResolvedColumn(_)) - resolvedFilterFields.exists(i => indexCols.exists(j => j == i || j.name == i.name)) - } else { - false - } - } - /** * Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns * [[Option]] for each provider built. 
This function ensures that only one provider returns

From 906924f91dd6c20019a19f3a63b344d8ba5d09f6 Mon Sep 17 00:00:00 2001
From: Andrei Ionescu
Date: Wed, 14 Apr 2021 16:18:48 +0300
Subject: [PATCH 5/5] Support hybrid scans when filtering on nested fields with index

---
 .../hyperspace/index/rules/RuleUtils.scala    |  71 ++-
 .../hyperspace/util/ResolverUtils.scala       |  12 +-
 .../index/HybridScanForNestedFieldsTest.scala | 447 ++++++++++++++++++
 3 files changed, 515 insertions(+), 15 deletions(-)
 create mode 100644 src/test/scala/com/microsoft/hyperspace/index/HybridScanForNestedFieldsTest.scala

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
index d332ea928..3756e0829 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -341,7 +341,7 @@ object RuleUtils {
       useBucketSpec: Boolean,
       useBucketUnionForAppended: Boolean): LogicalPlan = {
     val provider = Hyperspace.getContext(spark).sourceProviderManager
-    var unhandledAppendedFiles: Seq[Path] = Nil
+    var unhandledAppendedFiles = Seq.empty[Path]
 
     // Get transformed plan with index data and appended files if applicable.
     val indexPlan = plan transformUp {
       // Use transformUp here as currently one relation is allowed (pre-requisite).
@@ -379,8 +379,12 @@
         }
 
         val filesToRead = {
+          val resolvedFields =
+            (index.indexedColumns ++ index.includedColumns).map(ResolverUtils.ResolvedColumn(_))
+          val hasNestedFields = resolvedFields.exists(_.isNested)
+
           if (useBucketSpec || !index.hasParquetAsSourceFormat || filesDeleted.nonEmpty ||
-            relation.partitionSchema.nonEmpty) {
+            relation.partitionSchema.nonEmpty || hasNestedFields) {
            // Since the index data is in "parquet" format, we cannot read source files
            // in formats other than "parquet" using one FileScan node as the operator requires
            // files in one homogenous format. To address this, we need to read the appended
@@ -404,10 +408,18 @@
        // In order to handle deleted files, read index data with the lineage column so that
        // we could inject Filter-Not-In conditions on the lineage column to exclude the indexed
        // rows from the deleted files.
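The `hasNestedFields` check above relies on the `__hs_nested.` prefix that Hyperspace uses when a nested source field such as `nested.leaf.cnt` is stored as a flat, top-level index column (`__hs_nested.nested.leaf.cnt`). A minimal sketch of that naming convention follows; the object and method names are illustrative only, the real logic lives in `ResolverUtils.ResolvedColumn` and `PlanUtils.prefixNestedField`.

{{{
// Sketch only: maps a nested source column name to the flat name used in the
// index schema and back, assuming the "__hs_nested." prefix described in this patch.
object NestedColumnNaming {
  val NestedFieldPrefix = "__hs_nested."

  // "nested.leaf.cnt" -> "__hs_nested.nested.leaf.cnt"; flat columns are kept unchanged.
  def toIndexColumnName(sourceColumn: String): String =
    if (sourceColumn.contains(".")) NestedFieldPrefix + sourceColumn else sourceColumn

  // An index column refers to a nested source field when it carries the prefix.
  def isNested(indexColumn: String): Boolean =
    indexColumn.startsWith(NestedFieldPrefix)

  // "__hs_nested.nested.leaf.cnt" -> "nested.leaf.cnt".
  def toSourceColumnName(indexColumn: String): String =
    indexColumn.stripPrefix(NestedFieldPrefix)
}
}}}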
- val newSchema = StructType( - index.schema.filter(s => - relation.plan.schema.contains(s) || (filesDeleted.nonEmpty && s.name.equals( - IndexConstants.DATA_FILE_NAME_ID)))) + val newSchema = StructType(index.schema.filter { s => + val woPrefix = ResolverUtils.ResolvedColumn(s.name).name + val resolved = ResolverUtils + .resolve( + spark, + Seq(woPrefix), + plan, + ResolverUtils.resolveWithChildren, + throwIfNotInSchema = false) + resolved.isDefined || (filesDeleted.nonEmpty && s.name.equals( + IndexConstants.DATA_FILE_NAME_ID)) + }) def fileIndex: InMemoryFileIndex = { new InMemoryFileIndex(spark, filesToRead, Map(), None) @@ -427,9 +439,29 @@ object RuleUtils { new ParquetFileFormat, Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark, index) - val updatedOutput = relation.plan.output - .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name)) - .map(_.asInstanceOf[AttributeReference]) + val resolvedFields = ResolverUtils.resolve( + spark, + (index.indexedColumns ++ index.includedColumns) + .map(ResolverUtils.ResolvedColumn(_).name), + relation.plan) + val updatedOutput = + if (resolvedFields.isDefined && resolvedFields.get.exists(_.isNested)) { + indexFsRelation.schema.flatMap { s => + relation.plan.output + .find { a => + ResolverUtils.ResolvedColumn(s.name).name.startsWith(a.name) + } + .map { a => + AttributeReference(s.name, s.dataType, a.nullable, a.metadata)( + NamedExpression.newExprId, + a.qualifier) + } + } + } else { + relation.plan.output + .filter(attr => indexFsRelation.schema.fieldNames.contains(attr.name)) + .map(_.asInstanceOf[AttributeReference]) + } if (filesDeleted.isEmpty) { relation.createLogicalRelation(indexFsRelation, updatedOutput) @@ -441,6 +473,13 @@ object RuleUtils { val filterForDeleted = Filter(Not(In(lineageAttr, deletedFileIds)), rel) Project(updatedOutput, OptimizeIn(filterForDeleted)) } + + case p: Project if hasNestedColumns(p, index) => + transformProject(p) + + case f: Filter if hasNestedColumns(f, index) => + transformFilter(f) + } if (unhandledAppendedFiles.nonEmpty) { @@ -514,11 +553,17 @@ object RuleUtils { // Set the same output schema with the index plan to merge them using BucketUnion. // Include partition columns for data loading. 
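The `startsWith` matching used above pairs each index column with the source attribute it was derived from: flat columns match the attribute by name, while prefixed nested columns match the root struct column they were extracted from. A small sketch of that pairing, written as a standalone helper for illustration only (it is not part of the patch):

{{{
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StructType

// Sketch only: for every index column, find the source attribute that backs it.
// "__hs_nested.nested.leaf.cnt" resolves to the top-level struct column "nested",
// while a flat column such as "Date" resolves to the attribute of the same name.
def matchIndexFieldsToSource(
    indexSchema: StructType,
    sourceOutput: Seq[AttributeReference]): Seq[(String, AttributeReference)] =
  indexSchema.fields.toSeq.flatMap { field =>
    val sourceName = field.name.stripPrefix("__hs_nested.")
    sourceOutput
      .find(a => sourceName == a.name || sourceName.startsWith(a.name + "."))
      .map(a => field.name -> a)
  }
}}}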
val partitionColumns = relation.partitionSchema.map(_.name) - val updatedSchema = StructType(relation.plan.schema.filter(col => - index.schema.contains(col) || relation.partitionSchema.contains(col))) + val updatedSchema = StructType(relation.plan.schema.filter { col => + index.schema.exists { i => + ResolverUtils.ResolvedColumn(i.name).name.startsWith(col.name) + } || relation.partitionSchema.contains(col) + }) val updatedOutput = relation.plan.output - .filter(attr => - index.schema.fieldNames.contains(attr.name) || partitionColumns.contains(attr.name)) + .filter { attr => + index.schema.fieldNames.exists { i => + ResolverUtils.ResolvedColumn(i).name.startsWith(attr.name) + } || partitionColumns.contains(attr.name) + } .map(_.asInstanceOf[AttributeReference]) val newRelation = relation.createHadoopFsRelation( newLocation, diff --git a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala index a6c0dd638..625d7c5dc 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala @@ -17,8 +17,8 @@ package com.microsoft.hyperspace.util import org.apache.spark.sql.{Column, SparkSession} -import org.apache.spark.sql.catalyst.analysis.Resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetArrayStructFields, GetMapValue, GetStructField} +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetArrayStructFields, GetMapValue, GetStructField, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -200,6 +200,14 @@ object ResolverUtils { plan.resolveQuoted(fieldName, resolver) } + // The resolve children approach for resolving filter field names. + protected[hyperspace] def resolveWithChildren( + fieldName: String, + plan: LogicalPlan, + resolver: Resolver): Option[NamedExpression] = { + plan.resolveChildren(UnresolvedAttribute.parseAttributeName(fieldName), resolver) + } + // Extracts the parts of a nested field access path from an expression. private def extractColumnName(expr: Expression): Seq[String] = { expr match { diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNestedFieldsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNestedFieldsTest.scala new file mode 100644 index 000000000..b36c1ddec --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanForNestedFieldsTest.scala @@ -0,0 +1,447 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, In, InSet, Literal, Not} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, RepartitionByExpression, Union} +import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.internal.SQLConf + +import com.microsoft.hyperspace._ +import com.microsoft.hyperspace.{Hyperspace, SampleNestedData, TestConfig} +import com.microsoft.hyperspace.TestUtils.{latestIndexLogEntry, logManager} +import com.microsoft.hyperspace.index.execution.BucketUnionExec +import com.microsoft.hyperspace.index.plans.logical.BucketUnion +import com.microsoft.hyperspace.util.FileUtils + +// Hybrid Scan tests for non partitioned source data. Test cases of HybridScanSuite are also +// executed with non partitioned source data. +class HybridScanForNestedFieldsTest extends QueryTest with HyperspaceSuite { + override val systemPath = new Path("src/test/resources/hybridScanTestNestedFields") + + val sampleNestedData = SampleNestedData.testData + var hyperspace: Hyperspace = _ + + val fileFormat = "parquet" + + import spark.implicits._ + val nestedDf = sampleNestedData.toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") + val indexConfig1 = + IndexConfig("index1", Seq("nested.leaf.cnt"), Seq("query", "nested.leaf.id")) + val indexConfig2 = + IndexConfig("index2", Seq("nested.leaf.cnt"), Seq("Date", "nested.leaf.id")) + + def normalizePaths(in: Seq[String]): Seq[String] = { + in.map(_.replace("file:///", "file:/")) + } + def equalNormalizedPaths(a: Seq[String], b: Seq[String]): Boolean = { + normalizePaths(a).toSet === normalizePaths(b).toSet + } + + def setupIndexAndChangeData( + sourceFileFormat: String, + sourcePath: String, + indexConfig: IndexConfig, + appendCnt: Int, + deleteCnt: Int): (Seq[String], Seq[String]) = { + nestedDf.write.format(sourceFileFormat).save(sourcePath) + val df = spark.read.format(sourceFileFormat).load(sourcePath) + hyperspace.createIndex(df, indexConfig) + val inputFiles = df.inputFiles + assert(appendCnt + deleteCnt < inputFiles.length) + + val fs = systemPath.getFileSystem(new Configuration) + for (i <- 0 until appendCnt) { + val sourcePath = new Path(inputFiles(i)) + val destPath = new Path(inputFiles(i) + ".copy") + fs.copyToLocalFile(sourcePath, destPath) + } + + for (i <- 1 to deleteCnt) { + fs.delete(new Path(inputFiles(inputFiles.length - i)), false) + } + + val df2 = spark.read.format(sourceFileFormat).load(sourcePath) + (df2.inputFiles diff inputFiles, inputFiles diff df2.inputFiles) + } + + override def beforeAll(): Unit = { + super.beforeAll() + hyperspace = new Hyperspace(spark) + } + + before { + spark.conf.set(IndexConstants.INDEX_LINEAGE_ENABLED, "true") + spark.enableHyperspace() + } + + after { + FileUtils.delete(systemPath) + spark.disableHyperspace() + } + + private def getLatestStableLog(indexName: String): IndexLogEntry = { + val entry = logManager(systemPath, indexName).getLatestStableLog() + assert(entry.isDefined) + assert(entry.get.isInstanceOf[IndexLogEntry]) + entry.get.asInstanceOf[IndexLogEntry] + } + + def checkDeletedFiles( + plan: LogicalPlan, + indexName: String, + expectedDeletedFileNames: Seq[String]): Unit = { + + val 
fileNameToId = getLatestStableLog(indexName).fileIdTracker.getFileToIdMapping.map { + kv => + (kv._1._1, kv._2) + }.toMap + + val expectedDeletedFiles = + expectedDeletedFileNames.map(f => fileNameToId(f.replace("file:///", "file:/")).toString) + + if (expectedDeletedFiles.nonEmpty) { + log + val inputFiles = plan.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.inputFiles.toSeq + }.flatten + val deletedFilesList = plan collect { + case Filter( + Not(EqualTo(left: Attribute, right: Literal)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. + val colName = left.toString + val deletedFile = right.toString + assert(colName.contains(IndexConstants.DATA_FILE_NAME_ID)) + val deleted = Seq(deletedFile) + assert(expectedDeletedFiles.length == 1) + // Check the location is replaced with index data files properly. + val files = fsRelation.location.inputFiles + assert(files.nonEmpty && files.forall(_.contains(indexName))) + deleted + case Filter( + Not(InSet(attr, deletedFileIds)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. + assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_ID)) + val deleted = deletedFileIds.map(_.toString).toSeq + assert( + expectedDeletedFiles.length > spark.conf + .get("spark.sql.optimizer.inSetConversionThreshold") + .toLong) + // Check the location is replaced with index data files properly. + val files = fsRelation.location.inputFiles + assert(files.nonEmpty && files.forall(_.contains(indexName))) + deleted + case Filter( + Not(In(attr, deletedFileIds)), + LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)) => + // Check new filter condition on lineage column. + assert(attr.toString.contains(IndexConstants.DATA_FILE_NAME_ID)) + val deleted = deletedFileIds.map(_.toString) + assert( + expectedDeletedFiles.length <= spark.conf + .get("spark.sql.optimizer.inSetConversionThreshold") + .toLong) + // Check the location is replaced with index data files properly. + val files = fsRelation.location.inputFiles + assert(files.nonEmpty && files.forall(_.contains(indexName))) + deleted + } + assert(deletedFilesList.length === 1) + val deletedFiles = deletedFilesList.flatten + assert(deletedFiles.length === expectedDeletedFiles.size) + assert(deletedFiles.distinct.length === deletedFiles.length) + assert(deletedFiles.forall(f => !inputFiles.contains(f))) + assert(equalNormalizedPaths(deletedFiles, expectedDeletedFiles)) + + val execPlan = spark.sessionState.executePlan(plan).executedPlan + val execNodes = execPlan collect { + case p @ FileSourceScanExec(_, _, _, _, _, dataFilters, _) => + // Check deleted files. + assert(deletedFiles.forall(dataFilters.toString.contains)) + p + } + assert(execNodes.length === 1) + } + } + + def checkJoinIndexHybridScan( + plan: LogicalPlan, + leftIndexName: String, + leftAppended: Seq[String], + leftDeleted: Seq[String], + rightIndexName: String, + rightAppended: Seq[String], + rightDeleted: Seq[String], + filterConditions: Seq[String] = Nil): Unit = { + // Project - Join - children + val left = plan.children.head.children.head + val right = plan.children.head.children.last + + // Check deleted files with the first child of each left and right child. 
+ checkDeletedFiles(left.children.head, leftIndexName, leftDeleted) + checkDeletedFiles(right.children.head, rightIndexName, rightDeleted) + + val leftNodes = left.collect { + case b @ BucketUnion(children, bucketSpec) => + assert(bucketSpec.numBuckets === 200) + assert( + bucketSpec.bucketColumnNames.size === 1 && + bucketSpec.bucketColumnNames.head === "clicks") + + val childNodes = children.collect { + case r @ RepartitionByExpression( + attrs, + Project(_, Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))), + numBucket) => + assert(attrs.size === 1) + assert(attrs.head.asInstanceOf[Attribute].name.contains("clicks")) + + // Check appended file. + val files = fsRelation.location.inputFiles + assert(equalNormalizedPaths(files, leftAppended)) + assert(files.length === leftAppended.length) + assert(numBucket === 200) + r + case p @ Project(_, Filter(_, _)) => + val files = p collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + } + assert(files.nonEmpty && files.flatten.forall(_.contains(leftIndexName))) + p + } + + // BucketUnion has 2 children. + assert(childNodes.size === 2) + assert(childNodes.count(_.isInstanceOf[Project]) === 1) + assert(childNodes.count(_.isInstanceOf[RepartitionByExpression]) === 1) + b + } + + val rightNodes = right.collect { + case b @ BucketUnion(children, bucketSpec) => + assert(bucketSpec.numBuckets === 200) + assert( + bucketSpec.bucketColumnNames.size === 1 && + bucketSpec.bucketColumnNames.head === "clicks") + + val childNodes = children.collect { + case r @ RepartitionByExpression( + attrs, + Project(_, Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))), + numBucket) => + assert(attrs.size === 1) + assert(attrs.head.asInstanceOf[Attribute].name.contains("clicks")) + + // Check appended files. + val files = fsRelation.location.inputFiles + assert(equalNormalizedPaths(files, rightAppended)) + assert(files.length === rightAppended.length) + assert(numBucket === 200) + r + case p @ Project( + _, + Filter(_, LogicalRelation(fsRelation: HadoopFsRelation, _, _, _))) => + // Check index data files. + val files = fsRelation.location.inputFiles + assert(files.nonEmpty && files.forall(_.contains(rightIndexName))) + p + } + + // BucketUnion has 2 children. + assert(childNodes.size === 2) + assert(childNodes.count(_.isInstanceOf[Project]) === 1) + assert(childNodes.count(_.isInstanceOf[RepartitionByExpression]) === 1) + b + } + + // Check BucketUnion node if needed. + assert(leftAppended.isEmpty || leftNodes.count(_.isInstanceOf[BucketUnion]) === 1) + assert(rightAppended.isEmpty || rightNodes.count(_.isInstanceOf[BucketUnion]) === 1) + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val execPlan = spark.sessionState.executePlan(plan).executedPlan + val execNodes = execPlan.collect { + case p @ BucketUnionExec(children, bucketSpec) => + assert(children.size === 2) + // children.head is always the index plan. + assert(children.head.isInstanceOf[ProjectExec]) + assert(children.last.isInstanceOf[ShuffleExchangeExec]) + assert(bucketSpec.numBuckets === 200) + p + case p @ FileSourceScanExec(_, _, _, partitionFilters, _, dataFilters, _) => + // Check filter pushed down properly. 
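+          // For partitioned source data the pushed-down predicate appears in partitionFilters;
+          // otherwise it is pushed to the index scan as a data filter.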
+ if (partitionFilters.nonEmpty) { + assert(filterConditions.forall(partitionFilters.toString.contains)) + } else { + assert(filterConditions.forall(dataFilters.toString.contains)) + } + p + } + var requiredBucketUnion = 0 + if (leftAppended.nonEmpty) requiredBucketUnion += 1 + if (rightAppended.nonEmpty) requiredBucketUnion += 1 + assert(execNodes.count(_.isInstanceOf[BucketUnionExec]) === requiredBucketUnion) + // 2 of index data and number of indexes with appended files. + assert(execNodes.count(_.isInstanceOf[FileSourceScanExec]) === 2 + requiredBucketUnion) + } + } + + test( + "Append-only: union over index and new files due to field names " + + "being different: `__hs_nested.nested.leaf.cnt` + `nested.leaf.cnt`.") { + // This flag is for testing plan transformation if appended files could be load with index + // data scan node. Currently, it's applied for a very specific case: FilterIndexRule, + // Parquet source format, no partitioning, no deleted files. + withTempPathAsString { testPath => + val (appendedFiles, _) = setupIndexAndChangeData( + "parquet", + testPath, + indexConfig1.copy(indexName = "index_Append"), + appendCnt = 1, + deleteCnt = 0) + + val df = spark.read.format("parquet").load(testPath) + def filterQuery: DataFrame = + df.filter(df("nested.leaf.cnt") <= 20).select(df("query")) + + val baseQuery = filterQuery + val basePlan = baseQuery.queryExecution.optimizedPlan + + withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "false") { + val filter = filterQuery + assert(basePlan.equals(filter.queryExecution.optimizedPlan)) + } + + withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) { + val filter = filterQuery + val planWithHybridScan = filter.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHybridScan)) + + // Check appended file is added to relation node or not. 
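+        // The rewritten plan is expected to be a Union whose first child scans the index
+        // data and whose second child scans only the newly appended source files.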
+ val nodes = planWithHybridScan.collect { + case u @ Union(children) => + val indexChild = children.head + indexChild collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + assert(fsRelation.location.inputFiles.forall(_.contains("index_Append"))) + } + + assert(children.tail.size === 1) + val appendChild = children.last + appendChild collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + val files = fsRelation.location.inputFiles + assert(files.toSeq == appendedFiles) + assert(files.length === appendedFiles.size) + } + u + } + + // Filter Index and Parquet format source file can be handled with 1 LogicalRelation + assert(nodes.length === 1) + val left = baseQuery.collect().map(_.getString(0)) + val right = filter.collect().map(_.getString(0)) + assert(left.diff(right).isEmpty) + assert(right.diff(left).isEmpty) + } + } + } + + test("Delete-only: Hybrid Scan for delete support doesn't work without lineage column.") { + val indexConfig = IndexConfig("index_ParquetDelete2", Seq("nested.leaf.cnt"), Seq("query")) + Seq(("indexWithoutLineage", "false", false), ("indexWithLineage", "true", true)) foreach { + case (indexName, lineageColumnConfig, transformationExpected) => + withTempPathAsString { testPath => + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> lineageColumnConfig) { + setupIndexAndChangeData( + fileFormat, + testPath, + indexConfig.copy(indexName = indexName), + appendCnt = 0, + deleteCnt = 1) + + val df = spark.read.format(fileFormat).load(testPath) + + def filterQuery: DataFrame = + df.filter(df("nested.leaf.cnt") <= 20).select(df("query")) + + val baseQuery = filterQuery + val basePlan = baseQuery.queryExecution.optimizedPlan + withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) { + val filter = filterQuery + assert(basePlan.equals(filter.queryExecution.optimizedPlan)) + } + withSQLConf(TestConfig.HybridScanEnabled: _*) { + val filter = filterQuery + assert( + basePlan + .equals(filter.queryExecution.optimizedPlan) + .equals(!transformationExpected)) + } + } + } + } + } + + test("Delete-only: filter rule, number of delete files threshold.") { + withTempPathAsString { testPath => + val indexName = "IndexDeleteCntTest" + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + setupIndexAndChangeData( + fileFormat, + testPath, + indexConfig1.copy(indexName = indexName), + appendCnt = 0, + deleteCnt = 2) + } + + val df = spark.read.format(fileFormat).load(testPath) + def filterQuery: DataFrame = + df.filter(df("nested.leaf.cnt") <= 20).select(df("query")) + val baseQuery = filterQuery + val basePlan = baseQuery.queryExecution.optimizedPlan + val sourceSize = latestIndexLogEntry(systemPath, indexName).sourceFilesSizeInBytes + + val afterDeleteSize = FileUtils.getDirectorySize(new Path(testPath)) + val deletedRatio = 1 - (afterDeleteSize / sourceSize.toFloat) + + withSQLConf(TestConfig.HybridScanEnabled: _*) { + withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> + (deletedRatio + 0.1).toString) { + val filter = filterQuery + // As deletedRatio is less than the threshold, the index can be applied. + assert(!basePlan.equals(filter.queryExecution.optimizedPlan)) + } + withSQLConf(IndexConstants.INDEX_HYBRID_SCAN_DELETED_RATIO_THRESHOLD -> + (deletedRatio - 0.1).toString) { + val filter = filterQuery + // As deletedRatio is greater than the threshold, the index shouldn't be applied. + assert(basePlan.equals(filter.queryExecution.optimizedPlan)) + } + } + } + } +}
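The behaviour exercised by these tests can be summarised with the following usage sketch; the path and index name are illustrative, while the API calls mirror the ones used in the tests above.

{{{
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/samplenestedparquet")

// Index nested fields directly; they are stored as the flat columns
// "__hs_nested.nested.leaf.cnt" and "__hs_nested.nested.leaf.id" in the index data.
hs.createIndex(
  df,
  IndexConfig("filterNestedIndex", Seq("nested.leaf.cnt", "nested.leaf.id"), Seq("Date")))

// With Hyperspace enabled, FilterIndexRule can serve this query from the index, and
// Hybrid Scan keeps it applicable after files are appended to or deleted from the source.
spark.enableHyperspace()
df.filter("nested.leaf.cnt > 10 and nested.leaf.id == 'leaf_id9'")
  .select("Date", "nested.leaf.cnt", "nested.leaf.id")
}}}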