diff --git a/migrations/20260316120000-token-pool-download-observability.js b/migrations/20260316120000-token-pool-download-observability.js new file mode 100644 index 0000000..0a42150 --- /dev/null +++ b/migrations/20260316120000-token-pool-download-observability.js @@ -0,0 +1,49 @@ +'use strict'; + +var dbm; +var type; +var seed; +var fs = require('fs'); +var path = require('path'); +var Promise; + +exports.setup = function(options, seedLink) { + dbm = options.dbmigrate; + type = dbm.dataType; + seed = seedLink; + Promise = options.Promise; +}; + +exports.up = function(db) { + var filePath = path.join(__dirname, 'sqls', '20260316120000-token-pool-download-observability-up.sql'); + return new Promise( function( resolve, reject ) { + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return reject(err); + console.log('received data: ' + data); + + resolve(data); + }); + }) + .then(function(data) { + return db.runSql(data); + }); +}; + +exports.down = function(db) { + var filePath = path.join(__dirname, 'sqls', '20260316120000-token-pool-download-observability-down.sql'); + return new Promise( function( resolve, reject ) { + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return reject(err); + console.log('received data: ' + data); + + resolve(data); + }); + }) + .then(function(data) { + return db.runSql(data); + }); +}; + +exports._meta = { + "version": 1 +}; diff --git a/migrations/sqls/20260316120000-token-pool-download-observability-down.sql b/migrations/sqls/20260316120000-token-pool-download-observability-down.sql new file mode 100644 index 0000000..f049279 --- /dev/null +++ b/migrations/sqls/20260316120000-token-pool-download-observability-down.sql @@ -0,0 +1,14 @@ +ALTER TABLE token_pool_download + DROP COLUMN progress, + DROP COLUMN stage, + DROP COLUMN attempt_count, + DROP COLUMN last_failure_reason, + DROP COLUMN last_failure_at, + DROP COLUMN failure_count, + DROP COLUMN error_reason, + DROP COLUMN 
failed_at, + DROP COLUMN completed_at, + DROP COLUMN last_heartbeat_at, + DROP COLUMN claimed_at, + DROP COLUMN updated_at, + DROP COLUMN created_at; diff --git a/migrations/sqls/20260316120000-token-pool-download-observability-up.sql b/migrations/sqls/20260316120000-token-pool-download-observability-up.sql new file mode 100644 index 0000000..a3adc62 --- /dev/null +++ b/migrations/sqls/20260316120000-token-pool-download-observability-up.sql @@ -0,0 +1,14 @@ +ALTER TABLE token_pool_download + ADD COLUMN created_at BIGINT NULL, + ADD COLUMN updated_at BIGINT NULL, + ADD COLUMN claimed_at BIGINT NULL, + ADD COLUMN last_heartbeat_at BIGINT NULL, + ADD COLUMN completed_at BIGINT NULL, + ADD COLUMN failed_at BIGINT NULL, + ADD COLUMN error_reason LONGTEXT NULL, + ADD COLUMN failure_count INT NOT NULL DEFAULT 0, + ADD COLUMN last_failure_at BIGINT NULL, + ADD COLUMN last_failure_reason LONGTEXT NULL, + ADD COLUMN attempt_count INT NOT NULL DEFAULT 0, + ADD COLUMN stage VARCHAR(64) NULL, + ADD COLUMN progress LONGTEXT NULL; diff --git a/package.json b/package.json index 9a3783f..b979d95 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,7 @@ "sentry:sourcemaps": "./node_modules/.bin/sentry-cli sourcemaps inject --org seize-ff --project allowlist-api ./dist && ./node_modules/.bin/sentry-cli sourcemaps upload --org seize-ff --project allowlist-api ./dist" }, "dependencies": { - "@6529-collections/allowlist-lib": "0.0.130", + "@6529-collections/allowlist-lib": "0.0.131", "@aws-sdk/client-secrets-manager": "^3.334.0", "@aws-sdk/client-sns": "^3.335.0", "@nestjs/axios": "^3.0.1", diff --git a/src/api/token-pool-download/model/token-pool-download-response-api.model.ts b/src/api/token-pool-download/model/token-pool-download-response-api.model.ts index c0c5f2b..7ce0e9e 100644 --- a/src/api/token-pool-download/model/token-pool-download-response-api.model.ts +++ b/src/api/token-pool-download/model/token-pool-download-response-api.model.ts @@ -1,5 +1,6 @@ import { ApiProperty 
} from '@nestjs/swagger'; import { TokenPoolDownloadStatus } from '../../../repository/token-pool-download/token-pool-download-status'; +import { TokenPoolDownloadStage } from '../../../repository/token-pool-download/token-pool-download-stage'; export class TokenPoolDownloadResponseApiModel { @ApiProperty({ @@ -32,8 +33,108 @@ export class TokenPoolDownloadResponseApiModel { }) readonly status: TokenPoolDownloadStatus; + @ApiProperty({ + description: 'Raw persisted status of the token pool download.', + }) + readonly rawStatus: TokenPoolDownloadStatus; + @ApiProperty({ description: 'Block number of the token pool consolidation.', }) readonly consolidateBlockNo: number | null; + + @ApiProperty({ + required: false, + type: Number, + description: 'Creation timestamp of the token pool download.', + }) + readonly createdAt?: number; + + @ApiProperty({ + required: false, + type: Number, + description: 'Last update timestamp of the token pool download.', + }) + readonly updatedAt?: number; + + @ApiProperty({ + required: false, + type: Number, + description: 'Claim timestamp of the token pool download.', + }) + readonly claimedAt?: number; + + @ApiProperty({ + required: false, + type: Number, + description: 'Last heartbeat timestamp of the token pool download.', + }) + readonly lastHeartbeatAt?: number; + + @ApiProperty({ + required: false, + type: Number, + description: 'Completion timestamp of the token pool download.', + }) + readonly completedAt?: number; + + @ApiProperty({ + required: false, + type: Number, + description: 'Failure timestamp of the token pool download.', + }) + readonly failedAt?: number; + + @ApiProperty({ + required: false, + enum: TokenPoolDownloadStage, + description: 'Current execution stage of the token pool download.', + }) + readonly stage?: TokenPoolDownloadStage; + + @ApiProperty({ + required: false, + type: Object, + description: 'Latest persisted progress details for the token pool download.', + }) + readonly progress?: Record; + + 
@ApiProperty({ + description: 'How many times the token pool download has been claimed.', + }) + readonly attemptCount: number; + + @ApiProperty({ + description: 'How many times the snapshot has failed before.', + }) + readonly failureCount: number; + + @ApiProperty({ + required: false, + type: Number, + description: 'Timestamp of the previous failure recorded for this snapshot.', + }) + readonly lastFailureAt?: number; + + @ApiProperty({ + required: false, + description: 'Reason for the previous failure recorded for this snapshot.', + }) + readonly lastFailureReason?: string | null; + + @ApiProperty({ + description: 'Whether the API considers the job stale.', + }) + readonly stale: boolean; + + @ApiProperty({ + required: false, + description: 'Failure or stale-state reason for the token pool download.', + }) + readonly errorReason?: string | null; + + @ApiProperty({ + description: 'Whether the UI may offer an in-place retry for this snapshot.', + }) + readonly retryable: boolean; } diff --git a/src/api/token-pool-download/token-pool-download.controller.ts b/src/api/token-pool-download/token-pool-download.controller.ts index ba24df8..4a1666f 100644 --- a/src/api/token-pool-download/token-pool-download.controller.ts +++ b/src/api/token-pool-download/token-pool-download.controller.ts @@ -62,4 +62,21 @@ export class TokenPoolDownloadController { tokenPoolId, }); } + + @ApiOperation({ + summary: 'Retry token pool download', + }) + @ApiOkResponse({ + type: TokenPoolDownloadResponseApiModel, + }) + @Post('token-pool/:tokenPoolId/retry') + async retry( + @Param('allowlistId') allowlistId: string, + @Param('tokenPoolId') tokenPoolId: string, + ): Promise { + return await this.tokenPoolDownloadService.retry({ + allowlistId, + tokenPoolId, + }); + } } diff --git a/src/api/token-pool-download/token-pool-download.service.spec.ts b/src/api/token-pool-download/token-pool-download.service.spec.ts new file mode 100644 index 0000000..ed6db69 --- /dev/null +++ 
b/src/api/token-pool-download/token-pool-download.service.spec.ts @@ -0,0 +1,242 @@ +import { TokenPoolDownloadService } from './token-pool-download.service'; +import { TokenPoolDownloadStatus } from '../../repository/token-pool-download/token-pool-download-status'; +import { TokenPoolDownloadStage } from '../../repository/token-pool-download/token-pool-download-stage'; + +describe('TokenPoolDownloadService', () => { + afterEach(() => { + delete process.env.TOKEN_POOL_DOWNLOAD_STALE_AFTER_MS; + jest.restoreAllMocks(); + }); + + it('maps stale downloads to failed while preserving the raw status', async () => { + process.env.TOKEN_POOL_DOWNLOAD_STALE_AFTER_MS = '1200000'; + jest.spyOn(Date, 'now').mockReturnValue(2_500_000); + const tokenPoolDownloadRepository = { + getByAllowlistId: jest.fn().mockResolvedValue([ + { + contract: '0xabc', + token_ids: '1-2', + token_pool_id: 'pool-1', + allowlist_id: 'allowlist-1', + block_no: 123, + consolidate_block_no: null, + status: TokenPoolDownloadStatus.CLAIMED, + created_at: BigInt(1000), + updated_at: BigInt(1000), + claimed_at: BigInt(1000), + last_heartbeat_at: BigInt(1000), + attempt_count: 2, + stage: TokenPoolDownloadStage.INDEXING_SINGLE, + progress: JSON.stringify({ latestFetchedBlockNo: 99 }), + }, + ]), + }; + const service = new TokenPoolDownloadService( + tokenPoolDownloadRepository as any, + {} as any, + {} as any, + {} as any, + {} as any, + ); + + const response = await service.getByAllowlistId('allowlist-1'); + + expect(response).toEqual([ + expect.objectContaining({ + tokenPoolId: 'pool-1', + status: TokenPoolDownloadStatus.FAILED, + rawStatus: TokenPoolDownloadStatus.CLAIMED, + stale: true, + attemptCount: 2, + failureCount: 0, + retryable: true, + progress: { latestFetchedBlockNo: 99 }, + }), + ]); + expect(response[0].errorReason).toContain( + 'has not reported progress for more than 20 minutes', + ); + }); + + it('does not stale legacy claimed rows without timestamps', async () => { + 
process.env.TOKEN_POOL_DOWNLOAD_STALE_AFTER_MS = '1200000'; + jest.spyOn(Date, 'now').mockReturnValue(2_500_000); + const tokenPoolDownloadRepository = { + getByAllowlistId: jest.fn().mockResolvedValue([ + { + contract: '0xabc', + token_pool_id: 'pool-legacy', + allowlist_id: 'allowlist-1', + block_no: 123, + consolidate_block_no: null, + status: TokenPoolDownloadStatus.CLAIMED, + }, + ]), + }; + const service = new TokenPoolDownloadService( + tokenPoolDownloadRepository as any, + {} as any, + {} as any, + {} as any, + {} as any, + ); + + const response = await service.getByAllowlistId('allowlist-1'); + + expect(response).toEqual([ + expect.objectContaining({ + tokenPoolId: 'pool-legacy', + status: TokenPoolDownloadStatus.CLAIMED, + rawStatus: TokenPoolDownloadStatus.CLAIMED, + stale: false, + retryable: false, + }), + ]); + }); + + it('retries a stale snapshot in place and records the stale failure history', async () => { + process.env.TOKEN_POOL_DOWNLOAD_STALE_AFTER_MS = '1200000'; + jest.spyOn(Date, 'now').mockReturnValue(2_500_000); + const entity = { + contract: '0xabc', + token_pool_id: 'pool-1', + allowlist_id: 'allowlist-1', + block_no: 123, + consolidate_block_no: null, + status: TokenPoolDownloadStatus.CLAIMED, + created_at: BigInt(1000), + updated_at: BigInt(1000), + claimed_at: BigInt(1000), + last_heartbeat_at: BigInt(1000), + attempt_count: 2, + failure_count: 1, + stage: TokenPoolDownloadStage.INDEXING_SINGLE, + }; + const tokenPoolDownloadRepository = { + getByAllowlistId: jest.fn(), + getByTokenPoolId: jest + .fn() + .mockResolvedValueOnce(entity) + .mockResolvedValueOnce({ + ...entity, + status: TokenPoolDownloadStatus.PENDING, + created_at: BigInt(2_500_000), + updated_at: BigInt(2_500_000), + last_heartbeat_at: BigInt(2_500_000), + failure_count: 2, + last_failure_reason: + 'Token pool download has not reported progress for more than 20 minutes while in stage INDEXING_SINGLE', + }), + recordFailureHistory: jest.fn().mockResolvedValue(undefined), 
+ }; + const allowlistOperationRepository = { + getAllowlistOperationsByCode: jest.fn().mockResolvedValue([ + { + params: JSON.stringify({ + id: 'pool-1', + contract: '0xabc', + blockNo: 123, + consolidateBlockNo: null, + }), + }, + ]), + }; + const tokenPoolAsyncDownloader = { + start: jest.fn().mockResolvedValue(undefined), + }; + const service = new TokenPoolDownloadService( + tokenPoolDownloadRepository as any, + {} as any, + {} as any, + allowlistOperationRepository as any, + tokenPoolAsyncDownloader as any, + ); + + const response = await service.retry({ + allowlistId: 'allowlist-1', + tokenPoolId: 'pool-1', + }); + + expect(tokenPoolDownloadRepository.recordFailureHistory).toHaveBeenCalled(); + expect(tokenPoolAsyncDownloader.start).toHaveBeenCalledWith({ + config: { + tokenPoolId: 'pool-1', + tokenIds: undefined, + contract: '0xabc', + blockNo: 123, + consolidateBlockNo: null, + allowlistId: 'allowlist-1', + }, + state: { + runsCount: 0, + startingBlocks: [], + }, + }); + expect(response).toEqual( + expect.objectContaining({ + tokenPoolId: 'pool-1', + status: TokenPoolDownloadStatus.PENDING, + failureCount: 2, + }), + ); + }); + + it('throws if a retry cannot refetch the token pool download row', async () => { + process.env.TOKEN_POOL_DOWNLOAD_STALE_AFTER_MS = '1200000'; + jest.spyOn(Date, 'now').mockReturnValue(2_500_000); + const entity = { + contract: '0xabc', + token_pool_id: 'pool-1', + allowlist_id: 'allowlist-1', + block_no: 123, + consolidate_block_no: null, + status: TokenPoolDownloadStatus.FAILED, + created_at: BigInt(1000), + updated_at: BigInt(1000), + claimed_at: BigInt(1000), + last_heartbeat_at: BigInt(1000), + attempt_count: 2, + failure_count: 1, + stage: TokenPoolDownloadStage.FAILED, + }; + const tokenPoolDownloadRepository = { + getByAllowlistId: jest.fn(), + getByTokenPoolId: jest + .fn() + .mockResolvedValueOnce(entity) + .mockResolvedValueOnce(null), + recordFailureHistory: jest.fn().mockResolvedValue(undefined), + }; + const 
allowlistOperationRepository = { + getAllowlistOperationsByCode: jest.fn().mockResolvedValue([ + { + params: JSON.stringify({ + id: 'pool-1', + contract: '0xabc', + blockNo: 123, + consolidateBlockNo: null, + }), + }, + ]), + }; + const tokenPoolAsyncDownloader = { + start: jest.fn().mockResolvedValue(undefined), + }; + const service = new TokenPoolDownloadService( + tokenPoolDownloadRepository as any, + {} as any, + {} as any, + allowlistOperationRepository as any, + tokenPoolAsyncDownloader as any, + ); + + await expect( + service.retry({ + allowlistId: 'allowlist-1', + tokenPoolId: 'pool-1', + }), + ).rejects.toThrow( + 'Token pool download with ID pool-1 no longer exists after retry', + ); + }); +}); diff --git a/src/api/token-pool-download/token-pool-download.service.ts b/src/api/token-pool-download/token-pool-download.service.ts index 17156f2..696b58b 100644 --- a/src/api/token-pool-download/token-pool-download.service.ts +++ b/src/api/token-pool-download/token-pool-download.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { BadRequestException, Injectable, NotFoundException } from '@nestjs/common'; import { TokenPoolDownloadEntity } from '../../repository/token-pool-download/token-pool-download.entity'; import { TokenPoolDownloadResponseApiModel } from './model/token-pool-download-response-api.model'; import { TokenPoolDownloadRepository } from '../../repository/token-pool-download/token-pool-download.repository'; @@ -7,6 +7,12 @@ import { TokenPoolTokenRepository } from '../../repository/token-pool-token/toke import { TokenPoolDownloadTokenPoolUniqueWalletsCountRequestApiModel } from './model/token-pool-download-token-pool-unique-wallets-count-request-api.model'; import { Pool } from '@6529-collections/allowlist-lib/app-types'; import { TokenPoolDownloadTokenResponseApiModel } from './model/token-pool-download-token-response-api.model'; +import { TokenPoolDownloadStatus } from 
'../../repository/token-pool-download/token-pool-download-status'; +import { bigInt2Number } from '../../app.utils'; +import { Time } from '../../time'; +import { AllowlistOperationRepository } from '../../repository/allowlist-operation/allowlist-operation.repository'; +import { TokenPoolAsyncDownloader } from '../../token-pool/token-pool-async-downloader'; +import { AllowlistOperationCode } from '@6529-collections/allowlist-lib/allowlist/allowlist-operation-code'; @Injectable() export class TokenPoolDownloadService { @@ -14,6 +20,8 @@ export class TokenPoolDownloadService { private readonly tokenPoolDownloadRepository: TokenPoolDownloadRepository, private readonly componentWinners: PhaseComponentWinnerRepository, private readonly tokenPoolTokenRepository: TokenPoolTokenRepository, + private readonly allowlistOperationRepository: AllowlistOperationRepository, + private readonly tokenPoolAsyncDownloader: TokenPoolAsyncDownloader, ) {} async getByAllowlistId( @@ -22,7 +30,7 @@ const entity = await this.tokenPoolDownloadRepository.getByAllowlistId( allowlistId, ); - return entity.map(this.entityToApiModel); + return entity.map((download) => this.entityToApiModel(download)); } async getTokenPoolUniqueWalletsCount({ @@ -86,17 +94,187 @@ return tokenPoolWallets.size; } + async retry({ + allowlistId, + tokenPoolId, + }: { + allowlistId: string; + tokenPoolId: string; + }): Promise<TokenPoolDownloadResponseApiModel> { + const entity = await this.tokenPoolDownloadRepository.getByTokenPoolId({ + allowlistId, + tokenPoolId, + }); + if (!entity) { + throw new NotFoundException( + `Token pool download with ID ${tokenPoolId} does not exist`, + ); + } + + const stale = this.isStale(entity); + if (entity.status !== TokenPoolDownloadStatus.FAILED && !stale) { + throw new BadRequestException( + `Token pool download with ID ${tokenPoolId} cannot be retried in status ${entity.status}`, + ); + } + + if (entity.status !== 
TokenPoolDownloadStatus.FAILED) { + await this.tokenPoolDownloadRepository.recordFailureHistory({ + tokenPoolId, + failureReason: this.getStaleReason(entity), + }); + } + + const operation = await this.getCreateTokenPoolOperation({ + allowlistId, + tokenPoolId, + }); + await this.tokenPoolAsyncDownloader.start({ + config: { + tokenPoolId, + tokenIds: operation.params.tokenIds, + contract: operation.params.contract, + blockNo: operation.params.blockNo, + consolidateBlockNo: operation.params.consolidateBlockNo ?? null, + allowlistId, + }, + state: { + runsCount: 0, + startingBlocks: [], + }, + }); + + const refreshed = await this.tokenPoolDownloadRepository.getByTokenPoolId({ + allowlistId, + tokenPoolId, + }); + if (!refreshed) { + throw new NotFoundException( + `Token pool download with ID ${tokenPoolId} no longer exists after retry`, + ); + } + return this.entityToApiModel(refreshed); + } + private entityToApiModel( entity: TokenPoolDownloadEntity, ): TokenPoolDownloadResponseApiModel { + const rawStatus = entity.status; + const stale = this.isStale(entity); + const failureCount = + entity.failure_count ?? + (rawStatus === TokenPoolDownloadStatus.FAILED ? 1 : 0); + const errorReason = + entity.error_reason ?? (stale ? this.getStaleReason(entity) : null); return { contract: entity.contract, tokenIds: entity.token_ids, tokenPoolId: entity.token_pool_id, allowlistId: entity.allowlist_id, blockNo: entity.block_no, - status: entity.status, + status: stale ? TokenPoolDownloadStatus.FAILED : rawStatus, + rawStatus, consolidateBlockNo: entity.consolidate_block_no, + createdAt: bigInt2Number(entity.created_at), + updatedAt: bigInt2Number(entity.updated_at), + claimedAt: bigInt2Number(entity.claimed_at), + lastHeartbeatAt: bigInt2Number(entity.last_heartbeat_at), + completedAt: bigInt2Number(entity.completed_at), + failedAt: bigInt2Number(entity.failed_at), + stage: entity.stage ?? 
undefined, + progress: this.parseProgress(entity.progress), + attemptCount: entity.attempt_count ?? 0, + failureCount, + lastFailureAt: + bigInt2Number(entity.last_failure_at) ?? bigInt2Number(entity.failed_at), + lastFailureReason: + entity.last_failure_reason ?? entity.error_reason ?? undefined, + stale, + errorReason, + retryable: stale || rawStatus === TokenPoolDownloadStatus.FAILED, + }; + } + + private parseProgress( + progress?: string | null, + ): Record<string, unknown> | undefined { + if (!progress) { + return undefined; + } + try { + return JSON.parse(progress); + } catch (e) { + return { raw: progress }; + } + } + + private isStale(entity: TokenPoolDownloadEntity): boolean { + if ( + ![TokenPoolDownloadStatus.PENDING, TokenPoolDownloadStatus.CLAIMED].includes( + entity.status, + ) + ) { + return false; + } + const referenceTime = + bigInt2Number(entity.last_heartbeat_at) ?? + bigInt2Number(entity.updated_at) ?? + bigInt2Number(entity.claimed_at) ?? + bigInt2Number(entity.created_at); + if (referenceTime === undefined) { + return false; + } + return Time.currentMillis() - referenceTime > this.getStaleAfterMillis(); + } + + private getStaleAfterMillis(): number { + return +( + process.env.TOKEN_POOL_DOWNLOAD_STALE_AFTER_MS ?? + Time.minutes(20).toMillis() + ); + } + + private getStaleReason(entity: TokenPoolDownloadEntity): string { + const minutes = Math.floor(this.getStaleAfterMillis() / 60000); + if (entity.status === TokenPoolDownloadStatus.PENDING) { + return `Token pool download has been pending for more than ${minutes} minutes without new progress`; + } + return `Token pool download has not reported progress for more than ${minutes} minutes while in stage ${entity.stage ?? 
'UNKNOWN'}`; + } + + private async getCreateTokenPoolOperation({ + allowlistId, + tokenPoolId, + }: { + allowlistId: string; + tokenPoolId: string; + }): Promise<{ + params: { + tokenIds?: string; + contract: string; + blockNo: number; + consolidateBlockNo?: number | null; + }; + }> { + const operations = + await this.allowlistOperationRepository.getAllowlistOperationsByCode({ + allowlistId, + code: AllowlistOperationCode.CREATE_TOKEN_POOL, + }); + const operation = operations.find((candidate) => { + try { + return JSON.parse(candidate.params)?.id === tokenPoolId; + } catch (e) { + return false; + } + }); + if (!operation) { + throw new NotFoundException( + `Create token pool operation for token pool ${tokenPoolId} does not exist`, + ); + } + return { + params: JSON.parse(operation.params), }; } diff --git a/src/app.utils.spec.ts b/src/app.utils.spec.ts new file mode 100644 index 0000000..f79b28f --- /dev/null +++ b/src/app.utils.spec.ts @@ -0,0 +1,41 @@ +import { bigInt2Number, stringifyError } from './app.utils'; + +describe('bigInt2Number', () => { + it('returns undefined for nullish values and non-finite coercions', () => { + expect(bigInt2Number(undefined)).toBeUndefined(); + expect(bigInt2Number(null)).toBeUndefined(); + expect(bigInt2Number('NaN')).toBeUndefined(); + expect(bigInt2Number('Infinity')).toBeUndefined(); + }); + + it('returns finite numeric coercions', () => { + expect(bigInt2Number('42')).toBe(42); + expect(bigInt2Number(BigInt(42))).toBe(42); + expect(bigInt2Number(42)).toBe(42); + }); + + it('formats nested step-aware errors with metadata and causes', () => { + const error = { + code: 'CREATE_TOKEN_POOL_CONSOLIDATION_FAILED', + message: 'Failed to consolidate token pool pool-1', + metadata: { + tokenPoolId: 'pool-1', + consolidateBlockNo: 24670600, + }, + cause: { + code: 'SEIZE_UPLOAD_JSON_FIELD_PARSE_FAILED', + message: 'Failed to parse JSON field "memes" from Seize /uploads row', + metadata: { + field: 'memes', + sourcePath: '/uploads', + 
rawValuePrefix: 'undefined', + }, + cause: new SyntaxError('Unexpected token u in JSON at position 0'), + }, + }; + + expect(stringifyError(error)).toBe( + '[CREATE_TOKEN_POOL_CONSOLIDATION_FAILED] Failed to consolidate token pool pool-1 (tokenPoolId="pool-1", consolidateBlockNo=24670600) Cause: [SEIZE_UPLOAD_JSON_FIELD_PARSE_FAILED] Failed to parse JSON field "memes" from Seize /uploads row (field="memes", sourcePath="/uploads", rawValuePrefix="undefined") Cause: Unexpected token u in JSON at position 0', + ); + }); +}); diff --git a/src/app.utils.ts b/src/app.utils.ts index efac07a..f5a829a 100644 --- a/src/app.utils.ts +++ b/src/app.utils.ts @@ -2,11 +2,14 @@ export const isValidMongoId = (id: string): boolean => { return /^[a-f\d]{24}$/i.test(id); }; -export function bigInt2Number(value?: bigint): number | undefined { - if (value) { - return Number(value); +export function bigInt2Number( + value?: bigint | number | string | null, +): number | undefined { + if (value === undefined || value === null) { + return undefined; } - return undefined; + const result = Number(value); + return Number.isFinite(result) ? result : undefined; } export function formatNumberRange( @@ -49,8 +52,35 @@ export function associateByToMap( export function stringifyError(error: any): string { if (typeof error === 'string') { return error; - } else if (error instanceof Error) { - return error.message; + } + + if (error instanceof Error || (error && typeof error === 'object')) { + const code = + typeof error.code === 'string' && error.code.length + ? `[${error.code}] ` + : ''; + const metadata = + error.metadata && typeof error.metadata === 'object' + ? Object.entries(error.metadata) + .filter(([, value]) => value !== undefined) + .map(([key, value]) => `${key}=${JSON.stringify(value)}`) + .join(', ') + : ''; + const details = metadata ? ` (${metadata})` : ''; + const cause = + error.cause !== undefined + ? 
` Cause: ${stringifyError(error.cause)}` + : ''; + const message = + typeof error.message === 'string' && error.message.length + ? error.message + : JSON.stringify(error); + + return `${code}${message}${details}${cause}`; + } + + if (error === undefined) { + return 'undefined'; } else { return JSON.stringify(error); } diff --git a/src/repository/token-pool-download/token-pool-download-stage.ts b/src/repository/token-pool-download/token-pool-download-stage.ts new file mode 100644 index 0000000..45b9710 --- /dev/null +++ b/src/repository/token-pool-download/token-pool-download-stage.ts @@ -0,0 +1,12 @@ +export enum TokenPoolDownloadStage { + PREPARING = 'PREPARING', + REQUEUED = 'REQUEUED', + CLAIMED = 'CLAIMED', + CHECKING_ALCHEMY = 'CHECKING_ALCHEMY', + INDEXING_SINGLE = 'INDEXING_SINGLE', + INDEXING_BATCH = 'INDEXING_BATCH', + BUILDING_TOKEN_OWNERS = 'BUILDING_TOKEN_OWNERS', + PERSISTING_RESULTS = 'PERSISTING_RESULTS', + COMPLETED = 'COMPLETED', + FAILED = 'FAILED', +} diff --git a/src/repository/token-pool-download/token-pool-download.entity.ts b/src/repository/token-pool-download/token-pool-download.entity.ts index 686f1da..c93c6cd 100644 --- a/src/repository/token-pool-download/token-pool-download.entity.ts +++ b/src/repository/token-pool-download/token-pool-download.entity.ts @@ -1,4 +1,5 @@ import { TokenPoolDownloadStatus } from './token-pool-download-status'; +import { TokenPoolDownloadStage } from './token-pool-download-stage'; export interface TokenPoolDownloadEntity { readonly contract: string; @@ -8,4 +9,17 @@ export interface TokenPoolDownloadEntity { readonly block_no: number; readonly consolidate_block_no: number | null; readonly status: TokenPoolDownloadStatus; + readonly created_at?: bigint; + readonly updated_at?: bigint; + readonly claimed_at?: bigint; + readonly last_heartbeat_at?: bigint; + readonly completed_at?: bigint; + readonly failed_at?: bigint; + readonly error_reason?: string | null; + readonly failure_count?: number; + readonly 
last_failure_at?: bigint; + readonly last_failure_reason?: string | null; + readonly attempt_count?: number; + readonly stage?: TokenPoolDownloadStage | null; + readonly progress?: string | null; } diff --git a/src/repository/token-pool-download/token-pool-download.repository.ts b/src/repository/token-pool-download/token-pool-download.repository.ts index 28aa5cd..2d6e138 100644 --- a/src/repository/token-pool-download/token-pool-download.repository.ts +++ b/src/repository/token-pool-download/token-pool-download.repository.ts @@ -3,11 +3,35 @@ import { DB } from '../db'; import { TokenPoolDownloadEntity } from './token-pool-download.entity'; import * as mariadb from 'mariadb'; import { TokenPoolDownloadStatus } from './token-pool-download-status'; +import { Time } from '../../time'; +import { TokenPoolDownloadStage } from './token-pool-download-stage'; @Injectable() export class TokenPoolDownloadRepository { constructor(private readonly db: DB) {} + private readonly selectColumns = `select contract, + token_ids, + token_pool_id, + allowlist_id, + block_no, + status, + consolidate_block_no, + created_at, + updated_at, + claimed_at, + last_heartbeat_at, + completed_at, + failed_at, + error_reason, + failure_count, + last_failure_at, + last_failure_reason, + attempt_count, + stage, + progress + from token_pool_download`; + async delete(tokenPoolId: string) { await this.db.none( `delete from token_pool_download where token_pool_id = ?`, @@ -17,7 +41,48 @@ export class TokenPoolDownloadRepository { async save(entity: TokenPoolDownloadEntity) { await this.db.none( - `insert into token_pool_download (allowlist_id, token_pool_id, contract, token_ids, block_no, status, consolidate_block_no) values (?, ?, ?, ?, ?, ?, ?)`, + `insert into token_pool_download ( + allowlist_id, + token_pool_id, + contract, + token_ids, + block_no, + status, + consolidate_block_no, + created_at, + updated_at, + claimed_at, + last_heartbeat_at, + completed_at, + failed_at, + error_reason, + 
failure_count, + last_failure_at, + last_failure_reason, + attempt_count, + stage, + progress + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + on duplicate key update + allowlist_id = values(allowlist_id), + contract = values(contract), + token_ids = values(token_ids), + block_no = values(block_no), + status = values(status), + consolidate_block_no = values(consolidate_block_no), + created_at = values(created_at), + updated_at = values(updated_at), + claimed_at = values(claimed_at), + last_heartbeat_at = values(last_heartbeat_at), + completed_at = values(completed_at), + failed_at = values(failed_at), + error_reason = values(error_reason), + failure_count = coalesce(token_pool_download.failure_count, 0), + last_failure_at = token_pool_download.last_failure_at, + last_failure_reason = token_pool_download.last_failure_reason, + attempt_count = values(attempt_count), + stage = values(stage), + progress = values(progress)`, [ entity.allowlist_id, entity.token_pool_id, @@ -26,6 +91,19 @@ export class TokenPoolDownloadRepository { entity.block_no, entity.status, entity.consolidate_block_no, + entity.created_at ?? null, + entity.updated_at ?? null, + entity.claimed_at ?? null, + entity.last_heartbeat_at ?? null, + entity.completed_at ?? null, + entity.failed_at ?? null, + entity.error_reason ?? null, + entity.failure_count ?? 0, + entity.last_failure_at ?? null, + entity.last_failure_reason ?? null, + entity.attempt_count ?? 0, + entity.stage ?? null, + entity.progress ?? null, ], ); } @@ -35,8 +113,7 @@ export class TokenPoolDownloadRepository { options?: { connection?: mariadb.Connection }, ): Promise { const entity = await this.db.one( - `select contract, token_ids, token_pool_id, allowlist_id, block_no, status, consolidate_block_no - from token_pool_download + `${this.selectColumns} where token_pool_download.token_pool_id = ? and token_pool_download.status = ? 
for update skip locked;`, [tokenPoolId, TokenPoolDownloadStatus.PENDING], @@ -45,38 +122,191 @@ if (!entity) { return null; } + const now = Time.currentMillis(); await this.db.none( `update token_pool_download - set status = ? + set status = ?, + error_reason = null, + claimed_at = ?, + last_heartbeat_at = ?, + updated_at = ?, + failed_at = null, + completed_at = null, + attempt_count = coalesce(attempt_count, 0) + 1, + stage = ? where token_pool_id = ?;`, - [TokenPoolDownloadStatus.CLAIMED, tokenPoolId], + [ + TokenPoolDownloadStatus.CLAIMED, + now, + now, + now, + TokenPoolDownloadStage.CLAIMED, + tokenPoolId, + ], options, ); - return { ...entity, status: TokenPoolDownloadStatus.CLAIMED }; + return { + ...entity, + status: TokenPoolDownloadStatus.CLAIMED, + error_reason: null, + claimed_at: BigInt(now), + last_heartbeat_at: BigInt(now), + updated_at: BigInt(now), + failed_at: null, + completed_at: null, + attempt_count: (entity.attempt_count ?? 0) + 1, + stage: TokenPoolDownloadStage.CLAIMED, + }; + } + + async getByTokenPoolId({ + allowlistId, + tokenPoolId, + }: { + allowlistId: string; + tokenPoolId: string; + }): Promise<TokenPoolDownloadEntity | null> { + return this.db.one( + `${this.selectColumns} + where token_pool_download.allowlist_id = ? + and token_pool_download.token_pool_id = ?`, + [allowlistId, tokenPoolId], + ); + } + + async requeue({ + tokenPoolId, + progress, + }: { + tokenPoolId: string; + progress?: string | null; + }) { + const now = Time.currentMillis(); + await this.db.none( + `update token_pool_download + set status = ?, + updated_at = ?, + last_heartbeat_at = ?, + stage = ?, + progress = ?, + error_reason = null, + failed_at = null + where token_pool_id = ?`, + [ + TokenPoolDownloadStatus.PENDING, + now, + now, + TokenPoolDownloadStage.REQUEUED, + progress ?? 
null, + tokenPoolId, + ], + ); + } + + async updateProgress({ + tokenPoolId, + stage, + progress, + }: { + tokenPoolId: string; + stage: TokenPoolDownloadStage; + progress?: string | null; + }) { + const now = Time.currentMillis(); + await this.db.none( + `update token_pool_download + set updated_at = ?, + last_heartbeat_at = ?, + stage = ?, + progress = ? + where token_pool_id = ?`, + [now, now, stage, progress ?? null, tokenPoolId], + ); + } + + async recordFailureHistory({ + tokenPoolId, + failureReason, + }: { + tokenPoolId: string; + failureReason: string; + }) { + const now = Time.currentMillis(); + await this.db.none( + `update token_pool_download + set failure_count = coalesce(failure_count, 0) + 1, + last_failure_at = ?, + last_failure_reason = ? + where token_pool_id = ?`, + [now, failureReason, tokenPoolId], + ); } async changeStatusToCompleted({ tokenPoolId, connection, + progress, }: { tokenPoolId: string; connection?: mariadb.Connection; + progress?: string | null; }) { + const now = Time.currentMillis(); await this.db.none( `UPDATE token_pool_download - SET status = ? + SET status = ?, + stage = ?, + progress = ?, + error_reason = null, + updated_at = ?, + last_heartbeat_at = ?, + completed_at = ?, + failed_at = null WHERE token_pool_id = ?`, - [TokenPoolDownloadStatus.COMPLETED, tokenPoolId], + [ + TokenPoolDownloadStatus.COMPLETED, + TokenPoolDownloadStage.COMPLETED, + progress ?? null, + now, + now, + now, + tokenPoolId, + ], { connection }, ); } - async changeStatusToError({ tokenPoolId }: { tokenPoolId: string }) { + async changeStatusToError({ + tokenPoolId, + errorReason, + }: { + tokenPoolId: string; + errorReason?: string; + }) { + const now = Time.currentMillis(); await this.db.none( `UPDATE token_pool_download - SET status = ? + SET status = ?, + stage = ?, + error_reason = ?, + failure_count = coalesce(failure_count, 0) + 1, + last_failure_at = ?, + last_failure_reason = ?, + updated_at = ?, + last_heartbeat_at = ?, + failed_at = ? 
WHERE token_pool_id = ?`, - [TokenPoolDownloadStatus.FAILED, tokenPoolId], + [ + TokenPoolDownloadStatus.FAILED, + TokenPoolDownloadStage.FAILED, + errorReason ?? null, + now, + errorReason ?? null, + now, + now, + now, + tokenPoolId, + ], ); } @@ -84,8 +314,7 @@ export class TokenPoolDownloadRepository { allowlistId: string, ): Promise { return this.db.many( - `select contract, token_ids, token_pool_id, allowlist_id, block_no, status, consolidate_block_no - from token_pool_download + `${this.selectColumns} where token_pool_download.allowlist_id = ?`, [allowlistId], ); diff --git a/src/token-pool/token-pool-async-downloader.ts b/src/token-pool/token-pool-async-downloader.ts index a64dc54..bf66eeb 100644 --- a/src/token-pool/token-pool-async-downloader.ts +++ b/src/token-pool/token-pool-async-downloader.ts @@ -22,14 +22,21 @@ export class TokenPoolAsyncDownloader { consolidateBlockNo, } = config; - await this.tokenPoolDownloaderService.prepare({ - contract, - tokenIds, - tokenPoolId, - allowlistId, - blockNo, - consolidateBlockNo, - }); + if (!state.runsCount && !state.startingBlocks.length) { + await this.tokenPoolDownloaderService.prepare({ + contract, + tokenIds, + tokenPoolId, + allowlistId, + blockNo, + consolidateBlockNo, + }); + } else { + await this.tokenPoolDownloaderService.requeue({ + tokenPoolId, + state, + }); + } const snsTopicArn = process.env.SNS_TOKEN_POOL_DOWNLOADER_TOPIC_ARN; if (snsTopicArn) { await this.snsService.publishMessage({ diff --git a/src/token-pool/token-pool-downloader.service.ts b/src/token-pool/token-pool-downloader.service.ts index 9fd9e7d..cb914af 100644 --- a/src/token-pool/token-pool-downloader.service.ts +++ b/src/token-pool/token-pool-downloader.service.ts @@ -22,6 +22,10 @@ import { TokenPoolDownloaderParams, TokenPoolDownloaderParamsState, } from './token-pool.types'; +import { TokenPoolDownloadStage } from '../repository/token-pool-download/token-pool-download-stage'; +import { stringifyError } from '../app.utils'; + +type 
TokenPoolExecutionPath = 'FAST' | 'SLOW'; @Injectable() export class TokenPoolDownloaderService { @@ -51,7 +55,7 @@ export class TokenPoolDownloaderService { readonly blockNo: number; readonly consolidateBlockNo: number | null; }) { - await this.tokenPoolDownloadRepository.delete(tokenPoolId); + const now = BigInt(Time.currentMillis()); await this.tokenPoolDownloadRepository.save({ contract, token_ids: tokenIds, @@ -60,6 +64,37 @@ export class TokenPoolDownloaderService { block_no: blockNo, consolidate_block_no: consolidateBlockNo ?? null, status: TokenPoolDownloadStatus.PENDING, + created_at: now, + updated_at: now, + claimed_at: null, + last_heartbeat_at: now, + completed_at: null, + failed_at: null, + error_reason: null, + attempt_count: 0, + stage: TokenPoolDownloadStage.PREPARING, + progress: this.serializeProgress({ + contract, + tokenIds: tokenIds ?? null, + blockNo, + consolidateBlockNo: consolidateBlockNo ?? null, + }), + }); + } + + async requeue({ + tokenPoolId, + state, + }: { + tokenPoolId: string; + state: TokenPoolDownloaderParamsState; + }) { + await this.tokenPoolDownloadRepository.requeue({ + tokenPoolId, + progress: this.serializeProgress({ + runsCount: state.runsCount, + startingBlocks: state.startingBlocks, + }), }); } @@ -69,7 +104,9 @@ export class TokenPoolDownloaderService { const connection = await this.db.getConnection(); try { await connection.beginTransaction(); - const entity = await this.tokenPoolDownloadRepository.claim(tokenPoolId); + const entity = await this.tokenPoolDownloadRepository.claim(tokenPoolId, { + connection, + }); await connection.commit(); return entity; } catch (e) { @@ -82,7 +119,7 @@ export class TokenPoolDownloaderService { async start({ config, state }: TokenPoolDownloaderParams): Promise<{ continue: boolean; - entity: TokenPoolDownloadEntity; + entity: TokenPoolDownloadEntity | null; state: TokenPoolDownloaderParamsState; error?: string; }> { @@ -94,65 +131,97 @@ export class TokenPoolDownloaderService { ); 
return { continue: false, entity: null, state }; } - const { startingBlocks } = state; - this.logger.log( - `Claimed tokenpool download with id ${tokenPoolId}. Starting...`, - ); - const doableThroughAlchemy = await this.attemptThroughAlchemy(entity); - this.logger.log(`Asking for single type latest block...`); - const singleTypeLatestBlock = - await this.transferRepository.getLatestTransferBlockNo({ - contract: entity.contract, - transferType: 'single', - }); - this.logger.log(`Single type latest block is ${singleTypeLatestBlock}`); - this.logger.log(`Asking for batch type latest block...`); - const batchTypeLatestBlock = - await this.transferRepository.getLatestTransferBlockNo({ - contract: entity.contract, - transferType: 'batch', - }); - if ( - !!startingBlocks.length && - startingBlocks.at(-1)?.single === singleTypeLatestBlock && - startingBlocks.at(-1)?.batch === batchTypeLatestBlock - ) { - this.logger.log(`Already processed this block. Erroring...`); - await this.tokenPoolDownloadRepository.changeStatusToError({ + try { + const { startingBlocks } = state; + this.logger.log( + `Claimed tokenpool download with id ${tokenPoolId}. 
Starting...`, + ); + await this.updateProgress({ tokenPoolId, + stage: TokenPoolDownloadStage.CHECKING_ALCHEMY, + progress: { + runsCount: state.runsCount, + startingBlocks, + }, }); - return { - continue: false, - entity, - state, - error: `Tried to reprocess already processed block for contract ${entity.contract}`, - }; - } + const doableThroughAlchemy = await this.attemptThroughAlchemy(entity); + this.logger.log(`Asking for single type latest block...`); + const singleTypeLatestBlock = + await this.transferRepository.getLatestTransferBlockNo({ + contract: entity.contract, + transferType: 'single', + }); + this.logger.log(`Single type latest block is ${singleTypeLatestBlock}`); - this.logger.log(`Old single type block: ${startingBlocks.at(-1)?.single}`); - this.logger.log(`New single type block: ${singleTypeLatestBlock}`); - this.logger.log(`Old batch type block: ${startingBlocks.at(-1)?.batch}`); - this.logger.log(`New batch type block: ${batchTypeLatestBlock}`); + this.logger.log(`Asking for batch type latest block...`); + const batchTypeLatestBlock = + await this.transferRepository.getLatestTransferBlockNo({ + contract: entity.contract, + transferType: 'batch', + }); + if ( + !!startingBlocks.length && + startingBlocks.at(-1)?.single === singleTypeLatestBlock && + startingBlocks.at(-1)?.batch === batchTypeLatestBlock + ) { + const error = `Tried to reprocess already processed block for contract ${entity.contract}`; + this.logger.log(`Already processed this block. 
Erroring...`); + await this.tokenPoolDownloadRepository.changeStatusToError({ + tokenPoolId, + errorReason: error, + }); + return { + continue: false, + entity, + state, + error, + }; + } - startingBlocks.push({ - single: singleTypeLatestBlock, - batch: batchTypeLatestBlock, - }); + this.logger.log(`Old single type block: ${startingBlocks.at(-1)?.single}`); + this.logger.log(`New single type block: ${singleTypeLatestBlock}`); + this.logger.log(`Old batch type block: ${startingBlocks.at(-1)?.batch}`); + this.logger.log(`New batch type block: ${batchTypeLatestBlock}`); - this.logger.log(`Batch type latest block is ${batchTypeLatestBlock}`); - this.logger.log( - `Starting to index with${doableThroughAlchemy ? `` : `out`} Alchemy`, - ); - if (doableThroughAlchemy) { - return this.runOperationsAndFinishUp({ entity, state }); - } else { + startingBlocks.push({ + single: singleTypeLatestBlock, + batch: batchTypeLatestBlock, + }); + + this.logger.log(`Batch type latest block is ${batchTypeLatestBlock}`); + this.logger.log( + `Starting to index with${doableThroughAlchemy ? `` : `out`} Alchemy`, + ); + if (doableThroughAlchemy) { + return this.runOperationsAndFinishUp({ + entity, + state, + executionPath: 'FAST', + }); + } return await this.doWithoutAlchemy( entity, singleTypeLatestBlock, state, batchTypeLatestBlock, ); + } catch (e) { + const error = stringifyError(e); + this.logger.error( + `Tokenpool download with id ${tokenPoolId} failed`, + error, + ); + await this.tokenPoolDownloadRepository.changeStatusToError({ + tokenPoolId, + errorReason: error, + }); + return { + continue: false, + entity, + state, + error, + }; } } @@ -166,6 +235,20 @@ export class TokenPoolDownloaderService { await this.allowlistCreator.etherscanService.getContractSchema({ contractAddress: entity.contract, }); + await this.updateProgress({ + tokenPoolId: entity.token_pool_id, + stage: + schema === ContractSchema.ERC1155 + ? 
TokenPoolDownloadStage.INDEXING_BATCH + : TokenPoolDownloadStage.INDEXING_SINGLE, + progress: { + executionPath: 'SLOW', + schema, + latestSingleBlockNo: singleTypeLatestBlock, + latestBatchBlockNo: batchTypeLatestBlock, + targetBlockNo: entity.block_no, + }, + }); this.logger.log(`${entity.contract} schema: ${schema}`); if ([ContractSchema.ERC721, ContractSchema.ERC721Old].includes(schema)) { return this.doTransferTypeSingle( @@ -216,7 +299,11 @@ export class TokenPoolDownloaderService { if (job.continue) { return job; } - return this.runOperationsAndFinishUp({ entity, state }); + return this.runOperationsAndFinishUp({ + entity, + state, + executionPath: 'SLOW', + }); }); } @@ -237,7 +324,11 @@ export class TokenPoolDownloaderService { if (job.continue || skipFinishUp) { return job; } - return this.runOperationsAndFinishUp({ entity, state }); + return this.runOperationsAndFinishUp({ + entity, + state, + executionPath: 'SLOW', + }); }); } @@ -281,6 +372,20 @@ export class TokenPoolDownloaderService { this.logger.log( `Starting fetch transfers through etherscan ${JSON.stringify(params)}`, ); + await this.updateProgress({ + tokenPoolId: entity.token_pool_id, + stage: + transferType === 'single' + ? TokenPoolDownloadStage.INDEXING_SINGLE + : TokenPoolDownloadStage.INDEXING_BATCH, + progress: { + executionPath: 'SLOW', + transferType, + currentBlockNo: latestBlockNo, + startingBlock: latestBlockNo, + targetBlockNo: entity.block_no, + }, + }); for await (const transfers of this.allowlistCreator.etherscanService.getTransfers( params, )) { @@ -291,6 +396,28 @@ export class TokenPoolDownloaderService { entity.contract, transfers, ); + await this.updateProgress({ + tokenPoolId: entity.token_pool_id, + stage: + transferType === 'single' + ? 
TokenPoolDownloadStage.INDEXING_SINGLE + : TokenPoolDownloadStage.INDEXING_BATCH, + progress: { + executionPath: 'SLOW', + transferType, + startingBlock: latestBlockNo, + currentBlockNo: transfers.reduce( + (acc, transfer) => Math.max(acc, transfer.blockNumber), + latestBlockNo, + ), + latestFetchedBlockNo: transfers.reduce( + (acc, transfer) => Math.max(acc, transfer.blockNumber), + latestBlockNo, + ), + targetBlockNo: entity.block_no, + transfersPersisted: transfers.length, + }, + }); if (this.isTimeUp(start)) { this.logger.log( `Exceeded the 5 minute timeout. Marking as in progress and rescheduling...`, @@ -298,6 +425,20 @@ export class TokenPoolDownloaderService { return { continue: true, entity, state }; } } + await this.updateProgress({ + tokenPoolId: entity.token_pool_id, + stage: + transferType === 'single' + ? TokenPoolDownloadStage.INDEXING_SINGLE + : TokenPoolDownloadStage.INDEXING_BATCH, + progress: { + executionPath: 'SLOW', + transferType, + startingBlock: latestBlockNo, + currentBlockNo: entity.block_no, + targetBlockNo: entity.block_no, + }, + }); return { continue: false, entity, state }; } @@ -308,9 +449,11 @@ export class TokenPoolDownloaderService { private async runOperationsAndFinishUp({ entity, state, + executionPath, }: { entity: TokenPoolDownloadEntity; state: TokenPoolDownloaderParamsState; + executionPath: TokenPoolExecutionPath; }): Promise<{ continue: boolean; entity: TokenPoolDownloadEntity; @@ -321,6 +464,18 @@ export class TokenPoolDownloaderService { this.logger.log( `Running operations and finishing up for tokenpool ${tokenPoolId}`, ); + await this.updateProgress({ + tokenPoolId, + stage: TokenPoolDownloadStage.BUILDING_TOKEN_OWNERS, + progress: { + executionPath, + source: 'allowlist-lib', + contract: entity.contract, + currentBlockNo: entity.block_no, + blockNo: entity.block_no, + targetBlockNo: entity.block_no, + }, + }); const mockAllowlistId = randomUUID(); const mockPoolId = randomUUID(); const allowlistOpParams: 
DescribableEntity = { @@ -353,6 +508,7 @@ export class TokenPoolDownloaderService { console.error(`Persisting state for token pool ${tokenPoolId} failed`, e); await this.tokenPoolDownloadRepository.changeStatusToError({ tokenPoolId, + errorReason: stringifyError(e), }); throw e; } @@ -363,20 +519,41 @@ export class TokenPoolDownloaderService { ); try { await con.beginTransaction(); + const ownerships = Object.values(allowlistState.tokenPools).flatMap( + (tokenPool) => + tokenPool.tokens.map((token) => ({ + ownership: token, + tokenPoolId: entity.token_pool_id, + })), + ); + await this.updateProgress({ + tokenPoolId, + stage: TokenPoolDownloadStage.PERSISTING_RESULTS, + progress: { + executionPath, + currentBlockNo: entity.block_no, + targetBlockNo: entity.block_no, + tokenOwnershipsCount: ownerships.length, + uniqueWalletsCount: new Set( + ownerships.map((ownership) => ownership.ownership.owner), + ).size, + }, + }); await this.persistTokenOwnerships({ - ownerships: Object.values(allowlistState.tokenPools).flatMap( - (tokenPool) => - tokenPool.tokens.map((token) => ({ - ownership: token, - tokenPoolId: entity.token_pool_id, - })), - ), + ownerships, allowlistId: entity.allowlist_id, connection: con, }); await this.tokenPoolDownloadRepository.changeStatusToCompleted({ tokenPoolId, connection: con, + progress: this.serializeProgress({ + executionPath, + tokenOwnershipsCount: ownerships.length, + uniqueWalletsCount: new Set( + ownerships.map((ownership) => ownership.ownership.owner), + ).size, + }), }); await con.commit(); this.logger.log(`Finished tokenpool download with id ${tokenPoolId}.`); @@ -391,6 +568,7 @@ export class TokenPoolDownloaderService { console.error(`Persisting state for token pool ${tokenPoolId} failed`, e); await this.tokenPoolDownloadRepository.changeStatusToError({ tokenPoolId, + errorReason: stringifyError(e), }); return { continue: false, @@ -435,4 +613,27 @@ export class TokenPoolDownloaderService { ); await 
this.tokenPoolTokenRepository.insert(entities, { connection }); } + + private serializeProgress(progress?: Record): string | null { + if (!progress) { + return null; + } + return JSON.stringify(progress); + } + + private async updateProgress({ + tokenPoolId, + stage, + progress, + }: { + tokenPoolId: string; + stage: TokenPoolDownloadStage; + progress?: Record; + }) { + await this.tokenPoolDownloadRepository.updateProgress({ + tokenPoolId, + stage, + progress: this.serializeProgress(progress), + }); + } } diff --git a/yarn.lock b/yarn.lock index ecb913d..6934df4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10,10 +10,10 @@ d "1" es5-ext "^0.10.47" -"@6529-collections/allowlist-lib@0.0.130": - version "0.0.130" - resolved "https://npm.pkg.github.com/download/@6529-collections/allowlist-lib/0.0.130/ad5a9935803deb85ebb7cd6937683b9439647545#ad5a9935803deb85ebb7cd6937683b9439647545" - integrity sha512-2NdkI2WaYCp3Kx49QLHUHWz0Kx3+L15P1/QcU2SkVsaqeC0eL/IoC/kCF7tNP8aR8dvdMVdOmQs72LWRwTO7RQ== +"@6529-collections/allowlist-lib@0.0.131": + version "0.0.131" + resolved "https://npm.pkg.github.com/download/@6529-collections/allowlist-lib/0.0.131/6517eff53987023aa313c09b33626c1f2901ec58#6517eff53987023aa313c09b33626c1f2901ec58" + integrity sha512-iIF2hzbvW/vPAh6EEmc0kt5IYdW0bQftFEzLdWcsZlwhQb2G6DlODkrLwAr6qpT/iXdXwMYZl3eJ07iKA0AhMg== dependencies: alchemy-sdk "^2.10.1" axios "^1.6.2"