Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion store_handler/eloq_data_store_service/data_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DataStore
* @param term The term value to use when starting the database.
* @return True if start successfully, otherwise false.
*/
virtual bool StartDB(int64_t term) = 0;
virtual bool StartDB(int64_t term, uint32_t shard_id) = 0;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Read-only verification: find StartDB declarations/definitions in data store service code.
rg -nP -C2 '\bStartDB\s*\(\s*int64_t\s+term\s*(,\s*uint32_t\s+\w+\s*)?\)\s*(override)?' \
  store_handler/eloq_data_store_service

# Expected result:
# - No remaining `StartDB(int64_t term) override` in subclasses of DataStore.
# - All subclass declarations/definitions aligned to `(int64_t term, uint32_t shard_id)`.

Repository: eloqdata/tx_service

Length of output: 2179


Update RocksDBDataStore and RocksDBCloudDataStore to match the new StartDB signature.

The base class signature changed to StartDB(int64_t term, uint32_t shard_id), but rocksdb_data_store.h:51 and rocksdb_cloud_data_store.h:65 still declare StartDB(int64_t term) override. The corresponding definitions in .cpp files also use the old signature. This mismatch breaks the override contract and causes compilation failure. Update both the header declarations and .cpp implementations in both files.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/data_store.h` at line 65, The StartDB
signature changed to StartDB(int64_t term, uint32_t shard_id); update both the
declaration and definition for RocksDBDataStore::StartDB and
RocksDBCloudDataStore::StartDB to match that exact signature (include the
uint32_t shard_id parameter) and keep the override specifier; modify the
corresponding .cpp implementations to accept and use the shard_id parameter and
adjust any internal calls or forwarded calls to StartDB to pass the shard_id as
well so the override contract and compilation succeed.


/**
* @brief Close the data store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id,
return false;
}

res = shard_ref.data_store_->StartDB(term);
res = shard_ref.data_store_->StartDB(term, data_shard_id);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Find all StartDB call sites to verify they pass the shard_id parameter
# Expect: All call sites should pass two parameters (term and shard_id), or the factories should be excluded from this PR intentionally

echo "=== All StartDB call sites ==="
rg -n --type=cpp 'StartDB\s*\(' -A2 -B2

echo ""
echo "=== Calls with only one parameter (potential issues) ==="
rg -n --type=cpp '->StartDB\s*\([^,)]+\)\s*;'

Repository: eloqdata/tx_service

Length of output: 7594


🏁 Script executed:

# Check the RocksDBDataStore::StartDB implementation
echo "=== RocksDBDataStore::StartDB implementation ==="
sed -n '85,110p' store_handler/eloq_data_store_service/rocksdb_data_store.cpp

echo ""
echo "=== RocksDBCloudDataStore::StartDB implementation ==="
sed -n '263,290p' store_handler/eloq_data_store_service/rocksdb_cloud_data_store.cpp

echo ""
echo "=== Check base class interface ==="
sed -n '60,70p' store_handler/eloq_data_store_service/data_store.h

Repository: eloqdata/tx_service

Length of output: 2277


Update RocksDBDataStore and RocksDBCloudDataStore signatures to match the base interface.

The change at line 428 correctly passes shard_id to StartDB, but two critical implementations have incorrect signatures that violate the base class contract:

  • RocksDBDataStore::StartDB(int64_t term) - should accept uint32_t shard_id parameter
  • RocksDBCloudDataStore::StartDB(int64_t term) - should accept uint32_t shard_id parameter

The base class interface defines: virtual bool StartDB(int64_t term, uint32_t shard_id) = 0;

This will cause method resolution failures when factories for these implementations (rocksdb_data_store_factory.h, rocksdb_cloud_data_store_factory.h) create and initialize data stores from line 428.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/data_store_service.cpp` at line 428,
The implementations RocksDBDataStore::StartDB(int64_t term) and
RocksDBCloudDataStore::StartDB(int64_t term) have the wrong signature and must
be changed to match the base class virtual bool StartDB(int64_t term, uint32_t
shard_id); update both the method declarations and definitions to bool
RocksDBDataStore::StartDB(int64_t term, uint32_t shard_id) and bool
RocksDBCloudDataStore::StartDB(int64_t term, uint32_t shard_id), and adjust any
internal uses of the missing shard_id parameter; ensure the corresponding
factory code in rocksdb_data_store_factory.h and
rocksdb_cloud_data_store_factory.h and any callers (e.g., the call
shard_ref.data_store_->StartDB(term, data_shard_id)) compile against this
corrected signature.

if (!res)
{
LOG(ERROR) << "Failed to start db instance in data store service";
Expand Down
11 changes: 11 additions & 0 deletions store_handler/eloq_data_store_service/eloq_store_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ DEFINE_uint32(eloq_store_max_cloud_concurrency,
DEFINE_uint32(eloq_store_cloud_request_threads,
1,
"EloqStore cloud request thread number");
DEFINE_uint32(eloq_store_max_global_request_batch,
1000,
"EloqStore maximum number of requests processed in one global "
"batch.");
DEFINE_uint32(eloq_store_max_write_concurrency,
0,
"EloqStore maximum number of concurrent write tasks per shard; "
Expand Down Expand Up @@ -802,6 +806,13 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader,
: config_reader.GetInteger("store",
"eloq_store_cloud_request_threads",
FLAGS_eloq_store_cloud_request_threads);
eloqstore_configs_.max_global_request_batch =
!CheckCommandLineFlagIsDefault("eloq_store_max_global_request_batch")
? FLAGS_eloq_store_max_global_request_batch
: config_reader.GetInteger(
"store",
"eloq_store_max_global_request_batch",
FLAGS_eloq_store_max_global_request_batch);
eloqstore_configs_.max_write_concurrency =
!CheckCommandLineFlagIsDefault("eloq_store_max_write_concurrency")
? FLAGS_eloq_store_max_write_concurrency
Expand Down
5 changes: 3 additions & 2 deletions store_handler/eloq_data_store_service/eloq_store_data_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ class EloqStoreDataStore : public DataStore

bool Initialize() override;

bool StartDB(int64_t term) override
bool StartDB(int64_t term, uint32_t data_shard_id = 0) override
{
::eloqstore::KvError res = eloq_store_service_->Start(term);
::eloqstore::KvError res =
eloq_store_service_->Start(term, data_shard_id);
Comment on lines +180 to +183
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== StartDB declarations/definitions =="
rg -n --type=cpp -C2 '\bStartDB\s*\('

echo
echo "== Potential one-argument StartDB invocations (missing shard_id) =="
rg -nP --type=cpp '\bStartDB\s*\(\s*[^,()]+\s*\)' -g '!**/build/**' -g '!**/dist/**'

echo
echo "Expected:"
echo "- No remaining one-argument call sites for StartDB."
echo "- Consistent signatures between DataStore and overrides."

Repository: eloqdata/tx_service

Length of output: 8177


Remove the default argument from StartDB override to match the base class interface.

Line 180 has data_shard_id = 0 only in the derived override, while DataStore::StartDB defines the parameter without a default. In C++, default arguments are statically bound, so this inconsistency allows one-argument calls that would violate the base class contract and weaken explicit shard propagation.

Suggested fix
-    bool StartDB(int64_t term, uint32_t data_shard_id = 0) override
+    bool StartDB(int64_t term, uint32_t data_shard_id) override
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/eloq_store_data_store.h` around lines
180 - 183, The StartDB override currently declares a default value for
data_shard_id which differs from the base DataStore::StartDB signature; remove
the default argument from the derived declaration so the signature is identical
(change bool StartDB(int64_t term, uint32_t data_shard_id = 0) to bool
StartDB(int64_t term, uint32_t data_shard_id)), leaving the body that calls
eloq_store_service_->Start(term, data_shard_id) unchanged; ensure any callers
rely on the base's parameter semantics rather than the removed default.

if (res != ::eloqstore::KvError::NoError)
{
LOG(ERROR) << "EloqStore start failed with error code: "
Expand Down
12 changes: 11 additions & 1 deletion tx_service/src/remote/cc_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1849,6 +1849,16 @@ void CcNodeService::UpdateStandbyCkptTs(
DLOG(INFO) << "Receive UpdateStandbyCkptTs req, req ckpt ts:"
<< request->primary_succ_ckpt_ts() << ", has_data_store_write: "
<< (int) request->has_data_store_write();
auto standby_node_term = Sharder::Instance().StandbyNodeTerm();
if (standby_node_term == -1 ||
(standby_node_term >> 32) != request->ng_term())
{
DLOG(INFO) << "Discard UpdateStandbyCkptTs req, req ckpt ts:"
<< request->primary_succ_ckpt_ts()
<< ", standby_node_term=" << standby_node_term;
response->set_error(true);
return;
}
if (request->primary_succ_ckpt_ts() <=
Sharder::Instance().NativeNodeGroupCkptTs())
{
Expand Down Expand Up @@ -2051,7 +2061,7 @@ void CcNodeService::RequestSyncSnapshot(
const uint32_t local_node_id = Sharder::Instance().NodeId();
DLOG(INFO) << "RequestSyncSnapshot RPC received, ng_id=" << request->ng_id()
<< ", snapshot_ts=" << request->snapshot_ts()
<< ", local_node_id=" << local_node_id << ", role=follower";
<< ", local_node_id=" << local_node_id;
auto store_hd = Sharder::Instance().GetLocalCcShards()->store_hd_;
if (!store_hd)
{
Expand Down
Loading