diff --git a/package-lock.json b/package-lock.json index cdab79f..c95646d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -538,7 +538,7 @@ }, "node_modules/@concero/operator-utils": { "version": "1.0.0", - "resolved": "git+ssh://git@github.com/concero/operator-utils.git#556608544deb191a7dbfeded7f6df658f01fdbc3", + "resolved": "git+ssh://git@github.com/concero/operator-utils.git#8b476a8239f14d3aa8a5bae7a1c7a17e75bd229b", "license": "ISC", "dependencies": { "@slack/web-api": "^7.14.1", diff --git a/src/relayer/api/api.controller.ts b/src/relayer/api/api.controller.ts index b50aac6..89399ec 100644 --- a/src/relayer/api/api.controller.ts +++ b/src/relayer/api/api.controller.ts @@ -30,7 +30,7 @@ export class ApiController extends ContextProvider { async managementApi => { managementApi.addHook('preHandler', checkManagementAccess); managementApi.get('/jobs', handleGetJobsList); - managementApi.get('/jobs/retry', (req, res) => + managementApi.post('/jobs/retry', (req, res) => handleRetryJob(req, res, refetchLog), ); managementApi.get('/chains', handleGetChainsList); diff --git a/src/relayer/validator/submit-queue.processor.ts b/src/relayer/validator/submit-queue.processor.ts index c0963f3..58ca5ea 100644 --- a/src/relayer/validator/submit-queue.processor.ts +++ b/src/relayer/validator/submit-queue.processor.ts @@ -8,49 +8,69 @@ import { relayerLibGasLimits } from '../../constants/relayerLibGasLimits'; import { CRE, JobPayload, JobStatus } from '../../types'; import { MessagingCodec } from '../../utils'; import { Context, ValidatorType } from '../types'; - import decodeInternalValidatorConfig = MessagingCodec.decodeInternalValidatorConfig; +const MAX_PROMISES_SLOTS = 30; + export class SubmitQueueProcessor extends BaseValidatorService { - private isProcessing = false; + private usedSlots: number = 0; + private isPumping = false; constructor(context: Context) { super('SubmitQueueProcessor', context); } async pump(options: { maxTxPerPump: number; maxTxPerChain: number }) { - if (this.isProcessing) { + if (this.isPumping) { return; } + + this.isPumping = true; + const availableSlots = MAX_PROMISES_SLOTS - this.usedSlots; + if (availableSlots <= 0) { + return; + } + const startTimestamp = Date.now(); try { - this.isProcessing = true; - // @todo: add custom profiling decorator + const jobs = await this.context.dbClient.$transaction(async client => { + await client.job.updateMany({ + where: { + status: JobStatus.ProcessingSubmit, + updatedAt: { lt: new Date(Date.now() - 5 * 60 * 1000) }, + }, + data: { + status: JobStatus.PendingSubmit, + submitPlannedTo: new Date(), + submitAttempts: 0, + }, + }); - const jobs = await this.context.dbClient.$queryRaw` + return client.$queryRaw` WITH ranked AS ( SELECT j.id, + j."dstChainSelector", ROW_NUMBER() OVER (PARTITION BY j."dstChainSelector" ORDER BY j."lastSubmitAt" ASC) AS rn FROM job j WHERE j.status = ${JobStatus.PendingSubmit} AND j."callbacksCount" = ${requiredCallbacksCount} - AND ( - j."submitPlannedTo" < now() - OR j."submitPlannedTo" IS NULL - ) + AND (j."submitPlannedTo" < now() OR j."submitPlannedTo" IS NULL) ) - SELECT j.* - FROM job j - JOIN ( - SELECT id - FROM ranked - WHERE rn <= ${options.maxTxPerChain} + UPDATE job j + SET status = ${JobStatus.ProcessingSubmit} + FROM ( + SELECT id + FROM ranked + WHERE rn <= ${options.maxTxPerChain} -- per-chain limit ORDER BY rn - LIMIT ${options.maxTxPerPump} - ) capped ON j.id = capped.id; - `; + LIMIT ${Math.max(0, Math.min(availableSlots, options.maxTxPerPump))} -- global + tick limit + ) capped + WHERE j.id = capped.id + RETURNING j.*; + `; + }); this.logger.info(`pump jobs.length=${jobs.length}`); if (!jobs.length) { @@ -58,96 +78,78 @@ export class SubmitQueueProcessor extends BaseValidatorService { } // @todo: support bulkWrite in TxWriter to support batches by one chain & move to batches - const promises = jobs.map(async job => { - const jobPayload = JSON.parse(job.payload) as JobPayload; - - try { - this.logger.info( - `Submitting message ${job.messageId} to chain ${job.dstChainSelector}`, - ); - - const creCallback = await this.context.dbClient.creCallback.findFirst({ - where: { messageId: job.messageId }, - }); - if (!creCallback) { - throw new Error( - `Cannot find creCallback for message with id ${job.messageId}`, - ); - } - const creResponse = JSON.parse(creCallback.payload) as CRE.Response; - - const validations = this.extractJobValidations( - job.validatorType as ValidatorType, - job.messageId as Hex, - creResponse, - ); - const validatorLibs = this.extractJobValidatorLibs( - job.validatorType as ValidatorType, - job.dstChainSelector, - ); + this.usedSlots += jobs.length; + const promises = jobs.map(async job => this.processJob(job)); + await Promise.all(promises); + this.usedSlots -= jobs.length; + } catch (e) { + this.logger.info(`pump failed: ${e}`); + } finally { + this.isPumping = false; + this.logger.info(`pump took: ${(Date.now() - startTimestamp) / 1000}s`); + } + } - const dst = await this.submitMessage( - job.dstChainSelector, - jobPayload.data.messageReceipt, - validations, - validatorLibs, - ); + private async processJob(job: Job): Promise { + try { + this.logger.info( + `pump Job (id=${job.id},messageId=${job.messageId}) processing to Chain (selector=${job.dstChainSelector})`, + ); - this.logger.info( - `pump SubmitDst for (messageId=${job.messageId}, jobId=${job.id}) txHash=${dst.hash}, blockNumber=${dst.blockNumber}`, - ); + const jobPayload = JSON.parse(job.payload) as JobPayload; - return { jobId: job.id, type: 'success', dst }; - } catch (e) { - this.logger.error( - `pump Message (id=${job.messageId}, jobId=${job.id}) submit failed: ${e}`, - ); - return { jobId: job.id, type: 'failed', attempts: job.submitAttempts }; - } + const creCallback = await this.context.dbClient.creCallback.findFirst({ + where: { messageId: job.messageId }, }); - - const results = await Promise.all(promises); - const successResults = results.filter(i => i.type === 'success'); - const failedResults = results.filter(i => i.type === 'failed'); - - await this.context.dbClient.$transaction(async client => { - const successPromises = successResults.map(i => { - this.logger.info( - `pump Submit succeeded jobId=${i.jobId}, attempt=${(i?.attempts || 0) + 1}, txHash=${i?.dst?.blockNumber}`, - ); - return client.job.update({ - where: { id: i.jobId }, - data: { - status: JobStatus.WaitingDstFinality, - submitAttempts: 0, - submitPlannedTo: null, - lastSubmitAt: new Date(), - dstBlockNumber: String(i?.dst?.blockNumber), - dstTxHash: String(i?.dst?.hash), - }, - }); - }); - const failedPromises = failedResults.map(i => { - const submitPlannedTo = this.calculateNextPlannedTo((i.attempts as number) + 1); - this.logger.warn( - `Submit failed jobId=${i.jobId}, attempt=${(i?.attempts || 0) + 1}, nextTryIn=${Math.round((submitPlannedTo.getTime() - Date.now()) / 1000)}s`, - ); - return client.job.update({ - where: { id: i.jobId }, - data: { - submitPlannedTo, - submitAttempts: { increment: 1 }, - }, - }); - }); - const totalPromises = successPromises.concat(failedPromises); - await Promise.all(totalPromises); + if (!creCallback) { + throw new Error(`CreCallback not found by messageId="${job.messageId}"`); + } + const creResponse = JSON.parse(creCallback.payload) as CRE.Response; + + const validations = this.extractJobValidations( + job.validatorType as ValidatorType, + job.messageId as Hex, + creResponse, + ); + const validatorLibs = this.extractJobValidatorLibs( + job.validatorType as ValidatorType, + job.dstChainSelector, + ); + + const dstReceipt = await this.submitMessage( + job.dstChainSelector, + jobPayload.data.messageReceipt, + validations, + validatorLibs, + ); + this.logger.info( + `pump Job (id=${job.id},messageId=${job.messageId}) succeeded: Receipt (txHash=${dstReceipt.hash},block=${String(dstReceipt.blockNumber)})`, + ); + + await this.context.dbClient.job.update({ + where: { id: job.id }, + data: { + status: JobStatus.WaitingDstFinality, + submitAttempts: 0, + submitPlannedTo: null, + lastSubmitAt: new Date(), + dstBlockNumber: String(dstReceipt?.blockNumber), + dstTxHash: dstReceipt.hash, + }, }); } catch (e) { - this.logger.info(`pump failed: ${e}`); - } finally { - this.logger.info(`pump took: ${(Date.now() - startTimestamp) / 1000}s`); - this.isProcessing = false; + const submitPlannedTo = this.calculateNextPlannedTo(job.submitAttempts + 1); + await this.context.dbClient.job.update({ + where: { id: job.id }, + data: { + submitPlannedTo, + status: JobStatus.PendingSubmit, + submitAttempts: { increment: 1 }, + }, + }); + this.logger.info( + `pump Job (id=${job.id},messageId=${job.messageId}) failed: ${e}, SubmitRetry (attempt=${job.submitAttempts},scheduledTo=${submitPlannedTo})`, + ); } } diff --git a/src/types/job.ts b/src/types/job.ts index 6c4e75b..6fa5d90 100644 --- a/src/types/job.ts +++ b/src/types/job.ts @@ -7,6 +7,7 @@ export enum JobStatus { PendingVerification = 'pending_verification', // verification planned, pending request FailedVerification = 'failed_verification', // verification failed, should be retried PendingSubmit = 'pending_submit', // tx submit planned, pending request + ProcessingSubmit = 'processing_submit', // tx submit is processing (waiting for receipt) WaitingDstFinality = 'waiting_dst_finality', // wait for finalization proof on dst Success = 'success', // tx executed and on dst side Failed = 'failed', // failed by reason JobErrorCode