From a219f72beb03ebb5028bd3d5cf7f798500f05514 Mon Sep 17 00:00:00 2001 From: Yuan Date: Fri, 29 Aug 2025 14:46:26 +0100 Subject: [PATCH 1/6] [VL] Adding configuration for hash table build Signed-off-by: Yuan --- .../org/apache/gluten/config/VeloxConfig.scala | 14 ++++++++++++++ cpp/core/config/GlutenConfig.h | 3 +++ cpp/velox/compute/WholeStageResultIterator.cc | 6 ++++++ 3 files changed, 23 insertions(+) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index e87b18e07884..b4db3ac477a2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -529,6 +529,20 @@ object VeloxConfig { .booleanConf .createWithDefault(false) + val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS = + buildConf("spark.gluten.velox.abandonbuild.noduphashminrows") + .internal() + .doc("Experimental: abandon hashmap build if duplicated rows more than this number.") + .intConf + .createWithDefault(100000) + + val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_PCT = + buildConf("spark.gluten.velox.abandonbuild.noduphashminpct") + .internal() + .doc("Experimental: abandon hashmap build if duplicated rows are more than this pct.") + .doubleConf + .createWithDefault(0) + val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") .doc("Enable query tracing flag.") .booleanConf diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 0384622e61af..1496283cfb53 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -93,6 +93,9 @@ const std::string kSparkLegacyStatisticalAggregate = "spark.sql.legacy.statistic const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNullFields"; +const std::string kAbandonBuildNoDupHashMinRows = "abandon_build_no_dup_hash_min_rows"; +const 
std::string kAbandonBuildNoDupHashMinPct = "abandon_build_no_dup_hash_min_pct"; + // cudf #ifdef GLUTEN_ENABLE_GPU const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf"; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b70676a62ecd..cb89d92aedee 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -593,6 +593,12 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] = std::to_string(veloxCfg_->get(kVeloxSplitPreloadPerDriver, 2)); + // hashtable build optimizations + configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinRows] = + std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinRows, 100000)); + configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinPct] = + std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinPct, 0)); + // Disable driver cpu time slicing. configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0"; From 81a9d6c3ea2d6c2f2db2bebe1be8268fd63b6df1 Mon Sep 17 00:00:00 2001 From: Yuan Date: Mon, 8 Sep 2025 14:08:42 +0100 Subject: [PATCH 2/6] fix default value Signed-off-by: Yuan --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 2 +- cpp/core/config/GlutenConfig.h | 3 --- cpp/velox/compute/WholeStageResultIterator.cc | 2 +- cpp/velox/config/VeloxConfig.h | 4 ++++ 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index b4db3ac477a2..8dfc8d6ef722 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -541,7 +541,7 @@ object VeloxConfig { .internal() .doc("Experimental: abandon hashmap build if duplicated rows are more than this pct.") .doubleConf - 
.createWithDefault(0) + .createWithDefault(100) val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") .doc("Enable query tracing flag.") diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index 1496283cfb53..0384622e61af 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -93,9 +93,6 @@ const std::string kSparkLegacyStatisticalAggregate = "spark.sql.legacy.statistic const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNullFields"; -const std::string kAbandonBuildNoDupHashMinRows = "abandon_build_no_dup_hash_min_rows"; -const std::string kAbandonBuildNoDupHashMinPct = "abandon_build_no_dup_hash_min_pct"; - // cudf #ifdef GLUTEN_ENABLE_GPU const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf"; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index cb89d92aedee..413dcb40345b 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -597,7 +597,7 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinRows] = std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinRows, 100000)); configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinPct] = - std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinPct, 0)); + std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinPct, 100)); // Disable driver cpu time slicing. 
configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0"; diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index e37c99987e1c..2a2bdaf56d59 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -60,6 +60,10 @@ const std::string kAbandonPartialAggregationMinPct = const std::string kAbandonPartialAggregationMinRows = "spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows"; +// hashmap build +const std::string kAbandonBuildNoDupHashMinRows = "spark.gluten.velox.abandonbuild.noduphashminrows"; +const std::string kAbandonBuildNoDupHashMinPct = "spark.gluten.velox.abandonbuild.noduphashminpct"; + // execution const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems"; const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits"; From f030c7354e1313aeb4570633c6d41e5daf9b79f1 Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 18 Sep 2025 15:34:36 +0100 Subject: [PATCH 3/6] set as experimental Signed-off-by: Yuan --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 8dfc8d6ef722..561328290b15 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -531,14 +531,14 @@ object VeloxConfig { val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS = buildConf("spark.gluten.velox.abandonbuild.noduphashminrows") - .internal() + .experimental() .doc("Experimental: abandon hashmap build if duplicated rows more than this number.") .intConf .createWithDefault(100000) val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_PCT = 
buildConf("spark.gluten.velox.abandonbuild.noduphashminpct") - .internal() + .experimental() .doc("Experimental: abandon hashmap build if duplicated rows are more than this pct.") .doubleConf .createWithDefault(100) From 7b0a64aee7e7307fd1f1df81c66604395204e736 Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 18 Sep 2025 18:21:03 +0100 Subject: [PATCH 4/6] update docs Signed-off-by: Yuan --- docs/Configuration.md | 30 +++++++++++++++--------------- docs/velox-configuration.md | 2 ++ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index b7e725278a35..9aacdb377743 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -75,21 +75,21 @@ nav_order: 15 | spark.gluten.sql.columnar.maxBatchSize | 4096 | | spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | | spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | -| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | -| spark.gluten.sql.columnar.partial.generate | true | evaluates the non-offload-able HiveUDTF using vanilla Spark generator | -| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | -| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | -| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. 
When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | -| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | -| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | -| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | -| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | -| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | -| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | -| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | -| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | -| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | +| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | +| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. 
Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | +| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | +| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | +| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | +| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | +| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | +| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | +| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | +| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | +| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | +| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. 
| +| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | +| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | | spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | | spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | | spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index b5724f24e899..2933ac02a7d2 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -73,5 +73,7 @@ nav_order: 16 | Key | Default | Description | |----------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.velox.abandonbuild.noduphashminpct | 100.0 | Experimental: abandon hashmap build if duplicated rows are more than this pct. | +| spark.gluten.velox.abandonbuild.noduphashminrows | 100000 | Experimental: abandon hashmap build if duplicated rows more than this number. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. 
| From 0b3bee50b951f70d48147f102660c1155f3413c5 Mon Sep 17 00:00:00 2001 From: Yuan Date: Wed, 24 Sep 2025 13:31:37 +0100 Subject: [PATCH 5/6] disable by default Signed-off-by: Yuan --- .../src/main/scala/org/apache/gluten/config/VeloxConfig.scala | 4 ++-- docs/velox-configuration.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 561328290b15..d7eee76c53a6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -539,9 +539,9 @@ object VeloxConfig { val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_PCT = buildConf("spark.gluten.velox.abandonbuild.noduphashminpct") .experimental() - .doc("Experimental: abandon hashmap build if duplicated rows are more than this pct.") + .doc("Experimental: abandon hashmap build if duplicated rows are more than this percentile.") .doubleConf - .createWithDefault(100) + .createWithDefault(0) val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") .doc("Enable query tracing flag.") diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 2933ac02a7d2..d4fe5c263970 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -73,7 +73,7 @@ nav_order: 16 | Key | Default | Description | |----------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------| -| spark.gluten.velox.abandonbuild.noduphashminpct | 100.0 | Experimental: abandon hashmap build if duplicated rows are more than this pct. | +| spark.gluten.velox.abandonbuild.noduphashminpct | 0.0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. 
| | spark.gluten.velox.abandonbuild.noduphashminrows | 100000 | Experimental: abandon hashmap build if duplicated rows more than this number. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. | From de42175b9e86191a34998c53050194df2fa5fc0f Mon Sep 17 00:00:00 2001 From: Yuan Date: Mon, 29 Sep 2025 10:13:51 +0100 Subject: [PATCH 6/6] fix default value Signed-off-by: Yuan --- cpp/velox/compute/WholeStageResultIterator.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 413dcb40345b..cb89d92aedee 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -597,7 +597,7 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinRows] = std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinRows, 100000)); configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinPct] = - std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinPct, 100)); + std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinPct, 0)); // Disable driver cpu time slicing. configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0";