diff --git a/log_service b/log_service index 07b5be6d..8d30c8b6 160000 --- a/log_service +++ b/log_service @@ -1 +1 @@ -Subproject commit 07b5be6dc4acfd2879dc1a2b398c825ddc48a76b +Subproject commit 8d30c8b67ae3b580610721df2726182ae46ff304 diff --git a/store_handler/data_store_service_client.cpp b/store_handler/data_store_service_client.cpp index 2fd359b8..233864e0 100644 --- a/store_handler/data_store_service_client.cpp +++ b/store_handler/data_store_service_client.cpp @@ -21,13 +21,18 @@ */ #include "data_store_service_client.h" +#include +#include #include #include #include #include #include +#include +#include #include +#include #include #include #include @@ -40,6 +45,7 @@ #include "cc_req_misc.h" #include "data_store_service_client_closure.h" #include "eloq_data_store_service/data_store_service_config.h" +#include "eloq_data_store_service/data_store_service_util.h" #include "eloq_data_store_service/object_pool.h" // ObjectPool #include "eloq_data_store_service/thread_worker_pool.h" #include "metrics.h" @@ -50,6 +56,12 @@ #include "tx_service/include/error_messages.h" #include "tx_service/include/sequences/sequences.h" +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE +#include "gflags/gflags.h" +DECLARE_string(eloq_store_data_path_list); +DECLARE_string(tx_standby_ip_port_list); +#endif + namespace EloqDS { @@ -3661,6 +3673,33 @@ bool DataStoreServiceClient::CreateSnapshotForBackup( return !callback_data->HasError(); } +bool DataStoreServiceClient::CreateSnapshotForStandby( + uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (!bind_data_shard_with_ng_ || data_store_service_ == nullptr) + { + return false; + } + DLOG(INFO) << "CreateSnapshotForStandby begin, generated_snapshot_ts=" + << snapshot_ts; + const bool ok = data_store_service_->CreateSnapshotForStandby( + ng_id, ng_id, snapshot_ts); + snapshot_files.clear(); + DLOG(INFO) << "CreateSnapshotForStandby success, snapshot_ts=" + << snapshot_ts 
<< ", snapshot_files=" << snapshot_files.size() + << ", ok=" << ok; + return ok; +#else + (void) ng_id; + (void) snapshot_files; + (void) snapshot_ts; + return false; +#endif +} + /** * @brief Internal method for creating snapshots for backup operations. * @@ -3778,6 +3817,17 @@ bool DataStoreServiceClient::OnLeaderStart(uint32_t ng_id, LOG_IF(FATAL, bucket_ids.empty()) << "bucket_ids is empty, ng_id: " << ng_id; // Binded data store shard with ng. +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + const bool enable_local_standby = + !FLAGS_tx_standby_ip_port_list.empty(); + data_store_service_->SetEnableLocalStandbyForEloqStore( + enable_local_standby); + data_store_service_->ClearStandbySnapshotPayloadForEloqStore(ng_id); + DLOG(INFO) << "OnLeaderStart reset eloqstore standby snapshot payload, " + << "enable_local_standby=" << enable_local_standby + << ", tx_standby_ip_port_list=" + << FLAGS_tx_standby_ip_port_list; +#endif data_store_service_->OpenDataStore(ng_id, std::move(bucket_ids), term); } @@ -3796,6 +3846,8 @@ bool DataStoreServiceClient::OnLeaderStop(uint32_t ng_id, int64_t term) if (data_store_service_ != nullptr) { // Close the data store shard. 
+ LOG(INFO) << "DataStoreServiceClient::OnLeaderStop closing data store, " + << "ng_id=" << ng_id << ", term=" << term << ", role=leader"; data_store_service_->CloseDataStore(ng_id); } return true; @@ -3821,6 +3873,15 @@ void DataStoreServiceClient::OnStartFollowing(uint32_t ng_id, if (data_store_service_ != nullptr) { +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + const bool enable_local_standby = + !FLAGS_tx_standby_ip_port_list.empty(); + data_store_service_->SetEnableLocalStandbyForEloqStore( + enable_local_standby); + DLOG(INFO) << "OnStartFollowing set eloqstore enable_local_standby=" + << enable_local_standby << ", tx_standby_ip_port_list=" + << FLAGS_tx_standby_ip_port_list; +#endif data_store_service_->CloseDataStore(ng_id); } @@ -3869,52 +3930,158 @@ void DataStoreServiceClient::OnShutdown() { } +std::string DataStoreServiceClient::SnapshotSyncDestPath() const +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + return FLAGS_eloq_store_data_path_list; +#else + return std::string(""); +#endif +} + bool DataStoreServiceClient::OnSnapshotReceived( const txservice::remote::OnSnapshotSyncedRequest *req) +{ + (void) req; + return true; +} + +bool DataStoreServiceClient::OnUpdateStandbyCkptTs(uint32_t ng_id, + int64_t ng_term, + uint64_t snapshot_ts, + bool skip_reload_data) { #ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE if (!bind_data_shard_with_ng_) { + DLOG(INFO) << "bind_data_shard_with_ng_ is false, return"; return true; } - - if (data_store_service_ != nullptr) + assert(data_store_service_ != nullptr); + if (!skip_reload_data) { - uint32_t ng_id = req->ng_id(); - std::unordered_set bucket_ids; - for (auto &[bucket_id, bucket_info] : bucket_infos_) + const bool reload_ok = + data_store_service_->ReloadData(ng_id, ng_term, snapshot_ts); + if (!reload_ok) { - if (bucket_info->BucketOwner() == ng_id) - { - bucket_ids.insert(bucket_id); - } + LOG(WARNING) + << "DataStoreServiceClient::OnUpdateStandbyCkptTs skip ckpt " + "update because reload failed, ng_id=" + << ng_id << 
", term=" << ng_term + << ", snapshot_ts=" << snapshot_ts; + return false; } - int64_t term = - txservice::PrimaryTermFromStandbyTerm(req->standby_node_term()); - data_store_service_->OnSnapshotReceived( - ng_id, term, std::move(bucket_ids), req->snapshot_path()); - - return true; } #endif return true; } -bool DataStoreServiceClient::OnUpdateStandbyCkptTs(uint32_t ng_id, - int64_t ng_term) +bool DataStoreServiceClient::RequestSyncSnapshot(uint32_t ng_id, + int64_t ng_term, + uint64_t snapshot_ts) { #ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE if (!bind_data_shard_with_ng_) { return true; } + assert(data_store_service_ != nullptr); + std::unordered_set bucket_ids; + for (auto &[bucket_id, bucket_info] : bucket_infos_) + { + if (bucket_info->BucketOwner() == ng_id) + { + bucket_ids.insert(bucket_id); + } + } + DLOG(INFO) << "DataStoreServiceClient::RequestSyncSnapshot prepare open " + "data store, ng_id=" + << ng_id << ", term=" << ng_term + << ", bucket_count=" << bucket_ids.size() + << ", snapshot_ts=" << snapshot_ts; + if (data_store_service_->FetchDSShardStatus(ng_id) == DSShardStatus::Closed) + { + data_store_service_->OpenDataStore( + ng_id, std::move(bucket_ids), ng_term); + } + return data_store_service_->ReloadData(ng_id, ng_term, snapshot_ts); +#else + (void) ng_id; + (void) ng_term; + (void) snapshot_ts; + return false; +#endif +} + +void DataStoreServiceClient::DeleteStandbySnapshot(uint32_t ng_id, + uint64_t snapshot_ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (data_store_service_ == nullptr) + { + return; + } + const bool ok = + data_store_service_->DeleteStandbySnapshot(ng_id, snapshot_ts); + DLOG(INFO) << "DataStoreServiceClient::DeleteStandbySnapshot, ng_id=" + << ng_id << ", snapshot_ts=" << snapshot_ts << ", ok=" << ok; +#else + (void) ng_id; + (void) snapshot_ts; +#endif +} + +void DataStoreServiceClient::DeleteStandbySnapshotsBefore(uint32_t ng_id, + uint64_t snapshot_ts) +{ + if (!bind_data_shard_with_ng_) + { + return; + } + 
assert(data_store_service_ != nullptr); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + data_store_service_->DeleteStandbySnapshotsBefore(ng_id, snapshot_ts); +#else + (void) ng_id; + (void) snapshot_ts; +#endif +} + +uint64_t DataStoreServiceClient::CurrentStandbySnapshotTs(uint32_t ng_id) +{ + if (!bind_data_shard_with_ng_) + { + return 0; + } + assert(data_store_service_ != nullptr); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + const uint64_t ts = data_store_service_->CurrentStandbySnapshotTs(ng_id); + DLOG(INFO) << "CurrentStandbySnapshotTs from data_store_service, ng_id=" + << ng_id << ", snapshot_ts=" << ts; + return ts; +#else + (void) ng_id; + return 0; +#endif +} + +void DataStoreServiceClient::SetStandbySnapshotPayload( + uint32_t ng_id, const std::string &snapshot_path) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (!bind_data_shard_with_ng_) + { + return; + } if (data_store_service_ != nullptr) { - data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term); + data_store_service_->SetStandbySnapshotPayload(ng_id, snapshot_path); } +#else + (void) ng_id; + (void) snapshot_path; #endif - return true; } /** diff --git a/store_handler/data_store_service_client.h b/store_handler/data_store_service_client.h index fb877d1e..e086a17f 100644 --- a/store_handler/data_store_service_client.h +++ b/store_handler/data_store_service_client.h @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -198,7 +199,11 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler { if (bind_data_shard_with_ng_ && data_store_service_ != nullptr) { +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + return true; +#else return data_store_service_->IsCloudMode(); +#endif } else { @@ -452,11 +457,10 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler std::vector &archives, uint64_t from_ts) override; - /** - * @brief Create a snapshot for backup. - * @param snapshot_files The output snapshot files. 
- * @return True if create successfully, otherwise false. - */ + bool CreateSnapshotForStandby(uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) override; + bool CreateSnapshotForBackup(const std::string &backup_name, std::vector &backup_files, uint64_t backup_ts = 0) override; @@ -480,6 +484,13 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void OnShutdown() override; + /** + * For EloqStore local-standby mode, snapshot sync is pull-based: + * standby/follower pulls data from master. This returns the master-side + * source path list (optionally with weights) to send to standby. + */ + std::string SnapshotSyncDestPath() const override; + bool RemoveBackupSnapshot(const std::string &backup_name) override { return true; @@ -488,7 +499,19 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler bool OnSnapshotReceived( const txservice::remote::OnSnapshotSyncedRequest *req) override; - bool OnUpdateStandbyCkptTs(uint32_t ng_id, int64_t ng_term) override; + bool OnUpdateStandbyCkptTs(uint32_t ng_id, + int64_t ng_term, + uint64_t snapshot_ts, + bool skip_reload_data = false) override; + bool RequestSyncSnapshot(uint32_t ng_id, + int64_t ng_term, + uint64_t snapshot_ts) override; + void DeleteStandbySnapshot(uint32_t ng_id, uint64_t snapshot_ts) override; + void DeleteStandbySnapshotsBefore(uint32_t ng_id, + uint64_t snapshot_ts) override; + uint64_t CurrentStandbySnapshotTs(uint32_t ng_id) override; + void SetStandbySnapshotPayload(uint32_t ng_id, + const std::string &snapshot_path) override; /** * Serialize a record with is_deleted flag and record string. 
diff --git a/store_handler/eloq_data_store_service/build_eloq_store.cmake b/store_handler/eloq_data_store_service/build_eloq_store.cmake index e73801e3..eb44729d 100644 --- a/store_handler/eloq_data_store_service/build_eloq_store.cmake +++ b/store_handler/eloq_data_store_service/build_eloq_store.cmake @@ -102,6 +102,7 @@ set(ELOQ_STORE_SOURCES ${ELOQ_STORE_SOURCE_DIR}/src/file_gc.cpp ${ELOQ_STORE_SOURCE_DIR}/src/kill_point.cpp ${ELOQ_STORE_SOURCE_DIR}/src/replayer.cpp + ${ELOQ_STORE_SOURCE_DIR}/src/standby_service.cpp ${ELOQ_STORE_SOURCE_DIR}/src/storage/cloud_backend.cpp ${ELOQ_STORE_SOURCE_DIR}/src/storage/data_page.cpp ${ELOQ_STORE_SOURCE_DIR}/src/storage/data_page_builder.cpp @@ -143,7 +144,7 @@ if(WITH_TXSERVICE) # Add include directory for metrics headers # From build_eloq_store.cmake location, eloq_metrics is at ../../eloq_metrics # CMAKE_CURRENT_LIST_DIR is the directory of this file - target_include_directories(eloqstore PRIVATE + target_include_directories(eloqstore PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../../eloq_metrics/include) # ELOQSTORE_WITH_TXSERVICE is already defined via add_compile_definitions in parent CMakeLists.txt # but we ensure it's also set for this target explicitly diff --git a/store_handler/eloq_data_store_service/data_store.h b/store_handler/eloq_data_store_service/data_store.h index 9a399871..dad914b1 100644 --- a/store_handler/eloq_data_store_service/data_store.h +++ b/store_handler/eloq_data_store_service/data_store.h @@ -25,6 +25,8 @@ #include #include #include +#include +#include namespace EloqDS { @@ -129,6 +131,14 @@ class DataStore virtual void CreateSnapshotForBackup( CreateSnapshotForBackupRequest *req) = 0; + // Default no-op. EloqStoreDataStore overrides this to delete a standby + // snapshot archive by tag. + virtual void DeleteStandbySnapshot(uint64_t term, std::string_view tag) + { + (void) term; + (void) tag; + } + /** * @brief Switch the data store to read only mode. 
*/ @@ -140,9 +150,28 @@ class DataStore virtual void SwitchToReadWrite() = 0; /** - * @brief For cloud mode, sync the data from cloud and reload the data. + * @brief Reload data for standby sync (both cloud and local modes). + * Default no-op for stores that don't need this path. */ - virtual void ReloadDataFromCloud(int64_t term) = 0; + virtual bool ReloadData(int64_t term, uint64_t snapshot_ts) + { + (void) term; + (void) snapshot_ts; + return true; + } + + virtual void UpdateStandbyMasterStorePaths( + const std::vector &store_paths, + const std::vector &store_path_weights) + { + (void) store_paths; + (void) store_path_weights; + } + + virtual void UpdateStandbyMasterAddr(const std::string &standby_master_addr) + { + (void) standby_master_addr; + } protected: uint32_t shard_id_; diff --git a/store_handler/eloq_data_store_service/data_store_service.cpp b/store_handler/eloq_data_store_service/data_store_service.cpp index 484a475f..c6d10e37 100644 --- a/store_handler/eloq_data_store_service/data_store_service.cpp +++ b/store_handler/eloq_data_store_service/data_store_service.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -39,6 +40,11 @@ #include #include "data_store_fault_inject.h" // ACTION_FAULT_INJECTOR +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE +#include "eloq_store_data_store.h" +#include "eloq_store_data_store_factory.h" +#include "eloqstore/include/common.h" +#endif #include "internal_request.h" #include "object_pool.h" #if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) @@ -228,6 +234,7 @@ DataStoreService::DataStoreService( { data_shards_[i].shard_id_ = i; data_shards_[i].shard_status_.store(DSShardStatus::Closed); + data_shards_[i].latest_term_.store(0, std::memory_order_release); } } @@ -2205,6 +2212,11 @@ void DataStoreService::OpenDataStore(uint32_t shard_id, std::unordered_set &&bucket_ids, int64_t term) { + auto &ds_ref = data_shards_.at(shard_id); + const size_t bucket_count = bucket_ids.size(); + DLOG(INFO) 
<< "OpenDataStore begin for DSS shard " << shard_id << ", term " + << term << ", bucket_count " << bucket_count << ", shard_status " + << static_cast(ds_ref.shard_status_.load()); if (data_store_factory_ != nullptr) { data_store_factory_->InitializePartitionFilter(shard_id, @@ -2212,7 +2224,6 @@ void DataStoreService::OpenDataStore(uint32_t shard_id, } auto start_time = std::chrono::steady_clock::now(); - auto &ds_ref = data_shards_.at(shard_id); if (ds_ref.shard_status_.load() != DSShardStatus::Closed) { LOG(INFO) << "OpenDataStore no-op for DSS shard status is not closed, " @@ -2240,87 +2251,338 @@ void DataStoreService::OpenDataStore(uint32_t shard_id, { LOG(ERROR) << "OpenDataStore failed for DSS shard " << shard_id << ", shard_id_: " << ds_ref.shard_id_ << ", shard_status_: " - << static_cast(ds_ref.shard_status_.load()) + << static_cast(ds_ref.shard_status_.load()) << ", term " + << term << ", bucket_count " << bucket_count << ", use time: " << use_time << " ms"; } else { + ds_ref.latest_term_.store(term, std::memory_order_release); LOG(INFO) << "OpenDataStore success for DSS shard " << shard_id << ", shard_id_: " << ds_ref.shard_id_ << ", shard_status_: " - << static_cast(ds_ref.shard_status_.load()) + << static_cast(ds_ref.shard_status_.load()) << ", term " + << term << ", bucket_count " << bucket_count << ", use time: " << use_time << " ms"; } } void DataStoreService::OnSnapshotReceived( - uint32_t shard_id, - int64_t term, - std::unordered_set &&bucket_ids, - const std::string &snapshot_path) + uint32_t shard_id, int64_t term, std::unordered_set &&bucket_ids) +{ + (void) shard_id; + (void) term; + (void) bucket_ids; +} + +void DataStoreService::SetStandbySnapshotPayload( + uint32_t shard_id, const std::string &snapshot_path) { #ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE auto &ds_ref = data_shards_.at(shard_id); + DLOG(INFO) << "SetStandbySnapshotPayload " << snapshot_path; - if (!data_store_factory_->IsCloudMode()) + std::string standby_addr; + std::string 
store_path_list = snapshot_path; + const size_t addr_delim_pos = snapshot_path.find(':'); + if (addr_delim_pos != std::string::npos) { - // TODO(lzx): Handle the case of local data store mode. (replace the - // data with the snapshot) - LOG(FATAL) - << "OnSnapshotReceived not implemented for local data store mode"; + const std::string addr_candidate = + snapshot_path.substr(0, addr_delim_pos); + if (addr_candidate.empty() || addr_candidate == "local" || + (addr_candidate.find('/') == std::string::npos && + addr_candidate.find('@') != std::string::npos)) + { + standby_addr = addr_candidate; + store_path_list = snapshot_path.substr(addr_delim_pos + 1); + } } - else + + std::vector standby_master_store_paths; + std::vector standby_master_store_path_weights; + std::string error_message; + if (!::eloqstore::ParseStorePathListWithWeights( + store_path_list, + standby_master_store_paths, + standby_master_store_path_weights, + &error_message)) { - if (ds_ref.shard_status_.load(std::memory_order_acquire) == - DSShardStatus::Closed) + LOG(ERROR) << "SetStandbySnapshotPayload invalid payload, shard " + << shard_id << ", payload=" << snapshot_path + << ", error=" << error_message; + return; + } + + auto *eloq_store_factory = + static_cast(data_store_factory_.get()); + if (!standby_addr.empty()) + { + eloq_store_factory->UpdateStandbyMasterAddr(standby_addr); + } + eloq_store_factory->UpdateStandbyMasterStorePaths( + standby_master_store_paths, standby_master_store_path_weights); + if (ds_ref.data_store_ != nullptr) + { + if (!standby_addr.empty()) { - // If the shard is closed, open it and load data from cloud. 
- LOG(INFO) << "OnSnapshotReceived, open data store for DSS shard " - << shard_id << " and term " << term; - OpenDataStore(shard_id, std::move(bucket_ids), term); + ds_ref.data_store_->UpdateStandbyMasterAddr(standby_addr); } - else - { - while (ds_ref.shard_status_.load(std::memory_order_acquire) == - DSShardStatus::Starting) - { - bthread_usleep(1000); - LOG(INFO) << "OnSnapshotReceived, data store is starting, " - "waiting for data store to be ready"; - } + ds_ref.data_store_->UpdateStandbyMasterStorePaths( + standby_master_store_paths, standby_master_store_path_weights); + } - LOG(INFO) - << "OnSnapshotReceived, reload data from cloud for DSS shard " - << shard_id << " and term " << term; - ds_ref.data_store_->ReloadDataFromCloud(term); - return; - } + DLOG(INFO) << "SetStandbySnapshotPayload assigned, shard " << shard_id + << ", payload=" << snapshot_path + << ", standby_addr=" << standby_addr + << ", path_count=" << standby_master_store_paths.size(); +#else + (void) shard_id; + (void) snapshot_path; +#endif +} + +void DataStoreService::ClearStandbySnapshotPayloadForEloqStore( + uint32_t shard_id) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + auto *eloq_store_factory = + static_cast(data_store_factory_.get()); + eloq_store_factory->UpdateStandbyMasterAddr(""); + eloq_store_factory->UpdateStandbyMasterStorePaths({}, {}); + if (ds_ref.data_store_ != nullptr) + { + ds_ref.data_store_->UpdateStandbyMasterAddr(""); + ds_ref.data_store_->UpdateStandbyMasterStorePaths({}, {}); } + DLOG(INFO) << "ClearStandbySnapshotPayloadForEloqStore, shard=" << shard_id; +#else + (void) shard_id; +#endif +} + +void DataStoreService::SetEnableLocalStandbyForEloqStore(bool enable) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto *eloq_store_factory = + static_cast(data_store_factory_.get()); + eloq_store_factory->SetEnableLocalStandby(enable); + DLOG(INFO) << "SetEnableLocalStandbyForEloqStore, enable=" << enable; #else - LOG(INFO) << 
"OnSnapshotReceived no-op for non-eloqstore data store"; + (void) enable; #endif } -void DataStoreService::OnUpdateStandbyCkptTs(uint32_t shard_id, int64_t term) +bool DataStoreService::ReloadData(uint32_t shard_id, + int64_t term, + uint64_t snapshot_ts) { #ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE auto &ds_ref = data_shards_.at(shard_id); + const int64_t latest_term = + ds_ref.latest_term_.load(std::memory_order_acquire); + if (term < latest_term) + { + LOG(INFO) << "ReloadData discard stale term for DSS shard " << shard_id + << ", term " << term << ", latest_term " << latest_term + << ", snapshot_ts " << snapshot_ts; + return false; + } + const uint64_t latest_snapshot_ts = + ds_ref.latest_snapshot_ts_.load(std::memory_order_acquire); + if (snapshot_ts < latest_snapshot_ts) + { + LOG(INFO) << "ReloadData discard stale snapshot for DSS shard " + << shard_id << ", term " << term << ", snapshot_ts " + << snapshot_ts << ", latest_snapshot_ts " + << latest_snapshot_ts; + return false; + } + + bool ok = false; if (ds_ref.data_store_ != nullptr && ds_ref.shard_status_.load() != DSShardStatus::Closed) { - LOG(INFO) - << "StandbySyncAndReloadData reload data from cloud for DSS shard " - << shard_id; - ds_ref.data_store_->ReloadDataFromCloud(term); + LOG(INFO) << "ReloadData for DSS shard " << shard_id << ", term " + << term << ", snapshot_ts " << snapshot_ts; + ok = ds_ref.data_store_->ReloadData(term, snapshot_ts); + if (ok) + { + ds_ref.latest_term_.store(term, std::memory_order_release); + ds_ref.latest_snapshot_ts_.store(snapshot_ts, + std::memory_order_release); + } } else { - LOG(ERROR) << "StandbySyncAndReloadData no-op for DSS shard status is " + LOG(ERROR) << "ReloadData no-op for DSS shard status is " "closed or data store is nullptr, " << shard_id; } + DLOG(INFO) << "ReloadData finished, shard " << shard_id << ", term " << term + << ", snapshot_ts " << snapshot_ts << ", ok=" << ok; + return ok; +#else + LOG(INFO) << "ReloadData no-op for non-eloqstore data 
store"; + (void) shard_id; + (void) term; + (void) snapshot_ts; + return false; +#endif +} + +bool DataStoreService::CreateSnapshotForStandby(uint32_t shard_id, + uint32_t ng_id, + uint64_t snapshot_ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + if (ds_ref.data_store_ == nullptr || + ds_ref.shard_status_.load(std::memory_order_acquire) == + DSShardStatus::Closed) + { + LOG(ERROR) + << "CreateSnapshotForStandby no-op for DSS shard status is closed " + "or data store is nullptr, " + << shard_id; + return false; + } + auto *eloq_store = + static_cast(ds_ref.data_store_.get()); + const std::string tag = + std::string(DataStoreService::kStandbySnapshotTagPrefix) + + std::to_string(snapshot_ts); + const bool ok = eloq_store->CreateSnapshotForStandby(ng_id, tag); + DLOG(INFO) << "CreateSnapshotForStandby finished, shard " << shard_id + << ", ng_id " << ng_id << ", snapshot_ts " << snapshot_ts + << ", tag " << tag << ", ok=" << ok; + return ok; +#else + (void) shard_id; + (void) ng_id; + (void) snapshot_ts; + return false; +#endif +} + +bool DataStoreService::DeleteStandbySnapshot(uint32_t shard_id, + uint64_t snapshot_ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + if (ds_ref.data_store_ == nullptr) + { + LOG(WARNING) << "DeleteStandbySnapshot skipped: data store is nullptr, " + << "shard " << shard_id << ", snapshot_ts " << snapshot_ts; + return false; + } + const std::string tag = "snapshot_" + std::to_string(snapshot_ts); + ds_ref.data_store_->DeleteStandbySnapshot( + std::numeric_limits::max(), tag); + DLOG(INFO) << "DeleteStandbySnapshot submitted, shard " << shard_id + << ", tag " << tag; + return true; +#else + (void) shard_id; + (void) snapshot_ts; + return false; +#endif +} + +void DataStoreService::DeleteStandbySnapshotsBefore(uint32_t shard_id, + uint64_t snapshot_ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + const 
uint64_t latest_delete_archive_ts = + ds_ref.latest_delete_archive_ts_.load(std::memory_order_acquire); + if (snapshot_ts <= latest_delete_archive_ts) + { + DLOG(INFO) << "DeleteStandbySnapshotsBefore skipped by threshold, " + << "shard " << shard_id << ", snapshot_ts " << snapshot_ts + << ", latest_delete_archive_ts " << latest_delete_archive_ts; + return; + } + if (ds_ref.data_store_ == nullptr) + { + LOG(WARNING) + << "DeleteStandbySnapshotsBefore skipped: data store is nullptr, " + << "shard " << shard_id << ", snapshot_ts " << snapshot_ts; + return; + } + std::vector entries; + auto *eloq_store = + static_cast(ds_ref.data_store_.get()); + if (!eloq_store->ListArchiveTags("snapshot_", &entries)) + { + LOG(WARNING) << "DeleteStandbySnapshotsBefore failed to list entries, " + << "shard " << shard_id; + return; + } + for (const auto &entry : entries) + { + const std::string &tag = entry.tag; + if (tag.rfind("snapshot_", 0) != 0) + { + continue; + } + const std::string ts_str = tag.substr(std::size("snapshot_") - 1); + uint64_t tag_ts = 0; + if (!eloqstore::ParseUint64(ts_str, tag_ts)) + { + continue; + } + if (tag_ts < snapshot_ts) + { + ds_ref.data_store_->DeleteStandbySnapshot(entry.term, tag); + DLOG(INFO) << "DeleteStandbySnapshotsBefore removed entry, shard " + << shard_id << ", term " << entry.term << ", tag " << tag + << ", threshold_snapshot_ts " << snapshot_ts; + } + } + UpdateLatestDeleteArchiveTs(shard_id, snapshot_ts); +#else + (void) shard_id; + (void) snapshot_ts; +#endif +} + +uint64_t DataStoreService::CurrentStandbySnapshotTs(uint32_t shard_id) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + return ds_ref.latest_snapshot_ts_.load(std::memory_order_acquire); +#else + (void) shard_id; + return 0; +#endif +} + +uint64_t DataStoreService::LatestDeleteArchiveTs(uint32_t shard_id) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + return 
ds_ref.latest_delete_archive_ts_.load(std::memory_order_acquire); +#else + (void) shard_id; + return 0; +#endif +} +void DataStoreService::UpdateLatestDeleteArchiveTs(uint32_t shard_id, + uint64_t ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &ds_ref = data_shards_.at(shard_id); + uint64_t cur = + ds_ref.latest_delete_archive_ts_.load(std::memory_order_acquire); + while (ts > cur && + !ds_ref.latest_delete_archive_ts_.compare_exchange_weak( + cur, ts, std::memory_order_release, std::memory_order_acquire)) + { + } #else - LOG(INFO) << "OnUpdateStandbyCkptTs no-op for non-eloqstore data store"; + (void) shard_id; + (void) ts; #endif } @@ -2745,8 +3007,8 @@ bool DataStoreService::SwitchReadOnlyToClosed(uint32_t shard_id) } auto &ds_ref = data_shards_.at(shard_id); DSShardStatus expected = DSShardStatus::ReadOnly; - if (!ds_ref.shard_status_.compare_exchange_strong(expected, - DSShardStatus::Closed) && + if (!ds_ref.shard_status_.compare_exchange_strong( + expected, DSShardStatus::Starting) && expected != DSShardStatus::Closed) { DLOG(ERROR) << "SwitchReadOnlyToClosed failed, shard status is not " @@ -2756,8 +3018,14 @@ bool DataStoreService::SwitchReadOnlyToClosed(uint32_t shard_id) if (expected == DSShardStatus::ReadOnly) { - cluster_manager_.SwitchShardToClosed(shard_id, expected); + DLOG(INFO) << "SwitchReadOnlyToClosed enter shutdown, shard " + << shard_id; ds_ref.data_store_->Shutdown(); + DSShardStatus expected_after_shutdown = DSShardStatus::Starting; + const bool switched = ds_ref.shard_status_.compare_exchange_strong( + expected_after_shutdown, DSShardStatus::Closed); + CHECK(switched); + cluster_manager_.SwitchShardToClosed(shard_id, DSShardStatus::ReadOnly); } return true; } diff --git a/store_handler/eloq_data_store_service/data_store_service.h b/store_handler/eloq_data_store_service/data_store_service.h index bf3b9456..0c3f4b4b 100644 --- a/store_handler/eloq_data_store_service/data_store_service.h +++ 
b/store_handler/eloq_data_store_service/data_store_service.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -655,6 +656,13 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService return data_store_factory_->IsCloudMode(); } +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + static constexpr const char *kStandbySnapshotTagPrefix = "snapshot_"; +#endif + + void SetEnableLocalStandbyForEloqStore(bool enable); + void ClearStandbySnapshotPayloadForEloqStore(uint32_t shard_id); + void CloseDataStore(uint32_t shard_id); void OpenDataStore(uint32_t shard_id, std::unordered_set &&bucket_ids, @@ -664,11 +672,20 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService // function to reload the snapshot or sync from cloud. void OnSnapshotReceived(uint32_t shard_id, int64_t term, - std::unordered_set &&bucket_ids, - const std::string &snapshot_path); + std::unordered_set &&bucket_ids); + void SetStandbySnapshotPayload(uint32_t shard_id, + const std::string &snapshot_path); // When primary flush data to cloud, the standby node will call this // function to sync the data from cloud. 
- void OnUpdateStandbyCkptTs(uint32_t shard_id, int64_t ng_term); + bool ReloadData(uint32_t shard_id, int64_t ng_term, uint64_t snapshot_ts); + bool CreateSnapshotForStandby(uint32_t shard_id, + uint32_t ng_id, + uint64_t snapshot_ts); + bool DeleteStandbySnapshot(uint32_t shard_id, uint64_t snapshot_ts); + void DeleteStandbySnapshotsBefore(uint32_t shard_id, uint64_t snapshot_ts); + uint64_t CurrentStandbySnapshotTs(uint32_t shard_id); + uint64_t LatestDeleteArchiveTs(uint32_t shard_id); + void UpdateLatestDeleteArchiveTs(uint32_t shard_id, uint64_t ts); DataStoreServiceClusterManager &GetClusterManager() { @@ -696,7 +713,6 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService bool SwitchReadWriteToReadOnly(uint32_t shard_id); bool SwitchReadOnlyToClosed(uint32_t shard_id); bool SwitchReadOnlyToReadWrite(uint32_t shard_id); - bool WriteMigrationLog(uint32_t shard_id, const std::string &event_id, const std::string &target_node_ip, @@ -746,6 +762,12 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService std::unique_ptr data_store_{nullptr}; std::atomic shard_status_{DSShardStatus::Closed}; std::atomic ongoing_write_requests_{0}; + std::atomic latest_term_{0}; + // NOTE: latest_snapshot_ts_ ordering relies on ReloadData calls being + // sequential for a shard. This is true when tx_service and DSS run in + // the same process. Standalone DSS server mode does not guarantee it. + std::atomic latest_snapshot_ts_{0}; + std::atomic latest_delete_archive_ts_{0}; std::unique_ptr scan_iter_cache_{nullptr}; // Whether the file cache sync is running. 
Used to avoid concurrent diff --git a/store_handler/eloq_data_store_service/eloq_store_config.cpp b/store_handler/eloq_data_store_service/eloq_store_config.cpp index 803b8b6f..66844620 100644 --- a/store_handler/eloq_data_store_service/eloq_store_config.cpp +++ b/store_handler/eloq_data_store_service/eloq_store_config.cpp @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -42,8 +43,9 @@ DEFINE_uint32(eloq_store_worker_num, 1, "EloqStore server worker num."); DEFINE_string(eloq_store_data_path_list, "", - "The data paths of the EloqStore (default is " - "'{eloq_data_path}/eloq_dss/eloqstore_data')."); + "EloqStore local store paths in format " + "path1,path2,...[,pathN][:weight1,weight2,...,weightN]. " + "Weights are optional."); DEFINE_uint32(eloq_store_open_files_limit, 400000, "EloqStore maximum open files."); @@ -153,13 +155,21 @@ DEFINE_uint32(eloq_store_cloud_request_threads, "EloqStore cloud request thread number"); DEFINE_uint32(eloq_store_max_write_concurrency, 0, - "EloqStore max write concurrency"); + "EloqStore maximum number of concurrent write tasks per shard; " + "0 keeps legacy unlimited behavior and is rewritten to " + "max_cloud_concurrency in cloud mode."); +DEFINE_uint32(eloq_store_standby_max_concurrency, + 100, + "EloqStore maximum number of concurrent standby rsync/ssh jobs."); DEFINE_uint32(eloq_store_direct_io_buffer_pool_size, 16, - "EloqStore DirectIO buffer pool size per shard"); -DEFINE_bool(eloq_store_cloud_auto_credentials, - true, - "EloqStore automatically detect OSS credentials"); + "EloqStore maximum number of cached DirectIO buffers per shard."); +DEFINE_bool( + eloq_store_cloud_auto_credentials, + true, + "EloqStore automatically retrieves cloud credentials from the " + "environment or instance metadata instead of explicit access/secret " + "keys."); namespace EloqDS { @@ -423,11 +433,7 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader, : config_reader.GetString("store", 
"eloq_store_data_path_list", FLAGS_eloq_store_data_path_list); - if (!storage_path_list.empty()) - { - ParseStoragePath(storage_path_list, eloqstore_configs_.store_path); - } - else + if (storage_path_list.empty()) { eloqstore_configs_.store_path.emplace_back() .append(base_data_path) @@ -437,6 +443,24 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader, std::filesystem::create_directories( eloqstore_configs_.store_path.back()); } + storage_path_list = eloqstore_configs_.store_path.back(); + GFLAGS_NAMESPACE::SetCommandLineOption("eloq_store_data_path_list", + storage_path_list.c_str()); + } + if (!storage_path_list.empty()) + { + std::string error_message; + if (!::eloqstore::ParseStorePathListWithWeights( + storage_path_list, + eloqstore_configs_.store_path, + eloqstore_configs_.store_path_weights, + &error_message)) + { + LOG(FATAL) << "Invalid eloq_store_data_path_list: " + << storage_path_list << ", error: " << error_message; + } + GFLAGS_NAMESPACE::SetCommandLineOption("eloq_store_data_path_list", + storage_path_list.c_str()); } eloqstore_configs_.fd_limit = @@ -721,6 +745,8 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader, : config_reader.GetBoolean("store", "eloq_store_reuse_local_files", FLAGS_eloq_store_reuse_local_files); + eloqstore_configs_.standby_master_addr.clear(); + eloqstore_configs_.enable_local_standby = false; eloqstore_configs_.data_page_size = !CheckCommandLineFlagIsDefault("eloq_store_data_page_size") ? FLAGS_eloq_store_data_page_size @@ -782,6 +808,13 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader, : config_reader.GetInteger("store", "eloq_store_max_write_concurrency", FLAGS_eloq_store_max_write_concurrency); + eloqstore_configs_.standby_max_concurrency = + !CheckCommandLineFlagIsDefault("eloq_store_standby_max_concurrency") + ? 
FLAGS_eloq_store_standby_max_concurrency + : config_reader.GetInteger( + "store", + "eloq_store_standby_max_concurrency", + FLAGS_eloq_store_standby_max_concurrency); eloqstore_configs_.direct_io_buffer_pool_size = !CheckCommandLineFlagIsDefault("eloq_store_direct_io_buffer_pool_size") ? FLAGS_eloq_store_direct_io_buffer_pool_size @@ -820,18 +853,4 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader, FLAGS_eloq_store_cloud_auto_credentials); } -void EloqStoreConfig::ParseStoragePath( - const std::string_view storage_path_list, - std::vector &storage_path_vector) -{ - storage_path_vector.clear(); - const char path_delimiter = ','; - std::string token; - std::istringstream tokenStream(storage_path_list.data()); - while (std::getline(tokenStream, token, path_delimiter)) - { - storage_path_vector.emplace_back(token); - } -} - } // namespace EloqDS diff --git a/store_handler/eloq_data_store_service/eloq_store_config.h b/store_handler/eloq_data_store_service/eloq_store_config.h index ae14769e..ad9891b7 100644 --- a/store_handler/eloq_data_store_service/eloq_store_config.h +++ b/store_handler/eloq_data_store_service/eloq_store_config.h @@ -40,9 +40,6 @@ struct EloqStoreConfig EloqStoreConfig(const EloqStoreConfig &rhs) = delete; EloqStoreConfig(EloqStoreConfig &&rhs) = default; - static void ParseStoragePath(const std::string_view storage_path_list, - std::vector &storage_path_vector); - ::eloqstore::KvOptions eloqstore_configs_{}; }; } // namespace EloqDS diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store.cpp b/store_handler/eloq_data_store_service/eloq_store_data_store.cpp index 77bc17a0..fe796c7a 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store.cpp +++ b/store_handler/eloq_data_store_service/eloq_store_data_store.cpp @@ -25,8 +25,10 @@ #include #include +#include #include +#include "common.h" #include "eloq_store_data_store_factory.h" #include "internal_request.h" @@ -51,9 +53,6 @@ thread_local ObjectPool> 
thread_local ObjectPool> eloq_store_drop_table_op_pool_; thread_local ObjectPool eloq_store_scan_del_op_pool_; -thread_local ObjectPool< - EloqStoreOperationData<::eloqstore::GlobalReopenRequest>> - eloq_store_global_reopen_op_pool_; inline void BuildKey(const WriteRecordsRequest &write_req, const std::size_t key_first_idx, @@ -589,8 +588,12 @@ void EloqStoreDataStore::CreateSnapshotForBackup( { PoolableGuard req_guard(req); + const std::string tag = std::to_string(req->GetBackupTs()); + ::eloqstore::GlobalArchiveRequest global_archive_req; - global_archive_req.SetSnapshotTimestamp(req->GetBackupTs()); + global_archive_req.SetAction( + ::eloqstore::GlobalArchiveRequest::Action::Create); + global_archive_req.SetTag(tag); eloq_store_service_->ExecSync(&global_archive_req); ::EloqDS::remote::DataStoreError ds_error; @@ -615,6 +618,65 @@ void EloqStoreDataStore::CreateSnapshotForBackup( req->SetFinish(ds_error, error_msg); } +bool EloqStoreDataStore::CreateSnapshotForStandby(uint32_t ng_id, + std::string_view tag) +{ + (void) ng_id; + ::eloqstore::GlobalArchiveRequest global_archive_req; + global_archive_req.SetAction( + ::eloqstore::GlobalArchiveRequest::Action::Create); + global_archive_req.SetTag(std::string(tag)); + eloq_store_service_->ExecSync(&global_archive_req); + if (global_archive_req.Error() != ::eloqstore::KvError::NoError) + { + LOG(WARNING) << "CreateSnapshotForStandby failed, tag=" << tag + << ", error=" + << static_cast(global_archive_req.Error()); + return false; + } + return true; +} + +void EloqStoreDataStore::DeleteStandbySnapshot(uint64_t term, + std::string_view tag) +{ + ::eloqstore::GlobalArchiveRequest global_archive_req; + global_archive_req.SetAction( + ::eloqstore::GlobalArchiveRequest::Action::Delete); + global_archive_req.SetTerm(term); + global_archive_req.SetTag(std::string(tag)); + eloq_store_service_->ExecSync(&global_archive_req); + if (global_archive_req.Error() != ::eloqstore::KvError::NoError) + { + LOG(WARNING) << 
"DeleteStandbySnapshot failed for tag=" << tag + << ", error=" + << static_cast(global_archive_req.Error()); + } +} + +bool EloqStoreDataStore::ListArchiveTags(std::string_view prefix, + std::vector *entries) +{ + if (entries == nullptr) + { + return false; + } + ::eloqstore::GlobalListArchiveTagsRequest req; + req.SetPrefix(std::string(prefix)); + eloq_store_service_->ExecSync(&req); + if (req.Error() != ::eloqstore::KvError::NoError) + { + return false; + } + entries->clear(); + entries->reserve(req.Entries().size()); + for (const auto &entry : req.Entries()) + { + entries->push_back(ArchiveEntry{.term = entry.term, .tag = entry.tag}); + } + return true; +} + void EloqStoreDataStore::ScanDelete(DeleteRangeRequest *delete_range_req) { ::eloqstore::TableIdent eloq_store_table_id; @@ -877,62 +939,63 @@ void EloqStoreDataStore::OnFloor(::eloqstore::KvRequest *req) ds_scan_req->SetFinish(::EloqDS::remote::DataStoreError::NO_ERROR); } -void EloqStoreDataStore::ReloadDataFromCloud(int64_t term) +bool EloqStoreDataStore::ReloadData(int64_t term, uint64_t snapshot_ts) { - LOG(INFO) << "EloqStoreDataStore::ReloadDataFromCloud, term: " << term; + LOG(INFO) << "EloqStoreDataStore::ReloadData, term: " << term + << ", snapshot_ts: " << snapshot_ts; if (eloq_store_service_->Term() != static_cast(term)) { - LOG(FATAL) << "EloqStoreDataStore::ReloadDataFromCloud, term mismatch, " + LOG(ERROR) << "EloqStoreDataStore::ReloadData, term mismatch, " "expected: " - << term << ", actual: " << eloq_store_service_->Term(); - return; + << term << ", actual: " << eloq_store_service_->Term() + << ")"; + return false; } - EloqStoreOperationData<::eloqstore::GlobalReopenRequest> *global_reopen_op = - eloq_store_global_reopen_op_pool_.NextObject(); - global_reopen_op->Reset(nullptr); - PoolableGuard op_guard(global_reopen_op); - - ::eloqstore::GlobalReopenRequest &global_reopen_req = - global_reopen_op->EloqStoreRequest(); - - DLOG(INFO) - << 
"===============EloqStoreDataStore::ReloadDataFromCloud, send " - "reopen request: " - << &global_reopen_req << " to EloqStore"; - uint64_t user_data = reinterpret_cast(global_reopen_op); - if (!eloq_store_service_->ExecAsyn( - &global_reopen_req, user_data, OnReLoaded)) + const std::string reopen_tag = "snapshot_" + std::to_string(snapshot_ts); + DLOG(INFO) << "EloqStoreDataStore::ReloadData issue reopen, term: " << term + << ", snapshot_ts: " << snapshot_ts << ", tag: " << reopen_tag; + ::eloqstore::GlobalReopenRequest global_reopen_req; + global_reopen_req.SetTag(reopen_tag); + eloq_store_service_->ExecSync(&global_reopen_req); + + if (global_reopen_req.Error() != ::eloqstore::KvError::NoError) { - LOG(ERROR) << "Send reopen request to EloqStore failed"; - return; + LOG(ERROR) << "ReloadData reopen failed, snapshot_ts=" << snapshot_ts + << ", tag=" << reopen_tag << ", error=" + << static_cast(global_reopen_req.Error()) + << ", msg=" << global_reopen_req.ErrMessage(); + return false; } - - op_guard.Release(); + DLOG(INFO) << "ReloadData reopen succeeded, snapshot_ts=" << snapshot_ts + << ", tag=" << reopen_tag; + return true; } -void EloqStoreDataStore::OnReLoaded(::eloqstore::KvRequest *req) +void EloqStoreDataStore::UpdateStandbyMasterStorePaths( + const std::vector &store_paths, + const std::vector &store_path_weights) { - EloqStoreOperationData<::eloqstore::GlobalReopenRequest> *global_reopen_op = - static_cast *>( - reinterpret_cast(req->UserData())); - assert(req == &global_reopen_op->EloqStoreRequest()); - ::eloqstore::GlobalReopenRequest *global_reopen_req = - static_cast<::eloqstore::GlobalReopenRequest *>(req); - PoolableGuard op_guard(global_reopen_op); - - DLOG(INFO) - << "===============EloqStoreDataStore::OnReLoaded, reopen request: " - << &global_reopen_req << " from EloqStore"; - - if (global_reopen_req->Error() == ::eloqstore::KvError::NoError) + auto res = eloq_store_service_->UpdateStandbyMasterStorePaths( + std::vector(store_paths.begin(), 
store_paths.end()), + std::vector(store_path_weights.begin(), + store_path_weights.end())); + if (res != ::eloqstore::KvError::NoError) { - LOG(INFO) << "Reopen EloqStore succeeded"; + LOG(WARNING) << "UpdateStandbyMasterStorePaths failed with error: " + << static_cast(res); } - else +} + +void EloqStoreDataStore::UpdateStandbyMasterAddr( + const std::string &standby_master_addr) +{ + auto res = + eloq_store_service_->UpdateStandbyMasterAddr(standby_master_addr); + if (res != ::eloqstore::KvError::NoError) { - LOG(ERROR) << "Reopen EloqStore failed with error code: " - << static_cast(global_reopen_req->Error()) - << ", error message: " << global_reopen_req->ErrMessage(); + LOG(WARNING) << "UpdateStandbyMasterAddr failed with error: " + << static_cast(res); } } + } // namespace EloqDS diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store.h b/store_handler/eloq_data_store_service/eloq_store_data_store.h index 6dcd23b3..0ba872b8 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store.h +++ b/store_handler/eloq_data_store_service/eloq_store_data_store.h @@ -165,6 +165,12 @@ struct ScanDeleteOperationData : public Poolable class EloqStoreDataStore : public DataStore { public: + struct ArchiveEntry + { + uint64_t term{0}; + std::string tag; + }; + EloqStoreDataStore(uint32_t shard_id, DataStoreService *data_store_service); ~EloqStoreDataStore() override = default; @@ -253,8 +259,17 @@ class EloqStoreDataStore : public DataStore void SwitchToReadWrite() override; void CreateSnapshotForBackup(CreateSnapshotForBackupRequest *req) override; - - void ReloadDataFromCloud(int64_t term) override; + bool CreateSnapshotForStandby(uint32_t ng_id, std::string_view tag); + void DeleteStandbySnapshot(uint64_t term, std::string_view tag) override; + bool ListArchiveTags(std::string_view prefix, + std::vector *entries); + + bool ReloadData(int64_t term, uint64_t snapshot_ts) override; + void UpdateStandbyMasterStorePaths( + const std::vector 
&store_paths, + const std::vector &store_path_weights) override; + void UpdateStandbyMasterAddr( + const std::string &standby_master_addr) override; private: static void OnRead(::eloqstore::KvRequest *req); @@ -264,8 +279,6 @@ class EloqStoreDataStore : public DataStore static void OnScanNext(::eloqstore::KvRequest *req); static void OnScanDelete(::eloqstore::KvRequest *req); static void OnFloor(::eloqstore::KvRequest *req); - static void OnReLoaded(::eloqstore::KvRequest *req); - void ScanDelete(DeleteRangeRequest *delete_range_req); void Floor(ScanRequest *scan_req); diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store_factory.h b/store_handler/eloq_data_store_service/eloq_store_data_store_factory.h index 598fda5a..d81b6a08 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store_factory.h +++ b/store_handler/eloq_data_store_service/eloq_store_data_store_factory.h @@ -22,6 +22,7 @@ #pragma once #include +#include #include #include @@ -144,6 +145,27 @@ class EloqStoreDataStoreFactory : public DataStoreFactory return !eloq_store_configs_.eloqstore_configs_.cloud_store_path.empty(); } + void UpdateStandbyMasterStorePaths( + const std::vector &store_paths, + const std::vector &store_path_weights) + { + eloq_store_configs_.eloqstore_configs_.standby_master_store_paths = + store_paths; + eloq_store_configs_.eloqstore_configs_ + .standby_master_store_path_weights = store_path_weights; + } + + void UpdateStandbyMasterAddr(const std::string &standby_master_addr) + { + eloq_store_configs_.eloqstore_configs_.standby_master_addr = + standby_master_addr; + } + + void SetEnableLocalStandby(bool enable) + { + eloq_store_configs_.eloqstore_configs_.enable_local_standby = enable; + } + private: EloqStoreConfig eloq_store_configs_; diff --git a/store_handler/eloq_data_store_service/eloqstore b/store_handler/eloq_data_store_service/eloqstore index 004823a4..24483fbc 160000 --- a/store_handler/eloq_data_store_service/eloqstore +++ 
b/store_handler/eloq_data_store_service/eloqstore @@ -1 +1 @@ -Subproject commit 004823a4e9e1619bb2f3909894fd38e9db83998e +Subproject commit 24483fbc9fbb53f608a13aeea72f2c0db1804544 diff --git a/store_handler/eloq_data_store_service/rocksdb_data_store_common.h b/store_handler/eloq_data_store_service/rocksdb_data_store_common.h index 420d569f..8d3a8419 100644 --- a/store_handler/eloq_data_store_service/rocksdb_data_store_common.h +++ b/store_handler/eloq_data_store_service/rocksdb_data_store_common.h @@ -283,10 +283,12 @@ class RocksDBDataStoreCommon : public DataStore rocksdb::InfoLogLevel StringToInfoLogLevel( const std::string &log_level_str); - virtual void ReloadDataFromCloud(int64_t term) override + virtual bool ReloadData(int64_t term, uint64_t snapshot_ts) override { - LOG(ERROR) - << "RocksDBDataStoreCommon::ReloadDataFromCloud, not implemented"; + (void) term; + (void) snapshot_ts; + LOG(ERROR) << "RocksDBDataStoreCommon::ReloadData, not implemented"; + return false; } protected: diff --git a/store_handler/rocksdb_handler.cpp b/store_handler/rocksdb_handler.cpp index 47c039aa..fb81f021 100644 --- a/store_handler/rocksdb_handler.cpp +++ b/store_handler/rocksdb_handler.cpp @@ -527,8 +527,7 @@ bool RocksDBHandler::PutAll( if (!status.ok()) { - LOG(ERROR) << "PutAll end failed " - << ", thread id: " << this_id + LOG(ERROR) << "PutAll end failed " << ", thread id: " << this_id << ", result:" << static_cast(status.ok()) << ", batch size:" << batch.size() << ", error: " << status.ToString() @@ -2447,8 +2446,12 @@ bool RocksDBCloudHandlerImpl::CreateSnapshot( } bool RocksDBCloudHandlerImpl::CreateSnapshotForStandby( - std::vector &snapshot_files) + uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) { + (void) ng_id; + (void) snapshot_ts; return CreateSnapshot(ckpt_path_, snapshot_files); } @@ -2472,6 +2475,10 @@ bool RocksDBCloudHandlerImpl::SendSnapshotToRemote( std::vector &snapshot_files, const std::string &remote_dest) { + (void) ng_id; + 
(void) ng_term; + (void) snapshot_files; + (void) remote_dest; return true; } @@ -3769,8 +3776,12 @@ bool RocksDBHandlerImpl::CreateSnapshot( } bool RocksDBHandlerImpl::CreateSnapshotForStandby( - std::vector &snapshot_files) + uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) { + (void) ng_id; + (void) snapshot_ts; return CreateSnapshot(ckpt_path_, snapshot_files); } bool RocksDBHandlerImpl::CreateSnapshotForBackup( diff --git a/store_handler/rocksdb_handler.h b/store_handler/rocksdb_handler.h index 8742b064..ed441153 100644 --- a/store_handler/rocksdb_handler.h +++ b/store_handler/rocksdb_handler.h @@ -691,8 +691,9 @@ class RocksDBCloudHandlerImpl : public RocksDBHandler bool CreateSnapshot(const std::string &snapshot_path, std::vector &snapshot_files); - bool CreateSnapshotForStandby( - std::vector &snapshot_files) override; + bool CreateSnapshotForStandby(uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) override; bool CreateSnapshotForBackup( const std::string &backup_name, std::vector &snapshot_files) override; @@ -747,8 +748,9 @@ class RocksDBHandlerImpl : public RocksDBHandler bool CreateSnapshot(const std::string &snapshot_path, std::vector &snapshot_files); - bool CreateSnapshotForStandby( - std::vector &snapshot_files) override; + bool CreateSnapshotForStandby(uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) override; bool CreateSnapshotForBackup(const std::string &backup_name, std::vector &snapshot_files, uint64_t backup_ts) override; diff --git a/tx_service/include/proto/cc_request.proto b/tx_service/include/proto/cc_request.proto index d9d722ec..5af82f1d 100644 --- a/tx_service/include/proto/cc_request.proto +++ b/tx_service/include/proto/cc_request.proto @@ -313,6 +313,7 @@ message StandbyStartFollowingResponse bool error = 1; repeated uint64 start_sequence_id = 2; uint32 subscribe_id = 3; + string snapshot_path = 4; } message UpdateStandbyCkptTsRequest @@ -326,6 +327,7 @@ message 
UpdateStandbyCkptTsRequest message UpdateStandbyCkptTsResponse { bool error = 1; + uint64 current_ckpt_ts = 2; } message UpdateStandbyConsistentTsRequest @@ -375,10 +377,23 @@ message OnSnapshotSyncedRequest { int64 standby_node_term = 1; string snapshot_path = 2; uint32 ng_id = 3; + uint64 snapshot_ts = 4; } message OnSnapshotSyncedResponse { bool error = 1; + uint64 current_ckpt_ts = 2; +} + +message RequestSyncSnapshotRequest { + uint32 ng_id = 1; + int64 standby_node_term = 2; + uint64 snapshot_ts = 3; +} + +message RequestSyncSnapshotResponse { + bool error = 1; + uint64 current_ckpt_ts = 2; } @@ -518,6 +533,7 @@ service CcRpcService { rpc UpdateStandbyCkptTs(UpdateStandbyCkptTsRequest) returns (UpdateStandbyCkptTsResponse); rpc RequestStorageSnapshotSync(StorageSnapshotSyncRequest) returns (StorageSnapshotSyncResponse); rpc OnSnapshotSynced(OnSnapshotSyncedRequest) returns (OnSnapshotSyncedResponse); + rpc RequestSyncSnapshot(RequestSyncSnapshotRequest) returns (RequestSyncSnapshotResponse); rpc FetchNodeInfo(FetchNodeInfoRequest) returns (FetchNodeInfoResponse); rpc ResetStandbySequenceId(ResetStandbySequenceIdRequest) returns (ResetStandbySequenceIdResponse); rpc CreateBackup(CreateBackupRequest) returns (FetchBackupResponse); diff --git a/tx_service/include/remote/cc_node_service.h b/tx_service/include/remote/cc_node_service.h index d15c876d..da43fe0c 100644 --- a/tx_service/include/remote/cc_node_service.h +++ b/tx_service/include/remote/cc_node_service.h @@ -24,6 +24,12 @@ #include #include +#include +#include +#include +#include +#include + #include "proto/cc_request.pb.h" namespace txservice @@ -43,6 +49,7 @@ class CcNodeService : public CcRpcService { public: CcNodeService(LocalCcShards &local_shards); + ~CcNodeService() override; void OnLeaderStart(::google::protobuf::RpcController *controller, const OnLeaderStartRequest *request, @@ -184,6 +191,12 @@ class CcNodeService : public CcRpcService ::txservice::remote::OnSnapshotSyncedResponse *response, 
::google::protobuf::Closure *done) override; + void RequestSyncSnapshot( + ::google::protobuf::RpcController *controller, + const ::txservice::remote::RequestSyncSnapshotRequest *request, + ::txservice::remote::RequestSyncSnapshotResponse *response, + ::google::protobuf::Closure *done) override; + void FetchNodeInfo(::google::protobuf::RpcController *controller, const ::txservice::remote::FetchNodeInfoRequest *request, ::txservice::remote::FetchNodeInfoResponse *response, @@ -242,7 +255,36 @@ class CcNodeService : public CcRpcService ::google::protobuf::Closure *done) override; private: + enum class StandbyTaskType + { + UpdateStandbyCkptTs, + RequestSyncSnapshot, + }; + + struct StandbyTask + { + StandbyTaskType type; + uint32_t ng_id{0}; + int64_t ng_term{0}; + uint64_t ts{0}; + std::function fn; + }; + + void EnqueueStandbyTask(StandbyTask task); + void StandbyTaskWorkerMain(); + LocalCcShards &local_shards_; + // This queue is only used for standby snapshot/checkpoint coordination, + // which is low-frequency compared with the normal tx path, so a std::mutex + // is acceptable here. A generic MPSC queue is not suitable because it does + // not guarantee a single global order across multiple producers, while + // RequestSyncSnapshot and UpdateStandbyCkptTs rely on strict cross-thread + // enqueue order. 
+ std::mutex standby_task_mu_; + std::condition_variable standby_task_cv_; + std::deque standby_tasks_; + bool standby_task_running_{true}; + std::thread standby_task_worker_; }; } // namespace remote } // namespace txservice diff --git a/tx_service/include/store/data_store_handler.h b/tx_service/include/store/data_store_handler.h index 4059431a..abb7b0ab 100644 --- a/tx_service/include/store/data_store_handler.h +++ b/tx_service/include/store/data_store_handler.h @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -372,8 +373,12 @@ class DataStoreHandler } virtual bool CreateSnapshotForStandby( - std::vector &snapshot_files) + uint32_t ng_id, + std::vector &snapshot_files, + uint64_t snapshot_ts) { + (void) ng_id, (void) snapshot_ts; + (void) snapshot_files; assert(false); return true; } @@ -407,11 +412,54 @@ class DataStoreHandler return true; } - virtual bool OnUpdateStandbyCkptTs(uint32_t ng_id, int64_t ng_term) + virtual bool OnUpdateStandbyCkptTs(uint32_t ng_id, + int64_t ng_term, + uint64_t snapshot_ts, + bool skip_reload_data = false) { (void) ng_id; (void) ng_term; - return true; + (void) snapshot_ts; + (void) skip_reload_data; + assert(false); + return false; + } + + virtual bool RequestSyncSnapshot(uint32_t ng_id, + int64_t ng_term, + uint64_t snapshot_ts) + { + (void) ng_id; + (void) ng_term; + (void) snapshot_ts; + assert(false); + return false; + } + + virtual void DeleteStandbySnapshot(uint32_t ng_id, uint64_t snapshot_ts) + { + (void) ng_id; + (void) snapshot_ts; + } + + virtual void DeleteStandbySnapshotsBefore(uint32_t ng_id, + uint64_t snapshot_ts) + { + (void) ng_id; + (void) snapshot_ts; + } + + virtual uint64_t CurrentStandbySnapshotTs(uint32_t ng_id) + { + (void) ng_id; + return 0; + } + + virtual void SetStandbySnapshotPayload(uint32_t ng_id, + const std::string &snapshot_path) + { + (void) ng_id; + (void) snapshot_path; } virtual void OnStartFollowing(uint32_t ng_id, diff --git 
a/tx_service/include/store/snapshot_manager.h b/tx_service/include/store/snapshot_manager.h index f7619525..571a183c 100644 --- a/tx_service/include/store/snapshot_manager.h +++ b/tx_service/include/store/snapshot_manager.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include "store/data_store_handler.h" @@ -128,6 +129,17 @@ class SnapshotManager */ void EraseSubscriptionBarrierLocked(uint32_t standby_node_id, int64_t standby_node_term); + bool IsSnapshotSyncCompletedLocked(uint32_t standby_node_id, + int64_t standby_node_term) const; +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + bool GetCompletedSnapshotTsLocked(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t *standby_snapshot_ts) const; +#endif + void MarkSnapshotSyncCompletedLocked(uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t standby_snapshot_ts); + void EraseSnapshotSyncCompletedByNodeLocked(uint32_t standby_node_id); struct PendingSnapshotSyncTask { @@ -158,6 +170,15 @@ class SnapshotManager // max ts) std::unordered_map> subscription_barrier_; +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + // standby node id -> (completed standby term -> standby snapshot ts) + std::unordered_map> + completed_snapshot_term_and_ts_; +#else + // standby node id -> completed standby terms + std::unordered_map> + completed_snapshot_terms_; +#endif bool terminated_{false}; const std::string backup_path_; diff --git a/tx_service/src/checkpointer.cpp b/tx_service/src/checkpointer.cpp index 071802e5..8ae1ea04 100644 --- a/tx_service/src/checkpointer.cpp +++ b/tx_service/src/checkpointer.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include "catalog_key_record.h" #include "cc_request.h" @@ -166,6 +167,7 @@ void Checkpointer::Ckpt(bool is_last_ckpt) snapshot_req.set_standby_node_term(candidate_standby_node_term); snapshot_req.set_standby_node_id(Sharder::Instance().NodeId()); +#ifndef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE std::array buffer; std::string username; FILE 
*output_stream = popen("echo $USER", "r"); @@ -182,7 +184,14 @@ void Checkpointer::Ckpt(bool is_last_ckpt) pclose(output_stream); snapshot_req.set_user(username); - snapshot_req.set_dest_path(store_hd_->SnapshotSyncDestPath()); + const std::string dest_path = store_hd_->SnapshotSyncDestPath(); + snapshot_req.set_dest_path(dest_path); +#endif + DLOG(INFO) << "Checkpointer send RequestStorageSnapshotSync, ng_id=" + << snapshot_req.ng_id() + << ", standby_node_id=" << snapshot_req.standby_node_id() + << ", standby_node_term=" + << snapshot_req.standby_node_term(); brpc::Controller cntl; stub.RequestStorageSnapshotSync( &cntl, &snapshot_req, &snapshot_resp, nullptr); @@ -193,8 +202,11 @@ void Checkpointer::Ckpt(bool is_last_ckpt) int64_t standby_node_term = Sharder::Instance().StandbyNodeTerm(); bool is_standby_node = standby_node_term > 0; - if (is_standby_node && - Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()) + if (is_standby_node +#ifndef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + && Sharder::Instance().GetDataStoreHandler()->IsSharedStorage() +#endif + ) { // Standby only needs to do checkpoint if its using local disk storage. 
return; diff --git a/tx_service/src/data_sync_task.cpp b/tx_service/src/data_sync_task.cpp index d12d8c30..6f4c9259 100644 --- a/tx_service/src/data_sync_task.cpp +++ b/tx_service/src/data_sync_task.cpp @@ -154,10 +154,15 @@ void DataSyncTask::SetFinish() } } +#ifndef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE if (!is_standby_node_ckpt_ && Sharder::Instance() .GetDataStoreHandler() ->IsSharedStorage()) { +#else + if (!is_standby_node_ckpt_) + { +#endif BrocastPrimaryCkptTs(node_group_id_, node_group_term_, status_->truncate_log_ts_, diff --git a/tx_service/src/fault/cc_node.cpp b/tx_service/src/fault/cc_node.cpp index cf5dbabe..9def2def 100644 --- a/tx_service/src/fault/cc_node.cpp +++ b/tx_service/src/fault/cc_node.cpp @@ -943,6 +943,18 @@ void CcNode::SubscribePrimaryNode(uint32_t leader_node_id, &cntl, &start_follow_req, &start_follow_resp, nullptr); } +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (!txservice_skip_kv && store_hd != nullptr) + { + store_hd->SetStandbySnapshotPayload(ng_id_, + start_follow_resp.snapshot_path()); + DLOG(INFO) + << "SubscribePrimaryNode set standby snapshot payload, ng_id=" + << ng_id_ << ", leader_node_id=" << leader_node_id + << ", payload=" << start_follow_resp.snapshot_path(); + } +#endif + expected = false; while (!is_processing_.compare_exchange_strong( expected, true, std::memory_order_acq_rel)) @@ -1222,8 +1234,15 @@ void CcNode::SubscribePrimaryNode(uint32_t leader_node_id, // Checkpointer will retry if snapshot sync failed. Do not retry here // too frequently since it might overwhelm the primary node. 
- snapshot_req.set_dest_path(store_hd->SnapshotSyncDestPath()); + const std::string dest_path = store_hd->SnapshotSyncDestPath(); + snapshot_req.set_dest_path(dest_path); snapshot_req.set_user(username); + DLOG(INFO) << "CcNode::SubscribePrimaryNode send " + "RequestStorageSnapshotSync, ng_id=" + << snapshot_req.ng_id() + << ", standby_node_id=" << snapshot_req.standby_node_id() + << ", standby_node_term=" << snapshot_req.standby_node_term() + << ", dest_path=" << dest_path; cntl.Reset(); cntl.set_timeout_ms(10000); stub->RequestStorageSnapshotSync( diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 6c1b5918..9278648d 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -24,9 +24,12 @@ #include #include #include +#include +#include #include #include +#include #include #include "cc/local_cc_shards.h" @@ -52,9 +55,141 @@ thread_local CcRequestPool read_pool; namespace remote { +namespace +{ +int64_t CurrentSyncedPrimaryTerm() +{ + const int64_t standby_term = Sharder::Instance().StandbyNodeTerm(); + if (standby_term <= 0) + { + return -1; + } + return PrimaryTermFromStandbyTerm(standby_term); +} +} // namespace + CcNodeService::CcNodeService(LocalCcShards &local_shards) : local_shards_(local_shards) { + standby_task_worker_ = std::thread([this]() { StandbyTaskWorkerMain(); }); +} + +CcNodeService::~CcNodeService() +{ + { + std::lock_guard lk(standby_task_mu_); + standby_task_running_ = false; + } + standby_task_cv_.notify_all(); + if (standby_task_worker_.joinable()) + { + standby_task_worker_.join(); + } +} + +void CcNodeService::EnqueueStandbyTask(StandbyTask task) +{ + { + std::lock_guard lk(standby_task_mu_); + standby_tasks_.push_back(std::move(task)); + } + standby_task_cv_.notify_one(); +} + +void CcNodeService::StandbyTaskWorkerMain() +{ + std::optional deferred_task; + while (true) + { + StandbyTask task{StandbyTaskType::RequestSyncSnapshot, 0, 0, 0, 
{}}; + if (deferred_task.has_value()) + { + task = std::move(*deferred_task); + deferred_task.reset(); + } + else + { + std::unique_lock lk(standby_task_mu_); + while (standby_task_running_ && standby_tasks_.empty()) + { + standby_task_cv_.wait_for(lk, std::chrono::milliseconds(100)); + } + if (!standby_task_running_) + { + break; + } + task = std::move(standby_tasks_.front()); + standby_tasks_.pop_front(); + } + + if (task.type == StandbyTaskType::UpdateStandbyCkptTs) + { + while (true) + { + std::unique_lock lk(standby_task_mu_); + if (standby_tasks_.empty()) + { + break; + } + StandbyTask next_task = std::move(standby_tasks_.front()); + standby_tasks_.pop_front(); + lk.unlock(); + + if (next_task.type == StandbyTaskType::UpdateStandbyCkptTs && + next_task.ng_id == task.ng_id && + (next_task.ng_term > task.ng_term || + (next_task.ng_term == task.ng_term && + next_task.ts >= task.ts))) + { + task = std::move(next_task); + continue; + } + + deferred_task = std::move(next_task); + break; + } + + const int64_t current_synced_term = CurrentSyncedPrimaryTerm(); + if (current_synced_term > task.ng_term) + { + DLOG(INFO) << "Skip stale UpdateStandbyCkptTs task by term, " + << "ng_id=" << task.ng_id + << ", task_ng_term=" << task.ng_term + << ", current_synced_term=" << current_synced_term + << ", task_ckpt_ts=" << task.ts; + continue; + } + + const uint64_t current_ckpt_ts = + Sharder::Instance().NativeNodeGroupCkptTs(); + if (task.ts <= current_ckpt_ts) + { + DLOG(INFO) << "Skip stale UpdateStandbyCkptTs task, ng_id=" + << task.ng_id << ", ng_term=" << task.ng_term + << ", task_ckpt_ts=" << task.ts + << ", current_ckpt_ts=" << current_ckpt_ts; + continue; + } + } + else + { + const int64_t current_synced_term = CurrentSyncedPrimaryTerm(); + const int64_t task_primary_term = + PrimaryTermFromStandbyTerm(task.ng_term); + if (current_synced_term > task_primary_term) + { + DLOG(INFO) << "Skip stale RequestSyncSnapshot task by term, " + << "ng_id=" << task.ng_id + << ", 
task_standby_term=" << task.ng_term + << ", task_primary_term=" << task_primary_term + << ", current_synced_term=" << current_synced_term + << ", snapshot_ts=" << task.ts; + continue; + } + } + + task.fn(); + } } void CcNodeService::OnLeaderStart(::google::protobuf::RpcController *controller, @@ -1672,6 +1807,33 @@ void CcNodeService::StandbyStartFollowing( auto subscribe_id = Sharder::Instance().GetNextSubscribeId(); response->set_subscribe_id(subscribe_id); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto *store_hd = local_shards_.store_hd_; + if (store_hd != nullptr) + { + std::string leader_ip; + uint16_t leader_port = 0; + Sharder::Instance().GetNodeAddress( + Sharder::Instance().NodeId(), leader_ip, leader_port); + (void) leader_port; + const char *user = std::getenv("USER"); + std::string snapshot_path; + if (user != nullptr && user[0] != '\0') + { + snapshot_path = std::string(user) + "@" + leader_ip + ":" + + store_hd->SnapshotSyncDestPath(); + } + else + { + snapshot_path = leader_ip + ":" + store_hd->SnapshotSyncDestPath(); + } + response->set_snapshot_path(snapshot_path); + DLOG(INFO) << "StandbyStartFollowing set snapshot_path, ng_id=" + << request->node_group_id() + << ", node_id=" << request->node_id() + << ", snapshot_path=" << snapshot_path; + } +#endif response->set_error(false); } @@ -1683,6 +1845,48 @@ void CcNodeService::UpdateStandbyCkptTs( { brpc::ClosureGuard done_guard(done); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + DLOG(INFO) << "Receive UpdateStandbyCkptTs req, req ckpt ts:" + << request->primary_succ_ckpt_ts() << ", has_data_store_write: " + << (int) request->has_data_store_write(); + if (request->primary_succ_ckpt_ts() <= + Sharder::Instance().NativeNodeGroupCkptTs()) + { + DLOG(INFO) << "Discard UpdateStandbyCkptTs req, req ckpt ts:" + << request->primary_succ_ckpt_ts() + << ", own:" << Sharder::Instance().NativeNodeGroupCkptTs(); + response->set_error(false); + response->set_current_ckpt_ts( + 
Sharder::Instance().NativeNodeGroupCkptTs()); + return; + } + auto store_hd = Sharder::Instance().GetLocalCcShards()->store_hd_; + const bool has_data_store_write = request->has_data_store_write(); + + const uint32_t ng_id = request->node_group_id(); + const int64_t ng_term = request->ng_term(); + const uint64_t ckpt_ts = request->primary_succ_ckpt_ts(); + EnqueueStandbyTask( + {StandbyTaskType::UpdateStandbyCkptTs, + ng_id, + ng_term, + ckpt_ts, + [store_hd, ng_id, ng_term, ckpt_ts, has_data_store_write]() + { + const bool ok = + store_hd == nullptr + ? true + : store_hd->OnUpdateStandbyCkptTs( + ng_id, ng_term, ckpt_ts, !has_data_store_write); + if (ok) + { + Sharder::Instance().UpdateNodeGroupCkptTs(ng_id, ckpt_ts); + } + }}); + DLOG(INFO) << "Enqueued UpdateStandbyCkptTs req, req ckpt ts:" + << request->primary_succ_ckpt_ts() + << ", has_data_store_write: " << (int) has_data_store_write; +#else if (Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()) { auto store_hd = Sharder::Instance().GetLocalCcShards()->store_hd_; @@ -1691,15 +1895,21 @@ void CcNodeService::UpdateStandbyCkptTs( if (store_hd && has_data_store_write) { store_hd->OnUpdateStandbyCkptTs(request->node_group_id(), - request->ng_term()); + request->ng_term(), + request->primary_succ_ckpt_ts()); } Sharder::Instance().UpdateNodeGroupCkptTs( request->node_group_id(), request->primary_succ_ckpt_ts()); } +#endif // response does not matter response->set_error(false); + response->set_current_ckpt_ts(Sharder::Instance().NativeNodeGroupCkptTs()); + DLOG(INFO) << "Finished UpdateStandbyCkptTs req, req ckpt ts:" + << request->primary_succ_ckpt_ts() << ", current ckpt ts:" + << Sharder::Instance().NativeNodeGroupCkptTs(); } void CcNodeService::UpdateStandbyConsistentTs( @@ -1763,16 +1973,31 @@ void CcNodeService::RequestStorageSnapshotSync( if (!store_hd) { // kv store not enabled + DLOG(INFO) << "RequestStorageSnapshotSync rejected: store handler is " + "nullptr, ng_id=" + << request->ng_id() + << 
", standby_node_id=" << request->standby_node_id() + << ", standby_term=" << request->standby_node_term(); response->set_error(true); return; } int64_t standby_node_term = request->standby_node_term(); int64_t primary_leader_term = PrimaryTermFromStandbyTerm(standby_node_term); + DLOG(INFO) << "RequestStorageSnapshotSync received, ng_id=" + << request->ng_id() + << ", standby_node_id=" << request->standby_node_id() + << ", standby_term=" << standby_node_term + << ", primary_term=" << primary_leader_term + << ", dest_path=" << request->dest_path(); if (!Sharder::Instance().CheckLeaderTerm(request->ng_id(), primary_leader_term)) { + DLOG(INFO) << "RequestStorageSnapshotSync rejected by term check, " + "ng_id=" + << request->ng_id() << ", standby_term=" << standby_node_term + << ", primary_term=" << primary_leader_term; response->set_error(true); return; } @@ -1810,7 +2035,73 @@ void CcNodeService::OnSnapshotSynced( } bool succ = Sharder::Instance().OnSnapshotReceived(request); + const uint64_t current_ckpt_ts = + store_hd->CurrentStandbySnapshotTs(request->ng_id()); response->set_error(!succ); + response->set_current_ckpt_ts(current_ckpt_ts); +} + +void CcNodeService::RequestSyncSnapshot( + ::google::protobuf::RpcController *controller, + const ::txservice::remote::RequestSyncSnapshotRequest *request, + ::txservice::remote::RequestSyncSnapshotResponse *response, + ::google::protobuf::Closure *done) +{ + brpc::ClosureGuard done_guard(done); + const uint32_t local_node_id = Sharder::Instance().NodeId(); + DLOG(INFO) << "RequestSyncSnapshot RPC received, ng_id=" << request->ng_id() + << ", snapshot_ts=" << request->snapshot_ts() + << ", local_node_id=" << local_node_id << ", role=follower"; + auto store_hd = Sharder::Instance().GetLocalCcShards()->store_hd_; + if (!store_hd) + { + response->set_error(true); + return; + } + const int64_t term = + PrimaryTermFromStandbyTerm(request->standby_node_term()); + const uint32_t ng_id = request->ng_id(); + const int64_t 
standby_term = request->standby_node_term(); + const uint64_t snapshot_ts = request->snapshot_ts(); + EnqueueStandbyTask( + {StandbyTaskType::RequestSyncSnapshot, + ng_id, + standby_term, + snapshot_ts, + [store_hd, ng_id, standby_term, term, snapshot_ts]() + { + const bool ok = + store_hd->RequestSyncSnapshot(ng_id, term, snapshot_ts); + if (ok) + { + Sharder::Instance().SetStandbyNodeTerm(standby_term); + Sharder::Instance().SetCandidateStandbyNodeTerm(-1); + if (!txservice::txservice_skip_kv && + !txservice::txservice_enable_cache_replacement) + { + store_hd->RestoreTxCache(ng_id, standby_term); + } + DLOG(INFO) << "RequestSyncSnapshot async RequestSyncSnapshot " + "successfully, ng_id=" + << ng_id << ", standby_term=" << standby_term + << ", snapshot_ts=" << snapshot_ts; + } + DLOG(INFO) << "RequestSyncSnapshot async RequestSyncSnapshot " + "done, ng_id=" + << ng_id << ", standby_term=" << standby_term + << ", snapshot_ts=" << snapshot_ts << ", success=" << ok + << ", role=follower"; + }}); + const uint64_t current_ckpt_ts = + store_hd->CurrentStandbySnapshotTs(request->ng_id()); + DLOG(INFO) << "RequestSyncSnapshot handled, ng_id=" << request->ng_id() + << ", standby_term=" << request->standby_node_term() + << ", term=" << term + << ", snapshot_ts=" << request->snapshot_ts() + << ", current_ckpt_ts=" << current_ckpt_ts + << ", local_node_id=" << local_node_id; + response->set_error(false); + response->set_current_ckpt_ts(current_ckpt_ts); } void CcNodeService::FetchNodeInfo( diff --git a/tx_service/src/standby.cpp b/tx_service/src/standby.cpp index 0d2e7e7e..17f5b9b4 100644 --- a/tx_service/src/standby.cpp +++ b/tx_service/src/standby.cpp @@ -21,11 +21,66 @@ */ #include "standby.h" +#include +#include + +#include +#include + #include "cc_request.h" #include "local_cc_shards.h" namespace txservice { +namespace +{ +struct UpdateStandbyCkptAgg +{ + bthread::Mutex mux; + bthread::ConditionVariable cv; + size_t pending{0}; + size_t total{0}; + size_t success{0}; + 
uint64_t min_ack_ckpt_ts{std::numeric_limits::max()}; +}; + +struct UpdateStandbyCkptRpcCtx +{ + brpc::Controller cntl; + remote::UpdateStandbyCkptTsResponse resp; + UpdateStandbyCkptAgg *agg{nullptr}; +}; + +class UpdateStandbyCkptDone : public google::protobuf::Closure +{ +public: + explicit UpdateStandbyCkptDone(std::shared_ptr ctx) + : ctx_(std::move(ctx)) + { + } + + void Run() override + { + std::unique_ptr self_guard(this); + auto *agg = ctx_->agg; + const bool succ = !ctx_->cntl.Failed() && !ctx_->resp.error(); + { + std::lock_guard lk(agg->mux); + if (succ) + { + agg->success++; + agg->min_ack_ckpt_ts = std::min(agg->min_ack_ckpt_ts, + ctx_->resp.current_ckpt_ts()); + } + agg->pending--; + } + agg->cv.notify_all(); + } + +private: + std::shared_ptr ctx_; +}; +} // namespace void StandbyForwardEntry::AddTxCommand(ApplyCc &cc_req) { @@ -66,6 +121,7 @@ void BrocastPrimaryCkptTs(NodeGroupId node_group_id, uint64_t primary_ckpt_ts, bool has_data_store_write) { + DLOG(INFO) << "BrocastPrimaryCkptTs, ckpt ts " << primary_ckpt_ts; std::vector subscribe_node_ids; WaitableCc get_subscribe_node_ids_cc; get_subscribe_node_ids_cc.Reset( @@ -81,9 +137,17 @@ void BrocastPrimaryCkptTs(NodeGroupId node_group_id, if (!subscribe_node_ids.empty()) { - brpc::Controller cntl; +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (has_data_store_write) + { + auto *store_hd = Sharder::Instance().GetDataStoreHandler(); + std::vector snapshot_files; + store_hd->CreateSnapshotForStandby( + node_group_id, snapshot_files, primary_ckpt_ts); + } +#endif + remote::UpdateStandbyCkptTsRequest update_standby_ckpt_ts_req; - remote::UpdateStandbyCkptTsResponse update_standby_ckpt_ts_resp; update_standby_ckpt_ts_req.set_ng_term(node_group_term); update_standby_ckpt_ts_req.set_node_group_id(node_group_id); @@ -91,23 +155,64 @@ void BrocastPrimaryCkptTs(NodeGroupId node_group_id, update_standby_ckpt_ts_req.set_has_data_store_write( has_data_store_write); + UpdateStandbyCkptAgg agg; for (uint32_t 
node_id : subscribe_node_ids) { auto channel = Sharder::Instance().GetCcNodeServiceChannel(node_id); - if (channel) + if (!channel) + { + continue; + } + remote::CcRpcService_Stub stub(channel.get()); + auto rpc_ctx = std::make_shared(); + rpc_ctx->cntl.set_timeout_ms(300); + rpc_ctx->agg = &agg; + { + std::lock_guard lk(agg.mux); + agg.pending++; + agg.total++; + } + DLOG(INFO) << "send UpdateStandbyCkptTs to node " << node_id + << ", snapshot_ts " << primary_ckpt_ts; + stub.UpdateStandbyCkptTs(&rpc_ctx->cntl, + &update_standby_ckpt_ts_req, + &rpc_ctx->resp, + new UpdateStandbyCkptDone(rpc_ctx)); + } + + { + std::unique_lock lk(agg.mux); + while (agg.pending != 0) { - remote::CcRpcService_Stub stub(channel.get()); - cntl.Reset(); - cntl.set_timeout_ms(300); - update_standby_ckpt_ts_resp.Clear(); - - // We don't care about response - stub.UpdateStandbyCkptTs(&cntl, - &update_standby_ckpt_ts_req, - &update_standby_ckpt_ts_resp, - nullptr); + agg.cv.wait(lk); } } + +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto *store_hd = Sharder::Instance().GetDataStoreHandler(); + DLOG(INFO) << "BrocastPrimaryCkptTs cleanup check, ng_id=" + << node_group_id + << ", has_data_store_write=" << has_data_store_write + << ", ack_total=" << agg.total + << ", ack_success=" << agg.success + << ", min_ack_ckpt_ts=" << agg.min_ack_ckpt_ts; + if (agg.total > 0 && agg.success == agg.total && + agg.min_ack_ckpt_ts != std::numeric_limits::max()) + { + store_hd->DeleteStandbySnapshotsBefore(node_group_id, + agg.min_ack_ckpt_ts); + } + else + { + DLOG(INFO) << "BrocastPrimaryCkptTs skip " + "DeleteStandbySnapshotsBefore, ng_id=" + << node_group_id + << ", has_data_store_write=" << has_data_store_write + << ", ack_total=" << agg.total + << ", ack_success=" << agg.success + << ", min_ack_ckpt_ts=" << agg.min_ack_ckpt_ts; + } +#endif } } diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index e925006b..b06d0257 100644 --- 
a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -21,6 +21,12 @@ */ #include "store/snapshot_manager.h" +#include +#include +#include + +#include +#include #include #include "cc/local_cc_shards.h" @@ -30,6 +36,121 @@ namespace txservice namespace store { +namespace +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE +struct RequestSyncSnapshotAggregate +{ + bthread::Mutex mux; + bthread::ConditionVariable cv; + size_t pending{0}; + size_t success{0}; + uint64_t min_ack_ckpt_ts{std::numeric_limits::max()}; +}; + +struct RequestSyncSnapshotRpcCtx +{ + brpc::Controller cntl; + remote::RequestSyncSnapshotRequest req; + remote::RequestSyncSnapshotResponse resp; + RequestSyncSnapshotAggregate *aggregate{nullptr}; + uint32_t standby_node_id{0}; + int64_t standby_term{0}; + uint32_t ng_id{0}; +}; + +class RequestSyncSnapshotDone : public google::protobuf::Closure +{ +public: + explicit RequestSyncSnapshotDone( + std::shared_ptr ctx) + : ctx_(std::move(ctx)) + { + } + + void Run() override + { + std::unique_ptr self_guard(this); + auto *agg = ctx_->aggregate; + const bool succ = !ctx_->cntl.Failed() && !ctx_->resp.error(); + if (agg == nullptr) + { + if (!succ) + { + LOG(WARNING) + << "RequestSyncSnapshot async dispatch failed, ng_id=" + << ctx_->ng_id + << ", standby_node_id=" << ctx_->standby_node_id + << ", standby_term=" << ctx_->standby_term + << ", error=" << ctx_->cntl.ErrorText() + << ", failed=" << ctx_->cntl.Failed() + << ", resp_error=" << ctx_->resp.error(); + } + return; + } + { + std::lock_guard lk(agg->mux); + if (succ) + { + agg->success++; + agg->min_ack_ckpt_ts = std::min(agg->min_ack_ckpt_ts, + ctx_->resp.current_ckpt_ts()); + } + agg->pending--; + } + agg->cv.notify_all(); + } + +private: + std::shared_ptr ctx_; +}; + +bool DispatchRequestSyncSnapshotAsync(uint32_t ng_id, + uint32_t standby_node_id, + int64_t standby_term, + uint64_t snapshot_ts, + RequestSyncSnapshotAggregate *aggregate) +{ + auto channel = 
Sharder::Instance().GetCcNodeServiceChannel(standby_node_id); + if (!channel) + { + LOG(WARNING) << "RequestSyncSnapshot channel is nullptr for standby " + << "node #" << standby_node_id << " at standby term " + << standby_term << ", ng_id=" << ng_id + << ", snapshot_ts=" << snapshot_ts; + return false; + } + + auto rpc_ctx = std::make_shared(); + rpc_ctx->cntl.set_timeout_ms(1000); + rpc_ctx->req.set_standby_node_term(standby_term); + rpc_ctx->req.set_ng_id(ng_id); + rpc_ctx->req.set_snapshot_ts(snapshot_ts); + rpc_ctx->aggregate = aggregate; + rpc_ctx->standby_node_id = standby_node_id; + rpc_ctx->standby_term = standby_term; + rpc_ctx->ng_id = ng_id; + + if (aggregate != nullptr) + { + std::lock_guard lk(aggregate->mux); + aggregate->pending++; + } + + remote::CcRpcService_Stub stub(channel.get()); + DLOG(INFO) << "RequestSyncSnapshot async dispatch, ng_id=" << ng_id + << ", standby_node_id=" << standby_node_id + << ", standby_term=" << standby_term + << ", snapshot_ts=" << snapshot_ts + << ", aggregate=" << (aggregate != nullptr); + stub.RequestSyncSnapshot(&rpc_ctx->cntl, + &rpc_ctx->req, + &rpc_ctx->resp, + new RequestSyncSnapshotDone(rpc_ctx)); + return true; +} +#endif +} // namespace void SnapshotManager::Start() { @@ -87,7 +208,9 @@ bool SnapshotManager::OnSnapshotSyncRequested( { DLOG(INFO) << "Received snapshot sync request from standby node #" << req->standby_node_id() - << " for standby term: " << req->standby_node_term(); + << " for standby term: " << req->standby_node_term() + << ", ng_id=" << req->ng_id() + << ", dest_path=" << req->dest_path(); assert(store_hd_ != nullptr); if (store_hd_ == nullptr) { @@ -95,54 +218,146 @@ bool SnapshotManager::OnSnapshotSyncRequested( return false; } - std::unique_lock lk(standby_sync_mux_); - - if (!terminated_) +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + bool should_resend_completed_snapshot = false; + uint64_t completed_snapshot_ts = 0; + uint32_t completed_ng_id = 0; + uint32_t completed_standby_node_id = 0; 
+ int64_t completed_standby_term = 0; +#endif { - auto node_it = subscription_barrier_.find(req->standby_node_id()); - if (node_it == subscription_barrier_.end()) + std::unique_lock lk(standby_sync_mux_); + + if (terminated_) { - LOG(WARNING) << "No subscription barrier found for standby node #" - << req->standby_node_id() - << ", standby term: " << req->standby_node_term(); return false; } - auto barrier_it = node_it->second.find(req->standby_node_term()); - if (barrier_it == node_it->second.end()) + auto node_it = subscription_barrier_.find(req->standby_node_id()); + if (node_it == subscription_barrier_.end()) { - LOG(WARNING) << "No barrier found for standby node #" - << req->standby_node_id() - << " at standby term: " << req->standby_node_term(); - return false; + if (IsSnapshotSyncCompletedLocked(req->standby_node_id(), + req->standby_node_term())) + { +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (!GetCompletedSnapshotTsLocked(req->standby_node_id(), + req->standby_node_term(), + &completed_snapshot_ts)) + { + LOG(WARNING) + << "Completed term found without snapshot ts, " + << "standby node #" << req->standby_node_id() + << ", standby term: " << req->standby_node_term(); + return false; + } + DLOG(INFO) << "Received duplicate snapshot sync request for " + "completed standby node #" + << req->standby_node_id() + << ", standby term: " << req->standby_node_term() + << ", snapshot_ts=" << completed_snapshot_ts; + should_resend_completed_snapshot = true; + completed_ng_id = req->ng_id(); + completed_standby_node_id = req->standby_node_id(); + completed_standby_term = req->standby_node_term(); +#else + return true; +#endif + } + else + { + LOG(WARNING) << "No subscription barrier found for standby " + "node #" + << req->standby_node_id() + << ", standby term: " << req->standby_node_term(); + return false; + } } - - uint64_t active_tx_max_ts = barrier_it->second; - - auto ins_pair = pending_req_.try_emplace(req->standby_node_id()); - if (!ins_pair.second) + else { 
- // check if the queued task is newer than the new received req. If - // so, discard the new req, otherwise, update the task. - auto &cur_task = ins_pair.first->second; - int64_t cur_task_standby_node_term = - cur_task.req.standby_node_term(); - int64_t req_standby_node_term = req->standby_node_term(); - - if (cur_task_standby_node_term >= req_standby_node_term) + auto barrier_it = node_it->second.find(req->standby_node_term()); + if (barrier_it == node_it->second.end()) { - // discard the task. + if (IsSnapshotSyncCompletedLocked(req->standby_node_id(), + req->standby_node_term())) + { +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (!GetCompletedSnapshotTsLocked(req->standby_node_id(), + req->standby_node_term(), + &completed_snapshot_ts)) + { + LOG(WARNING) + << "Completed term found without snapshot ts, " + << "standby node #" << req->standby_node_id() + << ", standby term: " << req->standby_node_term(); + return false; + } + DLOG(INFO) + << "Received duplicate snapshot sync request for " + "completed standby node #" + << req->standby_node_id() + << ", standby term: " << req->standby_node_term() + << ", snapshot_ts=" << completed_snapshot_ts; + should_resend_completed_snapshot = true; + completed_ng_id = req->ng_id(); + completed_standby_node_id = req->standby_node_id(); + completed_standby_term = req->standby_node_term(); +#else + return true; +#endif + } + else + { + LOG(WARNING) + << "No barrier found for standby node #" + << req->standby_node_id() + << " at standby term: " << req->standby_node_term(); + return false; + } + } + else + { + uint64_t active_tx_max_ts = barrier_it->second; + + auto ins_pair = + pending_req_.try_emplace(req->standby_node_id()); + if (!ins_pair.second) + { + // check if the queued task is newer than the new received + // req. If so, discard the new req, otherwise, update the + // task. 
+ auto &cur_task = ins_pair.first->second; + int64_t cur_task_standby_node_term = + cur_task.req.standby_node_term(); + int64_t req_standby_node_term = req->standby_node_term(); + + if (cur_task_standby_node_term >= req_standby_node_term) + { + // discard the task. + return true; + } + } + + ins_pair.first->second.req.CopyFrom(*req); + ins_pair.first->second.subscription_active_tx_max_ts = + active_tx_max_ts; + standby_sync_cv_.notify_all(); return true; } } + } - ins_pair.first->second.req.CopyFrom(*req); - ins_pair.first->second.subscription_active_tx_max_ts = active_tx_max_ts; - standby_sync_cv_.notify_all(); - return true; +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (should_resend_completed_snapshot) + { + return DispatchRequestSyncSnapshotAsync(completed_ng_id, + completed_standby_node_id, + completed_standby_term, + completed_snapshot_ts, + nullptr); } +#endif - return false; + return true; } void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, @@ -150,6 +365,60 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, uint64_t active_tx_max_ts) { std::unique_lock lk(standby_sync_mux_); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto completed_it = completed_snapshot_term_and_ts_.find(standby_node_id); + if (completed_it != completed_snapshot_term_and_ts_.end()) + { + for (const auto &entry : completed_it->second) + { + int64_t completed_term = entry.first; + if (completed_term > standby_node_term) + { + DLOG(INFO) << "Ignore stale subscription barrier registration " + "for standby node #" + << standby_node_id << ", term " << standby_node_term + << " because completed term " << completed_term + << " is newer"; + return; + } + } + + if (completed_it->second.find(standby_node_term) != + completed_it->second.end()) + { + DLOG(INFO) << "Skip barrier registration for already completed " + "standby node #" + << standby_node_id << ", term " << standby_node_term; + return; + } + } +#else + auto completed_it = 
completed_snapshot_terms_.find(standby_node_id); + if (completed_it != completed_snapshot_terms_.end()) + { + for (int64_t completed_term : completed_it->second) + { + if (completed_term > standby_node_term) + { + DLOG(INFO) << "Ignore stale subscription barrier registration " + "for standby node #" + << standby_node_id << ", term " << standby_node_term + << " because completed term " << completed_term + << " is newer"; + return; + } + } + + if (completed_it->second.find(standby_node_term) != + completed_it->second.end()) + { + DLOG(INFO) << "Skip barrier registration for already completed " + "standby node #" + << standby_node_id << ", term " << standby_node_term; + return; + } + } +#endif // Ignore out-of-order old barrier registrations when a newer standby term // is already known for this standby node. @@ -191,6 +460,48 @@ void SnapshotManager::RegisterSubscriptionBarrier(uint32_t standby_node_id, pending_req_.erase(pending_it); } +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (completed_it != completed_snapshot_term_and_ts_.end()) + { + auto term_it = completed_it->second.begin(); + while (term_it != completed_it->second.end()) + { + if (term_it->first < standby_node_term) + { + term_it = completed_it->second.erase(term_it); + } + else + { + ++term_it; + } + } + if (completed_it->second.empty()) + { + completed_snapshot_term_and_ts_.erase(completed_it); + } + } +#else + if (completed_it != completed_snapshot_terms_.end()) + { + auto term_it = completed_it->second.begin(); + while (term_it != completed_it->second.end()) + { + if (*term_it < standby_node_term) + { + term_it = completed_it->second.erase(term_it); + } + else + { + ++term_it; + } + } + if (completed_it->second.empty()) + { + completed_snapshot_terms_.erase(completed_it); + } + } +#endif + auto &node_barriers = subscription_barrier_[standby_node_id]; // Keep only current and newer terms for this node. 
@@ -253,6 +564,7 @@ void SnapshotManager::EraseSubscriptionBarriersByNode(uint32_t standby_node_id) std::unique_lock lk(standby_sync_mux_); pending_req_.erase(standby_node_id); subscription_barrier_.erase(standby_node_id); + EraseSnapshotSyncCompletedByNodeLocked(standby_node_id); } void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id, @@ -271,6 +583,104 @@ void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id, } } +bool SnapshotManager::IsSnapshotSyncCompletedLocked( + uint32_t standby_node_id, int64_t standby_node_term) const +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto node_it = completed_snapshot_term_and_ts_.find(standby_node_id); + if (node_it == completed_snapshot_term_and_ts_.end()) + { + return false; + } +#else + auto node_it = completed_snapshot_terms_.find(standby_node_id); + if (node_it == completed_snapshot_terms_.end()) + { + return false; + } +#endif + return node_it->second.find(standby_node_term) != node_it->second.end(); +} + +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE +bool SnapshotManager::GetCompletedSnapshotTsLocked( + uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t *standby_snapshot_ts) const +{ + if (standby_snapshot_ts == nullptr) + { + return false; + } + + auto node_it = completed_snapshot_term_and_ts_.find(standby_node_id); + if (node_it == completed_snapshot_term_and_ts_.end()) + { + return false; + } + + auto term_it = node_it->second.find(standby_node_term); + if (term_it == node_it->second.end()) + { + return false; + } + + *standby_snapshot_ts = term_it->second; + return true; +} +#endif + +void SnapshotManager::MarkSnapshotSyncCompletedLocked( + uint32_t standby_node_id, + int64_t standby_node_term, + uint64_t standby_snapshot_ts) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + auto &snapshot_term_and_ts = + completed_snapshot_term_and_ts_[standby_node_id]; + snapshot_term_and_ts[standby_node_term] = standby_snapshot_ts; + auto it = 
snapshot_term_and_ts.begin(); + while (it != snapshot_term_and_ts.end()) + { + if (it->first < standby_node_term) + { + it = snapshot_term_and_ts.erase(it); + } + else + { + ++it; + } + } +#else + auto &completed_terms = completed_snapshot_terms_[standby_node_id]; + completed_terms.insert(standby_node_term); + + auto it = completed_terms.begin(); + while (it != completed_terms.end()) + { + if (*it < standby_node_term) + { + it = completed_terms.erase(it); + } + else + { + ++it; + } + } + (void) standby_snapshot_ts; +#endif +} + +void SnapshotManager::EraseSnapshotSyncCompletedByNodeLocked( + uint32_t standby_node_id) +{ +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + completed_snapshot_term_and_ts_.erase(standby_node_id); +#else + completed_snapshot_terms_.erase(standby_node_id); +#endif +} + // If kvstore is enabled, we must flush data in-memory to kvstore firstly. // For non-shared kvstore, also we create and send the snapshot to standby // nodes. @@ -279,6 +689,7 @@ void SnapshotManager::EraseSubscriptionBarrierLocked(uint32_t standby_node_id, // kvstore on cache miss). void SnapshotManager::SyncWithStandby() { + DLOG(INFO) << "SyncWithStandby"; assert(store_hd_ != nullptr); if (store_hd_ == nullptr) { @@ -297,6 +708,11 @@ void SnapshotManager::SyncWithStandby() // clear barriers as well, all queued sync states are stale when this // leader term is no longer valid. subscription_barrier_.clear(); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + completed_snapshot_term_and_ts_.clear(); +#else + completed_snapshot_terms_.clear(); +#endif return; } @@ -306,7 +722,7 @@ void SnapshotManager::SyncWithStandby() std::vector tasks; - // Pick all pending tasks that can be covered by current checkpoint ts. + // Dequeue all pending tasks that can be covered by this snapshot. 
{ std::unique_lock lk(standby_sync_mux_); auto it = pending_req_.begin(); @@ -387,7 +803,6 @@ } bool ckpt_res = this->RunOneRoundCheckpoint(node_group, leader_term); - if (!ckpt_res) { // Data flush failed. Retry on next run. @@ -395,18 +810,29 @@ return; } - // Now take a snapshot for non-shared storage, and then send to standby - // node. std::vector snapshot_files; +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + // Create a fresh standby snapshot after the checkpoint completed. + const uint64_t standby_snapshot_ts = static_cast( + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count()); + bool res = store_hd_->CreateSnapshotForStandby( + node_group, snapshot_files, standby_snapshot_ts); + if (!res) { - bool res = store_hd_->CreateSnapshotForStandby(snapshot_files); - if (!res) - { - LOG(ERROR) << "Failed to create snpashot for sync with standby"; - return; - } + LOG(ERROR) << "Failed to create snapshot for sync with standby"; + return; } + DLOG(INFO) << "SyncWithStandby created standby snapshot, ng_id=" + << node_group << ", term=" << leader_term + << ", snapshot_ts=" << standby_snapshot_ts; + + uint64_t min_ack_ckpt_ts = std::numeric_limits::max(); + size_t sync_snapshot_rpc_count = 0; + size_t sync_snapshot_success = 0; + RequestSyncSnapshotAggregate sync_snapshot_agg; +#endif for (auto &task : tasks) { @@ -430,7 +856,9 @@ std::string ip; uint16_t port; Sharder::Instance().GetNodeAddress(node_id, ip, port); +#ifndef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE std::string remote_dest = req.user() + "@" + ip + ":" + req.dest_path(); +#endif int64_t req_primary_term = PrimaryTermFromStandbyTerm(req_standby_node_term); @@ -441,21 +869,37 @@ } bool succ = true; +#ifndef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE if (!snapshot_files.empty()) { succ = 
store_hd_->SendSnapshotToRemote( req.ng_id(), req_primary_term, snapshot_files, remote_dest); } +#else + assert(snapshot_files.empty()); +#endif if (succ) { + bool notify_succ = false; +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + notify_succ = + DispatchRequestSyncSnapshotAsync(req.ng_id(), + req.standby_node_id(), + req.standby_node_term(), + standby_snapshot_ts, + &sync_snapshot_agg); + if (notify_succ) + { + sync_snapshot_rpc_count++; + } +#else auto channel = Sharder::Instance().GetCcNodeServiceChannel( req.standby_node_id()); DLOG(INFO) << "Notifying standby node #" << req.standby_node_id() << " for snapshot synced at term " << req.standby_node_term() << ", channel: " << (channel ? "valid" : "null"); - if (channel) { // needs retry if failed @@ -484,19 +928,46 @@ void SnapshotManager::SyncWithStandby() std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } - else + if (on_sync_resp.error()) { - break; + LOG(WARNING) + << "OnSnapshotSynced to standby node #" + << req.standby_node_id() + << " returned error response at standby term " + << req.standby_node_term(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + continue; } + + notify_succ = true; + break; } } + else + { + LOG(WARNING) << "OnSnapshotSynced channel is nullptr for " + << "standby node #" << req.standby_node_id() + << " at standby term " << req.standby_node_term(); + } +#endif + + if (!notify_succ) + { + // Keep pending/barrier so standby can retry with the same + // standby term. + continue; + } - // We don't care if the notification is successful or not. - // We just need to erase the same request. Even if the notification - // fails, after a while, the standby node will resend the - // request. 
{ std::unique_lock lk(standby_sync_mux_); +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + const uint64_t completed_snapshot_ts = standby_snapshot_ts; +#else + const uint64_t completed_snapshot_ts = 0; +#endif + MarkSnapshotSyncCompletedLocked(req.standby_node_id(), + req.standby_node_term(), + completed_snapshot_ts); auto pending_req_iter = pending_req_.find(req.standby_node_id()); if (pending_req_iter != pending_req_.end()) @@ -542,6 +1013,43 @@ void SnapshotManager::SyncWithStandby() req.standby_node_term()); } } +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + { + std::unique_lock lk(sync_snapshot_agg.mux); + while (sync_snapshot_agg.pending != 0) + { + sync_snapshot_agg.cv.wait(lk); + } + min_ack_ckpt_ts = sync_snapshot_agg.min_ack_ckpt_ts; + sync_snapshot_success = sync_snapshot_agg.success; + } +#endif + +#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE + if (standby_snapshot_ts != 0 && sync_snapshot_rpc_count > 0 && + sync_snapshot_success == sync_snapshot_rpc_count && + min_ack_ckpt_ts != std::numeric_limits::max()) + { + DLOG(INFO) + << "SyncWithStandby deleting standby snapshot archives by min " + "acked ts, ng_id=" + << node_group << ", term=" << leader_term + << ", current_snapshot_ts=" << standby_snapshot_ts + << ", sync_snapshot_rpc_count=" << sync_snapshot_rpc_count + << ", sync_snapshot_success=" << sync_snapshot_success + << ", min_ack_ckpt_ts=" << min_ack_ckpt_ts; + store_hd_->DeleteStandbySnapshotsBefore(node_group, min_ack_ckpt_ts); + } + else if (standby_snapshot_ts != 0) + { + DLOG(INFO) << "SyncWithStandby skip DeleteStandbySnapshotsBefore, " + << "ng_id=" << node_group << ", term=" << leader_term + << ", current_snapshot_ts=" << standby_snapshot_ts + << ", sync_snapshot_rpc_count=" << sync_snapshot_rpc_count + << ", sync_snapshot_success=" << sync_snapshot_success + << ", min_ack_ckpt_ts=" << min_ack_ckpt_ts; + } +#endif } uint64_t SnapshotManager::GetCurrentCheckpointTs(uint32_t node_group)