Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Metrics implements IMetrics {
public long[] numReplacedWithDynamicFilterRows;
public long[] flushRowCount;
public long[] loadedToValueHook;
public long[] bloomFilterBlocksByteSize;
public long[] skippedSplits;
public long[] processedSplits;
public long[] skippedStrides;
Expand Down Expand Up @@ -91,6 +92,7 @@ public Metrics(
long[] numReplacedWithDynamicFilterRows,
long[] flushRowCount,
long[] loadedToValueHook,
long[] bloomFilterBlocksByteSize,
long[] scanTime,
long[] skippedSplits,
long[] processedSplits,
Expand Down Expand Up @@ -134,6 +136,7 @@ public Metrics(
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
this.skippedSplits = skippedSplits;
this.processedSplits = processedSplits;
this.skippedStrides = skippedStrides;
Expand Down Expand Up @@ -183,6 +186,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
numReplacedWithDynamicFilterRows[index],
flushRowCount[index],
loadedToValueHook[index],
bloomFilterBlocksByteSize[index],
scanTime[index],
skippedSplits[index],
processedSplits[index],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long numReplacedWithDynamicFilterRows;
public long flushRowCount;
public long loadedToValueHook;
public long bloomFilterBlocksByteSize;
public long skippedSplits;
public long processedSplits;
public long skippedStrides;
Expand Down Expand Up @@ -84,6 +85,7 @@ public OperatorMetrics(
long numReplacedWithDynamicFilterRows,
long flushRowCount,
long loadedToValueHook,
long bloomFilterBlocksByteSize,
long scanTime,
long skippedSplits,
long processedSplits,
Expand Down Expand Up @@ -125,6 +127,7 @@ public OperatorMetrics(
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.flushRowCount = flushRowCount;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
this.skippedSplits = skippedSplits;
this.processedSplits = processedSplits;
this.skippedStrides = skippedStrides;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"loadedToValueHook" -> SQLMetrics.createMetric(
sparkContext,
"number of pushdown aggregations"),
"bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric(
sparkContext,
"bloom filter blocks byte size"),
"rowConstructionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"rowConstruction cpu wall time count"),
Expand Down Expand Up @@ -625,6 +628,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"hashProbeDynamicFiltersProduced" -> SQLMetrics.createMetric(
sparkContext,
"number of hash probe dynamic filters produced"),
"bloomFilterBlocksByteSize" -> SQLMetrics.createSizeMetric(
sparkContext,
"bloom filter blocks byte size"),
"streamPreProjectionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"stream preProject cpu wall time count"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)

def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)

def hashProbeBloomFilterPushdownMaxSize: Long = getConf(HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE)

def hashProbeDynamicFilterPushdownEnabled: Boolean =
getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED)
}

object VeloxConfig extends ConfigRegistry {
Expand Down Expand Up @@ -447,6 +452,22 @@ object VeloxConfig extends ConfigRegistry {
.longConf
.createWithDefault(4194304L)

val HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize")
.doc("The maximum byte size of Bloom filter that can be generated from hash probe. When " +
"set to 0, no Bloom filter will be generated. To achieve optimal performance, this should" +
" not be too larger than the CPU cache size on the host.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The configuration type and default value are incorrect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the variable name and the configuration don't match.

val COLUMNAR_VELOX_HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED =
    buildConf("spark.gluten.sql.columnar.backend.velox.hash_probe_bloom_filter_pushdown_max_size")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to disable the bloom filter pushdown by default?

Copy link
Contributor Author

@infvg infvg Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


val HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled")
.doc(
"Whether hash probe can generate any dynamic filter (including Bloom filter) and push" +
" down to upstream operators.")
.booleanConf
.createWithDefault(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
val hashProbeDynamicFiltersProduced: SQLMetric =
metrics("hashProbeDynamicFiltersProduced")

val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize")

val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")

Expand Down Expand Up @@ -127,6 +129,7 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
hashProbeSpilledFiles += hashProbeMetrics.spilledFiles
hashProbeReplacedWithDynamicFilterRows += hashProbeMetrics.numReplacedWithDynamicFilterRows
hashProbeDynamicFiltersProduced += hashProbeMetrics.numDynamicFiltersProduced
bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize
idx += 1

// HashBuild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ object MetricsUtil extends Logging {
var numReplacedWithDynamicFilterRows: Long = 0
var flushRowCount: Long = 0
var loadedToValueHook: Long = 0
var bloomFilterBlocksByteSize: Long = 0
var scanTime: Long = 0
var skippedSplits: Long = 0
var processedSplits: Long = 0
Expand Down Expand Up @@ -156,6 +157,7 @@ object MetricsUtil extends Logging {
numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows
flushRowCount += metrics.flushRowCount
loadedToValueHook += metrics.loadedToValueHook
bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
scanTime += metrics.scanTime
skippedSplits += metrics.skippedSplits
processedSplits += metrics.processedSplits
Expand Down Expand Up @@ -197,6 +199,7 @@ object MetricsUtil extends Logging {
numReplacedWithDynamicFilterRows,
flushRowCount,
loadedToValueHook,
bloomFilterBlocksByteSize,
scanTime,
skippedSplits,
processedSplits,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,40 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
runQueryAndCompare(q5) { _ => }
}
}

test("Hash probe dynamic filter pushdown") {
withSQLConf(
VeloxConfig.HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED.key -> "true",
VeloxConfig.HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE.key -> "1048576"
) {
withTable("probe_table", "build_table") {
spark.sql("""
CREATE TABLE probe_table USING PARQUET
AS SELECT id as a FROM range(110001)
""")

spark.sql("""
CREATE TABLE build_table USING PARQUET
AS SELECT id * 1000 as b FROM range(220002)
""")

runQueryAndCompare(
"SELECT a FROM probe_table JOIN build_table ON a = b"
) {
df =>
val join = find(df.queryExecution.executedPlan) {
case _: BroadcastHashJoinExecTransformer => true
case _ => false
}
assert(join.isDefined)
val metrics = join.get.metrics
assert(metrics.contains("bloomFilterBlocksByteSize"))
assert(metrics("bloomFilterBlocksByteSize").value > 0)

assert(metrics.contains("hashProbeDynamicFiltersProduced"))
assert(metrics("hashProbeDynamicFiltersProduced").value == 1)
}
}
}
}
}
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -583,6 +583,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kNumReplacedWithDynamicFilterRows],
longArray[Metrics::kFlushRowCount],
longArray[Metrics::kLoadedToValueHook],
longArray[Metrics::kBloomFilterBlocksByteSize],
longArray[Metrics::kScanTime],
longArray[Metrics::kSkippedSplits],
longArray[Metrics::kProcessedSplits],
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct Metrics {
kNumReplacedWithDynamicFilterRows,
kFlushRowCount,
kLoadedToValueHook,
kBloomFilterBlocksByteSize,
kScanTime,
kSkippedSplits,
kProcessedSplits,
Expand Down
8 changes: 8 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows";
const std::string kFlushRowCount = "flushRowCount";
const std::string kLoadedToValueHook = "loadedToValueHook";
const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
const std::string kTotalScanTime = "totalScanTime";
const std::string kSkippedSplits = "skippedSplits";
const std::string kProcessedSplits = "processedSplits";
Expand Down Expand Up @@ -462,6 +463,8 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount);
metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
runtimeMetric("sum", second->customStats, kLoadedToValueHook);
metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] =
runtimeMetric("sum", second->customStats, kBloomFilterBlocksByteSize);
metrics_->get(Metrics::kScanTime)[metricIndex] = runtimeMetric("sum", second->customStats, kTotalScanTime);
metrics_->get(Metrics::kSkippedSplits)[metricIndex] = runtimeMetric("sum", second->customStats, kSkippedSplits);
metrics_->get(Metrics::kProcessedSplits)[metricIndex] =
Expand Down Expand Up @@ -612,6 +615,11 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterNumBits, 8388608));
configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] =
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterMaxNumBits, 4194304));

configs[velox::core::QueryConfig::kHashProbeDynamicFilterPushdownEnabled] =
std::to_string(veloxCfg_->get<bool>(kHashProbeDynamicFilterPushdownEnabled, true));
configs[velox::core::QueryConfig::kHashProbeBloomFilterPushdownMaxSize] =
std::to_string(veloxCfg_->get<uint64_t>(kHashProbeBloomFilterPushdownMaxSize, 0));
// spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no effect if
// spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0
configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] =
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox
const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits";
const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver";

const std::string kHashProbeDynamicFilterPushdownEnabled =
"spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled";

const std::string kHashProbeBloomFilterPushdownMaxSize =
"spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize";

const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
const bool kShowTaskMetricsWhenFinishedDefault = false;

Expand Down
2 changes: 2 additions & 0 deletions docs/velox-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.floatingPointMode | loose | Config used to control the tolerance of floating point operations alignment with Spark. When the mode is set to strict, flushing is disabled for sum(float/double)and avg(float/double). When set to loose, flushing will be enabled. |
| spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation | true | Enable flushable aggregation. If true, Gluten will try converting regular aggregation into Velox's flushable aggregation when applicable. A flushable aggregation could emit intermediate result at anytime when memory is full / data reduction ratio is low. |
| spark.gluten.sql.columnar.backend.velox.footerEstimatedSize | 32KB | Set the footer estimated size for velox file scan, refer to Velox's footer-estimated-size |
| spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize | 0b | The maximum byte size of Bloom filter that can be generated from hash probe. When set to 0, no Bloom filter will be generated. To achieve optimal performance, this should not be much larger than the CPU cache size on the host. |
| spark.gluten.sql.columnar.backend.velox.hashProbe.dynamicFilterPushdown.enabled | true | Whether hash probe can generate any dynamic filter (including Bloom filter) and push down to upstream operators. |
| spark.gluten.sql.columnar.backend.velox.loadQuantum | 256MB | Set the load quantum for velox file scan, recommend to use the default value (256MB) for performance consideration. If Velox cache is enabled, it can be 8MB at most. |
| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | 64MB | Set the max coalesced bytes for velox file scan |
| spark.gluten.sql.columnar.backend.velox.maxCoalescedDistance | 512KB | Set the max coalesced distance bytes for velox file scan |
Expand Down