Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.{ConfigRegistry, GlutenConfig}

import org.apache.spark.SparkConf
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf

object CHConfig {
object CHConfig extends ConfigRegistry {
private[clickhouse] val BACKEND_NAME: String = "ch"
private[clickhouse] val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME)
private val RUNTIME_SETTINGS: String = s"$CONF_PREFIX.runtime_settings"
Expand Down Expand Up @@ -59,8 +59,6 @@ object CHConfig {

def get: CHConfig = new CHConfig(SQLConf.get)

import GlutenConfig._

val ENABLE_ONEPIPELINE_MERGETREE_WRITE =
buildConf(prefixOf("mergetree.write.pipeline"))
.doc("Using one pipeline to write data to MergeTree table in Spark 3.5")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.gluten.config

import org.apache.gluten.config.GlutenConfig.{buildConf, buildStaticConf, COLUMNAR_MAX_BATCH_SIZE}

import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf

Expand Down Expand Up @@ -46,7 +44,7 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
}

def veloxResizeBatchesShuffleInputOutputRange: ResizeRange = {
val standardSize = getConf(COLUMNAR_MAX_BATCH_SIZE)
val standardSize = getConf(GlutenConfig.COLUMNAR_MAX_BATCH_SIZE)
val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
val minSize = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE)
.getOrElse(defaultMinSize)
Expand Down Expand Up @@ -84,9 +82,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def cudfEnableTableScan: Boolean = getConf(CUDF_ENABLE_TABLE_SCAN)
}

object VeloxConfig {

def get: VeloxConfig = {
object VeloxConfig extends ConfigRegistry {
override def get: VeloxConfig = {
new VeloxConfig(SQLConf.get)
}

Expand Down Expand Up @@ -269,15 +266,19 @@ object VeloxConfig {

val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput")
.doc(s"If true, combine small columnar batches together before sending to shuffle. " +
s"The default minimum output batch size is equal to 0.25 * ${COLUMNAR_MAX_BATCH_SIZE.key}")
.doc(
s"If true, combine small columnar batches together before sending to shuffle. " +
s"The default minimum output batch size is equal to 0.25 * " +
s"${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key}")
.booleanConf
.createWithDefault(true)

val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput")
.doc(s"If true, combine small columnar batches together right after shuffle read. " +
s"The default minimum output batch size is equal to 0.25 * ${COLUMNAR_MAX_BATCH_SIZE.key}")
.doc(
s"If true, combine small columnar batches together right after shuffle read. " +
s"The default minimum output batch size is equal to 0.25 * " +
s"${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key}")
.booleanConf
.createWithDefault(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ class AllVeloxConfiguration extends AnyFunSuite {
AllGlutenConfiguration.getCodeSourceLocation(this.getClass).split("backends-velox")(0)
private val markdown = Paths.get(glutenHome, "docs", "velox-configuration.md").toAbsolutePath

private def loadConfigs = Array(VeloxConfig)

test("Check velox backend configs") {
loadConfigs
val builder = MarkdownBuilder(getClass.getName)

builder ++=
Expand All @@ -49,8 +46,7 @@ class AllVeloxConfiguration extends AnyFunSuite {
| --- | --- | ---
|"""

ConfigEntry.getAllEntries
.filter(_.key.contains("velox"))
VeloxConfig.allEntries
.filter(_.isPublic)
.filter(!_.isExperimental)
.sortBy(_.key)
Expand All @@ -69,8 +65,7 @@ class AllVeloxConfiguration extends AnyFunSuite {
| --- | --- | ---
|"""

ConfigEntry.getAllEntries
.filter(_.key.contains("velox"))
VeloxConfig.allEntries
.filter(_.isPublic)
.filter(_.isExperimental)
.sortBy(_.key)
Expand Down
4 changes: 3 additions & 1 deletion docs/velox-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. |
| spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. |
| spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing |
| spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. |
| spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. |
| spark.gluten.velox.castFromVarcharAddTrimNode | false | If true, will add a trim node which has the same semantic as vanilla Spark to CAST-from-varchar. Otherwise, do nothing. |
| spark.gluten.velox.fs.s3a.connect.timeout | 200s | Timeout for AWS s3 connection. |

## Gluten Velox backend *experimental* configurations

| Key | Default | Description |
|----------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------------------------|
| spark.gluten.velox.abandonbuild.noduphashminpct | 0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. |
| spark.gluten.velox.abandonbuild.noduphashminpct | 0.0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. |
| spark.gluten.velox.abandonbuild.noduphashminrows | 100000 | Experimental: abandon hashmap build if duplicated rows more than this number. |
| spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. |

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import org.apache.gluten.config.BackendType.BackendType

import org.apache.spark.sql.internal.GlutenConfigProvider

import scala.collection.JavaConverters._

/**
* An entry contains all meta information for a configuration.
*
Expand Down Expand Up @@ -82,8 +80,6 @@ trait ConfigEntry[T] {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
s"public=$isPublic, version=$version)"
}

ConfigEntry.registerEntry(this)
}

private[gluten] class OptionalConfigEntry[T](
Expand Down Expand Up @@ -240,24 +236,5 @@ private[gluten] class ConfigEntryFallback[T](
}

object ConfigEntry {

val UNDEFINED = "<undefined>"

private val knownConfigs =
new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

private def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
require(existing == null, s"Config entry ${entry.key} already registered!")
}

def containsEntry(entry: ConfigEntry[_]): Boolean = {
Option(knownConfigs.get(entry.key)).isDefined
}

def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)

def getAllEntries: Seq[ConfigEntry[_]] = {
knownConfigs.values().asScala.toSeq
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.config

import org.apache.spark.sql.internal.SQLConf

import scala.collection.JavaConverters._

/**
 * Mixin for backend config objects (e.g. `GlutenConfig`, `VeloxConfig`). Every entry built
 * via [[buildConf]] / [[buildStaticConf]] is recorded both in this registry and in the
 * process-wide registry held by the companion object.
 */
trait ConfigRegistry {
  // Entries owned by this registry, keyed by config key. Concurrent because config
  // objects may be touched from multiple threads during initialization.
  private val entries =
    new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]().asScala

  // Record a freshly built entry; a duplicate key is a programming error.
  private def register(entry: ConfigEntry[_]): Unit = {
    val previous = entries.putIfAbsent(entry.key, entry)
    require(previous.isEmpty, s"Config entry ${entry.key} already registered!")
  }

  /** Visible for testing. */
  private[config] def allEntries: Seq[ConfigEntry[_]] = entries.values.toSeq

  // Builder for a regular (session) config entry scoped to this registry.
  protected def buildConf(key: String): ConfigBuilder =
    ConfigBuilder(key).onCreate { entry =>
      register(entry)
      ConfigRegistry.registerToAllEntries(entry)
    }

  // Builder for a static config entry: additionally marks the key static in SQLConf
  // so Spark rejects runtime changes to it.
  protected def buildStaticConf(key: String): ConfigBuilder =
    ConfigBuilder(key).onCreate { entry =>
      SQLConf.registerStaticConfigKey(key)
      register(entry)
      ConfigRegistry.registerToAllEntries(entry)
    }

  // Concrete registries return their typed config view backed by the active SQLConf.
  def get: GlutenCoreConfig
}

/**
 * Process-wide aggregation of config entries registered through any [[ConfigRegistry]]
 * instance, used for global lookup and registration checks.
 */
object ConfigRegistry {
  // Global key -> entry map shared by all registries.
  private val allConfigEntries =
    new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]().asScala

  // Called from the companion trait's builders; each key may be registered once globally.
  private def registerToAllEntries(entry: ConfigEntry[_]): Unit = {
    val prior = allConfigEntries.putIfAbsent(entry.key, entry)
    require(prior.isEmpty, s"Config entry ${entry.key} already registered!")
  }

  /** True when an entry with the same key has been registered by any registry. */
  def containsEntry(entry: ConfigEntry[_]): Boolean = findEntry(entry.key).isDefined

  /** Looks up the globally registered entry for `key`, if one exists. */
  def findEntry(key: String): Option[ConfigEntry[_]] = allConfigEntries.get(key)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class GlutenCoreConfig(conf: SQLConf) extends Logging {
private lazy val configProvider = new SQLConfProvider(conf)

def getConf[T](entry: ConfigEntry[T]): T = {
require(ConfigEntry.containsEntry(entry), s"$entry is not registered")
require(ConfigRegistry.containsEntry(entry), s"$entry is not registered")
entry.readFrom(configProvider)
}

Expand Down Expand Up @@ -59,17 +59,11 @@ class GlutenCoreConfig(conf: SQLConf) extends Logging {
}

/*
* Note: Gluten configiguration.md is automatically generated from this code.
* Note: Gluten configuration.md is automatically generated from this code.
* Make sure to run dev/gen_all_config_docs.sh after making changes to this file.
*/
object GlutenCoreConfig {
def buildConf(key: String): ConfigBuilder = ConfigBuilder(key)

def buildStaticConf(key: String): ConfigBuilder = {
ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key))
}

def get: GlutenCoreConfig = {
object GlutenCoreConfig extends ConfigRegistry {
override def get: GlutenCoreConfig = {
new GlutenCoreConfig(SQLConf.get)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ object GlutenConfigUtil {
configProvider: GlutenConfigProvider,
key: String,
value: String): String = {
Option(ConfigEntry.findEntry(key))
ConfigRegistry
.findEntry(key)
.map {
_.readFrom(configProvider) match {
case o: Option[_] => o.map(_.toString).getOrElse(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,7 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB"))
}

object GlutenConfig {
import SQLConf._

def buildConf(key: String): ConfigBuilder = ConfigBuilder(key)

def buildStaticConf(key: String): ConfigBuilder = {
ConfigBuilder(key).onCreate(_ => SQLConf.registerStaticConfigKey(key))
}
object GlutenConfig extends ConfigRegistry {

// Hive configurations.
val SPARK_SQL_PARQUET_COMPRESSION_CODEC: String = "spark.sql.parquet.compression.codec"
Expand Down Expand Up @@ -505,19 +498,21 @@ object GlutenConfig {
nativeConfMap.putAll(conf.filter(e => nativeKeys.contains(e._1)).asJava)

val keyWithDefault = ImmutableList.of(
(CASE_SENSITIVE.key, CASE_SENSITIVE.defaultValueString),
(IGNORE_MISSING_FILES.key, IGNORE_MISSING_FILES.defaultValueString),
(LEGACY_STATISTICAL_AGGREGATE.key, LEGACY_STATISTICAL_AGGREGATE.defaultValueString),
(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
(SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString),
(
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
SQLConf.LEGACY_STATISTICAL_AGGREGATE.defaultValueString),
(
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key,
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
(
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
(SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString),
(MAP_KEY_DEDUP_POLICY.key, MAP_KEY_DEDUP_POLICY.defaultValueString),
(SESSION_LOCAL_TIMEZONE.key, SESSION_LOCAL_TIMEZONE.defaultValueString),
(ANSI_ENABLED.key, ANSI_ENABLED.defaultValueString)
(SQLConf.MAP_KEY_DEDUP_POLICY.key, SQLConf.MAP_KEY_DEDUP_POLICY.defaultValueString),
(SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString),
(SQLConf.ANSI_ENABLED.key, SQLConf.ANSI_ENABLED.defaultValueString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
GlutenConfigUtil.mapByteConfValue(
Expand All @@ -533,11 +528,11 @@ object GlutenConfig {
v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))

conf
.get(LEGACY_TIME_PARSER_POLICY.key)
.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key)
.foreach(
v =>
nativeConfMap
.put(LEGACY_TIME_PARSER_POLICY.key, v.toUpperCase(Locale.ROOT)))
.put(SQLConf.LEGACY_TIME_PARSER_POLICY.key, v.toUpperCase(Locale.ROOT)))

// Backend's dynamic session conf only.
val confPrefix = prefixOf(backendName)
Expand Down Expand Up @@ -598,7 +593,7 @@ object GlutenConfig {
("spark.gluten.velox.awsSdkLogLevel", "FATAL"),
("spark.gluten.velox.s3UseProxyFromEnv", "false"),
("spark.gluten.velox.s3PayloadSigningPolicy", "Never"),
(SESSION_LOCAL_TIMEZONE.key, SESSION_LOCAL_TIMEZONE.defaultValueString)
(SQLConf.SESSION_LOCAL_TIMEZONE.key, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))

Expand All @@ -611,10 +606,10 @@ object GlutenConfig {
GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key,
GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY,
DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SQLConf.DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SPARK_REDACTION_REGEX,
LEGACY_TIME_PARSER_POLICY.key,
LEGACY_STATISTICAL_AGGREGATE.key,
SQLConf.LEGACY_TIME_PARSER_POLICY.key,
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
COLUMNAR_CUDF_ENABLED.key
)
nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ class AllGlutenConfiguration extends AnyFunSuite {
AllGlutenConfiguration.getCodeSourceLocation(this.getClass).split("gluten-substrait")(0)
private val markdown = Paths.get(glutenHome, "docs", "Configuration.md").toAbsolutePath

private def loadConfigs = Array(GlutenConfig, GlutenCoreConfig)

test("Check gluten configs") {
loadConfigs
val builder = MarkdownBuilder(getClass.getName)

builder ++=
Expand Down Expand Up @@ -110,7 +107,9 @@ class AllGlutenConfiguration extends AnyFunSuite {
| --- | --- | ---
|"""

ConfigEntry.getAllEntries
val allEntries = GlutenConfig.allEntries ++ GlutenCoreConfig.allEntries

allEntries
.filter(_.isPublic)
.filter(!_.isExperimental)
.sortBy(_.key)
Expand All @@ -129,7 +128,7 @@ class AllGlutenConfiguration extends AnyFunSuite {
| --- | --- | ---
|"""

ConfigEntry.getAllEntries
allEntries
.filter(_.isPublic)
.filter(_.isExperimental)
.sortBy(_.key)
Expand Down