diff --git a/README.md b/README.md index da2c11f3..d87df858 100644 --- a/README.md +++ b/README.md @@ -135,18 +135,6 @@ just integrate If you do not have `bitcoind` installed, you may simply run `just integrate` and it will be installed for you in the `build` folder. -## Project Layout - -`chain`: Contains all logic for syncing block headers, filter headers, filters, parsing blocks. Also contains preset checkpoints for Signet, Regtest, and Bitcoin networks. Notable files: `chain.rs` - -`core`: Organizes the primary user-facing components of the API. This includes both the `Node` and the `Client` that all developers will interact with, as well as the `NodeBuilder`. Importantly includes `peer_map.rs`, which is the primary file that handles peer threads by sending messages, persisting new peers, banning peers, and managing peer task handles. `node.rs` is the main application loop, responsible for driving the node actions. Notable files: `node.rs`, `peer_map.rs`, `builder.rs`, `client.rs` - -`db`: Defines how data must be persisted with `traits.rs`, and includes some opinionated defaults for database components. - -`filters`: Additional structures for managing compact filter headers and filters, used by `chain.rs` - -`network`: Opens and closes connections, handles encryption and decryption of messages, generates messages, parses messages, times message response times, performs DNS lookups. Notable files: `peer.rs`, `reader.rs`, `parsers.rs`, `outbound_messages.rs` - ## Contributing Please read [CONTRIBUTING.md](./CONTRIBUTING.md) to get started. diff --git a/doc/DETAILS.md b/doc/DETAILS.md index fdbfeb9f..7b8bc1c2 100644 --- a/doc/DETAILS.md +++ b/doc/DETAILS.md @@ -81,18 +81,23 @@ The wallet required 12 block downloads, and took 5 minutes. This section details what behavior to expect when using Kyoto, and why such decisions were made. -### Peer Selection +## Peer Selection Kyoto will first connect to all of the configured peers to maintain the connection requirement, and will use peers gleaned from the peer-to-peer gossip thereafter. If no peers are configured when building the node, and no peers are in the database, Kyoto will resort to DNS. Kyoto will not select a peer of the same netgroup (`/16`) as a previously connected peer. When selecting a new peer from the database, a random preference will be selected between a "new" peer and a peer that has been "tried." Rational is derived from [this research](https://www.ethanheilman.com/p/eclipse/index.html) -### Block Headers and Storage +## Block Headers and Storage Kyoto expects users to adopt some form of persistence between sessions when it comes to block header data. Reason being, Kyoto emits block headers that have been reorganized back to the client in such an event. To do so, in a rare but potential circumstance where the client has shut down on a stale tip, one that is reorganized in the future, Kyoto may use the header persistence to load the older chain into memory. Further, this allows the memory footprint of storing headers in a chain structure to remain small. Kyoto has a soft limit of 20,000 headers in memory at any given time, and if the chain representation exceeds that, Kyoto has a reliable backend to move the excess of block headers. To compensate for this, Kyoto only expects some generic datastore, and does not care about how persistence is implemented. -### Filters +## Filters Block filters for a full block may be 300-400 bytes, and may be needless overhead if scripts are revealed "into the future" for the underlying wallet. Full filters are checked for matches as they are downloaded, but are discarded thereafter. As a result, if the user adds scripts that are believe to be included in blocks _in the past_, Kyoto will have to redownload the filters. But if the wallet has up to date information, a revealing a new script is guaranteed to have not been used. This memory tradeoff was deemed worthwhile, as it is expected rescanning will only occur for recovery scenarios. -### Structure +## Structure + +* `chain`: Contains all logic for syncing block headers, filter headers, filters, parsing blocks. Also contains preset checkpoints for Signet, Regtest, and Bitcoin networks. Notable files: `chain.rs` +* `db`: Defines how data must be persisted with `traits.rs`, and includes some opinionated defaults for database components. +* `filters`: Additional structures for managing compact filter headers and filters, used by `chain.rs` +* `network`: Opens and closes connections, handles encryption and decryption of messages, generates messages, parses messages, times message response times, performs DNS lookups. Notable files: `peer.rs`, `reader.rs`, `parsers.rs`, `outbound_messages.rs` ![Layout](https://github.com/user-attachments/assets/21280bb4-aa88-4e11-9223-aed35a885e99) diff --git a/example/managed.rs b/example/managed.rs index 5b7b8561..cdb3473f 100644 --- a/example/managed.rs +++ b/example/managed.rs @@ -2,8 +2,8 @@ //! many possible scripts to check. Enable the `filter-control` feature to check filters //! manually in your program. -use kyoto::core::messages::Event; -use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder, Client}; +use kyoto::messages::Event; +use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint, Client}; use kyoto::{AddrV2, Address, BlockHash, LogLevel, Network, ServiceFlags, TrustedPeer}; use std::collections::HashSet; use std::{net::Ipv4Addr, str::FromStr}; diff --git a/example/rescan.rs b/example/rescan.rs index bab80db1..b8a0ddba 100644 --- a/example/rescan.rs +++ b/example/rescan.rs @@ -2,7 +2,7 @@ //! the filters for inclusions of these scripts and download the relevant //! blocks. -use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder}; +use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint}; use kyoto::{Address, Client, Event, Network}; use std::collections::HashSet; use std::str::FromStr; diff --git a/example/signet.rs b/example/signet.rs index d50eeeae..40d11552 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -1,6 +1,6 @@ //! Usual sync on Signet. -use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder}; +use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint}; use kyoto::{AddrV2, Address, Client, Event, Log, Network, ServiceFlags, TrustedPeer}; use std::collections::HashSet; use std::{ diff --git a/example/testnet4.rs b/example/testnet4.rs index 8827c2c4..1616a686 100644 --- a/example/testnet4.rs +++ b/example/testnet4.rs @@ -1,6 +1,6 @@ //! Usual sync on Testnet. -use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder}; +use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint}; use kyoto::{Address, Client, Event, Log, Network, PeerStoreSizeConfig, TrustedPeer}; use std::collections::HashSet; use std::{net::Ipv4Addr, str::FromStr}; diff --git a/src/core/broadcaster.rs b/src/broadcaster.rs similarity index 100% rename from src/core/broadcaster.rs rename to src/broadcaster.rs diff --git a/src/core/builder.rs b/src/builder.rs similarity index 98% rename from src/core/builder.rs rename to src/builder.rs index fe3e6fce..24430dc8 100644 --- a/src/core/builder.rs +++ b/src/builder.rs @@ -5,7 +5,7 @@ use bitcoin::Network; #[cfg(not(feature = "filter-control"))] use bitcoin::ScriptBuf; -use super::{client::Client, config::NodeConfig, node::Node, FilterSyncPolicy}; +use super::{client::Client, config::NodeConfig, node::Node}; #[cfg(feature = "database")] use crate::db::error::SqlInitializationError; #[cfg(feature = "database")] @@ -15,7 +15,7 @@ use crate::{ chain::checkpoints::HeaderCheckpoint, db::traits::{HeaderStore, PeerStore}, }; -use crate::{ConnectionType, LogLevel, PeerStoreSizeConfig, TrustedPeer}; +use crate::{ConnectionType, FilterSyncPolicy, LogLevel, PeerStoreSizeConfig, TrustedPeer}; #[cfg(feature = "database")] /// The default node returned from the [`NodeBuilder`](crate::core). diff --git a/src/chain/block_queue.rs b/src/chain/block_queue.rs index d3f3fc95..3ceed98e 100644 --- a/src/chain/block_queue.rs +++ b/src/chain/block_queue.rs @@ -4,8 +4,8 @@ use bitcoin::BlockHash; use tokio::time::Instant; #[cfg(feature = "filter-control")] -use crate::core::messages::BlockRequest; -use crate::core::messages::BlockSender; +use crate::messages::BlockRequest; +use crate::messages::BlockSender; const SPAM_LIMIT: u64 = 5; diff --git a/src/chain/chain.rs b/src/chain/chain.rs index b4d43705..93e3a61a 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -20,18 +20,13 @@ use super::{ HeightMonitor, }; #[cfg(feature = "filter-control")] -use crate::core::error::FetchBlockError; +use crate::error::FetchBlockError; #[cfg(feature = "filter-control")] -use crate::core::messages::BlockRequest; +use crate::messages::BlockRequest; #[cfg(feature = "filter-control")] use crate::IndexedFilter; use crate::{ chain::header_batch::HeadersBatch, - core::{ - dialog::Dialog, - error::HeaderPersistenceError, - messages::{Event, Warning}, - }, db::traits::HeaderStore, filters::{ cfheader_batch::CFHeaderBatch, @@ -41,6 +36,11 @@ use crate::{ Filter, CF_HEADER_BATCH_SIZE, FILTER_BATCH_SIZE, }, IndexedBlock, + { + dialog::Dialog, + error::HeaderPersistenceError, + messages::{Event, Warning}, + }, }; const MAX_REORG_DEPTH: u32 = 5_000; @@ -902,11 +902,11 @@ mod tests { checkpoints::{HeaderCheckpoint, HeaderCheckpoints}, error::HeaderSyncError, }, - core::{ + filters::cfheader_chain::AppendAttempt, + { dialog::Dialog, messages::{Event, Log, Warning}, }, - filters::cfheader_chain::AppendAttempt, }; use super::{Chain, HeightMonitor}; diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 9613bd12..dde9c609 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -14,7 +14,7 @@ pub(crate) mod header_chain; use std::collections::HashMap; -use crate::core::PeerId; +use crate::network::PeerId; type Height = u32; diff --git a/src/core/channel_messages.rs b/src/channel_messages.rs similarity index 96% rename from src/core/channel_messages.rs rename to src/channel_messages.rs index ff79a563..80cacb86 100644 --- a/src/core/channel_messages.rs +++ b/src/channel_messages.rs @@ -9,9 +9,7 @@ use bitcoin::{ Block, BlockHash, FeeRate, Transaction, Wtxid, }; -use crate::core::messages::RejectPayload; - -use super::PeerId; +use crate::{messages::RejectPayload, network::PeerId}; #[derive(Debug, Clone)] pub(crate) enum MainThreadMessage { diff --git a/src/core/client.rs b/src/client.rs similarity index 100% rename from src/core/client.rs rename to src/client.rs diff --git a/src/core/config.rs b/src/config.rs similarity index 93% rename from src/core/config.rs rename to src/config.rs index e1053501..f41b9a52 100644 --- a/src/core/config.rs +++ b/src/config.rs @@ -3,12 +3,10 @@ use std::{collections::HashSet, path::PathBuf, time::Duration}; use bitcoin::ScriptBuf; use crate::{ - chain::checkpoints::HeaderCheckpoint, network::dns::DnsResolver, ConnectionType, LogLevel, - PeerStoreSizeConfig, TrustedPeer, + chain::checkpoints::HeaderCheckpoint, network::dns::DnsResolver, ConnectionType, + FilterSyncPolicy, LogLevel, PeerStoreSizeConfig, TrustedPeer, }; -use super::FilterSyncPolicy; - const REQUIRED_PEERS: u8 = 1; const TIMEOUT_SECS: u64 = 5; // sec min hour diff --git a/src/core/mod.rs b/src/core/mod.rs deleted file mode 100644 index f408d333..00000000 --- a/src/core/mod.rs +++ /dev/null @@ -1,108 +0,0 @@ -//! Tools to build and run a compact block filters node. -//! -//! All logic for syncing with the Bitcoin network occurs within a [`Node`](node::Node). Nodes emit events of relevance -//! by sending logs, warnings and events. These may be consumed by a [`Client`](client::Client). A client may also send -//! messages to a node to add more Bitcoin scripts, broadcast transactions, and more. -//! -//! To build a [`Node`](node::Node) and [`Client`](client::Client), please refer to the [`NodeBuilder`](builder::NodeBuilder), which allows for node -//! configuration. - -use std::hash::Hash; -use std::time::Duration; - -use tokio::time::Instant; - -mod broadcaster; -/// Convenient way to build a compact filters node. -pub mod builder; -pub(crate) mod channel_messages; -/// Structures to communicate with a node. -pub mod client; -/// Node configuration options. -pub(crate) mod config; -pub(crate) mod dialog; -/// Errors associated with a node. -pub mod error; -/// Messages the node may send a client. -pub mod messages; -/// The structure that communicates with the Bitcoin P2P network and collects data. -pub mod node; -mod peer_map; -#[cfg(feature = "filter-control")] -use crate::IndexedBlock; -#[cfg(feature = "filter-control")] -use error::FetchBlockError; - -/// Receive an [`IndexedBlock`] from a request. -#[cfg(feature = "filter-control")] -pub type BlockReceiver = tokio::sync::oneshot::Receiver>; - -const THIRTY_MINS: u64 = 60 * 30; - -// This struct detects for stale tips and requests headers if no blocks were found after 30 minutes of wait time. -pub(crate) struct LastBlockMonitor { - last_block: Option, -} - -impl LastBlockMonitor { - pub(crate) fn new() -> Self { - Self { last_block: None } - } - - pub(crate) fn reset(&mut self) { - self.last_block = Some(Instant::now()) - } - - pub(crate) fn stale(&self) -> bool { - if let Some(time) = self.last_block { - return Instant::now().duration_since(time) > Duration::from_secs(THIRTY_MINS); - } - false - } -} - -/// Should the node immediately download filters or wait for a command -#[derive(Debug, Default)] -pub enum FilterSyncPolicy { - /// The node will wait for an explicit command to start downloading and checking filters - Halt, - /// Filters are downloaded immediately after CBF headers are synced. - #[default] - Continue, -} - -#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] -pub(crate) struct PeerTimeoutConfig { - pub(crate) response_timeout: Duration, - pub(crate) max_connection_time: Duration, -} - -impl PeerTimeoutConfig { - fn new(response_timeout: Duration, max_connection_time: Duration) -> Self { - Self { - response_timeout, - max_connection_time, - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(crate) struct PeerId(u32); - -impl PeerId { - pub(crate) fn increment(&mut self) { - self.0 = self.0.wrapping_add(1) - } -} - -impl From for PeerId { - fn from(value: u32) -> Self { - PeerId(value) - } -} - -impl std::fmt::Display for PeerId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Peer {}", self.0) - } -} diff --git a/src/core/dialog.rs b/src/dialog.rs similarity index 100% rename from src/core/dialog.rs rename to src/dialog.rs diff --git a/src/core/error.rs b/src/error.rs similarity index 100% rename from src/core/error.rs rename to src/error.rs diff --git a/src/lib.rs b/src/lib.rs index f840de16..40760bc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,10 +71,6 @@ //! } //! ``` //! -//! # Getting started -//! -//! The [`core`] module documentation is likely the best place to start when developing an application with Kyoto. -//! //! # Features //! //! `database`: use the default `rusqlite` database implementations. Default and recommend feature. @@ -85,12 +81,34 @@ #![warn(missing_docs)] pub mod chain; -pub mod core; pub mod db; mod filters; mod network; mod prelude; + +mod broadcaster; +/// Convenient way to build a compact filters node. +pub mod builder; +pub(crate) mod channel_messages; +/// Structures to communicate with a node. +pub mod client; +/// Node configuration options. +pub(crate) mod config; +pub(crate) mod dialog; +/// Errors associated with a node. +pub mod error; +/// Messages the node may send a client. +pub mod messages; +/// The structure that communicates with the Bitcoin P2P network and collects data. +pub mod node; + +/// Receive an [`IndexedBlock`] from a request. +#[cfg(feature = "filter-control")] +pub type BlockReceiver = tokio::sync::oneshot::Receiver>; + +#[cfg(feature = "filter-control")] +use crate::error::FetchBlockError; #[cfg(feature = "filter-control")] use filters::Filter; #[cfg(feature = "filter-control")] @@ -123,11 +141,12 @@ pub use tokio::sync::mpsc::UnboundedReceiver; #[doc(inline)] pub use { - crate::core::builder::NodeBuilder, - crate::core::client::{Client, Requester}, - crate::core::error::{ClientError, NodeError}, - crate::core::messages::{Event, Log, Progress, RejectPayload, SyncUpdate, Warning}, - crate::core::node::{Node, NodeState}, + crate::builder::NodeBuilder, + crate::client::{Client, Requester}, + crate::error::{ClientError, NodeError}, + crate::messages::{Event, Log, Progress, RejectPayload, SyncUpdate, Warning}, + crate::network::PeerTimeoutConfig, + crate::node::{Node, NodeState}, }; #[doc(inline)] @@ -439,6 +458,16 @@ pub enum LogLevel { Warning, } +/// Should the node immediately download filters or wait for a command +#[derive(Debug, Default)] +pub enum FilterSyncPolicy { + /// The node will wait for an explicit command to start downloading and checking filters + Halt, + /// Filters are downloaded immediately after CBF headers are synced. + #[default] + Continue, +} + macro_rules! log { ($dialog:expr, $expr:expr) => { match $dialog.log_level { diff --git a/src/core/messages.rs b/src/messages.rs similarity index 99% rename from src/core/messages.rs rename to src/messages.rs index 79a5f970..63205dc0 100644 --- a/src/core/messages.rs +++ b/src/messages.rs @@ -151,7 +151,7 @@ pub(crate) enum ClientMessage { AddScript(ScriptBuf), /// Starting at the configured anchor checkpoint, look for block inclusions with newly added scripts. Rescan, - /// If the [`FilterSyncPolicy`](crate) is set to `Halt`, issuing this command will + /// If the [`FilterSyncPolicy`] is set to `Halt`, issuing this command will /// start the filter download and checking process. Otherwise, this command will not have any effect /// on node operation. ContinueDownload, diff --git a/src/network/mod.rs b/src/network/mod.rs index 66e33b01..710c32fd 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -3,6 +3,7 @@ use bitcoin::{ io::Read, p2p::{message::CommandString, Magic}, }; +use std::time::Duration; pub(crate) mod counter; pub(crate) mod dns; @@ -11,12 +12,53 @@ pub(crate) mod error; pub(crate) mod outbound_messages; pub(crate) mod parsers; pub(crate) mod peer; +pub(crate) mod peer_map; #[allow(dead_code)] pub(crate) mod reader; #[cfg(feature = "tor")] pub(crate) mod tor; pub(crate) mod traits; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct PeerId(pub(crate) u32); + +impl PeerId { + pub(crate) fn increment(&mut self) { + self.0 = self.0.wrapping_add(1) + } +} + +impl From for PeerId { + fn from(value: u32) -> Self { + PeerId(value) + } +} + +impl std::fmt::Display for PeerId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Peer {}", self.0) + } +} + +/// Configuration for peer connection timeouts +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] +pub struct PeerTimeoutConfig { + /// How long to wait for a peer to respond to a request + pub(crate) response_timeout: Duration, + /// Maximum time to maintain a connection with a peer + pub(crate) max_connection_time: Duration, +} + +impl PeerTimeoutConfig { + /// Create a new peer timeout configuration + pub fn new(response_timeout: Duration, max_connection_time: Duration) -> Self { + Self { + response_timeout, + max_connection_time, + } + } +} + pub const PROTOCOL_VERSION: u32 = 70016; pub const KYOTO_VERSION: &str = "0.8.0"; pub const RUST_BITCOIN_VERSION: &str = "0.32.4"; diff --git a/src/network/outbound_messages.rs b/src/network/outbound_messages.rs index 72994a1a..2273a1b4 100644 --- a/src/network/outbound_messages.rs +++ b/src/network/outbound_messages.rs @@ -17,7 +17,7 @@ use bitcoin::{ BlockHash, Network, Transaction, Wtxid, }; -use crate::{core::channel_messages::GetBlockConfig, prelude::default_port_from_network}; +use crate::{channel_messages::GetBlockConfig, prelude::default_port_from_network}; use super::{ error::PeerError, traits::MessageGenerator, KYOTO_VERSION, PROTOCOL_VERSION, diff --git a/src/network/peer.rs b/src/network/peer.rs index 7cee154a..bb2a5be9 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -14,13 +14,12 @@ use tokio::{ }; use crate::{ - core::{ + network::outbound_messages::V1OutboundMessage, + { channel_messages::{MainThreadMessage, PeerMessage, PeerThreadMessage}, dialog::Dialog, messages::Warning, - PeerId, PeerTimeoutConfig, }, - network::outbound_messages::V1OutboundMessage, }; use super::{ @@ -29,6 +28,7 @@ use super::{ parsers::V1MessageParser, reader::Reader, traits::{MessageGenerator, StreamReader, StreamWriter}, + PeerId, PeerTimeoutConfig, }; #[cfg(not(feature = "tor"))] diff --git a/src/core/peer_map.rs b/src/network/peer_map.rs similarity index 99% rename from src/core/peer_map.rs rename to src/network/peer_map.rs index 88852f34..fcc5d8ff 100644 --- a/src/core/peer_map.rs +++ b/src/network/peer_map.rs @@ -21,23 +21,19 @@ use tokio::{ use crate::{ chain::HeightMonitor, + channel_messages::{CombinedAddr, MainThreadMessage, PeerThreadMessage}, db::{traits::PeerStore, PeerStatus, PersistedPeer}, + dialog::Dialog, + error::PeerManagerError, network::{ dns::DnsResolver, error::PeerError, peer::Peer, traits::{ClearNetConnection, NetworkConnector}, + PeerId, PeerTimeoutConfig, }, prelude::{default_port_from_network, Median, Netgroup}, - ConnectionType, PeerStoreSizeConfig, TrustedPeer, -}; - -use super::{ - channel_messages::{CombinedAddr, MainThreadMessage, PeerThreadMessage}, - dialog::Dialog, - error::PeerManagerError, - messages::Warning, - PeerId, PeerTimeoutConfig, + ConnectionType, PeerStoreSizeConfig, TrustedPeer, Warning, }; const MAX_TRIES: usize = 50; diff --git a/src/network/reader.rs b/src/network/reader.rs index 9328cd71..42bd7773 100644 --- a/src/network/reader.rs +++ b/src/network/reader.rs @@ -5,8 +5,8 @@ use bitcoin::p2p::{message::NetworkMessage, message_blockdata::Inventory, Servic use bitcoin::{FeeRate, Txid}; use tokio::sync::mpsc::Sender; -use crate::core::channel_messages::{CombinedAddr, PeerMessage}; -use crate::core::messages::RejectPayload; +use crate::channel_messages::{CombinedAddr, PeerMessage}; +use crate::messages::RejectPayload; use super::error::PeerReadError; use super::traits::MessageParser; diff --git a/src/network/traits.rs b/src/network/traits.rs index dcf6ff10..c456ef5e 100644 --- a/src/network/traits.rs +++ b/src/network/traits.rs @@ -15,7 +15,7 @@ use tokio::{ sync::Mutex, }; -use crate::{core::channel_messages::GetBlockConfig, prelude::FutureResult}; +use crate::{channel_messages::GetBlockConfig, prelude::FutureResult}; use super::error::{PeerError, PeerReadError}; diff --git a/src/core/node.rs b/src/node.rs similarity index 97% rename from src/core/node.rs rename to src/node.rs index 407f8913..42aa77db 100644 --- a/src/core/node.rs +++ b/src/node.rs @@ -9,11 +9,14 @@ use bitcoin::{ }, Block, BlockHash, Network, ScriptBuf, }; -use tokio::sync::{mpsc::Receiver, Mutex, RwLock}; use tokio::{ select, sync::mpsc::{self}, }; +use tokio::{ + sync::{mpsc::Receiver, Mutex, RwLock}, + time::Instant, +}; use crate::{ chain::{ @@ -22,10 +25,11 @@ use crate::{ error::HeaderSyncError, HeightMonitor, }, - core::{error::FetchHeaderError, peer_map::PeerMap}, db::traits::{HeaderStore, PeerStore}, + error::FetchHeaderError, filters::{cfheader_chain::AppendAttempt, error::CFilterSyncError}, - RejectPayload, TxBroadcastPolicy, + network::{peer_map::PeerMap, PeerId, PeerTimeoutConfig}, + FilterSyncPolicy, RejectPayload, TxBroadcastPolicy, }; use super::{ @@ -39,14 +43,36 @@ use super::{ dialog::Dialog, error::NodeError, messages::{ClientMessage, Event, Log, SyncUpdate, Warning}, - FilterSyncPolicy, LastBlockMonitor, PeerId, PeerTimeoutConfig, }; pub(crate) const WTXID_VERSION: u32 = 70016; const LOOP_TIMEOUT: u64 = 1; +const THIRTY_MINS: u64 = 60 * 30; type PeerRequirement = usize; +// This struct detects for stale tips and requests headers if no blocks were found after 30 minutes of wait time. +pub(crate) struct LastBlockMonitor { + last_block: Option, +} + +impl LastBlockMonitor { + pub(crate) fn new() -> Self { + Self { last_block: None } + } + + pub(crate) fn reset(&mut self) { + self.last_block = Some(Instant::now()) + } + + pub(crate) fn stale(&self) -> bool { + if let Some(time) = self.last_block { + return Instant::now().duration_since(time) > Duration::from_secs(THIRTY_MINS); + } + false + } +} + /// The state of the node with respect to connected peers. #[derive(Debug, Clone, Copy)] pub enum NodeState { diff --git a/tests/core.rs b/tests/core.rs index 42a15767..c1ff072f 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -11,9 +11,9 @@ use corepc_node::serde_json; use corepc_node::{anyhow, exe_path}; use kyoto::{ chain::checkpoints::HeaderCheckpoint, - core::{client::Client, node::Node}, BlockHash, Event, Log, NodeState, ServiceFlags, SqliteHeaderDb, SqlitePeerDb, TrustedPeer, Warning, + {client::Client, node::Node}, }; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::UnboundedReceiver; @@ -48,7 +48,7 @@ fn new_node_sql( let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = host.into(); trusted.set_services(ServiceFlags::P2P_V2); - let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Regtest); + let builder = kyoto::builder::NodeBuilder::new(bitcoin::Network::Regtest); let (node, client) = builder .add_peer(host) .add_scripts(addrs) @@ -67,7 +67,7 @@ fn new_node_anchor_sql( let addr = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = addr.into(); trusted.set_services(ServiceFlags::P2P_V2); - let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Regtest); + let builder = kyoto::builder::NodeBuilder::new(bitcoin::Network::Regtest); let (node, client) = builder .add_peer(trusted) .add_scripts(addrs) @@ -171,12 +171,12 @@ async fn live_reorg() { // Make sure the reorg was caught while let Some(message) = channel.recv().await { match message { - kyoto::core::messages::Event::BlocksDisconnected(blocks) => { + kyoto::messages::Event::BlocksDisconnected(blocks) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); } - kyoto::core::messages::Event::Synced(update) => { + kyoto::messages::Event::Synced(update) => { assert_eq!(update.tip().hash, best); requester.shutdown().await.unwrap(); break; @@ -228,12 +228,12 @@ async fn live_reorg_additional_sync() { // Make sure the reorg was caught while let Some(message) = channel.recv().await { match message { - kyoto::core::messages::Event::BlocksDisconnected(blocks) => { + kyoto::messages::Event::BlocksDisconnected(blocks) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); } - kyoto::core::messages::Event::Synced(update) => { + kyoto::messages::Event::Synced(update) => { assert_eq!(update.tip().hash, best); break; } @@ -336,12 +336,12 @@ async fn stop_reorg_resync() { // Make sure the reorganization is caught after a cold start while let Some(message) = channel.recv().await { match message { - kyoto::core::messages::Event::BlocksDisconnected(blocks) => { + kyoto::messages::Event::BlocksDisconnected(blocks) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); } - kyoto::core::messages::Event::Synced(update) => { + kyoto::messages::Event::Synced(update) => { println!("Done"); assert_eq!(update.tip().hash, best); break; @@ -420,12 +420,12 @@ async fn stop_reorg_two_resync() { let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await }); while let Some(message) = channel.recv().await { match message { - kyoto::core::messages::Event::BlocksDisconnected(blocks) => { + kyoto::messages::Event::BlocksDisconnected(blocks) => { assert_eq!(blocks.len(), 2); assert_eq!(blocks.last().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.last().unwrap().height); } - kyoto::core::messages::Event::Synced(update) => { + kyoto::messages::Event::Synced(update) => { println!("Done"); assert_eq!(update.tip().hash, best); break; @@ -507,12 +507,12 @@ async fn stop_reorg_start_on_orphan() { // Ensure SQL is able to catch the fork by loading in headers from the database while let Some(message) = channel.recv().await { match message { - kyoto::core::messages::Event::BlocksDisconnected(blocks) => { + kyoto::messages::Event::BlocksDisconnected(blocks) => { assert_eq!(blocks.len(), 1); assert_eq!(blocks.first().unwrap().header.block_hash(), old_best); assert_eq!(old_height as u32, blocks.first().unwrap().height); } - kyoto::core::messages::Event::Synced(update) => { + kyoto::messages::Event::Synced(update) => { println!("Done"); assert_eq!(update.tip().hash, best); break; @@ -592,7 +592,7 @@ async fn halting_download_works() { scripts.insert(other.into()); let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); - let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Regtest); + let builder = kyoto::builder::NodeBuilder::new(bitcoin::Network::Regtest); let (node, client) = builder .add_peers(vec![host.into()]) .add_scripts(scripts) @@ -624,7 +624,7 @@ async fn halting_download_works() { } } while let Some(message) = channel.recv().await { - if let kyoto::core::messages::Event::Synced(update) = message { + if let kyoto::messages::Event::Synced(update) = message { println!("Done"); assert_eq!(update.tip().hash, best); break; @@ -645,7 +645,7 @@ async fn signet_syncs() { let mut set = HashSet::new(); set.insert(address); let host = (IpAddr::from(Ipv4Addr::new(68, 47, 229, 218)), None); - let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Signet); + let builder = kyoto::builder::NodeBuilder::new(bitcoin::Network::Signet); let (node, client) = builder .add_peer(host) .add_scripts(set)