diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index e87b18e07884..d7eee76c53a6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -529,6 +529,20 @@ object VeloxConfig { .booleanConf .createWithDefault(false) + val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS = + buildConf("spark.gluten.velox.abandonbuild.noduphashminrows") + .experimental() + .doc("Experimental: abandon hashmap build if duplicated rows are more than this number.") + .intConf + .createWithDefault(100000) + + val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_PCT = + buildConf("spark.gluten.velox.abandonbuild.noduphashminpct") + .experimental() + .doc("Experimental: abandon hashmap build if duplicated rows are more than this percentage.") + .doubleConf + .createWithDefault(0) + val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") .doc("Enable query tracing flag.") .booleanConf diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index b70676a62ecd..cb89d92aedee 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -593,6 +593,12 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] = std::to_string(veloxCfg_->get(kVeloxSplitPreloadPerDriver, 2)); + // hashtable build optimizations + configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinRows] = + std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinRows, 100000)); + configs[velox::core::QueryConfig::kAbandonBuildNoDupHashMinPct] = + std::to_string(veloxCfg_->get(kAbandonBuildNoDupHashMinPct, 0)); + // Disable driver cpu time slicing. 
configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0"; diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index e37c99987e1c..2a2bdaf56d59 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -60,6 +60,10 @@ const std::string kAbandonPartialAggregationMinPct = const std::string kAbandonPartialAggregationMinRows = "spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows"; +// hashmap build +const std::string kAbandonBuildNoDupHashMinRows = "spark.gluten.velox.abandonbuild.noduphashminrows"; +const std::string kAbandonBuildNoDupHashMinPct = "spark.gluten.velox.abandonbuild.noduphashminpct"; + // execution const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems"; const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits"; diff --git a/docs/Configuration.md b/docs/Configuration.md index b7e725278a35..9aacdb377743 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -75,21 +75,21 @@ nav_order: 15 | spark.gluten.sql.columnar.maxBatchSize | 4096 | | spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | | spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | -| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | -| spark.gluten.sql.columnar.partial.generate | true | evaluates the non-offload-able HiveUDTF using vanilla Spark generator | -| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. 
| -| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | -| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | -| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | -| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | -| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | -| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | -| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | -| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | -| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | -| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | -| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. 
| +| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | +| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | +| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | +| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | +| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | +| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | +| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | +| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | +| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | +| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | +| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. 
| +| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | +| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | +| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | | spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | | spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | | spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index b5724f24e899..d4fe5c263970 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -73,5 +73,7 @@ nav_order: 16 | Key | Default | Description | |----------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.velox.abandonbuild.noduphashminpct | 0 | Experimental: abandon hashmap build if duplicated rows are more than this percentage. | +| spark.gluten.velox.abandonbuild.noduphashminrows | 100000 | Experimental: abandon hashmap build if duplicated rows are more than this number. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. |