From bdfeb6bc29b043703dd8da4cad9dc6d36e91f5ee Mon Sep 17 00:00:00 2001 From: HuiNeng <3650306360@qq.com> Date: Wed, 25 Mar 2026 05:02:13 +0800 Subject: [PATCH] feat: add event indexer service for PrivacyLayer contracts Implements #63 - Contract Events Indexer Features: - Real-time event indexing via Soroban RPC - PostgreSQL storage with Prisma ORM - GraphQL API for querying events - WebSocket subscriptions for real-time updates - Support for all contract events (Deposit, Withdraw, PoolPaused, PoolUnpaused, VkUpdated) - Merkle tree state tracking - Docker deployment support Technical stack: - TypeScript/Node.js - Apollo Server (GraphQL) - Prisma (PostgreSQL ORM) - soroban-client (Stellar SDK) --- indexer/.env.example | 19 +++ indexer/.gitignore | 32 ++++ indexer/Dockerfile | 20 +++ indexer/README.md | 239 ++++++++++++++++++++++++++++ indexer/docker-compose.yml | 37 +++++ indexer/package.json | 45 ++++++ indexer/prisma/schema.prisma | 102 ++++++++++++ indexer/src/index.ts | 123 +++++++++++++++ indexer/src/indexer.ts | 247 +++++++++++++++++++++++++++++ indexer/src/parsers.ts | 291 +++++++++++++++++++++++++++++++++++ indexer/src/resolvers.ts | 259 +++++++++++++++++++++++++++++++ indexer/src/schema.graphql | 127 +++++++++++++++ indexer/tsconfig.json | 19 +++ 13 files changed, 1560 insertions(+) create mode 100644 indexer/.env.example create mode 100644 indexer/.gitignore create mode 100644 indexer/Dockerfile create mode 100644 indexer/README.md create mode 100644 indexer/docker-compose.yml create mode 100644 indexer/package.json create mode 100644 indexer/prisma/schema.prisma create mode 100644 indexer/src/index.ts create mode 100644 indexer/src/indexer.ts create mode 100644 indexer/src/parsers.ts create mode 100644 indexer/src/resolvers.ts create mode 100644 indexer/src/schema.graphql create mode 100644 indexer/tsconfig.json diff --git a/indexer/.env.example b/indexer/.env.example new file mode 100644 index 0000000..71b9ff3 --- /dev/null +++ b/indexer/.env.example @@ -0,0 +1,19 @@ +# PrivacyLayer Event Indexer Configuration + +# Server Configuration +PORT=4000 + +# Database Configuration +DATABASE_URL="postgresql://user:password@localhost:5432/privacylayer_indexer?schema=public" + +# Stellar/Soroban Configuration +SOROBAN_RPC_URL=https://soroban-testnet.stellar.org:443 +NETWORK_PASSPHRASE=Test SDF Network ; September 2015 +CONTRACT_ID=your_contract_id_here + +# Indexer Configuration +POLL_INTERVAL=5000 + +# For mainnet, use: +# SOROBAN_RPC_URL=https://soroban.stellar.org:443 +# NETWORK_PASSPHRASE=Public Global Stellar Network ; September 2015 \ No newline at end of file diff --git a/indexer/.gitignore b/indexer/.gitignore new file mode 100644 index 0000000..e9a4175 --- /dev/null +++ b/indexer/.gitignore @@ -0,0 +1,32 @@ +# Dependencies +node_modules/ + +# Build output +dist/ + +# Environment files +.env +.env.local +.env.*.local + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# OS files +.DS_Store +Thumbs.db + +# Logs +logs/ +*.log +npm-debug.log* + +# Test coverage +coverage/ + +# Prisma +prisma/*.db +prisma/*.db-journal \ No newline at end of file diff --git a/indexer/Dockerfile b/indexer/Dockerfile new file mode 100644 index 0000000..541afe7 --- /dev/null +++ b/indexer/Dockerfile @@ -0,0 +1,20 @@ +FROM node:18-alpine + +WORKDIR /app + +# Install dependencies +COPY package*.json ./ +RUN npm ci --only=production + +# Copy Prisma schema +COPY prisma ./prisma/ +RUN npx prisma generate + +# Copy source code +COPY dist ./dist/ + +# Expose port +EXPOSE 4000 + +# Start the server +CMD ["node", "dist/index.js"] \ No newline at end of file diff --git a/indexer/README.md b/indexer/README.md new file mode 100644 index 0000000..fa86f7f --- /dev/null +++ b/indexer/README.md @@ -0,0 +1,239 @@ +# PrivacyLayer Event Indexer + +A high-performance event indexer for the PrivacyLayer smart contract on Stellar/Soroban. + +## Features + +- **Real-time Event Indexing**: Automatically indexes all PrivacyLayer contract events +- **GraphQL API**: Query deposits, withdrawals, and admin events efficiently +- **WebSocket Subscriptions**: Real-time updates via GraphQL subscriptions +- **PostgreSQL Storage**: Persistent, indexed storage for historical queries +- **Docker Support**: Easy deployment with Docker Compose + +## Events Indexed + +| Event | Description | Fields | +|-------|-------------|--------| +| `DepositEvent` | Emitted on deposit | commitment, leafIndex, root | +| `WithdrawEvent` | Emitted on withdrawal | nullifierHash, recipient, relayer, fee, amount | +| `PoolPausedEvent` | Emitted when pool is paused | admin | +| `PoolUnpausedEvent` | Emitted when pool is unpaused | admin | +| `VkUpdatedEvent` | Emitted when verifying key is updated | admin | + +## Quick Start + +### Prerequisites + +- Node.js 18+ +- PostgreSQL 15+ +- npm or yarn + +### Installation + +```bash +# Clone the repository +git clone https://github.com/ANAVHEOBA/PrivacyLayer.git +cd PrivacyLayer/indexer + +# Install dependencies +npm install + +# Copy environment file +cp .env.example .env + +# Edit .env with your configuration +# - DATABASE_URL: PostgreSQL connection string +# - CONTRACT_ID: Your deployed PrivacyLayer contract ID +# - SOROBAN_RPC_URL: Soroban RPC endpoint +``` + +### Database Setup + +```bash +# Run migrations +npx prisma migrate dev --name init + +# (Optional) Open Prisma Studio to view data +npx prisma studio +``` + +### Running the Indexer + +```bash +# Development mode with auto-reload +npm run dev + +# Production build +npm run build +npm start +``` + +### Docker Deployment + +```bash +# Build and run with Docker Compose +docker-compose up -d + +# View logs +docker-compose logs -f indexer +``` + +## API Documentation + +### GraphQL Endpoint + +The indexer exposes a GraphQL API at `http://localhost:4000/graphql`. + +### Example Queries + +#### Get recent deposits + +```graphql +query { + deposits(pagination: { skip: 0, take: 10 }) { + id + commitment + leafIndex + root + txHash + timestamp + ledger + } +} +``` + +#### Get withdrawals by recipient + +```graphql +query { + withdrawals(filter: { recipient: "G..." }) { + id + nullifierHash + recipient + amount + timestamp + } +} +``` + +#### Check if commitment is used + +```graphql +query { + isCommitmentUsed(commitment: "0x...") +} +``` + +#### Get current Merkle tree state + +```graphql +query { + merkleTreeState { + currentRoot + leafCount + lastUpdated + } +} +``` + +### Real-time Subscriptions + +#### Subscribe to new deposits + +```graphql +subscription { + onDeposit { + commitment + leafIndex + root + timestamp + } +} +``` + +#### Subscribe to Merkle tree updates + +```graphql +subscription { + onMerkleTreeUpdate { + currentRoot + leafCount + lastUpdated + } +} +``` + +## Configuration + +| Environment Variable | Description | Default | +|---------------------|-------------|---------| +| `PORT` | API server port | `4000` | +| `DATABASE_URL` | PostgreSQL connection string | Required | +| `SOROBAN_RPC_URL` | Soroban RPC endpoint | `https://soroban-testnet.stellar.org:443` | +| `NETWORK_PASSPHRASE` | Stellar network passphrase | Testnet | +| `CONTRACT_ID` | PrivacyLayer contract ID | Required | +| `POLL_INTERVAL` | Event polling interval (ms) | `5000` | + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ GraphQL API │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Queries │ │ Mutations │ │ Subscriptions │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Event Indexer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Parser │ │ Processor │ │ Merkle Sync │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ + │ │ │ + ▼ ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Soroban RPC │ │ PostgreSQL │ │ PubSub │ +│ (Events) │ │ (Storage) │ │ (WebSocket) │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ +``` + +## Development + +### Project Structure + +``` +indexer/ +├── src/ +│ ├── index.ts # Main entry point +│ ├── indexer.ts # Event indexing logic +│ ├── parsers.ts # Event parsing utilities +│ ├── resolvers.ts # GraphQL resolvers +│ └── schema.graphql # GraphQL schema +├── prisma/ +│ └── schema.prisma # Database schema +├── package.json +├── tsconfig.json +├── Dockerfile +├── docker-compose.yml +└── README.md +``` + +### Running Tests + +```bash +npm test +``` + +## License + +MIT License - See LICENSE file for details. + +## Contributing + +Contributions are welcome! Please see the main [CONTRIBUTING.md](../CONTRIBUTING.md) for guidelines. + +## Support + +For issues and feature requests, please open a GitHub issue. \ No newline at end of file diff --git a/indexer/docker-compose.yml b/indexer/docker-compose.yml new file mode 100644 index 0000000..3079585 --- /dev/null +++ b/indexer/docker-compose.yml @@ -0,0 +1,37 @@ +version: '3.8' + +services: + postgres: + image: postgres:15-alpine + environment: + POSTGRES_USER: privacylayer + POSTGRES_PASSWORD: privacylayer_secret + POSTGRES_DB: privacylayer_indexer + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U privacylayer -d privacylayer_indexer"] + interval: 5s + timeout: 5s + retries: 5 + + indexer: + build: . + environment: + DATABASE_URL: "postgresql://privacylayer:privacylayer_secret@postgres:5432/privacylayer_indexer?schema=public" + SOROBAN_RPC_URL: ${SOROBAN_RPC_URL:-https://soroban-testnet.stellar.org:443} + NETWORK_PASSPHRASE: ${NETWORK_PASSPHRASE:-Test SDF Network ; September 2015} + CONTRACT_ID: ${CONTRACT_ID} + PORT: 4000 + POLL_INTERVAL: 5000 + ports: + - "4000:4000" + depends_on: + postgres: + condition: service_healthy + restart: unless-stopped + +volumes: + postgres_data: \ No newline at end of file diff --git a/indexer/package.json b/indexer/package.json new file mode 100644 index 0000000..b798866 --- /dev/null +++ b/indexer/package.json @@ -0,0 +1,45 @@ +{ + "name": "privacylayer-indexer", + "version": "1.0.0", + "description": "Event indexer service for PrivacyLayer smart contracts", + "main": "dist/index.js", + "scripts": { + "build": "tsc", + "start": "node dist/index.js", + "dev": "ts-node-dev --respawn src/index.ts", + "codegen": "graphql-codegen --config codegen.yml", + "migrate": "prisma migrate dev", + "studio": "prisma studio", + "test": "jest" + }, + "dependencies": { + "@apollo/server": "^4.10.0", + "@graphql-tools/schema": "^10.0.0", + "@prisma/client": "^5.8.0", + "body-parser": "^1.20.2", + "cors": "^2.8.5", + "dotenv": "^16.3.1", + "express": "^4.18.2", + "graphql": "^16.8.1", + "graphql-subscriptions": "^2.0.0", + "graphql-ws": "^5.14.2", + "soroban-client": "^1.0.0-beta.6", + "ws": "^8.16.0" + }, + "devDependencies": { + "@graphql-codegen/cli": "^5.0.0", + "@graphql-codegen/typescript": "^4.0.1", + "@graphql-codegen/typescript-resolvers": "^4.0.1", + "@types/cors": "^2.8.17", + "@types/express": "^4.17.21", + "@types/node": "^20.10.5", + "@types/ws": "^8.5.10", + "prisma": "^5.8.0", + "ts-node": "^10.9.2", + "ts-node-dev": "^2.0.0", + "typescript": "^5.3.3" + }, + "engines": { + "node": ">=18" + } +} \ No newline at end of file diff --git a/indexer/prisma/schema.prisma b/indexer/prisma/schema.prisma new file mode 100644 index 0000000..ac3d9c3 --- /dev/null +++ b/indexer/prisma/schema.prisma @@ -0,0 +1,102 @@ +// Prisma Schema for PrivacyLayer Event Indexer +// Stores all contract events for efficient querying + +generator client { + provider = "prisma-client-js" +} + +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +// Deposit events - emitted when user deposits into the shielded pool +model DepositEvent { + id String @id @default(cuid()) + commitment String // 32-byte hex string + leafIndex Int // Position in Merkle tree + root String // Merkle root after insertion + txHash String // Transaction hash + timestamp DateTime @default(now()) + ledger BigInt // Stellar ledger number + + @@index([commitment]) + @@index([leafIndex]) + @@index([timestamp]) + @@index([ledger]) + @@map("deposit_events") +} + +// Withdrawal events - emitted when user withdraws from the shielded pool +model WithdrawEvent { + id String @id @default(cuid()) + nullifierHash String // 32-byte hex string + recipient String // Stellar address + relayer String? // Optional relayer address + fee BigInt // Fee paid to relayer + amount BigInt // Amount withdrawn + txHash String // Transaction hash + timestamp DateTime @default(now()) + ledger BigInt // Stellar ledger number + + @@index([nullifierHash]) + @@index([recipient]) + @@index([timestamp]) + @@index([ledger]) + @@map("withdraw_events") +} + +// Pool pause events - emitted when admin pauses the pool +model PoolPausedEvent { + id String @id @default(cuid()) + admin String // Admin address + txHash String // Transaction hash + timestamp DateTime @default(now()) + ledger BigInt // Stellar ledger number + + @@index([timestamp]) + @@map("pool_paused_events") +} + +// Pool unpause events - emitted when admin unpauses the pool +model PoolUnpausedEvent { + id String @id @default(cuid()) + admin String // Admin address + txHash String // Transaction hash + timestamp DateTime @default(now()) + ledger BigInt // Stellar ledger number + + @@index([timestamp]) + @@map("pool_unpaused_events") +} + +// Verifying key update events - emitted when admin updates the VK +model VkUpdatedEvent { + id String @id @default(cuid()) + admin String // Admin address + txHash String // Transaction hash + timestamp DateTime @default(now()) + ledger BigInt // Stellar ledger number + + @@index([timestamp]) + @@map("vk_updated_events") +} + +// Merkle tree state - tracks the current state of the tree +model MerkleTreeState { + id String @id @default("singleton") + currentRoot String // Current Merkle root + leafCount Int @default(0) + lastUpdated DateTime @default(now()) + + @@map("merkle_tree_state") +} + +// Indexer state - tracks the last indexed ledger +model IndexerState { + id String @id @default("singleton") + lastLedger BigInt @default(0) + lastUpdated DateTime @default(now()) + + @@map("indexer_state") +} \ No newline at end of file diff --git a/indexer/src/index.ts b/indexer/src/index.ts new file mode 100644 index 0000000..4eecfac --- /dev/null +++ b/indexer/src/index.ts @@ -0,0 +1,123 @@ +/** + * PrivacyLayer Event Indexer + * + * Main entry point for the indexing service. + * - Connects to Stellar network via Soroban RPC + * - Indexes contract events into PostgreSQL + * - Exposes GraphQL API for querying + * - Supports real-time subscriptions via WebSocket + */ + +import { ApolloServer } from '@apollo/server'; +import { expressMiddleware } from '@apollo/server/express4'; +import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer'; +import express from 'express'; +import http from 'http'; +import cors from 'cors'; +import bodyParser from 'body-parser'; +import { WebSocketServer } from 'ws'; +import { useServer } from 'graphql-ws/lib/use/ws'; +import { makeExecutableSchema } from '@graphql-tools/schema'; +import { PubSub } from 'graphql-subscriptions'; +import { readFileSync } from 'fs'; +import { join } from 'path'; + +import { PrismaClient } from '@prisma/client'; +import { EventIndexer } from './indexer'; +import { resolvers } from './resolvers'; + +// Load environment variables +import 'dotenv/config'; + +const prisma = new PrismaClient(); +const pubsub = new PubSub(); + +// Load GraphQL schema +const typeDefs = readFileSync(join(__dirname, 'schema.graphql'), 'utf-8'); + +// Create executable schema +const schema = makeExecutableSchema({ typeDefs, resolvers }); + +async function startServer() { + const app = express(); + const httpServer = http.createServer(app); + + // Create WebSocket server for subscriptions + const wsServer = new WebSocketServer({ + server: httpServer, + path: '/graphql', + }); + + // Hand in the schema we just created and have the + // WebSocketServer start listening. + const serverCleanup = useServer({ schema }, wsServer); + + // Create Apollo Server + const server = new ApolloServer({ + schema, + plugins: [ + // Proper shutdown for the HTTP server. + ApolloServerPluginDrainHttpServer({ httpServer }), + // Proper shutdown for the WebSocket server. + { + async serverWillStart() { + return { + async drainServer() { + await serverCleanup.dispose(); + }, + }; + }, + }, + ], + }); + + await server.start(); + + app.use( + '/graphql', + cors(), + bodyParser.json(), + expressMiddleware(server, { + context: async () => ({ prisma, pubsub }), + }), + ); + + // Health check endpoint + app.get('/health', (req, res) => { + res.json({ status: 'healthy', timestamp: new Date().toISOString() }); + }); + + // Start indexer + const indexer = new EventIndexer(prisma, pubsub); + + const PORT = process.env.PORT || 4000; + + await new Promise((resolve) => httpServer.listen({ port: PORT }, resolve)); + + console.log(`🚀 Server ready at http://localhost:${PORT}/graphql`); + console.log(`🛰️ Subscriptions ready at ws://localhost:${PORT}/graphql`); + + // Start indexing + try { + await indexer.start(); + console.log('📊 Event indexer started'); + } catch (error) { + console.error('Failed to start indexer:', error); + process.exit(1); + } + + // Graceful shutdown + process.on('SIGTERM', async () => { + console.log('SIGTERM received. Shutting down gracefully...'); + await indexer.stop(); + await prisma.$disconnect(); + await server.stop(); + httpServer.close(); + process.exit(0); + }); +} + +startServer().catch((error) => { + console.error('Failed to start server:', error); + process.exit(1); +}); \ No newline at end of file diff --git a/indexer/src/indexer.ts b/indexer/src/indexer.ts new file mode 100644 index 0000000..a026cc3 --- /dev/null +++ b/indexer/src/indexer.ts @@ -0,0 +1,247 @@ +/** + * Event Indexer for PrivacyLayer Smart Contracts + * + * Monitors Stellar/Soroban network for contract events + * and indexes them into PostgreSQL for efficient querying. + */ + +import { PrismaClient } from '@prisma/client'; +import { PubSub } from 'graphql-subscriptions'; +import { Server, Horizon } from 'soroban-client'; +import { parseEvents } from './parsers'; + +export interface IndexerConfig { + rpcUrl: string; + networkPassphrase: string; + contractId: string; + pollInterval: number; + startLedger?: number; +} + +const DEFAULT_CONFIG: IndexerConfig = { + rpcUrl: process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org:443', + networkPassphrase: process.env.NETWORK_PASSPHRASE || Horizon.Networks.TESTNET, + contractId: process.env.CONTRACT_ID || '', + pollInterval: parseInt(process.env.POLL_INTERVAL || '5000'), +}; + +export class EventIndexer { + private prisma: PrismaClient; + private pubsub: PubSub; + private config: IndexerConfig; + private server: Server; + private running: boolean = false; + private lastLedger: bigint = BigInt(0); + + constructor(prisma: PrismaClient, pubsub: PubSub, config?: Partial) { + this.prisma = prisma; + this.pubsub = pubsub; + this.config = { ...DEFAULT_CONFIG, ...config }; + this.server = new Server(this.config.rpcUrl, { + allowHttp: this.config.rpcUrl.startsWith('http://'), + }); + } + + /** + * Start the event indexer + */ + async start(): Promise { + if (!this.config.contractId) { + throw new Error('CONTRACT_ID environment variable is required'); + } + + // Load last indexed ledger from database + const state = await this.prisma.indexerState.findUnique({ + where: { id: 'singleton' }, + }); + + if (state) { + this.lastLedger = state.lastLedger; + } else { + // If no state exists, start from configured ledger or current ledger + if (this.config.startLedger) { + this.lastLedger = BigInt(this.config.startLedger); + } else { + const latestLedger = await this.server.getLatestLedger(); + this.lastLedger = BigInt(latestLedger.sequence); + } + await this.prisma.indexerState.create({ + data: { id: 'singleton', lastLedger: this.lastLedger }, + }); + } + + console.log(`Starting indexer from ledger ${this.lastLedger}`); + this.running = true; + this.pollLoop(); + } + + /** + * Stop the event indexer + */ + async stop(): Promise { + this.running = false; + } + + /** + * Main polling loop + */ + private async pollLoop(): Promise { + while (this.running) { + try { + await this.poll(); + } catch (error) { + console.error('Error during polling:', error); + } + await this.sleep(this.config.pollInterval); + } + } + + /** + * Poll for new events + */ + private async poll(): Promise { + const latestLedger = await this.server.getLatestLedger(); + const latestSeq = BigInt(latestLedger.sequence); + + if (latestSeq <= this.lastLedger) { + return; // No new ledgers + } + + // Get events for this contract from the last ledger to the latest + const startLedger = Number(this.lastLedger) + 1; + const endLedger = Number(latestSeq); + + try { + const events = await this.server.getEvents({ + startLedger, + filters: [ + { + type: 'contract', + contractIds: [this.config.contractId], + }, + ], + }); + + if (events.events && events.events.length > 0) { + await this.processEvents(events.events); + } + + // Update indexer state + await this.prisma.indexerState.update({ + where: { id: 'singleton' }, + data: { lastLedger: latestSeq, lastUpdated: new Date() }, + }); + + this.lastLedger = latestSeq; + } catch (error) { + console.error(`Error fetching events for ledgers ${startLedger}-${endLedger}:`, error); + } + } + + /** + * Process and store events + */ + private async processEvents(events: any[]): Promise { + const parsedEvents = parseEvents(events); + + for (const event of parsedEvents) { + try { + switch (event.type) { + case 'deposit': { + const deposit = await this.prisma.depositEvent.create({ + data: { + commitment: event.commitment, + leafIndex: event.leafIndex, + root: event.root, + txHash: event.txHash, + ledger: BigInt(event.ledger), + }, + }); + this.pubsub.publish('DEPOSIT', { onDeposit: deposit }); + + // Update merkle tree state + await this.prisma.merkleTreeState.upsert({ + where: { id: 'singleton' }, + create: { + id: 'singleton', + currentRoot: event.root, + leafCount: event.leafIndex + 1, + }, + update: { + currentRoot: event.root, + leafCount: event.leafIndex + 1, + lastUpdated: new Date(), + }, + }); + this.pubsub.publish('MERKLE_TREE_UPDATE', { + onMerkleTreeUpdate: { + currentRoot: event.root, + leafCount: event.leafIndex + 1, + lastUpdated: new Date().toISOString(), + }, + }); + break; + } + + case 'withdraw': { + const withdraw = await this.prisma.withdrawEvent.create({ + data: { + nullifierHash: event.nullifierHash, + recipient: event.recipient, + relayer: event.relayer, + fee: BigInt(event.fee), + amount: BigInt(event.amount), + txHash: event.txHash, + ledger: BigInt(event.ledger), + }, + }); + this.pubsub.publish('WITHDRAW', { onWithdraw: withdraw }); + break; + } + + case 'pool_paused': { + const paused = await this.prisma.poolPausedEvent.create({ + data: { + admin: event.admin, + txHash: event.txHash, + ledger: BigInt(event.ledger), + }, + }); + this.pubsub.publish('POOL_PAUSED', { onPoolPaused: paused }); + break; + } + + case 'pool_unpaused': { + const unpaused = await this.prisma.poolUnpausedEvent.create({ + data: { + admin: event.admin, + txHash: event.txHash, + ledger: BigInt(event.ledger), + }, + }); + this.pubsub.publish('POOL_UNPAUSED', { onPoolUnpaused: unpaused }); + break; + } + + case 'vk_updated': { + const vkUpdated = await this.prisma.vkUpdatedEvent.create({ + data: { + admin: event.admin, + txHash: event.txHash, + ledger: BigInt(event.ledger), + }, + }); + this.pubsub.publish('VK_UPDATED', { onVkUpdated: vkUpdated }); + break; + } + } + } catch (error) { + console.error(`Error processing event:`, error); + } + } + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} \ No newline at end of file diff --git a/indexer/src/parsers.ts b/indexer/src/parsers.ts new file mode 100644 index 0000000..e6018be --- /dev/null +++ b/indexer/src/parsers.ts @@ -0,0 +1,291 @@ +/** + * Event Parsers for PrivacyLayer Contract Events + * + * Parses raw Soroban events into structured format. + */ + +import { xdr } from 'soroban-client'; + +export type ParsedEvent = + | { type: 'deposit'; commitment: string; leafIndex: number; root: string; txHash: string; ledger: number } + | { type: 'withdraw'; nullifierHash: string; recipient: string; relayer: string | null; fee: string; amount: string; txHash: string; ledger: number } + | { type: 'pool_paused'; admin: string; txHash: string; ledger: number } + | { type: 'pool_unpaused'; admin: string; txHash: string; ledger: number } + | { type: 'vk_updated'; admin: string; txHash: string; ledger: number }; + +/** + * Parse raw Soroban events into structured format + */ +export function parseEvents(events: any[]): ParsedEvent[] { + const parsed: ParsedEvent[] = []; + + for (const event of events) { + try { + const parsedEvent = parseEvent(event); + if (parsedEvent) { + parsed.push(parsedEvent); + } + } catch (error) { + console.error('Error parsing event:', error); + } + } + + return parsed; +} + +/** + * Parse a single event + */ +function parseEvent(event: any): ParsedEvent | null { + const topics = event.value?._value?.topics || []; + const data = event.value?._value?.data || []; + const txHash = event.txHash || event.transactionHash || ''; + const ledger = event.ledger || event.ledgerSequence || 0; + + if (!topics || topics.length === 0) { + return null; + } + + // Event type is typically the first topic + const eventType = extractString(topics[0]); + + switch (eventType) { + case 'deposit': + case 'DepositEvent': + return parseDepositEvent(topics, data, txHash, ledger); + + case 'withdraw': + case 'WithdrawEvent': + return parseWithdrawEvent(topics, data, txHash, ledger); + + case 'pool_paused': + case 'PoolPausedEvent': + return parsePoolPausedEvent(topics, data, txHash, ledger); + + case 'pool_unpaused': + case 'PoolUnpausedEvent': + return parsePoolUnpausedEvent(topics, data, txHash, ledger); + + case 'vk_updated': + case 'VkUpdatedEvent': + return parseVkUpdatedEvent(topics, data, txHash, ledger); + + default: + // Try parsing based on data structure + return inferEventType(topics, data, txHash, ledger); + } +} + +/** + * Parse deposit event + */ +function parseDepositEvent( + topics: any[], + data: any[], + txHash: string, + ledger: number +): ParsedEvent { + // Data structure for DepositEvent: + // - commitment: BytesN<32> + // - leaf_index: u32 + // - root: BytesN<32> + + return { + type: 'deposit', + commitment: extractBytes32(data[0] || topics[1]), + leafIndex: extractU32(data[1] || topics[2]), + root: extractBytes32(data[2] || topics[3]), + txHash, + ledger, + }; +} + +/** + * Parse withdrawal event + */ +function parseWithdrawEvent( + topics: any[], + data: any[], + txHash: string, + ledger: number +): ParsedEvent { + // Data structure for WithdrawEvent: + // - nullifier_hash: BytesN<32> + // - recipient: Address + // - relayer: Option
+ // - fee: i128 + // - amount: i128 + + return { + type: 'withdraw', + nullifierHash: extractBytes32(data[0] || topics[1]), + recipient: extractAddress(data[1] || topics[2]), + relayer: extractOptionAddress(data[2] || topics[3]), + fee: extractI128(data[3] || topics[4]), + amount: extractI128(data[4] || topics[5]), + txHash, + ledger, + }; +} + +/** + * Parse pool paused event + */ +function parsePoolPausedEvent( + topics: any[], + data: any[], + txHash: string, + ledger: number +): ParsedEvent { + return { + type: 'pool_paused', + admin: extractAddress(data[0] || topics[1]), + txHash, + ledger, + }; +} + +/** + * Parse pool unpaused event + */ +function parsePoolUnpausedEvent( + topics: any[], + data: any[], + txHash: string, + ledger: number +): ParsedEvent { + return { + type: 'pool_unpaused', + admin: extractAddress(data[0] || topics[1]), + txHash, + ledger, + }; +} + +/** + * Parse VK updated event + */ +function parseVkUpdatedEvent( + topics: any[], + data: any[], + txHash: string, + ledger: number +): ParsedEvent { + return { + type: 'vk_updated', + admin: extractAddress(data[0] || topics[1]), + txHash, + ledger, + }; +} + +/** + * Infer event type from data structure when name doesn't match + */ +function inferEventType( + topics: any[], + data: any[], + txHash: string, + ledger: number +): ParsedEvent | null { + // Try to infer from number of data fields + const dataLength = data?.length || 0; + + if (dataLength === 3) { + // Could be deposit event (commitment, leafIndex, root) + try { + const commitment = extractBytes32(data[0]); + const leafIndex = extractU32(data[1]); + const root = extractBytes32(data[2]); + return { type: 'deposit', commitment, leafIndex, root, txHash, ledger }; + } catch { + return null; + } + } + + if (dataLength === 5) { + // Could be withdraw event + try { + return { + type: 'withdraw', + nullifierHash: extractBytes32(data[0]), + recipient: extractAddress(data[1]), + relayer: extractOptionAddress(data[2]), + fee: extractI128(data[3]), + amount: extractI128(data[4]), + txHash, + ledger, + }; + } catch { + return null; + } + } + + return null; +} + +// Helper functions for extracting values from Soroban event data + +function extractString(value: any): string { + if (typeof value === 'string') return value; + if (value?._value !== undefined) return String(value._value); + if (value?.sym) return value.sym; + return String(value); +} + +function extractBytes32(value: any): string { + if (typeof value === 'string') { + // Already a hex string + return value.startsWith('0x') ? value : `0x${value}`; + } + if (value?.bytes) { + return `0x${value.bytes}`; + } + if (Buffer.isBuffer(value)) { + return `0x${value.toString('hex')}`; + } + // Try to parse as base64 or hex + const str = String(value); + if (str.startsWith('0x')) return str; + try { + const buf = Buffer.from(str, 'base64'); + return `0x${buf.toString('hex')}`; + } catch { + return str; + } +} + +function extractU32(value: any): number { + if (typeof value === 'number') return value; + if (value?._value !== undefined) return Number(value._value); + return Number(value); +} + +function extractAddress(value: any): string { + if (typeof value === 'string') return value; + if (value?.address) return value.address; + if (value?._value !== undefined) return String(value._value); + return String(value); +} + +function extractOptionAddress(value: any): string | null { + if (!value) return null; + if (value === 'None' || value?._arm === 'None') return null; + if (value?._arm === 'Some') return extractAddress(value._value); + try { + return extractAddress(value); + } catch { + return null; + } +} + +function extractI128(value: any): string { + if (typeof value === 'bigint') return value.toString(); + if (typeof value === 'number') return value.toString(); + if (value?._value !== undefined) { + if (typeof value._value === 'bigint') return value._value.toString(); + return String(value._value); + } + if (value?.i128) return value.i128; + return String(value); +} \ No newline at end of file diff --git a/indexer/src/resolvers.ts b/indexer/src/resolvers.ts new file mode 100644 index 0000000..818bec5 --- /dev/null +++ b/indexer/src/resolvers.ts @@ -0,0 +1,259 @@ +/** + * GraphQL Resolvers for PrivacyLayer Event Indexer + */ + +import { PrismaClient } from '@prisma/client'; +import { PubSub } from 'graphql-subscriptions'; + +const DEPOSIT_SUBSCRIPTION = 'DEPOSIT'; +const WITHDRAW_SUBSCRIPTION = 'WITHDRAW'; +const POOL_PAUSED_SUBSCRIPTION = 'POOL_PAUSED'; +const POOL_UNPAUSED_SUBSCRIPTION = 'POOL_UNPAUSED'; +const VK_UPDATED_SUBSCRIPTION = 'VK_UPDATED'; +const MERKLE_TREE_UPDATE_SUBSCRIPTION = 'MERKLE_TREE_UPDATE'; + +interface Context { + prisma: PrismaClient; + pubsub: PubSub; +} + +export const resolvers = { + Query: { + // Deposit queries + deposits: async ( + _: any, + args: { pagination?: { skip?: number; take?: number }; filter?: any }, + context: Context + ) => { + const { skip = 0, take = 20 } = args.pagination || {}; + const { commitment, leafIndexMin, leafIndexMax, timestampAfter, timestampBefore } = + args.filter || {}; + + const where: any = {}; + if (commitment) where.commitment = commitment; + if (leafIndexMin !== undefined || leafIndexMax !== undefined) { + where.leafIndex = {}; + if (leafIndexMin !== undefined) where.leafIndex.gte = leafIndexMin; + if (leafIndexMax !== undefined) where.leafIndex.lte = leafIndexMax; + } + if (timestampAfter || timestampBefore) { + where.timestamp = {}; + if (timestampAfter) where.timestamp.gte = new Date(timestampAfter); + if (timestampBefore) where.timestamp.lte = new Date(timestampBefore); + } + + return context.prisma.depositEvent.findMany({ + where, + skip, + take, + orderBy: { timestamp: 'desc' }, + }); + }, + + deposit: async (_: any, args: { id: string }, context: Context) => { + return context.prisma.depositEvent.findUnique({ + where: { id: args.id }, + }); + }, + + depositByCommitment: async (_: any, args: { commitment: string }, context: Context) => { + return context.prisma.depositEvent.findFirst({ + where: { commitment: args.commitment }, + }); + }, + + // Withdrawal queries + withdrawals: async ( + _: any, + args: { pagination?: { skip?: number; take?: number }; filter?: any }, + context: Context + ) => { + const { skip = 0, take = 20 } = args.pagination || {}; + const { nullifierHash, recipient, relayer, timestampAfter, timestampBefore } = + args.filter || {}; + + const where: any = {}; + if (nullifierHash) where.nullifierHash = nullifierHash; + if (recipient) where.recipient = recipient; + if (relayer) where.relayer = relayer; + + if (timestampAfter || timestampBefore) { + where.timestamp = {}; + if (timestampAfter) where.timestamp.gte = new Date(timestampAfter); + if (timestampBefore) where.timestamp.lte = new Date(timestampBefore); + } + + return context.prisma.withdrawEvent.findMany({ + where, + skip, + take, + orderBy: { timestamp: 'desc' }, + }); + }, + + withdraw: async (_: any, args: { id: string }, context: Context) => { + return context.prisma.withdrawEvent.findUnique({ + where: { id: args.id }, + }); + }, + + withdrawByNullifierHash: async ( + _: any, + args: { nullifierHash: string }, + context: Context + ) => { + return context.prisma.withdrawEvent.findFirst({ + where: { nullifierHash: args.nullifierHash }, + }); + }, + + // Admin event queries + poolPausedEvents: async ( + _: any, + args: { pagination?: { skip?: number; take?: number } }, + context: Context + ) => { + const { skip = 0, take = 20 } = args.pagination || {}; + return context.prisma.poolPausedEvent.findMany({ + skip, + take, + orderBy: { timestamp: 'desc' }, + }); + }, + + poolUnpausedEvents: async ( + _: any, + args: { pagination?: { skip?: number; take?: number } }, + context: Context + ) => { + const { skip = 0, take = 20 } = args.pagination || {}; + return context.prisma.poolUnpausedEvent.findMany({ + skip, + take, + orderBy: { timestamp: 'desc' }, + }); + }, + + vkUpdatedEvents: async ( + _: any, + args: { pagination?: { skip?: number; take?: number } }, + context: Context + ) => { + const { skip = 0, take = 20 } = args.pagination || {}; + return context.prisma.vkUpdatedEvent.findMany({ + skip, + take, + orderBy: { timestamp: 'desc' }, + }); + }, + + // State queries + merkleTreeState: async (_: any, __: any, context: Context) => { + const state = await context.prisma.merkleTreeState.findUnique({ + where: { id: 'singleton' }, + }); + return ( + state || { + currentRoot: '0x0000000000000000000000000000000000000000000000000000000000000000', + leafCount: 0, + lastUpdated: new Date().toISOString(), + } + ); + }, + + indexerStats: async (_: any, __: any, context: Context) => { + const [totalDeposits, totalWithdrawals, indexerState, latestPaused] = await Promise.all([ + context.prisma.depositEvent.count(), + context.prisma.withdrawEvent.count(), + context.prisma.indexerState.findUnique({ where: { id: 'singleton' } }), + context.prisma.poolPausedEvent.findFirst({ + orderBy: { timestamp: 'desc' }, + }), + ]); + + // Check if pool is currently paused (find most recent pause/unpause) + const [lastPaused, lastUnpaused] = await Promise.all([ + context.prisma.poolPausedEvent.findFirst({ orderBy: { timestamp: 'desc' } }), + context.prisma.poolUnpausedEvent.findFirst({ orderBy: { timestamp: 'desc' } }), + ]); + + let isPaused = false; + if (lastPaused) { + if (!lastUnpaused) { + isPaused = true; + } else if (lastPaused.timestamp > lastUnpaused.timestamp) { + isPaused = true; + } + } + + return { + totalDeposits, + totalWithdrawals, + currentLedger: '0', + lastIndexedLedger: indexerState?.lastLedger.toString() || '0', + isPaused, + }; + }, + + // Utility queries + isCommitmentUsed: async (_: any, args: { commitment: string }, context: Context) => { + const deposit = await context.prisma.depositEvent.findFirst({ + where: { commitment: args.commitment }, + }); + return deposit !== null; + }, + + isNullifierUsed: async (_: any, args: { nullifierHash: string }, context: Context) => { + const withdraw = await context.prisma.withdrawEvent.findFirst({ + where: { nullifierHash: args.nullifierHash }, + }); + return withdraw !== null; + }, + }, + + Subscription: { + onDeposit: { + subscribe: (_: any, __: any, context: Context) => + context.pubsub.asyncIterator([DEPOSIT_SUBSCRIPTION]), + }, + onWithdraw: { + subscribe: (_: any, __: any, context: Context) => + context.pubsub.asyncIterator([WITHDRAW_SUBSCRIPTION]), + }, + onPoolPaused: { + subscribe: (_: any, __: any, context: Context) => + context.pubsub.asyncIterator([POOL_PAUSED_SUBSCRIPTION]), + }, + onPoolUnpaused: { + subscribe: (_: any, __: any, context: Context) => + context.pubsub.asyncIterator([POOL_UNPAUSED_SUBSCRIPTION]), + }, + onVkUpdated: { + subscribe: (_: any, __: any, context: Context) => + context.pubsub.asyncIterator([VK_UPDATED_SUBSCRIPTION]), + }, + onMerkleTreeUpdate: { + subscribe: (_: any, __: any, context: Context) => + context.pubsub.asyncIterator([MERKLE_TREE_UPDATE_SUBSCRIPTION]), + }, + }, + + // Type resolvers for converting BigInt to String + DepositEvent: { + ledger: (parent: any) => parent.ledger.toString(), + }, + WithdrawEvent: { + fee: (parent: any) => parent.fee.toString(), + amount: (parent: any) => parent.amount.toString(), + ledger: (parent: any) => parent.ledger.toString(), + }, + PoolPausedEvent: { + ledger: (parent: any) => parent.ledger.toString(), + }, + PoolUnpausedEvent: { + ledger: (parent: any) => parent.ledger.toString(), + }, + VkUpdatedEvent: { + ledger: (parent: any) => parent.ledger.toString(), + }, +}; \ No newline at end of file diff --git a/indexer/src/schema.graphql b/indexer/src/schema.graphql new file mode 100644 index 0000000..7ca12ff --- /dev/null +++ b/indexer/src/schema.graphql @@ -0,0 +1,127 @@ +# GraphQL Schema for PrivacyLayer Event Indexer +# Provides queries and subscriptions for all contract events + +type DepositEvent { + id: String! + commitment: String! + leafIndex: Int! + root: String! + txHash: String! + timestamp: String! + ledger: String! +} + +type WithdrawEvent { + id: String! + nullifierHash: String! + recipient: String! + relayer: String + fee: String! + amount: String! + txHash: String! + timestamp: String! + ledger: String! +} + +type PoolPausedEvent { + id: String! + admin: String! + txHash: String! + timestamp: String! + ledger: String! +} + +type PoolUnpausedEvent { + id: String! + admin: String! + txHash: String! + timestamp: String! + ledger: String! +} + +type VkUpdatedEvent { + id: String! + admin: String! + txHash: String! + timestamp: String! + ledger: String! +} + +type MerkleTreeState { + currentRoot: String! + leafCount: Int! + lastUpdated: String! +} + +type IndexerStats { + totalDeposits: Int! + totalWithdrawals: Int! + currentLedger: String! + lastIndexedLedger: String! + isPaused: Boolean! +} + +# Pagination input +input PaginationInput { + skip: Int = 0 + take: Int = 20 +} + +# Filter inputs +input DepositFilterInput { + commitment: String + leafIndexMin: Int + leafIndexMax: Int + timestampAfter: String + timestampBefore: String +} + +input WithdrawFilterInput { + nullifierHash: String + recipient: String + relayer: String + timestampAfter: String + timestampBefore: String +} + +type Query { + # Deposit queries + deposits(pagination: PaginationInput, filter: DepositFilterInput): [DepositEvent!]! + deposit(id: String!): DepositEvent + depositByCommitment(commitment: String!): DepositEvent + + # Withdrawal queries + withdrawals(pagination: PaginationInput, filter: WithdrawFilterInput): [WithdrawEvent!]! + withdraw(id: String!): WithdrawEvent + withdrawByNullifierHash(nullifierHash: String!): WithdrawEvent + + # Admin event queries + poolPausedEvents(pagination: PaginationInput): [PoolPausedEvent!]! + poolUnpausedEvents(pagination: PaginationInput): [PoolUnpausedEvent!]! + vkUpdatedEvents(pagination: PaginationInput): [VkUpdatedEvent!]! + + # State queries + merkleTreeState: MerkleTreeState! + indexerStats: IndexerStats! + + # Utility queries + isCommitmentUsed(commitment: String!): Boolean! + isNullifierUsed(nullifierHash: String!): Boolean! +} + +type Subscription { + # Real-time event subscriptions + onDeposit: DepositEvent! + onWithdraw: WithdrawEvent! + onPoolPaused: PoolPausedEvent! + onPoolUnpaused: PoolUnpausedEvent! + onVkUpdated: VkUpdatedEvent! + + # State change subscriptions + onMerkleTreeUpdate: MerkleTreeState! +} + +schema { + query: Query + subscription: Subscription +} \ No newline at end of file diff --git a/indexer/tsconfig.json b/indexer/tsconfig.json new file mode 100644 index 0000000..9997121 --- /dev/null +++ b/indexer/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "commonjs", + "lib": ["ES2022"], + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +} \ No newline at end of file