diff --git a/examples/bitcoin.rs b/examples/bitcoin.rs index b98439ed..af180191 100644 --- a/examples/bitcoin.rs +++ b/examples/bitcoin.rs @@ -3,7 +3,8 @@ use bip157::builder::Builder; use bip157::chain::{BlockHeaderChanges, ChainState}; -use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf}; +use bip157::client::EventListeners; +use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf}; use std::collections::HashSet; use tokio::time::Instant; @@ -23,7 +24,7 @@ async fn main() { // Create a new node builder let builder = Builder::new(NETWORK); // Add node preferences and build the node/client - let (node, client) = builder + let client = builder // The number of connections we would like to maintain .required_peers(2) // Only scan for taproot scripts @@ -34,17 +35,18 @@ async fn main() { .add_peers(seeds.into_iter().map(From::from)) // Create the node and client .build(); - // Run the node on a separate task - tokio::task::spawn(async move { node.run().await }); + // Split the client into components that send messages and listen to messages. // With this construction, different parts of the program can take ownership of // specific tasks. - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { mut info_rx, mut warn_rx, mut event_rx, - } = client; + } = events; + let client = client.start(); + // Continually listen for events until the node is synced to its peers. loop { tokio::select! { @@ -54,11 +56,11 @@ async fn main() { Event::FiltersSynced(update) => { tracing::info!("Chain tip: {}",update.tip().hash); // Request information from the node - let fee = requester.broadcast_min_feerate().await.unwrap(); + let fee = client.broadcast_min_feerate().await.unwrap(); tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee); let sync_time = now.elapsed().as_secs_f32(); tracing::info!("Total sync time: {sync_time} seconds"); - let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap(); + let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap(); tracing::info!("Last block average fee rate: {:#}", avg_fee_rate); break; }, @@ -81,6 +83,6 @@ async fn main() { } } } - let _ = requester.shutdown(); + let _ = client.shutdown(); tracing::info!("Shutting down"); } diff --git a/examples/signet.rs b/examples/signet.rs index 64128790..2339ae30 100644 --- a/examples/signet.rs +++ b/examples/signet.rs @@ -3,8 +3,8 @@ use bip157::chain::{BlockHeaderChanges, ChainState}; use bip157::messages::Event; -use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client}; -use bip157::{Address, BlockHash, Network}; +use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint}; +use bip157::{Address, BlockHash, EventListeners, Network}; use std::collections::HashSet; use std::str::FromStr; @@ -32,7 +32,7 @@ async fn main() { // Create a new node builder let builder = Builder::new(NETWORK); // Add node preferences and build the node/client - let (node, client) = builder + let client = builder // Only scan blocks strictly after a checkpoint .chain_state(ChainState::Checkpoint(checkpoint)) // The number of connections we would like to maintain @@ -40,14 +40,13 @@ async fn main() { // Create the node and client .build(); - tokio::task::spawn(async move { node.run().await }); - - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { mut info_rx, mut warn_rx, mut event_rx, - } = client; + } = events; + let client = client.start(); // Continually listen for events until the node is synced to its peers. loop { @@ -71,7 +70,7 @@ async fn main() { if filter.contains_any(addresses.iter()) { let hash = filter.block_hash(); tracing::info!("Found script at {}!", hash); - let indexed_block = requester.get_block(hash).await.unwrap(); + let indexed_block = client.get_block(hash).await.unwrap(); let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid(); tracing::info!("Coinbase transaction ID: {}", coinbase); break; @@ -87,6 +86,6 @@ async fn main() { } } } - let _ = requester.shutdown(); + let _ = client.shutdown(); tracing::info!("Shutting down"); } diff --git a/src/builder.rs b/src/builder.rs index f1fab3f2..a24dd317 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -5,6 +5,7 @@ use bitcoin::Network; use super::{client::Client, node::Node}; use crate::chain::ChainState; +use crate::client::Idle; use crate::network::ConnectionType; use crate::TrustedPeer; use crate::{Config, FilterType}; @@ -25,7 +26,7 @@ const MAX_PEERS: u8 = 15; /// /// let host = (IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), None); /// let builder = Builder::new(Network::Regtest); -/// let (node, client) = builder +/// let client = builder /// .add_peers(vec![host.into()]) /// .build(); /// ``` @@ -138,8 +139,8 @@ impl Builder { self } - /// Consume the node builder and receive a [`Node`] and [`Client`]. - pub fn build(mut self) -> (Node, Client) { - Node::new(self.network, core::mem::take(&mut self.config)) + /// Consume the node builder and receive a [`Client`]. + pub fn build(mut self) -> Client { + Node::build(self.network, core::mem::take(&mut self.config)) } } diff --git a/src/client.rs b/src/client.rs index 7eda3bf5..8bbfdc90 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,16 +6,37 @@ use tokio::sync::oneshot; use crate::chain::block_subsidy; use crate::messages::ClientRequest; +use crate::node::Node; use crate::{Event, Info, TrustedPeer, Warning}; use super::{error::ClientError, messages::ClientMessage}; use super::{error::FetchBlockError, IndexedBlock}; -/// A [`Client`] allows for communication with a running node. +/// Client state when idle. +pub struct Idle; +/// Client state when subscribed to events. +pub struct Subscribed; +/// Client state when active. +pub struct Active; + +mod sealed { + pub trait Sealed {} +} + +impl sealed::Sealed for Idle {} +impl sealed::Sealed for Subscribed {} +impl sealed::Sealed for Active {} + +/// State of the client. +pub trait State: sealed::Sealed {} + +impl State for Idle {} +impl State for Subscribed {} +impl State for Active {} + +/// Wrapper type for the channels that will receive events. #[derive(Debug)] -pub struct Client { - /// Send events to a node, such as broadcasting a transaction. - pub requester: Requester, +pub struct EventListeners { /// Receive informational messages from the node. pub info_rx: mpsc::Receiver, /// Receive warning messages from a node. @@ -24,15 +45,13 @@ pub struct Client { pub event_rx: mpsc::UnboundedReceiver, } -impl Client { - pub(crate) fn new( +impl EventListeners { + fn new( info_rx: mpsc::Receiver, warn_rx: mpsc::UnboundedReceiver, event_rx: mpsc::UnboundedReceiver, - ntx: UnboundedSender, ) -> Self { Self { - requester: Requester::new(ntx), info_rx, warn_rx, event_rx, @@ -40,17 +59,88 @@ impl Client { } } -/// Send messages to a node that is running so the node may complete a task. -#[derive(Debug, Clone)] -pub struct Requester { +/// A [`Client`] allows for communication with a running node. +/// +/// The [`Client`] is generic over 3 states: +/// - [`Idle`]: the client is not running and event handling has not been initialized. +/// - [`Subscribed`]: events have been subscribed to in the program, but the client has not started. +/// - [`Active`]: data is actively being fetched and the [`Client`] may perform actions. +/// +#[derive(Debug)] +pub struct Client { + /// Send events to a node, such as broadcasting a transaction. ntx: UnboundedSender, + /// Receive informational messages from the node. + events: Option, + /// Internal node structure. + node: Option, + /// Marker for state. + _marker: core::marker::PhantomData, } -impl Requester { - fn new(ntx: UnboundedSender) -> Self { - Self { ntx } +impl Client { + pub(crate) fn new( + info_rx: mpsc::Receiver, + warn_rx: mpsc::UnboundedReceiver, + event_rx: mpsc::UnboundedReceiver, + ntx: UnboundedSender, + node: Node, + ) -> Client { + Client { + ntx, + events: Some(EventListeners::new(info_rx, warn_rx, event_rx)), + node: Some(node), + _marker: core::marker::PhantomData, + } + } + + /// Subscribe to the events published by the light client. Applications may perform arbitrary behavior + /// when receiving these events, such as logging or applying the effect of a block to a wallet. + /// The client is not yet running after this step. + pub fn subscribe(mut self) -> (Client, EventListeners) { + let events = core::mem::take(&mut self.events).expect("cannot call run twice."); + ( + Client { + ntx: self.ntx, + events: None, + node: self.node, + _marker: core::marker::PhantomData, + }, + events, + ) } +} + +impl Client { + /// Start the client, which will begin publishing events to subscribers. This will implicitly + /// spawn a [`tokio::task`] to fetch data for the client. + pub fn start(mut self) -> Client { + let node = core::mem::take(&mut self.node).expect("cannot call run twice."); + tokio::task::spawn(async move { node.run().await }); + Client { + ntx: self.ntx, + events: None, + node: None, + _marker: core::marker::PhantomData, + } + } + + /// Receive a [`Node`] to run on a dedicated resource, likely with a custom [`tokio::runtime::Runtime`]. + pub fn start_managed(mut self) -> (Client, Node) { + let node = core::mem::take(&mut self.node).expect("cannot call run twice."); + ( + Client { + ntx: self.ntx, + events: None, + node: None, + _marker: core::marker::PhantomData, + }, + node, + ) + } +} +impl Client { /// Tell the node to shut down. /// /// # Errors diff --git a/src/lib.rs b/src/lib.rs index de58f6f1..aaa2e437 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ //! # Example usage //! //! ```no_run -//! use bip157::{Builder, Event, Client, Network, BlockHash}; +//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash}; //! //! #[tokio::main] //! async fn main() { @@ -18,14 +18,16 @@ //! // Create a new node builder //! let builder = Builder::new(Network::Signet); //! // Add node preferences and build the node/client -//! let (node, client) = builder +//! let client = builder //! // The number of connections we would like to maintain //! .required_peers(2) //! .build(); -//! // Run the node and wait for the sync message; -//! tokio::task::spawn(async move { node.run().await }); +//! // Start the node +//! let (client, events) = client.subscribe(); +//! let client = client.start(); //! // Split the client into components that send messages and listen to messages -//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx } = client; +//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events; +//! // Wait for the sync message; //! loop { //! if let Some(event) = event_rx.recv().await { //! match event { @@ -37,7 +39,7 @@ //! } //! } //! } -//! requester.shutdown(); +//! client.shutdown(); //! } //! ``` @@ -81,7 +83,7 @@ use tokio::sync::mpsc::UnboundedSender; pub use { crate::builder::Builder, crate::chain::ChainState, - crate::client::{Client, Requester}, + crate::client::{Client, EventListeners}, crate::error::{ClientError, NodeError}, crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning}, crate::node::Node, diff --git a/src/node.rs b/src/node.rs index 9258770e..1f7cf45b 100644 --- a/src/node.rs +++ b/src/node.rs @@ -31,6 +31,7 @@ use crate::{ error::HeaderSyncError, CFHeaderChanges, ChainState, FilterCheck, HeightMonitor, }, + client::Idle, error::FetchBlockError, messages::ClientRequest, network::{ @@ -66,7 +67,7 @@ pub struct Node { } impl Node { - pub(crate) fn new(network: Network, config: Config) -> (Self, Client) { + pub(crate) fn build(network: Network, config: Config) -> Client { let Config { required_peers, white_list, @@ -81,7 +82,6 @@ impl Node { let (warn_tx, warn_rx) = mpsc::unbounded_channel::(); let (event_tx, event_rx) = mpsc::unbounded_channel::(); let (ctx, crx) = mpsc::unbounded_channel::(); - let client = Client::new(info_rx, warn_rx, event_rx, ctx); // A structured way to talk to the client let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx)); // We always assume we are behind @@ -110,19 +110,17 @@ impl Node { required_peers, filter_type, ); - ( - Self { - state, - chain, - peer_map, - required_peers: required_peers.into(), - dialog, - block_queue: BlockQueue::new(), - client_recv: crx, - peer_recv: mrx, - }, - client, - ) + let node = Self { + state, + chain, + peer_map, + required_peers: required_peers.into(), + dialog, + block_queue: BlockQueue::new(), + client_recv: crx, + peer_recv: mrx, + }; + Client::new(info_rx, warn_rx, event_rx, ctx, node) } /// Run the node continuously. Typically run on a separate thread than the underlying application. diff --git a/tests/core.rs b/tests/core.rs index 00dafacf..cbd30c2a 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -6,8 +6,7 @@ use std::{ use bip157::{ chain::{checkpoints::HeaderCheckpoint, BlockHeaderChanges, ChainState}, - client::Client, - node::Node, + client::{Client, EventListeners, Idle}, Address, BlockHash, Event, Info, ServiceFlags, Transaction, TrustedPeer, Warning, }; use bitcoin::{ @@ -53,14 +52,13 @@ fn new_node( socket_addr: SocketAddrV4, tempdir_path: PathBuf, chain_state: ChainState, -) -> (Node, Client) { +) -> Client { let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = host.into(); trusted.set_services(ServiceFlags::P2P_V2); let builder = bip157::builder::Builder::new(bitcoin::Network::Regtest); let builder = builder.chain_state(chain_state); - let (node, client) = builder.add_peer(host).data_dir(tempdir_path).build(); - (node, client) + builder.add_peer(host).data_dir(tempdir_path).build() } fn num_blocks(rpc: &corepc_node::Client) -> i64 { @@ -87,10 +85,10 @@ async fn invalidate_block(rpc: &corepc_node::Client, hash: &bitcoin::BlockHash) tokio::time::sleep(Duration::from_secs(2)).await; } -async fn sync_assert(best: &bitcoin::BlockHash, channel: &mut UnboundedReceiver) { +async fn sync_assert(best: &bitcoin::BlockHash, event_rx: &mut UnboundedReceiver) { loop { tokio::select! { - event = channel.recv() => { + event = event_rx.recv() => { if let Some(Event::FiltersSynced(update)) = event { assert_eq!(update.tip().hash, *best); println!("Correct sync"); @@ -127,20 +125,20 @@ async fn live_reorg() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; + let client = client.start(); tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; + sync_assert(&best, &mut event_rx).await; // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -148,7 +146,7 @@ async fn live_reorg() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the reorg was caught - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -160,13 +158,13 @@ async fn live_reorg() { } bip157::messages::Event::FiltersSynced(update) => { assert_eq!(update.tip().hash, best); - requester.shutdown().unwrap(); + client.shutdown().unwrap(); break; } _ => {} } } - requester.shutdown().unwrap(); + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -179,20 +177,20 @@ async fn live_reorg_additional_sync() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; + let client = client.start(); tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; + sync_assert(&best, &mut event_rx).await; // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -200,7 +198,7 @@ async fn live_reorg_additional_sync() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the reorg was caught - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -219,8 +217,8 @@ async fn live_reorg_additional_sync() { } mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -233,23 +231,23 @@ async fn various_client_methods() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 500, 15).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; + let client = client.start(); tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - let _ = requester.broadcast_min_feerate().await.unwrap(); - assert!(requester.is_running()); - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + let _ = client.broadcast_min_feerate().await.unwrap(); + assert!(client.is_running()); + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -262,21 +260,21 @@ async fn stop_reorg_resync() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; + let client = client.start(); tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -284,21 +282,21 @@ async fn stop_reorg_resync() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Spin up the node on a cold start - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); // Make sure the reorganization is caught after a cold start - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -316,28 +314,27 @@ async fn stop_reorg_resync() { _ => {} } } - requester.shutdown().unwrap(); - drop(handle); + client.shutdown().unwrap(); // Mine more blocks mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -350,21 +347,21 @@ async fn stop_reorg_two_resync() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 10, 2).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); // Reorganize the blocks let old_height = num_blocks(rpc); let old_best = best; @@ -373,22 +370,21 @@ async fn stop_reorg_two_resync() { invalidate_block(rpc, &best).await; mine_blocks(rpc, &miner, 3, 1).await; let best = best_hash(rpc); - drop(handle); // Make sure the reorganization is caught after a cold start - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - while let Some(message) = channel.recv().await { + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Reorganized { accepted: _, @@ -406,28 +402,27 @@ async fn stop_reorg_two_resync() { _ => {} } } - drop(handle); - requester.shutdown().unwrap(); + client.shutdown().unwrap(); // Mine more blocks mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir, ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let client = client.start(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -439,22 +434,21 @@ async fn stop_reorg_start_on_orphan() { let miner = rpc.new_address().unwrap(); mine_blocks(rpc, &miner, 17, 3).await; let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); - sync_assert(&best, &mut channel).await; - drop(handle); - requester.shutdown().unwrap(); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); @@ -462,22 +456,22 @@ async fn stop_reorg_start_on_orphan() { mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Spin up the node on a cold start with a stale tip - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); let mut headers = Vec::new(); // Ensure SQL is able to catch the fork by loading in headers from the database - while let Some(message) = channel.recv().await { + while let Some(message) = event_rx.recv().await { match message { bip157::messages::Event::ChainUpdate(BlockHeaderChanges::Connected(header)) => { headers.push(header); @@ -498,41 +492,39 @@ async fn stop_reorg_start_on_orphan() { _ => {} } } - drop(handle); - requester.shutdown().unwrap(); + client.shutdown().unwrap(); let best = best_hash(rpc); - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Snapshot(headers.clone()), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; - let handle = tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + mut event_rx, + } = events; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - drop(handle); - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); // Make sure the node does not have any corrupted headers - let (node, client) = new_node(socket_addr, tempdir, ChainState::Snapshot(headers.clone())); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, + let client = new_node(socket_addr, tempdir, ChainState::Snapshot(headers.clone())); + let (client, events) = client.subscribe(); + let EventListeners { info_rx, warn_rx, - event_rx: mut channel, - } = client; + mut event_rx, + } = events; tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + let client = client.start(); // The node properly syncs after persisting a reorg - sync_assert(&best, &mut channel).await; - requester.shutdown().unwrap(); + sync_assert(&best, &mut event_rx).await; + client.shutdown().unwrap(); rpc.stop().unwrap(); } @@ -611,19 +603,15 @@ async fn tx_can_broadcast() { let tx = sighasher.into_transaction().to_owned(); println!("Signed transaction"); // Build a node to broadcast the transaction - let (node, client) = new_node( + let client = new_node( socket_addr, tempdir.clone(), ChainState::Checkpoint(HeaderCheckpoint::from_genesis(bitcoin::Network::Regtest)), ); - tokio::task::spawn(async move { node.run().await }); - let Client { - requester, - info_rx: _, - warn_rx: _, - event_rx: _, - } = client; - tokio::time::timeout(Duration::from_secs(60), requester.broadcast_tx(tx)) + + let (client, _events) = client.subscribe(); + let client = client.start(); + tokio::time::timeout(Duration::from_secs(60), client.broadcast_tx(tx)) .await .unwrap() .unwrap();