diff --git a/bkflow/interface/urls.py b/bkflow/interface/urls.py index 7b0c78ab47..846b2f248c 100644 --- a/bkflow/interface/urls.py +++ b/bkflow/interface/urls.py @@ -27,6 +27,7 @@ home, is_admin_or_current_space_superuser, is_admin_or_space_superuser, + remote_trigger, user_exit, ) @@ -36,6 +37,7 @@ url(r"^is_admin_user/$", is_admin_or_space_superuser), url(r"^is_current_space_admin/$", is_admin_or_current_space_superuser), url(r"^callback/(?P.+)/$", callback), + url(r"^remote_trigger/(?P.+)/$", remote_trigger), url(r"^get_msg_types/$", get_msg_types), url(r"^itsm_approve/$", itsm_approve), url(r"", include("bkflow.interface.task.urls")), diff --git a/bkflow/interface/views.py b/bkflow/interface/views.py index 0088048b28..7d6f4b8fe7 100644 --- a/bkflow/interface/views.py +++ b/bkflow/interface/views.py @@ -21,6 +21,8 @@ import logging import traceback +from bkflow_feel.api import parse_expression +from bkflow_feel.exceptions import ValidationError from blueapps.account import ConfFixture from blueapps.account.decorators import login_exempt from blueapps.account.handlers.response import ResponseHandler @@ -37,6 +39,9 @@ from bkflow.contrib.api.collections.task import TaskComponentClient from bkflow.space.configs import SuperusersConfig from bkflow.space.models import Space, SpaceConfig +from bkflow.template.models import Template, Trigger +from bkflow.template.serializers.trigger import RemoteTriggerSerializer +from bkflow.template.utils import create_trigger_tasks logger = logging.getLogger("root") @@ -115,9 +120,7 @@ def callback(request, token): try: callback_data = json.loads(request.body) except Exception: - message = _("节点回调失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {msg} | api callback").format( - msg=traceback.format_exc() - ) + message = _("节点回调失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {msg} | api callback").format(msg=traceback.format_exc()) logger.error(message) return JsonResponse({"result": False, "message": message}, status=400) @@ -135,3 +138,64 @@ def callback(request, token): "[callback] resp, space_id={}, task_id={}, node_id={}, resp={}".format(space_id, task_id, node_id, resp) ) return JsonResponse(resp) + + +@login_exempt +@csrf_exempt +@require_POST +def remote_trigger(request, token): + try: + trigger_data = json.loads(request.body) + trigger_serializer = RemoteTriggerSerializer(data=trigger_data) + trigger_serializer.is_valid(raise_exception=True) + except Exception: + message = _("触发器调用失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {msg} | api trigger").format(msg=traceback.format_exc()) + logger.error(message) + return JsonResponse({"result": False, "message": message}, status=400) + trigger_data = trigger_serializer.validated_data + space_id, template_id = trigger_data.get("space_id"), trigger_data.get("template_id") + trigger_id = trigger_data.get("trigger_id") + trigger_cond = trigger_data.get("condition") + + template_instance = Template.objects.filter(space_id=space_id, id=template_id) + + if not Space.exists(space_id=space_id) or not template_instance.exists(): + err_msg = f"对应 空间 {space_id} 或 流程 {template_id} 不存在" + logger.error(err_msg) + return JsonResponse({"result": False, "message": err_msg}, status=400) + + try: + instance = Trigger.objects.get( + id=trigger_id, + space_id=space_id, + template_id=template_id, + type=Trigger.TYPE_REMOTE, + is_enabled=True, + token=token, + ) + except Trigger.DoesNotExist: + err_msg = f"对应 空间 {space_id} 流程 {template_id} 触发器 {trigger_id} 不存在 请检查信息是否正确以及启用状态 token 是否正确" + logger.error(err_msg) + return JsonResponse({"result": False, "message": err_msg}, status=400) + + msg = None + try: + if not parse_expression(expression=instance.condition, context=trigger_cond): + msg = f"未达到触发条件 {instance.condition}:{trigger_cond}" + except ValidationError: + msg = f"未达到触发条件 {instance.condition}:{trigger_cond}" + # 不匹配的时候 可能直接出现解析失败 如果失败 也认为不匹配 直接返回 + if msg: + logger.info(msg) + return JsonResponse({"result": True, "message": msg}, status=200) + + try: + template_instance = template_instance.first() + trigger_data["pipeline_tree"] = template_instance.pipeline_tree + trigger_data["name"] = template_instance.name + task_id = create_trigger_tasks(trigger_data=trigger_data) + except Exception: + message = _("请求创建任务失败, 请重试. 如持续失败可联系管理员处理. {msg} | api trigger").format(msg=traceback.format_exc()) + return JsonResponse({"result": False, "message": message}, status=400) + + return JsonResponse({"result": True, "message": "success", "task_id": task_id}, status=200) diff --git a/bkflow/template/migrations/0005_auto_20250508_1434.py b/bkflow/template/migrations/0005_auto_20250508_1434.py new file mode 100644 index 0000000000..73be553b6f --- /dev/null +++ b/bkflow/template/migrations/0005_auto_20250508_1434.py @@ -0,0 +1,44 @@ +# Generated by Django 3.2.15 on 2025-05-08 06:34 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("template", "0004_auto_20240823_1544"), + ] + + operations = [ + migrations.CreateModel( + name="Trigger", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("creator", models.CharField(blank=True, max_length=32, null=True, verbose_name="创建人")), + ("create_at", models.DateTimeField(auto_now_add=True, verbose_name="创建时间")), + ("update_at", models.DateTimeField(auto_now=True, verbose_name="更新时间")), + ("updated_by", models.CharField(blank=True, max_length=32, null=True, verbose_name="修改人")), + ("is_deleted", models.BooleanField(db_index=True, default=False, verbose_name="是否软删除")), + ("space_id", models.IntegerField(help_text="Space ID")), + ("template_id", models.IntegerField(db_index=True, help_text="Related template ID")), + ("is_enabled", models.BooleanField(default=True, help_text="Indicates whether the trigger is enabled")), + ("name", models.CharField(max_length=100)), + ("condition", models.TextField(help_text="Condition for the trigger")), + ("config", models.JSONField(help_text="Configuration for the trigger")), + ("token", models.CharField(help_text="Token for remote authentication", max_length=255)), + ( + "type", + models.CharField( + choices=[("interval", "定时"), ("manual", "手动"), ("remote", "远程")], + default="manual", + help_text="Type of the trigger", + max_length=20, + ), + ), + ], + ), + migrations.AddIndex( + model_name="trigger", + index=models.Index(fields=["space_id", "template_id"], name="template_tr_space_i_0cf8a3_idx"), + ), + ] diff --git a/bkflow/template/models.py b/bkflow/template/models.py index df4c7f1444..5333ff4e20 100644 --- a/bkflow/template/models.py +++ b/bkflow/template/models.py @@ -279,3 +279,30 @@ class Meta: verbose_name_plural = "Template Mock Scheme" ordering = ["-id"] index_together = ["space_id", "template_id"] + + +class Trigger(CommonModel): + # 定义触发器类型选项 + TYPE_INTERVAL = "interval" + TYPE_MANUAL = "manual" + TYPE_REMOTE = "remote" + + TYPE_CHOICES = [ + (TYPE_INTERVAL, "定时"), # 定时 + (TYPE_REMOTE, "远程"), # 远程 + ] + space_id = models.IntegerField(help_text="Space ID") + template_id = models.IntegerField(help_text="Related template ID", db_index=True) + is_enabled = models.BooleanField(default=True, help_text="Indicates whether the trigger is enabled") + name = models.CharField(max_length=100) + condition = models.TextField(help_text="Condition for the trigger") + config = models.JSONField(help_text="Configuration for the trigger") + token = models.CharField(max_length=255, help_text="Token for remote authentication") + type = models.CharField( + max_length=20, choices=TYPE_CHOICES, default=TYPE_MANUAL, help_text="Type of the trigger" # 设置默认触发类型 + ) + + class Meta: + indexes = [ + models.Index(fields=["space_id", "template_id"]), + ] diff --git a/bkflow/template/serializers/trigger.py b/bkflow/template/serializers/trigger.py new file mode 100644 index 0000000000..59d27f145b --- /dev/null +++ b/bkflow/template/serializers/trigger.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making +蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available. +Copyright (C) 2024 THL A29 Limited, +a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the +specific language governing permissions and limitations under the License. + +We undertake not to change the open source license (MIT license) applicable + +to the current version of the project delivered to anyone in the future. +""" + +from django.utils.translation import ugettext_lazy as _ +from rest_framework import serializers + +from bkflow.space.models import Space +from bkflow.template.models import Template, Trigger + + +class TriggerSerializer(serializers.ModelSerializer): + create_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S") + update_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S") + + class Meta: + model = Trigger + fields = "__all__" + + +class ListTriggerSerializer(serializers.Serializer): + space_id = serializers.IntegerField(help_text=_("空间ID"), required=True) + template_id = serializers.IntegerField(help_text=_("模板ID"), required=False) + + +class CreateTriggerSerializer(serializers.Serializer): + space_id = serializers.IntegerField(help_text="空间ID", required=True) + template_id = serializers.IntegerField(help_text="模板ID", required=True) + is_enabled = serializers.BooleanField(help_text="是否启用", required=False, default=True) + name = serializers.CharField(max_length=100, help_text="名称", required=True) + condition = serializers.CharField(help_text="条件", required=True) + config = serializers.JSONField(help_text="配置", required=False) + type = serializers.CharField(max_length=20, help_text="触发类型", required=True) + token = serializers.CharField(help_text="远程密钥", required=False) + + def validate_type(self, value): + valid_types = {choice[0] for choice in Trigger.TYPE_CHOICES} + if value not in valid_types: + raise serializers.ValidationError(f"Invalid type. Expected one of: {valid_types}") + return value + + def validate(self, data): + # remote 类型必须有 token + type_value = data.get("type") + token_value = data.get("token") + + space_id = data.get("space_id") + template_id = data.get("template_id") + + if type_value == Trigger.TYPE_REMOTE and not token_value: + raise serializers.ValidationError({"token": "Token is required when type is remote"}) + + if not Space.exists(space_id) or not Template.objects.filter(id=template_id, space_id=space_id).exists(): + raise serializers.ValidationError(f"对应 空间 {space_id} 或 流程 {template_id} 不存在") + return data + + +class RemoteTriggerSerializer(serializers.Serializer): + space_id = serializers.IntegerField(help_text="空间ID", required=True) + template_id = serializers.IntegerField(help_text="模板ID", required=True) + condition = serializers.JSONField(help_text="触发条件", required=True) + trigger_id = serializers.IntegerField(help_text="触发器ID", required=True) + creator = serializers.CharField(help_text="执行人", required=True) + + def validate_condition(self, value): + if not isinstance(value, dict): + raise serializers.ValidationError("Condition must be a JSON object.") + return value diff --git a/bkflow/template/urls.py b/bkflow/template/urls.py index 1a32ddda32..849b43d83e 100644 --- a/bkflow/template/urls.py +++ b/bkflow/template/urls.py @@ -25,9 +25,10 @@ AdminTemplateViewSet, TemplateMockDataViewSet, TemplateMockSchemeViewSet, - TemplateViewSet, TemplateMockTaskViewSet, + TemplateViewSet, ) +from bkflow.template.views.trigger import TriggerViewSet from bkflow.template.views.variable import VariableViewSet router = DefaultRouter() @@ -36,6 +37,7 @@ router.register(r"^template_mock_data", TemplateMockDataViewSet, basename="template_mock_data") router.register(r"^template_mock_scheme", TemplateMockSchemeViewSet, basename="template_mock_scheme") router.register(r"^template_mock_task", TemplateMockTaskViewSet, basename="template_mock_task") +router.register(r"trigger", TriggerViewSet, basename="trigger") router.register(r"", TemplateViewSet, basename="template") urlpatterns = [ diff --git a/bkflow/template/utils.py b/bkflow/template/utils.py index 62f9328028..c01935af02 100644 --- a/bkflow/template/utils.py +++ b/bkflow/template/utils.py @@ -18,9 +18,12 @@ to the current version of the project delivered to anyone in the future. """ import logging +from datetime import datetime from pipeline.core.data.expression import ConstantTemplate +from bkflow.contrib.api.collections.task import TaskComponentClient +from bkflow.exceptions import APIResponseError from bkflow.space.configs import CallbackHooksConfig from bkflow.space.models import SpaceConfig from bkflow.utils.api_client import ApiGwClient @@ -130,3 +133,32 @@ def send_callback(space_id, callback_type, data): "[send_callback] send_callback error, callback_type={}, data={}, err={}".format(callback_type, data, e) ) return + + +def create_trigger_tasks(trigger_data): + """ + 提交创建触发器任务 + """ + space_id, template_id = trigger_data.get("space_id"), trigger_data.get("template_id") + pipeline_tree = trigger_data.get("pipeline_tree") + client = TaskComponentClient(space_id=space_id) + name = trigger_data.get("name") + formatted_time = datetime.now().strftime("%Y%m%d%H%M") + task_name = f"{name}_{formatted_time}_trigger" + task_data = { + "template_id": template_id, + "space_id": space_id, + "pipeline_tree": pipeline_tree, + "creator": trigger_data["creator"], + "name": task_name, + } + resp = client.create_task(task_data) + if not resp["result"]: + raise APIResponseError(resp["message"]) + task_id = resp["data"]["id"] + trigger_data["operator"] = trigger_data["creator"] + resp = client.operate_task(task_id=task_id, operate="start", data=trigger_data) + if not resp["result"]: + raise APIResponseError(resp["message"]) + # 创建任务失败或请求失败 + return task_id diff --git a/bkflow/template/views/trigger.py b/bkflow/template/views/trigger.py new file mode 100644 index 0000000000..973a385573 --- /dev/null +++ b/bkflow/template/views/trigger.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making +蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available. +Copyright (C) 2024 THL A29 Limited, +a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the +specific language governing permissions and limitations under the License. + +We undertake not to change the open source license (MIT license) applicable + +to the current version of the project delivered to anyone in the future. +""" + +import logging + +from rest_framework import status +from rest_framework.response import Response +from rest_framework.viewsets import ModelViewSet + +from bkflow.space.permissions import SpaceSuperuserPermission +from bkflow.template.models import Trigger +from bkflow.template.serializers.trigger import ( + CreateTriggerSerializer, + ListTriggerSerializer, + TriggerSerializer, +) +from bkflow.utils.mixins import BKFLOWDefaultPagination +from bkflow.utils.permissions import AdminPermission +from bkflow.utils.views import SimpleGenericViewSet + +logger = logging.getLogger("root") + + +class TriggerViewSet(ModelViewSet, SimpleGenericViewSet): + queryset = Trigger.objects.all() + serializer_class = TriggerSerializer + permission_classes = [AdminPermission | SpaceSuperuserPermission] + pagination_class = BKFLOWDefaultPagination + + def get_object(self): + serializer = ListTriggerSerializer(data=self.request.query_params) + serializer.is_valid(raise_exception=True) + space_id = serializer.validated_data.get("space_id") + pk = self.kwargs.get(self.lookup_field) + template_id = serializer.validated_data.get("template_id") + + queryset = self.queryset.filter(space_id=space_id) + if template_id is not None: + queryset = queryset.filter(template_id=template_id) + + obj = queryset.get(pk=pk) + return obj + + def get_queryset(self): + queryset = super().get_queryset() + serializer = ListTriggerSerializer(data=self.request.query_params) + serializer.is_valid(raise_exception=True) + + space_id = serializer.validated_data.get("space_id") + queryset = queryset.filter(space_id=space_id, is_deleted=False) + + template_id = serializer.validated_data.get("template_id") + if template_id is not None: + queryset = queryset.filter(template_id=template_id) + + return queryset + + def create(self, request, *args, **kwargs): + trigger_serializer = CreateTriggerSerializer(data=request.data) + trigger_serializer.is_valid(raise_exception=True) + trigger_data = trigger_serializer.validated_data + + space_id = trigger_serializer.validated_data.get("space_id") + template_id = trigger_serializer.validated_data.get("template_id") + + try: + # TODO 定时触发器 + trigger = Trigger.objects.create( + space_id=space_id, + template_id=template_id, + is_enabled=trigger_data.get("is_enabled", True), + name=trigger_data["name"], + condition=trigger_data["condition"], + config=trigger_data.get("config", {}), + token=trigger_data.get("token", ""), + type=trigger_data["type"], + creator=request.user.username, + ) + return Response({"id": trigger.id}, status=status.HTTP_201_CREATED) + except Exception as e: + err_msg = f"创建触发器失败 {str(e)}" + logger.error(err_msg) + return Response(exception=True, data={"detail": err_msg}) + + def partial_update(self, request, *args, **kwargs): + try: + instance = self.get_object() + except Trigger.DoesNotExist as e: + err_msg = f"更新触发器不存在 {str(e)}" + logger.error(err_msg) + return Response(exception=True, data={"detail": err_msg}, status=404) + + serializer = CreateTriggerSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + for attr, value in serializer.validated_data.items(): + setattr(instance, attr, value) + + instance.updated_by = request.user.username + updated_keys = list(serializer.validated_data.keys()) + ["updated_by", "update_at"] + try: + instance.save(update_fields=updated_keys) + except Exception as e: + err_msg = f"更新触发器失败 {str(e)}" + logger.error(err_msg) + return Response(exception=True, data={"detail": err_msg}) + return Response(status=status.HTTP_200_OK) + + def destroy(self, request, *args, **kwargs): + try: + instance = self.get_object() + instance.hard_delete() + except Trigger.DoesNotExist as e: + err_msg = f"删除触发器不存在 {str(e)}" + logger.error(err_msg) + return Response(exception=True, data={"detail": err_msg}, status=404) + return Response()