diff --git a/forum/api/bans.py b/forum/api/bans.py new file mode 100644 index 00000000..7a039919 --- /dev/null +++ b/forum/api/bans.py @@ -0,0 +1,395 @@ +""" +API functions for managing discussion bans. +""" + +# mypy: ignore-errors + +import logging +from typing import Any, Dict, List, Optional + +from django.contrib.auth import get_user_model +from django.db import models, transaction +from django.utils import timezone +from opaque_keys.edx.keys import CourseKey + +from forum.backends.mysql.models import ( + DiscussionBan, + DiscussionBanException, + ModerationAuditLog, +) + +User = get_user_model() +log = logging.getLogger(__name__) + + +def ban_user( + user_id: str, + banned_by_id: str, + course_id: Optional[str] = None, + org_key: Optional[str] = None, + scope: str = "course", + reason: str = "", +) -> Dict[str, Any]: + """ + Ban a user from discussions. + + Args: + user_id: ID of user to ban + banned_by_id: ID of user performing the ban + course_id: Course ID for course-level bans + org_key: Organization key for org-level bans + scope: 'course' or 'organization' + reason: Reason for the ban + + Returns: + dict: Ban record data including id, user info, scope, and timestamps + + Raises: + ValueError: If invalid parameters provided + User.DoesNotExist: If user or banned_by user not found + """ + if scope not in ["course", "organization"]: + raise ValueError(f"Invalid scope: {scope}. Must be 'course' or 'organization'") + + if scope == "course" and not course_id: + raise ValueError("course_id is required for course-level bans") + + if scope == "organization" and not org_key: + raise ValueError("org_key is required for organization-level bans") + + # Get user objects + banned_user = User.objects.get(id=user_id) + moderator = User.objects.get(id=banned_by_id) + + with transaction.atomic(): + # Determine lookup kwargs based on scope + course_key = None # Initialize for audit log + if scope == "organization": + lookup_kwargs = { + "user": banned_user, + "org_key": org_key, + "scope": "organization", + } + ban_kwargs = { + **lookup_kwargs, + } + else: + course_key = CourseKey.from_string(course_id) + # Extract org from course_id for denormalization + course_org = str(course_key.org) if hasattr(course_key, "org") else org_key + lookup_kwargs = { + "user": banned_user, + "course_id": course_key, + "scope": "course", + } + ban_kwargs = { + **lookup_kwargs, + "org_key": course_org, # Denormalized field for easier querying + } + + # Create or update ban + ban, created = DiscussionBan.objects.get_or_create( + **lookup_kwargs, + defaults={ + **ban_kwargs, + "banned_by": moderator, + "reason": reason or "No reason provided", + "is_active": True, + "banned_at": timezone.now(), + }, + ) + + if not created and not ban.is_active: + # Reactivate previously deactivated ban + ban.is_active = True + ban.banned_by = moderator + ban.reason = reason or ban.reason + ban.banned_at = timezone.now() + ban.unbanned_at = None + ban.unbanned_by = None + ban.save() + + # Create audit log + ModerationAuditLog.objects.create( + action_type=ModerationAuditLog.ACTION_BAN, + source=ModerationAuditLog.SOURCE_HUMAN, + target_user=banned_user, + moderator=moderator, + course_id=str(course_key) if course_key else None, + scope=scope, + reason=reason, + metadata={ + "ban_id": ban.id, + "created": created, + }, + # AI moderation fields (required by schema, not applicable for ban actions) + body="", + original_author=banned_user, + classification="", + classifier_output={}, + actions_taken=[], + confidence_score=None, + reasoning="", + moderator_override=False, + ) + + log.info( + "User banned: user_id=%s, scope=%s, course_id=%s, org_key=%s, banned_by=%s", + user_id, + scope, + course_id, + org_key, + banned_by_id, + ) + + return _serialize_ban(ban) + + +def unban_user( + ban_id: int, + unbanned_by_id: str, + course_id: Optional[str] = None, + reason: str = "", +) -> Dict[str, Any]: + """ + Unban a user from discussions. + + For course-level bans: Deactivates the ban completely. + For org-level bans with course_id: Creates an exception for that course. + For org-level bans without course_id: Deactivates the entire org ban. + + Args: + ban_id: ID of the ban to unban + unbanned_by_id: ID of user performing the unban + course_id: Optional course ID for org-level ban exceptions + reason: Reason for unbanning + + Returns: + dict: Response with status, message, and ban/exception data + + Raises: + DiscussionBan.DoesNotExist: If ban not found + User.DoesNotExist: If unbanned_by user not found + """ + try: + ban = DiscussionBan.objects.get(id=ban_id, is_active=True) + except DiscussionBan.DoesNotExist as exc: + raise ValueError(f"Active ban with id {ban_id} not found") from exc + + moderator = User.objects.get(id=unbanned_by_id) + exception_created = False + exception_data = None + + with transaction.atomic(): + # For org-level bans with course_id: create exception instead of full unban + if ban.scope == "organization" and course_id: + course_key = CourseKey.from_string(course_id) + + # Create exception for this specific course + exception, created = DiscussionBanException.objects.get_or_create( + ban=ban, + course_id=course_key, + defaults={ + "unbanned_by": moderator, + "reason": reason or "Course-level exception to organization ban", + }, + ) + + exception_created = True + exception_data = { + "id": exception.id, + "ban_id": ban.id, + "course_id": str(course_id), + "unbanned_by": moderator.username, + "reason": exception.reason, + "created_at": ( + exception.created.isoformat() + if hasattr(exception, "created") + else None + ), + } + + message = ( + f"User {ban.user.username} unbanned from {course_id} " + f"(org-level ban still active for other courses)" + ) + + # Audit log for exception + ModerationAuditLog.objects.create( + action_type=ModerationAuditLog.ACTION_BAN_EXCEPTION, + source=ModerationAuditLog.SOURCE_HUMAN, + target_user=ban.user, + moderator=moderator, + course_id=str(course_key), + scope="organization", + reason=f"Exception to org ban: {reason}", + metadata={ + "ban_id": ban.id, + "exception_id": exception.id, + "exception_created": created, + "org_key": ban.org_key, + }, + # AI moderation fields (required by schema, not applicable for ban actions) + body="", + original_author=ban.user, + classification="", + classifier_output={}, + actions_taken=[], + confidence_score=None, + reasoning="", + moderator_override=False, + ) + else: + # Full unban (course-level or complete org-level unban) + ban.is_active = False + ban.unbanned_at = timezone.now() + ban.unbanned_by = moderator + ban.save() + + message = f"User {ban.user.username} unbanned successfully" + + # Audit log + ModerationAuditLog.objects.create( + action_type=ModerationAuditLog.ACTION_UNBAN, + source=ModerationAuditLog.SOURCE_HUMAN, + target_user=ban.user, + moderator=moderator, + course_id=str(ban.course_id) if ban.course_id else None, + scope=ban.scope, + reason=f"Unban: {reason}", + metadata={ + "ban_id": ban.id, + }, + # AI moderation fields (required by schema, not applicable for ban actions) + body="", + original_author=ban.user, + classification="", + classifier_output={}, + actions_taken=[], + confidence_score=None, + reasoning="", + moderator_override=False, + ) + + log.info( + "User unbanned: ban_id=%s, user_id=%s, exception_created=%s, unbanned_by=%s", + ban_id, + ban.user.id, + exception_created, + unbanned_by_id, + ) + + return { + "status": "success", + "message": message, + "exception_created": exception_created, + "ban": _serialize_ban(ban), + "exception": exception_data, + } + + +def get_banned_users( + course_id: Optional[str] = None, + org_key: Optional[str] = None, + include_inactive: bool = False, +) -> List[Dict[str, Any]]: + """ + Get list of banned users. + + Args: + course_id: Filter by course ID (includes org-level bans for that course's org) + org_key: Filter by organization key + include_inactive: Include inactive (unbanned) users + + Returns: + list: List of ban records + """ + queryset = DiscussionBan.objects.select_related("user", "banned_by", "unbanned_by") + + if not include_inactive: + queryset = queryset.filter(is_active=True) + + if course_id: + course_key = CourseKey.from_string(course_id) + # Include both course-level bans and org-level bans for this course's org + try: + # pylint: disable=import-error,import-outside-toplevel + from openedx.core.djangoapps.content.course_overviews.models import ( + CourseOverview, + ) + + course = CourseOverview.objects.get(id=course_key) + queryset = queryset.filter( + models.Q(course_id=course_key) | models.Q(org_key=course.org) + ) + except (ImportError, Exception): # pylint: disable=broad-exception-caught + # Fallback to just course-level bans if CourseOverview not available + queryset = queryset.filter(course_id=course_key) + elif org_key: + queryset = queryset.filter(org_key=org_key) + + queryset = queryset.order_by("-banned_at") + + return [_serialize_ban(ban) for ban in queryset] + + +def get_ban(ban_id: int) -> Dict[str, Any]: + """ + Get a specific ban by ID. + + Args: + ban_id: ID of the ban + + Returns: + dict: Ban record data + + Raises: + DiscussionBan.DoesNotExist: If ban not found + """ + ban = DiscussionBan.objects.select_related("user", "banned_by", "unbanned_by").get( + id=ban_id + ) + return _serialize_ban(ban) + + +def _serialize_ban(ban: DiscussionBan) -> Dict[str, Any]: + """ + Serialize a ban object to dictionary. + + Args: + ban: DiscussionBan instance + + Returns: + dict: Serialized ban data + """ + return { + "id": ban.id, + "user": { + "id": ban.user.id, + "username": ban.user.username, + "email": ban.user.email, + }, + "course_id": str(ban.course_id) if ban.course_id else None, + "org_key": ban.org_key, + "scope": ban.scope, + "reason": ban.reason, + "is_active": ban.is_active, + "banned_at": ban.banned_at.isoformat() if ban.banned_at else None, + "banned_by": ( + { + "id": ban.banned_by.id, + "username": ban.banned_by.username, + } + if ban.banned_by + else None + ), + "unbanned_at": ban.unbanned_at.isoformat() if ban.unbanned_at else None, + "unbanned_by": ( + { + "id": ban.unbanned_by.id, + "username": ban.unbanned_by.username, + } + if ban.unbanned_by + else None + ), + } diff --git a/forum/backends/mongodb/__init__.py b/forum/backends/mongodb/__init__.py index 569ce740..383ea094 100644 --- a/forum/backends/mongodb/__init__.py +++ b/forum/backends/mongodb/__init__.py @@ -2,6 +2,11 @@ Mongo Models """ +from .bans import ( + DiscussionBanExceptions, + DiscussionBans, + DiscussionModerationLogs, +) from .comments import Comment from .contents import BaseContents, Contents from .subscriptions import Subscriptions @@ -13,6 +18,9 @@ "Comment", "Contents", "CommentThread", + "DiscussionBanExceptions", + "DiscussionBans", + "DiscussionModerationLogs", "Subscriptions", "Users", "MODEL_INDICES", diff --git a/forum/backends/mongodb/bans.py b/forum/backends/mongodb/bans.py new file mode 100644 index 00000000..f7b71a15 --- /dev/null +++ b/forum/backends/mongodb/bans.py @@ -0,0 +1,463 @@ +"""Discussion ban models for MongoDB backend.""" + +from datetime import datetime +from typing import Any, Optional + +from bson import ObjectId +from pymongo.results import InsertOneResult, UpdateResult + +from forum.backends.mongodb.base_model import MongoBaseModel + + +class DiscussionBans(MongoBaseModel): + """ + MongoDB model for tracking users banned from course or organization discussions. + + Document schema: + { + "_id": ObjectId, + "user_id": int, # Django User ID + "course_id": str, # Optional, for course-level bans + "org_key": str, # Optional, for org-level bans + "scope": str, # "course" or "organization" + "is_active": bool, + "banned_by_id": int, # Django User ID + "reason": str, + "banned_at": datetime, + "unbanned_at": datetime, # Optional + "unbanned_by_id": int, # Optional, Django User ID + "created": datetime, + "modified": datetime + } + """ + + COLLECTION_NAME = "discussion_bans" + + SCOPE_COURSE = "course" + SCOPE_ORGANIZATION = "organization" + + def insert( + self, + user_id: int, + scope: str, + reason: str, + banned_by_id: int, + course_id: Optional[str] = None, + org_key: Optional[str] = None, + is_active: bool = True, + ) -> str: + """ + Create a new discussion ban. + + Args: + user_id: ID of the user being banned + scope: "course" or "organization" + reason: Reason for the ban + banned_by_id: ID of the moderator issuing the ban + course_id: Course ID for course-level bans + org_key: Organization key for org-level bans + is_active: Whether the ban is active + + Returns: + The string ID of the inserted document + """ + now = datetime.utcnow() + + ban_data: dict[str, Any] = { + "user_id": user_id, + "scope": scope, + "is_active": is_active, + "banned_by_id": banned_by_id, + "reason": reason, + "banned_at": now, + "created": now, + "modified": now, + } + + if course_id: + ban_data["course_id"] = course_id + if org_key: + ban_data["org_key"] = org_key + + result: InsertOneResult = self._collection.insert_one(ban_data) + return str(result.inserted_id) + + def update_ban( + self, + ban_id: str, + is_active: Optional[bool] = None, + unbanned_by_id: Optional[int] = None, + unbanned_at: Optional[datetime] = None, + ) -> int: + """ + Update a discussion ban. + + Args: + ban_id: ID of the ban to update + is_active: New active status + unbanned_by_id: ID of moderator unbanning the user + unbanned_at: Timestamp of unban + + Returns: + Number of documents modified + """ + update_data: dict[str, Any] = { + "modified": datetime.utcnow(), + } + + if is_active is not None: + update_data["is_active"] = is_active + if unbanned_by_id is not None: + update_data["unbanned_by_id"] = unbanned_by_id + if unbanned_at is not None: + update_data["unbanned_at"] = unbanned_at + + result: UpdateResult = self._collection.update_one( + {"_id": ObjectId(ban_id)}, {"$set": update_data} + ) + return result.modified_count + + def get_active_ban( + self, + user_id: int, + course_id: Optional[str] = None, + org_key: Optional[str] = None, + scope: Optional[str] = None, + ) -> Optional[dict[str, Any]]: + """ + Get an active ban for a user. + + Args: + user_id: ID of the user + course_id: Course ID to check + org_key: Organization key to check + scope: Specific scope to filter by + + Returns: + Ban document if found, None otherwise + """ + query: dict[str, Any] = { + "user_id": user_id, + "is_active": True, + } + + if scope: + query["scope"] = scope + if course_id: + query["course_id"] = course_id + if org_key: + query["org_key"] = org_key + + return self._collection.find_one(query) + + def is_user_banned( + self, + user_id: int, + course_id: str, + check_org: bool = True, + ) -> bool: + """ + Check if a user is banned from discussions. + + Priority: + 1. Check for course-level exception to org ban (allows user) + 2. Organization-level ban (applies to all courses in org) + 3. Course-level ban (applies to specific course) + + Args: + user_id: User ID to check + course_id: Course ID string (e.g., "course-v1:edX+DemoX+Demo_Course") + check_org: If True, also check organization-level bans + + Returns: + True if user has active ban, False otherwise + """ + # Extract organization from course_id (format: "course-v1:ORG+COURSE+RUN") + try: + if course_id.startswith("course-v1:"): + org_name = course_id.split(":")[1].split("+")[0] + else: + # Fallback for old-style course IDs + org_name = course_id.split("/")[0] + except (IndexError, AttributeError): + org_name = None + + # Check organization-level ban first + if check_org and org_name: + org_ban = self.get_active_ban( + user_id=user_id, org_key=org_name, scope=self.SCOPE_ORGANIZATION + ) + + if org_ban: + # Check if there's an exception for this specific course + exceptions = DiscussionBanExceptions() + if exceptions.has_exception(str(org_ban["_id"]), course_id): + return False + # Org ban applies, no exception + return True + + # Check course-level ban + course_ban = self.get_active_ban( + user_id=user_id, course_id=course_id, scope=self.SCOPE_COURSE + ) + + return course_ban is not None + + def get_user_bans( + self, + user_id: int, + is_active: Optional[bool] = None, + ) -> list[dict[str, Any]]: + """ + Get all bans for a user. + + Args: + user_id: User ID + is_active: Filter by active status if provided + + Returns: + List of ban documents + """ + query: dict[str, Any] = {"user_id": user_id} + + if is_active is not None: + query["is_active"] = is_active + + return list(self._collection.find(query).sort("banned_at", -1)) + + +class DiscussionBanExceptions(MongoBaseModel): + """ + MongoDB model for course-level exceptions to organization-level bans. + + Allows moderators to unban a user from specific courses while + maintaining an organization-wide ban for all other courses. + + Document schema: + { + "_id": ObjectId, + "ban_id": ObjectId, # Reference to discussion_bans document + "course_id": str, + "unbanned_by_id": int, # Django User ID + "reason": str, # Optional + "created": datetime, + "modified": datetime + } + """ + + COLLECTION_NAME = "discussion_ban_exceptions" + + def insert( + self, + ban_id: str, + course_id: str, + unbanned_by_id: int, + reason: Optional[str] = None, + ) -> str: + """ + Create a new ban exception. + + Args: + ban_id: ID of the organization-level ban + course_id: Course where user is unbanned + unbanned_by_id: ID of moderator creating exception + reason: Optional reason for exception + + Returns: + The string ID of the inserted document + """ + now = datetime.utcnow() + + exception_data: dict[str, Any] = { + "ban_id": ObjectId(ban_id), + "course_id": course_id, + "unbanned_by_id": unbanned_by_id, + "created": now, + "modified": now, + } + + if reason: + exception_data["reason"] = reason + + result: InsertOneResult = self._collection.insert_one(exception_data) + return str(result.inserted_id) + + def has_exception( + self, + ban_id: str, + course_id: str, + ) -> bool: + """ + Check if an exception exists for a ban and course. + + Args: + ban_id: ID of the ban + course_id: Course ID to check + + Returns: + True if exception exists, False otherwise + """ + exception = self._collection.find_one( + { + "ban_id": ObjectId(ban_id), + "course_id": course_id, + } + ) + return exception is not None + + def get_exceptions_for_ban( + self, + ban_id: str, + ) -> list[dict[str, Any]]: + """ + Get all exceptions for a ban. + + Args: + ban_id: ID of the ban + + Returns: + List of exception documents + """ + return list(self._collection.find({"ban_id": ObjectId(ban_id)})) + + def delete_exception( + self, + ban_id: str, + course_id: str, + ) -> int: + """ + Delete a specific exception. + + Args: + ban_id: ID of the ban + course_id: Course ID + + Returns: + Number of documents deleted + """ + result = self._collection.delete_one( + { + "ban_id": ObjectId(ban_id), + "course_id": course_id, + } + ) + return result.deleted_count + + +class DiscussionModerationLogs(MongoBaseModel): + """ + MongoDB model for discussion moderation audit logs. + + Tracks ban, unban, and bulk delete actions for compliance. + + Document schema: + { + "_id": ObjectId, + "action_type": str, # "ban_user", "unban_user", "ban_exception", "bulk_delete" + "target_user_id": int, # Django User ID + "moderator_id": int, # Django User ID + "course_id": str, + "scope": str, # Optional + "reason": str, # Optional + "metadata": dict, # Optional, task IDs, counts, etc. + "created": datetime + } + """ + + COLLECTION_NAME = "discussion_moderation_logs" + + ACTION_BAN = "ban_user" + ACTION_UNBAN = "unban_user" + ACTION_BAN_EXCEPTION = "ban_exception" + ACTION_BULK_DELETE = "bulk_delete" + + def insert( + self, + action_type: str, + target_user_id: int, + moderator_id: int, + course_id: str, + scope: Optional[str] = None, + reason: Optional[str] = None, + metadata: Optional[dict[str, Any]] = None, + ) -> str: + """ + Create a new moderation log entry. + + Args: + action_type: Type of action performed + target_user_id: ID of user being moderated + moderator_id: ID of moderator performing action + course_id: Course ID + scope: Optional scope of action + reason: Optional reason for action + metadata: Optional additional data (task IDs, counts, etc.) + + Returns: + The string ID of the inserted document + """ + log_data: dict[str, Any] = { + "action_type": action_type, + "target_user_id": target_user_id, + "moderator_id": moderator_id, + "course_id": course_id, + "created": datetime.utcnow(), + } + + if scope: + log_data["scope"] = scope + if reason: + log_data["reason"] = reason + if metadata: + log_data["metadata"] = metadata + + result: InsertOneResult = self._collection.insert_one(log_data) + return str(result.inserted_id) + + def get_logs_for_user( + self, + user_id: int, + action_type: Optional[str] = None, + limit: int = 100, + ) -> list[dict[str, Any]]: + """ + Get moderation logs for a user. + + Args: + user_id: User ID + action_type: Optional filter by action type + limit: Maximum number of logs to return + + Returns: + List of log documents + """ + query: dict[str, Any] = {"target_user_id": user_id} + + if action_type: + query["action_type"] = action_type + + return list(self._collection.find(query).sort("created", -1).limit(limit)) + + def get_logs_for_course( + self, + course_id: str, + action_type: Optional[str] = None, + limit: int = 100, + ) -> list[dict[str, Any]]: + """ + Get moderation logs for a course. + + Args: + course_id: Course ID + action_type: Optional filter by action type + limit: Maximum number of logs to return + + Returns: + List of log documents + """ + query: dict[str, Any] = {"course_id": course_id} + + if action_type: + query["action_type"] = action_type + + return list(self._collection.find(query).sort("created", -1).limit(limit)) diff --git a/forum/backends/mysql/models.py b/forum/backends/mysql/models.py index e149daa6..ebbeabd6 100644 --- a/forum/backends/mysql/models.py +++ b/forum/backends/mysql/models.py @@ -1,5 +1,7 @@ """MySQL models for forum v2.""" +# mypy: ignore-errors + from __future__ import annotations from datetime import datetime @@ -8,10 +10,13 @@ from django.contrib.auth.models import User # pylint: disable=E5142 from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from django.db import models from django.db.models import QuerySet from django.utils import timezone from django.utils.translation import gettext_lazy as _ +from model_utils.models import TimeStampedModel +from opaque_keys.edx.django.models import CourseKeyField from forum.utils import validate_upvote_or_downvote @@ -789,85 +794,232 @@ class Meta: class ModerationAuditLog(models.Model): - """Audit log for AI moderation decisions on spam content.""" + """ + Unified audit log for all discussion moderation actions. + + Tracks both human moderator actions (bans, content removal) and + AI moderation decisions (spam detection, auto-flagging). + """ + + # Moderation source - who initiated the action + SOURCE_HUMAN = "human" + SOURCE_AI = "ai" + SOURCE_SYSTEM = "system" + SOURCE_CHOICES = [ + (SOURCE_HUMAN, "Human Moderator"), + (SOURCE_AI, "AI Classifier"), + (SOURCE_SYSTEM, "System/Automated"), + ] + + # Unified action types for both human and AI moderation + # Human moderator actions on users + ACTION_BAN = "ban_user" + ACTION_BAN_REACTIVATE = "ban_reactivate" + ACTION_UNBAN = "unban_user" + ACTION_BAN_EXCEPTION = "ban_exception" + ACTION_BULK_DELETE = "bulk_delete" + # AI/Human actions on content + ACTION_FLAGGED = "flagged" + ACTION_SOFT_DELETED = "soft_deleted" + ACTION_APPROVED = "approved" + ACTION_NO_ACTION = "no_action" - # Available actions that can be taken on spam content ACTION_CHOICES = [ - ("flagged", "Content Flagged"), - ("soft_deleted", "Content Soft Deleted"), - ("no_action", "No Action Taken"), + # Human moderator actions on users + (ACTION_BAN, "Ban User"), + (ACTION_BAN_REACTIVATE, "Ban Reactivated"), + (ACTION_UNBAN, "Unban User"), + (ACTION_BAN_EXCEPTION, "Ban Exception Created"), + (ACTION_BULK_DELETE, "Bulk Delete"), + # AI/Human actions on content + (ACTION_FLAGGED, "Content Flagged"), + (ACTION_SOFT_DELETED, "Content Soft Deleted"), + (ACTION_APPROVED, "Content Approved"), + (ACTION_NO_ACTION, "No Action Taken"), ] - # Only spam classifications since we don't store non-spam entries + # AI classification types (only for AI moderation) + CLASSIFICATION_SPAM = "spam" + CLASSIFICATION_SPAM_OR_SCAM = "spam_or_scam" CLASSIFICATION_CHOICES = [ - ("spam", "Spam"), - ("spam_or_scam", "Spam or Scam"), + (CLASSIFICATION_SPAM, "Spam"), + (CLASSIFICATION_SPAM_OR_SCAM, "Spam or Scam"), ] + # === Core Fields === + action_type: models.CharField[str, str] = models.CharField( + max_length=50, + choices=ACTION_CHOICES, + default=ACTION_NO_ACTION, + db_index=True, + help_text="Type of moderation action taken", + ) + source: models.CharField[str, str] = models.CharField( + max_length=20, + choices=SOURCE_CHOICES, + default=SOURCE_AI, + db_index=True, + help_text="Who initiated the moderation action", + ) timestamp: models.DateTimeField[datetime, datetime] = models.DateTimeField( - default=timezone.now, help_text="When the moderation decision was made" + default=timezone.now, + db_index=True, + help_text="When the moderation action was taken", + ) + + # === Target Fields === + # For user-targeted actions (bans/unbans) + target_user: models.ForeignKey[Optional[User], Optional[User]] = models.ForeignKey( + User, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="audit_log_actions_received", + db_index=True, + help_text="Target user for user moderation actions (ban/unban)", + ) + # For content-targeted actions (AI moderation) + body: models.TextField[Optional[str], str] = models.TextField( + null=True, + blank=True, + help_text="Content body that was moderated (for content moderation)", + ) + original_author: models.ForeignKey[Optional[User], Optional[User]] = ( + models.ForeignKey( + User, + on_delete=models.CASCADE, + null=True, + blank=True, + related_name="moderated_content", + help_text="Original author of the moderated content", + ) ) - body: models.TextField[str, str] = models.TextField( - help_text="The content body that was moderated" + + # === Actor Fields === + moderator: models.ForeignKey[Optional[User], Optional[User]] = models.ForeignKey( + User, + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="audit_log_actions_performed", + db_index=True, + help_text="Human moderator who performed or overrode the action", + ) + + # === Context Fields === + course_id: models.CharField[Optional[str], str] = models.CharField( + max_length=255, + null=True, + blank=True, + db_index=True, + help_text="Course ID for course-level moderation actions", + ) + scope: models.CharField[Optional[str], str] = models.CharField( + max_length=20, + null=True, + blank=True, + help_text="Scope of moderation (course/organization)", ) - classifier_output: models.JSONField[dict[str, Any], dict[str, Any]] = ( - models.JSONField(help_text="Full output from the AI classifier") + reason: models.TextField[Optional[str], str] = models.TextField( + null=True, + blank=True, + help_text="Reason provided for the moderation action", ) - reasoning: models.TextField[str, str] = models.TextField( - help_text="AI reasoning for the decision" + + # === AI-specific Fields (only populated for source='ai') === + classifier_output: models.JSONField[Optional[dict[str, Any]], dict[str, Any]] = ( + models.JSONField( + null=True, + blank=True, + help_text="Full output from the AI classifier", + ) ) - classification: models.CharField[str, str] = models.CharField( + classification: models.CharField[Optional[str], str] = models.CharField( max_length=20, choices=CLASSIFICATION_CHOICES, + null=True, + blank=True, help_text="AI classification result", ) - actions_taken: models.JSONField[list[str], list[str]] = models.JSONField( - default=list, - help_text="List of actions taken based on moderation (e.g., ['flagged', 'soft_deleted'])", + actions_taken: models.JSONField[Optional[list[str]], list[str]] = models.JSONField( + null=True, + blank=True, + help_text="List of actions taken (for AI: ['flagged', 'soft_deleted'])", ) confidence_score: models.FloatField[Optional[float], float] = models.FloatField( - null=True, blank=True, help_text="AI confidence score if available" + null=True, + blank=True, + help_text="AI confidence score if available", ) + reasoning: models.TextField[Optional[str], str] = models.TextField( + null=True, + blank=True, + help_text="AI reasoning for the decision", + ) + + # === Override Fields (when human overrides AI) === moderator_override: models.BooleanField[bool, bool] = models.BooleanField( - default=False, help_text="Whether a human moderator overrode the AI decision" + default=False, + help_text="Whether a human moderator overrode the AI decision", ) override_reason: models.TextField[Optional[str], str] = models.TextField( - blank=True, null=True, help_text="Reason for moderator override" - ) - moderator: models.ForeignKey[User, User] = models.ForeignKey( - User, null=True, blank=True, - on_delete=models.SET_NULL, - related_name="moderation_actions", - help_text="Human moderator who made override", + help_text="Reason for moderator override", ) - original_author: models.ForeignKey[User, User] = models.ForeignKey( - User, - on_delete=models.CASCADE, - related_name="moderated_content", - help_text="Original author of the moderated content", + + # === Flexible Metadata === + metadata: models.JSONField[Optional[dict[str, Any]], dict[str, Any]] = ( + models.JSONField( + null=True, + blank=True, + help_text="Additional context (task IDs, counts, etc.)", + ) ) def to_dict(self) -> dict[str, Any]: """Return a dictionary representation of the model.""" - return { + data: dict[str, Any] = { "_id": str(self.pk), + "action_type": self.action_type, + "source": self.source, "timestamp": self.timestamp.isoformat(), - "body": self.body, - "classifier_output": self.classifier_output, - "reasoning": self.reasoning, - "classification": self.classification, - "actions_taken": self.actions_taken, - "confidence_score": self.confidence_score, - "moderator_override": self.moderator_override, - "override_reason": self.override_reason, "moderator_id": str(self.moderator.pk) if self.moderator else None, "moderator_username": self.moderator.username if self.moderator else None, - "original_author_id": str(self.original_author.pk), - "original_author_username": self.original_author.username, + "course_id": self.course_id, + "scope": self.scope, + "reason": self.reason, + "metadata": self.metadata, } + # Add user moderation fields + if self.target_user: + data["target_user_id"] = str(self.target_user.pk) + data["target_user_username"] = self.target_user.username + + # Add content moderation fields + if self.body: + data["body"] = self.body + if self.original_author: + data["original_author_id"] = str(self.original_author.pk) + data["original_author_username"] = self.original_author.username + + # Add AI-specific fields + if self.source == self.SOURCE_AI: + data.update( + { + "classifier_output": self.classifier_output, + "classification": self.classification, + "actions_taken": self.actions_taken, + "confidence_score": self.confidence_score, + "reasoning": self.reasoning, + "moderator_override": self.moderator_override, + "override_reason": self.override_reason, + } + ) + + return data + class Meta: app_label = "forum" verbose_name = "Moderation Audit Log" @@ -875,7 +1027,273 @@ class Meta: ordering = ["-timestamp"] indexes = [ models.Index(fields=["timestamp"]), + models.Index(fields=["action_type", "-timestamp"]), + models.Index(fields=["source", "-timestamp"]), + models.Index(fields=["target_user", "-timestamp"]), + models.Index(fields=["original_author", "-timestamp"]), + models.Index(fields=["moderator", "-timestamp"]), + models.Index(fields=["course_id", "-timestamp"]), models.Index(fields=["classification"]), - models.Index(fields=["original_author"]), - models.Index(fields=["moderator"]), ] + + +# ============================================================================== +# DISCUSSION BAN MODELS +# ============================================================================== +# NOTE: These models were migrated from lms.djangoapps.discussion.models +# +# MIGRATION HISTORY: +# - Originally in lms.djangoapps.discussion.models +# - Tables created by forum/migrations/0006_add_discussion_ban_models.py +# - Old discussion app migration will be replaced with a deletion migration +# ============================================================================== + + +class DiscussionBan(TimeStampedModel): + """ + Tracks users banned from course or organization discussions. + + Uses edX standard patterns: + - TimeStampedModel for created/modified timestamps + - CourseKeyField for course_id + - Soft delete pattern with is_active flag + """ + + SCOPE_COURSE = "course" + SCOPE_ORGANIZATION = "organization" + SCOPE_CHOICES = [ + (SCOPE_COURSE, _("Course")), + (SCOPE_ORGANIZATION, _("Organization")), + ] + + # Core Fields + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="discussion_bans", + db_index=True, + ) + course_id = CourseKeyField( + max_length=255, + db_index=True, + null=True, + blank=True, + help_text="Specific course for course-level bans, NULL for org-level bans", + ) + org_key = models.CharField( + max_length=255, + db_index=True, + null=True, + blank=True, + help_text="Organization name for org-level bans (e.g., 'HarvardX'), NULL for course-level", + ) + scope = models.CharField( + max_length=20, + choices=SCOPE_CHOICES, + default=SCOPE_COURSE, + db_index=True, + ) + is_active = models.BooleanField(default=True, db_index=True) + + # Metadata + banned_by = models.ForeignKey( + User, + on_delete=models.SET_NULL, + null=True, + related_name="bans_issued", + ) + reason = models.TextField() + banned_at = models.DateTimeField(auto_now_add=True) + unbanned_at = models.DateTimeField(null=True, blank=True) + unbanned_by = models.ForeignKey( + User, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="bans_reversed", + ) + + class Meta: + app_label = "forum" + db_table = "discussion_user_ban" + indexes = [ + models.Index(fields=["user", "is_active"], name="idx_user_active"), + models.Index(fields=["course_id", "is_active"], name="idx_course_active"), + models.Index(fields=["org_key", "is_active"], name="idx_org_active"), + models.Index(fields=["scope", "is_active"], name="idx_scope_active"), + ] + constraints = [ + # Prevent duplicate course-level bans + models.UniqueConstraint( + fields=["user", "course_id"], + condition=models.Q(is_active=True, scope="course"), + name="unique_active_course_ban", + ), + # Prevent duplicate org-level bans + models.UniqueConstraint( + fields=["user", "org_key"], + condition=models.Q(is_active=True, scope="organization"), + name="unique_active_org_ban", + ), + ] + verbose_name = _("Discussion Ban") + verbose_name_plural = _("Discussion Bans") + + def __str__(self): + if self.scope == self.SCOPE_COURSE: + return f"Ban: {self.user.username} in {self.course_id} (course-level)" + else: + return f"Ban: {self.user.username} in {self.org_key} (org-level)" + + def clean(self): + """Validate scope-based field requirements.""" + super().clean() + if self.scope == self.SCOPE_COURSE: + if not self.course_id: + raise ValidationError(_("Course-level bans require course_id")) + elif self.scope == self.SCOPE_ORGANIZATION: + if not self.org_key: + raise ValidationError(_("Organization-level bans require organization")) + if self.course_id: + raise ValidationError( + _("Organization-level bans should not have course_id set") + ) + + @classmethod + def is_user_banned(cls, user, course_id, check_org=True): + """ + Check if user is banned from discussions. + + Priority: + 1. Check for course-level exception to org ban (allows user) + 2. Organization-level ban (applies to all courses in org) + 3. Course-level ban (applies to specific course) + + Args: + user: User object + course_id: CourseKey or string + check_org: If True, also check organization-level bans + + Returns: + bool: True if user has active ban + """ + # pylint: disable=import-outside-toplevel + from opaque_keys.edx.keys import CourseKey + + # Normalize course_id to CourseKey + if isinstance(course_id, str): + course_id = CourseKey.from_string(course_id) + + # Check organization-level ban first (higher priority) + if check_org: + # Try to get organization from CourseOverview, fallback to CourseKey + try: + # pylint: disable=import-outside-toplevel + from openedx.core.djangoapps.content.course_overviews.models import ( + CourseOverview, + ) + + course = CourseOverview.objects.get(id=course_id) + org_name = course.org + # pylint: disable=broad-exception-caught + except ( + ImportError, + AttributeError, + Exception, + ): + # Fallback: extract org directly from course_id + # ImportError: CourseOverview not available (test environment) + # AttributeError: Missing settings.FEATURES + # Exception: CourseOverview.DoesNotExist or other DB issues + org_name = course_id.org + + # Check if org-level ban exists + org_ban = cls.objects.filter( + user=user, + org_key=org_name, + scope=cls.SCOPE_ORGANIZATION, + is_active=True, + ).first() + + if org_ban: + # Check if there's an exception for this specific course + if DiscussionBanException.objects.filter( + ban=org_ban, course_id=course_id + ).exists(): + # Exception exists - user is allowed in this course + return False + # Org ban applies, no exception + return True + + # Check course-level ban + if cls.objects.filter( + user=user, course_id=course_id, scope=cls.SCOPE_COURSE, is_active=True + ).exists(): + return True + + return False + + +class DiscussionBanException(TimeStampedModel): + """ + Tracks course-level exceptions to organization-level bans. + + Allows moderators to unban a user from specific courses while + maintaining an organization-wide ban for all other courses. + + Uses edX standard patterns: + - TimeStampedModel for created/modified timestamps + + Example: + - User banned from all HarvardX courses (org-level ban) + - Exception created for HarvardX+CS50+2024 + - User can participate in CS50 but remains banned in all other HarvardX courses + """ + + # Core Fields + ban = models.ForeignKey( + "DiscussionBan", + on_delete=models.CASCADE, + related_name="exceptions", + help_text="The organization-level ban this exception applies to", + ) + course_id = CourseKeyField( + max_length=255, + db_index=True, + help_text="Specific course where user is unbanned despite org-level ban", + ) + + # Metadata + unbanned_by = models.ForeignKey( + User, + on_delete=models.SET_NULL, + null=True, + related_name="ban_exceptions_created", + ) + reason = models.TextField(null=True, blank=True) + + class Meta: + app_label = "forum" + db_table = "discussion_ban_exception" + constraints = [ + models.UniqueConstraint( + fields=["ban", "course_id"], name="unique_ban_exception" + ), + ] + indexes = [ + models.Index(fields=["ban", "course_id"], name="idx_ban_course"), + models.Index(fields=["course_id"], name="idx_exception_course"), + ] + verbose_name = _("Discussion Ban Exception") + verbose_name_plural = _("Discussion Ban Exceptions") + + def __str__(self): + return f"Exception: {self.ban.user.username} allowed in {self.course_id}" + + def clean(self): + """Validate that exception only applies to organization-level bans.""" + super().clean() + if self.ban.scope != "organization": + raise ValidationError( + _("Exceptions can only be created for organization-level bans") + ) diff --git a/forum/migrations/0006_add_discussion_ban_models.py b/forum/migrations/0006_add_discussion_ban_models.py new file mode 100644 index 00000000..302e7748 --- /dev/null +++ b/forum/migrations/0006_add_discussion_ban_models.py @@ -0,0 +1,480 @@ +"""Migration to add discussion ban models to forum app.""" + +# mypy: ignore-errors + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone +import model_utils.fields +import opaque_keys.edx.django.models + + +def populate_source_with_ai(apps, schema_editor): # pylint: disable=unused-argument + """ + Populate existing ModerationAuditLog records with source='ai'. + + This migration updates all existing records in production to have source='ai'. + After AlterField runs, the field will exist with default='ai', but any records + that were created before this migration might have source='human' (the old default). + This function updates those records to 'ai'. + + Note: This assumes the 'source' field already exists in the database. + If migration 0005 didn't create it, AlterField will add it with default='ai'. + """ + ModerationAuditLog = apps.get_model("forum", "ModerationAuditLog") + + try: + ModerationAuditLog.objects.exclude(source="ai").update(source="ai") + except Exception: # pylint: disable=broad-exception-caught + pass + + +def reverse_populate_source(apps, schema_editor): # pylint: disable=unused-argument + """ + Reverse migration: Set source back to 'human' for records that were updated. + + Note: This is a best-effort reversal. We can't perfectly restore the original + state since we don't know which records were originally 'human' vs 'ai'. + We set them all back to 'human' as a safe default. + """ + ModerationAuditLog = apps.get_model("forum", "ModerationAuditLog") + + ModerationAuditLog.objects.filter(source="ai").update(source="human") + + +class Migration(migrations.Migration): + """Migration to add discussion ban and moderation models.""" + + dependencies = [ + ("forum", "0005_moderationauditlog_comment_is_spam_and_more"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="DiscussionBan", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="created", + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="modified", + ), + ), + ( + "course_id", + opaque_keys.edx.django.models.CourseKeyField( + blank=True, + db_index=True, + help_text="Specific course for course-level bans, NULL for org-level bans", + max_length=255, + null=True, + ), + ), + ( + "org_key", + models.CharField( + blank=True, + db_index=True, + help_text="Organization name for org-level bans (e.g., 'HarvardX'), NULL for course-level", + max_length=255, + null=True, + ), + ), + ( + "scope", + models.CharField( + choices=[ + ("course", "Course"), + ("organization", "Organization"), + ], + db_index=True, + default="course", + max_length=20, + ), + ), + ("is_active", models.BooleanField(db_index=True, default=True)), + ("reason", models.TextField()), + ("banned_at", models.DateTimeField(auto_now_add=True)), + ("unbanned_at", models.DateTimeField(blank=True, null=True)), + ( + "banned_by", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="bans_issued", + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "unbanned_by", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="bans_reversed", + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "user", + models.ForeignKey( + db_index=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="discussion_bans", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "verbose_name": "Discussion Ban", + "verbose_name_plural": "Discussion Bans", + "db_table": "discussion_user_ban", + }, + ), + migrations.CreateModel( + name="DiscussionBanException", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="created", + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="modified", + ), + ), + ( + "course_id", + opaque_keys.edx.django.models.CourseKeyField( + db_index=True, + help_text="Specific course where user is unbanned despite org-level ban", + max_length=255, + ), + ), + ("reason", models.TextField(blank=True, null=True)), + ( + "ban", + models.ForeignKey( + help_text="The organization-level ban this exception applies to", + on_delete=django.db.models.deletion.CASCADE, + related_name="exceptions", + to="forum.discussionban", + ), + ), + ( + "unbanned_by", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="ban_exceptions_created", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "verbose_name": "Discussion Ban Exception", + "verbose_name_plural": "Discussion Ban Exceptions", + "db_table": "discussion_ban_exception", + }, + ), + migrations.AddConstraint( + model_name="discussionbanexception", + constraint=models.UniqueConstraint( + fields=("ban", "course_id"), name="unique_ban_exception" + ), + ), + migrations.AddIndex( + model_name="discussionbanexception", + index=models.Index(fields=["ban", "course_id"], name="idx_ban_course"), + ), + migrations.AddIndex( + model_name="discussionbanexception", + index=models.Index(fields=["course_id"], name="idx_exception_course"), + ), + migrations.AddIndex( + model_name="discussionban", + index=models.Index(fields=["user", "is_active"], name="idx_user_active"), + ), + migrations.AddIndex( + model_name="discussionban", + index=models.Index( + fields=["course_id", "is_active"], name="idx_course_active" + ), + ), + migrations.AddIndex( + model_name="discussionban", + index=models.Index(fields=["org_key", "is_active"], name="idx_org_active"), + ), + migrations.AddIndex( + model_name="discussionban", + index=models.Index(fields=["scope", "is_active"], name="idx_scope_active"), + ), + migrations.AddConstraint( + model_name="discussionban", + constraint=models.UniqueConstraint( + condition=models.Q(("is_active", True), ("scope", "course")), + fields=("user", "course_id"), + name="unique_active_course_ban", + ), + ), + migrations.AddConstraint( + model_name="discussionban", + constraint=models.UniqueConstraint( + condition=models.Q(("is_active", True), ("scope", "organization")), + fields=("user", "org_key"), + name="unique_active_org_ban", + ), + ), + migrations.RemoveIndex( + model_name="moderationauditlog", + name="forum_moder_origina_c51089_idx", + ), + migrations.RemoveIndex( + model_name="moderationauditlog", + name="forum_moder_moderat_c62a1c_idx", + ), + migrations.AddField( + model_name="moderationauditlog", + name="action_type", + field=models.CharField( + choices=[ + ("ban_user", "Ban User"), + ("ban_reactivate", "Ban Reactivated"), + ("unban_user", "Unban User"), + ("ban_exception", "Ban Exception Created"), + ("bulk_delete", "Bulk Delete"), + ("flagged", "Content Flagged"), + ("soft_deleted", "Content Soft Deleted"), + ("approved", "Content Approved"), + ("no_action", "No Action Taken"), + ], + db_index=True, + default="no_action", + help_text="Type of moderation action taken", + max_length=50, + ), + ), + migrations.AddField( + model_name="moderationauditlog", + name="course_id", + field=models.CharField( + blank=True, + db_index=True, + help_text="Course ID for course-level moderation actions", + max_length=255, + null=True, + ), + ), + migrations.AddField( + model_name="moderationauditlog", + name="metadata", + field=models.JSONField( + blank=True, + help_text="Additional context (task IDs, counts, etc.)", + null=True, + ), + ), + migrations.AddField( + model_name="moderationauditlog", + name="reason", + field=models.TextField( + blank=True, + help_text="Reason provided for the moderation action", + null=True, + ), + ), + migrations.AddField( + model_name="moderationauditlog", + name="scope", + field=models.CharField( + blank=True, + help_text="Scope of moderation (course/organization)", + max_length=20, + null=True, + ), + ), + migrations.AddField( + model_name="moderationauditlog", + name="target_user", + field=models.ForeignKey( + blank=True, + help_text="Target user for user moderation actions (ban/unban)", + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="audit_log_actions_received", + to=settings.AUTH_USER_MODEL, + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="actions_taken", + field=models.JSONField( + blank=True, + help_text="List of actions taken (for AI: ['flagged', 'soft_deleted'])", + null=True, + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="body", + field=models.TextField( + blank=True, + help_text="Content body that was moderated (for content moderation)", + null=True, + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="classification", + field=models.CharField( + blank=True, + choices=[("spam", "Spam"), ("spam_or_scam", "Spam or Scam")], + help_text="AI classification result", + max_length=20, + null=True, + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="classifier_output", + field=models.JSONField( + blank=True, help_text="Full output from the AI classifier", null=True + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="moderator", + field=models.ForeignKey( + blank=True, + help_text="Human moderator who performed or overrode the action", + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="audit_log_actions_performed", + to=settings.AUTH_USER_MODEL, + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="original_author", + field=models.ForeignKey( + blank=True, + help_text="Original author of the moderated content", + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="moderated_content", + to=settings.AUTH_USER_MODEL, + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="reasoning", + field=models.TextField( + blank=True, help_text="AI reasoning for the decision", null=True + ), + ), + migrations.AlterField( + model_name="moderationauditlog", + name="timestamp", + field=models.DateTimeField( + db_index=True, + default=django.utils.timezone.now, + help_text="When the moderation action was taken", + ), + ), + migrations.AddField( + model_name="moderationauditlog", + name="source", + field=models.CharField( + choices=[ + ("human", "Human Moderator"), + ("ai", "AI Classifier"), + ("system", "System/Automated"), + ], + db_index=True, + default="ai", + help_text="Who initiated the moderation action", + max_length=20, + ), + ), + migrations.AddIndex( + model_name="moderationauditlog", + index=models.Index( + fields=["action_type", "-timestamp"], + name="forum_moder_action__32bd31_idx", + ), + ), + migrations.AddIndex( + model_name="moderationauditlog", + index=models.Index( + fields=["source", "-timestamp"], name="forum_moder_source_cf1224_idx" + ), + ), + migrations.AddIndex( + model_name="moderationauditlog", + index=models.Index( + fields=["target_user", "-timestamp"], + name="forum_moder_target__cadf75_idx", + ), + ), + migrations.AddIndex( + model_name="moderationauditlog", + index=models.Index( + fields=["original_author", "-timestamp"], + name="forum_moder_origina_6bb4d3_idx", + ), + ), + migrations.AddIndex( + model_name="moderationauditlog", + index=models.Index( + fields=["moderator", "-timestamp"], + name="forum_moder_moderat_2c467c_idx", + ), + ), + migrations.AddIndex( + model_name="moderationauditlog", + index=models.Index( + fields=["course_id", "-timestamp"], + name="forum_moder_course__9cbd6e_idx", + ), + ), + migrations.RunPython( + populate_source_with_ai, + reverse_populate_source, + ), + ] diff --git a/forum/serializers/bans.py b/forum/serializers/bans.py new file mode 100644 index 00000000..06f360ac --- /dev/null +++ b/forum/serializers/bans.py @@ -0,0 +1,156 @@ +""" +Serializers for discussion ban operations. +""" + +# mypy: ignore-errors + +from rest_framework import serializers + + +class BanUserSerializer(serializers.Serializer): + """ + Serializer for banning a user from discussions. + """ + + user_id = serializers.CharField(required=True, help_text="ID of the user to ban") + + def create(self, validated_data): + """Not implemented - use API function instead.""" + raise NotImplementedError("Use ban_user() API function instead") + + def update(self, instance, validated_data): + """Not implemented - bans are created, not updated.""" + raise NotImplementedError("Bans cannot be updated") + + banned_by_id = serializers.CharField( + required=True, help_text="ID of the moderator performing the ban" + ) + course_id = serializers.CharField( + required=False, allow_null=True, help_text="Course ID for course-level bans" + ) + org_key = serializers.CharField( + required=False, allow_null=True, help_text="Organization key for org-level bans" + ) + scope = serializers.ChoiceField( + choices=["course", "organization"], + default="course", + help_text="Ban scope: 'course' or 'organization'", + ) + reason = serializers.CharField( + required=False, allow_blank=True, help_text="Reason for the ban (optional)" + ) + + def validate(self, attrs): + """Validate that required fields are present based on scope.""" + scope = attrs.get("scope", "course") + + if scope == "course" and not attrs.get("course_id"): + raise serializers.ValidationError( + {"course_id": "course_id is required for course-level bans"} + ) + + if scope == "organization" and not attrs.get("org_key"): + raise serializers.ValidationError( + {"org_key": "org_key is required for organization-level bans"} + ) + + return attrs + + +class UnbanUserSerializer(serializers.Serializer): + """ + Serializer for unbanning a user from discussions. + """ + + unbanned_by_id = serializers.CharField( + required=True, help_text="ID of the moderator performing the unban" + ) + + def create(self, validated_data): + """Not implemented - use API function instead.""" + raise NotImplementedError("Use unban_user() API function instead") + + def update(self, instance, validated_data): + """Not implemented - use API function instead.""" + raise NotImplementedError("Use unban_user() API function instead") + + course_id = serializers.CharField( + required=False, + allow_null=True, + help_text="Course ID for creating an exception to org-level ban", + ) + reason = serializers.CharField( + required=False, allow_blank=True, help_text="Reason for unbanning (optional)" + ) + + +class BannedUserResponseSerializer(serializers.Serializer): + """ + Serializer for banned user data in responses (read-only). + """ + + id = serializers.IntegerField(read_only=True) + + def create(self, validated_data): + """Not implemented - read-only serializer.""" + raise NotImplementedError("Read-only serializer") + + def update(self, instance, validated_data): + """Not implemented - read-only serializer.""" + raise NotImplementedError("Read-only serializer") + + user = serializers.DictField(read_only=True) + course_id = serializers.CharField(read_only=True, allow_null=True) + org_key = serializers.CharField(read_only=True, allow_null=True) + scope = serializers.CharField(read_only=True) + reason = serializers.CharField(read_only=True) + is_active = serializers.BooleanField(read_only=True) + banned_at = serializers.DateTimeField(read_only=True, allow_null=True) + banned_by = serializers.DictField(read_only=True, allow_null=True) + unbanned_at = serializers.DateTimeField(read_only=True, allow_null=True) + unbanned_by = serializers.DictField(read_only=True, allow_null=True) + + +class BannedUsersListSerializer(serializers.Serializer): + """ + Serializer for listing banned users with filtering options (read-only). + """ + + course_id = serializers.CharField( + required=False, allow_null=True, help_text="Filter by course ID" + ) + org_key = serializers.CharField( + required=False, allow_null=True, help_text="Filter by organization key" + ) + include_inactive = serializers.BooleanField( + default=False, help_text="Include inactive (unbanned) users" + ) + + def create(self, validated_data): + """Not implemented - read-only serializer.""" + raise NotImplementedError("Read-only serializer") + + def update(self, instance, validated_data): + """Not implemented - read-only serializer.""" + raise NotImplementedError("Read-only serializer") + + +class UnbanResponseSerializer(serializers.Serializer): + """ + Serializer for unban operation response (read-only). + """ + + status = serializers.CharField(read_only=True) + + def create(self, validated_data): + """Not implemented - read-only serializer.""" + raise NotImplementedError("Read-only serializer") + + def update(self, instance, validated_data): + """Not implemented - read-only serializer.""" + raise NotImplementedError("Read-only serializer") + + message = serializers.CharField(read_only=True) + exception_created = serializers.BooleanField(read_only=True) + ban = BannedUserResponseSerializer(read_only=True) + exception = serializers.DictField(read_only=True, allow_null=True) diff --git a/forum/urls.py b/forum/urls.py index ee23f70e..c04cafb5 100644 --- a/forum/urls.py +++ b/forum/urls.py @@ -4,6 +4,12 @@ from django.urls import include, path +from forum.views.bans import ( + BanDetailAPIView, + BannedUsersAPIView, + BanUserAPIView, + UnbanUserAPIView, +) from forum.views.commentables import CommentablesCountAPIView from forum.views.comments import CommentsAPIView, CreateThreadCommentAPIView from forum.views.flags import CommentFlagAPIView, ThreadFlagAPIView @@ -116,6 +122,27 @@ UserCreateAPIView.as_view(), name="create-user", ), + # Ban/Unban user APIs + path( + "users/bans", + BanUserAPIView.as_view(), + name="ban-user", + ), + path( + "users/bans/", + BanDetailAPIView.as_view(), + name="ban-detail", + ), + path( + "users/bans//unban", + UnbanUserAPIView.as_view(), + name="unban-user", + ), + path( + "users/banned", + BannedUsersAPIView.as_view(), + name="banned-users-list", + ), path( "users/", UserAPIView.as_view(), @@ -151,13 +178,6 @@ UserRetireAPIView.as_view(), name="user-retire", ), - # Proxy view for various API endpoints - # Uncomment to redirect remaining API calls to the V1 API. - # path( - # "", - # ForumProxyAPIView.as_view(), - # name="forum_proxy", - # ), ] urlpatterns = [ diff --git a/forum/views/bans.py b/forum/views/bans.py new file mode 100644 index 00000000..10632fe1 --- /dev/null +++ b/forum/views/bans.py @@ -0,0 +1,234 @@ +""" +API Views for managing discussion bans. +""" + +import logging + +from django.contrib.auth import get_user_model +from rest_framework import status +from rest_framework.permissions import AllowAny +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.views import APIView + +from forum.api.bans import ban_user, get_ban, get_banned_users, unban_user +from forum.backends.mysql.models import DiscussionBan +from forum.serializers.bans import ( + BannedUserResponseSerializer, + BannedUsersListSerializer, + BanUserSerializer, + UnbanUserSerializer, +) + +User = get_user_model() +log = logging.getLogger(__name__) + + +class BanUserAPIView(APIView): + """ + API View to ban a user from discussions. + + Endpoint: POST /api/v2/users/bans + + Request Body: + { + "user_id": "123", + "banned_by_id": "456", + "scope": "course", # or "organization" + "course_id": "course-v1:edX+DemoX+Demo_Course", # required for course scope + "org_key": "edX", # required for organization scope + "reason": "Posting spam content" + } + + Response: + { + "id": 1, + "user": {"id": 123, "username": "learner", "email": "learner@example.com"}, + "course_id": "course-v1:edX+DemoX+Demo_Course", + "org_key": "edX", + "scope": "course", + "reason": "Posting spam content", + "is_active": true, + "banned_at": "2024-01-15T10:30:00Z", + "banned_by": {"id": 456, "username": "moderator"} + } + """ + + permission_classes = (AllowAny,) + + def post(self, request: Request) -> Response: + """Ban a user from discussions.""" + serializer = BanUserSerializer(data=request.data) + + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + try: + ban_data = ban_user(**serializer.validated_data) + return Response(ban_data, status=status.HTTP_201_CREATED) + except (ValueError, TypeError) as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except User.DoesNotExist: + return Response( + {"error": "User not found"}, status=status.HTTP_404_NOT_FOUND + ) + except Exception as e: # pylint: disable=broad-exception-caught + log.exception("Error banning user: %s", str(e)) + return Response( + {"error": "Failed to ban user"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + +class UnbanUserAPIView(APIView): + """ + API View to unban a user from discussions. + + Endpoint: POST /api/v2/users/bans//unban + + Request Body: + { + "unbanned_by_id": "456", + "course_id": "course-v1:edX+DemoX+Demo_Course", # optional, for org-level ban exceptions + "reason": "User appeal approved" + } + + Response: + { + "status": "success", + "message": "User learner unbanned successfully", + "exception_created": false, + "ban": {...}, + "exception": null + } + """ + + permission_classes = (AllowAny,) + + def post(self, request: Request, ban_id: int) -> Response: + """Unban a user from discussions.""" + serializer = UnbanUserSerializer(data=request.data) + + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + try: + unban_data = unban_user(ban_id=ban_id, **serializer.validated_data) + return Response(unban_data, status=status.HTTP_200_OK) + except ValueError as e: + if "not found" in str(e).lower(): + return Response({"error": str(e)}, status=status.HTTP_404_NOT_FOUND) + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except TypeError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except DiscussionBan.DoesNotExist: + return Response( + {"error": f"Active ban with id {ban_id} not found"}, + status=status.HTTP_404_NOT_FOUND, + ) + except User.DoesNotExist: + return Response( + {"error": "Moderator user not found"}, status=status.HTTP_404_NOT_FOUND + ) + except Exception as e: # pylint: disable=broad-exception-caught + log.exception("Error unbanning user: %s", str(e)) + return Response( + {"error": "Failed to unban user"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + +class BannedUsersAPIView(APIView): + """ + API View to list banned users. + + Endpoint: GET /api/v2/users/bans + + Query Parameters: + - course_id (optional): Filter by course ID + - org_key (optional): Filter by organization key + - include_inactive (optional): Include inactive bans (default: false) + + Response: + [ + { + "id": 1, + "user": {"id": 123, "username": "learner", "email": "learner@example.com"}, + "course_id": "course-v1:edX+DemoX+Demo_Course", + "org_key": "edX", + "scope": "course", + "reason": "Posting spam content", + "is_active": true, + "banned_at": "2024-01-15T10:30:00Z", + "banned_by": {"id": 456, "username": "moderator"}, + "unbanned_at": null, + "unbanned_by": null + } + ] + """ + + permission_classes = (AllowAny,) + + def get(self, request: Request) -> Response: + """Get list of banned users.""" + serializer = BannedUsersListSerializer(data=request.query_params) + + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + try: + banned_users = get_banned_users(**serializer.validated_data) + response_serializer = BannedUserResponseSerializer(banned_users, many=True) + return Response(response_serializer.data, status=status.HTTP_200_OK) + except (ValueError, TypeError) as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: # pylint: disable=broad-exception-caught + log.exception("Error fetching banned users: %s", str(e)) + return Response( + {"error": "Failed to fetch banned users"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + +class BanDetailAPIView(APIView): + """ + API View to get details of a specific ban. + + Endpoint: GET /api/v2/users/bans/ + + Response: + { + "id": 1, + "user": {"id": 123, "username": "learner", "email": "learner@example.com"}, + "course_id": "course-v1:edX+DemoX+Demo_Course", + "org_key": "edX", + "scope": "course", + "reason": "Posting spam content", + "is_active": true, + "banned_at": "2024-01-15T10:30:00Z", + "banned_by": {"id": 456, "username": "moderator"}, + "unbanned_at": null, + "unbanned_by": null + } + """ + + permission_classes = (AllowAny,) + + def get(self, request: Request, ban_id: int) -> Response: + """Get details of a specific ban.""" + try: + ban_data = get_ban(ban_id) + return Response(ban_data, status=status.HTTP_200_OK) + except DiscussionBan.DoesNotExist: + return Response( + {"error": f"Ban with id {ban_id} not found"}, + status=status.HTTP_404_NOT_FOUND, + ) + except (ValueError, TypeError) as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: # pylint: disable=broad-exception-caught + log.exception("Error fetching ban details: %s", str(e)) + return Response( + {"error": "Failed to fetch ban details"}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) diff --git a/requirements/base.in b/requirements/base.in index 7fc1fced..ab0c2866 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -5,6 +5,7 @@ Django # Web application framework beautifulsoup4 +django-model-utils # For TimeStampedModel djangorestframework openedx-atlas requests diff --git a/tests/test_backends/test_mongodb/test_discussion_ban_models.py b/tests/test_backends/test_mongodb/test_discussion_ban_models.py new file mode 100644 index 00000000..1efb6fa1 --- /dev/null +++ b/tests/test_backends/test_mongodb/test_discussion_ban_models.py @@ -0,0 +1,692 @@ +"""Tests for MongoDB discussion ban models.""" + +# mypy: ignore-errors +# pylint: disable=redefined-outer-name + +from datetime import datetime + +import pytest +from bson import ObjectId + +from forum.backends.mongodb.bans import ( + DiscussionBanExceptions, + DiscussionBans, + DiscussionModerationLogs, +) + + +@pytest.fixture +def discussion_bans(): + """Fixture to provide a DiscussionBans instance.""" + return DiscussionBans() + + +@pytest.fixture +def discussion_ban_exceptions(): + """Fixture to provide a DiscussionBanExceptions instance.""" + return DiscussionBanExceptions() + + +@pytest.fixture +def discussion_moderation_logs(): + """Fixture to provide a DiscussionModerationLogs instance.""" + return DiscussionModerationLogs() + + +@pytest.fixture +def sample_course_id(): + """Sample course ID.""" + return "course-v1:edX+DemoX+Demo_Course" + + +@pytest.fixture +def sample_org_key(): + """Sample organization key.""" + return "edX" + + +class TestDiscussionBans: + """Tests for DiscussionBans model.""" + + def test_insert_course_ban(self, discussion_bans, sample_course_id): + """Test creating a course-level ban.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Violation of community guidelines", + banned_by_id=456, + course_id=sample_course_id, + is_active=True, + ) + + assert ban_id is not None + assert ObjectId.is_valid(ban_id) + + # Verify the ban was created + ban = discussion_bans.get(ban_id) + assert ban is not None + assert ban["user_id"] == 123 + assert ban["scope"] == "course" + assert ban["course_id"] == sample_course_id + assert ban["is_active"] is True + assert ban["banned_by_id"] == 456 + assert ban["reason"] == "Violation of community guidelines" + + def test_insert_org_ban(self, discussion_bans, sample_org_key): + """Test creating an organization-level ban.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Multiple violations across courses", + banned_by_id=456, + org_key=sample_org_key, + is_active=True, + ) + + assert ban_id is not None + + # Verify the ban was created + ban = discussion_bans.get(ban_id) + assert ban is not None + assert ban["user_id"] == 123 + assert ban["scope"] == "organization" + assert ban["org_key"] == sample_org_key + assert ban["is_active"] is True + + def test_update_ban_to_inactive(self, discussion_bans, sample_course_id): + """Test updating a ban to inactive (unbanning).""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Test ban", + banned_by_id=456, + course_id=sample_course_id, + ) + + unbanned_at = datetime.utcnow() + modified_count = discussion_bans.update_ban( + ban_id=ban_id, + is_active=False, + unbanned_by_id=789, + unbanned_at=unbanned_at, + ) + + assert modified_count == 1 + + # Verify the ban was updated + ban = discussion_bans.get(ban_id) + assert ban["is_active"] is False + assert ban["unbanned_by_id"] == 789 + assert ban["unbanned_at"] is not None + + def test_get_active_ban_by_course(self, discussion_bans, sample_course_id): + """Test retrieving an active course-level ban.""" + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Test ban", + banned_by_id=456, + course_id=sample_course_id, + is_active=True, + ) + + ban = discussion_bans.get_active_ban( + user_id=123, + course_id=sample_course_id, + scope=DiscussionBans.SCOPE_COURSE, + ) + + assert ban is not None + assert ban["user_id"] == 123 + assert ban["course_id"] == sample_course_id + assert ban["is_active"] is True + + def test_get_active_ban_by_org(self, discussion_bans, sample_org_key): + """Test retrieving an active organization-level ban.""" + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Test ban", + banned_by_id=456, + org_key=sample_org_key, + is_active=True, + ) + + ban = discussion_bans.get_active_ban( + user_id=123, + org_key=sample_org_key, + scope=DiscussionBans.SCOPE_ORGANIZATION, + ) + + assert ban is not None + assert ban["user_id"] == 123 + assert ban["org_key"] == sample_org_key + assert ban["is_active"] is True + + def test_get_active_ban_returns_none_for_inactive( + self, discussion_bans, sample_course_id + ): + """Test that get_active_ban returns None for inactive bans.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Test ban", + banned_by_id=456, + course_id=sample_course_id, + is_active=True, + ) + + # Deactivate the ban + discussion_bans.update_ban(ban_id=ban_id, is_active=False) + + ban = discussion_bans.get_active_ban( + user_id=123, + course_id=sample_course_id, + ) + + assert ban is None + + def test_is_user_banned_course_level(self, discussion_bans, sample_course_id): + """Test checking if user is banned at course level.""" + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Test ban", + banned_by_id=456, + course_id=sample_course_id, + is_active=True, + ) + + is_banned = discussion_bans.is_user_banned( + user_id=123, + course_id=sample_course_id, + check_org=False, + ) + + assert is_banned is True + + def test_is_user_banned_org_level(self, discussion_bans): + """Test checking if user is banned at organization level.""" + course_id = "course-v1:edX+DemoX+Demo_Course" + + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Test ban", + banned_by_id=456, + org_key="edX", + is_active=True, + ) + + is_banned = discussion_bans.is_user_banned( + user_id=123, + course_id=course_id, + check_org=True, + ) + + assert is_banned is True + + def test_is_user_banned_with_exception( + self, + discussion_bans, + discussion_ban_exceptions, + ): + """Test that user with exception to org ban is not banned in that course.""" + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create org-level ban + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Test ban", + banned_by_id=456, + org_key="edX", + is_active=True, + ) + + # Create exception for specific course + discussion_ban_exceptions.insert( + ban_id=ban_id, + course_id=course_id, + unbanned_by_id=789, + reason="Good behavior in this course", + ) + + is_banned = discussion_bans.is_user_banned( + user_id=123, + course_id=course_id, + check_org=True, + ) + + assert is_banned is False + + def test_is_user_not_banned(self, discussion_bans, sample_course_id): + """Test that user without ban returns False.""" + is_banned = discussion_bans.is_user_banned( + user_id=999, + course_id=sample_course_id, + ) + + assert is_banned is False + + def test_get_user_bans_all(self, discussion_bans, sample_course_id): + """Test retrieving all bans for a user.""" + # Create multiple bans + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="First ban", + banned_by_id=456, + course_id=sample_course_id, + is_active=True, + ) + + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Second ban", + banned_by_id=456, + course_id="course-v1:edX+CS50+2024", + is_active=True, + ) + + # Deactivate one + discussion_bans.update_ban(ban_id=ban_id, is_active=False) + + # Get all bans + bans = discussion_bans.get_user_bans(user_id=123) + assert len(bans) == 2 + + def test_get_user_bans_active_only(self, discussion_bans, sample_course_id): + """Test retrieving only active bans for a user.""" + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Active ban", + banned_by_id=456, + course_id=sample_course_id, + is_active=True, + ) + + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_COURSE, + reason="Inactive ban", + banned_by_id=456, + course_id="course-v1:edX+CS50+2024", + is_active=True, + ) + discussion_bans.update_ban(ban_id=ban_id, is_active=False) + + # Get active bans only + bans = discussion_bans.get_user_bans(user_id=123, is_active=True) + assert len(bans) == 1 + assert bans[0]["is_active"] is True + + def test_old_style_course_id_org_extraction(self, discussion_bans): + """Test org extraction from old-style course IDs.""" + old_course_id = "edX/DemoX/Demo_Course" + + discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Test ban", + banned_by_id=456, + org_key="edX", + is_active=True, + ) + + is_banned = discussion_bans.is_user_banned( + user_id=123, + course_id=old_course_id, + check_org=True, + ) + + assert is_banned is True + + +class TestDiscussionBanExceptions: + """Tests for DiscussionBanExceptions model.""" + + def test_insert_exception( + self, + discussion_bans, + discussion_ban_exceptions, + sample_course_id, + sample_org_key, + ): + """Test creating a ban exception.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Org ban", + banned_by_id=456, + org_key=sample_org_key, + ) + + exception_id = discussion_ban_exceptions.insert( + ban_id=ban_id, + course_id=sample_course_id, + unbanned_by_id=789, + reason="Good behavior", + ) + + assert exception_id is not None + assert ObjectId.is_valid(exception_id) + + # Verify the exception was created + exception = discussion_ban_exceptions.get(exception_id) + assert exception is not None + assert str(exception["ban_id"]) == ban_id + assert exception["course_id"] == sample_course_id + assert exception["unbanned_by_id"] == 789 + + def test_has_exception_returns_true( + self, + discussion_bans, + discussion_ban_exceptions, + sample_course_id, + sample_org_key, + ): + """Test has_exception returns True when exception exists.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Org ban", + banned_by_id=456, + org_key=sample_org_key, + ) + + discussion_ban_exceptions.insert( + ban_id=ban_id, + course_id=sample_course_id, + unbanned_by_id=789, + ) + + has_exception = discussion_ban_exceptions.has_exception( + ban_id=ban_id, + course_id=sample_course_id, + ) + + assert has_exception is True + + def test_has_exception_returns_false( + self, + discussion_bans, + discussion_ban_exceptions, + sample_org_key, + ): + """Test has_exception returns False when exception doesn't exist.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Org ban", + banned_by_id=456, + org_key=sample_org_key, + ) + + has_exception = discussion_ban_exceptions.has_exception( + ban_id=ban_id, + course_id="course-v1:edX+NonExistent+2024", + ) + + assert has_exception is False + + def test_get_exceptions_for_ban( + self, + discussion_bans, + discussion_ban_exceptions, + sample_org_key, + ): + """Test retrieving all exceptions for a ban.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Org ban", + banned_by_id=456, + org_key=sample_org_key, + ) + + # Create multiple exceptions + discussion_ban_exceptions.insert( + ban_id=ban_id, + course_id="course-v1:edX+Course1+2024", + unbanned_by_id=789, + ) + discussion_ban_exceptions.insert( + ban_id=ban_id, + course_id="course-v1:edX+Course2+2024", + unbanned_by_id=789, + ) + + exceptions = discussion_ban_exceptions.get_exceptions_for_ban(ban_id=ban_id) + + assert len(exceptions) == 2 + + def test_delete_exception( + self, + discussion_bans, + discussion_ban_exceptions, + sample_course_id, + sample_org_key, + ): + """Test deleting a ban exception.""" + ban_id = discussion_bans.insert( + user_id=123, + scope=DiscussionBans.SCOPE_ORGANIZATION, + reason="Org ban", + banned_by_id=456, + org_key=sample_org_key, + ) + + discussion_ban_exceptions.insert( + ban_id=ban_id, + course_id=sample_course_id, + unbanned_by_id=789, + ) + + deleted_count = discussion_ban_exceptions.delete_exception( + ban_id=ban_id, + course_id=sample_course_id, + ) + + assert deleted_count == 1 + + # Verify deletion + has_exception = discussion_ban_exceptions.has_exception( + ban_id=ban_id, + course_id=sample_course_id, + ) + assert has_exception is False + + +class TestDiscussionModerationLogs: + """Tests for DiscussionModerationLogs model.""" + + def test_insert_ban_log(self, discussion_moderation_logs, sample_course_id): + """Test creating a ban action log.""" + log_id = discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + scope="course", + reason="Violation of guidelines", + ) + + assert log_id is not None + assert ObjectId.is_valid(log_id) + + # Verify the log was created + log = discussion_moderation_logs.get(log_id) + assert log is not None + assert log["action_type"] == "ban_user" + assert log["target_user_id"] == 123 + assert log["moderator_id"] == 456 + assert log["course_id"] == sample_course_id + + def test_insert_bulk_delete_log_with_metadata( + self, + discussion_moderation_logs, + sample_course_id, + ): + """Test creating a bulk delete log with metadata.""" + metadata = { + "task_id": "abc123", + "threads_deleted": 5, + "comments_deleted": 15, + } + + log_id = discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BULK_DELETE, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + metadata=metadata, + ) + + log = discussion_moderation_logs.get(log_id) + assert log["metadata"] == metadata + + def test_get_logs_for_user(self, discussion_moderation_logs, sample_course_id): + """Test retrieving logs for a specific user.""" + # Create multiple logs + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_UNBAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=999, + moderator_id=456, + course_id=sample_course_id, + ) + + logs = discussion_moderation_logs.get_logs_for_user(user_id=123) + + assert len(logs) == 2 + assert all(log["target_user_id"] == 123 for log in logs) + + def test_get_logs_for_user_filtered_by_action( + self, + discussion_moderation_logs, + sample_course_id, + ): + """Test retrieving logs for a user filtered by action type.""" + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_UNBAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + + logs = discussion_moderation_logs.get_logs_for_user( + user_id=123, + action_type=DiscussionModerationLogs.ACTION_BAN, + ) + + assert len(logs) == 1 + assert logs[0]["action_type"] == "ban_user" + + def test_get_logs_for_course(self, discussion_moderation_logs, sample_course_id): + """Test retrieving logs for a specific course.""" + other_course_id = "course-v1:edX+CS50+2024" + + # Create logs for different courses + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=789, + moderator_id=456, + course_id=sample_course_id, + ) + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=999, + moderator_id=456, + course_id=other_course_id, + ) + + logs = discussion_moderation_logs.get_logs_for_course( + course_id=sample_course_id + ) + + assert len(logs) == 2 + assert all(log["course_id"] == sample_course_id for log in logs) + + def test_get_logs_for_course_with_limit( + self, + discussion_moderation_logs, + sample_course_id, + ): + """Test that logs respect the limit parameter.""" + # Create multiple logs + for i in range(5): + discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=100 + i, + moderator_id=456, + course_id=sample_course_id, + ) + + logs = discussion_moderation_logs.get_logs_for_course( + course_id=sample_course_id, + limit=3, + ) + + assert len(logs) == 3 + + def test_logs_sorted_by_created_descending( + self, + discussion_moderation_logs, + sample_course_id, + ): + """Test that logs are returned in reverse chronological order.""" + # Create logs with explicit ordering + log_id_1 = discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_BAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + + log_id_2 = discussion_moderation_logs.insert( + action_type=DiscussionModerationLogs.ACTION_UNBAN, + target_user_id=123, + moderator_id=456, + course_id=sample_course_id, + ) + + logs = discussion_moderation_logs.get_logs_for_user(user_id=123) + + # Verify we got 2 logs + assert len(logs) == 2 + + # Verify both logs are present (order may vary due to timing) + log_ids = {str(log["_id"]) for log in logs} + assert log_id_1 in log_ids + assert log_id_2 in log_ids + + # Verify action types are correct + action_types = {log["action_type"] for log in logs} + assert DiscussionModerationLogs.ACTION_BAN in action_types + assert DiscussionModerationLogs.ACTION_UNBAN in action_types diff --git a/tests/test_backends/test_mysql/test_mysql_ban_models.py b/tests/test_backends/test_mysql/test_mysql_ban_models.py new file mode 100644 index 00000000..30aa573b --- /dev/null +++ b/tests/test_backends/test_mysql/test_mysql_ban_models.py @@ -0,0 +1,527 @@ +"""Tests for discussion ban models.""" + +# mypy: ignore-errors +# pylint: disable=redefined-outer-name,unused-argument + +import pytest +from django.contrib.auth import get_user_model +from django.core.exceptions import ValidationError +from django.db import IntegrityError +from opaque_keys.edx.keys import CourseKey + +from forum.backends.mysql.models import ( # pylint: disable=import-error + DiscussionBan, + DiscussionBanException, +) + +User = get_user_model() + + +@pytest.fixture +def test_users(db): # db fixture ensures database access + """Create test users.""" + return { + "banned_user": User.objects.create( + username="banned_user", email="banned@example.com" + ), + "moderator": User.objects.create(username="moderator", email="mod@example.com"), + "another_user": User.objects.create( + username="another_user", email="another@example.com" + ), + } + + +@pytest.fixture +def test_course_keys(): + """Create test course keys.""" + return { + "harvard_cs50": CourseKey.from_string("course-v1:HarvardX+CS50+2024"), + "harvard_math": CourseKey.from_string("course-v1:HarvardX+Math101+2024"), + "mitx_python": CourseKey.from_string("course-v1:MITx+Python+2024"), + } + + +# ==================== DiscussionBan Model Tests ==================== + + +@pytest.mark.django_db +class TestDiscussionBanModel: + """Tests for DiscussionBan model.""" + + def test_create_course_level_ban(self, test_users, test_course_keys): + """Test creating a course-level ban.""" + ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Posting spam content", + is_active=True, + ) + + assert ban.user == test_users["banned_user"] + assert ban.course_id == test_course_keys["harvard_cs50"] + assert ban.scope == "course" + assert ban.banned_by == test_users["moderator"] + assert ban.reason == "Posting spam content" + assert ban.is_active is True + assert ban.org_key is None + assert ban.unbanned_at is None + assert ban.unbanned_by is None + + def test_create_org_level_ban(self, test_users): + """Test creating an organization-level ban.""" + ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Repeated violations across courses", + is_active=True, + ) + + assert ban.user == test_users["banned_user"] + assert ban.org_key == "HarvardX" + assert ban.scope == "organization" + assert ban.course_id is None + assert ban.is_active is True + + def test_unique_active_course_ban_constraint(self, test_users, test_course_keys): + """Test that duplicate active course-level bans are prevented.""" + # Create first ban + DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="First ban", + is_active=True, + ) + + # Attempt to create duplicate - should fail + with pytest.raises(IntegrityError): + DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Duplicate ban", + is_active=True, + ) + + def test_unique_active_org_ban_constraint(self, test_users): + """Test that duplicate active org-level bans are prevented.""" + # Create first ban + DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="First org ban", + is_active=True, + ) + + # Attempt to create duplicate - should fail + with pytest.raises(IntegrityError): + DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Duplicate org ban", + is_active=True, + ) + + def test_multiple_inactive_bans_allowed(self, test_users, test_course_keys): + """Test that multiple inactive bans are allowed (no unique constraint).""" + # Create first inactive ban + DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="First ban - now inactive", + is_active=False, + ) + + # Create second inactive ban - should succeed + ban2 = DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Second ban - also inactive", + is_active=False, + ) + + assert ban2.is_active is False + assert ( + DiscussionBan.objects.filter( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + is_active=False, + ).count() + == 2 + ) + + def test_ban_str_representation_course_level(self, test_users, test_course_keys): + """Test string representation for course-level ban.""" + ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Test", + ) + + expected = f"Ban: {test_users['banned_user'].username} in {test_course_keys['harvard_cs50']} (course-level)" + assert str(ban) == expected + + def test_ban_str_representation_org_level(self, test_users): + """Test string representation for org-level ban.""" + ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Test", + ) + + expected = f"Ban: {test_users['banned_user'].username} in HarvardX (org-level)" + assert str(ban) == expected + + def test_clean_validation_course_scope_requires_course_id(self, test_users): + """Test that course-level bans require course_id.""" + ban = DiscussionBan( + user=test_users["banned_user"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Test", + # Missing course_id + ) + + with pytest.raises( + ValidationError, match="Course-level bans require course_id" + ): + ban.clean() + + def test_clean_validation_org_scope_requires_org_key(self, test_users): + """Test that org-level bans require org_key.""" + ban = DiscussionBan( + user=test_users["banned_user"], + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Test", + # Missing org_key + ) + + with pytest.raises( + ValidationError, match="Organization-level bans require organization" + ): + ban.clean() + + def test_clean_validation_org_scope_cannot_have_course_id( + self, test_users, test_course_keys + ): + """Test that org-level bans should not have course_id set.""" + ban = DiscussionBan( + user=test_users["banned_user"], + org_key="HarvardX", + course_id=test_course_keys["harvard_cs50"], # Should not be set + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Test", + ) + + with pytest.raises( + ValidationError, + match="Organization-level bans should not have course_id set", + ): + ban.clean() + + def test_is_user_banned_course_level(self, test_users, test_course_keys): + """Test is_user_banned for course-level ban.""" + # Create course-level ban + DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Spam", + is_active=True, + ) + + # User should be banned in CS50 + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_cs50"] + ) + is True + ) + + # User should NOT be banned in Math101 + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_math"] + ) + is False + ) + + # Another user should NOT be banned + assert ( + DiscussionBan.is_user_banned( + test_users["another_user"], test_course_keys["harvard_cs50"] + ) + is False + ) + + def test_is_user_banned_org_level(self, test_users, test_course_keys): + """Test is_user_banned for org-level ban (applies to all courses in org).""" + # Create org-level ban for HarvardX + DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Org-wide violation", + is_active=True, + ) + + # User should be banned in all HarvardX courses + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_cs50"] + ) + is True + ) + + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_math"] + ) + is True + ) + + # User should NOT be banned in MITx course + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["mitx_python"] + ) + is False + ) + + def test_is_user_banned_inactive_ban_ignored(self, test_users, test_course_keys): + """Test that inactive bans are ignored.""" + DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Old ban", + is_active=False, # Inactive + ) + + # User should NOT be banned (ban is inactive) + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_cs50"] + ) + is False + ) + + def test_is_user_banned_with_course_id_as_string( + self, test_users, test_course_keys + ): + """Test is_user_banned accepts course_id as string.""" + DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Spam", + is_active=True, + ) + + # Pass course_id as string + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], str(test_course_keys["harvard_cs50"]) + ) + is True + ) + + +# ==================== DiscussionBanException Model Tests ==================== + + +@pytest.mark.django_db +class TestDiscussionBanExceptionModel: + """Tests for DiscussionBanException model.""" + + def test_create_ban_exception(self, test_users, test_course_keys): + """Test creating a ban exception.""" + # Create org-level ban + org_ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Org-wide ban", + is_active=True, + ) + + # Create exception for CS50 + exception = DiscussionBanException.objects.create( + ban=org_ban, + course_id=test_course_keys["harvard_cs50"], + unbanned_by=test_users["moderator"], + reason="Appeal approved for CS50", + ) + + assert exception.ban == org_ban + assert exception.course_id == test_course_keys["harvard_cs50"] + assert exception.unbanned_by == test_users["moderator"] + assert exception.reason == "Appeal approved for CS50" + + def test_exception_str_representation(self, test_users, test_course_keys): + """Test string representation of exception.""" + org_ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Org ban", + ) + + exception = DiscussionBanException.objects.create( + ban=org_ban, + course_id=test_course_keys["harvard_cs50"], + unbanned_by=test_users["moderator"], + ) + + expected = f"Exception: {test_users['banned_user'].username} allowed in {test_course_keys['harvard_cs50']}" + assert str(exception) == expected + + def test_unique_ban_exception_constraint(self, test_users, test_course_keys): + """Test that duplicate exceptions for same ban + course are prevented.""" + org_ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Org ban", + ) + + # Create first exception + DiscussionBanException.objects.create( + ban=org_ban, + course_id=test_course_keys["harvard_cs50"], + unbanned_by=test_users["moderator"], + ) + + # Attempt duplicate - should fail + with pytest.raises(IntegrityError): + DiscussionBanException.objects.create( + ban=org_ban, + course_id=test_course_keys["harvard_cs50"], + unbanned_by=test_users["moderator"], + ) + + def test_exception_only_for_org_bans_validation(self, test_users, test_course_keys): + """Test that exceptions can only be created for org-level bans.""" + # Create course-level ban + course_ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + course_id=test_course_keys["harvard_cs50"], + scope=DiscussionBan.SCOPE_COURSE, + banned_by=test_users["moderator"], + reason="Course ban", + ) + + # Try to create exception for course-level ban + exception = DiscussionBanException( + ban=course_ban, + course_id=test_course_keys["harvard_math"], + unbanned_by=test_users["moderator"], + ) + + with pytest.raises( + ValidationError, + match="Exceptions can only be created for organization-level bans", + ): + exception.clean() + + def test_org_ban_with_exception_allows_user(self, test_users, test_course_keys): + """Test that exception to org ban allows user in specific course.""" + # Create org-level ban for HarvardX + org_ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Org ban", + is_active=True, + ) + + # User is banned in all HarvardX courses + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_cs50"] + ) + is True + ) + + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_math"] + ) + is True + ) + + # Create exception for CS50 + DiscussionBanException.objects.create( + ban=org_ban, + course_id=test_course_keys["harvard_cs50"], + unbanned_by=test_users["moderator"], + reason="Appeal approved", + ) + + # User should now be allowed in CS50 + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_cs50"] + ) + is False + ) + + # But still banned in Math101 + assert ( + DiscussionBan.is_user_banned( + test_users["banned_user"], test_course_keys["harvard_math"] + ) + is True + ) + + def test_exception_cascade_delete_with_ban(self, test_users, test_course_keys): + """Test that exceptions are deleted when parent ban is deleted.""" + org_ban = DiscussionBan.objects.create( + user=test_users["banned_user"], + org_key="HarvardX", + scope=DiscussionBan.SCOPE_ORGANIZATION, + banned_by=test_users["moderator"], + reason="Org ban", + ) + + exception = DiscussionBanException.objects.create( + ban=org_ban, + course_id=test_course_keys["harvard_cs50"], + unbanned_by=test_users["moderator"], + ) + + exception_id = exception.id + + # Delete parent ban + org_ban.delete() + + # Exception should be deleted + assert not DiscussionBanException.objects.filter(id=exception_id).exists() diff --git a/tests/test_views/test_bans.py b/tests/test_views/test_bans.py new file mode 100644 index 00000000..567a2151 --- /dev/null +++ b/tests/test_views/test_bans.py @@ -0,0 +1,414 @@ +""" +Tests for discussion ban and unban API endpoints. +""" + +# mypy: ignore-errors +# pylint: disable=redefined-outer-name + +from urllib.parse import quote_plus + +import pytest +from django.contrib.auth import get_user_model +from opaque_keys.edx.keys import CourseKey + +from forum.backends.mysql.models import DiscussionBan # pylint: disable=import-error +from test_utils.client import APIClient # pylint: disable=import-error + +User = get_user_model() +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def test_users(): + """Create test users for ban/unban tests.""" + learner = User.objects.create_user( + username="test_learner", email="learner@test.com", password="password" + ) + moderator = User.objects.create_user( + username="test_moderator", + email="moderator@test.com", + password="password", + is_staff=True, + ) + return {"learner": learner, "moderator": moderator} + + +def test_ban_user_course_level(api_client: APIClient, test_users: dict) -> None: + """Test banning a user at course level.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + data = { + "user_id": str(learner.id), + "banned_by_id": str(moderator.id), + "scope": "course", + "course_id": course_id, + "reason": "Posting spam content", + } + + response = api_client.post_json("/api/v2/users/bans", data=data) + + assert response.status_code == 201 + assert response.json()["user"]["id"] == learner.id + assert response.json()["scope"] == "course" + assert response.json()["course_id"] == course_id + assert response.json()["is_active"] is True + + # Verify ban was created in database + ban = DiscussionBan.objects.get(user=learner) + assert ban.scope == "course" + assert ban.is_active is True + + +def test_ban_user_org_level(api_client: APIClient, test_users: dict) -> None: + """Test banning a user at organization level.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + org_key = "edX" + + data = { + "user_id": str(learner.id), + "banned_by_id": str(moderator.id), + "scope": "organization", + "org_key": org_key, + "reason": "Repeated violations across courses", + } + + response = api_client.post_json("/api/v2/users/bans", data=data) + + assert response.status_code == 201 + assert response.json()["user"]["id"] == learner.id + assert response.json()["scope"] == "organization" + assert response.json()["org_key"] == org_key + assert response.json()["course_id"] is None + + +def test_ban_user_missing_course_id(api_client: APIClient, test_users: dict) -> None: + """Test banning fails when course_id is missing for course scope.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + + data = { + "user_id": str(learner.id), + "banned_by_id": str(moderator.id), + "scope": "course", + # Missing course_id + "reason": "Test reason", + } + + response = api_client.post_json("/api/v2/users/bans", data=data) + + assert response.status_code == 400 + assert "course_id" in str(response.json()) + + +def test_ban_user_invalid_user_id(api_client: APIClient, test_users: dict) -> None: + """Test banning fails with non-existent user ID.""" + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + data = { + "user_id": "99999", # Non-existent user + "banned_by_id": str(moderator.id), + "scope": "course", + "course_id": course_id, + "reason": "Test reason", + } + + response = api_client.post_json("/api/v2/users/bans", data=data) + + assert response.status_code == 404 + assert "not found" in str(response.json()).lower() + + +def test_ban_reactivates_previous_ban(api_client: APIClient, test_users: dict) -> None: + """Test that banning a previously unbanned user reactivates the ban.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create an inactive ban + ban = DiscussionBan.objects.create( + user=learner, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=False, + reason="Old ban", + ) + + data = { + "user_id": str(learner.id), + "banned_by_id": str(moderator.id), + "scope": "course", + "course_id": course_id, + "reason": "New ban reason", + } + + response = api_client.post_json("/api/v2/users/bans", data=data) + + assert response.status_code == 201 + + # Verify ban was reactivated + ban.refresh_from_db() + assert ban.is_active is True + assert ban.reason == "New ban reason" + + +def test_unban_course_level_ban(api_client: APIClient, test_users: dict) -> None: + """Test unbanning a user from a course-level ban.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create active course-level ban + ban = DiscussionBan.objects.create( + user=learner, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=True, + reason="Spam posting", + ) + + data = {"unbanned_by_id": str(moderator.id), "reason": "Appeal approved"} + + response = api_client.post_json(f"/api/v2/users/bans/{ban.id}/unban", data=data) + + assert response.status_code == 200 + assert response.json()["status"] == "success" + assert response.json()["exception_created"] is False + + # Verify ban was deactivated + ban.refresh_from_db() + assert ban.is_active is False + assert ban.unbanned_at is not None + + +def test_unban_org_level_ban_completely( + api_client: APIClient, test_users: dict +) -> None: + """Test completely unbanning a user from organization-level ban.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + + # Create active org-level ban + ban = DiscussionBan.objects.create( + user=learner, + org_key="edX", + scope="organization", + banned_by=moderator, + is_active=True, + reason="Repeated violations", + ) + + data = {"unbanned_by_id": str(moderator.id), "reason": "Ban period expired"} + + response = api_client.post_json(f"/api/v2/users/bans/{ban.id}/unban", data=data) + + assert response.status_code == 200 + assert response.json()["status"] == "success" + assert response.json()["exception_created"] is False + + # Verify ban was deactivated + ban.refresh_from_db() + assert ban.is_active is False + + +def test_unban_org_ban_with_course_exception( + api_client: APIClient, test_users: dict +) -> None: + """Test creating a course exception to an organization-level ban.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create active org-level ban + ban = DiscussionBan.objects.create( + user=learner, + org_key="edX", + scope="organization", + banned_by=moderator, + is_active=True, + reason="Repeated violations", + ) + + data = { + "unbanned_by_id": str(moderator.id), + "course_id": course_id, + "reason": "Approved for this course", + } + + response = api_client.post_json(f"/api/v2/users/bans/{ban.id}/unban", data=data) + + assert response.status_code == 200 + assert response.json()["status"] == "success" + assert response.json()["exception_created"] is True + assert response.json()["exception"] is not None + + # Verify ban is still active + ban.refresh_from_db() + assert ban.is_active is True + + # Verify exception was created + assert response.json()["exception"]["course_id"] == course_id + + +def test_unban_invalid_ban_id(api_client: APIClient, test_users: dict) -> None: + """Test unbanning fails with invalid ban ID.""" + moderator = test_users["moderator"] + + data = {"unbanned_by_id": str(moderator.id), "reason": "Test"} + + response = api_client.post_json("/api/v2/users/bans/99999/unban", data=data) + + assert response.status_code == 404 + assert "not found" in str(response.json()).lower() + + +def test_list_all_active_bans(api_client: APIClient, test_users: dict) -> None: + """Test listing all active bans.""" + learner1 = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create another user + learner2 = User.objects.create_user( + username="learner2", email="learner2@test.com", password="password" + ) + + # Create bans + _ban1 = DiscussionBan.objects.create( + user=learner1, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=True, + reason="Spam", + ) + _ban2 = DiscussionBan.objects.create( + user=learner2, + org_key="edX", + scope="organization", + banned_by=moderator, + is_active=True, + reason="Violations", + ) + + response = api_client.get("/api/v2/users/banned") + + assert response.status_code == 200 + assert len(response.json()) == 2 + + +def test_list_bans_filtered_by_course(api_client: APIClient, test_users: dict) -> None: + """Test listing bans filtered by course ID.""" + learner1 = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create another user + learner2 = User.objects.create_user( + username="learner2", email="learner2@test.com", password="password" + ) + + # Create bans in different courses + _ban1 = DiscussionBan.objects.create( + user=learner1, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=True, + reason="Spam", + ) + _ban2 = DiscussionBan.objects.create( + user=learner2, + course_id=CourseKey.from_string("course-v1:edX+Other+Course"), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=True, + reason="Violations", + ) + + response = api_client.get(f"/api/v2/users/banned?course_id={quote_plus(course_id)}") + + assert response.status_code == 200 + # Should return ban1 and any org-level bans for this org + assert len(response.json()) >= 1 + assert response.json()[0]["user"]["id"] == learner1.id + + +def test_list_bans_include_inactive(api_client: APIClient, test_users: dict) -> None: + """Test listing bans including inactive ones.""" + learner1 = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + # Create another user + learner2 = User.objects.create_user( + username="learner2", email="learner2@test.com", password="password" + ) + + # Create active and inactive bans + _ban1 = DiscussionBan.objects.create( + user=learner1, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=True, + reason="Spam", + ) + _ban2 = DiscussionBan.objects.create( + user=learner2, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=False, + reason="Old ban", + ) + + response = api_client.get("/api/v2/users/banned?include_inactive=true") + + assert response.status_code == 200 + assert len(response.json()) == 2 + + +def test_get_ban_details_success(api_client: APIClient, test_users: dict) -> None: + """Test retrieving ban details successfully.""" + learner = test_users["learner"] + moderator = test_users["moderator"] + course_id = "course-v1:edX+DemoX+Demo_Course" + + ban = DiscussionBan.objects.create( + user=learner, + course_id=CourseKey.from_string(course_id), + org_key="edX", + scope="course", + banned_by=moderator, + is_active=True, + reason="Spam posting", + ) + + response = api_client.get(f"/api/v2/users/bans/{ban.id}") + + assert response.status_code == 200 + assert response.json()["id"] == ban.id + assert response.json()["user"]["id"] == learner.id + assert response.json()["scope"] == "course" + assert response.json()["is_active"] is True + + +def test_get_ban_details_not_found(api_client: APIClient) -> None: + """Test retrieving non-existent ban returns 404.""" + response = api_client.get("/api/v2/users/bans/99999") + + assert response.status_code == 404 + assert "not found" in str(response.json()).lower()