Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions backend/src/users/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
UserSchema,
UserUpdateSchema,
)
from users.auth import AuthBearer
from users.auth import AuthBearer, permission_required
from users.models import UserProfile


Expand All @@ -43,8 +43,10 @@ def login(request, payload: LoginSchema):
raise HttpError(401, "Invalid email or password") # noqa: WPS503, WPS432


@router.post("/register", response={201: UserSchema, 400: str})
@router.post("/register", response={201: UserSchema, 400: str}, auth=AuthBearer())
@permission_required('auth.add_userprofile')
def register(request, payload: RegisterSchema):
"""Handler for user creation."""
# Check if the email already exists
if UserProfile.objects.filter(email=payload.email).exists():
raise HttpError(HTTPStatus.BAD_REQUEST, "Email already exists")
Expand All @@ -54,6 +56,7 @@ def register(request, payload: RegisterSchema):
user = UserProfile.objects.create_user(
email=payload.email,
password=payload.password,
group=payload.role.value,
)
user.save()
except ValidationError as ex:
Expand All @@ -63,11 +66,13 @@ def register(request, payload: RegisterSchema):


@router.get("/", response={200: list[UserSchema]}, auth=AuthBearer())
@permission_required('auth.view_userprofile')
def get_users(request):
return list(UserProfile.objects.all())


@router.get("/{user_id}", response={200: UserSchema, 404: str}, auth=AuthBearer())
@permission_required('auth.view_userprofile')
def get_user(request, user_id: int):
try:
user = UserProfile.objects.get(id=user_id)
Expand All @@ -77,6 +82,7 @@ def get_user(request, user_id: int):


@router.patch("/{user_id}", response={200: UserSchema, 404: str}, auth=AuthBearer())
@permission_required('auth.change_userprofile')
def update_user(request, user_id: int, payload: UserUpdateSchema):
try:
user = UserProfile.objects.get(id=user_id)
Expand Down
15 changes: 15 additions & 0 deletions backend/src/users/api_schemas.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from ninja import Schema

from users.models import UserRole


class UserSchema(Schema):
id: int | None
Expand Down Expand Up @@ -36,6 +38,19 @@ class LoginSchema(Schema):
class RegisterSchema(Schema):
email: str
password: str
role: UserRole | None = UserRole.MARKETER # По умолчанию роль MARKETER

model_config = {
"json_schema_extra": {
"examples": [
{
"email": "admin@admin.com",
"password": "admin",
"role": "tech_support",
},
],
},
}


class TokenSchema(Schema):
Expand Down
14 changes: 14 additions & 0 deletions backend/src/users/auth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from functools import wraps

import jwt
from django.http import HttpResponseForbidden
from ninja.security import HttpBearer

from feedback import settings
Expand All @@ -18,3 +21,14 @@ def authenticate(self, request, token: str):
return UserProfile.objects.get(id=user_id)
except UserProfile.DoesNotExist:
return None


def permission_required(permission):
def decorator(func):
@wraps(func)
def wrapped(request, *args, **kwargs):
if not request.auth.has_perm(permission):
return HttpResponseForbidden("You do not have permission to perform this action.")
return func(request, *args, **kwargs)
return wrapped
return decorator
25 changes: 25 additions & 0 deletions backend/src/users/migrations/0002_assign_permissions_to_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 5.1.2 on 2024-11-21 10:38

from django.db import migrations


def assign_permissions_to_groups(apps, schema_editor):
Group = apps.get_model('auth', 'Group')
from users.models import ROLE_PERMISSIONS

for role, get_permissions in ROLE_PERMISSIONS.items():
group, _ = Group.objects.get_or_create(name=role)
permissions = get_permissions()
for perm in permissions:
group.permissions.add(perm.id)


class Migration(migrations.Migration):

dependencies = [
('users', '0001_initial'),
]

operations = [
migrations.RunPython(assign_permissions_to_groups),
]
33 changes: 28 additions & 5 deletions backend/src/users/models.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
from enum import Enum

from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
from django.contrib.auth.hashers import make_password
from django.contrib.auth.models import PermissionsMixin
from django.contrib.auth.models import Group, Permission, PermissionsMixin
from django.core.validators import RegexValidator
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _


class UserRole(Enum):
TECH_SUPPORT = "tech_support" # тех. поддержка
MARKETER = "marketer" # маркетолог


def _get_permissions_for_tech_support():
return Permission.objects.all()


def _get_permissions_for_marketer():
return Permission.objects.exclude(codename__endswith='_userprofile')


ROLE_PERMISSIONS = {
UserRole.TECH_SUPPORT.value: _get_permissions_for_tech_support,
UserRole.MARKETER.value: _get_permissions_for_marketer,
}


class MyUserManager(BaseUserManager):
"""Rewrite UserManager to delete unwanted 'username' attribute."""
use_in_migrations = True

def create_user(self, email, password, **extra_fields):
def create_user(self, email, password, group, **extra_fields):
extra_fields.setdefault("is_staff", False)
extra_fields.setdefault("is_superuser", False)
return self._create_user(email, password, **extra_fields)
return self._create_user(email, password, group, **extra_fields)

def create_superuser(self, email, password, **extra_fields):
extra_fields.setdefault("is_staff", True)
Expand All @@ -25,19 +46,21 @@ def create_superuser(self, email, password, **extra_fields):
if extra_fields.get("is_superuser") is not True:
raise ValueError("Superuser must have is_superuser=True.")

return self._create_user(email, password, **extra_fields)
return self._create_user(email, password, group=UserRole.TECH_SUPPORT, **extra_fields)

def active(self):
return self.get_queryset().exclude(is_active=True)

def _create_user(self, email, password, **extra_fields):
def _create_user(self, email, password, group, **extra_fields):
"""Create and save a user with the given email and password."""
if not email:
raise ValueError('The Email field must be set')
email = self.normalize_email(email)
user = self.model(email=email, **extra_fields)
user.password = make_password(password)
user.save(using=self._db)
group, _ = Group.objects.get_or_create(name=group)
user.groups.add(group)
return user


Expand Down
Loading