42 changes: 22 additions & 20 deletions fre/app/generate_time_averages/combine.py
@@ -8,6 +8,7 @@
 import subprocess
 import metomi.isodatetime.parsers
+import xarray as xr

 from ..helpers import change_directory

@@ -39,21 +40,29 @@
     return frequency_label + '_' + str(interval_object.years) + 'yr'


-def check_glob(target: str) -> None:
+def merge_netcdfs(input_file_glob: str, output_file: str) -> None:
     """
-    Verify that at least one file is resolved by the glob.
-    Raises FileNotFoundError if no files are found.
-
-    :param target: Glob target to resolve
-    :type target: str
-    :raises FileNotFoundError: No files found
+    Merge a group of NetCDF files identified by a glob string
+    into one combined NetCDF file.
+
+    :param input_file_glob: Glob string used to form input file list
+    :type input_file_glob: str
+    :param output_file: Merged output NetCDF file
+    :type output_file: str
+    :raises FileNotFoundError: Input files not found
+    :raises FileExistsError: Output file already exists
+    :rtype: None
     """
-    files = glob.glob(target)
-    if len(files) >= 1:
-        fre_logger.debug("%s has %s files", target, len(files))
+    input_files = glob.glob(input_file_glob)
+    if len(input_files) >= 1:
+        fre_logger.debug(f"Input file search string '{input_file_glob}' matched {len(input_files)} files")
     else:
-        raise FileNotFoundError(f"target={target} resolves to no files")
+        raise FileNotFoundError(f"'{input_file_glob}' resolves to no files")
+    if Path(output_file).exists():
+        raise FileExistsError(f"Output file '{output_file}' already exists")
+
+    ds = xr.open_mfdataset(input_files, compat='override', coords='minimal')
+    ds.to_netcdf(output_file, unlimited_dims=['time'])


 def combine( root_in_dir: str,
@@ -107,20 +116,13 @@
         if frequency == 'yr':
             source = component + '.' + date_string + '.*.nc'
             target = component + '.' + date_string + '.nc'
-            check_glob(source)
-            subprocess.run(['cdo', '-O', 'merge', source, target], check=True)
+            merge_netcdfs(source, target)
             fre_logger.debug("Output file created: %s", target)
             fre_logger.debug("Copying to %s", outdir)
             subprocess.run(['cp', '-v', target, outdir], check=True)
         elif frequency == 'mon':
             for month_int in range(1,13):
                 source = f"{component}.{date_string}.*.{month_int:02d}.nc"
                 target = f"{component}.{date_string}.{month_int:02d}.nc"
-                check_glob(source)
-
-                # does there exist a python-cdo way of doing the merge?
-                subprocess.run(['cdo', '-O', 'merge', source, target], check=True)
-                fre_logger.debug("Output file created: %s", target)
-                fre_logger.debug("Copying to %s", outdir)
-
+                merge_netcdfs(source, target)
                 subprocess.run(['cp', '-v', target, outdir], check=True)
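
Taken together, these changes fold the old check_glob + external `cdo -O merge` pair into one in-Python merge. Below is a minimal, self-contained sketch of the same xarray pattern; the file and variable names are illustrative, not from this PR, and xr.open_mfdataset additionally requires dask to be installed:

import numpy as np
import xarray as xr

# Two single-variable files sharing a time axis, standing in for the
# per-variable history files matched by the glob (names are hypothetical)
coords = {"time": [0.0, 1.0]}
xr.Dataset({"tas": ("time", np.array([280.0, 281.0]))}, coords=coords).to_netcdf("example.tas.nc")
xr.Dataset({"pr": ("time", np.array([1.0, 2.0]))}, coords=coords).to_netcdf("example.pr.nc")

# compat='override' and coords='minimal' skip per-file coordinate equality
# checks and take coordinate values from the first file
ds = xr.open_mfdataset(["example.tas.nc", "example.pr.nc"],
                       compat='override', coords='minimal')

# unlimited_dims keeps 'time' a record dimension, as 'cdo merge' did
ds.to_netcdf("example.merged.nc", unlimited_dims=['time'])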
6 changes: 3 additions & 3 deletions fre/app/mask_atmos_plevel/mask_atmos_plevel.py
@@ -80,9 +80,9 @@ def mask_atmos_plevel_subtool(infile: str = None,
                 fre_logger.debug('first pressure masking trigger passed. processing data.')
                 ds_out[var] = mask_field_above_surface_pressure(ds_in, var, ds_ps)
                 ds_out[var].attrs['pressure_mask'] = "True"
-                fre_logger.info("Finished processing %s, pressure_mask is True", var)
+                fre_logger.info(f"Finished processing '{var}'. pressure_mask was False, now set to True")
             else:
-                fre_logger.debug("Not processing %s, 'pressure_mask' was not False.", var)
+                fre_logger.debug(f"Not processing '{var}' because 'pressure_mask' is already True.")

         elif '_unmsk' in var:
             fre_logger.debug('second pressure masking trigger passed, \'_unmsk\' in variable name. processing data.')
@@ -103,7 +103,7 @@ def mask_atmos_plevel_subtool(infile: str = None,

             fre_logger.info("Finished processing %s, wrote %s, pressure_mask is True", var, masked_var)
         else:
-            fre_logger.debug("Not processing %s, no pressure_mask attr, nor _unmsk in the variable name", var)
+            fre_logger.info(f"Not processing '{var}' because no 'pressure_mask' attribute exists")

     fre_logger.info('Write the output file if any unmasked variables were masked')
     if ds_out.variables:
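
The reworded messages describe an attribute gate: a variable is masked only when its 'pressure_mask' attribute is the string "False", and the attribute is flipped to "True" on output. A minimal sketch of that gate, with an illustrative variable standing in for real data and a plain copy standing in for the module's mask_field_above_surface_pressure:

import numpy as np
import xarray as xr

ds_in = xr.Dataset({"ta": ("plev", np.array([250.0, 260.0]))})
ds_in["ta"].attrs["pressure_mask"] = "False"    # not yet masked
ds_out = xr.Dataset()

var = "ta"
if ds_in[var].attrs.get("pressure_mask") == "False":
    ds_out[var] = ds_in[var].copy()             # stand-in for mask_field_above_surface_pressure(...)
    ds_out[var].attrs["pressure_mask"] = "True" # record that masking was applied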
9 changes: 7 additions & 2 deletions fre/app/regrid_xy/regrid_xy.py
@@ -326,11 +326,16 @@ def regrid_xy(yamlfile: str,
     datadict["remap_dir"] = remap_dir
     datadict["input_date"] = input_date[:8]

+    # add temporal and static history files
     components = []
     for component in yamldict["postprocess"]["components"]:
-        for this_source in component["sources"]:
-            if this_source["history_file"] == source:
+        for temporal_history in component["sources"]:
+            if temporal_history["history_file"] == source:
                 components.append(component)
+        if "static" in component:
+            for static_history in component["static"]:
+                if static_history["source"] == source:
+                    components.append(component)

     # submit fregrid job for each component
     for component in components:
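
The lookup above now selects a component when either a temporal source ("history_file") or a static source ("source") matches the history file being regridded. A sketch of the parsed-YAML shape this assumes, with hypothetical component and file names:

# Illustrative shape of yamldict["postprocess"]["components"]
yamldict = {
    "postprocess": {
        "components": [
            {"type": "atmos",
             "sources": [{"history_file": "atmos_month"}],  # temporal histories
             "static": [{"source": "atmos_static"}]},       # static histories (optional key)
        ]
    }
}

source = "atmos_static"
components = []
for component in yamldict["postprocess"]["components"]:
    for temporal_history in component["sources"]:
        if temporal_history["history_file"] == source:
            components.append(component)
    if "static" in component:
        for static_history in component["static"]:
            if static_history["source"] == source:
                components.append(component)

assert len(components) == 1   # matched via the new "static" branch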
5 changes: 5 additions & 0 deletions fre/app/regrid_xy/tests/generate_files.py
@@ -152,6 +152,11 @@ def make_data():
             history_file = source["history_file"]
             for i in range(1, ntiles+1):
                 dataset.to_netcdf(f"{input_dir}/{date}.{history_file}.tile{i}.nc")
+        if "static" in component:
+            for static_source in component["static"]:
+                history_file = static_source["source"]
+                for i in range(1, ntiles+1):
+                    dataset.to_netcdf(f"{input_dir}/{date}.{history_file}.tile{i}.nc")


 def make_all():
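
The generator mirrors the naming convention regrid_xy globs for: one file per tile, {date}.{history_file}.tile{i}.nc, with static entries keyed by "source" instead of "history_file". For example (the date and tile count here are hypothetical; six tiles is typical for a cubed-sphere grid):

date, history_file, ntiles = "20030101", "my_static_history", 6
filenames = [f"{date}.{history_file}.tile{i}.nc" for i in range(1, ntiles + 1)]
# ['20030101.my_static_history.tile1.nc', ..., '20030101.my_static_history.tile6.nc']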
125 changes: 85 additions & 40 deletions fre/app/regrid_xy/tests/test_regrid_xy.py
@@ -28,31 +28,31 @@
 remap_dir= Path(curr_dir)/"test_remap"
 work_dir = Path(curr_dir)/"test_work"

-components = []
-pp_input_files = [{"history_file":"pemberley"}, {"history_file":"longbourn"}]
-components.append({"xyInterp": f"{nxy},{nxy}",
-                   "interpMethod": "conserve_order2",
-                   "inputRealm": "atmos",
-                   "type": f"pride_and_prejudice",
-                   "sources": pp_input_files,
-                   "postprocess_on": True}
-                  )
-emma_input_files = [{"history_file":"hartfield"}, {"history_file":"donwell_abbey"}]
-components.append({"xyInterp": f"{nxy},{nxy}",
-                   "interpMethod": "conserve_order2",
-                   "inputRealm": "atmos",
-                   "type": f"emma",
-                   "sources": emma_input_files,
-                   "postprocess_on": True}
-                  )
-here_input_files = [{"history_file":"gfdl"}, {"history_file":"princeton"}]
-components.append({"xyInterp": f"{nxy},{nxy}",
-                   "interpMethod": "conserve_order2",
-                   "inputRealm": "atmos",
-                   "type": "here",
-                   "sources": here_input_files,
-                   "postprocess_on": False}
-                  )
+input_files = [{"history_file":"pemberley"}, {"history_file":"longbourn"}]
+input_files_donotregrid = [{"history_file":"nope"}]
+input_files_static = [{"source": "my_static_history"}]
+
+components = [
+    {"xyInterp": f"{nxy},{nxy}",
+     "interpMethod": "conserve_order2",
+     "inputRealm": "atmos",
+     "type": "pride_and_prejudice",
+     "sources": input_files,
+     "postprocess_on": True},
+    {"xyInterp": f"{nxy},{nxy}",
+     "interpMethod": "conserve_order2",
+     "inputRealm": "atmos",
+     "type": "my_component",
+     "sources": input_files,
+     "static": input_files_static,
+     "postprocess_on": True},
+    {"xyInterp": f"{nxy},{nxy}",
+     "interpMethod": "conserve_order2",
+     "inputRealm": "atmos",
+     "type": "this_comp_is_off",
+     "sources": input_files_donotregrid,
+     "postprocess_on": False}
+]


 def setup_test():
@@ -82,18 +82,65 @@
     generate_files.cleanup()

 @pytest.mark.skipif(not HAVE_FREGRID,
                     reason='fregrid not in env. it was removed from package reqs. you must load it externally')
 def test_regrid_xy():
     """
     Tests the main function regrid_xy and ensures
     data is regridded correctly
     """

     setup_test()

+    #modify generate_files to change sources
+    for source_dict in input_files:
+        source = source_dict["history_file"]
+        regrid_xy.regrid_xy(yamlfile=str(yamlfile),
+                            input_dir=str(input_dir),
+                            output_dir=str(output_dir),
+                            work_dir=str(work_dir),
+                            remap_dir=str(remap_dir),
+                            source=source,
+                            input_date=date+"TTTT")
+
+    #check answers
+    for source_dict in input_files:
+        # Files are now output to a subdirectory based on grid size and interpolation method
+        output_subdir = output_dir/f"{nxy}_{nxy}.conserve_order2"
+        outfile = output_subdir/f"{date}.{source_dict['history_file']}.nc"
+
+        test = xr.load_dataset(outfile)
+
+        assert "wet_c" not in test
+        assert "mister" in test
+        assert "darcy" in test
+        assert "wins" in test
+
+        assert np.all(test["mister"].values==np.float64(1.0))
+        assert np.all(test["darcy"].values==np.float64(2.0))
+        assert np.all(test["wins"].values==np.float64(3.0))
+
+    #check answers, these shouldn't have been regridded
+    for source_dict in input_files_donotregrid:
+        ifile = source_dict["history_file"]
+        assert not (output_dir/f"{date}.{ifile}.nc").exists()
+
+    #check remap_file exists and is not empty
+    remap_file = remap_dir/f"C{nxy}_mosaicX{nxy}by{nxy}_conserve_order2.nc"
+    assert remap_file.exists()
+
+    cleanup_test()
+
+
+def test_regrid_xy_static():
+    """
+    Tests the main function regrid_xy and ensures
+    data is regridded correctly
+    Same as test_regrid_xy but flavored for statics
+    """
+
+    setup_test()
+
-    #modify generate_files to change sources
-    for source_dict in pp_input_files + emma_input_files + here_input_files:
-        source = source_dict["history_file"]
+    # regrid the static history file
+    for source_dict in input_files_static:
+        source = source_dict["source"]
         regrid_xy.regrid_xy(yamlfile=str(yamlfile),
                             input_dir=str(input_dir),
                             output_dir=str(output_dir),
@@ -102,11 +102,11 @@
                             source=source,
                             input_date=date+"TTTT")

-    #check answers
-    for source_dict in pp_input_files + emma_input_files:
+    # check the regridded history file
+    for source_dict in input_files_static:
         # Files are now output to a subdirectory based on grid size and interpolation method
         output_subdir = output_dir/f"{nxy}_{nxy}.conserve_order2"
-        outfile = output_subdir/f"{date}.{source_dict['history_file']}.nc"
+        outfile = output_subdir/f"{date}.{source_dict['source']}.nc"

         test = xr.load_dataset(outfile)

@@ -119,19 +166,17 @@
     assert np.all(test["darcy"].values==np.float64(2.0))
     assert np.all(test["wins"].values==np.float64(3.0))

-    #check answers, these shouldn't have been regridded
-    for source_dict in here_input_files:
-        ifile = source_dict["history_file"]
-        assert not (output_dir/f"{date}.{ifile}.nc").exists()
-
     #check remap_file exists and is not empty
     remap_file = remap_dir/f"C{nxy}_mosaicX{nxy}by{nxy}_conserve_order2.nc"
     assert remap_file.exists()

     cleanup_test()


+@pytest.mark.skipif(not HAVE_FREGRID,
+                    reason='fregrid not in env. it was removed from package reqs. you must load it externally')
 def test_get_input_mosaic():
     """
     Tests get_input_mosaic correctly copies the mosaic file to the input directory
@@ -186,11 +231,11 @@

     #check remap file from current directory is copied to input directory
     remap_file = Path(f"remap_dir/{input_mosaic}X{nlon}by{nlat}_{interp_method}.nc")
-    assert regrid_xy.get_remap_file(datadict) == str(remap_dir/remap_file)
+    assert regrid_xy.get_remap_file(datadict) == str(remap_file)

     remap_dir.mkdir(exist_ok=True)
     remap_file.touch()
-    assert regrid_xy.get_remap_file(datadict) == str(remap_dir/remap_file)
+    assert regrid_xy.get_remap_file(datadict) == str(remap_file)

     Path(remap_file).unlink()
     shutil.rmtree(remap_dir)