diff --git a/packages/demux/src/AloxideActionHandler.ts b/packages/demux/src/AloxideActionHandler.ts index fc39b02..5a1083f 100644 --- a/packages/demux/src/AloxideActionHandler.ts +++ b/packages/demux/src/AloxideActionHandler.ts @@ -1,5 +1,5 @@ -import { EntityConfig } from '@aloxide/bridge/src'; -import { AbstractActionHandler } from 'demux'; +import { EntityConfig } from '@aloxide/bridge'; +import { AbstractActionHandler, BlockInfo, Updater, VersionedAction } from 'demux'; import { indexStateSchema } from './indexStateSchema'; @@ -13,9 +13,14 @@ import type { import type { DataAdapter } from './DataAdapter'; import type { DMeta } from './DMeta'; import type { IndexStateModel } from './IndexStateModel'; +import { VersatileUpdater } from './VersatileUpdater'; export interface AloxideActionHandlerOptions extends ActionHandlerOptions { indexStateModelName?: string; + handlers?: { + actionName: string; + handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void; + }[]; } export interface AloxideActionHandlerContext { @@ -37,6 +42,13 @@ export class AloxideActionHandler extends AbstractActionHandler { super(handlerVersions, options); if (options) { this.indexStateModelName = options.indexStateModelName; + + // add initial handlers + const handlers = Array.isArray(options.handlers) ? options.handlers : []; + + for (const { handler, actionName } of handlers) { + this.addHandler(handler, actionName); + } } } @@ -44,6 +56,96 @@ export class AloxideActionHandler extends AbstractActionHandler { return this.indexStateModelName || `DemuxIndexState_${this.bcName.replace(/\W+/, '_')}`; } + /** + * @override + * @param candidateType The incoming action's type + * @param subscribedType The type the Updater of Effect is subscribed to + * @param _payload The payload of the incoming Action. + */ + matchActionType(candidateType, subscribedType, _payload?): boolean { + if (subscribedType === '*') { + return true; + } + + return candidateType === subscribedType; + } + + /** + * @override + * @param state + * @param nextBlock + * @param context + * @param isReplay + */ + applyUpdaters( + state: any, + nextBlock: NextBlock, + context: any, + isReplay: boolean, + ): Promise { + // Add additional data to payload for further handling. + nextBlock.block.actions.forEach(action => { + action.payload.actionType = action.type; + }); + + return super.applyUpdaters(state, nextBlock, context, isReplay); + } + + /** + * Add updater to handle data + * @param updater Updater + */ + addUpdater(updater: Updater) { + // @ts-ignore + if (this.handlerVersionMap) { + // @ts-ignore + const updaters = this.handlerVersionMap[this.handlerVersionName].updaters; + updaters.push(updater); + } else { + throw new Error('"handlerVersionMap" not found'); + } + } + + /** + * Add custom handler for custom action + * @param handler hanlder function + * @param actionName action name string + */ + addHandler( + handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void, + actionName?: string, + ) { + const versatileUpdaters: VersatileUpdater[] = this.getVersatileUpdaters(); + + if (versatileUpdaters.length === 0) { + this.log.warn( + '"addHandler" is intended to use only with Versatile Updaters which can handle all types of actions (actionType = "*")', + ); + return; + } + + for (const updater of versatileUpdaters) { + updater.addHandler(handler, actionName); + } + + return true; + } + + private getVersatileUpdaters(): VersatileUpdater[] { + let updaters: any[] = []; + // @ts-ignore + if (this.handlerVersionMap) { + // @ts-ignore + updaters = this.handlerVersionMap[this.handlerVersionName].updaters; + } + + return updaters.filter( + updater => + (updater instanceof VersatileUpdater || typeof updater.addHandler === 'function') && + updater.actionType === '*', + ); + } + protected updateIndexState( state: any, nextBlock: NextBlock, diff --git a/packages/demux/src/VersatileUpdater.ts b/packages/demux/src/VersatileUpdater.ts new file mode 100644 index 0000000..ddfb8c5 --- /dev/null +++ b/packages/demux/src/VersatileUpdater.ts @@ -0,0 +1,86 @@ +import type { ActionCallback, BlockInfo, Updater } from 'demux'; +import type { Logger } from './Logger'; + +export class VersatileUpdater implements Updater { + #_handlersMap: Map = new Map(); + actionType: string = '*'; + logger?: Logger; + + constructor(options: { logger?: Logger; actionType?: string } = {}) { + if (typeof options.actionType === 'string') { + this.actionType = options.actionType; + } + this.logger = options.logger; + } + + apply: ActionCallback = ( + state: any, + payload: any, + blockInfo: BlockInfo, + context: any, + ): Promise => { + const actionName = payload.actionType; + + this.handleData(actionName, { + state, + payload, + blockInfo, + context, + }); + return Promise.resolve(); + }; + + addHandler( + handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void, + actionName?: string, + ) { + if (typeof handler !== 'function') { + throw new Error('"handler" is required and must be a function'); + } + + if (!actionName) { + actionName = this.actionType; + } + + // TODO: enhance this simple check by using regex. + if (typeof actionName !== 'string' || actionName.indexOf('::') < 1) { + throw new Error( + `"actionName" must be a string and must contain account which this action belong to. Ex: "eosio::${actionName}"`, + ); + } + + if (this.actionType !== '*' && this.actionType !== actionName) { + throw new Error(`This Updater is used to handle "${this.actionType}" action only`); + } + + const handlerMap = this.#_handlersMap; + const newSymbol = Symbol(actionName); + this[newSymbol] = handler; + + if (handlerMap.has(actionName)) { + handlerMap.get(actionName).push(newSymbol); + } else { + handlerMap.set(actionName, [newSymbol]); + } + + return true; + } + + protected async handleData(actionName: string, data: any, scope?: any): Promise { + const handlerMap = this.#_handlersMap; + const handlerSymbols: symbol[] = handlerMap.get(actionName) || []; + + if (handlerSymbols.length === 0) return; + + // Pass custom scope to prevent suspicious handler from modifying real object by . + scope = scope ? scope : {}; + const handlerCalls = []; + + // Execute all handlers + for (const symbol of handlerSymbols) { + handlerCalls.push(this[symbol].call(scope, data)); + } + + return Promise.all(handlerCalls); + } +} diff --git a/packages/demux/src/createWatcher.ts b/packages/demux/src/createWatcher.ts index 660ac57..25e0f5e 100644 --- a/packages/demux/src/createWatcher.ts +++ b/packages/demux/src/createWatcher.ts @@ -9,6 +9,7 @@ import type { ActionReader, ActionHandler, HandlerVersion, ActionWatcherOptions import type { AloxideActionHandlerOptions } from './AloxideActionHandler'; import type { Logger } from './Logger'; import type { AloxideConfig } from '@aloxide/abstraction'; +import { VersatileUpdater } from './VersatileUpdater'; export interface CreateWatcherConfig { /** @@ -27,7 +28,7 @@ export interface CreateWatcherConfig { actionWatcherOptions?: ActionWatcherOptions; } -export async function createWatcher(config: CreateWatcherConfig): Promise { +export function createWatcher(config: CreateWatcherConfig): BaseActionWatcher { const { bcName, accountName, @@ -47,7 +48,11 @@ export async function createWatcher(config: CreateWatcherConfig): Promise { }); expect(handler.getIndexStateModelName()).toEqual(indexStateModelName); }); + + it('should not add any handler when there\'s no "handlers" config found', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const addHandlerMock = jest.spyOn(AloxideActionHandler.prototype, 'addHandler'); + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVersions); + + expect(actionHandler).toBeDefined(); + expect(addHandlerMock).toBeCalledTimes(0); + }); + + it('should add handler defined in "handlers" config on creation stage', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const addHandlerMock = jest + .spyOn(AloxideActionHandler.prototype, 'addHandler') + .mockReturnValueOnce(true); + const handler = () => {}; + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVersions, { + handlers: [ + { + actionName: 'account::test_action1', + handler, + }, + { + actionName: 'account::test_action2', + handler, + }, + ], + }); + + expect(actionHandler).toBeDefined(); + expect(addHandlerMock).toBeCalledTimes(2); + expect(addHandlerMock.mock.calls[0]).toEqual([handler, 'account::test_action1']); + expect(addHandlerMock.mock.calls[1]).toEqual([handler, 'account::test_action2']); + }); + }); + + describe('matchActionType()', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVersions); + + it('should not be matched when action type and subscribed type are not equal', () => { + let res = actionHandler.matchActionType('some_action', 'subscribed_action'); + expect(res).toBe(false); + + res = actionHandler.matchActionType('some_action', 'some_other_action'); + expect(res).toBe(false); + }); + + it('should be matched when action type and subscribed type are exactly equal', () => { + const res = actionHandler.matchActionType('some_action', 'some_action'); + + expect(res).toBe(true); + }); + + it('should be matched when subscribed type are "*" (means "all")', () => { + let res = actionHandler.matchActionType('some_action', '*'); + expect(res).toBe(true); + + res = actionHandler.matchActionType('some_other_action', '*'); + expect(res).toBe(true); + }); + }); + + describe('applyUpdaters()', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + + it('should add additional data to payload before further process', () => { + const blockInfo = { + block: { + actions: [ + { + type: 'type1', + payload: { + somedata: 'test_payload', + }, + }, + { + type: 'type2', + payload: { + somedata: 'test_payload', + }, + }, + ], + blockInfo: { + blockHash: 'test_hash', + blockNumber: 123, + previousBlockHash: 'test_hash', + timestamp: new Date(), + }, + }, + blockMeta: { + isRollback: true, + isEarliestBlock: true, + isNewBlock: true, + }, + lastIrreversibleBlockNumber: 123, + }; + const applyUpdatersParentMock = jest + // @ts-ignore + .spyOn(AbstractActionHandler.prototype, 'applyUpdaters') + // @ts-ignore + .mockReturnValueOnce('test'); + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVersions); + + const res = actionHandler.applyUpdaters('test_state', blockInfo, 'context', true); + + expect(res).toBe('test'); + expect(applyUpdatersParentMock).toBeCalledTimes(1); + expect(applyUpdatersParentMock).toBeCalledWith('test_state', blockInfo, 'context', true); + }); + }); + + describe('addUpdater()', () => { + it('should throw error when cannot find parent handler version map', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVersions); + const updater = new VersatileUpdater(); + + // @ts-ignore + actionHandler.handlerVersionMap = undefined; + + expect(() => { + actionHandler.addUpdater(updater); + }).toThrowError('"handlerVersionMap" not found'); + }); + + it('should add updater', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const handlerVer: HandlerVersion[] = [new BaseHandlerVersion('v1', [], [])]; + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVer); + const updater = new VersatileUpdater(); + + // @ts-ignore + expect(actionHandler.handlerVersionMap[actionHandler.handlerVersionName].updaters).toEqual( + [], + ); + + actionHandler.addUpdater(updater); + // @ts-ignore + expect(actionHandler.handlerVersionMap[actionHandler.handlerVersionName].updaters).toEqual([ + updater, + ]); + }); + }); + + describe('addHandler()', () => { + it('should warn and not allow to add handler when cannot find any Versatile Updater', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const handlerVer: HandlerVersion[] = [new BaseHandlerVersion('v1', [], [])]; + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVer); + // @ts-ignore + const loggerMock = (actionHandler.log.warn = jest.fn()); + const handler = () => {}; + + const res = actionHandler.addHandler(handler, 'some_action'); + expect(res).toBeUndefined(); + expect(loggerMock).toBeCalledWith( + '"addHandler" is intended to use only with Versatile Updaters which can handle all types of actions (actionType = "*")', + ); + }); + + it("should allow to add handler when there's any Versatile Updater", () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const updater = new VersatileUpdater(); + updater.addHandler = jest.fn(); + const handlerVer: HandlerVersion[] = [new BaseHandlerVersion('v1', [updater], [])]; + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVer); + // @ts-ignore + const loggerMock = (actionHandler.log.warn = jest.fn()); + const handler = () => {}; + + const res = actionHandler.addHandler(handler, 'some_action'); + expect(res).toBe(true); + expect(loggerMock).toBeCalledTimes(0); + expect(updater.addHandler).toBeCalledTimes(1); + }); + + it('should call addHandler() of Updaters, which can handle all types of action, to add handlers', () => { + const adapter = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + const updater = new VersatileUpdater(); + updater.addHandler = jest.fn(); + const updater2 = new VersatileUpdater({ + actionType: 'some_action', + }); + updater2.addHandler = jest.fn(); + const handlerVer: HandlerVersion[] = [new BaseHandlerVersion('v1', [], [])]; + const actionHandler = new AloxideActionHandler(bcName, adapter, handlerVer); + // @ts-ignore + actionHandler.log.warn = jest.fn(); + const handler = () => {}; + + actionHandler.addUpdater(updater); + actionHandler.addUpdater(updater2); + + const res = actionHandler.addHandler(handler, 'some_action'); + expect(res).toBe(true); + + expect(updater.addHandler).toBeCalledTimes(1); + expect(updater.addHandler).toBeCalledWith(handler, 'some_action'); + expect(updater2.addHandler).toBeCalledTimes(0); + }); }); describe('test updateIndexState', () => { diff --git a/packages/demux/test/VersatileUpdater.test.ts b/packages/demux/test/VersatileUpdater.test.ts new file mode 100644 index 0000000..13ce979 --- /dev/null +++ b/packages/demux/test/VersatileUpdater.test.ts @@ -0,0 +1,207 @@ +import { BlockInfo } from 'demux'; +import { VersatileUpdater } from '../src/VersatileUpdater'; + +describe('test VersatileUpdater', () => { + describe('VersatileUpdater creation', () => { + it('should create VersatileUpdater with default "*" action type', () => { + const updater = new VersatileUpdater(); + + expect(updater.actionType).toBe('*'); + }); + + it('should create VersatileUpdater with specified action type and logger', () => { + const sampleLogger = {}; + const updater = new VersatileUpdater({ + actionType: 'account::action', + logger: sampleLogger, + }); + + expect(updater.logger).toBe(sampleLogger); + expect(updater.actionType).toEqual('account::action'); + }); + }); + + describe('apply()', () => { + it('should proceed data', () => { + const updater = new VersatileUpdater(); + const blockInfo: BlockInfo = { + blockHash: 'hash value', + blockNumber: 123, + previousBlockHash: 'hash value', + timestamp: new Date(), + }; + // @ts-ignore + const handleDataMock = jest.spyOn(updater, 'handleData').mockResolvedValue([]); + + updater.apply( + 'test_state', + { + actionType: 'test_action', + }, + blockInfo, + 'test_context', + ); + expect(handleDataMock).toBeCalledWith('test_action', { + state: 'test_state', + payload: { + actionType: 'test_action', + }, + blockInfo, + context: 'test_context', + }); + }); + }); + + describe('addHandler()', () => { + it('should throw error when handler is not a function', () => { + const updater = new VersatileUpdater(); + const handler = 'invalid_handler'; + + expect(() => { + // @ts-ignore + updater.addHandler(handler); + }).toThrowError('"handler" is required and must be a function'); + }); + + it('should throw error when action name is not a string', () => { + const updater = new VersatileUpdater(); + const handler = () => {}; + + expect(() => { + // @ts-ignore + updater.addHandler(handler, {}); + }).toThrowError( + '"actionName" must be a string and must contain account which this action belong to.', + ); + + expect(() => { + updater.addHandler(handler, 'test_action'); + }).toThrowError( + '"actionName" must be a string and must contain account which this action belong to.', + ); + }); + + it("should throw error when specified action name doesn't follow format which contain account name", () => { + const updater = new VersatileUpdater(); + const handler = () => {}; + + expect(() => { + updater.addHandler(handler, '::test_action'); + }).toThrowError( + '"actionName" must be a string and must contain account which this action belong to.', + ); + }); + + it('should not allow adding handler when action name is different to defined action type', () => { + const updater = new VersatileUpdater({ + actionType: 'test_type', + }); + const handler = () => {}; + const actionName = 'account::test_action'; + + expect(() => { + updater.addHandler(handler, actionName); + }).toThrowError(`This Updater is used to handle "test_type" action only`); + }); + + it('should add handler to handle initialised action type by default', async () => { + const updater = new VersatileUpdater({ + actionType: 'account::test_action', + }); + const handler = jest.fn(); + const data = 'test_data'; + + // Add handler without passing action name, it should add handle to handle `account::test_action` by default + updater.addHandler(handler); + + // @ts-ignore + await updater.handleData('account::test_action', data); + + expect(handler).toBeCalledTimes(1); + }); + + it('should allow to add handlers when action type is "*"', () => { + const updater = new VersatileUpdater({ + actionType: '*', + }); + const handler = () => {}; + const actionName = 'account::test_action'; + + expect(updater.addHandler(handler, actionName)).toBe(true); + }); + }); + + describe('handleData()', () => { + it("should not call any handlers if there's no action matched", async () => { + const updater = new VersatileUpdater({ + actionType: '*', + }); + const handler = jest.fn(); + const handler2 = jest.fn(); + const data = 'test_data'; + + // Add handlers first + updater.addHandler(handler, 'account::test_action'); + updater.addHandler(handler2, 'account::test_action'); + + // @ts-ignore + await updater.handleData('account::non-matched-action', data); + + expect(handler).toBeCalledTimes(0); + expect(handler2).toBeCalledTimes(0); + }); + + it('should call all handlers', async () => { + const updater = new VersatileUpdater({ + actionType: '*', + }); + const handler = jest.fn(); + const handler2 = jest.fn(); + const actionName = 'account::test_action'; + const data = 'test_data'; + + // Add handlers first + updater.addHandler(handler, actionName); + updater.addHandler(handler2, actionName); + + // @ts-ignore + await updater.handleData(actionName, data); + + expect(handler).toBeCalledWith(data); + expect(handler2).toBeCalledWith(data); + }); + + it('should call handler with custom scope', async () => { + const updater = new VersatileUpdater({ + actionType: '*', + }); + const scope = { sample_scope: 'sample_scope' }; + const handler = jest.fn(function () { + // test scope + expect(this).toBe(scope); + // @ts-ignore + expect(this.sample_scope).toBe('sample_scope'); + }); + + const handler2 = jest.fn(function () { + // test scope + expect(this).toBe(scope); + // @ts-ignore + expect(this.sample_scope).toBe('sample_scope'); + }); + const actionName = 'account::test_action'; + const data = 'test_data'; + + // Add handlers first + updater.addHandler(handler, actionName); + updater.addHandler(handler2, actionName); + + // @ts-ignore + const res = await updater.handleData(actionName, data, scope); + + expect(handler).toBeCalledWith(data); + expect(handler2).toBeCalledWith(data); + expect.assertions(6); + }); + }); +}); diff --git a/packages/demux/test/createWatcher.test.ts b/packages/demux/test/createWatcher.test.ts index 6ade031..026b4df 100644 --- a/packages/demux/test/createWatcher.test.ts +++ b/packages/demux/test/createWatcher.test.ts @@ -33,10 +33,10 @@ describe('test createWatcher', () => { logger: createLoggerTest(), }; - it('throw error if missing of data provider', async () => { - expect(() => createWatcher(config)).rejects.toThrow( - 'Missing data provider name: DemuxIndexState_any', - ); + it('throw error if missing of data provider', () => { + expect(() => { + createWatcher(config); + }).toThrow('Missing data provider name: DemuxIndexState_any'); }); const dataAdapter = new AloxideDataManager({ @@ -63,7 +63,7 @@ describe('test createWatcher', () => { find: jest.fn(), }); - it('return an watcher', async () => { + it('return an watcher', () => { config.dataAdapter = dataAdapter; config.aloxideConfig.entities.push({ name: 'e1', @@ -76,11 +76,11 @@ describe('test createWatcher', () => { key: 'f1', }); - const watcher = await createWatcher(config); + const watcher = createWatcher(config); expect(watcher).toBeInstanceOf(BaseActionWatcher); }); - it('return an watcher', async () => { + it('return an watcher', () => { const logger = createLoggerTest(); config.dataAdapter = dataAdapter; @@ -92,7 +92,7 @@ describe('test createWatcher', () => { ), ]; - const watcher = await createWatcher(config); + const watcher = createWatcher(config); expect(watcher).toBeInstanceOf(BaseActionWatcher); config.actionHandler = { @@ -105,11 +105,11 @@ describe('test createWatcher', () => { }, initialize: jest.fn(), }; - const watcher2 = await createWatcher(config); + const watcher2 = createWatcher(config); expect(watcher2).toBeInstanceOf(BaseActionWatcher); }); - it('return an watcher', async () => { + it('return an watcher', () => { config.dataAdapter = dataAdapter; config.actionHandler = { @@ -123,7 +123,7 @@ describe('test createWatcher', () => { initialize: jest.fn(), }; - const watcher = await createWatcher(config); + const watcher = createWatcher(config); expect(watcher).toBeInstanceOf(BaseActionWatcher); }); }); diff --git a/packages/example-api-gateway/src/loadEnv.ts b/packages/example-api-gateway/src/loadEnv.ts index d1393d4..5bc862d 100644 --- a/packages/example-api-gateway/src/loadEnv.ts +++ b/packages/example-api-gateway/src/loadEnv.ts @@ -1,5 +1,6 @@ import Logger from 'bunyan'; import dote from 'dotenv-extended'; +import path from 'path'; declare global { var logger: Logger; @@ -14,8 +15,8 @@ declare global { encoding: 'utf8', silent: true, path: '.env', - defaults: '.env.defaults', - schema: '.env.defaults', + defaults: path.resolve(__dirname, '../.env.defaults'), + schema: path.resolve(__dirname, '../.env.defaults'), errorOnMissing: true, errorOnExtra: true, errorOnRegex: false, diff --git a/packages/example-api-gateway/src/readBlockchain.ts b/packages/example-api-gateway/src/readBlockchain.ts index 9457869..71a1414 100644 --- a/packages/example-api-gateway/src/readBlockchain.ts +++ b/packages/example-api-gateway/src/readBlockchain.ts @@ -1,108 +1,108 @@ -import { readAloxideConfig } from '@aloxide/abstraction'; -import { AloxideDataManager, createWatcher } from '@aloxide/demux'; -import { IconActionReader } from '@aloxide/demux-icon'; -import { NodeosActionReader } from 'demux-eos'; - -import config, { connectDb } from './config'; -import { createDataProvider } from './models'; - -const aloxideConfig = readAloxideConfig(config.aloxideConfigPath); -const dataAdapter: AloxideDataManager = new AloxideDataManager({ - dataProviderMap: new Map(), -}); - -if (process.env.app_enable_eos == 'true') { - logger.info('EOS enabled'); - const sequelize = config.sequelize; - - const indexStateModelName = 'is_eos'; - - dataAdapter.dataProviderMap.set( - indexStateModelName, - createDataProvider(sequelize, indexStateModelName, indexStateModelName, true), - ); - - dataAdapter.dataProviderMap.set('Poll', createDataProvider(sequelize, 'Poll', 'Poll')); - dataAdapter.dataProviderMap.set('Vote', createDataProvider(sequelize, 'Vote', 'Vote')); - - // watch EOS - createWatcher({ - bcName: 'eos', - accountName: process.env.app_d_eos_account_name, - aloxideConfig, - dataAdapter, - logger: config.logger, - actionReader: new NodeosActionReader({ - nodeosEndpoint: process.env.app_nodeosEndpoint, - onlyIrreversible: false, - startAtBlock: parseInt(process.env.app_startAtBlock, 10), - }), - actionWatcherOptions: { - pollInterval: 5000, - logLevel: 'info', - logSource: 'sync-EOS', - }, - actionHandlerOptions: { - logLevel: 'info', - logSource: 'handler-EOS', - indexStateModelName, - }, - }) - .then(actionWatcher => { - actionWatcher.start(); - }) - .catch(err => { - logger?.error('---- createWatcher error:', err); - }); -} - -if (process.env.app_enable_icon == 'true') { - logger.info('ICON enabled'); - const sequelize = connectDb(process.env.app_postgres_db_icon); - const indexStateModelName = 'is_icon'; - - dataAdapter.dataProviderMap.set( - indexStateModelName, - createDataProvider(sequelize, indexStateModelName, indexStateModelName, true), - ); - - dataAdapter.dataProviderMap.set('Poll', createDataProvider(sequelize, 'Poll', 'Poll')); - dataAdapter.dataProviderMap.set('Vote', createDataProvider(sequelize, 'Vote', 'Vote')); - - // watch ICON - createWatcher({ - bcName: 'icon', - accountName: 'cxd1c341cba5d21f5c1ea36bade8369270a2fe065c', - aloxideConfig, - dataAdapter, - logger: config.logger, - actionReader: new IconActionReader({ - endpoint: process.env.app_icon_endpoint, - nid: parseInt(process.env.app_icon_nid, 10), - logLevel: 'info', - logSource: 'reader-ICON', - /** - * https://bicon.tracker.solidwallet.io/block/6536668 - */ - startAtBlock: parseInt(process.env.app_icon_startAtBlock, 10), - numRetries: 5, - waitTimeMs: 1000, - }), - actionWatcherOptions: { - pollInterval: 2000, - logLevel: 'info', - logSource: 'sync-ICON', - }, - actionHandlerOptions: { - logLevel: 'info', - logSource: 'handler-ICON', - indexStateModelName, - }, - }) - .then(actionWatcher => { - actionWatcher.start(); - }) - .catch(err => { - logger.error('---- createWatcher error:', err); - }); -} +import { readAloxideConfig } from '@aloxide/abstraction'; +import { AloxideDataManager, createWatcher } from '@aloxide/demux'; +import { IconActionReader } from '@aloxide/demux-icon'; +import { NodeosActionReader } from 'demux-eos'; + +import config, { connectDb } from './config'; +import { createDataProvider } from './models'; + +(async () => { + const aloxideConfig = readAloxideConfig(config.aloxideConfigPath); + const dataAdapter: AloxideDataManager = new AloxideDataManager({ + dataProviderMap: new Map(), + }); + + const watcherTasks = []; + + if (process.env.app_enable_eos == 'true') { + logger?.info('EOS enabled'); + const sequelize = config.sequelize; + + const indexStateModelName = 'is_eos'; + + dataAdapter.dataProviderMap.set( + indexStateModelName, + createDataProvider(sequelize, indexStateModelName, indexStateModelName, true), + ); + + dataAdapter.dataProviderMap.set('Poll', createDataProvider(sequelize, 'Poll', 'Poll')); + dataAdapter.dataProviderMap.set('Vote', createDataProvider(sequelize, 'Vote', 'Vote')); + + // watch EOS + const eosActionWatcher = createWatcher({ + bcName: 'eos', + accountName: process.env.app_d_eos_account_name, + aloxideConfig, + dataAdapter, + logger: config.logger, + actionReader: new NodeosActionReader({ + nodeosEndpoint: process.env.app_nodeosEndpoint, + onlyIrreversible: false, + startAtBlock: parseInt(process.env.app_startAtBlock, 10), + }), + actionWatcherOptions: { + pollInterval: 5000, + logLevel: 'info', + logSource: 'sync-EOS', + }, + actionHandlerOptions: { + logLevel: 'info', + logSource: 'handler-EOS', + indexStateModelName, + }, + }); + + watcherTasks.push(eosActionWatcher.start()); + } + + if (process.env.app_enable_icon == 'true') { + logger?.info('ICON enabled'); + const sequelize = connectDb(process.env.app_postgres_db_icon); + const indexStateModelName = 'is_icon'; + + dataAdapter.dataProviderMap.set( + indexStateModelName, + createDataProvider(sequelize, indexStateModelName, indexStateModelName, true), + ); + + dataAdapter.dataProviderMap.set('Poll', createDataProvider(sequelize, 'Poll', 'Poll')); + dataAdapter.dataProviderMap.set('Vote', createDataProvider(sequelize, 'Vote', 'Vote')); + + // watch ICON + const iconActionWatcher = createWatcher({ + bcName: 'icon', + accountName: 'cxd1c341cba5d21f5c1ea36bade8369270a2fe065c', + aloxideConfig, + dataAdapter, + logger: config.logger, + actionReader: new IconActionReader({ + endpoint: process.env.app_icon_endpoint, + nid: parseInt(process.env.app_icon_nid, 10), + logLevel: 'info', + logSource: 'reader-ICON', + /** + * https://bicon.tracker.solidwallet.io/block/6536668 + */ + startAtBlock: parseInt(process.env.app_icon_startAtBlock, 10), + numRetries: 5, + waitTimeMs: 1000, + }), + actionWatcherOptions: { + pollInterval: 2000, + logLevel: 'info', + logSource: 'sync-ICON', + }, + actionHandlerOptions: { + logLevel: 'info', + logSource: 'handler-ICON', + indexStateModelName, + }, + }); + + watcherTasks.push(iconActionWatcher.start()); + + return Promise.all(watcherTasks); + } +})().catch(err => { + logger?.error('---- createWatcher error: ', err); +}); diff --git a/packages/example-demux/aloxide.yml b/packages/example-demux/aloxide.yml index a7f17bc..a8162f3 100644 --- a/packages/example-demux/aloxide.yml +++ b/packages/example-demux/aloxide.yml @@ -1,7 +1,7 @@ entities: - name: Poll fields: - - name: pollId + - name: id type: uint64_t - name: name type: string @@ -11,19 +11,19 @@ entities: type: uint64_t - name: end type: uint64_t - key: pollId + key: id - name: Option fields: - - name: optionId + - name: id type: uint64_t - name: name type: string - name: pollId type: uint64_t - key: optionId + key: id - name: Vote fields: - - name: voteId + - name: id type: uint64_t - name: point type: number @@ -33,4 +33,4 @@ entities: type: uint64_t - name: optionId type: uint64_t - key: voteId + key: id diff --git a/packages/example-demux/src/index.js b/packages/example-demux/src/index.js index 453c41f..fa2987b 100644 --- a/packages/example-demux/src/index.js +++ b/packages/example-demux/src/index.js @@ -7,6 +7,7 @@ const Sequelize = require('sequelize'); const { IconActionReader } = require('@aloxide/demux-icon'); const { createDynamoDbConnection } = require('./dynamodb'); const { createMongoDbConnection } = require('./mongodb'); +const { NodeosActionReader } = require('demux-eos'); global.logger = Logger.createLogger({ name: 'example-demux', @@ -14,8 +15,9 @@ global.logger = Logger.createLogger({ src: false, }); -const aloxideConfig = readAloxideConfig(path.resolve('./aloxide.yml'), logger); +const aloxideConfig = readAloxideConfig(path.resolve(__dirname, '../aloxide.yml'), logger); +// indexStateSchema.name = 'DemuxIndexState_eos'; indexStateSchema.name = 'DemuxIndexState_ICON'; aloxideConfig.entities.push(indexStateSchema); @@ -31,9 +33,9 @@ const modelBuilder = new ModelBuilder({ */ const db = [ { dbType: 'postgres', enable: true }, - { dbType: 'mysql', enable: true }, - { dbType: 'dynamo', enable: true }, - { dbType: 'mongo', enable: true }, + { dbType: 'mysql', enable: false }, + { dbType: 'dynamo', enable: false }, + { dbType: 'mongo', enable: false }, { dbType: 'memory', enable: false }, ]; // TODO add more database type @@ -96,90 +98,120 @@ Promise.all( }), ); }), -).then(() => { - const dataProviders = modelNames.map(name => { - const isIndexState = name == indexStateSchema.name; - - return { - name, - setup() { - return Promise.resolve(); - }, - - count() { - return getModel(defaultOrm.models, name).count(); - }, - - findAll() { - return getModel(defaultOrm.models, name).findAll(); - }, - - find(id) { - return getModel(defaultOrm.models, name).findByPk(id, { raw: true }); - }, - - create(data) { - return Promise.all( - dbModels.map(({ models }) => { - const model = getModel(models, name); - return model.create(data); - }), - ).then(() => data); - }, - - update(data, meta) { - const key = meta.entity.key; - - return Promise.all( - dbModels.map(({ models }) => { - const model = getModel(models, name); - return model.update(data, { - where: { - [key]: data[key], - }, - logging: !isIndexState, - }); - }), - ).then(() => data); - }, - - delete(id) { - return Promise.all( - dbModels.map(({ models }) => { - const model = getModel(models, name); - return model.destroy(id); - }), - ).then(() => true); - }, +) + .then(() => { + const dataProviders = modelNames.map(name => { + const isIndexState = name == indexStateSchema.name; + + // Return DataProvider + return { + name, + setup() { + return Promise.resolve(); + }, + + count() { + return getModel(defaultOrm.models, name).count(); + }, + + findAll() { + return getModel(defaultOrm.models, name).findAll(); + }, + + find(id) { + return getModel(defaultOrm.models, name).findByPk(id, { raw: true }); + }, + + create(data) { + return Promise.all( + dbModels.map(({ models }) => { + const model = getModel(models, name); + return model.create(data); + }), + ).then(() => data); + }, + + update(data, meta) { + const key = meta.entity.key; + + return Promise.all( + dbModels.map(({ models }) => { + const model = getModel(models, name); + return model.update(data, { + where: { + [key]: data[key], + }, + logging: !isIndexState, + }); + }), + ).then(() => data); + }, + + delete(id) { + return Promise.all( + dbModels.map(({ models }) => { + const model = getModel(models, name); + return model.destroy(id); + }), + ).then(() => true); + }, + }; + }); + + /** + * required data provider + * Poll, Option, Vote, DemuxIndexState_ICON + */ + const dataAdapter = new AloxideDataManager({ + dataProviderMap: new Map(dataProviders.map(d => [d.name, d])), + }); + + // // Eos config + // const accountName = 'helloworld12'; + // const watcherConfig = { + // bcName: 'eos', + // actionReader: new NodeosActionReader({ + // nodeosEndpoint: 'https://testnet.canfoundation.io', + // onlyIrreversible: false, + // startAtBlock: parseInt('9130005', 10), + // }), + // }; + + // Icon config + const accountName = 'cxbc1b71bb40ef97c682114e10981169db23138327'; + const watcherConfig = { + bcName: 'ICON', + actionReader: new IconActionReader({ + endpoint: 'https://bicon.net.solidwallet.io/api/v3', + nid: 3, + logLevel: 'debug', + logSource: 'reader-ICON', + startAtBlock: 7563483, + numRetries: 5, + waitTimeMs: 2000, + }), }; - }); - /** - * required data provider - * Poll, Option, Vote, DemuxIndexState_ICON - */ - const dataAdapter = new AloxideDataManager({ - dataProviderMap: new Map(), - }); + const watcher = createWatcher({ + accountName, + aloxideConfig, + dataAdapter, + logger, + actionHandlerOptions: { + handlers: [ + { + actionName: `${accountName}::crepoll`, + handler: data => { + console.log(JSON.stringify(data)); + }, + }, + ], + }, + ...watcherConfig, + }); - dataProviders.forEach(d => dataAdapter.dataProviderMap.set(d.name, d)); - - createWatcher({ - bcName: 'ICON', - accountName: 'cxbc1b71bb40ef97c682114e10981169db23138327', - aloxideConfig, - actionReader: new IconActionReader({ - endpoint: 'https://bicon.net.solidwallet.io/api/v3', - nid: 3, - logLevel: 'debug', - logSource: 'reader-ICON', - startAtBlock: 7563483, - numRetries: 5, - waitTimeMs: 2000, - }), - dataAdapter, - logger, - }).then(watcher => { return watcher.start(); + }) + .catch(e => { + logger?.error('Demux watcher failed to start: ' + e.message); }); -}); diff --git a/packages/example-demux/src/mongodb.js b/packages/example-demux/src/mongodb.js index 77ad4cc..0b278ee 100644 --- a/packages/example-demux/src/mongodb.js +++ b/packages/example-demux/src/mongodb.js @@ -13,16 +13,7 @@ const client = new MongoClient(url); let db; // Use connect method to connect to the Server -const pConnectDb = new Promise((resolve, reject) => - client.connect(function (err) { - if (err) { - return reject(err); - } - - db = client.db('aloxide'); - resolve(db); - }), -); +let pConnectDb; function createModel(entity, mongoDb) { let coll; @@ -84,6 +75,17 @@ function createModel(entity, mongoDb) { } function createMongoDbConnection(entities) { + pConnectDb = new Promise((resolve, reject) => + client.connect(function(err) { + if (err) { + return reject(err); + } + + db = client.db('aloxide'); + resolve(db); + }), + ); + function authenticate() { return pConnectDb; } @@ -105,7 +107,7 @@ function createMongoDbConnection(entities) { }; } -process.on('unhandledRejection', function (reason, p) { +process.on('unhandledRejection', function(reason, p) { logger.error('Unhandled', reason, p); // log all your errors, "unsuppressing" them. client.close(); process.exit(1);