2 changes: 2 additions & 0 deletions docs/_data/navigation.yml
@@ -21,6 +21,8 @@ docs:
url: /docs/ug-optimize-index/
- title: "Supported data formats"
url: /docs/ug-supported-data-formats/
- title: "Supported index types"
url: /docs/ug-index-types/
- title: "Release Notes"
url: https://github.com/microsoft/hyperspace/releases
# Needs to be written later
315 changes: 315 additions & 0 deletions docs/_docs/09-ug-index-types.md
@@ -0,0 +1,315 @@
---
title: "Index types"
permalink: /docs/ug-index-types/
excerpt: "Index types guide"
last_modified_at: 2021-12-29
toc: false
classes: wide
---

# Covering Index Types

In Hyperspace, a "covering" index means that the index data contains the actual source data,
so Spark can read the index data instead of the source data whenever the index is applicable.
The index may cover the whole dataset or only part of it, but the data is rearranged into a
different layout to accelerate certain types of Spark queries.

As of v0.6, Hyperspace supports the following two covering indexes:

1. [(Bucketed) Covering Index](#covering-index)
2. [Z-order Covering Index](#z-order-covering-index) (since v0.6)

## Covering Index

The Covering Index is the first index type supported by Hyperspace.
It can accelerate join queries and filter queries.

### Index data layout

The index data consists of the indexed columns and the included columns, and it is stored
bucketed by the indexed columns and sorted by them within each bucket. "Bucketed" simply means
hash-partitioned: all rows with the same hash value of the indexed columns go into the same bucket.
By default, 200 buckets are used, matching the default value of the `spark.sql.shuffle.partitions`
config, to increase the chance that the index can be applied.
The number of buckets for index data is configurable via `spark.hyperspace.index.numBuckets`.

For example, assume the source data is stored in 1000 files in random order.
At index creation time, Hyperspace shuffles and sorts the data by the indexed columns before
storing the index data.
As a result, the index data consists of 200 files (the number of buckets) by default, and within
each bucket the rows are sorted by the indexed columns.
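
For intuition, the resulting layout is roughly what the following plain Spark snippet would produce. This is only a minimal sketch; Hyperspace builds and manages the index data itself, and the column names and paths below are placeholders:

```scala
import org.apache.spark.sql.functions.col

// Roughly equivalent layout using plain Spark APIs (illustrative only).
val source = spark.read.parquet("<path/to/data>")
source
  .select("<indexedCol1>", "<includedCol1>")   // keep only indexed and included columns
  .repartition(200, col("<indexedCol1>"))      // hash-partition ("bucket") by the indexed column
  .sortWithinPartitions("<indexedCol1>")       // sort rows within each bucket
  .write
  .parquet("<path/to/indexData>")              // hypothetical output location
```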

### Applicable queries

#### Sort Merge Join

Sort merge join is a well-known expensive operation in Spark because it requires a full shuffle
of both the left and the right dataset. Since the required data rearrangement was already done at
index creation time, reading the index data instead of the source data lets Spark skip the shuffle
and sort for the join.
Therefore, if a dataset is repeatedly used in sort merge joins that trigger a shuffle on every
query, creating a Covering Index can be helpful.
We also support [mutable datasets](https://microsoft.github.io/hyperspace/docs/ug-mutable-dataset/)
for covering index types.
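
As a sketch, a join that can benefit might look like the following, assuming covering indexes on `<indexedCol1>` already exist for both datasets (paths and column names are placeholders):

```scala
import com.microsoft.hyperspace._

// Enable Hyperspace so the optimizer can rewrite the plan to read index data.
spark.enableHyperspace

val left = spark.read.parquet("<path/to/left>")
val right = spark.read.parquet("<path/to/right>")

// A sort merge join on the indexed column; with applicable covering indexes on both
// sides, Spark can avoid shuffling and sorting the datasets at query time.
val joined = left.join(right, left("<indexedCol1>") === right("<indexedCol1>"))
joined.show()
```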

#### Filter conditions for the first indexed column

Queries with filter conditions on the first indexed column can also be accelerated, since
the index data is sorted within each bucket. Thanks to the statistics and data skipping features
of the Parquet format, such queries can skip unnecessary row groups while reading the data.
This can be less effective than global sorting and applies only to the first
indexed column.
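
For example, a filter on the first indexed column might look like the following minimal sketch; `hs.explain` is used only to check whether the index would be applied, and the names are placeholders:

```scala
import com.microsoft.hyperspace._
import org.apache.spark.sql.functions.col

spark.enableHyperspace

val df = spark.read.parquet("<path/to/data>")
val filtered = df
  .filter(col("<indexedCol1>") === "<someValue>")
  .select("<indexedCol1>", "<includedCol1>")

// Show whether Hyperspace would apply an index to this query plan.
val hs = new Hyperspace(spark)
hs.explain(filtered, verbose = true)
```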

### Index creation API

#### CoveringIndexConfig

To create a Covering Index, use `CoveringIndexConfig` at index creation time with:
- index name: name of index; should be unique within an index system directory.
- indexed columns: columns to use for shuffling and sorting.
- included columns: columns to include in the index data.

#### Scala
```scala
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.covering._

val df = spark.read.parquet("<path/to/data>")
val idxConfig = CoveringIndexConfig(
  "<indexName>",
  Seq("<indexedCol1>", "<indexedCol2>"),
  Seq("<includedCol1>"))
val hs = new Hyperspace(spark)
hs.createIndex(df, idxConfig)
```

#### Python
```python
from hyperspace import Hyperspace
from hyperspace import CoveringIndexConfig
hs = Hyperspace(spark)
idxConfig = CoveringIndexConfig("<indexName>", ["<indexedCol1>", "<indexedCol2>"], ["<includedCol1>"])

df = spark.read.parquet("<path/to/data>")
hs.createIndex(df, idxConfig)
```

#### Index Specific configs
- `spark.hyperspace.index.numBuckets`
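
For example, the number of buckets can be changed on the Spark session before creating the index (the value below is illustrative; the default is 200):

```scala
spark.conf.set("spark.hyperspace.index.numBuckets", "50") // illustrative value
```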

## Z-order Covering Index
The Z-order Covering Index type is available since v0.6.
Its index data is sorted, but multiple columns contribute to the ordering, so rows with
similar values in those columns tend to be collocated.

### Index data layout

ZOrderCoveringIndex data is generated as follows:

```scala
sourceData
  .withColumn("_zaddr", <zAddressCalculation>)   // compute the Z-address from the indexed columns
  .repartitionByRange(numPartitions, col("_zaddr"))
  .sortWithinPartitions("_zaddr")
  .drop("_zaddr")
```

`numPartitions` is determined based on `spark.hyperspace.index.zorder.targetSourceBytesPerPartition`,
so the index data consists of `numPartitions` files sorted by the Z-address of the indexed columns.
If there is only one indexed column, the data is simply sorted by that column without calculating a Z-address.
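
For intuition, a Z-address is typically computed by interleaving the bits of the indexed column values, so rows that are close in all indexed columns get nearby Z-addresses. Below is a minimal illustrative sketch for two non-negative integer columns; it is not Hyperspace's actual Z-address computation, which also handles other data types:

```scala
// Interleave the bits of two non-negative Int values into one Long Z-address.
// Illustrative only; not Hyperspace's actual implementation.
def zAddress(a: Int, b: Int): Long = {
  var result = 0L
  for (i <- 0 until 32) {
    val bitA = (a.toLong >> i) & 1L
    val bitB = (b.toLong >> i) & 1L
    result |= (bitA << (2 * i)) | (bitB << (2 * i + 1))
  }
  result
}

// Nearby (a, b) pairs get nearby Z-addresses, e.g.
// zAddress(3, 5) == 39 and zAddress(4, 5) == 50.
```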

### Applicable queries

#### Filter conditions for indexed columns

Queries with filter conditions on any of the indexed columns can also be accelerated, as
the index data is sorted by the Z-address of the indexed column values.
Improved query performance is not always guaranteed and depends on the quality of the
Z-ordered dataset. For example, if the source data is already sorted by a column, then
Z-ordering the dataset may degrade the performance of queries that filter on that column.
For more information, see the
[Min/Max data layout analysis utility](#minmax-data-layout-analysis-utility-for-z-ordering) below.


### Index creation API

To create a Z-order Covering Index, use `ZOrderCoveringIndexConfig` at index creation time with:
- index name: name of index; should be unique within an index system directory.
- indexed columns: columns to use for Z-ordering.
- included columns: columns to include in the index data.

#### Scala
```scala
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.zordercovering._

val df = spark.read.parquet("<path/to/data>")
val idxConfig = ZOrderCoveringIndexConfig(
  "<indexName>",
  Seq("<indexedCol1>", "<indexedCol2>"),
  Seq("<includedCol1>"))
val hs = new Hyperspace(spark)
hs.createIndex(df, idxConfig)
```

#### Python
```python
from hyperspace import Hyperspace
from hyperspace import ZOrderCoveringIndexConfig
hs = Hyperspace(spark)
idxConfig = ZOrderCoveringIndexConfig("<indexName>", ["<indexedCol1>", "<indexedCol2>"], ["<includedCol1>"])
df = spark.read.parquet("<path/to/data>")
hs.createIndex(df, idxConfig)
```

#### Index Specific configs

- `spark.hyperspace.index.zorder.targetSourceBytesPerPartition`
- `spark.hyperspace.index.zorder.quantile.enabled`
- `spark.hyperspace.index.zorder.quantile.relativeError`
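
For example, these can be set on the Spark session before creating the index; the values below are illustrative (the default for `targetSourceBytesPerPartition` is 1G):

```scala
spark.conf.set("spark.hyperspace.index.zorder.targetSourceBytesPerPartition", 1024L * 1024 * 1024)
spark.conf.set("spark.hyperspace.index.zorder.quantile.enabled", "true")
spark.conf.set("spark.hyperspace.index.zorder.quantile.relativeError", "0.01") // illustrative value
```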

### Min/Max data layout analysis utility for Z-ordering

The result of Z-ordering can differ based on various factors such as the data type, the value
distribution, and the other Z-ordering columns, so it is hard to predict whether the Z-ordered
data will be effective. That is why we introduce this analysis utility for Z-ordering.
It currently works only for numeric types.

Note that the utility collects the min/max values of each file using Spark jobs and generates the result based on them.

#### Result format

The function provides `html` and `text` formats for the result.

##### HTML format
![Result analysis on colA](/hyperspace/assets/images/zordercovering-analysis-zorder-colA.png)

##### Text format
```
Min/Max analysis on colA

< Number of files (%) >
+--------------------------------------------------+
100% | |
| |
| |
| |
75% | |
| |
| |
| |
| |
50% | |
| |
| |
| |
| |
25% | |
| *** |
| *************************************** ***** |
|**************************************************|
|**************************************************|
0% |**************************************************|
+--------------------------------------------------+
Min <----- colA value -----> Max

min(colA): 0
max(colA): 9999999
Total num of files: 100
Total byte size of files: 419650780
Max. num of files for a point lookup: 21 (21.00%)
Estimated average num of files for a point lookup: 17.76 (17.76%)
Max. bytes to read for a point lookup: 88309318 (21.04%)
```

#### How to interpret

- X-axis: the range groups of the column values.
- Y-axis: the percentage of files that must be read to look up a value, based on the minimum and maximum value of each file. A lower percentage means a better distribution, since more files can be skipped. More precisely, it shows the maximum percentage within each range group.
- min(colA): the minimum value of the column in the given dataset.
- max(colA): the maximum value of the column in the given dataset.
- Total num of files: the total number of files in the given dataset.
- Total byte size of files: the total size, in bytes, of the files in the given dataset.
- Max. num of files for a point lookup: the maximum number / percentage of files to read for a point lookup. It is the highest point of the y-axis.
- Estimated average num of files for a point lookup: the average number of files to read for a point lookup, excluding ranges that contain no values.
- Max. bytes to read for a point lookup: the maximum number of bytes to read for a point lookup.
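
For example, in the text output above, a worst-case point lookup has to read 21 of the 100 files (21.00%), and the largest amount of data a point lookup has to read is 88309318 bytes out of the 419650780 total bytes, i.e. about 21.04%.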

##### Analysis result on sorted dataset
To understand the result more clearly, let us look at the result for a sorted dataset.

###### Sample data generation
```scala
// Scala
import org.apache.spark.sql.functions.col
import spark.implicits._

val dataPath = "testDataDir"
val randomDataPath = dataPath + "/randomData"
val sortedDataPath = dataPath + "/sortedData"

// 50M rows with random integers stored in 100 parquet files.
spark.range(50000000).map { _ =>
  (scala.util.Random.nextInt(10000000).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(2))
}.toDF("colA", "colB", "colC").repartition(100).write.format("parquet").save(randomDataPath)

val randomDF = spark.read.parquet(randomDataPath)
randomDF.repartitionByRange(100, col("colA")).sortWithinPartitions(col("colA")).write.format("parquet").save(sortedDataPath)
val sortedDF = spark.read.parquet(sortedDataPath)

import com.microsoft.hyperspace.util.MinMaxAnalysisUtil
// Format "text" and "html" are available.
displayHTML(MinMaxAnalysisUtil.analyze(sortedDF, Seq("colA", "colB"), format = "html"))
// println(MinMaxAnalysisUtil.analyze(sortedDF, Seq("colA", "colB"), format = "text"))
```

###### Result analysis
![Sorted result analysis on colA](/hyperspace/assets/images/zordercovering-analysis-sorted-colA.png)

The maximum number of files to check is 1, since the dataset is repartitioned by range of `colA`.
In other words, based on the min/max value of each file, we only need to read one file to check
whether a given value exists. This sorted data layout gives the best performance for lookups on `colA`.

However, for `colB`, we have to read all the files, since its values are randomly distributed:
![Sorted result analysis on colB](/hyperspace/assets/images/zordercovering-analysis-sorted-colB.png)

Therefore, with this dataset, we cannot accelerate queries with conditions on `colB`.

##### Analyze Z-order covering index data
###### Index creation & analysis
```scala
// Index creation
import com.microsoft.hyperspace.index.zordercovering._
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.util.FileUtils
import org.apache.hadoop.fs.Path

val totalSizeInBytes = FileUtils.getDirectorySize(new Path(randomDataPath))
val sizePerPartition = totalSizeInBytes / 100
// Reduce the target source bytes per partition for demonstration (default: 1G).
spark.conf.set("spark.hyperspace.index.zorder.targetSourceBytesPerPartition", sizePerPartition)

val df = spark.read.parquet(randomDataPath)
val hs = new Hyperspace(spark)
hs.createIndex(df, ZOrderCoveringIndexConfig("zorderTestIndex", Seq("colA", "colB"), Seq("colC")))

import com.microsoft.hyperspace.util.MinMaxAnalysisUtil
displayHTML(MinMaxAnalysisUtil.analyzeIndex(spark, "zorderTestIndex", "html"))
```

###### Result analysis

![Z-ordered result analysis on colA](/hyperspace/assets/images/zordercovering-analysis-zorder-colA.png)

Compared to the sorted dataset, we need to read more files to find a value of the `colA` column.
For example, consider the query `df.filter("colA == 0")`, where 0 is the minimum value.
Based on the min/max value of each file, we would need to check about 17 files.
This is worse than the sorted dataset above, where only one file contains the minimum
value.

![Z-ordered result analysis on colB](/hyperspace/assets/images/zordercovering-analysis-zorder-colB.png)

However, we expect better performance for `colB`, since `colB` values also contribute to the
resulting order. Now we can skip about 80% of the files when looking up a value of `colB` in the dataset.

### Note

Currently, data skipping for ZOrderCoveringIndex data relies on the statistics and data skipping
features of the Parquet format itself. Therefore, the metadata of all files must be read regardless
of how many files can be skipped. Accessing each file in remote storage is usually not cheap, and
reading more files also affects Spark job scheduling for scanning the data.
This could be optimized by using a data skipping index or by maintaining per-file statistics
separately in the index log entry, but that work is not planned yet.