17,864 changes: 8,932 additions & 8,932 deletions references/gage_info/GAGES-II.csv


1,342 changes: 671 additions & 671 deletions references/gage_info/camels_670.csv


5,906 changes: 2,953 additions & 2,953 deletions references/gage_info/dhbv2_gages.csv


6,424 changes: 3,212 additions & 3,212 deletions references/gage_info/gages_3000.csv


8 changes: 6 additions & 2 deletions references/geo_io/build_gage_references.py
@@ -23,8 +23,10 @@
"LNG_GAGE",
"COMID",
"COMID_DRAIN_SQKM",
"COMID_UNITAREA_SQKM",
"PCT_DIFF",
"REL_ERR",
"ABS_DIFF",
]


@@ -91,7 +93,8 @@ def load_gages(path: Path) -> gpd.GeoDataFrame:
def spatial_join(gages: gpd.GeoDataFrame, catchments: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Assign each gage a MERIT COMID via point-in-polygon join."""
print("Performing spatial join (within)...")
joined = gpd.sjoin(gages, catchments[["COMID", "geometry"]], how="left", predicate="within")
joined = gpd.sjoin(gages, catchments[["COMID", "unitarea", "geometry"]], how="left", predicate="within")
joined = joined.rename(columns={"unitarea": "COMID_UNITAREA_SQKM"})
# sjoin adds index_right; drop it
joined = joined.drop(columns=["index_right"], errors="ignore")
matched = joined["COMID"].notna().sum()
@@ -113,9 +116,10 @@ def merge_uparea(gages: gpd.GeoDataFrame, merit_rivers_path: Path) -> gpd.GeoDataFrame:


def compute_error_metrics(df: pd.DataFrame) -> pd.DataFrame:
"""Compute PCT_DIFF and REL_ERR between USGS drainage area and MERIT upstream area."""
"""Compute PCT_DIFF, REL_ERR, and ABS_DIFF between USGS drainage area and MERIT upstream area."""
df["PCT_DIFF"] = (df["DRAIN_SQKM"] - df["COMID_DRAIN_SQKM"]) / df["DRAIN_SQKM"] * 100
df["REL_ERR"] = (df["COMID_DRAIN_SQKM"] - df["DRAIN_SQKM"]) / df["DRAIN_SQKM"]
df["ABS_DIFF"] = (df["DRAIN_SQKM"] - df["COMID_DRAIN_SQKM"]).abs()
return df


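Note on the metrics above: PCT_DIFF and REL_ERR carry the same information up to sign and scale, while ABS_DIFF is the unsigned gap. A minimal sketch with toy numbers (not from the repo):

    import pandas as pd

    df = pd.DataFrame({"DRAIN_SQKM": [100.0], "COMID_DRAIN_SQKM": [90.0]})
    df["PCT_DIFF"] = (df["DRAIN_SQKM"] - df["COMID_DRAIN_SQKM"]) / df["DRAIN_SQKM"] * 100  # 10.0
    df["REL_ERR"] = (df["COMID_DRAIN_SQKM"] - df["DRAIN_SQKM"]) / df["DRAIN_SQKM"]  # -0.1
    df["ABS_DIFF"] = (df["DRAIN_SQKM"] - df["COMID_DRAIN_SQKM"]).abs()  # 10.0

ABS_DIFF (in km²) is what the new max_area_diff_sqkm filter, introduced in configs.py below, thresholds on.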
63 changes: 63 additions & 0 deletions references/geo_io/patch_dhbv2_gages.py
@@ -0,0 +1,63 @@
"""Patch dhbv2_gages.csv with standard columns for consistency with GAGES-II reference files.

Adds:
- STANAME (= STAID, no station names available)
- COMID_DRAIN_SQKM (= zone_edge_uparea)
- PCT_DIFF (= drainage_area_percent_error * 100)
- REL_ERR (= -drainage_area_percent_error)
- ABS_DIFF (= abs(zone_edge_vs_gage_area_difference))
- COMID_UNITAREA_SQKM (looked up from MERIT catchments shapefile by COMID)

Usage:
python references/geo_io/patch_dhbv2_gages.py \
--csv references/gage_info/dhbv2_gages.csv \
--merit-catchments data/merit/cat_pfaf_7_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
"""

import argparse
from pathlib import Path

import geopandas as gpd
import pandas as pd


def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Patch dhbv2_gages.csv with standard columns.")
parser.add_argument("--csv", type=Path, required=True, help="Path to dhbv2_gages.csv")
parser.add_argument(
"--merit-catchments",
type=Path,
required=True,
help="Path to MERIT catchment shapefile (for unitarea lookup).",
)
return parser.parse_args()


def patch(csv_path: Path, merit_catchments_path: Path) -> None:
"""Patch the dhbv2 gage CSV with standard columns."""
df = pd.read_csv(csv_path)

# Derive standard columns from existing dhbv2-specific data
df["STANAME"] = df["STAID"].astype(str)
df["COMID_DRAIN_SQKM"] = df["zone_edge_uparea"]
df["PCT_DIFF"] = df["drainage_area_percent_error"] * 100
df["REL_ERR"] = -df["drainage_area_percent_error"]
df["ABS_DIFF"] = df["zone_edge_vs_gage_area_difference"].abs()

# Look up unitarea from MERIT catchments shapefile
print(f"Loading MERIT catchments for unitarea lookup: {merit_catchments_path}")
catchments = gpd.read_file(merit_catchments_path, columns=["COMID", "unitarea"], ignore_geometry=True)
unitarea_map = catchments.set_index("COMID")["unitarea"].to_dict()
df["COMID_UNITAREA_SQKM"] = df["COMID"].map(unitarea_map)

matched = df["COMID_UNITAREA_SQKM"].notna().sum()
print(f" {matched}/{len(df)} gages matched to unitarea")

df.to_csv(csv_path, index=False)
print(f"Wrote {len(df)} rows to {csv_path}")


if __name__ == "__main__":
args = parse_args()
patch(args.csv, args.merit_catchments)
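A quick sanity check one could run after patching (path per the usage block above; the sign/scale identity follows from the derivations in patch()):

    import pandas as pd

    df = pd.read_csv("references/gage_info/dhbv2_gages.csv")
    new_cols = {"STANAME", "COMID_DRAIN_SQKM", "PCT_DIFF", "REL_ERR", "ABS_DIFF", "COMID_UNITAREA_SQKM"}
    assert new_cols <= set(df.columns), new_cols - set(df.columns)
    # PCT_DIFF = pct_err * 100 and REL_ERR = -pct_err, so PCT_DIFF + 100 * REL_ERR == 0
    assert (df["PCT_DIFF"] + 100 * df["REL_ERR"]).abs().max() < 1e-9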
29 changes: 28 additions & 1 deletion src/ddr/geodatazoo/lynker_hydrofabric.py
@@ -17,7 +17,14 @@
construct_network_matrix,
create_hydrofabric_observations,
)
-from ddr.io.readers import IcechunkUSGSReader, fill_nans, naninfmean, read_ic, read_zarr
+from ddr.io.readers import (
+    IcechunkUSGSReader,
+    fill_nans,
+    filter_gages_by_area_threshold,
+    naninfmean,
+    read_ic,
+    read_zarr,
+)
from ddr.io.statistics import set_statistics
from ddr.validation.configs import Config
from ddr.validation.enums import Mode
@@ -127,6 +134,16 @@ def _init_training(self) -> None:
self.obs_reader = IcechunkUSGSReader(cfg=self.cfg)
self.observations = self.obs_reader.read_data(dates=self.dates)
self.gage_ids = np.array([str(_id.zfill(8)) for _id in self.obs_reader.gage_dict["STAID"]])
if self.cfg.experiment.max_area_diff_sqkm is not None:
self.gage_ids, n_removed = filter_gages_by_area_threshold(
self.gage_ids,
self.obs_reader.gage_dict,
self.cfg.experiment.max_area_diff_sqkm,
)
log.info(
f"Filtered {n_removed} gages exceeding area diff threshold "
f"of {self.cfg.experiment.max_area_diff_sqkm} km²"
)
self.gages_adjacency = read_zarr(Path(self.cfg.data_sources.gages_adjacency))
log.info(f"Training mode: routing for {len(self.gage_ids)} gauged locations")

@@ -142,6 +159,16 @@ def _init_inference(self) -> None:
self.obs_reader = IcechunkUSGSReader(cfg=self.cfg)
self.observations = self.obs_reader.read_data(dates=self.dates)
self.gage_ids = np.array([str(_id.zfill(8)) for _id in self.obs_reader.gage_dict["STAID"]])
if self.cfg.experiment.max_area_diff_sqkm is not None:
self.gage_ids, n_removed = filter_gages_by_area_threshold(
self.gage_ids,
self.obs_reader.gage_dict,
self.cfg.experiment.max_area_diff_sqkm,
)
log.info(
f"Filtered {n_removed} gages exceeding area diff threshold "
f"of {self.cfg.experiment.max_area_diff_sqkm} km²"
)
self.gages_adjacency = read_zarr(Path(self.cfg.data_sources.gages_adjacency))
log.info(f"Gages mode: {len(self.gage_ids)} gauged locations")
self.routing_dataclass = self._build_routing_data_gages()
28 changes: 27 additions & 1 deletion src/ddr/geodatazoo/merit.py
@@ -18,7 +18,13 @@
construct_network_matrix,
create_hydrofabric_observations,
)
-from ddr.io.readers import IcechunkUSGSReader, fill_nans, naninfmean, read_zarr
+from ddr.io.readers import (
+    IcechunkUSGSReader,
+    fill_nans,
+    filter_gages_by_area_threshold,
+    naninfmean,
+    read_zarr,
+)
from ddr.io.statistics import set_statistics
from ddr.validation.configs import Config
from ddr.validation.enums import Mode
@@ -126,6 +132,16 @@ def _init_training(self) -> None:
self.obs_reader = IcechunkUSGSReader(cfg=self.cfg)
self.observations = self.obs_reader.read_data(dates=self.dates)
self.gage_ids = np.array([str(_id.zfill(8)) for _id in self.obs_reader.gage_dict["STAID"]])
if self.cfg.experiment.max_area_diff_sqkm is not None:
self.gage_ids, n_removed = filter_gages_by_area_threshold(
self.gage_ids,
self.obs_reader.gage_dict,
self.cfg.experiment.max_area_diff_sqkm,
)
log.info(
f"Filtered {n_removed} gages exceeding area diff threshold "
f"of {self.cfg.experiment.max_area_diff_sqkm} km²"
)
self.gages_adjacency = read_zarr(Path(self.cfg.data_sources.gages_adjacency))
log.info(f"Training mode: routing for {len(self.gage_ids)} gauged locations")

@@ -141,6 +157,16 @@ def _init_inference(self) -> None:
self.obs_reader = IcechunkUSGSReader(cfg=self.cfg)
self.observations = self.obs_reader.read_data(dates=self.dates)
self.gage_ids = np.array([str(_id.zfill(8)) for _id in self.obs_reader.gage_dict["STAID"]])
if self.cfg.experiment.max_area_diff_sqkm is not None:
self.gage_ids, n_removed = filter_gages_by_area_threshold(
self.gage_ids,
self.obs_reader.gage_dict,
self.cfg.experiment.max_area_diff_sqkm,
)
log.info(
f"Filtered {n_removed} gages exceeding area diff threshold "
f"of {self.cfg.experiment.max_area_diff_sqkm} km²"
)
self.gages_adjacency = read_zarr(Path(self.cfg.data_sources.gages_adjacency))
log.info(f"Gages mode: {len(self.gage_ids)} gauged locations")
self.routing_dataclass = self._build_routing_data_gages()
53 changes: 51 additions & 2 deletions src/ddr/io/readers.py
@@ -82,7 +82,7 @@ def convert_ft3_s_to_m3_s(flow_rates_ft3_s: np.ndarray) -> np.ndarray:
return flow_rates_ft3_s * conversion_factor


-def read_gage_info(gage_info_path: Path) -> dict[str, list[str]]:
+def read_gage_info(gage_info_path: Path) -> dict[str, list]:
"""Reads gage information from a specified file.

Parameters
@@ -92,7 +92,10 @@ def read_gage_info(gage_info_path: Path) -> dict[str, list[str]]:

Returns
-------
-Dict[str, List[str]]: A dictionary containing the gage information.
+dict[str, list]
+    A dictionary containing gage information. Required keys: STAID, STANAME,
+    DRAIN_SQKM, LAT_GAGE, LNG_GAGE. Optional keys (included when present in CSV):
+    COMID, COMID_DRAIN_SQKM, ABS_DIFF, COMID_UNITAREA_SQKM.

Raises
------
@@ -106,6 +109,7 @@
"LAT_GAGE",
"LNG_GAGE",
]
optional_columns = ["COMID", "COMID_DRAIN_SQKM", "ABS_DIFF", "COMID_UNITAREA_SQKM"]

try:
df = pd.read_csv(gage_info_path, delimiter=",")
@@ -124,11 +128,56 @@ def read_gage_info(gage_info_path: Path) -> dict[str, list[str]]:
for field in expected_column_names
if field in df.columns
}

for col in optional_columns:
if col in df.columns:
out[col] = df[col].values.tolist()

return out
except FileNotFoundError as e:
raise FileNotFoundError(f"File not found: {gage_info_path}") from e


def filter_gages_by_area_threshold(
gage_ids: np.ndarray,
gage_dict: dict[str, list],
threshold: float,
) -> tuple[np.ndarray, int]:
"""Filter gage IDs by absolute drainage area difference.

Parameters
----------
gage_ids : np.ndarray
Array of STAID strings
gage_dict : dict
Dict from read_gage_info() — must contain "STAID" and "ABS_DIFF"
threshold : float
Maximum absolute area difference in km²

Returns
-------
tuple[np.ndarray, int]
Filtered gage IDs and count of removed gages

Raises
------
KeyError
If gage_dict doesn't contain "ABS_DIFF" key
"""
if "ABS_DIFF" not in gage_dict:
raise KeyError("gage_dict must contain 'ABS_DIFF' key for area threshold filtering")

staid_to_abs_diff = {
str(staid): abs_diff
for staid, abs_diff in zip(gage_dict["STAID"], gage_dict["ABS_DIFF"], strict=False)
}

keep_mask = np.array([staid_to_abs_diff.get(gid, float("inf")) <= threshold for gid in gage_ids])
filtered = gage_ids[keep_mask]
n_removed = len(gage_ids) - len(filtered)
return filtered, n_removed


def naninfmean(arr: np.ndarray) -> np.floating[Any]:
"""Finds the mean of an array if there are both nan and inf values

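For reference, a minimal usage sketch of the new filter (toy inputs shaped like read_gage_info() output):

    import numpy as np

    gage_dict = {"STAID": ["01013500", "01030500"], "ABS_DIFF": [12.0, 75.0]}
    gage_ids = np.array(["01013500", "01030500"])
    kept, n_removed = filter_gages_by_area_threshold(gage_ids, gage_dict, threshold=50.0)
    # kept == array(["01013500"]); n_removed == 1
    # IDs absent from gage_dict map to inf and are always removed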
5 changes: 5 additions & 0 deletions src/ddr/validation/configs.py
@@ -170,6 +170,11 @@ class ExperimentConfig(BaseModel):
default=3,
description="Number of days excluded from loss calculation as routing starts from dry conditions",
)
max_area_diff_sqkm: float | None = Field(
default=50,
description="Maximum absolute drainage area difference (km²) between USGS gage and COMID. "
"Gages exceeding this threshold are excluded from training/evaluation. None disables filtering.",
)

@field_validator("checkpoint", mode="before")
@classmethod
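A hedged sketch of toggling the new field in code (assumes the remaining ExperimentConfig fields all carry defaults, as the visible ones do):

    from ddr.validation.configs import ExperimentConfig

    cfg = ExperimentConfig()  # default threshold: 50 km²
    tight = ExperimentConfig(max_area_diff_sqkm=10.0)  # stricter filtering
    off = ExperimentConfig(max_area_diff_sqkm=None)  # disables filtering entirely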