Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ impl Builder {
self
}

/// Set a timeout for transaction broadcasting. When set, [`Requester::broadcast_tx`] will
/// return an error if no peer requests the transaction within the given duration.
///
/// If none is provided, `broadcast_tx` will wait indefinitely for a peer to request the transaction.
pub fn broadcast_timeout(mut self, timeout: impl Into<Duration>) -> Self {
self.config.broadcast_timeout = Some(timeout.into());
self
}

/// Consume the node builder and receive a [`Node`] and [`Client`].
pub fn build(mut self) -> (Node, Client) {
Node::new(self.network, core::mem::take(&mut self.config))
Expand Down
25 changes: 20 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::ServiceFlags;
use bitcoin::{Amount, Transaction, Wtxid};
Expand Down Expand Up @@ -32,9 +34,10 @@ impl Client {
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: UnboundedSender<ClientMessage>,
broadcast_timeout: Option<Duration>,
) -> Self {
Self {
requester: Requester::new(ntx),
requester: Requester::new(ntx, broadcast_timeout),
info_rx,
warn_rx,
event_rx,
Expand All @@ -46,11 +49,15 @@ impl Client {
#[derive(Debug, Clone)]
pub struct Requester {
ntx: UnboundedSender<ClientMessage>,
broadcast_timeout: Option<Duration>,
}

impl Requester {
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
Self { ntx }
fn new(ntx: UnboundedSender<ClientMessage>, broadcast_timeout: Option<Duration>) -> Self {
Copy link
Copy Markdown
Collaborator

@rustaceanrob rustaceanrob Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of having this associated with the Requester. If a user would like to manage their own timeout, they should use broadcast_tx. Otherwise they should use a new method on the requester called broadcast_tx_with_timeout

Self {
ntx,
broadcast_timeout,
}
}

/// Tell the node to shut down.
Expand All @@ -77,14 +84,22 @@ impl Requester {
///
/// # Errors
///
/// If the node has stopped running.
/// If the node has stopped running or if the configured broadcast timeout expires before
/// any peer requests the transaction. A timeout does not necessarily mean the broadcast
/// failed — the peer may already have the transaction and simply not sent a `getdata` request.
pub async fn broadcast_tx(&self, transaction: Transaction) -> Result<Wtxid, ClientError> {
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
let client_request = ClientRequest::new(transaction, tx);
self.ntx
.send(ClientMessage::Broadcast(client_request))
.map_err(|_| ClientError::SendError)?;
rx.await.map_err(|_| ClientError::RecvError)
match self.broadcast_timeout {
Some(timeout) => tokio::time::timeout(timeout, rx)
.await
.map_err(|_| ClientError::BroadcastTimeout)?
.map_err(|_| ClientError::RecvError),
None => rx.await.map_err(|_| ClientError::RecvError),
}
}

/// A connection has a minimum transaction fee requirement to enter its mempool. For proper transaction propagation,
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum ClientError {
SendError,
/// A channel was dropped before sending its value back.
RecvError,
/// No peer requested the transaction within the configured broadcast timeout.
BroadcastTimeout,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make a new error variant for this particular method (see FetchBlockError)

}

impl core::fmt::Display for ClientError {
Expand All @@ -39,6 +41,12 @@ impl core::fmt::Display for ClientError {
ClientError::RecvError => {
write!(f, "the sender of data was dropped from memory.")
}
ClientError::BroadcastTimeout => {
write!(
f,
"no peer requested the transaction within the configured timeout."
)
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use chain::Filter;

use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::time::Duration;

// Re-exports
#[doc(inline)]
Expand Down Expand Up @@ -343,6 +344,7 @@ struct Config {
peer_timeout_config: PeerTimeoutConfig,
filter_type: FilterType,
block_type: BlockType,
broadcast_timeout: Option<Duration>,
}

impl Default for Config {
Expand All @@ -356,6 +358,7 @@ impl Default for Config {
peer_timeout_config: PeerTimeoutConfig::default(),
filter_type: FilterType::default(),
block_type: BlockType::default(),
broadcast_timeout: None,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ impl Node {
peer_timeout_config,
filter_type,
block_type,
broadcast_timeout,
} = config;
// Set up a communication channel between the node and client
let (info_tx, info_rx) = mpsc::channel::<Info>(32);
let (warn_tx, warn_rx) = mpsc::unbounded_channel::<Warning>();
let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
let (ctx, crx) = mpsc::unbounded_channel::<ClientMessage>();
let client = Client::new(info_rx, warn_rx, event_rx, ctx);
let client = Client::new(info_rx, warn_rx, event_rx, ctx, broadcast_timeout);
// A structured way to talk to the client
let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx));
// We always assume we are behind
Expand Down
92 changes: 92 additions & 0 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,98 @@ async fn tx_can_broadcast() {
.unwrap();
}

// Verify that broadcasting a transaction that the peer already has.
#[tokio::test]
async fn tx_broadcast_does_not_hang_when_peer_has_tx() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just use the existing transaction broadcasting test instead of creating a new one?

let amount_to_us = Amount::from_sat(100_000);
let amount_to_op_return = Amount::from_sat(50_000);
let (bitcoind, socket_addr) = start_bitcoind(true).unwrap();
let rpc = &bitcoind.client;
let tempdir = tempfile::TempDir::new().unwrap().path().to_owned();
let mut rng = StdRng::seed_from_u64(20002);
let secret = SecretKey::new(&mut rng);
let secp = Secp256k1::new();
let keypair = Keypair::from_secret_key(&secp, &secret);
let (internal_key, _) = keypair.x_only_public_key();
let send_to_this_address = Address::p2tr(&secp, internal_key, None, KnownHrp::Regtest);
let miner = rpc.new_address().unwrap();
mine_blocks(rpc, &miner, 110, 10).await;
let tx_info = rpc
.send_to_address(&send_to_this_address, amount_to_us)
.unwrap();
let txid = tx_info.txid().unwrap();
let tx_details = rpc.get_transaction(txid).unwrap().details;
let (vout, amt) = tx_details
.iter()
.find(|detail| detail.address.eq(&send_to_this_address.to_string()))
.map(|detail| (detail.vout, detail.amount))
.unwrap();
let txout = TxOut {
script_pubkey: miner.script_pubkey(),
value: amount_to_op_return,
};
let outpoint = OutPoint { txid, vout };
let txin = TxIn {
previous_output: outpoint,
script_sig: ScriptBuf::default(),
sequence: Sequence::ENABLE_RBF_NO_LOCKTIME,
witness: Witness::default(),
};
let mut unsigned_tx = Transaction {
version: bitcoin::transaction::Version::TWO,
lock_time: absolute::LockTime::ZERO,
input: vec![txin],
output: vec![txout],
};
let input_index = 0;
let sighash_type = TapSighashType::Default;
let prevout = TxOut {
script_pubkey: send_to_this_address.script_pubkey(),
value: Amount::from_btc(amt.abs()).unwrap(),
};
let prevouts = vec![prevout];
let prevouts = Prevouts::All(&prevouts);
let mut sighasher = SighashCache::new(&mut unsigned_tx);
let sighash = sighasher
.taproot_key_spend_signature_hash(input_index, &prevouts, sighash_type)
.unwrap();
let tweaked: bitcoin::key::TweakedKeypair = keypair.tap_tweak(&secp, None);
let msg = bitcoin::secp256k1::Message::from(sighash);
let signature = secp.sign_schnorr(&msg, &tweaked.to_keypair());
let signature = bitcoin::taproot::Signature {
signature,
sighash_type,
};
*sighasher.witness_mut(input_index).unwrap() = Witness::p2tr_key_spend(&signature);
let tx = sighasher.into_transaction().to_owned();
// Submit the tx directly to bitcoind via RPC — now the peer already has it.
rpc.send_raw_transaction(&tx).unwrap();
println!("Submitted tx {} to bitcoind via RPC", tx.compute_txid());
// Now broadcast the same tx via kyoto with a zero timeout.
let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port()));
let (node, client) = bip157::Builder::new(bitcoin::Network::Regtest)
.add_peer(host)
.data_dir(tempdir)
.chain_state(ChainState::Checkpoint(HeaderCheckpoint::from_genesis(
bitcoin::Network::Regtest,
)))
.broadcast_timeout(Duration::ZERO)
.build();
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
info_rx: _,
warn_rx: _,
event_rx: _,
} = client;
let result = requester.broadcast_tx(tx).await;
assert!(
matches!(result, Err(bip157::ClientError::BroadcastTimeout)),
"expected BroadcastTimeout, got: {:?}",
result,
);
}

#[tokio::test]
async fn dns_works() {
let hostname = bip157::lookup_host("seed.bitcoin.sipa.be").await;
Expand Down
Loading