From e07e29067d091b4955b8df5ba040a2e383f263aa Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Tue, 16 Jul 2024 09:52:41 -0700 Subject: [PATCH 1/3] Fix device list for test_raft_repl_dev. Previous code will add `ndevices` simulated drive into device list which make each replica go with one real drive and ndevices simulated drive. Those simulated drives are identified as FAST, meta/log were on them. Due to the very limited size of simulated drive, we can hit size limit in long running test. Fixing by honor input from hs_repl_test_common. After this fix, if only one drive passed in for a replica of test_raft_repl_dev, that drive will be used as FAST. All services will be started on that real drive. Signed-off-by: Xiaoxi Chen --- conanfile.py | 3 +-- src/tests/test_common/homestore_test_common.hpp | 14 +++++++++++++- src/tests/test_common/hs_repl_test_common.hpp | 3 ++- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/conanfile.py b/conanfile.py index 96d080591..51ffcd78b 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,8 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.4.54" - + version = "6.4.55" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 5fe6aef8a..38ef568b0 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -359,6 +359,14 @@ class HSTestHelper { // Fake restart, device list is unchanged. shutdown_homestore(false); std::this_thread::sleep_for(std::chrono::seconds{shutdown_delay_sec}); + } else if (!m_token.devs_.empty()) { + // For those UTs already process device list, like test_raft_repl_dev. + LOGINFO("Using passed in dev_list: {}", + std::accumulate(m_token.devs_.begin(), m_token.devs_.end(), std::string(""), + [](const std::string& s, const homestore::dev_info& dinfo) { + return s.empty() ? dinfo.dev_name : s + "," + dinfo.dev_name; + })); + } else if (SISL_OPTIONS.count("device_list")) { // User has provided explicit device list, use that and initialize them auto const devs = SISL_OPTIONS["device_list"].as< std::vector< std::string > >(); @@ -440,6 +448,9 @@ class HSTestHelper { set_min_chunk_size(m_token.svc_params_[HS_SERVICE::LOG].min_chunk_size); } + // in UT we assume first drive is FAST, rest are DATA. + bool has_data_drive = m_token.devs_.size() > 1; + if (need_format) { auto svc_params = m_token.svc_params_; hsi->format_and_start( @@ -460,7 +471,8 @@ class HSTestHelper { {HS_SERVICE::INDEX, {.dev_type = homestore::HSDevType::Fast, .size_pct = svc_params[HS_SERVICE::INDEX].size_pct}}, {HS_SERVICE::REPLICATION, - {.size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct, + {.dev_type = has_data_drive ? homestore::HSDevType::Data : homestore::HSDevType::Fast, + .size_pct = svc_params[HS_SERVICE::REPLICATION].size_pct, .alloc_type = svc_params[HS_SERVICE::REPLICATION].blkalloc_type, .chunk_sel_type = svc_params[HS_SERVICE::REPLICATION].custom_chunk_selector ? chunk_selector_type_t::CUSTOM diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 67abe2f8e..584525178 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -168,7 +168,8 @@ class HSReplTestHelper : public HSTestHelper { } } for (auto const& dev : rdev_list[replica_num_]) { - dev_list_.emplace_back(dev, homestore::HSDevType::Data); + dev_list_.emplace_back(dev, + dev_list_.empty() ? homestore::HSDevType::Fast : homestore::HSDevType::Data); } } From 04da4e8c29fcd373eaf1bf5362f578cca44ed9cd Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Sat, 20 Jul 2024 22:37:40 +0800 Subject: [PATCH 2/3] Implement on_rollback() Signed-off-by: Xiaoxi Chen --- .../replication/repl_dev/raft_state_machine.cpp | 14 +++++++++++++- src/lib/replication/repl_dev/raft_state_machine.h | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index ba30095ca..a89908059 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -179,6 +179,7 @@ raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_pa repl_req_ptr_t rreq = lsn_to_req(lsn); RD_LOGD("Raft channel: Precommit rreq=[{}]", rreq->to_compact_string()); + // Fixme: Check return value of on_pre_commit m_rd.m_listener->on_pre_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq); return m_success_ptr; @@ -195,7 +196,7 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params // This is the time to ensure flushing of journal happens in the proposer rreq->add_state(repl_req_state_t::LOG_FLUSHED); } - + // Fixme: Check return value of handle_commit m_rd.handle_commit(rreq); return m_success_ptr; @@ -206,6 +207,17 @@ void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_pt // TODO:add more logic here if necessary } +void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params& params) { + int64_t lsn = s_cast< int64_t >(params.log_idx); + RD_LOGD("Raft channel: Received rollback message lsn {}, store {}, logdev {}", lsn, + m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id()); + repl_req_ptr_t rreq = lsn_to_req(lsn); + RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq"); + RD_LOGD("Raft channel: rollback rreq=[{}]", rreq->to_compact_string()); + // Fixme: Check return value of on_rollback + m_rd.m_listener->on_rollback(rreq->lsn(), rreq->header(), rreq->key(), rreq); +} + void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) { for (auto [key, rreq] : m_lsn_req_map) { cb(key, rreq); diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index b931e42f4..b664a5523 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -110,6 +110,7 @@ class RaftStateMachine : public nuraft::state_machine { raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override; void commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) override; void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); } + void rollback_ext(const nuraft::state_machine::ext_op_params& params) override; void become_ready(); void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override; From aa77d7121ce5beac8154ed55395f9971d3c5ccfb Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Sat, 20 Jul 2024 22:37:59 +0800 Subject: [PATCH 3/3] UT improvement When using the write_on_leader() for long running tests, it is very possbile a leader switch happens during the test. Previous implementation the new leader(prvious follower) will not able to aware the role change and pick up to do more write. Signed-off-by: Xiaoxi Chen --- src/tests/test_common/hs_repl_test_common.hpp | 2 +- src/tests/test_raft_repl_dev.cpp | 70 ++++++++++++++++--- 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 584525178..24c8985df 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -137,7 +137,7 @@ class HSReplTestHelper : public HSTestHelper { void setup() { replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >(); sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_)); - sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v"); + sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%n] [%t] %v"); auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); boost::uuids::string_generator gen; diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index cc55187db..42a0e3609 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -157,6 +157,12 @@ class TestReplicatedDB : public homestore::ReplDevListener { void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) override { LOGINFOMOD(replication, "[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn); + { + std::unique_lock lk(db_mtx_); + rollback_count_++; + } + // continue the test + if (ctx->is_proposer()) { g_helper->runner().next_task(); } } void on_restart() { @@ -364,6 +370,11 @@ class TestReplicatedDB : public homestore::ReplDevListener { return commit_count_; } + uint64_t db_rollback_count() const { + std::shared_lock lk(db_mtx_); + return rollback_count_; + } + uint64_t db_size() const { std::shared_lock lk(db_mtx_); return inmem_db_.size(); @@ -391,6 +402,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { std::map< Key, Value > inmem_db_; std::map< int64_t, Value > lsn_index_; uint64_t commit_count_{0}; + uint64_t rollback_count_{0}; std::shared_mutex db_mtx_; std::shared_ptr< snapshot_context > m_last_snapshot{nullptr}; std::mutex m_snapshot_lock; @@ -449,6 +461,22 @@ class RaftReplDevTest : public testing::Test { void wait_for_all_commits() { wait_for_commits(written_entries_); } + uint64_t total_committed_cnt() { + uint64_t total_writes{0}; + for (auto const& db : dbs_) { + total_writes += db->db_commit_count(); + } + return total_writes; + } + + uint64_t total_rollback_cnt() { + uint64_t total_rollback{0}; + for (auto const& db : dbs_) { + total_rollback += db->db_rollback_count(); + } + return total_rollback; + } + void wait_for_commits(uint64_t exp_writes) { uint64_t total_writes{0}; while (true) { @@ -522,27 +550,47 @@ class RaftReplDevTest : public testing::Test { LOGINFO("Waiting for leader to be elected"); std::this_thread::sleep_for(std::chrono::milliseconds{500}); } else if (leader_uuid == g_helper->my_replica_id()) { - LOGINFO("Writing {} entries since I am the leader my_uuid={}", num_entries, - boost::uuids::to_string(g_helper->my_replica_id())); + // LEADER ROLE + auto batch_size = wait_for_commit ? g_helper->runner().qdepth_ * 10 : num_entries; + // cap batch_size but should be larger than QD. + // It is possible after leader switch the writes run on previous leader will fail + // so we need to do more IOs to have num_entries committed. + if (batch_size > num_entries - written_entries_) + batch_size = std::max(num_entries - written_entries_, g_helper->runner().qdepth_); + LOGINFO("Writing {} entries since I am the leader my_uuid={}, target_total {}, written {}", batch_size, + boost::uuids::to_string(g_helper->my_replica_id()), num_entries, written_entries_); auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - g_helper->runner().set_num_tasks(num_entries); - + g_helper->runner().set_num_tasks(batch_size); LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); g_helper->runner().set_task([this, block_size, db]() { static std::normal_distribution<> num_blks_gen{3.0, 2.0}; this->generate_writes(std::abs(std::lround(num_blks_gen(g_re))) * block_size, block_size, db); }); - if (wait_for_commit) { g_helper->runner().execute().get(); } - break; + written_entries_ += batch_size; + if (wait_for_commit) { + g_helper->runner().execute().get(); + if (total_committed_cnt() >= num_entries) { break; } + } else { + if (written_entries_ >= num_entries) { break; } + } } else { - LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries, + // FOLLOWER ROLE + LOGINFO("{} entries are expected to be written on the leader_uuid={}, my_uuid={}", num_entries, boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id())); - break; + if (wait_for_commit) { + LOGINFO("{} entries are expected to be written, now I committed {}, my_uuid={}", num_entries, + total_committed_cnt(), boost::uuids::to_string(leader_uuid), + boost::uuids::to_string(g_helper->my_replica_id())); + if (total_committed_cnt() >= num_entries) { break; } + std::this_thread::sleep_for(std::chrono::milliseconds{5000}); + } else { + break; + } } } while (true); - - written_entries_ += num_entries; - if (wait_for_commit) { this->wait_for_all_commits(); } + LOGINFO("my_uuid={}, {} entries are expected to be written, I wrote {}, committed {}, rollback {}", + boost::uuids::to_string(g_helper->my_replica_id()), num_entries, written_entries_, + total_committed_cnt(), total_rollback_cnt()); } void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) {