@@ -464,14 +464,14 @@ class GlutenClickHouseTPCHSuite extends MergeTreeSuite {
| insert into cross_join_t
| select id as a, cast(id as string) as b,
| concat('1231231232323232322', cast(id as string)) as c
| from range(0, 10000)
| from range(0, 5000)
|""".stripMargin
spark.sql(sql)
sql = """
| insert into cross_join_t
| select id as a, cast(id as string) as b,
| concat('1231231232323232322', cast(id as string)) as c
| from range(10000, 20000)
| from range(5000, 10000)
|""".stripMargin
spark.sql(sql)
sql = """
@@ -912,6 +912,59 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerSuite

val aggregate_sql = "select ids, aggregate(ids, 3, (acc, x) -> acc + x) from tb_array"
runQueryAndCompare(aggregate_sql)(checkGlutenPlan[ProjectExecTransformer])

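// Also cover aggregate() with a struct accumulator and an explicit finish lambda that extracts the final sum.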
val aggregate_finish_sql =
"""
|select
| aggregate(
| ids,
| cast(struct(0 as count, 0.0 as sum) as struct<count:int, sum:double>),
| (acc, x) -> struct(acc.count + 1, acc.sum + coalesce(cast(x as double), 0.0)),
| acc -> acc.sum
| )
|from tb_array
|""".stripMargin
runQueryAndCompare(aggregate_finish_sql)(checkGlutenPlan[ProjectExecTransformer])
}
}

test("array aggregate with nested struct and nulls") {
withTable("tb_array_complex") {
sql("create table tb_array_complex(items array<struct<v:int, w:double>>) using parquet")
sql("""
|insert into tb_array_complex values
|(array(named_struct('v', 1, 'w', 1.5), named_struct('v', null, 'w', 2.0), null)),
|(array()),
|(null),
|(array(named_struct('v', 2, 'w', null), named_struct('v', 3, 'w', 4.5)))
|""".stripMargin)

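// Average w with a struct accumulator: count only elements whose v is non-null, sum non-null w values, and have the finish lambda return null when the count is zero.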
val aggregate_struct_sql =
"""
|select
| aggregate(
| items,
| cast(struct(0 as cnt, 0.0 as sum) as struct<cnt:int, sum:double>),
| (acc, x) -> struct(
| acc.cnt + if(x is null or x.v is null, 0, 1),
| acc.sum + coalesce(x.w, 0.0)
| ),
| acc -> if(acc.cnt = 0, cast(null as double), acc.sum / acc.cnt)
| ) as avg_w
|from tb_array_complex
|""".stripMargin
runQueryAndCompare(aggregate_struct_sql)(checkGlutenPlan[ProjectExecTransformer])

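// Chain filter() and transform(): drop null struct elements first, then project a value combining both fields with null-safe defaults.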
val transform_filter_sql =
"""
|select
| transform(
| filter(items, x -> x is not null),
| x -> coalesce(x.v, 0) + cast(coalesce(x.w, 0.0) as int)
| )
|from tb_array_complex
|""".stripMargin
runQueryAndCompare(transform_filter_sql)(checkGlutenPlan[ProjectExecTransformer])
}
}

@@ -175,14 +175,14 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
test("nothing array in shuffle") {
val sql =
"""
|select t1.k1 as a, t1.k2 as b, t2.k1 as c, t2.k2 as d, t1.x as x from (
| select k1, k2, array() as x from t1
|select t1.k1 as a, t1.k2 as b, t2.k1 as c, t2.k2 as d, t1.x as x, t1.o as o from (
| select k1, k2, array() as x, '' as o from t1
| union all
| select k1, k2, array(123) as x from t2
| select k1, k2, array(123) as x, '123' as o from t2
|) t1 left join (
| select k1, k2 from t3
|) t2 on t1.k1 = t2.k1 and t1.k2 = t2.k2
|order by t1.k1, t1.k2, t2.k1, t2.k2
|order by t1.k1, t1.k2, t2.k1, t2.k2, t1.o
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
@@ -92,7 +92,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
assertResult(600572)(ret.apply(0).get(0))

assertResult(462)(
assertResult(480)(
countFiles(new File(s"$dataHome/lineitem_mergetree_optimize"))
) // many merged parts
}
@@ -183,19 +183,19 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
assertResult(600572)(ret.apply(0).get(0))

assertResult(372)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
assertResult(343)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
if (spark32) {
assertResult(239)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
} else {
assertResult(241)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
assertResult(240)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
// the second VACUUM will remove some empty folders
if (spark32) {
assertResult(220)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
} else {
assertResult(226)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
assertResult(229)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p2")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p2").collect()
@@ -220,18 +220,18 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
assertResult(600572)(ret.apply(0).get(0))

assertResult(491)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
assertResult(458)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (spark32) {
assertResult(302)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
} else {
assertResult(304)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
assertResult(306)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
if (spark32) {
assertResult(275)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
} else {
assertResult(281)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
assertResult(288)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p3")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p3").collect()
@@ -257,18 +257,18 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
assertResult(600572)(ret.apply(0).get(0))

assertResult(491)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
assertResult(458)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (spark32) {
assertResult(302)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
} else {
assertResult(304)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
assertResult(306)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
}
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
if (spark32) {
assertResult(275)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
} else {
assertResult(281)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
assertResult(288)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p4")))
}

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p4").collect()
@@ -300,7 +300,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
// this case will create a checkpoint
assertResult(105)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
assertResult(106)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
@@ -323,7 +323,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
assertResult(93)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assertResult(104)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
assertResult(106)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
@@ -339,7 +339,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
assertResult(77)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
} else {
// For Spark 3.3 + Delta 2.3, vacuum command will create two commit files in deltalog dir.
assertResult(93)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
assertResult(94)(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p5")))
}

val ret = spark.sql("select count(*) from lineitem_mergetree_optimize_p5").collect()
@@ -367,11 +367,11 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
assertResult(600572)(ret.apply(0).get(0))

assertResult(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p6")))(
if (spark32) 491 else 519)
if (spark32) 491 else 489)
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
assertResult(countFiles(new File(s"$dataHome/lineitem_mergetree_optimize_p6")))(
if (spark32) 315 else 327)
if (spark32) 315 else 333)

val ret2 = spark.sql("select count(*) from lineitem_mergetree_optimize_p6").collect()
assertResult(600572)(ret2.apply(0).get(0))
@@ -443,7 +443,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
if (spark32) {
assertResult(99)(countFiles(new File(dataPath)))
} else {
assertResult(105)(countFiles(new File(dataPath)))
assertResult(106)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
@@ -466,7 +466,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
if (spark32) {
assertResult(93)(countFiles(new File(dataPath)))
} else {
assertResult(104)(countFiles(new File(dataPath)))
assertResult(106)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
@@ -482,7 +482,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite extends CreateMergeTreeSuite {
if (spark32) {
assertResult(77)(countFiles(new File(dataPath)))
} else {
assertResult(93)(countFiles(new File(dataPath)))
assertResult(94)(countFiles(new File(dataPath)))
}

val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
@@ -99,6 +99,35 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite extends ParquetTPCHSuite
}
}

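// New coverage: TPCH Q6 narrowed to a single l_orderkey, verifying the scan metrics (numFiles, pruningTime, filesSize, numOutputRows) reported under AQE with the new CH Parquet V3 reader.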
test("TPCH Q6 + New CH Parquet V3") {
compareResultsAgainstVanillaSpark(
"""
|SELECT
| sum(l_extendedprice * l_discount) AS revenue
|FROM
| lineitem
|WHERE
| l_shipdate >= date'1994-01-01'
| AND l_shipdate < date'1994-01-01' + interval 1 year
| AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
| AND l_orderkey = 119430;
|""".stripMargin,
customCheck = {
df =>
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
val plans = collect(df.queryExecution.executedPlan) {
case scanExec: BasicScanExecTransformer => scanExec
}
assert(plans.size == 1)

assert(plans(0).metrics("numFiles").value === 1)
assert(plans(0).metrics("pruningTime").value === pruningTimeValueSpark)
assert(plans(0).metrics("filesSize").value === 19230111)
assert(plans(0).metrics("numOutputRows").value === 20000)
}
)
}

test("Check the metrics values") {
withSQLConf((GlutenConfig.COLUMNAR_SORT_ENABLED.key, "false")) {
customCheck(1, native = false) {
@@ -268,7 +268,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans.head.metrics("numFiles").value === 4)
assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
assert(plans.head.metrics("numOutputRows").value === 600572)
assert(plans.head.metrics("numOutputRows").value === 160000)
}
}

@@ -297,7 +297,7 @@ class GlutenClickHouseTPCHParquetBucketSuite

assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
assert(plans(2).metrics("numFiles").value === 4)
assert(plans(2).metrics("numOutputRows").value === 600572)
assert(plans(2).metrics("numOutputRows").value === 240000)
}

withSQLConf(
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20250916
CH_COMMIT=39da31eab7b
CH_BRANCH=rebase_ch/20260310_25_12
CH_COMMIT=408fb65018b
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/CMakeLists.txt
@@ -88,7 +88,8 @@ include_directories(
${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include
${ClickHouse_SOURCE_DIR}/contrib/llvm-project/utils/bazel/llvm-project-overlay/llvm/include
${ClickHouse_SOURCE_DIR}/contrib/libdivide
${ClickHouse_SOURCE_DIR}/contrib/libdivide-cmake)
${ClickHouse_SOURCE_DIR}/contrib/libdivide-cmake
${ClickHouse_SOURCE_DIR}/contrib/StringZilla/include)
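# StringZilla headers are added to the include path; presumably required by the newer ClickHouse revision pinned in this change.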

add_subdirectory(Storages/Parquet)
add_subdirectory(Storages/SubstraitSource)
6 changes: 5 additions & 1 deletion cpp-ch/local-engine/Common/AggregateUtil.cpp
@@ -52,6 +52,8 @@ extern const SettingsUInt64 max_block_size;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation;
extern const SettingsBool serialize_string_in_memory_with_zero_byte;
}

template <typename Method>
@@ -259,7 +261,9 @@ DB::Aggregator::Params AggregatorParamsHelper::buildParams(
only_merge,
aggregate_settings[DB::Setting::optimize_group_by_constant_keys],
aggregate_settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization],
{}};
{},
settings[DB::Setting::enable_producing_buckets_out_of_order_in_aggregation],
settings[DB::Setting::serialize_string_in_memory_with_zero_byte]};
}


4 changes: 3 additions & 1 deletion cpp-ch/local-engine/Common/ArrayJoinHelper.cpp
@@ -25,6 +25,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Poco/Logger.h>
#include <Common/DebugUtils.h>
#include <Common/logger_useful.h>
@@ -39,6 +40,7 @@ extern const int LOGICAL_ERROR;
namespace Setting
{
extern const SettingsUInt64 max_block_size;
extern const SettingsBool enable_lazy_columns_replication;
}
}

@@ -146,7 +148,7 @@ addArrayJoinStep(DB::ContextPtr context, DB::QueryPlan & plan, const DB::Actions
array_join.columns = std::move(array_joined_columns);
array_join.is_left = is_left;
auto array_join_step = std::make_unique<DB::ArrayJoinStep>(
plan.getCurrentHeader(), std::move(array_join), false, context->getSettingsRef()[DB::Setting::max_block_size]);
plan.getCurrentHeader(), std::move(array_join), false, context->getSettingsRef()[DB::Setting::max_block_size], context->getSettingsRef()[DB::Setting::enable_lazy_columns_replication]);
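// The trailing argument follows the updated ArrayJoinStep constructor, which now also takes enable_lazy_columns_replication.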
array_join_step->setStepDescription("ARRAY JOIN In Generate");
steps.emplace_back(array_join_step.get());
plan.addStep(std::move(array_join_step));
8 changes: 6 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -409,6 +409,7 @@ const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeType(
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const DB::DataTypePtr & cast_to_type,
DB::ContextPtr context,
const std::string & result_name,
DB::CastType cast_type)
{
@@ -421,7 +422,7 @@
DB::CastDiagnostic diagnostic = {node->result_name, node->result_name};
DB::ColumnWithTypeAndName left_column{nullptr, node->result_type, {}};
DB::ActionsDAG::NodeRawConstPtrs children = {left_arg, right_arg};
auto func_base_cast = createInternalCast(std::move(left_column), cast_to_type, cast_type, diagnostic);
auto func_base_cast = createInternalCast(std::move(left_column), cast_to_type, cast_type, diagnostic, context);
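// createInternalCast takes the query context as an additional argument in the updated ClickHouse API.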

return &actions_dag.addFunction(func_base_cast, std::move(children), result_name);
}
@@ -430,13 +431,14 @@ const DB::ActionsDAG::Node * ActionsDAGUtil::convertNodeTypeIfNeeded(
DB::ActionsDAG & actions_dag,
const DB::ActionsDAG::Node * node,
const DB::DataTypePtr & dst_type,
DB::ContextPtr context,
const std::string & result_name,
DB::CastType cast_type)
{
if (node->result_type->equals(*dst_type))
return node;

return convertNodeType(actions_dag, node, dst_type, result_name, cast_type);
return convertNodeType(actions_dag, node, dst_type, context, result_name, cast_type);
}

String QueryPipelineUtil::explainPipeline(DB::QueryPipeline & pipeline)
@@ -791,6 +793,8 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
};

global_context->setTemporaryStoragePath(tmp_path, 0);
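// Make sure the temporary storage directory exists before it is used; the newer CH code apparently no longer creates it implicitly.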
if (!fs::exists(tmp_path))
fs::create_directories(tmp_path);
global_context->setPath(config->getString("path", "/"));

String uncompressed_cache_policy = config->getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);