Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 97 additions & 4 deletions packages/demux/src/AloxideActionHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EntityConfig } from '@aloxide/bridge/src';
import { AbstractActionHandler } from 'demux';
import { AbstractActionHandler, MismatchedBlockHashError } from 'demux';

import { indexStateSchema } from './indexStateSchema';

Expand Down Expand Up @@ -33,6 +33,16 @@ export class AloxideActionHandler extends AbstractActionHandler {
protected dataAdapter: DataAdapter<any, any>,
handlerVersions: HandlerVersion[],
options?: AloxideActionHandlerOptions,
/*
user define function to revert data when chain fork;
data to be reverted from blockNumber + 1 to current last processed block number
*/
protected rollbackData?: (blockNumber: number) => Promise<void>,
/*
user define function to delete audit history saved to rollback(if any);
history to be deleted that created by block before blockNumber
*/
protected deleteAuditHistory?: (blockNumber: number) => Promise<void>,
) {
super(handlerVersions, options);
if (options) {
Expand All @@ -44,7 +54,7 @@ export class AloxideActionHandler extends AbstractActionHandler {
return this.indexStateModelName || `DemuxIndexState_${this.bcName.replace(/\W+/, '_')}`;
}

protected updateIndexState(
protected async updateIndexState(
state: any,
nextBlock: NextBlock,
isReplay: boolean,
Expand All @@ -63,11 +73,23 @@ export class AloxideActionHandler extends AbstractActionHandler {
}

if (context) {
this.indexStateModel.liBlockNumber = context.info.lastIrreversibleBlockNumber;
this.indexStateModel.lpBlockHash = context.info.lastProcessedBlockHash;
this.indexStateModel.lpBlockNumber = context.info.lastProcessedBlockNumber;
}

if (
!this.indexStateModel.liBlockNumber ||
(+this.indexStateModel.blockNumber > +nextBlock.lastIrreversibleBlockNumber &&
+this.indexStateModel.liBlockNumber !== +nextBlock.lastIrreversibleBlockNumber)
) {
if (this.deleteAuditHistory) {
await this.deleteAuditHistory(nextBlock.lastIrreversibleBlockNumber);
}
}

this.indexStateModel.liBlockNumber = nextBlock.lastIrreversibleBlockNumber;
this.lastIrreversibleBlockNumber = nextBlock.lastIrreversibleBlockNumber;

return this.dataAdapter
.update(this.getIndexStateModelName(), this.indexStateModel, {
entity: this.indexStateEntity,
Expand All @@ -78,6 +100,19 @@ export class AloxideActionHandler extends AbstractActionHandler {
.then<void>(() => {});
}

protected async rollbackIndexState(blockNumber: number): Promise<any> {
this.indexStateModel.blockNumber = blockNumber;
this.indexStateModel.blockHash = '';

return this.dataAdapter
.update(this.getIndexStateModelName(), this.indexStateModel, {
entity: this.indexStateEntity,
})
.catch(err => {
this.log.error('---- demux updateIndexState error:', err);
});
}

protected loadIndexState(): Promise<IndexState> {
this.log.debug('-- demux loadIndexState - start');
const DemuxIndexState = this.getIndexStateModelName();
Expand Down Expand Up @@ -113,6 +148,8 @@ export class AloxideActionHandler extends AbstractActionHandler {
this.indexStateModel.blockNumber = parseInt(this.indexStateModel.blockNumber, 10);
}

this.indexStateModel.lastIrreversibleBlockNumber = +this.indexStateModel.liBlockNumber;

this.log.debug('-- demux loadIndexState - block:', item.blockNumber);
return item;
})
Expand Down Expand Up @@ -143,6 +180,59 @@ export class AloxideActionHandler extends AbstractActionHandler {
return handle(null, context);
}

public async handleBlock(nextBlock: NextBlock, isReplay: boolean): Promise<number | null> {
try {
return await super.handleBlock(nextBlock, isReplay);
} catch (e) {
if (e instanceof MismatchedBlockHashError) {
if (this.lastProcessedBlockHash) {
/* ActionReader store processed block info in ephemeral memory,
in case of they lost it, they can't detect a fork and return new block to ActionHandler
ActionHandler store lasted index state in DB, detect that fork but can detect the point of fork is
ActionHandler should rollback to last irreversible blockNumber
Ref https://github.com/EOSIO/demux-js/issues/123#issuecomment-465756166
*/

// - rollbackTo should be as fast as possible to speed of sync with blockchain
await this.handleRollbackToLastIreversibleBlock();
return this.lastIrreversibleBlockNumber + 1;
} else {
/*
this.lastProcessedBlockHash is empty,
it means that hanlder has just processed rollback, it don't know current processed block hash is
compare between this.lastProcessedBlockHash and NextBlock.blockHash therefore to be different
just process NextBlock and update it to IndexState
*/

const handleWithArgs: (state: any, context?: any) => Promise<void> = async (
state: any,
context: any = {},
) => {
await this.handleActions(state, context, nextBlock, isReplay);
};
await this.handleWithState(handleWithArgs);
return null;
}
}

throw e;
}
}

private async handleRollbackToLastIreversibleBlock() {
const rollbackBlockNumber = this.lastIrreversibleBlockNumber;
const rollbackCount = this.lastProcessedBlockNumber - rollbackBlockNumber;
this.log.debug(`Rolling back ${rollbackCount} blocks to block ${rollbackBlockNumber}...`);
const rollbackStart = Date.now();
await this.rollbackTo(rollbackBlockNumber);
// @ts-ignore
this.rollbackDeferredEffects(this.lastIrreversibleBlockNumber);
const rollbackTime = Date.now() - rollbackStart;
this.log.info(
`Rolled back ${rollbackCount} blocks to block ${rollbackBlockNumber} (${rollbackTime}ms)`,
);
}

protected async setup(): Promise<void> {
/**
* Table DemuxIndexState
Expand All @@ -153,6 +243,9 @@ export class AloxideActionHandler extends AbstractActionHandler {
}

protected async rollbackTo(blockNumber: number): Promise<void> {
this.log.debug('-- roll back to block number:', blockNumber);
await this.rollbackIndexState(blockNumber);
if (this.rollbackData) {
await this.rollbackData(blockNumber);
}
}
}
Loading