diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala index 59f2ae3d8ac0..e3ad3aedc6e4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala @@ -16,13 +16,13 @@ */ package org.apache.gluten.backendsapi.clickhouse -import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.config.{ConfigRegistry, GlutenConfig} import org.apache.spark.SparkConf import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.internal.SQLConf -object CHConfig { +object CHConfig extends ConfigRegistry { private[clickhouse] val BACKEND_NAME: String = "ch" private[clickhouse] val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME) private val RUNTIME_SETTINGS: String = s"$CONF_PREFIX.runtime_settings" @@ -59,8 +59,6 @@ object CHConfig { def get: CHConfig = new CHConfig(SQLConf.get) - import GlutenConfig._ - val ENABLE_ONEPIPELINE_MERGETREE_WRITE = buildConf(prefixOf("mergetree.write.pipeline")) .doc("Using one pipeline to write data to MergeTree table in Spark 3.5") diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 4b8d89c77c28..e8b55bd072b7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -16,8 +16,6 @@ */ package org.apache.gluten.config -import org.apache.gluten.config.GlutenConfig.{buildConf, buildStaticConf, COLUMNAR_MAX_BATCH_SIZE} - import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.internal.SQLConf @@ -46,7 +44,7 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { } def veloxResizeBatchesShuffleInputOutputRange: ResizeRange = { - val standardSize = getConf(COLUMNAR_MAX_BATCH_SIZE) + val standardSize = getConf(GlutenConfig.COLUMNAR_MAX_BATCH_SIZE) val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1) val minSize = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE) .getOrElse(defaultMinSize) @@ -84,9 +82,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN) } -object VeloxConfig { - - def get: VeloxConfig = { +object VeloxConfig extends ConfigRegistry { + override def get: VeloxConfig = { new VeloxConfig(SQLConf.get) } @@ -269,15 +266,19 @@ object VeloxConfig { val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT = buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput") - .doc(s"If true, combine small columnar batches together before sending to shuffle. " + - s"The default minimum output batch size is equal to 0.25 * ${COLUMNAR_MAX_BATCH_SIZE.key}") + .doc( + s"If true, combine small columnar batches together before sending to shuffle. " + + s"The default minimum output batch size is equal to 0.25 * " + + s"${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key}") .booleanConf .createWithDefault(true) val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT = buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput") - .doc(s"If true, combine small columnar batches together right after shuffle read. " + - s"The default minimum output batch size is equal to 0.25 * ${COLUMNAR_MAX_BATCH_SIZE.key}") + .doc( + s"If true, combine small columnar batches together right after shuffle read. " + + s"The default minimum output batch size is equal to 0.25 * " + + s"${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key}") .booleanConf .createWithDefault(false) diff --git a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala index 98c6ee0c8cbd..a693437b8dc8 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala @@ -25,10 +25,7 @@ class AllVeloxConfiguration extends AnyFunSuite { AllGlutenConfiguration.getCodeSourceLocation(this.getClass).split("backends-velox")(0) private val markdown = Paths.get(glutenHome, "docs", "velox-configuration.md").toAbsolutePath - private def loadConfigs = Array(VeloxConfig) - test("Check velox backend configs") { - loadConfigs val builder = MarkdownBuilder(getClass.getName) builder ++= @@ -49,8 +46,7 @@ class AllVeloxConfiguration extends AnyFunSuite { | --- | --- | --- |""" - ConfigEntry.getAllEntries - .filter(_.key.contains("velox")) + VeloxConfig.allEntries .filter(_.isPublic) .filter(!_.isExperimental) .sortBy(_.key) @@ -69,8 +65,7 @@ class AllVeloxConfiguration extends AnyFunSuite { | --- | --- | --- |""" - ConfigEntry.getAllEntries - .filter(_.key.contains("velox")) + VeloxConfig.allEntries .filter(_.isPublic) .filter(_.isExperimental) .sortBy(_.key) diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 75ba605c4f0a..6e80df3f1795 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -67,6 +67,8 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. | | spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. | | spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing | +| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. | +| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. | | spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same sementic as vanilla Spark to CAST-from-varchar.Otherwise, do nothing. | | spark.gluten.velox.fs.s3a.connect.timeout | 200s | Timeout for AWS s3 connection. | @@ -74,7 +76,7 @@ nav_order: 16 | Key | Default | Description | |----------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------| -| spark.gluten.velox.abandonbuild.noduphashminpct | 0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. | +| spark.gluten.velox.abandonbuild.noduphashminpct | 0.0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. | | spark.gluten.velox.abandonbuild.noduphashminrows | 100000 | Experimental: abandon hashmap build if duplicated rows more than this number. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. | diff --git a/gluten-core/src/main/scala/org/apache/gluten/config/ConfigEntry.scala b/gluten-core/src/main/scala/org/apache/gluten/config/ConfigEntry.scala index ed861ec833d3..5f1ee8d4cfde 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/config/ConfigEntry.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/config/ConfigEntry.scala @@ -20,8 +20,6 @@ import org.apache.gluten.config.BackendType.BackendType import org.apache.spark.sql.internal.GlutenConfigProvider -import scala.collection.JavaConverters._ - /** * An entry contains all meta information for a configuration. * @@ -82,8 +80,6 @@ trait ConfigEntry[T] { s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " + s"public=$isPublic, version=$version)" } - - ConfigEntry.registerEntry(this) } private[gluten] class OptionalConfigEntry[T]( @@ -240,24 +236,5 @@ private[gluten] class ConfigEntryFallback[T]( } object ConfigEntry { - val UNDEFINED = "" - - private val knownConfigs = - new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]() - - private def registerEntry(entry: ConfigEntry[_]): Unit = { - val existing = knownConfigs.putIfAbsent(entry.key, entry) - require(existing == null, s"Config entry ${entry.key} already registered!") - } - - def containsEntry(entry: ConfigEntry[_]): Boolean = { - Option(knownConfigs.get(entry.key)).isDefined - } - - def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key) - - def getAllEntries: Seq[ConfigEntry[_]] = { - knownConfigs.values().asScala.toSeq - } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/config/ConfigRegistry.scala b/gluten-core/src/main/scala/org/apache/gluten/config/ConfigRegistry.scala new file mode 100644 index 000000000000..de52f752b81b --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/config/ConfigRegistry.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.config + +import org.apache.spark.sql.internal.SQLConf + +import scala.collection.JavaConverters._ + +trait ConfigRegistry { + private val configEntries = + new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]().asScala + + private def register(entry: ConfigEntry[_]): Unit = { + val existing = configEntries.putIfAbsent(entry.key, entry) + require(existing.isEmpty, s"Config entry ${entry.key} already registered!") + } + + /** Visible for testing. */ + private[config] def allEntries: Seq[ConfigEntry[_]] = { + configEntries.values.toSeq + } + + protected def buildConf(key: String): ConfigBuilder = { + ConfigBuilder(key).onCreate { + entry => + register(entry) + ConfigRegistry.registerToAllEntries(entry) + } + } + + protected def buildStaticConf(key: String): ConfigBuilder = { + ConfigBuilder(key).onCreate { + entry => + SQLConf.registerStaticConfigKey(key) + register(entry) + ConfigRegistry.registerToAllEntries(entry) + } + } + + def get: GlutenCoreConfig +} + +object ConfigRegistry { + private val allConfigEntries = + new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]().asScala + + private def registerToAllEntries(entry: ConfigEntry[_]): Unit = { + val existing = allConfigEntries.putIfAbsent(entry.key, entry) + require(existing.isEmpty, s"Config entry ${entry.key} already registered!") + } + + def containsEntry(entry: ConfigEntry[_]): Boolean = { + allConfigEntries.contains(entry.key) + } + + def findEntry(key: String): Option[ConfigEntry[_]] = allConfigEntries.get(key) +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala b/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala index 4d27b7b451ff..e76a9431d15a 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala @@ -26,7 +26,7 @@ class GlutenCoreConfig(conf: SQLConf) extends Logging { private lazy val configProvider = new SQLConfProvider(conf) def getConf[T](entry: ConfigEntry[T]): T = { - require(ConfigEntry.containsEntry(entry), s"$entry is not registered") + require(ConfigRegistry.containsEntry(entry), s"$entry is not registered") entry.readFrom(configProvider) } @@ -59,17 +59,11 @@ class GlutenCoreConfig(conf: SQLConf) extends Logging { } /* - * Note: Gluten configiguration.md is automatically generated from this code. + * Note: Gluten configuration.md is automatically generated from this code. * Make sure to run dev/gen_all_config_docs.sh after making changes to this file. */ -object GlutenCoreConfig { - def buildConf(key: String): ConfigBuilder = ConfigBuilder(key) - - def buildStaticConf(key: String): ConfigBuilder = { - ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key)) - } - - def get: GlutenCoreConfig = { +object GlutenCoreConfig extends ConfigRegistry { + override def get: GlutenCoreConfig = { new GlutenCoreConfig(SQLConf.get) } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala b/gluten-core/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala index aba15f7224fc..db8f9858565c 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala @@ -25,7 +25,8 @@ object GlutenConfigUtil { configProvider: GlutenConfigProvider, key: String, value: String): String = { - Option(ConfigEntry.findEntry(key)) + ConfigRegistry + .findEntry(key) .map { _.readFrom(configProvider) match { case o: Option[_] => o.map(_.toString).getOrElse(value) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 05b222084f03..b8776853a6b0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -362,14 +362,7 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB")) } -object GlutenConfig { - import SQLConf._ - - def buildConf(key: String): ConfigBuilder = ConfigBuilder(key) - - def buildStaticConf(key: String): ConfigBuilder = { - ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key)) - } +object GlutenConfig extends ConfigRegistry { // Hive configurations. val SPARK_SQL_PARQUET_COMPRESSION_CODEC: String = "spark.sql.parquet.compression.codec" @@ -505,9 +498,11 @@ object GlutenConfig { nativeConfMap.putAll(conf.filter(e => nativeKeys.contains(e._1)).asJava) val keyWithDefault = ImmutableList.of( - (CASE_SENSITIVE.key, CASE_SENSITIVE.defaultValueString), - (IGNORE_MISSING_FILES.key, IGNORE_MISSING_FILES.defaultValueString), - (LEGACY_STATISTICAL_AGGREGATE.key, LEGACY_STATISTICAL_AGGREGATE.defaultValueString), + (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString), + (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString), + ( + SQLConf.LEGACY_STATISTICAL_AGGREGATE.key, + SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString), ( COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key, COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString), @@ -515,9 +510,9 @@ object GlutenConfig { GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key, GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString), (SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString), - (MAP_KEY_DEDUP_POLICY.key, MAP_KEY_DEDUP_POLICY.defaultValueString), - (SESSION_LOCAL_TIMEZONE.key, SESSION_LOCAL_TIMEZONE.defaultValueString), - (ANSI_ENABLED.key, ANSI_ENABLED.defaultValueString) + (SQLConf.MAP_KEY_DEDUP_POLICY.key, SQLConf.MAP_KEY_DEDUP_POLICY.defaultValueString), + (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString), + (SQLConf.ANSI_ENABLED.key, SQLConf.ANSI_ENABLED.defaultValueString) ) keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2))) GlutenConfigUtil.mapByteConfValue( @@ -533,11 +528,11 @@ object GlutenConfig { v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString)) conf - .get(LEGACY_TIME_PARSER_POLICY.key) + .get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) .foreach( v => nativeConfMap - .put(LEGACY_TIME_PARSER_POLICY.key, v.toUpperCase(Locale.ROOT))) + .put(SQLConf.LEGACY_TIME_PARSER_POLICY.key, v.toUpperCase(Locale.ROOT))) // Backend's dynamic session conf only. val confPrefix = prefixOf(backendName) @@ -598,7 +593,7 @@ object GlutenConfig { ("spark.gluten.velox.awsSdkLogLevel", "FATAL"), ("spark.gluten.velox.s3UseProxyFromEnv", "false"), ("spark.gluten.velox.s3PayloadSigningPolicy", "Never"), - (SESSION_LOCAL_TIMEZONE.key, SESSION_LOCAL_TIMEZONE.defaultValueString) + (SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString) ) keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2))) @@ -611,10 +606,10 @@ object GlutenConfig { GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY, - DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, + SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key, SPARK_REDACTION_REGEX, - LEGACY_TIME_PARSER_POLICY.key, - LEGACY_STATISTICAL_AGGREGATE.key, + SQLConf.LEGACY_TIME_PARSER_POLICY.key, + SQLConf.LEGACY_STATISTICAL_AGGREGATE.key, COLUMNAR_CUDF_ENABLED.key ) nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava) diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/config/AllGlutenConfiguration.scala b/gluten-substrait/src/test/scala/org/apache/gluten/config/AllGlutenConfiguration.scala index 776b4b559d2d..b7e98c85d924 100644 --- a/gluten-substrait/src/test/scala/org/apache/gluten/config/AllGlutenConfiguration.scala +++ b/gluten-substrait/src/test/scala/org/apache/gluten/config/AllGlutenConfiguration.scala @@ -49,10 +49,7 @@ class AllGlutenConfiguration extends AnyFunSuite { AllGlutenConfiguration.getCodeSourceLocation(this.getClass).split("gluten-substrait")(0) private val markdown = Paths.get(glutenHome, "docs", "Configuration.md").toAbsolutePath - private def loadConfigs = Array(GlutenConfig, GlutenCoreConfig) - test("Check gluten configs") { - loadConfigs val builder = MarkdownBuilder(getClass.getName) builder ++= @@ -110,7 +107,9 @@ class AllGlutenConfiguration extends AnyFunSuite { | --- | --- | --- |""" - ConfigEntry.getAllEntries + val allEntries = GlutenConfig.allEntries ++ GlutenCoreConfig.allEntries + + allEntries .filter(_.isPublic) .filter(!_.isExperimental) .sortBy(_.key) @@ -129,7 +128,7 @@ class AllGlutenConfiguration extends AnyFunSuite { | --- | --- | --- |""" - ConfigEntry.getAllEntries + allEntries .filter(_.isPublic) .filter(_.isExperimental) .sortBy(_.key)