Live cryptocurrency market data ingestion, streaming aggregation, and technical indicator computation.
flowchart LR
subgraph Exchanges
CB[Coinbase WebSocket]
end
subgraph Ingestion
ING[Ingestion Service]
end
subgraph Broker
K[[Kafka — market_trades]]
end
subgraph Consumers
TC[Ticker Consumer]
IC[Indicator Engine]
end
subgraph Storage
PG[(PostgreSQL)]
RD[(Redis)]
end
subgraph API
REST[GET /candles]
WS[WS /indicators]
end
CB -- trades --> ING
ING -- publish --> K
K -- consume --> TC
K -- consume --> IC
TC -- OHLCV candles --> PG
IC -- SMA / RSI / EMA --> RD
PG --> REST
RD --> WS
- Real-time ingestion — WebSocket connection to Coinbase for BTC-USD, ETH-USD, SOL-USD
- Stream processing — Kafka-backed pipeline with independent consumer groups
- OHLCV candle aggregation — 1-minute candles built from raw trades and flushed to Postgres
- Streaming indicators — Running SMA, RSI, and EMA computed per-product and published to Redis
- REST + WebSocket API — Query historical candles or subscribe to live indicator updates
- Pluggable exchange adapters — Abstract base class for adding new exchanges
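
To illustrate the candle aggregation feature listed above, here is a minimal sketch of folding raw trades into 1-minute OHLCV candles. The `Candle` and `CandleAggregator` names, the field layout, and the assumption that trades arrive in timestamp order are all illustrative; they are not taken from this repository's code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candle:
    product_id: str
    start: int      # bucket start in epoch seconds, aligned to the interval
    open: float
    high: float
    low: float
    close: float
    volume: float


class CandleAggregator:
    """Hypothetical aggregator: builds fixed-interval candles for one product.

    Assumes trades are fed in non-decreasing timestamp order.
    """

    def __init__(self, product_id: str, interval_s: int = 60) -> None:
        self.product_id = product_id
        self.interval_s = interval_s
        self.current: Optional[Candle] = None

    def add_trade(self, ts: float, price: float, size: float) -> Optional[Candle]:
        """Fold one trade in; return the finished candle when a new bucket starts."""
        bucket = int(ts) // self.interval_s * self.interval_s
        finished = None
        if self.current is not None and bucket != self.current.start:
            # The trade belongs to a different bucket: close the open candle.
            finished, self.current = self.current, None
        if self.current is None:
            self.current = Candle(self.product_id, bucket, price, price, price, price, size)
        else:
            c = self.current
            c.high = max(c.high, price)
            c.low = min(c.low, price)
            c.close = price
            c.volume += size
        return finished


agg = CandleAggregator("BTC-USD")
agg.add_trade(0, 100.0, 1.0)
agg.add_trade(30, 105.0, 2.0)
done = agg.add_trade(61, 101.0, 0.5)  # first trade of the next minute closes the candle
print(done)
```

In a design like this, the caller would write each returned candle to Postgres, and would also need to flush the still-open candle on shutdown or on a timer if a product goes quiet.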
| Layer | Technology | Role |
|---|---|---|
| Ingestion | websockets, aiokafka | Connect to exchange feeds, publish to Kafka |
| Messaging | Apache Kafka (KRaft) | Decouple ingestion from processing |
| Processing | asyncio, custom consumers | Candle aggregation, indicator computation |
| Storage | PostgreSQL / Supabase | Persistent candle storage |
| Cache | Redis | Real-time indicator values |
| API | FastAPI, Uvicorn | REST endpoints + WebSocket streaming |
| Infra | Docker Compose | Kafka, Redis, Kafka UI |
# 1. Start infrastructure
docker compose up -d
# 2. Install dependencies
uv sync
# 3. Set environment variables
export DATABASE_URL="your-postgres-connection-string"
export REDIS_URL="redis://localhost:6379"
# 4. Run all services
uv run run.py

Kafka UI is available at localhost:8888.
| Method | Path | Description |
|---|---|---|
| GET | /candles/{product_id}/{resolution} | Historical OHLCV candles |
| WS | /indicators/{product_id} | Live indicator stream (SMA, RSI, EMA) |
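
As a rough usage sketch for the REST endpoint in the table above, the snippet below builds the candles URL and fetches it with the standard library. The base URL (`http://localhost:8000`), the resolution value `"1m"`, and the JSON response format are assumptions; check the API routes for the actual values.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASE_URL = "http://localhost:8000"  # assumption: host and port are not stated in this README


def candles_url(product_id: str, resolution: str, base_url: str = BASE_URL) -> str:
    """Build the path /candles/{product_id}/{resolution} under base_url."""
    return f"{base_url}/candles/{quote(product_id, safe='')}/{quote(resolution, safe='')}"


def fetch_candles(product_id: str, resolution: str, base_url: str = BASE_URL):
    """Fetch candles and decode the body as JSON (response shape is an assumption)."""
    with urlopen(candles_url(product_id, resolution, base_url), timeout=10) as resp:
        return json.load(resp)


# "1m" is a hypothetical resolution string used only for illustration.
print(candles_url("BTC-USD", "1m"))
```

Calling `fetch_candles("BTC-USD", "1m")` requires the API service to be running.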
services/
├── ingestion/ # Exchange WebSocket → Kafka
│ └── exchanges/ # Coinbase, Binance, Kraken adapters
├── consumer/ # Kafka → Postgres + Redis
│ ├── ticker_consumer.py
│ ├── indicator_consumer.py
│ └── Indicators.py # RunningSMA, RunningRSI, RunningEMA
├── database/ # psycopg2 Postgres wrapper
└── api/ # FastAPI REST + WebSocket
└── routes/
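
The tree above names `RunningSMA`, `RunningRSI`, and `RunningEMA`. The sketch below shows one common way to compute such indicators incrementally, one price at a time. It reuses the class names for readability, but the method names, default periods, and the choice of Wilder smoothing for RSI are assumptions and may differ from the repository's implementation.

```python
from collections import deque
from typing import Optional


class RunningSMA:
    """Simple moving average over the last `period` prices."""

    def __init__(self, period: int) -> None:
        self.period = period
        self.window: deque = deque()
        self.total = 0.0

    def update(self, price: float) -> Optional[float]:
        self.window.append(price)
        self.total += price
        if len(self.window) > self.period:
            self.total -= self.window.popleft()
        if len(self.window) < self.period:
            return None  # not enough data yet
        return self.total / self.period


class RunningEMA:
    """Exponential moving average with alpha = 2 / (period + 1), seeded by the first price."""

    def __init__(self, period: int) -> None:
        self.alpha = 2.0 / (period + 1)
        self.value: Optional[float] = None

    def update(self, price: float) -> float:
        if self.value is None:
            self.value = price
        else:
            self.value += self.alpha * (price - self.value)
        return self.value


class RunningRSI:
    """RSI using Wilder smoothing, seeded with a simple average of the first `period` changes."""

    def __init__(self, period: int = 14) -> None:
        self.period = period
        self.prev: Optional[float] = None
        self.avg_gain = 0.0
        self.avg_loss = 0.0
        self.count = 0  # number of price changes seen so far

    def update(self, price: float) -> Optional[float]:
        if self.prev is None:
            self.prev = price
            return None
        change = price - self.prev
        self.prev = price
        gain, loss = max(change, 0.0), max(-change, 0.0)
        self.count += 1
        if self.count <= self.period:
            # Seed phase: accumulate the simple average of gains and losses.
            self.avg_gain += gain / self.period
            self.avg_loss += loss / self.period
            if self.count < self.period:
                return None
        else:
            self.avg_gain = (self.avg_gain * (self.period - 1) + gain) / self.period
            self.avg_loss = (self.avg_loss * (self.period - 1) + loss) / self.period
        if self.avg_loss == 0.0:
            return 100.0  # no losses in the window; reported as 100 by convention here
        rs = self.avg_gain / self.avg_loss
        return 100.0 - 100.0 / (1.0 + rs)


sma, ema, rsi = RunningSMA(3), RunningEMA(3), RunningRSI(2)
for p in [1.0, 2.0, 3.0, 2.0]:
    print(p, sma.update(p), ema.update(p), rsi.update(p))
```

Each `update` call runs in constant time and keeps only a small amount of state, which suits a consumer that holds one set of indicators per product and publishes the latest values to Redis.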