Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions sql/threadpool_generic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2539,22 +2539,38 @@ bool thread_group_t::HasActiveWorker() const
#ifdef ELOQ_MODULE_ENABLED
void MariaModule::ExtThdStart(int thd_id)
{
if (thd_id < 0 || (size_t) thd_id >= groups_.size())
thread_group_t *group = nullptr;
{
std::lock_guard<std::mutex> lk(groups_mutex_);
if (thd_id >= 0 && (size_t)thd_id < groups_.size()) {
group = groups_[thd_id];
}
}

if (group == nullptr)
{
return;
}

SetIsSqlThd(false);
groups_[thd_id]->ext_worker_active_.store(true, std::memory_order_relaxed);
group->ext_worker_active_.store(true, std::memory_order_relaxed);
}

void MariaModule::ExtThdEnd(int thd_id)
{
if (thd_id < 0 || (size_t) thd_id >= groups_.size())
thread_group_t *group = nullptr;
{
std::lock_guard<std::mutex> lk(groups_mutex_);
if (thd_id >= 0 && (size_t)thd_id < groups_.size()) {
group = groups_[thd_id];
}
}

if (group == nullptr)
{
return;
}
thread_group_t *group = groups_[thd_id];

group->ext_worker_active_.store(false, std::memory_order_relaxed);

mysql_mutex_lock(&group->mutex);
Expand All @@ -2575,13 +2591,15 @@ void MariaModule::ExtThdEnd(int thd_id)

void MariaModule::Process(int thd_id)
{
if (thd_id < 0 || (size_t) thd_id >= groups_.size())
thread_group_t *group = nullptr;
{
return;
std::lock_guard<std::mutex> lk(groups_mutex_);
if (thd_id >= 0 && (size_t)thd_id < groups_.size()) {
group = groups_[thd_id];
}
}

thread_group_t *group = groups_[thd_id];
if (group->shutdown.load(std::memory_order_relaxed))
if (group == nullptr || group->shutdown.load(std::memory_order_relaxed))
{
return;
}
Expand Down Expand Up @@ -2673,12 +2691,19 @@ void MariaModule::Process(int thd_id)

bool MariaModule::HasTask(int thd_id) const
{
if (thd_id < 0 || (size_t) thd_id >= groups_.size())
thread_group_t *group = nullptr;
{
std::lock_guard<std::mutex> lk(groups_mutex_);
if (thd_id >= 0 && (size_t)thd_id < groups_.size()) {
group = groups_[thd_id];
}
}

if (group == nullptr)
{
return false;
}

thread_group_t *group= groups_[thd_id];
CoroutineInfo *coro_info= group->coroutine_info_.get();
return !coro_info->req_queue_.IsEmpty() ||
!coro_info->resume_queue_.IsEmpty() ||
Expand Down
15 changes: 13 additions & 2 deletions sql/threadpool_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,22 @@ class MariaModule : public eloq::EloqModule
void Process(int thd_id) override;
bool HasTask(int thd_id) const override;

void ResizeGroups(size_t size) { groups_.resize(size, nullptr); }
void SetGroup(size_t gid, thread_group_t *group) { groups_[gid]= group; }
void ResizeGroups(size_t size)
{
std::lock_guard<std::mutex> lk(groups_mutex_);
groups_.resize(size, nullptr);
}
void SetGroup(size_t gid, thread_group_t *group)
{
std::lock_guard<std::mutex> lk(groups_mutex_);
if (gid < groups_.size()) {
groups_[gid] = group;
}
}

private:
std::vector<thread_group_t *> groups_;
mutable std::mutex groups_mutex_;
};
#endif

Expand Down