src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala
@@ -17,15 +17,16 @@
package com.microsoft.hyperspace.index.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.index.rules.PlanUtils._
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
@@ -53,7 +54,7 @@ object FilterIndexRule
case ExtractFilterNode(originalPlan, filter, outputColumns, filterColumns) =>
try {
val candidateIndexes =
findCoveringIndexes(filter, outputColumns, filterColumns)
findCoveringIndexes(filter, outputColumns, filterColumns, plan)
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
case Some(index) =>
// As FilterIndexRule is not intended to support bucketed scan, we set
@@ -94,12 +95,14 @@
* @param filter Filter node in the subplan that is being optimized.
* @param outputColumns List of output columns in subplan.
* @param filterColumns List of columns in filter predicate.
* @param plan Logical plan used to resolve the output and filter column names (including nested fields).
* @return List of available candidate indexes on fsRelation for the given columns.
*/
private def findCoveringIndexes(
filter: Filter,
outputColumns: Seq[String],
filterColumns: Seq[String]): Seq[IndexLogEntry] = {
filterColumns: Seq[String],
plan: LogicalPlan): Seq[IndexLogEntry] = {
RuleUtils.getRelation(spark, filter) match {
case Some(r) =>
val indexManager = Hyperspace
@@ -111,20 +114,41 @@
// See https://github.com/microsoft/hyperspace/issues/65
val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE))

val candidateIndexes = allIndexes.filter { index =>
indexCoversPlan(
outputColumns,
filterColumns,
index.indexedColumns,
index.includedColumns)
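// Resolves a (possibly nested, dot-separated) field name against the plan's children using
// the given resolver, so that nested fields such as "nested.nst.field1" can be matched.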
def resolveWithChildren(fieldName: String, plan: LogicalPlan, resolver: Resolver) = {
plan.resolveChildren(UnresolvedAttribute.parseAttributeName(fieldName), resolver)
}

// Get candidate via file-level metadata validation. This is performed after pruning
// by column schema, as this might be expensive when there are numerous files in the
// relation or many indexes to be checked.
RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)

case None => Nil // Zero or more than one supported relation was found in Filter's sub-plan.
val resolvedOutputColumnsOpt = ResolverUtils.resolve(
spark,
outputColumns,
plan,
resolveWithChildren,
throwIfNotInSchema = false)
val resolvedFilterColumnsOpt = ResolverUtils.resolve(
spark,
filterColumns,
plan,
resolveWithChildren,
throwIfNotInSchema = false)

(resolvedOutputColumnsOpt, resolvedFilterColumnsOpt) match {
case (Some(resolvedOutputColumns), Some(resolvedFilterColumns)) =>
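// Compare normalized names so that nested fields (prefixed with "__hs_nested.", e.g.
// "__hs_nested.nested.nst.field1") line up with the indexed and included columns
// recorded in the index.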
val candidateIndexes = allIndexes.filter { index =>
indexCoversPlan(
resolvedOutputColumns.map(_.normalizedName),
resolvedFilterColumns.map(_.normalizedName),
index.indexedColumns,
index.includedColumns)
}

// Get candidate via file-level metadata validation. This is performed after pruning
// by column schema, as this might be expensive when there are numerous files in the
// relation or many indexes to be checked.
RuleUtils.getCandidateIndexes(spark, candidateIndexes, r)

case _ => Nil
}
case _ => Nil // Zero or more than one supported relation was found in Filter's sub-plan.
}
}

@@ -136,7 +160,6 @@
* @param filterColumns List of columns in filter predicate.
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
* @param includedColumns List of included columns (e.g. from an index being checked)
* @param fileFormat FileFormat for input relation in original logical plan.
* @return 'true' if
* 1. Index fully covers output and filter columns, and
* 2. Filter predicate contains first column in index's 'indexed' columns.
@@ -165,12 +188,35 @@ object ExtractFilterNode {
def unapply(plan: LogicalPlan): Option[returnType] = plan match {
case project @ Project(_, filter @ Filter(condition: Expression, ExtractRelation(relation)))
if !RuleUtils.isIndexApplied(relation) =>
/**
* Because nested fields expose their full access path differently from top-level columns,
* we need to go through the project columns and filter columns to extract the columns
* that the query requires to be part of the index for it to be applied properly.
*
* For example, given the following simple plan:
* {{{
* Project [id#100, name#101, nested#102.nst.field2]
* +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
* +- Relation[id#100,name#101,nested#102] parquet
* }}}
*
* It should be transformed to
* {{{
* Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
* +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) &&
* (__hs_nested.nested.nst.field1#0 = wa1))
* +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,
* __hs_nested.nested.nst.field2#3]
* Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
*
* }}}
*/
val projectColumnNames = CleanupAliases(project)
.asInstanceOf[Project]
.projectList
.map(_.references.map(_.asInstanceOf[AttributeReference].name))
.map(i => extractNamesFromExpression(i).toKeep)
.flatMap(_.toSeq)
val filterColumnNames = condition.references.map(_.name).toSeq
val filterColumnNames = extractNamesFromExpression(condition).toKeep.toSeq

Some(project, filter, projectColumnNames, filterColumnNames)

195 changes: 195 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/rules/PlanUtils.scala
@@ -0,0 +1,195 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.rules

import scala.util.Try

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BinaryExpression, Expression, GetStructField, IsNotNull, UnaryExpression}
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.util.ResolverUtils

object PlanUtils {

private[hyperspace] case class ExtractedNames(toKeep: Set[String], toDiscard: Set[String])

/**
* The method extracts field names from a Spark Catalyst [[Expression]].
*
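* For example, for a condition like
* {{{
*   isnotnull(nested#102) && (nested#102.nst.field1 = wa1)
* }}}
* the result keeps `nested.nst.field1` and discards the bare `nested` reference coming
* from the `IsNotNull` check, since a more specific nested access already covers it.
*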
* @param exp The Spark Catalyst expression from which to extract names.
* @return An [[ExtractedNames]] containing the distinct field names to keep and those to discard.
*/
def extractNamesFromExpression(exp: Expression): ExtractedNames = {

def extractNames(
e: Expression,
prevExpStrTypes: Seq[String] = Seq.empty): Set[(String, Seq[String])] = {
e match {
case g: GetStructField =>
Set((s"${getChildNameFromStruct(g)}", prevExpStrTypes :+ "getStructField"))
case AttributeReference(name, _, _, _) =>
Set((s"$name", prevExpStrTypes :+ "attrRef"))
case Alias(child, _) =>
extractNames(child, prevExpStrTypes :+ "alias")
case b: BinaryExpression =>
val leftFields = extractNames(b.left, prevExpStrTypes :+ "binaryLeft")
val rightFields = extractNames(b.right, prevExpStrTypes :+ "binaryRight")
leftFields ++ rightFields
case u: IsNotNull =>
extractNames(u.child, prevExpStrTypes :+ "isNotNull")
case u: UnaryExpression =>
extractNames(u.child, prevExpStrTypes :+ "unary")
case _ =>
Set.empty[(String, Seq[String])]
}
}

var toRemove = Seq.empty[String]
val toKeep = extractNames(exp).toSeq
.sortBy(-_._1.length)
.foldLeft(Seq.empty[String]) { (acc, e) =>
val (fieldName, expStrType) = e
if (expStrType.contains("isNotNull") && acc.exists(i => i.startsWith(fieldName))) {
toRemove :+= fieldName
acc
} else {
acc :+ fieldName
}
}
ExtractedNames(toKeep.toSet, toRemove.toSet)
}

/**
* Given a [[GetStructField]] expression for a nested field (i.e. a struct access),
* the method extracts the full `.` (dot) separated field name.
*
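* For example, for the access `nested#102.nst.field1` the extracted
* name is `nested.nst.field1`.
*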
* @param field The [[GetStructField]] field from which we want to extract
* the name.
* @return The field name, `.` (dot) separated if the field is nested.
*/
def getChildNameFromStruct(field: GetStructField): String = {
field.child match {
case f: GetStructField =>
s"${getChildNameFromStruct(f)}.${field.name.get}"
case a: AttributeReference =>
s"${a.name}.${field.name.get}"
case _ =>
s"${field.name.get}"
}
}

/**
* Given a Spark Catalyst [[Expression]] and a field name, the method extracts
* the parent expression and the leaf expression that contains the field name.
*
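* For example, searching for `nested.nst.field1` inside `isnotnull(nested#102.nst.field1)`
* returns the `IsNotNull` expression as the parent and the nested `GetStructField`
* access as the matched leaf.
*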
* @param exp The Spark Catalyst [[Expression]] to extract from.
* @param name The field name to search for.
* @return A tuple with the parent expression and the leaf expression that
* contains the given name.
*/
def extractSearchQuery(exp: Expression, name: String): (Expression, Expression) = {
val splits = name.split("\\.")
val expFound = exp.find {
case a: AttributeReference if splits.forall(s => a.name.contains(s)) => true
case f: GetStructField if splits.forall(s => f.toString().contains(s)) => true
case _ => false
}.get
val parent = exp.find {
case e: Expression if e.containsChild.contains(expFound) => true
case _ => false
}.get
(parent, expFound)
}

/**
* Given a parent Spark Catalyst [[Expression]], a needle [[Expression]] and a replacement
* [[Expression]], the method replaces the needle with the replacement inside
* the parent expression.
*
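* Note that only the direct children of `parent` are compared against the needle.
* For example, replacing the nested access in `isnotnull(nested#102.nst.field1)` with the
* flattened attribute `__hs_nested.nested.nst.field1#0` yields
* `isnotnull(__hs_nested.nested.nst.field1#0)`.
*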
* @param parent The parent Spark Catalyst [[Expression]] into which to replace.
* @param needle The Spark Catalyst [[Expression]] needle to search for.
* @param repl The replacement Spark Catalyst [[Expression]].
* @return A new Spark Catalyst [[Expression]].
*/
def replaceExpression(parent: Expression, needle: Expression, repl: Expression): Expression = {
parent.mapChildren { c =>
if (c == needle) {
repl
} else {
c
}
}
}

/**
* Given a Spark Catalyst [[Expression]] and a field name, the method
* extracts the [[AttributeReference]] for that field name.
*
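* For example, for the field name `nested.nst.field1` this returns the root `nested`
* attribute reference rather than the leaf field.
*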
* @param exp The Spark Catalyst [[Expression]] to extract from.
* @param name The field name for which to extract the attribute reference.
* @return A Spark Catalyst [[AttributeReference]] pointing to the field name.
*/
def extractAttributeRef(exp: Expression, name: String): AttributeReference = {
val splits = name.split("\\.")
val elem = exp.find {
case a: AttributeReference if name == a.name || splits.contains(a.name) => true
case _ => false
}
elem.get.asInstanceOf[AttributeReference]
}

/**
* Given a Spark Catalyst [[Expression]] and a field name the method
* extracts the type of the field as a Spark SQL [[DataType]].
*
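* For example, given an expression containing the access `nested#102.nst.field1` where
* `field1` is a string field, requesting `nested.nst.field1` returns `StringType`.
*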
* @param exp The Spark Catalyst [[Expression]] from which to extract the type.
* @param name The field name for which we need to get the type.
* @return A Spark SQL [[DataType]] of the given field name.
*/
def extractTypeFromExpression(exp: Expression, name: String): DataType = {
val splits = name.split("\\.")
val elem = exp.flatMap {
case a: AttributeReference =>
if (splits.forall(s => a.name == s)) {
Some((name, a.dataType))
} else {
Try({
val h :: t = splits.toList
if (a.name == h && a.dataType.isInstanceOf[StructType]) {
val itFields = t.flatMap { i =>
a.dataType
.asInstanceOf[StructType]
.find(_.name.equalsIgnoreCase(i))
.map(j => (i, j.dataType))
}
Some(itFields.last)
} else {
None
}
}).getOrElse(None)
}
case f: GetStructField if splits.forall(s => f.toString().contains(s)) =>
Some((name, f.dataType))
case _ => None
}
elem.find(e => e._1 == name || e._1 == splits.last).get._2
}

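/**
* Normalizes a field name for comparison with index schema columns: nested (dot-separated)
* names get the Hyperspace nested-field prefix (e.g. `nested.nst.field1` becomes
* `__hs_nested.nested.nst.field1`), while top-level names are returned unchanged.
*/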
def prefixNestedField(fieldName: String): String = {
ResolverUtils.ResolvedColumn(fieldName, fieldName.contains(".")).normalizedName
}
}