Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions frinx/common/util.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import json
from typing import Any

from pydantic import BaseModel
from pydantic import ValidationError
from requests import JSONDecodeError
from requests import Response

from frinx.common.type_aliases import DictAny
from frinx.common.type_aliases import DictStr
from frinx.common.type_aliases import ListAny
from frinx.common.type_aliases import ListStr


def jsonify_description(
Expand Down Expand Up @@ -51,3 +55,71 @@ def parse_response(response: Response) -> DictAny | ListAny | str:
return response.json() # type: ignore[no-any-return]
except JSONDecodeError:
return response.text


def json_parse(errors: ListStr | None = None, **kwargs: str) -> tuple[ListAny | DictAny | None, ListStr]:
"""
Parse json, returns object and errors list as tuple.

args:
- errors: (optional), list of error messages, default is empty list
kwargs:
- <object_name>=<string>: name of json-object and json-formatted-string (optional,
but necessary for functionality)
e.g.: ip_addresses='["127.0.0.1", "127.0.0.2]', this approach enables to format to error
messages this way: 'Object `ip_addresses`: (Not JSON valid. ... .)'
returns:
parsed-json-object, errors-list
"""
def _format_err_msg(obj: str, msg: str) -> str:
return f'Object `{obj}`: ({msg})'

json_string, object_name = next(iter(kwargs.values())), next(iter(kwargs.keys()))
errors = [] if errors is None else errors

if not len(json_string):
errors.append(_format_err_msg(object_name, 'Cannot parse empty string.'))
return None, errors
try:
return json.loads(json_string), errors
except json.JSONDecodeError as e:
errors.append(_format_err_msg(object_name, f'Not JSON valid. {e.args[0]}.'))
return None, errors


def validate_structure(obj: DictAny, model: type[BaseModel], *, idx: int | None = None,
properties: DictStr = {}, errors: ListStr | None = None) -> list[str]:
"""
Validate structure of object based on pydantic model, return errors if occurs.

args:
- obj: (required), python object (parsed json) to compare with pydantic model
- model: (required), pydantic model to use as structure template for obj
kwargs:
- idx: (optional), index represents position of nested object in list, used in _format_err_msg
- properties: (optional), dictionary of model properties and expected values of properties,
enables value checking of obj
- errors: (optional), list of error messages, default is empty list
returns:
errors-list
"""

def _format_err_msg(prop: str, obj: str, msg: str) -> str:
return f'Property `{prop}` of `{obj}{{}}`, ({msg})'.format(f'[{idx}]' if idx or idx == 0 else '')

errors = [] if errors is None else errors
try:
model.parse_obj(obj)
except ValidationError as validation_err:
# TODO: how to handle strict and extra ?
# maybe skip and print in log as WARNING (extra)
# NOTE: try to use pydantic strict types if needed, v1 doesn't support BaseConfig.strict
for err in validation_err.errors():
errors.append(_format_err_msg(str(err['loc'][0]), model.__class__.__name__, err['msg']))

for key, value in properties.items():
if obj.get(key, None) != value:
errors.append(
_format_err_msg(key, model.__class__.__name__, f'expected value `{value}`, get `{obj[key]}`')
)
return errors
1 change: 1 addition & 0 deletions frinx/common/workflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class Config:

@root_validator(pre=True)
def check_input_values(cls, values: dict[str, Any]) -> Any:
# FIXME: AttributeError: 'str' object has no attribute '__fields__'
values['dynamic_tasks'] = values['dynamic_tasks'].__fields__['name'].default
return values

Expand Down
110 changes: 110 additions & 0 deletions frinx/workers/utils_workers/utils_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from typing import Any

from pydantic import BaseModel
from pydantic import Extra
from pydantic.types import StrictInt
from pydantic.types import StrictStr

from frinx.common.conductor_enums import TaskResultStatus
from frinx.common.type_aliases import DictAny
from frinx.common.type_aliases import ListAny
from frinx.common.type_aliases import ListStr
from frinx.common.util import json_parse
from frinx.common.util import snake_to_camel_case
from frinx.common.util import validate_structure
from frinx.common.worker.service import ServiceWorkersImpl
from frinx.common.worker.task_def import TaskDefinition
from frinx.common.worker.task_def import TaskInput
from frinx.common.worker.task_def import TaskOutput
from frinx.common.worker.task_result import TaskResult
from frinx.common.worker.worker import WorkerImpl


# TODO: move to other file, e.g. local/util.py
class DynamicTask(BaseModel):
class _SubWorkflowParam(BaseModel):
name: StrictStr
version: StrictInt

class Config: # type: ignore[pydantic-alias]
alias_generator = snake_to_camel_case
extra = Extra.forbid

name: StrictStr
task_reference_name: StrictStr
type: StrictStr
sub_workflow_param: _SubWorkflowParam


class UtilsService(ServiceWorkersImpl):

class JSONParse(WorkerImpl):

class WorkerDefinition(TaskDefinition):
name: str = 'JSON_parse'
description: str = 'Returns object from JSON or errors if occurs.'
labels: ListStr = ['UTILS']

class WorkerInput(TaskInput):
input: str

class WorkerOutput(TaskOutput):
result: DictAny | ListAny | None
errors: ListStr | None

def execute(self, worker_input: WorkerInput) -> TaskResult[Any]:
result, errors = json_parse(input=worker_input.input)

return TaskResult(
status=TaskResultStatus.COMPLETED,
output=self.WorkerOutput(
result=result,
errors=errors or None
)
)

class ForkJoinInputValidator(WorkerImpl):

class WorkerDefinition(TaskDefinition):
name: str = 'fork_join_input_validator'
description: str = 'Input validator for Dynamic_Fork / 1.'
labels: ListStr = ['UTILS']

class WorkerInput(TaskInput):
dynamic_tasks: str
expected_type: str
expected_name: str
dynamic_tasks_input: str

class WorkerOutput(TaskOutput):
result: DictAny | ListAny | None
errors: ListStr | None

def execute(self, worker_input: WorkerInput) -> TaskResult[Any]:
# TODO: validate also dynamic_tasks_input
dynamic_tasks, errors = json_parse(dynamic_tasks=worker_input.dynamic_tasks)
dynamic_tasks_input, errors = json_parse(
errors, dynamic_tasks_input=worker_input.dynamic_tasks_input)

if dynamic_tasks:
for idx, dyn_task in enumerate(dynamic_tasks):
errors = validate_structure(
dyn_task, DynamicTask,
properties={
'type': worker_input.expected_type,
'name': worker_input.expected_name
},
errors=errors,
idx=idx
)

return TaskResult(
status=TaskResultStatus.COMPLETED,
output=self.WorkerOutput(
result={
'dynamic_tasks': dynamic_tasks,
'dynamic_tasks_input': dynamic_tasks_input
} if not errors else None,
errors=errors or None
)
)
Empty file.
109 changes: 109 additions & 0 deletions frinx/workflows/dynamic_fork/dynamic_fork_workflows_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from frinx.common.conductor_enums import WorkflowStatus
from frinx.common.type_aliases import ListStr
from frinx.common.workflow.service import ServiceWorkflowsImpl
from frinx.common.workflow.task import DecisionTask
from frinx.common.workflow.task import DecisionTaskInputParameters
from frinx.common.workflow.task import DynamicForkTask
from frinx.common.workflow.task import DynamicForkTaskInputParameters
from frinx.common.workflow.task import JoinTask
from frinx.common.workflow.task import SimpleTask
from frinx.common.workflow.task import SimpleTaskInputParameters
from frinx.common.workflow.task import TerminateTask
from frinx.common.workflow.task import TerminateTaskInputParameters
from frinx.common.workflow.workflow import FrontendWFInputFieldType
from frinx.common.workflow.workflow import WorkflowImpl
from frinx.common.workflow.workflow import WorkflowInputField
from frinx.workers.utils_workers.utils_service import UtilsService


class DynamicForkWFService(ServiceWorkflowsImpl):

class DynamicFork(WorkflowImpl):
name: str = 'Dynamic_fork'
description: str = 'A dynamic fork + join task with input validation'
version: int = 1
labels: ListStr = ['BASICS', 'UTILS']

class WorkflowInput(WorkflowImpl.WorkflowInput):
dynamic_tasks: WorkflowInputField = WorkflowInputField(
name='dynamic_tasks',
description='Tasks or sub-workflows running in parallel.',
type=FrontendWFInputFieldType.TEXTAREA,
frontend_default_value=None
)

expected_name: WorkflowInputField = WorkflowInputField(
name='expectedName',
description='Expected name of sub-tasks or sub-workflows.',
type=FrontendWFInputFieldType.STRING,
frontend_default_value=None
)

expected_type: WorkflowInputField = WorkflowInputField(
name='expectedType',
description='Expected type of sub-tasks or sub-workflows.',
type=FrontendWFInputFieldType.STRING,
frontend_default_value=None
)

dynamic_tasks_input: WorkflowInputField = WorkflowInputField(
name='dynamic_tasks_input',
description='Inputs to sub-tasks or sub-workflows.',
type=FrontendWFInputFieldType.TEXTAREA,
frontend_default_value=None
)

class WorkflowOutput(WorkflowImpl.WorkflowOutput):
status: WorkflowStatus

def workflow_builder(self, workflow_inputs: WorkflowInput) -> None:
validation = SimpleTask(
name=UtilsService.ForkJoinInputValidator,
task_reference_name='validation',
input_parameters=SimpleTaskInputParameters(
dynamic_tasks=workflow_inputs.dynamic_tasks.wf_input,
expected_name=workflow_inputs.expected_name.wf_input,
expected_type=workflow_inputs.expected_type.wf_input,
dynamic_tasks_input=workflow_inputs.dynamic_tasks_input.wf_input
)
)

termination = TerminateTask(
name='terminateTask',
task_reference_name='termination',
input_parameters=TerminateTaskInputParameters(
termination_status=WorkflowStatus.FAILED,
termination_reason=validation.output_ref('errors'),
workflow_output={'output': validation.output_ref('errors')}
)
)

dynamic_fork = DynamicForkTask(
name='dynamicForkTask',
task_reference_name='dynamic_fork',
dynamic_fork_tasks_param='dynamic_tasks',
dynamic_fork_tasks_input_param_name='dynamic_tasks_input',
input_parameters=DynamicForkTaskInputParameters(
dynamic_tasks=validation.output_ref('result.dynamic_tasks'),
dynamic_tasks_input=validation.output_ref('result.dynamic_tasks_input')
)
)

decision = DecisionTask(
name='decisionTask',
task_reference_name='decision',
default_case=[
dynamic_fork,
JoinTask(
name='joinTask',
task_reference_name='join'
)
],
decision_cases={'termination': [termination]},
case_expression='$.errors ? "termination" : "default"',
input_parameters=DecisionTaskInputParameters(
errors=validation.output_ref('errors')
)
)

self.tasks = [validation, decision]