Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 3 additions & 19 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::oneshot;

use crate::chain::block_subsidy;
use crate::messages::ClientRequest;
use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning};
use crate::{Event, Info, TrustedPeer, Warning};

use super::{error::ClientError, messages::ClientMessage};
use super::{error::FetchBlockError, IndexedBlock};
Expand Down Expand Up @@ -76,25 +76,9 @@ impl Requester {
/// # Errors
///
/// If the node has stopped running.
pub async fn broadcast_tx(&self, tx_broadcast: TxBroadcast) -> Result<Wtxid, ClientError> {
pub async fn broadcast_tx(&self, transaction: Transaction) -> Result<Wtxid, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
let client_request = ClientRequest::new(tx_broadcast, tx);
self.ntx
.send(ClientMessage::Broadcast(client_request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
}

/// Broadcast a new transaction to the network to a random peer, waiting for the peer to
/// request the data.
///
/// # Errors
///
/// If the node has stopped running.
pub async fn broadcast_random(&self, tx: Transaction) -> Result<Wtxid, ClientError> {
let tx_broadcast = TxBroadcast::random_broadcast(tx);
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
let client_request = ClientRequest::new(tx_broadcast, tx);
let client_request = ClientRequest::new(transaction, tx);
self.ntx
.send(ClientMessage::Broadcast(client_request))
.map_err(|_| ClientError::SendError)?;
Expand Down
37 changes: 0 additions & 37 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,43 +177,6 @@ impl std::cmp::Ord for IndexedFilter {
}
}

/// Broadcast a [`Transaction`] to a set of connected peers.
#[derive(Debug, Clone)]
pub struct TxBroadcast {
/// The presumably valid Bitcoin transaction.
pub tx: Transaction,
/// The strategy for how this transaction should be shared with the network.
pub broadcast_policy: TxBroadcastPolicy,
}

impl TxBroadcast {
/// Prepare a transaction for broadcast with associated broadcast strategy.
pub fn new(tx: Transaction, broadcast_policy: TxBroadcastPolicy) -> Self {
Self {
tx,
broadcast_policy,
}
}

/// Prepare a transaction to be broadcasted to a random connection.
pub fn random_broadcast(tx: Transaction) -> Self {
Self {
tx,
broadcast_policy: TxBroadcastPolicy::RandomPeer,
}
}
}

/// The strategy for how this transaction should be shared with the network.
#[derive(Debug, Default, Clone)]
pub enum TxBroadcastPolicy {
/// Broadcast the transaction to all peers at the same time.
AllPeers,
/// Broadcast the transaction to a single random peer, optimal for user privacy.
#[default]
RandomPeer,
}

/// A peer on the Bitcoin P2P network
///
/// # Building peers
Expand Down
8 changes: 5 additions & 3 deletions src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::BTreeMap;
use std::ops::Div;

use bitcoin::{block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Wtxid};
use bitcoin::{
block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, Transaction, Wtxid,
};

use crate::chain::BlockHeaderChanges;
use crate::IndexedFilter;
use crate::{chain::checkpoints::HeaderCheckpoint, IndexedBlock, TrustedPeer, TxBroadcast};
use crate::{chain::checkpoints::HeaderCheckpoint, IndexedBlock, TrustedPeer};

use super::error::FetchBlockError;

Expand Down Expand Up @@ -138,7 +140,7 @@ pub(crate) enum ClientMessage {
/// Stop the node.
Shutdown,
/// Broadcast a [`crate::Transaction`] with a [`crate::TxBroadcastPolicy`].
Broadcast(ClientRequest<TxBroadcast, Wtxid>),
Broadcast(ClientRequest<Transaction, Wtxid>),
/// Starting at the configured anchor checkpoint, re-emit all filters.
Rescan,
/// Explicitly request a block from the node.
Expand Down
32 changes: 9 additions & 23 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bitcoin::{
message_network::VersionMessage,
ServiceFlags,
},
Block, BlockHash, Network, Wtxid,
Block, BlockHash, Network, Transaction, Wtxid,
};
use tokio::{
select,
Expand Down Expand Up @@ -37,7 +37,7 @@ use crate::{
peer_map::PeerMap, LastBlockMonitor, MainThreadMessage, PeerId, PeerMessage,
PeerThreadMessage,
},
Config, IndexedBlock, NodeState, TxBroadcast, TxBroadcastPolicy,
Config, IndexedBlock, NodeState,
};

use super::{
Expand Down Expand Up @@ -288,29 +288,15 @@ impl Node {
}

// Broadcast transactions according to the configured policy
async fn broadcast_transaction(&self, broadcast: ClientRequest<TxBroadcast, Wtxid>) {
async fn broadcast_transaction(&self, broadcast: ClientRequest<Transaction, Wtxid>) {
let mut queue = self.peer_map.tx_queue.lock().await;
let (data, oneshot) = broadcast.into_values();
let policy = data.broadcast_policy;
queue.add_to_queue(data.tx, oneshot);
let (transaction, oneshot) = broadcast.into_values();
queue.add_to_queue(transaction, oneshot);
drop(queue);
match policy {
TxBroadcastPolicy::AllPeers => {
crate::debug!(format!(
"Sending transaction to {} connected peers",
self.peer_map.live()
));
self.peer_map
.broadcast(MainThreadMessage::BroadcastPending)
.await
}
TxBroadcastPolicy::RandomPeer => {
crate::debug!("Sending transaction to a random peer");
self.peer_map
.send_random(MainThreadMessage::BroadcastPending)
.await
}
};
crate::debug!("Sending transaction to a random peer");
self.peer_map
.send_random(MainThreadMessage::BroadcastPending)
.await;
}

// Try to continue with the syncing process
Expand Down
2 changes: 1 addition & 1 deletion tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ async fn tx_can_broadcast() {
warn_rx: _,
event_rx: _,
} = client;
tokio::time::timeout(Duration::from_secs(60), requester.broadcast_random(tx))
tokio::time::timeout(Duration::from_secs(60), requester.broadcast_tx(tx))
.await
.unwrap()
.unwrap();
Expand Down