Skip to content

Commit 24fe41e

Browse files
committed
Enforce client semantics with state
Uses the sealed trait hack to only allow for compiler enforced state transactions. There is first a `run` step, followed by the typical methods a client would expect. I decided to scrap the `Requester` and instead encapsulate the event receivers with a newtype. Running a client gives access to the event receivers as well as the handle to the task. The only downside I see with this is `&mut self` doesn't seem to be possible, as each impl block is defined for it's `State` type. I need to fiddle with that to see if we can't remove the need to return the client each state transition.
1 parent df401d0 commit 24fe41e

File tree

7 files changed

+221
-188
lines changed

7 files changed

+221
-188
lines changed

examples/bitcoin.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
44
use bip157::builder::Builder;
55
use bip157::chain::{BlockHeaderChanges, ChainState};
6-
use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf};
6+
use bip157::client::EventListeners;
7+
use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf};
78
use std::collections::HashSet;
89
use tokio::time::Instant;
910

@@ -34,18 +35,13 @@ async fn main() {
3435
.add_peers(seeds.into_iter().map(From::from))
3536
// Create the node and client
3637
.build();
37-
38-
let (client, _) = client.run();
39-
// Split the client into components that send messages and listen to messages.
40-
// With this construction, different parts of the program can take ownership of
41-
// specific tasks.
42-
let Client {
43-
requester,
38+
let (client, events, _) = client.run();
39+
let EventListeners {
4440
mut info_rx,
4541
mut warn_rx,
4642
mut event_rx,
47-
..
48-
} = client;
43+
} = events;
44+
4945
// Continually listen for events until the node is synced to its peers.
5046
loop {
5147
tokio::select! {
@@ -55,11 +51,11 @@ async fn main() {
5551
Event::FiltersSynced(update) => {
5652
tracing::info!("Chain tip: {}",update.tip().hash);
5753
// Request information from the node
58-
let fee = requester.broadcast_min_feerate().await.unwrap();
54+
let fee = client.broadcast_min_feerate().await.unwrap();
5955
tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee);
6056
let sync_time = now.elapsed().as_secs_f32();
6157
tracing::info!("Total sync time: {sync_time} seconds");
62-
let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap();
58+
let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap();
6359
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
6460
break;
6561
},
@@ -82,6 +78,6 @@ async fn main() {
8278
}
8379
}
8480
}
85-
let _ = requester.shutdown();
81+
let _ = client.shutdown();
8682
tracing::info!("Shutting down");
8783
}

examples/signet.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
44
use bip157::chain::{BlockHeaderChanges, ChainState};
55
use bip157::messages::Event;
6-
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client};
7-
use bip157::{Address, BlockHash, Network};
6+
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint};
7+
use bip157::{Address, BlockHash, EventListeners, Network};
88
use std::collections::HashSet;
99
use std::str::FromStr;
1010

@@ -40,15 +40,12 @@ async fn main() {
4040
// Create the node and client
4141
.build();
4242

43-
let (client, _) = client.run();
44-
45-
let Client {
46-
requester,
43+
let (client, events, _) = client.run();
44+
let EventListeners {
4745
mut info_rx,
4846
mut warn_rx,
4947
mut event_rx,
50-
..
51-
} = client;
48+
} = events;
5249

5350
// Continually listen for events until the node is synced to its peers.
5451
loop {
@@ -72,7 +69,7 @@ async fn main() {
7269
if filter.contains_any(addresses.iter()) {
7370
let hash = filter.block_hash();
7471
tracing::info!("Found script at {}!", hash);
75-
let indexed_block = requester.get_block(hash).await.unwrap();
72+
let indexed_block = client.get_block(hash).await.unwrap();
7673
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
7774
tracing::info!("Coinbase transaction ID: {}", coinbase);
7875
break;
@@ -88,6 +85,6 @@ async fn main() {
8885
}
8986
}
9087
}
91-
let _ = requester.shutdown();
88+
let _ = client.shutdown();
9289
tracing::info!("Shutting down");
9390
}

src/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use bitcoin::Network;
55

66
use super::{client::Client, node::Node};
77
use crate::chain::ChainState;
8+
use crate::client::Idle;
89
use crate::network::ConnectionType;
910
use crate::TrustedPeer;
1011
use crate::{Config, FilterType};
@@ -139,7 +140,7 @@ impl Builder {
139140
}
140141

141142
/// Consume the node builder and receive a [`Client`].
142-
pub fn build(mut self) -> Client {
143+
pub fn build(mut self) -> Client<Idle> {
143144
Node::build(self.network, core::mem::take(&mut self.config))
144145
}
145146
}

src/client.rs

Lines changed: 95 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,106 @@ use crate::{Event, Info, TrustedPeer, Warning};
1313
use super::{error::ClientError, messages::ClientMessage};
1414
use super::{error::FetchBlockError, IndexedBlock};
1515

16-
/// A [`Client`] allows for communication with a running node.
16+
/// Client state when idle.
17+
pub struct Idle;
18+
/// Client state when active.
19+
pub struct Active;
20+
21+
mod sealed {
22+
pub trait Sealed {}
23+
}
24+
25+
impl sealed::Sealed for Idle {}
26+
impl sealed::Sealed for Active {}
27+
28+
/// State of the client
29+
pub trait State: sealed::Sealed {}
30+
31+
impl State for Idle {}
32+
impl State for Active {}
33+
34+
/// Wrapper type for the channels that will receive events.
1735
#[derive(Debug)]
18-
pub struct Client {
19-
/// Send events to a node, such as broadcasting a transaction.
20-
pub requester: Requester,
36+
pub struct EventListeners {
2137
/// Receive informational messages from the node.
2238
pub info_rx: mpsc::Receiver<Info>,
2339
/// Receive warning messages from a node.
2440
pub warn_rx: mpsc::UnboundedReceiver<Warning>,
2541
/// Receive [`Event`] from a node to act on.
2642
pub event_rx: mpsc::UnboundedReceiver<Event>,
27-
/// Internal node structure.
28-
node: Option<Node>,
2943
}
3044

31-
impl Client {
32-
pub(crate) fn new(
45+
impl EventListeners {
46+
fn new(
3347
info_rx: mpsc::Receiver<Info>,
3448
warn_rx: mpsc::UnboundedReceiver<Warning>,
3549
event_rx: mpsc::UnboundedReceiver<Event>,
36-
ntx: UnboundedSender<ClientMessage>,
37-
node: Node,
3850
) -> Self {
3951
Self {
40-
requester: Requester::new(ntx),
4152
info_rx,
4253
warn_rx,
4354
event_rx,
44-
node: Some(node),
4555
}
4656
}
57+
}
58+
59+
/// A [`Client`] allows for communication with a running node.
60+
#[derive(Debug)]
61+
pub struct Client<S: State> {
62+
/// Send events to a node, such as broadcasting a transaction.
63+
ntx: UnboundedSender<ClientMessage>,
64+
/// Receive informational messages from the node.
65+
events: Option<EventListeners>,
66+
/// Internal node structure.
67+
node: Option<Node>,
68+
/// Marker for state.
69+
_marker: core::marker::PhantomData<S>,
70+
}
4771

72+
impl Client<Idle> {
73+
pub(crate) fn new(
74+
info_rx: mpsc::Receiver<Info>,
75+
warn_rx: mpsc::UnboundedReceiver<Warning>,
76+
event_rx: mpsc::UnboundedReceiver<Event>,
77+
ntx: UnboundedSender<ClientMessage>,
78+
node: Node,
79+
) -> Client<Idle> {
80+
Client {
81+
ntx,
82+
events: Some(EventListeners::new(info_rx, warn_rx, event_rx)),
83+
node: Some(node),
84+
_marker: core::marker::PhantomData,
85+
}
86+
}
4887
/// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to
4988
/// execute the task.
50-
pub fn run(mut self) -> (Self, JoinHandle<Result<(), crate::error::NodeError>>) {
89+
pub fn run(
90+
mut self,
91+
) -> (
92+
Client<Active>,
93+
EventListeners,
94+
JoinHandle<Result<(), crate::error::NodeError>>,
95+
) {
96+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
5197
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
52-
(self, tokio::task::spawn(async move { node.run().await }))
98+
(
99+
Client {
100+
ntx: self.ntx,
101+
events: None,
102+
node: None,
103+
_marker: core::marker::PhantomData,
104+
},
105+
events,
106+
tokio::task::spawn(async move { node.run().await }),
107+
)
53108
}
54109

55110
/// Run on a detached operating system thread. This method is useful in the case where the
56111
/// majority of your application code is blocking, and you do not have a
57112
/// [`tokio::runtime::Runtime`] available. This method will implicitly create a runtime which
58113
/// runs the data fetching process.
59-
pub fn run_detached(mut self) -> (Self, std::thread::JoinHandle<()>) {
114+
pub fn run_detached(mut self) -> (Client<Active>, EventListeners, std::thread::JoinHandle<()>) {
115+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
60116
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
61117
let handle = std::thread::spawn(|| {
62118
tokio::runtime::Builder::new_multi_thread()
@@ -67,27 +123,37 @@ impl Client {
67123
let _ = node.run().await;
68124
})
69125
});
70-
(self, handle)
126+
let client = Client {
127+
ntx: self.ntx,
128+
events: None,
129+
node: None,
130+
_marker: core::marker::PhantomData,
131+
};
132+
(client, events, handle)
71133
}
72134

73135
/// Run the node with an existing [`tokio::runtime::Runtime`].
74-
pub fn run_with_runtime(mut self, rt: tokio::runtime::Runtime) -> (Self, JoinHandle<Result<(), crate::error::NodeError>>) {
136+
pub fn run_with_runtime(
137+
mut self,
138+
rt: tokio::runtime::Runtime,
139+
) -> (
140+
Client<Active>,
141+
EventListeners,
142+
JoinHandle<Result<(), crate::error::NodeError>>,
143+
) {
144+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
75145
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
76-
(self, rt.spawn(async move { node.run().await }))
146+
let client = Client {
147+
ntx: self.ntx,
148+
events: None,
149+
node: None,
150+
_marker: core::marker::PhantomData,
151+
};
152+
(client, events, rt.spawn(async move { node.run().await }))
77153
}
78154
}
79155

80-
/// Send messages to a node that is running so the node may complete a task.
81-
#[derive(Debug, Clone)]
82-
pub struct Requester {
83-
ntx: UnboundedSender<ClientMessage>,
84-
}
85-
86-
impl Requester {
87-
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
88-
Self { ntx }
89-
}
90-
156+
impl Client<Active> {
91157
/// Tell the node to shut down.
92158
///
93159
/// # Errors

src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! # Example usage
99
//!
1010
//! ```no_run
11-
//! use bip157::{Builder, Event, Client, Network, BlockHash};
11+
//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash};
1212
//!
1313
//! #[tokio::main]
1414
//! async fn main() {
@@ -22,10 +22,11 @@
2222
//! // The number of connections we would like to maintain
2323
//! .required_peers(2)
2424
//! .build();
25-
//! // Run the node and wait for the sync message;
26-
//! let (client, _) = client.run();
25+
//! // Start the node
26+
//! let (client, events, _) = client.run();
2727
//! // Split the client into components that send messages and listen to messages
28-
//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx, .. } = client;
28+
//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events;
29+
//! // Wait for the sync message;
2930
//! loop {
3031
//! if let Some(event) = event_rx.recv().await {
3132
//! match event {
@@ -37,7 +38,7 @@
3738
//! }
3839
//! }
3940
//! }
40-
//! requester.shutdown();
41+
//! client.shutdown();
4142
//! }
4243
//! ```
4344
@@ -81,7 +82,7 @@ use tokio::sync::mpsc::UnboundedSender;
8182
pub use {
8283
crate::builder::Builder,
8384
crate::chain::ChainState,
84-
crate::client::{Client, Requester},
85+
crate::client::{Client, EventListeners},
8586
crate::error::{ClientError, NodeError},
8687
crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning},
8788
};

src/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{
3131
error::HeaderSyncError,
3232
CFHeaderChanges, ChainState, FilterCheck, HeightMonitor,
3333
},
34+
client::Idle,
3435
error::FetchBlockError,
3536
messages::ClientRequest,
3637
network::{
@@ -66,7 +67,7 @@ pub(crate) struct Node {
6667
}
6768

6869
impl Node {
69-
pub(crate) fn build(network: Network, config: Config) -> Client {
70+
pub(crate) fn build(network: Network, config: Config) -> Client<Idle> {
7071
let Config {
7172
required_peers,
7273
white_list,

0 commit comments

Comments
 (0)