diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b74198602..3efbee055 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,7 +90,6 @@ jobs: # - "integration-tests::chain_cache" FIXME: this must be reintroduced when the chain index test hangs are debugged - "integration-tests::fetch_service" - "integration-tests::json_server" - - "integration-tests::local_cache" - "integration-tests::state_service" - "integration-tests::test_vectors" - "integration-tests::wallet_to_validator" diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index 8645836cf..2dd8f0663 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -123,6 +123,7 @@ mod chain_query_interface { NetworkKind::Testnet => zebra_chain::parameters::Network::new_default_testnet(), NetworkKind::Mainnet => zebra_chain::parameters::Network::Mainnet, }; + // FIXME: when state service is integrated into chain index this initialization must change let state_service = StateService::spawn(StateServiceConfig::new( zebra_state::Config { cache_dir: state_chain_cache_dir, diff --git a/integration-tests/tests/fetch_service.rs b/integration-tests/tests/fetch_service.rs index d3211936d..48ea4651c 100644 --- a/integration-tests/tests/fetch_service.rs +++ b/integration-tests/tests/fetch_service.rs @@ -11,7 +11,7 @@ use zaino_proto::proto::service::{ use zaino_state::ChainIndex; use zaino_state::FetchServiceSubscriber; #[allow(deprecated)] -use zaino_state::{FetchService, LightWalletIndexer, StatusType, ZcashIndexer}; +use zaino_state::{FetchService, LightWalletIndexer, Status, StatusType, ZcashIndexer}; use zaino_testutils::{TestManager, ValidatorExt, ValidatorKind}; use zebra_chain::parameters::subsidy::ParameterSubsidy as _; use zebra_chain::subtree::NoteCommitmentSubtreeIndex; @@ -1918,7 +1918,7 @@ async fn fetch_service_get_subtree_roots(validator: &ValidatorK }; let fetch_service_stream = fetch_service_subscriber - .get_subtree_roots(subtree_roots_arg.clone()) + .get_subtree_roots(subtree_roots_arg) .await .unwrap(); let fetch_service_roots: Vec<_> = fetch_service_stream.collect().await; diff --git a/integration-tests/tests/local_cache.rs b/integration-tests/tests/local_cache.rs deleted file mode 100644 index fe23fa68c..000000000 --- a/integration-tests/tests/local_cache.rs +++ /dev/null @@ -1,194 +0,0 @@ -use zaino_common::{ - ActivationHeights, DatabaseConfig, StorageConfig, ZEBRAD_DEFAULT_ACTIVATION_HEIGHTS, -}; -use zaino_fetch::jsonrpsee::connector::{test_node_and_return_url, JsonRpSeeConnector}; -use zaino_state::test_dependencies::{BlockCache, BlockCacheConfig, BlockCacheSubscriber}; -#[allow(deprecated)] -use zaino_state::FetchService; -use zaino_testutils::{TestManager, ValidatorExt, ValidatorKind}; -use zebra_chain::{block::Height, parameters::NetworkKind}; -use zebra_state::HashOrHeight; - -#[allow(deprecated)] -async fn create_test_manager_and_block_cache( - validator: &ValidatorKind, - chain_cache: Option, - enable_zaino: bool, - enable_clients: bool, -) -> ( - TestManager, - JsonRpSeeConnector, - BlockCache, - BlockCacheSubscriber, -) { - let activation_heights = match validator { - ValidatorKind::Zebrad => ZEBRAD_DEFAULT_ACTIVATION_HEIGHTS, - ValidatorKind::Zcashd => ActivationHeights::default(), - }; - - let test_manager = TestManager::::launch( - validator, - None, - None, - chain_cache, - enable_zaino, - false, - enable_clients, - ) - .await - .unwrap(); - - let json_service = JsonRpSeeConnector::new_with_basic_auth( - test_node_and_return_url( - &test_manager.full_node_rpc_listen_address.to_string(), - None, - Some("xxxxxx".to_string()), - Some("xxxxxx".to_string()), - ) - .await - .unwrap(), - "xxxxxx".to_string(), - "xxxxxx".to_string(), - ) - .unwrap(); - - let network = match test_manager.network { - NetworkKind::Regtest => zebra_chain::parameters::Network::new_regtest( - zebra_chain::parameters::testnet::ConfiguredActivationHeights::from(activation_heights) - .into(), - ), - NetworkKind::Testnet => zebra_chain::parameters::Network::new_default_testnet(), - NetworkKind::Mainnet => zebra_chain::parameters::Network::Mainnet, - }; - - let block_cache_config = BlockCacheConfig { - storage: StorageConfig { - database: DatabaseConfig { - path: test_manager.data_dir.clone(), - ..Default::default() - }, - ..Default::default() - }, - db_version: 1, - network: network.into(), - }; - - let block_cache = BlockCache::spawn(&json_service, None, block_cache_config) - .await - .unwrap(); - - let block_cache_subscriber = block_cache.subscriber(); - - ( - test_manager, - json_service, - block_cache, - block_cache_subscriber, - ) -} - -async fn launch_local_cache(validator: &ValidatorKind) { - let (_test_manager, _json_service, _block_cache, block_cache_subscriber) = - create_test_manager_and_block_cache::(validator, None, false, false).await; - - dbg!(block_cache_subscriber.status()); -} - -/// Launches a testmanager and block cache and generates `n*100` blocks, checking blocks are stored and fetched correctly. -async fn launch_local_cache_process_n_block_batches( - validator: &ValidatorKind, - batches: u32, -) { - let (test_manager, json_service, mut block_cache, mut block_cache_subscriber) = - create_test_manager_and_block_cache::(validator, None, true, false).await; - - let finalised_state = block_cache.finalised_state.take().unwrap(); - let finalised_state_subscriber = block_cache_subscriber.finalised_state.take().unwrap(); - - for _ in 1..=batches { - test_manager.generate_blocks_and_poll(100).await; - - // Check chain height in validator, non-finalised state and finalised state. - let validator_height = dbg!(json_service.get_blockchain_info().await.unwrap().blocks.0); - let non_finalised_state_height = - dbg!(block_cache_subscriber.get_chain_height().await.unwrap().0); - let finalised_state_height = - dbg!(dbg!(finalised_state.get_db_height()).unwrap_or(Height(0)).0); - - assert_eq!(&validator_height, &non_finalised_state_height); - assert_eq!( - (&non_finalised_state_height.saturating_sub(101)), - &finalised_state_height - ); - - // Fetch blocks in non-finalised state. - let mut non_finalised_state_blocks = Vec::new(); - for height in (finalised_state_height + 1)..=non_finalised_state_height { - let block = block_cache_subscriber - .non_finalised_state - .get_compact_block(HashOrHeight::Height(Height(height))) - .await - .unwrap(); - non_finalised_state_blocks.push(block); - } - - // Fetch blocks in finalised state. - let mut finalised_state_blocks = Vec::new(); - for height in 1..=finalised_state_height { - let block = finalised_state_subscriber - .get_compact_block(HashOrHeight::Height(Height(height))) - .await - .unwrap(); - finalised_state_blocks.push(block); - } - - dbg!(non_finalised_state_blocks.first()); - dbg!(non_finalised_state_blocks.last()); - dbg!(finalised_state_blocks.first()); - dbg!(finalised_state_blocks.last()); - } -} - -mod zcashd { - use zaino_testutils::ValidatorKind; - use zcash_local_net::validator::zcashd::Zcashd; - - use crate::{launch_local_cache, launch_local_cache_process_n_block_batches}; - - #[tokio::test] - async fn launch_local_cache_zcashd() { - launch_local_cache::(&ValidatorKind::Zcashd).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn process_100_blocks() { - launch_local_cache_process_n_block_batches::(&ValidatorKind::Zcashd, 1).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn process_200_blocks() { - launch_local_cache_process_n_block_batches::(&ValidatorKind::Zcashd, 2).await; - } -} - -mod zebrad { - use zaino_testutils::ValidatorKind; - use zcash_local_net::validator::zebrad::Zebrad; - - use crate::{launch_local_cache, launch_local_cache_process_n_block_batches}; - - #[tokio::test] - async fn launch_local_cache_zebrad() { - launch_local_cache::(&ValidatorKind::Zebrad).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn process_100_blocks() { - launch_local_cache_process_n_block_batches::(&ValidatorKind::Zebrad, 1).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn process_200_blocks() { - launch_local_cache_process_n_block_batches::(&ValidatorKind::Zebrad, 2).await; - } -} diff --git a/integration-tests/tests/state_service.rs b/integration-tests/tests/state_service.rs index 126c1e884..b4019ce25 100644 --- a/integration-tests/tests/state_service.rs +++ b/integration-tests/tests/state_service.rs @@ -2925,10 +2925,19 @@ mod zebra { .unwrap() .height; + // NOTE / TODO: Zaino can not currently serve non standard script types in compact blocks, + // because of this it does not return the script pub key for the coinbase transaction of the + // genesis block. We should decide whether / how to fix this. + // + // For this reason this test currently does not fetch the genesis block. + // + // Issue: https://github.com/zingolabs/zaino/issues/818 + // + // To see bug update start height of get_block_range to 0. let compact_block_range = state_service_subscriber .get_block_range(BlockRange { start: Some(BlockId { - height: 0, + height: 1, hash: Vec::new(), }), end: Some(BlockId { @@ -2947,6 +2956,7 @@ mod zebra { for cb in compact_block_range.into_iter() { for tx in cb.vtx { + dbg!(&tx); // script pub key of this transaction is not empty assert!(!tx.vout.first().unwrap().script_pub_key.is_empty()); } diff --git a/zaino-common/src/lib.rs b/zaino-common/src/lib.rs index 10621c052..d02052292 100644 --- a/zaino-common/src/lib.rs +++ b/zaino-common/src/lib.rs @@ -5,6 +5,8 @@ pub mod config; pub mod net; +pub mod probing; +pub mod status; // Re-export network utilities pub use net::{resolve_socket_addr, try_resolve_address, AddressResolution}; diff --git a/zaino-common/src/probing.rs b/zaino-common/src/probing.rs new file mode 100644 index 000000000..d6825bc2f --- /dev/null +++ b/zaino-common/src/probing.rs @@ -0,0 +1,71 @@ +//! Service health and readiness probing traits. +//! +//! This module provides decoupled traits for health and readiness checks, +//! following the Kubernetes probe model: +//! +//! - [`Liveness`]: Is the component alive and functioning? +//! - [`Readiness`]: Is the component ready to serve requests? +//! - [`VitalsProbe`]: Combined trait for components supporting both probes. +//! +//! These traits are intentionally simple (returning `bool`) and decoupled +//! from any specific status type, allowing flexible implementation across +//! different components. +//! +//! # Example +//! +//! ``` +//! use zaino_common::probing::{Liveness, Readiness, VitalsProbe}; +//! +//! struct MyService { +//! connected: bool, +//! synced: bool, +//! } +//! +//! impl Liveness for MyService { +//! fn is_live(&self) -> bool { +//! self.connected +//! } +//! } +//! +//! impl Readiness for MyService { +//! fn is_ready(&self) -> bool { +//! self.connected && self.synced +//! } +//! } +//! +//! // VitalsProbe is automatically implemented via blanket impl +//! fn check_service(service: &impl VitalsProbe) { +//! println!("Live: {}, Ready: {}", service.is_live(), service.is_ready()); +//! } +//! ``` + +/// Liveness probe: Is this component alive and functioning? +/// +/// A component is considered "live" if it is not in a broken or +/// unrecoverable state. This corresponds to Kubernetes liveness probes. +/// +/// Failure to be live typically means the component should be restarted. +pub trait Liveness { + /// Returns `true` if the component is alive and functioning. + fn is_live(&self) -> bool; +} + +/// Readiness probe: Is this component ready to serve requests? +/// +/// A component is considered "ready" if it can accept and process +/// requests. This corresponds to Kubernetes readiness probes. +/// +/// A component may be live but not ready (e.g., still syncing). +pub trait Readiness { + /// Returns `true` if the component is ready to serve requests. + fn is_ready(&self) -> bool; +} + +/// Combined vitals probe for components supporting both liveness and readiness. +/// +/// This trait is automatically implemented for any type that implements +/// both [`Liveness`] and [`Readiness`]. +pub trait VitalsProbe: Liveness + Readiness {} + +// Blanket implementation: anything with Liveness + Readiness gets VitalsProbe +impl VitalsProbe for T {} diff --git a/zaino-common/src/status.rs b/zaino-common/src/status.rs new file mode 100644 index 000000000..e544c6b5f --- /dev/null +++ b/zaino-common/src/status.rs @@ -0,0 +1,146 @@ +//! Service status types and traits. +//! +//! This module provides: +//! - [`StatusType`]: An enum representing service operational states +//! - [`Status`]: A trait for types that can report their status +//! +//! Types implementing [`Status`] automatically gain [`Liveness`](crate::probing::Liveness) +//! and [`Readiness`](crate::probing::Readiness) implementations via blanket impls. + +use std::fmt; + +use crate::probing::{Liveness, Readiness}; + +/// Status of a service component. +/// +/// Represents the operational state of a component. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum StatusType { + /// Running initial startup routine. + Spawning = 0, + /// Back-end process is currently syncing. + Syncing = 1, + /// Process is ready. + Ready = 2, + /// Process is busy working. + Busy = 3, + /// Running shutdown routine. + Closing = 4, + /// Offline. + Offline = 5, + /// Non-critical errors. + RecoverableError = 6, + /// Critical errors. + CriticalError = 7, +} + +impl From for StatusType { + fn from(value: usize) -> Self { + match value { + 0 => StatusType::Spawning, + 1 => StatusType::Syncing, + 2 => StatusType::Ready, + 3 => StatusType::Busy, + 4 => StatusType::Closing, + 5 => StatusType::Offline, + 6 => StatusType::RecoverableError, + _ => StatusType::CriticalError, + } + } +} + +impl From for usize { + fn from(status: StatusType) -> Self { + status as usize + } +} + +impl fmt::Display for StatusType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let status_str = match self { + StatusType::Spawning => "Spawning", + StatusType::Syncing => "Syncing", + StatusType::Ready => "Ready", + StatusType::Busy => "Busy", + StatusType::Closing => "Closing", + StatusType::Offline => "Offline", + StatusType::RecoverableError => "RecoverableError", + StatusType::CriticalError => "CriticalError", + }; + write!(f, "{status_str}") + } +} + +impl StatusType { + /// Returns the corresponding status symbol for the StatusType. + pub fn get_status_symbol(&self) -> String { + let (symbol, color_code) = match self { + // Yellow Statuses + StatusType::Syncing => ("\u{1F7E1}", "\x1b[33m"), + // Cyan Statuses + StatusType::Spawning | StatusType::Busy => ("\u{1F7E1}", "\x1b[36m"), + // Green Status + StatusType::Ready => ("\u{1F7E2}", "\x1b[32m"), + // Grey Statuses + StatusType::Closing | StatusType::Offline => ("\u{26AB}", "\x1b[90m"), + // Red Error Statuses + StatusType::RecoverableError | StatusType::CriticalError => ("\u{1F534}", "\x1b[31m"), + }; + + format!("{}{}{}", color_code, symbol, "\x1b[0m") + } + + /// Look at two statuses, and return the more 'severe' of the two. + pub fn combine(self, other: StatusType) -> StatusType { + match (self, other) { + // If either is Closing, return Closing. + (StatusType::Closing, _) | (_, StatusType::Closing) => StatusType::Closing, + // If either is Offline or CriticalError, return CriticalError. + (StatusType::Offline, _) + | (_, StatusType::Offline) + | (StatusType::CriticalError, _) + | (_, StatusType::CriticalError) => StatusType::CriticalError, + // If either is RecoverableError, return RecoverableError. + (StatusType::RecoverableError, _) | (_, StatusType::RecoverableError) => { + StatusType::RecoverableError + } + // If either is Spawning, return Spawning. + (StatusType::Spawning, _) | (_, StatusType::Spawning) => StatusType::Spawning, + // If either is Syncing, return Syncing. + (StatusType::Syncing, _) | (_, StatusType::Syncing) => StatusType::Syncing, + // Otherwise, return Ready. + _ => StatusType::Ready, + } + } + + /// Returns `true` if this status indicates the component is alive (liveness probe). + pub fn is_live(self) -> bool { + !matches!(self, StatusType::Offline | StatusType::CriticalError) + } + + /// Returns `true` if this status indicates the component is ready to serve (readiness probe). + pub fn is_ready(self) -> bool { + matches!(self, StatusType::Ready | StatusType::Busy) + } +} + +/// Trait for types that can report their [`StatusType`]. +/// +/// Implementing this trait automatically provides [`Liveness`] and [`Readiness`] +/// implementations via blanket impls. +pub trait Status { + /// Returns the current status of this component. + fn status(&self) -> StatusType; +} + +impl Liveness for T { + fn is_live(&self) -> bool { + self.status().is_live() + } +} + +impl Readiness for T { + fn is_ready(&self) -> bool { + self.status().is_ready() + } +} diff --git a/zaino-proto/src/proto/utils.rs b/zaino-proto/src/proto/utils.rs index 4eaf60f87..1670a7c39 100644 --- a/zaino-proto/src/proto/utils.rs +++ b/zaino-proto/src/proto/utils.rs @@ -300,6 +300,57 @@ pub fn blockid_to_hashorheight(block_id: BlockId) -> Option { .ok() } +/// prunes a compact block from transaction in formation related to pools not included in the +/// `pool_types` vector. +/// Note: for backwards compatibility an empty vector will return Sapling and Orchard Tx info. +pub fn compact_block_with_pool_types( + mut block: CompactBlock, + pool_types: &[PoolType], +) -> CompactBlock { + if pool_types.is_empty() { + for compact_tx in &mut block.vtx { + // strip out transparent inputs if not Requested + compact_tx.vin.clear(); + compact_tx.vout.clear(); + } + + // Omit transactions that have no Sapling/Orchard elements. + block.vtx.retain(|compact_tx| { + !compact_tx.spends.is_empty() + || !compact_tx.outputs.is_empty() + || !compact_tx.actions.is_empty() + }); + } else { + for compact_tx in &mut block.vtx { + // strip out transparent inputs if not Requested + if !pool_types.contains(&PoolType::Transparent) { + compact_tx.vin.clear(); + compact_tx.vout.clear(); + } + // strip out sapling if not requested + if !pool_types.contains(&PoolType::Sapling) { + compact_tx.spends.clear(); + compact_tx.outputs.clear(); + } + // strip out orchard if not requested + if !pool_types.contains(&PoolType::Orchard) { + compact_tx.actions.clear(); + } + } + + // Omit transactions that have no elements in any requested pool type. + block.vtx.retain(|compact_tx| { + !compact_tx.vin.is_empty() + || !compact_tx.vout.is_empty() + || !compact_tx.spends.is_empty() + || !compact_tx.outputs.is_empty() + || !compact_tx.actions.is_empty() + }); + } + + block +} + /// Strips the ouputs and from all transactions, retains only /// the nullifier from all orcard actions, and clears the chain /// metadata from the block diff --git a/zaino-state/src/backends/fetch.rs b/zaino-state/src/backends/fetch.rs index d0afd5367..7118ab759 100644 --- a/zaino-state/src/backends/fetch.rs +++ b/zaino-state/src/backends/fetch.rs @@ -65,7 +65,7 @@ use crate::{ indexer::{ handle_raw_transaction, IndexerSubscriber, LightWalletIndexer, ZcashIndexer, ZcashService, }, - status::StatusType, + status::{Status, StatusType}, stream::{ AddressStream, CompactBlockStream, CompactTransactionStream, RawTransactionStream, UtxoReplyStream, @@ -97,6 +97,13 @@ pub struct FetchService { config: FetchServiceConfig, } +#[allow(deprecated)] +impl Status for FetchService { + fn status(&self) -> StatusType { + self.indexer.status() + } +} + #[async_trait] #[allow(deprecated)] impl ZcashService for FetchService { @@ -138,7 +145,7 @@ impl ZcashService for FetchService { config, }; - while fetch_service.status().await != StatusType::Ready { + while fetch_service.status() != StatusType::Ready { tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } @@ -155,11 +162,6 @@ impl ZcashService for FetchService { }) } - /// Fetches the current status - async fn status(&self) -> StatusType { - self.indexer.status() - } - /// Shuts down the StateService. fn close(&mut self) { tokio::task::block_in_place(|| { @@ -194,9 +196,16 @@ pub struct FetchServiceSubscriber { config: FetchServiceConfig, } +impl Status for FetchServiceSubscriber { + fn status(&self) -> StatusType { + self.indexer.status() + } +} + impl FetchServiceSubscriber { /// Fetches the current status - pub fn status(&self) -> StatusType { + #[deprecated(note = "Use the Status trait method instead")] + pub fn get_status(&self) -> StatusType { self.indexer.status() } @@ -693,7 +702,7 @@ impl LightWalletIndexer for FetchServiceSubscriber { /// Return the height of the tip of the best chain async fn get_latest_block(&self) -> Result { let tip = self.indexer.snapshot_nonfinalized_state().best_tip; - dbg!(&tip); + // dbg!(&tip); Ok(BlockId { height: tip.height.0 as u64, @@ -744,16 +753,6 @@ impl LightWalletIndexer for FetchServiceSubscriber { is greater than the best chain tip [{chain_height}].", ))), ), - HashOrHeight::Height(height) - if height > self.data.network().sapling_activation_height() => - { - Err(FetchServiceError::TonicStatusError( - tonic::Status::out_of_range(format!( - "Error: Height out of range [{hash_or_height}]. Height requested \ - is below sapling activation height [{chain_height}].", - )), - )) - } _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( "Error: Failed to retrieve block from state.", ))), @@ -768,16 +767,6 @@ impl LightWalletIndexer for FetchServiceSubscriber { is greater than the best chain tip [{chain_height}].", ))), ), - HashOrHeight::Height(height) - if height > self.data.network().sapling_activation_height() => - { - Err(FetchServiceError::TonicStatusError( - tonic::Status::out_of_range(format!( - "Error: Height out of range [{hash_or_height}]. Height requested \ - is below sapling activation height [{chain_height}].", - )), - )) - } _otherwise => // TODO: Hide server error from clients before release. Currently useful for dev purposes. { diff --git a/zaino-state/src/backends/state.rs b/zaino-state/src/backends/state.rs index 407126716..f6448872d 100644 --- a/zaino-state/src/backends/state.rs +++ b/zaino-state/src/backends/state.rs @@ -5,23 +5,21 @@ use crate::{ chain_index::{ mempool::{Mempool, MempoolSubscriber}, source::ValidatorConnector, + types as chain_types, ChainIndex, NonFinalizedSnapshot, }, config::StateServiceConfig, + error::ChainIndexError, error::{BlockCacheError, StateServiceError}, indexer::{ handle_raw_transaction, IndexerSubscriber, LightWalletIndexer, ZcashIndexer, ZcashService, }, - local_cache::{ - compact_block_to_nullifiers, compact_block_with_pool_types, BlockCache, - BlockCacheSubscriber, - }, - status::{AtomicStatus, StatusType}, + status::{AtomicStatus, Status, StatusType}, stream::{ AddressStream, CompactBlockStream, CompactTransactionStream, RawTransactionStream, UtxoReplyStream, }, utils::{get_build_info, ServiceMetadata}, - BackendType, MempoolKey, + BackendType, MempoolKey, NodeBackedChainIndex, NodeBackedChainIndexSubscriber, State, }; use nonempty::NonEmpty; use tokio_stream::StreamExt as _; @@ -40,6 +38,10 @@ use zaino_fetch::{ }, }, }; +use zaino_proto::proto::utils::{ + blockid_to_hashorheight, compact_block_to_nullifiers, GetBlockRangeError, PoolTypeError, + PoolTypeFilter, ValidatedBlockRangeRequest, +}; use zaino_proto::proto::{ compact_formats::CompactBlock, service::{ @@ -47,10 +49,6 @@ use zaino_proto::proto::{ GetAddressUtxosReplyList, GetMempoolTxRequest, LightdInfo, PingResponse, RawTransaction, SendResponse, TransparentAddressBlockFilter, TreeState, TxFilter, }, - utils::{ - blockid_to_hashorheight, pool_types_from_vector, PoolTypeError, PoolTypeFilter, - ValidatedBlockRangeRequest, - }, }; use zcash_protocol::consensus::NetworkType; @@ -87,11 +85,12 @@ use chrono::{DateTime, Utc}; use futures::{TryFutureExt as _, TryStreamExt as _}; use hex::{FromHex as _, ToHex}; use indexmap::IndexMap; -use std::{collections::HashSet, error::Error, fmt, future::poll_fn, str::FromStr, sync::Arc}; +use std::{collections::HashSet, error::Error, fmt, str::FromStr, sync::Arc}; use tokio::{ sync::mpsc, time::{self, timeout}, }; +use tokio_stream::StreamExt as _; use tonic::async_trait; use tower::{Service, ServiceExt}; use tracing::{info, warn}; @@ -116,60 +115,38 @@ macro_rules! expected_read_response { /// If we want the ability to clone Service all JoinHandle's should be /// converted to Arc\. #[derive(Debug)] -#[deprecated = "Will be eventually replaced by `BlockchainSource"] +// #[deprecated = "Will be eventually replaced by `BlockchainSource"] pub struct StateService { /// `ReadeStateService` from Zebra-State. read_state_service: ReadStateService, + /// Internal mempool. + mempool: Mempool, + + /// StateService config data. + #[allow(deprecated)] + config: StateServiceConfig, + + /// Listener for when the chain tip changes + chain_tip_change: zebra_state::ChainTipChange, + /// Sync task handle. sync_task_handle: Option>>, /// JsonRPC Client. rpc_client: JsonRpSeeConnector, - /// Local compact block cache. - block_cache: BlockCache, - - /// Internal mempool. - mempool: Mempool, + /// Core indexer. + indexer: NodeBackedChainIndex, /// Service metadata. data: ServiceMetadata, - /// StateService config data. - #[allow(deprecated)] - config: StateServiceConfig, - /// Thread-safe status indicator. status: AtomicStatus, - - /// Listener for when the chain tip changes - chain_tip_change: zebra_state::ChainTipChange, } -#[allow(deprecated)] impl StateService { - /// Uses poll_ready to update the status of the `ReadStateService`. - async fn fetch_status_from_validator(&self) -> StatusType { - let mut read_state_service = self.read_state_service.clone(); - poll_fn(|cx| match read_state_service.poll_ready(cx) { - std::task::Poll::Ready(Ok(())) => { - self.status.store(StatusType::Ready); - std::task::Poll::Ready(StatusType::Ready) - } - std::task::Poll::Ready(Err(e)) => { - eprintln!("Service readiness error: {e:?}"); - self.status.store(StatusType::CriticalError); - std::task::Poll::Ready(StatusType::CriticalError) - } - std::task::Poll::Pending => { - self.status.store(StatusType::Busy); - std::task::Poll::Pending - } - }) - .await - } - #[cfg(feature = "test_dependencies")] /// Helper for tests pub fn read_state_service(&self) -> &ReadStateService { @@ -177,8 +154,19 @@ impl StateService { } } +impl Status for StateService { + fn status(&self) -> StatusType { + let current_status = self.status.load(); + if current_status == StatusType::Closing { + current_status + } else { + self.indexer.status() + } + } +} + #[async_trait] -#[allow(deprecated)] +// #[allow(deprecated)] impl ZcashService for StateService { const BACKEND_TYPE: BackendType = BackendType::State; @@ -272,13 +260,12 @@ impl ZcashService for StateService { } } - let block_cache = BlockCache::spawn( - &rpc_client, - Some(&read_state_service), - config.clone().into(), - ) - .await?; - + // let block_cache = BlockCache::spawn( + // &rpc_client, + // Some(&read_state_service), + // config.clone().into(), + // ) + // .await?; let mempool_source = ValidatorConnector::State(crate::chain_index::source::State { read_state_service: read_state_service.clone(), mempool_fetcher: rpc_client.clone(), @@ -287,13 +274,24 @@ impl ZcashService for StateService { let mempool = Mempool::spawn(mempool_source, None).await?; + let chain_index = NodeBackedChainIndex::new( + ValidatorConnector::State(State { + read_state_service: read_state_service.clone(), + mempool_fetcher: rpc_client.clone(), + network: config.network, + }), + config.clone().into(), + ) + .await + .unwrap(); + let state_service = Self { chain_tip_change, read_state_service, sync_task_handle: Some(Arc::new(sync_task_handle)), rpc_client: rpc_client.clone(), - block_cache, mempool, + indexer: chain_index, data, config, status: AtomicStatus::new(StatusType::Spawning), @@ -308,27 +306,14 @@ impl ZcashService for StateService { IndexerSubscriber::new(StateServiceSubscriber { read_state_service: self.read_state_service.clone(), rpc_client: self.rpc_client.clone(), - block_cache: self.block_cache.subscriber(), mempool: self.mempool.subscriber(), + indexer: self.indexer.subscriber(), data: self.data.clone(), config: self.config.clone(), chain_tip_change: self.chain_tip_change.clone(), }) } - /// Returns the StateService's Status. - /// - /// We first check for `status = StatusType::Closing` as this signifies a shutdown order - /// from an external process. - async fn status(&self) -> StatusType { - let current_status = self.status.load(); - if current_status == StatusType::Closing { - current_status - } else { - self.fetch_status_from_validator().await - } - } - /// Shuts down the StateService. fn close(&mut self) { if self.sync_task_handle.is_some() { @@ -350,29 +335,35 @@ impl Drop for StateService { /// /// Subscribers should be #[derive(Debug, Clone)] -#[deprecated] +// #[deprecated] pub struct StateServiceSubscriber { /// Remote wrappper functionality for zebra's [`ReadStateService`]. pub read_state_service: ReadStateService, - /// JsonRPC Client. - pub rpc_client: JsonRpSeeConnector, - - /// Local compact block cache. - pub block_cache: BlockCacheSubscriber, - /// Internal mempool. pub mempool: MempoolSubscriber, - /// Service metadata. - pub data: ServiceMetadata, - /// StateService config data. #[allow(deprecated)] config: StateServiceConfig, /// Listener for when the chain tip changes chain_tip_change: zebra_state::ChainTipChange, + + /// JsonRPC Client. + pub rpc_client: JsonRpSeeConnector, + + /// Core indexer. + pub indexer: NodeBackedChainIndexSubscriber, + + /// Service metadata. + pub data: ServiceMetadata, +} + +impl Status for StateServiceSubscriber { + fn status(&self) -> StatusType { + self.indexer.status() + } } /// A subscriber to any chaintip updates @@ -398,7 +389,7 @@ impl ChainTipSubscriber { /// /// These would be simple to add to the public interface if /// needed, there are currently no plans to do so. -#[allow(deprecated)] +// #[allow(deprecated)] impl StateServiceSubscriber { /// Gets a Subscriber to any updates to the latest chain tip pub fn chaintip_update_subscriber(&self) -> ChainTipSubscriber { @@ -554,135 +545,127 @@ impl StateServiceSubscriber { async fn get_block_range_inner( &self, request: BlockRange, - trim_non_nullifier: bool, + nullifiers_only: bool, ) -> Result { - let mut validated_request = ValidatedBlockRangeRequest::new_from_block_range(&request) + let validated_request = ValidatedBlockRangeRequest::new_from_block_range(&request) .map_err(StateServiceError::from)?; - // FIXME: this should be changed but this logic is hard to understand and we lack tests. - // we will maintain the behaviour with less smelly code - let lowest_to_highest = if validated_request.is_reverse_ordered() { - validated_request.reverse(); - false - } else { - true - }; + let pool_type_filter = PoolTypeFilter::new_from_pool_types(&validated_request.pool_types()) + .map_err(GetBlockRangeError::PoolTypeArgumentError) + .map_err(StateServiceError::from)?; - let start = validated_request.start(); - let end = validated_request.end(); - let chain_height: u64 = self.block_cache.get_chain_height().await?.0 as u64; - let fetch_service_clone = self.clone(); + // Note conversion here is safe due to the use of [`ValidatedBlockRangeRequest::new_from_block_range`] + let start = validated_request.start() as u32; + let end = validated_request.end() as u32; + + let state_service_clone = self.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); - let pool_types = match pool_types_from_vector(&request.pool_types) { - Ok(p) => Ok(p), - Err(e) => Err(match e { - PoolTypeError::InvalidPoolType => StateServiceError::UnhandledRpcError( - "PoolType::Invalid specified as argument in `BlockRange`.".to_string(), - ), - PoolTypeError::UnknownPoolType(t) => StateServiceError::UnhandledRpcError(format!( - "Unknown value specified in `BlockRange`. Value '{}' is not a known PoolType.", - t - )), - }), - }?; - // FIX: find out why there's repeated code fetching the chain tip and then the rest tokio::spawn(async move { - let timeout = timeout( - time::Duration::from_secs((service_timeout * 4) as u64), - async { - let mut blocks = NonEmpty::new( - match fetch_service_clone - .block_cache - .get_compact_block(end.to_string()) - .await - { - Ok(mut block) => { - if trim_non_nullifier { - block = compact_block_to_nullifiers(block); - } else { - block = compact_block_with_pool_types(block, &pool_types); + let timeout_result = timeout( + time::Duration::from_secs((service_timeout * 4) as u64), + async { + let snapshot = state_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_chaintip().height.0; + + match state_service_clone + .indexer + .get_compact_block_stream( + &snapshot, + chain_types::Height(start), + chain_types::Height(end), + pool_type_filter.clone(), + ) + .await + { + Ok(Some(mut compact_block_stream)) => { + if nullifiers_only { + while let Some(stream_item) = compact_block_stream.next().await { + match stream_item { + Ok(block) => { + if channel_tx + .send(Ok(compact_block_to_nullifiers(block))) + .await + .is_err() + { + break; + } + } + Err(status) => { + if channel_tx.send(Err(status)).await.is_err() { + break; + } } - Ok(block) + } } - Err(e) => { - if end >= chain_height { - Err(tonic::Status::out_of_range(format!( - "Error: Height out of range [{end}]. Height \ - requested is greater than the best \ - chain tip [{chain_height}].", - ))) - } else { - Err(tonic::Status::unknown(e.to_string())) + } else { + while let Some(stream_item) = compact_block_stream.next().await { + if channel_tx.send(stream_item).await.is_err() { + break; } } - }, - ); - for i in start..end { - let Ok(child_block) = blocks.last() else { - break; - }; - let Ok(hash_or_height) = - <[u8; 32]>::try_from(child_block.prev_hash.as_slice()) - .map(zebra_chain::block::Hash) - .map(HashOrHeight::from) - else { - break; - }; - blocks.push( - match fetch_service_clone - .block_cache - .get_compact_block(hash_or_height.to_string()) + } + } + Ok(None) => { + // Per `get_compact_block_stream` semantics: `None` means at least one bound is above the tip. + let offending_height = if start > chain_height { start } else { end }; + + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{offending_height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + { + Ok(_) => {} + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + } + } + } + Err(e) => { + // Preserve previous behaviour: if the request is above tip, surface OutOfRange; + // otherwise return the error (currently exposed for dev). + if start > chain_height || end > chain_height { + let offending_height = if start > chain_height { start } else { end }; + + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{offending_height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) .await { - Ok(mut block) => { - if trim_non_nullifier { - block = compact_block_to_nullifiers(block); - } else { - block = compact_block_with_pool_types(block, &pool_types); - } - Ok(block) - } + Ok(_) => {} Err(e) => { - let height = end - (i - start); - if height >= chain_height { - Err(tonic::Status::out_of_range(format!( - "Error: Height out of range [{height}]. Height requested \ - is greater than the best chain tip [{chain_height}].", - ))) - } else { - Err(tonic::Status::unknown(e.to_string())) - } + warn!("GetBlockRange channel closed unexpectedly: {}", e); } - }, - ); - } - if lowest_to_highest { - blocks = NonEmpty::from_vec(blocks.into_iter().rev().collect::>()) - .expect("known to be non-empty") - } - for block in blocks { - if let Err(e) = channel_tx.send(block).await { - warn!("GetBlockRange channel closed unexpectedly: {e}"); - break; + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + warn!("GetBlockRangeStream closed unexpectedly: {}", e); + } } } - }, - ) - .await; - match timeout { - Ok(_) => {} - Err(_) => { - channel_tx - .send(Err(tonic::Status::deadline_exceeded( - "Error: get_block_range gRPC request timed out.", - ))) - .await - .ok(); } + }, + ) + .await; + + if timeout_result.is_err() { + channel_tx + .send(Err(tonic::Status::deadline_exceeded( + "Error: get_block_range gRPC request timed out.", + ))) + .await + .ok(); } }); + Ok(CompactBlockStream::new(channel_rx)) } @@ -691,7 +674,8 @@ impl StateServiceSubscriber { e: BlockCacheError, height: u32, ) -> Result { - let chain_height = self.block_cache.get_chain_height().await?.0; + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_chaintip().height.0; Err(if height >= chain_height { StateServiceError::TonicStatusError(tonic::Status::out_of_range(format!( "Error: Height out of range [{height}]. Height requested \ @@ -1017,7 +1001,7 @@ impl StateServiceSubscriber { } #[async_trait] -#[allow(deprecated)] +// #[allow(deprecated)] impl ZcashIndexer for StateServiceSubscriber { type Error = StateServiceError; @@ -1262,7 +1246,7 @@ impl ZcashIndexer for StateServiceSubscriber { /// `usage` is the total memory usage for the mempool, in bytes. /// the [optional `fullyNotified` field](), is only utilized for zcashd regtests, is deprecated, and is not included. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await.into()) + Ok(self.indexer.get_mempool_info().await.into()) } async fn get_peer_info(&self) -> Result { @@ -1463,11 +1447,11 @@ impl ZcashIndexer for StateServiceSubscriber { async fn get_raw_mempool(&self) -> Result, Self::Error> { Ok(self - .mempool - .get_mempool() - .await + .indexer + .get_mempool_txids() + .await? .into_iter() - .map(|(key, _)| key.txid) + .map(|txid| txid.to_string()) .collect()) } @@ -1563,7 +1547,9 @@ impl ZcashIndexer for StateServiceSubscriber { /// method: post /// tags: blockchain async fn get_block_count(&self) -> Result { - Ok(self.block_cache.get_chain_height().await?) + let nfs_snapshot = self.indexer.snapshot_nonfinalized_state(); + let h = nfs_snapshot.best_tip.height; + Ok(h.into()) } async fn validate_address( @@ -1749,25 +1735,27 @@ impl ZcashIndexer for StateServiceSubscriber { // This should be None for sidechain transactions, // which currently aren't returned by ReadResponse::Transaction let best_chain_height = Some(tx.height); - GetRawTransaction::Object(Box::new( - TransactionObject::from_transaction( - tx.tx.clone(), - best_chain_height, - Some(tx.confirmations), - &self.config.network.into(), - Some(tx.block_time), - Some(zebra_chain::block::Hash::from_bytes( - self.block_cache - .get_compact_block( - HashOrHeight::Height(tx.height).to_string(), - ) - .await? - .hash, - )), - Some(best_chain_height.is_some()), - tx.tx.hash(), - ), - )) + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let compact_block = self + .indexer + .get_compact_block( + &snapshot, + chain_types::Height(tx.height.0), + PoolTypeFilter::includes_all(), + ) + .await? + .ok_or_else(|| ChainIndexError::database_hole(tx.height.0))?; + let tx_object = TransactionObject::from_transaction( + tx.tx.clone(), + best_chain_height, + Some(tx.confirmations), + &self.config.network.into(), + Some(tx.block_time), + Some(zebra_chain::block::Hash::from_bytes(compact_block.hash)), + Some(best_chain_height.is_some()), + tx.tx.hash(), + ); + GetRawTransaction::Object(Box::new(tx_object)) } None => GetRawTransaction::Raw(tx.tx.into()), }), @@ -1921,71 +1909,110 @@ impl ZcashIndexer for StateServiceSubscriber { } #[async_trait] -#[allow(deprecated)] +// #[allow(deprecated)] impl LightWalletIndexer for StateServiceSubscriber { /// Return the height of the tip of the best chain async fn get_latest_block(&self) -> Result { - let mut state = self.read_state_service.clone(); - let response = state - .ready() - .and_then(|service| service.call(ReadRequest::Tip)) - .await?; - let (chain_height, chain_hash) = expected_read_response!(response, Tip).ok_or( - RpcError::new_from_legacycode(LegacyCode::Misc, "no blocks in chain"), - )?; + let tip = self.indexer.snapshot_nonfinalized_state().best_tip; Ok(BlockId { - height: chain_height.as_usize() as u64, - hash: chain_hash.0.to_vec(), + height: tip.height.0 as u64, + hash: tip.blockhash.0.to_vec(), }) } /// Return the compact block corresponding to the given block identifier async fn get_block(&self, request: BlockId) -> Result { - let height = request.height; let hash_or_height = blockid_to_hashorheight(request).ok_or( StateServiceError::TonicStatusError(tonic::Status::invalid_argument( "Error: Invalid hash and/or height out of range. Failed to convert to u32.", )), )?; + + let snapshot = self.indexer.snapshot_nonfinalized_state(); + + // Convert HashOrHeight to chain_types::Height + let block_height = match hash_or_height { + HashOrHeight::Height(h) => chain_types::Height(h.0), + HashOrHeight::Hash(h) => self + .indexer + .get_block_height(&snapshot, chain_types::BlockHash(h.0)) + .await + .map_err(|e| StateServiceError::ChainIndexError(e))? + .ok_or_else(|| { + StateServiceError::TonicStatusError(tonic::Status::not_found( + "Error: Block not found for given hash.", + )) + })?, + }; + match self - .block_cache - .get_compact_block(hash_or_height.to_string()) + .indexer + .get_compact_block(&snapshot, block_height, PoolTypeFilter::default()) .await { - Ok(block) => Ok(compact_block_with_pool_types( - block, - &PoolTypeFilter::default().to_pool_types_vector(), - )), + Ok(Some(block)) => Ok(block), + Ok(None) => { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + StateServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + _otherwise => Err(StateServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - self.error_get_block(BlockCacheError::Custom(e.to_string()), height as u32) - .await + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + StateServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + _otherwise => + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + { + Err(StateServiceError::TonicStatusError(tonic::Status::unknown( + format!("Error: Failed to retrieve block from node. Server Error: {e}",), + ))) + } + } } + Err(e) => Err(StateServiceError::ChainIndexError(e)), } } /// Same as GetBlock except actions contain only nullifiers, /// and saling outputs are not returned (Sapling spends still are) async fn get_block_nullifiers(&self, request: BlockId) -> Result { - let height: u32 = match request.height.try_into() { - Ok(height) => height, - Err(_) => { - return Err(StateServiceError::TonicStatusError( - tonic::Status::invalid_argument( - "Error: Height out of range. Failed to convert to u32.", - ), - )); - } - }; + let height: u32 = request.height.try_into().map_err(|_| { + StateServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Height out of range. Failed to convert to u32.", + )) + })?; + + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let block_height = chain_types::Height(height); + match self - .block_cache - .get_compact_block_nullifiers(height.to_string()) + .indexer + .get_compact_block(&snapshot, block_height, PoolTypeFilter::default()) .await { - Ok(block) => Ok(block), - Err(e) => { - self.error_get_block(BlockCacheError::Custom(e.to_string()), height) - .await + Ok(Some(block)) => Ok(compact_block_to_nullifiers(block)), + Ok(None) => { + self.error_get_block( + BlockCacheError::Custom("Block not found".to_string()), + height, + ) + .await } + Err(e) => Err(StateServiceError::ChainIndexError(e)), } } @@ -2355,7 +2382,8 @@ impl LightWalletIndexer for StateServiceSubscriber { let mut mempool = self.mempool.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); - let mempool_height = self.block_cache.get_chain_height().await?.0; + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let mempool_height = snapshot.best_chaintip().height.0; tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 6) as u64), diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index 422aadad7..d1d58b7d6 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -15,7 +15,7 @@ use crate::chain_index::non_finalised_state::BestTip; use crate::chain_index::types::db::metadata::MempoolInfo; use crate::chain_index::types::{BestChainLocation, NonBestChainLocation}; use crate::error::{ChainIndexError, ChainIndexErrorKind, FinalisedStateError}; -use crate::local_cache::compact_block_with_pool_types; +use crate::status::Status; use crate::{AtomicStatus, CompactBlockStream, StatusType, SyncError}; use crate::{IndexedBlock, TransactionHash}; use std::collections::HashSet; @@ -27,7 +27,7 @@ use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; use tokio_stream::StreamExt; use tracing::info; -use zaino_proto::proto::utils::PoolTypeFilter; +use zaino_proto::proto::utils::{compact_block_with_pool_types, PoolTypeFilter}; use zebra_chain::parameters::ConsensusBranchId; pub use zebra_chain::parameters::Network as ZebraNetwork; use zebra_chain::serialization::ZcashSerialize; @@ -533,61 +533,68 @@ impl NodeBackedChainIndex { let fs = self.finalized_db.clone(); let status = self.status.clone(); tokio::task::spawn(async move { - loop { - if status.load() == StatusType::Closing { - break; - } - let handle_error = |e| { - tracing::error!("Sync failure: {e:?}. Shutting down."); - status.store(StatusType::CriticalError); - e - }; - - status.store(StatusType::Syncing); - // Sync nfs to chain tip, trimming blocks to finalized tip. - nfs.sync(fs.clone()).await.map_err(handle_error)?; - - // Sync fs to chain tip - 100. - { - let snapshot = nfs.get_snapshot(); - while snapshot.best_tip.height.0 - > (fs - .to_reader() - .db_height() - .await - .map_err(|_e| handle_error(SyncError::CannotReadFinalizedState))? - .unwrap_or(types::Height(0)) - .0 - + 100) + let result: Result<(), SyncError> = async { + loop { + if status.load() == StatusType::Closing { + break; + } + + status.store(StatusType::Syncing); + // Sync nfs to chain tip, trimming blocks to finalized tip. + nfs.sync(fs.clone()).await?; + + // Sync fs to chain tip - 100. { - let next_finalized_height = fs - .to_reader() - .db_height() - .await - .map_err(|_e| handle_error(SyncError::CannotReadFinalizedState))? - .map(|height| height + 1) - .unwrap_or(types::Height(0)); - let next_finalized_block = snapshot - .blocks - .get( - snapshot - .heights_to_hashes - .get(&(next_finalized_height)) - .ok_or(SyncError::CompetingSyncProcess)?, - ) - .ok_or_else(|| handle_error(SyncError::CompetingSyncProcess))?; - // TODO: Handle write errors better (fix db and continue) - fs.write_block(next_finalized_block.clone()) - .await - .map_err(|_e| handle_error(SyncError::CompetingSyncProcess))?; + let snapshot = nfs.get_snapshot(); + while snapshot.best_tip.height.0 + > (fs + .to_reader() + .db_height() + .await + .map_err(|_e| SyncError::CannotReadFinalizedState)? + .unwrap_or(types::Height(0)) + .0 + + 100) + { + let next_finalized_height = fs + .to_reader() + .db_height() + .await + .map_err(|_e| SyncError::CannotReadFinalizedState)? + .map(|height| height + 1) + .unwrap_or(types::Height(0)); + let next_finalized_block = snapshot + .blocks + .get( + snapshot + .heights_to_hashes + .get(&(next_finalized_height)) + .ok_or(SyncError::CompetingSyncProcess)?, + ) + .ok_or(SyncError::CompetingSyncProcess)?; + // TODO: Handle write errors better (fix db and continue) + fs.write_block(next_finalized_block.clone()) + .await + .map_err(|_e| SyncError::CompetingSyncProcess)?; + } } + status.store(StatusType::Ready); + // TODO: configure sleep duration? + tokio::time::sleep(Duration::from_millis(500)).await + // TODO: Check for shutdown signal. } - status.store(StatusType::Ready); - // TODO: configure sleep duration? - tokio::time::sleep(Duration::from_millis(500)).await - // TODO: Check for shutdown signal. + Ok(()) } - Ok(()) + .await; + + // If the sync loop exited unexpectedly with an error, set CriticalError + // so that liveness checks can detect the failure. + if let Err(ref e) = result { + tracing::error!("Sync loop exited with error: {e:?}"); + status.store(StatusType::CriticalError); + } + + result }) } } @@ -607,8 +614,8 @@ pub struct NodeBackedChainIndexSubscriber NodeBackedChainIndexSubscriber { - /// Displays the status of the chain_index - pub fn status(&self) -> StatusType { + /// Returns the combined status of all chain index components. + pub fn combined_status(&self) -> StatusType { let finalized_status = self.finalized_state.status(); let mempool_status = self.mempool.status(); let combined_status = self @@ -680,6 +687,12 @@ impl NodeBackedChainIndexSubscriber { } } +impl Status for NodeBackedChainIndexSubscriber { + fn status(&self) -> StatusType { + self.combined_status() + } +} + impl ChainIndex for NodeBackedChainIndexSubscriber { type Snapshot = Arc; type Error = ChainIndexError; diff --git a/zaino-state/src/chain_index/finalised_state/db/v0.rs b/zaino-state/src/chain_index/finalised_state/db/v0.rs index 0c7bc1436..525b0c9ee 100644 --- a/zaino-state/src/chain_index/finalised_state/db/v0.rs +++ b/zaino-state/src/chain_index/finalised_state/db/v0.rs @@ -49,12 +49,15 @@ use crate::{ }, config::BlockCacheConfig, error::FinalisedStateError, - local_cache::compact_block_with_pool_types, status::{AtomicStatus, StatusType}, CompactBlockStream, Height, IndexedBlock, }; -use zaino_proto::proto::{compact_formats::CompactBlock, service::PoolType, utils::PoolTypeFilter}; +use zaino_proto::proto::{ + compact_formats::CompactBlock, + service::PoolType, + utils::{compact_block_with_pool_types, PoolTypeFilter}, +}; use zebra_chain::{ block::{Hash as ZebraHash, Height as ZebraHeight}, diff --git a/zaino-state/src/chain_index/finalised_state/db/v1.rs b/zaino-state/src/chain_index/finalised_state/db/v1.rs index 82dbb2878..b0e407839 100644 --- a/zaino-state/src/chain_index/finalised_state/db/v1.rs +++ b/zaino-state/src/chain_index/finalised_state/db/v1.rs @@ -953,12 +953,41 @@ impl DbV1 { let block_height = block.index().height(); let block_height_bytes = block_height.to_bytes()?; - // check this is the *next* block in the chain. - tokio::task::block_in_place(|| { + // Check if this specific block already exists (idempotent write support for shared DB). + // This handles the case where multiple processes share the same ZainoDB. + let block_already_exists = tokio::task::block_in_place(|| { let ro = self.env.begin_ro_txn()?; - let cur = ro.open_ro_cursor(self.headers)?; - // Position the cursor at the last header we currently have + // First, check if a block at this specific height already exists + match ro.get(self.headers, &block_height_bytes) { + Ok(stored_header_bytes) => { + // Block exists at this height - verify it's the same block + // Data is stored as StoredEntryVar, so deserialize properly + let stored_entry = StoredEntryVar::::from_bytes(stored_header_bytes) + .map_err(|e| FinalisedStateError::Custom(format!( + "header decode error during idempotency check: {e}" + )))?; + let stored_header = stored_entry.inner(); + if *stored_header.index().hash() == block_hash { + // Same block already written, this is a no-op success + return Ok(true); + } else { + return Err(FinalisedStateError::Custom(format!( + "block at height {block_height:?} already exists with different hash \ + (stored: {:?}, incoming: {:?})", + stored_header.index().hash(), + block_hash + ))); + } + } + Err(lmdb::Error::NotFound) => { + // Block doesn't exist at this height, check if it's the next in sequence + } + Err(e) => return Err(FinalisedStateError::LmdbError(e)), + } + + // Now verify this is the next block in the chain + let cur = ro.open_ro_cursor(self.headers)?; match cur.get(None, None, lmdb_sys::MDB_LAST) { // Database already has blocks Ok((last_height_bytes, _last_header_bytes)) => { @@ -984,9 +1013,19 @@ impl DbV1 { } Err(e) => return Err(FinalisedStateError::LmdbError(e)), } - Ok::<_, FinalisedStateError>(()) + Ok::<_, FinalisedStateError>(false) })?; + // If block already exists with same hash, return success without re-writing + if block_already_exists { + self.status.store(StatusType::Ready); + info!( + "Block {} at height {} already exists in ZainoDB, skipping write.", + &block_hash, &block_height.0 + ); + return Ok(()); + } + // Build DBHeight let height_entry = StoredEntryFixed::new(&block_hash_bytes, block.index().height()); @@ -1311,13 +1350,74 @@ impl DbV1 { Ok(()) } + Err(FinalisedStateError::LmdbError(lmdb::Error::KeyExist)) => { + // Block write failed because key already exists - another process wrote it + // between our check and our write. Wait briefly and verify it's the same block. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let height_bytes = block_height.to_bytes()?; + let verification_result = tokio::task::block_in_place(|| { + // Sync to see latest commits from other processes + self.env.sync(true).ok(); + let ro = self.env.begin_ro_txn()?; + match ro.get(self.headers, &height_bytes) { + Ok(stored_header_bytes) => { + // Data is stored as StoredEntryVar + let stored_entry = StoredEntryVar::::from_bytes(stored_header_bytes) + .map_err(|e| FinalisedStateError::Custom(format!( + "header decode error in KeyExist handler: {e}" + )))?; + let stored_header = stored_entry.inner(); + if *stored_header.index().hash() == block_hash { + Ok(true) // Block already written correctly + } else { + Err(FinalisedStateError::Custom(format!( + "KeyExist race: different block at height {} \ + (stored: {:?}, incoming: {:?})", + block_height.0, + stored_header.index().hash(), + block_hash + ))) + } + } + Err(lmdb::Error::NotFound) => { + Err(FinalisedStateError::Custom(format!( + "KeyExist but block not found at height {} after sync", + block_height.0 + ))) + } + Err(e) => Err(FinalisedStateError::LmdbError(e)), + } + }); + + match verification_result { + Ok(_) => { + // Block was already written correctly by another process + self.status.store(StatusType::Ready); + info!( + "Block {} at height {} was already written by another process, skipping.", + &block_hash, &block_height.0 + ); + Ok(()) + } + Err(e) => { + warn!("Error writing block to DB: {e}"); + self.status.store(StatusType::RecoverableError); + Err(FinalisedStateError::InvalidBlock { + height: block_height.0, + hash: block_hash, + reason: e.to_string(), + }) + } + } + } Err(e) => { - warn!("Error writing block to DB."); + warn!("Error writing block to DB: {e}"); - let _ = self.delete_block(&block).await; + // let _ = self.delete_block(&block).await; tokio::task::block_in_place(|| self.env.sync(true)) .map_err(|e| FinalisedStateError::Custom(format!("LMDB sync failed: {e}")))?; - self.status.store(StatusType::RecoverableError); + self.status.store(StatusType::CriticalError); Err(FinalisedStateError::InvalidBlock { height: block_height.0, hash: block_hash, diff --git a/zaino-state/src/chain_index/mempool.rs b/zaino-state/src/chain_index/mempool.rs index d0692af0c..0fdff2d70 100644 --- a/zaino-state/src/chain_index/mempool.rs +++ b/zaino-state/src/chain_index/mempool.rs @@ -5,7 +5,8 @@ use std::{collections::HashSet, sync::Arc}; use crate::{ broadcast::{Broadcast, BroadcastSubscriber}, chain_index::{ - source::{BlockchainSource, BlockchainSourceError}, types::db::metadata::MempoolInfo, + source::{BlockchainSource, BlockchainSourceError}, + types::db::metadata::MempoolInfo, }, error::{MempoolError, StatusError}, status::{AtomicStatus, StatusType}, diff --git a/zaino-state/src/chain_index/tests/finalised_state/v0.rs b/zaino-state/src/chain_index/tests/finalised_state/v0.rs index d4094895a..42057c867 100644 --- a/zaino-state/src/chain_index/tests/finalised_state/v0.rs +++ b/zaino-state/src/chain_index/tests/finalised_state/v0.rs @@ -5,7 +5,7 @@ use tempfile::TempDir; use zaino_common::network::ActivationHeights; use zaino_common::{DatabaseConfig, Network, StorageConfig}; -use zaino_proto::proto::utils::PoolTypeFilter; +use zaino_proto::proto::utils::{compact_block_with_pool_types, PoolTypeFilter}; use crate::chain_index::finalised_state::reader::DbReader; use crate::chain_index::finalised_state::ZainoDB; @@ -15,7 +15,6 @@ use crate::chain_index::tests::vectors::{ build_mockchain_source, load_test_vectors, TestVectorBlockData, TestVectorData, }; use crate::error::FinalisedStateError; -use crate::local_cache::compact_block_with_pool_types; use crate::{BlockCacheConfig, BlockMetadata, BlockWithMetadata, ChainWork, Height, IndexedBlock}; pub(crate) async fn spawn_v0_zaino_db( diff --git a/zaino-state/src/chain_index/tests/finalised_state/v1.rs b/zaino-state/src/chain_index/tests/finalised_state/v1.rs index bb25a45ca..f31c10aaf 100644 --- a/zaino-state/src/chain_index/tests/finalised_state/v1.rs +++ b/zaino-state/src/chain_index/tests/finalised_state/v1.rs @@ -5,7 +5,7 @@ use tempfile::TempDir; use zaino_common::network::ActivationHeights; use zaino_common::{DatabaseConfig, Network, StorageConfig}; -use zaino_proto::proto::utils::PoolTypeFilter; +use zaino_proto::proto::utils::{compact_block_with_pool_types, PoolTypeFilter}; use crate::chain_index::finalised_state::capability::IndexedBlockExt; use crate::chain_index::finalised_state::db::DbBackend; @@ -18,7 +18,6 @@ use crate::chain_index::tests::vectors::{ }; use crate::chain_index::types::TransactionHash; use crate::error::FinalisedStateError; -use crate::local_cache::compact_block_with_pool_types; use crate::{ AddrScript, BlockCacheConfig, BlockMetadata, BlockWithMetadata, ChainWork, Height, IndexedBlock, Outpoint, diff --git a/zaino-state/src/config.rs b/zaino-state/src/config.rs index ed2b7376c..a7edf5ce5 100644 --- a/zaino-state/src/config.rs +++ b/zaino-state/src/config.rs @@ -26,7 +26,7 @@ pub enum BackendConfig { /// Holds config data for [crate::StateService]. #[derive(Debug, Clone)] -#[deprecated] +// #[deprecated] pub struct StateServiceConfig { /// Zebra [`zebra_state::ReadStateService`] config data pub validator_state_config: zebra_state::Config, diff --git a/zaino-state/src/error.rs b/zaino-state/src/error.rs index 45a3c85be..3229bbab7 100644 --- a/zaino-state/src/error.rs +++ b/zaino-state/src/error.rs @@ -11,7 +11,7 @@ use zaino_fetch::jsonrpsee::connector::RpcRequestError; use zaino_proto::proto::utils::GetBlockRangeError; /// Errors related to the `StateService`. -#[deprecated] +// #[deprecated] #[derive(Debug, thiserror::Error)] pub enum StateServiceError { /// An rpc-specific error we haven't accounted for diff --git a/zaino-state/src/indexer.rs b/zaino-state/src/indexer.rs index bfa1ac6e2..164f5d3ff 100644 --- a/zaino-state/src/indexer.rs +++ b/zaino-state/src/indexer.rs @@ -35,7 +35,7 @@ use zebra_rpc::{ }; use crate::{ - status::StatusType, + status::Status, stream::{ AddressStream, CompactBlockStream, CompactTransactionStream, RawTransactionStream, SubtreeRootReplyStream, UtxoReplyStream, @@ -80,13 +80,16 @@ where } /// Zcash Service functionality. +/// +/// Implementors automatically gain [`Liveness`](zaino_common::probing::Liveness) and +/// [`Readiness`](zaino_common::probing::Readiness) via the [`Status`] supertrait. #[async_trait] -pub trait ZcashService: Sized { +pub trait ZcashService: Sized + Status { /// Backend type. Read state or fetch service. const BACKEND_TYPE: BackendType; /// A subscriber to the service, used to fetch chain data. - type Subscriber: Clone + ZcashIndexer + LightWalletIndexer; + type Subscriber: Clone + ZcashIndexer + LightWalletIndexer + Status; /// Service Config. type Config: Clone; @@ -98,9 +101,6 @@ pub trait ZcashService: Sized { /// Returns a [`IndexerSubscriber`]. fn get_subscriber(&self) -> IndexerSubscriber; - /// Fetches the current status - async fn status(&self) -> StatusType; - /// Shuts down the StateService. fn close(&mut self); } diff --git a/zaino-state/src/lib.rs b/zaino-state/src/lib.rs index eb676fedd..a4c7138ea 100644 --- a/zaino-state/src/lib.rs +++ b/zaino-state/src/lib.rs @@ -27,7 +27,6 @@ pub use backends::{ state::{StateService, StateServiceSubscriber}, }; -// NOTE: This will replace local_cache. Currently WIP. pub mod chain_index; // Core ChainIndex trait and implementations @@ -51,8 +50,6 @@ pub use chain_index::types::{ TreeRootData, TxInCompact, TxLocation, TxOutCompact, TxidList, }; -pub(crate) mod local_cache; - pub use chain_index::mempool::{MempoolKey, MempoolValue}; #[cfg(feature = "test_dependencies")] @@ -62,7 +59,8 @@ pub mod test_dependencies { pub mod chain_index { pub use crate::chain_index::*; } - pub use crate::{config::BlockCacheConfig, local_cache::*}; + + pub use crate::BlockCacheConfig; } pub(crate) mod config; @@ -79,7 +77,7 @@ pub use error::{FetchServiceError, StateServiceError}; pub(crate) mod status; -pub use status::{AtomicStatus, StatusType}; +pub use status::{AtomicStatus, Status, StatusType}; pub(crate) mod stream; diff --git a/zaino-state/src/local_cache.rs b/zaino-state/src/local_cache.rs deleted file mode 100644 index 967b9bbf0..000000000 --- a/zaino-state/src/local_cache.rs +++ /dev/null @@ -1,450 +0,0 @@ -//! Holds Zaino's local compact block cache implementation. - -use std::any::type_name; - -#[allow(deprecated)] -use crate::{ - config::BlockCacheConfig, error::BlockCacheError, status::StatusType, StateServiceSubscriber, -}; - -pub mod finalised_state; -pub mod non_finalised_state; - -use finalised_state::{FinalisedState, FinalisedStateSubscriber}; -use non_finalised_state::{NonFinalisedState, NonFinalisedStateSubscriber}; -use tracing::info; -use zaino_fetch::{ - chain::block::FullBlock, - jsonrpsee::{ - connector::{JsonRpSeeConnector, RpcRequestError}, - error::TransportError, - response::{GetBlockError, GetBlockResponse}, - }, -}; -use zaino_proto::proto::{ - compact_formats::{ChainMetadata, CompactBlock, CompactOrchardAction}, - service::PoolType, - utils::PoolTypeFilter, -}; -use zebra_chain::{ - block::{Hash, Height}, - parameters::Network, -}; -use zebra_rpc::methods::{GetBlock, GetBlockTransaction}; -use zebra_state::{HashOrHeight, ReadStateService}; - -/// Zaino's internal compact block cache. -/// -/// Used by the FetchService for efficiency. -#[derive(Debug)] -pub struct BlockCache { - fetcher: JsonRpSeeConnector, - state: Option, - non_finalised_state: NonFinalisedState, - /// The state below the last 100 blocks, determined - /// to be probabalistically nonreorgable - pub finalised_state: Option, - config: BlockCacheConfig, -} - -impl BlockCache { - /// Spawns a new [`BlockCache`]. - /// - /// Inputs: - /// - fetcher: JsonRPC client. - /// - state: Zebra ReadStateService. - /// - config: Block cache configuration data. - pub async fn spawn( - fetcher: &JsonRpSeeConnector, - state: Option<&ReadStateService>, - config: BlockCacheConfig, - ) -> Result { - info!("Launching Local Block Cache.."); - let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(100); - - let db_size = config.storage.database.size; - let finalised_state = match db_size { - zaino_common::DatabaseSize::Gb(0) => None, - zaino_common::DatabaseSize::Gb(_) => { - Some(FinalisedState::spawn(fetcher, state, channel_rx, config.clone()).await?) - } - }; - - let non_finalised_state = - NonFinalisedState::spawn(fetcher, state, channel_tx, config.clone()).await?; - - Ok(BlockCache { - fetcher: fetcher.clone(), - state: state.cloned(), - non_finalised_state, - finalised_state, - config, - }) - } - - /// Returns a [`BlockCacheSubscriber`]. - pub fn subscriber(&self) -> BlockCacheSubscriber { - let finalised_state_subscriber = self - .finalised_state - .as_ref() - .map(FinalisedState::subscriber); - BlockCacheSubscriber { - fetcher: self.fetcher.clone(), - state: self.state.clone(), - non_finalised_state: self.non_finalised_state.subscriber(), - finalised_state: finalised_state_subscriber, - config: self.config.clone(), - } - } - - /// Returns the status of the block cache. - pub fn status(&self) -> StatusType { - let non_finalised_state_status = self.non_finalised_state.status(); - let finalised_state_status = match self.config.storage.database.size { - zaino_common::DatabaseSize::Gb(0) => StatusType::Ready, - zaino_common::DatabaseSize::Gb(_) => match &self.finalised_state { - Some(finalised_state) => finalised_state.status(), - None => return StatusType::Offline, - }, - }; - - non_finalised_state_status.combine(finalised_state_status) - } - - /// Sets the block cache to close gracefully. - pub fn close(&mut self) { - self.non_finalised_state.close(); - if self.finalised_state.is_some() { - self.finalised_state - .take() - .expect("error taking Option<(Some)finalised_state> in block_cache::close") - .close(); - } - } -} - -/// A subscriber to a [`BlockCache`]. -#[derive(Debug, Clone)] -pub struct BlockCacheSubscriber { - fetcher: JsonRpSeeConnector, - state: Option, - /// the last 100 blocks, stored separately as it could - /// be changed by reorgs - pub non_finalised_state: NonFinalisedStateSubscriber, - /// The state below the last 100 blocks, determined - /// to be probabalistically nonreorgable - pub finalised_state: Option, - config: BlockCacheConfig, -} - -impl BlockCacheSubscriber { - /// Returns a Compact Block from the [`BlockCache`]. - pub async fn get_compact_block( - &self, - hash_or_height: String, - ) -> Result { - let hash_or_height: HashOrHeight = hash_or_height.parse()?; - - if self - .non_finalised_state - .contains_hash_or_height(hash_or_height) - .await - { - // Fetch from non-finalised state. - self.non_finalised_state - .get_compact_block(hash_or_height) - .await - .map_err(Into::into) - } else { - match &self.finalised_state { - // Fetch from finalised state. - Some(finalised_state) => finalised_state - .get_compact_block(hash_or_height) - .await - .map_err(Into::into), - // Fetch from Validator. - None => { - let (_, block) = fetch_block_from_node( - self.state.as_ref(), - Some(&self.config.network.to_zebra_network()), - &self.fetcher, - hash_or_height, - ) - .await - .map_err(|e| BlockCacheError::Custom(e.to_string()))?; - Ok(block) - } - } - } - } - - /// Returns a compact block holding only action nullifiers. - /// - /// NOTE: Currently this only returns Orchard nullifiers to follow Lightwalletd functionality but Sapling could be added if required by wallets. - pub async fn get_compact_block_nullifiers( - &self, - hash_or_height: String, - ) -> Result { - self.get_compact_block(hash_or_height) - .await - .map(compact_block_to_nullifiers) - } - - /// Returns the height of the latest block in the [`BlockCache`]. - pub async fn get_chain_height(&self) -> Result { - self.non_finalised_state - .get_chain_height() - .await - .map_err(BlockCacheError::NonFinalisedStateError) - } - - /// Returns the status of the [`BlockCache`].. - pub fn status(&self) -> StatusType { - let non_finalised_state_status = self.non_finalised_state.status(); - let finalised_state_status = match self.config.storage.database.size { - zaino_common::DatabaseSize::Gb(0) => StatusType::Ready, - zaino_common::DatabaseSize::Gb(_) => match &self.finalised_state { - Some(finalised_state) => finalised_state.status(), - None => return StatusType::Offline, - }, - }; - - non_finalised_state_status.combine(finalised_state_status) - } -} - -/// Fetches CompactBlock from the validator. -/// -/// Uses 2 calls as z_get_block verbosity=1 is required to fetch txids from zcashd. -pub(crate) async fn fetch_block_from_node( - state: Option<&ReadStateService>, - network: Option<&Network>, - fetcher: &JsonRpSeeConnector, - hash_or_height: HashOrHeight, -) -> Result<(Hash, CompactBlock), RpcRequestError> { - if let (Some(state), Some(network)) = (state, network) { - match try_state_path(state, network, hash_or_height).await { - Ok(result) => return Ok(result), - Err(e) => { - eprintln!("StateService fallback triggered due to: {e}"); - } - } - } - try_fetcher_path(fetcher, hash_or_height).await -} - -#[allow(deprecated)] -async fn try_state_path( - state: &ReadStateService, - network: &Network, - hash_or_height: HashOrHeight, -) -> Result<(Hash, CompactBlock), BlockCacheError> { - let (hash, tx, trees) = - StateServiceSubscriber::get_block_inner(state, network, hash_or_height, Some(1)) - .await - .map_err(|e| { - eprintln!("{e}"); - BlockCacheError::Custom("Error retrieving block from ReadStateService".to_string()) - }) - .and_then(|response| match response { - GetBlock::Raw(_) => Err(BlockCacheError::Custom( - "Found transaction of `Raw` type, expected only `Hash` types.".to_string(), - )), - GetBlock::Object(block_obj) => { - Ok((block_obj.hash(), block_obj.tx().clone(), block_obj.trees())) - } - })?; - - StateServiceSubscriber::get_block_inner(state, network, hash_or_height, Some(0)) - .await - .map_err(|_| { - BlockCacheError::Custom("Error retrieving raw block from ReadStateService".to_string()) - }) - .and_then(|response| match response { - GetBlock::Object { .. } => Err(BlockCacheError::Custom( - "Found transaction of `Object` type, expected only `Hash` types.".to_string(), - )), - GetBlock::Raw(block_hex) => { - let txid_strings = tx - .iter() - .filter_map(|t| { - if let GetBlockTransaction::Hash(h) = t { - Some(h.to_string()) - } else { - None - } - }) - .collect::>(); - - Ok(( - hash, - FullBlock::parse_from_hex( - block_hex.as_ref(), - Some(display_txids_to_server(txid_strings)?), - )? - .into_compact_block( - u32::try_from(trees.sapling())?, - u32::try_from(trees.orchard())?, - PoolTypeFilter::includes_all(), - )?, - )) - } - }) -} - -async fn try_fetcher_path( - fetcher: &JsonRpSeeConnector, - hash_or_height: HashOrHeight, -) -> Result<(Hash, CompactBlock), RpcRequestError> { - let (hash, tx, trees) = fetcher - .get_block(hash_or_height.to_string(), Some(1)) - .await - .and_then(|response| match response { - GetBlockResponse::Raw(_) => { - Err(RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(std::io::Error::other("unexpected raw block response")), - type_name::(), - ))) - } - GetBlockResponse::Object(block) => Ok((block.hash, block.tx, block.trees)), - })?; - - fetcher - .get_block(hash.0.to_string(), Some(0)) - .await - .and_then(|response| match response { - GetBlockResponse::Object { .. } => { - Err(RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(std::io::Error::other("unexpected object block response")), - type_name::(), - ))) - } - GetBlockResponse::Raw(block_hex) => Ok(( - hash.0, - FullBlock::parse_from_hex( - block_hex.as_ref(), - Some(display_txids_to_server(tx).map_err(|e| { - RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(e), - type_name::(), - )) - })?), - ) - .map_err(|e| { - RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(e), - type_name::(), - )) - })? - .into_compact_block( - u32::try_from(trees.sapling()).map_err(|e| { - RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(e), - type_name::(), - )) - })?, - u32::try_from(trees.orchard()).map_err(|e| { - RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(e), - type_name::(), - )) - })?, - PoolTypeFilter::includes_all(), - ) - .map_err(|e| { - RpcRequestError::Transport(TransportError::BadNodeData( - Box::new(e), - type_name::(), - )) - })?, - )), - }) -} - -/// Takes a vec of big endian hex encoded txids and returns them as a vec of little endian raw bytes. -pub(crate) fn display_txids_to_server(txids: Vec) -> Result>, BlockCacheError> { - txids - .iter() - .map(|txid| { - txid.as_bytes() - .chunks(2) - .map(|chunk| { - let hex_pair = std::str::from_utf8(chunk).map_err(BlockCacheError::from)?; - u8::from_str_radix(hex_pair, 16).map_err(BlockCacheError::from) - }) - .rev() - .collect::, _>>() - }) - .collect::>, _>>() -} - -/// prunes a compact block from transaction in formation related to pools not included in the -/// `pool_types` vector. -/// Note: for backwards compatibility an empty vector will return Sapling and Orchard Tx info. -pub(crate) fn compact_block_with_pool_types( - mut block: CompactBlock, - pool_types: &[PoolType], -) -> CompactBlock { - if pool_types.is_empty() { - for compact_tx in &mut block.vtx { - // strip out transparent inputs if not Requested - compact_tx.vin.clear(); - compact_tx.vout.clear(); - } - - // Omit transactions that have no Sapling/Orchard elements. - block.vtx.retain(|compact_tx| { - !compact_tx.spends.is_empty() - || !compact_tx.outputs.is_empty() - || !compact_tx.actions.is_empty() - }); - } else { - for compact_tx in &mut block.vtx { - // strip out transparent inputs if not Requested - if !pool_types.contains(&PoolType::Transparent) { - compact_tx.vin.clear(); - compact_tx.vout.clear(); - } - // strip out sapling if not requested - if !pool_types.contains(&PoolType::Sapling) { - compact_tx.spends.clear(); - compact_tx.outputs.clear(); - } - // strip out orchard if not requested - if !pool_types.contains(&PoolType::Orchard) { - compact_tx.actions.clear(); - } - } - - // Omit transactions that have no elements in any requested pool type. - block.vtx.retain(|compact_tx| { - !compact_tx.vin.is_empty() - || !compact_tx.vout.is_empty() - || !compact_tx.spends.is_empty() - || !compact_tx.outputs.is_empty() - || !compact_tx.actions.is_empty() - }); - } - - block -} -/// Strips the ouputs and from all transactions, retains only -/// the nullifier from all orcard actions, and clears the chain -/// metadata from the block -pub(crate) fn compact_block_to_nullifiers(mut block: CompactBlock) -> CompactBlock { - for ctransaction in &mut block.vtx { - ctransaction.outputs = Vec::new(); - for caction in &mut ctransaction.actions { - *caction = CompactOrchardAction { - nullifier: caction.nullifier.clone(), - ..Default::default() - } - } - } - - block.chain_metadata = Some(ChainMetadata { - sapling_commitment_tree_size: 0, - orchard_commitment_tree_size: 0, - }); - block -} diff --git a/zaino-state/src/local_cache/finalised_state.rs b/zaino-state/src/local_cache/finalised_state.rs deleted file mode 100644 index 85b0ed591..000000000 --- a/zaino-state/src/local_cache/finalised_state.rs +++ /dev/null @@ -1,724 +0,0 @@ -//! Compact Block Cache finalised state implementation. - -use lmdb::{Cursor, Database, Environment, Transaction}; -use prost::Message; -use serde::{Deserialize, Serialize}; -use std::{fs, sync::Arc}; -use tracing::{error, info, warn}; - -use zebra_chain::{ - block::{Hash, Height}, - parameters::NetworkKind, -}; -use zebra_state::{HashOrHeight, ReadStateService}; - -use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; -use zaino_proto::proto::compact_formats::CompactBlock; - -use crate::{ - config::BlockCacheConfig, - error::FinalisedStateError, - local_cache::fetch_block_from_node, - status::{AtomicStatus, StatusType}, -}; - -/// Wrapper for `Height`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -struct DbHeight(pub Height); - -impl DbHeight { - /// Converts `[DbHeight]` to 4-byte **big-endian** bytes. - /// Used when storing as an LMDB key. - fn to_be_bytes(self) -> [u8; 4] { - self.0 .0.to_be_bytes() - } - - /// Parse a 4-byte **big-endian** array into a `[DbHeight]`. - fn from_be_bytes(bytes: &[u8]) -> Result { - let arr: [u8; 4] = bytes - .try_into() - .map_err(|_| FinalisedStateError::Custom("Invalid height key length".to_string()))?; - Ok(DbHeight(Height(u32::from_be_bytes(arr)))) - } -} - -/// Wrapper for `Hash`. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -struct DbHash(pub Hash); - -/// Wrapper for `CompactBlock`. -#[derive(Debug, Clone, PartialEq)] -struct DbCompactBlock(pub CompactBlock); - -/// Custom `Serialize` implementation using Prost's `encode_to_vec()`. -impl Serialize for DbCompactBlock { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let bytes = self.0.encode_to_vec(); - serializer.serialize_bytes(&bytes) - } -} - -/// Custom `Deserialize` implementation using Prost's `decode()`. -impl<'de> Deserialize<'de> for DbCompactBlock { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let bytes: Vec = serde::de::Deserialize::deserialize(deserializer)?; - CompactBlock::decode(&*bytes) - .map(DbCompactBlock) - .map_err(serde::de::Error::custom) - } -} - -/// A Zaino database request. -#[derive(Debug)] -struct DbRequest { - hash_or_height: HashOrHeight, - response_channel: tokio::sync::oneshot::Sender>, -} - -impl DbRequest { - /// Creates a new [`DbRequest`]. - fn new( - hash_or_height: HashOrHeight, - response_channel: tokio::sync::oneshot::Sender>, - ) -> Self { - Self { - hash_or_height, - response_channel, - } - } -} - -/// Fanalised part of the chain, held in an LMDB database. -#[derive(Debug)] -pub struct FinalisedState { - /// JsonRPC client based chain fetch service. - fetcher: JsonRpSeeConnector, - /// Optional ReadStateService based chain fetch service. - state: Option, - /// LMDB Database Environmant. - database: Arc, - /// LMDB Database containing ``. - heights_to_hashes: Database, - /// LMDB Database containing ``. - hashes_to_blocks: Database, - /// Database reader request sender. - request_sender: tokio::sync::mpsc::Sender, - /// Database reader task handle. - read_task_handle: Option>, - /// Database writer task handle. - write_task_handle: Option>, - /// Non-finalised state status. - status: AtomicStatus, - /// BlockCache config data. - config: BlockCacheConfig, -} - -impl FinalisedState { - /// Spawns a new [`Self`] and syncs the FinalisedState to the servers finalised state. - /// - /// Inputs: - /// - fetcher: Json RPC client. - /// - db_path: File path of the db. - /// - db_size: Max size of the db in gb. - /// - block_reciever: Channel that recieves new blocks to add to the db. - /// - status_signal: Used to send error status signals to outer processes. - pub async fn spawn( - fetcher: &JsonRpSeeConnector, - state: Option<&ReadStateService>, - block_receiver: tokio::sync::mpsc::Receiver<(Height, Hash, CompactBlock)>, - config: BlockCacheConfig, - ) -> Result { - info!("Launching Finalised State.."); - let db_size_bytes = config.storage.database.size.to_byte_count(); - let db_path_dir = match config.network.to_zebra_network().kind() { - NetworkKind::Mainnet => "live", - NetworkKind::Testnet => "test", - NetworkKind::Regtest => "local", - }; - let db_path = config.storage.database.path.join(db_path_dir); - if !db_path.exists() { - fs::create_dir_all(&db_path)?; - } - let database = Arc::new( - Environment::new() - .set_max_dbs(2) - .set_map_size(db_size_bytes) - .open(&db_path)?, - ); - - let heights_to_hashes = match database.open_db(Some("heights_to_hashes")) { - Ok(db) => db, - Err(lmdb::Error::NotFound) => { - database.create_db(Some("heights_to_hashes"), lmdb::DatabaseFlags::empty())? - } - Err(e) => return Err(FinalisedStateError::LmdbError(e)), - }; - let hashes_to_blocks = match database.open_db(Some("hashes_to_blocks")) { - Ok(db) => db, - Err(lmdb::Error::NotFound) => { - database.create_db(Some("hashes_to_blocks"), lmdb::DatabaseFlags::empty())? - } - Err(e) => return Err(FinalisedStateError::LmdbError(e)), - }; - - let (request_tx, request_rx) = tokio::sync::mpsc::channel(124); - - let mut finalised_state = FinalisedState { - fetcher: fetcher.clone(), - state: state.cloned(), - database, - heights_to_hashes, - hashes_to_blocks, - request_sender: request_tx, - read_task_handle: None, - write_task_handle: None, - status: AtomicStatus::new(StatusType::Spawning), - config, - }; - - finalised_state.sync_db_from_reorg().await?; - finalised_state.spawn_writer(block_receiver).await?; - finalised_state.spawn_reader(request_rx).await?; - - finalised_state.status.store(StatusType::Ready); - - Ok(finalised_state) - } - - async fn spawn_writer( - &mut self, - mut block_receiver: tokio::sync::mpsc::Receiver<(Height, Hash, CompactBlock)>, - ) -> Result<(), FinalisedStateError> { - let finalised_state = Self { - fetcher: self.fetcher.clone(), - state: self.state.clone(), - database: Arc::clone(&self.database), - heights_to_hashes: self.heights_to_hashes, - hashes_to_blocks: self.hashes_to_blocks, - request_sender: self.request_sender.clone(), - read_task_handle: None, - write_task_handle: None, - status: self.status.clone(), - config: self.config.clone(), - }; - - let writer_handle = tokio::spawn(async move { - while let Some((height, mut hash, mut compact_block)) = block_receiver.recv().await { - let mut retry_attempts = 3; - - loop { - match finalised_state.insert_block((height, hash, compact_block.clone())) { - Ok(_) => { - info!( - "Block at height [{}] with hash [{}] successfully committed to finalised state.", - height.0, hash - ); - break; - } - Err(FinalisedStateError::LmdbError(lmdb::Error::KeyExist)) => { - match finalised_state.get_hash(height.0) { - Ok(db_hash) => { - if db_hash != hash { - if finalised_state.delete_block(height).is_err() { - finalised_state.status.store(StatusType::CriticalError); - return; - }; - continue; - } else { - info!( - "Block at height {} already exists, skipping.", - height.0 - ); - break; - } - } - Err(_) => { - finalised_state.status.store(StatusType::CriticalError); - return; - } - } - } - Err(FinalisedStateError::LmdbError(db_err)) => { - error!("LMDB error inserting block {}: {:?}", height.0, db_err); - finalised_state.status.store(StatusType::CriticalError); - return; - } - Err(e) => { - warn!( - "Unknown error inserting block {}: {:?}. Retrying...", - height.0, e - ); - - if retry_attempts == 0 { - error!( - "Failed to insert block {} after multiple retries.", - height.0 - ); - finalised_state.status.store(StatusType::CriticalError); - return; - } - - retry_attempts -= 1; - - match fetch_block_from_node( - finalised_state.state.as_ref(), - Some(&finalised_state.config.network.to_zebra_network()), - &finalised_state.fetcher, - HashOrHeight::Height(height), - ) - .await - { - Ok((new_hash, new_compact_block)) => { - warn!( - "Re-fetched block at height {}, retrying insert.", - height.0 - ); - hash = new_hash; - compact_block = new_compact_block; - } - Err(fetch_err) => { - error!( - "Failed to fetch block {} from validator: {:?}", - height.0, fetch_err - ); - finalised_state.status.store(StatusType::CriticalError); - return; - } - } - - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - } - }); - - self.write_task_handle = Some(writer_handle); - Ok(()) - } - - async fn spawn_reader( - &mut self, - mut request_receiver: tokio::sync::mpsc::Receiver, - ) -> Result<(), FinalisedStateError> { - let finalised_state = Self { - fetcher: self.fetcher.clone(), - state: self.state.clone(), - database: Arc::clone(&self.database), - heights_to_hashes: self.heights_to_hashes, - hashes_to_blocks: self.hashes_to_blocks, - request_sender: self.request_sender.clone(), - read_task_handle: None, - write_task_handle: None, - status: self.status.clone(), - config: self.config.clone(), - }; - - let reader_handle = tokio::spawn(async move { - while let Some(DbRequest { - hash_or_height, - response_channel, - }) = request_receiver.recv().await - { - let response = match finalised_state.get_block(hash_or_height) { - Ok(block) => Ok(block), - Err(_) => { - warn!("Failed to fetch block from DB, re-fetching from validator."); - match fetch_block_from_node( - finalised_state.state.as_ref(), - Some(&finalised_state.config.network.to_zebra_network()), - &finalised_state.fetcher, - hash_or_height, - ) - .await - { - Ok((hash, block)) => { - match finalised_state.insert_block(( - Height(block.height as u32), - hash, - block.clone(), - )) { - Ok(_) => Ok(block), - Err(_) => { - warn!("Failed to insert missing block into DB, serving from validator."); - Ok(block) - } - } - } - Err(_) => Err(FinalisedStateError::Custom(format!( - "Block {hash_or_height:?} not found in finalised state or validator." - ))), - } - } - }; - - if response_channel.send(response).is_err() { - warn!("Failed to send response for request: {:?}", hash_or_height); - } - } - }); - - self.read_task_handle = Some(reader_handle); - Ok(()) - } - - /// Syncs database with the server, and waits for server to sync with P2P network. - /// - /// Checks for reorg before syncing: - /// - Searches from ZainoDB tip backwards looking for the last valid block in the database and sets `reorg_height` to the last VALID block. - /// - Re-populated the database from the NEXT block in the chain (`reorg_height + 1`). - async fn sync_db_from_reorg(&self) -> Result<(), FinalisedStateError> { - let network = self.config.network.to_zebra_network(); - - let mut reorg_height = self.get_db_height().unwrap_or(Height(0)); - // let reorg_height_int = reorg_height.0.saturating_sub(100); - // reorg_height = Height(reorg_height_int); - - let mut reorg_hash = self.get_hash(reorg_height.0).unwrap_or(Hash([0u8; 32])); - - let mut check_hash = match self - .fetcher - .get_block(reorg_height.0.to_string(), Some(1)) - .await? - { - zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block) => block.hash.0, - _ => { - return Err(FinalisedStateError::Custom( - "Unexpected block response type".to_string(), - )) - } - }; - - // Find reorg height. - // - // Here this is the latest height at which the internal block hash matches the server block hash. - while reorg_hash != check_hash { - match reorg_height.previous() { - Ok(height) => reorg_height = height, - // Underflow error meaning reorg_height = start of chain. - // This means the whole finalised state is old or corrupt. - Err(_) => { - { - let mut txn = self.database.begin_rw_txn()?; - txn.clear_db(self.heights_to_hashes)?; - txn.clear_db(self.hashes_to_blocks)?; - txn.commit()?; - } - break; - } - }; - - reorg_hash = self.get_hash(reorg_height.0).unwrap_or(Hash([0u8; 32])); - - check_hash = match self - .fetcher - .get_block(reorg_height.0.to_string(), Some(1)) - .await? - { - zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block) => block.hash.0, - _ => { - return Err(FinalisedStateError::Custom( - "Unexpected block response type".to_string(), - )) - } - }; - } - - // Refill from max(reorg_height[+1], sapling_activation_height) to current server (finalised state) height. - let mut sync_height = self - .fetcher - .get_blockchain_info() - .await? - .blocks - .0 - .saturating_sub(99); - for block_height in ((reorg_height.0 + 1).max( - self.config - .network - .to_zebra_network() - .sapling_activation_height() - .0, - ))..=sync_height - { - if self.get_hash(block_height).is_ok() { - self.delete_block(Height(block_height))?; - } - loop { - match fetch_block_from_node( - self.state.as_ref(), - Some(&network), - &self.fetcher, - HashOrHeight::Height(Height(block_height)), - ) - .await - { - Ok((hash, block)) => { - self.insert_block((Height(block_height), hash, block))?; - info!( - "Block at height {} successfully inserted in finalised state.", - block_height - ); - break; - } - Err(e) => { - self.status.store(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - } - - // Wait for server to sync to with p2p network and sync new blocks. - if !self.config.network.to_zebra_network().is_regtest() { - self.status.store(StatusType::Syncing); - loop { - let blockchain_info = self.fetcher.get_blockchain_info().await?; - let server_height = blockchain_info.blocks.0; - for block_height in (sync_height + 1)..(server_height - 99) { - if self.get_hash(block_height).is_ok() { - self.delete_block(Height(block_height))?; - } - loop { - match fetch_block_from_node( - self.state.as_ref(), - Some(&network), - &self.fetcher, - HashOrHeight::Height(Height(block_height)), - ) - .await - { - Ok((hash, block)) => { - self.insert_block((Height(block_height), hash, block))?; - info!( - "Block at height {} successfully inserted in finalised state.", - block_height - ); - break; - } - Err(e) => { - self.status.store(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - } - sync_height = server_height - 99; - if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64) - .abs() - <= 10 - { - break; - } else { - info!(" - Validator syncing with network. ZainoDB chain height: {}, Validator chain height: {}, Estimated Network chain height: {}", - &sync_height, - &blockchain_info.blocks.0, - &blockchain_info.estimated_height.0 - ); - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; - continue; - } - } - } - - self.status.store(StatusType::Ready); - - Ok(()) - } - - /// Inserts a block into the finalised state. - fn insert_block(&self, block: (Height, Hash, CompactBlock)) -> Result<(), FinalisedStateError> { - let (height, hash, compact_block) = block; - // let height_key = serde_json::to_vec(&DbHeight(height))?; - let height_key = DbHeight(height).to_be_bytes(); - let hash_key = serde_json::to_vec(&DbHash(hash))?; - let block_value = serde_json::to_vec(&DbCompactBlock(compact_block))?; - - let mut txn = self.database.begin_rw_txn()?; - if let Err(database_err) = txn - .put( - self.heights_to_hashes, - &height_key, - &hash_key, - lmdb::WriteFlags::NO_OVERWRITE, - ) - .and_then(|()| { - txn.put( - self.hashes_to_blocks, - &hash_key, - &block_value, - lmdb::WriteFlags::NO_OVERWRITE, - ) - }) - { - txn.abort(); - return Err(FinalisedStateError::LmdbError(database_err)); - } - txn.commit()?; - Ok(()) - } - - /// Deletes a block from the finalised state. - fn delete_block(&self, height: Height) -> Result<(), FinalisedStateError> { - let hash = self.get_hash(height.0)?; - // let height_key = serde_json::to_vec(&DbHeight(height))?; - let height_key = DbHeight(height).to_be_bytes(); - let hash_key = serde_json::to_vec(&DbHash(hash))?; - - let mut txn = self.database.begin_rw_txn()?; - txn.del(self.heights_to_hashes, &height_key, None)?; - txn.del(self.hashes_to_blocks, &hash_key, None)?; - txn.commit()?; - Ok(()) - } - - /// Retrieves a CompactBlock by Height or Hash. - /// - /// NOTE: It may be more efficient to implement a `get_block_range` method and batch database read calls. - fn get_block(&self, height_or_hash: HashOrHeight) -> Result { - let txn = self.database.begin_ro_txn()?; - - let hash_key = match height_or_hash { - HashOrHeight::Height(height) => { - // let height_key = serde_json::to_vec(&DbHeight(height))?; - let height_key = DbHeight(height).to_be_bytes(); - let hash_bytes: &[u8] = txn.get(self.heights_to_hashes, &height_key)?; - hash_bytes.to_vec() - } - HashOrHeight::Hash(hash) => serde_json::to_vec(&DbHash(hash))?, - }; - - let block_bytes: &[u8] = txn.get(self.hashes_to_blocks, &hash_key)?; - let block: DbCompactBlock = serde_json::from_slice(block_bytes)?; - Ok(block.0) - } - - /// Retrieves a Hash by Height. - fn get_hash(&self, height: u32) -> Result { - let txn = self.database.begin_ro_txn()?; - - // let height_key = serde_json::to_vec(&DbHeight(Height(height)))?; - let height_key = DbHeight(Height(height)).to_be_bytes(); - - let hash_bytes: &[u8] = match txn.get(self.heights_to_hashes, &height_key) { - Ok(bytes) => bytes, - Err(lmdb::Error::NotFound) => { - return Err(FinalisedStateError::Custom(format!( - "No hash found for height {height}" - ))); - } - Err(e) => return Err(FinalisedStateError::LmdbError(e)), - }; - - let hash: Hash = serde_json::from_slice(hash_bytes)?; - Ok(hash) - } - - /// Fetches the highest stored height from LMDB. - pub fn get_db_height(&self) -> Result { - let txn = self.database.begin_ro_txn()?; - let mut cursor = txn.open_ro_cursor(self.heights_to_hashes)?; - - if let Some((height_bytes, _)) = cursor.iter().last() { - // let height: DbHeight = serde_json::from_slice(height_bytes)?; - let height = DbHeight::from_be_bytes(height_bytes)?; - Ok(height.0) - } else { - Ok(Height(0)) - } - } - - /// Returns a [`FinalisedStateSubscriber`]. - pub fn subscriber(&self) -> FinalisedStateSubscriber { - FinalisedStateSubscriber { - request_sender: self.request_sender.clone(), - status: self.status.clone(), - } - } - - /// Returns the status of the finalised state. - pub fn status(&self) -> StatusType { - self.status.load() - } - - /// Sets the finalised state to close gracefully. - pub fn close(&mut self) { - self.status.store(StatusType::Closing); - if let Some(handle) = self.read_task_handle.take() { - handle.abort(); - } - if let Some(handle) = self.write_task_handle.take() { - handle.abort(); - } - - if let Err(e) = self.database.sync(true) { - error!("Error syncing LMDB before shutdown: {:?}", e); - } - } -} - -impl Drop for FinalisedState { - fn drop(&mut self) { - self.status.store(StatusType::Closing); - if let Some(handle) = self.read_task_handle.take() { - handle.abort(); - } - if let Some(handle) = self.write_task_handle.take() { - handle.abort(); - } - - if let Err(e) = self.database.sync(true) { - error!("Error syncing LMDB before shutdown: {:?}", e); - } - } -} - -/// A subscriber to a [`crate::test_dependencies::chain_index::non_finalised_state::NonFinalizedState`]. -#[derive(Debug, Clone)] -pub struct FinalisedStateSubscriber { - request_sender: tokio::sync::mpsc::Sender, - status: AtomicStatus, -} - -impl FinalisedStateSubscriber { - /// Returns a Compact Block from the non-finalised state. - pub async fn get_compact_block( - &self, - hash_or_height: HashOrHeight, - ) -> Result { - let (channel_tx, channel_rx) = tokio::sync::oneshot::channel(); - if self - .request_sender - .send(DbRequest::new(hash_or_height, channel_tx)) - .await - .is_err() - { - return Err(FinalisedStateError::Custom( - "Error sending request to db reader".to_string(), - )); - } - - let result = tokio::time::timeout(std::time::Duration::from_secs(30), channel_rx).await; - match result { - Ok(Ok(compact_block)) => compact_block, - Ok(Err(_)) => Err(FinalisedStateError::Custom( - "Error receiving block from db reader".to_string(), - )), - Err(_) => Err(FinalisedStateError::Custom( - "Timeout while waiting for compact block".to_string(), - )), - } - } - - /// Returns the status of the FinalisedState.. - pub fn status(&self) -> StatusType { - self.status.load() - } -} diff --git a/zaino-state/src/local_cache/non_finalised_state.rs b/zaino-state/src/local_cache/non_finalised_state.rs deleted file mode 100644 index aebabd388..000000000 --- a/zaino-state/src/local_cache/non_finalised_state.rs +++ /dev/null @@ -1,504 +0,0 @@ -//! Compact Block Cache non-finalised state implementation. - -use std::collections::HashSet; - -use tracing::{error, info, warn}; -use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; -use zaino_proto::proto::compact_formats::CompactBlock; -use zebra_chain::block::{Hash, Height}; -use zebra_state::{HashOrHeight, ReadStateService}; - -use crate::{ - broadcast::{Broadcast, BroadcastSubscriber}, - config::BlockCacheConfig, - error::NonFinalisedStateError, - local_cache::fetch_block_from_node, - status::{AtomicStatus, StatusType}, -}; - -/// Non-finalised part of the chain (last 100 blocks), held in memory to ease the handling of reorgs. -/// -/// NOTE: We hold the last 102 blocks to ensure there are no gaps in the block cache. -/// TODO: Use ReadStateService when available (implemented in FinalisedState). -#[derive(Debug)] -pub struct NonFinalisedState { - /// Chain fetch service. - fetcher: JsonRpSeeConnector, - /// Optional ReadStateService based chain fetch service. - state: Option, - /// Broadcast containing ``. - heights_to_hashes: Broadcast, - /// Broadcast containing ``. - hashes_to_blocks: Broadcast, - /// Sync task handle. - sync_task_handle: Option>, - /// Used to send blocks to the finalised state. - block_sender: tokio::sync::mpsc::Sender<(Height, Hash, CompactBlock)>, - /// Non-finalised state status. - status: AtomicStatus, - /// BlockCache config data. - config: BlockCacheConfig, -} - -impl NonFinalisedState { - /// Spawns a new [`NonFinalisedState`]. - pub async fn spawn( - fetcher: &JsonRpSeeConnector, - state: Option<&ReadStateService>, - block_sender: tokio::sync::mpsc::Sender<(Height, Hash, CompactBlock)>, - config: BlockCacheConfig, - ) -> Result { - info!("Launching Non-Finalised State.."); - let mut non_finalised_state = NonFinalisedState { - fetcher: fetcher.clone(), - state: state.cloned(), - heights_to_hashes: Broadcast::new( - Some(config.storage.cache.capacity), - Some(1 << config.storage.cache.shard_power), - ), - hashes_to_blocks: Broadcast::new( - Some(config.storage.cache.capacity), - Some(1 << config.storage.cache.shard_power), - ), - sync_task_handle: None, - block_sender, - status: AtomicStatus::new(StatusType::Spawning), - config, - }; - - non_finalised_state.wait_on_server().await?; - - let chain_height = fetcher - .get_blockchain_info() - .await - .map_err(|_| NonFinalisedStateError::Custom("Failed to fetch blockchain info".into()))? - .blocks - .0; - // We do not fetch pre sapling activation. - for height in chain_height.saturating_sub(99).max( - non_finalised_state - .config - .network - .to_zebra_network() - .sapling_activation_height() - .0, - )..=chain_height - { - loop { - match fetch_block_from_node( - non_finalised_state.state.as_ref(), - Some(&non_finalised_state.config.network.to_zebra_network()), - &non_finalised_state.fetcher, - HashOrHeight::Height(Height(height)), - ) - .await - { - Ok((hash, block)) => { - non_finalised_state - .heights_to_hashes - .insert(Height(height), hash, None); - non_finalised_state - .hashes_to_blocks - .insert(hash, block, None); - break; - } - Err(e) => { - non_finalised_state.update_status_and_notify(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - } - - non_finalised_state.sync_task_handle = Some(non_finalised_state.serve().await?); - - Ok(non_finalised_state) - } - - async fn serve(&self) -> Result, NonFinalisedStateError> { - let non_finalised_state = Self { - fetcher: self.fetcher.clone(), - state: self.state.clone(), - heights_to_hashes: self.heights_to_hashes.clone(), - hashes_to_blocks: self.hashes_to_blocks.clone(), - sync_task_handle: None, - block_sender: self.block_sender.clone(), - status: self.status.clone(), - config: self.config.clone(), - }; - - let sync_handle = tokio::spawn(async move { - let mut best_block_hash: Hash; - let mut check_block_hash: Hash; - - loop { - match non_finalised_state.fetcher.get_blockchain_info().await { - Ok(chain_info) => { - best_block_hash = chain_info.best_block_hash; - non_finalised_state.status.store(StatusType::Ready); - break; - } - Err(e) => { - non_finalised_state.update_status_and_notify(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - - loop { - if non_finalised_state.status.load() == StatusType::Closing { - non_finalised_state.update_status_and_notify(StatusType::Closing); - return; - } - - match non_finalised_state.fetcher.get_blockchain_info().await { - Ok(chain_info) => { - check_block_hash = chain_info.best_block_hash; - } - Err(e) => { - non_finalised_state.update_status_and_notify(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - continue; - } - } - - if check_block_hash != best_block_hash { - best_block_hash = check_block_hash; - non_finalised_state.status.store(StatusType::Syncing); - non_finalised_state - .heights_to_hashes - .notify(non_finalised_state.status.load()); - non_finalised_state - .hashes_to_blocks - .notify(non_finalised_state.status.load()); - loop { - match non_finalised_state.fill_from_reorg().await { - Ok(_) => break, - Err(NonFinalisedStateError::Critical(e)) => { - non_finalised_state - .update_status_and_notify(StatusType::CriticalError); - error!("{e}"); - return; - } - Err(e) => { - non_finalised_state - .update_status_and_notify(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - } - non_finalised_state.status.store(StatusType::Ready); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - }); - - Ok(sync_handle) - } - - /// Looks back through the chain to find reorg height and repopulates block cache. - /// - /// Newly mined blocks are treated as a reorg at chain_height[-0]. - async fn fill_from_reorg(&self) -> Result<(), NonFinalisedStateError> { - let mut reorg_height = *self - .heights_to_hashes - .get_state() - .iter() - .max_by_key(|entry| entry.key().0) - .ok_or_else(|| { - NonFinalisedStateError::MissingData( - "Failed to find the maximum height in the non-finalised state.".to_string(), - ) - })? - .key(); - - let mut reorg_hash = self.heights_to_hashes.get(&reorg_height).ok_or_else(|| { - NonFinalisedStateError::MissingData(format!( - "Missing hash for height: {}", - reorg_height.0 - )) - })?; - - let mut check_hash = match self - .fetcher - .get_block(reorg_height.0.to_string(), Some(1)) - .await? - { - zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block) => block.hash.0, - _ => { - return Err(NonFinalisedStateError::Custom( - "Unexpected block response type".to_string(), - )) - } - }; - - // Find reorg height. - // - // Here this is the latest height at which the internal block hash matches the server block hash. - while reorg_hash != check_hash.into() { - match reorg_height.previous() { - Ok(height) => reorg_height = height, - // Underflow error meaning reorg_height = start of chain. - // This means the whole non-finalised state is old. - Err(_) => { - self.heights_to_hashes.clear(); - self.hashes_to_blocks.clear(); - break; - } - }; - - reorg_hash = self.heights_to_hashes.get(&reorg_height).ok_or_else(|| { - NonFinalisedStateError::MissingData(format!( - "Missing hash for height: {}", - reorg_height.0 - )) - })?; - - check_hash = match self - .fetcher - .get_block(reorg_height.0.to_string(), Some(1)) - .await? - { - zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block) => block.hash.0, - _ => { - return Err(NonFinalisedStateError::Custom( - "Unexpected block response type".to_string(), - )) - } - }; - } - - // Refill from max(reorg_height[+1], sapling_activation_height). - // - // reorg_height + 1 is used here as reorg_height represents the last "valid" block known. - let validator_height = self - .fetcher - .get_blockchain_info() - .await - .map_err(|e| NonFinalisedStateError::Custom(e.to_string()))? - .blocks - .0; - for block_height in ((reorg_height.0 + 1).max( - self.config - .network - .to_zebra_network() - .sapling_activation_height() - .0, - ))..=validator_height - { - // Either pop the reorged block or pop the oldest block in non-finalised state. - // If we pop the oldest (valid) block we send it to the finalised state to be saved to disk. - if self.heights_to_hashes.contains_key(&Height(block_height)) { - if let Some(hash) = self.heights_to_hashes.get(&Height(block_height)) { - self.hashes_to_blocks.remove(&hash, None); - self.heights_to_hashes.remove(&Height(block_height), None); - } - } else { - let pop_height = *self - .heights_to_hashes - .get_state() - .iter() - .min_by_key(|entry| entry.key().0) - .ok_or_else(|| { - NonFinalisedStateError::MissingData( - "Failed to find the minimum height in the non-finalised state." - .to_string(), - ) - })? - .key(); - // Only pop block if it is outside of the non-finalised state block range. - if pop_height.0 < (validator_height.saturating_sub(100)) { - if let Some(hash) = self.heights_to_hashes.get(&pop_height) { - // Send to FinalisedState if db is active. - match self.config.storage.database.size { - zaino_common::DatabaseSize::Gb(0) => {} // do nothing - zaino_common::DatabaseSize::Gb(_) => { - if let Some(block) = self.hashes_to_blocks.get(&hash) { - if self - .block_sender - .send((pop_height, *hash, block.as_ref().clone())) - .await - .is_err() - { - self.status.store(StatusType::CriticalError); - return Err(NonFinalisedStateError::Critical( - "Critical error in database. Closing NonFinalisedState" - .to_string(), - )); - } - } - } - } - self.hashes_to_blocks.remove(&hash, None); - self.heights_to_hashes.remove(&pop_height, None); - } - } - } - loop { - match fetch_block_from_node( - self.state.as_ref(), - Some(&self.config.network.to_zebra_network()), - &self.fetcher, - HashOrHeight::Height(Height(block_height)), - ) - .await - { - Ok((hash, block)) => { - self.heights_to_hashes - .insert(Height(block_height), hash, None); - self.hashes_to_blocks.insert(hash, block, None); - info!( - "Block at height [{}] with hash [{}] successfully committed to non-finalised state.", - block_height, hash, - ); - break; - } - Err(e) => { - self.update_status_and_notify(StatusType::RecoverableError); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - } - - Ok(()) - } - - /// Waits for server to sync with p2p network. - pub async fn wait_on_server(&self) -> Result<(), NonFinalisedStateError> { - // If no_db is active wait for server to sync with p2p network. - let no_db = match self.config.storage.database.size { - zaino_common::DatabaseSize::Gb(0) => true, - zaino_common::DatabaseSize::Gb(_) => false, - }; - if no_db && !self.config.network.to_zebra_network().is_regtest() { - self.status.store(StatusType::Syncing); - loop { - let blockchain_info = self.fetcher.get_blockchain_info().await.map_err(|e| { - NonFinalisedStateError::Custom(format!("Failed to fetch blockchain info: {e}")) - })?; - if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64) - .abs() - <= 10 - { - break; - } else { - info!(" - Validator syncing with network. Validator chain height: {}, Estimated Network chain height: {}", - &blockchain_info.blocks.0, - &blockchain_info.estimated_height.0 - ); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - continue; - } - } - Ok(()) - } else { - Ok(()) - } - } - - /// Returns a [`NonFinalisedStateSubscriber`]. - pub fn subscriber(&self) -> NonFinalisedStateSubscriber { - NonFinalisedStateSubscriber { - heights_to_hashes: self.heights_to_hashes.subscriber(), - hashes_to_blocks: self.hashes_to_blocks.subscriber(), - status: self.status.clone(), - } - } - - /// Returns the status of the non-finalised state. - pub fn status(&self) -> StatusType { - self.status.load() - } - - /// Updates the status of the non-finalised state and notifies subscribers. - fn update_status_and_notify(&self, status: StatusType) { - self.status.store(status); - self.heights_to_hashes.notify(self.status.load()); - self.hashes_to_blocks.notify(self.status.load()); - } - - /// Sets the non-finalised state to close gracefully. - pub fn close(&mut self) { - self.update_status_and_notify(StatusType::Closing); - if let Some(handle) = self.sync_task_handle.take() { - handle.abort(); - } - } -} - -impl Drop for NonFinalisedState { - fn drop(&mut self) { - self.update_status_and_notify(StatusType::Closing); - if let Some(handle) = self.sync_task_handle.take() { - handle.abort(); - } - } -} - -/// A subscriber to a [`NonFinalisedState`]. -#[derive(Debug, Clone)] -pub struct NonFinalisedStateSubscriber { - heights_to_hashes: BroadcastSubscriber, - hashes_to_blocks: BroadcastSubscriber, - status: AtomicStatus, -} - -impl NonFinalisedStateSubscriber { - /// Returns a Compact Block from the non-finalised state. - pub async fn get_compact_block( - &self, - hash_or_height: HashOrHeight, - ) -> Result { - let hash = match hash_or_height { - HashOrHeight::Hash(hash) => hash, - HashOrHeight::Height(height) => { - *self.heights_to_hashes.get(&height).ok_or_else(|| { - NonFinalisedStateError::MissingData(format!( - "Height not found in non-finalised state: {}", - height.0 - )) - })? - } - }; - - self.hashes_to_blocks - .get(&hash) - .map(|block| block.as_ref().clone()) - .ok_or_else(|| { - NonFinalisedStateError::MissingData(format!("Block not found for hash: {hash}")) - }) - } - - /// Returns the height of the latest block in the non-finalised state. - pub async fn get_chain_height(&self) -> Result { - let (height, _) = *self - .heights_to_hashes - .get_filtered_state(&HashSet::new()) - .iter() - .max_by_key(|(height, ..)| height.0) - .ok_or_else(|| { - NonFinalisedStateError::MissingData("Non-finalised state is empty.".into()) - })?; - - Ok(height) - } - - /// Predicate checks for presence of Hash.. or Height? - pub async fn contains_hash_or_height(&self, hash_or_height: HashOrHeight) -> bool { - match hash_or_height { - HashOrHeight::Height(height) => self.heights_to_hashes.contains_key(&height), - HashOrHeight::Hash(hash) => self.hashes_to_blocks.contains_key(&hash), - } - } - - /// Returns the status of the NonFinalisedState. - pub fn status(&self) -> StatusType { - self.status.load() - } -} diff --git a/zaino-state/src/status.rs b/zaino-state/src/status.rs index e80c062ed..c92b67d59 100644 --- a/zaino-state/src/status.rs +++ b/zaino-state/src/status.rs @@ -1,150 +1,41 @@ -//! Holds a thread safe status implementation. +//! Thread-safe status wrapper. +//! +//! This module provides [`AtomicStatus`], a thread-safe wrapper for [`StatusType`]. -use std::{ - fmt, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; -/// Holds a thread safe representation of a StatusType. -/// Possible values: -/// - [0: Spawning] -/// - [1: Syncing] -/// - [2: Ready] -/// - [3: Busy] -/// - [4: Closing]. -/// - [>=5: Offline]. -/// - [>=6: Error]. -/// - [>=7: Critical-Error]. -/// TODO: Refine error code spec. +pub use zaino_common::status::{Status, StatusType}; + +/// Holds a thread-safe representation of a [`StatusType`]. #[derive(Debug, Clone)] pub struct AtomicStatus { inner: Arc, } impl AtomicStatus { - /// Creates a new AtomicStatus + /// Creates a new AtomicStatus. pub fn new(status: StatusType) -> Self { Self { inner: Arc::new(AtomicUsize::new(status.into())), } } - /// Loads the value held in the AtomicStatus + /// Loads the value held in the AtomicStatus. pub fn load(&self) -> StatusType { StatusType::from(self.inner.load(Ordering::SeqCst)) } - /// Sets the value held in the AtomicStatus + /// Sets the value held in the AtomicStatus. pub fn store(&self, status: StatusType) { - self.inner - .store(status.into(), std::sync::atomic::Ordering::SeqCst); + self.inner.store(status.into(), Ordering::SeqCst); } } -/// Status of the server's components. -/// -/// TODO: Some of these statuses may be artefacts of a previous version -/// of the status. We may be able to remove some of them -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum StatusType { - /// Running initial startup routine. - Spawning = 0, - /// Back-end process is currently syncing. - Syncing = 1, - /// Process is ready. - Ready = 2, - /// Process is busy working. - Busy = 3, - /// Running shutdown routine. - Closing = 4, - /// Offline. - Offline = 5, - /// Non Critical Errors. - RecoverableError = 6, - /// Critical Errors. - CriticalError = 7, -} - -impl From for StatusType { - fn from(value: usize) -> Self { - match value { - 0 => StatusType::Spawning, - 1 => StatusType::Syncing, - 2 => StatusType::Ready, - 3 => StatusType::Busy, - 4 => StatusType::Closing, - 5 => StatusType::Offline, - 6 => StatusType::RecoverableError, - _ => StatusType::CriticalError, - } - } -} - -impl From for usize { - fn from(status: StatusType) -> Self { - status as usize - } -} - -impl fmt::Display for StatusType { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let status_str = match self { - StatusType::Spawning => "Spawning", - StatusType::Syncing => "Syncing", - StatusType::Ready => "Ready", - StatusType::Busy => "Busy", - StatusType::Closing => "Closing", - StatusType::Offline => "Offline", - StatusType::RecoverableError => "RecoverableError", - StatusType::CriticalError => "CriticalError", - }; - write!(f, "{status_str}") - } -} - -impl StatusType { - /// Returns the corresponding status symbol for the StatusType - pub fn get_status_symbol(&self) -> String { - let (symbol, color_code) = match self { - // Yellow Statuses - StatusType::Syncing => ("\u{1F7E1}", "\x1b[33m"), - // Cyan Statuses - StatusType::Spawning | StatusType::Busy => ("\u{1F7E1}", "\x1b[36m"), - // Green Status - StatusType::Ready => ("\u{1F7E2}", "\x1b[32m"), - // Grey Statuses - StatusType::Closing | StatusType::Offline => ("\u{26AB}", "\x1b[90m"), - // Red Error Statuses - StatusType::RecoverableError | StatusType::CriticalError => ("\u{1F534}", "\x1b[31m"), - }; - - format!("{}{}{}", color_code, symbol, "\x1b[0m") - } - - /// Look at two statuses, and return the more - /// 'severe' of the two statuses - pub fn combine(self, other: StatusType) -> StatusType { - match (self, other) { - // If either is Closing, return Closing. - (StatusType::Closing, _) | (_, StatusType::Closing) => StatusType::Closing, - // If either is Offline or CriticalError, return CriticalError. - (StatusType::Offline, _) - | (_, StatusType::Offline) - | (StatusType::CriticalError, _) - | (_, StatusType::CriticalError) => StatusType::CriticalError, - // If either is RecoverableError, return RecoverableError. - (StatusType::RecoverableError, _) | (_, StatusType::RecoverableError) => { - StatusType::RecoverableError - } - // If either is Spawning, return Spawning. - (StatusType::Spawning, _) | (_, StatusType::Spawning) => StatusType::Spawning, - // If either is Syncing, return Syncing. - (StatusType::Syncing, _) | (_, StatusType::Syncing) => StatusType::Syncing, - // Otherwise, return Ready. - _ => StatusType::Ready, - } +impl Status for AtomicStatus { + fn status(&self) -> StatusType { + self.load() } } diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index ad4c607e2..7a0d352ef 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -18,6 +18,8 @@ use tracing::info; use tracing_subscriber::EnvFilter; use zaino_common::{ network::{ActivationHeights, ZEBRAD_DEFAULT_ACTIVATION_HEIGHTS}, + probing::{Liveness, Readiness}, + status::Status, validator::ValidatorConfig, CacheConfig, DatabaseConfig, Network, ServiceConfig, StorageConfig, }; @@ -63,6 +65,28 @@ pub fn make_uri(indexer_port: portpicker::Port) -> http::Uri { .unwrap() } +/// Polls until the given component reports ready. +/// +/// Returns `true` if the component became ready within the timeout, +/// `false` if the timeout was reached. +pub async fn poll_until_ready( + component: &impl Readiness, + poll_interval: std::time::Duration, + timeout: std::time::Duration, +) -> bool { + tokio::time::timeout(timeout, async { + let mut interval = tokio::time::interval(poll_interval); + loop { + interval.tick().await; + if component.is_ready() { + return; + } + } + }) + .await + .is_ok() +} + // temporary until activation heights are unified to zebra-chain type. // from/into impls not added in zaino-common to avoid unecessary addition of zcash-protocol dep to non-test code /// Convert zaino activation heights into zcash protocol type. @@ -459,8 +483,15 @@ where test_manager.local_net.generate_blocks(1).await.unwrap(); } - // FIXME: zaino's status can still be syncing instead of ready at this point - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // Wait for zaino to be ready to serve requests + if let Some(ref subscriber) = test_manager.service_subscriber { + poll_until_ready( + subscriber, + std::time::Duration::from_millis(100), + std::time::Duration::from_secs(30), + ) + .await; + } Ok(test_manager) } @@ -507,37 +538,83 @@ where } /// Generate `n` blocks for the local network and poll zaino's fetch/state subscriber until the chain index is synced to the target height. - pub async fn generate_blocks_and_poll_indexer( + /// + /// # Panics + /// + /// Panics if the indexer is not live (Offline or CriticalError), indicating the + /// backing validator has crashed or become unreachable. + pub async fn generate_blocks_and_poll_indexer( &self, n: u32, - indexer: &impl LightWalletIndexer, - ) { + indexer: &I, + ) where + I: LightWalletIndexer + Liveness + Status, + { let chain_height = self.local_net.get_chain_height().await; + let target_height = u64::from(chain_height) + n as u64; let mut next_block_height = u64::from(chain_height) + 1; let mut interval = tokio::time::interval(std::time::Duration::from_millis(200)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); interval.tick().await; + // NOTE: readstate service seems to not be functioning correctly when generate multiple blocks at once and polling the latest block. // commented out a fall back to `get_block` to query the cache directly if needed in the future. // while indexer.get_block(zaino_proto::proto::service::BlockId { // height: u64::from(chain_height) + n as u64, // hash: vec![], // }).await.is_err() - while indexer.get_latest_block().await.unwrap().height < u64::from(chain_height) + n as u64 - { + while indexer.get_latest_block().await.unwrap().height < target_height { + // Check liveness - fail fast if the indexer is dead + if !indexer.is_live() { + let status = indexer.status(); + panic!( + "Indexer is not live (status: {status:?}). \ + The backing validator may have crashed or become unreachable." + ); + } + if n == 0 { interval.tick().await; } else { self.local_net.generate_blocks(1).await.unwrap(); while indexer.get_latest_block().await.unwrap().height != next_block_height { + if !indexer.is_live() { + let status = indexer.status(); + panic!( + "Indexer is not live while waiting for block {next_block_height} (status: {status:?})." + ); + } interval.tick().await; } next_block_height += 1; } } + + // After height is reached, wait for readiness and measure if it adds time + if !indexer.is_ready() { + let start = std::time::Instant::now(); + poll_until_ready( + indexer, + std::time::Duration::from_millis(50), + std::time::Duration::from_secs(30), + ) + .await; + let elapsed = start.elapsed(); + if elapsed.as_millis() > 0 { + info!( + "Readiness wait after height poll took {:?} (height polling alone was insufficient)", + elapsed + ); + } + } } /// Generate `n` blocks for the local network and poll zaino's chain index until the chain index is synced to the target height. + /// + /// # Panics + /// + /// Panics if the chain index is not live (Offline or CriticalError), indicating the + /// backing validator has crashed or become unreachable. pub async fn generate_blocks_and_poll_chain_index( &self, n: u32, @@ -555,6 +632,15 @@ where .height, ) < chain_height + n { + // Check liveness - fail fast if the chain index is dead + if !chain_index.is_live() { + let status = chain_index.combined_status(); + panic!( + "Chain index is not live (status: {status:?}). \ + The backing validator may have crashed or become unreachable." + ); + } + if n == 0 { interval.tick().await; } else { @@ -566,11 +652,35 @@ where .height, ) != next_block_height { + if !chain_index.is_live() { + let status = chain_index.combined_status(); + panic!( + "Chain index is not live while waiting for block {next_block_height} (status: {status:?})." + ); + } interval.tick().await; } next_block_height += 1; } } + + // After height is reached, wait for readiness and measure if it adds time + if !chain_index.is_ready() { + let start = std::time::Instant::now(); + poll_until_ready( + chain_index, + std::time::Duration::from_millis(50), + std::time::Duration::from_secs(30), + ) + .await; + let elapsed = start.elapsed(); + if elapsed.as_millis() > 0 { + info!( + "Readiness wait after height poll took {:?} (height polling alone was insufficient)", + elapsed + ); + } + } } /// Closes the TestManager. diff --git a/zainod/src/indexer.rs b/zainod/src/indexer.rs index 91329fe9d..cc4154f8b 100644 --- a/zainod/src/indexer.rs +++ b/zainod/src/indexer.rs @@ -124,18 +124,18 @@ where loop { // Log the servers status. if last_log_time.elapsed() >= log_interval { - indexer.log_status().await; + indexer.log_status(); last_log_time = Instant::now(); } // Check for restart signals. - if indexer.check_for_critical_errors().await { + if indexer.check_for_critical_errors() { indexer.close().await; return Err(IndexerError::Restart); } // Check for shutdown signals. - if indexer.check_for_shutdown().await { + if indexer.check_for_shutdown() { indexer.close().await; return Ok(()); } @@ -148,14 +148,14 @@ where } /// Checks indexers status and servers internal statuses for either offline of critical error signals. - async fn check_for_critical_errors(&self) -> bool { - let status = self.status_int().await; + fn check_for_critical_errors(&self) -> bool { + let status = self.status_int(); status == 5 || status >= 7 } /// Checks indexers status and servers internal status for closure signal. - async fn check_for_shutdown(&self) -> bool { - if self.status_int().await == 4 { + fn check_for_shutdown(&self) -> bool { + if self.status_int() == 4 { return true; } false @@ -179,10 +179,10 @@ where } } - /// Returns the indexers current status usize, caliculates from internal statuses. - async fn status_int(&self) -> usize { + /// Returns the indexers current status usize, calculates from internal statuses. + fn status_int(&self) -> usize { let service_status = match &self.service { - Some(service) => service.inner_ref().status().await, + Some(service) => service.inner_ref().status(), None => return 7, }; @@ -204,14 +204,14 @@ where } /// Returns the current StatusType of the indexer. - pub async fn status(&self) -> StatusType { - StatusType::from(self.status_int().await) + pub fn status(&self) -> StatusType { + StatusType::from(self.status_int()) } /// Logs the indexers status. - pub async fn log_status(&self) { + pub fn log_status(&self) { let service_status = match &self.service { - Some(service) => service.inner_ref().status().await, + Some(service) => service.inner_ref().status(), None => StatusType::Offline, };