A production-grade Python backend using FastAPI, Celery, MongoDB, and Google AI (Gemini) that provides AI-powered market intelligence through a background-processing architecture. The API serves pre-computed snapshots for sub-200ms response times.
Market Pulse is a market-insights system that uses three specialized AI agents, running as Celery tasks, to fetch news, process indices, and generate market snapshots. The API reads only from MongoDB (no real-time AI processing on request), ensuring fast, consistent responses.
- Background Processing: Celery tasks handle news fetch, AI analysis, and snapshot generation on a schedule
- Pre-computed Snapshots: API serves cached snapshots from MongoDB; target response time < 200ms
- 3 Agents: NewsProcessingAgent, SnapshotGenerationAgent, IndicesCollectionAgent
- Data Sources: CMOTS API for news and world indices; Gemini for sentiment and summaries
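The "pre-computed snapshots" idea boils down to: the request path only selects the newest cached document and checks it against the 15-minute TTL. A minimal sketch, assuming a `created_at` field (the real schema may differ):

```python
# Hypothetical helper for the snapshot read path: no AI work happens here,
# only selection of the newest document and a freshness check.
# The "created_at" field name and 15-minute TTL are assumptions from the README.
from datetime import datetime, timedelta, timezone

SNAPSHOT_TTL = timedelta(minutes=15)

def pick_latest(snapshots, now=None):
    """Return (snapshot, is_fresh) for the newest snapshot, or (None, False)."""
    now = now or datetime.now(timezone.utc)
    if not snapshots:
        return None, False
    latest = max(snapshots, key=lambda s: s["created_at"])
    return latest, (now - latest["created_at"]) <= SNAPSHOT_TTL
```

Because the heavy Gemini calls run in Celery, this read path stays well under the 200ms target.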
    ┌─────────────────────────────────────────────────────────────────┐
    │                           API Request                           │
    │                   GET /api/v1/market-summary                    │
    └────────────────────────────────┬────────────────────────────────┘
                                     │
                                     ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │                             MongoDB                             │
    │ ┌──────────────────┐ ┌─────────────────┐ ┌────────────────────┐ │
    │ │ market_snapshots │ │  news_articles  │ │ indices_timeseries │ │
    │ │  (TTL: 15 min)   │ │ (90 day retain) │ │    (90 day TTL)    │ │
    │ └──────────────────┘ └─────────────────┘ └────────────────────┘ │
    └────────────────────────────────▲────────────────────────────────┘
                                     │
    ┌────────────────────────────────┼────────────────────────────────┐
    │                    Celery Tasks (Background)                    │
    │ ┌──────────────────┐ ┌───────────────────┐ ┌──────────────────┐ │
    │ │ fetch_news (3m)  │ │ gen_snapshot (5m) │ │  fetch_indices   │ │
    │ └────────┬─────────┘ └─────────┬─────────┘ └────────┬─────────┘ │
    │          ▼                     ▼                    ▼           │
    │ ┌──────────────────┐ ┌───────────────────┐ ┌──────────────────┐ │
    │ │ NewsProcessing   │ │SnapshotGeneration │ │IndicesCollection │ │
    │ │ Agent (Gemini)   │ │ Agent (Gemini)    │ │ Agent            │ │
    │ └──────────────────┘ └───────────────────┘ └──────────────────┘ │
    └─────────────────────────────────────────────────────────────────┘
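The intervals in the diagram (news every 3 minutes, snapshots and indices every 5) map naturally onto a Celery beat schedule. A sketch of what that configuration might look like; the task names and module paths here are assumptions, the real schedule lives in `app/celery_app/celery_config.py`:

```python
# Hypothetical beat schedule mirroring the intervals in the diagram above.
# Task dotted paths are illustrative, not the project's actual names.
beat_schedule = {
    "fetch-news": {
        "task": "app.celery_app.tasks.news_tasks.fetch_news",
        "schedule": 180.0,  # every 3 minutes
    },
    "generate-snapshot": {
        "task": "app.celery_app.tasks.snapshot_tasks.generate_snapshot",
        "schedule": 300.0,  # every 5 minutes
    },
    "fetch-indices": {
        "task": "app.celery_app.tasks.indices_tasks.fetch_indices",
        "schedule": 300.0,  # every 5 minutes
    },
}
```

In a real Celery app this dict would be assigned to `app.conf.beat_schedule`, and the intervals would come from the `*_INTERVAL` environment variables rather than literals.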
| Agent | Location | Responsibility |
|---|---|---|
| NewsProcessingAgent | `app/agents/news_processing_agent.py` | AI sentiment, entity extraction, summary, impact analysis |
| SnapshotGenerationAgent | `app/agents/snapshot_agent.py` | Market outlook, summary bullets, executive summary, trending news |
| IndicesCollectionAgent | `app/agents/indices_agent.py` | Fetch world indices from CMOTS, store in MongoDB, market-hours aware |
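Since each agent is plain Python invoked from a Celery task, it can be exercised without a worker. A hypothetical sketch of the shared shape (the actual base class is `app/agents/base.py`; the method names below are assumptions):

```python
# Illustrative agent interface: each agent does one unit of work and returns
# a result document. This is NOT the project's real base class.
from abc import ABC, abstractmethod

class BaseAgent(ABC):
    name: str = "base"

    @abstractmethod
    def run(self, payload: dict) -> dict:
        """Execute one cycle of the agent's work and return a result document."""

class IndicesCollectionAgent(BaseAgent):
    name = "indices_collection"

    def run(self, payload: dict) -> dict:
        # The real agent fetches CMOTS world indices and writes to MongoDB;
        # here we just echo the expected result shape.
        return {"agent": self.name, "indices": payload.get("indices", [])}
```

Keeping the AI logic out of the task functions themselves makes the agents unit-testable and the Celery tasks thin wrappers.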
See ARCHITECTURE.md for data flow, task schedules, and configuration details.
- Python 3.11+
- Google AI API key (Google AI Studio)
- Redis (Celery broker + cache)
- MongoDB
- Docker (optional, for Redis/MongoDB and full stack)
Clone and enter the project:

    cd sharingan

Create a virtual environment:

    python -m venv .venv
    source .venv/bin/activate  # Windows: .venv\Scripts\activate

Install dependencies:

    pip install -r requirements.txt

Configure the environment:

    cp .env.example .env
    # Edit .env and .env.local with GOOGLE_AI_API_KEY, MONGODB_URL, REDIS_URL, etc.

Run with the startup script (recommended):

    ./run.sh all

This starts Redis, MongoDB (via Docker), the Celery worker, Celery beat, triggers the initial data tasks, and runs the FastAPI server.
    # Start Redis and MongoDB (e.g. via Docker or local install)
    # Then:
    source .venv/bin/activate
    uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    # In another terminal: start Celery worker and beat (see run.sh)

    # All services via docker-compose
    ./run.sh docker

    # With dev/monitoring profile (e.g. Flower)
    ./run.sh docker-dev

    # Or directly:
    docker-compose up -d
    docker-compose logs -f market-pulse-api
    docker-compose down

The project includes a development script, `run.sh`, that manages local services. Ensure `.venv` exists and optionally create `.env.local` for overrides.
| Command | Description |
|---|---|
| `./run.sh all` | Start all services: Redis, MongoDB, Celery worker, Celery beat, trigger initial data population, then run the FastAPI API (foreground). Use for full local dev. |
| `./run.sh debug` | Same as `all` but runs the API in debug mode (verbose logging, access log). |
| `./run.sh api` | Start only the FastAPI server (assumes Redis/MongoDB/Celery are already running or not needed for read-only testing). |
| `./run.sh celery` | Start Celery worker and Celery beat in the background; the script stays running and shows logs. Use Ctrl+C to stop. |
| `./run.sh worker` | Start only the Celery worker in the foreground (single terminal). |
| `./run.sh beat` | Start only Celery beat in the foreground. |
| `./run.sh flower` | Start the Flower monitoring UI at http://localhost:5555 (foreground). |
| `./run.sh infra` | Start only infrastructure: Redis and MongoDB (Docker containers). |
| `./run.sh logs` | Tail the Celery worker and beat log files (`logs/celery-worker.log`, `logs/celery-beat.log`). |
| `./run.sh populate` | Trigger the initial data population tasks (news fetch, indices fetch, snapshot generation). Requires a running Celery worker. |
| `./run.sh stop` | Stop all background services: Celery worker, Celery beat, Flower, and the Docker containers (Redis, MongoDB). |
| `./run.sh status` | Show the status of Redis, MongoDB, Celery worker, Celery beat, and Flower (running/stopped). |
| `./run.sh docker` | Run the full stack with docker-compose (build and up). |
| `./run.sh docker-dev` | Run with docker-compose using the dev and monitoring profiles. |
| `./run.sh help` | Print usage and available commands. |
Examples:

    # Full dev: API + Celery + infra + initial data
    ./run.sh all

    # Only API (e.g. if Celery is already running elsewhere)
    ./run.sh api

    # Only infra, then in another terminal run worker + beat + api
    ./run.sh infra
    ./run.sh celery   # in terminal 2
    ./run.sh api      # in terminal 3

    # Check what's running
    ./run.sh status

    # Stop everything
    ./run.sh stop

Celery logs are written to `logs/`. Use `./run.sh logs` to tail them.
| Method | Endpoint | Description |
|---|---|---|
| GET | `/api/v1/market-summary` | Returns the latest pre-computed market snapshot from MongoDB. Target < 200 ms. Triggers async snapshot generation if none exists. |
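The fallback behavior (serve the cached snapshot, or kick off background generation when none exists) can be sketched with hypothetical repository and task-queue hooks; the names `repo.latest()` and `enqueue_snapshot_task` are illustrative, not the project's actual API:

```python
# Sketch of the market-summary fallback path: reads are always from the
# cache layer; a missing snapshot only *enqueues* work, it never blocks.
def market_summary(repo, enqueue_snapshot_task):
    """repo.latest() -> dict | None; enqueue_snapshot_task() fires the Celery task."""
    snapshot = repo.latest()
    if snapshot is not None:
        return {"status": "ok", "snapshot": snapshot}
    # Assumed Celery idiom, e.g. generate_snapshot.delay() in the real code.
    enqueue_snapshot_task()
    return {"status": "generating", "snapshot": None}
```

The key design point is that the request returns immediately in both branches; the client polls again once the background task has populated the cache.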
| Method | Endpoint | Description |
|---|---|---|
| POST | `/api/v1/snapshot/generate` | Manually trigger snapshot generation. |
| GET | `/api/v1/db/stats` | Database statistics. |
| POST | `/api/v1/data/populate` | Manually trigger data population (news, indices, snapshot). |
| GET | `/api/v1/health` | Health check. |
| GET | `/api/v1/agents/status` | Agent/task status. |
    curl "http://localhost:8000/api/v1/market-summary"
    sharingan/
    ├── app/
    │   ├── main.py                    # FastAPI application
    │   ├── config.py                  # Configuration (Pydantic Settings)
    │   ├── agents/                    # AI agents (Celery task logic)
    │   │   ├── base.py
    │   │   ├── news_processing_agent.py
    │   │   ├── snapshot_agent.py
    │   │   └── indices_agent.py
    │   ├── celery_app/
    │   │   ├── celery_config.py
    │   │   └── tasks/
    │   │       ├── news_tasks.py
    │   │       ├── snapshot_tasks.py
    │   │       ├── indices_tasks.py
    │   │       └── cleanup_tasks.py
    │   ├── db/
    │   │   ├── mongodb.py
    │   │   ├── models/                # MongoDB document models
    │   │   └── repositories/
    │   ├── models/                    # Pydantic request/response schemas
    │   ├── services/                  # Business logic & external APIs
    │   │   ├── cmots_news_service.py
    │   │   ├── company_news_service.py
    │   │   ├── news_processor_service.py
    │   │   ├── snapshot_generator_service.py
    │   │   └── redis_service.py
    │   ├── constants/                 # Themes, tickers
    │   ├── middleware/
    │   ├── prompts/
    │   ├── tools/
    │   └── utils/
    ├── tests/
    ├── run.sh                         # Development run script
    ├── requirements.txt
    ├── Dockerfile
    ├── docker-compose.yml
    ├── ARCHITECTURE.md
    └── README.md
Key environment variables (see app/config.py and .env / .env.local):
| Variable | Description | Default |
|---|---|---|
| `GOOGLE_AI_API_KEY` | Google AI (Gemini) API key | Required |
| `GEMINI_FAST_MODEL` | Model for fast tasks | `gemini-2.0-flash` |
| `GEMINI_PRO_MODEL` | Model for complex tasks | `gemini-1.5-pro` |
| `MONGODB_URL` | MongoDB connection URL | `mongodb://localhost:27017` |
| `MONGODB_DATABASE` | Database name | `market_intelligence` |
| `REDIS_URL` | Redis connection URL | `redis://localhost:6379` |
| `CELERY_BROKER_URL` | Celery broker (Redis) | `redis://localhost:6379/0` |
| `CELERY_RESULT_BACKEND` | Celery result backend | `redis://localhost:6379/1` |
| `LOG_LEVEL` | Logging level | `INFO` |
| `NEWS_FETCH_INTERVAL` | News task interval (seconds) | `180` (3 min) |
| `SNAPSHOT_GENERATION_INTERVAL` | Snapshot task interval (seconds) | `300` (5 min) |
| `INDICES_FETCH_INTERVAL` | Indices task interval (seconds) | `300` (5 min) |
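The project loads these through Pydantic Settings in `app/config.py`; as a rough stdlib-only illustration of the table's defaults (the real settings class will differ), variables with a default fall back, while `GOOGLE_AI_API_KEY` raises if absent:

```python
# Illustrative settings loader mirroring the defaults in the table above.
# The actual project uses Pydantic Settings; this is only a sketch.
import os

def load_settings(env=None):
    env = os.environ if env is None else env
    return {
        "google_ai_api_key": env["GOOGLE_AI_API_KEY"],  # required, no default
        "gemini_fast_model": env.get("GEMINI_FAST_MODEL", "gemini-2.0-flash"),
        "gemini_pro_model": env.get("GEMINI_PRO_MODEL", "gemini-1.5-pro"),
        "mongodb_url": env.get("MONGODB_URL", "mongodb://localhost:27017"),
        "mongodb_database": env.get("MONGODB_DATABASE", "market_intelligence"),
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379"),
        "log_level": env.get("LOG_LEVEL", "INFO"),
        "news_fetch_interval": int(env.get("NEWS_FETCH_INTERVAL", "180")),
        "snapshot_generation_interval": int(env.get("SNAPSHOT_GENERATION_INTERVAL", "300")),
        "indices_fetch_interval": int(env.get("INDICES_FETCH_INTERVAL", "300")),
    }
```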
    # Run all tests
    pytest

    # With coverage
    pytest --cov=app --cov-report=html

    # Specific test file
    pytest tests/unit/test_market_data_agent.py

    # Integration tests
    pytest tests/integration/

- Structured logging: JSON-formatted logs with request tracing
- Health checks: `/api/v1/health`, `/api/v1/agents/status`
- Flower: Celery monitoring at http://localhost:5555 when started via `./run.sh flower` or the docker-dev profile
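JSON-structured logging with request tracing can be done with a small `logging.Formatter` subclass; the field names below are assumptions, and the project's actual middleware may attach the request id differently:

```python
# Minimal JSON log formatter with an optional per-request id.
# Field names ("level", "message", "request_id") are illustrative.
import json
import logging
import uuid

class JsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        })

logger = logging.getLogger("market_pulse")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# A middleware would generate one id per request and pass it via `extra`.
logger.info("snapshot served", extra={"request_id": str(uuid.uuid4())})
```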
MIT License
- Fork the repository
- Create a feature branch
- Make changes with tests
- Submit a pull request
For questions or issues, please open a GitHub issue.