From 05adc3386192f5293b7ec7399c3c31dd33893411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C5=91rinc?= Date: Sat, 21 Feb 2026 20:43:26 +0100 Subject: [PATCH] test: add stale-first reorg proxy with reorg validation Add a stale-first historical reorg proxy and functional coverage: - add test framework helper to serve stale blocks before canonical blocks - add p2p_historical_reorg_proxy functional test - add contrib/historical_reorg_proxy.py runner for local/real-node replay - add debug.log reorg validation summary in the contrib runner - add docs and register test in test_runner.py Reproducer (local, stop at height 300000): BASE_DIR="/mnt/my_storage" SRC_DIR="$BASE_DIR/bitcoin" SRC_DATA_DIR="$BASE_DIR/BitcoinData" TGT_DATA_DIR="$BASE_DIR/ShallowBitcoinData" LOG_DIR="$BASE_DIR/logs" STALE_DIR="/data/my_storage/stale-blocks/blocks" # Terminal 1: upstream node (RPC required; cookie auth, no rpcuser/rpcpassword) "$SRC_DIR/build/bin/bitcoind" \ -datadir="$SRC_DATA_DIR" \ -server=1 \ -rpcbind=127.0.0.1 \ -rpcallowip=127.0.0.1 \ -rpcport=19443 \ -stopatheight=300000 \ -debuglogfile="$LOG_DIR/upstream-300k.log" # Terminal 2: stale-first proxy "$SRC_DIR/contrib/historical_reorg_proxy.py" \ --upstream-rpc=http://127.0.0.1:19443 \ --upstream-cookie-file="$SRC_DATA_DIR/.cookie" \ --stale-blocks-dir="$STALE_DIR" \ --target-debug-log="$LOG_DIR/shallow-300k.log" \ --listen-host=127.0.0.1 \ --listen-port=8338 \ --network=mainnet # Terminal 3: target node under test (no -server=1) "$SRC_DIR/build/bin/bitcoind" \ -datadir="$TGT_DATA_DIR" \ -prune=550 \ -noconnect \ -addnode=127.0.0.1:8338 \ -listen=0 \ -dnsseed=0 \ -fixedseeds=0 \ -discover=0 \ -stopatheight=300000 \ -debuglogfile="$LOG_DIR/shallow-300k.log" After the target exits at 300000, stop the proxy with Ctrl-C to print the reorg validation summary from debug.log. 
---- Reproducer (local, stop at height 300000): BASE_DIR="/mnt/my_storage" SRC_DIR="$BASE_DIR/bitcoin" SRC_DATA_DIR="$BASE_DIR/BitcoinData" TGT_DATA_DIR="$BASE_DIR/ShallowBitcoinData" LOG_DIR="$BASE_DIR/logs" STALE_DIR="/data/my_storage/stale-blocks/blocks" # Terminal 1: upstream node (RPC required; cookie auth, no rpcuser/rpcpassword) "$SRC_DIR/build/bin/bitcoind" \ -datadir="$SRC_DATA_DIR" \ -server=1 \ -rpcbind=127.0.0.1 \ -rpcallowip=127.0.0.1 \ -rpcport=19443 \ -stopatheight=300000 \ -debuglogfile="$LOG_DIR/upstream-300k.log" # Terminal 2: stale-first proxy "$SRC_DIR/contrib/historical_reorg_proxy.py" \ --upstream-rpc=http://127.0.0.1:19443 \ --upstream-cookie-file="$SRC_DATA_DIR/.cookie" \ --stale-blocks-dir="$STALE_DIR" \ --target-debug-log="$LOG_DIR/shallow-300k.log" \ --listen-host=127.0.0.1 \ --listen-port=8338 \ --network=mainnet # Terminal 3: target node under test (no -server=1) "$SRC_DIR/build/bin/bitcoind" \ -datadir="$TGT_DATA_DIR" \ -prune=550 \ -noconnect \ -addnode=127.0.0.1:8338 \ -listen=0 \ -dnsseed=0 \ -fixedseeds=0 \ -discover=0 \ -stopatheight=300000 \ -debuglogfile="$LOG_DIR/shallow-300k.log" After the target exits at 300000, stop the proxy with Ctrl-C to print the reorg validation summary from debug.log. 
--- contrib/historical_reorg_proxy.py | 744 ++++++++++ contrib/run_latest_reorg_tmux.sh | 58 + doc/historical_reorg_proxy.md | 109 ++ test/functional/p2p_historical_reorg_proxy.py | 301 ++++ .../test_framework/stale_reorg_proxy.py | 1306 +++++++++++++++++ test/functional/test_runner.py | 1 + 6 files changed, 2519 insertions(+) create mode 100755 contrib/historical_reorg_proxy.py create mode 100755 contrib/run_latest_reorg_tmux.sh create mode 100644 doc/historical_reorg_proxy.md create mode 100755 test/functional/p2p_historical_reorg_proxy.py create mode 100644 test/functional/test_framework/stale_reorg_proxy.py diff --git a/contrib/historical_reorg_proxy.py b/contrib/historical_reorg_proxy.py new file mode 100755 index 000000000000..e165aa528b94 --- /dev/null +++ b/contrib/historical_reorg_proxy.py @@ -0,0 +1,744 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+"""Run a stale-first block proxy for historical reorg replay.""" + +from argparse import ArgumentParser +from collections import defaultdict +import http.client +import json +from pathlib import Path +import queue +import re +import sys +import threading +import time +from urllib.parse import ( + quote, + urlsplit, + urlunsplit, +) + + +REPO_ROOT = Path(__file__).resolve().parents[1] +FUNCTIONAL_ROOT = REPO_ROOT / "test" / "functional" +if str(FUNCTIONAL_ROOT) not in sys.path: + sys.path.insert(0, str(FUNCTIONAL_ROOT)) + +from test_framework.authproxy import ( # noqa: E402 + AuthServiceProxy, + JSONRPCException, +) +from test_framework.p2p import ( # noqa: E402 + NetworkThread, + P2P_SERVICES, +) +from test_framework.stale_reorg_proxy import ( # noqa: E402 + HistoricalReorgProxy, + RPCChainView, + hash_int_to_hex, + load_stale_blocks_from_dir, +) + +UPDATE_TIP_RE = re.compile(r"UpdateTip: new best=([0-9a-f]{64}) height=(\d+)") +TARGET_VALIDATION_MARKERS = ( + "block is marked invalid", + "ConnectBlock", + "ActivateBestChain failed", + "InvalidChainFound", + "DisconnectTip", + "REORGANIZE", + "Chain reorganization", + "bad-blk", + "bad-txns", +) + +LISTEN_HOST = "127.0.0.1" +NETWORK = "mainnet" +RAW_CACHE_MIB = 1024 +RPC_BLOCK_BATCH_SIZE = 256 +RPC_HEADER_BATCH_SIZE = 1024 +RPC_HASH_BATCH_SIZE = 4096 +TIP_HEIGHT_CACHE_SECONDS = 2.0 +ACTIVE_PREFETCH_WINDOW = 1024 +ACTIVE_SEND_AHEAD = 256 +DEFER_STALE_UNTIL_GETDATA = True +PROGRESS_LOG_SECONDS = 5.0 +POLL_SECONDS = 1.0 + + +class ResilientRPCClient: + """Reconnect-and-retry wrapper around AuthServiceProxy.""" + + def __init__(self, rpc_url, *, timeout, retries=12, retry_delay=0.2): + self._rpc_url = rpc_url + self._timeout = timeout + self._retries = retries + self._retry_delay = retry_delay + self._rpc = AuthServiceProxy(self._rpc_url, timeout=self._timeout) + + def _reconnect(self): + self._rpc = AuthServiceProxy(self._rpc_url, timeout=self._timeout) + + @staticmethod + def _is_retryable_error(exc): + return 
isinstance( + exc, + ( + BrokenPipeError, + ConnectionError, + OSError, + TimeoutError, + http.client.HTTPException, + ), + ) + + def _call(self, method_name, *args, **kwargs): + delay = self._retry_delay + attempt = 0 + while True: + try: + return getattr(self._rpc, method_name)(*args, **kwargs) + except JSONRPCException as exc: + # During startup, bitcoind may still be warming up and return -28. + # Treat this as transient and wait until the node is ready. + if isinstance(exc.error, dict) and exc.error.get("code") == -28: + time.sleep(delay) + delay = min(delay * 2, 2.0) + continue + raise + except Exception as exc: + if not self._is_retryable_error(exc): + raise + attempt += 1 + if attempt >= self._retries: + raise + print( + f"Transient RPC error on {method_name} " + f"(attempt {attempt}/{self._retries}): {exc!r}; reconnecting..." + ) + self._reconnect() + time.sleep(delay) + delay = min(delay * 2, 2.0) + raise RuntimeError("unreachable") + + def batch(self, rpc_call_list): + delay = self._retry_delay + attempt = 0 + while True: + try: + return self._rpc.batch(rpc_call_list) + except JSONRPCException as exc: + if isinstance(exc.error, dict) and exc.error.get("code") == -28: + time.sleep(delay) + delay = min(delay * 2, 2.0) + continue + raise + except Exception as exc: + if not self._is_retryable_error(exc): + raise + attempt += 1 + if attempt >= self._retries: + raise + print( + f"Transient RPC error on batch " + f"(attempt {attempt}/{self._retries}): {exc!r}; reconnecting..." + ) + self._reconnect() + time.sleep(delay) + delay = min(delay * 2, 2.0) + raise RuntimeError("unreachable") + + def __getattr__(self, name): + if name.startswith("__") and name.endswith("__"): + raise AttributeError + + def caller(*args, **kwargs): + return self._call(name, *args, **kwargs) + + return caller + + +def parse_args(): + parser = ArgumentParser(description=__doc__) + parser.add_argument( + "--upstream-rpc", + required=True, + help="Upstream node RPC URL, e.g. 
http://127.0.0.1:8332", + ) + parser.add_argument( + "--upstream-cookie-file", + default=None, + help="Optional upstream RPC .cookie file used for authentication", + ) + parser.add_argument( + "--target-rpc", + default=None, + help="Optional target node RPC URL used for UTXO snapshot output", + ) + parser.add_argument( + "--target-cookie-file", + default=None, + help="Optional target RPC .cookie file used for authentication", + ) + parser.add_argument( + "--stale-blocks-dir", + default=str(REPO_ROOT / "stale-blocks" / "blocks"), + help="Directory containing stale block blobs (-.bin)", + ) + parser.add_argument( + "--listen-port", + type=int, + default=8338, + help="Port to bind the proxy listener", + ) + parser.add_argument( + "--target-debug-log", + default=None, + help="Path to target node debug.log for reorg validation summary", + ) + return parser.parse_args() + + +def collect_snapshot(target_rpc): + snapshot = target_rpc.gettxoutsetinfo("hash_serialized_3", use_index=False) + return { + "height": snapshot["height"], + "bestblock": snapshot["bestblock"], + "hash_serialized_3": snapshot.get("hash_serialized_3"), + "txouts": snapshot["txouts"], + "total_amount": str(snapshot["total_amount"]), + } + +def collect_utxo_hash(rpc): + info = rpc.gettxoutsetinfo("hash_serialized_3", use_index=False) + return { + "height": info["height"], + "bestblock": info["bestblock"], + "hash_serialized_3": info.get("hash_serialized_3"), + } + + +def print_utxo_hash_comparison(upstream, target): + match = ( + upstream.get("hash_serialized_3") is not None + and upstream.get("bestblock") == target.get("bestblock") + and upstream.get("hash_serialized_3") == target.get("hash_serialized_3") + ) + print("UTXO set hash comparison (hash_serialized_3)") + print( + " upstream: height={} bestblock={} hash_serialized_3={}".format( + upstream.get("height"), + upstream.get("bestblock"), + upstream.get("hash_serialized_3"), + ) + ) + print( + " target: height={} bestblock={} 
hash_serialized_3={}".format( + target.get("height"), + target.get("bestblock"), + target.get("hash_serialized_3"), + ) + ) + print(" result: {}".format("MATCH" if match else "MISMATCH")) + return match + + +def try_collect_snapshot(target_rpc): + if target_rpc is None: + return None + try: + return collect_snapshot(target_rpc) + except Exception as exc: + print(f"Skipping final snapshot collection: {exc!r}") + return None + + +def try_getblockcount(target_rpc): + if target_rpc is None: + return None + try: + return target_rpc.getblockcount() + except JSONRPCException as exc: + # Target may still be starting up; treat as transient. + if isinstance(exc.error, dict) and exc.error.get("code") == -28: + return None + raise + except Exception: + return None + + +def parse_update_tip_events(debug_log_path, start_offset=0): + path = Path(debug_log_path) + if not path.exists(): + return [] + + events = [] + with path.open("r", encoding="utf-8", errors="replace") as log_file: + if start_offset > 0: + file_size = path.stat().st_size + if file_size >= start_offset: + log_file.seek(start_offset) + + for lineno, line in enumerate(log_file, start=1): + match = UPDATE_TIP_RE.search(line) + if match is None: + continue + events.append( + { + "lineno": lineno, + "hash": match.group(1), + "height": int(match.group(2)), + } + ) + return events + + +def scan_target_debug_events(debug_log_path, start_offset=0, last_tip=None): + path = Path(debug_log_path) + if not path.exists(): + return start_offset, last_tip, [] + + file_size = path.stat().st_size + offset = start_offset if 0 <= start_offset <= file_size else 0 + events = [] + tip = last_tip + + with path.open("r", encoding="utf-8", errors="replace") as log_file: + log_file.seek(offset) + for line in log_file: + stripped = line.rstrip("\n") + match = UPDATE_TIP_RE.search(stripped) + if match is not None: + new_hash = match.group(1) + new_height = int(match.group(2)) + if tip is not None: + prev_height, prev_hash = tip + if new_hash 
!= prev_hash and new_height <= prev_height: + events.append( + { + "kind": "reorg", + "line": stripped, + "prev_height": prev_height, + "prev_hash": prev_hash, + "new_height": new_height, + "new_hash": new_hash, + } + ) + tip = (new_height, new_hash) + continue + + if any(marker in stripped for marker in TARGET_VALIDATION_MARKERS): + events.append({"kind": "validation", "line": stripped}) + + new_offset = log_file.tell() + + return new_offset, tip, events + + +def validate_reorgs_from_events(served_stale, update_tip_events): + """Validate that each served stale tip later gets replaced.""" + positions_by_hash = defaultdict(list) + for idx, event in enumerate(update_tip_events): + positions_by_hash[event["hash"]].append(idx) + + per_block = [] + for stale in sorted(served_stale, key=lambda item: (item["height"], item["hash"])): + stale_hash = stale["hash"] + stale_height = stale["height"] + positions = positions_by_hash.get(stale_hash, []) + + connected_as_tip = bool(positions) + reorged_out = False + saw_disconnect_phase = False + replaced_by = None + + if connected_as_tip: + first_pos = positions[0] + for event in update_tip_events[first_pos + 1 :]: + if event["height"] < stale_height: + saw_disconnect_phase = True + if event["hash"] != stale_hash and event["height"] >= stale_height: + reorged_out = True + replaced_by = { + "hash": event["hash"], + "height": event["height"], + "lineno": event["lineno"], + } + break + + per_block.append( + { + "hash": stale_hash, + "height": stale_height, + "connected_as_tip": connected_as_tip, + "reorged_out": reorged_out, + "saw_disconnect_phase": saw_disconnect_phase, + "replaced_by": replaced_by, + } + ) + + connected = [block for block in per_block if block["connected_as_tip"]] + reorged = [block for block in per_block if block["reorged_out"]] + missing_tip = [block for block in per_block if not block["connected_as_tip"]] + missing_reorg = [ + block + for block in per_block + if block["connected_as_tip"] and not 
block["reorged_out"] + ] + saw_disconnect = [block for block in per_block if block["saw_disconnect_phase"]] + + return { + "served_stale": len(served_stale), + "update_tip_events": len(update_tip_events), + "connected_as_tip": len(connected), + "reorged_out": len(reorged), + "with_disconnect_phase": len(saw_disconnect), + "missing_tip": missing_tip, + "missing_reorg": missing_reorg, + "details": per_block, + } + + +def print_reorg_summary(summary): + print("Reorg validation summary") + print(f" Served stale blocks: {summary['served_stale']}") + print(f" UpdateTip log events: {summary['update_tip_events']}") + print(f" Connected as tip: {summary['connected_as_tip']}") + print(f" Reorged out: {summary['reorged_out']}") + print(f" With disconnect evidence: {summary['with_disconnect_phase']}") + + if summary["missing_tip"]: + print(" Missing tip activations:") + for block in summary["missing_tip"]: + print(f" {block['height']}-{block['hash']}") + + if summary["missing_reorg"]: + print(" Missing reorg evidence:") + for block in summary["missing_reorg"]: + print(f" {block['height']}-{block['hash']}") + + +def rpc_url_with_cookie(rpc_url, cookie_file): + normalized_url = rpc_url if "://" in rpc_url else f"http://{rpc_url}" + if cookie_file is None: + return normalized_url + + cookie_text = Path(cookie_file).read_text(encoding="utf-8").strip() + if ":" not in cookie_text: + raise ValueError(f"Malformed cookie file (missing user:pass): {cookie_file}") + + cookie_user, cookie_pass = cookie_text.split(":", 1) + split = urlsplit(normalized_url) + + # If credentials already exist in URL, keep them. 
+ if split.username is not None: + return normalized_url + + netloc = f"{quote(cookie_user, safe='')}:{quote(cookie_pass, safe='')}@{split.netloc}" + return urlunsplit((split.scheme, netloc, split.path, split.query, split.fragment)) + + +def main(): + args = parse_args() + debug_log_offset = 0 + live_debug_offset = 0 + last_tip = None + + upstream_rpc_url = None + while upstream_rpc_url is None: + try: + upstream_rpc_url = rpc_url_with_cookie(args.upstream_rpc, args.upstream_cookie_file) + except FileNotFoundError: + time.sleep(0.2) + upstream_rpc = ResilientRPCClient(upstream_rpc_url, timeout=300) + + target_rpc = None + stale_force_queue = None + stale_force_thread = None + forced_stale_hashes = set() + compare_height = None + + def ensure_target_rpc(): + nonlocal target_rpc + if not args.target_rpc: + return None + if target_rpc is not None: + return target_rpc + try: + target_rpc_url = rpc_url_with_cookie(args.target_rpc, args.target_cookie_file) + except FileNotFoundError: + return None + target_rpc = ResilientRPCClient(target_rpc_url, timeout=300) + return target_rpc + + stale_dir = Path(args.stale_blocks_dir) + stale_blocks = [] + if stale_dir.exists(): + stale_blocks = load_stale_blocks_from_dir(stale_dir) + else: + print(f"Warning: stale blocks directory does not exist: {stale_dir}") + + print(f"Loaded {len(stale_blocks)} stale blocks from {stale_dir}") + + if args.target_rpc: + compare_height = upstream_rpc.getblockcount() + print(f"Tracking final sync target height={compare_height}") + + if args.target_debug_log: + debug_log_path = Path(args.target_debug_log) + if debug_log_path.exists(): + debug_log_offset = debug_log_path.stat().st_size + live_debug_offset = debug_log_offset + print( + f"Tracking debug log for reorg validation: {debug_log_path} " + f"(start byte={debug_log_offset})" + ) + + chain_view = RPCChainView( + upstream_rpc, + block_raw_cache_max_bytes=RAW_CACHE_MIB * 1024 * 1024, + rpc_batch_size=RPC_BLOCK_BATCH_SIZE, + 
rpc_header_batch_size=RPC_HEADER_BATCH_SIZE, + rpc_hash_batch_size=RPC_HASH_BATCH_SIZE, + tip_height_cache_seconds=TIP_HEIGHT_CACHE_SECONDS, + ) + print( + "Proxy tuning: raw_cache_mib={} rpc_block_batch={} rpc_header_batch={} " + "rpc_hash_batch={} tip_cache_s={} prefetch_window={} send_ahead={}".format( + RAW_CACHE_MIB, + RPC_BLOCK_BATCH_SIZE, + RPC_HEADER_BATCH_SIZE, + RPC_HASH_BATCH_SIZE, + TIP_HEIGHT_CACHE_SECONDS, + ACTIVE_PREFETCH_WINDOW, + ACTIVE_SEND_AHEAD, + ) + ) + + stale_served_hook = None + if args.target_rpc: + stale_force_queue = queue.SimpleQueue() + + def stale_served_hook(stale, source): + stale_force_queue.put((stale, source)) + + proxy = HistoricalReorgProxy( + chain_view=chain_view, + stale_blocks=stale_blocks, + defer_stale_until_getdata=DEFER_STALE_UNTIL_GETDATA, + stale_served_hook=stale_served_hook, + record_all_served_blocks=False, + record_getdata_requests=False, + active_prefetch_window=ACTIVE_PREFETCH_WINDOW, + active_send_ahead=ACTIVE_SEND_AHEAD, + ) + proxy.peer_connect_helper("0", 0, NETWORK, 1) + proxy.configure_inbound_reconnect( + net=NETWORK, + services=P2P_SERVICES, + supports_v2_p2p=False, + ) + + network_thread = NetworkThread() + network_thread.start() + + ready = threading.Event() + + def on_listen(addr, port): + print(f"Proxy listening on {addr}:{port}") + print(f"Connect target node with: -connect={addr}:{port}") + ready.set() + + NetworkThread.listen(proxy, on_listen, addr=LISTEN_HOST, port=args.listen_port) + if not ready.wait(timeout=10): + raise RuntimeError("Timed out while waiting for proxy listener startup") + + if args.target_rpc: + + def worker(): + nonlocal forced_stale_hashes + while True: + item = stale_force_queue.get() + if item is None: + return + stale, source = item + stale_hash_hex = hash_int_to_hex(stale.hash_int) + if stale_hash_hex in forced_stale_hashes: + continue + + rpc = None + while rpc is None: + rpc = ensure_target_rpc() + if rpc is None: + time.sleep(0.2) + continue + height = 
try_getblockcount(rpc) + if height is None: + rpc = None + time.sleep(0.2) + + forced = False + for attempt in range(1, 9): + try: + rpc.preciousblock(stale_hash_hex) + forced = True + break + except JSONRPCException as exc: + msg = None + code = None + if isinstance(exc.error, dict): + code = exc.error.get("code") + msg = exc.error.get("message") + if code == -28: + time.sleep(min(0.25 * attempt, 2.0)) + continue + if msg and "Block not found" in msg: + try: + submit_res = rpc.submitblock(stale.raw_block.hex()) + if submit_res is not None: + print( + f"[proxy] submitblock returned {submit_res!r} for stale {stale.height}-{stale_hash_hex}", + flush=True, + ) + except Exception as submit_exc: + print( + f"[proxy] submitblock failed for stale {stale.height}-{stale_hash_hex}: {submit_exc!r}", + flush=True, + ) + time.sleep(min(0.25 * attempt, 2.0)) + continue + print( + f"[proxy] preciousblock failed for stale {stale.height}-{stale_hash_hex}: {exc!r}", + flush=True, + ) + break + except Exception as exc: + print( + f"[proxy] preciousblock transport failure for stale {stale.height}-{stale_hash_hex}: {exc!r}", + flush=True, + ) + time.sleep(min(0.25 * attempt, 2.0)) + if forced: + forced_stale_hashes.add(stale_hash_hex) + try: + best = rpc.getbestblockhash() + except Exception: + best = None + print( + "[proxy] forced precious stale [{}] height={} hash={} best={}".format( + source, + stale.height, + stale_hash_hex, + best if best is not None else "?", + ), + flush=True, + ) + + stale_force_thread = threading.Thread(target=worker, name="stale-precious-worker", daemon=True) + stale_force_thread.start() + + exit_code = 0 + snapshot_printed = False + last_height_log_time = 0.0 + last_target_height = None + target_height = None + try: + while True: + snapshot_rpc = ensure_target_rpc() + if snapshot_rpc is not None: + target_height = try_getblockcount(snapshot_rpc) + now = time.monotonic() + if ( + last_target_height is None + or (target_height is not None and target_height != 
last_target_height) + or now - last_height_log_time >= PROGRESS_LOG_SECONDS + ): + if target_height is not None: + print(f"[target-progress] height={target_height}") + last_target_height = target_height + last_height_log_time = now + + if args.target_debug_log: + live_debug_offset, last_tip, live_events = scan_target_debug_events( + args.target_debug_log, + start_offset=live_debug_offset, + last_tip=last_tip, + ) + for event in live_events: + if event["kind"] == "reorg": + print( + "[target-reorg] prev_height={} prev_hash={} -> " + "new_height={} new_hash={}".format( + event["prev_height"], + event["prev_hash"], + event["new_height"], + event["new_hash"], + ) + ) + else: + print(f"[target-validation] {event['line']}") + + if compare_height is not None and snapshot_rpc is not None and target_height is not None: + if target_height >= compare_height: + print("Computing UTXO set hashes (hash_serialized_3)...") + snapshot = collect_snapshot(snapshot_rpc) + upstream = collect_utxo_hash(upstream_rpc) + target = { + "height": snapshot.get("height"), + "bestblock": snapshot.get("bestblock"), + "hash_serialized_3": snapshot.get("hash_serialized_3"), + } + if not print_utxo_hash_comparison(upstream, target): + exit_code = 1 + print(json.dumps(snapshot, indent=2, sort_keys=True)) + snapshot_printed = True + break + time.sleep(POLL_SECONDS) + except KeyboardInterrupt: + pass + finally: + if not snapshot_printed: + snapshot_rpc = ensure_target_rpc() + snapshot = try_collect_snapshot(snapshot_rpc) + if snapshot is not None: + print(json.dumps(snapshot, indent=2, sort_keys=True)) + + if args.target_debug_log: + served_stale = [] + seen_stale_hashes = set() + for kind, height, block_hash_int in proxy.served_blocks: + if kind != "stale": + continue + if block_hash_int in seen_stale_hashes: + continue + seen_stale_hashes.add(block_hash_int) + served_stale.append( + { + "height": int(height), + "hash": hash_int_to_hex(block_hash_int), + } + ) + + update_tip_events = 
parse_update_tip_events( + args.target_debug_log, + start_offset=debug_log_offset, + ) + summary = validate_reorgs_from_events(served_stale, update_tip_events) + print_reorg_summary(summary) + + if summary["missing_tip"] or summary["missing_reorg"]: + print("Reorg validation failed.") + exit_code = 1 + + network_thread.close(timeout=10) + if stale_force_queue is not None: + stale_force_queue.put(None) + if stale_force_thread is not None: + stale_force_thread.join(timeout=10) + return exit_code + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/contrib/run_latest_reorg_tmux.sh b/contrib/run_latest_reorg_tmux.sh new file mode 100755 index 000000000000..b70f82254153 --- /dev/null +++ b/contrib/run_latest_reorg_tmux.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +set -euo pipefail + +BTC_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" + +UPSTREAM_DATADIR="${UPSTREAM_DATADIR:-/mnt/my_storage/BitcoinData}" +TARGET_DATADIR="${TARGET_DATADIR:-/mnt/my_storage/ShallowBitcoinDataLatest}" +STALE_BLOCKS_DIR="${STALE_BLOCKS_DIR:-/mnt/my_storage/stale-blocks/blocks}" + +UPSTREAM_RPC_PORT="${UPSTREAM_RPC_PORT:-8332}" +PROXY_PORT="${PROXY_PORT:-8348}" +TARGET_RPC_PORT="${TARGET_RPC_PORT:-19454}" + +WINDOW_NAME="${WINDOW_NAME:-stale-reorg}" +SESSION_NAME="${SESSION_NAME:-stale-reorg}" + +COMPILE="${COMPILE:-1}" +CCACHE_DISABLE="${CCACHE_DISABLE:-1}" + +if [[ "$COMPILE" == "1" ]]; then + echo "[build] configuring + compiling for current commit..." 
+ ( + cd "$BTC_DIR" && \ + cmake -B build -DCMAKE_BUILD_TYPE=Release && \ + CCACHE_DISABLE="$CCACHE_DISABLE" cmake --build build -j"$(nproc)" --target bitcoind bitcoin-cli + ) +fi + +mkdir -p "$TARGET_DATADIR" + +UPSTREAM_CMD="cd '$BTC_DIR' && build/bin/bitcoind -datadir='$UPSTREAM_DATADIR' -server=1 -noconnect -rpcport='$UPSTREAM_RPC_PORT' -listen=0 -printtoconsole=1" +PROXY_CMD="cd '$BTC_DIR' && python3 -u contrib/historical_reorg_proxy.py --upstream-rpc=http://127.0.0.1:$UPSTREAM_RPC_PORT --upstream-cookie-file='$UPSTREAM_DATADIR/.cookie' --target-rpc=http://127.0.0.1:$TARGET_RPC_PORT --target-cookie-file='$TARGET_DATADIR/.cookie' --stale-blocks-dir='$STALE_BLOCKS_DIR' --listen-port=$PROXY_PORT" +TARGET_CMD="cd '$BTC_DIR' && build/bin/bitcoind -datadir='$TARGET_DATADIR' -server=1 -rpcport='$TARGET_RPC_PORT' -prune=10000 -dbcache=2000 -assumevalid=0 -connect=127.0.0.1:$PROXY_PORT -listen=0 -printtoconsole=1" + +if [[ -n "${TMUX:-}" ]]; then + tmux new-window -n "$WINDOW_NAME" -c "$BTC_DIR" + TARGET_WIN="{last}" +else + tmux new-session -d -s "$SESSION_NAME" -n "$WINDOW_NAME" -c "$BTC_DIR" + TARGET_WIN="$SESSION_NAME:0" +fi + +tmux split-window -h -t "$TARGET_WIN" +tmux split-window -h -t "$TARGET_WIN" +tmux select-layout -t "$TARGET_WIN" even-horizontal + +tmux send-keys -t "${TARGET_WIN}.0" "$UPSTREAM_CMD" C-m +tmux send-keys -t "${TARGET_WIN}.1" "$PROXY_CMD" C-m +tmux send-keys -t "${TARGET_WIN}.2" "$TARGET_CMD" C-m + +echo "[ok] launched 3 panes:" +echo " pane 0: upstream bitcoind" +echo " pane 1: historical reorg proxy (auto UTXO compare)" +echo " pane 2: target bitcoind (assumevalid=0, prune=10000, dbcache=2000)" + +if [[ -z "${TMUX:-}" ]]; then + exec tmux attach -t "$SESSION_NAME" +fi diff --git a/doc/historical_reorg_proxy.md b/doc/historical_reorg_proxy.md new file mode 100644 index 000000000000..7cf47af5be25 --- /dev/null +++ b/doc/historical_reorg_proxy.md @@ -0,0 +1,109 @@ +# Historical Reorg Proxy + +`contrib/historical_reorg_proxy.py` runs a 
stale-first P2P proxy that can replay +historical stale blocks before serving canonical blocks from an upstream node. + +This is intended for exercising reorg behavior on a target node during sync. + +## Inputs + +- Upstream RPC node: serves canonical headers/blocks (`getblockhash`, + `getblockheader`, `getblock`). +- Optional upstream `.cookie` file: authenticate RPC without explicit + `rpcuser`/`rpcpassword`. +- Optional stale block directory: raw files named `-.bin` (the + same format used by `bitcoin-data/stale-blocks`). +- Optional target RPC node: used for final UTXO snapshot output. +- Optional target `.cookie` file: authenticate target RPC without explicit + credentials. +- Optional target `debug.log`: used to validate that served stale tips were + later reorged out. + +## Example + +```bash +contrib/historical_reorg_proxy.py \ + --upstream-rpc=http://127.0.0.1:8332 \ + --upstream-cookie-file=/data/upstream/.cookie \ + --target-rpc=http://127.0.0.1:18443 \ + --target-cookie-file=/data/target/.cookie \ + --target-debug-log=/data/target/regtest/debug.log \ + --stale-blocks-dir=./stale-blocks/blocks \ + --listen-port=8338 +``` + +Start the target node with: + +```bash +bitcoind ... -connect=127.0.0.1:8338 +``` + +When `--target-rpc` is set, the proxy forces each served stale block to become +preferred on the target node, waits for the target to reach the upstream tip, +then prints and compares the final `gettxoutsetinfo(hash_serialized_3)` result. + +When `--target-debug-log` is set, the proxy also prints a reorg summary from +`UpdateTip` log lines: + +- number of served stale blocks +- number that became tip +- number that were later reorged out +- hashes missing tip or reorg evidence + +Missing reorg evidence causes a non-zero exit code. 
+ +## Local 300k shallow run + +```bash +BASE_DIR="/mnt/my_storage" +SRC_DIR="$BASE_DIR/bitcoin" +SRC_DATA_DIR="$BASE_DIR/BitcoinData" +TGT_DATA_DIR="$BASE_DIR/ShallowBitcoinData" +LOG_DIR="$BASE_DIR/logs" +STALE_DIR="/data/my_storage/stale-blocks/blocks" +``` + +Terminal 1 (upstream node, RPC with cookie auth): + +```bash +"$SRC_DIR/build/bin/bitcoind" \ + -datadir="$SRC_DATA_DIR" \ + -server=1 \ + -rpcbind=127.0.0.1 \ + -rpcallowip=127.0.0.1 \ + -rpcport=19443 \ + -stopatheight=300000 \ + -debuglogfile="$LOG_DIR/upstream-300k.log" +``` + +Terminal 2 (proxy): + +```bash +"$SRC_DIR/contrib/historical_reorg_proxy.py" \ + --upstream-rpc=http://127.0.0.1:19443 \ + --upstream-cookie-file="$SRC_DATA_DIR/.cookie" \ + --stale-blocks-dir="$STALE_DIR" \ + --target-rpc=http://127.0.0.1:19454 \ + --target-cookie-file="$TGT_DATA_DIR/.cookie" \ + --target-debug-log="$LOG_DIR/shallow-300k.log" \ + --listen-port=8338 +``` + +Terminal 3 (target shallow/pruned node under test, no `-server=1`): + +```bash +"$SRC_DIR/build/bin/bitcoind" \ + -datadir="$TGT_DATA_DIR" \ + -prune=550 \ + -noconnect \ + -addnode=127.0.0.1:8338 \ + -listen=0 \ + -dnsseed=0 \ + -fixedseeds=0 \ + -discover=0 \ + -stopatheight=300000 \ + -debuglogfile="$LOG_DIR/shallow-300k.log" +``` + +When the target reaches the upstream tip, the proxy prints the final UTXO hash +comparison and reorg validation summary, then exits. diff --git a/test/functional/p2p_historical_reorg_proxy.py b/test/functional/p2p_historical_reorg_proxy.py new file mode 100755 index 000000000000..48a84869b6e8 --- /dev/null +++ b/test/functional/p2p_historical_reorg_proxy.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+"""Test stale-first historical reorg proxy behavior.""" + +from pathlib import Path +import re + +from test_framework.blocktools import ( + create_block, + create_coinbase, +) +from test_framework.test_framework import BitcoinTestFramework +from test_framework.stale_reorg_proxy import ( + HistoricalReorgProxy, + RPCChainView, + RawBlockMessage, + load_stale_blocks_from_dir, +) +from test_framework.messages import ( + CInv, + MSG_BLOCK, + msg_getdata, + msg_headers, + msg_inv, + msg_notfound, +) +from test_framework.util import ( + assert_equal, +) + +UPDATE_TIP_RE = re.compile(r"UpdateTip: new best=([0-9a-f]{64}) height=(\d+)") + + +class CapturingHistoricalReorgProxy(HistoricalReorgProxy): + """Proxy variant for direct functional regression checks without a socket.""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.sent_messages = [] + + def send_without_ping(self, message, is_decoy=False): + self.sent_messages.append(message) + + +class HistoricalReorgProxyTest(BitcoinTestFramework): + def set_test_params(self): + self.num_nodes = 2 + self.setup_clean_chain = True + + def setup_network(self): + self.setup_nodes() + + def _write_stale_blocks(self, *, source_node, stale_dir): + # Build one two-block stale branch at heights 80->81 and two independent stale blocks. 
+ stale_plan = [ + {"name": "h30", "height": 30, "parent": ("active", 29)}, + {"name": "h80", "height": 80, "parent": ("active", 79)}, + {"name": "h81", "height": 81, "parent": ("stale", "h80")}, + {"name": "h140", "height": 140, "parent": ("active", 139)}, + ] + + stale_hashes_by_name = {} + stale_times_by_name = {} + active_hashes_by_height = {} + + for entry in stale_plan: + stale_height = entry["height"] + parent_kind, parent_ref = entry["parent"] + if parent_kind == "active": + parent_hash = source_node.getblockhash(parent_ref) + parent_time = source_node.getblockheader(parent_hash)["time"] + else: + parent_hash = stale_hashes_by_name[parent_ref] + parent_time = stale_times_by_name[parent_ref] + + active_hash = source_node.getblockhash(stale_height) + active_time = source_node.getblockheader(active_hash)["time"] + + # Ensure stale block differs from active block while remaining valid. + stale_time = max(parent_time + 1, active_time + 1) + stale_block = create_block( + int(parent_hash, 16), + create_coinbase(stale_height), + stale_time, + ) + stale_block.solve() + + stale_path = stale_dir / f"{stale_height}-{stale_block.hash_hex}.bin" + stale_path.write_bytes(stale_block.serialize()) + + stale_hashes_by_name[entry["name"]] = stale_block.hash_hex + stale_times_by_name[entry["name"]] = stale_time + active_hashes_by_height[stale_height] = active_hash + + return stale_hashes_by_name, active_hashes_by_height + + def _run_direct_proxy_regression_checks(self, *, source_node, stale_blocks): + self.log.info("Regression check: proactive stale announce on getdata-only progression") + stale_80 = next(stale for stale in stale_blocks if stale.height == 80) + proactive_proxy = CapturingHistoricalReorgProxy( + chain_view=RPCChainView(source_node), + stale_blocks=[stale_80], + max_height=220, + defer_stale_until_getdata=True, + record_all_served_blocks=False, + record_getdata_requests=False, + active_prefetch_window=0, + active_send_ahead=0, + ) + proactive_proxy.on_getdata( + 
msg_getdata([CInv(MSG_BLOCK, int(source_node.getblockhash(79), 16))]) + ) + assert_equal(proactive_proxy._stats["stale_headers"], 1) + assert_equal(proactive_proxy._stats["stale_inv_nudge"], 1) + assert stale_80.hash_int in proactive_proxy.announced_stale_headers + announced_header_hashes = [ + header.hash_int + for message in proactive_proxy.sent_messages + if isinstance(message, msg_headers) + for header in message.headers + ] + announced_inv_hashes = [ + inv.hash + for message in proactive_proxy.sent_messages + if isinstance(message, msg_inv) + for inv in message.inv + ] + assert stale_80.hash_int in announced_header_hashes + assert stale_80.hash_int in announced_inv_hashes + assert any( + isinstance(message, RawBlockMessage) for message in proactive_proxy.sent_messages + ) + + self.log.info("Regression check: stale re-request falls back to active chain") + stale_30 = next(stale for stale in stale_blocks if stale.height == 30) + repeat_proxy = CapturingHistoricalReorgProxy( + chain_view=RPCChainView(source_node), + stale_blocks=[stale_30], + max_height=220, + defer_stale_until_getdata=True, + record_all_served_blocks=False, + record_getdata_requests=False, + active_prefetch_window=0, + active_send_ahead=0, + ) + repeat_proxy.on_getdata(msg_getdata([CInv(MSG_BLOCK, stale_30.hash_int)])) + assert_equal(repeat_proxy._stats["stale_blocks"], 1) + repeat_proxy.sent_messages = [] + repeat_proxy.on_getdata(msg_getdata([CInv(MSG_BLOCK, stale_30.hash_int)])) + assert_equal(repeat_proxy._stats["stale_repeats"], 1) + assert stale_30.hash_int in repeat_proxy.disabled_stale_blocks + notfound_hashes = [ + inv.hash + for message in repeat_proxy.sent_messages + if isinstance(message, msg_notfound) + for inv in message.vec + ] + fallback_inv_hashes = [ + inv.hash + for message in repeat_proxy.sent_messages + if isinstance(message, msg_inv) + for inv in message.inv + ] + assert stale_30.hash_int in notfound_hashes + assert int(source_node.getblockhash(stale_30.height), 16) in 
fallback_inv_hashes + + self.log.info("Regression check: inject stale block even if peer only requests active hash") + inject_proxy = CapturingHistoricalReorgProxy( + chain_view=RPCChainView(source_node), + stale_blocks=[stale_80], + max_height=220, + defer_stale_until_getdata=True, + record_all_served_blocks=True, + record_getdata_requests=False, + active_prefetch_window=0, + active_send_ahead=0, + ) + active_80_hash_int = int(source_node.getblockhash(80), 16) + inject_proxy.on_getdata(msg_getdata([CInv(MSG_BLOCK, active_80_hash_int)])) + assert_equal(inject_proxy._stats["stale_blocks"], 1) + assert_equal(inject_proxy.served_blocks[0][0], "stale") + assert_equal(inject_proxy.served_blocks[0][1], 80) + assert_equal(inject_proxy.served_blocks[1], ("active", 80, active_80_hash_int)) + + self.log.info("Regression check: keep advancing active headers after stale replay is exhausted") + follow_proxy = CapturingHistoricalReorgProxy( + chain_view=RPCChainView(source_node), + stale_blocks=[], + max_height=220, + defer_stale_until_getdata=True, + record_all_served_blocks=False, + record_getdata_requests=False, + active_prefetch_window=0, + active_send_ahead=0, + ) + follow_proxy._highest_announced_active_height = 100 + active_100_hash_int = int(source_node.getblockhash(100), 16) + follow_proxy.on_getdata(msg_getdata([CInv(MSG_BLOCK, active_100_hash_int)])) + header_batches = [ + message for message in follow_proxy.sent_messages if isinstance(message, msg_headers) + ] + assert header_batches, "proxy did not extend the active header frontier" + followed_headers = header_batches[-1].headers + assert_equal(followed_headers[0].hash_hex, source_node.getblockhash(101)) + assert_equal(followed_headers[-1].hash_hex, source_node.getblockhash(220)) + + def run_test(self): + source_node = self.nodes[0] + sync_node = self.nodes[1] + + self.log.info("Generate canonical source chain") + chain_length = 220 + self.generate(source_node, chain_length, sync_fun=self.no_op) + + 
self.log.info("Create stale blocks in stale-blocks format (<height>-<hash>.bin)")
served_positions[stale_hash_int] < served_positions[active_hash_int] + + stale_hash_hex = stale_hashes_by_name[stale_name] + assert any( + height == stale_height and block_hash == stale_hash_hex + for height, block_hash in update_tips + ), f"stale block never became tip: {stale_height}-{stale_hash_hex}" + header = sync_node.getblockheader(stale_hash_hex) + assert header["confirmations"] < 0, f"stale block still on active chain: {stale_height}-{stale_hash_hex}" + + self.log.info("Verify multi-block stale branch served in order (80 -> 81)") + stale_80 = int(stale_hashes_by_name["h80"], 16) + stale_81 = int(stale_hashes_by_name["h81"], 16) + assert served_positions[stale_80] < served_positions[stale_81] + + self.log.info("Verify stale tips remain indexed as forks") + tips_by_hash = {tip["hash"]: tip["status"] for tip in sync_node.getchaintips()} + for stale_hash in ( + stale_hashes_by_name["h30"], + stale_hashes_by_name["h81"], + stale_hashes_by_name["h140"], + ): + assert stale_hash in tips_by_hash + assert tips_by_hash[stale_hash] == "valid-fork" + + self.log.info("Snapshot and compare final UTXO set hash") + source_utxo = source_node.gettxoutsetinfo("hash_serialized_3", use_index=False) + sync_utxo = sync_node.gettxoutsetinfo("hash_serialized_3", use_index=False) + assert_equal(sync_utxo["hash_serialized_3"], source_utxo["hash_serialized_3"]) + self.log.info( + "Final UTXO snapshot hash_serialized_3 at height %d: %s", + sync_utxo["height"], + sync_utxo["hash_serialized_3"], + ) + + +if __name__ == '__main__': + HistoricalReorgProxyTest(__file__).main() diff --git a/test/functional/test_framework/stale_reorg_proxy.py b/test/functional/test_framework/stale_reorg_proxy.py new file mode 100644 index 000000000000..a590f541149a --- /dev/null +++ b/test/functional/test_framework/stale_reorg_proxy.py @@ -0,0 +1,1306 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# 
file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Helpers for replaying stale blocks before canonical blocks over P2P.""" + +from bisect import bisect_left +from collections import OrderedDict, defaultdict +from dataclasses import dataclass +from io import BytesIO +from pathlib import Path +import binascii +import random +import re +import time +import traceback + +from test_framework.messages import ( + CBlockHeader, + CInv, + MAX_HEADERS_RESULTS, + MSG_BLOCK, + MSG_TYPE_MASK, + msg_headers, + msg_inv, + msg_notfound, +) +from test_framework.p2p import ( + P2P_SERVICES, + P2PInterface, +) +from test_framework.v2_p2p import EncryptedP2PState + + +STALE_BLOCK_FILE_RE = re.compile(r"^(?P\d+)-(?P[0-9a-f]{64})\.bin$") + + +def hash_int_to_hex(hash_int): + return f"{hash_int:064x}" + + +def hash_hex_to_int(hash_hex): + return int(hash_hex, 16) + + +@dataclass +class StaleBlockRecord: + height: int + hash_int: int + prev_hash_int: int + header: CBlockHeader + raw_block: bytes + path: Path + + +class RawBlockMessage: + """P2P `block` message carrying pre-serialized block bytes.""" + + msgtype = b"block" + + def __init__(self, payload): + self.payload = payload + + def serialize(self): + return self.payload + + +def load_stale_blocks_from_dir(blocks_dir): + """Load stale blocks from files named -.bin.""" + records = [] + for path in sorted(Path(blocks_dir).glob("*.bin")): + match = STALE_BLOCK_FILE_RE.fullmatch(path.name) + if match is None: + continue + raw_block = path.read_bytes() + header = CBlockHeader() + try: + # Parse only the header to avoid retaining large per-tx structures. 
+ header.deserialize(BytesIO(raw_block)) + except Exception as exc: + print(f"Skipping malformed stale block file {path}: {exc!r}", flush=True) + continue + + expected_hash = match.group("hash") + if header.hash_hex != expected_hash: + print( + "Skipping stale block hash mismatch for {}: expected {}, got {}".format( + path, expected_hash, header.hash_hex + ), + flush=True, + ) + continue + + records.append( + StaleBlockRecord( + height=int(match.group("height")), + hash_int=header.hash_int, + prev_hash_int=header.hashPrevBlock, + header=header, + raw_block=raw_block, + path=path, + ) + ) + + records.sort(key=lambda rec: (rec.height, rec.hash_int)) + return records + + +def group_stale_blocks_by_parent(stale_blocks): + stale_by_parent = defaultdict(list) + for stale_block in stale_blocks: + stale_by_parent[stale_block.prev_hash_int].append(stale_block) + for stale_list in stale_by_parent.values(): + stale_list.sort(key=lambda rec: (rec.height, rec.hash_int)) + return stale_by_parent + + +class RPCChainView: + """Lazy RPC-backed access to active-chain headers and blocks.""" + + def __init__( + self, + rpc, + *, + block_raw_cache_max_bytes=256 * 1024 * 1024, + rpc_batch_size=64, + rpc_header_batch_size=512, + rpc_hash_batch_size=1024, + tip_height_cache_seconds=1.0, + ): + self.rpc = rpc + self._active_height_by_hash = {} + self._block_hash_by_height = {} + self._header_by_hash = {} + self._block_raw_by_hash = OrderedDict() + self._block_raw_cache_bytes = 0 + self._block_raw_cache_max_bytes = int(block_raw_cache_max_bytes) + self._rpc_batch_size = max(1, int(rpc_batch_size)) + self._rpc_header_batch_size = max(1, int(rpc_header_batch_size)) + self._rpc_hash_batch_size = max(1, int(rpc_hash_batch_size)) + self._raw_cache_evictions = 0 + self._raw_block_size_ema = None + self._raw_block_size_samples = 0 + self._raw_block_size_recent_max = None + self._tip_height_cache_seconds = max(0.0, float(tip_height_cache_seconds)) + self._tip_height_cache_value = None + 
self._tip_height_cache_at = 0.0 + + def _note_raw_block_size(self, raw_len): + # Track a smoothed estimate so prefetch windows can adapt as blocks grow. + raw_len = int(raw_len) + if raw_len <= 0: + return + alpha = 0.02 + if self._raw_block_size_ema is None: + self._raw_block_size_ema = float(raw_len) + else: + self._raw_block_size_ema = (1.0 - alpha) * self._raw_block_size_ema + alpha * float(raw_len) + self._raw_block_size_samples += 1 + if self._raw_block_size_recent_max is None: + self._raw_block_size_recent_max = float(raw_len) + else: + # Decay slowly so a recent large block quickly clamps batch sizing. + self._raw_block_size_recent_max = max( + float(raw_len), + self._raw_block_size_recent_max * 0.99, + ) + + def estimated_raw_block_size(self): + # Before we have any samples, assume a modest historical mainnet block. + # This is only used to bound prefetch work. + if self._raw_block_size_ema is None: + return 150_000 + est = float(self._raw_block_size_ema) + if self._raw_block_size_recent_max is not None: + est = max(est, float(self._raw_block_size_recent_max)) + return max(1, int(est)) + + def estimated_raw_cache_capacity_blocks(self, *, fill_ratio=0.80): + max_bytes = int(self._block_raw_cache_max_bytes) + if max_bytes <= 0: + return 0 + est = self.estimated_raw_block_size() + if est <= 0: + return 0 + return max(0, int((max_bytes * float(fill_ratio)) // est)) + + def raw_cache_stats(self): + return { + "entries": len(self._block_raw_by_hash), + "bytes": self._block_raw_cache_bytes, + "max_bytes": self._block_raw_cache_max_bytes, + "evictions": self._raw_cache_evictions, + } + + def _cache_get_raw(self, block_hash_int): + raw = self._block_raw_by_hash.get(block_hash_int) + if raw is None: + return None + self._block_raw_by_hash.move_to_end(block_hash_int) + return raw + + def _cache_put_raw(self, block_hash_int, raw): + if self._block_raw_cache_max_bytes <= 0: + return + raw_len = len(raw) + if raw_len > self._block_raw_cache_max_bytes: + # Block too 
large to cache under the configured cap. + return + self._note_raw_block_size(raw_len) + existing = self._block_raw_by_hash.pop(block_hash_int, None) + if existing is not None: + self._block_raw_cache_bytes -= len(existing) + self._block_raw_by_hash[block_hash_int] = raw + self._block_raw_cache_bytes += raw_len + while ( + self._block_raw_cache_bytes > self._block_raw_cache_max_bytes + and self._block_raw_by_hash + ): + _, evicted = self._block_raw_by_hash.popitem(last=False) + self._block_raw_cache_bytes -= len(evicted) + self._raw_cache_evictions += 1 + + def tip_height(self): + now = time.monotonic() + if ( + self._tip_height_cache_value is not None + and self._tip_height_cache_seconds > 0.0 + and now - self._tip_height_cache_at <= self._tip_height_cache_seconds + ): + return self._tip_height_cache_value + tip = self.rpc.getblockcount() + self._tip_height_cache_value = tip + self._tip_height_cache_at = now + return tip + + def cached_tip(self): + if not self._block_hash_by_height: + return None, None + tip_height = max(self._block_hash_by_height.keys()) + return hash_hex_to_int(self._block_hash_by_height[tip_height]), tip_height + + def get_block_hash(self, height): + if height not in self._block_hash_by_height: + block_hash_hex = self.rpc.getblockhash(height) + self._block_hash_by_height[height] = block_hash_hex + self._active_height_by_hash[hash_hex_to_int(block_hash_hex)] = height + return self._block_hash_by_height[height] + + def get_block_hashes_range(self, start_height, end_height): + if end_height < start_height: + return [] + + heights = list(range(start_height, end_height + 1)) + missing = [height for height in heights if height not in self._block_hash_by_height] + raw_rpc = getattr(self.rpc, "_rpc", None) + if missing and raw_rpc is not None and hasattr(self.rpc, "batch"): + unresolved = [] + for offset in range(0, len(missing), self._rpc_hash_batch_size): + chunk = missing[offset : offset + self._rpc_hash_batch_size] + requests = 
[raw_rpc.getblockhash.get_request(height) for height in chunk] + responses = self.rpc.batch(requests) + for height, response in zip(chunk, responses): + block_hash_hex = response.get("result") + if response.get("error") is not None or block_hash_hex is None: + unresolved.append(height) + continue + self._block_hash_by_height[height] = block_hash_hex + self._active_height_by_hash[hash_hex_to_int(block_hash_hex)] = height + missing = unresolved + + for height in missing: + self.get_block_hash(height) + + return [self._block_hash_by_height[height] for height in heights] + + def get_active_height(self, block_hash_int): + if block_hash_int in self._active_height_by_hash: + return self._active_height_by_hash[block_hash_int] + + try: + header = self.rpc.getblockheader(hash_int_to_hex(block_hash_int)) + except Exception: + self._active_height_by_hash[block_hash_int] = None + return None + + height = header["height"] if header.get("confirmations", -1) >= 0 else None + self._active_height_by_hash[block_hash_int] = height + return height + + def get_active_heights_batch(self, block_hash_ints): + result = {} + missing = [] + for block_hash_int in block_hash_ints: + if block_hash_int in self._active_height_by_hash: + result[block_hash_int] = self._active_height_by_hash[block_hash_int] + else: + missing.append(block_hash_int) + + if missing: + raw_rpc = getattr(self.rpc, "_rpc", None) + if raw_rpc is not None and hasattr(self.rpc, "batch"): + unresolved = [] + for offset in range(0, len(missing), self._rpc_header_batch_size): + chunk = missing[offset : offset + self._rpc_header_batch_size] + requests = [ + raw_rpc.getblockheader.get_request(hash_int_to_hex(block_hash_int)) + for block_hash_int in chunk + ] + responses = self.rpc.batch(requests) + for block_hash_int, response in zip(chunk, responses): + header = response.get("result") + if response.get("error") is not None or header is None: + unresolved.append(block_hash_int) + continue + height = ( + header["height"] + if 
header.get("confirmations", -1) >= 0 + else None + ) + self._active_height_by_hash[block_hash_int] = height + result[block_hash_int] = height + missing = unresolved + + for block_hash_int in missing: + result[block_hash_int] = self.get_active_height(block_hash_int) + + return result + + def get_header(self, block_hash_int): + if block_hash_int not in self._header_by_hash: + header_hex = self.rpc.getblockheader(hash_int_to_hex(block_hash_int), False) + header = CBlockHeader() + header.deserialize(BytesIO(bytes.fromhex(header_hex))) + self._header_by_hash[block_hash_int] = header + return self._header_by_hash[block_hash_int] + + def get_headers_batch(self, block_hash_ints): + result = {} + missing = [] + for block_hash_int in block_hash_ints: + header = self._header_by_hash.get(block_hash_int) + if header is not None: + result[block_hash_int] = header + else: + missing.append(block_hash_int) + + if not missing: + return result + + raw_rpc = getattr(self.rpc, "_rpc", None) + if raw_rpc is not None and hasattr(self.rpc, "batch"): + unresolved = [] + for offset in range(0, len(missing), self._rpc_header_batch_size): + chunk = missing[offset : offset + self._rpc_header_batch_size] + requests = [ + raw_rpc.getblockheader.get_request(hash_int_to_hex(block_hash_int), False) + for block_hash_int in chunk + ] + responses = self.rpc.batch(requests) + for block_hash_int, response in zip(chunk, responses): + header_hex = response.get("result") + if response.get("error") is not None or header_hex is None: + unresolved.append(block_hash_int) + continue + header = CBlockHeader() + header.deserialize(BytesIO(bytes.fromhex(header_hex))) + self._header_by_hash[block_hash_int] = header + result[block_hash_int] = header + missing = unresolved + + for block_hash_int in missing: + result[block_hash_int] = self.get_header(block_hash_int) + return result + + def get_block_raw(self, block_hash_int): + cached = self._cache_get_raw(block_hash_int) + if cached is not None: + return cached + 
block_hex = self.rpc.getblock(hash_int_to_hex(block_hash_int), 0) + raw = binascii.unhexlify(block_hex) + self._cache_put_raw(block_hash_int, raw) + return raw + + def get_blocks_raw_batch(self, block_hash_ints, *, cache_only=False): + result = {} if not cache_only else None + missing = [] + for block_hash_int in block_hash_ints: + cached = self._cache_get_raw(block_hash_int) + if cached is not None: + if not cache_only: + result[block_hash_int] = cached + else: + missing.append(block_hash_int) + + if not missing: + return {} if cache_only else result + + raw_rpc = getattr(self.rpc, "_rpc", None) + if raw_rpc is not None and hasattr(self.rpc, "batch"): + unresolved = [] + batch_size = self._rpc_batch_size if self._rpc_batch_size > 0 else len(missing) + # Avoid huge JSON-RPC responses once blocks get large by bounding the + # expected response size. `getblock(verbosity=0)` returns hex, so + # payload is roughly 2x raw block size. + raw_budget_bytes = 16 * 1024 * 1024 + est_raw = self.estimated_raw_block_size() + max_by_size = max(1, raw_budget_bytes // max(1, est_raw)) + batch_size = max(1, min(batch_size, max_by_size)) + for offset in range(0, len(missing), batch_size): + chunk = missing[offset : offset + batch_size] + requests = [ + raw_rpc.getblock.get_request(hash_int_to_hex(h), 0) for h in chunk + ] + responses = self.rpc.batch(requests) + for block_hash_int, response in zip(chunk, responses): + if response.get("error") is not None or response.get("result") is None: + unresolved.append(block_hash_int) + continue + raw = binascii.unhexlify(response["result"]) + self._cache_put_raw(block_hash_int, raw) + if not cache_only: + result[block_hash_int] = raw + missing = unresolved + + for block_hash_int in missing: + raw = self.get_block_raw(block_hash_int) + if not cache_only: + result[block_hash_int] = raw + + return {} if cache_only else result + + +class HistoricalReorgProxy(P2PInterface): + """Serve stale blocks first, then canonical blocks from an RPC 
upstream.""" + + def __init__( + self, + *, + chain_view, + stale_blocks, + max_height=None, + stale_choice_seed=None, + defer_stale_until_getdata=False, + stale_served_hook=None, + record_all_served_blocks=True, + record_getdata_requests=True, + active_prefetch_window=255, + active_send_ahead=0, + active_send_ahead_min_batch=16, + header_follow_window=4096, + ): + super().__init__() + self.chain_view = chain_view + self.max_height = max_height + self.defer_stale_until_getdata = defer_stale_until_getdata + self._stale_served_hook = stale_served_hook + self._active_prefetch_window = max(0, int(active_prefetch_window)) + self._active_send_ahead = max(0, int(active_send_ahead)) + self._active_send_ahead_min_batch = max( + 1, + min(int(active_send_ahead_min_batch), self._active_send_ahead) + if self._active_send_ahead > 0 + else 1, + ) + self._header_follow_window = max(0, int(header_follow_window)) + self.stale_blocks = list(stale_blocks) + self.stale_by_parent = group_stale_blocks_by_parent(self.stale_blocks) + self.stale_by_hash = {stale.hash_int: stale for stale in self.stale_blocks} + self.announced_stale_headers = set() + self.served_stale_blocks = set() + self.disabled_stale_blocks = set() + self.getdata_requests = [] + self._record_getdata_requests = record_getdata_requests + self.served_blocks = [] # (kind, height, hash_int) + self._record_all_served_blocks = record_all_served_blocks + self._stale_rng = random.Random(stale_choice_seed) + self._stale_parent_order = {} + self._stale_subtree_depth_cache = {} + self._accept_net = None + self._accept_services = None + self._accept_supports_v2 = True + self._stats = defaultdict(int) + self._pre_getdata_window_counts = defaultdict(int) + self._last_stats_log_time = time.monotonic() + self._first_getheaders_at = None + self._first_getdata_at = None + self._last_getdata_seen_at = None + self._highest_active_height_sent = 0 + self._highest_active_height_requested = 0 + self._highest_announced_active_height = 0 + 
self._highest_active_height_prefetched = 0 + self._pending_stale_hashes = set() + self._pending_stale_count_by_height = defaultdict(int) + pending_heights = set() + for stale in self.stale_blocks: + if self.max_height is not None and stale.height > self.max_height: + continue + self._pending_stale_hashes.add(stale.hash_int) + self._pending_stale_count_by_height[stale.height] += 1 + pending_heights.add(stale.height) + self._pending_stale_heights = sorted(pending_heights) + + def _log(self, message): + print(f"[proxy] {message}", flush=True) + + def _advertised_starting_height(self): + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + return min(tip_height, self.max_height) + return tip_height + + def _maybe_log_stats(self, *, force=False): + now = time.monotonic() + if not force and now - self._last_stats_log_time < 10.0: + return + self._last_stats_log_time = now + raw_cache = self.chain_view.raw_cache_stats() + self._log( + "stats getheaders={} getdata={} headers_sent={} stale_headers={} " + "stale_blocks={} active_blocks={} prefetch_cached={} " + "active_send_ahead={} stale_inv_nudge={} stale_repeats={} fallback_active_inv={} " + "req_height={} raw_cache_entries={} raw_cache_mib={:.1f} raw_cache_evict={}".format( + self._stats["getheaders"], + self._stats["getdata"], + self._stats["headers_sent"], + self._stats["stale_headers"], + self._stats["stale_blocks"], + self._stats["active_blocks"], + self._stats["active_prefetch_cached_blocks"], + self._stats["active_send_ahead_blocks"], + self._stats["stale_inv_nudge"], + self._stats["stale_repeats"], + self._stats["fallback_active_inv"], + self._highest_active_height_requested, + raw_cache["entries"], + raw_cache["bytes"] / (1024 * 1024), + raw_cache["evictions"], + ) + ) + + def configure_inbound_reconnect(self, *, net, services=P2P_SERVICES, supports_v2_p2p=True): + self._accept_net = net + self._accept_services = services + self._accept_supports_v2 = supports_v2_p2p + if supports_v2_p2p: 
+ self.v2_state = EncryptedP2PState(initiating=False, net=net) + self.peer_connect_send_version(services) + if self.on_connection_send_msg is not None: + self.on_connection_send_msg.nStartingHeight = self._advertised_starting_height() + self.reconnect = True + self._log( + "inbound reconnect enabled net={} supports_v2={} defer_stale_until_getdata={}".format( + net, + supports_v2_p2p, + self.defer_stale_until_getdata, + ) + ) + + def on_close(self): + # Re-arm handshake state when a new inbound connection reuses this object. + if self._accept_services is not None: + self.peer_connect_send_version(self._accept_services) + if self.on_connection_send_msg is not None: + self.on_connection_send_msg.nStartingHeight = self._advertised_starting_height() + if self._accept_supports_v2 and self._accept_net is not None: + self.v2_state = EncryptedP2PState(initiating=False, net=self._accept_net) + self._log("peer disconnected, re-arming inbound handshake state") + + def _find_serving_fork(self, locator_hashes, hash_stop): + best_hash = None + best_height = -1 + locator_height_cap = None + if self._stats["getdata"] > 0: + locator_height_cap = self._highest_active_height_requested + self._header_follow_window + + for locator_hash in locator_hashes: + stale = self.stale_by_hash.get(locator_hash) + if stale is not None and ( + stale.hash_int in self.served_stale_blocks + or stale.hash_int in self.announced_stale_headers + ): + if self._next_stale_candidate(stale.hash_int, hash_stop) is not None: + if ( + locator_height_cap is None + or stale.height <= locator_height_cap + ) and stale.height > best_height: + best_hash = stale.hash_int + best_height = stale.height + + height = self.chain_view.get_active_height(locator_hash) + if ( + height is not None + and ( + locator_height_cap is None + or height <= locator_height_cap + ) + and height > best_height + ): + best_hash = locator_hash + best_height = height + + if best_hash is not None: + return best_hash, best_height + + if 
locator_height_cap is not None: + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + if tip_height >= 0: + capped_height = min(locator_height_cap, tip_height) + capped_hash = hash_hex_to_int(self.chain_view.get_block_hash(capped_height)) + return capped_hash, capped_height + + cached_tip_hash, cached_tip_height = self.chain_view.cached_tip() + if cached_tip_hash is not None: + return cached_tip_hash, cached_tip_height + genesis_hash = hash_hex_to_int(self.chain_view.get_block_hash(0)) + return genesis_hash, 0 + + def _stale_subtree_depth(self, parent_hash): + if parent_hash in self._stale_subtree_depth_cache: + return self._stale_subtree_depth_cache[parent_hash] + + best = 0 + for stale in self.stale_by_parent.get(parent_hash, []): + depth = 1 + self._stale_subtree_depth(stale.hash_int) + if depth > best: + best = depth + self._stale_subtree_depth_cache[parent_hash] = best + return best + + def _next_stale_candidate(self, parent_hash, hash_stop, *, allow_announced=False): + # During initial headers pre-sync (before any getdata), stale + # diversions can trigger repeated pre-sync loops. Defer stale + # injection until block download begins. + if self.defer_stale_until_getdata and self._stats["getdata"] == 0: + return None + + if parent_hash not in self._stale_parent_order: + ordered = list(self.stale_by_parent.get(parent_hash, [])) + if len(ordered) > 1: + # Prefer siblings that allow longer stale sequences. 
+ self._stale_rng.shuffle(ordered) + ordered.sort( + key=lambda rec: ( + -self._stale_subtree_depth(rec.hash_int), + rec.height, + rec.hash_int, + ) + ) + self._stale_parent_order[parent_hash] = ordered + + for stale in self._stale_parent_order.get(parent_hash, []): + if stale.hash_int in self.served_stale_blocks: + continue + if stale.hash_int in self.announced_stale_headers and not allow_announced: + # In long-run replay mode, never re-announce the same stale + # header; repeated diversions can deadlock header sync. + if self.defer_stale_until_getdata: + continue + # Functional test mode: allow re-announcement once getdata + # begins so skipped stale headers can still be fetched. + if self._stats["getdata"] == 0: + continue + if stale.hash_int in self.disabled_stale_blocks: + continue + if self.max_height is not None and stale.height > self.max_height: + continue + if hash_stop and hash_stop != stale.hash_int: + continue + return stale + return None + + def _next_pending_stale_height(self, min_height): + index = bisect_left(self._pending_stale_heights, min_height) + while index < len(self._pending_stale_heights): + height = self._pending_stale_heights[index] + if self._pending_stale_count_by_height.get(height, 0) > 0: + return height + index += 1 + return None + + def _consume_pending_stale(self, stale): + if stale.hash_int not in self._pending_stale_hashes: + return + self._pending_stale_hashes.remove(stale.hash_int) + remaining = self._pending_stale_count_by_height.get(stale.height, 0) + if remaining > 0: + self._pending_stale_count_by_height[stale.height] = remaining - 1 + + def _serve_stale_with_followups(self, stale, *, fallback_active_hash_int=None, source="getdata"): + self.served_stale_blocks.add(stale.hash_int) + self._consume_pending_stale(stale) + self.served_blocks.append(("stale", stale.height, stale.hash_int)) + self.send_without_ping(RawBlockMessage(stale.raw_block)) + self._stats["stale_blocks"] += 1 + self._log( + "served stale block [{}] 
height={} hash={} parent={}".format( + source, + stale.height, + hash_int_to_hex(stale.hash_int), + hash_int_to_hex(stale.prev_hash_int), + ) + ) + if self._stale_served_hook is not None: + try: + self._stale_served_hook(stale, source) + except Exception as exc: + self._log( + "stale_served_hook failed: height={} hash={} err={!r}".format( + stale.height, hash_int_to_hex(stale.hash_int), exc + ) + ) + + invs = [] + next_stale = self._next_stale_candidate(stale.hash_int, 0) + if next_stale is not None: + invs.append(CInv(MSG_BLOCK, next_stale.hash_int)) + self._log( + "stale continuation candidate: height={} hash={}".format( + next_stale.height, hash_int_to_hex(next_stale.hash_int) + ) + ) + + if fallback_active_hash_int is None: + fallback_active_hash_int = hash_hex_to_int( + self.chain_view.get_block_hash(stale.height) + ) + if fallback_active_hash_int != stale.hash_int: + invs.append(CInv(MSG_BLOCK, fallback_active_hash_int)) + self._stats["fallback_active_inv"] += 1 + self._log( + "stale fallback: announcing active peer block " + "height={} hash={}".format( + stale.height, hash_int_to_hex(fallback_active_hash_int) + ) + ) + + if next_stale is None: + next_height = stale.height + 1 + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + if next_height <= tip_height: + next_hash_int = hash_hex_to_int(self.chain_view.get_block_hash(next_height)) + if next_hash_int != fallback_active_hash_int: + invs.append(CInv(MSG_BLOCK, next_hash_int)) + + if invs: + self.send_without_ping(msg_inv(invs)) + + def _maybe_proactive_stale_announce(self, max_requested_height): + """Inject stale headers during block download even if getheaders is quiet. + + When the target already has headers synced, it may stop asking getheaders + entirely (getheaders ~= 1). In that mode, stale injection via getheaders + never triggers. 
This proactively announces one stale header at a time as + the peer approaches the corresponding height. + """ + if self._stats["getdata"] == 0 or max_requested_height is None: + return + + next_stale_height = self._next_pending_stale_height(max_requested_height) + if next_stale_height is None: + return + if next_stale_height > max_requested_height + 1: + return + + if next_stale_height <= 0: + return + parent_hash_int = hash_hex_to_int( + self.chain_view.get_block_hash(next_stale_height - 1) + ) + stale = self._next_stale_candidate(parent_hash_int, 0) + if stale is None: + return + if stale.hash_int in self.announced_stale_headers: + return + + self.announced_stale_headers.add(stale.hash_int) + self.send_without_ping(msg_headers([CBlockHeader(stale.header)])) + self._stats["headers_sent"] += 1 + self._stats["stale_headers"] += 1 + self.send_without_ping(msg_inv([CInv(MSG_BLOCK, stale.hash_int)])) + self._stats["stale_inv_nudge"] += 1 + self._log( + "proactive stale announce near download frontier: " + "requested_height={} stale_height={} stale_hash={} parent={}".format( + max_requested_height, + stale.height, + hash_int_to_hex(stale.hash_int), + hash_int_to_hex(stale.prev_hash_int), + ) + ) + + def _maybe_follow_active_headers(self, max_requested_height): + """Advance the peer's active header frontier during getdata-only sync. + + After the last stale range, some peers stop issuing fresh getheaders + requests while they are still downloading blocks. If we do not extend + the announced active headers here, block download can stall at the last + announced height. 
+ """ + if max_requested_height is None: + return + + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + if self._highest_announced_active_height >= tip_height: + return + + follow_threshold = min(128, MAX_HEADERS_RESULTS // 4) + if max_requested_height + follow_threshold < self._highest_announced_active_height: + return + + start_height = max(self._highest_announced_active_height + 1, max_requested_height + 1) + next_stale_height = self._next_pending_stale_height(start_height) + end_height = min(tip_height, start_height + MAX_HEADERS_RESULTS - 1) + if next_stale_height is not None: + end_height = min(end_height, next_stale_height - 1) + if end_height < start_height: + return + + active_hash_hexes = self.chain_view.get_block_hashes_range(start_height, end_height) + active_hashes = [hash_hex_to_int(block_hash_hex) for block_hash_hex in active_hash_hexes] + if not active_hashes: + return + + header_map = self.chain_view.get_headers_batch(active_hashes) + headers = [header_map[block_hash_int] for block_hash_int in active_hashes] + self.send_without_ping(msg_headers(headers)) + self._stats["headers_sent"] += len(headers) + self._highest_announced_active_height = max( + self._highest_announced_active_height, + end_height, + ) + self._log( + "active header follow count={} range={}..{} next_stale_height={}".format( + len(headers), + start_height, + end_height, + next_stale_height if next_stale_height is not None else "none", + ) + ) + + def on_getheaders(self, message): + try: + self._stats["getheaders"] += 1 + if self._first_getheaders_at is None: + self._first_getheaders_at = time.monotonic() + locator_hashes = message.locator.vHave + hash_stop = message.hashstop + fork_hash, fork_height = self._find_serving_fork(locator_hashes, hash_stop) + + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + + next_height = fork_height + 
1 + if next_height > tip_height: + if self._stats["getdata"] == 0: + self._log( + "pre-getdata empty headers fork_height={} tip_height={} " + "locators={} hash_stop={}".format( + fork_height, + tip_height, + len(locator_hashes), + hash_int_to_hex(hash_stop) if hash_stop else "0", + ) + ) + # Some peers can keep requesting headers after reaching the + # tip without transitioning to block download. Nudge with + # the current tip inv to trigger getdata. + if tip_height > 0 and self._stats["getheaders"] % 25 == 0: + tip_hash_int = hash_hex_to_int(self.chain_view.get_block_hash(tip_height)) + self.send_without_ping(msg_inv([CInv(MSG_BLOCK, tip_hash_int)])) + self._stats["fallback_active_inv"] += 1 + self._log( + "pre-getdata tip nudge: announcing tip height={} hash={}".format( + tip_height, hash_int_to_hex(tip_hash_int) + ) + ) + self.send_without_ping(msg_headers([])) + return + + if self._stats["getdata"] == 0: + window_key = (fork_height, next_height, tip_height) + self._pre_getdata_window_counts[window_key] += 1 + repeat = self._pre_getdata_window_counts[window_key] + if repeat in (1, 5, 10) or repeat % 25 == 0: + first_locator = locator_hashes[0] if locator_hashes else 0 + first_locator_height = ( + self.chain_view.get_active_height(first_locator) + if first_locator + else None + ) + self._log( + "pre-getdata headers window fork_height={} next_height={} " + "tip_height={} repeat={} locators={} first_locator={} " + "first_locator_height={} hash_stop={}".format( + fork_height, + next_height, + tip_height, + repeat, + len(locator_hashes), + hash_int_to_hex(first_locator) if first_locator else "0", + first_locator_height, + hash_int_to_hex(hash_stop) if hash_stop else "0", + ) + ) + if ( + self._first_getheaders_at is not None + and self._stats["getheaders"] >= 200 + and self._stats["getheaders"] % 50 == 0 + ): + age = time.monotonic() - self._first_getheaders_at + self._log( + "pre-getdata stall-watch getheaders={} headers_sent={} age={:.1f}s" + .format( + 
self._stats["getheaders"], + self._stats["headers_sent"], + age, + ) + ) + + headers = [] + height = next_height + parent_hash = fork_hash + stale_header = None + stale_was_repeat = False + announced_active_height = fork_height + allow_stale_diversion = not ( + self.defer_stale_until_getdata and self._stats["getdata"] == 0 + ) + while height <= tip_height and len(headers) < MAX_HEADERS_RESULTS: + stale = self._next_stale_candidate(parent_hash, hash_stop) + if stale is not None: + stale_was_repeat = stale.hash_int in self.announced_stale_headers + headers.append(CBlockHeader(stale.header)) + self.announced_stale_headers.add(stale.hash_int) + stale_header = stale + break + remaining = MAX_HEADERS_RESULTS - len(headers) + active_end = min(tip_height, height + remaining - 1) + if allow_stale_diversion and not hash_stop: + next_stale_height = self._next_pending_stale_height(height) + if next_stale_height is not None: + active_end = min(active_end, next_stale_height - 1) + if active_end < height: + break + + active_hash_hexes = self.chain_view.get_block_hashes_range(height, active_end) + active_hashes = [hash_hex_to_int(block_hash_hex) for block_hash_hex in active_hash_hexes] + if hash_stop: + try: + stop_index = active_hashes.index(hash_stop) + except ValueError: + stop_index = None + if stop_index is not None: + active_hashes = active_hashes[: stop_index + 1] + + if not active_hashes: + break + + header_map = self.chain_view.get_headers_batch(active_hashes) + for offset, block_hash_int in enumerate(active_hashes): + headers.append(header_map[block_hash_int]) + parent_hash = block_hash_int + announced_active_height = height + offset + + if hash_stop and active_hashes[-1] == hash_stop: + break + height = announced_active_height + 1 + + self.send_without_ping(msg_headers(headers)) + self._stats["headers_sent"] += len(headers) + if announced_active_height > self._highest_announced_active_height: + self._highest_announced_active_height = announced_active_height + if 
stale_header is not None: + self._stats["stale_headers"] += 1 + self._log( + "header diversion fork_height={} fork_hash={} -> stale_height={} stale_hash={} " + "repeat={} locators={} hash_stop={}".format( + fork_height, + hash_int_to_hex(fork_hash), + stale_header.height, + hash_int_to_hex(stale_header.hash_int), + stale_was_repeat, + len(locator_hashes), + hash_int_to_hex(hash_stop) if hash_stop else "0", + ) + ) + # Explicitly announce the stale block so peers request it + # immediately instead of silently staying on the active branch. + self.send_without_ping(msg_inv([CInv(MSG_BLOCK, stale_header.hash_int)])) + self._stats["stale_inv_nudge"] += 1 + self._log( + "header diversion nudge: announcing stale block " + "height={} hash={}".format( + stale_header.height, + hash_int_to_hex(stale_header.hash_int), + ) + ) + if len(headers) < MAX_HEADERS_RESULTS: + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + next_height = stale_header.height + 1 + if next_height <= tip_height: + next_active_hash = hash_hex_to_int(self.chain_view.get_block_hash(next_height)) + self.send_without_ping(msg_inv([CInv(MSG_BLOCK, next_active_hash)])) + self._stats["fallback_active_inv"] += 1 + self._log( + "header diversion nudge: announcing next active block " + "height={} hash={}".format( + next_height, hash_int_to_hex(next_active_hash) + ) + ) + self._maybe_log_stats() + except Exception as exc: + print(f"on_getheaders failure: {exc!r}", flush=True) + print(traceback.format_exc(), flush=True) + + def on_getdata(self, message): + try: + self._stats["getdata"] += 1 + now = time.monotonic() + if self._first_getdata_at is None: + self._first_getdata_at = now + header_age = 0.0 + if self._first_getheaders_at is not None: + header_age = now - self._first_getheaders_at + self._log( + "first getdata received after {:.1f}s (getheaders={} headers_sent={})".format( + header_age, + self._stats["getheaders"], + 
self._stats["headers_sent"], + ) + ) + self._last_getdata_seen_at = now + active_requests = [] + missing_active = [] + active_candidates = [] + for inv in message.inv: + if self._record_getdata_requests: + self.getdata_requests.append(inv.hash) + if (inv.type & MSG_TYPE_MASK) != MSG_BLOCK: + continue + + stale = self.stale_by_hash.get(inv.hash) + if stale is not None: + if stale.hash_int in self.served_stale_blocks: + # If peer asks again for stale block, steer back to active. + self.disabled_stale_blocks.add(stale.hash_int) + self._consume_pending_stale(stale) + self._stats["stale_repeats"] += 1 + self.send_without_ping(msg_notfound([CInv(inv.type, stale.hash_int)])) + active_hash_int = hash_hex_to_int(self.chain_view.get_block_hash(stale.height)) + if active_hash_int != stale.hash_int: + self.send_without_ping(msg_inv([CInv(MSG_BLOCK, active_hash_int)])) + self._stats["fallback_active_inv"] += 1 + self._log( + "stale re-requested (likely failed validation), " + "sent notfound + active fallback: height={} stale={} active={}".format( + stale.height, + hash_int_to_hex(stale.hash_int), + hash_int_to_hex(active_hash_int), + ) + ) + continue + + self._serve_stale_with_followups(stale, source="getdata") + continue + + active_candidates.append(inv) + + active_height_map = self.chain_view.get_active_heights_batch( + [inv.hash for inv in active_candidates] + ) + for inv in active_candidates: + height = active_height_map.get(inv.hash) + if height is None: + missing_active.append(inv) + continue + + active_requests.append((height, inv.hash)) + if height > self._highest_active_height_requested: + self._highest_active_height_requested = height + + if active_requests: + # Keep getdata responses in ascending height order. This helps + # stale-first replay behave deterministically, and ensures we + # don't stream children blocks before parents. 
+ active_requests.sort( + key=lambda item: (item[0] is None, item[0] if item[0] is not None else 0) + ) + active_hashes = [block_hash_int for _, block_hash_int in active_requests] + block_map = self.chain_view.get_blocks_raw_batch(active_hashes) + for height, block_hash_int in active_requests: + # Some nodes won't request stale blocks during IBD even if + # we announce them. To reliably replay reorgs, inject stale + # blocks at their heights right before serving the active + # block for that height. + if height is not None and height > 0: + # Some stale blocks build on other stale blocks (a rare + # two-block stale branch). Prefer stales that extend the + # active parent first, then fall back to any pending + # stale at this height whose parent stale was already + # served. + while self._pending_stale_count_by_height.get(height, 0) > 0: + parent_hash_int = hash_hex_to_int( + self.chain_view.get_block_hash(height - 1) + ) + stale = self._next_stale_candidate( + parent_hash_int, + 0, + allow_announced=True, + ) + if stale is None or stale.height != height: + # Fall back to stale-on-stale children. + candidates = [] + for rec in self.stale_blocks: + if rec.height != height: + continue + if rec.hash_int in self.served_stale_blocks: + continue + if rec.hash_int in self.disabled_stale_blocks: + continue + if rec.hash_int not in self._pending_stale_hashes: + continue + if rec.prev_hash_int in self.served_stale_blocks: + candidates.append(rec) + candidates.sort(key=lambda rec: rec.hash_int) + stale = candidates[0] if candidates else None + if stale is None: + break + if stale.hash_int not in self.announced_stale_headers: + self.announced_stale_headers.add(stale.hash_int) + # Announce first so the peer doesn't treat the + # incoming block as unexpected. 
+ self.send_without_ping(msg_headers([CBlockHeader(stale.header)])) + self._stats["headers_sent"] += 1 + self._stats["stale_headers"] += 1 + self.send_without_ping(msg_inv([CInv(MSG_BLOCK, stale.hash_int)])) + self._stats["stale_inv_nudge"] += 1 + self._log( + "injected stale announce: height={} hash={} parent={}".format( + stale.height, + hash_int_to_hex(stale.hash_int), + hash_int_to_hex(stale.prev_hash_int), + ) + ) + self._serve_stale_with_followups( + stale, + fallback_active_hash_int=block_hash_int, + source="active-getdata", + ) + + if self._record_all_served_blocks: + self.served_blocks.append(("active", height, block_hash_int)) + self.send_without_ping(RawBlockMessage(block_map[block_hash_int])) + self._stats["active_blocks"] += 1 + if height is not None and height > self._highest_active_height_sent: + self._highest_active_height_sent = height + if self._stats["active_blocks"] % 5000 == 0: + self._log( + "served active block count={} latest_height={} hash={}".format( + self._stats["active_blocks"], + height, + hash_int_to_hex(block_hash_int), + ) + ) + + # Optionally stream a bounded number of active blocks ahead to + # reduce RTT-limited getdata loops. Never cross the next stale + # height so stale opportunities remain intact. + max_requested_height = max((h for h, _ in active_requests if h is not None), default=None) + self._maybe_proactive_stale_announce(max_requested_height) + self._maybe_follow_active_headers(max_requested_height) + est_raw = self.chain_view.estimated_raw_block_size() + raw_cache_max_bytes = self.chain_view.raw_cache_stats()["max_bytes"] + if self._active_send_ahead > 0 and max_requested_height is not None: + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + # Keep proactive streaming bounded to a fixed window ahead + # of what the peer actually requested. Do not grow the + # ahead distance cumulatively across getdata rounds. 
+ send_start = max( + self._highest_active_height_sent + 1, + max_requested_height + 1, + ) + send_end = min( + max_requested_height + self._active_send_ahead, + tip_height, + self._highest_announced_active_height, + ) + next_stale_height = self._next_pending_stale_height(send_start) + if next_stale_height is not None: + send_end = min(send_end, next_stale_height - 1) + # Send ahead in small chunks so we don't stall the P2P thread + # on very large blocks (common after ~200k on mainnet). + send_budget_bytes = min( + 64 * 1024 * 1024, + max(8 * 1024 * 1024, raw_cache_max_bytes // 64), + ) + send_limit_blocks = max(1, send_budget_bytes // max(1, est_raw)) + send_limit_blocks = min(send_limit_blocks, self._active_send_ahead) + if send_limit_blocks > 0: + send_end = min(send_end, send_start + send_limit_blocks - 1) + send_count = send_end - send_start + 1 + min_batch = min(self._active_send_ahead_min_batch, max(1, send_limit_blocks)) + if send_count >= min_batch: + send_hash_hexes = self.chain_view.get_block_hashes_range(send_start, send_end) + send_hashes = [hash_hex_to_int(block_hash_hex) for block_hash_hex in send_hash_hexes] + send_block_map = self.chain_view.get_blocks_raw_batch(send_hashes) + for send_height, send_hash_int in zip( + range(send_start, send_end + 1), + send_hashes, + ): + if self._record_all_served_blocks: + self.served_blocks.append(("active", send_height, send_hash_int)) + self.send_without_ping(RawBlockMessage(send_block_map[send_hash_int])) + self._stats["active_blocks"] += 1 + self._stats["active_send_ahead_blocks"] += 1 + if send_height > self._highest_active_height_sent: + self._highest_active_height_sent = send_height + if send_count >= 32: + self._log( + "active send-ahead count={} range={}..{} next_stale_height={}".format( + send_count, + send_start, + send_end, + next_stale_height if next_stale_height is not None else "none", + ) + ) + + # Push a bounded amount of active blocks ahead to avoid RTT-limited + # getdata loops by prefetching 
raw blocks from upstream RPC. Stop + # before the next stale height so reorg opportunities are preserved. + if max_requested_height is not None: + tip_height = self.chain_view.tip_height() + if self.max_height is not None: + tip_height = min(tip_height, self.max_height) + cap_blocks = self.chain_view.estimated_raw_cache_capacity_blocks() + effective_prefetch_window = ( + min(self._active_prefetch_window, cap_blocks) if cap_blocks > 0 else 0 + ) + if effective_prefetch_window > 0: + prefetch_start = max( + self._highest_active_height_prefetched + 1, + max_requested_height + 1, + ) + prefetch_end = min( + max_requested_height + effective_prefetch_window, + tip_height, + self._highest_announced_active_height, + ) + next_stale_height = self._next_pending_stale_height(prefetch_start) + if next_stale_height is not None: + prefetch_end = min(prefetch_end, next_stale_height - 1) + # Bound prefetch so we don't block `on_getdata` for seconds. + prefetch_budget_bytes = min( + 32 * 1024 * 1024, + max(4 * 1024 * 1024, raw_cache_max_bytes // 128), + ) + prefetch_limit_blocks = max(1, prefetch_budget_bytes // max(1, est_raw)) + prefetch_limit_blocks = min(prefetch_limit_blocks, 512, effective_prefetch_window) + if prefetch_limit_blocks > 0: + prefetch_end = min(prefetch_end, prefetch_start + prefetch_limit_blocks - 1) + if prefetch_end >= prefetch_start: + prefetch_hash_hexes = self.chain_view.get_block_hashes_range( + prefetch_start, + prefetch_end, + ) + prefetch_hashes = [ + hash_hex_to_int(block_hash_hex) for block_hash_hex in prefetch_hash_hexes + ] + self.chain_view.get_blocks_raw_batch(prefetch_hashes, cache_only=True) + self._stats["active_prefetch_cached_blocks"] += len(prefetch_hashes) + if prefetch_end > self._highest_active_height_prefetched: + self._highest_active_height_prefetched = prefetch_end + if len(prefetch_hashes) >= 512: + self._log( + "active prefetch cached count={} range={}..{} next_stale_height={}".format( + len(prefetch_hashes), + prefetch_start, + 
prefetch_end, + next_stale_height if next_stale_height is not None else "none", + ) + ) + + if missing_active: + self.send_without_ping(msg_notfound(missing_active)) + self._log( + "notfound for {} unknown/unsupported block requests; first_hash={}".format( + len(missing_active), + hash_int_to_hex(missing_active[0].hash), + ) + ) + self._maybe_log_stats() + except Exception as exc: + print(f"on_getdata failure: {exc!r}", flush=True) + print(traceback.format_exc(), flush=True) diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index a4052f910243..1e08f075b1a8 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -322,6 +322,7 @@ 'wallet_spend_unconfirmed.py', 'wallet_rescan_unconfirmed.py', 'p2p_fingerprint.py', + 'p2p_historical_reorg_proxy.py', 'feature_uacomment.py', 'feature_init.py', 'wallet_coinbase_category.py',