From ea6923e2c075416882304c263b29be99ac24f824 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Fri, 5 Jan 2024 09:48:10 +0100 Subject: [PATCH 1/7] poller: make the updating process into its own function. --- src/bitcoin/poller/looper.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index cdff3920c..a2381a617 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -325,6 +325,17 @@ fn sync_poll_interval() -> time::Duration { time::Duration::from_secs(0) } +/// Update our state from the Bitcoin backend. +pub fn poll( + bit: &sync::Arc>, + db: &sync::Arc>, + secp: &secp256k1::Secp256k1, + descs: &[descriptors::SinglePathLianaDesc], +) { + updates(bit, db, descs, secp); + rescan_check(bit, db, descs, secp); +} + /// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the /// `shutdown` atomic. pub fn looper( @@ -378,7 +389,6 @@ pub fn looper( } } - updates(&bit, &db, &descs, &secp); - rescan_check(&bit, &db, &descs, &secp); + poll(&bit, &db, &secp, &descs); } } From fd5387f954948303cf8403925062f577ae7bc49e Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Fri, 5 Jan 2024 09:56:52 +0100 Subject: [PATCH 2/7] poller: use the same database connection across one update round --- src/bitcoin/poller/looper.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index a2381a617..bdd7bb9f0 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -208,13 +208,11 @@ fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> TipUpdat } fn updates( + db_conn: &mut Box, bit: &impl BitcoinInterface, - db: &impl DatabaseInterface, descs: &[descriptors::SinglePathLianaDesc], secp: &secp256k1::Secp256k1, ) { - let mut db_conn = db.connection(); - // Check if there was a new block before updating ourselves. let current_tip = db_conn.chain_tip().expect("Always set at first startup"); let latest_tip = match new_tip(bit, ¤t_tip) { @@ -225,18 +223,18 @@ fn updates( // between our former chain and the new one, then restart fresh. db_conn.rollback_tip(&new_tip); log::info!("Tip was rolled back to '{}'.", new_tip); - return updates(bit, db, descs, secp); + return updates(db_conn, bit, descs, secp); } }; // Then check the state of our coins. Do it even if the tip did not change since last poll, as // we may have unconfirmed transactions. - let updated_coins = update_coins(bit, &mut db_conn, ¤t_tip, descs, secp); + let updated_coins = update_coins(bit, db_conn, ¤t_tip, descs, secp); // If the tip changed while we were polling our Bitcoin interface, start over. if bit.chain_tip() != latest_tip { log::info!("Chain tip changed while we were updating our state. Starting over."); - return updates(bit, db, descs, secp); + return updates(db_conn, bit, descs, secp); } // The chain tip did not change since we started our updates. Record them and the latest tip. @@ -258,13 +256,12 @@ fn updates( // Check if there is any rescan of the backend ongoing or one that just finished. fn rescan_check( + db_conn: &mut Box, bit: &impl BitcoinInterface, - db: &impl DatabaseInterface, descs: &[descriptors::SinglePathLianaDesc], secp: &secp256k1::Secp256k1, ) { log::debug!("Checking the state of an ongoing rescan if there is any"); - let mut db_conn = db.connection(); // Check if there is an ongoing rescan. If there isn't and we previously asked for a rescan of // the backend, we treat it as completed. @@ -299,7 +296,7 @@ fn rescan_check( "Rolling back our internal tip to '{}' to update our internal state with past transactions.", rescan_tip ); - updates(bit, db, descs, secp) + updates(db_conn, bit, descs, secp) } else { log::debug!("No ongoing rescan."); } @@ -332,8 +329,9 @@ pub fn poll( secp: &secp256k1::Secp256k1, descs: &[descriptors::SinglePathLianaDesc], ) { - updates(bit, db, descs, secp); - rescan_check(bit, db, descs, secp); + let mut db_conn = db.connection(); + updates(&mut db_conn, bit, descs, secp); + rescan_check(&mut db_conn, bit, descs, secp); } /// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the From b4fe963a5b2817ef07f3d96ddcc3c71f9a6606d3 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Thu, 14 Mar 2024 17:42:30 +0100 Subject: [PATCH 3/7] lib: encapsulate the handling of both threads (poller and RPC server) This is inspired from the work in https://github.com/wizardsardine/liana/pull/909 (specifically https://github.com/wizardsardine/liana/pull/909/commits/d8c59e30eda74df3e2fe5801207d806d4e4089ba) to externalize the management of the poller thread. However, there may be only one poller thread. Starting more than one can lead to a crash or potentially to data corruption. Therefore it feels safer to manage it internally. Instead of exposing the management of the poller to the user of the library, we manage both threads inside the `DaemonHandle` data structure and expose a way for a user to check for errors which may have occured in any of the threads. This makes it possible to: 1. Eventually propagate errors from the threads to the user of the daemon (https://github.com/wizardsardine/liana/pull/909); 2. Communicate internally with the poller thread, for instance to trigger a poll immediately (following commits). --- src/bin/daemon.rs | 11 +- src/bitcoin/poller/looper.rs | 69 +---------- src/bitcoin/poller/mod.rs | 107 ++++++++++++------ src/commands/mod.rs | 16 +-- src/jsonrpc/server.rs | 7 +- src/lib.rs | 214 ++++++++++++++++++++++------------- src/testutils.rs | 41 +++++-- 7 files changed, 258 insertions(+), 207 deletions(-) diff --git a/src/bin/daemon.rs b/src/bin/daemon.rs index 51e3c1347..3917dc115 100644 --- a/src/bin/daemon.rs +++ b/src/bin/daemon.rs @@ -70,13 +70,16 @@ fn main() { process::exit(1); }); - let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| { + let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| { log::error!("Error starting Liana daemon: {}", e); process::exit(1); }); - daemon - .rpc_server() - .expect("JSONRPC server must terminate cleanly"); + while handle.is_alive() { + thread::sleep(time::Duration::from_millis(500)); + } + if let Err(e) = handle.stop() { + log::error!("Error stopping Liana daemon: {}", e); + } // We are always logging to stdout, should it be then piped to the log file (if self) or // not. So just make sure that all messages were actually written. diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index bdd7bb9f0..93c85749d 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -4,11 +4,7 @@ use crate::{ descriptors, }; -use std::{ - collections::HashSet, - sync::{self, atomic}, - thread, time, -}; +use std::{collections::HashSet, sync, time}; use miniscript::bitcoin::{self, secp256k1}; @@ -302,8 +298,8 @@ fn rescan_check( } } -// If the database chain tip is NULL (first startup), initialize it. -fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { +/// If the database chain tip is NULL (first startup), initialize it. +pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { let mut db_conn = db.connection(); if db_conn.chain_tip().is_none() { @@ -312,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface } } -fn sync_poll_interval() -> time::Duration { +pub fn sync_poll_interval() -> time::Duration { // TODO: be smarter, like in revaultd, but more generic too. #[cfg(not(test))] { @@ -333,60 +329,3 @@ pub fn poll( updates(&mut db_conn, bit, descs, secp); rescan_check(&mut db_conn, bit, descs, secp); } - -/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the -/// `shutdown` atomic. -pub fn looper( - bit: sync::Arc>, - db: sync::Arc>, - shutdown: sync::Arc, - poll_interval: time::Duration, - desc: descriptors::LianaDescriptor, -) { - let mut last_poll = None; - let mut synced = false; - let descs = [ - desc.receive_descriptor().clone(), - desc.change_descriptor().clone(), - ]; - let secp = secp256k1::Secp256k1::verification_only(); - - maybe_initialize_tip(&bit, &db); - - while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { - let now = time::Instant::now(); - - if let Some(last_poll) = last_poll { - let time_since_poll = now.duration_since(last_poll); - let poll_interval = if synced { - poll_interval - } else { - // Until we are synced we poll less often to avoid harassing bitcoind and impeding - // the sync. As a function since it's mocked for the tests. - sync_poll_interval() - }; - if time_since_poll < poll_interval { - thread::sleep(time::Duration::from_millis(500)); - continue; - } - } - last_poll = Some(now); - - // Don't poll until the Bitcoin backend is fully synced. - if !synced { - let progress = bit.sync_progress(); - log::info!( - "Block chain synchronization progress: {:.2}% ({} blocks / {} headers)", - progress.rounded_up_progress() * 100.0, - progress.blocks, - progress.headers - ); - synced = progress.is_complete(); - if !synced { - continue; - } - } - - poll(&bit, &db, &secp, &descs); - } -} diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 7731af2e1..1825e8a1c 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -1,60 +1,95 @@ mod looper; -use crate::{ - bitcoin::{poller::looper::looper, BitcoinInterface}, - database::DatabaseInterface, - descriptors, -}; +use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use std::{ sync::{self, atomic}, thread, time, }; +use miniscript::bitcoin::secp256k1; + /// The Bitcoin poller handler. pub struct Poller { - handle: thread::JoinHandle<()>, - shutdown: sync::Arc, + bit: sync::Arc>, + db: sync::Arc>, + secp: secp256k1::Secp256k1, + // The receive and change descriptors (in this order). + descs: [descriptors::SinglePathLianaDesc; 2], } impl Poller { - pub fn start( + pub fn new( bit: sync::Arc>, db: sync::Arc>, - poll_interval: time::Duration, desc: descriptors::LianaDescriptor, ) -> Poller { - let shutdown = sync::Arc::from(atomic::AtomicBool::from(false)); - let handle = thread::Builder::new() - .name("Bitcoin poller".to_string()) - .spawn({ - let shutdown = shutdown.clone(); - move || looper(bit, db, shutdown, poll_interval, desc) - }) - .expect("Must not fail"); - - Poller { shutdown, handle } - } + let secp = secp256k1::Secp256k1::verification_only(); + let descs = [ + desc.receive_descriptor().clone(), + desc.change_descriptor().clone(), + ]; - pub fn trigger_stop(&self) { - self.shutdown.store(true, atomic::Ordering::Relaxed); - } + // On first startup the tip may be NULL. Make sure it's set as the poller relies on it. + looper::maybe_initialize_tip(&bit, &db); - pub fn stop(self) { - self.trigger_stop(); - self.handle.join().expect("The poller loop must not fail"); + Poller { + bit, + db, + secp, + descs, + } } - #[cfg(feature = "nonblocking_shutdown")] - pub fn is_stopped(&self) -> bool { - // Doc says "This might return true for a brief moment after the thread’s main function has - // returned, but before the thread itself has stopped running.". But it's not an issue for - // us, as long as the main poller function has returned we are good. - self.handle.is_finished() - } + /// Continuously update our state from the Bitcoin backend. + /// - `poll_interval`: how frequently to perform an update. + /// - `shutdown`: set to true to stop continuously updating and make this function return. + /// + /// Typically this would run for the whole duration of the program in a thread, and the main + /// thread would set the `shutdown` atomic to `true` when shutting down. + pub fn poll_forever( + &self, + poll_interval: time::Duration, + shutdown: sync::Arc, + ) { + let mut last_poll = None; + let mut synced = false; + + while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { + let now = time::Instant::now(); + + if let Some(last_poll) = last_poll { + let time_since_poll = now.duration_since(last_poll); + let poll_interval = if synced { + poll_interval + } else { + // Until we are synced we poll less often to avoid harassing bitcoind and impeding + // the sync. As a function since it's mocked for the tests. + looper::sync_poll_interval() + }; + if time_since_poll < poll_interval { + thread::sleep(time::Duration::from_millis(500)); + continue; + } + } + last_poll = Some(now); + + // Don't poll until the Bitcoin backend is fully synced. + if !synced { + let progress = self.bit.sync_progress(); + log::info!( + "Block chain synchronization progress: {:.2}% ({} blocks / {} headers)", + progress.rounded_up_progress() * 100.0, + progress.blocks, + progress.headers + ); + synced = progress.is_complete(); + if !synced { + continue; + } + } - #[cfg(test)] - pub fn test_stop(&mut self) { - self.shutdown.store(true, atomic::Ordering::Relaxed); + looper::poll(&self.bit, &self.db, &self.secp, &self.descs); + } } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 6cb372d73..25af70407 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1251,7 +1251,7 @@ mod tests { fn getinfo() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); // We can query getinfo - ms.handle.control.get_info(); + ms.control().get_info(); ms.shutdown(); } @@ -1259,7 +1259,7 @@ mod tests { fn getnewaddress() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); // We can get an address let addr = control.get_new_address().address; assert_eq!( @@ -1281,7 +1281,7 @@ mod tests { fn listaddresses() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let list = control.list_addresses(Some(2), Some(5)).unwrap(); @@ -1412,7 +1412,7 @@ mod tests { ), ); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); // Arguments sanity checking let dummy_addr = @@ -1900,7 +1900,7 @@ mod tests { .insert(dummy_op_a.txid, (dummy_tx.clone(), None)); dummy_bitcoind.txs.insert(dummy_op_b.txid, (dummy_tx, None)); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let mut db_conn = control.db().lock().unwrap().connection(); // Add two (unconfirmed) coins in DB @@ -2046,7 +2046,7 @@ mod tests { let dummy_txid_a = dummy_psbt_a.unsigned_tx.txid(); dummy_bitcoind.txs.insert(dummy_txid_a, (dummy_tx_a, None)); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let mut db_conn = control.db().lock().unwrap().connection(); // The spend needs to be in DB before using RBF. assert_eq!( @@ -2284,7 +2284,7 @@ mod tests { let ms = DummyLiana::new(btc, db); - let control = &ms.handle.control; + let control = &ms.control(); let transactions = control.list_confirmed_transactions(0, 4, 10).transactions; assert_eq!(transactions.len(), 4); @@ -2416,7 +2416,7 @@ mod tests { let ms = DummyLiana::new(btc, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let transactions = control.list_transactions(&[tx1.txid()]).transactions; assert_eq!(transactions.len(), 1); diff --git a/src/jsonrpc/server.rs b/src/jsonrpc/server.rs index dc6f7ca1c..424adc2a0 100644 --- a/src/jsonrpc/server.rs +++ b/src/jsonrpc/server.rs @@ -122,11 +122,11 @@ fn connection_handler( pub fn rpcserver_loop( listener: net::UnixListener, daemon_control: DaemonControl, + shutdown: sync::Arc, ) -> Result<(), io::Error> { // Keep it simple. We don't need great performances so just treat each connection in // its thread, with a given maximum number of connections. let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0)); - let shutdown = sync::Arc::from(atomic::AtomicBool::new(false)); listener.set_nonblocking(true)?; while !shutdown.load(atomic::Ordering::Relaxed) { @@ -400,7 +400,7 @@ mod tests { #[cfg(not(target_os = "macos"))] #[test] fn server_sanity_check() { - let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); + let ms = DummyLiana::new_server(DummyBitcoind::new(), DummyDatabase::new()); let socket_path: path::PathBuf = [ ms.tmp_dir.as_path(), path::Path::new("d"), @@ -410,7 +410,6 @@ mod tests { .iter() .collect(); - let t = thread::spawn(move || ms.rpc_server().unwrap()); while !socket_path.exists() { thread::sleep(time::Duration::from_millis(100)); } @@ -426,6 +425,6 @@ mod tests { &[&serde_json::to_vec(&stop_req).unwrap(), b"\n"], ); - t.join().unwrap(); + ms.shutdown(); } } diff --git a/src/lib.rs b/src/lib.rs index b608b24e7..163c14be0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,7 @@ use crate::{ }, }; -use std::{error, fmt, fs, io, path, sync}; +use std::{error, fmt, fs, io, path, sync, thread}; use miniscript::bitcoin::secp256k1; @@ -261,7 +261,7 @@ pub struct DaemonControl { } impl DaemonControl { - pub fn new( + pub(crate) fn new( config: Config, bitcoin: sync::Arc>, db: sync::Arc>, @@ -282,13 +282,28 @@ impl DaemonControl { } } -pub struct DaemonHandle { - pub control: DaemonControl, - bitcoin_poller: poller::Poller, +/// The handle to a Liana daemon. It might either be the handle for a daemon which exposes a +/// JSONRPC server or one which exposes its API through a `DaemonControl`. +pub enum DaemonHandle { + Controller { + poller_shutdown: sync::Arc, + poller_handle: thread::JoinHandle<()>, + control: DaemonControl, + }, + Server { + poller_shutdown: sync::Arc, + poller_handle: thread::JoinHandle<()>, + rpcserver_shutdown: sync::Arc, + rpcserver_handle: thread::JoinHandle>, + }, } impl DaemonHandle { - /// This starts the Liana daemon. Call `shutdown` to shut it down. + /// This starts the Liana daemon. A user of this interface should regularly poll the `is_alive` + /// method to check for internal errors. To shut down the daemon use the `stop` method. + /// + /// The `with_rpc_server` controls whether we should start a JSONRPC server to receive queries + /// or instead return a `DaemonControl` object for a caller to access the daemon's API. /// /// You may specify a custom Bitcoin interface through the `bitcoin` parameter. If `None`, the /// default Bitcoin interface (`bitcoind` JSONRPC) will be used. @@ -301,6 +316,7 @@ impl DaemonHandle { config: Config, bitcoin: Option, db: Option, + with_rpc_server: bool, ) -> Result { #[cfg(not(test))] setup_panic_hook(); @@ -353,82 +369,119 @@ impl DaemonHandle { } } - // Spawn the bitcoind poller with a retry limit high enough that we'd fail after that. - let bitcoin_poller = poller::Poller::start( - bit.clone(), - db.clone(), - config.bitcoin_config.poll_interval_secs, - config.main_descriptor.clone(), - ); - - // Finally, set up the API. + // Start the poller thread. Keep the thread handle to be able to check if it crashed. Store + // an atomic to be able to stop it. + let bitcoin_poller = + poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); + let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let poller_handle = thread::Builder::new() + .name("Bitcoin Network poller".to_string()) + .spawn({ + let poll_interval = config.bitcoin_config.poll_interval_secs; + let shutdown = poller_shutdown.clone(); + move || { + log::info!("Bitcoin poller started."); + bitcoin_poller.poll_forever(poll_interval, shutdown); + log::info!("Bitcoin poller stopped."); + } + }) + .expect("Spawning the poller thread must never fail."); + + // Create the API the external world will use to talk to us, either directly through the Rust + // structure or through the JSONRPC server we may setup below. let control = DaemonControl::new(config, bit, db, secp); - Ok(Self { - control, - bitcoin_poller, + Ok(if with_rpc_server { + let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let rpcserver_handle = thread::Builder::new() + .name("Bitcoin Network poller".to_string()) + .spawn({ + let shutdown = rpcserver_shutdown.clone(); + move || { + let mut rpc_socket = data_dir; + rpc_socket.push("lianad_rpc"); + let listener = rpcserver_setup(&rpc_socket)?; + log::info!("JSONRPC server started."); + + rpcserver_loop(listener, control, shutdown)?; + log::info!("JSONRPC server stopped."); + Ok(()) + } + }) + .expect("Spawning the RPC server thread should never fail."); + + DaemonHandle::Server { + poller_shutdown, + poller_handle, + rpcserver_shutdown, + rpcserver_handle, + } + } else { + DaemonHandle::Controller { + poller_shutdown, + poller_handle, + control, + } }) } /// Start the Liana daemon with the default Bitcoin and database interfaces (`bitcoind` RPC /// and SQLite). - pub fn start_default(config: Config) -> Result { - DaemonHandle::start(config, Option::::None, Option::::None) - } - - /// Start the JSONRPC server and listen for incoming commands until we die. - /// Like DaemonHandle::shutdown(), this stops the Bitcoin poller at teardown. - #[cfg(feature = "daemon")] - pub fn rpc_server(self) -> Result<(), io::Error> { - let DaemonHandle { - control, - bitcoin_poller: poller, - } = self; - - let rpc_socket: path::PathBuf = [ - control - .config - .data_dir() - .expect("Didn't fail at startup, must not now") - .as_path(), - path::Path::new(&control.config.bitcoin_config.network.to_string()), - path::Path::new("lianad_rpc"), - ] - .iter() - .collect(); - let listener = rpcserver_setup(&rpc_socket)?; - log::info!("JSONRPC server started."); - - rpcserver_loop(listener, control)?; - log::info!("JSONRPC server stopped."); - - poller.stop(); - - Ok(()) - } - - /// Shut down the Liana daemon. - pub fn shutdown(self) { - self.bitcoin_poller.stop(); - } - - /// Tell the daemon to shut down. This will return before the shutdown completes. The structure - /// must not be reused after triggering shutdown. - #[cfg(feature = "nonblocking_shutdown")] - pub fn trigger_shutdown(&self) { - self.bitcoin_poller.trigger_stop() + pub fn start_default( + config: Config, + with_rpc_server: bool, + ) -> Result { + Self::start( + config, + Option::::None, + Option::::None, + with_rpc_server, + ) } - /// Whether the daemon has finished shutting down. - #[cfg(feature = "nonblocking_shutdown")] - pub fn shutdown_complete(&self) -> bool { - self.bitcoin_poller.is_stopped() + /// Check whether the daemon is still up and running. This needs to be regularly polled to + /// check for internal errors. If this returns `false`, collect the error using the `stop` + /// method. + pub fn is_alive(&self) -> bool { + match self { + Self::Controller { + ref poller_handle, .. + } => !poller_handle.is_finished(), + Self::Server { + ref poller_handle, + ref rpcserver_handle, + .. + } => !poller_handle.is_finished() && !rpcserver_handle.is_finished(), + } } - // We need a shutdown utility that does not move for implementing Drop for the DummyLiana - #[cfg(test)] - pub fn test_shutdown(&mut self) { - self.bitcoin_poller.test_stop(); + /// Stop the Liana daemon. This returns any error which may have occurred. + pub fn stop(self) -> Result<(), Box> { + match self { + Self::Controller { + poller_shutdown, + poller_handle, + .. + } => { + poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_handle.join().expect("Poller thread must not panic"); + Ok(()) + } + Self::Server { + poller_shutdown, + poller_handle, + rpcserver_shutdown, + rpcserver_handle, + } => { + poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); + rpcserver_handle + .join() + .expect("Poller thread must not panic")?; + poller_handle.join().expect("Poller thread must not panic"); + Ok(()) + } + } } } @@ -681,11 +734,11 @@ mod tests { }; // Start the daemon in a new thread so the current one acts as the bitcoind server. - let daemon_thread = thread::spawn({ + let t = thread::spawn({ let config = config.clone(); move || { - let handle = DaemonHandle::start_default(config).unwrap(); - handle.shutdown(); + let handle = DaemonHandle::start_default(config, false).unwrap(); + handle.stop().unwrap(); } }); complete_sanity_check(&server); @@ -697,12 +750,15 @@ mod tests { complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_tip_init(&server); complete_sync_check(&server); - daemon_thread.join().unwrap(); + t.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. - let daemon_thread = thread::spawn(move || { - let handle = DaemonHandle::start_default(config).unwrap(); - handle.shutdown(); + let t = thread::spawn({ + let config = config.clone(); + move || { + let handle = DaemonHandle::start_default(config, false).unwrap(); + handle.stop().unwrap(); + } }); complete_sanity_check(&server); complete_version_check(&server); @@ -711,7 +767,7 @@ mod tests { complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_sync_check(&server); - daemon_thread.join().unwrap(); + t.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap(); } diff --git a/src/testutils.rs b/src/testutils.rs index 9e2adfc06..35ff27d49 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -2,13 +2,13 @@ use crate::{ bitcoin::{BitcoinInterface, Block, BlockChainTip, MempoolEntry, SyncProgress, UTxO}, config::{BitcoinConfig, Config}, database::{BlockInfo, Coin, CoinStatus, DatabaseConnection, DatabaseInterface, LabelItem}, - descriptors, DaemonHandle, + descriptors, DaemonControl, DaemonHandle, }; use std::convert::TryInto; use std::{ collections::{HashMap, HashSet}, - env, fs, io, path, process, + env, fs, path, process, str::FromStr, sync, thread, time, time::{SystemTime, UNIX_EPOCH}, @@ -464,9 +464,10 @@ pub fn tmp_dir() -> path::PathBuf { impl DummyLiana { /// Creates a new DummyLiana interface - pub fn new( + pub fn _new( bitcoin_interface: impl BitcoinInterface + 'static, database: impl DatabaseInterface + 'static, + rpc_server: bool, ) -> DummyLiana { let tmp_dir = tmp_dir(); fs::create_dir_all(&tmp_dir).unwrap(); @@ -497,19 +498,37 @@ impl DummyLiana { main_descriptor: desc, }; - let handle = DaemonHandle::start(config, Some(bitcoin_interface), Some(database)).unwrap(); + let handle = + DaemonHandle::start(config, Some(bitcoin_interface), Some(database), rpc_server) + .unwrap(); DummyLiana { tmp_dir, handle } } - #[cfg(feature = "daemon")] - pub fn rpc_server(self) -> Result<(), io::Error> { - self.handle.rpc_server()?; - fs::remove_dir_all(&self.tmp_dir)?; - Ok(()) + /// Creates a new DummyLiana interface + pub fn new( + bitcoin_interface: impl BitcoinInterface + 'static, + database: impl DatabaseInterface + 'static, + ) -> DummyLiana { + Self::_new(bitcoin_interface, database, false) + } + + /// Creates a new DummyLiana interface which also spins up an RPC server. + pub fn new_server( + bitcoin_interface: impl BitcoinInterface + 'static, + database: impl DatabaseInterface + 'static, + ) -> DummyLiana { + Self::_new(bitcoin_interface, database, true) + } + + pub fn control(&self) -> &DaemonControl { + match self.handle { + DaemonHandle::Controller { ref control, .. } => control, + DaemonHandle::Server { .. } => unreachable!(), + } } pub fn shutdown(self) { - self.handle.shutdown(); - fs::remove_dir_all(&self.tmp_dir).unwrap(); + self.handle.stop().unwrap(); + fs::remove_dir_all(self.tmp_dir).unwrap(); } } From f6ce85cfd32f669f6eb061ed8b367f4182512b95 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Thu, 14 Mar 2024 17:53:16 +0100 Subject: [PATCH 4/7] lib: remove the panic hook. We now provide a way for a user of the daemon to poll for errors in the threads, so aborting the process on a thread panic shouldn't be necessary anymore. --- src/lib.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 163c14be0..4b57b1905 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,7 @@ use std::{error, fmt, fs, io, path, sync, thread}; use miniscript::bitcoin::secp256k1; #[cfg(not(test))] -use std::{panic, process}; +use std::panic; // A panic in any thread should stop the main thread, and print the panic. #[cfg(not(test))] fn setup_panic_hook() { @@ -60,8 +60,6 @@ fn setup_panic_hook() { info, bt ); - - process::exit(1); })); } @@ -309,9 +307,6 @@ impl DaemonHandle { /// default Bitcoin interface (`bitcoind` JSONRPC) will be used. /// You may specify a custom Database interface through the `db` parameter. If `None`, the /// default Database interface (SQLite) will be used. - /// - /// **Note**: we internally use threads, and set a panic hook. A downstream application must - /// not overwrite this panic hook. pub fn start( config: Config, bitcoin: Option, From 1cf42d9aeec34b184d93484ea25d9a843a45bc3f Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Fri, 15 Mar 2024 11:15:53 +0100 Subject: [PATCH 5/7] poller: introduce a communication channel with the poller thread We'll need to ask the poller thread another thing besides to shut down, so it's cleaner to start using proper messages. The mpsc channel in the std lib was buggy for awhile but since they merged crossbeam and are using this behind the hood now it should be fine starting with Rust 1.67. That's (slightly) higher than our MSRV but it's what we use for releases so that's reasonable. See https://github.com/rust-lang/rust/issues/39364 for details. --- src/bitcoin/poller/mod.rs | 48 +++++++++++++++++++++++++++----------- src/lib.rs | 49 ++++++++++++++++++--------------------- 2 files changed, 57 insertions(+), 40 deletions(-) diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 1825e8a1c..e760f211b 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -3,12 +3,17 @@ mod looper; use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use std::{ - sync::{self, atomic}, - thread, time, + sync::{self, mpsc}, + time, }; use miniscript::bitcoin::secp256k1; +#[derive(Debug, Clone)] +pub enum PollerMessage { + Shutdown, +} + /// The Bitcoin poller handler. pub struct Poller { bit: sync::Arc>, @@ -50,29 +55,44 @@ impl Poller { pub fn poll_forever( &self, poll_interval: time::Duration, - shutdown: sync::Arc, + receiver: mpsc::Receiver, ) { let mut last_poll = None; let mut synced = false; - while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { - let now = time::Instant::now(); - - if let Some(last_poll) = last_poll { - let time_since_poll = now.duration_since(last_poll); + loop { + // How long to wait before the next poll. + let time_before_poll = if let Some(last_poll) = last_poll { + let time_since_poll = time::Instant::now().duration_since(last_poll); + // Until we are synced we poll less often to avoid harassing bitcoind and impeding + // the sync. As a function since it's mocked for the tests. let poll_interval = if synced { poll_interval } else { - // Until we are synced we poll less often to avoid harassing bitcoind and impeding - // the sync. As a function since it's mocked for the tests. looper::sync_poll_interval() }; - if time_since_poll < poll_interval { - thread::sleep(time::Duration::from_millis(500)); - continue; + poll_interval.saturating_sub(time_since_poll) + } else { + // Don't wait before doing the first poll. + time::Duration::ZERO + }; + + // Wait for the duration of the interval between polls, but listen to messages in the + // meantime. + match receiver.recv_timeout(time_before_poll) { + Ok(PollerMessage::Shutdown) => { + log::info!("Bitcoin poller was told to shut down."); + return; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // It's been long enough since the last poll. + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + log::error!("Bitcoin poller communication channel got disconnected. Exiting."); + return; } } - last_poll = Some(now); + last_poll = Some(time::Instant::now()); // Don't poll until the Bitcoin backend is fully synced. if !synced { diff --git a/src/lib.rs b/src/lib.rs index 4b57b1905..d3a5222b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,11 @@ use crate::{ }, }; -use std::{error, fmt, fs, io, path, sync, thread}; +use std::{ + error, fmt, fs, io, path, + sync::{self, mpsc}, + thread, +}; use miniscript::bitcoin::secp256k1; @@ -284,12 +288,12 @@ impl DaemonControl { /// JSONRPC server or one which exposes its API through a `DaemonControl`. pub enum DaemonHandle { Controller { - poller_shutdown: sync::Arc, + poller_sender: mpsc::SyncSender, poller_handle: thread::JoinHandle<()>, control: DaemonControl, }, Server { - poller_shutdown: sync::Arc, + poller_sender: mpsc::SyncSender, poller_handle: thread::JoinHandle<()>, rpcserver_shutdown: sync::Arc, rpcserver_handle: thread::JoinHandle>, @@ -368,15 +372,14 @@ impl DaemonHandle { // an atomic to be able to stop it. let bitcoin_poller = poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); - let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let (poller_sender, poller_receiver) = mpsc::sync_channel(0); let poller_handle = thread::Builder::new() .name("Bitcoin Network poller".to_string()) .spawn({ let poll_interval = config.bitcoin_config.poll_interval_secs; - let shutdown = poller_shutdown.clone(); move || { log::info!("Bitcoin poller started."); - bitcoin_poller.poll_forever(poll_interval, shutdown); + bitcoin_poller.poll_forever(poll_interval, poller_receiver); log::info!("Bitcoin poller stopped."); } }) @@ -406,14 +409,14 @@ impl DaemonHandle { .expect("Spawning the RPC server thread should never fail."); DaemonHandle::Server { - poller_shutdown, + poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, } } else { DaemonHandle::Controller { - poller_shutdown, + poller_sender, poller_handle, control, } @@ -454,21 +457,25 @@ impl DaemonHandle { pub fn stop(self) -> Result<(), Box> { match self { Self::Controller { - poller_shutdown, + poller_sender, poller_handle, .. } => { - poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); poller_handle.join().expect("Poller thread must not panic"); Ok(()) } Self::Server { - poller_shutdown, + poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, } => { - poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); rpcserver_handle .join() @@ -656,18 +663,6 @@ mod tests { stream.flush().unwrap(); } - // Send them a response to 'getblockchaininfo' saying we are far from being synced - fn complete_sync_check(server: &net::TcpListener) { - let net_resp = [ - "HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(), - ] - .concat(); - let (mut stream, _) = server.accept().unwrap(); - read_til_json_end(&mut stream); - stream.write_all(&net_resp).unwrap(); - stream.flush().unwrap(); - } - // TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the // bitcoind interface, and use the DummyLiana from testutils to sanity check the startup. // Note that startup as checked by this unit test is also tested in the functional test @@ -744,7 +739,8 @@ mod tests { complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_tip_init(&server); - complete_sync_check(&server); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. t.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. @@ -761,7 +757,8 @@ mod tests { complete_wallet_loading(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); - complete_sync_check(&server); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. t.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap(); From b7fde6a9e433afed1f23cabbab59093d953b2d54 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Fri, 15 Mar 2024 11:29:56 +0100 Subject: [PATCH 6/7] commands: update our state immediately after broadcasting a tx --- src/bitcoin/poller/mod.rs | 13 +++++++++++++ src/commands/mod.rs | 17 +++++++++++++++-- src/lib.rs | 5 ++++- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index e760f211b..8e9874e16 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -12,6 +12,9 @@ use miniscript::bitcoin::secp256k1; #[derive(Debug, Clone)] pub enum PollerMessage { Shutdown, + /// Ask the Bitcoin poller to poll immediately, get notified through the passed channel once + /// it's done. + PollNow(mpsc::SyncSender<()>), } /// The Bitcoin poller handler. @@ -84,6 +87,16 @@ impl Poller { log::info!("Bitcoin poller was told to shut down."); return; } + Ok(PollerMessage::PollNow(sender)) => { + // We've been asked to poll, don't wait any further and signal completion to + // the caller. + last_poll = Some(time::Instant::now()); + looper::poll(&self.bit, &self.db, &self.secp, &self.descs); + if let Err(e) = sender.send(()) { + log::error!("Error sending immediate poll completion signal: {}.", e); + } + continue; + } Err(mpsc::RecvTimeoutError::Timeout) => { // It's been long enough since the last poll. } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 25af70407..a77fb8d25 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -8,6 +8,7 @@ use crate::{ bitcoin::BitcoinInterface, database::{Coin, DatabaseConnection, DatabaseInterface}, descriptors, + poller::PollerMessage, spend::{ create_spend, AddrInfo, AncestorInfo, CandidateCoin, CreateSpendRes, SpendCreationError, SpendOutputAddress, SpendTxFees, TxGetter, @@ -24,7 +25,8 @@ use utils::{ use std::{ collections::{hash_map, HashMap, HashSet}, - fmt, sync, + fmt, + sync::{self, mpsc}, }; use miniscript::{ @@ -688,7 +690,18 @@ impl DaemonControl { let final_tx = spend_psbt.extract_tx_unchecked_fee_rate(); self.bitcoin .broadcast_tx(&final_tx) - .map_err(CommandError::TxBroadcast) + .map_err(CommandError::TxBroadcast)?; + + // Finally, update our state with the changes from this transaction. + let (tx, rx) = mpsc::sync_channel(0); + if let Err(e) = self.poller_sender.send(PollerMessage::PollNow(tx)) { + log::error!("Error requesting update from poller: {}", e); + } + if let Err(e) = rx.recv() { + log::error!("Error receiving completion signal from poller: {}", e); + } + + Ok(()) } /// Create PSBT to replace the given transaction using RBF. diff --git a/src/lib.rs b/src/lib.rs index d3a5222b1..0381f9f97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -257,6 +257,7 @@ fn setup_bitcoind( pub struct DaemonControl { config: Config, bitcoin: sync::Arc>, + poller_sender: mpsc::SyncSender, // FIXME: Should we require Sync on DatabaseInterface rather than using a Mutex? db: sync::Arc>, secp: secp256k1::Secp256k1, @@ -266,12 +267,14 @@ impl DaemonControl { pub(crate) fn new( config: Config, bitcoin: sync::Arc>, + poller_sender: mpsc::SyncSender, db: sync::Arc>, secp: secp256k1::Secp256k1, ) -> DaemonControl { DaemonControl { config, bitcoin, + poller_sender, db, secp, } @@ -387,7 +390,7 @@ impl DaemonHandle { // Create the API the external world will use to talk to us, either directly through the Rust // structure or through the JSONRPC server we may setup below. - let control = DaemonControl::new(config, bit, db, secp); + let control = DaemonControl::new(config, bit, poller_sender.clone(), db, secp); Ok(if with_rpc_server { let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); From 58c71c794a050a2df460aec241c90efe89cf9ae0 Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Sat, 16 Mar 2024 11:55:03 +0100 Subject: [PATCH 7/7] lib: gate the RPC server availability on the 'daemon' feature This is a temporary hack. We should improve this API. --- src/lib.rs | 41 ++++++++++++++++++++++++++++------------- src/testutils.rs | 13 ++++++++++--- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0381f9f97..19f02a63f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -295,6 +295,7 @@ pub enum DaemonHandle { poller_handle: thread::JoinHandle<()>, control: DaemonControl, }, + #[cfg(feature = "daemon")] Server { poller_sender: mpsc::SyncSender, poller_handle: thread::JoinHandle<()>, @@ -318,7 +319,7 @@ impl DaemonHandle { config: Config, bitcoin: Option, db: Option, - with_rpc_server: bool, + #[cfg(feature = "daemon")] with_rpc_server: bool, ) -> Result { #[cfg(not(test))] setup_panic_hook(); @@ -392,7 +393,8 @@ impl DaemonHandle { // structure or through the JSONRPC server we may setup below. let control = DaemonControl::new(config, bit, poller_sender.clone(), db, secp); - Ok(if with_rpc_server { + #[cfg(feature = "daemon")] + if with_rpc_server { let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); let rpcserver_handle = thread::Builder::new() .name("Bitcoin Network poller".to_string()) @@ -411,18 +413,18 @@ impl DaemonHandle { }) .expect("Spawning the RPC server thread should never fail."); - DaemonHandle::Server { + return Ok(DaemonHandle::Server { poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, - } - } else { - DaemonHandle::Controller { - poller_sender, - poller_handle, - control, - } + }); + } + + Ok(DaemonHandle::Controller { + poller_sender, + poller_handle, + control, }) } @@ -430,12 +432,13 @@ impl DaemonHandle { /// and SQLite). pub fn start_default( config: Config, - with_rpc_server: bool, + #[cfg(feature = "daemon")] with_rpc_server: bool, ) -> Result { Self::start( config, Option::::None, Option::::None, + #[cfg(feature = "daemon")] with_rpc_server, ) } @@ -448,6 +451,7 @@ impl DaemonHandle { Self::Controller { ref poller_handle, .. } => !poller_handle.is_finished(), + #[cfg(feature = "daemon")] Self::Server { ref poller_handle, ref rpcserver_handle, @@ -470,6 +474,7 @@ impl DaemonHandle { poller_handle.join().expect("Poller thread must not panic"); Ok(()) } + #[cfg(feature = "daemon")] Self::Server { poller_sender, poller_handle, @@ -730,7 +735,12 @@ mod tests { let t = thread::spawn({ let config = config.clone(); move || { - let handle = DaemonHandle::start_default(config, false).unwrap(); + let handle = DaemonHandle::start_default( + config, + #[cfg(feature = "daemon")] + false, + ) + .unwrap(); handle.stop().unwrap(); } }); @@ -750,7 +760,12 @@ mod tests { let t = thread::spawn({ let config = config.clone(); move || { - let handle = DaemonHandle::start_default(config, false).unwrap(); + let handle = DaemonHandle::start_default( + config, + #[cfg(feature = "daemon")] + false, + ) + .unwrap(); handle.stop().unwrap(); } }); diff --git a/src/testutils.rs b/src/testutils.rs index 35ff27d49..712d39069 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -498,9 +498,14 @@ impl DummyLiana { main_descriptor: desc, }; - let handle = - DaemonHandle::start(config, Some(bitcoin_interface), Some(database), rpc_server) - .unwrap(); + let handle = DaemonHandle::start( + config, + Some(bitcoin_interface), + Some(database), + #[cfg(feature = "daemon")] + rpc_server, + ) + .unwrap(); DummyLiana { tmp_dir, handle } } @@ -513,6 +518,7 @@ impl DummyLiana { } /// Creates a new DummyLiana interface which also spins up an RPC server. + #[cfg(feature = "daemon")] pub fn new_server( bitcoin_interface: impl BitcoinInterface + 'static, database: impl DatabaseInterface + 'static, @@ -523,6 +529,7 @@ impl DummyLiana { pub fn control(&self) -> &DaemonControl { match self.handle { DaemonHandle::Controller { ref control, .. } => control, + #[cfg(feature = "daemon")] DaemonHandle::Server { .. } => unreachable!(), } }