From e88cd3e256977fbc4a6487ed2d4be94f57cf32de Mon Sep 17 00:00:00 2001 From: guohelu <19503896967@163.com> Date: Fri, 21 Nov 2025 14:19:46 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E5=88=9B=E5=BB=BA=E4=BB=BB=E5=8A=A1=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=20--story=3D128208486=20#=20Reviewed,=20tran?= =?UTF-8?q?saction=20id:=2065429?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collections/subprocess_plugin/v1_0_0.py | 16 ++++- bkflow/space/configs.py | 18 +++++ bkflow/task/celery/tasks.py | 18 ++++- bkflow/task/domains/callback.py | 22 ++++++- bkflow/task/serializers.py | 6 +- bkflow/task/signals/handlers.py | 11 +++- bkflow/task/utils.py | 66 +++++++++++++++++++ bkflow/task/views.py | 37 ++++++++++- 8 files changed, 183 insertions(+), 11 deletions(-) diff --git a/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py b/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py index 9deb3229eb..3433c93875 100644 --- a/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py +++ b/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py @@ -27,6 +27,7 @@ from bkflow.contrib.api.collections.interface import InterfaceModuleClient from bkflow.exceptions import ValidationError from bkflow.pipeline_plugins.components.collections.base import BKFlowBaseService +from bkflow.task.utils import push_task_to_queue class Subprocess(BaseModel): @@ -138,7 +139,7 @@ def _create_subprocess_task_instance(self, subprocess, template, pipeline_tree, from bkflow.task.utils import extract_extra_info with transaction.atomic(): - time_zone = timezone.pytz.timezone(settings.TIME_ZONE) or "Asia/Shanghai" + time_zone = timezone.pytz.timezone(settings.TIME_ZONE) time_stamp = datetime.datetime.now(tz=time_zone).strftime("%Y%m%d%H%M%S") create_task_data = { "name": f"{subprocess.subprocess_name}_子流程_{time_stamp}", @@ -165,7 +166,7 @@ def _create_subprocess_task_instance(self, subprocess, template, pipeline_tree, except TaskFlowRelation.DoesNotExist: root_task_id = parent_task.id - relate_info = {"node_id": self.id, "node_version": self.version} + relate_info = {"node_id": self.id, "node_version": self.version, "parent_task_id": parent_task.id} TaskFlowRelation.objects.create( task_id=task_instance.id, parent_task_id=parent_task.id, @@ -189,6 +190,7 @@ def _create_subprocess_task_instance(self, subprocess, template, pipeline_tree, def plugin_execute(self, data, parent_data): from bkflow.task.models import TaskInstance from bkflow.task.operations import TaskOperation + from bkflow.task.utils import count_running_tasks parent_task_id = parent_data.get_one_of_inputs("task_id") try: @@ -210,6 +212,16 @@ def plugin_execute(self, data, parent_data): # 设置输出并启动任务 data.set_outputs("task_id", task_instance.id) + interface_client = InterfaceModuleClient() + space_infos_result = interface_client.get_space_infos( + {"space_id": task_instance.space_id, "config_names": "concurrency_control"} + ) + space_configs = space_infos_result.get("data", {}).get("configs", {}) + concurrency_control = space_configs.get("concurrency_control", 0) + + if concurrency_control and count_running_tasks(task_instance) >= int(concurrency_control): + push_task_to_queue(settings.redis_inst, task_instance, "start") + return True task_operation = TaskOperation(task_instance=task_instance, queue=settings.BKFLOW_MODULE.code) operation_method = getattr(task_operation, "start", None) if operation_method is None: diff --git a/bkflow/space/configs.py b/bkflow/space/configs.py index 4d6c08dab6..c10faa4bfc 100644 --- a/bkflow/space/configs.py +++ b/bkflow/space/configs.py @@ -422,6 +422,24 @@ def validate(cls, value: dict): return SpacePluginConfigParser(config=value).is_valid() +# 流程并发控制 +class ConcurrencyControlConfig(BaseSpaceConfig): + name = "concurrency_control" + desc = _("流程并发控制") + default_value = 0 + LEAST_NUMBER = 1 + control = True + + @classmethod + def validate(cls, value: str): + if int(value) < cls.LEAST_NUMBER: + raise ValidationError( + f"[validate concurrency control error]: concurrency control only support {cls.LEAST_NUMBER}" + ) + + return True + + # 定义 SCHEMA_V1 对应的模型 class SchemaV1Model(BaseModel): meta_apis: str diff --git a/bkflow/task/celery/tasks.py b/bkflow/task/celery/tasks.py index c3f818bc90..fc4ca3deb6 100644 --- a/bkflow/task/celery/tasks.py +++ b/bkflow/task/celery/tasks.py @@ -40,7 +40,13 @@ from bkflow.task.node_timeout import node_timeout_handler from bkflow.task.operations import TaskNodeOperation, TaskOperation from bkflow.task.serializers import CreateTaskInstanceSerializer -from bkflow.task.utils import ATOM_FAILED, redis_inst_check, send_task_instance_message +from bkflow.task.utils import ( + ATOM_FAILED, + count_running_tasks, + push_task_to_queue, + redis_inst_check, + send_task_instance_message, +) logger = logging.getLogger("celery") @@ -192,6 +198,16 @@ def bkflow_periodic_task_start(*args, **kwargs): }, } ) + interface_client = InterfaceModuleClient() + space_infos_result = interface_client.get_space_infos( + {"space_id": task_instance.space_id, "config_names": "concurrency_control"} + ) + space_configs = space_infos_result.get("data", {}).get("configs", {}) + concurrency_control = space_configs.get("concurrency_control", 0) + + if concurrency_control and count_running_tasks(task_instance) >= int(concurrency_control): + push_task_to_queue(settings.redis_inst, task_instance, "start") + return task_operation = TaskOperation(task_instance=task_instance, queue=settings.BKFLOW_MODULE.code) operation_method = getattr(task_operation, "start") diff --git a/bkflow/task/domains/callback.py b/bkflow/task/domains/callback.py index d4504cee28..d5fd18bbc0 100644 --- a/bkflow/task/domains/callback.py +++ b/bkflow/task/domains/callback.py @@ -27,16 +27,18 @@ from pipeline.eri.models import Schedule as DBSchedule from pipeline.eri.runtime import BambooDjangoRuntime -from bkflow.task.models import TaskFlowRelation +from bkflow.contrib.api.collections.interface import InterfaceModuleClient +from bkflow.task.models import TaskInstance +from bkflow.task.utils import count_running_tasks, push_task_to_queue from bkflow.utils.redis_lock import redis_lock logger = logging.getLogger("root") class TaskCallBacker: - def __init__(self, task_id, *args, **kwargs): + def __init__(self, task_id, task_relate, *args, **kwargs): self.task_id = task_id - self.task_relate = TaskFlowRelation.objects.filter(task_id=self.task_id).first() + self.task_relate = task_relate self.extra_info = {"task_id": self.task_id, **self.task_relate.extra_info, **kwargs} def check_record_existence(self): @@ -68,6 +70,20 @@ def subprocess_callback(self): runtime.set_state(node_id=node_id, version=version, to_state=states.READY) runtime.set_state(node_id=node_id, version=version, to_state=states.RUNNING) + parent_task_id = self.extra_info["parent_task_id"] + parent_task = TaskInstance.objects.filter(id=parent_task_id).first() + + interface_client = InterfaceModuleClient() + space_infos_result = interface_client.get_space_infos( + {"space_id": parent_task.space_id, "config_names": "concurrency_control"} + ) + space_configs = space_infos_result.get("data", {}).get("configs", {}) + concurrency_control = space_configs.get("concurrency_control", 0) + + if concurrency_control and count_running_tasks(parent_task) > int(concurrency_control): + push_task_to_queue(settings.redis_inst, parent_task, "callback") + return True + bamboo_engine_api.callback(runtime=runtime, node_id=node_id, version=version, data=self.extra_info) except Exception as e: diff --git a/bkflow/task/serializers.py b/bkflow/task/serializers.py index 24fe6bb1d5..0783fe484d 100644 --- a/bkflow/task/serializers.py +++ b/bkflow/task/serializers.py @@ -118,10 +118,11 @@ class TaskInstanceSerializer(serializers.ModelSerializer): create_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z") start_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z") finish_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z") + is_wait = serializers.SerializerMethodField() class Meta: model = TaskInstance - fields = "__all__" + exclude = ["extra_info"] read_only_fields = ( "id", "instance_id", @@ -141,6 +142,9 @@ class Meta: "tree_info_id", ) + def get_is_wait(self, instance): + return instance.extra_info.get("is_waiting", False) + class RetrieveTaskInstanceSerializer(TaskInstanceSerializer): pipeline_tree = serializers.SerializerMethodField() diff --git a/bkflow/task/signals/handlers.py b/bkflow/task/signals/handlers.py index 54b6ccdb24..d7fdfbb5c3 100644 --- a/bkflow/task/signals/handlers.py +++ b/bkflow/task/signals/handlers.py @@ -33,7 +33,12 @@ TaskInstance, TimeoutNodeConfig, ) -from bkflow.task.utils import ATOM_FAILED, TASK_FINISHED, redis_inst_check +from bkflow.task.utils import ( + ATOM_FAILED, + TASK_FINISHED, + process_task_from_queue, + redis_inst_check, +) logger = logging.getLogger("root") @@ -99,6 +104,7 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version, queue=f"task_common_{settings.BKFLOW_MODULE.code}", routing_key=f"task_common_{settings.BKFLOW_MODULE.code}", ) + process_task_from_queue(settings.redis_inst, root_id) elif to_state == bamboo_engine_states.REVOKED and node_id == root_id: try: TaskInstance.objects.set_revoked(root_id) @@ -119,6 +125,7 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version, routing_key=f"task_common_{settings.BKFLOW_MODULE.code}", ) _check_and_callback(root_id, task_success=True) + process_task_from_queue(settings.redis_inst, root_id) try: _node_timeout_info_update(settings.redis_inst, to_state, node_id, version) @@ -143,7 +150,7 @@ def task_callback(task_id, retry_times=0, *args, **kwargs): task_relate = TaskFlowRelation.objects.filter(task_id=task_id).first() if not task_relate: return - tcb = TaskCallBacker(task_id, *args, **kwargs) + tcb = TaskCallBacker(task_id, task_relate, *args, **kwargs) if not tcb.check_record_existence(): message = f"[task_callback] task_id {task_id} does not in TaskCallBackRecord." logger.error(message) diff --git a/bkflow/task/utils.py b/bkflow/task/utils.py index 8606b08343..2cf5efb10b 100644 --- a/bkflow/task/utils.py +++ b/bkflow/task/utils.py @@ -166,3 +166,69 @@ def extract_extra_info(constants, keys=None): for key in list(constants.keys()) if not keys else keys: extra_info.update({key: {"name": constants[key]["name"], "value": constants[key]["value"]}}) return json.dumps(extra_info, ensure_ascii=False) + + +@redis_inst_check +def push_task_to_queue(redis_cli, task, operation, node_id=None, data=None): + template_id = task.template_id + redis_key = f"task_wait_{template_id}" + + task_data = {"operation": operation, "task_id": task.id} + if node_id: + task_data.update({"node_id": node_id}) + if data: + task_data.update({"node_data": data}) + task_json = json.dumps(task_data) + redis_cli.rpush(redis_key, task_json) + task.extra_info = {"is_waiting": True} + task.save() + return True + + +def process_task_from_queue(redis_cli, instance_id): + from bkflow.task.models import TaskInstance + from bkflow.task.operations import TaskNodeOperation, TaskOperation + + template_id = TaskInstance.objects.get(instance_id=instance_id).template_id + redis_key = f"task_wait_{template_id}" + task_json = redis_cli.lpop(redis_key) + if not task_json: + return None + + task_data = json.loads(task_json) + operation = task_data.get("operation") + task_instance = TaskInstance.objects.get(id=task_data.get("task_id")) + if operation in ["start", "resume"]: + task_operation = TaskOperation(task_instance, settings.BKFLOW_MODULE.code) + operation_method = getattr(task_operation, operation, None) + else: + node_operation = TaskNodeOperation(task_instance, task_data.get("node_id")) + operation_method = getattr(node_operation, operation, None) + + operation_method(operator=operation, **task_data.get("node_data", {})) + task_instance.extra_info = {"is_waiting": False} + task_instance.save() + return task_instance + + +def count_running_tasks(task_instance): + from bkflow.task.models import TaskInstance + from bkflow.task.operations import TaskOperation + + space_id = task_instance.space_id + template_id = task_instance.template_id + task_instances = TaskInstance.objects.filter(space_id=space_id, template_id=template_id, is_deleted=False) + + task_operations = [ + {"task_id": task_instance.id, "operation": TaskOperation(task_instance=task_instance).get_task_states()} + for task_instance in task_instances + ] + + task_count = 0 + for task_operation in task_operations: + if task_operation["operation"].result is False: + continue + if task_operation["operation"].data.get("state") == "RUNNING": + task_count += 1 + + return task_count diff --git a/bkflow/task/views.py b/bkflow/task/views.py index 42232af140..826461f6da 100644 --- a/bkflow/task/views.py +++ b/bkflow/task/views.py @@ -34,6 +34,7 @@ TaskOperationType, TaskTriggerMethod, ) +from bkflow.contrib.api.collections.interface import InterfaceModuleClient from bkflow.contrib.openapi.serializers import ( EmptyBodySerializer, GetNodeDetailQuerySerializer, @@ -68,6 +69,7 @@ TaskOperationRecordSerializer, UpdatePeriodicTaskSerializer, ) +from bkflow.task.utils import count_running_tasks, push_task_to_queue from bkflow.utils.handlers import handle_plain_log from bkflow.utils.mixins import BKFLOWCommonMixin from bkflow.utils.permissions import AdminPermission, AppInternalPermission @@ -194,6 +196,21 @@ def operate(self, request, operation, *args, **kwargs): template_id=task_instance.template_id, executor=task_instance.executor, ): + + interface_client = InterfaceModuleClient() + space_infos_result = interface_client.get_space_infos( + {"space_id": task_instance.space_id, "config_names": "concurrency_control"} + ) + space_configs = space_infos_result.get("data", {}).get("configs", {}) + concurrency_control = space_configs.get("concurrency_control", 0) + + if ( + concurrency_control + and count_running_tasks(task_instance) >= int(concurrency_control) + and operation in ["start", "resume"] + ): + push_task_to_queue(settings.redis_inst, task_instance, operation) + return Response({"result": True, "data": None, "message": "success"}) task_operation = TaskOperation(task_instance=task_instance, queue=settings.BKFLOW_MODULE.code) operation_method = getattr(task_operation, operation, None) if operation_method is None: @@ -222,12 +239,28 @@ def node_operate(self, request, node_id, operation, *args, **kwargs): ): if task_instance.trigger_method == TaskTriggerMethod.subprocess.name and operation in ["skip", "retry"]: task_instance.change_parent_task_node_state_to_running() + data = request.data + operator = data.pop("operator", request.user.username) + interface_client = InterfaceModuleClient() + space_infos_result = interface_client.get_space_infos( + {"space_id": task_instance.space_id, "config_names": "concurrency_control"} + ) + space_configs = space_infos_result.get("data", {}).get("configs", {}) + concurrency_control = space_configs.get("concurrency_control", 0) + + if ( + concurrency_control + and count_running_tasks(task_instance) >= int(concurrency_control) + and operation in ["skip", "retry"] + ): + push_task_to_queue(settings.redis_inst, task_instance, operation, node_id, data) + return Response({"result": True, "data": None, "message": "success"}) + node_operation = TaskNodeOperation(task_instance=task_instance, node_id=node_id) operation_method = getattr(node_operation, operation, None) if operation_method is None: raise ValidationError("node operation not found") - data = request.data - operator = data.pop("operator", request.user.username) + operation_result = operation_method(operator=operator, **data) return Response(dict(operation_result)) From 7c442378fa6a54bc9ae562beb1add1d53e5ff22c Mon Sep 17 00:00:00 2001 From: guohelu <19503896967@163.com> Date: Fri, 21 Nov 2025 14:48:59 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=8D=95=E4=BE=A7?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E9=97=AE=E9=A2=98=20--story=3D128208486=20#?= =?UTF-8?q?=20Reviewed,=20transaction=20id:=2065437?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bkflow/task/utils.py | 9 ++++++--- tests/interface/space/test_space_config.py | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/bkflow/task/utils.py b/bkflow/task/utils.py index 2cf5efb10b..e2fde28b09 100644 --- a/bkflow/task/utils.py +++ b/bkflow/task/utils.py @@ -180,11 +180,12 @@ def push_task_to_queue(redis_cli, task, operation, node_id=None, data=None): task_data.update({"node_data": data}) task_json = json.dumps(task_data) redis_cli.rpush(redis_key, task_json) - task.extra_info = {"is_waiting": True} + task.extra_info.update({"is_waiting": True}) task.save() return True +@redis_inst_check def process_task_from_queue(redis_cli, instance_id): from bkflow.task.models import TaskInstance from bkflow.task.operations import TaskNodeOperation, TaskOperation @@ -206,7 +207,7 @@ def process_task_from_queue(redis_cli, instance_id): operation_method = getattr(node_operation, operation, None) operation_method(operator=operation, **task_data.get("node_data", {})) - task_instance.extra_info = {"is_waiting": False} + task_instance.extra_info.update({"is_waiting": False}) task_instance.save() return task_instance @@ -217,7 +218,9 @@ def count_running_tasks(task_instance): space_id = task_instance.space_id template_id = task_instance.template_id - task_instances = TaskInstance.objects.filter(space_id=space_id, template_id=template_id, is_deleted=False) + task_instances = TaskInstance.objects.filter( + space_id=space_id, template_id=template_id, is_deleted=False, is_started=True, is_finished=False + ) task_operations = [ {"task_id": task_instance.id, "operation": TaskOperation(task_instance=task_instance).get_task_states()} diff --git a/tests/interface/space/test_space_config.py b/tests/interface/space/test_space_config.py index a9920b2064..70ba55c93b 100644 --- a/tests/interface/space/test_space_config.py +++ b/tests/interface/space/test_space_config.py @@ -39,9 +39,9 @@ class TestSpaceConfigHandler: def test_get_all_configs(self): configs = SpaceConfigHandler.get_all_configs() - assert len(configs) == 11 + assert len(configs) == 12 configs = SpaceConfigHandler.get_all_configs(only_public=True) - assert len(configs) == 10 + assert len(configs) == 11 def test_get_config(self): # valid cases