From 12b8fb29ed0af2934b65334219a5cfc2918cd046 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 20 Nov 2025 11:29:02 -0800 Subject: [PATCH 1/3] support_ansi_avg_wip --- native/core/src/execution/planner.rs | 8 +- native/proto/src/proto/expr.proto | 2 +- native/spark-expr/src/agg_funcs/avg_int.rs | 143 ++++++++++++++++++ native/spark-expr/src/agg_funcs/mod.rs | 2 + .../org/apache/comet/serde/aggregates.scala | 16 +- 5 files changed, 156 insertions(+), 15 deletions(-) create mode 100644 native/spark-expr/src/agg_funcs/avg_int.rs diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index a33df705b3..52b92f3c65 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -116,7 +116,7 @@ use datafusion_comet_spark_expr::{ ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct, GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RLike, RandExpr, RandnExpr, SparkCastOptions, Stddev, SubstringExpr, SumDecimal, TimestampTruncExpr, - ToJson, UnboundColumn, Variance, + ToJson, UnboundColumn, Variance, AvgInt }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -1854,6 +1854,12 @@ impl PhysicalPlanner { let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let input_datatype = to_arrow_datatype(expr.sum_datatype.as_ref().unwrap()); let builder = match datatype { + + DataType::Int8 | DataType::UInt8 | DataType::Int16 | DataType::UInt16 | DataType::Int32 => { + let func = + AggregateUDF::new_from_impl(AvgInt::new(datatype, input_datatype)); + AggregateExprBuilder::new(Arc::new(func), vec![child]) + } DataType::Decimal128(_, _) => { let func = AggregateUDF::new_from_impl(AvgDecimal::new(datatype, input_datatype)); diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index c9037dcd69..7ec4a9aebe 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -137,7 +137,7 @@ message Avg { Expr child = 1; DataType datatype = 2; DataType sum_datatype = 3; - bool fail_on_error = 4; // currently unused (useful for deciding Ansi vs Legacy mode) + EvalMode eval_mode = 4; } message First { diff --git a/native/spark-expr/src/agg_funcs/avg_int.rs b/native/spark-expr/src/agg_funcs/avg_int.rs new file mode 100644 index 0000000000..c48e84ff78 --- /dev/null +++ b/native/spark-expr/src/agg_funcs/avg_int.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
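+
+// WIP note: the accumulators below are still `todo!()`. As a sketch of the
+// semantics this file is working toward (assumptions only, not wired in yet):
+// Spark's `avg` over integral inputs sums into a 64-bit long and divides by
+// the row count as a double, and in ANSI mode an overflowing sum raises an
+// error instead of wrapping. A hypothetical per-value update could look like:
+//
+//     fn ansi_update(sum: i64, count: u64, v: i32) -> DFResult<(i64, u64)> {
+//         let sum = sum.checked_add(v as i64).ok_or_else(|| {
+//             DataFusionError::Execution("long overflow in avg".to_string())
+//         })?;
+//         Ok((sum, count + 1))
+//     }
+//
+// with the final value emitted as `sum as f64 / count as f64` for non-empty
+// groups and NULL otherwise.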
+
+use std::any::Any;
+use arrow::array::{ArrayRef, BooleanArray};
+use arrow::datatypes::{DataType, FieldRef};
+use datafusion::common::{DataFusionError, Result as DFResult, ScalarValue};
+use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature};
+use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion::logical_expr::type_coercion::aggregates::avg_return_type;
+use datafusion::logical_expr::Volatility::Immutable;
+use crate::{AvgDecimal, EvalMode};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct AvgInt {
+    signature: Signature,
+    eval_mode: EvalMode,
+}
+
+impl AvgInt {
+    pub fn try_new(data_type: DataType, eval_mode: EvalMode) -> DFResult<Self> {
+        match data_type {
+            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
+                Ok(Self {
+                    signature: Signature::user_defined(Immutable),
+                    eval_mode
+                })
+            },
+            _ => {Err(DataFusionError::Internal("invalid data type for AvgInt".to_string()))}
+        }
+    }
+}
+
+impl AggregateUDFImpl for AvgInt {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "avg"
+    }
+
+    fn reverse_expr(&self) -> ReversedUDAF {
+        ReversedUDAF::Identical
+    }
+
+    fn signature(&self) -> &Signature { 
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result<DataType> {
+        avg_return_type(self.name(), &arg_types[0])
+    }
+
+    fn is_nullable(&self) -> bool {
+        true
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> datafusion::common::Result<Box<dyn Accumulator>> {
+        todo!()
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> datafusion::common::Result<Vec<FieldRef>> {
+        todo!()
+    }
+
+    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+        false
+    }
+
+    fn create_groups_accumulator(&self, _args: AccumulatorArgs) -> datafusion::common::Result<Box<dyn GroupsAccumulator>> {
+        Ok(Box::new(AvgIntGroupsAccumulator::new(self.eval_mode)))
+    }
+
+    fn default_value(&self, data_type: &DataType) -> datafusion::common::Result<ScalarValue> {
+        todo!()
+    }
+}
+
+struct AvgIntegerAccumulator{
+    sum: Option<i64>,
+    count: u64,
+    eval_mode: EvalMode,
+}
+
+impl AvgIntegerAccumulator {
+    fn new(eval_mode: EvalMode) -> Self {
+        Self{
+            sum : Some(0),
+            count: 0,
+            eval_mode
+        }
+    }
+}
+
+impl Accumulator for AvgIntegerAccumulator {
+
+}
+
+struct AvgIntGroupsAccumulator {
+
+}
+
+impl AvgIntGroupsAccumulator {
+
+}
+
+
+impl GroupsAccumulator for AvgIntGroupsAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> datafusion::common::Result<()> {
+        todo!()
+    }
+
+    fn evaluate(&mut self, emit_to: EmitTo) -> datafusion::common::Result<ArrayRef> {
+        todo!()
+    }
+
+    fn state(&mut self, emit_to: EmitTo) -> datafusion::common::Result<Vec<ArrayRef>> {
+        todo!()
+    }
+
+    fn merge_batch(&mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> datafusion::common::Result<()> {
+        todo!()
+    }
+
+    fn size(&self) -> usize {
+        todo!()
+    }
+}
\ No newline at end of file
diff --git a/native/spark-expr/src/agg_funcs/mod.rs b/native/spark-expr/src/agg_funcs/mod.rs
index 252da78890..19398cd1e2 100644
--- a/native/spark-expr/src/agg_funcs/mod.rs
+++ b/native/spark-expr/src/agg_funcs/mod.rs
@@ -22,6 +22,7 @@ mod covariance;
 mod stddev;
 mod sum_decimal;
 mod variance;
+mod avg_int;
 
 pub use avg::Avg;
 pub use avg_decimal::AvgDecimal;
@@ -30,3 +31,4 @@ pub use covariance::Covariance;
 pub use stddev::Stddev;
 pub use sum_decimal::SumDecimal;
 pub use variance::Variance;
+pub use
avg_int::AvgInt; diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index d00bbf4dfa..c1760a8b67 100644 --- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.types.{ByteType, DataTypes, DecimalType, IntegerType import org.apache.comet.CometConf import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} +import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProto, serializeDataType} +import org.apache.comet.shims.CometEvalModeUtil object CometMin extends CometAggregateExpressionSerde[Min] { @@ -150,17 +151,6 @@ object CometCount extends CometAggregateExpressionSerde[Count] { object CometAverage extends CometAggregateExpressionSerde[Average] { - override def getSupportLevel(avg: Average): SupportLevel = { - avg.evalMode match { - case EvalMode.ANSI => - Incompatible(Some("ANSI mode is not supported")) - case EvalMode.TRY => - Incompatible(Some("TRY mode is not supported")) - case _ => - Compatible() - } - } - override def convert( aggExpr: AggregateExpression, avg: Average, @@ -192,7 +182,7 @@ object CometAverage extends CometAggregateExpressionSerde[Average] { val builder = ExprOuterClass.Avg.newBuilder() builder.setChild(childExpr.get) builder.setDatatype(dataType.get) - builder.setFailOnError(avg.evalMode == EvalMode.ANSI) + builder.setEvalMode(evalModeToProto(CometEvalModeUtil.fromSparkEvalMode(avg.evalMode))) builder.setSumDatatype(sumDataType.get) Some( From 6b3ba806f1cffdf68b85818b9cbda0ffe334c8e2 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 20 Nov 2025 11:29:27 -0800 Subject: [PATCH 2/3] support_ansi_avg_wip --- native/core/src/execution/planner.rs | 15 +++-- native/spark-expr/src/agg_funcs/avg_int.rs | 71 +++++++++++++--------- native/spark-expr/src/agg_funcs/mod.rs | 4 +- 3 files changed, 53 insertions(+), 37 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 52b92f3c65..6a5e61b8d7 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -113,10 +113,10 @@ use datafusion_comet_proto::{ }; use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId; use datafusion_comet_spark_expr::{ - ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct, - GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RLike, - RandExpr, RandnExpr, SparkCastOptions, Stddev, SubstringExpr, SumDecimal, TimestampTruncExpr, - ToJson, UnboundColumn, Variance, AvgInt + ArrayInsert, Avg, AvgDecimal, AvgInt, Cast, CheckOverflow, Correlation, Covariance, + CreateNamedStruct, GetArrayStructFields, GetStructField, IfExpr, ListExtract, + NormalizeNaNAndZero, RLike, RandExpr, RandnExpr, SparkCastOptions, Stddev, SubstringExpr, + SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -1854,8 +1854,11 @@ impl PhysicalPlanner { let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let input_datatype = to_arrow_datatype(expr.sum_datatype.as_ref().unwrap()); let builder = match datatype { - - DataType::Int8 | DataType::UInt8 | DataType::Int16 | DataType::UInt16 | DataType::Int32 
=> {
+            DataType::Int8
+            | DataType::UInt8
+            | DataType::Int16
+            | DataType::UInt16
+            | DataType::Int32 => {
                 let func =
                     AggregateUDF::new_from_impl(AvgInt::new(datatype, input_datatype));
                 AggregateExprBuilder::new(Arc::new(func), vec![child])
diff --git a/native/spark-expr/src/agg_funcs/avg_int.rs b/native/spark-expr/src/agg_funcs/avg_int.rs
index c48e84ff78..103c2ac19e 100644
--- a/native/spark-expr/src/agg_funcs/avg_int.rs
+++ b/native/spark-expr/src/agg_funcs/avg_int.rs
@@ -15,15 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::any::Any;
+use crate::{AvgDecimal, EvalMode};
 use arrow::array::{ArrayRef, BooleanArray};
 use arrow::datatypes::{DataType, FieldRef};
 use datafusion::common::{DataFusionError, Result as DFResult, ScalarValue};
-use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature};
 use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion::logical_expr::type_coercion::aggregates::avg_return_type;
 use datafusion::logical_expr::Volatility::Immutable;
-use crate::{AvgDecimal, EvalMode};
+use datafusion::logical_expr::{
+    Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
+};
+use std::any::Any;
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct AvgInt {
@@ -34,13 +36,13 @@ pub struct AvgInt {
 impl AvgInt {
     pub fn try_new(data_type: DataType, eval_mode: EvalMode) -> DFResult<Self> {
         match data_type {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
-                Ok(Self {
+            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Ok(Self {
                 signature: Signature::user_defined(Immutable),
-                    eval_mode
-                })
-            },
-            _ => {Err(DataFusionError::Internal("invalid data type for AvgInt".to_string()))}
+                eval_mode,
+            }),
+            _ => Err(DataFusionError::Internal(
+                "invalid data type for AvgInt".to_string(),
+            )),
         }
     }
 }
@@ -58,7 +60,7 @@ impl AggregateUDFImpl for AvgInt {
         ReversedUDAF::Identical
     }
 
-    fn signature(&self) -> &Signature { 
+    fn signature(&self) -> &Signature {
         &self.signature
     }
 
@@ -70,7 +72,10 @@
         true
     }
 
-    fn accumulator(&self, acc_args: AccumulatorArgs) -> datafusion::common::Result<Box<dyn Accumulator>> {
+    fn accumulator(
+        &self,
+        acc_args: AccumulatorArgs,
+    ) -> datafusion::common::Result<Box<dyn Accumulator>> {
         todo!()
     }
 
@@ -82,7 +87,10 @@
         false
     }
 
-    fn create_groups_accumulator(&self, _args: AccumulatorArgs) -> datafusion::common::Result<Box<dyn GroupsAccumulator>> {
+    fn create_groups_accumulator(
+        &self,
+        _args: AccumulatorArgs,
+    ) -> datafusion::common::Result<Box<dyn GroupsAccumulator>> {
         Ok(Box::new(AvgIntGroupsAccumulator::new(self.eval_mode)))
     }
 
@@ -91,7 +99,7 @@
     }
 }
 
-struct AvgIntegerAccumulator{
+struct AvgIntegerAccumulator {
     sum: Option<i64>,
     count: u64,
     eval_mode: EvalMode,
@@ -99,29 +107,28 @@ impl AvgIntegerAccumulator {
     fn new(eval_mode: EvalMode) -> Self {
-        Self{
-            sum : Some(0),
+        Self {
+            sum: Some(0),
             count: 0,
-            eval_mode
+            eval_mode,
         }
     }
 }
 
-impl Accumulator for AvgIntegerAccumulator {
-
-}
+impl Accumulator for AvgIntegerAccumulator {}
 
-struct AvgIntGroupsAccumulator {
-
-}
-
-impl AvgIntGroupsAccumulator {
-
-}
+struct AvgIntGroupsAccumulator {}
+impl AvgIntGroupsAccumulator {}
 
 impl GroupsAccumulator for AvgIntGroupsAccumulator {
-    fn update_batch(&mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> datafusion::common::Result<()> {
+    fn update_batch(
+        &mut
self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> datafusion::common::Result<()> { todo!() } @@ -133,11 +140,17 @@ impl GroupsAccumulator for AvgIntGroupsAccumulator { todo!() } - fn merge_batch(&mut self, values: &[ArrayRef], group_indices: &[usize], opt_filter: Option<&BooleanArray>, total_num_groups: usize) -> datafusion::common::Result<()> { + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> datafusion::common::Result<()> { todo!() } fn size(&self) -> usize { todo!() } -} \ No newline at end of file +} diff --git a/native/spark-expr/src/agg_funcs/mod.rs b/native/spark-expr/src/agg_funcs/mod.rs index 19398cd1e2..8025fc7a08 100644 --- a/native/spark-expr/src/agg_funcs/mod.rs +++ b/native/spark-expr/src/agg_funcs/mod.rs @@ -17,18 +17,18 @@ mod avg; mod avg_decimal; +mod avg_int; mod correlation; mod covariance; mod stddev; mod sum_decimal; mod variance; -mod avg_int; pub use avg::Avg; pub use avg_decimal::AvgDecimal; +pub use avg_int::AvgInt; pub use correlation::Correlation; pub use covariance::Covariance; pub use stddev::Stddev; pub use sum_decimal::SumDecimal; pub use variance::Variance; -pub use avg_int::AvgInt; From bd839783b6c781176086c42063ec2ff8bb0374d5 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 23 Nov 2025 16:11:46 -0800 Subject: [PATCH 3/3] support_ansi_avg --- .../source/user-guide/latest/compatibility.md | 172 +++--- docs/source/user-guide/latest/configs.md | 526 +++++++++--------- native/core/src/execution/planner.rs | 32 +- native/spark-expr/src/agg_funcs/avg.rs | 52 +- native/spark-expr/src/agg_funcs/avg_int.rs | 156 ------ native/spark-expr/src/agg_funcs/mod.rs | 2 - .../comet/exec/CometAggregateSuite.scala | 36 ++ 7 files changed, 419 insertions(+), 557 deletions(-) delete mode 100644 native/spark-expr/src/agg_funcs/avg_int.rs diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index 17a9515787..cb5366bd73 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -89,84 +89,82 @@ The following cast operations are generally compatible with Spark except for the - -| From Type | To Type | Notes | -| --------- | ------- | --------------------------------------------------------------------------------------------------------------- | -| boolean | byte | | -| boolean | short | | -| boolean | integer | | -| boolean | long | | -| boolean | float | | -| boolean | double | | -| boolean | string | | -| byte | boolean | | -| byte | short | | -| byte | integer | | -| byte | long | | -| byte | float | | -| byte | double | | -| byte | decimal | | -| byte | string | | -| short | boolean | | -| short | byte | | -| short | integer | | -| short | long | | -| short | float | | -| short | double | | -| short | decimal | | -| short | string | | -| integer | boolean | | -| integer | byte | | -| integer | short | | -| integer | long | | -| integer | float | | -| integer | double | | -| integer | decimal | | -| integer | string | | -| long | boolean | | -| long | byte | | -| long | short | | -| long | integer | | -| long | float | | -| long | double | | -| long | decimal | | -| long | string | | -| float | boolean | | -| float | byte | | -| float | short | | -| float | integer | | -| float | long | | -| float | double | | -| float | string | There can be differences in precision. 
For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | -| double | boolean | | -| double | byte | | -| double | short | | -| double | integer | | -| double | long | | -| double | float | | -| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | -| decimal | boolean | | -| decimal | byte | | -| decimal | short | | -| decimal | integer | | -| decimal | long | | -| decimal | float | | -| decimal | double | | -| decimal | decimal | | -| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not | -| string | boolean | | -| string | byte | | -| string | short | | -| string | integer | | -| string | long | | -| string | binary | | -| string | date | Only supports years between 262143 BC and 262142 AD | -| binary | string | | -| date | string | | -| timestamp | long | | -| timestamp | string | | -| timestamp | date | | - +| From Type | To Type | Notes | +|-|-|-| +| boolean | byte | | +| boolean | short | | +| boolean | integer | | +| boolean | long | | +| boolean | float | | +| boolean | double | | +| boolean | string | | +| byte | boolean | | +| byte | short | | +| byte | integer | | +| byte | long | | +| byte | float | | +| byte | double | | +| byte | decimal | | +| byte | string | | +| short | boolean | | +| short | byte | | +| short | integer | | +| short | long | | +| short | float | | +| short | double | | +| short | decimal | | +| short | string | | +| integer | boolean | | +| integer | byte | | +| integer | short | | +| integer | long | | +| integer | float | | +| integer | double | | +| integer | decimal | | +| integer | string | | +| long | boolean | | +| long | byte | | +| long | short | | +| long | integer | | +| long | float | | +| long | double | | +| long | decimal | | +| long | string | | +| float | boolean | | +| float | byte | | +| float | short | | +| float | integer | | +| float | long | | +| float | double | | +| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | +| double | boolean | | +| double | byte | | +| double | short | | +| double | integer | | +| double | long | | +| double | float | | +| double | string | There can be differences in precision. 
For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 | +| decimal | boolean | | +| decimal | byte | | +| decimal | short | | +| decimal | integer | | +| decimal | long | | +| decimal | float | | +| decimal | double | | +| decimal | decimal | | +| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not | +| string | boolean | | +| string | byte | | +| string | short | | +| string | integer | | +| string | long | | +| string | binary | | +| string | date | Only supports years between 262143 BC and 262142 AD | +| binary | string | | +| date | string | | +| timestamp | long | | +| timestamp | string | | +| timestamp | date | | ### Incompatible Casts @@ -176,16 +174,14 @@ The following cast operations are not compatible with Spark for all inputs and a - -| From Type | To Type | Notes | -| --------- | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | -| float | decimal | There can be rounding differences | -| double | decimal | There can be rounding differences | -| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | -| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | -| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits | -| string | timestamp | Not all valid formats are supported | - +| From Type | To Type | Notes | +|-|-|-| +| float | decimal | There can be rounding differences | +| double | decimal | There can be rounding differences | +| string | float | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | +| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. | +| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits | +| string | timestamp | Not all valid formats are supported | ### Unsupported Casts diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 7e3d2a79f7..df6ff0e313 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -25,54 +25,48 @@ Comet provides the following configuration settings. - -| Config | Description | Default Value | -| --------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | -| `spark.comet.scan.allowIncompatible` | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | -| `spark.comet.scan.enabled` | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). 
Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. | true | -| `spark.comet.scan.preFetch.enabled` | Whether to enable pre-fetching feature of CometScan. | false | -| `spark.comet.scan.preFetch.threadNum` | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | -| `spark.hadoop.fs.comet.libhdfs.schemes` | Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses via libhdfs, separated by commas. Valid only when built with hdfs feature enabled. | | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.scan.allowIncompatible` | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | +| `spark.comet.scan.enabled` | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. | true | +| `spark.comet.scan.preFetch.enabled` | Whether to enable pre-fetching feature of CometScan. | false | +| `spark.comet.scan.preFetch.threadNum` | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | +| `spark.hadoop.fs.comet.libhdfs.schemes` | Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses via libhdfs, separated by commas. Valid only when built with hdfs feature enabled. | | ## Parquet Reader Configuration Settings - -| Config | Description | Default Value | -| ------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | -| `spark.comet.parquet.enable.directBuffer` | Whether to use Java direct byte buffer when reading Parquet. | false | -| `spark.comet.parquet.read.io.adjust.readRange.skew` | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false | -| `spark.comet.parquet.read.io.mergeRanges` | When enabled the parallel reader will try to merge ranges of data that are separated by less than `comet.parquet.read.io.mergeRanges.delta` bytes. Longer continuous reads are faster on cloud storage. | true | -| `spark.comet.parquet.read.io.mergeRanges.delta` | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 | -| `spark.comet.parquet.read.parallel.io.enabled` | Whether to enable Comet's parallel reader for Parquet files. 
The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true | -| `spark.comet.parquet.read.parallel.io.thread-pool.size` | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 | -| `spark.comet.parquet.respectFilterPushdown` | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite but the default setting results in poor performance in Comet when using the new native scans, disabled by default | false | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.parquet.enable.directBuffer` | Whether to use Java direct byte buffer when reading Parquet. | false | +| `spark.comet.parquet.read.io.adjust.readRange.skew` | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false | +| `spark.comet.parquet.read.io.mergeRanges` | When enabled the parallel reader will try to merge ranges of data that are separated by less than `comet.parquet.read.io.mergeRanges.delta` bytes. Longer continuous reads are faster on cloud storage. | true | +| `spark.comet.parquet.read.io.mergeRanges.delta` | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 | +| `spark.comet.parquet.read.parallel.io.enabled` | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true | +| `spark.comet.parquet.read.parallel.io.thread-pool.size` | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 | +| `spark.comet.parquet.respectFilterPushdown` | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite but the default setting results in poor performance in Comet when using the new native scans, disabled by default | false | ## Query Execution Settings - -| Config | Description | Default Value | -| --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | -| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false | -| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. 
Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false | -| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true | -| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. It can be overridden by the environment variable `ENABLE_COMET`. | true | -| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false | -| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec..enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true | -| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false | -| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | -| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b | -| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 | -| `spark.comet.nativeLoadRequired` | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | -| `spark.comet.regexp.allowIncompatible` | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.caseConversion.enabled` | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false | +| `spark.comet.debug.enabled` | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. 
For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false | +| `spark.comet.dppFallback.enabled` | Whether to fall back to Spark for queries that use DPP. | true | +| `spark.comet.enabled` | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and `spark.comet.exec.enabled` need to be enabled. It can be overridden by the environment variable `ENABLE_COMET`. | true | +| `spark.comet.exceptionOnDatetimeRebase` | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false | +| `spark.comet.exec.enabled` | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of `spark.comet.exec..enabled` at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. | true | +| `spark.comet.exec.replaceSortMergeJoin` | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | false | +| `spark.comet.exec.strictFloatingPoint` | When enabled, fall back to Spark for floating-point operations that may differ from Spark, such as when comparing or sorting -0.0 and 0.0. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | +| `spark.comet.maxTempDirectorySize` | The maximum amount of data (in bytes) stored inside the temporary directories. | 107374182400b | +| `spark.comet.metrics.updateInterval` | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 | +| `spark.comet.nativeLoadRequired` | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | +| `spark.comet.regexp.allowIncompatible` | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the [Comet Compatibility Guide](https://datafusion.apache.org/comet/user-guide/compatibility.html). 
| false | ## Viewing Explain Plan & Fallback Reasons @@ -81,277 +75,263 @@ These settings can be used to determine which parts of the plan are accelerated - -| Config | Description | Default Value | -| ---------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | -| `spark.comet.explain.format` | Choose extended explain output. The default format of 'verbose' will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. The format 'fallback' provides a list of fallback reasons instead. | verbose | -| `spark.comet.explain.native.enabled` | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false | -| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false | -| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | -| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. It can be overridden by the environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. | false | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.explain.format` | Choose extended explain output. The default format of 'verbose' will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. The format 'fallback' provides a list of fallback reasons instead. | verbose | +| `spark.comet.explain.native.enabled` | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false | +| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false | +| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | +| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. It can be overridden by the environment variable `ENABLE_COMET_LOG_FALLBACK_REASONS`. 
| false | ## Shuffle Configuration Settings - -| Config | Description | Default Value | -| ------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | -| `spark.comet.columnar.shuffle.async.enabled` | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false | -| `spark.comet.columnar.shuffle.async.max.thread.num` | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores \* the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | -| `spark.comet.columnar.shuffle.async.thread.num` | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 | -| `spark.comet.columnar.shuffle.batch.size` | Batch size when writing out sorted spill files on the native side. Note that this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise it will produce larger batches than expected in the native operator after shuffle. | 8192 | -| `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | -| `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 | -| `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true | -| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true | -| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true | -| `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | -| `spark.comet.shuffle.sizeInBytesMultiplier` | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to avoid regressions in join strategy. 
| 1.0 | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.columnar.shuffle.async.enabled` | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false | +| `spark.comet.columnar.shuffle.async.max.thread.num` | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | +| `spark.comet.columnar.shuffle.async.thread.num` | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 | +| `spark.comet.columnar.shuffle.batch.size` | Batch size when writing out sorted spill files on the native side. Note that this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise it will produce larger batches than expected in the native operator after shuffle. | 8192 | +| `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | +| `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 | +| `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true | +| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true | +| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true | +| `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | +| `spark.comet.shuffle.sizeInBytesMultiplier` | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to avoid regressions in join strategy. 
| 1.0 | ## Memory & Tuning Configuration Settings - -| Config | Description | Default Value | -| -------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | -| `spark.comet.batchSize` | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 | -| `spark.comet.exec.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | fair_unified | -| `spark.comet.exec.memoryPool.fraction` | Fraction of off-heap memory pool that is available to Comet. Only applies to off-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 | -| `spark.comet.tracing.enabled` | Enable fine-grained tracing of events and memory usage. For more information, refer to the [Comet Tracing Guide](https://datafusion.apache.org/comet/user-guide/tracing.html). | false | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.batchSize` | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 | +| `spark.comet.exec.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | fair_unified | +| `spark.comet.exec.memoryPool.fraction` | Fraction of off-heap memory pool that is available to Comet. Only applies to off-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 | +| `spark.comet.tracing.enabled` | Enable fine-grained tracing of events and memory usage. For more information, refer to the [Comet Tracing Guide](https://datafusion.apache.org/comet/user-guide/tracing.html). | false | ## Development & Testing Settings - -| Config | Description | Default Value | -| --------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------- | -| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 | -| `spark.comet.convert.csv.enabled` | When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false | -| `spark.comet.convert.json.enabled` | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. 
This is an experimental feature and has known issues with non-UTC timezones. | false | -| `spark.comet.convert.parquet.enabled` | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false | -| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false | -| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared | -| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB | -| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false | -| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan | -| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 | +| `spark.comet.convert.csv.enabled` | When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false | +| `spark.comet.convert.json.enabled` | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false | +| `spark.comet.convert.parquet.enabled` | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. This is an experimental feature and has known issues with non-UTC timezones. | false | +| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false | +| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. 
| greedy_task_shared | +| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB | +| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false | +| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan | +| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false | ## Enabling or Disabling Individual Operators - -| Config | Description | Default Value | -| ------------------------------------------------------ | ---------------------------------------------------- | ------------- | -| `spark.comet.exec.aggregate.enabled` | Whether to enable aggregate by default. | true | -| `spark.comet.exec.broadcastExchange.enabled` | Whether to enable broadcastExchange by default. | true | -| `spark.comet.exec.broadcastHashJoin.enabled` | Whether to enable broadcastHashJoin by default. | true | -| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true | -| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true | -| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true | -| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true | -| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | -| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | -| `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true | -| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false | -| `spark.comet.exec.project.enabled` | Whether to enable project by default. | true | -| `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true | -| `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true | -| `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` | Experimental support for Sort Merge Join with filter | false | -| `spark.comet.exec.takeOrderedAndProject.enabled` | Whether to enable takeOrderedAndProject by default. | true | -| `spark.comet.exec.union.enabled` | Whether to enable union by default. | true | -| `spark.comet.exec.window.enabled` | Whether to enable window by default. | true | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.exec.aggregate.enabled` | Whether to enable aggregate by default. | true | +| `spark.comet.exec.broadcastExchange.enabled` | Whether to enable broadcastExchange by default. | true | +| `spark.comet.exec.broadcastHashJoin.enabled` | Whether to enable broadcastHashJoin by default. | true | +| `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true | +| `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. 
| true | +| `spark.comet.exec.expand.enabled` | Whether to enable expand by default. | true | +| `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true | +| `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | +| `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | +| `spark.comet.exec.localLimit.enabled` | Whether to enable localLimit by default. | true | +| `spark.comet.exec.localTableScan.enabled` | Whether to enable localTableScan by default. | false | +| `spark.comet.exec.project.enabled` | Whether to enable project by default. | true | +| `spark.comet.exec.sort.enabled` | Whether to enable sort by default. | true | +| `spark.comet.exec.sortMergeJoin.enabled` | Whether to enable sortMergeJoin by default. | true | +| `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` | Experimental support for Sort Merge Join with filter | false | +| `spark.comet.exec.takeOrderedAndProject.enabled` | Whether to enable takeOrderedAndProject by default. | true | +| `spark.comet.exec.union.enabled` | Whether to enable union by default. | true | +| `spark.comet.exec.window.enabled` | Whether to enable window by default. | true | ## Enabling or Disabling Individual Scalar Expressions - -| Config | Description | Default Value | -| ---------------------------------------------------------- | --------------------------------------------------------- | ------------- | -| `spark.comet.expression.Abs.enabled` | Enable Comet acceleration for `Abs` | true | -| `spark.comet.expression.Acos.enabled` | Enable Comet acceleration for `Acos` | true | -| `spark.comet.expression.Add.enabled` | Enable Comet acceleration for `Add` | true | -| `spark.comet.expression.Alias.enabled` | Enable Comet acceleration for `Alias` | true | -| `spark.comet.expression.And.enabled` | Enable Comet acceleration for `And` | true | -| `spark.comet.expression.ArrayAppend.enabled` | Enable Comet acceleration for `ArrayAppend` | true | -| `spark.comet.expression.ArrayCompact.enabled` | Enable Comet acceleration for `ArrayCompact` | true | -| `spark.comet.expression.ArrayContains.enabled` | Enable Comet acceleration for `ArrayContains` | true | -| `spark.comet.expression.ArrayDistinct.enabled` | Enable Comet acceleration for `ArrayDistinct` | true | -| `spark.comet.expression.ArrayExcept.enabled` | Enable Comet acceleration for `ArrayExcept` | true | -| `spark.comet.expression.ArrayFilter.enabled` | Enable Comet acceleration for `ArrayFilter` | true | -| `spark.comet.expression.ArrayInsert.enabled` | Enable Comet acceleration for `ArrayInsert` | true | -| `spark.comet.expression.ArrayIntersect.enabled` | Enable Comet acceleration for `ArrayIntersect` | true | -| `spark.comet.expression.ArrayJoin.enabled` | Enable Comet acceleration for `ArrayJoin` | true | -| `spark.comet.expression.ArrayMax.enabled` | Enable Comet acceleration for `ArrayMax` | true | -| `spark.comet.expression.ArrayMin.enabled` | Enable Comet acceleration for `ArrayMin` | true | -| `spark.comet.expression.ArrayRemove.enabled` | Enable Comet acceleration for `ArrayRemove` | true | -| `spark.comet.expression.ArrayRepeat.enabled` | Enable Comet acceleration for `ArrayRepeat` | true | -| `spark.comet.expression.ArrayUnion.enabled` | Enable Comet acceleration for `ArrayUnion` | true | -| `spark.comet.expression.ArraysOverlap.enabled` | Enable Comet acceleration for `ArraysOverlap` | true | -| `spark.comet.expression.Ascii.enabled` | Enable Comet acceleration for `Ascii` | true | -| 
`spark.comet.expression.Asin.enabled` | Enable Comet acceleration for `Asin` | true | -| `spark.comet.expression.Atan.enabled` | Enable Comet acceleration for `Atan` | true | -| `spark.comet.expression.Atan2.enabled` | Enable Comet acceleration for `Atan2` | true | -| `spark.comet.expression.AttributeReference.enabled` | Enable Comet acceleration for `AttributeReference` | true | -| `spark.comet.expression.BitLength.enabled` | Enable Comet acceleration for `BitLength` | true | -| `spark.comet.expression.BitwiseAnd.enabled` | Enable Comet acceleration for `BitwiseAnd` | true | -| `spark.comet.expression.BitwiseCount.enabled` | Enable Comet acceleration for `BitwiseCount` | true | -| `spark.comet.expression.BitwiseGet.enabled` | Enable Comet acceleration for `BitwiseGet` | true | -| `spark.comet.expression.BitwiseNot.enabled` | Enable Comet acceleration for `BitwiseNot` | true | -| `spark.comet.expression.BitwiseOr.enabled` | Enable Comet acceleration for `BitwiseOr` | true | -| `spark.comet.expression.BitwiseXor.enabled` | Enable Comet acceleration for `BitwiseXor` | true | -| `spark.comet.expression.CaseWhen.enabled` | Enable Comet acceleration for `CaseWhen` | true | -| `spark.comet.expression.Cast.enabled` | Enable Comet acceleration for `Cast` | true | -| `spark.comet.expression.Ceil.enabled` | Enable Comet acceleration for `Ceil` | true | -| `spark.comet.expression.CheckOverflow.enabled` | Enable Comet acceleration for `CheckOverflow` | true | -| `spark.comet.expression.Chr.enabled` | Enable Comet acceleration for `Chr` | true | -| `spark.comet.expression.Coalesce.enabled` | Enable Comet acceleration for `Coalesce` | true | -| `spark.comet.expression.Concat.enabled` | Enable Comet acceleration for `Concat` | true | -| `spark.comet.expression.ConcatWs.enabled` | Enable Comet acceleration for `ConcatWs` | true | -| `spark.comet.expression.Contains.enabled` | Enable Comet acceleration for `Contains` | true | -| `spark.comet.expression.Cos.enabled` | Enable Comet acceleration for `Cos` | true | -| `spark.comet.expression.Cot.enabled` | Enable Comet acceleration for `Cot` | true | -| `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true | -| `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true | -| `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true | -| `spark.comet.expression.DateSub.enabled` | Enable Comet acceleration for `DateSub` | true | -| `spark.comet.expression.DayOfMonth.enabled` | Enable Comet acceleration for `DayOfMonth` | true | -| `spark.comet.expression.DayOfWeek.enabled` | Enable Comet acceleration for `DayOfWeek` | true | -| `spark.comet.expression.DayOfYear.enabled` | Enable Comet acceleration for `DayOfYear` | true | -| `spark.comet.expression.Divide.enabled` | Enable Comet acceleration for `Divide` | true | -| `spark.comet.expression.ElementAt.enabled` | Enable Comet acceleration for `ElementAt` | true | -| `spark.comet.expression.EndsWith.enabled` | Enable Comet acceleration for `EndsWith` | true | -| `spark.comet.expression.EqualNullSafe.enabled` | Enable Comet acceleration for `EqualNullSafe` | true | -| `spark.comet.expression.EqualTo.enabled` | Enable Comet acceleration for `EqualTo` | true | -| `spark.comet.expression.Exp.enabled` | Enable Comet acceleration for `Exp` | true | -| `spark.comet.expression.Expm1.enabled` | Enable Comet acceleration for `Expm1` | true | -| `spark.comet.expression.Flatten.enabled` | Enable Comet 
acceleration for `Flatten` | true | -| `spark.comet.expression.Floor.enabled` | Enable Comet acceleration for `Floor` | true | -| `spark.comet.expression.FromUnixTime.enabled` | Enable Comet acceleration for `FromUnixTime` | true | -| `spark.comet.expression.GetArrayItem.enabled` | Enable Comet acceleration for `GetArrayItem` | true | -| `spark.comet.expression.GetArrayStructFields.enabled` | Enable Comet acceleration for `GetArrayStructFields` | true | -| `spark.comet.expression.GetMapValue.enabled` | Enable Comet acceleration for `GetMapValue` | true | -| `spark.comet.expression.GetStructField.enabled` | Enable Comet acceleration for `GetStructField` | true | -| `spark.comet.expression.GreaterThan.enabled` | Enable Comet acceleration for `GreaterThan` | true | -| `spark.comet.expression.GreaterThanOrEqual.enabled` | Enable Comet acceleration for `GreaterThanOrEqual` | true | -| `spark.comet.expression.Hex.enabled` | Enable Comet acceleration for `Hex` | true | -| `spark.comet.expression.Hour.enabled` | Enable Comet acceleration for `Hour` | true | -| `spark.comet.expression.If.enabled` | Enable Comet acceleration for `If` | true | -| `spark.comet.expression.In.enabled` | Enable Comet acceleration for `In` | true | -| `spark.comet.expression.InSet.enabled` | Enable Comet acceleration for `InSet` | true | -| `spark.comet.expression.InitCap.enabled` | Enable Comet acceleration for `InitCap` | true | -| `spark.comet.expression.IntegralDivide.enabled` | Enable Comet acceleration for `IntegralDivide` | true | -| `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true | -| `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true | -| `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true | -| `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true | -| `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true | -| `spark.comet.expression.LessThanOrEqual.enabled` | Enable Comet acceleration for `LessThanOrEqual` | true | -| `spark.comet.expression.Like.enabled` | Enable Comet acceleration for `Like` | true | -| `spark.comet.expression.Literal.enabled` | Enable Comet acceleration for `Literal` | true | -| `spark.comet.expression.Log.enabled` | Enable Comet acceleration for `Log` | true | -| `spark.comet.expression.Log10.enabled` | Enable Comet acceleration for `Log10` | true | -| `spark.comet.expression.Log2.enabled` | Enable Comet acceleration for `Log2` | true | -| `spark.comet.expression.Lower.enabled` | Enable Comet acceleration for `Lower` | true | -| `spark.comet.expression.MapEntries.enabled` | Enable Comet acceleration for `MapEntries` | true | -| `spark.comet.expression.MapFromArrays.enabled` | Enable Comet acceleration for `MapFromArrays` | true | -| `spark.comet.expression.MapKeys.enabled` | Enable Comet acceleration for `MapKeys` | true | -| `spark.comet.expression.MapValues.enabled` | Enable Comet acceleration for `MapValues` | true | -| `spark.comet.expression.Md5.enabled` | Enable Comet acceleration for `Md5` | true | -| `spark.comet.expression.Minute.enabled` | Enable Comet acceleration for `Minute` | true | -| `spark.comet.expression.MonotonicallyIncreasingID.enabled` | Enable Comet acceleration for `MonotonicallyIncreasingID` | true | -| `spark.comet.expression.Month.enabled` | Enable Comet acceleration for `Month` | true | -| `spark.comet.expression.Multiply.enabled` | Enable Comet acceleration for `Multiply` | 
true | -| `spark.comet.expression.Murmur3Hash.enabled` | Enable Comet acceleration for `Murmur3Hash` | true | -| `spark.comet.expression.Not.enabled` | Enable Comet acceleration for `Not` | true | -| `spark.comet.expression.OctetLength.enabled` | Enable Comet acceleration for `OctetLength` | true | -| `spark.comet.expression.Or.enabled` | Enable Comet acceleration for `Or` | true | -| `spark.comet.expression.Pow.enabled` | Enable Comet acceleration for `Pow` | true | -| `spark.comet.expression.Quarter.enabled` | Enable Comet acceleration for `Quarter` | true | -| `spark.comet.expression.RLike.enabled` | Enable Comet acceleration for `RLike` | true | -| `spark.comet.expression.Rand.enabled` | Enable Comet acceleration for `Rand` | true | -| `spark.comet.expression.Randn.enabled` | Enable Comet acceleration for `Randn` | true | -| `spark.comet.expression.RegExpReplace.enabled` | Enable Comet acceleration for `RegExpReplace` | true | -| `spark.comet.expression.Remainder.enabled` | Enable Comet acceleration for `Remainder` | true | -| `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true | -| `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true | -| `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true | -| `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true | -| `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true | -| `spark.comet.expression.ShiftLeft.enabled` | Enable Comet acceleration for `ShiftLeft` | true | -| `spark.comet.expression.ShiftRight.enabled` | Enable Comet acceleration for `ShiftRight` | true | -| `spark.comet.expression.Signum.enabled` | Enable Comet acceleration for `Signum` | true | -| `spark.comet.expression.Sin.enabled` | Enable Comet acceleration for `Sin` | true | -| `spark.comet.expression.SortOrder.enabled` | Enable Comet acceleration for `SortOrder` | true | -| `spark.comet.expression.SparkPartitionID.enabled` | Enable Comet acceleration for `SparkPartitionID` | true | -| `spark.comet.expression.Sqrt.enabled` | Enable Comet acceleration for `Sqrt` | true | -| `spark.comet.expression.StartsWith.enabled` | Enable Comet acceleration for `StartsWith` | true | -| `spark.comet.expression.StaticInvoke.enabled` | Enable Comet acceleration for `StaticInvoke` | true | -| `spark.comet.expression.StringInstr.enabled` | Enable Comet acceleration for `StringInstr` | true | -| `spark.comet.expression.StringLPad.enabled` | Enable Comet acceleration for `StringLPad` | true | -| `spark.comet.expression.StringRPad.enabled` | Enable Comet acceleration for `StringRPad` | true | -| `spark.comet.expression.StringRepeat.enabled` | Enable Comet acceleration for `StringRepeat` | true | -| `spark.comet.expression.StringReplace.enabled` | Enable Comet acceleration for `StringReplace` | true | -| `spark.comet.expression.StringSpace.enabled` | Enable Comet acceleration for `StringSpace` | true | -| `spark.comet.expression.StringTranslate.enabled` | Enable Comet acceleration for `StringTranslate` | true | -| `spark.comet.expression.StringTrim.enabled` | Enable Comet acceleration for `StringTrim` | true | -| `spark.comet.expression.StringTrimBoth.enabled` | Enable Comet acceleration for `StringTrimBoth` | true | -| `spark.comet.expression.StringTrimLeft.enabled` | Enable Comet acceleration for `StringTrimLeft` | true | -| `spark.comet.expression.StringTrimRight.enabled` | Enable Comet acceleration for `StringTrimRight` | true | -| 
`spark.comet.expression.StructsToJson.enabled` | Enable Comet acceleration for `StructsToJson` | true | -| `spark.comet.expression.Substring.enabled` | Enable Comet acceleration for `Substring` | true | -| `spark.comet.expression.Subtract.enabled` | Enable Comet acceleration for `Subtract` | true | -| `spark.comet.expression.Tan.enabled` | Enable Comet acceleration for `Tan` | true | -| `spark.comet.expression.TruncDate.enabled` | Enable Comet acceleration for `TruncDate` | true | -| `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true | -| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true | -| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true | -| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true | -| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true | -| `spark.comet.expression.WeekOfYear.enabled` | Enable Comet acceleration for `WeekOfYear` | true | -| `spark.comet.expression.XxHash64.enabled` | Enable Comet acceleration for `XxHash64` | true | -| `spark.comet.expression.Year.enabled` | Enable Comet acceleration for `Year` | true | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.expression.Abs.enabled` | Enable Comet acceleration for `Abs` | true | +| `spark.comet.expression.Acos.enabled` | Enable Comet acceleration for `Acos` | true | +| `spark.comet.expression.Add.enabled` | Enable Comet acceleration for `Add` | true | +| `spark.comet.expression.Alias.enabled` | Enable Comet acceleration for `Alias` | true | +| `spark.comet.expression.And.enabled` | Enable Comet acceleration for `And` | true | +| `spark.comet.expression.ArrayAppend.enabled` | Enable Comet acceleration for `ArrayAppend` | true | +| `spark.comet.expression.ArrayCompact.enabled` | Enable Comet acceleration for `ArrayCompact` | true | +| `spark.comet.expression.ArrayContains.enabled` | Enable Comet acceleration for `ArrayContains` | true | +| `spark.comet.expression.ArrayDistinct.enabled` | Enable Comet acceleration for `ArrayDistinct` | true | +| `spark.comet.expression.ArrayExcept.enabled` | Enable Comet acceleration for `ArrayExcept` | true | +| `spark.comet.expression.ArrayFilter.enabled` | Enable Comet acceleration for `ArrayFilter` | true | +| `spark.comet.expression.ArrayInsert.enabled` | Enable Comet acceleration for `ArrayInsert` | true | +| `spark.comet.expression.ArrayIntersect.enabled` | Enable Comet acceleration for `ArrayIntersect` | true | +| `spark.comet.expression.ArrayJoin.enabled` | Enable Comet acceleration for `ArrayJoin` | true | +| `spark.comet.expression.ArrayMax.enabled` | Enable Comet acceleration for `ArrayMax` | true | +| `spark.comet.expression.ArrayMin.enabled` | Enable Comet acceleration for `ArrayMin` | true | +| `spark.comet.expression.ArrayRemove.enabled` | Enable Comet acceleration for `ArrayRemove` | true | +| `spark.comet.expression.ArrayRepeat.enabled` | Enable Comet acceleration for `ArrayRepeat` | true | +| `spark.comet.expression.ArrayUnion.enabled` | Enable Comet acceleration for `ArrayUnion` | true | +| `spark.comet.expression.ArraysOverlap.enabled` | Enable Comet acceleration for `ArraysOverlap` | true | +| `spark.comet.expression.Ascii.enabled` | Enable Comet acceleration for `Ascii` | true | +| `spark.comet.expression.Asin.enabled` | Enable Comet acceleration for `Asin` | true | +| `spark.comet.expression.Atan.enabled` 
| Enable Comet acceleration for `Atan` | true | +| `spark.comet.expression.Atan2.enabled` | Enable Comet acceleration for `Atan2` | true | +| `spark.comet.expression.AttributeReference.enabled` | Enable Comet acceleration for `AttributeReference` | true | +| `spark.comet.expression.BitLength.enabled` | Enable Comet acceleration for `BitLength` | true | +| `spark.comet.expression.BitwiseAnd.enabled` | Enable Comet acceleration for `BitwiseAnd` | true | +| `spark.comet.expression.BitwiseCount.enabled` | Enable Comet acceleration for `BitwiseCount` | true | +| `spark.comet.expression.BitwiseGet.enabled` | Enable Comet acceleration for `BitwiseGet` | true | +| `spark.comet.expression.BitwiseNot.enabled` | Enable Comet acceleration for `BitwiseNot` | true | +| `spark.comet.expression.BitwiseOr.enabled` | Enable Comet acceleration for `BitwiseOr` | true | +| `spark.comet.expression.BitwiseXor.enabled` | Enable Comet acceleration for `BitwiseXor` | true | +| `spark.comet.expression.CaseWhen.enabled` | Enable Comet acceleration for `CaseWhen` | true | +| `spark.comet.expression.Cast.enabled` | Enable Comet acceleration for `Cast` | true | +| `spark.comet.expression.Ceil.enabled` | Enable Comet acceleration for `Ceil` | true | +| `spark.comet.expression.CheckOverflow.enabled` | Enable Comet acceleration for `CheckOverflow` | true | +| `spark.comet.expression.Chr.enabled` | Enable Comet acceleration for `Chr` | true | +| `spark.comet.expression.Coalesce.enabled` | Enable Comet acceleration for `Coalesce` | true | +| `spark.comet.expression.Concat.enabled` | Enable Comet acceleration for `Concat` | true | +| `spark.comet.expression.ConcatWs.enabled` | Enable Comet acceleration for `ConcatWs` | true | +| `spark.comet.expression.Contains.enabled` | Enable Comet acceleration for `Contains` | true | +| `spark.comet.expression.Cos.enabled` | Enable Comet acceleration for `Cos` | true | +| `spark.comet.expression.Cot.enabled` | Enable Comet acceleration for `Cot` | true | +| `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true | +| `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true | +| `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true | +| `spark.comet.expression.DateSub.enabled` | Enable Comet acceleration for `DateSub` | true | +| `spark.comet.expression.DayOfMonth.enabled` | Enable Comet acceleration for `DayOfMonth` | true | +| `spark.comet.expression.DayOfWeek.enabled` | Enable Comet acceleration for `DayOfWeek` | true | +| `spark.comet.expression.DayOfYear.enabled` | Enable Comet acceleration for `DayOfYear` | true | +| `spark.comet.expression.Divide.enabled` | Enable Comet acceleration for `Divide` | true | +| `spark.comet.expression.ElementAt.enabled` | Enable Comet acceleration for `ElementAt` | true | +| `spark.comet.expression.EndsWith.enabled` | Enable Comet acceleration for `EndsWith` | true | +| `spark.comet.expression.EqualNullSafe.enabled` | Enable Comet acceleration for `EqualNullSafe` | true | +| `spark.comet.expression.EqualTo.enabled` | Enable Comet acceleration for `EqualTo` | true | +| `spark.comet.expression.Exp.enabled` | Enable Comet acceleration for `Exp` | true | +| `spark.comet.expression.Expm1.enabled` | Enable Comet acceleration for `Expm1` | true | +| `spark.comet.expression.Flatten.enabled` | Enable Comet acceleration for `Flatten` | true | +| `spark.comet.expression.Floor.enabled` | Enable Comet acceleration for `Floor` | true | +| 
`spark.comet.expression.FromUnixTime.enabled` | Enable Comet acceleration for `FromUnixTime` | true | +| `spark.comet.expression.GetArrayItem.enabled` | Enable Comet acceleration for `GetArrayItem` | true | +| `spark.comet.expression.GetArrayStructFields.enabled` | Enable Comet acceleration for `GetArrayStructFields` | true | +| `spark.comet.expression.GetMapValue.enabled` | Enable Comet acceleration for `GetMapValue` | true | +| `spark.comet.expression.GetStructField.enabled` | Enable Comet acceleration for `GetStructField` | true | +| `spark.comet.expression.GreaterThan.enabled` | Enable Comet acceleration for `GreaterThan` | true | +| `spark.comet.expression.GreaterThanOrEqual.enabled` | Enable Comet acceleration for `GreaterThanOrEqual` | true | +| `spark.comet.expression.Hex.enabled` | Enable Comet acceleration for `Hex` | true | +| `spark.comet.expression.Hour.enabled` | Enable Comet acceleration for `Hour` | true | +| `spark.comet.expression.If.enabled` | Enable Comet acceleration for `If` | true | +| `spark.comet.expression.In.enabled` | Enable Comet acceleration for `In` | true | +| `spark.comet.expression.InSet.enabled` | Enable Comet acceleration for `InSet` | true | +| `spark.comet.expression.InitCap.enabled` | Enable Comet acceleration for `InitCap` | true | +| `spark.comet.expression.IntegralDivide.enabled` | Enable Comet acceleration for `IntegralDivide` | true | +| `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true | +| `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true | +| `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true | +| `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true | +| `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true | +| `spark.comet.expression.LessThanOrEqual.enabled` | Enable Comet acceleration for `LessThanOrEqual` | true | +| `spark.comet.expression.Like.enabled` | Enable Comet acceleration for `Like` | true | +| `spark.comet.expression.Literal.enabled` | Enable Comet acceleration for `Literal` | true | +| `spark.comet.expression.Log.enabled` | Enable Comet acceleration for `Log` | true | +| `spark.comet.expression.Log10.enabled` | Enable Comet acceleration for `Log10` | true | +| `spark.comet.expression.Log2.enabled` | Enable Comet acceleration for `Log2` | true | +| `spark.comet.expression.Lower.enabled` | Enable Comet acceleration for `Lower` | true | +| `spark.comet.expression.MapEntries.enabled` | Enable Comet acceleration for `MapEntries` | true | +| `spark.comet.expression.MapFromArrays.enabled` | Enable Comet acceleration for `MapFromArrays` | true | +| `spark.comet.expression.MapKeys.enabled` | Enable Comet acceleration for `MapKeys` | true | +| `spark.comet.expression.MapValues.enabled` | Enable Comet acceleration for `MapValues` | true | +| `spark.comet.expression.Md5.enabled` | Enable Comet acceleration for `Md5` | true | +| `spark.comet.expression.Minute.enabled` | Enable Comet acceleration for `Minute` | true | +| `spark.comet.expression.MonotonicallyIncreasingID.enabled` | Enable Comet acceleration for `MonotonicallyIncreasingID` | true | +| `spark.comet.expression.Month.enabled` | Enable Comet acceleration for `Month` | true | +| `spark.comet.expression.Multiply.enabled` | Enable Comet acceleration for `Multiply` | true | +| `spark.comet.expression.Murmur3Hash.enabled` | Enable Comet acceleration for `Murmur3Hash` | true | +| 
`spark.comet.expression.Not.enabled` | Enable Comet acceleration for `Not` | true | +| `spark.comet.expression.OctetLength.enabled` | Enable Comet acceleration for `OctetLength` | true | +| `spark.comet.expression.Or.enabled` | Enable Comet acceleration for `Or` | true | +| `spark.comet.expression.Pow.enabled` | Enable Comet acceleration for `Pow` | true | +| `spark.comet.expression.Quarter.enabled` | Enable Comet acceleration for `Quarter` | true | +| `spark.comet.expression.RLike.enabled` | Enable Comet acceleration for `RLike` | true | +| `spark.comet.expression.Rand.enabled` | Enable Comet acceleration for `Rand` | true | +| `spark.comet.expression.Randn.enabled` | Enable Comet acceleration for `Randn` | true | +| `spark.comet.expression.RegExpReplace.enabled` | Enable Comet acceleration for `RegExpReplace` | true | +| `spark.comet.expression.Remainder.enabled` | Enable Comet acceleration for `Remainder` | true | +| `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true | +| `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true | +| `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true | +| `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true | +| `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true | +| `spark.comet.expression.ShiftLeft.enabled` | Enable Comet acceleration for `ShiftLeft` | true | +| `spark.comet.expression.ShiftRight.enabled` | Enable Comet acceleration for `ShiftRight` | true | +| `spark.comet.expression.Signum.enabled` | Enable Comet acceleration for `Signum` | true | +| `spark.comet.expression.Sin.enabled` | Enable Comet acceleration for `Sin` | true | +| `spark.comet.expression.SortOrder.enabled` | Enable Comet acceleration for `SortOrder` | true | +| `spark.comet.expression.SparkPartitionID.enabled` | Enable Comet acceleration for `SparkPartitionID` | true | +| `spark.comet.expression.Sqrt.enabled` | Enable Comet acceleration for `Sqrt` | true | +| `spark.comet.expression.StartsWith.enabled` | Enable Comet acceleration for `StartsWith` | true | +| `spark.comet.expression.StaticInvoke.enabled` | Enable Comet acceleration for `StaticInvoke` | true | +| `spark.comet.expression.StringInstr.enabled` | Enable Comet acceleration for `StringInstr` | true | +| `spark.comet.expression.StringLPad.enabled` | Enable Comet acceleration for `StringLPad` | true | +| `spark.comet.expression.StringRPad.enabled` | Enable Comet acceleration for `StringRPad` | true | +| `spark.comet.expression.StringRepeat.enabled` | Enable Comet acceleration for `StringRepeat` | true | +| `spark.comet.expression.StringReplace.enabled` | Enable Comet acceleration for `StringReplace` | true | +| `spark.comet.expression.StringSpace.enabled` | Enable Comet acceleration for `StringSpace` | true | +| `spark.comet.expression.StringTranslate.enabled` | Enable Comet acceleration for `StringTranslate` | true | +| `spark.comet.expression.StringTrim.enabled` | Enable Comet acceleration for `StringTrim` | true | +| `spark.comet.expression.StringTrimBoth.enabled` | Enable Comet acceleration for `StringTrimBoth` | true | +| `spark.comet.expression.StringTrimLeft.enabled` | Enable Comet acceleration for `StringTrimLeft` | true | +| `spark.comet.expression.StringTrimRight.enabled` | Enable Comet acceleration for `StringTrimRight` | true | +| `spark.comet.expression.StructsToJson.enabled` | Enable Comet acceleration for `StructsToJson` | true | +| 
`spark.comet.expression.Substring.enabled` | Enable Comet acceleration for `Substring` | true | +| `spark.comet.expression.Subtract.enabled` | Enable Comet acceleration for `Subtract` | true | +| `spark.comet.expression.Tan.enabled` | Enable Comet acceleration for `Tan` | true | +| `spark.comet.expression.TruncDate.enabled` | Enable Comet acceleration for `TruncDate` | true | +| `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true | +| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true | +| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true | +| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true | +| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true | +| `spark.comet.expression.WeekOfYear.enabled` | Enable Comet acceleration for `WeekOfYear` | true | +| `spark.comet.expression.XxHash64.enabled` | Enable Comet acceleration for `XxHash64` | true | +| `spark.comet.expression.Year.enabled` | Enable Comet acceleration for `Year` | true | ## Enabling or Disabling Individual Aggregate Expressions - -| Config | Description | Default Value | -| ----------------------------------------------------- | ---------------------------------------------------- | ------------- | -| `spark.comet.expression.Average.enabled` | Enable Comet acceleration for `Average` | true | -| `spark.comet.expression.BitAndAgg.enabled` | Enable Comet acceleration for `BitAndAgg` | true | -| `spark.comet.expression.BitOrAgg.enabled` | Enable Comet acceleration for `BitOrAgg` | true | -| `spark.comet.expression.BitXorAgg.enabled` | Enable Comet acceleration for `BitXorAgg` | true | -| `spark.comet.expression.BloomFilterAggregate.enabled` | Enable Comet acceleration for `BloomFilterAggregate` | true | -| `spark.comet.expression.Corr.enabled` | Enable Comet acceleration for `Corr` | true | -| `spark.comet.expression.Count.enabled` | Enable Comet acceleration for `Count` | true | -| `spark.comet.expression.CovPopulation.enabled` | Enable Comet acceleration for `CovPopulation` | true | -| `spark.comet.expression.CovSample.enabled` | Enable Comet acceleration for `CovSample` | true | -| `spark.comet.expression.First.enabled` | Enable Comet acceleration for `First` | true | -| `spark.comet.expression.Last.enabled` | Enable Comet acceleration for `Last` | true | -| `spark.comet.expression.Max.enabled` | Enable Comet acceleration for `Max` | true | -| `spark.comet.expression.Min.enabled` | Enable Comet acceleration for `Min` | true | -| `spark.comet.expression.StddevPop.enabled` | Enable Comet acceleration for `StddevPop` | true | -| `spark.comet.expression.StddevSamp.enabled` | Enable Comet acceleration for `StddevSamp` | true | -| `spark.comet.expression.Sum.enabled` | Enable Comet acceleration for `Sum` | true | -| `spark.comet.expression.VariancePop.enabled` | Enable Comet acceleration for `VariancePop` | true | -| `spark.comet.expression.VarianceSamp.enabled` | Enable Comet acceleration for `VarianceSamp` | true | - +| Config | Description | Default Value | +|--------|-------------|---------------| +| `spark.comet.expression.Average.enabled` | Enable Comet acceleration for `Average` | true | +| `spark.comet.expression.BitAndAgg.enabled` | Enable Comet acceleration for `BitAndAgg` | true | +| `spark.comet.expression.BitOrAgg.enabled` | Enable Comet acceleration for `BitOrAgg` | true | +| `spark.comet.expression.BitXorAgg.enabled` 
| Enable Comet acceleration for `BitXorAgg` | true | +| `spark.comet.expression.BloomFilterAggregate.enabled` | Enable Comet acceleration for `BloomFilterAggregate` | true | +| `spark.comet.expression.Corr.enabled` | Enable Comet acceleration for `Corr` | true | +| `spark.comet.expression.Count.enabled` | Enable Comet acceleration for `Count` | true | +| `spark.comet.expression.CovPopulation.enabled` | Enable Comet acceleration for `CovPopulation` | true | +| `spark.comet.expression.CovSample.enabled` | Enable Comet acceleration for `CovSample` | true | +| `spark.comet.expression.First.enabled` | Enable Comet acceleration for `First` | true | +| `spark.comet.expression.Last.enabled` | Enable Comet acceleration for `Last` | true | +| `spark.comet.expression.Max.enabled` | Enable Comet acceleration for `Max` | true | +| `spark.comet.expression.Min.enabled` | Enable Comet acceleration for `Min` | true | +| `spark.comet.expression.StddevPop.enabled` | Enable Comet acceleration for `StddevPop` | true | +| `spark.comet.expression.StddevSamp.enabled` | Enable Comet acceleration for `StddevSamp` | true | +| `spark.comet.expression.Sum.enabled` | Enable Comet acceleration for `Sum` | true | +| `spark.comet.expression.VariancePop.enabled` | Enable Comet acceleration for `VariancePop` | true | +| `spark.comet.expression.VarianceSamp.enabled` | Enable Comet acceleration for `VarianceSamp` | true | diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 6a5e61b8d7..e510930d95 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -113,10 +113,10 @@ use datafusion_comet_proto::{ }; use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId; use datafusion_comet_spark_expr::{ - ArrayInsert, Avg, AvgDecimal, AvgInt, Cast, CheckOverflow, Correlation, Covariance, - CreateNamedStruct, GetArrayStructFields, GetStructField, IfExpr, ListExtract, - NormalizeNaNAndZero, RLike, RandExpr, RandnExpr, SparkCastOptions, Stddev, SubstringExpr, - SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, + ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct, + GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RLike, + RandExpr, RandnExpr, SparkCastOptions, Stddev, SubstringExpr, SumDecimal, TimestampTruncExpr, + ToJson, UnboundColumn, Variance, }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -1853,28 +1853,24 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let input_datatype = to_arrow_datatype(expr.sum_datatype.as_ref().unwrap()); + let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + let builder = match datatype { - DataType::Int8 - | DataType::UInt8 - | DataType::Int16 - | DataType::UInt16 - | DataType::Int32 => { - let func = - AggregateUDF::new_from_impl(AvgInt::new(datatype, input_datatype)); - AggregateExprBuilder::new(Arc::new(func), vec![child]) - } DataType::Decimal128(_, _) => { let func = AggregateUDF::new_from_impl(AvgDecimal::new(datatype, input_datatype)); AggregateExprBuilder::new(Arc::new(func), vec![child]) } _ => { - // cast to the result data type of AVG if the result data type is different - // from the input type, e.g. AVG(Int32). We should not expect a cast - // failure since it should have already been checked at Spark side. 
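+                // The dedicated AvgInt UDAF from an earlier revision is removed;
+                // Spark evaluates AVG over integral columns as DoubleType, so all
+                // non-decimal inputs share the Float64 path below. eval_mode
+                // (Legacy/Ansi/Try) is threaded through for future ANSI handling.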
+ // For all other numeric types (Int8/16/32/64, Float32/64): + // Cast to Float64 for accumulation let child: Arc = - Arc::new(CastExpr::new(Arc::clone(&child), datatype.clone(), None)); - let func = AggregateUDF::new_from_impl(Avg::new("avg", datatype)); + Arc::new(CastExpr::new(Arc::clone(&child), DataType::Float64, None)); + let func = AggregateUDF::new_from_impl(Avg::new( + "avg", + DataType::Float64, + eval_mode, + )); AggregateExprBuilder::new(Arc::new(func), vec![child]) } }; diff --git a/native/spark-expr/src/agg_funcs/avg.rs b/native/spark-expr/src/agg_funcs/avg.rs index e8b90b4f46..9850c02605 100644 --- a/native/spark-expr/src/agg_funcs/avg.rs +++ b/native/spark-expr/src/agg_funcs/avg.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::EvalMode; use arrow::array::{ builder::PrimitiveBuilder, cast::AsArray, types::{Float64Type, Int64Type}, - Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray, + Array, ArrayRef, ArrowNativeTypeOp, ArrowNumericType, Int64Array, PrimitiveArray, }; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; @@ -31,24 +32,22 @@ use datafusion::logical_expr::{ use datafusion::physical_expr::expressions::format_state_name; use std::{any::Any, sync::Arc}; -use arrow::array::ArrowNativeTypeOp; use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion::logical_expr::Volatility::Immutable; use DataType::*; -/// AVG aggregate expression #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Avg { name: String, signature: Signature, - // expr: Arc, input_data_type: DataType, result_data_type: DataType, + eval_mode: EvalMode, } impl Avg { /// Create a new AVG aggregate function - pub fn new(name: impl Into, data_type: DataType) -> Self { + pub fn new(name: impl Into, data_type: DataType, eval_mode: EvalMode) -> Self { let result_data_type = avg_return_type("avg", &data_type).unwrap(); Self { @@ -56,20 +55,20 @@ impl Avg { signature: Signature::user_defined(Immutable), input_data_type: data_type, result_data_type, + eval_mode, } } } impl AggregateUDFImpl for Avg { - /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self } fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { - // instantiate specialized accumulator based for the type + // All numeric types use Float64 accumulation after casting match (&self.input_data_type, &self.result_data_type) { - (Float64, Float64) => Ok(Box::::default()), + (Float64, Float64) => Ok(Box::new(AvgAccumulator::new(self.eval_mode))), _ => not_impl_err!( "AvgAccumulator for ({} --> {})", self.input_data_type, @@ -109,10 +108,10 @@ impl AggregateUDFImpl for Avg { &self, _args: AccumulatorArgs, ) -> Result> { - // instantiate specialized accumulator based for the type match (&self.input_data_type, &self.result_data_type) { (Float64, Float64) => Ok(Box::new(AvgGroupsAccumulator::::new( &self.input_data_type, + self.eval_mode, |sum: f64, count: i64| Ok(sum / count as f64), ))), @@ -137,11 +136,22 @@ impl AggregateUDFImpl for Avg { } } -/// An accumulator to compute the average -#[derive(Debug, Default)] +#[derive(Debug)] pub struct AvgAccumulator { sum: Option, count: i64, + #[allow(dead_code)] + eval_mode: EvalMode, +} + +impl AvgAccumulator { + pub fn new(eval_mode: EvalMode) -> Self { + Self { + sum: None, + count: 0, + eval_mode, + } + } } impl Accumulator for AvgAccumulator { @@ -166,7 +176,7 @@ impl Accumulator for AvgAccumulator { // counts are 
summed self.count += sum(states[1].as_primitive::()).unwrap_or_default(); - // sums are summed + // sums are summed - no overflow checking if let Some(x) = sum(states[0].as_primitive::()) { let v = self.sum.get_or_insert(0.); *v += x; @@ -176,8 +186,6 @@ impl Accumulator for AvgAccumulator { fn evaluate(&mut self) -> Result { if self.count == 0 { - // If all input are nulls, count will be 0 and we will get null after the division. - // This is consistent with Spark Average implementation. Ok(ScalarValue::Float64(None)) } else { Ok(ScalarValue::Float64( @@ -192,7 +200,7 @@ impl Accumulator for AvgAccumulator { } /// An accumulator to compute the average of `[PrimitiveArray]`. -/// Stores values as native types, and does overflow checking +/// Stores values as native types. /// /// F: Function that calculates the average value from a sum of /// T::Native and a total count @@ -211,6 +219,10 @@ where /// Sums per group, stored as the native type sums: Vec, + /// Evaluation mode (stored but not used for Float64) + #[allow(dead_code)] + eval_mode: EvalMode, + /// Function that computes the final average (value / count) avg_fn: F, } @@ -220,11 +232,12 @@ where T: ArrowNumericType + Send, F: Fn(T::Native, i64) -> Result + Send, { - pub fn new(return_data_type: &DataType, avg_fn: F) -> Self { + pub fn new(return_data_type: &DataType, eval_mode: EvalMode, avg_fn: F) -> Self { Self { return_data_type: return_data_type.clone(), counts: vec![], sums: vec![], + eval_mode, avg_fn, } } @@ -254,6 +267,7 @@ where if values.null_count() == 0 { for (&group_index, &value) in iter { let sum = &mut self.sums[group_index]; + // No overflow checking - INFINITY is a valid result *sum = (*sum).add_wrapping(value); self.counts[group_index] += 1; } @@ -264,7 +278,6 @@ where } let sum = &mut self.sums[group_index]; *sum = (*sum).add_wrapping(value); - self.counts[group_index] += 1; } } @@ -280,9 +293,9 @@ where total_num_groups: usize, ) -> Result<()> { assert_eq!(values.len(), 2, "two arguments to merge_batch"); - // first batch is partial sums, second is counts let partial_sums = values[0].as_primitive::(); let partial_counts = values[1].as_primitive::(); + // update counts with partial counts self.counts.resize(total_num_groups, 0); let iter1 = group_indices.iter().zip(partial_counts.values().iter()); @@ -290,7 +303,7 @@ where self.counts[group_index] += partial_count; } - // update sums + // update sums - no overflow checking self.sums.resize(total_num_groups, T::default_value()); let iter2 = group_indices.iter().zip(partial_sums.values().iter()); for (&group_index, &new_value) in iter2 { @@ -319,7 +332,6 @@ where Ok(Arc::new(array)) } - // return arrays for sums and counts fn state(&mut self, emit_to: EmitTo) -> Result> { let counts = emit_to.take_needed(&mut self.counts); let counts = Int64Array::new(counts.into(), None); diff --git a/native/spark-expr/src/agg_funcs/avg_int.rs b/native/spark-expr/src/agg_funcs/avg_int.rs deleted file mode 100644 index 103c2ac19e..0000000000 --- a/native/spark-expr/src/agg_funcs/avg_int.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::{AvgDecimal, EvalMode}; -use arrow::array::{ArrayRef, BooleanArray}; -use arrow::datatypes::{DataType, FieldRef}; -use datafusion::common::{DataFusionError, Result as DFResult, ScalarValue}; -use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs}; -use datafusion::logical_expr::type_coercion::aggregates::avg_return_type; -use datafusion::logical_expr::Volatility::Immutable; -use datafusion::logical_expr::{ - Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, -}; -use std::any::Any; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct AvgInt { - signature: Signature, - eval_mode: EvalMode, -} - -impl AvgInt { - pub fn try_new(data_type: DataType, eval_mode: EvalMode) -> DFResult { - match data_type { - DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => Ok(Self { - signature: Signature::user_defined(Immutable), - eval_mode, - }), - _ => Err(DataFusionError::Internal( - "inalid data type for AvgInt".to_string(), - )), - } - } -} - -impl AggregateUDFImpl for AvgInt { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "avg" - } - - fn reverse_expr(&self) -> ReversedUDAF { - ReversedUDAF::Identical - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result { - avg_return_type(self.name(), &arg_types[0]) - } - - fn is_nullable(&self) -> bool { - true - } - - fn accumulator( - &self, - acc_args: AccumulatorArgs, - ) -> datafusion::common::Result> { - todo!() - } - - fn state_fields(&self, args: StateFieldsArgs) -> datafusion::common::Result> { - todo!() - } - - fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { - false - } - - fn create_groups_accumulator( - &self, - _args: AccumulatorArgs, - ) -> datafusion::common::Result> { - Ok(Box::new(AvgIntGroupsAccumulator::new(self.eval_mode))) - } - - fn default_value(&self, data_type: &DataType) -> datafusion::common::Result { - todo!() - } -} - -struct AvgIntegerAccumulator { - sum: Option, - count: u64, - eval_mode: EvalMode, -} - -impl AvgIntegerAccumulator { - fn new(eval_mode: EvalMode) -> Self { - Self { - sum: Some(0), - count: 0, - eval_mode, - } - } -} - -impl Accumulator for AvgIntegerAccumulator {} - -struct AvgIntGroupsAccumulator {} - -impl AvgIntGroupsAccumulator {} - -impl GroupsAccumulator for AvgIntGroupsAccumulator { - fn update_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> datafusion::common::Result<()> { - todo!() - } - - fn evaluate(&mut self, emit_to: EmitTo) -> datafusion::common::Result { - todo!() - } - - fn state(&mut self, emit_to: EmitTo) -> datafusion::common::Result> { - todo!() - } - - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - ) -> datafusion::common::Result<()> { - todo!() - } - - fn size(&self) -> usize { - todo!() - } -} diff --git a/native/spark-expr/src/agg_funcs/mod.rs 
b/native/spark-expr/src/agg_funcs/mod.rs
index 8025fc7a08..252da78890 100644
--- a/native/spark-expr/src/agg_funcs/mod.rs
+++ b/native/spark-expr/src/agg_funcs/mod.rs
@@ -17,7 +17,6 @@
 mod avg;
 mod avg_decimal;
-mod avg_int;
 mod correlation;
 mod covariance;
 mod stddev;
@@ -26,7 +25,6 @@ mod variance;
 pub use avg::Avg;
 pub use avg_decimal::AvgDecimal;
-pub use avg_int::AvgInt;
 pub use correlation::Correlation;
 pub use covariance::Covariance;
 pub use stddev::Stddev;
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 7e577c5fda..a9af3bc4f1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -1471,6 +1471,42 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("AVG and try_avg - basic functionality") {
+    // Boxed java.lang.Long keeps the nulls (null.asInstanceOf[Long] unboxes to 0L).
+    withParquetTable(
+      Seq[(java.lang.Long, Int)](
+        (10L, 1),
+        (20L, 1),
+        (null, 1),
+        (100L, 2),
+        (200L, 2),
+        (null, 3)),
+      "tbl") {
+
+      Seq(true, false).foreach { ansiEnabled =>
+        // without GROUP BY
+        withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString) {
+          val res = sql("SELECT avg(_1) FROM tbl")
+          checkSparkAnswerAndOperator(res)
+        }
+
+        // with GROUP BY
+        withSQLConf(SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString) {
+          val res = sql("SELECT _2, avg(_1) FROM tbl GROUP BY _2")
+          checkSparkAnswerAndOperator(res)
+        }
+      }
+
+      // try_avg without GROUP BY
+      val resTry = sql("SELECT try_avg(_1) FROM tbl")
+      checkSparkAnswerAndOperator(resTry)
+
+      // try_avg with GROUP BY
+      val resTryGroup = sql("SELECT _2, try_avg(_1) FROM tbl GROUP BY _2")
+      checkSparkAnswerAndOperator(resTryGroup)
+    }
+  }
+
 protected def checkSparkAnswerAndNumOfAggregates(query: String, numAggregates: Int): Unit = {
   val df = sql(query)
   checkSparkAnswer(df)