Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions example/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ async fn main() {
.anchor_checkpoint(checkpoint)
// The number of connections we would like to maintain
.required_peers(1)
// Omit informational messages
.log_level(LogLevel::Warning)
// Omit debug messages
.log_level(LogLevel::Info)
// Create the node and client
.build()
.unwrap();
Expand All @@ -55,17 +55,23 @@ async fn main() {

let Client {
requester,
mut log_rx,
warn_rx: _,
log_rx: _,
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;

// Continually listen for events until the node is synced to its peers.
loop {
tokio::select! {
log = log_rx.recv() => {
if let Some(log) = log {
tracing::info!("{log}");
info = info_rx.recv() => {
if let Some(info) = info {
tracing::info!("{info}");
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
event = event_rx.recv() => {
Expand Down
6 changes: 6 additions & 0 deletions example/rescan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
Expand All @@ -63,6 +64,11 @@ async fn main() {
tracing::info!("{log}");
}
}
info = info_rx.recv() => {
if let Some(info) = info {
tracing::info!("{info}");
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
Expand Down
19 changes: 12 additions & 7 deletions example/signet.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Usual sync on Signet.

use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint};
use kyoto::{AddrV2, Address, Client, Event, Log, Network, ServiceFlags, TrustedPeer};
use kyoto::{AddrV2, Address, Client, Event, Info, Network, ServiceFlags, TrustedPeer};
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
Expand Down Expand Up @@ -57,6 +57,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
Expand Down Expand Up @@ -84,16 +85,20 @@ async fn main() {
}
}
}
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Debug(d)=> tracing::info!("{d}"),
Log::StateChange(node_state) => tracing::info!("{node_state}"),
Log::ConnectionsMet => tracing::info!("All required connections met"),
info = info_rx.recv() => {
if let Some(info) = info {
match info {
Info::StateChange(node_state) => tracing::info!("{node_state}"),
Info::ConnectionsMet => tracing::info!("All required connections met"),
_ => (),
}
}
}
log = log_rx.recv() => {
if let Some(log) = log {
tracing::info!("{log}");
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
Expand Down
19 changes: 12 additions & 7 deletions example/testnet4.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Usual sync on Testnet.

use kyoto::{builder::NodeBuilder, chain::checkpoints::HeaderCheckpoint};
use kyoto::{Address, Client, Event, Log, Network, PeerStoreSizeConfig, TrustedPeer};
use kyoto::{Address, Client, Event, Info, Network, PeerStoreSizeConfig, TrustedPeer};
use std::collections::HashSet;
use std::{net::Ipv4Addr, str::FromStr};

Expand Down Expand Up @@ -52,6 +52,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
Expand All @@ -76,16 +77,20 @@ async fn main() {
}
}
}
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Debug(d)=> tracing::info!("{d}"),
Log::StateChange(node_state) => tracing::info!("{node_state}"),
Log::ConnectionsMet => tracing::info!("All required connections met"),
info = info_rx.recv() => {
if let Some(info) = info {
match info {
Info::StateChange(node_state) => tracing::info!("{node_state}"),
Info::ConnectionsMet => tracing::info!("All required connections met"),
_ => (),
}
}
}
log = log_rx.recv() => {
if let Some(log) = log {
tracing::info!("{log}");
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
Expand Down
6 changes: 4 additions & 2 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ mod tests {
filters::cfheader_chain::AppendAttempt,
{
dialog::Dialog,
messages::{Event, Log, Warning},
messages::{Event, Info, Warning},
},
};

Expand All @@ -699,7 +699,8 @@ mod tests {
height_monitor: Arc<Mutex<HeightMonitor>>,
peers: usize,
) -> Chain<()> {
let (log_tx, _) = tokio::sync::mpsc::channel::<Log>(1);
let (log_tx, _) = tokio::sync::mpsc::channel::<String>(1);
let (info_tx, _) = tokio::sync::mpsc::channel::<Info>(1);
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::<Event>();
let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest);
Expand All @@ -712,6 +713,7 @@ mod tests {
Arc::new(Dialog::new(
crate::LogLevel::Debug,
log_tx,
info_tx,
warn_tx,
event_tx,
)),
Expand Down
26 changes: 14 additions & 12 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{collections::BTreeMap, ops::Range, time::Duration};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;

use crate::{Event, Log, TrustedPeer, TxBroadcast, Warning};
use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning};

#[cfg(feature = "filter-control")]
use super::{error::FetchBlockError, messages::BlockRequest, BlockReceiver, IndexedBlock};
Expand All @@ -22,8 +22,10 @@ use super::{
pub struct Client {
/// Send events to a node, such as broadcasting a transaction.
pub requester: Requester,
/// Receive log messages from a node.
pub log_rx: mpsc::Receiver<Log>,
/// Receive log/debug messages from a node.
pub log_rx: mpsc::Receiver<String>,
/// Receive informational messages from the node.
pub info_rx: mpsc::Receiver<Info>,
/// Receive warning messages from a node.
pub warn_rx: mpsc::UnboundedReceiver<Warning>,
/// Receive [`Event`] from a node to act on.
Expand All @@ -32,14 +34,16 @@ pub struct Client {

impl Client {
pub(crate) fn new(
log_rx: mpsc::Receiver<Log>,
log_rx: mpsc::Receiver<String>,
info_rx: mpsc::Receiver<Info>,
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: Sender<ClientMessage>,
) -> Self {
Self {
requester: Requester::new(ntx),
log_rx,
info_rx,
warn_rx,
event_rx,
}
Expand Down Expand Up @@ -354,25 +358,23 @@ mod tests {
#[tokio::test]
async fn test_client_works() {
let transaction: Transaction = deserialize(&hex::decode("0200000001aad73931018bd25f84ae400b68848be09db706eac2ac18298babee71ab656f8b0000000048473044022058f6fc7c6a33e1b31548d481c826c015bd30135aad42cd67790dab66d2ad243b02204a1ced2604c6735b6393e5b41691dd78b00f0c5942fb9f751856faa938157dba01feffffff0280f0fa020000000017a9140fb9463421696b82c833af241c78c17ddbde493487d0f20a270100000017a91429ca74f8a08f81999428185c97b5d852e4063f618765000000").unwrap()).unwrap();
let (log_tx, log_rx) = tokio::sync::mpsc::channel::<Log>(1);
let (log_tx, log_rx) = tokio::sync::mpsc::channel::<String>(1);
let (_, info_rx) = tokio::sync::mpsc::channel::<Info>(1);
let (_, warn_rx) = tokio::sync::mpsc::unbounded_channel::<Warning>();
let (_, event_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
let (ctx, crx) = mpsc::channel::<ClientMessage>(5);
let Client {
requester,
mut log_rx,
info_rx: _,
warn_rx: _,
event_rx: _,
} = Client::new(log_rx, warn_rx, event_rx, ctx);
let send_res = log_tx.send(Log::Debug("An important message".into())).await;
} = Client::new(log_rx, info_rx, warn_rx, event_rx, ctx);
let send_res = log_tx.send("An important message".into()).await;
assert!(send_res.is_ok());
let message = log_rx.recv().await;
assert!(message.is_some());
tokio::task::spawn(async move {
log_tx
.send(Log::Debug("Another important message".into()))
.await
});
tokio::task::spawn(async move { log_tx.send("Another important message".into()).await });
assert!(send_res.is_ok());
let message = log_rx.recv().await;
assert!(message.is_some());
Expand Down
35 changes: 20 additions & 15 deletions src/dialog.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
use tokio::sync::mpsc::{Sender, UnboundedSender};

use super::messages::{Event, Log, Progress, Warning};
use super::messages::{Event, Info, Progress, Warning};
use crate::LogLevel;

#[derive(Debug, Clone)]
pub(crate) struct Dialog {
pub(crate) log_level: LogLevel,
log_tx: Sender<Log>,
log_tx: Sender<String>,
info_tx: Sender<Info>,
warn_tx: UnboundedSender<Warning>,
event_tx: UnboundedSender<Event>,
}

impl Dialog {
pub(crate) fn new(
log_level: LogLevel,
log_tx: Sender<Log>,
log_tx: Sender<String>,
info_tx: Sender<Info>,
warn_tx: UnboundedSender<Warning>,
event_tx: UnboundedSender<Event>,
) -> Self {
Self {
log_level,
log_tx,
info_tx,
warn_tx,
event_tx,
}
}

pub(crate) async fn send_dialog(&self, dialog: impl Into<String>) {
let _ = self.log_tx.send(Log::Debug(dialog.into())).await;
let _ = self.log_tx.send(dialog.into()).await;
}

pub(crate) async fn chain_update(
Expand All @@ -37,29 +40,31 @@ impl Dialog {
num_filters: u32,
best_height: u32,
) {
let _ = self
.log_tx
.send(Log::Progress(Progress::new(
num_cf_headers,
num_filters,
best_height,
)))
.await;
if matches!(self.log_level, LogLevel::Debug | LogLevel::Info) {
let _ = self
.info_tx
.send(Info::Progress(Progress::new(
num_cf_headers,
num_filters,
best_height,
)))
.await;
}
if matches!(self.log_level, LogLevel::Debug) {
let message = format!(
"Headers ({}/{}) Compact Filter Headers ({}/{}) Filters ({}/{})",
num_headers, best_height, num_cf_headers, best_height, num_filters, best_height
);
let _ = self.log_tx.send(Log::Debug(message)).await;
let _ = self.log_tx.send(message).await;
}
}

pub(crate) fn send_warning(&self, warning: Warning) {
let _ = self.warn_tx.send(warning);
}

pub(crate) async fn send_info(&self, info: Log) {
let _ = self.log_tx.send(info).await;
pub(crate) async fn send_info(&self, info: Info) {
let _ = self.info_tx.send(info).await;
}

pub(crate) fn send_event(&self, message: Event) {
Expand Down
Loading