diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt index 96826905db3c..3d26d914c9d8 100644 --- a/cpp-ch/CMakeLists.txt +++ b/cpp-ch/CMakeLists.txt @@ -114,9 +114,9 @@ else() -DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF -DENABLE_LDAP=OFF -DENABLE_MSGPACK=OFF -DUSE_REPLXX=OFF -DENABLE_CLICKHOUSE_ALL=OFF -DENABLE_NUMACTL=OFF -DENABLE_GOOGLE_CLOUD_CPP=OFF - -DCOMPILER_FLAGS='-fvisibility=hidden -fvisibility-inlines-hidden' -S - ${CH_SOURCE_DIR} -G Ninja -B ${CH_BINARY_DIR} && cmake --build - ${CH_BINARY_DIR} --target libch\" + -DENABLE_ARROW_FLIGHT=OFF -DCOMPILER_FLAGS='-fvisibility=hidden + -fvisibility-inlines-hidden' -S ${CH_SOURCE_DIR} -G Ninja -B + ${CH_BINARY_DIR} && cmake --build ${CH_BINARY_DIR} --target libch\" OUTPUT _build_ch) endif() diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 38042d425460..8f3f96e66f37 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20250729 -CH_COMMIT=77ef0818976 +CH_BRANCH=rebase_ch/20250816 +CH_COMMIT=02c3ea5f6ea diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index 1d4654bcaed9..459d038c532a 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -164,6 +164,42 @@ target_link_libraries( target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet) +# Wrap the malloc/free and other C-style functions with our own ones to inject +# memory tracking mechanism into them. Sanitizers have their own way of +# intercepting the allocations and deallocations, so we skip this step for them. +# Define a macro to wrap memory allocation/deallocation functions for memory +# tracking Param: target_lib - The target library to apply these wrappers to +macro(add_memory_tracking_wrappers target_lib) + # Only apply these wrappers when not using sanitizers and not on macOS or + # FreeBSD + if(NOT + (SANITIZE + OR SANITIZE_COVERAGE + OR OS_DARWIN + OR OS_FREEBSD)) + # Add linker options to wrap standard C memory allocation functions + target_link_options( + ${target_lib} + PRIVATE + "LINKER:--wrap=malloc" + "LINKER:--wrap=free" + "LINKER:--wrap=calloc" + "LINKER:--wrap=realloc" + "LINKER:--wrap=aligned_alloc" + "LINKER:--wrap=posix_memalign" + "LINKER:--wrap=valloc" + "LINKER:--wrap=memalign" + "LINKER:--wrap=reallocarray") + + # Wrap pvalloc only when not using MUSL C library + if(NOT USE_MUSL) + target_link_options(${target_lib} PRIVATE "LINKER:--wrap=pvalloc") + endif() + endif() +endmacro() + +add_memory_tracking_wrappers(${LOCALENGINE_SHARED_LIB}) + if(NOT APPLE) if(ENABLE_JEMALLOC) target_link_options( diff --git a/cpp-ch/local-engine/Common/AggregateUtil.cpp b/cpp-ch/local-engine/Common/AggregateUtil.cpp index f2f23c13efb7..63514455f778 100644 --- a/cpp-ch/local-engine/Common/AggregateUtil.cpp +++ b/cpp-ch/local-engine/Common/AggregateUtil.cpp @@ -52,6 +52,7 @@ extern const SettingsUInt64 max_block_size; extern const SettingsBool compile_aggregate_expressions; extern const SettingsUInt64 min_count_to_compile_aggregate_expression; extern const SettingsBool enable_software_prefetch_in_aggregation; +extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation; } template @@ -259,7 +260,8 @@ DB::Aggregator::Params AggregatorParamsHelper::buildParams( only_merge, aggregate_settings[DB::Setting::optimize_group_by_constant_keys], aggregate_settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization], - {}}; + {}, + aggregate_settings[DB::Setting::enable_producing_buckets_out_of_order_in_aggregation]}; } diff --git a/cpp-ch/local-engine/Common/LoggerExtend.cpp b/cpp-ch/local-engine/Common/LoggerExtend.cpp index 979c28ae08a4..612fd21cec8f 100644 --- a/cpp-ch/local-engine/Common/LoggerExtend.cpp +++ b/cpp-ch/local-engine/Common/LoggerExtend.cpp @@ -15,8 +15,8 @@ * limitations under the License. */ #include "LoggerExtend.h" +#include #include - #include #include #include diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp index a9d2d88fc82a..6c473b3193ed 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp @@ -597,8 +597,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_ throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found."); auto read_step = storage->reader.readFromParts( RangesInDataParts({selected_parts}), - /* alter_conversions = */ - {}, + std::make_shared(), names_and_types_list.getNames(), storage_snapshot, *query_info, diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h index 4e9f390c9c89..ab21decbe9f3 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h @@ -49,7 +49,7 @@ class SparkMergeTreeDataWriter class SparkStorageMergeTree : public DB::MergeTreeData { friend class MergeSparkMergeTreeTask; - +public: struct SparkMutationsSnapshot : public MutationsSnapshotBase { SparkMutationsSnapshot() = default; @@ -63,7 +63,7 @@ class SparkStorageMergeTree : public DB::MergeTreeData DB::NameSet getAllUpdatedColumns() const override { return {}; } }; -public: + static void wrapRangesInDataParts(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges); static void analysisPartsByRanges(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges_in_data_parts); std::string getName() const override; diff --git a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp index 84b0330bc1f7..20fe335f445a 100644 --- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp +++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp @@ -183,6 +183,7 @@ VectorizedParquetRecordReader::VectorizedParquetRecordReader(const DB::Block & h "Parquet", format_settings_, std::nullopt, + std::nullopt, format_settings_.parquet.allow_missing_columns, format_settings_.null_as_default, format_settings_.date_time_overflow_behavior, diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp index f2cf00e39299..854bd5a3ddba 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp @@ -72,7 +72,7 @@ ORCFormatFile::createInputFormat(const DB::Block & header, const std::shared_ptr format_settings.orc.reader_time_zone_name = mapped_timezone; } //TODO: support prefetch - auto parser_group = std::make_shared(context->getSettingsRef(), 1, filter_actions_dag, context); + auto parser_group = std::make_shared(filter_actions_dag, context, nullptr); auto input_format = std::make_shared(*read_buffer, toShared(header), format_settings, false, 0, parser_group); return std::make_shared(std::move(read_buffer), input_format); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index dcddc57cdd5f..797fa189cc2b 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -205,8 +205,10 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr // We need to disable fiter push down and read all row groups, so that we can get correct row index. format_settings.parquet.filter_push_down = false; } - auto parser_group = std::make_shared(context->getSettingsRef(), 1, filter_actions_dag, context); - auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_group, 8192); + auto parser_group = std::make_shared(filter_actions_dag, context, nullptr); + auto parser_shared_resources + = std::make_shared(context->getSettingsRef(), /*num_streams_=*/1); + auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, 8192); return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header); }; diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index be0b6c0cbc9c..91fb78d7d2f8 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -80,6 +80,7 @@ extern const SettingsUInt64 max_download_buffer_size; extern const SettingsBool input_format_allow_seeks; extern const SettingsUInt64 max_read_buffer_size; extern const SettingsBool s3_slow_all_threads_after_network_error; +extern const SettingsBool backup_slow_all_threads_after_retryable_s3_error; extern const SettingsBool enable_s3_requests_logging; } namespace ErrorCodes @@ -549,12 +550,14 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder } // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work + unsigned int s3_retry_attempts = static_cast(context->getSettingsRef()[DB::Setting::s3_retry_attempts]); DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration( region_name, context->getRemoteHostFilter(), static_cast(context->getSettingsRef()[DB::Setting::s3_max_redirects]), - static_cast(context->getSettingsRef()[DB::Setting::s3_retry_attempts]), + S3::PocoHTTPClientConfiguration::RetryStrategy{.max_retries = s3_retry_attempts}, context->getSettingsRef()[DB::Setting::s3_slow_all_threads_after_network_error], + context->getSettingsRef()[Setting::backup_slow_all_threads_after_retryable_s3_error], context->getSettingsRef()[DB::Setting::enable_s3_requests_logging], false, nullptr, @@ -657,7 +660,7 @@ class AzureBlobReadBuffer : public ReadBufferBuilder DB::AzureBlobStorage::ConnectionParams params{ .endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix), .auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix), - .client_options = DB::AzureBlobStorage::getClientOptions(context, *new_settings, is_client_for_disk), + .client_options = DB::AzureBlobStorage::getClientOptions(context, context->getSettingsRef(), *new_settings, is_client_for_disk), }; shared_client = DB::AzureBlobStorage::getContainerClient(params, true); diff --git a/cpp-ch/local-engine/tests/CMakeLists.txt b/cpp-ch/local-engine/tests/CMakeLists.txt index 9c18d70b0f8e..1eed08c7ed01 100644 --- a/cpp-ch/local-engine/tests/CMakeLists.txt +++ b/cpp-ch/local-engine/tests/CMakeLists.txt @@ -76,6 +76,7 @@ if(ENABLE_TESTS) unit_tests_local_engine ${local_engine_gtest_sources} ${local_engine_udf_sources} ${local_engine_function_parser_sources} ${gtest_utils_sources}) + add_memory_tracking_wrappers(unit_tests_local_engine) target_include_directories( unit_tests_local_engine PRIVATE ${ClickHouse_SOURCE_DIR}/utils/extern-local_engine @@ -85,6 +86,7 @@ if(ENABLE_TESTS) target_link_libraries( unit_tests_local_engine PRIVATE gluten_clickhouse_backend_libs + clickhouse_new_delete clickhouse_common_config clickhouse_common_io clickhouse_parsers @@ -114,6 +116,7 @@ if(ENABLE_BENCHMARKS) benchmark_sum.cpp) target_link_libraries( benchmark_local_engine - PRIVATE gluten_clickhouse_backend_libs ch_contrib::gbenchmark_all loggers - ch_parquet) + PRIVATE gluten_clickhouse_backend_libs clickhouse_new_delete + ch_contrib::gbenchmark_all loggers ch_parquet) + add_memory_tracking_wrappers(benchmark_local_engine) endif() diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp index 388beb12ef53..4289b704628b 100644 --- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp @@ -88,8 +88,10 @@ void BM_ColumnIndexRead_Old(benchmark::State & state) { ReadBufferFromFilePRead fileReader(file); auto global_context = local_engine::QueryContext::globalContext(); - auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context); - auto format = std::make_shared(fileReader, header, format_settings, parser_group, 8192); + auto parser_group = std::make_shared(nullptr, global_context, nullptr); + auto parser_shared_resources + = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1); + auto format = std::make_shared(fileReader, header, format_settings, parser_shared_resources, parser_group, 8192); auto pipeline = QueryPipeline(std::move(format)); auto reader = std::make_unique(pipeline); while (reader->pull(res)) @@ -113,8 +115,10 @@ void BM_ParquetReadDate32(benchmark::State & state) { auto in = std::make_unique(file); auto global_context = local_engine::QueryContext::globalContext(); - auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context); - auto format = std::make_shared(*in, header, format_settings, parser_group, 8192); + auto parser_group = std::make_shared(nullptr, global_context, nullptr); + auto parser_shared_resources + = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1); + auto format = std::make_shared(*in, header, format_settings, parser_shared_resources, parser_group, 8192); auto pipeline = QueryPipeline(std::move(format)); auto reader = std::make_unique(pipeline); while (reader->pull(res)) diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp index d6048daa4f81..5b15f3d79f49 100644 --- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp +++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp @@ -59,8 +59,10 @@ static void readParquetFile(const SharedHeader & header, const String & file, Bl auto in = std::make_unique(file); FormatSettings format_settings; auto global_context = QueryContext::globalContext(); - auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context); - auto format = std::make_shared(*in, header, format_settings, std::move(parser_group), 8192); + auto parser_group = std::make_shared(nullptr, global_context, nullptr); + auto parser_shared_resources + = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1); + auto format = std::make_shared(*in, header, format_settings, parser_shared_resources, std::move(parser_group), 8192); auto pipeline = QueryPipeline(std::move(format)); auto reader = std::make_unique(pipeline); while (reader->pull(block)) diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp index 7994b79091b7..021f59722e89 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp @@ -112,9 +112,11 @@ void readData(const String & path, const std::map & fields) InputFormatPtr format; auto parser_group - = std::make_shared(QueryContext::globalContext()->getSettingsRef(), 1, nullptr, QueryContext::globalContext()); + = std::make_shared(nullptr, QueryContext::globalContext(), nullptr); + auto parser_shared_resources + = std::make_shared(QueryContext::globalContext()->getSettingsRef(), /*num_streams_=*/1); if constexpr (std::is_same_v) - format = std::make_shared(in, header, settings, parser_group, 8192); + format = std::make_shared(in, header, settings, parser_shared_resources, parser_group, 8192); else format = std::make_shared(in, header, settings); @@ -366,6 +368,7 @@ TEST(ParquetRead, ArrowRead) "Parquet", format_settings, std::nullopt, + std::nullopt, format_settings.parquet.allow_missing_columns, format_settings.null_as_default, format_settings.date_time_overflow_behavior, diff --git a/cpp-ch/local-engine/tests/gtest_parquet_write.cpp b/cpp-ch/local-engine/tests/gtest_parquet_write.cpp index df01107bd346..d83445129cbb 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_write.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_write.cpp @@ -210,7 +210,7 @@ TEST(ParquetWrite, ComplexTypes) /// Convert Arrow Table to CH Block ArrowColumnToCHColumn arrow2ch( - header, "Parquet", format_settings, std::nullopt, true, true, FormatSettings::DateTimeOverflowBehavior::Ignore, false); + header, "Parquet", format_settings, std::nullopt, std::nullopt, true, true, FormatSettings::DateTimeOverflowBehavior::Ignore, false); Chunk output_chunk = arrow2ch.arrowTableToCHChunk(arrow_table, arrow_table->num_rows(), nullptr, nullptr); /// Compare input and output columns