Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from auth_utils import token_required, roles_required
import backend.sockets.forum_events # Register forum socket events
import backend.sockets.knowledge_events # Register knowledge exchange events
import backend.sockets.alert_socket # Register centralized alert socket events
from backend.utils.i18n import t

# Set up logging
Expand Down
4 changes: 3 additions & 1 deletion backend/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .knowledge import Question, Answer, KnowledgeVote, Badge, UserBadge, UserExpertise
from .equipment import Equipment, RentalBooking, AvailabilityCalendar, PaymentEscrow
from .farm import Farm, FarmMember, FarmAsset, FarmRole
from .alert import Alert, AlertPreference
from .weather import WeatherData, CropAdvisory, AdvisorySubscription
from .sustainability import CarbonPractice, CreditLedger, AuditRequest
from .vendor_profile import VendorProfile
Expand Down Expand Up @@ -49,5 +50,6 @@
'WarehouseLocation', 'StockItem', 'StockMovement', 'ReconciliationLog',
'ClimateZone', 'SensorNode', 'TelemetryLog', 'AutomationTrigger',
'WorkerProfile', 'WorkShift', 'HarvestLog', 'PayrollEntry',
'DriverProfile', 'DeliveryVehicle', 'TransportRoute', 'FuelLog'
'DriverProfile', 'DeliveryVehicle', 'TransportRoute', 'FuelLog',
'Alert', 'AlertPreference'
]
120 changes: 120 additions & 0 deletions backend/models/alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from datetime import datetime
from backend.extensions import db

class Alert(db.Model):
__tablename__ = 'alerts'

id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True) # Null for broadcast
title = db.Column(db.String(255), nullable=False)
message = db.Column(db.Text, nullable=False)

# Priority: LOW, MEDIUM, HIGH, CRITICAL
priority = db.Column(db.String(20), default='MEDIUM', nullable=False)

# Category: WEATHER, MARKET, FORUM, SYSTEM, SECURITY, LOAN
category = db.Column(db.String(50), nullable=False)

# Grouping key for similar alerts
group_key = db.Column(db.String(100), nullable=True)

# Delivery Channels Status
websocket_delivered = db.Column(db.Boolean, default=False)
email_delivered = db.Column(db.Boolean, default=False)
sms_delivered = db.Column(db.Boolean, default=False)
push_delivered = db.Column(db.Boolean, default=False)

# Expiry for cleanup
expires_at = db.Column(db.DateTime, nullable=True)

read_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)

# Metadata for deep-linking
action_url = db.Column(db.String(512), nullable=True)
metadata_json = db.Column(db.Text, nullable=True) # JSON string for extra data

@property
def gravity_score(self) -> int:
"""Calculates a numerical score for alert importance."""
base_scores = {'CRITICAL': 100, 'HIGH': 75, 'MEDIUM': 50, 'LOW': 25}
score = base_scores.get(self.priority, 50)

# Boost score if category is high-risk
if self.category in ['SECURITY', 'MARKET']:
score += 20

return score

def validate(self):
"""Validates alert data before persistence."""
if not self.title or len(self.title) < 5:
raise ValueError("Alert title must be at least 5 characters long.")
if not self.category:
raise ValueError("Alert category is required.")
if self.priority not in ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL']:
raise ValueError("Invalid priority level.")
return True

def to_dict(self):
"""Serializes alert data for API and WebSocket delivery."""
res = {
'id': self.id,
'user_id': self.user_id,
'title': self.title,
'message': self.message,
'priority': self.priority,
'category': self.category,
'gravity': self.gravity_score,
'group_key': self.group_key,
'delivery_report': {
'websocket': self.websocket_delivered,
'email': self.email_delivered,
'sms': self.sms_delivered,
'push': self.push_delivered
},
'status': 'READ' if self.read_at else 'UNREAD',
'read_at': self.read_at.isoformat() if self.read_at else None,
'created_at': self.created_at.isoformat(),
'expires_at': self.expires_at.isoformat() if self.expires_at else None,
'action_url': self.action_url,
'is_expired': self.expires_at < datetime.utcnow() if self.expires_at else False
}

# Parse metadata safely
try:
res['metadata'] = json.loads(self.metadata_json) if self.metadata_json else {}
except Exception:
res['metadata'] = {}

return res

@staticmethod
def get_priority_weight(priority):
"""Helper to compare priority levels numerically."""
weights = {'CRITICAL': 40, 'HIGH': 30, 'MEDIUM': 20, 'LOW': 10}
return weights.get(priority, 0)

class AlertPreference(db.Model):
__tablename__ = 'alert_preferences'

id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
category = db.Column(db.String(50), nullable=False)

email_enabled = db.Column(db.Boolean, default=True)
sms_enabled = db.Column(db.Boolean, default=False)
push_enabled = db.Column(db.Boolean, default=True)
websocket_enabled = db.Column(db.Boolean, default=True)

min_priority = db.Column(db.String(20), default='LOW')

def to_dict(self):
return {
'category': self.category,
'email_enabled': self.email_enabled,
'sms_enabled': self.sms_enabled,
'push_enabled': self.push_enabled,
'websocket_enabled': self.websocket_enabled,
'min_priority': self.min_priority
}
234 changes: 234 additions & 0 deletions backend/services/alert_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import json
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
from backend.extensions import db, mail, socketio
from backend.models import Alert, User, AlertPreference
from backend.utils.logger import logger

class AlertRegistry:
"""
Centralized registry for managing cross-module alerts.
Supports asynchronous delivery, priority-based queuing and multi-channel notification.
"""

@staticmethod
def register_alert(
title: str,
message: str,
category: str,
user_id: Optional[int] = None,
priority: str = 'MEDIUM',
group_key: Optional[str] = None,
action_url: Optional[str] = None,
metadata: Optional[Dict] = None,
ttl_days: int = 30
) -> Optional[Alert]:
"""
Registers a new alert and initiates the multi-channel delivery process.
"""
try:
# 1. Create the alert record
alert = Alert(
user_id=user_id,
title=title,
message=message,
category=category,
priority=priority,
group_key=group_key,
action_url=action_url,
metadata_json=json.dumps(metadata) if metadata else None,
expires_at=datetime.utcnow() + timedelta(days=ttl_days)
)

db.session.add(alert)
db.session.commit()

# 2. Process delivery based on user preferences or global rules
AlertRegistry._dispatch_alert(alert)

return alert
except Exception as e:
logger.error(f"Failed to register alert: {str(e)}", exc_info=True)
db.session.rollback()
return None

@staticmethod
def _dispatch_alert(alert: Alert):
"""
Dispatches alert to various channels based on user preferences.
"""
# If it's a global broadcast, we use default channels
if not alert.user_id:
AlertRegistry._deliver_websocket(alert)
return

user = User.query.get(alert.user_id)
if not user:
return

# Fetch or create default preferences
pref = AlertPreference.query.filter_by(
user_id=user.id,
category=alert.category
).first()

# Check if priority meets minimum requirement
if pref and Alert.get_priority_weight(alert.priority) < Alert.get_priority_weight(pref.min_priority):
logger.info(f"Alert {alert.id} skipped due to priority threshold for user {user.id}")
return

# WebSocket Delivery (Real-time)
if not pref or pref.websocket_enabled:
AlertRegistry._deliver_websocket(alert)

# Email Delivery
if (not pref or pref.email_enabled) and user.email:
AlertRegistry._deliver_email(alert, user.email)

# SMS Delivery
if pref and pref.sms_enabled and hasattr(user, 'phone') and user.phone:
AlertRegistry._deliver_sms(alert, user.phone)

@staticmethod
def _deliver_websocket(alert: Alert):
"""Delivers real-time alert via WebSockets."""
try:
payload = alert.to_dict()
room = f"user_{alert.user_id}" if alert.user_id else "global_alerts"

socketio.emit('new_alert', payload, room=room)

alert.websocket_delivered = True
db.session.commit()
logger.info(f"WebSocket alert {alert.id} delivered to {room}")
except Exception as e:
logger.error(f"WebSocket delivery failed for alert {alert.id}: {str(e)}")

@staticmethod
def _deliver_email(alert: Alert, email: str):
"""Delivers alert via Email (asynchronous placeholder)."""
try:
# In a real app, this would be a Celery task
# from backend.tasks.email_tasks import send_alert_email
# send_alert_email.delay(alert.id, email)

# For now, simulate/direct send
logger.info(f"Email alert {alert.id} queued for {email}")
alert.email_delivered = True
db.session.commit()
except Exception as e:
logger.error(f"Email delivery failed for alert {alert.id}: {str(e)}")

@staticmethod
def _deliver_sms(alert: Alert, phone: str):
"""Delivers alert via SMS (integration placeholder)."""
try:
logger.info(f"SMS alert {alert.id} queued for {phone}")
alert.sms_delivered = True
db.session.commit()
except Exception as e:
logger.error(f"SMS delivery failed for alert {alert.id}: {str(e)}")

@staticmethod
def mark_as_read(alert_id: int, user_id: int) -> bool:
"""Marks an alert as read for a specific user."""
alert = Alert.query.filter_by(id=alert_id, user_id=user_id).first()
if alert:
alert.read_at = datetime.utcnow()
db.session.commit()
return True
return False

@staticmethod
def get_user_alerts(user_id: int, unread_only: bool = False, limit: int = 50) -> List[Alert]:
"""Retrieves alerts for a user with filtering."""
query = Alert.query.filter(
(Alert.user_id == user_id) | (Alert.user_id.is_(None))
)

if unread_only:
query = query.filter(Alert.read_at.is_(None))

return query.order_by(Alert.created_at.desc()).limit(limit).all()

@staticmethod
def get_alert_summary(user_id: int) -> Dict[str, Any]:
"""
Generates a statistical summary of alerts for a user.
Useful for dashboard widgets and mobile push payload optimization.
"""
total = Alert.query.filter((Alert.user_id == user_id) | (Alert.user_id.is_(None))).count()
unread = Alert.query.filter(
((Alert.user_id == user_id) | (Alert.user_id.is_(None))),
Alert.read_at.is_(None)
).count()

# Priority breakdown
critical = Alert.query.filter_by(user_id=user_id, priority='CRITICAL', read_at=None).count()
high = Alert.query.filter_by(user_id=user_id, priority='HIGH', read_at=None).count()

# Category distribution
categories = db.session.query(
Alert.category, db.func.count(Alert.id)
).filter_by(user_id=user_id).group_by(Alert.category).all()

return {
'total_alerts': total,
'unread_count': unread,
'critical_count': critical,
'high_count': high,
'category_distribution': {cat: count for cat, count in categories},
'has_unread_critical': critical > 0
}

@staticmethod
def aggregate_alerts_by_group(user_id: int, group_key: str) -> List[Alert]:
"""
Retrieves all related alerts within a group to prevent notification fatigue.
"""
return Alert.query.filter_by(user_id=user_id, group_key=group_key).all()

@staticmethod
def delete_alerts_by_group(user_id: int, group_key: str):
"""
Bulk deletes alerts in a specific group (e.g. when an issue is resolved).
"""
try:
Alert.query.filter_by(user_id=user_id, group_key=group_key).delete()
db.session.commit()
return True
except Exception:
db.session.rollback()
return False

@staticmethod
def update_preferences(user_id: int, preferences: List[Dict[str, Any]]):
"""Updates user alert preferences across categories."""
try:
for p in preferences:
category = p.get('category')
if not category:
continue

pref = AlertPreference.query.filter_by(
user_id=user_id,
category=category
).first()

if not pref:
pref = AlertPreference(user_id=user_id, category=category)
db.session.add(pref)

pref.email_enabled = p.get('email_enabled', pref.email_enabled)
pref.sms_enabled = p.get('sms_enabled', pref.sms_enabled)
pref.push_enabled = p.get('push_enabled', pref.push_enabled)
pref.websocket_enabled = p.get('websocket_enabled', pref.websocket_enabled)
pref.min_priority = p.get('min_priority', pref.min_priority)

db.session.commit()
return True
except Exception as e:
logger.error(f"Failed to update preferences: {str(e)}")
db.session.rollback()
return False
Loading