Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/src/Core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import { DBManagerInstance } from './db/DBManager.js';
import { IndexManager } from './db/indexes/IndexManager.js';
import { ServicesConfigurations } from './services/ServicesConfigurations.js';
import { MessageType } from './threading/enum/MessageType.js';
import {
LinkThreadMessage,
LinkType,
} from './threading/interfaces/thread-messages/messages/LinkThreadMessage.js';
import { LinkThreadMessage, LinkType, } from './threading/interfaces/thread-messages/messages/LinkThreadMessage.js';
import {
LinkThreadRequestData,
LinkThreadRequestMessage,
Expand Down Expand Up @@ -67,6 +64,7 @@ export class Core extends Logger {
if (Config.POC.ENABLED) {
await this.createThread(ThreadTypes.MEMPOOL_MANAGER);
await this.createThread(ThreadTypes.MEMPOOL);
await this.createThread(ThreadTypes.WITNESS);
await this.createThread(ThreadTypes.P2P);
}

Expand Down
1 change: 1 addition & 0 deletions src/src/blockchain-indexer/rpc/BitcoinRPCThreadManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ export class BitcoinRPCThreadManager extends ThreadManager<ThreadTypes.RPC> {
await this.threadManager.createLinkBetweenThreads(ThreadTypes.MEMPOOL);
await this.threadManager.createLinkBetweenThreads(ThreadTypes.P2P);
await this.threadManager.createLinkBetweenThreads(ThreadTypes.API);
await this.threadManager.createLinkBetweenThreads(ThreadTypes.WITNESS);
}
}
3 changes: 3 additions & 0 deletions src/src/blockchain-indexer/rpc/thread/BitcoinRPCThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ export class BitcoinRPCThread extends Thread<ThreadTypes.RPC> {
case ThreadTypes.MEMPOOL: {
return await this.processAPIMessage(m as RPCMessage<BitcoinRPCThreadMessageType>);
}
case ThreadTypes.WITNESS: {
return await this.processAPIMessage(m as RPCMessage<BitcoinRPCThreadMessageType>);
}
default:
this.log(`Unknown thread message received. {Type: ${m.type}}`);
break;
Expand Down
33 changes: 30 additions & 3 deletions src/src/poc/PoC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import { P2PManager } from './networking/P2PManager.js';
import { RPCMessage } from '../threading/interfaces/thread-messages/messages/api/RPCMessage.js';
import { BitcoinRPCThreadMessageType } from '../blockchain-indexer/rpc/thread/messages/BitcoinRPCThreadMessage.js';
import { OPNetBroadcastData } from '../threading/interfaces/thread-messages/messages/api/BroadcastTransactionOPNet.js';
import { IBlockHeaderWitness } from './networking/protobuf/packets/blockchain/common/BlockHeaderWitness.js';

export class PoC extends Logger {
public readonly logColor: string = '#00ffe1';

private readonly p2p: P2PManager;

/** Serializes onBlockProcessed calls so each completes before the next starts. */
private blockProcessedLock: Promise<void> = Promise.resolve();

constructor(private readonly config: BtcIndexerConfig) {
super();

Expand Down Expand Up @@ -61,6 +65,14 @@ export class PoC extends Logger {
}
}

public async broadcastBlockWitness(witness: IBlockHeaderWitness): Promise<void> {
await this.p2p.broadcastBlockWitnessToNetwork(witness);
}

public async requestPeerWitnesses(blockNumber: bigint): Promise<void> {
await this.p2p.requestWitnessesFromPeers(blockNumber);
}

private async handleGetPeerMessage(): Promise<ThreadData> {
const peers = await this.p2p.getOPNetPeers();

Expand Down Expand Up @@ -95,9 +107,24 @@ export class PoC extends Logger {
}

private async onBlockProcessed(m: BlockProcessedMessage): Promise<ThreadData> {
const data = m.data;

await this.p2p.generateBlockHeaderProof(data, true);
// Wait for previous block to finish so height + proof are always in order.
await this.blockProcessedLock;

// Broadcast height to ALL witness instances
this.blockProcessedLock = this.sendMessageToAllThreads(ThreadTypes.WITNESS, {
type: MessageType.WITNESS_HEIGHT_UPDATE,
data: { blockNumber: m.data.blockNumber },
});
await this.blockProcessedLock;

// Round-robin proof generation to ONE witness instance
void this.sendMessageToThread(ThreadTypes.WITNESS, {
type: MessageType.WITNESS_BLOCK_PROCESSED,
data: m.data,
});

// Update consensus height on this thread
this.p2p.updateConsensusHeight(m.data.blockNumber);

return {};
}
Expand Down
28 changes: 28 additions & 0 deletions src/src/poc/PoCThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { ThreadData } from '../threading/interfaces/ThreadData.js';
import { ThreadTypes } from '../threading/thread/enums/ThreadTypes.js';
import { Thread } from '../threading/thread/Thread.js';
import { PoC } from './PoC.js';
import { IBlockHeaderWitness } from './networking/protobuf/packets/blockchain/common/BlockHeaderWitness.js';
import { reconstructBlockWitness } from './witness/WitnessSerializer.js';

export class PoCThread extends Thread<ThreadTypes.P2P> {
public readonly threadType: ThreadTypes.P2P = ThreadTypes.P2P;
Expand Down Expand Up @@ -48,6 +50,9 @@ export class PoCThread extends Thread<ThreadTypes.P2P> {
case ThreadTypes.SSH: {
return await this.handleBitcoinIndexerMessage(m);
}
case ThreadTypes.WITNESS: {
return await this.handleWitnessMessage(m);
}
default: {
throw new Error(`Unknown message sent by thread of type: ${type}`);
}
Expand All @@ -63,6 +68,29 @@ export class PoCThread extends Thread<ThreadTypes.P2P> {
): Promise<ThreadData> {
return await this.poc.handleBitcoinIndexerMessage(m);
}

private async handleWitnessMessage(
m: ThreadMessageBase<MessageType>,
): Promise<ThreadData | undefined> {
switch (m.type) {
case MessageType.WITNESS_BROADCAST: {
// Witness thread wants us to broadcast a witness to peers.
// Long instances lose their prototype after structured clone
// (worker_threads postMessage); reconstruct before use.
const witness = reconstructBlockWitness(m.data as IBlockHeaderWitness);
await this.poc.broadcastBlockWitness(witness);
return {};
}
case MessageType.WITNESS_REQUEST_PEERS: {
// Witness thread wants us to request witnesses from peers
const data = m.data as { blockNumber: bigint };
await this.poc.requestPeerWitnesses(data.blockNumber);
return {};
}
default:
return undefined;
}
}
}

new PoCThread();
1 change: 1 addition & 0 deletions src/src/poc/PoCThreadManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class PoCThreadManager extends ThreadManager<ThreadTypes.P2P> {
protected async createLinkBetweenThreads(): Promise<void> {
await this.threadManager.createLinkBetweenThreads(ThreadTypes.INDEXER);
await this.threadManager.createLinkBetweenThreads(ThreadTypes.API);
await this.threadManager.createLinkBetweenThreads(ThreadTypes.WITNESS);
}

private async createAllThreads(): Promise<void> {
Expand Down
54 changes: 48 additions & 6 deletions src/src/poc/networking/P2PManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { OPNetPeer } from '../peer/OPNetPeer.js';
import { DisconnectionCode } from './enums/DisconnectionCode.js';
import { BlockWitnessManager } from './p2p/BlockWitnessManager.js';
import { IBlockHeaderWitness } from './protobuf/packets/blockchain/common/BlockHeaderWitness.js';
import { ISyncBlockHeaderResponse } from './protobuf/packets/blockchain/responses/SyncBlockHeadersResponse.js';
import { ITransactionPacket } from './protobuf/packets/blockchain/common/TransactionPacket.js';
import { OPNetPeerInfo } from './protobuf/packets/peering/DiscoveryResponsePacket.js';
import { AuthenticationManager } from './server/managers/AuthenticationManager.js';
Expand Down Expand Up @@ -191,6 +192,40 @@ export class P2PManager extends Logger {
throw new Error('sendMessageToAllThreads not implemented.');
};

/**
* Queue a self-generated block header proof for async processing.
* Returns immediately so the caller's thread is not blocked.
*/
public queueBlockHeaderProof(data: BlockProcessedData): void {
this.blockWitnessManager.queueSelfWitness(data, () => {
// After witness is generated, request witnesses from peers.
if (Config.OP_NET.MODE !== OPNetIndexerMode.LIGHT) {
if (data.blockNumber - 1n > 0n) {
void this.requestBlockWitnessesFromPeer(data.blockNumber - 1n);
}

void this.requestBlockWitnessesFromPeer(data.blockNumber);
}
});
}

public updateConsensusHeight(blockNumber: bigint): void {
void this.blockWitnessManager.setCurrentBlock(blockNumber, true);
}

public async broadcastBlockWitnessToNetwork(witness: IBlockHeaderWitness): Promise<void> {
await this.broadcastBlockWitness(witness);
}

public async requestWitnessesFromPeers(blockNumber: bigint): Promise<void> {
if (Config.OP_NET.MODE !== OPNetIndexerMode.LIGHT) {
if (blockNumber - 1n > 0n) {
await this.requestBlockWitnessesFromPeer(blockNumber - 1n);
}
await this.requestBlockWitnessesFromPeer(blockNumber);
}
}

public async generateBlockHeaderProof(
data: BlockProcessedData,
isSelf: boolean = false,
Expand Down Expand Up @@ -895,12 +930,19 @@ export class P2PManager extends Logger {
peer.sendMsg = this.sendToPeer.bind(this);
peer.reportAuthenticatedPeer = this.reportAuthenticatedPeer.bind(this);
peer.getOPNetPeers = this.getOPNetPeers.bind(this);
peer.onBlockWitness = this.blockWitnessManager.onBlockWitness.bind(
this.blockWitnessManager,
);
peer.onBlockWitnessResponse = this.blockWitnessManager.onBlockWitnessResponse.bind(
this.blockWitnessManager,
);
// Forward incoming peer witnesses to the dedicated WITNESS thread
peer.onBlockWitness = (witness: IBlockHeaderWitness) => {
void this.sendMessageToThread(ThreadTypes.WITNESS, {
type: MessageType.WITNESS_PEER_DATA,
data: witness,
});
};
peer.onBlockWitnessResponse = (packet: ISyncBlockHeaderResponse): void => {
void this.sendMessageToThread(ThreadTypes.WITNESS, {
type: MessageType.WITNESS_PEER_RESPONSE,
data: packet,
});
};
peer.onPeersDiscovered = this.onOPNetPeersDiscovered.bind(this);
peer.requestBlockWitnesses = this.blockWitnessManager.requestBlockWitnesses.bind(
this.blockWitnessManager,
Expand Down
120 changes: 119 additions & 1 deletion src/src/poc/networking/p2p/BlockWitnessManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class BlockWitnessManager extends Logger {

private MAXIMUM_WITNESSES_PER_MESSAGE: number = 20;

/** Witness validation concurrency control */
/** Witness validation concurrency control (external peer witnesses) */
private witnessQueue: Array<{ blockNumber: bigint; witness: IBlockHeaderWitness }> = [];
private activeValidations: number = 0;

Expand All @@ -66,6 +66,21 @@ export class BlockWitnessManager extends Logger {

private blockValidationCount: Map<bigint, number> = new Map();

/** Self-witness generation queue (option 2: decouple via queue) */
private selfWitnessQueue: Array<{
data: BlockProcessedData;
onComplete?: () => void;
onHeightSet?: () => void;
}> = [];

/** Concurrent self-witness proof generation (option 3: threaded processing) */
private activeSelfProofs: number = 0;
private selfQueueDraining: boolean = false;
private readonly MAX_CONCURRENT_SELF_PROOFS: number = 10;

/** Resolvers waiting for a concurrency slot to open. */
private slotWaiters: Array<() => void> = [];

constructor(
private readonly config: BtcIndexerConfig,
private readonly identity: OPNetIdentity,
Expand Down Expand Up @@ -186,6 +201,21 @@ export class BlockWitnessManager extends Logger {
OPNetConsensus.setBlockHeight(this.currentBlock, reorg);
}

/**
* Queue a self-generated witness for async processing.
* Returns immediately — the P2P message handler is not blocked.
* The queue drains concurrently with up to MAX_CONCURRENT_SELF_PROOFS
* overlapping proof generations, allowing RPC I/O to interleave.
*/
public queueSelfWitness(
data: BlockProcessedData,
onComplete?: () => void,
onHeightSet?: () => void,
): void {
this.selfWitnessQueue.push({ data, onComplete, onHeightSet });
this.drainSelfWitnessQueue();
}

public async generateBlockHeaderProof(
data: BlockProcessedData,
isSelf: boolean,
Expand Down Expand Up @@ -213,6 +243,94 @@ export class BlockWitnessManager extends Logger {
await this.processBlockWitnesses(data.blockNumber, blockWitness);
}

/**
* Drain the self-witness queue. For each item:
* 1. Update consensus height sequentially (must be in block order).
* 2. Fire off proof generation concurrently (up to MAX_CONCURRENT_SELF_PROOFS).
*
* This allows multiple proof generations to overlap their I/O (RPC round-trips)
* while keeping consensus height updates strictly ordered.
*/
private drainSelfWitnessQueue(): void {
if (this.selfQueueDraining) return;
this.selfQueueDraining = true;

void this.processSelfQueue().finally(() => {
this.selfQueueDraining = false;
});
}

private async processSelfQueue(): Promise<void> {
while (this.selfWitnessQueue.length > 0) {
// Wait for a concurrency slot
if (this.activeSelfProofs >= this.MAX_CONCURRENT_SELF_PROOFS) {
await new Promise<void>((resolve) => {
this.slotWaiters.push(resolve);
});
}

const item = this.selfWitnessQueue.shift();
if (!item) continue;

const { data, onComplete, onHeightSet } = item;

// Sequential: update consensus height in block order.
// This must happen before firing off the concurrent proof.
if (this.currentBlock >= data.blockNumber) {
this.revertKnownWitnessesReorg(data.blockNumber);
}
await this.setCurrentBlock(data.blockNumber, true);

if (onHeightSet) {
try {
onHeightSet();
} catch (e: unknown) {
this.error(`Self witness onHeightSet error: ${(e as Error).stack}`);
}
}

// Concurrent: fire off proof generation without awaiting.
// Multiple proofs overlap their RPC I/O on the event loop.
this.activeSelfProofs++;
void this.generateSelfProof(data)
.catch((e: unknown) => {
this.error(`Self witness generation error: ${(e as Error).stack}`);
})
.finally(() => {
this.activeSelfProofs--;

// Wake up a waiter if one is blocked on a concurrency slot
if (this.slotWaiters.length > 0) {
const waiter = this.slotWaiters.shift();
if (waiter) waiter();
}

if (onComplete) {
try {
onComplete();
} catch (e: unknown) {
this.error(`Self witness onComplete error: ${(e as Error).stack}`);
}
}
});
}
}

private async generateSelfProof(data: BlockProcessedData): Promise<void> {
const blockChecksumHash = this.generateBlockHeaderChecksumHash(data);
const signedWitness = this.identity.acknowledgeData(blockChecksumHash);
const trustedWitness = this.identity.acknowledgeTrustedData(blockChecksumHash);

const blockWitness: IBlockHeaderWitness = {
...data,
blockNumber: Long.fromString(data.blockNumber.toString()),
validatorWitnesses: [signedWitness],
trustedWitnesses: [trustedWitness],
};

await this.processBlockWitnesses(data.blockNumber, blockWitness);
}

public onBlockWitness(blockWitness: IBlockHeaderWitness): void {
if (this.currentBlock === -1n) {
return;
Expand Down
Loading
Loading