Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/network/parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ use bip324::{PacketReader, PacketType};
use bitcoin::consensus::{deserialize, deserialize_partial};
use bitcoin::p2p::message::RawNetworkMessage;
use bitcoin::Network;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncBufReadExt, AsyncReadExt};

use super::error::ReaderError;
use super::V1Header;

const MAX_MESSAGE_BYTES: u32 = 1024 * 1024 * 32;

pub(crate) enum MessageParser<R: AsyncReadExt + Send + Sync + Unpin> {
pub(crate) enum MessageParser<R: AsyncBufReadExt + Send + Sync + Unpin> {
V2(R, PacketReader),
V1(R, Network),
}

impl<R: AsyncReadExt + Send + Sync + Unpin> MessageParser<R> {
impl<R: AsyncBufReadExt + Send + Sync + Unpin> MessageParser<R> {
pub async fn read_message(&mut self) -> Result<Option<NetworkMessage>, ReaderError> {
match self {
MessageParser::V2(stream, decryptor) => {
Expand Down
5 changes: 3 additions & 2 deletions src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use addrman::Record;
use bip324::{AsyncProtocol, PacketReader, PacketWriter, Role};
use bitcoin::{p2p::ServiceFlags, Network};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
net::TcpStream,
select,
sync::{
Expand Down Expand Up @@ -79,7 +79,8 @@ impl Peer {
pub async fn run(&mut self, connection: TcpStream) -> Result<(), PeerError> {
let start_time = Instant::now();
let (tx, mut rx) = mpsc::channel(32);
let (mut reader, mut writer) = connection.into_split();
let (reader, mut writer) = connection.into_split();
let mut reader = BufReader::new(reader);
// If a peer signals for V2 we will use it, otherwise just use plaintext.
let (mut outbound_messages, mut peer_reader) =
if self.source.service_flags().has(ServiceFlags::P2P_V2) {
Expand Down
6 changes: 3 additions & 3 deletions src/network/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::IpAddr;
use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::{message::NetworkMessage, message_blockdata::Inventory, ServiceFlags};
use bitcoin::{FeeRate, Wtxid};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::Sender;

use crate::channel_messages::{CombinedAddr, ReaderMessage};
Expand All @@ -17,12 +17,12 @@ const MAX_ADDR: usize = 1_000;
const MAX_INV: usize = 50_000;
const MAX_HEADERS: usize = 2_000;

pub(crate) struct Reader<R: AsyncReadExt + Send + Sync + Unpin> {
pub(crate) struct Reader<R: AsyncBufReadExt + Send + Sync + Unpin> {
parser: MessageParser<R>,
tx: Sender<ReaderMessage>,
}

impl<R: AsyncReadExt + Send + Sync + Unpin> Reader<R> {
impl<R: AsyncBufReadExt + Send + Sync + Unpin> Reader<R> {
pub fn new(parser: MessageParser<R>, tx: Sender<ReaderMessage>) -> Self {
Self { parser, tx }
}
Expand Down