From 1b0086f09269f1d0d672b4095c5f27aa251ffd83 Mon Sep 17 00:00:00 2001 From: danielAlvess Date: Mon, 14 Dec 2020 18:13:56 +0700 Subject: [PATCH] feat(demux) :sparkles: implement roll back to --- packages/demux/src/AloxideActionHandler.ts | 101 +++- .../demux/test/AloxideActionHandler.test.ts | 565 +++++++++++++++++- 2 files changed, 650 insertions(+), 16 deletions(-) diff --git a/packages/demux/src/AloxideActionHandler.ts b/packages/demux/src/AloxideActionHandler.ts index fc39b02..46ad174 100644 --- a/packages/demux/src/AloxideActionHandler.ts +++ b/packages/demux/src/AloxideActionHandler.ts @@ -1,5 +1,5 @@ import { EntityConfig } from '@aloxide/bridge/src'; -import { AbstractActionHandler } from 'demux'; +import { AbstractActionHandler, MismatchedBlockHashError } from 'demux'; import { indexStateSchema } from './indexStateSchema'; @@ -33,6 +33,16 @@ export class AloxideActionHandler extends AbstractActionHandler { protected dataAdapter: DataAdapter, handlerVersions: HandlerVersion[], options?: AloxideActionHandlerOptions, + /* + user define function to revert data when chain fork; + data to be reverted from blockNumber + 1 to current last processed block number + */ + protected rollbackData?: (blockNumber: number) => Promise, + /* + user define function to delete audit history saved to rollback(if any); + history to be deleted that created by block before blockNumber + */ + protected deleteAuditHistory?: (blockNumber: number) => Promise, ) { super(handlerVersions, options); if (options) { @@ -44,7 +54,7 @@ export class AloxideActionHandler extends AbstractActionHandler { return this.indexStateModelName || `DemuxIndexState_${this.bcName.replace(/\W+/, '_')}`; } - protected updateIndexState( + protected async updateIndexState( state: any, nextBlock: NextBlock, isReplay: boolean, @@ -63,11 +73,23 @@ export class AloxideActionHandler extends AbstractActionHandler { } if (context) { - this.indexStateModel.liBlockNumber = context.info.lastIrreversibleBlockNumber; this.indexStateModel.lpBlockHash = context.info.lastProcessedBlockHash; this.indexStateModel.lpBlockNumber = context.info.lastProcessedBlockNumber; } + if ( + !this.indexStateModel.liBlockNumber || + (+this.indexStateModel.blockNumber > +nextBlock.lastIrreversibleBlockNumber && + +this.indexStateModel.liBlockNumber !== +nextBlock.lastIrreversibleBlockNumber) + ) { + if (this.deleteAuditHistory) { + await this.deleteAuditHistory(nextBlock.lastIrreversibleBlockNumber); + } + } + + this.indexStateModel.liBlockNumber = nextBlock.lastIrreversibleBlockNumber; + this.lastIrreversibleBlockNumber = nextBlock.lastIrreversibleBlockNumber; + return this.dataAdapter .update(this.getIndexStateModelName(), this.indexStateModel, { entity: this.indexStateEntity, @@ -78,6 +100,19 @@ export class AloxideActionHandler extends AbstractActionHandler { .then(() => {}); } + protected async rollbackIndexState(blockNumber: number): Promise { + this.indexStateModel.blockNumber = blockNumber; + this.indexStateModel.blockHash = ''; + + return this.dataAdapter + .update(this.getIndexStateModelName(), this.indexStateModel, { + entity: this.indexStateEntity, + }) + .catch(err => { + this.log.error('---- demux updateIndexState error:', err); + }); + } + protected loadIndexState(): Promise { this.log.debug('-- demux loadIndexState - start'); const DemuxIndexState = this.getIndexStateModelName(); @@ -113,6 +148,8 @@ export class AloxideActionHandler extends AbstractActionHandler { this.indexStateModel.blockNumber = parseInt(this.indexStateModel.blockNumber, 10); } + this.indexStateModel.lastIrreversibleBlockNumber = +this.indexStateModel.liBlockNumber; + this.log.debug('-- demux loadIndexState - block:', item.blockNumber); return item; }) @@ -143,6 +180,59 @@ export class AloxideActionHandler extends AbstractActionHandler { return handle(null, context); } + public async handleBlock(nextBlock: NextBlock, isReplay: boolean): Promise { + try { + return await super.handleBlock(nextBlock, isReplay); + } catch (e) { + if (e instanceof MismatchedBlockHashError) { + if (this.lastProcessedBlockHash) { + /* ActionReader store processed block info in ephemeral memory, + in case of they lost it, they can't detect a fork and return new block to ActionHandler + ActionHandler store lasted index state in DB, detect that fork but can detect the point of fork is + ActionHandler should rollback to last irreversible blockNumber + Ref https://github.com/EOSIO/demux-js/issues/123#issuecomment-465756166 + */ + + // - rollbackTo should be as fast as possible to speed of sync with blockchain + await this.handleRollbackToLastIreversibleBlock(); + return this.lastIrreversibleBlockNumber + 1; + } else { + /* + this.lastProcessedBlockHash is empty, + it means that hanlder has just processed rollback, it don't know current processed block hash is + compare between this.lastProcessedBlockHash and NextBlock.blockHash therefore to be different + just process NextBlock and update it to IndexState + */ + + const handleWithArgs: (state: any, context?: any) => Promise = async ( + state: any, + context: any = {}, + ) => { + await this.handleActions(state, context, nextBlock, isReplay); + }; + await this.handleWithState(handleWithArgs); + return null; + } + } + + throw e; + } + } + + private async handleRollbackToLastIreversibleBlock() { + const rollbackBlockNumber = this.lastIrreversibleBlockNumber; + const rollbackCount = this.lastProcessedBlockNumber - rollbackBlockNumber; + this.log.debug(`Rolling back ${rollbackCount} blocks to block ${rollbackBlockNumber}...`); + const rollbackStart = Date.now(); + await this.rollbackTo(rollbackBlockNumber); + // @ts-ignore + this.rollbackDeferredEffects(this.lastIrreversibleBlockNumber); + const rollbackTime = Date.now() - rollbackStart; + this.log.info( + `Rolled back ${rollbackCount} blocks to block ${rollbackBlockNumber} (${rollbackTime}ms)`, + ); + } + protected async setup(): Promise { /** * Table DemuxIndexState @@ -153,6 +243,9 @@ export class AloxideActionHandler extends AbstractActionHandler { } protected async rollbackTo(blockNumber: number): Promise { - this.log.debug('-- roll back to block number:', blockNumber); + await this.rollbackIndexState(blockNumber); + if (this.rollbackData) { + await this.rollbackData(blockNumber); + } } } diff --git a/packages/demux/test/AloxideActionHandler.test.ts b/packages/demux/test/AloxideActionHandler.test.ts index 5661895..12c0cb5 100644 --- a/packages/demux/test/AloxideActionHandler.test.ts +++ b/packages/demux/test/AloxideActionHandler.test.ts @@ -28,6 +28,10 @@ class TestAloxideActionHandler extends AloxideActionHandler { return this.updateIndexState(state, nextBlock, isReplay, handlerVersionName, context); } + callRollbackIndexState(blockNumber) { + return this.rollbackIndexState(blockNumber); + } + callLoadIndexState() { return this.loadIndexState(); } @@ -60,7 +64,7 @@ describe('test AloxideActionHandler', () => { findAll: jest.fn(), find: jest.fn(), create: jest.fn(), - update: jest.fn(), + update: jest.fn().mockResolvedValue(undefined), delete: jest.fn(), }; } @@ -172,9 +176,7 @@ describe('test AloxideActionHandler', () => { expect(handler.indexStateModel.isReplay).toEqual(isReplay); expect(handler.indexStateModel.handlerVersionName).toEqual(handlerVersionName); - expect(handler.indexStateModel.liBlockNumber).toEqual( - context.info.lastIrreversibleBlockNumber, - ); + expect(handler.indexStateModel.liBlockNumber).toEqual(nextBlock.lastIrreversibleBlockNumber); expect(handler.indexStateModel.lpBlockHash).toEqual(context.info.lastProcessedBlockHash); expect(handler.indexStateModel.lpBlockNumber).toEqual(context.info.lastProcessedBlockNumber); expect(handler.indexStateModel.state).toEqual('{}'); @@ -184,6 +186,179 @@ describe('test AloxideActionHandler', () => { }); }); + it('call prune history if lastIrreversibleBlockNumber change', async () => { + const state = {}; + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '', + blockNumber: 11, + previousBlockHash: '', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 5, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const deleteAuditHistory = jest.fn().mockResolvedValue(undefined); + const rollBackData = jest.fn().mockResolvedValue(undefined); + + const handler = new TestAloxideActionHandler( + bcName, + dataAdapter, + handlerVersions, + { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }, + rollBackData, + deleteAuditHistory, + ); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 2, + blockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 10, + liBlockNumber: 10, + lpBlockNumber: 10, + handlerVersionName: 'v1', + isReplay: false, + }; + + const context = { + info: handler.info, + }; + + (dataProvider.update as jest.Mock).mockImplementation(() => Promise.resolve({})); + + await handler.callUpdateIndexState(state, nextBlock, isReplay, handlerVersionName, context); + + expect(handler.indexStateModel.blockNumber).toEqual(nextBlock.block.blockInfo.blockNumber); + expect(handler.indexStateModel.blockHash).toEqual(nextBlock.block.blockInfo.blockHash); + + expect(deleteAuditHistory).toBeCalledTimes(1); + expect(deleteAuditHistory).toBeCalledWith(nextBlock.lastIrreversibleBlockNumber); + }); + + it('should not call prune history if last processed block less than lastIrreversibleBlockNumber', async () => { + const state = {}; + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '', + blockNumber: 5, + previousBlockHash: '', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 15, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 10, + blockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 4, + liBlockNumber: 4, + lpBlockNumber: 4, + handlerVersionName: 'v1', + isReplay: false, + }; + + const context = { + info: handler.info, + }; + + (dataProvider.update as jest.Mock).mockImplementation(() => Promise.resolve({})); + + await handler.callUpdateIndexState(state, nextBlock, isReplay, handlerVersionName, context); + + expect(handler.indexStateModel.blockNumber).toEqual(nextBlock.block.blockInfo.blockNumber); + expect(handler.indexStateModel.blockHash).toEqual(nextBlock.block.blockInfo.blockHash); + }); + + it('should not call prune history if last irreversible block number doesnt not change', async () => { + const state = {}; + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '', + blockNumber: 5, + previousBlockHash: '', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 10, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 10, + blockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 4, + liBlockNumber: 4, + lpBlockNumber: 4, + handlerVersionName: 'v1', + isReplay: false, + }; + + const context = { + info: handler.info, + }; + + (dataProvider.update as jest.Mock).mockImplementation(() => Promise.resolve({})); + + await handler.callUpdateIndexState(state, nextBlock, isReplay, handlerVersionName, context); + + expect(handler.indexStateModel.blockNumber).toEqual(nextBlock.block.blockInfo.blockNumber); + expect(handler.indexStateModel.blockHash).toEqual(nextBlock.block.blockInfo.blockHash); + }); + it('state null', async () => { const state = null; const nextBlock: NextBlock = { @@ -234,9 +409,7 @@ describe('test AloxideActionHandler', () => { expect(handler.indexStateModel.isReplay).toEqual(isReplay); expect(handler.indexStateModel.handlerVersionName).toEqual(handlerVersionName); - expect(handler.indexStateModel.liBlockNumber).toEqual( - context.info.lastIrreversibleBlockNumber, - ); + expect(handler.indexStateModel.liBlockNumber).toEqual(nextBlock.lastIrreversibleBlockNumber); expect(handler.indexStateModel.lpBlockHash).toEqual(context.info.lastProcessedBlockHash); expect(handler.indexStateModel.lpBlockNumber).toEqual(context.info.lastProcessedBlockNumber); expect(handler.indexStateModel.state).toBeNull(); @@ -293,8 +466,8 @@ describe('test AloxideActionHandler', () => { expect(handler.indexStateModel.blockHash).toEqual(nextBlock.block.blockInfo.blockHash); expect(handler.indexStateModel.isReplay).toEqual(isReplay); expect(handler.indexStateModel.handlerVersionName).toEqual(handlerVersionName); + expect(handler.indexStateModel.liBlockNumber).toBe(nextBlock.lastIrreversibleBlockNumber); - expect(handler.indexStateModel.liBlockNumber).toBeNull(); expect(handler.indexStateModel.lpBlockHash).toBeNull(); expect(handler.indexStateModel.lpBlockNumber).toBeNull(); expect(handler.indexStateModel.state).toBeNull(); @@ -551,18 +724,386 @@ describe('test AloxideActionHandler', () => { describe('callRollbackTo', () => { it('call rollbackTo', async () => { const [dataAdapter, dataProvider] = createDataAdapter(); - const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions); + + const deleteAuditHistory = jest.fn().mockResolvedValue(undefined); + const rollBackData = jest.fn().mockResolvedValue(undefined); + + const handler = new TestAloxideActionHandler( + bcName, + dataAdapter, + handlerVersions, + { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }, + rollBackData, + deleteAuditHistory, + ); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9898001, + blockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 9899998, + liBlockNumber: 9899997, + lpBlockNumber: 9899998, + handlerVersionName: 'v1', + isReplay: false, + }; const spyLogDebug = jest.spyOn(handler._log, 'debug'); spyLogDebug.mockImplementation(() => {}); dataProvider.setup = jest.fn(); + const rollBlockNumber = handler.indexStateModel.lastIrreversibleBlockNumber; + await handler.callRollbackTo(rollBlockNumber); + + expect(handler.indexStateModel.blockNumber).toBe(rollBlockNumber); + expect(handler.indexStateModel.blockHash).toBe(''); + + expect(rollBackData).toBeCalledTimes(1); + expect(rollBackData).toBeCalledWith(rollBlockNumber); + }); + + it('should not call rollbackData function if not specify', async () => { + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9898001, + blockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 9899998, + liBlockNumber: 9899997, + lpBlockNumber: 9899998, + handlerVersionName: 'v1', + isReplay: false, + }; + + const spyLogDebug = jest.spyOn(handler._log, 'debug'); + spyLogDebug.mockImplementation(() => {}); + + dataProvider.setup = jest.fn(); + + const rollBlockNumber = handler.indexStateModel.lastIrreversibleBlockNumber; + await handler.callRollbackTo(rollBlockNumber); + + expect(handler.indexStateModel.blockNumber).toBe(rollBlockNumber); + expect(handler.indexStateModel.blockHash).toBe(''); + }); + }); + + describe('call rollback index state', () => { + it('roll back index state throw error', async () => { + const [dataAdapter, dataProvider] = createDataAdapter(); + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + blockHash: null, + blockNumber: null, + handlerVersionName: null, + isReplay: null, + lastIrreversibleBlockNumber: null, + liBlockNumber: null, + lpBlockHash: null, + lpBlockNumber: null, + state: null, + }; + + const spyLogError = jest.spyOn(handler._log, 'error'); + spyLogError.mockImplementation(() => {}); + + const err = 'update error'; + (dataProvider.update as jest.Mock).mockRejectedValue(err); + const blockNumber = 9898989; - await handler.callRollbackTo(blockNumber); - expect(spyLogDebug).toBeCalledTimes(1); - expect(spyLogDebug).toBeCalledWith('-- roll back to block number:', blockNumber); + await handler.callRollbackIndexState(blockNumber); + + expect(handler._log.error).toBeCalledWith('---- demux updateIndexState error:', err); + }); + }); + + describe('test handle block ', () => { + it('should handle block successfully', async () => { + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '0000270ff9e9206671a48a00061832382960054845ed4b3aca55c29776890982', + blockNumber: 9999, + previousBlockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 9990, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9990, + blockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 9998, + liBlockNumber: 9990, + lpBlockNumber: 9998, + handlerVersionName, + isReplay: false, + }; + + const context = { + info: handler.info, + }; + + (dataProvider.update as jest.Mock).mockImplementation(() => Promise.resolve({})); + (dataProvider.find as jest.Mock).mockImplementation(() => + Promise.resolve(handler.indexStateModel), + ); + + await handler.handleBlock(nextBlock, isReplay); + + expect(handler.indexStateModel.blockNumber).toEqual(nextBlock.block.blockInfo.blockNumber); + expect(handler.indexStateModel.blockHash).toEqual(nextBlock.block.blockInfo.blockHash); + expect(handler.indexStateModel.lastIrreversibleBlockNumber).toEqual( + nextBlock.lastIrreversibleBlockNumber, + ); + + expect(dataProvider.update).toBeCalledTimes(1); + }); + + it('should roll back to last irreversible block', async () => { + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '0000270ff9e9206671a48a00061832382960054845ed4b3aca55c29776890982', + blockNumber: 9999, + previousBlockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 9990, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9990, + blockHash: '000003e76755ed6228794099680e2d30ea92034a4101120e20fd45ac080e1732', + lpBlockHash: '0312597792ca276ac1b484c62c425562e3fd3a0652c307da56169e21aaa76180', + blockNumber: 9998, + liBlockNumber: 9990, + lpBlockNumber: 9998, + handlerVersionName, + isReplay: false, + }; + + (dataProvider.update as jest.Mock).mockImplementation(() => Promise.resolve({})); + (dataProvider.find as jest.Mock).mockImplementation(() => + Promise.resolve(handler.indexStateModel), + ); + + const mockRollbackToLastIrreverible = jest.spyOn( + handler, + // @ts-ignore + 'handleRollbackToLastIreversibleBlock', + ); + // @ts-ignore + mockRollbackToLastIrreverible.mockResolvedValueOnce(null); + + const nextBlockNeeded = await handler.handleBlock(nextBlock, isReplay); + + expect(nextBlockNeeded).toBe(handler.indexStateModel.lastIrreversibleBlockNumber + 1); + expect(mockRollbackToLastIrreverible).toBeCalledTimes(1); + }); + + it('should handle block normaly if lastProcessedBlockHash is empty', async () => { + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '0000270ff9e9206671a48a00061832382960054845ed4b3aca55c29776890982', + blockNumber: 9999, + previousBlockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 9990, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9990, + blockHash: '', + lpBlockHash: '', + blockNumber: 9998, + liBlockNumber: 9990, + lpBlockNumber: 9998, + handlerVersionName, + isReplay: false, + }; + + (dataProvider.update as jest.Mock).mockImplementation(() => Promise.resolve({})); + (dataProvider.find as jest.Mock).mockImplementation(() => + Promise.resolve(handler.indexStateModel), + ); + + const nextBlockNeeded = await handler.handleBlock(nextBlock, isReplay); + + expect(nextBlockNeeded).toBeNull(); + expect(handler.indexStateModel.blockNumber).toEqual(nextBlock.block.blockInfo.blockNumber); + expect(handler.indexStateModel.blockHash).toEqual(nextBlock.block.blockInfo.blockHash); + expect(handler.indexStateModel.lastIrreversibleBlockNumber).toEqual( + nextBlock.lastIrreversibleBlockNumber, + ); + + expect(dataProvider.update).toBeCalledTimes(1); + }); + + it('should handle block throw error', async () => { + const nextBlock: NextBlock = { + block: { + actions: [], + blockInfo: { + blockHash: '0000270ff9e9206671a48a00061832382960054845ed4b3aca55c29776890982', + blockNumber: 9999, + previousBlockHash: '0312588bb49a9b9c58e90bc8f2d2b393379a7af59caeedab48734d4a224f3dce', + timestamp: new Date(), + }, + }, + blockMeta: { + isEarliestBlock: false, + isNewBlock: true, + isRollback: false, + }, + lastIrreversibleBlockNumber: 9990, + }; + const isReplay = false; + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9990, + blockHash: '', + lpBlockHash: '', + blockNumber: 9998, + liBlockNumber: 9990, + lpBlockNumber: 9998, + handlerVersionName, + isReplay: false, + }; + + const errorMessage = 'can not load data'; + (dataProvider.find as jest.Mock).mockImplementationOnce(() => { + throw new Error(errorMessage); + }); + + await expect(handler.handleBlock(nextBlock, isReplay)).rejects.toThrowError(errorMessage); + }); + }); + + describe('test handle roll back to last irreversible block', () => { + it('should handle block throw error', async () => { + const handlerVersionName = 'v1'; + + const [dataAdapter, dataProvider] = createDataAdapter(); + + const handler = new TestAloxideActionHandler(bcName, dataAdapter, handlerVersions, { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName: 'DemuxIndexState_eos', + }); + + handler.indexStateModel = { + state: '', + lastIrreversibleBlockNumber: 9990, + blockHash: '', + lpBlockHash: '', + blockNumber: 9998, + liBlockNumber: 9990, + lpBlockNumber: 9998, + handlerVersionName, + isReplay: false, + }; + + // @ts-ignore + handler.lastIrreversibleBlockNumber = handler.indexStateModel.lastIrreversibleBlockNumber; + + // @ts-ignore + const mockRollbackTo = jest.spyOn(handler, 'rollbackTo'); + // @ts-ignore + mockRollbackTo.mockResolvedValueOnce(null); + + // @ts-ignore + await handler.handleRollbackToLastIreversibleBlock(); + + expect(mockRollbackTo).toBeCalledTimes(1); + expect(mockRollbackTo).toBeCalledWith(handler.indexStateModel.lastIrreversibleBlockNumber); }); }); });