From 31b35ddf387214fc44ef359c62eb6d7279b6cf17 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 11 Apr 2025 21:07:19 +0200 Subject: [PATCH] refactor!: introduce `Info` and separate info/debug strings --- example/managed.rs | 20 ++++++++++++------- example/rescan.rs | 6 ++++++ example/signet.rs | 19 +++++++++++------- example/testnet4.rs | 19 +++++++++++------- src/chain/chain.rs | 6 ++++-- src/client.rs | 26 +++++++++++++------------ src/dialog.rs | 35 ++++++++++++++++++--------------- src/lib.rs | 31 +++++++++++++++++++----------- src/messages.rs | 15 ++++++--------- src/node.rs | 46 +++++++++++++++++++++----------------------- tests/core.rs | 47 +++++++++++++++++++++++++++------------------ 11 files changed, 157 insertions(+), 113 deletions(-) diff --git a/example/managed.rs b/example/managed.rs index cdb3473f..6cc298f0 100644 --- a/example/managed.rs +++ b/example/managed.rs @@ -45,8 +45,8 @@ async fn main() { .anchor_checkpoint(checkpoint) // The number of connections we would like to maintain .required_peers(1) - // Omit informational messages - .log_level(LogLevel::Warning) + // Omit debug messages + .log_level(LogLevel::Info) // Create the node and client .build() .unwrap(); @@ -55,17 +55,23 @@ async fn main() { let Client { requester, - mut log_rx, - warn_rx: _, + log_rx: _, + mut info_rx, + mut warn_rx, mut event_rx, } = client; // Continually listen for events until the node is synced to its peers. loop { tokio::select! { - log = log_rx.recv() => { - if let Some(log) = log { - tracing::info!("{log}"); + info = info_rx.recv() => { + if let Some(info) = info { + tracing::info!("{info}"); + } + } + warn = warn_rx.recv() => { + if let Some(warn) = warn { + tracing::warn!("{warn}"); } } event = event_rx.recv() => { diff --git a/example/rescan.rs b/example/rescan.rs index b8a0ddba..44222b35 100644 --- a/example/rescan.rs +++ b/example/rescan.rs @@ -45,6 +45,7 @@ async fn main() { let Client { requester, mut log_rx, + mut info_rx, mut warn_rx, mut event_rx, } = client; @@ -63,6 +64,11 @@ async fn main() { tracing::info!("{log}"); } } + info = info_rx.recv() => { + if let Some(info) = info { + tracing::info!("{info}"); + } + } warn = warn_rx.recv() => { if let Some(warn) = warn { tracing::warn!("{warn}"); diff --git a/example/signet.rs b/example/signet.rs index 40d11552..fb988a23 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -1,7 +1,7 @@ //! Usual sync on Signet. use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint}; -use kyoto::{AddrV2, Address, Client, Event, Log, Network, ServiceFlags, TrustedPeer}; +use kyoto::{AddrV2, Address, Client, Event, Info, Network, ServiceFlags, TrustedPeer}; use std::collections::HashSet; use std::{ net::{IpAddr, Ipv4Addr}, @@ -57,6 +57,7 @@ async fn main() { let Client { requester, mut log_rx, + mut info_rx, mut warn_rx, mut event_rx, } = client; @@ -84,16 +85,20 @@ async fn main() { } } } - log = log_rx.recv() => { - if let Some(log) = log { - match log { - Log::Debug(d)=> tracing::info!("{d}"), - Log::StateChange(node_state) => tracing::info!("{node_state}"), - Log::ConnectionsMet => tracing::info!("All required connections met"), + info = info_rx.recv() => { + if let Some(info) = info { + match info { + Info::StateChange(node_state) => tracing::info!("{node_state}"), + Info::ConnectionsMet => tracing::info!("All required connections met"), _ => (), } } } + log = log_rx.recv() => { + if let Some(log) = log { + tracing::info!("{log}"); + } + } warn = warn_rx.recv() => { if let Some(warn) = warn { tracing::warn!("{warn}"); diff --git a/example/testnet4.rs b/example/testnet4.rs index 1616a686..0754362e 100644 --- a/example/testnet4.rs +++ b/example/testnet4.rs @@ -1,7 +1,7 @@ //! Usual sync on Testnet. use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint}; -use kyoto::{Address, Client, Event, Log, Network, PeerStoreSizeConfig, TrustedPeer}; +use kyoto::{Address, Client, Event, Info, Network, PeerStoreSizeConfig, TrustedPeer}; use std::collections::HashSet; use std::{net::Ipv4Addr, str::FromStr}; @@ -52,6 +52,7 @@ async fn main() { let Client { requester, mut log_rx, + mut info_rx, mut warn_rx, mut event_rx, } = client; @@ -76,16 +77,20 @@ async fn main() { } } } - log = log_rx.recv() => { - if let Some(log) = log { - match log { - Log::Debug(d)=> tracing::info!("{d}"), - Log::StateChange(node_state) => tracing::info!("{node_state}"), - Log::ConnectionsMet => tracing::info!("All required connections met"), + info = info_rx.recv() => { + if let Some(info) = info { + match info { + Info::StateChange(node_state) => tracing::info!("{node_state}"), + Info::ConnectionsMet => tracing::info!("All required connections met"), _ => (), } } } + log = log_rx.recv() => { + if let Some(log) = log { + tracing::info!("{log}"); + } + } warn = warn_rx.recv() => { if let Some(warn) = warn { tracing::warn!("{warn}"); diff --git a/src/chain/chain.rs b/src/chain/chain.rs index 282e17a1..c307591c 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -688,7 +688,7 @@ mod tests { filters::cfheader_chain::AppendAttempt, { dialog::Dialog, - messages::{Event, Log, Warning}, + messages::{Event, Info, Warning}, }, }; @@ -699,7 +699,8 @@ mod tests { height_monitor: Arc>, peers: usize, ) -> Chain<()> { - let (log_tx, _) = tokio::sync::mpsc::channel::(1); + let (log_tx, _) = tokio::sync::mpsc::channel::(1); + let (info_tx, _) = tokio::sync::mpsc::channel::(1); let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::(); let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::(); let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest); @@ -712,6 +713,7 @@ mod tests { Arc::new(Dialog::new( crate::LogLevel::Debug, log_tx, + info_tx, warn_tx, event_tx, )), diff --git a/src/client.rs b/src/client.rs index 32e99b23..05c0ad3c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,7 @@ use std::{collections::BTreeMap, ops::Range, time::Duration}; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; -use crate::{Event, Log, TrustedPeer, TxBroadcast, Warning}; +use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning}; #[cfg(feature = "filter-control")] use super::{error::FetchBlockError, messages::BlockRequest, BlockReceiver, IndexedBlock}; @@ -22,8 +22,10 @@ use super::{ pub struct Client { /// Send events to a node, such as broadcasting a transaction. pub requester: Requester, - /// Receive log messages from a node. - pub log_rx: mpsc::Receiver, + /// Receive log/debug messages from a node. + pub log_rx: mpsc::Receiver, + /// Receive informational messages from the node. + pub info_rx: mpsc::Receiver, /// Receive warning messages from a node. pub warn_rx: mpsc::UnboundedReceiver, /// Receive [`Event`] from a node to act on. @@ -32,7 +34,8 @@ pub struct Client { impl Client { pub(crate) fn new( - log_rx: mpsc::Receiver, + log_rx: mpsc::Receiver, + info_rx: mpsc::Receiver, warn_rx: mpsc::UnboundedReceiver, event_rx: mpsc::UnboundedReceiver, ntx: Sender, @@ -40,6 +43,7 @@ impl Client { Self { requester: Requester::new(ntx), log_rx, + info_rx, warn_rx, event_rx, } @@ -354,25 +358,23 @@ mod tests { #[tokio::test] async fn test_client_works() { let transaction: Transaction = deserialize(&hex::decode("0200000001aad73931018bd25f84ae400b68848be09db706eac2ac18298babee71ab656f8b0000000048473044022058f6fc7c6a33e1b31548d481c826c015bd30135aad42cd67790dab66d2ad243b02204a1ced2604c6735b6393e5b41691dd78b00f0c5942fb9f751856faa938157dba01feffffff0280f0fa020000000017a9140fb9463421696b82c833af241c78c17ddbde493487d0f20a270100000017a91429ca74f8a08f81999428185c97b5d852e4063f618765000000").unwrap()).unwrap(); - let (log_tx, log_rx) = tokio::sync::mpsc::channel::(1); + let (log_tx, log_rx) = tokio::sync::mpsc::channel::(1); + let (_, info_rx) = tokio::sync::mpsc::channel::(1); let (_, warn_rx) = tokio::sync::mpsc::unbounded_channel::(); let (_, event_rx) = tokio::sync::mpsc::unbounded_channel::(); let (ctx, crx) = mpsc::channel::(5); let Client { requester, mut log_rx, + info_rx: _, warn_rx: _, event_rx: _, - } = Client::new(log_rx, warn_rx, event_rx, ctx); - let send_res = log_tx.send(Log::Debug("An important message".into())).await; + } = Client::new(log_rx, info_rx, warn_rx, event_rx, ctx); + let send_res = log_tx.send("An important message".into()).await; assert!(send_res.is_ok()); let message = log_rx.recv().await; assert!(message.is_some()); - tokio::task::spawn(async move { - log_tx - .send(Log::Debug("Another important message".into())) - .await - }); + tokio::task::spawn(async move { log_tx.send("Another important message".into()).await }); assert!(send_res.is_ok()); let message = log_rx.recv().await; assert!(message.is_some()); diff --git a/src/dialog.rs b/src/dialog.rs index 6466ae4d..f9344d59 100644 --- a/src/dialog.rs +++ b/src/dialog.rs @@ -1,12 +1,13 @@ use tokio::sync::mpsc::{Sender, UnboundedSender}; -use super::messages::{Event, Log, Progress, Warning}; +use super::messages::{Event, Info, Progress, Warning}; use crate::LogLevel; #[derive(Debug, Clone)] pub(crate) struct Dialog { pub(crate) log_level: LogLevel, - log_tx: Sender, + log_tx: Sender, + info_tx: Sender, warn_tx: UnboundedSender, event_tx: UnboundedSender, } @@ -14,20 +15,22 @@ pub(crate) struct Dialog { impl Dialog { pub(crate) fn new( log_level: LogLevel, - log_tx: Sender, + log_tx: Sender, + info_tx: Sender, warn_tx: UnboundedSender, event_tx: UnboundedSender, ) -> Self { Self { log_level, log_tx, + info_tx, warn_tx, event_tx, } } pub(crate) async fn send_dialog(&self, dialog: impl Into) { - let _ = self.log_tx.send(Log::Debug(dialog.into())).await; + let _ = self.log_tx.send(dialog.into()).await; } pub(crate) async fn chain_update( @@ -37,20 +40,22 @@ impl Dialog { num_filters: u32, best_height: u32, ) { - let _ = self - .log_tx - .send(Log::Progress(Progress::new( - num_cf_headers, - num_filters, - best_height, - ))) - .await; + if matches!(self.log_level, LogLevel::Debug | LogLevel::Info) { + let _ = self + .info_tx + .send(Info::Progress(Progress::new( + num_cf_headers, + num_filters, + best_height, + ))) + .await; + } if matches!(self.log_level, LogLevel::Debug) { let message = format!( "Headers ({}/{}) Compact Filter Headers ({}/{}) Filters ({}/{})", num_headers, best_height, num_cf_headers, best_height, num_filters, best_height ); - let _ = self.log_tx.send(Log::Debug(message)).await; + let _ = self.log_tx.send(message).await; } } @@ -58,8 +63,8 @@ impl Dialog { let _ = self.warn_tx.send(warning); } - pub(crate) async fn send_info(&self, info: Log) { - let _ = self.log_tx.send(info).await; + pub(crate) async fn send_info(&self, info: Info) { + let _ = self.info_tx.send(info).await; } pub(crate) fn send_event(&self, message: Event) { diff --git a/src/lib.rs b/src/lib.rs index 446aa432..4db6f529 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ //! ```no_run //! use std::str::FromStr; //! use std::collections::HashSet; -//! use kyoto::{NodeBuilder, Log, Event, Client, Address, Network, HeaderCheckpoint, BlockHash}; +//! use kyoto::{NodeBuilder, Event, Client, Address, Network, HeaderCheckpoint, BlockHash}; //! //! #[tokio::main] //! async fn main() { @@ -42,16 +42,13 @@ //! // Run the node and wait for the sync message; //! tokio::task::spawn(async move { node.run().await }); //! // Split the client into components that send messages and listen to messages -//! let Client { requester, mut log_rx, warn_rx: _, mut event_rx } = client; +//! let Client { requester, mut log_rx, info_rx: _, warn_rx: _, mut event_rx } = client; //! // Sync with the single script added //! loop { //! tokio::select! { //! log = log_rx.recv() => { //! if let Some(log) = log { -//! match log { -//! Log::Debug(d) => tracing::info!("{d}"), -//! _ => (), -//! } +//! tracing::info!("{log}"); //! } //! } //! event = event_rx.recv() => { @@ -137,7 +134,7 @@ pub use { crate::builder::NodeBuilder, crate::client::{Client, Requester}, crate::error::{ClientError, NodeError}, - crate::messages::{Event, Log, Progress, RejectPayload, SyncUpdate, Warning}, + crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning}, crate::network::PeerTimeoutConfig, crate::node::Node, }; @@ -389,12 +386,13 @@ pub enum PeerStoreSizeConfig { /// Select the category of messages for the node to emit. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub enum LogLevel { - /// Send `Log::Debug` messages. These messages are intended for debugging or troubleshooting + /// Send debug strings. These messages are intended for debugging or troubleshooting /// node operation. #[default] Debug, - /// Omit `Log::Debug` messages, including their memory allocations. Ideal for a production - /// application that uses minimal logging. + /// Send info and warning messages, but omit debug strings - including their memory allocations. Ideal for a production application that uses minimal logging. + Info, + /// Send warnings only. Warning, } @@ -445,9 +443,20 @@ macro_rules! log { ($dialog:expr, $expr:expr) => { match $dialog.log_level { crate::LogLevel::Debug => $dialog.send_dialog($expr).await, - crate::LogLevel::Warning => (), + _ => (), + } + }; +} + +macro_rules! info { + ($dialog:expr, $expr:expr) => { + match $dialog.log_level { + crate::LogLevel::Debug => $dialog.send_info($expr).await, + crate::LogLevel::Info => $dialog.send_info($expr).await, + _ => (), } }; } +pub(crate) use info; pub(crate) use log; diff --git a/src/messages.rs b/src/messages.rs index b7265694..faaf04bc 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -15,9 +15,7 @@ use super::error::{FetchBlockError, FetchHeaderError}; /// Informational messages emitted by a node #[derive(Debug, Clone)] -pub enum Log { - /// Human readable dialog of what the node is currently doing. - Debug(String), +pub enum Info { /// The current state of the node in the syncing process. StateChange(NodeState), /// The node is connected to all required peers. @@ -30,14 +28,13 @@ pub enum Log { TxSent(Txid), } -impl core::fmt::Display for Log { +impl core::fmt::Display for Info { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { - Log::Debug(d) => write!(f, "{}", d), - Log::StateChange(s) => write!(f, "{}", s), - Log::TxSent(txid) => write!(f, "Transaction sent: {}", txid), - Log::ConnectionsMet => write!(f, "Required connections met"), - Log::Progress(p) => { + Info::StateChange(s) => write!(f, "{}", s), + Info::TxSent(txid) => write!(f, "Transaction sent: {}", txid), + Info::ConnectionsMet => write!(f, "Required connections met"), + Info::Progress(p) => { let progress_percent = p.percentage_complete(); write!(f, "Percent complete: {}", progress_percent) } diff --git a/src/node.rs b/src/node.rs index 274fea9c..3d80d14a 100644 --- a/src/node.rs +++ b/src/node.rs @@ -39,7 +39,7 @@ use super::{ config::NodeConfig, dialog::Dialog, error::NodeError, - messages::{ClientMessage, Event, Log, SyncUpdate, Warning}, + messages::{ClientMessage, Event, Info, SyncUpdate, Warning}, }; pub(crate) const WTXID_VERSION: u32 = 70016; @@ -84,13 +84,14 @@ impl Node { } = config; let timeout_config = PeerTimeoutConfig::new(response_timeout, max_connection_time); // Set up a communication channel between the node and client - let (log_tx, log_rx) = mpsc::channel::(32); + let (log_tx, log_rx) = mpsc::channel::(32); + let (info_tx, info_rx) = mpsc::channel::(32); let (warn_tx, warn_rx) = mpsc::unbounded_channel::(); let (event_tx, event_rx) = mpsc::unbounded_channel::(); let (ctx, crx) = mpsc::channel::(5); - let client = Client::new(log_rx, warn_rx, event_rx, ctx); + let client = Client::new(log_rx, info_rx, warn_rx, event_rx, ctx); // A structured way to talk to the client - let dialog = Arc::new(Dialog::new(log_level, log_tx, warn_tx, event_tx)); + let dialog = Arc::new(Dialog::new(log_level, log_tx, info_tx, warn_tx, event_tx)); // We always assume we are behind let state = Arc::new(RwLock::new(NodeState::Behind)); // Configure the peer manager @@ -383,7 +384,7 @@ impl Node { } }; if did_broadcast { - self.dialog.send_info(Log::TxSent(txid)).await; + crate::info!(self.dialog, Info::TxSent(txid)); } else { self.dialog.send_warning(Warning::TransactionRejected { payload: RejectPayload::from_txid(txid), @@ -400,27 +401,24 @@ impl Node { NodeState::Behind => { let header_chain = self.chain.lock().await; if header_chain.is_synced().await { - self.dialog - .send_info(Log::StateChange(NodeState::HeadersSynced)) - .await; + crate::info!(self.dialog, Info::StateChange(NodeState::HeadersSynced)); *state = NodeState::HeadersSynced; } } NodeState::HeadersSynced => { let header_chain = self.chain.lock().await; if header_chain.is_cf_headers_synced() { - self.dialog - .send_info(Log::StateChange(NodeState::FilterHeadersSynced)) - .await; + crate::info!( + self.dialog, + Info::StateChange(NodeState::FilterHeadersSynced) + ); *state = NodeState::FilterHeadersSynced; } } NodeState::FilterHeadersSynced => { let header_chain = self.chain.lock().await; if header_chain.is_filters_synced() { - self.dialog - .send_info(Log::StateChange(NodeState::FiltersSynced)) - .await; + crate::info!(self.dialog, Info::StateChange(NodeState::FiltersSynced)); *state = NodeState::FiltersSynced; } } @@ -435,9 +433,10 @@ impl Node { ), chain.last_ten(), ); - self.dialog - .send_info(Log::StateChange(NodeState::TransactionsSynced)) - .await; + crate::info!( + self.dialog, + Info::StateChange(NodeState::TransactionsSynced) + ); self.dialog.send_event(Event::Synced(update)); } } @@ -532,7 +531,7 @@ impl Node { } // Inform the user we are connected to all required peers if peer_map.live().eq(&self.required_peers) { - self.dialog.send_info(Log::ConnectionsMet).await + crate::info!(self.dialog, Info::ConnectionsMet); } // Even if we start the node as caught up in terms of height, we need to check for reorgs. So we can send this unconditionally. let mut chain = self.chain.lock().await; @@ -690,9 +689,7 @@ impl Node { .into_iter() .any(|block| !chain.header_chain.contains(block)) { - self.dialog - .send_info(Log::StateChange(NodeState::Behind)) - .await; + crate::info!(self.dialog, Info::StateChange(NodeState::Behind)); *state = NodeState::Behind; let next_headers = GetHeaderConfig { locators: chain.locators().await, @@ -722,9 +719,10 @@ impl Node { NodeState::HeadersSynced => None, _ => { chain.clear_filters(); - self.dialog - .send_info(Log::StateChange(NodeState::FilterHeadersSynced)) - .await; + crate::info!( + self.dialog, + Info::StateChange(NodeState::FilterHeadersSynced) + ); *state = NodeState::FilterHeadersSynced; Some(MainThreadMessage::GetFilters( chain.next_filter_message().await, diff --git a/tests/core.rs b/tests/core.rs index ab4b63bc..a0050ba8 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -10,10 +10,8 @@ use bitcoin::{address::NetworkChecked, ScriptBuf}; use corepc_node::serde_json; use corepc_node::{anyhow, exe_path}; use kyoto::{ - chain::checkpoints::HeaderCheckpoint, - BlockHash, Event, Log, NodeState, ServiceFlags, SqliteHeaderDb, SqlitePeerDb, TrustedPeer, - Warning, - {client::Client, node::Node}, + chain::checkpoints::HeaderCheckpoint, client::Client, node::Node, BlockHash, Event, LogLevel, + ServiceFlags, SqliteHeaderDb, SqlitePeerDb, TrustedPeer, Warning, }; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::UnboundedReceiver; @@ -116,7 +114,7 @@ async fn sync_assert(best: &bitcoin::BlockHash, channel: &mut UnboundedReceiver< } } -async fn print_logs(mut log_rx: Receiver, mut warn_rx: UnboundedReceiver) { +async fn print_logs(mut log_rx: Receiver, mut warn_rx: UnboundedReceiver) { loop { tokio::select! { log = log_rx.recv() => { @@ -157,6 +155,7 @@ async fn live_reorg() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -212,6 +211,7 @@ async fn live_reorg_additional_sync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -270,6 +270,7 @@ async fn various_client_methods() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -309,6 +310,7 @@ async fn stop_reorg_resync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -329,6 +331,7 @@ async fn stop_reorg_resync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -360,6 +363,7 @@ async fn stop_reorg_resync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -393,6 +397,7 @@ async fn stop_reorg_two_resync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -414,6 +419,7 @@ async fn stop_reorg_two_resync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -444,6 +450,7 @@ async fn stop_reorg_two_resync() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -476,6 +483,7 @@ async fn stop_reorg_start_on_orphan() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -500,6 +508,7 @@ async fn stop_reorg_start_on_orphan() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -537,6 +546,7 @@ async fn stop_reorg_start_on_orphan() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -561,6 +571,7 @@ async fn stop_reorg_start_on_orphan() { let Client { requester, log_rx, + info_rx: _, warn_rx, event_rx: mut channel, } = client; @@ -604,25 +615,22 @@ async fn halting_download_works() { tokio::task::spawn(async move { node.run().await }); let Client { requester, - log_rx: mut log, + log_rx: _, + info_rx: mut info, warn_rx: _, event_rx: mut channel, } = client; - // Ensure SQL is able to catch the fork by loading in headers from the database - while let Some(message) = log.recv().await { - match message { - Log::Debug(d) => println!("{d}"), - Log::StateChange(node_state) => { - if let NodeState::FilterHeadersSynced = node_state { - println!("Sleeping for one second..."); - tokio::time::sleep(Duration::from_secs(1)).await; - requester.continue_download().await.unwrap(); - break; - } + while let Some(message) = info.recv().await { + if let kyoto::Info::StateChange(node_state) = message { + if let kyoto::NodeState::FilterHeadersSynced = node_state { + println!("Sleeping for one second..."); + tokio::time::sleep(Duration::from_secs(1)).await; + requester.continue_download().await.unwrap(); + break; } - _ => (), } } + while let Some(message) = channel.recv().await { if let kyoto::messages::Event::Synced(update) = message { println!("Done"); @@ -650,6 +658,7 @@ async fn signet_syncs() { .add_peer(host) .add_scripts(set) .data_dir(tempdir) + .log_level(LogLevel::Info) .build() .unwrap(); tokio::task::spawn(async move { node.run().await }); @@ -663,7 +672,7 @@ async fn signet_syncs() { break; } } - log = client.log_rx.recv() => { + log = client.info_rx.recv() => { if let Some(log) = log { println!("{log}"); }