Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ba9f089
Initial commit
githubzilla Mar 6, 2026
f2af152
fix: remove assert(!dest_path.empty()) in CreateBackup for cloud mode
githubzilla Mar 8, 2026
995cbbf
chore: change default eloq_dss_branch_name from "development" to "main"
githubzilla Mar 8, 2026
85391a2
feat: read eloq_dss_branch_name from ini config file
githubzilla Mar 8, 2026
cdbf2be
feat: read eloq_dss_branch_name from ini config file in eloq_dss main
githubzilla Mar 8, 2026
c11fd93
fix: support idempotency in CreateBackup by returning Finished for ex…
githubzilla Mar 9, 2026
7e0c712
Add backup ts assert
githubzilla Mar 9, 2026
81fa368
Update eloqstore
githubzilla Mar 9, 2026
dcadd7b
Update eloqstore
githubzilla Mar 10, 2026
7095efd
Change default branch from development to main
githubzilla Mar 11, 2026
7d765d6
Update eloqstore submodule
githubzilla Mar 11, 2026
e286474
temp change the data file size
githubzilla Mar 11, 2026
f0f4c3a
Update eloqstore
githubzilla Mar 12, 2026
cfd6157
Update eloqstore
githubzilla Mar 13, 2026
e748ade
Restore default eloq_store_pages_per_file_shift
githubzilla Mar 13, 2026
8233cb5
Update eloqstore
githubzilla Mar 13, 2026
84cd906
Update eloqstore
githubzilla Mar 13, 2026
cca09f2
Update eloqstore
githubzilla Mar 14, 2026
b245ef7
Update eloqstore
githubzilla Mar 14, 2026
0b3210b
Update eloqstore
githubzilla Mar 15, 2026
74b382a
Update eloqstore
githubzilla Mar 15, 2026
5c5d5ac
Update eloqstore
githubzilla Mar 17, 2026
44bc457
Refine logs in RocksDBCloudDataStore
githubzilla Mar 17, 2026
082da9a
Update eloqstore
githubzilla Mar 17, 2026
9fe7fcb
Update eloqstore
githubzilla Mar 17, 2026
d18ca0d
Update eloqstore: remove unused parent_branch_ from GlobalCreateBranc…
githubzilla Mar 17, 2026
495ce1d
Update eloqstore
githubzilla Mar 17, 2026
aea167b
Fix clang-format
githubzilla Mar 17, 2026
30bc00f
Update eloqstore
githubzilla Mar 18, 2026
0828906
Update eloqstore
githubzilla Mar 18, 2026
eede0de
Update eloqstore
githubzilla Mar 18, 2026
b08d1ff
Fix config section for eloq_dss_branch_name to use 'store' instead of…
githubzilla Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions core/src/storage_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ DEFINE_string(eloq_dss_peer_node,
"",
"EloqDataStoreService peer node address. Used to fetch eloq-dss "
"topology from a working eloq-dss server.");
DEFINE_string(eloq_dss_branch_name,
"development",
"Branch name of EloqDataStore");
DEFINE_string(eloq_dss_branch_name, "main", "Branch name of EloqDataStore");
DEFINE_string(eloq_dss_config_file_path,
"",
"EloqDataStoreService config file path. Used to load eloq-dss "
Expand Down Expand Up @@ -277,7 +275,11 @@ bool DataSubstrate::InitializeStorageHandler(const INIReader &config_reader)
defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS)
EloqDS::RocksDBConfig rocksdb_config(config_reader, eloq_dss_data_path);
EloqDS::RocksDBCloudConfig rocksdb_cloud_config(config_reader);
rocksdb_cloud_config.branch_name_ = FLAGS_eloq_dss_branch_name;
rocksdb_cloud_config.branch_name_ =
!CheckCommandLineFlagIsDefault("eloq_dss_branch_name")
? FLAGS_eloq_dss_branch_name
: config_reader.GetString(
"store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name);
auto ds_factory = std::make_unique<EloqDS::RocksDBCloudDataStoreFactory>(
rocksdb_config,
rocksdb_cloud_config,
Expand All @@ -293,6 +295,11 @@ bool DataSubstrate::InitializeStorageHandler(const INIReader &config_reader)
eloq_dss_data_path,
core_config_.node_memory_limit_mb,
core_config_.core_num);
eloq_store_config.branch_name_ =
!CheckCommandLineFlagIsDefault("eloq_dss_branch_name")
? FLAGS_eloq_dss_branch_name
: config_reader.GetString(
"store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name);
auto ds_factory = std::make_unique<EloqDS::EloqStoreDataStoreFactory>(
std::move(eloq_store_config));
#endif
Expand Down
6 changes: 6 additions & 0 deletions store_handler/eloq_data_store_service/eloq_store_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,11 @@ struct EloqStoreConfig
std::vector<std::string> &storage_path_vector);

::eloqstore::KvOptions eloqstore_configs_{};

// Branch name passed to EloqStore::Start(). Populated from
// FLAGS_eloq_dss_branch_name by the caller before the factory is
// constructed. Defaults to "main" so that code paths that never set this
// field (e.g. unit tests) still work.
std::string branch_name_{"main"};
};
} // namespace EloqDS
93 changes: 73 additions & 20 deletions store_handler/eloq_data_store_service/eloq_store_data_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <bthread/mutex.h>

#include <algorithm>
#include <cassert>
#include <filesystem>
#include <memory>

#include "common.h"
#include "eloq_store_data_store_factory.h"
#include "internal_request.h"

Expand Down Expand Up @@ -91,6 +94,7 @@ EloqStoreDataStore::EloqStoreDataStore(uint32_t shard_id,
assert(factory != nullptr);
::eloqstore::KvOptions opts =
factory->eloq_store_configs_.eloqstore_configs_;
branch_name_ = factory->eloq_store_configs_.branch_name_;
DLOG(INFO) << "Create EloqStore storage with workers: " << opts.num_threads
<< ", store path: " << opts.store_path.front()
<< ", open files limit: " << opts.fd_limit
Expand Down Expand Up @@ -589,30 +593,79 @@ void EloqStoreDataStore::CreateSnapshotForBackup(
{
PoolableGuard req_guard(req);

::eloqstore::GlobalArchiveRequest global_archive_req;
global_archive_req.SetSnapshotTimestamp(req->GetBackupTs());
eloq_store_service_->ExecSync(&global_archive_req);
std::string_view backup_name = req->GetBackupName();
assert(req->GetBackupTs() != 0);

::EloqDS::remote::DataStoreError ds_error;
std::string error_msg;
switch (global_archive_req.Error())
if (backup_name.empty() || backup_name == eloq_store_service_->Branch())
{
case ::eloqstore::KvError::NoError:
ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR;
break;
case ::eloqstore::KvError::NotRunning:
ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN;
error_msg = "EloqStore not running";
break;
default:
ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR;
error_msg =
"Snapshot failed with error code: " +
std::to_string(static_cast<int>(global_archive_req.Error()));
break;
// If backup_name is empty or matches the current branch, create
// snapshot for current branch.
::eloqstore::GlobalArchiveRequest global_archive_req;
global_archive_req.SetSnapshotTimestamp(req->GetBackupTs());
eloq_store_service_->ExecSync(&global_archive_req);

::EloqDS::remote::DataStoreError ds_error;
std::string error_msg;
switch (global_archive_req.Error())
{
case ::eloqstore::KvError::NoError:
ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR;
req->AddBackupFile(
::eloqstore::BranchArchiveName(eloq_store_service_->Branch(),
eloq_store_service_->Term(),
req->GetBackupTs()));
break;
case ::eloqstore::KvError::NotRunning:
ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN;
error_msg = "EloqStore not running";
break;
default:
ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR;
error_msg =
"Snapshot failed with error code: " +
std::to_string(static_cast<int>(global_archive_req.Error()));
break;
}

req->SetFinish(ds_error, error_msg);
}
else
{
// backup_name differs from the current branch — create a new branch
// forked from the current branch. Use backup_ts as the salt so the
// internal filename is deterministic and correlated with the backup
// timestamp.
::eloqstore::GlobalCreateBranchRequest create_branch_req;
create_branch_req.SetArgs(std::string(backup_name));
create_branch_req.SetSaltTimestamp(req->GetBackupTs());
eloq_store_service_->ExecSync(&create_branch_req);

::EloqDS::remote::DataStoreError ds_error;
std::string error_msg;
switch (create_branch_req.Error())
{
case ::eloqstore::KvError::NoError:
ds_error = ::EloqDS::remote::DataStoreError::NO_ERROR;
req->AddBackupFile(create_branch_req.result_branch);
break;
case ::eloqstore::KvError::NotRunning:
ds_error = ::EloqDS::remote::DataStoreError::DB_NOT_OPEN;
error_msg = "EloqStore not running";
break;
case ::eloqstore::KvError::InvalidArgs:
ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR;
error_msg = "Invalid branch name: " + std::string(backup_name);
break;
default:
ds_error = ::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR;
error_msg =
"Create branch failed with error code: " +
std::to_string(static_cast<int>(create_branch_req.Error()));
break;
}

req->SetFinish(ds_error, error_msg);
req->SetFinish(ds_error, error_msg);
}
}

void EloqStoreDataStore::ScanDelete(DeleteRangeRequest *delete_range_req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ class EloqStoreDataStore : public DataStore

bool StartDB(int64_t term) override
{
::eloqstore::KvError res = eloq_store_service_->Start(term);
::eloqstore::KvError res =
eloq_store_service_->Start(branch_name_, term);
if (res != ::eloqstore::KvError::NoError)
{
LOG(ERROR) << "EloqStore start failed with error code: "
Expand Down Expand Up @@ -270,5 +271,9 @@ class EloqStoreDataStore : public DataStore
void Floor(ScanRequest *scan_req);

std::unique_ptr<::eloqstore::EloqStore> eloq_store_service_{nullptr};

// Branch name passed to EloqStore::Start(). Populated from the factory
// config in the EloqStoreDataStore constructor.
std::string branch_name_{"main"};
};
} // namespace EloqDS
2 changes: 1 addition & 1 deletion store_handler/eloq_data_store_service/eloqstore
Submodule eloqstore updated 59 files
+3 −8 .github/workflows/ci.yml
+1 −0 .gitignore
+7 −0 CMakeLists.txt
+1 −1 benchmark/eloq_store_bm.cc
+4 −1 benchmark/load_bench.cpp
+4 −1 benchmark/simple_bench.cpp
+1 −1 benchmark/simple_test.cpp
+4 −1 db_stress/concurrent_test.cpp
+1 −1 db_stress/db_stress_driver.cpp
+1 −1 db_stress/test_client.cpp
+1 −1 examples/basic_example.cpp
+187 −59 include/async_io_manager.h
+534 −102 include/common.h
+118 −18 include/eloq_store.h
+4 −0 include/error.h
+38 −18 include/file_gc.h
+1 −1 include/replayer.h
+13 −8 include/storage/root_meta.h
+4 −0 include/tasks/background_write.h
+1 −0 include/tasks/prewarm_task.h
+0 −4 include/tasks/write_task.h
+0 −1 include/test_utils.h
+41 −0 include/types.h
+3 −0 rust/eloqstore-sys/src/error.rs
+1 −0 rust/eloqstore-sys/src/lib.rs
+1 −0 rust/eloqstore-sys/vendor/ffi/include/eloqstore_capi.h
+4 −1 rust/eloqstore-sys/vendor/ffi/src/eloqstore_capi.cpp
+4 −0 rust/eloqstore/src/error.rs
+5 −158 scripts/install_dependency_ubuntu2404.sh
+868 −176 src/async_io_manager.cpp
+201 −2 src/eloq_store.cpp
+450 −171 src/file_gc.cpp
+50 −33 src/replayer.cpp
+76 −13 src/storage/index_page_manager.cpp
+3 −0 src/storage/object_store.cpp
+21 −15 src/storage/root_meta.cpp
+45 −12 src/storage/shard.cpp
+149 −16 src/tasks/background_write.cpp
+17 −7 src/tasks/prewarm_task.cpp
+34 −23 src/tasks/write_task.cpp
+12 −7 src/test_utils.cpp
+4 −3 tests/CMakeLists.txt
+853 −0 tests/branch_filename_parsing.cpp
+316 −0 tests/branch_gc.cpp
+1,306 −0 tests/branch_operations.cpp
+4 −26 tests/chore.cpp
+99 −68 tests/cloud.cpp
+10 −5 tests/cloud_term.cpp
+5 −1 tests/common.cpp
+162 −38 tests/common.h
+8 −8 tests/eloq_store_test.cpp
+0 −67 tests/fileid_term_mapping.cpp
+189 −162 tests/filename_parsing.cpp
+14 −7 tests/gc.cpp
+17 −7 tests/manifest.cpp
+253 −37 tests/manifest_payload.cpp
+9 −5 tests/persist.cpp
+222 −65 tests/replayer_term.cpp
+2 −2 tests/test_unit_coverage.sh
13 changes: 11 additions & 2 deletions store_handler/eloq_data_store_service/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ DEFINE_string(eloq_dss_peer_node,
"Data store peer node address. Used to get cluster topology if "
"data_store_config_file is not provided.");

DEFINE_string(eloq_dss_branch_name, "development", "Data store branch name.");
DEFINE_string(eloq_dss_branch_name, "main", "Data store branch name.");

DEFINE_string(ip, "127.0.0.1", "Server IP");
DEFINE_int32(port, 9100, "Server Port");
Expand Down Expand Up @@ -303,7 +303,11 @@ int main(int argc, char *argv[])
// INIReader config_reader(nullptr, 0);
EloqDS::RocksDBConfig rocksdb_config(config_reader, data_path);
EloqDS::RocksDBCloudConfig rocksdb_cloud_config(config_reader);
rocksdb_cloud_config.branch_name_ = FLAGS_eloq_dss_branch_name;
rocksdb_cloud_config.branch_name_ =
!CheckCommandLineFlagIsDefault("eloq_dss_branch_name")
? FLAGS_eloq_dss_branch_name
: config_reader.GetString(
"store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name);
auto ds_factory = std::make_unique<EloqDS::RocksDBCloudDataStoreFactory>(
rocksdb_config, rocksdb_cloud_config, enable_cache_replacement_);

Expand All @@ -327,6 +331,11 @@ int main(int argc, char *argv[])
uint32_t unused_core_number = 0;
EloqDS::EloqStoreConfig eloq_store_config(
config_reader, data_path, mem_mib, unused_core_number, true);
eloq_store_config.branch_name_ =
!CheckCommandLineFlagIsDefault("eloq_dss_branch_name")
? FLAGS_eloq_dss_branch_name
: config_reader.GetString(
"store", "eloq_dss_branch_name", FLAGS_eloq_dss_branch_name);

#ifdef ELOQ_MODULE_ENABLED
GFLAGS_NAMESPACE::SetCommandLineOption(
Expand Down
19 changes: 19 additions & 0 deletions store_handler/eloq_data_store_service/rocksdb_cloud_data_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,17 @@ bool RocksDBCloudDataStore::OpenCloudDB(
// Disable auto compactions before blocking purger
options.disable_auto_compactions = true;

LOG(INFO) << "Open RocksDB Cloud with options create_if_missing: "
<< create_db_if_missing_ << ", db_path: " << db_path_
<< ", bucket_name: " << cfs_options_.src_bucket.GetBucketName()
<< ", bucket_prefix: "
<< cfs_options_.src_bucket.GetBucketPrefix()
<< ", object_path: " << cfs_options_.src_bucket.GetObjectPath()
<< ", endpoint: "
<< (cloud_config_.s3_endpoint_url_.empty()
? "default"
: cloud_config_.s3_endpoint_url_);

auto start = std::chrono::steady_clock::now();
std::unique_lock<std::shared_mutex> db_lk(db_mux_);
rocksdb::Status status;
Expand All @@ -694,6 +705,14 @@ bool RocksDBCloudDataStore::OpenCloudDB(
bthread_usleep(retry_num * 200000);
}

if (!status.ok())
{
LOG(ERROR) << "Unable to open db at path " << storage_path_
<< " with bucket " << cfs_options.src_bucket.GetBucketName()
<< " with error: " << status.ToString();
return false;
}

auto end = std::chrono::steady_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
Expand Down
3 changes: 2 additions & 1 deletion tx_service/src/remote/cc_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1910,7 +1910,8 @@ void CcNodeService::CreateBackup(
assert(store_hd != nullptr);
if (store_hd && !request->backup_name().empty())
{
assert(!request->dest_path().empty());
// dest_path may be empty for cloud/shared-storage mode where the
// snapshot is written directly to cloud (no rsync transfer needed).
auto st = store::SnapshotManager::Instance().CreateBackup(request);
response->set_status(st);
if (st == BackupTaskStatus::Finished)
Expand Down
10 changes: 9 additions & 1 deletion tx_service/src/store/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,15 @@ txservice::remote::BackupTaskStatus SnapshotManager::CreateBackup(
auto backup_it = ng_it->second.find(backup_name);
if (backup_it != ng_it->second.end())
{
assert(false);
// Idempotency: if the backup already exists and is finished,
// return success. This allows retrying CreateBackup after a
// previous successful completion.
if (backup_it->second.status() ==
txservice::remote::BackupTaskStatus::Finished)
{
return txservice::remote::BackupTaskStatus::Finished;
}
// If backup exists but is not finished, it's an error
return txservice::remote::BackupTaskStatus::Failed;
}
}
Expand Down
Loading