From ddf5ba73b508a3edcae1e4cd999d57ee09883ec3 Mon Sep 17 00:00:00 2001 From: GelatoGenesis Date: Fri, 13 Mar 2026 10:23:19 +0200 Subject: [PATCH] better tokenpool downloading Signed-off-by: GelatoGenesis --- README.md | 4 +- ...allowlist-lib-execution-context.service.ts | 35 +++ ...allowlist-lib-log-listener.service.spec.ts | 27 ++ .../allowlist-lib-log-listener.service.ts | 21 +- .../allowlist-lib-seize-timeout-patch.spec.ts | 141 +++++++++ .../allowlist-lib-seize-timeout-patch.ts | 273 ++++++++++++++++++ src/allowlist-lib/allowlist-lib.module.ts | 30 +- .../token-pool-downloader.service.spec.ts | 54 ++++ .../token-pool-downloader.service.ts | 40 ++- 9 files changed, 607 insertions(+), 18 deletions(-) create mode 100644 src/allowlist-lib/allowlist-lib-execution-context.service.ts create mode 100644 src/allowlist-lib/allowlist-lib-log-listener.service.spec.ts create mode 100644 src/allowlist-lib/allowlist-lib-seize-timeout-patch.spec.ts create mode 100644 src/allowlist-lib/allowlist-lib-seize-timeout-patch.ts create mode 100644 src/token-pool/token-pool-downloader.service.spec.ts diff --git a/README.md b/README.md index dfdfb2d..4adf653 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ ALLOWLIST_DB_PASSWORD=allowlist ALLOWLIST_ETHERSCAN_API_KEY= ALLOWLIST_SEIZE_API_PATH= (don't put a slash in the end. https://api.6529.io/api for example) ALLOWLIST_SEIZE_API_KEY= (can omit if using only public endpoints) +ALLOWLIST_SEIZE_METADATA_TIMEOUT_MS=10000 (optional) +ALLOWLIST_ARWEAVE_DOWNLOAD_TIMEOUT_MS=30000 (optional) ``` To install app dependencies run `yarn` @@ -65,4 +67,4 @@ In production the app is ran in 3 lambas: 1. API lambda - Serves all API requests (entrypoint: `src/api-lambda.ts/handle`) 2. Worker lambda - Does the actual final allowlist creation (entrypoint: `src/worker-lambda.ts/handle`) -3. Tokenpool downloader lambda - Helps to get aggregated tokenpool data needed for worker lambda (entrypoint: `src/token-pool-downloader-lambda.ts/handle`) \ No newline at end of file +3. Tokenpool downloader lambda - Helps to get aggregated tokenpool data needed for worker lambda (entrypoint: `src/token-pool-downloader-lambda.ts/handle`) diff --git a/src/allowlist-lib/allowlist-lib-execution-context.service.ts b/src/allowlist-lib/allowlist-lib-execution-context.service.ts new file mode 100644 index 0000000..17d08fa --- /dev/null +++ b/src/allowlist-lib/allowlist-lib-execution-context.service.ts @@ -0,0 +1,35 @@ +import { Injectable } from '@nestjs/common'; +import { AsyncLocalStorage } from 'async_hooks'; + +export interface AllowlistLibExecutionContext { + readonly tokenPoolId: string; + readonly contract: string; + readonly blockNo: number; + readonly consolidateBlockNo: number | null; +} + +@Injectable() +export class AllowlistLibExecutionContextService { + private readonly storage = + new AsyncLocalStorage(); + + run( + context: AllowlistLibExecutionContext, + callback: () => Promise | T, + ): Promise | T { + return this.storage.run(context, callback); + } + + getLogPrefix(): string { + const context = this.storage.getStore(); + if (!context) { + return ''; + } + return [ + `tokenPoolId=${context.tokenPoolId}`, + `contract=${context.contract}`, + `blockNo=${context.blockNo}`, + `consolidateBlockNo=${context.consolidateBlockNo ?? 'null'}`, + ].join(' '); + } +} diff --git a/src/allowlist-lib/allowlist-lib-log-listener.service.spec.ts b/src/allowlist-lib/allowlist-lib-log-listener.service.spec.ts new file mode 100644 index 0000000..609b3d2 --- /dev/null +++ b/src/allowlist-lib/allowlist-lib-log-listener.service.spec.ts @@ -0,0 +1,27 @@ +import { Logger } from '@nestjs/common'; +import { AllowlistLibExecutionContextService } from './allowlist-lib-execution-context.service'; +import { AllowlistLibLogListener } from './allowlist-lib-log-listener.service'; + +describe('AllowlistLibLogListener', () => { + it('prefixes messages with token pool context when present', () => { + const executionContext = new AllowlistLibExecutionContextService(); + const listener = new AllowlistLibLogListener(executionContext); + const logSpy = jest.spyOn(Logger.prototype, 'log').mockImplementation(); + + executionContext.run( + { + tokenPoolId: 'pool-1', + contract: '0xabc', + blockNo: 42, + consolidateBlockNo: 99, + }, + () => listener.info('Downloading from URL: https://arweave.net/example'), + ); + + expect(logSpy).toHaveBeenCalledWith( + '[tokenPoolId=pool-1 contract=0xabc blockNo=42 consolidateBlockNo=99] Downloading from URL: https://arweave.net/example', + ); + + logSpy.mockRestore(); + }); +}); diff --git a/src/allowlist-lib/allowlist-lib-log-listener.service.ts b/src/allowlist-lib/allowlist-lib-log-listener.service.ts index 319117d..85ac64e 100644 --- a/src/allowlist-lib/allowlist-lib-log-listener.service.ts +++ b/src/allowlist-lib/allowlist-lib-log-listener.service.ts @@ -1,23 +1,36 @@ import { Injectable, Logger } from '@nestjs/common'; import { LogListener } from '@6529-collections/allowlist-lib/logging/logging-emitter'; +import { AllowlistLibExecutionContextService } from './allowlist-lib-execution-context.service'; const nestJsLogger = new Logger('allowlist-lib'); @Injectable() export class AllowlistLibLogListener implements LogListener { + constructor( + private readonly executionContext: AllowlistLibExecutionContextService, + ) {} + debug(message: string): void { - nestJsLogger.debug(message); + nestJsLogger.debug(this.withContext(message)); } error(message: string): void { - nestJsLogger.error(message); + nestJsLogger.error(this.withContext(message)); } info(message: string): void { - nestJsLogger.log(message); + nestJsLogger.log(this.withContext(message)); } warn(message: string): void { - nestJsLogger.warn(message); + nestJsLogger.warn(this.withContext(message)); + } + + private withContext(message: string) { + const prefix = this.executionContext.getLogPrefix(); + if (!prefix) { + return message; + } + return `[${prefix}] ${message}`; } } diff --git a/src/allowlist-lib/allowlist-lib-seize-timeout-patch.spec.ts b/src/allowlist-lib/allowlist-lib-seize-timeout-patch.spec.ts new file mode 100644 index 0000000..bc46ce9 --- /dev/null +++ b/src/allowlist-lib/allowlist-lib-seize-timeout-patch.spec.ts @@ -0,0 +1,141 @@ +import { AllowlistCreator } from '@6529-collections/allowlist-lib/allowlist/allowlist-creator'; +import { + LoggerFactory, + LogListener, +} from '@6529-collections/allowlist-lib/logging/logging-emitter'; +import { SeizeApi } from '@6529-collections/allowlist-lib/services/seize/seize.api'; +import axios from 'axios'; +import { + parseTimeoutMs, + patchAllowlistCreatorSeizeApi, +} from './allowlist-lib-seize-timeout-patch'; + +const minimalTdhUploadContents = + 'wallet,ens,consolidation_key,consolidation_display,block,date,total_balance,boosted_tdh,tdh_rank,tdh,tdh__raw,boost,memes_balance,unique_memes,memes_cards_sets,memes_cards_sets_minus1,memes_cards_sets_minus2,genesis,nakamoto,boosted_memes_tdh,memes_tdh,memes_tdh__raw,tdh_rank_memes,memes,gradients_balance,boosted_gradients_tdh,gradients_tdh,gradients_tdh__raw,tdh_rank_gradients,gradients,nextgen_balance,boosted_nextgen_tdh,nextgen_tdh,nextgen_tdh__raw,nextgen\n' + + '0xAbC,test.eth,key,display,1,20240101,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,1,[],0,0,0,0,1,[],0,0,0,0,[]\n'; + +class TestLogListener implements LogListener { + readonly infoMessages: string[] = []; + readonly errorMessages: string[] = []; + + debug = (): void => {}; + + error = (message: string): void => { + this.errorMessages.push(message); + }; + + info = (message: string): void => { + this.infoMessages.push(message); + }; + + warn = (): void => {}; +} + +describe('patchAllowlistCreatorSeizeApi', () => { + const axiosGetSpy = jest.spyOn(axios, 'get'); + const axiosIsAxiosErrorSpy = jest.spyOn(axios, 'isAxiosError'); + + beforeEach(() => { + jest.resetAllMocks(); + axiosIsAxiosErrorSpy.mockImplementation( + (value: unknown): value is Error => + !!value && typeof value === 'object' && 'isAxiosError' in value, + ); + }); + + afterAll(() => { + axiosGetSpy.mockRestore(); + axiosIsAxiosErrorSpy.mockRestore(); + }); + + it('falls back to the configured default timeout when env is invalid', () => { + expect(parseTimeoutMs(undefined, 10000)).toBe(10000); + expect(parseTimeoutMs('not-a-number', 10000)).toBe(10000); + expect(parseTimeoutMs('5000', 10000)).toBe(5000); + }); + + it('surfaces Seize metadata timeouts with the configured timeout value', async () => { + const listener = new TestLogListener(); + const seizeApi = new SeizeApi({} as any, 'https://www.example.com/api'); + const allowlistCreator = { seizeApi } as unknown as AllowlistCreator; + + patchAllowlistCreatorSeizeApi({ + allowlistCreator, + loggerFactory: new LoggerFactory(listener), + seizeMetadataTimeoutMs: 10, + arweaveDownloadTimeoutMs: 25, + }); + + axiosGetSpy.mockRejectedValueOnce({ + isAxiosError: true, + code: 'ECONNABORTED', + message: 'timeout of 10ms exceeded', + }); + + await expect( + (seizeApi as any).getDataForBlock({ path: '/uploads', blockId: 123 }), + ).rejects.toThrow( + 'Seize metadata fetch timed out after 10ms for https://www.example.com/api/uploads?block=123&page_size=1', + ); + expect(listener.infoMessages).toEqual( + expect.arrayContaining([ + expect.stringContaining( + 'Fetching Seize metadata from https://www.example.com/api/uploads?block=123&page_size=1', + ), + ]), + ); + }); + + it('logs and falls back to the next Arweave gateway after a timeout', async () => { + const listener = new TestLogListener(); + const seizeApi = new SeizeApi({} as any, 'https://www.example.com/api'); + const allowlistCreator = { seizeApi } as unknown as AllowlistCreator; + + patchAllowlistCreatorSeizeApi({ + allowlistCreator, + loggerFactory: new LoggerFactory(listener), + seizeMetadataTimeoutMs: 10, + arweaveDownloadTimeoutMs: 25, + }); + + axiosGetSpy + .mockResolvedValueOnce({ + data: { + data: [ + { + block: 17531454, + url: 'https://arweave.net/abc123', + }, + ], + }, + }) + .mockRejectedValueOnce({ + isAxiosError: true, + code: 'ECONNABORTED', + message: 'timeout of 25ms exceeded', + }) + .mockResolvedValueOnce({ + data: minimalTdhUploadContents, + headers: { 'content-type': 'text/csv; charset=utf-8' }, + }); + + const uploads = await seizeApi.getUploadsForBlock(17531454); + + expect(uploads).toHaveLength(1); + expect(uploads[0].wallet).toBe('0xabc'); + expect(listener.infoMessages).toEqual( + expect.arrayContaining([ + expect.stringContaining( + 'Downloading from URL: https://gateway.arweave.net/abc123', + ), + ]), + ); + expect(listener.errorMessages).toEqual( + expect.arrayContaining([ + expect.stringContaining( + 'Failed to download from URL: https://arweave.net/abc123 because of error: Arweave CSV download timed out after 25ms for https://arweave.net/abc123', + ), + ]), + ); + }); +}); diff --git a/src/allowlist-lib/allowlist-lib-seize-timeout-patch.ts b/src/allowlist-lib/allowlist-lib-seize-timeout-patch.ts new file mode 100644 index 0000000..12c1151 --- /dev/null +++ b/src/allowlist-lib/allowlist-lib-seize-timeout-patch.ts @@ -0,0 +1,273 @@ +import { AllowlistCreator } from '@6529-collections/allowlist-lib/allowlist/allowlist-creator'; +import { LoggerFactory } from '@6529-collections/allowlist-lib/logging/logging-emitter'; +import { getArweaveFallbackUrls } from '@6529-collections/allowlist-lib/services/seize/seize.api'; +import { parseCsv } from '@6529-collections/allowlist-lib/utils/csv'; +import axios from 'axios'; + +const CSV_CONTENT_TYPES = [ + 'text/csv', + 'application/csv', + 'application/x-csv', + 'text/x-csv', + 'text/comma-separated-values', + 'application/vnd.ms-excel', + 'text/plain', + 'application/octet-stream', +]; + +type SeizeUploadsPage = { + readonly data: Array<{ + readonly block: number; + readonly url: string; + }>; +}; + +type PatchedSeizeApi = { + readonly apiUri?: string; + readonly apiToken?: string; + __allowlistApiTimeoutPatchApplied?: boolean; + getDataForBlock(params: { path: string; blockId: number }): Promise; +}; + +export function parseTimeoutMs( + value: string | undefined, + fallback: number, +): number { + const parsed = Number.parseInt(value ?? '', 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +export function patchAllowlistCreatorSeizeApi(params: { + readonly allowlistCreator: AllowlistCreator; + readonly loggerFactory: LoggerFactory; + readonly seizeMetadataTimeoutMs: number; + readonly arweaveDownloadTimeoutMs: number; +}) { + const seizeApi = (params.allowlistCreator as any).seizeApi as PatchedSeizeApi; + if (!seizeApi || seizeApi.__allowlistApiTimeoutPatchApplied) { + return; + } + + const logger = params.loggerFactory.create('SeizeApi'); + const apiUri = seizeApi.apiUri; + const apiToken = seizeApi.apiToken; + + seizeApi.getDataForBlock = async function ({ + path, + blockId, + }: { + readonly path: string; + readonly blockId: number; + }): Promise { + const headers = apiToken ? { 'x-6529-auth': apiToken } : undefined; + const endpoint = `${apiUri}${path}?block=${blockId}&page_size=1`; + logger.info(`Fetching Seize metadata from ${endpoint}`); + + const apiResponseData = await fetchJson({ + endpoint, + headers, + timeoutMs: params.seizeMetadataTimeoutMs, + timeoutLabel: 'Seize metadata fetch', + }); + const tdhUrl = getClosestTdh(apiResponseData, blockId); + if (!tdhUrl) { + throw new Error(`No TDH found for block ${blockId}`); + } + + return await downloadAndParseCsvWithFallback({ + url: tdhUrl, + timeoutMs: params.arweaveDownloadTimeoutMs, + logger, + }); + }; + + seizeApi.__allowlistApiTimeoutPatchApplied = true; +} + +async function fetchJson({ + endpoint, + headers, + timeoutMs, + timeoutLabel, +}: { + readonly endpoint: string; + readonly headers?: Record; + readonly timeoutMs: number; + readonly timeoutLabel: string; +}): Promise { + try { + const response = await axios.get(endpoint, { + headers, + timeout: timeoutMs, + }); + return response.data; + } catch (error) { + throw toTimeoutAwareError({ + error, + endpoint, + timeoutMs, + timeoutLabel, + }); + } +} + +async function downloadAndParseCsvWithFallback({ + url, + timeoutMs, + logger, +}: { + readonly url: string; + readonly timeoutMs: number; + readonly logger: { info(message: string): void; error(message: string): void }; +}): Promise { + const urls = getArweaveFallbackUrls(url); + const candidates = + urls.length > 0 ? Array.from(new Set(urls)) : Array.from(new Set([url])); + let lastError: Error; + + for (const candidate of candidates) { + logger.info(`Downloading from URL: ${candidate}`); + try { + return await fetchCsv({ endpoint: candidate, timeoutMs }); + } catch (error) { + const normalizedError = normalizeError(error); + lastError = normalizedError; + logger.error( + `Failed to download from URL: ${candidate} because of error: ${normalizedError.message}`, + ); + } + } + + if (candidates.length > 1) { + throw new Error( + `Arweave: all ${candidates.length} gateways failed. Last: ${lastError.message}`, + ); + } + throw lastError; +} + +async function fetchCsv({ + endpoint, + timeoutMs, +}: { + readonly endpoint: string; + readonly timeoutMs: number; +}): Promise { + try { + const response = await axios.get(endpoint, { + responseType: 'text', + timeout: timeoutMs, + transformResponse: [(data) => data], + }); + assertCsvResponse({ endpoint, response }); + return await parseCsv(response.data, { delimiter: ',' }, mapCsvRecords); + } catch (error) { + throw toTimeoutAwareError({ + error, + endpoint, + timeoutMs, + timeoutLabel: 'Arweave CSV download', + }); + } +} + +function toTimeoutAwareError({ + error, + endpoint, + timeoutMs, + timeoutLabel, +}: { + readonly error: unknown; + readonly endpoint: string; + readonly timeoutMs: number; + readonly timeoutLabel: string; +}): Error { + if (axios.isAxiosError(error)) { + if ( + error.code === 'ECONNABORTED' || + error.message.toLowerCase().includes('timeout') + ) { + return new Error( + `${timeoutLabel} timed out after ${timeoutMs}ms for ${endpoint}`, + ); + } + return new Error( + `${timeoutLabel} failed for ${endpoint}: ${error.message}`, + ); + } + return normalizeError(error); +} + +function normalizeError(error: unknown): Error { + if (error instanceof Error) { + return error; + } + return new Error(String(error)); +} + +function assertCsvResponse({ + endpoint, + response, +}: { + readonly endpoint: string; + readonly response: { + readonly headers: Record; + readonly data: string; + }; +}) { + const contentType = getHeaderValue(response.headers, 'content-type'); + if (!hasCsvCompatibleContentType(contentType)) { + throw new Error( + `Unexpected content-type for ${endpoint}: ${contentType ?? 'missing'}`, + ); + } + if (looksLikeHtml(response.data)) { + throw new Error(`Unexpected HTML response for ${endpoint}`); + } +} + +function getClosestTdh(apiResponseData: SeizeUploadsPage, blockId: number) { + return [...apiResponseData.data] + .sort((a, b) => a.block - b.block) + .filter((item) => item.block <= blockId) + .at(-1)?.url; +} + +function getHeaderValue( + headers: Record, + headerName: string, +): string | undefined { + const match = Object.entries(headers).find( + ([key]) => key.toLowerCase() === headerName.toLowerCase(), + )?.[1]; + if (Array.isArray(match)) { + return match.join(', '); + } + return match; +} + +function hasCsvCompatibleContentType(contentType: string | undefined): boolean { + if (!contentType) { + return true; + } + const normalized = contentType.split(';', 1)[0].trim().toLowerCase(); + return CSV_CONTENT_TYPES.includes(normalized); +} + +function looksLikeHtml(body: string): boolean { + const normalized = body.trimStart().toLowerCase(); + return normalized.startsWith(' { + it('marks the token pool as failed when allowlist execution times out', async () => { + const tokenPoolDownloadRepository = { + changeStatusToError: jest.fn(), + }; + const service = new TokenPoolDownloaderService( + tokenPoolDownloadRepository as any, + {} as any, + { + execute: jest.fn().mockRejectedValue( + new Error( + 'Arweave CSV download timed out after 30000ms for https://arweave.net/example', + ), + ), + } as any, + {} as any, + {} as any, + {} as any, + new AllowlistLibExecutionContextService(), + ); + const consoleErrorSpy = jest + .spyOn(console, 'error') + .mockImplementation(() => undefined); + + await expect( + (service as any).runOperationsAndFinishUp({ + entity: { + token_pool_id: 'pool-1', + contract: '0xabc', + token_ids: null, + allowlist_id: 'allowlist-1', + block_no: 123, + consolidate_block_no: 456, + }, + state: { + runsCount: 1, + startingBlocks: [], + }, + }), + ).rejects.toThrow( + 'Token pool pool-1 execution failed during consolidation or download: Arweave CSV download timed out after 30000ms for https://arweave.net/example', + ); + expect(tokenPoolDownloadRepository.changeStatusToError).toHaveBeenCalledWith( + { + tokenPoolId: 'pool-1', + }, + ); + + consoleErrorSpy.mockRestore(); + }); +}); diff --git a/src/token-pool/token-pool-downloader.service.ts b/src/token-pool/token-pool-downloader.service.ts index 9fd9e7d..344e944 100644 --- a/src/token-pool/token-pool-downloader.service.ts +++ b/src/token-pool/token-pool-downloader.service.ts @@ -22,6 +22,7 @@ import { TokenPoolDownloaderParams, TokenPoolDownloaderParamsState, } from './token-pool.types'; +import { AllowlistLibExecutionContextService } from '../allowlist-lib/allowlist-lib-execution-context.service'; @Injectable() export class TokenPoolDownloaderService { @@ -34,6 +35,7 @@ export class TokenPoolDownloaderService { private readonly transferRepository: TransferRepository, private readonly alchemy: Alchemy, private readonly db: DB, + private readonly allowlistLibExecutionContext: AllowlistLibExecutionContextService, ) {} async prepare({ @@ -339,22 +341,32 @@ export class TokenPoolDownloaderService { }; let allowlistState: AllowlistState; try { - allowlistState = await this.allowlistCreator.execute([ + allowlistState = await this.allowlistLibExecutionContext.run( { - code: AllowlistOperationCode.CREATE_ALLOWLIST, - params: allowlistOpParams, - }, - { - code: AllowlistOperationCode.CREATE_TOKEN_POOL, - params: tokenPoolOpParams, + tokenPoolId, + contract: entity.contract, + blockNo: entity.block_no, + consolidateBlockNo: entity.consolidate_block_no, }, - ]); + () => + this.allowlistCreator.execute([ + { + code: AllowlistOperationCode.CREATE_ALLOWLIST, + params: allowlistOpParams, + }, + { + code: AllowlistOperationCode.CREATE_TOKEN_POOL, + params: tokenPoolOpParams, + }, + ]), + ); } catch (e) { - console.error(`Persisting state for token pool ${tokenPoolId} failed`, e); + const error = this.toTokenPoolExecutionError(tokenPoolId, e); + console.error(error.message, e); await this.tokenPoolDownloadRepository.changeStatusToError({ tokenPoolId, }); - throw e; + throw error; } try { const con = await this.db.getConnection(); @@ -435,4 +447,12 @@ export class TokenPoolDownloaderService { ); await this.tokenPoolTokenRepository.insert(entities, { connection }); } + + private toTokenPoolExecutionError(tokenPoolId: string, error: unknown) { + const message = + error instanceof Error ? error.message : 'Unknown allowlist-lib error'; + return new Error( + `Token pool ${tokenPoolId} execution failed during consolidation or download: ${message}`, + ); + } }