diff --git a/src/network/parsers.rs b/src/network/inbound.rs similarity index 100% rename from src/network/parsers.rs rename to src/network/inbound.rs diff --git a/src/network/mod.rs b/src/network/mod.rs index c6cfb8f6..839505d8 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -30,8 +30,8 @@ use error::PeerError; pub(crate) mod dns; pub(crate) mod error; -pub(crate) mod outbound_messages; -pub(crate) mod parsers; +pub(crate) mod inbound; +pub(crate) mod outbound; pub(crate) mod peer; pub(crate) mod peer_map; pub(crate) mod reader; @@ -389,7 +389,7 @@ impl TimeSensitiveId { #[derive(Debug, Clone)] pub(crate) enum MainThreadMessage { GetAddr, - GetAddrV2, + SendAddrV2, WtxidRelay, #[allow(unused)] SendHeaders, diff --git a/src/network/outbound_messages.rs b/src/network/outbound.rs similarity index 55% rename from src/network/outbound_messages.rs rename to src/network/outbound.rs index 815aeb42..5b19ce37 100644 --- a/src/network/outbound_messages.rs +++ b/src/network/outbound.rs @@ -18,125 +18,94 @@ use bitcoin::{ use crate::default_port_from_network; -use super::{error::PeerError, KYOTO_VERSION, PROTOCOL_VERSION, RUST_BITCOIN_VERSION}; +use super::{KYOTO_VERSION, PROTOCOL_VERSION, RUST_BITCOIN_VERSION}; // Responsible for serializing messages to write over the wire, either encrypted or plaintext. -pub(crate) struct MessageGenerator { +pub(in crate::network) struct MessageGenerator { pub network: Network, pub transport: Transport, } -pub(crate) enum Transport { +pub(in crate::network) enum Transport { V1, V2 { encryptor: PacketWriter }, } impl MessageGenerator { - fn serialize(&mut self, msg: NetworkMessage) -> Result, PeerError> { + pub(in crate::network) fn serialize(&mut self, msg: NetworkMessage) -> Vec { match &mut self.transport { Transport::V1 => { let data = RawNetworkMessage::new(self.network.magic(), msg); - Ok(serialize(&data)) + serialize(&data) } Transport::V2 { encryptor } => { - let plaintext = serialize_network_message(msg)?; + let plaintext = serialize_network_message(msg); encrypt_plaintext(encryptor, plaintext) } } } - pub(crate) fn version_message(&mut self, port: Option) -> Result, PeerError> { + pub(in crate::network) fn version_message(&mut self, port: Option) -> Vec { let msg = NetworkMessage::Version(make_version(port, &self.network)); self.serialize(msg) } - pub(crate) fn verack(&mut self) -> Result, PeerError> { - let msg = NetworkMessage::Verack; - self.serialize(msg) - } - - pub(crate) fn addr(&mut self) -> Result, PeerError> { - let msg = NetworkMessage::GetAddr; - self.serialize(msg) - } - - pub(crate) fn addrv2(&mut self) -> Result, PeerError> { - let msg = NetworkMessage::SendAddrV2; - self.serialize(msg) - } - - pub(crate) fn wtxid_relay(&mut self) -> Result, PeerError> { - let msg = NetworkMessage::WtxidRelay; - self.serialize(msg) - } - - pub(crate) fn sendheaders(&mut self) -> Result, PeerError> { - let msg = NetworkMessage::SendHeaders; - self.serialize(msg) - } - - pub(crate) fn headers(&mut self, msg: GetHeadersMessage) -> Result, PeerError> { + pub(in crate::network) fn headers(&mut self, msg: GetHeadersMessage) -> Vec { let msg = NetworkMessage::GetHeaders(msg); self.serialize(msg) } - pub(crate) fn cf_headers(&mut self, message: GetCFHeaders) -> Result, PeerError> { + pub(in crate::network) fn cf_headers(&mut self, message: GetCFHeaders) -> Vec { let msg = NetworkMessage::GetCFHeaders(message); self.serialize(msg) } - pub(crate) fn filters(&mut self, message: GetCFilters) -> Result, PeerError> { + pub(in crate::network) fn filters(&mut self, message: GetCFilters) -> Vec { let msg = NetworkMessage::GetCFilters(message); self.serialize(msg) } - pub(crate) fn block(&mut self, hash: BlockHash) -> Result, PeerError> { + pub(in crate::network) fn block(&mut self, hash: BlockHash) -> Vec { let inv = Inventory::Block(hash); let msg = NetworkMessage::GetData(vec![inv]); self.serialize(msg) } - pub(crate) fn ping(&mut self, nonce: u64) -> Result, PeerError> { + pub(in crate::network) fn ping(&mut self, nonce: u64) -> Vec { let msg = NetworkMessage::Ping(nonce); self.serialize(msg) } - pub(crate) fn pong(&mut self, nonce: u64) -> Result, PeerError> { + pub(in crate::network) fn pong(&mut self, nonce: u64) -> Vec { let msg = NetworkMessage::Pong(nonce); self.serialize(msg) } - pub(crate) fn announce_transactions( - &mut self, - wtxids: Vec, - ) -> Result, PeerError> { + pub(in crate::network) fn announce_transactions(&mut self, wtxids: Vec) -> Vec { let msg = NetworkMessage::Inv(wtxids.into_iter().map(Inventory::WTx).collect()); self.serialize(msg) } - pub(crate) fn broadcast_transaction( + pub(in crate::network) fn broadcast_transaction( &mut self, transaction: Transaction, - ) -> Result, PeerError> { + ) -> Vec { let msg = NetworkMessage::Tx(transaction); self.serialize(msg) } } -fn serialize_network_message(message: NetworkMessage) -> Result, PeerError> { - bip324::serde::serialize(message).map_err(From::from) +fn serialize_network_message(message: NetworkMessage) -> Vec { + bip324::serde::serialize(message).expect("in memory serialization cannot fail.") } -fn encrypt_plaintext( - encryptor: &mut PacketWriter, - plaintext: Vec, -) -> Result, PeerError> { +fn encrypt_plaintext(encryptor: &mut PacketWriter, plaintext: Vec) -> Vec { encryptor .encrypt_packet(&plaintext, None, PacketType::Genuine) - .map_err(From::from) + .expect("encryption to in memory buffer cannot fail.") } -fn make_version(port: Option, network: &Network) -> VersionMessage { +pub(in crate::network) fn make_version(port: Option, network: &Network) -> VersionMessage { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time went backwards") diff --git a/src/network/peer.rs b/src/network/peer.rs index cb41bdcf..d1694003 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -3,7 +3,10 @@ use std::{sync::Arc, time::Duration}; use addrman::Record; use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role}; -use bitcoin::{p2p::ServiceFlags, Network}; +use bitcoin::{ + p2p::{message::NetworkMessage, ServiceFlags}, + Network, +}; use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}, net::TcpStream, @@ -19,8 +22,8 @@ use crate::{broadcaster::BroadcastQueue, messages::Warning, Dialog, Info}; use super::{ error::PeerError, - outbound_messages::{MessageGenerator, Transport}, - parsers::MessageParser, + inbound::MessageParser, + outbound::{MessageGenerator, Transport}, reader::{Reader, ReaderMessage}, AddressBook, MainThreadMessage, MessageState, PeerId, PeerMessage, PeerThreadMessage, PeerTimeoutConfig, TimeSensitiveId, @@ -102,7 +105,7 @@ impl Peer { (outbound_messages, reader) }; - let message = outbound_messages.version_message(None)?; + let message = outbound_messages.version_message(None); self.write_bytes(&mut writer, message).await?; self.message_state.start_version_handshake(); let read_handle = tokio::spawn(async move { peer_reader.read_from_remote().await }); @@ -113,7 +116,7 @@ impl Peer { return Ok(()); } if let Some(nonce) = self.message_state.ping_state.send_ping() { - let msg = outbound_messages.ping(nonce)?; + let msg = outbound_messages.ping(nonce); self.write_bytes(&mut writer, msg).await?; let msg_id = TimeSensitiveId::PING; self.message_state @@ -261,7 +264,7 @@ impl Peer { for wtxid in requests { let transaction = tx_queue.fetch_tx(wtxid); if let Some(transaction) = transaction { - let msg = message_generator.broadcast_transaction(transaction)?; + let msg = message_generator.broadcast_transaction(transaction); self.write_bytes(writer, msg).await?; self.message_state.sent_tx(wtxid); tx_queue.successful(wtxid); @@ -285,7 +288,7 @@ impl Peer { Ok(()) } ReaderMessage::Ping(nonce) => { - let message = message_generator.pong(nonce)?; + let message = message_generator.pong(nonce); self.write_bytes(writer, message).await?; Ok(()) } @@ -333,38 +336,38 @@ impl Peer { } match request { MainThreadMessage::GetAddr => { - let message = message_generator.addr()?; + let message = message_generator.serialize(NetworkMessage::GetAddr); self.write_bytes(writer, message).await?; } - MainThreadMessage::GetAddrV2 => { - let message = message_generator.addrv2()?; + MainThreadMessage::SendAddrV2 => { + let message = message_generator.serialize(NetworkMessage::SendAddrV2); self.write_bytes(writer, message).await?; } MainThreadMessage::WtxidRelay => { - let message = message_generator.wtxid_relay()?; + let message = message_generator.serialize(NetworkMessage::WtxidRelay); self.write_bytes(writer, message).await?; } MainThreadMessage::SendHeaders => { - let message = message_generator.sendheaders()?; + let message = message_generator.serialize(NetworkMessage::SendHeaders); self.write_bytes(writer, message).await?; } MainThreadMessage::GetHeaders(config) => { - let message = message_generator.headers(config)?; + let message = message_generator.headers(config); self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilterHeaders(config) => { - let message = message_generator.cf_headers(config)?; + let message = message_generator.cf_headers(config); self.write_bytes(writer, message).await?; } MainThreadMessage::GetFilters(config) => { self.message_state .filter_rate .batch_requested(config.stop_hash); - let message = message_generator.filters(config)?; + let message = message_generator.filters(config); self.write_bytes(writer, message).await?; } MainThreadMessage::GetBlock(message) => { - let message = message_generator.block(message)?; + let message = message_generator.block(message); self.write_bytes(writer, message).await?; } MainThreadMessage::BroadcastPending => { @@ -377,12 +380,12 @@ impl Peer { queue.pending_wtxid() }; if !wtxids.is_empty() { - let message = message_generator.announce_transactions(wtxids)?; + let message = message_generator.announce_transactions(wtxids); self.write_bytes(writer, message).await?; } } MainThreadMessage::Verack => { - let message = message_generator.verack()?; + let message = message_generator.serialize(NetworkMessage::Verack); self.write_bytes(writer, message).await?; self.message_state.verack.sent_ack(); if self.message_state.verack.both_acks() { @@ -395,7 +398,7 @@ impl Peer { queue.pending_wtxid() }; if !wtxids.is_empty() { - let message = message_generator.announce_transactions(wtxids)?; + let message = message_generator.announce_transactions(wtxids); self.write_bytes(writer, message).await?; } } diff --git a/src/network/reader.rs b/src/network/reader.rs index 853792ba..f59c2f9b 100644 --- a/src/network/reader.rs +++ b/src/network/reader.rs @@ -18,7 +18,7 @@ use tokio::sync::mpsc::Sender; use crate::messages::RejectPayload; use super::error::ReaderError; -use super::parsers::MessageParser; +use super::inbound::MessageParser; use super::TimeSensitiveId; // From Bitcoin Core PR #29575 diff --git a/src/node.rs b/src/node.rs index 59c2cf13..89286e76 100644 --- a/src/node.rs +++ b/src/node.rs @@ -401,7 +401,7 @@ impl Node { self.peer_map.tried(nonce).await; // First we signal for ADDRV2 support self.peer_map - .send_message(nonce, MainThreadMessage::GetAddrV2) + .send_message(nonce, MainThreadMessage::SendAddrV2) .await; // Then for BIP 339 witness transaction broadcast self.peer_map