Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions examples/example.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use bdk_kyoto::builder::{Builder, BuilderExt};
use bdk_kyoto::{
HeaderCheckpoint, Info, LightClient, Receiver, ScanType, UnboundedReceiver, Warning,
};
use bdk_kyoto::{HeaderCheckpoint, Info, Receiver, ScanType, UnboundedReceiver, Warning};
use bdk_wallet::bitcoin::Network;
use bdk_wallet::{KeychainKind, Wallet};
use tokio::select;
Expand Down Expand Up @@ -56,18 +54,15 @@ async fn main() -> anyhow::Result<()> {
};

// The light client builder handles the logic of inserting the SPKs
let LightClient {
requester,
info_subscriber,
warning_subscriber,
mut update_subscriber,
node,
} = Builder::new(NETWORK)
let client = Builder::new(NETWORK)
.build_with_wallet(&wallet, scan_type)
.unwrap();

tokio::task::spawn(async move { node.run().await });
tokio::task::spawn(async move { traces(info_subscriber, warning_subscriber).await });
let (client, logging, mut update_subscriber) = client.subscribe();
tokio::task::spawn(
async move { traces(logging.info_subscriber, logging.warning_subscriber).await },
);
let client = client.start();
let requester = client.requester();

// Sync and apply updates. We can do this in a continual loop while the "application" is running.
// Often this would occur on a separate thread than the underlying application user interface.
Expand Down
25 changes: 11 additions & 14 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@
//!
//! let scan_type = ScanType::Sync;
//!
//! let LightClient {
//! requester,
//! info_subscriber,
//! warning_subscriber,
//! update_subscriber,
//! node
//! } = Builder::new(Network::Signet)
//! let client = Builder::new(Network::Signet)
//! // A node may handle multiple connections
//! .required_peers(2)
//! // Choose where to store node data
Expand All @@ -59,7 +53,7 @@ use bdk_wallet::{
pub use bip157::Builder;
use bip157::{chain::ChainState, HeaderCheckpoint};

use crate::{LightClient, ScanType, UpdateSubscriber};
use crate::{Idle, LightClient, LoggingSubscribers, ScanType, UpdateSubscriber};

const IMPOSSIBLE_REORG_DEPTH: usize = 7;

Expand All @@ -70,15 +64,15 @@ pub trait BuilderExt {
self,
wallet: &Wallet,
scan_type: ScanType,
) -> Result<LightClient, BuilderError>;
) -> Result<LightClient<Idle>, BuilderError>;
}

impl BuilderExt for Builder {
fn build_with_wallet(
mut self,
wallet: &Wallet,
scan_type: ScanType,
) -> Result<LightClient, BuilderError> {
) -> Result<LightClient<Idle>, BuilderError> {
let network = wallet.network();
if self.network().ne(&network) {
return Err(BuilderError::NetworkMismatch);
Expand Down Expand Up @@ -109,13 +103,16 @@ impl BuilderExt for Builder {
wallet.latest_checkpoint(),
indexed_graph,
);
Ok(LightClient {
let client = LightClient::new(
requester,
info_subscriber: info_rx,
warning_subscriber: warn_rx,
LoggingSubscribers {
info_subscriber: info_rx,
warning_subscriber: warn_rx,
},
update_subscriber,
node,
})
);
Ok(client)
}
}

Expand Down
166 changes: 149 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,9 @@
//! .network(Network::Signet)
//! .create_wallet_no_persist()?;
//!
//! let LightClient {
//! requester,
//! info_subscriber: _,
//! warning_subscriber: _,
//! mut update_subscriber,
//! node
//! } = Builder::new(Network::Signet).build_with_wallet(&wallet, ScanType::Sync)?;
//!
//! tokio::task::spawn(async move { node.run().await });
//! let client = Builder::new(Network::Signet).build_with_wallet(&wallet, ScanType::Sync)?;
//! let (client, _, mut update_subscriber) = client.subscribe();
//! client.start();
//!
//! loop {
//! let update = update_subscriber.update().await?;
Expand All @@ -51,6 +45,7 @@ use bdk_wallet::KeychainKind;
pub extern crate bip157;

use bip157::chain::BlockHeaderChanges;
use bip157::tokio;
use bip157::IndexedBlock;
use bip157::ScriptBuf;
#[doc(inline)]
Expand All @@ -69,19 +64,156 @@ pub use bip157::UnboundedReceiver;
pub use builder::BuilderExt;
pub mod builder;

/// Client state when idle.
pub struct Idle;
/// Client state when subscribed to events.
pub struct Subscribed;
/// Client state when active.
pub struct Active;

mod sealed {
pub trait Sealed {}
}

impl sealed::Sealed for Idle {}
impl sealed::Sealed for Subscribed {}
impl sealed::Sealed for Active {}

/// State of the client.
pub trait State: sealed::Sealed {}

impl State for Idle {}
impl State for Subscribed {}
impl State for Active {}

/// Subscribe to events, notably
#[derive(Debug)]
/// A node and associated structs to send and receive events to and from the node.
pub struct LightClient {
/// Send events to a running node (i.e. broadcast a transaction).
pub requester: Requester,
pub struct LoggingSubscribers {
/// Receive informational messages as the node runs.
pub info_subscriber: Receiver<Info>,
/// Receive warnings from the node as it runs.
pub warning_subscriber: UnboundedReceiver<Warning>,
/// Receive wallet updates from a node.
pub update_subscriber: UpdateSubscriber,
/// The underlying node that must be run to fetch blocks from peers.
pub node: Node,
}

/// A client and associated structs to send and receive events to and from a node process.
///
/// The client has three states:
/// - [`Idle`]: the client has been initialized.
/// - [`Subscribed`]: the application is ready to handle logs and updates, but the process is not
/// running yet
/// - [`Active`]: the client is actively fetching data and may now handle requests.
#[derive(Debug)]
pub struct LightClient<S: State> {
// Send events to a running node (i.e. broadcast a transaction).
requester: Requester,
// Receive info/warnings from the node as it runs.
logging_subscribers: Option<LoggingSubscribers>,
// Receive wallet updates from a node.
update_subscriber: Option<UpdateSubscriber>,
// The underlying node that must be run to fetch blocks from peers.
node: Option<Node>,
_marker: core::marker::PhantomData<S>,
}

impl LightClient<Idle> {
fn new(
requester: Requester,
logging: LoggingSubscribers,
update: UpdateSubscriber,
node: bip157::Node,
) -> LightClient<Idle> {
LightClient {
requester,
logging_subscribers: Some(logging),
update_subscriber: Some(update),
node: Some(node),
_marker: core::marker::PhantomData,
}
}

/// Subscribe to events emitted by the underlying data fetching process. This includes logging
/// and wallet updates. During this step, one may start threads that log to a file and apply
/// updates to a wallet.
///
/// # Returns
///
/// - [`LightClient<Subscribed>`], a client ready to start.
/// - [`LoggingSubscribers`], info and warning messages to display to a user or write to file.
/// - [`UpdateSubscriber`], used to await updates related to the user's wallet.
pub fn subscribe(
mut self,
) -> (
LightClient<Subscribed>,
LoggingSubscribers,
UpdateSubscriber,
) {
let logging =
core::mem::take(&mut self.logging_subscribers).expect("cannot subscribe twice.");
let updates =
core::mem::take(&mut self.update_subscriber).expect("cannot subscribe twice.");
let client = LightClient {
requester: self.requester,
logging_subscribers: None,
update_subscriber: None,
node: self.node,
_marker: core::marker::PhantomData,
};
(client, logging, updates)
}
}

impl LightClient<Subscribed> {
/// Start fetching data for the wallet on a dedicated [`tokio::task`]. This will continually
/// run until terminated or no peers could be found.
///
/// # Panics
///
/// If there is no [`tokio::runtime::Runtime`] to drive execution. Common in synchronous
/// setups.
pub fn start(mut self) -> LightClient<Active> {
let node = core::mem::take(&mut self.node).expect("cannot start twice.");
tokio::task::spawn(async move { node.run().await });
LightClient {
requester: self.requester,
logging_subscribers: None,
update_subscriber: None,
node: None,
_marker: core::marker::PhantomData,
}
}

/// Take the underlying node process to run in a custom way. Examples include using a dedicated
/// [`tokio::runtime::Runtime`] or [`tokio::runtime::Handle`] to drive execution.
pub fn managed_start(mut self) -> (LightClient<Active>, Node) {
let node = core::mem::take(&mut self.node).expect("cannot start twice.");
let client = LightClient {
requester: self.requester,
logging_subscribers: None,
update_subscriber: None,
node: None,
_marker: core::marker::PhantomData,
};
(client, node)
}
}

impl LightClient<Active> {
/// The client is active and may now handle requests with a [`Requester`].
pub fn requester(self) -> Requester {
self.requester
}
}

impl From<LightClient<Active>> for Requester {
fn from(value: LightClient<Active>) -> Self {
value.requester
}
}

impl AsRef<Requester> for LightClient<Active> {
fn as_ref(&self) -> &Requester {
&self.requester
}
}

/// Interpret events from a node that is running to apply
Expand Down
Loading