From eb0e7bbd75897dabeac4dc02ca018c9c31b4187d Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Thu, 19 Mar 2026 11:41:24 +0800 Subject: [PATCH] Fix rbac permission checking logic --- backend/common/security/rbac.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/backend/common/security/rbac.py b/backend/common/security/rbac.py index 76d25c5be..1bee2f9e3 100644 --- a/backend/common/security/rbac.py +++ b/backend/common/security/rbac.py @@ -3,10 +3,8 @@ from backend.common.context import ctx from backend.common.enums import MethodType, StatusType from backend.common.exception import errors -from backend.common.log import log from backend.common.security.jwt import DependsJwtAuth from backend.core.conf import settings -from backend.utils.dynamic_import import import_module_cached async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None: # noqa: C901 @@ -36,16 +34,17 @@ async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None: # 检测用户角色 user_roles = request.user.roles - if not user_roles or all(status == 0 for status in user_roles): - raise errors.AuthorizationError(msg='用户未分配角色,请联系系统管理员') + enabled_roles = [role for role in user_roles if role.status == StatusType.enable] + if not enabled_roles: + raise errors.AuthorizationError(msg='用户所属角色已被锁定,请联系系统管理员') # 检测用户所属角色菜单 - if not any(len(role.menus) > 0 for role in user_roles): + if not any(len(role.menus) > 0 for role in enabled_roles): raise errors.AuthorizationError(msg='用户未分配菜单,请联系系统管理员') # 检测后台管理操作权限 method = request.method - if (method != MethodType.GET or method != MethodType.OPTIONS) and not request.user.is_staff: + if method not in {MethodType.GET, MethodType.OPTIONS} and not request.user.is_staff: raise errors.AuthorizationError(msg='用户已被禁止后台管理操作,请联系系统管理员') # RBAC 鉴权 @@ -62,7 +61,7 @@ async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None: # 菜单去重 unique_menus = {} - for role in user_roles: + for role in enabled_roles: for menu in role.menus: unique_menus[menu.id] = menu @@ -74,12 +73,11 @@ async def rbac_verify(request: Request, _token: str = DependsJwtAuth) -> None: if path_auth_perm not in allow_perms: raise errors.AuthorizationError else: + # casbin 模式 try: - casbin_rbac = import_module_cached('backend.plugin.casbin_rbac.rbac') - casbin_verify = casbin_rbac.casbin_verify - except (ImportError, AttributeError) as e: - log.error(f'正在通过 casbin 执行 RBAC 权限校验,但此插件不存在: {e}') - raise errors.ServerError(msg='权限校验失败,请联系系统管理员') + from backend.plugin.casbin_rbac.rbac import casbin_verify + except ImportError: + raise errors.ServerError(msg='Casbin RBAC 插件用法导入失败,请联系系统管理员') await casbin_verify(request)