diff --git a/.github/workflows/deploy_develop.yaml b/.github/workflows/deploy_develop.yaml index 50d12170..9bf94772 100644 --- a/.github/workflows/deploy_develop.yaml +++ b/.github/workflows/deploy_develop.yaml @@ -39,6 +39,7 @@ jobs: PROFILE_REGISTRY_CONTRACT_ADDRESS: ${{ secrets.DEV_PROFILE_REGISTRY_CONTRACT_ADDRESS }} PROFILE_ATTESTATION_SIGNER_ADDRESS: ${{ secrets.DEV_PROFILE_ATTESTATION_SIGNER_ADDRESS }} PROFILE_ATTESTATION_PRIVATE_KEY: ${{ secrets.DEV_PROFILE_ATTESTATION_PRIVATE_KEY }} + GIPHY_API_KEY: ${{ secrets.DEV_GIPHY_API_KEY }} deploy_testnet: name: api-testnet uses: ./.github/workflows/ssh_deploy.yaml @@ -63,3 +64,4 @@ jobs: PROFILE_REGISTRY_CONTRACT_ADDRESS: ${{ secrets.TESTNET_PROFILE_REGISTRY_CONTRACT_ADDRESS }} PROFILE_ATTESTATION_SIGNER_ADDRESS: ${{ secrets.TESTNET_PROFILE_ATTESTATION_SIGNER_ADDRESS }} PROFILE_ATTESTATION_PRIVATE_KEY: ${{ secrets.TESTNET_PROFILE_ATTESTATION_PRIVATE_KEY }} + GIPHY_API_KEY: ${{ secrets.TESTNET_GIPHY_API_KEY }} diff --git a/.github/workflows/deploy_main.yaml b/.github/workflows/deploy_main.yaml index 0c82576e..370d1173 100644 --- a/.github/workflows/deploy_main.yaml +++ b/.github/workflows/deploy_main.yaml @@ -39,4 +39,5 @@ jobs: PROFILE_REGISTRY_CONTRACT_ADDRESS: ${{ secrets.PROD_PROFILE_REGISTRY_CONTRACT_ADDRESS }} PROFILE_ATTESTATION_SIGNER_ADDRESS: ${{ secrets.PROD_PROFILE_ATTESTATION_SIGNER_ADDRESS }} PROFILE_ATTESTATION_PRIVATE_KEY: ${{ secrets.PROD_PROFILE_ATTESTATION_PRIVATE_KEY }} + GIPHY_API_KEY: ${{ secrets.PROD_GIPHY_API_KEY }} \ No newline at end of file diff --git a/.github/workflows/deploy_staging.yaml b/.github/workflows/deploy_staging.yaml index 6f012eb2..9b364f5c 100644 --- a/.github/workflows/deploy_staging.yaml +++ b/.github/workflows/deploy_staging.yaml @@ -39,3 +39,4 @@ jobs: PROFILE_REGISTRY_CONTRACT_ADDRESS: ${{ secrets.STAG_PROFILE_REGISTRY_CONTRACT_ADDRESS }} PROFILE_ATTESTATION_SIGNER_ADDRESS: ${{ secrets.STAG_PROFILE_ATTESTATION_SIGNER_ADDRESS }} PROFILE_ATTESTATION_PRIVATE_KEY: ${{ 
secrets.STAG_PROFILE_ATTESTATION_PRIVATE_KEY }} + GIPHY_API_KEY: ${{ secrets.STAG_GIPHY_API_KEY }} diff --git a/.github/workflows/ssh_deploy.yaml b/.github/workflows/ssh_deploy.yaml index 5230eed3..9dfe16e6 100644 --- a/.github/workflows/ssh_deploy.yaml +++ b/.github/workflows/ssh_deploy.yaml @@ -50,6 +50,9 @@ on: TRENDING_TAGS_API_KEY: description: "Trending tags API key" required: true + GIPHY_API_KEY: + description: "Giphy API key" + required: true X_CLIENT_ID: description: "X-Client-Id for API requests" required: false @@ -87,6 +90,7 @@ jobs: PROFILE_REGISTRY_CONTRACT_ADDRESS: "${{ secrets.PROFILE_REGISTRY_CONTRACT_ADDRESS }}" PROFILE_ATTESTATION_SIGNER_ADDRESS: "${{ secrets.PROFILE_ATTESTATION_SIGNER_ADDRESS }}" PROFILE_ATTESTATION_PRIVATE_KEY: "${{ secrets.PROFILE_ATTESTATION_PRIVATE_KEY }}" + GIPHY_API_KEY: "${{ secrets.GIPHY_API_KEY }}" with: host: "${{ secrets.DEPLOY_HOST }}" username: "${{ secrets.DEPLOY_USERNAME }}" @@ -104,6 +108,7 @@ jobs: PROFILE_REGISTRY_CONTRACT_ADDRESS, PROFILE_ATTESTATION_SIGNER_ADDRESS, PROFILE_ATTESTATION_PRIVATE_KEY, + GIPHY_API_KEY, SHA script: | echo $SHA > $HOST_DATA_DIR/REVISION || true @@ -128,6 +133,7 @@ jobs: -e PROFILE_REGISTRY_CONTRACT_ADDRESS \ -e PROFILE_ATTESTATION_SIGNER_ADDRESS \ -e PROFILE_ATTESTATION_PRIVATE_KEY \ + -e GIPHY_API_KEY \ -e NODE_ENV=production \ -e REDIS_HOST=${{ inputs.CONTAINER_NAME }}-redis \ -e REDIS_PORT=6379 \ diff --git a/src/account/account.module.ts b/src/account/account.module.ts index 8aa10720..39785960 100644 --- a/src/account/account.module.ts +++ b/src/account/account.module.ts @@ -17,10 +17,12 @@ import { LeaderboardController } from './controllers/leaderboard.controller'; import { AccountLeaderboardSnapshot } from './entities/account-leaderboard-snapshot.entity'; import { LeaderboardSnapshotService } from './services/leaderboard-snapshot.service'; import { ProfileModule } from '@/profile/profile.module'; +import { AePricingModule } from '@/ae-pricing/ae-pricing.module'; @Module({ 
imports: [ AeModule, // Includes CoinGeckoService + AePricingModule, // Provides CoinHistoricalPriceService for DB-cached prices ProfileModule, TransactionsModule, TokensModule, diff --git a/src/account/services/bcl-pnl.service.spec.ts b/src/account/services/bcl-pnl.service.spec.ts index cc68cc9a..546d690c 100644 --- a/src/account/services/bcl-pnl.service.spec.ts +++ b/src/account/services/bcl-pnl.service.spec.ts @@ -18,7 +18,7 @@ describe('BclPnlService', () => { jest.clearAllMocks(); }); - it('uses joined latest-price CTEs instead of correlated subqueries', async () => { + it('uses LATERAL + LIMIT 1 for price lookups instead of DISTINCT ON full scans', async () => { const { service, transactionRepository } = createService(); transactionRepository.query.mockResolvedValue([]); @@ -28,13 +28,12 @@ describe('BclPnlService', () => { const [sql, params] = transactionRepository.query.mock.calls[0]; expect(sql).toContain('WITH aggregated_holdings AS'); - expect(sql).toContain('latest_price_ae AS'); - expect(sql).toContain('latest_price_usd AS'); - expect(sql).toContain('DISTINCT ON (tx.sale_address)'); - expect(sql).toContain('INNER JOIN aggregated_holdings agg'); - expect(sql).toContain('LEFT JOIN latest_price_ae'); - expect(sql).toContain('LEFT JOIN latest_price_usd'); - expect(sql).not.toContain('tx2.sale_address = tx.sale_address'); + expect(sql).toContain('LEFT JOIN LATERAL'); + expect(sql).toContain('LIMIT 1'); + expect(sql).toContain('ae_price ON true'); + expect(sql).toContain('usd_price ON true'); + expect(sql).not.toContain('DISTINCT ON (tx.sale_address)'); + expect(sql).not.toContain('INNER JOIN aggregated_holdings agg'); expect(params).toEqual(['ak_test', 100, 50]); }); @@ -95,6 +94,104 @@ describe('BclPnlService', () => { expect(result.totalGainUsd).toBe(36); }); + it('calculateTokenPnlsBatch runs a single query for all heights and groups results', async () => { + const { service, transactionRepository } = createService(); + + // Two heights, one token each 
+ transactionRepository.query.mockResolvedValue([ + { + snapshot_height: 200, + sale_address: 'ct_alpha', + current_holdings: '3', + total_volume_bought: '3', + total_amount_spent_ae: '9', + total_amount_spent_usd: '18', + total_amount_received_ae: '0', + total_amount_received_usd: '0', + total_volume_sold: '0', + current_unit_price_ae: '4', + current_unit_price_usd: '8', + }, + { + snapshot_height: 300, + sale_address: 'ct_alpha', + current_holdings: '5', + total_volume_bought: '5', + total_amount_spent_ae: '15', + total_amount_spent_usd: '30', + total_amount_received_ae: '0', + total_amount_received_usd: '0', + total_volume_sold: '0', + current_unit_price_ae: '6', + current_unit_price_usd: '12', + }, + ]); + + const result = await service.calculateTokenPnlsBatch( + 'ak_test', + [200, 300], + ); + + // Single DB round-trip regardless of number of heights + expect(transactionRepository.query).toHaveBeenCalledTimes(1); + const [sql, params] = transactionRepository.query.mock.calls[0]; + + // Batch query uses UNNEST, a MATERIALIZED CTE (single tx scan), and LATERAL price lookups + expect(sql).toContain('unnest($2::int[])'); + expect(sql).toContain('snapshot_height'); + expect(sql).toContain('AS MATERIALIZED'); + expect(sql).toContain('address_txs'); + expect(sql).not.toContain('JOIN transactions tx'); // no repeated scan of transactions + expect(sql).toContain('LEFT JOIN LATERAL'); + expect(sql).toContain('LIMIT 1'); + expect(sql).not.toContain('DISTINCT ON (agg.snapshot_height, agg.sale_address)'); + expect(params).toEqual(['ak_test', [200, 300]]); + + expect(result).toBeInstanceOf(Map); + expect(result.size).toBe(2); + + const at200 = result.get(200)!; + expect(at200.pnls.ct_alpha.current_value.ae).toBe(12); // 3 * 4 + expect(at200.totalCurrentValueAe).toBe(12); + + const at300 = result.get(300)!; + expect(at300.pnls.ct_alpha.current_value.ae).toBe(30); // 5 * 6 + expect(at300.totalCurrentValueAe).toBe(30); + }); + + it('calculateTokenPnlsBatch returns empty map 
for empty heights array', async () => { + const { service, transactionRepository } = createService(); + + const result = await service.calculateTokenPnlsBatch('ak_test', []); + + expect(transactionRepository.query).not.toHaveBeenCalled(); + expect(result).toBeInstanceOf(Map); + expect(result.size).toBe(0); + }); + + it('calculateTokenPnlsBatch deduplicates heights before querying', async () => { + const { service, transactionRepository } = createService(); + transactionRepository.query.mockResolvedValue([]); + + await service.calculateTokenPnlsBatch('ak_test', [100, 100, 200, 100]); + + expect(transactionRepository.query).toHaveBeenCalledTimes(1); + const [, params] = transactionRepository.query.mock.calls[0]; + // Duplicates removed; order of unique values is preserved + expect(params[1]).toEqual([100, 200]); + }); + + it('calculateTokenPnlsBatch passes fromBlockHeight as $3 when provided', async () => { + const { service, transactionRepository } = createService(); + transactionRepository.query.mockResolvedValue([]); + + await service.calculateTokenPnlsBatch('ak_test', [500], 50); + + const [sql, params] = transactionRepository.query.mock.calls[0]; + expect(sql).toContain('block_height >= $3'); + expect(params).toEqual(['ak_test', [500], 50]); + }); + it('preserves range-based pnl result semantics', async () => { const { service, transactionRepository } = createService(); transactionRepository.query.mockResolvedValue([ diff --git a/src/account/services/bcl-pnl.service.ts b/src/account/services/bcl-pnl.service.ts index 835bdf6a..ab75344a 100644 --- a/src/account/services/bcl-pnl.service.ts +++ b/src/account/services/bcl-pnl.service.ts @@ -48,6 +48,212 @@ export class BclPnlService { return this.mapTokenPnls(tokenPnls, fromBlockHeight); } + /** + * Calculate PNL for each token across multiple block heights in a single SQL query. + * Deduplicates heights before querying to avoid redundant work. 
+ * + * @param address - Account address + * @param blockHeights - Array of block heights to calculate PNL at + * @param fromBlockHeight - Optional: If provided, restrict cost-basis aggregates to this range + * @returns Map from block height to its TokenPnlResult + */ + async calculateTokenPnlsBatch( + address: string, + blockHeights: number[], + fromBlockHeight?: number, + ): Promise<Map<number, TokenPnlResult>> { + const uniqueHeights = [...new Set(blockHeights)]; + if (uniqueHeights.length === 0) { + return new Map(); + } + + const rows = await this.transactionRepository.query( + this.buildBatchTokenPnlsQuery(fromBlockHeight), + this.buildBatchQueryParameters(address, uniqueHeights, fromBlockHeight), + ); + + return this.mapTokenPnlsBatch(rows, uniqueHeights, fromBlockHeight); + } + + private buildBatchTokenPnlsQuery(fromBlockHeight?: number): string { + const hasRange = fromBlockHeight !== undefined && fromBlockHeight !== null; + const rangeCondition = hasRange ? ' AND tx.block_height >= $3' : ''; + + return ` + WITH heights AS ( + SELECT unnest($2::int[]) AS snapshot_height + ), + -- Scan the address's transactions exactly once and materialise the result. + -- Without MATERIALIZED PostgreSQL 12+ may inline this CTE, re-executing + -- the index scan for every row in heights (N × index-scan instead of 1). + -- With MATERIALIZED the planner builds a hash table once in work_mem and + -- probes it for each snapshot height, avoiding repeated I/O. 
+ address_txs AS MATERIALIZED ( + SELECT sale_address, block_height, tx_type, volume, amount + FROM transactions + WHERE address = $1 + ), + aggregated_holdings AS ( + SELECT + h.snapshot_height, + tx.sale_address AS sale_address, + COALESCE( + SUM( + CASE + WHEN tx.tx_type IN ('buy', 'create_community') + THEN CAST(tx.volume AS DECIMAL) + ELSE 0 + END + ) - + COALESCE( + SUM( + CASE + WHEN tx.tx_type = 'sell' + THEN CAST(tx.volume AS DECIMAL) + ELSE 0 + END + ), + 0 + ), + 0 + ) AS current_holdings, + COALESCE( + SUM( + CASE + WHEN tx.tx_type IN ('buy', 'create_community')${rangeCondition} + THEN CAST(tx.volume AS DECIMAL) + ELSE 0 + END + ), + 0 + ) AS total_volume_bought, + COALESCE( + SUM( + CASE + WHEN tx.tx_type IN ('buy', 'create_community')${rangeCondition} + THEN CAST(NULLIF(tx.amount->>'ae', 'NaN') AS DECIMAL) + ELSE 0 + END + ), + 0 + ) AS total_amount_spent_ae, + COALESCE( + SUM( + CASE + WHEN tx.tx_type IN ('buy', 'create_community')${rangeCondition} + THEN CAST(NULLIF(tx.amount->>'usd', 'NaN') AS DECIMAL) + ELSE 0 + END + ), + 0 + ) AS total_amount_spent_usd, + COALESCE( + SUM( + CASE + WHEN tx.tx_type = 'sell'${rangeCondition} + THEN CAST(NULLIF(tx.amount->>'ae', 'NaN') AS DECIMAL) + ELSE 0 + END + ), + 0 + ) AS total_amount_received_ae, + COALESCE( + SUM( + CASE + WHEN tx.tx_type = 'sell'${rangeCondition} + THEN CAST(NULLIF(tx.amount->>'usd', 'NaN') AS DECIMAL) + ELSE 0 + END + ), + 0 + ) AS total_amount_received_usd, + COALESCE( + SUM( + CASE + WHEN tx.tx_type = 'sell'${rangeCondition} + THEN CAST(tx.volume AS DECIMAL) + ELSE 0 + END + ), + 0 + ) AS total_volume_sold + FROM heights h + JOIN address_txs tx ON tx.block_height < h.snapshot_height + GROUP BY h.snapshot_height, tx.sale_address + ) + SELECT + agg.snapshot_height, + agg.sale_address, + agg.current_holdings, + agg.total_volume_bought, + agg.total_amount_spent_ae, + agg.total_amount_spent_usd, + agg.total_amount_received_ae, + agg.total_amount_received_usd, + agg.total_volume_sold, + 
ae_price.current_unit_price_ae, + usd_price.current_unit_price_usd + FROM aggregated_holdings agg + LEFT JOIN LATERAL ( + SELECT CAST(NULLIF(p.buy_price->>'ae', 'NaN') AS DECIMAL) AS current_unit_price_ae + FROM transactions p + WHERE p.sale_address = agg.sale_address + AND p.block_height <= agg.snapshot_height + AND p.buy_price->>'ae' IS NOT NULL + AND p.buy_price->>'ae' NOT IN ('NaN', 'null', '') + ORDER BY p.block_height DESC, p.created_at DESC + LIMIT 1 + ) ae_price ON true + LEFT JOIN LATERAL ( + SELECT CAST(NULLIF(p.buy_price->>'usd', 'NaN') AS DECIMAL) AS current_unit_price_usd + FROM transactions p + WHERE p.sale_address = agg.sale_address + AND p.block_height <= agg.snapshot_height + AND p.buy_price->>'usd' IS NOT NULL + AND p.buy_price->>'usd' NOT IN ('NaN', 'null', '') + ORDER BY p.block_height DESC, p.created_at DESC + LIMIT 1 + ) usd_price ON true + WHERE agg.current_holdings > 0 + `; + } + + private buildBatchQueryParameters( + address: string, + blockHeights: number[], + fromBlockHeight?: number, + ): Array<string | number | number[]> { + return fromBlockHeight !== undefined && fromBlockHeight !== null + ? 
[address, blockHeights, fromBlockHeight] + : [address, blockHeights]; + } + + private mapTokenPnlsBatch( + rows: Array<Record<string, string>>, + uniqueHeights: number[], + fromBlockHeight?: number, + ): Map<number, TokenPnlResult> { + // Group rows by snapshot_height + const rowsByHeight = new Map<number, Array<Record<string, string>>>(); + for (const height of uniqueHeights) { + rowsByHeight.set(height, []); + } + for (const row of rows) { + const height = Number(row.snapshot_height); + const group = rowsByHeight.get(height); + if (group) { + group.push(row); + } + } + + // Map each height's rows using the existing single-height mapper + const result = new Map<number, TokenPnlResult>(); + for (const [height, heightRows] of rowsByHeight.entries()) { + result.set(height, this.mapTokenPnls(heightRows, fromBlockHeight)); + } + return result; + } + private buildTokenPnlsQuery(fromBlockHeight?: number): string { const hasRange = fromBlockHeight !== undefined && fromBlockHeight !== null; const rangeCondition = hasRange ? ' AND tx.block_height >= $3' : ''; @@ -140,34 +346,6 @@ export class BclPnlService { WHERE tx.address = $1 AND tx.block_height < $2 GROUP BY tx.sale_address - ), - latest_price_ae AS ( - SELECT DISTINCT ON (tx.sale_address) - tx.sale_address, - CAST(NULLIF(tx.buy_price->>'ae', 'NaN') AS DECIMAL) AS current_unit_price_ae - FROM transactions tx - INNER JOIN aggregated_holdings agg - ON agg.sale_address = tx.sale_address - WHERE tx.block_height <= $2 - AND tx.buy_price->>'ae' IS NOT NULL - AND tx.buy_price->>'ae' != 'NaN' - AND tx.buy_price->>'ae' != 'null' - AND tx.buy_price->>'ae' != '' - ORDER BY tx.sale_address, tx.block_height DESC, tx.created_at DESC - ), - latest_price_usd AS ( - SELECT DISTINCT ON (tx.sale_address) - tx.sale_address, - CAST(NULLIF(tx.buy_price->>'usd', 'NaN') AS DECIMAL) AS current_unit_price_usd - FROM transactions tx - INNER JOIN aggregated_holdings agg - ON agg.sale_address = tx.sale_address - WHERE tx.block_height <= $2 - AND tx.buy_price->>'usd' IS NOT NULL - AND tx.buy_price->>'usd' != 'NaN' - AND tx.buy_price->>'usd' != 'null' - 
AND tx.buy_price->>'usd' != '' - ORDER BY tx.sale_address, tx.block_height DESC, tx.created_at DESC ) SELECT agg.sale_address, @@ -178,13 +356,29 @@ export class BclPnlService { agg.total_amount_received_ae, agg.total_amount_received_usd, agg.total_volume_sold, - latest_price_ae.current_unit_price_ae, - latest_price_usd.current_unit_price_usd + ae_price.current_unit_price_ae, + usd_price.current_unit_price_usd FROM aggregated_holdings agg - LEFT JOIN latest_price_ae - ON latest_price_ae.sale_address = agg.sale_address - LEFT JOIN latest_price_usd - ON latest_price_usd.sale_address = agg.sale_address + LEFT JOIN LATERAL ( + SELECT CAST(NULLIF(p.buy_price->>'ae', 'NaN') AS DECIMAL) AS current_unit_price_ae + FROM transactions p + WHERE p.sale_address = agg.sale_address + AND p.block_height <= $2 + AND p.buy_price->>'ae' IS NOT NULL + AND p.buy_price->>'ae' NOT IN ('NaN', 'null', '') + ORDER BY p.block_height DESC, p.created_at DESC + LIMIT 1 + ) ae_price ON true + LEFT JOIN LATERAL ( + SELECT CAST(NULLIF(p.buy_price->>'usd', 'NaN') AS DECIMAL) AS current_unit_price_usd + FROM transactions p + WHERE p.sale_address = agg.sale_address + AND p.block_height <= $2 + AND p.buy_price->>'usd' IS NOT NULL + AND p.buy_price->>'usd' NOT IN ('NaN', 'null', '') + ORDER BY p.block_height DESC, p.created_at DESC + LIMIT 1 + ) usd_price ON true WHERE agg.current_holdings > 0 `; } diff --git a/src/account/services/portfolio.service.spec.ts b/src/account/services/portfolio.service.spec.ts index 0cd31c64..b053f5eb 100644 --- a/src/account/services/portfolio.service.spec.ts +++ b/src/account/services/portfolio.service.spec.ts @@ -1,13 +1,14 @@ import moment from 'moment'; import { PortfolioService } from './portfolio.service'; -import { timestampToAeHeight } from '@/utils/getBlochHeight'; +import { batchTimestampToAeHeight } from '@/utils/getBlochHeight'; +import { TokenPnlResult } from './bcl-pnl.service'; jest.mock('@/utils/getBlochHeight', () => ({ - timestampToAeHeight: jest.fn(), + 
batchTimestampToAeHeight: jest.fn(), })); describe('PortfolioService', () => { - const basePnlResult = { + const basePnlResult: TokenPnlResult = { pnls: {}, totalCostBasisAe: 0, totalCostBasisUsd: 0, @@ -27,8 +28,13 @@ describe('PortfolioService', () => { fetchHistoricalPrice: jest.fn(), getPriceData: jest.fn(), }; + const coinHistoricalPriceService = { + // Default: no DB data → falls back to coinGeckoService.fetchHistoricalPrice + getHistoricalPriceData: jest.fn().mockResolvedValue([]), + }; const bclPnlService = { calculateTokenPnls: jest.fn(), + calculateTokenPnlsBatch: jest.fn(), }; const service = new PortfolioService( @@ -38,6 +44,7 @@ describe('PortfolioService', () => { {} as any, aeSdkService as any, coinGeckoService as any, + coinHistoricalPriceService as any, bclPnlService as any, ); @@ -45,6 +52,7 @@ describe('PortfolioService', () => { service, aeSdkService, coinGeckoService, + coinHistoricalPriceService, bclPnlService, }; }; @@ -53,26 +61,43 @@ describe('PortfolioService', () => { jest.clearAllMocks(); }); - it('limits concurrent per-timestamp pnl work', async () => { + it('limits concurrent balance fetches to snapshotConcurrency', async () => { const { service, aeSdkService, coinGeckoService, bclPnlService } = createService(); - (timestampToAeHeight as jest.Mock).mockImplementation(async (ts: number) => ts); + // Assign heights that land in different 300-block buckets so each + // snapshot triggers its own getBalance call (bucket = i * 300). 
+ (batchTimestampToAeHeight as jest.Mock).mockImplementation( + async (timestamps: number[]) => { + const map = new Map(); + timestamps.forEach((ts, i) => map.set(ts, i * 300)); + return map; + }, + ); + coinGeckoService.fetchHistoricalPrice.mockResolvedValue([ [Date.UTC(2026, 0, 10), 10], [Date.UTC(2026, 0, 1), 1], ]); coinGeckoService.getPriceData.mockResolvedValue({ usd: 10 }); - aeSdkService.sdk.getBalance.mockResolvedValue('1000000000000000000'); + + // PNL batch covers the exact heights assigned above (i * 300) + bclPnlService.calculateTokenPnlsBatch.mockImplementation( + async (_addr: string, heights: number[]) => { + const map = new Map(); + heights.forEach((h) => map.set(h, basePnlResult)); + return map; + }, + ); let inFlight = 0; let maxInFlight = 0; - bclPnlService.calculateTokenPnls.mockImplementation(async () => { + aeSdkService.sdk.getBalance.mockImplementation(async () => { inFlight++; maxInFlight = Math.max(maxInFlight, inFlight); await new Promise((resolve) => setTimeout(resolve, 5)); inFlight--; - return basePnlResult; + return '1000000000000000000'; }); const startDate = moment.utc('2026-01-01T00:00:00.000Z'); @@ -85,23 +110,38 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(10); - expect(bclPnlService.calculateTokenPnls).toHaveBeenCalledTimes(10); - expect(maxInFlight).toBeLessThanOrEqual(6); + // PNL is pre-computed in a single batch call + expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledTimes(1); + // All 10 heights land in different 300-block buckets, so 10 balance calls + expect(aeSdkService.sdk.getBalance).toHaveBeenCalledTimes(10); + // Balance concurrency cap is 15 + expect(maxInFlight).toBeLessThanOrEqual(15); }); - it('memoizes repeated block-height lookups and keeps historical pricing intact', async () => { + it('deduplicates block heights so balance and PNL are only fetched once per unique height', async () => { const { service, aeSdkService, coinGeckoService, bclPnlService } = 
createService(); - (timestampToAeHeight as jest.Mock).mockResolvedValue(123); + // All timestamps map to the same block height + (batchTimestampToAeHeight as jest.Mock).mockImplementation( + async (timestamps: number[]) => { + const map = new Map(); + timestamps.forEach((ts) => map.set(ts, 123)); + return map; + }, + ); + coinGeckoService.fetchHistoricalPrice.mockResolvedValue([ [Date.UTC(2026, 0, 3), 3], [Date.UTC(2026, 0, 1), 1], [Date.UTC(2026, 0, 2), 2], ]); coinGeckoService.getPriceData.mockResolvedValue({ usd: 99 }); + + const pnlMap = new Map([[123, basePnlResult]]); + bclPnlService.calculateTokenPnlsBatch.mockResolvedValue(pnlMap); + aeSdkService.sdk.getBalance.mockResolvedValue('1000000000000000000'); - bclPnlService.calculateTokenPnls.mockResolvedValue(basePnlResult); const snapshots = await service.getPortfolioHistory('ak_test', { startDate: moment.utc('2026-01-01T00:00:00.000Z'), @@ -110,8 +150,160 @@ describe('PortfolioService', () => { }); expect(snapshots).toHaveLength(3); + // All 3 timestamps share height 123, so balance is fetched once expect(aeSdkService.sdk.getBalance).toHaveBeenCalledTimes(1); - expect(bclPnlService.calculateTokenPnls).toHaveBeenCalledTimes(1); + // Batch PNL is called once with deduplicated heights + expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledTimes(1); + expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledWith( + 'ak_test', + [123], + undefined, + ); + // Per-snapshot price is still resolved per timestamp, not per block height expect(snapshots.map((snapshot) => snapshot.ae_price)).toEqual([1, 2, 3]); }); + + it('buckets block heights to multiples of 300 to share getBalance calls', async () => { + const { service, aeSdkService, coinGeckoService, bclPnlService } = + createService(); + + // 4 timestamps that map to 3 distinct exact heights, but only 2 distinct + // 300-block buckets: 100 → bucket 0, 250 → bucket 0, 350 → bucket 300, 599 → bucket 300 + (batchTimestampToAeHeight as 
jest.Mock).mockImplementation( + async (timestamps: number[]) => { + const heights = [100, 250, 350, 599]; + const map = new Map(); + timestamps.forEach((ts, i) => map.set(ts, heights[i])); + return map; + }, + ); + + coinGeckoService.fetchHistoricalPrice.mockResolvedValue([ + [Date.UTC(2026, 0, 4), 4], + [Date.UTC(2026, 0, 1), 1], + [Date.UTC(2026, 0, 2), 2], + [Date.UTC(2026, 0, 3), 3], + ]); + coinGeckoService.getPriceData.mockResolvedValue({ usd: 5 }); + aeSdkService.sdk.getBalance.mockResolvedValue('1000000000000000000'); + + bclPnlService.calculateTokenPnlsBatch.mockImplementation( + async (_addr: string, heights: number[]) => { + const map = new Map(); + heights.forEach((h) => map.set(h, basePnlResult)); + return map; + }, + ); + + const snapshots = await service.getPortfolioHistory('ak_test', { + startDate: moment.utc('2026-01-01T00:00:00.000Z'), + endDate: moment.utc('2026-01-04T00:00:00.000Z'), + interval: 86400, + }); + + expect(snapshots).toHaveLength(4); + // Heights 100 and 250 → bucket 0; heights 350 and 599 → bucket 300. + // Only 2 unique buckets → exactly 2 getBalance calls. 
+ expect(aeSdkService.sdk.getBalance).toHaveBeenCalledTimes(2); + // getBalance is called with the bucket boundaries, not the raw heights + const calledHeights = aeSdkService.sdk.getBalance.mock.calls.map( + ([, opts]: [any, any]) => opts.height, + ); + expect(calledHeights).toEqual(expect.arrayContaining([0, 300])); + }); + + it('pre-computes both cumulative and range PNL maps when range PNL is requested', async () => { + const { service, aeSdkService, coinGeckoService, bclPnlService } = + createService(); + + (batchTimestampToAeHeight as jest.Mock).mockImplementation( + async (timestamps: number[]) => { + const map = new Map(); + timestamps.forEach((ts, i) => map.set(ts, 100 + i)); + return map; + }, + ); + coinGeckoService.fetchHistoricalPrice.mockResolvedValue([ + [Date.UTC(2026, 0, 3), 3], + [Date.UTC(2026, 0, 1), 1], + [Date.UTC(2026, 0, 2), 2], + ]); + coinGeckoService.getPriceData.mockResolvedValue({ usd: 5 }); + aeSdkService.sdk.getBalance.mockResolvedValue('1000000000000000000'); + + bclPnlService.calculateTokenPnlsBatch.mockImplementation( + async (_addr: string, heights: number[]) => { + const map = new Map(); + heights.forEach((h) => map.set(h, basePnlResult)); + return map; + }, + ); + + const snapshots = await service.getPortfolioHistory('ak_test', { + startDate: moment.utc('2026-01-01T00:00:00.000Z'), + endDate: moment.utc('2026-01-03T00:00:00.000Z'), + interval: 86400, + includePnl: true, + useRangeBasedPnl: true, + }); + + expect(snapshots).toHaveLength(3); + // Called twice: once for cumulative, once for range-based + expect(bclPnlService.calculateTokenPnlsBatch).toHaveBeenCalledTimes(2); + // startBlockHeight = blockHeights[0] = 100 (timestamps[0] maps to 100+0) + const calls = bclPnlService.calculateTokenPnlsBatch.mock.calls; + expect(calls[0][2]).toBeUndefined(); // cumulative: no fromBlockHeight + expect(calls[1][2]).toBe(100); // range: fromBlockHeight = blockHeights[0] + }); + + it('uses coin_historical_prices DB table when available, 
skipping CoinGecko', async () => { + const { + service, + aeSdkService, + coinGeckoService, + coinHistoricalPriceService, + bclPnlService, + } = createService(); + + (batchTimestampToAeHeight as jest.Mock).mockImplementation( + async (timestamps: number[]) => { + const map = new Map(); + timestamps.forEach((ts, i) => map.set(ts, (i + 1) * 300)); + return map; + }, + ); + + // DB returns prices in ascending order (as the repository does) + coinHistoricalPriceService.getHistoricalPriceData.mockResolvedValue([ + [Date.UTC(2026, 0, 1), 1], + [Date.UTC(2026, 0, 2), 2], + [Date.UTC(2026, 0, 3), 3], + ]); + coinGeckoService.getPriceData.mockResolvedValue({ usd: 99 }); + + bclPnlService.calculateTokenPnlsBatch.mockImplementation( + async (_addr: string, heights: number[]) => { + const map = new Map(); + heights.forEach((h) => map.set(h, basePnlResult)); + return map; + }, + ); + aeSdkService.sdk.getBalance.mockResolvedValue('1000000000000000000'); + + const snapshots = await service.getPortfolioHistory('ak_test', { + startDate: moment.utc('2026-01-01T00:00:00.000Z'), + endDate: moment.utc('2026-01-03T00:00:00.000Z'), + interval: 86400, + }); + + expect(snapshots).toHaveLength(3); + // Prices come from DB — CoinGecko historical endpoint is never called + expect(coinGeckoService.fetchHistoricalPrice).not.toHaveBeenCalled(); + // DB service was queried for the needed range + expect( + coinHistoricalPriceService.getHistoricalPriceData, + ).toHaveBeenCalledTimes(1); + // Correct prices are assigned to each snapshot + expect(snapshots.map((s) => s.ae_price)).toEqual([1, 2, 3]); + }); }); diff --git a/src/account/services/portfolio.service.ts b/src/account/services/portfolio.service.ts index 22cf2709..2fc10c75 100644 --- a/src/account/services/portfolio.service.ts +++ b/src/account/services/portfolio.service.ts @@ -8,9 +8,10 @@ import { Token } from '@/tokens/entities/token.entity'; import { Transaction } from '@/transactions/entities/transaction.entity'; import { AeSdkService } 
from '@/ae/ae-sdk.service'; import { CoinGeckoService } from '@/ae/coin-gecko.service'; +import { CoinHistoricalPriceService } from '@/ae-pricing/services/coin-historical-price.service'; import { AETERNITY_COIN_ID } from '@/configs'; import { toAe } from '@aeternity/aepp-sdk'; -import { timestampToAeHeight } from '@/utils/getBlochHeight'; +import { batchTimestampToAeHeight } from '@/utils/getBlochHeight'; import { BclPnlService, TokenPnlResult } from './bcl-pnl.service'; export interface PortfolioHistorySnapshot { @@ -88,7 +89,15 @@ export interface GetPortfolioHistoryOptions { @Injectable() export class PortfolioService { private readonly logger = new Logger(PortfolioService.name); - private readonly snapshotConcurrency = 6; + private readonly snapshotConcurrency = 15; + /** + * Balance granularity bucket size in key blocks. + * Block heights are floored to the nearest multiple of this value before + * calling getBalance, so that snapshots within the same 300-block window + * (~15 hours) share a single AE node lookup instead of each making a + * separate slow historical-state request. 
+ */ + private readonly BALANCE_BUCKET_SIZE = 300; constructor( @InjectRepository(TokenHolder) @@ -100,6 +109,7 @@ export class PortfolioService { @InjectDataSource() private readonly dataSource: DataSource, private readonly aeSdkService: AeSdkService, private readonly coinGeckoService: CoinGeckoService, + private readonly coinHistoricalPriceService: CoinHistoricalPriceService, private readonly bclPnlService: BclPnlService, ) {} @@ -165,52 +175,98 @@ export class PortfolioService { } } - let previousHeight: number | undefined = undefined; - // CoinGecko supports: 1, 7, 14, 30, 90, 180, 365, max - // Request 365 days to ensure we get historical data (it will include our date range if it's within the last year) - const days = 365; - // Always use 'daily' interval from CoinGecko - hourly data is not reliably available - // We'll use the closest daily price for any timestamp (including hourly requests) - const priceInterval: 'daily' | 'hourly' = 'daily'; - const aePriceHistory = ( - await this.coinGeckoService.fetchHistoricalPrice( + // Fetch price history and current price in parallel. + // For historical prices, query the local coin_historical_prices table first + // (populated by the background CoinGecko sync). Only fall back to the live + // CoinGecko API when the DB has no data for the needed range. + // Extend start backward a few days so the first snapshot always has a price + // data point at or before it, even when the range starts close to midnight. 
+ const priceRangeStartMs = start.clone().subtract(3, 'days').valueOf(); + const priceRangeEndMs = end.valueOf(); + + const [dbPriceRows, currentAePrice] = await Promise.all([ + this.coinHistoricalPriceService.getHistoricalPriceData( AETERNITY_COIN_ID, - 'usd', // force to usd - days, - priceInterval, - ) - ).sort((a, b) => b[0] - a[0]); - const currentAePrice = await this.coinGeckoService.getPriceData( - new BigNumber(1), + 'usd', + priceRangeStartMs, + priceRangeEndMs, + ), + this.coinGeckoService.getPriceData(new BigNumber(1)), + ]); + + let aePriceHistory: Array<[number, number]>; + if (dbPriceRows.length > 0) { + // DB data sorted ascending — reverse to match the descending order + // expected by findClosestHistoricalPrice. + aePriceHistory = dbPriceRows.reverse(); + } else { + // DB has no data for this range; fall back to live CoinGecko fetch. + const daysNeeded = Math.ceil(now.diff(start, 'days', true)) + 3; + const days = Math.min(365, Math.max(7, daysNeeded)); + aePriceHistory = await this.coinGeckoService + .fetchHistoricalPrice(AETERNITY_COIN_ID, 'usd', days, 'daily') + .then((prices) => prices.sort((a, b) => b[0] - a[0])); + } + // Resolve all block heights in a single batch query against the local key_blocks table. + // Any timestamps not covered by the table (sync gaps) fall back to individual resolution. + const targetTimestamps = timestamps.map((t) => t.valueOf()); + const heightMap = await batchTimestampToAeHeight( + targetTimestamps, + this.dataSource, ); - - // First, calculate all block heights sequentially (since previousHeight is used as a hint) - const blockHeights: number[] = []; - for (const timestamp of timestamps) { - const blockHeight = await timestampToAeHeight( - timestamp.valueOf(), - previousHeight, - this.dataSource, + // Build the ordered block-height array. 
If a timestamp was not resolved by + // either key_blocks or transactions (extremely rare — would require a gap in + // both tables), propagate the nearest already-resolved height rather than + // silently falling back to 0 (genesis block), which would produce wrong + // balances and PNL for that snapshot. + let lastKnownHeight = 0; + const blockHeights = targetTimestamps.map((ts) => { + const h = heightMap.get(ts); + if (h !== undefined) { + lastKnownHeight = h; + return h; + } + this.logger.warn( + `[batchTimestampToAeHeight] Could not resolve block height for ts=${ts}; ` + + `using nearest known height ${lastKnownHeight}`, ); - blockHeights.push(blockHeight); - previousHeight = blockHeight; - } - - // Store the actual startDate for range-based PNL calculations - const actualStartDate = start; - - // Calculate block height for startDate once (for range-based PNL last snapshot) + return lastKnownHeight; + }); + // startBlockHeight is the block at the beginning of the requested range. + // timestamps[0] === start (same millisecond value), so blockHeights[0] is + // already the correct answer — no extra DB or API call needed. const startBlockHeight = - useRangeBasedPnl && includePnl - ? await timestampToAeHeight( - actualStartDate.valueOf(), - undefined, - this.dataSource, + useRangeBasedPnl && includePnl ? blockHeights[0] : undefined; + + const uniqueBlockHeights = [...new Set(blockHeights)]; + + // Pre-compute PNL for all unique block heights in a single batch query. + // This replaces the previous per-snapshot SQL calls (N queries → 1 query). + const [pnlMap, rangePnlMap] = await Promise.all([ + this.bclPnlService.calculateTokenPnlsBatch( + address, + uniqueBlockHeights, + undefined, + ), + includePnl && useRangeBasedPnl && startBlockHeight !== undefined + ? 
this.bclPnlService.calculateTokenPnlsBatch( + address, + uniqueBlockHeights, + startBlockHeight, ) - : undefined; + : Promise.resolve(undefined as Map | undefined), + ]); + const emptyPnl: TokenPnlResult = { + pnls: {}, + totalCostBasisAe: 0, + totalCostBasisUsd: 0, + totalCurrentValueAe: 0, + totalCurrentValueUsd: 0, + totalGainAe: 0, + totalGainUsd: 0, + }; const balanceCache = new Map>(); - const pnlCache = new Map>(); const data = await this.mapWithConcurrency( timestamps, this.snapshotConcurrency, @@ -222,36 +278,22 @@ export class PortfolioService { ); const blockHeight = blockHeights[index]; - // Keep repeated block-height lookups inside a request deduplicated. + // Balance still requires an external AE node call per unique block height const aeBalancePromise = this.getCachedBalance( balanceCache, address, blockHeight, ); - const tokensPnlPromise = this.getCachedTokenPnl( - pnlCache, - address, - blockHeight, - undefined, - ); - // If range-based PNL is requested, calculate it separately for PNL fields only. - // For index 0 we reuse the cumulative value to preserve existing semantics. - const rangeBasedPnlPromise = + const tokensPnl = pnlMap.get(blockHeight) ?? emptyPnl; + + // For range-based PNL, index 0 reuses the cumulative result (existing semantics) + const rangeBasedPnl = useRangeBasedPnl && includePnl && index > 0 - ? this.getCachedTokenPnl( - pnlCache, - address, - blockHeight, - startBlockHeight, - ) + ? (rangePnlMap?.get(blockHeight) ?? 
undefined) : undefined; - const [aeBalance, tokensPnl, rangeBasedPnl] = await Promise.all([ - aeBalancePromise, - tokensPnlPromise, - rangeBasedPnlPromise, - ]); + const aeBalance = await aeBalancePromise; const balance = Number(toAe(aeBalance)); // Use current value of tokens owned (from cumulative PNL service call) @@ -308,7 +350,7 @@ export class PortfolioService { // For range-based PNL with hover support: each snapshot shows PNL from startDate to that timestamp // First snapshot: cumulative from start (null) to current timestamp // All other snapshots: from startDate to current timestamp - const rangeFrom = index === 0 ? null : actualStartDate; + const rangeFrom = index === 0 ? null : start; const rangeTo = timestamp; result.total_pnl.range = { from: rangeFrom, @@ -332,36 +374,23 @@ export class PortfolioService { address: string, blockHeight: number, ): Promise { - const cached = cache.get(blockHeight); + // Snap to the nearest lower multiple of BALANCE_BUCKET_SIZE so that + // all block heights within the same 300-block window share one AE node + // call instead of each issuing a slow historical-state lookup. + // e.g. heights 900–1199 all use height 900; heights 1200–1499 use 1200. + const bucketHeight = + Math.floor(blockHeight / this.BALANCE_BUCKET_SIZE) * + this.BALANCE_BUCKET_SIZE; + + const cached = cache.get(bucketHeight); if (cached) { return cached; } const promise = this.aeSdkService.sdk.getBalance(address as any, { - height: blockHeight, + height: bucketHeight, } as any); - cache.set(blockHeight, promise); - return promise; - } - - private getCachedTokenPnl( - cache: Map>, - address: string, - blockHeight: number, - fromBlockHeight?: number, - ): Promise { - const key = `${address}:${blockHeight}:${fromBlockHeight ?? 
'all'}`; - const cached = cache.get(key); - if (cached) { - return cached; - } - - const promise = this.bclPnlService.calculateTokenPnls( - address, - blockHeight, - fromBlockHeight, - ); - cache.set(key, promise); + cache.set(bucketHeight, promise); return promise; } diff --git a/src/mdw-sync/entities/key-block.entity.ts b/src/mdw-sync/entities/key-block.entity.ts index e278406d..e213d365 100644 --- a/src/mdw-sync/entities/key-block.entity.ts +++ b/src/mdw-sync/entities/key-block.entity.ts @@ -17,6 +17,7 @@ import { Searchable } from '@/api-core/decorators/searchable.decorator'; @Index(['hash']) @Index(['prev_hash']) @Index(['prev_key_hash']) +@Index(['time']) @ObjectType() export class KeyBlock { @PrimaryColumn() diff --git a/src/utils/getBlochHeight.ts b/src/utils/getBlochHeight.ts index abc01979..5d609c04 100644 --- a/src/utils/getBlochHeight.ts +++ b/src/utils/getBlochHeight.ts @@ -353,3 +353,113 @@ export async function timestampToAeHeight( return low; } + +/** + * Resolves multiple timestamps to block heights in a single SQL query against the local + * key_blocks table. For any timestamps not covered by the local table (sync gaps), + * falls back to the individual timestampToAeHeight function. + * + * @param targetTimestamps - Array of timestamps in milliseconds + * @param dataSource - TypeORM DataSource for database queries + * @returns Map from each input timestamp (ms) to its resolved block height + */ +export async function batchTimestampToAeHeight( + targetTimestamps: number[], + dataSource: DataSource, +): Promise> { + if (targetTimestamps.length === 0) { + return new Map(); + } + + const result = new Map(); + + // ── Tier 1: key_blocks ──────────────────────────────────────────────────── + // key_blocks.time is numeric (milliseconds). Comparing directly (no CAST) + // lets PostgreSQL use the btree index via an index scan backward, reducing + // this from a full seq-scan (~450 k rows × N iterations) to O(log N) per + // timestamp. 
+ let rows: Array<{ target_ms: string; height: number | null }> = []; + try { + rows = await dataSource.query( + ` + SELECT t.target_ms, kb.height + FROM unnest($1::bigint[]) AS t(target_ms) + LEFT JOIN LATERAL ( + SELECT height + FROM key_blocks + WHERE time <= t.target_ms + ORDER BY time DESC + LIMIT 1 + ) kb ON true + `, + [targetTimestamps], + ); + } catch (error) { + console.warn( + '[batchTimestampToAeHeight] key_blocks query failed, will try transactions fallback:', + error, + ); + } + + // Build a map of resolved timestamps from the key_blocks result + const resolvedFromDb = new Map(); + for (const row of rows) { + const ts = Number(row.target_ms); + if (row.height !== null && row.height !== undefined) { + resolvedFromDb.set(ts, Number(row.height)); + } + } + + // ── Tier 2: transactions table ──────────────────────────────────────────── + // For timestamps that key_blocks could not resolve (sync gaps, future dates, + // etc.) we fall back to the transactions table. The `created_at` index lets + // PostgreSQL do a single index scan backward per timestamp — no HTTP calls, + // no guessing, no binary search against the Middleware. 
+ const fallbackTimestamps = targetTimestamps.filter( + (ts) => !resolvedFromDb.has(ts), + ); + + if (fallbackTimestamps.length > 0) { + let txRows: Array<{ target_ms: string; block_height: number | null }> = []; + try { + txRows = await dataSource.query( + ` + SELECT t.target_ms, tx.block_height + FROM unnest($1::bigint[]) AS t(target_ms) + LEFT JOIN LATERAL ( + SELECT block_height + FROM transactions + WHERE created_at <= to_timestamp(t.target_ms::float8 / 1000) + AND block_height IS NOT NULL + AND block_height > 0 + ORDER BY created_at DESC + LIMIT 1 + ) tx ON true + `, + [fallbackTimestamps], + ); + } catch (error) { + console.warn( + '[batchTimestampToAeHeight] transactions fallback query failed:', + error, + ); + } + + for (const row of txRows) { + const ts = Number(row.target_ms); + if (row.block_height !== null && row.block_height !== undefined) { + resolvedFromDb.set(ts, Number(row.block_height)); + } + } + } + + // Merge into the final result map + for (const ts of targetTimestamps) { + const height = resolvedFromDb.get(ts); + if (height !== undefined) { + result.set(ts, height); + } + } + + return result; +} diff --git a/src/utils/getBlockHeight.spec.ts b/src/utils/getBlockHeight.spec.ts new file mode 100644 index 00000000..caac9def --- /dev/null +++ b/src/utils/getBlockHeight.spec.ts @@ -0,0 +1,126 @@ +import { batchTimestampToAeHeight } from './getBlochHeight'; + +// Suppress network-related console output during tests +const originalWarn = console.warn; +beforeAll(() => { + console.warn = jest.fn(); +}); +afterAll(() => { + console.warn = originalWarn; +}); + +describe('batchTimestampToAeHeight', () => { + // Create a DataSource mock whose query() returns different results on first + // call (key_blocks) vs. subsequent calls (transactions fallback). 
+ const makeDataSource = ( + keyBlocksResult: Array<{ target_ms: string; height: number | null }>, + txFallbackResult: Array<{ + target_ms: string; + block_height: number | null; + }> = [], + ) => ({ + query: jest + .fn() + .mockResolvedValueOnce(keyBlocksResult) // 1st call: key_blocks + .mockResolvedValue(txFallbackResult), // subsequent: transactions + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('returns empty map for empty input without querying the DB', async () => { + const dataSource = makeDataSource([]); + + const result = await batchTimestampToAeHeight([], dataSource as any); + + expect(dataSource.query).not.toHaveBeenCalled(); + expect(result).toBeInstanceOf(Map); + expect(result.size).toBe(0); + }); + + it('resolves all timestamps from key_blocks in one query — no fallback needed', async () => { + const ts1 = 1_700_000_000_000; + const ts2 = 1_700_086_400_000; + const dataSource = makeDataSource([ + { target_ms: String(ts1), height: 100 }, + { target_ms: String(ts2), height: 200 }, + ]); + + const result = await batchTimestampToAeHeight([ts1, ts2], dataSource as any); + + // Only one SQL round-trip when key_blocks resolves everything + expect(dataSource.query).toHaveBeenCalledTimes(1); + const [sql, params] = dataSource.query.mock.calls[0]; + + // key_blocks query: direct comparison (no CAST) enables index scan + expect(sql).toContain('unnest($1::bigint[])'); + expect(sql).toContain('key_blocks'); + expect(sql).not.toContain('CAST(time AS bigint)'); + expect(sql).toContain('ORDER BY time DESC'); + expect(params).toEqual([[ts1, ts2]]); + + expect(result.get(ts1)).toBe(100); + expect(result.get(ts2)).toBe(200); + }); + + it('falls back to transactions table for timestamps key_blocks returns null', async () => { + const ts1 = 1_700_000_000_000; // resolved by key_blocks + const ts2 = 1_700_086_400_000; // key_blocks gap → resolved by transactions + + const dataSource = makeDataSource( + [ + { target_ms: String(ts1), height: 100 }, + { 
target_ms: String(ts2), height: null }, + ], + [{ target_ms: String(ts2), block_height: 205 }], + ); + + const result = await batchTimestampToAeHeight([ts1, ts2], dataSource as any); + + // Two DB round-trips: key_blocks + transactions (no HTTP/guessing calls) + expect(dataSource.query).toHaveBeenCalledTimes(2); + + const [txSql, txParams] = dataSource.query.mock.calls[1]; + expect(txSql).toContain('transactions'); + expect(txSql).toContain('created_at'); + expect(txSql).toContain('to_timestamp'); + expect(txSql).toContain('ORDER BY created_at DESC'); + // Only the unresolved timestamp is passed to the fallback query + expect(txParams).toEqual([[ts2]]); + + expect(result.get(ts1)).toBe(100); + expect(result.get(ts2)).toBe(205); + }); + + it('uses a LATERAL join with index-friendly direct column comparison', async () => { + const ts = 1_700_000_000_000; + const dataSource = makeDataSource([{ target_ms: String(ts), height: 555 }]); + + await batchTimestampToAeHeight([ts], dataSource as any); + + const [sql] = dataSource.query.mock.calls[0]; + expect(sql).toContain('LEFT JOIN LATERAL'); + // No CAST wrapper — that would force a seq scan instead of index scan + expect(sql).not.toContain('CAST(time AS bigint)'); + expect(sql).toContain('ORDER BY time DESC'); + expect(sql).toContain('LIMIT 1'); + }); + + it('includes all input timestamps in the SQL parameter array', async () => { + const timestamps = [ + 1_700_000_000_000, 1_700_086_400_000, 1_700_172_800_000, + ]; + const dataSource = makeDataSource( + timestamps.map((ts, i) => ({ target_ms: String(ts), height: i + 1 })), + ); + + const result = await batchTimestampToAeHeight(timestamps, dataSource as any); + + const [, params] = dataSource.query.mock.calls[0]; + expect(params[0]).toEqual(timestamps); + expect(result.get(timestamps[0])).toBe(1); + expect(result.get(timestamps[1])).toBe(2); + expect(result.get(timestamps[2])).toBe(3); + }); +});