diff --git a/src/builder.rs b/src/builder.rs index 8040b15c..83c4df6d 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -143,6 +143,15 @@ impl Builder { self } + /// Set a timeout for transaction broadcasting. When set, [`Requester::broadcast_tx`] will + /// return an error if no peer requests the transaction within the given duration. + /// + /// If none is provided, `broadcast_tx` will wait indefinitely for a peer to request the transaction. + pub fn broadcast_timeout(mut self, timeout: impl Into) -> Self { + self.config.broadcast_timeout = Some(timeout.into()); + self + } + /// Consume the node builder and receive a [`Node`] and [`Client`]. pub fn build(mut self) -> (Node, Client) { Node::new(self.network, core::mem::take(&mut self.config)) diff --git a/src/client.rs b/src/client.rs index 4bf37165..ea60d284 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use bitcoin::p2p::address::AddrV2; use bitcoin::p2p::ServiceFlags; use bitcoin::{Amount, Transaction, Wtxid}; @@ -32,9 +34,10 @@ impl Client { warn_rx: mpsc::UnboundedReceiver, event_rx: mpsc::UnboundedReceiver, ntx: UnboundedSender, + broadcast_timeout: Option, ) -> Self { Self { - requester: Requester::new(ntx), + requester: Requester::new(ntx, broadcast_timeout), info_rx, warn_rx, event_rx, @@ -46,11 +49,15 @@ impl Client { #[derive(Debug, Clone)] pub struct Requester { ntx: UnboundedSender, + broadcast_timeout: Option, } impl Requester { - fn new(ntx: UnboundedSender) -> Self { - Self { ntx } + fn new(ntx: UnboundedSender, broadcast_timeout: Option) -> Self { + Self { + ntx, + broadcast_timeout, + } } /// Tell the node to shut down. @@ -77,14 +84,22 @@ impl Requester { /// /// # Errors /// - /// If the node has stopped running. + /// If the node has stopped running or if the configured broadcast timeout expires before + /// any peer requests the transaction. A timeout does not necessarily mean the broadcast + /// failed — the peer may already have the transaction and simply not sent a `getdata` request. pub async fn broadcast_tx(&self, transaction: Transaction) -> Result { let (tx, rx) = tokio::sync::oneshot::channel::(); let client_request = ClientRequest::new(transaction, tx); self.ntx .send(ClientMessage::Broadcast(client_request)) .map_err(|_| ClientError::SendError)?; - rx.await.map_err(|_| ClientError::RecvError) + match self.broadcast_timeout { + Some(timeout) => tokio::time::timeout(timeout, rx) + .await + .map_err(|_| ClientError::BroadcastTimeout)? + .map_err(|_| ClientError::RecvError), + None => rx.await.map_err(|_| ClientError::RecvError), + } } /// A connection has a minimum transaction fee requirement to enter its mempool. For proper transaction propagation, diff --git a/src/error.rs b/src/error.rs index 334c32eb..3770519b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,6 +28,8 @@ pub enum ClientError { SendError, /// A channel was dropped before sending its value back. RecvError, + /// No peer requested the transaction within the configured broadcast timeout. + BroadcastTimeout, } impl core::fmt::Display for ClientError { @@ -39,6 +41,12 @@ impl core::fmt::Display for ClientError { ClientError::RecvError => { write!(f, "the sender of data was dropped from memory.") } + ClientError::BroadcastTimeout => { + write!( + f, + "no peer requested the transaction within the configured timeout." + ) + } } } } diff --git a/src/lib.rs b/src/lib.rs index 2af34ffe..97e8a08a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,6 +64,7 @@ use chain::Filter; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; +use std::time::Duration; // Re-exports #[doc(inline)] @@ -343,6 +344,7 @@ struct Config { peer_timeout_config: PeerTimeoutConfig, filter_type: FilterType, block_type: BlockType, + broadcast_timeout: Option, } impl Default for Config { @@ -356,6 +358,7 @@ impl Default for Config { peer_timeout_config: PeerTimeoutConfig::default(), filter_type: FilterType::default(), block_type: BlockType::default(), + broadcast_timeout: None, } } } diff --git a/src/node.rs b/src/node.rs index 8e8d226e..71749231 100644 --- a/src/node.rs +++ b/src/node.rs @@ -76,13 +76,14 @@ impl Node { peer_timeout_config, filter_type, block_type, + broadcast_timeout, } = config; // Set up a communication channel between the node and client let (info_tx, info_rx) = mpsc::channel::(32); let (warn_tx, warn_rx) = mpsc::unbounded_channel::(); let (event_tx, event_rx) = mpsc::unbounded_channel::(); let (ctx, crx) = mpsc::unbounded_channel::(); - let client = Client::new(info_rx, warn_rx, event_rx, ctx); + let client = Client::new(info_rx, warn_rx, event_rx, ctx, broadcast_timeout); // A structured way to talk to the client let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx)); // We always assume we are behind diff --git a/tests/core.rs b/tests/core.rs index 5d62e984..08edefeb 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -633,6 +633,98 @@ async fn tx_can_broadcast() { .unwrap(); } +// Verify that broadcasting a transaction that the peer already has. +#[tokio::test] +async fn tx_broadcast_does_not_hang_when_peer_has_tx() { + let amount_to_us = Amount::from_sat(100_000); + let amount_to_op_return = Amount::from_sat(50_000); + let (bitcoind, socket_addr) = start_bitcoind(true).unwrap(); + let rpc = &bitcoind.client; + let tempdir = tempfile::TempDir::new().unwrap().path().to_owned(); + let mut rng = StdRng::seed_from_u64(20002); + let secret = SecretKey::new(&mut rng); + let secp = Secp256k1::new(); + let keypair = Keypair::from_secret_key(&secp, &secret); + let (internal_key, _) = keypair.x_only_public_key(); + let send_to_this_address = Address::p2tr(&secp, internal_key, None, KnownHrp::Regtest); + let miner = rpc.new_address().unwrap(); + mine_blocks(rpc, &miner, 110, 10).await; + let tx_info = rpc + .send_to_address(&send_to_this_address, amount_to_us) + .unwrap(); + let txid = tx_info.txid().unwrap(); + let tx_details = rpc.get_transaction(txid).unwrap().details; + let (vout, amt) = tx_details + .iter() + .find(|detail| detail.address.eq(&send_to_this_address.to_string())) + .map(|detail| (detail.vout, detail.amount)) + .unwrap(); + let txout = TxOut { + script_pubkey: miner.script_pubkey(), + value: amount_to_op_return, + }; + let outpoint = OutPoint { txid, vout }; + let txin = TxIn { + previous_output: outpoint, + script_sig: ScriptBuf::default(), + sequence: Sequence::ENABLE_RBF_NO_LOCKTIME, + witness: Witness::default(), + }; + let mut unsigned_tx = Transaction { + version: bitcoin::transaction::Version::TWO, + lock_time: absolute::LockTime::ZERO, + input: vec![txin], + output: vec![txout], + }; + let input_index = 0; + let sighash_type = TapSighashType::Default; + let prevout = TxOut { + script_pubkey: send_to_this_address.script_pubkey(), + value: Amount::from_btc(amt.abs()).unwrap(), + }; + let prevouts = vec![prevout]; + let prevouts = Prevouts::All(&prevouts); + let mut sighasher = SighashCache::new(&mut unsigned_tx); + let sighash = sighasher + .taproot_key_spend_signature_hash(input_index, &prevouts, sighash_type) + .unwrap(); + let tweaked: bitcoin::key::TweakedKeypair = keypair.tap_tweak(&secp, None); + let msg = bitcoin::secp256k1::Message::from(sighash); + let signature = secp.sign_schnorr(&msg, &tweaked.to_keypair()); + let signature = bitcoin::taproot::Signature { + signature, + sighash_type, + }; + *sighasher.witness_mut(input_index).unwrap() = Witness::p2tr_key_spend(&signature); + let tx = sighasher.into_transaction().to_owned(); + // Submit the tx directly to bitcoind via RPC — now the peer already has it. + rpc.send_raw_transaction(&tx).unwrap(); + println!("Submitted tx {} to bitcoind via RPC", tx.compute_txid()); + // Now broadcast the same tx via kyoto with a zero timeout. + let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); + let (node, client) = bip157::Builder::new(bitcoin::Network::Regtest) + .add_peer(host) + .data_dir(tempdir) + .chain_state(ChainState::Checkpoint(HeaderCheckpoint::from_genesis( + bitcoin::Network::Regtest, + ))) + .broadcast_timeout(Duration::ZERO) + .build(); + tokio::task::spawn(async move { node.run().await }); + let Client { + requester, + info_rx: _, + warn_rx: _, + event_rx: _, + } = client; + let result = requester.broadcast_tx(tx).await; + assert!( + matches!(result, Err(bip157::ClientError::BroadcastTimeout)), + "expected BroadcastTimeout, got: {:?}", + result, + ); +} + #[tokio::test] async fn dns_works() { let hostname = bip157::lookup_host("seed.bitcoin.sipa.be").await;