Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion log_service
205 changes: 186 additions & 19 deletions store_handler/data_store_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
*/
#include "data_store_service_client.h"

#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <glog/logging.h>

#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <cctype>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <random>
#include <string>
Expand All @@ -40,6 +45,7 @@
#include "cc_req_misc.h"
#include "data_store_service_client_closure.h"
#include "eloq_data_store_service/data_store_service_config.h"
#include "eloq_data_store_service/data_store_service_util.h"
#include "eloq_data_store_service/object_pool.h" // ObjectPool
#include "eloq_data_store_service/thread_worker_pool.h"
#include "metrics.h"
Expand All @@ -50,6 +56,12 @@
#include "tx_service/include/error_messages.h"
#include "tx_service/include/sequences/sequences.h"

#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
#include "gflags/gflags.h"
DECLARE_string(eloq_store_data_path_list);
DECLARE_string(tx_standby_ip_port_list);
#endif

namespace EloqDS
{

Expand Down Expand Up @@ -3661,6 +3673,33 @@ bool DataStoreServiceClient::CreateSnapshotForBackup(
return !callback_data->HasError();
}

bool DataStoreServiceClient::CreateSnapshotForStandby(
uint32_t ng_id,
std::vector<std::string> &snapshot_files,
uint64_t snapshot_ts)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
if (!bind_data_shard_with_ng_ || data_store_service_ == nullptr)
{
return false;
}
DLOG(INFO) << "CreateSnapshotForStandby begin, generated_snapshot_ts="
<< snapshot_ts;
const bool ok = data_store_service_->CreateSnapshotForStandby(
ng_id, ng_id, snapshot_ts);
snapshot_files.clear();
DLOG(INFO) << "CreateSnapshotForStandby success, snapshot_ts="
<< snapshot_ts << ", snapshot_files=" << snapshot_files.size()
<< ", ok=" << ok;
return ok;
#else
(void) ng_id;
(void) snapshot_files;
(void) snapshot_ts;
return false;
#endif
}
Comment on lines +3676 to +3701
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the signature of DataStoreService::CreateSnapshotForStandby
ast-grep --pattern $'CreateSnapshotForStandby($_, $_, $_) {
  $$$
}'

# Also search for declaration
rg -n "CreateSnapshotForStandby" --type cpp -A 2 -B 1

Repository: eloqdata/tx_service

Length of output: 8104


Output parameter snapshot_files is cleared but never populated.

The function signature promises to populate snapshot_files with snapshot file paths (line 3641), but the implementation clears the vector (line 3653) and never populates it. The underlying data_store_service_->CreateSnapshotForStandby() method doesn't provide a way to retrieve snapshot files, making it impossible to fulfill this contract. Multiple callers (snapshot_manager.cpp, data_sync_task.cpp, checkpointer.cpp) expect snapshot_files to contain actual file paths after this call returns.

Either remove the snapshot_files parameter from the function signature, provide a way to retrieve snapshot file paths from the underlying service, or document that this parameter will remain empty.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` around lines 3639 - 3663, The
CreateSnapshotForStandby implementation clears snapshot_files but never fills
it; update DataStoreServiceClient::CreateSnapshotForStandby to either (A)
populate snapshot_files by invoking or adding a method on data_store_service_
that returns the generated snapshot file paths (e.g., extend
data_store_service_->CreateSnapshotForStandby or add
data_store_service_->GetSnapshotFiles(ng_id, snapshot_ts, snapshot_files) and
call it after the snapshot is created), or (B) remove the snapshot_files
parameter from the DataStoreServiceClient::CreateSnapshotForStandby signature
and update callers (snapshot_manager.cpp, data_sync_task.cpp, checkpointer.cpp)
accordingly so they no longer expect file paths, or (C) if files are
intentionally unavailable, document in the function comment and update all
callers to handle an empty snapshot_files; ensure you reference the
DataStoreServiceClient::CreateSnapshotForStandby and
data_store_service_->CreateSnapshotForStandby symbols when making the change.


/**
* @brief Internal method for creating snapshots for backup operations.
*
Expand Down Expand Up @@ -3778,6 +3817,17 @@ bool DataStoreServiceClient::OnLeaderStart(uint32_t ng_id,
LOG_IF(FATAL, bucket_ids.empty())
<< "bucket_ids is empty, ng_id: " << ng_id;
// Binded data store shard with ng.
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
const bool enable_local_standby =
!FLAGS_tx_standby_ip_port_list.empty();
data_store_service_->SetEnableLocalStandbyForEloqStore(
enable_local_standby);
data_store_service_->ClearStandbySnapshotPayloadForEloqStore(ng_id);
DLOG(INFO) << "OnLeaderStart reset eloqstore standby snapshot payload, "
<< "enable_local_standby=" << enable_local_standby
<< ", tx_standby_ip_port_list="
<< FLAGS_tx_standby_ip_port_list;
#endif
data_store_service_->OpenDataStore(ng_id, std::move(bucket_ids), term);
}

Expand All @@ -3796,6 +3846,8 @@ bool DataStoreServiceClient::OnLeaderStop(uint32_t ng_id, int64_t term)
if (data_store_service_ != nullptr)
{
// Close the data store shard.
LOG(INFO) << "DataStoreServiceClient::OnLeaderStop closing data store, "
<< "ng_id=" << ng_id << ", term=" << term << ", role=leader";
data_store_service_->CloseDataStore(ng_id);
}
return true;
Expand All @@ -3821,6 +3873,15 @@ void DataStoreServiceClient::OnStartFollowing(uint32_t ng_id,

if (data_store_service_ != nullptr)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
const bool enable_local_standby =
!FLAGS_tx_standby_ip_port_list.empty();
data_store_service_->SetEnableLocalStandbyForEloqStore(
enable_local_standby);
DLOG(INFO) << "OnStartFollowing set eloqstore enable_local_standby="
<< enable_local_standby << ", tx_standby_ip_port_list="
<< FLAGS_tx_standby_ip_port_list;
#endif
data_store_service_->CloseDataStore(ng_id);
}

Expand Down Expand Up @@ -3869,52 +3930,158 @@ void DataStoreServiceClient::OnShutdown()
{
}

std::string DataStoreServiceClient::SnapshotSyncDestPath() const
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
return FLAGS_eloq_store_data_path_list;
#else
Comment on lines +3933 to +3937
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Return a real sync directory here, not the raw *_path_list config.

SnapshotSyncDestPath() is a singular destination accessor, but it currently returns FLAGS_eloq_store_data_path_list verbatim. As soon as that flag contains multiple or weighted paths, callers will get a config blob instead of a usable filesystem path, and snapshot sync will target an invalid destination.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` around lines 3899 - 3903,
SnapshotSyncDestPath() currently returns the raw FLAGS_eloq_store_data_path_list
which may be a multi/weighted-list; change it to parse that config and return a
single concrete filesystem directory (e.g., pick the primary/first path or the
unweighted path entry), not the raw blob. In the
DataStoreServiceClient::SnapshotSyncDestPath implementation, replace the direct
return of FLAGS_eloq_store_data_path_list with logic to split the list format,
handle weights/whitespace, validate the chosen path exists or is well-formed,
and return that single path string so callers get a usable destination rather
than the full config list.

return std::string("");
#endif
}

bool DataStoreServiceClient::OnSnapshotReceived(
const txservice::remote::OnSnapshotSyncedRequest *req)
{
(void) req;
return true;
}

bool DataStoreServiceClient::OnUpdateStandbyCkptTs(uint32_t ng_id,
int64_t ng_term,
uint64_t snapshot_ts,
bool skip_reload_data)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
if (!bind_data_shard_with_ng_)
{
DLOG(INFO) << "bind_data_shard_with_ng_ is false, return";
return true;
}

if (data_store_service_ != nullptr)
assert(data_store_service_ != nullptr);
if (!skip_reload_data)
{
uint32_t ng_id = req->ng_id();
std::unordered_set<uint16_t> bucket_ids;
for (auto &[bucket_id, bucket_info] : bucket_infos_)
const bool reload_ok =
data_store_service_->ReloadData(ng_id, ng_term, snapshot_ts);
if (!reload_ok)
{
if (bucket_info->BucketOwner() == ng_id)
{
bucket_ids.insert(bucket_id);
}
LOG(WARNING)
<< "DataStoreServiceClient::OnUpdateStandbyCkptTs skip ckpt "
"update because reload failed, ng_id="
<< ng_id << ", term=" << ng_term
<< ", snapshot_ts=" << snapshot_ts;
return false;
}
int64_t term =
txservice::PrimaryTermFromStandbyTerm(req->standby_node_term());
data_store_service_->OnSnapshotReceived(
ng_id, term, std::move(bucket_ids), req->snapshot_path());

return true;
}
#endif
return true;
}

bool DataStoreServiceClient::OnUpdateStandbyCkptTs(uint32_t ng_id,
int64_t ng_term)
bool DataStoreServiceClient::RequestSyncSnapshot(uint32_t ng_id,
int64_t ng_term,
uint64_t snapshot_ts)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
if (!bind_data_shard_with_ng_)
{
return true;
}
assert(data_store_service_ != nullptr);
std::unordered_set<uint16_t> bucket_ids;
for (auto &[bucket_id, bucket_info] : bucket_infos_)
{
if (bucket_info->BucketOwner() == ng_id)
{
bucket_ids.insert(bucket_id);
}
}
DLOG(INFO) << "DataStoreServiceClient::RequestSyncSnapshot prepare open "
"data store, ng_id="
<< ng_id << ", term=" << ng_term
<< ", bucket_count=" << bucket_ids.size()
<< ", snapshot_ts=" << snapshot_ts;
if (data_store_service_->FetchDSShardStatus(ng_id) == DSShardStatus::Closed)
{
data_store_service_->OpenDataStore(
ng_id, std::move(bucket_ids), ng_term);
}
return data_store_service_->ReloadData(ng_id, ng_term, snapshot_ts);
#else
(void) ng_id;
(void) ng_term;
(void) snapshot_ts;
return false;
#endif
}

void DataStoreServiceClient::DeleteStandbySnapshot(uint32_t ng_id,
uint64_t snapshot_ts)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
if (data_store_service_ == nullptr)
{
return;
}
const bool ok =
data_store_service_->DeleteStandbySnapshot(ng_id, snapshot_ts);
DLOG(INFO) << "DataStoreServiceClient::DeleteStandbySnapshot, ng_id="
<< ng_id << ", snapshot_ts=" << snapshot_ts << ", ok=" << ok;
#else
(void) ng_id;
(void) snapshot_ts;
#endif
}

void DataStoreServiceClient::DeleteStandbySnapshotsBefore(uint32_t ng_id,
uint64_t snapshot_ts)
{
if (!bind_data_shard_with_ng_)
{
return;
}
assert(data_store_service_ != nullptr);
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
data_store_service_->DeleteStandbySnapshotsBefore(ng_id, snapshot_ts);
#else
(void) ng_id;
(void) snapshot_ts;
#endif
}

uint64_t DataStoreServiceClient::CurrentStandbySnapshotTs(uint32_t ng_id)
{
if (!bind_data_shard_with_ng_)
{
return 0;
}
assert(data_store_service_ != nullptr);
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
const uint64_t ts = data_store_service_->CurrentStandbySnapshotTs(ng_id);
DLOG(INFO) << "CurrentStandbySnapshotTs from data_store_service, ng_id="
<< ng_id << ", snapshot_ts=" << ts;
return ts;
#else
(void) ng_id;
return 0;
#endif
}

void DataStoreServiceClient::SetStandbySnapshotPayload(
uint32_t ng_id, const std::string &snapshot_path)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
if (!bind_data_shard_with_ng_)
{
return;
}

if (data_store_service_ != nullptr)
{
data_store_service_->OnUpdateStandbyCkptTs(ng_id, ng_term);
data_store_service_->SetStandbySnapshotPayload(ng_id, snapshot_path);
}
#else
(void) ng_id;
(void) snapshot_path;
#endif
return true;
}

/**
Expand Down
35 changes: 29 additions & 6 deletions store_handler/data_store_service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -198,7 +199,11 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
{
if (bind_data_shard_with_ng_ && data_store_service_ != nullptr)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
return true;
#else
return data_store_service_->IsCloudMode();
#endif
}
else
{
Expand Down Expand Up @@ -452,11 +457,10 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
std::vector<txservice::VersionTxRecord> &archives,
uint64_t from_ts) override;

/**
* @brief Create a snapshot for backup.
* @param snapshot_files The output snapshot files.
* @return True if create successfully, otherwise false.
*/
bool CreateSnapshotForStandby(uint32_t ng_id,
std::vector<std::string> &snapshot_files,
uint64_t snapshot_ts) override;

bool CreateSnapshotForBackup(const std::string &backup_name,
std::vector<std::string> &backup_files,
uint64_t backup_ts = 0) override;
Expand All @@ -480,6 +484,13 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler

void OnShutdown() override;

/**
* For EloqStore local-standby mode, snapshot sync is pull-based:
* standby/follower pulls data from master. This returns the master-side
* source path list (optionally with weights) to send to standby.
*/
std::string SnapshotSyncDestPath() const override;

bool RemoveBackupSnapshot(const std::string &backup_name) override
{
return true;
Expand All @@ -488,7 +499,19 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
bool OnSnapshotReceived(
const txservice::remote::OnSnapshotSyncedRequest *req) override;

bool OnUpdateStandbyCkptTs(uint32_t ng_id, int64_t ng_term) override;
bool OnUpdateStandbyCkptTs(uint32_t ng_id,
int64_t ng_term,
uint64_t snapshot_ts,
bool skip_reload_data = false) override;
bool RequestSyncSnapshot(uint32_t ng_id,
int64_t ng_term,
uint64_t snapshot_ts) override;
void DeleteStandbySnapshot(uint32_t ng_id, uint64_t snapshot_ts) override;
void DeleteStandbySnapshotsBefore(uint32_t ng_id,
uint64_t snapshot_ts) override;
uint64_t CurrentStandbySnapshotTs(uint32_t ng_id) override;
void SetStandbySnapshotPayload(uint32_t ng_id,
const std::string &snapshot_path) override;

/**
* Serialize a record with is_deleted flag and record string.
Expand Down
3 changes: 2 additions & 1 deletion store_handler/eloq_data_store_service/build_eloq_store.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ set(ELOQ_STORE_SOURCES
${ELOQ_STORE_SOURCE_DIR}/src/file_gc.cpp
${ELOQ_STORE_SOURCE_DIR}/src/kill_point.cpp
${ELOQ_STORE_SOURCE_DIR}/src/replayer.cpp
${ELOQ_STORE_SOURCE_DIR}/src/standby_service.cpp
${ELOQ_STORE_SOURCE_DIR}/src/storage/cloud_backend.cpp
${ELOQ_STORE_SOURCE_DIR}/src/storage/data_page.cpp
${ELOQ_STORE_SOURCE_DIR}/src/storage/data_page_builder.cpp
Expand Down Expand Up @@ -143,7 +144,7 @@ if(WITH_TXSERVICE)
# Add include directory for metrics headers
# From build_eloq_store.cmake location, eloq_metrics is at ../../eloq_metrics
# CMAKE_CURRENT_LIST_DIR is the directory of this file
target_include_directories(eloqstore PRIVATE
target_include_directories(eloqstore PRIVATE
${CMAKE_CURRENT_LIST_DIR}/../../eloq_metrics/include)
# ELOQSTORE_WITH_TXSERVICE is already defined via add_compile_definitions in parent CMakeLists.txt
# but we ensure it's also set for this target explicitly
Expand Down
Loading
Loading