Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,6 @@ void BackendFinalizerUtil::finalizeGlobally()
// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear_cache_map();
QueryContext::resetGlobal();
QueryContext::instance().reset();
std::lock_guard lock(paths_mutex);
std::ranges::for_each(
Expand Down
24 changes: 22 additions & 2 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Common/ThreadStatus.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Parser/LocalExecutor.h>

namespace DB
{
Expand Down Expand Up @@ -59,6 +60,7 @@ DB::ContextMutablePtr QueryContext::globalMutableContext()
{
return Data::global_context;
}

void QueryContext::resetGlobal()
{
if (Data::global_context)
Expand All @@ -71,7 +73,19 @@ void QueryContext::resetGlobal()

void QueryContext::reset()
{
{
std::lock_guard<std::mutex> lock(local_executor_mutex_);
for (auto it = active_executors_.begin(); it != active_executors_.end(); ++it)
{
auto * executor = *it;
executor->cancel();
LOG_ERROR(logger_, "Not closed LocalExecutor({}):\n{}", reinterpret_cast<intptr_t>(executor), executor->dumpPipeline());
delete executor;
}
active_executors_.clear();
}
query_map_.clear();
QueryContext::resetGlobal();
}

DB::ContextMutablePtr QueryContext::createGlobal()
Expand Down Expand Up @@ -200,14 +214,20 @@ void QueryContext::finalizeQuery(int64_t id)
size_t currentThreadGroupMemoryUsage()
{
if (!CurrentThread::getGroup())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first.");
{
LOG_ERROR(getLogger("QueryContext"), "Thread group not found, please call initializeQuery first.");
return 0;
}
return CurrentThread::getGroup()->memory_tracker.get();
}

double currentThreadGroupMemoryUsageRatio()
{
if (!CurrentThread::getGroup())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first.");
{
LOG_ERROR(getLogger("QueryContext"), "Thread group not found, please call initializeQuery first.");
return 0;
}
return static_cast<double>(CurrentThread::getGroup()->memory_tracker.get()) / CurrentThread::getGroup()->memory_tracker.getSoftLimit();
}
}
20 changes: 19 additions & 1 deletion cpp-ch/local-engine/Common/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
#include <Interpreters/Context_fwd.h>
#include <Common/ConcurrentMap.h>
#include <Common/ThreadStatus.h>
#include <mutex>
#include <set>

namespace DB
{
struct ContextSharedPart;
}
namespace local_engine
{

class LocalExecutor;
class QueryContext
{
struct Data;
Expand All @@ -47,6 +49,19 @@ class QueryContext
void logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const;
size_t currentPeakMemory(int64_t id);
void finalizeQuery(int64_t id);
void attachLocalExecutor(LocalExecutor * executor)
{
std::lock_guard<std::mutex> lock(local_executor_mutex_);
active_executors_.insert(executor);
}

// If the executor is found and detached, return true; otherwise return false.
bool detachLocalExecutor(LocalExecutor * executor)
{
std::lock_guard<std::mutex> lock(local_executor_mutex_);
auto n = active_executors_.erase(executor);
return n > 0;
}

// Clear resources held by the QueryContext instance.
void reset();
Expand All @@ -55,6 +70,9 @@ class QueryContext
QueryContext() = default;
LoggerPtr logger_ = getLogger("QueryContextManager");
ConcurrentMap<int64_t, std::shared_ptr<Data>> query_map_{};

std::mutex local_executor_mutex_;
std::set<LocalExecutor *> active_executors_{};
};

size_t currentThreadGroupMemoryUsage();
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Parser/LocalExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ class LocalExecutor : public BlockIterator
RelMetricPtr getMetric() const { return metric; }
void setMetric(const RelMetricPtr & metric_) { metric = metric_; }
void setExtraPlanHolder(std::vector<DB::QueryPlanPtr> & extra_plan_holder_) { extra_plan_holder = std::move(extra_plan_holder_); }
/// Dump processor runtime information to log
std::string dumpPipeline() const;

private:
// In the case of fallback, there may be multiple native pipelines in one stage. Can determine whether a fallback has occurred by whether a LocalExecutor already exists.
// Updated when the LocalExecutor is created and reset when the task ends
static thread_local LocalExecutor * current_executor;
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(const DB::Block & block) const;
void initPullingPipelineExecutor();
/// Dump processor runtime information to log
std::string dumpPipeline() const;

DB::QueryPipelineBuilderPtr query_pipeline_builder;
DB::QueryPipeline query_pipeline;
Expand Down
27 changes: 27 additions & 0 deletions cpp-ch/local-engine/jni/jni_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,31 @@ jbyteArray stringTojbyteArray(JNIEnv * env, const std::string & str)
return jarray;
}

JniEnvStatusWatcher & JniEnvStatusWatcher::instance()
{
static JniEnvStatusWatcher instance;
return instance;
}

JniMethodCallCounter & JniMethodCallCounter::instance()
{
static JniMethodCallCounter instance;
return instance;
}

JniMethodGuard::JniMethodGuard()
{
JniMethodCallCounter::instance().increment();
}

JniMethodGuard::~JniMethodGuard()
{
JniMethodCallCounter::instance().decrement();
}

bool JniMethodGuard::couldInvoke()
{
return JniEnvStatusWatcher::instance().isActive();
}

} // namespace local_engine
103 changes: 103 additions & 0 deletions cpp-ch/local-engine/jni/jni_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Poco/Logger.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <mutex>

namespace DB
{
Expand All @@ -46,6 +47,108 @@ jstring charTojstring(JNIEnv * env, const char * pat);

jbyteArray stringTojbyteArray(JNIEnv * env, const std::string & str);

// A watcher to monitor JNIEnv status: it's active or not
class JniEnvStatusWatcher
Copy link
Contributor

@zhanglistar zhanglistar Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add

JniEnvStatusWatcher() = default;
    ~JniEnvStatusWatcher() = default;  

and set to private to avoid compiler autogenerate. Same as above other singleton classes.

{
public:
static JniEnvStatusWatcher & instance();
void setActive() { is_active.store(true, std::memory_order_release); }
void setInactive() { is_active.store(false, std::memory_order_release); }
bool isActive() const { return is_active.load(std::memory_order_acquire); }
protected:
JniEnvStatusWatcher() = default;
~JniEnvStatusWatcher() = default;
std::atomic<bool> is_active = false;
};

// A counter to track ongoing JNI method calls
// It help to ensure that no JNI method is running when we are destroying the JNI resources
class JniMethodCallCounter
{
public:
static JniMethodCallCounter & instance();
void increment() { counter.fetch_add(1, std::memory_order_acq_rel); }
void decrement() {
auto old_val = counter.fetch_sub(1, std::memory_order_acq_rel);
if (old_val == 1)
{
std::lock_guard<std::mutex> lock(mutex);
cv.notify_all();
}
else if (old_val < 1) [[unlikely]]
{
// This should never happen
LOG_ERROR(getLogger("jni"), "JniMethodCallCounter counter underflow: {}", old_val);
}
}
// This will only be called in the destroy JNI method to wait for all ongoing JNI method calls to finish
void waitForZero()
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this] {
return counter.load(std::memory_order_acquire) <= 0;
});
}
protected:
JniMethodCallCounter() = default;
~JniMethodCallCounter() = default;
private:
std::atomic<int64_t> counter{0};
std::mutex mutex;
std::condition_variable cv;
};

class JniMethodGuard
{
public:
JniMethodGuard();
~JniMethodGuard();
bool couldInvoke();
};

#define LOCAL_ENGINE_JNI_DESTROY_METHOD_START \
local_engine::JniEnvStatusWatcher::instance().setInactive(); \
local_engine::JniMethodCallCounter::instance().waitForZero(); \
do \
{ \
try \
{

#define LOCAL_ENGINE_JNI_METHOD_START \
local_engine::JniMethodGuard jni_method_guard; \
do \
{ \
try \
{ \
if (!jni_method_guard.couldInvoke()) \
{ \
LOG_ERROR(getLogger("jni"), "Call JNI method {} when JNIEnv is not active!", __FUNCTION__); \
break; \
}

#define LOCAL_ENGINE_JNI_METHOD_END(env, ret) \
} \
catch (DB::Exception & e) \
{ \
local_engine::JniErrorsGlobalState::instance().throwException(env, e); \
break; \
} \
catch (std::exception & e) \
{ \
local_engine::JniErrorsGlobalState::instance().throwException(env, e); \
break; \
} \
catch (...) \
{ \
DB::WriteBufferFromOwnString ostr; \
auto trace = boost::stacktrace::stacktrace(); \
boost::stacktrace::detail::to_string(&trace.as_vector()[0], trace.size()); \
local_engine::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknown Exception", ostr.str().c_str()); \
break; \
} \
} while (0); \
return ret;

#define LOCAL_ENGINE_JNI_JMETHOD_START
#define LOCAL_ENGINE_JNI_JMETHOD_END(env) \
if ((env)->ExceptionCheck()) \
Expand Down
23 changes: 0 additions & 23 deletions cpp-ch/local-engine/jni/jni_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,4 @@ class JniErrorsGlobalState : boost::noncopyable
};
//

#define LOCAL_ENGINE_JNI_METHOD_START \
try \
{
#define LOCAL_ENGINE_JNI_METHOD_END(env, ret) \
} \
catch (DB::Exception & e) \
{ \
local_engine::JniErrorsGlobalState::instance().throwException(env, e); \
return ret; \
} \
catch (std::exception & e) \
{ \
local_engine::JniErrorsGlobalState::instance().throwException(env, e); \
return ret; \
} \
catch (...) \
{ \
DB::WriteBufferFromOwnString ostr; \
auto trace = boost::stacktrace::stacktrace(); \
boost::stacktrace::detail::to_string(&trace.as_vector()[0], trace.size()); \
local_engine::JniErrorsGlobalState::instance().throwRuntimeException(env, "Unknown Exception", ostr.str().c_str()); \
return ret; \
}
}
Loading