From 7360f5faeed2a446c4a5a87181630a6aeaecca1b Mon Sep 17 00:00:00 2001 From: idky137 Date: Thu, 9 Oct 2025 14:34:01 +0100 Subject: [PATCH 1/2] switched fetch service to use chainindex --- integration-tests/tests/chain_cache.rs | 4 +- zaino-state/src/backends/fetch.rs | 610 +++++++++++------- zaino-state/src/backends/state.rs | 2 +- zaino-state/src/chain_index.rs | 97 ++- .../src/chain_index/finalised_state.rs | 1 + .../src/chain_index/finalised_state/db.rs | 1 + .../src/chain_index/finalised_state/db/v1.rs | 1 + .../src/chain_index/finalised_state/reader.rs | 2 +- .../src/chain_index/finalised_state/router.rs | 1 + zaino-state/src/chain_index/mempool.rs | 9 +- .../src/chain_index/non_finalised_state.rs | 1 + zaino-state/src/chain_index/tests.rs | 6 +- zaino-state/src/chain_index/tests/mempool.rs | 2 +- zaino-state/src/chain_index/types.rs | 62 ++ zaino-state/src/error.rs | 224 ++++--- zaino-state/src/utils.rs | 27 + 16 files changed, 687 insertions(+), 363 deletions(-) diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index c05df6631..909174aaf 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -198,7 +198,7 @@ mod chain_query_interface { ) .await .unwrap(); - let index_reader = chain_index.subscriber().await; + let index_reader = chain_index.subscriber(); tokio::time::sleep(Duration::from_secs(3)).await; ( @@ -231,7 +231,7 @@ mod chain_query_interface { ) .await .unwrap(); - let index_reader = chain_index.subscriber().await; + let index_reader = chain_index.subscriber(); tokio::time::sleep(Duration::from_secs(3)).await; (test_manager, json_service, None, chain_index, index_reader) diff --git a/zaino-state/src/backends/fetch.rs b/zaino-state/src/backends/fetch.rs index 41959e946..488ad64eb 100644 --- a/zaino-state/src/backends/fetch.rs +++ b/zaino-state/src/backends/fetch.rs @@ -2,13 +2,15 @@ use futures::StreamExt; use hex::FromHex; -use std::time; +use std::{io::Cursor, 
time}; use tokio::{sync::mpsc, time::timeout}; use tonic::async_trait; use tracing::{info, warn}; use zebra_state::HashOrHeight; -use zebra_chain::{block::Height, subtree::NoteCommitmentSubtreeIndex}; +use zebra_chain::{ + block::Height, serialization::ZcashDeserialize as _, subtree::NoteCommitmentSubtreeIndex, +}; use zebra_rpc::{ client::{GetSubtreesByIndexResponse, GetTreestateResponse, ValidateAddressResponse}, methods::{ @@ -36,22 +38,21 @@ use zaino_proto::proto::{ }; use crate::{ - chain_index::{ - mempool::{Mempool, MempoolSubscriber}, - source::ValidatorConnector, - }, + chain_index::{source::ValidatorConnector, types}, config::FetchServiceConfig, - error::{BlockCacheError, FetchServiceError}, + error::FetchServiceError, indexer::{ handle_raw_transaction, IndexerSubscriber, LightWalletIndexer, ZcashIndexer, ZcashService, }, - local_cache::{BlockCache, BlockCacheSubscriber}, status::StatusType, stream::{ AddressStream, CompactBlockStream, CompactTransactionStream, RawTransactionStream, UtxoReplyStream, }, - utils::{blockid_to_hashorheight, get_build_info, ServiceMetadata}, + utils::{ + blockid_to_hashorheight, compact_block_to_nullifiers, get_build_info, ServiceMetadata, + }, + ChainIndex as _, NodeBackedChainIndex, NodeBackedChainIndexSubscriber, }; /// Chain fetch service backed by Zcashd's JsonRPC engine. @@ -66,10 +67,8 @@ use crate::{ pub struct FetchService { /// JsonRPC Client. fetcher: JsonRpSeeConnector, - /// Local compact block cache. - block_cache: BlockCache, - /// Internal mempool. - mempool: Mempool, + /// Core indexer. + indexer: NodeBackedChainIndex, /// Service metadata. data: ServiceMetadata, /// StateService config data. 
@@ -102,22 +101,14 @@ impl ZcashService for FetchService { ); info!("Using Zcash build: {}", data); - let block_cache = BlockCache::spawn(&fetcher, None, config.clone().into()) + let source = ValidatorConnector::Fetch(fetcher.clone()); + let indexer = NodeBackedChainIndex::new(source, config.clone().into()) .await - .map_err(|e| { - FetchServiceError::BlockCacheError(BlockCacheError::Custom(e.to_string())) - })?; - - let mempool_source = ValidatorConnector::Fetch(fetcher.clone()); - - let mempool = Mempool::spawn(mempool_source, None).await.map_err(|e| { - FetchServiceError::BlockCacheError(BlockCacheError::Custom(e.to_string())) - })?; + .unwrap(); let fetch_service = Self { fetcher, - block_cache, - mempool, + indexer, data, config, }; @@ -133,8 +124,7 @@ impl ZcashService for FetchService { fn get_subscriber(&self) -> IndexerSubscriber { IndexerSubscriber::new(FetchServiceSubscriber { fetcher: self.fetcher.clone(), - block_cache: self.block_cache.subscriber(), - mempool: self.mempool.subscriber(), + indexer: self.indexer.subscriber(), data: self.data.clone(), config: self.config.clone(), }) @@ -142,16 +132,16 @@ impl ZcashService for FetchService { /// Fetches the current status async fn status(&self) -> StatusType { - let mempool_status = self.mempool.status(); - let block_cache_status = self.block_cache.status(); - - mempool_status.combine(block_cache_status) + self.indexer.status() } /// Shuts down the StateService. fn close(&mut self) { - self.mempool.close(); - self.block_cache.close(); + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let _ = self.indexer.shutdown().await; + }); + }); } } @@ -168,10 +158,8 @@ impl Drop for FetchService { pub struct FetchServiceSubscriber { /// JsonRPC Client. pub fetcher: JsonRpSeeConnector, - /// Local compact block cache. - pub block_cache: BlockCacheSubscriber, - /// Internal mempool. - pub mempool: MempoolSubscriber, + /// Core indexer. 
+ pub indexer: NodeBackedChainIndexSubscriber, /// Service metadata. pub data: ServiceMetadata, /// StateService config data. @@ -181,10 +169,7 @@ pub struct FetchServiceSubscriber { impl FetchServiceSubscriber { /// Fetches the current status pub fn status(&self) -> StatusType { - let mempool_status = self.mempool.status(); - let block_cache_status = self.block_cache.status(); - - mempool_status.combine(block_cache_status) + self.indexer.status() } /// Returns the network type running. @@ -247,7 +232,7 @@ impl ZcashIndexer for FetchServiceSubscriber { /// /// Zebra does not support this RPC call directly. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await?) + Ok(self.indexer.get_mempool_info().await.into()) } /// Returns the proof-of-work difficulty as a multiple of the minimum difficulty. @@ -415,11 +400,11 @@ impl ZcashIndexer for FetchServiceSubscriber { async fn get_raw_mempool(&self) -> Result, Self::Error> { // Ok(self.fetcher.get_raw_mempool().await?.transactions) Ok(self - .mempool - .get_mempool() + .indexer + .get_mempool_txids() .await .into_iter() - .map(|(key, _)| key.txid) + .map(|mempool_txids| mempool_txids.iter().map(|txid| txid.to_string()).collect()) .collect()) } @@ -513,7 +498,9 @@ impl ZcashIndexer for FetchServiceSubscriber { } async fn chain_height(&self) -> Result { - Ok(self.block_cache.get_chain_height().await?) + Ok(Height( + self.indexer.snapshot_nonfinalized_state().best_tip.height.0, + )) } /// Returns the transaction ids made by the provided transparent addresses. /// @@ -582,16 +569,12 @@ impl ZcashIndexer for FetchServiceSubscriber { impl LightWalletIndexer for FetchServiceSubscriber { /// Return the height of the tip of the best chain async fn get_latest_block(&self) -> Result { - let latest_height = self.block_cache.get_chain_height().await?; - let latest_hash = self - .block_cache - .get_compact_block(latest_height.0.to_string()) - .await? 
- .hash; + let tip = self.indexer.snapshot_nonfinalized_state().best_tip; + dbg!(&tip); Ok(BlockId { - height: latest_height.0 as u64, - hash: latest_hash, + height: tip.height.0 as u64, + hash: tip.blockhash.0.to_vec(), }) } @@ -602,14 +585,59 @@ impl LightWalletIndexer for FetchServiceSubscriber { "Error: Invalid hash and/or height out of range. Failed to convert to u32.", )), )?; + + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let height = match hash_or_height { + HashOrHeight::Height(height) => height.0, + HashOrHeight::Hash(hash) => { + match self.indexer.get_block_height(&snapshot, hash.into()).await { + Ok(Some(height)) => height.0, + Ok(None) => { + return Err(FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Hash not founf in chain", + ))); + } + Err(_e) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::internal("Error: Internal db error."), + )); + } + } + } + }; + match self - .block_cache - .get_compact_block(hash_or_height.to_string()) + .indexer + .get_compact_block(&snapshot, types::Height(height)) .await { - Ok(block) => Ok(block), + Ok(Some(block)) => Ok(block), + Ok(None) => { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. 
Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - let chain_height = self.block_cache.get_chain_height().await?.0; + let chain_height = snapshot.best_tip.height.0; match hash_or_height { HashOrHeight::Height(Height(height)) if height >= chain_height => Err( FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( @@ -643,35 +671,86 @@ impl LightWalletIndexer for FetchServiceSubscriber { /// /// NOTE: Currently this only returns Orchard nullifiers to follow Lightwalletd functionality but Sapling could be added if required by wallets. async fn get_block_nullifiers(&self, request: BlockId) -> Result { - let height: u32 = match request.height.try_into() { - Ok(height) => height, - Err(_) => { - return Err(FetchServiceError::TonicStatusError( - tonic::Status::invalid_argument( - "Error: Height out of range. Failed to convert to u32.", - ), - )); + let hash_or_height = blockid_to_hashorheight(request).ok_or( + FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Failed to convert to u32.", + )), + )?; + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let height = match hash_or_height { + HashOrHeight::Height(height) => height.0, + HashOrHeight::Hash(hash) => { + match self.indexer.get_block_height(&snapshot, hash.into()).await { + Ok(Some(height)) => height.0, + Ok(None) => { + return Err(FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. 
Hash not founf in chain", + ))); + } + Err(_e) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::internal("Error: Internal db error."), + )); + } + } } }; match self - .block_cache - .get_compact_block_nullifiers(height.to_string()) + .indexer + .get_compact_block(&snapshot, types::Height(height)) .await { - Ok(block) => Ok(block), + Ok(Some(block)) => Ok(compact_block_to_nullifiers(block)), + Ok(None) => { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - let chain_height = self.block_cache.get_chain_height().await?.0; - if height >= chain_height { - Err(FetchServiceError::TonicStatusError(tonic::Status::out_of_range( - format!( - "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", - ) - ))) - } else { + let chain_height = snapshot.best_tip.height.0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. 
Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => // TODO: Hide server error from clients before release. Currently useful for dev purposes. - Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( - format!("Error: Failed to retrieve block from node. Server Error: {e}",), - ))) + { + Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + format!("Error: Failed to retrieve block from node. Server Error: {e}",), + ))) + } } } } @@ -722,26 +801,49 @@ impl LightWalletIndexer for FetchServiceSubscriber { } else { false }; - let chain_height = self.block_cache.get_chain_height().await?.0; let fetch_service_clone = self.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); tokio::spawn(async move { let timeout = timeout(time::Duration::from_secs((service_timeout*4) as u64), async { + let snapshot = fetch_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_tip.height.0; for height in start..=end { let height = if rev_order { end - (height - start) } else { height }; - match fetch_service_clone.block_cache.get_compact_block( - height.to_string(), + match fetch_service_clone.indexer.get_compact_block( + &snapshot, + types::Height(height), ).await { - Ok(block) => { + Ok(Some(block)) => { if channel_tx.send(Ok(block)).await.is_err() { break; } } + Ok(None) => if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. 
Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; + } + } + } else if channel_tx + .send(Err(tonic::Status::unknown("Internal error, Failed to fetch block."))) + .await + .is_err() + { + break; + } Err(e) => { if height >= chain_height { match channel_tx @@ -794,78 +896,128 @@ impl LightWalletIndexer for FetchServiceSubscriber { &self, request: BlockRange, ) -> Result { - let tonic_status_error = - |err| FetchServiceError::TonicStatusError(tonic::Status::invalid_argument(err)); - let mut start = match request.start { - Some(block_id) => match u32::try_from(block_id.height) { - Ok(height) => Ok(height), - Err(_) => Err("Error: Start height out of range. Failed to convert to u32."), + let mut start: u32 = match request.start { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: Start height out of range. Failed to convert to u32.", + ), + )); + } }, - None => Err("Error: No start height given."), - } - .map_err(tonic_status_error)?; - let mut end = match request.end { - Some(block_id) => match u32::try_from(block_id.height) { - Ok(height) => Ok(height), - Err(_) => Err("Error: End height out of range. Failed to convert to u32."), + None => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; + let mut end: u32 = match request.end { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: End height out of range. 
Failed to convert to u32.", + ), + )); + } }, - None => Err("Error: No start height given."), - } - .map_err(tonic_status_error)?; + None => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; let rev_order = if start > end { (start, end) = (end, start); true } else { false }; - let chain_height = self.block_cache.get_chain_height().await?.0; let fetch_service_clone = self.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); tokio::spawn(async move { - let timeout = timeout( - time::Duration::from_secs((service_timeout * 4) as u64), - async { + let timeout = timeout(time::Duration::from_secs((service_timeout*4) as u64), async { + let snapshot = fetch_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_tip.height.0; for height in start..=end { let height = if rev_order { end - (height - start) } else { height }; - if let Err(e) = channel_tx - .send( - fetch_service_clone - .block_cache - .get_compact_block_nullifiers(height.to_string()) + match fetch_service_clone.indexer.get_compact_block( + &snapshot, + types::Height(height), + ).await { + Ok(Some(block)) => { + if channel_tx.send(Ok(compact_block_to_nullifiers(block))).await.is_err() { + break; + } + } + Ok(None) => if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) .await - .map_err(|e| { - if height >= chain_height { - tonic::Status::out_of_range(format!( - "Error: Height out of range [{height}]. Height requested \ - is greater than the best chain tip [{chain_height}].", - )) - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. 
- tonic::Status::unknown(e.to_string()) + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; } - }), - ) - .await - { - warn!("GetBlockRangeNullifiers channel closed unexpectedly: {}", e); - break; + } + } else if channel_tx + .send(Err(tonic::Status::unknown("Internal error, Failed to fetch block."))) + .await + .is_err() + { + break; + } + Err(e) => { + if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; + } + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } + } + } } } - }, - ) - .await; - if timeout.is_err() { - channel_tx - .send(Err(tonic::Status::deadline_exceeded( - "Error: get_block_range_nullifiers gRPC request timed out.", - ))) - .await - .ok(); + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::deadline_exceeded( + "Error: get_block_range gRPC request timed out.", + ))) + .await + .ok(); + } } }); Ok(CompactBlockStream::new(channel_rx)) @@ -889,7 +1041,7 @@ impl LightWalletIndexer for FetchServiceSubscriber { let height: u64 = match height { Some(h) => h as u64, // Zebra returns None for mempool transactions, convert to `Mempool Height`. 
- None => self.block_cache.get_chain_height().await?.0 as u64, + None => self.indexer.snapshot_nonfinalized_state().best_tip.height.0 as u64, }; Ok(RawTransaction { @@ -1096,72 +1248,75 @@ impl LightWalletIndexer for FetchServiceSubscriber { }) .collect(); - let mempool = self.mempool.clone(); + let mempool = self.indexer.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); + tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 4) as u64), async { - for (mempool_key, mempool_value) in - mempool.get_filtered_mempool(exclude_txids).await - { - let txid_bytes = match hex::decode(mempool_key.txid) { - Ok(bytes) => bytes, - Err(error) => { - if channel_tx - .send(Err(tonic::Status::unknown(error.to_string()))) - .await - .is_err() - { - break; - } else { - continue; - } - } - }; - match ::parse_from_slice( - mempool_value.serialized_tx.as_ref().as_ref(), - Some(vec![txid_bytes]), - None, - ) { - Ok(transaction) => { - // ParseFromSlice returns any data left after the conversion to a - // FullTransaction, If the conversion has succeeded this should be empty. - if transaction.0.is_empty() { - if channel_tx - .send( - transaction - .1 - .to_compact(0) - .map_err(|e| tonic::Status::unknown(e.to_string())), - ) - .await - .is_err() - { - break; + match mempool.get_mempool_transactions(exclude_txids).await { + Ok(transactions) => { + for serialized_transaction_bytes in transactions { + // TODO: This implementation should be cleaned up to not use parse_from_slice. + // This could be done by implementing try_from zebra_chain::transaction::Transaction for CompactTxData, + // (which implements to_compact())letting us avoid double parsing of transaction bytes. 
+ let transaction: zebra_chain::transaction::Transaction = + zebra_chain::transaction::Transaction::zcash_deserialize( + &mut Cursor::new(&serialized_transaction_bytes), + ) + .unwrap(); + // TODO: Check this is in the correct format and does not need hex decoding or reversing. + let txid = transaction.hash().0.to_vec(); + + match ::parse_from_slice( + &serialized_transaction_bytes, + Some(vec![txid]), + None, + ) { + Ok(transaction) => { + // ParseFromSlice returns any data left after the conversion to a + // FullTransaction, If the conversion has succeeded this should be empty. + if transaction.0.is_empty() { + if channel_tx + .send(transaction.1.to_compact(0).map_err(|e| { + tonic::Status::unknown(e.to_string()) + })) + .await + .is_err() + { + break; + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown("Error: "))) + .await + .is_err() + { + break; + } + } } - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown("Error: "))) - .await - .is_err() - { - break; + Err(e) => { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } } } } - Err(e) => { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown(e.to_string()))) - .await - .is_err() - { - break; - } - } + } + Err(e) => { + channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .ok(); } } }, @@ -1179,64 +1334,56 @@ impl LightWalletIndexer for FetchServiceSubscriber { } } }); - Ok(CompactTransactionStream::new(channel_rx)) } /// Return a stream of current Mempool transactions. This will keep the output stream open while /// there are mempool transactions. 
It will close the returned stream when a new block is mined. async fn get_mempool_stream(&self) -> Result { - let mut mempool = self.mempool.clone(); + let indexer = self.indexer.clone(); let service_timeout = self.config.service.timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service.channel_size as usize); - let mempool_height = self.block_cache.get_chain_height().await?.0; tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 6) as u64), async { - let (mut mempool_stream, _mempool_handle) = match mempool - .get_mempool_stream(None) - .await - { - Ok(stream) => stream, - Err(e) => { - warn!("Error fetching stream from mempool: {:?}", e); + let mempool_height = indexer.snapshot_nonfinalized_state().best_tip.height.0; + match indexer.get_mempool_stream(None) { + Some(mut mempool_stream) => { + while let Some(result) = mempool_stream.next().await { + match result { + Ok(transaction_bytes) => { + if channel_tx + .send(Ok(RawTransaction { + data: transaction_bytes, + height: mempool_height as u64, + })) + .await + .is_err() + { + break; + } + } + Err(e) => { + channel_tx + .send(Err(tonic::Status::internal(format!( + "Error in mempool stream: {e:?}" + )))) + .await + .ok(); + break; + } + } + } + } + None => { + warn!("Error fetching stream from mempool, Incorrect chain tip!"); channel_tx .send(Err(tonic::Status::internal("Error getting mempool stream"))) .await .ok(); - return; } }; - while let Some(result) = mempool_stream.recv().await { - match result { - Ok((_mempool_key, mempool_value)) => { - if channel_tx - .send(Ok(RawTransaction { - data: mempool_value - .serialized_tx - .as_ref() - .as_ref() - .to_vec(), - height: mempool_height as u64, - })) - .await - .is_err() - { - break; - } - } - Err(e) => { - channel_tx - .send(Err(tonic::Status::internal(format!( - "Error in mempool stream: {e:?}" - )))) - .await - .ok(); - break; - } - } - } }, ) .await; @@ -1252,7 +1399,6 @@ impl LightWalletIndexer for 
FetchServiceSubscriber { } } }); - Ok(RawTransactionStream::new(channel_rx)) } diff --git a/zaino-state/src/backends/state.rs b/zaino-state/src/backends/state.rs index 6f122f03d..42203ea8e 100644 --- a/zaino-state/src/backends/state.rs +++ b/zaino-state/src/backends/state.rs @@ -1032,7 +1032,7 @@ impl ZcashIndexer for StateServiceSubscriber { /// `usage` is the total memory usage for the mempool, in bytes. /// the [optional `fullyNotified` field](), is only utilized for zcashd regtests, is deprecated, and is not included. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await?) + Ok(self.mempool.get_mempool_info().await.into()) } async fn z_get_address_balance( diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index db7358da8..6f4fb1fa0 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -12,14 +12,15 @@ //! - NOTE: Full transaction and block data is served from the backend finalizer. use crate::chain_index::non_finalised_state::BestTip; -use crate::chain_index::types::{BestChainLocation, NonBestChainLocation}; +use crate::chain_index::types::{BestChainLocation, MempoolInfo, NonBestChainLocation}; use crate::error::{ChainIndexError, ChainIndexErrorKind, FinalisedStateError}; -use crate::IndexedBlock; use crate::{AtomicStatus, StatusType, SyncError}; +use crate::{IndexedBlock, TransactionHash}; use std::collections::HashSet; use std::{sync::Arc, time::Duration}; use futures::{FutureExt, Stream}; +use hex::FromHex as _; use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; use tokio_stream::StreamExt; @@ -196,6 +197,21 @@ pub trait ChainIndex { end: Option, ) -> Option, Self::Error>>>; + /// Returns the *compact* block for the given height. + /// + /// Returns None if the specified height + /// is greater than the snapshot's tip + /// + /// TODO: Add range fetch method or update this? 
+ #[allow(clippy::type_complexity)] + fn get_compact_block( + &self, + nonfinalized_snapshot: &Self::Snapshot, + height: types::Height, + ) -> impl std::future::Future< + Output = Result, Self::Error>, + >; + /// Finds the newest ancestor of the given block on the main /// chain, or the block itself if it is on the main chain. fn find_fork_point( @@ -238,6 +254,11 @@ pub trait ChainIndex { Output = Result<(Option, HashSet), Self::Error>, >; + /// Returns all txids currently in the mempool. + fn get_mempool_txids( + &self, + ) -> impl std::future::Future, Self::Error>>; + /// Returns all transactions currently in the mempool, filtered by `exclude_list`. /// /// The `exclude_list` may contain shortened transaction ID hex prefixes (client-endian). @@ -249,12 +270,18 @@ pub trait ChainIndex { /// Returns a stream of mempool transactions, ending the stream when the chain tip block hash /// changes (a new block is mined or a reorg occurs). /// - /// If the chain tip has changed from the given spanshot returns None. + /// If a snapshot is given and the chain tip has changed from the given spanshot, returns None. #[allow(clippy::type_complexity)] fn get_mempool_stream( &self, - snapshot: &Self::Snapshot, + snapshot: Option<&Self::Snapshot>, ) -> Option, Self::Error>>>; + + /// Returns Information about the mempool state: + /// - size: Current tx count + /// - bytes: Sum of all tx sizes + /// - usage: Total memory usage for the mempool + fn get_mempool_info(&self) -> impl std::future::Future; } /// The combined index. 
Contains a view of the mempool, and the full @@ -380,6 +407,7 @@ pub trait ChainIndex { /// - Unified access to finalized and non-finalized blockchain state /// - Automatic synchronization between state layers /// - Snapshot-based consistency for queries +#[derive(Debug)] pub struct NodeBackedChainIndex { blockchain_source: std::sync::Arc, #[allow(dead_code)] @@ -434,7 +462,7 @@ impl NodeBackedChainIndex { /// Creates a [`NodeBackedChainIndexSubscriber`] from self, /// a clone-safe, drop-safe, read-only view onto the running indexer. - pub async fn subscriber(&self) -> NodeBackedChainIndexSubscriber { + pub fn subscriber(&self) -> NodeBackedChainIndexSubscriber { NodeBackedChainIndexSubscriber { blockchain_source: self.blockchain_source.as_ref().clone(), mempool: self.mempool.subscriber(), @@ -532,7 +560,7 @@ impl NodeBackedChainIndex { /// Designed for concurrent efficiency. /// /// [`NodeBackedChainIndexSubscriber`] can safely be cloned and dropped freely. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct NodeBackedChainIndexSubscriber { blockchain_source: Source, mempool: mempool::MempoolSubscriber, @@ -695,6 +723,30 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result, Self::Error> { + if height <= nonfinalized_snapshot.best_tip.height { + Ok(Some( + match nonfinalized_snapshot.get_chainblock_by_height(&height) { + Some(block) => block.to_compact_block(), + None => match self.finalized_state.get_compact_block(height).await { + Ok(block) => block, + Err(_e) => return Err(ChainIndexError::database_hole(height)), + }, + }, + )) + } else { + Ok(None) + } + } + /// Finds the newest ancestor of the given block on the main /// chain, or the block itself if it is on the main chain. 
fn find_fork_point( @@ -868,6 +920,19 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result, Self::Error> { + self.mempool + .get_mempool() + .await + .into_iter() + .map(|(txid_key, _)| { + TransactionHash::from_hex(&txid_key.txid) + .map_err(ChainIndexError::backing_validator) + }) + .collect::>() + } + /// Returns all transactions currently in the mempool, filtered by `exclude_list`. /// /// The `exclude_list` may contain shortened transaction ID hex prefixes (client-endian). @@ -879,11 +944,9 @@ impl ChainIndex for NodeBackedChainIndexSubscriber, ) -> Result>, Self::Error> { - let subscriber = self.mempool.clone(); - // Use the mempool's own filtering (it already handles client-endian shortened prefixes). let pairs: Vec<(mempool::MempoolKey, mempool::MempoolValue)> = - subscriber.get_filtered_mempool(exclude_list).await; + self.mempool.get_filtered_mempool(exclude_list).await; // Transform to the Vec> that the trait requires. let bytes: Vec> = pairs @@ -897,16 +960,16 @@ impl ChainIndex for NodeBackedChainIndexSubscriber, ) -> Option, Self::Error>>> { - let expected_chain_tip = snapshot.best_tip.blockhash; + let expected_chain_tip = snapshot.map(|snapshot| snapshot.best_tip.blockhash); let mut subscriber = self.mempool.clone(); match subscriber - .get_mempool_stream(Some(expected_chain_tip)) + .get_mempool_stream(expected_chain_tip) .now_or_never() { Some(Ok((in_rx, _handle))) => { @@ -957,6 +1020,14 @@ impl ChainIndex for NodeBackedChainIndexSubscriber MempoolInfo { + self.mempool.get_mempool_info().await + } } impl NonFinalizedSnapshot for Arc diff --git a/zaino-state/src/chain_index/finalised_state.rs b/zaino-state/src/chain_index/finalised_state.rs index f40f9c907..6ed7851ab 100644 --- a/zaino-state/src/chain_index/finalised_state.rs +++ b/zaino-state/src/chain_index/finalised_state.rs @@ -30,6 +30,7 @@ use tokio::time::{interval, MissedTickBehavior}; use super::source::BlockchainSource; +#[derive(Debug)] pub(crate) struct ZainoDB { db: Arc, cfg: 
BlockCacheConfig, diff --git a/zaino-state/src/chain_index/finalised_state/db.rs b/zaino-state/src/chain_index/finalised_state/db.rs index 6b1b84d72..4f81b7419 100644 --- a/zaino-state/src/chain_index/finalised_state/db.rs +++ b/zaino-state/src/chain_index/finalised_state/db.rs @@ -31,6 +31,7 @@ use super::capability::Capability; pub(super) const VERSION_DIRS: [&str; 1] = ["v1"]; /// All concrete database implementations. +#[derive(Debug)] pub(crate) enum DbBackend { V0(DbV0), V1(DbV1), diff --git a/zaino-state/src/chain_index/finalised_state/db/v1.rs b/zaino-state/src/chain_index/finalised_state/db/v1.rs index 6ab7cb651..fea3fb68e 100644 --- a/zaino-state/src/chain_index/finalised_state/db/v1.rs +++ b/zaino-state/src/chain_index/finalised_state/db/v1.rs @@ -385,6 +385,7 @@ impl TransparentHistExt for DbV1 { /// Zaino’s Finalised state database V1. /// Implements a persistent LMDB-backed chain index for fast read access and verified data. +#[derive(Debug)] pub(crate) struct DbV1 { /// Shared LMDB environment. env: Arc, diff --git a/zaino-state/src/chain_index/finalised_state/reader.rs b/zaino-state/src/chain_index/finalised_state/reader.rs index 163b6c98c..259d14883 100644 --- a/zaino-state/src/chain_index/finalised_state/reader.rs +++ b/zaino-state/src/chain_index/finalised_state/reader.rs @@ -27,7 +27,7 @@ use std::sync::Arc; /// Immutable view onto an already-running [`ZainoDB`]. 
/// /// Carries a plain reference with the same lifetime as the parent DB -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct DbReader { /// Immutable read-only view onto the running ZainoDB pub(crate) inner: Arc, diff --git a/zaino-state/src/chain_index/finalised_state/router.rs b/zaino-state/src/chain_index/finalised_state/router.rs index 0010f0a23..86c9b783d 100644 --- a/zaino-state/src/chain_index/finalised_state/router.rs +++ b/zaino-state/src/chain_index/finalised_state/router.rs @@ -21,6 +21,7 @@ use std::sync::{ Arc, }; +#[derive(Debug)] pub(crate) struct Router { /// Primary active database. primary: ArcSwap, diff --git a/zaino-state/src/chain_index/mempool.rs b/zaino-state/src/chain_index/mempool.rs index 801bc31a3..d2cc92f25 100644 --- a/zaino-state/src/chain_index/mempool.rs +++ b/zaino-state/src/chain_index/mempool.rs @@ -4,7 +4,10 @@ use std::{collections::HashSet, sync::Arc}; use crate::{ broadcast::{Broadcast, BroadcastSubscriber}, - chain_index::source::{BlockchainSource, BlockchainSourceError}, + chain_index::{ + source::{BlockchainSource, BlockchainSourceError}, + types::MempoolInfo, + }, error::{MempoolError, StatusError}, status::{AtomicStatus, StatusType}, BlockHash, @@ -521,7 +524,7 @@ impl MempoolSubscriber { /// Returns information about the mempool. Used by the `getmempoolinfo` RPC. /// Computed from local Broadcast state. 
- pub async fn get_mempool_info(&self) -> Result { + pub async fn get_mempool_info(&self) -> MempoolInfo { let mempool_transactions: Vec<(MempoolKey, MempoolValue)> = self.subscriber.get_filtered_state(&HashSet::new()); @@ -541,7 +544,7 @@ impl MempoolSubscriber { let usage: u64 = bytes.saturating_add(key_heap_bytes); - Ok(GetMempoolInfoResponse { size, bytes, usage }) + MempoolInfo { size, bytes, usage } } // TODO noted here too diff --git a/zaino-state/src/chain_index/non_finalised_state.rs b/zaino-state/src/chain_index/non_finalised_state.rs index 98fb7c5f8..1564b00e2 100644 --- a/zaino-state/src/chain_index/non_finalised_state.rs +++ b/zaino-state/src/chain_index/non_finalised_state.rs @@ -16,6 +16,7 @@ use zebra_chain::parameters::Network; use zebra_state::HashOrHeight; /// Holds the block cache +#[derive(Debug)] pub struct NonFinalizedState { /// We need access to the validator's best block hash, as well /// as a source of blocks diff --git a/zaino-state/src/chain_index/tests.rs b/zaino-state/src/chain_index/tests.rs index 836c09561..5d4ccc4df 100644 --- a/zaino-state/src/chain_index/tests.rs +++ b/zaino-state/src/chain_index/tests.rs @@ -90,7 +90,7 @@ mod mockchain_tests { let indexer = NodeBackedChainIndex::new(source.clone(), config) .await .unwrap(); - let index_reader = indexer.subscriber().await; + let index_reader = indexer.subscriber(); loop { let check_height: u32 = match active_mockchain_source { @@ -462,7 +462,7 @@ mod mockchain_tests { let mempool_stream_task = tokio::spawn(async move { let nonfinalized_snapshot = index_reader.snapshot_nonfinalized_state(); let mut mempool_stream = index_reader - .get_mempool_stream(&nonfinalized_snapshot) + .get_mempool_stream(Some(&nonfinalized_snapshot)) .expect("failed to create mempool stream"); let mut indexer_mempool_transactions: Vec = @@ -506,7 +506,7 @@ mod mockchain_tests { mockchain.mine_blocks(1); sleep(Duration::from_millis(2000)).await; - let mempool_stream = 
index_reader.get_mempool_stream(&stale_nonfinalized_snapshot); + let mempool_stream = index_reader.get_mempool_stream(Some(&stale_nonfinalized_snapshot)); assert!(mempool_stream.is_none()); } diff --git a/zaino-state/src/chain_index/tests/mempool.rs b/zaino-state/src/chain_index/tests/mempool.rs index dd60586c4..e5bd6cd3d 100644 --- a/zaino-state/src/chain_index/tests/mempool.rs +++ b/zaino-state/src/chain_index/tests/mempool.rs @@ -237,7 +237,7 @@ async fn get_mempool_info() { }) .unwrap_or_default(); - let subscriber_mempool_info = subscriber.get_mempool_info().await.unwrap(); + let subscriber_mempool_info = subscriber.get_mempool_info().await; let expected_size: u64 = mempool_transactions.len() as u64; diff --git a/zaino-state/src/chain_index/types.rs b/zaino-state/src/chain_index/types.rs index b98df36ba..b0f294758 100644 --- a/zaino-state/src/chain_index/types.rs +++ b/zaino-state/src/chain_index/types.rs @@ -3173,6 +3173,68 @@ impl ZainoVersionedSerialise for OrchardTxList { } } +// *** Metadata Objects *** + +/// Holds information about the mempool state. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MempoolInfo { + /// Current tx count + pub size: u64, + /// Sum of all tx sizes + pub bytes: u64, + /// Total memory usage for the mempool + pub usage: u64, +} + +impl ZainoVersionedSerialise for MempoolInfo { + const VERSION: u8 = version::V1; + + fn encode_body(&self, w: &mut W) -> io::Result<()> { + let mut w = w; + write_u64_le(&mut w, self.size)?; + write_u64_le(&mut w, self.bytes)?; + write_u64_le(&mut w, self.usage) + } + + fn decode_latest(r: &mut R) -> io::Result { + Self::decode_v1(r) + } + + fn decode_v1(r: &mut R) -> io::Result { + let mut r = r; + let size = read_u64_le(&mut r)?; + let bytes = read_u64_le(&mut r)?; + let usage = read_u64_le(&mut r)?; + Ok(MempoolInfo { size, bytes, usage }) + } +} + +/// 24 byte body. 
+impl FixedEncodedLen for MempoolInfo { + /// 8 byte size + 8 byte bytes + 8 byte usage + const ENCODED_LEN: usize = 8 + 8 + 8; +} + +impl From for MempoolInfo { + fn from(resp: zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse) -> Self { + MempoolInfo { + size: resp.size, + bytes: resp.bytes, + usage: resp.usage, + } + } +} + +impl From for zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse { + fn from(info: MempoolInfo) -> Self { + zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse { + size: info.size, + bytes: info.bytes, + usage: info.usage, + } + } +} + // *** Custom serde based debug serialisation *** #[cfg(test)] diff --git a/zaino-state/src/error.rs b/zaino-state/src/error.rs index 504708cf6..c9782883e 100644 --- a/zaino-state/src/error.rs +++ b/zaino-state/src/error.rs @@ -6,27 +6,6 @@ use std::{any::type_name, fmt::Display}; use zaino_fetch::jsonrpsee::connector::RpcRequestError; -impl From> for StateServiceError { - fn from(value: RpcRequestError) -> Self { - match value { - RpcRequestError::Transport(transport_error) => { - Self::JsonRpcConnectorError(transport_error) - } - RpcRequestError::Method(e) => Self::UnhandledRpcError(format!( - "{}: {}", - std::any::type_name::(), - e.to_string() - )), - RpcRequestError::JsonRpc(error) => Self::Custom(format!("bad argument: {error}")), - RpcRequestError::InternalUnrecoverable(e) => Self::Custom(e.to_string()), - RpcRequestError::ServerWorkQueueFull => { - Self::Custom("Server queue full. Handling for this not yet implemented".to_string()) - } - RpcRequestError::UnexpectedErrorResponse(error) => Self::Custom(format!("{error}")), - } - } -} - /// Errors related to the `StateService`. #[derive(Debug, thiserror::Error)] pub enum StateServiceError { @@ -49,6 +28,10 @@ pub enum StateServiceError { #[error("RPC error: {0:?}")] RpcError(#[from] zaino_fetch::jsonrpsee::connector::RpcError), + /// Chain index error. 
+ #[error("Chain index error: {0}")] + ChainIndexError(#[from] ChainIndexError), + /// Error from the block cache. #[error("Mempool error: {0}")] BlockCacheError(#[from] BlockCacheError), @@ -105,6 +88,12 @@ impl From for tonic::Status { StateServiceError::RpcError(err) => { tonic::Status::internal(format!("RPC error: {err:?}")) } + StateServiceError::ChainIndexError(err) => match err.kind { + ChainIndexErrorKind::InternalServerError => tonic::Status::internal(err.message), + ChainIndexErrorKind::InvalidSnapshot => { + tonic::Status::failed_precondition(err.message) + } + }, StateServiceError::BlockCacheError(err) => { tonic::Status::internal(format!("BlockCache error: {err:?}")) } @@ -130,33 +119,23 @@ impl From for tonic::Status { } } -impl From> for FetchServiceError { +impl From> for StateServiceError { fn from(value: RpcRequestError) -> Self { match value { RpcRequestError::Transport(transport_error) => { - FetchServiceError::JsonRpcConnectorError(transport_error) - } - RpcRequestError::JsonRpc(error) => { - FetchServiceError::Critical(format!("argument failed to serialze: {error}")) - } - RpcRequestError::InternalUnrecoverable(e) => { - FetchServiceError::Critical(format!("Internal unrecoverable error: {e}")) + Self::JsonRpcConnectorError(transport_error) } - RpcRequestError::ServerWorkQueueFull => FetchServiceError::Critical( - "Server queue full. 
Handling for this not yet implemented".to_string(), - ), - RpcRequestError::Method(e) => FetchServiceError::Critical(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), + RpcRequestError::Method(e) => Self::UnhandledRpcError(format!( + "{}: {}", + std::any::type_name::(), e.to_string() )), - RpcRequestError::UnexpectedErrorResponse(error) => { - FetchServiceError::Critical(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - error - )) + RpcRequestError::JsonRpc(error) => Self::Custom(format!("bad argument: {error}")), + RpcRequestError::InternalUnrecoverable(e) => Self::Custom(e.to_string()), + RpcRequestError::ServerWorkQueueFull => { + Self::Custom("Server queue full. Handling for this not yet implemented".to_string()) } + RpcRequestError::UnexpectedErrorResponse(error) => Self::Custom(format!("{error}")), } } } @@ -172,13 +151,9 @@ pub enum FetchServiceError { #[error("JsonRpcConnector error: {0}")] JsonRpcConnectorError(#[from] zaino_fetch::jsonrpsee::error::TransportError), - /// Error from the block cache. - #[error("Mempool error: {0}")] - BlockCacheError(#[from] BlockCacheError), - - /// Error from the mempool. - #[error("Mempool error: {0}")] - MempoolError(#[from] MempoolError), + /// Chain index error. + #[error("Chain index error: {0}")] + ChainIndexError(#[from] ChainIndexError), /// RPC error in compatibility with zcashd. 
#[error("RPC error: {0:?}")] @@ -200,12 +175,12 @@ impl From for tonic::Status { FetchServiceError::JsonRpcConnectorError(err) => { tonic::Status::internal(format!("JsonRpcConnector error: {err}")) } - FetchServiceError::BlockCacheError(err) => { - tonic::Status::internal(format!("BlockCache error: {err}")) - } - FetchServiceError::MempoolError(err) => { - tonic::Status::internal(format!("Mempool error: {err}")) - } + FetchServiceError::ChainIndexError(err) => match err.kind { + ChainIndexErrorKind::InternalServerError => tonic::Status::internal(err.message), + ChainIndexErrorKind::InvalidSnapshot => { + tonic::Status::failed_precondition(err.message) + } + }, FetchServiceError::RpcError(err) => { tonic::Status::internal(format!("RPC error: {err:?}")) } @@ -216,33 +191,34 @@ impl From for tonic::Status { } } } -/// These aren't the best conversions, but the MempoolError should go away -/// in favor of a new type with the new chain cache is complete -impl From> for MempoolError { + +impl From> for FetchServiceError { fn from(value: RpcRequestError) -> Self { match value { RpcRequestError::Transport(transport_error) => { - MempoolError::JsonRpcConnectorError(transport_error) + FetchServiceError::JsonRpcConnectorError(transport_error) } RpcRequestError::JsonRpc(error) => { - MempoolError::Critical(format!("argument failed to serialze: {error}")) + FetchServiceError::Critical(format!("argument failed to serialze: {error}")) } RpcRequestError::InternalUnrecoverable(e) => { - MempoolError::Critical(format!("Internal unrecoverable error: {e}")) + FetchServiceError::Critical(format!("Internal unrecoverable error: {e}")) } - RpcRequestError::ServerWorkQueueFull => MempoolError::Critical( + RpcRequestError::ServerWorkQueueFull => FetchServiceError::Critical( "Server queue full. 
Handling for this not yet implemented".to_string(), ), - RpcRequestError::Method(e) => MempoolError::Critical(format!( + RpcRequestError::Method(e) => FetchServiceError::Critical(format!( "unhandled rpc-specific {} error: {}", type_name::(), e.to_string() )), - RpcRequestError::UnexpectedErrorResponse(error) => MempoolError::Critical(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - error - )), + RpcRequestError::UnexpectedErrorResponse(error) => { + FetchServiceError::Critical(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + error + )) + } } } } @@ -280,6 +256,37 @@ pub enum MempoolError { StatusError(StatusError), } +/// These aren't the best conversions, but the MempoolError should go away +/// in favor of a new type with the new chain cache is complete +impl From> for MempoolError { + fn from(value: RpcRequestError) -> Self { + match value { + RpcRequestError::Transport(transport_error) => { + MempoolError::JsonRpcConnectorError(transport_error) + } + RpcRequestError::JsonRpc(error) => { + MempoolError::Critical(format!("argument failed to serialze: {error}")) + } + RpcRequestError::InternalUnrecoverable(e) => { + MempoolError::Critical(format!("Internal unrecoverable error: {e}")) + } + RpcRequestError::ServerWorkQueueFull => MempoolError::Critical( + "Server queue full. Handling for this not yet implemented".to_string(), + ), + RpcRequestError::Method(e) => MempoolError::Critical(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + e.to_string() + )), + RpcRequestError::UnexpectedErrorResponse(error) => MempoolError::Critical(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + error + )), + } + } +} + /// Errors related to the `BlockCache`. 
#[derive(Debug, thiserror::Error)] pub enum BlockCacheError { @@ -323,38 +330,6 @@ pub enum BlockCacheError { #[error("Integer conversion error: {0}")] TryFromIntError(#[from] std::num::TryFromIntError), } -/// These aren't the best conversions, but the NonFinalizedStateError should go away -/// in favor of a new type with the new chain cache is complete -impl From> for NonFinalisedStateError { - fn from(value: RpcRequestError) -> Self { - match value { - RpcRequestError::Transport(transport_error) => { - NonFinalisedStateError::JsonRpcConnectorError(transport_error) - } - RpcRequestError::JsonRpc(error) => { - NonFinalisedStateError::Custom(format!("argument failed to serialze: {error}")) - } - RpcRequestError::InternalUnrecoverable(e) => { - NonFinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) - } - RpcRequestError::ServerWorkQueueFull => NonFinalisedStateError::Custom( - "Server queue full. Handling for this not yet implemented".to_string(), - ), - RpcRequestError::Method(e) => NonFinalisedStateError::Custom(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - e.to_string() - )), - RpcRequestError::UnexpectedErrorResponse(error) => { - NonFinalisedStateError::Custom(format!( - "unhandled rpc-specific {} error: {}", - type_name::(), - error - )) - } - } - } -} /// Errors related to the `NonFinalisedState`. 
#[derive(Debug, thiserror::Error)] @@ -379,30 +354,31 @@ pub enum NonFinalisedStateError { #[error("Status error: {0:?}")] StatusError(StatusError), } -/// These aren't the best conversions, but the FinalizedStateError should go away + +/// These aren't the best conversions, but the NonFinalizedStateError should go away /// in favor of a new type with the new chain cache is complete -impl From> for FinalisedStateError { +impl From> for NonFinalisedStateError { fn from(value: RpcRequestError) -> Self { match value { RpcRequestError::Transport(transport_error) => { - FinalisedStateError::JsonRpcConnectorError(transport_error) + NonFinalisedStateError::JsonRpcConnectorError(transport_error) } RpcRequestError::JsonRpc(error) => { - FinalisedStateError::Custom(format!("argument failed to serialze: {error}")) + NonFinalisedStateError::Custom(format!("argument failed to serialze: {error}")) } RpcRequestError::InternalUnrecoverable(e) => { - FinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) + NonFinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) } - RpcRequestError::ServerWorkQueueFull => FinalisedStateError::Custom( + RpcRequestError::ServerWorkQueueFull => NonFinalisedStateError::Custom( "Server queue full. 
Handling for this not yet implemented".to_string(), ), - RpcRequestError::Method(e) => FinalisedStateError::Custom(format!( + RpcRequestError::Method(e) => NonFinalisedStateError::Custom(format!( "unhandled rpc-specific {} error: {}", type_name::(), e.to_string() )), RpcRequestError::UnexpectedErrorResponse(error) => { - FinalisedStateError::Custom(format!( + NonFinalisedStateError::Custom(format!( "unhandled rpc-specific {} error: {}", type_name::(), error @@ -478,6 +454,39 @@ pub enum FinalisedStateError { IoError(#[from] std::io::Error), } +/// These aren't the best conversions, but the FinalizedStateError should go away +/// in favor of a new type with the new chain cache is complete +impl From> for FinalisedStateError { + fn from(value: RpcRequestError) -> Self { + match value { + RpcRequestError::Transport(transport_error) => { + FinalisedStateError::JsonRpcConnectorError(transport_error) + } + RpcRequestError::JsonRpc(error) => { + FinalisedStateError::Custom(format!("argument failed to serialze: {error}")) + } + RpcRequestError::InternalUnrecoverable(e) => { + FinalisedStateError::Custom(format!("Internal unrecoverable error: {e}")) + } + RpcRequestError::ServerWorkQueueFull => FinalisedStateError::Custom( + "Server queue full. Handling for this not yet implemented".to_string(), + ), + RpcRequestError::Method(e) => FinalisedStateError::Custom(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + e.to_string() + )), + RpcRequestError::UnexpectedErrorResponse(error) => { + FinalisedStateError::Custom(format!( + "unhandled rpc-specific {} error: {}", + type_name::(), + error + )) + } + } + } +} + /// A general error type to represent error StatusTypes. 
#[derive(Debug, Clone, thiserror::Error)] #[error("Unexpected status error: {server_status:?}")] @@ -562,6 +571,7 @@ impl ChainIndexError { } } } + impl From for ChainIndexError { fn from(value: FinalisedStateError) -> Self { let message = match &value { diff --git a/zaino-state/src/utils.rs b/zaino-state/src/utils.rs index 09e31a8b0..2ab1f4280 100644 --- a/zaino-state/src/utils.rs +++ b/zaino-state/src/utils.rs @@ -6,6 +6,8 @@ use zaino_proto::proto::service::BlockId; use zebra_chain::{block::Height, parameters::Network}; use zebra_state::HashOrHeight; +// *** Metadata structs *** + /// Zaino build info. #[derive(Debug, Clone)] pub(crate) struct BuildInfo { @@ -116,6 +118,8 @@ impl fmt::Display for ServiceMetadata { } } +// *** Data transforms *** + pub(crate) fn blockid_to_hashorheight(block_id: BlockId) -> Option { <[u8; 32]>::try_from(block_id.hash) .map(zebra_chain::block::Hash) @@ -128,3 +132,26 @@ pub(crate) fn blockid_to_hashorheight(block_id: BlockId) -> Option }) .ok() } + +/// Strips the ouputs and from all transactions, retains only +/// the nullifier from all orcard actions, and clears the chain +/// metadata from the block +pub(crate) fn compact_block_to_nullifiers( + mut block: zaino_proto::proto::compact_formats::CompactBlock, +) -> zaino_proto::proto::compact_formats::CompactBlock { + for ctransaction in &mut block.vtx { + ctransaction.outputs = Vec::new(); + for caction in &mut ctransaction.actions { + *caction = zaino_proto::proto::compact_formats::CompactOrchardAction { + nullifier: caction.nullifier.clone(), + ..Default::default() + } + } + } + + block.chain_metadata = Some(zaino_proto::proto::compact_formats::ChainMetadata { + sapling_commitment_tree_size: 0, + orchard_commitment_tree_size: 0, + }); + block +} From d61043cb15802da02715c1a1fb6e234c07627df9 Mon Sep 17 00:00:00 2001 From: idky137 Date: Thu, 9 Oct 2025 16:00:08 +0100 Subject: [PATCH 2/2] started updating tests --- integration-tests/tests/fetch_service.rs | 185 
++++++++++++----------- integration-tests/tests/json_server.rs | 44 +++--- integration-tests/tests/state_service.rs | 96 ++++++------ integration-tests/tests/test_vectors.rs | 2 +- zaino-common/src/network.rs | 6 +- zaino-testutils/src/lib.rs | 82 +++++----- 6 files changed, 218 insertions(+), 197 deletions(-) diff --git a/integration-tests/tests/fetch_service.rs b/integration-tests/tests/fetch_service.rs index 2d7f93809..cbee05239 100644 --- a/integration-tests/tests/fetch_service.rs +++ b/integration-tests/tests/fetch_service.rs @@ -7,8 +7,8 @@ use zaino_proto::proto::service::{ TransparentAddressBlockFilter, TxFilter, }; use zaino_state::{ - BackendType, FetchService, FetchServiceConfig, FetchServiceSubscriber, LightWalletIndexer, - StatusType, ZcashIndexer, ZcashService as _, + BackendType, ChainIndex as _, FetchService, FetchServiceConfig, FetchServiceSubscriber, + LightWalletIndexer, StatusType, ZcashIndexer, ZcashService as _, }; use zaino_testutils::Validator as _; use zaino_testutils::{TestManager, ValidatorKind}; @@ -299,23 +299,28 @@ pub async fn test_get_mempool_info(validator: &ValidatorKind) { let info = fetch_service_subscriber.get_mempool_info().await.unwrap(); // Derive expected values directly from the current mempool contents. 
- let entries = fetch_service_subscriber.mempool.get_mempool().await; + + let keys = fetch_service_subscriber + .indexer + .get_mempool_txids() + .await + .unwrap(); + + let values = fetch_service_subscriber + .indexer + .get_mempool_transactions(Vec::new()) + .await + .unwrap(); // Size - assert_eq!(info.size, entries.len() as u64); + assert_eq!(info.size, values.len() as u64); assert!(info.size >= 1); // Bytes: sum of SerializedTransaction lengths - let expected_bytes: u64 = entries - .iter() - .map(|(_, value)| value.serialized_tx.as_ref().as_ref().len() as u64) - .sum(); + let expected_bytes: u64 = values.iter().map(|entry| entry.len() as u64).sum(); // Key heap bytes: sum of txid String capacities - let expected_key_heap_bytes: u64 = entries - .iter() - .map(|(key, _)| key.txid.capacity() as u64) - .sum(); + let expected_key_heap_bytes: u64 = keys.iter().map(|key| key.0.len() as u64).sum(); let expected_usage = expected_bytes.saturating_add(expected_key_heap_bytes); @@ -476,12 +481,12 @@ async fn fetch_service_get_address_tx_ids(validator: &ValidatorKind) { test_manager.local_net.generate_blocks(1).await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let chain_height = fetch_service_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + let chain_height: u32 = fetch_service_subscriber + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); dbg!(&chain_height); let fetch_service_txids = fetch_service_subscriber @@ -932,12 +937,12 @@ async fn fetch_service_get_taddress_txids(validator: &ValidatorKind) { test_manager.local_net.generate_blocks(1).await.unwrap(); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let chain_height = fetch_service_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + let chain_height: u32 = fetch_service_subscriber + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); dbg!(&chain_height); let block_filter 
= TransparentAddressBlockFilter { @@ -1358,12 +1363,12 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn regtest_no_cache() { launch_fetch_service(&ValidatorKind::Zcashd, None).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "We no longer use chain caches. See zcashd::launch::regtest_no_cache."] pub(crate) async fn regtest_with_cache() { launch_fetch_service( @@ -1378,7 +1383,7 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn validate_address() { fetch_service_validate_address(&ValidatorKind::Zcashd).await; } @@ -1388,27 +1393,27 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_balance() { fetch_service_get_address_balance(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_raw() { fetch_service_get_block_raw(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_object() { fetch_service_get_block_object(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_mempool() { fetch_service_get_raw_mempool(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_info() { test_get_mempool_info(&ValidatorKind::Zcashd).await; } @@ -1417,128 +1422,128 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn get_treestate() { fetch_service_z_get_treestate(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn 
subtrees_by_index() { fetch_service_z_get_subtrees_by_index(&ValidatorKind::Zcashd).await; } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_transaction() { fetch_service_get_raw_transaction(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_tx_ids() { fetch_service_get_address_tx_ids(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_utxos() { fetch_service_get_address_utxos(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_block() { fetch_service_get_latest_block(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block() { fetch_service_get_block(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn difficulty() { assert_fetch_service_difficulty_matches_rpc(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn best_blockhash() { fetch_service_get_best_blockhash(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_count() { fetch_service_get_block_count(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_nullifiers() { fetch_service_get_block_nullifiers(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_range() { fetch_service_get_block_range(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 
pub(crate) async fn block_range_nullifiers() { fetch_service_get_block_range_nullifiers(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mined() { fetch_service_get_transaction_mined(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mempool() { fetch_service_get_transaction_mempool(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_txids() { fetch_service_get_taddress_txids(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_balance() { fetch_service_get_taddress_balance(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_tx() { fetch_service_get_mempool_tx(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_stream() { fetch_service_get_mempool_stream(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn tree_state() { fetch_service_get_tree_state(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_tree_state() { fetch_service_get_latest_tree_state(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtree_roots() { fetch_service_get_subtree_roots(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos() { fetch_service_get_taddress_utxos(&ValidatorKind::Zcashd).await; } - #[tokio::test] + 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos_stream() { fetch_service_get_taddress_utxos_stream(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn lightd_info() { fetch_service_get_lightd_info(&ValidatorKind::Zcashd).await; } @@ -1553,12 +1558,12 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn regtest_no_cache() { launch_fetch_service(&ValidatorKind::Zebrad, None).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "We no longer use chain caches. See zebrad::launch::regtest_no_cache."] pub(crate) async fn regtest_with_cache() { launch_fetch_service( @@ -1573,7 +1578,7 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn validate_address() { fetch_service_validate_address(&ValidatorKind::Zebrad).await; } @@ -1583,27 +1588,27 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_balance() { fetch_service_get_address_balance(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_raw() { fetch_service_get_block_raw(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_object() { fetch_service_get_block_object(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_mempool() { fetch_service_get_raw_mempool(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_info() { test_get_mempool_info(&ValidatorKind::Zebrad).await; } @@ 
-1612,128 +1617,128 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn treestate() { fetch_service_z_get_treestate(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtrees_by_index() { fetch_service_z_get_subtrees_by_index(&ValidatorKind::Zebrad).await; } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_transaction() { fetch_service_get_raw_transaction(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_tx_ids() { fetch_service_get_address_tx_ids(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_utxos() { fetch_service_get_address_utxos(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_block() { fetch_service_get_latest_block(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block() { fetch_service_get_block(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn difficulty() { assert_fetch_service_difficulty_matches_rpc(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn best_blockhash() { fetch_service_get_best_blockhash(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_count() { fetch_service_get_block_count(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_nullifiers() { 
fetch_service_get_block_nullifiers(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_range() { fetch_service_get_block_range(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_range_nullifiers() { fetch_service_get_block_range_nullifiers(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mined() { fetch_service_get_transaction_mined(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mempool() { fetch_service_get_transaction_mempool(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_txids() { fetch_service_get_taddress_txids(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_balance() { fetch_service_get_taddress_balance(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_tx() { fetch_service_get_mempool_tx(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_stream() { fetch_service_get_mempool_stream(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn tree_state() { fetch_service_get_tree_state(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_tree_state() { fetch_service_get_latest_tree_state(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", 
worker_threads = 2)] pub(crate) async fn subtree_roots() { fetch_service_get_subtree_roots(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos() { fetch_service_get_taddress_utxos(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos_stream() { fetch_service_get_taddress_utxos_stream(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn lightd_info() { fetch_service_get_lightd_info(&ValidatorKind::Zebrad).await; } diff --git a/integration-tests/tests/json_server.rs b/integration-tests/tests/json_server.rs index e7b67f9bf..31d4411a9 100644 --- a/integration-tests/tests/json_server.rs +++ b/integration-tests/tests/json_server.rs @@ -1,8 +1,8 @@ use zaino_common::network::ActivationHeights; use zaino_common::{DatabaseConfig, ServiceConfig, StorageConfig}; use zaino_state::{ - BackendType, FetchService, FetchServiceConfig, FetchServiceSubscriber, ZcashIndexer, - ZcashService as _, + BackendType, ChainIndex, FetchService, FetchServiceConfig, FetchServiceSubscriber, + ZcashIndexer, ZcashService as _, }; use zaino_testutils::{from_inputs, Validator as _}; use zaino_testutils::{TestManager, ValidatorKind}; @@ -583,11 +583,11 @@ async fn get_address_tx_ids_inner() { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let chain_height = zcashd_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); dbg!(&chain_height); let zcashd_txids = zcashd_subscriber @@ -670,27 +670,27 @@ mod zcashd { pub(crate) mod zcash_indexer { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn check_info_no_cookie() { launch_json_server_check_info(false).await; } - #[tokio::test] + 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn check_info_with_cookie() { launch_json_server_check_info(false).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn z_get_address_balance() { z_get_address_balance_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_best_blockhash() { get_best_blockhash_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_block_count() { get_block_count_inner().await; } @@ -699,7 +699,7 @@ mod zcashd { /// /// This tests generates blocks and checks that the difficulty is the same between zcashd and zaino /// after each block is generated. - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_difficulty() { let ( mut test_manager, @@ -723,47 +723,47 @@ mod zcashd { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn validate_address() { validate_address_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn z_get_block() { z_get_block_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_raw_mempool() { get_raw_mempool_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_mempool_info() { get_mempool_info_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn z_get_treestate() { z_get_treestate_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn z_get_subtrees_by_index() { z_get_subtrees_by_index_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_raw_transaction() { get_raw_transaction_inner().await; } - #[tokio::test] + #[tokio::test(flavor = 
"multi_thread", worker_threads = 2)] async fn get_address_tx_ids() { get_address_tx_ids_inner().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn z_get_address_utxos() { z_get_address_utxos_inner().await; } diff --git a/integration-tests/tests/state_service.rs b/integration-tests/tests/state_service.rs index d9efa422b..6b0b406ac 100644 --- a/integration-tests/tests/state_service.rs +++ b/integration-tests/tests/state_service.rs @@ -1,6 +1,6 @@ use zaino_common::network::ActivationHeights; use zaino_common::{DatabaseConfig, ServiceConfig, StorageConfig}; -use zaino_state::BackendType; +use zaino_state::{BackendType, ChainIndex as _}; use zaino_state::{ FetchService, FetchServiceConfig, FetchServiceSubscriber, LightWalletIndexer, StateService, StateServiceConfig, StateServiceSubscriber, ZcashIndexer, ZcashService as _, @@ -857,11 +857,12 @@ async fn state_service_get_address_tx_ids(validator: &ValidatorKind) { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let chain_height = fetch_service_subscriber - .block_cache - .get_chain_height() - .await - .unwrap() - .0; + .indexer + .snapshot_nonfinalized_state() + .best_tip + .height + .into(); + dbg!(&chain_height); let fetch_service_txids = fetch_service_subscriber @@ -1043,12 +1044,12 @@ mod zebrad { use super::*; use zaino_testutils::ZEBRAD_CHAIN_CACHE_DIR; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn regtest_no_cache() { state_service_check_info(&ValidatorKind::Zebrad, None, NetworkKind::Regtest).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn state_service_chaintip_update_subscriber() { let ( test_manager, @@ -1081,7 +1082,7 @@ mod zebrad { } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "We no longer use chain caches. 
See zcashd::check_info::regtest_no_cache."] async fn regtest_with_cache() { state_service_check_info( @@ -1093,7 +1094,7 @@ mod zebrad { } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn testnet() { state_service_check_info( &ValidatorKind::Zebrad, @@ -1108,40 +1109,40 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn address_utxos() { state_service_get_address_utxos(&ValidatorKind::Zebrad).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn address_utxos_testnet() { state_service_get_address_utxos_testnet().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn address_tx_ids_regtest() { state_service_get_address_tx_ids(&ValidatorKind::Zebrad).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn address_tx_ids_testnet() { state_service_get_address_tx_ids_testnet().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn raw_transaction_regtest() { state_service_get_raw_transaction(&ValidatorKind::Zebrad).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn raw_transaction_testnet() { state_service_get_raw_transaction_testnet().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn best_blockhash() { let ( test_manager, @@ -1167,7 +1168,7 @@ mod zebrad { assert_eq!(fetch_service_bbh, state_service_bbh); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_count() { let ( mut test_manager, @@ -1195,7 +1196,7 @@ mod zebrad { test_manager.close().await; } - #[tokio::test] + 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn difficulty() { let ( mut test_manager, @@ -1239,36 +1240,36 @@ mod zebrad { mod z { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtrees_by_index_regtest() { state_service_z_get_subtrees_by_index(&ValidatorKind::Zebrad).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtrees_by_index_testnet() { state_service_z_get_subtrees_by_index_testnet().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn treestate_regtest() { state_service_z_get_treestate(&ValidatorKind::Zebrad).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn treestate_testnet() { state_service_z_get_treestate_testnet().await; } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn raw_mempool_regtest() { state_service_get_raw_mempool(&ValidatorKind::Zebrad).await; } /// `getmempoolinfo` computed from local Broadcast state - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_mempool_info() { let ( mut test_manager, @@ -1343,19 +1344,19 @@ mod zebrad { } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn raw_mempool_testnet() { state_service_get_raw_mempool_testnet().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_object_regtest() { state_service_get_block_object(&ValidatorKind::Zebrad, None, NetworkKind::Regtest) .await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_object_testnet() { 
state_service_get_block_object( &ValidatorKind::Zebrad, @@ -1365,13 +1366,13 @@ mod zebrad { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_raw_regtest() { state_service_get_block_raw(&ValidatorKind::Zebrad, None, NetworkKind::Regtest).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_raw_testnet() { state_service_get_block_raw( &ValidatorKind::Zebrad, @@ -1381,13 +1382,13 @@ mod zebrad { .await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn address_balance_regtest() { state_service_get_address_balance(&ValidatorKind::Zebrad).await; } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn address_balance_testnet() { state_service_get_address_balance_testnet().await; } @@ -1401,7 +1402,7 @@ mod zebrad { use zebra_rpc::methods::GetAddressTxIdsRequest; use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_latest_block() { let ( test_manager, @@ -1427,7 +1428,7 @@ mod zebrad { assert_eq!(fetch_service_block, state_service_block); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_block() { let ( test_manager, @@ -1473,7 +1474,8 @@ mod zebrad { assert_eq!(fetch_service_block_by_hash, state_service_block_by_hash); assert_eq!(state_service_block_by_hash, state_service_block_by_height) } - #[tokio::test] + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_tree_state() { let ( test_manager, @@ -1509,7 +1511,8 @@ mod zebrad { state_service_treestate_by_height ); } - #[tokio::test] + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_subtree_roots() { let ( test_manager, @@ -1552,7 +1555,8 @@ mod zebrad { state_service_sapling_subtree_roots ); } - #[tokio::test] 
+ + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_latest_tree_state() { let ( test_manager, @@ -1644,15 +1648,17 @@ mod zebrad { } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_block_range_full() { get_block_range_helper(false).await; } - #[tokio::test] + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_block_range_nullifiers() { get_block_range_helper(true).await; } - #[tokio::test] + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_transaction() { let ( mut test_manager, @@ -1707,7 +1713,7 @@ mod zebrad { assert_eq!(fetch_service_raw_transaction, state_service_raw_transaction); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_taddress_txids() { let ( mut test_manager, @@ -1746,7 +1752,7 @@ mod zebrad { assert_eq!(fetch_service_taddress_txids, state_service_taddress_txids); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_address_utxos_stream() { let ( mut test_manager, @@ -1805,7 +1811,8 @@ mod zebrad { .as_ref() ); } - #[tokio::test] + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_address_utxos() { let ( mut test_manager, @@ -1859,7 +1866,8 @@ mod zebrad { .as_ref() ); } - #[tokio::test] + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_taddress_balance() { let ( mut test_manager, diff --git a/integration-tests/tests/test_vectors.rs b/integration-tests/tests/test_vectors.rs index 2a6f4230c..c80bfed1e 100644 --- a/integration-tests/tests/test_vectors.rs +++ b/integration-tests/tests/test_vectors.rs @@ -134,7 +134,7 @@ async fn create_test_manager_and_services( (test_manager, state_service, state_subscriber) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "Not a test! 
Used to build test vector data for zaino_state::chain_index unit tests."] async fn create_200_block_regtest_chain_vectors() { let (mut test_manager, _state_service, state_service_subscriber) = diff --git a/zaino-common/src/network.rs b/zaino-common/src/network.rs index c612b6d3e..d932c242a 100644 --- a/zaino-common/src/network.rs +++ b/zaino-common/src/network.rs @@ -83,9 +83,9 @@ impl Default for ActivationHeights { blossom: Some(1), heartwood: Some(1), canopy: Some(1), - nu5: Some(1), - nu6: Some(1), - nu6_1: Some(1), + nu5: Some(2), + nu6: Some(2), + nu6_1: Some(1000), nu7: None, } } diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index ba34b3def..7de2b7fd3 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -39,8 +39,8 @@ pub const ZEBRAD_DEFAULT_ACTIVATION_HEIGHTS: ActivationHeights = ActivationHeigh blossom: Some(1), heartwood: Some(1), canopy: Some(1), - nu5: Some(1), - nu6: Some(1), + nu5: Some(2), + nu6: Some(2), nu6_1: Some(1000), nu7: None, }; @@ -548,7 +548,7 @@ impl TestManager { None }; - Ok(Self { + let test_manager = Self { local_net, data_dir, network: network_kind, @@ -559,7 +559,15 @@ impl TestManager { zaino_grpc_listen_address, json_server_cookie_dir: zaino_json_server_cookie_dir, clients, - }) + }; + + // NOTE: We currently have to turn nu5 and nu6 on at block height = 2, + // for this reason we generate an extra block to activate the expected network upgrades. + // + // Not doing this here would require changing many tests temporarily while we wait for the actual fix. + test_manager.generate_blocks_with_delay(1).await; + + Ok(test_manager) } /// Helper function to support default test case. 
@@ -641,7 +649,7 @@ mod launch_testmanager { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn basic() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zcashd, @@ -658,13 +666,13 @@ mod launch_testmanager { .await .unwrap(); assert_eq!( - 1, + 2, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn generate_blocks() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zcashd, @@ -681,19 +689,19 @@ mod launch_testmanager { .await .unwrap(); assert_eq!( - 1, + 2, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.local_net.generate_blocks(1).await.unwrap(); assert_eq!( - 2, + 3, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.close().await; } #[ignore = "chain cache needs development"] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn with_chain() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zcashd, @@ -716,7 +724,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zcashd, @@ -743,7 +751,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zcashd, @@ -771,7 +779,7 @@ mod launch_testmanager { /// This test shows currently we do not receive mining rewards from Zebra unless we mine 100 blocks at a time. 
/// This is not the case with Zcashd and should not be the case here. /// Even if rewards need 100 confirmations these blocks should not have to be mined at the same time. - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zcashd, @@ -821,7 +829,7 @@ mod launch_testmanager { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn basic() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -838,13 +846,13 @@ mod launch_testmanager { .await .unwrap(); assert_eq!( - 1, + 2, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn generate_blocks() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -861,19 +869,19 @@ mod launch_testmanager { .await .unwrap(); assert_eq!( - 1, + 2, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.local_net.generate_blocks(1).await.unwrap(); assert_eq!( - 2, + 3, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.close().await; } #[ignore = "chain cache needs development"] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn with_chain() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -896,7 +904,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -923,7 +931,7 @@ mod launch_testmanager { 
test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -951,7 +959,7 @@ mod launch_testmanager { /// This test shows currently we do not receive mining rewards from Zebra unless we mine 100 blocks at a time. /// This is not the case with Zcashd and should not be the case here. /// Even if rewards need 100 confirmations these blocks should not have to be mined at the same time. - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -998,7 +1006,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward_and_send() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1100,7 +1108,7 @@ mod launch_testmanager { } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_testnet() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1132,7 +1140,7 @@ mod launch_testmanager { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn basic() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1149,13 +1157,13 @@ mod launch_testmanager { .await .unwrap(); assert_eq!( - 1, + 2, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn 
generate_blocks() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1172,19 +1180,19 @@ mod launch_testmanager { .await .unwrap(); assert_eq!( - 1, + 2, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.generate_blocks_with_delay(1).await; assert_eq!( - 2, + 3, u32::from(test_manager.local_net.get_chain_height().await) ); test_manager.close().await; } #[ignore = "chain cache needs development"] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn with_chain() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1207,7 +1215,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1234,7 +1242,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1262,8 +1270,8 @@ mod launch_testmanager { /// This test shows currently we do not receive mining rewards from Zebra unless we mine 100 blocks at a time. /// This is not the case with Zcashd and should not be the case here. /// Even if rewards need 100 confirmations these blocks should not have to be mined at the same time. 
- #[ignore = "Bug in zingolib 1.0 sync, reinstate on zinglib 2.0 upgrade."] - #[tokio::test] + // #[ignore = "Bug in zingolib 1.0 sync, reinstate on zingolib 2.0 upgrade."] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1311,8 +1319,8 @@ mod launch_testmanager { test_manager.close().await; } - #[ignore = "Bug in zingolib 1.0 sync, reinstate on zinglib 2.0 upgrade."] - #[tokio::test] + // #[ignore = "Bug in zingolib 1.0 sync, reinstate on zingolib 2.0 upgrade."] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward_and_send() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad, @@ -1412,7 +1420,7 @@ mod launch_testmanager { } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_testnet() { let mut test_manager = TestManager::launch_with_default_activation_heights( &ValidatorKind::Zebrad,