[GLUTEN-11383][VL] Allow bloom filter pushdown in hash probe #11392
rui-mo merged 1 commit into apache:main
Conversation
marin-ma
left a comment
Please run dev/gen-all-config-docs.sh when adding or modifying the configurations.
These configurations are not actually passed to and used by the Velox Backend. Do you plan to enable it in this patch?
buildConf("spark.gluten.sql.columnar.backend.velox.hash_probe_bloom_filter_pushdown_max_size")
  .doc("The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be much larger than the CPU cache size on the host.")
  .intConf
  .createWithDefault(0)
The configuration type and default value are incorrect.
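For illustration only, here is a sketch of how the entry might look with a byte-size type and a Long default. The key name and builder calls below are assumptions drawn from this thread's discussion, not the final merged code:

```scala
// Sketch only: a size-valued config is better expressed as a bytes conf.
// Key name and builder DSL are assumptions, mirroring this thread's suggestions.
buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize")
  .doc("The maximum byte size of the Bloom filter generated from hash probe. 0 disables generation.")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefault(0L)
```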
Seems like the variable name and the configuration don't match.
val COLUMNAR_VELOX_HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.hash_probe_bloom_filter_pushdown_max_size")
buildConf("spark.gluten.sql.columnar.backend.velox.hash_probe_dynamic_filter_pushdown_enabled")
  .doc("Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators.")
  .booleanConf
  .createWithDefault(true)
rui-mo
left a comment
+1. Could you please pass this configuration to Velox’s query context so that Gluten users can control the behavior via Scala configuration?
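A hypothetical sketch (not Gluten's actual API) of one way the forwarding could work: strip the Gluten prefix from the Scala config key and hand the remainder to the backend as a plain key/value entry in the Velox query-context config map. The prefix-stripping convention and resulting key name are assumptions for illustration:

```scala
// Hypothetical sketch: forward a Gluten Scala config to the Velox query
// context as a (key -> value) entry. The key naming scheme is an assumption,
// not Velox's real QueryConfig key.
object VeloxConfForwardSketch {
  val GlutenVeloxPrefix = "spark.gluten.sql.columnar.backend.velox."

  // Returns the (key, value) pair that would be placed in the Velox
  // query-context config map.
  def toVeloxQueryConfig(glutenKey: String, value: String): (String, String) = {
    require(glutenKey.startsWith(GlutenVeloxPrefix), s"not a Velox backend conf: $glutenKey")
    (glutenKey.stripPrefix(GlutenVeloxPrefix), value)
  }

  def main(args: Array[String]): Unit = {
    val (k, v) = toVeloxQueryConfig(
      GlutenVeloxPrefix + "hashProbe.bloomFilterPushdown.maxSize", "1048576")
    println(s"$k=$v") // prints hashProbe.bloomFilterPushdown.maxSize=1048576
  }
}
```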
Force-pushed f69a085 to 19ba6e9
  .createWithDefault(4194304L)

val COLUMNAR_VELOX_HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE =
  buildConf("spark.gluten.sql.columnar.backend.velox.hash_probe_bloom_filter_pushdown_max_size")
Should we follow the camel case naming convention for consistency instead of using underscore case?
The other variables in this scope all use underscores though.
@infvg, let me clarify. I meant the use of hash_probe_bloom_filter_pushdown_max_size in the config string. Should we convert it to camel case for consistency?
+1 Let's follow the camel case naming for the newly added configurations. If the configuration name is too long, you can break it into multiple parts. e.g. .hashProbe.bloomFilterPushDownMaxSize
For the variable name, please shorten it to HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE
Ah I see, I changed it. I set it to hashProbe.bloomFilterPushdown.maxSize, which feels more natural since we are setting the max size. Please let me know what you think.
def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)

def veloxHashProbeBloomFilterPushdownMaxSize: Long =
  getConf(COLUMNAR_VELOX_HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE)
veloxHashProbeBloomFilterPushdownMaxSize -> hashProbeBloomFilterPushdownMaxSize
Force-pushed d499080 to e61ad52
val metrics = join.get.metrics

assert(metrics.contains("hashProbeDynamicFiltersProduced"))
assert(metrics("hashProbeDynamicFiltersProduced").value > 0)
I investigated the cause of the test failure and identified the following issues:
- The test uses a broadcast hash join, which causes it to fail at L335. We can set the configuration spark.sql.autoBroadcastJoinThreshold to -1 to force the use of a shuffled hash join instead.
- After applying this change, the metrics assertion still fails because metrics("hashProbeDynamicFiltersProduced").value is zero. This happens because, in this scenario, the Velox plan consists of ValueStream operators as the children of the HashJoin, which is a typical join plan in Gluten when a shuffle exists, but does not support dynamic filter pushdown. Dynamic filters only take effect when the HashProbe child is a TableScan. I'm not sure whether such a case can be constructed in a Gluten unit test.
-- Project[4][expressions: (n4_2:BIGINT, "n3_2")] -> n4_2:BIGINT
-- Project[3][expressions: (n3_2:BIGINT, "n0_0"), (n3_3:BIGINT, "n1_0")] -> n3_2:BIGINT, n3_3:BIGINT
-- HashJoin[2][INNER n0_0=n1_0] -> n0_0:BIGINT, n1_0:BIGINT
-- ValueStream[0][] -> n0_0:BIGINT
-- ValueStream[1][] -> n1_0:BIGINT
assert(metrics("hashProbeReplacedWithDynamicFilterRows").value > 0)
    }
  }
}
@infvg I made a few changes to the test above and confirmed that bloom filter pushdown is working by printing the following metrics in Velox:
bloomFilter->blocksByteSize(): 144704
numFiltersProduced: 1
A good next step would be to pass the bloomFilter->blocksByteSize() metric from Velox to Gluten (see WholeStageResultIterator::collectMetrics()), and then verify in this test that both blocksByteSize and numFiltersProduced are greater than 0.
This is the test with my modifications:
withSQLConf(
VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
) {
withTable("probe_table", "build_table") {
spark.sql("""
CREATE TABLE probe_table USING PARQUET
AS SELECT id as a FROM range(110001)
""")
spark.sql("""
CREATE TABLE build_table USING PARQUET
AS SELECT id * 1000 as b FROM range(220002)
""")
runQueryAndCompare(
"SELECT a FROM probe_table JOIN build_table ON a = b"
) {
df =>
val join = find(df.queryExecution.executedPlan) {
case _: BroadcastHashJoinExecTransformer => true
case _ => false
}
assert(join.isDefined)
val metrics = join.get.metrics
// TODO: assert the relevant metrics.
}
}
}
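If the blocksByteSize metric is later surfaced on the Gluten side, the aggregation could be as simple as summing the per-operator values. A hypothetical sketch; the metric names and the Map[String, Long] data shape are assumptions for illustration, not the actual collectMetrics() structures:

```scala
// Hypothetical sketch: aggregate a per-operator Velox runtime metric
// (e.g. "bloomFilterBlocksByteSize") into one value for a Spark SQL metric.
object MetricAggregationSketch {
  // Sum the named metric across all operators, treating absence as 0.
  def sumMetric(operatorStats: Seq[Map[String, Long]], name: String): Long =
    operatorStats.map(_.getOrElse(name, 0L)).sum

  def main(args: Array[String]): Unit = {
    val stats = Seq(
      Map("bloomFilterBlocksByteSize" -> 144704L, "numFiltersProduced" -> 1L),
      Map("numFiltersProduced" -> 0L))
    println(sumMetric(stats, "bloomFilterBlocksByteSize")) // prints 144704
    println(sumMetric(stats, "numFiltersProduced"))        // prints 1
  }
}
```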
Force-pushed 31ed8b4 to a9fe475
  "set to 0, no Bloom filter will be generated. To achieve optimal performance, this should" +
  " not be much larger than the CPU cache size on the host.")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefault(0)
Do we need to disable the bloom filter pushdown by default?
Velox has the default set to true & 0:
https://github.com/facebookincubator/velox/blob/10bdc0688f892dcda83cfbcf723f656ba5e1e6b4/velox/core/QueryConfig.h#L1304-L1308
aggSpilledFiles += aggMetrics.spilledFiles
flushRowCount += aggMetrics.flushRowCount
loadedToValueHook += aggMetrics.loadedToValueHook
bloomFilterBlocksByteSize += aggMetrics.bloomFilterBlocksByteSize
It seems like the bloom filter pushdown in aggregate hasn’t been confirmed to actually take effect yet, right?
Added ``spark.gluten.sql.columnar.backend.velox.hash_probe_bloom_filter_pushdown_max_size`` and ``spark.gluten.sql.columnar.backend.velox.hash_probe_dynamic_filter_pushdown_enabled`` as config options for Velox.

Resolves: #11383