diff --git a/Cargo.lock b/Cargo.lock index 9b4f1ff1b..831139bc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -262,9 +262,9 @@ dependencies = [ [[package]] name = "bip324" -version = "0.7.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53157fcb2d6ec2851c7602d0690536d0b79209e393972cb2b36bd5d72dbd1879" +checksum = "10e93f07cf774278cf032e8f1d97343a08d079017a33bb20097df380fd55d26c" dependencies = [ "bitcoin", "bitcoin_hashes 0.15.0", diff --git a/crates/floresta-wire/Cargo.toml b/crates/floresta-wire/Cargo.toml index 18fb825d9..2fdf35454 100644 --- a/crates/floresta-wire/Cargo.toml +++ b/crates/floresta-wire/Cargo.toml @@ -15,7 +15,7 @@ keywords = ["bitcoin", "utreexo", "p2p", "networking"] categories = ["cryptography::cryptocurrencies", "network-programming"] [dependencies] -bip324 = { version = "=0.7.0", features = [ "tokio" ] } +bip324 = { version = "0.10", features = [ "tokio" ] } bitcoin = { workspace = true } dns-lookup = { workspace = true } rand = { workspace = true } diff --git a/crates/floresta-wire/src/p2p_wire/transport.rs b/crates/floresta-wire/src/p2p_wire/transport.rs index b9c9eee87..226e9520a 100644 --- a/crates/floresta-wire/src/p2p_wire/transport.rs +++ b/crates/floresta-wire/src/p2p_wire/transport.rs @@ -6,14 +6,15 @@ use core::fmt::Display; use core::fmt::Formatter; use std::io; +use bip324::futures::Protocol; +use bip324::futures::ProtocolReader; +use bip324::futures::ProtocolWriter; +use bip324::io::Payload; +use bip324::io::ProtocolError; +use bip324::io::ProtocolFailureSuggestion; use bip324::serde::deserialize as deserialize_v2; use bip324::serde::serialize as serialize_v2; use bip324::serde::CommandString; -use bip324::AsyncProtocol; -use bip324::AsyncProtocolReader; -use bip324::AsyncProtocolWriter; -use bip324::ProtocolError; -use bip324::ProtocolFailureSuggestion; use bip324::Role; use bitcoin::consensus::deserialize; use bitcoin::consensus::deserialize_partial; @@ -43,7 +44,7 @@ use tokio::io::WriteHalf; use tokio::net::TcpStream; use tokio::net::ToSocketAddrs; use tracing::debug; -use tracing::info; +use tracing::error; use super::socks::Socks5Addr; use super::socks::Socks5Error; @@ -150,13 +151,13 @@ impl_error_from!(TransportError, encode::Error, SerdeV1); impl_error_from!(TransportError, Socks5Error, Proxy); pub enum ReadTransport { - V2(R, AsyncProtocolReader), V1(R, Network), + V2(ProtocolReader), } pub enum WriteTransport { - V2(W, AsyncProtocolWriter), V1(W, Network), + V2(ProtocolWriter), } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -260,39 +261,30 @@ async fn try_connection( Ok(addr) => addr.to_string(), Err(_) => String::from("unknown peer"), }; - let (reader, mut writer) = tokio::io::split(tcp_stream); - let mut reader = BufReader::new(reader); + let (reader, writer) = tokio::io::split(tcp_stream); + let reader = BufReader::new(reader); match force_v1 { true => { - debug!("Using V1 protocol for connection to {peer_addr}"); + debug!("Established a P2PV1 connection with peer={peer_addr}"); Ok(( ReadTransport::V1(reader, network), WriteTransport::V1(writer, network), TransportProtocol::V1, )) } - false => match AsyncProtocol::new( - network, - Role::Initiator, - None, - None, - &mut reader, - &mut writer, - ) - .await - { + false => match Protocol::new(network, Role::Initiator, None, None, reader, writer).await { Ok(protocol) => { - debug!("Successfully established V2 protocol connection to {peer_addr}",); + debug!("Established a P2PV2 connection with peer={peer_addr}"); let (reader_protocol, writer_protocol) = protocol.into_split(); Ok(( - ReadTransport::V2(reader, reader_protocol), - WriteTransport::V2(writer, writer_protocol), + ReadTransport::V2(reader_protocol), + WriteTransport::V2(writer_protocol), TransportProtocol::V2, )) } Err(e) => { - debug!("Failed to establish V2 protocol connection to {peer_addr}: {e:?}",); + debug!("Failed to establish a P2PV2 connection with peer={peer_addr}: {e:?}"); Err(TransportError::Protocol(e)) } }, @@ -321,7 +313,7 @@ async fn try_connection( /// /// Returns a `TransportError` if the proxy connection cannot be established, the connection /// to the target fails, or protocol negotiation fails. -pub async fn connect_proxy( +pub async fn connect_proxy( proxy_addr: A, address: LocalAddress, network: Network, @@ -350,56 +342,41 @@ pub async fn connect_proxy( } } -async fn try_proxy_connection( +async fn try_proxy_connection( proxy_addr: A, target_addr: &Socks5Addr, port: u16, network: Network, force_v1: bool, ) -> TransportResult { - let proxy = TcpStream::connect(proxy_addr).await?; + let proxy = TcpStream::connect(proxy_addr.clone()).await?; let stream = Socks5StreamBuilder::connect(proxy, target_addr, port).await?; - let (reader, mut writer) = tokio::io::split(stream); - let mut reader = BufReader::new(reader); + let (reader, writer) = tokio::io::split(stream); + let reader = BufReader::new(reader); match force_v1 { true => { - info!("Using V1 protocol for proxy connection to {target_addr:?}",); + debug!("Established a P2PV1 connection over SOCKS5 using proxy={proxy_addr:?} with peer={target_addr:?}"); Ok(( ReadTransport::V1(reader, network), WriteTransport::V1(writer, network), TransportProtocol::V1, )) } - false => { - match AsyncProtocol::new( - network, - Role::Initiator, - None, - None, - &mut reader, - &mut writer, - ) - .await - { - Ok(protocol) => { - info!( - "Successfully established V2 protocol proxy connection to {target_addr:?}", - ); - let (reader_protocol, writer_protocol) = protocol.into_split(); - Ok(( - ReadTransport::V2(reader, reader_protocol), - WriteTransport::V2(writer, writer_protocol), - TransportProtocol::V2, - )) - } - Err(e) => { - debug!( - "Failed to establish V2 protocol proxy connection to {target_addr:?}: {e:?}", - ); - Err(TransportError::Protocol(e)) - } + false => match Protocol::new(network, Role::Initiator, None, None, reader, writer).await { + Ok(protocol) => { + debug!("Established a P2PV2 connection over SOCKS5 using proxy={proxy_addr:?} with peer={target_addr:?}"); + let (reader_protocol, writer_protocol) = protocol.into_split(); + Ok(( + ReadTransport::V2(reader_protocol), + WriteTransport::V2(writer_protocol), + TransportProtocol::V2, + )) } - } + Err(e) => { + error!("Failed to establish a P2PV2 connection over SOCKS5 using proxy={proxy_addr:?} with peer={target_addr:?}: {e:?}"); + Err(TransportError::Protocol(e)) + } + }, } } @@ -407,11 +384,11 @@ impl ReadTransport where R: AsyncRead + Unpin + Send, { - /// Read the next [`NetworkMessage`] from the transport's [`AsyncProtocolReader`] buffer. + /// Read the next [`NetworkMessage`] from the transport's [`ProtocolReader`] buffer. pub async fn read_message(&mut self) -> Result { match self { - ReadTransport::V2(reader, protocol) => { - let payload = protocol.read_and_decrypt(reader).await?; + ReadTransport::V2(protocol) => { + let payload = protocol.read().await?; let contents = payload.contents(); // TODO: remove this once https://github.com/rust-bitcoin/rust-bitcoin/pull/5671 @@ -471,10 +448,10 @@ impl WriteTransport where W: AsyncWrite + Unpin + Send + Sync, { - /// Write a [`NetworkMessage`] to the transport's [`AsyncProtocolWriter`] buffer. + /// Write a [`NetworkMessage`] to the transport's [`ProtocolWriter`] buffer. pub async fn write_message(&mut self, message: NetworkMessage) -> Result<(), TransportError> { match self { - WriteTransport::V2(writer, protocol) => { + WriteTransport::V2(protocol) => { // TODO: remove this once https://github.com/rust-bitcoin/rust-bitcoin/pull/5671 and // https://github.com/rust-bitcoin/rust-bitcoin/pull/5009 make it into a release if let NetworkMessage::Unknown { command, payload } = message { @@ -491,13 +468,13 @@ where let mut data = vec![]; data.push(P2PV2_GETUPROOF_MSG_TYPE); data.extend(payload); - protocol.encrypt_and_write(&data, writer).await?; + protocol.write(&Payload::genuine(data)).await?; return Ok(()); } - let data = serialize_v2(message)?; - protocol.encrypt_and_write(&data, writer).await?; + let data = serialize_v2(message); + protocol.write(&Payload::genuine(data)).await?; } WriteTransport::V1(writer, network) => { if let NetworkMessage::Unknown { payload, command } = message { @@ -536,9 +513,9 @@ where /// Shutdown the transport. pub async fn shutdown(&mut self) -> Result<(), TransportError> { match self { - WriteTransport::V2(writer, _) => { - writer.shutdown().await?; - } + // The V2 transport does not require an explicit `writer.shutdown()` call, + // since the buffer is already flushed internally on each `write()` call. + WriteTransport::V2(_) => {} WriteTransport::V1(writer, _) => { writer.shutdown().await?; }