From 838fbd24794902dd671521b4d13157a89d811e72 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Thu, 20 Nov 2025 15:11:05 +0000 Subject: [PATCH 1/3] Make `MessageGenerator` infallible The in-memory buffer cannot fail when serializing a message, and the encryption cannot fail when memory is know as well. --- src/network/outbound_messages.rs | 58 ++++++++++++++------------------ src/network/peer.rs | 30 ++++++++--------- 2 files changed, 41 insertions(+), 47 deletions(-) diff --git a/src/network/outbound_messages.rs b/src/network/outbound_messages.rs index 815aeb42..54a3aab2 100644 --- a/src/network/outbound_messages.rs +++ b/src/network/outbound_messages.rs @@ -18,125 +18,119 @@ use bitcoin::{ use crate::default_port_from_network; -use super::{error::PeerError, KYOTO_VERSION, PROTOCOL_VERSION, RUST_BITCOIN_VERSION}; +use super::{KYOTO_VERSION, PROTOCOL_VERSION, RUST_BITCOIN_VERSION}; // Responsible for serializing messages to write over the wire, either encrypted or plaintext. -pub(crate) struct MessageGenerator { +pub(in crate::network) struct MessageGenerator { pub network: Network, pub transport: Transport, } -pub(crate) enum Transport { +pub(in crate::network) enum Transport { V1, V2 { encryptor: PacketWriter }, } impl MessageGenerator { - fn serialize(&mut self, msg: NetworkMessage) -> Result, PeerError> { + pub(in crate::network) fn serialize(&mut self, msg: NetworkMessage) -> Vec { match &mut self.transport { Transport::V1 => { let data = RawNetworkMessage::new(self.network.magic(), msg); - Ok(serialize(&data)) + serialize(&data) } Transport::V2 { encryptor } => { - let plaintext = serialize_network_message(msg)?; + let plaintext = serialize_network_message(msg); encrypt_plaintext(encryptor, plaintext) } } } - pub(crate) fn version_message(&mut self, port: Option) -> Result, PeerError> { + pub(in crate::network) fn version_message(&mut self, port: Option) -> Vec { let msg = NetworkMessage::Version(make_version(port, &self.network)); self.serialize(msg) } - pub(crate) fn verack(&mut self) -> Result, PeerError> { + pub(in crate::network) fn verack(&mut self) -> Vec { let msg = NetworkMessage::Verack; self.serialize(msg) } - pub(crate) fn addr(&mut self) -> Result, PeerError> { + pub(in crate::network) fn addr(&mut self) -> Vec { let msg = NetworkMessage::GetAddr; self.serialize(msg) } - pub(crate) fn addrv2(&mut self) -> Result, PeerError> { + pub(in crate::network) fn addrv2(&mut self) -> Vec { let msg = NetworkMessage::SendAddrV2; self.serialize(msg) } - pub(crate) fn wtxid_relay(&mut self) -> Result, PeerError> { + pub(in crate::network) fn wtxid_relay(&mut self) -> Vec { let msg = NetworkMessage::WtxidRelay; self.serialize(msg) } - pub(crate) fn sendheaders(&mut self) -> Result, PeerError> { + pub(in crate::network) fn sendheaders(&mut self) -> Vec { let msg = NetworkMessage::SendHeaders; self.serialize(msg) } - pub(crate) fn headers(&mut self, msg: GetHeadersMessage) -> Result, PeerError> { + pub(in crate::network) fn headers(&mut self, msg: GetHeadersMessage) -> Vec { let msg = NetworkMessage::GetHeaders(msg); self.serialize(msg) } - pub(crate) fn cf_headers(&mut self, message: GetCFHeaders) -> Result, PeerError> { + pub(in crate::network) fn cf_headers(&mut self, message: GetCFHeaders) -> Vec { let msg = NetworkMessage::GetCFHeaders(message); self.serialize(msg) } - pub(crate) fn filters(&mut self, message: GetCFilters) -> Result, PeerError> { + pub(in crate::network) fn filters(&mut self, message: GetCFilters) -> Vec { let msg = NetworkMessage::GetCFilters(message); self.serialize(msg) } - pub(crate) fn block(&mut self, hash: BlockHash) -> Result, PeerError> { + pub(in crate::network) fn block(&mut self, hash: BlockHash) -> Vec { let inv = Inventory::Block(hash); let msg = NetworkMessage::GetData(vec![inv]); self.serialize(msg) } - pub(crate) fn ping(&mut self, nonce: u64) -> Result, PeerError> { + pub(in crate::network) fn ping(&mut self, nonce: u64) -> Vec { let msg = NetworkMessage::Ping(nonce); self.serialize(msg) } - pub(crate) fn pong(&mut self, nonce: u64) -> Result, PeerError> { + pub(in crate::network) fn pong(&mut self, nonce: u64) -> Vec { let msg = NetworkMessage::Pong(nonce); self.serialize(msg) } - pub(crate) fn announce_transactions( - &mut self, - wtxids: Vec, - ) -> Result, PeerError> { + pub(in crate::network) fn announce_transactions(&mut self, wtxids: Vec) -> Vec { let msg = NetworkMessage::Inv(wtxids.into_iter().map(Inventory::WTx).collect()); self.serialize(msg) } - pub(crate) fn broadcast_transaction( + pub(in crate::network) fn broadcast_transaction( &mut self, transaction: Transaction, - ) -> Result, PeerError> { + ) -> Vec { let msg = NetworkMessage::Tx(transaction); self.serialize(msg) } } -fn serialize_network_message(message: NetworkMessage) -> Result, PeerError> { - bip324::serde::serialize(message).map_err(From::from) +fn serialize_network_message(message: NetworkMessage) -> Vec { + bip324::serde::serialize(message).expect("in memory serialization cannot fail.") } -fn encrypt_plaintext( - encryptor: &mut PacketWriter, - plaintext: Vec, -) -> Result, PeerError> { +fn encrypt_plaintext(encryptor: &mut PacketWriter, plaintext: Vec) -> Vec { encryptor .encrypt_packet(&plaintext, None, PacketType::Genuine) - .map_err(From::from) + .expect("encryption to in memory buffer cannot fail.") } -fn make_version(port: Option, network: &Network) -> VersionMessage { +pub(in crate::network) fn make_version(port: Option, network: &Network) -> VersionMessage { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time went backwards") diff --git a/src/network/peer.rs b/src/network/peer.rs index cb41bdcf..71d1638a 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -102,7 +102,7 @@ impl Peer { (outbound_messages, reader) }; - let message = outbound_messages.version_message(None)?; + let message = outbound_messages.version_message(None); self.write_bytes(&mut writer, message).await?; self.message_state.start_version_handshake(); let read_handle = tokio::spawn(async move { peer_reader.read_from_remote().await }); @@ -113,7 +113,7 @@ impl Peer { return Ok(()); } if let Some(nonce) = self.message_state.ping_state.send_ping() { - let msg = outbound_messages.ping(nonce)?; + let msg = outbound_messages.ping(nonce); self.write_bytes(&mut writer, msg).await?; let msg_id = TimeSensitiveId::PING; self.message_state @@ -261,7 +261,7 @@ impl Peer { for wtxid in requests { let transaction = tx_queue.fetch_tx(wtxid); if let Some(transaction) = transaction { - let msg = message_generator.broadcast_transaction(transaction)?; + let msg = message_generator.broadcast_transaction(transaction); self.write_bytes(writer, msg).await?; self.message_state.sent_tx(wtxid); tx_queue.successful(wtxid); @@ -285,7 +285,7 @@ impl Peer { Ok(()) } ReaderMessage::Ping(nonce) => { - let message = message_generator.pong(nonce)?; + let message = message_generator.pong(nonce); self.write_bytes(writer, message).await?; Ok(()) } @@ -333,38 +333,38 @@ impl Peer { } match request { MainThreadMessage::GetAddr => { - let message = message_generator.addr()?; + let message = message_generator.addr(); self.write_bytes(writer, message).await?; } MainThreadMessage::GetAddrV2 => { - let message = message_generator.addrv2()?; + let message = message_generator.addrv2(); self.write_bytes(writer, message).await?; } MainThreadMessage::WtxidRelay => { - let message = message_generator.wtxid_relay()?; + let message = message_generator.wtxid_relay(); self.write_bytes(writer, message).await?; } MainThreadMessage::SendHeaders => { - let message = message_generator.sendheaders()?; + let message = message_generator.sendheaders(); self.write_bytes(writer, message).await?; } MainThreadMessage::GetHeaders(config) => { - let message = message_generator.headers(config)?; + let message = message_generator.headers(config); self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilterHeaders(config) => { - let message = message_generator.cf_headers(config)?; + let message = message_generator.cf_headers(config); self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilters(config) => { self.message_state .filter_rate .batch_requested(config.stop_hash); - let message = message_generator.filters(config)?; + let message = message_generator.filters(config); self.write_bytes(writer, message).await?; } MainThreadMessage::GetBlock(message) => { - let message = message_generator.block(message)?; + let message = message_generator.block(message); self.write_bytes(writer, message).await?; } MainThreadMessage::BroadcastPending => { @@ -377,12 +377,12 @@ impl Peer { queue.pending_wtxid() }; if !wtxids.is_empty() { - let message = message_generator.announce_transactions(wtxids)?; + let message = message_generator.announce_transactions(wtxids); self.write_bytes(writer, message).await?; } } MainThreadMessage::Verack => { - let message = message_generator.verack()?; + let message = message_generator.verack(); self.write_bytes(writer, message).await?; self.message_state.verack.sent_ack(); if self.message_state.verack.both_acks() { @@ -395,7 +395,7 @@ impl Peer { queue.pending_wtxid() }; if !wtxids.is_empty() { - let message = message_generator.announce_transactions(wtxids)?; + let message = message_generator.announce_transactions(wtxids); self.write_bytes(writer, message).await?; } } From ded16e1f31f7282a80c0231564863b628ad918a7 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Thu, 20 Nov 2025 15:49:43 +0000 Subject: [PATCH 2/3] Remove simple `MessageGenerator` methods Use `serialize` to remove some of the code bloat in `outbound_messages`. --- src/network/mod.rs | 2 +- src/network/outbound_messages.rs | 25 ------------------------- src/network/peer.rs | 17 ++++++++++------- src/node.rs | 2 +- 4 files changed, 12 insertions(+), 34 deletions(-) diff --git a/src/network/mod.rs b/src/network/mod.rs index c6cfb8f6..0e427633 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -389,7 +389,7 @@ impl TimeSensitiveId { #[derive(Debug, Clone)] pub(crate) enum MainThreadMessage { GetAddr, - GetAddrV2, + SendAddrV2, WtxidRelay, #[allow(unused)] SendHeaders, diff --git a/src/network/outbound_messages.rs b/src/network/outbound_messages.rs index 54a3aab2..5b19ce37 100644 --- a/src/network/outbound_messages.rs +++ b/src/network/outbound_messages.rs @@ -50,31 +50,6 @@ impl MessageGenerator { self.serialize(msg) } - pub(in crate::network) fn verack(&mut self) -> Vec { - let msg = NetworkMessage::Verack; - self.serialize(msg) - } - - pub(in crate::network) fn addr(&mut self) -> Vec { - let msg = NetworkMessage::GetAddr; - self.serialize(msg) - } - - pub(in crate::network) fn addrv2(&mut self) -> Vec { - let msg = NetworkMessage::SendAddrV2; - self.serialize(msg) - } - - pub(in crate::network) fn wtxid_relay(&mut self) -> Vec { - let msg = NetworkMessage::WtxidRelay; - self.serialize(msg) - } - - pub(in crate::network) fn sendheaders(&mut self) -> Vec { - let msg = NetworkMessage::SendHeaders; - self.serialize(msg) - } - pub(in crate::network) fn headers(&mut self, msg: GetHeadersMessage) -> Vec { let msg = NetworkMessage::GetHeaders(msg); self.serialize(msg) diff --git a/src/network/peer.rs b/src/network/peer.rs index 71d1638a..7755a1cd 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -3,7 +3,10 @@ use std::{sync::Arc, time::Duration}; use addrman::Record; use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role}; -use bitcoin::{p2p::ServiceFlags, Network}; +use bitcoin::{ + p2p::{message::NetworkMessage, ServiceFlags}, + Network, +}; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}, net::TcpStream, @@ -333,19 +336,19 @@ impl Peer { } match request { MainThreadMessage::GetAddr => { - let message = message_generator.addr(); + let message = message_generator.serialize(NetworkMessage::GetAddr); self.write_bytes(writer, message).await?; } - MainThreadMessage::GetAddrV2 => { - let message = message_generator.addrv2(); + MainThreadMessage::SendAddrV2 => { + let message = message_generator.serialize(NetworkMessage::SendAddrV2); self.write_bytes(writer, message).await?; } MainThreadMessage::WtxidRelay => { - let message = message_generator.wtxid_relay(); + let message = message_generator.serialize(NetworkMessage::WtxidRelay); self.write_bytes(writer, message).await?; } MainThreadMessage::SendHeaders => { - let message = message_generator.sendheaders(); + let message = message_generator.serialize(NetworkMessage::SendHeaders); self.write_bytes(writer, message).await?; } MainThreadMessage::GetHeaders(config) => { @@ -382,7 +385,7 @@ impl Peer { } } MainThreadMessage::Verack => { - let message = message_generator.verack(); + let message = message_generator.serialize(NetworkMessage::Verack); self.write_bytes(writer, message).await?; self.message_state.verack.sent_ack(); if self.message_state.verack.both_acks() { diff --git a/src/node.rs b/src/node.rs index 59c2cf13..89286e76 100644 --- a/src/node.rs +++ b/src/node.rs @@ -401,7 +401,7 @@ impl Node { self.peer_map.tried(nonce).await; // First we signal for ADDRV2 support self.peer_map - .send_message(nonce, MainThreadMessage::GetAddrV2) + .send_message(nonce, MainThreadMessage::SendAddrV2) .await; // Then for BIP 339 witness transaction broadcast self.peer_map From d7db51c7590b076705b46d152975f603c949c2b1 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Fri, 21 Nov 2025 11:10:06 +0000 Subject: [PATCH 3/3] Simplify module names Because the modules are in the `network` subspace using `inbound/outbound` implies what is going back and forth (network messages). --- src/network/{parsers.rs => inbound.rs} | 0 src/network/mod.rs | 4 ++-- src/network/{outbound_messages.rs => outbound.rs} | 0 src/network/peer.rs | 4 ++-- src/network/reader.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) rename src/network/{parsers.rs => inbound.rs} (100%) rename src/network/{outbound_messages.rs => outbound.rs} (100%) diff --git a/src/network/parsers.rs b/src/network/inbound.rs similarity index 100% rename from src/network/parsers.rs rename to src/network/inbound.rs diff --git a/src/network/mod.rs b/src/network/mod.rs index 0e427633..839505d8 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -30,8 +30,8 @@ use error::PeerError; pub(crate) mod dns; pub(crate) mod error; -pub(crate) mod outbound_messages; -pub(crate) mod parsers; +pub(crate) mod inbound; +pub(crate) mod outbound; pub(crate) mod peer; pub(crate) mod peer_map; pub(crate) mod reader; diff --git a/src/network/outbound_messages.rs b/src/network/outbound.rs similarity index 100% rename from src/network/outbound_messages.rs rename to src/network/outbound.rs diff --git a/src/network/peer.rs b/src/network/peer.rs index 7755a1cd..d1694003 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -22,8 +22,8 @@ use crate::{broadcaster::BroadcastQueue, messages::Warning, Dialog, Info}; use super::{ error::PeerError, - outbound_messages::{MessageGenerator, Transport}, - parsers::MessageParser, + inbound::MessageParser, + outbound::{MessageGenerator, Transport}, reader::{Reader, ReaderMessage}, AddressBook, MainThreadMessage, MessageState, PeerId, PeerMessage, PeerThreadMessage, PeerTimeoutConfig, TimeSensitiveId, diff --git a/src/network/reader.rs b/src/network/reader.rs index 853792ba..f59c2f9b 100644 --- a/src/network/reader.rs +++ b/src/network/reader.rs @@ -18,7 +18,7 @@ use tokio::sync::mpsc::Sender; use crate::messages::RejectPayload; use super::error::ReaderError; -use super::parsers::MessageParser; +use super::inbound::MessageParser; use super::TimeSensitiveId; // From Bitcoin Core PR #29575