Skip to content

Commit 6df3b18

Browse files
authored
Merge pull request rust-bitcoin#131 from nyonson/bench-io
Add bufreader example
2 parents ed9c63b + e786037 commit 6df3b18

1 file changed

Lines changed: 232 additions & 0 deletions

File tree

protocol/examples/bufreader.rs

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// SPDX-License-Identifier: CC0-1.0
2+
3+
//! Benchmark exploring the performance impact of using a `BufReader` with BIP-324 protocol streams.
4+
//!
5+
//! The BIP-324 protocol requires many relatively small read operations. For every packet, first
6+
//! the 3-byte length is read and then the rest of the packet. Bitcoin p2p messages are also
7+
//! relatively small in size. These characteristics can lead to inefficient system calls.
8+
//!
9+
//! This example does not model real life very well, because the write half just dumps
10+
//! all the messages at once. In reality, a bitcoin p2p connection is bursty or even
11+
//! quite. And this doesn't model any sort of network latency or partially written
12+
//! packets. But this example *does* highlight how during heavy write periods, a bufreader
13+
//! improves performance by ironing out some of BIP-324 characteristics.
14+
//!
15+
//! # Usage
16+
//!
17+
//! ```bash
18+
//! cargo run --release --example bufreader --features tokio
19+
//! ```
20+
21+
use bip324::futures::Protocol;
22+
use bip324::{Network, Role};
23+
use std::fmt;
24+
use std::time::{Duration, Instant};
25+
use tokio::io::BufReader;
26+
use tokio::net::{TcpListener, TcpStream};
27+
28+
/// Test scenario configuration.
29+
#[derive(Clone)]
30+
struct Scenario {
31+
name: &'static str,
32+
/// Message traffic pattern set by the sizes of messages to send.
33+
message_sizes: Vec<usize>,
34+
/// Number of times to repeat the message traffic pattern.
35+
iterations: usize,
36+
}
37+
38+
impl Scenario {
39+
fn bitcoin_typical() -> Self {
40+
Self {
41+
name: "Bitcoin Traffic",
42+
// Some common bitcoin message sizes.
43+
//
44+
// * ping/pong: ~10 bytes
45+
// * inv: ~37 bytes per item
46+
// * addr: ~30 bytes per address
47+
// * tx: 200-500 bytes
48+
// * block header: ~80 bytes
49+
message_sizes: vec![10, 37, 30, 250, 80, 500, 37, 30, 10, 10],
50+
iterations: 10000,
51+
}
52+
}
53+
54+
fn large_messages() -> Self {
55+
Self {
56+
name: "Large Messages",
57+
message_sizes: vec![8192, 16384, 65536],
58+
iterations: 1000,
59+
}
60+
}
61+
62+
fn small_messages() -> Self {
63+
Self {
64+
name: "Small Messages",
65+
message_sizes: vec![1, 2, 3, 4, 5],
66+
iterations: 20000,
67+
}
68+
}
69+
70+
fn total_messages(&self) -> usize {
71+
self.message_sizes.len() * self.iterations
72+
}
73+
74+
fn total_bytes(&self) -> usize {
75+
self.message_sizes.iter().sum::<usize>() * self.iterations
76+
}
77+
78+
/// Display benchmark results for this scenario.
79+
fn display_results(&self, without_buf: Duration, with_buf: Duration) {
80+
let improvement = ((without_buf.as_secs_f64() - with_buf.as_secs_f64())
81+
/ without_buf.as_secs_f64())
82+
* 100.0;
83+
84+
println!("{self}");
85+
println!(" Without BufReader: {} ms", without_buf.as_millis());
86+
println!(" With BufReader: {} ms", with_buf.as_millis());
87+
println!(" Improvement: {improvement:.1}%");
88+
}
89+
}
90+
91+
impl fmt::Display for Scenario {
92+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93+
write!(
94+
f,
95+
"{}: {} total messages, {} bytes",
96+
self.name,
97+
self.total_messages(),
98+
self.total_bytes()
99+
)
100+
}
101+
}
102+
103+
/// Run benchmark for a specific scenario.
104+
async fn benchmark_scenario(scenario: &Scenario) -> Result<(), Box<dyn std::error::Error>> {
105+
let (server_addr, _server_handle) = start_server(scenario.clone()).await?;
106+
107+
let without_buf = Client::NonBuffered.run(&server_addr, scenario).await?;
108+
let with_buf = Client::Buffered.run(&server_addr, scenario).await?;
109+
110+
scenario.display_results(without_buf, with_buf);
111+
112+
Ok(())
113+
}
114+
115+
/// Start the server which write out all the messages of a scenario.
116+
async fn start_server(
117+
scenario: Scenario,
118+
) -> Result<(String, tokio::task::JoinHandle<()>), Box<dyn std::error::Error>> {
119+
let listener = TcpListener::bind("127.0.0.1:0").await?;
120+
let addr = listener.local_addr()?.to_string();
121+
122+
let handle = tokio::spawn(async move {
123+
// Handle two connections per scenario, one with buffer and one without buffer.
124+
for _ in 0..2 {
125+
let (stream, _) = listener.accept().await.unwrap();
126+
let (reader, writer) = stream.into_split();
127+
128+
let garbage = vec![0x88u8; 512];
129+
let mut protocol = Protocol::new(
130+
Network::Bitcoin,
131+
Role::Responder,
132+
Some(&garbage),
133+
None,
134+
reader,
135+
writer,
136+
)
137+
.await
138+
.unwrap();
139+
140+
// Pre-allocate messages to send.
141+
let messages: Vec<Vec<u8>> = scenario
142+
.message_sizes
143+
.iter()
144+
.map(|&size| vec![0x42u8; size])
145+
.collect();
146+
147+
// Dump them all at once. This is not very realistic,
148+
// but the test is trying trying to measure the read
149+
// syscalls. Don't want to introduce write performance.
150+
for _ in 0..scenario.iterations {
151+
for message in &messages {
152+
protocol.write(message).await.unwrap();
153+
}
154+
}
155+
}
156+
});
157+
158+
Ok((addr, handle))
159+
}
160+
161+
/// Client reads all the messages.
162+
enum Client {
163+
Buffered,
164+
NonBuffered,
165+
}
166+
167+
impl Client {
168+
/// Run the client for a scenario and return the duration to read all the messages.
169+
async fn run(
170+
&self,
171+
server_addr: &str,
172+
scenario: &Scenario,
173+
) -> Result<Duration, Box<dyn std::error::Error>> {
174+
let start = Instant::now();
175+
176+
let stream = TcpStream::connect(server_addr).await?;
177+
let (reader, writer) = stream.into_split();
178+
179+
match self {
180+
Client::Buffered => {
181+
let buffered_reader = BufReader::new(reader);
182+
let mut protocol = Protocol::new(
183+
Network::Bitcoin,
184+
Role::Initiator,
185+
None,
186+
None,
187+
buffered_reader,
188+
writer,
189+
)
190+
.await?;
191+
192+
// Read all messages
193+
for _ in 0..scenario.total_messages() {
194+
let _payload = protocol.read().await?;
195+
}
196+
}
197+
Client::NonBuffered => {
198+
let mut protocol = Protocol::new(
199+
Network::Bitcoin,
200+
Role::Initiator,
201+
None,
202+
None,
203+
reader,
204+
writer,
205+
)
206+
.await?;
207+
208+
// Read all messages
209+
for _ in 0..scenario.total_messages() {
210+
let _payload = protocol.read().await?;
211+
}
212+
}
213+
};
214+
215+
Ok(start.elapsed())
216+
}
217+
}
218+
219+
#[tokio::main]
220+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
221+
let scenarios = vec![
222+
Scenario::bitcoin_typical(),
223+
Scenario::large_messages(),
224+
Scenario::small_messages(),
225+
];
226+
227+
for scenario in scenarios {
228+
benchmark_scenario(&scenario).await?;
229+
}
230+
231+
Ok(())
232+
}

0 commit comments

Comments
 (0)