diff --git a/proxy_components/engine_store_ffi/src/observer.rs b/proxy_components/engine_store_ffi/src/observer.rs
index 5da31809092..b49d7be3326 100644
--- a/proxy_components/engine_store_ffi/src/observer.rs
+++ b/proxy_components/engine_store_ffi/src/observer.rs
@@ -1,5 +1,5 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 
 use encryption::DataKeyManager;
 use engine_traits::RaftEngine;
@@ -32,12 +32,21 @@ const TIFLASH_OBSERVER_PRIORITY: u32 = 0;
 
 #[derive(Clone)]
 pub struct TiFlashObserver<T: Transport + 'static, ER: RaftEngine> {
-    pub forwarder: ProxyForwarder<T, ER>,
+    pub forwarder: Arc<RwLock<Option<ProxyForwarder<T, ER>>>>,
+}
+
+impl<T: Transport + 'static, ER: RaftEngine> Default for TiFlashObserver<T, ER> {
+    fn default() -> Self {
+        Self {
+            forwarder: Arc::new(RwLock::new(None)),
+        }
+    }
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
     #[allow(clippy::too_many_arguments)]
-    pub fn new(
+    pub fn init_forwarder(
+        &mut self,
         store_id: u64,
         engine: engine_tiflash::MixedModeEngine,
         raft_engine: ER,
@@ -47,20 +56,19 @@ impl TiFlashObserver {
         packed_envs: PackedEnvs,
         debug_struct: DebugStruct,
         key_manager: Option<Arc<DataKeyManager>>,
-    ) -> Self {
-        TiFlashObserver {
-            forwarder: ProxyForwarder::new(
-                store_id,
-                engine,
-                raft_engine,
-                sst_importer,
-                trans,
-                snap_mgr,
-                packed_envs,
-                debug_struct,
-                key_manager,
-            ),
-        }
+    ) {
+        let f = ProxyForwarder::new(
+            store_id,
+            engine,
+            raft_engine,
+            sst_importer,
+            trans,
+            snap_mgr,
+            packed_envs,
+            debug_struct,
+            key_manager,
+        );
+        self.forwarder.write().expect("poisoned").replace(f);
     }
 
     pub fn register_to(
@@ -105,7 +113,9 @@ impl TiFlashObserver {
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> Coprocessor for TiFlashObserver<T, ER> {
     fn stop(&self) {
-        self.forwarder.stop();
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.stop();
+        }
     }
 }
@@ -117,8 +127,11 @@ impl AdminObserver for TiFlashObserver
         index: u64,
         term: u64,
     ) -> bool {
-        self.forwarder
-            .pre_exec_admin(ob_ctx.region(), req, index, term)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.pre_exec_admin(ob_ctx.region(), req, index, term)
+        } else {
+            false
+        }
     }
 
     fn post_exec_admin(
@@ -129,19 +142,25 @@ impl AdminObserver for TiFlashObserver
         region_state: &RegionState,
         apply_ctx_info: &mut ApplyCtxInfo<'_>,
     ) -> bool {
-        self.forwarder.post_exec_admin(
-            ob_ctx.region(),
-            cmd,
-            apply_state,
-            region_state,
-            apply_ctx_info,
-        )
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.post_exec_admin(
+                ob_ctx.region(),
+                cmd,
+                apply_state,
+                region_state,
+                apply_ctx_info,
+            )
+        } else {
+            false
+        }
     }
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> QueryObserver for TiFlashObserver<T, ER> {
     fn on_empty_cmd(&self, ob_ctx: &mut ObserverContext<'_>, index: u64, term: u64) {
-        self.forwarder.on_empty_cmd(ob_ctx.region(), index, term)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.on_empty_cmd(ob_ctx.region(), index, term)
+        }
     }
 
     fn post_exec_query(
@@ -152,20 +171,25 @@ impl QueryObserver for TiFlashObserver
         region_state: &RegionState,
         apply_ctx_info: &mut ApplyCtxInfo<'_>,
     ) -> bool {
-        self.forwarder.post_exec_query(
-            ob_ctx.region(),
-            cmd,
-            apply_state,
-            region_state,
-            apply_ctx_info,
-        )
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.post_exec_query(
+                ob_ctx.region(),
+                cmd,
+                apply_state,
+                region_state,
+                apply_ctx_info,
+            )
+        } else {
+            false
+        }
     }
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> UpdateSafeTsObserver for TiFlashObserver<T, ER> {
     fn on_update_safe_ts(&self, region_id: u64, self_safe_ts: u64, leader_safe_ts: u64) {
-        self.forwarder
-            .on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts)
+        }
    }
 }
@@ -176,7 +200,9 @@ impl RegionChangeObserver for TiFlashObs
         e: RegionChangeEvent,
         r: StateRole,
     ) {
-        self.forwarder.on_region_changed(ob_ctx.region(), e, r)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.on_region_changed(ob_ctx.region(), e, r)
+        }
     }
 
     #[allow(clippy::match_like_matches_macro)]
@@ -186,24 +212,37 @@ impl RegionChangeObserver for TiFlashObs
         is_finished: bool,
         cmd: Option<&RaftCmdRequest>,
     ) -> bool {
-        self.forwarder
-            .pre_persist(ob_ctx.region(), is_finished, cmd)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.pre_persist(ob_ctx.region(), is_finished, cmd)
+        } else {
+            true
+        }
     }
 
     fn pre_write_apply_state(&self, ob_ctx: &mut ObserverContext<'_>) -> bool {
-        self.forwarder.pre_write_apply_state(ob_ctx.region())
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.pre_write_apply_state(ob_ctx.region())
+        } else {
+            true
+        }
    }
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> MessageObserver for TiFlashObserver<T, ER> {
     fn on_raft_message(&self, msg: &RaftMessage) -> bool {
-        self.forwarder.on_raft_message(msg)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.on_raft_message(msg)
+        } else {
+            true
+        }
     }
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> PdTaskObserver for TiFlashObserver<T, ER> {
     fn on_compute_engine_size(&self, store_size: &mut Option<StoreSizeInfo>) {
-        self.forwarder.on_compute_engine_size(store_size)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.on_compute_engine_size(store_size)
+        }
     }
 }
@@ -216,8 +255,9 @@ impl ApplySnapshotObserver for TiFlashOb
         snap_key: &store::SnapKey,
         snap: Option<&store::Snapshot>,
     ) {
-        self.forwarder
-            .pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
+        }
     }
 
     fn post_apply_snapshot(
@@ -227,21 +267,30 @@ impl ApplySnapshotObserver for TiFlashOb
         snap_key: &store::SnapKey,
         snap: Option<&store::Snapshot>,
     ) {
-        self.forwarder
-            .post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
+        }
     }
 
     fn should_pre_apply_snapshot(&self) -> bool {
-        self.forwarder.should_pre_apply_snapshot()
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.should_pre_apply_snapshot()
+        } else {
+            false
+        }
     }
 
     fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) {
-        self.forwarder.cancel_apply_snapshot(region_id, peer_id)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.cancel_apply_snapshot(region_id, peer_id)
+        }
     }
 }
 
 impl<T: Transport + 'static, ER: RaftEngine> RoleObserver for TiFlashObserver<T, ER> {
     fn on_role_change(&self, ob_ctx: &mut ObserverContext<'_>, r: &RoleChange) {
-        self.forwarder.on_role_change(ob_ctx.region(), r)
+        if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
+            forwarder.on_role_change(ob_ctx.region(), r)
+        }
     }
 }
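The observer change above splits construction from registration: `TiFlashObserver` now starts empty via `Default`, can be handed to the coprocessor host immediately, and receives its `ProxyForwarder` later through `init_forwarder`; until then every hook falls back to a conservative default (`false` for the exec/snapshot predicates, `true` for gates like `pre_persist`). A minimal, self-contained sketch of this late-initialization pattern, with toy types standing in for the real ones (`LateInitObserver`, `init`, and the `String` payload are illustrative names, not from the patch):

```rust
use std::sync::{Arc, RwLock};

// Stand-in for ProxyForwarder<T, ER>.
#[derive(Clone, Default)]
struct LateInitObserver {
    inner: Arc<RwLock<Option<String>>>,
}

impl LateInitObserver {
    // Mirrors `init_forwarder`: fill in the inner state after registration.
    fn init(&self, forwarder: String) {
        self.inner.write().expect("poisoned").replace(forwarder);
    }

    // Mirrors a gate like `pre_persist`: delegate when initialized,
    // otherwise fall back to a safe default.
    fn pre_persist(&self) -> bool {
        match &*self.inner.read().expect("poisoned") {
            Some(forwarder) => !forwarder.is_empty(),
            None => true,
        }
    }
}

fn main() {
    let ob = LateInitObserver::default();
    let registered = ob.clone(); // what the coprocessor host would hold
    assert!(registered.pre_persist()); // safe default before init
    ob.init("forwarder".to_owned());
    assert!(registered.pre_persist()); // now delegates to the shared inner value
}
```

Because the state sits behind an `Arc`, the clone held by the coprocessor host sees the later `init_forwarder` call, which is what allows registration to happen long before the forwarder can actually be built.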
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs
index 34c3d0590a2..3e32cefc283 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs
@@ -39,6 +39,19 @@ pub struct FFIHelperSet {
 pub struct TestData {
     pub expected_leader_safe_ts: u64,
     pub expected_self_safe_ts: u64,
+    pub updated_leader_safe_ts: u64,
+    pub updated_self_safe_ts: u64,
+    pub checked_time: u64,
+}
+
+impl TestData {
+    pub fn reset(&mut self) {
+        self.expected_leader_safe_ts = 0;
+        self.expected_self_safe_ts = 0;
+        self.updated_leader_safe_ts = 0;
+        self.updated_self_safe_ts = 0;
+        self.checked_time = 0;
+    }
 }
 
 #[allow(clippy::type_complexity)]
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
index e2d62d4a9c6..827a9d721f7 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster.rs
@@ -14,7 +14,7 @@ use engine_traits::SnapshotContext; // mock cluster
 use engine_traits::{Engines, KvEngine, CF_DEFAULT};
 use file_system::IoRateLimiter;
-use futures::executor::block_on;
+use futures::{executor::block_on, future::BoxFuture, StreamExt};
 use kvproto::{
     errorpb::Error as PbError,
     metapb::{self, PeerRole, RegionEpoch, StoreLabel},
@@ -35,8 +35,8 @@ use raftstore::{
         initial_region, msg::StoreTick, prepare_bootstrap_cluster, Callback, CasualMessage,
         CasualRouter, RaftCmdExtraOpts,
-        RaftRouter, SnapManager, StoreMsg, StoreRouter, WriteResponse, INIT_EPOCH_CONF_VER,
-        INIT_EPOCH_VER,
+        RaftRouter, ReadResponse, SnapManager, StoreMsg, StoreRouter, WriteResponse,
+        INIT_EPOCH_CONF_VER, INIT_EPOCH_VER,
     },
     Error, Result,
 };
@@ -44,13 +44,15 @@ use resource_control::ResourceGroupManager;
 use tempfile::TempDir;
 use test_pd_client::TestPdClient;
 use test_raftstore::{
-    is_error_response, make_cb, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd,
+    is_error_response, new_admin_request, new_delete_cmd, new_peer, new_put_cf_cmd,
     new_put_cmd, new_region_leader_cmd, new_request, new_status_request, new_store,
     new_tikv_config, new_transfer_leader_cmd, sleep_ms,
 };
 use tikv::server::Result as ServerResult;
 use tikv_util::{
-    debug, error, safe_panic,
+    debug, error,
+    mpsc::future,
+    safe_panic,
     thread_group::GroupProperties,
     time::{Instant, ThreadReadId},
     warn, HandyRwLock,
@@ -64,6 +66,61 @@ use super::{
     util::*,
 };
 
+#[derive(Default)]
+struct CallbackLeakDetector {
+    called: bool,
+}
+
+impl Drop for CallbackLeakDetector {
+    fn drop(&mut self) {
+        if self.called {
+            return;
+        }
+
+        debug!("before capture");
+        let bt = std::backtrace::Backtrace::capture();
+        debug!("callback is dropped"; "backtrace" => ?bt);
+    }
+}
+
+pub fn check_raft_cmd_request(cmd: &RaftCmdRequest) -> bool {
+    let mut is_read = cmd.has_status_request();
+    let mut is_write = cmd.has_admin_request();
+    for req in cmd.get_requests() {
+        match req.get_cmd_type() {
+            CmdType::Get | CmdType::Snap | CmdType::ReadIndex => is_read = true,
+            CmdType::Put | CmdType::Delete | CmdType::DeleteRange | CmdType::IngestSst => {
+                is_write = true
+            }
+            CmdType::Invalid | CmdType::Prewrite => panic!("Invalid RaftCmdRequest: {:?}", cmd),
+        }
+    }
+    assert!(is_read ^ is_write, "Invalid RaftCmdRequest: {:?}", cmd);
+    is_read
+}
+
+pub fn make_cb<EK: KvEngine>(
+    cmd: &RaftCmdRequest,
+) -> (Callback<EK::Snapshot>, future::Receiver<RaftCmdResponse>) {
+    let is_read = check_raft_cmd_request(cmd);
+    let (tx, rx) = future::bounded(1, future::WakePolicy::Immediately);
+    let mut detector = CallbackLeakDetector::default();
+    let cb = if is_read {
+        Callback::read(Box::new(move |resp: ReadResponse<EK::Snapshot>| {
+            detector.called = true;
+            // We don't actually care about the error.
+            let _ = tx.send(resp.response);
+        }))
+    } else {
+        Callback::write(Box::new(move |resp: WriteResponse| {
+            detector.called = true;
+            // We don't actually care about the error.
+            let _ = tx.send(resp.response);
+        }))
+    };
+    (cb, rx)
+}
+
 // We simulate 3 or 5 nodes, each has a store.
 // Sometimes, we use fixed id to test, which means the id
 // isn't allocated by pd, and node id, store id are same.
@@ -1063,4 +1120,40 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
             panic!("Flashback call msg failed");
         }
     }
+
+    pub fn async_request(
+        &mut self,
+        req: RaftCmdRequest,
+    ) -> Result<BoxFuture<'static, RaftCmdResponse>> {
+        self.async_request_with_opts(req, Default::default())
+    }
+
+    pub fn async_request_with_opts(
+        &mut self,
+        mut req: RaftCmdRequest,
+        opts: RaftCmdExtraOpts,
+    ) -> Result<BoxFuture<'static, RaftCmdResponse>> {
+        let region_id = req.get_header().get_region_id();
+        let leader = self.leader_of_region(region_id).unwrap();
+        req.mut_header().set_peer(leader.clone());
+        let (cb, mut rx) = make_cb::<TiFlashEngine>(&req);
+        self.sim
+            .rl()
+            .async_command_on_node_with_opts(leader.get_store_id(), req, cb, opts)?;
+        Ok(Box::pin(async move {
+            let fut = rx.next();
+            fut.await.unwrap()
+        }))
+    }
+
+    pub fn async_put(
+        &mut self,
+        key: &[u8],
+        value: &[u8],
+    ) -> Result<BoxFuture<'static, RaftCmdResponse>> {
+        let mut region = self.get_region(key);
+        let reqs = vec![new_put_cmd(key, value)];
+        let put = new_request(region.get_id(), region.take_region_epoch(), reqs, false);
+        self.async_request(put)
+    }
 }
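Two helpers migrate in from `test_raftstore` here, with a twist: the local `make_cb` arms a `CallbackLeakDetector`, a guard whose `Drop` impl reports callbacks that were dropped without ever firing (a typical symptom of a lost or swallowed raft command). A stripped-down sketch of that Drop-guard idea, using `eprintln!` in place of the slog logging and a toy callback type:

```rust
// `called` starts false; if the callback is dropped before being invoked,
// Drop fires and reports the leak. The patch captures a
// std::backtrace::Backtrace and logs it instead of printing.
struct LeakDetector {
    called: bool,
}

impl Drop for LeakDetector {
    fn drop(&mut self) {
        if !self.called {
            eprintln!("callback dropped without being called");
        }
    }
}

// Toy stand-in for `make_cb`: moving the detector into the callback ties
// the detector's fate to the callback's.
fn make_cb() -> Box<dyn FnOnce(u64)> {
    let mut detector = LeakDetector { called: false };
    Box::new(move |resp| {
        detector.called = true;
        println!("got response: {resp}");
    })
}

fn main() {
    let cb = make_cb();
    cb(7); // invoked: the detector drops silently
    let _leaked = make_cb();
    // `_leaked` falls out of scope uninvoked, so Drop prints the warning.
}
```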
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
index 915b40d2f64..ec05398a39f 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
@@ -348,7 +348,8 @@ impl Simulator for NodeCluster {
             pd_endpoints: cfg.pd.endpoints.clone(),
             snap_handle_pool_size: cfg.proxy_cfg.raft_store.snap_handle_pool_size,
         };
-        let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
+        let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default();
+        tiflash_ob.init_forwarder(
             node_id,
             engines.kv.clone(),
             engines.raft.clone(),
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
index c2d2e4778ce..cbd2ba296f6 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
@@ -293,6 +293,13 @@ impl ServerCluster {
         }
         let key_mgr_cloned = key_manager.clone();
+
+        // Create coprocessor.
+        let mut coprocessor_host = CoprocessorHost::new(router.clone(), cfg.coprocessor.clone());
+
+        let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default();
+        tiflash_ob.register_to(&mut coprocessor_host);
+
         let local_reader = LocalReader::new(
             engines.kv.clone(),
             StoreMetaDelegate::new(store_meta.clone(), engines.kv.clone()),
@@ -306,7 +313,6 @@ impl ServerCluster {
         } else {
             Arc::new(|| false)
         };
-        let mut coprocessor_host = CoprocessorHost::new(router.clone(), cfg.coprocessor.clone());
         let region_info_accessor =
             RegionInfoAccessor::new(&mut coprocessor_host, enable_region_stats_mgr_cb);
 
@@ -556,8 +562,7 @@ impl ServerCluster {
         let mut server = server.unwrap();
         let addr = server.listening_addr();
         cfg.server.addr = format!("{}", addr);
-        let trans = server.transport();
-        let simulate_trans = SimulateTransport::new(trans);
+        let simulate_trans = SimulateTransport::new(server.transport());
         let max_grpc_thread_count = cfg.server.grpc_concurrency;
         let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
 
@@ -566,7 +571,7 @@ impl ServerCluster {
             pd_endpoints: cfg.pd.endpoints.clone(),
             snap_handle_pool_size: cfg.proxy_cfg.raft_store.snap_handle_pool_size,
         };
-        let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
+        tiflash_ob.init_forwarder(
             node_id,
             engines.kv.clone(),
             engines.raft.clone(),
@@ -577,7 +582,6 @@ impl ServerCluster {
             DebugStruct::default(),
             key_mgr_cloned,
         );
-        tiflash_ob.register_to(&mut coprocessor_host);
         engines
             .kv
             .proxy_ext
diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
index d9176d4baf0..838c409b04d 100644
--- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
+++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
@@ -583,17 +583,28 @@ unsafe extern "C" fn ffi_handle_destroy(
 
 unsafe extern "C" fn ffi_handle_safe_ts_update(
     arg1: *mut interfaces_ffi::EngineStoreServerWrap,
-    _region_id: u64,
+    region_id: u64,
     self_safe_ts: u64,
     leader_safe_ts: u64,
 ) {
-    let store = into_engine_store_server_wrap(arg1);
-    let cluster_ext = store.cluster_ext_ptr as *const mock_cluster::ClusterExt;
-    assert_eq!(self_safe_ts, (*cluster_ext).test_data.expected_self_safe_ts);
-    assert_eq!(
-        leader_safe_ts,
-        (*cluster_ext).test_data.expected_leader_safe_ts
+    info!(
+        "ffi_handle_safe_ts_update region_id {}, self_safe_ts {} leader_safe_ts {}",
+        region_id, self_safe_ts, leader_safe_ts
     );
+    let store = into_engine_store_server_wrap(arg1);
+    let cluster_ext = store.cluster_ext_ptr as *mut mock_cluster::ClusterExt;
+    if (*cluster_ext).test_data.expected_self_safe_ts != 0 {
+        assert_eq!(self_safe_ts, (*cluster_ext).test_data.expected_self_safe_ts);
+    }
+    if (*cluster_ext).test_data.expected_leader_safe_ts != 0 {
+        assert_eq!(
+            leader_safe_ts,
+            (*cluster_ext).test_data.expected_leader_safe_ts
+        );
+    }
+    (*cluster_ext).test_data.updated_leader_safe_ts = leader_safe_ts;
+    (*cluster_ext).test_data.updated_self_safe_ts = self_safe_ts;
+    (*cluster_ext).test_data.checked_time += 1;
 }
 
 unsafe extern "C" fn ffi_handle_compute_store_stats(
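With this change `ffi_handle_safe_ts_update` treats an `expected_*` value of `0` as "no expectation set": it always records the latest update and bumps `checked_time`, but only asserts when a test has filled the expectation in. A small sketch of that sentinel convention, trimmed to a single safe-ts field (`on_safe_ts_update` is a hypothetical helper, not from the patch):

```rust
// Mirrors the `expected_* != 0` guards above.
#[derive(Default)]
struct TestData {
    expected_self_safe_ts: u64,
    updated_self_safe_ts: u64,
    checked_time: u64,
}

impl TestData {
    fn on_safe_ts_update(&mut self, self_safe_ts: u64) {
        // 0 is the sentinel for "no expectation": record but don't assert.
        if self.expected_self_safe_ts != 0 {
            assert_eq!(self_safe_ts, self.expected_self_safe_ts);
        }
        self.updated_self_safe_ts = self_safe_ts;
        self.checked_time += 1;
    }
}

fn main() {
    let mut td = TestData::default();
    td.on_safe_ts_update(42); // nothing expected, only recorded
    assert_eq!((td.updated_self_safe_ts, td.checked_time), (42, 1));

    td.expected_self_safe_ts = 42;
    td.on_safe_ts_update(42); // expectation set and met
    assert_eq!(td.checked_time, 2);
}
```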
diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs
index ba9aef03c5b..94367b3322b 100644
--- a/proxy_components/proxy_server/src/run.rs
+++ b/proxy_components/proxy_server/src/run.rs
@@ -1214,6 +1214,10 @@ impl TiKvServer {
         }
         let importer = Arc::new(importer);
 
+        // Must be registered before `CheckLeaderRunner`, to get safe_ts updates.
+        let mut tiflash_ob = engine_store_ffi::observer::TiFlashObserver::default();
+        tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap());
+
         let check_leader_runner = CheckLeaderRunner::new(
             engines.store_meta.clone(),
             self.coprocessor_host.clone().unwrap(),
@@ -1256,7 +1260,7 @@ impl TiKvServer {
             pd_endpoints: self.core.config.pd.endpoints.clone(),
             snap_handle_pool_size: self.proxy_config.raft_store.snap_handle_pool_size,
         };
-        let tiflash_ob = engine_store_ffi::observer::TiFlashObserver::new(
+        tiflash_ob.init_forwarder(
             node.id(),
             self.engines.as_ref().unwrap().engines.kv.clone(),
             self.engines.as_ref().unwrap().engines.raft.clone(),
@@ -1267,7 +1271,6 @@ impl TiKvServer {
             DebugStruct::default(),
             self.core.encryption_key_manager.clone(),
         );
-        tiflash_ob.register_to(self.coprocessor_host.as_mut().unwrap());
 
         cfg_controller.register(
             tikv::config::Module::Server,
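The comment in `run.rs` is the heart of the ordering change: `CheckLeaderRunner::new` receives `self.coprocessor_host.clone().unwrap()`, so, on the plausible assumption that cloning the host copies its observer registry (which is presumably why the patch moves the registration up), an observer registered after that clone is taken would never see the runner's safe-ts callbacks. A toy illustration of the hazard, not the real `CoprocessorHost`/`CheckLeaderRunner` API:

```rust
// Toy types standing in for CoprocessorHost and CheckLeaderRunner.
#[derive(Clone, Default)]
struct Host {
    observers: Vec<&'static str>,
}

struct Runner {
    host: Host,
}

fn main() {
    let mut host = Host::default();
    host.observers.push("tiflash_ob"); // register first...
    let runner = Runner { host: host.clone() }; // ...then clone for the runner
    assert!(runner.host.observers.contains(&"tiflash_ob"));

    // An observer registered after the clone never reaches the runner's copy:
    host.observers.push("late_ob");
    assert!(!runner.host.observers.contains(&"late_ob"));
}
```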
diff --git a/proxy_tests/proxy/shared/server_cluster_test.rs b/proxy_tests/proxy/shared/server_cluster_test.rs
index 8f954925bac..9b5687df134 100644
--- a/proxy_tests/proxy/shared/server_cluster_test.rs
+++ b/proxy_tests/proxy/shared/server_cluster_test.rs
@@ -101,6 +101,159 @@ fn test_safe_ts_basic() {
         .cluster_ext
         .set_expected_safe_ts(physical_time, physical_time);
     suite.must_check_leader(1, TimeStamp::new(physical_time), 1, 1);
+    assert_eq!(suite.cluster.cluster_ext.test_data.checked_time, 1);
     suite.stop();
 }
+
+const INVALID_TIMESTAMP: u64 = u64::MAX;
+
+#[test]
+fn test_safe_ts_updates() {
+    let mut suite = TestSuite::new(1);
+
+    suite.cluster.cluster_ext.test_data.reset();
+
+    let states = collect_all_states(&suite.cluster.cluster_ext, 1);
+    let applied_index = states
+        .get(&1)
+        .unwrap()
+        .in_memory_apply_state
+        .get_applied_index();
+
+    let physical_time = 646454654654;
+    suite.must_check_leader(1, TimeStamp::new(physical_time), applied_index + 1, 1);
+
+    assert_ne!(
+        suite.cluster.cluster_ext.test_data.updated_self_safe_ts,
+        physical_time
+    );
+
+    suite.cluster.must_put(b"k1", b"v1");
+
+    let eng_ids = suite
+        .cluster
+        .engines
+        .iter()
+        .map(|e| e.0.to_owned())
+        .collect::<Vec<_>>();
+
+    check_key(
+        &*(suite.cluster),
+        b"k1",
+        b"v1",
+        Some(true),
+        None,
+        Some(vec![eng_ids[0]]),
+    );
+
+    assert_eq!(
+        suite.cluster.cluster_ext.test_data.updated_self_safe_ts,
+        physical_time
+    );
+    assert_eq!(
+        suite.cluster.cluster_ext.test_data.updated_leader_safe_ts,
+        INVALID_TIMESTAMP
+    );
+
+    let physical_time2 = 666454654654;
+    suite.must_check_leader(1, TimeStamp::new(physical_time2), applied_index + 2, 1);
+
+    assert_eq!(
+        suite.cluster.cluster_ext.test_data.updated_self_safe_ts,
+        physical_time
+    );
+    assert_eq!(
+        suite.cluster.cluster_ext.test_data.updated_leader_safe_ts,
+        physical_time2
+    );
+
+    suite.cluster.must_put(b"k2", b"v1");
+
+    check_key(
+        &*(suite.cluster),
+        b"k2",
+        b"v1",
+        Some(true),
+        None,
+        Some(vec![eng_ids[0]]),
+    );
+
+    assert_eq!(
+        suite.cluster.cluster_ext.test_data.updated_self_safe_ts,
+        physical_time2
+    );
+    assert_eq!(
+        suite.cluster.cluster_ext.test_data.updated_leader_safe_ts,
+        INVALID_TIMESTAMP
+    );
+    suite.stop();
+}
+
+#[test]
+fn test_raft_message_observer() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.pd_client.disable_default_operator();
+    let r1 = cluster.run_conf_change();
+
+    cluster.must_put(b"k1", b"v1");
+
+    fail::cfg("tiflash_force_reject_raft_append_message", "return").unwrap();
+    fail::cfg("tiflash_force_reject_raft_snapshot_message", "return").unwrap();
+
+    cluster.pd_client.add_peer(r1, new_peer(2, 2));
+
+    std::thread::sleep(std::time::Duration::from_millis(500));
+
+    check_key(
+        &cluster,
+        b"k1",
+        b"v",
+        Some(false),
+        Some(false),
+        Some(vec![2]),
+    );
+
+    fail::remove("tiflash_force_reject_raft_append_message");
+    fail::remove("tiflash_force_reject_raft_snapshot_message");
+
+    cluster.pd_client.must_have_peer(r1, new_peer(2, 2));
+    cluster.pd_client.must_add_peer(r1, new_peer(3, 3));
+
+    check_key(
+        &cluster,
+        b"k1",
+        b"v1",
+        Some(true),
+        Some(true),
+        Some(vec![2, 3]),
+    );
+
+    fail::cfg("tiflash_force_reject_raft_append_message", "return").unwrap();
+
+    let _ = cluster.async_put(b"k2", b"v2").unwrap();
+
+    std::thread::sleep(std::time::Duration::from_millis(500));
+
+    check_key(
+        &cluster,
+        b"k3",
+        b"v3",
+        Some(false),
+        Some(false),
+        Some(vec![2, 3]),
+    );
+
+    fail::remove("tiflash_force_reject_raft_append_message");
+
+    cluster.must_put(b"k3", b"v3");
+    check_key(
+        &cluster,
+        b"k3",
+        b"v3",
+        Some(true),
+        Some(true),
+        Some(vec![1, 2, 3]),
+    );
+    cluster.shutdown();
+}
diff --git a/proxy_tests/proxy/utils/v1_server.rs b/proxy_tests/proxy/utils/v1_server.rs
index fbb0b175da7..88bae96c30f 100644
--- a/proxy_tests/proxy/utils/v1_server.rs
+++ b/proxy_tests/proxy/utils/v1_server.rs
@@ -72,9 +72,10 @@ impl TestSuite {
         req.set_regions(regions.into());
         req.set_ts(resolved_ts.into_inner());
 
-        let _check_leader_resp = self
+        let check_leader_resp = self
             .get_client_from_store_id(store_id)
             .check_leader(&req)
             .unwrap();
+        info!("check_leader_resp: {:?}", check_leader_resp);
     }
 }
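For reference, `test_raft_message_observer` drives the new `async_put` by deliberately dropping the returned future while the failpoints reject append messages; in the ordinary case the `BoxFuture` would be resolved with `block_on`, roughly like this (stub future in place of the real one, which yields a `RaftCmdResponse`):

```rust
use futures::{executor::block_on, future::BoxFuture};

// Stub standing in for `Cluster::async_put`.
fn async_put_stub() -> BoxFuture<'static, &'static str> {
    Box::pin(async { "ok" })
}

fn main() {
    let fut = async_put_stub(); // request is "in flight"
    // ... a test would flip failpoints or poke the cluster here ...
    assert_eq!(block_on(fut), "ok"); // resolve once the response arrives
}
```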