diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs index 5210a49bfdc..45f8ee97dda 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs @@ -498,6 +498,24 @@ impl ProxyForwarder { && msg.region_id == 1, |_| unreachable!() ); + // Test-only probe: when the `proxy_record_all_messages` failpoint yields 1, + // count tombstone messages in `debug_struct.gc_message_count`. + #[cfg(any(test, feature = "testexport"))] + { + let record_all_messages = (|| { + fail::fail_point!("proxy_record_all_messages", |t| { + let t = t.unwrap().parse::().unwrap(); + t + }); + 0 + })(); + if record_all_messages == 1 && msg.get_is_tombstone() { + self.debug_struct + .gc_message_count + .as_ref() + .fetch_add(1, Ordering::SeqCst); + } + } !self.maybe_fast_path_tick(msg) } } diff --git a/proxy_components/engine_store_ffi/src/core/forwarder.rs b/proxy_components/engine_store_ffi/src/core/forwarder.rs index 84d5bbc9616..96bcb19c476 100644 --- a/proxy_components/engine_store_ffi/src/core/forwarder.rs +++ b/proxy_components/engine_store_ffi/src/core/forwarder.rs @@ -1,5 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
+use std::sync::atomic::AtomicU64; + use encryption::DataKeyManager; use crate::core::common::*; @@ -43,8 +45,11 @@ pub struct PackedEnvs { pub snap_handle_pool_size: usize, } -#[derive(Debug, Default)] -pub struct DebugStruct {} +#[derive(Debug, Default, Clone)] +pub struct DebugStruct { + /// Atomic counter behind an `Arc`, so every clone of this struct updates the same value. + pub gc_message_count: Arc<AtomicU64>, +} impl DebugStruct {} diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs index 62e03331a82..09151c4cf8a 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs @@ -10,6 +10,7 @@ use std::{ use collections::{HashMap, HashSet}; use encryption::DataKeyManager; +use engine_store_ffi::core::DebugStruct; use engine_traits::SnapshotContext; // mock cluster use engine_traits::{Engines, KvEngine, CF_DEFAULT}; @@ -70,6 +71,7 @@ use super::{ // E,g, for node 1, the node id and store id are both 1. pub trait Simulator { + fn get_debug_struct(&self) -> DebugStruct; // Pass 0 to let pd allocate a node id if db is empty. // If node id > 0, the node must be created in db already, // and the node id must be the same as given argument. 
@@ -183,6 +185,9 @@ impl> std::panic::UnwindSafe for Cluster {} // Copied or modified from test_raftstore impl> Cluster { + pub fn get_debug_struct(&self) -> DebugStruct { + self.sim.as_ref().read().expect("failed to acquire read lock on simulator").get_debug_struct() + } pub fn new( id: u64, count: usize, diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs index 850ad9a0903..825be2ebd53 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs @@ -195,6 +195,7 @@ pub struct NodeCluster { #[allow(clippy::type_complexity)] post_create_coprocessor_host: Option)>>, pub importer: Option>>, + debug_struct: DebugStruct, } impl std::panic::UnwindSafe for NodeCluster {} @@ -211,6 +212,7 @@ impl NodeCluster { concurrency_managers: HashMap::default(), post_create_coprocessor_host: None, importer: None, + debug_struct: DebugStruct::default(), } } } @@ -260,6 +262,10 @@ impl NodeCluster { } impl Simulator for NodeCluster { + fn get_debug_struct(&self) -> DebugStruct { + self.debug_struct.clone() + } + fn run_node( &mut self, node_id: u64, @@ -356,7 +362,7 @@ impl Simulator for NodeCluster { simulate_trans.clone(), snap_mgr.clone(), packed_envs, - DebugStruct::default(), + self.debug_struct.clone(), key_mgr_cloned, ); tiflash_ob.register_to(&mut coprocessor_host); diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs index cf7f19bfe37..e6e9e1711f8 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs @@ -161,6 +161,7 @@ pub struct ServerCluster { concurrency_managers: HashMap, env: Arc, pub causal_ts_providers: HashMap>, + debug_struct: DebugStruct, } impl ServerCluster { @@ -205,6 +206,7 @@ impl ServerCluster { env, txn_extra_schedulers: HashMap::default(), 
causal_ts_providers: HashMap::default(), + debug_struct: DebugStruct::default(), } } @@ -668,6 +670,9 @@ impl ServerCluster { } impl Simulator for ServerCluster { + fn get_debug_struct(&self) -> DebugStruct { + self.debug_struct.clone() + } fn run_node( &mut self, node_id: u64, diff --git a/proxy_tests/proxy/shared/region.rs b/proxy_tests/proxy/shared/region.rs index 9f3795524a8..fbaddd7183d 100644 --- a/proxy_tests/proxy/shared/region.rs +++ b/proxy_tests/proxy/shared/region.rs @@ -1,4 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use fail::fail_point; use crate::utils::v1::*; #[test] @@ -142,3 +143,58 @@ fn test_get_region_local_state() { cluster.shutdown(); } + +/// A learner peer removed by conf change may never apply that conf change itself: +/// the leader can stop sending raft messages to it at an earlier stage. +/// Such a peer must eventually be removed by stale peer checking. +#[test] +fn test_stale_peer() { + let (mut cluster, pd_client) = new_mock_cluster(0, 2); + + cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(4); + + pd_client.disable_default_operator(); + disable_auto_gen_compact_log(&mut cluster); + // Otherwise will panic with `assert_eq!(apply_state, last_applied_state)`. 
+ fail::cfg("on_pre_write_apply_state", "return(true)").unwrap(); + // Set region and peers + let r1 = cluster.run_conf_change(); + + let p2 = new_learner_peer(2, 2); + pd_client.must_add_peer(r1, p2.clone()); + cluster.must_put(b"k0", b"v"); + check_key(&cluster, b"k0", b"v", Some(true), None, Some(vec![1, 2])); + + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(1, 2).direction(Direction::Both), + )); + + cluster.must_put(b"k1", b"v"); + cluster.must_put(b"k2", b"v"); + check_key(&cluster, b"k2", b"v", Some(true), None, Some(vec![1])); + check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); + + pd_client.must_remove_peer(1, p2); + std::thread::sleep(std::time::Duration::from_millis(1000)); + + fail::cfg("proxy_record_all_messages", "return(1)").unwrap(); + cluster.clear_send_filters(); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + check_key(&cluster, b"k2", b"v", Some(false), None, Some(vec![2])); + std::thread::sleep(std::time::Duration::from_millis(3000)); + // Must have received a gc peer message. Related log lines: + // "receives gc message, trying to remove" + // "raft message is stale, tell to gc" + assert_ne!( + cluster + .get_debug_struct() + .gc_message_count + .as_ref() + .load(Ordering::SeqCst), + 0 + ); + cluster.shutdown(); + fail::remove("on_pre_write_apply_state"); + fail::remove("proxy_record_all_messages"); +}