From b7bc21238de55fa55e8c72af82609502d39da9d9 Mon Sep 17 00:00:00 2001
From: EJ Song
Date: Thu, 13 Jan 2022 16:35:17 -0800
Subject: [PATCH] Support MinMaxAnalysisUtil.analyzeIndex API

---
 .../hyperspace/util/MinMaxAnalysisUtil.scala | 30 ++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/util/MinMaxAnalysisUtil.scala b/src/main/scala/com/microsoft/hyperspace/util/MinMaxAnalysisUtil.scala
index 603a47b4b..64227d1f5 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/MinMaxAnalysisUtil.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/MinMaxAnalysisUtil.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.functions.{col, input_file_name, max, min}
 import org.apache.spark.sql.types.{StructField, StructType}
 
-import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
+import com.microsoft.hyperspace.index.IndexLogEntry
 
 case class MinMaxAnalysisResult(
     colName: String,
@@ -777,4 +778,31 @@ object MinMaxAnalysisUtil extends MinMaxAnalysis {
   def analyze(df: DataFrame, colNames: Seq[String]): String = {
     analyze(df, colNames, "text")
   }
+
+  private def latestIndexEntry(spark: SparkSession, indexName: String): IndexLogEntry = {
+    val idxManager = Hyperspace.getContext(spark).indexCollectionManager
+    val latestVer = idxManager.getIndexVersions(indexName, Seq("ACTIVE")).max
+    val indexEntry = idxManager.getIndex(indexName, latestVer).get
+
+    if (!Seq("CoveringIndex", "ZOrderCoveringIndex").contains(indexEntry.derivedDataset.kind)) {
+      throw HyperspaceException(s"Does not support index type: ${indexEntry.derivedDataset.kind}")
+    }
+    indexEntry
+  }
+
+  def analyzeIndex(
+      spark: SparkSession,
+      indexName: String,
+      colNames: Seq[String],
+      format: String): String = {
+    val indexEntry = latestIndexEntry(spark, indexName)
+    val df = spark.read.parquet(indexEntry.content.files.map(_.toString): _*)
+    analyze(df, colNames, format)
+  }
+
+  def analyzeIndex(spark: SparkSession, indexName: String, format: String): String = {
+    val indexEntry = latestIndexEntry(spark, indexName)
+    val df = spark.read.parquet(indexEntry.content.files.map(_.toString): _*)
+    analyze(df, indexEntry.indexedColumns, format)
+  }
 }
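
Usage sketch (not part of the patch): the snippet below shows how the two new
analyzeIndex overloads might be called once this change is applied. The index
name "myIndex" and column name "colA" are placeholders; it assumes a Spark
session with Hyperspace configured and an existing ACTIVE covering index of
that name. The "text" format string matches the default already used by the
existing analyze() overload.

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.util.MinMaxAnalysisUtil

val spark = SparkSession.builder().appName("minmax-analysis-demo").getOrCreate()

// Analyze all indexed columns of the latest ACTIVE version of the index
// ("myIndex" is a placeholder), rendering the result as text:
val fullReport = MinMaxAnalysisUtil.analyzeIndex(spark, "myIndex", "text")

// Restrict the analysis to specific columns ("colA" is a placeholder):
val colReport = MinMaxAnalysisUtil.analyzeIndex(spark, "myIndex", Seq("colA"), "text")

println(fullReport)

Note that both overloads resolve the index through the shared latestIndexEntry
helper, so calls against a non-covering index type (anything other than
CoveringIndex or ZOrderCoveringIndex) raise a HyperspaceException.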