diff --git a/Template/template_forcing_engine.config b/Template/template_forcing_engine.config index 249e719..bce3077 100755 --- a/Template/template_forcing_engine.config +++ b/Template/template_forcing_engine.config @@ -286,6 +286,7 @@ DownscalingParamDirs = # 5 - CONUS MRMS GRIB2 hourly MultiSensor QPE (Pass 2 or Pass 1) # 6 - Hawaii MRMS GRIB2 hourly MultiSensor QPE (Pass 2 or Pass 1) # 7 - MRMS SBCv2 Liquid Water Fraction (netCDF only) +# 8 - NBM hourly forecast QMD-CONUS-APCP (grib2) SuppPcp = [] # Specify the file type for each supplemental precipitation file (comma separated) @@ -342,4 +343,4 @@ cfsEnsNumber = 1 # input files every 15 minutes, 60 minutes, 3-hours, etc. Please specify the # length of custom input frequencies to match the number of custom NetCDF inputs # selected above in the Logistics section. -custom_input_fcst_freq = [60] \ No newline at end of file +custom_input_fcst_freq = [60] diff --git a/Template/template_forcing_engine_NBM_test.config b/Template/template_forcing_engine_NBM_test.config new file mode 100755 index 0000000..262336b --- /dev/null +++ b/Template/template_forcing_engine_NBM_test.config @@ -0,0 +1,337 @@ +#-------------------------------------------------------------------- +# WRF-Hydro Forcing Engine Configuration File +# +# Input options to the forcing engine include: +# 1.) Choices for input forcing files to use. +# 2.) Options for specifying date ranges and forecast intervals +# for input files. +# 3.) Choices for ESMF regridding techniques. +# 4.) Choices for optional downscaling techniques. +# 5.) Choices for optional bias correction techniques. +# 6.) Choices for optional supplemental precipitation products. +# 7.) Choices for optional ensemble member variations. +# 8.) Choices for output directories to place final output files. + +[Input] +# Choose a set of value(s) of forcing variables to be processed for +# WRF-Hydro. 
Please be advised that the order in which the values are +# chosen below is the order that the final products will be layered +# into the final LDASIN files. See documentation for additional +# information and examples. +# The following is a global set of key values to map forcing files +# to variables within LDASIN files for WRF-Hydro. The forcing engine +# will map files to external variable names internally. For custom +# external native forcing files (see documentation), the code will +# expect a set of named variables to process. The following is a +# mapping of numeric values to external input native forcing files: +# 1 - NLDAS GRIB retrospective files +# 2 - NARR GRIB retrospective files +# 3 - GFS GRIB2 Global production files on the full gaussian grid +# 4 - NAM Nest GRIB2 Conus production files +# 5 - HRRR GRIB2 Conus production files +# 6 - RAP GRIB2 Conus 13km production files +# 7 - CFSv2 6-hourly GRIB2 Global production files +# 8 - WRF-ARW - GRIB2 Hawaii nest files +# 9 - GFS GRIB2 Global production files on 0.25 degree lat/lon grids. +# 10 - Custom NetCDF hourly forcing files +# 11 - Custom NetCDF hourly forcing files +# 12 - Custom NetCDF hourly forcing files +# 13 - Hawaii 3-km NAM Nest. +# 14 - Puerto Rico 3-km NAM Nest. +# 15 - Alaska 3-km Alaska Nest +# 16 - NAM_Nest_3km_Hawaii_Radiation-Only +# 17 - NAM_Nest_3km_PuertoRico_Radiation-Only +# 18 - WRF-ARW GRIB2 PuertoRico +InputForcings = [3] + +# Specify the file type for each forcing (comma separated) +# Valid types are GRIB1, GRIB2, and NETCDF +# (GRIB files will be converted internally with WGRIB[2]) +InputForcingTypes = GRIB2 + +# Specify the input directories for each forcing product. +InputForcingDirectories = /glade/scratch/zhangyx/ForcingEngine/Forcing_Inputs/GFS_13km_global + +# Specify whether the input forcings listed above are mandatory, or optional. +# This is important for layering contingencies if a product is missing, +# but forcing files are still desired. 
+# 0 - Not mandatory +# 1 - Mandatory +# NOTE!!! If no files are found for any products, code will error out indicating +# the final field is all missing values. +InputMandatory = [1] + +[Output] +# Specify the output frequency in minutes. +# Note that any frequencies at higher intervals +# than what is provided as input will entail input +# forcing data being temporally interpolated. +OutputFrequency = 60 + +# Specify a top level output directory. For re-forecasts +# and forecasts, sub-directories for each forecast cycle +# will be generated. For retrospective processing, final +# output files will be placed in this directory. +OutDir = /glade/scratch/mazrooei/FE/test_nbm/ + +# Specify a scratch directory that will be used +# for storage of temporary files. These files +# will be removed automatically by the program. +ScratchDir = /glade/scratch/mazrooei/FE/test_nbm/tmp/ + +# Flag to activate scale_factor / add_offset byte packing in +# the output files. +# 0 - Deactivate compression +# 1 - Activate compression +compressOutput = 0 + +# Flag to use floating point output vs scale_factor / add_offset byte packing in +# the output files (the default) +# 0 - Use scale/offset encoding +# 1 - Use floating-point encoding +floatOutput = 0 + +[Retrospective] +# Specify to process forcings in retrospective mode +# 0 - No +# 1 - Yes +RetroFlag = 0 + +# Choose the beginning date of processing forcing files. +# NOTE - Dates are given in YYYYMMDDHHMM format +# If in real-time forecasting mode, leave as -9999. +# These dates get over-ridden in lookBackHours. +BDateProc = 202103010000 +EDateProc = 202103030000 + +[Forecast] +# If this is AnA run, set AnAFlag to 1, otherwise 0. +# Setting this flag will change the behavior of some Bias Correction routines as well +# as the ForecastInputOffsets options (see below for more information) +AnAFlag = 0 + +# ONLY for realtime forecasting. +# - Specify a lookback period in minutes to process data. 
+# This overrides any BDateProc/EDateProc options passed above. +# If no LookBack specified, please specify -9999. +#LookBack = 1440 +LookBack = -9999 + +# If running reforecasts, specify a window below. This will override +# using the LookBack value to calculate a processing window. +RefcstBDateProc = 202103121200 +RefcstEDateProc = 202103150000 + +# Specify a forecast frequency in minutes. This value specifies how often +# to generate a set of forecast forcings. If generating hourly retrospective +# forcings, specify this value to be 60. +ForecastFrequency = 360 + +# Forecast cycles are determined by splitting up a day by equal +# ForecastFrequency interval. If there is a desire to shift the +# cycles to a different time step, ForecastShift will shift forecast +# cycles ahead by a determined set of minutes. For example, ForecastFrequency +# of 6 hours will produce forecast cycles at 00, 06, 12, and 18 UTC. However, +# a ForecastShift of 1 hour will produce forecast cycles at 01, 07, +# 13, and 19 UTC. NOTE - This is only used by the realtime instance +# to calculate forecast cycles accordingly. Re-forecasts will use the beginning +# and ending dates specified in conjunction with the forecast frequency +# to determine forecast cycle dates. +ForecastShift = 0 + +# Specify how much (in minutes) of each input forcing is desired for each +# forecast cycle. See documentation for examples. The length of +# this array must match the input forcing choices. +ForecastInputHorizons = [14400] + +# This option is for applying an offset to input forcings to use a different +# forecasted interval. For example, a user may wish to use 4-5 hour forecasted +# fields from an NWP grid from one of their input forcings. In that instance +# the offset would be 4 hours, but 0 for other remaining forcings. +ForecastInputOffsets = [0] + +[Geospatial] +# Specify a geogrid file that defines the WRF-Hydro (or NWM) domain to which +# the forcings are being processed to. 
+GeogridIn = /glade/p/cisl/nwc/nwmv21_finals/CONUS/DOMAIN_FinalParams/geo_em.d01.conus_1km_NWMv2.1.nc + +# Specify the optional land spatial metadata file. If found, coordinate projection information +# and coordinates will be translated to the final output file. +SpatialMetaIn = /glade/p/cisl/nwc/nwmv21_finals/CONUS/DOMAIN_FinalParams/GEOGRID_LDASOUT_Spatial_Metadata_1km_NWMv2.1.nc + +[Regridding] +# Choose regridding options for each input forcing file being used. Options available are: +# 1 - ESMF Bilinear +# 2 - ESMF Nearest Neighbor +# 3 - ESMF Conservative Bilinear +RegridOpt = [1] + +[Interpolation] +# Specify a temporal interpolation for the forcing variables. +# Interpolation will be done between the two neighboring +# input forcing states that exist. If only one nearest +# state exists (I.E. only a state forward in time, or behind), +# then that state will be used as a "nearest neighbor". +# NOTE - All input options here must be of the same length +# of the input forcing number. Also note all temporal interpolation +# occurs BEFORE downscaling and bias correction. +# 0 - No temporal interpolation. WARNING - States from +# the nearest later forecast data point will be used if output +# timesteps are in-between two input forecast points. +# 1 - Nearest temporal neighbor. +# 2 - Weighted linear average between input points. +forcingTemporalInterpolation = [0] + +[BiasCorrection] +# Choose bias correction options for each of the input forcing files. Length of each option +# must match the length of input forcings. + +# Specify a temperature bias correction method. +# 0 - No bias correction +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +# 2 - Custom NCAR bias-correction based on HRRRv3 analysis - based on hour of day (USE WITH CAUTION). +# 3 - NCAR parametric GFS bias correction +# 4 - NCAR parametric HRRR bias correction +TemperatureBiasCorrection = [0] + +# Specify a surface pressure bias correction method. 
+# 0 - No bias correction. +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +PressureBiasCorrection = [0] + +# Specify a specific humidity bias correction method. +# 0 - No bias correction. +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +# 2 - Custom NCAR bias-correction based on HRRRv3 analysis - based on hour of day (USE WITH CAUTION). +HumidityBiasCorrection = [0] + +# Specify a wind bias correction. +# 0 - No bias correction. +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +# 2 - Custom NCAR bias-correction based on HRRRv3 analysis - based on hour of day (USE WITH CAUTION). +# 3 - NCAR parametric GFS bias correction +# 4 - NCAR parametric HRRR bias correction +WindBiasCorrection = [0] + +# Specify a bias correction for incoming short wave radiation flux. +# 0 - No bias correction. +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +# 2 - Custom NCAR bias-correction based on HRRRv3 analysis (USE WITH CAUTION). +SwBiasCorrection = [0] + +# Specify a bias correction for incoming long wave radiation flux. +# 0 - No bias correction. +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +# 2 - Custom NCAR bias-correction based on HRRRv3 analysis, blanket adjustment (USE WITH CAUTION). +# 3 - NCAR parametric GFS bias correction +LwBiasCorrection = [0] + +# Specify a bias correction for precipitation. +# 0 - No bias correction. +# 1 - CFSv2 - NLDAS2 Parametric Distribution - NWM ONLY +PrecipBiasCorrection = [0] + +[Downscaling] +# Choose downscaling options for each of the input forcing files. Length of each option +# must match the length of input forcings. + +# Specify a temperature downscaling method: +# 0 - No downscaling. +# 1 - Use a simple lapse rate of 6.75 degrees Celsius to get from the model elevation +# to the WRF-Hydro elevation. +# 2 - Use a pre-calculated lapse rate regridded to the WRF-Hydro domain. +TemperatureDownscaling = [2] + +# Specify a surface pressure downscaling method: +# 0 - No downscaling. 
+# 1 - Use input elevation and WRF-Hydro elevation to downscale +# surface pressure. +PressureDownscaling = [1] + +# Specify a shortwave radiation downscaling routine. +# 0 - No downscaling +# 1 - Run a topographic adjustment using the WRF-Hydro elevation +ShortwaveDownscaling = [1] + +# Specify a precipitation downscaling routine. +# 0 - No downscaling +# 1 - NWM mountain mapper downscaling using monthly PRISM climo. +PrecipDownscaling = [0] + +# Specify a specific humidity downscaling routine. +# 0 - No downscaling +# 1 - Use regridded humidity, along with downscaled temperature/pressure +# to extrapolate a downscaled surface specific humidity. +HumidityDownscaling = [1] + +# Specify the input parameter directory containing necessary downscaling grids. +DownscalingParamDirs = /glade/p/cisl/nwc/nwm_forcings/NWM_v21_Params/Medium_Range + +[SuppForcing] +# Choose a set of supplemental precipitation file(s) to layer +# into the final LDASIN forcing files processed from +# the options above. The following is a mapping of +# numeric values to external input native forcing files: +# 1 - MRMS GRIB2 hourly radar-only QPE +# 2 - MRMS GRIB2 hourly gage-corrected radar QPE +# 3 - WRF-ARW 2.5 km 48-hr Hawaii nest precipitation. +# 4 - WRF-ARW 2.5 km 48-hr Puerto Rico nest precipitation. +# 5 - Hawaii MRMS GRIB2 hourly MultiSensor QPE +SuppPcp = [8] + +# Specify the file type for each supplemental precipitation file (comma separated) +# Valid types are GRIB1, GRIB2, and NETCDF +# (GRIB files will be converted internally with WGRIB[2]) +SuppPcpForcingTypes = GRIB2 + +# Specify the corresponding supplemental precipitation directories +# that will be searched for input files. +SuppPcpDirectories = /glade/scratch/mazrooei/DATA/NBM/noaa-nbm-grib2-pds_APCP + +# Specify regridding options for the supplemental precipitation products. +RegridOptSuppPcp = [1] + +# Specify whether the Supplemental Precips listed above are mandatory, or optional. 
+# This is important for layering contingencies if a product is missing, +# but forcing files are still desired. +# 0 - Not mandatory +# 1 - Mandatory +SuppPcpMandatory = [1] + +# Specify the time interpolation methods for the supplemental precipitation +# products. +SuppPcpTemporalInterpolation = [0] + +# In AnA runs, this value is the offset from the available forecast and 00z +# For example, if forecasts are available at 06z and 18z, set this value to 6 +SuppPcpInputOffsets = [0] + +# Optional RQI method for radar-based data. +# 0 - Do not use any RQI filtering. Use all radar-based estimates. +# 1 - Use hourly MRMS Radar Quality Index grids. +# 2 - Use NWM monthly climatology grids (NWM only!!!!) +RqiMethod = 0 + +# Optional RQI threshold to be used to mask out. Currently used for MRMS products. +# Please choose a value from 0.0-1.0. Associated radar quality index files will be expected +# from MRMS data. +RqiThreshold = 0.9 + +# Specify an optional directory that contains supplemental precipitation parameter fields, +# I.E. monthly RQI climatology +SuppPcpParamDir = /glade/p/cisl/nwc/nwm_forcings/NWM_v21_Params/Medium_Range + +[Ensembles] +# Choose ensemble options for each input forcing file being used. Ensemble options include: +# FILL IN ENSEMBLE OPTIONS HERE..... +# Choose the CFS ensemble member number to process +cfsEnsNumber = + +[Custom] +# These are options for specifying custom input NetCDF forcing files (in minutes). +# Choose the input frequency of files that are being processed. I.E., are the +# input files every 15 minutes, 60 minutes, 3-hours, etc. Please specify the +# length of custom input frequencies to match the number of custom NetCDF inputs +# selected above in the Logistics section. 
+custom_input_fcst_freq = [] + diff --git a/Util/pull_s3_grib_vars.py b/Util/pull_s3_grib_vars.py new file mode 100755 index 0000000..06359eb --- /dev/null +++ b/Util/pull_s3_grib_vars.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python +# Developed by Bill Petzke at RAL NCAR +# 04/05/2021 + +import argparse +import sys +import re +from io import BytesIO +from io import TextIOWrapper + +import boto3 +from botocore import UNSIGNED +from botocore.client import Config + +""" +Script pulls grib2 data from s3 by variable + +Example Usage: ./pull_s3_grib_vars.py s3://noaa-nbm-grib2-pds/blend.20201001/06/qmd/blend.t06z.qmd.f012.ak.grib2 s3://noaa-nbm-grib2-pds/blend.20201001/06/qmd/blend.t06z.qmd.f012.ak.grib2.idx ./blend.t06z.qmd.f012.ak.grib2 "^.*APCP:surface:6-12 hour acc fcst:$" +""" + + +def file_exists_s3(bucket_name,key,verbose=False): + """ + Checks if a file exists in hopefully the most performant way possible. + """ + #client = boto3.client('s3') + client = boto3.client('s3', config=Config(signature_version=UNSIGNED)) + response_json = client.list_objects_v2( + Bucket=bucket_name, + Prefix=key, + ) + if verbose: + print(response_json) + for o in response_json.get('Contents', []): + if o['Key'] == key: + return True + + return False + + +def parse_s3_url(url): + """ + Break a url into bucket and key + """ + from urllib.parse import urlparse + up = urlparse(url) + bucket = up.netloc + key = up.path.lstrip('/') + return bucket, key + + +def load_txt_stream_s3(bucket_name,key): + """ + Gets a buffered text stream of the data for bucket_name and key. + """ + #client = boto3.client('s3') + client = boto3.client('s3', config=Config(signature_version=UNSIGNED)) + obj = client.get_object(Bucket=bucket_name, Key=key) + bytestream = BytesIO(obj['Body'].read()) + txtstream = TextIOWrapper(bytestream) + return txtstream + + +def load_byte_stream_s3(bucket_name,key,byte_range=""): + """ + Gets a buffered byte stream of the data for bucket_name and key. 
+ """ + #client = boto3.client('s3') + client = boto3.client('s3', config=Config(signature_version=UNSIGNED)) + obj = client.get_object(Bucket=bucket_name, Key=key, Range=byte_range) + bytestream = BytesIO(obj['Body'].read()) + return bytestream + + +def pull_s3_grib_vars(grib_file,grib_index,out_file,var_regex_strs,debug): + var_regexs = [re.compile(var_regex) for var_regex in var_regex_strs] + grib_bucket,grib_key = parse_s3_url(grib_file) + grib_idx_bucket,grib_idx_key = parse_s3_url(grib_index) + + if not file_exists_s3(grib_bucket,grib_key): + print("Grib file not found") + return False + if not file_exists_s3(grib_idx_bucket,grib_idx_key): + print("Grib index file not found") + return False + + found_match = False + with load_txt_stream_s3(grib_idx_bucket,grib_idx_key) as idx_file: + line = idx_file.readline() + while line: + line = line.rstrip() + if debug: + print("Index line: %s:" % line) + for var_regex in var_regexs: + if var_regex.match(line): + found_match = True + print("Matched Index line: %s" % line) + line_num,beg_offset,ref_date,var_name,var_prime,hour_desc,level = line.split(":") + + pos = idx_file.tell() + next_line = idx_file.readline() + idx_file.seek(pos) + + if next_line: + next_line = next_line.rstrip() + _,end_offset,_,_,_,_,_ = next_line.split(":") + end_offset = str(int(end_offset)-1) + else: + end_offset = '' + + byte_range = "bytes=%s-%s" % (beg_offset,end_offset) + print("Downloading %s %s" % (grib_file, byte_range)) + file_part = load_byte_stream_s3(grib_bucket, grib_key, byte_range) + with open(out_file,"ab+") as of: + of.write(bytes(file_part.read())) + line = idx_file.readline() + + if not found_match: + print("Regex patterns not found in the index file") + + return found_match + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('grib_file',type=str,help='.grib2 file') + parser.add_argument('grib_index',type=str,help='.grib2.idx file') + parser.add_argument('out_file',type=str,help='.grib2 file') + 
parser.add_argument('var_regex_strs',nargs="+",type=str,help='var_regex1 [var_regex2 ...]') + parser.add_argument("--debug",action='store_true',help="Debug output flag") + + args = parser.parse_args() + success = pull_s3_grib_vars(args.grib_file,args.grib_index,args.out_file,args.var_regex_strs,args.debug) + sys.exit(not success) + + +if __name__ == '__main__': + main() diff --git a/core/config.py b/core/config.py index 79179f6..eaf5035 100755 --- a/core/config.py +++ b/core/config.py @@ -29,6 +29,7 @@ def __init__(self, config): self.supp_precip_param_dir = None self.input_force_mandatory = None self.supp_precip_mandatory = None + self.supp_pcp_max_hours = None self.number_inputs = None self.number_supp_pcp = None self.number_custom_inputs = 0 diff --git a/core/regrid.py b/core/regrid.py index 5b7e47b..544d1ff 100755 --- a/core/regrid.py +++ b/core/regrid.py @@ -2573,6 +2573,13 @@ def regrid_hourly_nbm_apcp(supplemental_precip, config_options, wrf_hydro_geo_me :param mpi_config: :return: """ + # Do we want to use NBM data at this timestep? If not, log and continue + if not config_options.use_data_at_current_time: + if mpi_config.rank == 0: + config_options.statusMsg = "Exceeded max hours for NBM precipitation, will not use NBM in final layering." + err_handler.log_msg(config_options, mpi_config) + return + # If the expected file is missing, this means we are allowing missing files, simply # exit out of this routine as the regridded fields have already been set to NDV. 
if not os.path.exists(supplemental_precip.file_in1): @@ -2584,7 +2591,6 @@ def regrid_hourly_nbm_apcp(supplemental_precip, config_options, wrf_hydro_geo_me if supplemental_precip.regridComplete: return - nbm_tmp_nc = config_options.scratch_dir + "/NBM_PCP_TMP-{}.nc".format(mkfilename()) if os.path.isfile(nbm_tmp_nc): config_options.statusMsg = "Found old temporary file: " + \ diff --git a/core/time_handling.py b/core/time_handling.py index 7db9bf0..37878c3 100755 --- a/core/time_handling.py +++ b/core/time_handling.py @@ -2272,19 +2272,12 @@ def find_hourly_nbm_apcp_neighbors(supplemental_precip, config_options, d_curren # Check to see if files are already set. If not, then reset, grids and # regridding objects to communicate things need to be re-established. if supplemental_precip.file_in1 != tmp_file1 or supplemental_precip.file_in2 != tmp_file2: - if config_options.current_output_step == 1: - supplemental_precip.regridded_precip1 = supplemental_precip.regridded_precip1 - supplemental_precip.regridded_precip2 = supplemental_precip.regridded_precip2 - else: - # The forecast window has shifted. Reset fields 2 to - # be fields 1. - supplemental_precip.regridded_precip1 = supplemental_precip.regridded_precip1 - supplemental_precip.regridded_precip2 = supplemental_precip.regridded_precip2 + supplemental_precip.regridded_precip1 = supplemental_precip.regridded_precip1 + supplemental_precip.regridded_precip2 = supplemental_precip.regridded_precip2 supplemental_precip.file_in1 = tmp_file1 supplemental_precip.file_in2 = tmp_file2 supplemental_precip.regridComplete = False - # Ensure we have the necessary new file if mpi_config.rank == 0: if not os.path.isfile(supplemental_precip.file_in2) and ((supplemental_precip.keyValue == 8) or (supplemental_precip.keyValue == 9)):