diff --git a/data-management-client/openapi_client/models/common_metadata_type.py b/data-management-client/openapi_client/models/common_metadata_type.py index 636fb83..e3c1cea 100644 --- a/data-management-client/openapi_client/models/common_metadata_type.py +++ b/data-management-client/openapi_client/models/common_metadata_type.py @@ -38,7 +38,7 @@ class CommonMetadataType(BaseModel): last_update: Optional[date] = Field(default=None, description="a timestamp representing the lastUpdate according to ISO 8601 (e.g. 2018-01-30)", alias="lastUpdate") literature: Optional[StrictStr] = Field(default=None, description="an optional hint to literature about the dataset (e.g. URL or book/article name)") note: Optional[StrictStr] = Field(default=None, description="an optional note with background information about the dataset") - srid_epsg: Union[StrictFloat, StrictInt] = Field(description="the coordinate reference system of the dataset as EPSG code", alias="sridEPSG") + srid_epsg: Optional[Union[StrictFloat, StrictInt]] = Field(description="the coordinate reference system of the dataset as EPSG code", alias="sridEPSG") update_interval: StrictStr = Field(alias="updateInterval") __properties: ClassVar[List[str]] = ["contact", "databasis", "datasource", "description", "lastUpdate", "literature", "note", "sridEPSG", "updateInterval"] diff --git a/data-management-client/openapi_client/models/georesource_reference_type.py b/data-management-client/openapi_client/models/georesource_reference_type.py index fcee795..6efe9b5 100644 --- a/data-management-client/openapi_client/models/georesource_reference_type.py +++ b/data-management-client/openapi_client/models/georesource_reference_type.py @@ -19,7 +19,7 @@ import json -from typing import Any, ClassVar, Dict, List +from typing import Any, ClassVar, Dict, List, Optional from pydantic import BaseModel, StrictStr from pydantic import Field try: @@ -31,7 +31,7 @@ class GeoresourceReferenceType(BaseModel): """ a reference to georesource, e.g. a resource that is used to compute the main indicator """ # noqa: E501 - referenced_georesource_description: StrictStr = Field(description="a meaningful description of how the referenced georesource is related to the main indicator", alias="referencedGeoresourceDescription") + referenced_georesource_description: Optional[StrictStr] = Field(description="a meaningful description of how the referenced georesource is related to the main indicator", alias="referencedGeoresourceDescription") referenced_georesource_id: StrictStr = Field(description="unique identifier of the referenced georesource", alias="referencedGeoresourceId") referenced_georesource_name: StrictStr = Field(description="the meaningful name of the referenced georesource", alias="referencedGeoresourceName") __properties: ClassVar[List[str]] = ["referencedGeoresourceDescription", "referencedGeoresourceId", "referencedGeoresourceName"] diff --git a/data-management-client/openapi_client/models/indicator_overview_type.py b/data-management-client/openapi_client/models/indicator_overview_type.py index 0520340..1547b7a 100644 --- a/data-management-client/openapi_client/models/indicator_overview_type.py +++ b/data-management-client/openapi_client/models/indicator_overview_type.py @@ -37,11 +37,11 @@ class IndicatorOverviewType(BaseModel): """ IndicatorOverviewType """ # noqa: E501 - abbreviation: StrictStr = Field(description="abbreviated mark of the indicator") + abbreviation: Optional[StrictStr] = Field(description="abbreviated mark of the indicator") allowed_roles: Optional[List[StrictStr]] = Field(default=None, description="list of role identifiers that have read access rights for this dataset", alias="allowedRoles") applicable_dates: List[StrictStr] = Field(description="array of applicable dates (year and month and day as YEAR-MONTH-DAY) according to ISO 8601 (e.g. 2018-01-30)", alias="applicableDates") applicable_spatial_units: List[IndicatorSpatialUnitJoinItem] = Field(description="array of spatial unit levels for which the dataset is applicable", alias="applicableSpatialUnits") - characteristic_value: StrictStr = Field(description="the distuingishing characteristic value of the indicator", alias="characteristicValue") + characteristic_value: Optional[StrictStr] = Field(description="the distuingishing characteristic value of the indicator", alias="characteristicValue") creation_type: StrictStr = Field(description="indicates if the data is simply inserted (INSERTION), computed by an automated script (COMPUTATION) or automatically aggregated by a script (AGGREGATION)", alias="creationType") default_classification_mapping: Optional[DefaultClassificationMappingType] = Field(default=None, alias="defaultClassificationMapping") indicator_id: StrictStr = Field(description="unique identifier of this resource", alias="indicatorId") @@ -52,7 +52,7 @@ class IndicatorOverviewType(BaseModel): lowest_spatial_unit_for_computation: Optional[StrictStr] = Field(default=None, description="identifier/name of the lowest spatial unit for which the indicator can be computed and thus is available (only necessary for computable indicators)", alias="lowestSpatialUnitForComputation") metadata: CommonMetadataType ogc_services: List[OgcServicesType] = Field(description="list of available OGC services for that indicator for different spatial units", alias="ogcServices") - process_description: StrictStr = Field(description="description about how the indicator was computed", alias="processDescription") + process_description: Optional[StrictStr] = Field(description="description about how the indicator was computed", alias="processDescription") reference_date_note: Optional[StrictStr] = Field(default=None, description="an optional note on the reference date of the indicator", alias="referenceDateNote") display_order: Optional[Union[StrictFloat, StrictInt]] = Field(default=None, description="an order number to control display order in clients", alias="displayOrder") referenced_georesources: Optional[List[GeoresourceReferenceType]] = Field(default=None, description="list of references to georesources", alias="referencedGeoresources") diff --git a/data-management-client/openapi_client/models/indicator_reference_type.py b/data-management-client/openapi_client/models/indicator_reference_type.py index 60dcbe8..efed1dc 100644 --- a/data-management-client/openapi_client/models/indicator_reference_type.py +++ b/data-management-client/openapi_client/models/indicator_reference_type.py @@ -19,7 +19,7 @@ import json -from typing import Any, ClassVar, Dict, List +from typing import Any, ClassVar, Dict, List, Optional from pydantic import BaseModel, StrictStr from pydantic import Field try: @@ -31,7 +31,7 @@ class IndicatorReferenceType(BaseModel): """ a reference to another indicator, e.g. a sub-indicator that is used to compute the main indicator """ # noqa: E501 - referenced_indicator_description: StrictStr = Field(description="a meaningful description of how the referenced indicator is related to the main indicator", alias="referencedIndicatorDescription") + referenced_indicator_description: Optional[StrictStr] = Field(description="a meaningful description of how the referenced indicator is related to the main indicator", alias="referencedIndicatorDescription") referenced_indicator_id: StrictStr = Field(description="unique identifier of the referenced indicator", alias="referencedIndicatorId") referenced_indicator_name: StrictStr = Field(description="the meaningful name of the referenced indicator", alias="referencedIndicatorName") __properties: ClassVar[List[str]] = ["referencedIndicatorDescription", "referencedIndicatorId", "referencedIndicatorName"] diff --git a/processor/app.py b/processor/app.py index 7096957..2c5b810 100644 --- a/processor/app.py +++ b/processor/app.py @@ -8,7 +8,7 @@ from flask import Flask, send_from_directory, request from werkzeug.utils import secure_filename -from auth import MyIntrospectTokenValidator +from auth import KomMonitorIntrospectTokenValidator if not os.getenv("PYGEOAPI_CONFIG"): os.environ["PYGEOAPI_CONFIG"] = os.path.join(os.path.dirname(__file__), "default-config.yml") @@ -16,10 +16,10 @@ os.environ["PYGEOAPI_OPENAPI"] = os.path.join(os.path.dirname(__file__), "default-openapi.yml") from pygeoapi import flask_app -from pygeoapi.flask_app import STATIC_FOLDER, API_RULES, CONFIG, api_, get_response +from pygeoapi.flask_app import STATIC_FOLDER, API_RULES, CONFIG, api_, processes_api, execute_from_flask require_oauth = ResourceProtector() -require_oauth.register_token_validator(MyIntrospectTokenValidator()) +require_oauth.register_token_validator(KomMonitorIntrospectTokenValidator()) APP = Flask(__name__, static_folder=STATIC_FOLDER, static_url_path='/static') APP.url_map.strict_slashes = API_RULES.strict_slashes @@ -35,8 +35,7 @@ def landing_page(): @APP.get('/processes/') @require_oauth() def get_processes(process_id=None): - return get_response(api_.describe_processes(request, process_id)) - + return flask_app.get_processes(process_id) @APP.post('/processes') @require_oauth() @@ -119,6 +118,7 @@ def parse_processes(package: str) -> None: """ processes = flask_app.api_.manager.processes for process in glob.glob(f"process/{package}/*.py"): + print(process) with open(process) as fh: root = ast.parse(fh.read()) for node in ast.iter_child_nodes(root): @@ -131,6 +131,7 @@ def parse_processes(package: str) -> None: } } flask_app.api_.manager.processes = processes + api_.config['resources'] = processes async def init(): @@ -151,9 +152,9 @@ async def init(): # await deployment.apply() pass +asyncio.run(init()) def run(): - asyncio.run(init()) APP.run(debug=False, host=api_.config['server']['bind']['host'], diff --git a/processor/auth.py b/processor/auth.py index 6824f2b..5bf27bb 100644 --- a/processor/auth.py +++ b/processor/auth.py @@ -14,7 +14,7 @@ KC_HOSTNAME_PATH = os.getenv('KC_HOSTNAME_PATH', "") -class MyIntrospectTokenValidator(IntrospectTokenValidator): +class KomMonitorIntrospectTokenValidator(IntrospectTokenValidator): def introspect_token(self, token_string): url = f"https://{KC_HOSTNAME}{KC_HOSTNAME_PATH}/realms/{KC_REALM_NAME}/protocol/openid-connect/token/introspect" data = {'token': token_string[7:], 'token_type_hint': 'access_token'}\ diff --git a/processor/default-config.yml b/processor/default-config.yml index 1abea1b..e2515de 100644 --- a/processor/default-config.yml +++ b/processor/default-config.yml @@ -38,7 +38,6 @@ server: languages: # First language is the default language - en-US - - fr-CA # cors: true pretty_print: true limit: 10 @@ -63,10 +62,8 @@ metadata: identification: title: en: pygeoapi default instance - fr: instance par défaut de pygeoapi description: en: pygeoapi provides an API to geospatial data - fr: pygeoapi fournit une API aux données géospatiales keywords: en: - geospatial diff --git a/processor/process/base.py b/processor/process/base.py index 4c93afd..b1926bf 100644 --- a/processor/process/base.py +++ b/processor/process/base.py @@ -1,25 +1,27 @@ import abc -import datetime as dt +import json import logging import os from dataclasses import dataclass +from enum import Enum from logging import Logger from typing import Dict, Union, Optional +import json import openapi_client import prefect -from prefect.filesystems import LocalFileSystem -from prefect.serializers import JSONSerializer + import requests -from flask import g from openapi_client import ApiClient from prefect import task, flow, get_run_logger from pygeoapi.util import JobStatus -from processor.auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME +from auth import KC_CLIENT_ID, KC_CLIENT_SECRET, KC_HOSTNAME, KC_HOSTNAME_PATH, KC_REALM_NAME from pygeoapi_prefect import schemas from pygeoapi_prefect.process.base import BasePrefectProcessor -from pygeoapi_prefect.schemas import JobStatusInfoInternal, OutputExecutionResultInternal +from pygeoapi_prefect.schemas import ProcessInput, ProcessIOSchema, ProcessIOType, ProcessIOFormat, ProcessOutput, \ + ExecutionQualifiedInputValue, ExecutionInputValueNoObject, ExecutionInputValueNoObjectArray +from pygeoapi_prefect.utils import get_storage @dataclass @@ -72,88 +74,239 @@ def data_management_client(logger: Logger, execute_request: schemas.ExecuteReque return openapi_client.ApiClient(configuration) +@task +def format_inputs(execution_request: schemas.ExecuteRequest): + inputs = {} + + for k, v in execution_request.inputs.items(): + if type(v) is ExecutionInputValueNoObject or type(v) is ExecutionInputValueNoObjectArray: + inputs[k] = v.model_dump() + elif type(v) is ExecutionQualifiedInputValue: + inputs[k] = v.model_dump()["value"] + else: + raise Exception("Unsupported input value!") + return inputs + + +@task +def setup_logging(job_id: str) -> Logger: + os.mkdir(f"results/{job_id}/") + log_path = f"results/{job_id}/log.txt" + + filelogger = logging.FileHandler(log_path) + filelogger.setLevel(logging.DEBUG) + logger = get_run_logger() + logger.logger.addHandler(filelogger) + logger.debug("Setup logging ...") + return logger + + +@task +def store_output_as_file(job_id: str, output: dict) -> dict: + storage_type = "LocalFileSystem" + basepath = f"{PROCESS_RESULTS_DIR}" + output_dir = get_storage(storage_type, basepath=basepath) + filename = f"result-{job_id}.json" + output_dir.write_path(filename, json.dumps(output).encode('utf-8')) + return { + 'providers': { + 'file_storage_provider': { + 'type': storage_type, + 'basepath': basepath + } + }, + 'results': [ + { + 'provider': 'file_storage_provider', + 'mime_type': 'text/plain', + 'location': f'{output_dir.basepath}/{filename}', + 'filename': filename + } + ] + } + + +class ExecutionErrorType(str, Enum): + MISSING_TIMESTAMP = "MISSING_TIMESTAMP" + MISSING_DATASET = "MISSING_DATASET" + MISSING_SPATIAL_UNIT = "MISSING_SPATIAL_UNIT" + MISSING_SPATIAL_UNIT_FEATURE = "MISSING_SPATIAL_UNIT_FEATURE" + DATAMANAGEMENT_API_ERROR = "DATAMANAGEMENT_API_ERROR" + PROCESSING_ERROR = "PROCESSING_ERROR" + + +class ExecutionMode(str, Enum): + MISSING = "MISSING" + ALL = "ALL" + DATES = "DATES" + + +class ExecutionResourceType(str, Enum): + GEORESOURCE = "GEORESOURCE" + INDICATOR = "INDICATOR" + + class KommonitorProcess(BasePrefectProcessor): result_storage_block = None - process_flow = flow - @staticmethod - @task - def format_inputs(execution_request: schemas.ExecuteRequest): - inputs = {} + common_inputs = { + "target_indicator_id": ProcessInput( + title="target_indicator_id", + schema_=ProcessIOSchema( + type_=ProcessIOType.STRING + ) + ), + "target_spatial_units": ProcessInput( + title="target_spatial_units", + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema(type_=ProcessIOType.STRING), + min_items=1 + ) + ), + "target_time ": ProcessInput( + title="target_time", + schema_=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["mode"], + properties={ + "mode": ProcessIOSchema(type_=ProcessIOType.STRING, enum=[ExecutionMode.MISSING, ExecutionMode.ALL, ExecutionMode.DATES]), + "includeDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)), + "excludeDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)) + }, + default={ + "mode": ExecutionMode.MISSING, + "includeDates": [], + "excludeDates": [] + } + ) + ), + "execution_interval ": ProcessInput( + title="execution_interval", + schema_=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["cron"], + properties={ + "cron": ProcessIOSchema(type_=ProcessIOType.STRING) + }, + default={ + "cron": "0 0 1 * *" + } + ), + ) + } - for k, v in execution_request.inputs.items(): - inputs[k] = v.root - return inputs + error_type = ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["type", "affectedResourceType", "affectedDatasetId", "affectedTimestamps", "affectedSpatialUnitFeatures", "errorMessage"], + properties={ + "type": ProcessIOSchema(type_=ProcessIOType.STRING, enum=[ExecutionErrorType.MISSING_TIMESTAMP, ExecutionErrorType.MISSING_DATASET, ExecutionErrorType.MISSING_SPATIAL_UNIT, ExecutionErrorType.MISSING_SPATIAL_UNIT_FEATURE, ExecutionErrorType.DATAMANAGEMENT_API_ERROR, ExecutionErrorType.PROCESSING_ERROR]), + "affectedResourceType": ProcessIOSchema(type_=ProcessIOType.STRING, enum=[ExecutionResourceType.INDICATOR, ExecutionResourceType.GEORESOURCE]), + "affectedDatasetId": ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.UUID), + "affectedTimestamps": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE)), + "affectedSpatialUnitFeatures": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING)), + "errorMessage": ProcessIOSchema(type_=ProcessIOType.STRING) + } + ) + ) - @staticmethod - @task - def setup_logging(job_id: str) -> Logger: - os.mkdir(f"{PROCESS_RESULTS_DIR}/{job_id}/") - log_path = f"{PROCESS_RESULTS_DIR}/{job_id}/log.txt" - filelogger = logging.FileHandler(log_path) - filelogger.setLevel(logging.DEBUG) - logger = get_run_logger() - logger.logger.addHandler(filelogger) - logger.debug("Setup logging ...") - return logger + indicator_value_type = ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["spatialReferenceKey", "valueMapping"], + properties={ + "spatialReferenceKey": ProcessIOSchema(type_=ProcessIOType.STRING), + "valueMapping": ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["indicatorValue", "timestamp"], + properties={ + "indicatorValue": ProcessIOSchema(type_=ProcessIOType.NUMBER), + "timestamp": ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE) + } + ) + ) + } + ) + + common_output = { + "jobSummary": ProcessOutput( + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + properties={ + "spatialUnitId": ProcessIOSchema(type_=ProcessIOType.STRING), + "modifiedResource": ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.URI), + "numberOfIntegratedIndicatorFeatures": ProcessIOSchema(type_=ProcessIOType.INTEGER), + "integratedTargetDates": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.DATE)), + "errorsOccurred": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=error_type) + } + ), + content_media_type= "application/json" + ) + ), + "results": ProcessOutput( + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema(type_=ProcessIOType.STRING, format_=ProcessIOFormat.URI), + content_media_type="application/json" + ) + ), + "resultData": ProcessOutput( + schema_=ProcessIOSchema( + type_=ProcessIOType.ARRAY, + items=ProcessIOSchema( + type_=ProcessIOType.OBJECT, + required=["applicableSpatialUnit", "indicatorValues"], + properties={ + "applicableSpatialUnit": ProcessIOSchema(type_=ProcessIOType.STRING), + "indicatorValues": ProcessIOSchema(type_=ProcessIOType.ARRAY, items=indicator_value_type), + } + ), + content_media_type="application/json" + ) + ) + } - @staticmethod - @task - def store_output(job_id: str, output: dict) -> str: - storage = LocalFileSystem() - serializer = JSONSerializer() - result_path = f"{PROCESS_RESULTS_DIR}/{job_id}/result.json" - result = serializer.dumps(output) - storage.write_path(result_path, result) - return result_path + + def __init__(self, processor_def: dict): + super().__init__(processor_def) + self.process_flow.__setattr__("processor", self) @staticmethod @flow(persist_result=True) def process_flow( - processor, job_id: str, - result_storage_block: str | None, - process_description: schemas.ProcessDescription, execution_request: schemas.ExecuteRequest - ) -> Union[schemas.JobStatusInfoInternal, prefect.states.State]: - print(processor) - + ) -> dict: ## Setup - logger = KommonitorProcess.setup_logging(job_id) - inputs = KommonitorProcess.format_inputs(execution_request) - + p = prefect.context.get_run_context().flow.__getattribute__("processor") + logger = setup_logging(job_id) + inputs = format_inputs(execution_request) config = KommonitorProcessConfig(job_id, inputs, f"{job_id}/output-result.txt") dmc = data_management_client(logger, execution_request, True) ## Run process - status, outputs = processor.run(config, logger, dmc) - - res_path = KommonitorProcess.store_output(job_id, outputs) - - ## Reformat output - return JobStatusInfoInternal( - jobID=job_id, - processID=process_description.id, - status=status, - updated=dt.datetime.now(), - generated_outputs={ - "result": schemas.OutputExecutionResultInternal( - location=res_path, - media_type=( - process_description.outputs["result"].schema_.content_media_type - ), - ) - }, - ) + status, outputs = p.run(config, logger, dmc) + + return store_output_as_file(job_id, outputs) - @staticmethod @abc.abstractmethod - def run(config: KommonitorProcessConfig, + def run(self, + config: KommonitorProcessConfig, logger: logging.Logger, - dmc: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): + dmc: ApiClient) -> (JobStatus, Dict): ... @property - @abc.abstractmethod def process_description(self) -> schemas.ProcessDescription: + description = self.detailed_process_description + return description + + @property + @abc.abstractmethod + def detailed_process_description(self) -> schemas.ProcessDescription: ... diff --git a/processor/process/kommonitor/hello_world.py b/processor/process/kommonitor/hello_world.py index 630131b..3bc261a 100644 --- a/processor/process/kommonitor/hello_world.py +++ b/processor/process/kommonitor/hello_world.py @@ -5,12 +5,12 @@ from pygeoapi.process.base import * from pygeoapi.util import JobStatus -from processor.process.base import KommonitorProcess, KommonitorProcessConfig +from ..base import KommonitorProcess, KommonitorProcessConfig from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, Parameter, AdditionalProcessIOParameters, OutputExecutionResultInternal, ProcessOutput class HelloWorld(KommonitorProcess): - process_description = ProcessDescription( + detailed_process_description = ProcessDescription( id="hello-world", version="1.0.0", title="Hello World! ", @@ -49,8 +49,8 @@ class HelloWorld(KommonitorProcess): }, ) - @staticmethod - def run(config: KommonitorProcessConfig, + def run(self, + config: KommonitorProcessConfig, logger: logging.Logger, data_management_client: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): @@ -66,6 +66,7 @@ def run(config: KommonitorProcessConfig, # 4.1 Return success and result return JobStatus.successful, None + except ApiException as e: # 4.2 Catch possible errors cleanly diff --git a/processor/process/kommonitor/percentage_share.py b/processor/process/kommonitor/percentage_share.py index 3ef34f3..d5a31a0 100644 --- a/processor/process/kommonitor/percentage_share.py +++ b/processor/process/kommonitor/percentage_share.py @@ -3,24 +3,35 @@ import openapi_client from openapi_client import ApiClient from openapi_client.rest import ApiException +# from pandas import isnull +import pandas as pd + from pygeoapi.process.base import * from pygeoapi.util import JobStatus - from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, Parameter, AdditionalProcessIOParameters, OutputExecutionResultInternal, ProcessOutput -from processor.process.base import KommonitorProcess, KommonitorProcessConfig -from processor.process.util.geojson import as_geojson + +from ..base import KommonitorProcess, KommonitorProcessConfig +from ..util import dataio, job_summary class PercentageShare(KommonitorProcess): - process_description = ProcessDescription( + detailed_process_description = ProcessDescription( id="percentage-share", version="0.0.1", title="Percentage share of an indicator to a reference indicator in a given spatial unit", example={ - "indicator_id": "cccca04-cc57-48d3-a801-d6b4b00fcccc", - "reference_indicator_id": "aaaaa04-cc57-48d3-a801-d6b4b00faaaa", - "target_spatial_unit_id": "bbbba04-cc57-48d3-a801-d6b4b00fbbbb", - "target_date": "2000-01-01" + "target_indicator_id": "cccca04-cc57-48d3-a801-d6b4b00fcccc", + "target_spatial_units": ["bbbba04-cc57-48d3-a801-d6b4b00fbbbb"], + "target_time": { + "mode": "MISSING", + "includeDates": [], + "excludeDates": [] + }, + "execution_interval": { + "cron": "0 0 1 * *" + }, + "base_indicator_id": "bbbbb04-dc45-58d3-a801-d6b4b00fbbbbb", + "reference_indicator_id": "aaaaa04-cc57-48d3-a801-d6b4b00faaaa" }, additional_parameters=AdditionalProcessIOParameters( parameters=[ @@ -34,86 +45,113 @@ class PercentageShare(KommonitorProcess): ), ] ), - jobControlOptions=[ + job_control_options=[ ProcessJobControlOption.SYNC_EXECUTE, ProcessJobControlOption.ASYNC_EXECUTE, ], - inputs={ - "indicator_id": ProcessInput( - title="indicator_id", - schema=ProcessIOSchema(type=ProcessIOType.STRING) + inputs=KommonitorProcess.common_inputs | { + "base_indicator_id": ProcessInput( + title="base_indicator_id", + schema_=ProcessIOSchema(type_=ProcessIOType.STRING) ), "reference_indicator_id": ProcessInput( title="reference_indicator_id", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ), - "target_spatial_unit_id": ProcessInput( - title="target_spatial_unit_id", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ), - "target_date": ProcessInput( - title="target_date", - schema=ProcessIOSchema(type=ProcessIOType.STRING) - ) - }, - outputs={ - "result": ProcessOutput( - schema=ProcessIOSchema( - type=ProcessIOType.OBJECT, - contentMediaType="application/json", - ) + schema_=ProcessIOSchema(type_=ProcessIOType.STRING) ) }, + outputs = KommonitorProcess.common_output ) def run(self, config: KommonitorProcessConfig, logger: logging.Logger, - data_management_client: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): + data_management_client: ApiClient) -> (JobStatus, Dict): # 1. Load inputs inputs = config.inputs logger.debug("Starting execution...") + result = { + "jobSummary": [], + "results": [] + } + current_indicator = "" + try: - georesources_controller = openapi_client.IndicatorsControllerApi(data_management_client) - indicator = {} - - response = georesources_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( - inputs["reference_indicator_id"], inputs["target_spatial_unit_id"]) - # Iterate population - for feat in response: - feature_id = feat["fid"] - - ref_value = feat[f"DATE_{inputs['target_date']}"] - if ref_value is None: - # TODO: None must be None and not 0. Otherwise calculated indicator is misleading and - # calculation may cause an error - ref_value = 0 - logger.error(f"WARNING: the feature with featureID '{feature_id}' does not contain a " - f"time series value for targetDate '{inputs['target_date']}") - - indicator[feature_id] = { - "feature_id": feature_id, - "value": None, - "ref_value": float(ref_value) - } - - # Iterate unemployed - response = georesources_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry( - inputs["indicator_id"], inputs["target_spatial_unit_id"]) - for feat in response: - feature_id = feat["fid"] - - value = feat[f"DATE_{inputs['target_date']}"] - if value is None: - logger.error(f"WARNING: the feature with featureID '{feature_id}' does not contain a " - f"time series value for targetDate '{inputs['target_date']}") - else: - # Calculate percentage - indicator[feature_id]["value"] = (float(value) / indicator[feature_id]["ref_value"]) * 100 - - return JobStatus.successful, {"result": indicator} + indicators_controller = openapi_client.IndicatorsControllerApi(data_management_client) + + # Extract all relevant inputs + target_indicator_id = inputs["target_indicator_id"] + base_indicator_id = inputs["base_indicator_id"] + ref_indicator_id = inputs["reference_indicator_id"] + target_spatial_units = inputs["target_spatial_units"] + exclude_dates = inputs["target_time"]["excludeDates"] if "excludeDates" in inputs["target_time"] else [] + include_dates = inputs["target_time"]["includeDates"] if "includeDates" in inputs["target_time"] else [] + + # Fetch indicator metadata + target_indicator_metadata = indicators_controller.get_indicator_by_id(target_indicator_id) + base_indicator_metadata = indicators_controller.get_indicator_by_id(base_indicator_id) + ref_indicator_metadata = indicators_controller.get_indicator_by_id(ref_indicator_id) + + # Get a DataFrame that contains dates for all indicators + dates_df = dataio.get_applicable_dates_from_metadata_as_dataframe([target_indicator_metadata, base_indicator_metadata, ref_indicator_metadata]) + + # Candidate target dates are all dates, which potentially should be calculated depending on the execution mode + candidate_target_dates = dataio.get_missing_target_dates(dates_df, target_indicator_id, [base_indicator_id, ref_indicator_id], + drop_input_na=False, + mode=inputs["target_time"]["mode"], + exclude_dates=exclude_dates, + include_dates=include_dates) + # Computable target dates ar all dates, which potentially should be calculated depending on the execution mode and for which all inputs have an existing value + computable_target_dates = dataio.get_missing_target_dates(dates_df, target_indicator_id, [base_indicator_id, ref_indicator_id], + drop_input_na=True, + mode=inputs["target_time"]["mode"], + exclude_dates=exclude_dates, + include_dates=include_dates) + + # determine missing timestamps for input datasets to add errors to jobSummary + missing_input_timestamps = dataio.get_missing_input_timestamps(candidate_target_dates, dates_df, [base_indicator_id, ref_indicator_id]) + if missing_input_timestamps: + result["jobSummary"].extend(job_summary.get_missing_timestamps_error(missing_input_timestamps,resource_type="indicator")) + + for target_unit_id in target_spatial_units: + + # Fetch indicator timeseries data + base_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(base_indicator_id, target_unit_id) + ref_data = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(ref_indicator_id, target_unit_id) + + # Create a DataFrame for each indicator timeseries data and merge it + indicator_df = dataio.indicator_timeseries_to_dataframe(base_data) + ref_indicator_df = dataio.indicator_timeseries_to_dataframe(ref_data) + + merged_df = indicator_df.merge(ref_indicator_df, how="left", on=["ID", "date"], + suffixes=("_target", "_ref")) + + # Cast indicator value columns to be numeric + merged_df["value_target"] = pd.to_numeric(merged_df["value_target"]) + merged_df["value_ref"] = pd.to_numeric(merged_df["value_ref"]) + + # Use computable target dates to filter DataFrame for relevant rows + merged_df = merged_df[merged_df["date"].isin(computable_target_dates)] + # Drop rows where at least one input has an NA value + merged_df = merged_df.dropna(subset=["value_target", "value_ref"]) + # TODO create error for missing features + + # Calculate percentage share + merged_df["result"] = merged_df["value_target"] / merged_df["value_ref"] * 100 + + # Convert DataFrame back again to required JSON format + ts_result = dataio.dataframe_to_indicator_timeseries(merged_df) + + result["results"].append( + { + "applicableSpatialUnit": target_unit_id, + "indicatorValues": ts_result + } + ) + + return JobStatus.successful, result except ApiException as e: logger.error(f"Exception when calling DataManagementAPI: {e}") - return JobStatus.failed, None + result["jobSummary"].append(job_summary.get_api_client_error(e)) + return JobStatus.failed, result diff --git a/processor/process/kommonitor/reproject.py b/processor/process/kommonitor/reproject.py index 9b45d20..bd10eb3 100644 --- a/processor/process/kommonitor/reproject.py +++ b/processor/process/kommonitor/reproject.py @@ -5,14 +5,14 @@ from pygeoapi.process.base import * from pygeoapi.util import JobStatus -from processor.process.base import KommonitorProcess, KommonitorProcessConfig +from ..base import KommonitorProcess, KommonitorProcessConfig from pygeoapi_prefect.schemas import ProcessInput, ProcessDescription, ProcessIOType, ProcessIOSchema, ProcessJobControlOption, OutputExecutionResultInternal import geopandas as gpd class Reproject(KommonitorProcess): - process_description = ProcessDescription( + detailed_process_description = ProcessDescription( id="reproject", version="0.0.2", title="Reproject Georesource and persists result to local file.", @@ -39,11 +39,40 @@ class Reproject(KommonitorProcess): title="output_georesource_name", schema=ProcessIOSchema(type=ProcessIOType.STRING) ), + "target_indicator_id": ProcessInput( + title="target_indicator_id", + schema=ProcessIOSchema(type=ProcessIOType.STRING) + ), + "target_spatial_units": ProcessInput( + title="target_spatial_units", + schema=ProcessIOSchema(type=ProcessIOType.ARRAY, items=[ProcessIOSchema(type=ProcessIOType.STRING)], + min_items=1) + ), + "target_time ": ProcessInput( + title="target_time", + schema=ProcessIOSchema( + type=ProcessIOType.OBJECT, + properties=ProcessIOSchema(allOf=[ProcessIOSchema(title="mode", type=ProcessIOType.STRING), + ProcessIOSchema(title="includeDates", type=ProcessIOType.ARRAY, + items=[ + ProcessIOSchema(type=ProcessIOType.STRING)]), + ProcessIOSchema(title="excludeDates", type=ProcessIOType.ARRAY, + items=[ + ProcessIOSchema(type=ProcessIOType.STRING)]) + ] + ) + ) + ), + "execution_interval ": ProcessInput( + title="execution_interval", + schema=ProcessIOSchema(type=ProcessIOType.STRING) + ) }, outputs={} ) - def run(config: KommonitorProcessConfig, + def run(self, + config: KommonitorProcessConfig, logger: logging.Logger, data_management_client: ApiClient) -> (JobStatus, Optional[Dict[str, OutputExecutionResultInternal]]): diff --git a/processor/process/util/dataio.py b/processor/process/util/dataio.py new file mode 100644 index 0000000..bf374b2 --- /dev/null +++ b/processor/process/util/dataio.py @@ -0,0 +1,131 @@ +import pandas as pd +import itertools + +from openapi_client import IndicatorOverviewType + + +def indicator_timeseries_to_dataframe(timeseries: list, date_col: str = "date", value_col: str = "value") -> pd.DataFrame: + """Creates a pd.DataFrame from indicator timeseries values for a single spatial unit + + Args: + timeseries: Timeseries data for a single spatial unit + date_col: Name of the date column. Default: "date" + value_col: Name of the column that contains timeseries values. Default: "value" + + Returns: + A pd.DataFrame that contains timeseries values for all features of a single spatial unit + + """ + df = pd.DataFrame(timeseries) + df = df.melt(id_vars=["ID", "NAME", "arisenFrom", "fid", "validStartDate", "validEndDate"], + var_name=date_col, value_name=value_col) + df = df[["ID", date_col, value_col]] + df["date"] = pd.to_datetime(df["date"], format="DATE_%Y-%m-%d") + return df + + +def dataframe_to_indicator_timeseries(df: pd.DataFrame, id_col: str = "ID", date_col: str = "date", value_col: str = "result") -> list: + """Creates indicator timeseries results as dict from a pd.DataFrame + + Args: + df: DataFrame that contains indicator timeseries values for several features of a single spatial unit + id_col: Name of the ID column. Default: "ID" + date_col:: Name of the date column: Default: "date" + value_col: Name of the column that contains the timeseries values. Default: "result" + + Returns: + Timeseries results for a single spatial unit as list of dicts such as: + + { + "applicableSpatialUnit": "dictricts", + "indicatorValues": [ + { + "spatialReferenceKey": "1", + "valueMapping": [ + { + "indicatorValue": 0, + "timestamp": "2025-01-15" + }, + { + "indicatorValue": 1, + "timestamp": "2024-01-15" + } + ] + }, + { + "spatialReferenceKey": "2", + "valueMapping": [ + { + "indicatorValue": 10, + "timestamp": "2025-01-15" + }, + { + "indicatorValue": 6, + "timestamp": "2024-01-15" + } + ] + }, + ... // for all 50 disctricts + ] + } + + """ + grouped_df = df.groupby(id_col) + + results = [] + + for feature_id, group in grouped_df: + feature_result = { + "spatialReferenceKey": feature_id, + "valueMapping": [] + } + for index, row in group.iterrows(): + timestep_result = { + "indicatorValue": row[value_col], + "timestamp": row[date_col].strftime("%Y-%m-%d") + } + feature_result["valueMapping"].append(timestep_result) + results.append(feature_result) + return results + + +def get_unique_dates(dates: list) -> set: + return set(itertools.chain.from_iterable(dates)) + + +def get_applicable_dates_from_metadata_as_dataframe(metadata_list: list[IndicatorOverviewType]) -> pd.DataFrame: + date_list = [] + date_dict_list = [] + for m in metadata_list: + date_list.append(m.applicable_dates) + date_dict_list.append({"id": m.indicator_id, m.indicator_id: m.applicable_dates}) + unique_dates = get_unique_dates(date_list) + df = pd.DataFrame(unique_dates, columns=["date"]).sort_values(by="date") + for d in date_dict_list: + df = df.merge(pd.DataFrame(d)[[d["id"]]], left_on="date", right_on=d["id"], how="outer") + return df + + +def get_missing_target_dates(df: pd.DataFrame, target_id: str, input_ids: list[str], drop_input_na: bool, mode: str = "MISSING", include_dates: list =[], exclude_dates: list = []): + if drop_input_na: + df = df.dropna(subset=input_ids) + if mode == "MISSING": + df = df[["date"]][df[target_id].isnull()] + return list(df["date"][~df["date"].isin(exclude_dates)]) + elif mode == "ALL": + return list(df["date"][~df["date"].isin(exclude_dates)]) + elif mode == "DATES": + if drop_input_na: + return list(df["date"][df["date"].isin(include_dates)]) + else: + return include_dates + + +def get_missing_input_timestamps(target_dates: list, df: pd.DataFrame, input_ids: list): + target_dates_series = pd.Series(target_dates) + missing_timestamp_list = [] + for id in input_ids: + missing_timestamps = list(target_dates_series[~target_dates_series.isin(df[id])]) + if(len(missing_timestamps) > 0): + missing_timestamp_list.append({"id": id, "missingTimestamps": missing_timestamps}) + return missing_timestamp_list diff --git a/processor/process/util/job_summary.py b/processor/process/util/job_summary.py new file mode 100644 index 0000000..d9ebbd3 --- /dev/null +++ b/processor/process/util/job_summary.py @@ -0,0 +1,30 @@ +from openapi_client import ApiException + + +def get_missing_timestamps_error(missing_timestamps: list, resource_type: str) -> list[dict]: + error_list = [] + for t in missing_timestamps: + error_list.append( + { + "type": "missingTimestamp", + "affectedResourceType": resource_type, + "affectedDatasetId": t["id"], + "affectedTimestamps": t["missingTimestamps"], + "affectedSpatialUnitFeatures": [], + "errorMessage": "Timestamps are missing for one or more input datasets" + } + ) + return error_list + + +def get_api_client_error(e: ApiException, dataset_id: str = None, resource_type: str = None) -> dict: + return { + "type": "dataManagementApiError", + "affectedResourceType": resource_type, + "dataManagementApiErrorCode": e.status, + "affectedDatasetId": dataset_id, + "affectedTimestamps": [], + "affectedSpatialUnitFeatures": [], + "errorMessage": e.data + } + diff --git a/requirements.txt b/requirements.txt index ac4f1dc..297bab8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,12 +4,14 @@ babel pygeofilter Flask~=2.2.3 oauthlib~=3.2.2 -prefect~=2.20.9 -geopandas~=0.14.1 +prefect griffe >= 0.20.0, <0.48.0 gunicorn -pydantic~=2.9.2 +pydantic urllib3~=1.26.15 python-dateutil~=2.8.2 setuptools~=67.6.0 -rtree \ No newline at end of file +rtree +filelock +geopandas~=1.0.1 +pandas~=2.2.3 \ No newline at end of file diff --git a/requirements_nodeps.txt b/requirements_nodeps.txt index 13c1e1f..429d7b7 100644 --- a/requirements_nodeps.txt +++ b/requirements_nodeps.txt @@ -1,2 +1,2 @@ pygeoapi-prefect @ git+https://github.com/kommonitor/pygeoapi-prefect.git@feature/kommonitor -pygeoapi @ git+https://github.com/geopython/pygeoapi.git@e7264e8 \ No newline at end of file +pygeoapi @ git+https://github.com/52North/pygeoapi.git@pydantic_v2 \ No newline at end of file