diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 940daf977a11..3d23dc94db97 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -42,6 +42,7 @@ public class Metrics implements IMetrics { public long[] numReplacedWithDynamicFilterRows; public long[] flushRowCount; public long[] loadedToValueHook; + public long[] bloomFilterBlocksByteSize; public long[] skippedSplits; public long[] processedSplits; public long[] skippedStrides; @@ -91,6 +92,7 @@ public Metrics( long[] numReplacedWithDynamicFilterRows, long[] flushRowCount, long[] loadedToValueHook, + long[] bloomFilterBlocksByteSize, long[] scanTime, long[] skippedSplits, long[] processedSplits, @@ -134,6 +136,7 @@ public Metrics( this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; this.flushRowCount = flushRowCount; this.loadedToValueHook = loadedToValueHook; + this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; this.skippedSplits = skippedSplits; this.processedSplits = processedSplits; this.skippedStrides = skippedStrides; @@ -183,6 +186,7 @@ public OperatorMetrics getOperatorMetrics(int index) { numReplacedWithDynamicFilterRows[index], flushRowCount[index], loadedToValueHook[index], + bloomFilterBlocksByteSize[index], scanTime[index], skippedSplits[index], processedSplits[index], diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 0f4ab96cbc3c..10563e507e9b 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long numReplacedWithDynamicFilterRows; public long 
flushRowCount; public long loadedToValueHook; + public long bloomFilterBlocksByteSize; public long skippedSplits; public long processedSplits; public long skippedStrides; @@ -84,6 +85,7 @@ public OperatorMetrics( long numReplacedWithDynamicFilterRows, long flushRowCount, long loadedToValueHook, + long bloomFilterBlocksByteSize, long scanTime, long skippedSplits, long processedSplits, @@ -125,6 +127,7 @@ public OperatorMetrics( this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; this.flushRowCount = flushRowCount; this.loadedToValueHook = loadedToValueHook; + this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; this.skippedSplits = skippedSplits; this.processedSplits = processedSplits; this.skippedStrides = skippedStrides; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 80afe9f19f77..29d882c3d3db 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -298,6 +298,9 @@ class VeloxMetricsApi extends MetricsApi with Logging { "loadedToValueHook" -> SQLMetrics.createMetric( sparkContext, "number of pushdown aggregations"), + "bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric( + sparkContext, + "bloom filter blocks byte size"), "rowConstructionCpuCount" -> SQLMetrics.createMetric( sparkContext, "rowConstruction cpu wall time count"), @@ -625,6 +628,9 @@ class VeloxMetricsApi extends MetricsApi with Logging { "hashProbeDynamicFiltersProduced" -> SQLMetrics.createMetric( sparkContext, "number of hash probe dynamic filters produced"), + "bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric( + sparkContext, + "bloom filter blocks byte size"), "streamPreProjectionCpuCount" -> SQLMetrics.createMetric( sparkContext, "stream preProject cpu wall time 
count"), diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 03781d2fb5e9..ee0866391ce0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -85,6 +85,11 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES) def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES) + + def hashProbeBloomFilterPushdownMaxSize: Long = getConf(HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE) + + def hashProbeDynamicFilterPushdownEnabled: Boolean = + getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED) } object VeloxConfig extends ConfigRegistry { @@ -447,6 +452,22 @@ object VeloxConfig extends ConfigRegistry { .longConf .createWithDefault(4194304L) + val HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE = + buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize") + .doc("The maximum byte size of Bloom filter that can be generated from hash probe. When " + + "set to 0, no Bloom filter will be generated. 
To achieve optimal performance, this should" + " not be much larger than the CPU cache size on the host.") .bytesConf(ByteUnit.BYTE) .createWithDefault(0) + + val HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled") .doc( "Whether hash probe can generate any dynamic filter (including Bloom filter) and push" + " down to upstream operators.") .booleanConf .createWithDefault(true) + val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index 103bd00fbfaa..cf894b9da466 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -99,6 +99,8 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) val hashProbeDynamicFiltersProduced: SQLMetric = metrics("hashProbeDynamicFiltersProduced") + val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize") + val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount") val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos") @@ -127,6 +129,7 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) hashProbeSpilledFiles += hashProbeMetrics.spilledFiles hashProbeReplacedWithDynamicFilterRows += hashProbeMetrics.numReplacedWithDynamicFilterRows hashProbeDynamicFiltersProduced += hashProbeMetrics.numDynamicFiltersProduced + bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize idx += 1 // HashBuild diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 0624fd1f2db5..607de718ce07 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -122,6 +122,7 @@ object MetricsUtil extends Logging { var numReplacedWithDynamicFilterRows: Long = 0 var flushRowCount: Long = 0 var loadedToValueHook: Long = 0 + var bloomFilterBlocksByteSize: Long = 0 var scanTime: Long = 0 var skippedSplits: Long = 0 var processedSplits: Long = 0 @@ -156,6 +157,7 @@ object MetricsUtil extends Logging { numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows flushRowCount += metrics.flushRowCount loadedToValueHook += metrics.loadedToValueHook + bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize scanTime += metrics.scanTime skippedSplits += metrics.skippedSplits processedSplits += metrics.processedSplits @@ -197,6 +199,7 @@ object MetricsUtil extends Logging { numReplacedWithDynamicFilterRows, flushRowCount, loadedToValueHook, + bloomFilterBlocksByteSize, scanTime, skippedSplits, processedSplits, diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index 5958baa3771f..5d477180a106 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -307,4 +307,40 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { runQueryAndCompare(q5) { _ => } } } + + test("Hash probe dynamic filter pushdown") { + withSQLConf( + VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true", + VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576" + ) { + withTable("probe_table", "build_table") { + spark.sql(""" + CREATE TABLE probe_table USING PARQUET + AS 
SELECT id as a FROM range(110001) + """) + + spark.sql(""" + CREATE TABLE build_table USING PARQUET + AS SELECT id * 1000 as b FROM range(220002) + """) + + runQueryAndCompare( + "SELECT a FROM probe_table JOIN build_table ON a = b" + ) { + df => + val join = find(df.queryExecution.executedPlan) { + case _: BroadcastHashJoinExecTransformer => true + case _ => false + } + assert(join.isDefined) + val metrics = join.get.metrics + assert(metrics.contains("bloomFilterBlocksByteSize")) + assert(metrics("bloomFilterBlocksByteSize").value > 0) + + assert(metrics.contains("hashProbeDynamicFiltersProduced")) + assert(metrics("hashProbeDynamicFiltersProduced").value == 1) + } + } + } + } } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index adada15f91df..be3bbc90e3ef 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -271,7 +271,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -583,6 +583,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kNumReplacedWithDynamicFilterRows], longArray[Metrics::kFlushRowCount], longArray[Metrics::kLoadedToValueHook], + longArray[Metrics::kBloomFilterBlocksByteSize], longArray[Metrics::kScanTime], longArray[Metrics::kSkippedSplits], longArray[Metrics::kProcessedSplits], diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index c271a28a4f86..8e33a4a9c613 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -69,6 +69,7 @@ struct Metrics { kNumReplacedWithDynamicFilterRows, kFlushRowCount, kLoadedToValueHook, + kBloomFilterBlocksByteSize, 
kScanTime, kSkippedSplits, kProcessedSplits, diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index e91e2ad69d62..cdfefc2b3378 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -42,6 +42,7 @@ const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted"; const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows"; const std::string kFlushRowCount = "flushRowCount"; const std::string kLoadedToValueHook = "loadedToValueHook"; +const std::string kBloomFilterBlocksByteSize = "bloomFilterSize"; const std::string kTotalScanTime = "totalScanTime"; const std::string kSkippedSplits = "skippedSplits"; const std::string kProcessedSplits = "processedSplits"; @@ -462,6 +463,8 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount); metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] = runtimeMetric("sum", second->customStats, kLoadedToValueHook); + metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] = + runtimeMetric("sum", second->customStats, kBloomFilterBlocksByteSize); metrics_->get(Metrics::kScanTime)[metricIndex] = runtimeMetric("sum", second->customStats, kTotalScanTime); metrics_->get(Metrics::kSkippedSplits)[metricIndex] = runtimeMetric("sum", second->customStats, kSkippedSplits); metrics_->get(Metrics::kProcessedSplits)[metricIndex] = @@ -612,6 +615,11 @@ std::unordered_map WholeStageResultIterator::getQueryC std::to_string(veloxCfg_->get(kBloomFilterNumBits, 8388608)); configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] = std::to_string(veloxCfg_->get(kBloomFilterMaxNumBits, 4194304)); + + configs[velox::core::QueryConfig::kHashProbeDynamicFilterPushdownEnabled] = + std::to_string(veloxCfg_->get(kHashProbeDynamicFilterPushdownEnabled, true)); + 
configs[velox::core::QueryConfig::kHashProbeBloomFilterPushdownMaxSize] = + std::to_string(veloxCfg_->get(kHashProbeBloomFilterPushdownMaxSize, 0)); // spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no effect if // spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0 configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] = diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 2cacee5369cb..566ce875aacc 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -72,6 +72,12 @@ const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits"; const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver"; +const std::string kHashProbeDynamicFilterPushdownEnabled = + "spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled"; + +const std::string kHashProbeBloomFilterPushdownMaxSize = + "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize"; + const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished"; const bool kShowTaskMetricsWhenFinishedDefault = false; diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index bd838f357c3d..f4a79c465211 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -33,6 +33,8 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. | | spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. 
If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate result at anytime when memory is full / data reduction ratio is low. | | spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size | +| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be much larger than the CPU cache size on the host. | +| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. | | spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. | | spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan | | spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan |