diff --git a/core/src/storage_init.cpp b/core/src/storage_init.cpp index 965bbc8e..4ea5d6d4 100644 --- a/core/src/storage_init.cpp +++ b/core/src/storage_init.cpp @@ -83,9 +83,7 @@ DEFINE_string(eloq_dss_peer_node, "", "EloqDataStoreService peer node address. Used to fetch eloq-dss " "topology from a working eloq-dss server."); -DEFINE_string(eloq_dss_branch_name, - "development", - "Branch name of EloqDataStore"); +DEFINE_string(eloq_dss_branch_name, "main", "Branch name of EloqDataStore"); DEFINE_string(eloq_dss_config_file_path, "", "EloqDataStoreService config file path. Used to load eloq-dss " @@ -277,7 +275,11 @@ bool DataSubstrate::InitializeStorageHandler(const INIReader &config_reader) defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS) EloqDS::RocksDBConfig rocksdb_config(config_reader, eloq_dss_data_path); EloqDS::RocksDBCloudConfig rocksdb_cloud_config(config_reader); - rocksdb_cloud_config.branch_name_ = FLAGS_eloq_dss_branch_name; + rocksdb_cloud_config.branch_name_ = + !CheckCommandLineFlagIsDefault("eloq_dss_branch_name") + ? FLAGS_eloq_dss_branch_name + : config_reader.GetString( + "store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name); auto ds_factory = std::make_unique( rocksdb_config, rocksdb_cloud_config, @@ -293,6 +295,11 @@ bool DataSubstrate::InitializeStorageHandler(const INIReader &config_reader) eloq_dss_data_path, core_config_.node_memory_limit_mb, core_config_.core_num); + eloq_store_config.branch_name_ = + !CheckCommandLineFlagIsDefault("eloq_dss_branch_name") + ? 
FLAGS_eloq_dss_branch_name + : config_reader.GetString( + "store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name); auto ds_factory = std::make_unique( std::move(eloq_store_config)); #endif diff --git a/store_handler/eloq_data_store_service/eloq_store_config.h b/store_handler/eloq_data_store_service/eloq_store_config.h index ae14769e..0e01a64c 100644 --- a/store_handler/eloq_data_store_service/eloq_store_config.h +++ b/store_handler/eloq_data_store_service/eloq_store_config.h @@ -44,5 +44,11 @@ struct EloqStoreConfig std::vector &storage_path_vector); ::eloqstore::KvOptions eloqstore_configs_{}; + + // Branch name passed to EloqStore::Start(). Set by the caller before the + // factory is constructed: FLAGS_eloq_dss_branch_name if that flag is not + // at its default, else the [store] eloq_dss_branch_name config entry. + // Defaults to "main" so unset code paths (e.g. unit tests) still work. + std::string branch_name_{"main"}; }; } // namespace EloqDS diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store.cpp b/store_handler/eloq_data_store_service/eloq_store_data_store.cpp index 77bc17a0..8f9affdc 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store.cpp +++ b/store_handler/eloq_data_store_service/eloq_store_data_store.cpp @@ -25,8 +25,11 @@ #include #include +#include +#include #include +#include "common.h" #include "eloq_store_data_store_factory.h" #include "internal_request.h" @@ -91,6 +94,7 @@ EloqStoreDataStore::EloqStoreDataStore(uint32_t shard_id, assert(factory != nullptr); ::eloqstore::KvOptions opts = factory->eloq_store_configs_.eloqstore_configs_; + branch_name_ = factory->eloq_store_configs_.branch_name_; DLOG(INFO) << "Create EloqStore storage with workers: " << opts.num_threads << ", store path: " << opts.store_path.front() << ", open files limit: " << opts.fd_limit @@ -589,30 +593,79 @@ void EloqStoreDataStore::CreateSnapshotForBackup( { PoolableGuard req_guard(req); - ::eloqstore::GlobalArchiveRequest global_archive_req; -
global_archive_req.SetSnapshotTimestamp(req->GetBackupTs()); - eloq_store_service_->ExecSync(&global_archive_req); + std::string_view backup_name = req->GetBackupName(); + assert(req->GetBackupTs() != 0); - ::EloqDS::remote::DataStoreError ds_error; - std::string error_msg; - switch (global_archive_req.Error()) + if (backup_name.empty() || backup_name == eloq_store_service_->Branch()) { - case ::eloqstore::KvError::NoError: - ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR; - break; - case ::eloqstore::KvError::NotRunning: - ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN; - error_msg = "EloqStore not running"; - break; - default: - ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR; - error_msg = - "Snapshot failed with error code: " + - std::to_string(static_cast(global_archive_req.Error())); - break; + // If backup_name is empty or matches the current branch, create + // snapshot for current branch. + ::eloqstore::GlobalArchiveRequest global_archive_req; + global_archive_req.SetSnapshotTimestamp(req->GetBackupTs()); + eloq_store_service_->ExecSync(&global_archive_req); + + ::EloqDS::remote::DataStoreError ds_error; + std::string error_msg; + switch (global_archive_req.Error()) + { + case ::eloqstore::KvError::NoError: + ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR; + req->AddBackupFile( + ::eloqstore::BranchArchiveName(eloq_store_service_->Branch(), + eloq_store_service_->Term(), + req->GetBackupTs())); + break; + case ::eloqstore::KvError::NotRunning: + ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN; + error_msg = "EloqStore not running"; + break; + default: + ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR; + error_msg = + "Snapshot failed with error code: " + + std::to_string(static_cast(global_archive_req.Error())); + break; + } + + req->SetFinish(ds_error, error_msg); } + else + { + // backup_name differs from the current branch — create a new branch + // forked from the current branch. 
Use backup_ts as the salt so the + // internal filename is deterministic and correlated with the backup + // timestamp. + ::eloqstore::GlobalCreateBranchRequest create_branch_req; + create_branch_req.SetArgs(std::string(backup_name)); + create_branch_req.SetSaltTimestamp(req->GetBackupTs()); + eloq_store_service_->ExecSync(&create_branch_req); + + ::EloqDS::remote::DataStoreError ds_error; + std::string error_msg; + switch (create_branch_req.Error()) + { + case ::eloqstore::KvError::NoError: + ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR; + req->AddBackupFile(create_branch_req.result_branch); + break; + case ::eloqstore::KvError::NotRunning: + ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN; + error_msg = "EloqStore not running"; + break; + case ::eloqstore::KvError::InvalidArgs: + ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR; + error_msg = "Invalid branch name: " + std::string(backup_name); + break; + default: + ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR; + error_msg = + "Create branch failed with error code: " + + std::to_string(static_cast(create_branch_req.Error())); + break; + } - req->SetFinish(ds_error, error_msg); + req->SetFinish(ds_error, error_msg); + } } void EloqStoreDataStore::ScanDelete(DeleteRangeRequest *delete_range_req) diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store.h b/store_handler/eloq_data_store_service/eloq_store_data_store.h index 6dcd23b3..8f0bd300 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store.h +++ b/store_handler/eloq_data_store_service/eloq_store_data_store.h @@ -173,7 +173,8 @@ class EloqStoreDataStore : public DataStore bool StartDB(int64_t term) override { - ::eloqstore::KvError res = eloq_store_service_->Start(term); + ::eloqstore::KvError res = + eloq_store_service_->Start(branch_name_, term); if (res != ::eloqstore::KvError::NoError) { LOG(ERROR) << "EloqStore start failed with error code: " @@ -270,5 +271,9 @@ 
class EloqStoreDataStore : public DataStore void Floor(ScanRequest *scan_req); std::unique_ptr<::eloqstore::EloqStore> eloq_store_service_{nullptr}; + + // Branch name passed to EloqStore::Start(). Populated from the factory + // config in the EloqStoreDataStore constructor. + std::string branch_name_{"main"}; }; } // namespace EloqDS diff --git a/store_handler/eloq_data_store_service/eloqstore b/store_handler/eloq_data_store_service/eloqstore index 695b094d..337846cb 160000 --- a/store_handler/eloq_data_store_service/eloqstore +++ b/store_handler/eloq_data_store_service/eloqstore @@ -1 +1 @@ -Subproject commit 695b094d6f02148f30589e658efa3536ede0c3c3 +Subproject commit 337846cb61b7d9a46e57268ca548355c2eab3eaf diff --git a/store_handler/eloq_data_store_service/main.cpp b/store_handler/eloq_data_store_service/main.cpp index 3123e7fb..53848f18 100644 --- a/store_handler/eloq_data_store_service/main.cpp +++ b/store_handler/eloq_data_store_service/main.cpp @@ -70,7 +70,7 @@ DEFINE_string(eloq_dss_peer_node, "Data store peer node address. Used to get cluster topology if " "data_store_config_file is not provided."); -DEFINE_string(eloq_dss_branch_name, "development", "Data store branch name."); +DEFINE_string(eloq_dss_branch_name, "main", "Data store branch name."); DEFINE_string(ip, "127.0.0.1", "Server IP"); DEFINE_int32(port, 9100, "Server Port"); @@ -303,7 +303,11 @@ int main(int argc, char *argv[]) // INIReader config_reader(nullptr, 0); EloqDS::RocksDBConfig rocksdb_config(config_reader, data_path); EloqDS::RocksDBCloudConfig rocksdb_cloud_config(config_reader); - rocksdb_cloud_config.branch_name_ = FLAGS_eloq_dss_branch_name; + rocksdb_cloud_config.branch_name_ = + !CheckCommandLineFlagIsDefault("eloq_dss_branch_name") + ? 
FLAGS_eloq_dss_branch_name + : config_reader.GetString( + "store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name); auto ds_factory = std::make_unique( rocksdb_config, rocksdb_cloud_config, enable_cache_replacement_); @@ -327,6 +331,11 @@ int main(int argc, char *argv[]) uint32_t unused_core_number = 0; EloqDS::EloqStoreConfig eloq_store_config( config_reader, data_path, mem_mib, unused_core_number, true); + eloq_store_config.branch_name_ = + !CheckCommandLineFlagIsDefault("eloq_dss_branch_name") + ? FLAGS_eloq_dss_branch_name + : config_reader.GetString( + "store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name); #ifdef ELOQ_MODULE_ENABLED GFLAGS_NAMESPACE::SetCommandLineOption( diff --git a/store_handler/eloq_data_store_service/rocksdb_cloud_data_store.cpp b/store_handler/eloq_data_store_service/rocksdb_cloud_data_store.cpp index 90cd4622..bb16622a 100644 --- a/store_handler/eloq_data_store_service/rocksdb_cloud_data_store.cpp +++ b/store_handler/eloq_data_store_service/rocksdb_cloud_data_store.cpp @@ -674,6 +674,17 @@ bool RocksDBCloudDataStore::OpenCloudDB( // Disable auto compactions before blocking purger options.disable_auto_compactions = true; + LOG(INFO) << "Open RocksDB Cloud with options create_if_missing: " + << create_db_if_missing_ << ", db_path: " << db_path_ + << ", bucket_name: " << cfs_options_.src_bucket.GetBucketName() + << ", bucket_prefix: " + << cfs_options_.src_bucket.GetBucketPrefix() + << ", object_path: " << cfs_options_.src_bucket.GetObjectPath() + << ", endpoint: " + << (cloud_config_.s3_endpoint_url_.empty() + ? 
"default" + : cloud_config_.s3_endpoint_url_); + auto start = std::chrono::steady_clock::now(); std::unique_lock db_lk(db_mux_); rocksdb::Status status; @@ -694,6 +705,14 @@ bool RocksDBCloudDataStore::OpenCloudDB( bthread_usleep(retry_num * 200000); } + if (!status.ok()) + { + LOG(ERROR) << "Unable to open db at path " << storage_path_ + << " with bucket " << cfs_options.src_bucket.GetBucketName() + << " with error: " << status.ToString(); + return false; + } + auto end = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(end - start); diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 123cd440..0eeacb76 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1910,7 +1910,8 @@ void CcNodeService::CreateBackup( assert(store_hd != nullptr); if (store_hd && !request->backup_name().empty()) { - assert(!request->dest_path().empty()); + // dest_path may be empty for cloud/shared-storage mode where the + // snapshot is written directly to cloud (no rsync transfer needed). auto st = store::SnapshotManager::Instance().CreateBackup(request); response->set_status(st); if (st == BackupTaskStatus::Finished) diff --git a/tx_service/src/store/snapshot_manager.cpp b/tx_service/src/store/snapshot_manager.cpp index 179d915c..ecf12d7b 100644 --- a/tx_service/src/store/snapshot_manager.cpp +++ b/tx_service/src/store/snapshot_manager.cpp @@ -675,7 +675,15 @@ txservice::remote::BackupTaskStatus SnapshotManager::CreateBackup( auto backup_it = ng_it->second.find(backup_name); if (backup_it != ng_it->second.end()) { - assert(false); + // Idempotency: if the backup already exists and is finished, + // return success. This allows retrying CreateBackup after a + // previous successful completion. 
+ if (backup_it->second.status() == + txservice::remote::BackupTaskStatus::Finished) + { + return txservice::remote::BackupTaskStatus::Finished; + } + // A backup with this name exists but is not finished: report failure. return txservice::remote::BackupTaskStatus::Failed; } }