From efb56c1535dbbdb6d0ee4255b058de60e81eda4d Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Mon, 16 Feb 2026 23:17:44 +0300 Subject: [PATCH 1/7] [mod]: fixed confirmations --- package-lock.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package-lock.json b/package-lock.json index dff341e..c95646d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -538,7 +538,7 @@ }, "node_modules/@concero/operator-utils": { "version": "1.0.0", - "resolved": "git+ssh://git@github.com/concero/operator-utils.git#e7d330569db84615bf8edc23b7e0fcbd43aa33ea", + "resolved": "git+ssh://git@github.com/concero/operator-utils.git#8b476a8239f14d3aa8a5bae7a1c7a17e75bd229b", "license": "ISC", "dependencies": { "@slack/web-api": "^7.14.1", From 24b671c037f38465b39960bec68ff0b3a50684bf Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Tue, 17 Feb 2026 11:00:02 +0300 Subject: [PATCH 2/7] [mod]: fixed api --- src/relayer/api/api.controller.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/relayer/api/api.controller.ts b/src/relayer/api/api.controller.ts index b50aac6..89399ec 100644 --- a/src/relayer/api/api.controller.ts +++ b/src/relayer/api/api.controller.ts @@ -30,7 +30,7 @@ export class ApiController extends ContextProvider { async managementApi => { managementApi.addHook('preHandler', checkManagementAccess); managementApi.get('/jobs', handleGetJobsList); - managementApi.get('/jobs/retry', (req, res) => + managementApi.post('/jobs/retry', (req, res) => handleRetryJob(req, res, refetchLog), ); managementApi.get('/chains', handleGetChainsList); From e989dc667151bf1002b298f238174fab79b31f75 Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Wed, 4 Mar 2026 16:25:58 +0300 Subject: [PATCH 3/7] [mod]: fixed submit queue processor --- .../validator/submit-queue.processor.ts | 214 +++++++++--------- src/types/job.ts | 1 + 2 files changed, 110 insertions(+), 105 deletions(-) diff --git a/src/relayer/validator/submit-queue.processor.ts b/src/relayer/validator/submit-queue.processor.ts index c0963f3..0127e3d 100644 --- a/src/relayer/validator/submit-queue.processor.ts +++ b/src/relayer/validator/submit-queue.processor.ts @@ -4,53 +4,75 @@ import { requiredCallbacksCount } from './adapters'; import { BaseValidatorService } from './base-validator.service'; import { Job } from '@prisma/client'; + import { relayerLibGasLimits } from '../../constants/relayerLibGasLimits'; import { CRE, JobPayload, JobStatus } from '../../types'; import { MessagingCodec } from '../../utils'; import { Context, ValidatorType } from '../types'; - import decodeInternalValidatorConfig = MessagingCodec.decodeInternalValidatorConfig; +const MAX_PROMISES_SLOTS = 300; + export class SubmitQueueProcessor extends BaseValidatorService { - private isProcessing = false; + private usedSlots: number = 0; + private isPumping = false; constructor(context: Context) { super('SubmitQueueProcessor', context); } async pump(options: { maxTxPerPump: number; maxTxPerChain: number }) { - if (this.isProcessing) { + if (this.isPumping) { return; } + + this.isPumping = true; + const availableSlots = MAX_PROMISES_SLOTS - this.usedSlots; + if (availableSlots <= 0) { + return; + } + const startTimestamp = Date.now(); try { - this.isProcessing = true; - // @todo: add custom profiling decorator - const jobs = await this.context.dbClient.$queryRaw` + const jobs = await this.context.dbClient.$transaction(async client => { + await client.job.updateMany({ + where: { + status: JobStatus.ProcessingSubmit, + updatedAt: { lt: new Date(Date.now() - 5 * 60 * 1000) }, + }, + data: { + status: JobStatus.PendingSubmit, + submitPlannedTo: new Date(), + submitAttempts: 0, + }, + }); + + return client.$queryRaw` WITH ranked AS ( SELECT j.id, + j."dstChainSelector", ROW_NUMBER() OVER (PARTITION BY j."dstChainSelector" ORDER BY j."lastSubmitAt" ASC) AS rn FROM job j WHERE j.status = ${JobStatus.PendingSubmit} AND j."callbacksCount" = ${requiredCallbacksCount} - AND ( - j."submitPlannedTo" < now() - OR j."submitPlannedTo" IS NULL - ) + AND (j."submitPlannedTo" < now() OR j."submitPlannedTo" IS NULL) ) - SELECT j.* - FROM job j - JOIN ( - SELECT id - FROM ranked - WHERE rn <= ${options.maxTxPerChain} + UPDATE job j + SET status = ${JobStatus.ProcessingSubmit} + FROM ( + SELECT id + FROM ranked + WHERE rn <= ${options.maxTxPerChain} -- per-chain limit ORDER BY rn - LIMIT ${options.maxTxPerPump} - ) capped ON j.id = capped.id; - `; + LIMIT ${Math.max(0, Math.min(availableSlots, options.maxTxPerPump))} -- global + tick limit + ) capped + WHERE j.id = capped.id + RETURNING j.*; + `; + }); this.logger.info(`pump jobs.length=${jobs.length}`); if (!jobs.length) { @@ -58,96 +80,78 @@ export class SubmitQueueProcessor extends BaseValidatorService { } // @todo: support bulkWrite in TxWriter to support batches by one chain & move to batches - const promises = jobs.map(async job => { - const jobPayload = JSON.parse(job.payload) as JobPayload; - - try { - this.logger.info( - `Submitting message ${job.messageId} to chain ${job.dstChainSelector}`, - ); - - const creCallback = await this.context.dbClient.creCallback.findFirst({ - where: { messageId: job.messageId }, - }); - if (!creCallback) { - throw new Error( - `Cannot find creCallback for message with id ${job.messageId}`, - ); - } - const creResponse = JSON.parse(creCallback.payload) as CRE.Response; - - const validations = this.extractJobValidations( - job.validatorType as ValidatorType, - job.messageId as Hex, - creResponse, - ); - const validatorLibs = this.extractJobValidatorLibs( - job.validatorType as ValidatorType, - job.dstChainSelector, - ); - - const dst = await this.submitMessage( - job.dstChainSelector, - jobPayload.data.messageReceipt, - validations, - validatorLibs, - ); - - this.logger.info( - `pump SubmitDst for (messageId=${job.messageId}, jobId=${job.id}) txHash=${dst.hash}, blockNumber=${dst.blockNumber}`, - ); - - return { jobId: job.id, type: 'success', dst }; - } catch (e) { - this.logger.error( - `pump Message (id=${job.messageId}, jobId=${job.id}) submit failed: ${e}`, - ); - return { jobId: job.id, type: 'failed', attempts: job.submitAttempts }; - } - }); - - const results = await Promise.all(promises); - const successResults = results.filter(i => i.type === 'success'); - const failedResults = results.filter(i => i.type === 'failed'); - - await this.context.dbClient.$transaction(async client => { - const successPromises = successResults.map(i => { - this.logger.info( - `pump Submit succeeded jobId=${i.jobId}, attempt=${(i?.attempts || 0) + 1}, txHash=${i?.dst?.blockNumber}`, - ); - return client.job.update({ - where: { id: i.jobId }, - data: { - status: JobStatus.WaitingDstFinality, - submitAttempts: 0, - submitPlannedTo: null, - lastSubmitAt: new Date(), - dstBlockNumber: String(i?.dst?.blockNumber), - dstTxHash: String(i?.dst?.hash), - }, - }); - }); - const failedPromises = failedResults.map(i => { - const submitPlannedTo = this.calculateNextPlannedTo((i.attempts as number) + 1); - this.logger.warn( - `Submit failed jobId=${i.jobId}, attempt=${(i?.attempts || 0) + 1}, nextTryIn=${Math.round((submitPlannedTo.getTime() - Date.now()) / 1000)}s`, - ); - return client.job.update({ - where: { id: i.jobId }, - data: { - submitPlannedTo, - submitAttempts: { increment: 1 }, - }, - }); - }); - const totalPromises = successPromises.concat(failedPromises); - await Promise.all(totalPromises); - }); + this.usedSlots += jobs.length; + const promises = jobs.map(async job => this.processJob(job)); + await Promise.all(promises); + this.usedSlots -= jobs.length; } catch (e) { this.logger.info(`pump failed: ${e}`); } finally { + this.isPumping = false; this.logger.info(`pump took: ${(Date.now() - startTimestamp) / 1000}s`); - this.isProcessing = false; + } + } + + private async processJob(job: Job): Promise { + try { + this.logger.info( + `pump Job (id=${job.id},messageId=${job.messageId}) processing to Chain (selector${job.dstChainSelector})}`, + ); + + const jobPayload = JSON.parse(job.payload) as JobPayload; + + const creCallback = await this.context.dbClient.creCallback.findFirst({ + where: { messageId: job.messageId }, + }); + if (!creCallback) { + throw new Error(`CreCallback not found by messageId="${job.messageId}"`); + } + const creResponse = JSON.parse(creCallback.payload) as CRE.Response; + + const validations = this.extractJobValidations( + job.validatorType as ValidatorType, + job.messageId as Hex, + creResponse, + ); + const validatorLibs = this.extractJobValidatorLibs( + job.validatorType as ValidatorType, + job.dstChainSelector, + ); + + const dstReceipt = await this.submitMessage( + job.dstChainSelector, + jobPayload.data.messageReceipt, + validations, + validatorLibs, + ); + this.logger.info( + `pump Job (id=${job.id},messageId=${job.messageId}) succeeded: Receipt (txHash=${dstReceipt.hash},block=${String(dstReceipt.blockNumber)}`, + ); + + this.context.dbClient.job.update({ + where: { id: job.id }, + data: { + status: JobStatus.WaitingDstFinality, + submitAttempts: 0, + submitPlannedTo: null, + lastSubmitAt: new Date(), + dstBlockNumber: String(dstReceipt?.blockNumber), + dstTxHash: dstReceipt.hash, + }, + }); + } catch (e) { + const submitPlannedTo = this.calculateNextPlannedTo(job.submitAttempts + 1); + await this.context.dbClient.job.update({ + where: { id: job.id }, + data: { + submitPlannedTo, + status: JobStatus.PendingSubmit, + submitAttempts: { increment: 1 }, + }, + }); + this.logger.info( + `pump Job (id=${job.id},messageId=${job.messageId}) failed: ${e}, SubmitRetry (attempt=${job.submitAttempts},scheduledTo=${submitPlannedTo})`, + ); } } diff --git a/src/types/job.ts b/src/types/job.ts index 6c4e75b..6fa5d90 100644 --- a/src/types/job.ts +++ b/src/types/job.ts @@ -7,6 +7,7 @@ export enum JobStatus { PendingVerification = 'pending_verification', // verification planned, pending request FailedVerification = 'failed_verification', // verification failed, should be retried PendingSubmit = 'pending_submit', // tx submit planned, pending request + ProcessingSubmit = 'processing_submit', // tx submit is processing (waiting for receipt) WaitingDstFinality = 'waiting_dst_finality', // wait for finalization proof on dst Success = 'success', // tx executed and on dst side Failed = 'failed', // failed by reason JobErrorCode From 316197633a23b685b9296f6b06f3018d361dada3 Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Mon, 9 Mar 2026 17:02:48 +0300 Subject: [PATCH 4/7] [mod]: fixed submit queue processor --- src/relayer/validator/submit-queue.processor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/relayer/validator/submit-queue.processor.ts b/src/relayer/validator/submit-queue.processor.ts index 0127e3d..2e9a159 100644 --- a/src/relayer/validator/submit-queue.processor.ts +++ b/src/relayer/validator/submit-queue.processor.ts @@ -11,7 +11,7 @@ import { MessagingCodec } from '../../utils'; import { Context, ValidatorType } from '../types'; import decodeInternalValidatorConfig = MessagingCodec.decodeInternalValidatorConfig; -const MAX_PROMISES_SLOTS = 300; +const MAX_PROMISES_SLOTS = 30; export class SubmitQueueProcessor extends BaseValidatorService { private usedSlots: number = 0; From e6422dcfa1bc2d4e55e6bef3cbbc8d94ad3e02cc Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Mon, 9 Mar 2026 17:02:58 +0300 Subject: [PATCH 5/7] [mod]: fixed submit queue processor --- src/relayer/validator/submit-queue.processor.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/relayer/validator/submit-queue.processor.ts b/src/relayer/validator/submit-queue.processor.ts index 2e9a159..f16cb57 100644 --- a/src/relayer/validator/submit-queue.processor.ts +++ b/src/relayer/validator/submit-queue.processor.ts @@ -4,7 +4,6 @@ import { requiredCallbacksCount } from './adapters'; import { BaseValidatorService } from './base-validator.service'; import { Job } from '@prisma/client'; - import { relayerLibGasLimits } from '../../constants/relayerLibGasLimits'; import { CRE, JobPayload, JobStatus } from '../../types'; import { MessagingCodec } from '../../utils'; From a9b1bd1fad499cd5dc585b68640d21fd5d47a13c Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Mon, 9 Mar 2026 23:32:45 +0300 Subject: [PATCH 6/7] [mod]: fixed submit queue processor --- src/relayer/validator/submit-queue.processor.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/relayer/validator/submit-queue.processor.ts b/src/relayer/validator/submit-queue.processor.ts index f16cb57..686b19b 100644 --- a/src/relayer/validator/submit-queue.processor.ts +++ b/src/relayer/validator/submit-queue.processor.ts @@ -35,7 +35,6 @@ export class SubmitQueueProcessor extends BaseValidatorService { try { // @todo: add custom profiling decorator - const jobs = await this.context.dbClient.$transaction(async client => { await client.job.updateMany({ where: { @@ -127,7 +126,7 @@ export class SubmitQueueProcessor extends BaseValidatorService { `pump Job (id=${job.id},messageId=${job.messageId}) succeeded: Receipt (txHash=${dstReceipt.hash},block=${String(dstReceipt.blockNumber)}`, ); - this.context.dbClient.job.update({ + await this.context.dbClient.job.update({ where: { id: job.id }, data: { status: JobStatus.WaitingDstFinality, From c922d823f879a9413aa8c350b9b654182783aaa1 Mon Sep 17 00:00:00 2001 From: romanconceroio Date: Mon, 9 Mar 2026 23:34:46 +0300 Subject: [PATCH 7/7] [mod]: fixed submit queue processor --- src/relayer/validator/submit-queue.processor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/relayer/validator/submit-queue.processor.ts b/src/relayer/validator/submit-queue.processor.ts index 686b19b..58ca5ea 100644 --- a/src/relayer/validator/submit-queue.processor.ts +++ b/src/relayer/validator/submit-queue.processor.ts @@ -93,7 +93,7 @@ export class SubmitQueueProcessor extends BaseValidatorService { private async processJob(job: Job): Promise { try { this.logger.info( - `pump Job (id=${job.id},messageId=${job.messageId}) processing to Chain (selector${job.dstChainSelector})}`, + `pump Job (id=${job.id},messageId=${job.messageId}) processing to Chain (selector=${job.dstChainSelector})`, ); const jobPayload = JSON.parse(job.payload) as JobPayload; @@ -123,7 +123,7 @@ export class SubmitQueueProcessor extends BaseValidatorService { validatorLibs, ); this.logger.info( - `pump Job (id=${job.id},messageId=${job.messageId}) succeeded: Receipt (txHash=${dstReceipt.hash},block=${String(dstReceipt.blockNumber)}`, + `pump Job (id=${job.id},messageId=${job.messageId}) succeeded: Receipt (txHash=${dstReceipt.hash},block=${String(dstReceipt.blockNumber)})`, ); await this.context.dbClient.job.update({