Skip to content

Commit 0f9f80c

Browse files
committed
WIP add post-IBD block migration
1 parent 80e848e commit 0f9f80c

3 files changed

Lines changed: 103 additions & 10 deletions

File tree

node/config_spec.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ doc = "The path to your `bitcoin.hints` file that will be used for IBD. Default
88
name = "blocks_dir"
99
type = "String"
1010
default = "\"./blockfiles\".into()"
11-
doc = "Directory where you would like to store the bitcoin blocks. Default `./blockfiles`"
11+
doc = "Temporary directory where you would like to store the bitcoin blocks. Default `./blockfiles`"
1212

1313
[[param]]
1414
name = "network"

node/src/bin/ibd.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
fs::File,
33
path::PathBuf,
4+
str::FromStr,
45
sync::{mpsc::channel, Arc, Mutex},
56
time::{Duration, Instant},
67
};
@@ -10,8 +11,8 @@ use hintfile::Hints;
1011
use kernel::{ChainstateManager, ChainstateManagerOptions, ContextBuilder};
1112

1213
use node::{
13-
bootstrap_dns, elapsed_time, get_blocks_for_range, hashes_from_chain, sync_block_headers,
14-
AccumulatorState, ChainExt,
14+
bootstrap_dns, elapsed_time, emit_hashes_in_order, get_blocks_for_range, hashes_from_chain,
15+
migrate_blocks, setup_validation_interface, sync_block_headers, AccumulatorState, ChainExt,
1516
};
1617
use p2p::net::TimeoutParams;
1718

@@ -20,9 +21,14 @@ const PING_INTERVAL: Duration = Duration::from_secs(10 * 60);
2021
configure_me::include_config!();
2122

2223
fn main() {
24+
let subscriber = tracing_subscriber::FmtSubscriber::new();
25+
tracing::subscriber::set_global_default(subscriber).unwrap();
2326
let (config, _) = Config::including_optional_config_files::<&[&str]>(&[]).unwrap_or_exit();
2427
let hint_path = config.hintfile;
2528
let blocks_dir = config.blocks_dir;
29+
let home_var = std::env::var("HOME").unwrap();
30+
let home_dir = PathBuf::from_str(&home_var).unwrap();
31+
let bitcoind_dir = home_dir.join(".bitcoin");
2632
let network = config
2733
.network
2834
.parse::<Network>()
@@ -38,8 +44,6 @@ fn main() {
3844
timeout_conf.write_timeout(write_timeout);
3945
timeout_conf.tcp_handshake_timeout(tcp_timeout);
4046
timeout_conf.ping_interval(PING_INTERVAL);
41-
let subscriber = tracing_subscriber::FmtSubscriber::new();
42-
tracing::subscriber::set_global_default(subscriber).unwrap();
4347
let hintfile_start_time = Instant::now();
4448
tracing::info!("Reading in {hint_path}");
4549
let mut hintfile = File::open(hint_path).expect("invalid hintfile path");
@@ -58,9 +62,16 @@ fn main() {
5862
let kernel_start_time = Instant::now();
5963
let ctx = ContextBuilder::new()
6064
.chain_type(network.chain_type())
65+
.validation_interface(setup_validation_interface())
6166
.build()
6267
.unwrap();
63-
let options = ChainstateManagerOptions::new(&ctx, ".", "./blocks").unwrap();
68+
let bitcoind_block_dir = bitcoind_dir.join("blocks");
69+
let options = ChainstateManagerOptions::new(
70+
&ctx,
71+
bitcoind_dir.to_str().unwrap(),
72+
bitcoind_block_dir.to_str().unwrap(),
73+
)
74+
.unwrap();
6475
let chainman = ChainstateManager::new(options).unwrap();
6576
elapsed_time(kernel_start_time);
6677
let tip = chainman.best_header().height();
@@ -74,7 +85,12 @@ fn main() {
7485
let acc_task = std::thread::spawn(move || accumulator_state.verify());
7586
let peers = Arc::new(Mutex::new(peers));
7687
let mut tasks = Vec::new();
77-
let hashes = hashes_from_chain(Arc::clone(&chain), task_num);
88+
let hashes = if matches!(network, Network::Bitcoin) {
89+
hashes_from_chain(Arc::clone(&chain), task_num)
90+
} else {
91+
let hashes = emit_hashes_in_order(Arc::clone(&chain)).collect::<Vec<BlockHash>>();
92+
hashes.chunks(10_000).map(|slice| slice.to_vec()).collect()
93+
};
7894
for (task_id, chunk) in hashes.into_iter().enumerate() {
7995
let chain = Arc::clone(&chain);
8096
let tx = tx.clone();
@@ -107,4 +123,6 @@ fn main() {
107123
let acc_result = acc_task.join().unwrap();
108124
tracing::info!("Verified: {acc_result}");
109125
elapsed_time(main_routine_time);
126+
tracing::info!("Migrating blocks to Bitcoin Core");
127+
migrate_blocks(chain, &block_file_path);
110128
}

node/src/lib.rs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
collections::HashSet,
33
fs::File,
4-
io::Write,
4+
io::{Read, Write},
55
net::{IpAddr, Ipv4Addr, SocketAddr},
66
path::Path,
77
sync::{
@@ -198,7 +198,12 @@ pub fn get_blocks_for_range(
198198
continue;
199199
};
200200
// tracing::info!("Connection successful");
201-
let payload = InventoryPayload(batch.iter().map(|hash| Inventory::Block(*hash)).collect());
201+
let payload = InventoryPayload(
202+
batch
203+
.iter()
204+
.map(|hash| Inventory::WitnessBlock(*hash))
205+
.collect(),
206+
);
202207
// tracing::info!("Requesting {} blocks", payload.0.len());
203208
let getdata = NetworkMessage::GetData(payload);
204209
if writer.send_message(getdata).is_err() {
@@ -232,9 +237,12 @@ pub fn get_blocks_for_range(
232237
panic!("files cannot conflict");
233238
}
234239
};
235-
let block_bytes = consensus::serialize(&block);
240+
let block_bytes = consensus::encode::serialize(&block);
236241
file.write_all(&block_bytes)
237242
.expect("failed to write block file");
243+
file.flush().expect("failed to flush entire block to disk");
244+
file.sync_all().expect("file failed to sync with the OS");
245+
drop(file);
238246
// tracing::info!("Wrote {hash} to file");
239247
let (_, transactions) = block.into_parts();
240248
let mut output_index = 0;
@@ -361,6 +369,73 @@ pub fn hashes_from_chain(chain: Arc<ChainstateManager>, jobs: usize) -> Vec<Vec<
361369
out
362370
}
363371

372+
pub fn emit_hashes_in_order(chain: Arc<ChainstateManager>) -> impl Iterator<Item = BlockHash> {
373+
let height = chain.best_header().height();
374+
let mut hashes = Vec::with_capacity(height as usize);
375+
let mut curr = chain.best_header();
376+
let tip_hash = BlockHash::from_byte_array(curr.block_hash().hash);
377+
hashes.push(tip_hash);
378+
while let Ok(next) = curr.prev() {
379+
if next.height() == 0 {
380+
break;
381+
}
382+
let hash = BlockHash::from_byte_array(next.block_hash().hash);
383+
hashes.push(hash);
384+
curr = next;
385+
}
386+
hashes.into_iter().rev()
387+
}
388+
389+
pub fn migrate_blocks(chain: Arc<ChainstateManager>, block_dir: &Path) {
390+
let start = Instant::now();
391+
for (i, hash) in emit_hashes_in_order(Arc::clone(&chain)).enumerate() {
392+
let file_path = block_dir.join(format!("{hash}.block"));
393+
let mut file =
394+
File::open(&file_path).expect("block file not present. did IBD complete successfully?");
395+
let mut block_bytes = Vec::new();
396+
file.read_to_end(&mut block_bytes)
397+
.expect("unexpected error parsing block file");
398+
let block =
399+
kernel::Block::try_from(block_bytes.as_slice()).expect("invalid block serialization");
400+
let (accepted, _) = chain.process_block(&block);
401+
if !accepted {
402+
tracing::warn!("{hash} was rejected");
403+
panic!("could not migrate blocks");
404+
}
405+
drop(file);
406+
if let Err(e) = std::fs::remove_file(file_path) {
407+
tracing::warn!("Could not remove block file {e}");
408+
}
409+
if i % 100 == 0 {
410+
tracing::info!("{i}th block migrated");
411+
elapsed_time(start);
412+
}
413+
}
414+
}
415+
416+
pub fn setup_validation_interface() -> Box<kernel::ValidationInterfaceCallbacks> {
417+
Box::new(kernel::ValidationInterfaceCallbacks {
418+
block_checked: Box::new(move |_block, _mode, result| match result {
419+
kernel::BlockValidationResult::MUTATED => tracing::warn!("Received mutated block"),
420+
kernel::BlockValidationResult::CONSENSUS => tracing::warn!("Invalid consensus"),
421+
kernel::BlockValidationResult::CACHED_INVALID => tracing::warn!("Cached as invalid"),
422+
kernel::BlockValidationResult::INVALID_HEADER => {
423+
tracing::warn!("Block header is malformed")
424+
}
425+
kernel::BlockValidationResult::TIME_FUTURE => tracing::warn!("Invalid timestamp"),
426+
kernel::BlockValidationResult::MISSING_PREV => tracing::warn!("Missing previous block"),
427+
kernel::BlockValidationResult::HEADER_LOW_WORK => {
428+
tracing::warn!("Header has too-low work")
429+
}
430+
kernel::BlockValidationResult::INVALID_PREV => {
431+
tracing::warn!("Invalid previous block hash")
432+
}
433+
// Result is unset (not rejected)
434+
_ => (),
435+
}),
436+
})
437+
}
438+
364439
pub trait ChainExt {
365440
fn chain_type(&self) -> ChainType;
366441
}

0 commit comments

Comments
 (0)