Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions packages/demux/src/AloxideActionHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EntityConfig } from '@aloxide/bridge/src';
import { AbstractActionHandler } from 'demux';
import { EntityConfig } from '@aloxide/bridge';
import { AbstractActionHandler, BlockInfo, Updater, VersionedAction } from 'demux';

import { indexStateSchema } from './indexStateSchema';

Expand All @@ -13,9 +13,14 @@ import type {
import type { DataAdapter } from './DataAdapter';
import type { DMeta } from './DMeta';
import type { IndexStateModel } from './IndexStateModel';
import { VersatileUpdater } from './VersatileUpdater';

export interface AloxideActionHandlerOptions extends ActionHandlerOptions {
indexStateModelName?: string;
handlers?: {
actionName: string;
handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void;
}[];
}

export interface AloxideActionHandlerContext {
Expand All @@ -37,13 +42,110 @@ export class AloxideActionHandler extends AbstractActionHandler {
super(handlerVersions, options);
if (options) {
this.indexStateModelName = options.indexStateModelName;

// add initial handlers
const handlers = Array.isArray(options.handlers) ? options.handlers : [];

for (const { handler, actionName } of handlers) {
this.addHandler(handler, actionName);
}
}
}

getIndexStateModelName() {
return this.indexStateModelName || `DemuxIndexState_${this.bcName.replace(/\W+/, '_')}`;
}

/**
* @override
* @param candidateType The incoming action's type
* @param subscribedType The type the Updater of Effect is subscribed to
* @param _payload The payload of the incoming Action.
*/
matchActionType(candidateType, subscribedType, _payload?): boolean {
if (subscribedType === '*') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to allow user to subscribe to action of specific contract account. Somethings like eosio::* or governance::*

return true;
}

return candidateType === subscribedType;
}

/**
* @override
* @param state
* @param nextBlock
* @param context
* @param isReplay
*/
applyUpdaters(
state: any,
nextBlock: NextBlock,
context: any,
isReplay: boolean,
): Promise<VersionedAction[]> {
// Add additional data to payload for further handling.
nextBlock.block.actions.forEach(action => {
action.payload.actionType = action.type;
});

return super.applyUpdaters(state, nextBlock, context, isReplay);
}

/**
* Add updater to handle data
* @param updater Updater
*/
addUpdater(updater: Updater) {
// @ts-ignore
if (this.handlerVersionMap) {
// @ts-ignore
const updaters = this.handlerVersionMap[this.handlerVersionName].updaters;
updaters.push(updater);
} else {
throw new Error('"handlerVersionMap" not found');
}
}

/**
* Add custom handler for custom action
* @param handler hanlder function
* @param actionName action name string
*/
addHandler(
handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void,
actionName?: string,
) {
const versatileUpdaters: VersatileUpdater[] = this.getVersatileUpdaters();

if (versatileUpdaters.length === 0) {
this.log.warn(
'"addHandler" is intended to use only with Versatile Updaters which can handle all types of actions (actionType = "*")',
);
return;
}

for (const updater of versatileUpdaters) {
updater.addHandler(handler, actionName);
}

return true;
}

private getVersatileUpdaters(): VersatileUpdater[] {
let updaters: any[] = [];
// @ts-ignore
if (this.handlerVersionMap) {
// @ts-ignore
updaters = this.handlerVersionMap[this.handlerVersionName].updaters;
}

return updaters.filter(
updater =>
(updater instanceof VersatileUpdater || typeof updater.addHandler === 'function') &&
updater.actionType === '*',
);
}

protected updateIndexState(
state: any,
nextBlock: NextBlock,
Expand Down
86 changes: 86 additions & 0 deletions packages/demux/src/VersatileUpdater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import type { ActionCallback, BlockInfo, Updater } from 'demux';
import type { Logger } from './Logger';

export class VersatileUpdater implements Updater {
#_handlersMap: Map<string, symbol[]> = new Map();
actionType: string = '*';
logger?: Logger;

constructor(options: { logger?: Logger; actionType?: string } = {}) {
if (typeof options.actionType === 'string') {
this.actionType = options.actionType;
}
this.logger = options.logger;
}

apply: ActionCallback = (
state: any,
payload: any,
blockInfo: BlockInfo,
context: any,
): Promise<void> => {
const actionName = payload.actionType;

this.handleData(actionName, {
state,
payload,
blockInfo,
context,
});
return Promise.resolve();
};

addHandler(
handler: (data: { state: any; payload: any; blockInfo: BlockInfo; context: any }) => void,
actionName?: string,
) {
if (typeof handler !== 'function') {
throw new Error('"handler" is required and must be a function');
}

if (!actionName) {
actionName = this.actionType;
}

// TODO: enhance this simple check by using regex.
if (typeof actionName !== 'string' || actionName.indexOf('::') < 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I input action name *, I will get error here!.

throw new Error(
`"actionName" must be a string and must contain account which this action belong to. Ex: "eosio::${actionName}"`,
);
}

if (this.actionType !== '*' && this.actionType !== actionName) {
throw new Error(`This Updater is used to handle "${this.actionType}" action only`);
}

const handlerMap = this.#_handlersMap;
const newSymbol = Symbol(actionName);
this[newSymbol] = handler;

if (handlerMap.has(actionName)) {
handlerMap.get(actionName).push(newSymbol);
} else {
handlerMap.set(actionName, [newSymbol]);
}

return true;
}

protected async handleData(actionName: string, data: any, scope?: any): Promise<any> {
const handlerMap = this.#_handlersMap;
const handlerSymbols: symbol[] = handlerMap.get(actionName) || [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of I subscribe handler to action *, handler will never be executed.


if (handlerSymbols.length === 0) return;

// Pass custom scope to prevent suspicious handler from modifying real object by .
scope = scope ? scope : {};
const handlerCalls = [];

// Execute all handlers
for (const symbol of handlerSymbols) {
handlerCalls.push(this[symbol].call(scope, data));
}

return Promise.all(handlerCalls);
}
}
9 changes: 7 additions & 2 deletions packages/demux/src/createWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { ActionReader, ActionHandler, HandlerVersion, ActionWatcherOptions
import type { AloxideActionHandlerOptions } from './AloxideActionHandler';
import type { Logger } from './Logger';
import type { AloxideConfig } from '@aloxide/abstraction';
import { VersatileUpdater } from './VersatileUpdater';

export interface CreateWatcherConfig {
/**
Expand All @@ -27,7 +28,7 @@ export interface CreateWatcherConfig {
actionWatcherOptions?: ActionWatcherOptions;
}

export async function createWatcher(config: CreateWatcherConfig): Promise<BaseActionWatcher> {
export function createWatcher(config: CreateWatcherConfig): BaseActionWatcher {
const {
bcName,
accountName,
Expand All @@ -47,7 +48,11 @@ export async function createWatcher(config: CreateWatcherConfig): Promise<BaseAc
handlerVersions = [
new BaseHandlerVersion(
versionName,
createDbUpdater(accountName, dataAdapter, aloxideConfig.entities, logger),
[
...createDbUpdater(accountName, dataAdapter, aloxideConfig.entities, logger),
// Versatile Updater is used to handle all actions by default
new VersatileUpdater(),
],
[],
),
];
Expand Down
Loading