From 9977c0b20a00e59a135aad3b4c91d3e1d1e4d68a Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Tue, 12 Aug 2025 13:23:12 +0100 Subject: [PATCH] Nuke the crate structure The `p2p` related changes are moved into a separate repository to move quickly on iterations in that API. Two of these crates are small but stable in the accumulator and "peers" crate. Removing the `utxo_verifier` as a more interesting binary should be developed in the coming months. --- Cargo.toml | 4 +- accumulator/tests/test.rs | 2 +- contrib/{ => data}/bitcoin_headers.sqlite | Bin contrib/{ => data}/signet_headers.sqlite | Bin contrib/{ => data}/signet_outpoints.sqlite | Bin p2p/Cargo.toml | 40 -- p2p/README.md | 11 - p2p/examples/feeler.rs | 36 - p2p/examples/update_accumulator.rs | 112 --- p2p/src/lib.rs | 791 --------------------- p2p/src/net.rs | 247 ------- p2p/src/tokio_ext.rs | 314 -------- p2p/src/validation.rs | 25 - p2p/tests/tests.rs | 107 --- utxo_verifier/Cargo.toml | 14 - utxo_verifier/README.md | 24 - utxo_verifier/src/hasher.rs | 35 - utxo_verifier/src/job.rs | 172 ----- utxo_verifier/src/loader.rs | 62 -- utxo_verifier/src/main.rs | 100 --- utxo_verifier/utxo_to_sqlite.py | 195 ----- 21 files changed, 3 insertions(+), 2288 deletions(-) rename contrib/{ => data}/bitcoin_headers.sqlite (100%) rename contrib/{ => data}/signet_headers.sqlite (100%) rename contrib/{ => data}/signet_outpoints.sqlite (100%) delete mode 100644 p2p/Cargo.toml delete mode 100644 p2p/README.md delete mode 100644 p2p/examples/feeler.rs delete mode 100644 p2p/examples/update_accumulator.rs delete mode 100644 p2p/src/lib.rs delete mode 100644 p2p/src/net.rs delete mode 100644 p2p/src/tokio_ext.rs delete mode 100644 p2p/src/validation.rs delete mode 100644 p2p/tests/tests.rs delete mode 100644 utxo_verifier/Cargo.toml delete mode 100644 utxo_verifier/README.md delete mode 100644 utxo_verifier/src/hasher.rs delete mode 100644 utxo_verifier/src/job.rs delete mode 100644 utxo_verifier/src/loader.rs delete mode 100644 utxo_verifier/src/main.rs delete mode 100755 utxo_verifier/utxo_to_sqlite.py diff --git a/Cargo.toml b/Cargo.toml index e83c6df..337bf85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["accumulator", "p2p", "peers", "utxo_verifier"] -default-members = ["accumulator", "p2p", "peers", "utxo_verifier"] +members = ["accumulator", "peers"] +default-members = ["accumulator", "peers"] resolver = "3" [workspace.dependencies] diff --git a/accumulator/tests/test.rs b/accumulator/tests/test.rs index 2b46ee6..5a537c5 100644 --- a/accumulator/tests/test.rs +++ b/accumulator/tests/test.rs @@ -7,7 +7,7 @@ const SELECT_STMT: &str = "SELECT txid, vout FROM utxos"; #[test] fn test_static_utxo_set() { let mut acc = Accumulator::new(); - let conn = Connection::open("../contrib/signet_outpoints.sqlite").unwrap(); + let conn = Connection::open("../contrib/data/signet_outpoints.sqlite").unwrap(); let mut stmt = conn.prepare(SELECT_STMT).unwrap(); let mut rows = stmt.query([]).unwrap(); while let Some(row) = rows.next().unwrap() { diff --git a/contrib/bitcoin_headers.sqlite b/contrib/data/bitcoin_headers.sqlite similarity index 100% rename from contrib/bitcoin_headers.sqlite rename to contrib/data/bitcoin_headers.sqlite diff --git a/contrib/signet_headers.sqlite b/contrib/data/signet_headers.sqlite similarity index 100% rename from contrib/signet_headers.sqlite rename to contrib/data/signet_headers.sqlite diff --git a/contrib/signet_outpoints.sqlite b/contrib/data/signet_outpoints.sqlite similarity index 100% rename from contrib/signet_outpoints.sqlite rename to contrib/data/signet_outpoints.sqlite diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml deleted file mode 100644 index f978c4a..0000000 --- a/p2p/Cargo.toml +++ /dev/null @@ -1,40 +0,0 @@ -[package] -name = "swiftsync-p2p" -authors = ["Rob "] -version = "0.1.0" -description = "Bitcoin peer-to-peer connection tools" -edition = "2021" -keywords = ["bitcoin", "cryptography", "network", "peer-to-peer"] -categories = ["cryptography::cryptocurrencies"] -rust-version = "1.75.0" - -[dependencies] -bitcoin = { workspace = true, features = ["rand-std"] } -p2p = { package = "bitcoin-p2p-messages", git = "https://github.com/rust-bitcoin/rust-bitcoin", rev = "2bb9bb6bc99ba07ed3d543a512ec3d2a9462770d" } -tokio = { version = "1", default-features = false, optional = true, features = [ - "sync", - "io-util", - "time", - "net", -]} - -[dev-dependencies] -corepc-node = { version = "0.8.0", default-features = false, features = [ - "29_0", "download" -] } -tokio = { version = "1", default-features = false, features = ["full"] } -tracing = "0.1" -tracing-subscriber = "0.3" - -accumulator = { path = "../accumulator/" } -peers = { path = "../peers/", default-features = true } - -[features] -default = ["tokio"] -tokio = ["dep:tokio"] - -[[example]] -name = "update_accumulator" - -[[example]] -name = "feeler" diff --git a/p2p/README.md b/p2p/README.md deleted file mode 100644 index 827b554..0000000 --- a/p2p/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# Domain Specific Bitcoin Peer-to-Peer Crate - -This crate is intended to facilitate connections between two bitcoin nodes. Many messages exchanged over the bitcoin network are involved in version/feature negotiation. These message exchanges are abstracted away by the types of this crate, so users can focus on the messages that are important for the implementation. When reading messages, some simple validation is performed to omit spam or malformed messages. Writing invalid messages is also made difficult. Through the use of extension traits, one can read/write protocol messages directly to a TCP stream. - -## Example - -Try the example, which fetches a block and updates a SwiftSync accumulator: -``` -cargo run --example update_accumulator --release -``` - diff --git a/p2p/examples/feeler.rs b/p2p/examples/feeler.rs deleted file mode 100644 index 06c6ddd..0000000 --- a/p2p/examples/feeler.rs +++ /dev/null @@ -1,36 +0,0 @@ -use std::{net::SocketAddr, time::Duration}; - -use bitcoin::{ - secp256k1::rand::{seq::SliceRandom, thread_rng}, - Network, -}; -use peers::{dns::DnsQuery, PortExt, SeedsExt}; -use swiftsync_p2p::{net::ConnectionExt, ConnectionBuilder}; - -const NETWORK: Network = Network::Signet; - -fn main() { - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber).unwrap(); - tracing::info!("Finding a peer with DNS"); - let recommended_seed = NETWORK.dns_seeds().first().copied().unwrap(); - let dns = DnsQuery::new_cloudflare(recommended_seed).lookup().unwrap(); - let any = dns.choose(&mut thread_rng()).copied().unwrap(); - let socket_addr = SocketAddr::new(any, NETWORK.port()); - tracing::info!("Attempting a connection"); - let connection = ConnectionBuilder::new() - .set_user_agent("/bitcoin-feeler:0.1.0".to_string()) - .connection_timeout(Duration::from_millis(3500)) - .change_network(NETWORK) - .open_feeler(socket_addr); - match connection { - Ok(f) => { - tracing::info!( - "Connection successful: Advertised protocol version {}, Adveristed services {}", - f.protocol_version.0, - f.services - ); - } - Err(e) => tracing::warn!("Connection failed {e:?}"), - } -} diff --git a/p2p/examples/update_accumulator.rs b/p2p/examples/update_accumulator.rs deleted file mode 100644 index d564e6a..0000000 --- a/p2p/examples/update_accumulator.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - time::Instant, -}; - -use accumulator::Accumulator; -use bitcoin::{ - block::BlockUncheckedExt, - secp256k1::rand::{seq::SliceRandom, thread_rng}, - BlockHash, Network, OutPoint, -}; -use p2p::{ - message::NetworkMessage, - message_blockdata::{GetBlocksMessage, Inventory}, - ServiceFlags, -}; -use peers::{ - dns::{DnsQuery, TokioDnsExt}, - PortExt, -}; -use swiftsync_p2p::{ - tokio_ext::{TokioConnectionExt, TokioReadNetworkMessageExt, TokioWriteNetworkMessageExt}, - ConnectionBuilder, -}; - -const DNS_SEED: &str = "seed.bitcoin.sprovoost.nl"; -const NETWORK: Network = Network::Bitcoin; -const CLOUDFLARE: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 53); -const START_HEIGHT: i32 = 900_000; - -#[tokio::main] -async fn main() { - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber).unwrap(); - - let mut acc = Accumulator::new(); - let locator_hash = "000000000000000000010538edbfd2d5b809a33dd83f284aeea41c6d0d96968a" - .parse::() - .unwrap(); - let zero = BlockHash::from_byte_array([0u8; 32]); - tracing::info!("Configuring connection requirements"); - let connection_builder = ConnectionBuilder::new() - .change_network(NETWORK) - .add_start_height(START_HEIGHT) - .set_user_agent("/example-accumulator:0.1.0".to_string()) - .no_cmpct_blocks() - .announce_by_inv() - .their_services_expected(ServiceFlags::NETWORK); - tracing::info!("Querying DNS for peers"); - let dns = DnsQuery::new(DNS_SEED, CLOUDFLARE) - .lookup_async() - .await - .unwrap(); - tracing::info!("Connecting to the first result"); - let first = dns.choose(&mut thread_rng()).unwrap(); - let peer = SocketAddr::new(*first, NETWORK.port()); - let (mut stream, mut ctx) = connection_builder.open_connection(peer).await.unwrap(); - tracing::info!("Completed version handshake"); - let get_blocks_request = GetBlocksMessage::new(vec![locator_hash], zero); - let message = NetworkMessage::GetBlocks(get_blocks_request); - tracing::info!("Requesting blocks"); - stream.write_message(message, &mut ctx).await.unwrap(); - tracing::info!("Waiting for response"); - loop { - let response = stream.read_message(&mut ctx).await.unwrap(); - if let Some(message) = response { - match message { - NetworkMessage::Ping(nonce) => stream - .write_message(NetworkMessage::Pong(nonce), &mut ctx) - .await - .unwrap(), - NetworkMessage::Inv(data) => { - if data - .0 - .iter() - .any(|inv| matches!(inv, Inventory::Block(_) | Inventory::WitnessBlock(_))) - { - let getdata = NetworkMessage::GetData(data); - stream.write_message(getdata, &mut ctx).await.unwrap(); - } - } - NetworkMessage::Block(block) => { - let checked = block.validate().unwrap(); - let hash = checked.block_hash(); - tracing::info!("Validated block: {hash}"); - let now = Instant::now(); - tracing::info!("Updating the accumulator"); - for tx in checked.transactions() { - for input in &tx.input { - let outpoint = input.previous_output; - acc.spend(outpoint); - } - let txid = tx.compute_txid(); - for ind in 0..tx.output.len() { - let outpoint = OutPoint { - txid, - vout: ind as u32, - }; - acc.add(outpoint); - } - } - tracing::info!( - "Updated accumulator in {} milliseconds", - now.elapsed().as_millis() - ); - return; - } - other => tracing::info!("{}", other.cmd()), - } - } - } -} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs deleted file mode 100644 index 2eaf25b..0000000 --- a/p2p/src/lib.rs +++ /dev/null @@ -1,791 +0,0 @@ -use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, -}; - -use bitcoin::{consensus, FeeRate, Network}; -use p2p::{ - message::{CommandString, NetworkMessage, RawNetworkMessage}, - message_network::VersionMessage, - Address, Magic, ServiceFlags, -}; -use validation::ValidationExt; - -/// Extension traits for `std` networking tools. -pub mod net; -/// Extension traits for use with the `tokio` asynchronous runtime framework. -#[cfg(feature = "tokio")] -pub mod tokio_ext; - -mod validation; - -/// The maximum network message size in bytes. -pub const MAX_MESSAGE_SIZE: u32 = 1024 * 1024 * 32; -/// The default user agent field when sending a `version` message. -pub const DEFAULT_USER_AGENT: &str = "/swiftsync:0.1.0/"; -const LOCAL_HOST: Ipv4Addr = Ipv4Addr::new(0, 0, 0, 0); -const UNREACHABLE: SocketAddr = SocketAddr::V4(SocketAddrV4::new(LOCAL_HOST, 0)); - -/// A version of the Bitcoin peer-to-peer messages. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, std::hash::Hash)] -pub struct ProtocolVerison(pub u32); - -impl ProtocolVerison { - /// Support for relaying transactions by WTXID - pub const WTXID_RELAY: ProtocolVerison = ProtocolVerison(70016); - /// Invalid compact blocks are not a ban - pub const NO_BAN_CMPCT: ProtocolVerison = ProtocolVerison(70015); - /// Compact block message support - pub const CMPCT_BLOCKS: ProtocolVerison = ProtocolVerison(70014); - /// Support the `feefilter` message - pub const FEE_FILTER: ProtocolVerison = ProtocolVerison(70013); - /// Support `sendheaders` message to advertise new blocks with `header` messages - pub const SEND_HEADERS: ProtocolVerison = ProtocolVerison(70012); - /// Support NODE_BLOOM messages and do not support bloom filter messages if not set - pub const NODE_BLOOM: ProtocolVerison = ProtocolVerison(70011); - /// Support `reject` messages - pub const REJECT: ProtocolVerison = ProtocolVerison(70002); - /// Support bloom filter messages - pub const BLOOM_FILTERS: ProtocolVerison = ProtocolVerison(70001); - /// Support `mempool` messages - pub const MEMPOOL: ProtocolVerison = ProtocolVerison(60002); - /// Support `ping` and `pong` messages - pub const PING_PONG: ProtocolVerison = ProtocolVerison(60001); -} - -/// The context for the connection. This includes data like the current cipher state, their -/// services offered, their fee filter, last message time, and more. -#[derive(Debug)] -pub struct ConnectionContext { - read_ctx: ReadContext, - write_ctx: WriteContext, -} - -impl ConnectionContext { - fn new( - write_half: WriteHalf, - read_half: ReadHalf, - negotiation: Negotiation, - their_services: ServiceFlags, - their_version: ProtocolVerison, - ) -> Self { - let read_ctx = ReadContext { - read_half, - negotiation, - fee_filter: FeeRate::BROADCAST_MIN, - last_message: Instant::now(), - final_alert: false, - addrs_received: 0, - }; - let write_ctx = WriteContext { - write_half, - negotiation, - their_services, - their_protocol_verison: their_version, - }; - Self { - read_ctx, - write_ctx, - } - } - - /// Split the connection context into reading and writing halves. This is particularly useful - /// if your program is writing messages to a peer on a different task or thread than the one - /// receiving messages. - pub fn into_split(self) -> (ReadContext, WriteContext) { - (self.read_ctx, self.write_ctx) - } - - /// The number of peer-to-peer addresses gossiped by this peer during this session. - pub fn addrs_received(&self) -> usize { - self.read_ctx.addrs_received - } - - /// The time of the last message received. - pub fn last_message(&self) -> Instant { - self.read_ctx.last_message - } - - /// The minimum fee rate required to relay a transaction to this peer. - pub fn fee_filter(&self) -> FeeRate { - self.read_ctx.fee_filter - } -} - -impl AsMut for ConnectionContext { - fn as_mut(&mut self) -> &mut ReadContext { - &mut self.read_ctx - } -} - -impl AsMut for ConnectionContext { - fn as_mut(&mut self) -> &mut WriteContext { - &mut self.write_ctx - } -} - -/// The context when reading a message from this peer. -#[derive(Debug)] -pub struct ReadContext { - read_half: ReadHalf, - negotiation: Negotiation, - fee_filter: FeeRate, - final_alert: bool, - last_message: Instant, - addrs_received: usize, -} - -impl ReadContext { - /// The number of peer-to-peer addresses gossiped by this peer during this session. - pub fn addrs_received(&self) -> usize { - self.addrs_received - } - - /// The time of the last message received. - pub fn last_message(&self) -> Instant { - self.last_message - } - - /// The minimum fee rate required to relay a transaction to this peer. - pub fn fee_filter(&self) -> FeeRate { - self.fee_filter - } - - fn ok_to_recv_message(&self, message: &NetworkMessage) -> bool { - if matches!( - message, - NetworkMessage::FilterClear - | NetworkMessage::FilterAdd(_) - | NetworkMessage::FilterLoad(_) - | NetworkMessage::WtxidRelay - | NetworkMessage::SendAddrV2 - | NetworkMessage::MemPool - | NetworkMessage::Verack - | NetworkMessage::Version(_) - ) { - return false; - } - if matches!(message, NetworkMessage::Alert(_)) { - return !self.final_alert; - } - true - } - - fn is_valid(&self, message: &NetworkMessage) -> bool { - match &message { - NetworkMessage::FeeFilter(f) => *f > 0, - NetworkMessage::Headers(h) => h.is_valid(), - NetworkMessage::GetData(r) => r.0.is_valid(), - NetworkMessage::Inv(r) => r.0.is_valid(), - _ => true, - } - } - - fn update_metadata(&mut self, message: &NetworkMessage) { - self.last_message = Instant::now(); - match &message { - NetworkMessage::FeeFilter(f) => { - let fee_rate = FeeRate::from_sat_per_kwu(*f as u32 / 4); - self.fee_filter = fee_rate; - } - NetworkMessage::Alert(_) => self.final_alert = true, - NetworkMessage::SendHeaders => { - self.negotiation.send_headers.them = true; - } - NetworkMessage::Addr(payload) => self.addrs_received += payload.0.len(), - NetworkMessage::AddrV2(payload) => self.addrs_received += payload.0.len(), - _ => (), - } - } -} - -/// The context when writing a message to this peer. The context will reject messages that should -/// not be sent, in particular any messages that should be exchanged during the version handshake. -/// -/// Additional messages that may be rejected are deprecated messages, or those that the peer has -/// not advertised support for. -#[derive(Debug)] -pub struct WriteContext { - write_half: WriteHalf, - negotiation: Negotiation, - their_services: ServiceFlags, - their_protocol_verison: ProtocolVerison, -} - -impl WriteContext { - fn ok_to_send(&self, message: &NetworkMessage) -> bool { - if matches!( - message, - NetworkMessage::FilterClear - | NetworkMessage::FilterAdd(_) - | NetworkMessage::FilterLoad(_) - | NetworkMessage::Alert(_) - | NetworkMessage::WtxidRelay - | NetworkMessage::SendAddrV2 - | NetworkMessage::SendCmpct(_) - | NetworkMessage::SendHeaders - | NetworkMessage::MemPool - | NetworkMessage::Verack - | NetworkMessage::Version(_) - ) { - return false; - } - if matches!(message, NetworkMessage::Addr(_)) && self.negotiation.addrv2.agree() { - return false; - } - if matches!( - message, - NetworkMessage::BlockTxn(_) | NetworkMessage::CmpctBlock(_) - ) && !self.negotiation.cmpct_block.agree() - { - return false; - } - if matches!( - message, - NetworkMessage::GetCFilters(_) - | NetworkMessage::GetCFCheckpt(_) - | NetworkMessage::GetCFHeaders(_) - ) && !self.their_services.has(ServiceFlags::COMPACT_FILTERS) - { - return false; - } - true - } -} - -/// Build a connection with a bitcoin peer based on the desired preferences. -/// -/// The state of the connection builder is defined with the latest protocol version. You must -/// downgrade the version you will accept or your offered version with the methods on the builder. -#[derive(Debug)] -pub struct ConnectionBuilder { - network: Network, - our_ip: SocketAddr, - offered_services: ServiceFlags, - their_services: ServiceFlags, - our_version: ProtocolVerison, - their_version: ProtocolVerison, - offer: Offered, - start_height: i32, - user_agent: String, - tcp_timeout: Duration, -} - -impl ConnectionBuilder { - /// Start a new connection builder. Note that the default network is `Bitcoin`. - pub fn new() -> Self { - Self { - network: Network::Bitcoin, - our_ip: UNREACHABLE, - offered_services: ServiceFlags::NONE, - their_services: ServiceFlags::NONE, - our_version: ProtocolVerison::WTXID_RELAY, - their_version: ProtocolVerison::WTXID_RELAY, - offer: Offered::default(), - start_height: 0, - user_agent: DEFAULT_USER_AGENT.to_string(), - tcp_timeout: Duration::from_secs(2), - } - } - - /// The services you will offer the peer. The default is `ServiceFlags::NONE`. - pub fn offered_services(self, us: ServiceFlags) -> Self { - Self { - offered_services: us, - ..self - } - } - - /// The services you expect the node to offer, for example `ServiceFlags::NETWORK`. - pub fn their_services_expected(self, them: ServiceFlags) -> Self { - Self { - their_services: them, - ..self - } - } - - /// Downgrade your advertised version. - pub fn downgrade_to_version(self, us: ProtocolVerison) -> Self { - Self { - our_version: us, - ..self - } - } - - /// Accept a minimum version. - pub fn accept_minimum_version(self, them: ProtocolVerison) -> Self { - Self { - their_version: them, - ..self - } - } - - /// Advertise a starting height of your local chain. - pub fn add_start_height(self, start_height: i32) -> Self { - Self { - start_height, - ..self - } - } - - /// Add a timeout to the initial TCP connection. - pub fn connection_timeout(self, timeout: Duration) -> Self { - Self { - tcp_timeout: timeout, - ..self - } - } - - /// Set the user agent sent as part of your version message. - pub fn set_user_agent(self, user_agent: String) -> Self { - Self { user_agent, ..self } - } - - /// Change the network of preference. - pub fn change_network(self, network: Network) -> Self { - Self { network, ..self } - } - - /// Do not advertise support for `sendcmpct`. - pub fn no_cmpct_blocks(mut self) -> Self { - self.offer.cmpct_block = false; - Self { ..self } - } - - /// Prefer that peers advertise new blocks by an `inv` message. - /// - /// Otherwise, a `headers` message will be used to share new blocks. - pub fn announce_by_inv(mut self) -> Self { - self.offer.send_headers = false; - Self { ..self } - } - - /// Set the local IP address sent in your version message. The default is `0.0.0.0:0` - pub fn set_local_ip(self, us: SocketAddr) -> Self { - Self { our_ip: us, ..self } - } -} - -impl Default for ConnectionBuilder { - fn default() -> Self { - Self::new() - } -} - -#[derive(Debug)] -enum WriteHalf { - V1(Magic), -} - -impl WriteHalf { - fn serialize_message(&mut self, message: NetworkMessage) -> Vec { - match self { - Self::V1(magic) => { - let raw = RawNetworkMessage::new(*magic, message); - consensus::serialize(&raw) - } - } - } -} - -#[derive(Debug)] -enum ReadHalf { - V1(Magic), -} - -#[derive(Debug, Clone, Copy, Default)] -struct Negotiation { - wtxid_relay: TwoParty, - addrv2: TwoParty, - cmpct_block: TwoParty, - send_headers: TwoParty, -} - -#[derive(Debug, Clone, Copy, Default)] -struct TwoParty { - us: bool, - them: bool, -} - -impl TwoParty { - fn agree(&self) -> bool { - self.us && self.them - } -} - -#[derive(Debug)] -struct Offered { - wtxid_relay: bool, - addrv2: bool, - cmpct_block: bool, - send_headers: bool, -} - -impl Default for Offered { - fn default() -> Self { - Self { - wtxid_relay: true, - addrv2: true, - cmpct_block: true, - send_headers: true, - } - } -} - -#[derive(Debug, Clone, Copy)] -pub struct Feeler { - pub services: ServiceFlags, - pub protocol_version: ProtocolVerison, -} - -pub(crate) struct MessageHeader { - magic: Magic, - _command: CommandString, - length: u32, - _checksum: u32, -} - -impl consensus::Decodable for MessageHeader { - fn consensus_decode( - reader: &mut R, - ) -> Result { - let magic = Magic::consensus_decode(reader)?; - let _command = CommandString::consensus_decode(reader)?; - let length = u32::consensus_decode(reader)?; - let _checksum = u32::consensus_decode(reader)?; - Ok(Self { - magic, - _command, - length, - _checksum, - }) - } -} - -fn make_version( - version: ProtocolVerison, - our_services: ServiceFlags, - their_services: ServiceFlags, - our_ip: SocketAddr, - start_height: i32, - user_agent: String, - nonce: u64, -) -> VersionMessage { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() as i64; - let them = Address::new(&UNREACHABLE, their_services); - let us = Address::new(&our_ip, our_services); - VersionMessage { - version: version.0, - services: our_services, - timestamp: now, - receiver: them, - sender: us, - nonce, - user_agent, - start_height, - relay: false, - } -} - -#[allow(clippy::result_large_err)] -fn interpret_first_message( - message: NetworkMessage, - nonce: u64, - their_expected_version: ProtocolVerison, - their_expected_services: ServiceFlags, -) -> Result<(ProtocolVerison, ServiceFlags), HandshakeError> { - if let NetworkMessage::Version(version) = message { - if version.nonce.eq(&nonce) { - return Err(HandshakeError::ConnectedToSelf); - } - if version.version < their_expected_version.0 { - return Err(HandshakeError::TooLowVersion(ProtocolVerison( - version.version, - ))); - } - if !version.services.has(their_expected_services) { - return Err(HandshakeError::UnsupportedFeature); - } - Ok((ProtocolVerison(version.version), version.services)) - } else { - Err(HandshakeError::IrrelevantMessage(message)) - } -} - -/// Errors when parsing a peer-to-peer message. -#[derive(Debug)] -pub enum ParseMessageError { - /// The magic received does not match our network. - UnexpectedMagic { want: Magic, got: Magic }, - /// The reported size of the inbound message is absurdly large. - AbsurdSize { - /// The size of the message in bytes. - message_size: u32, - }, - /// Invalid consensus encoding. - Consensus(consensus::ParseError), - /// Invalid deserialization. - Deserialize(consensus::encode::DeserializeError), - /// The message is malformed in some way. For example the block headers advertised do not connect to - /// each other. - Malformed, -} - -impl From for ParseMessageError { - fn from(value: bitcoin::consensus::ParseError) -> Self { - Self::Consensus(value) - } -} - -impl From for ParseMessageError { - fn from(value: consensus::encode::DeserializeError) -> Self { - Self::Deserialize(value) - } -} - -impl std::fmt::Display for ParseMessageError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Deserialize(d) => write!(f, "{d}"), - Self::Consensus(c) => write!(f, "{c}"), - Self::AbsurdSize { message_size } => write!(f, "absurd message size: {message_size}"), - Self::UnexpectedMagic { want, got } => write!(f, "expected magic: {want}, got: {got}"), - Self::Malformed => write!(f, "malformed message"), - } - } -} - -impl std::error::Error for ParseMessageError {} - -/// A protocol handshake error. -#[derive(Debug, Clone)] -pub enum HandshakeError { - /// Their version is too low for the configured preferences. - TooLowVersion(ProtocolVerison), - /// Some message was sent before the handshake completed. - IrrelevantMessage(NetworkMessage), - /// This is a connection to self. - ConnectedToSelf, - /// The peer sent a decoy as the first protocol message. - BadDecoy, - /// The node does not support a feature we require. - UnsupportedFeature, - /// The TCP connection timed out. - Timeout, -} - -impl std::fmt::Display for HandshakeError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Timeout => write!(f, "connection timeout"), - Self::IrrelevantMessage(m) => { - write!(f, "unexpected message during handshake: {}", m.cmd()) - } - Self::ConnectedToSelf => write!(f, "accidental connection to self"), - Self::BadDecoy => write!(f, "expected a message but got a decoy"), - Self::UnsupportedFeature => write!( - f, - "a feature we require is not supported by the connection." - ), - Self::TooLowVersion(version) => { - write!(f, "the remote peer had a too-low version: {}", version.0) - } - } - } -} - -impl std::error::Error for HandshakeError {} - -#[macro_export] -macro_rules! define_read_message_logic { - ($awaiter:ident, $reader:expr, $magic:expr) => {{ - macro_rules! read { - ($buffer:expr) => { - $awaiter!($reader.read_exact($buffer)) - }; - } - - let mut message_buf = vec![0_u8; 24]; - read!(&mut message_buf)?; - let header: $crate::MessageHeader = consensus::deserialize_partial(&message_buf) - .map_err(ParseMessageError::Consensus)? - .0; - if header.magic != $magic { - return Err($crate::ParseMessageError::UnexpectedMagic { - want: header.magic, - got: header.magic, - } - .into()); - } - if header.length > $crate::MAX_MESSAGE_SIZE { - return Err(ParseMessageError::AbsurdSize { - message_size: header.length, - } - .into()); - } - let mut contents_buf = vec![0_u8; header.length as usize]; - read!(&mut contents_buf)?; - message_buf.extend_from_slice(&contents_buf); - let message: RawNetworkMessage = - consensus::deserialize(&message_buf).map_err(ParseMessageError::Deserialize)?; - Ok(Some(message.into_payload())) - }}; -} - -#[macro_export] -macro_rules! define_version_message_logic { - ($awaiter:ident, $reader:expr, $conn:expr) => {{ - macro_rules! write_message { - ($line:expr) => { - $awaiter!($line) - }; - } - - let mut negotiation = Negotiation::default(); - let magic = Magic::from_params($conn.network).expect("unknown network"); - let mut write_half = WriteHalf::V1(magic); - let mut read_half = ReadHalf::V1(magic); - let nonce = rand::random(); - let version = NetworkMessage::Version(make_version( - $conn.our_version, - $conn.offered_services, - $conn.their_services, - $conn.our_ip, - $conn.start_height, - $conn.user_agent, - nonce, - )); - write_message!(write_message(&mut $reader, version, &mut write_half))?; - let version = $awaiter!(read_half.read_message(&mut $reader))?; - match version { - Some(version) => { - let (protocol, services) = interpret_first_message( - version, - nonce, - $conn.their_version, - $conn.their_services, - ) - .map_err(ConnectionError::Protocol)?; - $conn.their_services = services; - $conn.their_version = protocol; - } - None => { - return Err(ConnectionError::Protocol(HandshakeError::BadDecoy)); - } - } - if $conn.offer.addrv2 { - write_message!(write_message( - &mut $reader, - NetworkMessage::SendAddrV2, - &mut write_half - ))?; - } - if $conn.offer.wtxid_relay { - write_message!(write_message( - &mut $reader, - NetworkMessage::WtxidRelay, - &mut write_half - ))?; - } - loop { - let message = $awaiter!(read_half.read_message(&mut $reader))?; - match message { - Some(message) => match message { - NetworkMessage::Verack => break, - NetworkMessage::WtxidRelay => negotiation.wtxid_relay.them = true, - NetworkMessage::SendHeaders => negotiation.send_headers.them = true, - NetworkMessage::SendAddrV2 => negotiation.addrv2.them = true, - NetworkMessage::SendCmpct(_) => negotiation.cmpct_block.them = true, - other => { - return Err(ConnectionError::Protocol( - HandshakeError::IrrelevantMessage(other), - )); - } - }, - None => continue, - } - } - write_message!(write_message( - &mut $reader, - NetworkMessage::Verack, - &mut write_half - ))?; - if $conn.offer.cmpct_block { - write_message!(write_message( - &mut $reader, - NetworkMessage::SendCmpct(SendCmpct { - version: 0x02, - send_compact: true, - }), - &mut write_half, - ))?; - } - if $conn.offer.send_headers { - write_message!(write_message( - &mut $reader, - NetworkMessage::SendHeaders, - &mut write_half, - ))?; - } - let context = ConnectionContext::new( - write_half, - read_half, - negotiation, - $conn.their_services, - $conn.their_version, - ); - Ok(($reader, context)) - }}; -} - -#[cfg(feature = "tokio")] -macro_rules! async_awaiter { - ($e:expr) => { - $e.await - }; -} - -macro_rules! blocking_awaiter { - ($e:expr) => { - $e - }; -} - -#[cfg(feature = "tokio")] -macro_rules! read_message_async { - ($reader:expr, $magic:expr) => { - $crate::define_read_message_logic!(async_awaiter, $reader, $magic) - }; -} - -macro_rules! read_message_blocking { - ($reader:expr, $magic:expr) => { - $crate::define_read_message_logic!(blocking_awaiter, $reader, $magic) - }; -} - -macro_rules! version_handshake_blocking { - ($reader:expr, $conn:ident) => { - $crate::define_version_message_logic!(blocking_awaiter, $reader, $conn) - }; -} - -#[cfg(feature = "tokio")] -macro_rules! version_handshake_async { - ($reader:expr, $conn:ident) => { - $crate::define_version_message_logic!(async_awaiter, $reader, $conn) - }; -} - -#[cfg(feature = "tokio")] -pub(crate) use async_awaiter; -pub(crate) use blocking_awaiter; -#[cfg(feature = "tokio")] -pub(crate) use read_message_async; -pub(crate) use read_message_blocking; -#[cfg(feature = "tokio")] -pub(crate) use version_handshake_async; -pub(crate) use version_handshake_blocking; diff --git a/p2p/src/net.rs b/p2p/src/net.rs deleted file mode 100644 index d844d85..0000000 --- a/p2p/src/net.rs +++ /dev/null @@ -1,247 +0,0 @@ -use std::{ - io::{self, Read, Write}, - net::{SocketAddr, TcpStream}, -}; - -use bitcoin::consensus; -use bitcoin::secp256k1::rand; -use p2p::message::NetworkMessage; -use p2p::message::RawNetworkMessage; -use p2p::message_compact_blocks::SendCmpct; -use p2p::Magic; - -use crate::{ - blocking_awaiter, interpret_first_message, make_version, version_handshake_blocking, - ConnectionBuilder, ConnectionContext, Feeler, HandshakeError, Negotiation, ParseMessageError, - ReadContext, ReadHalf, WriteContext, WriteHalf, -}; - -/// Open a connection to a potential peer. -#[allow(clippy::result_large_err)] -pub trait ConnectionExt { - /// Create a connection to the socket address - fn open_connection( - self, - to: impl Into, - ) -> Result<(TcpStream, ConnectionContext), ConnectionError>; - - /// Start a handshake with a pre-existing connection. Normally used after establishing a Socks5 - /// proxy connection. - fn start_handshake( - self, - tcp_stream: TcpStream, - ) -> Result<(TcpStream, ConnectionContext), ConnectionError>; - - /// Open a "feeler" connection to test if the peer is online and update their services and - /// protocol version. - fn open_feeler(self, to: impl Into) -> Result; -} - -impl ConnectionExt for ConnectionBuilder { - fn open_connection( - mut self, - to: impl Into, - ) -> Result<(TcpStream, ConnectionContext), ConnectionError> { - let socket_addr = to.into(); - let mut tcp_stream = TcpStream::connect_timeout(&socket_addr, self.tcp_timeout)?; - // Make V2 connection - version_handshake_blocking!(tcp_stream, self) - } - - fn start_handshake( - mut self, - mut tcp_stream: TcpStream, - ) -> Result<(TcpStream, ConnectionContext), ConnectionError> { - version_handshake_blocking!(tcp_stream, self) - } - - fn open_feeler(mut self, to: impl Into) -> Result { - let socket_addr = to.into(); - let mut tcp_stream = TcpStream::connect_timeout(&socket_addr, self.tcp_timeout)?; - let res: Result<(TcpStream, ConnectionContext), ConnectionError> = - version_handshake_blocking!(tcp_stream, self); - let (_, ctx) = res?; - let (_, wtx) = ctx.into_split(); - let services = wtx.their_services; - let protocol_version = wtx.their_protocol_verison; - Ok(Feeler { - services, - protocol_version, - }) - } -} - -#[allow(clippy::result_large_err)] -trait ReadHalfExt { - fn read_message( - &mut self, - reader: &mut R, - ) -> Result, ReadError>; -} - -impl ReadHalfExt for ReadHalf { - fn read_message( - &mut self, - reader: &mut R, - ) -> Result, ReadError> { - match self { - Self::V1(magic) => { - crate::read_message_blocking!(reader, *magic) - } - } - } -} - -/// Write a network message directly to a stream. -#[allow(clippy::result_large_err)] -pub trait WriteExt { - fn write_message( - &mut self, - message: NetworkMessage, - ctx: impl AsMut, - ) -> Result<(), WriteError>; -} - -impl WriteExt for TcpStream { - fn write_message( - &mut self, - message: NetworkMessage, - mut ctx: impl AsMut, - ) -> Result<(), WriteError> { - let ctx = ctx.as_mut(); - if !ctx.ok_to_send(&message) { - return Err(WriteError::NotRecommended(message)); - } - write_message(self, message, &mut ctx.write_half).map_err(WriteError::Io) - } -} - -fn write_message( - tcp_stream: &mut W, - message: NetworkMessage, - write_half: &mut WriteHalf, -) -> Result<(), io::Error> { - let msg_bytes = write_half.serialize_message(message); - tcp_stream.write_all(&msg_bytes)?; - tcp_stream.flush()?; - Ok(()) -} - -/// Read a message directly off of the stream. -pub trait ReadExt { - #[allow(clippy::result_large_err)] - fn read_message( - &mut self, - ctx: impl AsMut, - ) -> Result, ReadError>; -} - -impl ReadExt for TcpStream { - fn read_message( - &mut self, - mut ctx: impl AsMut, - ) -> Result, ReadError> { - let ctx = ctx.as_mut(); - let message = ctx.read_half.read_message(self)?; - match message { - Some(message) => { - if !ctx.ok_to_recv_message(&message) { - return Err(ReadError::NonsenseMessage(message)); - } - if !ctx.is_valid(&message) { - return Err(ReadError::ParseMessageError(ParseMessageError::Malformed)); - } - ctx.update_metadata(&message); - Ok(Some(message)) - } - None => Ok(None), - } - } -} - -// Error implementations - -/// Errors occurring during an attempted connection. -#[derive(Debug)] -pub enum ConnectionError { - Io(io::Error), - Protocol(HandshakeError), - Reader(ReadError), -} - -impl From for ConnectionError { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} - -impl From for ConnectionError { - fn from(value: ReadError) -> Self { - Self::Reader(value) - } -} - -impl From for ConnectionError { - fn from(value: HandshakeError) -> Self { - Self::Protocol(value) - } -} - -impl std::fmt::Display for ConnectionError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Protocol(p) => write!(f, "{p}"), - Self::Io(io) => write!(f, "{io}"), - Self::Reader(r) => write!(f, "{r}"), - } - } -} - -impl std::error::Error for ConnectionError {} - -pub enum WriteError { - Io(io::Error), - NotRecommended(NetworkMessage), -} - -impl From for WriteError { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} - -/// Errors when reading messages off of the stream. -#[derive(Debug)] -pub enum ReadError { - /// The message violates the protocol. Normally, these are deprecated messages or messages that - /// should have been sent during the handshake. - NonsenseMessage(NetworkMessage), - /// Parsing a message failed. - ParseMessageError(ParseMessageError), - /// The stream was closed or reset. - Io(io::Error), -} - -impl std::fmt::Display for ReadError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::ParseMessageError(r) => write!(f, "{r}"), - Self::Io(io) => write!(f, "{io}"), - Self::NonsenseMessage(n) => write!(f, "{}", n.cmd()), - } - } -} - -impl std::error::Error for ReadError {} - -impl From for ReadError { - fn from(value: ParseMessageError) -> Self { - Self::ParseMessageError(value) - } -} - -impl From for ReadError { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} diff --git a/p2p/src/tokio_ext.rs b/p2p/src/tokio_ext.rs deleted file mode 100644 index e4e38fb..0000000 --- a/p2p/src/tokio_ext.rs +++ /dev/null @@ -1,314 +0,0 @@ -use ::std::fmt::{Debug, Display}; -use std::net::SocketAddr; - -use bitcoin::consensus; -use bitcoin::secp256k1::rand; -use p2p::message::{NetworkMessage, RawNetworkMessage}; -use p2p::message_compact_blocks::SendCmpct; -use p2p::Magic; -use tokio::io::AsyncWriteExt; -use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; -use tokio::{ - io::{self, AsyncReadExt}, - net::TcpStream, -}; - -use crate::{ - async_awaiter, interpret_first_message, make_version, version_handshake_async, - ConnectionBuilder, ConnectionContext, Feeler, HandshakeError, Negotiation, ParseMessageError, - ReadContext, ReadHalf, WriteContext, WriteHalf, -}; - -/// Connect to peers using `tokio`. -pub trait TokioConnectionExt { - type Error: Debug + Display + Send + Sync + std::error::Error; - - /// Open a TCP connection to a peer. - #[allow(async_fn_in_trait)] - async fn open_connection( - self, - to: impl Into, - ) -> Result<(TcpStream, ConnectionContext), Self::Error>; - - /// Start a handshake with a pre-existing connection. Normally used after establishing a Socks5 - /// proxy connection. - #[allow(async_fn_in_trait)] - async fn start_handshake( - self, - tcp_stream: TcpStream, - ) -> Result<(TcpStream, ConnectionContext), Self::Error>; - - /// Open a feeler to test a node's liveliness - #[allow(async_fn_in_trait)] - async fn open_feeler(self, to: impl Into) -> Result; -} - -impl TokioConnectionExt for ConnectionBuilder { - type Error = ConnectionError; - - async fn open_connection( - mut self, - to: impl Into, - ) -> Result<(TcpStream, ConnectionContext), Self::Error> { - let socket_addr = to.into(); - let timeout = tokio::time::timeout(self.tcp_timeout, TcpStream::connect(socket_addr)).await; - let mut tcp_stream = - timeout.map_err(|_| ConnectionError::Protocol(HandshakeError::Timeout))??; - version_handshake_async!(tcp_stream, self) - } - - async fn start_handshake( - mut self, - mut tcp_stream: TcpStream, - ) -> Result<(TcpStream, ConnectionContext), Self::Error> { - version_handshake_async!(tcp_stream, self) - } - - async fn open_feeler(mut self, to: impl Into) -> Result { - let socket_addr = to.into(); - let timeout = tokio::time::timeout(self.tcp_timeout, TcpStream::connect(socket_addr)).await; - let mut tcp_stream = - timeout.map_err(|_| ConnectionError::Protocol(HandshakeError::Timeout))??; - let res: Result<(TcpStream, ConnectionContext), Self::Error> = - version_handshake_async!(tcp_stream, self); - let (_, ctx) = res?; - let services = ctx.write_ctx.their_services; - let protocol_version = ctx.write_ctx.their_protocol_verison; - Ok(Feeler { - services, - protocol_version, - }) - } -} - -async fn write_message( - write: &mut W, - message: NetworkMessage, - write_half: &mut WriteHalf, -) -> Result<(), io::Error> { - let msg_bytes = write_half.serialize_message(message); - write.write_all(&msg_bytes).await?; - write.flush().await?; - Ok(()) -} - -trait TokioTransportExt { - #[allow(async_fn_in_trait)] - async fn read_message( - &mut self, - reader: &mut R, - ) -> Result, ReadError>; -} - -impl TokioTransportExt for ReadHalf { - async fn read_message( - &mut self, - reader: &mut R, - ) -> Result, ReadError> { - match self { - Self::V1(magic) => { - crate::read_message_async!(reader, *magic) - } - } - } -} - -/// Write bitcoin network messages directly over `tokio` TCP streams. -pub trait TokioWriteNetworkMessageExt { - /// Write a message with the current context. - #[allow(async_fn_in_trait)] - async fn write_message( - &mut self, - message: NetworkMessage, - ctx: impl AsMut, - ) -> Result<(), WriteError>; -} - -impl TokioWriteNetworkMessageExt for TcpStream { - fn write_message( - &mut self, - message: NetworkMessage, - ctx: impl AsMut, - ) -> impl std::future::Future> { - write_for_any(self, message, ctx) - } -} - -impl TokioWriteNetworkMessageExt for OwnedWriteHalf { - fn write_message( - &mut self, - message: NetworkMessage, - ctx: impl AsMut, - ) -> impl std::future::Future> { - write_for_any(self, message, ctx) - } -} - -async fn write_for_any( - writer: &mut W, - message: NetworkMessage, - mut ctx: impl AsMut, -) -> Result<(), WriteError> { - let ctx = ctx.as_mut(); - if !ctx.ok_to_send(&message) { - return Err(WriteError::NotRecommended(message)); - }; - write_message(writer, message, &mut ctx.write_half).await?; - Ok(()) -} - -/// Read a message directly off a TCP stream. -pub trait TokioReadNetworkMessageExt { - /// Try to read a message and error otherwise. - /// - /// This method performs some light validation to ensure the node is not sending spam or - /// non-sensical messages. - #[allow(async_fn_in_trait)] - async fn read_message( - &mut self, - ctx: impl AsMut, - ) -> Result, ReadError>; -} - -impl TokioReadNetworkMessageExt for TcpStream { - async fn read_message( - &mut self, - mut rtx: impl AsMut, - ) -> Result, ReadError> { - let ctx = rtx.as_mut(); - let message = ctx.read_half.read_message(self).await?; - match message { - Some(message) => { - if !ctx.ok_to_recv_message(&message) { - return Err(ReadError::NonsenseMessage(message)); - } - if !ctx.is_valid(&message) { - return Err(ReadError::ParseMessageError(ParseMessageError::Malformed)); - } - ctx.update_metadata(&message); - Ok(Some(message)) - } - None => Ok(None), - } - } -} - -impl TokioReadNetworkMessageExt for OwnedReadHalf { - async fn read_message( - &mut self, - mut rtx: impl AsMut, - ) -> Result, ReadError> { - let ctx = rtx.as_mut(); - let message = ctx.read_half.read_message(self).await?; - match message { - Some(message) => { - if !ctx.ok_to_recv_message(&message) { - return Err(ReadError::NonsenseMessage(message)); - } - if !ctx.is_valid(&message) { - return Err(ReadError::ParseMessageError(ParseMessageError::Malformed)); - } - ctx.update_metadata(&message); - Ok(Some(message)) - } - None => Ok(None), - } - } -} - -// Error implementation section - -/// Errors that may occur when starting a connection. -#[derive(Debug)] -pub enum ConnectionError { - /// Read or write failure. - Io(io::Error), - /// The handshake failed to malformed messages or a mismatch in preferences. - Protocol(HandshakeError), - /// A message that was read violated the protocol. - Reader(ReadError), -} - -impl From for ConnectionError { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} - -impl From for ConnectionError { - fn from(value: ReadError) -> Self { - Self::Reader(value) - } -} - -impl Display for ConnectionError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Io(io) => write!(f, "{io}"), - Self::Protocol(proto) => write!(f, "{proto}"), - Self::Reader(read) => write!(f, "{read}"), - } - } -} - -impl std::error::Error for ConnectionError {} - -/// Errors when attempting to write a message. -#[derive(Debug)] -pub enum WriteError { - /// Writing to the stream failed. - Io(io::Error), - /// The message is invalid or not supported. - NotRecommended(NetworkMessage), -} - -impl From for WriteError { - fn from(value: io::Error) -> Self { - WriteError::Io(value) - } -} - -impl Display for WriteError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Io(io) => write!(f, "{io}"), - Self::NotRecommended(msg) => write!(f, "non-sensical message: {}", msg.cmd()), - } - } -} - -/// Errors when reading messages off of the stream. -#[derive(Debug)] -pub enum ReadError { - /// The message violates the protocol. Normally, these are deprecated messages or messages that - /// should have been sent during the handshake. - NonsenseMessage(NetworkMessage), - /// Parsing a message failed. - ParseMessageError(ParseMessageError), - /// The stream was closed. - Io(io::Error), -} - -impl Display for ReadError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::ParseMessageError(r) => write!(f, "{r}"), - Self::Io(io) => write!(f, "{io}"), - Self::NonsenseMessage(n) => write!(f, "{}", n.cmd()), - } - } -} - -impl std::error::Error for ReadError {} - -impl From for ReadError { - fn from(value: ParseMessageError) -> Self { - Self::ParseMessageError(value) - } -} - -impl From for ReadError { - fn from(value: io::Error) -> Self { - Self::Io(value) - } -} diff --git a/p2p/src/validation.rs b/p2p/src/validation.rs deleted file mode 100644 index 57686ef..0000000 --- a/p2p/src/validation.rs +++ /dev/null @@ -1,25 +0,0 @@ -use bitcoin::{block::HeaderExt, BlockHeader}; -use p2p::message_blockdata::Inventory; - -pub(crate) trait ValidationExt { - fn is_valid(&self) -> bool; -} - -impl ValidationExt for Vec { - fn is_valid(&self) -> bool { - self.iter() - .zip(self.iter().skip(1)) - .all(|(first, second)| first.block_hash().eq(&second.prev_blockhash)) - && !self.iter().any(|header| { - let target = header.target(); - let valid_pow = header.validate_pow(target); - valid_pow.is_err() - }) - } -} - -impl ValidationExt for Vec { - fn is_valid(&self) -> bool { - self.len() < 50_000 - } -} diff --git a/p2p/tests/tests.rs b/p2p/tests/tests.rs deleted file mode 100644 index 32fb64f..0000000 --- a/p2p/tests/tests.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::net::SocketAddrV4; - -use bitcoin::{BlockHash, Network}; -use corepc_node::{exe_path, P2P}; -use p2p::{message::NetworkMessage, ServiceFlags}; -use swiftsync_p2p::ConnectionBuilder; - -#[derive(Debug, Clone)] -struct TestNodeBuilder<'a> { - conf: corepc_node::Conf<'a>, -} - -impl<'a> TestNodeBuilder<'a> { - fn new() -> Self { - let mut conf = corepc_node::Conf::default(); - conf.p2p = P2P::Yes; - conf.args.push("--listen=1"); - conf.args.push("--rest=1"); - conf.args.push("--server=1"); - Self { conf } - } - - fn push_arg(mut self, arg: &'a str) -> Self { - self.conf.args.push(arg); - self - } - - fn start(self) -> (corepc_node::Node, SocketAddrV4) { - let path = exe_path().unwrap(); - let bitcoind = corepc_node::Node::with_conf(path, &self.conf).unwrap(); - let socket_addr = bitcoind.params.p2p_socket.unwrap(); - (bitcoind, socket_addr) - } -} - -// Standard library tests - -#[test] -fn does_handshake() { - use swiftsync_p2p::net::ConnectionExt; - let (mut bitcoind, socket_addr) = TestNodeBuilder::new().start(); - let _ = ConnectionBuilder::new() - .change_network(Network::Regtest) - .open_connection(socket_addr) - .unwrap(); - bitcoind.stop().unwrap(); -} - -#[test] -fn filters_unsupported_messages() { - use swiftsync_p2p::net::{ConnectionExt, WriteExt}; - let (mut bitcoind, socket_addr) = TestNodeBuilder::new().start(); - let (mut tcp_stream, mut ctx) = ConnectionBuilder::new() - .change_network(Network::Regtest) - .open_connection(socket_addr) - .unwrap(); - let err = tcp_stream.write_message(NetworkMessage::Verack, &mut ctx); - assert!(err.is_err()); - let err = tcp_stream.write_message(NetworkMessage::MemPool, &mut ctx); - assert!(err.is_err()); - let err = tcp_stream.write_message( - NetworkMessage::GetCFilters(p2p::message_filter::GetCFilters { - filter_type: 0x00, - start_height: 0.into(), - stop_hash: BlockHash::from_byte_array([0; 32]), - }), - &mut ctx, - ); - assert!(err.is_err()); - bitcoind.stop().unwrap(); -} - -#[test] -fn enforces_desired_services() { - use swiftsync_p2p::net::ConnectionExt; - let (mut bitcoind, socket_addr) = TestNodeBuilder::new().start(); - let err = ConnectionBuilder::new() - .change_network(Network::Regtest) - .their_services_expected(ServiceFlags::COMPACT_FILTERS) - .open_connection(socket_addr); - assert!(err.is_err()); - bitcoind.stop().unwrap(); - let (mut bitcoind, socket_addr) = TestNodeBuilder::new() - .push_arg("--blockfilterindex") - .push_arg("--peerblockfilters") - .start(); - let ok = ConnectionBuilder::new() - .change_network(Network::Regtest) - .their_services_expected(ServiceFlags::COMPACT_FILTERS) - .open_connection(socket_addr); - assert!(ok.is_ok()); - bitcoind.stop().unwrap(); -} - -// Tokio tests - -#[tokio::test] -async fn does_handshake_async() { - use swiftsync_p2p::tokio_ext::TokioConnectionExt; - let (mut bitcoind, socket_addr) = TestNodeBuilder::new().start(); - let _ = ConnectionBuilder::new() - .change_network(Network::Regtest) - .open_connection(socket_addr) - .await - .unwrap(); - bitcoind.stop().unwrap(); -} diff --git a/utxo_verifier/Cargo.toml b/utxo_verifier/Cargo.toml deleted file mode 100644 index 421d2f5..0000000 --- a/utxo_verifier/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "utxo_verifier" -version = "0.1.0" -edition = "2024" - -[dependencies] -accumulator = { path = "../accumulator/" } -bitcoin = { workspace = true, features = ["rand-std"] } -peers = { path = "../peers/", default-features = false } -p2p = { path = "../p2p/", package = "swiftsync-p2p", default-features = false } -p2p-messages = { package = "bitcoin-p2p-messages", git = "https://github.com/rust-bitcoin/rust-bitcoin", rev = "2bb9bb6bc99ba07ed3d543a512ec3d2a9462770d" } -tracing = "0.1" -tracing-subscriber = "0.3" -rusqlite = { version = "0.36.0", features = ["bundled"] } diff --git a/utxo_verifier/README.md b/utxo_verifier/README.md deleted file mode 100644 index 87ed0d1..0000000 --- a/utxo_verifier/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# UTXO Snapshot Verifier - -The UTXO set at a particular height in the blockchain may be verified by computing an accumulator state generated from the snapshot and the accumulator state generated from downloading the block history. This program accepts a path to a sqlite file representing the OutPoints in the UTXO set and downloads blocks in parallel from many peers. The result of the program is either successful in the case the accumulators match, or panic if there is a mis-match. The snapshot height used is 880_000 on Bitcoin and is 160_000 on Signet. - -## SQLite File Generation - -The sqlite file for Signet is in the `/contrib` folder of this repository. Skip to the next section to run the Signet example. To generate the sqlite file required to run this verifier on Bitcoin, you will need a `utxos.dat` file representing the UTXO state at height 880_000. This may be found on https://utxo.download. Be aware you will need at least 32GB of storage at the time of writing. - -``` -curl https://utxo.download/mainnet-880000.dat --output utxos.dat -``` - -To convert this snapshot to sqlite, in this directory: - -``` -python utxos_to_sqlite.py /path/to/utxos.dat /path/to/outpoints.sqlite -``` - -## Running the Verifier - -To run the verifier on Signet: -``` -cargo run ../contrib/signet_outpoints.sqlite --release -``` diff --git a/utxo_verifier/src/hasher.rs b/utxo_verifier/src/hasher.rs deleted file mode 100644 index 769a57f..0000000 --- a/utxo_verifier/src/hasher.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::sync::mpsc; - -use accumulator::Accumulator; -use bitcoin::{Network, OutPoint, Txid}; - -use crate::{AccumulatorUpdate, NETWORK}; - -pub fn update_accumulator_from_blocks( - receiver: mpsc::Receiver>, -) -> Accumulator { - let mut acc = Accumulator::new(); - // These outpoints will show up twice, but can only be spent once - // - if matches!(NETWORK, Network::Bitcoin) { - let coinbase_one = crate::DUP_COINBASE_ONE.parse::().unwrap(); - let coinbase_two = crate::DUP_COINBASE_TWO.parse::().unwrap(); - acc.spend(OutPoint { - txid: coinbase_one, - vout: 0, - }); - acc.spend(OutPoint { - txid: coinbase_two, - vout: 0, - }); - } - while let Ok(values) = receiver.recv() { - for value in values { - match value { - AccumulatorUpdate::Spent(outpoint) => acc.spend_hashed_outpoint(outpoint), - AccumulatorUpdate::Created(txout) => acc.add_hashed_outpoint(txout), - } - } - } - acc -} diff --git a/utxo_verifier/src/job.rs b/utxo_verifier/src/job.rs deleted file mode 100644 index 38cbdc9..0000000 --- a/utxo_verifier/src/job.rs +++ /dev/null @@ -1,172 +0,0 @@ -use std::{ - collections::HashSet, - net::IpAddr, - sync::{Arc, mpsc}, - time::Duration, -}; - -use bitcoin::{ - BlockHash, OutPoint, - script::ScriptExt, - secp256k1::rand::{seq::IteratorRandom, thread_rng}, - transaction::TransactionExt, -}; -use p2p::{ - ConnectionBuilder, - net::{ConnectionExt, ReadExt, WriteExt}, -}; -use p2p_messages::{ - ServiceFlags, - message::{InventoryPayload, NetworkMessage}, - message_blockdata::Inventory, -}; -use peers::PortExt; - -use crate::{AccumulatorUpdate, NETWORK}; - -pub fn fetch_blocks( - id: usize, - sender: mpsc::Sender>, - mut queue: Vec>, - peers: Arc>, -) { - let mut batch = queue.pop().unwrap(); - loop { - let any = peers.iter().choose(&mut thread_rng()).copied().unwrap(); - let connection = ConnectionBuilder::new() - .no_cmpct_blocks() - .announce_by_inv() - .connection_timeout(Duration::from_secs(2)) - .change_network(NETWORK) - .their_services_expected(ServiceFlags::NETWORK) - .open_connection((any, NETWORK.port())); - tracing::info!("Connecting to {any}"); - match connection { - Ok((mut tcp_stream, mut ctx)) => { - let _ = tcp_stream.set_read_timeout(Some(Duration::from_secs(1))); - let inv = InventoryPayload(batch.iter().copied().map(Inventory::Block).collect()); - let getdata = NetworkMessage::GetData(inv); - if tcp_stream.write_message(getdata, &mut ctx).is_err() { - tracing::warn!("Failed to write message"); - } - loop { - let message = tcp_stream.read_message(&mut ctx); - match message { - Ok(message) => { - if let Some(message) = message { - match message { - NetworkMessage::Block(b) => { - let this_hash = b.block_hash(); - let (_, transactions) = b.into_parts(); - let mut updates = Vec::with_capacity(transactions.len()); - for tx in transactions { - if !tx.is_coinbase() { - for input in tx.inputs() { - updates.push(AccumulatorUpdate::Spent( - accumulator::hash_outpoint( - input.previous_output, - ), - )); - } - } - let txid = tx.compute_txid(); - for (index, output) in tx.outputs().iter().enumerate() { - if output.script_pubkey.is_op_return() { - continue; - } - let outpoint = OutPoint { - txid, - vout: index as u32, - }; - let hash = accumulator::hash_outpoint(outpoint); - updates.push(AccumulatorUpdate::Created(hash)); - } - } - sender.send(updates).unwrap(); - tracing::info!("{id}:{this_hash}"); - batch.retain(|hash| hash.ne(&this_hash)); - if batch.is_empty() { - match queue.pop() { - Some(next) => { - batch = next; - tracing::info!( - "{any} has {} more batches", - queue.len() - ); - let inv = InventoryPayload( - batch - .iter() - .copied() - .map(Inventory::Block) - .collect(), - ); - let getdata = NetworkMessage::GetData(inv); - if tcp_stream - .write_message(getdata, &mut ctx) - .is_err() - { - tracing::warn!("Failed to write message"); - break; - } - } - None => { - tracing::info!("{any} finished a job"); - return; - } - } - } - } - NetworkMessage::Ping(nonce) => { - let e = tcp_stream - .write_message(NetworkMessage::Pong(nonce), &mut ctx); - if e.is_err() { - break; - } - } - other => tracing::info!("{}", other.cmd()), - } - } - } - Err(_) => { - tracing::warn!("Connection to {any} closed"); - break; - } - } - } - } - Err(e) => { - tracing::warn!("Connection to {any} failed: {e}"); - } - } - } -} - -pub fn divide_jobs(vec: Vec, num_jobs: usize) -> Vec> { - assert!(num_jobs > 0); - let n = vec.len(); - let mut jobs = vec![Vec::new(); num_jobs]; - #[allow(clippy::needless_range_loop)] - for i in 0..num_jobs { - let mut j = i; - while j < n { - jobs[i].push(vec[j]); - j += num_jobs; - } - } - jobs -} - -#[cfg(test)] -mod tests { - use super::divide_jobs; - - #[test] - fn test_job_divison() { - let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - let m = 3; - let jobs = divide_jobs(data, m); - let want = vec![1, 4, 7, 10]; - let got = jobs.first().unwrap().clone(); - assert_eq!(want, got); - } -} diff --git a/utxo_verifier/src/loader.rs b/utxo_verifier/src/loader.rs deleted file mode 100644 index abb72c3..0000000 --- a/utxo_verifier/src/loader.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::{path::Path, time::Instant}; - -use accumulator::Accumulator; -use bitcoin::{OutPoint, Txid}; -use std::collections::BTreeMap; - -use bitcoin::{BlockHash, consensus}; -use rusqlite::Connection; - -const SELECT_STMT: &str = "SELECT txid, vout FROM utxos"; - -pub fn update_acc_from_outpoint_set>(path: P) -> Accumulator { - let mut acc = Accumulator::new(); - let conn = Connection::open(path).unwrap(); - let mut stmt = conn.prepare(SELECT_STMT).unwrap(); - let mut rows = stmt.query([]).unwrap(); - tracing::info!("Spending UTXOs from the accumulator"); - let mut outpoints_spent = 0; - let now = Instant::now(); - while let Some(row) = rows.next().unwrap() { - let txid: String = row.get(0).unwrap(); - let vout: u32 = row.get(1).unwrap(); - let txid = txid.parse::().unwrap(); - let outpoint = OutPoint { txid, vout }; - acc.add(outpoint); - outpoints_spent += 1; - if outpoints_spent % 1_000_000 == 0 { - tracing::info!("{outpoints_spent} OutPoints added to the accumulator"); - } - } - tracing::info!("Done spending UTXOs in {} seconds", now.elapsed().as_secs()); - acc -} - -pub fn get_block_hashes_from_store>( - path: P, - assume_valid: BlockHash, -) -> BTreeMap { - let mut hashes = BTreeMap::new(); - let conn = Connection::open(path).unwrap(); - let mut stmt = conn - .prepare("SELECT height, block_hash FROM headers") - .unwrap(); - tracing::info!("Loading hashes from storage"); - let now = Instant::now(); - let mut rows = stmt.query([]).unwrap(); - while let Some(row) = rows.next().unwrap() { - let height: u32 = row.get(0).unwrap(); - let hash: [u8; 32] = row.get(1).unwrap(); - if height == 0 { - continue; - } - let block_hash: BlockHash = consensus::deserialize(&hash).unwrap(); - hashes.insert(height, block_hash); - if block_hash.eq(&assume_valid) { - let secs = now.elapsed().as_secs(); - tracing::info!("Done loading hashes in {} seconds", secs); - return hashes; - } - } - panic!("expected assume valid hash"); -} diff --git a/utxo_verifier/src/main.rs b/utxo_verifier/src/main.rs deleted file mode 100644 index 49b7db2..0000000 --- a/utxo_verifier/src/main.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::{ - collections::HashSet, - net::IpAddr, - sync::{Arc, mpsc::channel}, - time::Instant, -}; - -use bitcoin::{BlockHash, Network}; - -#[allow(unused)] -use bitcoin::{OutPoint, Txid}; -use hasher::update_accumulator_from_blocks; -use job::{divide_jobs, fetch_blocks}; -use loader::{get_block_hashes_from_store, update_acc_from_outpoint_set}; - -use peers::{SeedsExt, dns::DnsQuery}; - -mod hasher; -mod job; -mod loader; - -pub const NETWORK: Network = Network::Signet; -// Signet -const ASSUME_VALID_HASH: &str = "0000003ca3c99aff040f2563c2ad8f8ec88bd0fd6b8f0895cfaf1ef90353a62c"; -// Bitcoin -// const ASSUME_VALID_HASH: &str = "000000000000000000010b17283c3c400507969a9c2afd1dcf2082ec5cca2880"; -const WORKERS: usize = 32; -pub const DUP_COINBASE_ONE: &str = - "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468"; -pub const DUP_COINBASE_TWO: &str = - "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"; -const REQUEST_SIZE: usize = 2_000; - -fn bootstrap() -> HashSet { - let seeds = NETWORK.dns_seeds(); - let mut peers = HashSet::with_capacity(300); - for seed in seeds { - let query = DnsQuery::new_cloudflare(seed).lookup(); - if let Ok(addrs) = query { - tracing::info!("Adding IPs {} from {}", addrs.len(), seed); - peers.extend(addrs); - } - } - tracing::info!("DNS task exit"); - peers -} - -#[derive(Debug, Clone, Copy)] -enum AccumulatorUpdate { - Spent([u8; 32]), - Created([u8; 32]), -} - -fn main() { - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber).unwrap(); - let mut args = std::env::args(); - let _ = args.next(); - let path = args - .next() - .expect("Provide a file path to the utxos.sqlite/outpoints.sqlite file"); - tracing::info!("Fetching peers from DNS"); - let peers = Arc::new(bootstrap()); - tracing::info!("Loading OutPoint set and updating the accumulator"); - let utxo_handle = std::thread::spawn(move || update_acc_from_outpoint_set(path)); - let path = match NETWORK { - Network::Signet => "../contrib/signet_headers.sqlite", - Network::Bitcoin => "../contrib/bitcoin_headers.sqlite", - _ => unimplemented!("unsupported network"), - }; - let hashes = get_block_hashes_from_store(path, ASSUME_VALID_HASH.parse::().unwrap()); - let hashes = hashes.into_values().collect::>(); - let (tx, rx) = channel(); - let block_handle = std::thread::spawn(move || update_accumulator_from_blocks(rx)); - let now = Instant::now(); - tracing::info!("Spawning workers"); - let mut handles = Vec::with_capacity(WORKERS); - let jobs = divide_jobs(hashes, WORKERS); - for (id, job) in jobs.into_iter().enumerate() { - let windows = job - .chunks(REQUEST_SIZE) - .map(|slice| slice.to_vec()) - .collect(); - let sender = tx.clone(); - let peers = Arc::clone(&peers); - let handle = std::thread::spawn(move || fetch_blocks(id, sender, windows, peers.clone())); - handles.push(handle); - } - tracing::info!("Waiting for jobs"); - for handle in handles { - handle.join().unwrap(); - } - drop(tx); - let utxo_acc = utxo_handle.join().unwrap(); - let block_acc = block_handle.join().unwrap(); - let done = now.elapsed().as_secs(); - tracing::info!("Block download complete in {done} seconds"); - assert_eq!(utxo_acc, block_acc); - tracing::info!("Verified"); -} diff --git a/utxo_verifier/utxo_to_sqlite.py b/utxo_verifier/utxo_to_sqlite.py deleted file mode 100755 index 9d02607..0000000 --- a/utxo_verifier/utxo_to_sqlite.py +++ /dev/null @@ -1,195 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) 2024-present The Bitcoin Core developers -# Distributed under the MIT software license, see the accompanying -# file COPYING or http://www.opensource.org/licenses/mit-license.php. -"""Tool to convert a compact-serialized UTXO set to a SQLite3 database. - -The input UTXO set can be generated by Bitcoin Core with the `dumptxoutset` RPC: -$ bitcoin-cli dumptxoutset ~/utxos.dat latest - -The created database contains a table `utxos` with the following schema: -(txid TEXT, vout INT, value INT, coinbase INT, height INT, scriptpubkey TEXT) -""" -import argparse -import os -import sqlite3 -import sys -import time - - -UTXO_DUMP_MAGIC = b'utxo\xff' -UTXO_DUMP_VERSION = 2 -NET_MAGIC_BYTES = { - b"\xf9\xbe\xb4\xd9": "Mainnet", - b"\x0a\x03\xcf\x40": "Signet", - b"\x0b\x11\x09\x07": "Testnet3", - b"\x1c\x16\x3f\x28": "Testnet4", - b"\xfa\xbf\xb5\xda": "Regtest", -} - - -def read_varint(f): - """Equivalent of `ReadVarInt()` (see serialization module).""" - n = 0 - while True: - dat = f.read(1)[0] - n = (n << 7) | (dat & 0x7f) - if (dat & 0x80) > 0: - n += 1 - else: - return n - - -def read_compactsize(f): - """Equivalent of `ReadCompactSize()` (see serialization module).""" - n = f.read(1)[0] - if n == 253: - n = int.from_bytes(f.read(2), "little") - elif n == 254: - n = int.from_bytes(f.read(4), "little") - elif n == 255: - n = int.from_bytes(f.read(8), "little") - return n - - -def decompress_amount(x): - """Equivalent of `DecompressAmount()` (see compressor module).""" - if x == 0: - return 0 - x -= 1 - e = x % 10 - x //= 10 - n = 0 - if e < 9: - d = (x % 9) + 1 - x //= 9 - n = x * 10 + d - else: - n = x + 1 - while e > 0: - n *= 10 - e -= 1 - return n - - -def decompress_script(f): - """Equivalent of `DecompressScript()` (see compressor module).""" - size = read_varint(f) # sizes 0-5 encode compressed script types - if size == 0: # P2PKH - return bytes([0x76, 0xa9, 20]) + f.read(20) + bytes([0x88, 0xac]) - elif size == 1: # P2SH - return bytes([0xa9, 20]) + f.read(20) + bytes([0x87]) - elif size in (2, 3): # P2PK (compressed) - return bytes([33, size]) + f.read(32) + bytes([0xac]) - elif size in (4, 5): # P2PK (uncompressed) - compressed_pubkey = bytes([size - 2]) + f.read(32) - return bytes([65]) + decompress_pubkey(compressed_pubkey) + bytes([0xac]) - else: # others (bare multisig, segwit etc.) - size -= 6 - assert size <= 10000, f"too long script with size {size}" - return f.read(size) - - -def decompress_pubkey(compressed_pubkey): - """Decompress pubkey by calculating y = sqrt(x^3 + 7) % p - (see functions `secp256k1_eckey_pubkey_parse` and `secp256k1_ge_set_xo_var`). - """ - P = 2**256 - 2**32 - 977 # secp256k1 field size - assert len(compressed_pubkey) == 33 and compressed_pubkey[0] in (2, 3) - x = int.from_bytes(compressed_pubkey[1:], 'big') - rhs = (x**3 + 7) % P - y = pow(rhs, (P + 1)//4, P) # get sqrt using Tonelli-Shanks algorithm (for p % 4 = 3) - assert pow(y, 2, P) == rhs, f"pubkey is not on curve ({compressed_pubkey.hex()})" - tag_is_odd = compressed_pubkey[0] == 3 - y_is_odd = (y & 1) == 1 - if tag_is_odd != y_is_odd: # fix parity (even/odd) if necessary - y = P - y - return bytes([4]) + x.to_bytes(32, 'big') + y.to_bytes(32, 'big') - - -def main(): - parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('infile', help='filename of compact-serialized UTXO set (input)') - parser.add_argument('outfile', help='filename of created SQLite3 database (output)') - parser.add_argument('-v', '--verbose', action='store_true', help='show details about each UTXO') - args = parser.parse_args() - - if not os.path.exists(args.infile): - print(f"Error: provided input file '{args.infile}' doesn't exist.") - sys.exit(1) - - if os.path.exists(args.outfile): - print(f"Error: provided output file '{args.outfile}' already exists.") - sys.exit(1) - - # create database table - con = sqlite3.connect(args.outfile) - con.execute("CREATE TABLE utxos(txid TEXT, vout INT)") - - # read metadata (magic bytes, version, network magic, block hash, UTXO count) - f = open(args.infile, 'rb') - magic_bytes = f.read(5) - version = int.from_bytes(f.read(2), 'little') - network_magic = f.read(4) - block_hash = f.read(32) - num_utxos = int.from_bytes(f.read(8), 'little') - if magic_bytes != UTXO_DUMP_MAGIC: - print(f"Error: provided input file '{args.infile}' is not an UTXO dump.") - sys.exit(1) - if version != UTXO_DUMP_VERSION: - print(f"Error: provided input file '{args.infile}' has unknown UTXO dump version {version} " - f"(only version {UTXO_DUMP_VERSION} supported)") - sys.exit(1) - network_string = NET_MAGIC_BYTES.get(network_magic, f"unknown network ({network_magic.hex()})") - print(f"UTXO Snapshot for {network_string} at block hash " - f"{block_hash[::-1].hex()[:32]}..., contains {num_utxos} coins") - - start_time = time.time() - write_batch = [] - coins_per_hash_left = 0 - prevout_hash = None - max_height = 0 - - for coin_idx in range(1, num_utxos+1): - # read key (COutPoint) - if coins_per_hash_left == 0: # read next prevout hash - prevout_hash = f.read(32)[::-1].hex() - coins_per_hash_left = read_compactsize(f) - prevout_index = read_compactsize(f) - # read value (Coin) - code = read_varint(f) - height = code >> 1 - is_coinbase = code & 1 - amount = decompress_amount(read_varint(f)) - scriptpubkey = decompress_script(f).hex() - write_batch.append((prevout_hash, prevout_index)) - if height > max_height: - max_height = height - coins_per_hash_left -= 1 - - if args.verbose: - print(f"Coin {coin_idx}/{num_utxos}:") - print(f" prevout = {prevout_hash}:{prevout_index}") - print(f" amount = {amount}, height = {height}, coinbase = {is_coinbase}") - print(f" scriptPubKey = {scriptpubkey}\n") - - if coin_idx % (16*1024) == 0 or coin_idx == num_utxos: - # write utxo batch to database - con.executemany("INSERT INTO utxos VALUES(?, ?)", write_batch) - con.commit() - write_batch.clear() - - if coin_idx % (1024*1024) == 0: - elapsed = time.time() - start_time - print(f"{coin_idx} coins converted [{coin_idx/num_utxos*100:.2f}%], " + - f"{elapsed:.3f}s passed since start") - con.close() - - print(f"TOTAL: {num_utxos} coins written to {args.outfile}, snapshot height is {max_height}.") - if f.read(1) != b'': # EOF should be reached by now - print(f"WARNING: input file {args.infile} has not reached EOF yet!") - sys.exit(1) - - -if __name__ == '__main__': - main()