Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 93 additions & 91 deletions zaino-state/src/local_cache/finalised_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,26 +375,71 @@ impl FinalisedState {
/// - Re-populated the database from the NEXT block in the chain (`reorg_height + 1`).
async fn sync_db_from_reorg(&self) -> Result<(), FinalisedStateError> {
let network = self.config.network.to_zebra_network();

let mut reorg_height = self.get_db_height().unwrap_or(Height(0));
// let reorg_height_int = reorg_height.0.saturating_sub(100);
// reorg_height = Height(reorg_height_int);

let mut reorg_hash = self.get_hash(reorg_height.0).unwrap_or(Hash([0u8; 32]));

let mut check_hash = match self

// Check if cached height exceeds current chain height.
// This can occur when:
// - Switching between networks (mainnet/testnet/regtest)
// - Chain has reset or been pruned
// - Database persisted from a previous chain state
// Without this check, attempting to fetch non-existent blocks would cause a crash.
// If detected, clear the cache and resync from genesis.
let current_chain_height = self
.fetcher
.get_block(reorg_height.0.to_string(), Some(1))
.get_blockchain_info()
.await?
{
zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block) => block.hash.0,
_ => {
return Err(FinalisedStateError::Custom(
"Unexpected block response type".to_string(),
))
.blocks
.0;

if reorg_height.0 > current_chain_height {
tracing::warn!(
"Cached height {} is beyond current chain height {}. Clearing cache for fresh chain.",
reorg_height.0,
current_chain_height
);
{
let mut txn = self.database.begin_rw_txn()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This erases the whole cache. While it might be fine for Regtest, it probably isn't the appropriate behavior in production.

txn.clear_db(self.heights_to_hashes)?;
txn.clear_db(self.hashes_to_blocks)?;
txn.commit()?;
}
// Reset to height 0 for fresh chain
reorg_height = Height(0);
}

let mut reorg_hash = self.get_hash(reorg_height.0).unwrap_or(Hash([0u8; 32]));

// Wait for initial block to exist instead of crashing.
// On fresh chains or during initial sync, blocks may not be available immediately.
// This loop polls until the block at reorg_height is available, preventing
// crashes when the indexer starts before the chain has any blocks.
let mut check_hash = loop {
match self
.fetcher
.get_block(reorg_height.0.to_string(), Some(1))
.await
{
Ok(zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block)) => {
break block.hash.0;
}
Ok(_) => {
return Err(FinalisedStateError::Custom(
"Unexpected block response type".to_string(),
))
}
Err(e) => {
// Block doesn't exist yet - wait for it to be mined
tracing::debug!(
"Block at height {} not found yet, waiting for chain to populate: {}",
reorg_height.0,
e
);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
}
};

// Find reorg height.
//
// Here this is the latest height at which the internal block hash matches the server block hash.
Expand All @@ -413,31 +458,42 @@ impl FinalisedState {
break;
}
};

reorg_hash = self.get_hash(reorg_height.0).unwrap_or(Hash([0u8; 32]));

check_hash = match self
.fetcher
.get_block(reorg_height.0.to_string(), Some(1))
.await?
{
zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block) => block.hash.0,
_ => {
return Err(FinalisedStateError::Custom(
"Unexpected block response type".to_string(),
))

// Wait for block to exist instead of crashing.
// During reorg detection, we walk backwards through the chain to find the
// last valid block. If we're near the chain tip and blocks haven't been
// mined yet, this prevents crashes by waiting for blocks to become available.
check_hash = loop {
match self
.fetcher
.get_block(reorg_height.0.to_string(), Some(1))
.await
{
Ok(zaino_fetch::jsonrpsee::response::GetBlockResponse::Object(block)) => {
break block.hash.0;
}
Ok(_) => {
return Err(FinalisedStateError::Custom(
"Unexpected block response type".to_string(),
))
}
Err(e) => {
tracing::debug!(
"Block at height {} not found yet, waiting: {}",
reorg_height.0,
e
);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
continue;
}
}
};
}

// Refill from max(reorg_height[+1], sapling_activation_height) to current server (finalised state) height.
let mut sync_height = self
.fetcher
.get_blockchain_info()
.await?
.blocks
.0
.saturating_sub(99);
let sync_height = current_chain_height.saturating_sub(99);

for block_height in ((reorg_height.0 + 1).max(
self.config
.network
Expand Down Expand Up @@ -474,62 +530,8 @@ impl FinalisedState {
}
}
}

// Wait for server to sync to with p2p network and sync new blocks.
if !self.config.network.to_zebra_network().is_regtest() {
self.status.store(StatusType::Syncing);
loop {
let blockchain_info = self.fetcher.get_blockchain_info().await?;
let server_height = blockchain_info.blocks.0;
for block_height in (sync_height + 1)..(server_height - 99) {
if self.get_hash(block_height).is_ok() {
self.delete_block(Height(block_height))?;
}
loop {
match fetch_block_from_node(
self.state.as_ref(),
Some(&network),
&self.fetcher,
HashOrHeight::Height(Height(block_height)),
)
.await
{
Ok((hash, block)) => {
self.insert_block((Height(block_height), hash, block))?;
info!(
"Block at height {} successfully inserted in finalised state.",
block_height
);
break;
}
Err(e) => {
self.status.store(StatusType::RecoverableError);
warn!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
}
sync_height = server_height - 99;
if (blockchain_info.blocks.0 as i64 - blockchain_info.estimated_height.0 as i64)
.abs()
<= 10
{
break;
} else {
info!(" - Validator syncing with network. ZainoDB chain height: {}, Validator chain height: {}, Estimated Network chain height: {}",
&sync_height,
&blockchain_info.blocks.0,
&blockchain_info.estimated_height.0
);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
continue;
}
}
}


self.status.store(StatusType::Ready);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you removing this line where the status is updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! That line shouldn't have been removed. Looking at the code flow, the status needs to be set to Ready after the sync completes successfully.

I'll restore this line:

self.status.store(StatusType::Ready);
Ok(())

This was an unintended change while refactoring the sync logic.


Ok(())
}

Expand Down Expand Up @@ -721,4 +723,4 @@ impl FinalisedStateSubscriber {
pub fn status(&self) -> StatusType {
self.status.load()
}
}
}