diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index 9abc1c39945..0dcd153efe0 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -638,7 +638,7 @@ where
         for m in msgs.drain(..) {
             distribution[m.discriminant()] += 1;
             match m {
-                PeerMsg::RaftMessage(msg, sent_time) => {
+                PeerMsg::RaftMessage(mut msg, sent_time) => {
                     if let Some(sent_time) = sent_time {
                         let wait_time = sent_time.saturating_elapsed().as_secs_f64();
                         self.ctx.raft_metrics.process_wait_time.observe(wait_time);
@@ -648,6 +648,36 @@ where
                         continue;
                     }
+                    info!("!!!!!! handle_msgs 11111");
+                    let modify_msg: bool = (|| {
+                        fail::fail_point!("mock_overlapped_region_1", |t| {
+                            let t = t.unwrap().parse::<u64>().unwrap();
+                            t
+                        });
+                        0
+                    })() != 0;
+
+                    info!("!!!!!! handle_msgs 22222");
+                    if modify_msg {
+                        if msg.msg.get_message().get_msg_type() == raft::eraftpb::MessageType::MsgSnapshot &&
+                            msg.msg.get_to_peer().get_store_id() == 2 &&
+                            msg.msg.region_id == 1
+                        {
+                            info!("!!!!! origin {:?}", msg.msg);
+                            let mut msg2 = msg.msg.clone();
+                            let mut snapshot = msg.msg.get_message().get_snapshot();
+                            let snapshot_data = snapshot.get_data();
+                            let mut parsed_data = kvproto::raft_serverpb::RaftSnapshotData::default();
+                            parsed_data.merge_from_bytes(snapshot_data);
+                            parsed_data.mut_region().set_start_key(b"".to_vec());
+                            parsed_data.mut_region().set_end_key(b"".to_vec());
+                            msg2.mut_message().mut_snapshot().set_data(parsed_data.write_to_bytes().unwrap().into());
+                            msg.msg = msg2;
+                            info!("!!!!! after {:?}", msg.msg);
+                        }
+                    }
+
+                    info!("!!!!!! handle_msgs 33333");
 
                     if let Err(e) = self.on_raft_message(msg) {
                         error!(%e;
                             "handle raft message err";
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
index 850ad9a0903..e98c719d685 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs
@@ -185,7 +185,7 @@ impl Transport for ChannelTransport {
 type SimulateChannelTransport = SimulateTransport<ChannelTransport>;
 
 pub struct NodeCluster {
-    trans: ChannelTransport,
+    pub trans: ChannelTransport,
     pd_client: Arc<TestPdClient>,
     nodes: HashMap<u64, Node<TestPdClient, TiFlashEngine, ProxyRaftEngine>>,
     snap_mgrs: HashMap<u64, SnapManager>,
diff --git a/proxy_tests/proxy/shared/fast_add_peer/fp.rs b/proxy_tests/proxy/shared/fast_add_peer/fp.rs
index b22805dfaa9..03c78828dda 100644
--- a/proxy_tests/proxy/shared/fast_add_peer/fp.rs
+++ b/proxy_tests/proxy/shared/fast_add_peer/fp.rs
@@ -981,7 +981,7 @@ fn test_msgsnapshot_before_msgappend() {
     debug!("compact at index {}", compact_index);
     let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
     let req = test_raftstore::new_admin_request(1, region.get_region_epoch(), compact_log);
-    let res = cluster
+    let _ = cluster
         .call_command_on_leader(req, Duration::from_secs(3))
         .unwrap();
 
diff --git a/proxy_tests/proxy/shared/snapshot.rs b/proxy_tests/proxy/shared/snapshot.rs
index 8ab1544b208..6ad8a8d5a61 100644
--- a/proxy_tests/proxy/shared/snapshot.rs
+++ b/proxy_tests/proxy/shared/snapshot.rs
@@ -1,3 +1,5 @@
+use fail::fail_point;
+
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
 
 use crate::utils::v1::*;
@@ -493,3 +495,57 @@ fn test_apply_cancelled_pre_handle() {
     fail::remove("on_ob_cancel_after_pre_handle_snapshot");
     cluster.shutdown();
 }
+
+#[test]
+fn test_apply_before_snapshot() {
+    fail::cfg("on_pre_write_apply_state", "return").unwrap();
+
+    tikv_util::set_panic_hook(true, "./");
+    let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
+    assert_eq!(cluster.cfg.proxy_cfg.raft_store.snap_handle_pool_size, 2);
+
+    disable_auto_gen_compact_log(&mut cluster);
+
+    // Disable default max peer count check.
+    pd_client.disable_default_operator();
+
+    let r1 = cluster.run_conf_change();
+    let eng_ids = cluster
+        .engines
+        .iter()
+        .map(|e| e.0.to_owned())
+        .collect::<Vec<_>>();
+
+    cluster.must_put(b"k1", b"v");
+    cluster.must_put(b"k2", b"v");
+    cluster.must_put(b"k3", b"v");
+    cluster.must_put(b"k4", b"v");
+
+    check_key(&cluster, b"k4", b"v", Some(true), None, Some(vec![1]));
+
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(r1, 2)
+            .msg_type(MessageType::MsgAppend)
+            .direction(Direction::Both),
+    ));
+
+    pd_client.add_peer(cluster.get_region(b"k1").get_id(), new_learner_peer(eng_ids[1], 1112));
+
+    std::thread::sleep(std::time::Duration::from_millis(2000));
+
+    cluster.must_split(&cluster.get_region(b"k1"), b"k2");
+    // k1 in 1000, k4 in 1
+    info!("k1 in {}, k4 in {}", cluster.get_region(b"k1").get_id(), cluster.get_region(b"k4").get_id());
+
+    std::thread::sleep(std::time::Duration::from_millis(2000));
+
+    check_key(&cluster, b"k1", b"v", Some(true), None, Some(vec![1, 2]));
+
+    fail::cfg("mock_overlapped_region_1", "return(1)");
+    cluster.clear_send_filters();
+    std::thread::sleep(std::time::Duration::from_millis(2000));
+
+    check_key(&cluster, b"k4", b"v", Some(true), None, Some(vec![1, 2]));
+
+    cluster.shutdown();
+}