From 8d32cab20e23f296ae7f7e41297a79612ace89b0 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Wed, 7 Sep 2022 02:13:12 +0000 Subject: [PATCH 1/5] multiple transfer and fix test cmakelists.txt --- utils/local-engine/tests/CMakeLists.txt | 2 +- .../local-engine/tool/parquet_to_mergetree.py | 40 ++++++++++++++++++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/utils/local-engine/tests/CMakeLists.txt b/utils/local-engine/tests/CMakeLists.txt index da3fb5e3b5a2..5723c6f1a57f 100644 --- a/utils/local-engine/tests/CMakeLists.txt +++ b/utils/local-engine/tests/CMakeLists.txt @@ -19,7 +19,7 @@ configure_file( ) set(HAVE_POSIX_REGEX 1) include(FetchContent) -FetchContent_Declare(googlebenchmark GIT_REPOSITORY https://github.com/google/benchmark GIT_TAG master) +FetchContent_Declare(googlebenchmark GIT_REPOSITORY https://github.com/google/benchmark GIT_TAG main) FetchContent_MakeAvailable(googlebenchmark) include_directories( ${builder_headers} diff --git a/utils/local-engine/tool/parquet_to_mergetree.py b/utils/local-engine/tool/parquet_to_mergetree.py index af545fb95d70..99ff74137551 100644 --- a/utils/local-engine/tool/parquet_to_mergetree.py +++ b/utils/local-engine/tool/parquet_to_mergetree.py @@ -1,5 +1,6 @@ from argparse import ArgumentParser import os +from multiprocessing import Pool parser = ArgumentParser() parser.add_argument("--path", type=str, required=True, help="temp directory for merge tree") @@ -24,7 +25,7 @@ def get_transform_command(data_path, def get_move_command(data_path, dst_path, no): - return f"cp -r {data_path}/data/_local/m1/all_1_* {dst_path}/all_{no}_{no}_0" + return f"mkdir -p {dst_path}/all_{no}_{no}_1; cp -r {data_path}/data/_local/m1/all_1_1_1/* {dst_path}/all_{no}_{no}_1" def get_clean_command(data_path): @@ -48,6 +49,40 @@ def transform(data_path, source, schema, dst): raise Exception(command2 + " failed") print(f"{abs_file}") +class Engine(object): + def __init__(self, source, data_path, schema, dst): + self.source = source + self.data_path = data_path + self.schema = schema + self.dst = dst + def __call__(self, ele): + no = ele[0] + file = ele[1] + abs_file = f"{self.source}/{file}" + print(abs_file) + if not os.path.exists(abs_file): + raise f"{abs_file} not found" + private_path = f"{self.data_path}/{str(no)}" + os.system(f"mkdir -p {private_path}") + command1 = get_transform_command(private_path, abs_file, self.schema) + command2 = get_move_command(private_path, self.dst, no+1) + command3 = get_clean_command(private_path) + if os.system(command3) != 0: + raise Exception(command3 + " failed") + if os.system(command1) != 0: + raise Exception(command1 + " failed") + if os.system(command2) != 0: + raise Exception(command2 + " failed") + print(f"{abs_file}") + + +def multi_transform(data_path, source, schema, dst): + assert os.path.exists(data_path), f"{data_path} is not exist" + data_inputs = enumerate([file for file in os.listdir(source) if file.endswith(".parquet")]) + pool = Pool() + engine = Engine(source, data_path, schema, dst) + pool.map(engine, list(data_inputs)) # process data_inputs iterable with pool + """ python3 parquet_to_mergetree.py --path=/root/data/tmp --source=/home/ubuntu/tpch-data-sf100/lineitem --dst=/root/data/mergetree @@ -56,4 +91,5 @@ def transform(data_path, source, schema, dst): args = parser.parse_args() if not os.path.exists(args.dst): os.mkdir(args.dst) - transform(args.path, args.source, args.schema, args.dst) + #transform(args.path, args.source, args.schema, args.dst) + multi_transform(args.path, args.source, args.schema, args.dst) From da1d3ab01e41a7d8c34c315b4f7f713e306b8589 Mon Sep 17 00:00:00 2001 From: zhanglistar Date: Wed, 7 Sep 2022 03:53:07 +0000 Subject: [PATCH 2/5] add ch version check and change default schema --- utils/local-engine/tool/parquet_to_mergetree.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/utils/local-engine/tool/parquet_to_mergetree.py b/utils/local-engine/tool/parquet_to_mergetree.py index 99ff74137551..92051ce9bdc6 100644 --- a/utils/local-engine/tool/parquet_to_mergetree.py +++ b/utils/local-engine/tool/parquet_to_mergetree.py @@ -1,5 +1,7 @@ -from argparse import ArgumentParser import os +import re +import subprocess +from argparse import ArgumentParser from multiprocessing import Pool parser = ArgumentParser() @@ -7,7 +9,7 @@ parser.add_argument("--source", type=str, required=True, help="directory of parquet files") parser.add_argument("--dst", type=str, required=True, help="destination directory for merge tree") parser.add_argument("--schema", type=str, - default="l_orderkey Int64,l_partkey Int64,l_suppkey Int64,l_linenumber Int64,l_quantity Float64,l_extendedprice Float64,l_discount Float64,l_tax Float64,l_returnflag String,l_linestatus String,l_shipdate Date,l_commitdate Date,l_receiptdate Date,l_shipinstruct String,l_shipmode String,l_comment String") + default="l_orderkey Nullable(Int64),l_partkey Nullable(Int64),l_suppkey Nullable(Int64),l_linenumber Nullable(Int64),l_quantity Nullable(Float64),l_extendedprice Nullable(Float64),l_discount Nullable(Float64),l_tax Nullable(Float64),l_returnflag Nullable(String),l_linestatus Nullable(String),l_shipdate Nullable(Date),l_commitdate Nullable(Date),l_receiptdate Nullable(Date),l_shipinstruct Nullable(String),l_shipmode Nullable(String),l_comment Nullable(String)") def get_transform_command(data_path, @@ -84,6 +86,16 @@ def multi_transform(data_path, source, schema, dst): pool.map(engine, list(data_inputs)) # process data_inputs iterable with pool +def check_version(version): + proc = subprocess.Popen(["clickhouse-local", "--version"], stdout=subprocess.PIPE, shell=False) + (out, err) = proc.communicate() + if err: + raise Exception(f"Fail to call clickhouse-local, error: {err}") + ver = re.search(r'version\s*([\d.]+)', str(out)).group(1) + ver_12 = float(ver.split('.')[0] + '.' + ver.split('.')[1]) + if ver_12 >= float(version): + raise Exception(f"Version of clickhouse-local too high({ver}), should be <= 22.5") + """ python3 parquet_to_mergetree.py --path=/root/data/tmp --source=/home/ubuntu/tpch-data-sf100/lineitem --dst=/root/data/mergetree """ @@ -92,4 +104,5 @@ def multi_transform(data_path, source, schema, dst): if not os.path.exists(args.dst): os.mkdir(args.dst) #transform(args.path, args.source, args.schema, args.dst) + check_version('22.6') multi_transform(args.path, args.source, args.schema, args.dst) From 4bc69ae8a14d99e85474fbc1e10dd1dea3d07050 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 8 Sep 2022 14:00:03 +0800 Subject: [PATCH 3/5] fix bugs 1. support Date32 in some cases 2. fix a bug in PartitionColumnFillingTransform when column's type is nullable --- .../PartitionColumnFillingTransform.cpp | 42 +++++++++++++++---- .../PartitionColumnFillingTransform.h | 1 + .../Parser/CHColumnToSparkRow.cpp | 13 +++++- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp index 55dfa75f0a62..885390811309 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.cpp @@ -6,6 +6,12 @@ #include #include #include +#include +#include +#include +#include +#include + using namespace DB; @@ -66,6 +72,18 @@ PartitionColumnFillingTransform::PartitionColumnFillingTransform( partition_column = createPartitionColumn(); } +/// In the case that a partition column is wrapper by nullable and LowCardinality, we need to keep the data type same +/// as input. +ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPtr & nested_col, DataTypePtr original_data_type) +{ + auto result = nested_col; + if (original_data_type->getTypeId() == TypeIndex::Nullable) + { + result = ColumnNullable::create(nested_col, ColumnUInt8::create()); + } + return result; +} + ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() { ColumnPtr result; @@ -81,43 +99,51 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn() WhichDataType which(nested_type); if (which.isInt8()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt16()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt32()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isInt64()) { - result = createIntPartitionColumn(partition_col_type, partition_col_value); + result = createIntPartitionColumn(nested_type, partition_col_value); } else if (which.isFloat32()) { - result = createFloatPartitionColumn(partition_col_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value); } else if (which.isFloat64()) { - result = createFloatPartitionColumn(partition_col_type, partition_col_value); + result = createFloatPartitionColumn(nested_type, partition_col_value); } else if (which.isDate()) { DayNum value; auto value_buffer = ReadBufferFromString(partition_col_value); readDateText(value, value_buffer); - result = partition_col_type->createColumnConst(1, value); + result = nested_type->createColumnConst(1, value); + } + else if (which.isDate32()) + { + ExtendedDayNum value; + auto value_buffer = ReadBufferFromString(partition_col_value); + readDateText(value, value_buffer); + result = nested_type->createColumnConst(1, value.toUnderType()); } else if (which.isString()) { - result = partition_col_type->createColumnConst(1, partition_col_value); + result = nested_type->createColumnConst(1, partition_col_value); } else { throw Exception(ErrorCodes::UNKNOWN_TYPE, "unsupported datatype {}", partition_col_type->getFamilyName()); } + result = tryWrapPartitionColumn(result, partition_col_type); return result; } diff --git a/utils/local-engine/Operator/PartitionColumnFillingTransform.h b/utils/local-engine/Operator/PartitionColumnFillingTransform.h index f3e0a606a506..65f7c0c2e0e6 100644 --- a/utils/local-engine/Operator/PartitionColumnFillingTransform.h +++ b/utils/local-engine/Operator/PartitionColumnFillingTransform.h @@ -20,6 +20,7 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform private: DB::ColumnPtr createPartitionColumn(); + static DB::ColumnPtr tryWrapPartitionColumn(const DB::ColumnPtr & nested_col, DB::DataTypePtr original_data_type); DB::DataTypePtr partition_col_type; String partition_col_name; diff --git a/utils/local-engine/Parser/CHColumnToSparkRow.cpp b/utils/local-engine/Parser/CHColumnToSparkRow.cpp index 8e726295a740..798bfe6cae59 100644 --- a/utils/local-engine/Parser/CHColumnToSparkRow.cpp +++ b/utils/local-engine/Parser/CHColumnToSparkRow.cpp @@ -1,10 +1,15 @@ #include "CHColumnToSparkRow.h" +#include #include #include #include #include #include #include +#include "DataTypes/Serializations/ISerialization.h" +#include "base/types.h" +#include + namespace DB { @@ -111,12 +116,14 @@ void writeValue( std::vector & buffer_cursor) { ColumnPtr nested_col = col.column; + const auto * nullable_column = checkAndGetColumn(*col.column); if (nullable_column) { nested_col = nullable_column->getNestedColumnPtr(); } nested_col = nested_col->convertToFullColumnIfConst(); + WhichDataType which(nested_col->getDataType()); if (which.isUInt8()) { @@ -158,6 +165,10 @@ void writeValue( { WRITE_VECTOR_COLUMN(UInt16, uint16_t, get64) } + else if (which.isDate32()) + { + WRITE_VECTOR_COLUMN(UInt32, uint32_t, getInt) + } else if (which.isString()) { const auto * string_col = checkAndGetColumn(*nested_col); @@ -182,7 +193,7 @@ void writeValue( } else { - throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support type {} convert from ch to spark" ,magic_enum::enum_name(nested_col->getDataType())); + throw Exception(ErrorCodes::UNKNOWN_TYPE, "doesn't support type {} convert from ch to spark", col.type->getName()); } } From a662ba536fc4b0e5642494af921199d5720220ba Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 8 Sep 2022 20:09:01 +0800 Subject: [PATCH 4/5] catch c++ exceptions and throw a new java exception --- utils/local-engine/CMakeLists.txt | 2 + utils/local-engine/jni/jni_common.cpp | 69 +++ utils/local-engine/jni/jni_common.h | 17 + utils/local-engine/jni/jni_error.cpp | 85 +++ utils/local-engine/jni/jni_error.h | 68 +++ utils/local-engine/jni_common.h | 55 -- utils/local-engine/local_engine_jni.cpp | 669 +++++++++++------------- 7 files changed, 560 insertions(+), 405 deletions(-) create mode 100644 utils/local-engine/jni/jni_common.cpp create mode 100644 utils/local-engine/jni/jni_common.h create mode 100644 utils/local-engine/jni/jni_error.cpp create mode 100644 utils/local-engine/jni/jni_error.h delete mode 100644 utils/local-engine/jni_common.h diff --git a/utils/local-engine/CMakeLists.txt b/utils/local-engine/CMakeLists.txt index 214bd3c786a6..514386bcc05b 100644 --- a/utils/local-engine/CMakeLists.txt +++ b/utils/local-engine/CMakeLists.txt @@ -25,6 +25,7 @@ add_headers_and_sources(common Common) add_headers_and_sources(external External) add_headers_and_sources(shuffle Shuffle) add_headers_and_sources(operator Operator) +add_headers_and_sources(jni jni) include_directories( ${JNI_INCLUDE_DIRS} @@ -48,6 +49,7 @@ add_library(${LOCALENGINE_SHARED_LIB} SHARED ${external_sources} ${shuffle_sources} ${operator_sources} + ${jni_sources} local_engine_jni.cpp) diff --git a/utils/local-engine/jni/jni_common.cpp b/utils/local-engine/jni/jni_common.cpp new file mode 100644 index 000000000000..04b53f903e05 --- /dev/null +++ b/utils/local-engine/jni/jni_common.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include + +namespace gluten +{ +jclass CreateGlobalExceptionClassReference(JNIEnv* env, const char* class_name) +{ + jclass local_class = env->FindClass(class_name); + jclass global_class = static_cast(env->NewGlobalRef(local_class)); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string error_msg = "Unable to createGlobalClassReference for" + std::string(class_name); + throw std::runtime_error(error_msg); + } + return global_class; +} + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) +{ + jclass local_class = env->FindClass(class_name); + jclass global_class = static_cast(env->NewGlobalRef(local_class)); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string error_message = + "Unable to createGlobalClassReference for" + std::string(class_name); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + return global_class; +} + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) +{ + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + + return ret; +} + +jmethodID GetStaticMethodID(JNIEnv * env, jclass this_class, const char * name, const char * sig) +{ + jmethodID ret = env->GetStaticMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find static method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + return ret; +} + +jstring charTojstring(JNIEnv* env, const char* pat) { + jclass str_class = (env)->FindClass("Ljava/lang/String;"); + jmethodID ctor_id = (env)->GetMethodID(str_class, "", "([BLjava/lang/String;)V"); + jbyteArray bytes = (env)->NewByteArray(strlen(pat)); + (env)->SetByteArrayRegion(bytes, 0, strlen(pat), reinterpret_cast(const_cast(pat))); + jstring encoding = (env)->NewStringUTF("UTF-8"); + jstring result = static_cast((env)->NewObject(str_class, ctor_id, bytes, encoding)); + env->DeleteLocalRef(bytes); + env->DeleteLocalRef(encoding); + return result; +} +} diff --git a/utils/local-engine/jni/jni_common.h b/utils/local-engine/jni/jni_common.h new file mode 100644 index 000000000000..7fe33f9fdbe5 --- /dev/null +++ b/utils/local-engine/jni/jni_common.h @@ -0,0 +1,17 @@ +#pragma once +#include + +namespace gluten +{ +jclass CreateGlobalExceptionClassReference(JNIEnv *env, const char *class_name); + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name); + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig); + +jmethodID GetStaticMethodID(JNIEnv * env, jclass this_class, const char * name, const char * sig); + +jstring charTojstring(JNIEnv* env, const char* pat); + +} + diff --git a/utils/local-engine/jni/jni_error.cpp b/utils/local-engine/jni/jni_error.cpp new file mode 100644 index 000000000000..dbe942bb26c6 --- /dev/null +++ b/utils/local-engine/jni/jni_error.cpp @@ -0,0 +1,85 @@ + +#include +#include +#include +#include +#include +#include "Common/Exception.h" +#include + +namespace gluten +{ +JniErrorsGlobalState & JniErrorsGlobalState::instance() +{ + static JniErrorsGlobalState instance; + return instance; +} + +void JniErrorsGlobalState::destroy(JNIEnv * env) +{ + if (env) + { + if (io_exception_class) + { + env->DeleteGlobalRef(io_exception_class); + } + if (runtime_exception_class) + { + env->DeleteGlobalRef(runtime_exception_class); + } + if (unsupportedoperation_exception_class) + { + env->DeleteGlobalRef(unsupportedoperation_exception_class); + } + if (illegal_access_exception_class) + { + env->DeleteGlobalRef(illegal_access_exception_class); + } + if (illegal_argument_exception_class) + { + env->DeleteGlobalRef(illegal_argument_exception_class); + } + } +} + +void JniErrorsGlobalState::initialize(JNIEnv * env_) +{ + io_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/io/IOException;"); + runtime_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/RuntimeException;"); + unsupportedoperation_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/UnsupportedOperationException;"); + illegal_access_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/IllegalArgumentException;"); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env, const DB::Exception & e) +{ + throwRuntimeException(env, e.message(), e.getStackTraceString()); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env, const std::exception & e) +{ + throwRuntimeException(env, e.what(), DB::getExceptionStackTraceString(e)); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env,jclass exception_class, const std::string & message, const std::string & stack_trace) +{ + if (exception_class) + { + std::string error_msg = message + "\n" + stack_trace; + env->ThrowNew(exception_class, error_msg.c_str()); + } + else + { + // This will cause a coredump + throw std::runtime_error("Not found java runtime exception class"); + } + +} + +void JniErrorsGlobalState::throwRuntimeException(JNIEnv * env,const std::string & message, const std::string & stack_trace) +{ + throwException(env, runtime_exception_class, message, stack_trace); +} + + +} diff --git a/utils/local-engine/jni/jni_error.h b/utils/local-engine/jni/jni_error.h new file mode 100644 index 000000000000..89452e98c564 --- /dev/null +++ b/utils/local-engine/jni/jni_error.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +namespace gluten +{ +class JniErrorsGlobalState : boost::noncopyable +{ +protected: + JniErrorsGlobalState() = default; +public: + ~JniErrorsGlobalState() = default; + + static JniErrorsGlobalState & instance(); + void initialize(JNIEnv * env_); + void destroy(JNIEnv * env); + + inline jclass getIOExceptionClass() { return io_exception_class; } + inline jclass getRuntimeExceptionClass() { return runtime_exception_class; } + inline jclass getUnsupportedOperationExceptionClass() { return unsupportedoperation_exception_class; } + inline jclass getIllegalAccessExceptionClass() { return illegal_access_exception_class; } + inline jclass getIllegalArgumentExceptionClass() { return illegal_argument_exception_class; } + + void throwException(JNIEnv * env, const DB::Exception & e); + void throwException(JNIEnv * env, const std::exception & e); + static void throwException(JNIEnv * env, jclass exception_class, const std::string & message, const std::string & stack_trace = ""); + void throwRuntimeException(JNIEnv * env, const std::string & message, const std::string & stack_trace = ""); + + +private: + jclass io_exception_class = nullptr; + jclass runtime_exception_class = nullptr; + jclass unsupportedoperation_exception_class = nullptr; + jclass illegal_access_exception_class = nullptr; + jclass illegal_argument_exception_class = nullptr; + +}; +// + +#define GLUTEN_JNI_METHOD_START \ + try { + +#define GLUTEN_JNI_METHOD_END(env, ret) \ + }\ + catch(DB::Exception & e)\ + {\ + gluten::JniErrorsGlobalState::instance().throwException(env, e);\ + return ret;\ + }\ + catch (std::exception & e)\ + {\ + gluten::JniErrorsGlobalState::instance().throwException(env, e);\ + return ret;\ + }\ + catch (...)\ + {\ + std::ostringstream ostr;\ + ostr << boost::stacktrace::stacktrace();\ + gluten::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknow Exception", ostr.str().c_str());\ + return ret;\ + } +} diff --git a/utils/local-engine/jni_common.h b/utils/local-engine/jni_common.h deleted file mode 100644 index 47c692bf095b..000000000000 --- a/utils/local-engine/jni_common.h +++ /dev/null @@ -1,55 +0,0 @@ -#pragma once - -static jclass io_exception_class; -static jclass runtime_exception_class; -static jclass unsupportedoperation_exception_class; -static jclass illegal_access_exception_class; -static jclass illegal_argument_exception_class; - -jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { - jclass local_class = env->FindClass(class_name); - jclass global_class = (jclass)env->NewGlobalRef(local_class); - env->DeleteLocalRef(local_class); - if (global_class == nullptr) { - std::string error_message = - "Unable to createGlobalClassReference for" + std::string(class_name); - env->ThrowNew(illegal_access_exception_class, error_message.c_str()); - } - return global_class; -} - -jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) { - jmethodID ret = env->GetMethodID(this_class, name, sig); - if (ret == nullptr) { - std::string error_message = "Unable to find method " + std::string(name) + - " within signature" + std::string(sig); - env->ThrowNew(illegal_access_exception_class, error_message.c_str()); - } - - return ret; -} - -jmethodID GetStaticMethodID(JNIEnv* env, jclass this_class, const char* name, - const char* sig) { - jmethodID ret = env->GetStaticMethodID(this_class, name, sig); - if (ret == nullptr) { - std::string error_message = "Unable to find static method " + std::string(name) + - " within signature" + std::string(sig); - env->ThrowNew(illegal_access_exception_class, error_message.c_str()); - } - return ret; -} - -jstring charTojstring(JNIEnv* env, const char* pat) { - jclass strClass = (env)->FindClass("Ljava/lang/String;"); - jmethodID ctorID = (env)->GetMethodID(strClass, "", "([BLjava/lang/String;)V"); - jbyteArray bytes = (env)->NewByteArray(strlen(pat)); - (env)->SetByteArrayRegion(bytes, 0, strlen(pat), (jbyte*) pat); - jstring encoding = (env)->NewStringUTF("UTF-8"); - jstring result = static_cast((env)->NewObject(strClass, ctorID, bytes, encoding)); - env->DeleteLocalRef(bytes); - env->DeleteLocalRef(encoding); - return result; -} - - diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index 5d82628fa1ca..e7cceb434d94 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -16,7 +16,10 @@ #include #include #include -#include "jni_common.h" +#include +#include +#include +#include bool inside_main = true; #ifdef __cplusplus @@ -45,39 +48,35 @@ jint JNI_OnLoad(JavaVM * vm, void * reserved) { return JNI_ERR; } - io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;"); - runtime_exception_class = CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;"); - unsupportedoperation_exception_class = CreateGlobalClassReference(env, "Ljava/lang/UnsupportedOperationException;"); - illegal_access_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); - illegal_argument_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + gluten::JniErrorsGlobalState::instance().initialize(env); - spark_row_info_class = CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); + spark_row_info_class = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); spark_row_info_constructor = env->GetMethodID(spark_row_info_class, "", "([J[JJJJ)V"); - split_result_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); - split_result_constructor = GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); + split_result_class = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); + split_result_constructor = gluten::GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); - local_engine::ShuffleReader::input_stream_class = CreateGlobalClassReference(env, "Ljava/io/InputStream;"); - local_engine::NativeSplitter::iterator_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); - local_engine::WriteBufferFromJavaOutputStream::output_stream_class = CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); + local_engine::ShuffleReader::input_stream_class = gluten::CreateGlobalClassReference(env, "Ljava/io/InputStream;"); + local_engine::NativeSplitter::iterator_class = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); + local_engine::WriteBufferFromJavaOutputStream::output_stream_class = gluten::CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class - = CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); + = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); local_engine::ShuffleReader::input_stream_read = env->GetMethodID(local_engine::ShuffleReader::input_stream_class, "read", "([B)I"); - local_engine::NativeSplitter::iterator_has_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); - local_engine::NativeSplitter::iterator_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); + local_engine::NativeSplitter::iterator_has_next = gluten::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); + local_engine::NativeSplitter::iterator_next = gluten::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); local_engine::WriteBufferFromJavaOutputStream::output_stream_write - = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); + = gluten::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); local_engine::WriteBufferFromJavaOutputStream::output_stream_flush - = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); + = gluten::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext - = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); + = gluten::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next - = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); + = gluten::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); local_engine::JNIUtils::vm = vm; return JNI_VERSION_1_8; } @@ -86,11 +85,7 @@ void JNI_OnUnload(JavaVM * vm, void * reserved) { JNIEnv * env; vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8); - env->DeleteGlobalRef(io_exception_class); - env->DeleteGlobalRef(runtime_exception_class); - env->DeleteGlobalRef(unsupportedoperation_exception_class); - env->DeleteGlobalRef(illegal_access_exception_class); - env->DeleteGlobalRef(illegal_argument_exception_class); + gluten::JniErrorsGlobalState::instance().destroy(env); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class); env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); @@ -106,146 +101,116 @@ void JNI_OnUnload(JavaVM * vm, void * reserved) } //static SharedContextHolder shared_context; -void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv *, jobject, jbyteArray) +[[ noreturn ]] void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray) { - try - { - init(); - } - catch (const DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + init(); + GLUTEN_JNI_METHOD_END(env, ) } jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithRowIterator( JNIEnv * env, jobject obj, jbyteArray plan) { - try - { - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto * executor = createExecutor(plan_string); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto * executor = createExecutor(plan_string); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator( JNIEnv * env, jobject obj, jlong, jbyteArray plan, jobjectArray iter_arr) { - try + + GLUTEN_JNI_METHOD_START + auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(context); + jsize iter_num = env->GetArrayLength(iter_arr); + for (jsize i = 0; i < iter_num; i++) { - auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); - local_engine::SerializedPlanParser parser(context); - jsize iter_num = env->GetArrayLength(iter_arr); - for (jsize i = 0; i < iter_num; i++) - { - jobject iter = env->GetObjectArrayElement(iter_arr, i); - iter = env->NewGlobalRef(iter); - parser.addInputIter(iter); - } - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto query_plan = parser.parse(plan_string); - local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); - executor->execute(std::move(query_plan)); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); + jobject iter = env->GetObjectArrayElement(iter_arr, i); + iter = env->NewGlobalRef(iter); + parser.addInputIter(iter); } + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto query_plan = parser.parse(plan_string); + local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); + executor->execute(std::move(query_plan)); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + GLUTEN_JNI_METHOD_END(env, -1) } jboolean Java_io_glutenproject_row_RowIterator_nativeHasNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - return executor->hasNext(); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + return executor->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } jobject Java_io_glutenproject_row_RowIterator_nativeNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - local_engine::SparkRowInfoPtr spark_row_info = executor->next(); + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + local_engine::SparkRowInfoPtr spark_row_info = executor->next(); - auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); - const auto * offsets_src = reinterpret_cast(spark_row_info->getOffsets().data()); - env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); - auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); - const auto * lengths_src = reinterpret_cast(spark_row_info->getLengths().data()); - env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); - int64_t address = reinterpret_cast(spark_row_info->getBufferAddress()); - int64_t column_number = reinterpret_cast(spark_row_info->getNumCols()); - int64_t total_size = reinterpret_cast(spark_row_info->getTotalBytes()); + auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); + const auto * offsets_src = reinterpret_cast(spark_row_info->getOffsets().data()); + env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); + auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); + const auto * lengths_src = reinterpret_cast(spark_row_info->getLengths().data()); + env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); + int64_t address = reinterpret_cast(spark_row_info->getBufferAddress()); + int64_t column_number = reinterpret_cast(spark_row_info->getNumCols()); + int64_t total_size = reinterpret_cast(spark_row_info->getTotalBytes()); - jobject spark_row_info_object = env->NewObject( - spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); + jobject spark_row_info_object + = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); - return spark_row_info_object; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + return spark_row_info_object; + GLUTEN_JNI_METHOD_END(env, nullptr) } void Java_io_glutenproject_row_RowIterator_nativeClose(JNIEnv * env, jobject obj, jlong executor_address) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); delete executor; + GLUTEN_JNI_METHOD_END(env,) } // Columnar Iterator jboolean Java_io_glutenproject_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - return executor->hasNext(); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + return executor->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } jlong Java_io_glutenproject_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - Block * column_batch = executor->nextColumnar(); - return reinterpret_cast(column_batch); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + Block * column_batch = executor->nextColumnar(); + return reinterpret_cast(column_batch); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject obj, jlong executor_address) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); delete executor; + GLUTEN_JNI_METHOD_END(env,) } @@ -263,73 +228,61 @@ void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMet ColumnWithTypeAndName inline getColumnFromColumnVector(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try - { - Block * block = reinterpret_cast(block_address); - return block->getByPosition(column_position); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + Block * block = reinterpret_cast(block_address); + return block->getByPosition(column_position); + GLUTEN_JNI_METHOD_END(env,{}) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try + GLUTEN_JNI_METHOD_START + Block * block = reinterpret_cast(block_address); + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) { - Block * block = reinterpret_cast(block_address); - auto col = getColumnFromColumnVector(env, obj, block_address, column_position); - if (!col.column->isNullable()) - { - return false; - } - else - { - auto * nullable = checkAndGetColumn(*col.column); - size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - return num_nulls < block->rows(); - } + return false; } - catch (DB::Exception & e) + else { - local_engine::ExceptionUtils::handleException(e); + const auto * nullable = checkAndGetColumn(*col.column); + size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); + return num_nulls < block->rows(); } + GLUTEN_JNI_METHOD_END(env,false) } jint Java_io_glutenproject_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try + GLUTEN_JNI_METHOD_START + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) { - auto col = getColumnFromColumnVector(env, obj, block_address, column_position); - if (!col.column->isNullable()) - { - return 0; - } - else - { - auto * nullable = checkAndGetColumn(*col.column); - return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - } + return 0; } - catch (DB::Exception & e) + else { - local_engine::ExceptionUtils::handleException(e); + const auto * nullable = checkAndGetColumn(*col.column); + return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); } + GLUTEN_JNI_METHOD_END(env, -1) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeIsNullAt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); return col.column->isNullAt(row_id); + GLUTEN_JNI_METHOD_END(env, false) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -337,11 +290,13 @@ jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getBool(row_id); + GLUTEN_JNI_METHOD_END(env, false) } jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -349,11 +304,13 @@ jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( nested_col = nullable_col->getNestedColumnPtr(); } return reinterpret_cast(nested_col->getDataAt(row_id).data)[0]; + GLUTEN_JNI_METHOD_END(env, 0) } jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -361,11 +318,13 @@ jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( nested_col = nullable_col->getNestedColumnPtr(); } return reinterpret_cast(nested_col->getDataAt(row_id).data)[0]; + GLUTEN_JNI_METHOD_END(env, -1) } jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -380,11 +339,13 @@ jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( { return nested_col->getInt(row_id); } + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -392,11 +353,13 @@ jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getInt(row_id); + GLUTEN_JNI_METHOD_END(env, -1) } jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -404,11 +367,13 @@ jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getFloat32(row_id); + GLUTEN_JNI_METHOD_END(env, 0.0) } jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -416,11 +381,13 @@ jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getFloat64(row_id); + GLUTEN_JNI_METHOD_END(env, 0.0) } jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -429,7 +396,8 @@ jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( } const ColumnString * string_col = checkAndGetColumn(nested_col.get()); auto result = string_col->getDataAt(row_id); - return charTojstring(env, result.toString().c_str()); + return gluten::charTojstring(env, result.toString().c_str()); + GLUTEN_JNI_METHOD_END(env, gluten::charTojstring(env, "")) } // native block @@ -442,18 +410,23 @@ void Java_io_glutenproject_vectorized_CHNativeBlock_nativeClose(JNIEnv * env, jo jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject obj, jlong block_address) { + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); return block->rows(); + GLUTEN_JNI_METHOD_END(env, -1) } jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject obj, jlong block_address) { + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); return block->columns(); + GLUTEN_JNI_METHOD_END(env, -1) } jstring Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject obj, jlong block_address, jint position) -{ +{ + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); WhichDataType which(block->getByPosition(position).type); std::string type; @@ -518,144 +491,116 @@ jstring Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * throw std::runtime_error("unsupported datatype " + type_name); } - return charTojstring(env, type.c_str()); + return gluten::charTojstring(env, type.c_str()); + GLUTEN_JNI_METHOD_END(env, gluten::charTojstring(env, "")) } jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject obj, jlong block_address) { + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); return block->bytes(); + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader(JNIEnv * env, jclass clazz, jobject input_stream, jboolean compressed) { - try - { - auto input = env->NewGlobalRef(input_stream); - auto read_buffer = std::make_unique(input); - auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed); - return reinterpret_cast(shuffle_reader); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + auto * input = env->NewGlobalRef(input_stream); + auto read_buffer = std::make_unique(input); + auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed); + return reinterpret_cast(shuffle_reader); + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject obj, jlong shuffle_reader) { -// try -// { - local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); - Block * block = reader->read(); - return reinterpret_cast(block); -// } -// catch (DB::Exception & e) -// { -// local_engine::ExceptionUtils::handleException(e); -// } + GLUTEN_JNI_METHOD_START + local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); + Block * block = reader->read(); + return reinterpret_cast(block); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject obj, jlong shuffle_reader) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); delete reader; + GLUTEN_JNI_METHOD_END(env,) } // CHCoalesceOperator jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv * env, jobject obj, jint buf_size) { - try - { - local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); - return reinterpret_cast(instance); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); + return reinterpret_cast(instance); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeMergeBlock( JNIEnv * env, jobject obj, jlong instance_address, jlong block_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - DB::Block * block = reinterpret_cast(block_address); - auto new_block = DB::Block(*block); - instance->mergeBlock(new_block); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + DB::Block * block = reinterpret_cast(block_address); + auto new_block = DB::Block(*block); + instance->mergeBlock(new_block); + GLUTEN_JNI_METHOD_END(env,) } jboolean Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env, jobject obj, jlong instance_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - bool full = instance->isFull(); - return full ? JNI_TRUE : JNI_FALSE; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + bool full = instance->isFull(); + return full ? JNI_TRUE : JNI_FALSE; + GLUTEN_JNI_METHOD_END(env, false) } jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * env, jobject obj, jlong instance_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - auto block = instance->releaseBlock(); - DB::Block * new_block = new DB::Block(); - new_block->swap(block); - long address = reinterpret_cast(new_block); - return address; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + auto block = instance->releaseBlock(); + DB::Block * new_block = new DB::Block(); + new_block->swap(block); + Int64 address = reinterpret_cast(new_block); + return address; + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env, jobject obj, jlong instance_address) { + GLUTEN_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); delete instance; + GLUTEN_JNI_METHOD_END(env,) } std::string jstring2string(JNIEnv * env, jstring jStr) { - try - { - if (!jStr) - return ""; + GLUTEN_JNI_METHOD_START + if (!jStr) + return ""; - const jclass stringClass = env->GetObjectClass(jStr); - const jmethodID getBytes = env->GetMethodID(stringClass, "getBytes", "(Ljava/lang/String;)[B"); - const jbyteArray stringJbytes = static_cast(env->CallObjectMethod(jStr, getBytes, env->NewStringUTF("UTF-8"))); + auto * string_class = env->GetObjectClass(jStr); + auto * get_bytes = env->GetMethodID(string_class, "getBytes", "(Ljava/lang/String;)[B"); + auto * string_jbytes = static_cast(env->CallObjectMethod(jStr, get_bytes, env->NewStringUTF("UTF-8"))); - size_t length = static_cast(env->GetArrayLength(stringJbytes)); - jbyte * pBytes = env->GetByteArrayElements(stringJbytes, nullptr); + size_t length = static_cast(env->GetArrayLength(string_jbytes)); + jbyte * pbytes = env->GetByteArrayElements(string_jbytes, nullptr); - std::string ret = std::string(reinterpret_cast(pBytes), length); - env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT); + std::string ret = std::string(reinterpret_cast(pbytes), length); + env->ReleaseByteArrayElements(string_jbytes, pbytes, JNI_ABORT); - env->DeleteLocalRef(stringJbytes); - env->DeleteLocalRef(stringClass); - return ret; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + env->DeleteLocalRef(string_jbytes); + env->DeleteLocalRef(string_class); + return ret; + GLUTEN_JNI_METHOD_END(env,"") } std::vector stringSplit(const std::string & str, char delim) @@ -672,6 +617,7 @@ std::vector stringSplit(const std::string & str, char delim) { local_engine::ExceptionUtils::handleException(e); } + __builtin_unreachable(); } @@ -688,98 +634,87 @@ jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake( jstring data_file, jstring local_dirs) { - try - { - std::vector expr_vec; - if (expr_list != nullptr) + GLUTEN_JNI_METHOD_START + std::vector expr_vec; + if (expr_list != nullptr) + { + int len = env->GetArrayLength(expr_list); + auto * str = reinterpret_cast(new char[len]); + memset(str, 0, len); + env->GetByteArrayRegion(expr_list, 0, len, str); + std::string exprs(str, str + len); + delete[] str; + for (const auto & expr : stringSplit(exprs, ',')) { - int len = env->GetArrayLength(expr_list); - auto * str = reinterpret_cast(new char[len]); - memset(str, 0, len); - env->GetByteArrayRegion(expr_list, 0, len, str); - std::string exprs(str, str + len); - delete[] str; - for (const auto & expr : stringSplit(exprs, ',')) - { - expr_vec.emplace_back(expr); - } + expr_vec.emplace_back(expr); } - local_engine::SplitOptions options{ - .buffer_size = static_cast(buffer_size), - .data_file = jstring2string(env, data_file), - .local_tmp_dir = jstring2string(env, local_dirs), - .map_id = static_cast(map_id), - .partition_nums = static_cast(num_partitions), - .exprs = expr_vec, - .compress_method = jstring2string(env, codec)}; - local_engine::SplitterHolder * splitter - = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(jstring2string(env, short_name), options)}; - return reinterpret_cast(splitter); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); } + local_engine::SplitOptions options{ + .buffer_size = static_cast(buffer_size), + .data_file = jstring2string(env, data_file), + .local_tmp_dir = jstring2string(env, local_dirs), + .map_id = static_cast(map_id), + .partition_nums = static_cast(num_partitions), + .exprs = expr_vec, + .compress_method = jstring2string(env, codec)}; + local_engine::SplitterHolder * splitter + = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(jstring2string(env, short_name), options)}; + return reinterpret_cast(splitter); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv *, jobject, jlong splitterId, jint, jlong block) +void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv * env, jobject, jlong splitterId, jint, jlong block) { - try - { - local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); - Block * data = reinterpret_cast(block); - splitter->splitter->split(*data); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); + Block * data = reinterpret_cast(block); + splitter->splitter->split(*data); + GLUTEN_JNI_METHOD_END(env,) } jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId) { - try - { - local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); - auto result = splitter->splitter->stop(); - const auto & partition_lengths = result.partition_length; - auto partition_length_arr = env->NewLongArray(partition_lengths.size()); - auto src = reinterpret_cast(partition_lengths.data()); - env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); - - const auto & raw_partition_lengths = result.raw_partition_length; - auto raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); - auto raw_src = reinterpret_cast(raw_partition_lengths.data()); - env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); - - jobject split_result = env->NewObject( - split_result_class, - split_result_constructor, - result.total_compute_pid_time, - result.total_write_time, - result.total_spill_time, - 0, - result.total_bytes_written, - result.total_bytes_written, - partition_length_arr, - raw_partition_length_arr); - - return split_result; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } -} -void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv *, jobject, jlong splitterId) -{ + GLUTEN_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); + auto result = splitter->splitter->stop(); + const auto & partition_lengths = result.partition_length; + auto partition_length_arr = env->NewLongArray(partition_lengths.size()); + auto src = reinterpret_cast(partition_lengths.data()); + env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); + + const auto & raw_partition_lengths = result.raw_partition_length; + auto raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); + auto raw_src = reinterpret_cast(raw_partition_lengths.data()); + env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); + + jobject split_result = env->NewObject( + split_result_class, + split_result_constructor, + result.total_compute_pid_time, + result.total_write_time, + result.total_spill_time, + 0, + result.total_bytes_written, + result.total_bytes_written, + partition_length_arr, + raw_partition_length_arr); + + return split_result; + GLUTEN_JNI_METHOD_END(env, nullptr) +} + +void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId) +{ + GLUTEN_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); delete splitter; + GLUTEN_JNI_METHOD_END(env,) } // BlockNativeConverter jobject Java_io_glutenproject_vectorized_BlockNativeConverter_converColumarToRow(JNIEnv * env, jobject, jlong block_address) { + GLUTEN_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; Block * block = reinterpret_cast(block_address); auto spark_row_info = converter.convertCHColumnToSparkRow(*block); @@ -798,52 +733,66 @@ jobject Java_io_glutenproject_vectorized_BlockNativeConverter_converColumarToRow = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); return spark_row_info_object; + GLUTEN_JNI_METHOD_END(env, nullptr) } -void Java_io_glutenproject_vectorized_BlockNativeConverter_freeMemory(JNIEnv *, jobject, jlong address, jlong size) +void Java_io_glutenproject_vectorized_BlockNativeConverter_freeMemory(JNIEnv * env, jobject, jlong address, jlong size) { + GLUTEN_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; converter.freeMem(reinterpret_cast(address), size); + GLUTEN_JNI_METHOD_END(env,) } // BlockNativeWriter -jlong Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCreateInstance(JNIEnv *, jobject) +jlong Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCreateInstance(JNIEnv * env, jobject) { + GLUTEN_JNI_METHOD_START auto * writer = new local_engine::NativeWriterInMemory(); return reinterpret_cast(writer); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeWrite(JNIEnv *, jobject, jlong instance, jlong block_address) +void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); auto * block = reinterpret_cast(block_address); writer->write(*block); + GLUTEN_JNI_METHOD_END(env,) } -jint Java_io_glutenproject_vectorized_BlockNativeWriter_nativeResultSize(JNIEnv *, jobject, jlong instance) +jint Java_io_glutenproject_vectorized_BlockNativeWriter_nativeResultSize(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); return static_cast(writer->collect().size()); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); auto data = writer->collect(); env->SetByteArrayRegion(result, 0, data.size(), reinterpret_cast(data.data())); + GLUTEN_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv *, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); delete writer; + GLUTEN_JNI_METHOD_END(env,) } void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( JNIEnv * env, jobject, jstring hash_table_id_, jobject in, jstring join_key_, jstring join_type_, jbyteArray named_struct) { + GLUTEN_JNI_METHOD_START auto * input = env->NewGlobalRef(in); auto read_buffer = std::make_unique(input); auto hash_table_id = jstring2string(env, hash_table_id_); @@ -855,12 +804,14 @@ void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( struct_string.assign(reinterpret_cast(struct_address), struct_size); local_engine::BroadCastJoinBuilder::buildJoinIfNotExist(hash_table_id, std::move(read_buffer), join_key, join_type, struct_string); env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT); + GLUTEN_JNI_METHOD_END(env,) } // BlockSplitIterator jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jint partition_num, jint buffer_size) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Options options; options.partition_nums = partition_num; options.buffer_size = buffer_size; @@ -870,102 +821,120 @@ jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{ .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)}; return reinterpret_cast(splitter); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); delete splitter; + GLUTEN_JNI_METHOD_END(env,) } -jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * /*env*/, jobject, jlong instance) +jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return splitter->splitter->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } -jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * /*env*/, jobject, jlong instance) +jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return reinterpret_cast(splitter->splitter->next()); + GLUTEN_JNI_METHOD_END(env, false) } -jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * /*env*/, jobject, jlong instance) +jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return reinterpret_cast(splitter->splitter->nextPartitionId()); + GLUTEN_JNI_METHOD_END(env, -1) } // BlockOutputStream -jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * /*env*/, jobject, jobject output_stream, jbyteArray buffer) +jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(output_stream, buffer); return reinterpret_cast(writer); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); writer->flush(); delete writer; + GLUTEN_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * /*env*/, jobject, jlong instance, jlong block_address) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); DB::Block * block = reinterpret_cast(block_address); writer->write(*block); + GLUTEN_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); writer->flush(); + GLUTEN_JNI_METHOD_END(env,) } // SimpleExpressionEval jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass , jobject input, jbyteArray plan) { - try - { - auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); - local_engine::SerializedPlanParser parser(context); - jobject iter = env->NewGlobalRef(input); - parser.addInputIter(iter); - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto query_plan = parser.parse(plan_string); - local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); - executor->execute(std::move(query_plan)); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(context); + jobject iter = env->NewGlobalRef(input); + parser.addInputIter(iter); + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto query_plan = parser.parse(plan_string); + local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); + executor->execute(std::move(query_plan)); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass , jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); delete executor; + GLUTEN_JNI_METHOD_END(env,) } jboolean Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass , jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); return executor->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass , jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); return reinterpret_cast(executor->nextColumnar()); + GLUTEN_JNI_METHOD_END(env, -1) } #ifdef __cplusplus } From 5f6c2660560e6adcbded6d76d7e6a5d6cf225c64 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 8 Sep 2022 20:09:01 +0800 Subject: [PATCH 5/5] catch c++ exceptions and throw a new java exception --- utils/local-engine/CMakeLists.txt | 2 + .../Parser/SerializedPlanParser.cpp | 15 +- utils/local-engine/jni/jni_common.cpp | 69 ++ utils/local-engine/jni/jni_common.h | 17 + utils/local-engine/jni/jni_error.cpp | 85 +++ utils/local-engine/jni/jni_error.h | 68 ++ utils/local-engine/jni_common.h | 55 -- utils/local-engine/local_engine_jni.cpp | 669 +++++++++--------- 8 files changed, 574 insertions(+), 406 deletions(-) create mode 100644 utils/local-engine/jni/jni_common.cpp create mode 100644 utils/local-engine/jni/jni_common.h create mode 100644 utils/local-engine/jni/jni_error.cpp create mode 100644 utils/local-engine/jni/jni_error.h delete mode 100644 utils/local-engine/jni_common.h diff --git a/utils/local-engine/CMakeLists.txt b/utils/local-engine/CMakeLists.txt index 214bd3c786a6..514386bcc05b 100644 --- a/utils/local-engine/CMakeLists.txt +++ b/utils/local-engine/CMakeLists.txt @@ -25,6 +25,7 @@ add_headers_and_sources(common Common) add_headers_and_sources(external External) add_headers_and_sources(shuffle Shuffle) add_headers_and_sources(operator Operator) +add_headers_and_sources(jni jni) include_directories( ${JNI_INCLUDE_DIRS} @@ -48,6 +49,7 @@ add_library(${LOCALENGINE_SHARED_LIB} SHARED ${external_sources} ${shuffle_sources} ${operator_sources} + ${jni_sources} local_engine_jni.cpp) diff --git a/utils/local-engine/Parser/SerializedPlanParser.cpp b/utils/local-engine/Parser/SerializedPlanParser.cpp index 5c828150c144..7c87d40fac1d 100644 --- a/utils/local-engine/Parser/SerializedPlanParser.cpp +++ b/utils/local-engine/Parser/SerializedPlanParser.cpp @@ -41,6 +41,8 @@ #include #include +#include + #include using namespace DB; @@ -1278,7 +1280,18 @@ QueryPlanPtr SerializedPlanParser::parse(std::string & plan) { auto plan_ptr = std::make_unique(); plan_ptr->ParseFromString(plan); - LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", plan_ptr->DebugString()); + + auto printPlan = [](const std::string & plan_raw){ + substrait::Plan plan; + plan.ParseFromString(plan_raw); + std::string json_ret; + google::protobuf::util::JsonPrintOptions json_opt; + json_opt.add_whitespace = true; + google::protobuf::util::MessageToJsonString(plan, &json_ret, json_opt); + return json_ret; + }; + + LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", printPlan(plan)); return parse(std::move(plan_ptr)); } void SerializedPlanParser::initFunctionEnv() diff --git a/utils/local-engine/jni/jni_common.cpp b/utils/local-engine/jni/jni_common.cpp new file mode 100644 index 000000000000..04b53f903e05 --- /dev/null +++ b/utils/local-engine/jni/jni_common.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include + +namespace gluten +{ +jclass CreateGlobalExceptionClassReference(JNIEnv* env, const char* class_name) +{ + jclass local_class = env->FindClass(class_name); + jclass global_class = static_cast(env->NewGlobalRef(local_class)); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string error_msg = "Unable to createGlobalClassReference for" + std::string(class_name); + throw std::runtime_error(error_msg); + } + return global_class; +} + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) +{ + jclass local_class = env->FindClass(class_name); + jclass global_class = static_cast(env->NewGlobalRef(local_class)); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string error_message = + "Unable to createGlobalClassReference for" + std::string(class_name); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + return global_class; +} + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) +{ + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + + return ret; +} + +jmethodID GetStaticMethodID(JNIEnv * env, jclass this_class, const char * name, const char * sig) +{ + jmethodID ret = env->GetStaticMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find static method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(JniErrorsGlobalState::instance().getIllegalAccessExceptionClass(), error_message.c_str()); + } + return ret; +} + +jstring charTojstring(JNIEnv* env, const char* pat) { + jclass str_class = (env)->FindClass("Ljava/lang/String;"); + jmethodID ctor_id = (env)->GetMethodID(str_class, "", "([BLjava/lang/String;)V"); + jbyteArray bytes = (env)->NewByteArray(strlen(pat)); + (env)->SetByteArrayRegion(bytes, 0, strlen(pat), reinterpret_cast(const_cast(pat))); + jstring encoding = (env)->NewStringUTF("UTF-8"); + jstring result = static_cast((env)->NewObject(str_class, ctor_id, bytes, encoding)); + env->DeleteLocalRef(bytes); + env->DeleteLocalRef(encoding); + return result; +} +} diff --git a/utils/local-engine/jni/jni_common.h b/utils/local-engine/jni/jni_common.h new file mode 100644 index 000000000000..7fe33f9fdbe5 --- /dev/null +++ b/utils/local-engine/jni/jni_common.h @@ -0,0 +1,17 @@ +#pragma once +#include + +namespace gluten +{ +jclass CreateGlobalExceptionClassReference(JNIEnv *env, const char *class_name); + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name); + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig); + +jmethodID GetStaticMethodID(JNIEnv * env, jclass this_class, const char * name, const char * sig); + +jstring charTojstring(JNIEnv* env, const char* pat); + +} + diff --git a/utils/local-engine/jni/jni_error.cpp b/utils/local-engine/jni/jni_error.cpp new file mode 100644 index 000000000000..dbe942bb26c6 --- /dev/null +++ b/utils/local-engine/jni/jni_error.cpp @@ -0,0 +1,85 @@ + +#include +#include +#include +#include +#include +#include "Common/Exception.h" +#include + +namespace gluten +{ +JniErrorsGlobalState & JniErrorsGlobalState::instance() +{ + static JniErrorsGlobalState instance; + return instance; +} + +void JniErrorsGlobalState::destroy(JNIEnv * env) +{ + if (env) + { + if (io_exception_class) + { + env->DeleteGlobalRef(io_exception_class); + } + if (runtime_exception_class) + { + env->DeleteGlobalRef(runtime_exception_class); + } + if (unsupportedoperation_exception_class) + { + env->DeleteGlobalRef(unsupportedoperation_exception_class); + } + if (illegal_access_exception_class) + { + env->DeleteGlobalRef(illegal_access_exception_class); + } + if (illegal_argument_exception_class) + { + env->DeleteGlobalRef(illegal_argument_exception_class); + } + } +} + +void JniErrorsGlobalState::initialize(JNIEnv * env_) +{ + io_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/io/IOException;"); + runtime_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/RuntimeException;"); + unsupportedoperation_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/UnsupportedOperationException;"); + illegal_access_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = CreateGlobalExceptionClassReference(env_, "Ljava/lang/IllegalArgumentException;"); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env, const DB::Exception & e) +{ + throwRuntimeException(env, e.message(), e.getStackTraceString()); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env, const std::exception & e) +{ + throwRuntimeException(env, e.what(), DB::getExceptionStackTraceString(e)); +} + +void JniErrorsGlobalState::throwException(JNIEnv * env,jclass exception_class, const std::string & message, const std::string & stack_trace) +{ + if (exception_class) + { + std::string error_msg = message + "\n" + stack_trace; + env->ThrowNew(exception_class, error_msg.c_str()); + } + else + { + // This will cause a coredump + throw std::runtime_error("Not found java runtime exception class"); + } + +} + +void JniErrorsGlobalState::throwRuntimeException(JNIEnv * env,const std::string & message, const std::string & stack_trace) +{ + throwException(env, runtime_exception_class, message, stack_trace); +} + + +} diff --git a/utils/local-engine/jni/jni_error.h b/utils/local-engine/jni/jni_error.h new file mode 100644 index 000000000000..89452e98c564 --- /dev/null +++ b/utils/local-engine/jni/jni_error.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +namespace gluten +{ +class JniErrorsGlobalState : boost::noncopyable +{ +protected: + JniErrorsGlobalState() = default; +public: + ~JniErrorsGlobalState() = default; + + static JniErrorsGlobalState & instance(); + void initialize(JNIEnv * env_); + void destroy(JNIEnv * env); + + inline jclass getIOExceptionClass() { return io_exception_class; } + inline jclass getRuntimeExceptionClass() { return runtime_exception_class; } + inline jclass getUnsupportedOperationExceptionClass() { return unsupportedoperation_exception_class; } + inline jclass getIllegalAccessExceptionClass() { return illegal_access_exception_class; } + inline jclass getIllegalArgumentExceptionClass() { return illegal_argument_exception_class; } + + void throwException(JNIEnv * env, const DB::Exception & e); + void throwException(JNIEnv * env, const std::exception & e); + static void throwException(JNIEnv * env, jclass exception_class, const std::string & message, const std::string & stack_trace = ""); + void throwRuntimeException(JNIEnv * env, const std::string & message, const std::string & stack_trace = ""); + + +private: + jclass io_exception_class = nullptr; + jclass runtime_exception_class = nullptr; + jclass unsupportedoperation_exception_class = nullptr; + jclass illegal_access_exception_class = nullptr; + jclass illegal_argument_exception_class = nullptr; + +}; +// + +#define GLUTEN_JNI_METHOD_START \ + try { + +#define GLUTEN_JNI_METHOD_END(env, ret) \ + }\ + catch(DB::Exception & e)\ + {\ + gluten::JniErrorsGlobalState::instance().throwException(env, e);\ + return ret;\ + }\ + catch (std::exception & e)\ + {\ + gluten::JniErrorsGlobalState::instance().throwException(env, e);\ + return ret;\ + }\ + catch (...)\ + {\ + std::ostringstream ostr;\ + ostr << boost::stacktrace::stacktrace();\ + gluten::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknow Exception", ostr.str().c_str());\ + return ret;\ + } +} diff --git a/utils/local-engine/jni_common.h b/utils/local-engine/jni_common.h deleted file mode 100644 index 47c692bf095b..000000000000 --- a/utils/local-engine/jni_common.h +++ /dev/null @@ -1,55 +0,0 @@ -#pragma once - -static jclass io_exception_class; -static jclass runtime_exception_class; -static jclass unsupportedoperation_exception_class; -static jclass illegal_access_exception_class; -static jclass illegal_argument_exception_class; - -jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { - jclass local_class = env->FindClass(class_name); - jclass global_class = (jclass)env->NewGlobalRef(local_class); - env->DeleteLocalRef(local_class); - if (global_class == nullptr) { - std::string error_message = - "Unable to createGlobalClassReference for" + std::string(class_name); - env->ThrowNew(illegal_access_exception_class, error_message.c_str()); - } - return global_class; -} - -jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) { - jmethodID ret = env->GetMethodID(this_class, name, sig); - if (ret == nullptr) { - std::string error_message = "Unable to find method " + std::string(name) + - " within signature" + std::string(sig); - env->ThrowNew(illegal_access_exception_class, error_message.c_str()); - } - - return ret; -} - -jmethodID GetStaticMethodID(JNIEnv* env, jclass this_class, const char* name, - const char* sig) { - jmethodID ret = env->GetStaticMethodID(this_class, name, sig); - if (ret == nullptr) { - std::string error_message = "Unable to find static method " + std::string(name) + - " within signature" + std::string(sig); - env->ThrowNew(illegal_access_exception_class, error_message.c_str()); - } - return ret; -} - -jstring charTojstring(JNIEnv* env, const char* pat) { - jclass strClass = (env)->FindClass("Ljava/lang/String;"); - jmethodID ctorID = (env)->GetMethodID(strClass, "", "([BLjava/lang/String;)V"); - jbyteArray bytes = (env)->NewByteArray(strlen(pat)); - (env)->SetByteArrayRegion(bytes, 0, strlen(pat), (jbyte*) pat); - jstring encoding = (env)->NewStringUTF("UTF-8"); - jstring result = static_cast((env)->NewObject(strClass, ctorID, bytes, encoding)); - env->DeleteLocalRef(bytes); - env->DeleteLocalRef(encoding); - return result; -} - - diff --git a/utils/local-engine/local_engine_jni.cpp b/utils/local-engine/local_engine_jni.cpp index 5d82628fa1ca..e7cceb434d94 100644 --- a/utils/local-engine/local_engine_jni.cpp +++ b/utils/local-engine/local_engine_jni.cpp @@ -16,7 +16,10 @@ #include #include #include -#include "jni_common.h" +#include +#include +#include +#include bool inside_main = true; #ifdef __cplusplus @@ -45,39 +48,35 @@ jint JNI_OnLoad(JavaVM * vm, void * reserved) { return JNI_ERR; } - io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;"); - runtime_exception_class = CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;"); - unsupportedoperation_exception_class = CreateGlobalClassReference(env, "Ljava/lang/UnsupportedOperationException;"); - illegal_access_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); - illegal_argument_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + gluten::JniErrorsGlobalState::instance().initialize(env); - spark_row_info_class = CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); + spark_row_info_class = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); spark_row_info_constructor = env->GetMethodID(spark_row_info_class, "", "([J[JJJJ)V"); - split_result_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); - split_result_constructor = GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); + split_result_class = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/SplitResult;"); + split_result_constructor = gluten::GetMethodID(env, split_result_class, "", "(JJJJJJ[J[J)V"); - local_engine::ShuffleReader::input_stream_class = CreateGlobalClassReference(env, "Ljava/io/InputStream;"); - local_engine::NativeSplitter::iterator_class = CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); - local_engine::WriteBufferFromJavaOutputStream::output_stream_class = CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); + local_engine::ShuffleReader::input_stream_class = gluten::CreateGlobalClassReference(env, "Ljava/io/InputStream;"); + local_engine::NativeSplitter::iterator_class = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); + local_engine::WriteBufferFromJavaOutputStream::output_stream_class = gluten::CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class - = CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); + = gluten::CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); local_engine::ShuffleReader::input_stream_read = env->GetMethodID(local_engine::ShuffleReader::input_stream_class, "read", "([B)I"); - local_engine::NativeSplitter::iterator_has_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); - local_engine::NativeSplitter::iterator_next = GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); + local_engine::NativeSplitter::iterator_has_next = gluten::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); + local_engine::NativeSplitter::iterator_next = gluten::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); local_engine::WriteBufferFromJavaOutputStream::output_stream_write - = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); + = gluten::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); local_engine::WriteBufferFromJavaOutputStream::output_stream_flush - = GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); + = gluten::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext - = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); + = gluten::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next - = GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); + = gluten::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); local_engine::JNIUtils::vm = vm; return JNI_VERSION_1_8; } @@ -86,11 +85,7 @@ void JNI_OnUnload(JavaVM * vm, void * reserved) { JNIEnv * env; vm->GetEnv(reinterpret_cast(&env), JNI_VERSION_1_8); - env->DeleteGlobalRef(io_exception_class); - env->DeleteGlobalRef(runtime_exception_class); - env->DeleteGlobalRef(unsupportedoperation_exception_class); - env->DeleteGlobalRef(illegal_access_exception_class); - env->DeleteGlobalRef(illegal_argument_exception_class); + gluten::JniErrorsGlobalState::instance().destroy(env); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class); env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); @@ -106,146 +101,116 @@ void JNI_OnUnload(JavaVM * vm, void * reserved) } //static SharedContextHolder shared_context; -void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv *, jobject, jbyteArray) +[[ noreturn ]] void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray) { - try - { - init(); - } - catch (const DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + init(); + GLUTEN_JNI_METHOD_END(env, ) } jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithRowIterator( JNIEnv * env, jobject obj, jbyteArray plan) { - try - { - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto * executor = createExecutor(plan_string); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto * executor = createExecutor(plan_string); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator( JNIEnv * env, jobject obj, jlong, jbyteArray plan, jobjectArray iter_arr) { - try + + GLUTEN_JNI_METHOD_START + auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(context); + jsize iter_num = env->GetArrayLength(iter_arr); + for (jsize i = 0; i < iter_num; i++) { - auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); - local_engine::SerializedPlanParser parser(context); - jsize iter_num = env->GetArrayLength(iter_arr); - for (jsize i = 0; i < iter_num; i++) - { - jobject iter = env->GetObjectArrayElement(iter_arr, i); - iter = env->NewGlobalRef(iter); - parser.addInputIter(iter); - } - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto query_plan = parser.parse(plan_string); - local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); - executor->execute(std::move(query_plan)); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); + jobject iter = env->GetObjectArrayElement(iter_arr, i); + iter = env->NewGlobalRef(iter); + parser.addInputIter(iter); } + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto query_plan = parser.parse(plan_string); + local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); + executor->execute(std::move(query_plan)); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + GLUTEN_JNI_METHOD_END(env, -1) } jboolean Java_io_glutenproject_row_RowIterator_nativeHasNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - return executor->hasNext(); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + return executor->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } jobject Java_io_glutenproject_row_RowIterator_nativeNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - local_engine::SparkRowInfoPtr spark_row_info = executor->next(); + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + local_engine::SparkRowInfoPtr spark_row_info = executor->next(); - auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); - const auto * offsets_src = reinterpret_cast(spark_row_info->getOffsets().data()); - env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); - auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); - const auto * lengths_src = reinterpret_cast(spark_row_info->getLengths().data()); - env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); - int64_t address = reinterpret_cast(spark_row_info->getBufferAddress()); - int64_t column_number = reinterpret_cast(spark_row_info->getNumCols()); - int64_t total_size = reinterpret_cast(spark_row_info->getTotalBytes()); + auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); + const auto * offsets_src = reinterpret_cast(spark_row_info->getOffsets().data()); + env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); + auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); + const auto * lengths_src = reinterpret_cast(spark_row_info->getLengths().data()); + env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); + int64_t address = reinterpret_cast(spark_row_info->getBufferAddress()); + int64_t column_number = reinterpret_cast(spark_row_info->getNumCols()); + int64_t total_size = reinterpret_cast(spark_row_info->getTotalBytes()); - jobject spark_row_info_object = env->NewObject( - spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); + jobject spark_row_info_object + = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); - return spark_row_info_object; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + return spark_row_info_object; + GLUTEN_JNI_METHOD_END(env, nullptr) } void Java_io_glutenproject_row_RowIterator_nativeClose(JNIEnv * env, jobject obj, jlong executor_address) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); delete executor; + GLUTEN_JNI_METHOD_END(env,) } // Columnar Iterator jboolean Java_io_glutenproject_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - return executor->hasNext(); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + return executor->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } jlong Java_io_glutenproject_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject obj, jlong executor_address) { - try - { - local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); - Block * column_batch = executor->nextColumnar(); - return reinterpret_cast(column_batch); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); + Block * column_batch = executor->nextColumnar(); + return reinterpret_cast(column_batch); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject obj, jlong executor_address) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); delete executor; + GLUTEN_JNI_METHOD_END(env,) } @@ -263,73 +228,61 @@ void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMet ColumnWithTypeAndName inline getColumnFromColumnVector(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try - { - Block * block = reinterpret_cast(block_address); - return block->getByPosition(column_position); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + Block * block = reinterpret_cast(block_address); + return block->getByPosition(column_position); + GLUTEN_JNI_METHOD_END(env,{}) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try + GLUTEN_JNI_METHOD_START + Block * block = reinterpret_cast(block_address); + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) { - Block * block = reinterpret_cast(block_address); - auto col = getColumnFromColumnVector(env, obj, block_address, column_position); - if (!col.column->isNullable()) - { - return false; - } - else - { - auto * nullable = checkAndGetColumn(*col.column); - size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - return num_nulls < block->rows(); - } + return false; } - catch (DB::Exception & e) + else { - local_engine::ExceptionUtils::handleException(e); + const auto * nullable = checkAndGetColumn(*col.column); + size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); + return num_nulls < block->rows(); } + GLUTEN_JNI_METHOD_END(env,false) } jint Java_io_glutenproject_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { - try + GLUTEN_JNI_METHOD_START + auto col = getColumnFromColumnVector(env, obj, block_address, column_position); + if (!col.column->isNullable()) { - auto col = getColumnFromColumnVector(env, obj, block_address, column_position); - if (!col.column->isNullable()) - { - return 0; - } - else - { - auto * nullable = checkAndGetColumn(*col.column); - return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); - } + return 0; } - catch (DB::Exception & e) + else { - local_engine::ExceptionUtils::handleException(e); + const auto * nullable = checkAndGetColumn(*col.column); + return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); } + GLUTEN_JNI_METHOD_END(env, -1) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeIsNullAt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); return col.column->isNullAt(row_id); + GLUTEN_JNI_METHOD_END(env, false) } jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -337,11 +290,13 @@ jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getBool(row_id); + GLUTEN_JNI_METHOD_END(env, false) } jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -349,11 +304,13 @@ jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( nested_col = nullable_col->getNestedColumnPtr(); } return reinterpret_cast(nested_col->getDataAt(row_id).data)[0]; + GLUTEN_JNI_METHOD_END(env, 0) } jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -361,11 +318,13 @@ jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( nested_col = nullable_col->getNestedColumnPtr(); } return reinterpret_cast(nested_col->getDataAt(row_id).data)[0]; + GLUTEN_JNI_METHOD_END(env, -1) } jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -380,11 +339,13 @@ jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( { return nested_col->getInt(row_id); } + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -392,11 +353,13 @@ jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getInt(row_id); + GLUTEN_JNI_METHOD_END(env, -1) } jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -404,11 +367,13 @@ jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getFloat32(row_id); + GLUTEN_JNI_METHOD_END(env, 0.0) } jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -416,11 +381,13 @@ jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( nested_col = nullable_col->getNestedColumnPtr(); } return nested_col->getFloat64(row_id); + GLUTEN_JNI_METHOD_END(env, 0.0) } jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { + GLUTEN_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); ColumnPtr nested_col = col.column; if (const ColumnNullable * nullable_col = checkAndGetColumn(nested_col.get())) @@ -429,7 +396,8 @@ jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( } const ColumnString * string_col = checkAndGetColumn(nested_col.get()); auto result = string_col->getDataAt(row_id); - return charTojstring(env, result.toString().c_str()); + return gluten::charTojstring(env, result.toString().c_str()); + GLUTEN_JNI_METHOD_END(env, gluten::charTojstring(env, "")) } // native block @@ -442,18 +410,23 @@ void Java_io_glutenproject_vectorized_CHNativeBlock_nativeClose(JNIEnv * env, jo jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject obj, jlong block_address) { + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); return block->rows(); + GLUTEN_JNI_METHOD_END(env, -1) } jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject obj, jlong block_address) { + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); return block->columns(); + GLUTEN_JNI_METHOD_END(env, -1) } jstring Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject obj, jlong block_address, jint position) -{ +{ + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); WhichDataType which(block->getByPosition(position).type); std::string type; @@ -518,144 +491,116 @@ jstring Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * throw std::runtime_error("unsupported datatype " + type_name); } - return charTojstring(env, type.c_str()); + return gluten::charTojstring(env, type.c_str()); + GLUTEN_JNI_METHOD_END(env, gluten::charTojstring(env, "")) } jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject obj, jlong block_address) { + GLUTEN_JNI_METHOD_START Block * block = reinterpret_cast(block_address); return block->bytes(); + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader(JNIEnv * env, jclass clazz, jobject input_stream, jboolean compressed) { - try - { - auto input = env->NewGlobalRef(input_stream); - auto read_buffer = std::make_unique(input); - auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed); - return reinterpret_cast(shuffle_reader); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + auto * input = env->NewGlobalRef(input_stream); + auto read_buffer = std::make_unique(input); + auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed); + return reinterpret_cast(shuffle_reader); + GLUTEN_JNI_METHOD_END(env, -1) } jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject obj, jlong shuffle_reader) { -// try -// { - local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); - Block * block = reader->read(); - return reinterpret_cast(block); -// } -// catch (DB::Exception & e) -// { -// local_engine::ExceptionUtils::handleException(e); -// } + GLUTEN_JNI_METHOD_START + local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); + Block * block = reader->read(); + return reinterpret_cast(block); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject obj, jlong shuffle_reader) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast(shuffle_reader); delete reader; + GLUTEN_JNI_METHOD_END(env,) } // CHCoalesceOperator jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv * env, jobject obj, jint buf_size) { - try - { - local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); - return reinterpret_cast(instance); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); + return reinterpret_cast(instance); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeMergeBlock( JNIEnv * env, jobject obj, jlong instance_address, jlong block_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - DB::Block * block = reinterpret_cast(block_address); - auto new_block = DB::Block(*block); - instance->mergeBlock(new_block); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + DB::Block * block = reinterpret_cast(block_address); + auto new_block = DB::Block(*block); + instance->mergeBlock(new_block); + GLUTEN_JNI_METHOD_END(env,) } jboolean Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env, jobject obj, jlong instance_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - bool full = instance->isFull(); - return full ? JNI_TRUE : JNI_FALSE; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + bool full = instance->isFull(); + return full ? JNI_TRUE : JNI_FALSE; + GLUTEN_JNI_METHOD_END(env, false) } jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * env, jobject obj, jlong instance_address) { - try - { - local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); - auto block = instance->releaseBlock(); - DB::Block * new_block = new DB::Block(); - new_block->swap(block); - long address = reinterpret_cast(new_block); - return address; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); + auto block = instance->releaseBlock(); + DB::Block * new_block = new DB::Block(); + new_block->swap(block); + Int64 address = reinterpret_cast(new_block); + return address; + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env, jobject obj, jlong instance_address) { + GLUTEN_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast(instance_address); delete instance; + GLUTEN_JNI_METHOD_END(env,) } std::string jstring2string(JNIEnv * env, jstring jStr) { - try - { - if (!jStr) - return ""; + GLUTEN_JNI_METHOD_START + if (!jStr) + return ""; - const jclass stringClass = env->GetObjectClass(jStr); - const jmethodID getBytes = env->GetMethodID(stringClass, "getBytes", "(Ljava/lang/String;)[B"); - const jbyteArray stringJbytes = static_cast(env->CallObjectMethod(jStr, getBytes, env->NewStringUTF("UTF-8"))); + auto * string_class = env->GetObjectClass(jStr); + auto * get_bytes = env->GetMethodID(string_class, "getBytes", "(Ljava/lang/String;)[B"); + auto * string_jbytes = static_cast(env->CallObjectMethod(jStr, get_bytes, env->NewStringUTF("UTF-8"))); - size_t length = static_cast(env->GetArrayLength(stringJbytes)); - jbyte * pBytes = env->GetByteArrayElements(stringJbytes, nullptr); + size_t length = static_cast(env->GetArrayLength(string_jbytes)); + jbyte * pbytes = env->GetByteArrayElements(string_jbytes, nullptr); - std::string ret = std::string(reinterpret_cast(pBytes), length); - env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT); + std::string ret = std::string(reinterpret_cast(pbytes), length); + env->ReleaseByteArrayElements(string_jbytes, pbytes, JNI_ABORT); - env->DeleteLocalRef(stringJbytes); - env->DeleteLocalRef(stringClass); - return ret; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + env->DeleteLocalRef(string_jbytes); + env->DeleteLocalRef(string_class); + return ret; + GLUTEN_JNI_METHOD_END(env,"") } std::vector stringSplit(const std::string & str, char delim) @@ -672,6 +617,7 @@ std::vector stringSplit(const std::string & str, char delim) { local_engine::ExceptionUtils::handleException(e); } + __builtin_unreachable(); } @@ -688,98 +634,87 @@ jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake( jstring data_file, jstring local_dirs) { - try - { - std::vector expr_vec; - if (expr_list != nullptr) + GLUTEN_JNI_METHOD_START + std::vector expr_vec; + if (expr_list != nullptr) + { + int len = env->GetArrayLength(expr_list); + auto * str = reinterpret_cast(new char[len]); + memset(str, 0, len); + env->GetByteArrayRegion(expr_list, 0, len, str); + std::string exprs(str, str + len); + delete[] str; + for (const auto & expr : stringSplit(exprs, ',')) { - int len = env->GetArrayLength(expr_list); - auto * str = reinterpret_cast(new char[len]); - memset(str, 0, len); - env->GetByteArrayRegion(expr_list, 0, len, str); - std::string exprs(str, str + len); - delete[] str; - for (const auto & expr : stringSplit(exprs, ',')) - { - expr_vec.emplace_back(expr); - } + expr_vec.emplace_back(expr); } - local_engine::SplitOptions options{ - .buffer_size = static_cast(buffer_size), - .data_file = jstring2string(env, data_file), - .local_tmp_dir = jstring2string(env, local_dirs), - .map_id = static_cast(map_id), - .partition_nums = static_cast(num_partitions), - .exprs = expr_vec, - .compress_method = jstring2string(env, codec)}; - local_engine::SplitterHolder * splitter - = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(jstring2string(env, short_name), options)}; - return reinterpret_cast(splitter); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); } + local_engine::SplitOptions options{ + .buffer_size = static_cast(buffer_size), + .data_file = jstring2string(env, data_file), + .local_tmp_dir = jstring2string(env, local_dirs), + .map_id = static_cast(map_id), + .partition_nums = static_cast(num_partitions), + .exprs = expr_vec, + .compress_method = jstring2string(env, codec)}; + local_engine::SplitterHolder * splitter + = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(jstring2string(env, short_name), options)}; + return reinterpret_cast(splitter); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv *, jobject, jlong splitterId, jint, jlong block) +void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv * env, jobject, jlong splitterId, jint, jlong block) { - try - { - local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); - Block * data = reinterpret_cast(block); - splitter->splitter->split(*data); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); + Block * data = reinterpret_cast(block); + splitter->splitter->split(*data); + GLUTEN_JNI_METHOD_END(env,) } jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId) { - try - { - local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); - auto result = splitter->splitter->stop(); - const auto & partition_lengths = result.partition_length; - auto partition_length_arr = env->NewLongArray(partition_lengths.size()); - auto src = reinterpret_cast(partition_lengths.data()); - env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); - - const auto & raw_partition_lengths = result.raw_partition_length; - auto raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); - auto raw_src = reinterpret_cast(raw_partition_lengths.data()); - env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); - - jobject split_result = env->NewObject( - split_result_class, - split_result_constructor, - result.total_compute_pid_time, - result.total_write_time, - result.total_spill_time, - 0, - result.total_bytes_written, - result.total_bytes_written, - partition_length_arr, - raw_partition_length_arr); - - return split_result; - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } -} -void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv *, jobject, jlong splitterId) -{ + GLUTEN_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); + auto result = splitter->splitter->stop(); + const auto & partition_lengths = result.partition_length; + auto partition_length_arr = env->NewLongArray(partition_lengths.size()); + auto src = reinterpret_cast(partition_lengths.data()); + env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); + + const auto & raw_partition_lengths = result.raw_partition_length; + auto raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); + auto raw_src = reinterpret_cast(raw_partition_lengths.data()); + env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); + + jobject split_result = env->NewObject( + split_result_class, + split_result_constructor, + result.total_compute_pid_time, + result.total_write_time, + result.total_spill_time, + 0, + result.total_bytes_written, + result.total_bytes_written, + partition_length_arr, + raw_partition_length_arr); + + return split_result; + GLUTEN_JNI_METHOD_END(env, nullptr) +} + +void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId) +{ + GLUTEN_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); delete splitter; + GLUTEN_JNI_METHOD_END(env,) } // BlockNativeConverter jobject Java_io_glutenproject_vectorized_BlockNativeConverter_converColumarToRow(JNIEnv * env, jobject, jlong block_address) { + GLUTEN_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; Block * block = reinterpret_cast(block_address); auto spark_row_info = converter.convertCHColumnToSparkRow(*block); @@ -798,52 +733,66 @@ jobject Java_io_glutenproject_vectorized_BlockNativeConverter_converColumarToRow = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); return spark_row_info_object; + GLUTEN_JNI_METHOD_END(env, nullptr) } -void Java_io_glutenproject_vectorized_BlockNativeConverter_freeMemory(JNIEnv *, jobject, jlong address, jlong size) +void Java_io_glutenproject_vectorized_BlockNativeConverter_freeMemory(JNIEnv * env, jobject, jlong address, jlong size) { + GLUTEN_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; converter.freeMem(reinterpret_cast(address), size); + GLUTEN_JNI_METHOD_END(env,) } // BlockNativeWriter -jlong Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCreateInstance(JNIEnv *, jobject) +jlong Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCreateInstance(JNIEnv * env, jobject) { + GLUTEN_JNI_METHOD_START auto * writer = new local_engine::NativeWriterInMemory(); return reinterpret_cast(writer); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeWrite(JNIEnv *, jobject, jlong instance, jlong block_address) +void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); auto * block = reinterpret_cast(block_address); writer->write(*block); + GLUTEN_JNI_METHOD_END(env,) } -jint Java_io_glutenproject_vectorized_BlockNativeWriter_nativeResultSize(JNIEnv *, jobject, jlong instance) +jint Java_io_glutenproject_vectorized_BlockNativeWriter_nativeResultSize(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); return static_cast(writer->collect().size()); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); auto data = writer->collect(); env->SetByteArrayRegion(result, 0, data.size(), reinterpret_cast(data.data())); + GLUTEN_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv *, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockNativeWriter_nativeClose(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START auto * writer = reinterpret_cast(instance); delete writer; + GLUTEN_JNI_METHOD_END(env,) } void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( JNIEnv * env, jobject, jstring hash_table_id_, jobject in, jstring join_key_, jstring join_type_, jbyteArray named_struct) { + GLUTEN_JNI_METHOD_START auto * input = env->NewGlobalRef(in); auto read_buffer = std::make_unique(input); auto hash_table_id = jstring2string(env, hash_table_id_); @@ -855,12 +804,14 @@ void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( struct_string.assign(reinterpret_cast(struct_address), struct_size); local_engine::BroadCastJoinBuilder::buildJoinIfNotExist(hash_table_id, std::move(read_buffer), join_key, join_type, struct_string); env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT); + GLUTEN_JNI_METHOD_END(env,) } // BlockSplitIterator jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jint partition_num, jint buffer_size) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Options options; options.partition_nums = partition_num; options.buffer_size = buffer_size; @@ -870,102 +821,120 @@ jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{ .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)}; return reinterpret_cast(splitter); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); delete splitter; + GLUTEN_JNI_METHOD_END(env,) } -jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * /*env*/, jobject, jlong instance) +jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return splitter->splitter->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } -jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * /*env*/, jobject, jlong instance) +jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return reinterpret_cast(splitter->splitter->next()); + GLUTEN_JNI_METHOD_END(env, false) } -jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * /*env*/, jobject, jlong instance) +jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast(instance); return reinterpret_cast(splitter->splitter->nextPartitionId()); + GLUTEN_JNI_METHOD_END(env, -1) } // BlockOutputStream -jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * /*env*/, jobject, jobject output_stream, jbyteArray buffer) +jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(output_stream, buffer); return reinterpret_cast(writer); + GLUTEN_JNI_METHOD_END(env, -1) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); writer->flush(); delete writer; + GLUTEN_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * /*env*/, jobject, jlong instance, jlong block_address) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); DB::Block * block = reinterpret_cast(block_address); writer->write(*block); + GLUTEN_JNI_METHOD_END(env,) } -void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * /*env*/, jobject, jlong instance) +void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast(instance); writer->flush(); + GLUTEN_JNI_METHOD_END(env,) } // SimpleExpressionEval jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass , jobject input, jbyteArray plan) { - try - { - auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); - local_engine::SerializedPlanParser parser(context); - jobject iter = env->NewGlobalRef(input); - parser.addInputIter(iter); - jsize plan_size = env->GetArrayLength(plan); - jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); - std::string plan_string; - plan_string.assign(reinterpret_cast(plan_address), plan_size); - auto query_plan = parser.parse(plan_string); - local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); - executor->execute(std::move(query_plan)); - env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); - return reinterpret_cast(executor); - } - catch (DB::Exception & e) - { - local_engine::ExceptionUtils::handleException(e); - } + GLUTEN_JNI_METHOD_START + auto context = Context::createCopy(local_engine::SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(context); + jobject iter = env->NewGlobalRef(input); + parser.addInputIter(iter); + jsize plan_size = env->GetArrayLength(plan); + jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); + std::string plan_string; + plan_string.assign(reinterpret_cast(plan_address), plan_size); + auto query_plan = parser.parse(plan_string); + local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context); + executor->execute(std::move(query_plan)); + env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); + return reinterpret_cast(executor); + GLUTEN_JNI_METHOD_END(env, -1) } void Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass , jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); delete executor; + GLUTEN_JNI_METHOD_END(env,) } jboolean Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass , jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); return executor->hasNext(); + GLUTEN_JNI_METHOD_END(env, false) } jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass , jlong instance) { + GLUTEN_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); return reinterpret_cast(executor->nextColumnar()); + GLUTEN_JNI_METHOD_END(env, -1) } #ifdef __cplusplus }