Conversation
Caution: Review failed. The pull request is closed.

Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough: Adds standby file-cache synchronization: new SyncFileCache RPC and request type, worker threads to collect and propagate SST file metadata, logic to determine and manage local SST cache, optional S3 downloader, and factory/config accessors to expose storage/S3 settings and cache-size configuration.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Primary as Primary Node
    participant Standby as Standby Node
    participant S3 as S3 Storage
    rect #F0F8FF
        Note over Primary: Periodic trigger on primary
        Primary->>Primary: FileCacheSyncWorker collects cached SST info
        Primary->>Standby: SyncFileCache RPC (shard_id, FileInfo list)
    end
    rect #F0FFF0
        Note over Standby: Standby applies sync
        Standby->>Standby: ProcessSyncFileCache → DetermineFilesToKeep
        Standby->>Standby: Delete non-kept local SSTs
    end
    rect #FFF5E6
        Note over Standby,S3: Optional S3 retrieval for missing SSTs
        Standby->>Standby: CreateS3Downloader()
        Standby->>S3: DownloadFile(s) for missing SSTs
        S3-->>Standby: Stream object → write local file
    end
    rect #F5F5FF
        Note over Standby: Finish
        Standby->>Standby: Finish request, update state
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)

📜 Recent review details — Configuration used: CodeRabbit UI. Review profile: CHILL. Plan: Pro. 📒 Files selected for processing (5)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
This PR is being reviewed by Cursor Bugbot
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
eloq_data_store_service/CMakeLists.txt (1)
380-407: CMake syntax error confirmed: `elseif` blocks orphaned after `endif`

The actual file matches the review comment exactly. After the second `endif()` at line 397, the `elseif` statements at lines 398 and 403 have no parent `if` block to belong to. CMake requires `elseif` to be part of a continuous if/elseif/endif chain; they cannot appear after an `endif()`. This will cause CMake configuration to fail. The suggested restructuring in the review comment is correct: consolidate into a single if/elseif/elseif/endif chain to restore valid syntax.
🧹 Nitpick comments (10)
eloq_data_store_service/ds_request.proto (1)
42-44: SyncFileCache RPC and messages look consistent; consider future extensibility

The new `SyncFileCache` RPC and the `FileInfo`/`SyncFileCacheRequest` messages are well-structured and consistent with the existing proto style (per-shard, using `uint32 shard_id`, `uint64` sizes, and a dedicated message for file metadata). Using `google.protobuf.Empty` is fine for a best-effort warm-up call; if you later need per-file error reporting or metrics, you might want to introduce a small response message instead, but nothing blocks this PR.

Also applies to: 357-367
eloq_data_store_service/data_store_service_config.h (1)
326-335: File cache sync interval wiring is correct; consider documenting semantics of 0 / small values

The copy constructor, assignment operator, getter/setter, and the default `file_cache_sync_interval_sec_{30}` are wired consistently and follow the existing locking pattern. One improvement would be to document (and, if needed, clamp in the setter) how `interval_sec == 0` or very small intervals are interpreted (disabled vs. tight loop) so future callers don't accidentally misconfigure the sync worker.

Also applies to: 342-360, 457-465, 526-528
eloq_data_store_service/data_store_service_config.cpp (1)
803-815: Interval accessors are correctly synchronized; clarify configuration source

The new `SetFileCacheSyncIntervalSec`/`GetFileCacheSyncIntervalSec` methods use the same `shared_mutex` pattern as the rest of the cluster manager and cleanly expose the interval. Behaviorally, the interval is currently only set programmatically (not from the INI config); if that's intentional, consider adding a brief comment near the field or setter noting the expected configuration flow (e.g., "runtime-tunable only") so it's clear why `Load()` doesn't touch it.

eloq_data_store_service/rocksdb_cloud_data_store.cpp (1)
38-38: SST cache collection logic is sound; minor robustness and efficiency tweaks possible

The implementation of `CollectCachedSstFiles` correctly:

- Guards DB access with `db_mux_`.
- Short-circuits when `db_` is null.
- Intersects live file metadata with locally present `.sst-` files.
- Populates `FileInfo` (name, size, parsed `file_number`) and logs the count.

`ExtractFileNumber` is also defensive about malformed filenames (missing `.sst-`, all zeros, parse failures) and degrades gracefully by logging and returning 0.

Two small improvements you might consider:

- Hoist the `dynamic_cast<rocksdb::CloudFileSystem *>` and null-check once before the metadata loop (mirroring the `CloudFileSystemImpl` check earlier in the file) instead of per-entry casting.
- If directory traversal errors other than the initial `directory_iterator` construction are a concern in your environment, wrap the loop in a `try/catch` or use the `error_code` overloads on increment as well to avoid unexpected exceptions during cache collection.

These are optional; the current behavior is otherwise coherent with the rest of the RocksDB Cloud startup and shutdown paths.
Also applies to: 836-897, 899-929
eloq_data_store_service/s3_file_downloader.h (1)
24-28: Be aware of AWS SDK header dependency and static-analysis error; consider minimizing header includes.

Static analysis reports `'aws/core/Aws.h' file not found` here. That may just mean the analysis environment lacks the AWS SDK, but to keep this header lighter and reduce coupling you can:

- Remove `<aws/core/Aws.h>` from the header (it isn't needed just to hold a `std::shared_ptr<Aws::S3::S3Client>`), and
- Keep the heavier AWS includes in the `.cpp` where they're actually used.

That will avoid spurious failures in tools that don't have the SDK while still compiling fine in your real build where the SDK is available. If your build actually depends on `<aws/core/Aws.h>` from other translation units, we should instead ensure the include paths are correctly configured rather than removing it.

Also applies to: 32-65
eloq_data_store_service/data_store_service.cpp (4)
295-346: SyncFileCache RPC routing is sensible, but error handling on worker-init failures could be clearer.

Positives:

- Validates that `shard_id` matches `shard_id_` and only processes when `shard_status_ == DSShardStatus::Closed`, which correctly targets standby nodes.
- Offloads heavy file I/O to `file_sync_worker_` to avoid blocking the bthread.

Concerns / suggestions:

- When `file_sync_worker_ == nullptr` or `SubmitWork` fails, the code logs an error, calls `req->Free()`, and intentionally lets the RPC "timeout". This makes failures opaque to the caller and can look like a network issue instead of a server-side problem.
- Since the response type is `Empty`, a common pattern is to set a controller error (`cntl->SetFailed("...")`) and then call `done->Run()` to return a clear failure to the primary.

If feasible, consider marking the RPC as failed instead of relying on timeouts so operators can distinguish logic/config errors from transport issues.
1348-1478: ProcessSyncFileCache: overall flow is good, but relies on specific path layout and S3 downloader creation.

The three-step flow is clear:

- Build a `file_info_map` and determine `files_to_keep` via `GetSstFileCacheSizeLimit()` and `DetermineFilesToKeep()`.
- Iterate the local DB directory and delete `.sst-` files not in `files_to_keep`.
- Optionally (S3 build) instantiate an `S3FileDownloader` and download any missing kept files.

Points to verify / small concerns:

- DB path construction: `db_path = storage_path + "/ds_" + std::to_string(shard_id_) + "/db/";` assumes RocksDB Cloud layouts always follow this convention. If configs ever differ, this could log "Failed to list local directory" and skip sync. Please confirm this matches `RocksDBCloudDataStore`'s `db_path_` construction.
- Robustness to empty / misconfigured factory: You already guard against `data_store_factory_ == nullptr` and empty `storage_path`, which is good. Errors call `req->Finish()` so the RPC completes.
- Deletion filter: Limiting deletions to filenames containing `.sst-` (and regular files) is a good safety check; it avoids touching WALs or other metadata.
- S3 downloader reuse: `CreateS3Downloader()` is called per request. If SyncFileCache is frequent and the number of files is large, you might want to cache a downloader instance in `DataStoreService` (or in the factory) to avoid repeatedly constructing S3 clients, but this is an optimization, not a correctness bug.
2761-2855: FileCacheSyncWorker: primary-node sender loop is reasonable, but lacks backoff/timeout control on SyncFileCache RPCs.

Behavior looks correct:

- Wakes up periodically using `file_cache_sync_cv_.wait_for(...)` and exits cleanly when `file_cache_sync_running_` is set to `false`.
- Only proceeds when `shard_status_ == ReadWrite` and `data_store_` is non-null, so standby or uninitialized nodes don't attempt sync.
- Uses `dynamic_cast<RocksDBCloudDataStore*>` and skips if not cloud-backed, which keeps this feature limited to the right backend.
- Collects cached SST files and fans out a `SyncFileCacheRequest` to each member except self.

Improvements to consider:

- The `SyncFileCache` RPC uses a default `brpc::Controller` without an explicit timeout or retry. If a standby is slow or down, you may want to set a bounded timeout and optionally a small retry policy to avoid long hangs on a single node.
- If `cluster_manager_.GetShard(shard_id_)` ever changes membership frequently, you might want to snapshot the member list outside the loop or add minimal logging when the shard lookup fails (currently assumed to succeed).
2901-2917: GetSstFileCacheSizeLimit: double-check semantics of "0 means not applicable".

This method:

- Returns a default of `20ULL * 1024 * 1024 * 1024` (20 GiB) if the factory is null or `GetSstFileCacheSize()` returns `0`.
- Otherwise returns the configured value.

Note that this subtly changes the meaning of `0` from "not applicable" (per the `DataStoreFactory` doc) to "use 20 GiB default". That's fine if it's intentional, but if `0` was meant to disable cache-based warmup entirely, this will still keep up to 20 GiB of files. If you do want "no cache warmup" when factories return `0`, consider returning `0` here as well and making `DetermineFilesToKeep` treat `cache_size_limit == 0` as "keep nothing".

eloq_data_store_service/data_store_service.h (1)
775-819: File cache sync helpers and members are cohesive and match the new feature design.
- `FileCacheSyncWorker`, `ProcessSyncFileCache`, and (S3-only) `CreateS3Downloader` form a clear internal API for the standby warm-up flow.
- `GetSstFileCacheSizeLimit` and `DetermineFilesToKeep` encapsulate cache sizing logic, keeping the RPC handler simpler.
- `file_cache_sync_worker_`, `file_sync_worker_`, and the associated mutex/CV/flag are appropriately scoped as private members.

Once `<map>` is included as noted above, this layout looks good. If this feature grows, you might later consider isolating file-cache sync into a dedicated helper class to reduce the size of `DataStoreService`.

Also applies to: 813-820
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- eloq_data_store_service/CMakeLists.txt (1 hunks)
- eloq_data_store_service/data_store_factory.h (1 hunks)
- eloq_data_store_service/data_store_service.cpp (7 hunks)
- eloq_data_store_service/data_store_service.h (4 hunks)
- eloq_data_store_service/data_store_service_config.cpp (2 hunks)
- eloq_data_store_service/data_store_service_config.h (4 hunks)
- eloq_data_store_service/ds_request.proto (2 hunks)
- eloq_data_store_service/eloq_store_data_store_factory.h (1 hunks)
- eloq_data_store_service/internal_request.h (1 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (2 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store.h (2 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store_factory.h (1 hunks)
- eloq_data_store_service/rocksdb_data_store_factory.h (1 hunks)
- eloq_data_store_service/s3_file_downloader.cpp (1 hunks)
- eloq_data_store_service/s3_file_downloader.h (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-22T09:48:06.044Z
Learnt from: githubzilla
Repo: eloqdata/store_handler PR: 65
File: eloq_data_store_service/rocksdb_cloud_data_store.cpp:392-397
Timestamp: 2025-09-22T09:48:06.044Z
Learning: In RocksDB Cloud data store, when FindMaxTermFromCloudManifestFiles returns false (no manifest files found), max_term = -1 is the intended behavior for new branches or first-time startup. The system correctly bootstraps from this state by creating new cookies with term 0.
Applied to files:
eloq_data_store_service/rocksdb_cloud_data_store.cpp
🧬 Code graph analysis (8)
eloq_data_store_service/internal_request.h (2)
- eloq_data_store_service/object_pool.h (1): `Poolable` (39-192)
- eloq_data_store_service/data_store_service.cpp (18): `Clear` (188-197, 188), `done_guard` (445, 455, 483, 492, 516, 528, 560, 571, 604, 618, 655, 666, 706, 718, 750, 761)

eloq_data_store_service/rocksdb_cloud_data_store.cpp (2)
- eloq_data_store_service/data_store_service.cpp (1): `dir_ite` (1389)
- rocksdb_handler.cpp (2): `dir_ite` (3518), `path` (3530)

eloq_data_store_service/s3_file_downloader.cpp (1)
- eloq_data_store_service/s3_file_downloader.h (1): `S3FileDownloader` (32-65)

eloq_data_store_service/s3_file_downloader.h (3)
- eloq_data_store_service/data_store_factory.h (1): `EloqDS` (29-92)
- eloq_data_store_service/rocksdb_cloud_data_store_factory.h (1): `EloqDS` (32-123)
- eloq_data_store_service/s3_file_downloader.cpp (4): `S3FileDownloader` (36-116, 118), `DownloadFile` (120-159, 120-121)

eloq_data_store_service/data_store_service.cpp (2)
- eloq_data_store_service/data_store_service_util.h (1): `shard_id_` (118)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (1): `dir_ite` (854)

eloq_data_store_service/rocksdb_cloud_data_store.h (1)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (4): `CollectCachedSstFiles` (836-897, 836-837), `ExtractFileNumber` (899-929, 899)

eloq_data_store_service/data_store_service.h (7)
- eloq_data_store_service/data_store_factory.h (1): `EloqDS` (29-92)
- eloq_data_store_service/data_store_service_config.h (1): `EloqDS` (98-529)
- eloq_data_store_service/rocksdb_cloud_data_store_factory.h (1): `EloqDS` (32-123)
- eloq_data_store_service/s3_file_downloader.h (1): `EloqDS` (29-67)
- eloq_data_store_service/thread_worker_pool.h (1): `EloqDS` (34-58)
- eloq_data_store_service/data_store_service.cpp (4): `SyncFileCache` (1295-1346, 1295-1299), `DetermineFilesToKeep` (2919-2964, 2919-2921)
- eloq_data_store_service/internal_request.h (3): `remote` (1611-1614), `string` (1221-1224, 1402-1405)

eloq_data_store_service/data_store_service_config.h (1)
- eloq_data_store_service/data_store_service_config.cpp (4): `SetFileCacheSyncIntervalSec` (803-808, 803-804), `GetFileCacheSyncIntervalSec` (810-814, 810)
🪛 Clang (14.0.6)
eloq_data_store_service/s3_file_downloader.h
[error] 24-24: 'aws/core/Aws.h' file not found
(clang-diagnostic-error)
🔇 Additional comments (10)
eloq_data_store_service/internal_request.h (1)
1591-1625: SyncFileCacheLocalRequest matches existing Poolable patterns

The new `SyncFileCacheLocalRequest` follows the same Poolable lifecycle as other `*LocalRequest` types: `Clear()` resets internal pointers, `SetRequest()` wires the request and closure, and `Finish()` uses `brpc::ClosureGuard` to run the closure exactly once. This is consistent and sufficient for an Empty-response RPC.

eloq_data_store_service/rocksdb_cloud_data_store.h (1)
78-84: New SST cache APIs in the header look consistent with implementation
`CollectCachedSstFiles(std::vector<::EloqDS::remote::FileInfo> &)` and the private `ExtractFileNumber` helper are declared with signatures that match the implementation and fit naturally into `RocksDBCloudDataStore`'s responsibilities. As long as `ds_request.pb.h` is available via existing includes, no additional header changes are needed here.

Also applies to: 155-161
eloq_data_store_service/data_store_factory.h (1)
43-89: Credential handling already follows best practices; no changes required

Verification confirms that the interface extension is well-designed and the credential handling already aligns with the reviewer's concerns:

- Credentials are only fetched and immediately passed to `S3FileDownloader` (data_store_service.cpp, lines 2891-2897); they are never logged or stored beyond what the AWS SDK requires.
- The empty-string fallback to the default credential provider chain is already documented in comments (data_store_service.cpp line 2894, s3_file_downloader.h lines 40-41).
- Extensive search found no credential logging violations across the codebase; only informational logs about credential availability, never actual values.
- The `RocksDBCloudDataStoreFactory` implementation is properly contained, with credentials stored in a private `cloud_config_` member and only exposed through the getter when S3 storage is active.

The design is sound and the stated conventions are already being followed. The original assessment does not match the actual implementation.
eloq_data_store_service/eloq_store_data_store_factory.h (1)
59-102: EloqStore factory getters look consistent with interface semantics.
`GetStoragePath()` safely handles an empty `store_path` vector, and the S3-related getters plus `GetSstFileCacheSize()` correctly return "not applicable" defaults for EloqStore.

eloq_data_store_service/rocksdb_data_store_factory.h (1)
74-112: Non-cloud RocksDB factory getters are straightforward and correct.
`GetStoragePath()` delegates to `config_.storage_path_`, and all cloud/S3-related getters return empty or `0`, which matches "not applicable" semantics for this factory.

eloq_data_store_service/rocksdb_cloud_data_store_factory.h (1)
77-115: Cloud factory getters correctly expose S3 and cache configuration.

The mapping from `cloud_config_` to `GetS3BucketName`/`GetS3ObjectPath`/`GetS3Region`/`GetS3EndpointUrl` and credentials, plus `GetSstFileCacheSize()`, is consistent with how `DataStoreService::CreateS3Downloader` and `GetSstFileCacheSizeLimit` consume them. Please just double-check at config level that `bucket_prefix_ + bucket_name_` yields a plain bucket name without embedded path segments, since `CreateS3Downloader` assumes the path portion is entirely in `GetS3ObjectPath()`.

eloq_data_store_service/data_store_service.cpp (3)
224-261: Worker lifecycle wiring (file cache sync + file sync) looks correct.
- The destructor cleanly flips `file_cache_sync_running_` under the mutex, signals the condition variable, then calls `Shutdown()` on `file_cache_sync_worker_`, and also shuts down `file_sync_worker_`.
- `StartService` initializes the workers only once (`server_guard`) and passes the configured `sync_interval_sec` into `FileCacheSyncWorker`.

This avoids obvious races or leaks around the new background workers.

Also applies to: 235-251, 336-349
2858-2898: CreateS3Downloader: config wiring is correct; just ensure S3 config completeness.

This helper:

- Pulls bucket, object path, region, endpoint, and credentials from `DataStoreFactory`.
- Builds an S3 URL of the form `s3://{bucket}` or `s3://{bucket}/{object_path}/`, ensuring a trailing slash when needed.
- Falls back to default AWS credentials behavior when keys are empty.

The only hard failure condition is `bucket_name.empty()`, which is reasonable. Please confirm that for your RocksDB Cloud configurations `GetS3Region()` and `GetS3EndpointUrl()` are always set consistently with the actual S3/MinIO deployment; otherwise downloads may silently fail with endpoint/region mismatches.
2919-2964: DetermineFilesToKeep: selection strategy is clear and looks correct.
- Builds a vector of `(file_number, filename)` pairs from the `file_info_map`.
- Sorts ascending by `file_number`, so "older" SSTs (lower numbers) are considered first.
- Iteratively adds files until the cumulative `file_size` would exceed `cache_size_limit`, then stops.

That yields a deterministic and size-bounded keep-set, and the logging of `files_to_keep.size()` and `current_size` helps with observability.

eloq_data_store_service/data_store_service.h (1)
459-470: SyncFileCache RPC declaration matches implementation and is well-documented.

The comment clearly describes purpose and parameters, and the signature aligns with the implementation in `data_store_service.cpp` (controller, `SyncFileCacheRequest`, `Empty` response, `Closure *done`).
Bug: Race condition between file sync and DB open
The is_file_sync_running_ flag is set to true after creating the directory iterator but before the file deletion loop. This creates a race condition where ConnectAndStartDataStore could check the flag (line 383), see it as false, proceed to open the DB, while ProcessSyncFileCache is already iterating and deleting files. The flag needs to be set before any file system operations begin to properly synchronize with DB opening.
```cpp
size_t first_non_zero = base.find_first_not_of('0');
if (first_non_zero == std::string::npos)
{
    LOG(ERROR) << "All zeros in file number from " << file_name;
```
Bug: File number zero incorrectly treated as error
When extracting file numbers from SST filenames like 000000.sst-epoch, the function treats an all-zeros prefix as an error case and returns 0. However, file number 0 is a valid RocksDB SST file number. The function cannot distinguish between a legitimate file number 0 and an error condition since both return 0. This causes file number 0 SST files to be incorrectly prioritized or handled in cache management.
Actionable comments posted: 2
♻️ Duplicate comments (2)
eloq_data_store_service/s3_file_downloader.cpp (1)
121-131: **Guard against null `s3_client_` before dereferencing.**

The constructor can return early (lines 47 or 73) without initializing `s3_client_`, leaving it null. The `DownloadFile` method then unconditionally dereferences `s3_client_` at line 131, which will crash. Add a null check at the start of `DownloadFile`:

```cpp
if (!s3_client_)
{
    LOG(ERROR) << "S3 client not initialized; cannot download " << s3_file_name;
    return false;
}
```

eloq_data_store_service/data_store_service.cpp (1)
1332-1351: **RPC closure called twice on error paths causes undefined behavior.**

After `done_guard.release()` at line 1330, the `done` closure ownership is transferred to `req`. On error paths:

- Lines 1332-1338: calls `req->Free()` which invokes `done`, then creates a new `ClosureGuard done_guard(done)` which will call `done` again.
- Lines 1345-1351: same pattern.

Calling the RPC closure twice causes undefined behavior. The fix is to remove the manual `brpc::ClosureGuard done_guard(done)` lines, since `req->Free()` already handles calling `done`:

```diff
 if (file_sync_worker_ == nullptr)
 {
     LOG(ERROR) << "FileSyncWorker not initialized, cannot process SyncFileCache";
     req->Free();
-    // Note: done_guard was released, so we need to manually call done
-    brpc::ClosureGuard done_guard(done);
     return;
 }
 bool res = file_sync_worker_->SubmitWork([this, req]() { ProcessSyncFileCache(req); });
 if (!res)
 {
     req->Free();
     LOG(ERROR) << "Failed to submit SyncFileCache work to file sync worker";
-    // Note: done_guard was released, so we need to manually call done
-    brpc::ClosureGuard done_guard(done);
 }
```
🧹 Nitpick comments (2)
eloq_data_store_service/data_store_service.cpp (2)
1455-1467: Check shard status before downloading IDENTITY and creating CURRENT.

IDENTITY and CURRENT file operations occur without checking if the shard status changed. If the shard is being opened concurrently, these operations should be skipped. Add a status check:

```diff
 uint32_t downloaded_count = 0;
+if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed)
+{
+    LOG(WARNING) << "Shard status is not closed, skipping file sync";
+    is_file_sync_running_.store(false, std::memory_order_release);
+    req->Finish();
+    return;
+}
 if (!std::filesystem::exists(db_path + "IDENTITY"))
 {
```
2870-2893: Consider async RPC calls to avoid blocking on slow standby nodes.

The loop sends SyncFileCache RPCs synchronously to each standby node. If one node is slow or unreachable, it delays syncing to other nodes. For better resilience, consider:

```cpp
// Option 1: Set a shorter timeout
cntl.set_timeout_ms(1000);  // 1 second timeout

// Option 2: Use async RPCs with callbacks (more complex)
// Submit all RPCs, then wait for completion
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- eloq_data_store_service/CMakeLists.txt (1 hunks)
- eloq_data_store_service/data_store_service.cpp (8 hunks)
- eloq_data_store_service/data_store_service.h (5 hunks)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (4 hunks)
- eloq_data_store_service/s3_file_downloader.cpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp
🧰 Additional context used
🧬 Code graph analysis (3)
eloq_data_store_service/data_store_service.h (8)
- eloq_data_store_service/data_store_factory.h (1): `EloqDS` (29-92)
- eloq_data_store_service/eloq_store_data_store_factory.h (1): `EloqDS` (30-109)
- eloq_data_store_service/data_store_service_config.h (1): `EloqDS` (98-529)
- eloq_data_store_service/rocksdb_data_store_factory.h (1): `EloqDS` (32-119)
- eloq_data_store_service/rocksdb_cloud_data_store_factory.h (1): `EloqDS` (32-123)
- eloq_data_store_service/s3_file_downloader.h (1): `EloqDS` (29-67)
- eloq_data_store_service/thread_worker_pool.h (1): `EloqDS` (34-58)
- eloq_data_store_service/data_store_service.cpp (4): `SyncFileCache` (1301-1352, 1301-1305), `DetermineFilesToKeep` (2959-3004, 2959-2961)

eloq_data_store_service/data_store_service.cpp (2)
- eloq_data_store_service/data_store_service_util.h (1): `shard_id_` (118)
- eloq_data_store_service/rocksdb_cloud_data_store.cpp (1): `dir_ite` (855)

eloq_data_store_service/s3_file_downloader.cpp (1)
- eloq_data_store_service/s3_file_downloader.h (1): `S3FileDownloader` (32-65)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Cursor Bugbot
🔇 Additional comments (7)
eloq_data_store_service/data_store_service.h (1)
29-30: LGTM! Missing `<map>` include now resolved.

The previously flagged missing `<map>` include has been added at line 30, resolving the compilation issue for the `DetermineFilesToKeep` signature at lines 815-817.

eloq_data_store_service/CMakeLists.txt (1)
402-406: LGTM! S3 downloader conditionally included for RocksDB Cloud S3 backend.

The addition of `s3_file_downloader.cpp` to the build is correctly guarded by the `ELOQDSS_ROCKSDB_CLOUD_S3` condition, ensuring it's only compiled when needed.

eloq_data_store_service/data_store_service.cpp (5)
2898-2939: LGTM! S3 downloader creation is well-structured.

The method correctly:

- Validates factory and bucket configuration
- Constructs proper S3 URLs with path handling
- Falls back to the default credential provider when credentials are empty
2959-3004: LGTM! File retention logic correctly prioritizes older SST files.

The method correctly sorts files by ascending file number and accumulates them until the cache size limit is reached. The implementation matches the documented behavior. Note: if a single file exceeds `cache_size_limit`, no files will be kept. This is the documented behavior, but ensure this edge case is acceptable for your use case.
235-250: LGTM! Worker cleanup properly signals shutdown.

The destructor correctly:

- Signals the file cache sync worker to stop via the condition variable
- Waits for worker shutdown
- Shuts down the file sync worker
336-349: LGTM! File cache sync workers initialized correctly for RocksDB Cloud S3.

The initialization:

- Is correctly guarded by the compile-time flag
- Retrieves the sync interval from the cluster manager
- Creates both primary and standby workers
- Submits the periodic sync task
382-386: Ensure `is_file_sync_running_` is cleared on all ProcessSyncFileCache exit paths.

This wait loop prevents concurrent DB operations during file sync. However, if `is_file_sync_running_` is not properly cleared (see comment on line 1410), this will hang indefinitely. Ensure the flag is cleared on all return paths in `ProcessSyncFileCache`.
```cpp
    return;
}

is_file_sync_running_.store(true, std::memory_order_release);
```
Set is_file_sync_running_ to false on all early return paths.
The flag is set to true at line 1410 but only cleared at line 1517. Early returns at lines 1406, 1415, and 1450 leave the flag stuck at true, causing ConnectAndStartDataStore (lines 383-386) to wait indefinitely.
Move the flag setting to the start of the method and use RAII or ensure all return paths clear it:
```diff
+is_file_sync_running_.store(true, std::memory_order_release);
+
 const auto *request = req->GetRequest();
 // Get storage path from factory (even though DB is closed, path still exists)
 if (data_store_factory_ == nullptr)
 {
     LOG(ERROR) << "DataStoreFactory is null, cannot process file cache sync";
+    is_file_sync_running_.store(false, std::memory_order_release);
     req->Finish();
     return;
 }
```

Apply a similar pattern to all early returns (lines 1372, 1406, 1450).
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.cpp around lines 1372–1517,
is_file_sync_running_ is set to true at line 1410 but not cleared on multiple
early-return paths (lines 1372, 1406, 1415, 1450), leaving the flag stuck and
causing ConnectAndStartDataStore (lines 383–386) to block; move the flag
initialization to the start of the method and use RAII (e.g., a scope
guard/unique object whose destructor sets is_file_sync_running_ to false) or
explicitly clear the flag on every early return so that all exit paths reset
is_file_sync_running_ to false.
```cpp
if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed)
{
    LOG(WARNING) << "Shard status is not closed, skipping file sync";
    break;
}
```
🛠️ Refactor suggestion | 🟠 Major
Return immediately when shard status changes during file sync.
The check at lines 1413-1417 breaks the deletion loop when shard status is no longer Closed, but execution continues to the download phase. If the shard is being opened, all file operations should stop immediately. Change break to early return:
```diff
 if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed)
 {
     LOG(WARNING) << "Shard status is not closed, skipping file sync";
-    break;
+    is_file_sync_running_.store(false, std::memory_order_release);
+    req->Finish();
+    return;
 }
```

Apply the same pattern at lines 1472-1476 in the download loop.
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.cpp around lines 1413-1417 and
similarly at 1472-1476, the loops check shard_status_ and currently use break
which exits only the inner loop and allows subsequent file operations (like
downloads) to continue; change those break statements to an immediate return
from the function so all file processing halts as soon as shard_status_ is no
longer Closed, ensuring no further deletions or downloads occur when the shard
is opening.
- File-cache synchronization across primary and standby nodes to warm up local caches.
- Automatic S3-backed retrieval of missing cache files with optional credentials and custom endpoint support.
- Configurable cache-sync interval (default 30s) to tune refresh frequency.
- SST cache size limit and deterministic retention policy to manage local cache contents.
- Collection of local cached SST metadata for coordinated syncing.
Note
Adds periodic primary→standby SST file-cache sync with a new RPC and S3 downloader, plus config and factory hooks to support it.
- `is_file_sync_running_` and proper shutdown.
- `SyncFileCache` handles incoming cache sync; asynchronously prunes local `.sst-` files to a size limit and downloads missing ones (S3 when enabled).
- `DetermineFilesToKeep` by `file_number` and `GetSstFileCacheSizeLimit()`; builds the local `db/` path per shard.
- `CreateS3Downloader()` to fetch `IDENTITY`, `CURRENT`, and required SSTs.
- `CollectCachedSstFiles()` gathers live/local SST metadata; `ExtractFileNumber()` helper.
- New messages `FileInfo` and `SyncFileCacheRequest`; new service RPC `SyncFileCache`.
- `DataStoreFactory` gains getters for `GetStoragePath`, S3 settings, and `GetSstFileCacheSize`; implemented in `rocksdb_cloud_data_store_factory.h`, `rocksdb_data_store_factory.h`, `eloq_store_data_store_factory.h`.
- `DataStoreServiceClusterManager` adds `Get`/`SetFileCacheSyncIntervalSec` (default 30s).
- New `s3_file_downloader.{h,cpp}` and includes AWS SDK headers/libs.

Written by Cursor Bugbot for commit 61ff2ff. This will update automatically on new commits. Configure here.
Summary by CodeRabbit
New Features
Chores