From 4798c2b4b45d06d6208f66a2e9465a75db214b91 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Mon, 8 Sep 2025 11:19:53 +0100 Subject: [PATCH] node: All threads remain active during IBD In the previous iteration when threads complete their jobs the return to the main loop. We can keep good peers running by giving them more blocks to fetch. This implements a simple queue that jobs can steal work from. --- node/README.md | 17 +++++++ node/config_spec.toml | 8 +-- node/src/bin/ibd.rs | 10 ++-- node/src/lib.rs | 112 ++++++++++++++++++------------------------ 4 files changed, 71 insertions(+), 76 deletions(-) diff --git a/node/README.md b/node/README.md index b850ced..25cf11e 100644 --- a/node/README.md +++ b/node/README.md @@ -12,7 +12,18 @@ sudo apt-get install unzip unzip bitcoin.hints.zip ``` +To build the Bitcoin kernel, you will need the following on Ubuntu: + +``` +sudo apt-get install build-essential cmake pkgconf python3 libevent-dev libboost-dev +``` + +For other systems, follow the Bitcoin Core documentation on how to install the requirements [here](https://github.com/bitcoin/bitcoin/tree/master/doc). + +Finally, you will need Rust and cargo installed, you may download them from [here](https://www.rust-lang.org/tools/install). + To start fast IBD: + ``` cargo run --bin ibd --release -- ``` @@ -39,3 +50,9 @@ Arguments: --write-timeout The maximum time (seconds) to write to a TCP stream until the connection is killed. ``` + +If IBD completes, or you experience a bug, you will need to remove the kernel directories from this repository to run the binary again: + +``` +rm -rf blocks chainstate +``` diff --git a/node/config_spec.toml b/node/config_spec.toml index 90d64ea..dd0cb90 100644 --- a/node/config_spec.toml +++ b/node/config_spec.toml @@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi [[param]] name = "min_blocks_per_sec" type = "f64" -default = "3." +default = "1." doc = "The minimum rate a peer has to respond to block requests." [[param]] @@ -28,12 +28,6 @@ type = "usize" default = "32" doc = "The number of tasks to download blocks. Default is 64. Each task uses two OS threads." -[[param]] -name = "ping_timeout" -type = "u64" -default = "60" -doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers." - [[param]] name = "tcp_timeout" type = "u64" diff --git a/node/src/bin/ibd.rs b/node/src/bin/ibd.rs index 447779b..252e06d 100644 --- a/node/src/bin/ibd.rs +++ b/node/src/bin/ibd.rs @@ -27,7 +27,6 @@ fn main() { .network .parse::() .expect("invalid network string"); - let ping_timeout = Duration::from_secs(config.ping_timeout); let block_per_sec = config.min_blocks_per_sec; let task_num = config.tasks; let tcp_timeout = Duration::from_secs(config.tcp_timeout); @@ -76,11 +75,13 @@ fn main() { let acc_task = std::thread::spawn(move || accumulator_state.verify()); let peers = Arc::new(Mutex::new(peers)); let mut tasks = Vec::new(); - let hashes = hashes_from_chain(Arc::clone(&chain), network, task_num); - for (task_id, chunk) in hashes.into_iter().enumerate() { + let hashes = hashes_from_chain(Arc::clone(&chain)); + let arc_hashes = Arc::new(Mutex::new(hashes)); + for task_id in 0..task_num { let chain = Arc::clone(&chain); let tx = tx.clone(); let peers = Arc::clone(&peers); + let hashes = Arc::clone(&arc_hashes); let hints = Arc::clone(&hints); let block_file_path = block_file_path.clone(); let block_task = std::thread::spawn(move || { @@ -88,14 +89,13 @@ fn main() { task_id as u32, timeout_conf, block_per_sec, - ping_timeout, network, block_file_path, chain, &hints, peers, tx, - chunk, + hashes, ) }); tasks.push(block_task); diff --git a/node/src/lib.rs b/node/src/lib.rs index 8118611..8eb8be6 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -34,7 +34,8 @@ use p2p::{ }; const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION; -const MAX_GETDATA: usize = 50_000; +const CHUNK_SIZE: usize = 100; +const CONSIDERED_DEAD: f64 = 0.1; pub fn elapsed_time(then: Instant) { let duration_sec = then.elapsed().as_secs_f64(); @@ -168,16 +169,15 @@ pub fn get_blocks_for_range( task_id: u32, timeout_params: TimeoutParams, blocks_per_sec: f64, - _ping_timeout: Duration, network: Network, block_dir: Option, chain: Arc, hints: &Hints, peers: Arc>>, updater: Sender, - mut batch: Vec, + hashes: Arc>>>, ) { - tracing::info!("{task_id} assigned {} blocks", batch.len()); + let mut batch = Vec::new(); let mut rng = thread_rng(); loop { let peer = { @@ -185,6 +185,14 @@ pub fn get_blocks_for_range( let socket_addr = lock_opt.and_then(|lock| lock.choose(&mut rng).copied()); socket_addr }; + if batch.is_empty() { + let mut jobs_lock = hashes.lock().expect("could not take lock on hashes"); + let Some(next) = jobs_lock.pop() else { + return; + }; + tracing::info!("[thread {task_id:2}]: requesting next batch"); + batch = next; + } let Some(peer) = peer else { continue }; // tracing::info!("Connecting to {peer}"); let conn = ConnectionConfig::new() @@ -197,9 +205,9 @@ pub fn get_blocks_for_range( // tracing::warn!("Connection failed"); continue; }; - // tracing::info!("Connection successful"); + let mut completed_batches = 0; + tracing::info!("[thread {task_id:2}]: established connection {peer}"); let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect()); - // tracing::info!("Requesting {} blocks", payload.0.len()); let getdata = NetworkMessage::GetData(payload); if writer.send_message(getdata).is_err() { continue; @@ -211,6 +219,7 @@ pub fn get_blocks_for_range( } NetworkMessage::Block(block) => { let hash = block.block_hash(); + // tracing::info!("[thread {task_id:2}]: {hash}"); batch.retain(|b| hash.ne(b)); let kernal_hash: kernel::BlockHash = kernel::BlockHash { hash: hash.to_byte_array(), @@ -221,7 +230,6 @@ pub fn get_blocks_for_range( let block_height = block_index.height().unsigned_abs(); let unspent_indexes: HashSet = hints.get_indexes(block_height).into_iter().collect(); - // tracing::info!("{task_id} -> {block_height}:{hash}"); if let Some(block_dir) = block_dir.as_ref() { let file_path = block_dir.join(format!("{hash}.block")); let file = File::create_new(file_path); @@ -238,7 +246,6 @@ pub fn get_blocks_for_range( .expect("failed to write block file"); file.sync_data().expect("could not sync file with OS"); } - // tracing::info!("Wrote {hash} to file"); let (_, transactions) = block.into_parts(); let mut output_index = 0; for transaction in transactions { @@ -270,12 +277,29 @@ pub fn get_blocks_for_range( output_index += 1 } } - if batch.len() % 100 == 0 { - tracing::info!("{task_id} has {} remaining blocks", batch.len()); - } if batch.is_empty() { - tracing::info!("All block ranges fetched: {task_id}"); - return; + let mut jobs_lock = hashes.lock().expect("could not take lock on hashes"); + let Some(next) = jobs_lock.pop() else { + tracing::info!("[thread {task_id:2}]: no jobs remaining, please wait for other threads"); + return; + }; + batch = next; + completed_batches += 1; + tracing::info!( + "[thread {task_id:2}]: requesting next batch. blocks downloaded: {}", + CHUNK_SIZE * completed_batches + ); + tracing::info!( + "[thread m]: blocks remaining {}", + CHUNK_SIZE * jobs_lock.len() + ); + let payload = InventoryPayload( + batch.iter().map(|hash| Inventory::Block(*hash)).collect(), + ); + let getdata = NetworkMessage::GetData(payload); + if writer.send_message(getdata).is_err() { + break; + } } } NetworkMessage::AddrV2(payload) => { @@ -288,47 +312,34 @@ pub fn get_blocks_for_range( }) .map(|(_, addr)| addr) .collect(); - // tracing::info!("Adding {} peers", addrs.len()); lock.extend(addrs); } } _ => (), } if let Some(message_rate) = metrics.message_rate(TimedMessage::Block) { - if message_rate.total_count() < 100 { - continue; - } let Some(rate) = message_rate.messages_per_secs(Instant::now()) else { continue; }; - if rate < blocks_per_sec { - tracing::warn!("Disconnecting from {task_id} for stalling"); + if rate < CONSIDERED_DEAD { + tracing::warn!("[thread {task_id:2}]: block rate considered dead"); + break; + } + if rate < blocks_per_sec && message_rate.total_count() > 20 { + tracing::warn!("[thread {task_id:2}]: insufficient blocks/second rate"); break; } } - // if metrics.ping_timed_out(ping_timeout) { - // tracing::warn!("{task_id} failed to respond to a ping"); - // break; - // } - } - if batch.is_empty() { - break; } } - tracing::info!("All block ranges fetched: {task_id}"); } -pub fn hashes_from_chain( - chain: Arc, - network: Network, - jobs: usize, -) -> Vec> { +pub fn hashes_from_chain(chain: Arc) -> Vec> { let height = chain.best_header().height(); let mut hashes = Vec::with_capacity(height as usize); let mut curr = chain.best_header(); let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash); hashes.push(tip_hash); - let mut out = Vec::new(); while let Ok(next) = curr.prev() { if next.height() == 0 { break; @@ -337,38 +348,11 @@ pub fn hashes_from_chain( hashes.push(hash); curr = next; } - if matches!(network, Network::Signet) { - return hashes.chunks(20_000).map(|slice| slice.to_vec()).collect(); - } - // These blocks are empty. Fetch the maximum amount of blocks. - let first_epoch = hashes.split_off(hashes.len() - 200_000); - let first_chunks: Vec> = first_epoch - .chunks(MAX_GETDATA) + hashes + .chunks(CHUNK_SIZE) .map(|slice| slice.to_vec()) - .collect(); - out.extend(first_chunks); - // These start to get larger, but are still small - let next_epoch = hashes.split_off(hashes.len() - 100_000); - let next_chunks: Vec> = next_epoch - .chunks(MAX_GETDATA / 2) - .map(|slice| slice.to_vec()) - .collect(); - out.extend(next_chunks); - // Still not entirely full, but almost there - let to_segwit = hashes.split_off(hashes.len() - 100_000); - let to_segwit_chunks: Vec> = to_segwit - .chunks(MAX_GETDATA / 4) - .map(|slice| slice.to_vec()) - .collect(); - out.extend(to_segwit_chunks); - // Now divide the rest among jobs - let chunk_size = hashes.len() / jobs; - let rest: Vec> = hashes - .chunks(chunk_size) - .map(|slice| slice.to_vec()) - .collect(); - out.extend(rest); - out + .rev() + .collect() } pub trait ChainExt {