Skip to content

Latest commit

 

History

History
681 lines (553 loc) · 20 KB

File metadata and controls

681 lines (553 loc) · 20 KB

Diamond Price Predictor - Implementation Tasks

Overview

This document provides concrete, actionable tasks for implementing the Diamond Price Predictor ML system. Each task includes specific deliverables, acceptance criteria, and file structure requirements based on the technical architecture defined in CLAUDE.md.


Development Environment Setup

Task 0.1: Project Structure & Environment

Priority: P0 | Effort: 1 day | Owner: DevOps/Setup

Deliverables:

# Create project structure
mkdir -p src/{components,pipeline,utils}
mkdir -p {data/{raw,processed,validated},models,notebooks,tests,docs,airflow/dags}
mkdir -p {reports,logs,configs}

# Create configuration files
touch requirements.txt setup.py params.yaml dvc.yaml
touch Dockerfile.flask docker-compose.yaml .env.example
touch README.md CONTRIBUTING.md .gitignore .dockerignore

Key Files to Create:

  • requirements.txt: Python dependencies
  • setup.py: Package configuration
  • params.yaml: Model hyperparameters
  • dvc.yaml: DVC pipeline configuration
  • docker-compose.yaml: Multi-service orchestration

Acceptance Criteria:

  • Complete directory structure created
  • Virtual environment activated and dependencies installed
  • Git repository initialized with proper .gitignore
  • DVC initialized for data versioning
  • Docker environment functional

Phase 1: ML Pipeline Development (Weeks 1-4)

Task 1.1: Data Ingestion Component

Priority: P0 | Effort: 3 days | Files: src/components/data_ingestion.py

Implementation Requirements:

class DataIngestion:
    def __init__(self, config_path: str):
        # Load data ingestion configuration
        
    def download_data(self) -> str:
        # Download diamond dataset from source
        # Validate data integrity and format
        
    def extract_data(self) -> pd.DataFrame:
        # Extract and load raw diamond data
        # Handle CSV parsing and data type inference
        
    def validate_data_schema(self, df: pd.DataFrame) -> bool:
        # Validate 9 required columns exist
        # Check data ranges and types

Acceptance Criteria:

  • Downloads and validates diamond dataset
  • Handles missing files with proper error messages
  • Validates required 9 features (carat, cut, color, clarity, depth, table, x, y, z)
  • Saves raw data to data/raw/ directory
  • Logs data ingestion statistics and issues
  • Unit tests achieve 90%+ coverage

Task 1.2: Data Transformation Pipeline

Priority: P0 | Effort: 4 days | Files: src/components/data_transformation.py

Implementation Requirements:

class DataTransformation:
    def __init__(self, config_path: str):
        # Load transformation parameters
        
    def get_data_transformer_object(self) -> Pipeline:
        # Create sklearn preprocessing pipeline
        # Handle numerical and categorical features
        
    def initiate_data_transformation(self, train_path: str, test_path: str):
        # Apply preprocessing transformations
        # Handle outliers and missing values
        # Feature engineering for diamond characteristics

Key Features:

  • Categorical encoding for cut, color, clarity grades
  • Numerical scaling for continuous features
  • Outlier detection using IQR method
  • Feature validation and range checking

Acceptance Criteria:

  • Processes all 9 diamond features correctly
  • Handles categorical variables with proper encoding
  • Applies feature scaling for numerical variables
  • Detects and handles outliers (>3 IQR from median)
  • Saves preprocessed data to data/processed/
  • Creates and saves preprocessing pipeline object

Task 1.3: Model Training Engine

Priority: P0 | Effort: 5 days | Files: src/components/model_trainer.py

Implementation Requirements:

class ModelTrainer:
    def __init__(self):
        self.config = ModelTrainerConfig()
        
    def initiate_model_training(self, train_array, test_array):
        # XGBoost model training with hyperparameter optimization
        # Cross-validation for model selection
        
    def hyperparameter_tuning(self, X_train, y_train) -> dict:
        # GridSearchCV or RandomizedSearchCV
        # Target: 95%+ accuracy on test set
        
    def evaluate_models(self, X_train, y_train, X_test, y_test) -> dict:
        # Multiple model comparison (XGBoost, Random Forest, etc.)
        # Performance metrics: MAE, RMSE, R²

Target Hyperparameters:

# params.yaml
model_trainer:
  xgboost:
    n_estimators: [100, 150, 200]
    learning_rate: [0.01, 0.1, 0.2]
    max_depth: [3, 5, 7]
    subsample: [0.8, 0.9, 1.0]
  target_accuracy: 0.95
  cv_folds: 5

Acceptance Criteria:

  • Achieves 95%+ prediction accuracy on test dataset
  • Training completes in <10 minutes on standard hardware
  • Implements hyperparameter optimization
  • Saves best model with performance metrics
  • Creates model performance visualization
  • Integrates with MLflow for experiment tracking

Task 1.4: Model Evaluation Framework

Priority: P0 | Effort: 3 days | Files: src/components/model_evaluation.py

Implementation Requirements:

class ModelEvaluation:
    def __init__(self):
        self.config = ModelEvaluationConfig()
        
    def evaluate_model(self, model, X_test, y_test) -> dict:
        # Calculate comprehensive metrics
        # Generate performance visualizations
        
    def generate_evaluation_report(self, metrics: dict) -> str:
        # Create detailed evaluation report
        # Include feature importance analysis

Required Metrics:

  • Mean Absolute Error (MAE)
  • Root Mean Squared Error (RMSE)
  • R² Score (Coefficient of Determination)
  • Mean Absolute Percentage Error (MAPE)

Acceptance Criteria:

  • Calculates all required performance metrics
  • Generates feature importance visualizations
  • Creates prediction vs actual plots
  • Saves evaluation report to reports/ directory
  • Validates model meets 95% accuracy requirement
  • Implements confidence interval calculations

Phase 2: API Development (Weeks 5-8)

Task 2.1: Prediction Pipeline

Priority: P0 | Effort: 3 days | Files: src/pipeline/prediction_pipeline.py

Implementation Requirements:

class PredictPipeline:
    def __init__(self):
        self.model = self.load_model()
        self.preprocessor = self.load_preprocessor()
        
    def load_model(self):
        # Load trained model from artifacts
        
    def predict(self, features: pd.DataFrame) -> np.ndarray:
        # Single diamond prediction
        # Input validation and preprocessing
        
    def predict_batch(self, features: pd.DataFrame) -> np.ndarray:
        # Bulk diamond predictions
        # Optimized for 1000+ diamonds

Custom Data Class:

class CustomData:
    def __init__(self, carat: float, cut: str, color: str, clarity: str,
                 depth: float, table: float, x: float, y: float, z: float):
        # Diamond feature data structure
        
    def get_data_as_dataframe(self) -> pd.DataFrame:
        # Convert to DataFrame for prediction

Acceptance Criteria:

  • Loads model and preprocessor from saved artifacts
  • Validates input features with proper error handling
  • Returns predictions with confidence intervals
  • Handles batch processing efficiently (1000+ diamonds)
  • Response time <200ms for single predictions
  • Comprehensive logging for prediction requests

Task 2.2: Flask REST API Server

Priority: P0 | Effort: 4 days | Files: app.py

API Endpoints:

# app.py
from flask import Flask, request, jsonify
from src.pipeline.prediction_pipeline import PredictPipeline, CustomData

app = Flask(__name__)
predict_pipeline = PredictPipeline()

@app.route('/predict', methods=['POST'])
def predict_datapoint():
    # Single diamond prediction endpoint
    # Input validation and error handling
    
@app.route('/predict/batch', methods=['POST'])
def predict_batch():
    # Bulk diamond prediction endpoint
    # CSV file upload processing
    
@app.route('/health', methods=['GET'])
def health_check():
    # System health and model status
    
@app.route('/model/info', methods=['GET'])
def model_info():
    # Model metadata and performance metrics

Input Validation Schema:

DIAMOND_SCHEMA = {
    'caret': {'type': 'float', 'min': 0.1, 'max': 10.0},
    'cut': {'type': 'string', 'enum': ['Fair', 'Good', 'Very Good', 'Premium', 'Ideal']},
    'color': {'type': 'string', 'enum': ['D', 'E', 'F', 'G', 'H', 'I', 'J']},
    'clarity': {'type': 'string', 'enum': ['FL', 'IF', 'VVS1', 'VVS2', 'VS1', 'VS2', 'SI1', 'SI2', 'I1']},
    'depth': {'type': 'float', 'min': 40, 'max': 80},
    'table': {'type': 'float', 'min': 40, 'max': 100},
    'x': {'type': 'float', 'min': 0, 'max': 20},
    'y': {'type': 'float', 'min': 0, 'max': 20},
    'z': {'type': 'float', 'min': 0, 'max': 20}
}

Acceptance Criteria:

  • All 4 endpoints functional with proper HTTP methods
  • Input validation with meaningful error messages
  • JSON response format with prediction and confidence
  • Error handling for invalid requests (4xx) and server errors (5xx)
  • Request/response logging for monitoring
  • CORS support for frontend integration

Task 2.3: Streamlit Dashboard

Priority: P0 | Effort: 4 days | Files: streamlit_app.py

Dashboard Components:

import streamlit as st
from src.pipeline.prediction_pipeline import PredictPipeline, CustomData

# Main prediction interface
def main():
    st.title("💎 Diamond Price Predictor")
    
    # Input form for diamond characteristics
    with st.form("prediction_form"):
        # 9 input fields for diamond features
        
    # Prediction display with confidence intervals
    # Batch processing file upload
    # Historical predictions tracking

Required Features:

  • Interactive input forms for all 9 diamond characteristics
  • Real-time price prediction display
  • Confidence interval visualization
  • CSV batch upload and processing
  • Prediction history and analytics
  • Model performance dashboard

Acceptance Criteria:

  • Interactive forms for all diamond characteristics
  • Real-time predictions with visual feedback
  • Batch CSV upload processing (1000+ diamonds)
  • Results download functionality
  • Responsive design for mobile devices
  • Error handling with user-friendly messages

Task 2.4: Docker Containerization

Priority: P0 | Effort: 2 days | Files: Dockerfile.flask, docker-compose.yaml

Dockerfile.flask:

FROM python:3.8-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 5000

CMD ["python", "app.py"]

docker-compose.yaml:

version: '3.8'
services:
  api:
    build:
      context: .
      dockerfile: Dockerfile.flask
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=production
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
      
  dashboard:
    build:
      context: .
      dockerfile: Dockerfile.streamlit
    ports:
      - "8501:8501"
    depends_on:
      - api

Acceptance Criteria:

  • Flask API container builds and runs successfully
  • Streamlit dashboard container operational
  • Multi-service orchestration with docker-compose
  • Persistent volumes for models and logs
  • Health checks and restart policies configured
  • Environment variables for configuration

---Start Here

Phase 3: MLOps Infrastructure (Weeks 9-12)

Task 3.1: MLflow Integration

Priority: P1 | Effort: 3 days | Files: MLflow configuration

MLflow Setup:

# src/components/model_trainer.py
import mlflow
import mlflow.xgboost

class ModelTrainer:
    def initiate_model_training(self, train_array, test_array):
        with mlflow.start_run():
            # Log hyperparameters
            mlflow.log_params(self.config.hyperparameters)
            
            # Train model
            model = self.train_model(train_array, test_array)
            
            # Log metrics
            mlflow.log_metrics(evaluation_metrics)
            
            # Log model
            mlflow.xgboost.log_model(model, "diamond_price_model")

Acceptance Criteria:

  • MLflow tracking server operational
  • All training runs logged with parameters and metrics
  • Model versioning and registry functional
  • Model comparison interface available
  • Automated model deployment from registry
  • Integration with existing training pipeline

Task 3.2: DVC Pipeline Configuration

Priority: P1 | Effort: 2 days | Files: dvc.yaml

DVC Pipeline Stages:

# dvc.yaml
stages:
  data_ingestion:
    cmd: python src/components/data_ingestion.py
    deps:
      - src/components/data_ingestion.py
    outs:
      - data/raw/diamonds.csv
      
  data_transformation:
    cmd: python src/components/data_transformation.py
    deps:
      - src/components/data_transformation.py
      - data/raw/diamonds.csv
    outs:
      - data/processed/train.csv
      - data/processed/test.csv
      - artifacts/preprocessor.pkl
      
  model_training:
    cmd: python src/components/model_trainer.py
    deps:
      - src/components/model_trainer.py
      - data/processed/train.csv
      - data/processed/test.csv
    outs:
      - models/model.pkl
    metrics:
      - reports/metrics.json:
          cache: false

Acceptance Criteria:

  • Complete pipeline stages defined
  • Data and model versioning operational
  • Remote storage configured (S3/GCS)
  • Pipeline reproducibility verified
  • Metrics tracking and comparison
  • Automated pipeline execution

Task 3.3: Testing Infrastructure

Priority: P1 | Effort: 4 days | Files: tests/ directory

Test Structure:

tests/
├── unit/
│   ├── test_data_ingestion.py
│   ├── test_data_transformation.py
│   ├── test_model_trainer.py
│   └── test_prediction_pipeline.py
├── integration/
│   ├── test_api_endpoints.py
│   └── test_pipeline_integration.py
└── test_config.py

Test Requirements:

# tests/unit/test_model_trainer.py
import pytest
from src.components.model_trainer import ModelTrainer

class TestModelTrainer:
    def test_model_training_accuracy(self):
        # Test model achieves 95%+ accuracy
        
    def test_training_time_limit(self):
        # Test training completes within 10 minutes
        
    def test_hyperparameter_optimization(self):
        # Test hyperparameter tuning functionality

Acceptance Criteria:

  • Unit tests for all components (80%+ coverage)
  • Integration tests for API endpoints
  • Performance tests for prediction speed
  • Data quality tests for pipeline validation
  • Automated test execution in CI/CD
  • Test reporting and metrics collection

Task 3.4: Monitoring & Logging

Priority: P1 | Effort: 3 days | Files: src/utils/logging.py, monitoring configs

Logging Configuration:

# src/utils/logging.py
import logging
import sys
from datetime import datetime

def setup_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    
    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    
    # File handler
    file_handler = logging.FileHandler(f'logs/{name}_{datetime.now().strftime("%Y%m%d")}.log')
    file_handler.setLevel(logging.DEBUG)
    
    # Formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    
    return logger

Monitoring Metrics:

  • API response times and error rates
  • Model prediction accuracy and drift
  • System resource utilization
  • User engagement and feature usage

Acceptance Criteria:

  • Comprehensive logging across all components
  • Structured log format for analysis
  • Log rotation and retention policies
  • Error tracking and alerting system
  • Performance monitoring dashboard
  • Health check endpoints functional

Phase 4: Deployment & Production (Weeks 13-16)

Task 4.1: Production Deployment

Priority: P0 | Effort: 3 days | Files: Deployment configurations

Deployment Checklist:

  • Production environment setup (cloud or on-premise)
  • Load balancer configuration for high availability
  • SSL/HTTPS certificates and security hardening
  • Database setup for prediction history (optional)
  • Backup and disaster recovery procedures
  • Performance monitoring and alerting

Task 4.2: API Documentation

Priority: P1 | Effort: 2 days | Files: docs/api_documentation.md

OpenAPI Specification:

# docs/openapi.yaml
openapi: 3.0.0
info:
  title: Diamond Price Predictor API
  version: 1.0.0
  description: ML-powered diamond price prediction service

paths:
  /predict:
    post:
      summary: Single diamond price prediction
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/DiamondInput'
      responses:
        200:
          description: Successful prediction
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/PredictionResponse'

Acceptance Criteria:

  • Complete API documentation with examples
  • Interactive API documentation (Swagger UI)
  • Code examples in multiple languages
  • Error code reference and troubleshooting
  • Authentication and rate limiting documentation
  • User guides for different personas

Success Criteria & Definition of Done

Overall Project Success Criteria

  • Model achieves 95%+ accuracy on diamond price predictions
  • API responds in <200ms for single predictions
  • System handles 1000+ requests/minute throughput
  • 99.9% system uptime and reliability
  • Complete MLOps pipeline with automated retraining
  • Production deployment with monitoring and alerting

Task-Level Definition of Done

For each task to be considered complete:

  • All acceptance criteria met and verified
  • Code reviewed and approved by team lead
  • Unit tests written with 80%+ coverage
  • Integration tests passing
  • Documentation updated (code comments, README, API docs)
  • Performance benchmarks met
  • Security requirements satisfied
  • Deployed to staging environment and tested

Quality Gates

  • Code Quality: Black formatting, flake8 linting, mypy type checking
  • Testing: 80%+ test coverage, all tests passing
  • Performance: Response time SLAs met under load
  • Security: No critical vulnerabilities in dependency scan
  • Documentation: Complete API documentation and user guides

Development Tools & Commands

Environment Setup

# Virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate   # Windows

# Install dependencies
pip install -r requirements.txt

# Development tools
pip install black isort flake8 mypy pytest pytest-cov

Development Workflow

# Code formatting
black src/ tests/
isort src/ tests/

# Linting
flake8 src/ tests/
mypy src/

# Testing
pytest tests/ -v --cov=src --cov-report=html

# Run services
python app.py                    # Flask API
streamlit run streamlit_app.py   # Dashboard
mlflow ui --port 5000           # MLflow UI

Docker Operations

# Build and run
docker-compose build
docker-compose up -d

# View logs
docker-compose logs -f api
docker-compose logs -f dashboard

# Scale services
docker-compose up --scale api=3

Document Version: 2.0 (Implementation-Focused)
Last Updated: January 2025
Next Review: Weekly during development sprints

This task document provides concrete, actionable steps for building the Diamond Price Predictor system with focus on measurable deliverables and technical implementation details.