From 201260f9218f12163ca4dfac56f30fbaea3b555f Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 5 Sep 2024 21:20:57 +0800 Subject: [PATCH 1/4] a Signed-off-by: Calvin Neo --- components/raftstore/src/store/fsm/peer.rs | 5 +++ components/raftstore/src/store/peer.rs | 2 + .../src/core/forward_raft/command.rs | 14 ++++++ .../engine_store_ffi/src/core/forwarder.rs | 7 ++- .../src/mock_cluster/v1/cluster.rs | 5 +++ .../src/mock_cluster/v1/node.rs | 8 +++- .../src/mock_cluster/v1/server.rs | 5 +++ proxy_tests/proxy/shared/region.rs | 43 +++++++++++++++++++ 8 files changed, 86 insertions(+), 3 deletions(-) diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 9abc1c39945..eeae35015e7 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -6589,10 +6589,12 @@ where } fn on_check_peer_stale_state_tick(&mut self) { + info!("!!!!!! on_check_peer_stale_state_tick 1 {:?}", self.ctx.cfg.max_leader_missing_duration); if self.fsm.peer.pending_remove { return; } + info!("!!!!!! on_check_peer_stale_state_tick 2"); self.register_check_peer_stale_state_tick(); if self.fsm.peer.is_handling_snapshot() || self.fsm.peer.has_pending_snapshot() { @@ -6615,6 +6617,7 @@ where } } + info!("!!!!!! on_check_peer_stale_state_tick 4"); if let Some(ForceLeaderState::ForceLeader { time, .. }) = self.fsm.peer.force_leader { // Clean up the force leader state after a timeout, since the PD recovery // process may have been aborted for some reasons. @@ -6650,6 +6653,7 @@ where } } + info!("!!!!!! on_check_peer_stale_state_tick 5"); // If this peer detects the leader is missing for a long long time, // it should consider itself as a stale peer which is removed from // the original cluster. @@ -6668,6 +6672,7 @@ where // already. let state = self.fsm.peer.check_stale_state(self.ctx); fail_point!("peer_check_stale_state", state != StaleState::Valid, |_| {}); + info!("!!!!!! on_check_peer_stale_state_tick 6 {:?}", state); match state { StaleState::Valid => (), StaleState::LeaderMissing | StaleState::MaybeLeaderMissing => { diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index ca702525d15..78083b26a08 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -2208,6 +2208,7 @@ where } pub fn check_stale_state(&mut self, ctx: &mut PollContext) -> StaleState { + info!("!!!!!! check_stale_state 0 {:?}", self.leader_missing_time); if self.is_leader() { // Leaders always have valid state. // @@ -2222,6 +2223,7 @@ where // If we are checking this it means we suspect the leader might be missing. // Mark down the time when we are called, so we can check later if it's been // longer than it should be. + info!("!!!!!! check_stale_state 1 {:?}", self.leader_missing_time); match self.leader_missing_time { None => { self.leader_missing_time = Instant::now().into(); diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs index 5210a49bfdc..a0c198afcd2 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs @@ -498,6 +498,20 @@ impl ProxyForwarder { && msg.region_id == 1, |_| unreachable!() ); + let record_all_messages = (|| { + fail::fail_point!("proxy_record_all_messages", |t| { + let t = t.unwrap().parse::().unwrap(); + t + }); + 0 + })(); + #[cfg(any(test, feature = "testexport"))] + if record_all_messages == 1 { + info!("!!!!! msgmgmgmmg {:?}", msg); + if msg.get_is_tombstone() { + self.debug_struct.gc_message_count.as_ref().fetch_add(1, Ordering::SeqCst); + } + } !self.maybe_fast_path_tick(msg) } } diff --git a/proxy_components/engine_store_ffi/src/core/forwarder.rs b/proxy_components/engine_store_ffi/src/core/forwarder.rs index 84d5bbc9616..c7ba6d04fb1 100644 --- a/proxy_components/engine_store_ffi/src/core/forwarder.rs +++ b/proxy_components/engine_store_ffi/src/core/forwarder.rs @@ -1,6 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use encryption::DataKeyManager; +use std::sync::atomic::AtomicU64; use crate::core::common::*; @@ -43,8 +44,10 @@ pub struct PackedEnvs { pub snap_handle_pool_size: usize, } -#[derive(Debug, Default)] -pub struct DebugStruct {} +#[derive(Debug, Default, Clone)] +pub struct DebugStruct { + pub gc_message_count: Arc, +} impl DebugStruct {} diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs index 62e03331a82..e810a8c0d42 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs @@ -48,6 +48,7 @@ use test_raftstore::{ new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config, new_transfer_leader_cmd, sleep_ms, }; +use engine_store_ffi::core::DebugStruct; use tikv::server::Result as ServerResult; use tikv_util::{ debug, error, safe_panic, @@ -70,6 +71,7 @@ use super::{ // E,g, for node 1, the node id and store id are both 1. pub trait Simulator { + fn get_debug_struct(&self) -> DebugStruct; // Pass 0 to let pd allocate a node id if db is empty. // If node id > 0, the node must be created in db already, // and the node id must be the same as given argument. @@ -183,6 +185,9 @@ impl> std::panic::UnwindSafe for Cluster {} // Copied or modified from test_raftstore impl> Cluster { + pub fn get_debug_struct(&self) -> DebugStruct { + self.sim.as_ref().read().expect("").get_debug_struct() + } pub fn new( id: u64, count: usize, diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs index 850ad9a0903..825be2ebd53 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs @@ -195,6 +195,7 @@ pub struct NodeCluster { #[allow(clippy::type_complexity)] post_create_coprocessor_host: Option)>>, pub importer: Option>>, + debug_struct: DebugStruct, } impl std::panic::UnwindSafe for NodeCluster {} @@ -211,6 +212,7 @@ impl NodeCluster { concurrency_managers: HashMap::default(), post_create_coprocessor_host: None, importer: None, + debug_struct: DebugStruct::default(), } } } @@ -260,6 +262,10 @@ impl NodeCluster { } impl Simulator for NodeCluster { + fn get_debug_struct(&self) -> DebugStruct { + self.debug_struct.clone() + } + fn run_node( &mut self, node_id: u64, @@ -356,7 +362,7 @@ impl Simulator for NodeCluster { simulate_trans.clone(), snap_mgr.clone(), packed_envs, - DebugStruct::default(), + self.debug_struct.clone(), key_mgr_cloned, ); tiflash_ob.register_to(&mut coprocessor_host); diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs index cf7f19bfe37..e6e9e1711f8 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs @@ -161,6 +161,7 @@ pub struct ServerCluster { concurrency_managers: HashMap, env: Arc, pub causal_ts_providers: HashMap>, + debug_struct: DebugStruct, } impl ServerCluster { @@ -205,6 +206,7 @@ impl ServerCluster { env, txn_extra_schedulers: HashMap::default(), causal_ts_providers: HashMap::default(), + debug_struct: DebugStruct::default(), } } @@ -668,6 +670,9 @@ impl ServerCluster { } impl Simulator for ServerCluster { + fn get_debug_struct(&self) -> DebugStruct { + self.debug_struct.clone() + } fn run_node( &mut self, node_id: u64, diff --git a/proxy_tests/proxy/shared/region.rs b/proxy_tests/proxy/shared/region.rs index 9f3795524a8..34dc0c91138 100644 --- a/proxy_tests/proxy/shared/region.rs +++ b/proxy_tests/proxy/shared/region.rs @@ -1,3 +1,5 @@ +use fail::fail_point; + // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use crate::utils::v1::*; @@ -142,3 +144,44 @@ fn test_get_region_local_state() { cluster.shutdown(); } + + +#[test] +fn test_stale_peer() { + let (mut cluster, pd_client) = new_mock_cluster(0, 2); + + cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(6); + + pd_client.disable_default_operator(); + disable_auto_gen_compact_log(&mut cluster); + // Otherwise will panic with `assert_eq!(apply_state, last_applied_state)`. + fail::cfg("on_pre_write_apply_state", "return(true)").unwrap(); + // Set region and peers + let r1 = cluster.run_conf_change(); + + let p2 = new_learner_peer(2, 2); + pd_client.must_add_peer(r1, p2.clone()); + cluster.must_put(b"k0", b"v"); + check_key(&cluster, b"k0", b"v", Some(true), None, Some(vec![1, 2])); + + + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(1, 2).direction(Direction::Both), + )); + + cluster.must_put(b"k1", b"v"); + cluster.must_put(b"k2", b"v"); + check_key(&cluster, b"k2", b"v", Some(true), None, Some(vec![1])); + check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); + + pd_client.must_remove_peer(1, p2.clone()); + std::thread::sleep(std::time::Duration::from_millis(1500)); + + fail::cfg("proxy_record_all_messages", "return(1)"); + cluster.clear_send_filters(); + + std::thread::sleep(std::time::Duration::from_millis(1500)); + check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); + std::thread::sleep(std::time::Duration::from_millis(6000)); + assert_ne!(cluster.get_debug_struct().gc_message_count.as_ref().load(Ordering::SeqCst), 0); +} \ No newline at end of file From 3c18cbb2b0a38ea7454cf91ab633e9b17c22e723 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 5 Sep 2024 21:22:14 +0800 Subject: [PATCH 2/4] a Signed-off-by: Calvin Neo --- components/raftstore/src/store/fsm/peer.rs | 5 ----- components/raftstore/src/store/peer.rs | 2 -- .../src/core/forward_raft/command.rs | 6 ++++-- .../engine_store_ffi/src/core/forwarder.rs | 3 ++- .../src/mock_cluster/v1/cluster.rs | 2 +- proxy_tests/proxy/shared/region.rs | 13 +++++++++---- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index eeae35015e7..9abc1c39945 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -6589,12 +6589,10 @@ where } fn on_check_peer_stale_state_tick(&mut self) { - info!("!!!!!! on_check_peer_stale_state_tick 1 {:?}", self.ctx.cfg.max_leader_missing_duration); if self.fsm.peer.pending_remove { return; } - info!("!!!!!! on_check_peer_stale_state_tick 2"); self.register_check_peer_stale_state_tick(); if self.fsm.peer.is_handling_snapshot() || self.fsm.peer.has_pending_snapshot() { @@ -6617,7 +6615,6 @@ where } } - info!("!!!!!! on_check_peer_stale_state_tick 4"); if let Some(ForceLeaderState::ForceLeader { time, .. }) = self.fsm.peer.force_leader { // Clean up the force leader state after a timeout, since the PD recovery // process may have been aborted for some reasons. @@ -6653,7 +6650,6 @@ where } } - info!("!!!!!! on_check_peer_stale_state_tick 5"); // If this peer detects the leader is missing for a long long time, // it should consider itself as a stale peer which is removed from // the original cluster. @@ -6672,7 +6668,6 @@ where // already. let state = self.fsm.peer.check_stale_state(self.ctx); fail_point!("peer_check_stale_state", state != StaleState::Valid, |_| {}); - info!("!!!!!! on_check_peer_stale_state_tick 6 {:?}", state); match state { StaleState::Valid => (), StaleState::LeaderMissing | StaleState::MaybeLeaderMissing => { diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 78083b26a08..ca702525d15 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -2208,7 +2208,6 @@ where } pub fn check_stale_state(&mut self, ctx: &mut PollContext) -> StaleState { - info!("!!!!!! check_stale_state 0 {:?}", self.leader_missing_time); if self.is_leader() { // Leaders always have valid state. // @@ -2223,7 +2222,6 @@ where // If we are checking this it means we suspect the leader might be missing. // Mark down the time when we are called, so we can check later if it's been // longer than it should be. - info!("!!!!!! check_stale_state 1 {:?}", self.leader_missing_time); match self.leader_missing_time { None => { self.leader_missing_time = Instant::now().into(); diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs index a0c198afcd2..45f8ee97dda 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs @@ -507,9 +507,11 @@ impl ProxyForwarder { })(); #[cfg(any(test, feature = "testexport"))] if record_all_messages == 1 { - info!("!!!!! msgmgmgmmg {:?}", msg); if msg.get_is_tombstone() { - self.debug_struct.gc_message_count.as_ref().fetch_add(1, Ordering::SeqCst); + self.debug_struct + .gc_message_count + .as_ref() + .fetch_add(1, Ordering::SeqCst); } } !self.maybe_fast_path_tick(msg) diff --git a/proxy_components/engine_store_ffi/src/core/forwarder.rs b/proxy_components/engine_store_ffi/src/core/forwarder.rs index c7ba6d04fb1..96bcb19c476 100644 --- a/proxy_components/engine_store_ffi/src/core/forwarder.rs +++ b/proxy_components/engine_store_ffi/src/core/forwarder.rs @@ -1,8 +1,9 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use encryption::DataKeyManager; use std::sync::atomic::AtomicU64; +use encryption::DataKeyManager; + use crate::core::common::*; pub struct PtrWrapper(pub RawCppPtr); diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs index e810a8c0d42..09151c4cf8a 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs @@ -10,6 +10,7 @@ use std::{ use collections::{HashMap, HashSet}; use encryption::DataKeyManager; +use engine_store_ffi::core::DebugStruct; use engine_traits::SnapshotContext; // mock cluster use engine_traits::{Engines, KvEngine, CF_DEFAULT}; @@ -48,7 +49,6 @@ use test_raftstore::{ new_region_leader_cmd, new_request, new_status_request, new_store, new_tikv_config, new_transfer_leader_cmd, sleep_ms, }; -use engine_store_ffi::core::DebugStruct; use tikv::server::Result as ServerResult; use tikv_util::{ debug, error, safe_panic, diff --git a/proxy_tests/proxy/shared/region.rs b/proxy_tests/proxy/shared/region.rs index 34dc0c91138..052c7dbfc35 100644 --- a/proxy_tests/proxy/shared/region.rs +++ b/proxy_tests/proxy/shared/region.rs @@ -145,7 +145,6 @@ fn test_get_region_local_state() { cluster.shutdown(); } - #[test] fn test_stale_peer() { let (mut cluster, pd_client) = new_mock_cluster(0, 2); @@ -164,7 +163,6 @@ fn test_stale_peer() { cluster.must_put(b"k0", b"v"); check_key(&cluster, b"k0", b"v", Some(true), None, Some(vec![1, 2])); - cluster.add_send_filter(CloneFilterFactory( RegionPacketFilter::new(1, 2).direction(Direction::Both), )); @@ -183,5 +181,12 @@ fn test_stale_peer() { std::thread::sleep(std::time::Duration::from_millis(1500)); check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); std::thread::sleep(std::time::Duration::from_millis(6000)); - assert_ne!(cluster.get_debug_struct().gc_message_count.as_ref().load(Ordering::SeqCst), 0); -} \ No newline at end of file + assert_ne!( + cluster + .get_debug_struct() + .gc_message_count + .as_ref() + .load(Ordering::SeqCst), + 0 + ); +} From c7794876001af47eb6bcc3044d7c5225af6482ec Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 5 Sep 2024 21:26:01 +0800 Subject: [PATCH 3/4] a Signed-off-by: Calvin Neo --- proxy_tests/proxy/shared/region.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/proxy_tests/proxy/shared/region.rs b/proxy_tests/proxy/shared/region.rs index 052c7dbfc35..cdac134d49a 100644 --- a/proxy_tests/proxy/shared/region.rs +++ b/proxy_tests/proxy/shared/region.rs @@ -145,11 +145,14 @@ fn test_get_region_local_state() { cluster.shutdown(); } +/// When a learner peer is removed by conf change, it will not wait until the conf change. +/// However, leader could stop sending raft messages to the peer at an earlier stage, +/// the peer is eventually removed by stale peer checking. #[test] fn test_stale_peer() { let (mut cluster, pd_client) = new_mock_cluster(0, 2); - cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(6); + cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(4); pd_client.disable_default_operator(); disable_auto_gen_compact_log(&mut cluster); @@ -173,14 +176,17 @@ fn test_stale_peer() { check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); pd_client.must_remove_peer(1, p2.clone()); - std::thread::sleep(std::time::Duration::from_millis(1500)); + std::thread::sleep(std::time::Duration::from_millis(1000)); fail::cfg("proxy_record_all_messages", "return(1)"); cluster.clear_send_filters(); - std::thread::sleep(std::time::Duration::from_millis(1500)); + std::thread::sleep(std::time::Duration::from_millis(1000)); check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); - std::thread::sleep(std::time::Duration::from_millis(6000)); + std::thread::sleep(std::time::Duration::from_millis(3000)); + // Must received a gc peer message. + /// "receives gc message, trying to remove" + /// "raft message is stale, tell to gc" assert_ne!( cluster .get_debug_struct() @@ -189,4 +195,5 @@ fn test_stale_peer() { .load(Ordering::SeqCst), 0 ); + fail::remove("proxy_record_all_messages") } From 21ff63a006968255a57cf5812ca4ab3f8682b0c9 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 5 Sep 2024 21:40:20 +0800 Subject: [PATCH 4/4] a Signed-off-by: Calvin Neo --- proxy_tests/proxy/shared/region.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/proxy_tests/proxy/shared/region.rs b/proxy_tests/proxy/shared/region.rs index cdac134d49a..fbaddd7183d 100644 --- a/proxy_tests/proxy/shared/region.rs +++ b/proxy_tests/proxy/shared/region.rs @@ -1,6 +1,5 @@ -use fail::fail_point; - // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use fail::fail_point; use crate::utils::v1::*; #[test]