Skip to content

Commit 2819091

Browse files
authored
Merge pull request #41 from rustaceanrob/8-19-block-distro
Request blocks in an uneven distribution
2 parents 964806f + 7029ef4 commit 2819091

File tree

3 files changed

+88
-31
lines changed

3 files changed

+88
-31
lines changed

node/config_spec.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ doc = "The bitcoin network to operate on. Default `bitcoin`. Options are `bitcoi
1919
[[param]]
2020
name = "ping_timeout"
2121
type = "u64"
22-
default = "15"
22+
default = "60"
2323
doc = "The time (seconds) a peer has to respond to a `ping` message. Pings are sent aggressively throughout IBD to find slow peers."
2424

2525
[[param]]
@@ -43,11 +43,11 @@ doc = "The maximum time (seconds) to write to a TCP stream until the connection
4343
[[param]]
4444
name = "min_blocks_per_sec"
4545
type = "f64"
46-
default = "1."
46+
default = "3."
4747
doc = "The minimum rate (blocks per second) at which a peer must respond to block requests."
4848

4949
[[param]]
5050
name = "tasks"
5151
type = "usize"
52-
default = "64"
52+
default = "32"
5353
doc = "The number of tasks to download blocks. Default is 32. Each task uses two OS threads."

node/src/bin/ibd.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use node::{
1515
};
1616
use p2p::net::TimeoutParams;
1717

18-
const PING_INTERVAL: Duration = Duration::from_secs(15);
18+
const PING_INTERVAL: Duration = Duration::from_secs(10 * 60);
1919

2020
configure_me::include_config!();
2121

@@ -74,8 +74,7 @@ fn main() {
7474
let acc_task = std::thread::spawn(move || accumulator_state.verify());
7575
let peers = Arc::new(Mutex::new(peers));
7676
let mut tasks = Vec::new();
77-
let chunk_size = chain.best_header().height() as usize / task_num;
78-
let hashes = hashes_from_chain(Arc::clone(&chain), chunk_size);
77+
let hashes = hashes_from_chain(Arc::clone(&chain), task_num);
7978
for (task_id, chunk) in hashes.into_iter().enumerate() {
8079
let chain = Arc::clone(&chain);
8180
let tx = tx.clone();

node/src/lib.rs

Lines changed: 83 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use p2p::{
3434
};
3535

3636
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::WTXID_RELAY_VERSION;
37+
const MAX_GETDATA: usize = 50_000;
3738

3839
pub fn elapsed_time(then: Instant) {
3940
let duration_sec = then.elapsed().as_secs_f64();
@@ -79,11 +80,12 @@ pub fn sync_block_headers(
7980
hosts: &[SocketAddr],
8081
chainman: Arc<ChainstateManager>,
8182
network: Network,
82-
timeout_params: TimeoutParams,
83+
mut timeout_params: TimeoutParams,
8384
) {
8485
let mut rng = thread_rng();
8586
let then = Instant::now();
8687
tracing::info!("Syncing block headers to assume valid hash");
88+
timeout_params.ping_interval(Duration::from_secs(30));
8789
loop {
8890
let random = hosts
8991
.choose(&mut rng)
@@ -99,20 +101,24 @@ pub fn sync_block_headers(
99101
Err(_) => continue,
100102
};
101103
tracing::info!("Connection established");
104+
let mut get_next = true;
102105
loop {
103-
let curr = chainman.best_header().block_hash().hash;
104-
let locator = BlockHash::from_byte_array(curr);
105-
let getheaders = GetHeadersMessage {
106-
version: PROTOCOL_VERSION,
107-
locator_hashes: vec![locator],
108-
stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH,
109-
};
110-
tracing::info!("Requesting {locator}");
111-
if writer
112-
.send_message(NetworkMessage::GetHeaders(getheaders))
113-
.is_err()
114-
{
115-
break;
106+
let mut kill = false;
107+
if get_next {
108+
let curr = chainman.best_header().block_hash().hash;
109+
let locator = BlockHash::from_byte_array(curr);
110+
let getheaders = GetHeadersMessage {
111+
version: PROTOCOL_VERSION,
112+
locator_hashes: vec![locator],
113+
stop_hash: BlockHash::GENESIS_PREVIOUS_BLOCK_HASH,
114+
};
115+
tracing::info!("Requesting {locator}");
116+
if writer
117+
.send_message(NetworkMessage::GetHeaders(getheaders))
118+
.is_err()
119+
{
120+
break;
121+
}
116122
}
117123
while let Ok(Some(message)) = reader.read_message() {
118124
match message {
@@ -121,6 +127,7 @@ pub fn sync_block_headers(
121127
chainman
122128
.process_new_block_headers(&consensus::serialize(&header), true)
123129
.expect("process headers failed");
130+
get_next = true;
124131
if header.block_hash().eq(&stop_hash) {
125132
tracing::info!("Done syncing block headers");
126133
if let Some(message_rate) =
@@ -138,12 +145,23 @@ pub fn sync_block_headers(
138145
tracing::info!("Update chain tip: {}", chainman.best_header().height());
139146
break;
140147
}
148+
NetworkMessage::Inv(_) => {
149+
kill = true;
150+
break;
151+
}
141152
NetworkMessage::Ping(nonce) => {
153+
get_next = false;
142154
let _ = writer.send_message(NetworkMessage::Pong(nonce));
143155
}
144-
e => tracing::info!("Ignoring message {}", e.command()),
156+
e => {
157+
get_next = false;
158+
tracing::info!("Ignoring message {}", e.command());
159+
}
145160
}
146161
}
162+
if kill {
163+
break;
164+
}
147165
}
148166
}
149167
}
@@ -153,7 +171,7 @@ pub fn get_blocks_for_range(
153171
task_id: u32,
154172
timeout_params: TimeoutParams,
155173
blocks_per_sec: f64,
156-
ping_timeout: Duration,
174+
_ping_timeout: Duration,
157175
network: Network,
158176
block_dir: &Path,
159177
chain: Arc<ChainstateManager>,
@@ -162,6 +180,7 @@ pub fn get_blocks_for_range(
162180
updater: Sender<AccumulatorUpdate>,
163181
mut batch: Vec<BlockHash>,
164182
) {
183+
tracing::info!("{task_id} assigned {} blocks", batch.len());
165184
let mut rng = thread_rng();
166185
loop {
167186
let peer = {
@@ -207,7 +226,15 @@ pub fn get_blocks_for_range(
207226
hints.get_block_offsets(block_height).into_iter().collect();
208227
// tracing::info!("{task_id} -> {block_height}:{hash}");
209228
let file_path = block_dir.join(format!("{hash}.block"));
210-
let mut file = File::create_new(file_path).expect("duplicate block file");
229+
let file = File::create_new(file_path);
230+
let mut file = match file {
231+
Ok(file) => file,
232+
Err(e) => {
233+
tracing::warn!("Conflicting open files at: {}", block_height);
234+
tracing::warn!("{e}");
235+
panic!("files cannot conflict");
236+
}
237+
};
211238
let block_bytes = consensus::serialize(&block);
212239
file.write_all(&block_bytes)
213240
.expect("failed to write block file");
@@ -243,7 +270,9 @@ pub fn get_blocks_for_range(
243270
output_index += 1
244271
}
245272
}
246-
273+
if batch.len() % 100 == 0 {
274+
tracing::info!("{task_id} has {} remaining blocks", batch.len());
275+
}
247276
if batch.is_empty() {
248277
tracing::info!("All block ranges fetched: {task_id}");
249278
return;
@@ -277,10 +306,10 @@ pub fn get_blocks_for_range(
277306
break;
278307
}
279308
}
280-
if metrics.ping_timed_out(ping_timeout) {
281-
tracing::info!("{task_id} failed to respond to a ping");
282-
break;
283-
}
309+
// if metrics.ping_timed_out(ping_timeout) {
310+
// tracing::warn!("{task_id} failed to respond to a ping");
311+
// break;
312+
// }
284313
}
285314
if batch.is_empty() {
286315
break;
@@ -289,21 +318,50 @@ pub fn get_blocks_for_range(
289318
tracing::info!("All block ranges fetched: {task_id}");
290319
}
291320

292-
pub fn hashes_from_chain(chain: Arc<ChainstateManager>, chunks: usize) -> Vec<Vec<BlockHash>> {
321+
pub fn hashes_from_chain(chain: Arc<ChainstateManager>, jobs: usize) -> Vec<Vec<BlockHash>> {
293322
let height = chain.best_header().height();
294323
let mut hashes = Vec::with_capacity(height as usize);
295324
let mut curr = chain.best_header();
296325
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
297326
hashes.push(tip_hash);
327+
let mut out = Vec::new();
298328
while let Ok(next) = curr.prev() {
299329
if next.height() == 0 {
300-
return hashes.chunks(chunks).map(|slice| slice.to_vec()).collect();
330+
break;
301331
}
302332
let hash = BlockHash::from_byte_array(next.block_hash().hash);
303333
hashes.push(hash);
304334
curr = next;
305335
}
306-
hashes.chunks(chunks).map(|slice| slice.to_vec()).collect()
336+
// These blocks are empty. Fetch the maximum amount of blocks.
337+
let first_epoch = hashes.split_off(hashes.len() - 200_000);
338+
let first_chunks: Vec<Vec<BlockHash>> = first_epoch
339+
.chunks(MAX_GETDATA)
340+
.map(|slice| slice.to_vec())
341+
.collect();
342+
out.extend(first_chunks);
343+
// These start to get larger, but are still small
344+
let next_epoch = hashes.split_off(hashes.len() - 100_000);
345+
let next_chunks: Vec<Vec<BlockHash>> = next_epoch
346+
.chunks(MAX_GETDATA / 2)
347+
.map(|slice| slice.to_vec())
348+
.collect();
349+
out.extend(next_chunks);
350+
// Still not entirely full, but almost there
351+
let to_segwit = hashes.split_off(hashes.len() - 100_000);
352+
let to_segwit_chunks: Vec<Vec<BlockHash>> = to_segwit
353+
.chunks(MAX_GETDATA / 4)
354+
.map(|slice| slice.to_vec())
355+
.collect();
356+
out.extend(to_segwit_chunks);
357+
// Now divide the rest among jobs
358+
let chunk_size = hashes.len() / jobs;
359+
let rest: Vec<Vec<BlockHash>> = hashes
360+
.chunks(chunk_size)
361+
.map(|slice| slice.to_vec())
362+
.collect();
363+
out.extend(rest);
364+
out
307365
}
308366

309367
pub trait ChainExt {

0 commit comments

Comments
 (0)