diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index f8e8f5387..4103b587e 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -198,7 +198,7 @@ mod chain_query_interface { ) .await .unwrap(); - let index_reader = chain_index.subscriber().await; + let index_reader = chain_index.subscriber(); tokio::time::sleep(Duration::from_secs(3)).await; ( @@ -231,7 +231,7 @@ mod chain_query_interface { ) .await .unwrap(); - let index_reader = chain_index.subscriber().await; + let index_reader = chain_index.subscriber(); tokio::time::sleep(Duration::from_secs(3)).await; (test_manager, json_service, None, chain_index, index_reader) diff --git a/integration-tests/tests/fetch_service.rs b/integration-tests/tests/fetch_service.rs index a5d5047e1..22e63081e 100644 --- a/integration-tests/tests/fetch_service.rs +++ b/integration-tests/tests/fetch_service.rs @@ -9,8 +9,8 @@ use zaino_proto::proto::service::{ TransparentAddressBlockFilter, TxFilter, }; use zaino_state::{ - BackendType, FetchService, FetchServiceConfig, FetchServiceSubscriber, LightWalletIndexer, - StatusType, ZcashIndexer, ZcashService as _, + BackendType, ChainIndex as _, FetchService, FetchServiceConfig, FetchServiceSubscriber, + LightWalletIndexer, StatusType, ZcashIndexer, ZcashService as _, }; use zaino_testutils::Validator as _; use zaino_testutils::{TestManager, ValidatorKind}; @@ -301,23 +301,28 @@ pub async fn test_get_mempool_info(validator: &ValidatorKind) { let info = fetch_service_subscriber.get_mempool_info().await.unwrap(); // Derive expected values directly from the current mempool contents. - let entries = fetch_service_subscriber.mempool.get_mempool().await; + + let keys = fetch_service_subscriber + .indexer + .get_mempool_txids() + .await + .unwrap(); + + let values = fetch_service_subscriber + .indexer + .get_mempool_transactions(Vec::new()) + .await + .unwrap(); // Size - assert_eq!(info.size, entries.len() as u64); + assert_eq!(info.size, values.len() as u64); assert!(info.size >= 1); // Bytes: sum of SerializedTransaction lengths - let expected_bytes: u64 = entries - .iter() - .map(|(_, value)| value.serialized_tx.as_ref().as_ref().len() as u64) - .sum(); + let expected_bytes: u64 = values.iter().map(|entry| entry.len() as u64).sum(); // Key heap bytes: sum of txid String capacities - let expected_key_heap_bytes: u64 = entries - .iter() - .map(|(key, _)| key.txid.capacity() as u64) - .sum(); + let expected_key_heap_bytes: u64 = keys.iter().map(|key| key.0.len() as u64).sum(); let expected_usage = expected_bytes.saturating_add(expected_key_heap_bytes); @@ -478,12 +483,12 @@ async fn fetch_service_get_address_tx_ids(validator: &ValidatorKind) { test_manager.local_net.generate_blocks(1).await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let chain_height = fetch_service_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + let chain_height: u32 = fetch_service_subscriber + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); dbg!(&chain_height); let fetch_service_txids = fetch_service_subscriber @@ -994,12 +999,12 @@ async fn fetch_service_get_taddress_txids(validator: &ValidatorKind) { test_manager.local_net.generate_blocks(1).await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let chain_height = fetch_service_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + let chain_height: u32 = fetch_service_subscriber + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); dbg!(&chain_height); let block_filter = TransparentAddressBlockFilter { diff --git a/integration-tests/tests/json_server.rs b/integration-tests/tests/json_server.rs index dd0c03b2d..339daf5e9 100644 --- a/integration-tests/tests/json_server.rs +++ b/integration-tests/tests/json_server.rs @@ -3,8 +3,8 @@ use zaino_common::network::ActivationHeights; use zaino_common::{DatabaseConfig, ServiceConfig, StorageConfig}; use zaino_state::{ - BackendType, FetchService, FetchServiceConfig, FetchServiceSubscriber, ZcashIndexer, - ZcashService as _, + BackendType, ChainIndex, FetchService, FetchServiceConfig, FetchServiceSubscriber, + ZcashIndexer, ZcashService as _, }; use zaino_testutils::{from_inputs, Validator as _}; use zaino_testutils::{TestManager, ValidatorKind}; @@ -585,11 +585,11 @@ async fn get_address_tx_ids_inner() { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let chain_height = zcashd_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); dbg!(&chain_height); let zcashd_txids = zcashd_subscriber diff --git a/integration-tests/tests/state_service.rs b/integration-tests/tests/state_service.rs index 720f9e302..0c4b1c3a7 100644 --- a/integration-tests/tests/state_service.rs +++ b/integration-tests/tests/state_service.rs @@ -1,6 +1,6 @@ use zaino_common::network::ActivationHeights; use zaino_common::{DatabaseConfig, ServiceConfig, StorageConfig}; -use zaino_state::BackendType; +use zaino_state::{BackendType, ChainIndex as _}; use zaino_state::{ FetchService, FetchServiceConfig, FetchServiceSubscriber, LightWalletIndexer, StateService, StateServiceConfig, StateServiceSubscriber, ZcashIndexer, ZcashService as _, @@ -857,11 +857,12 @@ async fn state_service_get_address_tx_ids(validator: &ValidatorKind) { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let chain_height = fetch_service_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); + dbg!(&chain_height); let fetch_service_txids = fetch_service_subscriber diff --git a/zaino-state/src/backends/fetch.rs b/zaino-state/src/backends/fetch.rs index ba5c96e2c..da3bc04e3 100644 --- a/zaino-state/src/backends/fetch.rs +++ b/zaino-state/src/backends/fetch.rs @@ -2,13 +2,15 @@ use futures::StreamExt; use hex::FromHex; -use std::time; +use std::{io::Cursor, time}; use tokio::{sync::mpsc, time::timeout}; use tonic::async_trait; use tracing::{info, warn}; use zebra_state::HashOrHeight; -use zebra_chain::{block::Height, subtree::NoteCommitmentSubtreeIndex}; +use zebra_chain::{ + block::Height, serialization::ZcashDeserialize as _, subtree::NoteCommitmentSubtreeIndex, +}; use zebra_rpc::{ client::{GetSubtreesByIndexResponse, GetTreestateResponse, ValidateAddressResponse}, methods::{ @@ -38,22 +40,21 @@ use zaino_proto::proto::{ }; use crate::{ - chain_index::{ - mempool::{Mempool, MempoolSubscriber}, - source::ValidatorConnector, - }, + chain_index::{source::ValidatorConnector, types}, config::FetchServiceConfig, - error::{BlockCacheError, FetchServiceError}, + error::FetchServiceError, indexer::{ handle_raw_transaction, IndexerSubscriber, LightWalletIndexer, ZcashIndexer, ZcashService, }, - local_cache::{BlockCache, BlockCacheSubscriber}, status::StatusType, stream::{ AddressStream, CompactBlockStream, CompactTransactionStream, RawTransactionStream, UtxoReplyStream, }, - utils::{blockid_to_hashorheight, get_build_info, ServiceMetadata}, + utils::{ + blockid_to_hashorheight, compact_block_to_nullifiers, get_build_info, ServiceMetadata, + }, + ChainIndex as _, NodeBackedChainIndex, NodeBackedChainIndexSubscriber, }; /// Chain fetch service backed by Zcashd's JsonRPC engine. @@ -68,10 +69,8 @@ use crate::{ pub struct FetchService { /// JsonRPC Client. fetcher: JsonRpSeeConnector, - /// Local compact block cache. - block_cache: BlockCache, - /// Internal mempool. - mempool: Mempool, + /// Core indexer. + indexer: NodeBackedChainIndex, /// Service metadata. data: ServiceMetadata, /// StateService config data. @@ -104,22 +103,14 @@ impl ZcashService for FetchService { ); info!("Using Zcash build: {}", data); - let block_cache = BlockCache::spawn(&fetcher, None, config.clone().into()) + let source = ValidatorConnector::Fetch(fetcher.clone()); + let indexer = NodeBackedChainIndex::new(source, config.clone().into()) .await - .map_err(|e| { - FetchServiceError::BlockCacheError(BlockCacheError::Custom(e.to_string())) - })?; - - let mempool_source = ValidatorConnector::Fetch(fetcher.clone()); - - let mempool = Mempool::spawn(mempool_source, None).await.map_err(|e| { - FetchServiceError::BlockCacheError(BlockCacheError::Custom(e.to_string())) - })?; + .unwrap(); let fetch_service = Self { fetcher, - block_cache, - mempool, + indexer, data, config, }; @@ -135,8 +126,7 @@ impl ZcashService for FetchService { fn get_subscriber(&self) -> IndexerSubscriber { IndexerSubscriber::new(FetchServiceSubscriber { fetcher: self.fetcher.clone(), - block_cache: self.block_cache.subscriber(), - mempool: self.mempool.subscriber(), + indexer: self.indexer.subscriber(), data: self.data.clone(), config: self.config.clone(), }) @@ -144,16 +134,16 @@ impl ZcashService for FetchService { /// Fetches the current status async fn status(&self) -> StatusType { - let mempool_status = self.mempool.status(); - let block_cache_status = self.block_cache.status(); - - mempool_status.combine(block_cache_status) + self.indexer.status() } /// Shuts down the StateService. fn close(&mut self) { - self.mempool.close(); - self.block_cache.close(); + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let _ = self.indexer.shutdown().await; + }); + }); } } @@ -170,10 +160,8 @@ impl Drop for FetchService { pub struct FetchServiceSubscriber { /// JsonRPC Client. pub fetcher: JsonRpSeeConnector, - /// Local compact block cache. - pub block_cache: BlockCacheSubscriber, - /// Internal mempool. - pub mempool: MempoolSubscriber, + /// Core indexer. + pub indexer: NodeBackedChainIndexSubscriber, /// Service metadata. pub data: ServiceMetadata, /// StateService config data. @@ -183,10 +171,7 @@ pub struct FetchServiceSubscriber { impl FetchServiceSubscriber { /// Fetches the current status pub fn status(&self) -> StatusType { - let mempool_status = self.mempool.status(); - let block_cache_status = self.block_cache.status(); - - mempool_status.combine(block_cache_status) + self.indexer.status() } /// Returns the network type running. @@ -249,7 +234,7 @@ impl ZcashIndexer for FetchServiceSubscriber { /// /// Zebra does not support this RPC call directly. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await?) + Ok(self.indexer.get_mempool_info().await.into()) } async fn get_peer_info(&self) -> Result { @@ -425,11 +410,11 @@ impl ZcashIndexer for FetchServiceSubscriber { async fn get_raw_mempool(&self) -> Result, Self::Error> { // Ok(self.fetcher.get_raw_mempool().await?.transactions) Ok(self - .mempool - .get_mempool() + .indexer + .get_mempool_txids() .await .into_iter() - .map(|(key, _)| key.txid) + .map(|mempool_txids| mempool_txids.iter().map(|txid| txid.to_string()).collect()) .collect()) } @@ -523,7 +508,9 @@ impl ZcashIndexer for FetchServiceSubscriber { } async fn chain_height(&self) -> Result { - Ok(self.block_cache.get_chain_height().await?) + Ok(Height( + self.indexer.snapshot_nonfinalized_state().best_tip.height.0, + )) } /// Returns the transaction ids made by the provided transparent addresses. /// @@ -592,16 +579,12 @@ impl ZcashIndexer for FetchServiceSubscriber { impl LightWalletIndexer for FetchServiceSubscriber { /// Return the height of the tip of the best chain async fn get_latest_block(&self) -> Result { - let latest_height = self.block_cache.get_chain_height().await?; - let latest_hash = self - .block_cache - .get_compact_block(latest_height.0.to_string()) - .await? - .hash; + let tip = self.indexer.snapshot_nonfinalized_state().best_tip; + dbg!(&tip); Ok(BlockId { - height: latest_height.0 as u64, - hash: latest_hash, + height: tip.height.0 as u64, + hash: tip.blockhash.0.to_vec(), }) } @@ -612,14 +595,59 @@ impl LightWalletIndexer for FetchServiceSubscriber { "Error: Invalid hash and/or height out of range. Failed to convert to u32.", )), )?; + + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let height = match hash_or_height { + HashOrHeight::Height(height) => height.0, + HashOrHeight::Hash(hash) => { + match self.indexer.get_block_height(&snapshot, hash.into()).await { + Ok(Some(height)) => height.0, + Ok(None) => { + return Err(FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Hash not founf in chain", + ))); + } + Err(_e) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::internal("Error: Internal db error."), + )); + } + } + } + }; + match self - .block_cache - .get_compact_block(hash_or_height.to_string()) + .indexer + .get_compact_block(&snapshot, types::Height(height)) .await { - Ok(block) => Ok(block), + Ok(Some(block)) => Ok(block), + Ok(None) => { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - let chain_height = self.block_cache.get_chain_height().await?.0; + let chain_height = snapshot.best_tip.height.0; match hash_or_height { HashOrHeight::Height(Height(height)) if height >= chain_height => Err( FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( @@ -653,35 +681,86 @@ impl LightWalletIndexer for FetchServiceSubscriber { /// /// NOTE: Currently this only returns Orchard nullifiers to follow Lightwalletd functionality but Sapling could be added if required by wallets. async fn get_block_nullifiers(&self, request: BlockId) -> Result { - let height: u32 = match request.height.try_into() { - Ok(height) => height, - Err(_) => { - return Err(FetchServiceError::TonicStatusError( - tonic::Status::invalid_argument( - "Error: Height out of range. Failed to convert to u32.", - ), - )); + let hash_or_height = blockid_to_hashorheight(request).ok_or( + FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Failed to convert to u32.", + )), + )?; + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let height = match hash_or_height { + HashOrHeight::Height(height) => height.0, + HashOrHeight::Hash(hash) => { + match self.indexer.get_block_height(&snapshot, hash.into()).await { + Ok(Some(height)) => height.0, + Ok(None) => { + return Err(FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Hash not founf in chain", + ))); + } + Err(_e) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::internal("Error: Internal db error."), + )); + } + } } }; match self - .block_cache - .get_compact_block_nullifiers(height.to_string()) + .indexer + .get_compact_block(&snapshot, types::Height(height)) .await { - Ok(block) => Ok(block), + Ok(Some(block)) => Ok(compact_block_to_nullifiers(block)), + Ok(None) => { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - let chain_height = self.block_cache.get_chain_height().await?.0; - if height >= chain_height { - Err(FetchServiceError::TonicStatusError(tonic::Status::out_of_range( - format!( - "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", - ) - ))) - } else { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => // TODO: Hide server error from clients before release. Currently useful for dev purposes. - Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( - format!("Error: Failed to retrieve block from node. Server Error: {e}",), - ))) + { + Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + format!("Error: Failed to retrieve block from node. Server Error: {e}",), + ))) + } } } } @@ -732,26 +811,49 @@ impl LightWalletIndexer for FetchServiceSubscriber { } else { false }; - let chain_height = self.block_cache.get_chain_height().await?.0; let fetch_service_clone = self.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); tokio::spawn(async move { let timeout = timeout(time::Duration::from_secs((service_timeout*4) as u64), async { + let snapshot = fetch_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_tip.height.0; for height in start..=end { let height = if rev_order { end - (height - start) } else { height }; - match fetch_service_clone.block_cache.get_compact_block( - height.to_string(), + match fetch_service_clone.indexer.get_compact_block( + &snapshot, + types::Height(height), ).await { - Ok(block) => { + Ok(Some(block)) => { if channel_tx.send(Ok(block)).await.is_err() { break; } } + Ok(None) => if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; + } + } + } else if channel_tx + .send(Err(tonic::Status::unknown("Internal error, Failed to fetch block."))) + .await + .is_err() + { + break; + } Err(e) => { if height >= chain_height { match channel_tx @@ -804,78 +906,128 @@ impl LightWalletIndexer for FetchServiceSubscriber { &self, request: BlockRange, ) -> Result { - let tonic_status_error = - |err| FetchServiceError::TonicStatusError(tonic::Status::invalid_argument(err)); - let mut start = match request.start { - Some(block_id) => match u32::try_from(block_id.height) { - Ok(height) => Ok(height), - Err(_) => Err("Error: Start height out of range. Failed to convert to u32."), + let mut start: u32 = match request.start { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: Start height out of range. Failed to convert to u32.", + ), + )); + } }, - None => Err("Error: No start height given."), - } - .map_err(tonic_status_error)?; - let mut end = match request.end { - Some(block_id) => match u32::try_from(block_id.height) { - Ok(height) => Ok(height), - Err(_) => Err("Error: End height out of range. Failed to convert to u32."), + None => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; + let mut end: u32 = match request.end { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: End height out of range. Failed to convert to u32.", + ), + )); + } }, - None => Err("Error: No start height given."), - } - .map_err(tonic_status_error)?; + None => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; let rev_order = if start > end { (start, end) = (end, start); true } else { false }; - let chain_height = self.block_cache.get_chain_height().await?.0; let fetch_service_clone = self.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); tokio::spawn(async move { - let timeout = timeout( - time::Duration::from_secs((service_timeout * 4) as u64), - async { + let timeout = timeout(time::Duration::from_secs((service_timeout*4) as u64), async { + let snapshot = fetch_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_tip.height.0; for height in start..=end { let height = if rev_order { end - (height - start) } else { height }; - if let Err(e) = channel_tx - .send( - fetch_service_clone - .block_cache - .get_compact_block_nullifiers(height.to_string()) + match fetch_service_clone.indexer.get_compact_block( + &snapshot, + types::Height(height), + ).await { + Ok(Some(block)) => { + if channel_tx.send(Ok(compact_block_to_nullifiers(block))).await.is_err() { + break; + } + } + Ok(None) => if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) .await - .map_err(|e| { - if height >= chain_height { - tonic::Status::out_of_range(format!( - "Error: Height out of range [{height}]. Height requested \ - is greater than the best chain tip [{chain_height}].", - )) - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - tonic::Status::unknown(e.to_string()) + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; } - }), - ) - .await - { - warn!("GetBlockRangeNullifiers channel closed unexpectedly: {}", e); - break; + } + } else if channel_tx + .send(Err(tonic::Status::unknown("Internal error, Failed to fetch block."))) + .await + .is_err() + { + break; + } + Err(e) => { + if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; + } + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } + } + } } } - }, - ) - .await; - if timeout.is_err() { - channel_tx - .send(Err(tonic::Status::deadline_exceeded( - "Error: get_block_range_nullifiers gRPC request timed out.", - ))) - .await - .ok(); + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::deadline_exceeded( + "Error: get_block_range gRPC request timed out.", + ))) + .await + .ok(); + } } }); Ok(CompactBlockStream::new(channel_rx)) @@ -899,7 +1051,7 @@ impl LightWalletIndexer for FetchServiceSubscriber { let height: u64 = match height { Some(h) => h as u64, // Zebra returns None for mempool transactions, convert to `Mempool Height`. - None => self.block_cache.get_chain_height().await?.0 as u64, + None => self.indexer.snapshot_nonfinalized_state().best_tip.height.0 as u64, }; Ok(RawTransaction { @@ -1106,72 +1258,75 @@ impl LightWalletIndexer for FetchServiceSubscriber { }) .collect(); - let mempool = self.mempool.clone(); + let mempool = self.indexer.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); + tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 4) as u64), async { - for (mempool_key, mempool_value) in - mempool.get_filtered_mempool(exclude_txids).await - { - let txid_bytes = match hex::decode(mempool_key.txid) { - Ok(bytes) => bytes, - Err(error) => { - if channel_tx - .send(Err(tonic::Status::unknown(error.to_string()))) - .await - .is_err() - { - break; - } else { - continue; - } - } - }; - match ::parse_from_slice( - mempool_value.serialized_tx.as_ref().as_ref(), - Some(vec![txid_bytes]), - None, - ) { - Ok(transaction) => { - // ParseFromSlice returns any data left after the conversion to a - // FullTransaction, If the conversion has succeeded this should be empty. - if transaction.0.is_empty() { - if channel_tx - .send( - transaction - .1 - .to_compact(0) - .map_err(|e| tonic::Status::unknown(e.to_string())), - ) - .await - .is_err() - { - break; + match mempool.get_mempool_transactions(exclude_txids).await { + Ok(transactions) => { + for serialized_transaction_bytes in transactions { + // TODO: This implementation should be cleaned up to not use parse_from_slice. + // This could be done by implementing try_from zebra_chain::transaction::Transaction for CompactTxData, + // (which implements to_compact())letting us avoid double parsing of transaction bytes. + let transaction: zebra_chain::transaction::Transaction = + zebra_chain::transaction::Transaction::zcash_deserialize( + &mut Cursor::new(&serialized_transaction_bytes), + ) + .unwrap(); + // TODO: Check this is in the correct format and does not need hex decoding or reversing. + let txid = transaction.hash().0.to_vec(); + + match ::parse_from_slice( + &serialized_transaction_bytes, + Some(vec![txid]), + None, + ) { + Ok(transaction) => { + // ParseFromSlice returns any data left after the conversion to a + // FullTransaction, If the conversion has succeeded this should be empty. + if transaction.0.is_empty() { + if channel_tx + .send(transaction.1.to_compact(0).map_err(|e| { + tonic::Status::unknown(e.to_string()) + })) + .await + .is_err() + { + break; + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown("Error: "))) + .await + .is_err() + { + break; + } + } } - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown("Error: "))) - .await - .is_err() - { - break; + Err(e) => { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } } } } - Err(e) => { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown(e.to_string()))) - .await - .is_err() - { - break; - } - } + } + Err(e) => { + channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .ok(); } } }, @@ -1189,64 +1344,56 @@ impl LightWalletIndexer for FetchServiceSubscriber { } } }); - Ok(CompactTransactionStream::new(channel_rx)) } /// Return a stream of current Mempool transactions. This will keep the output stream open while /// there are mempool transactions. It will close the returned stream when a new block is mined. async fn get_mempool_stream(&self) -> Result { - let mut mempool = self.mempool.clone(); + let indexer = self.indexer.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); - let mempool_height = self.block_cache.get_chain_height().await?.0; tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 6) as u64), async { - let (mut mempool_stream, _mempool_handle) = match mempool - .get_mempool_stream(None) - .await - { - Ok(stream) => stream, - Err(e) => { - warn!("Error fetching stream from mempool: {:?}", e); + let mempool_height = indexer.snapshot_nonfinalized_state().best_tip.height.0; + match indexer.get_mempool_stream(None) { + Some(mut mempool_stream) => { + while let Some(result) = mempool_stream.next().await { + match result { + Ok(transaction_bytes) => { + if channel_tx + .send(Ok(RawTransaction { + data: transaction_bytes, + height: mempool_height as u64, + })) + .await + .is_err() + { + break; + } + } + Err(e) => { + channel_tx + .send(Err(tonic::Status::internal(format!( + "Error in mempool stream: {e:?}" + )))) + .await + .ok(); + break; + } + } + } + } + None => { + warn!("Error fetching stream from mempool, Incorrect chain tip!"); channel_tx .send(Err(tonic::Status::internal("Error getting mempool stream"))) .await .ok(); - return; } }; - while let Some(result) = mempool_stream.recv().await { - match result { - Ok((_mempool_key, mempool_value)) => { - if channel_tx - .send(Ok(RawTransaction { - data: mempool_value - .serialized_tx - .as_ref() - .as_ref() - .to_vec(), - height: mempool_height as u64, - })) - .await - .is_err() - { - break; - } - } - Err(e) => { - channel_tx - .send(Err(tonic::Status::internal(format!( - "Error in mempool stream: {e:?}" - )))) - .await - .ok(); - break; - } - } - } }, ) .await; @@ -1262,7 +1409,6 @@ impl LightWalletIndexer for FetchServiceSubscriber { } } }); - Ok(RawTransactionStream::new(channel_rx)) } diff --git a/zaino-state/src/backends/state.rs b/zaino-state/src/backends/state.rs index 2fd6652c0..3a949b025 100644 --- a/zaino-state/src/backends/state.rs +++ b/zaino-state/src/backends/state.rs @@ -1042,7 +1042,7 @@ impl ZcashIndexer for StateServiceSubscriber { /// `usage` is the total memory usage for the mempool, in bytes. /// the [optional `fullyNotified` field](), is only utilized for zcashd regtests, is deprecated, and is not included. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await?) + Ok(self.mempool.get_mempool_info().await.into()) } async fn get_peer_info(&self) -> Result { diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index db7358da8..6f4fb1fa0 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -12,14 +12,15 @@ //! - NOTE: Full transaction and block data is served from the backend finalizer. use crate::chain_index::non_finalised_state::BestTip; -use crate::chain_index::types::{BestChainLocation, NonBestChainLocation}; +use crate::chain_index::types::{BestChainLocation, MempoolInfo, NonBestChainLocation}; use crate::error::{ChainIndexError, ChainIndexErrorKind, FinalisedStateError}; -use crate::IndexedBlock; use crate::{AtomicStatus, StatusType, SyncError}; +use crate::{IndexedBlock, TransactionHash}; use std::collections::HashSet; use std::{sync::Arc, time::Duration}; use futures::{FutureExt, Stream}; +use hex::FromHex as _; use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; use tokio_stream::StreamExt; @@ -196,6 +197,21 @@ pub trait ChainIndex { end: Option, ) -> Option, Self::Error>>>; + /// Returns the *compact* block for the given height. + /// + /// Returns None if the specified height + /// is greater than the snapshot's tip + /// + /// TODO: Add range fetch method or update this? + #[allow(clippy::type_complexity)] + fn get_compact_block( + &self, + nonfinalized_snapshot: &Self::Snapshot, + height: types::Height, + ) -> impl std::future::Future< + Output = Result, Self::Error>, + >; + /// Finds the newest ancestor of the given block on the main /// chain, or the block itself if it is on the main chain. fn find_fork_point( @@ -238,6 +254,11 @@ pub trait ChainIndex { Output = Result<(Option, HashSet), Self::Error>, >; + /// Returns all txids currently in the mempool. + fn get_mempool_txids( + &self, + ) -> impl std::future::Future, Self::Error>>; + /// Returns all transactions currently in the mempool, filtered by `exclude_list`. /// /// The `exclude_list` may contain shortened transaction ID hex prefixes (client-endian). @@ -249,12 +270,18 @@ pub trait ChainIndex { /// Returns a stream of mempool transactions, ending the stream when the chain tip block hash /// changes (a new block is mined or a reorg occurs). /// - /// If the chain tip has changed from the given spanshot returns None. + /// If a snapshot is given and the chain tip has changed from the given spanshot, returns None. #[allow(clippy::type_complexity)] fn get_mempool_stream( &self, - snapshot: &Self::Snapshot, + snapshot: Option<&Self::Snapshot>, ) -> Option, Self::Error>>>; + + /// Returns Information about the mempool state: + /// - size: Current tx count + /// - bytes: Sum of all tx sizes + /// - usage: Total memory usage for the mempool + fn get_mempool_info(&self) -> impl std::future::Future; } /// The combined index. Contains a view of the mempool, and the full @@ -380,6 +407,7 @@ pub trait ChainIndex { /// - Unified access to finalized and non-finalized blockchain state /// - Automatic synchronization between state layers /// - Snapshot-based consistency for queries +#[derive(Debug)] pub struct NodeBackedChainIndex { blockchain_source: std::sync::Arc, #[allow(dead_code)] @@ -434,7 +462,7 @@ impl NodeBackedChainIndex { /// Creates a [`NodeBackedChainIndexSubscriber`] from self, /// a clone-safe, drop-safe, read-only view onto the running indexer. - pub async fn subscriber(&self) -> NodeBackedChainIndexSubscriber { + pub fn subscriber(&self) -> NodeBackedChainIndexSubscriber { NodeBackedChainIndexSubscriber { blockchain_source: self.blockchain_source.as_ref().clone(), mempool: self.mempool.subscriber(), @@ -532,7 +560,7 @@ impl NodeBackedChainIndex { /// Designed for concurrent efficiency. /// /// [`NodeBackedChainIndexSubscriber`] can safely be cloned and dropped freely. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct NodeBackedChainIndexSubscriber { blockchain_source: Source, mempool: mempool::MempoolSubscriber, @@ -695,6 +723,30 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result, Self::Error> { + if height <= nonfinalized_snapshot.best_tip.height { + Ok(Some( + match nonfinalized_snapshot.get_chainblock_by_height(&height) { + Some(block) => block.to_compact_block(), + None => match self.finalized_state.get_compact_block(height).await { + Ok(block) => block, + Err(_e) => return Err(ChainIndexError::database_hole(height)), + }, + }, + )) + } else { + Ok(None) + } + } + /// Finds the newest ancestor of the given block on the main /// chain, or the block itself if it is on the main chain. fn find_fork_point( @@ -868,6 +920,19 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result, Self::Error> { + self.mempool + .get_mempool() + .await + .into_iter() + .map(|(txid_key, _)| { + TransactionHash::from_hex(&txid_key.txid) + .map_err(ChainIndexError::backing_validator) + }) + .collect::>() + } + /// Returns all transactions currently in the mempool, filtered by `exclude_list`. /// /// The `exclude_list` may contain shortened transaction ID hex prefixes (client-endian). @@ -879,11 +944,9 @@ impl ChainIndex for NodeBackedChainIndexSubscriber, ) -> Result>, Self::Error> { - let subscriber = self.mempool.clone(); - // Use the mempool's own filtering (it already handles client-endian shortened prefixes). let pairs: Vec<(mempool::MempoolKey, mempool::MempoolValue)> = - subscriber.get_filtered_mempool(exclude_list).await; + self.mempool.get_filtered_mempool(exclude_list).await; // Transform to the Vec> that the trait requires. let bytes: Vec> = pairs @@ -897,16 +960,16 @@ impl ChainIndex for NodeBackedChainIndexSubscriber, ) -> Option, Self::Error>>> { - let expected_chain_tip = snapshot.best_tip.blockhash; + let expected_chain_tip = snapshot.map(|snapshot| snapshot.best_tip.blockhash); let mut subscriber = self.mempool.clone(); match subscriber - .get_mempool_stream(Some(expected_chain_tip)) + .get_mempool_stream(expected_chain_tip) .now_or_never() { Some(Ok((in_rx, _handle))) => { @@ -957,6 +1020,14 @@ impl ChainIndex for NodeBackedChainIndexSubscriber MempoolInfo { + self.mempool.get_mempool_info().await + } } impl NonFinalizedSnapshot for Arc diff --git a/zaino-state/src/chain_index/finalised_state.rs b/zaino-state/src/chain_index/finalised_state.rs index f40f9c907..6ed7851ab 100644 --- a/zaino-state/src/chain_index/finalised_state.rs +++ b/zaino-state/src/chain_index/finalised_state.rs @@ -30,6 +30,7 @@ use tokio::time::{interval, MissedTickBehavior}; use super::source::BlockchainSource; +#[derive(Debug)] pub(crate) struct ZainoDB { db: Arc, cfg: BlockCacheConfig, diff --git a/zaino-state/src/chain_index/finalised_state/db.rs b/zaino-state/src/chain_index/finalised_state/db.rs index 6b1b84d72..4f81b7419 100644 --- a/zaino-state/src/chain_index/finalised_state/db.rs +++ b/zaino-state/src/chain_index/finalised_state/db.rs @@ -31,6 +31,7 @@ use super::capability::Capability; pub(super) const VERSION_DIRS: [&str; 1] = ["v1"]; /// All concrete database implementations. +#[derive(Debug)] pub(crate) enum DbBackend { V0(DbV0), V1(DbV1), diff --git a/zaino-state/src/chain_index/finalised_state/db/v1.rs b/zaino-state/src/chain_index/finalised_state/db/v1.rs index 6ab7cb651..fea3fb68e 100644 --- a/zaino-state/src/chain_index/finalised_state/db/v1.rs +++ b/zaino-state/src/chain_index/finalised_state/db/v1.rs @@ -385,6 +385,7 @@ impl TransparentHistExt for DbV1 { /// Zaino’s Finalised state database V1. /// Implements a persistent LMDB-backed chain index for fast read access and verified data. +#[derive(Debug)] pub(crate) struct DbV1 { /// Shared LMDB environment. env: Arc, diff --git a/zaino-state/src/chain_index/finalised_state/reader.rs b/zaino-state/src/chain_index/finalised_state/reader.rs index 163b6c98c..259d14883 100644 --- a/zaino-state/src/chain_index/finalised_state/reader.rs +++ b/zaino-state/src/chain_index/finalised_state/reader.rs @@ -27,7 +27,7 @@ use std::sync::Arc; /// Immutable view onto an already-running [`ZainoDB`]. /// /// Carries a plain reference with the same lifetime as the parent DB -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct DbReader { /// Immutable read-only view onto the running ZainoDB pub(crate) inner: Arc, diff --git a/zaino-state/src/chain_index/finalised_state/router.rs b/zaino-state/src/chain_index/finalised_state/router.rs index 0010f0a23..86c9b783d 100644 --- a/zaino-state/src/chain_index/finalised_state/router.rs +++ b/zaino-state/src/chain_index/finalised_state/router.rs @@ -21,6 +21,7 @@ use std::sync::{ Arc, }; +#[derive(Debug)] pub(crate) struct Router { /// Primary active database. primary: ArcSwap, diff --git a/zaino-state/src/chain_index/mempool.rs b/zaino-state/src/chain_index/mempool.rs index 801bc31a3..d2cc92f25 100644 --- a/zaino-state/src/chain_index/mempool.rs +++ b/zaino-state/src/chain_index/mempool.rs @@ -4,7 +4,10 @@ use std::{collections::HashSet, sync::Arc}; use crate::{ broadcast::{Broadcast, BroadcastSubscriber}, - chain_index::source::{BlockchainSource, BlockchainSourceError}, + chain_index::{ + source::{BlockchainSource, BlockchainSourceError}, + types::MempoolInfo, + }, error::{MempoolError, StatusError}, status::{AtomicStatus, StatusType}, BlockHash, @@ -521,7 +524,7 @@ impl MempoolSubscriber { /// Returns information about the mempool. Used by the `getmempoolinfo` RPC. /// Computed from local Broadcast state. - pub async fn get_mempool_info(&self) -> Result { + pub async fn get_mempool_info(&self) -> MempoolInfo { let mempool_transactions: Vec<(MempoolKey, MempoolValue)> = self.subscriber.get_filtered_state(&HashSet::new()); @@ -541,7 +544,7 @@ impl MempoolSubscriber { let usage: u64 = bytes.saturating_add(key_heap_bytes); - Ok(GetMempoolInfoResponse { size, bytes, usage }) + MempoolInfo { size, bytes, usage } } // TODO noted here too diff --git a/zaino-state/src/chain_index/non_finalised_state.rs b/zaino-state/src/chain_index/non_finalised_state.rs index 98fb7c5f8..1564b00e2 100644 --- a/zaino-state/src/chain_index/non_finalised_state.rs +++ b/zaino-state/src/chain_index/non_finalised_state.rs @@ -16,6 +16,7 @@ use zebra_chain::parameters::Network; use zebra_state::HashOrHeight; /// Holds the block cache +#[derive(Debug)] pub struct NonFinalizedState { /// We need access to the validator's best block hash, as well /// as a source of blocks diff --git a/zaino-state/src/chain_index/tests.rs b/zaino-state/src/chain_index/tests.rs index 297514d43..6f61e2c15 100644 --- a/zaino-state/src/chain_index/tests.rs +++ b/zaino-state/src/chain_index/tests.rs @@ -90,7 +90,7 @@ mod mockchain_tests { let indexer = NodeBackedChainIndex::new(source.clone(), config) .await .unwrap(); - let index_reader = indexer.subscriber().await; + let index_reader = indexer.subscriber(); loop { let check_height: u32 = match active_mockchain_source { @@ -466,7 +466,7 @@ mod mockchain_tests { let mempool_stream_task = tokio::spawn(async move { let nonfinalized_snapshot = index_reader.snapshot_nonfinalized_state(); let mut mempool_stream = index_reader - .get_mempool_stream(&nonfinalized_snapshot) + .get_mempool_stream(Some(&nonfinalized_snapshot)) .expect("failed to create mempool stream"); let mut indexer_mempool_transactions: Vec = @@ -510,7 +510,7 @@ mod mockchain_tests { mockchain.mine_blocks(1); sleep(Duration::from_millis(2000)).await; - let mempool_stream = index_reader.get_mempool_stream(&stale_nonfinalized_snapshot); + let mempool_stream = index_reader.get_mempool_stream(Some(&stale_nonfinalized_snapshot)); assert!(mempool_stream.is_none()); } diff --git a/zaino-state/src/chain_index/tests/mempool.rs b/zaino-state/src/chain_index/tests/mempool.rs index dd60586c4..e5bd6cd3d 100644 --- a/zaino-state/src/chain_index/tests/mempool.rs +++ b/zaino-state/src/chain_index/tests/mempool.rs @@ -237,7 +237,7 @@ async fn get_mempool_info() { }) .unwrap_or_default(); - let subscriber_mempool_info = subscriber.get_mempool_info().await.unwrap(); + let subscriber_mempool_info = subscriber.get_mempool_info().await; let expected_size: u64 = mempool_transactions.len() as u64; diff --git a/zaino-state/src/chain_index/types.rs b/zaino-state/src/chain_index/types.rs index b98df36ba..b0f294758 100644 --- a/zaino-state/src/chain_index/types.rs +++ b/zaino-state/src/chain_index/types.rs @@ -3173,6 +3173,68 @@ impl ZainoVersionedSerialise for OrchardTxList { } } +// *** Metadata Objects *** + +/// Holds information about the mempool state. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MempoolInfo { + /// Current tx count + pub size: u64, + /// Sum of all tx sizes + pub bytes: u64, + /// Total memory usage for the mempool + pub usage: u64, +} + +impl ZainoVersionedSerialise for MempoolInfo { + const VERSION: u8 = version::V1; + + fn encode_body(&self, w: &mut W) -> io::Result<()> { + let mut w = w; + write_u64_le(&mut w, self.size)?; + write_u64_le(&mut w, self.bytes)?; + write_u64_le(&mut w, self.usage) + } + + fn decode_latest(r: &mut R) -> io::Result { + Self::decode_v1(r) + } + + fn decode_v1(r: &mut R) -> io::Result { + let mut r = r; + let size = read_u64_le(&mut r)?; + let bytes = read_u64_le(&mut r)?; + let usage = read_u64_le(&mut r)?; + Ok(MempoolInfo { size, bytes, usage }) + } +} + +/// 24 byte body. +impl FixedEncodedLen for MempoolInfo { + /// 8 byte size + 8 byte bytes + 8 byte usage + const ENCODED_LEN: usize = 8 + 8 + 8; +} + +impl From for MempoolInfo { + fn from(resp: zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse) -> Self { + MempoolInfo { + size: resp.size, + bytes: resp.bytes, + usage: resp.usage, + } + } +} + +impl From for zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse { + fn from(info: MempoolInfo) -> Self { + zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse { + size: info.size, + bytes: info.bytes, + usage: info.usage, + } + } +} + // *** Custom serde based debug serialisation *** #[cfg(test)] diff --git a/zaino-state/src/error.rs b/zaino-state/src/error.rs index 504708cf6..c9782883e 100644 --- a/zaino-state/src/error.rs +++ b/zaino-state/src/error.rs @@ -6,27 +6,6 @@ use std::{any::type_name, fmt::Display}; use zaino_fetch::jsonrpsee::connector::RpcRequestError; -impl From> for StateServiceError { - fn from(value: RpcRequestError) -> Self { - match value { - RpcRequestError::Transport(transport_error) => { - Self::JsonRpcConnectorError(transport_error) - } - RpcRequestError::Method(e) => Self::UnhandledRpcError(format!( - "{}: {}", - std::any::type_name::(), - e.to_string() - )), - RpcRequestError::JsonRpc(error) => Self::Custom(format!("bad argument: {error}")), - RpcRequestError::InternalUnrecoverable(e) => Self::Custom(e.to_string()), - RpcRequestError::ServerWorkQueueFull => { - Self::Custom("Server queue full. Handling for this not yet implemented".to_string()) - } - RpcRequestError::UnexpectedErrorResponse(error) => Self::Custom(format!("{error}")), - } - } -} - /// Errors related to the `StateService`. #[derive(Debug, thiserror::Error)] pub enum StateServiceError { @@ -49,6 +28,10 @@ pub enum StateServiceError { #[error("RPC error: {0:?}")] RpcError(#[from] zaino_fetch::jsonrpsee::connector::RpcError), + /// Chain index error. + #[error("Chain index error: {0}")] + ChainIndexError(#[from] ChainIndexError), + /// Error from the block cache. #[error("Mempool error: {0}")] BlockCacheError(#[from] BlockCacheError), @@ -105,6 +88,12 @@ impl From for tonic::Status { StateServiceError::RpcError(err) => { tonic::Status::internal(format!("RPC error: {err:?}")) } + StateServiceError::ChainIndexError(err) => match err.kind { + ChainIndexErrorKind::InternalServerError => tonic::Status::internal(err.message), + ChainIndexErrorKind::InvalidSnapshot => { + tonic::Status::failed_precondition(err.message) + } + }, StateServiceError::BlockCacheError(err) => { tonic::Status::internal(format!("BlockCache error: {err:?}")) } @@ -130,33 +119,23 @@ impl From for tonic::Status { } } -impl From> for FetchServiceError { +impl From> for StateServiceError { fn from(value: RpcRequestError) -> Self { match value { RpcRequestError::Transport(transport_error) => { - FetchServiceError::JsonRpcConnectorError(transport_error) - } - RpcRequestError::JsonRpc(error) => { - FetchServiceError::Critical(format!("argument failed to serialze: {error}")) - } - RpcRequestError::InternalUnrecoverable(e) => { - FetchServiceError::Critical(format!("Internal unrecoverable error: {e}")) + Self::JsonRpcConnectorError(transport_error) } - RpcRequestError::ServerWorkQueueFull => FetchServiceError::Critical( - "Server queue full. Handling for this not yet implemented".to_string(), - ), - RpcRequestError::Method(e) => FetchServiceError::Critical(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), + RpcRequestError::Method(e) => Self::UnhandledRpcError(format!( + "{}: {}", + std::any::type_name::(), e.to_string() )), - RpcRequestError::UnexpectedErrorResponse(error) => { - FetchServiceError::Critical(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - error - )) + RpcRequestError::JsonRpc(error) => Self::Custom(format!("bad argument: {error}")), + RpcRequestError::InternalUnrecoverable(e) => Self::Custom(e.to_string()), + RpcRequestError::ServerWorkQueueFull => { + Self::Custom("Server queue full. Handling for this not yet implemented".to_string()) } + RpcRequestError::UnexpectedErrorResponse(error) => Self::Custom(format!("{error}")), } } } @@ -172,13 +151,9 @@ pub enum FetchServiceError { #[error("JsonRpcConnector error: {0}")] JsonRpcConnectorError(#[from] zaino_fetch::jsonrpsee::error::TransportError), - /// Error from the block cache. - #[error("Mempool error: {0}")] - BlockCacheError(#[from] BlockCacheError), - - /// Error from the mempool. - #[error("Mempool error: {0}")] - MempoolError(#[from] MempoolError), + /// Chain index error. + #[error("Chain index error: {0}")] + ChainIndexError(#[from] ChainIndexError), /// RPC error in compatibility with zcashd. #[error("RPC error: {0:?}")] @@ -200,12 +175,12 @@ impl From for tonic::Status { FetchServiceError::JsonRpcConnectorError(err) => { tonic::Status::internal(format!("JsonRpcConnector error: {err}")) } - FetchServiceError::BlockCacheError(err) => { - tonic::Status::internal(format!("BlockCache error: {err}")) - } - FetchServiceError::MempoolError(err) => { - tonic::Status::internal(format!("Mempool error: {err}")) - } + FetchServiceError::ChainIndexError(err) => match err.kind { + ChainIndexErrorKind::InternalServerError => tonic::Status::internal(err.message), + ChainIndexErrorKind::InvalidSnapshot => { + tonic::Status::failed_precondition(err.message) + } + }, FetchServiceError::RpcError(err) => { tonic::Status::internal(format!("RPC error: {err:?}")) } @@ -216,33 +191,34 @@ impl From for tonic::Status { } } } -/// These aren't the best conversions, but the MempoolError should go away -/// in favor of a new type with the new chain cache is complete -impl From> for MempoolError { + +impl From> for FetchServiceError { fn from(value: RpcRequestError) -> Self { match value { RpcRequestError::Transport(transport_error) => { - MempoolError::JsonRpcConnectorError(transport_error) + FetchServiceError::JsonRpcConnectorError(transport_error) } RpcRequestError::JsonRpc(error) => { - MempoolError::Critical(format!("argument failed to serialze: {error}")) + FetchServiceError::Critical(format!("argument failed to serialze: {error}")) } RpcRequestError::InternalUnrecoverable(e) => { - MempoolError::Critical(format!("Internal unrecoverable error: {e}")) + FetchServiceError::Critical(format!("Internal unrecoverable error: {e}")) } - RpcRequestError::ServerWorkQueueFull => MempoolError::Critical( + RpcRequestError::ServerWorkQueueFull => FetchServiceError::Critical( "Server queue full. Handling for this not yet implemented".to_string(), ), - RpcRequestError::Method(e) => MempoolError::Critical(format!( + RpcRequestError::Method(e) => FetchServiceError::Critical(format!( "unhandled rpc-specific {} error: {}", type_name::(), e.to_string() )), - RpcRequestError::UnexpectedErrorResponse(error) => MempoolError::Critical(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - error - )), + RpcRequestError::UnexpectedErrorResponse(error) => { + FetchServiceError::Critical(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + error + )) + } } } } @@ -280,6 +256,37 @@ pub enum MempoolError { StatusError(StatusError), } +/// These aren't the best conversions, but the MempoolError should go away +/// in favor of a new type with the new chain cache is complete +impl From> for MempoolError { + fn from(value: RpcRequestError) -> Self { + match value { + RpcRequestError::Transport(transport_error) => { + MempoolError::JsonRpcConnectorError(transport_error) + } + RpcRequestError::JsonRpc(error) => { + MempoolError::Critical(format!("argument failed to serialze: {error}")) + } + RpcRequestError::InternalUnrecoverable(e) => { + MempoolError::Critical(format!("Internal unrecoverable error: {e}")) + } + RpcRequestError::ServerWorkQueueFull => MempoolError::Critical( + "Server queue full. Handling for this not yet implemented".to_string(), + ), + RpcRequestError::Method(e) => MempoolError::Critical(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + e.to_string() + )), + RpcRequestError::UnexpectedErrorResponse(error) => MempoolError::Critical(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + error + )), + } + } +} + /// Errors related to the `BlockCache`. #[derive(Debug, thiserror::Error)] pub enum BlockCacheError { @@ -323,38 +330,6 @@ pub enum BlockCacheError { #[error("Integer conversion error: {0}")] TryFromIntError(#[from] std::num::TryFromIntError), } -/// These aren't the best conversions, but the NonFinalizedStateError should go away -/// in favor of a new type with the new chain cache is complete -impl From> for NonFinalisedStateError { - fn from(value: RpcRequestError) -> Self { - match value { - RpcRequestError::Transport(transport_error) => { - NonFinalisedStateError::JsonRpcConnectorError(transport_error) - } - RpcRequestError::JsonRpc(error) => { - NonFinalisedStateError::Custom(format!("argument failed to serialze: {error}")) - } - RpcRequestError::InternalUnrecoverable(e) => { - NonFinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) - } - RpcRequestError::ServerWorkQueueFull => NonFinalisedStateError::Custom( - "Server queue full. Handling for this not yet implemented".to_string(), - ), - RpcRequestError::Method(e) => NonFinalisedStateError::Custom(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - e.to_string() - )), - RpcRequestError::UnexpectedErrorResponse(error) => { - NonFinalisedStateError::Custom(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - error - )) - } - } - } -} /// Errors related to the `NonFinalisedState`. #[derive(Debug, thiserror::Error)] @@ -379,30 +354,31 @@ pub enum NonFinalisedStateError { #[error("Status error: {0:?}")] StatusError(StatusError), } -/// These aren't the best conversions, but the FinalizedStateError should go away + +/// These aren't the best conversions, but the NonFinalizedStateError should go away /// in favor of a new type with the new chain cache is complete -impl From> for FinalisedStateError { +impl From> for NonFinalisedStateError { fn from(value: RpcRequestError) -> Self { match value { RpcRequestError::Transport(transport_error) => { - FinalisedStateError::JsonRpcConnectorError(transport_error) + NonFinalisedStateError::JsonRpcConnectorError(transport_error) } RpcRequestError::JsonRpc(error) => { - FinalisedStateError::Custom(format!("argument failed to serialze: {error}")) + NonFinalisedStateError::Custom(format!("argument failed to serialze: {error}")) } RpcRequestError::InternalUnrecoverable(e) => { - FinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) + NonFinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) } - RpcRequestError::ServerWorkQueueFull => FinalisedStateError::Custom( + RpcRequestError::ServerWorkQueueFull => NonFinalisedStateError::Custom( "Server queue full. Handling for this not yet implemented".to_string(), ), - RpcRequestError::Method(e) => FinalisedStateError::Custom(format!( + RpcRequestError::Method(e) => NonFinalisedStateError::Custom(format!( "unhandled rpc-specific {} error: {}", type_name::(), e.to_string() )), RpcRequestError::UnexpectedErrorResponse(error) => { - FinalisedStateError::Custom(format!( + NonFinalisedStateError::Custom(format!( "unhandled rpc-specific {} error: {}", type_name::(), error @@ -478,6 +454,39 @@ pub enum FinalisedStateError { IoError(#[from] std::io::Error), } +/// These aren't the best conversions, but the FinalizedStateError should go away +/// in favor of a new type with the new chain cache is complete +impl From> for FinalisedStateError { + fn from(value: RpcRequestError) -> Self { + match value { + RpcRequestError::Transport(transport_error) => { + FinalisedStateError::JsonRpcConnectorError(transport_error) + } + RpcRequestError::JsonRpc(error) => { + FinalisedStateError::Custom(format!("argument failed to serialze: {error}")) + } + RpcRequestError::InternalUnrecoverable(e) => { + FinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) + } + RpcRequestError::ServerWorkQueueFull => FinalisedStateError::Custom( + "Server queue full. Handling for this not yet implemented".to_string(), + ), + RpcRequestError::Method(e) => FinalisedStateError::Custom(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + e.to_string() + )), + RpcRequestError::UnexpectedErrorResponse(error) => { + FinalisedStateError::Custom(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + error + )) + } + } + } +} + /// A general error type to represent error StatusTypes. #[derive(Debug, Clone, thiserror::Error)] #[error("Unexpected status error: {server_status:?}")] @@ -562,6 +571,7 @@ impl ChainIndexError { } } } + impl From for ChainIndexError { fn from(value: FinalisedStateError) -> Self { let message = match &value { diff --git a/zaino-state/src/utils.rs b/zaino-state/src/utils.rs index 09e31a8b0..2ab1f4280 100644 --- a/zaino-state/src/utils.rs +++ b/zaino-state/src/utils.rs @@ -6,6 +6,8 @@ use zaino_proto::proto::service::BlockId; use zebra_chain::{block::Height, parameters::Network}; use zebra_state::HashOrHeight; +// *** Metadata structs *** + /// Zaino build info. #[derive(Debug, Clone)] pub(crate) struct BuildInfo { @@ -116,6 +118,8 @@ impl fmt::Display for ServiceMetadata { } } +// *** Data transforms *** + pub(crate) fn blockid_to_hashorheight(block_id: BlockId) -> Option { <[u8; 32]>::try_from(block_id.hash) .map(zebra_chain::block::Hash) @@ -128,3 +132,26 @@ pub(crate) fn blockid_to_hashorheight(block_id: BlockId) -> Option }) .ok() } + +/// Strips the ouputs and from all transactions, retains only +/// the nullifier from all orcard actions, and clears the chain +/// metadata from the block +pub(crate) fn compact_block_to_nullifiers( + mut block: zaino_proto::proto::compact_formats::CompactBlock, +) -> zaino_proto::proto::compact_formats::CompactBlock { + for ctransaction in &mut block.vtx { + ctransaction.outputs = Vec::new(); + for caction in &mut ctransaction.actions { + *caction = zaino_proto::proto::compact_formats::CompactOrchardAction { + nullifier: caction.nullifier.clone(), + ..Default::default() + } + } + } + + block.chain_metadata = Some(zaino_proto::proto::compact_formats::ChainMetadata { + sapling_commitment_tree_size: 0, + orchard_commitment_tree_size: 0, + }); + block +}