A production-ready asynchronous image analysis service built with FastAPI, Apache Kafka, Celery, and HuggingFace Transformers. This service provides scalable and resilient AI-driven image analysis capabilities including classification and object detection.
- Asynchronous Processing: Non-blocking API with distributed task queue
- Event-Driven Architecture: Apache Kafka for reliable message distribution
- Scalable Workers: Celery workers with Redis backend
- AI-Powered Analysis: Pre-trained HuggingFace models for image classification and object detection
- Robust Error Handling: Automatic retries with exponential backoff
- Containerized Deployment: Complete Docker Compose orchestration
- RESTful API: FastAPI with comprehensive validation
- Job Status Tracking: SQLite database for job metadata and results
Figure 1: System architecture showing data flow, component interactions, and technology stack
- API Framework: FastAPI
- Message Queue: Apache Kafka + Zookeeper
- Task Queue: Celery
- Cache/Broker: Redis
- Database: SQLite
- AI Models: HuggingFace Transformers
- Image Processing: Pillow
- ML Framework: PyTorch
- Containerization: Docker + Docker Compose
- Docker and Docker Compose
- Python 3.12+ (for local development)
- At least 4GB RAM available for containers
- Clone the repository:

```bash
git clone https://github.com/AkhileshMalthi/AI-image-analysis-service.git
cd AI-image-analysis-service
```

- Create environment file (optional):

```bash
cp .env.example .env
# Edit .env if needed
```

- Start all services:

```bash
docker-compose up --build
```

This will start:
- Zookeeper (port 2181)
- Kafka (port 9092)
- Redis (port 6379)
- FastAPI API (port 8000)
- Celery Worker
- Access the API documentation:
http://localhost:8000/docs
- Create virtual environment:

```bash
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
```

- Install dependencies:

```bash
pip install -r requirements.txt
# or with uv
uv pip install -r requirements.txt
```

- Set up environment variables:

```bash
cp .env.example .env
```

- Initialize database:

```bash
python -c "from src.db import create_db_and_tables; create_db_and_tables()"
```

- Run services separately:

Terminal 1 - API Server:

```bash
uvicorn src.main:app --reload --host 0.0.0.0 --port 8000
```

Terminal 2 - Kafka Consumer:

```bash
python -m src.tasks
```

Terminal 3 - Celery Worker:

```bash
celery -A src.worker.celery_app worker -l info --concurrency=2
```

POST /upload-image
Upload an image for asynchronous analysis.
Request:

- file: Image file (JPEG/PNG, max 10MB)
- analysis_type: "classification" or "object_detection"

Response (202 Accepted):

```json
{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "PENDING",
  "message": "Image uploaded successfully and queued for analysis"
}
```

Example with curl:
```bash
curl -X POST "http://localhost:8000/upload-image" \
  -F "file=@image.jpg" \
  -F "analysis_type=classification"
```

Example with Python:
```python
import requests

url = "http://localhost:8000/upload-image"
files = {"file": open("image.jpg", "rb")}
data = {"analysis_type": "classification"}
response = requests.post(url, files=files, data=data)
print(response.json())
```

GET /job-status/{job_id}
Retrieve the status and results of a job.
Response (200 OK):

```json
{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "COMPLETED",
  "message": "Job is completed",
  "results": {
    "analysis_type": "classification",
    "predictions": [
      {"label": "golden retriever", "score": 0.9234},
      {"label": "Labrador retriever", "score": 0.0456}
    ]
  },
  "error_message": null,
  "created_at": "2026-01-22T10:30:00",
  "updated_at": "2026-01-22T10:30:15"
}
```

Example:
```bash
curl http://localhost:8000/job-status/550e8400-e29b-41d4-a716-446655440000
```

Job statuses:

- PENDING: Job created and waiting for processing
- PROCESSING: Worker is currently analyzing the image
- COMPLETED: Analysis finished successfully
- FAILED: Analysis failed (see error_message)
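Because uploads return immediately, clients typically poll the status endpoint until the job reaches a terminal state. A minimal client-side sketch (the `wait_for_job` helper, timeout, and interval are illustrative, not part of the service):

```python
import time

import requests

BASE_URL = "http://localhost:8000"

def wait_for_job(job_id: str, timeout: float = 60.0, interval: float = 2.0) -> dict:
    """Poll /job-status/{job_id} until the job is COMPLETED or FAILED."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = requests.get(f"{BASE_URL}/job-status/{job_id}").json()
        if job["status"] in ("COMPLETED", "FAILED"):
            return job
        time.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")
```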
Run the test suite:
```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run specific test file
pytest tests/test_api.py -v

# Run specific test
pytest tests/test_api.py::test_upload_image_success -v
```

Test coverage includes:

- API Endpoints: Input validation, status codes, Kafka integration
- Celery Tasks: Job processing, error handling, retries
- AI Inference: Model loading, classification, object detection
- Utilities: Image validation, file handling
Model: google/vit-base-patch16-224 (Vision Transformer)
- Returns top-5 predictions with confidence scores
- Supports 1000 ImageNet classes
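For a quick sanity check outside the service, the classification model can be exercised directly through the transformers pipeline API (a standalone sketch, not the service's internal inference code):

```python
from PIL import Image
from transformers import pipeline

# Loads the default classification model; weights are downloaded on first use.
classifier = pipeline("image-classification", model="google/vit-base-patch16-224")

image = Image.open("image.jpg")
# top_k=5 mirrors the service's top-5 predictions
for pred in classifier(image, top_k=5):
    print(f"{pred['label']}: {pred['score']:.4f}")
```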
Model: facebook/detr-resnet-50 (DETR)
- Detects multiple objects in images
- Returns bounding boxes and labels
- Confidence threshold: 0.5
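The detection model can be probed the same way (again a standalone sketch; the explicit threshold=0.5 matches the service default listed above):

```python
from PIL import Image
from transformers import pipeline

detector = pipeline("object-detection", model="facebook/detr-resnet-50")

image = Image.open("image.jpg")
# Each detection carries a label, a confidence score, and a pixel bounding box.
for det in detector(image, threshold=0.5):
    box = det["box"]  # dict with xmin, ymin, xmax, ymax
    print(f"{det['label']} ({det['score']:.2f}) at {box}")
```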
Override default models using environment variables:
```bash
CLASSIFICATION_MODEL=microsoft/resnet-50
OBJECT_DETECTION_MODEL=facebook/detr-resnet-101
```

All configuration is managed through environment variables. See .env.example for available options.
| Variable | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Kafka broker address |
| KAFKA_TOPIC | image-analysis-jobs | Topic for job messages |
| REDIS_BROKER_URL | redis://localhost:6379/0 | Celery broker URL |
| DATABASE_URL | sqlite:///./data/jobs.db | Database connection string |
| MAX_FILE_SIZE | 10485760 | Max upload size (10MB) |
| CELERY_TASK_MAX_RETRIES | 3 | Max retry attempts |
| HUGGINGFACE_AUTH_TOKEN | None | HF token for private models |
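For reference, a .env assembled from the defaults above might look like this (uncomment HUGGINGFACE_AUTH_TOKEN only if you need private models):

```bash
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_TOPIC=image-analysis-jobs
REDIS_BROKER_URL=redis://localhost:6379/0
DATABASE_URL=sqlite:///./data/jobs.db
MAX_FILE_SIZE=10485760
CELERY_TASK_MAX_RETRIES=3
# HUGGINGFACE_AUTH_TOKEN=hf_...
```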
The system implements intelligent retry logic:
- Transient Errors (network issues, temporary unavailability):
  - Automatic retry with exponential backoff
  - Maximum 3 retry attempts
  - Backoff: 60s, 120s, 240s
- Permanent Errors (invalid image, model errors):
  - No retry
  - Job marked as FAILED
  - Detailed error message stored
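A minimal sketch of this policy as a Celery task (illustrative only; the task name, PermanentError class, and helper stubs are assumptions, not the service's actual code):

```python
from celery import Celery

celery_app = Celery("worker", broker="redis://localhost:6379/0")

class PermanentError(Exception):
    """Errors that should never be retried, e.g. an invalid image."""

def run_inference(job_id: str) -> None:
    """Hypothetical stand-in for the real model inference step."""

def mark_job_failed(job_id: str, error: str) -> None:
    """Hypothetical stand-in for persisting the FAILED status and error message."""

@celery_app.task(bind=True, max_retries=3)
def analyze_image(self, job_id: str) -> None:
    try:
        run_inference(job_id)
    except PermanentError as exc:
        mark_job_failed(job_id, str(exc))  # no retry for permanent errors
    except Exception as exc:
        # Transient failure: retries fire after 60s, 120s, then 240s.
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)
```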
- Image Too Large: HTTP 400 - Reduce image size
- Invalid Format: HTTP 400 - Use JPEG/PNG
- Job Not Found: HTTP 404 - Check job_id
- Kafka Unavailable: HTTP 500 - Check Kafka service
- Model Loading Failed: Job FAILED - Check HuggingFace connectivity
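On the server side, the 400-level checks map naturally onto FastAPI's HTTPException. A sketch of the pattern (not the service's actual handler; the constants mirror the configuration defaults):

```python
from fastapi import FastAPI, HTTPException, UploadFile

app = FastAPI()

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB, per the MAX_FILE_SIZE default
ALLOWED_TYPES = {"image/jpeg", "image/png"}

@app.post("/upload-image", status_code=202)
async def upload_image(file: UploadFile):
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(status_code=400, detail="Invalid format: use JPEG/PNG")
    contents = await file.read()
    if len(contents) > MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="Image too large")
    # Publishing the job to Kafka and returning the job_id would follow here.
    ...
```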
Check API health:

```bash
curl http://localhost:8000/health
```

View worker status:

```bash
celery -A src.worker.celery_app inspect active
celery -A src.worker.celery_app inspect stats
```

View service logs:
```bash
# All services
docker-compose logs -f

# Specific service
docker-compose logs -f api
docker-compose logs -f worker
docker-compose logs -f kafka
```

Scale Celery workers:

```bash
docker-compose up --scale worker=5
```

For production, consider migrating to PostgreSQL:
```bash
DATABASE_URL=postgresql://user:pass@localhost/dbname
```

- Use environment-specific .env files
- Secure Kafka with SSL/SASL
- Enable Redis authentication
- Implement API rate limiting
- Use reverse proxy (Nginx) for API
- Set up monitoring (Prometheus + Grafana)
```
├── src/
│   ├── main.py              # FastAPI application
│   ├── worker.py            # Celery worker setup
│   ├── tasks.py             # Celery tasks
│   ├── db.py                # Database operations
│   ├── kafka_producer.py    # Kafka producer
│   ├── config.py            # Configuration
│   ├── utils.py             # Utility functions
│   └── models/
│       ├── __init__.py      # SQLModel definitions
│       └── schemas.py       # Pydantic schemas
├── tests/
│   ├── test_api.py          # API endpoint tests
│   ├── test_tasks.py        # Celery task tests
│   └── test_inference.py    # AI inference tests
├── Dockerfile.api           # API service Dockerfile
├── Dockerfile.worker        # Worker service Dockerfile
├── docker-compose.yml       # Service orchestration
├── requirements.txt         # Python dependencies
├── pyproject.toml           # Project metadata
├── .env.example             # Environment template
└── README.md                # This file
```
Contributions are welcome! Please follow these guidelines:
- Fork the repository
- Create a feature branch
- Write tests for new features
- Ensure all tests pass
- Submit a pull request
This project is licensed under the MIT License - see the LICENSE file for details.
- Kafka: Provides reliable message queuing with durability and replay capabilities
- Celery: Handles distributed task execution with advanced retry logic
- Separation of Concerns: Kafka for message routing, Celery for task execution
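To make the separation concrete, the bridge between the two might look like this (a sketch assuming the kafka-python client; the topic name matches the KAFKA_TOPIC default, and the imported analyze_image task is a hypothetical stand-in):

```python
import json

from kafka import KafkaConsumer

from src.tasks import analyze_image  # hypothetical import of the Celery task

consumer = KafkaConsumer(
    "image-analysis-jobs",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

# Kafka provides durable routing and replay; Celery takes over for execution,
# concurrency, and retries once the message is handed off.
for message in consumer:
    job = message.value  # assumed message shape: {"job_id": "...", ...}
    analyze_image.delay(job["job_id"])
```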
- Simple deployment for demonstration
- Easy to migrate to PostgreSQL/MySQL for production
- Sufficient for moderate workloads
Models are loaded once per worker process and cached in memory to avoid:
- Repeated downloads
- Initialization overhead
- Memory bloat from duplicate model copies
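A common way to implement this caching (a sketch, not necessarily the service's exact code):

```python
from functools import lru_cache

from transformers import pipeline

@lru_cache(maxsize=None)
def get_pipeline(task: str, model_name: str):
    """Load a HuggingFace pipeline once per (task, model) pair and reuse it.

    The first call in a worker process downloads and initializes the model;
    every later call returns the cached instance.
    """
    return pipeline(task, model=model_name)

# All task invocations in this worker share one model instance.
classifier = get_pipeline("image-classification", "google/vit-base-patch16-224")
```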
For issues and questions:
- Open an issue on GitHub
- Check existing documentation
- Review test cases for examples
- HuggingFace for pre-trained models
- FastAPI framework
- Apache Kafka community
- Celery project