diff --git a/example/app.py b/example/app.py
index 8aff167..10f26a5 100755
--- a/example/app.py
+++ b/example/app.py
@@ -14,13 +14,13 @@
import numpy as np
# create some podpac nodes
-data = np.random.rand(11, 21)
+data = np.random.default_rng(1).random((11, 21))
lat = np.linspace(90, -90, 11)
lon = np.linspace(-180, 180, 21)
coords = podpac.Coordinates([lat, lon], dims=["lat", "lon"])
node1 = podpac.data.Array(source=data, coordinates=coords)
-data2 = np.random.rand(11, 21)
+data2 = np.random.default_rng(1).random((11, 21))
node2 = podpac.data.Array(source=data2, coordinates=coords)
# use podpac nodes to create some OGC layers
@@ -75,10 +75,12 @@ def api_home(endpoint):
app = servers.FlaskServer(__name__, ogcs=[NonFouoOGC, FouoOGC], home_func=api_home)
+
# add in some other endpoints.
@app.route("/")
def home():
- return f'This is an example OGC flask app. See FULL and PARTIAL endpoints.'
+ return """This is an example OGC flask app.
+ See FULL and PARTIAL endpoints."""
@app.route("/layers/")
diff --git a/ogc/core.py b/ogc/core.py
index e4cf1dc..b9f04b8 100755
--- a/ogc/core.py
+++ b/ogc/core.py
@@ -3,6 +3,7 @@
Currently holds some definitions for interface classes.
"""
+
import gc
import logging
import traitlets as tl
@@ -21,6 +22,7 @@
LOAD_FAILURE = "Failed to load and validate: "
INVALID_ARGUMENTS = "Invalid arguments"
+
class OGC(tl.HasTraits):
wms_capabilities = tl.Instance(klass=wms_response_1_3_0.Capabilities)
@@ -28,9 +30,7 @@ class OGC(tl.HasTraits):
endpoint = tl.Unicode(default_value="/ogc", allow_none=True)
service_title = tl.Unicode(default_value="OGC Server", allow_none=True)
- service_abstract = tl.Unicode(
- default_value="An example OGC Server", allow_none=True
- )
+ service_abstract = tl.Unicode(default_value="An example OGC Server", allow_none=True)
server_address = tl.Unicode(default_value="http://127.0.0.1:5000", allow_none=True)
service_group_title = tl.Unicode(default_value="Data Products", allow_none=True)
@@ -73,22 +73,83 @@ def get_coverage_from_id(self, identifier):
exception_text="Invalid coverage {}".format(identifier),
)
- def handle_wcs_kv(self, args):
- if args["request"] == "GetCapabilities":
- get_capabilities = wcs_request_1_0_0.GetCapabilities()
- try:
- get_capabilities.load_from_kv(args)
- get_capabilities.validate()
- except Exception:
- logger.error(LOAD_FAILURE, exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
+ def get_capabilities_wcs(self, args):
+ get_capabilities = wcs_request_1_0_0.GetCapabilities()
+ try:
+ get_capabilities.load_from_kv(args)
+ get_capabilities.validate()
+ except Exception:
+ logger.error(LOAD_FAILURE, exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
+
+ capabilities = self.wcs_capabilities
+
+ if args["base_url"]:
+ capabilities.base_url = args["base_url"]
+
+ return capabilities.to_xml()
+
+ def describe_coverage_wcs(self, args, wcs_request, wcs_response):
+ describe_coverage = wcs_request.DescribeCoverage()
+ try:
+ describe_coverage.load_from_kv(args)
+ describe_coverage.validate()
+ except Exception:
+ logger.error(LOAD_FAILURE, exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
+
+ coverages = [self.get_coverage_from_id(identifier.value) for identifier in describe_coverage.identifiers]
+ coverage_description = wcs_response.CoverageDescription(coverages=coverages)
+
+ return coverage_description.to_xml()
+
+ def get_coverage_wcs(self, args, wcs_request):
+ get_coverage = wcs_request.GetCoverage()
+ try:
+ get_coverage.load_from_kv(args)
+ get_coverage.validate()
+ except Exception:
+ logger.error(LOAD_FAILURE, exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
+
+ coverage = self.get_coverage_from_id(get_coverage.identifier.value)
+
+ from dateutil.parser import parse
+
+ if get_coverage.width == 0:
+ raise WCSException(
+ exception_code="InvalidParameterValue",
+ locator="VERSION",
+ exception_text="Grid coordinates x_size must be greater than 0",
+ )
+ if get_coverage.height == 0:
+ raise WCSException(
+ exception_code="InvalidParameterValue",
+ locator="VERSION",
+ exception_text="Grid coordinates y_size must be greater than 0",
+ )
+ if get_coverage.height * get_coverage.width > settings.MAX_GRID_COORDS_REQUEST_SIZE:
+ raise WCSException(
+ exception_code="InvalidParameterValue",
+ locator="VERSION",
+ exception_text="Grid coordinates x_size * y_size must be less than %d"
+ % settings.MAX_GRID_COORDS_REQUEST_SIZE,
+ )
+
+ fp = coverage.layer.get_coverage(args)
- capabilities = self.wcs_capabilities
+ fn = coverage.identifier.split(".")[-1] + ".tif"
- if args["base_url"]:
- capabilities.base_url = args["base_url"]
+ # Collect garbage
+ gc.collect()
- return capabilities.to_xml()
+ response = {"fp": fp, "fn": fn}
+
+ return response
+
+ def handle_wcs_kv(self, args):
+ if args["request"] == "GetCapabilities":
+ return self.get_capabilities_wcs(args)
if "version" in args and args["version"] == "1.0.0":
wcs_response = wcs_response_1_0_0
@@ -101,87 +162,96 @@ def handle_wcs_kv(self, args):
)
if args["request"] == "DescribeCoverage":
+ return self.describe_coverage_wcs(args, wcs_request, wcs_response)
- describe_coverage = wcs_request.DescribeCoverage()
- try:
- describe_coverage.load_from_kv(args)
- describe_coverage.validate()
- except Exception:
- logger.error(LOAD_FAILURE, exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
+ elif args["request"] == "GetCoverage":
+ return self.get_coverage_wcs(args, wcs_request)
- coverages = [
- self.get_coverage_from_id(identifier.value)
- for identifier in describe_coverage.identifiers
- ]
- coverage_description = wcs_response.CoverageDescription(coverages=coverages)
+ raise WCSException(exception_text="KV Request not handled properly: " + str(args))
- return coverage_description.to_xml()
+ def get_capabilities_wms(self, args):
+ get_capabilities = wms_request_1_3_0.GetCapabilities()
+ try:
+ get_capabilities.load_from_kv(args)
+ get_capabilities.validate()
+ except Exception:
+ logger.error(LOAD_FAILURE, exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
- elif args["request"] == "GetCoverage":
- get_coverage = wcs_request.GetCoverage()
- try:
- get_coverage.load_from_kv(args)
- get_coverage.validate()
- except Exception:
- logger.error(LOAD_FAILURE, exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
+ wms_capabilities = self.wms_capabilities
- coverage = self.get_coverage_from_id(get_coverage.identifier.value)
+ if args["base_url"]:
+ wms_capabilities.base_url = args["base_url"]
+ return wms_capabilities.to_xml()
- from dateutil.parser import parse
+ def get_legend_graphic(self, args, wms_request):
+ get_legend_graphic = wms_request.GetLegendGraphic()
+ try:
+ get_legend_graphic.load_from_kv(args)
+ get_legend_graphic.validate()
+ except Exception:
+ logger.error(LOAD_FAILURE, exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
+ coverage = self.get_coverage_from_id(get_legend_graphic.layer.value)
- if get_coverage.width == 0:
- raise WCSException(
- exception_code="InvalidParameterValue",
- locator="VERSION",
- exception_text="Grid coordinates x_size must be greater than 0",
- )
- if get_coverage.height == 0:
- raise WCSException(
- exception_code="InvalidParameterValue",
- locator="VERSION",
- exception_text="Grid coordinates y_size must be greater than 0",
- )
- if get_coverage.height * get_coverage.width > settings.MAX_GRID_COORDS_REQUEST_SIZE:
- raise WCSException(
- exception_code="InvalidParameterValue",
- locator="VERSION",
- exception_text="Grid coordinates x_size * y_size must be less than %d" % settings.MAX_GRID_COORDS_REQUEST_SIZE,
- )
+ fp = coverage.layer.get_legend_graphic(args)
+ fn = coverage.identifier.split(".")[-1] + ".png"
- fp = coverage.layer.get_coverage(args)
+ response = {"fp": fp, "fn": fn}
+ return response
- fn = coverage.identifier.split(".")[-1] + ".tif"
+ def get_map(self, args, wms_request):
+ get_map = wms_request.GetMap()
+ try:
+ get_map.load_from_kv(args)
+ get_map.validate()
+ except Exception:
+ logger.error(LOAD_FAILURE, exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
- # Collect garbage
- gc.collect()
+ coverage = self.get_coverage_from_id(get_map.layer.value)
- response = {"fp": fp, "fn": fn}
+ # Make sure the request size is correct
+ if get_map.width == 0:
+ raise WCSException(
+ exception_code="InvalidParameterValue",
+ locator="VERSION",
+ exception_text="Grid coordinates x_size must be greater than 0",
+ )
+ if get_map.height == 0:
+ raise WCSException(
+ exception_code="InvalidParameterValue",
+ locator="VERSION",
+ exception_text="Grid coordinates y_size must be greater than 0",
+ )
+ if get_map.height * get_map.width > settings.MAX_GRID_COORDS_REQUEST_SIZE:
+ raise WCSException(
+ exception_code="InvalidParameterValue",
+ locator="VERSION",
+ exception_text="Grid coordinates x_size * y_size must be less than %d"
+ % settings.MAX_GRID_COORDS_REQUEST_SIZE,
+ )
- return response
+ try:
+ fp = coverage.layer.get_map(args)
+ except Exception:
+ logger.error("Failed to get_map from layer: ", exc_info=True)
+ raise WCSException(exception_text=INVALID_ARGUMENTS)
- raise WCSException(
- exception_text="KV Request not handled properly: " + str(args)
- )
+ fn = coverage.identifier.split(".")[-1] + ".png"
- def handle_wms_kv(self, args):
- if args["request"] == "GetCapabilities":
- get_capabilities = wms_request_1_3_0.GetCapabilities()
- try:
- get_capabilities.load_from_kv(args)
- get_capabilities.validate()
- except Exception:
- logger.error(LOAD_FAILURE, exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
+ # Collect garbage
+ gc.collect()
- wms_capabilities = self.wms_capabilities
+ response = {"fp": fp, "fn": fn}
- if args["base_url"]:
- wms_capabilities.base_url = args["base_url"]
- return wms_capabilities.to_xml()
+ return response
+
+ def handle_wms_kv(self, args):
+ if args["request"] == "GetCapabilities":
+ return self.get_capabilities_wms(args)
if args["request"] == "GetFeatureInfo":
raise WCSException(
@@ -200,70 +270,9 @@ def handle_wms_kv(self, args):
)
if args["request"].lower() == "getlegendgraphic":
- get_legend_graphic = wms_request.GetLegendGraphic()
- try:
- get_legend_graphic.load_from_kv(args)
- get_legend_graphic.validate()
- except Exception:
- logger.error(LOAD_FAILURE, exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
-
- coverage = self.get_coverage_from_id(get_legend_graphic.layer.value)
-
- fp = coverage.layer.get_legend_graphic(args)
-
- fn = coverage.identifier.split(".")[-1] + ".png"
-
- response = {"fp": fp, "fn": fn}
- return response
+ return self.get_legend_graphic(args, wms_request)
if args["request"].lower() == "getmap":
+ return self.get_map(args, wms_request)
- get_map = wms_request.GetMap()
- try:
- get_map.load_from_kv(args)
- get_map.validate()
- except Exception:
- logger.error(LOAD_FAILURE, exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
-
- coverage = self.get_coverage_from_id(get_map.layer.value)
-
- # Make sure the request size is correct
- if get_map.width == 0:
- raise WCSException(
- exception_code="InvalidParameterValue",
- locator="VERSION",
- exception_text="Grid coordinates x_size must be greater than 0",
- )
- if get_map.height == 0:
- raise WCSException(
- exception_code="InvalidParameterValue",
- locator="VERSION",
- exception_text="Grid coordinates y_size must be greater than 0",
- )
- if get_map.height * get_map.width > settings.MAX_GRID_COORDS_REQUEST_SIZE:
- raise WCSException(
- exception_code="InvalidParameterValue",
- locator="VERSION",
- exception_text="Grid coordinates x_size * y_size must be less than %d" % settings.MAX_GRID_COORDS_REQUEST_SIZE,
- )
-
- try:
- fp = coverage.layer.get_map(args)
- except Exception:
- logger.error("Failed to get_map from layer: ", exc_info=True)
- raise WCSException(exception_text=INVALID_ARGUMENTS)
-
- fn = coverage.identifier.split(".")[-1] + ".png"
-
- # Collect garbage
- gc.collect()
-
- response = {"fp": fp, "fn": fn}
-
- return response
-
- raise WCSException(
- exception_text="KV Request not handled properly: " + str(args)
- )
+ raise WCSException(exception_text="KV Request not handled properly: " + str(args))
diff --git a/ogc/ogc_common.py b/ogc/ogc_common.py
index 3926189..20b3424 100755
--- a/ogc/ogc_common.py
+++ b/ogc/ogc_common.py
@@ -27,7 +27,7 @@ def to_xml(self):
raise NotImplementedError("XML Serialization not implemented.")
def _load_xml_doc(self, xml_doc):
- """ Override this method with code that unpacks contents of XML into the traits object."""
+ """Override this method with code that unpacks contents of XML into the traits object."""
raise NotImplementedError("XML Parsing not implemented.")
def load_from_kv(self, args):
@@ -91,6 +91,7 @@ def validate(self):
def to_xml(self):
raise NotImplementedError()
+
class WCSException(Exception):
def __init__(
self,
diff --git a/ogc/podpac.py b/ogc/podpac.py
index 2490435..7e86b05 100755
--- a/ogc/podpac.py
+++ b/ogc/podpac.py
@@ -18,7 +18,6 @@
import re
-
def _uppercase_for_dict_keys(lower_dict):
upper_dict = {}
for k, v in lower_dict.items():
@@ -49,6 +48,7 @@ class Layer(ogc.Layer):
node = tl.Instance(klass=podpac.Node, allow_none=True)
convert_requests_to_default_crs = tl.Bool(default_value=True)
+ crs84_colon_format = "CRS:84"
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -70,24 +70,18 @@ def get_map(self, args):
orig_w = args["WIDTH"]
# don't reduce # pixels to less than MIN_N (unless orig request was smaller)
MIN_N = 8
- args["HEIGHT"] = max(
- min(int(orig_h), MIN_N), int(round(int(orig_h) / rescale))
- )
- args["WIDTH"] = max(
- min(int(orig_w), MIN_N), int(round(int(orig_w) / rescale))
- )
+ args["HEIGHT"] = max(min(int(orig_h), MIN_N), int(round(int(orig_h) / rescale)))
+ args["WIDTH"] = max(min(int(orig_w), MIN_N), int(round(int(orig_w) / rescale)))
else:
rescale = 0
- if "CRS" in args and args["CRS"].upper() == "CRS:84".upper():
+ if "CRS" in args and args["CRS"].upper() == self.crs84_colon_format.upper():
args["CRS"] = "CRS84" # for pyproj
if self.convert_requests_to_default_crs and "DEFAULT_CRS" in podpac.settings:
# PODPAC transforms input coords to crs of datasource recursively
# every time eval is used in a Node included in its dependency tree.
# This optimization can be used if most datasources are stored in the same crs.
- coords = Coordinates.from_url(args).transform(
- podpac.settings["DEFAULT_CRS"]
- )
+ coords = Coordinates.from_url(args).transform(podpac.settings["DEFAULT_CRS"])
else:
coords = Coordinates.from_url(args)
@@ -98,17 +92,10 @@ def get_map(self, args):
if rescale > 1.0:
args["HEIGHT"] = orig_h
args["WIDTH"] = orig_w
- if (
- self.convert_requests_to_default_crs
- and "DEFAULT_CRS" in podpac.settings
- ):
- rescaledcoords = Coordinates.from_url(args).transform(
- podpac.settings["DEFAULT_CRS"]
- )
+ if self.convert_requests_to_default_crs and "DEFAULT_CRS" in podpac.settings:
+ rescaledcoords = Coordinates.from_url(args).transform(podpac.settings["DEFAULT_CRS"])
else:
rescaledcoords = Coordinates.from_url(args)
- # rescaled_node = podpac.data.Array(source=output, coordinates=coords, style = node.style)
- # output = rescaled_node.eval(rescaledcoords)
output = output.interp(
lat=rescaledcoords["lat"].coordinates + 1e-6,
lon=rescaledcoords["lon"].coordinates + 1e-6,
@@ -122,15 +109,13 @@ def get_map(self, args):
def get_coverage(self, args):
args = _uppercase_for_dict_keys(args)
- if "CRS" in args and args["CRS"].upper() == "CRS:84".upper():
+ if "CRS" in args and args["CRS"].upper() == self.crs84_colon_format.upper():
args["CRS"] = "CRS84" # for pyproj
if self.convert_requests_to_default_crs and "DEFAULT_CRS" in podpac.settings:
# PODPAC transforms input coords to crs of datasource recursively
# every time eval is used in a Node included in its dependency tree.
# This optimization can be used if most datasources are stored in the same crs.
- coords = Coordinates.from_url(args).transform(
- podpac.settings["DEFAULT_CRS"]
- )
+ coords = Coordinates.from_url(args).transform(podpac.settings["DEFAULT_CRS"])
else:
coords = Coordinates.from_url(args)
@@ -184,17 +169,18 @@ class LegendGraphic(tl.HasTraits):
"""
A class for generating legend graphics with adjustable properties such as width, height, fonts, and color mappings.
"""
+
width = tl.Float(default_value=0.7) # inches
max_width = tl.Float(default_value=1.5) # inches
- min_width = tl.Float(default_value=0.8) # inches
+ min_width = tl.Float(default_value=0.8) # inches
height = tl.Float(default_value=2.5) # inches
max_heigth = tl.Float(default_value=5.5) # inches
dpi = tl.Float(default_value=100) # pixels per inch
- units_fontsize = tl.Float(default_value=13) # used for units text at top of legend
- colorbar_fontsize = tl.Float(default_value=10) # used for tick marks of colorbar
- enumeration_fontsize = tl.Float(default_value=16) # used for labels of categorical legends
- enumeration_min_fontsize = tl.Float(default_value=5)# minimum fontsize for categorical legends with lots of labels
- max_unit_chars = tl.Float(default_value=16) # maximum characters allowed in a line for units
+ units_fontsize = tl.Float(default_value=13) # used for units text at top of legend
+ colorbar_fontsize = tl.Float(default_value=10) # used for tick marks of colorbar
+ enumeration_fontsize = tl.Float(default_value=16) # used for labels of categorical legends
+ enumeration_min_fontsize = tl.Float(default_value=5) # minimum fontsize for categorical legends with lots of labels
+ max_unit_chars = tl.Float(default_value=16) # maximum characters allowed in a line for units
units = tl.Unicode(default_value=tl.Undefined, allow_none=True)
img_format = tl.Enum(values=["png", "pdf", "ps", "eps", "svg"], default_value="png")
cmap = tl.Instance(klass=mpl.colors.Colormap, default_value=mpl.cm.viridis)
@@ -205,9 +191,7 @@ class LegendGraphic(tl.HasTraits):
default_value=None,
allow_none=True,
)
- enumeration_colors = tl.Dict(
- key_trait=tl.Int(), default_value=None, allow_none=True
- )
+ enumeration_colors = tl.Dict(key_trait=tl.Int(), default_value=None, allow_none=True)
clim = tl.List(default_value=[None, None])
def __init__(self, *args, **kwargs):
@@ -219,7 +203,7 @@ def __init__(self, *args, **kwargs):
def legend_image(self):
"""
Generates the legend image based on provided parameters.
-
+
Returns:
io.BytesIO: A byte stream containing the legend image in the specified format.
"""
@@ -257,19 +241,21 @@ def check_if_units_need_to_be_wrapped(self, fig):
"""
Checks if the unit text needs wrapping. If so, it divieds the string into bins the
size of max_unit_chars.
-
+
Args:
fig (matplotlib.figure.Figure): The figure object.
-
+
Returns:
tuple: Updated figure, boolean flag indicating wrapping, and the wrapped unit string.
"""
units = "[%s]" % self.units
- needs_wrap = len(units)>self.max_unit_chars #if characters are greater than 16 then wrap text, shrink colorbar
+ needs_wrap = (
+ len(units) > self.max_unit_chars
+ ) # if characters are greater than 16 then wrap text, shrink colorbar
# currently only allows for 2 lines
wrapped_units = self.wrap_text(units, self.max_unit_chars)
# format exponents
- units = re.sub(r"\^(\d+)", r"$^{\1}\!$", units)
+ units = re.sub(r"\^(\d+)", r"$^{\1}\!$", units)
units = re.sub(r"\^-(\d+)", r"$^{-\1}\!$", units)
# add units to figure
fig.text(
@@ -279,28 +265,30 @@ def check_if_units_need_to_be_wrapped(self, fig):
fontsize=self.units_fontsize,
horizontalalignment="center",
verticalalignment="top",
- wrap=True
+ wrap=True,
)
return fig, needs_wrap, wrapped_units
-
+
def adjust_fig_height_for_wrapped_units(self, fig, wrapped_units):
"""
Adjusts figure height to accommodate wrapped units. Will increase figure
size for each line of wrapped text until maximum figure height is reached
-
+
Args:
fig (matplotlib.figure.Figure): The figure object.
wrapped_units (str): The wrapped unit string.
-
+
Returns:
tuple: Updated figure and adjusted axis.
"""
-
+
# wrap text and increase height of figure
added_lines = wrapped_units.count("\n")
added_height = self.additional_height_for_wrapped_text(added_lines, self.units_fontsize)
- fig_height = min(self.max_heigth, added_height + self.height) # add extra height to figure ensure it is less than 6.5 in
- # adjust fig size to fit units
+ fig_height = min(
+ self.max_heigth, added_height + self.height
+ ) # add extra height to figure ensure it is less than 6.5 in
+ # adjust fig size to fit units
fig.set_size_inches(self.max_width, fig_height, forward=True)
# Standard height ratio (before adjustments)
@@ -314,118 +302,122 @@ def adjust_fig_height_for_wrapped_units(self, fig, wrapped_units):
def adjust_fig_width_for_unwrapped_units(self, fig, units_str):
"""
- Adjusts figure width to accommodate units.
+ Adjusts figure width to accommodate units.
Expects units to be under max_unit_chars
-
+
Args:
fig (matplotlib.figure.Figure): The figure object.
wrapped_units (str): The wrapped unit string.
-
+
Returns:
tuple: Updated figure and adjusted axis.
"""
# add space for units
ax = fig.add_axes([0.25, 0.05, 0.15, 0.80])
# Estimates the max label width assuming fontsize 10
- max_label_width_units = self.get_max_text_width([units_str], self.units_fontsize)
-
+ max_label_width_units = self.get_max_text_width([units_str], self.units_fontsize)
+
# add color bar and see if fig width needs to be bigger for tick marks
norm = mpl.colors.Normalize(vmin=self.clim[0], vmax=self.clim[1])
cb = mpl.colorbar.ColorbarBase(ax, cmap=self.cmap, norm=norm)
-
+
# Convert ticks to float32 to avoid errors converting float64 to string
- tick_labels = [str(t) for t in cb.ax.get_yticks().astype('f4')]
+ tick_labels = [str(t) for t in cb.ax.get_yticks().astype("f4")]
max_label_width_ticks = self.get_max_text_width(tick_labels, self.colorbar_fontsize)
- #define minimum width need or max_label width + some extra margin
- fig_width = max(self.min_width, max_label_width_units+0.2, max_label_width_ticks+0.4)
+ # define minimum width need or max_label width + some extra margin
+ fig_width = max(self.min_width, max_label_width_units + 0.2, max_label_width_ticks + 0.4)
fig.set_size_inches(fig_width, self.height, forward=True)
return fig, ax
-
+
def create_enumeration_legend(self, fig, ax):
"""
- Creates a legend for categorical data.
+ Creates a legend for categorical data.
Dynamically adjusts figure size based on number of labels and label text length
Adds colorbar to figure
-
+
Args:
fig (matplotlib.figure.Figure): The figure object.
ax (matplotlib.axes.Axes): The axis object.
-
+
Returns:
matplotlib.figure.Figure: The updated figure.
"""
enum_values = list(self.enumeration_colors.keys())
enum_colors = list(self.enumeration_colors.values())
enum_labels = list(self.enumeration_legend.values())
-
- # Dynamically adjust font size based on the number of ticks
- font_size = max(self.enumeration_min_fontsize, self.enumeration_fontsize - (len(enum_values) * 0.35)) # Scale font size
- # Change legend dynamically
- max_label_width = self.get_max_text_width(enum_labels, font_size) # Estimates the max label width assuming fontsize 10
+ # Dynamically adjust font size based on the number of ticks
+ font_size = max(
+ self.enumeration_min_fontsize, self.enumeration_fontsize - (len(enum_values) * 0.35)
+ ) # Scale font size
+
+ # Change legend dynamically
+ max_label_width = self.get_max_text_width(
+ enum_labels, font_size
+ ) # Estimates the max label width assuming fontsize 10
fig_width = 0.5 + max_label_width # Base width + label-dependent width
fig_height = min(self.max_heigth, len(enum_colors) * 0.25) # Adjust height based on number of labels
fig.set_size_inches(fig_width, fig_height, forward=True)
-
- self.cmap = mpl.colors.ListedColormap(enum_colors) #create categorical colomap to replace previous cmap
- bounds = np.array([val-0.5 for val in np.arange(1,len(enum_values)+2)])
+
+ self.cmap = mpl.colors.ListedColormap(enum_colors) # create categorical colomap to replace previous cmap
+ bounds = np.array([val - 0.5 for val in np.arange(1, len(enum_values) + 2)])
norm = mpl.colors.BoundaryNorm(bounds, self.cmap.N)
cb = mpl.colorbar.ColorbarBase(
ax,
cmap=self.cmap,
norm=norm,
- ticks=np.arange(1,len(self.enumeration_legend)+1),
+ ticks=np.arange(1, len(self.enumeration_legend) + 1),
)
if self.enumeration_legend:
cb.ax.set_yticklabels(enum_labels, fontsize=font_size)
return fig
-
+
def adjust_fig_width_for_long_tick_marks(self, fig, ax):
"""
- Adjusts figure width to accommodate tick marks that are long.
-
+ Adjusts figure width to accommodate tick marks that are long.
+
Args:
fig (matplotlib.figure.Figure): The figure object.
ax (matplotlib.axes.Axes): The axis object.
-
+
Returns:
matplotlib.figure.Figure: The updated figure.
"""
norm = mpl.colors.Normalize(vmin=self.clim[0], vmax=self.clim[1])
cb = mpl.colorbar.ColorbarBase(ax, cmap=self.cmap, norm=norm)
- tick_labels = [str(t) for t in cb.ax.get_yticks().astype('f4')] # Convert ticks to strings
+ tick_labels = [str(t) for t in cb.ax.get_yticks().astype("f4")] # Convert ticks to strings
max_label_width = self.get_max_text_width(tick_labels, self.colorbar_fontsize)
fig_width = max_label_width + 0.4
- fig.set_size_inches(fig_width, self.height,forward=True)
+ fig.set_size_inches(fig_width, self.height, forward=True)
return fig
def get_max_text_width(self, labels, font_size=10):
"""
Estimates the maximum width of given text labels in inches.
-
+
Args:
labels (list of str): List of text labels.
font_size (int): Font size used for labels.
-
+
Returns:
float: The maximum width of the labels in inches.
"""
fig, ax = plt.subplots() # Create a temporary figure
renderer = fig.canvas.get_renderer() # Get renderer to measure text
-
+
text_widths = []
for label in labels:
text = ax.text(0, 0, label, fontsize=font_size) # Attach text to the figure
text_widths.append(text.get_window_extent(renderer).width)
-
+
plt.close(fig) # Close temporary figure
return max(text_widths) / self.dpi # Convert pixels to inches
-
+
def wrap_text(self, text, max_width_chars=16):
"""
Split string into sections of length max_width_chars
@@ -435,16 +427,16 @@ def wrap_text(self, text, max_width_chars=16):
def additional_height_for_wrapped_text(self, added_lines_num, font_size):
"""
Calculates additional height needed for wrapped text based on number lines needed
-
+
Args:
added_lines_num (int): Number of additional wrapped lines.
font_size (int): Font size of the text.
-
+
Returns:
float: Additional height required in inches.
"""
font_height_px = font_size * (self.dpi / 72) # Convert to pixels
font_height_in = font_height_px / self.dpi # Convert pixels to inches
- additional_height = font_height_in*added_lines_num
+ additional_height = font_height_in * added_lines_num
return additional_height
diff --git a/ogc/servers.py b/ogc/servers.py
index 0d66aad..dd3eab0 100755
--- a/ogc/servers.py
+++ b/ogc/servers.py
@@ -21,7 +21,7 @@ def respond_xml(doc, status=200):
# First, validate that XML can be parsed.
from lxml import etree
- root = etree.fromstring(doc.encode("ascii"))
+ etree.fromstring(doc.encode("ascii"))
# Then, return w/ proper content type
return Response(doc, mimetype="text/xml", status=status)
@@ -101,12 +101,8 @@ def method():
setattr(self, method_name, method)
method = getattr(self, method_name)
method.__name__ = method_name
- self.add_url_rule(
- endpoint, view_func=method, methods=["GET", "POST"]
- ) # add render method as flask route
- setattr(
- self, method_name, method
- ) # bind route function call to instance method
+ self.add_url_rule(endpoint, view_func=method, methods=["GET", "POST"]) # add render method as flask route
+ setattr(self, method_name, method) # bind route function call to instance method
def ogc_render(self, ogc_idx):
logger.info("OGC server.ogc_render %i", ogc_idx)
@@ -153,15 +149,11 @@ def ogc_render(self, ogc_idx):
as_attach = True if fn.endswith("tif") else False
return send_file(fp, as_attachment=as_attach, download_name=fn)
- logger.warning(
- "Could not handle this combination of arguments: %r", dict(request.args)
- )
+ logger.warning("Could not handle this combination of arguments: %r", dict(request.args))
raise WCSException("No response for this combination of arguments.")
except WCSException as e:
- logger.error(
- "OGC: server.ogc_render WCSException: %s", str(e), exc_info=True
- )
+ logger.error("OGC: server.ogc_render WCSException: %s", str(e), exc_info=True)
# WCSException is raised when the client sends an invalid set of parameters.
# Therefore it should result in a client error, in the 400 range.
# Security scans have flagged a security concern when returning a 500 error,
diff --git a/ogc/wcs_request_1_0_0.py b/ogc/wcs_request_1_0_0.py
index cd09b5f..36db9f0 100755
--- a/ogc/wcs_request_1_0_0.py
+++ b/ogc/wcs_request_1_0_0.py
@@ -9,6 +9,8 @@
logger = logging.getLogger(__file__)
+WCS_VALIDATION_ERROR = "WCS Request validation error: service should be WCS"
+
class XMLNode(tl.HasTraits):
"""Base class for all Traits objects that correspond to XML schemas."""
@@ -23,7 +25,7 @@ def to_xml(self):
raise NotImplementedError("XML Serialization not implemented.")
def _load_xml_doc(self, xml_doc):
- """ Override this method with code that unpacks contents of XML into the traits object."""
+ """Override this method with code that unpacks contents of XML into the traits object."""
raise NotImplementedError("XML Parsing not implemented.")
def load_from_xml(self, xml_txt):
@@ -56,12 +58,8 @@ class DescribeCoverage(ogc_common.XMLNode):
identifiers = tl.List(trait=tl.Instance(klass=Identifier))
def validate(self):
- assert (
- self.service == "WCS"
- ), "WCS Request validation error: service should be WCS"
- assert self.version.startswith(
- "1.0.0"
- ), "WCS Request validation error: version should be 1.0.0"
+ assert self.service == "WCS", WCS_VALIDATION_ERROR
+ assert self.version.startswith("1.0.0"), "WCS Request validation error: version should be 1.0.0"
for obj in self.identifiers:
obj.validate()
@@ -71,11 +69,7 @@ def _load_from_kv(self, args):
self.service = args["service"].upper()
self.version = args["version"]
- identifiers = [
- Identifier(value=identifier.strip())
- for identifier in args["coverage"].split(",")
- ]
- # assert len(identifiers) == 1, 'Multiple identifiers not yet supported: ' + repr(identifiers)
+ identifiers = [Identifier(value=identifier.strip()) for identifier in args["coverage"].split(",")]
self.identifiers = identifiers
@@ -90,9 +84,7 @@ class GetCapabilities(ogc_common.XMLNode):
accept_formats = tl.List(trait=tl.Instance(klass=ogc_common.OutputFormat))
def validate(self):
- assert (
- self.service == "WCS"
- ), "WCS Request validation error: service should be WCS"
+ assert self.service == "WCS", WCS_VALIDATION_ERROR
for obj in self.accept_formats:
obj.validate()
@@ -120,9 +112,7 @@ def _load_xml_doc(self, xml_doc):
if tag == "AcceptFormats":
for eelement in element:
assert lxml.etree.QName(eelement.tag).localname == "OutputFormat"
- self.accept_formats.append(
- ogc_common.OutputFormat(value=eelement.text)
- )
+ self.accept_formats.append(ogc_common.OutputFormat(value=eelement.text))
else:
logger.warn("Tag %s not known." % tag)
@@ -155,14 +145,10 @@ class GetCoverage(ogc_common.XMLNode):
height = tl.Int()
def validate(self):
- assert (
- self.service == "WCS"
- ), "WCS Request validation error: service should be WCS"
+ assert self.service == "WCS", WCS_VALIDATION_ERROR
assert self.identifier, "WCS Request validation error: no coverage specified"
- assert (
- self.domain_subset_bbox
- ), "WCS Request validation error: no bounding box specified"
+ assert self.domain_subset_bbox, "WCS Request validation error: no bounding box specified"
self.output_format.validate(), "WCS Request validation error: output format"
assert self.height, "WCS Request validation error: no height specified"
assert self.width, "WCS Request validation error: no width specified"
@@ -174,9 +160,7 @@ def validate(self):
self.domain_subset_bbox.lower_corner[1],
self.domain_subset_bbox.upper_corner[1],
]
- if any([abs(l) > 361.000 for l in lons]) or any(
- [abs(l) > 91.000 for l in lats]
- ):
+ if any([abs(l) > 361.000 for l in lons]) or any([abs(l) > 91.000 for l in lats]):
raise ogc_common.WCSException(
exception_code="InvalidParameterValue",
locator="BBOX",
@@ -200,16 +184,14 @@ def _load_from_kv(self, args):
self.crs = args["request_crs"].lower()
if "crs" in list(args.keys()):
- assert (
- args["crs"].lower() in settings.WCS_CRS
- ), "SRS not supported [CRS]: %s (%s are supported.)" % (
+ assert args["crs"].lower() in settings.WCS_CRS, "SRS not supported [CRS]: %s (%s are supported.)" % (
args["crs"],
str(settings.WCS_CRS),
)
self.crs = args["crs"].lower()
bbox = args["bbox"].replace(" ", "").split(",")
- # BBOX = minx, miny, maxx, maxy, minz, maxz
+ # BBOX : [minx, miny, maxx, maxy, minz, maxz]
self.domain_subset_bbox = ogc_common.BoundingBox(
lower_corner=(float(bbox[0]), float(bbox[1])),
upper_corner=(float(bbox[2]), float(bbox[3])),
@@ -225,9 +207,9 @@ def _load_from_kv(self, args):
self.output_format = ogc_common.OutputFormat(value=args["format"])
if "time" in args:
- # TIME = time1, time2,...
+ # TIME : time1, time2,...
# or
- # TIME = min / max / res, ...
+ # TIME : min / max / res, ...
if "," in args["time"]:
raise ogc_common.WCSException(
exception_code="InvalidParameterValue",
diff --git a/ogc/wcs_response_1_0_0.py b/ogc/wcs_response_1_0_0.py
index dd9385e..04a9d75 100755
--- a/ogc/wcs_response_1_0_0.py
+++ b/ogc/wcs_response_1_0_0.py
@@ -49,17 +49,13 @@ def _identifer_default(self):
def _title_default(self):
if self.layer:
- return "({}) {}".format(
- self.constraints_abbreviated, repr(self.layer._style)
- )
+ return "({}) {}".format(self.constraints_abbreviated, repr(self.layer._style))
abstract = tl.Unicode(default_value=None, allow_none=True)
def _abstract_default(self):
if self.layer:
- abstract = "({}) OGC Layer: {}".format(
- self.constraints_abbreviated, repr(self.layer._style)
- )
+ abstract = "({}) OGC Layer: {}".format(self.constraints_abbreviated, repr(self.layer._style))
if self.layer._style.is_enumerated:
abstract += " Layer represents an enumerated (i.e., categorical/non-scalar) quantity."
return abstract
@@ -91,6 +87,110 @@ def _wgs84_bounding_box_upper_corner_lat_lon_default(self):
class CoverageDescription(ogc_common.XMLNode):
coverages = tl.List(trait=tl.Instance(klass=Coverage))
+ def coverage_offering(self, coverage):
+ xml = """ """
+ if coverage.identifier:
+ xml += " {coverage.identifier}\n".format(coverage=coverage)
+ if coverage.title:
+ xml += " {coverage.title}\n".format(coverage=coverage)
+ if coverage.abstract:
+ xml += " {coverage.abstract}\n".format(coverage=coverage)
+ temporal_domain = ""
+ if hasattr(coverage.layer, "valid_times"):
+ if coverage.layer.all_times_valid:
+ temporal_domain += " \n"
+ temporal_domain += " \n"
+ temporal_domain += " {}".format(
+ datetime.datetime.min
+ )
+ temporal_domain += " {}".format(
+ datetime.datetime.max
+ )
+ temporal_domain += " \n"
+ temporal_domain += " \n"
+
+ elif coverage.layer.valid_times and coverage.layer.valid_times is not tl.Undefined:
+ temporal_domain += " \n"
+ temporal_domain += "".join(
+ [
+ " {}\n".format(dt.isoformat())
+ for dt in coverage.layer.valid_times
+ ]
+ )
+ temporal_domain += " \n"
+
+ if coverage.wgs84_bounding_box_lower_corner_lat_lon or coverage.wgs84_bounding_box_upper_corner_lat_lon:
+ xml += """
+ {coverage.wgs84_bounding_box_lower_corner_lat_lon[1]} {coverage.wgs84_bounding_box_lower_corner_lat_lon[0]}
+ {coverage.wgs84_bounding_box_upper_corner_lat_lon[1]} {coverage.wgs84_bounding_box_upper_corner_lat_lon[0]}
+ """.format(
+ coverage=coverage
+ )
+
+ xml += """\
+
+
+
+ {coverage.wgs84_bounding_box_lower_corner_lat_lon[1]} {coverage.wgs84_bounding_box_lower_corner_lat_lon[0]}
+ {coverage.wgs84_bounding_box_upper_corner_lat_lon[1]} {coverage.wgs84_bounding_box_upper_corner_lat_lon[0]}
+
+
+
+
+ 0 0
+ {coverage.grid_coordinates.x_size} {coverage.grid_coordinates.y_size}
+
+
+ x
+ y
+
+ {coverage.grid_coordinates.geotransform[0]} {coverage.grid_coordinates.geotransform[3]}
+
+ {coverage.grid_coordinates.geotransform[1]} {coverage.grid_coordinates.geotransform[2]}
+ {coverage.grid_coordinates.geotransform[4]} {coverage.grid_coordinates.geotransform[5]}
+
+
+ {temporal_domain}
+
+
+
+ {coverage.identifier}
+ {coverage.title}
+
+
+ Band
+ Band
+
+ 1
+
+
+
+
+
+
+""".format(
+ coverage=coverage,
+ temporal_domain=temporal_domain,
+ epsg=NATIVE_PROJECTION.upper(),
+ )
+ xml += "\n".join(
+ [
+ " {epsg}".format(epsg=epsg.upper())
+ for epsg in list(settings.WCS_CRS.keys())
+ ]
+ )
+ xml += """
+
+
+"""
+ xml += """\
+
+ GeoTIFF
+
+
+"""
+ return xml
+
def to_xml(self):
xml = """\
@@ -105,123 +205,7 @@ def to_xml(self):
"""
for coverage in self.coverages:
- xml += """ """
- if coverage.identifier:
- xml += " {coverage.identifier}\n".format(
- coverage=coverage
- )
- if coverage.title:
- xml += " {coverage.title}\n".format(
- coverage=coverage
- )
- if coverage.abstract:
- xml += " {coverage.abstract}\n".format(
- coverage=coverage
- )
- temporal_domain = ""
- if hasattr(coverage.layer, "valid_times"):
- if coverage.layer.all_times_valid:
- temporal_domain += " \n"
- temporal_domain += " \n"
- temporal_domain += " {}".format(
- datetime.datetime.min
- )
- temporal_domain += " {}".format(
- datetime.datetime.max
- )
- temporal_domain += " \n"
- temporal_domain += " \n"
-
- elif (
- coverage.layer.valid_times
- and coverage.layer.valid_times is not tl.Undefined
- ):
- temporal_domain += " \n"
- temporal_domain += "".join(
- [
- " {}\n".format(
- dt.isoformat()
- )
- for dt in coverage.layer.valid_times
- ]
- )
- temporal_domain += " \n"
-
- if (
- coverage.wgs84_bounding_box_lower_corner_lat_lon
- or coverage.wgs84_bounding_box_upper_corner_lat_lon
- ):
- xml += """
- {coverage.wgs84_bounding_box_lower_corner_lat_lon[1]} {coverage.wgs84_bounding_box_lower_corner_lat_lon[0]}
- {coverage.wgs84_bounding_box_upper_corner_lat_lon[1]} {coverage.wgs84_bounding_box_upper_corner_lat_lon[0]}
- """.format(
- coverage=coverage
- )
-
- xml += """\
-
-
-
- {coverage.wgs84_bounding_box_lower_corner_lat_lon[1]} {coverage.wgs84_bounding_box_lower_corner_lat_lon[0]}
- {coverage.wgs84_bounding_box_upper_corner_lat_lon[1]} {coverage.wgs84_bounding_box_upper_corner_lat_lon[0]}
-
-
-
-
- 0 0
- {coverage.grid_coordinates.x_size} {coverage.grid_coordinates.y_size}
-
-
- x
- y
-
- {coverage.grid_coordinates.geotransform[0]} {coverage.grid_coordinates.geotransform[3]}
-
- {coverage.grid_coordinates.geotransform[1]} {coverage.grid_coordinates.geotransform[2]}
- {coverage.grid_coordinates.geotransform[4]} {coverage.grid_coordinates.geotransform[5]}
-
-
- {temporal_domain}
-
-
-
- {coverage.identifier}
- {coverage.title}
-
-
- Band
- Band
-
- 1
-
-
-
-
-
-
-""".format(
- coverage=coverage,
- temporal_domain=temporal_domain,
- epsg=NATIVE_PROJECTION.upper(),
- )
- xml += "\n".join(
- [
- " {epsg}".format(
- epsg=epsg.upper()
- )
- for epsg in list(settings.WCS_CRS.keys())
- ]
- )
- xml += """
-
-
-"""
- xml += """\
-
- GeoTIFF
-
-
-"""
+ xml += self.coverage_offering(coverage)
xml += """\
"""
@@ -256,9 +240,7 @@ def service(self):
self=self, constraints=settings.CONSTRAINTS
)
- base_url = tl.Unicode(
- default_value=None, allow_none=True
- ) # e.g., http://hostname:port/path?
+ base_url = tl.Unicode(default_value=None, allow_none=True) # e.g., http://hostname:port/path?
def capability(self):
return """\
@@ -300,9 +282,7 @@ def capability(self):
self=self
)
- coverages = tl.List(
- tl.Instance(klass=Coverage)
- ) # is populated via Traits in constructor
+ coverages = tl.List(tl.Instance(klass=Coverage)) # is populated via Traits in constructor
# Check if list of layers available should be trimmed
layer_subset = []
@@ -318,36 +298,21 @@ def contents(self):
# If configured, trim layers list to layers specified in settings
if self.limit_layers:
- self.coverages = [
- layer
- for layer in self.coverages
- if layer.identifier in self.layer_subset
- ]
+ self.coverages = [layer for layer in self.coverages if layer.identifier in self.layer_subset]
for coverage in self.coverages:
xml += " \n"
if coverage.abstract:
- xml += " {coverage.abstract}\n".format(
- coverage=coverage
- )
+ xml += " {coverage.abstract}\n".format(coverage=coverage)
if coverage.identifier: # required
- xml += (
- " {coverage.identifier}\n".format(
- coverage=coverage
- )
- )
+ xml += " {coverage.identifier}\n".format(coverage=coverage)
else:
logger.info("Invalid layer. Missing name.")
if coverage.title: # required
- xml += " {coverage.title}\n".format(
- coverage=coverage
- )
+ xml += " {coverage.title}\n".format(coverage=coverage)
else:
logger.info("Invalid layer. Missing label.")
- if (
- coverage.wgs84_bounding_box_lower_corner_lat_lon
- or coverage.wgs84_bounding_box_upper_corner_lat_lon
- ):
+ if coverage.wgs84_bounding_box_lower_corner_lat_lon or coverage.wgs84_bounding_box_upper_corner_lat_lon:
xml += """
{coverage.wgs84_bounding_box_lower_corner_lat_lon[1]} {coverage.wgs84_bounding_box_lower_corner_lat_lon[0]}
{coverage.wgs84_bounding_box_upper_corner_lat_lon[1]} {coverage.wgs84_bounding_box_upper_corner_lat_lon[0]}
diff --git a/ogc/wms_request_1_3_0.py b/ogc/wms_request_1_3_0.py
index 3333688..6c34ee6 100755
--- a/ogc/wms_request_1_3_0.py
+++ b/ogc/wms_request_1_3_0.py
@@ -11,6 +11,7 @@
logger = logging.getLogger(__file__)
WMS_VALIDATION_ERROR = "WMS Request validation error: service should be WMS"
+
class GetCapabilities(ogc_common.XMLNode):
"""
Request to a WMS server to perform the GetCapabilities operation.
@@ -25,9 +26,7 @@ class GetCapabilities(ogc_common.XMLNode):
accept_formats = tl.List(trait=tl.Instance(klass=ogc_common.OutputFormat))
def validate(self):
- assert (
- self.service == "WMS"
- ), WMS_VALIDATION_ERROR
+ assert self.service == "WMS", WMS_VALIDATION_ERROR
for obj in self.accept_formats:
obj.validate()
@@ -62,15 +61,11 @@ class GetMap(ogc_common.XMLNode):
)
def validate(self):
- assert (
- self.service == "WMS"
- ), WMS_VALIDATION_ERROR
+ assert self.service == "WMS", WMS_VALIDATION_ERROR
assert self.layer, "WMS Request validation error: no coverage specified"
assert self.bbox, "WMS Request validation error: no bounding box specified"
- assert (
- self.output_format
- ), "WMS Request validation error: no output format specified"
+ assert self.output_format, "WMS Request validation error: no output format specified"
assert self.height, "WMS Request validation error: no height specified"
assert self.width, "WMS Request validation error: no width specified"
lons = [self.bbox.lower_corner[0], self.bbox.upper_corner[0]]
@@ -92,16 +87,14 @@ def _load_from_kv(self, args):
self.layer = Identifier(value=args["layers"])
if "crs" in list(args.keys()):
- assert (
- args["crs"].lower() in settings.WMS_CRS
- ), "SRS not supported [CRS]: %s (%s are supported.)" % (
+ assert args["crs"].lower() in settings.WMS_CRS, "SRS not supported [CRS]: %s (%s are supported.)" % (
args["crs"],
str(settings.WMS_CRS),
)
self.crs = args["crs"].lower()
bbox = args["bbox"].split(",")
- # BBOX = minx, miny, maxx, maxy, minz, maxz
+ # BBOX : [minx, miny, maxx, maxy, minz, maxz]
self.bbox = ogc_common.BoundingBox(
lower_corner=(float(bbox[0]), float(bbox[1])),
upper_corner=(float(bbox[2]), float(bbox[3])),
@@ -116,9 +109,9 @@ def _load_from_kv(self, args):
)
if "time" in args:
- # TIME = time1, time2,...
+ # TIME : time1, time2,...
# or
- # TIME = min / max / res, ...
+ # TIME : min / max / res, ...
assert "," not in args["time"], "error loading time from request"
self.time = args["time"]
@@ -142,17 +135,13 @@ class GetLegendGraphic(ogc_common.XMLNode):
service = tl.Unicode(default_value=None, allow_none=True)
version = tl.Unicode(default_value=None, allow_none=True)
- output_format = tl.Enum(
- values=["image/png", "image/png; mode=8bit", "image/png;mode=8-bit"]
- )
+ output_format = tl.Enum(values=["image/png", "image/png; mode=8bit", "image/png;mode=8-bit"])
layer = tl.Instance(klass=Identifier) # Limited to one
crs = tl.Enum(values=list(settings.WMS_CRS.keys()))
def validate(self):
- assert (
- self.service == "WMS"
- ), WMS_VALIDATION_ERROR
+ assert self.service == "WMS", WMS_VALIDATION_ERROR
assert self.layer, "WMS Request validation error: no coverage specified"
def _load_from_kv(self, args):
diff --git a/ogc/wms_response_1_3_0.py b/ogc/wms_response_1_3_0.py
index 9e584e9..0991798 100755
--- a/ogc/wms_response_1_3_0.py
+++ b/ogc/wms_response_1_3_0.py
@@ -58,12 +58,10 @@ def service(self):
self=self,
constraints=settings.CONSTRAINTS,
maxWidthWMS=int(np.sqrt(settings.MAX_GRID_COORDS_REQUEST_SIZE)),
- maxHeightWMS=int(np.sqrt(settings.MAX_GRID_COORDS_REQUEST_SIZE))
+ maxHeightWMS=int(np.sqrt(settings.MAX_GRID_COORDS_REQUEST_SIZE)),
)
- base_url = tl.Unicode(
- default_value=None, allow_none=True
- ) # e.g., http://hostname:port/path?
+ base_url = tl.Unicode(default_value=None, allow_none=True) # e.g., http://hostname:port/path?
def request(self):
return """\
@@ -100,9 +98,7 @@ def exception(self):
"""
- coverages = tl.List(
- trait=tl.Instance(klass=Coverage)
- ) # is populated via Traits in constructor
+ coverages = tl.List(trait=tl.Instance(klass=Coverage)) # is populated via Traits in constructor
# Check if list of layers available should be trimmed
layer_subset = []
@@ -113,6 +109,97 @@ def exception(self):
except Exception as e:
logger.info("Layer limiting settings not enabled: {}".format(e))
+ def coverage_times_list(self, coverage, default_time):
+ # Build list of times to display
+ display_times = []
+ for time in reversed(coverage.layer.valid_times):
+ if (default_time - time).days < settings.PAST_DAYS_INCLUDED:
+ display_times.append(time)
+ else:
+ # Stop looking once the list has reached too far in the past
+ break
+
+ return display_times
+
+ def coverage_layer(self, coverage):
+ xml = """ \n"""
+ if coverage.identifier:
+ xml += f" {coverage.identifier}\n"
+ if coverage.title:
+ xml += f" {coverage.title}\n"
+ else:
+ logger.info("Invalid layer. Missing title.")
+ if coverage.abstract:
+ xml += f" {coverage.abstract}\n"
+
+ xml += self._get_CRS_and_BoundingBox()
+
+ if (
+ hasattr(coverage.layer, "valid_times")
+ and coverage.layer.valid_times is not tl.Undefined
+ and len(coverage.layer.valid_times) > 0
+ ):
+ min_time = coverage.layer.valid_times[0]
+ max_time = coverage.layer.valid_times[-1]
+ time_dimension_str = (
+ """ {times}\n"""
+ )
+
+ # Find last time with seconds == 0
+ try:
+ latest_lis_time = next(
+ (t for t in reversed(coverage.layer.valid_times) if t.second == 0),
+ None,
+ )
+ except AttributeError:
+ latest_lis_time = next((t for t in reversed(coverage.layer.valid_times)), None)
+
+ if latest_lis_time is not None:
+ # default to latest LIS time, if available
+ default_time = latest_lis_time
+ else:
+ # otherwise default to first available time
+ default_time = min_time
+
+ if settings.USE_TIMES_LIST:
+ display_times = self.coverage_times_list(coverage, default_time)
+ times_available_str = ",".join([time.isoformat() + "Z" for time in reversed(display_times)])
+ else:
+ times_available_str = "{min_time}/{max_time}/P3H".format(
+ min_time=min_time.isoformat() + "Z",
+ max_time=max_time.isoformat() + "Z",
+ )
+
+ xml += time_dimension_str.format(
+ times=times_available_str,
+ default_time=default_time.isoformat() + "Z",
+ )
+
+ legend_graphic_width = coverage.layer.legend_graphic_width
+ legend_graphic_height = coverage.layer.legend_graphic_height
+
+ legend_link = "{base}SERVICE={service}&VERSION={version}&REQUEST=GetLegendGraphic&LAYER={layer}&STYLE=default&FORMAT=image/png; mode=8bit".format(
+ base=self.base_url if self.base_url.endswith("?") else self.base_url + "?",
+ service=self.service_type,
+ version=self.version,
+ layer=coverage.identifier,
+ )
+
+ # Write the style section
+ xml += """ \n"""
+ xml += """ \n"""
+
+ return xml
+
def layers(self):
xml = " \n"
xml += " {}\n".format(self.service_group_title)
@@ -120,107 +207,11 @@ def layers(self):
# If configured, trim layers list to layers specified in settings
if self.limit_layers:
- self.coverages = [
- layer
- for layer in self.coverages
- if layer.identifier in self.layer_subset
- ]
+ self.coverages = [layer for layer in self.coverages if layer.identifier in self.layer_subset]
for coverage in self.coverages:
- xml += """ \n"""
- if coverage.identifier:
- xml += f" {coverage.identifier}\n"
- if coverage.title:
- xml += f" {coverage.title}\n"
- else:
- logger.info("Invalid layer. Missing title.")
- if coverage.abstract:
- xml += f" {coverage.abstract}\n"
-
- xml += self._get_CRS_and_BoundingBox()
-
- if (
- hasattr(coverage.layer, "valid_times")
- and coverage.layer.valid_times is not tl.Undefined
- and len(coverage.layer.valid_times) > 0
- ):
- min_time = coverage.layer.valid_times[0]
- max_time = coverage.layer.valid_times[-1]
- time_dimension_str = """ {times}\n"""
-
- # Find last time with seconds == 0
- try:
- latest_lis_time = next(
- (
- t
- for t in reversed(coverage.layer.valid_times)
- if t.second == 0
- ),
- None,
- )
- except AttributeError:
- latest_lis_time = next(
- (t for t in reversed(coverage.layer.valid_times)), None
- )
-
- if latest_lis_time is not None:
- # default to latest LIS time, if available
- default_time = latest_lis_time
- else:
- # otherwise default to first available time
- default_time = min_time
-
- if settings.USE_TIMES_LIST:
- # Build list of times to display
- display_times = []
- for time in reversed(coverage.layer.valid_times):
- if (default_time - time).days < settings.PAST_DAYS_INCLUDED:
- display_times.append(time)
- else:
- # Stop looking once the list has reached too far in the past
- break
-
- times_available_str = ",".join(
- [time.isoformat() + "Z" for time in reversed(display_times)]
- )
-
- else:
- times_available_str = "{min_time}/{max_time}/P3H".format(
- min_time=min_time.isoformat() + "Z",
- max_time=max_time.isoformat() + "Z",
- )
-
- xml += time_dimension_str.format(
- times=times_available_str,
- default_time=default_time.isoformat() + "Z",
- )
+ xml += self.coverage_layer(coverage)
- legend_graphic_width = coverage.layer.legend_graphic_width
- legend_graphic_height = coverage.layer.legend_graphic_height
-
- legend_link = "{base}SERVICE={service}&VERSION={version}&REQUEST=GetLegendGraphic&LAYER={layer}&STYLE=default&FORMAT=image/png; mode=8bit".format(
- base=self.base_url
- if self.base_url.endswith("?")
- else self.base_url + "?",
- service=self.service_type,
- version=self.version,
- layer=coverage.identifier,
- )
-
- # Write the style section
- xml += """ \n"""
- xml += """ \n"""
# end layers list
xml += """ """
diff --git a/setup.py b/setup.py
index 0f079d9..380d362 100755
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@ class PostDevelopCommand(develop):
def run(self):
try:
subprocess.check_call(["pre-commit", "install"])
- except subprocess.CalledProcessError as e:
+ except subprocess.CalledProcessError:
print("Failed to install pre-commit hook")
develop.run(self)