diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index dd5ded9da3a0..63fa2677eea9 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -79,6 +79,10 @@
org.apache.logging.log4j
log4j-slf4j-impl
+
+ org.slf4j
+ slf4j-log4j12
+
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index 9b08f56c6615..67b1a0e2f077 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -45,7 +45,7 @@ class GlutenClickHouseNativeWriteTableSuite
new SparkConf()
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
.set("spark.memory.offHeap.enabled", "true")
- .set("spark.memory.offHeap.size", "1073741824")
+ .set("spark.memory.offHeap.size", "6442450944")
.set("spark.sql.catalogImplementation", "hive")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt
index 96826905db3c..3d26d914c9d8 100644
--- a/cpp-ch/CMakeLists.txt
+++ b/cpp-ch/CMakeLists.txt
@@ -114,9 +114,9 @@ else()
-DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF -DENABLE_LDAP=OFF
-DENABLE_MSGPACK=OFF -DUSE_REPLXX=OFF -DENABLE_CLICKHOUSE_ALL=OFF
-DENABLE_NUMACTL=OFF -DENABLE_GOOGLE_CLOUD_CPP=OFF
- -DCOMPILER_FLAGS='-fvisibility=hidden -fvisibility-inlines-hidden' -S
- ${CH_SOURCE_DIR} -G Ninja -B ${CH_BINARY_DIR} && cmake --build
- ${CH_BINARY_DIR} --target libch\"
+ -DENABLE_ARROW_FLIGHT=OFF -DCOMPILER_FLAGS='-fvisibility=hidden
+ -fvisibility-inlines-hidden' -S ${CH_SOURCE_DIR} -G Ninja -B
+ ${CH_BINARY_DIR} && cmake --build ${CH_BINARY_DIR} --target libch\"
OUTPUT _build_ch)
endif()
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 38042d425460..f17fa2d5dba6 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250729
-CH_COMMIT=77ef0818976
+CH_BRANCH=rebase_ch/20250818
+CH_COMMIT=102d0e14c1b
diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt
index 1d4654bcaed9..459d038c532a 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -164,6 +164,42 @@ target_link_libraries(
target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet)
+# Wrap malloc/free and the other C-style allocation functions with our own ones
+# to inject the memory tracking mechanism into them. Sanitizers have their own
+# way of intercepting allocations and deallocations, so we skip this for them.
+#
+# Param: target_lib - the target library to apply these wrappers to.
+macro(add_memory_tracking_wrappers target_lib)
+ # Skip the wrappers when SANITIZE or SANITIZE_COVERAGE is set, and on macOS
+ # or FreeBSD (OS_DARWIN, OS_FREEBSD)
+ if(NOT
+ (SANITIZE
+ OR SANITIZE_COVERAGE
+ OR OS_DARWIN
+ OR OS_FREEBSD))
+ # Pass one --wrap=NAME linker option per C allocation function, PRIVATE
+ target_link_options(
+ ${target_lib}
+ PRIVATE
+ "LINKER:--wrap=malloc"
+ "LINKER:--wrap=free"
+ "LINKER:--wrap=calloc"
+ "LINKER:--wrap=realloc"
+ "LINKER:--wrap=aligned_alloc"
+ "LINKER:--wrap=posix_memalign"
+ "LINKER:--wrap=valloc"
+ "LINKER:--wrap=memalign"
+ "LINKER:--wrap=reallocarray")
+
+ # pvalloc gets the same treatment, except when USE_MUSL is set
+ if(NOT USE_MUSL)
+ target_link_options(${target_lib} PRIVATE "LINKER:--wrap=pvalloc")
+ endif()
+ endif()
+endmacro()
+
+add_memory_tracking_wrappers(${LOCALENGINE_SHARED_LIB})
+
if(NOT APPLE)
if(ENABLE_JEMALLOC)
target_link_options(
diff --git a/cpp-ch/local-engine/Common/AggregateUtil.cpp b/cpp-ch/local-engine/Common/AggregateUtil.cpp
index f2f23c13efb7..63514455f778 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.cpp
+++ b/cpp-ch/local-engine/Common/AggregateUtil.cpp
@@ -52,6 +52,7 @@ extern const SettingsUInt64 max_block_size;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
+extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation;
}
template
@@ -259,7 +260,8 @@ DB::Aggregator::Params AggregatorParamsHelper::buildParams(
only_merge,
aggregate_settings[DB::Setting::optimize_group_by_constant_keys],
aggregate_settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization],
- {}};
+ {},
+ aggregate_settings[DB::Setting::enable_producing_buckets_out_of_order_in_aggregation]};
}
diff --git a/cpp-ch/local-engine/Common/LoggerExtend.cpp b/cpp-ch/local-engine/Common/LoggerExtend.cpp
index 979c28ae08a4..612fd21cec8f 100644
--- a/cpp-ch/local-engine/Common/LoggerExtend.cpp
+++ b/cpp-ch/local-engine/Common/LoggerExtend.cpp
@@ -15,8 +15,8 @@
* limitations under the License.
*/
#include "LoggerExtend.h"
+#include
#include
-
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
index a9d2d88fc82a..6c473b3193ed 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
@@ -597,8 +597,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
auto read_step = storage->reader.readFromParts(
RangesInDataParts({selected_parts}),
- /* alter_conversions = */
- {},
+ std::make_shared(),
names_and_types_list.getNames(),
storage_snapshot,
*query_info,
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
index 4e9f390c9c89..ab21decbe9f3 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
@@ -49,7 +49,7 @@ class SparkMergeTreeDataWriter
class SparkStorageMergeTree : public DB::MergeTreeData
{
friend class MergeSparkMergeTreeTask;
-
+public:
struct SparkMutationsSnapshot : public MutationsSnapshotBase
{
SparkMutationsSnapshot() = default;
@@ -63,7 +63,7 @@ class SparkStorageMergeTree : public DB::MergeTreeData
DB::NameSet getAllUpdatedColumns() const override { return {}; }
};
-public:
+
static void wrapRangesInDataParts(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges);
static void analysisPartsByRanges(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges_in_data_parts);
std::string getName() const override;
diff --git a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp
index 84b0330bc1f7..20fe335f445a 100644
--- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp
+++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp
@@ -183,6 +183,7 @@ VectorizedParquetRecordReader::VectorizedParquetRecordReader(const DB::Block & h
"Parquet",
format_settings_,
std::nullopt,
+ std::nullopt,
format_settings_.parquet.allow_missing_columns,
format_settings_.null_as_default,
format_settings_.date_time_overflow_behavior,
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
index f2cf00e39299..854bd5a3ddba 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp
@@ -72,7 +72,7 @@ ORCFormatFile::createInputFormat(const DB::Block & header, const std::shared_ptr
format_settings.orc.reader_time_zone_name = mapped_timezone;
}
//TODO: support prefetch
- auto parser_group = std::make_shared(context->getSettingsRef(), 1, filter_actions_dag, context);
+ auto parser_group = std::make_shared(filter_actions_dag, context, nullptr);
auto input_format
= std::make_shared(*read_buffer, toShared(header), format_settings, false, 0, parser_group);
return std::make_shared(std::move(read_buffer), input_format);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index dcddc57cdd5f..797fa189cc2b 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -205,8 +205,10 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr
// We need to disable fiter push down and read all row groups, so that we can get correct row index.
format_settings.parquet.filter_push_down = false;
}
- auto parser_group = std::make_shared(context->getSettingsRef(), 1, filter_actions_dag, context);
- auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_group, 8192);
+ auto parser_group = std::make_shared(filter_actions_dag, context, nullptr);
+ auto parser_shared_resources
+ = std::make_shared(context->getSettingsRef(), /*num_streams_=*/1);
+ auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, 8192);
return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header);
};
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index be0b6c0cbc9c..91fb78d7d2f8 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -80,6 +80,7 @@ extern const SettingsUInt64 max_download_buffer_size;
extern const SettingsBool input_format_allow_seeks;
extern const SettingsUInt64 max_read_buffer_size;
extern const SettingsBool s3_slow_all_threads_after_network_error;
+extern const SettingsBool backup_slow_all_threads_after_retryable_s3_error;
extern const SettingsBool enable_s3_requests_logging;
}
namespace ErrorCodes
@@ -549,12 +550,14 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
}
// for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work
+ unsigned int s3_retry_attempts = static_cast(context->getSettingsRef()[DB::Setting::s3_retry_attempts]);
DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
region_name,
context->getRemoteHostFilter(),
static_cast(context->getSettingsRef()[DB::Setting::s3_max_redirects]),
- static_cast(context->getSettingsRef()[DB::Setting::s3_retry_attempts]),
+ S3::PocoHTTPClientConfiguration::RetryStrategy{.max_retries = s3_retry_attempts},
context->getSettingsRef()[DB::Setting::s3_slow_all_threads_after_network_error],
+ context->getSettingsRef()[Setting::backup_slow_all_threads_after_retryable_s3_error],
context->getSettingsRef()[DB::Setting::enable_s3_requests_logging],
false,
nullptr,
@@ -657,7 +660,7 @@ class AzureBlobReadBuffer : public ReadBufferBuilder
DB::AzureBlobStorage::ConnectionParams params{
.endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix),
.auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix),
- .client_options = DB::AzureBlobStorage::getClientOptions(context, *new_settings, is_client_for_disk),
+ .client_options = DB::AzureBlobStorage::getClientOptions(context, context->getSettingsRef(), *new_settings, is_client_for_disk),
};
shared_client = DB::AzureBlobStorage::getContainerClient(params, true);
diff --git a/cpp-ch/local-engine/tests/CMakeLists.txt b/cpp-ch/local-engine/tests/CMakeLists.txt
index 9c18d70b0f8e..1eed08c7ed01 100644
--- a/cpp-ch/local-engine/tests/CMakeLists.txt
+++ b/cpp-ch/local-engine/tests/CMakeLists.txt
@@ -76,6 +76,7 @@ if(ENABLE_TESTS)
unit_tests_local_engine
${local_engine_gtest_sources} ${local_engine_udf_sources}
${local_engine_function_parser_sources} ${gtest_utils_sources})
+ add_memory_tracking_wrappers(unit_tests_local_engine)
target_include_directories(
unit_tests_local_engine
PRIVATE ${ClickHouse_SOURCE_DIR}/utils/extern-local_engine
@@ -85,6 +86,7 @@ if(ENABLE_TESTS)
target_link_libraries(
unit_tests_local_engine
PRIVATE gluten_clickhouse_backend_libs
+ clickhouse_new_delete
clickhouse_common_config
clickhouse_common_io
clickhouse_parsers
@@ -114,6 +116,7 @@ if(ENABLE_BENCHMARKS)
benchmark_sum.cpp)
target_link_libraries(
benchmark_local_engine
- PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers
- ch_parquet)
+ PRIVATE gluten_clickhouse_backend_libs clickhouse_new_delete
+ ch_contrib::gbenchmark_all loggers ch_parquet)
+ add_memory_tracking_wrappers(benchmark_local_engine)
endif()
diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
index 388beb12ef53..4289b704628b 100644
--- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
@@ -88,8 +88,10 @@ void BM_ColumnIndexRead_Old(benchmark::State & state)
{
ReadBufferFromFilePRead fileReader(file);
auto global_context = local_engine::QueryContext::globalContext();
- auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context);
- auto format = std::make_shared(fileReader, header, format_settings, parser_group, 8192);
+ auto parser_group = std::make_shared(nullptr, global_context, nullptr);
+ auto parser_shared_resources
+ = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1);
+ auto format = std::make_shared(fileReader, header, format_settings, parser_shared_resources, parser_group, 8192);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique(pipeline);
while (reader->pull(res))
@@ -113,8 +115,10 @@ void BM_ParquetReadDate32(benchmark::State & state)
{
auto in = std::make_unique(file);
auto global_context = local_engine::QueryContext::globalContext();
- auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context);
- auto format = std::make_shared(*in, header, format_settings, parser_group, 8192);
+ auto parser_group = std::make_shared(nullptr, global_context, nullptr);
+ auto parser_shared_resources
+ = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1);
+ auto format = std::make_shared(*in, header, format_settings, parser_shared_resources, parser_group, 8192);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique(pipeline);
while (reader->pull(res))
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
index d6048daa4f81..5b15f3d79f49 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
@@ -59,8 +59,10 @@ static void readParquetFile(const SharedHeader & header, const String & file, Bl
auto in = std::make_unique(file);
FormatSettings format_settings;
auto global_context = QueryContext::globalContext();
- auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context);
- auto format = std::make_shared(*in, header, format_settings, std::move(parser_group), 8192);
+ auto parser_group = std::make_shared(nullptr, global_context, nullptr);
+ auto parser_shared_resources
+ = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1);
+ auto format = std::make_shared(*in, header, format_settings, parser_shared_resources, std::move(parser_group), 8192);
auto pipeline = QueryPipeline(std::move(format));
auto reader = std::make_unique(pipeline);
while (reader->pull(block))
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 7994b79091b7..021f59722e89 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -112,9 +112,11 @@ void readData(const String & path, const std::map & fields)
InputFormatPtr format;
auto parser_group
- = std::make_shared(QueryContext::globalContext()->getSettingsRef(), 1, nullptr, QueryContext::globalContext());
+ = std::make_shared(nullptr, QueryContext::globalContext(), nullptr);
+ auto parser_shared_resources
+ = std::make_shared(QueryContext::globalContext()->getSettingsRef(), /*num_streams_=*/1);
if constexpr (std::is_same_v)
- format = std::make_shared(in, header, settings, parser_group, 8192);
+ format = std::make_shared(in, header, settings, parser_shared_resources, parser_group, 8192);
else
format = std::make_shared(in, header, settings);
@@ -366,6 +368,7 @@ TEST(ParquetRead, ArrowRead)
"Parquet",
format_settings,
std::nullopt,
+ std::nullopt,
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.date_time_overflow_behavior,
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_write.cpp b/cpp-ch/local-engine/tests/gtest_parquet_write.cpp
index df01107bd346..d83445129cbb 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_write.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_write.cpp
@@ -210,7 +210,7 @@ TEST(ParquetWrite, ComplexTypes)
/// Convert Arrow Table to CH Block
ArrowColumnToCHColumn arrow2ch(
- header, "Parquet", format_settings, std::nullopt, true, true, FormatSettings::DateTimeOverflowBehavior::Ignore, false);
+ header, "Parquet", format_settings, std::nullopt, std::nullopt, true, true, FormatSettings::DateTimeOverflowBehavior::Ignore, false);
Chunk output_chunk = arrow2ch.arrowTableToCHChunk(arrow_table, arrow_table->num_rows(), nullptr, nullptr);
/// Compare input and output columns