From b79d10499d22b18429058c83c53efc5d61acd5f8 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Tue, 7 Jan 2025 08:48:31 +0100 Subject: [PATCH 01/18] Rename controller --- processor/process/kommonitor/percentage_share.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index 3ef34f3..dc05599 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -76,10 +76,10 @@ def run(self, logger.debug("Starting execution...") try: - georesources_controller = openapi_client.IndicatorsControllerApi(data_management_client) + indicators_controller = openapi_client.IndicatorsControllerApi(data_management_client) indicator = {} - response = georesources_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( + response = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( inputs["reference_indicator_id"], inputs["target_spatial_unit_id"]) # Iterate population for feat in response: @@ -100,7 +100,7 @@ def run(self, } # Iterate unemployed - response = georesources_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( + response = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( inputs["indicator_id"], inputs["target_spatial_unit_id"]) for feat in response: feature_id = feat["fid"] From 8a5725445270efbbee295dadaedc81518f0fbdfd Mon Sep 17 00:00:00 2001 From: speckij Date: Wed, 15 Jan 2025 01:05:47 +0100 Subject: [PATCH 02/18] update to pygeoapi-latest + prefect v3 --- processor/app.py | 6 +++--- requirements.txt | 7 ++++--- requirements_nodeps.txt | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/processor/app.py b/processor/app.py index 7096957..18eacb0 100644 --- a/processor/app.py +++ b/processor/app.py @@ -16,7 +16,7 @@ os.environ["PYGEOAPI_OPENAPI"] = os.path.join(os.path.dirname(__file__), "default-openapi.yml") from pygeoapi import flask_app -from pygeoapi.flask_app import STATIC_FOLDER, API_RULES, CONFIG, api_, get_response +from pygeoapi.flask_app import STATIC_FOLDER, API_RULES, CONFIG, api_, processes_api, execute_from_flask require_oauth = ResourceProtector() require_oauth.register_token_validator(MyIntrospectTokenValidator()) @@ -35,8 +35,7 @@ def landing_page(): @APP.get('/processes/') @require_oauth() def get_processes(process_id=None): - return get_response(api_.describe_processes(request, process_id)) - + return flask_app.get_processes(process_id) @APP.post('/processes') @require_oauth() @@ -131,6 +130,7 @@ def parse_processes(package: str) -> None: } } flask_app.api_.manager.processes = processes + api_.config['resources'] = processes async def init(): diff --git a/requirements.txt b/requirements.txt index ac4f1dc..6cbc0f4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,12 +4,13 @@ babel pygeofilter Flask~=2.2.3 oauthlib~=3.2.2 -prefect~=2.20.9 +prefect geopandas~=0.14.1 griffe >= 0.20.0, <0.48.0 gunicorn -pydantic~=2.9.2 +pydantic urllib3~=1.26.15 python-dateutil~=2.8.2 setuptools~=67.6.0 -rtree \ No newline at end of file +rtree +filelock \ No newline at end of file diff --git a/requirements_nodeps.txt b/requirements_nodeps.txt index 13c1e1f..429d7b7 100644 --- a/requirements_nodeps.txt +++ b/requirements_nodeps.txt @@ -1,2 +1,2 @@ pygeoapi-prefect @ git+https://github.com/kommonitor/pygeoapi-prefect.git@feature/kommonitor -pygeoapi @ git+https://github.com/geopython/pygeoapi.git@e7264e8 \ No newline at end of file +pygeoapi @ git+https://github.com/52North/pygeoapi.git@pydantic_v2 \ No newline at end of file From 4e47614457ce05673369b5f6d4698f17fdef64e3 Mon Sep 17 00:00:00 2001 From: speckij Date: Wed, 15 Jan 2025 01:07:49 +0100 Subject: [PATCH 03/18] fix missing `processor` reference inside flow --- processor/default-config.yml | 3 - processor/process/base.py | 109 +++++++++----------- processor/process/kommonitor/hello_world.py | 5 +- processor/process/kommonitor/reproject.py | 3 +- 4 files changed, 53 insertions(+), 67 deletions(-) diff --git a/processor/default-config.yml b/processor/default-config.yml index 1abea1b..e2515de 100644 --- a/processor/default-config.yml +++ b/processor/default-config.yml @@ -38,7 +38,6 @@ server: languages: # First language is the default language - en-US - - fr-CA # cors: true pretty_print: true limit: 10 @@ -63,10 +62,8 @@ metadata: identification: title: en: pygeoapi default instance - fr: instance par défaut de pygeoapi description: en: pygeoapi provides an API to geospatial data - fr: pygeoapi fournit une API aux données géospatiales keywords: en: - geospatial diff --git a/processor/process/base.py b/processor/process/base.py index 4c93afd..ec2c584 100644 --- a/processor/process/base.py +++ b/processor/process/base.py @@ -1,5 +1,4 @@ import abc -import datetime as dt import logging import os from dataclasses import dataclass @@ -11,7 +10,6 @@ from prefect.filesystems import LocalFileSystem from prefect.serializers import JSONSerializer import requests -from flask import g from openapi_client import ApiClient from prefect import task, flow, get_run_logger from pygeoapi.util import JobStatus @@ -19,7 +17,6 @@ from processor.auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME from pygeoapi_prefect import schemas from pygeoapi_prefect.process.base import BasePrefectProcessor -from pygeoapi_prefect.schemas import JobStatusInfoInternal, OutputExecutionResultInternal @dataclass @@ -72,85 +69,75 @@ def data_management_client(logger: Logger, execute_request: schemas.ExecuteReque return openapi_client.ApiClient(configuration) -class KommonitorProcess(BasePrefectProcessor): - result_storage_block = None - process_flow = flow +@task +def format_inputs(execution_request: schemas.ExecuteRequest): + inputs = {} - @staticmethod - @task - def format_inputs(execution_request: schemas.ExecuteRequest): - inputs = {} + for k, v in execution_request.inputs.items(): + inputs[k] = v.root + return inputs - for k, v in execution_request.inputs.items(): - inputs[k] = v.root - return inputs - @staticmethod - @task - def setup_logging(job_id: str) -> Logger: - os.mkdir(f"{PROCESS_RESULTS_DIR}/{job_id}/") - log_path = f"{PROCESS_RESULTS_DIR}/{job_id}/log.txt" - filelogger = logging.FileHandler(log_path) - filelogger.setLevel(logging.DEBUG) - logger = get_run_logger() - logger.logger.addHandler(filelogger) - logger.debug("Setup logging ...") - return logger +@task +def setup_logging(job_id: str) -> Logger: + os.mkdir(f"results/{job_id}/") + log_path = f"results/{job_id}/log.txt" + + filelogger = logging.FileHandler(log_path) + filelogger.setLevel(logging.DEBUG) + logger = get_run_logger() + logger.logger.addHandler(filelogger) + logger.debug("Setup logging ...") + return logger + + +@task +def store_output(job_id: str, output: dict) -> str: + storage = LocalFileSystem() + serializer = JSONSerializer() + result_path = f"{PROCESS_RESULTS_DIR}/{job_id}/result.json" + result = serializer.dumps(output) + storage.write_path(result_path, result) + return result_path - @staticmethod - @task - def store_output(job_id: str, output: dict) -> str: - storage = LocalFileSystem() - serializer = JSONSerializer() - result_path = f"{PROCESS_RESULTS_DIR}/{job_id}/result.json" - result = serializer.dumps(output) - storage.write_path(result_path, result) - return result_path + +class KommonitorProcess(BasePrefectProcessor): + result_storage_block = None + + def __init__(self, processor_def: dict): + super().__init__(processor_def) + self.process_flow.__setattr__("processor", self) @staticmethod @flow(persist_result=True) def process_flow( - processor, job_id: str, - result_storage_block: str | None, - process_description: schemas.ProcessDescription, execution_request: schemas.ExecuteRequest - ) -> Union[schemas.JobStatusInfoInternal, prefect.states.State]: - print(processor) - + ) -> dict: ## Setup - logger = KommonitorProcess.setup_logging(job_id) - inputs = KommonitorProcess.format_inputs(execution_request) + + p = prefect.context.get_run_context().flow.__getattribute__("processor") + logger = setup_logging(job_id) + inputs = format_inputs(execution_request) config = KommonitorProcessConfig(job_id, inputs, f"{job_id}/output-result.txt") dmc = data_management_client(logger, execution_request, True) ## Run process - status, outputs = processor.run(config, logger, dmc) - - res_path = KommonitorProcess.store_output(job_id, outputs) + status, outputs = p.run(config, logger, dmc) + res_path = store_output(job_id, outputs) ## Reformat output - return JobStatusInfoInternal( - jobID=job_id, - processID=process_description.id, - status=status, - updated=dt.datetime.now(), - generated_outputs={ - "result": schemas.OutputExecutionResultInternal( - location=res_path, - media_type=( - process_description.outputs["result"].schema_.content_media_type - ), - ) - }, - ) + # TODO: actually return results here + return { + 'results': [] + } - @staticmethod @abc.abstractmethod - def run(config: KommonitorProcessConfig, + def run(self, + config: KommonitorProcessConfig, logger: logging.Logger, - dmc: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): + dmc: ApiClient) -> (JobStatus, Dict): ... @property diff --git a/processor/process/kommonitor/hello_world.py b/processor/process/kommonitor/hello_world.py index 630131b..9855c84 100644 --- a/processor/process/kommonitor/hello_world.py +++ b/processor/process/kommonitor/hello_world.py @@ -49,8 +49,8 @@ class HelloWorld(KommonitorProcess): }, ) - @staticmethod - def run(config: KommonitorProcessConfig, + def run(self, + config: KommonitorProcessConfig, logger: logging.Logger, data_management_client: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): @@ -66,6 +66,7 @@ def run(config: KommonitorProcessConfig, # 4.1 Return success and result return JobStatus.successful, None + except ApiException as e: # 4.2 Catch possible errors cleanly diff --git a/processor/process/kommonitor/reproject.py b/processor/process/kommonitor/reproject.py index 9b45d20..647e27b 100644 --- a/processor/process/kommonitor/reproject.py +++ b/processor/process/kommonitor/reproject.py @@ -43,7 +43,8 @@ class Reproject(KommonitorProcess): outputs={} ) - def run(config: KommonitorProcessConfig, + def run(self, + config: KommonitorProcessConfig, logger: logging.Logger, data_management_client: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): From 5c77a0c79a601f8c6b5c1796db52b10660a09761 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Fri, 17 Jan 2025 09:52:52 +0100 Subject: [PATCH 04/18] Refine base process flow --- processor/process/base.py | 46 +++++++++++-------- .../process/kommonitor/percentage_share.py | 2 +- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/processor/process/base.py b/processor/process/base.py index ec2c584..cf74753 100644 --- a/processor/process/base.py +++ b/processor/process/base.py @@ -1,14 +1,15 @@ import abc +import json import logging import os from dataclasses import dataclass from logging import Logger from typing import Dict, Union, Optional +import json import openapi_client import prefect -from prefect.filesystems import LocalFileSystem -from prefect.serializers import JSONSerializer + import requests from openapi_client import ApiClient from prefect import task, flow, get_run_logger @@ -17,6 +18,7 @@ from processor.auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME from pygeoapi_prefect import schemas from pygeoapi_prefect.process.base import BasePrefectProcessor +from pygeoapi_prefect.utils import get_storage @dataclass @@ -92,14 +94,28 @@ def setup_logging(job_id: str) -> Logger: @task -def store_output(job_id: str, output: dict) -> str: - storage = LocalFileSystem() - serializer = JSONSerializer() - result_path = f"{PROCESS_RESULTS_DIR}/{job_id}/result.json" - result = serializer.dumps(output) - storage.write_path(result_path, result) - return result_path - +def store_output_as_file(job_id: str, output: dict) -> dict: + storage_type = "LocalFileSystem" + basepath = f"{PROCESS_RESULTS_DIR}" + output_dir = get_storage(storage_type, basepath=basepath) + filename = f"percentage-share-result-{job_id}.json" + output_dir.write_path(filename, json.dumps(output).encode('utf-8')) + return { + 'providers': { + 'file_storage_provider': { + 'type': storage_type, + 'basepath': basepath + } + }, + 'results': [ + { + 'provider': 'file_storage_provider', + 'mime_type': 'text/plain', + 'location': f'{output_dir.basepath}/{filename}', + 'filename': filename + } + ] + } class KommonitorProcess(BasePrefectProcessor): result_storage_block = None @@ -115,23 +131,17 @@ def process_flow( execution_request: schemas.ExecuteRequest ) -> dict: ## Setup - p = prefect.context.get_run_context().flow.__getattribute__("processor") logger = setup_logging(job_id) inputs = format_inputs(execution_request) - config = KommonitorProcessConfig(job_id, inputs, f"{job_id}/output-result.txt") dmc = data_management_client(logger, execution_request, True) ## Run process status, outputs = p.run(config, logger, dmc) - res_path = store_output(job_id, outputs) - ## Reformat output - # TODO: actually return results here - return { - 'results': [] - } + ## Store output and return result + return store_output_as_file(job_id, outputs["result"]) @abc.abstractmethod def run(self, diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index dc05599..f8ba8ee 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -69,7 +69,7 @@ class PercentageShare(KommonitorProcess): def run(self, config: KommonitorProcessConfig, logger: logging.Logger, - data_management_client: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): + data_management_client: ApiClient) -> (JobStatus, Dict): # 1. Load inputs inputs = config.inputs From 3620e4fc3dfee9b9ddb09f4fce5eb28c65cc3063 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Thu, 23 Jan 2025 14:54:50 +0100 Subject: [PATCH 05/18] Fix running app --- processor/app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processor/app.py b/processor/app.py index 18eacb0..b551ee1 100644 --- a/processor/app.py +++ b/processor/app.py @@ -118,6 +118,7 @@ def parse_processes(package: str) -> None: """ processes = flask_app.api_.manager.processes for process in glob.glob(f"process/{package}/*.py"): + print(process) with open(process) as fh: root = ast.parse(fh.read()) for node in ast.iter_child_nodes(root): @@ -151,9 +152,9 @@ async def init(): # await deployment.apply() pass +asyncio.run(init()) def run(): - asyncio.run(init()) APP.run(debug=False, host=api_.config['server']['bind']['host'], From f94acd3daeffacf2021eee32422889eec736606a Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Thu, 23 Jan 2025 14:55:47 +0100 Subject: [PATCH 06/18] Introduce common inputs --- processor/process/base.py | 58 ++++++++++++++++++- processor/process/kommonitor/hello_world.py | 4 +- .../process/kommonitor/percentage_share.py | 5 +- processor/process/kommonitor/reproject.py | 32 +++++++++- 4 files changed, 90 insertions(+), 9 deletions(-) diff --git a/processor/process/base.py b/processor/process/base.py index cf74753..f49bf6c 100644 --- a/processor/process/base.py +++ b/processor/process/base.py @@ -15,9 +15,10 @@ from prefect import task, flow, get_run_logger from pygeoapi.util import JobStatus -from processor.auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME +from auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME from pygeoapi_prefect import schemas from pygeoapi_prefect.process.base import BasePrefectProcessor +from pygeoapi_prefect.schemas import ProcessInput, ProcessIOSchema, ProcessIOType, ProcessIOFormat from pygeoapi_prefect.utils import get_storage @@ -119,6 +120,53 @@ def store_output_as_file(job_id: str, output: dict) -> dict: class KommonitorProcess(BasePrefectProcessor): result_storage_block = None + common_inputs = { + "target_indicator_id": ProcessInput( + title="target_indicator_id", + schema_=ProcessIOSchema( + type_=ProcessIOType.STRING + ) + ), + "target_spatial_units": ProcessInput( + title="target_spatial_units", + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE), + min_items=1 + ) + ), + "target_time ": ProcessInput( + title="target_time", + schema_=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required={'mode'}, + properties={ + "mode": ProcessIOSchema(type_=ProcessIOType.STRING), + "includeDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)), + "excludeDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)) + }, + default={ + "mode": "MISSING", + "includeDates": [], + "excludeDates": [] + } + ) + ), + "execution_interval ": ProcessInput( + title="execution_interval", + schema_=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required={"cron"}, + properties={ + "cron": ProcessIOSchema(type_=ProcessIOType.STRING) + }, + default={ + "cron": "0 0 1 * *" + } + ), + + ) + } def __init__(self, processor_def: dict): super().__init__(processor_def) @@ -151,6 +199,12 @@ def run(self, ... @property - @abc.abstractmethod def process_description(self) -> schemas.ProcessDescription: + description = self.detailed_process_description + description.inputs = description.inputs | KommonitorProcess.common_inputs + return description + + @property + @abc.abstractmethod + def detailed_process_description(self) -> schemas.ProcessDescription: ... diff --git a/processor/process/kommonitor/hello_world.py b/processor/process/kommonitor/hello_world.py index 9855c84..3bc261a 100644 --- a/processor/process/kommonitor/hello_world.py +++ b/processor/process/kommonitor/hello_world.py @@ -5,12 +5,12 @@ from pygeoapi.process.base import * from pygeoapi.util import JobStatus -from processor.process.base import KommonitorProcess, KommonitorProcessConfig +from ..base import KommonitorProcess, KommonitorProcessConfig from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, Parameter, AdditionalProcessIOParameters, OutputExecutionResultInternal, ProcessOutput class HelloWorld(KommonitorProcess): - process_description = ProcessDescription( + detailed_process_description = ProcessDescription( id="hello-world", version="1.0.0", title="Hello World! ", diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index f8ba8ee..b5dce65 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -7,12 +7,11 @@ from pygeoapi.util import JobStatus from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, Parameter, AdditionalProcessIOParameters, OutputExecutionResultInternal, ProcessOutput -from processor.process.base import KommonitorProcess, KommonitorProcessConfig -from processor.process.util.geojson import as_geojson +from ..base import KommonitorProcess, KommonitorProcessConfig class PercentageShare(KommonitorProcess): - process_description = ProcessDescription( + detailed_process_description = ProcessDescription( id="percentage-share", version="0.0.1", title="Percentage share of an indicator to a reference indicator in a given spatial unit", diff --git a/processor/process/kommonitor/reproject.py b/processor/process/kommonitor/reproject.py index 647e27b..bd10eb3 100644 --- a/processor/process/kommonitor/reproject.py +++ b/processor/process/kommonitor/reproject.py @@ -5,14 +5,14 @@ from pygeoapi.process.base import * from pygeoapi.util import JobStatus -from processor.process.base import KommonitorProcess, KommonitorProcessConfig +from ..base import KommonitorProcess, KommonitorProcessConfig from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, OutputExecutionResultInternal import geopandas as gpd class Reproject(KommonitorProcess): - process_description = ProcessDescription( + detailed_process_description = ProcessDescription( id="reproject", version="0.0.2", title="Reproject Georesource and persists result to local file.", @@ -39,6 +39,34 @@ class Reproject(KommonitorProcess): title="output_georesource_name", schema=ProcessIOSchema(type=ProcessIOType.STRING) ), + "target_indicator_id": ProcessInput( + title="target_indicator_id", + schema=ProcessIOSchema(type=ProcessIOType.STRING) + ), + "target_spatial_units": ProcessInput( + title="target_spatial_units", + schema=ProcessIOSchema(type=ProcessIOType.ARRAY, items=[ProcessIOSchema(type=ProcessIOType.STRING)], + min_items=1) + ), + "target_time ": ProcessInput( + title="target_time", + schema=ProcessIOSchema( + type=ProcessIOType.OBJECT, + properties=ProcessIOSchema(allOf=[ProcessIOSchema(title="mode", type=ProcessIOType.STRING), + ProcessIOSchema(title="includeDates", type=ProcessIOType.ARRAY, + items=[ + ProcessIOSchema(type=ProcessIOType.STRING)]), + ProcessIOSchema(title="excludeDates", type=ProcessIOType.ARRAY, + items=[ + ProcessIOSchema(type=ProcessIOType.STRING)]) + ] + ) + ) + ), + "execution_interval ": ProcessInput( + title="execution_interval", + schema=ProcessIOSchema(type=ProcessIOType.STRING) + ) }, outputs={} ) From 7d583c431c42e0aa9a8b35a3cf8ee40c783b2c67 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Mon, 27 Jan 2025 16:12:13 +0100 Subject: [PATCH 07/18] Rename custom token introspector --- processor/app.py | 4 ++-- processor/auth.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/app.py b/processor/app.py index b551ee1..2c5b810 100644 --- a/processor/app.py +++ b/processor/app.py @@ -8,7 +8,7 @@ from flask import Flask, send_from_directory, request from werkzeug.utils import secure_filename -from auth import MyIntrospectTokenValidator +from auth import KomMonitorIntrospectTokenValidator if not os.getenv("PYGEOAPI_CONFIG"): os.environ["PYGEOAPI_CONFIG"] = os.path.join(os.path.dirname(__file__), "default-config.yml") @@ -19,7 +19,7 @@ from pygeoapi.flask_app import STATIC_FOLDER, API_RULES, CONFIG, api_, processes_api, execute_from_flask require_oauth = ResourceProtector() -require_oauth.register_token_validator(MyIntrospectTokenValidator()) +require_oauth.register_token_validator(KomMonitorIntrospectTokenValidator()) APP = Flask(__name__, static_folder=STATIC_FOLDER, static_url_path='/static') APP.url_map.strict_slashes = API_RULES.strict_slashes diff --git a/processor/auth.py b/processor/auth.py index 6824f2b..5bf27bb 100644 --- a/processor/auth.py +++ b/processor/auth.py @@ -14,7 +14,7 @@ KC_HOSTNAME_PATH = os.getenv('KC_HOSTNAME_PATH', "") -class MyIntrospectTokenValidator(IntrospectTokenValidator): +class KomMonitorIntrospectTokenValidator(IntrospectTokenValidator): def introspect_token(self, token_string): url = f"https://{KC_HOSTNAME}{KC_HOSTNAME_PATH}/realms/{KC_REALM_NAME}/protocol/openid-connect/token/introspect" data = {'token': token_string[7:], 'token_type_hint': 'access_token'}\ From f2f842982dd9bc8ec10c2ebf2d78ff16fa44ecfa Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Mon, 27 Jan 2025 16:13:06 +0100 Subject: [PATCH 08/18] Refine base process description --- processor/process/base.py | 121 +++++++++++++++++++++++++++++++++++--- 1 file changed, 112 insertions(+), 9 deletions(-) diff --git a/processor/process/base.py b/processor/process/base.py index f49bf6c..2f0810b 100644 --- a/processor/process/base.py +++ b/processor/process/base.py @@ -3,6 +3,7 @@ import logging import os from dataclasses import dataclass +from enum import Enum from logging import Logger from typing import Dict, Union, Optional import json @@ -18,7 +19,8 @@ from auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME from pygeoapi_prefect import schemas from pygeoapi_prefect.process.base import BasePrefectProcessor -from pygeoapi_prefect.schemas import ProcessInput, ProcessIOSchema, ProcessIOType, ProcessIOFormat +from pygeoapi_prefect.schemas import ProcessInput, ProcessIOSchema, ProcessIOType, ProcessIOFormat, ProcessOutput, \ + ExecutionQualifiedInputValue, ExecutionInputValueNoObject, ExecutionInputValueNoObjectArray from pygeoapi_prefect.utils import get_storage @@ -77,7 +79,12 @@ def format_inputs(execution_request: schemas.ExecuteRequest): inputs = {} for k, v in execution_request.inputs.items(): - inputs[k] = v.root + if type(v) is ExecutionInputValueNoObject or type(v) is ExecutionInputValueNoObjectArray: + inputs[k] = v.model_dump() + elif type(v) is ExecutionQualifiedInputValue: + inputs[k] = v.model_dump()["value"] + else: + raise Exception("Unsupported input value!") return inputs @@ -118,8 +125,30 @@ def store_output_as_file(job_id: str, output: dict) -> dict: ] } + +class ExecutionErrorType(str, Enum): + MISSING_TIMESTAMP = "MISSING_TIMESTAMP" + MISSING_DATASET = "MISSING_DATASET" + MISSING_SPATIAL_UNIT = "MISSING_SPATIAL_UNIT" + MISSING_SPATIAL_UNIT_FEATURE = "MISSING_SPATIAL_UNIT_FEATURE" + DATAMANAGEMENT_API_ERROR = "DATAMANAGEMENT_API_ERROR" + PROCESSING_ERROR = "PROCESSING_ERROR" + + +class ExecutionMode(str, Enum): + MISSING = "MISSING" + ALL = "ALL" + DATES = "DATES" + + +class ExecutionResourceType(str, Enum): + GEORESOURCE = "GEORESOURCE" + INDICATOR = "INDICATOR" + + class KommonitorProcess(BasePrefectProcessor): result_storage_block = None + common_inputs = { "target_indicator_id": ProcessInput( title="target_indicator_id", @@ -131,7 +160,7 @@ class KommonitorProcess(BasePrefectProcessor): title="target_spatial_units", schema_=ProcessIOSchema( type_=ProcessIOType.ARRAY, - items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE), + items=ProcessIOSchema(type_=ProcessIOType.STRING), min_items=1 ) ), @@ -139,14 +168,14 @@ class KommonitorProcess(BasePrefectProcessor): title="target_time", schema_=ProcessIOSchema( type_=ProcessIOType.OBJECT, - required={'mode'}, + required=["mode"], properties={ - "mode": ProcessIOSchema(type_=ProcessIOType.STRING), + "mode": ProcessIOSchema(type_=ProcessIOType.STRING, enum=[ExecutionMode.MISSING, ExecutionMode.ALL, ExecutionMode.DATES]), "includeDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)), "excludeDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)) }, default={ - "mode": "MISSING", + "mode": ExecutionMode.MISSING, "includeDates": [], "excludeDates": [] } @@ -156,7 +185,7 @@ class KommonitorProcess(BasePrefectProcessor): title="execution_interval", schema_=ProcessIOSchema( type_=ProcessIOType.OBJECT, - required={"cron"}, + required=["cron"], properties={ "cron": ProcessIOSchema(type_=ProcessIOType.STRING) }, @@ -164,10 +193,85 @@ class KommonitorProcess(BasePrefectProcessor): "cron": "0 0 1 * *" } ), + ) + } + error_type = ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["type", "affectedResourceType", "affectedDatasetId", "affectedTimestamps", "affectedSpatialUnitFeatures", "errorMessage"], + properties={ + "type": ProcessIOSchema(type_=ProcessIOType.STRING, enum=[ExecutionErrorType.MISSING_TIMESTAMP, ExecutionErrorType.MISSING_DATASET, ExecutionErrorType.MISSING_SPATIAL_UNIT, ExecutionErrorType.MISSING_SPATIAL_UNIT_FEATURE, ExecutionErrorType.DATAMANAGEMENT_API_ERROR, ExecutionErrorType.PROCESSING_ERROR]), + "affectedResourceType": ProcessIOSchema(type_=ProcessIOType.STRING, enum=[ExecutionResourceType.INDICATOR, ExecutionResourceType.GEORESOURCE]), + "affectedDatasetId": ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.UUID), + "affectedTimestamps": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE)), + "affectedSpatialUnitFeatures": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)), + "errorMessage": ProcessIOSchema(type_=ProcessIOType.STRING) + } + ) + ) + + indicator_value_type = ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["spatialReferenceKey", "valueMapping"], + properties={ + "spatialReferenceKey": ProcessIOSchema(type_=ProcessIOType.STRING), + "valueMapping": ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["indicatorValue", "timestamp"], + properties={ + "indicatorValue": ProcessIOSchema(type_=ProcessIOType.NUMBER), + "timestamp": ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE) + } + ) + ) + } + ) + + common_output = { + "jobSummary": ProcessOutput( + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + properties={ + "spatialUnitId": ProcessIOSchema(type_=ProcessIOType.STRING), + "modifiedResource": ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.URI), + "numberOfIntegratedIndicatorFeatures": ProcessIOSchema(type_=ProcessIOType.INTEGER), + "integratedTargetDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE)), + "errorsOccurred": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=error_type) + } + ), + content_media_type= "application/json" + ) + ), + "results": ProcessOutput( + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.URI), + content_media_type="application/json" + ) + ), + "resultData": ProcessOutput( + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["applicableSpatialUnit", "indicatorValues"], + properties={ + "applicableSpatialUnit": ProcessIOSchema(type_=ProcessIOType.STRING), + "indicatorValues": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=indicator_value_type), + } + ), + content_media_type="application/json" + ) ) } + def __init__(self, processor_def: dict): super().__init__(processor_def) self.process_flow.__setattr__("processor", self) @@ -189,7 +293,7 @@ def process_flow( status, outputs = p.run(config, logger, dmc) ## Store output and return result - return store_output_as_file(job_id, outputs["result"]) + return store_output_as_file(job_id, outputs["results"]) @abc.abstractmethod def run(self, @@ -201,7 +305,6 @@ def run(self, @property def process_description(self) -> schemas.ProcessDescription: description = self.detailed_process_description - description.inputs = description.inputs | KommonitorProcess.common_inputs return description @property From 9c7a68e3c0cf95ab23243e2676a100b9faeea72b Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Mon, 27 Jan 2025 16:13:33 +0100 Subject: [PATCH 09/18] Adapt percentage share process to changed base process --- .../process/kommonitor/percentage_share.py | 82 +++++++++++-------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index b5dce65..a85eeb3 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -16,10 +16,17 @@ class PercentageShare(KommonitorProcess): version="0.0.1", title="Percentage share of an indicator to a reference indicator in a given spatial unit", example={ - "indicator_id": "cccca04-cc57-48d3-a801-d6b4b00fcccc", - "reference_indicator_id": "aaaaa04-cc57-48d3-a801-d6b4b00faaaa", - "target_spatial_unit_id": "bbbba04-cc57-48d3-a801-d6b4b00fbbbb", - "target_date": "2000-01-01" + "target_indicator_id": "cccca04-cc57-48d3-a801-d6b4b00fcccc", + "target_spatial_units": ["bbbba04-cc57-48d3-a801-d6b4b00fbbbb"], + "target_time": { + "mode": "MISSING", + "includeDates": [], + "excludeDates": [] + }, + "execution_interval": { + "cron": "0 0 1 * *" + }, + "reference_indicator_id": "aaaaa04-cc57-48d3-a801-d6b4b00faaaa" }, additional_parameters=AdditionalProcessIOParameters( parameters=[ @@ -33,36 +40,17 @@ class PercentageShare(KommonitorProcess): ), ] ), - jobControlOptions=[ + job_control_options=[ ProcessJobControlOption.SYNC_EXECUTE, ProcessJobControlOption.ASYNC_EXECUTE, ], - inputs={ - "indicator_id": ProcessInput( - title="indicator_id", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ), + inputs=KommonitorProcess.common_inputs | { "reference_indicator_id": ProcessInput( title="reference_indicator_id", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ), - "target_spatial_unit_id": ProcessInput( - title="target_spatial_unit_id", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ), - "target_date": ProcessInput( - title="target_date", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ) - }, - outputs={ - "result": ProcessOutput( - schema=ProcessIOSchema( - type=ProcessIOType.OBJECT, - contentMediaType="application/json", - ) + schema_=ProcessIOSchema(type_=ProcessIOType.STRING) ) }, + outputs = KommonitorProcess.common_output ) def run(self, @@ -77,14 +65,22 @@ def run(self, try: indicators_controller = openapi_client.IndicatorsControllerApi(data_management_client) indicator = {} + target_indicator_id = inputs["target_indicator_id"] + reference_indicator_id = inputs["reference_indicator_id"] + target_spatial_units = inputs["target_spatial_units"] + target_time = inputs["target_time"]["includeDates"][0] + execution_interval = inputs["execution_interval"] + + target_unit_id = target_spatial_units[0] + response = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( - inputs["reference_indicator_id"], inputs["target_spatial_unit_id"]) + reference_indicator_id, target_unit_id) # Iterate population for feat in response: feature_id = feat["fid"] - ref_value = feat[f"DATE_{inputs['target_date']}"] + ref_value = feat[f"DATE_{target_time}"] if ref_value is None: # TODO: None must be None and not 0. Otherwise calculated indicator is misleading and # calculation may cause an error @@ -100,11 +96,11 @@ def run(self, # Iterate unemployed response = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( - inputs["indicator_id"], inputs["target_spatial_unit_id"]) + target_indicator_id, target_unit_id) for feat in response: feature_id = feat["fid"] - value = feat[f"DATE_{inputs['target_date']}"] + value = feat[f"DATE_{target_time}"] if value is None: logger.error(f"WARNING: the feature with featureID '{feature_id}' does not contain a " f"time series value for targetDate '{inputs['target_date']}") @@ -112,7 +108,29 @@ def run(self, # Calculate percentage indicator[feature_id]["value"] = (float(value) / indicator[feature_id]["ref_value"]) * 100 - return JobStatus.successful, {"result": indicator} + result = { + "jobSummary": [], + "results": [ + { + "applicableSpatialUnit": target_unit_id, + "indicatorValues": [] + } + ] + } + for feature in indicator.values(): + result["results"][0]["indicatorValues"].append( + { + "spatialReferenceKey": feature["feature_id"], + "valueMapping": [ + { + "indicatorValue": feature["value"], + "timestamp": target_time + } + ] + } + ) + + return JobStatus.successful, result except ApiException as e: logger.error(f"Exception when calling DataManagementAPI: {e}") return JobStatus.failed, None From ddfc1178ce6f2813173ea1dcfc26c3ccf7f5f831 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Tue, 28 Jan 2025 10:10:20 +0100 Subject: [PATCH 10/18] Add helper functions for converting timeseries data to DataFrames and vice versa --- processor/process/util/dataio.py | 88 ++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 processor/process/util/dataio.py diff --git a/processor/process/util/dataio.py b/processor/process/util/dataio.py new file mode 100644 index 0000000..c5f15c1 --- /dev/null +++ b/processor/process/util/dataio.py @@ -0,0 +1,88 @@ +import pandas as pd + +def indicator_timeseries_to_dataframe(timeseries: list, date_col: str = "date", value_col: str = "value") -> pd.DataFrame: + """Creates a pd.DataFrame from indicator timeseries values for a single spatial unit + + Args: + timeseries: Timeseries data for a single spatial unit + date_col: Name of the date column. Default: "date" + value_col: Name of the column that contains timeseries values. Default: "value" + + Returns: + A pd.DataFrame that contains timeseries values for all features of a single spatial unit + + """ + df = pd.DataFrame(timeseries) + df = df.melt(id_vars=["ID", "NAME", "arisenFrom", "fid", "validStartDate", "validEndDate"], + var_name=date_col, value_name=value_col) + df = df[["ID", date_col, value_col]] + df["date"] = pd.to_datetime(df["date"], format="DATE_%Y-%m-%d") + return df + + +def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date_col: str = "date", value_col: str = "result") -> dict: + """Creates indicator timeseries results as dict from a pd.DataFrame + + Args: + df: DataFrame that contains indicator timeseries values for several features of a single spatial unit + id_col: Name of the ID column. Default: "ID" + date_col:: Name of the date column: Default: "date" + value_col: Name of the column that contains the timeseries values. Default: "result" + + Returns: + Timeseries results for a single spatial unit as dict such as: + + { + "applicableSpatialUnit": "dictricts", + "indicatorValues": [ + { + "spatialReferenceKey": "1", + "valueMapping": [ + { + "indicatorValue": 0, + "timestamp": "2025-01-15" + }, + { + "indicatorValue": 1, + "timestamp": "2024-01-15" + } + ] + }, + { + "spatialReferenceKey": "2", + "valueMapping": [ + { + "indicatorValue": 10, + "timestamp": "2025-01-15" + }, + { + "indicatorValue": 6, + "timestamp": "2024-01-15" + } + ] + }, + ... // for all 50 disctricts + ] + } + + """ + grouped_df = df.groupby(id_col) + + results = { + "applicableSpatialUnit": "stadtteile", + "indicatorValues": [] + } + + for feature_id, group in grouped_df: + feature_result = { + "spatialReferenceKey": feature_id, + "valueMapping": [] + } + for index, row in group.iterrows(): + timestep_result = { + "indicatorValue": row[value_col], + "timestamp": row[date_col].strftime("%Y-%m-%d") + } + feature_result["valueMapping"].append(timestep_result) + results["indicatorValues"].append(feature_result) + return results From e881b49f88d3ee691c6190a9ee3601a248dc3c62 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Tue, 28 Jan 2025 10:11:18 +0100 Subject: [PATCH 11/18] Reimplement process by using pandas functions --- .../process/kommonitor/percentage_share.py | 71 ++++++------------- 1 file changed, 21 insertions(+), 50 deletions(-) diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index a85eeb3..0e9afba 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -3,11 +3,15 @@ import openapi_client from openapi_client import ApiClient from openapi_client.rest import ApiException +# from pandas import isnull +import pandas as pd + from pygeoapi.process.base import * from pygeoapi.util import JobStatus - from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, Parameter, AdditionalProcessIOParameters, OutputExecutionResultInternal, ProcessOutput + from ..base import KommonitorProcess, KommonitorProcessConfig +from ..util import dataio class PercentageShare(KommonitorProcess): @@ -73,62 +77,29 @@ def run(self, target_unit_id = target_spatial_units[0] + data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(target_indicator_id, target_unit_id) + ref_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(reference_indicator_id, target_unit_id) + + indicator_df = dataio.indicator_timeseries_to_dataframe(data) + ref_indicator_df = dataio.indicator_timeseries_to_dataframe(ref_data) + + merged_df = indicator_df.merge(ref_indicator_df, how="left", on=["ID", "date"], + suffixes=("_target", "_ref")) + merged_df = merged_df.dropna(subset=["value_target", "value_ref"]) + + merged_df["value_target"] = pd.to_numeric(merged_df["value_target"]) + merged_df["value_ref"] = pd.to_numeric(merged_df["value_ref"]) + + merged_df["result"] = merged_df["value_target"] / merged_df["value_ref"] * 100 - response = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( - reference_indicator_id, target_unit_id) - # Iterate population - for feat in response: - feature_id = feat["fid"] - - ref_value = feat[f"DATE_{target_time}"] - if ref_value is None: - # TODO: None must be None and not 0. Otherwise calculated indicator is misleading and - # calculation may cause an error - ref_value = 0 - logger.error(f"WARNING: the feature with featureID '{feature_id}' does not contain a " - f"time series value for targetDate '{inputs['target_date']}") - - indicator[feature_id] = { - "feature_id": feature_id, - "value": None, - "ref_value": float(ref_value) - } - - # Iterate unemployed - response = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( - target_indicator_id, target_unit_id) - for feat in response: - feature_id = feat["fid"] - - value = feat[f"DATE_{target_time}"] - if value is None: - logger.error(f"WARNING: the feature with featureID '{feature_id}' does not contain a " - f"time series value for targetDate '{inputs['target_date']}") - else: - # Calculate percentage - indicator[feature_id]["value"] = (float(value) / indicator[feature_id]["ref_value"]) * 100 + ts_result = dataio.dataframe_to_indicator_timeseries(merged_df) result = { "jobSummary": [], "results": [ - { - "applicableSpatialUnit": target_unit_id, - "indicatorValues": [] - } + ts_result ] } - for feature in indicator.values(): - result["results"][0]["indicatorValues"].append( - { - "spatialReferenceKey": feature["feature_id"], - "valueMapping": [ - { - "indicatorValue": feature["value"], - "timestamp": target_time - } - ] - } - ) return JobStatus.successful, result except ApiException as e: From e90c6ffc13a13a97694192b45207ed040c6d83a5 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Wed, 29 Jan 2025 16:21:15 +0100 Subject: [PATCH 12/18] Adapt OpenAPI client models in accordance to API spec --- .../openapi_client/models/common_metadata_type.py | 2 +- .../openapi_client/models/indicator_overview_type.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/data-management-client/openapi_client/models/common_metadata_type.py b/data-management-client/openapi_client/models/common_metadata_type.py index 636fb83..e3c1cea 100644 --- a/data-management-client/openapi_client/models/common_metadata_type.py +++ b/data-management-client/openapi_client/models/common_metadata_type.py @@ -38,7 +38,7 @@ class CommonMetadataType(BaseModel): last_update: Optional[date] = Field(default=None, description="a timestamp representing the lastUpdate according to ISO 8601 (e.g. 2018-01-30)", alias="lastUpdate") literature: Optional[StrictStr] = Field(default=None, description="an optional hint to literature about the dataset (e.g. URL or book/article name)") note: Optional[StrictStr] = Field(default=None, description="an optional note with background information about the dataset") - srid_epsg: Union[StrictFloat, StrictInt] = Field(description="the coordinate reference system of the dataset as EPSG code", alias="sridEPSG") + srid_epsg: Optional[Union[StrictFloat, StrictInt]] = Field(description="the coordinate reference system of the dataset as EPSG code", alias="sridEPSG") update_interval: StrictStr = Field(alias="updateInterval") __properties: ClassVar[List[str]] = ["contact", "databasis", "datasource", "description", "lastUpdate", "literature", "note", "sridEPSG", "updateInterval"] diff --git a/data-management-client/openapi_client/models/indicator_overview_type.py b/data-management-client/openapi_client/models/indicator_overview_type.py index 0520340..1547b7a 100644 --- a/data-management-client/openapi_client/models/indicator_overview_type.py +++ b/data-management-client/openapi_client/models/indicator_overview_type.py @@ -37,11 +37,11 @@ class IndicatorOverviewType(BaseModel): """ IndicatorOverviewType """ # noqa: E501 - abbreviation: StrictStr = Field(description="abbreviated mark of the indicator") + abbreviation: Optional[StrictStr] = Field(description="abbreviated mark of the indicator") allowed_roles: Optional[List[StrictStr]] = Field(default=None, description="list of role identifiers that have read access rights for this dataset", alias="allowedRoles") applicable_dates: List[StrictStr] = Field(description="array of applicable dates (year and month and day as YEAR-MONTH-DAY) according to ISO 8601 (e.g. 2018-01-30)", alias="applicableDates") applicable_spatial_units: List[IndicatorSpatialUnitJoinItem] = Field(description="array of spatial unit levels for which the dataset is applicable", alias="applicableSpatialUnits") - characteristic_value: StrictStr = Field(description="the distuingishing characteristic value of the indicator", alias="characteristicValue") + characteristic_value: Optional[StrictStr] = Field(description="the distuingishing characteristic value of the indicator", alias="characteristicValue") creation_type: StrictStr = Field(description="indicates if the data is simply inserted (INSERTION), computed by an automated script (COMPUTATION) or automatically aggregated by a script (AGGREGATION)", alias="creationType") default_classification_mapping: Optional[DefaultClassificationMappingType] = Field(default=None, alias="defaultClassificationMapping") indicator_id: StrictStr = Field(description="unique identifier of this resource", alias="indicatorId") @@ -52,7 +52,7 @@ class IndicatorOverviewType(BaseModel): lowest_spatial_unit_for_computation: Optional[StrictStr] = Field(default=None, description="identifier/name of the lowest spatial unit for which the indicator can be computed and thus is available (only necessary for computable indicators)", alias="lowestSpatialUnitForComputation") metadata: CommonMetadataType ogc_services: List[OgcServicesType] = Field(description="list of available OGC services for that indicator for different spatial units", alias="ogcServices") - process_description: StrictStr = Field(description="description about how the indicator was computed", alias="processDescription") + process_description: Optional[StrictStr] = Field(description="description about how the indicator was computed", alias="processDescription") reference_date_note: Optional[StrictStr] = Field(default=None, description="an optional note on the reference date of the indicator", alias="referenceDateNote") display_order: Optional[Union[StrictFloat, StrictInt]] = Field(default=None, description="an order number to control display order in clients", alias="displayOrder") referenced_georesources: Optional[List[GeoresourceReferenceType]] = Field(default=None, description="list of references to georesources", alias="referencedGeoresources") From 4c19e7b2121f97f98d3fd427b39c2859448bf2cb Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Wed, 29 Jan 2025 16:21:57 +0100 Subject: [PATCH 13/18] Add data frame utility methods --- processor/process/util/dataio.py | 73 +++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 7 deletions(-) diff --git a/processor/process/util/dataio.py b/processor/process/util/dataio.py index c5f15c1..1a18153 100644 --- a/processor/process/util/dataio.py +++ b/processor/process/util/dataio.py @@ -1,4 +1,8 @@ import pandas as pd +import itertools + +from openapi_client import IndicatorOverviewType + def indicator_timeseries_to_dataframe(timeseries: list, date_col: str = "date", value_col: str = "value") -> pd.DataFrame: """Creates a pd.DataFrame from indicator timeseries values for a single spatial unit @@ -20,7 +24,7 @@ def indicator_timeseries_to_dataframe(timeseries: list, date_col: str = "date", return df -def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date_col: str = "date", value_col: str = "result") -> dict: +def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date_col: str = "date", value_col: str = "result") -> list: """Creates indicator timeseries results as dict from a pd.DataFrame Args: @@ -30,7 +34,7 @@ def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date value_col: Name of the column that contains the timeseries values. Default: "result" Returns: - Timeseries results for a single spatial unit as dict such as: + Timeseries results for a single spatial unit as list of dicts such as: { "applicableSpatialUnit": "dictricts", @@ -68,10 +72,7 @@ def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date """ grouped_df = df.groupby(id_col) - results = { - "applicableSpatialUnit": "stadtteile", - "indicatorValues": [] - } + results = [] for feature_id, group in grouped_df: feature_result = { @@ -84,5 +85,63 @@ def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date "timestamp": row[date_col].strftime("%Y-%m-%d") } feature_result["valueMapping"].append(timestep_result) - results["indicatorValues"].append(feature_result) + results.append(feature_result) return results + + +def get_unique_dates(dates: list) -> set: + return set(itertools.chain.from_iterable(dates)) + + +def get_applicable_dates_from_metadata_as_dataframe(metadata_list: list[IndicatorOverviewType]) -> pd.DataFrame: + date_list = [] + date_dict_list = [] + for m in metadata_list: + date_list.append(m.applicable_dates) + date_dict_list.append({"id": m.indicator_id, m.indicator_id: m.applicable_dates}) + unique_dates = get_unique_dates(date_list) + df = pd.DataFrame(unique_dates, columns=["date"]).sort_values(by="date") + for d in date_dict_list: + df = df.merge(pd.DataFrame(d)[[d["id"]]], left_on="date", right_on=d["id"], how="outer") + return df + + +def get_missing_target_dates(df: pd.DataFrame, target_id: str, input_ids: list[str], drop_input_na: bool, mode: str = "MISSING", include_dates: list =[], exclude_dates: list = []): + if drop_input_na: + df = df.dropna(subset=input_ids) + if mode == "MISSING": + df = df[["date"]][df[target_id].isnull()] + return list(df["date"][~df["date"].isin(exclude_dates)]) + elif mode == "ALL": + return list(df["date"][~df["date"].isin(exclude_dates)]) + elif mode == "DATES": + if drop_input_na: + return list(df["date"][df["date"].isin(include_dates)]) + else: + return include_dates + + +def get_missing_input_timestamps(target_dates: list, df: pd.DataFrame, input_ids: list): + target_dates_series = pd.Series(target_dates) + missing_timestamp_list = [] + for id in input_ids: + missing_timestamps = list(target_dates_series[~target_dates_series.isin(df[id])]) + if(len(missing_timestamps) > 0): + missing_timestamp_list.append({"id": id, "missingTimestamps": missing_timestamps}) + return missing_timestamp_list + + +def get_missing_timestamps_error(missing_timestamps: list, resource_type: str): + error_list = [] + for t in missing_timestamps: + error_list.append( + { + "type": "missingTimestamp", + "affectedResourceType": resource_type, + "affectedDatasetId": t["id"], + "affectedTimestamps": t["missingTimestamps"], + "affectedSpatialUnitFeatures": [], + "errorMessage": "Timestamps are missing for one or more input datasets" + } + ) + return error_list \ No newline at end of file From e4aa06e7ee71c094c3e9650ec60103ae71088408 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Wed, 29 Jan 2025 16:22:13 +0100 Subject: [PATCH 14/18] Refine persentage share process --- .../process/kommonitor/percentage_share.py | 85 +++++++++++++------ 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index 0e9afba..600cf52 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -30,6 +30,7 @@ class PercentageShare(KommonitorProcess): "execution_interval": { "cron": "0 0 1 * *" }, + "base_indicator_id": "bbbbb04-dc45-58d3-a801-d6b4b00fbbbbb", "reference_indicator_id": "aaaaa04-cc57-48d3-a801-d6b4b00faaaa" }, additional_parameters=AdditionalProcessIOParameters( @@ -49,6 +50,10 @@ class PercentageShare(KommonitorProcess): ProcessJobControlOption.ASYNC_EXECUTE, ], inputs=KommonitorProcess.common_inputs | { + "base_indicator_id": ProcessInput( + title="base_indicator_id", + schema_=ProcessIOSchema(type_=ProcessIOType.STRING) + ), "reference_indicator_id": ProcessInput( title="reference_indicator_id", schema_=ProcessIOSchema(type_=ProcessIOType.STRING) @@ -70,37 +75,67 @@ def run(self, indicators_controller = openapi_client.IndicatorsControllerApi(data_management_client) indicator = {} target_indicator_id = inputs["target_indicator_id"] - reference_indicator_id = inputs["reference_indicator_id"] + base_indicator_id = inputs["base_indicator_id"] + ref_indicator_id = inputs["reference_indicator_id"] target_spatial_units = inputs["target_spatial_units"] - target_time = inputs["target_time"]["includeDates"][0] - execution_interval = inputs["execution_interval"] - - target_unit_id = target_spatial_units[0] - - data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(target_indicator_id, target_unit_id) - ref_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(reference_indicator_id, target_unit_id) - - indicator_df = dataio.indicator_timeseries_to_dataframe(data) - ref_indicator_df = dataio.indicator_timeseries_to_dataframe(ref_data) - - merged_df = indicator_df.merge(ref_indicator_df, how="left", on=["ID", "date"], - suffixes=("_target", "_ref")) - merged_df = merged_df.dropna(subset=["value_target", "value_ref"]) - - merged_df["value_target"] = pd.to_numeric(merged_df["value_target"]) - merged_df["value_ref"] = pd.to_numeric(merged_df["value_ref"]) - - merged_df["result"] = merged_df["value_target"] / merged_df["value_ref"] * 100 - - ts_result = dataio.dataframe_to_indicator_timeseries(merged_df) + exclude_dates = inputs["target_time"]["excludeDates"] if "excludeDates" in inputs["target_time"] else [] + include_dates = inputs["target_time"]["includeDates"] if "includeDates" in inputs["target_time"] else [] result = { "jobSummary": [], - "results": [ - ts_result - ] + "results": [] } + target_indicator_metadata = indicators_controller.get_indicator_by_id(target_indicator_id) + base_indicator_metadata = indicators_controller.get_indicator_by_id(base_indicator_id) + ref_indicator_metadata = indicators_controller.get_indicator_by_id(ref_indicator_id) + + dates_df = dataio.get_applicable_dates_from_metadata_as_dataframe([target_indicator_metadata, base_indicator_metadata, ref_indicator_metadata]) + + # Candidate target dates are all dates, which potentially should be calculated depending on the execution mode + candidate_target_dates = dataio.get_missing_target_dates(dates_df, target_indicator_id, [base_indicator_id, ref_indicator_id], + drop_input_na=False, + mode=inputs["target_time"]["mode"], + exclude_dates=exclude_dates, + include_dates=include_dates) + # Computable target dates ar all dates, which potentially should be calculated depending on the execution mode and for which all inputs have an existing value + computable_target_dates = dataio.get_missing_target_dates(dates_df, target_indicator_id, [base_indicator_id, ref_indicator_id], + drop_input_na=True, + mode=inputs["target_time"]["mode"], + exclude_dates=exclude_dates, + include_dates=include_dates) + + # determine missing timestamps for input datasets to add errors to jobSummary + missing_input_timestamps = dataio.get_missing_input_timestamps(candidate_target_dates, dates_df, [base_indicator_id, ref_indicator_id]) + result["jobSummary"].append(dataio.get_missing_timestamps_error(missing_input_timestamps,resource_type="indicator")) + + for target_unit_id in target_spatial_units: + + base_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(base_indicator_id, target_unit_id) + ref_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(ref_indicator_id, target_unit_id) + + indicator_df = dataio.indicator_timeseries_to_dataframe(base_data) + ref_indicator_df = dataio.indicator_timeseries_to_dataframe(ref_data) + + merged_df = indicator_df.merge(ref_indicator_df, how="left", on=["ID", "date"], + suffixes=("_target", "_ref")) + merged_df = merged_df[merged_df["date"].isin(computable_target_dates)] + merged_df = merged_df.dropna(subset=["value_target", "value_ref"]) + # TODO create error for missing features + merged_df["value_target"] = pd.to_numeric(merged_df["value_target"]) + merged_df["value_ref"] = pd.to_numeric(merged_df["value_ref"]) + + merged_df["result"] = merged_df["value_target"] / merged_df["value_ref"] * 100 + + ts_result = dataio.dataframe_to_indicator_timeseries(merged_df) + + result["results"].append( + { + "applicableSpatialUnit": target_unit_id, + "indicatorValues": ts_result + } + ) + return JobStatus.successful, result except ApiException as e: logger.error(f"Exception when calling DataManagementAPI: {e}") From 2d76011e2d3617e6fb4e8d9e8c1bb765dcdbd298 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Thu, 30 Jan 2025 10:56:31 +0100 Subject: [PATCH 15/18] Raise libs --- requirements.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 6cbc0f4..297bab8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,6 @@ pygeofilter Flask~=2.2.3 oauthlib~=3.2.2 prefect -geopandas~=0.14.1 griffe >= 0.20.0, <0.48.0 gunicorn pydantic @@ -13,4 +12,6 @@ urllib3~=1.26.15 python-dateutil~=2.8.2 setuptools~=67.6.0 rtree -filelock \ No newline at end of file +filelock +geopandas~=1.0.1 +pandas~=2.2.3 \ No newline at end of file From 6465139d233070c5597df012aecd12cd44929e19 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Thu, 30 Jan 2025 10:56:50 +0100 Subject: [PATCH 16/18] Adapt model classes to Data Management API schema --- .../openapi_client/models/georesource_reference_type.py | 4 ++-- .../openapi_client/models/indicator_reference_type.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/data-management-client/openapi_client/models/georesource_reference_type.py b/data-management-client/openapi_client/models/georesource_reference_type.py index fcee795..6efe9b5 100644 --- a/data-management-client/openapi_client/models/georesource_reference_type.py +++ b/data-management-client/openapi_client/models/georesource_reference_type.py @@ -19,7 +19,7 @@ import json -from typing import Any, ClassVar, Dict, List +from typing import Any, ClassVar, Dict, List, Optional from pydantic import BaseModel, StrictStr from pydantic import Field try: @@ -31,7 +31,7 @@ class GeoresourceReferenceType(BaseModel): """ a reference to georesource, e.g. a resource that is used to compute the main indicator """ # noqa: E501 - referenced_georesource_description: StrictStr = Field(description="a meaningful description of how the referenced georesource is related to the main indicator", alias="referencedGeoresourceDescription") + referenced_georesource_description: Optional[StrictStr] = Field(description="a meaningful description of how the referenced georesource is related to the main indicator", alias="referencedGeoresourceDescription") referenced_georesource_id: StrictStr = Field(description="unique identifier of the referenced georesource", alias="referencedGeoresourceId") referenced_georesource_name: StrictStr = Field(description="the meaningful name of the referenced georesource", alias="referencedGeoresourceName") __properties: ClassVar[List[str]] = ["referencedGeoresourceDescription", "referencedGeoresourceId", "referencedGeoresourceName"] diff --git a/data-management-client/openapi_client/models/indicator_reference_type.py b/data-management-client/openapi_client/models/indicator_reference_type.py index 60dcbe8..efed1dc 100644 --- a/data-management-client/openapi_client/models/indicator_reference_type.py +++ b/data-management-client/openapi_client/models/indicator_reference_type.py @@ -19,7 +19,7 @@ import json -from typing import Any, ClassVar, Dict, List +from typing import Any, ClassVar, Dict, List, Optional from pydantic import BaseModel, StrictStr from pydantic import Field try: @@ -31,7 +31,7 @@ class IndicatorReferenceType(BaseModel): """ a reference to another indicator, e.g. a sub-indicator that is used to compute the main indicator """ # noqa: E501 - referenced_indicator_description: StrictStr = Field(description="a meaningful description of how the referenced indicator is related to the main indicator", alias="referencedIndicatorDescription") + referenced_indicator_description: Optional[StrictStr] = Field(description="a meaningful description of how the referenced indicator is related to the main indicator", alias="referencedIndicatorDescription") referenced_indicator_id: StrictStr = Field(description="unique identifier of the referenced indicator", alias="referencedIndicatorId") referenced_indicator_name: StrictStr = Field(description="the meaningful name of the referenced indicator", alias="referencedIndicatorName") __properties: ClassVar[List[str]] = ["referencedIndicatorDescription", "referencedIndicatorId", "referencedIndicatorName"] From 83026635da42bff7524dda5964f16dd32d18c3ba Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Thu, 30 Jan 2025 10:59:28 +0100 Subject: [PATCH 17/18] Add helper class for creating job summaries --- processor/process/util/dataio.py | 16 -------------- processor/process/util/job_summary.py | 30 +++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 16 deletions(-) create mode 100644 processor/process/util/job_summary.py diff --git a/processor/process/util/dataio.py b/processor/process/util/dataio.py index 1a18153..bf374b2 100644 --- a/processor/process/util/dataio.py +++ b/processor/process/util/dataio.py @@ -129,19 +129,3 @@ def get_missing_input_timestamps(target_dates: list, df: pd.DataFrame, input_ids if(len(missing_timestamps) > 0): missing_timestamp_list.append({"id": id, "missingTimestamps": missing_timestamps}) return missing_timestamp_list - - -def get_missing_timestamps_error(missing_timestamps: list, resource_type: str): - error_list = [] - for t in missing_timestamps: - error_list.append( - { - "type": "missingTimestamp", - "affectedResourceType": resource_type, - "affectedDatasetId": t["id"], - "affectedTimestamps": t["missingTimestamps"], - "affectedSpatialUnitFeatures": [], - "errorMessage": "Timestamps are missing for one or more input datasets" - } - ) - return error_list \ No newline at end of file diff --git a/processor/process/util/job_summary.py b/processor/process/util/job_summary.py new file mode 100644 index 0000000..d9ebbd3 --- /dev/null +++ b/processor/process/util/job_summary.py @@ -0,0 +1,30 @@ +from openapi_client import ApiException + + +def get_missing_timestamps_error(missing_timestamps: list, resource_type: str) -> list[dict]: + error_list = [] + for t in missing_timestamps: + error_list.append( + { + "type": "missingTimestamp", + "affectedResourceType": resource_type, + "affectedDatasetId": t["id"], + "affectedTimestamps": t["missingTimestamps"], + "affectedSpatialUnitFeatures": [], + "errorMessage": "Timestamps are missing for one or more input datasets" + } + ) + return error_list + + +def get_api_client_error(e: ApiException, dataset_id: str = None, resource_type: str = None) -> dict: + return { + "type": "dataManagementApiError", + "affectedResourceType": resource_type, + "dataManagementApiErrorCode": e.status, + "affectedDatasetId": dataset_id, + "affectedTimestamps": [], + "affectedSpatialUnitFeatures": [], + "errorMessage": e.data + } + From 3585a2dbc1fd1f8092ebd59265e6be8d22f7a6d1 Mon Sep 17 00:00:00 2001 From: Sebastian Drost Date: Thu, 30 Jan 2025 10:59:47 +0100 Subject: [PATCH 18/18] Refine process --- processor/process/base.py | 5 +-- .../process/kommonitor/percentage_share.py | 37 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/processor/process/base.py b/processor/process/base.py index 2f0810b..b1926bf 100644 --- a/processor/process/base.py +++ b/processor/process/base.py @@ -106,7 +106,7 @@ def store_output_as_file(job_id: str, output: dict) -> dict: storage_type = "LocalFileSystem" basepath = f"{PROCESS_RESULTS_DIR}" output_dir = get_storage(storage_type, basepath=basepath) - filename = f"percentage-share-result-{job_id}.json" + filename = f"result-{job_id}.json" output_dir.write_path(filename, json.dumps(output).encode('utf-8')) return { 'providers': { @@ -292,8 +292,7 @@ def process_flow( ## Run process status, outputs = p.run(config, logger, dmc) - ## Store output and return result - return store_output_as_file(job_id, outputs["results"]) + return store_output_as_file(job_id, outputs) @abc.abstractmethod def run(self, diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index 600cf52..d5a31a0 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -11,7 +11,7 @@ from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, Parameter, AdditionalProcessIOParameters, OutputExecutionResultInternal, ProcessOutput from ..base import KommonitorProcess, KommonitorProcessConfig -from ..util import dataio +from ..util import dataio, job_summary class PercentageShare(KommonitorProcess): @@ -71,9 +71,16 @@ def run(self, inputs = config.inputs logger.debug("Starting execution...") + result = { + "jobSummary": [], + "results": [] + } + current_indicator = "" + try: indicators_controller = openapi_client.IndicatorsControllerApi(data_management_client) - indicator = {} + + # Extract all relevant inputs target_indicator_id = inputs["target_indicator_id"] base_indicator_id = inputs["base_indicator_id"] ref_indicator_id = inputs["reference_indicator_id"] @@ -81,15 +88,12 @@ def run(self, exclude_dates = inputs["target_time"]["excludeDates"] if "excludeDates" in inputs["target_time"] else [] include_dates = inputs["target_time"]["includeDates"] if "includeDates" in inputs["target_time"] else [] - result = { - "jobSummary": [], - "results": [] - } - + # Fetch indicator metadata target_indicator_metadata = indicators_controller.get_indicator_by_id(target_indicator_id) base_indicator_metadata = indicators_controller.get_indicator_by_id(base_indicator_id) ref_indicator_metadata = indicators_controller.get_indicator_by_id(ref_indicator_id) + # Get a DataFrame that contains dates for all indicators dates_df = dataio.get_applicable_dates_from_metadata_as_dataframe([target_indicator_metadata, base_indicator_metadata, ref_indicator_metadata]) # Candidate target dates are all dates, which potentially should be calculated depending on the execution mode @@ -107,26 +111,36 @@ def run(self, # determine missing timestamps for input datasets to add errors to jobSummary missing_input_timestamps = dataio.get_missing_input_timestamps(candidate_target_dates, dates_df, [base_indicator_id, ref_indicator_id]) - result["jobSummary"].append(dataio.get_missing_timestamps_error(missing_input_timestamps,resource_type="indicator")) + if missing_input_timestamps: + result["jobSummary"].extend(job_summary.get_missing_timestamps_error(missing_input_timestamps,resource_type="indicator")) for target_unit_id in target_spatial_units: + # Fetch indicator timeseries data base_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(base_indicator_id, target_unit_id) ref_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(ref_indicator_id, target_unit_id) + # Create a DataFrame for each indicator timeseries data and merge it indicator_df = dataio.indicator_timeseries_to_dataframe(base_data) ref_indicator_df = dataio.indicator_timeseries_to_dataframe(ref_data) merged_df = indicator_df.merge(ref_indicator_df, how="left", on=["ID", "date"], suffixes=("_target", "_ref")) + + # Cast indicator value columns to be numeric + merged_df["value_target"] = pd.to_numeric(merged_df["value_target"]) + merged_df["value_ref"] = pd.to_numeric(merged_df["value_ref"]) + + # Use computable target dates to filter DataFrame for relevant rows merged_df = merged_df[merged_df["date"].isin(computable_target_dates)] + # Drop rows where at least one input has an NA value merged_df = merged_df.dropna(subset=["value_target", "value_ref"]) # TODO create error for missing features - merged_df["value_target"] = pd.to_numeric(merged_df["value_target"]) - merged_df["value_ref"] = pd.to_numeric(merged_df["value_ref"]) + # Calculate percentage share merged_df["result"] = merged_df["value_target"] / merged_df["value_ref"] * 100 + # Convert DataFrame back again to required JSON format ts_result = dataio.dataframe_to_indicator_timeseries(merged_df) result["results"].append( @@ -139,4 +153,5 @@ def run(self, return JobStatus.successful, result except ApiException as e: logger.error(f"Exception when calling DataManagementAPI: {e}") - return JobStatus.failed, None + result["jobSummary"].append(job_summary.get_api_client_error(e)) + return JobStatus.failed, result