Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class GlutenClickHouseWholeStageTransformerSuite
.set("spark.sql.warehouse.dir", warehouse)
.setCHConfig("user_defined_path", "/tmp/user_defined")
.set(RuntimeConfig.PATH.key, UTSystemParameters.diskOutputDataPath)
.set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/$SPARK_DIR_NAME")
.set(RuntimeConfig.TMP_PATH.key, s"/tmp/libch/")
if (UTSystemParameters.testMergeTreeOnObjectStorage) {
minioHelper.setFileSystem(conf)
minioHelper.setStoreConfig(conf, BUCKET_NAME)
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ else()
-DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF -DENABLE_LDAP=OFF
-DENABLE_MSGPACK=OFF -DUSE_REPLXX=OFF -DENABLE_CLICKHOUSE_ALL=OFF
-DENABLE_NUMACTL=OFF -DENABLE_GOOGLE_CLOUD_CPP=OFF
-DCOMPILER_FLAGS='-fvisibility=hidden -fvisibility-inlines-hidden' -S
${CH_SOURCE_DIR} -G Ninja -B ${CH_BINARY_DIR} && cmake --build
${CH_BINARY_DIR} --target libch\"
-DENABLE_ARROW_FLIGHT=OFF -DCOMPILER_FLAGS='-fvisibility=hidden
-fvisibility-inlines-hidden' -S ${CH_SOURCE_DIR} -G Ninja -B
${CH_BINARY_DIR} && cmake --build ${CH_BINARY_DIR} --target libch\"
OUTPUT _build_ch)
endif()

Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20250729
CH_COMMIT=77ef0818976
CH_BRANCH=rebase_ch/20250904
CH_COMMIT=4fc3e17478c
36 changes: 36 additions & 0 deletions cpp-ch/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,42 @@ target_link_libraries(

target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet)

# Wrap the malloc/free and other C-style functions with our own ones to inject
# memory tracking mechanism into them. Sanitizers have their own way of
# intercepting the allocations and deallocations, so we skip this step for them.
# Define a macro to wrap memory allocation/deallocation functions for memory
# tracking Param: target_lib - The target library to apply these wrappers to
macro(add_memory_tracking_wrappers target_lib)
# Only apply these wrappers when not using sanitizers and not on macOS or
# FreeBSD
if(NOT
(SANITIZE
OR SANITIZE_COVERAGE
OR OS_DARWIN
OR OS_FREEBSD))
# Add linker options to wrap standard C memory allocation functions
target_link_options(
${target_lib}
PRIVATE
"LINKER:--wrap=malloc"
"LINKER:--wrap=free"
"LINKER:--wrap=calloc"
"LINKER:--wrap=realloc"
"LINKER:--wrap=aligned_alloc"
"LINKER:--wrap=posix_memalign"
"LINKER:--wrap=valloc"
"LINKER:--wrap=memalign"
"LINKER:--wrap=reallocarray")

# Wrap pvalloc only when not using MUSL C library
if(NOT USE_MUSL)
target_link_options(${target_lib} PRIVATE "LINKER:--wrap=pvalloc")
endif()
endif()
endmacro()

add_memory_tracking_wrappers(${LOCALENGINE_SHARED_LIB})

if(NOT APPLE)
if(ENABLE_JEMALLOC)
target_link_options(
Expand Down
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,13 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
global_context->setConfig(config);

auto tmp_path = config->getString("tmp_path", PathConfig::DEFAULT_TEMP_FILE_PATH);
LOG_ERROR(getLogger("BackendInitializerUtil"), "xxxx tmp path: {}. default tmp path: {}", tmp_path, PathConfig::DEFAULT_TEMP_FILE_PATH);
if (config->getBool(PathConfig::USE_CURRENT_DIRECTORY_AS_TMP, false))
{
char buffer[PATH_MAX];
if (getcwd(buffer, sizeof(buffer)) != nullptr)
tmp_path = std::string(buffer) + tmp_path;
LOG_ERROR(getLogger("BackendInitializerUtil"), "use current directory as tmp path: {}", tmp_path);
};

global_context->setTemporaryStoragePath(tmp_path, 0);
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Common/LoggerExtend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "LoggerExtend.h"
#include <Loggers/ExtendedLogMessage.h>
#include <Loggers/OwnSplitChannel.h>

#include <Loggers/Loggers.h>
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Functions/SparkCastComplexTypesToString.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class SparkCastComplexTypesToString : public DB::IFunction
for (size_t row = 0; row < input_rows_count; ++row)
{
serializeTuple(*tuple_col, row, tuple_type->getElements(), write_buffer, format_settings);
write_helper.rowWritten();
write_helper.finishRow();
}
write_helper.finalize();
}
Expand All @@ -126,7 +126,7 @@ class SparkCastComplexTypesToString : public DB::IFunction
for (size_t row = 0; row < input_rows_count; ++row)
{
serializeMap(*map_col, row, key_type, value_type, write_buffer, format_settings);
write_helper.rowWritten();
write_helper.finishRow();
}
write_helper.finalize();
}
Expand All @@ -136,7 +136,7 @@ class SparkCastComplexTypesToString : public DB::IFunction
for (size_t row = 0; row < input_rows_count; ++row)
{
serializeArray(*array_col, row, array_type->getNestedType(), write_buffer, format_settings);
write_helper.rowWritten();
write_helper.finishRow();
}
write_helper.finalize();
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class SparkFunctionArrayJoin : public IFunction
}
else
{
const StringRef s(&string_data[data_pos], string_offsets[j + array_pos] - data_pos - 1);
const StringRef s(&string_data[data_pos], string_offsets[j + array_pos] - data_pos);
res += s.toString();
last_not_null_pos = res.size();
if (j != array_size - 1)
Expand Down
3 changes: 1 addition & 2 deletions cpp-ch/local-engine/Functions/SparkFunctionBin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ namespace
val >>= 1;
} while (val != 0 && char_pos > 0);

pos += len + 1;
out_chars[pos - 1] = '\0';
pos += len;
out_offsets[i] = pos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ class SparkFunctionCastFloatToString : public IFunction
{
writeFloatText(src_col->getElement(i), write_buffer);
writeFloatEnd<F>(src_col->getElement(i), write_buffer);
writeChar(0, write_buffer);
res_offsets[i] = write_buffer.count();
}
return true;
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
bool document_ok = false;
if (col_json_const)
{
std::string_view json{reinterpret_cast<const char *>(chars.data()), offsets[0] - 1};
std::string_view json{reinterpret_cast<const char *>(chars.data()), offsets[0]};
document_ok = safeParseJson(json, parser, document);
}

Expand All @@ -778,7 +778,7 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
{
if (!col_json_const)
{
std::string_view json{reinterpret_cast<const char *>(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1] - 1};
std::string_view json{reinterpret_cast<const char *>(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1]};
document_ok = safeParseJson(json, parser, document);
}
if (document_ok)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class SparkFunctionAnyHash : public DB::IFunction
{
if (!null_map || !(*null_map)[i]) [[likely]]
vec_to[i] = applyUnsafeBytes(
reinterpret_cast<const char *>(&data[current_offset]), offsets[i] - current_offset - 1, vec_to[i]);
reinterpret_cast<const char *>(&data[current_offset]), offsets[i] - current_offset, vec_to[i]);

current_offset = offsets[i];
}
Expand Down
15 changes: 8 additions & 7 deletions cpp-ch/local-engine/Functions/SparkFunctionPositionUTF8.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsStringSearch.h>
#include <Functions/PositionImpl.h>
#include <Common/logger_useful.h>

namespace DB
{
Expand Down Expand Up @@ -176,8 +177,8 @@ struct PositionSparkImpl

for (size_t i = 0; i < input_rows_count; ++i)
{
size_t needle_size = needle_offsets[i] - prev_needle_offset - 1;
size_t haystack_size = haystack_offsets[i] - prev_haystack_offset - 1;
size_t needle_size = needle_offsets[i] - prev_needle_offset;
size_t haystack_size = haystack_offsets[i] - prev_haystack_offset;

auto start = start_pos != nullptr ? start_pos->getUInt(i) : UInt64(0);

Expand All @@ -195,14 +196,14 @@ struct PositionSparkImpl
/// It is assumed that the StringSearcher is not very difficult to initialize.
typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack(
reinterpret_cast<const char *>(&needle_data[prev_needle_offset]),
needle_offsets[i] - prev_needle_offset - 1); /// zero byte at the end
needle_offsets[i] - prev_needle_offset);

const char * beg = Impl::advancePos(
reinterpret_cast<const char *>(&haystack_data[prev_haystack_offset]),
reinterpret_cast<const char *>(&haystack_data[haystack_offsets[i] - 1]),
reinterpret_cast<const char *>(&haystack_data[haystack_offsets[i]]),
start - 1);
/// searcher returns a pointer to the found substring or to the end of `haystack`.
size_t pos = searcher.search(reinterpret_cast<const UInt8 *>(beg), &haystack_data[haystack_offsets[i] - 1])
size_t pos = searcher.search(reinterpret_cast<const UInt8 *>(beg), &haystack_data[haystack_offsets[i]])
- &haystack_data[prev_haystack_offset];

if (pos != haystack_size)
Expand Down Expand Up @@ -239,7 +240,7 @@ struct PositionSparkImpl

for (size_t i = 0; i < input_rows_count; ++i)
{
size_t needle_size = needle_offsets[i] - prev_needle_offset - 1;
size_t needle_size = needle_offsets[i] - prev_needle_offset;

auto start = start_pos != nullptr ? start_pos->getUInt(i) : UInt64(0);

Expand All @@ -254,7 +255,7 @@ struct PositionSparkImpl
else
{
typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack(
reinterpret_cast<const char *>(&needle_data[prev_needle_offset]), needle_offsets[i] - prev_needle_offset - 1);
reinterpret_cast<const char *>(&needle_data[prev_needle_offset]), needle_offsets[i] - prev_needle_offset);

const char * beg = Impl::advancePos(haystack.data(), haystack.data() + haystack.size(), start - 1);
size_t pos = searcher.search(
Expand Down
16 changes: 6 additions & 10 deletions cpp-ch/local-engine/Functions/SparkFunctionRegexpExtractAll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,14 @@ namespace
const auto & match = matches[match_index];
if (match.offset != std::string::npos)
{
res_strings_chars.resize_exact(res_strings_offset + match.length + 1);
res_strings_chars.resize_exact(res_strings_offset + match.length);
memcpySmallAllowReadWriteOverflow15(&res_strings_chars[res_strings_offset], pos + match.offset, match.length);
res_strings_offset += match.length;
}
else
res_strings_chars.resize_exact(res_strings_offset + 1);
res_strings_chars.resize_exact(res_strings_offset);

/// Update offsets of Column:String
res_strings_chars[res_strings_offset] = 0;
++res_strings_offset;
res_strings_offsets.push_back(res_strings_offset);
++i;

Expand Down Expand Up @@ -221,7 +219,7 @@ namespace
for (size_t cur_offset : offsets)
{
Pos start = reinterpret_cast<const char *>(&data[prev_offset]);
Pos end = start + (cur_offset - prev_offset - 1);
Pos end = start + (cur_offset - prev_offset);
saveMatchs(
start,
end,
Expand Down Expand Up @@ -272,7 +270,7 @@ namespace

size_t cur_offset = offsets[i];
Pos start = reinterpret_cast<const char *>(&data[prev_offset]);
Pos end = start + (cur_offset - prev_offset - 1);
Pos end = start + (cur_offset - prev_offset);
saveMatchs(
start,
end,
Expand Down Expand Up @@ -356,16 +354,14 @@ namespace
/// Append matched segment into res_strings_chars
if (match.offset != std::string::npos)
{
res_strings_chars.resize_exact(res_strings_offset + match.length + 1);
res_strings_chars.resize_exact(res_strings_offset + match.length);
memcpySmallAllowReadWriteOverflow15(&res_strings_chars[res_strings_offset], start + match.offset, match.length);
res_strings_offset += match.length;
}
else
res_strings_chars.resize_exact(res_strings_offset + 1);
res_strings_chars.resize_exact(res_strings_offset);

/// Update offsets of Column:String
res_strings_chars[res_strings_offset] = 0;
++res_strings_offset;
res_strings_offsets.push_back(res_strings_offset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ namespace
if (!is_string_type)
std::reverse(data.begin(), data.end());

data_to.resize(offset + data.size() + 1);
data_to.resize(offset + data.size());
memcpy(&data_to[offset], data.data(), data.size());
offset += data.size();
data_to[offset] = 0;
++offset;
offsets_to[i] = offset;
}

Expand Down
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Functions/SparkFunctionStrToMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ class TrivialCharSplitter

bool next(Pos & token_begin, Pos & token_end)
{
if (str_cursor >= str_end)
if (str_cursor > str_end)
return false;
token_begin = str_cursor;
auto next_token_pos = static_cast<Pos>(memmem(str_cursor, str_end - str_cursor, delimiter.c_str(), delimiter.size()));
// If delimiter is not found, return the remaining string.
if (!next_token_pos)
{
token_end = str_end;
str_cursor = str_end;
str_cursor = str_end + 1;
delimiter_begin = nullptr;
delimiter_end = nullptr;
}
Expand Down Expand Up @@ -126,7 +126,7 @@ struct RegularSplitter

bool next(Pos & token_begin, Pos & token_end)
{
if (str_cursor >= str_end)
if (str_cursor > str_end)
return false;
// If delimiter is empty, return each character as a token.
if (!re)
Expand All @@ -143,7 +143,7 @@ struct RegularSplitter
{
token_begin = str_cursor;
token_end = str_end;
str_cursor = str_end;
str_cursor = str_end + 1;
delimiter_begin = nullptr;
delimiter_end = nullptr;
return true;
Expand Down Expand Up @@ -271,7 +271,7 @@ class SparkFunctionStrToMap : public DB::IFunction
{
DB::Tuple tuple(2);
size_t key_len = key_end - key_begin;
tuple[0] = key_end == str_end ? std::string_view(key_begin, key_len - 1) : std::string_view(key_begin, key_len);
tuple[0] = key_end == str_end ? std::string_view(key_begin, key_len) : std::string_view(key_begin, key_len);
auto delimiter_begin = kv_generator.getDelimiterBegin();
auto delimiter_end = kv_generator.getDelimiterEnd();
LOG_TRACE(
Expand All @@ -284,7 +284,7 @@ class SparkFunctionStrToMap : public DB::IFunction
std::string_view(key_begin, key_end - key_begin));
if (delimiter_begin && delimiter_begin != str_end)
{
DB::Field value = pair_end == str_end ? std::string_view(delimiter_end, pair_end - delimiter_end - 1)
DB::Field value = pair_end == str_end ? std::string_view(delimiter_end, pair_end - delimiter_end)
: std::string_view(delimiter_end, pair_end - delimiter_end);
tuple[1] = std::move(value);
}
Expand Down
3 changes: 1 addition & 2 deletions cpp-ch/local-engine/Functions/SparkFunctionTrim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ namespace
size_t res_offset = row > 0 ? res_offsets[row - 1] : 0;
res_data.resize_exact(res_data.size() + dst_size + 1);
memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], dst, dst_size);
res_offset += dst_size + 1;
res_data[res_offset - 1] = '\0';
res_offset += dst_size;
res_offsets[row] = res_offset;
}

Expand Down
7 changes: 3 additions & 4 deletions cpp-ch/local-engine/Functions/SparkParseURL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct ExtractNullableSubstringImpl

for (size_t i = 0; i < size; ++i)
{
String s(reinterpret_cast<const char *>(&data[prev_offset]), offsets[i] - prev_offset - 1);
String s(reinterpret_cast<const char *>(&data[prev_offset]), offsets[i] - prev_offset);
try
{
Poco::URI uri(s, false);
Expand All @@ -69,7 +69,7 @@ struct ExtractNullableSubstringImpl
start = nullptr;
length = 0;
}
res_data.resize_exact(res_data.size() + length + 1);
res_data.resize_exact(res_data.size() + length);
if (start)
{
memcpySmallAllowReadWriteOverflow15(&res_data[res_offset], start, length);
Expand All @@ -79,8 +79,7 @@ struct ExtractNullableSubstringImpl
{
null_map.insert(1);
}
res_offset += length + 1;
res_data[res_offset - 1] = 0;
res_offset += length;

res_offsets[i] = res_offset;
prev_offset = offsets[i];
Expand Down
Loading