diff --git a/app.py b/app.py
index bd881591..14fbbb84 100644
--- a/app.py
+++ b/app.py
@@ -27,6 +27,7 @@
 from auth_utils import token_required, roles_required
 import backend.sockets.forum_events  # Register forum socket events
 import backend.sockets.knowledge_events  # Register knowledge exchange events
+import backend.sockets.alert_socket  # Register centralized alert socket events
 from backend.utils.i18n import t

 # Set up logging
diff --git a/backend/models/__init__.py b/backend/models/__init__.py
index d45eaabd..b727dbf4 100644
--- a/backend/models/__init__.py
+++ b/backend/models/__init__.py
@@ -12,6 +12,7 @@
 from .knowledge import Question, Answer, KnowledgeVote, Badge, UserBadge, UserExpertise
 from .equipment import Equipment, RentalBooking, AvailabilityCalendar, PaymentEscrow
 from .farm import Farm, FarmMember, FarmAsset, FarmRole
+from .alert import Alert, AlertPreference
 from .weather import WeatherData, CropAdvisory, AdvisorySubscription
 from .sustainability import CarbonPractice, CreditLedger, AuditRequest
 from .vendor_profile import VendorProfile
@@ -49,5 +50,6 @@
     'WarehouseLocation', 'StockItem', 'StockMovement', 'ReconciliationLog',
     'ClimateZone', 'SensorNode', 'TelemetryLog', 'AutomationTrigger',
     'WorkerProfile', 'WorkShift', 'HarvestLog', 'PayrollEntry',
-    'DriverProfile', 'DeliveryVehicle', 'TransportRoute', 'FuelLog'
+    'DriverProfile', 'DeliveryVehicle', 'TransportRoute', 'FuelLog',
+    'Alert', 'AlertPreference'
 ]
diff --git a/backend/models/alert.py b/backend/models/alert.py
new file mode 100644
index 00000000..9c9c614e
--- /dev/null
+++ b/backend/models/alert.py
@@ -0,0 +1,121 @@
+import json
+from datetime import datetime
+from backend.extensions import db
+
+class Alert(db.Model):
+    __tablename__ = 'alerts'
+
+    id = db.Column(db.Integer, primary_key=True)
+    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)  # Null for broadcast
+    title = db.Column(db.String(255), nullable=False)
+    message = db.Column(db.Text, nullable=False)
+
+    # Priority: LOW, MEDIUM, HIGH, CRITICAL
+    priority = db.Column(db.String(20), default='MEDIUM', nullable=False)
+
+    # Category: WEATHER, MARKET, FORUM, SYSTEM, SECURITY, LOAN
+    category = db.Column(db.String(50), nullable=False)
+
+    # Grouping key for similar alerts
+    group_key = db.Column(db.String(100), nullable=True)
+
+    # Delivery Channels Status
+    websocket_delivered = db.Column(db.Boolean, default=False)
+    email_delivered = db.Column(db.Boolean, default=False)
+    sms_delivered = db.Column(db.Boolean, default=False)
+    push_delivered = db.Column(db.Boolean, default=False)
+
+    # Expiry for cleanup
+    expires_at = db.Column(db.DateTime, nullable=True)
+
+    read_at = db.Column(db.DateTime, nullable=True)
+    created_at = db.Column(db.DateTime, default=datetime.utcnow)
+
+    # Metadata for deep-linking
+    action_url = db.Column(db.String(512), nullable=True)
+    metadata_json = db.Column(db.Text, nullable=True)  # JSON string for extra data
+
+    @property
+    def gravity_score(self) -> int:
+        """Calculates a numerical score for alert importance."""
+        base_scores = {'CRITICAL': 100, 'HIGH': 75, 'MEDIUM': 50, 'LOW': 25}
+        score = base_scores.get(self.priority, 50)
+
+        # Boost score if category is high-risk
+        if self.category in ['SECURITY', 'MARKET']:
+            score += 20
+
+        return score
+
+    def validate(self):
+        """Validates alert data before persistence."""
+        if not self.title or len(self.title) < 5:
+            raise ValueError("Alert title must be at least 5 characters long.")
+        if not self.category:
+            raise ValueError("Alert category is required.")
+        if 
self.priority not in ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL']: + raise ValueError("Invalid priority level.") + return True + + def to_dict(self): + """Serializes alert data for API and WebSocket delivery.""" + res = { + 'id': self.id, + 'user_id': self.user_id, + 'title': self.title, + 'message': self.message, + 'priority': self.priority, + 'category': self.category, + 'gravity': self.gravity_score, + 'group_key': self.group_key, + 'delivery_report': { + 'websocket': self.websocket_delivered, + 'email': self.email_delivered, + 'sms': self.sms_delivered, + 'push': self.push_delivered + }, + 'status': 'READ' if self.read_at else 'UNREAD', + 'read_at': self.read_at.isoformat() if self.read_at else None, + 'created_at': self.created_at.isoformat(), + 'expires_at': self.expires_at.isoformat() if self.expires_at else None, + 'action_url': self.action_url, + 'is_expired': self.expires_at < datetime.utcnow() if self.expires_at else False + } + + # Parse metadata safely + try: + res['metadata'] = json.loads(self.metadata_json) if self.metadata_json else {} + except Exception: + res['metadata'] = {} + + return res + + @staticmethod + def get_priority_weight(priority): + """Helper to compare priority levels numerically.""" + weights = {'CRITICAL': 40, 'HIGH': 30, 'MEDIUM': 20, 'LOW': 10} + return weights.get(priority, 0) + +class AlertPreference(db.Model): + __tablename__ = 'alert_preferences' + + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False) + category = db.Column(db.String(50), nullable=False) + + email_enabled = db.Column(db.Boolean, default=True) + sms_enabled = db.Column(db.Boolean, default=False) + push_enabled = db.Column(db.Boolean, default=True) + websocket_enabled = db.Column(db.Boolean, default=True) + + min_priority = db.Column(db.String(20), default='LOW') + + def to_dict(self): + return { + 'category': self.category, + 'email_enabled': self.email_enabled, + 'sms_enabled': self.sms_enabled, + 'push_enabled': self.push_enabled, + 'websocket_enabled': self.websocket_enabled, + 'min_priority': self.min_priority + } diff --git a/backend/services/alert_registry.py b/backend/services/alert_registry.py new file mode 100644 index 00000000..2ec00263 --- /dev/null +++ b/backend/services/alert_registry.py @@ -0,0 +1,234 @@ +import json +import logging +from datetime import datetime, timedelta +from typing import Optional, Dict, List, Any +from backend.extensions import db, mail, socketio +from backend.models import Alert, User, AlertPreference +from backend.utils.logger import logger + +class AlertRegistry: + """ + Centralized registry for managing cross-module alerts. + Supports asynchronous delivery, priority-based queuing and multi-channel notification. + """ + + @staticmethod + def register_alert( + title: str, + message: str, + category: str, + user_id: Optional[int] = None, + priority: str = 'MEDIUM', + group_key: Optional[str] = None, + action_url: Optional[str] = None, + metadata: Optional[Dict] = None, + ttl_days: int = 30 + ) -> Optional[Alert]: + """ + Registers a new alert and initiates the multi-channel delivery process. + """ + try: + # 1. Create the alert record + alert = Alert( + user_id=user_id, + title=title, + message=message, + category=category, + priority=priority, + group_key=group_key, + action_url=action_url, + metadata_json=json.dumps(metadata) if metadata else None, + expires_at=datetime.utcnow() + timedelta(days=ttl_days) + ) + + db.session.add(alert) + db.session.commit() + + # 2. 
Process delivery based on user preferences or global rules + AlertRegistry._dispatch_alert(alert) + + return alert + except Exception as e: + logger.error(f"Failed to register alert: {str(e)}", exc_info=True) + db.session.rollback() + return None + + @staticmethod + def _dispatch_alert(alert: Alert): + """ + Dispatches alert to various channels based on user preferences. + """ + # If it's a global broadcast, we use default channels + if not alert.user_id: + AlertRegistry._deliver_websocket(alert) + return + + user = User.query.get(alert.user_id) + if not user: + return + + # Fetch or create default preferences + pref = AlertPreference.query.filter_by( + user_id=user.id, + category=alert.category + ).first() + + # Check if priority meets minimum requirement + if pref and Alert.get_priority_weight(alert.priority) < Alert.get_priority_weight(pref.min_priority): + logger.info(f"Alert {alert.id} skipped due to priority threshold for user {user.id}") + return + + # WebSocket Delivery (Real-time) + if not pref or pref.websocket_enabled: + AlertRegistry._deliver_websocket(alert) + + # Email Delivery + if (not pref or pref.email_enabled) and user.email: + AlertRegistry._deliver_email(alert, user.email) + + # SMS Delivery + if pref and pref.sms_enabled and hasattr(user, 'phone') and user.phone: + AlertRegistry._deliver_sms(alert, user.phone) + + @staticmethod + def _deliver_websocket(alert: Alert): + """Delivers real-time alert via WebSockets.""" + try: + payload = alert.to_dict() + room = f"user_{alert.user_id}" if alert.user_id else "global_alerts" + + socketio.emit('new_alert', payload, room=room) + + alert.websocket_delivered = True + db.session.commit() + logger.info(f"WebSocket alert {alert.id} delivered to {room}") + except Exception as e: + logger.error(f"WebSocket delivery failed for alert {alert.id}: {str(e)}") + + @staticmethod + def _deliver_email(alert: Alert, email: str): + """Delivers alert via Email (asynchronous placeholder).""" + try: + # In a real app, this would be a Celery task + # from backend.tasks.email_tasks import send_alert_email + # send_alert_email.delay(alert.id, email) + + # For now, simulate/direct send + logger.info(f"Email alert {alert.id} queued for {email}") + alert.email_delivered = True + db.session.commit() + except Exception as e: + logger.error(f"Email delivery failed for alert {alert.id}: {str(e)}") + + @staticmethod + def _deliver_sms(alert: Alert, phone: str): + """Delivers alert via SMS (integration placeholder).""" + try: + logger.info(f"SMS alert {alert.id} queued for {phone}") + alert.sms_delivered = True + db.session.commit() + except Exception as e: + logger.error(f"SMS delivery failed for alert {alert.id}: {str(e)}") + + @staticmethod + def mark_as_read(alert_id: int, user_id: int) -> bool: + """Marks an alert as read for a specific user.""" + alert = Alert.query.filter_by(id=alert_id, user_id=user_id).first() + if alert: + alert.read_at = datetime.utcnow() + db.session.commit() + return True + return False + + @staticmethod + def get_user_alerts(user_id: int, unread_only: bool = False, limit: int = 50) -> List[Alert]: + """Retrieves alerts for a user with filtering.""" + query = Alert.query.filter( + (Alert.user_id == user_id) | (Alert.user_id.is_(None)) + ) + + if unread_only: + query = query.filter(Alert.read_at.is_(None)) + + return query.order_by(Alert.created_at.desc()).limit(limit).all() + + @staticmethod + def get_alert_summary(user_id: int) -> Dict[str, Any]: + """ + Generates a statistical summary of alerts for a user. 
+ Useful for dashboard widgets and mobile push payload optimization. + """ + total = Alert.query.filter((Alert.user_id == user_id) | (Alert.user_id.is_(None))).count() + unread = Alert.query.filter( + ((Alert.user_id == user_id) | (Alert.user_id.is_(None))), + Alert.read_at.is_(None) + ).count() + + # Priority breakdown + critical = Alert.query.filter_by(user_id=user_id, priority='CRITICAL', read_at=None).count() + high = Alert.query.filter_by(user_id=user_id, priority='HIGH', read_at=None).count() + + # Category distribution + categories = db.session.query( + Alert.category, db.func.count(Alert.id) + ).filter_by(user_id=user_id).group_by(Alert.category).all() + + return { + 'total_alerts': total, + 'unread_count': unread, + 'critical_count': critical, + 'high_count': high, + 'category_distribution': {cat: count for cat, count in categories}, + 'has_unread_critical': critical > 0 + } + + @staticmethod + def aggregate_alerts_by_group(user_id: int, group_key: str) -> List[Alert]: + """ + Retrieves all related alerts within a group to prevent notification fatigue. + """ + return Alert.query.filter_by(user_id=user_id, group_key=group_key).all() + + @staticmethod + def delete_alerts_by_group(user_id: int, group_key: str): + """ + Bulk deletes alerts in a specific group (e.g. when an issue is resolved). + """ + try: + Alert.query.filter_by(user_id=user_id, group_key=group_key).delete() + db.session.commit() + return True + except Exception: + db.session.rollback() + return False + + @staticmethod + def update_preferences(user_id: int, preferences: List[Dict[str, Any]]): + """Updates user alert preferences across categories.""" + try: + for p in preferences: + category = p.get('category') + if not category: + continue + + pref = AlertPreference.query.filter_by( + user_id=user_id, + category=category + ).first() + + if not pref: + pref = AlertPreference(user_id=user_id, category=category) + db.session.add(pref) + + pref.email_enabled = p.get('email_enabled', pref.email_enabled) + pref.sms_enabled = p.get('sms_enabled', pref.sms_enabled) + pref.push_enabled = p.get('push_enabled', pref.push_enabled) + pref.websocket_enabled = p.get('websocket_enabled', pref.websocket_enabled) + pref.min_priority = p.get('min_priority', pref.min_priority) + + db.session.commit() + return True + except Exception as e: + logger.error(f"Failed to update preferences: {str(e)}") + db.session.rollback() + return False diff --git a/backend/services/market_service.py b/backend/services/market_service.py index 464f5447..edbed123 100644 --- a/backend/services/market_service.py +++ b/backend/services/market_service.py @@ -113,13 +113,17 @@ def check_watchlist_alerts(updated_prices): for watcher in watchers: # Trigger alert if current price is higher than target (Profit opportunity) if price_data['modal_price'] >= watcher.target_price: - msg = f"Alert! {watcher.crop_name} price in {price_data['district']} reached ₹{price_data['modal_price']}, crossing your target of ₹{watcher.target_price}." + msg = f"Profit Opportunity! {watcher.crop_name} price in {price_data['district']} reached ₹{price_data['modal_price']}, crossing your target of ₹{watcher.target_price}." 
- NotificationService.create_notification( + from backend.services.alert_registry import AlertRegistry + AlertRegistry.register_alert( user_id=watcher.user_id, title="Market Price Alert", message=msg, - notification_type="price_alert" + category="MARKET", + priority="HIGH", + action_url=f"/market?crop={watcher.crop_name}", + group_key=f"price_{watcher.crop_name}_{price_data['district']}" ) alerts_triggered.append({ "user_id": watcher.user_id, diff --git a/backend/services/weather_service.py b/backend/services/weather_service.py index e4d75bc2..83fbb1b3 100644 --- a/backend/services/weather_service.py +++ b/backend/services/weather_service.py @@ -27,6 +27,29 @@ def update_weather_for_location(location): db.session.add(weather) db.session.commit() + + # Trigger Alerts for Extreme Conditions + # Note: In a real app we'd query users in this location + from backend.services.alert_registry import AlertRegistry + + if weather.temperature > 40: + AlertRegistry.register_alert( + title="Extreme Heat Alert", + message=f"Location {location} is experiencing extreme heat ({weather.temperature}°C). Please protect your crops.", + category="WEATHER", + priority="HIGH", + group_key=f"heat_{location}" + ) + + if weather.rainfall > 50: + AlertRegistry.register_alert( + title="Heavy Rainfall Warning", + message=f"Intense rainfall detected in {location}. Check drainage systems.", + category="WEATHER", + priority="CRITICAL", + group_key=f"rain_{location}" + ) + return weather @staticmethod diff --git a/backend/sockets/alert_socket.py b/backend/sockets/alert_socket.py new file mode 100644 index 00000000..7d2a33b7 --- /dev/null +++ b/backend/sockets/alert_socket.py @@ -0,0 +1,43 @@ +from flask_socketio import emit, join_room, leave_room +from backend.extensions import socketio +from backend.utils.logger import logger + +@socketio.on('subscribe_alerts') +def handle_subscribe_alerts(data): + """ + Subscribes a user to their private alert room. + """ + user_id = data.get('user_id') + if user_id: + room = f"user_{user_id}" + join_room(room) + logger.info(f"User {user_id} subscribed to alert room: {room}") + emit('subscription_success', {'room': room}) + + # All users subscribe to global alerts + join_room('global_alerts') + +@socketio.on('unsubscribe_alerts') +def handle_unsubscribe_alerts(data): + """ + Unsubscribes a user from their private alert room. + """ + user_id = data.get('user_id') + if user_id: + room = f"user_{user_id}" + leave_room(room) + logger.info(f"User {user_id} unsubscribed from alert room: {room}") + +@socketio.on('acknowledge_alert') +def handle_acknowledge_alert(data): + """ + Handles client-side acknowledgement of an alert. 
+ """ + alert_id = data.get('alert_id') + user_id = data.get('user_id') + + if alert_id and user_id: + # Here you could potentially call AlertRegistry.mark_as_read + # but usually that's better done via a REST API call for reliability + logger.info(f"Alert {alert_id} acknowledged by user {user_id}") + emit('alert_acknowledged', {'alert_id': alert_id}, room=f"user_{user_id}") diff --git a/backend/tasks/alert_cleanup.py b/backend/tasks/alert_cleanup.py new file mode 100644 index 00000000..aec7b55c --- /dev/null +++ b/backend/tasks/alert_cleanup.py @@ -0,0 +1,48 @@ +from datetime import datetime +from backend.celery_app import celery +from backend.extensions import db +from backend.models import Alert +from backend.utils.logger import logger + +@celery.task(name='tasks.cleanup_expired_alerts') +def cleanup_expired_alerts(): + """ + Background task to remove alerts that have passed their expiration date. + Runs periodically (e.g., daily). + """ + try: + now = datetime.utcnow() + expired_count = Alert.query.filter(Alert.expires_at < now).count() + + if expired_count > 0: + Alert.query.filter(Alert.expires_at < now).delete() + db.session.commit() + logger.info(f"Cleaned up {expired_count} expired alerts.") + else: + logger.info("No expired alerts to clean up.") + + return {'status': 'success', 'deleted': expired_count} + except Exception as e: + logger.error(f"Error during alert cleanup: {str(e)}") + db.session.rollback() + return {'status': 'error', 'message': str(e)} + +@celery.task(name='tasks.archive_old_read_alerts') +def archive_old_read_alerts(days=30): + """ + Archives or deletes read alerts older than a certain number of days. + """ + try: + cutoff = datetime.utcnow() - timedelta(days=days) + old_alerts = Alert.query.filter( + Alert.read_at.isnot(None), + Alert.read_at < cutoff + ).delete() + + db.session.commit() + logger.info(f"Archived {old_alerts} old read alerts.") + return {'status': 'success', 'archived': old_alerts} + except Exception as e: + logger.error(f"Error during alert archiving: {str(e)}") + db.session.rollback() + return {'status': 'error', 'message': str(e)} diff --git a/js/alert_manager.js b/js/alert_manager.js new file mode 100644 index 00000000..823ddb9a --- /dev/null +++ b/js/alert_manager.js @@ -0,0 +1,190 @@ +/** + * AgriTech Unified Alert Manager + * Manages real-time alerts, notifications, and cross-module communications. + */ + +class AlertManager { + constructor() { + this.socket = null; + this.userId = null; + this.alerts = []; + this.unreadCount = 0; + this.containerId = 'alert-notification-hub'; + this.badgeId = 'alert-badge-count'; + + this.init(); + } + + init() { + // Try to get user from global session or local storage + const userStr = localStorage.getItem('user'); + if (userStr) { + try { + const user = JSON.parse(userStr); + this.userId = user.id; + } catch (e) { + console.error('Failed to parse user data:', e); + } + } + + // Initialize Socket.IO connection + this.socket = typeof io !== 'undefined' ? io() : null; + if (this.socket) { + this.setupSocketHandlers(); + } else { + console.warn('Socket.IO not found. 
Real-time alerts disabled.'); + } + + // Load historical alerts + this.fetchAlerts(); + + // Setup UI listeners + document.addEventListener('DOMContentLoaded', () => { + this.updateUI(); + }); + } + + setupSocketHandlers() { + this.socket.on('connect', () => { + console.log('Connected to Alert System'); + if (this.userId) { + this.socket.emit('subscribe_alerts', { user_id: this.userId }); + } else { + this.socket.emit('subscribe_alerts', {}); // Global only + } + }); + + this.socket.on('new_alert', (alert) => { + console.log('New Alert Received:', alert); + this.handleIncomingAlert(alert); + }); + + this.socket.on('subscription_success', (data) => { + console.log('Subscribed to room:', data.room); + }); + } + + async fetchAlerts() { + if (!this.userId) return; + + try { + const response = await fetch(`/api/v1/alerts/?user_id=${this.userId}`); + if (response.ok) { + const data = await response.json(); + this.alerts = data.alerts || []; + this.updateMetadata(); + this.updateUI(); + } + } catch (error) { + console.error('Failed to fetch alerts:', error); + } + } + + handleIncomingAlert(alert) { + // Add to list + this.alerts.unshift(alert); + + // Play sound if high priority + if (['HIGH', 'CRITICAL'].includes(alert.priority)) { + this.playAlertSound(); + } + + // Show toast + this.showToast(alert); + + // Update state + this.updateMetadata(); + this.updateUI(); + } + + updateMetadata() { + this.unreadCount = this.alerts.filter(a => !a.read_at).length; + } + + updateUI() { + // Update badge + const badge = document.getElementById(this.badgeId); + if (badge) { + badge.textContent = this.unreadCount; + badge.style.display = this.unreadCount > 0 ? 'inline-flex' : 'none'; + } + + // Update list if open + const container = document.getElementById(this.containerId); + if (container) { + this.renderAlertList(container); + } + } + + renderAlertList(container) { + if (this.alerts.length === 0) { + container.innerHTML = '
<div class="alert-empty">No new alerts</div>';
+            return;
+        }
+
+        // Minimal list markup; the class names used here are style hooks, not required by the backend
+        container.innerHTML = this.alerts.map(alert => `
+            <div class="alert-item priority-${(alert.priority || 'MEDIUM').toLowerCase()} ${alert.read_at ? 'read' : 'unread'}">
+                <div class="alert-title">${alert.title}</div>
+                <div class="alert-message">${alert.message}</div>
+                <div class="alert-time">${new Date(alert.created_at).toLocaleString()}</div>
+            </div>
+        `).join('');
+    }
+
+    showToast(alert) {
+        // Lightweight toast; falls back to document.body if no dedicated container exists
+        const toast = document.createElement('div');
+        toast.className = `alert-toast ${(alert.priority || 'MEDIUM').toLowerCase()}`;
+        toast.textContent = `${alert.title}: ${alert.message}`;
+        (document.getElementById('toast-container') || document.body).appendChild(toast);
+        setTimeout(() => toast.remove(), 6000);
+    }
+
+    playAlertSound() {
+        // Best-effort audio cue; the asset path is a placeholder and autoplay errors are ignored
+        try {
+            const audio = new Audio('/static/sounds/alert.mp3');
+            audio.play().catch(() => {});
+        } catch (e) {
+            // Audio not available in this environment
+        }
+    }
+}
+
+// Single shared instance for all modules on the page
+window.alertManager = new AlertManager();
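
The sketches below are reviewer notes rather than part of the patch. They exercise the APIs added in this diff; any module names, routes, schedules, and ids they introduce are illustrative assumptions.

1) Registering an alert from another module. A minimal sketch of how a calling service (here a hypothetical loan-reminder job) could use AlertRegistry.register_alert; the user id, amounts, group key, and action_url are placeholders.

    from backend.services.alert_registry import AlertRegistry
    from backend.utils.logger import logger

    alert = AlertRegistry.register_alert(
        title="Loan EMI Due Soon",
        message="Your EMI of ₹12,500 is due on 5 July. Pay on time to avoid penalties.",
        category="LOAN",
        user_id=42,                              # hypothetical user
        priority="HIGH",
        group_key="loan_emi_42",                 # groups repeat reminders together
        action_url="/loans/42/repayments",       # hypothetical deep link
        metadata={"loan_id": 42, "installment_no": 7},
        ttl_days=7,
    )
    if alert is None:
        # register_alert returns None (and rolls back) if persistence or dispatch setup fails
        logger.warning("Loan reminder alert could not be registered")

2) Scheduling the cleanup tasks. The diff defines tasks.cleanup_expired_alerts and tasks.archive_old_read_alerts but does not wire them to a schedule. A plausible Celery beat configuration, assuming periodic jobs are registered on the same celery app, might look like this.

    from celery.schedules import crontab
    from backend.celery_app import celery

    celery.conf.beat_schedule = {
        'cleanup-expired-alerts-daily': {
            'task': 'tasks.cleanup_expired_alerts',
            'schedule': crontab(hour=2, minute=0),    # run once a day at 02:00
        },
        'archive-old-read-alerts-daily': {
            'task': 'tasks.archive_old_read_alerts',
            'schedule': crontab(hour=2, minute=30),
            'kwargs': {'days': 30},                   # keep read alerts for 30 days
        },
    }

3) The REST endpoint the frontend expects. alert_manager.js calls GET /api/v1/alerts/?user_id=..., but no such route is added in this diff. A minimal Flask blueprint sketch built on AlertRegistry.get_user_alerts and get_alert_summary is shown below; the blueprint name and registration point are assumptions, and in production the user id should come from the authenticated token rather than the query string.

    from flask import Blueprint, jsonify, request
    from backend.services.alert_registry import AlertRegistry

    alerts_bp = Blueprint('alerts', __name__, url_prefix='/api/v1/alerts')

    @alerts_bp.route('/', methods=['GET'])
    def list_alerts():
        user_id = request.args.get('user_id', type=int)
        if user_id is None:
            return jsonify({'error': 'user_id is required'}), 400

        unread_only = request.args.get('unread_only', 'false').lower() == 'true'
        alerts = AlertRegistry.get_user_alerts(user_id, unread_only=unread_only)
        return jsonify({
            'alerts': [a.to_dict() for a in alerts],
            'summary': AlertRegistry.get_alert_summary(user_id),
        })

    # app.register_blueprint(alerts_bp) would then be added in app.py (assumption).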