From d0b1e36b5f40e14c99c3af854a7c66295c32749b Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Sun, 28 Sep 2025 09:53:34 +0800 Subject: [PATCH] Reject all JNI calls after native destroy is invoked. --- cpp-ch/local-engine/Common/CHUtil.cpp | 1 - cpp-ch/local-engine/Common/QueryContext.cpp | 24 ++++- cpp-ch/local-engine/Common/QueryContext.h | 20 +++- cpp-ch/local-engine/Parser/LocalExecutor.h | 4 +- cpp-ch/local-engine/jni/jni_common.cpp | 27 +++++ cpp-ch/local-engine/jni/jni_common.h | 103 ++++++++++++++++++++ cpp-ch/local-engine/jni/jni_error.h | 23 ----- cpp-ch/local-engine/local_engine_jni.cpp | 63 +++++++----- 8 files changed, 210 insertions(+), 55 deletions(-) diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 907c4b58dd3e..30c872eda306 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -1000,7 +1000,6 @@ void BackendFinalizerUtil::finalizeGlobally() // Make sure client caches release before ClientCacheRegistry ReadBufferBuilderFactory::instance().clean(); StorageMergeTreeFactory::clear_cache_map(); - QueryContext::resetGlobal(); QueryContext::instance().reset(); std::lock_guard lock(paths_mutex); std::ranges::for_each( diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index 9cc1b030cae1..c6aaa9ccc5d3 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace DB { @@ -59,6 +60,7 @@ DB::ContextMutablePtr QueryContext::globalMutableContext() { return Data::global_context; } + void QueryContext::resetGlobal() { if (Data::global_context) @@ -71,7 +73,19 @@ void QueryContext::resetGlobal() void QueryContext::reset() { + { + std::lock_guard lock(local_executor_mutex_); + for (auto it = active_executors_.begin(); it != active_executors_.end(); ++it) + { + auto * executor = *it; + executor->cancel(); + 
LOG_ERROR(logger_, "Not closed LocalExecutor({}):\n{}", reinterpret_cast(executor), executor->dumpPipeline()); + delete executor; + } + active_executors_.clear(); + } query_map_.clear(); + QueryContext::resetGlobal(); } DB::ContextMutablePtr QueryContext::createGlobal() @@ -200,14 +214,20 @@ void QueryContext::finalizeQuery(int64_t id) size_t currentThreadGroupMemoryUsage() { if (!CurrentThread::getGroup()) - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first."); + { + LOG_ERROR(getLogger("QueryContext"), "Thread group not found, please call initializeQuery first."); + return 0; + } return CurrentThread::getGroup()->memory_tracker.get(); } double currentThreadGroupMemoryUsageRatio() { if (!CurrentThread::getGroup()) - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first."); + { + LOG_ERROR(getLogger("QueryContext"), "Thread group not found, please call initializeQuery first."); + return 0; + } return static_cast(CurrentThread::getGroup()->memory_tracker.get()) / CurrentThread::getGroup()->memory_tracker.getSoftLimit(); } } diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h index 74c4a69aeece..cf9b904b2998 100644 --- a/cpp-ch/local-engine/Common/QueryContext.h +++ b/cpp-ch/local-engine/Common/QueryContext.h @@ -18,6 +18,8 @@ #include #include #include +#include +#include namespace DB { @@ -25,7 +27,7 @@ struct ContextSharedPart; } namespace local_engine { - +class LocalExecutor; class QueryContext { struct Data; @@ -47,6 +49,19 @@ class QueryContext void logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const; size_t currentPeakMemory(int64_t id); void finalizeQuery(int64_t id); + void attachLocalExecutor(LocalExecutor * executor) + { + std::lock_guard lock(local_executor_mutex_); + active_executors_.insert(executor); + } + + // If the executor is found and detached, return 
true; otherwise return false. + bool detachLocalExecutor(LocalExecutor * executor) + { + std::lock_guard lock(local_executor_mutex_); + auto n = active_executors_.erase(executor); + return n > 0; + } // Clear resources held by the QueryContext instance. void reset(); @@ -55,6 +70,9 @@ class QueryContext QueryContext() = default; LoggerPtr logger_ = getLogger("QueryContextManager"); ConcurrentMap> query_map_{}; + + std::mutex local_executor_mutex_; + std::set active_executors_{}; }; size_t currentThreadGroupMemoryUsage(); diff --git a/cpp-ch/local-engine/Parser/LocalExecutor.h b/cpp-ch/local-engine/Parser/LocalExecutor.h index ea1445c605e1..17f940770145 100644 --- a/cpp-ch/local-engine/Parser/LocalExecutor.h +++ b/cpp-ch/local-engine/Parser/LocalExecutor.h @@ -59,6 +59,8 @@ class LocalExecutor : public BlockIterator RelMetricPtr getMetric() const { return metric; } void setMetric(const RelMetricPtr & metric_) { metric = metric_; } void setExtraPlanHolder(std::vector & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); } + /// Dump processor runtime information to log + std::string dumpPipeline() const; private: // In the case of fallback, there may be multiple native pipelines in one stage. Can determine whether a fallback has occurred by whether a LocalExecutor already exists. 
@@ -66,8 +68,6 @@ class LocalExecutor : public BlockIterator static thread_local LocalExecutor * current_executor; std::unique_ptr writeBlockToSparkRow(const DB::Block & block) const; void initPullingPipelineExecutor(); - /// Dump processor runtime information to log - std::string dumpPipeline() const; DB::QueryPipelineBuilderPtr query_pipeline_builder; DB::QueryPipeline query_pipeline; diff --git a/cpp-ch/local-engine/jni/jni_common.cpp b/cpp-ch/local-engine/jni/jni_common.cpp index 6eb02a2f450b..0f8a1c6b3c1f 100644 --- a/cpp-ch/local-engine/jni/jni_common.cpp +++ b/cpp-ch/local-engine/jni/jni_common.cpp @@ -94,4 +94,31 @@ jbyteArray stringTojbyteArray(JNIEnv * env, const std::string & str) return jarray; } +JniEnvStatusWatcher & JniEnvStatusWatcher::instance() +{ + static JniEnvStatusWatcher instance; + return instance; +} + +JniMethodCallCounter & JniMethodCallCounter::instance() +{ + static JniMethodCallCounter instance; + return instance; +} + +JniMethodGuard::JniMethodGuard() +{ + JniMethodCallCounter::instance().increment(); } + +JniMethodGuard::~JniMethodGuard() +{ + JniMethodCallCounter::instance().decrement(); +} + +bool JniMethodGuard::couldInvoke() +{ + return JniEnvStatusWatcher::instance().isActive(); +} + +} // namespace local_engine diff --git a/cpp-ch/local-engine/jni/jni_common.h b/cpp-ch/local-engine/jni/jni_common.h index e072d124ca72..635ccafe04e7 100644 --- a/cpp-ch/local-engine/jni/jni_common.h +++ b/cpp-ch/local-engine/jni/jni_common.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace DB { @@ -46,6 +47,108 @@ jstring charTojstring(JNIEnv * env, const char * pat); jbyteArray stringTojbyteArray(JNIEnv * env, const std::string & str); +// A watcher to monitor JNIEnv status: it's active or not +class JniEnvStatusWatcher +{ +public: + static JniEnvStatusWatcher & instance(); + void setActive() { is_active.store(true, std::memory_order_release); } + void setInactive() { is_active.store(false, std::memory_order_release); } + bool 
isActive() const { return is_active.load(std::memory_order_acquire); } +protected: + JniEnvStatusWatcher() = default; + ~JniEnvStatusWatcher() = default; + std::atomic is_active = false; +}; + +// A counter to track ongoing JNI method calls +// It helps to ensure that no JNI method is running when we are destroying the JNI resources +class JniMethodCallCounter +{ +public: + static JniMethodCallCounter & instance(); + void increment() { counter.fetch_add(1, std::memory_order_acq_rel); } + void decrement() { + auto old_val = counter.fetch_sub(1, std::memory_order_acq_rel); + if (old_val == 1) + { + std::lock_guard lock(mutex); + cv.notify_all(); + } + else if (old_val < 1) [[unlikely]] + { + // This should never happen + LOG_ERROR(getLogger("jni"), "JniMethodCallCounter counter underflow: {}", old_val); + } + } + // This will only be called in the destroy JNI method to wait for all ongoing JNI method calls to finish + void waitForZero() + { + std::unique_lock lock(mutex); + cv.wait(lock, [this] { + return counter.load(std::memory_order_acquire) <= 0; + }); + } +protected: + JniMethodCallCounter() = default; + ~JniMethodCallCounter() = default; +private: + std::atomic counter{0}; + std::mutex mutex; + std::condition_variable cv; +}; + +class JniMethodGuard +{ +public: + JniMethodGuard(); + ~JniMethodGuard(); + bool couldInvoke(); +}; + +#define LOCAL_ENGINE_JNI_DESTROY_METHOD_START \ + local_engine::JniEnvStatusWatcher::instance().setInactive(); \ + local_engine::JniMethodCallCounter::instance().waitForZero(); \ + do \ + { \ + try \ + { + +#define LOCAL_ENGINE_JNI_METHOD_START \ + local_engine::JniMethodGuard jni_method_guard; \ + do \ + { \ + try \ + { \ + if (!jni_method_guard.couldInvoke()) \ + { \ + LOG_ERROR(getLogger("jni"), "Call JNI method {} when JNIEnv is not active!", __FUNCTION__); \ + break; \ + } + +#define LOCAL_ENGINE_JNI_METHOD_END(env, ret) \ + } \ + catch (DB::Exception & e) \ + { \ + local_engine::JniErrorsGlobalState::instance().throwException(env, 
e); \ + break; \ + } \ + catch (std::exception & e) \ + { \ + local_engine::JniErrorsGlobalState::instance().throwException(env, e); \ + break; \ + } \ + catch (...) \ + { \ + DB::WriteBufferFromOwnString ostr; \ + auto trace = boost::stacktrace::stacktrace(); \ + boost::stacktrace::detail::to_string(&trace.as_vector()[0], trace.size()); \ + local_engine::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknown Exception", ostr.str().c_str()); \ + break; \ + } \ + } while (0); \ + return ret; + #define LOCAL_ENGINE_JNI_JMETHOD_START #define LOCAL_ENGINE_JNI_JMETHOD_END(env) \ if ((env)->ExceptionCheck()) \ diff --git a/cpp-ch/local-engine/jni/jni_error.h b/cpp-ch/local-engine/jni/jni_error.h index c6f46bc8fc6e..0cfed7283aa9 100644 --- a/cpp-ch/local-engine/jni/jni_error.h +++ b/cpp-ch/local-engine/jni/jni_error.h @@ -62,27 +62,4 @@ class JniErrorsGlobalState : boost::noncopyable }; // -#define LOCAL_ENGINE_JNI_METHOD_START \ - try \ - { -#define LOCAL_ENGINE_JNI_METHOD_END(env, ret) \ - } \ - catch (DB::Exception & e) \ - { \ - local_engine::JniErrorsGlobalState::instance().throwException(env, e); \ - return ret; \ - } \ - catch (std::exception & e) \ - { \ - local_engine::JniErrorsGlobalState::instance().throwException(env, e); \ - return ret; \ - } \ - catch (...) 
\ - { \ - DB::WriteBufferFromOwnString ostr; \ - auto trace = boost::stacktrace::stacktrace(); \ - boost::stacktrace::detail::to_string(&trace.as_vector()[0], trace.size()); \ - local_engine::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknown Exception", ostr.str().c_str()); \ - return ret; \ - } } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index c442b97fe83d..0f56336a688c 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -166,6 +166,8 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) local_engine::SparkRowInfoJNI::init(env); local_engine::JNIUtils::vm = vm; + + local_engine::JniEnvStatusWatcher::instance().setActive(); return JNI_VERSION_1_8; } @@ -194,27 +196,28 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeDestroyNative(JNIEnv * env, jclass) { - LOCAL_ENGINE_JNI_METHOD_START - // Ensure that current_thread is valid before destroying resource to avoid - // any future calls to CurrentThread::get() in parts of the code. 
- // For example, see `MergeTreeData::getInMemoryMetadataPtr()` - DB::ThreadStatus thread_status; - local_engine::BackendFinalizerUtil::finalizeGlobally(); - - local_engine::JniErrorsGlobalState::instance().destroy(env); - local_engine::BroadCastJoinBuilder::destroy(env); - local_engine::SparkMergeTreeWriterJNI::destroy(env); - local_engine::SparkRowInfoJNI::destroy(env); - - env->DeleteGlobalRef(block_stripes_class); - env->DeleteGlobalRef(split_result_class); - env->DeleteGlobalRef(block_stats_class); - env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class); - env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); - env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); - env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); - env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class); - + LOCAL_ENGINE_JNI_DESTROY_METHOD_START + { + // Ensure that current_thread is valid before destroying resource to avoid + // any future calls to CurrentThread::get() in parts of the code. 
+ // For example, see `MergeTreeData::getInMemoryMetadataPtr()` + DB::ThreadStatus thread_status; + local_engine::BackendFinalizerUtil::finalizeGlobally(); + + local_engine::JniErrorsGlobalState::instance().destroy(env); + local_engine::BroadCastJoinBuilder::destroy(env); + local_engine::SparkMergeTreeWriterJNI::destroy(env); + local_engine::SparkRowInfoJNI::destroy(env); + + env->DeleteGlobalRef(block_stripes_class); + env->DeleteGlobalRef(split_result_class); + env->DeleteGlobalRef(block_stats_class); + env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class); + env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); + env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); + env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); + env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class); + } LOCAL_ENGINE_JNI_METHOD_END(env, ) } @@ -275,9 +278,10 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_ } local_engine::LocalExecutor * executor = parser.createExecutor(plan_pb).release(); - LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast(executor)); + LOG_INFO(getLogger("jni"), "Construct LocalExecutor {}", reinterpret_cast(executor)); executor->setMetric(parser.getMetric()); executor->setExtraPlanHolder(parser.extra_plan_holder); + local_engine::QueryContext::instance().attachLocalExecutor(executor); return reinterpret_cast(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) @@ -306,7 +310,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIE LOCAL_ENGINE_JNI_METHOD_START auto * executor = reinterpret_cast(executor_address); executor->cancel(); - LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); + LOG_INFO(getLogger("jni"), "Cancel LocalExecutor {}", reinterpret_cast(executor)); 
LOCAL_ENGINE_JNI_METHOD_END(env, ) } @@ -314,9 +318,12 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEn { LOCAL_ENGINE_JNI_METHOD_START auto * executor = reinterpret_cast(executor_address); - LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast(executor)); + LOG_INFO(getLogger("jni"), "Finalize LocalExecutor {}", reinterpret_cast(executor)); local_engine::LocalExecutor::resetCurrentExecutor(); - delete executor; + if (local_engine::QueryContext::instance().detachLocalExecutor(executor)) + delete executor; + else + LOG_WARNING(getLogger("jni"), "The LocalExecutor {} has been detached before, should not delete it again.", reinterpret_cast(executor)); LOCAL_ENGINE_JNI_METHOD_END(env, ) } @@ -1296,6 +1303,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE const jobject iter = env->NewGlobalRef(input); parser.addInputIter(iter, false); local_engine::LocalExecutor * executor = parser.createExecutor(plan_pb).release(); + local_engine::QueryContext::instance().attachLocalExecutor(executor); return reinterpret_cast(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } @@ -1304,7 +1312,10 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClos { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast(instance); - delete executor; + if (local_engine::QueryContext::instance().detachLocalExecutor(executor)) + delete executor; + else + LOG_WARNING(getLogger("jni"), "The LocalExecutor {} has been detached before, should not delete it again.", reinterpret_cast(executor)); LOCAL_ENGINE_JNI_METHOD_END(env, ) }