diff --git a/utils/local-engine/CMakeLists.txt b/utils/local-engine/CMakeLists.txt index 6a953180dffa..3c225b1ab76e 100644 --- a/utils/local-engine/CMakeLists.txt +++ b/utils/local-engine/CMakeLists.txt @@ -17,6 +17,7 @@ add_headers_and_sources(common Common) add_headers_and_sources(external External) add_headers_and_sources(shuffle Shuffle) add_headers_and_sources(operator Operator) +add_headers_and_sources(jni jni) include_directories( ${JNI_INCLUDE_DIRS} @@ -38,6 +39,7 @@ add_library(${LOCALENGINE_SHARED_LIB} SHARED ${external_sources} ${shuffle_sources} ${operator_sources} + ${jni_sources} local_engine_jni.cpp) diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp index 4b7c5374ceed..d273bee815da 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp @@ -6,6 +6,12 @@ #include #include #include +#include +#include +#include +#include +#include + using namespace DB; @@ -53,6 +59,18 @@ PartitionColumnFillingTransform::PartitionColumnFillingTransform( partition_column = createPartitionColumn(); } +/// In the case that a partition column is wrapped by nullable and LowCardinality, we need to keep the data type the same +/// as the input. 
+ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPtr & nested_col, DataTypePtr original_data_type) +{ + auto result = nested_col; + if (original_data_type->getTypeId() == TypeIndex::Nullable) + { + result = ColumnNullable::create(nested_col, ColumnUInt8::create()); + } + return result; +} + ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() { ColumnPtr result; @@ -68,43 +86,51 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() WhichDataType which(nested_type); if (which.isInt8()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt16()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt32()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt64()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isFloat32()) { - result = createFloatPartitionColumn(partition_col_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value); } else if (which.isFloat64()) { - result = createFloatPartitionColumn(partition_col_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value); } else if (which.isDate()) { DayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); readDateText(value, value_buffer); - result = partition_col_type->createColumnConst(1, value); + result = nested_type->createColumnConst(1, value); + } + else if (which.isDate32()) + { + ExtendedDayNum value; + auto value_buffer = ReadBufferFromString(partition_col_value); + 
readDateText(value, value_buffer); + result = nested_type->createColumnConst(1, value.toUnderType()); } else if (which.isString()) { - result = partition_col_type->createColumnConst(1, partition_col_value); + result = nested_type->createColumnConst(1, partition_col_value); } else { throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported datatype {}", partition_col_type->getFamilyName()); } + result = tryWrapPartitionColumn(result, partition_col_type); return result; } diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.h b/utils/local-engine/Operator/PartitionColumnFillingTransform.h index f3e0a606a506..65f7c0c2e0e6 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.h +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.h @@ -20,6 +20,7 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform private: DB::ColumnPtr createPartitionColumn(); + static DB::ColumnPtr tryWrapPartitionColumn(const DB::ColumnPtr & nested_col, DB::DataTypePtr original_data_type); DB::DataTypePtr partition_col_type; String partition_col_name; diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.cpp b/utils/local-engine/Parser/CHColumnToSparkRow.cpp index 619b91fce93f..2c2d70453658 100644 --- a/utils/local-engine/Parser/CHColumnToSparkRow.cpp +++ b/utils/local-engine/Parser/CHColumnToSparkRow.cpp @@ -1,10 +1,15 @@ #include "CHColumnToSparkRow.h" +#include #include #include #include #include #include #include +#include "DataTypes/Serializations/ISerialization.h" +#include "base/types.h" +#include + namespace DB { @@ -106,12 +111,14 @@ void writeValue( std::vector & buffer_cursor) { ColumnPtr nested_col = col.column; + const auto * nullable_column = checkAndGetColumn(*col.column); if (nullable_column) { nested_col = nullable_column->getNestedColumnPtr(); } nested_col = nested_col->convertToFullColumnIfConst(); + WhichDataType which(nested_col->getDataType()); if (which.isUInt8()) { @@ -181,7 +188,7 @@ void writeValue( } 
else { - throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support type {} convert from ch to spark" ,magic_enum::enum_name(nested_col->getDataType())); + throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support type {} convert from ch to spark", col.type->getName()); } } diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index c64565fdd517..d10c6b7c62fa 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -41,6 +41,7 @@ #include #include +#include #include "SerializedPlanParser.h" namespace DB @@ -1281,7 +1282,18 @@ QueryPlanPtr SerializedPlanParser::parse(std::string & plan) { auto plan_ptr = std::make_unique(); plan_ptr->ParseFromString(plan); - LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", plan_ptr->DebugString()); + + auto printPlan = [](const std::string & plan_raw){ + substrait::Plan plan; + plan.ParseFromString(plan_raw); + std::string json_ret; + google::protobuf::util::JsonPrintOptions json_opt; + json_opt.add_whitespace = true; + google::protobuf::util::MessageToJsonString(plan, &json_ret, json_opt); + return json_ret; + }; + + LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", printPlan(plan)); return parse(std::move(plan_ptr)); } void SerializedPlanParser::initFunctionEnv() diff --git a/utils/local-engine/jni/jni_common.cpp b/utils/local-engine/jni/jni_common.cpp new file mode 100644 index 000000000000..a2423eb7e170 --- /dev/null +++ b/utils/local-engine/jni/jni_common.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include + +namespace local_engine +{ +jclass CreateGlobalExceptionClassReference(JNIEnv* env, const char* class_name) +{ + jclass local_class = env->FindClass(class_name); + jclass global_class = static_cast(env->NewGlobalRef(local_class)); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string 
error_msg = "Unable to createGlobalClassReference for" + std::string(class_name); + throw std::runtime_error(error_msg); + } + return global_class; +} + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) +{ + jclass local_class = env->FindClass(class_name); + jclass global_class = static_cast(env->NewGlobalRef(local_class)); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string error_message = + "Unable to createGlobalClassReference for" + std::string(class_name); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + return global_class; +} + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) +{ + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + + return ret; +} + +jmethodID GetStaticMethodID(JNIEnv * env, jclass this_class, const char * name, const char * sig) +{ + jmethodID ret = env->GetStaticMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find static method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + return ret; +} + +jstring charTojstring(JNIEnv* env, const char* pat) { + jclass str_class = (env)->FindClass("Ljava/lang/String;"); + jmethodID ctor_id = (env)->GetMethodID(str_class, "", "([BLjava/lang/String;)V"); + jbyteArray bytes = (env)->NewByteArray(strlen(pat)); + (env)->SetByteArrayRegion(bytes, 0, strlen(pat), reinterpret_cast(const_cast(pat))); + jstring encoding = (env)->NewStringUTF("UTF-8"); + jstring result = static_cast((env)->NewObject(str_class, ctor_id, bytes, 
encoding)); + env->DeleteLocalRef(bytes); + env->DeleteLocalRef(encoding); + return result; +} +} diff --git a/utils/local-engine/jni/jni_common.h b/utils/local-engine/jni/jni_common.h new file mode 100644 index 000000000000..86ff5e5f9c63 --- /dev/null +++ b/utils/local-engine/jni/jni_common.h @@ -0,0 +1,17 @@ +#pragma once +#include + +namespace local_engine +{ +jclass CreateGlobalExceptionClassReference(JNIEnv *env, const char *class_name); + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name); + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig); + +jmethodID GetStaticMethodID(JNIEnv * env, jclass this_class, const char * name, const char * sig); + +jstring charTojstring(JNIEnv* env, const char* pat); + +} + diff --git a/utils/local-engine/jni/jni_error.cpp b/utils/local-engine/jni/jni_error.cpp new file mode 100644 index 000000000000..fcd0b6f80775 --- /dev/null +++ b/utils/local-engine/jni/jni_error.cpp @@ -0,0 +1,85 @@ + +#include +#include +#include +#include +#include +#include "Common/Exception.h" +#include + +namespace local_engine +{ +JniErrorsGlobalState & JniErrorsGlobalState::instance() +{ + static JniErrorsGlobalState instance; + return instance; +} + +void JniErrorsGlobalState::destroy(JNIEnv * env) +{ + if (env) + { + if (io_exception_class) + { + env->DeleteGlobalRef(io_exception_class); + } + if (runtime_exception_class) + { + env->DeleteGlobalRef(runtime_exception_class); + } + if (unsupportedoperation_exception_class) + { + env->DeleteGlobalRef(unsupportedoperation_exception_class); + } + if (illegal_access_exception_class) + { + env->DeleteGlobalRef(illegal_access_exception_class); + } + if (illegal_argument_exception_class) + { + env->DeleteGlobalRef(illegal_argument_exception_class); + } + } +} + +void JniErrorsGlobalState::initialize(JNIEnv * env_) +{ + io_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/io/IOException;"); + runtime_exception_class = 
CreateGlobalExceptionClassReference(env_, "Ljava/lang/RuntimeException;"); + unsupportedoperation_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/UnsupportedOperationException;"); + illegal_access_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/IllegalArgumentException;"); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env, const DB::Exception & e) +{ + throwRuntimeException(env, e.message(), e.getStackTraceString()); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env, const std::exception & e) +{ + throwRuntimeException(env, e.what(), DB::getExceptionStackTraceString(e)); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env,jclass exception_class, const std::string & message, const std::string & stack_trace) +{ + if (exception_class) + { + std::string error_msg = message + "\n" + stack_trace; + env->ThrowNew(exception_class, error_msg.c_str()); + } + else + { + // This will cause a coredump + throw std::runtime_error("Not found java runtime exception class"); + } + +} + +void JniErrorsGlobalState::throwRuntimeException(JNIEnv * env,const std::string & message, const std::string & stack_trace) +{ + throwException(env, runtime_exception_class, message, stack_trace); +} + + +} diff --git a/utils/local-engine/jni/jni_error.h b/utils/local-engine/jni/jni_error.h new file mode 100644 index 000000000000..184664828206 --- /dev/null +++ b/utils/local-engine/jni/jni_error.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +namespace local_engine +{ +class JniErrorsGlobalState : boost::noncopyable +{ +protected: + JniErrorsGlobalState() = default; +public: + ~JniErrorsGlobalState() = default; + + static JniErrorsGlobalState & instance(); + void initialize(JNIEnv * env_); + void destroy(JNIEnv * env); + + inline jclass 
getIOExceptionClass() { return io_exception_class; } + inline jclass getRuntimeExceptionClass() { return runtime_exception_class; } + inline jclass getUnsupportedOperationExceptionClass() { return unsupportedoperation_exception_class; } + inline jclass getIllegalAccessExceptionClass() { return illegal_access_exception_class; } + inline jclass getIllegalArgumentExceptionClass() { return illegal_argument_exception_class; } + + void throwException(JNIEnv * env, const DB::Exception & e); + void throwException(JNIEnv * env, const std::exception & e); + static void throwException(JNIEnv * env, jclass exception_class, const std::string & message, const std::string & stack_trace = ""); + void throwRuntimeException(JNIEnv * env, const std::string & message, const std::string & stack_trace = ""); + + +private: + jclass io_exception_class = nullptr; + jclass runtime_exception_class = nullptr; + jclass unsupportedoperation_exception_class = nullptr; + jclass illegal_access_exception_class = nullptr; + jclass illegal_argument_exception_class = nullptr; + +}; +// + +#define LOCAL_ENGINE_JNI_METHOD_START \ + try { + +#define LOCAL_ENGINE_JNI_METHOD_END(env, ret) \ + }\ + catch(DB::Exception & e)\ + {\ + local_engine::JniErrorsGlobalState::instance().throwException(env, e);\ + return ret;\ + }\ + catch (std::exception & e)\ + {\ + local_engine::JniErrorsGlobalState::instance().throwException(env, e);\ + return ret;\ + }\ + catch (...)\ + {\ + std::ostringstream ostr;\ + ostr << boost::stacktrace::stacktrace();\ + local_engine::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknow Exception", ostr.str().c_str());\ + return ret;\ + } +} diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index ddc1ead6920b..c34007420083 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -16,7 +16,11 @@ #include #include #include -#include "jni_common.h" +#include +#include +#include +#include 
+#include bool inside_main = true; @@ -38,17 +42,12 @@ std::vector stringSplit(const std::string & str, char delim) } } -DB::ColumnWithTypeAndName getColumnFromColumnVector(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address, jint column_position) +DB::ColumnWithTypeAndName inline getColumnFromColumnVector(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try - { - DB::Block * block = reinterpret_cast(block_address); - return block->getByPosition(column_position); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + DB::Block * block = reinterpret_cast(block_address); + return block->getByPosition(column_position); + LOCAL_ENGINE_JNI_METHOD_END(env,{}) } std::string jstring2string(JNIEnv * env, jstring jStr) @@ -103,47 +102,41 @@ jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) { return JNI_ERR; } - io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;"); - runtime_exception_class = CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;"); - unsupportedoperation_exception_class = CreateGlobalClassReference(env, "Ljava/lang/UnsupportedOperationException;"); - illegal_access_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); - illegal_argument_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + local_engine::JniErrorsGlobalState::instance().initialize(env); - spark_row_info_class = CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); + spark_row_info_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); spark_row_info_constructor = env->GetMethodID(spark_row_info_class, "", "([J[JJJJ)V"); - split_result_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); - split_result_constructor = GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); + split_result_class = 
local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); + split_result_constructor = local_engine::GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); - local_engine::ShuffleReader::input_stream_class = CreateGlobalClassReference(env, "Ljava/io/InputStream;"); - local_engine::NativeSplitter::iterator_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); - local_engine::WriteBufferFromJavaOutputStream::output_stream_class = CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); + local_engine::ShuffleReader::input_stream_class = local_engine::CreateGlobalClassReference(env, "Ljava/io/InputStream;"); + local_engine::NativeSplitter::iterator_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); + local_engine::WriteBufferFromJavaOutputStream::output_stream_class = local_engine::CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class - = CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); + = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); local_engine::ShuffleReader::input_stream_read = env->GetMethodID(local_engine::ShuffleReader::input_stream_class, "read", "([B)I"); - local_engine::NativeSplitter::iterator_has_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); - local_engine::NativeSplitter::iterator_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); + local_engine::NativeSplitter::iterator_has_next = local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); + local_engine::NativeSplitter::iterator_next = local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); local_engine::WriteBufferFromJavaOutputStream::output_stream_write 
- = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); + = local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); local_engine::WriteBufferFromJavaOutputStream::output_stream_flush - = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); + = local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext - = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); + = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next - = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); - + = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); local_engine::SparkRowToCHColumn::spark_row_interator_class - = CreateGlobalClassReference(env, "Lio/glutenproject/execution/SparkRowIterator;"); + = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/execution/SparkRowIterator;"); local_engine::SparkRowToCHColumn::spark_row_interator_hasNext - = GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z"); + = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z"); local_engine::SparkRowToCHColumn::spark_row_interator_next - = GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B"); - + = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B"); local_engine::JNIUtils::vm = vm; return 
JNI_VERSION_1_8; @@ -153,11 +146,7 @@ void JNI_OnUnload(JavaVM * vm, void * /*reserved*/) { JNIEnv * env; vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8); - env->DeleteGlobalRef(io_exception_class); - env->DeleteGlobalRef(runtime_exception_class); - env->DeleteGlobalRef(unsupportedoperation_exception_class); - env->DeleteGlobalRef(illegal_access_exception_class); - env->DeleteGlobalRef(illegal_argument_exception_class); + local_engine::JniErrorsGlobalState::instance().destroy(env); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class); env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); @@ -173,146 +162,116 @@ void JNI_OnUnload(JavaVM * vm, void * /*reserved*/) local_engine::BroadCastJoinBuilder::clean(); } -void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv *, jobject, jbyteArray) +void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray) { - try - { - init(); - } - catch (const DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + init(); + LOCAL_ENGINE_JNI_METHOD_END(env, ) } jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithRowIterator( JNIEnv * env, jobject /*obj*/, jbyteArray plan) { - try - { - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto * executor = createExecutor(plan_string); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = 
env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto * executor = createExecutor(plan_string); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator( JNIEnv * env, jobject /*obj*/, jlong, jbyteArray plan, jobjectArray iter_arr) { - try + LOCAL_ENGINE_JNI_METHOD_START + auto context = Coordination::Context::createCopy(local_engine::SerializedPlanParser::global_context); + + local_engine::SerializedPlanParser parser(context); + jsize iter_num = env->GetArrayLength(iter_arr); + for (jsize i = 0; i < iter_num; i++) { - auto context = Coordination::Context::createCopy(local_engine::SerializedPlanParser::global_context); - local_engine::SerializedPlanParser parser(context); - jsize iter_num = env->GetArrayLength(iter_arr); - for (jsize i = 0; i < iter_num; i++) - { - jobject iter = env->GetObjectArrayElement(iter_arr, i); - iter = env->NewGlobalRef(iter); - parser.addInputIter(iter); - } - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto query_plan = parser.parse(plan_string); - local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); - executor->execute(std::move(query_plan)); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); + jobject iter = env->GetObjectArrayElement(iter_arr, i); + iter = env->NewGlobalRef(iter); + parser.addInputIter(iter); } + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + 
std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto query_plan = parser.parse(plan_string); + local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); + executor->execute(std::move(query_plan)); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -jboolean Java_io_glutenproject_row_RowIterator_nativeHasNext(JNIEnv * /*env*/, jobject /*obj*/, jlong executor_address) +jboolean Java_io_glutenproject_row_RowIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - return executor->hasNext(); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + return executor->hasNext(); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } jobject Java_io_glutenproject_row_RowIterator_nativeNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - local_engine::SparkRowInfoPtr spark_row_info = executor->next(); + LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + local_engine::SparkRowInfoPtr spark_row_info = executor->next(); - auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); - const auto * offsets_src = reinterpret_cast(spark_row_info->getOffsets().data()); - env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); - auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); - const auto * lengths_src = reinterpret_cast(spark_row_info->getLengths().data()); - env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); - int64_t 
address = reinterpret_cast(spark_row_info->getBufferAddress()); - int64_t column_number = reinterpret_cast(spark_row_info->getNumCols()); - int64_t total_size = reinterpret_cast(spark_row_info->getTotalBytes()); + auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); + const auto * offsets_src = reinterpret_cast(spark_row_info->getOffsets().data()); + env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); + auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); + const auto * lengths_src = reinterpret_cast(spark_row_info->getLengths().data()); + env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); + int64_t address = reinterpret_cast(spark_row_info->getBufferAddress()); + int64_t column_number = reinterpret_cast(spark_row_info->getNumCols()); + int64_t total_size = reinterpret_cast(spark_row_info->getTotalBytes()); - jobject spark_row_info_object = env->NewObject( - spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); + jobject spark_row_info_object + = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); - return spark_row_info_object; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + return spark_row_info_object; + LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } -void Java_io_glutenproject_row_RowIterator_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong executor_address) +void Java_io_glutenproject_row_RowIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); delete executor; + LOCAL_ENGINE_JNI_METHOD_END(env,) } // Columnar Iterator -jboolean Java_io_glutenproject_vectorized_BatchIterator_nativeHasNext(JNIEnv * /*env*/, jobject /*obj*/, jlong executor_address) 
+jboolean Java_io_glutenproject_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - return executor->hasNext(); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + return executor->hasNext(); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } -jlong Java_io_glutenproject_vectorized_BatchIterator_nativeCHNext(JNIEnv * /*env*/, jobject /*obj*/, jlong executor_address) +jlong Java_io_glutenproject_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - DB::Block * column_batch = executor->nextColumnar(); - return reinterpret_cast(column_batch); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + DB::Block * column_batch = executor->nextColumnar(); + return reinterpret_cast(column_batch); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BatchIterator_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong executor_address) +void Java_io_glutenproject_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); delete executor; + LOCAL_ENGINE_JNI_METHOD_END(env,) } void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetJavaTmpDir(JNIEnv * /*env*/, jobject /*obj*/, jstring /*dir*/) @@ -331,58 +290,51 @@ void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMet jboolean 
Java_io_glutenproject_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try + LOCAL_ENGINE_JNI_METHOD_START + DB::Block * block = reinterpret_cast(block_address); + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) { - DB::Block * block = reinterpret_cast(block_address); - auto col = getColumnFromColumnVector(env, obj, block_address, column_position); - if (!col.column->isNullable()) - { - return false; - } - else - { - const auto * nullable = checkAndGetColumn(*col.column); - size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - return num_nulls < block->rows(); - } + return false; } - catch (DB::Exception & e) + else { - local_engine::ExceptionUtils::handleException(e); + const auto * nullable = checkAndGetColumn(*col.column); + size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); + return num_nulls < block->rows(); } + LOCAL_ENGINE_JNI_METHOD_END(env,false) } jint Java_io_glutenproject_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try + LOCAL_ENGINE_JNI_METHOD_START + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) { - auto col = getColumnFromColumnVector(env, obj, block_address, column_position); - if (!col.column->isNullable()) - { - return 0; - } - else - { - const auto * nullable = checkAndGetColumn(*col.column); - return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - } + return 0; } - catch (DB::Exception & e) + else { - local_engine::ExceptionUtils::handleException(e); + const auto * nullable = checkAndGetColumn(*col.column); + return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); } + 
LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeIsNullAt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); return col.column->isNullAt(row_id); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -390,11 +342,13 @@ jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getBool(row_id); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -402,11 +356,13 @@ jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( nested_col = nullable_col->getNestedColumnPtr(); } return reinterpret_cast(nested_col->getDataAt(row_id).data)[0]; + LOCAL_ENGINE_JNI_METHOD_END(env, 0) } jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ 
-414,11 +370,13 @@ jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( nested_col = nullable_col->getNestedColumnPtr(); } return reinterpret_cast(nested_col->getDataAt(row_id).data)[0]; + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -433,11 +391,13 @@ jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( { return nested_col->getInt(row_id); } + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -445,11 +405,13 @@ jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getInt(row_id); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -457,11 +419,13 @@ jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getFloat32(row_id); + LOCAL_ENGINE_JNI_METHOD_END(env, 0.0) } jdouble 
Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -469,11 +433,13 @@ jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getFloat64(row_id); + LOCAL_ENGINE_JNI_METHOD_END(env, 0.0) } jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn(nested_col.get())) @@ -482,7 +448,8 @@ jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( } const auto * string_col = checkAndGetColumn(nested_col.get()); auto result = string_col->getDataAt(row_id); - return charTojstring(env, result.toString().c_str()); + return local_engine::charTojstring(env, result.toString().c_str()); + LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::charTojstring(env, "")) } // native block @@ -490,20 +457,25 @@ void Java_io_glutenproject_vectorized_CHNativeBlock_nativeClose(JNIEnv * /*env*/ { } -jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address) +jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject /*obj*/, jlong block_address) { + LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast(block_address); return block->rows(); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * /*env*/, jobject /*obj*/, 
jlong block_address) +jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject /*obj*/, jlong block_address) { + LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast(block_address); return block->columns(); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jstring Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject /*obj*/, jlong block_address, jint position) { + LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast(block_address); DB::WhichDataType which(block->getByPosition(position).type); std::string type; @@ -568,108 +540,92 @@ jstring Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * throw std::runtime_error("unsupported datatype " + type_name); } - return charTojstring(env, type.c_str()); + return local_engine::charTojstring(env, type.c_str()); + LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::charTojstring(env, "")) } -jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address) +jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject /*obj*/, jlong block_address) { + LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast(block_address); return block->bytes(); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader( JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed) { - try - { - auto * input = env->NewGlobalRef(input_stream); - auto read_buffer = std::make_unique(input); - auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed); - return reinterpret_cast(shuffle_reader); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + auto * input = env->NewGlobalRef(input_stream); + auto read_buffer = std::make_unique(input); + auto * shuffle_reader = new 
local_engine::ShuffleReader(std::move(read_buffer), compressed); + return reinterpret_cast(shuffle_reader); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * /*env*/, jobject /*obj*/, jlong shuffle_reader) +jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); DB::Block * block = reader->read(); return reinterpret_cast(block); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong shuffle_reader) +void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); delete reader; + LOCAL_ENGINE_JNI_METHOD_END(env,) } -jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv * /*env*/, jobject /*obj*/, jint buf_size) +jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv * env, jobject /*obj*/, jint buf_size) { - try - { - local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); - return reinterpret_cast(instance); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); + return reinterpret_cast(instance); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeMergeBlock( - JNIEnv * /*env*/, jobject /*obj*/, jlong instance_address, jlong block_address) + JNIEnv * env, jobject /*obj*/, jlong instance_address, jlong block_address) { - try - { - 
local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - DB::Block * block = reinterpret_cast(block_address); - auto new_block = DB::Block(*block); - instance->mergeBlock(new_block); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + DB::Block * block = reinterpret_cast(block_address); + auto new_block = DB::Block(*block); + instance->mergeBlock(new_block); + LOCAL_ENGINE_JNI_METHOD_END(env,) } -jboolean Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * /*env*/, jobject /*obj*/, jlong instance_address) +jboolean Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env, jobject /*obj*/, jlong instance_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - bool full = instance->isFull(); - return full ? JNI_TRUE : JNI_FALSE; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + bool full = instance->isFull(); + return full ? 
JNI_TRUE : JNI_FALSE; + LOCAL_ENGINE_JNI_METHOD_END(env, false) } -jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * /*env*/, jobject /*obj*/, jlong instance_address) +jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * env, jobject /*obj*/, jlong instance_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - auto block = instance->releaseBlock(); - DB::Block * new_block = new DB::Block(); - new_block->swap(block); - auto address = reinterpret_cast(new_block); - return address; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + auto block = instance->releaseBlock(); + DB::Block * new_block = new DB::Block(); + new_block->swap(block); + Int64 address = reinterpret_cast(new_block); + return address; + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong instance_address) +void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong instance_address) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); delete instance; + LOCAL_ENGINE_JNI_METHOD_END(env,) } // Splitter Jni Wrapper @@ -685,98 +641,87 @@ jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake( jstring data_file, jstring local_dirs) { - try - { - std::vector expr_vec; - if (expr_list != nullptr) + LOCAL_ENGINE_JNI_METHOD_START + std::vector expr_vec; + if (expr_list != nullptr) + { + int len = env->GetArrayLength(expr_list); + auto * str = reinterpret_cast(new char[len]); + memset(str, 0, len); + env->GetByteArrayRegion(expr_list, 0, len, str); + std::string exprs(str, str + len); + delete[] str; + for 
(const auto & expr : stringSplit(exprs, ',')) { - int len = env->GetArrayLength(expr_list); - auto * str = reinterpret_cast(new char[len]); - memset(str, 0, len); - env->GetByteArrayRegion(expr_list, 0, len, str); - std::string exprs(str, str + len); - delete[] str; - for (const auto & expr : stringSplit(exprs, ',')) - { - expr_vec.emplace_back(expr); - } + expr_vec.emplace_back(expr); } - local_engine::SplitOptions options{ - .buffer_size = static_cast(buffer_size), - .data_file = jstring2string(env, data_file), - .local_tmp_dir = jstring2string(env, local_dirs), - .map_id = static_cast(map_id), - .partition_nums = static_cast(num_partitions), - .exprs = expr_vec, - .compress_method = jstring2string(env, codec)}; - local_engine::SplitterHolder * splitter - = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(jstring2string(env, short_name), options)}; - return reinterpret_cast(splitter); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); } + local_engine::SplitOptions options{ + .buffer_size = static_cast(buffer_size), + .data_file = jstring2string(env, data_file), + .local_tmp_dir = jstring2string(env, local_dirs), + .map_id = static_cast(map_id), + .partition_nums = static_cast(num_partitions), + .exprs = expr_vec, + .compress_method = jstring2string(env, codec)}; + local_engine::SplitterHolder * splitter + = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(jstring2string(env, short_name), options)}; + return reinterpret_cast(splitter); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv *, jobject, jlong splitterId, jint, jlong block) +void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv * env, jobject, jlong splitterId, jint, jlong block) { - try - { - local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); - DB::Block * data = 
reinterpret_cast(block); - splitter->splitter->split(*data); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + LOCAL_ENGINE_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); + DB::Block * data = reinterpret_cast(block); + splitter->splitter->split(*data); + LOCAL_ENGINE_JNI_METHOD_END(env,) } jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId) { - try - { - local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); - auto result = splitter->splitter->stop(); - const auto & partition_lengths = result.partition_length; - auto * partition_length_arr = env->NewLongArray(partition_lengths.size()); - const auto * src = reinterpret_cast(partition_lengths.data()); - env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); - - const auto & raw_partition_lengths = result.raw_partition_length; - auto * raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); - const auto * raw_src = reinterpret_cast(raw_partition_lengths.data()); - env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); - - jobject split_result = env->NewObject( - split_result_class, - split_result_constructor, - result.total_compute_pid_time, - result.total_write_time, - result.total_spill_time, - 0, - result.total_bytes_written, - result.total_bytes_written, - partition_length_arr, - raw_partition_length_arr); - - return split_result; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } -} -void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv *, jobject, jlong splitterId) -{ + LOCAL_ENGINE_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); + auto result = splitter->splitter->stop(); + const auto & partition_lengths = result.partition_length; + auto 
partition_length_arr = env->NewLongArray(partition_lengths.size()); + auto src = reinterpret_cast(partition_lengths.data()); + env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); + + const auto & raw_partition_lengths = result.raw_partition_length; + auto raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); + auto raw_src = reinterpret_cast(raw_partition_lengths.data()); + env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); + + jobject split_result = env->NewObject( + split_result_class, + split_result_constructor, + result.total_compute_pid_time, + result.total_write_time, + result.total_spill_time, + 0, + result.total_bytes_written, + result.total_bytes_written, + partition_length_arr, + raw_partition_length_arr); + + return split_result; + LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) +} + +void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId) +{ + LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); delete splitter; + LOCAL_ENGINE_JNI_METHOD_END(env,) } // BlockNativeConverter jobject Java_io_glutenproject_vectorized_BlockNativeConverter_convertColumnarToRow(JNIEnv * env, jobject, jlong block_address) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; DB::Block * block = reinterpret_cast(block_address); auto spark_row_info = converter.convertCHColumnToSparkRow(*block); @@ -795,12 +740,15 @@ jobject Java_io_glutenproject_vectorized_BlockNativeConverter_convertColumnarToR = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); return spark_row_info_object; + LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } -void Java_io_glutenproject_vectorized_BlockNativeConverter_freeMemory(JNIEnv *, jobject, jlong address, jlong size) +void 
Java_io_glutenproject_vectorized_BlockNativeConverter_freeMemory(JNIEnv * env, jobject, jlong address, jlong size) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; converter.freeMem(reinterpret_cast(address), size); + LOCAL_ENGINE_JNI_METHOD_END(env,) } jlong Java_io_glutenproject_vectorized_BlockNativeConverter_convertSparkRowsToCHColumn( @@ -815,8 +763,8 @@ jlong Java_io_glutenproject_vectorized_BlockNativeConverter_convertSparkRowsToCH jboolean * p_booleans = env->GetBooleanArrayElements(is_nullables, nullptr); for (int i = 0; i < column_size; i++) { - jstring name = (jstring)(env->GetObjectArrayElement(names, i)); - jstring type = (jstring)(env->GetObjectArrayElement(types, i)); + jstring name = reinterpret_cast(env->GetObjectArrayElement(names, i)); + jstring type = reinterpret_cast(env->GetObjectArrayElement(types, i)); c_names.push_back(jstring2string(env, name)); c_types.push_back(jstring2string(env, type)); c_isnullables.push_back(p_booleans[i] == JNI_TRUE); @@ -835,41 +783,52 @@ void Java_io_glutenproject_vectorized_BlockNativeConverter_freeBlock(JNIEnv * en converter.freeBlock(reinterpret_cast(block_address)); } -jlong Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCreateInstance(JNIEnv *, jobject) +jlong Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCreateInstance(JNIEnv * env, jobject) { + LOCAL_ENGINE_JNI_METHOD_START auto * writer = new local_engine::NativeWriterInMemory(); return reinterpret_cast(writer); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeWrite(JNIEnv *, jobject, jlong instance, jlong block_address) +void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { + LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast(instance); auto * block = reinterpret_cast(block_address); writer->write(*block); + LOCAL_ENGINE_JNI_METHOD_END(env,) } -jint 
Java_io_glutenproject_vectorized_BlockNativeWriter_nativeResultSize(JNIEnv *, jobject, jlong instance) +jint Java_io_glutenproject_vectorized_BlockNativeWriter_nativeResultSize(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast(instance); return static_cast(writer->collect().size()); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result) { + LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast(instance); auto data = writer->collect(); env->SetByteArrayRegion(result, 0, data.size(), reinterpret_cast(data.data())); + LOCAL_ENGINE_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv *, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast(instance); delete writer; + LOCAL_ENGINE_JNI_METHOD_END(env,) } void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( JNIEnv * env, jobject, jstring hash_table_id_, jobject in, jstring join_key_, jstring join_type_, jbyteArray named_struct) { + LOCAL_ENGINE_JNI_METHOD_START auto * input = env->NewGlobalRef(in); auto read_buffer = std::make_unique(input); auto hash_table_id = jstring2string(env, hash_table_id_); @@ -881,12 +840,14 @@ void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( struct_string.assign(reinterpret_cast(struct_address), struct_size); local_engine::BroadCastJoinBuilder::buildJoinIfNotExist(hash_table_id, std::move(read_buffer), join_key, join_type, struct_string); env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT); + LOCAL_ENGINE_JNI_METHOD_END(env,) } // BlockSplitIterator jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( JNIEnv * env, jobject, jobject in, jstring name, jstring 
expr, jint partition_num, jint buffer_size) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Options options; options.partition_nums = partition_num; options.buffer_size = buffer_size; @@ -896,98 +857,116 @@ jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{ .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)}; return reinterpret_cast(splitter); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); delete splitter; + LOCAL_ENGINE_JNI_METHOD_END(env,) } -jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * /*env*/, jobject, jlong instance) +jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return splitter->splitter->hasNext(); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } -jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * /*env*/, jobject, jlong instance) +jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return reinterpret_cast(splitter->splitter->next()); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } -jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * /*env*/, jobject, jlong instance) +jint 
Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return reinterpret_cast(splitter->splitter->nextPartitionId()); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * /*env*/, jobject, jobject output_stream, jbyteArray buffer) +jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(output_stream, buffer); return reinterpret_cast(writer); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); writer->flush(); delete writer; + LOCAL_ENGINE_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * /*env*/, jobject, jlong instance, jlong block_address) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); DB::Block * block = reinterpret_cast(block_address); writer->write(*block); + LOCAL_ENGINE_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); 
writer->flush(); + LOCAL_ENGINE_JNI_METHOD_END(env,) } jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass, jobject input, jbyteArray plan) { - try - { - auto context = DB::Context::createCopy(local_engine::SerializedPlanParser::global_context); - local_engine::SerializedPlanParser parser(context); - jobject iter = env->NewGlobalRef(input); - parser.addInputIter(iter); - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto query_plan = parser.parse(plan_string); - local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); - executor->execute(std::move(query_plan)); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } -} - -void Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * /*env*/, jclass, jlong instance) -{ + LOCAL_ENGINE_JNI_METHOD_START + auto context = DB::Context::createCopy(local_engine::SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(context); + jobject iter = env->NewGlobalRef(input); + parser.addInputIter(iter); + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto query_plan = parser.parse(plan_string); + local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); + executor->execute(std::move(query_plan)); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) +} + +void 
Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance) +{ + LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); delete executor; + LOCAL_ENGINE_JNI_METHOD_END(env,) } -jboolean Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * /*env*/, jclass, jlong instance) +jboolean Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); return executor->hasNext(); + LOCAL_ENGINE_JNI_METHOD_END(env, false) } -jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * /*env*/, jclass, jlong instance) +jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass, jlong instance) { + LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); return reinterpret_cast(executor->nextColumnar()); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) } #ifdef __cplusplus diff --git a/utils/local-engine/tool/parquet_to_mergetree.py b/utils/local-engine/tool/parquet_to_mergetree.py index af545fb95d70..92051ce9bdc6 100644 --- a/utils/local-engine/tool/parquet_to_mergetree.py +++ b/utils/local-engine/tool/parquet_to_mergetree.py @@ -1,12 +1,15 @@ -from argparse import ArgumentParser import os +import re +import subprocess +from argparse import ArgumentParser +from multiprocessing import Pool parser = ArgumentParser() parser.add_argument("--path", type=str, required=True, help="temp directory for merge tree") parser.add_argument("--source", type=str, required=True, help="directory of parquet files") parser.add_argument("--dst", type=str, required=True, help="destination directory for merge tree") parser.add_argument("--schema", type=str, - default="l_orderkey Int64,l_partkey Int64,l_suppkey Int64,l_linenumber Int64,l_quantity 
Float64,l_extendedprice Float64,l_discount Float64,l_tax Float64,l_returnflag String,l_linestatus String,l_shipdate Date,l_commitdate Date,l_receiptdate Date,l_shipinstruct String,l_shipmode String,l_comment String") + default="l_orderkey Nullable(Int64),l_partkey Nullable(Int64),l_suppkey Nullable(Int64),l_linenumber Nullable(Int64),l_quantity Nullable(Float64),l_extendedprice Nullable(Float64),l_discount Nullable(Float64),l_tax Nullable(Float64),l_returnflag Nullable(String),l_linestatus Nullable(String),l_shipdate Nullable(Date),l_commitdate Nullable(Date),l_receiptdate Nullable(Date),l_shipinstruct Nullable(String),l_shipmode Nullable(String),l_comment Nullable(String)") def get_transform_command(data_path, @@ -24,7 +27,7 @@ def get_transform_command(data_path, def get_move_command(data_path, dst_path, no): - return f"cp -r {data_path}/data/_local/m1/all_1_* {dst_path}/all_{no}_{no}_0" + return f"mkdir -p {dst_path}/all_{no}_{no}_1; cp -r {data_path}/data/_local/m1/all_1_1_1/* {dst_path}/all_{no}_{no}_1" def get_clean_command(data_path): @@ -48,6 +51,50 @@ def transform(data_path, source, schema, dst): raise Exception(command2 + " failed") print(f"{abs_file}") +class Engine(object): + def __init__(self, source, data_path, schema, dst): + self.source = source + self.data_path = data_path + self.schema = schema + self.dst = dst + def __call__(self, ele): + no = ele[0] + file = ele[1] + abs_file = f"{self.source}/{file}" + print(abs_file) + if not os.path.exists(abs_file): + raise f"{abs_file} not found" + private_path = f"{self.data_path}/{str(no)}" + os.system(f"mkdir -p {private_path}") + command1 = get_transform_command(private_path, abs_file, self.schema) + command2 = get_move_command(private_path, self.dst, no+1) + command3 = get_clean_command(private_path) + if os.system(command3) != 0: + raise Exception(command3 + " failed") + if os.system(command1) != 0: + raise Exception(command1 + " failed") + if os.system(command2) != 0: + raise Exception(command2 + " 
failed") + print(f"{abs_file}") + + +def multi_transform(data_path, source, schema, dst): + assert os.path.exists(data_path), f"{data_path} is not exist" + data_inputs = enumerate([file for file in os.listdir(source) if file.endswith(".parquet")]) + pool = Pool() + engine = Engine(source, data_path, schema, dst) + pool.map(engine, list(data_inputs)) # process data_inputs iterable with pool + + +def check_version(version): + proc = subprocess.Popen(["clickhouse-local", "--version"], stdout=subprocess.PIPE, shell=False) + (out, err) = proc.communicate() + if err: + raise Exception(f"Fail to call clickhouse-local, error: {err}") + ver = re.search(r'version\s*([\d.]+)', str(out)).group(1) + ver_12 = float(ver.split('.')[0] + '.' + ver.split('.')[1]) + if ver_12 >= float(version): + raise Exception(f"Version of clickhouse-local too high({ver}), should be <= 22.5") """ python3 parquet_to_mergetree.py --path=/root/data/tmp --source=/home/ubuntu/tpch-data-sf100/lineitem --dst=/root/data/mergetree @@ -56,4 +103,6 @@ def transform(data_path, source, schema, dst): args = parser.parse_args() if not os.path.exists(args.dst): os.mkdir(args.dst) - transform(args.path, args.source, args.schema, args.dst) + #transform(args.path, args.source, args.schema, args.dst) + check_version('22.6') + multi_transform(args.path, args.source, args.schema, args.dst)