From 92c2613d4fe049b8b3dd424319fe5164f861df73 Mon Sep 17 00:00:00 2001 From: Chen Zhao Date: Wed, 18 Mar 2026 12:52:52 +0800 Subject: [PATCH] update --- include/async_io_manager.h | 1 + include/eloq_store.h | 63 ++++++++++++++++++++++++- include/error.h | 3 ++ include/kv_options.h | 16 ++++--- include/storage/shard.h | 16 +++++++ include/tasks/task.h | 9 ++++ include/tasks/task_manager.h | 20 ++++++++ include/types.h | 2 + src/async_io_manager.cpp | 31 ++++++++++++- src/eloq_store.cpp | 73 ++++++++++++++++++++++++++--- src/standby_service.cpp | 2 +- src/storage/shard.cpp | 90 +++++++++++++++++++++++++++++++++++- src/tasks/prewarm_task.cpp | 3 +- src/tasks/reopen_task.cpp | 2 +- src/tasks/task.cpp | 4 ++ tests/cloud.cpp | 28 ++++++++--- tests/manifest.cpp | 9 +++- 17 files changed, 344 insertions(+), 28 deletions(-) diff --git a/include/async_io_manager.h b/include/async_io_manager.h index 4eb1aee2..5b140d5d 100644 --- a/include/async_io_manager.h +++ b/include/async_io_manager.h @@ -355,6 +355,7 @@ class AsyncIoManager }; KvError ToKvError(int err_no); +int KvErrorToIoResult(KvError err); class IouringMgr : public AsyncIoManager { diff --git a/include/eloq_store.h b/include/eloq_store.h index 9c0d4242..152a1fd5 100644 --- a/include/eloq_store.h +++ b/include/eloq_store.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -51,7 +52,9 @@ enum class RequestType : uint8_t CleanExpired, GlobalArchive, GlobalReopen, - GlobalListArchiveTags + GlobalListArchiveTags, + ChangePartition, + ApplyPartitionChange }; inline const char *RequestTypeToString(RequestType type) @@ -90,6 +93,10 @@ inline const char *RequestTypeToString(RequestType type) return "global_reopen"; case RequestType::GlobalListArchiveTags: return "global_list_archive_tags"; + case RequestType::ChangePartition: + return "change_partition"; + case RequestType::ApplyPartitionChange: + return "apply_partition_change"; default: return "unknown"; } @@ -617,6 +624,55 @@ class GlobalListArchiveTagsRequest : public KvRequest friend class EloqStore; }; +class ChangePartitionRequest : public KvRequest +{ +public: + RequestType Type() const override + { + return RequestType::ChangePartition; + } + + void SetPartitionFilter(PartitionFilter partition_filter) + { + if (partition_filter) + { + partition_filter_ = std::make_shared( + std::move(partition_filter)); + } + else + { + partition_filter_.reset(); + } + } + + void SetStopStore(bool stop_store) + { + stop_store_ = stop_store; + } + +private: + std::shared_ptr partition_filter_{nullptr}; + bool stop_store_{false}; + + friend class EloqStore; +}; + +class ApplyPartitionChangeRequest : public KvRequest +{ +public: + RequestType Type() const override + { + return RequestType::ApplyPartitionChange; + } + +private: + std::shared_ptr partition_filter_{nullptr}; + std::atomic *pending_{nullptr}; + + friend class EloqStore; + friend class Shard; +}; + class CompactRequest : public WriteRequest { public: @@ -747,6 +803,10 @@ class EloqStore void HandleGlobalArchiveRequest(GlobalArchiveRequest *req); void HandleGlobalReopenRequest(GlobalReopenRequest *req); void HandleGlobalListArchiveTagsRequest(GlobalListArchiveTagsRequest *req); + void HandleChangePartitionRequest(ChangePartitionRequest *req); + std::optional ResolvePartitionGroup( + const TableIdent &tbl_id) const; + bool PartitionIncluded(const TableIdent &tbl_id) const; KvError CollectTablePartitions(const std::string &table_name, std::vector &partitions) const; KvError InitStoreSpace(); @@ -773,6 +833,7 @@ class EloqStore bool enable_eloqstore_metrics_{false}; std::atomic store_mode_{StoreMode::Local}; + std::shared_ptr partition_filter_{nullptr}; friend class Shard; friend class AsyncIoManager; diff --git a/include/error.h b/include/error.h index 1859f6ad..3a31604e 100644 --- a/include/error.h +++ b/include/error.h @@ -27,6 +27,7 @@ enum struct KvError : uint8_t NoPermission, // Permission denied (EPERM). CloudErr, // Cloud service error (non-timeout HTTP/CURL). IoFail, // Unclassified local I/O error. + Aborted, // Operation aborted after in-flight I/O drained. ExpiredTerm, // Cloud term file indicates stale process term. OssInsufficientStorage, // Object storage out of capacity (HTTP 507). }; @@ -59,6 +60,8 @@ constexpr const char *ErrorString(KvError err) return "Device or resource busy"; case KvError::IoFail: return "I/O failure"; + case KvError::Aborted: + return "Operation aborted"; case KvError::CloudErr: return "Cloud service is unavailable"; case KvError::Timeout: diff --git a/include/kv_options.h b/include/kv_options.h index 66d4b94e..95ad12e6 100644 --- a/include/kv_options.h +++ b/include/kv_options.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -20,6 +21,9 @@ constexpr int64_t TB = 1LL << 40; constexpr uint8_t max_overflow_pointers = 128; constexpr uint16_t max_read_pages_batch = max_overflow_pointers; +using PartitionFilter = + std::function(const TableIdent &)>; + struct KvOptions { int LoadFromIni(const char *path); @@ -304,15 +308,15 @@ struct KvOptions uint16_t prewarm_task_count = 3; /** - * @brief Filter function to determine which partitions belong to this - * instance. - * The filter returning true means that this table partition belongs to the - * current instance and should be included in operations like prewarming, - * snapshotting, etc. + * @brief Classify which partition group a table partition belongs to. + * Returning std::nullopt means that this partition does not belong to this + * instance. If the function returns a partition group id, runtime + * partition-group state decides whether that partition is currently active + * for operations like prewarming, snapshotting, and reopen. * If not set (empty), all partitions are considered to belong to this * instance. */ - std::function partition_filter; + PartitionFilter partition_filter; }; } // namespace eloqstore diff --git a/include/storage/shard.h b/include/storage/shard.h index 872e6818..f7e1a241 100644 --- a/include/storage/shard.h +++ b/include/storage/shard.h @@ -77,6 +77,22 @@ class Shard void OnTaskFinished(KvTask *task); void OnReceivedReq(KvRequest *req); bool ProcessReq(KvRequest *req); + static bool ShouldCheckPartitionFilter(RequestType type) + { + switch (type) + { + case RequestType::ListObject: + case RequestType::ListStandbyPartition: + case RequestType::GlobalArchive: + case RequestType::GlobalReopen: + case RequestType::GlobalListArchiveTags: + case RequestType::ChangePartition: + case RequestType::ApplyPartitionChange: + return false; + default: + return true; + } + } void TryStartPendingWrite(const TableIdent &tbl_id); void TryDispatchPendingWrites(); diff --git a/include/tasks/task.h b/include/tasks/task.h index 41ce8829..95656ba1 100644 --- a/include/tasks/task.h +++ b/include/tasks/task.h @@ -115,10 +115,19 @@ class KvTask int WaitIoResult(); void WaitIo(); void FinishIo(); + void ForceAbort() + { + force_aborted_ = true; + } + bool ForceAborted() const + { + return force_aborted_; + } uint32_t inflight_io_{0}; int io_res_{0}; uint32_t io_flags_{0}; + bool force_aborted_{false}; TaskStatus status_{TaskStatus::Idle}; KvRequest *req_{nullptr}; diff --git a/include/tasks/task_manager.h b/include/tasks/task_manager.h index e87f2d60..06be9507 100644 --- a/include/tasks/task_manager.h +++ b/include/tasks/task_manager.h @@ -44,6 +44,26 @@ class TaskManager void FinishExternalTask(); size_t NumActive() const; + template + void ForEachActiveTask(F &&visitor) + { + auto visit = [&](KvTask *task) + { + if (task->status_ == TaskStatus::Idle || + task->status_ == TaskStatus::Finished) + { + return; + } + visitor(task); + }; + batch_write_pool_.ForEachTask(visit); + bg_write_pool_.ForEachTask(visit); + read_pool_.ForEachTask(visit); + scan_pool_.ForEachTask(visit); + list_object_pool_.ForEachTask(visit); + list_standby_partition_pool_.ForEachTask(visit); + reopen_pool_.ForEachTask(visit); + } void Shutdown(); diff --git a/include/types.h b/include/types.h index 80d41b55..3e7e20ff 100644 --- a/include/types.h +++ b/include/types.h @@ -14,6 +14,8 @@ namespace eloqstore { +using PartitionGroupId = uint32_t; + enum class StoreMode { Local = 0, diff --git a/src/async_io_manager.cpp b/src/async_io_manager.cpp index 78c7db5f..3973273e 100644 --- a/src/async_io_manager.cpp +++ b/src/async_io_manager.cpp @@ -992,12 +992,41 @@ KvError ToKvError(int err_no) return KvError::OpenFileLimit; case -ENOSPC: return KvError::OutOfSpace; + case -ECANCELED: + return KvError::Aborted; default: LOG(ERROR) << "ToKvError: " << err_no; return KvError::IoFail; } } +int KvErrorToIoResult(KvError err) +{ + switch (err) + { + case KvError::NoError: + return 0; + case KvError::NoPermission: + return -EPERM; + case KvError::NotFound: + return -ENOENT; + case KvError::TryAgain: + return -EAGAIN; + case KvError::OutOfMem: + return -ENOMEM; + case KvError::Busy: + return -EBUSY; + case KvError::OpenFileLimit: + return -EMFILE; + case KvError::OutOfSpace: + return -ENOSPC; + case KvError::Aborted: + return -ECANCELED; + default: + return -EIO; + } +} + std::pair IouringMgr::DecodeUserData( uint64_t user_data) { @@ -4673,7 +4702,7 @@ std::pair StandbyStoreMgr::GetManifest( return {nullptr, prep_err}; } current_task->WaitIo(); - prep_err = static_cast(current_task->io_res_); + prep_err = ToKvError(current_task->io_res_); if (prep_err != KvError::NoError) { return {nullptr, prep_err}; diff --git a/src/eloq_store.cpp b/src/eloq_store.cpp index 8dfa746d..48a53ecb 100644 --- a/src/eloq_store.cpp +++ b/src/eloq_store.cpp @@ -381,6 +381,11 @@ EloqStore::EloqStore(const KvOptions &opts) : options_(opts) { LOG(FATAL) << "Invalid KvOptions configuration"; } + if (options_.partition_filter) + { + partition_filter_ = + std::make_shared(options_.partition_filter); + } } EloqStore::~EloqStore() @@ -1033,8 +1038,7 @@ void EloqStore::HandleGlobalArchiveRequest(GlobalArchiveRequest *req) continue; } - if (options_.partition_filter && - !options_.partition_filter(tbl_id)) + if (!PartitionIncluded(tbl_id)) { continue; } @@ -1087,8 +1091,7 @@ void EloqStore::HandleGlobalArchiveRequest(GlobalArchiveRequest *req) continue; } - if (options_.partition_filter && - !options_.partition_filter(tbl_id)) + if (!PartitionIncluded(tbl_id)) { continue; } @@ -1262,7 +1265,7 @@ void EloqStore::HandleGlobalReopenRequest(GlobalReopenRequest *req) { continue; } - if (options_.partition_filter && !options_.partition_filter(tbl_id)) + if (!PartitionIncluded(tbl_id)) { continue; } @@ -1309,8 +1312,7 @@ void EloqStore::HandleGlobalReopenRequest(GlobalReopenRequest *req) continue; } - if (options_.partition_filter && - !options_.partition_filter(tbl_id)) + if (!PartitionIncluded(tbl_id)) { continue; } @@ -1523,11 +1525,68 @@ bool EloqStore::SendRequest(KvRequest *req) static_cast(req)); return true; } + if (req->Type() == RequestType::ChangePartition) + { + HandleChangePartitionRequest(static_cast(req)); + return true; + } Shard *shard = shards_[req->TableId().ShardIndex(shards_.size())].get(); return shard->AddKvRequest(req); } +std::optional EloqStore::ResolvePartitionGroup( + const TableIdent &tbl_id) const +{ + std::shared_ptr partition_filter = + std::atomic_load(&partition_filter_); + if (!partition_filter) + { + return PartitionGroupId{0}; + } + return (*partition_filter)(tbl_id); +} + +bool EloqStore::PartitionIncluded(const TableIdent &tbl_id) const +{ + return ResolvePartitionGroup(tbl_id).has_value(); +} + +void EloqStore::HandleChangePartitionRequest(ChangePartitionRequest *req) +{ + std::vector> sub_reqs; + sub_reqs.reserve(shards_.size()); + std::atomic pending(static_cast(shards_.size())); + + for (auto &shard : shards_) + { + auto sub_req = std::make_unique(); + sub_req->partition_filter_ = req->partition_filter_; + sub_req->pending_ = &pending; + if (!shard->AddKvRequest(sub_req.get())) + { + req->SetDone(KvError::Busy); + return; + } + sub_reqs.push_back(std::move(sub_req)); + } + + while (pending.load(std::memory_order_acquire) > 0) + { +#ifdef ELOQ_MODULE_ENABLED + bthread_usleep(1000); +#else + std::this_thread::sleep_for(std::chrono::milliseconds(1)); +#endif + } + + if (req->stop_store_) + { + Stop(); + } + req->SetDone(KvError::NoError); +} + void EloqStore::Stop() { while (true) diff --git a/src/standby_service.cpp b/src/standby_service.cpp index 4b06b4d3..95737ea5 100644 --- a/src/standby_service.cpp +++ b/src/standby_service.cpp @@ -552,7 +552,7 @@ void StandbyService::ProcessReadyTasks(size_t shard_id) << shard_id << ", task=" << completion.task << ", result=" << static_cast(completion.result); } - completion.task->io_res_ = static_cast(completion.result); + completion.task->io_res_ = KvErrorToIoResult(completion.result); completion.task->FinishIo(); if (completion.task->Type() == TaskType::ListStandbyPartition) { diff --git a/src/storage/shard.cpp b/src/storage/shard.cpp index 4dbecb4e..af264a74 100644 --- a/src/storage/shard.cpp +++ b/src/storage/shard.cpp @@ -422,6 +422,16 @@ void Shard::TryDispatchPendingWrites() bool Shard::ProcessReq(KvRequest *req) { + if (ShouldCheckPartitionFilter(req->Type())) + { + const TableIdent &tbl_id = req->TableId(); + if (tbl_id.IsValid() && !store_->PartitionIncluded(tbl_id)) + { + req->SetDone(KvError::NotRunning); + return true; + } + } + switch (req->Type()) { case RequestType::Read: @@ -537,7 +547,7 @@ bool Shard::ProcessReq(KvRequest *req) DLOG(INFO) << "Shard::ProcessReq ListStandbyPartition done, req=" << req << ", task=" << current_task << ", io_res=" << current_task->io_res_; - return static_cast(current_task->io_res_); + return ToKvError(current_task->io_res_); }; StartTask(task, req, lbd); return true; @@ -636,6 +646,84 @@ bool Shard::ProcessReq(KvRequest *req) req->SetDone(KvError::InvalidArgs); return true; } + case RequestType::ChangePartition: + { + LOG(ERROR) << "ChangePartition request routed to shard unexpectedly"; + req->SetDone(KvError::InvalidArgs); + return true; + } + case RequestType::ApplyPartitionChange: + { + auto *change_req = static_cast(req); + auto partition_included = [&](const TableIdent &tbl_id) -> bool + { + if (!change_req->partition_filter_) + { + return true; + } + return (*change_req->partition_filter_)(tbl_id).has_value(); + }; + + task_mgr_.ForEachActiveTask( + [&](KvTask *task) + { + if (task->req_ == nullptr) + { + return; + } + const TableIdent &tbl_id = task->req_->TableId(); + if (!tbl_id.IsValid()) + { + return; + } + if (!partition_included(tbl_id)) + { + task->ForceAbort(); + } + }); + + std::vector empty_queues; + empty_queues.reserve(pending_queues_.size()); + for (auto &[tbl_id, pending_q] : pending_queues_) + { + if (partition_included(tbl_id)) + { + continue; + } + + while (WriteRequest *queued_req = pending_q.PopFront(); + queued_req != nullptr) + { + queued_req->SetDone(KvError::NotRunning); + } + + if (!pending_q.running_) + { + empty_queues.push_back(tbl_id); + } + } + + for (const TableIdent &tbl_id : empty_queues) + { + auto it = pending_queues_.find(tbl_id); + if (it == pending_queues_.end()) + { + continue; + } + if (!it->second.running_ && it->second.Empty()) + { + pending_queues_.erase(it); + --running_writing_tasks_; + } + } + + auto *store = const_cast(store_); + std::atomic_store(&store->partition_filter_, + change_req->partition_filter_); + change_req->pending_->fetch_sub(1, std::memory_order_acq_rel); + req->SetDone(KvError::NoError); + return true; + } case RequestType::Compact: { BackgroundWrite *task = task_mgr_.GetBackgroundWrite(req->TableId()); diff --git a/src/tasks/prewarm_task.cpp b/src/tasks/prewarm_task.cpp index 78cf5272..d4d98cef 100644 --- a/src/tasks/prewarm_task.cpp +++ b/src/tasks/prewarm_task.cpp @@ -486,8 +486,7 @@ void PrewarmService::PrewarmCloudCache(const std::string &remote_path) total_files_skipped++; continue; } - if (store_->options_.partition_filter && - !store_->options_.partition_filter(tbl_id)) + if (!store_->PartitionIncluded(tbl_id)) { total_files_skipped++; continue; diff --git a/src/tasks/reopen_task.cpp b/src/tasks/reopen_task.cpp index a2627818..f12cc5b9 100644 --- a/src/tasks/reopen_task.cpp +++ b/src/tasks/reopen_task.cpp @@ -50,7 +50,7 @@ KvError ReopenTask::Reopen(const TableIdent &tbl_id) return enqueue_err; } current_task->WaitIo(); - KvError sync_err = static_cast(current_task->io_res_); + KvError sync_err = ToKvError(current_task->io_res_); if (sync_err != KvError::NoError) { LOG(ERROR) << "Reopen " << tbl_id << " rsync failed, tag " << tag diff --git a/src/tasks/task.cpp b/src/tasks/task.cpp index 21a23671..222715d5 100644 --- a/src/tasks/task.cpp +++ b/src/tasks/task.cpp @@ -70,6 +70,10 @@ void KvTask::FinishIo() case TaskStatus::BlockedIO: if (inflight_io_ == 0) { + if (force_aborted_ && io_res_ == 0) + { + io_res_ = -ECANCELED; + } Resume(); } break; diff --git a/tests/cloud.cpp b/tests/cloud.cpp index d1bea799..24d37aac 100644 --- a/tests/cloud.cpp +++ b/tests/cloud.cpp @@ -373,8 +373,13 @@ TEST_CASE("cloud prewarm honors partition filter", "[cloud][prewarm]") partitions[1], }; options.partition_filter = - [included](const eloqstore::TableIdent &tbl) -> bool - { return included.count(tbl) != 0; }; + [included](const eloqstore::TableIdent &tbl) + -> std::optional + { + return included.count(tbl) != 0 + ? std::optional{0} + : std::nullopt; + }; eloqstore::EloqStore *store = InitStore(options); for (const auto &tbl_id : partitions) @@ -884,8 +889,13 @@ TEST_CASE("cloud global archive shares timestamp and filters partitions", second_page_partition_id, }; options.partition_filter = - [included_ids](const eloqstore::TableIdent &tbl) -> bool - { return included_ids.count(tbl.partition_id_) != 0; }; + [included_ids](const eloqstore::TableIdent &tbl) + -> std::optional + { + return included_ids.count(tbl.partition_id_) != 0 + ? std::optional{0} + : std::nullopt; + }; eloqstore::EloqStore *store = InitStore(options); std::vector> writers; @@ -1277,8 +1287,14 @@ TEST_CASE("cloud reopen triggers prewarm to download newer remote data files", options.prewarm_cloud_cache = true; options.prewarm_task_count = 1; options.allow_reuse_local_caches = true; - options.partition_filter = [&enable_partition_filter](const auto &) - { return enable_partition_filter.load(std::memory_order_relaxed); }; + options.partition_filter = + [&enable_partition_filter](const auto &) + -> std::optional + { + return enable_partition_filter.load(std::memory_order_relaxed) + ? std::optional{0} + : std::nullopt; + }; CleanupStore(options); diff --git a/tests/manifest.cpp b/tests/manifest.cpp index 25878c19..32d055af 100644 --- a/tests/manifest.cpp +++ b/tests/manifest.cpp @@ -196,8 +196,13 @@ TEST_CASE("global archive shares timestamp and filters partitions", partitions[2].partition_id_, }; opts.partition_filter = - [included_ids](const eloqstore::TableIdent &tbl) -> bool - { return included_ids.count(tbl.partition_id_) != 0; }; + [included_ids](const eloqstore::TableIdent &tbl) + -> std::optional + { + return included_ids.count(tbl.partition_id_) != 0 + ? std::optional{0} + : std::nullopt; + }; eloqstore::EloqStore *store = InitStore(opts); std::vector> writers;