From 13d14beb3a84df9b743d7f9604618f0bb50980bd Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Wed, 18 Feb 2026 09:02:06 +0000 Subject: [PATCH 1/4] Move `Node` into `Client` First step in cleaning up the `Node`/`Client` relationship. This abstracts running a `task`, which will require some new methods to run with more flexibility (bring your own runtime or use an OS thread). The panic introduced will be impossible to hit in a later commit where I will introduce sealed traits. --- examples/bitcoin.rs | 7 ++-- examples/signet.rs | 5 +-- src/builder.rs | 8 ++--- src/client.rs | 13 ++++++++ src/lib.rs | 6 ++-- src/node.rs | 27 +++++++-------- tests/core.rs | 80 +++++++++++++++++++++++++-------------------- 7 files changed, 83 insertions(+), 63 deletions(-) diff --git a/examples/bitcoin.rs b/examples/bitcoin.rs index b98439ed..1f813dbd 100644 --- a/examples/bitcoin.rs +++ b/examples/bitcoin.rs @@ -23,7 +23,7 @@ async fn main() { // Create a new node builder let builder = Builder::new(NETWORK); // Add node preferences and build the node/client - let (node, client) = builder + let client = builder // The number of connections we would like to maintain .required_peers(2) // Only scan for taproot scripts @@ -34,8 +34,8 @@ async fn main() { .add_peers(seeds.into_iter().map(From::from)) // Create the node and client .build(); - // Run the node on a separate task - tokio::task::spawn(async move { node.run().await }); + + let client = client.run(); // Split the client into components that send messages and listen to messages. // With this construction, different parts of the program can take ownership of // specific tasks. @@ -44,6 +44,7 @@ async fn main() { mut info_rx, mut warn_rx, mut event_rx, + .. } = client; // Continually listen for events until the node is synced to its peers. loop { diff --git a/examples/signet.rs b/examples/signet.rs index 64128790..17b97442 100644 --- a/examples/signet.rs +++ b/examples/signet.rs @@ -32,7 +32,7 @@ async fn main() { // Create a new node builder let builder = Builder::new(NETWORK); // Add node preferences and build the node/client - let (node, client) = builder + let client = builder // Only scan blocks strictly after a checkpoint .chain_state(ChainState::Checkpoint(checkpoint)) // The number of connections we would like to maintain @@ -40,13 +40,14 @@ async fn main() { // Create the node and client .build(); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, mut info_rx, mut warn_rx, mut event_rx, + .. } = client; // Continually listen for events until the node is synced to its peers. diff --git a/src/builder.rs b/src/builder.rs index f1fab3f2..9504ec3e 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -25,7 +25,7 @@ const MAX_PEERS: u8 = 15; /// /// let host = (IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), None); /// let builder = Builder::new(Network::Regtest); -/// let (node, client) = builder +/// let client = builder /// .add_peers(vec![host.into()]) /// .build(); /// ``` @@ -138,8 +138,8 @@ impl Builder { self } - /// Consume the node builder and receive a [`Node`] and [`Client`]. - pub fn build(mut self) -> (Node, Client) { - Node::new(self.network, core::mem::take(&mut self.config)) + /// Consume the node builder and receive a [`Client`]. + pub fn build(mut self) -> Client { + Node::build(self.network, core::mem::take(&mut self.config)) } } diff --git a/src/client.rs b/src/client.rs index 7eda3bf5..4b7ba5ba 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,6 +6,7 @@ use tokio::sync::oneshot; use crate::chain::block_subsidy; use crate::messages::ClientRequest; +use crate::node::Node; use crate::{Event, Info, TrustedPeer, Warning}; use super::{error::ClientError, messages::ClientMessage}; @@ -22,6 +23,8 @@ pub struct Client { pub warn_rx: mpsc::UnboundedReceiver, /// Receive [`Event`] from a node to act on. pub event_rx: mpsc::UnboundedReceiver, + /// Internal node structure. + node: Option, } impl Client { @@ -30,14 +33,24 @@ impl Client { warn_rx: mpsc::UnboundedReceiver, event_rx: mpsc::UnboundedReceiver, ntx: UnboundedSender, + node: Node, ) -> Self { Self { requester: Requester::new(ntx), info_rx, warn_rx, event_rx, + node: Some(node), } } + + /// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to + /// execute the task. + pub fn run(mut self) -> Self { + let node = core::mem::take(&mut self.node).expect("cannot call run twice."); + tokio::task::spawn(async move { node.run().await }); + self + } } /// Send messages to a node that is running so the node may complete a task. diff --git a/src/lib.rs b/src/lib.rs index de58f6f1..75597509 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,14 +18,14 @@ //! // Create a new node builder //! let builder = Builder::new(Network::Signet); //! // Add node preferences and build the node/client -//! let (node, client) = builder +//! let client = builder //! // The number of connections we would like to maintain //! .required_peers(2) //! .build(); //! // Run the node and wait for the sync message; -//! tokio::task::spawn(async move { node.run().await }); +//! let client = client.run(); //! // Split the client into components that send messages and listen to messages -//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx } = client; +//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx, .. } = client; //! loop { //! if let Some(event) = event_rx.recv().await { //! match event { diff --git a/src/node.rs b/src/node.rs index 9258770e..24d3463e 100644 --- a/src/node.rs +++ b/src/node.rs @@ -66,7 +66,7 @@ pub struct Node { } impl Node { - pub(crate) fn new(network: Network, config: Config) -> (Self, Client) { + pub(crate) fn build(network: Network, config: Config) -> Client { let Config { required_peers, white_list, @@ -81,7 +81,6 @@ impl Node { let (warn_tx, warn_rx) = mpsc::unbounded_channel::(); let (event_tx, event_rx) = mpsc::unbounded_channel::(); let (ctx, crx) = mpsc::unbounded_channel::(); - let client = Client::new(info_rx, warn_rx, event_rx, ctx); // A structured way to talk to the client let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx)); // We always assume we are behind @@ -110,19 +109,17 @@ impl Node { required_peers, filter_type, ); - ( - Self { - state, - chain, - peer_map, - required_peers: required_peers.into(), - dialog, - block_queue: BlockQueue::new(), - client_recv: crx, - peer_recv: mrx, - }, - client, - ) + let node = Self { + state, + chain, + peer_map, + required_peers: required_peers.into(), + dialog, + block_queue: BlockQueue::new(), + client_recv: crx, + peer_recv: mrx, + }; + Client::new(info_rx, warn_rx, event_rx, ctx, node) } /// Run the node continuously. Typically run on a separate thread than the underlying application. diff --git a/tests/core.rs b/tests/core.rs index 00dafacf..b13d6777 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -7,7 +7,6 @@ use std::{ use bip157::{ chain::{checkpoints::HeaderCheckpoint, BlockHeaderChanges, ChainState}, client::Client, - node::Node, Address, BlockHash, Event, Info, ServiceFlags, Transaction, TrustedPeer, Warning, }; use bitcoin::{ @@ -49,18 +48,13 @@ fn start_bitcoind(with_v2_transport: bool) -> anyhow::Result<(corepc_node::Node, Ok((bitcoind, socket_addr)) } -fn new_node( - socket_addr: SocketAddrV4, - tempdir_path: PathBuf, - chain_state: ChainState, -) -> (Node, Client) { +fn new_node(socket_addr: SocketAddrV4, tempdir_path: PathBuf, chain_state: ChainState) -> Client { let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = host.into(); trusted.set_services(ServiceFlags::P2P_V2); let builder = bip157::builder::Builder::new(bitcoin::Network::Regtest); let builder = builder.chain_state(chain_state); - let (node, client) = builder.add_peer(host).data_dir(tempdir_path).build(); - (node, client) + builder.add_peer(host).data_dir(tempdir_path).build() } fn num_blocks(rpc: &corepc_node::Client) -> i64 { @@ -127,17 +121,18 @@ async fn live_reorg() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; @@ -179,17 +174,18 @@ async fn live_reorg_additional_sync() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; @@ -233,17 +229,18 @@ async fn various_client_methods() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 500, 15).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; @@ -262,17 +259,18 @@ async fn stop_reorg_resync() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; @@ -284,17 +282,18 @@ async fn stop_reorg_resync() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Spin up the node on a cold start - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // Make sure the reorganization is caught after a cold start @@ -322,17 +321,18 @@ async fn stop_reorg_resync() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg @@ -350,17 +350,18 @@ async fn stop_reorg_two_resync() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; @@ -375,17 +376,18 @@ async fn stop_reorg_two_resync() { let best = best_hash(rpc); drop(handle); // Make sure the reorganization is caught after a cold start - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); while let Some(message) = channel.recv().await { @@ -412,17 +414,18 @@ async fn stop_reorg_two_resync() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg @@ -439,17 +442,18 @@ async fn stop_reorg_start_on_orphan() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 17, 3).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; @@ -462,17 +466,18 @@ async fn stop_reorg_start_on_orphan() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Spin up the node on a cold start with a stale tip - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); let mut headers = Vec::new(); @@ -501,17 +506,18 @@ async fn stop_reorg_start_on_orphan() { drop(handle); requester.shutdown().unwrap(); let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Snapshot(headers.clone()), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg @@ -521,13 +527,14 @@ async fn stop_reorg_start_on_orphan() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers - let (node, client) = new_node(socket_addr, tempdir, ChainState::Snapshot(headers.clone())); - tokio::task::spawn(async move { node.run().await }); + let client = new_node(socket_addr, tempdir, ChainState::Snapshot(headers.clone())); + let client = client.run(); let Client { requester, info_rx, warn_rx, event_rx: mut channel, + .. } = client; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg @@ -611,17 +618,18 @@ async fn tx_can_broadcast() { let tx = sighasher.into_transaction().to_owned(); println!("Signed transaction"); // Build a node to broadcast the transaction - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); + let client = client.run(); let Client { requester, info_rx: _, warn_rx: _, event_rx: _, + .. } = client; tokio::time::timeout(Duration::from_secs(60), requester.broadcast_tx(tx)) .await From 03b1a9cc1fe3db3171685a6e1458b0bdf3b02621 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Wed, 18 Feb 2026 09:09:36 +0000 Subject: [PATCH 2/4] Make `Node` a crate-only type --- src/lib.rs | 5 ++--- src/node.rs | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 75597509..5ec8a1fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,8 +57,8 @@ pub mod client; pub mod error; /// Messages the node may send a client. pub mod messages; -/// The structure that communicates with the Bitcoin P2P network and collects data. -pub mod node; +// The structure that communicates with the Bitcoin P2P network and collects data. +mod node; use chain::Filter; @@ -84,7 +84,6 @@ pub use { crate::client::{Client, Requester}, crate::error::{ClientError, NodeError}, crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning}, - crate::node::Node, }; #[doc(inline)] diff --git a/src/node.rs b/src/node.rs index 24d3463e..e5fc60e2 100644 --- a/src/node.rs +++ b/src/node.rs @@ -54,7 +54,7 @@ type PeerRequirement = usize; /// A compact block filter node. Nodes download Bitcoin block headers, block filters, and blocks to send relevant events to a client. #[derive(Debug)] -pub struct Node { +pub(crate) struct Node { state: NodeState, chain: Chain, peer_map: PeerMap, @@ -127,7 +127,7 @@ impl Node { /// # Errors /// /// If the node has exhausted all options to find connections. - pub async fn run(mut self) -> Result<(), NodeError> { + pub(crate) async fn run(mut self) -> Result<(), NodeError> { crate::debug!("Starting node"); crate::debug!(format!( "Configured connection requirement: {} peers", From bb7e673d2744d3b864b256c8fae07ff33cfe2dfa Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Wed, 18 Feb 2026 09:15:36 +0000 Subject: [PATCH 3/4] Add flexible options to run the node This is particularly useful in the native language bindings case. ref: https://github.com/bitcoindevkit/bdk-ffi/blob/master/bdk-ffi/src/kyoto.rs#L68 --- src/client.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/client.rs b/src/client.rs index 4b7ba5ba..6ca5ab49 100644 --- a/src/client.rs +++ b/src/client.rs @@ -51,6 +51,32 @@ impl Client { tokio::task::spawn(async move { node.run().await }); self } + + /// Run on a detached operating system thread. This method is useful in the case where the + /// majority of your application code is blocking, and you do not have a + /// [`tokio::runtime::Runtime`] available. This method will implicitly create a runtime which + /// runs the data fetching process. + pub fn run_detached(mut self) -> Self { + let node = core::mem::take(&mut self.node).expect("cannot call run twice."); + std::thread::spawn(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { + let _ = node.run().await; + }) + }); + self + } + + /// Run the node with an existing [`tokio::runtime::Runtime`]. + pub fn run_with_runtime(mut self, rt: impl AsRef) -> Self { + let rt = rt.as_ref(); + let node = core::mem::take(&mut self.node).expect("cannot call run twice."); + rt.spawn(async move { node.run().await }); + self + } } /// Send messages to a node that is running so the node may complete a task. From 373ecc523e2b29e8d8fbb8216e41b3ce40d08281 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Wed, 18 Feb 2026 09:32:49 +0000 Subject: [PATCH 4/4] Enforce client semantics with state Uses the sealed trait hack to only allow for compiler enforced state transactions. There is first a `run` step, followed by the typical methods a client would expect. I decided to scrap the `Requester` and instead encapsulate the event receivers with a newtype. Running a client gives access to the event receivers as well as the handle to the task. The only downside I see with this is `&mut self` doesn't seem to be possible, as each impl block is defined for it's `State` type. I need to fiddle with that to see if we can't remove the need to return the client each state transition. --- examples/bitcoin.rs | 18 ++-- examples/signet.rs | 17 ++-- src/builder.rs | 3 +- src/client.rs | 113 +++++++++++++++------ src/lib.rs | 13 +-- src/node.rs | 3 +- tests/core.rs | 236 +++++++++++++++++++------------------------- 7 files changed, 212 insertions(+), 191 deletions(-) diff --git a/examples/bitcoin.rs b/examples/bitcoin.rs index 1f813dbd..e3827dd3 100644 --- a/examples/bitcoin.rs +++ b/examples/bitcoin.rs @@ -3,7 +3,8 @@ use bip157::builder::Builder; use bip157::chain::{BlockHeaderChanges, ChainState}; -use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf}; +use bip157::client::EventListeners; +use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf}; use std::collections::HashSet; use tokio::time::Instant; @@ -35,17 +36,16 @@ async fn main() { // Create the node and client .build(); - let client = client.run(); // Split the client into components that send messages and listen to messages. // With this construction, different parts of the program can take ownership of // specific tasks. - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { mut info_rx, mut warn_rx, mut event_rx, - .. - } = client; + } = events; + // Continually listen for events until the node is synced to its peers. loop { tokio::select! { @@ -55,11 +55,11 @@ async fn main() { Event::FiltersSynced(update) => { tracing::info!("Chain tip: {}",update.tip().hash); // Request information from the node - let fee = requester.broadcast_min_feerate().await.unwrap(); + let fee = client.broadcast_min_feerate().await.unwrap(); tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee); let sync_time = now.elapsed().as_secs_f32(); tracing::info!("Total sync time: {sync_time} seconds"); - let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap(); + let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap(); tracing::info!("Last block average fee rate: {:#}", avg_fee_rate); break; }, @@ -82,6 +82,6 @@ async fn main() { } } } - let _ = requester.shutdown(); + let _ = client.shutdown(); tracing::info!("Shutting down"); } diff --git a/examples/signet.rs b/examples/signet.rs index 17b97442..8019f8e7 100644 --- a/examples/signet.rs +++ b/examples/signet.rs @@ -3,8 +3,8 @@ use bip157::chain::{BlockHeaderChanges, ChainState}; use bip157::messages::Event; -use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client}; -use bip157::{Address, BlockHash, Network}; +use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint}; +use bip157::{Address, BlockHash, EventListeners, Network}; use std::collections::HashSet; use std::str::FromStr; @@ -40,15 +40,12 @@ async fn main() { // Create the node and client .build(); - let client = client.run(); - - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { mut info_rx, mut warn_rx, mut event_rx, - .. - } = client; + } = events; // Continually listen for events until the node is synced to its peers. loop { @@ -72,7 +69,7 @@ async fn main() { if filter.contains_any(addresses.iter()) { let hash = filter.block_hash(); tracing::info!("Found script at {}!", hash); - let indexed_block = requester.get_block(hash).await.unwrap(); + let indexed_block = client.get_block(hash).await.unwrap(); let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid(); tracing::info!("Coinbase transaction ID: {}", coinbase); break; @@ -88,6 +85,6 @@ async fn main() { } } } - let _ = requester.shutdown(); + let _ = client.shutdown(); tracing::info!("Shutting down"); } diff --git a/src/builder.rs b/src/builder.rs index 9504ec3e..a24dd317 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -5,6 +5,7 @@ use bitcoin::Network; use super::{client::Client, node::Node}; use crate::chain::ChainState; +use crate::client::Idle; use crate::network::ConnectionType; use crate::TrustedPeer; use crate::{Config, FilterType}; @@ -139,7 +140,7 @@ impl Builder { } /// Consume the node builder and receive a [`Client`]. - pub fn build(mut self) -> Client { + pub fn build(mut self) -> Client { Node::build(self.network, core::mem::take(&mut self.config)) } } diff --git a/src/client.rs b/src/client.rs index 6ca5ab49..20537541 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,51 +12,100 @@ use crate::{Event, Info, TrustedPeer, Warning}; use super::{error::ClientError, messages::ClientMessage}; use super::{error::FetchBlockError, IndexedBlock}; -/// A [`Client`] allows for communication with a running node. +/// Client state when idle. +pub struct Idle; +/// Client state when active. +pub struct Active; + +mod sealed { + pub trait Sealed {} +} + +impl sealed::Sealed for Idle {} +impl sealed::Sealed for Active {} + +/// State of the client +pub trait State: sealed::Sealed {} + +impl State for Idle {} +impl State for Active {} + +/// Wrapper type for the channels that will receive events. #[derive(Debug)] -pub struct Client { - /// Send events to a node, such as broadcasting a transaction. - pub requester: Requester, +pub struct EventListeners { /// Receive informational messages from the node. pub info_rx: mpsc::Receiver, /// Receive warning messages from a node. pub warn_rx: mpsc::UnboundedReceiver, /// Receive [`Event`] from a node to act on. pub event_rx: mpsc::UnboundedReceiver, - /// Internal node structure. - node: Option, } -impl Client { - pub(crate) fn new( +impl EventListeners { + fn new( info_rx: mpsc::Receiver, warn_rx: mpsc::UnboundedReceiver, event_rx: mpsc::UnboundedReceiver, - ntx: UnboundedSender, - node: Node, ) -> Self { Self { - requester: Requester::new(ntx), info_rx, warn_rx, event_rx, - node: Some(node), } } +} + +/// A [`Client`] allows for communication with a running node. +#[derive(Debug)] +pub struct Client { + /// Send events to a node, such as broadcasting a transaction. + ntx: UnboundedSender, + /// Receive informational messages from the node. + events: Option, + /// Internal node structure. + node: Option, + /// Marker for state. + _marker: core::marker::PhantomData, +} +impl Client { + pub(crate) fn new( + info_rx: mpsc::Receiver, + warn_rx: mpsc::UnboundedReceiver, + event_rx: mpsc::UnboundedReceiver, + ntx: UnboundedSender, + node: Node, + ) -> Client { + Client { + ntx, + events: Some(EventListeners::new(info_rx, warn_rx, event_rx)), + node: Some(node), + _marker: core::marker::PhantomData, + } + } /// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to /// execute the task. - pub fn run(mut self) -> Self { + pub fn run(mut self) -> (Client, EventListeners) { + let events = core::mem::take(&mut self.events).expect("cannot call run twice."); let node = core::mem::take(&mut self.node).expect("cannot call run twice."); tokio::task::spawn(async move { node.run().await }); - self + ( + Client { + ntx: self.ntx, + events: None, + node: None, + _marker: core::marker::PhantomData, + }, + events, + ) } /// Run on a detached operating system thread. This method is useful in the case where the /// majority of your application code is blocking, and you do not have a /// [`tokio::runtime::Runtime`] available. This method will implicitly create a runtime which /// runs the data fetching process. - pub fn run_detached(mut self) -> Self { + pub fn run_detached(mut self) -> (Client, EventListeners) { + let events = core::mem::take(&mut self.events).expect("cannot call run twice."); let node = core::mem::take(&mut self.node).expect("cannot call run twice."); std::thread::spawn(|| { tokio::runtime::Builder::new_multi_thread() @@ -67,29 +116,35 @@ impl Client { let _ = node.run().await; }) }); - self + let client = Client { + ntx: self.ntx, + events: None, + node: None, + _marker: core::marker::PhantomData, + }; + (client, events) } /// Run the node with an existing [`tokio::runtime::Runtime`]. - pub fn run_with_runtime(mut self, rt: impl AsRef) -> Self { + pub fn run_with_runtime( + mut self, + rt: impl AsRef, + ) -> (Client, EventListeners) { let rt = rt.as_ref(); + let events = core::mem::take(&mut self.events).expect("cannot call run twice."); let node = core::mem::take(&mut self.node).expect("cannot call run twice."); rt.spawn(async move { node.run().await }); - self + let client = Client { + ntx: self.ntx, + events: None, + node: None, + _marker: core::marker::PhantomData, + }; + (client, events) } } -/// Send messages to a node that is running so the node may complete a task. -#[derive(Debug, Clone)] -pub struct Requester { - ntx: UnboundedSender, -} - -impl Requester { - fn new(ntx: UnboundedSender) -> Self { - Self { ntx } - } - +impl Client { /// Tell the node to shut down. /// /// # Errors diff --git a/src/lib.rs b/src/lib.rs index 5ec8a1fa..888f6d3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ //! # Example usage //! //! ```no_run -//! use bip157::{Builder, Event, Client, Network, BlockHash}; +//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash}; //! //! #[tokio::main] //! async fn main() { @@ -22,10 +22,11 @@ //! // The number of connections we would like to maintain //! .required_peers(2) //! .build(); -//! // Run the node and wait for the sync message; -//! let client = client.run(); +//! // Start the node +//! let (client, events) = client.run(); //! // Split the client into components that send messages and listen to messages -//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx, .. } = client; +//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events; +//! // Wait for the sync message; //! loop { //! if let Some(event) = event_rx.recv().await { //! match event { @@ -37,7 +38,7 @@ //! } //! } //! } -//! requester.shutdown(); +//! client.shutdown(); //! } //! ``` @@ -81,7 +82,7 @@ use tokio::sync::mpsc::UnboundedSender; pub use { crate::builder::Builder, crate::chain::ChainState, - crate::client::{Client, Requester}, + crate::client::{Client, EventListeners}, crate::error::{ClientError, NodeError}, crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning}, }; diff --git a/src/node.rs b/src/node.rs index e5fc60e2..ca137eb6 100644 --- a/src/node.rs +++ b/src/node.rs @@ -31,6 +31,7 @@ use crate::{ error::HeaderSyncError, CFHeaderChanges, ChainState, FilterCheck, HeightMonitor, }, + client::Idle, error::FetchBlockError, messages::ClientRequest, network::{ @@ -66,7 +67,7 @@ pub(crate) struct Node { } impl Node { - pub(crate) fn build(network: Network, config: Config) -> Client { + pub(crate) fn build(network: Network, config: Config) -> Client { let Config { required_peers, white_list, diff --git a/tests/core.rs b/tests/core.rs index b13d6777..fd44b613 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -6,7 +6,7 @@ use std::{ use bip157::{ chain::{checkpoints::HeaderCheckpoint, BlockHeaderChanges, ChainState}, - client::Client, + client::{Client, EventListeners, Idle}, Address, BlockHash, Event, Info, ServiceFlags, Transaction, TrustedPeer, Warning, }; use bitcoin::{ @@ -48,7 +48,11 @@ fn start_bitcoind(with_v2_transport: bool) -> anyhow::Result<(corepc_node::Node, Ok((bitcoind, socket_addr)) } -fn new_node(socket_addr: SocketAddrV4, tempdir_path: PathBuf, chain_state: ChainState) -> Client { +fn new_node( + socket_addr: SocketAddrV4, + tempdir_path: PathBuf, + chain_state: ChainState, +) -> Client { let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = host.into(); trusted.set_services(ServiceFlags::P2P_V2); @@ -81,10 +85,10 @@ async fn invalidate_block(rpc: &corepc_node::Client, hash: &bitcoin::BlockHash) tokio::time::sleep(Duration::from_secs(2)).await; } -async fn sync_assert(best: &bitcoin::BlockHash, channel: &mut UnboundedReceiver) { +async fn sync_assert(best: &bitcoin::BlockHash, event_rx: &mut UnboundedReceiver) { loop { tokio::select! { - event = channel.recv() => { + event = event_rx.recv() => { if let Some(Event::FiltersSynced(update)) = event { assert_eq!(update.tip().hash, *best); println!("Correct sync"); @@ -126,16 +130,14 @@ async fn live_reorg() { tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; + sync_assert(&best, &mut event_rx).await; // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -143,7 +145,7 @@ async fn live_reorg() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the reorg was caught - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -155,13 +157,13 @@ async fn live_reorg() { } bip157::messages::Event::FiltersSynced(update) => { assert_eq!(update.tip().hash, best); - requester.shutdown().unwrap(); + client.shutdown().unwrap(); break; } _ => {} } } - requester.shutdown().unwrap(); + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -179,16 +181,14 @@ async fn live_reorg_additional_sync() { tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; + sync_assert(&best, &mut event_rx).await; // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -196,7 +196,7 @@ async fn live_reorg_additional_sync() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the reorg was caught - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -215,8 +215,8 @@ async fn live_reorg_additional_sync() { } mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -234,19 +234,17 @@ async fn various_client_methods() { tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - let _ = requester.broadcast_min_feerate().await.unwrap(); - assert!(requester.is_running()); - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + let _ = client.broadcast_min_feerate().await.unwrap(); + assert!(client.is_running()); + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -264,17 +262,15 @@ async fn stop_reorg_resync() { tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -287,17 +283,15 @@ async fn stop_reorg_resync() { tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // Make sure the reorganization is caught after a cold start - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -315,8 +309,7 @@ async fn stop_reorg_resync() { _ => {} } } - requester.shutdown().unwrap(); - drop(handle); + client.shutdown().unwrap(); // Mine more blocks mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); @@ -326,18 +319,16 @@ async fn stop_reorg_resync() { tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -355,17 +346,15 @@ async fn stop_reorg_two_resync() { tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); // Reorganize the blocks let old_height = num_blocks(rpc); let old_best = best; @@ -374,23 +363,20 @@ async fn stop_reorg_two_resync() { invalidate_block(rpc, &best).await; mine_blocks(rpc, &miner, 3, 1).await; let best = best_hash(rpc); - drop(handle); // Make sure the reorganization is caught after a cold start let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - while let Some(message) = channel.recv().await { + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -408,8 +394,7 @@ async fn stop_reorg_two_resync() { _ => {} } } - drop(handle); - requester.shutdown().unwrap(); + client.shutdown().unwrap(); // Mine more blocks mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); @@ -419,18 +404,16 @@ async fn stop_reorg_two_resync() { tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -447,18 +430,15 @@ async fn stop_reorg_start_on_orphan() { tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - drop(handle); - requester.shutdown().unwrap(); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -471,18 +451,16 @@ async fn stop_reorg_start_on_orphan() { tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); let mut headers = Vec::new(); // Ensure SQL is able to catch the fork by loading in headers from the database - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Connected(header)) => { headers.push(header); @@ -503,43 +481,37 @@ async fn stop_reorg_start_on_orphan() { _ => {} } } - drop(handle); - requester.shutdown().unwrap(); + client.shutdown().unwrap(); let best = best_hash(rpc); let client = new_node( socket_addr, tempdir.clone(), ChainState::Snapshot(headers.clone()), ); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - drop(handle); - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers let client = new_node(socket_addr, tempdir, ChainState::Snapshot(headers.clone())); - let client = client.run(); - let Client { - requester, + let (client, events) = client.run(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - .. - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -623,15 +595,9 @@ async fn tx_can_broadcast() { tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - let client = client.run(); - let Client { - requester, - info_rx: _, - warn_rx: _, - event_rx: _, - .. - } = client; - tokio::time::timeout(Duration::from_secs(60), requester.broadcast_tx(tx)) + + let (client, _events) = client.run(); + tokio::time::timeout(Duration::from_secs(60), client.broadcast_tx(tx)) .await .unwrap() .unwrap();