diff --git a/frinx/common/util.py b/frinx/common/util.py index 4257ef4..4f8fd19 100644 --- a/frinx/common/util.py +++ b/frinx/common/util.py @@ -1,11 +1,15 @@ import json from typing import Any +from pydantic import BaseModel +from pydantic import ValidationError from requests import JSONDecodeError from requests import Response from frinx.common.type_aliases import DictAny +from frinx.common.type_aliases import DictStr from frinx.common.type_aliases import ListAny +from frinx.common.type_aliases import ListStr def jsonify_description( @@ -51,3 +55,71 @@ def parse_response(response: Response) -> DictAny | ListAny | str: return response.json() # type: ignore[no-any-return] except JSONDecodeError: return response.text + + +def json_parse(errors: ListStr | None = None, **kwargs: str) -> tuple[ListAny | DictAny | None, ListStr]: + """ + Parse json, returns object and errors list as tuple. + + args: + - errors: (optional), list of error messages, default is empty list + kwargs: + - =: name of json-object and json-formatted-string (optional, + but necessary for functionality) + e.g.: ip_addresses='["127.0.0.1", "127.0.0.2]', this approach enables to format to error + messages this way: 'Object `ip_addresses`: (Not JSON valid. ... .)' + returns: + parsed-json-object, errors-list + """ + def _format_err_msg(obj: str, msg: str) -> str: + return f'Object `{obj}`: ({msg})' + + json_string, object_name = next(iter(kwargs.values())), next(iter(kwargs.keys())) + errors = [] if errors is None else errors + + if not len(json_string): + errors.append(_format_err_msg(object_name, 'Cannot parse empty string.')) + return None, errors + try: + return json.loads(json_string), errors + except json.JSONDecodeError as e: + errors.append(_format_err_msg(object_name, f'Not JSON valid. {e.args[0]}.')) + return None, errors + + +def validate_structure(obj: DictAny, model: type[BaseModel], *, idx: int | None = None, +properties: DictStr = {}, errors: ListStr | None = None) -> list[str]: + """ + Validate structure of object based on pydantic model, return errors if occurs. + + args: + - obj: (required), python object (parsed json) to compare with pydantic model + - model: (required), pydantic model to use as structure template for obj + kwargs: + - idx: (optional), index represents position of nested object in list, used in _format_err_msg + - properties: (optional), dictionary of model properties and expected values of properties, + enables value checking of obj + - errors: (optional), list of error messages, default is empty list + returns: + errors-list + """ + + def _format_err_msg(prop: str, obj: str, msg: str) -> str: + return f'Property `{prop}` of `{obj}{{}}`, ({msg})'.format(f'[{idx}]' if idx or idx == 0 else '') + + errors = [] if errors is None else errors + try: + model.parse_obj(obj) + except ValidationError as validation_err: + # TODO: how to handle strict and extra ? + # maybe skip and print in log as WARNING (extra) + # NOTE: try to use pydantic strict types if needed, v1 doesn't support BaseConfig.strict + for err in validation_err.errors(): + errors.append(_format_err_msg(str(err['loc'][0]), model.__class__.__name__, err['msg'])) + + for key, value in properties.items(): + if obj.get(key, None) != value: + errors.append( + _format_err_msg(key, model.__class__.__name__, f'expected value `{value}`, get `{obj[key]}`') + ) + return errors diff --git a/frinx/common/workflow/task.py b/frinx/common/workflow/task.py index 0865fde..757d165 100644 --- a/frinx/common/workflow/task.py +++ b/frinx/common/workflow/task.py @@ -125,6 +125,7 @@ class Config: @root_validator(pre=True) def check_input_values(cls, values: dict[str, Any]) -> Any: + # FIXME: AttributeError: 'str' object has no attribute '__fields__' values['dynamic_tasks'] = values['dynamic_tasks'].__fields__['name'].default return values diff --git a/frinx/workers/utils_workers/utils_service.py b/frinx/workers/utils_workers/utils_service.py new file mode 100644 index 0000000..d636d8b --- /dev/null +++ b/frinx/workers/utils_workers/utils_service.py @@ -0,0 +1,110 @@ +from typing import Any + +from pydantic import BaseModel +from pydantic import Extra +from pydantic.types import StrictInt +from pydantic.types import StrictStr + +from frinx.common.conductor_enums import TaskResultStatus +from frinx.common.type_aliases import DictAny +from frinx.common.type_aliases import ListAny +from frinx.common.type_aliases import ListStr +from frinx.common.util import json_parse +from frinx.common.util import snake_to_camel_case +from frinx.common.util import validate_structure +from frinx.common.worker.service import ServiceWorkersImpl +from frinx.common.worker.task_def import TaskDefinition +from frinx.common.worker.task_def import TaskInput +from frinx.common.worker.task_def import TaskOutput +from frinx.common.worker.task_result import TaskResult +from frinx.common.worker.worker import WorkerImpl + + +# TODO: move to other file, e.g. local/util.py +class DynamicTask(BaseModel): + class _SubWorkflowParam(BaseModel): + name: StrictStr + version: StrictInt + + class Config: # type: ignore[pydantic-alias] + alias_generator = snake_to_camel_case + extra = Extra.forbid + + name: StrictStr + task_reference_name: StrictStr + type: StrictStr + sub_workflow_param: _SubWorkflowParam + + +class UtilsService(ServiceWorkersImpl): + + class JSONParse(WorkerImpl): + + class WorkerDefinition(TaskDefinition): + name: str = 'JSON_parse' + description: str = 'Returns object from JSON or errors if occurs.' + labels: ListStr = ['UTILS'] + + class WorkerInput(TaskInput): + input: str + + class WorkerOutput(TaskOutput): + result: DictAny | ListAny | None + errors: ListStr | None + + def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: + result, errors = json_parse(input=worker_input.input) + + return TaskResult( + status=TaskResultStatus.COMPLETED, + output=self.WorkerOutput( + result=result, + errors=errors or None + ) + ) + + class ForkJoinInputValidator(WorkerImpl): + + class WorkerDefinition(TaskDefinition): + name: str = 'fork_join_input_validator' + description: str = 'Input validator for Dynamic_Fork / 1.' + labels: ListStr = ['UTILS'] + + class WorkerInput(TaskInput): + dynamic_tasks: str + expected_type: str + expected_name: str + dynamic_tasks_input: str + + class WorkerOutput(TaskOutput): + result: DictAny | ListAny | None + errors: ListStr | None + + def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: + # TODO: validate also dynamic_tasks_input + dynamic_tasks, errors = json_parse(dynamic_tasks=worker_input.dynamic_tasks) + dynamic_tasks_input, errors = json_parse( + errors, dynamic_tasks_input=worker_input.dynamic_tasks_input) + + if dynamic_tasks: + for idx, dyn_task in enumerate(dynamic_tasks): + errors = validate_structure( + dyn_task, DynamicTask, + properties={ + 'type': worker_input.expected_type, + 'name': worker_input.expected_name + }, + errors=errors, + idx=idx + ) + + return TaskResult( + status=TaskResultStatus.COMPLETED, + output=self.WorkerOutput( + result={ + 'dynamic_tasks': dynamic_tasks, + 'dynamic_tasks_input': dynamic_tasks_input + } if not errors else None, + errors=errors or None + ) + ) diff --git a/frinx/workflows/dynamic_fork/__init__.py b/frinx/workflows/dynamic_fork/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py b/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py new file mode 100644 index 0000000..cd24066 --- /dev/null +++ b/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py @@ -0,0 +1,109 @@ +from frinx.common.conductor_enums import WorkflowStatus +from frinx.common.type_aliases import ListStr +from frinx.common.workflow.service import ServiceWorkflowsImpl +from frinx.common.workflow.task import DecisionTask +from frinx.common.workflow.task import DecisionTaskInputParameters +from frinx.common.workflow.task import DynamicForkTask +from frinx.common.workflow.task import DynamicForkTaskInputParameters +from frinx.common.workflow.task import JoinTask +from frinx.common.workflow.task import SimpleTask +from frinx.common.workflow.task import SimpleTaskInputParameters +from frinx.common.workflow.task import TerminateTask +from frinx.common.workflow.task import TerminateTaskInputParameters +from frinx.common.workflow.workflow import FrontendWFInputFieldType +from frinx.common.workflow.workflow import WorkflowImpl +from frinx.common.workflow.workflow import WorkflowInputField +from frinx.workers.utils_workers.utils_service import UtilsService + + +class DynamicForkWFService(ServiceWorkflowsImpl): + + class DynamicFork(WorkflowImpl): + name: str = 'Dynamic_fork' + description: str = 'A dynamic fork + join task with input validation' + version: int = 1 + labels: ListStr = ['BASICS', 'UTILS'] + + class WorkflowInput(WorkflowImpl.WorkflowInput): + dynamic_tasks: WorkflowInputField = WorkflowInputField( + name='dynamic_tasks', + description='Tasks or sub-workflows running in parallel.', + type=FrontendWFInputFieldType.TEXTAREA, + frontend_default_value=None + ) + + expected_name: WorkflowInputField = WorkflowInputField( + name='expectedName', + description='Expected name of sub-tasks or sub-workflows.', + type=FrontendWFInputFieldType.STRING, + frontend_default_value=None + ) + + expected_type: WorkflowInputField = WorkflowInputField( + name='expectedType', + description='Expected type of sub-tasks or sub-workflows.', + type=FrontendWFInputFieldType.STRING, + frontend_default_value=None + ) + + dynamic_tasks_input: WorkflowInputField = WorkflowInputField( + name='dynamic_tasks_input', + description='Inputs to sub-tasks or sub-workflows.', + type=FrontendWFInputFieldType.TEXTAREA, + frontend_default_value=None + ) + + class WorkflowOutput(WorkflowImpl.WorkflowOutput): + status: WorkflowStatus + + def workflow_builder(self, workflow_inputs: WorkflowInput) -> None: + validation = SimpleTask( + name=UtilsService.ForkJoinInputValidator, + task_reference_name='validation', + input_parameters=SimpleTaskInputParameters( + dynamic_tasks=workflow_inputs.dynamic_tasks.wf_input, + expected_name=workflow_inputs.expected_name.wf_input, + expected_type=workflow_inputs.expected_type.wf_input, + dynamic_tasks_input=workflow_inputs.dynamic_tasks_input.wf_input + ) + ) + + termination = TerminateTask( + name='terminateTask', + task_reference_name='termination', + input_parameters=TerminateTaskInputParameters( + termination_status=WorkflowStatus.FAILED, + termination_reason=validation.output_ref('errors'), + workflow_output={'output': validation.output_ref('errors')} + ) + ) + + dynamic_fork = DynamicForkTask( + name='dynamicForkTask', + task_reference_name='dynamic_fork', + dynamic_fork_tasks_param='dynamic_tasks', + dynamic_fork_tasks_input_param_name='dynamic_tasks_input', + input_parameters=DynamicForkTaskInputParameters( + dynamic_tasks=validation.output_ref('result.dynamic_tasks'), + dynamic_tasks_input=validation.output_ref('result.dynamic_tasks_input') + ) + ) + + decision = DecisionTask( + name='decisionTask', + task_reference_name='decision', + default_case=[ + dynamic_fork, + JoinTask( + name='joinTask', + task_reference_name='join' + ) + ], + decision_cases={'termination': [termination]}, + case_expression='$.errors ? "termination" : "default"', + input_parameters=DecisionTaskInputParameters( + errors=validation.output_ref('errors') + ) + ) + + self.tasks = [validation, decision]