From b1a75811dfa7bbd46b4b40b33eba7a2328853e55 Mon Sep 17 00:00:00 2001 From: dorianvp Date: Sun, 19 Oct 2025 21:13:33 -0300 Subject: [PATCH 1/5] flakey tests. DO NOT MERGE --- integration-tests/tests/chain_cache.rs | 41 ++++++++++++++++++++++++++ zaino-testutils/src/lib.rs | 2 +- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index a31a1c786..06be49a52 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -239,6 +239,47 @@ mod chain_query_interface { } } + // Copied over from `get_block_range` + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn repro_flake_zcashd() { + let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + create_test_manager_and_chain_index( + &ValidatorKind::Zcashd, + None, + false, + false, + false, + false, + ) + .await; + + // this delay had to increase. Maybe we tweak sync loop rerun time? + test_manager.generate_blocks_with_delay(5).await; + let snapshot = indexer.snapshot_nonfinalized_state(); + assert_eq!(snapshot.as_ref().blocks.len(), 8); + } + + // Copied over from `get_block_range` + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn repro_flake_zebrad() { + let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + create_test_manager_and_chain_index( + &ValidatorKind::Zebrad, + None, + false, + false, + false, + false, + ) + .await; + + // this delay had to increase. Maybe we tweak sync loop rerun time? + test_manager.generate_blocks_with_delay(5).await; + // indexer. + let snapshot = indexer.snapshot_nonfinalized_state(); + assert_eq!(snapshot.as_ref().blocks.len(), 8); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_block_range_zebrad() { get_block_range(&ValidatorKind::Zebrad).await diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index ee7a7b31e..9f2bb806a 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -608,7 +608,7 @@ impl TestManager { pub async fn generate_blocks_with_delay(&self, blocks: u32) { for _ in 0..blocks { self.local_net.generate_blocks(1).await.unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } From f0b85db5d337113566369f1cbf598350457a460a Mon Sep 17 00:00:00 2001 From: dorianvp Date: Mon, 20 Oct 2025 00:06:38 -0300 Subject: [PATCH 2/5] test(chain_index): fix for repro --- integration-tests/tests/chain_cache.rs | 15 +++- zaino-state/src/chain_index.rs | 78 +++++++++++++++++++ .../src/chain_index/non_finalised_state.rs | 3 + 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index 06be49a52..85f0708aa 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -242,7 +242,7 @@ mod chain_query_interface { // Copied over from `get_block_range` #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn repro_flake_zcashd() { - let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = create_test_manager_and_chain_index( &ValidatorKind::Zcashd, None, @@ -255,6 +255,19 @@ mod chain_query_interface { // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; + + let node_tip: zebra_chain::block::Height = + json_service.get_block_count().await.unwrap().into(); + + // Try commenting this out + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .await + { + Ok(()) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + let snapshot = indexer.snapshot_nonfinalized_state(); assert_eq!(snapshot.as_ref().blocks.len(), 8); } diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index db7358da8..05ac9d0f9 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -22,6 +22,7 @@ use std::{sync::Arc, time::Duration}; use futures::{FutureExt, Stream}; use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; +use tokio::time::{timeout, Interval, MissedTickBehavior}; use tokio_stream::StreamExt; use tracing::info; use zebra_chain::parameters::ConsensusBranchId; @@ -610,6 +611,83 @@ impl NodeBackedChainIndexSubscriber { .into_iter(), )) } + + /// Wait until the non-finalized best tip height is >= `min_height`. + /// Returns Err on timeout or if the index is closing. + pub async fn wait_for_height( + &self, + min_height: types::Height, + deadline: Duration, + ) -> Result<(), ChainIndexError> { + if self.non_finalized_state.get_snapshot().best_tip.height >= min_height { + return Ok(()); + } + + timeout(deadline, async { + let mut ticker: Interval = tokio::time::interval(Duration::from_millis(25)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + ticker.tick().await; + + if self.status.load() == StatusType::Closing { + return Err(ChainIndexError { + kind: ChainIndexErrorKind::InternalServerError, + message: "index is shutting down while waiting for height".into(), + source: None, + }); + } + + let snap = self.non_finalized_state.get_snapshot(); + if snap.best_tip.height >= min_height { + return Ok(()); + } + } + }) + .await + .map_err(|_| ChainIndexError { + kind: ChainIndexErrorKind::InternalServerError, + message: format!("timeout waiting for height >= {}", min_height.0), + source: None, + })? + } + + /// Wait until a specific block hash is present in the snapshot. + pub async fn wait_for_hash( + &self, + hash: types::BlockHash, + deadline: Duration, + ) -> Result { + timeout(deadline, async { + let mut ticker = tokio::time::interval(Duration::from_millis(25)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + ticker.tick().await; + + if self.status.load() == StatusType::Closing { + return Err(ChainIndexError { + kind: ChainIndexErrorKind::InternalServerError, + message: "index is shutting down while waiting for hash".into(), + source: None, + }); + } + + let snap = self.non_finalized_state.get_snapshot(); + if let Some(block) = snap.get_chainblock_by_hash(&hash) { + if let Some(h) = block.height() { + return Ok(h); + } + } + } + }) + .await + .map_err(|_| ChainIndexError { + kind: ChainIndexErrorKind::InternalServerError, + message: "timeout waiting for block hash to appear".into(), + source: None, + })? + } } impl ChainIndex for NodeBackedChainIndexSubscriber { diff --git a/zaino-state/src/chain_index/non_finalised_state.rs b/zaino-state/src/chain_index/non_finalised_state.rs index e312cf4c2..130400dce 100644 --- a/zaino-state/src/chain_index/non_finalised_state.rs +++ b/zaino-state/src/chain_index/non_finalised_state.rs @@ -263,6 +263,7 @@ impl NonFinalizedState { network: &Network, start_block: Option, ) -> Result { + dbg!(&start_block); match start_block { Some(block) => Ok(block), None => Self::get_genesis_indexed_block(source, network).await, @@ -295,6 +296,8 @@ impl NonFinalizedState { .fetch_main_chain_blocks(&initial_state, &mut nonbest_blocks) .await?; + dbg!(&new_blocks.len()); + // Stage and update new blocks self.stage_new_blocks(new_blocks, &finalized_db).await?; From fed8b93d33ad977472f27495a040295c3401c749 Mon Sep 17 00:00:00 2001 From: dorianvp Date: Mon, 20 Oct 2025 02:03:27 -0300 Subject: [PATCH 3/5] chore: wip --- integration-tests/tests/chain_cache.rs | 126 ++++++++++++++++++++++--- zaino-testutils/src/lib.rs | 9 +- 2 files changed, 119 insertions(+), 16 deletions(-) diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index 85f0708aa..cd6e2a616 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -275,7 +275,7 @@ mod chain_query_interface { // Copied over from `get_block_range` #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn repro_flake_zebrad() { - let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = create_test_manager_and_chain_index( &ValidatorKind::Zebrad, None, @@ -288,7 +288,18 @@ mod chain_query_interface { // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; - // indexer. + + let node_tip: zebra_chain::block::Height = + json_service.get_block_count().await.unwrap().into(); + + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .await + { + Ok(()) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + let snapshot = indexer.snapshot_nonfinalized_state(); assert_eq!(snapshot.as_ref().blocks.len(), 8); } @@ -304,11 +315,23 @@ mod chain_query_interface { } async fn get_block_range(validator: &ValidatorKind) { - let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = create_test_manager_and_chain_index(validator, None, false, false, false, false).await; // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; + + let node_tip: zebra_chain::block::Height = + dbg!(json_service.get_block_count().await.unwrap().into()); + + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .await + { + Ok(()) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + let snapshot = indexer.snapshot_nonfinalized_state(); assert_eq!(snapshot.as_ref().blocks.len(), 8); let range = indexer @@ -346,11 +369,23 @@ mod chain_query_interface { } async fn find_fork_point(validator: &ValidatorKind) { - let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = create_test_manager_and_chain_index(validator, None, false, false, false, false).await; // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; + + let node_tip: zebra_chain::block::Height = + dbg!(json_service.get_block_count().await.unwrap().into()); + + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .await + { + Ok(()) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + let snapshot = indexer.snapshot_nonfinalized_state(); assert_eq!(snapshot.as_ref().blocks.len(), 8); for block_hash in snapshot.heights_to_hashes.values() { @@ -378,11 +413,23 @@ mod chain_query_interface { } async fn get_raw_transaction(validator: &ValidatorKind) { - let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = create_test_manager_and_chain_index(validator, None, false, false, false, false).await; // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; + + let node_tip: zebra_chain::block::Height = + dbg!(json_service.get_block_count().await.unwrap().into()); + + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .await + { + Ok(()) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + let snapshot = indexer.snapshot_nonfinalized_state(); assert_eq!(snapshot.as_ref().blocks.len(), 8); for (txid, height) in snapshot.blocks.values().flat_map(|block| { @@ -432,7 +479,7 @@ mod chain_query_interface { } async fn get_transaction_status(validator: &ValidatorKind) { - let (test_manager, _json_service, _option_state_service, _chain_index, indexer) = + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = create_test_manager_and_chain_index(validator, None, false, false, false, false).await; let snapshot = indexer.snapshot_nonfinalized_state(); // I don't know where this second block is generated. Somewhere in the @@ -441,6 +488,18 @@ mod chain_query_interface { // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; + + let node_tip: zebra_chain::block::Height = + dbg!(json_service.get_block_count().await.unwrap().into()); + + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .await + { + Ok(()) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + let snapshot = indexer.snapshot_nonfinalized_state(); assert_eq!(snapshot.as_ref().blocks.len(), 8); for (txid, height, block_hash) in snapshot.blocks.values().flat_map(|block| { @@ -477,17 +536,60 @@ mod chain_query_interface { // this delay had to increase. Maybe we tweak sync loop rerun time? test_manager.generate_blocks_with_delay(5).await; + + // let node_tip: zebra_chain::block::Height = + // dbg!(json_service.get_block_count().await.unwrap().into()); + + let latest_block_hash: zebra_chain::block::Hash = + json_service.get_best_blockhash().await.unwrap().0; + + match indexer + // .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .wait_for_hash(latest_block_hash.into(), Duration::from_secs(30)) + .await { - let chain_height = - Height::try_from(json_service.get_blockchain_info().await.unwrap().blocks.0) - .unwrap(); - let indexer_height = indexer.snapshot_nonfinalized_state().best_tip.height; - assert_eq!(chain_height, indexer_height); + Ok(_) => (), + Err(e) => panic!("wait_for_height failed: {}", e), } + // // TODO: This block could probably be removed + // { + // let chain_height = + // Height::try_from(json_service.get_blockchain_info().await.unwrap().blocks.0) + // .unwrap(); + // let indexer_height = indexer.snapshot_nonfinalized_state().best_tip.height; + // assert_eq!(chain_height, indexer_height); + // } + test_manager.generate_blocks_with_delay(150).await; - tokio::time::sleep(std::time::Duration::from_millis(5000)).await; + // tokio::time::sleep(std::time::Duration::from_millis(5000)).await; + + // let node_tip: zebra_chain::block::Height = + // dbg!(json_service.get_block_count().await.unwrap().into()); + + // match indexer + // .wait_for_height( + // Height::try_from(node_tip).unwrap(), + // Duration::from_secs(600), + // ) + // .await + // { + // Ok(()) => (), + // Err(e) => panic!("wait_for_height failed: {}", e), + // } + + let latest_block_hash: zebra_chain::block::Hash = + json_service.get_best_blockhash().await.unwrap().0; + + match indexer + // .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .wait_for_hash(latest_block_hash.into(), Duration::from_secs(300)) + .await + { + Ok(_) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } let snapshot = indexer.snapshot_nonfinalized_state(); let chain_height = json_service.get_blockchain_info().await.unwrap().blocks.0; diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index 9f2bb806a..beff4b074 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -606,10 +606,11 @@ impl TestManager { /// Generates `blocks` regtest blocks. /// Adds a delay between blocks to allow zaino / zebra to catch up with test. pub async fn generate_blocks_with_delay(&self, blocks: u32) { - for _ in 0..blocks { - self.local_net.generate_blocks(1).await.unwrap(); - // tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } + self.local_net.generate_blocks(blocks).await.unwrap(); + // for _ in 0..blocks { + // self.local_net.generate_blocks(1).await.unwrap(); + // // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } } /// Closes the TestManager. From 4b757a179377a5c471235c8aa472d7f3865c9b6d Mon Sep 17 00:00:00 2001 From: dorianvp Date: Mon, 20 Oct 2025 16:52:18 -0300 Subject: [PATCH 4/5] test(chain_index): add new repro and potential fix --- integration-tests/tests/chain_cache.rs | 75 +++++++++++-------- zaino-state/src/chain_index.rs | 40 +++++++++- .../src/chain_index/non_finalised_state.rs | 2 - zaino-testutils/src/lib.rs | 4 +- 4 files changed, 84 insertions(+), 37 deletions(-) diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index cd6e2a616..247b41399 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -259,9 +259,11 @@ mod chain_query_interface { let node_tip: zebra_chain::block::Height = json_service.get_block_count().await.unwrap().into(); - // Try commenting this out match indexer - .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .wait_for_height( + Height::try_from(node_tip).unwrap(), + Duration::from_secs(300), + ) .await { Ok(()) => (), @@ -293,7 +295,10 @@ mod chain_query_interface { json_service.get_block_count().await.unwrap().into(); match indexer - .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) + .wait_for_height( + Height::try_from(node_tip).unwrap(), + Duration::from_secs(300), + ) .await { Ok(()) => (), @@ -552,39 +557,20 @@ mod chain_query_interface { Err(e) => panic!("wait_for_height failed: {}", e), } - // // TODO: This block could probably be removed - // { - // let chain_height = - // Height::try_from(json_service.get_blockchain_info().await.unwrap().blocks.0) - // .unwrap(); - // let indexer_height = indexer.snapshot_nonfinalized_state().best_tip.height; - // assert_eq!(chain_height, indexer_height); - // } - test_manager.generate_blocks_with_delay(150).await; - // tokio::time::sleep(std::time::Duration::from_millis(5000)).await; + // let latest_block_hash: zebra_chain::block::Hash = + // dbg!(json_service.get_best_blockhash().await.unwrap().0); - // let node_tip: zebra_chain::block::Height = - // dbg!(json_service.get_block_count().await.unwrap().into()); - - // match indexer - // .wait_for_height( - // Height::try_from(node_tip).unwrap(), - // Duration::from_secs(600), - // ) - // .await - // { - // Ok(()) => (), - // Err(e) => panic!("wait_for_height failed: {}", e), - // } - - let latest_block_hash: zebra_chain::block::Hash = - json_service.get_best_blockhash().await.unwrap().0; + let node_tip: zebra_chain::block::Height = + dbg!(json_service.get_block_count().await.unwrap().into()); match indexer - // .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(30)) - .wait_for_hash(latest_block_hash.into(), Duration::from_secs(300)) + .wait_for_height( + Height::try_from(node_tip).unwrap(), + Duration::from_secs(120), + ) + // .wait_for_hash(latest_block_hash.into(), Duration::from_secs(300)) .await { Ok(_) => (), @@ -624,4 +610,31 @@ mod chain_query_interface { .unwrap(); } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn repro_nfs_drain() { + let (test_manager, json_service, _option_state_service, _chain_index, indexer) = + create_test_manager_and_chain_index( + &ValidatorKind::Zebrad, + None, + false, + false, + false, + false, + ) + .await; + + test_manager.generate_blocks_with_delay(200).await; + + let node_tip: zebra_chain::block::Height = + dbg!(json_service.get_block_count().await.unwrap().into()); + + match indexer + .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(20)) + .await + { + Ok(_) => (), + Err(e) => panic!("wait_for_height failed: {}", e), + } + } } diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index 05ac9d0f9..ecb1f0e38 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -22,13 +22,15 @@ use std::{sync::Arc, time::Duration}; use futures::{FutureExt, Stream}; use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; -use tokio::time::{timeout, Interval, MissedTickBehavior}; +use tokio::time::{timeout, Instant, Interval, MissedTickBehavior}; use tokio_stream::StreamExt; use tracing::info; +use zebra_chain::block::Height; +use zebra_chain::chain_tip::ChainTip; use zebra_chain::parameters::ConsensusBranchId; pub use zebra_chain::parameters::Network as ZebraNetwork; use zebra_chain::serialization::ZcashSerialize; -use zebra_state::HashOrHeight; +use zebra_state::{ChainTipChange, HashOrHeight, LatestChainTip, TipAction}; pub mod encoding; /// All state at least 100 blocks old @@ -688,6 +690,40 @@ impl NodeBackedChainIndexSubscriber { source: None, })? } + + /// Wait until Zebra’s *best tip* (non-finalized if available, else finalized) + /// reaches at least `target`. Handles both Grow and Reset actions and skipped blocks. + pub async fn wait_for_tip_at_least( + target: Height, + mut changes: ChainTipChange, + latest: LatestChainTip, + max_wait: Duration, + ) -> Result<(), ChainIndexError> { + let deadline = Instant::now() + max_wait; + + if latest.best_tip_height().map_or(false, |h| h >= target) { + return Ok(()); + } + + loop { + let remain = deadline + .checked_duration_since(Instant::now()) + .unwrap_or_default(); + if remain.is_zero() { + panic!("timeout waiting for height >= {0}", target.0); + } + + let action: TipAction = timeout(remain, changes.wait_for_tip_change()) + .await + .unwrap() + .unwrap(); + + // TipAction can be Grow{..} or Reset{..}. Both expose the new best height: + if action.best_tip_height() >= target { + return Ok(()); + } + } + } } impl ChainIndex for NodeBackedChainIndexSubscriber { diff --git a/zaino-state/src/chain_index/non_finalised_state.rs b/zaino-state/src/chain_index/non_finalised_state.rs index 130400dce..d1cd1ea00 100644 --- a/zaino-state/src/chain_index/non_finalised_state.rs +++ b/zaino-state/src/chain_index/non_finalised_state.rs @@ -296,8 +296,6 @@ impl NonFinalizedState { .fetch_main_chain_blocks(&initial_state, &mut nonbest_blocks) .await?; - dbg!(&new_blocks.len()); - // Stage and update new blocks self.stage_new_blocks(new_blocks, &finalized_db).await?; diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index beff4b074..9dee0c1a5 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -607,9 +607,9 @@ impl TestManager { /// Adds a delay between blocks to allow zaino / zebra to catch up with test. pub async fn generate_blocks_with_delay(&self, blocks: u32) { self.local_net.generate_blocks(blocks).await.unwrap(); - // for _ in 0..blocks { + // for i in 0..blocks { + // dbg!(i); // self.local_net.generate_blocks(1).await.unwrap(); - // // tokio::time::sleep(std::time::Duration::from_millis(100)).await; // } } From f787c1d6857691d7f9e33a3186e08d1e625ed16d Mon Sep 17 00:00:00 2001 From: dorianvp Date: Thu, 23 Oct 2025 21:04:49 -0300 Subject: [PATCH 5/5] test(chain_index): add logger for repro --- Cargo.lock | 1 + integration-tests/tests/chain_cache.rs | 266 ++++++++++++++++++++++++- zaino-state/src/backends/state.rs | 2 +- zaino-testutils/Cargo.toml | 23 ++- zaino-testutils/src/lib.rs | 11 +- 5 files changed, 283 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2fb35cfb..331d18db4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8512,6 +8512,7 @@ dependencies = [ "zcash_local_net", "zcash_protocol 0.6.1 (git+https://github.com/zcash/librustzcash?rev=d387aed7e04e881dbe30c6ff8b26a96c834c094b)", "zebra-chain", + "zebra-state", "zingo_common_components 0.1.0 (git+https://github.com/zingolabs/zingo-common.git?tag=zingo_common_components_v0.1.0)", "zingo_netutils 0.1.0 (git+https://github.com/zingolabs/zingo-common.git?tag=zingo_common_components_v0.1.0)", "zingo_test_vectors 0.0.1 (git+https://github.com/zingolabs/infrastructure.git?tag=zcash_local_net_v0.1.0)", diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index 247b41399..db5006173 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -2,6 +2,7 @@ use zaino_common::network::ActivationHeights; use zaino_fetch::jsonrpsee::connector::{test_node_and_return_url, JsonRpSeeConnector}; use zaino_state::BackendType; use zaino_testutils::{TestManager, Validator as _, ValidatorKind}; +use zebra_chain::chain_tip::ChainTip; async fn create_test_manager_and_connector( validator: &ValidatorKind, @@ -51,6 +52,7 @@ mod chain_query_interface { use futures::TryStreamExt as _; use tempfile::TempDir; + use tokio::time::{timeout, Instant}; use zaino_common::{ network::ActivationHeights, CacheConfig, DatabaseConfig, ServiceConfig, StorageConfig, }; @@ -70,6 +72,7 @@ mod chain_query_interface { parameters::NetworkKind, serialization::{ZcashDeserialize, ZcashDeserializeInto}, }; + use zebra_state::{ChainTipChange, LatestChainTip, TipAction}; use super::*; @@ -116,7 +119,7 @@ mod chain_query_interface { }, }; - let (test_manager, json_service) = create_test_manager_and_connector( + let (mut test_manager, json_service) = create_test_manager_and_connector( validator, Some(activation_heights), chain_cache.clone(), @@ -173,6 +176,13 @@ mod chain_query_interface { )) .await .unwrap(); + + let change = state_service.chain_tip_change.clone(); // field, not a method + let latest = state_service.chain_tip_change.latest_chain_tip(); // method on ChainTipChange + + test_manager.latest_chain_tip = Some(latest.clone()); + test_manager.chain_tip_change = Some(change.clone()); + let temp_dir: TempDir = tempfile::tempdir().unwrap(); let db_path: PathBuf = temp_dir.path().to_path_buf(); let config = BlockCacheConfig { @@ -611,6 +621,175 @@ mod chain_query_interface { } } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn simple_drain_repro() { + // until zaino is switched over to using chain index we will keep these activation heights separate. + // TODO: unify acitvation heights after switchover to chain index + let activation_heights = ActivationHeights { + overwinter: Some(1), + before_overwinter: Some(1), + sapling: Some(1), + blossom: Some(1), + heartwood: Some(1), + canopy: Some(1), + nu5: Some(2), + nu6: Some(2), + nu6_1: Some(1000), + nu7: None, + }; + + let (mut test_manager, json_service) = create_test_manager_and_connector( + &ValidatorKind::Zebrad, + Some(activation_heights), + None, + false, + false, + false, + false, + ) + .await; + + let state_chain_cache_dir = test_manager.data_dir.clone(); + let network = match test_manager.network { + NetworkKind::Regtest => { + zebra_chain::parameters::Network::new_regtest(activation_heights.into()) + } + NetworkKind::Testnet => zebra_chain::parameters::Network::new_default_testnet(), + NetworkKind::Mainnet => zebra_chain::parameters::Network::Mainnet, + }; + let state_service = StateService::spawn(StateServiceConfig::new( + zebra_state::Config { + cache_dir: state_chain_cache_dir, + ephemeral: false, + delete_old_database: true, + debug_stop_at_height: None, + debug_validity_check_interval: None, + }, + test_manager.zebrad_rpc_listen_address, + test_manager.zebrad_grpc_listen_address, + false, + None, + None, + None, + ServiceConfig::default(), + StorageConfig { + cache: CacheConfig::default(), + database: DatabaseConfig { + path: test_manager + .local_net + .data_dir() + .path() + .to_path_buf() + .join("zaino"), + ..Default::default() + }, + }, + network.into(), + false, + false, + )) + .await + .unwrap(); + + let change = state_service.chain_tip_change.clone(); // field, not a method + let latest = state_service.chain_tip_change.latest_chain_tip(); // method on ChainTipChange + + let _tip_logger = spawn_basic_tip_logger(&state_service); + + test_manager.latest_chain_tip = Some(latest.clone()); + test_manager.chain_tip_change = Some(change.clone()); + + let temp_dir: TempDir = tempfile::tempdir().unwrap(); + let db_path: PathBuf = temp_dir.path().to_path_buf(); + let config = BlockCacheConfig { + storage: StorageConfig { + database: DatabaseConfig { + path: db_path, + ..Default::default() + }, + ..Default::default() + }, + db_version: 1, + network: zaino_common::Network::Regtest(activation_heights), + no_sync: false, + no_db: false, + }; + let chain_index = NodeBackedChainIndex::new( + ValidatorConnector::State(chain_index::source::State { + read_state_service: state_service.read_state_service().clone(), + mempool_fetcher: json_service.clone(), + network: config.network, + }), + config, + ) + .await + .unwrap(); + let index_reader = chain_index.subscriber().await; + tokio::time::sleep(Duration::from_secs(3)).await; + + // ( + // test_manager, + // json_service, + // Some(state_service), + // chain_index, + // index_reader, + // ) + + // Mine 50 blocks + test_manager.local_net.generate_blocks(120).await.unwrap(); + + let target = zebra_chain::block::Height(120); + + let mut change = change.clone(); + let latest_clone = latest.clone(); + wait_for_tip_at_least(&mut change, &latest_clone, target, Duration::from_secs(60)) + .await + .map_err(|e| panic!("wait_for_tip_at_least failed: {e}")) + .unwrap(); + + // let node_tip: zebra_chain::block::Height = + // json_service.get_block_count().await.unwrap().into(); + // assert!( + // node_tip.0 >= target.0, + // "node tip {} < target {}", + // node_tip.0, + // target.0 + // ); + + index_reader + .wait_for_height( + zaino_state::Height::try_from(target.0).unwrap(), + Duration::from_secs(30), + ) + .await + .unwrap_or_else(|e| panic!("wait_for_height failed: {e}")); + } + + /// Spawn a background task that prints a line every time the best tip changes. + /// Ends cleanly when the sender (state service) shuts down. + fn spawn_basic_tip_logger(state_service: &StateService) -> tokio::task::JoinHandle<()> { + let mut latest = state_service.chain_tip_change.latest_chain_tip(); + + tokio::spawn(async move { + loop { + // wait for any change; stops if channel closes + if let Err(_closed) = latest.best_tip_changed().await { + eprintln!("[tip] watch closed; stopping tip logger"); + break; + } + + // log the new tip (if present), then mark as seen + if let Some((h, hash)) = latest.best_tip_height_and_hash() { + println!("[tip] height={} hash={:?}", h.0, hash); + } else { + println!("[tip] (no tip yet)"); + } + latest.mark_best_tip_seen(); + } + }) + } + + // TODO: This is now broken. See `simple_drain_repro` for a working but non-ideal version. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn repro_nfs_drain() { let (test_manager, json_service, _option_state_service, _chain_index, indexer) = @@ -624,17 +803,86 @@ mod chain_query_interface { ) .await; + // Mine 200 blocks test_manager.generate_blocks_with_delay(200).await; - let node_tip: zebra_chain::block::Height = - dbg!(json_service.get_block_count().await.unwrap().into()); + // Wait for Zebra’s (NFS/finalized) best tip to reach 200 + // test_manager + // .wait_for_tip_at_least(zebra_chain::block::Height(200), Duration::from_secs(60)) + // .await + // .unwrap(); - match indexer - .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(20)) - .await - { - Ok(_) => (), - Err(e) => panic!("wait_for_height failed: {}", e), + // // Optional: now assert RPC/indexer views + // let node_tip: zebra_chain::block::Height = + // dbg!(json_service.get_block_count().await.unwrap().into()); + + // indexer + // .wait_for_height( + // Height::try_from(node_tip.0).unwrap(), + // Duration::from_secs(30), + // ) + // .await + // .unwrap_or_else(|e| panic!("wait_for_height failed: {e}")); + + // test_manager.generate_blocks_with_delay(200).await; + + // NodeBackedChainIndexSubscriber::wait_for_tip_at_least( + // zebra_chain::block::Height(201), + // test_manager.chain_tip_change.clone(), + // test_manager.latest_chain_tip.clone(), + // Duration::from_secs(30), + // ) + // .await + // .unwrap(); + + // let node_tip: zebra_chain::block::Height = + // dbg!(json_service.get_block_count().await.unwrap().into()); + + // match indexer + // .wait_for_height(Height::try_from(node_tip).unwrap(), Duration::from_secs(20)) + // .await + // { + // Ok(_) => (), + // Err(e) => panic!("wait_for_height failed: {}", e), + // } + } + + async fn wait_for_tip_at_least( + change: &mut ChainTipChange, + latest: &LatestChainTip, + target: zebra_chain::block::Height, + max_wait: Duration, + ) -> Result<(), String> { + println!("[TIP] waiting for tip >= {0}", target.0); + if latest.best_tip_height().map_or(false, |h| h >= target) { + return Ok(()); + } + + let deadline = Instant::now() + max_wait; + loop { + let remain = deadline.saturating_duration_since(Instant::now()); + if remain.is_zero() { + return Err(format!("timeout waiting for height >= {}", target.0)); + } + + match timeout(remain, change.wait_for_tip_change()).await { + Ok(Ok(_)) => { + // check current height after the action + if latest.best_tip_height().map_or(false, |h| h >= target) { + return Ok(()); + } + } + Ok(Err(_recv_closed)) => { + // sender died; last-chance check + if latest.best_tip_height().map_or(false, |h| h >= target) { + return Ok(()); + } + return Err("tip channel closed (state service shut down)".into()); + } + Err(_elapsed) => { + return Err(format!("timeout waiting for height >= {}", target.0)); + } + } } } } diff --git a/zaino-state/src/backends/state.rs b/zaino-state/src/backends/state.rs index 957f66463..d70497c2c 100644 --- a/zaino-state/src/backends/state.rs +++ b/zaino-state/src/backends/state.rs @@ -120,7 +120,7 @@ pub struct StateService { /// Thread-safe status indicator. status: AtomicStatus, /// Listener for when the chain tip changes - chain_tip_change: zebra_state::ChainTipChange, + pub chain_tip_change: zebra_state::ChainTipChange, } impl StateService { diff --git a/zaino-testutils/Cargo.toml b/zaino-testutils/Cargo.toml index 49cd255b5..db14e8376 100644 --- a/zaino-testutils/Cargo.toml +++ b/zaino-testutils/Cargo.toml @@ -20,21 +20,26 @@ zainod = { workspace = true } zcash_protocol = { git = "https://github.com/zcash/librustzcash", rev = "d387aed7e04e881dbe30c6ff8b26a96c834c094b", features = [ "local-consensus", ] } -zcash_client_backend = { git = "https://github.com/zcash/librustzcash", rev = "d387aed7e04e881dbe30c6ff8b26a96c834c094b", features = ["lightwalletd-tonic"] } +zcash_client_backend = { git = "https://github.com/zcash/librustzcash", rev = "d387aed7e04e881dbe30c6ff8b26a96c834c094b", features = [ + "lightwalletd-tonic", +] } # Zebra zebra-chain = { workspace = true } +zebra-state = { workspace = true } # Zingo-infra zcash_local_net = { git = "https://github.com/zingolabs/infrastructure.git", tag = "zcash_local_net_v0.1.0" } -zingo_test_vectors = { git = "https://github.com/zingolabs/infrastructure.git", tag = "zcash_local_net_v0.1.0" } +zingo_test_vectors = { git = "https://github.com/zingolabs/infrastructure.git", tag = "zcash_local_net_v0.1.0" } # Zingo-common -zingo_common_components = { git = "https://github.com/zingolabs/zingo-common.git", tag = "zingo_common_components_v0.1.0", features = [ "for_test" ]} +zingo_common_components = { git = "https://github.com/zingolabs/zingo-common.git", tag = "zingo_common_components_v0.1.0", features = [ + "for_test", +] } # ZingoLib zingolib = { git = "https://github.com/zingolabs/zingolib.git", rev = "f88e1d76ea244d6cc48d7fd4c3a609c6598318dc", features = [ - "testutils", + "testutils", ] } # Miscellaneous @@ -45,9 +50,9 @@ tokio = { workspace = true } tonic = { workspace = true } tempfile = { workspace = true } tracing-subscriber = { workspace = true, features = [ - "fmt", - "env-filter", - "time", + "fmt", + "env-filter", + "time", ] } # We don't need this as a direct dependency. @@ -60,7 +65,7 @@ tracing-subscriber = { workspace = true, features = [ # its rand version update. proptest = { workspace = true } lazy_static = { workspace = true } -zip32 = {workspace = true} +zip32 = { workspace = true } [dev-dependencies] -zingo_netutils = { git = "https://github.com/zingolabs/zingo-common.git", tag = "zingo_common_components_v0.1.0" } +zingo_netutils = { git = "https://github.com/zingolabs/zingo-common.git", tag = "zingo_common_components_v0.1.0" } diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index 9dee0c1a5..70076fd3b 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -10,10 +10,13 @@ pub mod test_vectors { use once_cell::sync::Lazy; use std::{ + io, net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, + time::Duration, }; use tempfile::TempDir; +use tokio::time::{timeout, Instant}; use tracing_subscriber::EnvFilter; use zaino_common::{ network::ActivationHeights, CacheConfig, DatabaseConfig, Network, ServiceConfig, StorageConfig, @@ -23,7 +26,9 @@ use zainodlib::config::default_ephemeral_cookie_path; pub use zcash_local_net as services; pub use zcash_local_net::validator::Validator; use zcash_local_net::validator::{ZcashdConfig, ZebradConfig}; -use zebra_chain::parameters::NetworkKind; +use zebra_chain::chain_tip::ChainTip; +use zebra_chain::{block::Height, parameters::NetworkKind}; +use zebra_state::{ChainTipChange, LatestChainTip}; use zingo_test_vectors::seeds; pub use zingolib::get_base_address_macro; pub use zingolib::lightclient::LightClient; @@ -371,6 +376,8 @@ pub struct TestManager { pub json_server_cookie_dir: Option, /// Zingolib lightclients. pub clients: Option, + pub latest_chain_tip: Option, + pub chain_tip_change: Option, } impl TestManager { @@ -559,6 +566,8 @@ impl TestManager { zaino_grpc_listen_address, json_server_cookie_dir: zaino_json_server_cookie_dir, clients, + latest_chain_tip: None, + chain_tip_change: None, }; // Generate an extra block to turn on NU5 and NU6,