Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 99 additions & 50 deletions proxy_components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use encryption::DataKeyManager;
use engine_traits::RaftEngine;
Expand Down Expand Up @@ -32,12 +32,21 @@ const TIFLASH_OBSERVER_PRIORITY: u32 = 0;

#[derive(Clone)]
pub struct TiFlashObserver<T: Transport + 'static, ER: RaftEngine> {
pub forwarder: ProxyForwarder<T, ER>,
pub forwarder: Arc<RwLock<Option<ProxyForwarder<T, ER>>>>,
}

impl<T: Transport + 'static, ER: RaftEngine> Default for TiFlashObserver<T, ER> {
fn default() -> Self {
Self {
forwarder: Arc::new(RwLock::new(None)),
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
#[allow(clippy::too_many_arguments)]
pub fn new(
pub fn init_forwarder(
&mut self,
store_id: u64,
engine: engine_tiflash::MixedModeEngine,
raft_engine: ER,
Expand All @@ -47,20 +56,19 @@ impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {
packed_envs: PackedEnvs,
debug_struct: DebugStruct,
key_manager: Option<Arc<DataKeyManager>>,
) -> Self {
TiFlashObserver {
forwarder: ProxyForwarder::new(
store_id,
engine,
raft_engine,
sst_importer,
trans,
snap_mgr,
packed_envs,
debug_struct,
key_manager,
),
}
) {
let f = ProxyForwarder::new(
store_id,
engine,
raft_engine,
sst_importer,
trans,
snap_mgr,
packed_envs,
debug_struct,
key_manager,
);
self.forwarder.write().expect("poisoned").replace(f);
}

pub fn register_to<E: engine_traits::KvEngine>(
Expand Down Expand Up @@ -105,7 +113,9 @@ impl<T: Transport + 'static, ER: RaftEngine> TiFlashObserver<T, ER> {

impl<T: Transport + 'static, ER: RaftEngine> Coprocessor for TiFlashObserver<T, ER> {
fn stop(&self) {
self.forwarder.stop();
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.stop();
}
}
}

Expand All @@ -117,8 +127,11 @@ impl<T: Transport + 'static, ER: RaftEngine> AdminObserver for TiFlashObserver<T
index: u64,
term: u64,
) -> bool {
self.forwarder
.pre_exec_admin(ob_ctx.region(), req, index, term)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_exec_admin(ob_ctx.region(), req, index, term)
} else {
false
}
}

fn post_exec_admin(
Expand All @@ -129,19 +142,25 @@ impl<T: Transport + 'static, ER: RaftEngine> AdminObserver for TiFlashObserver<T
region_state: &RegionState,
apply_ctx_info: &mut ApplyCtxInfo<'_>,
) -> bool {
self.forwarder.post_exec_admin(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.post_exec_admin(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
} else {
false
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> QueryObserver for TiFlashObserver<T, ER> {
fn on_empty_cmd(&self, ob_ctx: &mut ObserverContext<'_>, index: u64, term: u64) {
self.forwarder.on_empty_cmd(ob_ctx.region(), index, term)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_empty_cmd(ob_ctx.region(), index, term)
}
}

fn post_exec_query(
Expand All @@ -152,20 +171,25 @@ impl<T: Transport + 'static, ER: RaftEngine> QueryObserver for TiFlashObserver<T
region_state: &RegionState,
apply_ctx_info: &mut ApplyCtxInfo<'_>,
) -> bool {
self.forwarder.post_exec_query(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.post_exec_query(
ob_ctx.region(),
cmd,
apply_state,
region_state,
apply_ctx_info,
)
} else {
false
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> UpdateSafeTsObserver for TiFlashObserver<T, ER> {
fn on_update_safe_ts(&self, region_id: u64, self_safe_ts: u64, leader_safe_ts: u64) {
self.forwarder
.on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_update_safe_ts(region_id, self_safe_ts, leader_safe_ts)
}
}
}

Expand All @@ -176,7 +200,9 @@ impl<T: Transport + 'static, ER: RaftEngine> RegionChangeObserver for TiFlashObs
e: RegionChangeEvent,
r: StateRole,
) {
self.forwarder.on_region_changed(ob_ctx.region(), e, r)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_region_changed(ob_ctx.region(), e, r)
}
}

#[allow(clippy::match_like_matches_macro)]
Expand All @@ -186,24 +212,37 @@ impl<T: Transport + 'static, ER: RaftEngine> RegionChangeObserver for TiFlashObs
is_finished: bool,
cmd: Option<&RaftCmdRequest>,
) -> bool {
self.forwarder
.pre_persist(ob_ctx.region(), is_finished, cmd)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_persist(ob_ctx.region(), is_finished, cmd)
} else {
true
}
}

fn pre_write_apply_state(&self, ob_ctx: &mut ObserverContext<'_>) -> bool {
self.forwarder.pre_write_apply_state(ob_ctx.region())
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_write_apply_state(ob_ctx.region())
} else {
true
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> MessageObserver for TiFlashObserver<T, ER> {
fn on_raft_message(&self, msg: &RaftMessage) -> bool {
self.forwarder.on_raft_message(msg)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_raft_message(msg)
} else {
true
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> PdTaskObserver for TiFlashObserver<T, ER> {
fn on_compute_engine_size(&self, store_size: &mut Option<StoreSizeInfo>) {
self.forwarder.on_compute_engine_size(store_size)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_compute_engine_size(store_size)
}
}
}

Expand All @@ -216,8 +255,9 @@ impl<T: Transport + 'static, ER: RaftEngine> ApplySnapshotObserver for TiFlashOb
snap_key: &store::SnapKey,
snap: Option<&store::Snapshot>,
) {
self.forwarder
.pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.pre_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
}
}

fn post_apply_snapshot(
Expand All @@ -227,21 +267,30 @@ impl<T: Transport + 'static, ER: RaftEngine> ApplySnapshotObserver for TiFlashOb
snap_key: &store::SnapKey,
snap: Option<&store::Snapshot>,
) {
self.forwarder
.post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.post_apply_snapshot(ob_ctx.region(), peer_id, snap_key, snap)
}
}

fn should_pre_apply_snapshot(&self) -> bool {
self.forwarder.should_pre_apply_snapshot()
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.should_pre_apply_snapshot()
} else {
false
}
}

fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) {
self.forwarder.cancel_apply_snapshot(region_id, peer_id)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.cancel_apply_snapshot(region_id, peer_id)
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> RoleObserver for TiFlashObserver<T, ER> {
fn on_role_change(&self, ob_ctx: &mut ObserverContext<'_>, r: &RoleChange) {
self.forwarder.on_role_change(ob_ctx.region(), r)
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_role_change(ob_ctx.region(), r)
}
}
}
13 changes: 13 additions & 0 deletions proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ pub struct FFIHelperSet {
pub struct TestData {
pub expected_leader_safe_ts: u64,
pub expected_self_safe_ts: u64,
pub updated_leader_safe_ts: u64,
pub updated_self_safe_ts: u64,
pub checked_time: u64,
}

impl TestData {
pub fn reset(&mut self) {
self.expected_leader_safe_ts = 0;
self.expected_self_safe_ts = 0;
self.updated_leader_safe_ts = 0;
self.updated_self_safe_ts = 0;
self.checked_time = 0;
}
}

#[allow(clippy::type_complexity)]
Expand Down
Loading
Loading