diff --git a/bkflow/constants.py b/bkflow/constants.py index 1a72a87d67..c0d368753c 100644 --- a/bkflow/constants.py +++ b/bkflow/constants.py @@ -59,7 +59,9 @@ class TaskOperationType(Enum): # 任务节点操作 callback = _("回调") retry = _("重试") + loop_retry = _("循环重试") skip = _("跳过") + loop_skip = _("循环跳过") skip_exg = _("跳过失败网关") skip_cpg = _("跳过并行条件网关") pause_subproc = _("暂停节点") diff --git a/bkflow/contrib/operation_record/decorators.py b/bkflow/contrib/operation_record/decorators.py index 503a74db14..261de9ff36 100644 --- a/bkflow/contrib/operation_record/decorators.py +++ b/bkflow/contrib/operation_record/decorators.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- """ TencentBlueKing is pleased to support the open source community by making 蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available. @@ -36,7 +35,14 @@ def wrapper(func): def decorator(*args, **kwargs): result = func(*args, **kwargs) try: - recorder = OPERATION_RECORDER.recorders[recorder_type](operate_type, operate_source, extra_info) + adjusted_operate_type = operate_type + if kwargs.get("loop", False) and operate_type == "skip": + adjusted_operate_type = "loop_skip" + elif kwargs.get("loop", False) and operate_type == "retry": + adjusted_operate_type = "loop_retry" + recorder = OPERATION_RECORDER.recorders[recorder_type]( + adjusted_operate_type, operate_source, extra_info + ) record_kwargs = {**kwargs, "func_result": result} recorder.record(*args, **record_kwargs) except Exception as e: diff --git a/bkflow/pipeline_plugins/components/collections/subprocess_plugin/converter.py b/bkflow/pipeline_plugins/components/collections/subprocess_plugin/converter.py index 490b488117..e1298ca99b 100644 --- a/bkflow/pipeline_plugins/components/collections/subprocess_plugin/converter.py +++ b/bkflow/pipeline_plugins/components/collections/subprocess_plugin/converter.py @@ -30,6 +30,7 @@ class PipelineTreeSubprocessConverter: "timeout_config", "auto_retry", "template_node_id", + "loop_config", } DEFAULT_VALUES = { "error_ignorable": False, @@ -43,6 +44,7 @@ class PipelineTreeSubprocessConverter: "data": {"subprocess": {"hook": False, "need_render": False, "value": {}}}, "version": "1.0.0", }, + "loop_config": {}, } def __init__(self, pipeline_tree, constants=None): diff --git a/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py b/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py index 9d2ee0bd79..f330b9cc55 100644 --- a/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py +++ b/bkflow/pipeline_plugins/components/collections/subprocess_plugin/v1_0_0.py @@ -112,7 +112,10 @@ def _render_parent_parameters(self, pipeline_tree, parent_task): root_pipeline_inputs = { key: inputs.value for key, inputs in self.runtime.get_data_inputs(self.top_pipeline_id).items() } - context = Context(self.runtime, context_values, root_pipeline_inputs) + if self.runtime.get_node(self.id).loop_strategy: + context: Context = Context(self.runtime, context_values, root_pipeline_inputs, self.inner_loop) + else: + context = Context(self.runtime, context_values, root_pipeline_inputs) hydrated_context = context.hydrate(deformat=True) self.logger.info(f"subprocess parent hydrated context: {hydrated_context}") diff --git a/bkflow/pipeline_plugins/static/variables/loop.js b/bkflow/pipeline_plugins/static/variables/loop.js new file mode 100644 index 0000000000..e02359379f --- /dev/null +++ b/bkflow/pipeline_plugins/static/variables/loop.js @@ -0,0 +1,14 @@ + +(function () { + $.atoms.loop = [ + { + tag_code: "loop", + type: "input", + attrs: { + name: gettext("循环变量"), + hookable: true, + validation: [] + } + }, + ] +})(); diff --git a/bkflow/pipeline_plugins/variables/collections/loop.py b/bkflow/pipeline_plugins/variables/collections/loop.py new file mode 100644 index 0000000000..5b9acdfe02 --- /dev/null +++ b/bkflow/pipeline_plugins/variables/collections/loop.py @@ -0,0 +1,41 @@ +""" +TencentBlueKing is pleased to support the open source community by making +蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available. +Copyright (C) 2024 THL A29 Limited, +a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the +specific language governing permissions and limitations under the License. + +We undertake not to change the open source license (MIT license) applicable + +to the current version of the project delivered to anyone in the future. +""" + +from django.conf import settings +from django.utils.translation import ugettext_lazy as _ +from pipeline.core.data.var import LazyVariable +from pipeline.core.flow.io import StringItemSchema + +from bkflow.pipeline_plugins.variables.base import SelfExplainVariable + + +class Loop(LazyVariable, SelfExplainVariable): + code = "loop" + name = _("循环变量") + type = "general" + tag = "loop.loop" + form = "{}variables/{}.js".format(settings.STATIC_URL, code) + schema = StringItemSchema(description=_("循环变量")) + + def get_value(self): + # 循环节点因引用 + if hasattr(self, "inner_loop") and self.inner_loop != -1: + return self.value.split(",")[self.inner_loop - 1] + # 普通节点引用 + return self.value diff --git a/bkflow/pipeline_web/parser/validator.py b/bkflow/pipeline_web/parser/validator.py index 9182c4f803..29c2ae6052 100644 --- a/bkflow/pipeline_web/parser/validator.py +++ b/bkflow/pipeline_web/parser/validator.py @@ -42,6 +42,15 @@ def validate_web_pipeline_tree(web_pipeline_tree): # constants key pattern validate key_validation_errors = [] context_values = [] + + for name, act in web_pipeline_tree["activities"].items(): + loop_config = act.get("loop_config", {}) + if not loop_config.get("enable", False): + continue + loop_params = loop_config.get("loop_params", []) + if loop_config["loop_times"] != min([len(param.split(",")) for key, param in loop_params.items()]): + raise exceptions.ParserWebTreeException("loop times not matched") + classification = classify_constants(web_pipeline_tree["constants"], is_subprocess=False) for key, const in web_pipeline_tree["constants"].items(): key_value = const.get("key") diff --git a/bkflow/task/operations.py b/bkflow/task/operations.py index aee5a6eb3d..2622b546aa 100644 --- a/bkflow/task/operations.py +++ b/bkflow/task/operations.py @@ -398,14 +398,16 @@ def retry(self, operator: str, *args, **kwargs) -> OperationResult: api_result = bamboo_engine_api.get_data(runtime=self.runtime, node_id=self.node_id) if not api_result.result: return api_result + loop_retry = kwargs.get("loop", False) return bamboo_engine_api.retry_node( - runtime=self.runtime, node_id=self.node_id, data=kwargs.get("inputs") or None + runtime=self.runtime, node_id=self.node_id, data=kwargs.get("inputs") or None, loop_retry=loop_retry ) @record_operation(RecordType.task_node.name, TaskOperationType.skip.name, TaskOperationSource.app.name) @uniform_task_operation_result def skip(self, operator: str, *args, **kwargs) -> OperationResult: - return bamboo_engine_api.skip_node(runtime=self.runtime, node_id=self.node_id) + loop_skip = kwargs.get("loop", False) + return bamboo_engine_api.skip_node(runtime=self.runtime, node_id=self.node_id, loop_skip=loop_skip) @record_operation(RecordType.task_node.name, TaskOperationType.callback.name, TaskOperationSource.api.name) @uniform_task_operation_result @@ -461,8 +463,9 @@ def get_node_detail( detail = detail[self.node_id] # 默认只请求最后一次循环结果 format_bamboo_engine_status(detail) + node_info = self.runtime.get_node(self.node_id) if loop is None or int(loop) >= detail["loop"]: - loop = detail["loop"] + loop = detail["loop"] if not node_info.loop_strategy else -1 hist_result = bamboo_engine_api.get_node_histories(runtime=runtime, node_id=self.node_id, loop=loop) if not hist_result: logger.exception("bamboo_engine_api.get_node_histories fail") @@ -482,8 +485,21 @@ def get_node_detail( detail["version"] = hist_result.data[-1]["version"] for hist in detail["histories"]: - # 重试记录必然是因为失败才重试 - hist.setdefault("state", bamboo_engine_states.FAILED) + raw_inputs = hist["inputs"].get("subprocess") + if raw_inputs: + inputs = raw_inputs["constants"] + inputs = {key[2:-1]: value.get("value") for key, value in inputs.items()} + hist["inputs"] = inputs + + # 重试记录必然是因为失败才重试,设置了循环策略的节点只有成功才能接着循环 + if node_info.loop_strategy: + if hist["skip"] or hist["outputs"].get("_result"): + state = bamboo_engine_states.FINISHED + else: + state = bamboo_engine_states.FAILED + else: + state = bamboo_engine_states.FAILED + hist.setdefault("state", state) hist["history_id"] = hist["id"] format_bamboo_engine_status(hist) # 节点未执行 diff --git a/packages/bamboo_engine-2.11.1rc1-py3-none-any.whl b/packages/bamboo_engine-2.11.1rc1-py3-none-any.whl new file mode 100644 index 0000000000..6ae869e429 Binary files /dev/null and b/packages/bamboo_engine-2.11.1rc1-py3-none-any.whl differ diff --git a/packages/bamboo_pipeline-3.29.6rc1-py3-none-any.whl b/packages/bamboo_pipeline-3.29.6rc1-py3-none-any.whl new file mode 100644 index 0000000000..812fa05c4d Binary files /dev/null and b/packages/bamboo_pipeline-3.29.6rc1-py3-none-any.whl differ diff --git a/requirements.txt b/requirements.txt index 0c50095974..42680d437f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,7 +35,9 @@ bk-notice-sdk==1.3.0 # engine service boto3==1.26.133 -bamboo-pipeline==3.29.5 +# bamboo-pipeline==3.29.5 +./packages/bamboo_engine-2.11.1rc1-py3-none-any.whl +./packages/bamboo_pipeline-3.29.6rc1-py3-none-any.whl pydantic==1.10.6 django-extensions==3.2.1