diff --git a/.devcontainer/dev_container.dockerfile b/.devcontainer/dev_container.dockerfile
index 07e7dd0..741b275 100644
--- a/.devcontainer/dev_container.dockerfile
+++ b/.devcontainer/dev_container.dockerfile
@@ -1,5 +1,5 @@
# Base image for the development container
-ARG BASE_URL=python:3.8-slim
+ARG BASE_URL=python:3.11-slim
FROM ${BASE_URL}
USER root
diff --git a/.github/workflows/github-python-workflow.yml b/.github/workflows/github-python-workflow.yml
index 9f09f67..69a4f81 100644
--- a/.github/workflows/github-python-workflow.yml
+++ b/.github/workflows/github-python-workflow.yml
@@ -21,7 +21,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v3
with:
- python-version: "3.8"
+ python-version: "3.11"
- name: Install dependencies
run: |
@@ -64,7 +64,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v3
with:
- python-version: "3.8"
+ python-version: "3.11"
- name: Install dependencies
run: |
diff --git a/example/app.py b/example/app.py
index 10f26a5..8394228 100755
--- a/example/app.py
+++ b/example/app.py
@@ -13,6 +13,9 @@
import podpac
import numpy as np
+# Setup new dimension
+podpac.core.coordinates.utils.add_valid_dimension("forecastOffsetHr")
+
# create some podpac nodes
data = np.random.default_rng(1).random((11, 21))
lat = np.linspace(90, -90, 11)
@@ -23,12 +26,19 @@
data2 = np.random.default_rng(1).random((11, 21))
node2 = podpac.data.Array(source=data2, coordinates=coords)
+time = np.array(["2025-10-24T12:00:00"], dtype="datetime64")
+offsets = [np.timedelta64(0, "h")]
+coords = podpac.Coordinates([lat, lon, time, offsets], dims=["lat", "lon", "time", "forecastOffsetHr"])
+data3 = np.random.default_rng(1).random((11, 21, 1, 1))
+node3 = podpac.data.Array(source=data3, coordinates=coords)
+
# use podpac nodes to create some OGC layers
layer1 = pogc.Layer(
node=node1,
identifier="layer1",
title="OGC/POPAC layer containing random data",
abstract="This layer contains some random data",
+ group="Layers",
)
layer2 = pogc.Layer(
@@ -37,9 +47,19 @@
title="FOUO: Another OGC/POPAC layer containing random data",
abstract="Marked as FOUO. This layer contains some random data. Same coordinates as layer1, but different values.",
is_fouo=True,
+ group="Layers",
+)
+
+layer3 = pogc.Layer(
+ node=node3,
+ identifier="layer3",
+ title="OGC/POPAC layer containing random data with time instances available.",
+ abstract="This layer contains some random data with time instances available.",
+ group="Layers",
+ valid_times=[dt.astype(datetime) for dt in time],
)
-all_layers = [layer1, layer2]
+all_layers = [layer1, layer2, layer3]
non_fouo_layers = [layer for layer in all_layers if not layer.is_fouo]
# create a couple of different ogc endpoints
@@ -69,6 +89,13 @@ def api_home(endpoint):
WMS GetLegend Example (PNG) (v1.3.0)
+ EDR: Open Geospatial Consortium (OGC) Environmental Data Retrieval (EDR) (v1.0.1)
+
+
"""
diff --git a/ogc/__init__.py b/ogc/__init__.py
index 7e130a5..b8aa1a1 100755
--- a/ogc/__init__.py
+++ b/ogc/__init__.py
@@ -1,5 +1,5 @@
"""
-OGC WMS/WCS (v1.3.0/v1.0.0) server
+OGC WMS/WCS (v1.3.0/v1.0.0) server
"""
import traitlets as tl
@@ -70,10 +70,9 @@ class Layer(tl.HasTraits):
identifier = tl.Unicode()
title = tl.Unicode(default_value="An OGC Layer")
abstract = tl.Unicode(default_value="This is an example OGC Layer")
+ group = tl.Unicode(default_value="Default")
is_fouo = tl.Bool(default_value=False)
- grid_coordinates = tl.Instance(
- klass=GridCoordinates, default_value=GridCoordinates()
- )
+ grid_coordinates = tl.Instance(klass=GridCoordinates, default_value=GridCoordinates())
valid_times = tl.List(
trait=tl.Instance(datetime.datetime),
default_value=tl.Undefined,
@@ -101,9 +100,7 @@ def __init__(self, *args, **kwargs):
elif "title" in kwargs:
string_repr = kwargs["title"]
if "is_enumerated" in kwargs:
- self._style = Style(
- string_repr=string_repr, is_enumerated=kwargs["is_enumerated"]
- )
+ self._style = Style(string_repr=string_repr, is_enumerated=kwargs["is_enumerated"])
else:
self._style = Style(string_repr=string_repr)
if self.valid_times is not tl.Undefined:
diff --git a/ogc/core.py b/ogc/core.py
index b9f04b8..570188b 100755
--- a/ogc/core.py
+++ b/ogc/core.py
@@ -14,6 +14,7 @@
from . import wcs_response_1_0_0
from . import wms_response_1_3_0
from . import ogc_common
+from .edr import EdrRoutes
from ogc.ogc_common import WCSException
@@ -62,6 +63,7 @@ def __init__(self, layers=[], **kwargs):
service_abstract=self.service_abstract,
service_group_title=self.service_group_title,
)
+ self.edr_routes = EdrRoutes(base_url=f"{self.server_address}{self.endpoint}/edr", layers=layers)
def get_coverage_from_id(self, identifier):
for coverage in self.wcs_capabilities.coverages:
diff --git a/ogc/edr/__init__.py b/ogc/edr/__init__.py
new file mode 100644
index 0000000..68fde54
--- /dev/null
+++ b/ogc/edr/__init__.py
@@ -0,0 +1,3 @@
+from .edr_routes import EdrRoutes
+
+__all__ = ["EdrRoutes"]
diff --git a/ogc/edr/config/default.json b/ogc/edr/config/default.json
new file mode 100644
index 0000000..c827b04
--- /dev/null
+++ b/ogc/edr/config/default.json
@@ -0,0 +1,50 @@
+{
+ "server": {
+ "mimetype": "application/json; charset=UTF-8",
+ "encoding": "utf-8",
+ "language": "en-US",
+ "cors": true,
+ "pretty_print": true,
+ "limits": {
+ "default_items": 50,
+ "max_items": 1000,
+ "max_distance_x": 999999999999,
+ "max_distance_y": 999999999999,
+ "max_distance_units": "km",
+ "on_exceed": "error"
+ },
+ "admin": false,
+ "map": {
+ "url": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
+ "attribution": "'© OpenStreetMap contributors'"
+ }
+ },
+ "logging": {
+ "level": "ERROR"
+ },
+ "metadata": {
+ "identification": {
+ "title": "OGC Server",
+ "description": "An example OGC Server",
+ "keywords": [
+ "geospatial",
+ "podpac"
+ ],
+ "keywords_type": "theme",
+ "terms_of_service": "http://www.apache.org/licenses/LICENSE-2.0",
+ "url": "https://github.com/creare-com/ogc"
+ },
+ "license": {
+ "name": "Apache 2.0 license",
+ "url": "http://www.apache.org/licenses/LICENSE-2.0"
+ },
+ "provider": {
+ "name": "Creare LLC",
+ "url": "https://github.com/creare-com"
+ },
+ "contact": {
+ "name": "Creare LLC",
+ "url": "https://github.com/creare-com"
+ }
+ }
+}
\ No newline at end of file
diff --git a/ogc/edr/edr_config.py b/ogc/edr/edr_config.py
new file mode 100644
index 0000000..100b262
--- /dev/null
+++ b/ogc/edr/edr_config.py
@@ -0,0 +1,202 @@
+import os
+import json
+import numpy as np
+import traitlets as tl
+from datetime import datetime
+from collections import defaultdict
+from typing import List, Dict, Tuple, Any
+from ogc import podpac as pogc
+from .. import settings
+
+
+class EdrConfig:
+ """Defines the configuration for the pygeoapi based server.
+
+ This configuration is used to replace the typical YAML based configurations in order to provide dynamic properties.
+ """
+
+ @staticmethod
+ def get_configuration(base_url: str, layers: List[pogc.Layer]) -> Dict[str, Any]:
+ """Generate the configuration for the API.
+
+ Parameters
+ ----------
+ base_url : str
+ The base URL for the EDR endpoints.
+ layers : List[pogc.Layer]
+ The layers which define the data sources for the EDR server.
+
+ Returns
+ -------
+ Dict[str, Any]
+ The configuration for the API as a dictionary.
+ """
+ configuration_path = settings.EDR_CONFIGURATION_PATH
+ if configuration_path is None:
+ configuration_path = os.path.abspath(os.path.join(os.path.dirname(__file__) + "/config/default.json"))
+
+ configuration = {}
+ with open(configuration_path) as f:
+ configuration = json.load(f)
+
+ # Add default static files with an absolute path
+ server = configuration.get("server", {})
+ configuration["server"] = server | {
+ "templates": {
+ "path": os.path.abspath(os.path.join(os.path.dirname(__file__) + "/templates/")),
+ "static": os.path.abspath(os.path.join(os.path.dirname(__file__) + "/static/")),
+ }
+ }
+ configuration["server"]["url"] = base_url
+
+ # Add the data resources and provider information
+ resources = configuration.get("resources", {})
+ configuration["resources"] = resources | EdrConfig._resources_definition(layers)
+
+ return configuration
+
+ @staticmethod
+ def _resources_definition(layers: List[pogc.Layer]) -> Dict[str, Any]:
+ """Define resource related data for the configuration.
+
+ The resources dictionary holds the information needed to generate the collections.
+ Each group is mapped to a collection with the layers in the group forming the collection parameters.
+ The custom provider is specified with a data value of the group name.
+ This allows for the provider to generate the collection data for each group.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ The layers which define the data sources for the EDR server.
+
+ Returns
+ -------
+ Dict[str, Any]
+ The resources configuration for the API as a dictionary.
+ """
+
+ resources = {}
+ groups = defaultdict(list)
+
+ # Organize the data into groups
+ for layer in layers:
+ groups[layer.group].append(layer)
+
+ # Generate collection resources based on groups
+ for group_name, group_layers in groups.items():
+ resource = {
+ group_name: {
+ "type": "collection",
+ "visibility": "default",
+ "title": group_name,
+ "description": f"Collection of data related to {group_name}",
+ "keywords": ["podpac"],
+ "extents": EdrConfig._generate_extents(group_layers),
+ "providers": [
+ {
+ "type": "edr",
+ "default": True,
+ "name": "ogc.edr.edr_provider.EdrProvider",
+ "data": group_name,
+ "layers": group_layers,
+ "crs": [
+ "https://www.opengis.net/def/crs/OGC/1.3/CRS84",
+ "https://www.opengis.net/def/crs/EPSG/0/4326",
+ ],
+ "format": {
+ "name": "geotiff",
+ "mimetype": "image/tiff",
+ },
+ }
+ ],
+ }
+ }
+ resources.update(resource)
+
+ return resources
+
+ @staticmethod
+ def _generate_extents(layers: List[pogc.Layer]) -> Dict[str, Any]:
+ """Generate the extents dictionary for provided layers.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ The layers to create the temporal and spatial extents for.
+
+ Returns
+ -------
+ Dict[str, Any]
+ The extents dictionary for the layers.
+ """
+ llc_lon, llc_lat, urc_lon, urc_lat = None, None, None, None
+ min_time, max_time = None, None
+ time_range = None
+ # Determine bounding box which holds all layers
+ for layer in layers:
+ llc_lon_tmp, llc_lat_tmp, urc_lon_tmp, urc_lat_tmp = EdrConfig._wgs84_bounding_box(layer)
+ if any(coord is None for coord in [llc_lon, llc_lat, urc_lon, urc_lat]):
+ llc_lon, llc_lat, urc_lon, urc_lat = llc_lon_tmp, llc_lat_tmp, urc_lon_tmp, urc_lat_tmp
+ else:
+ llc_lon = min(llc_lon, llc_lon_tmp)
+ llc_lat = min(llc_lat, llc_lat_tmp)
+ urc_lon = max(urc_lon, urc_lon_tmp)
+ urc_lat = max(urc_lat, urc_lat_tmp)
+
+ if hasattr(layer, "valid_times") and layer.valid_times is not tl.Undefined and len(layer.valid_times) > 0:
+ layer_min_time = np.min(layer.valid_times)
+ layer_max_time = np.max(layer.valid_times)
+ if any(time is None for time in [min_time, max_time]):
+ min_time = layer_min_time
+ max_time = layer_max_time
+ else:
+ min_time = min(min_time, layer_min_time)
+ max_time = max(max_time, layer_max_time)
+
+ time_range = [
+ min_time.isoformat(),
+ max_time.isoformat(),
+ ]
+
+ return {
+ "spatial": {
+ "bbox": [llc_lon, llc_lat, urc_lon, urc_lat], # minx, miny, maxx, maxy
+ "crs": "https://www.opengis.net/def/crs/OGC/1.3/CRS84",
+ },
+ **(
+ {
+ "temporal": {
+ "begin": datetime.fromisoformat(time_range[0]), # start datetime in RFC3339
+ "end": datetime.fromisoformat(time_range[-1]), # end datetime in RFC3339
+ "trs": "https://www.opengis.net/def/uom/ISO-8601/0/Gregorian",
+ }
+ }
+ if time_range is not None
+ else {}
+ ),
+ }
+
+ @staticmethod
+ def _wgs84_bounding_box(layer: pogc.Layer) -> Tuple[float, float, float, float]:
+ """Retrieve the bounding box for the layer with a default fallback.
+
+ Parameters
+ ----------
+ layer : pogc.Layer
+ The layer from which to get the bounding box coordinates.
+
+ Returns
+ -------
+ Tuple[float, float, float, float]
+ Lower-left longitude, lower-left latitude, upper-right longitude, upper-right latitude.
+ """
+ try:
+ return (
+ layer.grid_coordinates.LLC.lon,
+ layer.grid_coordinates.LLC.lat,
+ layer.grid_coordinates.URC.lon,
+ layer.grid_coordinates.URC.lat,
+ )
+ except Exception:
+ crs_extents = settings.EDR_CRS["crs:84"]
+ return (crs_extents["minx"], crs_extents["miny"], crs_extents["maxx"], crs_extents["maxy"])
diff --git a/ogc/edr/edr_provider.py b/ogc/edr/edr_provider.py
new file mode 100644
index 0000000..897d12a
--- /dev/null
+++ b/ogc/edr/edr_provider.py
@@ -0,0 +1,741 @@
+import base64
+import io
+import numpy as np
+import zipfile
+import traitlets as tl
+from datetime import datetime
+from typing import List, Dict, Tuple, Any
+from shapely.geometry.base import BaseGeometry
+from pygeoapi.provider.base import ProviderConnectionError, ProviderInvalidQueryError
+from pygeoapi.provider.base_edr import BaseEDRProvider
+from ogc import podpac as pogc
+import podpac
+
+from .. import settings
+
+
+class EdrProvider(BaseEDRProvider):
+ """Custom provider to be used with layer data sources."""
+
+ def __init__(self, provider_def: Dict[str, Any]):
+ """Construct the provider using the provider definition.
+
+ Parameters
+ ----------
+ provider_def : Dict[str, Any]
+ The provider configuration definition.
+
+ Raises
+ ------
+ ProviderConnectionError
+ Raised if the specified collection is not found within any layers.
+ ProviderConnectionError
+ Raised if the provider does not specify any data sources.
+ """
+ super().__init__(provider_def)
+ collection_id = provider_def.get("data", None)
+ if collection_id is None:
+ raise ProviderConnectionError("Data not found.")
+
+ self.collection_id = str(collection_id)
+
+ self.layers = provider_def.get("layers", [])
+ if len(self.layers) == 0:
+ raise ProviderConnectionError("Valid data sources not found.")
+
+ @property
+ def parameters(self) -> Dict[str, pogc.Layer]:
+ """The parameters which are defined in a given collection.
+
+ The parameters map to the layers which are a part of the group, with keys of the layer identifiers.
+
+ Returns
+ -------
+ Dict[str, pogc.Layer]
+ The parameters as a dictionary of layer identifiers and layer objects.
+ """
+ return {layer.identifier: layer for layer in self.layers if layer.group == self.collection_id}
+
+ def handle_query(self, requested_coordinates: podpac.Coordinates, **kwargs):
+ """Handle the requests to the EDR server at the specified requested coordinates.
+ The coordinates are expected to be latitude and longitude values determined by the specific query function.
+
+ Parameters
+ ----------
+ requested_coordinates : podpac.Coordinates
+ The coordinates for evaluation, it is expected that the coordinates passed in only hold lat and lon.
+ instance : str
+ The time instance for the request.
+ select_properties : List[str]
+ The selected properties (parameters) for the request.
+ format_ : str
+ The requested output format of the data.
+ datetime_ : str
+ The requested datetime/datetimes for data retrieval.
+ z : str
+ The requested vertical level/levels for data retrieval.
+
+ Returns
+ -------
+ Any
+ Coverage data as a dictionary of CoverageJSON or native format.
+
+ Raises
+ ------
+ ProviderInvalidQueryError
+ Raised if a datetime string is provided but cannot be interpreted.
+ ProviderInvalidQueryError
+ Raised if an altitude string is provided but cannot be interpreted.
+ ProviderInvalidQueryError
+ Raised if the parameters are invalid.
+ ProviderInvalidQueryError
+ Raised if an instance is provided and it is invalid.
+ ProviderInvalidQueryError
+ Raised if native coordinates could not be found.
+ ProviderInvalidQueryError
+ Raised if the request queries for native coordinates exceeding the max allowable size.
+ """
+ instance = kwargs.get("instance")
+ requested_parameters = kwargs.get("select_properties")
+ output_format = kwargs.get("format_")
+ datetime_arg = kwargs.get("datetime_")
+ z_arg = kwargs.get("z")
+
+ output_format = str(output_format).lower()
+ available_times = self.get_datetimes(list(self.parameters.values()))
+ available_altitudes = self.get_altitudes(list(self.parameters.values()))
+ time_coords = self.interpret_time_coordinates(available_times, datetime_arg, requested_coordinates.crs)
+ altitude_coords = self.interpret_altitude_coordinates(available_altitudes, z_arg, requested_coordinates.crs)
+ # Allow parameters without case-sensitivity
+ parameters_lower = [param.lower() for param in requested_parameters or []]
+ parameters_filtered = {
+ key: value
+ for key, value in self.parameters.items()
+ if key.lower() in parameters_lower and value is not None
+ }
+
+ self.check_query_condition(datetime_arg is not None and time_coords is None, "Invalid datetime provided.")
+ self.check_query_condition(z_arg is not None and altitude_coords is None, "Invalid altitude provided.")
+ self.check_query_condition(len(parameters_filtered) == 0, "Invalid parameters provided.")
+ self.check_query_condition(
+ instance is not None and not self.validate_datetime(instance), "Invalid instance time provided."
+ )
+
+ if time_coords is not None:
+ requested_coordinates = podpac.coordinates.merge_dims([time_coords, requested_coordinates])
+ if altitude_coords is not None:
+ requested_coordinates = podpac.coordinates.merge_dims([altitude_coords, requested_coordinates])
+
+ # Handle defining native coordinates for the query, these should match between each layer
+ coordinates_list = next(iter(parameters_filtered.values())).node.find_coordinates()
+
+ self.check_query_condition(len(coordinates_list) == 0, "Native coordinates not found.")
+
+ requested_native_coordinates = self.get_native_coordinates(
+ requested_coordinates, coordinates_list[0], np.datetime64(instance)
+ )
+
+ self.check_query_condition(
+ requested_native_coordinates.size > settings.MAX_GRID_COORDS_REQUEST_SIZE,
+ "Grid coordinates x_size * y_size must be less than %d" % settings.MAX_GRID_COORDS_REQUEST_SIZE,
+ )
+
+ dataset = {}
+ for requested_parameter, layer in parameters_filtered.items():
+ units_data_array = layer.node.eval(requested_native_coordinates)
+ dataset[requested_parameter] = units_data_array
+
+ self.check_query_condition(len(dataset) == 0, "No matching parameters found.")
+
+ # Return a coverage json if specified, else return Base64 encoded native response
+ if output_format == "json" or output_format == "coveragejson":
+ crs = self.interpret_crs(requested_native_coordinates.crs if requested_native_coordinates else None)
+ return self.to_coverage_json(self.layers, dataset, crs)
+ else:
+ return self.to_geotiff_response(dataset, self.collection_id)
+
+ def position(self, **kwargs):
+ """Handles requests for the position query type.
+
+ Parameters
+ ----------
+ wkt : shapely.geometry
+ WKT geometry
+ crs : str
+ The requested CRS for the return coordinates and data.
+
+ Returns
+ -------
+ Any
+ Coverage data as a dictionary of CoverageJSON or native format.
+
+ Raises
+ ------
+ ProviderInvalidQueryError
+ Raised if the wkt string is not provided.
+ ProviderInvalidQueryError
+ Raised if the wkt string is an unknown type.
+ """
+ lat, lon = [], []
+ wkt = kwargs.get("wkt")
+ crs = kwargs.get("crs")
+ crs = EdrProvider.interpret_crs(crs)
+
+ if not isinstance(wkt, BaseGeometry):
+ raise ProviderInvalidQueryError("Invalid wkt provided.")
+ elif wkt.geom_type == "Point":
+ lon, lat = EdrProvider.crs_converter([wkt.x], [wkt.y], crs)
+ else:
+ raise ProviderInvalidQueryError("Unknown WKT Type (Use Point).")
+
+ requested_coordinates = podpac.Coordinates([lat, lon], dims=["lat", "lon"], crs=crs)
+
+ return self.handle_query(requested_coordinates, **kwargs)
+
+ def cube(self, **kwargs):
+ """Handles requests for the cube query type.
+
+ Parameters
+ ----------
+ bbox : List[float]
+ Bbox geometry (for cube queries)
+ crs : str
+ The requested CRS for the return coordinates and data.
+
+ Returns
+ -------
+ Any
+ Coverage data as a dictionary of CoverageJSON or native format.
+
+ Raises
+ ------
+ ProviderInvalidQueryError
+ Raised if the bounding box is invalid.
+ """
+ bbox = kwargs.get("bbox")
+ crs = kwargs.get("crs")
+ crs = EdrProvider.interpret_crs(crs)
+
+ if not isinstance(bbox, List) or len(bbox) != 4:
+ raise ProviderInvalidQueryError("Invalid bounding box provided.")
+
+ xmin, ymin, xmax, ymax = bbox
+ lon, lat = EdrProvider.crs_converter([xmin, xmax], [ymin, ymax], crs)
+
+ requested_coordinates = podpac.Coordinates([lat, lon], dims=["lat", "lon"], crs=crs)
+
+ return self.handle_query(requested_coordinates, **kwargs)
+
+ def area(self, **kwargs):
+ """Handles requests for the area query type.
+
+ Parameters
+ ----------
+ wkt : shapely.geometry
+ WKT geometry
+ crs : str
+ The requested CRS for the return coordinates and data.
+
+ Returns
+ -------
+ Any
+ Coverage data as a dictionary of CoverageJSON or native format.
+
+ Raises
+ ------
+ ProviderInvalidQueryError
+ Raised if the wkt string is not provided.
+ ProviderInvalidQueryError
+ Raised if the wkt string is an unknown type.
+ """
+ lat, lon = [], []
+ wkt = kwargs.get("wkt")
+ crs = kwargs.get("crs")
+ crs = EdrProvider.interpret_crs(crs)
+
+ if not isinstance(wkt, BaseGeometry):
+ raise ProviderInvalidQueryError("Invalid wkt provided.")
+ elif wkt.geom_type == "Polygon":
+ lon, lat = EdrProvider.crs_converter(wkt.exterior.xy[0], wkt.exterior.xy[1], crs)
+ else:
+ raise ProviderInvalidQueryError("Unknown WKT Type (Use Polygon).")
+
+ requested_coordinates = podpac.Coordinates([lat, lon], dims=["lat", "lon"], crs=crs)
+
+ return self.handle_query(requested_coordinates, **kwargs)
+
+ def get_instance(self, instance: str) -> str | None:
+ """Validate instance identifier.
+
+ Parameters
+ ----------
+ instance : str
+ The instance identifier to validate.
+
+ Returns
+ -------
+ str
+ The instance identifier if valid, otherwise returns None.
+ """
+ return instance if instance in self.instances() else None
+
+ def instances(self, **kwargs) -> List[str]:
+ """The instances in the collection.
+
+ Returns
+ -------
+ List[str]
+ The instances available in the collection.
+ """
+ instances = set()
+ for layer in self.layers:
+ if layer.group == self.collection_id:
+ instances.update(layer.time_instances())
+ return list(instances)
+
+ def get_fields(self) -> Dict[str, Any]:
+ """The observed property fields (parameters) in the collection.
+
+ Returns
+ -------
+ Dict[str, Any]
+ The fields based on the available parameters.
+ """
+ fields = {}
+ for parameter_key, layer in self.parameters.items():
+ units = layer.node.units if layer.node.units is not None else layer.node.style.units
+ fields[parameter_key] = {
+ "type": "number",
+ "title": parameter_key,
+ "x-ogc-unit": units,
+ }
+ return fields
+
+ @staticmethod
+ def get_altitudes(layers: List[pogc.Layer]) -> List[float]:
+ """The list of available altitudes for the provided layers.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ The list of layers to determine altitudes for.
+
+ Returns
+ -------
+ List[float]
+ Available altitudes for the provider layers.
+ """
+
+ available_altitudes = set()
+ for layer in layers:
+ coordinates_list = layer.node.find_coordinates()
+ if len(coordinates_list) > 0 and "alt" in coordinates_list[0].udims:
+ available_altitudes.update(coordinates_list[0]["alt"].coordinates)
+
+ return list(available_altitudes)
+
+ @staticmethod
+ def get_datetimes(layers: List[pogc.Layer]) -> List[str]:
+ """The list of available times for the provided layers.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ The list of layers to determine datetimes for.
+
+ Returns
+ -------
+ List[str]
+ Available time strings for the provider layers.
+ """
+
+ available_times = set()
+ for layer in layers:
+ if hasattr(layer, "valid_times") and layer.valid_times is not tl.Undefined and len(layer.valid_times) > 0:
+ available_times.update(layer.valid_times)
+
+ return list(available_times)
+
+ @staticmethod
+ def interpret_crs(crs: str | None) -> str:
+ """Interpret the CRS id string into a valid PyProj CRS format.
+
+ If None provided, return the default.
+ If the provided CRS is invalid, raise an error.
+
+ Parameters
+ ----------
+ crs : str
+ The input CRS id string which needs to be validated/converted.
+
+ Returns
+ -------
+ str
+ Pyproj CRS string.
+
+ Raises
+ ------
+ ProviderInvalidQueryError
+ Raised if the provided CRS string is unknown.
+ """
+ if crs is None or crs.lower() == "crs:84":
+ return settings.crs_84_pyproj_format # Pyproj acceptable format
+
+ if crs.lower() not in [key.lower() for key in settings.EDR_CRS.keys()]:
+ raise ProviderInvalidQueryError("Invalid CRS provided.")
+
+ return crs
+
+ @staticmethod
+ def crs_converter(x: Any, y: Any, crs: str) -> Tuple[Any, Any]:
+ """Convert the X, Y data to Longitude, Latitude data with the provided crs.
+
+ Parameters
+ ----------
+ x : Any
+ X data in any form.
+ y : Any
+ Y data in any form.
+ crs : str
+ The input CRS id string to apply to convert the X,Y data.
+
+ Returns
+ -------
+ Tuple[Any, Any]
+ The X,Y as Longitude/Latitude data.
+ """
+ if crs.lower() == "epsg:4326":
+ return (y, x)
+
+ return (x, y)
+
+ @staticmethod
+ def interpret_altitude_coordinates(
+ available_altitudes: List[float], altitude_string: str | None, crs: str | None
+ ) -> podpac.Coordinates | None:
+ """Interpret the string into altitude coordinates using known formats.
+
+ Specification:
+ single-level = level
+ interval-closed = min-level "/" max-level
+ repeating-interval = "R"number of intervals "/" min-level "/" height to increment by
+ level-list = level1 "," level2 "," level3
+
+ Parameters
+ ----------
+ available_altitudes: List[float]
+ The available altitudes for interpretation.
+ altitude_string : str | None
+ The string representation of the requested altitudes.
+ crs : str
+ The CRS that the coordinates need to match.
+
+ Returns
+ -------
+ podpac.Coordinates | None
+ Altitude coordinates for the request or None if conversion fails.
+ """
+ if not altitude_string or len(available_altitudes) == 0:
+ return None
+
+ try:
+ altitudes = None
+ if "/" in altitude_string:
+ altitudes_split = altitude_string.split("/")
+ if len(altitudes_split) == 2:
+ minimum = float(altitudes_split[0])
+ maximum = float(altitudes_split[1])
+ altitudes = [alt for alt in available_altitudes if minimum <= alt <= maximum]
+ if len(altitudes_split) == 3:
+ if altitudes_split[0].startswith("R"):
+ altitudes = float(altitudes_split[1]) + np.arange(float(altitudes_split[0][1:])) * float(
+ altitudes_split[2]
+ )
+ else:
+ altitudes = [float(alt) for alt in altitude_string.split(",")]
+ except ValueError:
+ return None
+
+ return podpac.Coordinates([altitudes], dims=["alt"], crs=crs) if altitudes is not None else None
+
+ @staticmethod
+ def interpret_time_coordinates(
+ available_times: List[str], time_string: str | None, crs: str | None
+ ) -> podpac.Coordinates | None:
+ """Interpret the string into a list of times using known formats.
+
+ Specification:
+ interval-closed = date-time "/" date-time
+ interval-open-start = "../" date-time
+ interval-open-end = date-time "/.."
+ interval = interval-closed / interval-open-start / interval-open-end
+ datetime = date-time / interval
+
+ Parameters
+ ----------
+ available_times: List[str]
+ The available times for interpretation.
+ time_string : str | None
+ The string representation of the requested times.
+ crs : str
+ The CRS that the coordinates need to match.
+
+ Returns
+ -------
+ podpac.Coordinates | None
+ Time coordinates for the request or None if conversion fails.
+ """
+
+ if not time_string or len(available_times) == 0:
+ return None
+
+ try:
+ times = None
+ np_available_times = [np.datetime64(time) for time in available_times]
+ if "/" in time_string:
+ times_split = time_string.split("/")
+ if len(times_split) == 2:
+ minimum = times_split[0]
+ maximum = times_split[1]
+ if minimum == "..":
+ times = [time for time in np_available_times if time <= np.datetime64(maximum)]
+ elif maximum == "..":
+ times = [time for time in np_available_times if time >= np.datetime64(minimum)]
+ else:
+ times = [
+ time
+ for time in np_available_times
+ if np.datetime64(minimum) <= time <= np.datetime64(maximum)
+ ]
+ else:
+ times = [np.datetime64(time_string)]
+ except ValueError:
+ return None
+
+ return podpac.Coordinates([times], dims=["time"], crs=crs) if times is not None else None
+
+ @staticmethod
+ def to_coverage_json(
+ layers: List[pogc.Layer], dataset: Dict[str, podpac.UnitsDataArray], crs: str
+ ) -> Dict[str, Any]:
+ """Generate a CoverageJSON of the data for the provided parameters.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers which were used in the dataset creation for metadata information.
+ dataset : Dict[str, podpac.UnitsDataArray]
+ Data in an units data array format with matching parameter key.
+ crs : str
+ The CRS associated with the requested coordinates and data response.
+
+ Returns
+ -------
+ Dict[str, Any]
+ A dictionary of the CoverageJSON data.
+ """
+
+ # Determine the bounding coordinates, assume they all are the same
+ coordinates = next(iter(dataset.values())).coords
+ x_arr, y_arr = EdrProvider.crs_converter(coordinates["lon"].values, coordinates["lat"].values, crs)
+
+ coverage_json = {
+ "type": "Coverage",
+ "domain": {
+ "type": "Domain",
+ "domainType": "Grid",
+ "axes": {
+ "x": {
+ "start": x_arr[0],
+ "stop": x_arr[-1],
+ "num": len(x_arr),
+ },
+ "y": {
+ "start": y_arr[0],
+ "stop": y_arr[-1],
+ "num": len(y_arr),
+ },
+ },
+ "referencing": [
+ {
+ "coordinates": ["lon", "lat"],
+ "system": {"type": "GeographicCRS", "id": crs},
+ },
+ {
+ "coordinates": ["t"],
+ "system": {
+ "type": "TemporalRS",
+ "calendar": "Gregorian",
+ },
+ },
+ ],
+ "parameters": {},
+ "ranges": {},
+ },
+ }
+ if "time" in coordinates.dims:
+ coverage_json["domain"]["axes"]["t"] = {
+ "values": [
+ time.astype("datetime64[ms]").astype(datetime).isoformat() + "Z"
+ for time in coordinates["time"].values
+ ]
+ }
+
+ for param, data_array in dataset.items():
+ layer = next(layer for layer in layers if layer.identifier == param)
+ units = layer.node.units if layer.node.units is not None else layer.node.style.units
+ parameter_definition = {
+ param: {
+ "type": "Parameter",
+ "observedProperty": {
+ "id": param,
+ "label": layer.title,
+ "description": {
+ "en": layer.abstract,
+ },
+ },
+ "unit": {
+ "label": {"en": units},
+ "symbol": {
+ "value": units,
+ "type": None,
+ },
+ },
+ }
+ }
+ coverage_json["domain"]["parameters"].update(parameter_definition)
+ coverage_json["domain"]["ranges"].update(
+ {
+ param: {
+ "type": "NdArray",
+ "dataType": "float",
+ "axisNames": list(data_array.coords.keys()),
+ "shape": data_array.shape,
+ "values": list(data_array.values.flatten()), # Row Major Order
+ }
+ }
+ )
+
+ return coverage_json
+
+ @staticmethod
+ def check_query_condition(conditional: bool, message: str):
+ """Check the provided conditional and raise a ProviderInvalidQueryError if true.
+
+ Parameters
+ ----------
+ conditional : bool
+ The conditional value to check for raising a query error.
+ message : str
+ The message to include if the query error is raised.
+
+ Raises
+ ------
+ ProviderInvalidQueryError
+ Raised if the conditional provided is true.
+ """
+ if conditional:
+ raise ProviderInvalidQueryError(message)
+
+ @staticmethod
+ def validate_datetime(datetime_string: str) -> bool:
+ """Validate whether a string can be converted to a numpy datetime.
+
+ Parameters
+ ----------
+ datetime_string : str
+ The datetime string to be validated.
+
+ Returns
+ -------
+ bool
+ Whether the datetime string can be converted to a numpy datetime.
+ """
+ try:
+ np.datetime64(datetime_string)
+ return True
+ except ValueError:
+ return False
+
+ @staticmethod
+ def to_geotiff_response(dataset: Dict[str, podpac.UnitsDataArray], collection_id: str) -> Dict[str, Any]:
+ """Generate a geotiff of the data for the provided parameters.
+
+ Parameters
+ ----------
+ dataset : Dict[str, podpac.UnitsDataArray]
+ Data in an units data array format with matching parameter key.
+ collection_id : str
+ The collection id of the data used in naming the zip file if needed.
+
+ Returns
+ -------
+ Dict[str, Any]
+ A dictionary of the file name and data with a Base64 encoding.
+ """
+ if len(dataset) == 1:
+ units_data_array = next(iter(dataset.values()))
+ geotiff_bytes = units_data_array.to_format("geotiff").read()
+ return {
+ "fp": base64.b64encode(geotiff_bytes).decode("utf-8"),
+ "fn": f"{next(iter(dataset.keys()))}.tif",
+ }
+ else:
+ zip_buffer = io.BytesIO()
+ with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
+ for parameter, data_array in dataset.items():
+ geotiff_memory_file = data_array.to_format("geotiff")
+ tiff_filename = f"{parameter}.tif"
+ zip_file.writestr(tiff_filename, geotiff_memory_file.read())
+
+ zip_buffer.seek(0)
+ return {"fp": base64.b64encode(zip_buffer.read()).decode("utf-8"), "fn": f"{collection_id}.zip"}
+
+ @staticmethod
+ def get_native_coordinates(
+ source_coordinates: podpac.Coordinates,
+ target_coordinates: podpac.Coordinates,
+ source_time_instance: np.datetime64 | None,
+ ) -> podpac.Coordinates:
+ """Find the intersecting coordinates between source and target coordinates.
+ Convert time instances to offsets for node evaluation.
+
+ Parameters
+ ----------
+ source_coordinates : podpac.Coordinates
+ The source coordinates to be converted.
+ target_coordinates : podpac.Coordinates
+ The target coordinates to find intersections on.
+ source_time_instance : np.datetime64 | None
+ The time instance of the source coordinates to convert to offsets.
+
+ Returns
+ -------
+ podpac.Coordinates
+ The converted source coordinates intersecting with the target coordinates.
+ """
+ # Handle conversion from times and instance to time and offsets
+ if (
+ "forecastOffsetHr" in target_coordinates.udims
+ and "time" in target_coordinates.udims
+ and "time" in source_coordinates.udims
+ and source_time_instance is not None
+ ):
+ time_deltas = []
+ for time in source_coordinates["time"].coordinates:
+ offset = np.timedelta64(time - source_time_instance, "h")
+ time_deltas.append(offset)
+
+ # This modifies the time coordinates to account for the new forecast offset hour
+ new_coordinates = podpac.Coordinates(
+ [[source_time_instance], time_deltas],
+ ["time", "forecastOffsetHr"],
+ crs=source_coordinates.crs,
+ )
+ source_coordinates = podpac.coordinates.merge_dims([source_coordinates.drop("time"), new_coordinates])
+
+ # Find intersections with target keeping source crs
+ source_intersection_coordinates = target_coordinates.intersect(source_coordinates)
+ source_intersection_coordinates = source_intersection_coordinates.transform(source_coordinates.crs)
+
+ return source_intersection_coordinates
diff --git a/ogc/edr/edr_routes.py b/ogc/edr/edr_routes.py
new file mode 100644
index 0000000..1c752da
--- /dev/null
+++ b/ogc/edr/edr_routes.py
@@ -0,0 +1,223 @@
+import os
+import mimetypes
+import json
+import base64
+import io
+import traitlets as tl
+import pygeoapi.l10n
+import pygeoapi.plugin
+import pygeoapi.api
+import pygeoapi.api.environmental_data_retrieval as pygeoedr
+from typing import Tuple, Any, Dict
+from http import HTTPStatus
+from copy import deepcopy
+from pygeoapi.openapi import get_oas
+from ogc import podpac as pogc
+
+from .edr_config import EdrConfig
+
+
+class EdrRoutes(tl.HasTraits):
+ """Class responsible for routing EDR requests to the appropriate pygeoapi API method."""
+
+ base_url = tl.Unicode(default_value="http://127.0.0.1:5000/")
+ layers = tl.List(trait=tl.Instance(pogc.Layer))
+
+ def __init__(self, **kwargs):
+ """Initialize the API based on the available layers."""
+ super().__init__(**kwargs)
+ self.api = self.create_api()
+
+ @tl.observe("layers")
+ def layers_change(self, change: Dict[str, Any]):
+ """Monitor the layers and update the API when a change occurs.
+
+ Parameters
+ ----------
+ change : Dict[str, Any]
+ Dictionary holding type of modification and name of the attribute that triggered it.
+ """
+ self.api = self.create_api()
+
+ def create_api(self) -> pygeoapi.api.API:
+ """Create the pygeoapi API using a custom configuration.
+
+ Returns
+ -------
+ pygeoapi.api.API
+ The API which handles all EDR requests.
+ """
+ # Allow specifying GeoTiff or CoverageJSON in the format argument.
+ # This is a bypass which is needed to get by a conditional check in pygeoapi.
+ pygeoapi.plugin.PLUGINS["formatter"]["GeoTiff"] = ""
+ pygeoapi.plugin.PLUGINS["formatter"]["CoverageJSON"] = ""
+
+ config = EdrConfig.get_configuration(self.base_url, self.layers)
+ open_api = get_oas(config, fail_on_invalid_collection=False)
+ return pygeoapi.api.API(config=deepcopy(config), openapi=open_api)
+
+ def clean_configuration_cache(self):
+ """Clean a pygeoapi internal translation cache so that multiple configurations can be used simultaneously."""
+ pygeoapi.l10n._cfg_cache = {}
+
+ def static_files(self, request: pygeoapi.api.APIRequest, file_path: str) -> Tuple[dict, int, str | bytes]:
+ """Handle static file requests using the custom static file folder or the pygeoapi default folder.
+
+ Parameters
+ ----------
+ file_path : str
+ The file path of the requested static resource.
+
+ Returns
+ -------
+ Tuple[dict, int, str | bytes]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ static_path = os.path.join(os.path.dirname(pygeoapi.__file__), "static")
+ if "templates" in self.api.config["server"]:
+ static_path = self.api.config["server"]["templates"].get("static", static_path)
+ file_path = os.path.join(static_path, file_path)
+ if os.path.isfile(file_path):
+ mime_type, _ = mimetypes.guess_type(file_path)
+ mime_type = mime_type or "application/octet-stream"
+ with open(file_path, "rb") as f:
+ content = f.read()
+ return {"Content-Type": mime_type}, HTTPStatus.OK, content
+ else:
+ return {}, HTTPStatus.NOT_FOUND, b"File not found"
+
+ def landing_page(self, request: pygeoapi.api.APIRequest) -> Tuple[dict, int, str | bytes]:
+ """Handle landing page requests for the server.
+
+ Parameters
+ ----------
+ request : pygeoapi.api.APIRequest
+ The pygeoapi request for the server.
+
+ Returns
+ -------
+ Tuple[dict, int, str | bytes]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ return pygeoapi.api.landing_page(self.api, request)
+
+ def openapi(self, request: pygeoapi.api.APIRequest) -> Tuple[dict, int, str | bytes]:
+ """Handle API documentation requests for the server.
+
+ Parameters
+ ----------
+ request : pygeoapi.api.APIRequest
+ The pygeoapi request for the server.
+
+ Returns
+ -------
+ Tuple[dict, int, str | bytes]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ return pygeoapi.api.openapi_(self.api, request)
+
+ def conformance(self, request: pygeoapi.api.APIRequest) -> Tuple[dict, int, str | bytes]:
+ """Handle conformance requests for the server.
+
+ Parameters
+ ----------
+ request : pygeoapi.api.APIRequest
+ The pygeoapi request for the server.
+
+ Returns
+ -------
+ Tuple[dict, int, str | bytes]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ return pygeoapi.api.conformance(self.api, request)
+
+ def describe_collections(
+ self,
+ request: pygeoapi.api.APIRequest,
+ collection_id: str | None,
+ ) -> Tuple[dict, int, str | bytes]:
+ """Handle describe collection requests for the server.
+
+ Parameters
+ ----------
+ request : pygeoapi.api.APIRequest
+ The pygeoapi request for the server.
+ collection_id : str | None
+ The collection ID to describe.
+
+ Returns
+ -------
+ Tuple[dict, int, str | bytes]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ return pygeoapi.api.describe_collections(self.api, request, collection_id)
+
+ def describe_instances(
+ self,
+ request: pygeoapi.api.APIRequest,
+ collection_id: str,
+ instance_id: str | None,
+ ) -> Tuple[dict, int, str | bytes]:
+ """Handle collection instances requests for the server.
+
+ Parameters
+ ----------
+ request : pygeoapi.api.APIRequest
+ The pygeoapi request for the server.
+ collection_id : str
+ The collection ID for the instances.
+ instance_id: str
+ The instance ID to describe.
+
+ Returns
+ -------
+ Tuple[dict, int, str | bytes]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ return pygeoedr.get_collection_edr_instances(self.api, request, collection_id, instance_id=instance_id)
+
+ def collection_query(
+ self,
+ request: pygeoapi.api.APIRequest,
+ collection_id: str,
+ instance_id: str | None,
+ query_type: str,
+ ) -> Tuple[dict, int, Any]:
+ """Handle collection and instance query requests for the server.
+
+ Parameters
+ ----------
+ request : pygeoapi.api.APIRequest
+ The pygeoapi request for the server.
+ query_type: str
+ The query type for the request.
+ collection_id : str
+ The collection ID for the query.
+ instance_id: str
+ The instance ID for the query.
+
+ Returns
+ -------
+ Tuple[dict, int, Any]
+ Headers, HTTP Status, and Content returned as a tuple to make the server response.
+ """
+ self.clean_configuration_cache()
+ headers, http_status, content = pygeoedr.get_collection_edr_query(
+ self.api, request, collection_id, instance_id, query_type=query_type, location_id=None
+ )
+
+ content = json.loads(content)
+ if "fn" in content and "fp" in content:
+ # Return the file name in the header and the content as only the binary data
+ filename = content["fn"]
+ headers["Content-Disposition"] = f"attachment; filename={filename}"
+ # Decode the content string which is the Base64 representation of the data
+ content = io.BytesIO(base64.b64decode(content["fp"]))
+
+ return headers, http_status, content
diff --git a/ogc/edr/static/img/favicon.ico b/ogc/edr/static/img/favicon.ico
new file mode 100644
index 0000000..1e20f9c
Binary files /dev/null and b/ogc/edr/static/img/favicon.ico differ
diff --git a/ogc/edr/static/img/logo.png b/ogc/edr/static/img/logo.png
new file mode 100644
index 0000000..4b8ebad
Binary files /dev/null and b/ogc/edr/static/img/logo.png differ
diff --git a/ogc/edr/static/img/pygeoapi.png b/ogc/edr/static/img/pygeoapi.png
new file mode 100644
index 0000000..0477111
Binary files /dev/null and b/ogc/edr/static/img/pygeoapi.png differ
diff --git a/ogc/edr/test/conftest.py b/ogc/edr/test/conftest.py
new file mode 100644
index 0000000..b87b3e5
--- /dev/null
+++ b/ogc/edr/test/conftest.py
@@ -0,0 +1,85 @@
+import pytest
+import numpy as np
+import datetime
+import podpac
+from ogc import podpac as pogc
+from typing import Dict, List, Any
+
+# Setup new dimension
+podpac.core.coordinates.utils.add_valid_dimension("forecastOffsetHr")
+
+lat = np.linspace(90, -90, 11)
+lon = np.linspace(-180, 180, 21)
+time = np.array(["2025-10-24T12:00:00"], dtype="datetime64")
+offsets = [np.timedelta64(0, "h")]
+data = np.random.default_rng(1).random((11, 21, 1, 1))
+coords = podpac.Coordinates([lat, lon, time, offsets], dims=["lat", "lon", "time", "forecastOffsetHr"])
+
+# Define test layers using sample data and coordinates
+node1 = podpac.data.Array(source=data, coordinates=coords)
+layer1 = pogc.Layer(
+ node=node1,
+ identifier="layer1",
+ title="Layer 1",
+ abstract="Layer1 Data",
+ group="Layers",
+ valid_times=[dt.astype(datetime.datetime) for dt in time],
+)
+node2 = podpac.data.Array(source=data, coordinates=coords)
+layer2 = pogc.Layer(
+ node=node2,
+ identifier="layer2",
+ title="Layer 2",
+ abstract="Layer2 Data",
+ group="Layers",
+ valid_times=[dt.astype(datetime.datetime) for dt in time],
+)
+
+
+@pytest.fixture()
+def layers() -> List[pogc.Layer]:
+ """List of test layers.
+
+ Returns
+ -------
+ List[pogc.Layer]
+ The test layers.
+ """
+ return [layer1, layer2]
+
+
+@pytest.fixture()
+def single_layer_cube_args() -> Dict[str, Any]:
+ """Dictionary of valid request arguments that align to a single test layer cube request.
+
+ Returns
+ -------
+ Dict[str, Any]
+ Valid cube request arguments for a single test layer.
+ """
+
+ return {
+ "f": "json",
+ "bbox": "-180, -90, 180, 90",
+ "datetime": str(time[0]),
+ "parameter-name": [layer1.identifier],
+ }
+
+
+@pytest.fixture()
+def single_layer_cube_args_internal() -> Dict[str, Any]:
+ """Dictionary of valid arguments that align to a single test layer request with internal pygeoapi keys.
+
+ Returns
+ -------
+ Dict[str, Any]
+ Valid internal cube arguments for a single test layer.
+ """
+
+ return {
+ "format_": "json",
+ "instance": str(time[0]),
+ "bbox": [-180, -90, 180, 90],
+ "datetime_": str(time[0]),
+ "select_properties": [layer1.identifier],
+ }
diff --git a/ogc/edr/test/test_edr_config.py b/ogc/edr/test/test_edr_config.py
new file mode 100644
index 0000000..0a1a710
--- /dev/null
+++ b/ogc/edr/test/test_edr_config.py
@@ -0,0 +1,64 @@
+from typing import Dict, List, Any
+from ogc import podpac as pogc
+from ogc.edr.edr_config import EdrConfig
+
+
+def test_edr_default_configuration_has_required_keys():
+ """Test the EDR default configuration loads the required keys."""
+ configuration = EdrConfig.get_configuration("/ogc", [])
+
+ assert configuration.keys() == {"server", "logging", "metadata", "resources"}
+
+
+def test_edr_configuration_contains_layer_groups(layers: List[pogc.Layer]):
+ """Test the EDR configuration contains the layer groups.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ group_keys = {layer.group for layer in layers}
+ configuration = EdrConfig.get_configuration("/ogc", layers)
+
+ assert len(group_keys) > 0
+ for key in group_keys:
+ assert configuration["resources"].get(key, None) is not None
+
+
+def test_edr_configuration_contains_spatial_extent(layers: List[pogc.Layer], single_layer_cube_args: Dict[str, Any]):
+ """Test the EDR configuration contains the spatial extent.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args : Dict[str, Any]
+ Single layer arguments for validation checking provided by a test fixture.
+ """
+ group_keys = {layer.group for layer in layers}
+ configuration = EdrConfig.get_configuration("/ogc", layers)
+
+ assert len(group_keys) > 0
+ for key in group_keys:
+ assert configuration["resources"][key]["extents"]["spatial"]["bbox"] == list(
+ map(float, single_layer_cube_args["bbox"].split(","))
+ )
+
+
+def test_edr_configuration_contains_custom_provider(layers: List[pogc.Layer]):
+ """Test the EDR configuration contains the custom provider.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ group_keys = {layer.group for layer in layers}
+ configuration = EdrConfig.get_configuration("/ogc", layers)
+
+ assert len(group_keys) > 0
+ for key in group_keys:
+ assert configuration["resources"][key]["providers"][0]["type"] == "edr"
+ assert configuration["resources"][key]["providers"][0]["name"] == "ogc.edr.edr_provider.EdrProvider"
diff --git a/ogc/edr/test/test_edr_provider.py b/ogc/edr/test/test_edr_provider.py
new file mode 100644
index 0000000..a7e89b6
--- /dev/null
+++ b/ogc/edr/test/test_edr_provider.py
@@ -0,0 +1,553 @@
+import pytest
+import numpy as np
+import zipfile
+import base64
+import io
+from shapely import Point, Polygon
+from typing import Dict, List, Any
+from ogc import podpac as pogc
+from ogc.edr.edr_provider import EdrProvider
+from pygeoapi.provider.base import ProviderInvalidQueryError
+
+
+def get_provider_definition(layers: List[pogc.Layer]) -> Dict[str, Any]:
+ """Define the provider definition which is typically handled by pygeoapi.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers for the provider to use in defining available data sources.
+
+ Returns
+ -------
+ Dict[str, Any]
+ The provider definition which defines data sources.
+ """
+ return {
+ "type": "edr",
+ "default": True,
+ "name": "ogc.edr.edr_provider.EdrProvider",
+ "data": "Layers",
+ "layers": layers,
+ "crs": ["https://www.opengis.net/def/crs/OGC/1.3/CRS84", "https://www.opengis.net/def/crs/EPSG/0/4326"],
+ "format": {"name": "GeoJSON", "mimetype": "application/json"},
+ }
+
+
+def test_edr_provider_resources(layers: List[pogc.Layer]):
+ """Test the available resources of the EDR Provider class.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ identifiers = [layer.identifier for layer in layers]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ assert len(provider.layers) == len(layers)
+ assert all(layer.identifier in identifiers for layer in provider.layers)
+
+
+def test_edr_provider_get_instance_valid_id(layers: List[pogc.Layer]):
+ """Test the get_instance method of the EDR Provider class with a valid id.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ time_instance = next(iter(layers[0].time_instances()))
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ assert provider.get_instance(time_instance) == time_instance
+
+
+def test_edr_provider_get_instance_invalid_id(layers: List[pogc.Layer]):
+ """Test the get_instance method of the EDR Provider class with an invalid id.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ assert provider.get_instance("invalid") is None
+
+
+def test_edr_provider_parameter_keys(layers: List[pogc.Layer]):
+ """Test the parameters property of the EDR Provider class.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ identifiers = [layer.identifier for layer in layers]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+ parameters = provider.parameters
+
+ assert len(list(parameters.keys())) == len(layers)
+ assert all(identifier in identifiers for identifier in parameters.keys())
+
+
+def test_edr_provider_instances(layers: List[pogc.Layer]):
+ """Test the instances method of the EDR Provider class.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ instance_sets = [layer.time_instances() for layer in layers]
+ time_instances = set().union(*instance_sets)
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+ instances = provider.instances()
+
+ assert len(instances) == len(time_instances)
+ assert instances == [str(t) for t in time_instances]
+
+
+def test_edr_provider_get_fields(layers: List[pogc.Layer]):
+ """Test the get fields method of the EDR Provider class.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ identifiers = [layer.identifier for layer in layers]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+ fields = provider.get_fields()
+
+ assert len(fields.keys()) == len(layers)
+ assert all(identifier in identifiers for identifier in fields.keys())
+
+
+def test_edr_provider_position_request_valid_wkt(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the position method of the EDR Provider class with a valid WKT.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ del args["bbox"]
+ args["wkt"] = Point(5.2, 52.1)
+ parameter_name = single_layer_cube_args_internal["select_properties"][0]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ response = provider.position(**args)
+
+ assert set(response["domain"]["ranges"][parameter_name]["axisNames"]) == set(
+ layers[0].node.find_coordinates()[0].dims
+ )
+ assert np.prod(np.array(response["domain"]["ranges"][parameter_name]["shape"])) == len(
+ response["domain"]["ranges"][parameter_name]["values"]
+ )
+
+
+def test_edr_provider_position_request_invalid_wkt(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the position method of the EDR Provider class with an invalid WKT.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ del args["bbox"]
+ args["wkt"] = "invalid"
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ with pytest.raises(ProviderInvalidQueryError):
+ provider.position(**args)
+
+
+def test_edr_provider_position_request_invalid_property(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the position method of the EDR Provider class with an invalid property.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ args["select_properties"] = "invalid"
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ with pytest.raises(ProviderInvalidQueryError):
+ provider.position(**args)
+
+
+def test_edr_provider_cube_request_valid_bbox(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the cube method of the EDR Provider class with a valid bounding box.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ parameter_name = single_layer_cube_args_internal["select_properties"][0]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ response = provider.cube(**args)
+
+ assert set(response["domain"]["ranges"][parameter_name]["axisNames"]) == set(
+ layers[0].node.find_coordinates()[0].dims
+ )
+ assert np.prod(np.array(response["domain"]["ranges"][parameter_name]["shape"])) == len(
+ response["domain"]["ranges"][parameter_name]["values"]
+ )
+
+
+def test_edr_provider_cube_request_invalid_bbox(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the cube method of the EDR Provider class with an invalid bounding box.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ args["bbox"] = "invalid"
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ with pytest.raises(ProviderInvalidQueryError):
+ provider.cube(**args)
+
+
+def test_edr_provider_cube_request_invalid_altitude(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the cube method of the EDR Provider class with an invalid altitude.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ args["z"] = "invalid"
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ with pytest.raises(ProviderInvalidQueryError):
+ provider.position(**args)
+
+
+def test_edr_provider_area_request_valid_wkt(layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]):
+ """Test the area method of the EDR Provider class with a valid wkt.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ del args["bbox"]
+ args["wkt"] = Polygon(((-180.0, -90.0), (-180.0, 90.0), (180.0, -90.0), (180.0, 90.0)))
+ parameter_name = single_layer_cube_args_internal["select_properties"][0]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ response = provider.area(**args)
+
+ assert set(response["domain"]["ranges"][parameter_name]["axisNames"]) == set(
+ layers[0].node.find_coordinates()[0].dims
+ )
+ assert np.prod(np.array(response["domain"]["ranges"][parameter_name]["shape"])) == len(
+ response["domain"]["ranges"][parameter_name]["values"]
+ )
+
+
+def test_edr_provider_area_request_invalid_wkt(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the area method of the EDR Provider class with an invalid wkt.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ del args["bbox"]
+ args["wkt"] = "invalid"
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ with pytest.raises(ProviderInvalidQueryError):
+ provider.area(**args)
+
+
+def test_edr_provider_cube_request_invalid_datetime(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the cube method of the EDR Provider class with an invalid datetime.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ args["datetime_"] = "10_24/2025"
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ with pytest.raises(ProviderInvalidQueryError):
+ provider.cube(**args)
+
+
+def test_edr_provider_cube_request_valid_geotiff_format(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the cube method of the EDR Provider class with a valid geotiff request.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ args["format_"] = "geotiff"
+ parameter_name = single_layer_cube_args_internal["select_properties"][0]
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ response = provider.cube(**args)
+
+ assert response["fn"] == f"{parameter_name}.tif"
+ assert len(base64.b64decode(response["fp"])) > 0
+
+
+def test_edr_provider_cube_request_valid_geotiff_format_multiple_parameters(
+ layers: List[pogc.Layer], single_layer_cube_args_internal: Dict[str, Any]
+):
+ """Test the cube method of the EDR Provider class with a valid geotiff request and multiple parameters.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args_internal : Dict[str, Any]
+ Single layer arguments with internal pygeoapi keys provided by a test fixture.
+ """
+ args = single_layer_cube_args_internal
+ args["format_"] = "geotiff"
+
+ # Set the properties argument as multiple layers from the same group/collection
+ group = layers[0].group
+ selected_layers = [layer.identifier for layer in layers if layer.group == group]
+ args["select_properties"] = selected_layers
+
+ provider = EdrProvider(provider_def=get_provider_definition(layers))
+
+ response = provider.cube(**args)
+ buffer = io.BytesIO(base64.b64decode(response["fp"]))
+
+ assert response["fn"] == f"{group}.zip"
+ assert zipfile.is_zipfile(buffer)
+ with zipfile.ZipFile(buffer, "r") as zf:
+ namelist = zf.namelist()
+ assert len(namelist) > 0
+ assert all(f"{layer}.tif" in namelist for layer in selected_layers)
+
+
+def test_edr_provider_datetime_single_value():
+ """Test the datetime interpreter method of the EDR Provider class with a single datetime value."""
+ time_string = "2025-10-24"
+ available_times = ["2025-10-24", "2025-10-25", "2025-10-26", "2025-10-27", "2025-10-28"]
+ expected_times = [np.datetime64(available_times[0])]
+
+ time_coords = EdrProvider.interpret_time_coordinates(available_times, time_string, None)
+
+ assert time_coords is not None
+ np.testing.assert_array_equal(time_coords["time"].coordinates, expected_times)
+
+
+def test_edr_provider_datetime_range_closed():
+ """Test the datetime interpreter method of the EDR Provider class with a closed datetime range."""
+ time_string = "2025-10-24/2025-10-26"
+ available_times = ["2025-10-24", "2025-10-25", "2025-10-26", "2025-10-27", "2025-10-28"]
+ expected_times = [np.datetime64(time) for time in available_times[0:3]]
+
+ time_coords = EdrProvider.interpret_time_coordinates(available_times, time_string, None)
+
+ assert time_coords is not None
+ np.testing.assert_array_equal(time_coords["time"].coordinates, expected_times)
+
+
+def test_edr_provider_datetime_open_start():
+ """Test the datetime interpreter method of the EDR Provider class with an open datetime start."""
+ time_string = "../2025-10-27"
+ available_times = ["2025-10-24", "2025-10-25", "2025-10-26", "2025-10-27", "2025-10-28"]
+ expected_times = [np.datetime64(time) for time in available_times[0:4]]
+
+ time_coords = EdrProvider.interpret_time_coordinates(available_times, time_string, None)
+
+ assert time_coords is not None
+ np.testing.assert_array_equal(time_coords["time"].coordinates, expected_times)
+
+
+def test_edr_provider_datetime_open_end():
+ """Test the datetime interpreter method of the EDR Provider class with an open datetime end."""
+ time_string = "2025-10-25/.."
+ available_times = ["2025-10-24", "2025-10-25", "2025-10-26", "2025-10-27", "2025-10-28"]
+ expected_times = [np.datetime64(time) for time in available_times[1:]]
+
+ time_coords = EdrProvider.interpret_time_coordinates(available_times, time_string, None)
+
+ assert time_coords is not None
+ np.testing.assert_array_equal(time_coords["time"].coordinates, expected_times)
+
+
+def test_edr_provider_datetime_invalid_string():
+ """Test the datetime interpreter method of the EDR Provider class with an invalid string."""
+ time_string = "2025-10-25/../../.."
+ available_times = ["2025-10-24", "2025-10-25", "2025-10-26", "2025-10-27", "2025-10-28"]
+
+ time_coords = EdrProvider.interpret_time_coordinates(available_times, time_string, None)
+
+ assert time_coords is None
+
+
+def test_edr_provider_altitude_single_value():
+ """Test the altitude interpreter method of the EDR Provider class with a single datetime value."""
+ altitude_string = "10"
+ available_altitudes = [0.0, 5.0, 10.0, 15.0, 20.0]
+ expected_altitudes = [10.0]
+
+ altitude_coords = EdrProvider.interpret_altitude_coordinates(available_altitudes, altitude_string, None)
+
+ assert altitude_coords is not None
+ np.testing.assert_array_equal(altitude_coords["alt"].coordinates, expected_altitudes)
+
+
+def test_edr_provider_altitude_range_closed():
+ """Test the altitude interpreter method of the EDR Provider class with a closed datetime range."""
+ altitude_string = "10/20"
+ available_altitudes = [0.0, 5.0, 10.0, 15.0, 20.0]
+ expected_altitudes = [10.0, 15.0, 20.0]
+
+ altitude_coords = EdrProvider.interpret_altitude_coordinates(available_altitudes, altitude_string, None)
+
+ assert altitude_coords is not None
+ np.testing.assert_array_equal(altitude_coords["alt"].coordinates, expected_altitudes)
+
+
+def test_edr_provider_altitude_repeating_interval():
+ """Test the altitude interpreter method of the EDR Provider class with a repeating interval."""
+ altitude_string = "R2/5/5"
+ available_altitudes = [0.0, 5.0, 10.0, 15.0, 20.0]
+ expected_altitudes = [5.0, 10.0]
+
+ altitude_coords = EdrProvider.interpret_altitude_coordinates(available_altitudes, altitude_string, None)
+
+ assert altitude_coords is not None
+ np.testing.assert_array_equal(altitude_coords["alt"].coordinates, expected_altitudes)
+
+
+def test_edr_provider_altitude_list():
+ """Test the altitude interpreter method of the EDR Provider class with a list."""
+ altitude_string = "5,10,15"
+ available_altitudes = [0.0, 5.0, 10.0, 15.0, 20.0]
+ expected_altitudes = [5.0, 10.0, 15.0]
+
+ altitude_coords = EdrProvider.interpret_altitude_coordinates(available_altitudes, altitude_string, None)
+
+ assert altitude_coords is not None
+ np.testing.assert_array_equal(altitude_coords["alt"].coordinates, expected_altitudes)
+
+
+def test_edr_provider_altitude_invalid_string():
+ """Test the altitude interpreter method of the EDR Provider class with an invalid string."""
+ altitude_string = "../20"
+ available_altitudes = [0.0, 5.0, 10.0, 15.0, 20.0]
+
+ altitude_coords = EdrProvider.interpret_altitude_coordinates(available_altitudes, altitude_string, None)
+
+ assert altitude_coords is None
+
+
+def test_edr_provider_crs_interpreter_default_value():
+ """Test the CRS interpretation returns a default value when the argument is None."""
+ assert EdrProvider.interpret_crs(None) == "urn:ogc:def:crs:OGC:1.3:CRS84"
+
+
+def test_edr_provider_crs_interpreter_valid_value():
+ """Test the CRS interpretation returns a valid value when the argument is acceptable."""
+ assert EdrProvider.interpret_crs("epsg:4326") == "epsg:4326"
+
+
+def test_edr_provider_crs_interpreter_invalid_value():
+ """Test the CRS interpretation raises an exception when an invalid argument is provided."""
+ with pytest.raises(ProviderInvalidQueryError):
+ EdrProvider.interpret_crs("epsp:4444")
+
+
+def test_edr_provider_crs_converter():
+ """Test the CRS converter returns latitude and longitude data properly."""
+ x = [1, 2, 3]
+ y = [3, 4, 5]
+
+ # EPSG:4326 specifies x (latitude) and y (longitude)
+ lon = y
+ lat = x
+
+ assert EdrProvider.crs_converter(x, y, "epsg:4326") == (lon, lat)
diff --git a/ogc/edr/test/test_edr_routes.py b/ogc/edr/test/test_edr_routes.py
new file mode 100644
index 0000000..caa5cd6
--- /dev/null
+++ b/ogc/edr/test/test_edr_routes.py
@@ -0,0 +1,338 @@
+import json
+import numpy as np
+from pygeoapi.api import APIRequest
+from http import HTTPStatus
+from typing import Dict, List, Any
+from werkzeug.test import create_environ
+from werkzeug.wrappers import Request
+from werkzeug.datastructures import ImmutableMultiDict
+from ogc import podpac as pogc
+from ogc.edr.edr_routes import EdrRoutes
+
+
+def mock_request(request_args: Dict[str, Any] = {}) -> APIRequest:
+ """Creates a mock request for EDR routes to use.
+
+
+ Parameters
+ ----------
+ request_args: Dict[str, Any], optional
+ The dictionary for query string arguments.
+
+ Returns
+ -------
+ APIRequest
+ Mock API request for route testing.
+ """
+ environ = create_environ(base_url="http://127.0.0.1:5000/ogc/")
+ request = Request(environ)
+ request.args = ImmutableMultiDict(request_args.items())
+ return APIRequest(request, ["en"])
+
+
+def test_edr_routes_static_files_valid_path():
+ """Test the EDR static routes with a valid static file path."""
+ request = mock_request()
+ edr_routes = EdrRoutes(layers=[])
+
+ headers, status, _ = edr_routes.static_files(request, "img/logo.png")
+
+ assert status == HTTPStatus.OK
+ assert headers["Content-Type"] == "image/png"
+
+
+def test_edr_routes_static_files_invalid_path():
+ """Test the EDR static routes with an invalid static file path."""
+ request = mock_request()
+ edr_routes = EdrRoutes(layers=[])
+
+ _, status, _ = edr_routes.static_files(request, "invalid")
+
+ assert status == HTTPStatus.NOT_FOUND
+
+
+def test_edr_routes_landing_page():
+ """Test the EDR landing page for a response."""
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=[])
+
+ headers, status, _ = edr_routes.landing_page(request)
+
+ assert status == HTTPStatus.OK
+ assert headers["Content-Type"] == "application/json"
+
+
+def test_edr_routes_landing_page_html():
+ """Test the EDR landing page for a response."""
+ request = mock_request({"f": "html"})
+ edr_routes = EdrRoutes(layers=[])
+
+ headers, status, _ = edr_routes.landing_page(request)
+
+ assert status == HTTPStatus.OK
+ assert headers["Content-Type"] == "text/html"
+
+
+def test_edr_routes_conformance(layers: List[pogc.Layer]):
+ """Test the EDR conformance for a response.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=layers)
+
+ _, status, content = edr_routes.conformance(request)
+ response = json.loads(content)
+
+ assert status == HTTPStatus.OK
+ assert len(response["conformsTo"]) > 0
+ assert "http://www.opengis.net/spec/ogcapi-edr-1/1.0/conf/core" in response["conformsTo"]
+
+
+def test_edr_routes_api():
+ """Test the EDR api documentation for a response."""
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=[])
+
+ _, status, content = edr_routes.openapi(request)
+ response = json.loads(content)
+
+ assert status == HTTPStatus.OK
+ assert response["paths"]["/"]
+ assert response["paths"]["/openapi"]
+ assert response["paths"]["/conformance"]
+ assert response["paths"]["/collections"]
+
+
+def test_edr_routes_describe_collections(layers: List[pogc.Layer]):
+ """Test the EDR collections description for a response.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=layers)
+ collections = {layer.group for layer in layers}
+
+ _, status, content = edr_routes.describe_collections(request, collection_id=None)
+ response = json.loads(content)
+
+ assert status == HTTPStatus.OK
+ assert len(response["collections"]) == len(collections)
+
+ response_collection_ids = [collection["id"] for collection in response["collections"]]
+
+ assert response_collection_ids == list(collections)
+
+
+def test_edr_routes_describe_collection(layers: List[pogc.Layer]):
+ """Test the EDR collection description for a response.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=layers)
+ collection_id = layers[0].group
+ collection_layers = [layer for layer in layers if layer.group == collection_id]
+
+ _, status, content = edr_routes.describe_collections(request, collection_id=collection_id)
+ response = json.loads(content)
+
+ assert status == HTTPStatus.OK
+ assert response["id"] == collection_id
+ assert list(response["parameter_names"].keys()) == [layer.identifier for layer in collection_layers]
+ assert list(response["data_queries"].keys()) == ["position", "cube", "area", "instances"]
+
+
+def test_edr_routes_describe_instances(layers: List[pogc.Layer]):
+ """Test the EDR instances description for a response.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=layers)
+ collection_id = layers[0].group
+ time_instances = set()
+ for layer in layers:
+ if layer.group == collection_id:
+ time_instances.update(layer.time_instances())
+
+ _, status, content = edr_routes.describe_instances(request, collection_id=collection_id, instance_id=None)
+ response = json.loads(content)
+
+ assert status == HTTPStatus.OK
+ assert len(response["instances"]) == len(time_instances)
+
+ response_time_instances_ids = [instance["id"] for instance in response["instances"]]
+ assert response_time_instances_ids == list(time_instances)
+
+
+def test_edr_routes_describe_instance(layers: List[pogc.Layer]):
+ """Test the EDR instance description for a response.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+ """
+ request = mock_request({"f": "json"})
+ edr_routes = EdrRoutes(layers=layers)
+ collection_id = layers[0].group
+ instance_id = next(iter(layers[0].time_instances()))
+
+ _, status, content = edr_routes.describe_instances(request, collection_id=collection_id, instance_id=instance_id)
+ response = json.loads(content)
+
+ assert status == HTTPStatus.OK
+ assert response["id"] == instance_id
+ assert list(response["data_queries"].keys()) == ["position", "cube", "area"]
+
+
+def test_edr_routes_collection_query(layers: List[pogc.Layer], single_layer_cube_args: Dict[str, Any]):
+ """Test the EDR collection query for a reponse.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args : Dict[str, Any]
+ Single layer arguments provided by a test fixture.
+ """
+ collection_id = layers[0].group
+ instance_id = next(iter(layers[0].time_instances()))
+ parameter_name = single_layer_cube_args["parameter-name"][0]
+ single_layer_cube_args["f"] = "json"
+ request = mock_request(single_layer_cube_args)
+ edr_routes = EdrRoutes(layers=layers)
+
+ _, status, content = edr_routes.collection_query(
+ request,
+ collection_id=collection_id,
+ instance_id=instance_id,
+ query_type="cube",
+ )
+
+ assert status == HTTPStatus.OK
+
+ assert content["domain"]["ranges"][parameter_name]["axisNames"] == list(layers[0].node.find_coordinates()[0].dims)
+ assert np.prod(np.array(content["domain"]["ranges"][parameter_name]["shape"])) == len(
+ content["domain"]["ranges"][parameter_name]["values"]
+ )
+
+
+def test_edr_routes_collection_query_geotiff_format(layers: List[pogc.Layer], single_layer_cube_args: Dict[str, Any]):
+ """Test the EDR collection query for a GeoTiff formatted reponse.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args : Dict[str, Any]
+ Single layer arguments provided by a test fixture.
+ """
+ collection_id = layers[0].group
+ instance_id = next(iter(layers[0].time_instances()))
+ single_layer_cube_args["f"] = "geotiff"
+ request = mock_request(single_layer_cube_args)
+ edr_routes = EdrRoutes(layers=layers)
+
+ headers, status, _ = edr_routes.collection_query(
+ request,
+ collection_id=collection_id,
+ instance_id=instance_id,
+ query_type="cube",
+ )
+
+ assert status == HTTPStatus.OK
+ assert headers["Content-Disposition"] == f"attachment; filename={layers[0].identifier}.tif"
+
+
+def test_edr_routes_collection_query_invalid_type(layers: List[pogc.Layer], single_layer_cube_args: Dict[str, Any]):
+ """Test the EDR collection query with an invalid query type.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args : Dict[str, Any]
+ Single layer arguments provided by a test fixture.
+ """
+ collection_id = layers[0].group
+ instance_id = next(iter(layers[0].time_instances()))
+ request = mock_request(single_layer_cube_args)
+ edr_routes = EdrRoutes(layers=layers)
+
+ _, status, _ = edr_routes.collection_query(
+ request,
+ collection_id=collection_id,
+ instance_id=instance_id,
+ query_type="corridor",
+ )
+
+ assert status == HTTPStatus.BAD_REQUEST
+
+
+def test_edr_routes_collection_query_invalid_bbox(layers: List[pogc.Layer], single_layer_cube_args: Dict[str, Any]):
+ """Test the EDR collection query with an invalid bounding box.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args : Dict[str, Any]
+ Single layer arguments provided by a test fixture.
+ """
+ single_layer_cube_args["bbox"] = "invalid"
+ request = mock_request(single_layer_cube_args)
+ edr_routes = EdrRoutes(layers=layers)
+
+ _, status, _ = edr_routes.collection_query(
+ request,
+ collection_id=layers[0].group,
+ instance_id=next(iter(layers[0].time_instances())),
+ query_type="cube",
+ )
+
+ assert status == HTTPStatus.BAD_REQUEST
+
+
+def test_edr_routes_collection_query_missing_parameter(
+ layers: List[pogc.Layer], single_layer_cube_args: Dict[str, Any]
+):
+ """Test the EDR colletion query with a missing parameter.
+
+ Parameters
+ ----------
+ layers : List[pogc.Layer]
+ Layers provided by a test fixture.
+
+ single_layer_cube_args : Dict[str, Any]
+ Single layer arguments provided by a test fixture.
+ """
+ del single_layer_cube_args["parameter-name"]
+ request = mock_request(single_layer_cube_args)
+ edr_routes = EdrRoutes(layers=layers)
+
+ _, status, _ = edr_routes.collection_query(
+ request,
+ collection_id=layers[0].group,
+ instance_id=next(iter(layers[0].time_instances())),
+ query_type="cube",
+ )
+
+ assert status == HTTPStatus.BAD_REQUEST
diff --git a/ogc/podpac.py b/ogc/podpac.py
index 7e86b05..e64a41f 100755
--- a/ogc/podpac.py
+++ b/ogc/podpac.py
@@ -6,7 +6,7 @@
import podpac
from podpac.core.coordinates import Coordinates
import traitlets as tl
-
+from typing import List
from matplotlib import pyplot as plt
import matplotlib as mpl
import io
@@ -55,6 +55,27 @@ def __init__(self, **kwargs):
if self.node is not None and self.node.style.enumeration_legend:
self._style.is_enumerated = True
+ def time_instances(self) -> List[str]:
+ """Retrieve the time instances available for the layer.
+
+ Returns
+ -------
+ List[str]
+        List of available time instances as strings.
+ """
+ time_instances = set()
+ coordinates_list = self.node.find_coordinates()
+
+ # Time instances are created if a node has both time and offsets.
+ if (
+ len(coordinates_list) > 0
+ and "time" in coordinates_list[0].udims
+ and "forecastOffsetHr" in coordinates_list[0].udims
+ ):
+ time_instances.update([str(time) for time in coordinates_list[0]["time"].coordinates])
+
+ return list(time_instances)
+
def get_node(self, args):
return self.node
diff --git a/ogc/servers.py b/ogc/servers.py
index dd3eab0..8b4932e 100755
--- a/ogc/servers.py
+++ b/ogc/servers.py
@@ -11,8 +11,12 @@
import six
import traceback
import logging
+from typing import Callable
+from werkzeug.datastructures import ImmutableMultiDict
from ogc.ogc_common import WCSException
+from pygeoapi.api import APIRequest
+from pygeoapi.util import get_api_rules
logger = logging.getLogger(__name__)
@@ -104,6 +108,89 @@ def method():
self.add_url_rule(endpoint, view_func=method, methods=["GET", "POST"]) # add render method as flask route
setattr(self, method_name, method) # bind route function call to instance method
+ # Set up the EDR endpoints for the server
+ strict_slashes = get_api_rules(ogc.edr_routes.api.config).strict_slashes
+ self.add_url_rule(
+ f"/{endpoint}/edr",
+ endpoint=f"{endpoint}_landing_page",
+ view_func=self.edr_render(ogc.edr_routes.landing_page),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/static/",
+ endpoint=f"{endpoint}_static_files",
+ view_func=self.edr_render(ogc.edr_routes.static_files),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/api",
+ endpoint=f"{endpoint}_api",
+ view_func=self.edr_render(ogc.edr_routes.openapi),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/openapi",
+ endpoint=f"{endpoint}_openapi",
+ view_func=self.edr_render(ogc.edr_routes.openapi),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/conformance",
+ endpoint=f"{endpoint}_conformance",
+ view_func=self.edr_render(ogc.edr_routes.conformance),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/collections",
+ endpoint=f"{endpoint}_collections",
+ view_func=self.edr_render(ogc.edr_routes.describe_collections),
+ defaults={"collection_id": None},
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/collections/",
+ endpoint=f"{endpoint}_collection",
+ view_func=self.edr_render(ogc.edr_routes.describe_collections),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/collections//instances",
+ endpoint=f"{endpoint}_instances",
+ view_func=self.edr_render(ogc.edr_routes.describe_instances),
+ defaults={"instance_id": None},
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/collections//instances/",
+ endpoint=f"{endpoint}_instance",
+ view_func=self.edr_render(ogc.edr_routes.describe_instances),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/collections//",
+ endpoint=f"{endpoint}_collection_query",
+ view_func=self.edr_render(ogc.edr_routes.collection_query),
+ defaults={"instance_id": None},
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+ self.add_url_rule(
+ f"/{endpoint}/edr/collections//instances//",
+ endpoint=f"{endpoint}_instance_query",
+ view_func=self.edr_render(ogc.edr_routes.collection_query),
+ methods=["GET"],
+ strict_slashes=strict_slashes,
+ )
+
def ogc_render(self, ogc_idx):
logger.info("OGC server.ogc_render %i", ogc_idx)
if request.method != "GET":
@@ -164,6 +251,71 @@ def ogc_render(self, ogc_idx):
ee = WCSException()
return respond_xml(ee.to_xml(), status=500)
+ def edr_render(self, callable: Callable) -> Callable:
+ """Function which returns a wrapper for the provided callable.
+ Filters arguments and handles any necessary exceptions.
+
+ Parameters
+ ----------
+ callable : Callable
+ The callable request handler to be wrapped.
+
+ Returns
+ -------
+ Callable
+ Wrapped callable with exception handling and argument filtering.
+ """
+
+ def wrapper(*args, **kwargs) -> Response:
+ """Wrapper for the request handler.
+
+ Returns
+ -------
+ Response
+ The response to the request from the callable or a 500 response on error.
+ """
+ logger.info("OGC server.edr_render")
+ if request.method != "GET":
+ return respond_xml("Only GET supported
", status=405)
+ try:
+            # We'll filter out any characters from URL parameter values that
+ # are not in the allowlist.
+ # Note the parameter with key "params" has a serialized JSON value,
+ # so we allow braces, brackets, and quotes.
+ # Allowed chars are:
+ # -, A through Z, a through z, 0 through 9, spaces
+ # and the characters + . , _ / : * { } ( ) [ ] "
+ allowed_chars = r'-A-Za-z0-9 +.,_/:*\{\}\(\)\[\]"'
+ match_one_unallowed_char = "[^%s]" % allowed_chars
+ filtered_args = {
+ # Find every unallowed char in the value and replace it
+ # with nothing (remove it).
+ k: re.sub(match_one_unallowed_char, "", str(v))
+ for (k, v) in request.args.items()
+ }
+ # Replace the arguments with the filtered option
+ request.args = ImmutableMultiDict(filtered_args)
+ pygeoapi_request = APIRequest.from_flask(request, ["en"])
+ # Build the flask response
+ headers, status, content = callable(pygeoapi_request, *args, **kwargs)
+ response = make_response(content, status)
+ if headers:
+ response.headers = headers
+ # Check Content Disposition for attachment downloads
+ match = re.search(r'filename="?([^"]+)"?', headers.get("Content-Disposition", ""))
+ if match:
+ filename = match.group(1)
+ as_attach = True if filename.endswith("zip") or filename.endswith("tif") else False
+ return send_file(content, as_attachment=as_attach, download_name=filename)
+ else:
+ return response
+ except Exception as e:
+ logger.error("OGC: server.edr_render Exception: %s", str(e), exc_info=True)
+ ee = WCSException()
+ return respond_xml(ee.to_xml(), status=500)
+
+ return wrapper
+
class FastAPI(object):
"""
diff --git a/ogc/settings.py b/ogc/settings.py
index 1bb3a3b..a9df028 100755
--- a/ogc/settings.py
+++ b/ogc/settings.py
@@ -9,6 +9,9 @@
import os
# Settings applied around the OGC server package.
+crs_84 = "crs:84"
+crs_84_pyproj_format = "urn:ogc:def:crs:OGC:1.3:CRS84"
+epsg_4326 = "epsg:4326"
# Default/Supported WMS CRS/SRS
WMS_CRS = {
@@ -21,12 +24,17 @@
# 'epsg:3785': ... <-- this is deprecated but the same as 3857
# Apparently it lat=x lon=y from Example 2 on page 18 of the WMS version 1.3.0 spec
# http://portal.opengeospatial.org/files/?artifact_id=14416
- "epsg:4326": {"minx": -90, "miny": -180, "maxx": 90, "maxy": 180},
- "crs:84": {"minx": -180, "miny": -90, "maxx": 180, "maxy": 90},
+ epsg_4326: {"minx": -90, "miny": -180, "maxx": 90, "maxy": 180},
+ crs_84: {"minx": -180, "miny": -90, "maxx": 180, "maxy": 90},
}
WCS_CRS = {
- "epsg:4326": {"minx": -90, "miny": -180, "maxx": 90, "maxy": 180},
- "crs:84": {"minx": -180, "miny": -90, "maxx": 180, "maxy": 90},
+ epsg_4326: {"minx": -90, "miny": -180, "maxx": 90, "maxy": 180},
+ crs_84: {"minx": -180, "miny": -90, "maxx": 180, "maxy": 90},
+}
+EDR_CRS = {
+ epsg_4326: {"minx": -90.0, "miny": -180.0, "maxx": 90.0, "maxy": 180.0},
+ crs_84: {"minx": -180.0, "miny": -90.0, "maxx": 180.0, "maxy": 90.0},
+ crs_84_pyproj_format: {"minx": -180.0, "miny": -90.0, "maxx": 180.0, "maxy": 90.0},
}
# WMS Capabilities timestamp format
@@ -55,3 +63,9 @@
CLASSIFICATION = "NONE" # not used any more seemingly
PUBLIC_CONSTRAINT_STRING = "PUBLIC"
CONSTRAINTS = PUBLIC_CONSTRAINT_STRING
+
+# get EDR configuration file path
+try:
+ EDR_CONFIGURATION_PATH = os.environ["EDR_CONFIGURATION_PATH"]
+except Exception:
+ EDR_CONFIGURATION_PATH = None
diff --git a/ogc/test/test_core.py b/ogc/test/test_core.py
index bd0ac24..b7ff4bb 100644
--- a/ogc/test/test_core.py
+++ b/ogc/test/test_core.py
@@ -7,9 +7,9 @@
from ogc.wcs_response_1_0_0 import Coverage
# Create some podpac nodes
-data = np.ones((10, 10))
lat = np.linspace(90, -90, 11)
lon = np.linspace(-180, 180, 21)
+data = np.random.default_rng(1).random((11, 21))
coords = podpac.Coordinates([lat, lon], dims=["lat", "lon"])
node1 = podpac.data.Array(source=data, coordinates=coords)
node2 = podpac.data.Array(source=data, coordinates=coords)
@@ -20,6 +20,7 @@
identifier="layer1",
title="Layer 1",
abstract="Layer1 Data",
+ group="Layers",
)
layer2 = pogc.Layer(
@@ -27,6 +28,7 @@
identifier="layer2",
title="Layer 2",
abstract="Layer2 Data",
+ group="Layers",
)
diff --git a/ogc/test/test_servers.py b/ogc/test/test_servers.py
index 00c75f3..27f620c 100644
--- a/ogc/test/test_servers.py
+++ b/ogc/test/test_servers.py
@@ -18,9 +18,9 @@ def client():
A test client for the Flask server.
"""
# Create some test OGC layers
- data = np.ones((10, 10))
lat = np.linspace(90, -90, 11)
lon = np.linspace(-180, 180, 21)
+ data = np.random.default_rng(1).random((11, 21))
coords = podpac.Coordinates([lat, lon], dims=["lat", "lon"])
node1 = podpac.data.Array(source=data, coordinates=coords)
@@ -29,6 +29,7 @@ def client():
identifier="layer1",
title="Layer 1",
abstract="Layer 1 Data",
+ group="Layers",
)
# Create an OGC instance with the test layers
ogc = core.OGC(layers=[layer1])
diff --git a/pyproject.toml b/pyproject.toml
index 0b9a391..2cf8ea1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,13 +29,18 @@ classifiers = [
# that you indicate whether you support Python 2, Python 3 or both.
"Programming Language :: Python 3",
]
-
+requires-python = ">= 3.10"
dependencies = [
'podpac[datatype]',
'flask',
'lxml',
'webob',
'traitlets',
+ 'pygeoapi',
+ 'shapely',
+ 'numpy',
+ 'traitlets',
+ 'werkzeug',
]
[project.optional-dependencies]
@@ -48,6 +53,7 @@ dev = [
"sphinx-autobuild",
# TESTING
"pytest",
+ "pytest-cov",
"pytest-mock",
"pytest-html",
"pytest-remotedata",
@@ -62,6 +68,9 @@ dev = [
[tool.setuptools.packages.find]
where = ["."]
+[tool.setuptools.package-data]
+ogc = ["edr/static/**/*", "edr/template/static/**/*", "edr/config/*"]
+
[tool.pytest.ini_options]
testpaths = ["ogc"]
diff --git a/sonar-project.properties b/sonar-project.properties
index 43c8211..e0b3003 100644
--- a/sonar-project.properties
+++ b/sonar-project.properties
@@ -2,5 +2,5 @@
sonar.projectKey=creare-com_ogc
sonar.organization=creare-com
sonar.qualitygate.wait=true
-sonar.python.version=3.8
+sonar.python.version=3.11
sonar.python.coverage.reportPaths=coverage.xml
\ No newline at end of file