diff --git a/node/README.md b/node/README.md index b850ced..25cf11e 100644 --- a/node/README.md +++ b/node/README.md @@ -12,7 +12,18 @@ sudo apt-get install unzip unzip bitcoin.hints.zip ``` +To build the Bitcoin kernel, you will need the following on Ubuntu: + +``` +sudo apt-get install build-essential cmake pkgconf python3 libevent-dev libboost-dev +``` + +For other systems, follow the Bitcoin Core documentation on how to install the requirements [here](https://github.com/bitcoin/bitcoin/tree/master/doc). + +Finally, you will need Rust and cargo installed; you may download them from [here](https://www.rust-lang.org/tools/install). + To start fast IBD: + ``` cargo run --bin ibd --release -- ``` @@ -39,3 +50,9 @@ Arguments: --write-timeout The maximum time (seconds) to write to a TCP stream until the connection is killed. ``` + +If IBD completes, or you experience a bug, you will need to remove the kernel directories from this repository to run the binary again: + +``` +rm -rf blocks chainstate +``` diff --git a/node/config_spec.toml b/node/config_spec.toml index 90d64ea..dd0cb90 100644 --- a/node/config_spec.toml +++ b/node/config_spec.toml @@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi [[param]] name = "min_blocks_per_sec" type = "f64" -default = "3." +default = "1." doc = "The minimum rate a peer has to respond to block requests." [[param]] @@ -28,12 +28,6 @@ type = "usize" default = "32" doc = "The number of tasks to download blocks. Default is 64. Each task uses two OS threads." -[[param]] -name = "ping_timeout" -type = "u64" -default = "60" -doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers." 
- [[param]] name = "tcp_timeout" type = "u64" diff --git a/node/src/bin/ibd.rs b/node/src/bin/ibd.rs index 447779b..252e06d 100644 --- a/node/src/bin/ibd.rs +++ b/node/src/bin/ibd.rs @@ -27,7 +27,6 @@ fn main() { .network .parse::() .expect("invalid network string"); - let ping_timeout = Duration::from_secs(config.ping_timeout); let block_per_sec = config.min_blocks_per_sec; let task_num = config.tasks; let tcp_timeout = Duration::from_secs(config.tcp_timeout); @@ -76,11 +75,13 @@ fn main() { let acc_task = std::thread::spawn(move || accumulator_state.verify()); let peers = Arc::new(Mutex::new(peers)); let mut tasks = Vec::new(); - let hashes = hashes_from_chain(Arc::clone(&chain), network, task_num); - for (task_id, chunk) in hashes.into_iter().enumerate() { + let hashes = hashes_from_chain(Arc::clone(&chain)); + let arc_hashes = Arc::new(Mutex::new(hashes)); + for task_id in 0..task_num { let chain = Arc::clone(&chain); let tx = tx.clone(); let peers = Arc::clone(&peers); + let hashes = Arc::clone(&arc_hashes); let hints = Arc::clone(&hints); let block_file_path = block_file_path.clone(); let block_task = std::thread::spawn(move || { @@ -88,14 +89,13 @@ fn main() { task_id as u32, timeout_conf, block_per_sec, - ping_timeout, network, block_file_path, chain, &hints, peers, tx, - chunk, + hashes, ) }); tasks.push(block_task); diff --git a/node/src/lib.rs b/node/src/lib.rs index 8118611..8eb8be6 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -34,7 +34,8 @@ use p2p::{ }; const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION; -const MAX_GETDATA: usize = 50_000; +const CHUNK_SIZE: usize = 100; +const CONSIDERED_DEAD: f64 = 0.1; pub fn elapsed_time(then: Instant) { let duration_sec = then.elapsed().as_secs_f64(); @@ -168,16 +169,15 @@ pub fn get_blocks_for_range( task_id: u32, timeout_params: TimeoutParams, blocks_per_sec: f64, - _ping_timeout: Duration, network: Network, block_dir: Option, chain: Arc, hints: &Hints, peers: Arc>>, 
updater: Sender, - mut batch: Vec, + hashes: Arc>>>, ) { - tracing::info!("{task_id} assigned {} blocks", batch.len()); + let mut batch = Vec::new(); let mut rng = thread_rng(); loop { let peer = { @@ -185,6 +185,14 @@ pub fn get_blocks_for_range( let socket_addr = lock_opt.and_then(|lock| lock.choose(&mut rng).copied()); socket_addr }; + if batch.is_empty() { + let mut jobs_lock = hashes.lock().expect("could not take lock on hashes"); + let Some(next) = jobs_lock.pop() else { + return; + }; + tracing::info!("[thread {task_id:2}]: requesting next batch"); + batch = next; + } let Some(peer) = peer else { continue }; // tracing::info!("Connecting to {peer}"); let conn = ConnectionConfig::new() @@ -197,9 +205,9 @@ pub fn get_blocks_for_range( // tracing::warn!("Connection failed"); continue; }; - // tracing::info!("Connection successful"); + let mut completed_batches = 0; + tracing::info!("[thread {task_id:2}]: established connection {peer}"); let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect()); - // tracing::info!("Requesting {} blocks", payload.0.len()); let getdata = NetworkMessage::GetData(payload); if writer.send_message(getdata).is_err() { continue; @@ -211,6 +219,7 @@ pub fn get_blocks_for_range( } NetworkMessage::Block(block) => { let hash = block.block_hash(); + // tracing::info!("[thread {task_id:2}]: {hash}"); batch.retain(|b| hash.ne(b)); let kernal_hash: kernel::BlockHash = kernel::BlockHash { hash: hash.to_byte_array(), @@ -221,7 +230,6 @@ pub fn get_blocks_for_range( let block_height = block_index.height().unsigned_abs(); let unspent_indexes: HashSet = hints.get_indexes(block_height).into_iter().collect(); - // tracing::info!("{task_id} -> {block_height}:{hash}"); if let Some(block_dir) = block_dir.as_ref() { let file_path = block_dir.join(format!("{hash}.block")); let file = File::create_new(file_path); @@ -238,7 +246,6 @@ pub fn get_blocks_for_range( .expect("failed to write block file"); 
file.sync_data().expect("could not sync file with OS"); } - // tracing::info!("Wrote {hash} to file"); let (_, transactions) = block.into_parts(); let mut output_index = 0; for transaction in transactions { @@ -270,12 +277,29 @@ pub fn get_blocks_for_range( output_index += 1 } } - if batch.len() % 100 == 0 { - tracing::info!("{task_id} has {} remaining blocks", batch.len()); - } if batch.is_empty() { - tracing::info!("All block ranges fetched: {task_id}"); - return; + let mut jobs_lock = hashes.lock().expect("could not take lock on hashes"); + let Some(next) = jobs_lock.pop() else { + tracing::info!("[thread {task_id:2}]: no jobs remaining, please wait for other threads"); + return; + }; + batch = next; + completed_batches += 1; + tracing::info!( + "[thread {task_id:2}]: requesting next batch. blocks downloaded: {}", + CHUNK_SIZE * completed_batches + ); + tracing::info!( + "[thread m]: blocks remaining {}", + CHUNK_SIZE * jobs_lock.len() + ); + let payload = InventoryPayload( + batch.iter().map(|hash| Inventory::Block(*hash)).collect(), + ); + let getdata = NetworkMessage::GetData(payload); + if writer.send_message(getdata).is_err() { + break; + } } } NetworkMessage::AddrV2(payload) => { @@ -288,47 +312,34 @@ pub fn get_blocks_for_range( }) .map(|(_, addr)| addr) .collect(); - // tracing::info!("Adding {} peers", addrs.len()); lock.extend(addrs); } } _ => (), } if let Some(message_rate) = metrics.message_rate(TimedMessage::Block) { - if message_rate.total_count() < 100 { - continue; - } let Some(rate) = message_rate.messages_per_secs(Instant::now()) else { continue; }; - if rate < blocks_per_sec { - tracing::warn!("Disconnecting from {task_id} for stalling"); + if rate < CONSIDERED_DEAD { + tracing::warn!("[thread {task_id:2}]: block rate considered dead"); + break; + } + if rate < blocks_per_sec && message_rate.total_count() > 20 { + tracing::warn!("[thread {task_id:2}]: insufficient blocks/second rate"); break; } } - // if metrics.ping_timed_out(ping_timeout) { 
- // tracing::warn!("{task_id} failed to respond to a ping"); - // break; - // } - } - if batch.is_empty() { - break; } } - tracing::info!("All block ranges fetched: {task_id}"); } -pub fn hashes_from_chain( - chain: Arc, - network: Network, - jobs: usize, -) -> Vec> { +pub fn hashes_from_chain(chain: Arc) -> Vec> { let height = chain.best_header().height(); let mut hashes = Vec::with_capacity(height as usize); let mut curr = chain.best_header(); let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash); hashes.push(tip_hash); - let mut out = Vec::new(); while let Ok(next) = curr.prev() { if next.height() == 0 { break; @@ -337,38 +348,11 @@ pub fn hashes_from_chain( hashes.push(hash); curr = next; } - if matches!(network, Network::Signet) { - return hashes.chunks(20_000).map(|slice| slice.to_vec()).collect(); - } - // These blocks are empty. Fetch the maximum amount of blocks. - let first_epoch = hashes.split_off(hashes.len() - 200_000); - let first_chunks: Vec> = first_epoch - .chunks(MAX_GETDATA) + hashes + .chunks(CHUNK_SIZE) .map(|slice| slice.to_vec()) - .collect(); - out.extend(first_chunks); - // These start to get larger, but are still small - let next_epoch = hashes.split_off(hashes.len() - 100_000); - let next_chunks: Vec> = next_epoch - .chunks(MAX_GETDATA / 2) - .map(|slice| slice.to_vec()) - .collect(); - out.extend(next_chunks); - // Still not entirely full, but almost there - let to_segwit = hashes.split_off(hashes.len() - 100_000); - let to_segwit_chunks: Vec> = to_segwit - .chunks(MAX_GETDATA / 4) - .map(|slice| slice.to_vec()) - .collect(); - out.extend(to_segwit_chunks); - // Now divide the rest among jobs - let chunk_size = hashes.len() / jobs; - let rest: Vec> = hashes - .chunks(chunk_size) - .map(|slice| slice.to_vec()) - .collect(); - out.extend(rest); - out + .rev() + .collect() } pub trait ChainExt {