Skip to content
124 changes: 124 additions & 0 deletions src/opera_utils/_cmr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# opera_utils/_cmr.py
from __future__ import annotations

import logging
from collections.abc import Iterable
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Literal

import requests

# Module logger; registered under the fixed "opera_utils" name rather than `__name__`.
logger = logging.getLogger("opera_utils")

# CMR short name for TROPO v1
TROPO_SHORT_NAME = "OPERA_L4_TROPO-ZENITH_V1"


# Accepted values for the `kind` argument of `_pick_related_url`, where they are
# compared against the "Type" field of each entry in a UMM "RelatedUrls" list.
Kind = Literal[
"GET DATA",
"GET DATA VIA DIRECT ACCESS",
"EXTENDED METADATA",
"GET RELATED VISUALIZATION",
"VIEW RELATED INFORMATION", # s3 temporary credential url
]


class UrlType(str, Enum):
    """Data access protocol choices (`https` or `s3`).

    Members are also `str` instances, so they compare equal to their
    plain string values.
    """

    HTTPS = "https"
    S3 = "s3"

    def __str__(self) -> str:
        """Return the bare protocol string instead of `UrlType.NAME`."""
        return str(self.value)


def _parse_dt(s: str) -> datetime:
# CMR returns timestamps like "2016-07-01T00:00:00Z"
return datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def _cmr_search(
    *,
    short_name: str,
    start_datetime: datetime | None = None,
    end_datetime: datetime | None = None,
    attributes: Iterable[str] | None = None,
    use_uat: bool = False,
) -> list[dict[str, Any]]:
    """Fetch every UMM granule record CMR returns for one collection.

    Parameters
    ----------
    short_name : str
        Short name of the CMR collection to search.
    start_datetime : datetime, optional
        Lower bound of the temporal filter; left open when omitted.
    end_datetime : datetime, optional
        Upper bound of the temporal filter; left open when omitted.
    attributes : Iterable[str], optional
        Values sent as the repeated `attribute[]` query parameter.
    use_uat : bool
        If True, query the `cmr.uat.earthdata.nasa.gov` host instead of
        `cmr.earthdata.nasa.gov`.

    Returns
    -------
    list
        The "umm" dict of every returned item, in response order
        (an empty dict for any item lacking a "umm" key).

    Raises
    ------
    requests.HTTPError
        If any page request returns an error status.

    """
    host = "uat.earthdata" if use_uat else "earthdata"
    search_url = f"https://cmr.{host}.nasa.gov/search/granules.umm_json"

    query: dict[str, Any] = {"short_name": short_name, "page_size": 500}
    if attributes:
        query["attribute[]"] = list(attributes)

    if start_datetime or end_datetime:
        # Hand CMR the (possibly half-open) range so it pre-filters by time
        lower = start_datetime.isoformat() if start_datetime else ""
        upper = end_datetime.isoformat() if end_datetime else ""
        query["temporal"] = f"{lower},{upper}"

    request_headers: dict[str, str] = {}
    granules: list[dict[str, Any]] = []

    while True:
        response = requests.get(
            search_url, params=query, headers=request_headers, timeout=60
        )
        response.raise_for_status()
        for entry in response.json().get("items", []):
            granules.append(entry.get("umm", {}))

        # Keep paging for as long as the server hands back a search-after token
        token = response.headers.get("CMR-Search-After")
        if not token:
            return granules
        request_headers["CMR-Search-After"] = token


def _pick_related_url(
umm: dict[str, Any],
*,
kind: Kind,
startswith: str | None = None,
endswith: str | None = None,
) -> str | None:
for item in umm.get("RelatedUrls", []) or []:
if not isinstance(item, dict):
continue
if item.get("Type") != kind:
continue
url = item.get("URL")
if not url:
continue
if startswith and not url.startswith(startswith):
continue
if endswith and not url.endswith(endswith):
continue
return url
return None
8 changes: 4 additions & 4 deletions src/opera_utils/_cslc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from shapely import geometry, ops, wkt

try:
from isce3.core import DateTime, Orbit, StateVector

HAS_ICE3 = True
except ImportError:
HAS_ICE3 = False
Expand Down Expand Up @@ -388,9 +386,11 @@ def get_cslc_orbit(h5file: Filename):
Orbit object.

"""
if not HAS_ICE3:
try:
from isce3.core import DateTime, Orbit, StateVector # noqa: PLC0415
except ImportError as e:
msg = "isce3 must be installed to use this function"
raise ImportError(msg)
raise ImportError(msg) from e

times, positions, velocities, reference_epoch = get_orbit_arrays(h5file)
orbit_svs = []
Expand Down
6 changes: 4 additions & 2 deletions src/opera_utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ def cli_app() -> None:
try:
from opera_utils.disp._download import run_download
from opera_utils.disp._reformat import reformat_stack
from opera_utils.disp._search import search
from opera_utils.disp._search import search as search_disp

cli_dict["disp-s1-download"] = run_download
cli_dict["disp-s1-search"] = partial(search, print_urls=True)
cli_dict["disp-s1-search"] = partial(search_disp, print_urls=True)
cli_dict["disp-s1-reformat"] = reformat_stack

except ImportError:
Expand All @@ -163,7 +163,9 @@ def cli_app() -> None:
try:
from opera_utils.tropo._apply import apply_tropo
from opera_utils.tropo._crop import crop_tropo
from opera_utils.tropo._search import search as search_tropo

cli_dict["tropo-search"] = search_tropo
cli_dict["tropo-crop"] = crop_tropo
cli_dict["tropo-apply"] = apply_tropo

Expand Down
14 changes: 2 additions & 12 deletions src/opera_utils/disp/_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from collections.abc import Iterable, Iterator
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from functools import cached_property
from math import nan
from pathlib import Path
Expand All @@ -18,6 +17,7 @@
from affine import Affine
from typing_extensions import Self

from opera_utils._cmr import UrlType
from opera_utils.burst_frame_db import (
Bbox,
OrbitPass,
Expand All @@ -29,17 +29,7 @@

from ._utils import get_frame_coordinates

__all__ = ["DispProduct", "DispProductStack", "UrlType"]


class UrlType(str, Enum):
"""Choices for the orbit direction of a granule."""

S3 = "s3"
HTTPS = "https"

def __str__(self) -> str:
return str(self.value)
__all__ = ["DispProduct", "DispProductStack"]


@dataclass
Expand Down
43 changes: 13 additions & 30 deletions src/opera_utils/disp/_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import warnings
from datetime import datetime, timezone

import requests

from opera_utils._cmr import _cmr_search
from opera_utils.disp._product import DispProduct, UrlType

__all__ = ["search"]
Expand Down Expand Up @@ -59,8 +58,6 @@ def search(
List of products matching the search criteria

"""
edl_host = "uat.earthdata" if use_uat else "earthdata"
search_url = f"https://cmr.{edl_host}.nasa.gov/search/granules.umm_json"
params: dict[str, int | str | list[str]] = {
"short_name": "OPERA_L3_DISP-S1_V1",
"page_size": 500,
Expand Down Expand Up @@ -93,32 +90,18 @@ def search(
product_filters.append(f"int,FRAME_NUMBER,{frame_id}")
else:
warnings.warn("No `frame_id` specified: search may be large", stacklevel=1)

headers: dict[str, str] = {}
products: list[DispProduct] = []
while True:
response = requests.get(search_url, params=params, headers=headers)
response.raise_for_status()
data = response.json()
cur_products = [
DispProduct.from_umm(item["umm"], url_type=url_type)
for item in data["items"]
]
# CMR filters apply to both the reference and secondary time (as of 2025-03-29)
# We want to filter just by the secondary time
products.extend(
[
g
for g in cur_products
if start_datetime <= g.secondary_datetime <= end_datetime
]
)

if "CMR-Search-After" not in response.headers:
break

headers["CMR-Search-After"] = response.headers["CMR-Search-After"]

results = _cmr_search(
short_name="OPERA_L3_DISP-S1_V1",
start_datetime=start_datetime,
end_datetime=end_datetime,
attributes=product_filters,
use_uat=use_uat,
)
products = [DispProduct.from_umm(r, url_type=url_type) for r in results]

products = [
p for p in products if start_datetime <= p.secondary_datetime <= end_datetime
]
# Return sorted list of products
products = sorted(products, key=lambda g: (g.frame_id, g.secondary_datetime))
if print_urls:
Expand Down
Loading
Loading