From 0a6bb9ecc1693ca500df8b554bc3ec24623143bf Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 6 Mar 2026 13:42:05 +0000 Subject: [PATCH 1/5] Make `LightClient` generic over number of wallets This commit implements the scaffolding such that the `UpdateSubscriber` may handle either a single wallet or multiple wallets. The methods available on the subscriber should change depending on the configuration, but the syncing logic should remain the same. The restriction is implemented here. --- src/builder.rs | 6 +-- src/lib.rs | 138 ++++++++++++++++++++++++++++++++++++------------ tests/client.rs | 4 +- 3 files changed, 109 insertions(+), 39 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index c89ddb6..58df8c5 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -64,7 +64,7 @@ pub trait BuilderExt { self, wallet: &Wallet, scan_type: ScanType, - ) -> Result, BuilderError>; + ) -> Result, BuilderError>; } impl BuilderExt for Builder { @@ -72,7 +72,7 @@ impl BuilderExt for Builder { mut self, wallet: &Wallet, scan_type: ScanType, - ) -> Result, BuilderError> { + ) -> Result, BuilderError> { let network = wallet.network(); if self.network().ne(&network) { return Err(BuilderError::NetworkMismatch); @@ -96,7 +96,7 @@ impl BuilderExt for Builder { event_rx, } = client; let indexed_graph = IndexedTxGraph::new(wallet.spk_index().clone()); - let update_subscriber = UpdateSubscriber::new( + let update_subscriber = UpdateSubscriber::::new( requester.clone(), scan_type, event_rx, diff --git a/src/lib.rs b/src/lib.rs index 2619d13..65c504d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,10 +32,12 @@ //! ``` #![warn(missing_docs)] +use std::collections::BTreeMap; use std::collections::HashSet; use bdk_wallet::chain::BlockId; use bdk_wallet::chain::CheckPoint; +use bdk_wallet::chain::DescriptorId; pub use bdk_wallet::Update; use bdk_wallet::chain::{keychain_txout::KeychainTxOutIndex, IndexedTxGraph}; @@ -106,25 +108,25 @@ pub struct LoggingSubscribers { /// running yet /// - [`Active`]: the client is actively fetching data and may now handle requests. #[derive(Debug)] -pub struct LightClient { +pub struct LightClient { // Send events to a running node (i.e. broadcast a transaction). requester: Requester, // Receive info/warnings from the node as it runs. logging_subscribers: Option, // Receive wallet updates from a node. - update_subscriber: Option, + update_subscriber: Option>, // The underlying node that must be run to fetch blocks from peers. node: Option, _marker: core::marker::PhantomData, } -impl LightClient { +impl LightClient { fn new( requester: Requester, logging: LoggingSubscribers, - update: UpdateSubscriber, + update: UpdateSubscriber, node: bip157::Node, - ) -> LightClient { + ) -> LightClient { LightClient { requester, logging_subscribers: Some(logging), @@ -146,9 +148,9 @@ impl LightClient { pub fn subscribe( mut self, ) -> ( - LightClient, + LightClient, LoggingSubscribers, - UpdateSubscriber, + UpdateSubscriber, ) { let logging = core::mem::take(&mut self.logging_subscribers).expect("cannot subscribe twice."); @@ -165,7 +167,7 @@ impl LightClient { } } -impl LightClient { +impl LightClient { /// Start fetching data for the wallet on a dedicated [`tokio::task`]. This will continually /// run until terminated or no peers could be found. /// @@ -173,7 +175,7 @@ impl LightClient { /// /// If there is no [`tokio::runtime::Runtime`] to drive execution. Common in synchronous /// setups. - pub fn start(mut self) -> LightClient { + pub fn start(mut self) -> LightClient { let node = core::mem::take(&mut self.node).expect("cannot start twice."); tokio::task::spawn(async move { node.run().await }); LightClient { @@ -187,7 +189,7 @@ impl LightClient { /// Take the underlying node process to run in a custom way. Examples include using a dedicated /// [`tokio::runtime::Runtime`] or [`tokio::runtime::Handle`] to drive execution. - pub fn managed_start(mut self) -> (LightClient, Node) { + pub fn managed_start(mut self) -> (LightClient, Node) { let node = core::mem::take(&mut self.node).expect("cannot start twice."); let client = LightClient { requester: self.requester, @@ -200,67 +202,115 @@ impl LightClient { } } -impl LightClient { +impl LightClient { /// The client is active and may now handle requests with a [`Requester`]. pub fn requester(self) -> Requester { self.requester } } -impl From> for Requester { - fn from(value: LightClient) -> Self { +impl From> for Requester { + fn from(value: LightClient) -> Self { value.requester } } -impl AsRef for LightClient { +impl AsRef for LightClient { fn as_ref(&self) -> &Requester { &self.requester } } +/// Tag for number of wallets. +pub mod wallets { + /// Client for a single wallet. + #[derive(Debug)] + pub struct Single; + /// Client for multiple wallets. + #[derive(Debug)] + pub struct Multiple; +} + +impl sealed::Sealed for wallets::Single {} +impl sealed::Sealed for wallets::Multiple {} + +/// Number of wallets. +pub trait Wallets: sealed::Sealed {} + +impl Wallets for wallets::Single {} +impl Wallets for wallets::Multiple {} + /// Interpret events from a node that is running to apply /// updates to an underlying wallet. #[derive(Debug)] -pub struct UpdateSubscriber { +pub struct UpdateSubscriber { // request information from the client requester: Requester, // channel receiver receiver: UnboundedReceiver, - // processes events for the wallet. - update_builder: UpdateBuilder, // queued blocks to fetch queued_blocks: Vec, // queued scripts to check filters spk_cache: HashSet, + // processes events for the wallet. + single_update_builder: Option, + // process events for multiple wallets. + multiple_updates_builder: Option>, + _marker: core::marker::PhantomData, } -impl UpdateSubscriber { +impl UpdateSubscriber { fn new( requester: Requester, scan_type: ScanType, receiver: UnboundedReceiver, cp: CheckPoint, graph: IndexedTxGraph>, - ) -> Self { + ) -> UpdateSubscriber { let update_builder = UpdateBuilder::new(cp, graph); let spk_cache = update_builder.peek_scripts_from_scantype(scan_type); - Self { + UpdateSubscriber { requester, receiver, - update_builder, + single_update_builder: Some(update_builder), + multiple_updates_builder: None, queued_blocks: Vec::new(), spk_cache, + _marker: core::marker::PhantomData, } } - /// Return the most recent update from the node once it has synced to the network's tip. - /// This may take a significant portion of time during wallet recoveries or dormant wallets. - /// Note that you may call this method in a loop as long as the node is running. - /// - /// **Warning** - /// - /// This method is _not_ cancel safe. You cannot use it within a `tokio::select` arm. - pub async fn update(&mut self) -> Result { + + fn new_multiple( + requester: Requester, + receiver: UnboundedReceiver, + wallet_iter: impl Iterator< + Item = ( + DescriptorId, + ScanType, + CheckPoint, + IndexedTxGraph>, + ), + >, + ) -> UpdateSubscriber { + let mut update_map = BTreeMap::new(); + let mut spk_cache = HashSet::new(); + for wallet in wallet_iter { + let update_builder = UpdateBuilder::new(wallet.2, wallet.3); + spk_cache.extend(update_builder.peek_scripts_from_scantype(wallet.1)); + update_map.insert(wallet.0, update_builder); + } + UpdateSubscriber { + requester, + receiver, + single_update_builder: None, + multiple_updates_builder: Some(update_map), + queued_blocks: Vec::new(), + spk_cache, + _marker: core::marker::PhantomData, + } + } + + async fn sync(&mut self) -> Result<(), UpdateError> { while let Some(message) = self.receiver.recv().await { match message { Event::IndexedFilter(filter) => { @@ -270,7 +320,9 @@ impl UpdateSubscriber { } } Event::ChainUpdate(changeset) => { - self.update_builder.apply_chain_event(changeset); + if let Some(single) = self.single_update_builder.as_mut() { + single.apply_chain_event(changeset); + } } Event::FiltersSynced(SyncUpdate { tip: _, @@ -282,11 +334,15 @@ impl UpdateSubscriber { .get_block(hash) .await .map_err(|_| UpdateError::NodeStopped)?; - self.update_builder.apply_block_event(block); + if let Some(single) = self.single_update_builder.as_mut() { + single.apply_block_event(block); + } + } + if let Some(single) = self.single_update_builder.as_mut() { + self.spk_cache + .extend(single.peek_script_to_keychain_lookahead()); } - self.spk_cache - .extend(self.update_builder.peek_script_to_keychain_lookahead()); - return Ok(self.update_builder.finish()); + return Ok(()); } _ => (), } @@ -295,6 +351,20 @@ impl UpdateSubscriber { } } +impl UpdateSubscriber { + /// Return the most recent [`Update`] for a wallet once it has synced to the network's tip. + /// This may take a significant portion of time during wallet recoveries or dormant wallets. + /// Note that you may call this method in a loop as long as the node is running. + /// + /// **Warning** + /// + /// This method is _not_ cancel safe. You cannot use it within a `tokio::select` arm. + pub async fn update(&mut self) -> Result { + self.sync().await?; + Ok(self.single_update_builder.as_mut().unwrap().finish()) + } +} + #[derive(Debug)] struct UpdateBuilder { // Changes to the wallet local chain. diff --git a/tests/client.rs b/tests/client.rs index 299d16b..c3b469c 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,5 +1,5 @@ // #![allow(unused)] -use bdk_kyoto::state::Idle; +use bdk_kyoto::{state::Idle, wallets::Single}; use std::net::IpAddr; use std::path::PathBuf; use std::time::Duration; @@ -42,7 +42,7 @@ fn init_node( env: &TestEnv, wallet: &bdk_wallet::Wallet, tempdir: PathBuf, -) -> anyhow::Result> { +) -> anyhow::Result> { let peer = env.bitcoind.params.p2p_socket.unwrap(); let ip: IpAddr = (*peer.ip()).into(); let port = peer.port(); From c9373040804c20a35c87f22e4ad7bb81ac534635 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 6 Mar 2026 14:42:43 +0000 Subject: [PATCH 2/5] Add multiple wallet syncing logic If there are multiple wallets configured, this commit updates each of them via the `UpdateBuilder`. Each update is indexed on the `DescriptorId` of the public descriptor for the external keychain. These should be unique per wallet if I understand correctly. --- src/lib.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 65c504d..bdaed4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -321,7 +321,12 @@ impl UpdateSubscriber { } Event::ChainUpdate(changeset) => { if let Some(single) = self.single_update_builder.as_mut() { - single.apply_chain_event(changeset); + single.apply_chain_event(&changeset); + } + if let Some(multiple) = self.multiple_updates_builder.as_mut() { + for (_id, builder) in multiple.iter_mut() { + builder.apply_chain_event(&changeset); + } } } Event::FiltersSynced(SyncUpdate { @@ -335,13 +340,24 @@ impl UpdateSubscriber { .await .map_err(|_| UpdateError::NodeStopped)?; if let Some(single) = self.single_update_builder.as_mut() { - single.apply_block_event(block); + single.apply_block_event(&block); + } + if let Some(multiple) = self.multiple_updates_builder.as_mut() { + for (_id, builder) in multiple.iter_mut() { + builder.apply_block_event(&block); + } } } if let Some(single) = self.single_update_builder.as_mut() { self.spk_cache .extend(single.peek_script_to_keychain_lookahead()); } + if let Some(multiple) = self.multiple_updates_builder.as_mut() { + for (_id, builder) in multiple.iter() { + self.spk_cache + .extend(builder.peek_script_to_keychain_lookahead()); + } + } return Ok(()); } _ => (), @@ -365,6 +381,29 @@ impl UpdateSubscriber { } } +impl UpdateSubscriber { + /// Return a set of [`Update`] for the configured wallets when synced to the network's tip. The + /// [`Update`] are grouped with the [`DescriptorId`] of the external descriptor for each + /// wallet. + /// + /// This may take a significant portion of time during wallet recoveries or dormant wallets. + /// Note that you may call this method in a loop as long as the node is running. + /// + /// **Warning** + /// + /// This method is _not_ cancel safe. You cannot use it within a `tokio::select` arm. + pub async fn updates( + &mut self, + ) -> Result, UpdateError> { + self.sync().await?; + let mut map = BTreeMap::new(); + for (id, builder) in self.multiple_updates_builder.as_mut().unwrap().iter_mut() { + map.insert(*id, builder.finish()); + } + Ok(map.into_iter()) + } +} + #[derive(Debug)] struct UpdateBuilder { // Changes to the wallet local chain. @@ -381,7 +420,7 @@ impl UpdateBuilder { Self { cp, graph } } - fn apply_chain_event(&mut self, event: BlockHeaderChanges) { + fn apply_chain_event(&mut self, event: &BlockHeaderChanges) { match event { BlockHeaderChanges::Connected(at) => { let block_id = BlockId { @@ -406,10 +445,10 @@ impl UpdateBuilder { } } - fn apply_block_event(&mut self, block: IndexedBlock) { + fn apply_block_event(&mut self, block: &IndexedBlock) { let height = block.height; - let block = block.block; - let _ = self.graph.apply_block_relevant(&block, height); + let block = &block.block; + let _ = self.graph.apply_block_relevant(block, height); } #[inline] From 05e7fbeae79b8ab49a0076cad56f8e4507365c72 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 6 Mar 2026 15:10:14 +0000 Subject: [PATCH 3/5] Add option to build with multiple wallets Implements a way to create the `UpdateSubscriber`. Here we need to take the lowest configured checkpoint for the scan. While this is a _rescan_ for most wallets in the list, we will need to scan these filters anyway for new wallets, and the `Wallet` should handle redundant updates anyway. --- src/builder.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/lib.rs | 2 +- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 58df8c5..26d4bd8 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -47,8 +47,11 @@ use std::fmt::Display; use bdk_wallet::{ - chain::{CheckPoint, IndexedTxGraph}, - Wallet, + chain::{ + keychain_txout::KeychainTxOutIndex, CheckPoint, ConfirmationBlockTime, DescriptorExt, + DescriptorId, IndexedTxGraph, + }, + KeychainKind, Wallet, }; pub use bip157::Builder; use bip157::{chain::ChainState, HeaderCheckpoint}; @@ -65,6 +68,12 @@ pub trait BuilderExt { wallet: &Wallet, scan_type: ScanType, ) -> Result, BuilderError>; + + /// Attempt to build the node with scripts from multiple [`Wallet`]s and following a [`ScanType`]. + fn build_with_wallets( + self, + wallets: Vec<(&Wallet, ScanType)>, + ) -> Result, BuilderError>; } impl BuilderExt for Builder { @@ -114,6 +123,72 @@ impl BuilderExt for Builder { ); Ok(client) } + + fn build_with_wallets( + mut self, + wallets: Vec<(&Wallet, ScanType)>, + ) -> Result, BuilderError> { + let network = wallets + .first() + .ok_or(BuilderError::EmptyIterator)? + .0 + .network(); + if self.network().ne(&network) { + return Err(BuilderError::NetworkMismatch); + } + let cp_min = wallets + .iter() + .map(|(wallet, scan_type)| match scan_type { + ScanType::Sync => walk_back_max_reorg(wallet.latest_checkpoint()), + ScanType::Recovery { + used_script_index: _, + checkpoint, + } => *checkpoint, + }) + .min() + .ok_or(BuilderError::EmptyIterator)?; + self = self.chain_state(ChainState::Checkpoint(cp_min)); + let (node, client) = self.build(); + let bip157::Client { + requester, + info_rx, + warn_rx, + event_rx, + } = client; + let wallet_iter = wallets + .into_iter() + .map(|(wallet, scan_type)| { + ( + wallet + .public_descriptor(KeychainKind::External) + .descriptor_id(), + scan_type, + wallet.latest_checkpoint(), + IndexedTxGraph::new(wallet.spk_index().clone()), + ) + }) + .collect::>, + )>>(); + let update_subscriber = UpdateSubscriber::::new_multiple( + requester.clone(), + event_rx, + wallet_iter.into_iter(), + ); + let client = LightClient::new( + requester, + LoggingSubscribers { + info_subscriber: info_rx, + warning_subscriber: warn_rx, + }, + update_subscriber, + node, + ); + Ok(client) + } } /// Walk back 7 blocks in case the last sync was an orphan block. @@ -134,6 +209,8 @@ fn walk_back_max_reorg(checkpoint: CheckPoint) -> HeaderCheckpoint { pub enum BuilderError { /// The wallet network and node network do not match. NetworkMismatch, + /// The wallet iterator is empty. + EmptyIterator, } impl Display for BuilderError { @@ -142,6 +219,7 @@ impl Display for BuilderError { BuilderError::NetworkMismatch => { write!(f, "wallet network and node network do not match") } + BuilderError::EmptyIterator => write!(f, "empty wallet iterator."), } } } diff --git a/src/lib.rs b/src/lib.rs index bdaed4b..62aa3d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -381,7 +381,7 @@ impl UpdateSubscriber { } } -impl UpdateSubscriber { +impl UpdateSubscriber { /// Return a set of [`Update`] for the configured wallets when synced to the network's tip. The /// [`Update`] are grouped with the [`DescriptorId`] of the external descriptor for each /// wallet. From 4027e3cab7e17a3b85a20d410ecd5a6cbabebd7d Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Sat, 7 Mar 2026 08:28:03 +0000 Subject: [PATCH 4/5] example: Add `multi` to sync two wallets Demonstrates the new API for users --- examples/multi.rs | 107 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 examples/multi.rs diff --git a/examples/multi.rs b/examples/multi.rs new file mode 100644 index 0000000..6c8dd77 --- /dev/null +++ b/examples/multi.rs @@ -0,0 +1,107 @@ +use bdk_kyoto::builder::{Builder, BuilderExt}; +use bdk_kyoto::{Info, Receiver, ScanType, UnboundedReceiver, Warning}; +use bdk_wallet::bitcoin::Network; +use bdk_wallet::chain::DescriptorExt; +use bdk_wallet::{KeychainKind, Wallet}; +use tokio::select; + +const RECV_ONE: &str = "wpkh([9122d9e0/84'/1'/0']tpubDCYVtmaSaDzTxcgvoP5AHZNbZKZzrvoNH9KARep88vESc6MxRqAp4LmePc2eeGX6XUxBcdhAmkthWTDqygPz2wLAyHWisD299Lkdrj5egY6/0/*)"; +const CHANGE_ONE: &str = "wpkh([9122d9e0/84'/1'/0']tpubDCYVtmaSaDzTxcgvoP5AHZNbZKZzrvoNH9KARep88vESc6MxRqAp4LmePc2eeGX6XUxBcdhAmkthWTDqygPz2wLAyHWisD299Lkdrj5egY6/1/*)"; +const RECV_TWO: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; +const CHANGE_TWO: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; +const NETWORK: Network = Network::Signet; + +/* Sync multiple BDK wallets */ + +async fn traces( + mut info_subscriber: Receiver, + mut warning_subscriber: UnboundedReceiver, +) { + loop { + select! { + info = info_subscriber.recv() => { + if let Some(info) = info { + match info { + Info::Progress(p) => { + tracing::info!("chain height: {}, filter download progress: {}%", p.chain_height(), p.percentage_complete()); + }, + Info::BlockReceived(b) => { + tracing::info!("downloaded block: {b}"); + }, + _ => (), + } + } + } + warn = warning_subscriber.recv() => { + if let Some(warn) = warn { + tracing::warn!("{warn}") + } + } + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber)?; + + let mut wallet_one = Wallet::create(RECV_ONE, CHANGE_ONE) + .network(NETWORK) + .create_wallet_no_persist()?; + + let mut wallet_two = Wallet::create(RECV_TWO, CHANGE_TWO) + .network(NETWORK) + .create_wallet_no_persist()?; + + // Build a request to sync for each wallet and place them in a vector. + let wallet_iter = vec![(&wallet_one, ScanType::Sync), (&wallet_two, ScanType::Sync)]; + + // Now build a client that will sync both wallets simultaneously + let client = Builder::new(NETWORK) + .build_with_wallets(wallet_iter) + .unwrap(); + let (client, logging, mut update_subscriber) = client.subscribe(); + tokio::task::spawn( + async move { traces(logging.info_subscriber, logging.warning_subscriber).await }, + ); + let client = client.start(); + let requester = client.requester(); + + // Sync and apply updates. We can do this in a continual loop while the "application" is running. + // Often this would occur on a separate thread than the underlying application user interface. + loop { + // Updates are grouped with the `DescriptorId` of the public, external descriptor. + let updates = update_subscriber.updates().await?; + for (desc_id, update) in updates { + if wallet_one + .public_descriptor(KeychainKind::External) + .descriptor_id() + .eq(&desc_id) + { + wallet_one.apply_update(update)?; + tracing::info!("Wallet one summary: "); + tracing::info!("Balance: {:#}", wallet_one.balance().total()); + tracing::info!( + "Local chain tip: {}", + wallet_one.local_chain().tip().height() + ); + } else if wallet_two + .public_descriptor(KeychainKind::External) + .descriptor_id() + .eq(&desc_id) + { + wallet_two.apply_update(update)?; + tracing::info!("Wallet two summary: "); + tracing::info!("Balance: {:#}", wallet_two.balance().total()); + tracing::info!( + "Local chain tip: {}", + wallet_two.local_chain().tip().height() + ); + } + } + let fee_filter = requester.broadcast_min_feerate().await.unwrap(); + tracing::info!("Broadcast minimum fee rate: {:#}", fee_filter); + tracing::info!("Press CTRL + C to exit."); + } +} From e2469098fb1318e382199f4097e73c0bca857279 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Sat, 7 Mar 2026 08:46:54 +0000 Subject: [PATCH 5/5] test: Add multiple wallet syncing This test takes the first wallet to a stale block, then shuts down the node. A new wallet is introduced, and we assert that the first wallet still detects and applies the reorganization. --- tests/client.rs | 93 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/client.rs b/tests/client.rs index c3b469c..fbf3a44 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,5 +1,7 @@ // #![allow(unused)] use bdk_kyoto::{state::Idle, wallets::Single}; +use bdk_wallet::chain::{DescriptorExt, DescriptorId}; +use std::collections::BTreeMap; use std::net::IpAddr; use std::path::PathBuf; use std::time::Duration; @@ -17,6 +19,8 @@ use bdk_wallet::Update; const EXTERNAL_DESCRIPTOR: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; const INTERNAL_DESCRIPTOR: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; +const RECV_TWO: &str = "wpkh([9122d9e0/84'/1'/0']tpubDCYVtmaSaDzTxcgvoP5AHZNbZKZzrvoNH9KARep88vESc6MxRqAp4LmePc2eeGX6XUxBcdhAmkthWTDqygPz2wLAyHWisD299Lkdrj5egY6/0/*)"; +const CHANGE_TWO: &str = "wpkh([9122d9e0/84'/1'/0']tpubDCYVtmaSaDzTxcgvoP5AHZNbZKZzrvoNH9KARep88vESc6MxRqAp4LmePc2eeGX6XUxBcdhAmkthWTDqygPz2wLAyHWisD299Lkdrj5egY6/1/*)"; fn testenv() -> anyhow::Result { use bdk_testenv::Config; @@ -239,3 +243,92 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +async fn two_wallets_can_update() -> anyhow::Result<()> { + let env = testenv()?; + + let mut wallet = CreateParams::new(EXTERNAL_DESCRIPTOR, INTERNAL_DESCRIPTOR) + .network(Network::Regtest) + .create_wallet_no_persist()?; + let addr = wallet.peek_address(KeychainKind::External, 0).address; + + let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); + let client = init_node(&env, &wallet, tempdir.clone())?; + let (client, _, mut update_subscriber) = client.subscribe(); + let client = client.start(); + let requester = client.requester(); + + // mine blocks + let miner = env + .rpc_client() + .get_new_address(None, None)? + .assume_checked(); + let _hashes = env.mine_blocks(100, Some(miner.clone()))?; + wait_for_height(&env, 101).await?; + + // send tx + let amt = Amount::from_btc(0.21)?; + let txid = env.send(&addr, amt)?; + let hashes = env.mine_blocks(1, Some(miner.clone()))?; + let blockhash = hashes[0]; + wait_for_height(&env, 102).await?; + + // get update + let res = update_subscriber.update().await?; + let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); + assert_eq!(anchor.block_id.hash, blockhash); + assert_eq!(anchor_txid, txid); + wallet.apply_update(res).unwrap(); + + // shut down then reorg + requester.shutdown()?; + + let hashes = env.reorg(1)?; // 102 + let new_blockhash = hashes[0]; + _ = env.mine_blocks(20, Some(miner))?; // 122 + wait_for_height(&env, 122).await?; + + // add a new wallet to the sync request + let wallet_two = CreateParams::new(RECV_TWO, CHANGE_TWO) + .network(Network::Regtest) + .create_wallet_no_persist()?; + let peer = env.bitcoind.params.p2p_socket.unwrap(); + let ip: IpAddr = (*peer.ip()).into(); + let port = peer.port(); + let mut peer = TrustedPeer::from_ip(ip); + peer.port = Some(port); + let client = Builder::new(Network::Regtest) + .add_peer(peer) + .data_dir(tempdir) + .required_peers(1) + .build_with_wallets(vec![ + (&wallet, ScanType::Sync), + (&wallet_two, ScanType::Sync), + ])?; + let (client, _, mut update_subscriber) = client.subscribe(); + let client = client.start(); + let requester = client.requester(); + + // expect tx to confirm at same height but different blockhash + let results = update_subscriber.updates().await?; + let res = results + .collect::>() + .get( + &wallet + .public_descriptor(KeychainKind::External) + .descriptor_id(), + ) + .unwrap() + .clone(); + let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); + assert_eq!(anchor_txid, txid); + assert_eq!(anchor.block_id.height, 102); + assert_ne!(anchor.block_id.hash, blockhash); + assert_eq!(anchor.block_id.hash, new_blockhash); + wallet.apply_update(res).unwrap(); + + requester.shutdown()?; + + Ok(()) +}