Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.54"

version = "6.4.55"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
14 changes: 13 additions & 1 deletion src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_pa

repl_req_ptr_t rreq = lsn_to_req(lsn);
RD_LOGD("Raft channel: Precommit rreq=[{}]", rreq->to_compact_string());
// Fixme: Check return value of on_pre_commit
m_rd.m_listener->on_pre_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq);

return m_success_ptr;
Expand All @@ -195,7 +196,7 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
// This is the time to ensure flushing of journal happens in the proposer
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
}

// Fixme: Check return value of handle_commit
m_rd.handle_commit(rreq);

return m_success_ptr;
Expand All @@ -206,6 +207,17 @@ void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_pt
// TODO:add more logic here if necessary
}

void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params& params) {
int64_t lsn = s_cast< int64_t >(params.log_idx);
RD_LOGD("Raft channel: Received rollback message lsn {}, store {}, logdev {}", lsn,
m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id());
repl_req_ptr_t rreq = lsn_to_req(lsn);
RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq");
RD_LOGD("Raft channel: rollback rreq=[{}]", rreq->to_compact_string());
// Fixme: Check return value of on_rollback
m_rd.m_listener->on_rollback(rreq->lsn(), rreq->header(), rreq->key(), rreq);
}

void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
for (auto [key, rreq] : m_lsn_req_map) {
cb(key, rreq);
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class RaftStateMachine : public nuraft::state_machine {
raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override;
void commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) override;
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
void rollback_ext(const nuraft::state_machine::ext_op_params& params) override;
void become_ready();

void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override;
Expand Down
14 changes: 13 additions & 1 deletion src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ class HSTestHelper {
// Fake restart, device list is unchanged.
shutdown_homestore(false);
std::this_thread::sleep_for(std::chrono::seconds{shutdown_delay_sec});
} else if (!m_token.devs_.empty()) {
Copy link
Contributor

@yamingk yamingk Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we have three use cases, priority-wise:

  1. UT specificly passed dev_list, if not there,
  2. --device_list from input, if not there,
  3. m_token.name_ which equeals to test_binary, name.

Can we put it as in comment at line: 178 (the start_homestore api), so that it can be friendly to us after a few month when we look back and don't need to scratch our head wondering why?

// For those UTs already process device list, like test_raft_repl_dev.
LOGINFO("Using passed in dev_list: {}",
std::accumulate(m_token.devs_.begin(), m_token.devs_.end(), std::string(""),
[](const std::string& s, const homestore::dev_info& dinfo) {
return s.empty() ? dinfo.dev_name : s + "," + dinfo.dev_name;
}));

} else if (SISL_OPTIONS.count("device_list")) {
// User has provided explicit device list, use that and initialize them
auto const devs = SISL_OPTIONS["device_list"].as< std::vector< std::string > >();
Expand Down Expand Up @@ -440,6 +448,9 @@ class HSTestHelper {
set_min_chunk_size(m_token.svc_params_[HS_SERVICE::LOG].min_chunk_size);
}

// in UT we assume first drive is FAST, rest are DATA.
bool has_data_drive = m_token.devs_.size() > 1;

if (need_format) {
auto svc_params = m_token.svc_params_;
hsi->format_and_start(
Expand All @@ -460,7 +471,8 @@ class HSTestHelper {
{HS_SERVICE::INDEX,
{.dev_type = homestore::HSDevType::Fast, .size_pct = svc_params[HS_SERVICE::INDEX].size_pct}},
{HS_SERVICE::REPLICATION,
{.size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct,
{.dev_type = has_data_drive ? homestore::HSDevType::Data : homestore::HSDevType::Fast,
.size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct,
.alloc_type = svc_params[HS_SERVICE::REPLICATION].blkalloc_type,
.chunk_sel_type = svc_params[HS_SERVICE::REPLICATION].custom_chunk_selector
? chunk_selector_type_t::CUSTOM
Expand Down
5 changes: 3 additions & 2 deletions src/tests/test_common/hs_repl_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class HSReplTestHelper : public HSTestHelper {
void setup() {
replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >();
sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_));
sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v");
sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%n] [%t] %v");
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();

boost::uuids::string_generator gen;
Expand Down Expand Up @@ -168,7 +168,8 @@ class HSReplTestHelper : public HSTestHelper {
}
}
for (auto const& dev : rdev_list[replica_num_]) {
dev_list_.emplace_back(dev, homestore::HSDevType::Data);
dev_list_.emplace_back(dev,
dev_list_.empty() ? homestore::HSDevType::Fast : homestore::HSDevType::Data);
}
}

Expand Down
70 changes: 59 additions & 11 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) override {
LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn);
{
std::unique_lock lk(db_mtx_);
rollback_count_++;
}
// continue the test
if (ctx->is_proposer()) { g_helper->runner().next_task(); }
}

void on_restart() {
Expand Down Expand Up @@ -364,6 +370,11 @@ class TestReplicatedDB : public homestore::ReplDevListener {
return commit_count_;
}

uint64_t db_rollback_count() const {
std::shared_lock lk(db_mtx_);
return rollback_count_;
}

uint64_t db_size() const {
std::shared_lock lk(db_mtx_);
return inmem_db_.size();
Expand Down Expand Up @@ -391,6 +402,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::map< Key, Value > inmem_db_;
std::map< int64_t, Value > lsn_index_;
uint64_t commit_count_{0};
uint64_t rollback_count_{0};
std::shared_mutex db_mtx_;
std::shared_ptr< snapshot_context > m_last_snapshot{nullptr};
std::mutex m_snapshot_lock;
Expand Down Expand Up @@ -449,6 +461,22 @@ class RaftReplDevTest : public testing::Test {

void wait_for_all_commits() { wait_for_commits(written_entries_); }

uint64_t total_committed_cnt() {
uint64_t total_writes{0};
for (auto const& db : dbs_) {
total_writes += db->db_commit_count();
}
return total_writes;
}

uint64_t total_rollback_cnt() {
uint64_t total_rollback{0};
for (auto const& db : dbs_) {
total_rollback += db->db_rollback_count();
}
return total_rollback;
}

void wait_for_commits(uint64_t exp_writes) {
uint64_t total_writes{0};
while (true) {
Expand Down Expand Up @@ -522,27 +550,47 @@ class RaftReplDevTest : public testing::Test {
LOGINFO("Waiting for leader to be elected");
std::this_thread::sleep_for(std::chrono::milliseconds{500});
} else if (leader_uuid == g_helper->my_replica_id()) {
LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries,
boost::uuids::to_string(g_helper->my_replica_id()));
// LEADER ROLE
auto batch_size = wait_for_commit ? g_helper->runner().qdepth_ * 10 : num_entries;
// cap batch_size but should be larger than QD.
// It is possible after leader switch the writes run on previous leader will fail
// so we need to do more IOs to have num_entries committed.
if (batch_size > num_entries - written_entries_)
batch_size = std::max(num_entries - written_entries_, g_helper->runner().qdepth_);
LOGINFO("Writing {} entries since I am the leader my_uuid={}, target_total {}, written {}", batch_size,
boost::uuids::to_string(g_helper->my_replica_id()), num_entries, written_entries_);
auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
g_helper->runner().set_num_tasks(num_entries);

g_helper->runner().set_num_tasks(batch_size);
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size, db]() {
static std::normal_distribution<> num_blks_gen{3.0, 2.0};
this->generate_writes(std::abs(std::lround(num_blks_gen(g_re))) * block_size, block_size, db);
});
if (wait_for_commit) { g_helper->runner().execute().get(); }
break;
written_entries_ += batch_size;
if (wait_for_commit) {
g_helper->runner().execute().get();
if (total_committed_cnt() >= num_entries) { break; }
} else {
if (written_entries_ >= num_entries) { break; }
}
} else {
LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries,
// FOLLOWER ROLE
LOGINFO("{} entries are expected to be written on the leader_uuid={}, my_uuid={}", num_entries,
boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id()));
break;
if (wait_for_commit) {
LOGINFO("{} entries are expected to be written, now I committed {}, my_uuid={}", num_entries,
total_committed_cnt(), boost::uuids::to_string(leader_uuid),
boost::uuids::to_string(g_helper->my_replica_id()));
if (total_committed_cnt() >= num_entries) { break; }
std::this_thread::sleep_for(std::chrono::milliseconds{5000});
} else {
break;
}
}
} while (true);

written_entries_ += num_entries;
if (wait_for_commit) { this->wait_for_all_commits(); }
LOGINFO("my_uuid={}, {} entries are expected to be written, I wrote {}, committed {}, rollback {}",
boost::uuids::to_string(g_helper->my_replica_id()), num_entries, written_entries_,
total_committed_cnt(), total_rollback_cnt());
}

void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) {
Expand Down