diff --git a/bkflow/permission/models.py b/bkflow/permission/models.py index 720dfbc222..48e768de58 100644 --- a/bkflow/permission/models.py +++ b/bkflow/permission/models.py @@ -30,6 +30,7 @@ from bkflow.contrib.api.collections.task import TaskComponentClient from bkflow.space.configs import TokenAutoRenewalConfig, TokenExpirationConfig from bkflow.space.models import SpaceConfig +from bkflow.template.models import Template, TemplateReference logger = logging.getLogger("root") @@ -72,6 +73,11 @@ def get_resource_tokens(self, token_id: str, resource_kwargs: dict) -> QuerySet: class Token(models.Model): + resource_method_map = { + ResourceType.TASK.value: "check_parent_task_id", + ResourceType.TEMPLATE.value: "check_template_id", + } + RESOURCE_TYPE = ( (ResourceType.TASK.value, _("任务")), (ResourceType.TEMPLATE.value, _("流程")), @@ -163,31 +169,48 @@ def verify(cls, space_id, user, resource_type, resource_id, permission_type, tok if db_token.has_expired(): return False - # todo: 此处在递归中查询接口,如果出现子流程嵌套多层导致性能问题,需要优化 - def check_parent_task_id(db_token, current_task_id): - client = TaskComponentClient(space_id=db_token.space_id) - result = client.get_task_detail(current_task_id) - - if not result.get("result"): - logger.warning( - f"[Token->verify] Failed to get task detail, task_id={current_task_id}, " - f"space_id={db_token.space_id}" - ) + if db_token.resource_id != str(resource_id): + map_func = cls.resource_method_map.get(db_token.resource_type) + if not getattr(cls, map_func)(db_token, resource_id): return False + return True - parent_task_info = result["data"].get("parent_task_info") - if not parent_task_info: - return False + @classmethod + def check_parent_task_id(cls, db_token, current_task_id): + # todo: 此处在递归中查询接口,如果出现子流程嵌套多层导致性能问题,需要优化 + client = TaskComponentClient(space_id=db_token.space_id) + result = client.get_task_detail(current_task_id) - parent_task_id = parent_task_info["task_id"] - if db_token.resource_id == str(parent_task_id): - return True + if not result.get("result"): + logger.warning( + f"[Token->verify] Failed to get task detail, task_id={current_task_id}, " + f"space_id={db_token.space_id}" + ) + return False - return check_parent_task_id(db_token, parent_task_id) + parent_task_info = result["data"].get("parent_task_info") + if not parent_task_info: + return False - if db_token.resource_id != str(resource_id): - if resource_type != ResourceType.TASK.value: - return False - if not check_parent_task_id(db_token, resource_id): - return False - return True + parent_task_id = parent_task_info["task_id"] + if db_token.resource_id == str(parent_task_id): + return True + return cls.check_parent_task_id(db_token, parent_task_id) + + @classmethod + def check_template_id(cls, db_token, current_template_id) -> bool: + """ + 递归检查给定的 resource_id(模板ID)是否属于 Token 资源模板的根模板,或祖先模板。 + """ + template_reference = TemplateReference.objects.filter(subprocess_template_id=current_template_id) + if not template_reference.exists(): + return False + root_template_ids = list(template_reference.values_list("root_template_id", flat=True)) + template_ids = Template.objects.filter(id__in=root_template_ids, is_deleted=False).values_list("id", flat=True) + + for template_id in template_ids: + if db_token.resource_id == str(template_id): + return True + if cls.check_template_id(db_token, template_id): + return True + return False