Skip to content

Commit 0bf3f62

Browse files
committed
node: All threads remain active during IBD
1 parent 2557016 commit 0bf3f62

File tree

4 files changed

+71
-76
lines changed

4 files changed

+71
-76
lines changed

node/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,18 @@ sudo apt-get install unzip
1212
unzip bitcoin.hints.zip
1313
```
1414

15+
To build the Bitcoin kernel, you will need the following on Ubuntu:
16+
17+
```
18+
sudo apt-get install build-essential cmake pkgconf python3 libevent-dev libboost-dev
19+
```
20+
21+
For other systems, see the Bitcoin Core documentation on installing the build requirements [here](https://github.com/bitcoin/bitcoin/tree/master/doc).
22+
23+
Finally, you will need Rust and Cargo installed; you may download them from [here](https://www.rust-lang.org/tools/install).
24+
1525
To start fast IBD:
26+
1627
```
1728
cargo run --bin ibd --release -- <args>
1829
```
@@ -39,3 +50,9 @@ Arguments:
3950
--write-timeout The maximum time (seconds) to write to a TCP
4051
stream until the connection is killed.
4152
```
53+
54+
If IBD completes, or you experience a bug, you will need to remove the kernel directories from this repository to run the binary again:
55+
56+
```
57+
rm -rf blocks chainstate
58+
```

node/config_spec.toml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi
1919
[[param]]
2020
name = "min_blocks_per_sec"
2121
type = "f64"
22-
default = "3."
22+
default = "1."
2323
doc = "The minimum rate, in blocks per second, at which a peer must respond to block requests."
2424

2525
[[param]]
@@ -28,12 +28,6 @@ type = "usize"
2828
default = "32"
2929
doc = "The number of tasks to download blocks. Default is 32. Each task uses two OS threads."
3030

31-
[[param]]
32-
name = "ping_timeout"
33-
type = "u64"
34-
default = "60"
35-
doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers."
36-
3731
[[param]]
3832
name = "tcp_timeout"
3933
type = "u64"

node/src/bin/ibd.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ fn main() {
2727
.network
2828
.parse::<Network>()
2929
.expect("invalid network string");
30-
let ping_timeout = Duration::from_secs(config.ping_timeout);
3130
let block_per_sec = config.min_blocks_per_sec;
3231
let task_num = config.tasks;
3332
let tcp_timeout = Duration::from_secs(config.tcp_timeout);
@@ -76,26 +75,27 @@ fn main() {
7675
let acc_task = std::thread::spawn(move || accumulator_state.verify());
7776
let peers = Arc::new(Mutex::new(peers));
7877
let mut tasks = Vec::new();
79-
let hashes = hashes_from_chain(Arc::clone(&chain), network, task_num);
80-
for (task_id, chunk) in hashes.into_iter().enumerate() {
78+
let hashes = hashes_from_chain(Arc::clone(&chain));
79+
let arc_hashes = Arc::new(Mutex::new(hashes));
80+
for task_id in 0..task_num {
8181
let chain = Arc::clone(&chain);
8282
let tx = tx.clone();
8383
let peers = Arc::clone(&peers);
84+
let hashes = Arc::clone(&arc_hashes);
8485
let hints = Arc::clone(&hints);
8586
let block_file_path = block_file_path.clone();
8687
let block_task = std::thread::spawn(move || {
8788
get_blocks_for_range(
8889
task_id as u32,
8990
timeout_conf,
9091
block_per_sec,
91-
ping_timeout,
9292
network,
9393
block_file_path,
9494
chain,
9595
&hints,
9696
peers,
9797
tx,
98-
chunk,
98+
hashes,
9999
)
100100
});
101101
tasks.push(block_task);

node/src/lib.rs

Lines changed: 48 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use p2p::{
3434
};
3535

3636
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION;
37-
const MAX_GETDATA: usize = 50_000;
37+
const CHUNK_SIZE: usize = 100;
38+
const CONSIDERED_DEAD: f64 = 0.1;
3839

3940
pub fn elapsed_time(then: Instant) {
4041
let duration_sec = then.elapsed().as_secs_f64();
@@ -168,23 +169,30 @@ pub fn get_blocks_for_range(
168169
task_id: u32,
169170
timeout_params: TimeoutParams,
170171
blocks_per_sec: f64,
171-
_ping_timeout: Duration,
172172
network: Network,
173173
block_dir: Option<PathBuf>,
174174
chain: Arc<ChainstateManager>,
175175
hints: &Hints,
176176
peers: Arc<Mutex<Vec<SocketAddr>>>,
177177
updater: Sender<AccumulatorUpdate>,
178-
mut batch: Vec<BlockHash>,
178+
hashes: Arc<Mutex<Vec<Vec<BlockHash>>>>,
179179
) {
180-
tracing::info!("{task_id} assigned {} blocks", batch.len());
180+
let mut batch = Vec::new();
181181
let mut rng = thread_rng();
182182
loop {
183183
let peer = {
184184
let lock_opt = peers.lock().ok();
185185
let socket_addr = lock_opt.and_then(|lock| lock.choose(&mut rng).copied());
186186
socket_addr
187187
};
188+
if batch.is_empty() {
189+
let mut jobs_lock = hashes.lock().expect("could not take lock on hashes");
190+
let Some(next) = jobs_lock.pop() else {
191+
return;
192+
};
193+
tracing::info!("[thread {task_id:2}]: requesting next batch");
194+
batch = next;
195+
}
188196
let Some(peer) = peer else { continue };
189197
// tracing::info!("Connecting to {peer}");
190198
let conn = ConnectionConfig::new()
@@ -197,9 +205,9 @@ pub fn get_blocks_for_range(
197205
// tracing::warn!("Connection failed");
198206
continue;
199207
};
200-
// tracing::info!("Connection successful");
208+
let mut completed_batches = 0;
209+
tracing::info!("[thread {task_id:2}]: established connection {peer}");
201210
let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect());
202-
// tracing::info!("Requesting {} blocks", payload.0.len());
203211
let getdata = NetworkMessage::GetData(payload);
204212
if writer.send_message(getdata).is_err() {
205213
continue;
@@ -211,6 +219,7 @@ pub fn get_blocks_for_range(
211219
}
212220
NetworkMessage::Block(block) => {
213221
let hash = block.block_hash();
222+
// tracing::info!("[thread {task_id:2}]: {hash}");
214223
batch.retain(|b| hash.ne(b));
215224
let kernal_hash: kernel::BlockHash = kernel::BlockHash {
216225
hash: hash.to_byte_array(),
@@ -221,7 +230,6 @@ pub fn get_blocks_for_range(
221230
let block_height = block_index.height().unsigned_abs();
222231
let unspent_indexes: HashSet<u64> =
223232
hints.get_indexes(block_height).into_iter().collect();
224-
// tracing::info!("{task_id} -> {block_height}:{hash}");
225233
if let Some(block_dir) = block_dir.as_ref() {
226234
let file_path = block_dir.join(format!("{hash}.block"));
227235
let file = File::create_new(file_path);
@@ -238,7 +246,6 @@ pub fn get_blocks_for_range(
238246
.expect("failed to write block file");
239247
file.sync_data().expect("could not sync file with OS");
240248
}
241-
// tracing::info!("Wrote {hash} to file");
242249
let (_, transactions) = block.into_parts();
243250
let mut output_index = 0;
244251
for transaction in transactions {
@@ -270,12 +277,29 @@ pub fn get_blocks_for_range(
270277
output_index += 1
271278
}
272279
}
273-
if batch.len() % 100 == 0 {
274-
tracing::info!("{task_id} has {} remaining blocks", batch.len());
275-
}
276280
if batch.is_empty() {
277-
tracing::info!("All block ranges fetched: {task_id}");
278-
return;
281+
let mut jobs_lock = hashes.lock().expect("could not take lock on hashes");
282+
let Some(next) = jobs_lock.pop() else {
283+
tracing::info!("[thread {task_id:2}]: no jobs remaining, please wait for other threads");
284+
return;
285+
};
286+
batch = next;
287+
completed_batches += 1;
288+
tracing::info!(
289+
"[thread {task_id:2}]: requesting next batch. blocks downloaded: {}",
290+
CHUNK_SIZE * completed_batches
291+
);
292+
tracing::info!(
293+
"[thread m]: blocks remaining {}",
294+
CHUNK_SIZE * jobs_lock.len()
295+
);
296+
let payload = InventoryPayload(
297+
batch.iter().map(|hash| Inventory::Block(*hash)).collect(),
298+
);
299+
let getdata = NetworkMessage::GetData(payload);
300+
if writer.send_message(getdata).is_err() {
301+
break;
302+
}
279303
}
280304
}
281305
NetworkMessage::AddrV2(payload) => {
@@ -288,47 +312,34 @@ pub fn get_blocks_for_range(
288312
})
289313
.map(|(_, addr)| addr)
290314
.collect();
291-
// tracing::info!("Adding {} peers", addrs.len());
292315
lock.extend(addrs);
293316
}
294317
}
295318
_ => (),
296319
}
297320
if let Some(message_rate) = metrics.message_rate(TimedMessage::Block) {
298-
if message_rate.total_count() < 100 {
299-
continue;
300-
}
301321
let Some(rate) = message_rate.messages_per_secs(Instant::now()) else {
302322
continue;
303323
};
304-
if rate < blocks_per_sec {
305-
tracing::warn!("Disconnecting from {task_id} for stalling");
324+
if rate < CONSIDERED_DEAD {
325+
tracing::warn!("[thread {task_id:2}]: block rate considered dead");
326+
break;
327+
}
328+
if rate < blocks_per_sec && message_rate.total_count() > 20 {
329+
tracing::warn!("[thread {task_id:2}]: insufficient blocks/second rate");
306330
break;
307331
}
308332
}
309-
// if metrics.ping_timed_out(ping_timeout) {
310-
// tracing::warn!("{task_id} failed to respond to a ping");
311-
// break;
312-
// }
313-
}
314-
if batch.is_empty() {
315-
break;
316333
}
317334
}
318-
tracing::info!("All block ranges fetched: {task_id}");
319335
}
320336

321-
pub fn hashes_from_chain(
322-
chain: Arc<ChainstateManager>,
323-
network: Network,
324-
jobs: usize,
325-
) -> Vec<Vec<BlockHash>> {
337+
pub fn hashes_from_chain(chain: Arc<ChainstateManager>) -> Vec<Vec<BlockHash>> {
326338
let height = chain.best_header().height();
327339
let mut hashes = Vec::with_capacity(height as usize);
328340
let mut curr = chain.best_header();
329341
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
330342
hashes.push(tip_hash);
331-
let mut out = Vec::new();
332343
while let Ok(next) = curr.prev() {
333344
if next.height() == 0 {
334345
break;
@@ -337,38 +348,11 @@ pub fn hashes_from_chain(
337348
hashes.push(hash);
338349
curr = next;
339350
}
340-
if matches!(network, Network::Signet) {
341-
return hashes.chunks(20_000).map(|slice| slice.to_vec()).collect();
342-
}
343-
// These blocks are empty. Fetch the maximum amount of blocks.
344-
let first_epoch = hashes.split_off(hashes.len() - 200_000);
345-
let first_chunks: Vec<Vec<BlockHash>> = first_epoch
346-
.chunks(MAX_GETDATA)
351+
hashes
352+
.chunks(CHUNK_SIZE)
347353
.map(|slice| slice.to_vec())
348-
.collect();
349-
out.extend(first_chunks);
350-
// These start to get larger, but are still small
351-
let next_epoch = hashes.split_off(hashes.len() - 100_000);
352-
let next_chunks: Vec<Vec<BlockHash>> = next_epoch
353-
.chunks(MAX_GETDATA / 2)
354-
.map(|slice| slice.to_vec())
355-
.collect();
356-
out.extend(next_chunks);
357-
// Still not entirely full, but almost there
358-
let to_segwit = hashes.split_off(hashes.len() - 100_000);
359-
let to_segwit_chunks: Vec<Vec<BlockHash>> = to_segwit
360-
.chunks(MAX_GETDATA / 4)
361-
.map(|slice| slice.to_vec())
362-
.collect();
363-
out.extend(to_segwit_chunks);
364-
// Now divide the rest among jobs
365-
let chunk_size = hashes.len() / jobs;
366-
let rest: Vec<Vec<BlockHash>> = hashes
367-
.chunks(chunk_size)
368-
.map(|slice| slice.to_vec())
369-
.collect();
370-
out.extend(rest);
371-
out
354+
.rev()
355+
.collect()
372356
}
373357

374358
pub trait ChainExt {

0 commit comments

Comments
 (0)