From dee2ac66ec673b8b8ae142ee6e676d215a53f273 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Thu, 15 Sep 2022 21:01:32 +0800 Subject: [PATCH 1/3] Multiple processes transfer parquet to mergetree (#110) * multiple transfer and fix test cmakelists.txt * add ch version check and change default schema --- .../local-engine/tool/parquet_to_mergetree.py | 57 +++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/utils/local-engine/tool/parquet_to_mergetree.py b/utils/local-engine/tool/parquet_to_mergetree.py index af545fb95d70..92051ce9bdc6 100644 --- a/utils/local-engine/tool/parquet_to_mergetree.py +++ b/utils/local-engine/tool/parquet_to_mergetree.py @@ -1,12 +1,15 @@ -from argparse import ArgumentParser import os +import re +import subprocess +from argparse import ArgumentParser +from multiprocessing import Pool parser = ArgumentParser() parser.add_argument("--path", type=str, required=True, help="temp directory for merge tree") parser.add_argument("--source", type=str, required=True, help="directory of parquet files") parser.add_argument("--dst", type=str, required=True, help="destination directory for merge tree") parser.add_argument("--schema", type=str, - default="l_orderkey Int64,l_partkey Int64,l_suppkey Int64,l_linenumber Int64,l_quantity Float64,l_extendedprice Float64,l_discount Float64,l_tax Float64,l_returnflag String,l_linestatus String,l_shipdate Date,l_commitdate Date,l_receiptdate Date,l_shipinstruct String,l_shipmode String,l_comment String") + default="l_orderkey Nullable(Int64),l_partkey Nullable(Int64),l_suppkey Nullable(Int64),l_linenumber Nullable(Int64),l_quantity Nullable(Float64),l_extendedprice Nullable(Float64),l_discount Nullable(Float64),l_tax Nullable(Float64),l_returnflag Nullable(String),l_linestatus Nullable(String),l_shipdate Nullable(Date),l_commitdate Nullable(Date),l_receiptdate Nullable(Date),l_shipinstruct Nullable(String),l_shipmode Nullable(String),l_comment Nullable(String)") def get_transform_command(data_path, @@ -24,7 +27,7 @@ def get_transform_command(data_path, def get_move_command(data_path, dst_path, no): - return f"cp -r {data_path}/data/_local/m1/all_1_* {dst_path}/all_{no}_{no}_0" + return f"mkdir -p {dst_path}/all_{no}_{no}_1; cp -r {data_path}/data/_local/m1/all_1_1_1/* {dst_path}/all_{no}_{no}_1" def get_clean_command(data_path): @@ -48,6 +51,50 @@ def transform(data_path, source, schema, dst): raise Exception(command2 + " failed") print(f"{abs_file}") +class Engine(object): + def __init__(self, source, data_path, schema, dst): + self.source = source + self.data_path = data_path + self.schema = schema + self.dst = dst + def __call__(self, ele): + no = ele[0] + file = ele[1] + abs_file = f"{self.source}/{file}" + print(abs_file) + if not os.path.exists(abs_file): + raise f"{abs_file} not found" + private_path = f"{self.data_path}/{str(no)}" + os.system(f"mkdir -p {private_path}") + command1 = get_transform_command(private_path, abs_file, self.schema) + command2 = get_move_command(private_path, self.dst, no+1) + command3 = get_clean_command(private_path) + if os.system(command3) != 0: + raise Exception(command3 + " failed") + if os.system(command1) != 0: + raise Exception(command1 + " failed") + if os.system(command2) != 0: + raise Exception(command2 + " failed") + print(f"{abs_file}") + + +def multi_transform(data_path, source, schema, dst): + assert os.path.exists(data_path), f"{data_path} is not exist" + data_inputs = enumerate([file for file in os.listdir(source) if file.endswith(".parquet")]) + pool = Pool() + engine = Engine(source, data_path, schema, dst) + pool.map(engine, list(data_inputs)) # process data_inputs iterable with pool + + +def check_version(version): + proc = subprocess.Popen(["clickhouse-local", "--version"], stdout=subprocess.PIPE, shell=False) + (out, err) = proc.communicate() + if err: + raise Exception(f"Fail to call clickhouse-local, error: {err}") + ver = re.search(r'version\s*([\d.]+)', str(out)).group(1) + ver_12 = float(ver.split('.')[0] + '.' + ver.split('.')[1]) + if ver_12 >= float(version): + raise Exception(f"Version of clickhouse-local too high({ver}), should be <= 22.5") """ python3 parquet_to_mergetree.py --path=/root/data/tmp --source=/home/ubuntu/tpch-data-sf100/lineitem --dst=/root/data/mergetree @@ -56,4 +103,6 @@ def transform(data_path, source, schema, dst): args = parser.parse_args() if not os.path.exists(args.dst): os.mkdir(args.dst) - transform(args.path, args.source, args.schema, args.dst) + #transform(args.path, args.source, args.schema, args.dst) + check_version('22.6') + multi_transform(args.path, args.source, args.schema, args.dst) From 7e169ef84def2df18fb7c95083cee7882390045b Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 8 Sep 2022 14:00:03 +0800 Subject: [PATCH 2/3] fix bugs 1. support Date32 in some cases 2. fix a bug in PartitionColumnFillingTransform when column's type is nullable --- .../PartitionColumnFillingTransform.cpp | 42 +++++++++++++++---- .../PartitionColumnFillingTransform.h | 1 + .../Parser/CHColumnToSparkRow.cpp | 9 +++- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp index 4b7c5374ceed..d273bee815da 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp @@ -6,6 +6,12 @@ #include #include #include +#include +#include +#include +#include +#include + using namespace DB; @@ -53,6 +59,18 @@ PartitionColumnFillingTransform::PartitionColumnFillingTransform( partition_column = createPartitionColumn(); } +/// In the case that a partition column is wrapper by nullable and LowCardinality, we need to keep the data type same +/// as input. +ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPtr & nested_col, DataTypePtr original_data_type) +{ + auto result = nested_col; + if (original_data_type->getTypeId() == TypeIndex::Nullable) + { + result = ColumnNullable::create(nested_col, ColumnUInt8::create()); + } + return result; +} + ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() { ColumnPtr result; @@ -68,43 +86,51 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() WhichDataType which(nested_type); if (which.isInt8()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt16()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt32()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt64()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isFloat32()) { - result = createFloatPartitionColumn(partition_col_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value); } else if (which.isFloat64()) { - result = createFloatPartitionColumn(partition_col_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value); } else if (which.isDate()) { DayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); readDateText(value, value_buffer); - result = partition_col_type->createColumnConst(1, value); + result = nested_type->createColumnConst(1, value); + } + else if (which.isDate32()) + { + ExtendedDayNum value; + auto value_buffer = ReadBufferFromString(partition_col_value); + readDateText(value, value_buffer); + result = nested_type->createColumnConst(1, value.toUnderType()); } else if (which.isString()) { - result = partition_col_type->createColumnConst(1, partition_col_value); + result = nested_type->createColumnConst(1, partition_col_value); } else { throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported datatype {}", partition_col_type->getFamilyName()); } + result = tryWrapPartitionColumn(result, partition_col_type); return result; } diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.h b/utils/local-engine/Operator/PartitionColumnFillingTransform.h index f3e0a606a506..65f7c0c2e0e6 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.h +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.h @@ -20,6 +20,7 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform private: DB::ColumnPtr createPartitionColumn(); + static DB::ColumnPtr tryWrapPartitionColumn(const DB::ColumnPtr & nested_col, DB::DataTypePtr original_data_type); DB::DataTypePtr partition_col_type; String partition_col_name; diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.cpp b/utils/local-engine/Parser/CHColumnToSparkRow.cpp index 619b91fce93f..2c2d70453658 100644 --- a/utils/local-engine/Parser/CHColumnToSparkRow.cpp +++ b/utils/local-engine/Parser/CHColumnToSparkRow.cpp @@ -1,10 +1,15 @@ #include "CHColumnToSparkRow.h" +#include #include #include #include #include #include #include +#include "DataTypes/Serializations/ISerialization.h" +#include "base/types.h" +#include + namespace DB { @@ -106,12 +111,14 @@ void writeValue( std::vector & buffer_cursor) { ColumnPtr nested_col = col.column; + const auto * nullable_column = checkAndGetColumn(*col.column); if (nullable_column) { nested_col = nullable_column->getNestedColumnPtr(); } nested_col = nested_col->convertToFullColumnIfConst(); + WhichDataType which(nested_col->getDataType()); if (which.isUInt8()) { @@ -181,7 +188,7 @@ void writeValue( } else { - throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support type {} convert from ch to spark" ,magic_enum::enum_name(nested_col->getDataType())); + throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support type {} convert from ch to spark", col.type->getName()); } } From 0da3ff4aa0ef8cbc7305d64b215cd482da97e399 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 14 Sep 2022 20:54:45 +0800 Subject: [PATCH 3/3] support multi parition columns --- .../PartitionColumnFillingTransform.cpp | 84 +++++++++++-------- .../PartitionColumnFillingTransform.h | 15 ++-- .../Parser/SerializedPlanParser.cpp | 33 +++++--- 3 files changed, 79 insertions(+), 53 deletions(-) diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp index d273bee815da..4ff597dacdf1 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp @@ -6,12 +6,16 @@ #include #include #include +#include "Processors/Chunk.h" #include #include #include #include #include +#include +#include + using namespace DB; @@ -26,40 +30,37 @@ namespace ErrorCodes namespace local_engine { template -requires( - std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v) - ColumnPtr createIntPartitionColumn(DataTypePtr column_type, std::string partition_value) + requires(std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v) +ColumnPtr createIntPartitionColumn(DataTypePtr column_type, std::string partition_value, size_t rows) { Type value; auto value_buffer = ReadBufferFromString(partition_value); readIntText(value, value_buffer); - return column_type->createColumnConst(1, value); + return column_type->createColumnConst(rows, value); } template -requires(std::is_same_v || std::is_same_v) ColumnPtr - createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value) + requires(std::is_same_v || std::is_same_v) +ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value, size_t rows) { Type value; auto value_buffer = ReadBufferFromString(partition_value); readFloatText(value, value_buffer); - return column_type->createColumnConst(1, value); + return column_type->createColumnConst(rows, value); } -//template <> -//ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value); -//template <> -//ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value); PartitionColumnFillingTransform::PartitionColumnFillingTransform( - const DB::Block & input_, const DB::Block & output_, const String & partition_col_name_, const String & partition_col_value_) - : ISimpleTransform(input_, output_, true), partition_col_name(partition_col_name_), partition_col_value(partition_col_value_) + const DB::Block & input_, const DB::Block & output_, const PartitionValues & partition_columns_) + : ISimpleTransform(input_, output_, true), partition_column_values(partition_columns_) { - partition_col_type = output_.getByName(partition_col_name_).type; - partition_column = createPartitionColumn(); + for (const auto & value : partition_column_values) + { + partition_columns[value.first] = value.second; + } } -/// In the case that a partition column is wrapper by nullable and LowCardinality, we need to keep the data type same +/// In the case that a partition column is wrapper by nullable or LowCardinality, we need to keep the data type same /// as input. ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPtr & nested_col, DataTypePtr original_data_type) { @@ -71,9 +72,10 @@ ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPt return result; } -ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() +ColumnPtr PartitionColumnFillingTransform::createPartitionColumn(const String & parition_col, const String & partition_col_value, size_t rows) { ColumnPtr result; + auto partition_col_type = output.getHeader().getByName(parition_col).type; DataTypePtr nested_type = partition_col_type; if (const DataTypeNullable * nullable_type = checkAndGetDataType(partition_col_type.get())) { @@ -86,45 +88,45 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() WhichDataType which(nested_type); if (which.isInt8()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isInt16()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isInt32()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isInt64()) { - result = createIntPartitionColumn(nested_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isFloat32()) { - result = createFloatPartitionColumn(nested_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isFloat64()) { - result = createFloatPartitionColumn(nested_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value, rows); } else if (which.isDate()) { DayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); readDateText(value, value_buffer); - result = nested_type->createColumnConst(1, value); + result = nested_type->createColumnConst(rows, value); } else if (which.isDate32()) { ExtendedDayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); readDateText(value, value_buffer); - result = nested_type->createColumnConst(1, value.toUnderType()); + result = nested_type->createColumnConst(rows, value.toUnderType()); } else if (which.isString()) { - result = nested_type->createColumnConst(1, partition_col_value); + result = nested_type->createColumnConst(rows, partition_col_value); } else { @@ -136,14 +138,30 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() void PartitionColumnFillingTransform::transform(DB::Chunk & chunk) { - size_t partition_column_position = output.getHeader().getPositionByName(partition_col_name); - if (partition_column_position == input.getHeader().columns()) - { - chunk.addColumn(partition_column->cloneResized(chunk.getNumRows())); - } - else + auto rows = chunk.getNumRows(); + auto input_cols = chunk.detachColumns(); + Columns result_cols; + auto input_header = input.getHeader(); + for (const auto & output_col : output.getHeader()) { - chunk.addColumn(partition_column_position, partition_column->cloneResized(chunk.getNumRows())); + if (input_header.has(output_col.name)) + { + size_t pos = input_header.getPositionByName(output_col.name); + result_cols.push_back(input_cols[pos]); + } + else + { + // it's a partition column + auto it = partition_columns.find(output_col.name); + if (it == partition_columns.end()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found column({}) in parition columns", output_col.name); + } + result_cols.emplace_back(createPartitionColumn(it->first, it->second, rows)); + + } + } + chunk = DB::Chunk(std::move(result_cols), rows); } } diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.h b/utils/local-engine/Operator/PartitionColumnFillingTransform.h index 65f7c0c2e0e6..fb3272125145 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.h +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.h @@ -1,6 +1,10 @@ #pragma once #include +#include "Common/StringUtils.h" +#include "Columns/IColumn.h" +#include "Core/Block.h" +#include "DataTypes/Serializations/ISerialization.h" namespace local_engine { @@ -10,8 +14,7 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform PartitionColumnFillingTransform( const DB::Block & input_, const DB::Block & output_, - const String & partition_col_name_, - const String & partition_col_value_); + const PartitionValues & partition_columns_); void transform(DB::Chunk & chunk) override; String getName() const override { @@ -19,13 +22,11 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform } private: - DB::ColumnPtr createPartitionColumn(); + DB::ColumnPtr createPartitionColumn(const String & parition_col, const String & partition_col_value, size_t row); static DB::ColumnPtr tryWrapPartitionColumn(const DB::ColumnPtr & nested_col, DB::DataTypePtr original_data_type); - DB::DataTypePtr partition_col_type; - String partition_col_name; - String partition_col_value; - DB::ColumnPtr partition_column; + PartitionValues partition_column_values; + std::map partition_columns; }; } diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index c64565fdd517..ad5e94b4b622 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -1,4 +1,5 @@ -#include +#include "SerializedPlanParser.h" +#include #include #include #include @@ -41,7 +42,7 @@ #include #include -#include "SerializedPlanParser.h" +#include namespace DB { @@ -197,19 +198,14 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::R } auto header = parseNameStruct(rel.base_schema()); PartitionValues partition_values = StringUtils::parsePartitionTablePath(files_info->files[0]); - if (partition_values.size() > 1) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "doesn't support multiple level partition."); - } - ProcessorPtr partition_transform; - if (!partition_values.empty()) + + auto origin_header = header.cloneEmpty(); + for (const auto & partition_value : partition_values) { - auto origin_header = header.cloneEmpty(); - PartitionValue partition_value = partition_values[0]; header.erase(partition_value.first); - partition_transform - = std::make_shared(header, origin_header, partition_value.first, partition_value.second); } + ProcessorPtr partition_transform = std::make_shared(header, origin_header, partition_values); + auto query_plan = std::make_unique(); std::shared_ptr source = std::make_shared(files_info, header, context); auto source_pipe = Pipe(source); @@ -1281,7 +1277,18 @@ QueryPlanPtr SerializedPlanParser::parse(std::string & plan) { auto plan_ptr = std::make_unique(); plan_ptr->ParseFromString(plan); - LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", plan_ptr->DebugString()); + + auto printPlan = [](const std::string & plan_raw){ + substrait::Plan plan; + plan.ParseFromString(plan_raw); + std::string json_ret; + google::protobuf::util::JsonPrintOptions json_opt; + json_opt.add_whitespace = true; + google::protobuf::util::MessageToJsonString(plan, &json_ret, json_opt); + return json_ret; + }; + + LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", printPlan(plan)); return parse(std::move(plan_ptr)); } void SerializedPlanParser::initFunctionEnv()