1 change: 1 addition & 0 deletions docs/source/README.rst
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -4,6 +4,7 @@
:maxdepth: 1
:caption: Contents:

README
installation
configuration
dev_guide
202 changes: 121 additions & 81 deletions shearwater/processes/wps_cyclone.py
@@ -1,5 +1,6 @@
from pywps import Process, LiteralInput, ComplexOutput
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError
import numpy as np
import numpy
import pandas as pd
@@ -8,6 +9,7 @@
from pathlib import Path
import urllib.request
import xarray as xr

try:
import metview as mv
except Exception:
@@ -17,13 +19,15 @@
import matplotlib.pyplot as plt

import logging

LOGGER = logging.getLogger("PYWPS")

FORMAT_PNG = Format("image/png", extension=".png", encoding="base64")


class Cyclone(Process):
"""A process to forecast tropical cyclone activity."""

def __init__(self):
inputs = [
LiteralInput(
@@ -32,7 +36,7 @@ def __init__(self):
data_type="string",
abstract="""Enter an initialisation date between 1940-01-01 and 2024-12-31.
Note that the years 1980-2015 have been used for training and tuning the ML models.""",
default="2024-07-03"
default="2024-07-03",
),
LiteralInput(
"leadtime",
@@ -55,7 +59,7 @@ def __init__(self):
"288-336 h",
"312-360 h",
],
default="0-48 h"
default="0-48 h",
),
LiteralInput(
"region",
@@ -72,62 +76,76 @@ def __init__(self):
"Southern Indian",
],
default="North Atlantic",
)
),
]
outputs = [
ComplexOutput('output_csv',
'Tropical cyclone activity forecast',
abstract='csv file',
as_reference=True,
keywords=['output', 'result', 'response'],
supported_formats=[FORMATS.CSV],),
ComplexOutput("output_png",
"Tropical cyclone activity forecast",
abstract="png file",
as_reference=True,
supported_formats=[FORMAT_PNG],),
ComplexOutput(
"output_csv",
"Tropical cyclone activity forecast",
abstract="csv file",
as_reference=True,
keywords=["output", "result", "response"],
supported_formats=[FORMATS.CSV],
),
ComplexOutput(
"output_png",
"Tropical cyclone activity forecast",
abstract="png file",
as_reference=True,
supported_formats=[FORMAT_PNG],
),
]

super(Cyclone, self).__init__(
self._handler,
identifier='cyclone',
title='Tropical cyclone activity',
abstract='A process to forecast tropical cyclone activity in tropical ocean basins up to 15 days ahead.',
identifier="cyclone",
title="Tropical cyclone activity",
abstract="A process to forecast tropical cyclone activity in tropical ocean basins up to 15 days ahead.",
metadata=[
Metadata('PyWPS', 'https://pywps.org/'),
Metadata("PyWPS", "https://pywps.org/"),
],
version='0.1',
version="0.1",
inputs=inputs,
outputs=outputs,
store_supported=True,
status_supported=True
status_supported=True,
)

def _handler(self, request, response):
LOGGER.info("Running tropical cyclone activity process ...")
msg = "Running tropical cyclone activity process ..."
LOGGER.info(msg)
response.update_status(msg, 0)
# TODO: lazy load tensorflow ... issues with sphinx doc build
try:
from tensorflow.keras import models
except Exception as ex:
msg = 'Models from tensorflow.keras could not be imported. Reason for the failure: {} '.format(ex)
msg = "Models from tensorflow.keras could not be imported. Reason for the failure: {} ".format(
ex
)
LOGGER.error(msg)

try:
init_date = request.inputs['init_date'][0].data
init_date = request.inputs["init_date"][0].data
LOGGER.info(init_date)
leadtime = request.inputs['leadtime'][0].data
leadtime = request.inputs["leadtime"][0].data
LOGGER.info(leadtime)
region = request.inputs['region'][0].data
region = request.inputs["region"][0].data
LOGGER.info(region)
except Exception as ex:
msg = 'Input variables could not be set. Reason for the failure: {} '.format(ex)
msg = (
"Input variables could not be set. Reason for the failure: {} ".format(
ex
)
)
LOGGER.error(msg)

# Check validity of input date
VALIDstr = pd.Timestamp('1940-01-01')
VALIDend = pd.Timestamp('2024-12-31')
if ((pd.Timestamp(init_date) >= VALIDstr) & (pd.Timestamp(init_date) <= VALIDend)):
msg = 'Input date is valid.'
VALIDstr = pd.Timestamp("1940-01-01")
VALIDend = pd.Timestamp("2024-12-31")
if (pd.Timestamp(init_date) >= VALIDstr) & (
pd.Timestamp(init_date) <= VALIDend
):
msg = "Input date is valid."
LOGGER.info(msg)

parameters = [
@@ -142,41 +160,41 @@ def _handler(self, request, response):
reso = 2.5

regions_dict = {
"Southern Indian": [0, 20, -30, 90], # Southern Indian
"North Atlantic": [40, -90, 10, -20], # North Atlantic
"Northwest Pacific": [35, 100, 5, 170], # Northwest Pacific
"Australia": [0, 90, -30, 160], # Australia
"Northern Indian": [30, 30, 0, 100], # Northern Indian
"East Pacific": [30, -170, 0, -100], # East Pacific
"South Pacific": [0, 160, -30, 230], # South Pacific
"Southern Indian": [0, 20, -30, 90], # Southern Indian
"North Atlantic": [40, -90, 10, -20], # North Atlantic
"Northwest Pacific": [35, 100, 5, 170], # Northwest Pacific
"Australia": [0, 90, -30, 160], # Australia
"Northern Indian": [30, 30, 0, 100], # Northern Indian
"East Pacific": [30, -170, 0, -100], # East Pacific
"South Pacific": [0, 160, -30, 230], # South Pacific
}

lags_dict = {
"0-48 h": 0,
"24-72 h": 1,
"48-96 h": 2,
"72-120 h": 3,
"96-144 h": 4,
"120-168 h": 5,
"144-192 h": 6,
"168-216 h": 7,
"192-240 h": 8,
"216-264 h": 9,
"240-288 h": 10,
"264-312 h": 11,
"288-336 h": 12,
"312-360 h": 13,
}
"0-48 h": 0,
"24-72 h": 1,
"48-96 h": 2,
"72-120 h": 3,
"96-144 h": 4,
"120-168 h": 5,
"144-192 h": 6,
"168-216 h": 7,
"192-240 h": 8,
"216-264 h": 9,
"240-288 h": 10,
"264-312 h": 11,
"288-336 h": 12,
"312-360 h": 13,
}

region_string = {
"Southern Indian": "Sindian", # Southern Indian
"North Atlantic": "Natlantic", # North Atlantic
"Northwest Pacific": "NWpacific", # Northwest Pacific
"Australia": "Australia", # Australia
"Northern Indian": "Nindian", # Northern Indian
"East Pacific": "Epacific", # East Pacific
"South Pacific": "Spacific", # South Pacific
}
"Southern Indian": "Sindian", # Southern Indian
"North Atlantic": "Natlantic", # North Atlantic
"Northwest Pacific": "NWpacific", # Northwest Pacific
"Australia": "Australia", # Australia
"Northern Indian": "Nindian", # Northern Indian
"East Pacific": "Epacific", # East Pacific
"South Pacific": "Spacific", # South Pacific
}

region_bbox = regions_dict[region]
lag = lags_dict[leadtime]
@@ -186,7 +204,7 @@ def _handler(self, request, response):
if mv:
for param1 in parameters:
path = f'/pool/data/ERA5/E5/{"sf" if param1[2]==[0] else "pl"}/an/1D/{str(param1[0]).zfill(3)}'
filename_part2 = f'{init_date[:7]}_{str(param1[0]).zfill(3)}'
filename_part2 = f"{init_date[:7]}_{str(param1[0]).zfill(3)}"
fs1_param = mv.read(
f"{path}/E5{'sf' if param1[2]==[0] else 'pl'}00_1D_{filename_part2}.grb"
)
@@ -225,9 +243,11 @@ def _handler(self, request, response):
else:
reg = region_string[region]
data1 = pd.read_csv(
f"https://github.com/climateintelligence/shearwater/raw/main/data/test_dailymeans_{reg}_1.zip")
f"https://github.com/climateintelligence/shearwater/raw/main/data/test_dailymeans_{reg}_1.zip"
)
data2 = pd.read_csv(
f"https://github.com/climateintelligence/shearwater/raw/main/data/test_dailymeans_{reg}_2.zip")
f"https://github.com/climateintelligence/shearwater/raw/main/data/test_dailymeans_{reg}_2.zip"
)
data = pd.concat((data1, data2), ignore_index=True)
data = data.loc[(data.time == init_date)]

@@ -244,52 +264,61 @@ def _handler(self, request, response):
]

means, stds = pd.read_pickle(
"https://github.com/climateintelligence/shearwater/raw/main/data/full_statistics.zip")
"https://github.com/climateintelligence/shearwater/raw/main/data/full_statistics.zip"
)

data[variables] = (data[variables]-means[variables])/stds[variables]
data[variables] = (data[variables] - means[variables]) / stds[variables]

sz_lat = len(data.latitude.dropna().unique())
sz_lon = len(data.longitude.dropna().unique())
number_of_img, rows, cols = len(data.time.unique()), sz_lat, sz_lon
images = np.zeros((number_of_img, rows, cols, len(variables)))
df = data.sort_values(by=['time', 'latitude', 'longitude'])
df = data.sort_values(by=["time", "latitude", "longitude"])
verbose = False
k = 0
for day in range(0, number_of_img):

a = df.iloc[377*day:377*(day+1)]
a = df.iloc[377 * day : 377 * (day + 1)] # noqa
i = 0
for var in variables:
anew = a.pivot(index='latitude', columns='longitude').sort_index(ascending=False)[var]
anew = a.pivot(index="latitude", columns="longitude").sort_index(
ascending=False
)[var]
images[day, :, :, i] = anew
i += 1
k += 1
if ((k % 100 == 0) & (verbose is True)):
if (k % 100 == 0) & (verbose is True):
print(k)

test_img_std = images
# print("images", test_img_std)

test_img_std = np.pad(test_img_std, ((0, 0), (1, 2), (1, 2), (0, 0)), 'constant')
test_img_std = np.pad(
test_img_std, ((0, 0), (1, 2), (1, 2), (0, 0)), "constant"
)

workdir = Path(self.workdir)
LOGGER.info(workdir)
model_path = os.path.join(workdir, f"UNET020_sevenAreas_fullStd_{lag}lag_model.keras")
model_path = os.path.join(
workdir, f"UNET020_sevenAreas_fullStd_{lag}lag_model.keras"
)
git_path = "https://github.com/climateintelligence/shearwater/raw/main"
urllib.request.urlretrieve(
f"{git_path}/data/UNET020_sevenAreas_fullStd_{lag}lag_model.keras",
model_path
model_path,
)

model_trained = models.load_model(model_path)

prediction = model_trained.predict(test_img_std)

data = data[["latitude", "longitude", "time"]]
data[f'predictions_lag{lag}'] = prediction.reshape(-1, 1)
data[f"predictions_lag{lag}"] = prediction.reshape(-1, 1)

workdir = Path(self.workdir)
outfilename = os.path.join(
workdir, f'tcactivity_48_17_{init_date.replace("-","")}_lag{lag}_{region_string[region]}'
workdir,
f'tcactivity_48_17_{init_date.replace("-","")}_lag{lag}_{region_string[region]}',
)

if mv:
@@ -328,19 +357,26 @@ def _handler(self, request, response):
legend="on",
)
coastlines = mv.mcoast(
map_coastline_land_shade="on", map_coastline_land_shade_colour="grey"
map_coastline_land_shade="on",
map_coastline_land_shade_colour="grey",
)

gview = mv.geoview(
map_area_definition="corners", area=region_bbox, coastlines=coastlines
map_area_definition="corners",
area=region_bbox,
coastlines=coastlines,
)
legend = mv.mlegend(
legend_text_font_size=0.5,
)

VTstr = (pd.Timestamp(init_date) + pd.Timedelta(str(lag)+'d')).strftime('%Y-%m-%d') + ' 00Z'
laggg = pd.Timedelta(str(2)+'d')
VTend = (pd.Timestamp(init_date) + pd.Timedelta(str(lag)+'d') + laggg).strftime('%Y-%m-%d') + ' 00Z'
VTstr = (
pd.Timestamp(init_date) + pd.Timedelta(str(lag) + "d")
).strftime("%Y-%m-%d") + " 00Z"
laggg = pd.Timedelta(str(2) + "d")
VTend = (
pd.Timestamp(init_date) + pd.Timedelta(str(lag) + "d") + laggg
).strftime("%Y-%m-%d") + " 00Z"
subtitle1 = f"<font size='0.4'>{region}, Initialisation: {init_date} 00Z, Lead time: {leadtime}"
subtitle2 = f", Valid time: {VTstr+' to '+VTend}</font>"
title = mv.mtext(
@@ -360,8 +396,10 @@ def _handler(self, request, response):
data.to_csv(outfilename + ".csv")
response.outputs["output_csv"].file = outfilename + ".csv"
else:
xr_predictions = xr.Dataset.from_dataframe(data.set_index(['time', 'latitude', 'longitude']))
xr_predictions = xr_predictions[f'predictions_lag{lag}']
xr_predictions = xr.Dataset.from_dataframe(
data.set_index(["time", "latitude", "longitude"])
)
xr_predictions = xr_predictions[f"predictions_lag{lag}"]

figs, axs = plt.subplots()
xr_predictions.plot(ax=axs)
@@ -375,5 +413,7 @@ def _handler(self, request, response):
else:
msg = f"Input date '{init_date}' outside the allowed period."
LOGGER.error(msg)
response.update_status(msg, 3)
raise ProcessError(msg)

return response