diff --git a/Cargo.lock b/Cargo.lock index 1975b75a856..ea5dc81afd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4677,7 +4677,7 @@ dependencies = [ [[package]] name = "raft" version = "0.7.0" -source = "git+https://github.com/tikv/raft-rs?branch=master#a76fb6ef2cbd002ec10d63a2ac68b4a20b20fe3e" +source = "git+https://github.com/tikv/raft-rs?branch=master#1fd05e000fb094015507c5c985849d370100fb72" dependencies = [ "bytes", "fxhash", @@ -4726,7 +4726,7 @@ dependencies = [ [[package]] name = "raft-proto" version = "0.7.0" -source = "git+https://github.com/tikv/raft-rs?branch=master#a76fb6ef2cbd002ec10d63a2ac68b4a20b20fe3e" +source = "git+https://github.com/tikv/raft-rs?branch=master#1fd05e000fb094015507c5c985849d370100fb72" dependencies = [ "bytes", "protobuf", @@ -7437,7 +7437,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "static_assertions", ] diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 005896ef6de..b7a13ef1368 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -90,6 +90,8 @@ pub struct Config { /// The maximum raft log numbers that applied_index can be ahead of /// persisted_index. pub max_apply_unpersisted_log_limit: u64, + /// Number of Raft ticks between follower read index request retries. + pub raft_read_index_retry_interval_ticks: usize, // follower will reject this follower request to avoid falling behind leader too far, // when the read index is ahead of the sum between the applied index and // follower_read_max_log_gap, @@ -459,6 +461,7 @@ impl Default for Config { raft_log_gc_count_limit: None, raft_log_gc_size_limit: None, max_apply_unpersisted_log_limit: 1024, + raft_read_index_retry_interval_ticks: 4, follower_read_max_log_gap: 100, raft_log_reserve_max_ticks: 6, raft_engine_purge_interval: ReadableDuration::secs(10), @@ -1048,6 +1051,12 @@ impl Config { CONFIG_RAFTSTORE_GAUGE .with_label_values(&["raft_log_gc_size_limit"]) .set(self.raft_log_gc_size_limit.unwrap_or_default().0 as f64); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["max_apply_unpersisted_log_limit"]) + .set(self.max_apply_unpersisted_log_limit as f64); + CONFIG_RAFTSTORE_GAUGE + .with_label_values(&["raft_read_index_retry_interval_ticks"]) + .set(self.raft_read_index_retry_interval_ticks as f64); CONFIG_RAFTSTORE_GAUGE .with_label_values(&["raft_log_reserve_max_ticks"]) .set(self.raft_log_reserve_max_ticks as f64); diff --git a/components/raftstore/src/store/read_queue.rs b/components/raftstore/src/store/read_queue.rs index bde49b4ed30..c7fdf51d7a5 100644 --- a/components/raftstore/src/store/read_queue.rs +++ b/components/raftstore/src/store/read_queue.rs @@ -125,8 +125,10 @@ impl ReadIndexQueue { } /// Check it's necessary to retry pending read requests or not. /// Return true if all such conditions are satisfied: - /// 1. more than an election timeout elapsed from the last request push; - /// 2. more than an election timeout elapsed from the last retry; + /// 1. More than the retry interval (in ticks) has elapsed since the last + /// request push. + /// 2. More than the retry interval (in ticks) has elapsed since the last + /// retry. /// 3. there are still unresolved requests in the queue. pub fn check_needs_retry(&mut self, cfg: &Config) -> bool { if self.reads.len() == self.ready_cnt { @@ -134,7 +136,7 @@ impl ReadIndexQueue { } if self.retry_countdown == usize::MAX { - self.retry_countdown = cfg.raft_election_timeout_ticks - 1; + self.retry_countdown = cfg.raft_read_index_retry_interval_ticks - 1; return false; } @@ -143,7 +145,7 @@ impl ReadIndexQueue { return false; } - self.retry_countdown = cfg.raft_election_timeout_ticks; + self.retry_countdown = cfg.raft_read_index_retry_interval_ticks; true } @@ -289,28 +291,10 @@ impl ReadIndexQueue { if min_changed_offset != usize::MAX { self.ready_cnt = cmp::max(self.ready_cnt, max_changed_offset + 1); } - if max_changed_offset > 0 { - self.fold(min_changed_offset, max_changed_offset); - } - } - - fn fold(&mut self, min_changed_offset: usize, max_changed_offset: usize) { - let mut r_idx = self.reads[max_changed_offset].read_index.unwrap(); - let mut check_offset = max_changed_offset - 1; - loop { - let l_idx = self.reads[check_offset].read_index.unwrap_or(u64::MAX); - if l_idx > r_idx { - self.reads[check_offset].read_index = Some(r_idx); - } else if check_offset < min_changed_offset { - break; - } else { - r_idx = l_idx; - } - if check_offset == 0 { - break; - } - check_offset -= 1; - } + // NOTE: We should not try to fold these read index requests anymore, + // an earlier request can rely a higher committed index due to txn + // lock when 1pc/async-commit is used. + // See https://github.com/tikv/tikv/issues/17018 for more details. } pub fn gc(&mut self) { @@ -507,65 +491,6 @@ mod tests { use super::*; use crate::store::Callback; - #[test] - fn test_read_queue_fold() { - let mut queue = ReadIndexQueue::> { - handled_cnt: 125, - ..Default::default() - }; - for _ in 0..100 { - let id = Uuid::new_v4(); - queue.reads.push_back(ReadIndexRequest::with_command( - id, - RaftCmdRequest::default(), - Callback::None, - Timespec::new(0, 0), - )); - - let offset = queue.handled_cnt + queue.reads.len() - 1; - queue.contexts.insert(id, offset); - } - - queue.advance_replica_reads(Vec::new()); - assert_eq!(queue.ready_cnt, 0); - - queue.advance_replica_reads(vec![(queue.reads[0].id, None, 100)]); - assert_eq!(queue.ready_cnt, 1); - - queue.advance_replica_reads(vec![(queue.reads[1].id, None, 100)]); - assert_eq!(queue.ready_cnt, 2); - - queue.advance_replica_reads(vec![ - (queue.reads[80].id, None, 80), - (queue.reads[84].id, None, 100), - (queue.reads[82].id, None, 70), - (queue.reads[78].id, None, 120), - (queue.reads[77].id, None, 40), - ]); - assert_eq!(queue.ready_cnt, 85); - - queue.advance_replica_reads(vec![ - (queue.reads[20].id, None, 80), - (queue.reads[24].id, None, 100), - (queue.reads[22].id, None, 70), - (queue.reads[18].id, None, 120), - (queue.reads[17].id, None, 40), - ]); - assert_eq!(queue.ready_cnt, 85); - - for i in 0..78 { - assert_eq!(queue.reads[i].read_index.unwrap(), 40, "#{} failed", i); - } - for i in 78..83 { - assert_eq!(queue.reads[i].read_index.unwrap(), 70, "#{} failed", i); - } - for i in 84..85 { - assert_eq!(queue.reads[i].read_index.unwrap(), 100, "#{} failed", i); - } - - queue.clear_all(None); - } - #[test] fn test_become_leader_then_become_follower() { let mut queue = ReadIndexQueue::> { diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 8208c129000..4269c873f74 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -189,6 +189,7 @@ fn test_serde_custom_tikv_config() { raft_log_gc_threshold: 12, raft_log_gc_count_limit: Some(12), raft_log_gc_size_limit: Some(ReadableSize::kb(1)), + raft_read_index_retry_interval_ticks: 123, follower_read_max_log_gap: 100, raft_log_reserve_max_ticks: 100, raft_engine_purge_interval: ReadableDuration::minutes(20), diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index e6af548894b..a958bcb7566 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -159,6 +159,7 @@ raft-log-gc-tick-interval = "12s" raft-log-gc-threshold = 12 raft-log-gc-count-limit = 12 raft-log-gc-size-limit = "1KB" +raft-read-index-retry-interval-ticks = 123 raft-log-reserve-max-ticks = 100 raft-engine-purge-interval = "20m" max-manual-flush-rate = 5.0