Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions backend/common/security/rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
from backend.common.context import ctx
from backend.common.enums import MethodType, StatusType
from backend.common.exception import errors
from backend.common.log import log
from backend.common.security.jwt import DependsJwtAuth
from backend.core.conf import settings
from backend.utils.dynamic_import import import_module_cached


async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None: # noqa: C901
Expand Down Expand Up @@ -36,16 +34,17 @@ async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None:

# 检测用户角色
user_roles = request.user.roles
if not user_roles or all(status == 0 for status in user_roles):
raise errors.AuthorizationError(msg='用户未分配角色,请联系系统管理员')
enabled_roles = [role for role in user_roles if role.status == StatusType.enable]
if not enabled_roles:
raise errors.AuthorizationError(msg='用户所属角色已被锁定,请联系系统管理员')

# 检测用户所属角色菜单
if not any(len(role.menus) > 0 for role in user_roles):
if not any(len(role.menus) > 0 for role in enabled_roles):
raise errors.AuthorizationError(msg='用户未分配菜单,请联系系统管理员')

# 检测后台管理操作权限
method = request.method
if (method != MethodType.GET or method != MethodType.OPTIONS) and not request.user.is_staff:
if method not in {MethodType.GET, MethodType.OPTIONS} and not request.user.is_staff:
raise errors.AuthorizationError(msg='用户已被禁止后台管理操作,请联系系统管理员')

# RBAC 鉴权
Expand All @@ -62,7 +61,7 @@ async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None:

# 菜单去重
unique_menus = {}
for role in user_roles:
for role in enabled_roles:
for menu in role.menus:
unique_menus[menu.id] = menu

Expand All @@ -74,12 +73,11 @@ async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None:
if path_auth_perm not in allow_perms:
raise errors.AuthorizationError
else:
# casbin 模式
try:
casbin_rbac = import_module_cached('backend.plugin.casbin_rbac.rbac')
casbin_verify = casbin_rbac.casbin_verify
except (ImportError, AttributeError) as e:
log.error(f'正在通过 casbin 执行 RBAC 权限校验,但此插件不存在: {e}')
raise errors.ServerError(msg='权限校验失败,请联系系统管理员')
from backend.plugin.casbin_rbac.rbac import casbin_verify
except ImportError:
raise errors.ServerError(msg='Casbin RBAC 插件用法导入失败,请联系系统管理员')

await casbin_verify(request)

Expand Down
Loading