Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
"@btc-vision/plugin-sdk": "^1.0.1",
"@btc-vision/post-quantum": "^0.5.3",
"@btc-vision/rust-merkle-tree": "^1.0.2",
"@btc-vision/transaction": "^1.8.0",
"@btc-vision/transaction": "^1.8.2",
"@btc-vision/uwebsockets.js": "^20.57.0",
"@chainsafe/libp2p-noise": "^17.0.0",
"@chainsafe/libp2p-quic": "^2.0.0",
Expand Down
5 changes: 4 additions & 1 deletion src/src/Core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { DBManagerInstance } from './db/DBManager.js';
import { IndexManager } from './db/indexes/IndexManager.js';
import { ServicesConfigurations } from './services/ServicesConfigurations.js';
import { MessageType } from './threading/enum/MessageType.js';
import { LinkThreadMessage, LinkType, } from './threading/interfaces/thread-messages/messages/LinkThreadMessage.js';
import {
LinkThreadMessage,
LinkType,
} from './threading/interfaces/thread-messages/messages/LinkThreadMessage.js';
import {
LinkThreadRequestData,
LinkThreadRequestMessage,
Expand Down
1 change: 1 addition & 0 deletions src/src/api/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export class Server extends Logger {
methods: 'GET,HEAD,PUT,PATCH,POST,DELETE',
preflightContinue: false,
optionsSuccessStatus: 204,
maxAge: 86400,
}),
);

Expand Down
46 changes: 41 additions & 5 deletions src/src/blockchain-indexer/processor/BlockIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import { MessageType } from '../../threading/enum/MessageType.js';
import { ThreadData } from '../../threading/interfaces/ThreadData.js';
import { Config } from '../../config/Config.js';
import { RPCBlockFetcher } from '../fetcher/RPCBlockFetcher.js';
import {
CurrentIndexerBlockResponseData
} from '../../threading/interfaces/thread-messages/messages/indexer/CurrentIndexerBlock.js';
import { CurrentIndexerBlockResponseData } from '../../threading/interfaces/thread-messages/messages/indexer/CurrentIndexerBlock.js';
import { ChainObserver } from './observer/ChainObserver.js';
import { IndexingTask } from './tasks/IndexingTask.js';
import { BlockFetcher } from '../fetcher/abstract/BlockFetcher.js';
Expand Down Expand Up @@ -184,10 +182,10 @@ export class BlockIndexer extends Logger {
const opnetEnabled = OPNetConsensus.opnetEnabled;
if (opnetEnabled.ENABLED) {
if (opnetEnabled.BLOCK === 0n) {
// OPNet active from genesis resync not allowed at all
// OPNet active from genesis, resync not allowed at all
throw new Error(
`RESYNC_BLOCK_HEIGHTS cannot be used on this network. ` +
`OPNet is enabled from block 0 all blocks are OPNet blocks.`,
`OPNet is enabled from block 0, all blocks are OPNet blocks.`,
);
}

Expand Down Expand Up @@ -443,6 +441,21 @@ export class BlockIndexer extends Logger {
}
}

// watchBlockChanges only fires when the block hash changes.
// If the incoming height is at or below what we've already processed,
// the block at that height was replaced, this is a chain reorganization.
const incomingHeight = BigInt(header.height);
if (
this.started &&
!this.chainReorged &&
incomingHeight > 0n &&
incomingHeight <= this.chainObserver.pendingBlockHeight
) {
void this.onHeightRegressionDetected(incomingHeight, header.hash);

return;
}

if (!this.started) {
this.startTasks();
this.started = true;
Expand All @@ -455,6 +468,29 @@ export class BlockIndexer extends Logger {
this.startTasks();
}

/**
* Called when onBlockChange receives a height at or below what the node
* has already processed. Since watchBlockChanges only fires on hash
* changes, this means a chain reorganization occurred.
*/
private async onHeightRegressionDetected(
incomingHeight: bigint,
newBest: string,
): Promise<void> {
const pendingHeight = this.chainObserver.pendingBlockHeight;
this.warn(
`Height regression detected: tip=${incomingHeight}, processed=${pendingHeight}. Reverting.`,
);

try {
await this.revertChain(incomingHeight, pendingHeight, newBest, true);

this.startTasks();
} catch (e) {
this.panic(`Height regression reorg failed: ${e}`);
}
}

private async notifyThreadReorg(
fromHeight: bigint,
toHeight: bigint,
Expand Down
18 changes: 18 additions & 0 deletions src/src/blockchain-indexer/processor/reorg/ReorgWatchdog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,24 @@ export class ReorgWatchdog extends Logger {

const chainReorged: boolean = await this.verifyChainReorg(task.block);
if (!chainReorged) {
// Also verify that the block we're processing is still the canonical
// block at this height. Two competing blocks can share the same parent
// (passing the previousBlockHash check) but have different hashes.
// currentHeader comes from the RPC tip, if heights match, compare hashes.
if (
this.currentHeader.blockNumber === task.tip &&
this.currentHeader.blockHash !== task.block.hash
) {
this.warn(
`Block hash mismatch at height ${task.tip}: ` +
`processing=${task.block.hash}, canonical=${this.currentHeader.blockHash}`,
);

await this.restoreBlockchain(task.tip);

return true;
}

this.updateBlock(task.block);

return false;
Expand Down
43 changes: 43 additions & 0 deletions src/src/blockchain-indexer/sync/classes/ChainSynchronisation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ export class ChainSynchronisation extends Logger {

// TODO: Move fetching to an other thread.
private async queryBlock(blockNumber: bigint): Promise<DeserializedBlock> {
// In resync mode, only download block headers, no transaction data needed.
// Transactions are preserved from the original sync; only headers/witnesses
// are re-generated.
if (Config.DEV.RESYNC_BLOCK_HEIGHTS) {
return this.queryBlockHeaderOnly(blockNumber);
}

Comment on lines +345 to +351
return new Promise<DeserializedBlock>(async (resolve, reject) => {
try {
this.bestTip = blockNumber;
Expand Down Expand Up @@ -403,6 +410,42 @@ export class ChainSynchronisation extends Logger {
});
}

/**
* Fetch only the block header (no transaction data) for resync mode.
* Uses getBlockInfoOnly which returns BlockData with tx as txid strings only,
* avoiding the heavy getBlockInfoWithTransactionData RPC call.
*/
private async queryBlockHeaderOnly(blockNumber: bigint): Promise<DeserializedBlock> {
this.bestTip = blockNumber;

const blockHash = await this.rpcClient.getBlockHash(Number(blockNumber));
if (!blockHash) {
throw new Error(`Block hash not found for block ${blockNumber}`);
}

const blockData = await this.rpcClient.getBlockInfoOnly(blockHash);
if (!blockData) {
throw new Error(`Block header not found for block ${blockNumber}`);
}

const abortController = new AbortController();
this.abortControllers.set(blockNumber, abortController);

const block = new Block({
network: this.network,
abortController: abortController,
header: blockData,
processEverythingAsGeneric: true,
});

return {
header: block.header.toJSON(),
rawTransactionData: [],
transactionOrder: undefined,
addressCache: new Map<string, string>(),
};
}

/*private async deserializeBlockBatch(startBlock: bigint): Promise<ThreadData> {
// Instead of calling queryBlocks(...) directly, call getBlocks(...) from BlockFetcher
const blocksData = await this.blockFetcher.getBlocks(startBlock, 10);
Expand Down
12 changes: 10 additions & 2 deletions src/src/poc/PoC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,27 @@ export class PoC extends Logger {

private async onBlockProcessed(m: BlockProcessedMessage): Promise<ThreadData> {
// Wait for previous block to finish so height + proof are always in order.
await this.blockProcessedLock;
// Use catch so a failed broadcast doesn't permanently jam the lock.
await this.blockProcessedLock.catch(() => {});

// Broadcast height to ALL witness instances
this.blockProcessedLock = this.sendMessageToAllThreads(ThreadTypes.WITNESS, {
type: MessageType.WITNESS_HEIGHT_UPDATE,
data: { blockNumber: m.data.blockNumber },
});
await this.blockProcessedLock;

try {
await this.blockProcessedLock;
} catch (e: unknown) {
this.error(`Failed to broadcast height update: ${(e as Error).stack}`);
}

// Round-robin proof generation to ONE witness instance
void this.sendMessageToThread(ThreadTypes.WITNESS, {
type: MessageType.WITNESS_BLOCK_PROCESSED,
data: m.data,
}).catch((e: unknown) => {
this.error(`Failed to dispatch WITNESS_BLOCK_PROCESSED: ${(e as Error).stack}`);
});

// Update consensus height on this thread
Expand Down
2 changes: 1 addition & 1 deletion src/src/poc/mempool/manager/Mempool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ export class Mempool extends Logger {
}
}

// Sequential path testMempoolAccept already passed, broadcast each tx.
// Sequential path, testMempoolAccept already passed, broadcast each tx.
return await this.broadcastTransactionsAfterTest(transactions, rawHexes, testResults);
}

Expand Down
36 changes: 18 additions & 18 deletions src/src/poc/networking/p2p/BlockWitnessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ export class BlockWitnessManager extends Logger {

/**
* Queue a self-generated witness for async processing.
* Returns immediately the P2P message handler is not blocked.
* Returns immediately, the P2P message handler is not blocked.
* The queue drains concurrently with up to MAX_CONCURRENT_SELF_PROOFS
* overlapping proof generations, allowing RPC I/O to interleave.
*/
Expand Down Expand Up @@ -243,6 +243,23 @@ export class BlockWitnessManager extends Logger {
await this.processBlockWitnesses(data.blockNumber, blockWitness);
}

public onBlockWitness(blockWitness: IBlockHeaderWitness): void {
if (this.currentBlock === -1n) {
return;
}

const blockNumber: bigint = BigInt(blockWitness.blockNumber.toString());
if (this.currentBlock < blockNumber) {
// note: if not initialized, this.currentBlock is 0n.
this.addToPendingWitnessesVerification(blockNumber, blockWitness);
return;
}

this.enqueueWitnessValidation(blockNumber, blockWitness);
this.processQueuedWitnesses();
this.drainWitnessQueue();
}

/**
* Drain the self-witness queue. For each item:
* 1. Update consensus height sequentially (must be in block order).
Expand Down Expand Up @@ -331,23 +348,6 @@ export class BlockWitnessManager extends Logger {
await this.processBlockWitnesses(data.blockNumber, blockWitness);
}

public onBlockWitness(blockWitness: IBlockHeaderWitness): void {
if (this.currentBlock === -1n) {
return;
}

const blockNumber: bigint = BigInt(blockWitness.blockNumber.toString());
if (this.currentBlock < blockNumber) {
// note: if not initialized, this.currentBlock is 0n.
this.addToPendingWitnessesVerification(blockNumber, blockWitness);
return;
}

this.enqueueWitnessValidation(blockNumber, blockWitness);
this.processQueuedWitnesses();
this.drainWitnessQueue();
}

private revertKnownWitnessesReorg(toBlock: bigint): void {
const blocks: bigint[] = Array.from(this.knownTrustedWitnesses.keys());

Expand Down
8 changes: 4 additions & 4 deletions src/src/poc/witness/WitnessSerializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {
IBlockHeaderWitness,
OPNetBlockWitness,
} from '../networking/protobuf/packets/blockchain/common/BlockHeaderWitness.js';
import { ISyncBlockHeaderResponse } from '../networking/protobuf/packets/blockchain/responses/SyncBlockHeadersResponse.js';
import {
ISyncBlockHeaderResponse
} from '../networking/protobuf/packets/blockchain/responses/SyncBlockHeadersResponse.js';

/**
* Reconstruct a Long value from a structured-clone-degraded plain object.
Expand Down Expand Up @@ -54,9 +56,7 @@ export function reconstructBlockWitness(data: IBlockHeaderWitness): IBlockHeader
/**
* Reconstruct Long values in an ISyncBlockHeaderResponse after structured clone.
*/
export function reconstructSyncResponse(
data: ISyncBlockHeaderResponse,
): ISyncBlockHeaderResponse {
export function reconstructSyncResponse(data: ISyncBlockHeaderResponse): ISyncBlockHeaderResponse {
return {
...data,
blockNumber: toLong(data.blockNumber),
Expand Down
2 changes: 1 addition & 1 deletion src/src/poc/witness/WitnessThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class WitnessThread extends Thread<ThreadTypes.WITNESS> {

if (!this.currentBlockSet) {
this.currentBlockSet = true;
// Height is now set replay any buffered peer witnesses
// Height is now set, replay any buffered peer witnesses
this.flushPendingPeerMessages();
}

Expand Down
1 change: 1 addition & 0 deletions src/src/poc/witness/WitnessThreadManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class WitnessThreadManager extends ThreadManager<ThreadTypes.WITNESS> {
protected async createLinkBetweenThreads(): Promise<void> {
// Link to P2P: receives forwarded BLOCK_PROCESSED and peer witness data
await this.threadManager.createLinkBetweenThreads(ThreadTypes.P2P);
// RPC link is established from BitcoinRPCThreadManager side (same pattern as P2P→RPC)

// No INDEXER link is needed. The WitnessThread never calls
// getCurrentBlock() (which would require an INDEXER link); instead,
Expand Down
2 changes: 1 addition & 1 deletion src/src/services/ServicesConfigurations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export const ServicesConfigurations: { [key in ThreadTypes]: ThreaderConfigurati
},

[ThreadTypes.WITNESS]: {
maxInstance: 4,
maxInstance: 2,
managerTarget: './src/poc/witness/WitnessThreadManager.js',
target: './src/poc/witness/WitnessThread.js',
},
Expand Down
5 changes: 1 addition & 4 deletions src/src/vm/storage/VMStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ export abstract class VMStorage extends Logger {
};
}

public abstract revertDataUntilBlock(
height: bigint,
purgeUtxos?: boolean,
): Promise<void>;
public abstract revertDataUntilBlock(height: bigint, purgeUtxos?: boolean): Promise<void>;

public abstract revertBlockHeadersOnly(height: bigint): Promise<void>;

Expand Down
2 changes: 1 addition & 1 deletion src/src/vm/storage/databases/VMMongoStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ export class VMMongoStorage extends VMStorage {
);
}

// Target epochs have no block range delete once upfront
// Target epochs have no block range, delete once upfront
this.log(`Purging target epochs...`);
await this.targetEpochRepository.deleteAllTargetEpochs();

Expand Down
Loading
Loading