A system that combines weather data with machine learning to predict crime probabilities. The system uses Kafka for message queuing, PySpark for ML predictions, and integrates with the Open-Meteo API for weather data.
The system consists of three main components:
- Weather Service: Fetches weather data from Open-Meteo API
- ML Model Service: Processes weather data to make crime predictions
- Kafka Message Queue: Handles communication between services

Prerequisites:
- Docker and Docker Compose
- Python 3.9+
- Java 11 (for PySpark)

Installation:
- Clone the repository:
```bash
git clone [your-repository-url]
cd predictive_policing_system
```
- Create and activate a virtual environment:
```bash
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
```
- Install dependencies:
```bash
pip install -r requirements.txt
```
- Start Kafka and all services using Docker:
```bash
docker-compose up -d
```
- Initialize Kafka topics:
```bash
docker exec predictive_policing_system-kafka-1 kafka-topics --create --topic weather_requests --bootstrap-server localhost:9092
docker exec predictive_policing_system-kafka-1 kafka-topics --create --topic weather_results --bootstrap-server localhost:9092
docker exec predictive_policing_system-kafka-1 kafka-topics --create --topic model_requests --bootstrap-server localhost:9092
docker exec predictive_policing_system-kafka-1 kafka-topics --create --topic model_results --bootstrap-server localhost:9092
```
- Run each component (in separate terminals):
```bash
# Terminal 1 - Start the ML Model Server
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 python -m ml_model.serve_model

# Terminal 2 - Start the Weather Worker
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 python -m weather_service.worker

# Terminal 3 - Run the test client
python weather_service/test_weather.py
```

Project structure:
```
predictive_policing_system/
├── ml_model/
│   ├── crime_prediction_model/   # ML model files
│   ├── serve_model.py            # ML model server
│   └── Dockerfile
├── weather_service/
│   ├── worker.py                 # Weather data processor
│   ├── service.py                # Weather service client
│   └── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── README.md
```

Environment variables can be set in a `.env` file:
- `KAFKA_BOOTSTRAP_SERVERS`: Kafka server address (default: `localhost:9092`)
- `WEATHER_API_BASE_URL`: Open-Meteo API base URL
- `MODEL_PATH`: path to the ML model files
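
A minimal sketch of how a service might load these settings, assuming a plain `KEY=VALUE` `.env` format. `parse_env_file`, `Settings`, and `load_settings` are hypothetical names, not the project's actual code; a library such as python-dotenv does the same job. Variables already set in the process environment take precedence over the file, and only `KAFKA_BOOTSTRAP_SERVERS` gets a default because that is the only default documented above.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Mapping, Optional


def parse_env_file(text: str) -> Dict[str, str]:
    """Hypothetical helper: parse KEY=VALUE lines, skipping blanks and # comments."""
    values: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop optional surrounding quotes, e.g. MODEL_PATH="/models/crime".
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the three documented variables."""
    kafka_bootstrap_servers: str
    weather_api_base_url: Optional[str]
    model_path: Optional[str]


def load_settings(env_file: str = ".env",
                  environ: Optional[Mapping[str, str]] = None) -> Settings:
    """Read the .env file (if present), then let real environment variables override it."""
    merged: Dict[str, str] = {}
    path = Path(env_file)
    if path.is_file():
        merged.update(parse_env_file(path.read_text()))
    merged.update(os.environ if environ is None else environ)
    return Settings(
        kafka_bootstrap_servers=merged.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        weather_api_base_url=merged.get("WEATHER_API_BASE_URL"),  # no documented default
        model_path=merged.get("MODEL_PATH"),  # no documented default
    )
```

Passing `environ` explicitly keeps the function easy to test without touching the real process environment.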

To use the weather prediction service:
```python
from weather_service.service import WeatherService

weather = WeatherService()
result = weather.get_weather(
    latitude=40.87922,
    longitude=-73.86106,
    date="2024-12-17",
    time_str="3:45 PM",
)
print(result)
```

Monitor Kafka topics:
```bash
# View messages in any topic (change --topic as needed)
docker exec -it predictive_policing_system-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic weather_requests \
  --from-beginning
```

The system uses the following Docker services:
- Zookeeper: Manages Kafka cluster state
- Kafka: Message broker for service communication
- ML Model Service: PySpark-based prediction service
- Weather Worker: Processes weather requests and coordinates with ML service
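
The Weather Worker has to turn a request like the `get_weather` call shown earlier (`date="2024-12-17"`, `time_str="3:45 PM"`) into an Open-Meteo query and then pick out the matching hour. A minimal sketch, assuming the forecast endpoint `https://api.open-meteo.com/v1/forecast` as the base URL and `temperature_2m` and `precipitation` as the hourly variables; the project's real `WEATHER_API_BASE_URL` and feature list are not documented here, and older dates may need Open-Meteo's separate historical API instead. Open-Meteo returns hourly data as a `hourly.time` array of ISO 8601 strings plus one parallel array per variable, so the sketch rounds the request time down to the hour and indexes into those arrays. All function names are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict
from urllib.parse import urlencode

# Assumptions: the real base URL comes from WEATHER_API_BASE_URL and the
# model's feature set may differ from these two variables.
DEFAULT_BASE_URL = "https://api.open-meteo.com/v1/forecast"
HOURLY_VARIABLES = ("temperature_2m", "precipitation")


def parse_request_time(date: str, time_str: str) -> datetime:
    """Hypothetical helper: combine '2024-12-17' and '3:45 PM' into a naive datetime."""
    return datetime.strptime(f"{date} {time_str}", "%Y-%m-%d %I:%M %p")


def build_weather_url(latitude: float, longitude: float, date: str,
                      base_url: str = DEFAULT_BASE_URL) -> str:
    """Hypothetical helper: build a one-day hourly query for the given coordinates."""
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": ",".join(HOURLY_VARIABLES),
        "start_date": date,
        "end_date": date,
        "timezone": "auto",  # hourly timestamps in the location's local time
    }
    return f"{base_url}?{urlencode(params, safe=',')}"


def select_hour(response: Dict[str, Any], when: datetime) -> Dict[str, Any]:
    """Hypothetical helper: return the hourly values for the hour containing `when`."""
    hourly = response["hourly"]
    key = when.replace(minute=0, second=0, microsecond=0).strftime("%Y-%m-%dT%H:%M")
    index = hourly["time"].index(key)  # raises ValueError if that hour is absent
    return {name: hourly[name][index] for name in HOURLY_VARIABLES}
```

A worker would fetch the URL (for example with `urllib.request.urlopen`), decode the JSON body, and pass it to `select_hour` together with the parsed request time.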

Data flow:
- Client sends a request to the `weather_requests` topic
- Weather Worker:
  - Receives the request from `weather_requests`
  - Fetches weather data from the Open-Meteo API
  - Sends the data to the `model_requests` topic
- ML Model Service:
  - Processes data from `model_requests`
  - Sends the prediction to the `model_results` topic
- Weather Worker:
  - Receives the prediction from `model_results`
  - Sends the final result to the `weather_results` topic
- Client receives the result from the `weather_results` topic
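
To make the request/reply chain above concrete, here is an in-process simulation that uses one `queue.Queue` per topic as a stand-in for Kafka. It is a sketch, not the project's code: the message fields (`request_id`, `weather`, `crime_probability`) and the fake weather and model functions are hypothetical. Carrying a `request_id` through every hop is one common way to let the worker and the client match a result to the request that produced it.

```python
import queue
import uuid
from typing import Any, Dict

# Stand-ins for the four Kafka topics from the data flow.
topics: Dict[str, queue.Queue] = {
    name: queue.Queue()
    for name in ("weather_requests", "model_requests", "model_results", "weather_results")
}


def fake_fetch_weather(latitude: float, longitude: float) -> Dict[str, float]:
    """Hypothetical stand-in for the Open-Meteo call."""
    return {"temperature_2m": 4.8, "precipitation": 0.2}


def fake_predict(weather: Dict[str, float]) -> float:
    """Hypothetical stand-in for the PySpark model."""
    return 0.42


def weather_worker_request_step() -> None:
    """Weather Worker: consume a request, add weather data, forward it to the model."""
    request = topics["weather_requests"].get_nowait()
    weather = fake_fetch_weather(request["latitude"], request["longitude"])
    topics["model_requests"].put({**request, "weather": weather})


def model_service_step() -> None:
    """ML Model Service: consume a model request and publish a prediction."""
    message = topics["model_requests"].get_nowait()
    topics["model_results"].put({**message, "crime_probability": fake_predict(message["weather"])})


def weather_worker_result_step() -> None:
    """Weather Worker: forward the prediction to the client-facing topic."""
    topics["weather_results"].put(topics["model_results"].get_nowait())


def run_once(latitude: float, longitude: float) -> Dict[str, Any]:
    """Run every hop of the data flow once, in order, on a single thread."""
    request_id = str(uuid.uuid4())  # hypothetical correlation ID
    topics["weather_requests"].put(
        {"request_id": request_id, "latitude": latitude, "longitude": longitude}
    )
    weather_worker_request_step()
    model_service_step()
    weather_worker_result_step()
    result = topics["weather_results"].get_nowait()
    if result["request_id"] != request_id:  # the client matches the reply to its request
        raise RuntimeError("received a result for a different request")
    return result
```

In the real system each step runs in a separate process and blocks on a Kafka consumer instead of calling `get_nowait()` in sequence.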

Troubleshooting:

If the Kafka connection fails, verify the bootstrap server address:
- Use `localhost:9092` when running services locally
- Use `kafka:9092` when running inside Docker
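
One way to avoid editing the address by hand is to resolve it at startup. The sketch below prefers an explicit `KAFKA_BOOTSTRAP_SERVERS`, then falls back to `kafka:9092` inside a container and `localhost:9092` otherwise. Checking for `/.dockerenv` is a common heuristic, not a guarantee, and `resolve_bootstrap_servers` is a hypothetical helper rather than part of the project.

```python
import os
from pathlib import Path
from typing import Mapping, Optional

LOCAL_BOOTSTRAP = "localhost:9092"  # services started from a host terminal
DOCKER_BOOTSTRAP = "kafka:9092"     # services started by docker-compose


def running_in_docker(marker: Path = Path("/.dockerenv")) -> bool:
    """Heuristic (assumption): Docker usually creates /.dockerenv inside containers."""
    return marker.exists()


def resolve_bootstrap_servers(environ: Optional[Mapping[str, str]] = None,
                              in_docker: Optional[bool] = None) -> str:
    """Hypothetical helper: pick the Kafka address for the current environment."""
    env = os.environ if environ is None else environ
    explicit = env.get("KAFKA_BOOTSTRAP_SERVERS")
    if explicit:
        return explicit  # an explicit setting always wins
    if in_docker is None:
        in_docker = running_in_docker()
    return DOCKER_BOOTSTRAP if in_docker else LOCAL_BOOTSTRAP
```

Because the explicit variable wins, the `KAFKA_BOOTSTRAP_SERVERS=localhost:9092` prefix used in the run commands keeps working unchanged.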

Verify that the topics exist:
```bash
docker exec predictive_policing_system-kafka-1 kafka-topics --list --bootstrap-server localhost:9092
```

Common issues:
- Kafka connection refused: ensure the Kafka container is running
- Model not found: check the `MODEL_PATH` environment variable
- Topic not found: run the topic creation commands from the installation steps
- Java not found: verify that Java 11 is installed (required by PySpark)
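
The first and third issues can be checked from a script. This standard-library sketch tests whether anything is listening on the broker's TCP port and compares the output of the `kafka-topics --list` command above with the four topics created during installation. A successful TCP connection only shows that the port is open, not that Kafka is healthy; `broker_reachable`, `missing_topics`, and `list_topics` are hypothetical helper names.

```python
import socket
import subprocess
from typing import Iterable, List

# Topics created in the installation steps.
REQUIRED_TOPICS = ("weather_requests", "weather_results", "model_requests", "model_results")


def broker_reachable(address: str = "localhost:9092", timeout: float = 2.0) -> bool:
    """Hypothetical helper: True if a TCP connection to host:port succeeds."""
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


def missing_topics(listed_output: str, required: Iterable[str] = REQUIRED_TOPICS) -> List[str]:
    """Hypothetical helper: compare `kafka-topics --list` output (one topic per line)."""
    existing = {line.strip() for line in listed_output.splitlines() if line.strip()}
    return [topic for topic in required if topic not in existing]


def list_topics(container: str = "predictive_policing_system-kafka-1") -> str:
    """Hypothetical helper: run the topic-listing command and return its stdout."""
    completed = subprocess.run(
        ["docker", "exec", container, "kafka-topics", "--list",
         "--bootstrap-server", "localhost:9092"],
        capture_output=True, text=True, check=True,
    )
    return completed.stdout


def main() -> None:
    if not broker_reachable():
        print("Kafka connection refused: is the Kafka container running?")
        return
    try:
        output = list_topics()
    except (OSError, subprocess.CalledProcessError) as exc:
        print(f"Could not list topics: {exc}")
        return
    missing = missing_topics(output)
    print("Missing topics:", ", ".join(missing) if missing else "none")


if __name__ == "__main__":
    main()
```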

Development:

Adding new features:
- Add new topic names to `.env`
- Update `docker-compose.yml` for new services
- Follow the existing message patterns for Kafka communication
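
If topic names live in `.env` as suggested, each service can read them through one small registry instead of hard-coding strings. The variable names below (`WEATHER_REQUESTS_TOPIC` and so on) are hypothetical; only the default topic names come from this README. Under this sketch, a new feature adds one entry to `DEFAULT_TOPICS` and the matching line to `.env`.

```python
import os
from typing import Dict, List, Mapping, Optional

# Default topic names from the installation steps; the keys are hypothetical env variable names.
DEFAULT_TOPICS: Dict[str, str] = {
    "WEATHER_REQUESTS_TOPIC": "weather_requests",
    "WEATHER_RESULTS_TOPIC": "weather_results",
    "MODEL_REQUESTS_TOPIC": "model_requests",
    "MODEL_RESULTS_TOPIC": "model_results",
}


def load_topics(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: topic names, with environment variables overriding defaults."""
    env = os.environ if environ is None else environ
    return {key: env.get(key, default) for key, default in DEFAULT_TOPICS.items()}


def topic_creation_commands(topics: Mapping[str, str],
                            container: str = "predictive_policing_system-kafka-1") -> List[List[str]]:
    """Hypothetical helper: build the same docker exec commands as the setup steps."""
    return [
        ["docker", "exec", container, "kafka-topics", "--create",
         "--topic", name, "--bootstrap-server", "localhost:9092"]
        for name in topics.values()
    ]
```

Each command list can be passed to `subprocess.run(command, check=True)`, so a newly registered topic is created the same way as the original four.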

Testing:
- Use the provided `weather_service/test_weather.py` as an example
- Monitor topic messages for debugging
- Check container logs for service issues (for example, `docker-compose logs -f`)
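
`test_weather.py` is run against the live services, but message-handling logic can also be unit-tested without Kafka or network access by injecting the weather fetcher. This is a sketch: `build_model_request` is a hypothetical function standing in for whatever the worker does between consuming from `weather_requests` and producing to `model_requests`, and `unittest.mock.Mock` replaces the Open-Meteo call.

```python
import unittest
from typing import Any, Callable, Dict
from unittest.mock import Mock


def build_model_request(request: Dict[str, Any],
                        fetch_weather: Callable[[float, float, str], Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical worker logic: enrich a weather request with fetched weather data."""
    weather = fetch_weather(request["latitude"], request["longitude"], request["date"])
    return {**request, "weather": weather}


class BuildModelRequestTest(unittest.TestCase):
    def test_enriches_request_with_weather(self) -> None:
        # The mock stands in for the Open-Meteo call, so no network is needed.
        fetch = Mock(return_value={"temperature_2m": 4.8})
        request = {"latitude": 40.87922, "longitude": -73.86106, "date": "2024-12-17"}

        message = build_model_request(request, fetch)

        fetch.assert_called_once_with(40.87922, -73.86106, "2024-12-17")
        self.assertEqual(message["weather"], {"temperature_2m": 4.8})
        self.assertEqual(message["date"], "2024-12-17")
```

Saved as a `test_*.py` file, this runs with `python -m unittest` and needs neither Docker nor Kafka.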