diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml
index e3100fd603e..1f20a6ad398 100644
--- a/.github/workflows/pr-ci.yml
+++ b/.github/workflows/pr-ci.yml
@@ -6,7 +6,8 @@ on:
 #      - 'raftstore-proxy*'
   pull_request:
     branches:
-      - 'raftstore-proxy*'
+      - 'raftstore-proxy-6.*'
+      - 'raftstore-proxy-5.*'

 jobs:
   build-check-old:
diff --git a/Makefile b/Makefile
index 352ac76cbdb..c0148ff3844 100644
--- a/Makefile
+++ b/Makefile
@@ -212,7 +212,7 @@ unset-override:

 pre-format: unset-override
	@rustup component add rustfmt
-	@cargo install --force --locked -q cargo-sort
+	@cargo install --force --locked --version=1.0.9 -q cargo-sort

 pre-format-fast: unset-override
	@rustup component add rustfmt
@@ -249,3 +249,4 @@ clippy: pre-clippy
	@./scripts/check-redact-log
	@./scripts/check-docker-build
	@./scripts/clippy-all
+
diff --git a/components/error_code/src/sst_importer.rs b/components/error_code/src/sst_importer.rs
index 001f4f146f6..b092796d467 100644
--- a/components/error_code/src/sst_importer.rs
+++ b/components/error_code/src/sst_importer.rs
@@ -22,5 +22,11 @@ define_error_codes!(
     TTL_LEN_NOT_EQUALS_TO_PAIRS => ("TtlLenNotEqualsToPairs", "", ""),
     INCOMPATIBLE_API_VERSION => ("IncompatibleApiVersion", "", ""),
     INVALID_KEY_MODE => ("InvalidKeyMode", "", ""),
-    RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", "")
+    RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", ""),
+    SUSPENDED => ("Suspended",
+        "this request has been suspended.",
+        "Probably some export tools that don't support exporting data inserted by `ingest` (say, snapshot backup) are running. Check the user manual and stop them."),
+    REQUEST_TOO_NEW => ("RequestTooNew", "", ""),
+    REQUEST_TOO_OLD => ("RequestTooOld", "", ""),
+    DISK_SPACE_NOT_ENOUGH => ("DiskSpaceNotEnough", "", "")
 );
diff --git a/components/raftstore-v2/src/operation/command/write/ingest.rs b/components/raftstore-v2/src/operation/command/write/ingest.rs
index 7e8ed381ad0..17aa3ffb9b6 100644
--- a/components/raftstore-v2/src/operation/command/write/ingest.rs
+++ b/components/raftstore-v2/src/operation/command/write/ingest.rs
@@ -40,6 +40,11 @@ impl Store {
         ctx: &mut StoreContext<EK, ER, T>,
     ) -> Result<()> {
         let ssts = box_try!(ctx.sst_importer.list_ssts());
+        // Filter out SSTs created by old versions of TiKV.
+        let ssts: Vec<_> = ssts
+            .into_iter()
+            .filter(|sst| sst.api_version >= sst_importer::API_VERSION_2)
+            .collect();
         if ssts.is_empty() {
             return Ok(());
         }
@@ -47,9 +52,9 @@ impl Store {
         let mut region_ssts: HashMap<_, Vec<_>> = HashMap::default();
         for sst in ssts {
             region_ssts
-                .entry(sst.get_region_id())
+                .entry(sst.meta.get_region_id())
                 .or_default()
-                .push(sst);
+                .push(sst.meta);
         }

         let ranges = ctx.sst_importer.ranges_in_import();
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index c21ea65a589..e967eb17dd3 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -36,7 +36,6 @@ use futures::{compat::Future01CompatExt, FutureExt};
 use grpcio_health::HealthService;
 use keys::{self, data_end_key, data_key, enc_end_key, enc_start_key};
 use kvproto::{
-    import_sstpb::{SstMeta, SwitchMode},
     metapb::{self, Region, RegionEpoch},
     pdpb::{self, QueryStats, StoreStats},
     raft_cmdpb::{AdminCmdType, AdminRequest},
@@ -814,9 +813,6 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
                 }
             }
             StoreMsg::CompactedEvent(event) => self.on_compaction_finished(event),
-            StoreMsg::ValidateSstResult { invalid_ssts } => {
-                self.on_validate_sst_result(invalid_ssts)
-            }
            StoreMsg::ClearRegionSizeInRange { start_key, end_key } => {
                self.clear_region_size_in_range(&start_key, &end_key)
            }
@@ -1656,12 +1652,7 @@ impl RaftBatchSystem {
        );
        let compact_runner = CompactRunner::new(engines.kv.clone());
-        let cleanup_sst_runner = CleanupSstRunner::new(
-            meta.get_id(),
-            self.router.clone(),
-            Arc::clone(&importer),
-            Arc::clone(&pd_client),
-        );
+        let cleanup_sst_runner = CleanupSstRunner::new(Arc::clone(&importer));
        let gc_snapshot_runner = GcSnapshotRunner::new(
            meta.get_id(),
            self.router.clone(), // RaftRouter
@@ -2760,42 +2751,8 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
 }

 impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER, T> {
-    fn on_validate_sst_result(&mut self, ssts: Vec<SstMeta>) {
-        if ssts.is_empty() || self.ctx.importer.get_mode() == SwitchMode::Import {
-            return;
-        }
-        // A stale peer can still ingest a stale Sst before it is
-        // destroyed. We need to make sure that no stale peer exists.
-        let mut delete_ssts = Vec::new();
-        {
-            let meta = self.ctx.store_meta.lock().unwrap();
-            for sst in ssts {
-                if !meta.regions.contains_key(&sst.get_region_id()) {
-                    delete_ssts.push(sst);
-                }
-            }
-        }
-        if delete_ssts.is_empty() {
-            return;
-        }
-
-        let task = CleanupSstTask::DeleteSst { ssts: delete_ssts };
-        if let Err(e) = self
-            .ctx
-            .cleanup_scheduler
-            .schedule(CleanupTask::CleanupSst(task))
-        {
-            error!(
-                "schedule to delete ssts failed";
-                "store_id" => self.fsm.store.id,
-                "err" => ?e,
-            );
-        }
-    }
-
     fn on_cleanup_import_sst(&mut self) -> Result<()> {
         let mut delete_ssts = Vec::new();
-        let mut validate_ssts = Vec::new();

         let ssts = box_try!(self.ctx.importer.list_ssts());
         if ssts.is_empty() {
@@ -2804,15 +2761,22 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
         {
             let meta = self.ctx.store_meta.lock().unwrap();
             for sst in ssts {
-                if let Some(r) = meta.regions.get(&sst.get_region_id()) {
+                if sst.api_version < sst_importer::API_VERSION_2 {
+                    // SSTs of old versions were created by old TiKV and have different
+                    // prerequisites; we can't delete them here. They can only be deleted manually.
+                    continue;
+                }
+                if let Some(r) = meta.regions.get(&sst.meta.get_region_id()) {
                     let region_epoch = r.get_region_epoch();
-                    if util::is_epoch_stale(sst.get_region_epoch(), region_epoch) {
+                    if util::is_epoch_stale(sst.meta.get_region_epoch(), region_epoch) {
                         // If the SST epoch is stale, it will not be ingested anymore.
-                        delete_ssts.push(sst);
+                        delete_ssts.push(sst.meta);
                     }
                 } else {
-                    // If the peer doesn't exist, we need to validate the SST through PD.
-                    validate_ssts.push(sst);
+                    // The write RPC of the import SST service has made sure the region
+                    // existed at write time; now that the region is not found, the SST can
+                    // be deleted because it won't be used by ingest in the future.
+                    delete_ssts.push(sst.meta);
                 }
             }
         }
@@ -2832,27 +2796,6 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
             }
         }

-        // When there is an import job running, the region which this sst belongs may
-        // has not been split from the origin region because the apply thread is so busy
-        // that it can not apply SplitRequest as soon as possible. So we can not
-        // delete this sst file.
-        if !validate_ssts.is_empty() && self.ctx.importer.get_mode() != SwitchMode::Import {
-            let task = CleanupSstTask::ValidateSst {
-                ssts: validate_ssts,
-            };
-            if let Err(e) = self
-                .ctx
-                .cleanup_scheduler
-                .schedule(CleanupTask::CleanupSst(task))
-            {
-                error!(
-                    "schedule to validate ssts failed";
-                    "store_id" => self.fsm.store.id,
-                    "err" => ?e,
-                );
-            }
-        }
-
         Ok(())
     }
diff --git a/components/raftstore/src/store/msg.rs b/components/raftstore/src/store/msg.rs
index 1ed8934e0f0..297c8dcc983 100644
--- a/components/raftstore/src/store/msg.rs
+++ b/components/raftstore/src/store/msg.rs
@@ -10,7 +10,6 @@ use engine_traits::{CompactedEvent, KvEngine, Snapshot};
 use futures::channel::mpsc::UnboundedSender;
 use kvproto::{
     brpb::CheckAdminResponse,
-    import_sstpb::SstMeta,
     kvrpcpb::{DiskFullOpt, ExtraOp as TxnExtraOp},
     metapb,
     metapb::RegionEpoch,
@@ -823,10 +822,6 @@
 where
 {
     RaftMessage(InspectedRaftMessage),

-    ValidateSstResult {
-        invalid_ssts: Vec<SstMeta>,
-    },
-
     // Clear region size and keys for all regions in the range, so we can force them to
     // re-calculate their size later.
     ClearRegionSizeInRange {
@@ -883,7 +878,6 @@
                 write!(fmt, "Store {} is unreachable", store_id)
             }
             StoreMsg::CompactedEvent(ref event) => write!(fmt, "CompactedEvent cf {}", event.cf()),
-            StoreMsg::ValidateSstResult { .. } => write!(fmt, "Validate SST Result"),
             StoreMsg::ClearRegionSizeInRange {
                 ref start_key,
                 ref end_key,
diff --git a/components/raftstore/src/store/worker/cleanup.rs b/components/raftstore/src/store/worker/cleanup.rs
index 632e85f40cc..726b7abe5ce 100644
--- a/components/raftstore/src/store/worker/cleanup.rs
+++ b/components/raftstore/src/store/worker/cleanup.rs
@@ -3,7 +3,6 @@
 use std::fmt::{self, Display, Formatter};

 use engine_traits::{KvEngine, RaftEngine};
-use pd_client::PdClient;
 use tikv_util::worker::Runnable;

 use super::{
@@ -11,7 +10,6 @@
     cleanup_sst::{Runner as CleanupSstRunner, Task as CleanupSstTask},
     compact::{Runner as CompactRunner, Task as CompactTask},
 };
-use crate::store::StoreRouter;

 pub enum Task {
     Compact(CompactTask),
@@ -29,29 +27,26 @@ impl Display for Task {
     }
 }

-pub struct Runner<E, R, C, S>
+pub struct Runner<E, R>
 where
     E: KvEngine,
     R: RaftEngine,
-    S: StoreRouter<E>,
 {
     compact: CompactRunner<E>,
-    cleanup_sst: CleanupSstRunner<E, C, S>,
+    cleanup_sst: CleanupSstRunner,
     gc_snapshot: GcSnapshotRunner<E, R>,
 }

-impl<E, R, C, S> Runner<E, R, C, S>
+impl<E, R> Runner<E, R>
 where
     E: KvEngine,
     R: RaftEngine,
-    C: PdClient,
-    S: StoreRouter<E>,
 {
     pub fn new(
         compact: CompactRunner<E>,
-        cleanup_sst: CleanupSstRunner<E, C, S>,
+        cleanup_sst: CleanupSstRunner,
         gc_snapshot: GcSnapshotRunner<E, R>,
-    ) -> Runner<E, R, C, S> {
+    ) -> Runner<E, R> {
         Runner {
             compact,
             cleanup_sst,
@@ -60,12 +55,10 @@ where
         }
     }
 }

-impl<E, R, C, S> Runnable for Runner<E, R, C, S>
+impl<E, R> Runnable for Runner<E, R>
 where
     E: KvEngine,
     R: RaftEngine,
-    C: PdClient,
-    S: StoreRouter<E>,
 {
     type Task = Task;

diff --git a/components/raftstore/src/store/worker/cleanup_sst.rs b/components/raftstore/src/store/worker/cleanup_sst.rs
index 8174b872f4b..44f188e6f8f 100644
--- a/components/raftstore/src/store/worker/cleanup_sst.rs
+++ b/components/raftstore/src/store/worker/cleanup_sst.rs
@@ -1,62 +1,30 @@
 // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
-use std::{error::Error, fmt, marker::PhantomData, sync::Arc};
+use std::{fmt, sync::Arc};

-use engine_traits::KvEngine;
-use kvproto::{import_sstpb::SstMeta, metapb::Region};
-use pd_client::PdClient;
+use kvproto::import_sstpb::SstMeta;
 use sst_importer::SstImporter;
-use tikv_util::{error, worker::Runnable};
-
-use crate::store::{util::is_epoch_stale, StoreMsg, StoreRouter};
-
-type Result<T> = std::result::Result<T, Box<dyn Error>>;
+use tikv_util::worker::Runnable;

 pub enum Task {
     DeleteSst { ssts: Vec<SstMeta> },
-    ValidateSst { ssts: Vec<SstMeta> },
 }

 impl fmt::Display for Task {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             Task::DeleteSst { ref ssts } => write!(f, "Delete {} ssts", ssts.len()),
-            Task::ValidateSst { ref ssts } => write!(f, "Validate {} ssts", ssts.len()),
         }
     }
 }

-pub struct Runner<EK, C, S>
-where
-    EK: KvEngine,
-    S: StoreRouter<EK>,
-{
-    store_id: u64,
-    store_router: S,
+pub struct Runner {
     importer: Arc<SstImporter>,
-    pd_client: Arc<C>,
-    _engine: PhantomData<EK>,
 }

-impl<EK, C, S> Runner<EK, C, S>
-where
-    EK: KvEngine,
-    C: PdClient,
-    S: StoreRouter<EK>,
-{
-    pub fn new(
-        store_id: u64,
-        store_router: S,
-        importer: Arc<SstImporter>,
-        pd_client: Arc<C>,
-    ) -> Runner<EK, C, S> {
-        Runner {
-            store_id,
-            store_router,
-            importer,
-            pd_client,
-            _engine: PhantomData,
-        }
+impl Runner {
+    pub fn new(importer: Arc<SstImporter>) -> Runner {
+        Runner { importer }
     }

     /// Deletes SST files from the importer.
@@ -65,78 +33,9 @@ where
             let _ = self.importer.delete(sst);
         }
     }
-
-    fn get_region_by_meta(&self, sst: &SstMeta) -> Result<Region> {
-        // The SST meta has been delivered with a range, use it directly.
-        // For now, no case will reach this. But this still could be a guard for
-        // reducing the superise in the future...
-        if !sst.get_range().get_start().is_empty() || !sst.get_range().get_end().is_empty() {
-            return self
-                .pd_client
-                .get_region(sst.get_range().get_start())
-                .map_err(Into::into);
-        }
-        // Once there isn't range provided.
-        let query_by_start_key_of_full_meta = || {
-            let start_key = self
-                .importer
-                .load_start_key_by_meta::<EK>(sst)?
-                .ok_or_else(|| -> Box<dyn Error> {
-                    "failed to load start key from sst, the sst might be empty".into()
-                })?;
-            let region = self.pd_client.get_region(&start_key)?;
-            Result::Ok(region)
-        };
-        query_by_start_key_of_full_meta()
-            .map_err(|err|
-                format!("failed to load full sst meta from disk for {:?} and there isn't extra information provided: {err}", sst.get_uuid()).into()
-            )
-    }
-
-    /// Validates whether the SST is stale or not.
-    fn handle_validate_sst(&self, ssts: Vec<SstMeta>) {
-        let store_id = self.store_id;
-        let mut invalid_ssts = Vec::new();
-        for sst in ssts {
-            match self.get_region_by_meta(&sst) {
-                Ok(r) => {
-                    // The region id may or may not be the same as the
-                    // SST file, but it doesn't matter, because the
-                    // epoch of a range will not decrease anyway.
-                    if is_epoch_stale(r.get_region_epoch(), sst.get_region_epoch()) {
-                        // Region has not been updated.
-                        continue;
-                    }
-                    if r.get_id() == sst.get_region_id()
-                        && r.get_peers().iter().any(|p| p.get_store_id() == store_id)
-                    {
-                        // The SST still belongs to this store.
-                        continue;
-                    }
-                    invalid_ssts.push(sst);
-                }
-                Err(e) => {
-                    error!("get region failed"; "err" => %e);
-                }
-            }
-        }
-
-        // We need to send back the result to check for the stale
-        // peer, which may ingest the stale SST before it is
-        // destroyed.
-        let msg = StoreMsg::ValidateSstResult { invalid_ssts };
-        if let Err(e) = self.store_router.send(msg) {
-            error!(%e; "send validate sst result failed");
-        }
-    }
 }

-impl<EK, C, S> Runnable for Runner<EK, C, S>
-where
-    EK: KvEngine,
-    C: PdClient,
-    S: StoreRouter<EK>,
-{
+impl Runnable for Runner {
     type Task = Task;

     fn run(&mut self, task: Task) {
@@ -144,9 +43,6 @@ where
             Task::DeleteSst { ssts } => {
                 self.handle_delete_sst(ssts);
             }
-            Task::ValidateSst { ssts } => {
-                self.handle_validate_sst(ssts);
-            }
         }
     }
 }
diff --git a/components/server/src/server.rs b/components/server/src/server.rs
index 7ff51474d7d..242152fccc6 100644
--- a/components/server/src/server.rs
+++ b/components/server/src/server.rs
@@ -357,6 +357,7 @@ where
            router.clone(),
            config.coprocessor.clone(),
        ));
+
        let region_info_accessor = RegionInfoAccessor::new(coprocessor_host.as_mut().unwrap());

        // Initialize concurrency manager
@@ -1069,6 +1070,7 @@ where
            servers.importer.clone(),
            None,
            self.resource_manager.clone(),
+            Arc::new(self.region_info_accessor.clone()),
        );
        let import_cfg_mgr = import_service.get_config_manager();
diff --git a/components/server/src/server2.rs b/components/server/src/server2.rs
index fe2b685313e..0b0f450419d 100644
--- a/components/server/src/server2.rs
+++ b/components/server/src/server2.rs
@@ -937,6 +937,7 @@ where
        backup_worker.start(backup_endpoint);

        // Import SST service.
+        let region_info_accessor = self.region_info_accessor.as_ref().unwrap().clone();
        let import_service = ImportSstService::new(
            self.core.config.import.clone(),
            self.core.config.raft_store.raft_entry_max_size,
@@ -945,6 +946,7 @@ where
            servers.importer.clone(),
            Some(self.router.as_ref().unwrap().store_meta().clone()),
            self.resource_manager.clone(),
+            Arc::new(region_info_accessor),
        );
        let import_cfg_mgr = import_service.get_config_manager();
diff --git a/components/sst_importer/src/errors.rs b/components/sst_importer/src/errors.rs
index 7ff940fff12..cc551a43245 100644
--- a/components/sst_importer/src/errors.rs
+++ b/components/sst_importer/src/errors.rs
@@ -2,6 +2,7 @@
 use std::{
     error::Error as StdError, io::Error as IoError, num::ParseIntError, path::PathBuf, result,
+    time::Duration,
 };

 use encryption::Error as EncryptionError;
@@ -116,6 +117,12 @@ pub enum Error {
     #[error("Importing a SST file with imcompatible api version")]
     IncompatibleApiVersion,

+    #[error("{0}, please retry write later")]
+    RequestTooNew(String),
+
+    #[error("{0}, please rescan region later")]
+    RequestTooOld(String),
+
     #[error("Key mode mismatched with the request mode, writer: {:?}, storage: {:?}, key: {}", .writer, .storage_api_version, .key)]
     InvalidKeyMode {
         writer: SstWriterType,
@@ -125,6 +132,12 @@ pub enum Error {

     #[error("resource is not enough {0}")]
     ResourceNotEnough(String),
+
+    #[error("imports are suspended for {time_to_lease_expire:?}")]
+    Suspended { time_to_lease_expire: Duration },
+
+    #[error("TiKV disk space is not enough.")]
+    DiskSpaceNotEnough,
 }

 impl Error {
@@ -197,6 +210,10 @@ impl ErrorCodeExt for Error {
             Error::IncompatibleApiVersion => error_code::sst_importer::INCOMPATIBLE_API_VERSION,
             Error::InvalidKeyMode { .. } => error_code::sst_importer::INVALID_KEY_MODE,
             Error::ResourceNotEnough(_) => error_code::sst_importer::RESOURCE_NOT_ENOUTH,
+            Error::Suspended { .. } => error_code::sst_importer::SUSPENDED,
+            Error::RequestTooNew(_) => error_code::sst_importer::REQUEST_TOO_NEW,
+            Error::RequestTooOld(_) => error_code::sst_importer::REQUEST_TOO_OLD,
+            Error::DiskSpaceNotEnough => error_code::sst_importer::DISK_SPACE_NOT_ENOUGH,
         }
     }
 }
diff --git a/components/sst_importer/src/import_file.rs b/components/sst_importer/src/import_file.rs
index b270d26a411..ae81cf01646 100644
--- a/components/sst_importer/src/import_file.rs
+++ b/components/sst_importer/src/import_file.rs
@@ -440,7 +440,7 @@ impl ImportDir {
         Ok(real_key.map(ToOwned::to_owned))
     }

-    pub fn list_ssts(&self) -> Result<Vec<SstMeta>> {
+    pub fn list_ssts(&self) -> Result<Vec<SstMetaWithApiVersion>> {
         let mut ssts = Vec::new();
         for e in file_system::read_dir(&self.root_dir)? {
             let e = e?;
@@ -458,20 +458,33 @@ impl ImportDir {
 }

 const SST_SUFFIX: &str = ".sst";
-
+// version 2: compared to version 1, which is the default, version 2 also checks
+// the epoch of the request against the local region in the write API.
+pub const API_VERSION_2: i32 = 2;
+
+/// sst_meta_to_path encodes the filepath with the default api version (currently
+/// 2). So when the SstMeta was created by an old version of TiKV, the filepath
+/// will not correspond to the real file, and the deletion logic can't remove
+/// these files.
 pub fn sst_meta_to_path(meta: &SstMeta) -> Result<PathBuf> {
     Ok(PathBuf::from(format!(
-        "{}_{}_{}_{}_{}{}",
+        "{}_{}_{}_{}_{}_{}{}",
         UuidBuilder::from_slice(meta.get_uuid())?.build(),
         meta.get_region_id(),
         meta.get_region_epoch().get_conf_ver(),
         meta.get_region_epoch().get_version(),
         meta.get_cf_name(),
+        API_VERSION_2,
         SST_SUFFIX,
     )))
 }

-pub fn parse_meta_from_path<P: AsRef<Path>>(path: P) -> Result<SstMeta> {
+pub struct SstMetaWithApiVersion {
+    pub meta: SstMeta,
+    pub api_version: i32, // in the future we may move api_version into SstMeta
+}
+
+pub fn parse_meta_from_path<P: AsRef<Path>>(path: P) -> Result<SstMetaWithApiVersion> {
     let path = path.as_ref();
     let file_name = match path.file_name().and_then(|n| n.to_str()) {
         Some(name) => name,
@@ -500,7 +513,11 @@ pub fn parse_meta_from_path<P: AsRef<Path>>(path: P) -> Result<SstMeta> {
         // cf_name to path.
         meta.set_cf_name(elems[4].to_owned());
     }
-    Ok(meta)
+    let mut api_version = 1;
+    if elems.len() > 5 {
+        api_version = elems[5].parse()?;
+    }
+    Ok(SstMetaWithApiVersion { meta, api_version })
 }

 #[cfg(test)]
@@ -520,11 +537,12 @@ mod test {
         meta.mut_region_epoch().set_version(3);

         let path = sst_meta_to_path(&meta).unwrap();
-        let expected_path = format!("{}_1_2_3_default.sst", uuid);
+        let expected_path = format!("{}_1_2_3_default_2.sst", uuid);
         assert_eq!(path.to_str().unwrap(), &expected_path);

-        let new_meta = parse_meta_from_path(path).unwrap();
-        assert_eq!(meta, new_meta);
+        let meta_with_ver = parse_meta_from_path(path).unwrap();
+        assert_eq!(meta, meta_with_ver.meta);
+        assert_eq!(2, meta_with_ver.api_version);
     }

     #[test]
@@ -543,8 +561,9 @@ mod test {
             meta.get_region_epoch().get_version(),
             SST_SUFFIX,
         ));
-        let new_meta = parse_meta_from_path(path).unwrap();
-        assert_eq!(meta, new_meta);
+        let meta_with_ver = parse_meta_from_path(path).unwrap();
+        assert_eq!(meta, meta_with_ver.meta);
+        assert_eq!(1, meta_with_ver.api_version);
     }

     #[cfg(feature = "test-engines-rocksdb")]
@@ -596,14 +615,20 @@ mod test {
         w.finish().unwrap();
         dp.save(arcmgr.as_deref()).unwrap();
         let mut ssts = dir.list_ssts().unwrap();
-        ssts.iter_mut().for_each(|meta| {
+        ssts.iter_mut().for_each(|meta_with_ver| {
+            let meta = &mut meta_with_ver.meta;
             let start = dir
                 .load_start_key_by_meta::(meta, arcmgr.clone())
                 .unwrap()
                 .unwrap();
             meta.mut_range().set_start(start)
         });
-        assert_eq!(ssts, vec![meta]);
+        assert_eq!(
+            ssts.iter()
+                .map(|meta_with_ver| { meta_with_ver.meta.clone() })
+                .collect::<Vec<_>>(),
+            vec![meta]
+        );
     }

     #[test]
diff --git a/components/sst_importer/src/lib.rs b/components/sst_importer/src/lib.rs
index 0cfc3bab774..ff137005b09 100644
--- a/components/sst_importer/src/lib.rs
+++ b/components/sst_importer/src/lib.rs
@@ -27,7 +27,7 @@ pub mod sst_importer;
 pub use self::{
     config::{Config, ConfigManager},
     errors::{error_inc, Error, Result},
-    import_file::sst_meta_to_path,
+    import_file::{sst_meta_to_path, API_VERSION_2},
     import_mode2::range_overlaps,
     sst_importer::SstImporter,
     sst_writer::{RawSstWriter, TxnSstWriter},
diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs
index 33f3c691a26..ab22d70de77 100644
--- a/components/sst_importer/src/sst_importer.rs
+++ b/components/sst_importer/src/sst_importer.rs
@@ -51,7 +51,7 @@ use txn_types::{Key, TimeStamp, WriteRef};

 use crate::{
     caching::cache_map::{CacheMap, ShareOwned},
-    import_file::{ImportDir, ImportFile},
+    import_file::{ImportDir, ImportFile, SstMetaWithApiVersion},
     import_mode::{ImportModeSwitcher, RocksDbMetricsFn},
     import_mode2::{HashRange, ImportModeSwitcherV2},
     metrics::*,
@@ -1356,7 +1356,7 @@ impl SstImporter {
     /// List the basic information of the current SST files.
     /// The information contains UUID, region ID, region Epoch.
     /// Other fields may be left blank.
-    pub fn list_ssts(&self) -> Result<Vec<SstMeta>> {
+    pub fn list_ssts(&self) -> Result<Vec<SstMetaWithApiVersion>> {
         self.dir.list_ssts()
     }
@@ -1556,9 +1556,9 @@ mod tests {
         for sst in &ssts {
             ingested
                 .iter()
-                .find(|s| s.get_uuid() == sst.get_uuid())
+                .find(|s| s.get_uuid() == sst.meta.get_uuid())
                 .unwrap();
-            dir.delete(sst, key_manager.as_deref()).unwrap();
+            dir.delete(&sst.meta, key_manager.as_deref()).unwrap();
         }
         assert!(dir.list_ssts().unwrap().is_empty());
     }
diff --git a/components/test_raftstore-v2/src/server.rs b/components/test_raftstore-v2/src/server.rs
index 7b5d501a59f..ae938267d6f 100644
--- a/components/test_raftstore-v2/src/server.rs
+++ b/components/test_raftstore-v2/src/server.rs
@@ -556,6 +556,7 @@ impl ServerCluster {
            Arc::clone(&importer),
            Some(store_meta),
            resource_manager.clone(),
+            Arc::new(region_info_accessor.clone()),
        );

        // Create deadlock service.
diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs
index 8d26bae968d..06dbccf848f 100644
--- a/components/test_raftstore/src/server.rs
+++ b/components/test_raftstore/src/server.rs
@@ -451,6 +451,7 @@ impl ServerCluster {
            Arc::clone(&importer),
            None,
            resource_manager.clone(),
+            Arc::new(region_info_accessor.clone()),
        );

        // Create deadlock service.
diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
index f2f803d2800..2a0bea5c609 100644
--- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
+++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs
@@ -396,6 +396,7 @@ impl ServerCluster {
            Arc::clone(&importer),
            None,
            None, // TODO resource_ctl
+            Arc::new(region_info_accessor.clone()),
        );

        let check_leader_runner =
diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs
index ca2cec2e38e..a1707c59eeb 100644
--- a/proxy_components/proxy_server/src/run.rs
+++ b/proxy_components/proxy_server/src/run.rs
@@ -1369,6 +1369,7 @@ impl TiKvServer {
            servers.importer.clone(),
            None,
            self.resource_manager.clone(),
+            Arc::new(self.region_info_accessor.clone()),
        );
        if servers
            .server
diff --git a/proxy_components/proxy_test_raftstore_v2/src/server.rs b/proxy_components/proxy_test_raftstore_v2/src/server.rs
index 921d3b991ab..42258e5169f 100644
--- a/proxy_components/proxy_test_raftstore_v2/src/server.rs
+++ b/proxy_components/proxy_test_raftstore_v2/src/server.rs
@@ -481,6 +481,9 @@ impl ServerCluster {
            raft_kv_v2,
            LocalTablets::Registry(tablet_registry.clone()),
            Arc::clone(&importer),
+            None,
+            None,
+            Arc::new(region_info_accessor.clone()),
        );

        // Create deadlock service.
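Note on the new SST file naming used by the import_file.rs changes above: the on-disk name grows from five segments (`{uuid}_{region_id}_{conf_ver}_{version}_{cf}.sst`) to six, with the API version appended, and `parse_meta_from_path` treats a missing sixth segment as version 1. A minimal std-only sketch of that round-trip; `encode_name` and `decode_version` are hypothetical helper names for illustration, not the actual TiKV functions:

    // Sketch of the version-aware SST file naming, mirroring the behavior of
    // sst_meta_to_path / parse_meta_from_path. Names here are illustrative only.
    const SST_SUFFIX: &str = ".sst";
    const API_VERSION_2: i32 = 2;

    // Encoding always writes the new six-segment form.
    fn encode_name(uuid: &str, region_id: u64, conf_ver: u64, version: u64, cf: &str) -> String {
        format!("{uuid}_{region_id}_{conf_ver}_{version}_{cf}_{API_VERSION_2}{SST_SUFFIX}")
    }

    // Decoding accepts both forms; five segments means an old (version 1) file.
    fn decode_version(file_name: &str) -> Option<i32> {
        let stem = file_name.strip_suffix(SST_SUFFIX)?;
        let elems: Vec<&str> = stem.split('_').collect();
        match elems.len() {
            5 => Some(1),               // old TiKV: no version segment
            6 => elems[5].parse().ok(), // new TiKV: explicit version segment
            _ => None,
        }
    }

    fn main() {
        let name = encode_name("uuid", 1, 2, 3, "default");
        assert_eq!(name, "uuid_1_2_3_default_2.sst");
        assert_eq!(decode_version(&name), Some(2));
        assert_eq!(decode_version("uuid_1_2_3_default.sst"), Some(1));
    }

This asymmetry (encode new, decode both) is why the cleanup path above skips files whose parsed version is below `API_VERSION_2`: their stored name cannot be regenerated from an `SstMeta` alone.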
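Note on the epoch check added in src/import/sst_service.rs below: every write RPC now compares the request's region epoch against the local region. If the local epoch is stale the client retries the write later; if the request epoch is stale the client rescans the region from PD; only matching epochs proceed. A small sketch of that decision table under TiKV's staleness rule (an epoch is stale when either its version or its conf_ver is lower); the `Epoch`, `WriteVerdict`, and `check_write` names are illustrative, not the PR's API:

    // Sketch of the write-RPC epoch decision, assuming TiKV's is_epoch_stale rule.
    #[derive(Clone, Copy, PartialEq, Debug)]
    struct Epoch { conf_ver: u64, version: u64 }

    // Stale when either component lags behind the other epoch.
    fn is_epoch_stale(e: Epoch, other: Epoch) -> bool {
        e.version < other.version || e.conf_ver < other.conf_ver
    }

    #[derive(Debug, PartialEq)]
    enum WriteVerdict {
        RetryWriteLater, // local region lags the request; wait for it to catch up
        RescanRegion,    // request lags the local region; refresh from PD
        Proceed,         // epochs match
    }

    fn check_write(local: Option<Epoch>, request: Epoch) -> WriteVerdict {
        match local {
            None => WriteVerdict::RescanRegion, // region unknown: safest answer
            Some(l) if is_epoch_stale(l, request) => WriteVerdict::RetryWriteLater,
            Some(l) if is_epoch_stale(request, l) => WriteVerdict::RescanRegion,
            Some(_) => WriteVerdict::Proceed,
        }
    }

    fn main() {
        let req = Epoch { conf_ver: 10, version: 10 };
        assert_eq!(check_write(None, req), WriteVerdict::RescanRegion);
        assert_eq!(check_write(Some(Epoch { conf_ver: 11, version: 10 }), req),
                   WriteVerdict::RescanRegion);
        assert_eq!(check_write(Some(Epoch { conf_ver: 9, version: 10 }), req),
                   WriteVerdict::RetryWriteLater);
        assert_eq!(check_write(Some(req), req), WriteVerdict::Proceed);
    }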
diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs
index 0c81873c130..2994d8d65c4 100644
--- a/src/import/sst_service.rs
+++ b/src/import/sst_service.rs
@@ -12,6 +12,7 @@ use std::{
 use engine_traits::{CompactExt, MiscExt, CF_DEFAULT, CF_WRITE};
 use file_system::{set_io_type, IoType};
 use futures::{sink::SinkExt, stream::TryStreamExt, FutureExt, TryFutureExt};
+use futures_executor::block_on;
 use grpcio::{
     ClientStreamingSink, RequestStream, RpcContext, ServerStreamingSink, UnarySink, WriteFlags,
 };
@@ -23,7 +24,9 @@ use kvproto::{
         SwitchMode, WriteRequest_oneof_chunk as Chunk, *,
     },
     kvrpcpb::Context,
+    metapb::RegionEpoch,
 };
+use raftstore::{coprocessor::RegionInfoProvider, store::util::is_epoch_stale, RegionInfoAccessor};
 use raftstore_v2::StoreMeta;
 use resource_control::{with_resource_limiter, ResourceGroupManager};
 use sst_importer::{
@@ -35,8 +38,11 @@ use tikv_kv::{
 };
 use tikv_util::{
     config::ReadableSize,
-    future::create_stream_with_buffer,
-    sys::thread::ThreadBuildWrapper,
+    future::{create_stream_with_buffer, paired_future_callback},
+    sys::{
+        disk::{get_disk_status, DiskUsage},
+        thread::ThreadBuildWrapper,
+    },
     time::{Instant, Limiter},
     HandyRwLock,
 };
@@ -115,6 +121,7 @@ pub struct ImportSstService<E: Engine> {
     limiter: Limiter,
     task_slots: Arc<Mutex<HashSet<PathBuf>>>,
     raft_entry_max_size: ReadableSize,
+    region_info_accessor: Arc<RegionInfoAccessor>,

     writer: raft_writer::ThrottledTlsEngineWriter,
@@ -303,6 +310,7 @@ impl<E: Engine> ImportSstService<E> {
         importer: Arc<SstImporter>,
         store_meta: Option<Arc<Mutex<StoreMeta<E::Local>>>>,
         resource_manager: Option<Arc<ResourceGroupManager>>,
+        region_info_accessor: Arc<RegionInfoAccessor>,
     ) -> Self {
         let props = tikv_util::thread_group::current_properties();
         let eng = Mutex::new(engine.clone());
@@ -350,6 +358,7 @@ impl<E: Engine> ImportSstService<E> {
             limiter: Limiter::new(f64::INFINITY),
             task_slots: Arc::new(Mutex::new(HashSet::default())),
             raft_entry_max_size,
+            region_info_accessor,
             writer,
             store_meta,
             resource_manager,
@@ -618,6 +627,59 @@ impl<E: Engine> ImportSstService<E> {
     }
 }

+fn check_local_region_stale(
+    region_id: u64,
+    epoch: &RegionEpoch,
+    region_info_accessor: Arc<impl RegionInfoProvider>,
+) -> Result<()> {
+    let (cb, f) = paired_future_callback();
+    region_info_accessor
+        .find_region_by_id(region_id, cb)
+        .map_err(|e| {
+            Error::Engine(format!("failed to find region {} err {:?}", region_id, e).into())
+        })?;
+    match block_on(f)? {
+        Some(local_region_info) => {
+            let local_region_epoch = local_region_info.region.region_epoch.unwrap();
+
+            // TODO(lance6717): we should only need to check conf_ver, because we require
+            // all peers to have the SST on disk and do not care about which one is the
+            // leader. But since check_sst_for_ingestion also checks the epoch version,
+            // we just keep it here for now.
+
+            // When the local region epoch is stale, the client can retry the write later.
+            if is_epoch_stale(&local_region_epoch, epoch) {
+                return Err(Error::Engine(
+                    format!("request region {} is ahead of local region, local epoch {:?}, request epoch {:?}, please retry write later",
+                        region_id, local_region_epoch, epoch).into(),
+                ));
+            }
+            // When the local region epoch is ahead, the client needs to rescan the
+            // region from PD later to get the latest region info.
+            if is_epoch_stale(epoch, &local_region_epoch) {
+                return Err(Error::Engine(
+                    format!("request region {} is staler than local region, local epoch {:?}, request epoch {:?}, please rescan region later",
+                        region_id, local_region_epoch, epoch).into(),
+                ));
+            }
+
+            // Neither epoch is stale, so they match and the write can proceed.
+            Ok(())
+        }
+        None => {
+            // When the region is not found, we can't tell whether the request is stale
+            // or ahead, so we just return the safest case.
+            Err(Error::Engine(
+                format!(
+                    "region {} is not found, please rescan region later",
+                    region_id
+                )
+                .into(),
+            ))
+        }
+    }
+}
+
 #[macro_export]
 macro_rules! impl_write {
     ($fn:ident, $req_ty:ident, $resp_ty:ident, $chunk_ty:ident, $writer_fn:ident) => {
@@ -629,6 +691,7 @@ macro_rules! impl_write {
        ) {
            let import = self.importer.clone();
            let tablets = self.tablets.clone();
+            let region_info_accessor = self.region_info_accessor.clone();
            let (rx, buf_driver) =
                create_stream_with_buffer(stream, self.cfg.rl().stream_channel_window);
            let mut rx = rx.map_err(Error::from);
@@ -656,7 +719,15 @@ macro_rules! impl_write {
                    }
                    _ => return Err(Error::InvalidChunk),
                };
+                // Wait for the region epoch on this TiKV to catch up with the epoch
+                // in the request, which comes from PD and represents the majority
+                // peers' status.
                let region_id = meta.get_region_id();
+                check_local_region_stale(
+                    region_id,
+                    meta.get_region_epoch(),
+                    region_info_accessor,
+                )?;
                let tablet = match tablets.get(region_id) {
                    Some(t) => t,
                    None => {
@@ -883,6 +954,10 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
            .observe(start.saturating_elapsed().as_secs_f64());

        let mut resp = ApplyResponse::default();
+        if get_disk_status(0) != DiskUsage::Normal {
+            resp.set_error(Error::DiskSpaceNotEnough.into());
+            return crate::send_rpc_response!(Ok(resp), sink, label, start);
+        }

        match Self::apply_imp(req, importer, applier, limiter, max_raft_size).await {
            Ok(Some(r)) => resp.set_range(r),
@@ -924,6 +999,11 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
        sst_importer::metrics::IMPORTER_DOWNLOAD_DURATION
            .with_label_values(&["queue"])
            .observe(start.saturating_elapsed().as_secs_f64());
+        if get_disk_status(0) != DiskUsage::Normal {
+            let mut resp = DownloadResponse::default();
+            resp.set_error(Error::DiskSpaceNotEnough.into());
+            return crate::send_rpc_response!(Ok(resp), sink, label, timer);
+        }

        // FIXME: download() should be an async fn, to allow BR to cancel
        // a download task.
@@ -1291,19 +1371,30 @@ fn write_needs_restore(write: &[u8]) -> bool {

 #[cfg(test)]
 mod test {
-    use std::collections::HashMap;
+    use std::{
+        collections::HashMap,
+        sync::{Arc, Mutex},
+    };

     use engine_traits::{CF_DEFAULT, CF_WRITE};
     use kvproto::{
         kvrpcpb::Context,
-        metapb::RegionEpoch,
+        metapb::{Region, RegionEpoch},
         raft_cmdpb::{RaftCmdRequest, Request},
     };
-    use protobuf::Message;
+    use protobuf::{Message, SingularPtrField};
+    use raft::StateRole::Follower;
+    use raftstore::{
+        coprocessor::{region_info_accessor::Callback, RegionInfoProvider},
+        RegionInfo,
+    };
     use tikv_kv::{Modify, WriteData};
     use txn_types::{Key, TimeStamp, Write, WriteBatchFlags, WriteType};

-    use crate::{import::sst_service::RequestCollector, server::raftkv};
+    use crate::{
+        import::sst_service::{check_local_region_stale, RequestCollector},
+        server::raftkv,
+    };

     fn write(key: &[u8], ty: WriteType, commit_ts: u64, start_ts: u64) -> (Vec<u8>, Vec<u8>) {
         let k = Key::from_raw(key).append_ts(TimeStamp::new(commit_ts));
@@ -1587,4 +1678,100 @@ mod test {
         }
         assert_eq!(total, 100);
     }
+
+    #[test]
+    fn test_write_rpc_check_region_epoch() {
+        struct MockRegionInfoProvider {
+            map: Mutex<HashMap<u64, RegionInfo>>,
+        }
+        impl RegionInfoProvider for MockRegionInfoProvider {
+            fn find_region_by_id(
+                &self,
+                region_id: u64,
+                callback: Callback<Option<RegionInfo>>,
+            ) -> Result<(), raftstore::coprocessor::Error> {
+                callback(self.map.lock().unwrap().get(&region_id).cloned());
+                Ok(())
+            }
+        }
+
+        let mock_provider = Arc::new(MockRegionInfoProvider {
+            map: Mutex::new(HashMap::new()),
+        });
+
+        let mut req_epoch = RegionEpoch {
+            conf_ver: 10,
+            version: 10,
+            ..Default::default()
+        };
+        // Test the case where the region is not found.
+        let result = check_local_region_stale(1, &req_epoch, mock_provider.clone());
+        assert!(result.is_err());
+        // Check that the error message contains "rescan region later"; the client
+        // matches on this string pattern.
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("rescan region later")
+        );
+
+        let mut local_region_info = RegionInfo {
+            region: Region {
+                id: 1,
+                region_epoch: SingularPtrField::some(req_epoch.clone()),
+                ..Default::default()
+            },
+            role: Follower,
+            buckets: 1,
+        };
+        mock_provider
+            .map
+            .lock()
+            .unwrap()
+            .insert(1, local_region_info.clone());
+        // Test the case where the local region epoch is the same as the request's.
+        let result = check_local_region_stale(1, &req_epoch, mock_provider.clone());
+        result.unwrap();
+
+        // Test the case where the local region epoch is ahead of the request's.
+        local_region_info
+            .region
+            .region_epoch
+            .as_mut()
+            .unwrap()
+            .conf_ver = 11;
+        mock_provider
+            .map
+            .lock()
+            .unwrap()
+            .insert(1, local_region_info.clone());
+        let result = check_local_region_stale(1, &req_epoch, mock_provider.clone());
+        assert!(result.is_err());
+        // Check that the error message contains "rescan region later"; the client
+        // matches on this string pattern.
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("rescan region later")
+        );
+
+        req_epoch.conf_ver = 11;
+        let result = check_local_region_stale(1, &req_epoch, mock_provider.clone());
+        result.unwrap();
+
+        // Test the case where the local region epoch is staler than the request's.
+        req_epoch.version = 12;
+        let result = check_local_region_stale(1, &req_epoch, mock_provider);
+        assert!(result.is_err());
+        // Check that the error message contains "retry write later"; the client
+        // matches on this string pattern.
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("retry write later")
+        );
+    }
 }
diff --git a/tests/failpoints/cases/test_import_service.rs b/tests/failpoints/cases/test_import_service.rs
index a2487456108..24c488f3895 100644
--- a/tests/failpoints/cases/test_import_service.rs
+++ b/tests/failpoints/cases/test_import_service.rs
@@ -8,12 +8,12 @@ use std::{
 use file_system::calc_crc32;
 use futures::{executor::block_on, stream, SinkExt};
 use grpcio::{Result, WriteFlags};
-use kvproto::import_sstpb::*;
+use kvproto::{disk_usage::DiskUsage, import_sstpb::*, tikvpb_grpc::TikvClient};
 use tempfile::{Builder, TempDir};
 use test_raftstore::Simulator;
 use test_sst_importer::*;
 use tikv::config::TikvConfig;
-use tikv_util::{config::ReadableSize, HandyRwLock};
+use tikv_util::{config::ReadableSize, sys::disk, HandyRwLock};

 #[allow(dead_code)]
 #[path = "../../integrations/import/util.rs"]
@@ -90,6 +90,43 @@
 fn upload_sst(import: &ImportSstClient, meta: &SstMeta, data: &[u8]) -> Result