From 203a6d118b2ea6f6b6c7651cad96968028ff61f9 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 27 Feb 2026 10:35:43 +0000 Subject: [PATCH] Encapsulate `LightClient` with typestate I made an attempt in the base client to use typestate for running the node [here](https://github.com/2140-dev/kyoto/pull/535). While I think the pattern is ergonomic, it imposes too many restrictions on the `Client` ownership model. The pattern does seem appropriate here, however, where we are taking a more opinionated approach as to how the user interacts with the light client. This API abstracts the `Node` entirely for users that have a `tokio` environment present. I divided the stages of the `LightClient` into `Idle`, `Subscribed`, `Active`. The differentiation between `Idle` and `Subscribed` can be seen in the example. This allows the user to set up the necessary logging tasks required for their app. Then they may call `start` to execute the task, and finally make requests thereafter. --- examples/example.rs | 21 +++--- src/builder.rs | 25 +++---- src/lib.rs | 166 +++++++++++++++++++++++++++++++++++++++----- tests/client.rs | 52 +++++--------- 4 files changed, 187 insertions(+), 77 deletions(-) diff --git a/examples/example.rs b/examples/example.rs index 5ae2f5f..cacb9f7 100644 --- a/examples/example.rs +++ b/examples/example.rs @@ -1,7 +1,5 @@ use bdk_kyoto::builder::{Builder, BuilderExt}; -use bdk_kyoto::{ - HeaderCheckpoint, Info, LightClient, Receiver, ScanType, UnboundedReceiver, Warning, -}; +use bdk_kyoto::{HeaderCheckpoint, Info, Receiver, ScanType, UnboundedReceiver, Warning}; use bdk_wallet::bitcoin::Network; use bdk_wallet::{KeychainKind, Wallet}; use tokio::select; @@ -56,18 +54,15 @@ async fn main() -> anyhow::Result<()> { }; // The light client builder handles the logic of inserting the SPKs - let LightClient { - requester, - info_subscriber, - warning_subscriber, - mut update_subscriber, - node, - } = Builder::new(NETWORK) + let client = Builder::new(NETWORK) .build_with_wallet(&wallet, scan_type) .unwrap(); - - tokio::task::spawn(async move { node.run().await }); - tokio::task::spawn(async move { traces(info_subscriber, warning_subscriber).await }); + let (client, logging, mut update_subscriber) = client.subscribe(); + tokio::task::spawn( + async move { traces(logging.info_subscriber, logging.warning_subscriber).await }, + ); + let client = client.start(); + let requester = client.requester(); // Sync and apply updates. We can do this in a continual loop while the "application" is running. // Often this would occur on a separate thread than the underlying application user interface. diff --git a/src/builder.rs b/src/builder.rs index dcbe85f..600d733 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -30,13 +30,7 @@ //! //! let scan_type = ScanType::Sync; //! -//! let LightClient { -//! requester, -//! info_subscriber, -//! warning_subscriber, -//! update_subscriber, -//! node -//! } = Builder::new(Network::Signet) +//! let client = Builder::new(Network::Signet) //! // A node may handle multiple connections //! .required_peers(2) //! // Choose where to store node data @@ -59,7 +53,7 @@ use bdk_wallet::{ pub use bip157::Builder; use bip157::{chain::ChainState, HeaderCheckpoint}; -use crate::{LightClient, ScanType, UpdateSubscriber}; +use crate::{Idle, LightClient, LoggingSubscribers, ScanType, UpdateSubscriber}; const IMPOSSIBLE_REORG_DEPTH: usize = 7; @@ -70,7 +64,7 @@ pub trait BuilderExt { self, wallet: &Wallet, scan_type: ScanType, - ) -> Result; + ) -> Result, BuilderError>; } impl BuilderExt for Builder { @@ -78,7 +72,7 @@ impl BuilderExt for Builder { mut self, wallet: &Wallet, scan_type: ScanType, - ) -> Result { + ) -> Result, BuilderError> { let network = wallet.network(); if self.network().ne(&network) { return Err(BuilderError::NetworkMismatch); @@ -109,13 +103,16 @@ impl BuilderExt for Builder { wallet.latest_checkpoint(), indexed_graph, ); - Ok(LightClient { + let client = LightClient::new( requester, - info_subscriber: info_rx, - warning_subscriber: warn_rx, + LoggingSubscribers { + info_subscriber: info_rx, + warning_subscriber: warn_rx, + }, update_subscriber, node, - }) + ); + Ok(client) } } diff --git a/src/lib.rs b/src/lib.rs index 7012159..acc9600 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,15 +19,9 @@ //! .network(Network::Signet) //! .create_wallet_no_persist()?; //! -//! let LightClient { -//! requester, -//! info_subscriber: _, -//! warning_subscriber: _, -//! mut update_subscriber, -//! node -//! } = Builder::new(Network::Signet).build_with_wallet(&wallet, ScanType::Sync)?; -//! -//! tokio::task::spawn(async move { node.run().await }); +//! let client = Builder::new(Network::Signet).build_with_wallet(&wallet, ScanType::Sync)?; +//! let (client, _, mut update_subscriber) = client.subscribe(); +//! client.start(); //! //! loop { //! let update = update_subscriber.update().await?; @@ -51,6 +45,7 @@ use bdk_wallet::KeychainKind; pub extern crate bip157; use bip157::chain::BlockHeaderChanges; +use bip157::tokio; use bip157::IndexedBlock; use bip157::ScriptBuf; #[doc(inline)] @@ -69,19 +64,156 @@ pub use bip157::UnboundedReceiver; pub use builder::BuilderExt; pub mod builder; +/// Client state when idle. +pub struct Idle; +/// Client state when subscribed to events. +pub struct Subscribed; +/// Client state when active. +pub struct Active; + +mod sealed { + pub trait Sealed {} +} + +impl sealed::Sealed for Idle {} +impl sealed::Sealed for Subscribed {} +impl sealed::Sealed for Active {} + +/// State of the client. +pub trait State: sealed::Sealed {} + +impl State for Idle {} +impl State for Subscribed {} +impl State for Active {} + +/// Subscribe to events, notably #[derive(Debug)] -/// A node and associated structs to send and receive events to and from the node. -pub struct LightClient { - /// Send events to a running node (i.e. broadcast a transaction). - pub requester: Requester, +pub struct LoggingSubscribers { /// Receive informational messages as the node runs. pub info_subscriber: Receiver, /// Receive warnings from the node as it runs. pub warning_subscriber: UnboundedReceiver, - /// Receive wallet updates from a node. - pub update_subscriber: UpdateSubscriber, - /// The underlying node that must be run to fetch blocks from peers. - pub node: Node, +} + +/// A client and associated structs to send and receive events to and from a node process. +/// +/// The client has three states: +/// - [`Idle`]: the client has been initialized. +/// - [`Subscribed`]: the application is ready to handle logs and updates, but the process is not +/// running yet +/// - [`Active`]: the client is actively fetching data and may now handle requests. +#[derive(Debug)] +pub struct LightClient { + // Send events to a running node (i.e. broadcast a transaction). + requester: Requester, + // Receive info/warnings from the node as it runs. + logging_subscribers: Option, + // Receive wallet updates from a node. + update_subscriber: Option, + // The underlying node that must be run to fetch blocks from peers. + node: Option, + _marker: core::marker::PhantomData, +} + +impl LightClient { + fn new( + requester: Requester, + logging: LoggingSubscribers, + update: UpdateSubscriber, + node: bip157::Node, + ) -> LightClient { + LightClient { + requester, + logging_subscribers: Some(logging), + update_subscriber: Some(update), + node: Some(node), + _marker: core::marker::PhantomData, + } + } + + /// Subscribe to events emitted by the underlying data fetching process. This includes logging + /// and wallet updates. During this step, one may start threads that log to a file and apply + /// updates to a wallet. + /// + /// # Returns + /// + /// - [`LightClient`], a client ready to start. + /// - [`LoggingSubscribers`], info and warning messages to display to a user or write to file. + /// - [`UpdateSubscriber`], used to await updates related to the user's wallet. + pub fn subscribe( + mut self, + ) -> ( + LightClient, + LoggingSubscribers, + UpdateSubscriber, + ) { + let logging = + core::mem::take(&mut self.logging_subscribers).expect("cannot subscribe twice."); + let updates = + core::mem::take(&mut self.update_subscriber).expect("cannot subscribe twice."); + let client = LightClient { + requester: self.requester, + logging_subscribers: None, + update_subscriber: None, + node: self.node, + _marker: core::marker::PhantomData, + }; + (client, logging, updates) + } +} + +impl LightClient { + /// Start fetching data for the wallet on a dedicated [`tokio::task`]. This will continually + /// run until terminated or no peers could be found. + /// + /// # Panics + /// + /// If there is no [`tokio::runtime::Runtime`] to drive execution. Common in synchronous + /// setups. + pub fn start(mut self) -> LightClient { + let node = core::mem::take(&mut self.node).expect("cannot start twice."); + tokio::task::spawn(async move { node.run().await }); + LightClient { + requester: self.requester, + logging_subscribers: None, + update_subscriber: None, + node: None, + _marker: core::marker::PhantomData, + } + } + + /// Take the underlying node process to run in a custom way. Examples include using a dedicated + /// [`tokio::runtime::Runtime`] or [`tokio::runtime::Handle`] to drive execution. + pub fn managed_start(mut self) -> (LightClient, Node) { + let node = core::mem::take(&mut self.node).expect("cannot start twice."); + let client = LightClient { + requester: self.requester, + logging_subscribers: None, + update_subscriber: None, + node: None, + _marker: core::marker::PhantomData, + }; + (client, node) + } +} + +impl LightClient { + /// The client is active and may now handle requests with a [`Requester`]. + pub fn requester(self) -> Requester { + self.requester + } +} + +impl From> for Requester { + fn from(value: LightClient) -> Self { + value.requester + } +} + +impl AsRef for LightClient { + fn as_ref(&self) -> &Requester { + &self.requester + } } /// Interpret events from a node that is running to apply diff --git a/tests/client.rs b/tests/client.rs index 5667780..5f6ee0c 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,8 +1,8 @@ // #![allow(unused)] +use bdk_kyoto::Idle; use std::net::IpAddr; use std::path::PathBuf; use std::time::Duration; -use tokio::task; use tokio::time; use bdk_kyoto::builder::{Builder, BuilderExt}; @@ -42,7 +42,7 @@ fn init_node( env: &TestEnv, wallet: &bdk_wallet::Wallet, tempdir: PathBuf, -) -> anyhow::Result { +) -> anyhow::Result> { let peer = env.bitcoind.params.p2p_socket.unwrap(); let ip: IpAddr = (*peer.ip()).into(); let port = peer.port(); @@ -73,12 +73,8 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> { // build node/client let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); - let LightClient { - requester, - mut update_subscriber, - node, - .. - } = init_node(&env, &wallet, tempdir)?; + let client = init_node(&env, &wallet, tempdir)?; + let (client, _, mut update_subscriber) = client.subscribe(); // mine blocks let _hashes = env.mine_blocks(100, Some(miner.clone()))?; @@ -90,8 +86,9 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> { let hashes = env.mine_blocks(1, Some(miner))?; wait_for_height(&env, 102).await?; - // run node - task::spawn(async move { node.run().await }); + let client = client.start(); + let requester = client.requester(); + // get update let res = update_subscriber.update().await?; let Update { @@ -131,12 +128,8 @@ async fn update_handles_reorg() -> anyhow::Result<()> { let addr = wallet.peek_address(KeychainKind::External, 0).address; let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); - let LightClient { - requester, - mut update_subscriber, - node, - .. - } = init_node(&env, &wallet, tempdir)?; + let client = init_node(&env, &wallet, tempdir)?; + let (client, _, mut update_subscriber) = client.subscribe(); // mine blocks let miner = env @@ -153,7 +146,8 @@ async fn update_handles_reorg() -> anyhow::Result<()> { let blockhash = hashes[0]; wait_for_height(&env, 102).await?; - task::spawn(async move { node.run().await }); + let client = client.start(); + let requester = client.requester(); // get update let res = update_subscriber.update().await?; @@ -192,12 +186,10 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { let addr = wallet.peek_address(KeychainKind::External, 0).address; let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); - let LightClient { - requester, - mut update_subscriber, - node, - .. - } = init_node(&env, &wallet, tempdir.clone())?; + let client = init_node(&env, &wallet, tempdir.clone())?; + let (client, _, mut update_subscriber) = client.subscribe(); + let client = client.start(); + let requester = client.requester(); // mine blocks let miner = env @@ -214,8 +206,6 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { let blockhash = hashes[0]; wait_for_height(&env, 102).await?; - task::spawn(async move { node.run().await }); - // get update let res = update_subscriber.update().await?; let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); @@ -231,14 +221,10 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { _ = env.mine_blocks(20, Some(miner))?; // 122 wait_for_height(&env, 122).await?; - let LightClient { - requester, - mut update_subscriber, - node, - .. - } = init_node(&env, &wallet, tempdir)?; - - task::spawn(async move { node.run().await }); + let client = init_node(&env, &wallet, tempdir)?; + let (client, _, mut update_subscriber) = client.subscribe(); + let client = client.start(); + let requester = client.requester(); // expect tx to confirm at same height but different blockhash let res = update_subscriber.update().await?;