From 9ce79bad8af870c9c935cd0dff7e3317838af604 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 18 Mar 2025 12:28:00 +0100 Subject: [PATCH 1/7] wip: dynamic requirement checking of steps in a pipeline --- src/pymorize/cmorizer.py | 5 ++++ src/pymorize/rule.py | 64 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index e0cea590..c535cc89 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -321,6 +321,10 @@ def _match_pipelines_in_rules(self, force=False): for rule in self.rules: rule.match_pipelines(self.pipelines, force=force) + def _crosscheck_pipeines_in_rules(self): + for rule in self.rules: + rule.crosscheck_pipelines() + def find_matching_rule( self, data_request_variable: DataRequestVariable ) -> Rule or None: @@ -606,6 +610,7 @@ def check_rules_for_output_dir(self, output_dir): def process(self, parallel=None): logger.debug("Process start!") self._match_pipelines_in_rules() + self._crosscheck_pipeines_in_rules() if parallel is None: parallel = self._pymorize_cfg.get("parallel", True) if parallel: diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py index c688c61f..fca50f99 100644 --- a/src/pymorize/rule.py +++ b/src/pymorize/rule.py @@ -1,5 +1,6 @@ import copy import datetime +import inspect import pathlib import re import typing @@ -12,6 +13,51 @@ from .data_request.variable import DataRequestVariable from .gather_inputs import InputFileCollection from .logging import logger +from .pipeline import Pipeline + + +class RuleRequirement: + """Defines a requirement which steps must have tagged in order to be valid""" + + def __init__(self, requirement_name: str, requirement_value: typing.Any): + """ + Initialize a RuleRequirement object. + + Parameters + ---------- + requirement_name : str + The name of the requirement that the step must satisfy. + requirement_value : Any + The value of the requirement that the step must satisfy. + """ + self.requirement_name = requirement_name + self.requirement_value = requirement_value + + @classmethod + def from_dict(cls, d): + """Build a RuleRequirement object from a dictionary""" + return cls(**d) + + def pipeline_fulfills_requirements(self, pipeline: Pipeline) -> bool: + """Check if a pipeline fulfills the requirements of the rule + + Parameters + ---------- + pipeline : Pipeline + The pipeline to check + + Returns + ------- + bool + True if the pipeline fulfills the requirements, False otherwise + """ + for step in pipeline.steps: + step_src = inspect.getsource(step) + for line in step_src.split("\n"): + if f"_satisfies_{self.requirement_name}" in line: + if str(self.requirement_value) in line: + return True + return False class Rule: @@ -167,6 +213,24 @@ def match_pipelines(self, pipelines, force=False): self.pipelines = matched_pipelines self._pipelines_are_mapped = True + def crosscheck_pipelines(self): + """ + Check that all pipelines are valid for the rule. + + This method will raise a ValueError if any of the pipelines in the rule are not valid. + """ + # FIXME: Dynamically get requirements based on CMOR variable. Something like this: + # Should be a list of dictionaries containing requirement specifications: + # requirements = [{"requirement_name": "cell_methods", "requirement_value": "time: mean"}, ] + requirements = [dict(), dict()] + for requirement in requirements: + rr = RuleRequirement.from_dict(**requirement) + req_satisfied = any( + rr.pipeline_fulfills_requirements(pl) for pl in self.pipelines + ) + if not req_satisfied: + raise ValueError(f"Rule {self.name} does not satisfy requirement {rr}") + @classmethod def from_dict(cls, data): """Build a rule object from a dictionary From d854642fed902d468e4b9b98019a042b9bfb62ac Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 18 Mar 2025 12:39:03 +0100 Subject: [PATCH 2/7] wip: typo --- src/pymorize/cmorizer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index c535cc89..8fe001c0 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -321,7 +321,7 @@ def _match_pipelines_in_rules(self, force=False): for rule in self.rules: rule.match_pipelines(self.pipelines, force=force) - def _crosscheck_pipeines_in_rules(self): + def _crosscheck_pipelines_in_rules(self): for rule in self.rules: rule.crosscheck_pipelines() @@ -610,7 +610,7 @@ def check_rules_for_output_dir(self, output_dir): def process(self, parallel=None): logger.debug("Process start!") self._match_pipelines_in_rules() - self._crosscheck_pipeines_in_rules() + self._crosscheck_pipelines_in_rules() if parallel is None: parallel = self._pymorize_cfg.get("parallel", True) if parallel: From 401c80d2f80c5a551ad3956aec28f9986ba5bee7 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 18 Mar 2025 13:02:02 +0100 Subject: [PATCH 3/7] fix: mini typo in new from_dict for rule crosschecks --- src/pymorize/rule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py index fca50f99..4a53829f 100644 --- a/src/pymorize/rule.py +++ b/src/pymorize/rule.py @@ -224,7 +224,7 @@ def crosscheck_pipelines(self): # requirements = [{"requirement_name": "cell_methods", "requirement_value": "time: mean"}, ] requirements = [dict(), dict()] for requirement in requirements: - rr = RuleRequirement.from_dict(**requirement) + rr = RuleRequirement.from_dict(requirement) req_satisfied = any( rr.pipeline_fulfills_requirements(pl) for pl in self.pipelines ) From bdb5ecaf64f6e2b6c0d4a5f812d78d7d4aad2e46 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 31 Mar 2025 10:56:59 +0200 Subject: [PATCH 4/7] wip --- src/pymorize/cli.py | 16 ++++++++++++++++ src/pymorize/rule.py | 1 + 2 files changed, 17 insertions(+) diff --git a/src/pymorize/cli.py b/src/pymorize/cli.py index bbb513fe..ea3a85ff 100644 --- a/src/pymorize/cli.py +++ b/src/pymorize/cli.py @@ -212,6 +212,22 @@ def config(config_file, verbose, quiet, logfile, profile_mem): logger.error(f"{key}: {error}") +@validate.command() +@click_loguru.logging_options +@click_loguru.init_logger() +@click.argument("config_file", type=click.Path(exists=True)) +def pipeline_requirements(config_file, verbose, quiet, logfile, profile_mem): + logger.info(f"Processing {config_file}") + with open(config_file, "r") as f: + cfg = yaml.safe_load(f) + cmorizer = CMORizer.from_dict(cfg) + for rule in cmorizer.rules: + try: + rule.crosscheck_pipelines() + except Exception as e: + logger.error(f"Error in {rule.name}: {e}") + + @validate.command() @click_loguru.logging_options @click_loguru.init_logger() diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py index 4a53829f..e31e69c9 100644 --- a/src/pymorize/rule.py +++ b/src/pymorize/rule.py @@ -222,6 +222,7 @@ def crosscheck_pipelines(self): # FIXME: Dynamically get requirements based on CMOR variable. Something like this: # Should be a list of dictionaries containing requirement specifications: # requirements = [{"requirement_name": "cell_methods", "requirement_value": "time: mean"}, ] + breakpoint() requirements = [dict(), dict()] for requirement in requirements: rr = RuleRequirement.from_dict(requirement) From 3d0ff6f6a7f4e9246e633bcc6adeb20e6fa9c198 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 31 Mar 2025 12:03:27 +0200 Subject: [PATCH 5/7] wip: adds standalone checker --- src/pymorize/cli.py | 4 ++++ src/pymorize/cmorizer.py | 40 +++++++++++++++++++++------------------- src/pymorize/rule.py | 13 +++++++++++-- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/pymorize/cli.py b/src/pymorize/cli.py index ea3a85ff..7b596f81 100644 --- a/src/pymorize/cli.py +++ b/src/pymorize/cli.py @@ -220,9 +220,13 @@ def pipeline_requirements(config_file, verbose, quiet, logfile, profile_mem): logger.info(f"Processing {config_file}") with open(config_file, "r") as f: cfg = yaml.safe_load(f) + # We don't need dask just to check the pipelines + cfg["pymorize"] = cfg.get("pymorize", {}) + cfg["pymorize"]["enable_dask"] = False cmorizer = CMORizer.from_dict(cfg) for rule in cmorizer.rules: try: + rule.match_pipelines(cmorizer.pipelines) rule.crosscheck_pipelines() except Exception as e: logger.error(f"Error in {rule.name}: {e}") diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 8fe001c0..cb0dff3f 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -57,6 +57,7 @@ def __init__( rules_cfg=None, dask_cfg=None, inherit_cfg=None, + run_post_inits=True, **kwargs, ): ################################################################################ @@ -112,25 +113,26 @@ def __init__( ################################################################################ # Post_Init: - if self._pymorize_cfg("enable_dask"): - logger.debug("Setting up dask configuration...") - self._post_init_configure_dask() - logger.debug("...done!") - logger.debug("Creating dask cluster...") - self._post_init_create_dask_cluster() - logger.debug("...done!") - self._post_init_create_pipelines() - self._post_init_create_rules() - self._post_init_create_data_request_tables() - self._post_init_create_data_request() - self._post_init_populate_rules_with_tables() - self._post_init_populate_rules_with_dimensionless_unit_mappings() - self._post_init_populate_rules_with_aux_files() - self._post_init_populate_rules_with_data_request_variables() - self._post_init_create_controlled_vocabularies() - self._post_init_populate_rules_with_controlled_vocabularies() - self._post_init_create_global_attributes_on_rules() - logger.debug("...post-init done!") + if run_post_inits: + if self._pymorize_cfg("enable_dask"): + logger.debug("Setting up dask configuration...") + self._post_init_configure_dask() + logger.debug("...done!") + logger.debug("Creating dask cluster...") + self._post_init_create_dask_cluster() + logger.debug("...done!") + self._post_init_create_pipelines() + self._post_init_create_rules() + self._post_init_create_data_request_tables() + self._post_init_create_data_request() + self._post_init_populate_rules_with_tables() + self._post_init_populate_rules_with_dimensionless_unit_mappings() + self._post_init_populate_rules_with_aux_files() + self._post_init_populate_rules_with_data_request_variables() + self._post_init_create_controlled_vocabularies() + self._post_init_populate_rules_with_controlled_vocabularies() + self._post_init_create_global_attributes_on_rules() + logger.debug("...post-init done!") ################################################################################ def __del__(self): diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py index e31e69c9..d1079722 100644 --- a/src/pymorize/rule.py +++ b/src/pymorize/rule.py @@ -59,6 +59,9 @@ def pipeline_fulfills_requirements(self, pipeline: Pipeline) -> bool: return True return False + def __repr__(self): + return f"{self.__class__.__name__}({self.requirement_name}, {self.requirement_value})" + class Rule: def __init__( @@ -219,11 +222,17 @@ def crosscheck_pipelines(self): This method will raise a ValueError if any of the pipelines in the rule are not valid. """ + requirements = [] # FIXME: Dynamically get requirements based on CMOR variable. Something like this: # Should be a list of dictionaries containing requirement specifications: # requirements = [{"requirement_name": "cell_methods", "requirement_value": "time: mean"}, ] - breakpoint() - requirements = [dict(), dict()] + if hasattr(self.data_request_variable, "cell_methods"): + requirements.append( + { + "requirement_name": "cell_methods", + "requirement_value": self.data_request_variable.cell_methods, + }, + ) for requirement in requirements: rr = RuleRequirement.from_dict(requirement) req_satisfied = any( From c7d1e1b079b06f3dd94b42b593f1b1bedd25ca50 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 31 Mar 2025 12:09:53 +0200 Subject: [PATCH 6/7] feat: configurable crosschecking --- src/pymorize/cmorizer.py | 5 +++-- src/pymorize/config.py | 5 +++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index cb0dff3f..ba1eafcd 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -324,8 +324,9 @@ def _match_pipelines_in_rules(self, force=False): rule.match_pipelines(self.pipelines, force=force) def _crosscheck_pipelines_in_rules(self): - for rule in self.rules: - rule.crosscheck_pipelines() + if self._pymorize_cfg.get("pipelines_enforce_requirements"): + for rule in self.rules: + rule.crosscheck_pipelines() def find_matching_rule( self, data_request_variable: DataRequestVariable diff --git a/src/pymorize/config.py b/src/pymorize/config.py index a24f60c9..23091344 100644 --- a/src/pymorize/config.py +++ b/src/pymorize/config.py @@ -181,6 +181,11 @@ class Config: ], ), ) + pipelines_enforce_requirements = Option( + parser=_parse_bool, + default="yes", + doc="Whether to ensure that pipeline rules match up with their requirements", + ) quiet = Option( default=False, doc="Whether to suppress output.", parser=_parse_bool ) From 9a19802ec2fc4fcdb02406099cef718d78e9452a Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 31 Mar 2025 12:14:57 +0200 Subject: [PATCH 7/7] correct example yaml --- examples/sample.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/sample.yaml b/examples/sample.yaml index 6dd311b6..f3683528 100644 --- a/examples/sample.yaml +++ b/examples/sample.yaml @@ -5,7 +5,8 @@ general: email: "pgierz@awi.de" cmor_version: "CMIP6" mip: "CMIP" - CMIP_Tables_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/Tables" + CMIP_Tables_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/Tables/" + CV_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/CMIP6_CVs/" pymorize: # parallel: True warn_on_no_rule: False