diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index 91eed2d96..1b906dfc8 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -63,8 +63,7 @@ mod chain_query_interface { use super::*; - async fn create_test_manager_and_chain_index( - validator: &ValidatorKind, + async fn create_test_manager_and_chain_index_zebrad( chain_cache: Option, enable_zaino: bool, zaino_no_sync: bool, @@ -77,7 +76,7 @@ mod chain_query_interface { NodeBackedChainIndexSubscriber, ) { let (test_manager, json_service) = create_test_manager_and_connector( - validator, + &ValidatorKind::Zebrad, chain_cache.clone(), enable_zaino, zaino_no_sync, @@ -149,7 +148,49 @@ mod chain_query_interface { db_version: 1, db_path, db_size: None, - network: zebra_chain::parameters::Network::new_regtest( + network: network.clone(), + no_sync: false, + no_db: false, + }; + let chain_index = NodeBackedChainIndex::new( + ValidatorConnector::State(chain_index::source::State { + read_state_service: state_service.read_state_service().clone(), + mempool_fetcher: json_service, + }), + config, + ) + .await + .unwrap(); + let index_reader = chain_index.subscriber(); + tokio::time::sleep(Duration::from_secs(3)).await; + + (test_manager, state_service, chain_index, index_reader) + } + + async fn create_test_manager_and_chain_index_zcashd( + chain_cache: Option, + enable_zaino: bool, + zaino_no_sync: bool, + zaino_no_db: bool, + enable_clients: bool, + ) -> ( + TestManager, + JsonRpSeeConnector, + NodeBackedChainIndex, + NodeBackedChainIndexSubscriber, + ) { + let (test_manager, json_service) = create_test_manager_and_connector( + &ValidatorKind::Zcashd, + chain_cache.clone(), + enable_zaino, + zaino_no_sync, + zaino_no_db, + enable_clients, + ) + .await; + + let network = match test_manager.network.to_string().as_str() { + "Regtest" => zebra_chain::parameters::Network::new_regtest( 
zebra_chain::parameters::testnet::ConfiguredActivationHeights { before_overwinter: Some(1), overwinter: Some(1), @@ -159,41 +200,41 @@ mod chain_query_interface { canopy: Some(1), nu5: Some(1), nu6: Some(1), - // see https://zips.z.cash/#nu6-1-candidate-zips for info on NU6.1 + // TODO: What is network upgrade 6.1? What does a minor version NU mean? nu6_1: None, nu7: None, }, ), + "Testnet" => zebra_chain::parameters::Network::new_default_testnet(), + "Mainnet" => zebra_chain::parameters::Network::Mainnet, + _ => panic!("Incorrect network type found."), + }; + let temp_dir: TempDir = tempfile::tempdir().unwrap(); + let db_path: PathBuf = temp_dir.path().to_path_buf(); + let config = BlockCacheConfig { + map_capacity: None, + map_shard_amount: None, + db_version: 1, + db_path, + db_size: None, + network: network.clone(), no_sync: false, no_db: false, }; - let chain_index = NodeBackedChainIndex::new( - ValidatorConnector::State(chain_index::source::State { - read_state_service: state_service.read_state_service().clone(), - mempool_fetcher: json_service, - }), - config, - ) - .await - .unwrap(); - let index_reader = chain_index.subscriber().await; + let chain_index = + NodeBackedChainIndex::new(ValidatorConnector::Fetch(json_service.clone()), config) + .await + .unwrap(); + let index_reader = chain_index.subscriber(); tokio::time::sleep(Duration::from_secs(3)).await; - (test_manager, state_service, chain_index, index_reader) + (test_manager, json_service, chain_index, index_reader) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn get_block_range() { - let (test_manager, _json_service, _chain_index, indexer) = - create_test_manager_and_chain_index( - &ValidatorKind::Zebrad, - None, - true, - false, - false, - true, - ) - .await; + async fn get_block_range_zebrad() { + let (test_manager, _state_service, _chain_index, indexer) = + create_test_manager_and_chain_index_zebrad(None, true, false, false, true).await; // this delay had to increase. 
Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; @@ -224,17 +265,42 @@ mod chain_query_interface { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn find_fork_point() { + async fn get_block_range_zcashd() { let (test_manager, _json_service, _chain_index, indexer) = - create_test_manager_and_chain_index( - &ValidatorKind::Zebrad, - None, - true, - false, - false, - true, - ) - .await; + create_test_manager_and_chain_index_zcashd(None, true, false, false, true).await; + + // this delay had to increase. Maybe we tweak sync loop rerun time? + test_manager.generate_blocks_with_delay(5).await; + let snapshot = indexer.snapshot_nonfinalized_state(); + assert_eq!(snapshot.as_ref().blocks.len(), 7); + let range = indexer + .get_block_range(&snapshot, Height::try_from(0).unwrap(), None) + .unwrap() + .try_collect::>() + .await + .unwrap(); + for block in range { + let block = block + .zcash_deserialize_into::() + .unwrap(); + assert_eq!( + block.hash().0, + snapshot + .heights_to_hashes + .get( + &chain_index::types::Height::try_from(block.coinbase_height().unwrap()) + .unwrap() + ) + .unwrap() + .0 + ); + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn find_fork_point_zebrad() { + let (test_manager, _state_service, _chain_index, indexer) = + create_test_manager_and_chain_index_zebrad(None, true, false, false, true).await; // this delay had to increase. Maybe we tweak sync loop rerun time? 
test_manager.generate_blocks_with_delay(5).await; @@ -255,17 +321,32 @@ mod chain_query_interface { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn get_raw_transaction() { + async fn find_fork_point_zcashd() { let (test_manager, _json_service, _chain_index, indexer) = - create_test_manager_and_chain_index( - &ValidatorKind::Zebrad, - None, - true, - false, - false, - true, + create_test_manager_and_chain_index_zcashd(None, true, false, false, true).await; + + // this delay had to increase. Maybe we tweak sync loop rerun time? + test_manager.generate_blocks_with_delay(5).await; + let snapshot = indexer.snapshot_nonfinalized_state(); + assert_eq!(snapshot.as_ref().blocks.len(), 7); + for block_hash in snapshot.heights_to_hashes.values() { + // As all blocks are currently on the main chain, + // this should be the block provided + assert_eq!( + block_hash, + &indexer + .find_fork_point(&snapshot, block_hash) + .unwrap() + .unwrap() + .0 ) - .await; + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn get_raw_transaction_zebrad() { + let (test_manager, _state_service, _chain_index, indexer) = + create_test_manager_and_chain_index_zebrad(None, true, false, false, true).await; // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; @@ -292,17 +373,38 @@ mod chain_query_interface { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn get_transaction_status() { + async fn get_raw_transaction_zcashd() { let (test_manager, _json_service, _chain_index, indexer) = - create_test_manager_and_chain_index( - &ValidatorKind::Zebrad, - None, - true, - false, - false, - true, - ) - .await; + create_test_manager_and_chain_index_zcashd(None, true, false, false, true).await; + + // this delay had to increase. 
+ test_manager.generate_blocks_with_delay(5).await; + let snapshot = indexer.snapshot_nonfinalized_state(); + assert_eq!(snapshot.as_ref().blocks.len(), 7); + for txid in snapshot + .blocks + .values() + .flat_map(|block| block.transactions().iter().map(|txdata| txdata.txid().0)) + { + let raw_transaction = indexer + .get_raw_transaction(&snapshot, &TransactionHash(txid)) + .await + .unwrap() + .unwrap(); + let zebra_txn = + zebra_chain::transaction::Transaction::zcash_deserialize(&raw_transaction[..]) + .unwrap(); + + let correct_txid = zebra_txn.hash().0; + + assert_eq!(txid, correct_txid); + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn get_transaction_status_zebrad() { + let (test_manager, _state_service, _chain_index, indexer) = + create_test_manager_and_chain_index_zebrad(None, true, false, false, true).await; let snapshot = indexer.snapshot_nonfinalized_state(); // I don't know where this second block is generated. Somewhere in the // guts of create_test_manager_and_chain_index @@ -329,17 +431,38 @@ mod chain_query_interface { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn sync_large_chain() { + async fn get_transaction_status_zcashd() { + let (test_manager, _json_service, _chain_index, indexer) = + create_test_manager_and_chain_index_zcashd(None, true, false, false, true).await; + let snapshot = indexer.snapshot_nonfinalized_state(); + // I don't know where this second block is generated. Somewhere in the + // guts of create_test_manager_and_chain_index + assert_eq!(snapshot.as_ref().blocks.len(), 2); + + // this delay had to increase. Maybe we tweak sync loop rerun time? 
+ test_manager.generate_blocks_with_delay(5).await; + let snapshot = indexer.snapshot_nonfinalized_state(); + assert_eq!(snapshot.as_ref().blocks.len(), 7); + for (txid, height, block_hash) in snapshot.blocks.values().flat_map(|block| { + block + .transactions() + .iter() + .map(|txdata| (txdata.txid().0, block.height(), block.hash())) + }) { + let (transaction_status_blocks, _transaction_mempool_status) = indexer + .get_transaction_status(&snapshot, &TransactionHash(txid)) + .await + .unwrap(); + assert_eq!(1, transaction_status_blocks.len()); + assert_eq!(transaction_status_blocks.keys().next().unwrap(), block_hash); + assert_eq!(transaction_status_blocks.values().next().unwrap(), &height) + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn sync_large_chain_zebrad() { let (test_manager, state_service, _chain_index, indexer) = - create_test_manager_and_chain_index( - &ValidatorKind::Zebrad, - None, - true, - false, - false, - true, - ) - .await; + create_test_manager_and_chain_index_zebrad(None, true, false, false, true).await; // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; @@ -388,4 +511,55 @@ mod chain_query_interface { .unwrap(); } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn sync_large_chain_zcashd() { + let (test_manager, json_service, _chain_index, indexer) = + create_test_manager_and_chain_index_zcashd(None, true, false, false, true).await; + + // this delay had to increase. Maybe we tweak sync loop rerun time? 
+ test_manager.generate_blocks_with_delay(5).await; + { + let chain_height = json_service.get_blockchain_info().await.unwrap().blocks.0; + let indexer_height = indexer.snapshot_nonfinalized_state().best_tip.0; + assert_eq!(Height::try_from(chain_height).unwrap(), indexer_height); + } + + test_manager.generate_blocks_with_delay(150).await; + + tokio::time::sleep(std::time::Duration::from_millis(5000)).await; + + let snapshot = indexer.snapshot_nonfinalized_state(); + let chain_height = json_service.get_blockchain_info().await.unwrap().blocks.0; + let indexer_height = snapshot.best_tip.0; + assert_eq!(Height::try_from(chain_height).unwrap(), indexer_height); + + let finalised_start = Height::try_from(chain_height - 150).unwrap(); + let finalised_tip = Height::try_from(chain_height - 100).unwrap(); + let end = Height::try_from(chain_height - 50).unwrap(); + + let finalized_blocks = indexer + .get_block_range(&snapshot, finalised_start, Some(finalised_tip)) + .unwrap() + .try_collect::>() + .await + .unwrap(); + for block in finalized_blocks { + block + .zcash_deserialize_into::() + .unwrap(); + } + + let non_finalised_blocks = indexer + .get_block_range(&snapshot, finalised_tip, Some(end)) + .unwrap() + .try_collect::>() + .await + .unwrap(); + for block in non_finalised_blocks { + block + .zcash_deserialize_into::() + .unwrap(); + } + } } diff --git a/integration-tests/tests/fetch_service.rs b/integration-tests/tests/fetch_service.rs index e50d309f0..71290fc21 100644 --- a/integration-tests/tests/fetch_service.rs +++ b/integration-tests/tests/fetch_service.rs @@ -1355,12 +1355,12 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn regtest_no_cache() { launch_fetch_service(&ValidatorKind::Zcashd, None).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "We no longer use chain caches. 
See zcashd::launch::regtest_no_cache."] pub(crate) async fn regtest_with_cache() { launch_fetch_service( @@ -1375,7 +1375,7 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn validate_address() { fetch_service_validate_address(&ValidatorKind::Zcashd).await; } @@ -1385,27 +1385,27 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_balance() { fetch_service_get_address_balance(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_raw() { fetch_service_get_block_raw(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_object() { fetch_service_get_block_object(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_mempool() { fetch_service_get_raw_mempool(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_info() { test_get_mempool_info(&ValidatorKind::Zcashd).await; } @@ -1414,128 +1414,128 @@ mod zcashd { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn get_treestate() { fetch_service_z_get_treestate(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtrees_by_index() { fetch_service_z_get_subtrees_by_index(&ValidatorKind::Zcashd).await; } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_transaction() { fetch_service_get_raw_transaction(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn 
address_tx_ids() { fetch_service_get_address_tx_ids(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_utxos() { fetch_service_get_address_utxos(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_block() { fetch_service_get_latest_block(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block() { fetch_service_get_block(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn difficulty() { assert_fetch_service_difficulty_matches_rpc(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn best_blockhash() { fetch_service_get_best_blockhash(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_count() { fetch_service_get_block_count(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_nullifiers() { fetch_service_get_block_nullifiers(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_range() { fetch_service_get_block_range(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_range_nullifiers() { fetch_service_get_block_range_nullifiers(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mined() { fetch_service_get_transaction_mined(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 
2)] pub(crate) async fn transaction_mempool() { fetch_service_get_transaction_mempool(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_txids() { fetch_service_get_taddress_txids(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_balance() { fetch_service_get_taddress_balance(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_tx() { fetch_service_get_mempool_tx(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_stream() { fetch_service_get_mempool_stream(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn tree_state() { fetch_service_get_tree_state(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_tree_state() { fetch_service_get_latest_tree_state(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtree_roots() { fetch_service_get_subtree_roots(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos() { fetch_service_get_taddress_utxos(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos_stream() { fetch_service_get_taddress_utxos_stream(&ValidatorKind::Zcashd).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn lightd_info() { fetch_service_get_lightd_info(&ValidatorKind::Zcashd).await; } @@ -1550,12 +1550,12 @@ mod zebrad { use 
super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn regtest_no_cache() { launch_fetch_service(&ValidatorKind::Zebrad, None).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore = "We no longer use chain caches. See zebrad::launch::regtest_no_cache."] pub(crate) async fn regtest_with_cache() { launch_fetch_service( @@ -1570,7 +1570,7 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn validate_address() { fetch_service_validate_address(&ValidatorKind::Zebrad).await; } @@ -1580,27 +1580,27 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_balance() { fetch_service_get_address_balance(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_raw() { fetch_service_get_block_raw(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_object() { fetch_service_get_block_object(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_mempool() { fetch_service_get_raw_mempool(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_info() { test_get_mempool_info(&ValidatorKind::Zebrad).await; } @@ -1609,128 +1609,128 @@ mod zebrad { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn treestate() { fetch_service_z_get_treestate(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtrees_by_index() { 
fetch_service_z_get_subtrees_by_index(&ValidatorKind::Zebrad).await; } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn raw_transaction() { fetch_service_get_raw_transaction(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_tx_ids() { fetch_service_get_address_tx_ids(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn address_utxos() { fetch_service_get_address_utxos(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_block() { fetch_service_get_latest_block(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block() { fetch_service_get_block(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn difficulty() { assert_fetch_service_difficulty_matches_rpc(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn best_blockhash() { fetch_service_get_best_blockhash(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_count() { fetch_service_get_block_count(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_nullifiers() { fetch_service_get_block_nullifiers(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn block_range() { fetch_service_get_block_range(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn 
block_range_nullifiers() { fetch_service_get_block_range_nullifiers(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mined() { fetch_service_get_transaction_mined(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn transaction_mempool() { fetch_service_get_transaction_mempool(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_txids() { fetch_service_get_taddress_txids(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_balance() { fetch_service_get_taddress_balance(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_tx() { fetch_service_get_mempool_tx(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn mempool_stream() { fetch_service_get_mempool_stream(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn tree_state() { fetch_service_get_tree_state(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn latest_tree_state() { fetch_service_get_latest_tree_state(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn subtree_roots() { fetch_service_get_subtree_roots(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos() { fetch_service_get_taddress_utxos(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = 
"multi_thread", worker_threads = 2)] pub(crate) async fn taddress_utxos_stream() { fetch_service_get_taddress_utxos_stream(&ValidatorKind::Zebrad).await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn lightd_info() { fetch_service_get_lightd_info(&ValidatorKind::Zebrad).await; } diff --git a/zaino-state/src/backends/fetch.rs b/zaino-state/src/backends/fetch.rs index c0b1cea6e..8fcf66be3 100644 --- a/zaino-state/src/backends/fetch.rs +++ b/zaino-state/src/backends/fetch.rs @@ -2,13 +2,15 @@ use futures::StreamExt; use hex::FromHex; -use std::time; +use std::{io::Cursor, time}; use tokio::{sync::mpsc, time::timeout}; use tonic::async_trait; use tracing::{info, warn}; use zebra_state::HashOrHeight; -use zebra_chain::{block::Height, subtree::NoteCommitmentSubtreeIndex}; +use zebra_chain::{ + block::Height, serialization::ZcashDeserialize as _, subtree::NoteCommitmentSubtreeIndex, +}; use zebra_rpc::{ client::{GetSubtreesByIndexResponse, GetTreestateResponse, ValidateAddressResponse}, methods::{ @@ -39,6 +41,7 @@ use crate::{ chain_index::{ mempool::{Mempool, MempoolSubscriber}, source::ValidatorConnector, + types, }, config::FetchServiceConfig, error::{BlockCacheError, FetchServiceError}, @@ -51,7 +54,10 @@ use crate::{ AddressStream, CompactBlockStream, CompactTransactionStream, RawTransactionStream, UtxoReplyStream, }, - utils::{blockid_to_hashorheight, get_build_info, ServiceMetadata}, + utils::{ + blockid_to_hashorheight, compact_block_to_nullifiers, get_build_info, ServiceMetadata, + }, + ChainIndex as _, NodeBackedChainIndex, NodeBackedChainIndexSubscriber, }; /// Chain fetch service backed by Zcashd's JsonRPC engine. @@ -70,6 +76,8 @@ pub struct FetchService { block_cache: BlockCache, /// Internal mempool. mempool: Mempool, + /// Core indexer. + indexer: NodeBackedChainIndex, /// Service metadata. data: ServiceMetadata, /// StateService config data. 
@@ -102,14 +110,17 @@ impl ZcashService for FetchService { ); info!("Using Zcash build: {}", data); + let source = ValidatorConnector::Fetch(fetcher.clone()); + let indexer = NodeBackedChainIndex::new(source, config.clone().into()) + .await + .unwrap(); + let block_cache = BlockCache::spawn(&fetcher, None, config.clone().into()) .await .map_err(|e| { FetchServiceError::BlockCacheError(BlockCacheError::Custom(e.to_string())) })?; - let mempool_source = ValidatorConnector::Fetch(fetcher.clone()); - let mempool = Mempool::spawn(mempool_source, None).await.map_err(|e| { FetchServiceError::BlockCacheError(BlockCacheError::Custom(e.to_string())) })?; @@ -118,6 +129,7 @@ impl ZcashService for FetchService { fetcher, block_cache, mempool, + indexer, data, config, }; @@ -135,6 +147,7 @@ impl ZcashService for FetchService { fetcher: self.fetcher.clone(), block_cache: self.block_cache.subscriber(), mempool: self.mempool.subscriber(), + indexer: self.indexer.subscriber(), data: self.data.clone(), config: self.config.clone(), }) @@ -172,6 +185,8 @@ pub struct FetchServiceSubscriber { pub block_cache: BlockCacheSubscriber, /// Internal mempool. pub mempool: MempoolSubscriber, + /// Core indexer. + indexer: NodeBackedChainIndexSubscriber, /// Service metadata. pub data: ServiceMetadata, /// StateService config data. @@ -181,10 +196,7 @@ pub struct FetchServiceSubscriber { impl FetchServiceSubscriber { /// Fetches the current status pub fn status(&self) -> StatusType { - let mempool_status = self.mempool.status(); - let block_cache_status = self.block_cache.status(); - - mempool_status.combine(block_cache_status) + self.indexer.status() } } @@ -242,7 +254,7 @@ impl ZcashIndexer for FetchServiceSubscriber { /// /// Zebra does not support this RPC call directly. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await?) 
+ Ok(self.indexer.get_mempool_info().await?.into()) } /// Returns the proof-of-work difficulty as a multiple of the minimum difficulty. @@ -410,11 +422,11 @@ impl ZcashIndexer for FetchServiceSubscriber { async fn get_raw_mempool(&self) -> Result, Self::Error> { // Ok(self.fetcher.get_raw_mempool().await?.transactions) Ok(self - .mempool - .get_mempool() + .indexer + .get_mempool_txids() .await .into_iter() - .map(|(key, _)| key.0) + .map(|mempool_txids| mempool_txids.iter().map(|txid| txid.to_string()).collect()) .collect()) } @@ -508,7 +520,9 @@ impl ZcashIndexer for FetchServiceSubscriber { } async fn chain_height(&self) -> Result { - Ok(self.block_cache.get_chain_height().await?) + Ok(Height( + self.indexer.snapshot_nonfinalized_state().best_tip.0 .0, + )) } /// Returns the transaction ids made by the provided transparent addresses. /// @@ -577,17 +591,12 @@ impl ZcashIndexer for FetchServiceSubscriber { impl LightWalletIndexer for FetchServiceSubscriber { /// Return the height of the tip of the best chain async fn get_latest_block(&self) -> Result { - let latest_height = self.block_cache.get_chain_height().await?; - let mut latest_hash = self - .block_cache - .get_compact_block(latest_height.0.to_string()) - .await? - .hash; - latest_hash.reverse(); + let tip = self.indexer.snapshot_nonfinalized_state().best_tip; + dbg!(&tip); Ok(BlockId { - height: latest_height.0 as u64, - hash: latest_hash, + height: tip.0 .0 as u64, + hash: tip.1 .0.to_vec(), }) } @@ -598,14 +607,57 @@ impl LightWalletIndexer for FetchServiceSubscriber { "Error: Invalid hash and/or height out of range. 
Failed to convert to u32.", )), )?; + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let height = match hash_or_height { + HashOrHeight::Height(height) => height.0, + HashOrHeight::Hash(hash) => { + match self.indexer.get_block_height(&snapshot, hash.into()).await { + Ok(Some(height)) => height.0, + Ok(None) => { + return Err(FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Hash not found in chain", + ))); + } + Err(_e) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::internal("Error: Internal db error."), + )); + } + } + } + }; match self - .block_cache - .get_compact_block(hash_or_height.to_string()) + .indexer + .get_compact_block(&snapshot, types::Height(height)) .await { - Ok(block) => Ok(block), + Ok(Some(block)) => Ok(block), + Ok(None) => { + let chain_height = snapshot.best_tip.0 .0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. 
Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - let chain_height = self.block_cache.get_chain_height().await?.0; + let chain_height = snapshot.best_tip.0 .0; match hash_or_height { HashOrHeight::Height(Height(height)) if height >= chain_height => Err( FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( @@ -639,35 +691,86 @@ impl LightWalletIndexer for FetchServiceSubscriber { /// /// NOTE: Currently this only returns Orchard nullifiers to follow Lightwalletd functionality but Sapling could be added if required by wallets. async fn get_block_nullifiers(&self, request: BlockId) -> Result { - let height: u32 = match request.height.try_into() { - Ok(height) => height, - Err(_) => { - return Err(FetchServiceError::TonicStatusError( - tonic::Status::invalid_argument( - "Error: Height out of range. Failed to convert to u32.", - ), - )); + let hash_or_height = blockid_to_hashorheight(request).ok_or( + FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. Failed to convert to u32.", + )), + )?; + let snapshot = self.indexer.snapshot_nonfinalized_state(); + let height = match hash_or_height { + HashOrHeight::Height(height) => height.0, + HashOrHeight::Hash(hash) => { + match self.indexer.get_block_height(&snapshot, hash.into()).await { + Ok(Some(height)) => height.0, + Ok(None) => { + return Err(FetchServiceError::TonicStatusError(tonic::Status::invalid_argument( + "Error: Invalid hash and/or height out of range. 
Hash not found in chain", + ))); + } + Err(_e) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::internal("Error: Internal db error."), + )); + } + } } }; match self - .block_cache - .get_compact_block_nullifiers(height.to_string()) + .indexer + .get_compact_block(&snapshot, types::Height(height)) .await { - Ok(block) => Ok(block), + Ok(Some(block)) => Ok(compact_block_to_nullifiers(block)), + Ok(None) => { + let chain_height = snapshot.best_tip.0 .0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + "Error: Failed to retrieve block from state.", + ))), + } + } Err(e) => { - let chain_height = self.block_cache.get_chain_height().await?.0; - if height >= chain_height { - Err(FetchServiceError::TonicStatusError(tonic::Status::out_of_range( - format!( - "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", - ) - ))) - } else { + let chain_height = snapshot.best_tip.0 .0; + match hash_or_height { + HashOrHeight::Height(Height(height)) if height >= chain_height => Err( + FetchServiceError::TonicStatusError(tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. 
Height requested \ + is greater than the best chain tip [{chain_height}].", + ))), + ), + HashOrHeight::Height(height) + if height > self.data.network().sapling_activation_height() => + { + Err(FetchServiceError::TonicStatusError( + tonic::Status::out_of_range(format!( + "Error: Height out of range [{hash_or_height}]. Height requested \ + is below sapling activation height [{chain_height}].", + )), + )) + } + _otherwise => // TODO: Hide server error from clients before release. Currently useful for dev purposes. - Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( - format!("Error: Failed to retrieve block from node. Server Error: {e}",), - ))) + { + Err(FetchServiceError::TonicStatusError(tonic::Status::unknown( + format!("Error: Failed to retrieve block from node. Server Error: {e}",), + ))) + } } } } @@ -678,6 +781,7 @@ impl LightWalletIndexer for FetchServiceSubscriber { &self, request: BlockRange, ) -> Result { + dbg!(&request); let mut start: u32 = match request.start { Some(block_id) => match block_id.height.try_into() { Ok(height) => height, @@ -718,26 +822,49 @@ impl LightWalletIndexer for FetchServiceSubscriber { } else { false }; - let chain_height = self.block_cache.get_chain_height().await?.0; let fetch_service_clone = self.clone(); let service_timeout = self.config.service_timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service_channel_size as usize); tokio::spawn(async move { let timeout = timeout(time::Duration::from_secs((service_timeout*4) as u64), async { + let snapshot = fetch_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_tip.0.0; for height in start..=end { let height = if rev_order { end - (height - start) } else { height }; - match fetch_service_clone.block_cache.get_compact_block( - height.to_string(), + match fetch_service_clone.indexer.get_compact_block( + &snapshot, + types::Height(height), ).await { - Ok(block) => { + Ok(Some(block)) => { if 
channel_tx.send(Ok(block)).await.is_err() { break; } } + Ok(None) => if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; + } + } + } else if channel_tx + .send(Err(tonic::Status::unknown("Internal error, Failed to fetch block."))) + .await + .is_err() + { + break; + } Err(e) => { if height >= chain_height { match channel_tx @@ -790,78 +917,128 @@ impl LightWalletIndexer for FetchServiceSubscriber { &self, request: BlockRange, ) -> Result { - let tonic_status_error = - |err| FetchServiceError::TonicStatusError(tonic::Status::invalid_argument(err)); - let mut start = match request.start { - Some(block_id) => match u32::try_from(block_id.height) { - Ok(height) => Ok(height), - Err(_) => Err("Error: Start height out of range. Failed to convert to u32."), + let mut start: u32 = match request.start { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: Start height out of range. Failed to convert to u32.", + ), + )); + } }, - None => Err("Error: No start height given."), - } - .map_err(tonic_status_error)?; - let mut end = match request.end { - Some(block_id) => match u32::try_from(block_id.height) { - Ok(height) => Ok(height), - Err(_) => Err("Error: End height out of range. 
Failed to convert to u32."), + None => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; + let mut end: u32 = match request.end { + Some(block_id) => match block_id.height.try_into() { + Ok(height) => height, + Err(_) => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument( + "Error: End height out of range. Failed to convert to u32.", + ), + )); + } }, - None => Err("Error: No start height given."), - } - .map_err(tonic_status_error)?; + None => { + return Err(FetchServiceError::TonicStatusError( + tonic::Status::invalid_argument("Error: No start height given."), + )); + } + }; let rev_order = if start > end { (start, end) = (end, start); true } else { false }; - let chain_height = self.block_cache.get_chain_height().await?.0; let fetch_service_clone = self.clone(); let service_timeout = self.config.service_timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service_channel_size as usize); tokio::spawn(async move { - let timeout = timeout( - time::Duration::from_secs((service_timeout * 4) as u64), - async { + let timeout = timeout(time::Duration::from_secs((service_timeout*4) as u64), async { + let snapshot = fetch_service_clone.indexer.snapshot_nonfinalized_state(); + let chain_height = snapshot.best_tip.0.0; for height in start..=end { let height = if rev_order { end - (height - start) } else { height }; - if let Err(e) = channel_tx - .send( - fetch_service_clone - .block_cache - .get_compact_block_nullifiers(height.to_string()) + match fetch_service_clone.indexer.get_compact_block( + &snapshot, + types::Height(height), + ).await { + Ok(Some(block)) => { + if channel_tx.send(Ok(compact_block_to_nullifiers(block))).await.is_err() { + break; + } + } + Ok(None) => if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. 
Height requested is greater than the best chain tip [{chain_height}].", + )))) .await - .map_err(|e| { - if height >= chain_height { - tonic::Status::out_of_range(format!( - "Error: Height out of range [{height}]. Height requested \ - is greater than the best chain tip [{chain_height}].", - )) - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - tonic::Status::unknown(e.to_string()) + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; } - }), - ) - .await - { - warn!("GetBlockRangeNullifiers channel closed unexpectedly: {}", e); - break; + } + } else if channel_tx + .send(Err(tonic::Status::unknown("Internal error, Failed to fetch block."))) + .await + .is_err() + { + break; + } + Err(e) => { + if height >= chain_height { + match channel_tx + .send(Err(tonic::Status::out_of_range(format!( + "Error: Height out of range [{height}]. Height requested is greater than the best chain tip [{chain_height}].", + )))) + .await + + { + Ok(_) => break, + Err(e) => { + warn!("GetBlockRange channel closed unexpectedly: {}", e); + break; + } + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. 
+ if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } + } + } } } - }, - ) - .await; - if timeout.is_err() { - channel_tx - .send(Err(tonic::Status::deadline_exceeded( - "Error: get_block_range_nullifiers gRPC request timed out.", - ))) - .await - .ok(); + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::deadline_exceeded( + "Error: get_block_range gRPC request timed out.", + ))) + .await + .ok(); + } } }); Ok(CompactBlockStream::new(channel_rx)) @@ -885,7 +1062,7 @@ impl LightWalletIndexer for FetchServiceSubscriber { let height: u64 = match height { Some(h) => h as u64, // Zebra returns None for mempool transactions, convert to `Mempool Height`. - None => self.block_cache.get_chain_height().await?.0 as u64, + None => self.indexer.snapshot_nonfinalized_state().best_tip.0 .0 as u64, }; Ok(RawTransaction { @@ -1092,72 +1269,75 @@ impl LightWalletIndexer for FetchServiceSubscriber { }) .collect(); - let mempool = self.mempool.clone(); + let mempool = self.indexer.clone(); let service_timeout = self.config.service_timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service_channel_size as usize); + tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 4) as u64), async { - for (txid, serialized_transaction) in - mempool.get_filtered_mempool(exclude_txids).await - { - let txid_bytes = match hex::decode(txid.0) { - Ok(bytes) => bytes, - Err(error) => { - if channel_tx - .send(Err(tonic::Status::unknown(error.to_string()))) - .await - .is_err() - { - break; - } else { - continue; - } - } - }; - match ::parse_from_slice( - serialized_transaction.0.as_ref().as_ref(), - Some(vec![txid_bytes]), - None, - ) { - Ok(transaction) => { - // ParseFromSlice returns any data left after the conversion to a - // FullTransaction, If the conversion has succeeded this should be empty. 
- if transaction.0.is_empty() { - if channel_tx - .send( - transaction - .1 - .to_compact(0) - .map_err(|e| tonic::Status::unknown(e.to_string())), - ) - .await - .is_err() - { - break; + match mempool.get_mempool_transactions(exclude_txids).await { + Ok(transactions) => { + for serialized_transaction_bytes in transactions { + // TODO: This implementation should be cleaned up to not use parse_from_slice. + // This could be done by implementing try_from zebra_chain::transaction::Transaction for CompactTxData, + // (which implements to_compact())letting us avoid double parsing of transaction bytes. + let transaction: zebra_chain::transaction::Transaction = + zebra_chain::transaction::Transaction::zcash_deserialize( + &mut Cursor::new(&serialized_transaction_bytes), + ) + .unwrap(); + // TODO: Check this is in the correct format and does not need hex decoding or reversing. + let txid = transaction.hash().0.to_vec(); + + match ::parse_from_slice( + &serialized_transaction_bytes, + Some(vec![txid]), + None, + ) { + Ok(transaction) => { + // ParseFromSlice returns any data left after the conversion to a + // FullTransaction, If the conversion has succeeded this should be empty. + if transaction.0.is_empty() { + if channel_tx + .send(transaction.1.to_compact(0).map_err(|e| { + tonic::Status::unknown(e.to_string()) + })) + .await + .is_err() + { + break; + } + } else { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + if channel_tx + .send(Err(tonic::Status::unknown("Error: "))) + .await + .is_err() + { + break; + } + } } - } else { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown("Error: "))) - .await - .is_err() - { - break; + Err(e) => { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. 
+ if channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .is_err() + { + break; + } } } } - Err(e) => { - // TODO: Hide server error from clients before release. Currently useful for dev purposes. - if channel_tx - .send(Err(tonic::Status::unknown(e.to_string()))) - .await - .is_err() - { - break; - } - } + } + Err(e) => { + channel_tx + .send(Err(tonic::Status::unknown(e.to_string()))) + .await + .ok(); } } }, @@ -1182,53 +1362,50 @@ impl LightWalletIndexer for FetchServiceSubscriber { /// Return a stream of current Mempool transactions. This will keep the output stream open while /// there are mempool transactions. It will close the returned stream when a new block is mined. async fn get_mempool_stream(&self) -> Result { - let mut mempool = self.mempool.clone(); + let indexer = self.indexer.clone(); let service_timeout = self.config.service_timeout; let (channel_tx, channel_rx) = mpsc::channel(self.config.service_channel_size as usize); - let mempool_height = self.block_cache.get_chain_height().await?.0; tokio::spawn(async move { let timeout = timeout( time::Duration::from_secs((service_timeout * 6) as u64), async { - let (mut mempool_stream, _mempool_handle) = match mempool - .get_mempool_stream(None) - .await - { - Ok(stream) => stream, - Err(e) => { - warn!("Error fetching stream from mempool: {:?}", e); + let mempool_height = indexer.snapshot_nonfinalized_state().best_tip.0 .0; + match indexer.get_mempool_stream(None) { + Some(mut mempool_stream) => { + while let Some(result) = mempool_stream.next().await { + match result { + Ok(transaction_bytes) => { + if channel_tx + .send(Ok(RawTransaction { + data: transaction_bytes, + height: mempool_height as u64, + })) + .await + .is_err() + { + break; + } + } + Err(e) => { + channel_tx + .send(Err(tonic::Status::internal(format!( + "Error in mempool stream: {e:?}" + )))) + .await + .ok(); + break; + } + } + } + } + None => { + warn!("Error fetching stream from mempool, Incorrect chain tip!"); 
channel_tx .send(Err(tonic::Status::internal("Error getting mempool stream"))) .await .ok(); - return; } }; - while let Some(result) = mempool_stream.recv().await { - match result { - Ok((_mempool_key, mempool_value)) => { - if channel_tx - .send(Ok(RawTransaction { - data: mempool_value.0.as_ref().as_ref().to_vec(), - height: mempool_height as u64, - })) - .await - .is_err() - { - break; - } - } - Err(e) => { - channel_tx - .send(Err(tonic::Status::internal(format!( - "Error in mempool stream: {e:?}" - )))) - .await - .ok(); - break; - } - } - } }, ) .await; diff --git a/zaino-state/src/backends/state.rs b/zaino-state/src/backends/state.rs index 482148e2d..f662cf047 100644 --- a/zaino-state/src/backends/state.rs +++ b/zaino-state/src/backends/state.rs @@ -17,7 +17,8 @@ use crate::{ UtxoReplyStream, }, utils::{blockid_to_hashorheight, get_build_info, ServiceMetadata}, - BlockHash, MempoolKey, + BlockHash, + MempoolKey, // NodeBackedChainIndex, NodeBackedChainIndexSubscriber, }; use nonempty::NonEmpty; @@ -110,6 +111,8 @@ pub struct StateService { block_cache: BlockCache, /// Internal mempool. mempool: Mempool, + // /// Core indexer. + // indexer: NodeBackedChainIndex, /// Service metadata. data: ServiceMetadata, /// StateService config data. 
@@ -241,18 +244,24 @@ impl ZcashService for StateService { } } + // let source = ValidatorConnector::State(crate::State { + // read_state_service: read_state_service.clone(), + // mempool_fetcher: rpc_client.clone(), + // }); + // let indexer = NodeBackedChainIndex::new(source, config.clone().into()) + // .await + // .unwrap(); + let block_cache = BlockCache::spawn( &rpc_client, Some(&read_state_service), config.clone().into(), ) .await?; - let mempool_source = ValidatorConnector::State(crate::chain_index::source::State { read_state_service: read_state_service.clone(), mempool_fetcher: rpc_client.clone(), }); - let mempool = Mempool::spawn(mempool_source, None).await?; let state_service = Self { @@ -262,6 +271,7 @@ impl ZcashService for StateService { rpc_client: rpc_client.clone(), block_cache, mempool, + // indexer, data, config, status: AtomicStatus::new(StatusType::Spawning.into()), @@ -278,6 +288,7 @@ impl ZcashService for StateService { rpc_client: self.rpc_client.clone(), block_cache: self.block_cache.subscriber(), mempool: self.mempool.subscriber(), + // indexer: self.indexer.subscriber(), data: self.data.clone(), config: self.config.clone(), chain_tip_change: self.chain_tip_change.clone(), @@ -326,6 +337,8 @@ pub struct StateServiceSubscriber { pub block_cache: BlockCacheSubscriber, /// Internal mempool. pub mempool: MempoolSubscriber, + // /// Core indexer. + // indexer: NodeBackedChainIndexSubscriber, /// Service metadata. pub data: ServiceMetadata, /// StateService config data. @@ -1021,7 +1034,7 @@ impl ZcashIndexer for StateServiceSubscriber { /// `usage` is the total memory usage for the mempool, in bytes. /// the [optional `fullyNotified` field](), is only utilized for zcashd regtests, is deprecated, and is not included. async fn get_mempool_info(&self) -> Result { - Ok(self.mempool.get_mempool_info().await?) 
+ Ok(self.mempool.get_mempool_info().await.into()) } async fn z_get_address_balance( diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index e9404fda6..4bab123cc 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -12,10 +12,11 @@ //! - NOTE: Full transaction and block data is served from the backend finalizer. use crate::error::{ChainIndexError, ChainIndexErrorKind, FinalisedStateError}; -use crate::{AtomicStatus, StatusType, SyncError}; +use crate::{AtomicStatus, StatusType, SyncError, TransactionHash}; use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{FutureExt, Stream}; +use hex::FromHex as _; use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; use tokio_stream::StreamExt; @@ -183,6 +184,30 @@ pub trait ChainIndex { end: Option, ) -> Option, Self::Error>>>; + /// Returns the *compact* block for the given height. + /// + /// Returns None if the specified height + /// is greater than the snapshot's tip + /// + /// TODO: Add range fetch method or update this? + #[allow(clippy::type_complexity)] + fn get_compact_block( + &self, + nonfinalized_snapshot: &Self::Snapshot, + height: types::Height, + ) -> impl std::future::Future< + Output = Result, Self::Error>, + >; + + /// Returns Some(Height) for the given block hash *if* it is currently in the best chain. + /// + /// Returns None if the specified block is not in the best chain or is not found. + fn get_block_height( + &self, + nonfinalized_snapshot: &Self::Snapshot, + hash: types::BlockHash, + ) -> impl std::future::Future, Self::Error>>; + /// Finds the newest ancestor of the given block on the main /// chain, or the block itself if it is on the main chain. fn find_fork_point( @@ -218,6 +243,11 @@ pub trait ChainIndex { >, >; + /// Returns all txids currently in the mempool. 
+ fn get_mempool_txids( + &self, + ) -> impl std::future::Future, Self::Error>>; + /// Returns all transactions currently in the mempool, filtered by `exclude_list`. /// /// The `exclude_list` may contain shortened transaction ID hex prefixes (client-endian). @@ -229,12 +259,20 @@ pub trait ChainIndex { /// Returns a stream of mempool transactions, ending the stream when the chain tip block hash /// changes (a new block is mined or a reorg occurs). /// - /// If the chain tip has changed from the given spanshot returns None. + /// If a snapshot is given and the chain tip has changed from the given snapshot, returns None. #[allow(clippy::type_complexity)] fn get_mempool_stream( &self, - snapshot: &Self::Snapshot, + snapshot: Option<&Self::Snapshot>, ) -> Option, Self::Error>>>; + + /// Returns information about the mempool state: + /// - size: Current tx count + /// - bytes: Sum of all tx sizes + /// - usage: Total memory usage for the mempool + fn get_mempool_info( + &self, + ) -> impl std::future::Future>; } /// The combined index. Contains a view of the mempool, and the full @@ -360,6 +398,7 @@ pub trait ChainIndex { /// - Unified access to finalized and non-finalized blockchain state /// - Automatic synchronization between state layers /// - Snapshot-based consistency for queries +#[derive(Debug)] pub struct NodeBackedChainIndex { #[allow(dead_code)] mempool: std::sync::Arc>, @@ -406,7 +445,7 @@ impl NodeBackedChainIndex { /// Creates a [`NodeBackedChainIndexSubscriber`] from self, /// a clone-safe, drop-safe, read-only view onto the running indexer. - pub async fn subscriber(&self) -> NodeBackedChainIndexSubscriber { + pub fn subscriber(&self) -> NodeBackedChainIndexSubscriber { NodeBackedChainIndexSubscriber { mempool: self.mempool.subscriber(), non_finalized_state: self.non_finalized_state.clone(), @@ -501,7 +540,7 @@ impl NodeBackedChainIndex { /// Designed for concurrent efficiency. 
/// /// [`NodeBackedChainIndexSubscriber`] can safely be cloned and dropped freely. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct NodeBackedChainIndexSubscriber { mempool: mempool::MempoolSubscriber, non_finalized_state: std::sync::Arc>, @@ -642,6 +681,49 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result, Self::Error> { + if height <= nonfinalized_snapshot.best_tip.0 { + Ok(Some( + match nonfinalized_snapshot.get_chainblock_by_height(&height) { + Some(block) => block.to_compact_block(), + None => match self.finalized_state.get_compact_block(height).await { + Ok(block) => block, + Err(_e) => return Err(ChainIndexError::database_hole(height)), + }, + }, + )) + } else { + Ok(None) + } + } + + /// Returns Some(Height) for the given block hash *if* it is currently in the best chain. + /// + /// Returns None if the specified block is not in the best chain or is not found. + /// + /// Used for hash based block lookup (random access). + async fn get_block_height( + &self, + nonfinalized_snapshot: &Self::Snapshot, + hash: types::BlockHash, + ) -> Result, Self::Error> { + match nonfinalized_snapshot.blocks.get(&hash).cloned() { + Some(block) => Ok(block.index().height()), + None => match self.finalized_state.get_block_height(hash).await { + Ok(height) => Ok(height), + Err(_e) => Err(ChainIndexError::database_hole(hash)), + }, + } + } + /// Finds the newest ancestor of the given block on the main /// chain, or the block itself if it is on the main chain. fn find_fork_point( @@ -738,6 +820,18 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result, Self::Error> { + self.mempool + .get_mempool() + .await + .into_iter() + .map(|(txid_key, _)| { + TransactionHash::from_hex(&txid_key.0).map_err(ChainIndexError::backing_validator) + }) + .collect::>() + } + /// Returns all transactions currently in the mempool, filtered by `exclude_list`. /// /// The `exclude_list` may contain shortened transaction ID hex prefixes (client-endian). 
@@ -749,13 +843,9 @@ impl ChainIndex for NodeBackedChainIndexSubscriber, ) -> Result>, Self::Error> { - let subscriber = self.mempool.clone(); - - // Use the mempool's own filtering (it already handles client-endian shortened prefixes). let pairs: Vec<(mempool::MempoolKey, mempool::MempoolValue)> = - subscriber.get_filtered_mempool(exclude_list).await; + self.mempool.get_filtered_mempool(exclude_list).await; - // Transform to the Vec> that the trait requires. let bytes: Vec> = pairs .into_iter() .map(|(_, v)| v.0.as_ref().as_ref().to_vec()) @@ -770,13 +860,13 @@ impl ChainIndex for NodeBackedChainIndexSubscriber, ) -> Option, Self::Error>>> { - let expected_chain_tip = snapshot.best_tip.1; + let expected_chain_tip = snapshot.map(|snapshot| snapshot.best_tip.1); let mut subscriber = self.mempool.clone(); match subscriber - .get_mempool_stream(Some(expected_chain_tip)) + .get_mempool_stream(expected_chain_tip) .now_or_never() { Some(Ok((in_rx, _handle))) => { @@ -823,6 +913,14 @@ impl ChainIndex for NodeBackedChainIndexSubscriber Result { + Ok(self.mempool.get_mempool_info().await) + } } /// A snapshot of the non-finalized state, for consistent queries diff --git a/zaino-state/src/chain_index/finalised_state.rs b/zaino-state/src/chain_index/finalised_state.rs index fd44b7096..015a1e9cf 100644 --- a/zaino-state/src/chain_index/finalised_state.rs +++ b/zaino-state/src/chain_index/finalised_state.rs @@ -30,6 +30,7 @@ use tokio::time::{interval, MissedTickBehavior}; use super::source::BlockchainSource; +#[derive(Debug)] pub(crate) struct ZainoDB { db: Arc, cfg: BlockCacheConfig, diff --git a/zaino-state/src/chain_index/finalised_state/db.rs b/zaino-state/src/chain_index/finalised_state/db.rs index d6a7130e0..7b08d90fd 100644 --- a/zaino-state/src/chain_index/finalised_state/db.rs +++ b/zaino-state/src/chain_index/finalised_state/db.rs @@ -31,6 +31,7 @@ use super::capability::Capability; pub(super) const VERSION_DIRS: [&str; 1] = ["v1"]; /// All concrete database 
implementations. +#[derive(Debug)] pub(crate) enum DbBackend { V0(DbV0), V1(DbV1), diff --git a/zaino-state/src/chain_index/finalised_state/db/v1.rs b/zaino-state/src/chain_index/finalised_state/db/v1.rs index 226570316..d6111564f 100644 --- a/zaino-state/src/chain_index/finalised_state/db/v1.rs +++ b/zaino-state/src/chain_index/finalised_state/db/v1.rs @@ -385,6 +385,7 @@ impl TransparentHistExt for DbV1 { /// Zaino’s Finalised state database V1. /// Implements a persistent LMDB-backed chain index for fast read access and verified data. +#[derive(Debug)] pub(crate) struct DbV1 { /// Shared LMDB environment. env: Arc, diff --git a/zaino-state/src/chain_index/finalised_state/reader.rs b/zaino-state/src/chain_index/finalised_state/reader.rs index 3b40d3cda..d1cacf4fe 100644 --- a/zaino-state/src/chain_index/finalised_state/reader.rs +++ b/zaino-state/src/chain_index/finalised_state/reader.rs @@ -27,7 +27,7 @@ use std::sync::Arc; /// Immutable view onto an already-running [`ZainoDB`]. /// /// Carries a plain reference with the same lifetime as the parent DB -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) struct DbReader { /// Immutable read-only view onto the running ZainoDB pub(crate) inner: Arc, diff --git a/zaino-state/src/chain_index/finalised_state/router.rs b/zaino-state/src/chain_index/finalised_state/router.rs index cc5e2246c..7a9661ec4 100644 --- a/zaino-state/src/chain_index/finalised_state/router.rs +++ b/zaino-state/src/chain_index/finalised_state/router.rs @@ -21,6 +21,7 @@ use std::sync::{ Arc, }; +#[derive(Debug)] pub(crate) struct Router { /// Primary active database. 
primary: ArcSwap, diff --git a/zaino-state/src/chain_index/mempool.rs b/zaino-state/src/chain_index/mempool.rs index a8de3e838..f3f2aa988 100644 --- a/zaino-state/src/chain_index/mempool.rs +++ b/zaino-state/src/chain_index/mempool.rs @@ -4,13 +4,15 @@ use std::{collections::HashSet, sync::Arc}; use crate::{ broadcast::{Broadcast, BroadcastSubscriber}, - chain_index::source::{BlockchainSource, BlockchainSourceError}, + chain_index::{ + source::{BlockchainSource, BlockchainSourceError}, + types::MempoolInfo, + }, error::{MempoolError, StatusError}, status::{AtomicStatus, StatusType}, BlockHash, }; use tracing::{info, warn}; -use zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse; use zebra_chain::{block::Hash, transaction::SerializedTransaction}; /// Mempool key @@ -280,7 +282,7 @@ impl Mempool { /// Returns information about the mempool. Used by the `getmempoolinfo` RPC. /// Computed from local Broadcast state. - pub async fn get_mempool_info(&self) -> Result { + pub async fn get_mempool_info(&self) -> MempoolInfo { let map = self.state.get_state(); let size = map.len() as u64; @@ -298,7 +300,7 @@ impl Mempool { let usage = bytes.saturating_add(key_heap_bytes); - Ok(GetMempoolInfoResponse { size, bytes, usage }) + MempoolInfo { size, bytes, usage } } #[inline] @@ -498,7 +500,7 @@ impl MempoolSubscriber { /// Returns information about the mempool. Used by the `getmempoolinfo` RPC. /// Computed from local Broadcast state. - pub async fn get_mempool_info(&self) -> Result { + pub async fn get_mempool_info(&self) -> MempoolInfo { let mempool_transactions: Vec<(MempoolKey, MempoolValue)> = self.subscriber.get_filtered_state(&HashSet::new()); @@ -517,7 +519,7 @@ impl MempoolSubscriber { let usage: u64 = bytes.saturating_add(key_heap_bytes); - Ok(GetMempoolInfoResponse { size, bytes, usage }) + MempoolInfo { size, bytes, usage } } /// Returns the status of the mempool. 
diff --git a/zaino-state/src/chain_index/non_finalised_state.rs b/zaino-state/src/chain_index/non_finalised_state.rs index 0392a5767..a58db7221 100644 --- a/zaino-state/src/chain_index/non_finalised_state.rs +++ b/zaino-state/src/chain_index/non_finalised_state.rs @@ -17,6 +17,7 @@ use zebra_chain::parameters::Network; use zebra_state::HashOrHeight; /// Holds the block cache +#[derive(Debug)] pub struct NonFinalizedState { /// We need access to the validator's best block hash, as well /// as a source of blocks @@ -353,11 +354,13 @@ impl NonFinalizedState { // // see https://github.com/ZcashFoundation/zebra/issues/9541 + dbg!(&best_tip); + while let Some(block) = self .source - .get_block(HashOrHeight::Height(zebra_chain::block::Height( - u32::from(best_tip.0) + 1, - ))) + .get_block(HashOrHeight::Height(zebra_chain::block::Height(dbg!( + u32::from(best_tip.0) + 1 + )))) .await .map_err(|e| { // TODO: Check error. Determine what kind of error to return, this may be recoverable @@ -383,7 +386,9 @@ impl NonFinalizedState { )) })?, }; + dbg!(&block.coinbase_height()); let chainblock = self.block_to_chainblock(prev_block, &block).await?; + dbg!(&chainblock.index()); info!( "syncing block {} at height {}", &chainblock.index().hash(), diff --git a/zaino-state/src/chain_index/source.rs b/zaino-state/src/chain_index/source.rs index 7f76b09ea..fc9282ca3 100644 --- a/zaino-state/src/chain_index/source.rs +++ b/zaino-state/src/chain_index/source.rs @@ -200,38 +200,52 @@ impl BlockchainSource for ValidatorConnector { let GetTreestateResponse { sapling, orchard, .. } = tree_responses; + // Sapling let sapling_frontier = sapling .commitments() .final_state() - .as_ref() - .map(hex::decode) - .transpose() - .map_err(|_e| { - BlockchainSourceError::Unrecoverable( - InvalidData(format!("could not interpret sapling tree of block {id}")) - .to_string(), - ) - })? 
- .as_deref() + .as_deref() // Option<&[u8]> .map(read_commitment_tree::) .transpose() .map_err(|e| BlockchainSourceError::Unrecoverable(format!("io error: {e}")))?; + + // Orchard (same pattern) let orchard_frontier = orchard .commitments() .final_state() - .as_ref() - .map(hex::decode) - .transpose() - .map_err(|_e| { - BlockchainSourceError::Unrecoverable( - InvalidData(format!("could not interpret orchard tree of block {id}")) - .to_string(), - ) - })? .as_deref() .map(read_commitment_tree::) .transpose() .map_err(|e| BlockchainSourceError::Unrecoverable(format!("io error: {e}")))?; + // let sapling_frontier = dbg!(sapling.commitments().final_state().as_ref()) + // .map(hex::decode) + // .transpose() + // .map_err(|_e| { + // BlockchainSourceError::Unrecoverable( + // InvalidData(format!("could not interpret sapling tree of block {id}")) + // .to_string(), + // ) + // })? + // .as_deref() + // .map(read_commitment_tree::) + // .transpose() + // .map_err(|e| BlockchainSourceError::Unrecoverable(format!("io error: {e}")))?; + // let orchard_frontier = orchard + // .commitments() + // .final_state() + // .as_ref() + // .map(hex::decode) + // .transpose() + // .map_err(|_e| { + // BlockchainSourceError::Unrecoverable( + // InvalidData(format!("could not interpret orchard tree of block {id}")) + // .to_string(), + // ) + // })? 
+ // .as_deref() + // .map(read_commitment_tree::) + // .transpose() + // .map_err(|e| BlockchainSourceError::Unrecoverable(format!("io error: {e}")))?; let sapling_root = sapling_frontier .map(|tree| { zebra_chain::sapling::tree::Root::try_from(*tree.root().as_ref()) diff --git a/zaino-state/src/chain_index/tests.rs b/zaino-state/src/chain_index/tests.rs index 116dde176..10ff75b0c 100644 --- a/zaino-state/src/chain_index/tests.rs +++ b/zaino-state/src/chain_index/tests.rs @@ -97,7 +97,7 @@ mod mockchain_tests { let indexer = NodeBackedChainIndex::new(source.clone(), config) .await .unwrap(); - let index_reader = indexer.subscriber().await; + let index_reader = indexer.subscriber(); loop { let check_height: u32 = match active_mockchain_source { @@ -407,7 +407,7 @@ mod mockchain_tests { let mempool_stream_task = tokio::spawn(async move { let nonfinalized_snapshot = index_reader.snapshot_nonfinalized_state(); let mut mempool_stream = index_reader - .get_mempool_stream(&nonfinalized_snapshot) + .get_mempool_stream(Some(&nonfinalized_snapshot)) .expect("failed to create mempool stream"); let mut indexer_mempool_transactions: Vec = @@ -451,7 +451,7 @@ mod mockchain_tests { mockchain.mine_blocks(1); sleep(Duration::from_millis(2000)).await; - let mempool_stream = index_reader.get_mempool_stream(&stale_nonfinalized_snapshot); + let mempool_stream = index_reader.get_mempool_stream(Some(&stale_nonfinalized_snapshot)); assert!(mempool_stream.is_none()); } diff --git a/zaino-state/src/chain_index/tests/mempool.rs b/zaino-state/src/chain_index/tests/mempool.rs index 9c10cae64..cac086990 100644 --- a/zaino-state/src/chain_index/tests/mempool.rs +++ b/zaino-state/src/chain_index/tests/mempool.rs @@ -222,7 +222,7 @@ async fn get_mempool_info() { dbg!(&transaction.hash()); } - let subscriber_mempool_info = subscriber.get_mempool_info().await.unwrap(); + let subscriber_mempool_info = subscriber.get_mempool_info().await; let expected_size: u64 = mempool_transactions.len() as u64; 
diff --git a/zaino-state/src/chain_index/types.rs b/zaino-state/src/chain_index/types.rs index 03f953c58..c3a1fb14f 100644 --- a/zaino-state/src/chain_index/types.rs +++ b/zaino-state/src/chain_index/types.rs @@ -3274,3 +3274,65 @@ pub mod serde_arrays { .map_err(|_| serde::de::Error::custom(format!("invalid length for [u8; {N}]"))) } } + +// *** Metadata Objects *** + +/// Holds information about the mempool state. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MempoolInfo { + /// Current tx count + pub size: u64, + /// Sum of all tx sizes + pub bytes: u64, + /// Total memory usage for the mempool + pub usage: u64, +} + +impl ZainoVersionedSerialise for MempoolInfo { + const VERSION: u8 = version::V1; + + fn encode_body(&self, w: &mut W) -> io::Result<()> { + let mut w = w; + write_u64_le(&mut w, self.size)?; + write_u64_le(&mut w, self.bytes)?; + write_u64_le(&mut w, self.usage) + } + + fn decode_latest(r: &mut R) -> io::Result { + Self::decode_v1(r) + } + + fn decode_v1(r: &mut R) -> io::Result { + let mut r = r; + let size = read_u64_le(&mut r)?; + let bytes = read_u64_le(&mut r)?; + let usage = read_u64_le(&mut r)?; + Ok(MempoolInfo { size, bytes, usage }) + } +} + +/// 24 byte body. 
+impl FixedEncodedLen for MempoolInfo { + /// 8 byte size + 8 byte bytes + 8 byte usage + const ENCODED_LEN: usize = 8 + 8 + 8; +} + +impl From for MempoolInfo { + fn from(resp: zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse) -> Self { + MempoolInfo { + size: resp.size, + bytes: resp.bytes, + usage: resp.usage, + } + } +} + +impl From for zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse { + fn from(info: MempoolInfo) -> Self { + zaino_fetch::jsonrpsee::response::GetMempoolInfoResponse { + size: info.size, + bytes: info.bytes, + usage: info.usage, + } + } +} diff --git a/zaino-state/src/error.rs b/zaino-state/src/error.rs index c780eb18d..c8cd5fedc 100644 --- a/zaino-state/src/error.rs +++ b/zaino-state/src/error.rs @@ -49,6 +49,10 @@ pub enum StateServiceError { #[error("RPC error: {0:?}")] RpcError(#[from] zaino_fetch::jsonrpsee::connector::RpcError), + /// Chain index error. + #[error("Chain index error: {0}")] + ChainIndexError(#[from] ChainIndexError), + /// Error from the block cache. #[error("Mempool error: {0}")] BlockCacheError(#[from] BlockCacheError), @@ -126,6 +130,12 @@ impl From for tonic::Status { tonic::Status::internal(err.to_string()) } StateServiceError::UnhandledRpcError(e) => tonic::Status::internal(e.to_string()), + StateServiceError::ChainIndexError(err) => match err.kind { + ChainIndexErrorKind::InternalServerError => tonic::Status::internal(err.message), + ChainIndexErrorKind::InvalidSnapshot => { + tonic::Status::failed_precondition(err.message) + } + }, } } } @@ -172,6 +182,10 @@ pub enum FetchServiceError { #[error("JsonRpcConnector error: {0}")] JsonRpcConnectorError(#[from] zaino_fetch::jsonrpsee::error::TransportError), + /// Chain index error. + #[error("Chain index error: {0}")] + ChainIndexError(#[from] ChainIndexError), + /// Error from the block cache. 
#[error("Mempool error: {0}")] BlockCacheError(#[from] BlockCacheError), @@ -213,9 +227,16 @@ impl From for tonic::Status { FetchServiceError::SerializationError(err) => { tonic::Status::internal(format!("Serialization error: {err}")) } + FetchServiceError::ChainIndexError(err) => match err.kind { + ChainIndexErrorKind::InternalServerError => tonic::Status::internal(err.message), + ChainIndexErrorKind::InvalidSnapshot => { + tonic::Status::failed_precondition(err.message) + } + }, } } } + /// These aren't the best conversions, but the MempoolError should go away /// in favor of a new type with the new chain cache is complete impl From> for MempoolError { @@ -323,6 +344,7 @@ pub enum BlockCacheError { #[error("Integer conversion error: {0}")] TryFromIntError(#[from] std::num::TryFromIntError), } + /// These aren't the best conversions, but the NonFinalizedStateError should go away /// in favor of a new type with the new chain cache is complete impl From> for NonFinalisedStateError { @@ -379,6 +401,7 @@ pub enum NonFinalisedStateError { #[error("Status error: {0:?}")] StatusError(StatusError), } + /// These aren't the best conversions, but the FinalizedStateError should go away /// in favor of a new type with the new chain cache is complete impl From> for FinalisedStateError { @@ -561,6 +584,7 @@ impl ChainIndexError { } } } + impl From for ChainIndexError { fn from(value: FinalisedStateError) -> Self { let message = match &value { diff --git a/zaino-state/src/indexer.rs b/zaino-state/src/indexer.rs index a75d03c80..1fa5be905 100644 --- a/zaino-state/src/indexer.rs +++ b/zaino-state/src/indexer.rs @@ -730,6 +730,7 @@ pub trait LightWalletIndexer: Send + Sync + Clone + ZcashIndexer + 'static { /// NOTE: Currently unimplemented in Zaino. async fn ping(&self, request: Duration) -> Result; } + /// Zcash Service functionality. 
#[async_trait] pub trait LightWalletService: Sized + ZcashService {} diff --git a/zaino-state/src/lib.rs b/zaino-state/src/lib.rs index 015573797..24faa0a3f 100644 --- a/zaino-state/src/lib.rs +++ b/zaino-state/src/lib.rs @@ -41,6 +41,7 @@ pub use chain_index::non_finalised_state::{ UpdateError, }; // NOTE: Should these be pub at all? +pub use chain_index::mempool::{MempoolKey, MempoolValue}; pub use chain_index::types::{ AddrHistRecord, AddrScript, BlockData, BlockHash, BlockHeaderData, BlockIndex, ChainBlock, ChainWork, CommitmentTreeData, CommitmentTreeRoots, CommitmentTreeSizes, CompactOrchardAction, @@ -52,8 +53,6 @@ pub use chain_index::types::{ pub(crate) mod local_cache; -pub use chain_index::mempool::{MempoolKey, MempoolValue}; - #[cfg(feature = "bench")] /// allow public access to additional APIs, for testing pub mod bench { diff --git a/zaino-state/src/utils.rs b/zaino-state/src/utils.rs index 09e31a8b0..2ab1f4280 100644 --- a/zaino-state/src/utils.rs +++ b/zaino-state/src/utils.rs @@ -6,6 +6,8 @@ use zaino_proto::proto::service::BlockId; use zebra_chain::{block::Height, parameters::Network}; use zebra_state::HashOrHeight; +// *** Metadata structs *** + /// Zaino build info. 
#[derive(Debug, Clone)] pub(crate) struct BuildInfo { @@ -116,6 +118,8 @@ impl fmt::Display for ServiceMetadata { } } +// *** Data transforms *** + pub(crate) fn blockid_to_hashorheight(block_id: BlockId) -> Option { <[u8; 32]>::try_from(block_id.hash) .map(zebra_chain::block::Hash) @@ -128,3 +132,26 @@ pub(crate) fn blockid_to_hashorheight(block_id: BlockId) -> Option }) .ok() } + +/// Strips the outputs from all transactions, retains only +/// the nullifier from all orchard actions, and clears the chain +/// metadata from the block +pub(crate) fn compact_block_to_nullifiers( + mut block: zaino_proto::proto::compact_formats::CompactBlock, +) -> zaino_proto::proto::compact_formats::CompactBlock { + for ctransaction in &mut block.vtx { + ctransaction.outputs = Vec::new(); + for caction in &mut ctransaction.actions { + *caction = zaino_proto::proto::compact_formats::CompactOrchardAction { + nullifier: caction.nullifier.clone(), + ..Default::default() + } + } + } + + block.chain_metadata = Some(zaino_proto::proto::compact_formats::ChainMetadata { + sapling_commitment_tree_size: 0, + orchard_commitment_tree_size: 0, + }); + block +} diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index 3e67a7fe8..3ad64d36a 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -724,7 +724,7 @@ mod launch_testmanager { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn basic() { let mut test_manager = TestManager::launch( &ValidatorKind::Zcashd, @@ -747,7 +747,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn generate_blocks() { let mut test_manager = TestManager::launch( &ValidatorKind::Zcashd, @@ -776,7 +776,7 @@ mod launch_testmanager { } #[ignore = "chain cache needs development"] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate)
async fn with_chain() { let mut test_manager = TestManager::launch( &ValidatorKind::Zcashd, @@ -799,7 +799,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino() { let mut test_manager = TestManager::launch( &ValidatorKind::Zcashd, @@ -832,7 +832,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients() { let mut test_manager = TestManager::launch( &ValidatorKind::Zcashd, @@ -860,7 +860,7 @@ mod launch_testmanager { /// This test shows currently we do not receive mining rewards from Zebra unless we mine 100 blocks at a time. /// This is not the case with Zcashd and should not be the case here. /// Even if rewards need 100 confirmations these blocks should not have to be mined at the same time. - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward() { let mut test_manager = TestManager::launch( &ValidatorKind::Zcashd, @@ -904,7 +904,7 @@ mod launch_testmanager { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn basic() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -927,7 +927,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn generate_blocks() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -956,7 +956,7 @@ mod launch_testmanager { } #[ignore = "chain cache needs development"] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn with_chain() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -979,7 +979,7 @@ mod launch_testmanager { test_manager.close().await; } - 
#[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1012,7 +1012,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1040,7 +1040,7 @@ mod launch_testmanager { /// This test shows currently we do not receive mining rewards from Zebra unless we mine 100 blocks at a time. /// This is not the case with Zcashd and should not be the case here. /// Even if rewards need 100 confirmations these blocks should not have to be mined at the same time. - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1079,7 +1079,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward_and_send() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1163,7 +1163,7 @@ mod launch_testmanager { } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_testnet() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1194,7 +1194,7 @@ mod launch_testmanager { use super::*; - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn basic() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1217,7 +1217,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn generate_blocks() { let mut 
test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1246,7 +1246,7 @@ mod launch_testmanager { } #[ignore = "chain cache needs development"] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn with_chain() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1269,7 +1269,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1302,7 +1302,7 @@ mod launch_testmanager { test_manager.close().await; } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1331,7 +1331,7 @@ mod launch_testmanager { /// This is not the case with Zcashd and should not be the case here. /// Even if rewards need 100 confirmations these blocks should not have to be mined at the same time. #[ignore = "Bug in zingolib 1.0 sync, reinstate on zinglib 2.0 upgrade."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1372,7 +1372,7 @@ mod launch_testmanager { } #[ignore = "Bug in zingolib 1.0 sync, reinstate on zinglib 2.0 upgrade."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_clients_receive_mining_reward_and_send() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad, @@ -1454,7 +1454,7 @@ mod launch_testmanager { } #[ignore = "requires fully synced testnet."] - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub(crate) async fn zaino_testnet() { let mut test_manager = TestManager::launch( &ValidatorKind::Zebrad,