Skip to content

Commit dfe64ef

Browse files
committed
Enforce client semantics with state
Uses the sealed trait hack to only allow for compiler enforced state transactions. There is first a `run` step, followed by the typical methods a client would expect. I decided to scrap the `Requester` and instead encapsulate the event receivers with a newtype. Running a client gives access to the event receivers as well as the handle to the task. The only downside I see with this is `&mut self` doesn't seem to be possible, as each impl block is defined for it's `State` type. I need to fiddle with that to see if we can't remove the need to return the client each state transition.
1 parent bb7e673 commit dfe64ef

File tree

7 files changed

+215
-191
lines changed

7 files changed

+215
-191
lines changed

examples/bitcoin.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
44
use bip157::builder::Builder;
55
use bip157::chain::{BlockHeaderChanges, ChainState};
6-
use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf};
6+
use bip157::client::EventListeners;
7+
use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf};
78
use std::collections::HashSet;
89
use tokio::time::Instant;
910

@@ -35,17 +36,16 @@ async fn main() {
3536
// Create the node and client
3637
.build();
3738

38-
let client = client.run();
3939
// Split the client into components that send messages and listen to messages.
4040
// With this construction, different parts of the program can take ownership of
4141
// specific tasks.
42-
let Client {
43-
requester,
42+
let (client, events) = client.run();
43+
let EventListeners {
4444
mut info_rx,
4545
mut warn_rx,
4646
mut event_rx,
47-
..
48-
} = client;
47+
} = events;
48+
4949
// Continually listen for events until the node is synced to its peers.
5050
loop {
5151
tokio::select! {
@@ -55,11 +55,11 @@ async fn main() {
5555
Event::FiltersSynced(update) => {
5656
tracing::info!("Chain tip: {}",update.tip().hash);
5757
// Request information from the node
58-
let fee = requester.broadcast_min_feerate().await.unwrap();
58+
let fee = client.broadcast_min_feerate().await.unwrap();
5959
tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee);
6060
let sync_time = now.elapsed().as_secs_f32();
6161
tracing::info!("Total sync time: {sync_time} seconds");
62-
let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap();
62+
let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap();
6363
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
6464
break;
6565
},
@@ -82,6 +82,6 @@ async fn main() {
8282
}
8383
}
8484
}
85-
let _ = requester.shutdown();
85+
let _ = client.shutdown();
8686
tracing::info!("Shutting down");
8787
}

examples/signet.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
44
use bip157::chain::{BlockHeaderChanges, ChainState};
55
use bip157::messages::Event;
6-
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client};
7-
use bip157::{Address, BlockHash, Network};
6+
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint};
7+
use bip157::{Address, BlockHash, EventListeners, Network};
88
use std::collections::HashSet;
99
use std::str::FromStr;
1010

@@ -40,15 +40,12 @@ async fn main() {
4040
// Create the node and client
4141
.build();
4242

43-
let client = client.run();
44-
45-
let Client {
46-
requester,
43+
let (client, events) = client.run();
44+
let EventListeners {
4745
mut info_rx,
4846
mut warn_rx,
4947
mut event_rx,
50-
..
51-
} = client;
48+
} = events;
5249

5350
// Continually listen for events until the node is synced to its peers.
5451
loop {
@@ -72,7 +69,7 @@ async fn main() {
7269
if filter.contains_any(addresses.iter()) {
7370
let hash = filter.block_hash();
7471
tracing::info!("Found script at {}!", hash);
75-
let indexed_block = requester.get_block(hash).await.unwrap();
72+
let indexed_block = client.get_block(hash).await.unwrap();
7673
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
7774
tracing::info!("Coinbase transaction ID: {}", coinbase);
7875
break;
@@ -88,6 +85,6 @@ async fn main() {
8885
}
8986
}
9087
}
91-
let _ = requester.shutdown();
88+
let _ = client.shutdown();
9289
tracing::info!("Shutting down");
9390
}

src/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use bitcoin::Network;
55

66
use super::{client::Client, node::Node};
77
use crate::chain::ChainState;
8+
use crate::client::Idle;
89
use crate::network::ConnectionType;
910
use crate::TrustedPeer;
1011
use crate::{Config, FilterType};
@@ -139,7 +140,7 @@ impl Builder {
139140
}
140141

141142
/// Consume the node builder and receive a [`Client`].
142-
pub fn build(mut self) -> Client {
143+
pub fn build(mut self) -> Client<Idle> {
143144
Node::build(self.network, core::mem::take(&mut self.config))
144145
}
145146
}

src/client.rs

Lines changed: 87 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,51 +12,100 @@ use crate::{Event, Info, TrustedPeer, Warning};
1212
use super::{error::ClientError, messages::ClientMessage};
1313
use super::{error::FetchBlockError, IndexedBlock};
1414

15-
/// A [`Client`] allows for communication with a running node.
15+
/// Client state when idle.
16+
pub struct Idle;
17+
/// Client state when active.
18+
pub struct Active;
19+
20+
mod sealed {
21+
pub trait Sealed {}
22+
}
23+
24+
impl sealed::Sealed for Idle {}
25+
impl sealed::Sealed for Active {}
26+
27+
/// State of the client
28+
pub trait State: sealed::Sealed {}
29+
30+
impl State for Idle {}
31+
impl State for Active {}
32+
33+
/// Wrapper type for the channels that will receive events.
1634
#[derive(Debug)]
17-
pub struct Client {
18-
/// Send events to a node, such as broadcasting a transaction.
19-
pub requester: Requester,
35+
pub struct EventListeners {
2036
/// Receive informational messages from the node.
2137
pub info_rx: mpsc::Receiver<Info>,
2238
/// Receive warning messages from a node.
2339
pub warn_rx: mpsc::UnboundedReceiver<Warning>,
2440
/// Receive [`Event`] from a node to act on.
2541
pub event_rx: mpsc::UnboundedReceiver<Event>,
26-
/// Internal node structure.
27-
node: Option<Node>,
2842
}
2943

30-
impl Client {
31-
pub(crate) fn new(
44+
impl EventListeners {
45+
fn new(
3246
info_rx: mpsc::Receiver<Info>,
3347
warn_rx: mpsc::UnboundedReceiver<Warning>,
3448
event_rx: mpsc::UnboundedReceiver<Event>,
35-
ntx: UnboundedSender<ClientMessage>,
36-
node: Node,
3749
) -> Self {
3850
Self {
39-
requester: Requester::new(ntx),
4051
info_rx,
4152
warn_rx,
4253
event_rx,
43-
node: Some(node),
4454
}
4555
}
56+
}
57+
58+
/// A [`Client`] allows for communication with a running node.
59+
#[derive(Debug)]
60+
pub struct Client<S: State> {
61+
/// Send events to a node, such as broadcasting a transaction.
62+
ntx: UnboundedSender<ClientMessage>,
63+
/// Receive informational messages from the node.
64+
events: Option<EventListeners>,
65+
/// Internal node structure.
66+
node: Option<Node>,
67+
/// Marker for state.
68+
_marker: core::marker::PhantomData<S>,
69+
}
4670

71+
impl Client<Idle> {
72+
pub(crate) fn new(
73+
info_rx: mpsc::Receiver<Info>,
74+
warn_rx: mpsc::UnboundedReceiver<Warning>,
75+
event_rx: mpsc::UnboundedReceiver<Event>,
76+
ntx: UnboundedSender<ClientMessage>,
77+
node: Node,
78+
) -> Client<Idle> {
79+
Client {
80+
ntx,
81+
events: Some(EventListeners::new(info_rx, warn_rx, event_rx)),
82+
node: Some(node),
83+
_marker: core::marker::PhantomData,
84+
}
85+
}
4786
/// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to
4887
/// execute the task.
49-
pub fn run(mut self) -> Self {
88+
pub fn run(mut self) -> (Client<Active>, EventListeners) {
89+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
5090
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
5191
tokio::task::spawn(async move { node.run().await });
52-
self
92+
(
93+
Client {
94+
ntx: self.ntx,
95+
events: None,
96+
node: None,
97+
_marker: core::marker::PhantomData,
98+
},
99+
events,
100+
)
53101
}
54102

55103
/// Run on a detached operating system thread. This method is useful in the case where the
56104
/// majority of your application code is blocking, and you do not have a
57105
/// [`tokio::runtime::Runtime`] available. This method will implicitly create a runtime which
58106
/// runs the data fetching process.
59-
pub fn run_detached(mut self) -> Self {
107+
pub fn run_detached(mut self) -> (Client<Active>, EventListeners) {
108+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
60109
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
61110
std::thread::spawn(|| {
62111
tokio::runtime::Builder::new_multi_thread()
@@ -67,29 +116,38 @@ impl Client {
67116
let _ = node.run().await;
68117
})
69118
});
70-
self
119+
let client = Client {
120+
ntx: self.ntx,
121+
events: None,
122+
node: None,
123+
_marker: core::marker::PhantomData,
124+
};
125+
(client, events)
71126
}
72127

73128
/// Run the node with an existing [`tokio::runtime::Runtime`].
74-
pub fn run_with_runtime(mut self, rt: impl AsRef<tokio::runtime::Runtime>) -> Self {
129+
pub fn run_with_runtime(
130+
mut self,
131+
rt: impl AsRef<tokio::runtime::Runtime>,
132+
) -> (
133+
Client<Active>,
134+
EventListeners,
135+
) {
75136
let rt = rt.as_ref();
137+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
76138
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
77139
rt.spawn(async move { node.run().await });
78-
self
140+
let client = Client {
141+
ntx: self.ntx,
142+
events: None,
143+
node: None,
144+
_marker: core::marker::PhantomData,
145+
};
146+
(client, events)
79147
}
80148
}
81149

82-
/// Send messages to a node that is running so the node may complete a task.
83-
#[derive(Debug, Clone)]
84-
pub struct Requester {
85-
ntx: UnboundedSender<ClientMessage>,
86-
}
87-
88-
impl Requester {
89-
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
90-
Self { ntx }
91-
}
92-
150+
impl Client<Active> {
93151
/// Tell the node to shut down.
94152
///
95153
/// # Errors

src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! # Example usage
99
//!
1010
//! ```no_run
11-
//! use bip157::{Builder, Event, Client, Network, BlockHash};
11+
//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash};
1212
//!
1313
//! #[tokio::main]
1414
//! async fn main() {
@@ -22,10 +22,11 @@
2222
//! // The number of connections we would like to maintain
2323
//! .required_peers(2)
2424
//! .build();
25-
//! // Run the node and wait for the sync message;
26-
//! let client = client.run();
25+
//! // Start the node
26+
//! let (client, events) = client.run();
2727
//! // Split the client into components that send messages and listen to messages
28-
//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx, .. } = client;
28+
//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events;
29+
//! // Wait for the sync message;
2930
//! loop {
3031
//! if let Some(event) = event_rx.recv().await {
3132
//! match event {
@@ -37,7 +38,7 @@
3738
//! }
3839
//! }
3940
//! }
40-
//! requester.shutdown();
41+
//! client.shutdown();
4142
//! }
4243
//! ```
4344
@@ -81,7 +82,7 @@ use tokio::sync::mpsc::UnboundedSender;
8182
pub use {
8283
crate::builder::Builder,
8384
crate::chain::ChainState,
84-
crate::client::{Client, Requester},
85+
crate::client::{Client, EventListeners},
8586
crate::error::{ClientError, NodeError},
8687
crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning},
8788
};

src/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{
3131
error::HeaderSyncError,
3232
CFHeaderChanges, ChainState, FilterCheck, HeightMonitor,
3333
},
34+
client::Idle,
3435
error::FetchBlockError,
3536
messages::ClientRequest,
3637
network::{
@@ -66,7 +67,7 @@ pub(crate) struct Node {
6667
}
6768

6869
impl Node {
69-
pub(crate) fn build(network: Network, config: Config) -> Client {
70+
pub(crate) fn build(network: Network, config: Config) -> Client<Idle> {
7071
let Config {
7172
required_peers,
7273
white_list,

0 commit comments

Comments
 (0)