diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index 2a3ccc751e25..bb0fc9802875 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -89,7 +89,7 @@ class GlutenClickHouseWholeStageTransformerSuite .set("spark.sql.warehouse.dir", warehouse) .setCHConfig("user_defined_path", "/tmp/user_defined") .set(RuntimeConfig.PATH.key, UTSystemParameters.diskOutputDataPath) - .set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/$SPARK_DIR_NAME") + .set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/") if (UTSystemParameters.testMergeTreeOnObjectStorage) { minioHelper.setFileSystem(conf) minioHelper.setStoreConfig(conf, BUCKET_NAME) diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt index 96826905db3c..3d26d914c9d8 100644 --- a/cpp-ch/CMakeLists.txt +++ b/cpp-ch/CMakeLists.txt @@ -114,9 +114,9 @@ else() -DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF -DENABLE_LDAP=OFF -DENABLE_MSGPACK=OFF -DUSE_REPLXX=OFF -DENABLE_CLICKHOUSE_ALL=OFF -DENABLE_NUMACTL=OFF -DENABLE_GOOGLE_CLOUD_CPP=OFF - -DCOMPILER_FLAGS='-fvisibility=hidden -fvisibility-inlines-hidden' -S - ${CH_SOURCE_DIR} -G Ninja -B ${CH_BINARY_DIR} && cmake --build - ${CH_BINARY_DIR} --target libch\" + -DENABLE_ARROW_FLIGHT=OFF -DCOMPILER_FLAGS='-fvisibility=hidden + -fvisibility-inlines-hidden' -S ${CH_SOURCE_DIR} -G Ninja -B + ${CH_BINARY_DIR} && cmake --build ${CH_BINARY_DIR} --target libch\" OUTPUT _build_ch) endif() diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 38042d425460..3c5c2032feca 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20250729 
-CH_COMMIT=77ef0818976 +CH_BRANCH=rebase_ch/20250904 +CH_COMMIT=4fc3e17478c diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index 1d4654bcaed9..459d038c532a 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -164,6 +164,42 @@ target_link_libraries( target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet) +# Wrap the malloc/free and other C-style functions with our own ones to inject +# memory tracking mechanism into them. Sanitizers have their own way of +# intercepting the allocations and deallocations, so we skip this step for them. +# Define a macro to wrap memory allocation/deallocation functions for memory +# tracking Param: target_lib - The target library to apply these wrappers to +macro(add_memory_tracking_wrappers target_lib) + # Only apply these wrappers when not using sanitizers and not on macOS or + # FreeBSD + if(NOT + (SANITIZE + OR SANITIZE_COVERAGE + OR OS_DARWIN + OR OS_FREEBSD)) + # Add linker options to wrap standard C memory allocation functions + target_link_options( + ${target_lib} + PRIVATE + "LINKER:--wrap=malloc" + "LINKER:--wrap=free" + "LINKER:--wrap=calloc" + "LINKER:--wrap=realloc" + "LINKER:--wrap=aligned_alloc" + "LINKER:--wrap=posix_memalign" + "LINKER:--wrap=valloc" + "LINKER:--wrap=memalign" + "LINKER:--wrap=reallocarray") + + # Wrap pvalloc only when not using MUSL C library + if(NOT USE_MUSL) + target_link_options(${target_lib} PRIVATE "LINKER:--wrap=pvalloc") + endif() + endif() +endmacro() + +add_memory_tracking_wrappers(${LOCALENGINE_SHARED_LIB}) + if(NOT APPLE) if(ENABLE_JEMALLOC) target_link_options( diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 55938082a3e1..b50541931a14 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -783,11 +783,13 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) 
global_context->setConfig(config); auto tmp_path = config->getString("tmp_path", PathConfig::DEFAULT_TEMP_FILE_PATH); + LOG_DEBUG(getLogger("BackendInitializerUtil"), "tmp path: {}. default tmp path: {}", tmp_path, PathConfig::DEFAULT_TEMP_FILE_PATH); /* diagnostic only, not an error condition */ if (config->getBool(PathConfig::USE_CURRENT_DIRECTORY_AS_TMP, false)) { char buffer[PATH_MAX]; if (getcwd(buffer, sizeof(buffer)) != nullptr) tmp_path = std::string(buffer) + tmp_path; + LOG_DEBUG(getLogger("BackendInitializerUtil"), "use current directory as tmp path: {}", tmp_path); }; global_context->setTemporaryStoragePath(tmp_path, 0); diff --git a/cpp-ch/local-engine/Common/LoggerExtend.cpp b/cpp-ch/local-engine/Common/LoggerExtend.cpp index 979c28ae08a4..52cfbf77ea63 100644 --- a/cpp-ch/local-engine/Common/LoggerExtend.cpp +++ b/cpp-ch/local-engine/Common/LoggerExtend.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ #include "LoggerExtend.h" +#include #include #include diff --git a/cpp-ch/local-engine/Functions/SparkCastComplexTypesToString.h b/cpp-ch/local-engine/Functions/SparkCastComplexTypesToString.h index e26dfcd65bc2..0ab1a468121d 100644 --- a/cpp-ch/local-engine/Functions/SparkCastComplexTypesToString.h +++ b/cpp-ch/local-engine/Functions/SparkCastComplexTypesToString.h @@ -114,7 +114,7 @@ class SparkCastComplexTypesToString : public DB::IFunction for (size_t row = 0; row < input_rows_count; ++row) { serializeTuple(*tuple_col, row, tuple_type->getElements(), write_buffer, format_settings); - write_helper.rowWritten(); + write_helper.finishRow(); } write_helper.finalize(); } @@ -126,7 +126,7 @@ class SparkCastComplexTypesToString : public DB::IFunction for (size_t row = 0; row < input_rows_count; ++row) { serializeMap(*map_col, row, key_type, value_type, write_buffer, format_settings); - write_helper.rowWritten(); + write_helper.finishRow(); } write_helper.finalize(); } @@ -136,7 +136,7 @@ class SparkCastComplexTypesToString : public DB::IFunction for (size_t row = 0; row < 
input_rows_count; ++row) { serializeArray(*array_col, row, array_type->getNestedType(), write_buffer, format_settings); - write_helper.rowWritten(); + write_helper.finishRow(); } write_helper.finalize(); } diff --git a/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp b/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp index bf65b253479b..89b07f263f07 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp @@ -131,7 +131,7 @@ class SparkFunctionArrayJoin : public IFunction } else { - const StringRef s(&string_data[data_pos], string_offsets[j + array_pos] - data_pos - 1); + const StringRef s(&string_data[data_pos], string_offsets[j + array_pos] - data_pos); res += s.toString(); last_not_null_pos = res.size(); if (j != array_size - 1) diff --git a/cpp-ch/local-engine/Functions/SparkFunctionBin.cpp b/cpp-ch/local-engine/Functions/SparkFunctionBin.cpp index 9aa44bf45cf6..f4c979f27c6b 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionBin.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionBin.cpp @@ -112,8 +112,7 @@ namespace val >>= 1; } while (val != 0 && char_pos > 0); - pos += len + 1; - out_chars[pos - 1] = '\0'; + pos += len; out_offsets[i] = pos; } diff --git a/cpp-ch/local-engine/Functions/SparkFunctionCastFloatToString.cpp b/cpp-ch/local-engine/Functions/SparkFunctionCastFloatToString.cpp index 2c564997a211..3563bf3894bb 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionCastFloatToString.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionCastFloatToString.cpp @@ -103,7 +103,6 @@ class SparkFunctionCastFloatToString : public IFunction { writeFloatText(src_col->getElement(i), write_buffer); writeFloatEnd(src_col->getElement(i), write_buffer); - writeChar(0, write_buffer); res_offsets[i] = write_buffer.count(); } return true; diff --git a/cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h b/cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h 
index 1399b422856c..7183e4e8b5df 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h +++ b/cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h @@ -762,7 +762,7 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction bool document_ok = false; if (col_json_const) { - std::string_view json{reinterpret_cast(chars.data()), offsets[0] - 1}; + std::string_view json{reinterpret_cast(chars.data()), offsets[0]}; document_ok = safeParseJson(json, parser, document); } @@ -778,7 +778,7 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction { if (!col_json_const) { - std::string_view json{reinterpret_cast(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1] - 1}; + std::string_view json{reinterpret_cast(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1]}; document_ok = safeParseJson(json, parser, document); } if (document_ok) diff --git a/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h b/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h index f0dac5d3d72d..a1acccc1ca60 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h +++ b/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h @@ -373,7 +373,7 @@ class SparkFunctionAnyHash : public DB::IFunction { if (!null_map || !(*null_map)[i]) [[likely]] vec_to[i] = applyUnsafeBytes( - reinterpret_cast(&data[current_offset]), offsets[i] - current_offset - 1, vec_to[i]); + reinterpret_cast(&data[current_offset]), offsets[i] - current_offset, vec_to[i]); current_offset = offsets[i]; } diff --git a/cpp-ch/local-engine/Functions/SparkFunctionPositionUTF8.cpp b/cpp-ch/local-engine/Functions/SparkFunctionPositionUTF8.cpp index 856dd6bd0957..68cb431dfbfc 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionPositionUTF8.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionPositionUTF8.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB { @@ -176,8 +177,8 @@ struct PositionSparkImpl for (size_t i = 0; i < 
input_rows_count; ++i) { - size_t needle_size = needle_offsets[i] - prev_needle_offset - 1; - size_t haystack_size = haystack_offsets[i] - prev_haystack_offset - 1; + size_t needle_size = needle_offsets[i] - prev_needle_offset; + size_t haystack_size = haystack_offsets[i] - prev_haystack_offset; auto start = start_pos != nullptr ? start_pos->getUInt(i) : UInt64(0); @@ -195,14 +196,14 @@ struct PositionSparkImpl /// It is assumed that the StringSearcher is not very difficult to initialize. typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack( reinterpret_cast(&needle_data[prev_needle_offset]), - needle_offsets[i] - prev_needle_offset - 1); /// zero byte at the end + needle_offsets[i] - prev_needle_offset); const char * beg = Impl::advancePos( reinterpret_cast(&haystack_data[prev_haystack_offset]), - reinterpret_cast(&haystack_data[haystack_offsets[i] - 1]), + reinterpret_cast(&haystack_data[haystack_offsets[i]]), start - 1); /// searcher returns a pointer to the found substring or to the end of `haystack`. - size_t pos = searcher.search(reinterpret_cast(beg), &haystack_data[haystack_offsets[i] - 1]) + size_t pos = searcher.search(reinterpret_cast(beg), &haystack_data[haystack_offsets[i]]) - &haystack_data[prev_haystack_offset]; if (pos != haystack_size) @@ -239,7 +240,7 @@ struct PositionSparkImpl for (size_t i = 0; i < input_rows_count; ++i) { - size_t needle_size = needle_offsets[i] - prev_needle_offset - 1; + size_t needle_size = needle_offsets[i] - prev_needle_offset; auto start = start_pos != nullptr ? 
start_pos->getUInt(i) : UInt64(0); @@ -254,7 +255,7 @@ struct PositionSparkImpl else { typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack( - reinterpret_cast(&needle_data[prev_needle_offset]), needle_offsets[i] - prev_needle_offset - 1); + reinterpret_cast(&needle_data[prev_needle_offset]), needle_offsets[i] - prev_needle_offset); const char * beg = Impl::advancePos(haystack.data(), haystack.data() + haystack.size(), start - 1); size_t pos = searcher.search( diff --git a/cpp-ch/local-engine/Functions/SparkFunctionRegexpExtractAll.cpp b/cpp-ch/local-engine/Functions/SparkFunctionRegexpExtractAll.cpp index 68136713f59c..54304671968c 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionRegexpExtractAll.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionRegexpExtractAll.cpp @@ -169,16 +169,14 @@ namespace const auto & match = matches[match_index]; if (match.offset != std::string::npos) { - res_strings_chars.resize_exact(res_strings_offset + match.length + 1); + res_strings_chars.resize_exact(res_strings_offset + match.length); memcpySmallAllowReadWriteOverflow15(&res_strings_chars[res_strings_offset], pos + match.offset, match.length); res_strings_offset += match.length; } else - res_strings_chars.resize_exact(res_strings_offset + 1); + res_strings_chars.resize_exact(res_strings_offset); /// Update offsets of Column:String - res_strings_chars[res_strings_offset] = 0; - ++res_strings_offset; res_strings_offsets.push_back(res_strings_offset); ++i; @@ -221,7 +219,7 @@ namespace for (size_t cur_offset : offsets) { Pos start = reinterpret_cast(&data[prev_offset]); - Pos end = start + (cur_offset - prev_offset - 1); + Pos end = start + (cur_offset - prev_offset); saveMatchs( start, end, @@ -272,7 +270,7 @@ namespace size_t cur_offset = offsets[i]; Pos start = reinterpret_cast(&data[prev_offset]); - Pos end = start + (cur_offset - prev_offset - 1); + Pos end = start + (cur_offset - prev_offset); saveMatchs( start, end, @@ -356,16 
+354,14 @@ namespace /// Append matched segment into res_strings_chars if (match.offset != std::string::npos) { - res_strings_chars.resize_exact(res_strings_offset + match.length + 1); + res_strings_chars.resize_exact(res_strings_offset + match.length); memcpySmallAllowReadWriteOverflow15(&res_strings_chars[res_strings_offset], start + match.offset, match.length); res_strings_offset += match.length; } else - res_strings_chars.resize_exact(res_strings_offset + 1); + res_strings_chars.resize_exact(res_strings_offset); /// Update offsets of Column:String - res_strings_chars[res_strings_offset] = 0; - ++res_strings_offset; res_strings_offsets.push_back(res_strings_offset); } diff --git a/cpp-ch/local-engine/Functions/SparkFunctionReinterpretAsString.cpp b/cpp-ch/local-engine/Functions/SparkFunctionReinterpretAsString.cpp index 5c3e5b6d4449..08ff46045c22 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionReinterpretAsString.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionReinterpretAsString.cpp @@ -88,11 +88,9 @@ namespace if (!is_string_type) std::reverse(data.begin(), data.end()); - data_to.resize(offset + data.size() + 1); + data_to.resize(offset + data.size()); memcpy(&data_to[offset], data.data(), data.size()); offset += data.size(); - data_to[offset] = 0; - ++offset; offsets_to[i] = offset; } diff --git a/cpp-ch/local-engine/Functions/SparkFunctionStrToMap.cpp b/cpp-ch/local-engine/Functions/SparkFunctionStrToMap.cpp index 19955c4dabe7..84940af869c5 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionStrToMap.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionStrToMap.cpp @@ -71,7 +71,7 @@ class TrivialCharSplitter bool next(Pos & token_begin, Pos & token_end) { - if (str_cursor >= str_end) + if (str_cursor > str_end) return false; token_begin = str_cursor; auto next_token_pos = static_cast(memmem(str_cursor, str_end - str_cursor, delimiter.c_str(), delimiter.size())); @@ -79,7 +79,7 @@ class TrivialCharSplitter if (!next_token_pos) { token_end = 
str_end; - str_cursor = str_end; + str_cursor = str_end + 1; delimiter_begin = nullptr; delimiter_end = nullptr; } @@ -126,7 +126,7 @@ struct RegularSplitter bool next(Pos & token_begin, Pos & token_end) { - if (str_cursor >= str_end) + if (str_cursor > str_end) return false; // If delimiter is empty, return each character as a token. if (!re) @@ -143,7 +143,7 @@ struct RegularSplitter { token_begin = str_cursor; token_end = str_end; - str_cursor = str_end; + str_cursor = str_end + 1; delimiter_begin = nullptr; delimiter_end = nullptr; return true; @@ -271,7 +271,7 @@ class SparkFunctionStrToMap : public DB::IFunction { DB::Tuple tuple(2); size_t key_len = key_end - key_begin; - tuple[0] = key_end == str_end ? std::string_view(key_begin, key_len - 1) : std::string_view(key_begin, key_len); + tuple[0] = std::string_view(key_begin, key_len); /* no trailing-zero special case at str_end any more */ auto delimiter_begin = kv_generator.getDelimiterBegin(); auto delimiter_end = kv_generator.getDelimiterEnd(); LOG_TRACE( @@ -284,7 +284,6 @@ class SparkFunctionStrToMap : public DB::IFunction std::string_view(key_begin, key_end - key_begin)); if (delimiter_begin && delimiter_begin != str_end) { - DB::Field value = pair_end == str_end ? std::string_view(delimiter_end, pair_end - delimiter_end - 1) - : std::string_view(delimiter_end, pair_end - delimiter_end); + DB::Field value = std::string_view(delimiter_end, pair_end - delimiter_end); tuple[1] = std::move(value); } diff --git a/cpp-ch/local-engine/Functions/SparkFunctionTrim.cpp b/cpp-ch/local-engine/Functions/SparkFunctionTrim.cpp index 3f05a76d90d1..348bccba5e54 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionTrim.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionTrim.cpp @@ -185,8 +185,7 @@ namespace size_t res_offset = row > 0 ? 
res_offsets[row - 1] : 0; - res_data.resize_exact(res_data.size() + dst_size + 1); + res_data.resize_exact(res_data.size() + dst_size); /* rows carry no trailing '\0' now; grow by exactly dst_size so the buffer size matches the last offset */ memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], dst, dst_size); - res_offset += dst_size + 1; - res_data[res_offset - 1] = '\0'; + res_offset += dst_size; res_offsets[row] = res_offset; } diff --git a/cpp-ch/local-engine/Functions/SparkParseURL.cpp b/cpp-ch/local-engine/Functions/SparkParseURL.cpp index 97c177c3f213..97fa819acaa8 100644 --- a/cpp-ch/local-engine/Functions/SparkParseURL.cpp +++ b/cpp-ch/local-engine/Functions/SparkParseURL.cpp @@ -58,7 +58,7 @@ struct ExtractNullableSubstringImpl for (size_t i = 0; i < size; ++i) { - String s(reinterpret_cast(&data[prev_offset]), offsets[i] - prev_offset - 1); + String s(reinterpret_cast(&data[prev_offset]), offsets[i] - prev_offset); try { Poco::URI uri(s, false); @@ -69,7 +69,7 @@ struct ExtractNullableSubstringImpl start = nullptr; length = 0; } - res_data.resize_exact(res_data.size() + length + 1); + res_data.resize_exact(res_data.size() + length); if (start) { memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], start, length); @@ -79,8 +79,7 @@ struct ExtractNullableSubstringImpl { null_map.insert(1); } - res_offset += length + 1; - res_data[res_offset - 1] = 0; + res_offset += length; res_offsets[i] = res_offset; prev_offset = offsets[i]; diff --git a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp index 6dbd28b95f74..ccc49c80124c 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp @@ -39,6 +39,7 @@ #include #include #include +#include namespace DB { 
@@ -778,10 +779,11 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin( MultiEnum join_algorithm = context->getSettingsRef()[Setting::join_algorithm]; if (join_algorithm.isSet(DB::JoinAlgorithm::GRACE_HASH)) { + auto temp_dist = Context::getGlobalContextInstance()->getTempDataOnDisk(); hash_join = std::make_shared( context->getSettingsRef()[Setting::grace_hash_join_initial_buckets], context->getSettingsRef()[Setting::grace_hash_join_max_buckets], - table_join, left_plan->getCurrentHeader(), right_plan->getCurrentHeader(), context->getTempDataOnDisk()); + table_join, left_plan->getCurrentHeader(), right_plan->getCurrentHeader(), temp_dist); } else { diff --git a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp index 4a416abe2a1c..a7501d6a2820 100644 --- a/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp +++ b/cpp-ch/local-engine/Storages/IO/AggregateSerializationUtils.cpp @@ -89,7 +89,6 @@ DB::ColumnWithTypeAndName convertAggregateStateToString(const DB::ColumnWithType for (const auto & item : aggregate_col->getData()) { aggregate_col->getAggregateFunction()->serialize(item, value_writer); - writeChar('\0', value_writer); column_offsets.emplace_back(value_writer.count()); } return DB::ColumnWithTypeAndName(std::move(res_col), res_type, col.name); diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp index cd175dc2d8f7..524cb3af3835 100644 
--- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp @@ -116,7 +116,6 @@ readVarSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t row AggregateDataPtr place = arena.alignedAlloc(column_parse_util.aggregate_state_size, column_parse_util.aggregate_state_align); column_parse_util.aggregate_function->create(place); column_parse_util.aggregate_function->deserialize(place, in, std::nullopt, &arena); - in.ignore(); vec.push_back(place); } } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index bb80bc5c6fd5..8ab29e7e8150 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -290,7 +290,7 @@ MergeTreeData::LoadPartResult SparkStorageMergeTree::loadDataPart( // without it "test mergetree optimize partitioned by one low card column" will log ERROR resetColumnSizes(); - calculateColumnAndSecondaryIndexSizesIfNeeded(); + calculateColumnAndSecondaryIndexSizesImpl(); LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName()); return res; diff --git a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp index 381055d57124..f87109b67e13 100644 --- a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp @@ -46,7 +46,7 @@ OutputFormatFile::OutputFormatPtr ParquetOutputFormatFile::createOutputFormat(co auto new_header = toShared(createHeaderWithPreferredSchema(header)); // TODO: align all spark parquet config with ch parquet config auto format_settings = DB::getFormatSettings(context); - auto output_format = std::make_shared(*(res->write_buffer), new_header, format_settings); + auto output_format 
= std::make_shared(*(res->write_buffer), new_header, format_settings, nullptr); res->output = output_format; return res; } diff --git a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp index 84b0330bc1f7..20fe335f445a 100644 --- a/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp +++ b/cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp @@ -183,6 +183,7 @@ VectorizedParquetRecordReader::VectorizedParquetRecordReader(const DB::Block & h "Parquet", format_settings_, std::nullopt, + std::nullopt, format_settings_.parquet.allow_missing_columns, format_settings_.null_as_default, format_settings_.date_time_overflow_behavior, diff --git a/cpp-ch/local-engine/Storages/Serializations/ExcelStringReader.h b/cpp-ch/local-engine/Storages/Serializations/ExcelStringReader.h index 082b6f72f737..e6939be3061a 100644 --- a/cpp-ch/local-engine/Storages/Serializations/ExcelStringReader.h +++ b/cpp-ch/local-engine/Storages/Serializations/ExcelStringReader.h @@ -33,7 +33,6 @@ static inline void excelRead(DB::IColumn & column, Reader && reader) try { reader(data); - data.push_back(0); offsets.push_back(data.size()); } catch (...) 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp index f2cf00e39299..854bd5a3ddba 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ORCFormatFile.cpp @@ -72,7 +72,7 @@ ORCFormatFile::createInputFormat(const DB::Block & header, const std::shared_ptr format_settings.orc.reader_time_zone_name = mapped_timezone; } //TODO: support prefetch - auto parser_group = std::make_shared(context->getSettingsRef(), 1, filter_actions_dag, context); + auto parser_group = std::make_shared(filter_actions_dag, context, nullptr); auto input_format = std::make_shared(*read_buffer, toShared(header), format_settings, false, 0, parser_group); return std::make_shared(std::move(read_buffer), input_format); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index dcddc57cdd5f..f8e3a3c91418 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -205,8 +205,9 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr // We need to disable fiter push down and read all row groups, so that we can get correct row index. 
format_settings.parquet.filter_push_down = false; } - auto parser_group = std::make_shared(context->getSettingsRef(), 1, filter_actions_dag, context); - auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_group, 8192); + auto parser_group = std::make_shared(filter_actions_dag, context, nullptr); + auto parser_shared_resources = std::make_shared(context->getSettingsRef(), /*num_streams_=*/1); + auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, 8192); return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header); }; diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp index be0b6c0cbc9c..258a1b89d976 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp @@ -80,6 +80,7 @@ extern const SettingsUInt64 max_download_buffer_size; extern const SettingsBool input_format_allow_seeks; extern const SettingsUInt64 max_read_buffer_size; extern const SettingsBool s3_slow_all_threads_after_network_error; +extern const SettingsBool s3_slow_all_threads_after_retryable_error; extern const SettingsBool enable_s3_requests_logging; } namespace ErrorCodes @@ -549,12 +550,14 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder } // for AWS CN, the endpoint is like: https://s3.cn-north-1.amazonaws.com.cn, can still work + unsigned int s3_retry_attempts = static_cast(context->getSettingsRef()[DB::Setting::s3_retry_attempts]); DB::S3::PocoHTTPClientConfiguration client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration( region_name, context->getRemoteHostFilter(), static_cast(context->getSettingsRef()[DB::Setting::s3_max_redirects]), - static_cast(context->getSettingsRef()[DB::Setting::s3_retry_attempts]), + 
DB::S3::PocoHTTPClientConfiguration::RetryStrategy{.max_retries = s3_retry_attempts}, context->getSettingsRef()[DB::Setting::s3_slow_all_threads_after_network_error], + context->getSettingsRef()[DB::Setting::s3_slow_all_threads_after_retryable_error], context->getSettingsRef()[DB::Setting::enable_s3_requests_logging], false, nullptr, @@ -657,7 +660,7 @@ class AzureBlobReadBuffer : public ReadBufferBuilder DB::AzureBlobStorage::ConnectionParams params{ .endpoint = DB::AzureBlobStorage::processEndpoint(config, config_prefix), .auth_method = DB::AzureBlobStorage::getAuthMethod(config, config_prefix), - .client_options = DB::AzureBlobStorage::getClientOptions(context, *new_settings, is_client_for_disk), + .client_options = DB::AzureBlobStorage::getClientOptions(context, context->getSettingsRef(), *new_settings, is_client_for_disk), }; shared_client = DB::AzureBlobStorage::getContainerClient(params, true); diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp index 388beb12ef53..4b5a9d2f6a13 100644 --- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp @@ -88,8 +88,10 @@ void BM_ColumnIndexRead_Old(benchmark::State & state) { ReadBufferFromFilePRead fileReader(file); auto global_context = local_engine::QueryContext::globalContext(); - auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context); - auto format = std::make_shared(fileReader, header, format_settings, parser_group, 8192); + auto parser_group = std::make_shared(nullptr, global_context, nullptr); + auto parser_shared_resources + = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1); + auto format = std::make_shared(fileReader, header, format_settings, parser_shared_resources, parser_group, 8192); auto pipeline = QueryPipeline(std::move(format)); auto reader = std::make_unique(pipeline); while (reader->pull(res)) @@ -113,8 +115,10 @@ void 
BM_ParquetReadDate32(benchmark::State & state) { auto in = std::make_unique(file); auto global_context = local_engine::QueryContext::globalContext(); - auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context); - auto format = std::make_shared(*in, header, format_settings, parser_group, 8192); + auto parser_group = std::make_shared(nullptr, global_context, nullptr); + auto parser_shared_resources + = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1); + auto format = std::make_shared(*in, header, format_settings, parser_shared_resources, parser_group, 8192); auto pipeline = QueryPipeline(std::move(format)); auto reader = std::make_unique(pipeline); while (reader->pull(res)) diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp index d6048daa4f81..5b15f3d79f49 100644 --- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp +++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp @@ -59,8 +59,10 @@ static void readParquetFile(const SharedHeader & header, const String & file, Bl auto in = std::make_unique(file); FormatSettings format_settings; auto global_context = QueryContext::globalContext(); - auto parser_group = std::make_shared(global_context->getSettingsRef(), 1, nullptr, global_context); - auto format = std::make_shared(*in, header, format_settings, std::move(parser_group), 8192); + auto parser_group = std::make_shared(nullptr, global_context, nullptr); + auto parser_shared_resources + = std::make_shared(global_context->getSettingsRef(), /*num_streams_=*/1); + auto format = std::make_shared(*in, header, format_settings, parser_shared_resources, std::move(parser_group), 8192); auto pipeline = QueryPipeline(std::move(format)); auto reader = std::make_unique(pipeline); while (reader->pull(block)) diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp index 
7994b79091b7..021f59722e89 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp @@ -112,9 +112,11 @@ void readData(const String & path, const std::map & fields) InputFormatPtr format; auto parser_group - = std::make_shared(QueryContext::globalContext()->getSettingsRef(), 1, nullptr, QueryContext::globalContext()); + = std::make_shared(nullptr, QueryContext::globalContext(), nullptr); + auto parser_shared_resources + = std::make_shared(QueryContext::globalContext()->getSettingsRef(), /*num_streams_=*/1); if constexpr (std::is_same_v) - format = std::make_shared(in, header, settings, parser_group, 8192); + format = std::make_shared(in, header, settings, parser_shared_resources, parser_group, 8192); else format = std::make_shared(in, header, settings); @@ -366,6 +368,7 @@ TEST(ParquetRead, ArrowRead) "Parquet", format_settings, std::nullopt, + std::nullopt, format_settings.parquet.allow_missing_columns, format_settings.null_as_default, format_settings.date_time_overflow_behavior, diff --git a/cpp-ch/local-engine/tests/gtest_parquet_write.cpp b/cpp-ch/local-engine/tests/gtest_parquet_write.cpp index df01107bd346..d83445129cbb 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_write.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_write.cpp @@ -210,7 +210,7 @@ TEST(ParquetWrite, ComplexTypes) /// Convert Arrow Table to CH Block ArrowColumnToCHColumn arrow2ch( - header, "Parquet", format_settings, std::nullopt, true, true, FormatSettings::DateTimeOverflowBehavior::Ignore, false); + header, "Parquet", format_settings, std::nullopt, std::nullopt, true, true, FormatSettings::DateTimeOverflowBehavior::Ignore, false); Chunk output_chunk = arrow2ch.arrowTableToCHChunk(arrow_table, arrow_table->num_rows(), nullptr, nullptr); /// Compare input and output columns