Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/relayer/api/api.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class ApiController extends ContextProvider {
async managementApi => {
managementApi.addHook('preHandler', checkManagementAccess);
managementApi.get('/jobs', handleGetJobsList);
managementApi.get('/jobs/retry', (req, res) =>
managementApi.post('/jobs/retry', (req, res) =>
handleRetryJob(req, res, refetchLog),
);
managementApi.get('/chains', handleGetChainsList);
Expand Down
208 changes: 105 additions & 103 deletions src/relayer/validator/submit-queue.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,146 +8,148 @@ import { relayerLibGasLimits } from '../../constants/relayerLibGasLimits';
import { CRE, JobPayload, JobStatus } from '../../types';
import { MessagingCodec } from '../../utils';
import { Context, ValidatorType } from '../types';

import decodeInternalValidatorConfig = MessagingCodec.decodeInternalValidatorConfig;

const MAX_PROMISES_SLOTS = 30;

export class SubmitQueueProcessor extends BaseValidatorService {
private isProcessing = false;
private usedSlots: number = 0;
private isPumping = false;

constructor(context: Context) {
super('SubmitQueueProcessor', context);
}

async pump(options: { maxTxPerPump: number; maxTxPerChain: number }) {
if (this.isProcessing) {
if (this.isPumping) {
return;
}

this.isPumping = true;
const availableSlots = MAX_PROMISES_SLOTS - this.usedSlots;
if (availableSlots <= 0) {
return;
}

const startTimestamp = Date.now();

try {
this.isProcessing = true;

// @todo: add custom profiling decorator
const jobs = await this.context.dbClient.$transaction(async client => {
await client.job.updateMany({
where: {
status: JobStatus.ProcessingSubmit,
updatedAt: { lt: new Date(Date.now() - 5 * 60 * 1000) },
},
data: {
status: JobStatus.PendingSubmit,
submitPlannedTo: new Date(),
submitAttempts: 0,
},
});

const jobs = await this.context.dbClient.$queryRaw<Job[]>`
return client.$queryRaw<Job[]>`
WITH ranked AS (
SELECT j.id,
j."dstChainSelector",
ROW_NUMBER() OVER (PARTITION BY j."dstChainSelector" ORDER BY j."lastSubmitAt" ASC) AS rn
FROM job j
WHERE j.status = ${JobStatus.PendingSubmit}
AND j."callbacksCount" = ${requiredCallbacksCount}
AND (
j."submitPlannedTo" < now()
OR j."submitPlannedTo" IS NULL
)
AND (j."submitPlannedTo" < now() OR j."submitPlannedTo" IS NULL)
)
SELECT j.*
FROM job j
JOIN (
SELECT id
FROM ranked
WHERE rn <= ${options.maxTxPerChain}
UPDATE job j
SET status = ${JobStatus.ProcessingSubmit}
FROM (
SELECT id
FROM ranked
WHERE rn <= ${options.maxTxPerChain} -- per-chain limit
ORDER BY rn
LIMIT ${options.maxTxPerPump}
) capped ON j.id = capped.id;
`;
LIMIT ${Math.max(0, Math.min(availableSlots, options.maxTxPerPump))} -- global + tick limit
) capped
WHERE j.id = capped.id
RETURNING j.*;
`;
});

this.logger.info(`pump jobs.length=${jobs.length}`);
if (!jobs.length) {
return;
}

// @todo: support bulkWrite in TxWriter to support batches by one chain & move to batches
const promises = jobs.map(async job => {
const jobPayload = JSON.parse(job.payload) as JobPayload;

try {
this.logger.info(
`Submitting message ${job.messageId} to chain ${job.dstChainSelector}`,
);

const creCallback = await this.context.dbClient.creCallback.findFirst({
where: { messageId: job.messageId },
});
if (!creCallback) {
throw new Error(
`Cannot find creCallback for message with id ${job.messageId}`,
);
}
const creResponse = JSON.parse(creCallback.payload) as CRE.Response;

const validations = this.extractJobValidations(
job.validatorType as ValidatorType,
job.messageId as Hex,
creResponse,
);
const validatorLibs = this.extractJobValidatorLibs(
job.validatorType as ValidatorType,
job.dstChainSelector,
);
this.usedSlots += jobs.length;
const promises = jobs.map(async job => this.processJob(job));
await Promise.all(promises);
this.usedSlots -= jobs.length;
} catch (e) {
this.logger.info(`pump failed: ${e}`);
} finally {
this.isPumping = false;
this.logger.info(`pump took: ${(Date.now() - startTimestamp) / 1000}s`);
}
}

const dst = await this.submitMessage(
job.dstChainSelector,
jobPayload.data.messageReceipt,
validations,
validatorLibs,
);
private async processJob(job: Job): Promise<void> {
try {
this.logger.info(
`pump Job (id=${job.id},messageId=${job.messageId}) processing to Chain (selector=${job.dstChainSelector})`,
);

this.logger.info(
`pump SubmitDst for (messageId=${job.messageId}, jobId=${job.id}) txHash=${dst.hash}, blockNumber=${dst.blockNumber}`,
);
const jobPayload = JSON.parse(job.payload) as JobPayload;

return { jobId: job.id, type: 'success', dst };
} catch (e) {
this.logger.error(
`pump Message (id=${job.messageId}, jobId=${job.id}) submit failed: ${e}`,
);
return { jobId: job.id, type: 'failed', attempts: job.submitAttempts };
}
const creCallback = await this.context.dbClient.creCallback.findFirst({
where: { messageId: job.messageId },
});

const results = await Promise.all(promises);
const successResults = results.filter(i => i.type === 'success');
const failedResults = results.filter(i => i.type === 'failed');

await this.context.dbClient.$transaction(async client => {
const successPromises = successResults.map(i => {
this.logger.info(
`pump Submit succeeded jobId=${i.jobId}, attempt=${(i?.attempts || 0) + 1}, txHash=${i?.dst?.blockNumber}`,
);
return client.job.update({
where: { id: i.jobId },
data: {
status: JobStatus.WaitingDstFinality,
submitAttempts: 0,
submitPlannedTo: null,
lastSubmitAt: new Date(),
dstBlockNumber: String(i?.dst?.blockNumber),
dstTxHash: String(i?.dst?.hash),
},
});
});
const failedPromises = failedResults.map(i => {
const submitPlannedTo = this.calculateNextPlannedTo((i.attempts as number) + 1);
this.logger.warn(
`Submit failed jobId=${i.jobId}, attempt=${(i?.attempts || 0) + 1}, nextTryIn=${Math.round((submitPlannedTo.getTime() - Date.now()) / 1000)}s`,
);
return client.job.update({
where: { id: i.jobId },
data: {
submitPlannedTo,
submitAttempts: { increment: 1 },
},
});
});
const totalPromises = successPromises.concat(failedPromises);
await Promise.all(totalPromises);
if (!creCallback) {
throw new Error(`CreCallback not found by messageId="${job.messageId}"`);
}
const creResponse = JSON.parse(creCallback.payload) as CRE.Response;

const validations = this.extractJobValidations(
job.validatorType as ValidatorType,
job.messageId as Hex,
creResponse,
);
const validatorLibs = this.extractJobValidatorLibs(
job.validatorType as ValidatorType,
job.dstChainSelector,
);

const dstReceipt = await this.submitMessage(
job.dstChainSelector,
jobPayload.data.messageReceipt,
validations,
validatorLibs,
);
this.logger.info(
`pump Job (id=${job.id},messageId=${job.messageId}) succeeded: Receipt (txHash=${dstReceipt.hash},block=${String(dstReceipt.blockNumber)})`,
);

await this.context.dbClient.job.update({
where: { id: job.id },
data: {
status: JobStatus.WaitingDstFinality,
submitAttempts: 0,
submitPlannedTo: null,
lastSubmitAt: new Date(),
dstBlockNumber: String(dstReceipt?.blockNumber),
dstTxHash: dstReceipt.hash,
},
});
} catch (e) {
this.logger.info(`pump failed: ${e}`);
} finally {
this.logger.info(`pump took: ${(Date.now() - startTimestamp) / 1000}s`);
this.isProcessing = false;
const submitPlannedTo = this.calculateNextPlannedTo(job.submitAttempts + 1);
await this.context.dbClient.job.update({
where: { id: job.id },
data: {
submitPlannedTo,
status: JobStatus.PendingSubmit,
submitAttempts: { increment: 1 },
},
});
this.logger.info(
`pump Job (id=${job.id},messageId=${job.messageId}) failed: ${e}, SubmitRetry (attempt=${job.submitAttempts},scheduledTo=${submitPlannedTo})`,
);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/types/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export enum JobStatus {
PendingVerification = 'pending_verification', // verification planned, pending request
FailedVerification = 'failed_verification', // verification failed, should be retried
PendingSubmit = 'pending_submit', // tx submit planned, pending request
ProcessingSubmit = 'processing_submit', // tx submit is processing (waiting for receipt)
WaitingDstFinality = 'waiting_dst_finality', // wait for finalization proof on dst
Success = 'success', // tx executed and on dst side
Failed = 'failed', // failed by reason JobErrorCode
Expand Down