diff --git a/examples/00-testing-example/sample.yaml b/examples/00-testing-example/sample.yaml
index 6dd311b6..f3683528 100644
--- a/examples/00-testing-example/sample.yaml
+++ b/examples/00-testing-example/sample.yaml
@@ -5,7 +5,8 @@ general:
   email: "pgierz@awi.de"
   cmor_version: "CMIP6"
   mip: "CMIP"
-  CMIP_Tables_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/Tables"
+  CMIP_Tables_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/Tables/"
+  CV_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/CMIP6_CVs/"
 pymorize:
   # parallel: True
   warn_on_no_rule: False
diff --git a/src/pymorize/cli.py b/src/pymorize/cli.py
index 84008023..d5fb7ff4 100644
--- a/src/pymorize/cli.py
+++ b/src/pymorize/cli.py
@@ -222,6 +222,26 @@ def config(config_file, verbose, quiet, logfile, profile_mem):
             logger.error(f"{key}: {error}")
 
 
+@validate.command()
+@click_loguru.logging_options
+@click_loguru.init_logger()
+@click.argument("config_file", type=click.Path(exists=True))
+def pipeline_requirements(config_file, verbose, quiet, logfile, profile_mem):
+    logger.info(f"Processing {config_file}")
+    with open(config_file, "r") as f:
+        cfg = yaml.safe_load(f)
+    # We don't need dask just to check the pipelines
+    cfg["pymorize"] = cfg.get("pymorize", {})
+    cfg["pymorize"]["enable_dask"] = False
+    cmorizer = CMORizer.from_dict(cfg)
+    for rule in cmorizer.rules:
+        try:
+            rule.match_pipelines(cmorizer.pipelines)
+            rule.crosscheck_pipelines()
+        except Exception as e:
+            logger.error(f"Error in {rule.name}: {e}")
+
+
 @validate.command()
 @click_loguru.logging_options
 @click_loguru.init_logger()
diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py
index d0102bc6..9abf46b6 100644
--- a/src/pymorize/cmorizer.py
+++ b/src/pymorize/cmorizer.py
@@ -57,6 +57,7 @@ def __init__(
         rules_cfg=None,
         dask_cfg=None,
         inherit_cfg=None,
+        run_post_inits=True,
         **kwargs,
     ):
         ################################################################################
@@ -112,25 +113,26 @@ def __init__(
 
         ################################################################################
         # Post_Init:
-        if self._pymorize_cfg("enable_dask"):
-            logger.debug("Setting up dask configuration...")
-            self._post_init_configure_dask()
-            logger.debug("...done!")
-            logger.debug("Creating dask cluster...")
-            self._post_init_create_dask_cluster()
-            logger.debug("...done!")
-        self._post_init_create_pipelines()
-        self._post_init_create_rules()
-        self._post_init_create_data_request_tables()
-        self._post_init_create_data_request()
-        self._post_init_populate_rules_with_tables()
-        self._post_init_populate_rules_with_dimensionless_unit_mappings()
-        self._post_init_populate_rules_with_aux_files()
-        self._post_init_populate_rules_with_data_request_variables()
-        self._post_init_create_controlled_vocabularies()
-        self._post_init_populate_rules_with_controlled_vocabularies()
-        self._post_init_create_global_attributes_on_rules()
-        logger.debug("...post-init done!")
+        if run_post_inits:
+            if self._pymorize_cfg("enable_dask"):
+                logger.debug("Setting up dask configuration...")
+                self._post_init_configure_dask()
+                logger.debug("...done!")
+                logger.debug("Creating dask cluster...")
+                self._post_init_create_dask_cluster()
+                logger.debug("...done!")
+            self._post_init_create_pipelines()
+            self._post_init_create_rules()
+            self._post_init_create_data_request_tables()
+            self._post_init_create_data_request()
+            self._post_init_populate_rules_with_tables()
+            self._post_init_populate_rules_with_dimensionless_unit_mappings()
+            self._post_init_populate_rules_with_aux_files()
+            self._post_init_populate_rules_with_data_request_variables()
+            self._post_init_create_controlled_vocabularies()
+            self._post_init_populate_rules_with_controlled_vocabularies()
+            self._post_init_create_global_attributes_on_rules()
+            logger.debug("...post-init done!")
         ################################################################################
 
     def __del__(self):
@@ -321,6 +323,11 @@ def _match_pipelines_in_rules(self, force=False):
         for rule in self.rules:
             rule.match_pipelines(self.pipelines, force=force)
 
+    def _crosscheck_pipelines_in_rules(self):
+        if self._pymorize_cfg.get("pipelines_enforce_requirements"):
+            for rule in self.rules:
+                rule.crosscheck_pipelines()
+
     def find_matching_rule(
         self, data_request_variable: DataRequestVariable
     ) -> Rule or None:
@@ -609,6 +616,7 @@ def check_rules_for_output_dir(self, output_dir):
     def process(self, parallel=None):
         logger.debug("Process start!")
         self._match_pipelines_in_rules()
+        self._crosscheck_pipelines_in_rules()
         if parallel is None:
             parallel = self._pymorize_cfg.get("parallel", True)
         if parallel:
diff --git a/src/pymorize/config.py b/src/pymorize/config.py
index e68567d5..ffe68f78 100644
--- a/src/pymorize/config.py
+++ b/src/pymorize/config.py
@@ -181,6 +181,11 @@ class Config:
             ],
         ),
     )
+    pipelines_enforce_requirements = Option(
+        parser=_parse_bool,
+        default="yes",
+        doc="Whether to check that each rule's pipelines satisfy the rule's requirements",
+    )
     quiet = Option(
         default=False, doc="Whether to suppress output.", parser=_parse_bool
     )
diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py
index c688c61f..d1079722 100644
--- a/src/pymorize/rule.py
+++ b/src/pymorize/rule.py
@@ -1,5 +1,6 @@
 import copy
 import datetime
+import inspect
 import pathlib
 import re
 import typing
@@ -12,6 +13,54 @@
 from .data_request.variable import DataRequestVariable
 from .gather_inputs import InputFileCollection
 from .logging import logger
+from .pipeline import Pipeline
+
+
+class RuleRequirement:
+    """A requirement that at least one pipeline step must tag as satisfied for a pipeline to be valid"""
+
+    def __init__(self, requirement_name: str, requirement_value: typing.Any):
+        """
+        Initialize a RuleRequirement object.
+
+        Parameters
+        ----------
+        requirement_name : str
+            The name of the requirement that a step must satisfy.
+        requirement_value : Any
+            The value of the requirement that a step must satisfy.
+        """
+        self.requirement_name = requirement_name
+        self.requirement_value = requirement_value
+
+    @classmethod
+    def from_dict(cls, d):
+        """Build a RuleRequirement object from a dictionary"""
+        return cls(**d)
+
+    def pipeline_fulfills_requirements(self, pipeline: Pipeline) -> bool:
+        """Check if a pipeline fulfills this requirement
+
+        Parameters
+        ----------
+        pipeline : Pipeline
+            The pipeline to check
+
+        Returns
+        -------
+        bool
+            True if any step in the pipeline satisfies this requirement, False otherwise
+        """
+        for step in pipeline.steps:
+            step_src = inspect.getsource(step)
+            for line in step_src.split("\n"):
+                if f"_satisfies_{self.requirement_name}" in line:
+                    if str(self.requirement_value) in line:
+                        return True
+        return False
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}({self.requirement_name}, {self.requirement_value})"
 
 
 class Rule:
@@ -167,6 +216,31 @@ def match_pipelines(self, pipelines, force=False):
         self.pipelines = matched_pipelines
         self._pipelines_are_mapped = True
 
+    def crosscheck_pipelines(self):
+        """
+        Check that this rule's pipelines satisfy the rule's requirements.
+
+        Raises a ValueError if none of the rule's pipelines fulfills a requirement.
+        """
+        requirements = []
+        # FIXME: Dynamically derive the requirements from the CMOR variable.
+        # The result should be a list of dictionaries with requirement specifications, e.g.:
+        # requirements = [{"requirement_name": "cell_methods", "requirement_value": "time: mean"}, ]
+        if hasattr(self.data_request_variable, "cell_methods"):
+            requirements.append(
+                {
+                    "requirement_name": "cell_methods",
+                    "requirement_value": self.data_request_variable.cell_methods,
+                },
+            )
+        for requirement in requirements:
+            rr = RuleRequirement.from_dict(requirement)
+            req_satisfied = any(
+                rr.pipeline_fulfills_requirements(pl) for pl in self.pipelines
+            )
+            if not req_satisfied:
+                raise ValueError(f"No pipeline in rule {self.name} satisfies requirement {rr}")
+
     @classmethod
     def from_dict(cls, data):
         """Build a rule object from a dictionary
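
How the check works: RuleRequirement.pipeline_fulfills_requirements does not inspect step objects semantically. It reads each step's source text with inspect.getsource and returns True as soon as one source line contains both the marker "_satisfies_<requirement_name>" and the stringified requirement value. A step therefore opts in by carrying such a tag. Below is a minimal sketch of a conforming step; the step name, the (data, rule) signature, and the xarray-style body are illustrative assumptions, not part of this change:

    def temporal_mean(data, rule):
        """Average over time, matching the cell_methods requirement."""
        # This tag is what the textual scan in
        # RuleRequirement.pipeline_fulfills_requirements finds: the marker and
        # the value must appear together on a single source line.
        _satisfies_cell_methods = "time: mean"
        return data.mean(dim="time")

Because the scan is textual rather than semantic, the tag may equally be a comment or a class attribute, but the marker and the value must share one line; a formatter that wraps that line would make the check fail.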
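Enforcement is on by default through the new pipelines_enforce_requirements option (default "yes", run through _parse_bool) and happens inside CMORizer.process() via _crosscheck_pipelines_in_rules() before any data is touched. The following sketch shows opting out for a single run, mirroring how the new CLI command toggles enable_dask; the pymorize.cmorizer import path is an assumption:

    import yaml

    from pymorize.cmorizer import CMORizer  # assumed import path

    with open("examples/00-testing-example/sample.yaml") as f:
        cfg = yaml.safe_load(f)
    # Equivalent to setting "pipelines_enforce_requirements: False" in the
    # pymorize: section of the YAML file itself.
    cfg.setdefault("pymorize", {})["pipelines_enforce_requirements"] = False
    cmorizer = CMORizer.from_dict(cfg)
    cmorizer.process()  # _crosscheck_pipelines_in_rules() is now a no-op

The same validation can also be run ahead of time with the new subcommand in cli.py; assuming click's usual underscore-to-dash command naming and a pymorize entry point, the invocation would be along the lines of "pymorize validate pipeline-requirements examples/00-testing-example/sample.yaml", which disables dask and logs an error for every rule whose pipelines fail to match or crosscheck.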