Skip to content

Commit dbd0be9

Browse files
committed
Add broadcast_tx timeout
1 parent 49b2a32 commit dbd0be9

6 files changed

Lines changed: 134 additions & 6 deletions

File tree

src/builder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ impl Builder {
143143
self
144144
}
145145

146+
/// Set a timeout for transaction broadcasting. When set, [`Requester::broadcast_tx`] will
147+
/// return an error if no peer requests the transaction within the given duration.
148+
///
149+
/// If none is provided, `broadcast_tx` will wait indefinitely for a peer to request the transaction.
150+
pub fn broadcast_timeout(mut self, timeout: impl Into<Duration>) -> Self {
151+
self.config.broadcast_timeout = Some(timeout.into());
152+
self
153+
}
154+
146155
/// Consume the node builder and receive a [`Node`] and [`Client`].
147156
pub fn build(mut self) -> (Node, Client) {
148157
Node::new(self.network, core::mem::take(&mut self.config))

src/client.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use bitcoin::p2p::address::AddrV2;
24
use bitcoin::p2p::ServiceFlags;
35
use bitcoin::{Amount, Transaction, Wtxid};
@@ -32,9 +34,10 @@ impl Client {
3234
warn_rx: mpsc::UnboundedReceiver<Warning>,
3335
event_rx: mpsc::UnboundedReceiver<Event>,
3436
ntx: UnboundedSender<ClientMessage>,
37+
broadcast_timeout: Option<Duration>,
3538
) -> Self {
3639
Self {
37-
requester: Requester::new(ntx),
40+
requester: Requester::new(ntx, broadcast_timeout),
3841
info_rx,
3942
warn_rx,
4043
event_rx,
@@ -46,11 +49,15 @@ impl Client {
4649
#[derive(Debug, Clone)]
4750
pub struct Requester {
4851
ntx: UnboundedSender<ClientMessage>,
52+
broadcast_timeout: Option<Duration>,
4953
}
5054

5155
impl Requester {
52-
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
53-
Self { ntx }
56+
fn new(ntx: UnboundedSender<ClientMessage>, broadcast_timeout: Option<Duration>) -> Self {
57+
Self {
58+
ntx,
59+
broadcast_timeout,
60+
}
5461
}
5562

5663
/// Tell the node to shut down.
@@ -77,14 +84,22 @@ impl Requester {
7784
///
7885
/// # Errors
7986
///
80-
/// If the node has stopped running.
87+
/// If the node has stopped running or if the configured broadcast timeout expires before
88+
/// any peer requests the transaction. A timeout does not necessarily mean the broadcast
89+
/// failed — the peer may already have the transaction and simply not sent a `getdata` request.
8190
pub async fn broadcast_tx(&self, transaction: Transaction) -> Result<Wtxid, ClientError> {
8291
let (tx, rx) = tokio::sync::oneshot::channel::<Wtxid>();
8392
let client_request = ClientRequest::new(transaction, tx);
8493
self.ntx
8594
.send(ClientMessage::Broadcast(client_request))
8695
.map_err(|_| ClientError::SendError)?;
87-
rx.await.map_err(|_| ClientError::RecvError)
96+
match self.broadcast_timeout {
97+
Some(timeout) => tokio::time::timeout(timeout, rx)
98+
.await
99+
.map_err(|_| ClientError::BroadcastTimeout)?
100+
.map_err(|_| ClientError::RecvError),
101+
None => rx.await.map_err(|_| ClientError::RecvError),
102+
}
88103
}
89104

90105
/// A connection has a minimum transaction fee requirement to enter its mempool. For proper transaction propagation,

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub enum ClientError {
2828
SendError,
2929
/// A channel was dropped before sending its value back.
3030
RecvError,
31+
/// No peer requested the transaction within the configured broadcast timeout.
32+
BroadcastTimeout,
3133
}
3234

3335
impl core::fmt::Display for ClientError {
@@ -39,6 +41,12 @@ impl core::fmt::Display for ClientError {
3941
ClientError::RecvError => {
4042
write!(f, "the sender of data was dropped from memory.")
4143
}
44+
ClientError::BroadcastTimeout => {
45+
write!(
46+
f,
47+
"no peer requested the transaction within the configured timeout."
48+
)
49+
}
4250
}
4351
}
4452
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use chain::Filter;
6464

6565
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6666
use std::path::PathBuf;
67+
use std::time::Duration;
6768

6869
// Re-exports
6970
#[doc(inline)]
@@ -343,6 +344,7 @@ struct Config {
343344
peer_timeout_config: PeerTimeoutConfig,
344345
filter_type: FilterType,
345346
block_type: BlockType,
347+
broadcast_timeout: Option<Duration>,
346348
}
347349

348350
impl Default for Config {
@@ -356,6 +358,7 @@ impl Default for Config {
356358
peer_timeout_config: PeerTimeoutConfig::default(),
357359
filter_type: FilterType::default(),
358360
block_type: BlockType::default(),
361+
broadcast_timeout: None,
359362
}
360363
}
361364
}

src/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,14 @@ impl Node {
7676
peer_timeout_config,
7777
filter_type,
7878
block_type,
79+
broadcast_timeout,
7980
} = config;
8081
// Set up a communication channel between the node and client
8182
let (info_tx, info_rx) = mpsc::channel::<Info>(32);
8283
let (warn_tx, warn_rx) = mpsc::unbounded_channel::<Warning>();
8384
let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
8485
let (ctx, crx) = mpsc::unbounded_channel::<ClientMessage>();
85-
let client = Client::new(info_rx, warn_rx, event_rx, ctx);
86+
let client = Client::new(info_rx, warn_rx, event_rx, ctx, broadcast_timeout);
8687
// A structured way to talk to the client
8788
let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx));
8889
// We always assume we are behind

tests/core.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,98 @@ async fn tx_can_broadcast() {
633633
.unwrap();
634634
}
635635

636+
// Verify that broadcasting a transaction that the peer already has.
637+
#[tokio::test]
638+
async fn tx_broadcast_does_not_hang_when_peer_has_tx() {
639+
let amount_to_us = Amount::from_sat(100_000);
640+
let amount_to_op_return = Amount::from_sat(50_000);
641+
let (bitcoind, socket_addr) = start_bitcoind(true).unwrap();
642+
let rpc = &bitcoind.client;
643+
let tempdir = tempfile::TempDir::new().unwrap().path().to_owned();
644+
let mut rng = StdRng::seed_from_u64(20002);
645+
let secret = SecretKey::new(&mut rng);
646+
let secp = Secp256k1::new();
647+
let keypair = Keypair::from_secret_key(&secp, &secret);
648+
let (internal_key, _) = keypair.x_only_public_key();
649+
let send_to_this_address = Address::p2tr(&secp, internal_key, None, KnownHrp::Regtest);
650+
let miner = rpc.new_address().unwrap();
651+
mine_blocks(rpc, &miner, 110, 10).await;
652+
let tx_info = rpc
653+
.send_to_address(&send_to_this_address, amount_to_us)
654+
.unwrap();
655+
let txid = tx_info.txid().unwrap();
656+
let tx_details = rpc.get_transaction(txid).unwrap().details;
657+
let (vout, amt) = tx_details
658+
.iter()
659+
.find(|detail| detail.address.eq(&send_to_this_address.to_string()))
660+
.map(|detail| (detail.vout, detail.amount))
661+
.unwrap();
662+
let txout = TxOut {
663+
script_pubkey: miner.script_pubkey(),
664+
value: amount_to_op_return,
665+
};
666+
let outpoint = OutPoint { txid, vout };
667+
let txin = TxIn {
668+
previous_output: outpoint,
669+
script_sig: ScriptBuf::default(),
670+
sequence: Sequence::ENABLE_RBF_NO_LOCKTIME,
671+
witness: Witness::default(),
672+
};
673+
let mut unsigned_tx = Transaction {
674+
version: bitcoin::transaction::Version::TWO,
675+
lock_time: absolute::LockTime::ZERO,
676+
input: vec![txin],
677+
output: vec![txout],
678+
};
679+
let input_index = 0;
680+
let sighash_type = TapSighashType::Default;
681+
let prevout = TxOut {
682+
script_pubkey: send_to_this_address.script_pubkey(),
683+
value: Amount::from_btc(amt.abs()).unwrap(),
684+
};
685+
let prevouts = vec![prevout];
686+
let prevouts = Prevouts::All(&prevouts);
687+
let mut sighasher = SighashCache::new(&mut unsigned_tx);
688+
let sighash = sighasher
689+
.taproot_key_spend_signature_hash(input_index, &prevouts, sighash_type)
690+
.unwrap();
691+
let tweaked: bitcoin::key::TweakedKeypair = keypair.tap_tweak(&secp, None);
692+
let msg = bitcoin::secp256k1::Message::from(sighash);
693+
let signature = secp.sign_schnorr(&msg, &tweaked.to_keypair());
694+
let signature = bitcoin::taproot::Signature {
695+
signature,
696+
sighash_type,
697+
};
698+
*sighasher.witness_mut(input_index).unwrap() = Witness::p2tr_key_spend(&signature);
699+
let tx = sighasher.into_transaction().to_owned();
700+
// Submit the tx directly to bitcoind via RPC — now the peer already has it.
701+
rpc.send_raw_transaction(&tx).unwrap();
702+
println!("Submitted tx {} to bitcoind via RPC", tx.compute_txid());
703+
// Now broadcast the same tx via kyoto with a zero timeout.
704+
let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port()));
705+
let (node, client) = bip157::Builder::new(bitcoin::Network::Regtest)
706+
.add_peer(host)
707+
.data_dir(tempdir)
708+
.chain_state(ChainState::Checkpoint(HeaderCheckpoint::from_genesis(
709+
bitcoin::Network::Regtest,
710+
)))
711+
.broadcast_timeout(Duration::ZERO)
712+
.build();
713+
tokio::task::spawn(async move { node.run().await });
714+
let Client {
715+
requester,
716+
info_rx: _,
717+
warn_rx: _,
718+
event_rx: _,
719+
} = client;
720+
let result = requester.broadcast_tx(tx).await;
721+
assert!(
722+
matches!(result, Err(bip157::ClientError::BroadcastTimeout)),
723+
"expected BroadcastTimeout, got: {:?}",
724+
result,
725+
);
726+
}
727+
636728
#[tokio::test]
637729
async fn dns_works() {
638730
let hostname = bip157::lookup_host("seed.bitcoin.sipa.be").await;

0 commit comments

Comments
 (0)