diff --git a/src/account/services/portfolio.service.spec.ts b/src/account/services/portfolio.service.spec.ts index b053f5eb..0014fe27 100644 --- a/src/account/services/portfolio.service.spec.ts +++ b/src/account/services/portfolio.service.spec.ts @@ -2,11 +2,16 @@ import moment from 'moment'; import { PortfolioService } from './portfolio.service'; import { batchTimestampToAeHeight } from '@/utils/getBlochHeight'; import { TokenPnlResult } from './bcl-pnl.service'; +import { fetchJson } from '@/utils/common'; jest.mock('@/utils/getBlochHeight', () => ({ batchTimestampToAeHeight: jest.fn(), })); +jest.mock('@/utils/common', () => ({ + fetchJson: jest.fn(), +})); + describe('PortfolioService', () => { const basePnlResult: TokenPnlResult = { pnls: {}, @@ -29,7 +34,7 @@ describe('PortfolioService', () => { getPriceData: jest.fn(), }; const coinHistoricalPriceService = { - // Default: no DB data → falls back to coinGeckoService.fetchHistoricalPrice + // Default: no DB data -> falls back to coinGeckoService.fetchHistoricalPrice getHistoricalPriceData: jest.fn().mockResolvedValue([]), }; const bclPnlService = { @@ -59,6 +64,7 @@ describe('PortfolioService', () => { beforeEach(() => { jest.clearAllMocks(); + (fetchJson as jest.Mock).mockResolvedValue(null); }); it('limits concurrent balance fetches to snapshotConcurrency', async () => { @@ -110,11 +116,8 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(10); - // PNL is pre-computed in a single batch call expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledTimes(1); - // All 10 heights land in different 300-block buckets, so 10 balance calls expect(aeSdkService.sdk.getBalance).toHaveBeenCalledTimes(10); - // Balance concurrency cap is 15 expect(maxInFlight).toBeLessThanOrEqual(15); }); @@ -122,7 +125,6 @@ describe('PortfolioService', () => { const { service, aeSdkService, coinGeckoService, bclPnlService } = createService(); - // All timestamps map to the same block height (batchTimestampToAeHeight as jest.Mock).mockImplementation( async (timestamps: number[]) => { const map = new Map(); @@ -150,16 +152,13 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(3); - // All 3 timestamps share height 123, so balance is fetched once expect(aeSdkService.sdk.getBalance).toHaveBeenCalledTimes(1); - // Batch PNL is called once with deduplicated heights expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledTimes(1); expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledWith( 'ak_test', [123], undefined, ); - // Per-snapshot price is still resolved per timestamp, not per block height expect(snapshots.map((snapshot) => snapshot.ae_price)).toEqual([1, 2, 3]); }); @@ -167,8 +166,6 @@ describe('PortfolioService', () => { const { service, aeSdkService, coinGeckoService, bclPnlService } = createService(); - // 4 timestamps that map to 3 distinct exact heights, but only 2 distinct - // 300-block buckets: 100 → bucket 0, 250 → bucket 0, 350 → bucket 300, 599 → bucket 300 (batchTimestampToAeHeight as jest.Mock).mockImplementation( async (timestamps: number[]) => { const heights = [100, 250, 350, 599]; @@ -202,10 +199,7 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(4); - // Heights 100 and 250 → bucket 0; heights 350 and 599 → bucket 300. - // Only 2 unique buckets → exactly 2 getBalance calls. expect(aeSdkService.sdk.getBalance).toHaveBeenCalledTimes(2); - // getBalance is called with the bucket boundaries, not the raw heights const calledHeights = aeSdkService.sdk.getBalance.mock.calls.map( ([, opts]: [any, any]) => opts.height, ); @@ -248,12 +242,10 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(3); - // Called twice: once for cumulative, once for range-based expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledTimes(2); - // startBlockHeight = blockHeights[0] = 100 (timestamps[0] maps to 100+0) const calls = bclPnlService.calculateTokenPnlsBatch.mock.calls; - expect(calls[0][2]).toBeUndefined(); // cumulative: no fromBlockHeight - expect(calls[1][2]).toBe(100); // range: fromBlockHeight = blockHeights[0] + expect(calls[0][2]).toBeUndefined(); + expect(calls[1][2]).toBe(100); }); it('uses coin_historical_prices DB table when available, skipping CoinGecko', async () => { @@ -273,7 +265,6 @@ describe('PortfolioService', () => { }, ); - // DB returns prices in ascending order (as the repository does) coinHistoricalPriceService.getHistoricalPriceData.mockResolvedValue([ [Date.UTC(2026, 0, 1), 1], [Date.UTC(2026, 0, 2), 2], @@ -297,13 +288,61 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(3); - // Prices come from DB — CoinGecko historical endpoint is never called expect(coinGeckoService.fetchHistoricalPrice).not.toHaveBeenCalled(); - // DB service was queried for the needed range expect( coinHistoricalPriceService.getHistoricalPriceData, ).toHaveBeenCalledTimes(1); - // Correct prices are assigned to each snapshot expect(snapshots.map((s) => s.ae_price)).toEqual([1, 2, 3]); }); + + it('resolves chain names to account pubkeys before balance and pnl lookups', async () => { + const { + service, + aeSdkService, + coinGeckoService, + coinHistoricalPriceService, + bclPnlService, + } = createService(); + + (batchTimestampToAeHeight as jest.Mock).mockImplementation( + async (timestamps: number[]) => { + const map = new Map(); + timestamps.forEach((ts) => map.set(ts, 300)); + return map; + }, + ); + (fetchJson as jest.Mock).mockResolvedValue({ + owner: 'ak_owner', + pointers: [{ key: 'account_pubkey', id: 'ak_resolved' }], + }); + coinGeckoService.getPriceData.mockResolvedValue({ usd: 99 }); + coinHistoricalPriceService.getHistoricalPriceData.mockResolvedValue([ + [Date.UTC(2026, 0, 1), 1], + ]); + aeSdkService.sdk.getBalance.mockResolvedValue('1000000000000000000'); + bclPnlService.calculateTokenPnlsBatch.mockResolvedValue( + new Map([[300, basePnlResult]]), + ); + + await service.getPortfolioHistory('mybtc.chain', { + startDate: moment.utc('2026-01-01T00:00:00.000Z'), + endDate: moment.utc('2026-01-01T00:00:00.000Z'), + interval: 86400, + }); + + expect(fetchJson).toHaveBeenCalledWith( + expect.stringContaining( + 'https://mainnet.aeternity.io/v3/names/mybtc.chain', + ), + ); + expect(aeSdkService.sdk.getBalance).toHaveBeenCalledWith( + 'ak_resolved', + expect.objectContaining({ height: 300 }), + ); + expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledWith( + 'ak_resolved', + [300], + undefined, + ); + }); }); diff --git a/src/account/services/portfolio.service.ts b/src/account/services/portfolio.service.ts index 2fc10c75..87f46577 100644 --- a/src/account/services/portfolio.service.ts +++ b/src/account/services/portfolio.service.ts @@ -9,10 +9,11 @@ import { Transaction } from '@/transactions/entities/transaction.entity'; import { AeSdkService } from '@/ae/ae-sdk.service'; import { CoinGeckoService } from '@/ae/coin-gecko.service'; import { CoinHistoricalPriceService } from '@/ae-pricing/services/coin-historical-price.service'; -import { AETERNITY_COIN_ID } from '@/configs'; +import { ACTIVE_NETWORK, AETERNITY_COIN_ID } from '@/configs'; import { toAe } from '@aeternity/aepp-sdk'; import { batchTimestampToAeHeight } from '@/utils/getBlochHeight'; import { BclPnlService, TokenPnlResult } from './bcl-pnl.service'; +import { fetchJson } from '@/utils/common'; export interface PortfolioHistorySnapshot { timestamp: Moment | Date; @@ -98,6 +99,7 @@ export class PortfolioService { * separate slow historical-state request. */ private readonly BALANCE_BUCKET_SIZE = 300; + private readonly accountPubkeyPointerKey = 'account_pubkey'; constructor( @InjectRepository(TokenHolder) @@ -175,6 +177,8 @@ export class PortfolioService { } } + const resolvedAddress = await this.resolveAccountAddress(address); + // Fetch price history and current price in parallel. // For historical prices, query the local coin_historical_prices table first // (populated by the background CoinGecko sync). Only fall back to the live @@ -244,13 +248,13 @@ export class PortfolioService { // This replaces the previous per-snapshot SQL calls (N queries → 1 query). const [pnlMap, rangePnlMap] = await Promise.all([ this.bclPnlService.calculateTokenPnlsBatch( - address, + resolvedAddress, uniqueBlockHeights, undefined, ), includePnl && useRangeBasedPnl && startBlockHeight !== undefined ? this.bclPnlService.calculateTokenPnlsBatch( - address, + resolvedAddress, uniqueBlockHeights, startBlockHeight, ) @@ -281,7 +285,7 @@ export class PortfolioService { // Balance still requires an external AE node call per unique block height const aeBalancePromise = this.getCachedBalance( balanceCache, - address, + resolvedAddress, blockHeight, ); @@ -369,6 +373,45 @@ export class PortfolioService { return data; } + private async resolveAccountAddress(address: string): Promise { + if (!address || address.startsWith('ak_') || !address.includes('.')) { + return address; + } + + try { + const response = await fetchJson<{ + owner?: string; + pointers?: Array<{ + key?: string; + encoded_key?: string; + id?: string; + }>; + }>(`${ACTIVE_NETWORK.url}/v3/names/${encodeURIComponent(address)}`); + + const accountPointer = response?.pointers?.find( + (pointer) => + pointer?.key === this.accountPubkeyPointerKey && + typeof pointer.id === 'string' && + pointer.id.startsWith('ak_'), + )?.id; + + if (accountPointer) { + return accountPointer; + } + + if (response?.owner?.startsWith('ak_')) { + return response.owner; + } + } catch (error) { + this.logger.warn( + `Failed to resolve account reference ${address}, falling back to raw value`, + error instanceof Error ? error.stack : String(error), + ); + } + + return address; + } + private getCachedBalance( cache: Map>, address: string, @@ -387,9 +430,12 @@ export class PortfolioService { return cached; } - const promise = this.aeSdkService.sdk.getBalance(address as any, { - height: bucketHeight, - } as any); + const promise = this.aeSdkService.sdk.getBalance( + address as any, + { + height: bucketHeight, + } as any, + ); cache.set(bucketHeight, promise); return promise; } diff --git a/src/ae-pricing/services/coin-historical-price.service.ts b/src/ae-pricing/services/coin-historical-price.service.ts index 91dbfbeb..c82c1450 100644 --- a/src/ae-pricing/services/coin-historical-price.service.ts +++ b/src/ae-pricing/services/coin-historical-price.service.ts @@ -1,6 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, Between } from 'typeorm'; +import { runWithDatabaseIssueLogging } from '@/utils/database-issue-logging'; import { CoinHistoricalPrice } from '../entities/coin-historical-price.entity'; @Injectable() @@ -130,16 +131,28 @@ export class CoinHistoricalPriceService { try { // Check for existing records to avoid duplicates const timestamps = priceData.map(([timestamp]) => timestamp); - const existing = await this.repository.find({ - where: { - coin_id: coinId, - currency: currency, - timestamp_ms: Between( - Math.min(...timestamps), - Math.max(...timestamps), - ), + const existing = await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'historical price duplicate lookup', + context: { + coinId, + currency, + requestedPoints: priceData.length, + minTimestamp: Math.min(...timestamps), + maxTimestamp: Math.max(...timestamps), }, - select: ['timestamp_ms'], + operation: () => + this.repository.find({ + where: { + coin_id: coinId, + currency: currency, + timestamp_ms: Between( + Math.min(...timestamps), + Math.max(...timestamps), + ), + }, + select: ['timestamp_ms'], + }), }); // TypeORM returns bigint as string, so convert to string for Set comparison @@ -173,7 +186,18 @@ export class CoinHistoricalPriceService { const chunkSize = 1000; for (let i = 0; i < entities.length; i += chunkSize) { const chunk = entities.slice(i, i + chunkSize); - await this.repository.save(chunk); + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'historical price chunk save', + context: { + coinId, + currency, + chunkStart: i, + chunkSize: chunk.length, + totalNewPoints: entities.length, + }, + operation: () => this.repository.save(chunk), + }); } this.logger.log( diff --git a/src/ae/coin-gecko.service.spec.ts b/src/ae/coin-gecko.service.spec.ts index 796ffcbc..cc80df2e 100644 --- a/src/ae/coin-gecko.service.spec.ts +++ b/src/ae/coin-gecko.service.spec.ts @@ -112,4 +112,20 @@ describe('CoinGeckoService', () => { ); expect(result).toEqual({ data: 'mockData' }); }); + + it('should reuse cached market data for three minutes by default', async () => { + const cachedMarket = { + data: { + id: AETERNITY_COIN_ID, + currentPrice: 0.1, + }, + fetchedAt: Date.now() - 2 * 60 * 1000, + }; + (cacheManager.get as jest.Mock).mockResolvedValue(cachedMarket); + + const result = await service.getCoinMarketData(AETERNITY_COIN_ID, 'usd'); + + expect(result).toEqual(cachedMarket.data); + expect(fetchJson).not.toHaveBeenCalled(); + }); }); diff --git a/src/ae/coin-gecko.service.ts b/src/ae/coin-gecko.service.ts index 68721169..350d86ac 100644 --- a/src/ae/coin-gecko.service.ts +++ b/src/ae/coin-gecko.service.ts @@ -20,6 +20,7 @@ import { CurrencyRates } from '@/utils/types'; import { CoinHistoricalPriceService } from '@/ae-pricing/services/coin-historical-price.service'; const COIN_GECKO_API_URL = 'https://api.coingecko.com/api/v3'; +const DEFAULT_MARKET_DATA_MAX_AGE_MS = 3 * 60 * 1000; export interface CoinGeckoMarketResponse { ath: number; @@ -400,7 +401,7 @@ export class CoinGeckoService { async getCoinMarketData( coinId: string, currencyCode: string, - maxAgeMs: number = 60_000, + maxAgeMs: number = DEFAULT_MARKET_DATA_MAX_AGE_MS, ): Promise { const cacheKey = `${this.marketCacheKeyPrefix}:${coinId}:${currencyCode}`; diff --git a/src/mdw-sync/services/block-sync.service.spec.ts b/src/mdw-sync/services/block-sync.service.spec.ts index c10f501e..3672f233 100644 --- a/src/mdw-sync/services/block-sync.service.spec.ts +++ b/src/mdw-sync/services/block-sync.service.spec.ts @@ -8,6 +8,22 @@ jest.mock('@/utils/common', () => ({ })); describe('BlockSyncService', () => { + const buildMiddlewareTransaction = () => ({ + hash: 'th_test_1', + block_height: 123, + block_hash: 'mh_test_1', + micro_index: 1, + micro_time: 1700000000000, + signatures: [], + encoded_tx: 'tx_test_1', + tx: { + type: 'ContractCallTx', + contract_id: 'ct_test_1', + function: 'transfer', + caller_id: 'ak_caller_1', + }, + }); + const setup = () => { const txRepository = { save: jest.fn(), @@ -53,6 +69,8 @@ describe('BlockSyncService', () => { return { service, + txRepository, + pluginBatchProcessor, blockRepository, }; }; @@ -137,4 +155,65 @@ describe('BlockSyncService', () => { }), ); }); + + it('fails fast on pool timeouts instead of falling back to repository.save', async () => { + const { service, txRepository, pluginBatchProcessor } = setup(); + const loggerError = jest + .spyOn((service as any).logger, 'error') + .mockImplementation(() => undefined); + + (fetchJson as jest.Mock).mockResolvedValueOnce({ + data: [buildMiddlewareTransaction()], + next: null, + }); + txRepository.upsert.mockRejectedValueOnce( + new Error('timeout exceeded when trying to connect'), + ); + + await expect(service.syncTransactions(123, 123, true, true)).rejects.toThrow( + 'timeout exceeded when trying to connect', + ); + + expect(txRepository.save).not.toHaveBeenCalled(); + expect(pluginBatchProcessor.processBatch).not.toHaveBeenCalled(); + expect(loggerError).toHaveBeenCalledWith( + expect.stringContaining( + 'Database connectivity/pool issue during transaction bulk upsert batch: timeout exceeded when trying to connect.', + ), + expect.any(String), + ); + expect(loggerError).toHaveBeenCalledWith( + expect.stringContaining('"issueKind":"pool_timeout"'), + expect.any(String), + ); + expect(loggerError).toHaveBeenCalledWith( + expect.stringContaining('"dbPoolMax":40'), + expect.any(String), + ); + }); + + it('still falls back to repository.save for non-connectivity bulk insert errors', async () => { + const { service, txRepository, pluginBatchProcessor } = setup(); + + (fetchJson as jest.Mock).mockResolvedValueOnce({ + data: [buildMiddlewareTransaction()], + next: null, + }); + txRepository.upsert.mockRejectedValueOnce( + new Error('duplicate key value violates unique constraint'), + ); + txRepository.save.mockResolvedValueOnce({ + hash: 'th_test_1', + block_height: 123, + }); + + const result = await service.syncTransactions(123, 123, true, true); + + expect(txRepository.save).toHaveBeenCalledTimes(1); + expect(pluginBatchProcessor.processBatch).toHaveBeenCalledWith( + [{ hash: 'th_test_1', block_height: 123 }], + 'backward', + ); + expect(result.get(123)).toEqual(['th_test_1']); + }); }); diff --git a/src/mdw-sync/services/block-sync.service.ts b/src/mdw-sync/services/block-sync.service.ts index 487976f7..191da739 100644 --- a/src/mdw-sync/services/block-sync.service.ts +++ b/src/mdw-sync/services/block-sync.service.ts @@ -1,4 +1,9 @@ import { fetchJson, sanitizeJsonForPostgres } from '@/utils/common'; +import { + isDatabaseConnectionOrPoolError, + logDatabaseIssue, + runWithDatabaseIssueLogging, +} from '@/utils/database-issue-logging'; import { ITransaction } from '@/utils/types'; import { decode } from '@aeternity/aepp-sdk'; import { Injectable, Logger } from '@nestjs/common'; @@ -60,9 +65,19 @@ export class BlockSyncService { const saveBatchSize = 1000; // Safe batch size for PostgreSQL for (let i = 0; i < blocksToSave.length; i += saveBatchSize) { const batch = blocksToSave.slice(i, i + saveBatchSize); - await this.blockRepository.upsert(batch, { - conflictPaths: ['height'], - skipUpdateIfNoValuesChanged: true, + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'key-block upsert', + context: { + startHeight, + endHeight, + batchStart: i, + batchSize: batch.length, + }, + operation: () => + this.blockRepository.upsert(batch, { + conflictPaths: ['height'], + }), }); } this.logger.debug( @@ -73,10 +88,19 @@ export class BlockSyncService { async syncMicroBlocks(startHeight: number, endHeight: number): Promise { // Get all key-blocks in the height range - const keyBlocks = await this.blockRepository.find({ - where: { - height: Between(startHeight, endHeight) as any, + const keyBlocks = await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'key-block lookup for micro-block sync', + context: { + startHeight, + endHeight, }, + operation: () => + this.blockRepository.find({ + where: { + height: Between(startHeight, endHeight) as any, + }, + }), }); if (keyBlocks.length === 0) { @@ -113,9 +137,21 @@ export class BlockSyncService { const saveBatchSize = 1000; // Safe batch size for PostgreSQL for (let i = 0; i < microBlocksToSave.length; i += saveBatchSize) { const batch = microBlocksToSave.slice(i, i + saveBatchSize); - await this.microBlockRepository.upsert(batch, { - conflictPaths: ['hash'], - skipUpdateIfNoValuesChanged: true, + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'micro-block upsert', + context: { + startHeight, + endHeight, + keyBlockCount: keyBlocks.length, + batchStart: i, + batchSize: batch.length, + }, + operation: () => + this.microBlockRepository.upsert(batch, { + conflictPaths: ['hash'], + skipUpdateIfNoValuesChanged: true, + }), }); } this.logger.debug( @@ -184,6 +220,20 @@ export class BlockSyncService { try { savedTxs = await this.bulkInsertTransactions(mdwTxs); } catch (error: any) { + if (isDatabaseConnectionOrPoolError(error)) { + logDatabaseIssue({ + logger: this.logger, + stage: 'transaction bulk upsert', + error, + context: { + url, + transactionCount: mdwTxs.length, + useBulkMode, + }, + }); + throw error; + } + this.logger.error( `Bulk insert failed for page, trying repository.save as fallback`, error, @@ -201,7 +251,16 @@ export class BlockSyncService { } } else { // Save transactions - const saved = await this.txRepository.save(mdwTxs); + const saved = await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'transaction save', + context: { + url, + transactionCount: mdwTxs.length, + useBulkMode, + }, + operation: () => this.txRepository.save(mdwTxs), + }); savedTxs = Array.isArray(saved) ? saved : [saved]; // Process batch for plugins immediately if (savedTxs.length > 0) { @@ -281,8 +340,18 @@ export class BlockSyncService { // Fetch this batch immediately after upsert to verify they exist if (batchHashes.length > 0) { - const savedBatchTxs = await this.txRepository.find({ - where: { hash: In(batchHashes) }, + const savedBatchTxs = await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'transaction reload after bulk upsert', + context: { + batchStart: i, + batchSize: batch.length, + hashCount: batchHashes.length, + }, + operation: () => + this.txRepository.find({ + where: { hash: In(batchHashes) }, + }), }); // Log warning if some transactions weren't found after upsert @@ -310,6 +379,19 @@ export class BlockSyncService { } } } catch (error: any) { + if (isDatabaseConnectionOrPoolError(error)) { + logDatabaseIssue({ + logger: this.logger, + stage: 'transaction bulk upsert batch', + error, + context: { + batchStart: i, + batchEnd: Math.min(i + batchSize, txs.length), + batchSize: batch.length, + }, + }); + throw error; + } this.logger.error( `Failed to bulk upsert batch ${i + 1}-${Math.min(i + batchSize, txs.length)}: ${error.message}`, error.stack || error, @@ -394,4 +476,5 @@ export class BlockSyncService { created_at: new Date(tx.microTime), // Explicitly set timestamp }; } + } diff --git a/src/mdw-sync/services/live-indexer.service.ts b/src/mdw-sync/services/live-indexer.service.ts index 6b95e5cf..defa1f73 100644 --- a/src/mdw-sync/services/live-indexer.service.ts +++ b/src/mdw-sync/services/live-indexer.service.ts @@ -113,10 +113,11 @@ export class LiveIndexerService implements OnModuleInit, OnModuleDestroy { created_at: new Date(fullBlock.time), }; - // Upsert the key block to handle duplicate key violations gracefully + // Upsert the key block to handle duplicate key violations gracefully. + // Do not use skipUpdateIfNoValuesChanged because KeyBlock includes jsonb + // data and PostgreSQL cannot compare json/jsonb values with "=" here. await this.blockRepository.upsert(blockToSave, { conflictPaths: ['height'], - skipUpdateIfNoValuesChanged: true, }); // Update live_synced_height diff --git a/src/plugins/bcl/services/token.service.ts b/src/plugins/bcl/services/token.service.ts index db737400..bbb3f68f 100644 --- a/src/plugins/bcl/services/token.service.ts +++ b/src/plugins/bcl/services/token.service.ts @@ -167,10 +167,11 @@ export class TokenService { manager, ); } else { - // Use upsert to handle race conditions where token might be created concurrently + // Use upsert to handle race conditions where token might be created concurrently. + // Do not use skipUpdateIfNoValuesChanged because Token has json columns and + // PostgreSQL cannot compare json values with "=" during that optimization. await repository.upsert(tokenData, { conflictPaths: ['sale_address'], - skipUpdateIfNoValuesChanged: true, }); token = await this.findByAddress(saleAddress, false, manager); isNewToken = true; diff --git a/src/tokens/queues/sync-token-holders.queue.spec.ts b/src/tokens/queues/sync-token-holders.queue.spec.ts index da010885..a4762836 100644 --- a/src/tokens/queues/sync-token-holders.queue.spec.ts +++ b/src/tokens/queues/sync-token-holders.queue.spec.ts @@ -1,5 +1,6 @@ import { recordSyncTokenHoldersDuration } from '@/utils/stabilization-metrics'; import { SyncTokenHoldersQueue } from './sync-token-holders.queue'; +import { RetryableTokenHoldersSyncError } from '../tokens.service'; jest.mock('@/utils/stabilization-metrics', () => ({ recordSyncTokenHoldersDuration: jest.fn(), @@ -156,4 +157,29 @@ describe('SyncTokenHoldersQueue', () => { 'lock-owner', ); }); + + it('schedules a delayed retry for contract-not-ready sync failures', async () => { + const add = jest.fn().mockResolvedValue(undefined); + tokenService.loadAndSaveTokenHoldersFromMdw.mockRejectedValue( + new RetryableTokenHoldersSyncError('contract not ready', 60_000), + ); + + await queue.process({ + data: { saleAddress }, + queue: { add }, + } as any); + + expect(add).toHaveBeenCalledWith( + { saleAddress }, + expect.objectContaining({ + jobId: `syncTokenHolders-retry-${saleAddress}`, + delay: 60_000, + attempts: 1, + }), + ); + expect(tokenHoldersLockService.releaseLock).toHaveBeenCalledWith( + saleAddress, + 'lock-owner', + ); + }); }); diff --git a/src/tokens/queues/sync-token-holders.queue.ts b/src/tokens/queues/sync-token-holders.queue.ts index 0747ac0b..94d63e7d 100644 --- a/src/tokens/queues/sync-token-holders.queue.ts +++ b/src/tokens/queues/sync-token-holders.queue.ts @@ -3,7 +3,7 @@ import { Encoded } from '@aeternity/aepp-sdk'; import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { Job } from 'bull'; -import { TokensService } from '../tokens.service'; +import { RetryableTokenHoldersSyncError, TokensService } from '../tokens.service'; import { TokenHoldersLockService } from '../services/token-holders-lock.service'; import { SYNC_TOKEN_HOLDERS_QUEUE } from './constants'; @@ -116,6 +116,25 @@ export class SyncTokenHoldersQueue { this.inFlightSyncs.delete(saleAddress); this.inFlightStartedAt.delete(saleAddress); } + if (error instanceof RetryableTokenHoldersSyncError) { + this.logger.warn( + `SyncTokenHoldersQueue->retry-scheduled:${saleAddress} in ${error.retryDelayMs}ms`, + ); + await job.queue.add( + { + saleAddress, + }, + { + jobId: `syncTokenHolders-retry-${saleAddress}`, + delay: error.retryDelayMs, + removeOnComplete: true, + removeOnFail: true, + attempts: 1, + timeout: this.jobTimeoutMs, + }, + ); + return; + } this.logger.error( `SyncTokenHoldersQueue->error:${saleAddress} (${Date.now() - startedAt}ms)`, error, diff --git a/src/tokens/services/refresh-token-eligibility-counts.service.spec.ts b/src/tokens/services/refresh-token-eligibility-counts.service.spec.ts index bc6d6622..990618f8 100644 --- a/src/tokens/services/refresh-token-eligibility-counts.service.spec.ts +++ b/src/tokens/services/refresh-token-eligibility-counts.service.spec.ts @@ -192,6 +192,33 @@ describe('RefreshTokenEligibilityCountsService', () => { expect((service as any).isRefreshing).toBe(false); }); + it('adds structured database context for pool timeouts during refresh', async () => { + const loggerError = jest + .spyOn((service as any).logger, 'error') + .mockImplementation(() => undefined); + + dataSource.query.mockRejectedValueOnce( + new Error('timeout exceeded when trying to connect'), + ); + + await expect(service.manualRefresh()).resolves.toBeUndefined(); + + expect(loggerError).toHaveBeenCalledWith( + expect.stringContaining( + 'Database connectivity/pool issue during token eligibility ensure counts table: timeout exceeded when trying to connect.', + ), + expect.any(String), + ); + expect(loggerError).toHaveBeenCalledWith( + expect.stringContaining('"issueKind":"pool_timeout"'), + expect.any(String), + ); + expect(loggerError).toHaveBeenCalledWith( + 'Failed to refresh token eligibility counts via manual', + expect.stringContaining('timeout exceeded when trying to connect'), + ); + }); + it('skips the startup refresh when no watermark exists yet', async () => { const loggerWarn = jest .spyOn((service as any).logger, 'warn') diff --git a/src/tokens/services/refresh-token-eligibility-counts.service.ts b/src/tokens/services/refresh-token-eligibility-counts.service.ts index 48461009..a962f22f 100644 --- a/src/tokens/services/refresh-token-eligibility-counts.service.ts +++ b/src/tokens/services/refresh-token-eligibility-counts.service.ts @@ -3,6 +3,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { Cron } from '@nestjs/schedule'; import { InjectDataSource } from '@nestjs/typeorm'; import { TRENDING_SCORE_CONFIG } from '@/configs'; +import { runWithDatabaseIssueLogging } from '@/utils/database-issue-logging'; import { DataSource } from 'typeorm'; const TOKEN_ELIGIBILITY_REFRESH_STATE_ID = 'default'; @@ -76,142 +77,162 @@ export class RefreshTokenEligibilityCountsService { try { await this.ensureTableExists(); - await this.dataSource.transaction(async (manager) => { - const state = await this.loadRefreshState(manager); + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token eligibility refresh transaction', + context: { + trigger, + }, + operation: () => + this.dataSource.transaction(async (manager) => { + const state = await this.loadRefreshState(manager); - if (!state?.last_processed_created_at) { - processedPosts = await this.rebuildAllCounts(manager); - return; - } + if (!state?.last_processed_created_at) { + processedPosts = await this.rebuildAllCounts(manager); + return; + } - const [latestUnprocessedPost] = await manager.query( - ` - SELECT post.created_at, post.id - FROM posts post - WHERE post.created_at > $1 - OR (post.created_at = $1 AND post.id > $2) - ORDER BY post.created_at DESC, post.id DESC - LIMIT 1 - `, - [state.last_processed_created_at, state.last_processed_post_id || ''], - ); + const [latestUnprocessedPost] = await manager.query( + ` + SELECT post.created_at, post.id + FROM posts post + WHERE post.created_at > $1 + OR (post.created_at = $1 AND post.id > $2) + ORDER BY post.created_at DESC, post.id DESC + LIMIT 1 + `, + [ + state.last_processed_created_at, + state.last_processed_post_id || '', + ], + ); - if (!latestUnprocessedPost) { - return; - } + if (!latestUnprocessedPost) { + return; + } - const [{ processed_count }] = await manager.query( - ` - SELECT COUNT(*)::int AS processed_count - FROM posts post - WHERE ( - post.created_at > $1 - OR (post.created_at = $1 AND post.id > $2) - ) - AND ( - post.created_at < $3 - OR (post.created_at = $3 AND post.id <= $4) - ) - `, - [ - state.last_processed_created_at, - state.last_processed_post_id || '', - latestUnprocessedPost.created_at, - latestUnprocessedPost.id, - ], - ); - processedPosts = Number(processed_count || 0); + const [{ processed_count }] = await manager.query( + ` + SELECT COUNT(*)::int AS processed_count + FROM posts post + WHERE ( + post.created_at > $1 + OR (post.created_at = $1 AND post.id > $2) + ) + AND ( + post.created_at < $3 + OR (post.created_at = $3 AND post.id <= $4) + ) + `, + [ + state.last_processed_created_at, + state.last_processed_post_id || '', + latestUnprocessedPost.created_at, + latestUnprocessedPost.id, + ], + ); + processedPosts = Number(processed_count || 0); - await manager.query( - ` - INSERT INTO token_eligibility_counts ( - symbol, - post_count, - stored_post_count, - content_post_count, - refreshed_at - ) - WITH matched AS ( - SELECT DISTINCT - post.id AS post_id, - UPPER(mention.symbol) AS symbol, - 'stored' AS match_source - FROM posts post - CROSS JOIN LATERAL jsonb_array_elements_text( - COALESCE(post.token_mentions, '[]'::jsonb) - ) AS mention(symbol) - WHERE ( - post.created_at > $1 - OR (post.created_at = $1 AND post.id > $2) - ) - AND ( - post.created_at < $3 - OR (post.created_at = $3 AND post.id <= $4) + await manager.query( + ` + INSERT INTO token_eligibility_counts ( + symbol, + post_count, + stored_post_count, + content_post_count, + refreshed_at ) - AND post.is_hidden = false - AND mention.symbol <> '' + WITH matched AS ( + SELECT DISTINCT + post.id AS post_id, + UPPER(mention.symbol) AS symbol, + 'stored' AS match_source + FROM posts post + CROSS JOIN LATERAL jsonb_array_elements_text( + COALESCE(post.token_mentions, '[]'::jsonb) + ) AS mention(symbol) + WHERE ( + post.created_at > $1 + OR (post.created_at = $1 AND post.id > $2) + ) + AND ( + post.created_at < $3 + OR (post.created_at = $3 AND post.id <= $4) + ) + AND post.is_hidden = false + AND mention.symbol <> '' - UNION ALL + UNION ALL - SELECT DISTINCT - post.id AS post_id, - UPPER(content_match[1]) AS symbol, - 'content' AS match_source - FROM posts post - CROSS JOIN LATERAL regexp_matches( - COALESCE(post.content, ''), - '${TOKEN_HASHTAG_REGEX_SOURCE}', - 'g' - ) AS content_match - WHERE ( - post.created_at > $1 - OR (post.created_at = $1 AND post.id > $2) - ) - AND ( - post.created_at < $3 - OR (post.created_at = $3 AND post.id <= $4) + SELECT DISTINCT + post.id AS post_id, + UPPER(content_match[1]) AS symbol, + 'content' AS match_source + FROM posts post + CROSS JOIN LATERAL regexp_matches( + COALESCE(post.content, ''), + '${TOKEN_HASHTAG_REGEX_SOURCE}', + 'g' + ) AS content_match + WHERE ( + post.created_at > $1 + OR (post.created_at = $1 AND post.id > $2) + ) + AND ( + post.created_at < $3 + OR (post.created_at = $3 AND post.id <= $4) + ) + AND post.is_hidden = false + AND jsonb_array_length(COALESCE(post.token_mentions, '[]'::jsonb)) = 0 ) - AND post.is_hidden = false - AND jsonb_array_length(COALESCE(post.token_mentions, '[]'::jsonb)) = 0 - ) - SELECT - matched.symbol, - COUNT(DISTINCT matched.post_id) AS post_count, - COUNT(DISTINCT matched.post_id) FILTER ( - WHERE matched.match_source = 'stored' - ) AS stored_post_count, - COUNT(DISTINCT matched.post_id) FILTER ( - WHERE matched.match_source = 'content' - ) AS content_post_count, - CURRENT_TIMESTAMP(6) AS refreshed_at - FROM matched - GROUP BY matched.symbol - ON CONFLICT (symbol) DO UPDATE - SET - post_count = token_eligibility_counts.post_count + EXCLUDED.post_count, - stored_post_count = token_eligibility_counts.stored_post_count + EXCLUDED.stored_post_count, - content_post_count = token_eligibility_counts.content_post_count + EXCLUDED.content_post_count, - refreshed_at = EXCLUDED.refreshed_at - `, - [ - state.last_processed_created_at, - state.last_processed_post_id || '', - latestUnprocessedPost.created_at, - latestUnprocessedPost.id, - ], - ); + SELECT + matched.symbol, + COUNT(DISTINCT matched.post_id) AS post_count, + COUNT(DISTINCT matched.post_id) FILTER ( + WHERE matched.match_source = 'stored' + ) AS stored_post_count, + COUNT(DISTINCT matched.post_id) FILTER ( + WHERE matched.match_source = 'content' + ) AS content_post_count, + CURRENT_TIMESTAMP(6) AS refreshed_at + FROM matched + GROUP BY matched.symbol + ON CONFLICT (symbol) DO UPDATE + SET + post_count = token_eligibility_counts.post_count + EXCLUDED.post_count, + stored_post_count = token_eligibility_counts.stored_post_count + EXCLUDED.stored_post_count, + content_post_count = token_eligibility_counts.content_post_count + EXCLUDED.content_post_count, + refreshed_at = EXCLUDED.refreshed_at + `, + [ + state.last_processed_created_at, + state.last_processed_post_id || '', + latestUnprocessedPost.created_at, + latestUnprocessedPost.id, + ], + ); - await this.upsertRefreshState( - manager, - latestUnprocessedPost.created_at, - latestUnprocessedPost.id, - ); + await this.upsertRefreshState( + manager, + latestUnprocessedPost.created_at, + latestUnprocessedPost.id, + ); + }), }); - const [{ count }] = await this.dataSource.query(` - SELECT COUNT(*)::int AS count - FROM token_eligibility_counts - `); + const [{ count }] = await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token eligibility refresh summary count', + context: { + trigger, + processedPosts, + }, + operation: () => + this.dataSource.query(` + SELECT COUNT(*)::int AS count + FROM token_eligibility_counts + `), + }); this.logger.log( `Refreshed token eligibility counts via ${trigger} in ${ @@ -229,12 +250,24 @@ export class RefreshTokenEligibilityCountsService { } private async ensureTableExists(): Promise { - await this.dataSource.query( - RefreshTokenEligibilityCountsService.ENSURE_TABLE_SQL, - ); - await this.dataSource.query( - RefreshTokenEligibilityCountsService.ENSURE_STATE_TABLE_SQL, - ); + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token eligibility ensure counts table', + context: {}, + operation: () => + this.dataSource.query( + RefreshTokenEligibilityCountsService.ENSURE_TABLE_SQL, + ), + }); + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token eligibility ensure refresh state table', + context: {}, + operation: () => + this.dataSource.query( + RefreshTokenEligibilityCountsService.ENSURE_STATE_TABLE_SQL, + ), + }); } private async loadRefreshState( diff --git a/src/tokens/tokens.controller.spec.ts b/src/tokens/tokens.controller.spec.ts index 25d6d3b4..2a57673e 100644 --- a/src/tokens/tokens.controller.spec.ts +++ b/src/tokens/tokens.controller.spec.ts @@ -298,6 +298,19 @@ describe('TokensController', () => { expect(result).toEqual({ items: [], meta: {} }); }); + it('should throw when listing holders for an unknown token', async () => { + const createQueryBuilderSpy = jest.spyOn( + tokenHolderRepository, + 'createQueryBuilder', + ); + tokensService.findByAddress = jest.fn().mockResolvedValue(null); + + await expect(controller.listTokenHolders('missing')).rejects.toBeInstanceOf( + NotFoundException, + ); + expect(createQueryBuilderSpy).not.toHaveBeenCalled(); + }); + it('should return paginated token rankings', async () => { const result = await controller.listTokenRankings('ct_123'); expect(tokensService.findByAddress).toHaveBeenCalledWith('ct_123'); diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 06eed58d..4f7fc756 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -279,6 +279,10 @@ export class TokensController { @Query('limit', new DefaultValuePipe(100), ParseIntPipe) limit = 100, ): Promise> { const token = await this.tokensService.findByAddress(address); + if (!token) { + throw new NotFoundException('Token not found'); + } + const queryBuilder = this.tokenHolderRepository.createQueryBuilder('token_holder'); diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index d08eeaed..20c47730 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -3,7 +3,7 @@ import { TOKEN_LIST_ELIGIBILITY_CONFIG, TRENDING_SCORE_CONFIG, } from '@/configs/constants'; -import { TokensService } from './tokens.service'; +import { RetryableTokenHoldersSyncError, TokensService } from './tokens.service'; describe('TokensService', () => { let service: TokensService; @@ -11,12 +11,18 @@ describe('TokensService', () => { let transactionsRepository: any; let postsRepository: any; let tokenEligibilityCountsRepository: any; + let communityFactoryService: any; + let pullTokenInfoQueue: any; beforeEach(() => { tokensRepository = { createQueryBuilder: jest.fn(), update: jest.fn(), query: jest.fn(), + upsert: jest.fn(), + save: jest.fn(), + delete: jest.fn(), + findOneBy: jest.fn(), }; transactionsRepository = { query: jest.fn(), @@ -27,6 +33,12 @@ describe('TokensService', () => { tokenEligibilityCountsRepository = { findOne: jest.fn(), }; + communityFactoryService = { + getCurrentFactory: jest.fn(), + }; + pullTokenInfoQueue = { + add: jest.fn(), + }; service = new TokensService( tokensRepository as any, @@ -37,8 +49,8 @@ describe('TokensService', () => { {} as any, {} as any, {} as any, - {} as any, - {} as any, + communityFactoryService as any, + pullTokenInfoQueue as any, ); }); @@ -331,4 +343,106 @@ describe('TokensService', () => { expect(holders).toEqual([]); expect(service.getTokenContractsBySaleAddress).toHaveBeenCalledTimes(1); }); + + it('reuses configured contract-not-present retry settings for holder sync retries', async () => { + (service as any).contractNotPresentMaxAttempts = 2; + (service as any).contractNotPresentRetryDelayMs = 1234; + + jest + .spyOn(service, 'getTokenContractsBySaleAddress') + .mockRejectedValue(new Error('contract_does_not_exist')); + jest.spyOn(service as any, 'sleep').mockResolvedValue(undefined); + + await expect( + service._loadHoldersFromContract( + { + sale_address: 'ct_missing', + } as any, + 'ct_aex9', + ), + ).rejects.toMatchObject>({ + retryDelayMs: 1234, + }); + + expect(service.getTokenContractsBySaleAddress).toHaveBeenCalledTimes(2); + expect((service as any).sleep).toHaveBeenCalledWith(1234); + }); + + it('keeps generic holder sync failures at three attempts when contract-not-ready retries are lower', async () => { + (service as any).contractNotPresentMaxAttempts = 2; + + jest + .spyOn(service, 'getTokenContractsBySaleAddress') + .mockRejectedValue(new Error('timeout exceeded')); + jest.spyOn(service as any, 'sleep').mockResolvedValue(undefined); + + await expect( + service._loadHoldersFromContract( + { + sale_address: 'ct_timeout', + } as any, + 'ct_aex9', + ), + ).resolves.toEqual([]); + + expect(service.getTokenContractsBySaleAddress).toHaveBeenCalledTimes(3); + expect((service as any).sleep).toHaveBeenNthCalledWith(1, 500); + expect((service as any).sleep).toHaveBeenNthCalledWith(2, 1000); + }); + + it('uses upsert and reload when creating a token from a raw transaction', async () => { + communityFactoryService.getCurrentFactory.mockResolvedValue({ + address: 'ct_factory', + collections: { word: true }, + }); + jest.spyOn(service, 'findByNameOrSymbol').mockResolvedValue(null); + tokensRepository.findOneBy + .mockResolvedValueOnce(null) + .mockResolvedValueOnce({ + sale_address: 'ct_sale', + name: 'BLA', + }); + + const token = await service.createTokenFromRawTransaction({ + hash: 'th_hash', + microTime: '2026-03-24T11:49:27.106Z', + tx: { + function: 'create_community', + return_type: 'ok', + return: { + value: [{ value: 'ct_dao' }, { value: 'ct_sale' }], + }, + arguments: [{ value: 'word' }, { value: 'BLA' }], + callerId: 'ak_creator', + }, + }); + + expect(tokensRepository.upsert).toHaveBeenCalledWith( + expect.objectContaining({ + dao_address: 'ct_dao', + sale_address: 'ct_sale', + factory_address: 'ct_factory', + creator_address: 'ak_creator', + name: 'BLA', + symbol: 'BLA', + create_tx_hash: 'th_hash', + }), + { + conflictPaths: ['sale_address'], + }, + ); + expect(tokensRepository.save).not.toHaveBeenCalled(); + expect(pullTokenInfoQueue.add).toHaveBeenCalledWith( + { saleAddress: 'ct_sale' }, + expect.objectContaining({ + jobId: 'pullTokenInfo-ct_sale', + lifo: true, + removeOnComplete: true, + }), + ); + expect(token).toEqual({ + sale_address: 'ct_sale', + name: 'BLA', + }); + }); }); diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index db69c64d..b4a4c927 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -12,6 +12,7 @@ import { TRENDING_SCORE_CONFIG, } from '@/configs'; import { fetchJson } from '@/utils/common'; +import { runWithDatabaseIssueLogging } from '@/utils/database-issue-logging'; import { ITransaction } from '@/utils/types'; import { Encoded } from '@aeternity/aepp-sdk'; import { InjectQueue } from '@nestjs/bull'; @@ -95,9 +96,20 @@ export interface TokenTrendingEligibilityBreakdown { }; } +export class RetryableTokenHoldersSyncError extends Error { + constructor( + message: string, + readonly retryDelayMs = 60_000, + ) { + super(message); + this.name = 'RetryableTokenHoldersSyncError'; + } +} + @Injectable() export class TokensService { private readonly logger = new Logger(TokensService.name); + private readonly holderLoadRetryMaxAttempts = 3; private readonly contractCallTimeoutMs = Number( process.env.TOKEN_CONTRACT_CALL_TIMEOUT_MS || 30_000, ); @@ -346,10 +358,11 @@ export class TokensService { }); } - // Use upsert to handle race conditions where token might be created concurrently + // Use upsert to handle race conditions where token might be created concurrently. + // Do not use skipUpdateIfNoValuesChanged because Token has json columns and + // PostgreSQL cannot compare json values with "=" during that optimization. await this.tokensRepository.upsert(tokenData, { conflictPaths: ['sale_address'], - skipUpdateIfNoValuesChanged: true, }); const newToken = await this.findByAddress(saleAddress); if (!newToken) { @@ -913,15 +926,21 @@ export class TokensService { create_tx_hash: rawTransaction?.hash, }; - let token; - let isNewToken = false; - // TODO: should only update if the data is different - if (tokenExists?.sale_address) { - await this.tokensRepository.update(tokenExists.sale_address, tokenData); - token = await this.findByAddress(tokenExists.sale_address); - } else { - token = await this.tokensRepository.save(tokenData); - isNewToken = true; + const existingTokenBySaleAddress = await this.findOne(saleAddress); + const isNewToken = !existingTokenBySaleAddress; + + // Mirror createToken() so concurrent requests converge instead of surfacing + // duplicate-key errors when they race to create the same row. + // Do not use skipUpdateIfNoValuesChanged here because Token has json columns + // and PostgreSQL cannot compare json values with "=" during that optimization. + await this.tokensRepository.upsert(tokenData, { + conflictPaths: ['sale_address'], + }); + const token = await this.findOne(saleAddress); + if (!token) { + throw new Error( + `Failed to create or retrieve token for sale address: ${saleAddress}`, + ); } await this.pullTokenInfoQueue.add( @@ -1005,16 +1024,46 @@ export class TokensService { ); if (uniqueHolders.length > 0) { - await this.tokenHoldersRepository.delete({ - aex9_address: aex9Address, + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token holders delete before sync', + context: { + saleAddress, + aex9Address, + holderCount: uniqueHolders.length, + }, + operation: () => + this.tokenHoldersRepository.delete({ + aex9_address: aex9Address, + }), }); - await this.tokenHoldersRepository.upsert(uniqueHolders, { - conflictPaths: ['id'], - skipUpdateIfNoValuesChanged: true, + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token holders upsert', + context: { + saleAddress, + aex9Address, + holderCount: uniqueHolders.length, + }, + operation: () => + this.tokenHoldersRepository.upsert(uniqueHolders, { + conflictPaths: ['id'], + skipUpdateIfNoValuesChanged: true, + }), }); } - await this.tokensRepository.update(token.sale_address, { - holders_count: uniqueHolders.length, + await runWithDatabaseIssueLogging({ + logger: this.logger, + stage: 'token holders count update', + context: { + saleAddress, + aex9Address, + holderCount: uniqueHolders.length, + }, + operation: () => + this.tokensRepository.update(token.sale_address, { + holders_count: uniqueHolders.length, + }), }); } @@ -1132,9 +1181,13 @@ export class TokensService { } async _loadHoldersFromContract(token: Token, aex9Address: string) { - const maxAttempts = 3; + const contractNotReadyMaxAttempts = this.contractNotPresentMaxAttempts; + const totalRetryAttempts = Math.max( + contractNotReadyMaxAttempts, + this.holderLoadRetryMaxAttempts, + ); - for (let attempt = 1; attempt <= maxAttempts; attempt++) { + for (let attempt = 1; attempt <= totalRetryAttempts; attempt++) { try { const contracts = await this.getTokenContractsBySaleAddress( token.sale_address as Encoded.ContractAddress, @@ -1166,6 +1219,7 @@ export class TokensService { return holders || []; } catch (error: any) { const isOutOfGasError = this.isOutOfGasError(error); + const isContractNotReadyError = this.isContractNotReadyError(error); const isTimeoutError = error?.message?.includes('timeout'); if (isOutOfGasError) { @@ -1175,10 +1229,26 @@ export class TokensService { return []; } - if (attempt < maxAttempts) { + if (isContractNotReadyError) { + if (attempt < contractNotReadyMaxAttempts) { + const waitMs = this.contractNotPresentRetryDelayMs * attempt; + this.logger.warn( + `SyncTokenHoldersQueue->_loadHoldersFromContract: contract not ready for ${aex9Address} (attempt ${attempt}/${contractNotReadyMaxAttempts}), retrying in ${waitMs}ms`, + ); + await this.sleep(waitMs); + continue; + } + + throw new RetryableTokenHoldersSyncError( + `SyncTokenHoldersQueue->_loadHoldersFromContract: contract not ready for ${aex9Address}, retrying in ${this.contractNotPresentRetryDelayMs}ms`, + this.contractNotPresentRetryDelayMs, + ); + } + + if (attempt < this.holderLoadRetryMaxAttempts) { const waitMs = 500 * attempt; this.logger.warn( - `SyncTokenHoldersQueue->_loadHoldersFromContract: attempt ${attempt}/${maxAttempts} failed for ${aex9Address}${isTimeoutError ? ' (timeout)' : ''}, retrying in ${waitMs}ms`, + `SyncTokenHoldersQueue->_loadHoldersFromContract: attempt ${attempt}/${this.holderLoadRetryMaxAttempts} failed for ${aex9Address}${isTimeoutError ? ' (timeout)' : ''}, retrying in ${waitMs}ms`, ); await this.sleep(waitMs); continue; @@ -1196,6 +1266,11 @@ export class TokensService { return []; } + private isContractNotReadyError(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error); + return message.includes('contract_does_not_exist'); + } + private async withTimeout( promise: Promise, timeoutMs: number, diff --git a/src/utils/database-issue-logging.spec.ts b/src/utils/database-issue-logging.spec.ts new file mode 100644 index 00000000..680629e7 --- /dev/null +++ b/src/utils/database-issue-logging.spec.ts @@ -0,0 +1,68 @@ +import { + getDatabaseIssueKind, + isDatabaseConnectionOrPoolError, + runWithDatabaseIssueLogging, +} from './database-issue-logging'; + +describe('database issue logging helpers', () => { + it('detects connection and pool errors', () => { + expect( + isDatabaseConnectionOrPoolError( + new Error('timeout exceeded when trying to connect'), + ), + ).toBe(true); + expect( + isDatabaseConnectionOrPoolError(new Error('too many clients already')), + ).toBe(true); + expect( + isDatabaseConnectionOrPoolError(new Error('duplicate key value')), + ).toBe(false); + }); + + it('classifies common database connectivity issue kinds', () => { + expect( + getDatabaseIssueKind(new Error('timeout exceeded when trying to connect')), + ).toBe('pool_timeout'); + expect(getDatabaseIssueKind(new Error('too many clients already'))).toBe( + 'pool_exhausted', + ); + expect(getDatabaseIssueKind(new Error('connect ECONNREFUSED'))).toBe( + 'connection_refused', + ); + }); + + it('logs structured context for retryable database connectivity errors', async () => { + const logger = { + error: jest.fn(), + }; + + await expect( + runWithDatabaseIssueLogging({ + logger, + stage: 'token holders upsert', + context: { + saleAddress: 'ct_sale', + holderCount: 25, + }, + operation: async () => { + throw new Error('timeout exceeded when trying to connect'); + }, + }), + ).rejects.toThrow('timeout exceeded when trying to connect'); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining( + 'Database connectivity/pool issue during token holders upsert: timeout exceeded when trying to connect.', + ), + expect.any(String), + ); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('"issueKind":"pool_timeout"'), + expect.any(String), + ); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('"saleAddress":"ct_sale"'), + expect.any(String), + ); + }); +}); diff --git a/src/utils/database-issue-logging.ts b/src/utils/database-issue-logging.ts new file mode 100644 index 00000000..4b0d6c4c --- /dev/null +++ b/src/utils/database-issue-logging.ts @@ -0,0 +1,131 @@ +import { DATABASE_CONFIG } from '@/configs'; + +type DatabaseLogger = { + error: (message: string, trace?: string) => unknown; +}; + +export type DatabaseIssuePoolConfig = { + max: number; + min: number; + connectionTimeoutMillis: number; +}; + +const defaultExtra = (DATABASE_CONFIG as any)?.extra ?? {}; + +export const DEFAULT_DATABASE_ISSUE_POOL_CONFIG: DatabaseIssuePoolConfig = { + max: Number(defaultExtra.max ?? 40), + min: Number(defaultExtra.min ?? 5), + connectionTimeoutMillis: Number(defaultExtra.connectionTimeoutMillis ?? 10_000), +}; + +export function isDatabaseConnectionOrPoolError(error: unknown): boolean { + const message = + error instanceof Error + ? error.message + : typeof error === 'string' + ? error + : ''; + const normalized = message.toLowerCase(); + + return [ + 'timeout exceeded when trying to connect', + 'too many clients already', + 'remaining connection slots are reserved', + 'connection terminated unexpectedly', + 'connection ended unexpectedly', + 'connection refused', + 'econnrefused', + 'etimedout', + 'connection error', + ].some((fragment) => normalized.includes(fragment)); +} + +export function getDatabaseIssueKind(error: unknown): string { + const message = + error instanceof Error + ? error.message + : typeof error === 'string' + ? error + : ''; + const normalized = message.toLowerCase(); + + if (normalized.includes('timeout exceeded when trying to connect')) { + return 'pool_timeout'; + } + if ( + normalized.includes('too many clients already') || + normalized.includes('remaining connection slots are reserved') + ) { + return 'pool_exhausted'; + } + if ( + normalized.includes('connection refused') || + normalized.includes('econnrefused') + ) { + return 'connection_refused'; + } + if ( + normalized.includes('connection terminated unexpectedly') || + normalized.includes('connection ended unexpectedly') + ) { + return 'connection_terminated'; + } + if ( + normalized.includes('etimedout') || + normalized.includes('connection error') + ) { + return 'connectivity_error'; + } + + return 'unknown_db_connectivity_issue'; +} + +export function logDatabaseIssue(params: { + logger: DatabaseLogger; + stage: string; + error: unknown; + context: Record; + poolConfig?: DatabaseIssuePoolConfig; +}): void { + const poolConfig = params.poolConfig ?? DEFAULT_DATABASE_ISSUE_POOL_CONFIG; + const issueKind = getDatabaseIssueKind(params.error); + const message = + params.error instanceof Error + ? params.error.message + : String(params.error ?? 'unknown error'); + const contextJson = JSON.stringify({ + ...params.context, + dbPoolMax: poolConfig.max, + dbPoolMin: poolConfig.min, + dbConnectTimeoutMs: poolConfig.connectionTimeoutMillis, + issueKind, + }); + + params.logger.error( + `Database connectivity/pool issue during ${params.stage}: ${message}. Context: ${contextJson}`, + params.error instanceof Error ? params.error.stack : undefined, + ); +} + +export async function runWithDatabaseIssueLogging(params: { + logger: DatabaseLogger; + stage: string; + context: Record; + operation: () => Promise; + poolConfig?: DatabaseIssuePoolConfig; +}): Promise { + try { + return await params.operation(); + } catch (error) { + if (isDatabaseConnectionOrPoolError(error)) { + logDatabaseIssue({ + logger: params.logger, + stage: params.stage, + error, + context: params.context, + poolConfig: params.poolConfig, + }); + } + throw error; + } +}