From 12e7e826cb014b2a3b4fa92e45cad6217510482e Mon Sep 17 00:00:00 2001 From: Martin Revaj Date: Tue, 22 Aug 2023 11:15:08 +0200 Subject: [PATCH 1/4] input_handler, fork-join-wf added --- frinx/common/workflow/task.py | 1 + .../wf_input_handlers_service.py | 115 ++++++++++++++++ frinx/workflows/dynamic_fork/__init__.py | 0 .../dynamic_fork/dynamic_fork_workflows.py | 126 ++++++++++++++++++ main.py | 53 ++++++++ 5 files changed, 295 insertions(+) create mode 100644 frinx/workers/wf_input_handlers/wf_input_handlers_service.py create mode 100644 frinx/workflows/dynamic_fork/__init__.py create mode 100644 frinx/workflows/dynamic_fork/dynamic_fork_workflows.py create mode 100644 main.py diff --git a/frinx/common/workflow/task.py b/frinx/common/workflow/task.py index 0865fde..757d165 100644 --- a/frinx/common/workflow/task.py +++ b/frinx/common/workflow/task.py @@ -125,6 +125,7 @@ class Config: @root_validator(pre=True) def check_input_values(cls, values: dict[str, Any]) -> Any: + # FIXME: AttributeError: 'str' object has no attribute '__fields__' values['dynamic_tasks'] = values['dynamic_tasks'].__fields__['name'].default return values diff --git a/frinx/workers/wf_input_handlers/wf_input_handlers_service.py b/frinx/workers/wf_input_handlers/wf_input_handlers_service.py new file mode 100644 index 0000000..00db69d --- /dev/null +++ b/frinx/workers/wf_input_handlers/wf_input_handlers_service.py @@ -0,0 +1,115 @@ +import json +from typing import Any + +from frinx.common.type_aliases import ListStr +from frinx.common.type_aliases import ListAny +from frinx.common.type_aliases import DictAny +from frinx.common.worker.service import ServiceWorkersImpl +from frinx.common.worker.task_result import TaskResult +from frinx.common.conductor_enums import TaskResultStatus +from frinx.common.worker.worker import WorkerImpl +from frinx.common.worker.task_def import TaskExecutionProperties +from frinx.common.worker.task_def import TaskDefinition +from frinx.common.worker.task_def import TaskInput +from frinx.common.worker.task_def import TaskOutput + + +# TODO: rename +class WFInputHandlersService(ServiceWorkersImpl): + + class JSONTransformTask(WorkerImpl): + + class WorkerDefinition(TaskDefinition): + name: str = 'JSON_transform_task' + description: str = 'Returns an object from JSON string.' + labels: ListStr = ['UTILS'] + + class WorkerInput(TaskInput): + input: str + + class WorkerOutput(TaskOutput): + result: DictAny | ListAny | None + errors: ListStr | None + + def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: + result, errors = None, None + if worker_input.input: + try: + result = json.loads(worker_input.input) + except json.JSONDecodeError: + errors = ['Property < input > is JSON invalid format.'] + else: + errors = ['Cannot parse empty string.'] + + return TaskResult( + status=TaskResultStatus.COMPLETED, + output=self.WorkerOutput(result=result, errors=errors) + ) + + class ForkJoinWFInputHandler(WorkerImpl): + + class WorkerDefinition(TaskDefinition): + name: str = 'fork_join_wf_input_handler' + description: str = 'Input handler and validator for Dynamic_Fork_Join workflow.' + labels: ListStr = ['UTILS'] + + class WorkerInput(TaskInput): + dynamic_tasks: str + expected_type: str + expected_name: str + dynamic_tasks_input: str + + class WorkerOutput(TaskOutput): + result: DictAny | ListAny | None + errors: ListStr | None + + # TODO: REFACTOR !!! + def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: + # def _unixp_err(name: str, type: str) -> str: + # return f'Dynamic tasks contain unexpected name "{name}" or expected type "{type}"' + + _unixp_err = lambda n, t: \ + f'Dynamic tasks contain unexpected name < {n} > or expected type < {t} >' + + errors = [] + + if worker_input.dynamic_tasks: + try: + dynamic_tasks = json.loads(worker_input.dynamic_tasks) + except json.JSONDecodeError: + errors.append('Property < dynamic_tasks > is JSON invalid format.') + else: + errors.append('Cannot parse empty string, property < dynamic_tasks > is empty.') + if worker_input.dynamic_tasks_input: + try: + dynamic_tasks_input = json.loads(worker_input.dynamic_tasks_input) + except json.JSONDecodeError: + errors.append('Property < dynamic_tasks_input > is JSON invalid format.') + else: + errors.append('Cannot parse empty string, property < dynamic_tasks_input > is empty.') + try: + for task in dynamic_tasks: + match task['name'], task['type']: + case worker_input.expected_name, worker_input.expected_type: + continue + case _: + errors.append( + _unixp_err( + worker_input.expected_name, + worker_input.expected_type + ) + ) + break + except KeyError as err: + errors.append(f'Missing property < {err.args[0]} > in < dynamic_tasks >.') + + return TaskResult( + status=TaskResultStatus.COMPLETED, + output=self.WorkerOutput( + result={ + 'dynamic_tasks': dynamic_tasks, + 'dynamic_tasks_input': dynamic_tasks_input + } if not errors else None, + errors=errors or None + ) + ) diff --git a/frinx/workflows/dynamic_fork/__init__.py b/frinx/workflows/dynamic_fork/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/frinx/workflows/dynamic_fork/dynamic_fork_workflows.py b/frinx/workflows/dynamic_fork/dynamic_fork_workflows.py new file mode 100644 index 0000000..03a2712 --- /dev/null +++ b/frinx/workflows/dynamic_fork/dynamic_fork_workflows.py @@ -0,0 +1,126 @@ +from frinx.common.conductor_enums import WorkflowStatus +from frinx.common.type_aliases import ListStr +from frinx.common.workflow.service import ServiceWorkflowsImpl +from frinx.common.workflow.task import TerminateTask +from frinx.common.workflow.task import TerminateTaskInputParameters +from frinx.common.workflow.workflow import FrontendWFInputFieldType +from frinx.common.workflow.workflow import WorkflowImpl +from frinx.common.workflow.workflow import WorkflowInputField +from frinx.common.workflow.task import SimpleTask +from frinx.common.workflow.task import SimpleTaskInputParameters +from frinx.common.workflow.task import DecisionTask +from frinx.common.workflow.task import DecisionTaskInputParameters +from frinx.common.workflow.task import DynamicForkTask +from frinx.common.workflow.task import DynamicForkTaskInputParameters +from frinx.common.workflow.task import JoinTask +from frinx.workers.wf_input_handlers.wf_input_handlers_service import WFInputHandlersService + + +# TODO: refactor +class DynamicForkWFService(ServiceWorkflowsImpl): + + class DynamicFork(WorkflowImpl): + name: str = 'Dynamic_fork' + description: str = 'A dynamic fork + join task with input validation' + version: int = 1 + labels: ListStr = ['BASICS', 'UTILS'] + + class WorkflowInput(WorkflowImpl.WorkflowInput): + dynamic_tasks: WorkflowInputField = WorkflowInputField( + name='dynamic_tasks', + description='Tasks or sub-workflows running in parallel.', + type=FrontendWFInputFieldType.TEXTAREA, + frontend_default_value=None + ) + + expected_name: WorkflowInputField = WorkflowInputField( + name='expectedName', + description='Expected name of sub-tasks or sub-workflows.', + type=FrontendWFInputFieldType.STRING, + frontend_default_value=None + ) + + expected_type: WorkflowInputField = WorkflowInputField( + name='expectedType', + description='Expected type of sub-tasks or sub-workflows.', + type=FrontendWFInputFieldType.STRING, + frontend_default_value=None + ) + + dynamic_tasks_input: WorkflowInputField = WorkflowInputField( + name='dynamic_tasks_input', + description='Inputs to sub-tasks or sub-workflows.', + type=FrontendWFInputFieldType.TEXTAREA, + frontend_default_value=None + ) + + class WorkflowOutput(WorkflowImpl.WorkflowOutput): + status: WorkflowStatus + + def workflow_builder(self, workflow_inputs: WorkflowInput) -> None: + # NOTE: becouse output object wrapper in conductor frontend a.k.a. 'result' (Object) + # can't be returned like origin type, validation do worker instead of lambda-task + # e.g. Array always returns like Object + + validation = SimpleTask( + name=WFInputHandlersService.ForkJoinWFInputHandler, + task_reference_name='validation', + input_parameters=SimpleTaskInputParameters( + dynamic_tasks=workflow_inputs.dynamic_tasks.wf_input, + expected_name=workflow_inputs.expected_name.wf_input, + expected_type=workflow_inputs.expected_type.wf_input, + dynamic_tasks_input=workflow_inputs.dynamic_tasks_input.wf_input + ) + ) + + termination = TerminateTask( + name='terminateTask', + task_reference_name='termination', + input_parameters=TerminateTaskInputParameters( + termination_status=WorkflowStatus.FAILED, + termination_reason=validation.output_ref('errors'), + workflow_output={'output': validation.output_ref('errors')} + ) + ) + + dynamic_fork = DynamicForkTask( + name='dynamicForkTask', + task_reference_name='dynamic_fork', + dynamic_fork_tasks_param='dynamic_tasks', + dynamic_fork_tasks_input_param_name='dynamic_tasks_input', + input_parameters=DynamicForkTaskInputParameters( + dynamic_tasks=validation.output_ref('result.dynamic_tasks'), + dynamic_tasks_input=validation.output_ref('result.dynamic_tasks_input') + ) + ) + + join = JoinTask( + name='joinTask', + task_reference_name='join' + ) + + def _js_case_expression() -> str: + return """ + $.errors ? 'termination' : 'default' + """ + + decision = DecisionTask( + name='decisionTask', + task_reference_name='decision', + default_case=[ + dynamic_fork, + join, + ], + decision_cases={ + 'termination': [termination] + }, + case_expression=_js_case_expression(), + input_parameters=DecisionTaskInputParameters( + errors=validation.output_ref('errors') + ) + ) + + self.tasks = [ + validation, + decision, + ] diff --git a/main.py b/main.py new file mode 100644 index 0000000..da263d4 --- /dev/null +++ b/main.py @@ -0,0 +1,53 @@ +import os + +os.environ['UNICONFIG_URL_BASE'] = 'http://localhost/api/uniconfig' +os.environ['CONDUCTOR_URL_BASE'] = 'http://127.0.0.1:8088/proxy/api' +os.environ['INVENTORY_URL_BASE'] = 'http://localhost/api/inventory' +os.environ['INFLUXDB_URL_BASE'] = 'http://localhost:8086' +os.environ['RESOURCE_MANAGER_URL_BASE'] = 'http://localhost/api/resource' + +import logging + +from frinx.workers.http_workers.http_workers import HTTPWorkersService +from frinx.workers.wf_input_handlers.wf_input_handlers_service import WFInputHandlersService +from frinx.workflows.http_workflows.http_workflows_service import HTTPWorkflowService +from frinx.workflows.dynamic_fork.dynamic_fork_workflows import DynamicForkWFService +from frinx.common.logging import logging_common +from frinx.common.logging.logging_common import LoggerConfig +from frinx.common.logging.logging_common import Root +from frinx.client.frinx_conductor_wrapper import FrinxConductorWrapper +from frinx.common.frinx_rest import CONDUCTOR_URL_BASE +from frinx.common.frinx_rest import CONDUCTOR_HEADERS + + +def register_tasks(conductor_client): + logging.info('Register HTTP workers') + HTTPWorkersService().register(conductor_client) + logging.info('Register UTIL workers') + WFInputHandlersService().register(conductor_client) + + +def register_workflows(): + logging.info('Register HTTP workflows') + HTTPWorkflowService().register(overwrite=True) + logging.info('Register UTIL workflows') + DynamicForkWFService().register(overwrite=True) + + +def main(): + logging_common.configure_logging( + LoggerConfig(root=Root(level=os.environ.get('LOG_LEVEL', 'INFO').upper(), handlers=['console'])) + ) + conductor_client = FrinxConductorWrapper( + server_url=CONDUCTOR_URL_BASE, + polling_interval=0.1, + max_thread_count=50, + headers=CONDUCTOR_HEADERS, + ) + register_tasks(conductor_client) + register_workflows() + conductor_client.start_workers() + + +if __name__ == '__main__': + main() From d083d17d72deda79e0ce6e8faaff775d73fcbd6e Mon Sep 17 00:00:00 2001 From: Martin Revaj Date: Tue, 22 Aug 2023 23:23:38 +0200 Subject: [PATCH 2/4] refactor, helpers for utils_workers --- frinx/common/util.py | 49 ++++++++ frinx/workers/utils_workers/utils_service.py | 110 +++++++++++++++++ .../wf_input_handlers_service.py | 115 ------------------ ...s.py => dynamic_fork_workflows_service.py} | 35 ++---- main.py | 8 +- 5 files changed, 172 insertions(+), 145 deletions(-) create mode 100644 frinx/workers/utils_workers/utils_service.py delete mode 100644 frinx/workers/wf_input_handlers/wf_input_handlers_service.py rename frinx/workflows/dynamic_fork/{dynamic_fork_workflows.py => dynamic_fork_workflows_service.py} (82%) diff --git a/frinx/common/util.py b/frinx/common/util.py index 4257ef4..b3db094 100644 --- a/frinx/common/util.py +++ b/frinx/common/util.py @@ -1,11 +1,14 @@ import json from typing import Any +from pydantic import BaseModel +from pydantic import ValidationError from requests import JSONDecodeError from requests import Response from frinx.common.type_aliases import DictAny from frinx.common.type_aliases import ListAny +from frinx.common.type_aliases import ListStr def jsonify_description( @@ -51,3 +54,49 @@ def parse_response(response: Response) -> DictAny | ListAny | str: return response.json() # type: ignore[no-any-return] except JSONDecodeError: return response.text + + +def json_parse(errors: list | None = None, **kwargs: str) -> tuple[str | None, ListStr]: + """ + Safely parse json, returns tuple of object and errors list. + """ + + FORMAT_ERR_MSG = lambda obj, msg: f'Object `{obj}`: ({msg})' + json_string, object_name = next(iter(kwargs.values())), next(iter(kwargs.keys())) + errors = [] if errors is None else errors + + if not len(json_string): + errors.append(FORMAT_ERR_MSG(object_name, 'Cannot parse empty string.')) + return None, errors + try: + return json.loads(json_string), errors + except json.JSONDecodeError as e: + errors.append(FORMAT_ERR_MSG(object_name, f'Not JSON valid. {e.args[0]}.')) + return None, errors + + +def validate_structure(obj: DictAny, model: BaseModel, *, idx: int | None = None, +properties: dict = {}, errors: list | None = None) -> list[str]: + """ + Validate structure of object based on pydantic model, return errors if occures. + """ + + FORMAT_ERR_MSG = lambda prop, obj, msg:\ + f'Property `{prop}` of `{obj}{{}}`, ({msg})'.format(f'[{idx}]' if idx or idx==0 else '') + errors = [] if errors is None else errors + + try: + model.parse_obj(obj) + except ValidationError as validation_err: + # TODO: how to handle strict and extra ? + # maybe skip and print in log as WARNING (extra) + # NOTE: try to use pydantic strict types if needed, v1 doesn't support BaseConfig.strict + for err in validation_err.errors(): + errors.append(FORMAT_ERR_MSG(err['loc'][0], model.__name__, err['msg'])) + + for key, value in properties.items(): + if obj.get(key, None) != value: + errors.append( + FORMAT_ERR_MSG(key, model.__name__, f'expected value `{value}`, get `{obj[key]}`') + ) + return errors diff --git a/frinx/workers/utils_workers/utils_service.py b/frinx/workers/utils_workers/utils_service.py new file mode 100644 index 0000000..73a2acf --- /dev/null +++ b/frinx/workers/utils_workers/utils_service.py @@ -0,0 +1,110 @@ +from typing import Any + +from pydantic import BaseModel +from pydantic import Extra +from pydantic.types import StrictStr +from pydantic.types import StrictInt + +from frinx.common.type_aliases import ListStr +from frinx.common.type_aliases import ListAny +from frinx.common.type_aliases import DictAny +from frinx.common.worker.service import ServiceWorkersImpl +from frinx.common.worker.task_result import TaskResult +from frinx.common.conductor_enums import TaskResultStatus +from frinx.common.worker.worker import WorkerImpl +from frinx.common.worker.task_def import TaskDefinition +from frinx.common.worker.task_def import TaskInput +from frinx.common.worker.task_def import TaskOutput +from frinx.common.util import json_parse +from frinx.common.util import validate_structure +from frinx.common.util import snake_to_camel_case + + +# TODO: move to other file, e.g. local/util.py +class DynamicTask(BaseModel): + class _SubWorkflowParam(BaseModel): + name: StrictStr + version: StrictInt + + class Config: + alias_generator = snake_to_camel_case + extra = Extra.forbid + + name: StrictStr + task_reference_name: StrictStr + type: StrictStr + sub_workflow_param: _SubWorkflowParam + + +class UtilsService(ServiceWorkersImpl): + + class JSONParse(WorkerImpl): + + class WorkerDefinition(TaskDefinition): + name: str = 'JSON_parse' + description: str = 'Returns object from JSON or errors if occures.' + labels: ListStr = ['UTILS'] + + class WorkerInput(TaskInput): + input: str + + class WorkerOutput(TaskOutput): + result: DictAny | ListAny | None + errors: ListStr | None + + def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: + result, errors = json_parse(input=worker_input.input) + + return TaskResult( + status=TaskResultStatus.COMPLETED, + output=self.WorkerOutput( + result=result, + errors=errors or None + ) + ) + + class ForkJoinInputValidator(WorkerImpl): + + class WorkerDefinition(TaskDefinition): + name: str = 'fork_join_input_validator' + description: str = 'Input validator for Dynamic_Fork / 1.' + labels: ListStr = ['UTILS'] + + class WorkerInput(TaskInput): + dynamic_tasks: str + expected_type: str + expected_name: str + dynamic_tasks_input: str + + class WorkerOutput(TaskOutput): + result: DictAny | ListAny | None + errors: ListStr | None + + def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: + # TODO: validate also dynamic_tasks_input + dynamic_tasks, errors = json_parse(dynamic_tasks=worker_input.dynamic_tasks) + dynamic_tasks_input, errors = json_parse( + errors, dynamic_tasks_input=worker_input.dynamic_tasks_input) + + if dynamic_tasks: + for idx, dyn_task in enumerate(dynamic_tasks): + errors = validate_structure( + dyn_task, DynamicTask, + properties={ + 'type': worker_input.expected_type, + 'name': worker_input.expected_name + }, + errors=errors, + idx=idx + ) + + return TaskResult( + status=TaskResultStatus.COMPLETED, + output=self.WorkerOutput( + result={ + 'dynamic_tasks': dynamic_tasks, + 'dynamic_tasks_input': dynamic_tasks_input + } if not errors else None, + errors=errors or None + ) + ) diff --git a/frinx/workers/wf_input_handlers/wf_input_handlers_service.py b/frinx/workers/wf_input_handlers/wf_input_handlers_service.py deleted file mode 100644 index 00db69d..0000000 --- a/frinx/workers/wf_input_handlers/wf_input_handlers_service.py +++ /dev/null @@ -1,115 +0,0 @@ -import json -from typing import Any - -from frinx.common.type_aliases import ListStr -from frinx.common.type_aliases import ListAny -from frinx.common.type_aliases import DictAny -from frinx.common.worker.service import ServiceWorkersImpl -from frinx.common.worker.task_result import TaskResult -from frinx.common.conductor_enums import TaskResultStatus -from frinx.common.worker.worker import WorkerImpl -from frinx.common.worker.task_def import TaskExecutionProperties -from frinx.common.worker.task_def import TaskDefinition -from frinx.common.worker.task_def import TaskInput -from frinx.common.worker.task_def import TaskOutput - - -# TODO: rename -class WFInputHandlersService(ServiceWorkersImpl): - - class JSONTransformTask(WorkerImpl): - - class WorkerDefinition(TaskDefinition): - name: str = 'JSON_transform_task' - description: str = 'Returns an object from JSON string.' - labels: ListStr = ['UTILS'] - - class WorkerInput(TaskInput): - input: str - - class WorkerOutput(TaskOutput): - result: DictAny | ListAny | None - errors: ListStr | None - - def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: - result, errors = None, None - if worker_input.input: - try: - result = json.loads(worker_input.input) - except json.JSONDecodeError: - errors = ['Property < input > is JSON invalid format.'] - else: - errors = ['Cannot parse empty string.'] - - return TaskResult( - status=TaskResultStatus.COMPLETED, - output=self.WorkerOutput(result=result, errors=errors) - ) - - class ForkJoinWFInputHandler(WorkerImpl): - - class WorkerDefinition(TaskDefinition): - name: str = 'fork_join_wf_input_handler' - description: str = 'Input handler and validator for Dynamic_Fork_Join workflow.' - labels: ListStr = ['UTILS'] - - class WorkerInput(TaskInput): - dynamic_tasks: str - expected_type: str - expected_name: str - dynamic_tasks_input: str - - class WorkerOutput(TaskOutput): - result: DictAny | ListAny | None - errors: ListStr | None - - # TODO: REFACTOR !!! - def execute(self, worker_input: WorkerInput) -> TaskResult[Any]: - # def _unixp_err(name: str, type: str) -> str: - # return f'Dynamic tasks contain unexpected name "{name}" or expected type "{type}"' - - _unixp_err = lambda n, t: \ - f'Dynamic tasks contain unexpected name < {n} > or expected type < {t} >' - - errors = [] - - if worker_input.dynamic_tasks: - try: - dynamic_tasks = json.loads(worker_input.dynamic_tasks) - except json.JSONDecodeError: - errors.append('Property < dynamic_tasks > is JSON invalid format.') - else: - errors.append('Cannot parse empty string, property < dynamic_tasks > is empty.') - if worker_input.dynamic_tasks_input: - try: - dynamic_tasks_input = json.loads(worker_input.dynamic_tasks_input) - except json.JSONDecodeError: - errors.append('Property < dynamic_tasks_input > is JSON invalid format.') - else: - errors.append('Cannot parse empty string, property < dynamic_tasks_input > is empty.') - try: - for task in dynamic_tasks: - match task['name'], task['type']: - case worker_input.expected_name, worker_input.expected_type: - continue - case _: - errors.append( - _unixp_err( - worker_input.expected_name, - worker_input.expected_type - ) - ) - break - except KeyError as err: - errors.append(f'Missing property < {err.args[0]} > in < dynamic_tasks >.') - - return TaskResult( - status=TaskResultStatus.COMPLETED, - output=self.WorkerOutput( - result={ - 'dynamic_tasks': dynamic_tasks, - 'dynamic_tasks_input': dynamic_tasks_input - } if not errors else None, - errors=errors or None - ) - ) diff --git a/frinx/workflows/dynamic_fork/dynamic_fork_workflows.py b/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py similarity index 82% rename from frinx/workflows/dynamic_fork/dynamic_fork_workflows.py rename to frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py index 03a2712..27f57ef 100644 --- a/frinx/workflows/dynamic_fork/dynamic_fork_workflows.py +++ b/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py @@ -13,10 +13,9 @@ from frinx.common.workflow.task import DynamicForkTask from frinx.common.workflow.task import DynamicForkTaskInputParameters from frinx.common.workflow.task import JoinTask -from frinx.workers.wf_input_handlers.wf_input_handlers_service import WFInputHandlersService +from frinx.workers.utils_workers.utils_service import UtilsService -# TODO: refactor class DynamicForkWFService(ServiceWorkflowsImpl): class DynamicFork(WorkflowImpl): @@ -58,12 +57,8 @@ class WorkflowOutput(WorkflowImpl.WorkflowOutput): status: WorkflowStatus def workflow_builder(self, workflow_inputs: WorkflowInput) -> None: - # NOTE: becouse output object wrapper in conductor frontend a.k.a. 'result' (Object) - # can't be returned like origin type, validation do worker instead of lambda-task - # e.g. Array always returns like Object - validation = SimpleTask( - name=WFInputHandlersService.ForkJoinWFInputHandler, + name=UtilsService.ForkJoinInputValidator, task_reference_name='validation', input_parameters=SimpleTaskInputParameters( dynamic_tasks=workflow_inputs.dynamic_tasks.wf_input, @@ -94,33 +89,21 @@ def workflow_builder(self, workflow_inputs: WorkflowInput) -> None: ) ) - join = JoinTask( - name='joinTask', - task_reference_name='join' - ) - - def _js_case_expression() -> str: - return """ - $.errors ? 'termination' : 'default' - """ - decision = DecisionTask( name='decisionTask', task_reference_name='decision', default_case=[ dynamic_fork, - join, + JoinTask( + name='joinTask', + task_reference_name='join' + ) ], - decision_cases={ - 'termination': [termination] - }, - case_expression=_js_case_expression(), + decision_cases={'termination': [termination]}, + case_expression='$.errors ? "termination" : "default"', input_parameters=DecisionTaskInputParameters( errors=validation.output_ref('errors') ) ) - self.tasks = [ - validation, - decision, - ] + self.tasks = [validation, decision] diff --git a/main.py b/main.py index da263d4..5f00171 100644 --- a/main.py +++ b/main.py @@ -9,9 +9,9 @@ import logging from frinx.workers.http_workers.http_workers import HTTPWorkersService -from frinx.workers.wf_input_handlers.wf_input_handlers_service import WFInputHandlersService +from frinx.workers.utils_workers.utils_service import UtilsService from frinx.workflows.http_workflows.http_workflows_service import HTTPWorkflowService -from frinx.workflows.dynamic_fork.dynamic_fork_workflows import DynamicForkWFService +from frinx.workflows.dynamic_fork.dynamic_fork_workflows_service import DynamicForkWFService from frinx.common.logging import logging_common from frinx.common.logging.logging_common import LoggerConfig from frinx.common.logging.logging_common import Root @@ -23,8 +23,8 @@ def register_tasks(conductor_client): logging.info('Register HTTP workers') HTTPWorkersService().register(conductor_client) - logging.info('Register UTIL workers') - WFInputHandlersService().register(conductor_client) + logging.info('Register UTILS workers') + UtilsService().register(conductor_client) def register_workflows(): From 4132b516f7add5f510d22629e7a8f1e6ab9f12de Mon Sep 17 00:00:00 2001 From: Martin Revaj Date: Wed, 23 Aug 2023 10:59:29 +0200 Subject: [PATCH 3/4] mypy, ruff fixes, docs --- frinx/common/util.py | 49 ++++++++++++----- frinx/workers/utils_workers/utils_service.py | 20 +++---- .../dynamic_fork_workflows_service.py | 14 ++--- main.py | 53 ------------------- 4 files changed, 53 insertions(+), 83 deletions(-) delete mode 100644 main.py diff --git a/frinx/common/util.py b/frinx/common/util.py index b3db094..8d0be9d 100644 --- a/frinx/common/util.py +++ b/frinx/common/util.py @@ -7,6 +7,7 @@ from requests import Response from frinx.common.type_aliases import DictAny +from frinx.common.type_aliases import DictStr from frinx.common.type_aliases import ListAny from frinx.common.type_aliases import ListStr @@ -56,35 +57,57 @@ def parse_response(response: Response) -> DictAny | ListAny | str: return response.text -def json_parse(errors: list | None = None, **kwargs: str) -> tuple[str | None, ListStr]: +def json_parse(errors: ListStr | None = None, **kwargs: str) -> tuple[ListAny | DictAny | None, ListStr]: """ - Safely parse json, returns tuple of object and errors list. + Parse json, returns object and errors list as tuple. + + args: + - errors: (optional), list of error messages, default is empty list + kwargs: + - =: name of json-object and json-formatted-string (optional, + but neccessary for functionality) + e.g.: ip_addresses='["127.0.0.1", "127.0.0.2]', this approach enables to format to error + messages this way: 'Object `ip_addresses`: (Not JSON valid. ... .)' + returns: + parsed-json-object, errors-list """ - - FORMAT_ERR_MSG = lambda obj, msg: f'Object `{obj}`: ({msg})' + def _format_err_msg(obj: str, msg: str) -> str: + return f'Object `{obj}`: ({msg})' + json_string, object_name = next(iter(kwargs.values())), next(iter(kwargs.keys())) errors = [] if errors is None else errors if not len(json_string): - errors.append(FORMAT_ERR_MSG(object_name, 'Cannot parse empty string.')) + errors.append(_format_err_msg(object_name, 'Cannot parse empty string.')) return None, errors try: return json.loads(json_string), errors except json.JSONDecodeError as e: - errors.append(FORMAT_ERR_MSG(object_name, f'Not JSON valid. {e.args[0]}.')) + errors.append(_format_err_msg(object_name, f'Not JSON valid. {e.args[0]}.')) return None, errors -def validate_structure(obj: DictAny, model: BaseModel, *, idx: int | None = None, -properties: dict = {}, errors: list | None = None) -> list[str]: +def validate_structure(obj: DictAny, model: type[BaseModel], *, idx: int | None = None, +properties: DictStr = {}, errors: ListStr | None = None) -> list[str]: """ Validate structure of object based on pydantic model, return errors if occures. + + args: + obj: (required), python object (parsed json) to compare with pydantic model + model: (required), pydantic model to use as structure template for obj + kwargs: + idx: (optional), index represents position of nested object in list, used in _format_err_msg + properties: (optional), dictionary of model properties and expected values of properties, + enables value checking of obj + errors: (optional), list of error messages, default is empty list + returns: + errors-list """ - FORMAT_ERR_MSG = lambda prop, obj, msg:\ - f'Property `{prop}` of `{obj}{{}}`, ({msg})'.format(f'[{idx}]' if idx or idx==0 else '') + def _format_err_msg(prop: str, obj: str, msg: str) -> str: + return f'Property `{prop}` of `{obj}{{}}`, ({msg})'.format(f'[{idx}]' if idx or idx == 0 else '') + errors = [] if errors is None else errors - try: model.parse_obj(obj) except ValidationError as validation_err: @@ -92,11 +115,11 @@ def validate_structure(obj: DictAny, model: BaseModel, *, idx: int | None = None # maybe skip and print in log as WARNING (extra) # NOTE: try to use pydantic strict types if needed, v1 doesn't support BaseConfig.strict for err in validation_err.errors(): - errors.append(FORMAT_ERR_MSG(err['loc'][0], model.__name__, err['msg'])) + errors.append(_format_err_msg(str(err['loc'][0]), model.__class__.__name__, err['msg'])) for key, value in properties.items(): if obj.get(key, None) != value: errors.append( - FORMAT_ERR_MSG(key, model.__name__, f'expected value `{value}`, get `{obj[key]}`') + _format_err_msg(key, model.__class__.__name__, f'expected value `{value}`, get `{obj[key]}`') ) return errors diff --git a/frinx/workers/utils_workers/utils_service.py b/frinx/workers/utils_workers/utils_service.py index 73a2acf..673ad1b 100644 --- a/frinx/workers/utils_workers/utils_service.py +++ b/frinx/workers/utils_workers/utils_service.py @@ -2,22 +2,22 @@ from pydantic import BaseModel from pydantic import Extra -from pydantic.types import StrictStr from pydantic.types import StrictInt +from pydantic.types import StrictStr -from frinx.common.type_aliases import ListStr -from frinx.common.type_aliases import ListAny +from frinx.common.conductor_enums import TaskResultStatus from frinx.common.type_aliases import DictAny +from frinx.common.type_aliases import ListAny +from frinx.common.type_aliases import ListStr +from frinx.common.util import json_parse +from frinx.common.util import snake_to_camel_case +from frinx.common.util import validate_structure from frinx.common.worker.service import ServiceWorkersImpl -from frinx.common.worker.task_result import TaskResult -from frinx.common.conductor_enums import TaskResultStatus -from frinx.common.worker.worker import WorkerImpl from frinx.common.worker.task_def import TaskDefinition from frinx.common.worker.task_def import TaskInput from frinx.common.worker.task_def import TaskOutput -from frinx.common.util import json_parse -from frinx.common.util import validate_structure -from frinx.common.util import snake_to_camel_case +from frinx.common.worker.task_result import TaskResult +from frinx.common.worker.worker import WorkerImpl # TODO: move to other file, e.g. local/util.py @@ -26,7 +26,7 @@ class _SubWorkflowParam(BaseModel): name: StrictStr version: StrictInt - class Config: + class Config: # type: ignore[pydantic-alias] alias_generator = snake_to_camel_case extra = Extra.forbid diff --git a/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py b/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py index 27f57ef..cd24066 100644 --- a/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py +++ b/frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py @@ -1,18 +1,18 @@ from frinx.common.conductor_enums import WorkflowStatus from frinx.common.type_aliases import ListStr from frinx.common.workflow.service import ServiceWorkflowsImpl -from frinx.common.workflow.task import TerminateTask -from frinx.common.workflow.task import TerminateTaskInputParameters -from frinx.common.workflow.workflow import FrontendWFInputFieldType -from frinx.common.workflow.workflow import WorkflowImpl -from frinx.common.workflow.workflow import WorkflowInputField -from frinx.common.workflow.task import SimpleTask -from frinx.common.workflow.task import SimpleTaskInputParameters from frinx.common.workflow.task import DecisionTask from frinx.common.workflow.task import DecisionTaskInputParameters from frinx.common.workflow.task import DynamicForkTask from frinx.common.workflow.task import DynamicForkTaskInputParameters from frinx.common.workflow.task import JoinTask +from frinx.common.workflow.task import SimpleTask +from frinx.common.workflow.task import SimpleTaskInputParameters +from frinx.common.workflow.task import TerminateTask +from frinx.common.workflow.task import TerminateTaskInputParameters +from frinx.common.workflow.workflow import FrontendWFInputFieldType +from frinx.common.workflow.workflow import WorkflowImpl +from frinx.common.workflow.workflow import WorkflowInputField from frinx.workers.utils_workers.utils_service import UtilsService diff --git a/main.py b/main.py deleted file mode 100644 index 5f00171..0000000 --- a/main.py +++ /dev/null @@ -1,53 +0,0 @@ -import os - -os.environ['UNICONFIG_URL_BASE'] = 'http://localhost/api/uniconfig' -os.environ['CONDUCTOR_URL_BASE'] = 'http://127.0.0.1:8088/proxy/api' -os.environ['INVENTORY_URL_BASE'] = 'http://localhost/api/inventory' -os.environ['INFLUXDB_URL_BASE'] = 'http://localhost:8086' -os.environ['RESOURCE_MANAGER_URL_BASE'] = 'http://localhost/api/resource' - -import logging - -from frinx.workers.http_workers.http_workers import HTTPWorkersService -from frinx.workers.utils_workers.utils_service import UtilsService -from frinx.workflows.http_workflows.http_workflows_service import HTTPWorkflowService -from frinx.workflows.dynamic_fork.dynamic_fork_workflows_service import DynamicForkWFService -from frinx.common.logging import logging_common -from frinx.common.logging.logging_common import LoggerConfig -from frinx.common.logging.logging_common import Root -from frinx.client.frinx_conductor_wrapper import FrinxConductorWrapper -from frinx.common.frinx_rest import CONDUCTOR_URL_BASE -from frinx.common.frinx_rest import CONDUCTOR_HEADERS - - -def register_tasks(conductor_client): - logging.info('Register HTTP workers') - HTTPWorkersService().register(conductor_client) - logging.info('Register UTILS workers') - UtilsService().register(conductor_client) - - -def register_workflows(): - logging.info('Register HTTP workflows') - HTTPWorkflowService().register(overwrite=True) - logging.info('Register UTIL workflows') - DynamicForkWFService().register(overwrite=True) - - -def main(): - logging_common.configure_logging( - LoggerConfig(root=Root(level=os.environ.get('LOG_LEVEL', 'INFO').upper(), handlers=['console'])) - ) - conductor_client = FrinxConductorWrapper( - server_url=CONDUCTOR_URL_BASE, - polling_interval=0.1, - max_thread_count=50, - headers=CONDUCTOR_HEADERS, - ) - register_tasks(conductor_client) - register_workflows() - conductor_client.start_workers() - - -if __name__ == '__main__': - main() From e262ff9f9130e14d303f29fd2e1750319a37335a Mon Sep 17 00:00:00 2001 From: Martin Revaj Date: Wed, 23 Aug 2023 11:07:44 +0200 Subject: [PATCH 4/4] codespell fixes --- frinx/common/util.py | 14 +++++++------- frinx/workers/utils_workers/utils_service.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/frinx/common/util.py b/frinx/common/util.py index 8d0be9d..4f8fd19 100644 --- a/frinx/common/util.py +++ b/frinx/common/util.py @@ -65,7 +65,7 @@ def json_parse(errors: ListStr | None = None, **kwargs: str) -> tuple[ListAny | - errors: (optional), list of error messages, default is empty list kwargs: - =: name of json-object and json-formatted-string (optional, - but neccessary for functionality) + but necessary for functionality) e.g.: ip_addresses='["127.0.0.1", "127.0.0.2]', this approach enables to format to error messages this way: 'Object `ip_addresses`: (Not JSON valid. ... .)' returns: @@ -90,16 +90,16 @@ def _format_err_msg(obj: str, msg: str) -> str: def validate_structure(obj: DictAny, model: type[BaseModel], *, idx: int | None = None, properties: DictStr = {}, errors: ListStr | None = None) -> list[str]: """ - Validate structure of object based on pydantic model, return errors if occures. + Validate structure of object based on pydantic model, return errors if occurs. args: - obj: (required), python object (parsed json) to compare with pydantic model - model: (required), pydantic model to use as structure template for obj + - obj: (required), python object (parsed json) to compare with pydantic model + - model: (required), pydantic model to use as structure template for obj kwargs: - idx: (optional), index represents position of nested object in list, used in _format_err_msg - properties: (optional), dictionary of model properties and expected values of properties, + - idx: (optional), index represents position of nested object in list, used in _format_err_msg + - properties: (optional), dictionary of model properties and expected values of properties, enables value checking of obj - errors: (optional), list of error messages, default is empty list + - errors: (optional), list of error messages, default is empty list returns: errors-list """ diff --git a/frinx/workers/utils_workers/utils_service.py b/frinx/workers/utils_workers/utils_service.py index 673ad1b..d636d8b 100644 --- a/frinx/workers/utils_workers/utils_service.py +++ b/frinx/workers/utils_workers/utils_service.py @@ -42,7 +42,7 @@ class JSONParse(WorkerImpl): class WorkerDefinition(TaskDefinition): name: str = 'JSON_parse' - description: str = 'Returns object from JSON or errors if occures.' + description: str = 'Returns object from JSON or errors if occurs.' labels: ListStr = ['UTILS'] class WorkerInput(TaskInput):