Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/bin/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,16 @@ fn main() {
process::exit(1);
});

let daemon = DaemonHandle::start_default(config).unwrap_or_else(|e| {
let handle = DaemonHandle::start_default(config, true).unwrap_or_else(|e| {
log::error!("Error starting Liana daemon: {}", e);
process::exit(1);
});
daemon
.rpc_server()
.expect("JSONRPC server must terminate cleanly");
while handle.is_alive() {
thread::sleep(time::Duration::from_millis(500));
}
if let Err(e) = handle.stop() {
log::error!("Error stopping Liana daemon: {}", e);
}

// We are always logging to stdout, should it be then piped to the log file (if self) or
// not. So just make sure that all messages were actually written.
Expand Down
91 changes: 19 additions & 72 deletions src/bitcoin/poller/looper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use crate::{
descriptors,
};

use std::{
collections::HashSet,
sync::{self, atomic},
thread, time,
};
use std::{collections::HashSet, sync, time};

use miniscript::bitcoin::{self, secp256k1};

Expand Down Expand Up @@ -208,13 +204,11 @@ fn new_tip(bit: &impl BitcoinInterface, current_tip: &BlockChainTip) -> TipUpdat
}

fn updates(
db_conn: &mut Box<dyn DatabaseConnection>,
bit: &impl BitcoinInterface,
db: &impl DatabaseInterface,
descs: &[descriptors::SinglePathLianaDesc],
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) {
let mut db_conn = db.connection();

// Check if there was a new block before updating ourselves.
let current_tip = db_conn.chain_tip().expect("Always set at first startup");
let latest_tip = match new_tip(bit, &current_tip) {
Expand All @@ -225,18 +219,18 @@ fn updates(
// between our former chain and the new one, then restart fresh.
db_conn.rollback_tip(&new_tip);
log::info!("Tip was rolled back to '{}'.", new_tip);
return updates(bit, db, descs, secp);
return updates(db_conn, bit, descs, secp);
}
};

// Then check the state of our coins. Do it even if the tip did not change since last poll, as
// we may have unconfirmed transactions.
let updated_coins = update_coins(bit, &mut db_conn, &current_tip, descs, secp);
let updated_coins = update_coins(bit, db_conn, &current_tip, descs, secp);

// If the tip changed while we were polling our Bitcoin interface, start over.
if bit.chain_tip() != latest_tip {
log::info!("Chain tip changed while we were updating our state. Starting over.");
return updates(bit, db, descs, secp);
return updates(db_conn, bit, descs, secp);
}

// The chain tip did not change since we started our updates. Record them and the latest tip.
Expand All @@ -258,13 +252,12 @@ fn updates(

// Check if there is any rescan of the backend ongoing or one that just finished.
fn rescan_check(
db_conn: &mut Box<dyn DatabaseConnection>,
bit: &impl BitcoinInterface,
db: &impl DatabaseInterface,
descs: &[descriptors::SinglePathLianaDesc],
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
) {
log::debug!("Checking the state of an ongoing rescan if there is any");
let mut db_conn = db.connection();

// Check if there is an ongoing rescan. If there isn't and we previously asked for a rescan of
// the backend, we treat it as completed.
Expand Down Expand Up @@ -299,14 +292,14 @@ fn rescan_check(
"Rolling back our internal tip to '{}' to update our internal state with past transactions.",
rescan_tip
);
updates(bit, db, descs, secp)
updates(db_conn, bit, descs, secp)
} else {
log::debug!("No ongoing rescan.");
}
}

// If the database chain tip is NULL (first startup), initialize it.
fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
/// If the database chain tip is NULL (first startup), initialize it.
pub fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface) {
let mut db_conn = db.connection();

if db_conn.chain_tip().is_none() {
Expand All @@ -315,7 +308,7 @@ fn maybe_initialize_tip(bit: &impl BitcoinInterface, db: &impl DatabaseInterface
}
}

fn sync_poll_interval() -> time::Duration {
pub fn sync_poll_interval() -> time::Duration {
// TODO: be smarter, like in revaultd, but more generic too.
#[cfg(not(test))]
{
Expand All @@ -325,60 +318,14 @@ fn sync_poll_interval() -> time::Duration {
time::Duration::from_secs(0)
}

/// Main event loop. Repeatedly polls the Bitcoin interface until told to stop through the
/// `shutdown` atomic.
pub fn looper(
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
shutdown: sync::Arc<atomic::AtomicBool>,
poll_interval: time::Duration,
desc: descriptors::LianaDescriptor,
/// Update our state from the Bitcoin backend.
pub fn poll(
bit: &sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: &sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: &secp256k1::Secp256k1<secp256k1::VerifyOnly>,
descs: &[descriptors::SinglePathLianaDesc],
) {
let mut last_poll = None;
let mut synced = false;
let descs = [
desc.receive_descriptor().clone(),
desc.change_descriptor().clone(),
];
let secp = secp256k1::Secp256k1::verification_only();

maybe_initialize_tip(&bit, &db);

while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() {
let now = time::Instant::now();

if let Some(last_poll) = last_poll {
let time_since_poll = now.duration_since(last_poll);
let poll_interval = if synced {
poll_interval
} else {
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
sync_poll_interval()
};
if time_since_poll < poll_interval {
thread::sleep(time::Duration::from_millis(500));
continue;
}
}
last_poll = Some(now);

// Don't poll until the Bitcoin backend is fully synced.
if !synced {
let progress = bit.sync_progress();
log::info!(
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
progress.rounded_up_progress() * 100.0,
progress.blocks,
progress.headers
);
synced = progress.is_complete();
if !synced {
continue;
}
}

updates(&bit, &db, &descs, &secp);
rescan_check(&bit, &db, &descs, &secp);
}
let mut db_conn = db.connection();
updates(&mut db_conn, bit, descs, secp);
rescan_check(&mut db_conn, bit, descs, secp);
}
144 changes: 106 additions & 38 deletions src/bitcoin/poller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,128 @@
mod looper;

use crate::{
bitcoin::{poller::looper::looper, BitcoinInterface},
database::DatabaseInterface,
descriptors,
};
use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors};

use std::{
sync::{self, atomic},
thread, time,
sync::{self, mpsc},
time,
};

use miniscript::bitcoin::secp256k1;

#[derive(Debug, Clone)]
pub enum PollerMessage {
Shutdown,
/// Ask the Bitcoin poller to poll immediately, get notified through the passed channel once
/// it's done.
PollNow(mpsc::SyncSender<()>),
}

/// The Bitcoin poller handler.
pub struct Poller {
handle: thread::JoinHandle<()>,
shutdown: sync::Arc<atomic::AtomicBool>,
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
secp: secp256k1::Secp256k1<secp256k1::VerifyOnly>,
// The receive and change descriptors (in this order).
descs: [descriptors::SinglePathLianaDesc; 2],
}

impl Poller {
pub fn start(
pub fn new(
bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>,
db: sync::Arc<sync::Mutex<dyn DatabaseInterface>>,
poll_interval: time::Duration,
desc: descriptors::LianaDescriptor,
) -> Poller {
let shutdown = sync::Arc::from(atomic::AtomicBool::from(false));
let handle = thread::Builder::new()
.name("Bitcoin poller".to_string())
.spawn({
let shutdown = shutdown.clone();
move || looper(bit, db, shutdown, poll_interval, desc)
})
.expect("Must not fail");

Poller { shutdown, handle }
}
let secp = secp256k1::Secp256k1::verification_only();
let descs = [
desc.receive_descriptor().clone(),
desc.change_descriptor().clone(),
];

pub fn trigger_stop(&self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
}
// On first startup the tip may be NULL. Make sure it's set as the poller relies on it.
looper::maybe_initialize_tip(&bit, &db);

pub fn stop(self) {
self.trigger_stop();
self.handle.join().expect("The poller loop must not fail");
Poller {
bit,
db,
secp,
descs,
}
}

#[cfg(feature = "nonblocking_shutdown")]
pub fn is_stopped(&self) -> bool {
// Doc says "This might return true for a brief moment after the thread’s main function has
// returned, but before the thread itself has stopped running.". But it's not an issue for
// us, as long as the main poller function has returned we are good.
self.handle.is_finished()
}
/// Continuously update our state from the Bitcoin backend.
/// - `poll_interval`: how frequently to perform an update.
/// - `shutdown`: set to true to stop continuously updating and make this function return.
///
/// Typically this would run for the whole duration of the program in a thread, and the main
/// thread would set the `shutdown` atomic to `true` when shutting down.
pub fn poll_forever(
&self,
poll_interval: time::Duration,
receiver: mpsc::Receiver<PollerMessage>,
) {
let mut last_poll = None;
let mut synced = false;

loop {
// How long to wait before the next poll.
let time_before_poll = if let Some(last_poll) = last_poll {
let time_since_poll = time::Instant::now().duration_since(last_poll);
// Until we are synced we poll less often to avoid harassing bitcoind and impeding
// the sync. As a function since it's mocked for the tests.
let poll_interval = if synced {
poll_interval
} else {
looper::sync_poll_interval()
};
poll_interval.saturating_sub(time_since_poll)
} else {
// Don't wait before doing the first poll.
time::Duration::ZERO
};

// Wait for the duration of the interval between polls, but listen to messages in the
// meantime.
match receiver.recv_timeout(time_before_poll) {
Ok(PollerMessage::Shutdown) => {
log::info!("Bitcoin poller was told to shut down.");
return;
}
Ok(PollerMessage::PollNow(sender)) => {
// We've been asked to poll, don't wait any further and signal completion to
// the caller.
last_poll = Some(time::Instant::now());
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
if let Err(e) = sender.send(()) {
log::error!("Error sending immediate poll completion signal: {}.", e);
}
continue;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// It's been long enough since the last poll.
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::error!("Bitcoin poller communication channel got disconnected. Exiting.");
return;
}
}
last_poll = Some(time::Instant::now());

// Don't poll until the Bitcoin backend is fully synced.
if !synced {
let progress = self.bit.sync_progress();
log::info!(
"Block chain synchronization progress: {:.2}% ({} blocks / {} headers)",
progress.rounded_up_progress() * 100.0,
progress.blocks,
progress.headers
);
synced = progress.is_complete();
if !synced {
continue;
}
}

#[cfg(test)]
pub fn test_stop(&mut self) {
self.shutdown.store(true, atomic::Ordering::Relaxed);
looper::poll(&self.bit, &self.db, &self.secp, &self.descs);
}
}
}
Loading