diff --git a/include/async_io_manager.h b/include/async_io_manager.h index 4eb1aee2..2817b991 100644 --- a/include/async_io_manager.h +++ b/include/async_io_manager.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -481,6 +483,7 @@ class IouringMgr : public AsyncIoManager return 0; // IouringMgr doesn't use local file caching } + KvError TryCleanupLocalPartitionDir(const TableIdent &tbl_id); void CleanManifest(const TableIdent &tbl_id) override; static constexpr uint64_t oflags_dir = O_DIRECTORY | O_RDONLY; @@ -528,6 +531,8 @@ class IouringMgr : public AsyncIoManager * @brief mu_ avoids open/close file concurrently. */ Mutex mu_; + bool opening_{false}; + WaitingZone open_waiting_; int fd_{FdEmpty}; int reg_idx_{-1}; bool dirty_{false}; @@ -899,6 +904,8 @@ class CloudStoreMgr final : public IouringMgr const TableIdent &tbl_id) override; std::pair RefreshManifest( const TableIdent &tbl_id, std::string_view archive_tag); + void RequestGcLocalCleanup(const TableIdent &tbl_id, + const std::vector &filenames); KvError DownloadFile(const TableIdent &tbl_id, FileId file_id, uint64_t term, @@ -1012,6 +1019,8 @@ class CloudStoreMgr final : public IouringMgr * @brief Locally cached files that are not currently opened. 
*/ std::unordered_map closed_files_; + std::unordered_set pending_gc_cleanup_; + std::deque pending_gc_cleanup_queue_; CachedFile lru_file_head_; CachedFile lru_file_tail_; size_t used_local_space_{0}; diff --git a/include/file_gc.h b/include/file_gc.h index 5a01450d..ed250e00 100644 --- a/include/file_gc.h +++ b/include/file_gc.h @@ -80,7 +80,8 @@ KvError DeleteUnreferencedCloudFiles( const std::vector &manifest_terms, const RetainedFiles &retained_files, FileId least_not_archived_file_id, - CloudStoreMgr *cloud_mgr); + CloudStoreMgr *cloud_mgr, + std::vector &deleted_filenames); } // namespace FileGarbageCollector } // namespace eloqstore diff --git a/include/storage/index_page_manager.h b/include/storage/index_page_manager.h index c74eb3ab..28d6398d 100644 --- a/include/storage/index_page_manager.h +++ b/include/storage/index_page_manager.h @@ -55,6 +55,7 @@ class IndexPageManager KvError MakeCowRoot(const TableIdent &tbl_ident, CowRootMeta &cow_meta); void UpdateRoot(const TableIdent &tbl_ident, CowRootMeta new_meta); + void MarkManifestMissing(const TableIdent &tbl_ident); std::pair FindPage(MappingSnapshot *mapping, PageId page_id); diff --git a/src/async_io_manager.cpp b/src/async_io_manager.cpp index 78c7db5f..4fc3dc42 100644 --- a/src/async_io_manager.cpp +++ b/src/async_io_manager.cpp @@ -84,6 +84,29 @@ WriteTask *CurrentWriteTask() return nullptr; } } + +bool ParseCloudCleanupFilename(std::string_view filename, + FileId &file_id, + uint64_t &term) +{ + auto [type, suffix] = ParseFileName(filename); + if (type == FileNameData) + { + return ParseDataFileSuffix(suffix, file_id, term); + } + if (type == FileNameManifest) + { + std::optional tag; + if (!ParseManifestFileSuffix(suffix, term, tag) || + tag.has_value()) + { + return false; + } + file_id = IouringMgr::LruFD::kManifest; + return true; + } + return false; +} } // namespace char *VarPagePtr(const VarPage &page) @@ -899,6 +922,87 @@ size_t IouringMgr::GetOpenFileLimit() const return fd_limit_; } 
+KvError IouringMgr::TryCleanupLocalPartitionDir(const TableIdent &tbl_id) +{ + const std::string partition_name = tbl_id.ToString(); + + LruFD::Ref dir_fd = GetOpenedFD(tbl_id, LruFD::kDirectory); + LruFD::Ref manifest_fd = GetOpenedFD(tbl_id, LruFD::kManifest); + const bool directory_active = + dir_fd != nullptr && dir_fd.Get()->ref_count_ > 1; + const bool manifest_active = + manifest_fd != nullptr && manifest_fd.Get()->ref_count_ > 1; + + auto close_idle_fd = + [&](LruFD::Ref &fd_ref, std::string_view fd_name) -> KvError + { + if (fd_ref == nullptr || fd_ref.Get()->ref_count_ != 1) + { + return KvError::NoError; + } + + KvError close_err = CloseFile(std::move(fd_ref)); + if (close_err != KvError::NoError) + { + LOG(WARNING) << "Failed to close cached-idle " << fd_name + << " handle for table " << tbl_id << ": " + << ErrorString(close_err); + } + return close_err; + }; + + KvError close_err = close_idle_fd(dir_fd, "directory"); + if (close_err != KvError::NoError) + { + return close_err; + } + close_err = close_idle_fd(manifest_fd, "manifest"); + if (close_err != KvError::NoError) + { + return close_err; + } + + if (directory_active || manifest_active) + { + DLOG(INFO) << "Skip cleaning partition directory " << partition_name + << " because " + << (directory_active ? 
"directory" : "manifest") + << " handle is still active"; + return KvError::NoError; + } + + FdIdx root_fd = GetRootFD(tbl_id); + struct statx stx = {}; + int stat_res = StatxAt(root_fd, partition_name.c_str(), &stx); + if (stat_res < 0) + { + if (stat_res == -ENOENT) + { + return KvError::NoError; + } + LOG(WARNING) << "Failed to stat partition directory " << partition_name + << " for table " << tbl_id << ": " + << strerror(-stat_res); + return ToKvError(stat_res); + } + if ((stx.stx_mode & S_IFMT) != S_IFDIR) + { + LOG(WARNING) << "Skip cleaning non-directory partition path " + << partition_name << " for table " << tbl_id; + return KvError::IoFail; + } + + int dir_res = UnlinkAt(root_fd, partition_name.c_str(), true); + if (dir_res == 0 || dir_res == -ENOENT || dir_res == -ENOTEMPTY) + { + return KvError::NoError; + } + + LOG(WARNING) << "Failed to delete partition directory " << partition_name + << " for table " << tbl_id << ": " << strerror(-dir_res); + return ToKvError(dir_res); +} + void IouringMgr::CleanManifest(const TableIdent &tbl_id) { if (HasOtherFile(tbl_id)) @@ -941,30 +1045,11 @@ void IouringMgr::CleanManifest(const TableIdent &tbl_id) << " during cleanup: " << ErrorString(dir_err); } - FdIdx root_fd = GetRootFD(tbl_id); - std::string dir_name = tbl_id.ToString(); - int dir_res = UnlinkAt(root_fd, dir_name.c_str(), true); - if (dir_res < 0) - { - if (dir_res == -ENOENT) - { - DLOG(INFO) << "Directory " << dir_name - << " already removed while cleaning manifest"; - } - else if (dir_res == -ENOTEMPTY) - { - DLOG(INFO) << "Directory " << dir_name - << " is not empty, skip removing"; - } - else - { - LOG(WARNING) << "Failed to delete directory " << dir_name << " : " - << strerror(-dir_res); - } - } - else + KvError cleanup_err = TryCleanupLocalPartitionDir(tbl_id); + if (cleanup_err != KvError::NoError) { - DLOG(INFO) << "Successfully deleted directory " << dir_name; + LOG(WARNING) << "Failed to clean partition directory for table " + << tbl_id << ": " 
<< ErrorString(cleanup_err); } } @@ -1069,51 +1154,66 @@ std::pair IouringMgr::OpenOrCreateFD( PartitionFiles *tbl = &it_tbl->second; auto [it_fd, _] = tbl->fds_.try_emplace(file_id, tbl, file_id, term); LruFD::Ref lru_fd(&it_fd->second, this); + LruFD *fd_state = lru_fd.Get(); - // Avoid multiple coroutines from concurrently opening or closing the same - // file duplicately. - lru_fd.Get()->mu_.Lock(); - if (file_id == LruFD::kDirectory) + // Serialize potentially blocking open/create work without holding mu_ so + // cache eviction and cleanup can still inspect the FD state. + while (true) { - if (lru_fd.Get()->fd_ != LruFD::FdEmpty) + fd_state->mu_.Lock(); + if (file_id == LruFD::kDirectory) { - lru_fd.Get()->mu_.Unlock(); - return {std::move(lru_fd), KvError::NoError}; + if (fd_state->fd_ != LruFD::FdEmpty) + { + fd_state->mu_.Unlock(); + return {std::move(lru_fd), KvError::NoError}; + } } - } - else if (lru_fd.Get()->reg_idx_ >= 0) - { - // Check for term mismatch when not in local mode. - if (eloq_store->Mode() != StoreMode::Local && - file_id != LruFD::kDirectory && term != 0) + else if (fd_state->reg_idx_ >= 0) { - uint64_t cached_term = lru_fd.Get()->term_; - if (cached_term != 0 && cached_term != term) + // Check for term mismatch when not in local mode. + if (eloq_store->Mode() != StoreMode::Local && + file_id != LruFD::kDirectory && term != 0) { - // Term mismatch detected, close and reopen with correct term. - int old_idx = lru_fd.Get()->reg_idx_; - int res = CloseDirect(old_idx); - if (res < 0) + uint64_t cached_term = fd_state->term_; + if (cached_term != 0 && cached_term != term) { - lru_fd.Get()->mu_.Unlock(); - return {nullptr, ToKvError(res)}; + // Term mismatch detected, close and reopen with correct + // term. + int old_idx = fd_state->reg_idx_; + int res = CloseDirect(old_idx); + if (res < 0) + { + fd_state->mu_.Unlock(); + return {nullptr, ToKvError(res)}; + } + fd_state->reg_idx_ = -1; + // Fall through to open/create with correct term. 
+ } + else + { + // No mismatch, use cached FD. + fd_state->mu_.Unlock(); + return {std::move(lru_fd), KvError::NoError}; } - lru_fd.Get()->reg_idx_ = -1; - // Fall through to open/create with correct term } else { - // No mismatch, use cached FD. - lru_fd.Get()->mu_.Unlock(); + // Local mode or directory, use cached FD. + fd_state->mu_.Unlock(); return {std::move(lru_fd), KvError::NoError}; } } - else + + if (!fd_state->opening_) { - // Local mode or directory, use cached FD. - lru_fd.Get()->mu_.Unlock(); - return {std::move(lru_fd), KvError::NoError}; + fd_state->opening_ = true; + fd_state->mu_.Unlock(); + break; } + + fd_state->mu_.Unlock(); + fd_state->open_waiting_.Wait(ThdTask()); } int fd; @@ -1166,27 +1266,34 @@ std::pair IouringMgr::OpenOrCreateFD( LOG(ERROR) << "open failed " << tbl_id << " file id " << file_id << " : " << ErrorString(error); } - lru_fd.Get()->mu_.Unlock(); + + fd_state->mu_.Lock(); + fd_state->opening_ = false; + fd_state->open_waiting_.WakeAll(); + fd_state->mu_.Unlock(); return {nullptr, error}; } + fd_state->mu_.Lock(); if (file_id == LruFD::kDirectory) { - lru_fd.Get()->fd_ = fd; - lru_fd.Get()->reg_idx_ = -1; + fd_state->fd_ = fd; + fd_state->reg_idx_ = -1; } else { - lru_fd.Get()->reg_idx_ = fd; - lru_fd.Get()->fd_ = LruFD::FdEmpty; + fd_state->reg_idx_ = fd; + fd_state->fd_ = LruFD::FdEmpty; } // Set term on newly opened data file FD. if (file_id <= LruFD::kMaxDataFile) { - lru_fd.Get()->term_ = term; + fd_state->term_ = term; } - lru_fd.Get()->mu_.Unlock(); + fd_state->opening_ = false; + fd_state->open_waiting_.WakeAll(); + fd_state->mu_.Unlock(); return {std::move(lru_fd), KvError::NoError}; } @@ -3110,6 +3217,8 @@ KvError CloudStoreMgr::RestoreFilesForTable(const TableIdent &tbl_id, } } + const size_t retained_files = + cached_files.size() - (has_max_data_file ? 
1U : 0U); for (size_t i = 0; i < cached_files.size(); ++i) { if (has_max_data_file && i == max_data_file_idx) @@ -3124,6 +3233,28 @@ KvError CloudStoreMgr::RestoreFilesForTable(const TableIdent &tbl_id, restored_bytes += file_info.expected_size; } + if (retained_files == 0) + { + std::error_code remove_ec; + bool removed_dir = fs::remove(table_path, remove_ec); + if (remove_ec) + { + if (remove_ec.value() != ENOENT && remove_ec.value() != ENOTEMPTY && + remove_ec.value() != EEXIST) + { + LOG(ERROR) << "Failed to remove idle partition directory " + << table_path << " for table " << tbl_id << ": " + << remove_ec.message(); + return ToKvError(-remove_ec.value()); + } + } + else if (removed_dir) + { + LOG(INFO) << "Removed idle partition directory " << table_path + << " during cache restore"; + } + } + return KvError::NoError; } @@ -3191,6 +3322,7 @@ std::pair CloudStoreMgr::TrimRestoredCacheUsage() bool CloudStoreMgr::IsIdle() { return file_cleaner_.status_ == TaskStatus::Idle && + pending_gc_cleanup_queue_.empty() && active_prewarm_tasks_ == 0 && inflight_cloud_slots_ == 0 && !obj_store_.HasPendingWork(); } @@ -3969,6 +4101,28 @@ std::tuple CloudStoreMgr::ReadTermFile( return {term, download_task.etag_, KvError::NoError}; } +void CloudStoreMgr::RequestGcLocalCleanup( + const TableIdent &tbl_id, const std::vector &filenames) +{ + bool queued = false; + for (const std::string &filename : filenames) + { + FileKey key(tbl_id, filename); + auto [_, inserted] = pending_gc_cleanup_.insert(key); + if (!inserted) + { + continue; + } + pending_gc_cleanup_queue_.push_back(std::move(key)); + queued = true; + } + + if (queued && file_cleaner_.status_ == TaskStatus::Idle) + { + file_cleaner_.Resume(); + } +} + KvError CloudStoreMgr::UpsertTermFile(const TableIdent &tbl_id, uint64_t process_term) { @@ -4209,7 +4363,8 @@ KvError CloudStoreMgr::CreateArchive(const TableIdent &tbl_id, } } - auto [dir_fd, err] = OpenFD(tbl_id, LruFD::kDirectory, false, 0); + auto [dir_fd, err] = 
OpenOrCreateFD( + tbl_id, LruFD::kDirectory, false, true, 0); CHECK_KV_ERR(err); int res = WriteSnapshot(std::move(dir_fd), key.filename_, snapshot); if (res < 0) @@ -4432,7 +4587,13 @@ KvError CloudStoreMgr::CloseFile(LruFD::Ref fd) { const TableIdent *tbl_id = fd.Get()->tbl_->tbl_id_; uint64_t term = fd.Get()->term_; - EnqueClosedFile(FileKey(*tbl_id, ToFilename(file_id, term))); + FileKey key(*tbl_id, ToFilename(file_id, term)); + const bool pending_cleanup = pending_gc_cleanup_.contains(key); + EnqueClosedFile(std::move(key)); + if (pending_cleanup && file_cleaner_.status_ == TaskStatus::Idle) + { + file_cleaner_.Resume(); + } } return KvError::NoError; } @@ -5256,6 +5417,7 @@ void CloudStoreMgr::FileCleaner::Run() UnlinkReq() = default; CachedFile *file_; fs::path path_; + bool gc_cleanup_{false}; }; std::array unlink_reqs; @@ -5267,8 +5429,11 @@ void CloudStoreMgr::FileCleaner::Run() status_ = TaskStatus::Ongoing; while (true) { - if (!io_mgr_->HasEvictableFile() || - (io_mgr_->used_local_space_ <= threshold && requesting_.Empty())) + const bool has_pending_gc = !io_mgr_->pending_gc_cleanup_queue_.empty(); + const bool needs_pressure_eviction = + io_mgr_->HasEvictableFile() && + (io_mgr_->used_local_space_ > threshold || !requesting_.Empty()); + if (!has_pending_gc && !needs_pressure_eviction) { // No file to evict, or used space is below threshold. 
requesting_.WakeAll(); @@ -5284,27 +5449,132 @@ void CloudStoreMgr::FileCleaner::Run() } uint16_t req_count = 0; - for (CachedFile *file = io_mgr_->lru_file_tail_.prev_; - file != &io_mgr_->lru_file_head_; - file = file->prev_) + bool made_progress = false; + std::vector retry_keys; + retry_keys.reserve(io_mgr_->pending_gc_cleanup_queue_.size()); + std::unordered_set touched_partitions; + + if (has_pending_gc) { - if (req_count == batch_size) + const size_t pending_count = io_mgr_->pending_gc_cleanup_queue_.size(); + for (size_t i = 0; i < pending_count && req_count < batch_size; ++i) { - // For pointer stability, we can not reallocate this vector. - break; + FileKey key = + std::move(io_mgr_->pending_gc_cleanup_queue_.front()); + io_mgr_->pending_gc_cleanup_queue_.pop_front(); + if (!io_mgr_->pending_gc_cleanup_.contains(key)) + { + continue; + } + + FileId file_id = 0; + uint64_t term = 0; + if (!ParseCloudCleanupFilename(key.filename_, file_id, term)) + { + LOG(WARNING) << "Skip invalid pending GC cleanup file " + << key.filename_ << " for table " + << key.tbl_id_; + io_mgr_->pending_gc_cleanup_.erase(key); + made_progress = true; + continue; + } + + LruFD::Ref fd_ref = io_mgr_->GetOpenedFD(key.tbl_id_, file_id); + if (fd_ref != nullptr && fd_ref.Get()->term_ == term) + { + if (fd_ref.Get()->ref_count_ > 1) + { + retry_keys.emplace_back(std::move(key)); + continue; + } + + KvError close_err = io_mgr_->CloseFile(std::move(fd_ref)); + if (close_err != KvError::NoError) + { + LOG(WARNING) + << "Failed to close cached-idle file during GC " + "cleanup for table " + << key.tbl_id_ << " file " << key.filename_ + << ": " << ErrorString(close_err); + retry_keys.emplace_back(std::move(key)); + continue; + } + made_progress = true; + } + + auto it = io_mgr_->closed_files_.find(key); + if (it == io_mgr_->closed_files_.end()) + { + io_mgr_->pending_gc_cleanup_.erase(key); + made_progress = true; + continue; + } + + CachedFile *file = &it->second; + file->evicting_ = true; + 
+ UnlinkReq &req = unlink_reqs[req_count++]; + req.file_ = file; + req.gc_cleanup_ = true; + req.path_ = file->key_->tbl_id_.ToString(); + req.path_ /= file->key_->filename_; + + io_uring_sqe *sqe = io_mgr_->GetSQE(UserDataType::BaseReq, &req); + int root_fd = io_mgr_->GetRootFD(file->key_->tbl_id_).first; + io_uring_prep_unlinkat(sqe, root_fd, req.path_.c_str(), 0); } - // Set evicting to block task that try to open it. - file->evicting_ = true; + for (FileKey &key : retry_keys) + { + io_mgr_->pending_gc_cleanup_queue_.push_back(std::move(key)); + } + } + + if (req_count < batch_size && needs_pressure_eviction) + { + for (CachedFile *file = io_mgr_->lru_file_tail_.prev_; + file != &io_mgr_->lru_file_head_; + file = file->prev_) + { + if (req_count == batch_size) + { + // For pointer stability, we can not reallocate this vector. + break; + } + if (file->evicting_ || + io_mgr_->pending_gc_cleanup_.contains(*file->key_)) + { + continue; + } + + // Set evicting to block task that try to open it. 
+ file->evicting_ = true; + + UnlinkReq &req = unlink_reqs[req_count++]; + req.file_ = file; + req.gc_cleanup_ = false; + req.path_ = file->key_->tbl_id_.ToString(); + req.path_ /= file->key_->filename_; - UnlinkReq &req = unlink_reqs[req_count++]; - req.file_ = file; - req.path_ = file->key_->tbl_id_.ToString(); - req.path_ /= file->key_->filename_; + io_uring_sqe *sqe = io_mgr_->GetSQE(UserDataType::BaseReq, &req); + int root_fd = io_mgr_->GetRootFD(req.file_->key_->tbl_id_).first; + io_uring_prep_unlinkat(sqe, root_fd, req.path_.c_str(), 0); + } + } - io_uring_sqe *sqe = io_mgr_->GetSQE(UserDataType::BaseReq, &req); - int root_fd = io_mgr_->GetRootFD(req.file_->key_->tbl_id_).first; - io_uring_prep_unlinkat(sqe, root_fd, req.path_.c_str(), 0); + if (req_count == 0) + { + if (!made_progress) + { + status_ = TaskStatus::Idle; + Yield(); + if (killed_) + { + break; + } + status_ = TaskStatus::Ongoing; + } + continue; } WaitIo(); @@ -5313,23 +5583,51 @@ void CloudStoreMgr::FileCleaner::Run() { const UnlinkReq &req = unlink_reqs[i]; CachedFile *file = req.file_; - if (req.res_ < 0) + if (req.res_ < 0 && req.res_ != -ENOENT) { LOG(ERROR) << "unlink file failed: " << req.path_ << " : " << strerror(-req.res_); file->evicting_ = false; file->waiting_.Wake(); + if (req.gc_cleanup_) + { + io_mgr_->pending_gc_cleanup_queue_.push_back(*file->key_); + } continue; } size_t file_size = io_mgr_->EstimateFileSize(file->key_->filename_); - io_mgr_->used_local_space_ -= file_size; + if (file_size <= io_mgr_->used_local_space_) + { + io_mgr_->used_local_space_ -= file_size; + } + else + { + io_mgr_->used_local_space_ = 0; + } + touched_partitions.insert(file->key_->tbl_id_); + if (req.gc_cleanup_) + { + io_mgr_->pending_gc_cleanup_.erase(*file->key_); + } file->Deque(); file->waiting_.Wake(); io_mgr_->closed_files_.erase(*file->key_); requesting_.WakeOne(); } + + for (const TableIdent &tbl_id : touched_partitions) + { + KvError cleanup_err = 
io_mgr_->TryCleanupLocalPartitionDir(tbl_id); + if (cleanup_err != KvError::NoError) + { + LOG(WARNING) + << "Failed to clean local partition directory for table " + << tbl_id << " after file cleanup: " + << ErrorString(cleanup_err); + } + } DLOG(INFO) << "file cleaner send " << req_count << " unlink requests"; } } diff --git a/src/file_gc.cpp b/src/file_gc.cpp index 9959aef4..f70c917f 100644 --- a/src/file_gc.cpp +++ b/src/file_gc.cpp @@ -19,6 +19,7 @@ #include "replayer.h" #include "storage/mem_index_page.h" #include "storage/object_store.h" +#include "storage/shard.h" #include "tasks/task.h" #include "utils.h" namespace eloqstore @@ -473,10 +474,19 @@ KvError DeleteUnreferencedCloudFiles( const std::vector &manifest_terms, const RetainedFiles &retained_files, FileId least_not_archived_file_id, - CloudStoreMgr *cloud_mgr) + CloudStoreMgr *cloud_mgr, + std::vector &deleted_filenames, + bool *deleted_current_manifest) { std::vector files_to_delete; + std::vector basenames_to_delete; auto process_term = cloud_mgr->ProcessTerm(); + const std::string current_manifest = ManifestFileName(process_term); + deleted_filenames.clear(); + if (deleted_current_manifest != nullptr) + { + *deleted_current_manifest = false; + } for (const std::string &file_name : data_files) { @@ -510,6 +520,7 @@ KvError DeleteUnreferencedCloudFiles( { std::string remote_path = tbl_id.ToString() + "/" + file_name; files_to_delete.push_back(remote_path); + basenames_to_delete.push_back(file_name); } else { @@ -523,6 +534,7 @@ KvError DeleteUnreferencedCloudFiles( { files_to_delete.emplace_back(tbl_id.ToString() + "/" + ManifestFileName(process_term)); + basenames_to_delete.emplace_back(ManifestFileName(process_term)); } // delete expired manifest files. 
@@ -532,6 +544,7 @@ KvError DeleteUnreferencedCloudFiles( { files_to_delete.emplace_back(tbl_id.ToString() + "/" + ManifestFileName(term)); + basenames_to_delete.emplace_back(ManifestFileName(term)); } } @@ -558,17 +571,34 @@ KvError DeleteUnreferencedCloudFiles( current_task->WaitIo(); - for (const auto &task : delete_tasks) + KvError first_error = KvError::NoError; + deleted_filenames.reserve(basenames_to_delete.size()); + for (size_t i = 0; i < delete_tasks.size(); ++i) { + const auto &task = delete_tasks[i]; + const bool manifest_gone = + basenames_to_delete[i] == current_manifest && + (task.error_ == KvError::NoError || + task.error_ == KvError::NotFound); + if (manifest_gone && deleted_current_manifest != nullptr) + { + *deleted_current_manifest = true; + } if (task.error_ != KvError::NoError) { LOG(ERROR) << "Failed to delete file " << task.remote_path_ << ": " << ErrorString(task.error_); - return task.error_; + if (first_error == KvError::NoError) + { + first_error = task.error_; + } + continue; } + + deleted_filenames.push_back(basenames_to_delete[i]); } - return KvError::NoError; + return first_error; } KvError DeleteUnreferencedLocalFiles(const TableIdent &tbl_id, @@ -732,18 +762,26 @@ KvError ExecuteCloudGC(const TableIdent &tbl_id, } // 5. delete unreferenced data files. 
+ std::vector deleted_filenames; + bool deleted_current_manifest = false; err = DeleteUnreferencedCloudFiles(tbl_id, data_files, manifest_terms, retained_files, least_not_archived_file_id, - cloud_mgr); - if (err != KvError::NoError) + cloud_mgr, + deleted_filenames, + &deleted_current_manifest); + if (!deleted_filenames.empty()) { - return err; + cloud_mgr->RequestGcLocalCleanup(tbl_id, deleted_filenames); + } + if (deleted_current_manifest) + { + shard->IndexManager()->MarkManifestMissing(tbl_id); } - return KvError::NoError; + return err; } } // namespace FileGarbageCollector diff --git a/src/storage/index_page_manager.cpp b/src/storage/index_page_manager.cpp index 185eb63b..b860e8e8 100644 --- a/src/storage/index_page_manager.cpp +++ b/src/storage/index_page_manager.cpp @@ -340,6 +340,33 @@ void IndexPageManager::UpdateRoot(const TableIdent &tbl_ident, root_meta_mgr_.EvictIfNeeded(); } +void IndexPageManager::MarkManifestMissing(const TableIdent &tbl_ident) +{ + auto *entry = root_meta_mgr_.Find(tbl_ident); + if (entry == nullptr) + { + return; + } + + RootMeta &meta = entry->meta_; + if (meta.mapper_ == nullptr || meta.manifest_size_ == 0) + { + return; + } + + // Cloud GC only deletes the current manifest for an empty partition. + // Keep the in-memory RootMeta consistent so the next write rebuilds a + // fresh snapshot instead of appending to a manifest that no longer exists. 
+ if (meta.root_id_ != MaxPageId || meta.ttl_root_id_ != MaxPageId || + meta.mapper_->MappingCount() != 0) + { + return; + } + + meta.manifest_size_ = 0; + root_meta_mgr_.UpdateBytes(entry, RootMetaBytes(meta)); +} + KvError IndexPageManager::InstallExternalSnapshot(const TableIdent &tbl_ident, CowRootMeta &cow_meta, std::string_view reopen_tag) diff --git a/tests/cloud.cpp b/tests/cloud.cpp index d1bea799..c25cbb4a 100644 --- a/tests/cloud.cpp +++ b/tests/cloud.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -11,7 +12,9 @@ #include #include "common.h" +#include "async_io_manager.h" #include "kv_options.h" +#include "storage/shard.h" #include "test_utils.h" #include "utils.h" @@ -65,6 +68,98 @@ void WriteBatches(MapVerifier &writer, next_key += entries_per_batch; } } + +std::vector ListLocalPartitionFiles( + const std::filesystem::path &partition_path) +{ + std::vector files; + if (!std::filesystem::exists(partition_path)) + { + return files; + } + for (const auto &entry : std::filesystem::directory_iterator(partition_path)) + { + if (entry.is_regular_file()) + { + files.push_back(entry.path().filename().string()); + } + } + return files; +} + +void WriteTestFile(const std::filesystem::path &path, + std::string_view content = {}) +{ + std::ofstream file(path, std::ios::binary); + file << content; +} + +bool ContainsFileWithPrefix(const std::vector &files, + std::string_view prefix) +{ + return std::any_of(files.begin(), + files.end(), + [prefix](const std::string &name) + { return name.rfind(prefix, 0) == 0; }); +} + +std::optional FindLowestDataFile( + const std::vector &files) +{ + bool found = false; + eloqstore::FileId best_file_id = 0; + uint64_t best_term = 0; + std::string best_name; + + for (const std::string &name : files) + { + auto [type, suffix] = eloqstore::ParseFileName(name); + if (type != eloqstore::FileNameData) + { + continue; + } + + eloqstore::FileId file_id = 0; + uint64_t term = 0; + if 
(!eloqstore::ParseDataFileSuffix(suffix, file_id, term)) + { + continue; + } + + if (!found || file_id < best_file_id || + (file_id == best_file_id && term < best_term)) + { + found = true; + best_file_id = file_id; + best_term = term; + best_name = name; + } + } + + if (!found) + { + return std::nullopt; + } + return best_name; +} + +struct EloqStoreAccessor +{ + eloqstore::KvOptions options_; + std::vector root_fds_; + std::unique_ptr cloud_service_; + std::vector> shards_; +}; + +eloqstore::Shard *PrimaryShard(eloqstore::EloqStore *store) +{ + auto *access = reinterpret_cast(store); + if (access->shards_.empty()) + { + return nullptr; + } + return access->shards_.front().get(); +} } // namespace TEST_CASE("cloud prewarm downloads while shards idle", "[cloud][prewarm]") @@ -210,6 +305,126 @@ TEST_CASE("cloud reopen waits on evicting cached file", "[cloud][gc]") writer.Validate(); } +TEST_CASE("cloud gc removes local cached files after remote truncate", + "[cloud][gc][targeted]") +{ + using namespace std::chrono_literals; + namespace fs = std::filesystem; + + eloqstore::KvOptions options = cloud_options; + options.num_threads = 1; + options.pages_per_file_shift = 1; + options.local_space_limit = 256ULL << 20; + + CleanupStore(options); + + eloqstore::EloqStore *store = InitStore(options); + eloqstore::TableIdent tbl_id{"cloud-gc-truncate", 0}; + MapVerifier writer(tbl_id, store, false); + writer.SetAutoClean(false); + writer.SetValueSize(10024); + + writer.Upsert(0, 600); + writer.Validate(); + + const fs::path partition_path = + fs::path(options.store_path[0]) / tbl_id.ToString(); + REQUIRE(WaitForCondition( + 10s, + 50ms, + [&]() { return ContainsFileWithPrefix(ListLocalPartitionFiles(partition_path), "data_"); })); + + const std::vector local_before = + ListLocalPartitionFiles(partition_path); + REQUIRE(ContainsFileWithPrefix(local_before, "data_")); + const auto deleted_local_it = + std::find_if(local_before.begin(), + local_before.end(), + [](const 
std::string &name) + { return name.rfind("data_", 0) == 0; }); + REQUIRE(deleted_local_it != local_before.end()); + const std::string deleted_local_file = *deleted_local_it; + + writer.Truncate(0, true); + + REQUIRE(WaitForCondition( + 20s, + 100ms, + [&]() + { return !fs::exists(partition_path / deleted_local_file); })); + REQUIRE(WaitForCondition( + 20s, 100ms, [&]() { return !fs::exists(partition_path); })); + + store->Stop(); + CleanupStore(options); +} + +TEST_CASE("cloud gc waits for an actively referenced local file before deleting cache", + "[cloud][gc][targeted]") +{ + using namespace std::chrono_literals; + namespace fs = std::filesystem; + + eloqstore::KvOptions options = cloud_options; + options.num_threads = 1; + options.pages_per_file_shift = 1; + options.local_space_limit = 256ULL << 20; + + CleanupStore(options); + + eloqstore::EloqStore *store = InitStore(options); + eloqstore::TableIdent tbl_id{"cloud-gc-open-file", 0}; + MapVerifier writer(tbl_id, store, false); + writer.SetAutoClean(false); + writer.SetValueSize(10024); + + writer.Upsert(0, 600); + + const fs::path partition_path = + fs::path(options.store_path[0]) / tbl_id.ToString(); + REQUIRE(WaitForCondition( + 10s, + 50ms, + [&]() { return FindLowestDataFile(ListLocalPartitionFiles(partition_path)).has_value(); })); + + std::optional target_name = + FindLowestDataFile(ListLocalPartitionFiles(partition_path)); + REQUIRE(target_name.has_value()); + const fs::path target_path = partition_path / *target_name; + REQUIRE(fs::exists(target_path)); + + writer.Read(0); + + auto [type, suffix] = eloqstore::ParseFileName(*target_name); + REQUIRE(type == eloqstore::FileNameData); + eloqstore::FileId target_file_id = 0; + uint64_t target_term = 0; + REQUIRE( + eloqstore::ParseDataFileSuffix(suffix, target_file_id, target_term)); + + eloqstore::Shard *shard = PrimaryShard(store); + REQUIRE(shard != nullptr); + auto *cloud_mgr = static_cast(shard->IoManager()); + auto held_fd = 
cloud_mgr->GetOpenedFD(tbl_id, target_file_id); + REQUIRE(held_fd != nullptr); + REQUIRE(held_fd.Get()->term_ == target_term); + + writer.Truncate(0, true); + + REQUIRE(fs::exists(target_path)); + std::this_thread::sleep_for(200ms); + REQUIRE(fs::exists(target_path)); + + held_fd = eloqstore::IouringMgr::LruFD::Ref(); + REQUIRE( + WaitForCondition(20s, 100ms, [&]() { return !fs::exists(target_path); })); + REQUIRE(WaitForCondition( + 20s, 100ms, [&]() { return !fs::exists(partition_path); })); + + store->Stop(); + CleanupStore(options); +} + TEST_CASE("cloud prewarm respects cache budget", "[cloud][prewarm]") { using namespace std::chrono_literals; @@ -352,6 +567,88 @@ TEST_CASE("cloud reuse cache enforces budgets across restarts", CleanupStore(options); } +TEST_CASE("cloud startup restore removes empty idle partition directories", + "[cloud][cache]") +{ + namespace fs = std::filesystem; + + eloqstore::KvOptions options = cloud_options; + options.allow_reuse_local_caches = true; + options.num_threads = 1; + + const std::string suffix = "reuse-empty-idle"; + const std::string local_base = options.store_path[0]; + options.store_path = {local_base + std::string("/") + suffix}; + options.cloud_store_path.push_back('/'); + options.cloud_store_path += suffix; + + CleanupStore(options); + + auto store = std::make_unique(options); + REQUIRE(store->Start() == eloqstore::KvError::NoError); + store->Stop(); + + const eloqstore::TableIdent tbl_id{"reuse_empty_idle", 0}; + const fs::path partition_path = + fs::path(options.store_path.front()) / tbl_id.ToString(); + fs::create_directories(partition_path); + REQUIRE(fs::exists(partition_path)); + REQUIRE(fs::is_empty(partition_path)); + + REQUIRE(store->Start() == eloqstore::KvError::NoError); + REQUIRE(WaitForCondition( + chrono::seconds(5), + chrono::milliseconds(20), + [&]() { return !fs::exists(partition_path); })); + + store->Stop(); + CleanupStore(options); +} + +TEST_CASE("cloud startup restore removes partitions cleaned 
to empty", + "[cloud][cache]") +{ + namespace fs = std::filesystem; + + eloqstore::KvOptions options = cloud_options; + options.allow_reuse_local_caches = true; + options.num_threads = 1; + + const std::string suffix = "reuse-cleanup-idle"; + const std::string local_base = options.store_path[0]; + options.store_path = {local_base + std::string("/") + suffix}; + options.cloud_store_path.push_back('/'); + options.cloud_store_path += suffix; + + CleanupStore(options); + + auto store = std::make_unique(options); + REQUIRE(store->Start() == eloqstore::KvError::NoError); + store->Stop(); + + const eloqstore::TableIdent tbl_id{"reuse_cleanup_idle", 0}; + const fs::path partition_path = + fs::path(options.store_path.front()) / tbl_id.ToString(); + fs::create_directories(partition_path); + + const fs::path manifest_path = + partition_path / eloqstore::ManifestFileName(0); + const fs::path tmp_path = partition_path / "stale_download.tmp"; + WriteTestFile(manifest_path, "stale-manifest"); + WriteTestFile(tmp_path, "stale-cache"); + REQUIRE(fs::exists(manifest_path)); + REQUIRE(fs::exists(tmp_path)); + + REQUIRE(store->Start() == eloqstore::KvError::NoError); + REQUIRE(WaitForCondition( + chrono::seconds(5), + chrono::milliseconds(20), + [&]() { return !fs::exists(partition_path); })); + + store->Stop(); + CleanupStore(options); +} + TEST_CASE("cloud prewarm honors partition filter", "[cloud][prewarm]") { using namespace std::chrono_literals; @@ -1390,6 +1687,7 @@ TEST_CASE("cloud global reopen refreshes local manifests", "[cloud][reopen]") options.cloud_store_path += "/reopen-global"; options.prewarm_cloud_cache = false; options.allow_reuse_local_caches = true; + options.pages_per_file_shift = 1; // Keep at least one data file after cache restore. 
CleanupStore(options); @@ -1406,8 +1704,21 @@ TEST_CASE("cloud global reopen refreshes local manifests", "[cloud][reopen]") { verifiers.emplace_back( std::make_unique(tbl_id, store, false)); + verifiers.back()->SetValueSize(10024); } + auto count_data_files = [&](const eloqstore::TableIdent &tbl_id) + { + const std::filesystem::path partition_path = + std::filesystem::path(options.store_path.front()) / tbl_id.ToString(); + const std::vector files = + ListLocalPartitionFiles(partition_path); + return std::count_if(files.begin(), + files.end(), + [](const std::string &name) + { return name.rfind("data_", 0) == 0; }); + }; + // Version 1 data, keep a local backup. std::vector> v1_datasets; v1_datasets.reserve(verifiers.size()); @@ -1422,6 +1733,10 @@ TEST_CASE("cloud global reopen refreshes local manifests", "[cloud][reopen]") // Stop to ensure local files are durable before backup. store->Stop(); + for (const auto &tbl_id : tbl_ids) + { + REQUIRE(count_data_files(tbl_id) > 1); + } const std::string backup_root = "/tmp/test-data-reopen-global-backup"; const std::string manifest_name = eloqstore::ManifestFileName(0); @@ -1486,36 +1801,6 @@ TEST_CASE("cloud global reopen refreshes local manifests", "[cloud][reopen]") std::filesystem::copy_options::recursive | std::filesystem::copy_options::overwrite_existing); } - auto clear_partition_data_files = [&](const eloqstore::TableIdent &table_id) - { - for (const auto &path : options.store_path) - { - std::filesystem::path part_path = - std::filesystem::path(path) / table_id.ToString(); - if (!std::filesystem::exists(part_path)) - { - continue; - } - for (const auto &ent : - std::filesystem::directory_iterator(part_path)) - { - if (!ent.is_regular_file()) - { - continue; - } - auto [type, suffix] = - eloqstore::ParseFileName(ent.path().filename().string()); - if (type == eloqstore::FileNameData) - { - std::filesystem::remove(ent.path()); - } - } - } - }; - for (const auto &tbl_id : tbl_ids) - { - 
clear_partition_data_files(tbl_id); - } REQUIRE(store->Start() == eloqstore::KvError::NoError); REQUIRE(WaitForCondition(std::chrono::seconds(5), @@ -1523,6 +1808,7 @@ TEST_CASE("cloud global reopen refreshes local manifests", "[cloud][reopen]") [&]() { return store->Inited(); })); for (size_t i = 0; i < tbl_ids.size(); ++i) { + REQUIRE(count_data_files(tbl_ids[i]) > 0); std::filesystem::path restored_manifest = std::filesystem::path(options.store_path.front()) / tbl_ids[i].ToString() / manifest_name; diff --git a/tests/gc.cpp b/tests/gc.cpp index 369950d9..dd0777d0 100644 --- a/tests/gc.cpp +++ b/tests/gc.cpp @@ -62,6 +62,25 @@ bool CheckLocalPartitionExists(const eloqstore::KvOptions &opts, return false; } +std::vector ListLocalPartitionFiles(const eloqstore::KvOptions &opts, + const eloqstore::TableIdent &tbl_id) +{ + std::vector result; + for (const std::string &store_path : opts.store_path) + { + fs::path partition_path = fs::path(store_path) / tbl_id.ToString(); + if (!fs::exists(partition_path)) + { + continue; + } + for (const auto &entry : fs::directory_iterator(partition_path)) + { + result.push_back(entry.path().filename().string()); + } + } + return result; +} + // Helper function to check if cloud partition directory exists bool CheckCloudPartitionExists(const eloqstore::KvOptions &opts, const eloqstore::TableIdent &tbl_id) @@ -95,84 +114,117 @@ void WaitForGC(int seconds = 1) std::this_thread::sleep_for(chrono::seconds(seconds)); } -// TEST_CASE("local mode truncate directory cleanup", "[gc][local]") -// { -// eloqstore::EloqStore *store = InitStore(local_gc_opts); -// eloqstore::TableIdent tbl_id = {"gc_test", 1}; -// MapVerifier tester(tbl_id, store, false); -// tester.SetValueSize(1000); - -// // Write some data to create partition directory -// tester.Upsert(0, 100); -// tester.Validate(); - -// // Verify partition directory exists -// REQUIRE(CheckLocalPartitionExists(local_gc_opts, tbl_id)); - -// // Truncate the partition using MapVerifier 
namespace
{
/**
 * @brief Polls a predicate until it returns true or a timeout elapses.
 *
 * The predicate is always evaluated at least once, even with a zero timeout.
 * Between evaluations the calling thread sleeps for @p step, shortened to the
 * time left before the deadline so the call does not overshoot @p timeout by
 * a whole step.
 *
 * @tparam Pred   Callable taking no arguments and returning something
 *                convertible to bool.
 * @param timeout Maximum time to keep polling.
 * @param step    Sleep interval between two evaluations of @p pred.
 * @param pred    Condition to wait for.
 * @return true as soon as @p pred returns true; false if the deadline passed
 *         and the last evaluation was still false.
 */
template <typename Pred>
bool WaitForCondition(std::chrono::milliseconds timeout,
                      std::chrono::milliseconds step,
                      Pred &&pred)
{
    using Clock = std::chrono::steady_clock;
    const auto deadline = Clock::now() + timeout;
    for (;;)
    {
        if (pred())
        {
            return true;
        }
        const auto now = Clock::now();
        if (now >= deadline)
        {
            return false;
        }
        // Never sleep past the deadline; the evaluation that follows a
        // shortened sleep is the final one.
        const auto remaining = deadline - now;
        if (remaining < step)
        {
            std::this_thread::sleep_for(remaining);
        }
        else
        {
            std::this_thread::sleep_for(step);
        }
    }
}
}  // namespace
eloqstore::ManifestFileName(0); + })); + + CleanupStore(local_gc_opts); +} -// // Verify directory is cleaned up -// REQUIRE_FALSE(CheckLocalPartitionExists(local_gc_opts, tbl_id)); -// } -// } +TEST_CASE("local mode clean manifest removes empty partition directory", + "[gc][local]") +{ + using namespace std::chrono_literals; -// TEST_CASE("local mode delete all data cleanup", "[gc][local]") -// { -// eloqstore::EloqStore *store = InitStore(local_gc_opts); -// eloqstore::TableIdent tbl_id = {"gc_delete_all", 1}; -// MapVerifier tester(tbl_id, store, false); -// tester.SetValueSize(1000); + eloqstore::KvOptions opts = local_gc_opts; + opts.num_threads = 1; + opts.root_meta_cache_size = 5000; + opts.init_page_count = 8; + opts.data_page_size = 4096; -// // Write some data -// tester.Upsert(0, 100); -// tester.Validate(); + CleanupStore(opts); -// // Verify partition directory exists -// REQUIRE(CheckLocalPartitionExists(local_gc_opts, tbl_id)); + eloqstore::EloqStore *store = InitStore(opts); + eloqstore::TableIdent tbl_id = {"gc_delete_all", 1}; + MapVerifier tester(tbl_id, store, false); + tester.SetValueSize(256); -// // Delete all data -// tester.Delete(0, 100); -// tester.Validate(); + tester.Upsert(0, 200); + tester.Validate(); + REQUIRE(CheckLocalPartitionExists(opts, tbl_id)); + + tester.Truncate(0, true); + + REQUIRE(WaitForCondition(3s, + 20ms, + [&]() + { + std::vector files = + ListLocalPartitionFiles(opts, tbl_id); + return files.size() == 1 && + files[0] == eloqstore::ManifestFileName(0); + })); + + std::vector> evictors; + for (uint32_t pid = 2; pid < 12; ++pid) + { + auto verifier = + std::make_unique(eloqstore::TableIdent{"gc_evict", pid}, + store, + false); + verifier->SetAutoClean(false); + verifier->SetValueSize(256); + verifier->Upsert(0, 200); + verifier->Read(0); + verifier->Read(1); + evictors.emplace_back(std::move(verifier)); + } -// // Wait for GC to process -// WaitForGC(); + REQUIRE(WaitForCondition(5s, + 20ms, + [&]() { return 
!CheckLocalPartitionExists(opts, + tbl_id); })); -// // Verify partition directory is removed after deleting all data -// REQUIRE_FALSE(CheckLocalPartitionExists(local_gc_opts, tbl_id)); -// } + CleanupStore(opts); +} TEST_CASE("cloud mode truncate remote directory cleanup", "[gc][cloud]") {