Skip to content

Commit 93f1874

Browse files
authored
Merge pull request #521 from rustaceanrob/11-20-network-cleanup
Make `MessageGenerator` infallible
2 parents 194263c + d7db51c commit 93f1874

File tree

6 files changed

+48
-76
lines changed

6 files changed

+48
-76
lines changed

src/network/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use error::PeerError;
3030

3131
pub(crate) mod dns;
3232
pub(crate) mod error;
33-
pub(crate) mod outbound_messages;
34-
pub(crate) mod parsers;
33+
pub(crate) mod inbound;
34+
pub(crate) mod outbound;
3535
pub(crate) mod peer;
3636
pub(crate) mod peer_map;
3737
pub(crate) mod reader;
@@ -389,7 +389,7 @@ impl TimeSensitiveId {
389389
#[derive(Debug, Clone)]
390390
pub(crate) enum MainThreadMessage {
391391
GetAddr,
392-
GetAddrV2,
392+
SendAddrV2,
393393
WtxidRelay,
394394
#[allow(unused)]
395395
SendHeaders,
Lines changed: 21 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,125 +18,94 @@ use bitcoin::{
1818

1919
use crate::default_port_from_network;
2020

21-
use super::{error::PeerError, KYOTO_VERSION, PROTOCOL_VERSION, RUST_BITCOIN_VERSION};
21+
use super::{KYOTO_VERSION, PROTOCOL_VERSION, RUST_BITCOIN_VERSION};
2222

2323
// Responsible for serializing messages to write over the wire, either encrypted or plaintext.
24-
pub(crate) struct MessageGenerator {
24+
pub(in crate::network) struct MessageGenerator {
2525
pub network: Network,
2626
pub transport: Transport,
2727
}
2828

29-
pub(crate) enum Transport {
29+
pub(in crate::network) enum Transport {
3030
V1,
3131
V2 { encryptor: PacketWriter },
3232
}
3333

3434
impl MessageGenerator {
35-
fn serialize(&mut self, msg: NetworkMessage) -> Result<Vec<u8>, PeerError> {
35+
pub(in crate::network) fn serialize(&mut self, msg: NetworkMessage) -> Vec<u8> {
3636
match &mut self.transport {
3737
Transport::V1 => {
3838
let data = RawNetworkMessage::new(self.network.magic(), msg);
39-
Ok(serialize(&data))
39+
serialize(&data)
4040
}
4141
Transport::V2 { encryptor } => {
42-
let plaintext = serialize_network_message(msg)?;
42+
let plaintext = serialize_network_message(msg);
4343
encrypt_plaintext(encryptor, plaintext)
4444
}
4545
}
4646
}
4747

48-
pub(crate) fn version_message(&mut self, port: Option<u16>) -> Result<Vec<u8>, PeerError> {
48+
pub(in crate::network) fn version_message(&mut self, port: Option<u16>) -> Vec<u8> {
4949
let msg = NetworkMessage::Version(make_version(port, &self.network));
5050
self.serialize(msg)
5151
}
5252

53-
pub(crate) fn verack(&mut self) -> Result<Vec<u8>, PeerError> {
54-
let msg = NetworkMessage::Verack;
55-
self.serialize(msg)
56-
}
57-
58-
pub(crate) fn addr(&mut self) -> Result<Vec<u8>, PeerError> {
59-
let msg = NetworkMessage::GetAddr;
60-
self.serialize(msg)
61-
}
62-
63-
pub(crate) fn addrv2(&mut self) -> Result<Vec<u8>, PeerError> {
64-
let msg = NetworkMessage::SendAddrV2;
65-
self.serialize(msg)
66-
}
67-
68-
pub(crate) fn wtxid_relay(&mut self) -> Result<Vec<u8>, PeerError> {
69-
let msg = NetworkMessage::WtxidRelay;
70-
self.serialize(msg)
71-
}
72-
73-
pub(crate) fn sendheaders(&mut self) -> Result<Vec<u8>, PeerError> {
74-
let msg = NetworkMessage::SendHeaders;
75-
self.serialize(msg)
76-
}
77-
78-
pub(crate) fn headers(&mut self, msg: GetHeadersMessage) -> Result<Vec<u8>, PeerError> {
53+
pub(in crate::network) fn headers(&mut self, msg: GetHeadersMessage) -> Vec<u8> {
7954
let msg = NetworkMessage::GetHeaders(msg);
8055
self.serialize(msg)
8156
}
8257

83-
pub(crate) fn cf_headers(&mut self, message: GetCFHeaders) -> Result<Vec<u8>, PeerError> {
58+
pub(in crate::network) fn cf_headers(&mut self, message: GetCFHeaders) -> Vec<u8> {
8459
let msg = NetworkMessage::GetCFHeaders(message);
8560
self.serialize(msg)
8661
}
8762

88-
pub(crate) fn filters(&mut self, message: GetCFilters) -> Result<Vec<u8>, PeerError> {
63+
pub(in crate::network) fn filters(&mut self, message: GetCFilters) -> Vec<u8> {
8964
let msg = NetworkMessage::GetCFilters(message);
9065
self.serialize(msg)
9166
}
9267

93-
pub(crate) fn block(&mut self, hash: BlockHash) -> Result<Vec<u8>, PeerError> {
68+
pub(in crate::network) fn block(&mut self, hash: BlockHash) -> Vec<u8> {
9469
let inv = Inventory::Block(hash);
9570
let msg = NetworkMessage::GetData(vec![inv]);
9671
self.serialize(msg)
9772
}
9873

99-
pub(crate) fn ping(&mut self, nonce: u64) -> Result<Vec<u8>, PeerError> {
74+
pub(in crate::network) fn ping(&mut self, nonce: u64) -> Vec<u8> {
10075
let msg = NetworkMessage::Ping(nonce);
10176
self.serialize(msg)
10277
}
10378

104-
pub(crate) fn pong(&mut self, nonce: u64) -> Result<Vec<u8>, PeerError> {
79+
pub(in crate::network) fn pong(&mut self, nonce: u64) -> Vec<u8> {
10580
let msg = NetworkMessage::Pong(nonce);
10681
self.serialize(msg)
10782
}
10883

109-
pub(crate) fn announce_transactions(
110-
&mut self,
111-
wtxids: Vec<Wtxid>,
112-
) -> Result<Vec<u8>, PeerError> {
84+
pub(in crate::network) fn announce_transactions(&mut self, wtxids: Vec<Wtxid>) -> Vec<u8> {
11385
let msg = NetworkMessage::Inv(wtxids.into_iter().map(Inventory::WTx).collect());
11486
self.serialize(msg)
11587
}
11688

117-
pub(crate) fn broadcast_transaction(
89+
pub(in crate::network) fn broadcast_transaction(
11890
&mut self,
11991
transaction: Transaction,
120-
) -> Result<Vec<u8>, PeerError> {
92+
) -> Vec<u8> {
12193
let msg = NetworkMessage::Tx(transaction);
12294
self.serialize(msg)
12395
}
12496
}
12597

126-
fn serialize_network_message(message: NetworkMessage) -> Result<Vec<u8>, PeerError> {
127-
bip324::serde::serialize(message).map_err(From::from)
98+
fn serialize_network_message(message: NetworkMessage) -> Vec<u8> {
99+
bip324::serde::serialize(message).expect("in memory serialization cannot fail.")
128100
}
129101

130-
fn encrypt_plaintext(
131-
encryptor: &mut PacketWriter,
132-
plaintext: Vec<u8>,
133-
) -> Result<Vec<u8>, PeerError> {
102+
fn encrypt_plaintext(encryptor: &mut PacketWriter, plaintext: Vec<u8>) -> Vec<u8> {
134103
encryptor
135104
.encrypt_packet(&plaintext, None, PacketType::Genuine)
136-
.map_err(From::from)
105+
.expect("encryption to in memory buffer cannot fail.")
137106
}
138107

139-
fn make_version(port: Option<u16>, network: &Network) -> VersionMessage {
108+
pub(in crate::network) fn make_version(port: Option<u16>, network: &Network) -> VersionMessage {
140109
let now = SystemTime::now()
141110
.duration_since(UNIX_EPOCH)
142111
.expect("time went backwards")

src/network/peer.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::{sync::Arc, time::Duration};
33

44
use addrman::Record;
55
use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role};
6-
use bitcoin::{p2p::ServiceFlags, Network};
6+
use bitcoin::{
7+
p2p::{message::NetworkMessage, ServiceFlags},
8+
Network,
9+
};
710
use tokio::{
811
io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
912
net::TcpStream,
@@ -19,8 +22,8 @@ use crate::{broadcaster::BroadcastQueue, messages::Warning, Dialog, Info};
1922

2023
use super::{
2124
error::PeerError,
22-
outbound_messages::{MessageGenerator, Transport},
23-
parsers::MessageParser,
25+
inbound::MessageParser,
26+
outbound::{MessageGenerator, Transport},
2427
reader::{Reader, ReaderMessage},
2528
AddressBook, MainThreadMessage, MessageState, PeerId, PeerMessage, PeerThreadMessage,
2629
PeerTimeoutConfig, TimeSensitiveId,
@@ -102,7 +105,7 @@ impl Peer {
102105
(outbound_messages, reader)
103106
};
104107

105-
let message = outbound_messages.version_message(None)?;
108+
let message = outbound_messages.version_message(None);
106109
self.write_bytes(&mut writer, message).await?;
107110
self.message_state.start_version_handshake();
108111
let read_handle = tokio::spawn(async move { peer_reader.read_from_remote().await });
@@ -113,7 +116,7 @@ impl Peer {
113116
return Ok(());
114117
}
115118
if let Some(nonce) = self.message_state.ping_state.send_ping() {
116-
let msg = outbound_messages.ping(nonce)?;
119+
let msg = outbound_messages.ping(nonce);
117120
self.write_bytes(&mut writer, msg).await?;
118121
let msg_id = TimeSensitiveId::PING;
119122
self.message_state
@@ -261,7 +264,7 @@ impl Peer {
261264
for wtxid in requests {
262265
let transaction = tx_queue.fetch_tx(wtxid);
263266
if let Some(transaction) = transaction {
264-
let msg = message_generator.broadcast_transaction(transaction)?;
267+
let msg = message_generator.broadcast_transaction(transaction);
265268
self.write_bytes(writer, msg).await?;
266269
self.message_state.sent_tx(wtxid);
267270
tx_queue.successful(wtxid);
@@ -285,7 +288,7 @@ impl Peer {
285288
Ok(())
286289
}
287290
ReaderMessage::Ping(nonce) => {
288-
let message = message_generator.pong(nonce)?;
291+
let message = message_generator.pong(nonce);
289292
self.write_bytes(writer, message).await?;
290293
Ok(())
291294
}
@@ -333,38 +336,38 @@ impl Peer {
333336
}
334337
match request {
335338
MainThreadMessage::GetAddr => {
336-
let message = message_generator.addr()?;
339+
let message = message_generator.serialize(NetworkMessage::GetAddr);
337340
self.write_bytes(writer, message).await?;
338341
}
339-
MainThreadMessage::GetAddrV2 => {
340-
let message = message_generator.addrv2()?;
342+
MainThreadMessage::SendAddrV2 => {
343+
let message = message_generator.serialize(NetworkMessage::SendAddrV2);
341344
self.write_bytes(writer, message).await?;
342345
}
343346
MainThreadMessage::WtxidRelay => {
344-
let message = message_generator.wtxid_relay()?;
347+
let message = message_generator.serialize(NetworkMessage::WtxidRelay);
345348
self.write_bytes(writer, message).await?;
346349
}
347350
MainThreadMessage::SendHeaders => {
348-
let message = message_generator.sendheaders()?;
351+
let message = message_generator.serialize(NetworkMessage::SendHeaders);
349352
self.write_bytes(writer, message).await?;
350353
}
351354
MainThreadMessage::GetHeaders(config) => {
352-
let message = message_generator.headers(config)?;
355+
let message = message_generator.headers(config);
353356
self.write_bytes(writer, message).await?;
354357
}
355358
MainThreadMessage::GetFilterHeaders(config) => {
356-
let message = message_generator.cf_headers(config)?;
359+
let message = message_generator.cf_headers(config);
357360
self.write_bytes(writer, message).await?;
358361
}
359362
MainThreadMessage::GetFilters(config) => {
360363
self.message_state
361364
.filter_rate
362365
.batch_requested(config.stop_hash);
363-
let message = message_generator.filters(config)?;
366+
let message = message_generator.filters(config);
364367
self.write_bytes(writer, message).await?;
365368
}
366369
MainThreadMessage::GetBlock(message) => {
367-
let message = message_generator.block(message)?;
370+
let message = message_generator.block(message);
368371
self.write_bytes(writer, message).await?;
369372
}
370373
MainThreadMessage::BroadcastPending => {
@@ -377,12 +380,12 @@ impl Peer {
377380
queue.pending_wtxid()
378381
};
379382
if !wtxids.is_empty() {
380-
let message = message_generator.announce_transactions(wtxids)?;
383+
let message = message_generator.announce_transactions(wtxids);
381384
self.write_bytes(writer, message).await?;
382385
}
383386
}
384387
MainThreadMessage::Verack => {
385-
let message = message_generator.verack()?;
388+
let message = message_generator.serialize(NetworkMessage::Verack);
386389
self.write_bytes(writer, message).await?;
387390
self.message_state.verack.sent_ack();
388391
if self.message_state.verack.both_acks() {
@@ -395,7 +398,7 @@ impl Peer {
395398
queue.pending_wtxid()
396399
};
397400
if !wtxids.is_empty() {
398-
let message = message_generator.announce_transactions(wtxids)?;
401+
let message = message_generator.announce_transactions(wtxids);
399402
self.write_bytes(writer, message).await?;
400403
}
401404
}

src/network/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use tokio::sync::mpsc::Sender;
1818
use crate::messages::RejectPayload;
1919

2020
use super::error::ReaderError;
21-
use super::parsers::MessageParser;
21+
use super::inbound::MessageParser;
2222
use super::TimeSensitiveId;
2323

2424
// From Bitcoin Core PR #29575

src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ impl Node {
401401
self.peer_map.tried(nonce).await;
402402
// First we signal for ADDRV2 support
403403
self.peer_map
404-
.send_message(nonce, MainThreadMessage::GetAddrV2)
404+
.send_message(nonce, MainThreadMessage::SendAddrV2)
405405
.await;
406406
// Then for BIP 339 witness transaction broadcast
407407
self.peer_map

0 commit comments

Comments
 (0)