diff --git a/docs/login-anomaly.md b/docs/login-anomaly.md new file mode 100644 index 00000000..debb027d --- /dev/null +++ b/docs/login-anomaly.md @@ -0,0 +1,369 @@ +# Login Anomaly Detection & Security Alerts + +This document describes the login anomaly detection and suspicious activity alerts feature in FinMind. + +## Overview + +The security system provides real-time detection of suspicious login activities and automatically generates alerts when potential security threats are detected. This helps protect user accounts from unauthorized access. + +## Features + +### 1. Multi-Factor Risk Scoring + +The system calculates a risk score (0.0 to 1.0) based on multiple signals: + +| Risk Factor | Weight | Description | +|-------------|--------|-------------| +| New IP Address | 0.30 | Login from an IP not previously seen for this user | +| New Device | 0.25 | Login from a device fingerprint not previously seen | +| Unusual Time | 0.15 | Login during unusual hours (1 AM - 5 AM UTC) | +| Recent Failed Attempts | 0.10-0.40 | Multiple failed login attempts in last 15 minutes | +| Rapid Location Change | 0.30 | Login from different IP within 1 hour | + +### 2. Brute Force Protection + +- **Threshold**: 5 failed attempts within 15 minutes +- **Block Duration**: 30 minutes +- **Storage**: Redis with automatic expiration + +### 3. Automatic Alert Generation + +Security alerts are automatically created when: +- Risk score >= 0.5 (Medium/High severity) +- Brute force threshold is reached +- Suspicious patterns are detected + +### 4. Login Event Tracking + +All login events are recorded with: +- User ID +- Event type (success, failed, blocked, etc.) +- IP address +- User agent +- Device fingerprint +- Location metadata +- Risk score and factors + +## API Endpoints + +### Record Login Event + +```http +POST /security/record +Authorization: Bearer +Content-Type: application/json + +{ + "event_type": "login_success", + "ip_address": "192.168.1.1", + "device_fingerprint": "abc123", + "location_country": "United States", + "location_city": "New York" +} +``` + +Response: +```json +{ + "message": "event recorded", + "event": { + "id": 1, + "event_type": "login_success", + "ip_address": "192.168.1.1", + "risk_score": 0.3, + "risk_factors": ["new_ip_address"] + }, + "risk_score": 0.3, + "risk_factors": ["new_ip_address"] +} +``` + +### Get Login History + +```http +GET /security/history?event_type=login_success&limit=50&offset=0 +Authorization: Bearer +``` + +Response: +```json +{ + "events": [...], + "count": 10, + "limit": 50, + "offset": 0 +} +``` + +### Get Security Alerts + +```http +GET /security/alerts?status=active&severity=high +Authorization: Bearer +``` + +Response: +```json +{ + "alerts": [ + { + "id": 1, + "alert_type": "suspicious_login", + "severity": "high", + "status": "active", + "title": "Suspicious Login Detected", + "description": "Login from new IP and device", + "ip_address": "192.168.1.1", + "created_at": "2026-03-24T10:00:00Z" + } + ], + "count": 1 +} +``` + +### Acknowledge Alert + +```http +POST /security/alerts/1/acknowledge +Authorization: Bearer +``` + +Response: +```json +{ + "message": "alert acknowledged", + "alert": { + "id": 1, + "status": "acknowledged", + "acknowledged_at": "2026-03-24T11:00:00Z" + } +} +``` + +### Acknowledge All Alerts + +```http +POST /security/alerts/acknowledge-all +Authorization: Bearer +``` + +Response: +```json +{ + "message": "all alerts acknowledged", + "count": 3 +} +``` + +### Get Security Statistics + +```http +GET /security/stats?days=30 +Authorization: Bearer +``` + +Response: +```json +{ + "period_days": 30, + "total_logins": 45, + "failed_logins": 3, + "unique_ips": 2, + "unique_devices": 2, + "active_alerts": 1, + "total_alerts": 5, + "high_risk_logins": 2, + "login_success_rate": 93.8 +} +``` + +### Analyze Login Risk + +```http +POST /security/analyze +Authorization: Bearer +Content-Type: application/json + +{ + "ip_address": "192.168.1.1", + "device_fingerprint": "abc123" +} +``` + +Response: +```json +{ + "ip_address": "192.168.1.1", + "device_fingerprint": "abc123", + "risk_score": 0.55, + "risk_factors": ["new_ip_address", "new_device"], + "recommendation": "review" +} +``` + +### Check IP Block Status + +```http +GET /security/check-ip +Authorization: Bearer +``` + +Response: +```json +{ + "ip_address": "192.168.1.1", + "blocked": false +} +``` + +## Database Schema + +### login_events Table + +```sql +CREATE TABLE login_events ( + id SERIAL PRIMARY KEY, + user_id INT REFERENCES users(id) ON DELETE SET NULL, + event_type VARCHAR(30) NOT NULL, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + device_fingerprint VARCHAR(128), + location_country VARCHAR(100), + location_city VARCHAR(100), + risk_score NUMERIC(3,2) DEFAULT 0.0, + risk_factors TEXT, -- JSON array + created_at TIMESTAMP DEFAULT NOW() +); +``` + +### security_alerts Table + +```sql +CREATE TABLE security_alerts ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + alert_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) DEFAULT 'medium', + status VARCHAR(20) DEFAULT 'active', + title VARCHAR(200) NOT NULL, + description TEXT, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + login_event_id INT REFERENCES login_events(id), + acknowledged_at TIMESTAMP, + acknowledged_by INT REFERENCES users(id), + created_at TIMESTAMP DEFAULT NOW() +); +``` + +## Alert Types + +| Alert Type | Description | Default Severity | +|------------|-------------|------------------| +| `suspicious_login` | Login with elevated risk score | medium/high | +| `brute_force_detected` | Multiple failed login attempts | high | +| `new_device_login` | First login from new device | low | +| `new_location_login` | Login from new geographic location | medium | + +## Alert Severities + +| Severity | Description | +|----------|-------------| +| `low` | Minor anomaly, informational | +| `medium` | Moderate concern, review recommended | +| `high` | Significant concern, immediate review | +| `critical` | Critical security issue, urgent action | + +## Alert Statuses + +| Status | Description | +|--------|-------------| +| `active` | New alert, requires attention | +| `acknowledged` | User has seen the alert | +| `resolved` | Issue has been resolved | + +## Configuration + +### Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `BRUTE_FORCE_THRESHOLD` | Failed attempts before block | 5 | +| `BRUTE_FORCE_WINDOW_MINUTES` | Time window for counting attempts | 15 | +| `BRUTE_FORCE_BLOCK_MINUTES` | Block duration after threshold | 30 | + +### Redis Keys + +| Key Pattern | Purpose | TTL | +|-------------|---------|-----| +| `security:brute_force:{ip}` | Failed attempt counter | 15 min | +| `security:brute_force:{ip}:blocked` | Block flag | 30 min | +| `security:user_ips:{user_id}` | Known IPs for user | 30 days | +| `security:user_devices:{user_id}` | Known devices for user | 30 days | + +## Integration with Auth Flow + +The login anomaly detection is integrated into the `/auth/login` endpoint: + +1. Check if IP is blocked due to brute force +2. Process login credentials +3. Calculate risk score if successful +4. Record login event +5. Generate alert if risk threshold exceeded +6. Return tokens with security information + +### Login Response with Security Info + +```json +{ + "access_token": "...", + "refresh_token": "...", + "security_alert": { + "type": "suspicious_login", + "severity": "medium", + "message": "Suspicious Login Detected" + }, + "security_notice": { + "risk_score": 0.55, + "message": "Login from new device or location detected" + } +} +``` + +## Testing + +Run the security tests: + +```bash +# Using the test script +./scripts/test-backend.ps1 tests/test_security.py + +# Or directly with pytest +cd packages/backend +pytest tests/test_security.py -v +``` + +## Best Practices + +### For Users + +1. Acknowledge security alerts promptly +2. Review login history regularly +3. Report unrecognized login activities +4. Enable additional security measures when available + +### For Developers + +1. Always check risk scores before allowing sensitive operations +2. Log security events for audit purposes +3. Use Redis for fast brute force detection +4. Keep risk thresholds configurable for different environments + +## Future Enhancements + +- [ ] Email/SMS notifications for security alerts +- [ ] Machine learning-based risk scoring +- [ ] Geographic location using GeoIP database +- [ ] Two-factor authentication integration +- [ ] Session management and remote logout +- [ ] Device trust management UI \ No newline at end of file diff --git a/packages/backend/app/db/migrations/001_login_anomaly.sql b/packages/backend/app/db/migrations/001_login_anomaly.sql new file mode 100644 index 00000000..36fd17cd --- /dev/null +++ b/packages/backend/app/db/migrations/001_login_anomaly.sql @@ -0,0 +1,75 @@ +-- Migration: Login Anomaly Detection Tables +-- Creates login_events and security_alerts tables for tracking +-- suspicious login activities and generating security alerts. + +-- Login Events Table +-- Tracks all login-related events including successful logins, +-- failed attempts, and suspicious activities. +CREATE TABLE IF NOT EXISTS login_events ( + id SERIAL PRIMARY KEY, + user_id INT REFERENCES users(id) ON DELETE SET NULL, + event_type VARCHAR(30) NOT NULL, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + device_fingerprint VARCHAR(128), + location_country VARCHAR(100), + location_city VARCHAR(100), + risk_score NUMERIC(3,2) NOT NULL DEFAULT 0.0, + risk_factors TEXT, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +-- Index for quick lookup of user's recent login events +CREATE INDEX IF NOT EXISTS idx_login_events_user_created + ON login_events(user_id, created_at DESC); + +-- Index for IP-based queries (detecting same IP across accounts) +CREATE INDEX IF NOT EXISTS idx_login_events_ip + ON login_events(ip_address, created_at DESC); + +-- Index for event type filtering +CREATE INDEX IF NOT EXISTS idx_login_events_type + ON login_events(event_type, created_at DESC); + +-- Security Alerts Table +-- Stores security alerts generated from suspicious activities +CREATE TABLE IF NOT EXISTS security_alerts ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + alert_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL DEFAULT 'medium', + status VARCHAR(20) NOT NULL DEFAULT 'active', + title VARCHAR(200) NOT NULL, + description TEXT, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + login_event_id INT REFERENCES login_events(id) ON DELETE SET NULL, + acknowledged_at TIMESTAMP, + acknowledged_by INT REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +-- Index for user's alerts lookup +CREATE INDEX IF NOT EXISTS idx_security_alerts_user_status + ON security_alerts(user_id, status, created_at DESC); + +-- Index for alert type filtering +CREATE INDEX IF NOT EXISTS idx_security_alerts_type + ON security_alerts(alert_type, created_at DESC); + +-- Index for severity filtering +CREATE INDEX IF NOT EXISTS idx_security_alerts_severity + ON security_alerts(severity, created_at DESC); + +-- Extend audit_logs table with additional columns for login events +ALTER TABLE audit_logs + ADD COLUMN IF NOT EXISTS ip_address VARCHAR(45); + +ALTER TABLE audit_logs + ADD COLUMN IF NOT EXISTS details TEXT; + +-- Add comment for documentation +COMMENT ON TABLE login_events IS 'Tracks all login events for anomaly detection'; +COMMENT ON TABLE security_alerts IS 'Security alerts generated from suspicious activities'; +COMMENT ON COLUMN login_events.risk_score IS 'Risk score from 0.0 (safe) to 1.0 (high risk)'; +COMMENT ON COLUMN login_events.risk_factors IS 'JSON array of detected risk factors'; \ No newline at end of file diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189de..4e25f457 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,60 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +-- Extend audit_logs for login events +ALTER TABLE audit_logs + ADD COLUMN IF NOT EXISTS ip_address VARCHAR(45); + +ALTER TABLE audit_logs + ADD COLUMN IF NOT EXISTS details TEXT; + +-- Login Events Table for anomaly detection +CREATE TABLE IF NOT EXISTS login_events ( + id SERIAL PRIMARY KEY, + user_id INT REFERENCES users(id) ON DELETE SET NULL, + event_type VARCHAR(30) NOT NULL, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + device_fingerprint VARCHAR(128), + location_country VARCHAR(100), + location_city VARCHAR(100), + risk_score NUMERIC(3,2) NOT NULL DEFAULT 0.0, + risk_factors TEXT, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_login_events_user_created + ON login_events(user_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_login_events_ip + ON login_events(ip_address, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_login_events_type + ON login_events(event_type, created_at DESC); + +-- Security Alerts Table +CREATE TABLE IF NOT EXISTS security_alerts ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + alert_type VARCHAR(50) NOT NULL, + severity VARCHAR(20) NOT NULL DEFAULT 'medium', + status VARCHAR(20) NOT NULL DEFAULT 'active', + title VARCHAR(200) NOT NULL, + description TEXT, + ip_address VARCHAR(45), + user_agent VARCHAR(500), + login_event_id INT REFERENCES login_events(id) ON DELETE SET NULL, + acknowledged_at TIMESTAMP, + acknowledged_by INT REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_security_alerts_user_status + ON security_alerts(user_id, status, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_security_alerts_type + ON security_alerts(alert_type, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_security_alerts_severity + ON security_alerts(severity, created_at DESC); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d44810..1219e01c 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -132,4 +132,94 @@ class AuditLog(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) + ip_address = db.Column(db.String(45), nullable=True) + details = db.Column(db.Text, nullable=True) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class LoginEventType(str, Enum): + LOGIN_SUCCESS = "login_success" + LOGIN_FAILED = "login_failed" + LOGOUT = "logout" + BRUTE_FORCE_BLOCKED = "brute_force_blocked" + SUSPICIOUS_LOGIN = "suspicious_login" + + +class LoginEvent(db.Model): + """Track all login-related events for anomaly detection.""" + __tablename__ = "login_events" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) + event_type = db.Column(db.String(30), nullable=False) + ip_address = db.Column(db.String(45), nullable=True) # IPv6 max length + user_agent = db.Column(db.String(500), nullable=True) + device_fingerprint = db.Column(db.String(128), nullable=True) + location_country = db.Column(db.String(100), nullable=True) + location_city = db.Column(db.String(100), nullable=True) + risk_score = db.Column(db.Numeric(3, 2), default=0.0, nullable=False) + risk_factors = db.Column(db.Text, nullable=True) # JSON array of risk factors + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + def to_dict(self): + import json + return { + "id": self.id, + "user_id": self.user_id, + "event_type": self.event_type, + "ip_address": self.ip_address, + "user_agent": self.user_agent, + "device_fingerprint": self.device_fingerprint, + "location_country": self.location_country, + "location_city": self.location_city, + "risk_score": float(self.risk_score) if self.risk_score else 0.0, + "risk_factors": json.loads(self.risk_factors) if self.risk_factors else [], + "created_at": self.created_at.isoformat() if self.created_at else None, + } + + +class AlertSeverity(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class AlertStatus(str, Enum): + ACTIVE = "active" + ACKNOWLEDGED = "acknowledged" + RESOLVED = "resolved" + + +class SecurityAlert(db.Model): + """Security alerts generated from suspicious activities.""" + __tablename__ = "security_alerts" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + alert_type = db.Column(db.String(50), nullable=False) + severity = db.Column(db.String(20), default=AlertSeverity.MEDIUM.value, nullable=False) + status = db.Column(db.String(20), default=AlertStatus.ACTIVE.value, nullable=False) + title = db.Column(db.String(200), nullable=False) + description = db.Column(db.Text, nullable=True) + ip_address = db.Column(db.String(45), nullable=True) + user_agent = db.Column(db.String(500), nullable=True) + login_event_id = db.Column(db.Integer, db.ForeignKey("login_events.id"), nullable=True) + acknowledged_at = db.Column(db.DateTime, nullable=True) + acknowledged_by = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + def to_dict(self): + return { + "id": self.id, + "user_id": self.user_id, + "alert_type": self.alert_type, + "severity": self.severity, + "status": self.status, + "title": self.title, + "description": self.description, + "ip_address": self.ip_address, + "user_agent": self.user_agent, + "login_event_id": self.login_event_id, + "acknowledged_at": self.acknowledged_at.isoformat() if self.acknowledged_at else None, + "acknowledged_by": self.acknowledged_by, + "created_at": self.created_at.isoformat() if self.created_at else None, + } diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..6bbd11e1 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .security import bp as security_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(security_bp, url_prefix="/security") diff --git a/packages/backend/app/routes/auth.py b/packages/backend/app/routes/auth.py index 05a39377..3c313afb 100644 --- a/packages/backend/app/routes/auth.py +++ b/packages/backend/app/routes/auth.py @@ -10,6 +10,12 @@ ) from ..extensions import db, redis_client from ..models import User +from ..services.login_anomaly import ( + process_login, + check_brute_force, + get_client_ip, + LoginEventType, +) import logging import time @@ -55,15 +61,65 @@ def login(): data = request.get_json() or {} email = data.get("email") password = data.get("password") + + # Get client info for anomaly detection + ip_address = get_client_ip() + + # Check brute force before processing + if check_brute_force(ip_address): + logger.warning("Login blocked due to brute force: email=%s, ip=%s", email, ip_address) + return jsonify(error="too many failed attempts, please try again later"), 429 + user = db.session.query(User).filter_by(email=email).first() - if not user or not check_password_hash(user.password_hash, password): + + # Determine if login is successful + is_successful = user is not None and check_password_hash(user.password_hash, password) + + # Process login with anomaly detection + login_event, security_alert, should_block = process_login( + user=user, + is_successful=is_successful, + ip_address=ip_address + ) + + if should_block: + logger.warning("Login blocked after brute force detection: email=%s", email) + return jsonify(error="account temporarily locked due to suspicious activity"), 429 + + if not is_successful: logger.warning("Login failed for email=%s", email) return jsonify(error="invalid credentials"), 401 + + # Successful login - create tokens access = create_access_token(identity=str(user.id)) refresh = create_refresh_token(identity=str(user.id)) _store_refresh_session(refresh, str(user.id)) + logger.info("Login success user_id=%s", user.id) - return jsonify(access_token=access, refresh_token=refresh) + + # Build response with security info + response_data = { + "access_token": access, + "refresh_token": refresh, + } + + # Include security alert if generated + if security_alert: + response_data["security_alert"] = { + "type": security_alert.alert_type, + "severity": security_alert.severity, + "message": security_alert.title, + } + logger.info("Security alert generated for user_id=%s: %s", user.id, security_alert.title) + + # Include risk info if elevated + if login_event and float(login_event.risk_score) > 0.3: + response_data["security_notice"] = { + "risk_score": float(login_event.risk_score), + "message": "Login from new device or location detected" + } + + return jsonify(response_data) @bp.get("/me") diff --git a/packages/backend/app/routes/security.py b/packages/backend/app/routes/security.py new file mode 100644 index 00000000..fd54120b --- /dev/null +++ b/packages/backend/app/routes/security.py @@ -0,0 +1,273 @@ +""" +Security API Routes + +Endpoints for login anomaly detection and security alerts: +- POST /security/record - Record a login event +- GET /security/history - Get login history +- GET /security/alerts - Get security alerts +- POST /security/alerts//acknowledge - Acknowledge an alert +- POST /security/alerts/acknowledge-all - Acknowledge all alerts +- GET /security/stats - Get security statistics +""" + +from flask import Blueprint, request, jsonify +from flask_jwt_extended import jwt_required, get_jwt_identity +import logging + +from ..services.login_anomaly import ( + record_login_event, + get_user_login_history, + get_user_alerts, + acknowledge_alert, + acknowledge_all_alerts, + get_security_stats, + process_login, + check_brute_force, + get_client_ip, + get_user_agent, + get_device_fingerprint, + calculate_risk_score, + LoginEventType, + AlertSeverity, +) +from ..extensions import db +from ..models import User, SecurityAlert + +bp = Blueprint("security", __name__) +logger = logging.getLogger("finmind.security") + + +@bp.post("/record") +@jwt_required() +def record_event(): + """ + Record a login event with anomaly detection. + + Request body: + - event_type: 'login_success' | 'login_failed' | 'logout' + - ip_address (optional): Client IP + - user_agent (optional): Client user agent + - device_fingerprint (optional): Device fingerprint + - location_country (optional): Country name + - location_city (optional): City name + """ + uid = int(get_jwt_identity()) + user = db.session.get(User, uid) + if not user: + return jsonify(error="user not found"), 404 + + data = request.get_json() or {} + event_type = data.get("event_type") + + if event_type not in [e.value for e in LoginEventType]: + return jsonify(error=f"invalid event_type. Must be one of: {[e.value for e in LoginEventType]}"), 400 + + ip_address = data.get("ip_address") or get_client_ip() + user_agent = data.get("user_agent") or get_user_agent() + device_fp = data.get("device_fingerprint") or get_device_fingerprint() + + # Calculate risk score + risk_score, risk_factors = calculate_risk_score( + user_id=uid, + ip_address=ip_address, + device_fingerprint=device_fp, + event_type=event_type + ) + + # Record the event + event = record_login_event( + user_id=uid, + event_type=event_type, + ip_address=ip_address, + user_agent=user_agent, + device_fingerprint=device_fp, + location_country=data.get("location_country"), + location_city=data.get("location_city"), + risk_score=risk_score, + risk_factors=risk_factors + ) + + return jsonify({ + "message": "event recorded", + "event": event.to_dict(), + "risk_score": risk_score, + "risk_factors": risk_factors + }), 201 + + +@bp.get("/history") +@jwt_required() +def login_history(): + """ + Get login history for the current user. + + Query params: + - event_type (optional): Filter by event type + - limit (optional): Max results (default 50, max 200) + - offset (optional): Pagination offset + """ + uid = int(get_jwt_identity()) + + event_type = request.args.get("event_type") + try: + limit = min(int(request.args.get("limit", 50)), 200) + offset = max(int(request.args.get("offset", 0)), 0) + except ValueError: + return jsonify(error="invalid limit or offset"), 400 + + history = get_user_login_history( + user_id=uid, + event_type=event_type, + limit=limit, + offset=offset + ) + + return jsonify({ + "events": history, + "count": len(history), + "limit": limit, + "offset": offset + }) + + +@bp.get("/alerts") +@jwt_required() +def alerts(): + """ + Get security alerts for the current user. + + Query params: + - status (optional): Filter by status (active, acknowledged, resolved) + - severity (optional): Filter by severity (low, medium, high, critical) + - limit (optional): Max results (default 50, max 200) + - offset (optional): Pagination offset + """ + uid = int(get_jwt_identity()) + + status = request.args.get("status") + severity = request.args.get("severity") + + try: + limit = min(int(request.args.get("limit", 50)), 200) + offset = max(int(request.args.get("offset", 0)), 0) + except ValueError: + return jsonify(error="invalid limit or offset"), 400 + + user_alerts = get_user_alerts( + user_id=uid, + status=status, + severity=severity, + limit=limit, + offset=offset + ) + + return jsonify({ + "alerts": user_alerts, + "count": len(user_alerts), + "limit": limit, + "offset": offset + }) + + +@bp.post("/alerts//acknowledge") +@jwt_required() +def acknowledge_alert_route(alert_id: int): + """ + Acknowledge a security alert. + """ + uid = int(get_jwt_identity()) + + alert = acknowledge_alert(alert_id=alert_id, user_id=uid) + + if not alert: + return jsonify(error="alert not found"), 404 + + return jsonify({ + "message": "alert acknowledged", + "alert": alert.to_dict() + }) + + +@bp.post("/alerts/acknowledge-all") +@jwt_required() +def acknowledge_all_route(): + """ + Acknowledge all active security alerts for the current user. + """ + uid = int(get_jwt_identity()) + + count = acknowledge_all_alerts(user_id=uid) + + return jsonify({ + "message": "all alerts acknowledged", + "count": count + }) + + +@bp.get("/stats") +@jwt_required() +def stats(): + """ + Get security statistics for the current user. + + Query params: + - days (optional): Number of days to include (default 30, max 365) + """ + uid = int(get_jwt_identity()) + + try: + days = min(int(request.args.get("days", 30)), 365) + except ValueError: + return jsonify(error="invalid days parameter"), 400 + + statistics = get_security_stats(user_id=uid, days=days) + + return jsonify(statistics) + + +@bp.get("/check-ip") +@jwt_required() +def check_ip(): + """ + Check if the current IP is blocked due to brute force. + """ + ip_address = get_client_ip() + is_blocked = check_brute_force(ip_address) + + return jsonify({ + "ip_address": ip_address, + "blocked": is_blocked + }) + + +@bp.post("/analyze") +@jwt_required() +def analyze_login(): + """ + Analyze a potential login and return risk assessment. + Does not record anything - just returns the analysis. + + Request body: + - ip_address (optional): IP to analyze + - device_fingerprint (optional): Device fingerprint + """ + uid = int(get_jwt_identity()) + + data = request.get_json() or {} + ip_address = data.get("ip_address") or get_client_ip() + device_fp = data.get("device_fingerprint") or get_device_fingerprint() + + risk_score, risk_factors = calculate_risk_score( + user_id=uid, + ip_address=ip_address, + device_fingerprint=device_fp, + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + + return jsonify({ + "ip_address": ip_address, + "device_fingerprint": device_fp, + "risk_score": risk_score, + "risk_factors": risk_factors, + "recommendation": "allow" if risk_score < 0.5 else "review" if risk_score < 0.7 else "block" + }) \ No newline at end of file diff --git a/packages/backend/app/services/login_anomaly.py b/packages/backend/app/services/login_anomaly.py new file mode 100644 index 00000000..353550a5 --- /dev/null +++ b/packages/backend/app/services/login_anomaly.py @@ -0,0 +1,637 @@ +""" +Login Anomaly Detection Service + +Provides real-time detection of suspicious login activities including: +- New IP address detection +- New device detection +- Unusual login time detection +- Brute force attack detection +- Geographic anomaly detection + +Risk scores are calculated based on multiple signals and security alerts +are automatically generated when thresholds are exceeded. +""" + +import json +import logging +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Any +from decimal import Decimal +from flask import request +from sqlalchemy import func, desc, and_, or_ +from ..extensions import db, redis_client +from ..models import LoginEvent, SecurityAlert, User, AuditLog, LoginEventType, AlertSeverity, AlertStatus + +logger = logging.getLogger("finmind.security") + + +# Risk score thresholds +RISK_THRESHOLD_LOW = 0.3 +RISK_THRESHOLD_MEDIUM = 0.5 +RISK_THRESHOLD_HIGH = 0.7 +RISK_THRESHOLD_CRITICAL = 0.9 + +# Brute force detection settings +BRUTE_FORCE_THRESHOLD = 5 # Max failed attempts before blocking +BRUTE_FORCE_WINDOW_MINUTES = 15 # Time window for counting attempts +BRUTE_FORCE_BLOCK_MINUTES = 30 # How long to block after threshold reached + +# Redis key prefixes +REDIS_PREFIX = "security:" +BRUTE_FORCE_KEY = REDIS_PREFIX + "brute_force:{ip}" +USER_IPS_KEY = REDIS_PREFIX + "user_ips:{user_id}" +USER_DEVICES_KEY = REDIS_PREFIX + "user_devices:{user_id}" + + +class AnomalyDetectionError(Exception): + """Base exception for anomaly detection errors.""" + pass + + +def get_client_ip() -> Optional[str]: + """Extract client IP address from request.""" + # Check for proxy headers first + if request: + forwarded = request.headers.get("X-Forwarded-For") + if forwarded: + return forwarded.split(",")[0].strip() + real_ip = request.headers.get("X-Real-IP") + if real_ip: + return real_ip + return request.remote_addr + return None + + +def get_user_agent() -> Optional[str]: + """Extract user agent from request.""" + if request: + return request.headers.get("User-Agent", "")[:500] + return None + + +def get_device_fingerprint() -> Optional[str]: + """ + Generate a device fingerprint from request headers. + This is a simple fingerprint based on user agent and other headers. + For production, consider using more sophisticated methods. + """ + if not request: + return None + + components = [] + ua = request.headers.get("User-Agent", "") + if ua: + components.append(ua[:100]) + + # Add more headers for fingerprinting + accept_lang = request.headers.get("Accept-Language", "") + if accept_lang: + components.append(accept_lang[:50]) + + # Simple hash-based fingerprint + import hashlib + fingerprint_data = "|".join(components) + return hashlib.sha256(fingerprint_data.encode()).hexdigest()[:64] + + +def calculate_risk_score( + user_id: int, + ip_address: Optional[str], + device_fingerprint: Optional[str], + event_type: str = LoginEventType.LOGIN_SUCCESS.value +) -> tuple[float, List[str]]: + """ + Calculate a risk score for a login event. + + Returns: + tuple: (risk_score, list of risk factors) + """ + risk_score = 0.0 + risk_factors = [] + + # Factor 1: New IP address (0.3 weight) + if ip_address: + if is_new_ip(user_id, ip_address): + risk_score += 0.3 + risk_factors.append("new_ip_address") + + # Factor 2: New device (0.25 weight) + if device_fingerprint: + if is_new_device(user_id, device_fingerprint): + risk_score += 0.25 + risk_factors.append("new_device") + + # Factor 3: Unusual login time (0.15 weight) + if is_unusual_time(): + risk_score += 0.15 + risk_factors.append("unusual_time") + + # Factor 4: Recent failed attempts (0.4 weight) + recent_failures = get_recent_failed_attempts(user_id) + if recent_failures >= 3: + risk_score += min(0.4, recent_failures * 0.1) + risk_factors.append(f"recent_failed_attempts:{recent_failures}") + + # Factor 5: Rapid successive attempts from different IPs (0.3 weight) + if ip_address and has_rapid_location_change(user_id, ip_address): + risk_score += 0.3 + risk_factors.append("rapid_location_change") + + # Cap at 1.0 + risk_score = min(1.0, risk_score) + + return risk_score, risk_factors + + +def is_new_ip(user_id: int, ip_address: str) -> bool: + """Check if the IP address is new for this user.""" + # Check Redis cache first + cache_key = USER_IPS_KEY.format(user_id=user_id) + try: + known_ips = redis_client.smembers(cache_key) + if ip_address in known_ips: + return False + except Exception as e: + logger.warning(f"Redis error checking known IPs: {e}") + + # Fall back to database + existing = db.session.query(LoginEvent).filter( + LoginEvent.user_id == user_id, + LoginEvent.ip_address == ip_address, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value + ).first() + + return existing is None + + +def is_new_device(user_id: int, device_fingerprint: str) -> bool: + """Check if the device is new for this user.""" + # Check Redis cache first + cache_key = USER_DEVICES_KEY.format(user_id=user_id) + try: + known_devices = redis_client.smembers(cache_key) + if device_fingerprint in known_devices: + return False + except Exception as e: + logger.warning(f"Redis error checking known devices: {e}") + + # Fall back to database + existing = db.session.query(LoginEvent).filter( + LoginEvent.user_id == user_id, + LoginEvent.device_fingerprint == device_fingerprint, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value + ).first() + + return existing is None + + +def is_unusual_time() -> bool: + """ + Check if current time is unusual for login (e.g., late night). + Unusual hours: 1 AM to 5 AM UTC + """ + current_hour = datetime.utcnow().hour + return 1 <= current_hour <= 5 + + +def get_recent_failed_attempts(user_id: int, minutes: int = 15) -> int: + """Get count of recent failed login attempts for a user.""" + since = datetime.utcnow() - timedelta(minutes=minutes) + count = db.session.query(func.count(LoginEvent.id)).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_FAILED.value, + LoginEvent.created_at >= since + ).scalar() + return count or 0 + + +def has_rapid_location_change(user_id: int, current_ip: str) -> bool: + """ + Check if there's a rapid location change (login from different IPs in short time). + This is a simplified check - for production, use GeoIP for actual distance calculation. + """ + recent_login = db.session.query(LoginEvent).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value, + LoginEvent.created_at >= datetime.utcnow() - timedelta(hours=1) + ).order_by(desc(LoginEvent.created_at)).first() + + if recent_login and recent_login.ip_address: + return recent_login.ip_address != current_ip + return False + + +def record_login_event( + user_id: Optional[int], + event_type: str, + ip_address: Optional[str] = None, + user_agent: Optional[str] = None, + device_fingerprint: Optional[str] = None, + location_country: Optional[str] = None, + location_city: Optional[str] = None, + risk_score: float = 0.0, + risk_factors: Optional[List[str]] = None +) -> LoginEvent: + """ + Record a login event and return the created event object. + """ + event = LoginEvent( + user_id=user_id, + event_type=event_type, + ip_address=ip_address or get_client_ip(), + user_agent=user_agent or get_user_agent(), + device_fingerprint=device_fingerprint or get_device_fingerprint(), + location_country=location_country, + location_city=location_city, + risk_score=Decimal(str(risk_score)), + risk_factors=json.dumps(risk_factors) if risk_factors else None + ) + db.session.add(event) + db.session.commit() + + # Update Redis cache for known IPs and devices + if user_id and event_type == LoginEventType.LOGIN_SUCCESS.value: + try: + if event.ip_address: + redis_key = USER_IPS_KEY.format(user_id=user_id) + redis_client.sadd(redis_key, event.ip_address) + redis_client.expire(redis_key, 86400 * 30) # 30 days + + if event.device_fingerprint: + redis_key = USER_DEVICES_KEY.format(user_id=user_id) + redis_client.sadd(redis_key, event.device_fingerprint) + redis_client.expire(redis_key, 86400 * 30) # 30 days + except Exception as e: + logger.warning(f"Redis error updating known IPs/devices: {e}") + + logger.info( + f"Recorded login event: user_id={user_id}, type={event_type}, " + f"ip={event.ip_address}, risk_score={risk_score}" + ) + + return event + + +def check_brute_force(ip_address: str) -> bool: + """ + Check if an IP is blocked due to brute force attempts. + Returns True if the IP should be blocked. + """ + if not ip_address: + return False + + redis_key = BRUTE_FORCE_KEY.format(ip=ip_address) + try: + blocked = redis_client.get(redis_key + ":blocked") + if blocked: + return True + + attempts = redis_client.get(redis_key) + if attempts and int(attempts) >= BRUTE_FORCE_THRESHOLD: + # Block the IP + redis_client.setex( + redis_key + ":blocked", + BRUTE_FORCE_BLOCK_MINUTES * 60, + "1" + ) + return True + except Exception as e: + logger.warning(f"Redis error checking brute force: {e}") + + return False + + +def record_failed_attempt(ip_address: str) -> int: + """ + Record a failed login attempt for brute force detection. + Returns the current count of failed attempts for this IP. + """ + if not ip_address: + return 0 + + redis_key = BRUTE_FORCE_KEY.format(ip=ip_address) + try: + count = redis_client.incr(redis_key) + redis_client.expire(redis_key, BRUTE_FORCE_WINDOW_MINUTES * 60) + return count + except Exception as e: + logger.warning(f"Redis error recording failed attempt: {e}") + return 0 + + +def clear_brute_force_block(ip_address: str) -> None: + """Clear brute force block for an IP (e.g., after successful login).""" + if not ip_address: + return + + redis_key = BRUTE_FORCE_KEY.format(ip=ip_address) + try: + redis_client.delete(redis_key) + redis_client.delete(redis_key + ":blocked") + except Exception as e: + logger.warning(f"Redis error clearing brute force block: {e}") + + +def create_security_alert( + user_id: int, + alert_type: str, + title: str, + description: Optional[str] = None, + severity: str = AlertSeverity.MEDIUM.value, + ip_address: Optional[str] = None, + user_agent: Optional[str] = None, + login_event_id: Optional[int] = None +) -> SecurityAlert: + """ + Create a security alert for a user. + """ + alert = SecurityAlert( + user_id=user_id, + alert_type=alert_type, + title=title, + description=description, + severity=severity, + ip_address=ip_address, + user_agent=user_agent, + login_event_id=login_event_id + ) + db.session.add(alert) + db.session.commit() + + logger.warning( + f"Created security alert: user_id={user_id}, type={alert_type}, " + f"severity={severity}, title={title}" + ) + + return alert + + +def get_user_login_history( + user_id: int, + event_type: Optional[str] = None, + limit: int = 50, + offset: int = 0 +) -> List[Dict[str, Any]]: + """ + Get login history for a user. + """ + query = db.session.query(LoginEvent).filter( + LoginEvent.user_id == user_id + ) + + if event_type: + query = query.filter(LoginEvent.event_type == event_type) + + events = query.order_by(desc(LoginEvent.created_at)).offset(offset).limit(limit).all() + return [e.to_dict() for e in events] + + +def get_user_alerts( + user_id: int, + status: Optional[str] = None, + severity: Optional[str] = None, + limit: int = 50, + offset: int = 0 +) -> List[Dict[str, Any]]: + """ + Get security alerts for a user. + """ + query = db.session.query(SecurityAlert).filter( + SecurityAlert.user_id == user_id + ) + + if status: + query = query.filter(SecurityAlert.status == status) + + if severity: + query = query.filter(SecurityAlert.severity == severity) + + alerts = query.order_by(desc(SecurityAlert.created_at)).offset(offset).limit(limit).all() + return [a.to_dict() for a in alerts] + + +def acknowledge_alert(alert_id: int, user_id: int) -> Optional[SecurityAlert]: + """ + Acknowledge a security alert. + """ + alert = db.session.query(SecurityAlert).filter( + SecurityAlert.id == alert_id, + SecurityAlert.user_id == user_id + ).first() + + if not alert: + return None + + alert.status = AlertStatus.ACKNOWLEDGED.value + alert.acknowledged_at = datetime.utcnow() + alert.acknowledged_by = user_id + db.session.commit() + + logger.info(f"Acknowledged alert: id={alert_id}, user_id={user_id}") + return alert + + +def acknowledge_all_alerts(user_id: int) -> int: + """ + Acknowledge all active alerts for a user. + Returns the count of acknowledged alerts. + """ + result = db.session.query(SecurityAlert).filter( + SecurityAlert.user_id == user_id, + SecurityAlert.status == AlertStatus.ACTIVE.value + ).update({ + "status": AlertStatus.ACKNOWLEDGED.value, + "acknowledged_at": datetime.utcnow(), + "acknowledged_by": user_id + }) + db.session.commit() + + logger.info(f"Acknowledged all alerts: user_id={user_id}, count={result}") + return result + + +def get_security_stats(user_id: int, days: int = 30) -> Dict[str, Any]: + """ + Get security statistics for a user. + """ + since = datetime.utcnow() - timedelta(days=days) + + # Login counts + total_logins = db.session.query(func.count(LoginEvent.id)).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value, + LoginEvent.created_at >= since + ).scalar() or 0 + + failed_logins = db.session.query(func.count(LoginEvent.id)).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_FAILED.value, + LoginEvent.created_at >= since + ).scalar() or 0 + + # Unique IPs + unique_ips = db.session.query(func.count(func.distinct(LoginEvent.ip_address))).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value, + LoginEvent.created_at >= since + ).scalar() or 0 + + # Unique devices + unique_devices = db.session.query(func.count(func.distinct(LoginEvent.device_fingerprint))).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value, + LoginEvent.created_at >= since + ).scalar() or 0 + + # Alert counts + active_alerts = db.session.query(func.count(SecurityAlert.id)).filter( + SecurityAlert.user_id == user_id, + SecurityAlert.status == AlertStatus.ACTIVE.value + ).scalar() or 0 + + total_alerts = db.session.query(func.count(SecurityAlert.id)).filter( + SecurityAlert.user_id == user_id, + SecurityAlert.created_at >= since + ).scalar() or 0 + + # Risk distribution + high_risk_logins = db.session.query(func.count(LoginEvent.id)).filter( + LoginEvent.user_id == user_id, + LoginEvent.event_type == LoginEventType.LOGIN_SUCCESS.value, + LoginEvent.risk_score >= RISK_THRESHOLD_HIGH, + LoginEvent.created_at >= since + ).scalar() or 0 + + return { + "period_days": days, + "total_logins": total_logins, + "failed_logins": failed_logins, + "unique_ips": unique_ips, + "unique_devices": unique_devices, + "active_alerts": active_alerts, + "total_alerts": total_alerts, + "high_risk_logins": high_risk_logins, + "login_success_rate": round(total_logins / max(1, total_logins + failed_logins) * 100, 1) + } + + +def process_login( + user: User, + is_successful: bool, + ip_address: Optional[str] = None, + user_agent: Optional[str] = None +) -> tuple[Optional[LoginEvent], Optional[SecurityAlert], bool]: + """ + Process a login attempt - detect anomalies and generate alerts. + + Args: + user: The user attempting to login + is_successful: Whether the login was successful + ip_address: Client IP address + user_agent: Client user agent + + Returns: + tuple: (login_event, security_alert, should_block) + """ + ip = ip_address or get_client_ip() + ua = user_agent or get_user_agent() + device_fp = get_device_fingerprint() + + # Check brute force + if check_brute_force(ip): + logger.warning(f"Brute force block triggered for IP: {ip}") + event = record_login_event( + user_id=user.id if user else None, + event_type=LoginEventType.BRUTE_FORCE_BLOCKED.value, + ip_address=ip, + user_agent=ua + ) + return event, None, True + + if not is_successful: + # Record failed attempt + count = record_failed_attempt(ip) + + event = record_login_event( + user_id=user.id if user else None, + event_type=LoginEventType.LOGIN_FAILED.value, + ip_address=ip, + user_agent=ua, + device_fingerprint=device_fp + ) + + # Create alert if threshold reached + alert = None + if count >= BRUTE_FORCE_THRESHOLD: + alert = create_security_alert( + user_id=user.id, + alert_type="brute_force_detected", + title="Multiple Failed Login Attempts Detected", + description=f"{count} failed login attempts from IP {ip}", + severity=AlertSeverity.HIGH.value, + ip_address=ip, + user_agent=ua, + login_event_id=event.id + ) + + return event, alert, False + + # Successful login + # Calculate risk score + risk_score, risk_factors = calculate_risk_score( + user_id=user.id, + ip_address=ip, + device_fingerprint=device_fp, + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + + # Clear brute force counter on successful login + clear_brute_force_block(ip) + + # Record login event + event = record_login_event( + user_id=user.id, + event_type=LoginEventType.LOGIN_SUCCESS.value, + ip_address=ip, + user_agent=ua, + device_fingerprint=device_fp, + risk_score=risk_score, + risk_factors=risk_factors + ) + + # Create alert if risk is high enough + alert = None + if risk_score >= RISK_THRESHOLD_MEDIUM: + severity = AlertSeverity.HIGH.value if risk_score >= RISK_THRESHOLD_HIGH else AlertSeverity.MEDIUM.value + + alert = create_security_alert( + user_id=user.id, + alert_type="suspicious_login", + title="Suspicious Login Detected", + description=f"Risk factors: {', '.join(risk_factors)}. Risk score: {risk_score:.2f}", + severity=severity, + ip_address=ip, + user_agent=ua, + login_event_id=event.id + ) + + # Log to audit (with backward compatibility for existing databases) + try: + audit = AuditLog( + user_id=user.id, + action=f"login_success (risk: {risk_score:.2f})", + ip_address=ip, + details=json.dumps({"risk_factors": risk_factors}) if risk_factors else None + ) + db.session.add(audit) + db.session.commit() + except TypeError: + # Fallback for databases without new columns + audit = AuditLog( + user_id=user.id, + action=f"login_success (risk: {risk_score:.2f})" + ) + db.session.add(audit) + db.session.commit() + + return event, alert, False \ No newline at end of file diff --git a/packages/backend/tests/test_security.py b/packages/backend/tests/test_security.py new file mode 100644 index 00000000..b31d26a5 --- /dev/null +++ b/packages/backend/tests/test_security.py @@ -0,0 +1,673 @@ +""" +Tests for Login Anomaly Detection and Security Alerts + +Covers: +- Login event recording +- Risk score calculation +- Brute force detection +- Security alert creation and management +- API endpoints +""" + +import json +import pytest +from datetime import datetime, timedelta +from unittest.mock import patch, MagicMock +from decimal import Decimal + +from app import create_app +from app.config import Settings +from app.extensions import db +from app.models import ( + User, LoginEvent, SecurityAlert, AuditLog, + LoginEventType, AlertSeverity, AlertStatus +) +from app.services import login_anomaly as anomaly_service + + +class TestSettings(Settings): + database_url: str = "sqlite+pysqlite:///:memory:" + redis_url: str = "redis://localhost:6379/15" + jwt_secret: str = "test-secret-with-32-plus-chars-1234567890" + + +@pytest.fixture +def app(): + settings = TestSettings() + app = create_app(settings) + app.config.update(TESTING=True) + + with app.app_context(): + db.create_all() + yield app + db.session.remove() + db.drop_all() + + +@pytest.fixture +def client(app): + return app.test_client() + + +@pytest.fixture +def auth_header(client): + """Register and login a user, return auth header.""" + email = "security@test.com" + password = "password123" + r = client.post("/auth/register", json={"email": email, "password": password}) + assert r.status_code in (201, 409) + r = client.post("/auth/login", json={"email": email, "password": password}) + assert r.status_code == 200 + access = r.get_json()["access_token"] + return {"Authorization": f"Bearer {access}"} + + +@pytest.fixture +def test_user(app): + """Create a test user.""" + with app.app_context(): + user = User( + email="testuser@example.com", + password_hash="hashed_password" + ) + db.session.add(user) + db.session.commit() + return user.id + + +class TestRiskScoreCalculation: + """Tests for risk score calculation.""" + + def test_new_ip_adds_risk(self, app, test_user): + """New IP should add risk score.""" + with app.app_context(): + score, factors = anomaly_service.calculate_risk_score( + user_id=test_user, + ip_address="192.168.1.100", + device_fingerprint="device123", + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + assert "new_ip_address" in factors + assert score >= 0.3 + + def test_new_device_adds_risk(self, app, test_user): + """New device should add risk.""" + with app.app_context(): + # First, record a login with the same IP + event = LoginEvent( + user_id=test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value, + ip_address="192.168.1.100", + device_fingerprint="old_device" + ) + db.session.add(event) + db.session.commit() + + # Now check with new device but same IP + score, factors = anomaly_service.calculate_risk_score( + user_id=test_user, + ip_address="192.168.1.100", + device_fingerprint="new_device_456", + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + assert "new_device" in factors + assert score >= 0.25 + + def test_known_ip_reduces_risk(self, app, test_user): + """Known IP should not add new_ip risk.""" + with app.app_context(): + # Record a previous login with this IP + event = LoginEvent( + user_id=test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value, + ip_address="192.168.1.100", + device_fingerprint="device123" + ) + db.session.add(event) + db.session.commit() + + # Check with same IP + score, factors = anomaly_service.calculate_risk_score( + user_id=test_user, + ip_address="192.168.1.100", + device_fingerprint="device123", + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + assert "new_ip_address" not in factors + assert "new_device" not in factors + assert score < 0.3 + + def test_risk_score_capped_at_one(self, app, test_user): + """Risk score should be capped at 1.0.""" + with app.app_context(): + # Add multiple failed attempts + for _ in range(10): + event = LoginEvent( + user_id=test_user, + event_type=LoginEventType.LOGIN_FAILED.value, + ip_address="192.168.1.100" + ) + db.session.add(event) + db.session.commit() + + score, factors = anomaly_service.calculate_risk_score( + user_id=test_user, + ip_address="10.0.0.1", # New IP + device_fingerprint="new_device", + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + assert score <= 1.0 + + +class TestLoginEventRecording: + """Tests for login event recording.""" + + def test_record_successful_login(self, app, test_user): + """Record a successful login event.""" + with app.app_context(): + event = anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value, + ip_address="192.168.1.1", + user_agent="Mozilla/5.0", + device_fingerprint="device123", + risk_score=0.1, + risk_factors=["test"] + ) + + assert event.id is not None + assert event.event_type == LoginEventType.LOGIN_SUCCESS.value + assert event.ip_address == "192.168.1.1" + assert event.user_agent == "Mozilla/5.0" + + def test_record_failed_login(self, app, test_user): + """Record a failed login event.""" + with app.app_context(): + event = anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_FAILED.value, + ip_address="192.168.1.1" + ) + + assert event.id is not None + assert event.event_type == LoginEventType.LOGIN_FAILED.value + + def test_record_brute_force_blocked(self, app, test_user): + """Record a brute force blocked event.""" + with app.app_context(): + event = anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.BRUTE_FORCE_BLOCKED.value, + ip_address="192.168.1.1" + ) + + assert event.id is not None + assert event.event_type == LoginEventType.BRUTE_FORCE_BLOCKED.value + + +class TestBruteForceDetection: + """Tests for brute force detection.""" + + def test_brute_force_not_triggered_initially(self, app): + """IP should not be blocked initially.""" + with app.app_context(): + with patch.object(anomaly_service.redis_client, 'get', return_value=None): + with patch.object(anomaly_service.redis_client, 'get', return_value=None): + result = anomaly_service.check_brute_force("192.168.1.1") + assert result is False + + def test_brute_force_after_threshold(self, app): + """IP should be blocked after threshold.""" + with app.app_context(): + ip = "192.168.1.100" + mock_redis = MagicMock() + mock_redis.get.return_value = None + mock_redis.get.return_value = None + + with patch.object(anomaly_service, 'redis_client', mock_redis): + # Simulate threshold reached + mock_redis.get.return_value = str(anomaly_service.BRUTE_FORCE_THRESHOLD) + result = anomaly_service.check_brute_force(ip) + # Should check the blocked flag + mock_redis.get.assert_called() + + def test_clear_brute_force_block(self, app): + """Clear brute force block should work.""" + with app.app_context(): + ip = "192.168.1.100" + mock_redis = MagicMock() + + with patch.object(anomaly_service, 'redis_client', mock_redis): + anomaly_service.clear_brute_force_block(ip) + # Should delete the keys + mock_redis.delete.assert_called() + + +class TestSecurityAlerts: + """Tests for security alert management.""" + + def test_create_security_alert(self, app, test_user): + """Create a security alert.""" + with app.app_context(): + alert = anomaly_service.create_security_alert( + user_id=test_user, + alert_type="suspicious_login", + title="Suspicious Login Detected", + description="Login from new location", + severity=AlertSeverity.MEDIUM.value, + ip_address="192.168.1.1" + ) + + assert alert.id is not None + assert alert.alert_type == "suspicious_login" + assert alert.severity == AlertSeverity.MEDIUM.value + assert alert.status == AlertStatus.ACTIVE.value + + def test_acknowledge_alert(self, app, test_user): + """Acknowledge a security alert.""" + with app.app_context(): + alert = anomaly_service.create_security_alert( + user_id=test_user, + alert_type="test_alert", + title="Test Alert" + ) + + acknowledged = anomaly_service.acknowledge_alert(alert.id, test_user) + + assert acknowledged is not None + assert acknowledged.status == AlertStatus.ACKNOWLEDGED.value + assert acknowledged.acknowledged_at is not None + + def test_acknowledge_nonexistent_alert(self, app, test_user): + """Acknowledge non-existent alert should return None.""" + with app.app_context(): + result = anomaly_service.acknowledge_alert(9999, test_user) + assert result is None + + def test_acknowledge_all_alerts(self, app, test_user): + """Acknowledge all alerts for a user.""" + with app.app_context(): + # Create multiple alerts + for i in range(3): + anomaly_service.create_security_alert( + user_id=test_user, + alert_type=f"alert_{i}", + title=f"Alert {i}" + ) + + count = anomaly_service.acknowledge_all_alerts(test_user) + + assert count == 3 + + # Verify all are acknowledged + alerts = anomaly_service.get_user_alerts(test_user, status=AlertStatus.ACTIVE.value) + assert len(alerts) == 0 + + +class TestLoginHistory: + """Tests for login history retrieval.""" + + def test_get_login_history(self, app, test_user): + """Get login history for a user.""" + with app.app_context(): + # Create some login events + for i in range(5): + anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value, + ip_address=f"192.168.1.{i}" + ) + + history = anomaly_service.get_user_login_history(test_user) + + assert len(history) == 5 + + def test_get_history_with_filter(self, app, test_user): + """Get login history filtered by event type.""" + with app.app_context(): + anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_FAILED.value + ) + + history = anomaly_service.get_user_login_history( + test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value + ) + + assert len(history) == 1 + assert history[0]["event_type"] == LoginEventType.LOGIN_SUCCESS.value + + +class TestSecurityStats: + """Tests for security statistics.""" + + def test_get_security_stats(self, app, test_user): + """Get security statistics for a user.""" + with app.app_context(): + # Create some events + anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_SUCCESS.value, + ip_address="192.168.1.1", + device_fingerprint="device1", + risk_score=0.8 + ) + anomaly_service.record_login_event( + user_id=test_user, + event_type=LoginEventType.LOGIN_FAILED.value + ) + + stats = anomaly_service.get_security_stats(test_user) + + assert stats["total_logins"] == 1 + assert stats["failed_logins"] == 1 + assert stats["unique_ips"] == 1 + assert stats["unique_devices"] == 1 + assert stats["high_risk_logins"] == 1 + + +class TestProcessLogin: + """Tests for the process_login function.""" + + def test_process_successful_login(self, app, test_user): + """Process a successful login.""" + with app.app_context(): + user = db.session.get(User, test_user) + + # Mock Redis to avoid connection issues + mock_redis = MagicMock() + mock_redis.get.return_value = None + mock_redis.smembers.return_value = set() + mock_redis.sadd.return_value = 1 + mock_redis.expire.return_value = True + + with patch.object(anomaly_service, 'redis_client', mock_redis): + event, alert, should_block = anomaly_service.process_login( + user=user, + is_successful=True, + ip_address="192.168.1.1" + ) + + assert event is not None + assert event.event_type == LoginEventType.LOGIN_SUCCESS.value + assert should_block is False + + def test_process_failed_login(self, app, test_user): + """Process a failed login.""" + with app.app_context(): + user = db.session.get(User, test_user) + + # Mock Redis + mock_redis = MagicMock() + mock_redis.get.return_value = None + mock_redis.incr.return_value = 1 + mock_redis.expire.return_value = True + + with patch.object(anomaly_service, 'redis_client', mock_redis): + event, alert, should_block = anomaly_service.process_login( + user=user, + is_successful=False, + ip_address="192.168.1.1" + ) + + assert event is not None + assert event.event_type == LoginEventType.LOGIN_FAILED.value + assert should_block is False + + def test_process_login_creates_alert_on_high_risk(self, app, test_user): + """High risk login should create an alert.""" + with app.app_context(): + user = db.session.get(User, test_user) + + # Mock Redis + mock_redis = MagicMock() + mock_redis.get.return_value = None + mock_redis.smembers.return_value = set() + mock_redis.sadd.return_value = 1 + mock_redis.expire.return_value = True + + with patch.object(anomaly_service, 'redis_client', mock_redis): + event, alert, should_block = anomaly_service.process_login( + user=user, + is_successful=True, + ip_address="10.0.0.1", # New IP to trigger risk + user_agent="New Agent" + ) + + # Should have high risk due to new IP and device + assert float(event.risk_score) >= 0.3 + + +class TestSecurityAPIEndpoints: + """Tests for security API endpoints.""" + + def test_record_event_endpoint(self, client, auth_header): + """Test POST /security/record endpoint.""" + r = client.post( + "/security/record", + json={ + "event_type": "login_success", + "ip_address": "192.168.1.1" + }, + headers=auth_header + ) + + assert r.status_code == 201 + data = r.get_json() + assert "event" in data + assert data["event"]["event_type"] == "login_success" + + def test_record_event_invalid_type(self, client, auth_header): + """Test POST /security/record with invalid event type.""" + r = client.post( + "/security/record", + json={"event_type": "invalid_type"}, + headers=auth_header + ) + + assert r.status_code == 400 + + def test_login_history_endpoint(self, client, auth_header): + """Test GET /security/history endpoint.""" + # Record an event first + client.post( + "/security/record", + json={"event_type": "login_success"}, + headers=auth_header + ) + + r = client.get("/security/history", headers=auth_header) + + assert r.status_code == 200 + data = r.get_json() + assert "events" in data + assert len(data["events"]) >= 1 + + def test_alerts_endpoint(self, client, auth_header): + """Test GET /security/alerts endpoint.""" + r = client.get("/security/alerts", headers=auth_header) + + assert r.status_code == 200 + data = r.get_json() + assert "alerts" in data + + def test_stats_endpoint(self, client, auth_header): + """Test GET /security/stats endpoint.""" + r = client.get("/security/stats", headers=auth_header) + + assert r.status_code == 200 + data = r.get_json() + assert "total_logins" in data + assert "failed_logins" in data + assert "unique_ips" in data + + def test_analyze_endpoint(self, client, auth_header): + """Test POST /security/analyze endpoint.""" + r = client.post( + "/security/analyze", + json={"ip_address": "192.168.1.1"}, + headers=auth_header + ) + + assert r.status_code == 200 + data = r.get_json() + assert "risk_score" in data + assert "risk_factors" in data + assert "recommendation" in data + + def test_check_ip_endpoint(self, client, auth_header): + """Test GET /security/check-ip endpoint.""" + r = client.get("/security/check-ip", headers=auth_header) + + assert r.status_code == 200 + data = r.get_json() + assert "ip_address" in data + assert "blocked" in data + + +class TestAuthLoginIntegration: + """Tests for auth login integration with security.""" + + def test_login_returns_security_info(self, client): + """Login should include security information.""" + email = "integration@test.com" + password = "password123" + + # Register + r = client.post("/auth/register", json={"email": email, "password": password}) + assert r.status_code == 201 + + # Login + r = client.post("/auth/login", json={"email": email, "password": password}) + assert r.status_code == 200 + + data = r.get_json() + assert "access_token" in data + assert "refresh_token" in data + + def test_login_failed_creates_event(self, client): + """Failed login should create a login event.""" + r = client.post("/auth/login", json={ + "email": "nonexistent@test.com", + "password": "wrongpassword" + }) + + assert r.status_code == 401 + + def test_login_with_valid_credentials(self, client): + """Successful login should work.""" + email = "valid@test.com" + password = "password123" + + client.post("/auth/register", json={"email": email, "password": password}) + + r = client.post("/auth/login", json={"email": email, "password": password}) + + assert r.status_code == 200 + data = r.get_json() + assert "access_token" in data + + +class TestUnusualTimeDetection: + """Tests for unusual time detection.""" + + def test_is_unusual_time_night(self): + """Test unusual time detection for night hours.""" + # Mock datetime to return a specific hour + with patch('app.services.login_anomaly.datetime') as mock_dt: + # Test 3 AM UTC (unusual) + mock_dt.utcnow.return_value = MagicMock(hour=3) + assert anomaly_service.is_unusual_time() is True + + # Test 2 AM UTC (unusual) + mock_dt.utcnow.return_value = MagicMock(hour=2) + assert anomaly_service.is_unusual_time() is True + + def test_is_unusual_time_day(self): + """Test unusual time detection for normal hours.""" + with patch('app.services.login_anomaly.datetime') as mock_dt: + # Test 10 AM UTC (normal) + mock_dt.utcnow.return_value = MagicMock(hour=10) + assert anomaly_service.is_unusual_time() is False + + # Test 8 PM UTC (normal) + mock_dt.utcnow.return_value = MagicMock(hour=20) + assert anomaly_service.is_unusual_time() is False + + +class TestPagination: + """Tests for pagination in endpoints.""" + + def test_history_pagination(self, client, auth_header): + """Test pagination for login history.""" + # Record multiple events + for i in range(15): + client.post( + "/security/record", + json={"event_type": "login_success"}, + headers=auth_header + ) + + # Get first page + r = client.get("/security/history?limit=10&offset=0", headers=auth_header) + assert r.status_code == 200 + data = r.get_json() + assert len(data["events"]) == 10 + + # Get second page + r = client.get("/security/history?limit=10&offset=10", headers=auth_header) + assert r.status_code == 200 + data = r.get_json() + assert len(data["events"]) >= 5 + + def test_alerts_pagination(self, client, auth_header): + """Test pagination for alerts.""" + r = client.get("/security/alerts?limit=10&offset=0", headers=auth_header) + assert r.status_code == 200 + + +class TestSecurity: + """Security-related tests.""" + + def test_unauthorized_access(self, client): + """Unauthorized requests should be rejected.""" + r = client.get("/security/history") + assert r.status_code == 401 + + r = client.get("/security/alerts") + assert r.status_code == 401 + + r = client.get("/security/stats") + assert r.status_code == 401 + + def test_user_cannot_access_other_users_alerts(self, client): + """User should not be able to access other users' alerts.""" + # Create user 1 + email1 = "user1@test.com" + password1 = "password123" + client.post("/auth/register", json={"email": email1, "password": password1}) + r1 = client.post("/auth/login", json={"email": email1, "password": password1}) + auth1 = {"Authorization": f"Bearer {r1.get_json()['access_token']}"} + + # Create user 2 + email2 = "user2@test.com" + password2 = "password123" + client.post("/auth/register", json={"email": email2, "password": password2}) + r2 = client.post("/auth/login", json={"email": email2, "password": password2}) + auth2 = {"Authorization": f"Bearer {r2.get_json()['access_token']}"} + + # User 1 should not see User 2's data + r = client.get("/security/alerts", headers=auth1) + alerts1 = r.get_json()["alerts"] + + r = client.get("/security/alerts", headers=auth2) + alerts2 = r.get_json()["alerts"] + + # Each user should only see their own alerts + assert len(alerts1) == 0 + assert len(alerts2) == 0 \ No newline at end of file