diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index f791ed03d5b..e0d1707269a 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -2539,22 +2539,38 @@ bool thread_group_t::HasActiveWorker() const #ifdef ELOQ_MODULE_ENABLED void MariaModule::ExtThdStart(int thd_id) { - if (thd_id < 0 || (size_t) thd_id >= groups_.size()) + thread_group_t *group = nullptr; + { + std::lock_guard lk(groups_mutex_); + if (thd_id >= 0 && (size_t)thd_id < groups_.size()) { + group = groups_[thd_id]; + } + } + + if (group == nullptr) { return; } SetIsSqlThd(false); - groups_[thd_id]->ext_worker_active_.store(true, std::memory_order_relaxed); + group->ext_worker_active_.store(true, std::memory_order_relaxed); } void MariaModule::ExtThdEnd(int thd_id) { - if (thd_id < 0 || (size_t) thd_id >= groups_.size()) + thread_group_t *group = nullptr; + { + std::lock_guard lk(groups_mutex_); + if (thd_id >= 0 && (size_t)thd_id < groups_.size()) { + group = groups_[thd_id]; + } + } + + if (group == nullptr) { return; } - thread_group_t *group = groups_[thd_id]; + group->ext_worker_active_.store(false, std::memory_order_relaxed); mysql_mutex_lock(&group->mutex); @@ -2575,13 +2591,15 @@ void MariaModule::ExtThdEnd(int thd_id) void MariaModule::Process(int thd_id) { - if (thd_id < 0 || (size_t) thd_id >= groups_.size()) + thread_group_t *group = nullptr; { - return; + std::lock_guard lk(groups_mutex_); + if (thd_id >= 0 && (size_t)thd_id < groups_.size()) { + group = groups_[thd_id]; + } } - thread_group_t *group = groups_[thd_id]; - if (group->shutdown.load(std::memory_order_relaxed)) + if (group == nullptr || group->shutdown.load(std::memory_order_relaxed)) { return; } @@ -2673,12 +2691,19 @@ void MariaModule::Process(int thd_id) bool MariaModule::HasTask(int thd_id) const { - if (thd_id < 0 || (size_t) thd_id >= groups_.size()) + thread_group_t *group = nullptr; + { + std::lock_guard lk(groups_mutex_); + if (thd_id >= 0 && (size_t)thd_id < groups_.size()) { + group = groups_[thd_id]; + } + } + + if (group == nullptr) { return false; } - thread_group_t *group= groups_[thd_id]; CoroutineInfo *coro_info= group->coroutine_info_.get(); return !coro_info->req_queue_.IsEmpty() || !coro_info->resume_queue_.IsEmpty() || diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h index 5502540dbb7..5c553d569a9 100644 --- a/sql/threadpool_generic.h +++ b/sql/threadpool_generic.h @@ -251,11 +251,22 @@ class MariaModule : public eloq::EloqModule void Process(int thd_id) override; bool HasTask(int thd_id) const override; - void ResizeGroups(size_t size) { groups_.resize(size, nullptr); } - void SetGroup(size_t gid, thread_group_t *group) { groups_[gid]= group; } + void ResizeGroups(size_t size) + { + std::lock_guard lk(groups_mutex_); + groups_.resize(size, nullptr); + } + void SetGroup(size_t gid, thread_group_t *group) + { + std::lock_guard lk(groups_mutex_); + if (gid < groups_.size()) { + groups_[gid] = group; + } + } private: std::vector groups_; + mutable std::mutex groups_mutex_; }; #endif