
gc remove local dir#414

Open
liunyl wants to merge 3 commits into main from gc

Conversation

@liunyl
Contributor

@liunyl liunyl commented Mar 16, 2026

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the linked issue using fixes eloqdb/eloqstore#issue_id
  • Reference the RFC link if one exists
  • Pass ctest --test-dir build/tests/

Summary by CodeRabbit

Release Notes

  • New Features

    • Automatic cleanup of local partition directories during garbage collection operations.
    • Enhanced archive tagging for improved database version and snapshot management.
    • Improved manifest recovery and detection during database restore and startup procedures.
  • Bug Fixes

    • Fixed orphaned partition directory retention after garbage collection completes.

@coderabbitai

coderabbitai bot commented Mar 16, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR introduces a garbage collection cleanup coordination mechanism: it tracks deleted cloud files, schedules local partition directory cleanup after GC operations, and adds tag-aware manifest/archive handling. Cloud file deletion and local cache cleanup are coordinated through new request/tracking APIs.

Changes

  • GC Cleanup Coordination (include/async_io_manager.h, src/async_io_manager.cpp): Adds TryCleanupLocalPartitionDir to safely close and unlink idle partition directories, introduces RequestGcLocalCleanup for enqueueing GC cleanup tasks, extends FileCleaner::Run to prioritize GC cleanup ahead of file eviction and partition trimming, and updates IsIdle() to track the pending GC cleanup queue.
  • File Deletion Tracking (include/file_gc.h, src/file_gc.cpp): Updates the DeleteUnreferencedCloudFiles signature to output deleted filenames and a manifest-deletion flag, extends ExecuteCloudGC to trigger RequestGcLocalCleanup when deletions occur, and updates DeleteUnreferencedLocalFiles and ExecuteLocalGC to use the new RetainedFiles structure for retention checks.
  • Archive & Manifest APIs (include/async_io_manager.h, src/async_io_manager.cpp): Updates MemStoreMgr::CreateArchive and DeleteArchive to accept tag parameters, extends the CloudStoreMgr::RefreshManifest signature with archive_tag, and adds CloudStoreMgr::RequestGcLocalCleanup for cleanup signaling.
  • Manifest State Management (include/storage/index_page_manager.h, src/storage/index_page_manager.cpp): Adds a MarkManifestMissing() method to clear manifest state after GC cleanup and updates the InstallExternalSnapshot signature to include a reopen_tag parameter for better observability during manifest refresh.
  • Test Infrastructure & Coverage (tests/cloud.cpp, tests/gc.cpp): Adds test helpers (ListLocalPartitionFiles, WaitForCondition) for GC verification and introduces test cases for local partition cleanup, manifest handling during truncation, partition directory purge with cache deletion, and restore behavior across restarts.

Sequence Diagram(s)

sequenceDiagram
    participant FileCleaner as FileCleaner::Run
    participant CloudMgr as CloudStoreMgr
    participant GC as GC Cleanup<br/>(ExecuteCloudGC)
    participant IO as IouringMgr<br/>(FileCleaner)
    participant Index as IndexPageManager
    participant Shard as Shard

    FileCleaner->>GC: ExecuteCloudGC(tbl_id,<br/>retained_files)
    
    GC->>GC: DeleteUnreferencedCloudFiles()<br/>returns deleted_filenames
    
    alt Deletions Occurred
        GC->>CloudMgr: RequestGcLocalCleanup(tbl_id,<br/>deleted_filenames)
        
        GC->>Index: MarkManifestMissing<br/>(if manifest deleted)
        
        CloudMgr->>CloudMgr: Enqueue GC cleanup<br/>to pending queue
        
        CloudMgr->>FileCleaner: Wake if idle
    end
    
    FileCleaner->>FileCleaner: Poll pending_gc_cleanup_queue
    
    FileCleaner->>IO: CloseFile()/Evict files<br/>via io_uring unlink
    
    FileCleaner->>IO: TryCleanupLocalPartitionDir<br/>(touched partitions)
    
    IO->>IO: Close idle FDs,<br/>verify dir state
    
    alt Directory Idle
        IO->>IO: unlink partition dir
    end
    
    FileCleaner->>FileCleaner: Mark GC cleanup done
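The enqueue-and-wake step in the diagram can be sketched as follows; all class and member names here are illustrative stand-ins, not the actual EloqStore API:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Illustrative task: which table's cleanup to run and which files were deleted.
struct GcCleanupTask
{
    std::string tbl_id;
    std::vector<std::string> deleted_filenames;
};

class FileCleanerSketch
{
public:
    // Mirrors the "Enqueue GC cleanup to pending queue / Wake if idle" steps.
    void RequestGcLocalCleanup(GcCleanupTask task)
    {
        pending_gc_cleanup_queue_.push_back(std::move(task));
        if (idle_)
        {
            idle_ = false;  // wake the cleaner
            Run();
        }
    }

    std::size_t processed() const { return processed_; }

private:
    void Run()
    {
        // GC cleanup is drained before regular eviction work.
        while (!pending_gc_cleanup_queue_.empty())
        {
            pending_gc_cleanup_queue_.pop_front();
            ++processed_;
        }
        idle_ = true;
    }

    std::deque<GcCleanupTask> pending_gc_cleanup_queue_;
    bool idle_ = true;
    std::size_t processed_ = 0;
};
```

The real FileCleaner runs on io_uring and yields between steps; this sketch only shows the queue discipline.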

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • #313: Modifies IndexPageManager cow rootmeta lifecycle and InstallExternalSnapshot parameter handling, directly intersecting with manifest state management changes.
  • #356: Updates CloudStoreMgr::RefreshManifest and IndexPageManager::InstallExternalSnapshot signatures with tag/reopen_tag parameters, aligning with manifest-refresh API changes.
  • #293: Modifies file GC API signatures (DeleteUnreferencedCloudFiles, ExecuteCloudGC) and retention structures, overlapping with GC function signature and RetainedFiles usage updates.

Suggested labels

enhancement

Suggested reviewers

  • thweetkomputer
  • eatbreads

Poem

🐰 Hop, hop, the files do fall,
GC cleanup answers the call,
Mark manifests missing with care,
Partition dirs disappear in air,
Local cache sings its final prayer!

🚥 Pre-merge checks | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

  • Description check — ⚠️ Warning: The PR description only contains an empty checklist template with no implementation details, behavioral changes, test information, or issue references. Resolution: complete the PR description by explaining the changes made, why they're needed, and what functionality was added, and reference any related issues or RFCs; ensure all checklist items are addressed.
  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 7.18%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check — ❓ Inconclusive: The title 'gc remove local dir' is vague and generic, lacking specificity about what changes are being made or which components are affected. Resolution: provide a more descriptive title, such as 'Add local partition directory cleanup during GC'.


return false;
}

std::vector<std::string> ListLocalPartitionFiles(const eloqstore::KvOptions &opts,


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

}

std::vector<std::string> ListLocalPartitionFiles(const eloqstore::KvOptions &opts,
const eloqstore::TableIdent &tbl_id)


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

// tester.Validate();
TEST_CASE("local mode truncate preserves current manifest", "[gc][local]")
{
using namespace std::chrono_literals;


[cpplint] reported by reviewdog 🐶
Do not use namespace using-directives. Use using-declarations instead. [build/namespaces] [5]

return false;
}
return files.size() == 1 &&
files[0] == eloqstore::ManifestFileName(0);


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

TEST_CASE("local mode clean manifest removes empty partition directory",
"[gc][local]")
{
using namespace std::chrono_literals;


[cpplint] reported by reviewdog 🐶
Do not use namespace using-directives. Use using-declarations instead. [build/namespaces] [5]

std::vector<std::string> files =
ListLocalPartitionFiles(opts, tbl_id);
return files.size() == 1 &&
files[0] == eloqstore::ManifestFileName(0);


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

for (uint32_t pid = 2; pid < 12; ++pid)
{
auto verifier =
std::make_unique<MapVerifier>(eloqstore::TableIdent{"gc_evict", pid},


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

REQUIRE(WaitForCondition(5s,
20ms,
[&]() { return !CheckLocalPartitionExists(opts,
tbl_id); }));


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

for (uint32_t pid = 2; pid < 12; ++pid)
{
auto verifier =
std::make_unique<MapVerifier>(eloqstore::TableIdent{"gc_evict", pid},


[cpplint] reported by reviewdog 🐶
Add #include for make_unique<> [build/include_what_you_use] [4]

verifier->Upsert(0, 200);
verifier->Read(0);
verifier->Read(1);
evictors.emplace_back(std::move(verifier));


[cpplint] reported by reviewdog 🐶
Add #include for move [build/include_what_you_use] [4]


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/file_gc.cpp (1)

436-544: ⚠️ Potential issue | 🟡 Minor

Consider handling partial deletion failures.

When any delete task fails (lines 535-540), the function returns immediately without populating deleted_filenames. This means if some cloud files were successfully deleted before the failure, the corresponding local cache files won't be cleaned up.

While this is consistent with fail-fast behavior (per learnings), it could leave orphaned local cache files if cloud deletion partially succeeds. The next GC cycle should eventually clean these up, but you may want to document this behavior or consider tracking successfully deleted files before the failure.
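A sketch of the suggested partial-failure handling, under the assumption that delete tasks expose a per-task error_ field as the comment describes (all types here are simplified stand-ins for the real EloqStore ones):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified stand-ins for KvError and the delete task structure.
enum class KvError { NoError, IoError };

struct DeleteTask
{
    KvError error_;
};

// Scan every delete task after WaitIo(): collect the basenames that were
// actually removed remotely, and return the first error so the caller still
// learns the operation partially failed.
KvError CollectDeletedFilenames(const std::vector<DeleteTask> &delete_tasks,
                                const std::vector<std::string> &basenames_to_delete,
                                std::vector<std::string> &deleted_filenames)
{
    KvError first_err = KvError::NoError;
    for (std::size_t i = 0; i < delete_tasks.size(); ++i)
    {
        if (delete_tasks[i].error_ == KvError::NoError)
        {
            // Cloud deletion succeeded; the local cache copy can be purged.
            deleted_filenames.push_back(basenames_to_delete[i]);
        }
        else if (first_err == KvError::NoError)
        {
            first_err = delete_tasks[i].error_;
        }
    }
    return first_err;
}
```

This keeps the fail-fast error signal while ensuring the successfully deleted names still reach the local cleanup path.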

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/file_gc.cpp` around lines 436 - 544, The function
DeleteUnreferencedCloudFiles currently returns immediately on the first failed
delete task and leaves deleted_filenames empty, which can prevent cleanup of
locally cached files that were actually removed remotely; update
DeleteUnreferencedCloudFiles to scan all delete_tasks after WaitIo(), collect
basenames corresponding to tasks with task.error_ == KvError::NoError into
deleted_filenames (use the basenames_to_delete vector to map indices to names),
log each failing task (but continue the scan), and then return a non-success
KvError (e.g., the first encountered error or an aggregated error) so callers
know the operation partially failed while still receiving the list of
successfully deleted filenames. Ensure references: DeleteUnreferencedCloudFiles,
deleted_filenames, delete_tasks, basenames_to_delete, and task.error_.
🧹 Nitpick comments (2)
tests/gc.cpp (1)

65-82: Inconsistency: This helper includes all directory entries, not just regular files.

Unlike ListLocalPartitionFiles in tests/cloud.cpp (which filters to is_regular_file()), this version includes all entries. This could cause tests to behave unexpectedly if subdirectories exist in the partition path.

Consider filtering to regular files for consistency:

♻️ Suggested fix
 for (const auto &entry : fs::directory_iterator(partition_path))
 {
+    if (!entry.is_regular_file())
+    {
+        continue;
+    }
     result.push_back(entry.path().filename().string());
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/gc.cpp` around lines 65 - 82, ListLocalPartitionFiles currently pushes
all directory entries (including subdirectories and symlinks) which is
inconsistent with the cloud version; update the function
(ListLocalPartitionFiles) to only include regular files by checking each entry
with fs::is_regular_file(entry.path()) or entry.is_regular_file() before pushing
its filename so the result matches the behavior of the ListLocalPartitionFiles
in tests/cloud.cpp.
tests/cloud.cpp (1)

138-154: The reinterpret_cast for accessing private members is fragile.

This approach relies on the exact memory layout of EloqStore matching EloqStoreAccessor. While it currently aligns with the layout in include/eloq_store.h (options_, root_fds_, cloud_service_, shards_), any future changes to EloqStore's member order or addition of members before shards_ will silently break this.

Consider adding EloqStore as a friend of the test or providing a test-only accessor method to make this more robust.
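A minimal sketch of the friend-based alternative; EloqStore and Shard here are stripped-down stand-ins, and the accessor name is hypothetical:

```cpp
#include <cassert>
#include <vector>

class Shard {};

class EloqStore
{
public:
    EloqStore() : shards_(1) {}

private:
    std::vector<Shard> shards_;

    // Grant the test helper access without depending on object layout,
    // unlike a reinterpret_cast to a mirror struct.
    friend Shard *PrimaryShardForTest(EloqStore &store);
};

Shard *PrimaryShardForTest(EloqStore &store)
{
    return store.shards_.empty() ? nullptr : &store.shards_.front();
}
```

The friend declaration survives member reordering or new members being added before shards_, which is exactly where the cast-based approach silently breaks.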

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cloud.cpp` around lines 138 - 154, The test uses a fragile
reinterpret_cast in PrimaryShard to access internal members of EloqStore
(EloqStoreAccessor, shards_), which breaks if EloqStore's layout changes;
instead expose a stable test accessor: add a test-only method on EloqStore
(e.g., GetPrimaryShard() or GetShards()) or declare the test helper as a friend
so tests can call store->GetPrimaryShard() (or access shards_ directly via
friendship) and update PrimaryShard to call that method rather than
reinterpret_casting; reference EloqStore, EloqStoreAccessor, PrimaryShard, and
shards_ when making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f0478db3-e739-4d08-80ee-0e74801ff4bc

📥 Commits

Reviewing files that changed from the base of the PR and between 004823a and b1ad65b.

📒 Files selected for processing (6)
  • include/async_io_manager.h
  • include/file_gc.h
  • src/async_io_manager.cpp
  • src/file_gc.cpp
  • tests/cloud.cpp
  • tests/gc.cpp

{
return files;
}
for (const auto &entry : std::filesystem::directory_iterator(partition_path))


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

TEST_CASE("cloud gc removes local cached files after remote truncate",
"[cloud][gc][targeted]")
{
using namespace std::chrono_literals;


[cpplint] reported by reviewdog 🐶
Do not use namespace using-directives. Use using-declarations instead. [build/namespaces] [5]

REQUIRE(WaitForCondition(
10s,
50ms,
[&]() { return ContainsFileWithPrefix(ListLocalPartitionFiles(partition_path), "data_"); }));


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

CleanupStore(options);
}

TEST_CASE("cloud gc waits for an actively referenced local file before deleting cache",


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

TEST_CASE("cloud gc waits for an actively referenced local file before deleting cache",
"[cloud][gc][targeted]")
{
using namespace std::chrono_literals;


[cpplint] reported by reviewdog 🐶
Do not use namespace using-directives. Use using-declarations instead. [build/namespaces] [5]

REQUIRE(WaitForCondition(
10s,
50ms,
[&]() { return FindLowestDataFile(ListLocalPartitionFiles(partition_path)).has_value(); }));


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]


eloqstore::Shard *shard = PrimaryShard(store);
REQUIRE(shard != nullptr);
auto *cloud_mgr = static_cast<eloqstore::CloudStoreMgr *>(shard->IoManager());


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]


held_fd = eloqstore::IouringMgr::LruFD::Ref();
REQUIRE(
WaitForCondition(20s, 100ms, [&]() { return !fs::exists(target_path); }));


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/async_io_manager.cpp`:
- Around line 3177-3197: The cleanup currently only attempts to remove partition
directories inside RestoreFilesForTable() when retained_files == 0, which misses
directories that become empty later during RestoreLocalCacheState() trimming;
update the logic so that after RestoreLocalCacheState() (or immediately after
any trimming/eviction that can decrement retained_files) you re-check the
partition directory and attempt removal (e.g., call fs::remove or test
fs::is_empty on table_path) and log errors similarly; reference
RestoreFilesForTable(), RestoreLocalCacheState(), the retained_files variable
and the existing remove/table_path removal block so the removal attempt runs
after trimming completes and covers the over-budget case.
- Around line 3263-3268: IsIdle() currently treats pending_gc_cleanup_queue_ as
making the shard non-idle, which hides the fact the FileCleaner has yielded and
will retry later; update CloudStoreMgr::IsIdle() (the CloudStoreMgr::IsIdle
method) to stop factoring pending_gc_cleanup_queue_.empty() into the idle check
so deferred GC retries don't prevent idle detection—i.e., remove or ignore
pending_gc_cleanup_queue_ in the boolean expression and rely on
file_cleaner_.status_, CloseFile() resuming the cleaner, and the other counts
(active_prewarm_tasks_, inflight_cloud_slots_, obj_store_.HasPendingWork()) to
determine idleness.

In `@tests/cloud.cpp`:
- Around line 72-88: ListLocalPartitionFiles currently calls
std::filesystem::exists() then constructs a std::filesystem::directory_iterator,
which can throw if the directory is removed concurrently; change it to use the
std::error_code overloads and non-throwing checks: call
std::filesystem::exists(partition_path, ec) and iterate with
std::filesystem::directory_iterator(partition_path, ec), bail out and return the
empty vector if ec is set, and use non-throwing checks (e.g.,
entry.is_regular_file() or status with an error_code) while iterating; update
the logic inside ListLocalPartitionFiles to handle errors via error_code rather
than letting std::filesystem::filesystem_error propagate.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b5b26d78-3cfc-43d3-a1cc-ed90da61c0e2

📥 Commits

Reviewing files that changed from the base of the PR and between b1ad65b and 660b436.

📒 Files selected for processing (3)
  • src/async_io_manager.cpp
  • src/file_gc.cpp
  • tests/cloud.cpp

Comment on lines +72 to +88
std::vector<std::string> ListLocalPartitionFiles(
const std::filesystem::path &partition_path)
{
std::vector<std::string> files;
if (!std::filesystem::exists(partition_path))
{
return files;
}
for (const auto &entry : std::filesystem::directory_iterator(partition_path))
{
if (entry.is_regular_file())
{
files.push_back(entry.path().filename().string());
}
}
return files;
}

⚠️ Potential issue | 🟡 Minor

Make ListLocalPartitionFiles() resilient to concurrent deletion.

These tests intentionally race with async cleanup, so the partition can disappear between exists() and directory_iterator(...). Right now that turns a valid outcome into a flaky filesystem_error instead of just returning “no files”.

Suggested fix
 std::vector<std::string> ListLocalPartitionFiles(
     const std::filesystem::path &partition_path)
 {
     std::vector<std::string> files;
-    if (!std::filesystem::exists(partition_path))
+    std::error_code ec;
+    if (!std::filesystem::exists(partition_path, ec) || ec)
     {
         return files;
     }
-    for (const auto &entry : std::filesystem::directory_iterator(partition_path))
+    for (std::filesystem::directory_iterator it(partition_path, ec), end;
+         !ec && it != end;
+         it.increment(ec))
     {
-        if (entry.is_regular_file())
+        std::error_code type_ec;
+        if (it->is_regular_file(type_ec) && !type_ec)
         {
-            files.push_back(entry.path().filename().string());
+            files.push_back(it->path().filename().string());
         }
     }
     return files;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cloud.cpp` around lines 72 - 88, ListLocalPartitionFiles currently
calls std::filesystem::exists() then constructs a
std::filesystem::directory_iterator, which can throw if the directory is removed
concurrently; change it to use the std::error_code overloads and non-throwing
checks: call std::filesystem::exists(partition_path, ec) and iterate with
std::filesystem::directory_iterator(partition_path, ec), bail out and return the
empty vector if ec is set, and use non-throwing checks (e.g.,
entry.is_regular_file() or status with an error_code) while iterating; update
the logic inside ListLocalPartitionFiles to handle errors via error_code rather
than letting std::filesystem::filesystem_error propagate.


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (3)
src/async_io_manager.cpp (3)

3177-3197: ⚠️ Potential issue | 🟠 Major

Startup directory cleanup still misses partitions emptied during post-restore trimming.

Line 3177 removes the partition directory only when retained_files == 0 before TrimRestoredCacheUsage(). If trimming later evicts the last retained file, the now-empty partition dir is never revisited.

Suggested fix
-    auto [trimmed_files, trimmed_bytes] = TrimRestoredCacheUsage();
+    auto [trimmed_files, trimmed_bytes] = TrimRestoredCacheUsage();
+    // Re-check directories after trimming, because trimming may empty a partition.
+    // (Implement by collecting touched TableIdent values during trimming and
+    // calling TryCleanupLocalPartitionDir(tbl_id) here.)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3177 - 3197, The cleanup logic
currently removes a partition directory only when retained_files == 0 before
calling TrimRestoredCacheUsage(), so partitions that are emptied by
TrimRestoredCacheUsage() are never removed; move or duplicate the
empty-directory removal logic to run after TrimRestoredCacheUsage() (or extract
it into a helper invoked both before and after trimming), using the same
fs::remove with remove_ec handling and the same LOG/error return behavior
referencing table_path and tbl_id so that directories emptied during
post-restore trimming are removed and errors are reported consistently.

5071-5071: ⚠️ Potential issue | 🟡 Minor

Add a direct include for std::unordered_set.

std::unordered_set<TableIdent> is used at Line 5071, but <unordered_set> is not included in this translation unit. Please include it explicitly to avoid transitive-include fragility.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` at line 5071, The translation unit uses
std::unordered_set<TableIdent> (see the touched_partitions declaration) but
lacks a direct include for <unordered_set>; add an explicit #include <unordered_set>
near the other STL includes so the type is defined (reference
symbols: std::unordered_set, TableIdent, touched_partitions).

3263-3268: ⚠️ Potential issue | 🟠 Major

Do not gate shard idleness on pending GC retry queue.

Line 3266 makes deferred GC retries look “busy” even when FileCleaner is idle/yielded and waiting for a close event. That can break idle detection and cause unnecessary worker spinning.

Suggested fix
 bool CloudStoreMgr::IsIdle()
 {
     return file_cleaner_.status_ == TaskStatus::Idle &&
-           pending_gc_cleanup_queue_.empty() &&
            active_prewarm_tasks_ == 0 && inflight_cloud_slots_ == 0 &&
            !obj_store_.HasPendingWork();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3263 - 3268, CloudStoreMgr::IsIdle
currently treats pending_gc_cleanup_queue_ as keeping the shard busy; remove
that check so deferred GC retry entries don't prevent idleness — update the
IsIdle() method to rely on file_cleaner_.status_ (TaskStatus),
active_prewarm_tasks_, inflight_cloud_slots_, and obj_store_.HasPendingWork()
only, or alternately only consider pending_gc_cleanup_queue_ when
file_cleaner_.status_ is not Idle/Yielded; change the condition in IsIdle()
accordingly to avoid gating idle detection on pending_gc_cleanup_queue_.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/async_io_manager.cpp`:
- Around line 3177-3197: The cleanup logic currently removes a partition
directory only when retained_files == 0 before calling TrimRestoredCacheUsage(),
so partitions that are emptied by TrimRestoredCacheUsage() are never removed;
move or duplicate the empty-directory removal logic to run after
TrimRestoredCacheUsage() (or extract it into a helper invoked both before and
after trimming), using the same fs::remove with remove_ec handling and the same
LOG/error return behavior referencing table_path and tbl_id so that directories
emptied during post-restore trimming are removed and errors are reported
consistently.
- Line 5071: The translation unit uses std::unordered_set<TableIdent> (see the
touched_partitions declaration) but lacks a direct include for <unordered_set>;
add an explicit `#include <unordered_set>` near the other STL includes so the type
is defined (reference symbols: std::unordered_set, TableIdent,
touched_partitions).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d16bd0fd-ce79-41a1-a98d-7e9a44b5bccf

📥 Commits

Reviewing files that changed from the base of the PR and between 660b436 and 4c1a741.

📒 Files selected for processing (1)
  • src/async_io_manager.cpp

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
include/tasks/background_write.h (1)

3-6: ⚠️ Potential issue | 🟡 Minor

Missing #include <string_view> for std::string_view usage.

std::string_view is used in the CreateArchive declaration (line 23) but the <string_view> header is not explicitly included. Relying on transitive includes is fragile and may cause compilation failures.

Proposed fix
 #include <utility>
 #include <vector>
+#include <string_view>
 
 #include "tasks/write_task.h"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/tasks/background_write.h` around lines 3 - 6, The header is missing
an explicit include for std::string_view used in the CreateArchive declaration;
add `#include <string_view>` near the other includes in
include/tasks/background_write.h so CreateArchive and any other uses of
std::string_view compile reliably without relying on transitive includes.
src/async_io_manager.cpp (1)

3948-3975: ⚠️ Potential issue | 🟠 Major

Archive reopen only probes the current term.

This branch always downloads ArchiveName(ProcessTerm(), archive_tag). Once the process term advances, reopening an older tagged snapshot will return NotFound even though manifest_<old_term>_<tag> still exists in cloud storage. Resolve the archive by tag first, then download the matching term.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3948 - 3975, The code currently
constructs selected_filename with ArchiveName(ProcessTerm(), archive_tag) and
then downloads it, which fails when the requested tag exists only under an older
term; change the logic to first resolve which term owns the given archive_tag
(e.g. implement or call a helper like ResolveTermForArchiveTag(archive_tag) that
queries object store for manifest_<term>_<tag> or lists/archive manifests to
find the matching term), set selected_term to that resolved term (with an
optional fallback to ProcessTerm() if no manifest is found), then compute
selected_filename = ArchiveName(selected_term, archive_tag) and invoke the
existing download_to_buffer(lambda) using that filename; keep existing error
handling and buffer acquisition but ensure the resolution step runs before
download_to_buffer so reopen uses the manifest's term, not the current
ProcessTerm().
src/file_gc.cpp (1)

367-418: ⚠️ Potential issue | 🔴 Critical

Don't infer archive recency from the user tag.

Tags are now arbitrary strings, but GC still picks the “latest” archive by numeric/lexicographic tag order. A newer snapshot tagged "daily" can sort behind an older "weekly" one, which makes least_not_archived_file_id too small and lets GC delete files still needed by the newest retained archive. Order by real snapshot/version metadata instead, or compute the max file id across all retained archives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/file_gc.cpp` around lines 367 - 418, GetOrUpdateArchivedMaxFileId
currently selects a single "latest" archive using archive_tags
(numeric/lexicographic), which is invalid because tags are arbitrary; instead,
iterate over all archive_files (and their metadata/snapshots) to compute the
maximum archived file id and use that to set least_not_archived_file_id.
Concretely, in GetOrUpdateArchivedMaxFileId replace the tag-based selection
logic (variables: latest_archive, latest_tag, latest_is_numeric, latest_numeric
and the for-loop decision) with code that for each archive_files[i] reads the
archive's snapshot/version metadata (or otherwise extracts its max-file-id) and
tracks the overall maximum file id across all retained archives, then set
least_not_archived_file_id to that maximum and cache it in
io_mgr->least_not_archived_file_ids_.
♻️ Duplicate comments (2)
tests/cloud.cpp (1)

72-88: ⚠️ Potential issue | 🟡 Minor

Make ListLocalPartitionFiles() resilient to concurrent deletion.

This helper is used in tests that race with async cleanup. The partition directory can disappear between exists() and directory_iterator(...), causing a filesystem_error exception instead of gracefully returning an empty vector.

🛡️ Proposed fix using error_code overloads
 std::vector<std::string> ListLocalPartitionFiles(
     const std::filesystem::path &partition_path)
 {
     std::vector<std::string> files;
-    if (!std::filesystem::exists(partition_path))
+    std::error_code ec;
+    if (!std::filesystem::exists(partition_path, ec) || ec)
     {
         return files;
     }
-    for (const auto &entry : std::filesystem::directory_iterator(partition_path))
+    for (std::filesystem::directory_iterator it(partition_path, ec), end;
+         !ec && it != end;
+         it.increment(ec))
     {
-        if (entry.is_regular_file())
+        std::error_code type_ec;
+        if (it->is_regular_file(type_ec) && !type_ec)
         {
-            files.push_back(entry.path().filename().string());
+            files.push_back(it->path().filename().string());
         }
     }
     return files;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cloud.cpp` around lines 72 - 88, ListLocalPartitionFiles currently
calls std::filesystem::exists() and std::filesystem::directory_iterator(...)
which can throw if the directory is removed concurrently; change to use the
error_code overloads to avoid exceptions: call
std::filesystem::exists(partition_path, ec) and if it returns false (or ec set)
return empty vector, then construct
std::filesystem::directory_iterator(partition_path, ec) and if ec is set return
empty vector, otherwise iterate the iterator normally and push regular file
names; reference function ListLocalPartitionFiles and use a local
std::error_code variable for both checks.
src/async_io_manager.cpp (1)

3254-3298: ⚠️ Potential issue | 🟠 Major

Restore-time trimming still misses partitions that become empty later.

TrimRestoredCacheUsage() deletes restored files directly but never re-checks the parent partition directory. If over-budget trimming evicts the last restored file for a table, startup still leaves an empty local dir behind. Track touched partitions here and call TryCleanupLocalPartitionDir() after successful trims.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3254 - 3298, TrimRestoredCacheUsage
deletes restored files but doesn’t clean up now-empty partition directories,
leaving empty local dirs after startup trims; update
CloudStoreMgr::TrimRestoredCacheUsage to record the partition identity (e.g.
from FileKey.tbl_id_ or the parent directory of file_path) for every successful
remove + victim->Deque() + closed_files_.erase(key_copy), collect them into a
set of touched partitions, and after the trimming loop call
TryCleanupLocalPartitionDir() for each unique partition to remove empty dirs;
ensure you only call TryCleanupLocalPartitionDir() for partitions where deletion
succeeded (and after updating used_local_space_) to avoid unnecessary work.
🧹 Nitpick comments (7)
include/storage/index_page_manager.h (1)

61-65: LGTM! Consider updating the doc comment.

The signature change is well-designed: using std::string_view by value is idiomatic, and the default {} maintains backward compatibility with existing callers.

Optional: The comment on lines 61-62 could be updated to briefly describe the purpose of the reopen_tag parameter for future maintainers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/storage/index_page_manager.h` around lines 61 - 65, Update the doc
comment for InstallExternalSnapshot to mention the new reopen_tag parameter and
its purpose: explain that reopen_tag (std::string_view reopen_tag = {}) is an
optional tag used when reopening or annotating the installed external snapshot
(e.g., to record provenance, trigger reopening behavior, or associate a
manifest/reopen marker) and that it defaults to empty to preserve backward
compatibility; modify the comment immediately above the InstallExternalSnapshot
declaration to include a short phrase about reopen_tag and its default behavior
to help future maintainers locate and understand this parameter.
include/utils.h (1)

255-262: Dead code: total_slots == 0 is unreachable.

Since the normalization loop at lines 229-235 ensures all weights are at least 1, and we only reach line 250 if weights is non-empty (guarded by line 223), total_slots will always be ≥ normalized.size() > 0. This branch can never execute.

♻️ Suggested simplification
     size_t total_slots = 0;
     for (uint64_t weight : normalized)
     {
         total_slots += weight;
     }
-    if (total_slots == 0)
-    {
-        total_slots = normalized.size();
-        for (uint64_t &weight : normalized)
-        {
-            weight = 1;
-        }
-    }
 
     if (total_slots > max_entries)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/utils.h` around lines 255 - 262, The if (total_slots == 0) branch is
dead and should be removed: delete the entire conditional that sets total_slots
= normalized.size() and resets normalized weights to 1, and rely on the existing
normalization and non-empty weights guard (weights, normalized) to guarantee
total_slots > 0; optionally replace the removed branch with a debug assert or
comment if you want to document the invariant that total_slots >=
normalized.size(). Ensure references to total_slots and normalized in
surrounding code remain correct after removal.
include/async_io_manager.h (1)

1020-1021: Consider documenting the relationship between these two data structures.

pending_gc_cleanup_ (unordered_set) and pending_gc_cleanup_queue_ (deque) appear to work together for GC cleanup tracking. A brief comment explaining their relationship (e.g., set for O(1) lookup, deque for ordered processing) would improve maintainability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/async_io_manager.h` around lines 1020 - 1021, Add a brief comment
above the declarations of pending_gc_cleanup_ and pending_gc_cleanup_queue_
explaining their relationship: pending_gc_cleanup_ is an unordered_set used for
O(1) membership checks to prevent duplicate enqueueing, and
pending_gc_cleanup_queue_ is a deque that preserves FIFO ordering for processing
cleanup tasks; also mention the invariant that both containers must be kept in
sync when pushing or popping FileKey entries (i.e., insert into set when
enqueuing, remove from set when dequeuing or canceling). This clarifies intent
for future maintainers and points to the synchronization requirement for any
code interacting with these containers.
include/tasks/reopen_task.h (1)

16-22: Add a clarifying comment to document ownership semantics for request_.

The request_ pointer is used in the Reopen() implementation with a null check, but the header provides no documentation of the ownership contract. Add a comment clarifying that the caller must ensure ReopenRequest outlives the task execution, or document that SetRequest should only be called with a valid pointer passed from the caller's stack frame.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/tasks/reopen_task.h` around lines 16 - 22, Add a clear ownership
comment above the private member request_ (and/or above SetRequest) stating the
ownership contract: SetRequest(ReopenRequest *req) does not take ownership or
perform copies, the caller retains ownership and must ensure the ReopenRequest
object outlives the task execution and any call to Reopen(); do not pass
temporaries or free the pointer while the task may run. Reference the symbols
request_, SetRequest, and Reopen() in the comment so readers know where the
lifetime requirement applies.
tests/gc.cpp (1)

172-227: Consider adding cleanup for eviction verifiers.

The test creates 10 MapVerifier instances with SetAutoClean(false) to trigger cache eviction. While the test directory is cleaned up at the end, these verifiers leave behind data in gc_evict partitions that may persist.

If CleanupStore(opts) handles the entire store path including other partitions, this is fine. Otherwise, consider cleaning up the gc_evict partitions explicitly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/gc.cpp` around lines 172 - 227, The test leaves MapVerifier instances
for partitions "gc_evict" alive which can persist data; ensure those evictor
verifiers are destroyed before cleaning the store by explicitly clearing or
resetting the evictors vector (evictors) or letting them go out of scope prior
to calling CleanupStore(opts), or iterate evictors and call any provided cleanup
method on MapVerifier (e.g., via SetAutoClean or a destructor cleanup) so that
the gc_evict partitions are removed before CleanupStore runs.
include/standby_service.h (1)

58-68: Document ownership semantics for raw pointer members in job structs.

ListPartitionsJob::partitions and PrepareManifestJob::source_term are raw pointers without documented lifetime expectations. Callers must ensure these pointers remain valid for the duration of job execution.

Consider adding comments clarifying that these are non-owning pointers that must outlive the job, or use std::reference_wrapper to make the intent clearer.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/standby_service.h` around lines 58 - 68,
ListPartitionsJob::partitions and PrepareManifestJob::source_term are raw
pointers with no documented ownership/lifetime; add a brief comment above each
member stating they are non-owning and callers must ensure the pointed-to
objects outlive the job, or alternatively change the member types to express
non-ownership explicitly (e.g., std::reference_wrapper<std::vector<std::string>>
or std::span<const std::string> for partitions, and
std::reference_wrapper<uint64_t> or uint64_t& for source_term) to make lifetime
intent clear; update constructors/usage where these structs are created to match
the chosen convention.
include/eloq_store.h (1)

764-769: Minor: Consider using atomic<RunningStatus> for type safety.

The running_status_ uses atomic<uint8_t> with casts, while store_mode_ uses atomic<StoreMode> directly. Using atomic<RunningStatus> would provide better type safety and be consistent with store_mode_.

♻️ Suggested change for consistency
-    std::atomic<uint8_t> running_status_{
-        static_cast<uint8_t>(RunningStatus::Stopped)};
+    std::atomic<RunningStatus> running_status_{RunningStatus::Stopped};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/eloq_store.h` around lines 764 - 769, Change running_status_ from
std::atomic<uint8_t> to std::atomic<RunningStatus> to match store_mode_ and gain
type safety: replace the declaration of running_status_ with
std::atomic<RunningStatus> running_status_{RunningStatus::Stopped}; remove all
static_cast<uint8_t> uses around RunningStatus when initializing or
reading/writing running_status_; and update any code that currently treats
running_status_ as a uint8_t to use RunningStatus values or explicit casts only
at API boundaries.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@include/async_io_manager.h`:
- Around line 1083-1112: StandbyStoreMgr lacks an explicit destructor, so Stop()
(and thus WaitForStandbyTasksToDrain()) may never run on destruction; add a
public destructor ~StandbyStoreMgr() that calls Stop() to ensure
WaitForStandbyTasksToDrain() runs and inflight_standby_tasks_ drains before
teardown. Locate the StandbyStoreMgr class declaration and implement the
destructor (and definition in the .cpp) to invoke Stop(), keeping behavior
idempotent if Stop() may already have been called. Ensure the implementation
references Stop() and does not throw.

In `@src/async_io_manager.cpp`:
- Around line 4367-4395: DeleteArchive currently only unlinks the file locally
but leaves cache accounting in closed_files_ and used_local_space_ unchanged;
after a successful local unlink (local_unlink_res >= 0) construct the same
FileKey used for closed_files_ (e.g., FileKey(tbl_id, term, tag) or the
key-building helper used elsewhere), remove that entry from closed_files_ under
the same mutex/lock used for closed_files_ updates, and decrement
used_local_space_ by the archive/manifest size (compute size the same way other
code does for manifest bookkeeping) so the cache reservation is released; ensure
these bookkeeping changes happen before submitting the cloud DeleteTask and are
thread-safe.

In `@src/eloq_store.cpp`:
- Around line 401-405: The fail_start lambda currently only sets running_status_
back to RunningStatus::Stopped which leaves partially-initialized state
(root_fds_, shards_, and service objects) behind when InitStoreSpace() succeeded
but a later Shard::Init() fails; modify fail_start (and the similar failure
handling around the other block at lines 490-526) to fully roll back created
state: close and clear root_fds_, destroy or clear partially-created shard
objects in shards_, and reset or release any service objects created during
startup before storing RunningStatus::Stopped and returning the KvError so a
subsequent retry starts from a clean state; reference the fail_start lambda,
InitStoreSpace(), Shard::Init(), root_fds_, shards_, and RunningStatus::Stopped
to locate the code to update.
- Around line 361-375: UpdateStandbyMasterAddr currently updates options_ and
standby_service_ but does not retarget already-constructed StandbyStoreMgr
instances which copied standby_master_addr into their own remote_addr_; as a
result live shards keep using the old master. Modify UpdateStandbyMasterAddr to
propagate the new address into every shard IO manager (or call a method on each
StandbyStoreMgr to update its remote_addr_), e.g., iterate your shard manager
collection and call something like SetRemoteAddr(new_addr) on each
StandbyStoreMgr or make StandbyStoreMgr read options_.standby_master_addr on
demand and call its UpdateRemoteAddr equivalent so rsync-based fetches use the
updated master address.
- Around line 1393-1484: The current HandleGlobalListArchiveTagsRequest walks
local options_.store_path directories and thus misses manifests only present in
cloud/object-store; replace the local filesystem traversal with the object-store
listing used elsewhere (e.g., call the existing object-store list API or the
helper that mirrors object-store listings for archives) to enumerate manifest
objects instead of using fs::directory_iterator. Keep the same parsing/filtering
logic (use ParseFileName and ParseManifestFileSuffix to extract term and tag,
apply req->prefix_ filter, push_back ArchiveEntry, then sort+unique) but obtain
filenames from the object-store listing results (and handle pagination/errors)
rather than local files so GlobalListArchiveTags returns a complete tag set.
Ensure you still call req->SetDone(KvError::NoError) at the end.
- Around line 1234-1321: The non-StandbyReplica branch always scans
options_.store_path, which misses partitions present only in cloud storage;
update the else branch so when options_.cloud_store_path is non-empty you
enumerate the cloud store (instead of or in addition to options_.store_path) to
build partitions. Locate the else branch that iterates options_.store_path and
change the source of roots to options_.cloud_store_path when set (or merge both
lists), reusing TableIdent::FromString, the existing partition_filter check, and
error handling (req->SetDone/ToKvError) so cloud-only partitions get added to
partitions for the global reopen. Ensure Mode()/StoreMode::StandbyReplica logic
and the StandbyReplica path remain unchanged.

In `@src/standby_service.cpp`:
- Around line 1264-1273: The pidfd_open failure path can leak the
already-spawned child (spawn occurred earlier) — modify the error branch after
pidfd_open to ensure the child is reaped before returning: call kill(child_pid,
SIGKILL) (or SIGTERM first if preferred), then waitpid(child_pid, &status, 0) to
reap it, close and reset *stdout_fd as already done, log the error with
saved_errno, and then return ToKvError(-saved_errno); alternatively, if you
prefer the fallback polling behavior, treat non-ENOSYS/EINVAL/EPERM pidfd_open
failures as non-fatal by not returning an error and instead proceed to the
existing polling fallback path; update the code around pidfd_open, child_pid,
*stdout_fd, LOG and ToKvError accordingly.

In `@src/storage/object_store.cpp`:
- Around line 816-819: The error log is printing raw request data via
task->json_data_ which may contain full URLs and sensitive query parameters;
update the logging in the HTTP error path to redact query parameters before
including the URL in the log: parse the URL(s) found in task->json_data_ (or
detect the first '?' in the URL string) and replace or remove the query string
portion (e.g., keep scheme+host+path and append "?REDACTED") prior to
constructing the LOG(ERROR) message in the same block where response_code,
task->Info(), response_body are logged; ensure the sanitized value replaces
task->json_data_ in the log so no raw query/token data is emitted.

In `@tests/cloud.cpp`:
- Around line 146-162: The test uses reinterpret_cast<EloqStoreAccessor*>(store)
in PrimaryShard to read private members (shards_), which relies on the exact
memory layout of EloqStore and is brittle; replace this by adding a safe test
accessor: either make a friend test accessor class or add a test-only getter on
EloqStore (e.g., GetShardsForTest or friend TestAccessor) that returns a
pointer/reference to the shards_ or the primary shard, then update PrimaryShard
to call that accessor instead of performing reinterpret_cast; locate the
reinterpret_cast in PrimaryShard and the EloqStoreAccessor struct to change
usage to the new accessor method or friend class.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: aa40f172-d017-4fc9-b5a0-fc87110fd7ab

📥 Commits

Reviewing files that changed from the base of the PR and between 4c1a741 and 53d1d22.

📒 Files selected for processing (48)
  • CMakeLists.txt
  • README.md
  • include/async_io_manager.h
  • include/cloud_storage_service.h
  • include/common.h
  • include/eloq_store.h
  • include/error.h
  • include/file_gc.h
  • include/kv_options.h
  • include/standby_service.h
  • include/storage/index_page_manager.h
  • include/storage/object_store.h
  • include/storage/shard.h
  • include/tasks/archive_crond.h
  • include/tasks/background_write.h
  • include/tasks/list_standby_partition_task.h
  • include/tasks/reopen_task.h
  • include/tasks/task.h
  • include/tasks/task_manager.h
  • include/tasks/write_task.h
  • include/types.h
  • include/utils.h
  • rust/eloqstore-sys/vendor/CMakeLists.txt
  • src/async_io_manager.cpp
  • src/cloud_storage_service.cpp
  • src/eloq_store.cpp
  • src/file_gc.cpp
  • src/kv_options.cpp
  • src/replayer.cpp
  • src/standby_service.cpp
  • src/storage/index_page_manager.cpp
  • src/storage/object_store.cpp
  • src/storage/shard.cpp
  • src/tasks/archive_crond.cpp
  • src/tasks/background_write.cpp
  • src/tasks/prewarm_task.cpp
  • src/tasks/reopen_task.cpp
  • src/tasks/task_manager.cpp
  • src/tasks/write_task.cpp
  • src/test_utils.cpp
  • tests/CMakeLists.txt
  • tests/cloud.cpp
  • tests/eloq_store_test.cpp
  • tests/filename_parsing.cpp
  • tests/gc.cpp
  • tests/manifest.cpp
  • tests/replayer_term.cpp
  • tests/standby.cpp
💤 Files with no reviewable changes (1)
  • include/error.h

Comment on lines +1083 to +1112
class StandbyStoreMgr final : public IouringMgr
{
public:
    StandbyStoreMgr(const KvOptions *opts, uint32_t fd_limit);
    void Stop() override;
    void SetProcessTerm(uint64_t term)
    {
        process_term_ = term;
    }
    uint64_t ProcessTerm() const override
    {
        return process_term_;
    }

    std::pair<ManifestFilePtr, KvError> GetManifest(
        const TableIdent &tbl_id) override;

    std::pair<ManifestFilePtr, KvError> RefreshManifest(
        const TableIdent &tbl_id);

private:
    void WaitForStandbyTasksToDrain();
    std::string BuildRemoteFilePath(const TableIdent &tbl_id,
                                    std::string_view filename) const;
    int RunRsync(const std::string &remote, const std::string &dst);
    std::atomic<size_t> inflight_standby_tasks_{0};
    uint64_t process_term_{0};

    std::string remote_addr_;
};

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for StandbyStoreMgr destructor implementation
ast-grep --pattern $'class StandbyStoreMgr {
  $$$
  ~StandbyStoreMgr() {
    $$$
  }
  $$$
}'

# Search for Stop() implementation in StandbyStoreMgr
rg -n -A 10 'StandbyStoreMgr::Stop'

Repository: eloqdata/eloqstore

Length of output: 628


🏁 Script executed:

# Search for StandbyStoreMgr destructor definition with broader pattern
rg -n '~StandbyStoreMgr' src/

# Check the class definition in header for destructor declaration
rg -n -B 5 -A 15 'class StandbyStoreMgr final' include/

Repository: eloqdata/eloqstore

Length of output: 1209


🏁 Script executed:

# Search for IouringMgr destructor
rg -n '~IouringMgr' src/

# Check IouringMgr class definition and destructor
rg -n -B 5 -A 20 'class IouringMgr' include/async_io_manager.h | head -60

Repository: eloqdata/eloqstore

Length of output: 994


🏁 Script executed:

# Check IouringMgr destructor implementation
sed -n '199,250p' src/async_io_manager.cpp

Repository: eloqdata/eloqstore

Length of output: 1275


Add explicit destructor to ensure WaitForStandbyTasksToDrain() is called.

StandbyStoreMgr has no explicit destructor; the default destructor calls IouringMgr::~IouringMgr(), which does not call Stop(). This means WaitForStandbyTasksToDrain() is only invoked if Stop() is explicitly called before destruction, leaving inflight standby tasks unguarded. Add an explicit destructor that calls Stop() to ensure safe cleanup during object destruction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/async_io_manager.h` around lines 1083 - 1112, StandbyStoreMgr lacks
an explicit destructor, so Stop() (and thus WaitForStandbyTasksToDrain()) may
never run on destruction; add a public destructor ~StandbyStoreMgr() that calls
Stop() to ensure WaitForStandbyTasksToDrain() runs and inflight_standby_tasks_
drains before teardown. Locate the StandbyStoreMgr class declaration and
implement the destructor (and definition in the .cpp) to invoke Stop(), keeping
behavior idempotent if Stop() may already have been called. Ensure the
implementation references Stop() and does not throw.
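The fix the review asks for boils down to a common C++ pattern: a destructor that delegates to an idempotent `Stop()`, so drain logic runs whether or not the caller stopped the manager explicitly. A minimal sketch of that pattern, with hypothetical names standing in for the eloqstore classes (`g_drain_calls` stands in for `WaitForStandbyTasksToDrain()`):

```cpp
#include <atomic>

static int g_drain_calls = 0;

class Mgr
{
public:
    ~Mgr() { Stop(); }

    void Stop()
    {
        // exchange() makes Stop() safe to call twice: once explicitly by
        // the caller, and once more from the destructor.
        if (stopped_.exchange(true))
        {
            return;
        }
        ++g_drain_calls;  // stands in for WaitForStandbyTasksToDrain()
    }

private:
    std::atomic<bool> stopped_{false};
};

// Returns how many times drain ran for a given shutdown sequence.
int DrainCount(bool explicit_stop)
{
    g_drain_calls = 0;
    {
        Mgr mgr;
        if (explicit_stop)
        {
            mgr.Stop();
        }
    }  // destructor fires here; drain must not run a second time
    return g_drain_calls;
}
```

With or without an explicit `Stop()`, drain runs exactly once, which is the idempotence property the AI prompt above calls out.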

Comment on lines +4367 to +4395
KvError CloudStoreMgr::DeleteArchive(const TableIdent &tbl_id,
                                     uint64_t term,
                                     std::string_view tag)
{
    if (term == std::numeric_limits<uint64_t>::max())
    {
        term = ProcessTerm();
    }
    const std::string name = ArchiveName(term, tag);
    auto [dir_fd, open_err] = OpenFD(tbl_id, LruFD::kDirectory, false, 0);
    CHECK_KV_ERR(open_err);
    const int local_unlink_res = UnlinkAt(dir_fd.FdPair(), name.c_str(), false);
    if (local_unlink_res < 0 && local_unlink_res != -ENOENT)
    {
        return ToKvError(local_unlink_res);
    }

    KvTask *current_task = ThdTask();
    ObjectStore::DeleteTask delete_task(tbl_id.ToString() + "/" + name);
    delete_task.SetKvTask(current_task);
    AcquireCloudSlot(current_task);
    obj_store_.SubmitTask(&delete_task, shard);
    current_task->WaitIo();
    if (delete_task.error_ == KvError::NotFound)
    {
        return KvError::NoError;
    }
    return delete_task.error_;
}

⚠️ Potential issue | 🟠 Major

DeleteArchive leaves stale local-cache accounting behind.

After the local unlink succeeds, the archive is still left in closed_files_ and used_local_space_ is unchanged. That makes cache reservation think deleted archives still consume space, and the cleaner can later try to evict a file that no longer exists. Remove the FileKey and release the manifest-sized charge when local deletion succeeds.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 4367 - 4395, DeleteArchive currently
only unlinks the file locally but leaves cache accounting in closed_files_ and
used_local_space_ unchanged; after a successful local unlink (local_unlink_res
>= 0) construct the same FileKey used for closed_files_ (e.g., FileKey(tbl_id,
term, tag) or the key-building helper used elsewhere), remove that entry from
closed_files_ under the same mutex/lock used for closed_files_ updates, and
decrement used_local_space_ by the archive/manifest size (compute size the same
way other code does for manifest bookkeeping) so the cache reservation is
released; ensure these bookkeeping changes happen before submitting the cloud
DeleteTask and are thread-safe.
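The bookkeeping release the comment describes can be sketched independently of the eloqstore types. Here `closed_files` maps a file key to its recorded size and `used_space` is the running total; both names are illustrative, not the actual members:

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

// Illustrative local-cache bookkeeping, not the eloqstore types.
struct LocalCache
{
    std::unordered_map<std::string, size_t> closed_files;
    size_t used_space = 0;

    // Mirrors what the review asks DeleteArchive to do after a successful
    // local unlink: drop the entry and release its space charge. Idempotent,
    // so a second call (e.g. after cache eviction) is harmless.
    void ReleaseOnDelete(const std::string &key)
    {
        auto it = closed_files.find(key);
        if (it == closed_files.end())
        {
            return;  // already evicted; nothing to release
        }
        // Saturating subtract guards against accounting drift going negative.
        used_space = used_space > it->second ? used_space - it->second : 0;
        closed_files.erase(it);
    }
};
```

In the real code this would run under the same lock that guards `closed_files_` updates, as the AI prompt notes.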

Comment on lines +401 to +405
auto fail_start = [this](KvError err)
{
    running_status_.store(static_cast<uint8_t>(RunningStatus::Stopped),
                          std::memory_order_release);
    return err;

⚠️ Potential issue | 🟠 Major

Start failures leave partially initialized state behind.

fail_start() only flips running_status_ back to Stopped. If InitStoreSpace() succeeds and a later Shard::Init() fails, root_fds_, shards_, and service objects remain populated, so a retry can hit assert(root_fds_.empty()) or reuse half-initialized shards. Roll back the created state before returning.

Also applies to: 490-526

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/eloq_store.cpp` around lines 401 - 405, The fail_start lambda currently
only sets running_status_ back to RunningStatus::Stopped which leaves
partially-initialized state (root_fds_, shards_, and service objects) behind
when InitStoreSpace() succeeded but a later Shard::Init() fails; modify
fail_start (and the similar failure handling around the other block at lines
490-526) to fully roll back created state: close and clear root_fds_, destroy or
clear partially-created shard objects in shards_, and reset or release any
service objects created during startup before storing RunningStatus::Stopped and
returning the KvError so a subsequent retry starts from a clean state; reference
the fail_start lambda, InitStoreSpace(), Shard::Init(), root_fds_, shards_, and
RunningStatus::Stopped to locate the code to update.
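One idiomatic way to get the rollback the comment asks for is a scope guard that accumulates undo actions as each init step succeeds, and runs them in reverse order if start-up bails out early. A minimal sketch with hypothetical stand-ins for `InitStoreSpace()` and `Shard::Init()`:

```cpp
#include <functional>
#include <vector>

// Scope guard: runs registered undo actions in reverse order unless
// explicitly committed at the end of a successful start.
struct Rollback
{
    std::vector<std::function<void()>> undos;
    bool committed = false;

    ~Rollback()
    {
        if (committed)
        {
            return;
        }
        for (auto it = undos.rbegin(); it != undos.rend(); ++it)
        {
            (*it)();
        }
    }
};

// Simulated start: the first step populates root_fds (standing in for
// InitStoreSpace()); a failing second step (standing in for Shard::Init())
// must leave root_fds empty so a retry starts clean.
bool Start(std::vector<int> &root_fds, bool fail_second_step)
{
    Rollback rb;
    root_fds.push_back(42);
    rb.undos.push_back([&root_fds] { root_fds.clear(); });
    if (fail_second_step)
    {
        return false;  // rb's destructor clears root_fds
    }
    rb.committed = true;
    return true;
}
```

The same guard would also close partially created shards and service objects before `fail_start` flips `running_status_` back to Stopped.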

Comment on lines 1234 to 1321
std::vector<TableIdent> partitions;
if (Mode() == StoreMode::StandbyReplica)
{
    std::vector<std::string> names;
    ListStandbyPartitionRequest list_request(&names);
#ifdef ELOQ_MODULE_ENABLED
    {
        std::lock_guard<bthread::Mutex> lk(list_request.mutex_);
        list_request.done_ = false;
    }
#else
    list_request.done_.store(false, std::memory_order_relaxed);
#endif
    list_request.err_ = KvError::NoError;
    shards_[utils::RandomInt(static_cast<int>(shards_.size()))]
        ->AddKvRequest(&list_request);
    list_request.Wait();
    if (list_request.Error() != KvError::NoError)
    {
        req->SetDone(list_request.Error());
        return;
    }

    partitions.reserve(names.size());
    for (const std::string &name : names)
    {
        TableIdent tbl_id = TableIdent::FromString(name);
        if (!tbl_id.IsValid())
        {
            continue;
        }
        if (options_.partition_filter && !options_.partition_filter(tbl_id))
        {
            continue;
        }
        partitions.emplace_back(std::move(tbl_id));
    }
}
else
{
    std::error_code ec;
    for (const fs::path root : options_.store_path)
    {
        const fs::path db_path(root);
        fs::directory_iterator dir_it(db_path, ec);
        if (ec)
        {
            req->SetDone(ToKvError(-ec.value()));
            return;
        }
        fs::directory_iterator end;
        for (; dir_it != end; dir_it.increment(ec))
        {
            if (ec)
            {
                req->SetDone(ToKvError(-ec.value()));
                return;
            }
            const fs::directory_entry &ent = *dir_it;
            const fs::path ent_path = ent.path();
            bool is_dir = fs::is_directory(ent_path, ec);
            if (ec)
            {
                req->SetDone(ToKvError(-ec.value()));
                return;
            }
            if (!is_dir)
            {
                continue;
            }

            TableIdent tbl_id = TableIdent::FromString(ent_path.filename());
            if (tbl_id.tbl_name_.empty())
            {
                LOG(WARNING) << "unexpected partition " << ent.path();
                continue;
            }

            if (options_.partition_filter &&
                !options_.partition_filter(tbl_id))
            {
                continue;
            }

            partitions.emplace_back(std::move(tbl_id));
        }
    }
}

⚠️ Potential issue | 🟠 Major

Cloud-mode global reopen only sees locally cached partitions.

Outside standby-replica mode this code always scans options_.store_path. On a fresh cloud node, or after local cache eviction, partitions that exist only in object storage will never get a ReopenRequest, so the global reopen becomes incomplete or a no-op. Use the cloud listing path when cloud_store_path is set.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/eloq_store.cpp` around lines 1234 - 1321, The non-StandbyReplica branch
always scans options_.store_path, which misses partitions present only in cloud
storage; update the else branch so when options_.cloud_store_path is non-empty
you enumerate the cloud store (instead of or in addition to options_.store_path)
to build partitions. Locate the else branch that iterates options_.store_path
and change the source of roots to options_.cloud_store_path when set (or merge
both lists), reusing TableIdent::FromString, the existing partition_filter
check, and error handling (req->SetDone/ToKvError) so cloud-only partitions get
added to partitions for the global reopen. Ensure
Mode()/StoreMode::StandbyReplica logic and the StandbyReplica path remain
unchanged.

Comment on lines +1393 to +1484
void EloqStore::HandleGlobalListArchiveTagsRequest(
    GlobalListArchiveTagsRequest *req)
{
    req->entries_.clear();
    std::error_code ec;
    for (const std::string &root : options_.store_path)
    {
        fs::directory_iterator part_it(fs::path(root), ec);
        if (ec)
        {
            ec.clear();
            continue;
        }

        for (; part_it != fs::directory_iterator{}; part_it.increment(ec))
        {
            if (ec)
            {
                ec.clear();
                break;
            }
            if (!part_it->is_directory(ec))
            {
                ec.clear();
                continue;
            }

            fs::directory_iterator file_it(part_it->path(), ec);
            if (ec)
            {
                ec.clear();
                continue;
            }
            for (; file_it != fs::directory_iterator{}; file_it.increment(ec))
            {
                if (ec)
                {
                    ec.clear();
                    break;
                }
                if (!file_it->is_regular_file(ec))
                {
                    ec.clear();
                    continue;
                }

                const std::string filename =
                    file_it->path().filename().string();
                auto [type, suffix] = ParseFileName(filename);
                if (type != FileNameManifest)
                {
                    continue;
                }

                uint64_t term = 0;
                std::optional<std::string> tag;
                if (!ParseManifestFileSuffix(suffix, term, tag) ||
                    !tag.has_value())
                {
                    continue;
                }
                if (!req->prefix_.empty() && tag->rfind(req->prefix_, 0) != 0)
                {
                    continue;
                }
                req->entries_.push_back(
                    GlobalListArchiveTagsRequest::ArchiveEntry{
                        .term = term, .tag = std::move(*tag)});
            }
        }
    }

    std::sort(req->entries_.begin(),
              req->entries_.end(),
              [](const GlobalListArchiveTagsRequest::ArchiveEntry &lhs,
                 const GlobalListArchiveTagsRequest::ArchiveEntry &rhs)
              {
                  if (lhs.tag != rhs.tag)
                  {
                      return lhs.tag < rhs.tag;
                  }
                  return lhs.term < rhs.term;
              });
    req->entries_.erase(
        std::unique(req->entries_.begin(),
                    req->entries_.end(),
                    [](const GlobalListArchiveTagsRequest::ArchiveEntry &lhs,
                       const GlobalListArchiveTagsRequest::ArchiveEntry &rhs)
                    { return lhs.term == rhs.term && lhs.tag == rhs.tag; }),
        req->entries_.end());
    req->SetDone(KvError::NoError);
}

⚠️ Potential issue | 🟠 Major

ListArchiveTags is incomplete in cloud deployments.

This handler only inspects local directories, so any archive manifest that has been evicted from cache or was never downloaded is invisible. In cloud mode that makes GlobalListArchiveTags return only a partial tag set. Mirror the object-store listing path here instead of walking store_path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/eloq_store.cpp` around lines 1393 - 1484, The current
HandleGlobalListArchiveTagsRequest walks local options_.store_path directories
and thus misses manifests only present in cloud/object-store; replace the local
filesystem traversal with the object-store listing used elsewhere (e.g., call
the existing object-store list API or the helper that mirrors object-store
listings for archives) to enumerate manifest objects instead of using
fs::directory_iterator. Keep the same parsing/filtering logic (use ParseFileName
and ParseManifestFileSuffix to extract term and tag, apply req->prefix_ filter,
push_back ArchiveEntry, then sort+unique) but obtain filenames from the
object-store listing results (and handle pagination/errors) rather than local
files so GlobalListArchiveTags returns a complete tag set. Ensure you still call
req->SetDone(KvError::NoError) at the end.

Comment on lines +1264 to +1273
    const int saved_errno = errno;
    if (*stdout_fd >= 0)
    {
        close(*stdout_fd);
        *stdout_fd = -1;
    }
    LOG(ERROR) << "StandbyService: pidfd_open failed: "
               << strerror(saved_errno);
    return ToKvError(-saved_errno);
}

⚠️ Potential issue | 🟡 Minor

Potential zombie process leak on unexpected pidfd_open failure.

If pidfd_open fails with an error other than ENOSYS, EINVAL, or EPERM, the child process has already been spawned (line 1230) but will not be tracked. The caller receives an error and won't monitor this process, causing it to become a zombie when it exits.

Consider either:

  1. Killing the child process before returning the error, or
  2. Treating all pidfd_open failures as non-fatal (like the handled cases) since the polling fallback exists
🐛 Proposed fix to prevent zombie
     const int saved_errno = errno;
     if (*stdout_fd >= 0)
     {
         close(*stdout_fd);
         *stdout_fd = -1;
     }
+    // Kill the orphaned child to prevent zombie
+    kill(*pid, SIGKILL);
+    waitpid(*pid, nullptr, 0);
+    *pid = -1;
     LOG(ERROR) << "StandbyService: pidfd_open failed: "
                << strerror(saved_errno);
     return ToKvError(-saved_errno);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/standby_service.cpp` around lines 1264 - 1273, The pidfd_open failure
path can leak the already-spawned child (spawn occurred earlier) — modify the
error branch after pidfd_open to ensure the child is reaped before returning:
call kill(child_pid, SIGKILL) (or SIGTERM first if preferred), then
waitpid(child_pid, &status, 0) to reap it, close and reset *stdout_fd as already
done, log the error with saved_errno, and then return ToKvError(-saved_errno);
alternatively, if you prefer the fallback polling behavior, treat
non-ENOSYS/EINVAL/EPERM pidfd_open failures as non-fatal by not returning an
error and instead proceed to the existing polling fallback path; update the code
around pidfd_open, child_pid, *stdout_fd, LOG and ToKvError accordingly.
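The kill-then-reap sequence suggested above is standard POSIX. A standalone sketch (assuming Linux/POSIX; `KillAndReap` is a hypothetical helper name, not part of the codebase):

```cpp
#include <csignal>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// If post-fork setup fails, kill the already-spawned child and reap it with
// waitpid() so it cannot linger as a zombie once the caller stops tracking
// it. Returns true when the child was reaped.
bool KillAndReap(pid_t pid)
{
    if (pid <= 0)
    {
        return false;
    }
    kill(pid, SIGKILL);
    int status = 0;
    // Block until the child exits; on success waitpid returns the pid.
    return waitpid(pid, &status, 0) == pid;
}
```

The alternative the review mentions, treating all `pidfd_open` failures as non-fatal and falling back to polling, avoids this cleanup but keeps the child alive and tracked.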

Comment on lines +816 to +819
LOG(ERROR) << "HTTP error: " << response_code
<< ", task=" << task->Info()
<< ", request=" << task->json_data_
<< ", response_body=" << response_body;

⚠️ Potential issue | 🟠 Major

Avoid logging raw request URLs in error logs.

At Line [818], task->json_data_ can carry full request URLs; if query-signed/auth-bearing URLs are used by a backend, this can leak credentials/tokens into logs. Please redact query parameters before logging.

🔒 Suggested hardening (redact URL query before logging)
+namespace {
+std::string RedactUrlForLog(std::string_view url)
+{
+    size_t q = url.find('?');
+    if (q == std::string_view::npos)
+    {
+        return std::string(url);
+    }
+    std::string out(url.substr(0, q));
+    out += "?<redacted>";
+    return out;
+}
+}  // namespace
...
-                    LOG(ERROR) << "HTTP error: " << response_code
+                    const std::string safe_request =
+                        RedactUrlForLog(task->json_data_);
+                    LOG(ERROR) << "HTTP error: " << response_code
                                << ", task=" << task->Info()
-                               << ", request=" << task->json_data_
+                               << ", request=" << safe_request
                                << ", response_body=" << response_body;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_store.cpp` around lines 816 - 819, The error log is
printing raw request data via task->json_data_ which may contain full URLs and
sensitive query parameters; update the logging in the HTTP error path to redact
query parameters before including the URL in the log: parse the URL(s) found in
task->json_data_ (or detect the first '?' in the URL string) and replace or
remove the query string portion (e.g., keep scheme+host+path and append
"?REDACTED") prior to constructing the LOG(ERROR) message in the same block
where response_code, task->Info(), response_body are logged; ensure the
sanitized value replaces task->json_data_ in the log so no raw query/token data
is emitted.
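The redaction helper proposed in the suggested hardening is simple enough to verify standalone: everything before the first `?` is kept and the query string is replaced wholesale, so signed or token-bearing URLs never reach the logs verbatim.

```cpp
#include <string>
#include <string_view>

// Keep scheme/host/path; replace the entire query string. Redacting the
// whole query (rather than individual parameters) avoids having to keep a
// blocklist of sensitive parameter names.
std::string RedactUrlForLog(std::string_view url)
{
    const size_t q = url.find('?');
    if (q == std::string_view::npos)
    {
        return std::string(url);
    }
    std::string out(url.substr(0, q));
    out += "?<redacted>";
    return out;
}
```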

Comment on lines +146 to +162
struct EloqStoreAccessor
{
    eloqstore::KvOptions options_;
    std::vector<int> root_fds_;
    std::unique_ptr<eloqstore::CloudStorageService> cloud_service_;
    std::vector<std::unique_ptr<eloqstore::Shard>> shards_;
};

eloqstore::Shard *PrimaryShard(eloqstore::EloqStore *store)
{
    auto *access = reinterpret_cast<EloqStoreAccessor *>(store);
    if (access->shards_.empty())
    {
        return nullptr;
    }
    return access->shards_.front().get();
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the actual layout of EloqStore to verify it matches EloqStoreAccessor
ast-grep --pattern $'class EloqStore {
  $$$
}'

Repository: eloqdata/eloqstore

Length of output: 44


🏁 Script executed:

# Search for EloqStore class definition
rg "class EloqStore" -A 15

# Also search in header files specifically
fd -e "hpp" -e "h" | xargs rg "class EloqStore" -A 15

Repository: eloqdata/eloqstore

Length of output: 22213


🏁 Script executed:

# Get the full EloqStore class definition
rg "^class EloqStore" -A 100 include/eloq_store.h | head -150

Repository: eloqdata/eloqstore

Length of output: 3094


🏁 Script executed:

# Search for the private members section of EloqStore
rg "class EloqStore" -A 300 include/eloq_store.h | grep -A 200 "^private:" | head -80

Repository: eloqdata/eloqstore

Length of output: 2380


🏁 Script executed:

# Get the line number where EloqStore class is defined and read around it
line_num=$(rg "^class EloqStore" -n include/eloq_store.h | head -1 | cut -d: -f1)
echo "EloqStore class starts at line: $line_num"

# Read from that line forward to find private members
tail -n +$line_num include/eloq_store.h | head -300 | grep -A 50 "^private:"

Repository: eloqdata/eloqstore

Length of output: 1695


🏁 Script executed:

# Check where PrimaryShard and EloqStoreAccessor are used
rg "PrimaryShard|EloqStoreAccessor" tests/

# Also verify this is test-only by checking if it's used in production code
rg "PrimaryShard|EloqStoreAccessor" src/

Repository: eloqdata/eloqstore

Length of output: 326


reinterpret_cast to EloqStoreAccessor relies on exact memory layout matching.

The cast assumes EloqStore's private members (options_, root_fds_, cloud_service_, shards_) appear in this exact order with no intervening members. While currently safe, this is fragile—any reordering of these fields silently breaks the test. Consider adding a friend test accessor class or exposing a test-only getter API instead of relying on layout assumptions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/cloud.cpp` around lines 146 - 162, The test uses
reinterpret_cast<EloqStoreAccessor*>(store) in PrimaryShard to read private
members (shards_), which relies on the exact memory layout of EloqStore and is
brittle; replace this by adding a safe test accessor: either make a friend test
accessor class or add a test-only getter on EloqStore (e.g., GetShardsForTest or
friend TestAccessor) that returns a pointer/reference to the shards_ or the
primary shard, then update PrimaryShard to call that accessor instead of
performing reinterpret_cast; locate the reinterpret_cast in PrimaryShard and the
EloqStoreAccessor struct to change usage to the new accessor method or friend
class.
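The friend-accessor alternative the review recommends looks roughly like this. The types are hypothetical stand-ins for `EloqStore` and `Shard`; the point is that access goes through a member name rather than a byte offset, so reordering private fields cannot silently break the test.

```cpp
#include <memory>
#include <vector>

// Hypothetical store with private shards, plus a friend accessor that tests
// can use instead of a layout-dependent reinterpret_cast.
class Store
{
public:
    void AddShard(int id) { shards_.push_back(std::make_unique<int>(id)); }

private:
    std::vector<std::unique_ptr<int>> shards_;
    friend class StoreTestAccessor;  // test-only escape hatch
};

class StoreTestAccessor
{
public:
    // Resolved by the compiler against the member name, so it stays correct
    // across any reordering of Store's fields.
    static int *PrimaryShard(Store &store)
    {
        return store.shards_.empty() ? nullptr : store.shards_.front().get();
    }
};
```

The friend declaration costs one line in the production header and removes the undefined behavior inherent in casting `EloqStore *` to an unrelated struct type.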

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (3)
src/async_io_manager.cpp (3)

3300-3304: ⚠️ Potential issue | 🟠 Major

Deferred GC retries should not make the shard appear busy.

Including pending_gc_cleanup_queue_.empty() in IsIdle() can keep the shard non-idle while cleaner work is deferred/retrying, which interferes with idle detection.

Suggested change
 bool CloudStoreMgr::IsIdle()
 {
     return file_cleaner_.status_ == TaskStatus::Idle &&
-           pending_gc_cleanup_queue_.empty() &&
            active_prewarm_tasks_ == 0 && inflight_cloud_slots_ == 0 &&
            !obj_store_.HasPendingWork();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3300 - 3304, IsIdle() currently
considers pending_gc_cleanup_queue_.empty(), which makes a shard appear busy
when GC work is only deferred/retrying; update CloudStoreMgr::IsIdle to exclude
pending_gc_cleanup_queue_ from the idle check (leave checks for
file_cleaner_.status_ == TaskStatus::Idle, active_prewarm_tasks_, and
inflight_cloud_slots_), or alternatively treat deferred GC entries with a
separate flag (e.g., is_gc_pending_but_deferred) that does not block idleness;
modify the logic around pending_gc_cleanup_queue_ access in
CloudStoreMgr::IsIdle accordingly so deferred/retry GC entries no longer prevent
the shard from being considered idle.

3198-3234: ⚠️ Potential issue | 🟠 Major

Startup partition cleanup still only runs before restore-time trimming.

Directory removal is gated on retained_files computed before TrimRestoredCacheUsage(). Partitions that become empty during trimming are not revisited, so empty dirs can still be left behind.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3198 - 3234, The cleanup currently
computes retained_files and may remove table_path before
TrimRestoredCacheUsage() runs, so partitions that become empty during trimming
are missed; update the flow to perform directory-empty checks after trimming
(i.e., run the removal logic once more or recompute retained_files immediately
after calling TrimRestoredCacheUsage()), using the same removal/error-handling
code around fs::remove and the LOG calls; locate the block that uses
retained_files, max_data_file_idx, EnqueClosedFile, and TrimRestoredCacheUsage()
and move or duplicate the directory removal check so empty dirs produced by
TrimRestoredCacheUsage() are removed and errors are handled the same way.

4367-4395: ⚠️ Potential issue | 🟠 Major

DeleteArchive still leaks local-cache bookkeeping after successful local unlink.

After local delete succeeds, this path does not evict the corresponding FileKey from closed_files_ or release used_local_space_. That can leave stale cache pressure and future evictions targeting already-missing files.

Suggested change
 KvError CloudStoreMgr::DeleteArchive(const TableIdent &tbl_id,
                                      uint64_t term,
                                      std::string_view tag)
 {
@@
     const std::string name = ArchiveName(term, tag);
@@
     const int local_unlink_res = UnlinkAt(dir_fd.FdPair(), name.c_str(), false);
     if (local_unlink_res < 0 && local_unlink_res != -ENOENT)
     {
         return ToKvError(local_unlink_res);
     }
+    if (local_unlink_res == 0)
+    {
+        FileKey key(tbl_id, name);
+        if (DequeClosedFile(key))
+        {
+            const size_t released = EstimateFileSize(name);
+            used_local_space_ =
+                used_local_space_ > released ? used_local_space_ - released : 0;
+        }
+    }
 
     KvTask *current_task = ThdTask();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 4367 - 4395, DeleteArchive currently
unlinks the local file but leaves the local-cache bookkeeping (closed_files_
entry and used_local_space_) intact; after computing name = ArchiveName(term,
tag) and confirming local_unlink_res indicates success (local_unlink_res >= 0),
locate and erase the corresponding FileKey for tbl_id + name from closed_files_
and decrement used_local_space_ by that FileKey's size (using the same
lock/guard used elsewhere when accessing closed_files_), ensuring any associated
resources (e.g., cached fd handles) are released before proceeding to submit the
ObjectStore::DeleteTask.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/async_io_manager.cpp`:
- Around line 3300-3304: IsIdle() currently considers
pending_gc_cleanup_queue_.empty(), which makes a shard appear busy when GC work
is only deferred/retrying; update CloudStoreMgr::IsIdle to exclude
pending_gc_cleanup_queue_ from the idle check (leave checks for
file_cleaner_.status_ == TaskStatus::Idle, active_prewarm_tasks_, and
inflight_cloud_slots_), or alternatively treat deferred GC entries with a
separate flag (e.g., is_gc_pending_but_deferred) that does not block idleness;
modify the logic around pending_gc_cleanup_queue_ access in
CloudStoreMgr::IsIdle accordingly so deferred/retry GC entries no longer prevent
the shard from being considered idle.
- Around line 3198-3234: The cleanup currently computes retained_files and may
remove table_path before TrimRestoredCacheUsage() runs, so partitions that
become empty during trimming are missed; update the flow to perform
directory-empty checks after trimming (i.e., run the removal logic once more or
recompute retained_files immediately after calling TrimRestoredCacheUsage()),
using the same removal/error-handling code around fs::remove and the LOG calls;
locate the block that uses retained_files, max_data_file_idx, EnqueClosedFile,
and TrimRestoredCacheUsage() and move or duplicate the directory removal check
so empty dirs produced by TrimRestoredCacheUsage() are removed and errors are
handled the same way.
- Around line 4367-4395: DeleteArchive currently unlinks the local file but
leaves the local-cache bookkeeping (closed_files_ entry and used_local_space_)
intact; after computing name = ArchiveName(term, tag) and confirming
local_unlink_res indicates success (local_unlink_res >= 0), locate and erase the
corresponding FileKey for tbl_id + name from closed_files_ and decrement
used_local_space_ by that FileKey's size (using the same lock/guard used
elsewhere when accessing closed_files_), ensuring any associated resources
(e.g., cached fd handles) are released before proceeding to submit the
ObjectStore::DeleteTask.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9b521f2c-03c2-48ba-b617-9965b0f0606c

📥 Commits

Reviewing files that changed from the base of the PR and between 53d1d22 and f3c07d7.

📒 Files selected for processing (1)
  • src/async_io_manager.cpp

remove empty dir on start up

fix reopen test failure
if (has_pending_gc)
{
    const size_t pending_count = io_mgr_->pending_gc_cleanup_queue_.size();


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

req.path_ = file->key_->tbl_id_.ToString();
req.path_ /= file->key_->filename_;

io_uring_sqe *sqe = io_mgr_->GetSQE(UserDataType::BaseReq, &req);


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

req.file_ = file;
req.path_ = file->key_->tbl_id_.ToString();
req.path_ /= file->key_->filename_;
io_uring_sqe *sqe = io_mgr_->GetSQE(UserDataType::BaseReq, &req);


[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

req.path_ = file->key_->tbl_id_.ToString();
req.path_ /= file->key_->filename_;
io_uring_sqe *sqe = io_mgr_->GetSQE(UserDataType::BaseReq, &req);
int root_fd = io_mgr_->GetRootFD(req.file_->key_->tbl_id_).first;

[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

bool made_progress = false;
std::vector<FileKey> retry_keys;
retry_keys.reserve(io_mgr_->pending_gc_cleanup_queue_.size());
std::unordered_set<TableIdent> touched_partitions;

[cpplint] reported by reviewdog 🐶
Add #include <unordered_set> for unordered_set<> [build/include_what_you_use] [4]

}

// 5. delete unreferenced data files.
std::vector<std::string> deleted_filenames;

[cpplint] reported by reviewdog 🐶
Add #include for string [build/include_what_you_use] [4]

options.cloud_store_path += "/reopen-global";
options.prewarm_cloud_cache = false;
options.allow_reuse_local_caches = true;
options.pages_per_file_shift = 1; // Keep at least one data file after cache restore.

[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

auto count_data_files = [&](const eloqstore::TableIdent &tbl_id)
{
const std::filesystem::path partition_path =
std::filesystem::path(options.store_path.front()) / tbl_id.ToString();

[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]

@liunyl
Contributor Author

liunyl commented Mar 18, 2026

@coderabbitai review

@coderabbitai

coderabbitai bot commented Mar 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
include/file_gc.h (1)

77-84: ⚠️ Potential issue | 🔴 Critical

Add missing parameter to DeleteUnreferencedCloudFiles declaration.

The header declaration (7 parameters) does not match the definition in src/file_gc.cpp (8 parameters with bool *deleted_current_manifest). External callers will encounter linker errors.

Required fix
 KvError DeleteUnreferencedCloudFiles(
     const TableIdent &tbl_id,
     const std::vector<std::string> &data_files,
     const std::vector<uint64_t> &manifest_terms,
     const RetainedFiles &retained_files,
     FileId least_not_archived_file_id,
     CloudStoreMgr *cloud_mgr,
-    std::vector<std::string> &deleted_filenames);
+    std::vector<std::string> &deleted_filenames,
+    bool *deleted_current_manifest = nullptr);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/file_gc.h` around lines 77 - 84, The declaration of
DeleteUnreferencedCloudFiles in include/file_gc.h is missing the final parameter
bool *deleted_current_manifest present in the implementation; update the
prototype for DeleteUnreferencedCloudFiles to include the bool
*deleted_current_manifest parameter so it exactly matches the definition in
src/file_gc.cpp (keep existing parameter order: TableIdent, data_files,
manifest_terms, RetainedFiles, least_not_archived_file_id, CloudStoreMgr*,
deleted_filenames, bool *deleted_current_manifest) to avoid linker errors and
ensure callers can pass the flag.
♻️ Duplicate comments (2)
src/async_io_manager.cpp (2)

3322-3327: ⚠️ Potential issue | 🟠 Major

IsIdle() should not depend on pending GC retry queue emptiness.

Line 3325 makes deferred cleanup retries look like active work even when file_cleaner_ has already yielded idle, which can interfere with idle-round behavior in shard loops.

Suggested fix
 bool CloudStoreMgr::IsIdle()
 {
     return file_cleaner_.status_ == TaskStatus::Idle &&
-           pending_gc_cleanup_queue_.empty() &&
            active_prewarm_tasks_ == 0 && inflight_cloud_slots_ == 0 &&
            !obj_store_.HasPendingWork();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3322 - 3327, The IsIdle() check
currently treats pending_gc_cleanup_queue_ as active work which incorrectly
prevents IsIdle() from returning true; update CloudStoreMgr::IsIdle() to remove
the pending_gc_cleanup_queue_.empty() condition so idle state only depends on
file_cleaner_.status_, active_prewarm_tasks_, inflight_cloud_slots_, and
obj_store_.HasPendingWork(); locate the IsIdle function and delete the
pending_gc_cleanup_queue_ clause (or replace it with a comment explaining GC
retry work is deferred and should not block idle detection).

3220-3256: ⚠️ Potential issue | 🟠 Major

Startup cleanup still misses partitions emptied by restore-time trimming.

This branch removes directories only when retained_files == 0 immediately after RestoreFilesForTable(). If TrimRestoredCacheUsage() later evicts the last retained file in a partition, that now-empty directory is never revisited and remains on disk.

Please add a post-trim partition-directory cleanup pass (or track touched partitions during trim and remove empty dirs afterward).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 3220 - 3256, The cleanup currently
only removes partition dirs when retained_files == 0 immediately in
RestoreFilesForTable(), but TrimRestoredCacheUsage() can later evict the last
file and leave an empty dir; modify the flow to perform a post-trim cleanup
pass: after calling TrimRestoredCacheUsage() (or inside it), iterate the set of
partitions touched during restore/trim (track touched partition identifiers /
table_path when calling EnqueClosedFile or when TrimRestoredCacheUsage evicts a
file) and for each partition check if the directory is empty and attempt
fs::remove(table_path, remove_ec) using the same error handling used in the
existing block (ignore ENOENT/ENOTEMPTY/EEXIST, log and return ToKvError on
other errors, log INFO on successful removal). Ensure you reference and update
the same symbols (RestoreFilesForTable, TrimRestoredCacheUsage, EnqueClosedFile,
table_path, tbl_id) so empty dirs removed after trimming are not left on disk.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@include/async_io_manager.h`:
- Around line 907-908: Concurrent accesses to pending_gc_cleanup_ and
pending_gc_cleanup_queue_ (used by RequestGcLocalCleanup and the file-cleaner
background task) must be protected by a single mutex so the insert→queue
operation is atomic; add a dedicated mutex (e.g., gc_cleanup_mu_) and wrap all
reads/writes to pending_gc_cleanup_ and pending_gc_cleanup_queue_ with a
std::lock_guard or std::scoped_lock, update RequestGcLocalCleanup to lock before
checking/inserting and pushing to the queue, and update the file cleaner
background task (and any other places that read or pop from these structures) to
acquire the same lock around those operations. Ensure no access to these two
containers occurs without holding gc_cleanup_mu_ so the pair of operations
remains atomic.

In `@include/storage/index_page_manager.h`:
- Line 58: MarkManifestMissing() mutates meta_ (e.g. manifest_size_) after
calling Find() without using the RootMetaMgr Handle/Pin/Unpin or any lock,
risking concurrent access; fix it by acquiring the entry via RootMetaMgr::Handle
(call Find(tbl_ident) to get a Handle), Pin() before mutating meta_ and Unpin()
when done (or wrap the mutation in the existing per-entry mutex if one exists),
or alternatively add explicit synchronization around access to meta_ (e.g. a
std::mutex guarding manifest_size_ updates) and use it in MarkManifestMissing();
if you intentionally rely on single-threaded use, add clear documentation on
MarkManifestMissing() stating that it must only be called from a single-threaded
context.

In `@src/file_gc.cpp`:
- Around line 577-601: The GC loop currently treats KvError::NotFound as a
failure and skips adding such basenames to deleted_filenames (and may set
first_error), leaving stale local files; change the error-check branch in the
loop that examines delete_tasks[i] so that only errors other than
KvError::NotFound are considered failures (i.e., replace the condition that
checks task.error_ != KvError::NoError with one that treats KvError::NotFound as
success), ensuring that basenames_to_delete[i] is appended to deleted_filenames
for both KvError::NoError and KvError::NotFound and that first_error is updated
only for true errors; keep the existing manifest_gone/deleted_current_manifest
logic which already treats NotFound as success.

In `@tests/gc.cpp`:
- Around line 65-82: ListLocalPartitionFiles can throw
std::filesystem::filesystem_error if the partition directory is removed between
the exists() check and directory iteration; wrap the directory iteration in a
try/catch that catches std::filesystem::filesystem_error (or std::exception)
around the fs::directory_iterator loop and skip/continue on error so the
function returns the files it can read instead of propagating. Locate the
iteration over partition_path in ListLocalPartitionFiles (uses opts.store_path,
tbl_id.ToString(), and entry.path()) and add the resilient iteration/exception
handling there.
- Around line 137-170: The tests create a live EloqStore* named store via
InitStore(...) but call CleanupStore(...) while store is still active, which can
race with background IO/GC; fix both tests (the TEST_CASE "local mode truncate
preserves current manifest" and the other at lines 172-227) by stopping/tearing
down the store before calling CleanupStore: explicitly stop the store (e.g.,
call the store shutdown method or delete the store pointer and set store =
nullptr) after the test operations and before CleanupStore(local_gc_opts) to
ensure all background threads have stopped.

---

Outside diff comments:
In `@include/file_gc.h`:
- Around line 77-84: The declaration of DeleteUnreferencedCloudFiles in
include/file_gc.h is missing the final parameter bool *deleted_current_manifest
present in the implementation; update the prototype for
DeleteUnreferencedCloudFiles to include the bool *deleted_current_manifest
parameter so it exactly matches the definition in src/file_gc.cpp (keep existing
parameter order: TableIdent, data_files, manifest_terms, RetainedFiles,
least_not_archived_file_id, CloudStoreMgr*, deleted_filenames, bool
*deleted_current_manifest) to avoid linker errors and ensure callers can pass
the flag.

---

Duplicate comments:
In `@src/async_io_manager.cpp`:
- Around line 3322-3327: The IsIdle() check currently treats
pending_gc_cleanup_queue_ as active work which incorrectly prevents IsIdle()
from returning true; update CloudStoreMgr::IsIdle() to remove the
pending_gc_cleanup_queue_.empty() condition so idle state only depends on
file_cleaner_.status_, active_prewarm_tasks_, inflight_cloud_slots_, and
obj_store_.HasPendingWork(); locate the IsIdle function and delete the
pending_gc_cleanup_queue_ clause (or replace it with a comment explaining GC
retry work is deferred and should not block idle detection).
- Around line 3220-3256: The cleanup currently only removes partition dirs when
retained_files == 0 immediately in RestoreFilesForTable(), but
TrimRestoredCacheUsage() can later evict the last file and leave an empty dir;
modify the flow to perform a post-trim cleanup pass: after calling
TrimRestoredCacheUsage() (or inside it), iterate the set of partitions touched
during restore/trim (track touched partition identifiers / table_path when
calling EnqueClosedFile or when TrimRestoredCacheUsage evicts a file) and for
each partition check if the directory is empty and attempt
fs::remove(table_path, remove_ec) using the same error handling used in the
existing block (ignore ENOENT/ENOTEMPTY/EEXIST, log and return ToKvError on
other errors, log INFO on successful removal). Ensure you reference and update
the same symbols (RestoreFilesForTable, TrimRestoredCacheUsage, EnqueClosedFile,
table_path, tbl_id) so empty dirs removed after trimming are not left on disk.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d97c57f7-af95-4fd7-bb64-8551bc457cc6

📥 Commits

Reviewing files that changed from the base of the PR and between 53d1d22 and 01f40dc.

📒 Files selected for processing (8)
  • include/async_io_manager.h
  • include/file_gc.h
  • include/storage/index_page_manager.h
  • src/async_io_manager.cpp
  • src/file_gc.cpp
  • src/storage/index_page_manager.cpp
  • tests/cloud.cpp
  • tests/gc.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/storage/index_page_manager.cpp

Comment on lines +907 to +908
void RequestGcLocalCleanup(const TableIdent &tbl_id,
const std::vector<std::string> &filenames);

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect all mutation/read sites for pending GC cleanup structures and lock usage.
rg -n "RequestGcLocalCleanup|pending_gc_cleanup_|pending_gc_cleanup_queue_|std::lock_guard|Mutex|mu_" include/async_io_manager.h src/async_io_manager.cpp -C4

Repository: eloqdata/eloqstore

Length of output: 19467


Add synchronization for concurrent access to GC cleanup tracking structures.

RequestGcLocalCleanup() and the file cleaner background task access pending_gc_cleanup_ and pending_gc_cleanup_queue_ without visible synchronization. Protect all accesses to these structures with a consistent lock, ensuring the insert→queue pair remains atomic.

Also applies to: 1022-1023

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/async_io_manager.h` around lines 907 - 908, Concurrent accesses to
pending_gc_cleanup_ and pending_gc_cleanup_queue_ (used by RequestGcLocalCleanup
and the file-cleaner background task) must be protected by a single mutex so the
insert→queue operation is atomic; add a dedicated mutex (e.g., gc_cleanup_mu_)
and wrap all reads/writes to pending_gc_cleanup_ and pending_gc_cleanup_queue_
with a std::lock_guard or std::scoped_lock, update RequestGcLocalCleanup to lock
before checking/inserting and pushing to the queue, and update the file cleaner
background task (and any other places that read or pop from these structures) to
acquire the same lock around those operations. Ensure no access to these two
containers occurs without holding gc_cleanup_mu_ so the pair of operations
remains atomic.

KvError MakeCowRoot(const TableIdent &tbl_ident, CowRootMeta &cow_meta);

void UpdateRoot(const TableIdent &tbl_ident, CowRootMeta new_meta);
void MarkManifestMissing(const TableIdent &tbl_ident);

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify locking/serialization around MarkManifestMissing and RootMeta updates.
rg -n "IndexPageManager::MarkManifestMissing|RootMetaMgr::Find\\(|RootMetaMgr::UpdateBytes\\(|manifest_size_" include src -C3

Repository: eloqdata/eloqstore

Length of output: 9831


🏁 Script executed:

# Find RootMetaMgr class definition with lock information
rg -n "class RootMetaMgr|mu_|lock_|mutex_|rwlock_" include/storage src/storage -B2 -A10 | head -100

Repository: eloqdata/eloqstore

Length of output: 4975


🏁 Script executed:

# Check the full Find() implementation for any lock acquisition
sed -n '204,220p' src/storage/root_meta.cpp

Repository: eloqdata/eloqstore

Length of output: 404


🏁 Script executed:

# Check the full UpdateBytes() implementation
sed -n '250,270p' src/storage/root_meta.cpp

Repository: eloqdata/eloqstore

Length of output: 502


🏁 Script executed:

# Look for lock guards around MarkManifestMissing
sed -n '343,368p' src/storage/index_page_manager.cpp

Repository: eloqdata/eloqstore

Length of output: 829


🏁 Script executed:

# Get full RootMetaMgr class definition
sed -n '96,250p' include/storage/root_meta.h

Repository: eloqdata/eloqstore

Length of output: 3146


🏁 Script executed:

# Search for all callers of MarkManifestMissing to see the call context
rg -n "MarkManifestMissing" include src

Repository: eloqdata/eloqstore

Length of output: 351


🏁 Script executed:

# Look for mutex or lock fields in root_meta files
rg -n "mu_|mutex_|lock_|rwlock_|RWLock" include/storage/root_meta.h src/storage/root_meta.cpp

Repository: eloqdata/eloqstore

Length of output: 44


🏁 Script executed:

# Get Pin/Unpin implementation
rg -n "void RootMetaMgr::Pin|void RootMetaMgr::Unpin" src/storage -A10

Repository: eloqdata/eloqstore

Length of output: 1245


🏁 Script executed:

# Check context around MarkManifestMissing call in file_gc.cpp
sed -n '770,795p' src/file_gc.cpp

Repository: eloqdata/eloqstore

Length of output: 665


🏁 Script executed:

# Search for any synchronization around the GC operations
sed -n '750,800p' src/file_gc.cpp

Repository: eloqdata/eloqstore

Length of output: 1412


Add synchronization to MarkManifestMissing() or document its single-threaded usage.

The implementation calls Find() and directly accesses meta_ fields without synchronization. While RootMetaMgr provides Handle with Pin/Unpin for reference counting, MarkManifestMissing() bypasses this pattern entirely, leaving concurrent access to manifest_size_ (and other meta fields) unprotected. Either acquire the entry via Handle before mutation or add explicit locking, or clearly document that this method is only safe when called from a single-threaded context.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/storage/index_page_manager.h` at line 58, MarkManifestMissing()
mutates meta_ (e.g. manifest_size_) after calling Find() without using the
RootMetaMgr Handle/Pin/Unpin or any lock, risking concurrent access; fix it by
acquiring the entry via RootMetaMgr::Handle (call Find(tbl_ident) to get a
Handle), Pin() before mutating meta_ and Unpin() when done (or wrap the mutation
in the existing per-entry mutex if one exists), or alternatively add explicit
synchronization around access to meta_ (e.g. a std::mutex guarding
manifest_size_ updates) and use it in MarkManifestMissing(); if you
intentionally rely on single-threaded use, add clear documentation on
MarkManifestMissing() stating that it must only be called from a single-threaded
context.

Comment on lines 577 to +601
{
const auto &task = delete_tasks[i];
const bool manifest_gone =
basenames_to_delete[i] == current_manifest &&
(task.error_ == KvError::NoError ||
task.error_ == KvError::NotFound);
if (manifest_gone && deleted_current_manifest != nullptr)
{
*deleted_current_manifest = true;
}
if (task.error_ != KvError::NoError)
{
LOG(ERROR) << "Failed to delete file " << task.remote_path_ << ": "
<< ErrorString(task.error_);
return task.error_;
if (first_error == KvError::NoError)
{
first_error = task.error_;
}
continue;
}

deleted_filenames.push_back(basenames_to_delete[i]);
}

return KvError::NoError;
return first_error;

⚠️ Potential issue | 🟠 Major

Treat KvError::NotFound as success for GC delete bookkeeping.

Currently NotFound paths are recorded as errors and excluded from deleted_filenames, so local cleanup may not run for files already gone remotely (stale local cache can remain).

Suggested idempotent delete handling
-    for (size_t i = 0; i < delete_tasks.size(); ++i)
+    for (size_t i = 0; i < delete_tasks.size(); ++i)
     {
         const auto &task = delete_tasks[i];
+        const bool delete_succeeded =
+            (task.error_ == KvError::NoError || task.error_ == KvError::NotFound);
         const bool manifest_gone =
             basenames_to_delete[i] == current_manifest &&
-            (task.error_ == KvError::NoError ||
-             task.error_ == KvError::NotFound);
+            delete_succeeded;
         if (manifest_gone && deleted_current_manifest != nullptr)
         {
             *deleted_current_manifest = true;
         }
-        if (task.error_ != KvError::NoError)
+        if (!delete_succeeded)
         {
             LOG(ERROR) << "Failed to delete file " << task.remote_path_ << ": "
                        << ErrorString(task.error_);
             if (first_error == KvError::NoError)
             {
                 first_error = task.error_;
             }
             continue;
         }
 
         deleted_filenames.push_back(basenames_to_delete[i]);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/file_gc.cpp` around lines 577 - 601, The GC loop currently treats
KvError::NotFound as a failure and skips adding such basenames to
deleted_filenames (and may set first_error), leaving stale local files; change
the error-check branch in the loop that examines delete_tasks[i] so that only
errors other than KvError::NotFound are considered failures (i.e., replace the
condition that checks task.error_ != KvError::NoError with one that treats
KvError::NotFound as success), ensuring that basenames_to_delete[i] is appended
to deleted_filenames for both KvError::NoError and KvError::NotFound and that
first_error is updated only for true errors; keep the existing
manifest_gone/deleted_current_manifest logic which already treats NotFound as
success.

Comment on lines +65 to +82
std::vector<std::string> ListLocalPartitionFiles(const eloqstore::KvOptions &opts,
const eloqstore::TableIdent &tbl_id)
{
std::vector<std::string> result;
for (const std::string &store_path : opts.store_path)
{
fs::path partition_path = fs::path(store_path) / tbl_id.ToString();
if (!fs::exists(partition_path))
{
continue;
}
for (const auto &entry : fs::directory_iterator(partition_path))
{
result.push_back(entry.path().filename().string());
}
}
return result;
}

⚠️ Potential issue | 🟡 Minor

Harden ListLocalPartitionFiles against concurrent directory removal.

This helper can throw filesystem_error when the partition disappears between exists() and iteration.

Suggested resilient iteration
 std::vector<std::string> ListLocalPartitionFiles(const eloqstore::KvOptions &opts,
                                                  const eloqstore::TableIdent &tbl_id)
 {
     std::vector<std::string> result;
     for (const std::string &store_path : opts.store_path)
     {
         fs::path partition_path = fs::path(store_path) / tbl_id.ToString();
-        if (!fs::exists(partition_path))
+        std::error_code ec;
+        if (!fs::exists(partition_path, ec) || ec)
         {
             continue;
         }
-        for (const auto &entry : fs::directory_iterator(partition_path))
+        for (fs::directory_iterator it(partition_path, ec), end;
+             !ec && it != end;
+             it.increment(ec))
         {
-            result.push_back(entry.path().filename().string());
+            std::error_code type_ec;
+            if (it->is_regular_file(type_ec) && !type_ec)
+            {
+                result.push_back(it->path().filename().string());
+            }
         }
     }
     return result;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/gc.cpp` around lines 65 - 82, ListLocalPartitionFiles can throw
std::filesystem::filesystem_error if the partition directory is removed between
the exists() check and directory iteration; wrap the directory iteration in a
try/catch that catches std::filesystem::filesystem_error (or std::exception)
around the fs::directory_iterator loop and skip/continue on error so the
function returns the files it can read instead of propagating. Locate the
iteration over partition_path in ListLocalPartitionFiles (uses opts.store_path,
tbl_id.ToString(), and entry.path()) and add the resilient iteration/exception
handling there.

Comment on lines +137 to +170
TEST_CASE("local mode truncate preserves current manifest", "[gc][local]")
{
using namespace std::chrono_literals;

// // Verify partition directory exists
// REQUIRE(CheckLocalPartitionExists(local_gc_opts, tbl_id));
CleanupStore(local_gc_opts);

// // Truncate using MapVerifier (delete all data)
// tester.Truncate(0, true); // Delete all data
eloqstore::EloqStore *store = InitStore(local_gc_opts);
eloqstore::TableIdent tbl_id = {"gc_test", 1};
MapVerifier tester(tbl_id, store, false);
tester.SetValueSize(1000);

// // Wait for GC
// WaitForGC();
tester.Upsert(0, 100);
tester.Validate();
REQUIRE(CheckLocalPartitionExists(local_gc_opts, tbl_id));

tester.Truncate(0, true);

REQUIRE(WaitForCondition(3s,
20ms,
[&]()
{
std::vector<std::string> files =
ListLocalPartitionFiles(local_gc_opts,
tbl_id);
if (files.empty())
{
return false;
}
return files.size() == 1 &&
files[0] == eloqstore::ManifestFileName(0);
}));

CleanupStore(local_gc_opts);
}

⚠️ Potential issue | 🟠 Major

Stop the store before CleanupStore(...) in both new local GC tests.

Both tests tear down with CleanupStore(...) while store is still active, which can race with background IO/GC and cause nondeterministic failures.

Suggested teardown fix
 TEST_CASE("local mode truncate preserves current manifest", "[gc][local]")
 {
@@
-    CleanupStore(local_gc_opts);
+    store->Stop();
+    CleanupStore(local_gc_opts);
 }
@@
 TEST_CASE("local mode clean manifest removes empty partition directory",
           "[gc][local]")
 {
@@
-    CleanupStore(opts);
+    store->Stop();
+    CleanupStore(opts);
 }

Also applies to: 172-227

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/gc.cpp` around lines 137 - 170, The tests create a live EloqStore*
named store via InitStore(...) but call CleanupStore(...) while store is still
active, which can race with background IO/GC; fix both tests (the TEST_CASE
"local mode truncate preserves current manifest" and the other at lines 172-227)
by stopping/tearing down the store before calling CleanupStore: explicitly stop
the store (e.g., call the store shutdown method or delete the store pointer and
set store = nullptr) after the test operations and before
CleanupStore(local_gc_opts) to ensure all background threads have stopped.


Development

Successfully merging this pull request may close these issues.

[Feature]: Remove empty directory after gc the entire partition dir

1 participant