diff --git a/docs/_data/navigation.yml b/docs/_data/navigation.yml index 2bff21dac..dfff2f147 100644 --- a/docs/_data/navigation.yml +++ b/docs/_data/navigation.yml @@ -21,6 +21,8 @@ docs: url: /docs/ug-optimize-index/ - title: "Supported data formats" url: /docs/ug-supported-data-formats/ + - title: "Supported index types" + url: /docs/ug-index-types/ - title: "Release Notes" url: https://github.com/microsoft/hyperspace/releases # Needs to be written later diff --git a/docs/_docs/09-ug-index-types.md b/docs/_docs/09-ug-index-types.md new file mode 100644 index 000000000..3c043bb1d --- /dev/null +++ b/docs/_docs/09-ug-index-types.md @@ -0,0 +1,315 @@ +--- +title: "Index types" +permalink: /docs/ug-index-types/ +excerpt: "Mutable dataset guide" +last_modified_at: 2021-12-29 +toc: false +classes: wide +--- + +# Covering Index Type + +In Hyperspace, "covering" index means its index data contains actual source data, +so we could alternatively read from the index data when it is applicable. +It could be whole or partial data, but the data is rearranged as a different data layout +to accelerate some types of Spark queries. + +As of v0.6, Hyperspace supports the following two covering indexes: + +1. [(Bucketed) Covering Index](#covering-index) +2. [Z-order Covering Index](#z-order-covering-index) (since v0.6) + +## Covering Index + +Covering Index is the first type that Hyperspace supports. +It can accelerate join queries and filter queries. + +### Index data layout + +The index data consists of indexed columns and included columns, and it is stored as bucketed and +sorted within each bucket by indexed columns. "Bucketed" stands for "hash partitioned" simply; +all rows with the same hash value go into the same bucket. +We use 200 buckets by default which is the default value of `spark.sql.shuffle.partitions` config +to increase the possibility of index application. +The bucket number for index data is configurable via `spark.hyperspace.index.numBuckets`. + +For example, assume the source data is stored in 1000 files in random order. +At index creation time, Hyperspace performs shuffling and sorting by the indexed columns before +storing index data. +As a result, the index data consists of 200 files (bucket number) by default and within each bucket, +rows are sorted by the indexed columns. + +### Applicable queries + +#### Sort Merge Join + +Sort merge join is a well known expensive operation in Spark because it requires full shuffling +of both left and right dataset. We can optimize the time for shuffling and sorting datasets, as we +already did the required data rearrangement at index creation time; by reading the index data +instead of source data, Spark will skip shuffling & sorting for the join. +Therefore, if some dataset is repeatedly used for Sort merge joins causing shuffles for every query +time, creating a Covering Index would be helpful. +We also support [mutable dataset](https://microsoft.github.io/hyperspace/docs/ug-mutable-dataset/) +for covering index types. + +#### Filter conditions for the first indexed column + +Queries including filter conditions for the first indexed column can also be accelerated, since +the index data is also sorted within each bucket. Thanks to statistics and data skipping features +of Parquet format, the queries can be accelerated by skipping unnecessary row groups while reading +the data. It could be less effective compared to global sorting, and only applicable for the first +indexed column. + +### Index creation API + +#### CoveringIndexConfig + +To create a Covering Index, `CoveringIndexConfig` should be used at index creation with: +- index name: name of index; should be unique within an index system directory. +- indexed columns: columns to use for shuffling and sorting. +- included columns: columns to include in the index data. + +#### Scala +```scala +import com.microsoft.hyperspace._ +import com.microsoft.hyperspace.index.covering._ + +val df = spark.read.parquet("") +val idxConfig = CoveringIndexConfig( + "", + Seq("", ""), + Seq("")) +val hs = new Hyperspace(sparkSession) +hs.createIndex(df, idxConfig) +``` + +#### Python +```python +from hyperspace import Hyperspace +from hyperspace import CoveringIndexConfig +hs = Hyperspace(sparkSession) +idxConfig = CoveringIndexConfig('', ['', ''], ['']) +df = spark.read.parquet("") +hs.createIndex(df, idxConfig) +``` + +#### Index Specific configs +- `spark.hyperspace.index.numBuckets` + +## Z-order Covering Index +Z-order Covering Index type is available since v0.6. +It is kind of sorted data, but multiple columns can contribute the order +which results in similar values could be collocated. + +### Index data layout + +ZOrderCoveringIndex data is generated by the following: + +```scala +sourceData + .withColumn("_zaddr", "") + .repartitionByRange(numPartitions, col("_zaddr")) + .sortWithinPartitions("_zaddr") + .drop("_zaddr") +``` + +`numPartitions` is determined based on `spark.hyperspace.index.zorder.targetSourceBytesPerPartition`. +So there will be numPartitions number of files that sorted by Z-address of indexed columns. +If there is only one indexed column, we just sort the data by the column without calculating Z-address. + +### Applicable queries + +#### Filter conditions for indexed columns + +Queries including filter conditions for any of indexed columns can also be accelerated, as +the index data is sorted by Z-address of indexed column values. +Improvement of query performance is not always guaranteed and depending on the quality of +Z-ordered dataset. For example, if the source data is already sorted by a column, then +Z-ordering the dataset may deteriorate the query performance with a filter condition of +the sorting column. For more information, see below +[Min/Max data layout analysis utility](#minmax-data-layout-analysis-utility-for-z-ordering). + + +### Index creation API + +To create a Z-order Covering Index, `ZOrderCoveringIndexConfig` should be used at index creation with: +- index name: name of index; should be unique within an index system directory. +- indexed columns: columns to use for Z-ordering. +- included columns: columns to include in the index data. + +#### Scala +```scala +import com.microsoft.hyperspace._ +import com.microsoft.hyperspace.index.zordercovering._ + +val df = spark.read.parquet("") +val idxConfig = ZOrderCoveringIndexConfig( + "", + Seq("", ""), + Seq("")) +val hs = new Hyperspace(sparkSession) +hs.createIndex(df, idxConfig) +``` + +#### Python +```python +from hyperspace import Hyperspace +from hyperspace import ZOrderCoveringIndexConfig +hs = Hyperspace(sparkSession) +idxConfig = ZOrderCoveringIndexConfig('', ['', ''], ['']) +df = spark.read.parquet("") +hs.createIndex(df, idxConfig) +``` + +#### Index Specific configs + +- `spark.hyperspace.index.zorder.targetSourceBytesPerPartition` +- `spark.hyperspace.index.zorder.quantile.enabled` +- `spark.hyperspace.index.zorder.quantile.relativeError` + +### Min/Max data layout analysis utility for Z-ordering + +Z-ordering result might differ based on various factors like data type, value distribution +and also other Z-ordering columns; it is hard to expect that the Z-ordered data is +effective or not. That is the reason why we introduce the analysis functionality for Z-ordering. +It only works for numeric types for now. + +Note that it collect the min/max values of each file using spark jobs and generate the result based on them. + +#### Result format + +The function provides `html` and `text` format to show the result. + +##### HTML format +![Result analysis on colA](/hyperspace/assets/images/zordercovering-analysis-zorder-colA.png) + +##### Text format +``` +Min/Max analysis on colA + + < Number of files (%) > + +--------------------------------------------------+ +100% | | + | | + | | + | | + 75% | | + | | + | | + | | + | | + 50% | | + | | + | | + | | + | | + 25% | | + | *** | + | *************************************** ***** | + |**************************************************| + |**************************************************| + 0% |**************************************************| + +--------------------------------------------------+ + Min <----- colA value -----> Max + +min(colA): 0 +max(colA): 9999999 +Total num of files: 100 +Total byte size of files: 419650780 +Max. num of files for a point lookup: 21 (21.00%) +Estimated average num of files for a point lookup: 17.76 (17.76%) +Max. bytes to read for a point lookup: 88309318 (21.04%) +``` + +#### How to interpret + +- X-axis: it represents the range group of the column values. +- Y-axis: the percentage of number of files to look up a value based on the minimum and maximum value of each file. So lower percentage means better distribution as we could skip more files. To be more specific, it represents the maximum percentage within each range. +- min(colA): the minimum value of the column in the given dataset. +- max(colA): the maximum value of the column in the given dataset. +- Total num of files: total number of files in the given dataset. +- Total byte size of files: summation of file size of the given dataset. +- Max. number of files for a point lookup: the maximum number / percentage of number of files to lookup. It's the highest point of y-axis. +- Estimated average num of files for a point lookup: average of number of files to lookup, excluding the range that does not exist. +- Max. bytes to read for a point lookup: the maximum bytes of files to read for a point lookup. + +##### Analysis result on sorted dataset +To understand the result clearly, let us check the result from sorted dataset. + +###### Sample data generation +```scala +// Scala +val dataPath = "testDataDir" +val randomDataPath = dataPath + "/randomData" +val sortedDataPath = dataPath + "/sortedData" +spark.range(50000000).map { _ => + (scala.util.Random.nextInt(10000000).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(2)) +}.toDF("colA", "colB", "colC").repartition(100).write.format("parquet").save(randomDataPath) +// 50M rows with random integers stored in 100 parquet files. + +val randomDF = spark.read.parquet(randomDataPath) +randomDF.repartitionByRange(100, col("colA")).sortWithinPartitions(col("colA")).write.format("parquet").save(sortedDataPath) +val sortedDF = spark.read.parquet(sortedDataPath) + +import com.microsoft.hyperspace.util.MinMaxAnalysisUtil +displayHTML(MinMaxAnalysisUtil.analyze(sortedDF, Seq("colA", "colB"), format = "html")) // format "text" and "html" are available. +// println(MinMaxAnalysisUtil.analyze(sortedDF, Seq("colA", "colB"), format = "text")) +``` + +###### Result analysis +![Sorted result analysis on colA](/hyperspace/assets/images/zordercovering-analysis-sorted-colA.png) + +The maximum number of files to be checked is 1 as the dataset is repartition by range of `colA`. +In other words, we need to check only one file to check if a value exists or not, based on min/max +value of each file. The sorted data layout would be the best performance for `colA`. + +However, for `colB`, we should read all the files as its randomly distributed: +![Sorted result analysis on colB](/hyperspace/assets/images/zordercovering-analysis-sorted-colB.png) + +Therefore, with this dataset, we cannot accelerate a query with conditions of `colB`. + +##### Analyze Z-order covering index data +###### Index creation & analysis +```scala +// Index creation +import com.microsoft.hyperspace.index.zordercovering._ +import com.microsoft.hyperspace._ +import com.microsoft.hyperspace.util.FileUtils +import org.apache.hadoop.fs.Path + +val totalSizeInBytes = FileUtils.getDirectorySize(new Path(randomDataPath)) +val sizePerPartition = totalSizeInBytes / 100 +spark.conf.set("spark.hyperspace.index.zorder.targetSourceBytesPerPartition", sizePerPartition) // Default: 1G +// Changed per file size for z-order index for demonstration + +val df = spark.read.parquet(randomDataPath) +val hs = new Hyperspace(spark) +hs.createIndex(df, ZOrderCoveringIndexConfig("zorderTestIndex", Seq("colA", "colB"), Seq("colC"))) + +import com.microsoft.hyperspace.util.MinMaxAnalysisUtil +displayHTML(MinMaxAnalysisUtil.analyzeIndex(spark, "zorderTestIndex", "html")) +``` + +###### Result analysis + +![Z-ordered result analysis on colA](/hyperspace/assets/images/zordercovering-analysis-zorder-colA.png) + +Compared to sorted dataset, we need to read more files to find a value of `colA` column. +For example, if we have a query `df.filter("colA == 0")` which is the minimum value. +Based on min/max value of each file, we should check about 17 files. +It will show worse performance than above sorted dataset that only one file contains the minimum +value. + +![Z-ordered result analysis on colB](/hyperspace/assets/images/zordercovering-analysis-zorder-colB.png) + +However, we expect better performance for `colB` as colB values are also considered in the result +ordering. Now we can skip about 80% of files when finding a value of `colB` in the dataset. + +### Note + +Currently, data skipping for ZOrderCoveringIndex index data relies on statistics and skipping feature +in Parquet format itself. Therefore, we need to read the metadata of all files regardless of how many +files can be skipped. Accessing each file in a remote storage is not usually cheap, and reading more +files also affects spark job scheduling to scan the data. +We can optimize it by utilizing data skipping index or maintaining statistics for each index file +separately in index log entry, but it is not planned yet. \ No newline at end of file diff --git a/docs/assets/images/zordercovering-analysis-sorted-colA.png b/docs/assets/images/zordercovering-analysis-sorted-colA.png new file mode 100644 index 000000000..d9abbe3cc Binary files /dev/null and b/docs/assets/images/zordercovering-analysis-sorted-colA.png differ diff --git a/docs/assets/images/zordercovering-analysis-sorted-colB.png b/docs/assets/images/zordercovering-analysis-sorted-colB.png new file mode 100644 index 000000000..271c5f365 Binary files /dev/null and b/docs/assets/images/zordercovering-analysis-sorted-colB.png differ diff --git a/docs/assets/images/zordercovering-analysis-zorder-colA.png b/docs/assets/images/zordercovering-analysis-zorder-colA.png new file mode 100644 index 000000000..9c0ce31d0 Binary files /dev/null and b/docs/assets/images/zordercovering-analysis-zorder-colA.png differ diff --git a/docs/assets/images/zordercovering-analysis-zorder-colB.png b/docs/assets/images/zordercovering-analysis-zorder-colB.png new file mode 100644 index 000000000..368ff37af Binary files /dev/null and b/docs/assets/images/zordercovering-analysis-zorder-colB.png differ