diff --git a/src/bin/daemon.rs b/src/bin/daemon.rs index 51e3c1347..3917dc115 100644 --- a/src/bin/daemon.rs +++ b/src/bin/daemon.rs @@ -70,13 +70,16 @@ fn main() { process::exit(1); }); - let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| { + let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| { log::error!("Error starting Liana daemon: {}", e); process::exit(1); }); - daemon - .rpc_server() - .expect("JSONRPC server must terminate cleanly"); + while handle.is_alive() { + thread::sleep(time::Duration::from_millis(500)); + } + if let Err(e) = handle.stop() { + log::error!("Error stopping Liana daemon: {}", e); + } // We are always logging to stdout, should it be then piped to the log file (if self) or // not. So just make sure that all messages were actually written. diff --git a/src/bitcoin/poller/looper.rs b/src/bitcoin/poller/looper.rs index cdff3920c..93c85749d 100644 --- a/src/bitcoin/poller/looper.rs +++ b/src/bitcoin/poller/looper.rs @@ -4,11 +4,7 @@ use crate::{ descriptors, }; -use std::{ - collections::HashSet, - sync::{self, atomic}, - thread, time, -}; +use std::{collections::HashSet, sync, time}; use miniscript::bitcoin::{self, secp256k1}; @@ -208,13 +204,11 @@ fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> TipUpdat } fn updates( + db_conn: &mut Box, bit: &impl BitcoinInterface, - db: &impl DatabaseInterface, descs: &[descriptors::SinglePathLianaDesc], secp: &secp256k1::Secp256k1, ) { - let mut db_conn = db.connection(); - // Check if there was a new block before updating ourselves. let current_tip = db_conn.chain_tip().expect("Always set at first startup"); let latest_tip = match new_tip(bit, ¤t_tip) { @@ -225,18 +219,18 @@ fn updates( // between our former chain and the new one, then restart fresh. db_conn.rollback_tip(&new_tip); log::info!("Tip was rolled back to '{}'.", new_tip); - return updates(bit, db, descs, secp); + return updates(db_conn, bit, descs, secp); } }; // Then check the state of our coins. Do it even if the tip did not change since last poll, as // we may have unconfirmed transactions. - let updated_coins = update_coins(bit, &mut db_conn, ¤t_tip, descs, secp); + let updated_coins = update_coins(bit, db_conn, ¤t_tip, descs, secp); // If the tip changed while we were polling our Bitcoin interface, start over. if bit.chain_tip() != latest_tip { log::info!("Chain tip changed while we were updating our state. Starting over."); - return updates(bit, db, descs, secp); + return updates(db_conn, bit, descs, secp); } // The chain tip did not change since we started our updates. Record them and the latest tip. @@ -258,13 +252,12 @@ fn updates( // Check if there is any rescan of the backend ongoing or one that just finished. fn rescan_check( + db_conn: &mut Box, bit: &impl BitcoinInterface, - db: &impl DatabaseInterface, descs: &[descriptors::SinglePathLianaDesc], secp: &secp256k1::Secp256k1, ) { log::debug!("Checking the state of an ongoing rescan if there is any"); - let mut db_conn = db.connection(); // Check if there is an ongoing rescan. If there isn't and we previously asked for a rescan of // the backend, we treat it as completed. @@ -299,14 +292,14 @@ fn rescan_check( "Rolling back our internal tip to '{}' to update our internal state with past transactions.", rescan_tip ); - updates(bit, db, descs, secp) + updates(db_conn, bit, descs, secp) } else { log::debug!("No ongoing rescan."); } } -// If the database chain tip is NULL (first startup), initialize it. -fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { +/// If the database chain tip is NULL (first startup), initialize it. +pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) { let mut db_conn = db.connection(); if db_conn.chain_tip().is_none() { @@ -315,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface } } -fn sync_poll_interval() -> time::Duration { +pub fn sync_poll_interval() -> time::Duration { // TODO: be smarter, like in revaultd, but more generic too. #[cfg(not(test))] { @@ -325,60 +318,14 @@ fn sync_poll_interval() -> time::Duration { time::Duration::from_secs(0) } -/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the -/// `shutdown` atomic. -pub fn looper( - bit: sync::Arc>, - db: sync::Arc>, - shutdown: sync::Arc, - poll_interval: time::Duration, - desc: descriptors::LianaDescriptor, +/// Update our state from the Bitcoin backend. +pub fn poll( + bit: &sync::Arc>, + db: &sync::Arc>, + secp: &secp256k1::Secp256k1, + descs: &[descriptors::SinglePathLianaDesc], ) { - let mut last_poll = None; - let mut synced = false; - let descs = [ - desc.receive_descriptor().clone(), - desc.change_descriptor().clone(), - ]; - let secp = secp256k1::Secp256k1::verification_only(); - - maybe_initialize_tip(&bit, &db); - - while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { - let now = time::Instant::now(); - - if let Some(last_poll) = last_poll { - let time_since_poll = now.duration_since(last_poll); - let poll_interval = if synced { - poll_interval - } else { - // Until we are synced we poll less often to avoid harassing bitcoind and impeding - // the sync. As a function since it's mocked for the tests. - sync_poll_interval() - }; - if time_since_poll < poll_interval { - thread::sleep(time::Duration::from_millis(500)); - continue; - } - } - last_poll = Some(now); - - // Don't poll until the Bitcoin backend is fully synced. - if !synced { - let progress = bit.sync_progress(); - log::info!( - "Block chain synchronization progress: {:.2}% ({} blocks / {} headers)", - progress.rounded_up_progress() * 100.0, - progress.blocks, - progress.headers - ); - synced = progress.is_complete(); - if !synced { - continue; - } - } - - updates(&bit, &db, &descs, &secp); - rescan_check(&bit, &db, &descs, &secp); - } + let mut db_conn = db.connection(); + updates(&mut db_conn, bit, descs, secp); + rescan_check(&mut db_conn, bit, descs, secp); } diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 7731af2e1..8e9874e16 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -1,60 +1,128 @@ mod looper; -use crate::{ - bitcoin::{poller::looper::looper, BitcoinInterface}, - database::DatabaseInterface, - descriptors, -}; +use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use std::{ - sync::{self, atomic}, - thread, time, + sync::{self, mpsc}, + time, }; +use miniscript::bitcoin::secp256k1; + +#[derive(Debug, Clone)] +pub enum PollerMessage { + Shutdown, + /// Ask the Bitcoin poller to poll immediately, get notified through the passed channel once + /// it's done. + PollNow(mpsc::SyncSender<()>), +} + /// The Bitcoin poller handler. pub struct Poller { - handle: thread::JoinHandle<()>, - shutdown: sync::Arc, + bit: sync::Arc>, + db: sync::Arc>, + secp: secp256k1::Secp256k1, + // The receive and change descriptors (in this order). + descs: [descriptors::SinglePathLianaDesc; 2], } impl Poller { - pub fn start( + pub fn new( bit: sync::Arc>, db: sync::Arc>, - poll_interval: time::Duration, desc: descriptors::LianaDescriptor, ) -> Poller { - let shutdown = sync::Arc::from(atomic::AtomicBool::from(false)); - let handle = thread::Builder::new() - .name("Bitcoin poller".to_string()) - .spawn({ - let shutdown = shutdown.clone(); - move || looper(bit, db, shutdown, poll_interval, desc) - }) - .expect("Must not fail"); - - Poller { shutdown, handle } - } + let secp = secp256k1::Secp256k1::verification_only(); + let descs = [ + desc.receive_descriptor().clone(), + desc.change_descriptor().clone(), + ]; - pub fn trigger_stop(&self) { - self.shutdown.store(true, atomic::Ordering::Relaxed); - } + // On first startup the tip may be NULL. Make sure it's set as the poller relies on it. + looper::maybe_initialize_tip(&bit, &db); - pub fn stop(self) { - self.trigger_stop(); - self.handle.join().expect("The poller loop must not fail"); + Poller { + bit, + db, + secp, + descs, + } } - #[cfg(feature = "nonblocking_shutdown")] - pub fn is_stopped(&self) -> bool { - // Doc says "This might return true for a brief moment after the thread’s main function has - // returned, but before the thread itself has stopped running.". But it's not an issue for - // us, as long as the main poller function has returned we are good. - self.handle.is_finished() - } + /// Continuously update our state from the Bitcoin backend. + /// - `poll_interval`: how frequently to perform an update. + /// - `shutdown`: set to true to stop continuously updating and make this function return. + /// + /// Typically this would run for the whole duration of the program in a thread, and the main + /// thread would set the `shutdown` atomic to `true` when shutting down. + pub fn poll_forever( + &self, + poll_interval: time::Duration, + receiver: mpsc::Receiver, + ) { + let mut last_poll = None; + let mut synced = false; + + loop { + // How long to wait before the next poll. + let time_before_poll = if let Some(last_poll) = last_poll { + let time_since_poll = time::Instant::now().duration_since(last_poll); + // Until we are synced we poll less often to avoid harassing bitcoind and impeding + // the sync. As a function since it's mocked for the tests. + let poll_interval = if synced { + poll_interval + } else { + looper::sync_poll_interval() + }; + poll_interval.saturating_sub(time_since_poll) + } else { + // Don't wait before doing the first poll. + time::Duration::ZERO + }; + + // Wait for the duration of the interval between polls, but listen to messages in the + // meantime. + match receiver.recv_timeout(time_before_poll) { + Ok(PollerMessage::Shutdown) => { + log::info!("Bitcoin poller was told to shut down."); + return; + } + Ok(PollerMessage::PollNow(sender)) => { + // We've been asked to poll, don't wait any further and signal completion to + // the caller. + last_poll = Some(time::Instant::now()); + looper::poll(&self.bit, &self.db, &self.secp, &self.descs); + if let Err(e) = sender.send(()) { + log::error!("Error sending immediate poll completion signal: {}.", e); + } + continue; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // It's been long enough since the last poll. + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + log::error!("Bitcoin poller communication channel got disconnected. Exiting."); + return; + } + } + last_poll = Some(time::Instant::now()); + + // Don't poll until the Bitcoin backend is fully synced. + if !synced { + let progress = self.bit.sync_progress(); + log::info!( + "Block chain synchronization progress: {:.2}% ({} blocks / {} headers)", + progress.rounded_up_progress() * 100.0, + progress.blocks, + progress.headers + ); + synced = progress.is_complete(); + if !synced { + continue; + } + } - #[cfg(test)] - pub fn test_stop(&mut self) { - self.shutdown.store(true, atomic::Ordering::Relaxed); + looper::poll(&self.bit, &self.db, &self.secp, &self.descs); + } } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 6cb372d73..a77fb8d25 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -8,6 +8,7 @@ use crate::{ bitcoin::BitcoinInterface, database::{Coin, DatabaseConnection, DatabaseInterface}, descriptors, + poller::PollerMessage, spend::{ create_spend, AddrInfo, AncestorInfo, CandidateCoin, CreateSpendRes, SpendCreationError, SpendOutputAddress, SpendTxFees, TxGetter, @@ -24,7 +25,8 @@ use utils::{ use std::{ collections::{hash_map, HashMap, HashSet}, - fmt, sync, + fmt, + sync::{self, mpsc}, }; use miniscript::{ @@ -688,7 +690,18 @@ impl DaemonControl { let final_tx = spend_psbt.extract_tx_unchecked_fee_rate(); self.bitcoin .broadcast_tx(&final_tx) - .map_err(CommandError::TxBroadcast) + .map_err(CommandError::TxBroadcast)?; + + // Finally, update our state with the changes from this transaction. + let (tx, rx) = mpsc::sync_channel(0); + if let Err(e) = self.poller_sender.send(PollerMessage::PollNow(tx)) { + log::error!("Error requesting update from poller: {}", e); + } + if let Err(e) = rx.recv() { + log::error!("Error receiving completion signal from poller: {}", e); + } + + Ok(()) } /// Create PSBT to replace the given transaction using RBF. @@ -1251,7 +1264,7 @@ mod tests { fn getinfo() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); // We can query getinfo - ms.handle.control.get_info(); + ms.control().get_info(); ms.shutdown(); } @@ -1259,7 +1272,7 @@ mod tests { fn getnewaddress() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); // We can get an address let addr = control.get_new_address().address; assert_eq!( @@ -1281,7 +1294,7 @@ mod tests { fn listaddresses() { let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let list = control.list_addresses(Some(2), Some(5)).unwrap(); @@ -1412,7 +1425,7 @@ mod tests { ), ); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); // Arguments sanity checking let dummy_addr = @@ -1900,7 +1913,7 @@ mod tests { .insert(dummy_op_a.txid, (dummy_tx.clone(), None)); dummy_bitcoind.txs.insert(dummy_op_b.txid, (dummy_tx, None)); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let mut db_conn = control.db().lock().unwrap().connection(); // Add two (unconfirmed) coins in DB @@ -2046,7 +2059,7 @@ mod tests { let dummy_txid_a = dummy_psbt_a.unsigned_tx.txid(); dummy_bitcoind.txs.insert(dummy_txid_a, (dummy_tx_a, None)); let ms = DummyLiana::new(dummy_bitcoind, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let mut db_conn = control.db().lock().unwrap().connection(); // The spend needs to be in DB before using RBF. assert_eq!( @@ -2284,7 +2297,7 @@ mod tests { let ms = DummyLiana::new(btc, db); - let control = &ms.handle.control; + let control = &ms.control(); let transactions = control.list_confirmed_transactions(0, 4, 10).transactions; assert_eq!(transactions.len(), 4); @@ -2416,7 +2429,7 @@ mod tests { let ms = DummyLiana::new(btc, DummyDatabase::new()); - let control = &ms.handle.control; + let control = &ms.control(); let transactions = control.list_transactions(&[tx1.txid()]).transactions; assert_eq!(transactions.len(), 1); diff --git a/src/jsonrpc/server.rs b/src/jsonrpc/server.rs index dc6f7ca1c..424adc2a0 100644 --- a/src/jsonrpc/server.rs +++ b/src/jsonrpc/server.rs @@ -122,11 +122,11 @@ fn connection_handler( pub fn rpcserver_loop( listener: net::UnixListener, daemon_control: DaemonControl, + shutdown: sync::Arc, ) -> Result<(), io::Error> { // Keep it simple. We don't need great performances so just treat each connection in // its thread, with a given maximum number of connections. let connections_counter = sync::Arc::from(atomic::AtomicU32::new(0)); - let shutdown = sync::Arc::from(atomic::AtomicBool::new(false)); listener.set_nonblocking(true)?; while !shutdown.load(atomic::Ordering::Relaxed) { @@ -400,7 +400,7 @@ mod tests { #[cfg(not(target_os = "macos"))] #[test] fn server_sanity_check() { - let ms = DummyLiana::new(DummyBitcoind::new(), DummyDatabase::new()); + let ms = DummyLiana::new_server(DummyBitcoind::new(), DummyDatabase::new()); let socket_path: path::PathBuf = [ ms.tmp_dir.as_path(), path::Path::new("d"), @@ -410,7 +410,6 @@ mod tests { .iter() .collect(); - let t = thread::spawn(move || ms.rpc_server().unwrap()); while !socket_path.exists() { thread::sleep(time::Duration::from_millis(100)); } @@ -426,6 +425,6 @@ mod tests { &[&serde_json::to_vec(&stop_req).unwrap(), b"\n"], ); - t.join().unwrap(); + ms.shutdown(); } } diff --git a/src/lib.rs b/src/lib.rs index b608b24e7..19f02a63f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,12 +28,16 @@ use crate::{ }, }; -use std::{error, fmt, fs, io, path, sync}; +use std::{ + error, fmt, fs, io, path, + sync::{self, mpsc}, + thread, +}; use miniscript::bitcoin::secp256k1; #[cfg(not(test))] -use std::{panic, process}; +use std::panic; // A panic in any thread should stop the main thread, and print the panic. #[cfg(not(test))] fn setup_panic_hook() { @@ -60,8 +64,6 @@ fn setup_panic_hook() { info, bt ); - - process::exit(1); })); } @@ -255,21 +257,24 @@ fn setup_bitcoind( pub struct DaemonControl { config: Config, bitcoin: sync::Arc>, + poller_sender: mpsc::SyncSender, // FIXME: Should we require Sync on DatabaseInterface rather than using a Mutex? db: sync::Arc>, secp: secp256k1::Secp256k1, } impl DaemonControl { - pub fn new( + pub(crate) fn new( config: Config, bitcoin: sync::Arc>, + poller_sender: mpsc::SyncSender, db: sync::Arc>, secp: secp256k1::Secp256k1, ) -> DaemonControl { DaemonControl { config, bitcoin, + poller_sender, db, secp, } @@ -282,25 +287,39 @@ impl DaemonControl { } } -pub struct DaemonHandle { - pub control: DaemonControl, - bitcoin_poller: poller::Poller, +/// The handle to a Liana daemon. It might either be the handle for a daemon which exposes a +/// JSONRPC server or one which exposes its API through a `DaemonControl`. +pub enum DaemonHandle { + Controller { + poller_sender: mpsc::SyncSender, + poller_handle: thread::JoinHandle<()>, + control: DaemonControl, + }, + #[cfg(feature = "daemon")] + Server { + poller_sender: mpsc::SyncSender, + poller_handle: thread::JoinHandle<()>, + rpcserver_shutdown: sync::Arc, + rpcserver_handle: thread::JoinHandle>, + }, } impl DaemonHandle { - /// This starts the Liana daemon. Call `shutdown` to shut it down. + /// This starts the Liana daemon. A user of this interface should regularly poll the `is_alive` + /// method to check for internal errors. To shut down the daemon use the `stop` method. + /// + /// The `with_rpc_server` controls whether we should start a JSONRPC server to receive queries + /// or instead return a `DaemonControl` object for a caller to access the daemon's API. /// /// You may specify a custom Bitcoin interface through the `bitcoin` parameter. If `None`, the /// default Bitcoin interface (`bitcoind` JSONRPC) will be used. /// You may specify a custom Database interface through the `db` parameter. If `None`, the /// default Database interface (SQLite) will be used. - /// - /// **Note**: we internally use threads, and set a panic hook. A downstream application must - /// not overwrite this panic hook. pub fn start( config: Config, bitcoin: Option, db: Option, + #[cfg(feature = "daemon")] with_rpc_server: bool, ) -> Result { #[cfg(not(test))] setup_panic_hook(); @@ -353,82 +372,126 @@ impl DaemonHandle { } } - // Spawn the bitcoind poller with a retry limit high enough that we'd fail after that. - let bitcoin_poller = poller::Poller::start( - bit.clone(), - db.clone(), - config.bitcoin_config.poll_interval_secs, - config.main_descriptor.clone(), - ); - - // Finally, set up the API. - let control = DaemonControl::new(config, bit, db, secp); + // Start the poller thread. Keep the thread handle to be able to check if it crashed. Store + // an atomic to be able to stop it. + let bitcoin_poller = + poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); + let (poller_sender, poller_receiver) = mpsc::sync_channel(0); + let poller_handle = thread::Builder::new() + .name("Bitcoin Network poller".to_string()) + .spawn({ + let poll_interval = config.bitcoin_config.poll_interval_secs; + move || { + log::info!("Bitcoin poller started."); + bitcoin_poller.poll_forever(poll_interval, poller_receiver); + log::info!("Bitcoin poller stopped."); + } + }) + .expect("Spawning the poller thread must never fail."); + + // Create the API the external world will use to talk to us, either directly through the Rust + // structure or through the JSONRPC server we may setup below. + let control = DaemonControl::new(config, bit, poller_sender.clone(), db, secp); + + #[cfg(feature = "daemon")] + if with_rpc_server { + let rpcserver_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let rpcserver_handle = thread::Builder::new() + .name("Bitcoin Network poller".to_string()) + .spawn({ + let shutdown = rpcserver_shutdown.clone(); + move || { + let mut rpc_socket = data_dir; + rpc_socket.push("lianad_rpc"); + let listener = rpcserver_setup(&rpc_socket)?; + log::info!("JSONRPC server started."); + + rpcserver_loop(listener, control, shutdown)?; + log::info!("JSONRPC server stopped."); + Ok(()) + } + }) + .expect("Spawning the RPC server thread should never fail."); + + return Ok(DaemonHandle::Server { + poller_sender, + poller_handle, + rpcserver_shutdown, + rpcserver_handle, + }); + } - Ok(Self { + Ok(DaemonHandle::Controller { + poller_sender, + poller_handle, control, - bitcoin_poller, }) } /// Start the Liana daemon with the default Bitcoin and database interfaces (`bitcoind` RPC /// and SQLite). - pub fn start_default(config: Config) -> Result { - DaemonHandle::start(config, Option::::None, Option::::None) - } - - /// Start the JSONRPC server and listen for incoming commands until we die. - /// Like DaemonHandle::shutdown(), this stops the Bitcoin poller at teardown. - #[cfg(feature = "daemon")] - pub fn rpc_server(self) -> Result<(), io::Error> { - let DaemonHandle { - control, - bitcoin_poller: poller, - } = self; - - let rpc_socket: path::PathBuf = [ - control - .config - .data_dir() - .expect("Didn't fail at startup, must not now") - .as_path(), - path::Path::new(&control.config.bitcoin_config.network.to_string()), - path::Path::new("lianad_rpc"), - ] - .iter() - .collect(); - let listener = rpcserver_setup(&rpc_socket)?; - log::info!("JSONRPC server started."); - - rpcserver_loop(listener, control)?; - log::info!("JSONRPC server stopped."); - - poller.stop(); - - Ok(()) - } - - /// Shut down the Liana daemon. - pub fn shutdown(self) { - self.bitcoin_poller.stop(); - } - - /// Tell the daemon to shut down. This will return before the shutdown completes. The structure - /// must not be reused after triggering shutdown. - #[cfg(feature = "nonblocking_shutdown")] - pub fn trigger_shutdown(&self) { - self.bitcoin_poller.trigger_stop() + pub fn start_default( + config: Config, + #[cfg(feature = "daemon")] with_rpc_server: bool, + ) -> Result { + Self::start( + config, + Option::::None, + Option::::None, + #[cfg(feature = "daemon")] + with_rpc_server, + ) } - /// Whether the daemon has finished shutting down. - #[cfg(feature = "nonblocking_shutdown")] - pub fn shutdown_complete(&self) -> bool { - self.bitcoin_poller.is_stopped() + /// Check whether the daemon is still up and running. This needs to be regularly polled to + /// check for internal errors. If this returns `false`, collect the error using the `stop` + /// method. + pub fn is_alive(&self) -> bool { + match self { + Self::Controller { + ref poller_handle, .. + } => !poller_handle.is_finished(), + #[cfg(feature = "daemon")] + Self::Server { + ref poller_handle, + ref rpcserver_handle, + .. + } => !poller_handle.is_finished() && !rpcserver_handle.is_finished(), + } } - // We need a shutdown utility that does not move for implementing Drop for the DummyLiana - #[cfg(test)] - pub fn test_shutdown(&mut self) { - self.bitcoin_poller.test_stop(); + /// Stop the Liana daemon. This returns any error which may have occurred. + pub fn stop(self) -> Result<(), Box> { + match self { + Self::Controller { + poller_sender, + poller_handle, + .. + } => { + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); + poller_handle.join().expect("Poller thread must not panic"); + Ok(()) + } + #[cfg(feature = "daemon")] + Self::Server { + poller_sender, + poller_handle, + rpcserver_shutdown, + rpcserver_handle, + } => { + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); + rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); + rpcserver_handle + .join() + .expect("Poller thread must not panic")?; + poller_handle.join().expect("Poller thread must not panic"); + Ok(()) + } + } } } @@ -608,18 +671,6 @@ mod tests { stream.flush().unwrap(); } - // Send them a response to 'getblockchaininfo' saying we are far from being synced - fn complete_sync_check(server: &net::TcpListener) { - let net_resp = [ - "HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(), - ] - .concat(); - let (mut stream, _) = server.accept().unwrap(); - read_til_json_end(&mut stream); - stream.write_all(&net_resp).unwrap(); - stream.flush().unwrap(); - } - // TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the // bitcoind interface, and use the DummyLiana from testutils to sanity check the startup. // Note that startup as checked by this unit test is also tested in the functional test @@ -681,11 +732,16 @@ mod tests { }; // Start the daemon in a new thread so the current one acts as the bitcoind server. - let daemon_thread = thread::spawn({ + let t = thread::spawn({ let config = config.clone(); move || { - let handle = DaemonHandle::start_default(config).unwrap(); - handle.shutdown(); + let handle = DaemonHandle::start_default( + config, + #[cfg(feature = "daemon")] + false, + ) + .unwrap(); + handle.stop().unwrap(); } }); complete_sanity_check(&server); @@ -696,13 +752,22 @@ mod tests { complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_tip_init(&server); - complete_sync_check(&server); - daemon_thread.join().unwrap(); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. + t.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. - let daemon_thread = thread::spawn(move || { - let handle = DaemonHandle::start_default(config).unwrap(); - handle.shutdown(); + let t = thread::spawn({ + let config = config.clone(); + move || { + let handle = DaemonHandle::start_default( + config, + #[cfg(feature = "daemon")] + false, + ) + .unwrap(); + handle.stop().unwrap(); + } }); complete_sanity_check(&server); complete_version_check(&server); @@ -710,8 +775,9 @@ mod tests { complete_wallet_loading(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); - complete_sync_check(&server); - daemon_thread.join().unwrap(); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. + t.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap(); } diff --git a/src/testutils.rs b/src/testutils.rs index 9e2adfc06..712d39069 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -2,13 +2,13 @@ use crate::{ bitcoin::{BitcoinInterface, Block, BlockChainTip, MempoolEntry, SyncProgress, UTxO}, config::{BitcoinConfig, Config}, database::{BlockInfo, Coin, CoinStatus, DatabaseConnection, DatabaseInterface, LabelItem}, - descriptors, DaemonHandle, + descriptors, DaemonControl, DaemonHandle, }; use std::convert::TryInto; use std::{ collections::{HashMap, HashSet}, - env, fs, io, path, process, + env, fs, path, process, str::FromStr, sync, thread, time, time::{SystemTime, UNIX_EPOCH}, @@ -464,9 +464,10 @@ pub fn tmp_dir() -> path::PathBuf { impl DummyLiana { /// Creates a new DummyLiana interface - pub fn new( + pub fn _new( bitcoin_interface: impl BitcoinInterface + 'static, database: impl DatabaseInterface + 'static, + rpc_server: bool, ) -> DummyLiana { let tmp_dir = tmp_dir(); fs::create_dir_all(&tmp_dir).unwrap(); @@ -497,19 +498,44 @@ impl DummyLiana { main_descriptor: desc, }; - let handle = DaemonHandle::start(config, Some(bitcoin_interface), Some(database)).unwrap(); + let handle = DaemonHandle::start( + config, + Some(bitcoin_interface), + Some(database), + #[cfg(feature = "daemon")] + rpc_server, + ) + .unwrap(); DummyLiana { tmp_dir, handle } } + /// Creates a new DummyLiana interface + pub fn new( + bitcoin_interface: impl BitcoinInterface + 'static, + database: impl DatabaseInterface + 'static, + ) -> DummyLiana { + Self::_new(bitcoin_interface, database, false) + } + + /// Creates a new DummyLiana interface which also spins up an RPC server. #[cfg(feature = "daemon")] - pub fn rpc_server(self) -> Result<(), io::Error> { - self.handle.rpc_server()?; - fs::remove_dir_all(&self.tmp_dir)?; - Ok(()) + pub fn new_server( + bitcoin_interface: impl BitcoinInterface + 'static, + database: impl DatabaseInterface + 'static, + ) -> DummyLiana { + Self::_new(bitcoin_interface, database, true) + } + + pub fn control(&self) -> &DaemonControl { + match self.handle { + DaemonHandle::Controller { ref control, .. } => control, + #[cfg(feature = "daemon")] + DaemonHandle::Server { .. } => unreachable!(), + } } pub fn shutdown(self) { - self.handle.shutdown(); - fs::remove_dir_all(&self.tmp_dir).unwrap(); + self.handle.stop().unwrap(); + fs::remove_dir_all(self.tmp_dir).unwrap(); } }