Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions packages/backend/src/transaction/transaction-executor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ export class TransactionExecutorService {
throw new NotFoundException(`Transaction ${txId} not found`);
}

if (transaction.status !== TxStatus.PENDING) {
throw new BadRequestException(
`Transaction cannot be executed (current status: ${transaction.status})`,
);
}

// Mark as EXECUTING
await this.updateStatusAndEmit(
txId,
Expand Down Expand Up @@ -120,14 +126,14 @@ export class TransactionExecutorService {
error.message?.includes('Transaction reverted');

if (isOnChainRevert) {
// Tx confirmed as REVERTED on-chain — safe to revert to PENDING
// Tx confirmed as REVERTED on-chain — only revert if still EXECUTING
// (another concurrent call may have already set EXECUTED)
this.logger.warn(
`txId ${txId} reverted on-chain (txHash: ${submittedTxHash}). Reverting to PENDING.`,
`txId ${txId} reverted on-chain (txHash: ${submittedTxHash}). Reverting to PENDING if still EXECUTING.`,
);
await this.updateStatusAndEmit(
await this.conditionalRevertToPending(
txId,
executionData.accountAddress,
TxStatus.PENDING,
);
throw new BadRequestException(
'Transaction reverted on-chain. Please check contract conditions.',
Expand All @@ -144,12 +150,8 @@ export class TransactionExecutorService {
);
}

// Tx was NOT submitted on-chain — safe to revert to PENDING
await this.updateStatusAndEmit(
txId,
executionData.accountAddress,
TxStatus.PENDING,
);
// Tx was NOT submitted on-chain — only revert if still EXECUTING
await this.conditionalRevertToPending(txId, executionData.accountAddress);

if (error.message?.includes('Insufficient wallet balance')) {
const match = error.message.match(
Expand Down Expand Up @@ -624,6 +626,37 @@ export class TransactionExecutorService {
}
}

/**
* Revert status to PENDING only if current status is EXECUTING.
* Prevents race condition where a concurrent call already set EXECUTED.
*/
private async conditionalRevertToPending(
txId: number,
accountAddress: string,
) {
const result = await this.prisma.transaction.updateMany({
where: { txId, status: TxStatus.EXECUTING },
data: { status: TxStatus.PENDING, txHash: null },
});

if (result.count > 0) {
const eventData: TxStatusEventData = {
txId,
status: TxStatus.PENDING,
};
this.eventsService.emitToAccount(
accountAddress,
TX_STATUS_EVENT,
eventData,
);
this.logger.log(`txId ${txId} reverted to PENDING`);
} else {
this.logger.log(
`txId ${txId} not reverted — status is no longer EXECUTING`,
);
}
}

private async updateStatusAndEmit(
txId: number,
accountAddress: string,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { createPublicClient, http, type Hex } from 'viem';
import { PrismaService } from '@/database/prisma.service';
import { TransactionExecutorService } from './transaction-executor.service';
import { TxStatus, getChainById } from '@polypay/shared';

@Injectable()
export class TransactionReconcilerScheduler {
private readonly logger = new Logger(TransactionReconcilerScheduler.name);
private readonly publicClients = new Map<number, any>();

constructor(
private readonly prisma: PrismaService,
private readonly transactionExecutor: TransactionExecutorService,
) {}

private getPublicClient(chainId: number) {
let client = this.publicClients.get(chainId);
if (!client) {
const chain = getChainById(chainId);
client = createPublicClient({ chain, transport: http() });
this.publicClients.set(chainId, client);
}
return client;
}

// 13:00 Vietnam time = 06:00 UTC
@Cron('0 6 * * *', { timeZone: 'UTC' })
async reconcileStuckTransactions() {
this.logger.log('Running daily transaction reconciliation');

const stuckTxs = await this.prisma.transaction.findMany({
where: {
txHash: { not: null },
status: { in: [TxStatus.EXECUTING, TxStatus.PENDING] },
},
include: { account: true },
});

if (stuckTxs.length === 0) {
this.logger.log('No stuck transactions found');
return;
}

this.logger.log(`Found ${stuckTxs.length} stuck transactions with txHash`);

let reconciled = 0;
let reverted = 0;
let skipped = 0;

for (const tx of stuckTxs) {
try {
const publicClient = this.getPublicClient(tx.account.chainId);
const receipt = await publicClient.getTransactionReceipt({
hash: tx.txHash as Hex,
});

if (receipt.status === 'success') {
await this.transactionExecutor.markExecuted(tx.txId, tx.txHash);
this.logger.log(
`txId ${tx.txId} reconciled to EXECUTED (txHash: ${tx.txHash})`,
);
reconciled++;
} else {
// receipt.status === 'reverted'
await this.prisma.transaction.update({
where: { txId: tx.txId },
data: { status: TxStatus.PENDING, txHash: null },
});
this.logger.warn(
`txId ${tx.txId} reverted on-chain, reset to PENDING`,
);
reverted++;
}
} catch (error) {
// No receipt found (tx not mined or invalid hash) — skip
this.logger.warn(
`txId ${tx.txId} receipt not found, skipping: ${error.message}`,
);
skipped++;
}
}

this.logger.log(
`Reconciliation complete: ${reconciled} executed, ${reverted} reverted, ${skipped} skipped`,
);
}
}
2 changes: 2 additions & 0 deletions packages/backend/src/transaction/transaction.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { TransactionController } from './transaction.controller';
import { TransactionService } from './transaction.service';
import { TransactionExecutorService } from './transaction-executor.service';
import { TransactionReconcilerScheduler } from './transaction-reconciler.scheduler';
import { ZkVerifyModule } from '@/zkverify/zkverify.module';
import { DatabaseModule } from '@/database/database.module';
import { RelayerModule } from '@/relayer-wallet/relayer-wallet.module';
Expand All @@ -23,6 +24,7 @@ import { QuestModule } from '@/quest/quest.module';
providers: [
TransactionService,
TransactionExecutorService,
TransactionReconcilerScheduler,
AnalyticsLoggerService,
],
exports: [TransactionService],
Expand Down
Loading