Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions examples/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

use bip157::builder::Builder;
use bip157::chain::{BlockHeaderChanges, ChainState};
use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf};
use bip157::client::EventListeners;
use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf};
use std::collections::HashSet;
use tokio::time::Instant;

Expand All @@ -23,7 +24,7 @@ async fn main() {
// Create a new node builder
let builder = Builder::new(NETWORK);
// Add node preferences and build the node/client
let (node, client) = builder
let client = builder
// The number of connections we would like to maintain
.required_peers(2)
// Only scan for taproot scripts
Expand All @@ -34,17 +35,17 @@ async fn main() {
.add_peers(seeds.into_iter().map(From::from))
// Create the node and client
.build();
// Run the node on a separate task
tokio::task::spawn(async move { node.run().await });

// Split the client into components that send messages and listen to messages.
// With this construction, different parts of the program can take ownership of
// specific tasks.
let Client {
requester,
let (client, events) = client.run();
let EventListeners {
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
} = events;

// Continually listen for events until the node is synced to its peers.
loop {
tokio::select! {
Expand All @@ -54,11 +55,11 @@ async fn main() {
Event::FiltersSynced(update) => {
tracing::info!("Chain tip: {}",update.tip().hash);
// Request information from the node
let fee = requester.broadcast_min_feerate().await.unwrap();
let fee = client.broadcast_min_feerate().await.unwrap();
tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee);
let sync_time = now.elapsed().as_secs_f32();
tracing::info!("Total sync time: {sync_time} seconds");
let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap();
let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap();
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
break;
},
Expand All @@ -81,6 +82,6 @@ async fn main() {
}
}
}
let _ = requester.shutdown();
let _ = client.shutdown();
tracing::info!("Shutting down");
}
18 changes: 8 additions & 10 deletions examples/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use bip157::chain::{BlockHeaderChanges, ChainState};
use bip157::messages::Event;
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client};
use bip157::{Address, BlockHash, Network};
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint};
use bip157::{Address, BlockHash, EventListeners, Network};
use std::collections::HashSet;
use std::str::FromStr;

Expand Down Expand Up @@ -32,22 +32,20 @@ async fn main() {
// Create a new node builder
let builder = Builder::new(NETWORK);
// Add node preferences and build the node/client
let (node, client) = builder
let client = builder
// Only scan blocks strictly after a checkpoint
.chain_state(ChainState::Checkpoint(checkpoint))
// The number of connections we would like to maintain
.required_peers(1)
// Create the node and client
.build();

tokio::task::spawn(async move { node.run().await });

let Client {
requester,
let (client, events) = client.run();
let EventListeners {
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
} = events;

// Continually listen for events until the node is synced to its peers.
loop {
Expand All @@ -71,7 +69,7 @@ async fn main() {
if filter.contains_any(addresses.iter()) {
let hash = filter.block_hash();
tracing::info!("Found script at {}!", hash);
let indexed_block = requester.get_block(hash).await.unwrap();
let indexed_block = client.get_block(hash).await.unwrap();
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
tracing::info!("Coinbase transaction ID: {}", coinbase);
break;
Expand All @@ -87,6 +85,6 @@ async fn main() {
}
}
}
let _ = requester.shutdown();
let _ = client.shutdown();
tracing::info!("Shutting down");
}
9 changes: 5 additions & 4 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bitcoin::Network;

use super::{client::Client, node::Node};
use crate::chain::ChainState;
use crate::client::Idle;
use crate::network::ConnectionType;
use crate::TrustedPeer;
use crate::{Config, FilterType};
Expand All @@ -25,7 +26,7 @@ const MAX_PEERS: u8 = 15;
///
/// let host = (IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), None);
/// let builder = Builder::new(Network::Regtest);
/// let (node, client) = builder
/// let client = builder
/// .add_peers(vec![host.into()])
/// .build();
/// ```
Expand Down Expand Up @@ -138,8 +139,8 @@ impl Builder {
self
}

/// Consume the node builder and receive a [`Node`] and [`Client`].
pub fn build(mut self) -> (Node, Client) {
Node::new(self.network, core::mem::take(&mut self.config))
/// Consume the node builder and receive a [`Client`].
pub fn build(mut self) -> Client<Idle> {
Node::build(self.network, core::mem::take(&mut self.config))
}
}
122 changes: 108 additions & 14 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,33 @@ use tokio::sync::oneshot;

use crate::chain::block_subsidy;
use crate::messages::ClientRequest;
use crate::node::Node;
use crate::{Event, Info, TrustedPeer, Warning};

use super::{error::ClientError, messages::ClientMessage};
use super::{error::FetchBlockError, IndexedBlock};

/// A [`Client`] allows for communication with a running node.
/// Client state when idle.
pub struct Idle;
/// Client state when active.
pub struct Active;

mod sealed {
pub trait Sealed {}
}

impl sealed::Sealed for Idle {}
impl sealed::Sealed for Active {}

/// State of the client
pub trait State: sealed::Sealed {}

impl State for Idle {}
impl State for Active {}

/// Wrapper type for the channels that will receive events.
#[derive(Debug)]
pub struct Client {
/// Send events to a node, such as broadcasting a transaction.
pub requester: Requester,
pub struct EventListeners {
/// Receive informational messages from the node.
pub info_rx: mpsc::Receiver<Info>,
/// Receive warning messages from a node.
Expand All @@ -24,33 +41,110 @@ pub struct Client {
pub event_rx: mpsc::UnboundedReceiver<Event>,
}

impl Client {
pub(crate) fn new(
impl EventListeners {
fn new(
info_rx: mpsc::Receiver<Info>,
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: UnboundedSender<ClientMessage>,
) -> Self {
Self {
requester: Requester::new(ntx),
info_rx,
warn_rx,
event_rx,
}
}
}

/// Send messages to a node that is running so the node may complete a task.
#[derive(Debug, Clone)]
pub struct Requester {
/// A [`Client`] allows for communication with a running node.
#[derive(Debug)]
pub struct Client<S: State> {
/// Send events to a node, such as broadcasting a transaction.
ntx: UnboundedSender<ClientMessage>,
/// Receive informational messages from the node.
events: Option<EventListeners>,
/// Internal node structure.
node: Option<Node>,
/// Marker for state.
_marker: core::marker::PhantomData<S>,
}

impl Requester {
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
Self { ntx }
impl Client<Idle> {
pub(crate) fn new(
info_rx: mpsc::Receiver<Info>,
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: UnboundedSender<ClientMessage>,
node: Node,
) -> Client<Idle> {
Client {
ntx,
events: Some(EventListeners::new(info_rx, warn_rx, event_rx)),
node: Some(node),
_marker: core::marker::PhantomData,
}
}
/// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to
/// execute the task.
pub fn run(mut self) -> (Client<Active>, EventListeners) {
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
tokio::task::spawn(async move { node.run().await });
(
Client {
ntx: self.ntx,
events: None,
node: None,
_marker: core::marker::PhantomData,
},
events,
)
}

/// Run on a detached operating system thread. This method is useful in the case where the
/// majority of your application code is blocking, and you do not have a
/// [`tokio::runtime::Runtime`] available. This method will implicitly create a runtime which
/// runs the data fetching process.
pub fn run_detached(mut self) -> (Client<Active>, EventListeners) {
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
std::thread::spawn(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
let _ = node.run().await;
})
});
let client = Client {
ntx: self.ntx,
events: None,
node: None,
_marker: core::marker::PhantomData,
};
(client, events)
}

/// Run the node with an existing [`tokio::runtime::Runtime`].
pub fn run_with_runtime(
mut self,
rt: impl AsRef<tokio::runtime::Runtime>,
) -> (Client<Active>, EventListeners) {
let rt = rt.as_ref();
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
rt.spawn(async move { node.run().await });
let client = Client {
ntx: self.ntx,
events: None,
node: None,
_marker: core::marker::PhantomData,
};
(client, events)
}
}

impl Client<Active> {
/// Tell the node to shut down.
///
/// # Errors
Expand Down
20 changes: 10 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! # Example usage
//!
//! ```no_run
//! use bip157::{Builder, Event, Client, Network, BlockHash};
//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash};
//!
//! #[tokio::main]
//! async fn main() {
Expand All @@ -18,14 +18,15 @@
//! // Create a new node builder
//! let builder = Builder::new(Network::Signet);
//! // Add node preferences and build the node/client
//! let (node, client) = builder
//! let client = builder
//! // The number of connections we would like to maintain
//! .required_peers(2)
//! .build();
//! // Run the node and wait for the sync message;
//! tokio::task::spawn(async move { node.run().await });
//! // Start the node
//! let (client, events) = client.run();
//! // Split the client into components that send messages and listen to messages
//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx } = client;
//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events;
//! // Wait for the sync message;
//! loop {
//! if let Some(event) = event_rx.recv().await {
//! match event {
Expand All @@ -37,7 +38,7 @@
//! }
//! }
//! }
//! requester.shutdown();
//! client.shutdown();
//! }
//! ```

Expand All @@ -57,8 +58,8 @@ pub mod client;
pub mod error;
/// Messages the node may send a client.
pub mod messages;
/// The structure that communicates with the Bitcoin P2P network and collects data.
pub mod node;
// The structure that communicates with the Bitcoin P2P network and collects data.
mod node;

use chain::Filter;

Expand All @@ -81,10 +82,9 @@ use tokio::sync::mpsc::UnboundedSender;
pub use {
crate::builder::Builder,
crate::chain::ChainState,
crate::client::{Client, Requester},
crate::client::{Client, EventListeners},
crate::error::{ClientError, NodeError},
crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning},
crate::node::Node,
};

#[doc(inline)]
Expand Down
Loading