diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/.validate b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/.validate index 38d306164d..d7331a932a 100755 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/.validate +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/.validate @@ -16,11 +16,9 @@ # along with this program. If not, see . set -eux -APIKEY="$(head --lines 1 ../api-keys)" FLOW_NAME="$(< /dev/urandom tr -dc A-Za-z | head -c6)" cylc lint . cylc install --workflow-name "$FLOW_NAME" --no-run-name -sed -i "s/DATAPOINT_API_KEY/$APIKEY/" "$HOME/cylc-run/$FLOW_NAME/flow.cylc" cylc validate --check-circular --icp=2000 "$FLOW_NAME" cylc play --no-detach --abort-if-any-task-fails "$FLOW_NAME" cylc clean "$FLOW_NAME" diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/consolidate-observations b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/consolidate-observations index 0a25a2e53e..e6babab133 100755 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/consolidate-observations +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/consolidate-observations @@ -61,7 +61,7 @@ def plot_wind_data(wind_x, wind_y, x_range, y_range, x_coords, y_coords, [x[0] for x in z_coords], [y[1] for y in z_coords], color='red') - plt.savefig('wind.png') + plt.savefig(f'{os.environ["CYLC_TASK_LOG_DIR"]}/wind.png') def get_wind_fields(): @@ -115,4 +115,5 @@ def main(): if __name__ == '__main__': + util.sleep() main() diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/forecast b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/forecast index a3f73c1edd..c5e46cbf8b 100755 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/forecast +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/forecast @@ -150,11 +150,12 @@ def push_rainfall(rainfall, wind_data, step, resolution, spline_level): dim_x, dim_y, resolution, resolution, spline_level) + domain = util.parse_domain(os.environ['DOMAIN']) + while True: out_of_bounds = [] for itt in range(len(x_values)): try: - domain = util.parse_domain(os.environ['DOMAIN']) lng = domain['lng1'] + x_values[itt] lat = domain['lat1'] + y_values[itt] @@ -242,6 +243,7 @@ def main(forecast_interval, forecast_iterations): if __name__ == '__main__': + util.sleep() try: args = [int(sys.argv[1]), int(sys.argv[2])] except IndexError: diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-observations b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-observations index 394add6012..329c25dc39 100755 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-observations +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-observations @@ -24,11 +24,8 @@ Usage: get-observations Environment Variables: - SITE_ID: The four digit DataPoint site code identifying the weather station - we are to fetch results for. - API_KEY: The DataPoint API key, required for getting live weather data. - If un-specified then get-observations will fall back to archive data - from the workflow directory. + SITE_ID: The four digit WMO (World Meteorological Organization) + site code identifying the weather station we are to fetch results for. """ @@ -40,10 +37,8 @@ import re import requests +import util -BASE_URL = ('http://datapoint.metoffice.gov.uk/public/data/' - 'val/wxobs/all/json/{site_id}' - '?res=hourly&time={time}&key={api_key}') # Compass bearings for ordinate directions. # NOTE: We measure direction by where the winds have come from! @@ -66,22 +61,52 @@ ORDINATES = { 'NNW': '157.5' } +class Meteorology: + """ + Surface winds tend to be 20-30 degrees backed (vector anticlockwise) + from the winds which steer weather systems: + Friction with the ground surface tends to mean that land surface winds + are as slow as half the speed of the wind at 2000ft. This fudge factor is + a more conservative 1.5: -class NoDataException(Exception): - ... + .. seealso:: + [Source Book to the Forecaster's Reference Book](https://digital.nmla + .metoffice.gov.uk/IO_011f7cd4-50fc-4903-b556-d24480ea883d/), section + 1.2 + """ + SURFACE_BACKING = 2 + SURFACE_FRICTION = .66 + KT_TO_MPH = 1.15078 + + @staticmethod + def process_direction(direction: str) -> str: + """Process raw wind direction: + + * Convert direction from 10s of degrees to degrees. + * Convert from Oceanographic (wind to) to Meteorological (wind from) + convention. + * Surface Friction Correction + """ + return str( + ((int(direction) + 18 - Meteorology.SURFACE_BACKING) % 36) * 10) + + @staticmethod + def process_speed(speed: str) -> str: + """Process Raw wind speed + + * Convert to KT to MPH + * Surface Friction Correction + """ + return str( + ( + int(speed) * Meteorology.KT_TO_MPH + ) / Meteorology.SURFACE_FRICTION + ) -def get_datapoint_data(site_id, time, api_key): - """Get weather data from the DataPoint service.""" - # The URL required to get the data. - time = datetime.strptime(time, '%Y%m%dT%H%MZ').strftime('%Y-%m-%dT%H:%MZ') - url = BASE_URL.format(time=time, site_id=site_id, api_key=api_key) - req = requests.get(url) - if req.status_code != 200: - raise Exception(f'{url} returned exit code {req.status_code}') - # Get the data and parse it as JSON. - print('Opening URL: %s' % url) - return req.json()['SiteRep']['DV']['Location'] + +class NoDataException(Exception): + ... def get_archived_data(site_id, time): @@ -150,11 +175,12 @@ def synop_grab(site_id, time): raise NoDataException( f'Request for data failed, raw request was {req.text}') - # Parse the direction from 10's of degrees to degrees: - data['direction'] = str(int(data['direction']) * 10) + # * Parse the direction from 10's of degrees to degrees + # * Convert direction from to direction it's blowing to + data['direction'] = Meteorology.process_direction(data['direction']) # Convert data in KT to MPH: - data['speed'] = str(int(data['speed']) * 1.15078) + data['speed'] = Meteorology.process_speed(data['speed']) # Get lat and long from MO Data: lat, lon = reference_lat_long(site_id) @@ -183,7 +209,7 @@ def get_nearby_site(site_id, badsites): return int(result[0]), dist -def main(site_id, api_key=None): +def main(site_id): cycle_point = os.environ['CYLC_TASK_CYCLE_POINT'] # Try to get the information from SYNOPS. @@ -200,13 +226,8 @@ def main(site_id, api_key=None): site_id, dist = get_nearby_site(site_id, badsites) if obs is None: - if api_key: - print('Attempting to get weather data from DataPoint...') - data = get_datapoint_data(site_id, cycle_point, api_key) - else: - print('No API key provided, falling back to archived data') - data = get_archived_data(site_id, cycle_point) - + print('Obs unavailable, falling back to archived data') + data = get_archived_data(site_id, cycle_point) obs = extract_observations(data) # Write observations. @@ -215,5 +236,5 @@ def main(site_id, api_key=None): if __name__ == '__main__': - main(os.environ['SITE_ID'], - os.environ.get('API_KEY')) + util.sleep() + main(os.environ['SITE_ID']) diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-rainfall b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-rainfall index 481b203765..811fa46231 100755 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-rainfall +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/get-rainfall @@ -24,9 +24,6 @@ Usage: get-rainfall Environment Variables: - API_KEY: The DataPoint API key, required for getting live weather data. - If un-specified then get-observations will fall back to archive data - from the workflow directory. DOMAIN: The area in which to generate forecasts for in the format (lng1, lat1, lng2, lat2). RESOLUTION: The length/width of each grid cell in degrees. @@ -38,20 +35,20 @@ import math import os import shutil -import requests +import h5py +from pathlib import Path +import urllib +import requests # noqa: F401 - required implicitly by urllib. -try: - from PIL import Image -except ModuleNotFoundError: - # not all PIL installations are created equal - # sometimes we must import like this - import Image -from mercator import get_offset, get_scale, pos_to_coord +from mercator import get_scale, pos_to_coord import util - -URL = ('http://datapoint.metoffice.gov.uk/public/data/layer/wxobs/' - 'RADAR_UK_Composite_Highres/png?TIME={time}&key={api_key}') +S3URL = ( + 'https://met-office-radar-obs-data.s3-eu-west-2.amazonaws.com/radar/' + '{Y}/{m}/{d}/{YYYYmmddHHMM}_ODIM_ng_radar_rainrate_composite_1km_UK.h5' +) +DEBUG=os.environ['CYLC_DEBUG']=='true' +CYLC_TASK_LOG_DIR=os.environ['CYLC_TASK_LOG_DIR'] class Rainfall(object): @@ -62,16 +59,6 @@ class Rainfall(object): resolution (float): The length of each grid cell in degrees. """ - VALUE_MAP = { - (0, 0, 254, 255): 1, - (50, 101, 254, 255): 2, - (127, 127, 0, 255): 3, - (254, 203, 0, 255): 4, - (254, 152, 0, 255): 5, - (254, 0, 0, 255): 6, - (254, 0, 254, 255): 7 - } - def __init__(self, domain, resolution): self.resolution = resolution self.domain = domain @@ -98,10 +85,23 @@ class Rainfall(object): """ itt_x, itt_y = util.get_grid_coordinates(lng, lat, self.domain, self.resolution) - try: - self.data[itt_y][itt_x].append(self.VALUE_MAP[value]) - except KeyError: - pass + + self.data[itt_y][itt_x].append(self.value_map(value)) + + @staticmethod + def value_map(v): + """Convert rainfall rate values into colour space values. + + Checks if rainfall value above each threshold in turn. + + TODO: + - Unit test this + """ + thresholds = {32, 16, 8, 4, 2, 1, .5, .2} + for i, threshold in enumerate(sorted(thresholds, reverse=True)): + if v > threshold: + return 8 - i + return 0 def compute_bins(self): """Return this dataset as a 2D matrix.""" @@ -114,25 +114,6 @@ class Rainfall(object): return self.data -def get_datapoint_radar_image(filename, time, api_key): - """Retrieve a png image of rainfall from the DataPoint service. - - Args: - filename (str): The path to write the image file to. - time (str): The datetime of the image to retrieve in ISO8601 format. - api_key (str): Datapoint API key. - - """ - time = datetime.strptime(time, '%Y%m%dT%H%MZ').strftime( - '%Y-%m-%dT%H:%M:%SZ') - url = URL.format(time=time, api_key=api_key) - req = requests.get(url) - if req.status_code != 200: - raise Exception(f'{url} returned exit code {req.status_code}') - with open(filename, 'bw') as png_file: - png_file.write(req.content) - - def get_archived_radar_image(filename, time): """Retrieve a png image from the archived data in the workflow directory. @@ -147,8 +128,23 @@ def get_archived_radar_image(filename, time): filename) +def get_amazon_radar_data(filename, time): + time = datetime.strptime(time, '%Y%m%dT%H%MZ') + url = S3URL.format( + Y=time.strftime('%Y'), + m=time.strftime('%m'), + d=time.strftime('%d'), + YYYYmmddHHMM=time.strftime('%Y%m%d%H%M') + ) + print(f'[INFO] Getting data from {url=}') + data = urllib.request.urlopen(url).read() + with open(filename, 'wb') as fh: + fh.write(data) + + +# def process_rainfall_data(filename, resolution, domain): def process_rainfall_data(filename, resolution, domain): - """Generate a 2D matrix of data from the rainfall data in the image. + """get_amazon_radar_dataGenerate a 2D matrix of data from the rainfall data in the image. Args: filename (str): Path to the png image to process. @@ -160,39 +156,65 @@ def process_rainfall_data(filename, resolution, domain): list - A 2D matrix of rainfall data. """ + print(f'[INFO] Analysing data from {filename}') + data = h5py.File(filename)['dataset1']['data1']['data'] rainfall = Rainfall(domain, resolution) - image = Image.open(filename) - scale = get_scale(domain, image.width) - offset = get_offset(domain, scale) + _height, width = data.shape + + scale = get_scale(domain, width) + offset = (-1100.8461538461539, 1400.6953225710452) + + if DEBUG: + print(f'[INFO] {scale=}, {offset=}') + from matplotlib import pyplot as plt + Path(CYLC_TASK_LOG_DIR).mkdir(parents=True, exist_ok=True) + plt.imshow(data) + plt.savefig(f'{CYLC_TASK_LOG_DIR}/raw.data.png') - for itt_x in range(image.width): - for itt_y in range(image.height): + for itt_y, row in enumerate(data): + for itt_x, col in enumerate(row): lng, lat = pos_to_coord( itt_x, itt_y * (2. / 3.), # Counter aspect ratio. - offset, scale) - rainfall.add(lng, lat, image.getpixel((itt_x, itt_y))) + offset, + scale + ) + val = float(col) + # Original data uses -1 to indicate radar mask + val = 0 if val == -1 else val + rainfall.add(lng, lat, val) + data = rainfall.compute_bins() - return rainfall.compute_bins() + if DEBUG: + plt.imshow(data) + plt.savefig(f'{CYLC_TASK_LOG_DIR}/processed.data.png') + return data def main(): time = os.environ['CYLC_TASK_CYCLE_POINT'] resolution = float(os.environ['RESOLUTION']) domain = util.parse_domain(os.environ['DOMAIN']) - api_key = os.environ.get('API_KEY') - if api_key: - print('Attempting to get weather data from the DataPoint service.') - get_datapoint_radar_image('rainfall-radar.png', time, api_key) + # Acts as a switch - if a file-name is given, use that file-name + canned_data = os.environ.get('CANNED_DATA', 'fetch') + + if canned_data == 'fetch': + print('[INFO] Attempting to get rainfall data from S3 bucket') + get_amazon_radar_data('radardata.h5', time) + canned_data = 'radardata.h5' else: - print('No API key provided, falling back to archived data') - get_archived_radar_image('rainfall-radar.png', time) + raise Exception('Not currently provided functionality') + # Not currently used. + #print(f'[INFO] Canned data provided: {canned_data}') + #get_archived_radar_data(canned_data, time) + + data = process_rainfall_data(canned_data, resolution, domain) - data = process_rainfall_data('rainfall-radar.png', resolution, domain) util.write_csv('rainfall.csv', data) if __name__ == '__main__': + util.sleep(2) # make the tutorial run a little slower main() diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/post-process b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/post-process index 3316152e8e..c834d644cf 100755 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/post-process +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/bin/post-process @@ -101,6 +101,7 @@ def main(site_name, time): if __name__ == '__main__': + util.sleep() try: args = (sys.argv[1].lower(), int(sys.argv[2])) except IndexError: diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/flow.cylc b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/flow.cylc index 20c95cb34b..e9743ec3c3 100644 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/flow.cylc +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/flow.cylc @@ -1,6 +1,4 @@ #!jinja2 -[scheduler] - UTC mode = True [task parameters] # A list of the weather stations we will be fetching observations from. station = camborne, heathrow, shetland, aldergrove @@ -41,15 +39,13 @@ [[[environment]]] # The dimensions of each grid cell in degrees. RESOLUTION = 0.2 + # The area to generate forecasts for (lng1, lat1, lng2, lat2) - DOMAIN = -12,48,5,61 # Do not change! + DOMAIN = -12,46,12,61 + [[get_observations]] script = get-observations - [[[environment]]] - # The key required to get weather data from the DataPoint service. - # To use archived data comment this line out. - API_KEY = DATAPOINT_API_KEY [[get_observations]] [[[environment]]] @@ -69,10 +65,6 @@ [[get_rainfall]] script = get-rainfall - [[[environment]]] - # The key required to get weather data from the DataPoint service. - # To use archived data comment this line out. - API_KEY = DATAPOINT_API_KEY [[forecast]] script = forecast 60 5 # Generate 5 forecasts at 60 minute intervals. diff --git a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py index 2addf5c5f2..6450bbc161 100644 --- a/metomi/rose/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py +++ b/metomi/rose/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py @@ -21,9 +21,11 @@ from copy import copy from contextlib import suppress import math -import jinja2 +import os import sys +import time +import jinja2 R_0 = 6371. # Radius of the Earth (km). @@ -280,13 +282,18 @@ def __call__(self, grid_x, grid_y): return z_val -def parse_domain(domain): - bbox = list(map(float, domain.split(','))) +def parse_domain(domain: str): + lng1, lat1, lng2, lat2 = list(map(float, domain.split(','))) + msg = "Invalid domain '{}' ({} {} >= {})" + if lng1 >= lng2: + raise ValueError(msg.format(domain, 'longitude', lng1, lng2)) + if lat1 >= lat2: + raise ValueError(msg.format(domain, 'latitude', lat1, lat2)) return { - 'lng1': bbox[0], - 'lat1': bbox[1], - 'lng2': bbox[2], - 'lat2': bbox[3] + 'lng1': lng1, + 'lat1': lat1, + 'lng2': lng2, + 'lat2': lat2, } @@ -301,3 +308,12 @@ def generate_html_map(filename, template_file, data, domain, resolution): lat2=domain['lat2'], data=data )) + + +def sleep(secs=4): + """Make the tutorials run a little slower so users can follow along. + + (Only if not running in CI). + """ + if 'CI' not in os.environ: + time.sleep(secs)