From d11b928e2201d6b284598daae0627f0ae017dda6 Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Tue, 17 Mar 2026 18:48:25 +0800 Subject: [PATCH 1/3] feat: execute droptable and reopen tasks in batch --- include/kv_options.h | 5 + src/eloq_store.cpp | 260 ++++++++++++++++++++++++++++++++----------- src/kv_options.cpp | 6 + 3 files changed, 203 insertions(+), 68 deletions(-) diff --git a/include/kv_options.h b/include/kv_options.h index 66d4b94e..150864dd 100644 --- a/include/kv_options.h +++ b/include/kv_options.h @@ -91,6 +91,11 @@ struct KvOptions * @brief The maximum number of running archive tasks at the same time. */ uint16_t max_archive_tasks = 256; + /** + * @brief Maximum number of per-partition requests submitted at a time by + * global operations such as global archive/reopen. + */ + uint32_t max_global_request_batch = 1000; /** * @brief Move pages in data file that space amplification factor * bigger than this value. diff --git a/src/eloq_store.cpp b/src/eloq_store.cpp index 8dfa746d..698f0661 100644 --- a/src/eloq_store.cpp +++ b/src/eloq_store.cpp @@ -930,40 +930,98 @@ void EloqStore::HandleDropTableRequest(DropTableRequest *req) req->pending_.store(static_cast(partitions.size()), std::memory_order_relaxed); - auto on_truncate_done = [req](KvRequest *sub_req) + for (const TableIdent &partition : partitions) + { + auto trunc_req = std::make_unique(); + trunc_req->SetArgs(partition, std::string_view{}); + req->truncate_reqs_.push_back(std::move(trunc_req)); + } + + struct DropTableScheduleState + : public std::enable_shared_from_this { - KvError sub_err = sub_req->Error(); - if (sub_err != KvError::NoError) + EloqStore *store = nullptr; + DropTableRequest *req = nullptr; + size_t total = 0; + std::atomic next_index{0}; + + bool HandleTruncateResult(KvError sub_err) { - uint8_t expected = static_cast(KvError::NoError); - uint8_t desired = static_cast(sub_err); - req->first_error_.compare_exchange_strong( - expected, - desired, - std::memory_order_relaxed, - std::memory_order_relaxed); + if (sub_err != KvError::NoError) + { + uint8_t expected = static_cast(KvError::NoError); + uint8_t desired = static_cast(sub_err); + req->first_error_.compare_exchange_strong( + expected, + desired, + std::memory_order_relaxed, + std::memory_order_relaxed); + } + if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + KvError final_err = static_cast( + req->first_error_.load(std::memory_order_relaxed)); + req->SetDone(final_err); + return true; + } + return false; } - if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) + + void OnTruncateDone(KvRequest *sub_req) { - KvError final_err = static_cast( - req->first_error_.load(std::memory_order_relaxed)); - req->SetDone(final_err); + if (HandleTruncateResult(sub_req->Error())) + { + return; + } + ScheduleNext(); } - }; - req->truncate_reqs_.reserve(partitions.size()); - for (const TableIdent &partition : partitions) - { - auto trunc_req = std::make_unique(); - trunc_req->SetArgs(partition, std::string_view{}); - TruncateRequest *ptr = trunc_req.get(); - req->truncate_reqs_.push_back(std::move(trunc_req)); - if (!ExecAsyn(ptr, 0, on_truncate_done)) + void ScheduleNext() { - LOG(ERROR) - << "Handle droptable request, enqueue truncate request fail"; - ptr->SetDone(KvError::NotRunning); + while (true) + { + size_t idx = next_index.fetch_add(1, std::memory_order_relaxed); + if (idx >= total) + { + return; + } + + TruncateRequest *ptr = req->truncate_reqs_[idx].get(); + auto self = shared_from_this(); + auto on_truncate_done = [self](KvRequest *sub_req) + { self->OnTruncateDone(sub_req); }; + if (store->ExecAsyn(ptr, 0, on_truncate_done)) + { + return; + } + + LOG(ERROR) << "Handle droptable request, enqueue truncate " + "request fail"; + ptr->callback_ = nullptr; + ptr->SetDone(KvError::NotRunning); + if (HandleTruncateResult(KvError::NotRunning)) + { + return; + } + } } + }; + + auto state = std::make_shared(); + state->store = this; + state->req = req; + state->total = req->truncate_reqs_.size(); + + size_t max_inflight = + std::max(options_.max_global_request_batch, 1); + if (max_inflight > state->total) + { + max_inflight = state->total; + } + + for (size_t i = 0; i < max_inflight; ++i) + { + state->ScheduleNext(); } } @@ -1211,6 +1269,10 @@ void EloqStore::HandleGlobalArchiveRequest(GlobalArchiveRequest *req) { max_inflight = 1; } + max_inflight = std::min( + max_inflight, + static_cast(std::max( + options_.max_global_request_batch, 1))); if (max_inflight > state->total) { max_inflight = state->total; @@ -1334,59 +1396,121 @@ void EloqStore::HandleGlobalReopenRequest(GlobalReopenRequest *req) req->pending_.store(static_cast(partitions.size()), std::memory_order_relaxed); - auto on_reopen_done = [req](KvRequest *sub_req) - { - auto *reopen_req = static_cast(sub_req); - KvError sub_err = reopen_req->Error(); - if (sub_err != KvError::NoError) - { - LOG(ERROR) << "HandleGlobalReopenRequest sub request failed, table " - << reopen_req->TableId() << ", tag " << reopen_req->Tag() - << ", error " << static_cast(sub_err) - << ", msg " << reopen_req->ErrMessage(); - uint8_t expected = static_cast(KvError::NoError); - uint8_t desired = static_cast(sub_err); - req->first_error_.compare_exchange_strong( - expected, - desired, - std::memory_order_relaxed, - std::memory_order_relaxed); - } - else - { - DLOG(INFO) << "HandleGlobalReopenRequest sub request succeeded, " - << "table " << reopen_req->TableId() << ", tag " - << reopen_req->Tag(); - } - if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) - { - KvError final_err = static_cast( - req->first_error_.load(std::memory_order_relaxed)); - DLOG(INFO) << "HandleGlobalReopenRequest finish, tag " << req->Tag() - << ", final_error " << static_cast(final_err); - req->SetDone(final_err); - } - }; - for (const TableIdent &partition : partitions) { - DLOG(INFO) << "HandleGlobalReopenRequest enqueue partition " - << partition << ", tag " << req->Tag(); auto reopen_req = std::make_unique(); reopen_req->SetArgs(partition); if (!req->Tag().empty()) { reopen_req->SetTag(req->Tag()); } - ReopenRequest *ptr = reopen_req.get(); req->reopen_reqs_.push_back(std::move(reopen_req)); - if (!ExecAsyn(ptr, 0, on_reopen_done)) + } + + struct ReopenScheduleState + : public std::enable_shared_from_this + { + EloqStore *store = nullptr; + GlobalReopenRequest *req = nullptr; + size_t total = 0; + std::atomic next_index{0}; + + bool HandleReopenResult(ReopenRequest *reopen_req) { - LOG(ERROR) - << "Handle global reopen request, enqueue reopen request fail, " - << "partition " << partition << ", tag " << req->Tag(); - ptr->SetDone(KvError::NotRunning); + KvError sub_err = reopen_req->Error(); + if (sub_err != KvError::NoError) + { + LOG(ERROR) << "HandleGlobalReopenRequest sub request failed, " + << "table " << reopen_req->TableId() << ", tag " + << reopen_req->Tag() << ", error " + << static_cast(sub_err) << ", msg " + << reopen_req->ErrMessage(); + uint8_t expected = static_cast(KvError::NoError); + uint8_t desired = static_cast(sub_err); + req->first_error_.compare_exchange_strong( + expected, + desired, + std::memory_order_relaxed, + std::memory_order_relaxed); + } + else + { + DLOG(INFO) << "HandleGlobalReopenRequest sub request " + << "succeeded, table " << reopen_req->TableId() + << ", tag " << reopen_req->Tag(); + } + if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + KvError final_err = static_cast( + req->first_error_.load(std::memory_order_relaxed)); + DLOG(INFO) << "HandleGlobalReopenRequest finish, tag " + << req->Tag() << ", final_error " + << static_cast(final_err); + req->SetDone(final_err); + return true; + } + return false; } + + void OnReopenDone(KvRequest *sub_req) + { + auto *reopen_req = static_cast(sub_req); + if (HandleReopenResult(reopen_req)) + { + return; + } + ScheduleNext(); + } + + void ScheduleNext() + { + while (true) + { + size_t idx = next_index.fetch_add(1, std::memory_order_relaxed); + if (idx >= total) + { + return; + } + + ReopenRequest *ptr = req->reopen_reqs_[idx].get(); + DLOG(INFO) << "HandleGlobalReopenRequest enqueue partition " + << ptr->TableId() << ", tag " << req->Tag(); + auto self = shared_from_this(); + auto on_reopen_done = [self](KvRequest *sub_req) + { self->OnReopenDone(sub_req); }; + if (store->ExecAsyn(ptr, 0, on_reopen_done)) + { + return; + } + + LOG(ERROR) << "Handle global reopen request, enqueue reopen " + "request fail, partition " + << ptr->TableId() << ", tag " << req->Tag(); + ptr->callback_ = nullptr; + ptr->SetDone(KvError::NotRunning); + if (HandleReopenResult(ptr)) + { + return; + } + } + } + }; + + auto state = std::make_shared(); + state->store = this; + state->req = req; + state->total = req->reopen_reqs_.size(); + + size_t max_inflight = + std::max(options_.max_global_request_batch, 1); + if (max_inflight > state->total) + { + max_inflight = state->total; + } + + for (size_t i = 0; i < max_inflight; ++i) + { + state->ScheduleNext(); } } diff --git a/src/kv_options.cpp b/src/kv_options.cpp index 216c3786..e4ff3465 100644 --- a/src/kv_options.cpp +++ b/src/kv_options.cpp @@ -175,6 +175,11 @@ int KvOptions::LoadFromIni(const char *path) max_archive_tasks = reader.GetUnsigned(sec_run, "max_archive_tasks", 256); } + if (reader.HasValue(sec_run, "max_global_request_batch")) + { + max_global_request_batch = reader.GetUnsigned( + sec_run, "max_global_request_batch", max_global_request_batch); + } if (reader.HasValue(sec_run, "file_amplify_factor")) { file_amplify_factor = @@ -392,6 +397,7 @@ bool KvOptions::operator==(const KvOptions &other) const num_retained_archives == other.num_retained_archives && archive_interval_secs == other.archive_interval_secs && max_archive_tasks == other.max_archive_tasks && + max_global_request_batch == other.max_global_request_batch && file_amplify_factor == other.file_amplify_factor && local_space_limit == other.local_space_limit && reserve_space_ratio == other.reserve_space_ratio && From c85978c7fd16a670475038697509a473727e0018 Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Tue, 17 Mar 2026 18:49:52 +0800 Subject: [PATCH 2/3] fmt --- src/eloq_store.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/eloq_store.cpp b/src/eloq_store.cpp index 698f0661..52a498ab 100644 --- a/src/eloq_store.cpp +++ b/src/eloq_store.cpp @@ -1269,10 +1269,9 @@ void EloqStore::HandleGlobalArchiveRequest(GlobalArchiveRequest *req) { max_inflight = 1; } - max_inflight = std::min( - max_inflight, - static_cast(std::max( - options_.max_global_request_batch, 1))); + max_inflight = std::min(max_inflight, + static_cast(std::max( + options_.max_global_request_batch, 1))); if (max_inflight > state->total) { max_inflight = state->total; From c7d1941892da8bbb458b8b8b49a6534469b46506 Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Wed, 18 Mar 2026 12:51:07 +0800 Subject: [PATCH 3/3] fix --- src/eloq_store.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/eloq_store.cpp b/src/eloq_store.cpp index 52a498ab..235616ea 100644 --- a/src/eloq_store.cpp +++ b/src/eloq_store.cpp @@ -66,6 +66,11 @@ bool EloqStore::ValidateOptions(KvOptions &opts) LOG(ERROR) << "Option max_inflight_write cannot be zero"; return false; } + if (opts.max_global_request_batch == 0) + { + LOG(ERROR) << "Option max_global_request_batch cannot be zero"; + return false; + } if ((opts.data_page_size & (page_align - 1)) != 0) { LOG(ERROR) << "Option data_page_size is not page aligned";