Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from bkflow.contrib.api.collections.interface import InterfaceModuleClient
from bkflow.exceptions import ValidationError
from bkflow.pipeline_plugins.components.collections.base import BKFlowBaseService
from bkflow.task.utils import push_task_to_queue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

整体方案还需要补充一些可观测性的指标,来发现当前系统中,某些流程的队列在不断增加,有达到上限的风险,这个可以通过with start_trace里注入流程当前队列长度来实现



class Subprocess(BaseModel):
Expand Down Expand Up @@ -138,7 +139,7 @@ def _create_subprocess_task_instance(self, subprocess, template, pipeline_tree,
from bkflow.task.utils import extract_extra_info

with transaction.atomic():
time_zone = timezone.pytz.timezone(settings.TIME_ZONE) or "Asia/Shanghai"
time_zone = timezone.pytz.timezone(settings.TIME_ZONE)
time_stamp = datetime.datetime.now(tz=time_zone).strftime("%Y%m%d%H%M%S")
create_task_data = {
"name": f"{subprocess.subprocess_name}_子流程_{time_stamp}",
Expand All @@ -165,7 +166,7 @@ def _create_subprocess_task_instance(self, subprocess, template, pipeline_tree,
except TaskFlowRelation.DoesNotExist:
root_task_id = parent_task.id

relate_info = {"node_id": self.id, "node_version": self.version}
relate_info = {"node_id": self.id, "node_version": self.version, "parent_task_id": parent_task.id}
TaskFlowRelation.objects.create(
task_id=task_instance.id,
parent_task_id=parent_task.id,
Expand All @@ -189,6 +190,7 @@ def _create_subprocess_task_instance(self, subprocess, template, pipeline_tree,
def plugin_execute(self, data, parent_data):
from bkflow.task.models import TaskInstance
from bkflow.task.operations import TaskOperation
from bkflow.task.utils import count_running_tasks

parent_task_id = parent_data.get_one_of_inputs("task_id")
try:
Expand All @@ -210,6 +212,16 @@ def plugin_execute(self, data, parent_data):

# 设置输出并启动任务
data.set_outputs("task_id", task_instance.id)
interface_client = InterfaceModuleClient()
space_infos_result = interface_client.get_space_infos(
{"space_id": task_instance.space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
concurrency_control = space_configs.get("concurrency_control", 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在代码里会越来越多需要读取空间配置来做调度控制的逻辑,这里获取空间配置的逻辑最好抽象一下,并作为后台worker的一个全局单例来复用配置,空间配置不是一个经常变的数据,可以适当加一个缓存(1分钟),来防止任务并发量大后,频繁调接口访问interface的问题


if concurrency_control and count_running_tasks(task_instance) >= int(concurrency_control):
push_task_to_queue(settings.redis_inst, task_instance, "start")
return True
task_operation = TaskOperation(task_instance=task_instance, queue=settings.BKFLOW_MODULE.code)
operation_method = getattr(task_operation, "start", None)
if operation_method is None:
Expand Down
18 changes: 18 additions & 0 deletions bkflow/space/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,24 @@ def validate(cls, value: dict):
return SpacePluginConfigParser(config=value).is_valid()


# Flow concurrency control (space-level scheduling config)
class ConcurrencyControlConfig(BaseSpaceConfig):
    """Space config limiting how many tasks of one template may run concurrently.

    A value of 0 (the default) disables concurrency control; any configured
    value must be at least ``LEAST_NUMBER``.
    """

    name = "concurrency_control"
    desc = _("流程并发控制")
    default_value = 0
    LEAST_NUMBER = 1
    control = True

    @classmethod
    def validate(cls, value: str):
        """Validate a configured concurrency limit.

        Raises ValidationError when the value is not an integer string or is
        below ``LEAST_NUMBER``; returns True otherwise.
        """
        try:
            number = int(value)
        except (TypeError, ValueError):
            # Without this guard a non-numeric value would escape as a raw
            # ValueError instead of the project's ValidationError.
            raise ValidationError(
                f"[validate concurrency control error]: concurrency control only support {cls.LEAST_NUMBER}"
            )
        if number < cls.LEAST_NUMBER:
            raise ValidationError(
                f"[validate concurrency control error]: concurrency control only support {cls.LEAST_NUMBER}"
            )

        return True


# 定义 SCHEMA_V1 对应的模型
class SchemaV1Model(BaseModel):
meta_apis: str
Expand Down
18 changes: 17 additions & 1 deletion bkflow/task/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@
from bkflow.task.node_timeout import node_timeout_handler
from bkflow.task.operations import TaskNodeOperation, TaskOperation
from bkflow.task.serializers import CreateTaskInstanceSerializer
from bkflow.task.utils import ATOM_FAILED, redis_inst_check, send_task_instance_message
from bkflow.task.utils import (
ATOM_FAILED,
count_running_tasks,
push_task_to_queue,
redis_inst_check,
send_task_instance_message,
)

logger = logging.getLogger("celery")

Expand Down Expand Up @@ -192,6 +198,16 @@ def bkflow_periodic_task_start(*args, **kwargs):
},
}
)
interface_client = InterfaceModuleClient()
space_infos_result = interface_client.get_space_infos(
{"space_id": task_instance.space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
concurrency_control = space_configs.get("concurrency_control", 0)

if concurrency_control and count_running_tasks(task_instance) >= int(concurrency_control):
push_task_to_queue(settings.redis_inst, task_instance, "start")
return

task_operation = TaskOperation(task_instance=task_instance, queue=settings.BKFLOW_MODULE.code)
operation_method = getattr(task_operation, "start")
Expand Down
22 changes: 19 additions & 3 deletions bkflow/task/domains/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
from pipeline.eri.models import Schedule as DBSchedule
from pipeline.eri.runtime import BambooDjangoRuntime

from bkflow.task.models import TaskFlowRelation
from bkflow.contrib.api.collections.interface import InterfaceModuleClient
from bkflow.task.models import TaskInstance
from bkflow.task.utils import count_running_tasks, push_task_to_queue
from bkflow.utils.redis_lock import redis_lock

logger = logging.getLogger("root")


class TaskCallBacker:
def __init__(self, task_id, *args, **kwargs):
def __init__(self, task_id, task_relate, *args, **kwargs):
self.task_id = task_id
self.task_relate = TaskFlowRelation.objects.filter(task_id=self.task_id).first()
self.task_relate = task_relate
self.extra_info = {"task_id": self.task_id, **self.task_relate.extra_info, **kwargs}

def check_record_existence(self):
Expand Down Expand Up @@ -68,6 +70,20 @@ def subprocess_callback(self):
runtime.set_state(node_id=node_id, version=version, to_state=states.READY)
runtime.set_state(node_id=node_id, version=version, to_state=states.RUNNING)

parent_task_id = self.extra_info["parent_task_id"]
parent_task = TaskInstance.objects.filter(id=parent_task_id).first()

interface_client = InterfaceModuleClient()
space_infos_result = interface_client.get_space_infos(
{"space_id": parent_task.space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
concurrency_control = space_configs.get("concurrency_control", 0)

if concurrency_control and count_running_tasks(parent_task) > int(concurrency_control):
push_task_to_queue(settings.redis_inst, parent_task, "callback")
return True

bamboo_engine_api.callback(runtime=runtime, node_id=node_id, version=version, data=self.extra_info)

except Exception as e:
Expand Down
6 changes: 5 additions & 1 deletion bkflow/task/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ class TaskInstanceSerializer(serializers.ModelSerializer):
create_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z")
start_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z")
finish_time = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S%z")
is_wait = serializers.SerializerMethodField()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里改成is_waiting吧


class Meta:
model = TaskInstance
fields = "__all__"
exclude = ["extra_info"]
read_only_fields = (
"id",
"instance_id",
Expand All @@ -141,6 +142,9 @@ class Meta:
"tree_info_id",
)

def get_is_wait(self, instance):
return instance.extra_info.get("is_waiting", False)


class RetrieveTaskInstanceSerializer(TaskInstanceSerializer):
pipeline_tree = serializers.SerializerMethodField()
Expand Down
11 changes: 9 additions & 2 deletions bkflow/task/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
TaskInstance,
TimeoutNodeConfig,
)
from bkflow.task.utils import ATOM_FAILED, TASK_FINISHED, redis_inst_check
from bkflow.task.utils import (
ATOM_FAILED,
TASK_FINISHED,
process_task_from_queue,
redis_inst_check,
)

logger = logging.getLogger("root")

Expand Down Expand Up @@ -99,6 +104,7 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,
queue=f"task_common_{settings.BKFLOW_MODULE.code}",
routing_key=f"task_common_{settings.BKFLOW_MODULE.code}",
)
process_task_from_queue(settings.redis_inst, root_id)
elif to_state == bamboo_engine_states.REVOKED and node_id == root_id:
try:
TaskInstance.objects.set_revoked(root_id)
Expand All @@ -119,6 +125,7 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,
routing_key=f"task_common_{settings.BKFLOW_MODULE.code}",
)
_check_and_callback(root_id, task_success=True)
process_task_from_queue(settings.redis_inst, root_id)

try:
_node_timeout_info_update(settings.redis_inst, to_state, node_id, version)
Expand All @@ -143,7 +150,7 @@ def task_callback(task_id, retry_times=0, *args, **kwargs):
task_relate = TaskFlowRelation.objects.filter(task_id=task_id).first()
if not task_relate:
return
tcb = TaskCallBacker(task_id, *args, **kwargs)
tcb = TaskCallBacker(task_id, task_relate, *args, **kwargs)
if not tcb.check_record_existence():
message = f"[task_callback] task_id {task_id} does not in TaskCallBackRecord."
logger.error(message)
Expand Down
69 changes: 69 additions & 0 deletions bkflow/task/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,72 @@ def extract_extra_info(constants, keys=None):
for key in list(constants.keys()) if not keys else keys:
extra_info.update({key: {"name": constants[key]["name"], "value": constants[key]["value"]}})
return json.dumps(extra_info, ensure_ascii=False)


@redis_inst_check
def push_task_to_queue(redis_cli, task, operation, node_id=None, data=None, max_queue_length=None):
    """Park a task operation on the per-template Redis wait queue.

    Used when a space's concurrency limit is reached: instead of dispatching
    the operation, it is serialized onto the list ``task_wait_<template_id>``
    and the task is flagged ``is_waiting`` in its extra_info.

    :param redis_cli: Redis client used for the queue.
    :param task: task instance being deferred (read: id, template_id, extra_info).
    :param operation: operation name to replay later (e.g. "start", "retry").
    :param node_id: optional node id for node-level operations.
    :param data: optional payload forwarded to the node operation.
    :param max_queue_length: optional cap on the queue size; when set and the
        queue is already at/over the cap, nothing is pushed and False is
        returned so callers can surface an error to the user (reviewer-flagged
        protection against unbounded Redis growth). Default None keeps the
        original unbounded behavior.
    :return: True when the task was queued, False when rejected by the cap.
    """
    redis_key = f"task_wait_{task.template_id}"

    if max_queue_length is not None and redis_cli.llen(redis_key) >= max_queue_length:
        return False

    task_data = {"operation": operation, "task_id": task.id}
    if node_id:
        task_data["node_id"] = node_id
    if data:
        task_data["node_data"] = data
    redis_cli.rpush(redis_key, json.dumps(task_data))

    # Mark the task as waiting so serializers can expose the state to users.
    task.extra_info.update({"is_waiting": True})
    task.save()
    return True


@redis_inst_check
def process_task_from_queue(redis_cli, instance_id):
    """Pop and dispatch the next queued operation for a template's wait queue.

    Called when a task of the template finishes, freeing a concurrency slot.
    Looks up the finished task's template, pops one queued entry (if any),
    replays its operation, and clears the ``is_waiting`` flag.

    :param redis_cli: Redis client holding the wait queue.
    :param instance_id: pipeline instance id of the task that just finished.
    :return: the dequeued TaskInstance, or None when the queue is empty.
    """
    from bkflow.task.models import TaskInstance
    from bkflow.task.operations import TaskNodeOperation, TaskOperation

    template_id = TaskInstance.objects.get(instance_id=instance_id).template_id
    redis_key = f"task_wait_{template_id}"
    # NOTE(review): once popped the entry is gone — if dispatch below fails the
    # queued task is lost. An atomic claim (e.g. move to a processing list and
    # ack after success) would be safer; TODO confirm with the queue design.
    task_json = redis_cli.lpop(redis_key)
    if not task_json:
        return None

    task_data = json.loads(task_json)
    operation = task_data.get("operation")
    task_instance = TaskInstance.objects.get(id=task_data.get("task_id"))
    if operation in ("start", "resume"):
        operation_holder = TaskOperation(task_instance, settings.BKFLOW_MODULE.code)
    else:
        operation_holder = TaskNodeOperation(task_instance, task_data.get("node_id"))

    operation_method = getattr(operation_holder, operation, None)
    if operation_method is None:
        # Fail loudly: the original code would raise an opaque TypeError by
        # calling None when an unknown operation name was queued.
        raise ValueError(f"[process_task_from_queue] unsupported operation: {operation}")

    # NOTE(review): the operation name is passed as ``operator`` here, whereas
    # the API views pass a username — presumably a system-actor marker; verify.
    operation_method(operator=operation, **task_data.get("node_data", {}))

    task_instance.extra_info.update({"is_waiting": False})
    task_instance.save()
    return task_instance


def count_running_tasks(task_instance, limit=None):
    """Count RUNNING tasks sharing *task_instance*'s space and template.

    Queries the engine state of every candidate task, so cost grows with the
    number of unfinished tasks (reviewer-flagged as heavy under high
    concurrency; a cached counter would scale better — TODO confirm).

    :param task_instance: task whose space_id/template_id scope the count.
    :param limit: optional early-exit bound. When set, counting stops as soon
        as ``limit`` running tasks are found — callers that only compare the
        count against a threshold avoid scanning the full set. Default None
        preserves the original exact-count behavior.
    :return: number of tasks currently in state RUNNING (capped at ``limit``
        when provided).
    """
    from bkflow.task.models import TaskInstance
    from bkflow.task.operations import TaskOperation

    candidates = TaskInstance.objects.filter(
        space_id=task_instance.space_id,
        template_id=task_instance.template_id,
        is_deleted=False,
        is_started=True,
        is_finished=False,
    )

    running = 0
    # Stream the queryset instead of materializing a list of state results,
    # and avoid shadowing the ``task_instance`` parameter as the original did.
    for candidate in candidates:
        state_result = TaskOperation(task_instance=candidate).get_task_states()
        if state_result.result is False:
            # Unreadable state: skip, matching the original lenient behavior.
            continue
        if state_result.data.get("state") == "RUNNING":
            running += 1
            if limit is not None and running >= limit:
                break

    return running
37 changes: 35 additions & 2 deletions bkflow/task/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
TaskOperationType,
TaskTriggerMethod,
)
from bkflow.contrib.api.collections.interface import InterfaceModuleClient
from bkflow.contrib.openapi.serializers import (
EmptyBodySerializer,
GetNodeDetailQuerySerializer,
Expand Down Expand Up @@ -68,6 +69,7 @@
TaskOperationRecordSerializer,
UpdatePeriodicTaskSerializer,
)
from bkflow.task.utils import count_running_tasks, push_task_to_queue
from bkflow.utils.handlers import handle_plain_log
from bkflow.utils.mixins import BKFLOWCommonMixin
from bkflow.utils.permissions import AdminPermission, AppInternalPermission
Expand Down Expand Up @@ -194,6 +196,21 @@ def operate(self, request, operation, *args, **kwargs):
template_id=task_instance.template_id,
executor=task_instance.executor,
):

interface_client = InterfaceModuleClient()
space_infos_result = interface_client.get_space_infos(
{"space_id": task_instance.space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
concurrency_control = space_configs.get("concurrency_control", 0)

if (
concurrency_control
and count_running_tasks(task_instance) >= int(concurrency_control)
and operation in ["start", "resume"]
):
push_task_to_queue(settings.redis_inst, task_instance, operation)
return Response({"result": True, "data": None, "message": "success"})
task_operation = TaskOperation(task_instance=task_instance, queue=settings.BKFLOW_MODULE.code)
operation_method = getattr(task_operation, operation, None)
if operation_method is None:
Expand Down Expand Up @@ -222,12 +239,28 @@ def node_operate(self, request, node_id, operation, *args, **kwargs):
):
if task_instance.trigger_method == TaskTriggerMethod.subprocess.name and operation in ["skip", "retry"]:
task_instance.change_parent_task_node_state_to_running()
data = request.data
operator = data.pop("operator", request.user.username)
interface_client = InterfaceModuleClient()
space_infos_result = interface_client.get_space_infos(
{"space_id": task_instance.space_id, "config_names": "concurrency_control"}
)
space_configs = space_infos_result.get("data", {}).get("configs", {})
concurrency_control = space_configs.get("concurrency_control", 0)

if (
concurrency_control
and count_running_tasks(task_instance) >= int(concurrency_control)
and operation in ["skip", "retry"]
):
push_task_to_queue(settings.redis_inst, task_instance, operation, node_id, data)
return Response({"result": True, "data": None, "message": "success"})

node_operation = TaskNodeOperation(task_instance=task_instance, node_id=node_id)
operation_method = getattr(node_operation, operation, None)
if operation_method is None:
raise ValidationError("node operation not found")
data = request.data
operator = data.pop("operator", request.user.username)

operation_result = operation_method(operator=operator, **data)
return Response(dict(operation_result))

Expand Down
4 changes: 2 additions & 2 deletions tests/interface/space/test_space_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
class TestSpaceConfigHandler:
def test_get_all_configs(self):
configs = SpaceConfigHandler.get_all_configs()
assert len(configs) == 11
assert len(configs) == 12
configs = SpaceConfigHandler.get_all_configs(only_public=True)
assert len(configs) == 10
assert len(configs) == 11

def test_get_config(self):
# valid cases
Expand Down
Loading