From a82f4a903a2ed40e65f7c3bcfd653c571fe4f24a Mon Sep 17 00:00:00 2001 From: liunyl Date: Wed, 19 Nov 2025 07:12:21 +0000 Subject: [PATCH 1/2] Warm up local ssd of standby node --- eloq_data_store_service/CMakeLists.txt | 7 + eloq_data_store_service/data_store_factory.h | 48 ++ .../data_store_service.cpp | 429 ++++++++++++++++++ eloq_data_store_service/data_store_service.h | 65 +++ .../data_store_service_config.cpp | 14 + .../data_store_service_config.h | 17 +- eloq_data_store_service/ds_request.proto | 15 + .../eloq_store_data_store_factory.h | 45 ++ eloq_data_store_service/internal_request.h | 36 ++ .../rocksdb_cloud_data_store.cpp | 96 ++++ .../rocksdb_cloud_data_store.h | 14 + .../rocksdb_cloud_data_store_factory.h | 40 ++ .../rocksdb_data_store_factory.h | 40 ++ .../s3_file_downloader.cpp | 162 +++++++ eloq_data_store_service/s3_file_downloader.h | 68 +++ 15 files changed, 1095 insertions(+), 1 deletion(-) create mode 100644 eloq_data_store_service/s3_file_downloader.cpp create mode 100644 eloq_data_store_service/s3_file_downloader.h diff --git a/eloq_data_store_service/CMakeLists.txt b/eloq_data_store_service/CMakeLists.txt index b336fc3..0d3b2d2 100644 --- a/eloq_data_store_service/CMakeLists.txt +++ b/eloq_data_store_service/CMakeLists.txt @@ -386,6 +386,13 @@ if ((WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB_CLOUD_S3") OR purger_event_listener.cpp purger_sliding_window.cpp ) +endif() + +if (WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB_CLOUD_S3") + SET(RESELOQ_SOURCES ${RESELOQ_SOURCES} + s3_file_downloader.cpp + ) +endif() elseif (WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB") SET(RESELOQ_SOURCES ${RESELOQ_SOURCES} rocksdb_data_store_common.cpp diff --git a/eloq_data_store_service/data_store_factory.h b/eloq_data_store_service/data_store_factory.h index fe492dd..2e4dbba 100644 --- a/eloq_data_store_service/data_store_factory.h +++ b/eloq_data_store_service/data_store_factory.h @@ -39,6 +39,54 @@ class DataStoreFactory uint32_t shard_id, DataStoreService *data_store_service, bool start_db = true) = 0; + + /** + * @brief Get storage path for the data store + * @return Storage path string, empty if not applicable + */ + virtual std::string GetStoragePath() const = 0; + + /** + * @brief Get S3 bucket name (with prefix) for constructing S3 URLs + * @return S3 bucket name with prefix, empty if not applicable + */ + virtual std::string GetS3BucketName() const = 0; + + /** + * @brief Get S3 object path for constructing S3 URLs + * @return S3 object path, empty if not applicable + */ + virtual std::string GetS3ObjectPath() const = 0; + + /** + * @brief Get S3 region + * @return S3 region, empty if not applicable + */ + virtual std::string GetS3Region() const = 0; + + /** + * @brief Get S3 endpoint URL (for custom endpoints like MinIO) + * @return S3 endpoint URL, empty if using default AWS endpoint + */ + virtual std::string GetS3EndpointUrl() const = 0; + + /** + * @brief Get AWS access key ID for S3 authentication + * @return AWS access key ID, empty if not applicable or using default credentials + */ + virtual std::string GetAwsAccessKeyId() const = 0; + + /** + * @brief Get AWS secret key for S3 authentication + * @return AWS secret key, empty if not applicable or using default credentials + */ + virtual std::string GetAwsSecretKey() const = 0; + + /** + * @brief Get SST file cache size limit in bytes + * @return Cache size limit in bytes, 0 if not applicable + */ + virtual uint64_t GetSstFileCacheSize() const = 0; }; } // namespace EloqDS diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index d145dbc..9d7baf9 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -25,12 +25,15 @@ #include #include +#include #include #include #include #include #include +#include #include +#include #include #include #include @@ -38,6 +41,7 @@ #include "data_store_fault_inject.h" // ACTION_FAULT_INJECTOR #include "internal_request.h" #include "object_pool.h" +#include "rocksdb_cloud_data_store.h" namespace EloqDS { @@ -69,6 +73,8 @@ thread_local ObjectPool thread_local ObjectPool local_create_snapshot_req_pool_; +thread_local ObjectPool local_sync_file_cache_req_pool_; + TTLWrapperCache::TTLWrapperCache() { ttl_check_running_ = true; @@ -226,6 +232,23 @@ DataStoreService::~DataStoreService() migrate_worker_.Shutdown(); + // Stop file cache sync worker + if (file_cache_sync_worker_ != nullptr) + { + { + std::unique_lock lk(file_cache_sync_mutex_); + file_cache_sync_running_ = false; + file_cache_sync_cv_.notify_one(); + } + file_cache_sync_worker_->Shutdown(); + } + + // Stop file sync worker + if (file_sync_worker_ != nullptr) + { + file_sync_worker_->Shutdown(); + } + // shutdown all data_store if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed) { @@ -310,6 +333,21 @@ bool DataStoreService::StartService(bool create_db_if_missing, LOG(INFO) << "DataStoreService started on port " << cluster_manager_.GetThisNode().port_; +#ifdef DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3 + // Start file cache sync worker (for primary node to send file cache to standby) + uint32_t sync_interval_sec = + cluster_manager_.GetFileCacheSyncIntervalSec(); + file_cache_sync_running_ = true; + file_cache_sync_worker_ = std::make_unique(1); + file_cache_sync_worker_->SubmitWork( + [this, sync_interval_sec]() { FileCacheSyncWorker(sync_interval_sec); }); + + // Start file sync worker (for standby node to process incoming sync requests) + file_sync_worker_ = std::make_unique(1); + // ThreadWorkerPool manages its own worker threads internally + // We just need to create it and submit work items to it +#endif + CheckAndRecoverMigrateTask(); return true; @@ -1254,6 +1292,191 @@ void DataStoreService::CreateSnapshotForBackup( data_store_->CreateSnapshotForBackup(req); } +void DataStoreService::SyncFileCache( + ::google::protobuf::RpcController *controller, + const ::EloqDS::remote::SyncFileCacheRequest *request, + ::google::protobuf::Empty *response, + ::google::protobuf::Closure *done) +{ + brpc::ClosureGuard done_guard(done); + + // Validate shard ID + if (request->shard_id() != shard_id_) + { + LOG(WARNING) << "Invalid shard ID in SyncFileCache request: " + << request->shard_id() << " (expected " << shard_id_ << ")"; + // Note: Since response is Empty, we can't return error code. + // Errors are logged and RPC completes successfully. + // The primary node can check logs if needed. + return; + } + + // Only process if we're a standby node (closed status) + if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed) + { + LOG(WARNING) << "SyncFileCache called on non-standby node (status: " + << static_cast(shard_status_.load()) << ")"; + return; + } + + // Submit to file sync worker for async processing (file I/O should not block bthread) + SyncFileCacheLocalRequest *req = local_sync_file_cache_req_pool_.NextObject(); + req->SetRequest(request, done_guard.release()); + + if (file_sync_worker_ == nullptr) + { + LOG(ERROR) << "FileSyncWorker not initialized, cannot process SyncFileCache"; + req->Free(); + // Note: done_guard was released, so we need to manually call done + // Since we can't process, we'll let the RPC timeout (not ideal but acceptable) + return; + } + + bool res = file_sync_worker_->SubmitWork([this, req]() { + ProcessSyncFileCache(req); + }); + + if (!res) + { + req->Free(); + LOG(ERROR) << "Failed to submit SyncFileCache work to file sync worker"; + // Note: done_guard was released, so we need to manually call done + // if submission fails. Since we can't process, we'll let the RPC timeout + } +} + +void DataStoreService::ProcessSyncFileCache(SyncFileCacheLocalRequest *req) +{ + std::unique_ptr poolable_guard = std::make_unique(req); + + const auto *request = req->GetRequest(); + + // Get storage path from factory (even though DB is closed, path still exists) + if (data_store_factory_ == nullptr) + { + LOG(ERROR) << "DataStoreFactory is null, cannot process file cache sync"; + req->Finish(); + return; + } + + std::string storage_path = data_store_factory_->GetStoragePath(); + if (storage_path.empty()) + { + LOG(ERROR) << "Storage path is empty, cannot process file cache sync"; + req->Finish(); + return; + } + + // Construct full storage path with shard ID: {storage_path}/ds_{shard_id}/db/ + std::string db_path = storage_path + "/ds_" + std::to_string(shard_id_) + "/db/"; + + // Build file info map from request + std::map file_info_map; + for (const auto &file_info : request->files()) + { + file_info_map[file_info.file_name()] = file_info; + } + + // Step 1: Decide the list of files to keep based on cache size and file number + // Prioritize files with lower file numbers (older files are more likely to be needed) + uint64_t cache_size_limit = GetSstFileCacheSizeLimit(); + std::set files_to_keep = DetermineFilesToKeep( + file_info_map, cache_size_limit); + + // Step 2: List local directory and remove SST files that don't belong to keep list + uint32_t deleted_count = 0; + std::error_code ec; + std::filesystem::directory_iterator dir_ite(db_path, ec); + + if (ec.value() != 0) + { + LOG(ERROR) << "Failed to list local directory: " << ec.message(); + req->Finish(); + return; + } + + for (const auto &entry : dir_ite) + { + if (entry.is_regular_file()) + { + std::string filename = entry.path().filename().string(); + // Only process .sst- files (format: {file_number}.sst-{epoch}) + if (filename.find(".sst-") != std::string::npos) + { + if (files_to_keep.find(filename) == files_to_keep.end()) + { + std::string file_path = db_path + filename; + std::error_code del_ec; + if (std::filesystem::remove(file_path, del_ec)) + { + deleted_count++; + DLOG(INFO) << "Deleted file not in keep list: " << filename; + } + else + { + LOG(WARNING) << "Failed to delete file " << filename + << ": " << del_ec.message(); + } + } + } + } + } + + // Step 3: Download missing files from S3 that are in the keep list +#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) + std::unique_ptr downloader = CreateS3Downloader(); + if (downloader == nullptr) + { + LOG(ERROR) << "Failed to create S3 downloader, skipping downloads"; + req->Finish(); + return; + } + + uint32_t downloaded_count = 0; + for (const auto &filename : files_to_keep) + { + std::string file_path = db_path + filename; + + // Check if file already exists locally + if (std::filesystem::exists(file_path)) + { + continue; // Already have this file + } + + // Download from S3 + auto it = file_info_map.find(filename); + if (it == file_info_map.end()) + { + LOG(WARNING) << "File " << filename + << " in keep list but not in file info map"; + continue; + } + + if (downloader->DownloadFile(filename, file_path)) + { + downloaded_count++; + DLOG(INFO) << "Downloaded " << filename; + } + else + { + LOG(ERROR) << "Failed to download " << filename; + } + } +#else + // S3 download not available for non-RocksDB Cloud S3 backends + uint32_t downloaded_count = 0; + DLOG(INFO) << "S3 download not available for this data store type"; +#endif + + DLOG(INFO) << "File cache sync complete: received=" << request->files_size() + << ", keep_list_size=" << files_to_keep.size() + << ", deleted=" << deleted_count + << ", downloaded=" << downloaded_count; + + // Finish the RPC (response is Empty, so just call done) + req->Finish(); +} + void DataStoreService::FetchDSSClusterConfig( ::google::protobuf::RpcController *controller, const ::google::protobuf::Empty *request, @@ -2251,6 +2474,7 @@ bool DataStoreService::SwitchReadWriteToReadOnly(uint32_t shard_id) "ReadWrite or ReadOnly"; return false; } + // wait for all write requests to finish while (ongoing_write_requests_.load(std::memory_order_acquire) > 0) { @@ -2534,4 +2758,209 @@ void DataStoreService::CleanupOldMigrateLogs() } } +void DataStoreService::FileCacheSyncWorker(uint32_t interval_sec) +{ + while (true) + { + { + // Wait for interval or stop signal using condition variable + std::unique_lock lk(file_cache_sync_mutex_); + file_cache_sync_cv_.wait_for( + lk, + std::chrono::seconds(interval_sec), + [this] { return !file_cache_sync_running_; }); + + if (!file_cache_sync_running_) + { + break; + } + } + + // Only sync if we're the primary node + if (shard_status_.load(std::memory_order_acquire) != + DSShardStatus::ReadWrite) + { + continue; + } + + if (data_store_ == nullptr) + { + continue; + } + + // Collect file cache + std::vector<::EloqDS::remote::FileInfo> file_infos; + auto *cloud_store = + dynamic_cast(data_store_.get()); + if (cloud_store == nullptr) + { + continue; // Not a RocksDB Cloud store + } + + if (!cloud_store->CollectCachedSstFiles(file_infos)) + { + LOG(WARNING) << "Failed to collect file cache for sync"; + continue; + } + + // Get standby nodes from cluster manager + uint32_t shard_id = shard_id_; + const auto shard = cluster_manager_.GetShard(shard_id); + const auto &members = shard.nodes_; // Access nodes_ vector directly + + // Send to each standby node + for (const auto &member : members) + { + if (member == cluster_manager_.GetThisNode()) + { + continue; // Skip self + } + + // Get channel to standby node by node (not by shard id) + DSSNode standby_node(member.host_name_, member.port_); + auto channel = + cluster_manager_.GetDataStoreServiceChannel(standby_node); + if (channel == nullptr) + { + LOG(WARNING) << "Failed to get channel to standby node " + << member.host_name_ << ":" << member.port_; + continue; + } + + // Create RPC stub and send + ::EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + ::EloqDS::remote::SyncFileCacheRequest request; + google::protobuf::Empty response; + brpc::Controller cntl; + + request.set_shard_id(shard_id); + for (const auto &file_info : file_infos) + { + *request.add_files() = file_info; + } + + stub.SyncFileCache(&cntl, &request, &response, nullptr); + + if (cntl.Failed()) + { + LOG(WARNING) << "Failed to sync file cache to standby: " + << cntl.ErrorText(); + } + else + { + DLOG(INFO) << "Synced " << file_infos.size() + << " files to standby node " << member.host_name_; + } + } + } +} + +#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) +std::unique_ptr DataStoreService::CreateS3Downloader() const +{ + if (data_store_factory_ == nullptr) + { + return nullptr; + } + + // Get S3 configuration from factory + std::string bucket_name = data_store_factory_->GetS3BucketName(); + std::string object_path = data_store_factory_->GetS3ObjectPath(); + std::string region = data_store_factory_->GetS3Region(); + std::string endpoint_url = data_store_factory_->GetS3EndpointUrl(); + + if (bucket_name.empty()) + { + LOG(ERROR) << "S3 configuration incomplete, cannot create downloader"; + return nullptr; + } + + // Construct S3 URL: s3://bucket-name/object-path/ + std::string s3_url = "s3://" + bucket_name; + if (!object_path.empty()) + { + s3_url += "/" + object_path; + // Ensure URL ends with '/' if object_path doesn't end with it + if (object_path.back() != '/') + { + s3_url += "/"; + } + } + + // Get AWS credentials from factory + std::string aws_access_key_id = data_store_factory_->GetAwsAccessKeyId(); + std::string aws_secret_key = data_store_factory_->GetAwsSecretKey(); + + // Note: If credentials are empty, S3FileDownloader will use default credential provider + + return std::make_unique( + s3_url, region, aws_access_key_id, aws_secret_key, endpoint_url); +} +#endif + +uint64_t DataStoreService::GetSstFileCacheSizeLimit() const +{ + if (data_store_factory_ == nullptr) + { + // Return default if factory is not available + return 20ULL * 1024 * 1024 * 1024; // Default 20GB + } + + uint64_t cache_size = data_store_factory_->GetSstFileCacheSize(); + if (cache_size == 0) + { + // Return default if factory returns 0 (not applicable or not set) + return 20ULL * 1024 * 1024 * 1024; // Default 20GB + } + + return cache_size; +} + +std::set DataStoreService::DetermineFilesToKeep( + const std::map &file_info_map, + uint64_t cache_size_limit) const +{ + std::set files_to_keep; + + // Sort files by file number (ascending) - lower numbers are older, prioritize keeping these + std::vector> files_sorted; + for (const auto &[filename, file_info] : file_info_map) + { + files_sorted.push_back({file_info.file_number(), filename}); + } + + std::sort(files_sorted.begin(), files_sorted.end(), + [](const auto &a, const auto &b) { + return a.first < b.first; // Ascending: lower file numbers first + }); + + // Add files to keep list until we reach cache size limit + uint64_t current_size = 0; + for (const auto &[file_number, filename] : files_sorted) + { + auto it = file_info_map.find(filename); + if (it == file_info_map.end()) + { + continue; + } + + uint64_t file_size = it->second.file_size(); + + // If adding this file would exceed limit, stop (files with higher numbers won't be kept) + if (current_size + file_size > cache_size_limit) + { + break; + } + + files_to_keep.insert(filename); + current_size += file_size; + } + + DLOG(INFO) << "Determined " << files_to_keep.size() + << " files to keep (total size: " << current_size + << " bytes, limit: " << cache_size_limit << " bytes)"; + + return files_to_keep; +} + } // namespace EloqDS \ No newline at end of file diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index d308376..7dc28db 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -26,7 +26,9 @@ #include #include +#include #include +#include #include #include #include @@ -39,11 +41,16 @@ #include "data_store_service_config.h" #include "data_store_service_util.h" #include "ds_request.pb.h" +#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) +#include "s3_file_downloader.h" +#endif #include "thread_worker_pool.h" namespace EloqDS { +class SyncFileCacheLocalRequest; + enum class WriteOpType { DELETE = 0, @@ -449,6 +456,18 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService ::EloqDS::remote::CreateSnapshotForBackupResponse *response, ::google::protobuf::Closure *done) override; + /** + * @brief RPC handler for file cache synchronization (generic for any storage backend) + * @param controller RPC controller + * @param request File cache sync request + * @param response Empty response (google.protobuf.Empty) + * @param done Callback function + */ + void SyncFileCache(::google::protobuf::RpcController *controller, + const ::EloqDS::remote::SyncFileCacheRequest *request, + ::google::protobuf::Empty *response, + ::google::protobuf::Closure *done) override; + /** * @brief Create snapshot for backup operation * @param result Result (output) @@ -752,6 +771,52 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService // map{event_id->migrate_log} std::shared_mutex migrate_task_mux_; std::unordered_map migrate_task_map_; + + /** + * @brief Worker thread for periodic file cache sync to standby nodes + * @param interval_sec Sync interval in seconds + */ + void FileCacheSyncWorker(uint32_t interval_sec); + + /** + * @brief Process file cache sync request (called by file_sync_worker_) + * @param req Local request containing the sync request + */ + void ProcessSyncFileCache(SyncFileCacheLocalRequest *req); + +#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) + /** + * @brief Create S3 downloader instance + * @return Unique pointer to S3FileDownloader, or nullptr on failure + */ + std::unique_ptr CreateS3Downloader() const; +#endif + + /** + * @brief Get SST file cache size limit from config + * @return Cache size limit in bytes + */ + uint64_t GetSstFileCacheSizeLimit() const; + + /** + * @brief Determine which files to keep based on cache size limit and file number + * Files with lower file numbers are prioritized (older files are kept first) + * Files with higher file numbers are excluded if cache size limit is exceeded + * @param file_info_map Map of all available files from primary node + * @param cache_size_limit Maximum cache size in bytes + * @return Set of file names that should be kept on local disk + */ + std::set DetermineFilesToKeep( + const std::map &file_info_map, + uint64_t cache_size_limit) const; + + std::unique_ptr file_cache_sync_worker_; + std::unique_ptr file_sync_worker_; + + // File cache sync worker synchronization + std::mutex file_cache_sync_mutex_; + std::condition_variable file_cache_sync_cv_; + bool file_cache_sync_running_{false}; }; } // namespace EloqDS diff --git a/eloq_data_store_service/data_store_service_config.cpp b/eloq_data_store_service/data_store_service_config.cpp index d87e7ad..e5d641a 100644 --- a/eloq_data_store_service/data_store_service_config.cpp +++ b/eloq_data_store_service/data_store_service_config.cpp @@ -34,6 +34,7 @@ #include "INIReader.h" #include "glog/logging.h" + namespace EloqDS { @@ -799,6 +800,19 @@ void DataStoreServiceClusterManager::AppendThisNodeKey( ss << this_node_.host_name_ << ":" << this_node_.port_; } +void DataStoreServiceClusterManager::SetFileCacheSyncIntervalSec( + uint32_t interval_sec) +{ + std::unique_lock lk(mutex_); + file_cache_sync_interval_sec_ = interval_sec; +} + +uint32_t DataStoreServiceClusterManager::GetFileCacheSyncIntervalSec() const +{ + std::shared_lock lk(mutex_); + return file_cache_sync_interval_sec_; +} + uint32_t DataStoreServiceClusterManager::GetShardIdByPartitionId( const uint32_t partition_id) { diff --git a/eloq_data_store_service/data_store_service_config.h b/eloq_data_store_service/data_store_service_config.h index 9cb696b..4d8c365 100644 --- a/eloq_data_store_service/data_store_service_config.h +++ b/eloq_data_store_service/data_store_service_config.h @@ -329,7 +329,8 @@ class DataStoreServiceClusterManager sharding_algorithm_(config.sharding_algorithm_ ? CreateShardingAlgorithm( config.sharding_algorithm_->GetName()) - : nullptr) + : nullptr), + file_cache_sync_interval_sec_(config.file_cache_sync_interval_sec_) { } @@ -354,6 +355,7 @@ class DataStoreServiceClusterManager config.sharding_algorithm_ ? CreateShardingAlgorithm(config.sharding_algorithm_->GetName()) : nullptr; + file_cache_sync_interval_sec_ = config.file_cache_sync_interval_sec_; return *this; } @@ -452,6 +454,16 @@ class DataStoreServiceClusterManager void SetThisNode(const std::string &ip, uint16_t port); + /** + * @brief Set the file cache sync interval in seconds. + */ + void SetFileCacheSyncIntervalSec(uint32_t interval_sec); + + /** + * @brief Get the file cache sync interval in seconds. + */ + uint32_t GetFileCacheSyncIntervalSec() const; + /** * @brief Append the key of this node to the specified string stream. */ @@ -510,5 +522,8 @@ class DataStoreServiceClusterManager // channel to other DSS service std::map> node_channel_map_; + + // file cache sync interval in seconds + uint32_t file_cache_sync_interval_sec_{30}; }; } // namespace EloqDS diff --git a/eloq_data_store_service/ds_request.proto b/eloq_data_store_service/ds_request.proto index c9b8604..1ea3875 100644 --- a/eloq_data_store_service/ds_request.proto +++ b/eloq_data_store_service/ds_request.proto @@ -39,6 +39,9 @@ service DataStoreRpcService { //-----Snapshot and branch rpc CreateSnapshotForBackup(CreateSnapshotForBackupRequest) returns (CreateSnapshotForBackupResponse); + //-----File cache sync for standby warm-up (generic protocol for any storage backend) + rpc SyncFileCache(SyncFileCacheRequest) returns (google.protobuf.Empty); + }; enum DataStoreError { @@ -350,3 +353,15 @@ message FaultInjectRequest { message FaultInjectResponse { bool finished = 1; } + +//-----File cache sync for standby warm-up +message FileInfo { + string file_name = 1; // File name (e.g., "000123.sst") + uint64 file_size = 2; // File size in bytes + uint64 file_number = 3; // File number extracted from filename for eviction priority +} + +message SyncFileCacheRequest { + uint32 shard_id = 1; + repeated FileInfo files = 2; +} diff --git a/eloq_data_store_service/eloq_store_data_store_factory.h b/eloq_data_store_service/eloq_store_data_store_factory.h index 15b78dd..9568f65 100644 --- a/eloq_data_store_service/eloq_store_data_store_factory.h +++ b/eloq_data_store_service/eloq_store_data_store_factory.h @@ -56,6 +56,51 @@ class EloqStoreDataStoreFactory : public DataStoreFactory return ds; } + std::string GetStoragePath() const override + { + // EloqStore uses multiple paths, return first one or empty + if (eloq_store_configs_.eloqstore_configs_.store_path.empty()) + { + return ""; + } + return eloq_store_configs_.eloqstore_configs_.store_path[0]; + } + + std::string GetS3BucketName() const override + { + return ""; // Not applicable for EloqStore + } + + std::string GetS3ObjectPath() const override + { + return ""; // Not applicable for EloqStore + } + + std::string GetS3Region() const override + { + return ""; // Not applicable for EloqStore + } + + std::string GetS3EndpointUrl() const override + { + return ""; // Not applicable for EloqStore + } + + std::string GetAwsAccessKeyId() const override + { + return ""; // Not applicable for EloqStore + } + + std::string GetAwsSecretKey() const override + { + return ""; // Not applicable for EloqStore + } + + uint64_t GetSstFileCacheSize() const override + { + return 0; // Not applicable for EloqStore + } + private: const EloqStoreConfig eloq_store_configs_; diff --git a/eloq_data_store_service/internal_request.h b/eloq_data_store_service/internal_request.h index c000bd7..564aa96 100644 --- a/eloq_data_store_service/internal_request.h +++ b/eloq_data_store_service/internal_request.h @@ -1588,4 +1588,40 @@ class CreateSnapshotForBackupLocalRequest google::protobuf::Closure *done_{nullptr}; }; +class SyncFileCacheLocalRequest : public Poolable +{ +public: + SyncFileCacheLocalRequest() = default; + SyncFileCacheLocalRequest(const SyncFileCacheLocalRequest &other) = delete; + SyncFileCacheLocalRequest &operator=(const SyncFileCacheLocalRequest &other) = delete; + + void Clear() override + { + request_ = nullptr; + done_ = nullptr; + } + + void SetRequest(const ::EloqDS::remote::SyncFileCacheRequest *request, + google::protobuf::Closure *done) + { + request_ = request; + done_ = done; + } + + const ::EloqDS::remote::SyncFileCacheRequest *GetRequest() const + { + return request_; + } + + void Finish() + { + brpc::ClosureGuard done_guard(done_); + // Response is Empty, nothing to set + } + +private: + const ::EloqDS::remote::SyncFileCacheRequest *request_{nullptr}; + google::protobuf::Closure *done_{nullptr}; +}; + } // namespace EloqDS diff --git a/eloq_data_store_service/rocksdb_cloud_data_store.cpp b/eloq_data_store_service/rocksdb_cloud_data_store.cpp index 9f79be4..49ffc1e 100644 --- a/eloq_data_store_service/rocksdb_cloud_data_store.cpp +++ b/eloq_data_store_service/rocksdb_cloud_data_store.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -832,6 +833,101 @@ rocksdb::DBCloud *RocksDBCloudDataStore::GetDBPtr() return db_; } +bool RocksDBCloudDataStore::CollectCachedSstFiles( + std::vector<::EloqDS::remote::FileInfo> &file_infos) +{ + std::shared_lock db_lk(db_mux_); + + if (db_ == nullptr) + { + LOG(ERROR) << "DB not open, cannot collect file cache"; + return false; + } + + // Get live file metadata from RocksDB + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + + // Get list of files in local directory + std::set local_files; + std::error_code ec; + std::filesystem::directory_iterator dir_ite(db_path_, ec); + if (ec.value() != 0) + { + LOG(ERROR) << "Failed to list local directory: " << ec.message(); + return false; + } + + for (const auto &entry : dir_ite) + { + if (entry.is_regular_file()) + { + std::string filename = entry.path().filename().string(); + // Only include .sst- files (format: {file_number}.sst-{epoch}) + if (filename.find(".sst-") != std::string::npos) + { + local_files.insert(filename); + } + } + } + + // Build intersection: files that are both in metadata and local directory + file_infos.clear(); + for (const auto &meta : metadata) + { + std::string filename = std::filesystem::path(meta.name).filename().string(); + rocksdb::CloudFileSystem *cfs = + dynamic_cast(cloud_fs_.get()); + std::string remapped_filename = cfs->RemapFilename(filename); + // Only include files that exist locally + if (local_files.find(remapped_filename) != local_files.end()) + { + ::EloqDS::remote::FileInfo file_info; + file_info.set_file_name(remapped_filename); + file_info.set_file_size(meta.size); + file_info.set_file_number(ExtractFileNumber(remapped_filename)); + + file_infos.push_back(file_info); + } + } + + DLOG(INFO) << "Collected " << file_infos.size() + << " cached SST files for shard " << shard_id_; + return true; +} + +uint64_t RocksDBCloudDataStore::ExtractFileNumber(const std::string &file_name) +{ + // SST file names are in format: {file_number}.sst-{epoch} + // Example: "000011.sst-ef6b2d92d3687a84" + // Extract numeric part before ".sst-" + size_t sst_pos = file_name.find(".sst-"); + if (sst_pos == std::string::npos) + { + LOG(ERROR) << "Failed to extract file number from " << file_name; + return 0; + } + + std::string base = file_name.substr(0, sst_pos); + // Remove leading zeros and convert to number + size_t first_non_zero = base.find_first_not_of('0'); + if (first_non_zero == std::string::npos) + { + LOG(ERROR) << "All zeros in file number from " << file_name; + return 0; // All zeros + } + + try + { + return std::stoull(base.substr(first_non_zero)); + } + catch (const std::exception &e) + { + LOG(ERROR) << "Failed to extract file number from " << file_name; + return 0; + } +} + inline std::string RocksDBCloudDataStore::MakeCloudManifestCookie( const std::string &branch_name, int64_t dss_shard_id, int64_t term) { diff --git a/eloq_data_store_service/rocksdb_cloud_data_store.h b/eloq_data_store_service/rocksdb_cloud_data_store.h index 1533b43..b677648 100644 --- a/eloq_data_store_service/rocksdb_cloud_data_store.h +++ b/eloq_data_store_service/rocksdb_cloud_data_store.h @@ -75,6 +75,13 @@ class RocksDBCloudDataStore : public RocksDBDataStoreCommon */ void Shutdown() override; + /** + * @brief Collect list of cached SST files (intersection of live files and local directory) + * @param file_infos Output vector of file information + * @return true if collection succeeded, false otherwise + */ + bool CollectCachedSstFiles(std::vector<::EloqDS::remote::FileInfo> &file_infos); + protected: /** * @brief Get the RocksDB pointer. @@ -145,6 +152,13 @@ class RocksDBCloudDataStore : public RocksDBDataStoreCommon * from the number without any loss in the string representation. */ bool String2ll(const char *s, size_t slen, int64_t &value); + /** + * @brief Extract file number from SST file name (e.g., "000011.sst-ef6b2d92d3687a84" -> 11) + * @param file_name SST file name in format {file_number}.sst-{epoch} + * @return File number, or 0 if extraction fails + */ + uint64_t ExtractFileNumber(const std::string &file_name); + /** * @brief Open the cloud database. * @param cfs_options The cloud file system options. diff --git a/eloq_data_store_service/rocksdb_cloud_data_store_factory.h b/eloq_data_store_service/rocksdb_cloud_data_store_factory.h index 793eb90..c545973 100644 --- a/eloq_data_store_service/rocksdb_cloud_data_store_factory.h +++ b/eloq_data_store_service/rocksdb_cloud_data_store_factory.h @@ -74,6 +74,46 @@ class RocksDBCloudDataStoreFactory : public DataStoreFactory return ds; } + std::string GetStoragePath() const override + { + return config_.storage_path_; + } + + std::string GetS3BucketName() const override + { + return cloud_config_.bucket_prefix_ + cloud_config_.bucket_name_; + } + + std::string GetS3ObjectPath() const override + { + return cloud_config_.object_path_; + } + + std::string GetS3Region() const override + { + return cloud_config_.region_; + } + + std::string GetS3EndpointUrl() const override + { + return cloud_config_.s3_endpoint_url_; + } + + std::string GetAwsAccessKeyId() const override + { + return cloud_config_.aws_access_key_id_; + } + + std::string GetAwsSecretKey() const override + { + return cloud_config_.aws_secret_key_; + } + + uint64_t GetSstFileCacheSize() const override + { + return cloud_config_.sst_file_cache_size_; + } + private: ::EloqDS::RocksDBConfig config_; ::EloqDS::RocksDBCloudConfig cloud_config_; diff --git a/eloq_data_store_service/rocksdb_data_store_factory.h b/eloq_data_store_service/rocksdb_data_store_factory.h index 417ae9e..7f907f9 100644 --- a/eloq_data_store_service/rocksdb_data_store_factory.h +++ b/eloq_data_store_service/rocksdb_data_store_factory.h @@ -71,6 +71,46 @@ class RocksDBDataStoreFactory : public DataStoreFactory return ds; } + std::string GetStoragePath() const override + { + return config_.storage_path_; + } + + std::string GetS3BucketName() const override + { + return ""; // Not applicable for non-cloud RocksDB + } + + std::string GetS3ObjectPath() const override + { + return ""; // Not applicable for non-cloud RocksDB + } + + std::string GetS3Region() const override + { + return ""; // Not applicable for non-cloud RocksDB + } + + std::string GetS3EndpointUrl() const override + { + return ""; // Not applicable for non-cloud RocksDB + } + + std::string GetAwsAccessKeyId() const override + { + return ""; // Not applicable for non-cloud RocksDB + } + + std::string GetAwsSecretKey() const override + { + return ""; // Not applicable for non-cloud RocksDB + } + + uint64_t GetSstFileCacheSize() const override + { + return 0; // Not applicable for non-cloud RocksDB + } + private: ::EloqDS::RocksDBConfig config_; bool tx_enable_cache_replacement_; diff --git a/eloq_data_store_service/s3_file_downloader.cpp b/eloq_data_store_service/s3_file_downloader.cpp new file mode 100644 index 0000000..d0daf63 --- /dev/null +++ b/eloq_data_store_service/s3_file_downloader.cpp @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2025 EloqData Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under either of the following two licenses: + * 1. GNU Affero General Public License, version 3, as published by the Free + * Software Foundation. + * 2. GNU General Public License as published by the Free Software + * Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License or GNU General Public License for more + * details. + * + * You should have received a copy of the GNU Affero General Public License + * and GNU General Public License V2 along with this program. If not, see + * . + * + */ +#include "s3_file_downloader.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace EloqDS +{ + +S3FileDownloader::S3FileDownloader(const std::string &s3_url, + const std::string ®ion, + const std::string &aws_access_key_id, + const std::string &aws_secret_key, + const std::string &s3_endpoint_url) +{ + // Parse S3 URL: s3://bucket-name/object-path-prefix + // Example: s3://my-bucket/my-prefix/ -> bucket_name_ = "my-bucket", object_path_prefix_ = "my-prefix/" + if (s3_url.find("s3://") != 0) + { + LOG(ERROR) << "Invalid S3 URL format, must start with s3://: " << s3_url; + return; + } + + std::string url_without_scheme = s3_url.substr(5); // Remove "s3://" + size_t slash_pos = url_without_scheme.find('/'); + + if (slash_pos == std::string::npos) + { + // No object path prefix, just bucket name + bucket_name_ = url_without_scheme; + object_path_prefix_ = ""; + } + else + { + bucket_name_ = url_without_scheme.substr(0, slash_pos); + object_path_prefix_ = url_without_scheme.substr(slash_pos + 1); + // Ensure object_path_prefix_ ends with '/' if not empty + if (!object_path_prefix_.empty() && object_path_prefix_.back() != '/') + { + object_path_prefix_ += "/"; + } + } + + if (bucket_name_.empty()) + { + LOG(ERROR) << "Invalid S3 URL: bucket name is empty"; + return; + } + + Aws::Client::ClientConfiguration config; + config.region = region; + + if (!s3_endpoint_url.empty()) + { + config.endpointOverride = s3_endpoint_url; + // Determine scheme from endpoint + std::string lower_endpoint = s3_endpoint_url; + std::transform(lower_endpoint.begin(), lower_endpoint.end(), + lower_endpoint.begin(), ::tolower); + if (lower_endpoint.find("https://") == 0) + { + config.scheme = Aws::Http::Scheme::HTTPS; + } + else + { + config.scheme = Aws::Http::Scheme::HTTP; + } + config.verifySSL = false; + } + + std::shared_ptr credentials_provider; + if (!aws_access_key_id.empty() && !aws_secret_key.empty()) + { + credentials_provider = + std::make_shared( + Aws::String(aws_access_key_id.c_str()), + Aws::String(aws_secret_key.c_str())); + } + else + { + credentials_provider = + std::make_shared(); + } + + s3_client_ = std::make_shared( + credentials_provider, + config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + true /* useVirtualAddressing */); +} + +S3FileDownloader::~S3FileDownloader() = default; + +bool S3FileDownloader::DownloadFile(const std::string &s3_file_name, + const std::string &local_file_path) +{ + // Construct S3 object key: {object_path_prefix}{file_name} + std::string s3_key = object_path_prefix_ + s3_file_name; + + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(bucket_name_); + request.SetKey(s3_key); + + auto outcome = s3_client_->GetObject(request); + + if (!outcome.IsSuccess()) + { + LOG(ERROR) << "Failed to download " << s3_file_name + << " from S3: " << outcome.GetError().GetMessage(); + return false; + } + + // Write to local file + std::ofstream out_file(local_file_path, std::ios::binary); + if (!out_file.is_open()) + { + LOG(ERROR) << "Failed to open local file for writing: " << local_file_path; + return false; + } + + auto &body = outcome.GetResult().GetBody(); + out_file << body.rdbuf(); + out_file.close(); + + if (!out_file.good()) + { + LOG(ERROR) << "Failed to write file: " << local_file_path; + return false; + } + + DLOG(INFO) << "Downloaded " << s3_file_name << " to " << local_file_path; + return true; +} + +} // namespace EloqDS + diff --git a/eloq_data_store_service/s3_file_downloader.h b/eloq_data_store_service/s3_file_downloader.h new file mode 100644 index 0000000..014275e --- /dev/null +++ b/eloq_data_store_service/s3_file_downloader.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2025 EloqData Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under either of the following two licenses: + * 1. GNU Affero General Public License, version 3, as published by the Free + * Software Foundation. + * 2. GNU General Public License as published by the Free Software + * Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License or GNU General Public License for more + * details. + * + * You should have received a copy of the GNU Affero General Public License + * and GNU General Public License V2 along with this program. If not, see + * . + * + */ +#pragma once + +#include +#include +#include +#include + +namespace EloqDS +{ + +class S3FileDownloader +{ +public: + /** + * @brief Construct S3FileDownloader from S3 URL + * @param s3_url S3 URL in format: s3://bucket-name/object-path-prefix + * Example: s3://my-bucket/my-prefix/ + * @param region AWS region + * @param aws_access_key_id AWS access key ID (empty to use default provider) + * @param aws_secret_key AWS secret key (empty to use default provider) + * @param s3_endpoint_url Custom S3 endpoint URL (empty for default AWS endpoint) + */ + S3FileDownloader(const std::string &s3_url, + const std::string ®ion, + const std::string &aws_access_key_id, + const std::string &aws_secret_key, + const std::string &s3_endpoint_url = ""); + + ~S3FileDownloader(); + + /** + * @brief Download a file from S3 to local path + * @param s3_file_name File name in S3 (e.g., "000123.sst") + * @param local_file_path Full local path to save the file + * @return true if download succeeded, false otherwise + */ + bool DownloadFile(const std::string &s3_file_name, + const std::string &local_file_path); + +private: + std::shared_ptr s3_client_; + std::string bucket_name_; + std::string object_path_prefix_; +}; + +} // namespace EloqDS + From 61ff2ff56926dff0def5ae2471591255831a52da Mon Sep 17 00:00:00 2001 From: liunyl Date: Thu, 20 Nov 2025 03:10:43 +0000 Subject: [PATCH 2/2] download meta data file to pass rocksdbcloud reinit dir check --- eloq_data_store_service/CMakeLists.txt | 12 ++-- .../data_store_service.cpp | 55 ++++++++++++++++++- eloq_data_store_service/data_store_service.h | 18 ++++-- .../rocksdb_cloud_data_store.cpp | 22 ++++++-- .../s3_file_downloader.cpp | 3 +- 5 files changed, 87 insertions(+), 23 deletions(-) diff --git a/eloq_data_store_service/CMakeLists.txt b/eloq_data_store_service/CMakeLists.txt index 0d3b2d2..99a0253 100644 --- a/eloq_data_store_service/CMakeLists.txt +++ b/eloq_data_store_service/CMakeLists.txt @@ -386,13 +386,6 @@ if ((WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB_CLOUD_S3") OR purger_event_listener.cpp purger_sliding_window.cpp ) -endif() - -if (WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB_CLOUD_S3") - SET(RESELOQ_SOURCES ${RESELOQ_SOURCES} - s3_file_downloader.cpp - ) -endif() elseif (WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB") SET(RESELOQ_SOURCES ${RESELOQ_SOURCES} rocksdb_data_store_common.cpp @@ -406,6 +399,11 @@ elseif (WITH_DATA_STORE STREQUAL "ELOQDSS_ELOQSTORE") ) endif () +if (WITH_DATA_STORE STREQUAL "ELOQDSS_ROCKSDB_CLOUD_S3") + SET(RESELOQ_SOURCES ${RESELOQ_SOURCES} + s3_file_downloader.cpp + ) +endif() include_directories( diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index 9d7baf9..d97d4b9 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -379,6 +379,12 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, return expect_status == open_mode; } + // Make sure file sync is not running + while (is_file_sync_running_.load(std::memory_order_relaxed)) + { + bthread_usleep(10000); + } + DLOG(INFO) << "Connecting and starting data store for shard id:" << data_shard_id << ", open_mode:" << static_cast(open_mode) @@ -1328,7 +1334,7 @@ void DataStoreService::SyncFileCache( LOG(ERROR) << "FileSyncWorker not initialized, cannot process SyncFileCache"; req->Free(); // Note: done_guard was released, so we need to manually call done - // Since we can't process, we'll let the RPC timeout (not ideal but acceptable) + brpc::ClosureGuard done_guard(done); return; } @@ -1341,7 +1347,7 @@ void DataStoreService::SyncFileCache( req->Free(); LOG(ERROR) << "Failed to submit SyncFileCache work to file sync worker"; // Note: done_guard was released, so we need to manually call done - // if submission fails. Since we can't process, we'll let the RPC timeout + brpc::ClosureGuard done_guard(done); } } @@ -1350,6 +1356,13 @@ void DataStoreService::ProcessSyncFileCache(SyncFileCacheLocalRequest *req) std::unique_ptr poolable_guard = std::make_unique(req); const auto *request = req->GetRequest(); + + if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed) + { + LOG(WARNING) << "Shard status is not closed, skipping file sync"; + req->Finish(); + return; + } // Get storage path from factory (even though DB is closed, path still exists) if (data_store_factory_ == nullptr) @@ -1369,6 +1382,12 @@ void DataStoreService::ProcessSyncFileCache(SyncFileCacheLocalRequest *req) // Construct full storage path with shard ID: {storage_path}/ds_{shard_id}/db/ std::string db_path = storage_path + "/ds_" + std::to_string(shard_id_) + "/db/"; + + // Create local db_path if not exists + if (!std::filesystem::exists(db_path)) + { + std::filesystem::create_directories(db_path); + } // Build file info map from request std::map file_info_map; @@ -1394,9 +1413,17 @@ void DataStoreService::ProcessSyncFileCache(SyncFileCacheLocalRequest *req) req->Finish(); return; } - + + is_file_sync_running_.store(true, std::memory_order_release); for (const auto &entry : dir_ite) { + if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed) + { + LOG(WARNING) << "Shard status is not closed, skipping file sync"; + is_file_sync_running_.store(false, std::memory_order_release); + req->Finish(); + return; + } if (entry.is_regular_file()) { std::string filename = entry.path().filename().string(); @@ -1428,13 +1455,34 @@ void DataStoreService::ProcessSyncFileCache(SyncFileCacheLocalRequest *req) if (downloader == nullptr) { LOG(ERROR) << "Failed to create S3 downloader, skipping downloads"; + is_file_sync_running_.store(false, std::memory_order_release); req->Finish(); return; } uint32_t downloaded_count = 0; + if (!std::filesystem::exists(db_path + "IDENTITY")) + { + downloader->DownloadFile("IDENTITY", db_path + "IDENTITY"); + } + if (!std::filesystem::exists(db_path + "CURRENT")) + { + // Create dummy local CURRENT file. This file needs to be in local db dir so that + // rocksdb cloud will not clean up the db dir when opening db. Rocksdbcloud + // ignores content of CURRENT file so we can set any content we want. + std::ofstream current_file(db_path + "CURRENT"); + current_file << "MANIFEST-000001\n"; + current_file.close(); + } + + for (const auto &filename : files_to_keep) { + if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed) + { + LOG(WARNING) << "Shard status is not closed, skipping file sync"; + break; + } std::string file_path = db_path + filename; // Check if file already exists locally @@ -1475,6 +1523,7 @@ void DataStoreService::ProcessSyncFileCache(SyncFileCacheLocalRequest *req) // Finish the RPC (response is Empty, so just call done) req->Finish(); + is_file_sync_running_.store(false, std::memory_order_release); } void DataStoreService::FetchDSSClusterConfig( diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index 7dc28db..6e51c2f 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -457,7 +458,8 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService ::google::protobuf::Closure *done) override; /** - * @brief RPC handler for file cache synchronization (generic for any storage backend) + * @brief RPC handler for file cache synchronization (generic for any + * storage backend) * @param controller RPC controller * @param request File cache sync request * @param response Empty response (google.protobuf.Empty) @@ -699,6 +701,9 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService std::unique_ptr data_store_{nullptr}; std::atomic shard_status_{DSShardStatus::Closed}; std::atomic ongoing_write_requests_{0}; + // Whether the file cache sync is running. Used to avoid concurrent local ssd file operations + // between db and file sync worker. + std::atomic is_file_sync_running_{false}; // scan iterator cache TTLWrapperCache scan_iter_cache_; @@ -791,17 +796,18 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService */ std::unique_ptr CreateS3Downloader() const; #endif - + /** * @brief Get SST file cache size limit from config * @return Cache size limit in bytes */ uint64_t GetSstFileCacheSizeLimit() const; - + /** - * @brief Determine which files to keep based on cache size limit and file number - * Files with lower file numbers are prioritized (older files are kept first) - * Files with higher file numbers are excluded if cache size limit is exceeded + * @brief Determine which files to keep based on cache size limit and file + * number Files with lower file numbers are prioritized (older files are + * kept first) Files with higher file numbers are excluded if cache size + * limit is exceeded * @param file_info_map Map of all available files from primary node * @param cache_size_limit Maximum cache size in bytes * @return Set of file names that should be kept on local disk diff --git a/eloq_data_store_service/rocksdb_cloud_data_store.cpp b/eloq_data_store_service/rocksdb_cloud_data_store.cpp index 49ffc1e..7f92735 100644 --- a/eloq_data_store_service/rocksdb_cloud_data_store.cpp +++ b/eloq_data_store_service/rocksdb_cloud_data_store.cpp @@ -19,6 +19,8 @@ * . * */ +#include "rocksdb_cloud_data_store.h" + #include #include #include @@ -48,7 +50,6 @@ #include "purger_event_listener.h" #include "rocksdb/cloud/cloud_file_system_impl.h" #include "rocksdb/cloud/cloud_storage_provider.h" -#include "rocksdb_cloud_data_store.h" #define LONG_STR_SIZE 21 @@ -452,8 +453,8 @@ bool RocksDBCloudDataStore::OpenCloudDB( // boost write performance by enabling unordered write options.unordered_write = true; // skip Consistency check, which compares the actual file size with the size - // recorded in the metadata, which can fail when skip_cloud_files_in_getchildren is - // set to true + // recorded in the metadata, which can fail when + // skip_cloud_files_in_getchildren is set to true options.paranoid_checks = false; // print db statistics every 60 seconds @@ -873,11 +874,20 @@ bool RocksDBCloudDataStore::CollectCachedSstFiles( // Build intersection: files that are both in metadata and local directory file_infos.clear(); + rocksdb::CloudFileSystem *cfs = + dynamic_cast(cloud_fs_.get()); + + // Add cloud manifest file to file_infos. Set its file number to 0 so + // that it is always kept in the keep list. + ::EloqDS::remote::FileInfo file_info; + file_info.set_file_name("CLOUDMANIFEST-" + cfs_options_.new_cookie_on_open); + file_info.set_file_size(0); + file_info.set_file_number(0); + file_infos.push_back(file_info); for (const auto &meta : metadata) { - std::string filename = std::filesystem::path(meta.name).filename().string(); - rocksdb::CloudFileSystem *cfs = - dynamic_cast(cloud_fs_.get()); + std::string filename = + std::filesystem::path(meta.name).filename().string(); std::string remapped_filename = cfs->RemapFilename(filename); // Only include files that exist locally if (local_files.find(remapped_filename) != local_files.end()) diff --git a/eloq_data_store_service/s3_file_downloader.cpp b/eloq_data_store_service/s3_file_downloader.cpp index d0daf63..bb83e7c 100644 --- a/eloq_data_store_service/s3_file_downloader.cpp +++ b/eloq_data_store_service/s3_file_downloader.cpp @@ -86,12 +86,13 @@ S3FileDownloader::S3FileDownloader(const std::string &s3_url, if (lower_endpoint.find("https://") == 0) { config.scheme = Aws::Http::Scheme::HTTPS; + config.verifySSL = true; // keep TLS verification for HTTPS } else { config.scheme = Aws::Http::Scheme::HTTP; + config.verifySSL = false; } - config.verifySSL = false; } std::shared_ptr credentials_provider;