A modern ETL pipeline that collects DeFi protocol data from The Graph API, processes it through a layered data warehouse architecture, and provides interactive analytics.
```
TheGraph API → MinIO → PostgreSQL → Streamlit
     ↓           ↓         ↓            ↓
 Raw data    Storage  Warehouse    Analytics
```
- Airflow: Orchestrates data pipelines with AI-powered query generation
- MinIO: Object storage for raw data files
- PostgreSQL: Data warehouse with Silver (staging) and Gold (analytics) layers
- Streamlit: Interactive dashboard for data visualization
- DBT: Data transformations and modeling
- Apache Airflow 3.0.4 + CeleryExecutor
- PostgreSQL + pgvector
- MinIO (S3-compatible)
- DBT + Streamlit
- LangChain + OpenAI GPT models
- Docker and Docker Compose
- At least 4GB RAM and 2 CPU cores
- TheGraph API key (for blockchain data access)
- OpenAI API key (for AI-powered query generation)
- Clone the repository:

  ```bash
  git clone <repository-url>
  cd airflow-docker
  ```

- Create environment file:

  ```bash
  cp .env.example .env
  ```

  Then edit `.env` and replace the placeholder values with your actual credentials.

- Configure required environment variables:

  ```bash
  # TheGraph API Configuration
  GRAPH_GATEWAY_API_KEY=your_thegraph_api_key_here

  # OpenAI Configuration
  OPENAI_API_KEY=your_openai_api_key_here
  OPENAI_MODEL=gpt-4  # or gpt-3.5-turbo

  # Database Configuration
  POSTGRES_DB_WAREHOUSE=defi_datawarehouse
  POSTGRES_USER_WAREHOUSE=defi_user
  POSTGRES_PASSWORD_WAREHOUSE=secure_password_here

  # Optional: MinIO Configuration
  MINIO_ROOT_USER=minio
  MINIO_ROOT_PASSWORD=minio12345
  ```

- Launch the platform:

  ```bash
  # Start all services
  docker-compose up -d

  # View logs
  docker-compose logs -f

  # Stop services
  docker-compose down
  ```
- Airflow UI: http://localhost:8080 (admin/airflow)
- MinIO Console: http://localhost:9001 (minio/minio12345)
- Streamlit Dashboard: http://localhost:8501
- Flower (Celery Monitor): http://localhost:5555
Purpose: Fetches DeFi pool data from The Graph protocol subgraphs using AI-generated GraphQL queries.
AI Query Generation Process:
- Schema Introspection: Queries GraphQL schema to understand available fields and types
- AI Analysis: Uses LangChain + OpenAI GPT to analyze schema and generate optimal GraphQL queries
- Self-Healing Retry: If a query fails, the AI analyzes the error and generates an improved query, retrying up to 3 times
- Data Storage: Stores raw JSON responses in MinIO with a date-partitioned structure (see the sketch below)
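For illustration, a minimal sketch of this generate-validate-retry loop, assuming the LangChain `ChatOpenAI` client; the endpoint placeholder, prompt wording, function names, and object-key layout are assumptions for the example, not the repo's actual code:

```python
import requests
from langchain_openai import ChatOpenAI

# Placeholder gateway URL; the real one embeds GRAPH_GATEWAY_API_KEY and a subgraph id.
SUBGRAPH_URL = "https://gateway.thegraph.com/api/<api-key>/subgraphs/id/<subgraph-id>"

llm = ChatOpenAI(model="gpt-4", temperature=0)

def generate_query(schema_sdl: str, last_error: str | None = None) -> str:
    """Ask the model for a GraphQL query; on retries, feed back the last error."""
    prompt = (
        "Analyze this GraphQL schema and generate a query that extracts "
        f"pools, their tokens, and token metadata:\n{schema_sdl}"
    )
    if last_error:
        prompt += f"\nThe previous query failed with: {last_error}. Return a corrected query."
    return llm.invoke(prompt).content

def fetch_with_retry(schema_sdl: str, max_attempts: int = 3) -> dict:
    """Self-healing loop: regenerate the query from the GraphQL error, up to 3 attempts."""
    last_error = None
    for _ in range(max_attempts):
        query = generate_query(schema_sdl, last_error)
        resp = requests.post(SUBGRAPH_URL, json={"query": query}, timeout=30).json()
        if "errors" not in resp:
            # Raw JSON, ready to land in MinIO under a date-partitioned key,
            # e.g. raw/<protocol>/<YYYY-MM-DD>/pools.json (illustrative layout).
            return resp
        last_error = resp["errors"][0]["message"]
    raise RuntimeError(f"Query still failing after {max_attempts} attempts: {last_error}")
```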
Why AI?
- Dynamic Schema Handling: Each DeFi protocol exposes a different GraphQL schema; the AI adapts automatically
- Error Recovery: Learns from GraphQL errors (missing fields, wrong arguments) and fixes queries
- Protocol Agnostic: Works with Uniswap, Curve, Balancer without protocol-specific code
- Cost Efficiency: Reduces manual debugging and failed pipeline runs
Example AI Prompt:

```
Analyze this GraphQL schema and generate a query that extracts:
- pools_all: pool_address, created_at, subgraph_id
- pool_tokens_all: token_address, token_symbol, token_name, position_index
- tokens_all: token_address, token_symbol, token_name
```
Purpose: Processes raw data from MinIO and maps it to database tables using AI-driven field mapping.
AI Data Mapping Process:
- Dynamic Field Detection: AI analyzes actual data structure vs. target database schema
- Mapping Generation: Creates field mappings (e.g., `pool_address` → `id` or `address`)
- Flexible Processing: Handles varying data formats across different protocols
- Database Storage: Stores mapped data in Silver layer tables with upsert logic (see the sketch below)
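As a rough sketch of that mapping-and-upsert step (the `silver_pools` table, its columns, and the helper names are hypothetical; the mapping dict follows the shape of the "Example AI Mapping" JSON shown further below):

```python
from functools import reduce

import psycopg2

def resolve_path(payload: dict, dotted_path: str):
    """Walk a dotted path such as 'data.pools_all' into the raw JSON payload."""
    return reduce(lambda node, key: node[key], dotted_path.split("."), payload)

def upsert_pools(raw: dict, mapping: dict, conn) -> None:
    """Apply the AI-generated field mapping, then upsert into the Silver layer."""
    pools = resolve_path(raw, mapping["data_structure"]["pools_array_path"])
    fields = mapping["field_mappings"]["pools"]  # target column -> source field name
    with conn, conn.cursor() as cur:  # commits on success, rolls back on error
        for pool in pools:
            row = {column: pool.get(source) for column, source in fields.items()}
            cur.execute(
                """
                INSERT INTO silver_pools (pool_address, created_at)
                VALUES (%(pool_address)s, %(created_at)s)
                ON CONFLICT (pool_address) DO UPDATE
                    SET created_at = EXCLUDED.created_at
                """,
                row,
            )

if __name__ == "__main__":
    # Credentials match the .env example above; adjust host/port for your setup.
    conn = psycopg2.connect(
        dbname="defi_datawarehouse",
        user="defi_user",
        password="secure_password_here",
        host="localhost",
    )
```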
Why AI Mapping?
- Schema Flexibility: Protocols return different field names and structures; the AI maps them correctly
- Zero Manual Mapping: No hardcoded field mappings per protocol
- Error Tolerance: Gracefully handles missing fields or unexpected data structures
- Future-Proof: Adapts to protocol schema changes automatically
Example AI Mapping:

```json
{
"data_structure": {
"pools_array_path": "data.pools_all",
"tokens_array_path": "data.tokens_all"
},
"field_mappings": {
"pools": {
"pool_address": "id",
"created_at": "createdAt"
}
}
}
```

Quick start recap:

- Docker & Docker Compose
- TheGraph API key
- OpenAI API key

```bash
cp .env.example .env
# Edit .env with your API keys:
# GRAPH_GATEWAY_API_KEY=your_thegraph_key
# OPENAI_API_KEY=your_openai_key
# POSTGRES_*_WAREHOUSE=your_db_credentials

docker-compose up -d
```

- Airflow UI: http://localhost:8080 (admin/airflow)
- Streamlit: http://localhost:8501
- MinIO: http://localhost:9001 (minio/minio12345)
- AI Query Generation → TheGraph API → Raw JSON in MinIO
- AI Data Mapping → Process & Transform → Silver Layer DB
- DBT Models → Gold Layer → Analytics Dashboard (see the DAG sketch below)
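Conceptually, the two AI-powered DAGs hand off through MinIO object keys. A minimal TaskFlow-style sketch of that wiring, assuming Airflow's `@dag`/`@task` decorators (the DAG id, task names, and key layout are illustrative, not the repo's actual DAGs):

```python
from datetime import datetime, timezone

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def defi_pipeline():
    @task
    def extract_raw() -> str:
        # Stage 1: AI-generated GraphQL query; raw JSON lands in MinIO.
        # Return the object key so the mapping task knows what to process.
        return f"raw/uniswap/{datetime.now(timezone.utc):%Y-%m-%d}/pools.json"

    @task
    def map_to_silver(object_key: str) -> None:
        # Stage 2: AI field mapping, then upsert into Silver layer tables;
        # DBT then builds the Gold layer for the Streamlit dashboard.
        print(f"mapping {object_key} into the Silver layer")

    map_to_silver(extract_raw())

defi_pipeline()
```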
The AI-powered approach eliminates manual schema mapping and query writing, making the pipeline protocol-agnostic and future-proof.