Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/00-testing-example/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ general:
email: "pgierz@awi.de"
cmor_version: "CMIP6"
mip: "CMIP"
CMIP_Tables_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/Tables"
CMIP_Tables_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/Tables/"
CV_Dir: "/work/ab0246/a270077/SciComp/Projects/pymorize/cmip6-cmor-tables/CMIP6_CVs/"
pymorize:
# parallel: True
warn_on_no_rule: False
Expand Down
20 changes: 20 additions & 0 deletions src/pymorize/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,26 @@ def config(config_file, verbose, quiet, logfile, profile_mem):
logger.error(f"{key}: {error}")


@validate.command()
@click_loguru.logging_options
@click_loguru.init_logger()
@click.argument("config_file", type=click.Path(exists=True))
def pipeline_requirements(config_file, verbose, quiet, logfile, profile_mem):
logger.info(f"Processing {config_file}")
with open(config_file, "r") as f:
cfg = yaml.safe_load(f)
# We don't need dask just to check the pipelines
cfg["pymorize"] = cfg.get("pymorize", {})
cfg["pymorize"]["enable_dask"] = False
cmorizer = CMORizer.from_dict(cfg)
for rule in cmorizer.rules:
try:
rule.match_pipelines(cmorizer.pipelines)
rule.crosscheck_pipelines()
except Exception as e:
logger.error(f"Error in {rule.name}: {e}")


@validate.command()
@click_loguru.logging_options
@click_loguru.init_logger()
Expand Down
46 changes: 27 additions & 19 deletions src/pymorize/cmorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
rules_cfg=None,
dask_cfg=None,
inherit_cfg=None,
run_post_inits=True,
**kwargs,
):
################################################################################
Expand Down Expand Up @@ -112,25 +113,26 @@ def __init__(

################################################################################
# Post_Init:
if self._pymorize_cfg("enable_dask"):
logger.debug("Setting up dask configuration...")
self._post_init_configure_dask()
logger.debug("...done!")
logger.debug("Creating dask cluster...")
self._post_init_create_dask_cluster()
logger.debug("...done!")
self._post_init_create_pipelines()
self._post_init_create_rules()
self._post_init_create_data_request_tables()
self._post_init_create_data_request()
self._post_init_populate_rules_with_tables()
self._post_init_populate_rules_with_dimensionless_unit_mappings()
self._post_init_populate_rules_with_aux_files()
self._post_init_populate_rules_with_data_request_variables()
self._post_init_create_controlled_vocabularies()
self._post_init_populate_rules_with_controlled_vocabularies()
self._post_init_create_global_attributes_on_rules()
logger.debug("...post-init done!")
if run_post_inits:
if self._pymorize_cfg("enable_dask"):
logger.debug("Setting up dask configuration...")
self._post_init_configure_dask()
logger.debug("...done!")
logger.debug("Creating dask cluster...")
self._post_init_create_dask_cluster()
logger.debug("...done!")
self._post_init_create_pipelines()
self._post_init_create_rules()
self._post_init_create_data_request_tables()
self._post_init_create_data_request()
self._post_init_populate_rules_with_tables()
self._post_init_populate_rules_with_dimensionless_unit_mappings()
self._post_init_populate_rules_with_aux_files()
self._post_init_populate_rules_with_data_request_variables()
self._post_init_create_controlled_vocabularies()
self._post_init_populate_rules_with_controlled_vocabularies()
self._post_init_create_global_attributes_on_rules()
logger.debug("...post-init done!")
################################################################################

def __del__(self):
Expand Down Expand Up @@ -321,6 +323,11 @@ def _match_pipelines_in_rules(self, force=False):
for rule in self.rules:
rule.match_pipelines(self.pipelines, force=force)

def _crosscheck_pipelines_in_rules(self):
if self._pymorize_cfg.get("pipelines_enforce_requirements"):
for rule in self.rules:
rule.crosscheck_pipelines()

def find_matching_rule(
self, data_request_variable: DataRequestVariable
) -> Rule or None:
Expand Down Expand Up @@ -609,6 +616,7 @@ def check_rules_for_output_dir(self, output_dir):
def process(self, parallel=None):
logger.debug("Process start!")
self._match_pipelines_in_rules()
self._crosscheck_pipelines_in_rules()
if parallel is None:
parallel = self._pymorize_cfg.get("parallel", True)
if parallel:
Expand Down
5 changes: 5 additions & 0 deletions src/pymorize/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ class Config:
],
),
)
pipelines_enforce_requirements = Option(
parser=_parse_bool,
default="yes",
doc="Whether to ensure that pipeline rules match up with their requirements",
)
quiet = Option(
default=False, doc="Whether to suppress output.", parser=_parse_bool
)
Expand Down
74 changes: 74 additions & 0 deletions src/pymorize/rule.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import datetime
import inspect
import pathlib
import re
import typing
Expand All @@ -12,6 +13,54 @@
from .data_request.variable import DataRequestVariable
from .gather_inputs import InputFileCollection
from .logging import logger
from .pipeline import Pipeline


class RuleRequirement:
"""Defines a requirement which steps must have tagged in order to be valid"""

def __init__(self, requirement_name: str, requirement_value: typing.Any):
"""
Initialize a RuleRequirement object.

Parameters
----------
requirement_name : str
The name of the requirement that the step must satisfy.
requirement_value : Any
The value of the requirement that the step must satisfy.
"""
self.requirement_name = requirement_name
self.requirement_value = requirement_value

@classmethod
def from_dict(cls, d):
"""Build a RuleRequirement object from a dictionary"""
return cls(**d)

def pipeline_fulfills_requirements(self, pipeline: Pipeline) -> bool:
"""Check if a pipeline fulfills the requirements of the rule

Parameters
----------
pipeline : Pipeline
The pipeline to check

Returns
-------
bool
True if the pipeline fulfills the requirements, False otherwise
"""
for step in pipeline.steps:
step_src = inspect.getsource(step)
for line in step_src.split("\n"):
if f"_satisfies_{self.requirement_name}" in line:
if str(self.requirement_value) in line:
Comment on lines +57 to +58
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not terribly convinced by this, it requires that the name and value are strictly defined to be on the same line. That might not be the best way of doing it.

return True
return False

def __repr__(self):
return f"{self.__class__.__name__}({self.requirement_name}, {self.requirement_value})"


class Rule:
Expand Down Expand Up @@ -167,6 +216,31 @@ def match_pipelines(self, pipelines, force=False):
self.pipelines = matched_pipelines
self._pipelines_are_mapped = True

def crosscheck_pipelines(self):
"""
Check that all pipelines are valid for the rule.

This method will raise a ValueError if any of the pipelines in the rule are not valid.
"""
requirements = []
# FIXME: Dynamically get requirements based on CMOR variable. Something like this:
# Should be a list of dictionaries containing requirement specifications:
# requirements = [{"requirement_name": "cell_methods", "requirement_value": "time: mean"}, ]
if hasattr(self.data_request_variable, "cell_methods"):
requirements.append(
{
"requirement_name": "cell_methods",
"requirement_value": self.data_request_variable.cell_methods,
},
)
for requirement in requirements:
rr = RuleRequirement.from_dict(requirement)
req_satisfied = any(
rr.pipeline_fulfills_requirements(pl) for pl in self.pipelines
)
if not req_satisfied:
raise ValueError(f"Rule {self.name} does not satisfy requirement {rr}")
Comment on lines +241 to +242
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should gather all requirements, instead of exiting at the first one.


@classmethod
def from_dict(cls, data):
"""Build a rule object from a dictionary
Expand Down
Loading