diff --git a/Cargo.lock b/Cargo.lock index d824999..30179fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,7 +218,7 @@ checksum = "0b47c4ab7a93edb0c7198c5535ed9b52b63095f4e9b45279c6736cec4b856baf" [[package]] name = "bitcoin-jsonrpsee" version = "0.1.1" -source = "git+https://github.com/LayerTwo-Labs/bitcoin-jsonrpsee.git?rev=4c8edd85722ee65e7598323758b3c032efac37bd#4c8edd85722ee65e7598323758b3c032efac37bd" +source = "git+https://github.com/LayerTwo-Labs/bitcoin-jsonrpsee.git?rev=5a1d8c84cd6d1a357106e0397ae66cc31e55b855#5a1d8c84cd6d1a357106e0397ae66cc31e55b855" dependencies = [ "base64", "bitcoin", diff --git a/Cargo.toml b/Cargo.toml index 8c7df8c..6613090 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ publish = false [workspace.dependencies.bitcoin-jsonrpsee] git = "https://github.com/LayerTwo-Labs/bitcoin-jsonrpsee.git" -rev = "4c8edd85722ee65e7598323758b3c032efac37bd" +rev = "5a1d8c84cd6d1a357106e0397ae66cc31e55b855" [workspace.dependencies.futures] version = "0.3.30" diff --git a/app/main.rs b/app/main.rs index ec8aa40..e3f30b7 100644 --- a/app/main.rs +++ b/app/main.rs @@ -31,10 +31,13 @@ fn set_tracing_subscriber(log_level: tracing::Level) -> anyhow::Result<()> { }) } -async fn spawn_rpc_server( - server: server::Server, +async fn spawn_rpc_server( + server: server::Server, serve_rpc_addr: SocketAddr, -) -> anyhow::Result { +) -> anyhow::Result +where + RpcClient: bitcoin_jsonrpsee::MainClient + Send + Sync + 'static, +{ tracing::info!("serving RPC on {}", serve_rpc_addr); use server::RpcServer; @@ -98,6 +101,7 @@ async fn main() -> anyhow::Result<()> { }; mempool::init_sync_mempool( &mut enforcer, + network, &rpc_client, &cli.node_zmq_addr_sequence, shutdown_signal, @@ -109,7 +113,7 @@ async fn main() -> anyhow::Result<()> { enforcer, mempool, tx_cache, - rpc_client, + rpc_client.clone(), sequence_stream, |err| async { let err = anyhow::Error::from(err); @@ -121,6 +125,7 @@ async fn main() -> anyhow::Result<()> { mempool, network, network_info, + rpc_client, sample_block_template, )?; let rpc_server_handle = diff --git a/lib/cusf_enforcer.rs b/lib/cusf_enforcer.rs index 35108f6..c647a8e 100644 --- a/lib/cusf_enforcer.rs +++ b/lib/cusf_enforcer.rs @@ -33,6 +33,9 @@ pub enum TxAcceptAction { /// Transactions that conflict with this one. /// It is not necessary to specify conflicts due to common inputs. conflicts_with: HashSet, + /// Tweak the weight by the specified value, in wu. + /// The weight will saturate at zero and [`Weight::MAX`]. + weight_tweak: i64, }, Reject, } @@ -476,14 +479,21 @@ where match self.0.accept_tx(tx, tx_inputs).map_err(Either::Left)? { TxAcceptAction::Accept { conflicts_with: left_conflicts, + weight_tweak: left_weight_tweak, } => { match self.1.accept_tx(tx, tx_inputs).map_err(Either::Right)? { TxAcceptAction::Accept { conflicts_with: right_conflicts, + weight_tweak: right_weight_tweak, } => { let mut conflicts_with = left_conflicts; conflicts_with.extend(right_conflicts); - Ok(TxAcceptAction::Accept { conflicts_with }) + let weight_tweak = left_weight_tweak + .saturating_add(right_weight_tweak); + Ok(TxAcceptAction::Accept { + conflicts_with, + weight_tweak, + }) } TxAcceptAction::Reject => Ok(TxAcceptAction::Reject), } @@ -537,6 +547,7 @@ impl CusfEnforcer for DefaultEnforcer { { Ok(TxAcceptAction::Accept { conflicts_with: HashSet::new(), + weight_tweak: 0, }) } } diff --git a/lib/mempool/mod.rs b/lib/mempool/mod.rs index c05f9be..835b605 100644 --- a/lib/mempool/mod.rs +++ b/lib/mempool/mod.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashMap, VecDeque}; -use bitcoin::{BlockHash, Target, Transaction, Txid, Weight}; +use bitcoin::{BlockHash, Network, Target, Transaction, Txid, Weight}; use bitcoin_jsonrpsee::client::{BlockTemplateTransaction, RawMempoolTxFees}; use hashlink::{LinkedHashMap, LinkedHashSet}; use imbl::{ordmap, OrdMap, OrdSet}; @@ -17,41 +17,262 @@ pub use sync::{ MempoolSync, }; -#[derive(Clone, Copy, Debug, Eq)] +mod refinement_cmp { + //! Refinement comparison traits and wrappers, for types that have more than + //! one possible ordering/equality relation. + //! The traits in this module must obey the same laws as their counterparts + //! in [`std::cmp`]. + + use std::cmp::Ordering; + + pub trait RefinementPartialEq + where + Rhs: ?Sized, + { + #[must_use] + fn eq(&self, other: &Rhs) -> bool; + + #[inline] + #[must_use] + fn ne(&self, other: &Rhs) -> bool { + !self.eq(other) + } + } + + pub trait RefinementPartialOrd: + RefinementPartialEq + where + Rhs: ?Sized, + { + #[must_use] + fn partial_cmp(&self, other: &Rhs) -> Option; + + #[inline] + #[must_use] + fn lt(&self, other: &Rhs) -> bool { + self.partial_cmp(other).is_some_and(Ordering::is_lt) + } + + #[inline] + #[must_use] + fn le(&self, other: &Rhs) -> bool { + self.partial_cmp(other).is_some_and(Ordering::is_le) + } + + #[inline] + #[must_use] + fn gt(&self, other: &Rhs) -> bool { + self.partial_cmp(other).is_some_and(Ordering::is_gt) + } + + #[inline] + #[must_use] + fn ge(&self, other: &Rhs) -> bool { + self.partial_cmp(other).is_some_and(Ordering::is_ge) + } + } + + pub trait RefinementEq: RefinementPartialEq {} + + pub trait RefinementOrd: RefinementEq + RefinementPartialOrd { + #[must_use] + fn cmp(&self, other: &Self) -> Ordering; + + #[inline] + #[must_use] + fn max(self, other: Self) -> Self + where + Self: Sized, + { + if other.lt(&self) { + self + } else { + other + } + } + + #[inline] + #[must_use] + fn min(self, other: Self) -> Self + where + Self: Sized, + { + if other.lt(&self) { + other + } else { + self + } + } + + #[inline] + #[must_use] + fn clamp(self, min: Self, max: Self) -> Self + where + Self: Sized, + { + assert!(min.le(&max)); + if self.lt(&min) { + min + } else if self.gt(&max) { + max + } else { + self + } + } + } + + /// Wrapper struct that implements comparison traits from [`std::cmp`]. + #[derive(Clone, Copy, Debug)] + #[repr(transparent)] + pub struct RefinementCmp(pub T); + + impl PartialEq> for RefinementCmp + where + T: RefinementPartialEq, + { + #[inline(always)] + fn eq(&self, other: &RefinementCmp) -> bool { + >::eq(&self.0, &other.0) + } + + #[allow(clippy::partialeq_ne_impl)] + #[inline(always)] + fn ne(&self, other: &RefinementCmp) -> bool { + >::ne(&self.0, &other.0) + } + } + + impl PartialOrd> for RefinementCmp + where + T: RefinementPartialOrd, + { + #[inline(always)] + fn partial_cmp(&self, other: &RefinementCmp) -> Option { + >::partial_cmp(&self.0, &other.0) + } + + #[inline(always)] + fn lt(&self, other: &RefinementCmp) -> bool { + >::lt(&self.0, &other.0) + } + + #[inline(always)] + fn le(&self, other: &RefinementCmp) -> bool { + >::le(&self.0, &other.0) + } + + #[inline(always)] + fn gt(&self, other: &RefinementCmp) -> bool { + >::gt(&self.0, &other.0) + } + + #[inline(always)] + fn ge(&self, other: &RefinementCmp) -> bool { + >::ge(&self.0, &other.0) + } + } + + impl Eq for RefinementCmp where T: RefinementEq {} + + impl Ord for RefinementCmp + where + T: RefinementOrd, + { + #[inline(always)] + fn cmp(&self, other: &Self) -> Ordering { + ::cmp(&self.0, &other.0) + } + + #[inline(always)] + fn max(self, other: Self) -> Self + where + Self: Sized, + { + Self(::max(self.0, other.0)) + } + + #[inline(always)] + fn min(self, other: Self) -> Self + where + Self: Sized, + { + Self(::min(self.0, other.0)) + } + + #[inline(always)] + fn clamp(self, min: Self, max: Self) -> Self + where + Self: Sized, + { + Self(::clamp(self.0, min.0, max.0)) + } + } +} + +#[derive(Clone, Copy, Debug)] pub struct FeeRate { fee: u64, - size: u64, + vsize: u64, } -impl Ord for FeeRate { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { +impl FeeRate { + /// Refinement comparison (refinement of canonical quotient comparison). + /// This will always be equivalent to canonical quotient comparison if + /// `self` and `other` are not equal when comparing as quotients. + /// If `self` and `other` are equal when comparing as quotients, + /// orders by `vsize`. + /// ``` + /// (self.fee / self.vsize) < (other.fee / other.vsize) ==> self < other, + /// (self.fee / self.vsize) > (other.fee / other.vsize) ==> self > other, + /// (self.fee == other.fee /\ other.fee == other.vsize) <==> self == other, + /// ((self.fee / self.vsize) == (other.fee / other.vsize) + /// /\ self.vsize < other.vsize) ==> self < other, + /// ((self.fee / self.vsize) == (other.fee / other.vsize) + /// /\ self.vsize > other.vsize) ==> self > other + /// ``` + fn refinement_cmp(&self, other: &Self) -> std::cmp::Ordering { + use std::cmp::Ordering; // (self.fee / self.size) > (other.fee / other.size) ==> // (self.fee * other.size) > (other.fee * self.size) - let lhs = self.fee as u128 * other.size as u128; - let rhs = other.fee as u128 * self.size as u128; - lhs.cmp(&rhs) + let lhs = self.fee as u128 * other.vsize as u128; + let rhs = other.fee as u128 * self.vsize as u128; + match lhs.cmp(&rhs) { + Ordering::Equal => self.vsize.cmp(&other.vsize), + res => res, + } + } +} + +impl refinement_cmp::RefinementOrd for FeeRate { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.refinement_cmp(other) } } -impl PartialEq for FeeRate { +impl refinement_cmp::RefinementPartialEq for FeeRate { fn eq(&self, other: &Self) -> bool { - self.cmp(other).is_eq() + ::cmp(self, other).is_eq() } } -impl PartialOrd for FeeRate { +impl refinement_cmp::RefinementEq for FeeRate {} + +impl refinement_cmp::RefinementPartialOrd for FeeRate { fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + Some(::cmp(self, other)) } } #[derive(Clone, Debug)] pub struct TxInfo { - pub ancestor_size: u64, + pub ancestor_modified_weight: Weight, + pub ancestor_vsize: u64, pub bip125_replaceable: bool, pub depends: OrdSet, - pub descendant_size: u64, + pub descendant_modified_weight: Weight, + pub descendant_vsize: u64, pub fees: RawMempoolTxFees, + pub modified_weight: Weight, pub spent_by: OrdSet, /// Conflicts due to reasons other than shared inputs pub conflicts_with: OrdSet, @@ -162,16 +383,21 @@ impl Conflicts { } #[derive(Clone, Debug, Default)] -struct ByAncestorFeeRate(OrdMap>); +struct ByAncestorFeeRate( + OrdMap, LinkedHashSet>, +); impl ByAncestorFeeRate { fn insert(&mut self, fee_rate: FeeRate, txid: Txid) { - self.0.entry(fee_rate).or_default().insert(txid); + self.0 + .entry(refinement_cmp::RefinementCmp(fee_rate)) + .or_default() + .insert(txid); } /// returns `true` if removed successfully, or `false` if not found fn remove(&mut self, fee_rate: FeeRate, txid: Txid) -> bool { - match self.0.entry(fee_rate) { + match self.0.entry(refinement_cmp::RefinementCmp(fee_rate)) { ordmap::Entry::Occupied(mut entry) => { let txs = entry.get_mut(); txs.remove(&txid); @@ -188,7 +414,7 @@ impl ByAncestorFeeRate { #[allow(dead_code)] fn iter(&self) -> impl DoubleEndedIterator + '_ { self.0.iter().flat_map(|(fee_rate, txids)| { - txids.iter().map(|txid| (*fee_rate, *txid)) + txids.iter().map(|txid| (fee_rate.0, *txid)) }) } @@ -197,7 +423,7 @@ impl ByAncestorFeeRate { &self, ) -> impl DoubleEndedIterator + '_ { self.0.iter().rev().flat_map(|(fee_rate, txids)| { - txids.iter().map(|txid| (*fee_rate, *txid)) + txids.iter().map(|txid| (fee_rate.0, *txid)) }) } } @@ -256,11 +482,71 @@ impl TxChilds { #[derive(Clone, Debug, Default)] struct MempoolTxs(imbl::HashMap); +pub const MAX_USABLE_BLOCK_WEIGHT: Weight = { + const COINBASE_TXIN_WEIGHT: Weight = { + let weight_wu = Weight::from_non_witness_data_size( + // outpoint + 36 + // sequence + + 4 + // SPK + + 5 + ).to_wu() + // witness + + Weight::from_witness_data_size(33).to_wu(); + Weight::from_wu(weight_wu) + }; + const COINBASE_VALUE_TXOUT_WEIGHT: Weight = { + let weight_wu = Weight::from_non_witness_data_size(bitcoin::Amount::SIZE as u64).to_wu() + // SPK weight + + Weight::from_non_witness_data_size(23).to_wu(); + Weight::from_wu(weight_wu) + }; + const COINBASE_TX_WEIGHT: Weight = Weight::from_wu( + // version + Weight::from_non_witness_data_size(4).to_wu() + // locktime + + Weight::from_non_witness_data_size(4).to_wu() + // inputs + + Weight::from_non_witness_data_size(1).to_wu() + + COINBASE_TXIN_WEIGHT.to_wu() + // outputs + + Weight::from_non_witness_data_size(1).to_wu() + + COINBASE_VALUE_TXOUT_WEIGHT.to_wu(), + ); + let res_wu = Weight::MAX_BLOCK.to_wu() - Weight::from_non_witness_data_size( + bitcoin::block::Header::SIZE as u64 + ).to_wu() + // 2 bytes for encoding txs array length + - Weight::from_non_witness_data_size(2).to_wu() + - COINBASE_TX_WEIGHT.to_wu(); + Weight::from_wu(res_wu) +}; + +#[inline(always)] +fn saturating_add_weight(lhs: Weight, rhs: Weight) -> Weight { + let res_wu = lhs.to_wu().saturating_add(rhs.to_wu()); + Weight::from_wu(res_wu) +} + +#[inline(always)] +fn saturating_add_signed_weight(lhs: Weight, rhs_wu: i64) -> Weight { + let res_wu = lhs.to_wu().saturating_add_signed(rhs_wu); + Weight::from_wu(res_wu) +} + +#[inline(always)] +fn saturating_sub_weight(lhs: Weight, rhs: Weight) -> Weight { + let res_wu = lhs.to_wu().saturating_sub(rhs.to_wu()); + Weight::from_wu(res_wu) +} + // MUST be cheap to clone so that constructing block templates is cheap #[derive(Clone, Debug)] pub struct Mempool { by_ancestor_fee_rate: ByAncestorFeeRate, chain: Chain, + network: Network, /// Map of txs (which may not be in the mempool) to their direct child txs, /// which MUST be in the mempool tx_childs: TxChilds, @@ -268,7 +554,7 @@ pub struct Mempool { } impl Mempool { - fn new(prev_blockhash: BlockHash) -> Self { + fn new(network: Network, prev_blockhash: BlockHash) -> Self { let chain = Chain { tip: prev_blockhash, blocks: imbl::HashMap::new(), @@ -276,6 +562,7 @@ impl Mempool { Self { by_ancestor_fee_rate: ByAncestorFeeRate::default(), chain, + network, tx_childs: TxChilds::default(), txs: MempoolTxs::default(), } @@ -285,9 +572,32 @@ impl Mempool { &self.chain.blocks[&self.chain.tip] } - pub fn next_target(&self) -> Target { - // FIXME: calculate this properly - self.chain.blocks[&self.chain.tip].compact_target.into() + /// Next target, if known + pub fn next_target(&self) -> Option { + let tip = self.tip(); + let next_height = tip.height + 1; + let network_params = self.network.params(); + if !network_params.no_pow_retargeting + && next_height % network_params.miner_confirmation_window == 0 + { + if let Some(first_block_in_period) = self + .chain + .iter() + .nth(network_params.miner_confirmation_window as usize - 1) + { + let spacing = tip.time - first_block_in_period.time; + let res = bitcoin::CompactTarget::from_next_work_required( + tip.compact_target, + spacing as u64, + network_params, + ); + Some(res.into()) + } else { + None + } + } else { + Some(tip.compact_target.into()) + } } /// Insert a tx into the mempool, @@ -298,6 +608,7 @@ impl Mempool { tx: Transaction, fee: u64, conflicts_with: imbl::OrdSet, + modified_weight: Weight, ) -> Result, MempoolInsertError> { let txid = tx.compute_txid(); if self.txs.0.contains_key(&txid) { @@ -310,9 +621,13 @@ impl Mempool { let modified_fee = fee; let vsize = tx.vsize() as u64; // initially incorrect, must be computed after insertion - let mut ancestor_size = vsize; + let mut ancestor_modified_weight = modified_weight; + // initially incorrect, must be computed after insertion + let mut ancestor_vsize = vsize; // initially incorrect, must be computed after insertion - let mut descendant_size = vsize; + let mut descendant_modified_weight = modified_weight; + // initially incorrect, must be computed after insertion + let mut descendant_vsize = vsize; // conflicts including ancestor conflicts let mut ancestry_conflicts = conflicts_with.clone(); let depends = tx @@ -341,22 +656,25 @@ impl Mempool { OrdSet::new() }; let info = TxInfo { - ancestor_size, + ancestor_modified_weight, + ancestor_vsize, bip125_replaceable: tx.is_explicitly_rbf(), depends, - descendant_size, + descendant_modified_weight, + descendant_vsize, fees: RawMempoolTxFees { ancestor: ancestor_fees, base: fee, descendant: descendant_fees, modified: modified_fee, }, + modified_weight, spent_by, conflicts_with: ancestry_conflicts, }; let (ndeps, nspenders) = (info.depends.len(), info.spent_by.len()); let res = self.txs.0.insert(txid, (tx, info)).map(|(_, info)| info); - tracing::debug!( + tracing::trace!( fee = %bitcoin::Amount::from_sat(fee).display_dynamic(), modified_fee = %bitcoin::Amount::from_sat(modified_fee).display_dynamic(), %txid, @@ -364,18 +682,35 @@ impl Mempool { ); self.txs.ancestors_mut(txid).try_for_each(|ancestor_info| { let (ancestor_tx, ancestor_info) = ancestor_info?; - ancestor_size += ancestor_tx.vsize() as u64; + ancestor_vsize += ancestor_tx.vsize() as u64; + ancestor_modified_weight = saturating_add_weight( + ancestor_modified_weight, + ancestor_info.modified_weight, + ); ancestor_fees += ancestor_info.fees.modified; - ancestor_info.descendant_size += vsize; + ancestor_info.descendant_vsize += vsize; + ancestor_info.descendant_modified_weight = saturating_add_weight( + ancestor_info.descendant_modified_weight, + modified_weight, + ); ancestor_info.fees.descendant += modified_fee; Result::<_, MempoolInsertError>::Ok(()) })?; self.txs.descendants_mut(txid).skip(1).try_for_each( |descendant_info| { let (descendant_tx, descendant_info) = descendant_info?; - descendant_size += descendant_tx.vsize() as u64; + descendant_vsize += descendant_tx.vsize() as u64; + descendant_modified_weight = saturating_add_weight( + descendant_modified_weight, + descendant_info.modified_weight, + ); descendant_fees += descendant_info.fees.modified; - descendant_info.ancestor_size += vsize; + descendant_info.ancestor_vsize += vsize; + descendant_info.ancestor_modified_weight = + saturating_add_weight( + descendant_info.ancestor_modified_weight, + modified_weight, + ); descendant_info.fees.ancestor += modified_fee; descendant_info.conflicts_with = descendant_info .conflicts_with @@ -396,11 +731,13 @@ impl Mempool { let (_, info) = self.txs.0.get_mut(&txid).unwrap(); info.fees.ancestor = ancestor_fees; info.fees.descendant = descendant_fees; - info.ancestor_size = ancestor_size; - info.descendant_size = descendant_size; + info.ancestor_modified_weight = ancestor_modified_weight; + info.ancestor_vsize = ancestor_vsize; + info.descendant_modified_weight = descendant_modified_weight; + info.descendant_vsize = descendant_vsize; let ancestor_fee_rate = FeeRate { fee: ancestor_fees, - size: ancestor_size, + vsize: ancestor_modified_weight.to_vbytes_ceil(), }; self.by_ancestor_fee_rate.insert(ancestor_fee_rate, txid); Ok(res) @@ -414,7 +751,8 @@ impl Mempool { let Some((tx, info)) = self.txs.0.get(txid) else { return Ok(None); }; - let ancestor_size = info.ancestor_size; + let ancestor_modified_weight = info.ancestor_modified_weight; + let modified_weight = info.modified_weight; let vsize = tx.vsize() as u64; let fees = RawMempoolTxFees { ..info.fees }; for spent_tx in tx.input.iter().map(|input| input.previous_output.txid) @@ -428,7 +766,7 @@ impl Mempool { let (desc_tx, desc_info) = desc?; let ancestor_fee_rate = FeeRate { fee: desc_info.fees.ancestor, - size: desc_info.ancestor_size, + vsize: desc_info.ancestor_modified_weight.to_vbytes_ceil(), }; let desc_txid = desc_tx.compute_txid(); if !self @@ -438,33 +776,38 @@ impl Mempool { let err = MissingByAncestorFeeRateKeyError(ancestor_fee_rate); return Err(err.into()); }; - desc_info.ancestor_size -= vsize; + desc_info.ancestor_modified_weight = saturating_sub_weight( + desc_info.ancestor_modified_weight, + modified_weight, + ); + desc_info.ancestor_vsize -= vsize; desc_info.fees.ancestor -= fees.modified; let ancestor_fee_rate = FeeRate { fee: desc_info.fees.ancestor, - size: desc_info.ancestor_size, + vsize: desc_info.ancestor_modified_weight.to_vbytes_ceil(), }; self.by_ancestor_fee_rate .insert(ancestor_fee_rate, desc_txid); // FIXME: remove - tracing::debug!("removing {txid} as a dep of {desc_txid}"); + tracing::trace!("removing {txid} as a dep of {desc_txid}"); desc_info.depends.remove(txid); Result::<_, MempoolRemoveError>::Ok(()) })?; // Update all ancestors let () = self.txs.ancestors_mut(*txid).try_for_each(|anc| { - let (anc_tx, anc_info) = anc?; - anc_info.descendant_size -= vsize; + let (_anc_tx, anc_info) = anc?; + anc_info.descendant_modified_weight = saturating_sub_weight( + anc_info.descendant_modified_weight, + modified_weight, + ); + anc_info.descendant_vsize -= vsize; anc_info.fees.descendant -= fees.modified; - let anc_txid = anc_tx.compute_txid(); - // FIXME: remove - tracing::debug!("removing {txid} as a spender of {anc_txid}"); anc_info.spent_by.remove(txid); Result::<_, MempoolRemoveError>::Ok(()) })?; let ancestor_fee_rate = FeeRate { fee: fees.ancestor, - size: ancestor_size, + vsize: ancestor_modified_weight.to_vbytes_ceil(), }; // Update `self.by_ancestor_fee_rate` if !self.by_ancestor_fee_rate.remove(ancestor_fee_rate, *txid) { @@ -472,8 +815,6 @@ impl Mempool { return Err(err.into()); }; let res = self.txs.0.remove(txid); - // FIXME: remove - tracing::debug!("Removed {txid} from mempool"); Ok(res) } @@ -576,6 +917,9 @@ impl Mempool { ) -> Result<(), MempoolUpdateError> { for (txid_lo, conflict_txids) in conflicts.iter() { let conflict_txids = OrdSet::from(conflict_txids); + if conflict_txids.is_empty() { + break; + } self.txs.descendants_mut(*txid_lo).try_for_each( |descendant_info| { let (_desc_tx, desc_info) = descendant_info?; @@ -589,6 +933,9 @@ impl Mempool { } for (txid_hi, conflict_txids) in conflicts.iter_inverted() { let conflict_txids = OrdSet::from(conflict_txids); + if conflict_txids.is_empty() { + break; + } self.txs.descendants_mut(txid_hi).try_for_each( |descendant_info| { let (_desc_tx, desc_info) = descendant_info?; @@ -603,21 +950,92 @@ impl Mempool { Ok(()) } - /// choose txs for a block proposal, mutating the underlying mempool + /// Tweak the weight for a tx + fn tweak_weight( + &mut self, + txid: Txid, + tweak: i64, + ) -> Result<(), MempoolRemoveError> { + if tweak == 0 { + return Ok(()); + } + let Some((_, tx_info)) = self.txs.0.get_mut(&txid) else { + return Ok(()); + }; + let modified_weight = + saturating_add_signed_weight(tx_info.modified_weight, tweak); + // weight amount that was actually tweaked. May have lower absolute + // value due to saturating addition + let tweaked_wu = if tweak >= 0 { + (modified_weight - tx_info.modified_weight).to_wu() as i64 + } else { + -((tx_info.modified_weight - modified_weight).to_wu() as i64) + }; + tx_info.modified_weight = modified_weight; + tx_info.descendant_modified_weight = saturating_add_signed_weight( + tx_info.descendant_modified_weight, + tweaked_wu, + ); + self.txs.ancestors_mut(txid).try_for_each(|ancestor_info| { + let (_, anc_info) = ancestor_info?; + anc_info.descendant_modified_weight = saturating_add_signed_weight( + anc_info.descendant_modified_weight, + tweaked_wu, + ); + Ok::<_, MempoolRemoveError>(()) + })?; + self.txs + .descendants_mut(txid) + .try_for_each(|descendant_info| { + let (desc_tx, desc_info) = descendant_info?; + let desc_txid = desc_tx.compute_txid(); + let mut ancestor_fee_rate = FeeRate { + fee: desc_info.fees.ancestor, + vsize: desc_info.ancestor_modified_weight.to_vbytes_ceil(), + }; + if !self + .by_ancestor_fee_rate + .remove(ancestor_fee_rate, desc_txid) + { + let err = + MissingByAncestorFeeRateKeyError(ancestor_fee_rate); + return Err(err.into()); + }; + desc_info.ancestor_modified_weight = + saturating_add_signed_weight( + desc_info.ancestor_modified_weight, + tweaked_wu, + ); + ancestor_fee_rate.vsize = + desc_info.ancestor_modified_weight.to_vbytes_ceil(); + self.by_ancestor_fee_rate + .insert(ancestor_fee_rate, desc_txid); + Ok::<_, MempoolRemoveError>(()) + })?; + Ok(()) + } + + /// choose txs for a block proposal, mutating the underlying mempool. + /// If no weight limit is specified, or the specified weight exceeds + /// `Weight::MAX_BLOCK`, then `Weight::MAX_BLOCK` will be used as the + /// weight limit. fn propose_txs_mut( &mut self, + weight_limit: Option, ) -> Result, MempoolRemoveError> { let mut res = IndexSet::new(); - let mut total_size = 0; + let mut weight_remaining = weight_limit + .unwrap_or(MAX_USABLE_BLOCK_WEIGHT) + .min(MAX_USABLE_BLOCK_WEIGHT); + tracing::debug!(%weight_remaining, "Selecting txs"); loop { let Some((ancestor_fee_rate, txid)) = self .by_ancestor_fee_rate .iter_rev() .find(|(ancestor_fee_rate, _txid)| { - let total_weight = - Weight::from_vb(total_size + ancestor_fee_rate.size); - total_weight - .is_some_and(|weight| weight < Weight::MAX_BLOCK) + Weight::from_vb(ancestor_fee_rate.vsize).is_some_and( + |ancestors_weight| ancestors_weight <= weight_remaining, + ) }) else { break; @@ -652,15 +1070,17 @@ impl Mempool { to_add.extend(info.depends.iter().map(|dep| (*dep, false))) } } - total_size += ancestor_fee_rate.size; + let txs_weight = Weight::from_vb_unwrap(ancestor_fee_rate.vsize); + weight_remaining -= txs_weight; } Ok(res) } pub fn propose_txs( &self, + weight_limit: Option, ) -> Result, MempoolRemoveError> { - let mut txs = self.clone().propose_txs_mut()?; + let mut txs = self.clone().propose_txs_mut(weight_limit)?; let mut res = Vec::new(); // build result in reverse order while let Some(txid) = txs.pop() { diff --git a/lib/mempool/sync/initial_sync.rs b/lib/mempool/sync/initial_sync.rs index 493d319..42f78eb 100644 --- a/lib/mempool/sync/initial_sync.rs +++ b/lib/mempool/sync/initial_sync.rs @@ -373,10 +373,11 @@ where tx.clone(), fee_delta.to_sat(), imbl::OrdSet::new(), + tx.weight(), )?; tracing::trace!("added {txid} to mempool"); let mempool_txs = sync_state.mempool.txs.0.len(); - tracing::debug!(%mempool_txs, "Syncing..."); + tracing::trace!(%mempool_txs, "Syncing..."); Ok(true) } @@ -503,6 +504,7 @@ pub async fn init_sync_mempool< RpcClient, >( enforcer: &mut Enforcer, + network: bitcoin::Network, rpc_client: &RpcClient, zmq_addr_sequence: &str, shutdown_signal: Signal, // Would it be better to return a Some/None, indicating sync stoppage? @@ -546,7 +548,7 @@ where blocks_needed: LinkedHashSet::from_iter([best_block_hash]), enforcer, first_mempool_sequence: Some(mempool_sequence + 1), - mempool: Mempool::new(best_block_hash), + mempool: Mempool::new(network, best_block_hash), post_sync: PostSync::default(), request_queue, seq_message_queue, diff --git a/lib/mempool/sync/task.rs b/lib/mempool/sync/task.rs index e87c772..fddcd6b 100644 --- a/lib/mempool/sync/task.rs +++ b/lib/mempool/sync/task.rs @@ -7,7 +7,7 @@ use std::{ use bitcoin::{ consensus::Decodable, hashes::Hash as _, Amount, BlockHash, OutPoint, - Transaction, Txid, + Transaction, Txid, Weight, }; use educe::Educe; use futures::{future::BoxFuture, stream, StreamExt as _}; @@ -229,7 +229,13 @@ where } // TODO: figure out fee/or conflicts? - inner.mempool.insert(decoded, 0, imbl::OrdSet::new())?; + let unmodified_weight = decoded.weight(); + inner.mempool.insert( + decoded, + 0, + imbl::OrdSet::new(), + unmodified_weight, + )?; inserted_txs += 1; } @@ -360,11 +366,18 @@ where .accept_tx(tx, &input_txs) .map_err(cusf_enforcer::Error::AcceptTx)? { - cusf_enforcer::TxAcceptAction::Accept { conflicts_with } => { + cusf_enforcer::TxAcceptAction::Accept { + conflicts_with, + weight_tweak, + } => { + let modified_weight_wu = + tx.weight().to_wu().saturating_add_signed(weight_tweak); + let modified_weight = Weight::from_wu(modified_weight_wu); inner.mempool.insert( tx.clone(), fee_delta.to_sat(), conflicts_with.into(), + modified_weight, )?; tracing::trace!("added {txid} to mempool"); } @@ -535,6 +548,7 @@ where ref mut mempool, } = *inner_write; let mut conflicts = Conflicts::default(); + let mut weight_tweaks = Vec::new(); let rejected_txs = mempool .try_filter(true, |tx, mempool_inputs| { let mut tx_inputs = mempool_inputs.clone(); @@ -549,11 +563,15 @@ where match enforcer.accept_tx(tx, &tx_inputs)? { cusf_enforcer::TxAcceptAction::Accept { conflicts_with, + weight_tweak, } => { let txid = tx.compute_txid(); for conflict_txid in conflicts_with { conflicts.insert(txid, conflict_txid); } + if weight_tweak != 0 { + weight_tweaks.push((txid, weight_tweak)); + } Ok(true) } cusf_enforcer::TxAcceptAction::Reject => Ok(false), @@ -572,6 +590,9 @@ where .copied() .collect(); let () = mempool.add_conflicts(conflicts)?; + for (txid, weight_tweak) in weight_tweaks { + let () = mempool.tweak_weight(txid, weight_tweak)?; + } drop(inner_write); rejected_txs }; diff --git a/lib/server.rs b/lib/server.rs index e25429e..7ea6aa7 100644 --- a/lib/server.rs +++ b/lib/server.rs @@ -1,10 +1,10 @@ -use std::convert::Infallible; +use std::{collections::HashMap, convert::Infallible}; use async_trait::async_trait; use bitcoin::{ amount::CheckedSum, hashes::Hash as _, merkle_tree, script::PushBytesBuf, Amount, Block, BlockHash, Network, ScriptBuf, Transaction, TxOut, Txid, - WitnessMerkleNode, Wtxid, + Weight, WitnessMerkleNode, Wtxid, }; use bitcoin_jsonrpsee::client::{ BlockTemplate, BlockTemplateRequest, BlockTemplateTransaction, @@ -18,7 +18,7 @@ use thiserror::Error; use crate::{ cusf_block_producer::{self, CusfBlockProducer, InitialBlockTemplate}, - mempool::MempoolSync, + mempool::{self, MempoolSync}, }; #[rpc(client, server)] @@ -28,6 +28,14 @@ pub trait Rpc { &self, request: BlockTemplateRequest, ) -> RpcResult; + + /// Returns None if the block is invalid, otherwise the error code + /// describing why the block was rejected. + #[method(name = "submitblock")] + async fn submit_block( + &self, + block_hex: String, + ) -> RpcResult>; } #[derive(Debug, Error)] @@ -36,20 +44,24 @@ pub enum CreateServerError { SampleBlockTemplate, } -pub struct Server { +pub struct Server { coinbase_spk: ScriptBuf, mempool: MempoolSync, network: Network, network_info: NetworkInfo, + rpc_client: RpcClient, sample_block_template: BlockTemplate, + /// Map of block hashes to known targets for the next block + known_targets: parking_lot::RwLock>, } -impl Server { +impl Server { pub fn new( coinbase_spk: ScriptBuf, mempool: MempoolSync, network: Network, network_info: NetworkInfo, + rpc_client: RpcClient, sample_block_template: BlockTemplate, ) -> Result { if matches!( @@ -63,7 +75,9 @@ impl Server { mempool, network, network_info, + rpc_client, sample_block_template, + known_targets: parking_lot::RwLock::new(HashMap::new()), }) } } @@ -303,7 +317,8 @@ async fn block_txs( let mut mempool = mempool.clone(); tracing::debug!("Inserting prefix txs into cloned mempool"); for (tx, fee) in initial_block_template.prefix_txs.iter().cloned() { - match mempool.insert(tx, fee.to_sat(), Default::default()) { + let weight = tx.weight(); + match mempool.insert(tx, fee.to_sat(), Default::default(), weight) { Ok(_) | Err(crate::mempool::MempoolInsertError::TxAlreadyExists { .. @@ -362,16 +377,66 @@ async fn block_txs( })?; } tracing::debug!("Proposing txs for inclusion in block"); - let mempool_txs = mempool.propose_txs()?; + let coinbase_txouts_weight = { + let txouts_weight = match typewit::MakeTypeWitness::MAKE { + typewit::const_marker::BoolWit::True(wit) => { + let wit = wit.map(cusf_block_producer::CoinbaseTxouts); + wit.in_ref() + .to_right(&initial_block_template.coinbase_txouts) + .iter() + .map(|tx_out| tx_out.weight()) + .sum() + } + typewit::const_marker::BoolWit::False(_) => Weight::ZERO, + }; + const COINBASE_WITNESS_COMMITMENT_TXOUT_WEIGHT: Weight = { + let weight_wu = Weight::from_non_witness_data_size(Amount::SIZE as u64).to_wu() + // SPK weight + + Weight::from_non_witness_data_size(39).to_wu(); + Weight::from_wu(weight_wu) + }; + Weight::from_wu( + txouts_weight.to_wu() + + COINBASE_WITNESS_COMMITMENT_TXOUT_WEIGHT.to_wu(), + ) + }; + let prefix_txs_weight = initial_block_template + .prefix_txs + .iter() + .map(|(tx, _)| tx.weight()) + .sum(); + let initial_block_template_weight = + coinbase_txouts_weight + prefix_txs_weight; + let mempool_txs = mempool.propose_txs(Some( + mempool::MAX_USABLE_BLOCK_WEIGHT - initial_block_template_weight, + ))?; { - let proposed_txids: String = format!( - "[{}]", - mempool_txs - .iter() - .map(|tx| tx.txid.to_string()) - .collect::>() - .join(", ") - ); + let proposed_txids: String = { + use std::fmt::Write; + // Above this number of txs, truncate txids to save space in logs + const SHOW_FULL_TXIDS_MAX_TXS: usize = 10; + let mut proposed_txids = "[".to_owned(); + let n_txs = mempool_txs.len(); + for (idx, tx) in mempool_txs.iter().enumerate() { + if idx < SHOW_FULL_TXIDS_MAX_TXS { + write!(&mut proposed_txids, "{}", tx.txid).unwrap(); + } else { + let txid_bytes = tx.txid.as_byte_array(); + for byte in txid_bytes.iter().rev().take(4) { + write!(&mut proposed_txids, "{byte:02x}").unwrap(); + } + write!(&mut proposed_txids, "...").unwrap(); + for byte in txid_bytes[..4].iter().rev() { + write!(&mut proposed_txids, "{byte:02x}").unwrap(); + } + } + if idx + 1 < n_txs { + write!(&mut proposed_txids, ", ").unwrap(); + } + } + write!(&mut proposed_txids, "]").unwrap(); + proposed_txids + }; tracing::debug!(%proposed_txids, "Proposed txs for inclusion in block"); } initial_block_template @@ -435,9 +500,10 @@ async fn block_txs( } #[async_trait] -impl RpcServer for Server +impl RpcServer for Server where BP: CusfBlockProducer + Send + Sync + 'static, + RpcClient: bitcoin_jsonrpsee::client::MainClient + Send + Sync + 'static, { async fn get_block_template( &self, @@ -540,6 +606,24 @@ where let err = log_error(err); internal_error(err) })?; + let target = { + let known_target = + self.known_targets.read().get(&prev_blockhash).copied(); + if let Some(target) = known_target { + target + } else if let Some(target) = target { + target + } else { + let mining_info = self + .rpc_client + .get_mining_info() + .await + .map_err(internal_error)?; + let target = mining_info.next.target; + self.known_targets.write().insert(prev_blockhash, target); + target + } + }; let coinbase_txn_or_value = if let Some(coinbase_txn) = coinbase_txn { let fee = coinbase_txn .output @@ -566,10 +650,15 @@ where } else { coinbase_txn_or_value.clone() }; - let mintime = std::cmp::max( - tip_block_mediantime as u64 + 1, - current_time_adjusted, - ); + let mintime = + // TODO: calculate this correctly + /* + std::cmp::max( + tip_block_mediantime as u64 + 1, + current_time_adjusted, + ) + */ + tip_block_mediantime as u64 + 1; let height = tip_block_height + 1; let res = BlockTemplate { version, @@ -581,7 +670,7 @@ where coinbase_aux: coinbase_aux.clone(), coinbase_txn_or_value, long_poll_id: None, - target: target.to_le_bytes(), + target: target.to_be_bytes(), mintime, mutable: mutable.clone(), nonce_range: NONCE_RANGE, @@ -596,4 +685,17 @@ where }; Ok(res) } + + async fn submit_block( + &self, + block_hex: String, + ) -> RpcResult> { + self.rpc_client + .submit_block(block_hex) + .await + .map_err(|err| match err { + jsonrpsee::core::ClientError::Call(err) => err, + err => internal_error(err), + }) + } }