diff --git a/fre/app/generate_time_averages/combine.py b/fre/app/generate_time_averages/combine.py
index ee3cff863..6e17b0f54 100644
--- a/fre/app/generate_time_averages/combine.py
+++ b/fre/app/generate_time_averages/combine.py
@@ -8,6 +8,7 @@
 import subprocess
 
 import metomi.isodatetime.parsers
+import xarray as xr
 
 from ..helpers import change_directory
 
@@ -39,21 +40,29 @@ def form_bronx_directory_name(frequency: str,
 
     return frequency_label + '_' + str(interval_object.years) + 'yr'
 
-def check_glob(target: str) -> None:
+def merge_netcdfs(input_file_glob: str, output_file: str) -> None:
     """
-    Verify that at least one file is resolved by the glob.
-    Raises FileNotFoundError if no files are found.
-
-    :param target: Glob target to resolve
-    :type target: str
-    :raises FileNotFoundError: No files found
+    Merge a group of NetCDF files identified by a glob string
+    into one combined NetCDF file.
+
+    :param input_file_glob: Glob string used to form the input file list
+    :type input_file_glob: str
+    :param output_file: Merged output NetCDF file
+    :type output_file: str
+    :raises FileNotFoundError: Input files not found
+    :raises FileExistsError: Output file already exists
     :rtype: None
     """
-    files = glob.glob(target)
-    if len(files) >= 1:
-        fre_logger.debug("%s has %s files", target, len(files))
+    input_files = glob.glob(input_file_glob)
+    if len(input_files) >= 1:
+        fre_logger.debug(f"Input file search string '{input_file_glob}' matched {len(input_files)} files")
     else:
-        raise FileNotFoundError(f"target={target} resolves to no files")
+        raise FileNotFoundError(f"'{input_file_glob}' resolves to no files")
+    if Path(output_file).exists():
+        raise FileExistsError(f"Output file '{output_file}' already exists")
+
+    ds = xr.open_mfdataset(input_files, compat='override', coords='minimal')
+    ds.to_netcdf(output_file, unlimited_dims=['time'])
 
 
 def combine( root_in_dir: str,
@@ -107,8 +116,7 @@ def combine( root_in_dir: str,
     if frequency == 'yr':
         source = component + '.' + date_string + '.*.nc'
         target = component + '.' + date_string + '.nc'
-        check_glob(source)
-        subprocess.run(['cdo', '-O', 'merge', source, target], check=True)
+        merge_netcdfs(source, target)
         fre_logger.debug("Output file created: %s", target)
         fre_logger.debug("Copying to %s", outdir)
         subprocess.run(['cp', '-v', target, outdir], check=True)
@@ -116,11 +124,5 @@ def combine( root_in_dir: str,
         for month_int in range(1,13):
             source = f"{component}.{date_string}.*.{month_int:02d}.nc"
             target = f"{component}.{date_string}.{month_int:02d}.nc"
-            check_glob(source)
-
-            # does there exist a python-cdo way of doing the merge?
-            subprocess.run(['cdo', '-O', 'merge', source, target], check=True)
-            fre_logger.debug("Output file created: %s", target)
-            fre_logger.debug("Copying to %s", outdir)
-
+            merge_netcdfs(source, target)
             subprocess.run(['cp', '-v', target, outdir], check=True)
diff --git a/fre/app/mask_atmos_plevel/mask_atmos_plevel.py b/fre/app/mask_atmos_plevel/mask_atmos_plevel.py
index d60bd48bb..65c3a7f79 100755
--- a/fre/app/mask_atmos_plevel/mask_atmos_plevel.py
+++ b/fre/app/mask_atmos_plevel/mask_atmos_plevel.py
@@ -80,9 +80,9 @@ def mask_atmos_plevel_subtool(infile: str = None,
                 fre_logger.debug('first pressure masking trigger passed. processing data.')
                 ds_out[var] = mask_field_above_surface_pressure(ds_in, var, ds_ps)
                 ds_out[var].attrs['pressure_mask'] = "True"
-                fre_logger.info("Finished processing %s, pressure_mask is True", var)
+                fre_logger.info(f"Finished processing '{var}'. pressure_mask was False, now set to True")
             else:
-                fre_logger.debug("Not processing %s, 'pressure_mask' was not False.", var)
+                fre_logger.debug(f"Not processing '{var}' because 'pressure_mask' is already True.")
 
         elif '_unmsk' in var:
             fre_logger.debug('second pressure masking trigger passed, \'_unmsk\' in variable name. processing data.')
@@ -103,7 +103,7 @@ def mask_atmos_plevel_subtool(infile: str = None,
             fre_logger.info("Finished processing %s, wrote %s, pressure_mask is True",
                             var, masked_var)
         else:
-            fre_logger.debug("Not processing %s, no pressure_mask attr, nor _unmsk in the variable name", var)
+            fre_logger.info(f"Not processing '{var}' because no 'pressure_mask' attribute exists")
 
     fre_logger.info('Write the output file if any unmasked variables were masked')
     if ds_out.variables:
diff --git a/fre/app/regrid_xy/regrid_xy.py b/fre/app/regrid_xy/regrid_xy.py
index 3e261c2e1..83895b52f 100644
--- a/fre/app/regrid_xy/regrid_xy.py
+++ b/fre/app/regrid_xy/regrid_xy.py
@@ -326,11 +326,16 @@ def regrid_xy(yamlfile: str,
     datadict["remap_dir"] = remap_dir
     datadict["input_date"] = input_date[:8]
 
+    # add temporal and static history files
     components = []
     for component in yamldict["postprocess"]["components"]:
-        for this_source in component["sources"]:
-            if this_source["history_file"] == source:
+        for temporal_history in component["sources"]:
+            if temporal_history["history_file"] == source:
                 components.append(component)
+        if "static" in component:
+            for static_history in component["static"]:
+                if static_history["source"] == source:
+                    components.append(component)
 
     # submit fregrid job for each component
     for component in components:
diff --git a/fre/app/regrid_xy/tests/generate_files.py b/fre/app/regrid_xy/tests/generate_files.py
index a39de7b12..181d33f50 100644
--- a/fre/app/regrid_xy/tests/generate_files.py
+++ b/fre/app/regrid_xy/tests/generate_files.py
@@ -152,6 +152,11 @@ def make_data():
             history_file = source["history_file"]
             for i in range(1, ntiles+1):
                 dataset.to_netcdf(f"{input_dir}/{date}.{history_file}.tile{i}.nc")
+        if "static" in component:
+            for static_source in component["static"]:
+                history_file = static_source["source"]
+                for i in range(1, ntiles+1):
+                    dataset.to_netcdf(f"{input_dir}/{date}.{history_file}.tile{i}.nc")
 
 
 def make_all():
diff --git a/fre/app/regrid_xy/tests/test_regrid_xy.py b/fre/app/regrid_xy/tests/test_regrid_xy.py
index 6eeb2809b..74c19f58e 100644
--- a/fre/app/regrid_xy/tests/test_regrid_xy.py
+++ b/fre/app/regrid_xy/tests/test_regrid_xy.py
@@ -28,31 +28,31 @@
 remap_dir= Path(curr_dir)/"test_remap"
 work_dir = Path(curr_dir)/"test_work"
 
-components = []
-pp_input_files = [{"history_file":"pemberley"}, {"history_file":"longbourn"}]
-components.append({"xyInterp": f"{nxy},{nxy}",
-                   "interpMethod": "conserve_order2",
-                   "inputRealm": "atmos",
-                   "type": f"pride_and_prejudice",
-                   "sources": pp_input_files,
-                   "postprocess_on": True}
-)
-emma_input_files = [{"history_file":"hartfield"}, {"history_file":"donwell_abbey"}]
-components.append({"xyInterp": f"{nxy},{nxy}",
-                   "interpMethod": "conserve_order2",
-                   "inputRealm": "atmos",
-                   "type": f"emma",
-                   "sources": emma_input_files,
-                   "postprocess_on": True}
-)
-here_input_files = [{"history_file":"gfdl"}, {"history_file":"princeton"}]
-components.append({"xyInterp": f"{nxy},{nxy}",
-                   "interpMethod": "conserve_order2",
-                   "inputRealm": "atmos",
-                   "type": "here",
-                   "sources": here_input_files,
-                   "postprocess_on": False}
-)
+input_files = [{"history_file":"pemberley"}, {"history_file":"longbourn"}]
+input_files_donotregrid = [{"history_file":"nope"}]
[{"history_file":"nope"}] +input_files_static = [{"source": "my_static_history"}] + +components = [ + {"xyInterp": f"{nxy},{nxy}", + "interpMethod": "conserve_order2", + "inputRealm": "atmos", + "type": f"pride_and_prejudice", + "sources": input_files, + "postprocess_on": True}, + {"xyInterp": f"{nxy},{nxy}", + "interpMethod": "conserve_order2", + "inputRealm": "atmos", + "type": f"my_component", + "sources": input_files, + "static": input_files_static, + "postprocess_on": True}, + {"xyInterp": f"{nxy},{nxy}", + "interpMethod": "conserve_order2", + "inputRealm": "atmos", + "type": f"this_comp_is_off", + "sources": input_files_donotregrid, + "postprocess_on": False} +] def setup_test(): @@ -84,16 +84,63 @@ def cleanup_test(): @pytest.mark.skipif(not HAVE_FREGRID, reason='fregrid not in env. it was removed from package reqs. you must load it externally') def test_regrid_xy(): + """ + Tests the main function regrid_xy and ensures + data is regridded correctly + """ + + setup_test() + + #modify generate_files to change sources + for source_dict in input_files: + source = source_dict["history_file"] + regrid_xy.regrid_xy(yamlfile=str(yamlfile), + input_dir=str(input_dir), + output_dir=str(output_dir), + work_dir=str(work_dir), + remap_dir=str(remap_dir), + source=source, + input_date=date+"TTTT") + + #check answers + for source_dict in input_files: + # Files are now output to a subdirectory based on grid size and interpolation method + output_subdir = output_dir/f"{nxy}_{nxy}.conserve_order2" + outfile = output_subdir/f"{date}.{source_dict['history_file']}.nc" + + test = xr.load_dataset(outfile) + + assert "wet_c" not in test + assert "mister" in test + assert "darcy" in test + assert "wins" in test + + assert np.all(test["mister"].values==np.float64(1.0)) + assert np.all(test["darcy"].values==np.float64(2.0)) + assert np.all(test["wins"].values==np.float64(3.0)) + + #check answers, these shouldn't have been regridded + for source_dict in input_files_donotregrid: + ifile = source_dict["history_file"] + assert not (output_dir/f"{date}.{ifile}.nc").exists() + + #check remap_file exists and is not empty + remap_file = remap_dir/f"C{nxy}_mosaicX{nxy}by{nxy}_conserve_order2.nc" + assert remap_file.exists() + + cleanup_test() + + +def test_regrid_xy_static(): """ - Tests the main function regrid_xy and ensures - data is regridded correctly + Same as test_regrid_xy but flavored for statics """ setup_test() - #modify generate_files to change sources - for source_dict in pp_input_files + emma_input_files + here_input_files: - source = source_dict["history_file"] + # regrid the static history file + for source_dict in input_files_static: + source = source_dict["source"] regrid_xy.regrid_xy(yamlfile=str(yamlfile), input_dir=str(input_dir), output_dir=str(output_dir), @@ -102,11 +149,11 @@ def test_regrid_xy(): source=source, input_date=date+"TTTT") - #check answers - for source_dict in pp_input_files + emma_input_files: + # check the regridded history file + for source_dict in input_files_static: # Files are now output to a subdirectory based on grid size and interpolation method output_subdir = output_dir/f"{nxy}_{nxy}.conserve_order2" - outfile = output_subdir/f"{date}.{source_dict['history_file']}.nc" + outfile = output_subdir/f"{date}.{source_dict['source']}.nc" test = xr.load_dataset(outfile) @@ -119,19 +166,17 @@ def test_regrid_xy(): assert np.all(test["darcy"].values==np.float64(2.0)) assert np.all(test["wins"].values==np.float64(3.0)) - #check answers, these shouldn't have been regridded - for 
-    for source_dict in here_input_files:
-        ifile = source_dict["history_file"]
-        assert not (output_dir/f"{date}.{ifile}.nc").exists()
-
     #check remap_file exists and is not empty
     remap_file = remap_dir/f"C{nxy}_mosaicX{nxy}by{nxy}_conserve_order2.nc"
     assert remap_file.exists()
 
     cleanup_test()
+
+
+@pytest.mark.skipif(not HAVE_FREGRID, reason='fregrid not in env. it was removed from package reqs. you must load it externally')
 def test_get_input_mosaic():
     """
     Tests get_input_mosaic correctly copies the mosaic file to the input directory
@@ -186,11 +231,11 @@ def test_get_remap_file():
 
     #check remap file from current directory is copied to input directory
     remap_file = Path(f"remap_dir/{input_mosaic}X{nlon}by{nlat}_{interp_method}.nc")
-    assert regrid_xy.get_remap_file(datadict) == str(remap_dir/remap_file)
+    assert regrid_xy.get_remap_file(datadict) == str(remap_file)
 
     remap_dir.mkdir(exist_ok=True)
     remap_file.touch()
-    assert regrid_xy.get_remap_file(datadict) == str(remap_dir/remap_file)
+    assert regrid_xy.get_remap_file(datadict) == str(remap_file)
 
     Path(remap_file).unlink()
     shutil.rmtree(remap_dir)
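
A minimal, self-contained sketch of the xarray merge pattern that combine.py adopts above, in place of the old `cdo -O merge` subprocess call. This is for reference only, assuming the per-variable input files share coordinates; the function name and the file names in the usage comment are hypothetical, not taken from this patch.

    import glob
    from pathlib import Path

    import xarray as xr

    def merge_netcdfs_sketch(input_file_glob: str, output_file: str) -> None:
        # Resolve the glob up front so a zero-match case fails loudly,
        # mirroring the FileNotFoundError behavior added to combine.py.
        input_files = glob.glob(input_file_glob)
        if not input_files:
            raise FileNotFoundError(f"'{input_file_glob}' resolves to no files")
        # Refuse to clobber an existing output, mirroring the new FileExistsError.
        if Path(output_file).exists():
            raise FileExistsError(f"Output file '{output_file}' already exists")
        # compat='override' keeps the first copy of any variable defined in
        # more than one file; coords='minimal' merges only shared coordinates.
        ds = xr.open_mfdataset(input_files, compat='override', coords='minimal')
        # Keep 'time' unlimited so NetCDF tools can still append records.
        ds.to_netcdf(output_file, unlimited_dims=['time'])

    # Hypothetical usage: combine per-variable yearly files into one file.
    # merge_netcdfs_sketch('atmos.000101-000512.*.nc', 'atmos.000101-000512.nc')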