Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/floresta-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ keywords = ["bitcoin", "utreexo", "p2p", "networking"]
categories = ["cryptography::cryptocurrencies", "network-programming"]

[dependencies]
bip324 = { version = "=0.7.0", features = [ "tokio" ] }
bip324 = { version = "0.10", features = [ "tokio" ] }
bitcoin = { workspace = true }
dns-lookup = { workspace = true }
rand = { workspace = true }
Expand Down
119 changes: 48 additions & 71 deletions crates/floresta-wire/src/p2p_wire/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use core::fmt::Display;
use core::fmt::Formatter;
use std::io;

use bip324::futures::Protocol;
use bip324::futures::ProtocolReader;
use bip324::futures::ProtocolWriter;
use bip324::io::Payload;
use bip324::io::ProtocolError;
use bip324::io::ProtocolFailureSuggestion;
use bip324::serde::deserialize as deserialize_v2;
use bip324::serde::serialize as serialize_v2;
use bip324::serde::CommandString;
use bip324::AsyncProtocol;
use bip324::AsyncProtocolReader;
use bip324::AsyncProtocolWriter;
use bip324::ProtocolError;
use bip324::ProtocolFailureSuggestion;
use bip324::Role;
use bitcoin::consensus::deserialize;
use bitcoin::consensus::deserialize_partial;
Expand Down Expand Up @@ -43,7 +44,7 @@ use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio::net::ToSocketAddrs;
use tracing::debug;
use tracing::info;
use tracing::error;

use super::socks::Socks5Addr;
use super::socks::Socks5Error;
Expand Down Expand Up @@ -150,13 +151,13 @@ impl_error_from!(TransportError, encode::Error, SerdeV1);
impl_error_from!(TransportError, Socks5Error, Proxy);

pub enum ReadTransport<R: AsyncRead + Unpin + Send> {
V2(R, AsyncProtocolReader),
V1(R, Network),
V2(ProtocolReader<R>),
}

pub enum WriteTransport<W: AsyncWrite + Unpin + Send + Sync> {
V2(W, AsyncProtocolWriter),
V1(W, Network),
V2(ProtocolWriter<W>),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -260,39 +261,30 @@ async fn try_connection<A: ToSocketAddrs>(
Ok(addr) => addr.to_string(),
Err(_) => String::from("unknown peer"),
};
let (reader, mut writer) = tokio::io::split(tcp_stream);
let mut reader = BufReader::new(reader);
let (reader, writer) = tokio::io::split(tcp_stream);
let reader = BufReader::new(reader);

match force_v1 {
true => {
debug!("Using V1 protocol for connection to {peer_addr}");
debug!("Established a P2PV1 connection with peer={peer_addr}");
Ok((
ReadTransport::V1(reader, network),
WriteTransport::V1(writer, network),
TransportProtocol::V1,
))
}
false => match AsyncProtocol::new(
network,
Role::Initiator,
None,
None,
&mut reader,
&mut writer,
)
.await
{
false => match Protocol::new(network, Role::Initiator, None, None, reader, writer).await {
Ok(protocol) => {
debug!("Successfully established V2 protocol connection to {peer_addr}",);
debug!("Established a P2PV2 connection with peer={peer_addr}");
let (reader_protocol, writer_protocol) = protocol.into_split();
Ok((
ReadTransport::V2(reader, reader_protocol),
WriteTransport::V2(writer, writer_protocol),
ReadTransport::V2(reader_protocol),
WriteTransport::V2(writer_protocol),
TransportProtocol::V2,
))
}
Err(e) => {
debug!("Failed to establish V2 protocol connection to {peer_addr}: {e:?}",);
debug!("Failed to establish a P2PV2 connection with peer={peer_addr}: {e:?}");
Err(TransportError::Protocol(e))
}
},
Expand Down Expand Up @@ -321,7 +313,7 @@ async fn try_connection<A: ToSocketAddrs>(
///
/// Returns a `TransportError` if the proxy connection cannot be established, the connection
/// to the target fails, or protocol negotiation fails.
pub async fn connect_proxy<A: ToSocketAddrs>(
pub async fn connect_proxy<A: ToSocketAddrs + Clone + Debug>(
proxy_addr: A,
address: LocalAddress,
network: Network,
Expand Down Expand Up @@ -350,68 +342,53 @@ pub async fn connect_proxy<A: ToSocketAddrs>(
}
}

async fn try_proxy_connection<A: ToSocketAddrs>(
async fn try_proxy_connection<A: ToSocketAddrs + Clone + Debug>(
proxy_addr: A,
target_addr: &Socks5Addr,
port: u16,
network: Network,
force_v1: bool,
) -> TransportResult {
let proxy = TcpStream::connect(proxy_addr).await?;
let proxy = TcpStream::connect(proxy_addr.clone()).await?;
let stream = Socks5StreamBuilder::connect(proxy, target_addr, port).await?;
let (reader, mut writer) = tokio::io::split(stream);
let mut reader = BufReader::new(reader);
let (reader, writer) = tokio::io::split(stream);
let reader = BufReader::new(reader);
match force_v1 {
true => {
info!("Using V1 protocol for proxy connection to {target_addr:?}",);
debug!("Established a P2PV1 connection over SOCKS5 using proxy={proxy_addr:?} with peer={target_addr:?}");
Ok((
ReadTransport::V1(reader, network),
WriteTransport::V1(writer, network),
TransportProtocol::V1,
))
}
false => {
match AsyncProtocol::new(
network,
Role::Initiator,
None,
None,
&mut reader,
&mut writer,
)
.await
{
Ok(protocol) => {
info!(
"Successfully established V2 protocol proxy connection to {target_addr:?}",
);
let (reader_protocol, writer_protocol) = protocol.into_split();
Ok((
ReadTransport::V2(reader, reader_protocol),
WriteTransport::V2(writer, writer_protocol),
TransportProtocol::V2,
))
}
Err(e) => {
debug!(
"Failed to establish V2 protocol proxy connection to {target_addr:?}: {e:?}",
);
Err(TransportError::Protocol(e))
}
false => match Protocol::new(network, Role::Initiator, None, None, reader, writer).await {
Ok(protocol) => {
debug!("Established a P2PV2 connection over SOCKS5 using proxy={proxy_addr:?} with peer={target_addr:?}");
let (reader_protocol, writer_protocol) = protocol.into_split();
Ok((
ReadTransport::V2(reader_protocol),
WriteTransport::V2(writer_protocol),
TransportProtocol::V2,
))
}
}
Err(e) => {
error!("Failed to establish a P2PV2 connection over SOCKS5 using proxy={proxy_addr:?} with peer={target_addr:?}: {e:?}");
Err(TransportError::Protocol(e))
}
},
}
}

impl<R> ReadTransport<R>
where
R: AsyncRead + Unpin + Send,
{
/// Read the next [`NetworkMessage`] from the transport's [`AsyncProtocolReader`] buffer.
/// Read the next [`NetworkMessage`] from the transport's [`ProtocolReader`] buffer.
pub async fn read_message(&mut self) -> Result<NetworkMessage, TransportError> {
match self {
ReadTransport::V2(reader, protocol) => {
let payload = protocol.read_and_decrypt(reader).await?;
ReadTransport::V2(protocol) => {
let payload = protocol.read().await?;
let contents = payload.contents();

// TODO: remove this once https://github.com/rust-bitcoin/rust-bitcoin/pull/5671
Expand Down Expand Up @@ -471,10 +448,10 @@ impl<W> WriteTransport<W>
where
W: AsyncWrite + Unpin + Send + Sync,
{
/// Write a [`NetworkMessage`] to the transport's [`AsyncProtocolWriter`] buffer.
/// Write a [`NetworkMessage`] to the transport's [`ProtocolWriter`] buffer.
pub async fn write_message(&mut self, message: NetworkMessage) -> Result<(), TransportError> {
match self {
WriteTransport::V2(writer, protocol) => {
WriteTransport::V2(protocol) => {
// TODO: remove this once https://github.com/rust-bitcoin/rust-bitcoin/pull/5671 and
// https://github.com/rust-bitcoin/rust-bitcoin/pull/5009 make it into a release
if let NetworkMessage::Unknown { command, payload } = message {
Expand All @@ -491,13 +468,13 @@ where
let mut data = vec![];
data.push(P2PV2_GETUPROOF_MSG_TYPE);
data.extend(payload);
protocol.encrypt_and_write(&data, writer).await?;
protocol.write(&Payload::genuine(data)).await?;

return Ok(());
}

let data = serialize_v2(message)?;
protocol.encrypt_and_write(&data, writer).await?;
let data = serialize_v2(message);
protocol.write(&Payload::genuine(data)).await?;
}
WriteTransport::V1(writer, network) => {
if let NetworkMessage::Unknown { payload, command } = message {
Expand Down Expand Up @@ -536,9 +513,9 @@ where
/// Shutdown the transport.
pub async fn shutdown(&mut self) -> Result<(), TransportError> {
match self {
WriteTransport::V2(writer, _) => {
writer.shutdown().await?;
}
// The V2 transport does not require an explicit `writer.shutdown()` call,
// since the buffer is already flushed internally on each `write()` call.
WriteTransport::V2(_) => {}
WriteTransport::V1(writer, _) => {
writer.shutdown().await?;
}
Expand Down
Loading