From 14941fe6d2f457af43f7339c25ede4971e404305 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 13:36:38 +0800 Subject: [PATCH 1/7] add replica_read Signed-off-by: CalvinNeo --- .../src/ffi/engine_store_helper_impls.rs | 4 + .../src/ffi/raftstore_proxy_helper_impls.rs | 4 - new-mock-engine-store/src/mock_cluster.rs | 2 +- proxy_scripts/ci_check.sh | 1 + proxy_server/Cargo.toml | 2 +- proxy_tests/proxy/mod.rs | 1 + proxy_tests/proxy/replica_read.rs | 266 ++++++++++++++++++ 7 files changed, 274 insertions(+), 6 deletions(-) create mode 100644 proxy_tests/proxy/replica_read.rs diff --git a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs b/engine_store_ffi/src/ffi/engine_store_helper_impls.rs index ff4d60fa9df..e8f8aa8b5b2 100644 --- a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs +++ b/engine_store_ffi/src/ffi/engine_store_helper_impls.rs @@ -44,6 +44,10 @@ pub unsafe fn init_engine_store_server_helper(engine_store_server_helper: *const unsafe impl Sync for EngineStoreServerHelper {} +pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, ptr: RawVoidPtr) { + get_engine_store_server_helper().set_server_info_resp(res, ptr) +} + impl EngineStoreServerHelper { pub fn gc_raw_cpp_ptr(&self, ptr: *mut ::std::os::raw::c_void, tp: RawCppPtrType) { debug_assert!(self.fn_gc_raw_cpp_ptr.is_some()); diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index e2abf534e41..03522ea212d 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -30,10 +30,6 @@ use super::{ }; use crate::{read_index_helper, utils, TiFlashEngine}; -pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, ptr: RawVoidPtr) { - get_engine_store_server_helper().set_server_info_resp(res, ptr) -} - pub trait RaftStoreProxyFFI: Sync { fn set_status(&mut self, s: RaftProxyStatus); fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index 21e43ff8050..d3a0c2fd472 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -98,7 +98,7 @@ pub struct TestData { pub struct Cluster> { // Helper to set ffi_helper_set. pub ffi_helper_lst: Vec, - pub ffi_helper_set: Arc>>, + ffi_helper_set: Arc>>, pub cfg: Config, leaders: HashMap, diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index 1f7e1dcb93a..1d87a63bc64 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -45,6 +45,7 @@ elif [[ $M == "testnew" ]]; then cargo test --package proxy_tests --test proxy flashback cargo test --package proxy_tests --test proxy server_cluster_test cargo test --package proxy_tests --test proxy fast_add_peer + cargo test --package proxy_tests --test proxy replica_read cargo test --package proxy_tests --test proxy ffi -- --test-threads 1 cargo test --package proxy_tests --test proxy write --features="proxy_tests/enable-pagestorage" elif [[ $M == "debug" ]]; then diff --git a/proxy_server/Cargo.toml b/proxy_server/Cargo.toml index 69fb8df002d..efa631f4a15 100644 --- a/proxy_server/Cargo.toml +++ b/proxy_server/Cargo.toml @@ -30,7 +30,7 @@ test-engines-rocksdb = [ test-engines-panic = [ "tikv/test-engines-panic", ] -nortcheck = ["engine_rocks/nortcheck"] +nortcheck = ["engine_rocks/nortcheck", "engine_tiflash/nortcheck"] backup-stream-debug = ["backup-stream/backup-stream-debug"] pprof-fp = ["tikv/pprof-fp"] diff --git a/proxy_tests/proxy/mod.rs b/proxy_tests/proxy/mod.rs index c464315afdc..cde92a0c3de 100644 --- a/proxy_tests/proxy/mod.rs +++ b/proxy_tests/proxy/mod.rs @@ -16,6 +16,7 @@ mod flashback; mod normal; mod proxy; mod region; +mod replica_read; mod server_cluster_test; mod snapshot; mod util; diff --git a/proxy_tests/proxy/replica_read.rs b/proxy_tests/proxy/replica_read.rs new file mode 100644 index 00000000000..9b0a35a8ce2 --- /dev/null +++ b/proxy_tests/proxy/replica_read.rs @@ -0,0 +1,266 @@ +use std::{collections::hash_map::Entry, pin::Pin, sync::Mutex, time::Duration}; + +use engine_store_ffi::ffi::{ + ffi_gc_rust_ptr, ffi_make_async_waker, ffi_make_read_index_task, ffi_make_timer_task, + ffi_poll_read_index_task, ffi_poll_timer_task, + interfaces_ffi::{RaftStoreProxyFFIHelper, RawRustPtr, RawVoidPtr}, + ProtoMsgBaseBuff, +}; + +use crate::proxy::*; + +#[derive(Default)] +struct GcMonitor { + data: Mutex>, +} + +impl GcMonitor { + fn add(&self, ptr: &RawRustPtr, x: isize) { + let data = &mut *self.data.lock().unwrap(); + match data.entry(ptr.type_) { + Entry::Occupied(mut v) => { + *v.get_mut() += x; + } + Entry::Vacant(v) => { + v.insert(x); + } + } + } + fn valid_clean(&self) -> bool { + let data = &*self.data.lock().unwrap(); + for (k, v) in data { + if *v != 0 { + error!("GcMonitor::valid_clean failed at {}:{}", k, v); + return false; + } + } + return true; + } + fn is_empty(&self) -> bool { + let data = &*self.data.lock().unwrap(); + data.is_empty() + } +} + +lazy_static! { + static ref GC_MONITOR: GcMonitor = GcMonitor::default(); +} + +struct RawRustPtrWrap(RawRustPtr); + +impl RawRustPtrWrap { + fn new(ptr: RawRustPtr) -> Self { + GC_MONITOR.add(&ptr, 1); + Self(ptr) + } +} + +impl Drop for RawRustPtrWrap { + fn drop(&mut self) { + ffi_gc_rust_ptr(self.0.ptr, self.0.type_); + GC_MONITOR.add(&self.0, -1); + } +} + +struct ReadIndexFutureTask { + ptr: RawRustPtrWrap, +} + +struct Waker { + _inner: RawRustPtrWrap, + notifier: RawVoidPtr, +} + +impl Waker { + pub fn new() -> Self { + let notifier = new_mock_engine_store::ProxyNotifier::new_raw(); + let ptr = notifier.ptr; + let notifier = ffi_make_async_waker(Some(ffi_wake), notifier); + Self { + _inner: RawRustPtrWrap::new(notifier), + notifier: ptr, + } + } + + fn wait_for(&self, timeout: Duration) { + // Block wait for test + self.get_notifier().blocked_wait_for(timeout) + } + + fn get_notifier(&self) -> &new_mock_engine_store::ProxyNotifier { + unsafe { &*(self.notifier as *mut new_mock_engine_store::ProxyNotifier) } + } + + fn get_raw_waker(&self) -> RawVoidPtr { + self._inner.0.ptr + } +} + +fn blocked_read_index( + req: &kvproto::kvrpcpb::ReadIndexRequest, + ffi_helper: &RaftStoreProxyFFIHelper, + waker: Option<&Waker>, +) -> Option { + let mut resp = kvproto::kvrpcpb::ReadIndexResponse::default(); + + let mut task = { + let req = ProtoMsgBaseBuff::new(req); + let ptr = ffi_make_read_index_task(ffi_helper.proxy_ptr, Pin::new(&req).into()); + if ptr.is_null() { + return None; + } else { + Some(ReadIndexFutureTask { + ptr: RawRustPtrWrap::new(ptr), + }) + } + }; + + while task.is_some() { + let t = task.as_ref().unwrap(); + let waker_ptr = match waker { + None => std::ptr::null_mut(), + Some(w) => w.get_raw_waker(), + }; + if 0 != ffi_poll_read_index_task( + ffi_helper.proxy_ptr, + t.ptr.0.ptr, + &mut resp as *mut _ as RawVoidPtr, + waker_ptr, + ) { + task = None; + } else { + if let Some(w) = waker { + w.wait_for(Duration::from_secs(5)); + } else { + std::thread::sleep(Duration::from_millis(5)); + } + } + } + Some(resp) +} + +extern "C" fn ffi_wake(data: RawVoidPtr) { + let notifier = unsafe { &mut *(data as *mut new_mock_engine_store::ProxyNotifier) }; + notifier.wake() +} + +pub fn configure_for_lease_read( + cluster: &mut Cluster, + base_tick_ms: Option, + election_ticks: Option, +) -> Duration { + if let Some(base_tick_ms) = base_tick_ms { + cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(base_tick_ms); + } + let base_tick_interval = cluster.cfg.raft_store.raft_base_tick_interval.0; + if let Some(election_ticks) = election_ticks { + cluster.cfg.raft_store.raft_election_timeout_ticks = election_ticks; + } + let election_ticks = cluster.cfg.raft_store.raft_election_timeout_ticks as u32; + let election_timeout = base_tick_interval * election_ticks; + // Adjust max leader lease. + cluster.cfg.raft_store.raft_store_max_leader_lease = + ReadableDuration(election_timeout - base_tick_interval); + // Use large peer check interval, abnormal and max leader missing duration to + // make a valid config, that is election timeout x 2 < peer stale state + // check < abnormal < max leader missing duration. + cluster.cfg.raft_store.peer_stale_state_check_interval = ReadableDuration(election_timeout * 3); + cluster.cfg.raft_store.abnormal_leader_missing_duration = + ReadableDuration(election_timeout * 4); + cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration(election_timeout * 5); + + election_timeout +} + +#[test] +fn test_read_index() { + // Initialize cluster + let (mut cluster, pd_client) = new_mock_cluster(0, 3); + configure_for_lease_read(&mut cluster, Some(50), Some(10_000)); + cluster.cfg.raft_store.raft_heartbeat_ticks = 1; + pd_client.disable_default_operator(); + + // Set region and peers + let r1 = cluster.run_conf_change(); + let p1 = new_peer(1, 1); + let p2 = new_peer(2, 2); + cluster.pd_client.must_add_peer(r1, p2.clone()); + let p3 = new_peer(3, 3); + cluster.pd_client.must_add_peer(r1, p3.clone()); + cluster.must_put(b"k0", b"v0"); + cluster.pd_client.must_none_pending_peer(p2.clone()); + cluster.pd_client.must_none_pending_peer(p3.clone()); + let region = cluster.get_region(b"k0"); + assert_eq!(cluster.leader_of_region(region.get_id()).unwrap(), p1); + + let waker = Waker::new(); + + for (id, peer, f) in &[(2, p2, true), (3, p3, false)] { + iter_ffi_helpers( + &cluster, + Some(vec![*id]), + &mut |_, _, ffi_helper: &mut FFIHelperSet| { + let mut request = kvproto::kvrpcpb::ReadIndexRequest::default(); + + { + let context = request.mut_context(); + context.set_region_id(region.get_id()); + context.set_peer(peer.clone()); + context.set_region_epoch(region.get_region_epoch().clone()); + request.set_start_ts(666); + + let mut range = kvproto::kvrpcpb::KeyRange::default(); + range.set_start_key(region.get_start_key().to_vec()); + range.set_end_key(region.get_end_key().to_vec()); + request.mut_ranges().push(range); + + debug!("make read index request {:?}", &request); + } + let w = if *f { Some(&waker) } else { None }; + let resp = blocked_read_index(&request, &*ffi_helper.proxy_helper, w).unwrap(); + assert!(resp.get_read_index() != 0); + assert!(!resp.has_region_error()); + assert!(!resp.has_locked()); + }, + ); + } + + drop(waker); + + { + assert!(!GC_MONITOR.is_empty()); + assert!(GC_MONITOR.valid_clean()); + } + + cluster.shutdown(); +} + +#[test] +fn test_util() { + // test timer + { + let timeout = 128; + let task = RawRustPtrWrap::new(ffi_make_timer_task(timeout)); + assert_eq!(0, unsafe { + ffi_poll_timer_task(task.0.ptr, std::ptr::null_mut()) + }); + std::thread::sleep(Duration::from_millis(timeout + 20)); + assert_ne!( + unsafe { ffi_poll_timer_task(task.0.ptr, std::ptr::null_mut()) }, + 0 + ); + + let task = RawRustPtrWrap::new(ffi_make_timer_task(timeout)); + let waker = Waker::new(); + assert_eq!(0, unsafe { + ffi_poll_timer_task(task.0.ptr, waker.get_raw_waker()) + }); + let now = std::time::Instant::now(); + waker.wait_for(Duration::from_secs(256)); + assert_ne!(0, unsafe { + ffi_poll_timer_task(task.0.ptr, waker.get_raw_waker()) + }); + assert!(now.elapsed() < Duration::from_secs(256)); + } + assert!(GC_MONITOR.valid_clean()); +} From eca6268ba57e4db326bc87a5787052513b432ee9 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 13:56:45 +0800 Subject: [PATCH 2/7] remove all ffi_interfaces Signed-off-by: CalvinNeo --- engine_store_ffi/src/core/common.rs | 2 +- engine_store_ffi/src/core/fast_add_peer.rs | 2 +- engine_store_ffi/src/engine/ffihub_impl.rs | 4 +- engine_store_ffi/src/ffi/basic_ffi_impls.rs | 2 +- engine_store_ffi/src/ffi/context_impls.rs | 2 +- engine_store_ffi/src/ffi/domain_impls.rs | 6 +- engine_store_ffi/src/ffi/encryption_impls.rs | 2 +- .../src/ffi/engine_store_helper_impls.rs | 10 +- engine_store_ffi/src/ffi/lock_cf_reader.rs | 2 +- .../src/ffi/raftstore_proxy_helper_impls.rs | 6 +- engine_store_ffi/src/ffi/sst_reader_impls.rs | 2 +- new-mock-engine-store/src/mock_cluster.rs | 2 +- .../src/mock_page_storage.rs | 16 +- new-mock-engine-store/src/mock_store.rs | 235 +++++++++--------- proxy_server/src/util.rs | 2 +- proxy_tests/proxy/proxy.rs | 2 +- .../cases/test_proxy_replica_read.rs | 2 +- 17 files changed, 149 insertions(+), 150 deletions(-) diff --git a/engine_store_ffi/src/core/common.rs b/engine_store_ffi/src/core/common.rs index b0f80d80ced..73111901dc4 100644 --- a/engine_store_ffi/src/core/common.rs +++ b/engine_store_ffi/src/core/common.rs @@ -39,7 +39,7 @@ pub use yatp::{ pub(crate) use crate::{ ffi::{ gen_engine_store_server_helper, - interfaces::root::DB::{ + interfaces_ffi::{ ColumnFamilyType, EngineStoreApplyRes, EngineStoreServerHelper, RaftCmdHeader, RawCppPtr, WriteCmdType, }, diff --git a/engine_store_ffi/src/core/fast_add_peer.rs b/engine_store_ffi/src/core/fast_add_peer.rs index 6a37a928155..fa0f29245c9 100644 --- a/engine_store_ffi/src/core/fast_add_peer.rs +++ b/engine_store_ffi/src/core/fast_add_peer.rs @@ -1,7 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ core::{common::*, ProxyForwarder}, - ffi::interfaces::root::DB::FastAddPeerStatus, + ffi::interfaces_ffi::FastAddPeerStatus, }; pub fn get_region_local_state( diff --git a/engine_store_ffi/src/engine/ffihub_impl.rs b/engine_store_ffi/src/engine/ffihub_impl.rs index 2f5f52d9952..c9866093edd 100644 --- a/engine_store_ffi/src/engine/ffihub_impl.rs +++ b/engine_store_ffi/src/engine/ffihub_impl.rs @@ -2,7 +2,7 @@ use engine_tiflash::{FsStatsExt, RawPSWriteBatchPtr, RawPSWriteBatchWrapper}; use crate::ffi::{ - interfaces::root::DB as ffi_interfaces, + interfaces_ffi, interfaces_ffi::{EngineStoreServerHelper, PageAndCppStrWithView, RawCppPtr}, }; @@ -102,7 +102,7 @@ impl From for RawPSWriteBatchWrapper { } #[allow(clippy::from_over_into)] -impl Into for ffi_interfaces::StoreStats { +impl Into for interfaces_ffi::StoreStats { fn into(self) -> FsStatsExt { FsStatsExt { available: self.fs_stats.avail_size, diff --git a/engine_store_ffi/src/ffi/basic_ffi_impls.rs b/engine_store_ffi/src/ffi/basic_ffi_impls.rs index 228c959e22a..3c756e115f8 100644 --- a/engine_store_ffi/src/ffi/basic_ffi_impls.rs +++ b/engine_store_ffi/src/ffi/basic_ffi_impls.rs @@ -1,7 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use std::pin::Pin; -use super::interfaces::root::DB::BaseBuffView; +pub use super::interfaces_ffi::BaseBuffView; impl From<&[u8]> for BaseBuffView { fn from(s: &[u8]) -> Self { diff --git a/engine_store_ffi/src/ffi/context_impls.rs b/engine_store_ffi/src/ffi/context_impls.rs index 8febbf23cc5..2b511dc8ca5 100644 --- a/engine_store_ffi/src/ffi/context_impls.rs +++ b/engine_store_ffi/src/ffi/context_impls.rs @@ -2,7 +2,7 @@ use super::{ get_engine_store_server_helper, - interfaces::root::DB::{ + interfaces_ffi::{ RawCppPtr, RawCppPtrArr, RawCppPtrCarr, RawCppPtrTuple, RawVoidPtr, SpecialCppPtrType, }, }; diff --git a/engine_store_ffi/src/ffi/domain_impls.rs b/engine_store_ffi/src/ffi/domain_impls.rs index bfa0eac5d17..ca49e8a4e0c 100644 --- a/engine_store_ffi/src/ffi/domain_impls.rs +++ b/engine_store_ffi/src/ffi/domain_impls.rs @@ -3,8 +3,8 @@ use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE}; use super::{ - interfaces, - interfaces::root::DB::{ + interfaces_ffi, + interfaces_ffi::{ BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, WriteCmdType, WriteCmdsView, }, @@ -116,7 +116,7 @@ impl Into for RawRustPtrType { } } -pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces::root::DB::RawRustPtrType) { +pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRustPtrType) { if data.is_null() { return; } diff --git a/engine_store_ffi/src/ffi/encryption_impls.rs b/engine_store_ffi/src/ffi/encryption_impls.rs index 0d0b399ff55..fc56905ea69 100644 --- a/engine_store_ffi/src/ffi/encryption_impls.rs +++ b/engine_store_ffi/src/ffi/encryption_impls.rs @@ -4,7 +4,7 @@ use engine_traits::{EncryptionKeyManager, EncryptionMethod, FileEncryptionInfo}; use super::{ get_engine_store_server_helper, - interfaces::root::DB::{ + interfaces_ffi::{ BaseBuffView, EncryptionMethod as EncryptionMethodImpl, FileEncryptionInfoRaw, FileEncryptionRes, RaftStoreProxyPtr, RawCppStringPtr, }, diff --git a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs b/engine_store_ffi/src/ffi/engine_store_helper_impls.rs index e8f8aa8b5b2..b92eed0db02 100644 --- a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs +++ b/engine_store_ffi/src/ffi/engine_store_helper_impls.rs @@ -5,8 +5,8 @@ use kvproto::{kvrpcpb, metapb, raft_cmdpb}; use super::{ basic_ffi_impls::*, - interfaces, - interfaces::root::DB::{ + interfaces_ffi, + interfaces_ffi::{ BaseBuffView, ColumnFamilyType, CppStrWithView, EngineStoreApplyRes, EngineStoreServerHelper, EngineStoreServerStatus, FastAddPeerRes, HttpRequestRes, RaftCmdHeader, RaftStoreProxyFFIHelper, RawCppPtr, RawCppPtrCarr, RawCppPtrType, @@ -311,7 +311,7 @@ impl EngineStoreServerHelper { pub fn set_read_index_resp(&self, ptr: RawVoidPtr, r: &kvrpcpb::ReadIndexResponse) { let buff = ProtoMsgBaseBuff::new(r); self.set_pb_msg_by_bytes( - interfaces::root::DB::MsgPBType::ReadIndexResponse, + interfaces_ffi::MsgPBType::ReadIndexResponse, ptr, Pin::new(&buff).into(), ) @@ -351,7 +351,7 @@ impl EngineStoreServerHelper { pub fn set_pb_msg_by_bytes( &self, - type_: interfaces::root::DB::MsgPBType, + type_: interfaces_ffi::MsgPBType, ptr: RawVoidPtr, buff: BaseBuffView, ) { @@ -366,7 +366,7 @@ impl EngineStoreServerHelper { ) { let buff = ProtoMsgBaseBuff::new(res); self.set_pb_msg_by_bytes( - interfaces::root::DB::MsgPBType::ServerInfoResponse, + interfaces_ffi::MsgPBType::ServerInfoResponse, ptr, Pin::new(&buff).into(), ) diff --git a/engine_store_ffi/src/ffi/lock_cf_reader.rs b/engine_store_ffi/src/ffi/lock_cf_reader.rs index 9a81037f8a0..6233f9306e8 100644 --- a/engine_store_ffi/src/ffi/lock_cf_reader.rs +++ b/engine_store_ffi/src/ffi/lock_cf_reader.rs @@ -10,7 +10,7 @@ use file_system::File; use raftstore::store::snap::snap_io::get_decrypter_reader; use tikv_util::codec::bytes::CompactBytesFromFileDecoder; -use super::interfaces::root::DB::{BaseBuffView, RawVoidPtr}; +use super::interfaces_ffi::{BaseBuffView, RawVoidPtr}; type LockCFDecoder = BufReader>; diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index 03522ea212d..4e976b2d25f 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -19,8 +19,8 @@ use super::{ domain_impls::*, encryption_impls::*, engine_store_helper_impls::*, - interfaces, - interfaces::root::DB::{ + interfaces_ffi, + interfaces_ffi::{ BaseBuffView, ConstRawVoidPtr, CppStrVecView, KVGetStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, RaftStoreProxyPtr, RawCppPtr, RawCppStringPtr, RawRustPtr, RawVoidPtr, SSTReaderInterfaces, @@ -173,7 +173,7 @@ unsafe extern "C" fn ffi_get_region_local_state( Ok(v) => { if let Some(buff) = v { get_engine_store_server_helper().set_pb_msg_by_bytes( - interfaces::root::DB::MsgPBType::RegionLocalState, + interfaces_ffi::MsgPBType::RegionLocalState, data, buff.into(), ); diff --git a/engine_store_ffi/src/ffi/sst_reader_impls.rs b/engine_store_ffi/src/ffi/sst_reader_impls.rs index 5f4e049448e..17dd4d81a21 100644 --- a/engine_store_ffi/src/ffi/sst_reader_impls.rs +++ b/engine_store_ffi/src/ffi/sst_reader_impls.rs @@ -6,7 +6,7 @@ use engine_rocks::{get_env, RocksSstIterator, RocksSstReader}; use engine_traits::{IterOptions, Iterator, RefIterable, SstReader}; use super::{ - interfaces::root::DB::{ + interfaces_ffi::{ BaseBuffView, ColumnFamilyType, RaftStoreProxyPtr, RawVoidPtr, SSTReaderInterfaces, SSTReaderPtr, SSTView, SSTViewVec, }, diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index d3a0c2fd472..bf98252a34c 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -12,7 +12,7 @@ use collections::{HashMap, HashSet}; use encryption::DataKeyManager; // mock cluster pub use engine_store_ffi::ffi::{ - interfaces::root::DB as ffi_interfaces, + interfaces_ffi, interfaces_ffi::{ EngineStoreServerHelper, RaftProxyStatus, RaftStoreProxyFFIHelper, RawCppPtr, }, diff --git a/new-mock-engine-store/src/mock_page_storage.rs b/new-mock-engine-store/src/mock_page_storage.rs index b9a2f331c84..09897944f69 100644 --- a/new-mock-engine-store/src/mock_page_storage.rs +++ b/new-mock-engine-store/src/mock_page_storage.rs @@ -7,7 +7,7 @@ use std::{ }; pub use engine_store_ffi::ffi::{ - interfaces::root::DB as ffi_interfaces, + interfaces_ffi, interfaces_ffi::{ BaseBuffView, CppStrWithView, EngineStoreServerHelper, PageAndCppStrWithView, RaftStoreProxyFFIHelper, RawCppPtr, RawCppPtrCarr, RawVoidPtr, @@ -76,7 +76,7 @@ pub struct MockPageStorage { } pub unsafe extern "C" fn ffi_mockps_create_write_batch( - wrap: *const ffi_interfaces::EngineStoreServerWrap, + wrap: *const interfaces_ffi::EngineStoreServerWrap, ) -> RawCppPtr { let store = into_engine_store_server_wrap(wrap); let core = (*store.engine_store_server).page_storage.core.clone(); @@ -133,7 +133,7 @@ pub unsafe extern "C" fn ffi_mockps_write_batch_clear(wb: RawVoidPtr) { } pub unsafe extern "C" fn ffi_mockps_consume_write_batch( - wrap: *const ffi_interfaces::EngineStoreServerWrap, + wrap: *const interfaces_ffi::EngineStoreServerWrap, wb: RawVoidPtr, ) { let store = into_engine_store_server_wrap(wrap); @@ -157,7 +157,7 @@ pub unsafe extern "C" fn ffi_mockps_consume_write_batch( } pub unsafe extern "C" fn ffi_mockps_handle_read_page( - wrap: *const ffi_interfaces::EngineStoreServerWrap, + wrap: *const interfaces_ffi::EngineStoreServerWrap, page_id: BaseBuffView, ) -> CppStrWithView { let store = into_engine_store_server_wrap(wrap); @@ -174,7 +174,7 @@ pub unsafe extern "C" fn ffi_mockps_handle_read_page( } pub unsafe extern "C" fn ffi_mockps_handle_scan_page( - wrap: *const ffi_interfaces::EngineStoreServerWrap, + wrap: *const interfaces_ffi::EngineStoreServerWrap, start_page_id: BaseBuffView, end_page_id: BaseBuffView, ) -> RawCppPtrCarr { @@ -211,13 +211,13 @@ pub unsafe extern "C" fn ffi_mockps_handle_scan_page( } pub unsafe extern "C" fn ffi_mockps_handle_purge_pagestorage( - _wrap: *const ffi_interfaces::EngineStoreServerWrap, + _wrap: *const interfaces_ffi::EngineStoreServerWrap, ) { // TODO } pub unsafe extern "C" fn ffi_mockps_handle_seek_ps_key( - wrap: *const ffi_interfaces::EngineStoreServerWrap, + wrap: *const interfaces_ffi::EngineStoreServerWrap, page_id: BaseBuffView, ) -> CppStrWithView { // Find the first great or equal than @@ -233,7 +233,7 @@ pub unsafe extern "C" fn ffi_mockps_handle_seek_ps_key( } pub unsafe extern "C" fn ffi_mockps_ps_is_empty( - wrap: *const ffi_interfaces::EngineStoreServerWrap, + wrap: *const interfaces_ffi::EngineStoreServerWrap, ) -> u8 { let store = into_engine_store_server_wrap(wrap); let guard = (*store.engine_store_server) diff --git a/new-mock-engine-store/src/mock_store.rs b/new-mock-engine-store/src/mock_store.rs index 60b68aa8c5e..d33f2640994 100644 --- a/new-mock-engine-store/src/mock_store.rs +++ b/new-mock-engine-store/src/mock_store.rs @@ -14,7 +14,6 @@ pub use std::{ use assert_type_eq; use collections::{HashMap, HashSet}; pub use engine_store_ffi::ffi::{ - interfaces::root::DB as ffi_interfaces, interfaces_ffi, interfaces_ffi::{EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, RawVoidPtr}, UnwrapExternCFunc, @@ -124,7 +123,7 @@ impl EngineStoreServer { pub fn get_mem( &self, region_id: u64, - cf: ffi_interfaces::ColumnFamilyType, + cf: interfaces_ffi::ColumnFamilyType, key: &Vec, ) -> Option<&Vec> { match self.kvstore.get(®ion_id) { @@ -338,8 +337,8 @@ impl EngineStoreServerWrap { &mut self, req: &kvproto::raft_cmdpb::AdminRequest, resp: &kvproto::raft_cmdpb::AdminResponse, - header: ffi_interfaces::RaftCmdHeader, - ) -> ffi_interfaces::EngineStoreApplyRes { + header: interfaces_ffi::RaftCmdHeader, + ) -> interfaces_ffi::EngineStoreApplyRes { let region_id = header.region_id; let node_id = (*self.engine_store_server).id; info!("handle_admin_raft_cmd"; @@ -359,7 +358,7 @@ impl EngineStoreServerWrap { "node_id"=>node_id, ); panic!("observe obsolete admin index"); - // return ffi_interfaces::EngineStoreApplyRes::None; + // return interfaces_ffi::EngineStoreApplyRes::None; } match req.get_cmd_type() { AdminCmdType::ChangePeer | AdminCmdType::ChangePeerV2 => { @@ -468,7 +467,7 @@ impl EngineStoreServerWrap { } AdminCmdType::CommitMerge => { fail::fail_point!("ffi_before_commit_merge", |_| { - return ffi_interfaces::EngineStoreApplyRes::Persist; + return interfaces_ffi::EngineStoreApplyRes::Persist; }); let (target_id, source_id) = { (region_id, req.get_commit_merge().get_source().get_id()) }; @@ -557,18 +556,18 @@ impl EngineStoreServerWrap { AdminCmdType::CompactLog => { fail::fail_point!("no_persist_compact_log", |_| { // Persist data, but don't persist meta. - ffi_interfaces::EngineStoreApplyRes::None + interfaces_ffi::EngineStoreApplyRes::None }); - ffi_interfaces::EngineStoreApplyRes::Persist + interfaces_ffi::EngineStoreApplyRes::Persist } AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback => { fail::fail_point!("no_persist_flashback", |_| { // Persist data, but don't persist meta. - ffi_interfaces::EngineStoreApplyRes::None + interfaces_ffi::EngineStoreApplyRes::None }); - ffi_interfaces::EngineStoreApplyRes::Persist + interfaces_ffi::EngineStoreApplyRes::Persist } - _ => ffi_interfaces::EngineStoreApplyRes::Persist, + _ => interfaces_ffi::EngineStoreApplyRes::Persist, } }; @@ -597,7 +596,7 @@ impl EngineStoreServerWrap { None } }; - if res == ffi_interfaces::EngineStoreApplyRes::Persist { + if res == interfaces_ffi::EngineStoreApplyRes::Persist { // Persist tells ApplyDelegate to do a commit. // So we also need a persist of actual data on engine-store' side. if let Some(region) = region { @@ -617,9 +616,9 @@ impl EngineStoreServerWrap { unsafe fn handle_write_raft_cmd( &mut self, - cmds: ffi_interfaces::WriteCmdsView, - header: ffi_interfaces::RaftCmdHeader, - ) -> ffi_interfaces::EngineStoreApplyRes { + cmds: interfaces_ffi::WriteCmdsView, + header: interfaces_ffi::RaftCmdHeader, + ) -> interfaces_ffi::EngineStoreApplyRes { let region_id = header.region_id; let server = &mut (*self.engine_store_server); let node_id = (*self.engine_store_server).id; @@ -633,7 +632,7 @@ impl EngineStoreServerWrap { "node_id"=>node_id, ); panic!("observe obsolete write index"); - // return ffi_interfaces::EngineStoreApplyRes::None; + // return interfaces_ffi::EngineStoreApplyRes::None; } for i in 0..cmds.len { let key = &*cmds.keys.add(i as _); @@ -667,7 +666,7 @@ impl EngineStoreServerWrap { // If we don't support new proxy, we persist everytime. write_to_db_data(server, region, "write".to_string()); } - ffi_interfaces::EngineStoreApplyRes::None + interfaces_ffi::EngineStoreApplyRes::None }; match (*self.engine_store_server).kvstore.entry(region_id) { @@ -683,20 +682,20 @@ impl EngineStoreServerWrap { } unsafe extern "C" fn ffi_set_pb_msg_by_bytes( - type_: ffi_interfaces::MsgPBType, - ptr: ffi_interfaces::RawVoidPtr, - buff: ffi_interfaces::BaseBuffView, + type_: interfaces_ffi::MsgPBType, + ptr: interfaces_ffi::RawVoidPtr, + buff: interfaces_ffi::BaseBuffView, ) { match type_ { - ffi_interfaces::MsgPBType::ReadIndexResponse => { + interfaces_ffi::MsgPBType::ReadIndexResponse => { let v = &mut *(ptr as *mut kvproto::kvrpcpb::ReadIndexResponse); v.merge_from_bytes(buff.to_slice()).unwrap(); } - ffi_interfaces::MsgPBType::ServerInfoResponse => { + interfaces_ffi::MsgPBType::ServerInfoResponse => { let v = &mut *(ptr as *mut kvproto::diagnosticspb::ServerInfoResponse); v.merge_from_bytes(buff.to_slice()).unwrap(); } - ffi_interfaces::MsgPBType::RegionLocalState => { + interfaces_ffi::MsgPBType::RegionLocalState => { let v = &mut *(ptr as *mut kvproto::raft_serverpb::RegionLocalState); v.merge_from_bytes(buff.to_slice()).unwrap(); } @@ -707,8 +706,8 @@ pub fn gen_engine_store_server_helper( wrap: Pin<&EngineStoreServerWrap>, ) -> EngineStoreServerHelper { EngineStoreServerHelper { - magic_number: ffi_interfaces::RAFT_STORE_PROXY_MAGIC_NUMBER, - version: ffi_interfaces::RAFT_STORE_PROXY_VERSION, + magic_number: interfaces_ffi::RAFT_STORE_PROXY_MAGIC_NUMBER, + version: interfaces_ffi::RAFT_STORE_PROXY_VERSION, inner: &(*wrap) as *const EngineStoreServerWrap as *mut _, fn_gen_cpp_string: Some(ffi_gen_cpp_string), fn_handle_write_raft_cmd: Some(ffi_handle_write_raft_cmd), @@ -749,17 +748,17 @@ pub fn gen_engine_store_server_helper( } pub unsafe fn into_engine_store_server_wrap( - arg1: *const ffi_interfaces::EngineStoreServerWrap, + arg1: *const interfaces_ffi::EngineStoreServerWrap, ) -> &'static mut EngineStoreServerWrap { &mut *(arg1 as *mut EngineStoreServerWrap) } unsafe extern "C" fn ffi_handle_admin_raft_cmd( - arg1: *const ffi_interfaces::EngineStoreServerWrap, - arg2: ffi_interfaces::BaseBuffView, - arg3: ffi_interfaces::BaseBuffView, - arg4: ffi_interfaces::RaftCmdHeader, -) -> ffi_interfaces::EngineStoreApplyRes { + arg1: *const interfaces_ffi::EngineStoreServerWrap, + arg2: interfaces_ffi::BaseBuffView, + arg3: interfaces_ffi::BaseBuffView, + arg4: interfaces_ffi::RaftCmdHeader, +) -> interfaces_ffi::EngineStoreApplyRes { let store = into_engine_store_server_wrap(arg1); let mut req = kvproto::raft_cmdpb::AdminRequest::default(); let mut resp = kvproto::raft_cmdpb::AdminResponse::default(); @@ -769,10 +768,10 @@ unsafe extern "C" fn ffi_handle_admin_raft_cmd( } unsafe extern "C" fn ffi_handle_write_raft_cmd( - arg1: *const ffi_interfaces::EngineStoreServerWrap, - arg2: ffi_interfaces::WriteCmdsView, - arg3: ffi_interfaces::RaftCmdHeader, -) -> ffi_interfaces::EngineStoreApplyRes { + arg1: *const interfaces_ffi::EngineStoreServerWrap, + arg2: interfaces_ffi::WriteCmdsView, + arg3: interfaces_ffi::RaftCmdHeader, +) -> interfaces_ffi::EngineStoreApplyRes { let store = into_engine_store_server_wrap(arg1); store.handle_write_raft_cmd(arg2, arg3) } @@ -789,15 +788,15 @@ pub enum RawCppPtrTypeImpl { PSPageAndCppStr = 15, } -impl From for ffi_interfaces::RawCppPtrType { +impl From for interfaces_ffi::RawCppPtrType { fn from(value: RawCppPtrTypeImpl) -> Self { - assert_type_eq::assert_type_eq!(ffi_interfaces::RawCppPtrType, u32); + assert_type_eq::assert_type_eq!(interfaces_ffi::RawCppPtrType, u32); value.int_value() } } -impl From for RawCppPtrTypeImpl { - fn from(value: ffi_interfaces::RawCppPtrType) -> Self { +impl From for RawCppPtrTypeImpl { + fn from(value: interfaces_ffi::RawCppPtrType) -> Self { if let Ok(s) = RawCppPtrTypeImpl::from_int(value) { s } else { @@ -807,7 +806,7 @@ impl From for RawCppPtrTypeImpl { } extern "C" fn ffi_need_flush_data( - _arg1: *mut ffi_interfaces::EngineStoreServerWrap, + _arg1: *mut interfaces_ffi::EngineStoreServerWrap, _region_id: u64, ) -> u8 { fail::fail_point!("need_flush_data", |e| e.unwrap().parse::().unwrap()); @@ -815,7 +814,7 @@ extern "C" fn ffi_need_flush_data( } unsafe extern "C" fn ffi_try_flush_data( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, region_id: u64, _try_until_succeed: u8, index: u64, @@ -868,16 +867,16 @@ unsafe extern "C" fn ffi_try_flush_data( true as u8 } -extern "C" fn ffi_gen_cpp_string(s: ffi_interfaces::BaseBuffView) -> ffi_interfaces::RawCppPtr { +extern "C" fn ffi_gen_cpp_string(s: interfaces_ffi::BaseBuffView) -> interfaces_ffi::RawCppPtr { let str = Box::new(Vec::from(s.to_slice())); let ptr = Box::into_raw(str); - ffi_interfaces::RawCppPtr { + interfaces_ffi::RawCppPtr { ptr: ptr as *mut _, type_: RawCppPtrTypeImpl::String.into(), } } -pub struct RawCppStringPtrGuard(ffi_interfaces::RawCppStringPtr); +pub struct RawCppStringPtrGuard(interfaces_ffi::RawCppStringPtr); impl Default for RawCppStringPtrGuard { fn default() -> Self { @@ -885,20 +884,20 @@ impl Default for RawCppStringPtrGuard { } } -impl std::convert::AsRef for RawCppStringPtrGuard { - fn as_ref(&self) -> &ffi_interfaces::RawCppStringPtr { +impl std::convert::AsRef for RawCppStringPtrGuard { + fn as_ref(&self) -> &interfaces_ffi::RawCppStringPtr { &self.0 } } -impl std::convert::AsMut for RawCppStringPtrGuard { - fn as_mut(&mut self) -> &mut ffi_interfaces::RawCppStringPtr { +impl std::convert::AsMut for RawCppStringPtrGuard { + fn as_mut(&mut self) -> &mut interfaces_ffi::RawCppStringPtr { &mut self.0 } } impl Drop for RawCppStringPtrGuard { fn drop(&mut self) { - ffi_interfaces::RawCppPtr { + interfaces_ffi::RawCppPtr { ptr: self.0 as *mut _, type_: RawCppPtrTypeImpl::String.into(), }; @@ -958,20 +957,20 @@ impl ProxyNotifier { } extern "C" fn ffi_gc_special_raw_cpp_ptr( - ptr: ffi_interfaces::RawVoidPtr, + ptr: interfaces_ffi::RawVoidPtr, hint_len: u64, - tp: ffi_interfaces::SpecialCppPtrType, + tp: interfaces_ffi::SpecialCppPtrType, ) { match tp { - ffi_interfaces::SpecialCppPtrType::None => (), - ffi_interfaces::SpecialCppPtrType::TupleOfRawCppPtr => unsafe { + interfaces_ffi::SpecialCppPtrType::None => (), + interfaces_ffi::SpecialCppPtrType::TupleOfRawCppPtr => unsafe { let p = Box::from_raw(std::slice::from_raw_parts_mut( ptr as *mut RawCppPtr, hint_len as usize, )); drop(p); }, - ffi_interfaces::SpecialCppPtrType::ArrayOfRawCppPtr => unsafe { + interfaces_ffi::SpecialCppPtrType::ArrayOfRawCppPtr => unsafe { let p = Box::from_raw(std::slice::from_raw_parts_mut( ptr as *mut RawVoidPtr, hint_len as usize, @@ -982,8 +981,8 @@ extern "C" fn ffi_gc_special_raw_cpp_ptr( } extern "C" fn ffi_gc_raw_cpp_ptr( - ptr: ffi_interfaces::RawVoidPtr, - tp: ffi_interfaces::RawCppPtrType, + ptr: interfaces_ffi::RawVoidPtr, + tp: interfaces_ffi::RawCppPtrType, ) { match tp.into() { RawCppPtrTypeImpl::None => {} @@ -1007,8 +1006,8 @@ extern "C" fn ffi_gc_raw_cpp_ptr( } extern "C" fn ffi_gc_raw_cpp_ptr_carr( - ptr: ffi_interfaces::RawVoidPtr, - tp: ffi_interfaces::RawCppPtrType, + ptr: interfaces_ffi::RawVoidPtr, + tp: interfaces_ffi::RawCppPtrType, len: u64, ) { match tp.into() { @@ -1037,15 +1036,15 @@ extern "C" fn ffi_gc_raw_cpp_ptr_carr( } unsafe extern "C" fn ffi_atomic_update_proxy( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - arg2: *mut ffi_interfaces::RaftStoreProxyFFIHelper, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + arg2: *mut interfaces_ffi::RaftStoreProxyFFIHelper, ) { let store = into_engine_store_server_wrap(arg1); store.maybe_proxy_helper = Some(&mut *(arg2 as *mut RaftStoreProxyFFIHelper)); } unsafe extern "C" fn ffi_handle_destroy( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, arg2: u64, ) { let store = into_engine_store_server_wrap(arg1); @@ -1058,8 +1057,8 @@ type MockRaftProxyHelper = RaftStoreProxyFFIHelper; #[derive(Debug)] pub struct SSTReader<'a> { proxy_helper: &'a MockRaftProxyHelper, - inner: ffi_interfaces::SSTReaderPtr, - type_: ffi_interfaces::ColumnFamilyType, + inner: interfaces_ffi::SSTReaderPtr, + type_: interfaces_ffi::ColumnFamilyType, } impl<'a> Drop for SSTReader<'a> { @@ -1076,7 +1075,7 @@ impl<'a> Drop for SSTReader<'a> { impl<'a> SSTReader<'a> { pub unsafe fn new( proxy_helper: &'a MockRaftProxyHelper, - view: &'a ffi_interfaces::SSTView, + view: &'a interfaces_ffi::SSTView, ) -> Self { SSTReader { proxy_helper, @@ -1097,14 +1096,14 @@ impl<'a> SSTReader<'a> { != 0 } - pub unsafe fn key(&mut self) -> ffi_interfaces::BaseBuffView { + pub unsafe fn key(&mut self) -> interfaces_ffi::BaseBuffView { (self.proxy_helper.sst_reader_interfaces.fn_key.into_inner())( self.inner.clone(), self.type_, ) } - pub unsafe fn value(&mut self) -> ffi_interfaces::BaseBuffView { + pub unsafe fn value(&mut self) -> interfaces_ffi::BaseBuffView { (self .proxy_helper .sst_reader_interfaces @@ -1125,13 +1124,13 @@ struct PrehandledSnapshot { } unsafe extern "C" fn ffi_pre_handle_snapshot( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - region_buff: ffi_interfaces::BaseBuffView, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + region_buff: interfaces_ffi::BaseBuffView, peer_id: u64, - snaps: ffi_interfaces::SSTViewVec, + snaps: interfaces_ffi::SSTViewVec, index: u64, term: u64, -) -> ffi_interfaces::RawCppPtr { +) -> interfaces_ffi::RawCppPtr { let store = into_engine_store_server_wrap(arg1); let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap()); let _kvstore = &mut (*store.engine_store_server).kvstore; @@ -1164,7 +1163,7 @@ unsafe extern "C" fn ffi_pre_handle_snapshot( for i in 0..snaps.len { let snapshot = snaps.views.add(i as usize); - let view = &*(snapshot as *mut ffi_interfaces::SSTView); + let view = &*(snapshot as *mut interfaces_ffi::SSTView); let mut sst_reader = SSTReader::new(proxy_helper, view); while sst_reader.remained() { @@ -1187,27 +1186,27 @@ unsafe extern "C" fn ffi_pre_handle_snapshot( region.apply_state.mut_truncated_state().set_index(index); region.apply_state.mut_truncated_state().set_term(term); } - ffi_interfaces::RawCppPtr { + interfaces_ffi::RawCppPtr { ptr: Box::into_raw(Box::new(PrehandledSnapshot { region: Some(*region), - })) as *const Region as ffi_interfaces::RawVoidPtr, + })) as *const Region as interfaces_ffi::RawVoidPtr, type_: RawCppPtrTypeImpl::PreHandledSnapshotWithBlock.into(), } } // In case of newly added cfs. #[allow(unreachable_patterns)] -pub fn cf_to_name(cf: ffi_interfaces::ColumnFamilyType) -> &'static str { +pub fn cf_to_name(cf: interfaces_ffi::ColumnFamilyType) -> &'static str { match cf { - ffi_interfaces::ColumnFamilyType::Lock => CF_LOCK, - ffi_interfaces::ColumnFamilyType::Write => CF_WRITE, - ffi_interfaces::ColumnFamilyType::Default => CF_DEFAULT, + interfaces_ffi::ColumnFamilyType::Lock => CF_LOCK, + interfaces_ffi::ColumnFamilyType::Write => CF_WRITE, + interfaces_ffi::ColumnFamilyType::Default => CF_DEFAULT, _ => unreachable!(), } } unsafe extern "C" fn ffi_handle_safe_ts_update( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, _region_id: u64, self_safe_ts: u64, leader_safe_ts: u64, @@ -1219,9 +1218,9 @@ unsafe extern "C" fn ffi_handle_safe_ts_update( } unsafe extern "C" fn ffi_apply_pre_handled_snapshot( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - arg2: ffi_interfaces::RawVoidPtr, - _arg3: ffi_interfaces::RawCppPtrType, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + arg2: interfaces_ffi::RawVoidPtr, + _arg3: interfaces_ffi::RawCppPtrType, ) { let store = into_engine_store_server_wrap(arg1); let region_meta = &mut *(arg2 as *mut PrehandledSnapshot); @@ -1251,10 +1250,10 @@ unsafe extern "C" fn ffi_apply_pre_handled_snapshot( } unsafe extern "C" fn ffi_handle_ingest_sst( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, - snaps: ffi_interfaces::SSTViewVec, - header: ffi_interfaces::RaftCmdHeader, -) -> ffi_interfaces::EngineStoreApplyRes { + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + snaps: interfaces_ffi::SSTViewVec, + header: interfaces_ffi::RaftCmdHeader, +) -> interfaces_ffi::EngineStoreApplyRes { let store = into_engine_store_server_wrap(arg1); let node_id = (*store.engine_store_server).id; let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap()); @@ -1290,7 +1289,7 @@ unsafe extern "C" fn ffi_handle_ingest_sst( let snapshot = snaps.views.add(i as usize); // let _path = std::str::from_utf8_unchecked((*snapshot).path.to_slice()); let mut sst_reader = - SSTReader::new(proxy_helper, &*(snapshot as *mut ffi_interfaces::SSTView)); + SSTReader::new(proxy_helper, &*(snapshot as *mut interfaces_ffi::SSTView)); while sst_reader.remained() { let key = sst_reader.key(); let value = sst_reader.value(); @@ -1307,21 +1306,21 @@ unsafe extern "C" fn ffi_handle_ingest_sst( } fail::fail_point!("on_handle_ingest_sst_return", |_e| { - ffi_interfaces::EngineStoreApplyRes::None + interfaces_ffi::EngineStoreApplyRes::None }); write_to_db_data( &mut (*store.engine_store_server), region, String::from("ingest-sst"), ); - ffi_interfaces::EngineStoreApplyRes::Persist + interfaces_ffi::EngineStoreApplyRes::Persist } unsafe extern "C" fn ffi_handle_compute_store_stats( - _arg1: *mut ffi_interfaces::EngineStoreServerWrap, -) -> ffi_interfaces::StoreStats { - ffi_interfaces::StoreStats { - fs_stats: ffi_interfaces::FsStats { + _arg1: *mut interfaces_ffi::EngineStoreServerWrap, +) -> interfaces_ffi::StoreStats { + interfaces_ffi::StoreStats { + fs_stats: interfaces_ffi::FsStats { capacity_size: 444444, used_size: 111111, avail_size: 333333, @@ -1336,28 +1335,28 @@ unsafe extern "C" fn ffi_handle_compute_store_stats( pub unsafe fn create_cpp_str_parts( s: Option>, -) -> (ffi_interfaces::RawCppPtr, ffi_interfaces::BaseBuffView) { +) -> (interfaces_ffi::RawCppPtr, interfaces_ffi::BaseBuffView) { match s { Some(s) => { let len = s.len() as u64; let ptr = Box::into_raw(Box::new(s)); // leak ( - ffi_interfaces::RawCppPtr { + interfaces_ffi::RawCppPtr { ptr: ptr as RawVoidPtr, type_: RawCppPtrTypeImpl::String.into(), }, - ffi_interfaces::BaseBuffView { + interfaces_ffi::BaseBuffView { data: (*ptr).as_ptr() as *const _, len, }, ) } None => ( - ffi_interfaces::RawCppPtr { + interfaces_ffi::RawCppPtr { ptr: std::ptr::null_mut(), type_: RawCppPtrTypeImpl::None.into(), }, - ffi_interfaces::BaseBuffView { + interfaces_ffi::BaseBuffView { data: std::ptr::null(), len: 0, }, @@ -1365,17 +1364,17 @@ pub unsafe fn create_cpp_str_parts( } } -pub unsafe fn create_cpp_str(s: Option>) -> ffi_interfaces::CppStrWithView { +pub unsafe fn create_cpp_str(s: Option>) -> interfaces_ffi::CppStrWithView { let (p, v) = create_cpp_str_parts(s); - ffi_interfaces::CppStrWithView { inner: p, view: v } + interfaces_ffi::CppStrWithView { inner: p, view: v } } #[allow(clippy::redundant_closure_call)] unsafe extern "C" fn ffi_fast_add_peer( - arg1: *mut ffi_interfaces::EngineStoreServerWrap, + arg1: *mut interfaces_ffi::EngineStoreServerWrap, region_id: u64, new_peer_id: u64, -) -> ffi_interfaces::FastAddPeerRes { +) -> interfaces_ffi::FastAddPeerRes { let store = into_engine_store_server_wrap(arg1); let cluster = &*(store.cluster_ptr as *const mock_cluster::Cluster); let store_id = (*store.engine_store_server).id; @@ -1384,7 +1383,7 @@ unsafe extern "C" fn ffi_fast_add_peer( }); let failed_add_peer_res = - |status: ffi_interfaces::FastAddPeerStatus| ffi_interfaces::FastAddPeerRes { + |status: interfaces_ffi::FastAddPeerStatus| interfaces_ffi::FastAddPeerRes { status, apply_state: create_cpp_str(None), region: create_cpp_str(None), @@ -1413,7 +1412,7 @@ unsafe extern "C" fn ffi_fast_add_peer( debug!("recover from remote peer: enter from {} to {}", from_store, store_id; "region_id" => region_id); for retry in 0..300 { - let mut ret: Option = None; + let mut ret: Option = None; if retry > 0 { std::thread::sleep(std::time::Duration::from_millis(30)); } @@ -1422,7 +1421,7 @@ unsafe extern "C" fn ffi_fast_add_peer( let source_server = match guard.get_mut(&from_store) { Some(s) => &mut s.engine_store_server, None => { - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::NoSuitable)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::NoSuitable)); return; } }; @@ -1430,7 +1429,7 @@ unsafe extern "C" fn ffi_fast_add_peer( Some(s) => s, None => { error!("recover from remote peer: failed get source engine"; "region_id" => region_id); - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::BadData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)); return } }; @@ -1439,7 +1438,7 @@ unsafe extern "C" fn ffi_fast_add_peer( Some(s) => s, None => { error!("recover from remote peer: failed read source region info"; "region_id" => region_id); - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::BadData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)); return; } }; @@ -1451,7 +1450,7 @@ unsafe extern "C" fn ffi_fast_add_peer( None => { debug!("recover from remote peer: preparing from {} to {}:{}, not region state", from_store, store_id, new_peer_id; "region_id" => region_id); // We don't return BadData here, since the data may not be persisted. - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::WaitForData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData)); return; } }; @@ -1462,7 +1461,7 @@ unsafe extern "C" fn ffi_fast_add_peer( PeerState::Tombstone | PeerState::Applying => { // Note in real implementation, we will avoid selecting this peer. error!("recover from remote peer: preparing from {} to {}:{}, error peer state {:?}", from_store, store_id, new_peer_id, peer_state; "region_id" => region_id); - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::BadData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)); return; } _ => { @@ -1475,7 +1474,7 @@ unsafe extern "C" fn ffi_fast_add_peer( new_peer_id, ) { debug!("recover from remote peer: preparing from {} to {}, not applied conf change {}", from_store, store_id, new_peer_id; "region_id" => region_id); - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::WaitForData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData)); return; } // TODO check commit_index and applied_index here @@ -1490,14 +1489,14 @@ unsafe extern "C" fn ffi_fast_add_peer( let target_engines = match (*store.engine_store_server).engines.clone() { Some(s) => s, None => { - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::OtherError)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::OtherError)); return; } }; let target_region = match (*store.engine_store_server).kvstore.get_mut(®ion_id) { Some(s) => s, None => { - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::BadData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)); return; } }; @@ -1511,7 +1510,7 @@ unsafe extern "C" fn ffi_fast_add_peer( Some(x) => x, None => { error!("recover from remote peer: failed read apply state"; "region_id" => region_id); - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::BadData)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)); return; } }; @@ -1527,7 +1526,7 @@ unsafe extern "C" fn ffi_fast_add_peer( target_region, ) { error!("recover from remote peer: inject error {:?}", e; "region_id" => region_id); - ret = Some(failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::FailedInject)); + ret = Some(failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::FailedInject)); return; } if fail_after_write { @@ -1557,15 +1556,15 @@ unsafe extern "C" fn ffi_fast_add_peer( let region_ptr = create_cpp_str(Some(region_bytes)); // Check if we have commit_index. debug!("recover from remote peer: ok from {} to {}", from_store, store_id; "region_id" => region_id); - ret = Some(ffi_interfaces::FastAddPeerRes { - status: ffi_interfaces::FastAddPeerStatus::Ok, + ret = Some(interfaces_ffi::FastAddPeerRes { + status: interfaces_ffi::FastAddPeerStatus::Ok, apply_state: apply_state_ptr, region: region_ptr, }); }); if let Some(r) = ret { match r.status { - ffi_interfaces::FastAddPeerStatus::WaitForData => { + interfaces_ffi::FastAddPeerStatus::WaitForData => { if block_wait { continue; } else { @@ -1577,7 +1576,7 @@ unsafe extern "C" fn ffi_fast_add_peer( } } error!("recover from remote peer: failed after retry"; "region_id" => region_id); - failed_add_peer_res(ffi_interfaces::FastAddPeerStatus::BadData) + failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData) } #[allow(clippy::single_element_loop)] @@ -1597,7 +1596,7 @@ pub fn move_data_from( let new_region_meta = new_region.region.clone(); let start_key = new_region_meta.get_start_key(); let end_key = new_region_meta.get_end_key(); - for cf in &[ffi_interfaces::ColumnFamilyType::Default] { + for cf in &[interfaces_ffi::ColumnFamilyType::Default] { let cf = (*cf) as usize; for (k, v) in &kvs[cf] { let k = k.as_slice(); diff --git a/proxy_server/src/util.rs b/proxy_server/src/util.rs index 44ab10e9db3..c00d196696a 100644 --- a/proxy_server/src/util.rs +++ b/proxy_server/src/util.rs @@ -2,7 +2,7 @@ use std::time::{Duration, Instant}; -use engine_store_ffi::ffi::interfaces::root::DB::{BaseBuffView, RaftStoreProxyPtr, RawVoidPtr}; +use engine_store_ffi::ffi::interfaces_ffi::{BaseBuffView, RaftStoreProxyPtr, RawVoidPtr}; use futures::{compat::Future01CompatExt, executor::block_on}; use kvproto::diagnosticspb::{ServerInfoRequest, ServerInfoResponse, ServerInfoType}; use protobuf::Message; diff --git a/proxy_tests/proxy/proxy.rs b/proxy_tests/proxy/proxy.rs index 50dcb50b4a4..4e16f1954fd 100644 --- a/proxy_tests/proxy/proxy.rs +++ b/proxy_tests/proxy/proxy.rs @@ -164,7 +164,7 @@ pub fn must_get_mem( value: Option<&[u8]>, ) { let last_res: Option<&Vec> = None; - let cf = new_mock_engine_store::ffi_interfaces::ColumnFamilyType::Default; + let cf = new_mock_engine_store::interfaces_ffi::ColumnFamilyType::Default; for _ in 1..300 { let mut ok = false; { diff --git a/tests/failpoints/cases/test_proxy_replica_read.rs b/tests/failpoints/cases/test_proxy_replica_read.rs index 4b05012b1cf..48ff07b2f72 100644 --- a/tests/failpoints/cases/test_proxy_replica_read.rs +++ b/tests/failpoints/cases/test_proxy_replica_read.rs @@ -1,4 +1,4 @@ -use engine_store_ffi::interfaces::root::DB::RawRustPtr; +use engine_store_ffi::interfaces_ffi::RawRustPtr; use engine_store_ffi::{ ffi_gc_rust_ptr, ffi_make_async_waker, ffi_make_read_index_task, ffi_make_timer_task, ffi_poll_read_index_task, ffi_poll_timer_task, ProtoMsgBaseBuff, RawVoidPtr, From 460ee38e18d0520538072f8291670ca0538285e4 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 14:08:29 +0800 Subject: [PATCH 3/7] fix replica_read test Signed-off-by: CalvinNeo --- proxy_scripts/ci_check.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index 1d87a63bc64..08ad9988051 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -45,7 +45,7 @@ elif [[ $M == "testnew" ]]; then cargo test --package proxy_tests --test proxy flashback cargo test --package proxy_tests --test proxy server_cluster_test cargo test --package proxy_tests --test proxy fast_add_peer - cargo test --package proxy_tests --test proxy replica_read + cargo test --package proxy_tests --test proxy replica_read -- --test-threads 1 cargo test --package proxy_tests --test proxy ffi -- --test-threads 1 cargo test --package proxy_tests --test proxy write --features="proxy_tests/enable-pagestorage" elif [[ $M == "debug" ]]; then From 76b6f8cf8988880a34cca3ec827d37ea4e080f0a Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 15:05:37 +0800 Subject: [PATCH 4/7] split RaftStoreProxyFFIHelper and RaftStoreProxy, gather all ffi dep Signed-off-by: CalvinNeo --- engine_store_ffi/src/ffi/basic_ffi_impls.rs | 2 +- engine_store_ffi/src/ffi/domain_impls.rs | 9 +- engine_store_ffi/src/ffi/mod.rs | 7 +- engine_store_ffi/src/ffi/raftstore_proxy.rs | 92 ++++++++++++++++++ .../src/ffi/raftstore_proxy_helper_impls.rs | 95 ++----------------- .../src/{ => ffi}/read_index_helper.rs | 0 engine_store_ffi/src/{ => ffi}/utils.rs | 0 engine_store_ffi/src/lib.rs | 2 - new-mock-engine-store/src/mock_cluster.rs | 9 +- proxy_server/src/run.rs | 8 +- 10 files changed, 119 insertions(+), 105 deletions(-) create mode 100644 engine_store_ffi/src/ffi/raftstore_proxy.rs rename engine_store_ffi/src/{ => ffi}/read_index_helper.rs (100%) rename engine_store_ffi/src/{ => ffi}/utils.rs (100%) diff --git a/engine_store_ffi/src/ffi/basic_ffi_impls.rs b/engine_store_ffi/src/ffi/basic_ffi_impls.rs index 3c756e115f8..dff24129739 100644 --- a/engine_store_ffi/src/ffi/basic_ffi_impls.rs +++ b/engine_store_ffi/src/ffi/basic_ffi_impls.rs @@ -1,7 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use std::pin::Pin; -pub use super::interfaces_ffi::BaseBuffView; +use super::interfaces_ffi::BaseBuffView; impl From<&[u8]> for BaseBuffView { fn from(s: &[u8]) -> Self { diff --git a/engine_store_ffi/src/ffi/domain_impls.rs b/engine_store_ffi/src/ffi/domain_impls.rs index ca49e8a4e0c..462678c67e1 100644 --- a/engine_store_ffi/src/ffi/domain_impls.rs +++ b/engine_store_ffi/src/ffi/domain_impls.rs @@ -8,6 +8,7 @@ use super::{ BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, WriteCmdType, WriteCmdsView, }, + read_index_helper, utils, }; pub fn name_to_cf(cf: &str) -> ColumnFamilyType { @@ -123,15 +124,13 @@ pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRu let type_: RawRustPtrType = type_.into(); match type_ { RawRustPtrType::ReadIndexTask => unsafe { - drop(Box::from_raw( - data as *mut crate::read_index_helper::ReadIndexTask, - )); + drop(Box::from_raw(data as *mut read_index_helper::ReadIndexTask)); }, RawRustPtrType::ArcFutureWaker => unsafe { - drop(Box::from_raw(data as *mut crate::utils::ArcNotifyWaker)); + drop(Box::from_raw(data as *mut utils::ArcNotifyWaker)); }, RawRustPtrType::TimerTask => unsafe { - drop(Box::from_raw(data as *mut crate::utils::TimerTask)); + drop(Box::from_raw(data as *mut utils::TimerTask)); }, _ => unreachable!(), } diff --git a/engine_store_ffi/src/ffi/mod.rs b/engine_store_ffi/src/ffi/mod.rs index 4f63dbd8e53..08e6c09c69f 100644 --- a/engine_store_ffi/src/ffi/mod.rs +++ b/engine_store_ffi/src/ffi/mod.rs @@ -17,15 +17,18 @@ pub mod encryption_impls; pub mod engine_store_helper_impls; pub(crate) mod lock_cf_reader; // FFI directly related with RaftStoreProxyFFIHelper. +pub mod raftstore_proxy; pub mod raftstore_proxy_helper_impls; +pub mod read_index_helper; pub mod sst_reader_impls; +pub mod utils; pub use engine_tiflash::EngineStoreConfig; pub use self::{ basic_ffi_impls::*, domain_impls::*, encryption_impls::*, engine_store_helper_impls::*, - interfaces::root::DB as interfaces_ffi, lock_cf_reader::*, raftstore_proxy_helper_impls::*, - sst_reader_impls::*, + interfaces::root::DB as interfaces_ffi, lock_cf_reader::*, raftstore_proxy::*, + raftstore_proxy_helper_impls::*, sst_reader_impls::*, }; #[allow(clippy::wrong_self_convention)] diff --git a/engine_store_ffi/src/ffi/raftstore_proxy.rs b/engine_store_ffi/src/ffi/raftstore_proxy.rs new file mode 100644 index 00000000000..c69ed6eb33f --- /dev/null +++ b/engine_store_ffi/src/ffi/raftstore_proxy.rs @@ -0,0 +1,92 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +use std::sync::{ + atomic::{AtomicU8, Ordering}, + Arc, +}; + +use encryption::DataKeyManager; +use engine_traits::Peekable; + +use super::{ + interfaces_ffi::{ConstRawVoidPtr, RaftProxyStatus, RaftStoreProxyPtr}, + raftstore_proxy_helper_impls::*, + read_index_helper, +}; +use crate::TiFlashEngine; + +pub struct RaftStoreProxy { + pub status: AtomicU8, + pub key_manager: Option>, + pub read_index_client: Option>, + pub kv_engine: std::sync::RwLock>, +} + +impl RaftStoreProxy { + pub fn new( + status: AtomicU8, + key_manager: Option>, + read_index_client: Option>, + kv_engine: std::sync::RwLock>, + ) -> Self { + RaftStoreProxy { + status, + key_manager, + read_index_client, + kv_engine, + } + } +} + +impl RaftStoreProxyFFI for RaftStoreProxy { + fn set_kv_engine(&mut self, kv_engine: Option) { + let mut lock = self.kv_engine.write().unwrap(); + *lock = kv_engine; + } + + fn set_status(&mut self, s: RaftProxyStatus) { + self.status.store(s as u8, Ordering::SeqCst); + } + + fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) + where + F: FnOnce(Result, String>), + { + let kv_engine_lock = self.kv_engine.read().unwrap(); + let kv_engine = kv_engine_lock.as_ref(); + if kv_engine.is_none() { + cb(Err("KV engine is not initialized".to_string())); + return; + } + let value = kv_engine.unwrap().get_value_cf(cf, key); + match value { + Ok(v) => { + if let Some(x) = v { + cb(Ok(Some(&x))); + } else { + cb(Ok(None)); + } + } + Err(e) => { + cb(Err(format!("{}", e))); + } + } + } +} + +impl RaftStoreProxyPtr { + pub unsafe fn as_ref(&self) -> &RaftStoreProxy { + &*(self.inner as *const RaftStoreProxy) + } + pub fn is_null(&self) -> bool { + self.inner.is_null() + } +} + +impl From<&RaftStoreProxy> for RaftStoreProxyPtr { + fn from(ptr: &RaftStoreProxy) -> Self { + Self { + inner: ptr as *const _ as ConstRawVoidPtr, + } + } +} diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index 4e976b2d25f..a2f642f8f01 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -2,15 +2,10 @@ use std::{ pin::Pin, - sync::{ - atomic::{AtomicU8, Ordering}, - Arc, - }, + sync::{atomic::Ordering, Arc}, time, }; -use encryption::DataKeyManager; -use engine_traits::Peekable; use kvproto::kvrpcpb; use protobuf::Message; @@ -21,14 +16,14 @@ use super::{ engine_store_helper_impls::*, interfaces_ffi, interfaces_ffi::{ - BaseBuffView, ConstRawVoidPtr, CppStrVecView, KVGetStatus, RaftProxyStatus, - RaftStoreProxyFFIHelper, RaftStoreProxyPtr, RawCppPtr, RawCppStringPtr, RawRustPtr, - RawVoidPtr, SSTReaderInterfaces, + BaseBuffView, CppStrVecView, KVGetStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, + RaftStoreProxyPtr, RawCppPtr, RawCppStringPtr, RawRustPtr, RawVoidPtr, SSTReaderInterfaces, }, + read_index_helper, sst_reader_impls::*, - UnwrapExternCFunc, + utils, UnwrapExternCFunc, }; -use crate::{read_index_helper, utils, TiFlashEngine}; +use crate::TiFlashEngine; pub trait RaftStoreProxyFFI: Sync { fn set_status(&mut self, s: RaftProxyStatus); @@ -38,82 +33,6 @@ pub trait RaftStoreProxyFFI: Sync { fn set_kv_engine(&mut self, kv_engine: Option); } -pub struct RaftStoreProxy { - pub status: AtomicU8, - pub key_manager: Option>, - pub read_index_client: Option>, - pub kv_engine: std::sync::RwLock>, -} - -impl RaftStoreProxy { - pub fn new( - status: AtomicU8, - key_manager: Option>, - read_index_client: Option>, - kv_engine: std::sync::RwLock>, - ) -> Self { - RaftStoreProxy { - status, - key_manager, - read_index_client, - kv_engine, - } - } -} - -impl RaftStoreProxyFFI for RaftStoreProxy { - fn set_kv_engine(&mut self, kv_engine: Option) { - let mut lock = self.kv_engine.write().unwrap(); - *lock = kv_engine; - } - - fn set_status(&mut self, s: RaftProxyStatus) { - self.status.store(s as u8, Ordering::SeqCst); - } - - fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) - where - F: FnOnce(Result, String>), - { - let kv_engine_lock = self.kv_engine.read().unwrap(); - let kv_engine = kv_engine_lock.as_ref(); - if kv_engine.is_none() { - cb(Err("KV engine is not initialized".to_string())); - return; - } - let value = kv_engine.unwrap().get_value_cf(cf, key); - match value { - Ok(v) => { - if let Some(x) = v { - cb(Ok(Some(&x))); - } else { - cb(Ok(None)); - } - } - Err(e) => { - cb(Err(format!("{}", e))); - } - } - } -} - -impl RaftStoreProxyPtr { - pub unsafe fn as_ref(&self) -> &RaftStoreProxy { - &*(self.inner as *const RaftStoreProxy) - } - pub fn is_null(&self) -> bool { - self.inner.is_null() - } -} - -impl From<&RaftStoreProxy> for RaftStoreProxyPtr { - fn from(ptr: &RaftStoreProxy) -> Self { - Self { - inner: ptr as *const _ as ConstRawVoidPtr, - } - } -} - impl Clone for RaftStoreProxyPtr { fn clone(&self) -> RaftStoreProxyPtr { RaftStoreProxyPtr { @@ -125,7 +44,7 @@ impl Clone for RaftStoreProxyPtr { impl Copy for RaftStoreProxyPtr {} impl RaftStoreProxyFFIHelper { - pub fn new(proxy: &RaftStoreProxy) -> Self { + pub fn new(proxy: RaftStoreProxyPtr) -> Self { RaftStoreProxyFFIHelper { proxy_ptr: proxy.into(), fn_handle_get_proxy_status: Some(ffi_handle_get_proxy_status), diff --git a/engine_store_ffi/src/read_index_helper.rs b/engine_store_ffi/src/ffi/read_index_helper.rs similarity index 100% rename from engine_store_ffi/src/read_index_helper.rs rename to engine_store_ffi/src/ffi/read_index_helper.rs diff --git a/engine_store_ffi/src/utils.rs b/engine_store_ffi/src/ffi/utils.rs similarity index 100% rename from engine_store_ffi/src/utils.rs rename to engine_store_ffi/src/ffi/utils.rs diff --git a/engine_store_ffi/src/lib.rs b/engine_store_ffi/src/lib.rs index d068baacf8f..b98bf686af0 100644 --- a/engine_store_ffi/src/lib.rs +++ b/engine_store_ffi/src/lib.rs @@ -6,8 +6,6 @@ pub mod core; pub mod engine; pub mod ffi; pub mod observer; -pub mod read_index_helper; -mod utils; // Be discreet when expose inner mods by pub use. // engine_store_ffi crate includes too many mods here. diff --git a/new-mock-engine-store/src/mock_cluster.rs b/new-mock-engine-store/src/mock_cluster.rs index bf98252a34c..0d20736c11a 100644 --- a/new-mock-engine-store/src/mock_cluster.rs +++ b/new-mock-engine-store/src/mock_cluster.rs @@ -16,7 +16,7 @@ pub use engine_store_ffi::ffi::{ interfaces_ffi::{ EngineStoreServerHelper, RaftProxyStatus, RaftStoreProxyFFIHelper, RawCppPtr, }, - UnwrapExternCFunc, + RaftStoreProxy, UnwrapExternCFunc, }; pub use engine_store_ffi::TiFlashEngine; use engine_tiflash::DB; @@ -180,7 +180,7 @@ impl> Cluster { key_manager: key_mgr.clone(), read_index_client: match router { Some(r) => Some(Box::new( - engine_store_ffi::read_index_helper::ReadIndexClient::new( + engine_store_ffi::ffi::read_index_helper::ReadIndexClient::new( r.clone(), SysQuota::cpu_cores_quota() as usize * 2, ), @@ -190,7 +190,8 @@ impl> Cluster { kv_engine: std::sync::RwLock::new(Some(engines.kv.clone())), }); - let mut proxy_helper = Box::new(RaftStoreProxyFFIHelper::new(&proxy)); + let proxy_ref = proxy.as_ref(); + let mut proxy_helper = Box::new(RaftStoreProxyFFIHelper::new(proxy_ref.into())); let mut engine_store_server = Box::new(EngineStoreServer::new(id, Some(engines))); engine_store_server.proxy_compat = proxy_compat; engine_store_server.mock_cfg = mock_cfg; @@ -418,7 +419,7 @@ impl> Cluster { let mut lock = self.ffi_helper_set.lock().unwrap(); let ffi_helper_set = lock.get_mut(&node_id).unwrap(); ffi_helper_set.proxy.read_index_client = Some(Box::new( - engine_store_ffi::read_index_helper::ReadIndexClient::new( + engine_store_ffi::ffi::read_index_helper::ReadIndexClient::new( router.clone(), SysQuota::cpu_cores_quota() as usize * 2, ), diff --git a/proxy_server/src/run.rs b/proxy_server/src/run.rs index 9c6e68d0b62..21a6190a491 100644 --- a/proxy_server/src/run.rs +++ b/proxy_server/src/run.rs @@ -35,9 +35,9 @@ use engine_store_ffi::{ EngineStoreServerHelper, EngineStoreServerStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, }, + read_index_helper::ReadIndexClient, RaftStoreProxy, RaftStoreProxyFFI, }, - read_index_helper::ReadIndexClient, TiFlashEngine, }; use engine_traits::{ @@ -149,8 +149,9 @@ pub fn run_impl( std::sync::RwLock::new(None), ); + let proxy_ref = &proxy; let proxy_helper = { - let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy); + let mut proxy_helper = RaftStoreProxyFFIHelper::new(proxy_ref.into()); proxy_helper.fn_server_info = Some(ffi_server_info); proxy_helper }; @@ -254,8 +255,9 @@ fn run_impl_only_for_decryption( std::sync::RwLock::new(None), ); + let proxy_ref = &proxy; let proxy_helper = { - let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy); + let mut proxy_helper = RaftStoreProxyFFIHelper::new(proxy_ref.into()); proxy_helper.fn_server_info = Some(ffi_server_info); proxy_helper }; From 47e244ce42f0c0ca12b02a321c1db9b29f9b6262 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 15:17:18 +0800 Subject: [PATCH 5/7] f Signed-off-by: CalvinNeo --- engine_store_ffi/src/ffi/raftstore_proxy.rs | 8 ++++++++ engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs | 9 --------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/engine_store_ffi/src/ffi/raftstore_proxy.rs b/engine_store_ffi/src/ffi/raftstore_proxy.rs index c69ed6eb33f..b66634881bc 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy.rs @@ -15,6 +15,14 @@ use super::{ }; use crate::TiFlashEngine; +pub trait RaftStoreProxyFFI: Sync { + fn set_status(&mut self, s: RaftProxyStatus); + fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) + where + F: FnOnce(Result, String>); + fn set_kv_engine(&mut self, kv_engine: Option); +} + pub struct RaftStoreProxy { pub status: AtomicU8, pub key_manager: Option>, diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index a2f642f8f01..12eafcc7943 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -23,15 +23,6 @@ use super::{ sst_reader_impls::*, utils, UnwrapExternCFunc, }; -use crate::TiFlashEngine; - -pub trait RaftStoreProxyFFI: Sync { - fn set_status(&mut self, s: RaftProxyStatus); - fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) - where - F: FnOnce(Result, String>); - fn set_kv_engine(&mut self, kv_engine: Option); -} impl Clone for RaftStoreProxyPtr { fn clone(&self) -> RaftStoreProxyPtr { From 187a82b43896f2dfe72e62958814c59afad0e93d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 20:36:48 +0800 Subject: [PATCH 6/7] move sst to domain for split of engine store Signed-off-by: CalvinNeo --- engine_store_ffi/src/ffi/domain_impls.rs | 26 +++++++++++++++++-- .../src/ffi/engine_store_helper_impls.rs | 2 +- engine_store_ffi/src/ffi/raftstore_proxy.rs | 10 +------ .../src/ffi/raftstore_proxy_helper_impls.rs | 10 ++++++- engine_store_ffi/src/ffi/sst_reader_impls.rs | 24 ++--------------- 5 files changed, 37 insertions(+), 35 deletions(-) diff --git a/engine_store_ffi/src/ffi/domain_impls.rs b/engine_store_ffi/src/ffi/domain_impls.rs index 462678c67e1..881f557bff9 100644 --- a/engine_store_ffi/src/ffi/domain_impls.rs +++ b/engine_store_ffi/src/ffi/domain_impls.rs @@ -1,16 +1,38 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use std::pin::Pin; + use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE}; use super::{ interfaces_ffi, interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, WriteCmdType, - WriteCmdsView, + BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, SSTView, SSTViewVec, + WriteCmdType, WriteCmdsView, }, read_index_helper, utils, }; +pub fn into_sst_views(snaps: Vec<(&[u8], ColumnFamilyType)>) -> Vec { + let mut snaps_view = vec![]; + for (path, cf) in snaps { + snaps_view.push(SSTView { + type_: cf, + path: path.into(), + }) + } + snaps_view +} + +impl From>> for SSTViewVec { + fn from(snaps_view: Pin<&Vec>) -> Self { + Self { + views: snaps_view.as_ptr(), + len: snaps_view.len() as u64, + } + } +} + pub fn name_to_cf(cf: &str) -> ColumnFamilyType { if cf.is_empty() { return ColumnFamilyType::Default; diff --git a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs b/engine_store_ffi/src/ffi/engine_store_helper_impls.rs index b92eed0db02..f4aea0fdb1e 100644 --- a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs +++ b/engine_store_ffi/src/ffi/engine_store_helper_impls.rs @@ -5,6 +5,7 @@ use kvproto::{kvrpcpb, metapb, raft_cmdpb}; use super::{ basic_ffi_impls::*, + domain_impls::*, interfaces_ffi, interfaces_ffi::{ BaseBuffView, ColumnFamilyType, CppStrWithView, EngineStoreApplyRes, @@ -13,7 +14,6 @@ use super::{ RawCppStringPtr, RawVoidPtr, SpecialCppPtrType, StoreStats, RAFT_STORE_PROXY_MAGIC_NUMBER, RAFT_STORE_PROXY_VERSION, }, - sst_reader_impls::*, UnwrapExternCFunc, WriteCmds, }; diff --git a/engine_store_ffi/src/ffi/raftstore_proxy.rs b/engine_store_ffi/src/ffi/raftstore_proxy.rs index b66634881bc..9f075f22c77 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy.rs @@ -15,14 +15,6 @@ use super::{ }; use crate::TiFlashEngine; -pub trait RaftStoreProxyFFI: Sync { - fn set_status(&mut self, s: RaftProxyStatus); - fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) - where - F: FnOnce(Result, String>); - fn set_kv_engine(&mut self, kv_engine: Option); -} - pub struct RaftStoreProxy { pub status: AtomicU8, pub key_manager: Option>, @@ -46,7 +38,7 @@ impl RaftStoreProxy { } } -impl RaftStoreProxyFFI for RaftStoreProxy { +impl RaftStoreProxyFFI for RaftStoreProxy { fn set_kv_engine(&mut self, kv_engine: Option) { let mut lock = self.kv_engine.write().unwrap(); *lock = kv_engine; diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index 55cccfe44e1..92cba0f8f73 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -21,7 +21,7 @@ use super::{ }, read_index_helper, sst_reader_impls::*, - utils, RaftStoreProxyFFI, UnwrapExternCFunc, + utils, UnwrapExternCFunc, }; impl Clone for RaftStoreProxyPtr { @@ -34,6 +34,14 @@ impl Clone for RaftStoreProxyPtr { impl Copy for RaftStoreProxyPtr {} +pub trait RaftStoreProxyFFI: Sync { + fn set_status(&mut self, s: RaftProxyStatus); + fn get_value_cf(&self, cf: &str, key: &[u8], cb: F) + where + F: FnOnce(Result, String>); + fn set_kv_engine(&mut self, kv_engine: Option); +} + impl RaftStoreProxyFFIHelper { pub fn new(proxy: RaftStoreProxyPtr) -> Self { RaftStoreProxyFFIHelper { diff --git a/engine_store_ffi/src/ffi/sst_reader_impls.rs b/engine_store_ffi/src/ffi/sst_reader_impls.rs index 17dd4d81a21..dd2fd9ac920 100644 --- a/engine_store_ffi/src/ffi/sst_reader_impls.rs +++ b/engine_store_ffi/src/ffi/sst_reader_impls.rs @@ -1,5 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{cell::RefCell, pin::Pin, sync::Arc}; +use std::{cell::RefCell, sync::Arc}; use encryption::DataKeyManager; use engine_rocks::{get_env, RocksSstIterator, RocksSstReader}; @@ -8,7 +8,7 @@ use engine_traits::{IterOptions, Iterator, RefIterable, SstReader}; use super::{ interfaces_ffi::{ BaseBuffView, ColumnFamilyType, RaftStoreProxyPtr, RawVoidPtr, SSTReaderInterfaces, - SSTReaderPtr, SSTView, SSTViewVec, + SSTReaderPtr, SSTView, }, LockCFFileReader, }; @@ -124,26 +124,6 @@ pub unsafe extern "C" fn ffi_gc_sst_reader(reader: SSTReaderPtr, type_: ColumnFa } } -pub fn into_sst_views(snaps: Vec<(&[u8], ColumnFamilyType)>) -> Vec { - let mut snaps_view = vec![]; - for (path, cf) in snaps { - snaps_view.push(SSTView { - type_: cf, - path: path.into(), - }) - } - snaps_view -} - -impl From>> for SSTViewVec { - fn from(snaps_view: Pin<&Vec>) -> Self { - Self { - views: snaps_view.as_ptr(), - len: snaps_view.len() as u64, - } - } -} - pub struct SSTFileReader<'a> { iter: RefCell>>, remained: RefCell, From 3b2ac1752686c7e5e0a2885bda000d52d257630d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 8 Feb 2023 21:05:22 +0800 Subject: [PATCH 7/7] fail Signed-off-by: CalvinNeo --- Cargo.lock | 21 ++++++ Cargo.toml | 2 + engine_store_ffi/Cargo.toml | 2 +- engine_store_ffi/src/ffi/mod.rs | 32 ++------- .../src/ffi/raftstore_proxy_helper_impls.rs | 57 ++++++++++++++++ proxy_ffi/Cargo.toml | 29 ++++++++ .../ffi => proxy_ffi/src}/basic_ffi_impls.rs | 11 ++++ .../ffi => proxy_ffi/src}/context_impls.rs | 0 .../src/ffi => proxy_ffi/src}/domain_impls.rs | 66 +------------------ .../src}/engine_store_helper_impls.rs | 0 .../src/ffi => proxy_ffi/src}/interfaces.rs | 0 proxy_ffi/src/lib.rs | 24 +++++++ .../ffi => proxy_ffi/src}/sst_reader_impls.rs | 0 .../src/ffi => proxy_ffi/src}/utils.rs | 0 14 files changed, 151 insertions(+), 93 deletions(-) create mode 100644 proxy_ffi/Cargo.toml rename {engine_store_ffi/src/ffi => proxy_ffi/src}/basic_ffi_impls.rs (80%) rename {engine_store_ffi/src/ffi => proxy_ffi/src}/context_impls.rs (100%) rename {engine_store_ffi/src/ffi => proxy_ffi/src}/domain_impls.rs (62%) rename {engine_store_ffi/src/ffi => proxy_ffi/src}/engine_store_helper_impls.rs (100%) rename {engine_store_ffi/src/ffi => proxy_ffi/src}/interfaces.rs (100%) create mode 100644 proxy_ffi/src/lib.rs rename {engine_store_ffi/src/ffi => proxy_ffi/src}/sst_reader_impls.rs (100%) rename {engine_store_ffi/src/ffi => proxy_ffi/src}/utils.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index aa7eb276e28..280ea82193b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,6 +1617,7 @@ dependencies = [ "prometheus", "prometheus-static-metric", "protobuf", + "proxy_ffi", "raft", "raft-proto", "raftstore", @@ -4420,6 +4421,26 @@ dependencies = [ "protobuf-codegen", ] +[[package]] +name = "proxy_ffi" +version = "0.0.1" +dependencies = [ + "engine_rocks", + "engine_traits", + "fail", + "futures 0.3.15", + "futures-util", + "keys", + "kvproto", + "lazy_static", + "protobuf", + "slog", + "slog-global", + "tikv_util", + "tokio", + "tokio-timer", +] + [[package]] name = "proxy_server" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 6235d1d9c8a..4e407a30b25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -301,6 +301,7 @@ members = [ "fuzz/fuzzer-honggfuzz", "fuzz/fuzzer-libfuzzer", "gen-proxy-ffi", + "proxy_ffi", "proxy_tests", "raftstore-proxy", "tests", @@ -388,6 +389,7 @@ engine_store_ffi = { path = "engine_store_ffi", default-features = false } new-mock-engine-store = { path = "new-mock-engine-store", default-features = false } proxy_server = { path = "proxy_server", default-features = false } engine_tiflash = { path = "engine_tiflash", default-features = false } +proxy_ffi = { path = "proxy_ffi", default-features = false } [profile.dev.package.grpcio-sys] debug = false diff --git a/engine_store_ffi/Cargo.toml b/engine_store_ffi/Cargo.toml index f24a69afae2..291365e9b88 100644 --- a/engine_store_ffi/Cargo.toml +++ b/engine_store_ffi/Cargo.toml @@ -37,7 +37,6 @@ collections = { workspace = true } crossbeam = "0.8" derivative = "2" encryption = { workspace = true, default-features = false } - engine_rocks = { workspace = true, default-features = false } # Should be [dev-dependencies] but we need to control the features # https://github.com/rust-lang/cargo/issues/6915 @@ -67,6 +66,7 @@ portable-atomic = "0.3" prometheus = { version = "0.13", features = ["nightly"] } prometheus-static-metric = "0.5" protobuf = { version = "2.8", features = ["bytes"] } +proxy_ffi = { workspace = true, default-features = false } raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] } raft-proto = { version = "0.7.0", default-features = false } raftstore = { workspace = true, default-features = false } diff --git a/engine_store_ffi/src/ffi/mod.rs b/engine_store_ffi/src/ffi/mod.rs index 08e6c09c69f..3b1218ea564 100644 --- a/engine_store_ffi/src/ffi/mod.rs +++ b/engine_store_ffi/src/ffi/mod.rs @@ -3,41 +3,17 @@ /// All mods end up with `_impls` impl structs defined in interface. /// Other mods which define and impl structs should not end up with name /// `_impls`. - -#[allow(dead_code)] -pub mod interfaces; -// All ffi impls that without raft domain. -pub mod basic_ffi_impls; -// All ffi impls that within raft domain, but without proxy helper context. -pub mod domain_impls; -// All ffi impls that within proxy helper context. -pub mod context_impls; -pub mod encryption_impls; -// FFI directly related with EngineStoreServerHelper. -pub mod engine_store_helper_impls; pub(crate) mod lock_cf_reader; // FFI directly related with RaftStoreProxyFFIHelper. +pub mod encryption_impls; pub mod raftstore_proxy; pub mod raftstore_proxy_helper_impls; pub mod read_index_helper; -pub mod sst_reader_impls; -pub mod utils; pub use engine_tiflash::EngineStoreConfig; +pub use proxy_ffi::*; pub use self::{ - basic_ffi_impls::*, domain_impls::*, encryption_impls::*, engine_store_helper_impls::*, - interfaces::root::DB as interfaces_ffi, lock_cf_reader::*, raftstore_proxy::*, - raftstore_proxy_helper_impls::*, sst_reader_impls::*, + encryption_impls::*, lock_cf_reader::*, raftstore_proxy::*, raftstore_proxy_helper_impls::*, + sst_reader_impls::*, }; - -#[allow(clippy::wrong_self_convention)] -pub trait UnwrapExternCFunc { - unsafe fn into_inner(&self) -> &T; -} - -impl UnwrapExternCFunc for std::option::Option { - unsafe fn into_inner(&self) -> &T { - std::mem::transmute::<&Self, &T>(self) - } -} diff --git a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs index 92cba0f8f73..42a5a4ff391 100644 --- a/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs +++ b/engine_store_ffi/src/ffi/raftstore_proxy_helper_impls.rs @@ -271,3 +271,60 @@ pub unsafe extern "C" fn ffi_poll_timer_task(task_ptr: RawVoidPtr, waker: RawVoi 0 } } + +#[repr(u32)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +pub enum RawRustPtrType { + None = 0, + ReadIndexTask = 1, + ArcFutureWaker = 2, + TimerTask = 3, +} + +impl From for RawRustPtrType { + fn from(x: u32) -> Self { + unsafe { std::mem::transmute(x) } + } +} + +// TODO remove this warn. +#[allow(clippy::from_over_into)] +impl Into for RawRustPtrType { + fn into(self) -> u32 { + unsafe { std::mem::transmute(self) } + } +} + +pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRustPtrType) { + if data.is_null() { + return; + } + let type_: RawRustPtrType = type_.into(); + match type_ { + RawRustPtrType::ReadIndexTask => unsafe { + drop(Box::from_raw(data as *mut read_index_helper::ReadIndexTask)); + }, + RawRustPtrType::ArcFutureWaker => unsafe { + drop(Box::from_raw(data as *mut utils::ArcNotifyWaker)); + }, + RawRustPtrType::TimerTask => unsafe { + drop(Box::from_raw(data as *mut utils::TimerTask)); + }, + _ => unreachable!(), + } +} + +impl Default for RawRustPtr { + fn default() -> Self { + Self { + ptr: std::ptr::null_mut(), + type_: RawRustPtrType::None.into(), + } + } +} + +impl RawRustPtr { + pub fn is_null(&self) -> bool { + self.ptr.is_null() + } +} diff --git a/proxy_ffi/Cargo.toml b/proxy_ffi/Cargo.toml new file mode 100644 index 00000000000..ddd87380ab8 --- /dev/null +++ b/proxy_ffi/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "proxy_ffi" +version = "0.0.1" +authors = ["The TiKV Authors"] +license = "Apache-2.0" +edition = "2018" +publish = false + +[features] +default = [] +failpoints = ["fail/failpoints"] +testexport = [] + +[dependencies] +encryption = { workspace = true, default-features = false } +engine_rocks = { workspace = true, default-features = false } +engine_traits = { workspace = true, default-features = false } +fail = "0.5" +futures = "0.3" +futures-util = { version = "0.3.1", default-features = false, features = ["io"] } +keys = { workspace = true, default-features = false } +kvproto = { git = "https://github.com/pingcap/kvproto.git" } +lazy_static = "1.3" +protobuf = { version = "2.8", features = ["bytes"] } +slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } +slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } +tikv_util = { workspace = true, default-features = false } +tokio = { version = "1.5", features = ["sync", "rt-multi-thread"] } +tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hotfix" } diff --git a/engine_store_ffi/src/ffi/basic_ffi_impls.rs b/proxy_ffi/src/basic_ffi_impls.rs similarity index 80% rename from engine_store_ffi/src/ffi/basic_ffi_impls.rs rename to proxy_ffi/src/basic_ffi_impls.rs index dff24129739..8aed6197908 100644 --- a/engine_store_ffi/src/ffi/basic_ffi_impls.rs +++ b/proxy_ffi/src/basic_ffi_impls.rs @@ -3,6 +3,17 @@ use std::pin::Pin; use super::interfaces_ffi::BaseBuffView; +#[allow(clippy::wrong_self_convention)] +pub trait UnwrapExternCFunc { + unsafe fn into_inner(&self) -> &T; +} + +impl UnwrapExternCFunc for std::option::Option { + unsafe fn into_inner(&self) -> &T { + std::mem::transmute::<&Self, &T>(self) + } +} + impl From<&[u8]> for BaseBuffView { fn from(s: &[u8]) -> Self { let ptr = s.as_ptr() as *const _; diff --git a/engine_store_ffi/src/ffi/context_impls.rs b/proxy_ffi/src/context_impls.rs similarity index 100% rename from engine_store_ffi/src/ffi/context_impls.rs rename to proxy_ffi/src/context_impls.rs diff --git a/engine_store_ffi/src/ffi/domain_impls.rs b/proxy_ffi/src/domain_impls.rs similarity index 62% rename from engine_store_ffi/src/ffi/domain_impls.rs rename to proxy_ffi/src/domain_impls.rs index 881f557bff9..04e8f424059 100644 --- a/engine_store_ffi/src/ffi/domain_impls.rs +++ b/proxy_ffi/src/domain_impls.rs @@ -4,13 +4,8 @@ use std::pin::Pin; use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE}; -use super::{ - interfaces_ffi, - interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, SSTView, SSTViewVec, - WriteCmdType, WriteCmdsView, - }, - read_index_helper, utils, +use super::interfaces_ffi::{ + BaseBuffView, ColumnFamilyType, RaftCmdHeader, SSTView, SSTViewVec, WriteCmdType, WriteCmdsView, }; pub fn into_sst_views(snaps: Vec<(&[u8], ColumnFamilyType)>) -> Vec { @@ -115,60 +110,3 @@ impl RaftCmdHeader { } } } - -#[repr(u32)] -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] -pub enum RawRustPtrType { - None = 0, - ReadIndexTask = 1, - ArcFutureWaker = 2, - TimerTask = 3, -} - -impl From for RawRustPtrType { - fn from(x: u32) -> Self { - unsafe { std::mem::transmute(x) } - } -} - -// TODO remove this warn. -#[allow(clippy::from_over_into)] -impl Into for RawRustPtrType { - fn into(self) -> u32 { - unsafe { std::mem::transmute(self) } - } -} - -pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRustPtrType) { - if data.is_null() { - return; - } - let type_: RawRustPtrType = type_.into(); - match type_ { - RawRustPtrType::ReadIndexTask => unsafe { - drop(Box::from_raw(data as *mut read_index_helper::ReadIndexTask)); - }, - RawRustPtrType::ArcFutureWaker => unsafe { - drop(Box::from_raw(data as *mut utils::ArcNotifyWaker)); - }, - RawRustPtrType::TimerTask => unsafe { - drop(Box::from_raw(data as *mut utils::TimerTask)); - }, - _ => unreachable!(), - } -} - -impl Default for RawRustPtr { - fn default() -> Self { - Self { - ptr: std::ptr::null_mut(), - type_: RawRustPtrType::None.into(), - } - } -} - -impl RawRustPtr { - pub fn is_null(&self) -> bool { - self.ptr.is_null() - } -} diff --git a/engine_store_ffi/src/ffi/engine_store_helper_impls.rs b/proxy_ffi/src/engine_store_helper_impls.rs similarity index 100% rename from engine_store_ffi/src/ffi/engine_store_helper_impls.rs rename to proxy_ffi/src/engine_store_helper_impls.rs diff --git a/engine_store_ffi/src/ffi/interfaces.rs b/proxy_ffi/src/interfaces.rs similarity index 100% rename from engine_store_ffi/src/ffi/interfaces.rs rename to proxy_ffi/src/interfaces.rs diff --git a/proxy_ffi/src/lib.rs b/proxy_ffi/src/lib.rs new file mode 100644 index 00000000000..ab15451f1c3 --- /dev/null +++ b/proxy_ffi/src/lib.rs @@ -0,0 +1,24 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +#[allow(dead_code)] +pub mod interfaces; + +/// All mods end up with `_impls` impl structs defined in interface. +/// Other mods which define and impl structs should not end up with name +/// `_impls`. +// All ffi impls that without raft domain. +pub mod basic_ffi_impls; +// All ffi impls that within raft domain, but without proxy helper context. +pub mod domain_impls; +// FFI directly related with EngineStoreServerHelper. +pub mod engine_store_helper_impls; +// All ffi impls that within engine store helper context. +pub mod context_impls; + +pub mod sst_reader_impls; +pub mod utils; + +pub use self::{ + basic_ffi_impls::*, context_impls::*, domain_impls::*, engine_store_helper_impls::*, + interfaces::root::DB as interfaces_ffi, sst_reader_impls::*, utils::*, +}; diff --git a/engine_store_ffi/src/ffi/sst_reader_impls.rs b/proxy_ffi/src/sst_reader_impls.rs similarity index 100% rename from engine_store_ffi/src/ffi/sst_reader_impls.rs rename to proxy_ffi/src/sst_reader_impls.rs diff --git a/engine_store_ffi/src/ffi/utils.rs b/proxy_ffi/src/utils.rs similarity index 100% rename from engine_store_ffi/src/ffi/utils.rs rename to proxy_ffi/src/utils.rs