From ea94b48e6d8fa9d2f6e156dffe7da2eb8250bc77 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Wed, 2 Apr 2025 17:15:50 +0200 Subject: [PATCH 1/2] refactor(chain): introduce `BlockTree` to manage forks and make accesses O(1) --- src/chain/chain.rs | 24 +- src/chain/graph.rs | 628 ++++++++++++++++++++++++++++++++++++++ src/chain/header_chain.rs | 10 - src/chain/mod.rs | 15 + src/prelude.rs | 12 +- 5 files changed, 655 insertions(+), 34 deletions(-) create mode 100644 src/chain/graph.rs diff --git a/src/chain/chain.rs b/src/chain/chain.rs index 93e3a61a..3610288f 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -8,7 +8,7 @@ use std::{ use bitcoin::{ block::Header, p2p::message_filter::{CFHeaders, CFilter, GetCFHeaders, GetCFilters}, - Block, BlockHash, CompactTarget, Network, ScriptBuf, TxOut, Work, + Block, BlockHash, CompactTarget, Network, ScriptBuf, TxOut, }; use tokio::sync::Mutex; @@ -62,7 +62,6 @@ pub(crate) struct Chain { dialog: Arc, } -#[allow(dead_code)] impl Chain { #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -165,21 +164,6 @@ impl Chain { self.header_chain.contains_header(header) } - // Canoncial chainwork - pub(crate) fn chainwork(&self) -> Work { - self.header_chain.chainwork() - } - - // Calculate the chainwork after a fork height to evalutate the fork - pub(crate) fn chainwork_after_height(&self, height: u32) -> Work { - self.header_chain.chainwork_after_height(height) - } - - // Human readable chainwork - pub(crate) fn log2_work(&self) -> f64 { - self.header_chain.log2_work() - } - // Have we hit the known checkpoints pub(crate) fn checkpoints_complete(&self) -> bool { self.checkpoints.is_exhausted() @@ -870,12 +854,6 @@ impl Chain { self.cf_header_chain.clear_queue(); } - // We found a reorg and some filters are no longer valid. 
- async fn clear_filter_headers(&mut self) { - self.cf_header_chain.clear_queue(); - self.cf_header_chain.clear_headers(); - } - // Clear the filter header cache to rescan the filters for new scripts. pub(crate) fn clear_filters(&mut self) { self.filter_chain.clear_cache(); diff --git a/src/chain/graph.rs b/src/chain/graph.rs new file mode 100644 index 00000000..cf7a56e2 --- /dev/null +++ b/src/chain/graph.rs @@ -0,0 +1,628 @@ +use std::collections::{BTreeMap, HashMap}; + +use crate::{prelude::ZerolikeExt, HeaderCheckpoint}; + +use bitcoin::{ + block::Header, constants::genesis_block, params::Params, BlockHash, CompactTarget, FilterHash, + Network, Work, +}; + +use super::IndexedHeader; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct Height(u32); + +impl Height { + fn new(height: u32) -> Self { + Height(height) + } + + fn from_u64_checked(height: u64) -> Option { + match height.try_into() { + Ok(height) => Some(Height::new(height)), + Err(_) => None, + } + } + + fn increment(&self) -> Self { + Self(self.0 + 1) + } + + fn checked_sub(&self, other: Height) -> Option { + let height_sub_checked = self.0.checked_sub(other.0); + height_sub_checked.map(Self) + } + + fn is_adjustment_multiple(&self, params: impl AsRef) -> bool { + self.0 as u64 % params.as_ref().difficulty_adjustment_interval() == 0 + } + + pub(crate) fn to_u32(self) -> u32 { + self.0 + } + + #[allow(dead_code)] + fn to_u64(self) -> u64 { + self.0 as u64 + } +} + +impl From for Height { + fn from(value: u32) -> Self { + Height(value) + } +} + +#[derive(Debug, Clone)] +pub(crate) enum AcceptHeaderChanges { + Accepted { + connected_at: IndexedHeader, + }, + Duplicate, + ExtendedFork { + connected_at: IndexedHeader, + }, + Reorganization { + accepted: Vec, + disconnected: Vec, + }, + Rejected(HeaderRejection), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum HeaderRejection { + InvalidPow { + expected: CompactTarget, + got: CompactTarget, + }, + 
UnknownPrevHash(BlockHash), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct Tip { + pub hash: BlockHash, + pub height: Height, + pub next_work_required: Option, +} + +impl Tip { + pub(crate) fn from_checkpoint(height: impl Into, hash: BlockHash) -> Self { + let height = height.into(); + Self { + hash, + height, + next_work_required: None, + } + } +} + +impl From for Tip { + fn from(value: HeaderCheckpoint) -> Self { + Tip::from_checkpoint(value.height, value.hash) + } +} + +#[derive(Debug, Clone, Hash)] +pub(crate) struct BlockNode { + pub height: Height, + pub header: Header, + pub acc_work: Work, + pub filter_hash: Option, +} + +impl BlockNode { + fn new(height: Height, header: Header, acc_work: Work) -> Self { + Self { + height, + header, + acc_work, + filter_hash: None, + } + } +} + +#[derive(Debug)] +pub struct BlockTree { + canonical_hashes: BTreeMap, + headers: HashMap, + active_tip: Tip, + candidate_forks: Vec, + network: Network, +} + +#[allow(unused)] +impl BlockTree { + pub fn new(tip: impl Into, network: Network) -> Self { + let tip = tip.into(); + Self { + canonical_hashes: BTreeMap::new(), + headers: HashMap::with_capacity(20_000), + active_tip: tip, + candidate_forks: Vec::with_capacity(2), + network, + } + } + + pub fn from_genesis(network: Network) -> Self { + let genesis = genesis_block(network); + let height = Height::new(0); + let hash = genesis.block_hash(); + let tip = Tip { + hash, + height, + next_work_required: Some(genesis.header.bits), + }; + let mut headers = HashMap::with_capacity(20_000); + let block_node = BlockNode::new(height, genesis.header, genesis.header.work()); + headers.insert(hash, block_node); + let mut canonical_hashes = BTreeMap::new(); + canonical_hashes.insert(Height::new(0), hash); + Self { + canonical_hashes, + headers, + active_tip: tip, + candidate_forks: Vec::with_capacity(2), + network, + } + } + + pub fn from_header(height: impl Into, header: Header, network: Network) 
-> Self { + let height = height.into(); + let hash = header.block_hash(); + let tip = Tip { + hash, + height, + next_work_required: Some(header.bits), + }; + let mut headers = HashMap::with_capacity(20_000); + let block_node = BlockNode::new(height, header, header.work()); + headers.insert(hash, block_node); + Self { + canonical_hashes: BTreeMap::new(), + headers, + active_tip: tip, + candidate_forks: Vec::with_capacity(2), + network, + } + } + + fn accept_header(&mut self, new_header: Header) -> AcceptHeaderChanges { + let new_hash = new_header.block_hash(); + let prev_hash = new_header.prev_blockhash; + + if self.active_tip.hash.eq(&prev_hash) { + let new_height = self.active_tip.height.increment(); + let params = self.network.params(); + let next_work = if !params.no_pow_retargeting + && !params.allow_min_difficulty_blocks + && new_height.is_adjustment_multiple(self.network) + { + self.compute_next_work_required(new_height) + } else { + self.active_tip.next_work_required + }; + if let Some(work) = next_work { + if new_header.bits.ne(&work) { + return AcceptHeaderChanges::Rejected(HeaderRejection::InvalidPow { + expected: work, + got: new_header.bits, + }); + } + } + let new_tip = Tip { + hash: new_hash, + height: new_height, + next_work_required: next_work, + }; + let prev_work = self + .headers + .get(&prev_hash) + .map(|block| block.acc_work) + .unwrap_or(Work::zero()); + let new_work = prev_work + new_header.work(); + let new_block_node = BlockNode::new(new_height, new_header, new_work); + self.headers.insert(new_hash, new_block_node); + self.active_tip = new_tip; + self.canonical_hashes.insert(new_height, new_hash); + return AcceptHeaderChanges::Accepted { + connected_at: IndexedHeader::new(new_height.to_u32(), new_header), + }; + } + + if self.headers.contains_key(&new_hash) { + return AcceptHeaderChanges::Duplicate; + } + + if let Some(fork_index) = self + .candidate_forks + .iter() + .position(|fork| fork.hash.eq(&prev_hash)) + { + let fork = 
self.candidate_forks.swap_remove(fork_index); + if let Some(node) = self.headers.get(&fork.hash) { + let new_height = node.height.increment(); + let params = self.network.params(); + let next_work = if !params.no_pow_retargeting + && !params.allow_min_difficulty_blocks + && new_height.is_adjustment_multiple(self.network) + { + self.compute_next_work_required(new_height) + } else { + fork.next_work_required + }; + if let Some(work) = next_work { + if new_header.bits.ne(&work) { + return AcceptHeaderChanges::Rejected(HeaderRejection::InvalidPow { + expected: work, + got: new_header.bits, + }); + } + } + let acc_work = node.acc_work + new_header.work(); + let new_tip = Tip { + hash: new_hash, + height: new_height, + next_work_required: next_work, + }; + let new_block_node = BlockNode::new(new_height, new_header, acc_work); + self.headers.insert(new_hash, new_block_node); + if acc_work + > self + .headers + .get(&self.active_tip.hash) + .map(|node| node.acc_work) + .unwrap_or(Work::zero()) + { + self.candidate_forks.push(self.active_tip); + self.active_tip = new_tip; + let (accepted, disconnected) = self.switch_to_fork(&new_tip); + return AcceptHeaderChanges::Reorganization { + accepted, + disconnected, + }; + } else { + self.candidate_forks.push(new_tip); + return AcceptHeaderChanges::ExtendedFork { + connected_at: IndexedHeader::new(new_height.to_u32(), new_header), + }; + } + } + } + + match self.headers.get(&prev_hash) { + // A new fork was detected + Some(node) => { + let new_height = node.height.increment(); + let params = self.network.params(); + let next_work = if !params.no_pow_retargeting + && !params.allow_min_difficulty_blocks + && new_height.is_adjustment_multiple(self.network) + { + self.compute_next_work_required(new_height) + } else { + Some(node.header.bits) + }; + if let Some(work) = next_work { + if new_header.bits.ne(&work) { + return AcceptHeaderChanges::Rejected(HeaderRejection::InvalidPow { + expected: work, + got: new_header.bits, + }); + } + } 
+ let acc_work = node.acc_work + new_header.work(); + let new_tip = Tip { + hash: new_hash, + height: new_height, + next_work_required: next_work, + }; + self.candidate_forks.push(new_tip); + let new_block_node = BlockNode::new(new_height, new_header, acc_work); + self.headers.insert(new_hash, new_block_node); + AcceptHeaderChanges::ExtendedFork { + connected_at: IndexedHeader::new(new_height.to_u32(), new_header), + } + } + // This chain doesn't link to ours in any known way + None => AcceptHeaderChanges::Rejected(HeaderRejection::UnknownPrevHash(prev_hash)), + } + } + + fn switch_to_fork(&mut self, new_best: &Tip) -> (Vec, Vec) { + let mut curr_hash = new_best.hash; + let mut connections = Vec::new(); + let mut disconnections = Vec::new(); + loop { + match self.headers.get(&curr_hash) { + Some(node) => { + let next = node.header.prev_blockhash; + match self.canonical_hashes.get_mut(&node.height) { + Some(canonical_hash) => { + let reorged_hash = *canonical_hash; + if reorged_hash.ne(&curr_hash) { + if let Some(reorged) = self.headers.get(&reorged_hash) { + disconnections.push(IndexedHeader::new( + reorged.height.to_u32(), + reorged.header, + )); + } + *canonical_hash = curr_hash; + connections + .push(IndexedHeader::new(node.height.to_u32(), node.header)); + curr_hash = next; + } else { + return (connections, disconnections); + } + } + None => { + self.canonical_hashes.insert(node.height, curr_hash); + connections.push(IndexedHeader::new(node.height.to_u32(), node.header)); + curr_hash = next; + } + } + } + None => return (connections, disconnections), + } + } + } + + fn compute_next_work_required(&self, new_height: Height) -> Option { + let adjustment_period = + Height::from_u64_checked(self.network.params().difficulty_adjustment_interval())?; + let epoch_start = new_height.checked_sub(adjustment_period)?; + let epoch_end = new_height.checked_sub(Height::new(1))?; + let epoch_start_hash = self.canonical_hashes.get(&epoch_start)?; + let epoch_end_hash = 
self.canonical_hashes.get(&epoch_end)?; + let epoch_start_header = self.headers.get(epoch_start_hash).map(|node| node.header)?; + let epoch_end_header = self.headers.get(epoch_end_hash).map(|node| node.header)?; + let new_target = CompactTarget::from_header_difficulty_adjustment( + epoch_start_header, + epoch_end_header, + self.network, + ); + Some(new_target) + } + + pub(crate) fn block_hash_at_height(&self, height: impl Into) -> Option { + let height = height.into(); + self.canonical_hashes.get(&height).copied() + } + + pub(crate) fn header_at_height(&self, height: impl Into) -> Option
{ + let height = height.into(); + let hash = self.canonical_hashes.get(&height)?; + self.headers.get(hash).map(|node| node.header) + } + + pub(crate) fn height_of_hash(&self, hash: BlockHash) -> Option { + self.headers.get(&hash).map(|node| node.height.to_u32()) + } + + pub(crate) fn height(&self) -> u32 { + self.active_tip.height.to_u32() + } + + pub(crate) fn filter_hash(&self, block_hash: BlockHash) -> Option { + self.headers.get(&block_hash)?.filter_hash + } + + pub(crate) fn filter_hash_at_height(&self, height: impl Into) -> Option { + let height = height.into(); + let hash = self.canonical_hashes.get(&height)?; + self.headers.get(hash)?.filter_hash + } + + pub(crate) fn iter(&self) -> BlockTreeIterator { + BlockTreeIterator { + block_tree: self, + current: self.active_tip.hash, + } + } +} + +pub(crate) struct BlockTreeIterator<'a> { + block_tree: &'a BlockTree, + current: BlockHash, +} + +impl Iterator for BlockTreeIterator<'_> { + type Item = IndexedHeader; + + fn next(&mut self) -> Option { + let node = self.block_tree.headers.get(&self.current)?; + self.current = node.header.prev_blockhash; + Some(IndexedHeader::new(node.height.to_u32(), node.header)) + } +} + +#[cfg(test)] +mod tests { + use bitcoin::consensus::deserialize; + + use super::*; + use std::str::FromStr; + + #[test] + fn test_depth_one_reorg() { + let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); + let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); + let block_10: Header = 
deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); + // let batch_1 = vec![block_8, block_9, block_10]; + let new_block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093792151c0e9ce4e4c789ca98427d7740cc7acf30d2ca0c08baef266bf152289d814567e5e66ffff7f2001000000").unwrap()).unwrap(); + let block_11: Header = deserialize(&hex::decode("00000020efcf8b12221fccc735b9b0b657ce15b31b9c50aff530ce96a5b4cfe02d8c0068496c1b8a89cf5dec22e46c35ea1035f80f5b666a1b3aa7f3d6f0880d0061adcc567e5e66ffff7f2001000000").unwrap()).unwrap(); + // let fork = vec![new_block_10, block_11]; + + let tip = Tip::from_checkpoint( + 7, + BlockHash::from_str("62c28f380692524a3a8f1fc66252bc0eb31d6b6a127d2263bdcbee172529fe16") + .unwrap(), + ); + let mut chain = BlockTree::new(tip, Network::Regtest); + let accept_8 = chain.accept_header(block_8); + assert!(matches!( + accept_8, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + let accept_9 = chain.accept_header(block_9); + assert!(matches!( + accept_9, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + let accept_10 = chain.accept_header(block_10); + assert!(matches!( + accept_10, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + assert_eq!(chain.height(), 10); + let accept_new_10 = chain.accept_header(new_block_10); + assert!(matches!( + accept_new_10, + AcceptHeaderChanges::ExtendedFork { connected_at: _ } + )); + assert_eq!(chain.height(), 10); + assert_eq!(chain.header_at_height(10), Some(block_10)); + let accept_11 = chain.accept_header(block_11); + match accept_11 { + AcceptHeaderChanges::Reorganization { + accepted, + disconnected, + } => { + assert_eq!( + accepted, + vec![ + IndexedHeader::new(11, block_11), + IndexedHeader::new(10, new_block_10) + ] + ); + assert_eq!(block_10, 
disconnected.first().unwrap().header); + assert_eq!(10, disconnected.first().unwrap().height); + assert_eq!(1, disconnected.len()) + } + _ => panic!("reorganization should have been accepted"), + } + assert_eq!(chain.header_at_height(12), None); + assert_eq!(chain.header_at_height(11), Some(block_11)); + assert_eq!(chain.header_at_height(10), Some(new_block_10)); + assert_eq!(chain.header_at_height(9), Some(block_9)); + assert_eq!(chain.header_at_height(8), Some(block_8)); + } + + #[test] + fn test_depth_two_reorg() { + let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f047eb4d0fe76345e307d0e020a079cedfa37101ee7ac84575cf829a611b0f84bc4805e66ffff7f2001000000").unwrap()).unwrap(); + let block_2: Header = deserialize(&hex::decode("00000020299e41732deb76d869fcdb5f72518d3784e99482f572afb73068d52134f1f75e1f20f5da8d18661d0f13aa3db8fff0f53598f7d61f56988a6d66573394b2c6ffc5805e66ffff7f2001000000").unwrap()).unwrap(); + let block_3: Header = deserialize(&hex::decode("00000020b96feaa82716f11befeb608724acee4743e0920639a70f35f1637a88b8b6ea3471f1dbedc283ce6a43a87ed3c8e6326dae8d3dbacce1b2daba08e508054ffdb697815e66ffff7f2001000000").unwrap()).unwrap(); + let block_4: Header = deserialize(&hex::decode("0000002052ff614fa461ff38b4a5c101d04fdcac2f34722e60bd43d12c8de0a394fe0c60444fb24b7e9885f60fed9927d27f23854ecfab29287163ef2b868d5d626f82ed97815e66ffff7f2002000000").unwrap()).unwrap(); + let new_block_3: Header = deserialize(&hex::decode("00000020b96feaa82716f11befeb608724acee4743e0920639a70f35f1637a88b8b6ea349c6240c5d0521966771808950f796c9c04088bc9551a828b64f1cf06831705dfbc835e66ffff7f2000000000").unwrap()).unwrap(); + let new_block_4: Header = deserialize(&hex::decode("00000020d2a1c6ba2be393f405fe2f4574565f9ee38ac68d264872fcd82b030970d0232ce882eb47c3dd138587120f1ad97dd0e73d1e30b79559ad516cb131f83dcb87e9bc835e66ffff7f2002000000").unwrap()).unwrap(); + let mut chain = BlockTree::from_genesis(Network::Regtest); + let 
accept_1 = chain.accept_header(block_1); + assert!(matches!( + accept_1, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + let accept_2 = chain.accept_header(block_2); + assert!(matches!( + accept_2, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + let accept_3 = chain.accept_header(block_3); + assert!(matches!( + accept_3, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + let accept_4 = chain.accept_header(block_4); + assert!(matches!( + accept_4, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + assert_eq!(chain.height(), 4); + // Create a new fork + let accept_new_3 = chain.accept_header(new_block_3); + assert!(matches!( + accept_new_3, + AcceptHeaderChanges::ExtendedFork { connected_at: _ } + )); + assert_eq!(chain.height(), 4); + assert_eq!(chain.header_at_height(3), Some(block_3)); + // Advertise the same fork + let accept_new_3 = chain.accept_header(new_block_3); + assert!(matches!(accept_new_3, AcceptHeaderChanges::Duplicate)); + assert_eq!(chain.height(), 4); + // Extend the fork, but do not switch to it + let accept_new_4 = chain.accept_header(new_block_4); + assert!(matches!( + accept_new_4, + AcceptHeaderChanges::ExtendedFork { connected_at: _ } + )); + assert_eq!(chain.height(), 4); + assert_eq!(chain.header_at_height(4), Some(block_4)); + assert_eq!(chain.header_at_height(3), Some(block_3)); + assert_eq!(chain.header_at_height(2), Some(block_2)); + assert_eq!(chain.header_at_height(1), Some(block_1)); + } + + #[test] + fn test_reorg_to_root() { + let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb57241f855e66ffff7f2000000000").unwrap()).unwrap(); + let block_2: Header = deserialize(&hex::decode("00000020c81cedd6a989939936f31448e49d010a13c2e750acf02d3fa73c9c7ecfb9476e798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c420855e66ffff7f2000000000").unwrap()).unwrap(); + // 
batch_1 = vec![block_1, block_2]; + let new_block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb5724d5855e66ffff7f2004000000").unwrap()).unwrap(); + let new_block_2: Header = deserialize(&hex::decode("00000020d1d80f53343a084bd0da6d6ab846f9fe4a133de051ea00e7cae16ed19f601065798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c4d6855e66ffff7f2000000000").unwrap()).unwrap(); + // fork = vec![new_block_1, new_block_2]; + let block_3: Header = deserialize(&hex::decode("0000002080f38c14e898d6646dd426428472888966e0d279d86453f42edc56fdb143241aa66c8fa8837d95be3f85d53f22e86a0d6d456b1ab348e073da4d42a39f50637423865e66ffff7f2000000000").unwrap()).unwrap(); + let block_4: Header = deserialize(&hex::decode("000000204877fed370af64c0a1f7a76f6944e1127aad965b1865f99ecfdf8fa72ae23377f51921d01ff1131bd589500a8ca142884297ceeb1aa762ad727249e9a23f2cb023865e66ffff7f2000000000").unwrap()).unwrap(); + // batch_2 = vec![block_3, block_4]; + let mut chain = BlockTree::from_genesis(Network::Regtest); + let accept_1 = chain.accept_header(block_1); + assert!(matches!( + accept_1, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + let accept_2 = chain.accept_header(block_2); + assert!(matches!( + accept_2, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + assert_eq!(chain.height(), 2); + let fork_1 = chain.accept_header(new_block_1); + assert!(matches!( + fork_1, + AcceptHeaderChanges::ExtendedFork { connected_at: _ } + )); + let fork_2 = chain.accept_header(new_block_2); + assert!(matches!( + fork_2, + AcceptHeaderChanges::ExtendedFork { connected_at: _ } + )); + let reorg_1 = chain.accept_header(block_3); + match reorg_1 { + AcceptHeaderChanges::Reorganization { + accepted, + disconnected, + } => { + assert_eq!( + accepted, + vec![ + IndexedHeader::new(3, block_3), + IndexedHeader::new(2, new_block_2), + IndexedHeader::new(1, new_block_1), + ] + 
); + assert_eq!( + disconnected, + vec![ + IndexedHeader::new(2, block_2), + IndexedHeader::new(1, block_1), + ] + ); + } + _ => panic!("reorganization should have been accepted"), + } + let accept_4 = chain.accept_header(block_4); + assert!(matches!( + accept_4, + AcceptHeaderChanges::Accepted { connected_at: _ } + )); + } +} diff --git a/src/chain/header_chain.rs b/src/chain/header_chain.rs index 72047bf5..02e80afa 100644 --- a/src/chain/header_chain.rs +++ b/src/chain/header_chain.rs @@ -120,16 +120,6 @@ impl HeaderChain { } } - // Human readable chainwork - pub(crate) fn log2_work(&self) -> f64 { - let work = self - .headers - .values() - .map(|header| header.work().log2()) - .reduce(|acc, next| acc + next); - work.unwrap_or(0.0) - } - // The block locators are a way to inform our peer of blocks we know about pub(crate) fn locators(&mut self) -> Vec { let mut locators = Vec::new(); diff --git a/src/chain/mod.rs b/src/chain/mod.rs index dde9c609..2f1b0835 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -9,15 +9,30 @@ pub mod checkpoints; /// Errors associated with the blockchain representation. 
#[allow(dead_code)] pub(crate) mod error; +pub(crate) mod graph; pub(crate) mod header_batch; pub(crate) mod header_chain; use std::collections::HashMap; +use bitcoin::block::Header; + use crate::network::PeerId; type Height = u32; +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct IndexedHeader { + pub height: u32, + pub header: Header, +} + +impl IndexedHeader { + pub(crate) fn new(height: u32, header: Header) -> Self { + Self { height, header } + } +} + #[derive(Debug)] pub(crate) struct HeightMonitor { map: HashMap, diff --git a/src/prelude.rs b/src/prelude.rs index e58209eb..a73e41ad 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,6 +1,6 @@ use core::{future::Future, pin::Pin}; -use bitcoin::{hex::DisplayHex, p2p::address::AddrV2, Network}; +use bitcoin::{hex::DisplayHex, p2p::address::AddrV2, Network, Work}; #[allow(dead_code)] pub const MAX_FUTURE_BLOCK_TIME: i64 = 60 * 60 * 2; @@ -105,6 +105,16 @@ pub(crate) fn encode_qname(domain: &str) -> Vec { qname } +pub(crate) trait ZerolikeExt { + fn zero() -> Self; +} + +impl ZerolikeExt for Work { + fn zero() -> Self { + Self::from_be_bytes([0; 32]) + } +} + #[cfg(test)] mod tests { use super::Median; From 3fbb7d4f62ea3d40b866e1257a75e238da9fa076 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 4 Apr 2025 13:02:00 +0200 Subject: [PATCH 2/2] refactor(chain): use `BlockTree` as header chain implementation --- src/chain/chain.rs | 692 +++++++++++--------------------------- src/chain/error.rs | 8 - src/chain/graph.rs | 44 ++- src/chain/header_batch.rs | 25 +- src/chain/header_chain.rs | 404 ---------------------- src/chain/mod.rs | 4 +- src/db/mod.rs | 16 + src/db/sqlite/headers.rs | 139 ++++---- src/db/traits.rs | 32 +- src/lib.rs | 15 - src/messages.rs | 6 +- src/node.rs | 27 +- 12 files changed, 347 insertions(+), 1065 deletions(-) delete mode 100644 src/chain/header_chain.rs diff --git a/src/chain/chain.rs b/src/chain/chain.rs index 3610288f..282e17a1 100644 --- 
a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -8,7 +8,7 @@ use std::{ use bitcoin::{ block::Header, p2p::message_filter::{CFHeaders, CFilter, GetCFHeaders, GetCFilters}, - Block, BlockHash, CompactTarget, Network, ScriptBuf, TxOut, + Block, BlockHash, Network, ScriptBuf, TxOut, }; use tokio::sync::Mutex; @@ -16,7 +16,7 @@ use super::{ block_queue::BlockQueue, checkpoints::{HeaderCheckpoint, HeaderCheckpoints}, error::{BlockScanError, HeaderSyncError}, - header_chain::HeaderChain, + graph::{AcceptHeaderChanges, BlockTree, HeaderRejection, Tip}, HeightMonitor, }; #[cfg(feature = "filter-control")] @@ -27,7 +27,9 @@ use crate::messages::BlockRequest; use crate::IndexedFilter; use crate::{ chain::header_batch::HeadersBatch, - db::traits::HeaderStore, + db::{traits::HeaderStore, BlockHeaderChanges}, + dialog::Dialog, + error::HeaderPersistenceError, filters::{ cfheader_batch::CFHeaderBatch, cfheader_chain::{AppendAttempt, CFHeaderChain, QueuedCFHeader}, @@ -35,22 +37,16 @@ use crate::{ filter_chain::FilterChain, Filter, CF_HEADER_BATCH_SIZE, FILTER_BATCH_SIZE, }, + messages::{Event, Warning}, IndexedBlock, - { - dialog::Dialog, - error::HeaderPersistenceError, - messages::{Event, Warning}, - }, }; -const MAX_REORG_DEPTH: u32 = 5_000; const REORG_LOOKBACK: u32 = 7; -const MAX_HEADER_SIZE: usize = 20_000; const FILTER_BASIC: u8 = 0x00; #[derive(Debug)] pub(crate) struct Chain { - header_chain: HeaderChain, + pub(crate) header_chain: BlockTree, cf_header_chain: CFHeaderChain, filter_chain: FilterChain, checkpoints: HeaderCheckpoints, @@ -74,7 +70,7 @@ impl Chain { db: H, quorum_required: usize, ) -> Self { - let header_chain = HeaderChain::new(anchor); + let header_chain = BlockTree::new(anchor, network); let cf_header_chain = CFHeaderChain::new(anchor, quorum_required); let filter_chain = FilterChain::new(anchor); Chain { @@ -91,21 +87,6 @@ impl Chain { } } - // Top of the chain - pub(crate) fn tip(&self) -> BlockHash { - self.header_chain.tip() - } - - // The 
canoncial height of the chain, one less than the length - pub(crate) fn height(&self) -> u32 { - self.header_chain.height() - } - - // This header chain contains a block hash in memory - pub(crate) fn contains_hash(&self, blockhash: BlockHash) -> bool { - self.header_chain.contains_hash(blockhash) - } - // This header chain contains a block hash, potentially checking the disk pub(crate) async fn height_of_hash(&self, blockhash: BlockHash) -> Option { match self.header_chain.height_of_hash(blockhash) { @@ -118,7 +99,7 @@ impl Chain { } // This header chain contains a block hash in memory - pub(crate) fn cached_header_at_height(&self, height: u32) -> Option<&Header> { + pub(crate) fn cached_header_at_height(&self, height: u32) -> Option
{ self.header_chain.header_at_height(height) } @@ -128,7 +109,7 @@ impl Chain { height: u32, ) -> Result, HeaderPersistenceError> { match self.header_chain.header_at_height(height) { - Some(header) => Ok(Some(*header)), + Some(header) => Ok(Some(header)), None => { let mut db = self.db.lock().await; let header_opt = db.header_at(height).await; @@ -159,11 +140,6 @@ impl Chain { } } - // This header chain contains a block hash - pub(crate) fn contains_header(&self, header: &Header) -> bool { - self.header_chain.contains_header(header) - } - // Have we hit the known checkpoints pub(crate) fn checkpoints_complete(&self) -> bool { self.checkpoints.is_exhausted() @@ -171,7 +147,11 @@ impl Chain { // The last ten heights and headers in the chain pub(crate) fn last_ten(&self) -> BTreeMap { - self.header_chain.last_ten() + self.header_chain + .iter() + .take(10) + .map(|index| (index.height, index.header)) + .collect() } // Do we have best known height and is our height equal to it @@ -180,7 +160,7 @@ impl Chain { pub(crate) async fn is_synced(&self) -> bool { let height_lock = self.heights.lock().await; match height_lock.max() { - Some(peer_max) => self.height() >= peer_max, + Some(peer_max) => self.header_chain.height() >= peer_max, None => false, } } @@ -189,17 +169,17 @@ impl Chain { pub(crate) async fn locators(&mut self) -> Vec { // If a peer is sending us a fork at this point they are faulty. if !self.checkpoints_complete() { - vec![self.tip()] + vec![self.header_chain.tip_hash()] } else { // We should try to catch any reorgs if we are on a fresh start. // The database may have a header that is useful to the remote node // that is not currently in memory. 
- if self.header_chain.inner_len() < REORG_LOOKBACK as usize { - let older_locator = self.height().saturating_sub(REORG_LOOKBACK); + if self.header_chain.internally_cached_headers() < REORG_LOOKBACK as usize { + let older_locator = self.header_chain.height().saturating_sub(REORG_LOOKBACK); let mut db_lock = self.db.lock().await; let hash = db_lock.hash_at(older_locator).await; if let Ok(Some(locator)) = hash { - vec![self.tip(), locator] + vec![self.header_chain.tip_hash(), locator] } else { // We couldn't find a header deep enough to send over. Just proceed as usual self.header_chain.locators() @@ -212,29 +192,8 @@ impl Chain { } // Write the chain to disk - pub(crate) async fn flush_to_disk(&mut self) { - if let Err(e) = self - .db - .lock() - .await - .write(self.header_chain.headers()) - .await - { - self.dialog.send_warning(Warning::FailedPersistence { - warning: format!("Could not save headers to disk: {e}"), - }); - } - } - - // Write the chain to disk, overriding previous heights - pub(crate) async fn flush_over_height(&mut self, height: u32) { - if let Err(e) = self - .db - .lock() - .await - .write_over(self.header_chain.headers(), height) - .await - { + pub(crate) async fn write_changes(&mut self, changes: BlockHeaderChanges) { + if let Err(e) = self.db.lock().await.write(changes).await { self.dialog.send_warning(Warning::FailedPersistence { warning: format!("Could not save headers to disk: {e}"), }); @@ -247,87 +206,156 @@ impl Chain { .db .lock() .await - .load(self.height() + 1..) + .load(self.header_chain.height() + 1..) 
.await .map_err(HeaderPersistenceError::Database)?; - if let Some(first) = loaded_headers.values().next() { - if first.prev_blockhash.ne(&self.tip()) { - self.dialog.send_warning(Warning::InvalidStartHeight); - // The header chain did not align, so just start from the anchor - return Err(HeaderPersistenceError::CannotLocateHistory); - } else if loaded_headers - .iter() - .zip(loaded_headers.iter().skip(1)) - .any(|(first, second)| first.1.block_hash().ne(&second.1.prev_blockhash)) - { - self.dialog.send_warning(Warning::CorruptedHeaders); - return Err(HeaderPersistenceError::HeadersDoNotLink); - } - loaded_headers.iter().for_each(|header| { - if let Some(checkpoint) = self.checkpoints.next() { - if header.1.block_hash().eq(&checkpoint.hash) { - self.checkpoints.advance() + for (height, header) in loaded_headers { + let apply_header_changes = self.header_chain.accept_header(header); + match apply_header_changes { + AcceptHeaderChanges::Accepted { connected_at } => { + if height.ne(&connected_at.height) { + self.dialog.send_warning(Warning::CorruptedHeaders); + return Err(HeaderPersistenceError::HeadersDoNotLink); + } + if let Some(checkpoint) = self.checkpoints.next() { + if connected_at.header.block_hash().eq(&checkpoint.hash) { + self.checkpoints.advance() + } } } - }) + AcceptHeaderChanges::Rejected(reject_reason) => match reject_reason { + HeaderRejection::UnknownPrevHash(_) => { + return Err(HeaderPersistenceError::CannotLocateHistory); + } + HeaderRejection::InvalidPow { expected, got } => { + crate::log!( + self.dialog, + format!( + "Unexpected invalid proof of work when importing a block header. 
expected {}, got: {}", + expected.to_consensus(), + got.to_consensus() + ) + ); + } + }, + _ => (), + } } - self.header_chain.set_headers(loaded_headers); Ok(()) } - // If the number of headers in memory gets too large, move some of them to the disk - pub(crate) async fn manage_memory(&mut self) { - if self.header_chain.inner_len() > MAX_HEADER_SIZE { - self.flush_to_disk().await; - self.header_chain.move_up(); - } - } - // Sync the chain with headers from a peer, adjusting to reorgs if needed pub(crate) async fn sync_chain(&mut self, message: Vec
) -> Result<(), HeaderSyncError> { let header_batch = HeadersBatch::new(message).map_err(|_| HeaderSyncError::EmptyMessage)?; // If our chain already has the last header in the message there is no new information - if self.contains_hash(header_batch.last().block_hash()) { + if self.header_chain.contains(header_batch.last().block_hash()) { return Ok(()); } // We check first if the peer is sending us nonsense self.sanity_check(&header_batch)?; - // How we handle forks depends on if we are caught up through all checkpoints or not - match self.checkpoints.next().cloned() { - Some(checkpoint) => { - self.catch_up_sync(header_batch, checkpoint).await?; - } - None => { - // Nothing left to do but add the headers to the chain - if self.tip().eq(&header_batch.first().prev_blockhash) { - self.audit_difficulty(self.height(), &header_batch).await?; - self.header_chain.extend(header_batch.inner()); - return Ok(()); + let next_checkpoint = self.checkpoints.next().copied(); + for header in header_batch.into_iter() { + let changes = self.header_chain.accept_header(header); + match changes { + AcceptHeaderChanges::Accepted { connected_at } => { + crate::log!( + self.dialog, + format!( + "Chain updated {} -> {}", + connected_at.height, + connected_at.header.block_hash() + ) + ); + self.write_changes(BlockHeaderChanges::Connected(connected_at)) + .await; + if let Some(checkpoint) = next_checkpoint { + if connected_at.height.eq(&checkpoint.height) { + if connected_at.header.block_hash().eq(&checkpoint.hash) { + crate::log!( + self.dialog, + format!("Found checkpoint, height: {}", checkpoint.height) + ); + self.checkpoints.advance(); + } else { + self.dialog + .send_warning( + Warning::UnexpectedSyncError { warning: "Unmatched checkpoint sent by a peer. Restarting header sync with new peers.".into() } + ); + return Err(HeaderSyncError::InvalidCheckpoint); + } + } + } } - // We see if we have this previous hash in the database, and reload our - // chain from that hash if so. 
- let fork_start_hash = header_batch.first().prev_blockhash; - if !self.contains_hash(fork_start_hash) { - self.load_fork(&header_batch).await?; + AcceptHeaderChanges::Duplicate => (), + AcceptHeaderChanges::ExtendedFork { connected_at } => match next_checkpoint { + Some(_checkpoint_expected) => { + crate::log!(self.dialog, "Detected fork before known checkpoint"); + self.dialog.send_warning(Warning::UnexpectedSyncError { + warning: "Pre-checkpoint fork".into(), + }); + } + None => { + crate::log!( + self.dialog, + format!("Fork created or extended {}", connected_at.height) + ) + } + }, + AcceptHeaderChanges::Reorganization { + accepted, + disconnected, + } => { + crate::log!(self.dialog, "Valid reorganization found"); + self.clear_compact_filter_queue(); + let removed_hashes: Vec = disconnected + .iter() + .map(|index| index.header.block_hash()) + .collect(); + self.cf_header_chain.remove(&removed_hashes); + self.filter_chain.remove(&removed_hashes); + self.block_queue.remove(&removed_hashes); + self.write_changes(BlockHeaderChanges::Reorganized { + accepted, + reorganized: disconnected.clone(), + }) + .await; + let disconnected_event = + Event::BlocksDisconnected(disconnected.into_iter().rev().collect()); + self.dialog.send_event(disconnected_event); } - // Check if the fork has more work. 
- self.evaluate_fork(&header_batch).await?; + AcceptHeaderChanges::Rejected(rejected_header) => match rejected_header { + HeaderRejection::InvalidPow { + expected: _, + got: _, + } => return Err(HeaderSyncError::InvalidBits), + HeaderRejection::UnknownPrevHash(hash) => { + let mut db = self.db.lock().await; + let header_res = db.height_of(&hash).await.ok().flatten(); + match header_res { + Some(height) => { + let tip = Tip::from_checkpoint(height, hash); + self.header_chain = BlockTree::new(tip, self.network); + drop(db); + if let Err(e) = self.load_headers().await { + crate::log!(self.dialog, + "Failure when attempting to fetch previous headers while syncing" + ); + self.dialog.send_warning(Warning::FailedPersistence { + warning: format!("Persistence failure: {e}"), + }); + } + } + None => return Err(HeaderSyncError::FloatingHeaders), + } + } + }, } - }; - self.manage_memory().await; + } Ok(()) } // These are invariants in all batches of headers we receive fn sanity_check(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> { - let initially_syncing = !self.checkpoints.is_exhausted(); - // Some basic sanity checks that should result in peer bans on errors - - // If we aren't synced up to the checkpoints we don't accept any forks - if initially_syncing && self.tip().ne(&header_batch.first().prev_blockhash) { - return Err(HeaderSyncError::PreCheckpointFork); - } - // All the headers connect with each other and is the difficulty adjustment not absurd if !header_batch.connected() { return Err(HeaderSyncError::HeadersNotConnected); @@ -345,228 +373,6 @@ impl Chain { Ok(()) } - /// Sync with extra requirements on checkpoints and forks - async fn catch_up_sync( - &mut self, - header_batch: HeadersBatch, - checkpoint: HeaderCheckpoint, - ) -> Result<(), HeaderSyncError> { - self.audit_difficulty(self.height(), &header_batch).await?; - // Eagerly append the batch to the chain - self.header_chain.extend(header_batch.inner()); - // We need to check a 
hard-coded checkpoint - if self.height().ge(&checkpoint.height) { - if self - .blockhash_at_height(checkpoint.height) - .await - .ok_or(HeaderSyncError::InvalidCheckpoint)? - .eq(&checkpoint.hash) - { - crate::log!( - self.dialog, - format!("Found checkpoint, height: {}", checkpoint.height) - ); - crate::log!(self.dialog, "Writing progress to disk..."); - self.checkpoints.advance(); - self.flush_to_disk().await; - } else { - self.dialog - .send_warning( - Warning::UnexpectedSyncError { warning: "Unmatched checkpoint sent by a peer. Restarting header sync with new peers.".into() } - ); - return Err(HeaderSyncError::InvalidCheckpoint); - } - } - Ok(()) - } - - // Audit the difficulty adjustment of the blocks we received - - // This function draws from the neutrino implemention, where even if a fork is valid - // we only accept it if there is more work provided. otherwise, we disconnect the peer sending - // us this fork - async fn evaluate_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> { - self.dialog.send_warning(Warning::EvaluatingFork); - // We only care about the headers these two chains do not have in common - let uncommon: Vec
= header_batch - .inner() - .iter() - .filter(|header| !self.contains_header(header)) - .copied() - .collect(); - let challenge_chainwork = uncommon - .iter() - .map(|header| header.work()) - .reduce(|acc, next| acc + next) - .ok_or(HeaderSyncError::FloatingHeaders)?; - let stem_position = self - .height_of_hash( - uncommon - .first() - .ok_or(HeaderSyncError::FloatingHeaders)? - .prev_blockhash, - ) - .await; - if let Some(stem) = stem_position { - let current_chainwork = self.header_chain.chainwork_after_height(stem); - if current_chainwork.lt(&challenge_chainwork) { - crate::log!(self.dialog, "Valid reorganization found"); - let reorged = self.header_chain.extend(&uncommon); - let removed_hashes = &reorged - .iter() - .map(|disconnect| disconnect.header.block_hash()) - .collect::>(); - self.clear_compact_filter_queue(); - self.cf_header_chain.remove(removed_hashes); - self.filter_chain.remove(removed_hashes); - self.block_queue.remove(removed_hashes); - self.dialog.send_event(Event::BlocksDisconnected(reorged)); - self.flush_over_height(stem).await; - Ok(()) - } else { - self.dialog.send_warning(Warning::UnexpectedSyncError { - warning: "Peer sent us a fork with less work than the current chain".into(), - }); - Err(HeaderSyncError::LessWorkFork) - } - } else { - Err(HeaderSyncError::FloatingHeaders) - } - } - - async fn audit_difficulty( - &mut self, - height_start: u32, - batch: &HeadersBatch, - ) -> Result<(), HeaderSyncError> { - let params = self.network.params(); - if params.no_pow_retargeting { - return Ok(()); - } - if params.allow_min_difficulty_blocks { - return Ok(()); - } - // Next adjustment height = (floor(current height / interval) + 1) * interval - let adjustment_interval = params.difficulty_adjustment_interval() as u32; - let next_multiple = (height_start / adjustment_interval) + 1; - let next_adjustment_height = next_multiple * adjustment_interval; - // The height in the batch where the next adjustment is contained - let offset = 
next_adjustment_height - height_start; - // We already audited the difficulty last batch - if offset == 0 { - return Ok(()); - } - // We cannot audit the difficulty yet, as the next adjustment will be contained in the next batch - if offset > batch.len() as u32 { - if let Some(tip) = self.cached_header_at_height(height_start) { - if batch.inner().iter().any(|header| header.bits.ne(&tip.bits)) { - self.dialog - .send_warning(Warning::UnexpectedSyncError { - warning: - "The remote peer miscalculated the difficulty adjustment when syncing a batch of headers" - .into(), - }); - return Err(HeaderSyncError::MiscalculatedDifficulty); - } - } - return Ok(()); - } - // The difficulty should be adjusted at this height - let audit_index = (offset - 1) as usize; - // This is the timestamp used to start the boundary - let last_epoch_start_index = next_adjustment_height - adjustment_interval; - // This is the timestamp used to end the boundary - let last_epoch_boundary = if offset == 1 { - // This is the case where the last epoch ends on the tip of our chain - self.fetch_header(height_start).await.ok().flatten() - } else { - // Otherwise we can simply index into the batch and find the header at the boundary' - let last_epoch_boundary_index = (offset - 2) as usize; - batch.get(last_epoch_boundary_index).copied() - }; - // The start of the epoch will always be a member of our chain because the batch size - // is less than the adjustment interval - let last_epoch_start = self - .fetch_header(last_epoch_start_index) - .await - .ok() - .flatten(); - - let audit = batch.get_slice(audit_index..); - - match audit { - Some(headers) => match last_epoch_start.zip(last_epoch_boundary) { - Some((first, second)) => { - let target = - CompactTarget::from_header_difficulty_adjustment(first, second, params); - for header in headers { - let retarget_bits = header.bits; - if retarget_bits.ne(&target) { - self.dialog - .send_warning(Warning::UnexpectedSyncError { - warning: - "The remote peer 
miscalculated the difficulty adjustment when syncing a batch of headers" - .into(), - }); - return Err(HeaderSyncError::MiscalculatedDifficulty); - } - } - return Ok(()); - } - None => { - crate::log!( - self.dialog, - "Unable to audit difficulty. This is likely due to no history present in the header store" - ); - } - }, - None => { - self.dialog.send_warning(Warning::UnexpectedSyncError { - warning: "Unable to audit the difficulty adjustment due to an index overflow" - .into(), - }); - } - } - Ok(()) - } - - // We don't have a header in memory that we need to evaluate a fork. - // We check if we have it on disk, and load some more headers into memory. - // This call occurs if we sync to a block that is later reorganized out of the chain, - // but we have restarted our node in between these events. - async fn load_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> { - let prev_hash = header_batch.first().prev_blockhash; - let maybe_height = { - let mut db_lock = self.db.lock().await; - db_lock - .height_of(&prev_hash) - .await - .map_err(|_| HeaderSyncError::DbError)? - }; - match maybe_height { - Some(height) => { - // This is a very generous check to ensure a peer cannot get us to load an - // absurd amount of headers into RAM. Because headers come in batches of 2,000, - // we wouldn't accept a fork of a depth more than around 2,000 anyway. - // The only reorgs that have ever been recorded are of depth 1. 
- if self.height() - height > MAX_REORG_DEPTH { - return Err(HeaderSyncError::FloatingHeaders); - } else { - let older_anchor = HeaderCheckpoint::new(height, prev_hash); - self.header_chain = HeaderChain::new(older_anchor); - self.cf_header_chain = - CFHeaderChain::new(older_anchor, self.cf_header_chain.quorum_required()); - self.filter_chain = FilterChain::new(older_anchor); - } - } - None => return Err(HeaderSyncError::FloatingHeaders), - } - self.load_headers() - .await - .map_err(|_| HeaderSyncError::DbError)?; - Ok(()) - } - // Sync the compact filter headers, possibly encountering conflicts pub(crate) async fn sync_cf_headers( &mut self, @@ -577,10 +383,10 @@ impl Chain { let peer_max = self.heights.lock().await.max(); self.dialog .chain_update( - self.height(), + self.header_chain.height(), self.cf_header_chain.height(), self.filter_chain.height(), - peer_max.unwrap_or(self.height()), + peer_max.unwrap_or(self.header_chain.height()), ) .await; match batch.last_header() { @@ -659,7 +465,7 @@ impl Chain { let stop_hash = self .blockhash_at_height(stop_hash_index) .await - .unwrap_or(self.tip()); + .unwrap_or(self.header_chain.tip_hash()); self.cf_header_chain.set_last_stop_hash(stop_hash); GetCFHeaders { filter_type: FILTER_BASIC, @@ -670,7 +476,9 @@ impl Chain { // Are the compact filter headers caught up to the header chain pub(crate) fn is_cf_headers_synced(&self) -> bool { - self.height().le(&self.cf_header_chain.height()) + self.header_chain + .height() + .le(&self.cf_header_chain.height()) } // Handle a new filter @@ -741,14 +549,14 @@ impl Chain { let stop_hash = self .blockhash_at_height(stop_hash_index) .await - .unwrap_or(self.tip()); + .unwrap_or(self.header_chain.tip_hash()); let peer_max = self.heights.lock().await.max(); self.dialog .chain_update( - self.height(), + self.header_chain.height(), self.cf_header_chain.height(), self.filter_chain.height(), - peer_max.unwrap_or(self.height()), + peer_max.unwrap_or(self.header_chain.height()), ) .await; 
self.filter_chain.set_last_stop_hash(stop_hash); @@ -761,7 +569,7 @@ impl Chain { // Are we synced with filters pub(crate) fn is_filters_synced(&self) -> bool { - self.height().le(&self.filter_chain.height()) + self.header_chain.height().le(&self.filter_chain.height()) } // Pop a block from the queue of interesting blocks @@ -876,10 +684,7 @@ mod tests { use tokio::sync::Mutex; use crate::{ - chain::{ - checkpoints::{HeaderCheckpoint, HeaderCheckpoints}, - error::HeaderSyncError, - }, + chain::checkpoints::{HeaderCheckpoint, HeaderCheckpoints}, filters::cfheader_chain::AppendAttempt, { dialog::Dialog, @@ -916,66 +721,6 @@ mod tests { ) } - #[tokio::test] - async fn test_depth_one_fork() { - let gen = HeaderCheckpoint::new( - 7, - BlockHash::from_str("62c28f380692524a3a8f1fc66252bc0eb31d6b6a127d2263bdcbee172529fe16") - .unwrap(), - ); - let height_monitor = Arc::new(Mutex::new(HeightMonitor::new())); - let mut chain = new_regtest(gen, height_monitor, 1); - let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); - let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); - let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); - let batch_1 = vec![block_8, block_9, block_10]; - let new_block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093792151c0e9ce4e4c789ca98427d7740cc7acf30d2ca0c08baef266bf152289d814567e5e66ffff7f2001000000").unwrap()).unwrap(); - let block_11: Header = 
deserialize(&hex::decode("00000020efcf8b12221fccc735b9b0b657ce15b31b9c50aff530ce96a5b4cfe02d8c0068496c1b8a89cf5dec22e46c35ea1035f80f5b666a1b3aa7f3d6f0880d0061adcc567e5e66ffff7f2001000000").unwrap()).unwrap(); - let batch_2 = vec![new_block_10]; - let batch_3 = vec![block_11]; - let batch_4 = vec![block_9, new_block_10, block_11]; - let chain_sync = chain.sync_chain(batch_1).await; - assert!(chain_sync.is_ok()); - // Forks of equal height to the chain should just get rejected - let fork_sync = chain.sync_chain(batch_2).await; - assert!(fork_sync.is_err()); - assert_eq!(fork_sync.err().unwrap(), HeaderSyncError::LessWorkFork); - assert_eq!(10, chain.height()); - // A peer sent us a block we don't know about yet, but is in the best chain - // Best we can do is wait to get the fork from another peer - let float_sync = chain.sync_chain(batch_3).await; - assert!(float_sync.is_err()); - assert_eq!(float_sync.err().unwrap(), HeaderSyncError::FloatingHeaders); - assert_eq!(10, chain.height()); - // Now we can accept the fork because it has more work - let extend_sync = chain.sync_chain(batch_4.clone()).await; - assert_eq!(11, chain.height()); - assert!(extend_sync.is_ok()); - assert_eq!( - vec![block_8, block_9, new_block_10, block_11], - chain - .header_chain - .headers() - .values() - .copied() - .collect::>() - ); - assert_eq!(chain.fetch_header(10).await.unwrap().unwrap(), new_block_10); - // A new peer sending us these headers should not do anything - let dup_sync = chain.sync_chain(batch_4).await; - assert_eq!(11, chain.height()); - assert!(dup_sync.is_ok()); - assert_eq!( - vec![block_8, block_9, new_block_10, block_11], - chain - .header_chain - .headers() - .values() - .copied() - .collect::>() - ); - } - #[tokio::test] async fn test_fork_includes_old_vals() { let gen = HeaderCheckpoint::new( @@ -994,81 +739,26 @@ mod tests { let batch_2 = vec![block_1, block_2, new_block_3, block_4]; let chain_sync = chain.sync_chain(batch_1).await; assert!(chain_sync.is_ok()); - 
assert_eq!(chain.height(), 3); - assert_eq!( - chain - .header_chain - .headers() - .values() - .copied() - .collect::>(), - vec![block_1, block_2, block_3] - ); - let chain_sync = chain.sync_chain(batch_2).await; - assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 4); - assert_eq!( - chain - .header_chain - .headers() - .values() - .copied() - .collect::>(), - vec![block_1, block_2, new_block_3, block_4] - ); - } - - #[tokio::test] - async fn test_depth_two_fork() { - let gen = HeaderCheckpoint::new( - 0, - BlockHash::from_str("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206") - .unwrap(), - ); - let height_monitor = Arc::new(Mutex::new(HeightMonitor::new())); - let mut chain = new_regtest(gen, height_monitor, 1); - let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb57241f855e66ffff7f2000000000").unwrap()).unwrap(); - let block_2: Header = deserialize(&hex::decode("00000020c81cedd6a989939936f31448e49d010a13c2e750acf02d3fa73c9c7ecfb9476e798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c420855e66ffff7f2000000000").unwrap()).unwrap(); - let batch_1 = vec![block_1, block_2]; - let new_block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb5724d5855e66ffff7f2004000000").unwrap()).unwrap(); - let new_block_2: Header = deserialize(&hex::decode("00000020d1d80f53343a084bd0da6d6ab846f9fe4a133de051ea00e7cae16ed19f601065798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c4d6855e66ffff7f2000000000").unwrap()).unwrap(); - let block_3: Header = deserialize(&hex::decode("0000002080f38c14e898d6646dd426428472888966e0d279d86453f42edc56fdb143241aa66c8fa8837d95be3f85d53f22e86a0d6d456b1ab348e073da4d42a39f50637423865e66ffff7f2000000000").unwrap()).unwrap(); - let batch_2 = vec![new_block_1]; - 
let batch_3 = vec![new_block_1, new_block_2]; - let batch_4 = vec![new_block_1, new_block_2, block_3]; - let chain_sync = chain.sync_chain(batch_1).await; - assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2); - assert_eq!( - chain - .header_chain - .headers() - .values() - .copied() - .collect::>(), - vec![block_1, block_2] - ); + assert_eq!(chain.header_chain.height(), 3); + let mut index = 1; + for block in vec![block_1, block_2, block_3] { + assert_eq!( + chain.header_chain.block_hash_at_height(index).unwrap(), + block.into() + ); + index += 1; + } let chain_sync = chain.sync_chain(batch_2).await; - assert!(chain_sync.is_err()); - assert_eq!(chain_sync.err().unwrap(), HeaderSyncError::LessWorkFork); - assert_eq!(chain.height(), 2); - let chain_sync = chain.sync_chain(batch_3).await; - assert!(chain_sync.is_err()); - assert_eq!(chain_sync.err().unwrap(), HeaderSyncError::LessWorkFork); - assert_eq!(chain.height(), 2); - let chain_sync = chain.sync_chain(batch_4).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 3); - assert_eq!(chain.fetch_header(3).await.unwrap().unwrap(), block_3); - assert_eq!( - chain - .header_chain - .headers() - .values() - .copied() - .collect::>(), - vec![new_block_1, new_block_2, block_3] - ); + assert_eq!(chain.header_chain.height(), 4); + let mut index = 1; + for block in vec![block_1, block_2, new_block_3, block_4] { + assert_eq!( + chain.header_chain.block_hash_at_height(index).unwrap(), + block.into() + ); + index += 1; + } } #[tokio::test] @@ -1087,7 +777,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1170,7 +860,7 @@ mod tests { let header_batch = vec![block_1, block_2, 
block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1236,7 +926,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1317,7 +1007,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1409,7 +1099,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1497,7 +1187,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1523,7 +1213,7 @@ mod tests { let header_batch = vec![new_block_4, block_5]; 
let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2501); + assert_eq!(chain.header_chain.height(), 2501); chain.next_cf_header_message().await; let cf_headers = CFHeaders { filter_type: 0x00, @@ -1620,7 +1310,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1658,7 +1348,7 @@ mod tests { let header_batch = vec![new_block_4, block_5]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2501); + assert_eq!(chain.header_chain.height(), 2501); // Request the CF headers again chain.next_cf_header_message().await; let cf_headers = CFHeaders { @@ -1719,7 +1409,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1844,7 +1534,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1865,7 +1555,7 @@ mod tests { chain.next_cf_header_message().await; let chain_sync = chain.sync_chain(vec![block_5]).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2501); + 
assert_eq!(chain.header_chain.height(), 2501); assert!(chain.is_synced().await); chain.next_cf_header_message().await; let cf_headers = CFHeaders { @@ -1936,7 +1626,7 @@ mod tests { let header_batch = vec![block_1, block_2, block_3, block_4]; let chain_sync = chain.sync_chain(header_batch).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2500); + assert_eq!(chain.header_chain.height(), 2500); height_monitor.lock().await.insert(1.into(), 2500); assert!(chain.is_synced().await); let filter_1 = hex::decode("018976c0").unwrap(); @@ -1969,7 +1659,7 @@ mod tests { assert!(cf_header_sync_res.is_ok()); let chain_sync = chain.sync_chain(vec![block_5]).await; assert!(chain_sync.is_ok()); - assert_eq!(chain.height(), 2501); + assert_eq!(chain.header_chain.height(), 2501); chain.next_cf_header_message().await; let cf_headers = CFHeaders { filter_type: 0x00, diff --git a/src/chain/error.rs b/src/chain/error.rs index 26d36f6a..03eedc2a 100644 --- a/src/chain/error.rs +++ b/src/chain/error.rs @@ -8,12 +8,10 @@ pub(crate) enum HeaderSyncError { HeadersNotConnected, InvalidHeaderWork, InvalidHeaderTimes, - PreCheckpointFork, InvalidCheckpoint, MiscalculatedDifficulty, InvalidBits, FloatingHeaders, - LessWorkFork, DbError, } @@ -30,9 +28,6 @@ impl Display for HeaderSyncError { HeaderSyncError::InvalidHeaderTimes => { write!(f, "one or more headers does not have a valid block time.") } - HeaderSyncError::PreCheckpointFork => { - write!(f, "the sync peer sent us a discontinuous chain.") - } HeaderSyncError::InvalidCheckpoint => { write!(f, "a checkpoint in the chain did not match.") } @@ -43,9 +38,6 @@ impl Display for HeaderSyncError { f, "the peer sent us a chain that does not connect to any header of ours." 
), - HeaderSyncError::LessWorkFork => { - write!(f, "a peer sent us a fork with less work than our chain.") - } HeaderSyncError::DbError => write!(f, "the database could not load a fork."), HeaderSyncError::InvalidBits => write!( f, diff --git a/src/chain/graph.rs b/src/chain/graph.rs index cf7a56e2..841a42b2 100644 --- a/src/chain/graph.rs +++ b/src/chain/graph.rs @@ -9,6 +9,8 @@ use bitcoin::{ use super::IndexedHeader; +const LOCATOR_INDEX: &[u32] = &[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]; + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub(crate) struct Height(u32); @@ -28,7 +30,8 @@ impl Height { Self(self.0 + 1) } - fn checked_sub(&self, other: Height) -> Option { + fn checked_sub(&self, other: impl Into) -> Option { + let other = other.into(); let height_sub_checked = self.0.checked_sub(other.0); height_sub_checked.map(Self) } @@ -132,7 +135,7 @@ pub struct BlockTree { #[allow(unused)] impl BlockTree { - pub fn new(tip: impl Into, network: Network) -> Self { + pub(crate) fn new(tip: impl Into, network: Network) -> Self { let tip = tip.into(); Self { canonical_hashes: BTreeMap::new(), @@ -143,7 +146,7 @@ impl BlockTree { } } - pub fn from_genesis(network: Network) -> Self { + pub(crate) fn from_genesis(network: Network) -> Self { let genesis = genesis_block(network); let height = Height::new(0); let hash = genesis.block_hash(); @@ -166,7 +169,7 @@ impl BlockTree { } } - pub fn from_header(height: impl Into, header: Header, network: Network) -> Self { + pub(crate) fn from_header(height: impl Into, header: Header, network: Network) -> Self { let height = height.into(); let hash = header.block_hash(); let tip = Tip { @@ -186,7 +189,7 @@ impl BlockTree { } } - fn accept_header(&mut self, new_header: Header) -> AcceptHeaderChanges { + pub(crate) fn accept_header(&mut self, new_header: Header) -> AcceptHeaderChanges { let new_hash = new_header.block_hash(); let prev_hash = new_header.prev_blockhash; @@ -385,6 +388,9 @@ impl BlockTree { 
pub(crate) fn block_hash_at_height(&self, height: impl Into) -> Option { let height = height.into(); + if self.active_tip.height.eq(&height) { + return Some(self.active_tip.hash); + } self.canonical_hashes.get(&height).copied() } @@ -402,6 +408,14 @@ impl BlockTree { self.active_tip.height.to_u32() } + pub(crate) fn contains(&self, hash: BlockHash) -> bool { + self.headers.contains_key(&hash) || self.active_tip.hash.eq(&hash) + } + + pub(crate) fn tip_hash(&self) -> BlockHash { + self.active_tip.hash + } + pub(crate) fn filter_hash(&self, block_hash: BlockHash) -> Option { self.headers.get(&block_hash)?.filter_hash } @@ -412,6 +426,26 @@ impl BlockTree { self.headers.get(hash)?.filter_hash } + pub(crate) fn locators(&self) -> Vec { + let mut locators = Vec::new(); + locators.push(self.active_tip.hash); + for locator in LOCATOR_INDEX { + let height = self.active_tip.height.checked_sub(*locator); + match height { + Some(height) => match self.block_hash_at_height(height) { + Some(hash) => locators.push(hash), + None => return locators.into_iter().rev().collect(), + }, + None => return locators.into_iter().rev().collect(), + } + } + locators.into_iter().rev().collect() + } + + pub(crate) fn internally_cached_headers(&self) -> usize { + self.headers.len() + } + pub(crate) fn iter(&self) -> BlockTreeIterator { BlockTreeIterator { block_tree: self, diff --git a/src/chain/header_batch.rs b/src/chain/header_batch.rs index 667db881..556a7b23 100644 --- a/src/chain/header_batch.rs +++ b/src/chain/header_batch.rs @@ -1,5 +1,3 @@ -use std::ops::RangeFrom; - use bitcoin::{block::Header, params::Params, Target}; use crate::impl_sourceless_error; @@ -57,27 +55,8 @@ impl HeadersBatch { .expect("headers have at least one element by construction") } - // This should connect to the last header we have - pub(crate) fn first(&self) -> &Header { - self.batch - .first() - .expect("headers have at least one element by construction") - } - - pub(crate) fn len(&self) -> usize { - 
self.batch.len() - } - - pub(crate) fn get(&self, index: usize) -> Option<&Header> { - self.batch.get(index) - } - - pub(crate) fn get_slice(&self, index: RangeFrom) -> Option<&[Header]> { - self.batch.get(index) - } - - pub(crate) fn inner(&self) -> &[Header] { - &self.batch + pub(crate) fn into_iter(self) -> impl Iterator { + self.batch.into_iter() } } diff --git a/src/chain/header_chain.rs b/src/chain/header_chain.rs deleted file mode 100644 index 02e80afa..00000000 --- a/src/chain/header_chain.rs +++ /dev/null @@ -1,404 +0,0 @@ -use std::collections::BTreeMap; - -use bitcoin::{block::Header, BlockHash, Work}; - -use crate::DisconnectedHeader; - -use super::checkpoints::HeaderCheckpoint; - -type Height = u32; -pub(crate) type Headers = BTreeMap; - -const LOCATOR_LOOKBACKS: &[usize] = &[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]; -const MAX_LOOKBACK: usize = 1025; -const RESIZE: usize = 2001; - -#[derive(Debug)] -pub(crate) struct HeaderChain { - anchor_checkpoint: HeaderCheckpoint, - headers: Headers, -} - -impl HeaderChain { - pub(crate) fn new(checkpoint: HeaderCheckpoint) -> Self { - Self { - anchor_checkpoint: checkpoint, - headers: BTreeMap::new(), - } - } - - // Set the headers to those loaded from a database. - // Done separately, such that all asynchronous work is done - // when the node is running. 
- pub(crate) fn set_headers(&mut self, headers: Headers) { - self.headers = headers; - } - - // Top of the chain - pub(crate) fn tip(&self) -> BlockHash { - match self.headers.values().last() { - Some(header) => header.block_hash(), - None => self.anchor_checkpoint.hash, - } - } - - // The canoncial height of the chain, one less than the length - pub(crate) fn height(&self) -> Height { - self.headers.len() as Height + self.anchor_checkpoint.height - } - - // The length of the chain we have interally - pub(crate) fn inner_len(&self) -> usize { - self.headers().len() - } - - // All the headers of the chain - pub(crate) fn headers(&self) -> &Headers { - &self.headers - } - - // This header chain contains a block hash - pub(crate) fn contains_hash(&self, blockhash: BlockHash) -> bool { - self.headers - .values() - .any(|header| header.block_hash().eq(&blockhash)) - || blockhash.eq(&self.anchor_checkpoint.hash) - } - - // The height of the blockhash in the chain - pub(crate) fn height_of_hash(&self, blockhash: BlockHash) -> Option { - if blockhash.eq(&self.anchor_checkpoint.hash) { - return Some(self.anchor_checkpoint.height); - } - for (height, header) in self.headers.iter().rev() { - if header.block_hash().eq(&blockhash) { - return Some(*height); - } - } - None - } - - // This header chain contains a block hash - pub(crate) fn header_at_height(&self, height: Height) -> Option<&Header> { - self.headers.get(&height) - } - - // This header chain contains a block hash - pub(crate) fn contains_header(&self, other: &Header) -> bool { - self.headers.values().any(|header| header.eq(other)) - } - - // Compute the total work for the chain - fn get_chainwork(&self, headers: &Headers) -> Work { - let work = headers - .values() - .map(|header| header.work()) - .reduce(|acc, next| acc + next); - match work { - Some(w) => w, - None => Work::from_be_bytes([0; 32]), - } - } - - // Canoncial chainwork from the anchor checkpoint - pub(crate) fn chainwork(&self) -> Work { - 
self.get_chainwork(&self.headers) - } - - // Calculate the chainwork after a fork height to evalutate the fork - pub(crate) fn chainwork_after_height(&self, height: Height) -> Work { - let work = self - .headers - .iter() - .filter(|(h, _)| h.gt(&&height)) - .map(|(_, header)| header.work()) - .reduce(|acc, next| acc + next); - match work { - Some(work) => work, - // If the height is higher than the known chain, we don't have any work - None => Work::from_be_bytes([0; 32]), - } - } - - // The block locators are a way to inform our peer of blocks we know about - pub(crate) fn locators(&mut self) -> Vec { - let mut locators = Vec::new(); - let rev: Vec = self - .headers - .values() - .rev() - .take(MAX_LOOKBACK) - .map(|header| header.block_hash()) - .collect(); - locators.push(self.tip()); - for locator in LOCATOR_LOOKBACKS { - match rev.get(*locator) { - Some(hash) => locators.push(*hash), - None => break, - } - } - locators - } - - // The last ten heights and headers in chronological order - pub(crate) fn last_ten(&self) -> BTreeMap { - self.headers - .iter() - .rev() - .take(10) - .rev() - .map(|(height, header)| (*height, *header)) - .collect() - } - - // Remove old headers from the map to save memory, having saved them on disk - pub(crate) fn move_up(&mut self) { - let mut header_iter = self.headers.iter().rev().take(RESIZE).rev(); - if let Some((height, header)) = header_iter.nth(0) { - self.anchor_checkpoint = HeaderCheckpoint::new(*height, header.block_hash()); - self.headers = header_iter - .map(|(height, header)| (*height, *header)) - .collect(); - } - } - - // Extend the current chain, potentially rewriting history. 
Higher order functions should decide what we extend - pub(crate) fn extend(&mut self, batch: &[Header]) -> Vec { - let mut reorged = Vec::new(); - // We cannot extend from nothing - if batch.is_empty() { - return reorged; - } - // This is the usual case where the headers link - if self.tip().eq(&batch - .first() - .expect("cannot extend from empty batch") - .prev_blockhash) - { - let current_anchor = self.height(); - for (index, header) in batch.iter().enumerate() { - self.headers - .insert(current_anchor + 1 + index as Height, *header); - } - } else { - // Panic if we don't contain the hash. Something went wrong further up the call stack. - assert!(self.contains_hash( - batch - .first() - .expect("cannot extend from empty batch") - .prev_blockhash - )); - // Supporting MSRV of 1.63 requires this round-about way of removing the headers instead of `pop_last` - // Find the header that matches the new batch prev block hash, collecting the disconnected headers - for (height, header) in self.headers.iter().rev() { - if header.block_hash().ne(&batch - .first() - .expect("cannot extend from empty batch") - .prev_blockhash) - { - reorged.push(DisconnectedHeader::new(*height, *header)); - } else { - break; - } - } - // Actually remove anything that should no longer be connected - for removal in reorged.iter() { - self.remove(&removal.height); - } - // Add back the new headers, starting at the proper link - let current_anchor = self.height(); - for (index, header) in batch.iter().enumerate() { - self.headers - .insert(current_anchor + 1 + index as Height, *header); - } - } - reorged.into_iter().rev().collect() - } - - fn remove(&mut self, height: &Height) { - self.headers.remove(height); - } - - // Clear all the headers from our chain. 
Only to be used when a peer has feed us faulty checkpoints - #[allow(dead_code)] - pub(crate) fn clear_all(&mut self) { - self.headers.clear() - } -} - -#[cfg(test)] -mod tests { - use bitcoin::consensus::deserialize; - - use super::*; - use std::str::FromStr; - - #[test] - fn test_empty_chain() { - let chain = HeaderChain::new(HeaderCheckpoint::new( - 190_000, - BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2") - .unwrap(), - )); - assert_eq!(chain.chainwork(), Work::from_be_bytes([0; 32])); - assert_eq!( - chain.chainwork_after_height(189_999), - Work::from_be_bytes([0; 32]) - ); - assert_eq!(chain.height(), 190_000); - assert_eq!(chain.tip(), BlockHash::from_str( - "0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2", - ) - .unwrap()); - assert!( - chain.contains_hash( - BlockHash::from_str( - "0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2", - ) - .unwrap() - ) - ); - assert_eq!(chain.inner_len(), 0); - } - - #[test] - fn test_nonempty_chain() { - let block_190_001: Header = deserialize(&hex::decode("00000020f2aeab421cc0995fd40b44de2d07e95e26b3164383a37b0b36b743613a0100001f521e92a0bdc70b68554351f2a72c7476204f312bce7f084768b7054ad915f4f41110669d41011ec816bf00").unwrap()).unwrap(); - let block_190_002: Header = deserialize(&hex::decode("0000002060553eca679219d7c61307fd1e01922416ee1d2dfa09e2a5062aa08ef800000063b913b2d4fde38d46f01bac8b9b177ae5c23c938736dcafc31c3d10c282cc20ff1510669d41011e55671a00").unwrap()).unwrap(); - let block_190_003: Header = deserialize(&hex::decode("0000002042c5fa907f5d28affaa72b430f2732052d7a19f203be794fea39153e7e0000009c8705706dce105bbaf42a9a692d3bdcca1d7e34399e8cc7684700da439bf144291810669d41011ed0241501").unwrap()).unwrap(); - let batch_1 = vec![block_190_001]; - let batch_2 = vec![block_190_002, block_190_003]; - let mut chain = HeaderChain::new(HeaderCheckpoint::new( - 190_000, - BlockHash::from_str("0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2") 
- .unwrap(), - )); - let reorg = chain.extend(&batch_1); - assert!(reorg.is_empty()); - assert_eq!(chain.height(), 190_001); - assert_eq!(chain.inner_len(), 1); - assert_eq!(chain.chainwork(), block_190_001.work()); - assert_eq!(chain.header_at_height(190_001).unwrap(), &block_190_001); - assert_eq!(chain.chainwork_after_height(190_000), block_190_001.work()); - assert_eq!(chain.chainwork_after_height(189_999), block_190_001.work()); - assert_eq!( - chain.chainwork_after_height(190_001), - Work::from_be_bytes([0; 32]) - ); - let reorg = chain.extend(&batch_2); - assert!(reorg.is_empty()); - assert_eq!(chain.height(), 190_003); - assert_eq!(chain.inner_len(), 3); - assert_eq!(chain.header_at_height(190_003).unwrap(), &block_190_003); - assert_eq!( - chain.chainwork_after_height(190_001), - block_190_002.work() + block_190_003.work() - ); - assert_eq!(chain.tip(), block_190_003.block_hash()); - } - - #[test] - fn test_depth_one_reorg() { - let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); - let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); - let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); - let batch_1 = vec![block_8, block_9, block_10]; - let new_block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093792151c0e9ce4e4c789ca98427d7740cc7acf30d2ca0c08baef266bf152289d814567e5e66ffff7f2001000000").unwrap()).unwrap(); - let block_11: Header = 
deserialize(&hex::decode("00000020efcf8b12221fccc735b9b0b657ce15b31b9c50aff530ce96a5b4cfe02d8c0068496c1b8a89cf5dec22e46c35ea1035f80f5b666a1b3aa7f3d6f0880d0061adcc567e5e66ffff7f2001000000").unwrap()).unwrap(); - let fork = vec![new_block_10, block_11]; - let mut chain = HeaderChain::new(HeaderCheckpoint::new( - 7, - BlockHash::from_str("62c28f380692524a3a8f1fc66252bc0eb31d6b6a127d2263bdcbee172529fe16") - .unwrap(), - )); - let reorg = chain.extend(&batch_1); - assert!(reorg.is_empty()); - assert_eq!(chain.height(), 10); - assert_eq!( - chain.chainwork_after_height(7), - block_8.work() + block_9.work() + block_10.work() - ); - assert_eq!( - chain.chainwork_after_height(8), - block_9.work() + block_10.work() - ); - let reorg = chain.extend(&fork); - assert_eq!(reorg.len(), 1); - assert_eq!(reorg.first().unwrap().header, block_10); - assert_eq!( - vec![block_8, block_9, new_block_10, block_11], - chain.headers().values().copied().collect::>() - ); - assert_eq!(chain.header_at_height(12), None); - assert_eq!(chain.header_at_height(11), Some(&block_11)); - assert_eq!(chain.header_at_height(10), Some(&new_block_10)); - assert_eq!(chain.header_at_height(9), Some(&block_9)); - assert_eq!(chain.header_at_height(8), Some(&block_8)); - } - - #[test] - fn test_depth_two_reorg() { - let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f047eb4d0fe76345e307d0e020a079cedfa37101ee7ac84575cf829a611b0f84bc4805e66ffff7f2001000000").unwrap()).unwrap(); - let block_2: Header = deserialize(&hex::decode("00000020299e41732deb76d869fcdb5f72518d3784e99482f572afb73068d52134f1f75e1f20f5da8d18661d0f13aa3db8fff0f53598f7d61f56988a6d66573394b2c6ffc5805e66ffff7f2001000000").unwrap()).unwrap(); - let block_3: Header = deserialize(&hex::decode("00000020b96feaa82716f11befeb608724acee4743e0920639a70f35f1637a88b8b6ea3471f1dbedc283ce6a43a87ed3c8e6326dae8d3dbacce1b2daba08e508054ffdb697815e66ffff7f2001000000").unwrap()).unwrap(); - let 
block_4: Header = deserialize(&hex::decode("0000002052ff614fa461ff38b4a5c101d04fdcac2f34722e60bd43d12c8de0a394fe0c60444fb24b7e9885f60fed9927d27f23854ecfab29287163ef2b868d5d626f82ed97815e66ffff7f2002000000").unwrap()).unwrap(); - let batch_1 = vec![block_1, block_2, block_3, block_4]; - let new_block_3: Header = deserialize(&hex::decode("00000020b96feaa82716f11befeb608724acee4743e0920639a70f35f1637a88b8b6ea349c6240c5d0521966771808950f796c9c04088bc9551a828b64f1cf06831705dfbc835e66ffff7f2000000000").unwrap()).unwrap(); - let new_block_4: Header = deserialize(&hex::decode("00000020d2a1c6ba2be393f405fe2f4574565f9ee38ac68d264872fcd82b030970d0232ce882eb47c3dd138587120f1ad97dd0e73d1e30b79559ad516cb131f83dcb87e9bc835e66ffff7f2002000000").unwrap()).unwrap(); - let fork = vec![new_block_3, new_block_4]; - let mut chain = HeaderChain::new(HeaderCheckpoint::new( - 0, - BlockHash::from_str("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206") - .unwrap(), - )); - chain.extend(&batch_1); - let reorged = chain.extend(&fork); - assert_eq!(reorged.len(), 2); - assert_eq!( - vec![block_1, block_2, new_block_3, new_block_4], - chain.headers().values().copied().collect::>() - ); - assert_eq!(chain.header_at_height(4), Some(&new_block_4)); - assert_eq!(chain.header_at_height(3), Some(&new_block_3)); - assert_eq!(chain.header_at_height(2), Some(&block_2)); - assert_eq!(chain.header_at_height(1), Some(&block_1)); - } - - #[tokio::test] - async fn reorg_is_stump() { - let block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb57241f855e66ffff7f2000000000").unwrap()).unwrap(); - let block_2: Header = deserialize(&hex::decode("00000020c81cedd6a989939936f31448e49d010a13c2e750acf02d3fa73c9c7ecfb9476e798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c420855e66ffff7f2000000000").unwrap()).unwrap(); - let batch_1 = vec![block_1, block_2]; - let 
new_block_1: Header = deserialize(&hex::decode("0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f575b313ad3ef825cfc204c34da8f3c1fd1784e2553accfa38001010587cb5724d5855e66ffff7f2004000000").unwrap()).unwrap(); - let new_block_2: Header = deserialize(&hex::decode("00000020d1d80f53343a084bd0da6d6ab846f9fe4a133de051ea00e7cae16ed19f601065798da2e5565335929ad303fc746acabc812ee8b06139bcf2a4c0eb533c21b8c4d6855e66ffff7f2000000000").unwrap()).unwrap(); - let fork = vec![new_block_1, new_block_2]; - let block_3: Header = deserialize(&hex::decode("0000002080f38c14e898d6646dd426428472888966e0d279d86453f42edc56fdb143241aa66c8fa8837d95be3f85d53f22e86a0d6d456b1ab348e073da4d42a39f50637423865e66ffff7f2000000000").unwrap()).unwrap(); - let block_4: Header = deserialize(&hex::decode("000000204877fed370af64c0a1f7a76f6944e1127aad965b1865f99ecfdf8fa72ae23377f51921d01ff1131bd589500a8ca142884297ceeb1aa762ad727249e9a23f2cb023865e66ffff7f2000000000").unwrap()).unwrap(); - let batch_2 = vec![block_3, block_4]; - let mut chain = HeaderChain::new(HeaderCheckpoint::new( - 0, - BlockHash::from_str("0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206") - .unwrap(), - )); - chain.extend(&batch_1); - let reorged = chain.extend(&fork); - assert_eq!(reorged.len(), 2); - assert_eq!( - reorged.iter().map(|f| f.header).collect::>(), - vec![block_1, block_2] - ); - assert_eq!( - vec![new_block_1, new_block_2], - chain.headers().values().copied().collect::>() - ); - let no_org = chain.extend(&batch_2); - assert_eq!(no_org.len(), 0); - assert_eq!( - vec![new_block_1, new_block_2, block_3, block_4], - chain.headers().values().copied().collect::>() - ); - assert_eq!(chain.header_at_height(3), Some(&block_3)); - let want = chain.height_of_hash(new_block_2.block_hash()); - assert_eq!(Some(2), want); - } -} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 2f1b0835..30ea63c7 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -11,7 +11,6 @@ pub mod checkpoints; 
pub(crate) mod error; pub(crate) mod graph; pub(crate) mod header_batch; -pub(crate) mod header_chain; use std::collections::HashMap; @@ -21,9 +20,12 @@ use crate::network::PeerId; type Height = u32; +/// A block header with associated height. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct IndexedHeader { + /// The height in the blockchain for this header. pub height: u32, + /// The block header. pub header: Header, } diff --git a/src/db/mod.rs b/src/db/mod.rs index a08a13c3..e720dba7 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -10,6 +10,8 @@ use bitcoin::key::rand::{thread_rng, Rng}; use bitcoin::p2p::address::AddrV2; use bitcoin::p2p::ServiceFlags; +use crate::chain::IndexedHeader; + /// Errors a database backend may produce. pub mod error; /// Persistence traits defined with SQL Lite to store data between sessions. @@ -75,3 +77,17 @@ impl PeerStatus { rng.gen() } } + +/// Changes applied to the chain of block headers. +#[derive(Debug, Clone)] +pub enum BlockHeaderChanges { + /// A block was connected to the tip of the chain. + Connected(IndexedHeader), + /// Blocks were reorganized and a new chain of most work was selected. + Reorganized { + /// Newly accepted blocks from the chain of most work. + accepted: Vec, + /// Blocks that were removed from the chain. 
+ reorganized: Vec, + }, +} diff --git a/src/db/sqlite/headers.rs b/src/db/sqlite/headers.rs index 0d754d43..3c846a1a 100644 --- a/src/db/sqlite/headers.rs +++ b/src/db/sqlite/headers.rs @@ -12,6 +12,7 @@ use tokio::sync::Mutex; use crate::db::error::{SqlHeaderStoreError, SqlInitializationError}; use crate::db::traits::HeaderStore; +use crate::db::BlockHeaderChanges; use crate::prelude::FutureResult; use super::{DATA_DIR, DEFAULT_CWD}; @@ -160,16 +161,12 @@ impl SqliteHeaderDb { Ok(headers) } - async fn write( - &mut self, - header_chain: &BTreeMap, - ) -> Result<(), SqlHeaderStoreError> { + async fn write(&mut self, changes: BlockHeaderChanges) -> Result<(), SqlHeaderStoreError> { let mut write_lock = self.conn.lock().await; let tx = write_lock.transaction()?; - let best_height: Option = - tx.query_row("SELECT MAX(height) FROM headers", [], |row| row.get(0))?; - for (height, header) in header_chain { - if height.ge(&(best_height.unwrap_or(0))) { + match changes { + BlockHeaderChanges::Connected(indexed_header) => { + let header = indexed_header.header; let hash: String = header.block_hash().to_string(); let version: i32 = header.version.to_consensus(); let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string(); @@ -181,7 +178,7 @@ impl SqliteHeaderDb { tx.execute( stmt, params![ - height, + indexed_header.height, hash, version, prev_hash, @@ -192,43 +189,37 @@ impl SqliteHeaderDb { ], )?; } - } - tx.commit()?; - Ok(()) - } - - async fn write_over( - &mut self, - header_chain: &BTreeMap, - height: u32, - ) -> Result<(), SqlHeaderStoreError> { - let mut write_lock = self.conn.lock().await; - let tx = write_lock.transaction()?; - for (new_height, header) in header_chain { - if new_height.ge(&height) { - let hash: String = header.block_hash().to_string(); - let version: i32 = header.version.to_consensus(); - let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string(); - let merkle_root: String = header.merkle_root.to_string(); - let time: u32 
= header.time; - let bits: u32 = header.bits.to_consensus(); - let nonce: u32 = header.nonce; - let stmt = "INSERT OR REPLACE INTO headers (height, block_hash, version, prev_hash, merkle_root, time, bits, nonce) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"; - tx.execute( - stmt, - params![ - new_height, - hash, - version, - prev_hash, - merkle_root, - time, - bits, - nonce - ], - )?; + BlockHeaderChanges::Reorganized { + accepted, + reorganized: _, + } => { + for indexed_header in accepted { + let header = indexed_header.header; + let hash: String = header.block_hash().to_string(); + let version: i32 = header.version.to_consensus(); + let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string(); + let merkle_root: String = header.merkle_root.to_string(); + let time: u32 = header.time; + let bits: u32 = header.bits.to_consensus(); + let nonce: u32 = header.nonce; + let stmt = "INSERT OR REPLACE INTO headers (height, block_hash, version, prev_hash, merkle_root, time, bits, nonce) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"; + tx.execute( + stmt, + params![ + indexed_header.height, + hash, + version, + prev_hash, + merkle_root, + time, + bits, + nonce + ], + )?; + } } } + tx.commit()?; Ok(()) } @@ -325,19 +316,8 @@ impl HeaderStore for SqliteHeaderDb { Box::pin(self.load(range)) } - fn write<'a>( - &'a mut self, - header_chain: &'a BTreeMap, - ) -> FutureResult<'a, (), Self::Error> { - Box::pin(self.write(header_chain)) - } - - fn write_over<'a>( - &'a mut self, - header_chain: &'a BTreeMap, - height: u32, - ) -> FutureResult<'a, (), Self::Error> { - Box::pin(self.write_over(header_chain, height)) + fn write(&mut self, changes: BlockHeaderChanges) -> FutureResult<(), Self::Error> { + Box::pin(self.write(changes)) } fn height_of<'a>( @@ -358,6 +338,8 @@ impl HeaderStore for SqliteHeaderDb { #[cfg(test)] mod tests { + use crate::chain::IndexedHeader; + use super::*; use bitcoin::consensus::deserialize; @@ -369,19 +351,27 @@ mod tests { let block_8: Header = 
deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); + let changes_8 = IndexedHeader::new(8, block_8); + let changes_9 = IndexedHeader::new(9, block_9); + let changes_10 = IndexedHeader::new(10, block_10); let mut map = BTreeMap::new(); map.insert(8, block_8); map.insert(9, block_9); map.insert(10, block_10); let block_hash_8 = block_8.block_hash(); let block_hash_9 = block_9.block_hash(); - let w = db.write(&map).await; + let w = db.write(BlockHeaderChanges::Connected(changes_8)).await; + assert!(w.is_ok()); + let w = db.write(BlockHeaderChanges::Connected(changes_9)).await; + assert!(w.is_ok()); + let w = db.write(BlockHeaderChanges::Connected(changes_10)).await; assert!(w.is_ok()); let get_hash_9 = db.hash_at(9).await.unwrap().unwrap(); assert_eq!(get_hash_9, block_hash_9); let get_height_8 = db.height_of(&block_hash_8).await.unwrap().unwrap(); assert_eq!(get_height_8, 8); let load = db.load(7..).await.unwrap(); + assert_eq!(map, load); let get_header_9 = db.header_at(9).await.unwrap().unwrap(); assert_eq!(get_header_9, block_9); @@ -405,7 +395,14 @@ mod tests { map.insert(8, block_8); map.insert(9, block_9); map.insert(10, block_10); - let w = db.write(&map).await; + let changes_8 = IndexedHeader::new(8, block_8); + let changes_9 = IndexedHeader::new(9, block_9); + let changes_10 = IndexedHeader::new(10, block_10); + let w = 
db.write(BlockHeaderChanges::Connected(changes_8)).await; + assert!(w.is_ok()); + let w = db.write(BlockHeaderChanges::Connected(changes_9)).await; + assert!(w.is_ok()); + let w = db.write(BlockHeaderChanges::Connected(changes_10)).await; assert!(w.is_ok()); let get_height_10 = db.header_at(10).await.unwrap().unwrap(); assert_eq!(block_10, get_height_10); @@ -414,12 +411,20 @@ mod tests { let mut map = BTreeMap::new(); map.insert(10, new_block_10); map.insert(11, block_11); - let w = db.write_over(&map, 10).await; + let accepted = vec![ + IndexedHeader::new(10, new_block_10), + IndexedHeader::new(11, block_11), + ]; + let reorganized = vec![IndexedHeader::new(10, block_10)]; + let w = db + .write(BlockHeaderChanges::Reorganized { + accepted, + reorganized, + }) + .await; assert!(w.is_ok()); let block_hash_11 = block_11.block_hash(); let block_hash_10 = new_block_10.block_hash(); - let w = db.write(&map).await; - assert!(w.is_ok()); let get_height_10 = db.header_at(10).await.unwrap().unwrap(); assert_eq!(new_block_10, get_height_10); let get_height_12 = db.header_at(12).await.unwrap(); @@ -451,7 +456,15 @@ mod tests { map.insert(8, block_8); map.insert(9, block_9); map.insert(10, block_10); - let w = db.write(&map).await; + let changes_8 = IndexedHeader::new(8, block_8); + let changes_9 = IndexedHeader::new(9, block_9); + let changes_10 = IndexedHeader::new(10, block_10); + let w = db.write(BlockHeaderChanges::Connected(changes_8)).await; + assert!(w.is_ok()); + let w = db.write(BlockHeaderChanges::Connected(changes_9)).await; + assert!(w.is_ok()); + let w = db.write(BlockHeaderChanges::Connected(changes_10)).await; + assert!(w.is_ok()); assert!(w.is_ok()); let load = db.load(7..).await.unwrap(); assert_eq!(map, load); diff --git a/src/db/traits.rs b/src/db/traits.rs index a77ecd37..3268cd94 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -6,7 +6,7 @@ use bitcoin::{block::Header, BlockHash}; use crate::prelude::FutureResult; -use super::PersistedPeer; +use 
super::{BlockHeaderChanges, PersistedPeer}; /// Methods required to persist the chain of block headers. pub trait HeaderStore: Debug + Send + Sync { @@ -18,18 +18,8 @@ pub trait HeaderStore: Debug + Send + Sync { range: impl RangeBounds + Send + Sync + 'a, ) -> FutureResult<'a, BTreeMap, Self::Error>; - /// Write an indexed map of block headers to the database, ignoring if they already exist. - fn write<'a>( - &'a mut self, - header_chain: &'a BTreeMap, - ) -> FutureResult<'a, (), Self::Error>; - - /// Write the headers to the database, replacing headers over the specified height. - fn write_over<'a>( - &'a mut self, - header_chain: &'a BTreeMap, - height: u32, - ) -> FutureResult<'a, (), Self::Error>; + /// Write any changes to the backend as new headers are found. + fn write(&mut self, changes: BlockHeaderChanges) -> FutureResult<(), Self::Error>; /// Return the height of a block hash in the database, if it exists. fn height_of<'a>( @@ -114,27 +104,13 @@ mod test { Box::pin(do_load()) } - fn write<'a>( - &'a mut self, - _header_chain: &'a BTreeMap, - ) -> FutureResult<'a, (), Self::Error> { + fn write(&mut self, _changes: BlockHeaderChanges) -> FutureResult<(), Self::Error> { async fn do_write() -> Result<(), Infallible> { Ok(()) } Box::pin(do_write()) } - fn write_over<'a>( - &'a mut self, - _header_chain: &'a BTreeMap, - _height: u32, - ) -> FutureResult<'a, (), Self::Error> { - async fn do_write_over() -> Result<(), Infallible> { - Ok(()) - } - Box::pin(do_write_over()) - } - fn height_of<'a>( &'a mut self, _block_hash: &'a BlockHash, diff --git a/src/lib.rs b/src/lib.rs index de8a81d3..6f889b77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,21 +173,6 @@ impl IndexedTransaction { } } -/// A block [`Header`] that was disconnected from the chain of most work along with its previous height. -#[derive(Debug, Clone, Copy)] -pub struct DisconnectedHeader { - /// The height where this header used to be in the chain. - pub height: u32, - /// The reorganized header.
- pub header: Header, -} - -impl DisconnectedHeader { - pub(crate) fn new(height: u32, header: Header) -> Self { - Self { height, header } - } -} - /// A Bitcoin [`Block`] with associated height. #[derive(Debug, Clone)] pub struct IndexedBlock { diff --git a/src/messages.rs b/src/messages.rs index 8e631d3a..b7265694 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -7,8 +7,8 @@ use bitcoin::{block::Header, p2p::message_network::RejectReason, FeeRate, Script #[cfg(feature = "filter-control")] use crate::IndexedFilter; use crate::{ - chain::checkpoints::HeaderCheckpoint, DisconnectedHeader, IndexedBlock, NodeState, TrustedPeer, - TxBroadcast, + chain::{checkpoints::HeaderCheckpoint, IndexedHeader}, + IndexedBlock, NodeState, TrustedPeer, TxBroadcast, }; use super::error::{FetchBlockError, FetchHeaderError}; @@ -55,7 +55,7 @@ pub enum Event { /// The node is fully synced, having scanned the requested range. Synced(SyncUpdate), /// Blocks were reorganized out of the chain. - BlocksDisconnected(Vec), + BlocksDisconnected(Vec), /// A compact block filter with associated height and block hash. 
#[cfg(feature = "filter-control")] IndexedFilter(IndexedFilter), diff --git a/src/node.rs b/src/node.rs index 91e8d4fc..274fea9c 100644 --- a/src/node.rs +++ b/src/node.rs @@ -398,9 +398,8 @@ impl Node { let mut state = self.state.write().await; match *state { NodeState::Behind => { - let mut header_chain = self.chain.lock().await; + let header_chain = self.chain.lock().await; if header_chain.is_synced().await { - header_chain.flush_to_disk().await; self.dialog .send_info(Log::StateChange(NodeState::HeadersSynced)) .await; @@ -426,12 +425,15 @@ impl Node { } } NodeState::FiltersSynced => { - let header_chain = self.chain.lock().await; - if header_chain.block_queue_empty() { + let chain = self.chain.lock().await; + if chain.block_queue_empty() { *state = NodeState::TransactionsSynced; let update = SyncUpdate::new( - HeaderCheckpoint::new(header_chain.height(), header_chain.tip()), - header_chain.last_ten(), + HeaderCheckpoint::new( + chain.header_chain.height(), + chain.header_chain.tip_hash(), + ), + chain.last_ten(), ); self.dialog .send_info(Log::StateChange(NodeState::TransactionsSynced)) @@ -566,12 +568,6 @@ impl Node { } return self.next_stateful_message(chain.deref_mut()).await; } - HeaderSyncError::LessWorkFork => { - self.dialog.send_warning(Warning::UnexpectedSyncError { - warning: "A peer sent us a fork with less work.".into(), - }); - return Some(MainThreadMessage::Disconnect); - } _ => { self.dialog.send_warning(Warning::UnexpectedSyncError { warning: format!("Unexpected header syncing error: {}", e), @@ -683,14 +679,17 @@ impl Node { let mut peer_map = self.peer_map.lock().await; for block in blocks.iter() { peer_map.increment_height(nonce).await; - if !chain.contains_hash(*block) { + if !chain.header_chain.contains(*block) { crate::log!(self.dialog, format!("New block: {}", block)); } } match *state { NodeState::Behind => None, _ => { - if blocks.into_iter().any(|block| !chain.contains_hash(block)) { + if blocks + .into_iter() + .any(|block| 
!chain.header_chain.contains(block)) + { self.dialog .send_info(Log::StateChange(NodeState::Behind)) .await;