diff --git a/marquette/__main__.py b/marquette/__main__.py
index a0ac19e..509a621 100644
--- a/marquette/__main__.py
+++ b/marquette/__main__.py
@@ -23,16 +23,35 @@ def main(cfg: DictConfig) -> None:
     """
     if cfg.name.lower() == "hydrofabric":
         raise ImportError("Hydrofabric functionality not yet supported")
+    elif cfg.name.lower() == "merit_s3":
+        from marquette.merit_s3.create import (create_edges, create_N, create_TMs, write_streamflow)
+
+        start = time.perf_counter()
+        log.info(f"Creating MERIT S3 {cfg.zone} River Graph")
+        edges = create_edges(cfg)
+
+        log.info(f"Creating MERIT S3 {cfg.zone} Connectivity Matrix (N) for gages")
+        create_N(cfg, edges)
+
+        log.info(f"Mapping {cfg.zone} Streamflow to S3 TMs")
+        create_TMs(cfg, edges)
+
+        log.info("Converting Streamflow to S3 DataTree")
+        write_streamflow(cfg, edges)
+
+        end = time.perf_counter()
+        log.info(f"Extracting data took : {(end - start):.6f} seconds")
+
     elif cfg.name.lower() == "merit":
         from marquette.merit.create import (create_edges, create_N, create_TMs,
-                                            map_lake_points, write_streamflow)
+                                            map_lake_points, write_streamflow, run_extensions)
 
         start = time.perf_counter()
         log.info(f"Creating MERIT {cfg.zone} River Graph")
         edges = create_edges(cfg)
 
-        # log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
-        # create_N(cfg, edges)
+        log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
+        create_N(cfg, edges)
 
         log.info(f"Mapping {cfg.zone} Streamflow to TMs")
         create_TMs(cfg, edges)
@@ -52,94 +71,5 @@ def main(cfg: DictConfig) -> None:
         log.error(f"incorrect name specified: {cfg.name}")
 
 
-def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
-    """
-    The function for running post-processing data extensions
-
-    :param cfg: Configuration object.
-    :type cfg: DictConfig
-    :return: None
-    """
-    if "soils_data" in cfg.extensions:
-        from marquette.merit.extensions import soils_data
-
-        log.info("Adding soils information to your MERIT River Graph")
-        if "ksat" in edges:
-            log.info("soils information already exists in zarr format")
-        else:
-            soils_data(cfg, edges)
-    if "pet_forcing" in cfg.extensions:
-        from marquette.merit.extensions import pet_forcing
-
-        log.info("Adding PET forcing to your MERIT River Graph")
-        if "pet" in edges:
-            log.info("PET forcing already exists in zarr format")
-        else:
-            pet_forcing(cfg, edges)
-    if "temp_mean" in cfg.extensions:
-        from marquette.merit.extensions import temp_forcing
-
-        log.info("Adding temp_mean forcing to your MERIT River Graph")
-        if "temp_mean" in edges:
-            log.info("Temp_mean forcing already exists in zarr format")
-        else:
-            temp_forcing(cfg, edges)
-    if "global_dhbv_static_inputs" in cfg.extensions:
-        from marquette.merit.extensions import global_dhbv_static_inputs
-
-        log.info("Adding global dHBV static input data to your MERIT River Graph")
-        if "aridity" in edges:
-            log.info("global_dhbv_static_inputs already exists in zarr format")
-        else:
-            global_dhbv_static_inputs(cfg, edges)
-
-    if "incremental_drainage_area" in cfg.extensions:
-        from marquette.merit.extensions import \
-            calculate_incremental_drainage_area
-
-        log.info("Adding edge/catchment area input data to your MERIT River Graph")
-        if "incremental_drainage_area" in edges:
-            log.info("incremental_drainage_area already exists in zarr format")
-        else:
-            calculate_incremental_drainage_area(cfg, edges)
-
-    if "q_prime_sum" in cfg.extensions:
-        from marquette.merit.extensions import calculate_q_prime_summation
-
-        log.info("Adding q_prime_sum to your MERIT River Graph")
-        if "summed_q_prime" in edges:
-            log.info("q_prime_sum already exists in zarr format")
-        else:
-            calculate_q_prime_summation(cfg, edges)
-
-
-    if "upstream_basin_avg_mean_p" in cfg.extensions:
-        from marquette.merit.extensions import calculate_mean_p_summation
-
-        log.info("Adding q_prime_sum to your MERIT River Graph")
-        if "upstream_basin_avg_mean_p" in edges:
-            log.info("upstream_basin_avg_mean_p already exists in zarr format")
-        else:
-            calculate_mean_p_summation(cfg, edges)
-
-    # if "q_prime_sum_stats" in cfg.extensions:
-    #     from marquette.merit.extensions import calculate_q_prime_sum_stats
-
-    #     log.info("Adding q_prime_sum statistics to your MERIT River Graph")
-    #     if "summed_q_prime_median" in edges:
-    #         log.info("q_prime_sum statistics already exists in zarr format")
-    #     else:
-    #         calculate_q_prime_sum_stats(cfg, edges)
-
-    if "lstm_stats" in cfg.extensions:
-        from marquette.merit.extensions import format_lstm_forcings
-
-        log.info("Adding lstm statistics from global LSTM to your MERIT River Graph")
-        if "precip_comid" in edges:
-            log.info("q_prime_sum statistics already exists in zarr format")
-        else:
-            format_lstm_forcings(cfg, edges)
-
-
 if __name__ == "__main__":
-    main()
+    main()  # type: ignore
diff --git a/marquette/conf/config.yaml b/marquette/conf/config.yaml
index 9a15178..878d56d 100644
--- a/marquette/conf/config.yaml
+++ b/marquette/conf/config.yaml
@@ -1,37 +1,38 @@
-name: MERIT
-data_path: /projects/mhpi/data/${name}
-zone: 71
+name: merit_s3
+s3_path: s3://mhpi-spatial/marquette/${name}/
+data_path: /projects/mhpi/data/MERIT
+zone: 73
 gpu: 6
 create_edges:
   buffer: 0.3334
   dx: 2000
-  edges: ${data_path}/zarr/graph/CONUS/edges/
-  flowlines: ${data_path}/raw/flowlines
+  edges: ${s3_path}/edges/
+  flowlines: ${data_path}/raw/flowlines/riv_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
 create_N:
   run_whole_zone: False
   drainage_area_treshold: 0.1
   filter_based_on_dataset: True
-  gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
-  gage_coo_indices: ${data_path}/zarr/gage_coo_indices
-  pad_gage_id: False
-  obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
-  obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
-  zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/subzones.csv
+  gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gage_9322_intersection.shp
+  gage_coo_indices: ${s3_path}/gages/coo_pair_intersections
+  pad_gage_id: True
+  obs_dataset: ${data_path}/gage_information/obs_csvs/all_gages_info.csv
+  obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/all_gages_info_combined.csv
+  zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/${zone}_all.csv
 create_TMs:
   MERIT:
     save_sparse: True
-    TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
+    TM: ${s3_path}/TMs/sparse_MERIT_FLOWLINES_${zone}
    shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
 create_streamflow:
   version: merit_conus_v6.18_snow
-  data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
+  data_store: ${s3_path}/streamflow/${create_streamflow.version}/${zone}
   obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
   predictions: /projects/mhpi/yxs275/DM_output/water_loss_model/dPL_local_daymet_new_attr_RMSEloss_with_log_2800
   start_date: 01-01-1980
   end_date: 12-31-2020
-map_lake_points:
-  lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
-  zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
+# map_lake_points:
+#   lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
+#   zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
 extensions:
   - soils_data
   - pet_forcing
diff --git a/marquette/conf/saved_configs/v1.0config.yaml b/marquette/conf/saved_configs/v1.0config.yaml
new file mode 100644
index 0000000..26d85aa
--- /dev/null
+++ b/marquette/conf/saved_configs/v1.0config.yaml
@@ -0,0 +1,63 @@
+name: MERIT
+data_path: /projects/mhpi/data/${name}
+zone: 74
+gpu: 6
+create_edges:
+  buffer: 0.3334
+  dx: 2000
+  edges: ${data_path}/zarr/graph/CONUS/edges/
+  flowlines: ${data_path}/raw/flowlines
+create_N:
+  run_whole_zone: False
+  drainage_area_treshold: 0.1
+  filter_based_on_dataset: True
+  gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
+  gage_coo_indices: ${data_path}/zarr/gage_coo_indices
+  pad_gage_id: False
+  obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
+  obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
+  zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/subzones.csv
+create_TMs:
+  MERIT:
+    save_sparse: True
+    TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
+    shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
+create_streamflow:
+  version: merit_conus_v6.18_snow
+  data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
+  obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
+  predictions: /projects/mhpi/yxs275/DM_output/water_loss_model/dPL_local_daymet_new_attr_RMSEloss_with_log_2800
+  start_date: 01-01-1980
+  end_date: 12-31-2020
+map_lake_points:
+  lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
+  zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
+extensions:
+  - soils_data
+  - pet_forcing
+  - global_dhbv_static_inputs
+  - incremental_drainage_area
+  - q_prime_sum
+  - upstream_basin_avg_mean_p
+  - q_prime_sum_stats
+  - lstm_stats
+  - temp_mean
+# Hydra Config ------------------------------------------------------------------------#
+hydra:
+  help:
+    app_name: marquette
+    header: == ${hydra.help.app_name} ==
+    template: |-
+      ${hydra.help.header}
+
+      A data pipeline tool used to generate inputs to dMC river routing
+      By Tadd Bindas
+
+      ${hydra.help.footer}
+    footer: |-
+      Powered by Hydra (https://hydra.cc)
+      Use --hydra-help to view Hydra specific help
+  job:
+    name: ${name}
+  run:
+    dir: ../runs/${hydra.job.name}_${zone}/${now:%Y-%m-%d_%H-%M-%S}
diff --git a/marquette/merit/create.py b/marquette/merit/create.py
index 7143dad..9b06603 100644
--- a/marquette/merit/create.py
+++ b/marquette/merit/create.py
@@ -248,3 +248,92 @@ def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
     else:
         log.info("Mapping HydroLakes Pour Points to Edges")
         _map_lake_points(cfg, edges)
+
+
+def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
+    """
+    The function for running post-processing data extensions
+
+    :param cfg: Configuration object.
+    :type cfg: DictConfig
+    :return: None
+    """
+    if "soils_data" in cfg.extensions:
+        from marquette.merit.extensions import soils_data
+
+        log.info("Adding soils information to your MERIT River Graph")
+        if "ksat" in edges:
+            log.info("soils information already exists in zarr format")
+        else:
+            soils_data(cfg, edges)
+    if "pet_forcing" in cfg.extensions:
+        from marquette.merit.extensions import pet_forcing
+
+        log.info("Adding PET forcing to your MERIT River Graph")
+        if "pet" in edges:
+            log.info("PET forcing already exists in zarr format")
+        else:
+            pet_forcing(cfg, edges)
+    if "temp_mean" in cfg.extensions:
+        from marquette.merit.extensions import temp_forcing
+
+        log.info("Adding temp_mean forcing to your MERIT River Graph")
+        if "temp_mean" in edges:
+            log.info("Temp_mean forcing already exists in zarr format")
+        else:
+            temp_forcing(cfg, edges)
+    if "global_dhbv_static_inputs" in cfg.extensions:
+        from marquette.merit.extensions import global_dhbv_static_inputs
+
+        log.info("Adding global dHBV static input data to your MERIT River Graph")
+        if "aridity" in edges:
+            log.info("global_dhbv_static_inputs already exists in zarr format")
+        else:
+            global_dhbv_static_inputs(cfg, edges)
+
+    if "incremental_drainage_area" in cfg.extensions:
+        from marquette.merit.extensions import \
+            calculate_incremental_drainage_area
+
+        log.info("Adding edge/catchment area input data to your MERIT River Graph")
+        if "incremental_drainage_area" in edges:
+            log.info("incremental_drainage_area already exists in zarr format")
+        else:
+            calculate_incremental_drainage_area(cfg, edges)
+
+    if "q_prime_sum" in cfg.extensions:
+        from marquette.merit.extensions import calculate_q_prime_summation
+
+        log.info("Adding q_prime_sum to your MERIT River Graph")
+        if "summed_q_prime" in edges:
+            log.info("q_prime_sum already exists in zarr format")
+        else:
+            calculate_q_prime_summation(cfg, edges)
+
+
+    if "upstream_basin_avg_mean_p" in cfg.extensions:
+        from marquette.merit.extensions import calculate_mean_p_summation
+
+        log.info("Adding upstream_basin_avg_mean_p to your MERIT River Graph")
+        if "upstream_basin_avg_mean_p" in edges:
+            log.info("upstream_basin_avg_mean_p already exists in zarr format")
+        else:
+            calculate_mean_p_summation(cfg, edges)
+
+    # if "q_prime_sum_stats" in cfg.extensions:
+    #     from marquette.merit.extensions import calculate_q_prime_sum_stats
+
+    #     log.info("Adding q_prime_sum statistics to your MERIT River Graph")
+    #     if "summed_q_prime_median" in edges:
+    #         log.info("q_prime_sum statistics already exists in zarr format")
+    #     else:
+    #         calculate_q_prime_sum_stats(cfg, edges)
+
+    if "lstm_stats" in cfg.extensions:
+        from marquette.merit.extensions import format_lstm_forcings
+
+        log.info("Adding lstm statistics from global LSTM to your MERIT River Graph")
+        if "precip_comid" in edges:
+            log.info("lstm statistics already exist in zarr format")
+        else:
+            format_lstm_forcings(cfg, edges)
diff --git a/marquette/merit_s3/_TM_calculations.py b/marquette/merit_s3/_TM_calculations.py
new file mode 100644
index 0000000..ce72ae5
--- /dev/null
+++ b/marquette/merit_s3/_TM_calculations.py
@@ -0,0 +1,294 @@
+import logging
+from pathlib import Path
+
+import binsparse
+import geopandas as gpd
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+import xarray as xr
+import zarr
+from omegaconf import DictConfig
+from scipy import sparse
+from tqdm import tqdm
+
+log = logging.getLogger(__name__)
+
+
+def create_HUC_MERIT_TM(
+    cfg: DictConfig, edges: zarr.hierarchy.Group, gdf: gpd.GeoDataFrame
+) -> None: + """ + Create a Transfer Matrix (TM) from GeoDataFrame. + + Args: + cfg (DictConfig): Hydra configuration object containing settings. + gdf (GeoDataFrame): GeoDataFrame containing geographical data. + """ + gdf = gdf.dropna(subset=["HUC10"]) + huc10_ids = gdf["HUC10"].unique() + huc10_ids.sort() + merit_ids = np.unique(edges.merit_basin[:]) # already sorted + data_array = xr.DataArray( + np.zeros((len(huc10_ids), len(merit_ids))), + dims=["HUC10", "COMID"], + coords={"HUC10": huc10_ids, "COMID": merit_ids}, + ) + for idx, huc_id in enumerate( + tqdm( + huc10_ids, + desc="creating TM", + ncols=140, + ascii=True, + ) + ): + merit_basins = gdf[gdf["HUC10"] == str(huc_id)] + total_area = merit_basins.iloc[0]["area_new"] + + for j, basin in merit_basins.iterrows(): + unit_area = basin.unitarea / total_area + data_array.loc[huc_id, basin.COMID] = unit_area + xr_dataset = xr.Dataset( + data_vars={"TM": data_array}, + coords={"HUC10": huc10_ids, "COMID": merit_ids}, + attrs={"description": "HUC10 -> MERIT Transition Matrix"}, + ) + print("Saving Zarr Data") + zarr_path = Path(cfg.create_TMs.HUC.TM) + xr_dataset.to_zarr(zarr_path, mode="w") + + +def format_pairs(gage_output: dict): + pairs = [] + for comid, edge_id in zip(gage_output["comid_idx"], gage_output["edge_id_idx"]): + for edge in edge_id: + # Check if upstream is a list (multiple connections) + if isinstance(edge, list): + for _id in edge: + # Replace None with np.NaN for consistency + if _id is None: + _id = np.NaN + pairs.append((comid, _id)) + else: + # Handle single connection (not a list) + if edge is None: + edge = np.NaN + pairs.append((comid, edge)) + + return pairs + + +def create_coo_data(sparse_matrix, root: zarr.Group): + """ + Creates coordinate format (COO) data from river graph output for a specific gage. + + This function processes the river graph data (specifically the 'ds' and 'up' arrays) + to create a list of pairs representing connections in the graph. These pairs are then + stored in a Zarr dataset within a group specific to a gage, identified by 'padded_gage_id'. + + Parameters: + gage_output: The output from a river graph traversal, containing 'ds' and 'up' keys. + padded_gage_id (str): The identifier for the gage, used to create a specific group in Zarr. + root (zarr.Group): The root Zarr group where the dataset will be stored. + + """ + values = sparse_matrix["values"] + pairs = format_pairs(sparse_matrix) + + # Create a Zarr dataset for this specific gage + root.create_dataset("pairs", data=np.array(pairs), chunks=(10000,), dtype="int32") + root.array("values", data=np.array(values), chunks=(10000,), dtype="float32") + + +def create_sparse_MERIT_FLOW_TM( + cfg: DictConfig, edges: zarr.hierarchy.Group +) -> zarr.hierarchy.Group: + """ + Creating a sparse TM that maps MERIT basins to their reaches. 
Flow predictions are distributed + based on reach length/ total merit reach length + :param cfg: + :param edges: + :param huc_to_merit_TM: + :return: + """ + log.info("Using Edge COMIDs for TM") + COMIDs = np.unique(edges.merit_basin[:]) # already sorted + gage_coo_root = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="a") + merit_basin = edges.merit_basin[:] + river_graph_len = edges.len[:] + river_graph = {"values": [], "comid_idx": [], "edge_id_idx": []} + for comid_idx, basin_id in enumerate( + tqdm( + COMIDs, + desc="Creating a sparse TM Mapping MERIT basins to their edges", + ncols=140, + ascii=True, + ) + ): + col_indices = np.where(merit_basin == basin_id)[0] + total_length = np.sum(river_graph_len[col_indices]) + if total_length == 0: + print("Basin not found:", basin_id) + continue + proportions = river_graph_len[col_indices] / total_length + river_graph["comid_idx"].append(comid_idx) + river_graph["edge_id_idx"].append(col_indices.tolist()) + river_graph["values"].extend(proportions.tolist()) + create_coo_data(river_graph, gage_coo_root) + + +def create_MERIT_FLOW_TM( + cfg: DictConfig, edges: zarr.hierarchy.Group +) -> zarr.hierarchy.Group: + """ + Creating a TM that maps MERIT basins to their reaches. Flow predictions are distributed + based on reach length/ total merit reach length + :param cfg: + :param edges: + :param huc_to_merit_TM: + :return: + """ + # if cfg.create_TMs.MERIT.use_streamflow: + # log.info("Using Streamflow COMIDs for TM") + # streamflow_predictions_root = zarr.open( + # Path(cfg.create_streamflow.predictions), mode="r" + # ) + # comids: np.ndarray = streamflow_predictions_root.COMID[:] + # sorted_indices = np.argsort(comids) + # COMIDs = comids[sorted_indices].astype(int) + # else: + log.info("Using Edge COMIDs for TM") + COMIDs = np.unique(edges.merit_basin[:]) # already sorted + river_graph_ids = edges.id[:] + merit_basin = edges.merit_basin[:] + river_graph_len = edges.len[:] + + # indices = np.zeros((len(COMIDs), len(river_graph_ids)), dtype=np.float64) + # for i, basin_id in tqdm(enumerate(COMIDs), total=len(COMIDs), ncols=140, ascii=True, desc="reading idx"): + # mask = merit_basin == basin_id + # indices[i] = mask.astype(np.float64) + + # # Calculate the number of non-zero elements for each row in indices + # num_non_zeros = indices.sum(axis=1) + + # proportions = np.transpose(indices) / num_non_zeros + + # tm = np.transpose(proportions) + + data_np = np.zeros((len(COMIDs), len(river_graph_ids))) + for i, basin_id in enumerate( + tqdm( + COMIDs, + desc="Processing River flowlines", + ncols=140, + ascii=True, + ) + ): + indices = np.where(merit_basin == basin_id)[0] + + total_length = np.sum(river_graph_len[indices]) + if total_length == 0: + print("Basin not found:", basin_id) + continue + proportions = river_graph_len[indices] / total_length + for idx, proportion in zip(indices, proportions): + column_index = np.where(river_graph_ids == river_graph_ids[idx])[0][0] + data_np[i][column_index] = proportion + + if cfg.create_TMs.MERIT.save_sparse: + log.info("Writing to sparse matrix") + gage_coo_root = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="a") + matrix = sparse.csr_matrix(data_np) + binsparse.write(gage_coo_root, "TM", matrix) + log.info("Sparse matrix written") + else: + data_array = xr.DataArray( + data=data_np, + dims=["COMID", "EDGEID"], # Explicitly naming the dimensions + coords={"COMID": COMIDs, "EDGEID": river_graph_ids}, # Adding coordinates + ) + xr_dataset = xr.Dataset( + data_vars={"TM": data_array}, + 
attrs={"description": "MERIT -> Edge Transition Matrix"}, + ) + log.info("Writing MERIT TM to zarr store") + zarr_path = Path(cfg.create_TMs.MERIT.TM) + xr_dataset.to_zarr(zarr_path, mode="w") + # zarr_hierarchy = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="r") + + +def join_geospatial_data(cfg: DictConfig) -> gpd.GeoDataFrame: + """ + Joins two geospatial datasets based on the intersection of centroids of one dataset with the geometries of the other. + + Args: + huc10_path (str): File path to the HUC10 shapefile. + basins_path (str): File path to the basins shapefile. + + Returns: + gpd.GeoDataFrame: The resulting joined GeoDataFrame. + """ + huc10_gdf = gpd.read_file(Path(cfg.create_TMs.HUC.shp_files)).to_crs(epsg=4326) + basins_gdf = gpd.read_file(Path(cfg.create_TMs.MERIT.shp_files)) + basins_gdf["centroid"] = basins_gdf.geometry.centroid + joined_gdf = gpd.sjoin( + basins_gdf.set_geometry("centroid"), + huc10_gdf, + how="left", + predicate="intersects", + ) + joined_gdf.set_geometry("geometry", inplace=True) + return joined_gdf + + +def plot_histogram(df: pd.DataFrame, num_bins: int = 100) -> None: + """ + Creates and displays a histogram for the sum of values in each row of the provided DataFrame. + + Args: + df (pd.DataFrame): A Pandas DataFrame whose row sums will be used for the histogram. + num_bins (int, optional): The number of bins for the histogram. Defaults to 100. + + The function calculates the minimum, median, mean, and maximum values of the row sums + and displays these as vertical lines on the histogram. + """ + series = df.sum(axis=1) + plt.figure(figsize=(10, 6)) + series.hist(bins=num_bins) + plt.xlabel(r"Ratio of $\sum$ MERIT basin area to HUC10 basin areas") + plt.ylabel("Number of HUC10s") + plt.title(r"Distribution of $\sum$ MERIT area / HUC10 basin area") + min_val = series.min() + median_val = series.median() + mean_val = series.mean() + max_val = series.max() + plt.axvline( + min_val, + color="grey", + linestyle="dashed", + linewidth=2, + label=f"Min: {min_val:.3f}", + ) + plt.axvline( + median_val, + color="blue", + linestyle="dashed", + linewidth=2, + label=f"Median: {median_val:.3f}", + ) + plt.axvline( + mean_val, + color="red", + linestyle="dashed", + linewidth=2, + label=f"Mean: {mean_val:.3f}", + ) + plt.axvline( + max_val, + color="green", + linestyle="dashed", + linewidth=2, + label=f"Max: {max_val:.3f}", + ) + plt.legend() + plt.show() diff --git a/marquette/merit_s3/__init__.py b/marquette/merit_s3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/marquette/merit_s3/_connectivity_matrix.py b/marquette/merit_s3/_connectivity_matrix.py new file mode 100644 index 0000000..9ab906b --- /dev/null +++ b/marquette/merit_s3/_connectivity_matrix.py @@ -0,0 +1,413 @@ +import ast +import logging +from pathlib import Path +from typing import Any, List, Tuple + +import dask.dataframe as dd +import geopandas as gpd +import numpy as np +import pandas as pd +import zarr +from dask.diagnostics import ProgressBar +from omegaconf import DictConfig +from tqdm import tqdm + +log = logging.getLogger(__name__) + + +def format_pairs(gage_output: dict): + pairs = [] + + # Iterate over downstream and upstream nodes to create pairs + for ds, ups in zip(gage_output["ds"], gage_output["up"]): + for up in ups: + # Check if upstream is a list (multiple connections) + if isinstance(up, list): + for _id in up: + # Replace None with np.NaN for consistency + if _id is None: + _id = np.NaN + pairs.append((ds, _id)) + else: + # Handle single connection (not a 
list) + if up is None: + up = np.NaN + pairs.append((ds, up)) + + return pairs + + +def left_pad_number(number): + """ + Left pads a number with '0' if it has 7 digits to make it 8 digits long. + + Parameters: + number (int or str): The number to be left-padded. + + Returns: + str: The left-padded number as a string. + """ + number_str = str(number) + if len(number_str) == 7: + return "0" + number_str + return number_str + + +def map_gages_to_zone(cfg: DictConfig, edges: zarr.Group) -> gpd.GeoDataFrame: + def choose_row_to_keep(group_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + """ + Selects the row where 'uparea' is closest to 'DRAIN_SQKM' without going below it. + If no such row exists, selects the row with the closest 'uparea' under 'DRAIN_SQKM'. + + Parameters: + group_df (DataFrame): DataFrame representing all rows of a particular group. + + Returns: + DataFrame: A single row which has the closest 'uparea' to 'DRAIN_SQKM'. + """ + group_df["diff"] = group_df["uparea"] - group_df["DRAIN_SQKM"] + valid_rows = group_df[group_df["diff"] >= 0] + + if not valid_rows.empty: + idx = valid_rows["diff"].idxmin() + else: + idx = group_df["diff"].abs().idxmin() + + return group_df.loc[[idx]].drop(columns=["diff"]) + + def filter_by_comid_prefix(gdf: gpd.GeoDataFrame, prefix: str) -> gpd.GeoDataFrame: + """ + Filters a GeoDataFrame to include only rows where the first two characters + of the 'COMID' column match the given prefix. + + Parameters: + gdf (GeoDataFrame): The GeoDataFrame to filter. + prefix (str): The two-character prefix to match. + + Returns: + GeoDataFrame: Filtered GeoDataFrame. + """ + gdf["MERIT_ZONE"] = gdf["COMID"].astype(str).str[:2] + filtered_gdf = gdf[gdf["MERIT_ZONE"] == str(prefix)] + return filtered_gdf + + def find_closest_edge( + row: gpd.GeoSeries, + zone_edge_ids: np.ndarray, + zone_merit_basin_ids: np.ndarray, + zone_upstream_areas: np.ndarray, + ): + """ + Finds details of the edge with the upstream area closest to the DRAIN_SQKM value in absolute terms + and calculates the percent error between DRAIN_SQKM and the matched zone_edge_uparea. + + Parameters: + row (GeoSeries): A row from the result_df GeoDataFrame. + zone_edge_ids (ndarray): Array of edge IDs. + zone_merit_basin_ids (ndarray): Array of merit basin IDs corresponding to edge IDs. + zone_upstream_areas (ndarray): Array of upstream areas corresponding to edge IDs. + + Returns: + Tuple[np.float64, int, np.float64, np.float64, np.float64]: Contains edge ID, index of the matching edge, upstream area of the matching edge, + the difference in catchment area, and the percent error between DRAIN_SQKM and zone_edge_uparea. 
+ """ + COMID = row["COMID"] + DRAIN_SQKM = row["DRAIN_SQKM"] + matching_indices = np.where(zone_merit_basin_ids == COMID)[0] + + if len(matching_indices) == 0: + return ( + np.nan, + np.nan, + np.nan, + np.nan, + np.nan, + np.nan, + ) # Return NaNs if no matching edges are found + + matching_upstream_areas = zone_upstream_areas[matching_indices] + differences = np.abs(matching_upstream_areas - DRAIN_SQKM) + min_diff_idx = np.argmin(differences) + percent_error = ( + (differences[min_diff_idx] / DRAIN_SQKM) if DRAIN_SQKM != 0 else np.nan + ) + ratio = np.abs(matching_upstream_areas / DRAIN_SQKM) + + return ( + zone_edge_ids[matching_indices[min_diff_idx]], + matching_indices[min_diff_idx], + matching_upstream_areas[min_diff_idx], + differences[min_diff_idx], + percent_error, + ratio, + ) + + gdf = gpd.read_file(Path(cfg.create_N.gage_buffered_flowline_intersections)) + if cfg.create_N.filter_based_on_dataset: + # filter based on large-list of gage_locations + gage_locations_df = pd.read_csv(cfg.create_N.obs_dataset) + try: + if cfg.create_N.pad_gage_id: + gage_ids = ( + gage_locations_df["id"].astype(str).apply(lambda x: x.zfill(8)) + ) + else: + gage_ids = gage_locations_df["id"] + except KeyError: + if cfg.create_N.pad_gage_id: + gage_ids = ( + gage_locations_df["STAT_ID"].astype(str).apply(lambda x: x.zfill(8)) + ) + else: + gage_ids = gage_locations_df["STAT_ID"] + gdf = gdf[gdf["STAID"].isin(gage_ids)] + gdf["COMID"] = gdf["COMID"].astype(int) + filtered_gdf = filter_by_comid_prefix(gdf, cfg.zone) + if len(filtered_gdf) == 0: + log.info("No MERIT BASINS within this region") + return False + grouped = filtered_gdf.groupby("STAID") + unique_gdf = grouped.apply(choose_row_to_keep).reset_index(drop=True) + zone_edge_ids = edges.id[:] + zone_merit_basin_ids = edges.merit_basin[:] + zone_upstream_areas = edges.uparea[:] + edge_info = unique_gdf.apply( + lambda row: find_closest_edge( + row, zone_edge_ids, zone_merit_basin_ids, zone_upstream_areas + ), + axis=1, + result_type="expand", + ) + unique_gdf["edge_intersection"] = edge_info[0] # TODO check if the data is empty + unique_gdf["zone_edge_id"] = edge_info[1] + unique_gdf["zone_edge_uparea"] = edge_info[2] + unique_gdf["zone_edge_vs_gage_area_difference"] = edge_info[3] + unique_gdf["drainage_area_percent_error"] = edge_info[4] + unique_gdf["a_merit_a_usgs_ratio"] = edge_info[5] + + result_df = unique_gdf[ + unique_gdf["drainage_area_percent_error"] <= cfg.create_N.drainage_area_treshold + ] + + try: + result_df["LNG_GAGE"] = result_df["LON_GAGE"] + result_df["LAT_GAGE"] = result_df["Latitude"] + except KeyError: + pass + if "HUC02" not in result_df.columns: + result_df["HUC02"] = 0 + + columns = [ + "STAID", + # "STANAME", + # "MERIT_ZONE", + "HUC02", + "DRAIN_SQKM", + "LAT_GAGE", + "LNG_GAGE", + # "STATE", + "COMID", + "edge_intersection", + "zone_edge_id", + "zone_edge_uparea", + "zone_edge_vs_gage_area_difference", + "drainage_area_percent_error", + "a_merit_a_usgs_ratio", + ] + result = result_df[columns] + result = result.dropna() + result["STAID"] = result["STAID"].astype(int) + result.to_csv(Path(cfg.create_N.zone_obs_dataset), index=False) + + # combining this zone df to the master df that has all zonal information + try: + df = pd.read_csv(Path(cfg.create_N.obs_dataset_output)) + + try: + combined_df = pd.concat([df, result], ignore_index=True) + sorted_df = combined_df.sort_values(by=["STAID"]) + sorted_df.to_csv(Path(cfg.create_N.obs_dataset_output), index=False) + except pd.errors.InvalidIndexError: + log.info( + "Not 
merging your file with the master list as it seems like your gages are already included" + ) + except FileNotFoundError: + result.to_csv(Path(cfg.create_N.obs_dataset_output), index=False) + return result + + +def create_gage_connectivity( + cfg: DictConfig, + edges: zarr.Group, + gage_coo_root: zarr.Group, + zone_csv: pd.DataFrame | gpd.GeoDataFrame, +) -> None: + def stack_traversal(gage_id: str, id_: str, idx: int, merit_flowlines: zarr.Group): + """ + Performs a stack-based traversal on a graph of river flowlines. + + This function uses a depth-first search approach to traverse through the graph + represented by river flowlines. Starting from a given node (identified by 'id_' and 'idx'), + it explores the upstream flowlines, constructing a graph structure that includes + information about each node and its upstream connections. + + Parameters: + id_ (str): The identifier of the starting node in the river graph. + idx (int): The index of the starting node in the 'merit_flowlines' dataset. + merit_flowlines (zarr.Group): A Zarr group containing river flowline data. + Expected to have 'up' and 'id' arrays for upstream connections + and node identifiers, respectively. + + Returns: + dict: A dictionary representing the traversed river graph. It contains three keys: + 'ID' - a list of node identifiers, + 'ds' - a list of indices corresponding to the downstream node for each ID, + 'up' - a list of lists, where each sublist contains indices of upstream nodes. + + Note: + The function prints the stack size after every 1000 iterations for tracking the traversal progress. + """ + river_graph = {"ID": [], "ds": [], "up": []} + stack = [(id_, idx)] + visited = set() + iteration_count = 1 + + while stack: + if iteration_count % 1000 == 0: + print( + f"Gage{gage_id}, ID, IDX {id_, idx}: Stack size is {len(stack)}, river graph len: {len(river_graph['ID'])}" + ) + + current_id, current_idx = stack.pop() + if current_id in visited: + iteration_count += 1 + continue + visited.add(current_id) + + up_ids = ast.literal_eval(merit_flowlines.up[current_idx]) + up_idx_list = [] + + for up_id in up_ids: + up_idx = np.where(merit_flowlines.id[:] == up_id)[0][0] + if up_id not in visited: + stack.append((up_id, up_idx)) + up_idx_list.append(up_idx) + + river_graph["ID"].append(current_id) + river_graph["ds"].append(current_idx) + river_graph["up"].append(up_idx_list if up_idx_list else [None]) + + iteration_count += 1 + + return river_graph + + def create_coo_data( + gage_output, _gage_id: str, root: zarr.Group + ) -> List[Tuple[Any, Any]]: + """ + Creates coordinate format (COO) data from river graph output for a specific gage. + + This function processes the river graph data (specifically the 'ds' and 'up' arrays) + to create a list of pairs representing connections in the graph. These pairs are then + stored in a Zarr dataset within a group specific to a gage, identified by 'padded_gage_id'. + + Parameters: + gage_output: The output from a river graph traversal, containing 'ds' and 'up' keys. + padded_gage_id (str): The identifier for the gage, used to create a specific group in Zarr. + root (zarr.Group): The root Zarr group where the dataset will be stored. + + Returns: + List[Tuple[Any, Any]]: A list of tuples, each representing a pair of connected nodes in the graph. 
+ """ + pairs = format_pairs(gage_output) + + # Create a Zarr dataset for this specific gage + single_gage_csr_data = root.require_group(_gage_id) + single_gage_csr_data.create_dataset( + "pairs", data=np.array(pairs), chunks=(10000,), dtype="float32" + ) + + def find_connections(row, coo_root, zone_attributes, _pad_gage_id=True): + if _pad_gage_id: + _gage_id = str(row["STAID"]).zfill(8) + else: + _gage_id = str(object=row["STAID"]) + edge_id = row["edge_intersection"] + zone_edge_id = row["zone_edge_id"] + if _gage_id not in coo_root: + gage_output = stack_traversal( + _gage_id, edge_id, zone_edge_id, zone_attributes + ) + create_coo_data(gage_output, _gage_id, coo_root) + + def apply_find_connections(row, gage_coo_root, edges, pad_gage_id): + return find_connections(row, gage_coo_root, edges, pad_gage_id) + + pad_gage_id = cfg.create_N.pad_gage_id + dask_df = dd.from_pandas(zone_csv, npartitions=16) + result = dask_df.apply( + apply_find_connections, + args=(gage_coo_root, edges, pad_gage_id), + axis=1, + meta=(None, "object"), + ) + with ProgressBar(): + _ = result.compute() + + +def new_zone_connectivity( + cfg: DictConfig, + edges: zarr.Group, + full_zone_root: zarr.Group, +) -> None: + def find_connection(edges: zarr.Group, mapping: dict) -> dict: + """ + Performs a traversal on a graph of river flowlines, represented by a NumPy array of edges. + This function iterates over each edge and explores its upstream connections, constructing a graph structure. + + Parameters: + gage_id (str): Identifier for the gage being processed. + edges (np.ndarray): A NumPy array containing river flowline data. Each element is expected to have 'id' and 'up' attributes. + mapping (dict): A dictionary mapping node IDs to their indices in the edges array. + + Returns: + dict: A dictionary representing the traversed river graph for each edge. It contains three keys: + 'ID' - a list of node identifiers, + 'idx' - a list of indices corresponding to the nodes, + 'up' - a list of lists, where each sublist contains indices of upstream nodes. 
+ """ + river_graph = {"ds": [], "up": []} + for idx in tqdm( + range(edges.id.shape[0]), + desc="Looping through edges", + ascii=True, + ncols=140, + ): + river_graph["ds"].append(idx) + + # Decode upstream ids and convert to indices + upstream_ids = ast.literal_eval(edges.up[idx]) + if len(upstream_ids) == 0: + river_graph["up"].append([None]) + else: + upstream_indices = [mapping.get(up_id, None) for up_id in upstream_ids] + river_graph["up"].append(upstream_indices) + return river_graph + + mapping = {id_: i for i, id_ in enumerate(edges.id[:])} + # bag = db.from_sequence(range(edges.id.shape[0]), npartitions=100) # Adjust the number of partitions as needed + # results = bag.map(lambda idx: find_connection(idx, edges, mapping)) + # with ProgressBar(): + # traversed_graphs = results.compute() + river_graph = find_connection(edges, mapping) + + # combined_graph = {"ds": [], "up": []} + # for graph in traversed_graphs: + # combined_graph["ds"].extend(graph["ds"]) + # combined_graph["up"].extend(graph["up"]) + + pairs = format_pairs(river_graph) + + full_zone_root.create_dataset( + "pairs", data=np.array(pairs), chunks=(5000, 5000), dtype="float32" + ) diff --git a/marquette/merit_s3/_edge_calculations.py b/marquette/merit_s3/_edge_calculations.py new file mode 100644 index 0000000..4db39c8 --- /dev/null +++ b/marquette/merit_s3/_edge_calculations.py @@ -0,0 +1,456 @@ +import ast +import logging +import re +from pathlib import Path +from typing import Any, Dict, List, Tuple + +import geopandas as gpd +import numpy as np +import pandas as pd +import xarray as xr +from omegaconf import DictConfig +from tqdm import tqdm + +log = logging.getLogger(__name__) + + +def calculate_drainage_area_for_all_edges(edges, segment_das): + num_edges = len(edges) + up_ids = edges[0]["up"] + if up_ids: + for idx, edge in enumerate(edges): + try: + prev_up_area = sum(segment_das[seg] for seg in edge["up_merit"]) + except KeyError: + edge["up_merit"] = ast.literal_eval(edge["up_merit"]) + prev_up_area = sum(segment_das[seg] for seg in edge["up_merit"]) + area_difference = edge["uparea"] - prev_up_area + even_distribution = area_difference / num_edges + edge["uparea"] = prev_up_area + even_distribution * (idx + 1) + else: + total_uparea = edges[0]["uparea"] + even_distribution = total_uparea / num_edges + for idx, edge in enumerate(edges): + edge["uparea"] = even_distribution * (idx + 1) + return edges + + +def calculate_num_edges(length: float, dx: float, buffer: float) -> Tuple: + """ + Calculate the number of edges and the length of each edge for a given segment. + + This function determines the number of edges a segment should be divided into, + based on its length, a desired edge length (dx), and a tolerance (buffer). + The function adjusts the number of edges to ensure that the deviation of the + actual edge length from dx is within the specified buffer. + + Parameters + ---------- + length : float + The length of the segment for which to calculate the number of edges. + dx : float + The desired length of each edge. + buffer : float + The acceptable deviation from the desired edge length (dx). + + Returns + ------- + tuple + A tuple containing two elements: + - The first element is an integer representing the number of edges. + - The second element is a float representing the actual length of each edge. 
+ + Examples + -------- + >> calculate_num_edges(100, 30, 5) + (3, 33.333333333333336) + + >> calculate_num_edges(100, 25, 2) + (4, 25.0) + """ + num_edges = length // dx + if num_edges == 0: + num_edges = 1 + if dx - length < buffer: + edge_len = length + else: + edge_len = dx + else: + edge_len = length / num_edges + buf_dev = edge_len - dx + while abs(buf_dev) > buffer: + if buf_dev > dx: + num_edges -= 1 + else: + num_edges += 1 + edge_len = length / num_edges + buf_dev = edge_len - dx + return (int(num_edges), edge_len) + + +def create_edge_json( + segment_row: pd.Series, up=None, ds=None, edge_id=None +) -> Dict[str, Any]: + """ + Create a JSON representation of an edge based on segment data. + + Parameters + ---------- + segment_row : pandas.Series + A series representing a row from the segment DataFrame. + up : list, optional + List of upstream segment IDs. + ds : str, optional + Downstream segment ID. + edge_id : str, optional + Unique identifier for the edge. + + Returns + ------- + dict + Dictionary representing the edge with various attributes. + """ + edge = { + "id": edge_id, + "merit_basin": segment_row["id"], + "segment_sorting_index": segment_row["index"], + "order": segment_row["order"], + "len": segment_row["len"], + "len_dir": segment_row["len_dir"], + "ds": ds, + "up": up, + "up_merit": segment_row["up"], + "slope": segment_row["slope"], + "sinuosity": segment_row["sinuosity"], + "stream_drop": segment_row["stream_drop"], + "uparea": segment_row["uparea"], + "coords": segment_row["coords"], + "crs": segment_row["crs"], + } + return edge + + +def create_segment(row: pd.Series, crs: Any, dx: int, buffer: float) -> Dict[str, Any]: + """ + Create a dictionary representation of a segment using its row data. + + This function is a wrapper that calls 'create_segment_dict' by passing the + geometry of the segment along with other attributes. It simplifies the creation + of a segment dictionary from a DataFrame row. + + Parameters + ---------- + row : pandas.Series + A series representing a row from a DataFrame containing segment data. + crs : Any + Coordinate reference system of the segment. + dx : int + Desired length of each edge in the segment (used in further calculations). + buffer : float + Buffer tolerance for edge length calculation. + + Returns + ------- + dict + Dictionary containing segment attributes. + """ + return dict(create_segment_dict(row, row.geometry, crs, dx, buffer)) + + +def string_to_dict_builder(input_str, crs_info): + """ + Convert a string representation of a dictionary to an actual dictionary + using a modular approach for different sections. + + Parameters: + input_str (str): The string to be converted. + crs_info (str): The CRS information to be inserted. + + Returns: + dict: The resulting dictionary. + """ + + def handle_list(section): + """ + Handles the parsing of list structures. 
+ """ + try: + return ast.literal_eval(section.strip()) + except Exception as e: + return f"Error parsing list: {e}" + + result_dict = {} + # Using regex to extract key-value pairs + pattern = r"'([^']+)':\s*((?:\[.*?\]|<.*?>|'.*?'|[^,]+)*)" + matches = re.findall(pattern, input_str) + + for key, value in matches: + key = key.strip() + if key == "crs": + result_dict[key] = crs_info + elif key == "coords": + result_dict[key] = value.strip().strip("'") + else: + result_dict[key] = handle_list(value) + + return result_dict + + +def create_segment_dict( + row: pd.Series, + segment_coords: List[Tuple[float, float]], + crs: Any, + dx: int, + buffer: float, +) -> Dict[str, Any]: + """ + Create a dictionary representation of a segment with various attributes. + + This function constructs a dictionary for a river segment based on provided + attributes. It includes details such as segment ID, order, length, downstream + ID, slope, sinuosity, stream drop, upstream area, coordinates, and CRS. + + Parameters + ---------- + row : pandas.Series + A series representing a row from a DataFrame containing segment data. + segment_coords : List[Tuple[float, float]] + List of tuples representing coordinates of the segment. + crs : Any + Coordinate reference system of the segment. + dx : int + Desired length of each edge in the segment (used in further calculations). + buffer : float + Buffer tolerance for edge length calculation. + + Returns + ------- + Dict[str, Any] + Dictionary containing segment attributes. + """ + segment_dict = { + "id": row["COMID"], + "order": row["order"], + "len": row["lengthkm"] * 1000, # to meters + "len_dir": row["lengthdir"] * 1000, # to meters + "ds": row["NextDownID"], + # 'is_headwater': False, + "up": [row[key] for key in ["up1", "up2", "up3", "up4"] if row[key] != 0] + if row["maxup"] > 0 + else ([] if row["order"] == 1 else []), + "slope": row["slope"], + "sinuosity": row["sinuosity"], + "stream_drop": row["strmDrop_t"], + "uparea": row["uparea"], + "coords": segment_coords, + "crs": crs, + } + + return segment_dict + + +def get_upstream_ids(row: pd.Series, edge_info: dict): + """ + Generate upstream IDs for a segment. + + Parameters + ---------- + row : pandas.Series + A series representing a row from the segment DataFrame. + edge_info : dict + The number of edges associated with the segment. + + Returns + ------- + list + List of upstream segment IDs. + """ + up_ids = [] + if row["up"] is None: + return up_ids + try: + if isinstance(row["up"], str): + row["up"] = ast.literal_eval(row["up"]) + for id in row["up"]: + num_edges, _ = edge_info[id] + up_ids.append(f"{id}_{num_edges - 1}") + except KeyError: + log.error(f"KeyError with segment {row['id']}") + return [] + return up_ids + + +def many_segment_to_edge_partition( + df: pd.DataFrame, + edge_info: Dict[str, Any], + num_edge_dict: Dict[str, Any], + segment_das: Dict[str, float], +) -> pd.DataFrame: + """ + Process a DataFrame partition to create edges for segments with multiple edges. + + This function iterates over each segment in the DataFrame partition, computes + the edge length, upstream IDs, and creates a JSON representation for each edge. + It is specifically designed for segments that have multiple edges. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame partition containing segment data. + edge_info : dict + Dictionary containing information about the number of edges and edge length + for each segment. + segment_das : dict + Dictionary containing drainage area data for each segment. 
+ + Returns + ------- + pandas.DataFrame + DataFrame containing edge data for all segments in the partition. + """ + all_edges = [] + for _, segment in tqdm( + df.iterrows(), + total=len(df), + desc="Processing Segments", + ncols=140, + ascii=True, + ): + all_segment_edges = [] + num_edges, edge_len = edge_info[segment["id"]] + up_ids = get_upstream_ids(segment, num_edge_dict) + for i in range(num_edges): + if i == 0: + edge = create_edge_json( + segment, + up=up_ids, + ds=f"{segment['id']}_{i + 1}", + edge_id=f"{segment['id']}_{i}", + ) + else: + edge = create_edge_json( + segment, + up=[f"{segment['id']}_{i - 1}"], + ds=f"{segment['id']}_{i + 1}" + if i < num_edges - 1 + else f"{segment['ds']}_0", + edge_id=f"{segment['id']}_{i}", + ) + edge["len"] = edge_len + edge["len_dir"] = edge_len / segment["sinuosity"] + all_segment_edges.append(edge) + all_segment_edges = calculate_drainage_area_for_all_edges( + all_segment_edges, segment_das + ) + for edge in all_segment_edges: + all_edges.append(edge) + return pd.DataFrame(all_edges) + + +def singular_segment_to_edge_partition( + df: pd.DataFrame, + edge_info: Dict[str, Any], + num_edge_dict: Dict[str, Any], + segment_das: Dict[str, float], +) -> pd.DataFrame: + """ + Process a DataFrame partition to create edges for each segment. + + This function iterates over each segment in the DataFrame, computes the edge + length, upstream IDs, and creates JSON representation of each edge. It handles + segments that are associated with only one edge. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame partition containing segment data. + edge_info : dict + Dictionary containing edge information for each segment. + segment_das : dict + Dictionary containing drainage area data for each segment. + + Returns + ------- + pandas.DataFrame + DataFrame containing edge data for all segments in the partition. + """ + all_edges = [] + for _, segment in tqdm( + df.iterrows(), + total=len(df), + ncols=140, + ascii=True, + ): + __, edge_len = edge_info[segment["id"]] + up_ids = get_upstream_ids(segment, num_edge_dict) + edge = create_edge_json( + segment, + up=up_ids, + ds=f"{segment['ds']}_0", + edge_id=f"{segment['id']}_0", + ) + edge["len"] = edge_len + edge["len_dir"] = edge_len / segment["sinuosity"] + all_edges.append(edge) + return pd.DataFrame(all_edges) + + +def _plot_gdf(gdf: gpd.GeoDataFrame) -> None: + """ + A function to find the correct flowline of all MERIT basins using glob + + Parameters + ---------- + gdf : gpd.GeoDataFrame + The geodataframe you want to plot + + Returns + ------- + None + + Raises + ------ + None + """ + import matplotlib.pyplot as plt + + fig, ax = plt.subplots(figsize=(10, 10)) + gdf.plot(ax=ax) + ax.set_title("Polyline Plot") + ax.set_xlabel("Longitude") + ax.set_ylabel("Latitude") + plt.show() + + +def sort_based_on_keys(array_to_sort, keys, segment_sorted_index): + """ + Sort 'array_to_sort' based on the order defined in 'keys'. + For each key, find rows in 'segment_sorted_index' where this value occurs. + If there are multiple occurrences, sort these rows further by ID. + + Args: + array_to_sort: The array to be sorted. + keys: The array of keys to sort by. + segment_sorted_index: The index array to match keys against. + + Returns: + A sorted version of 'array_to_sort'. 
+ """ + sorted_array = [] + for key in tqdm( + keys, + ncols=140, + ascii=True, + ): + matching_indices = np.where(segment_sorted_index == key)[0] + if len(matching_indices) > 1: + sorted_indices = np.sort(matching_indices) + else: + sorted_indices = matching_indices + sorted_array.extend(array_to_sort[sorted_indices]) + return np.array(sorted_array) + + +def sort_xarray_dataarray(da, keys, segment_sorted_index): + sorted_data = sort_based_on_keys(da.values, keys, segment_sorted_index) + return xr.DataArray(sorted_data, dims=da.dims, coords=da.coords) diff --git a/marquette/merit_s3/_graph.py b/marquette/merit_s3/_graph.py new file mode 100644 index 0000000..592746d --- /dev/null +++ b/marquette/merit_s3/_graph.py @@ -0,0 +1,337 @@ +import logging +from pathlib import Path + +import geopandas as gpd +import pandas as pd +from omegaconf import DictConfig +from shapely.geometry import LineString, MultiLineString, Point +from tqdm import tqdm + +log = logging.getLogger(__name__) + + +class Edge: + def __init__(self, segment, up=None, ds=None, edge_id=None): + self.id = edge_id + self.merit_basin = segment.id + self.order = segment.order + self.len = segment.len + self.len_dir = segment.len_dir + self.ds = ds + self.is_headwater = segment.is_headwater + self.up = up + # if self.up is not None: + # self.up = [x for x in up if x != 0] + self.slope = segment.slope + self.sinuosity = segment.sinuosity + self.stream_drop = segment.stream_drop + self.uparea = segment.uparea + self.coords = segment.coords + self.crs = segment.crs + self.segment = segment + + def convert_coords_to_wgs84(self): + coords = self.coords + source_crs = "EPSG:32618" + target_crs = "EPSG:4326" # WGS84 + gdf = gpd.GeoDataFrame( + geometry=[Point(coord) for coord in coords], crs=source_crs + ) + gdf = gdf.to_crs(target_crs) + self.coords = [(point.x, point.y) for point in gdf.geometry] + + def calculate_sinuosity(self, curve_length): + euclidean_distance = Point(self.coords[0]).distance(Point(self.coords[-1])) + self.len_dir = euclidean_distance + # distances.append(euclidean_distance) + return curve_length / euclidean_distance if euclidean_distance != 0 else 1 + + def calculate_drainage_area(self, idx, segment_das): + if idx == -1: + self.uparea = self.segment.uparea + else: + if not self.segment.up: + prev_up_area = 0 + else: + try: + prev_up_area = sum(segment_das[seg] for seg in self.segment.up) + except KeyError: + prev_up_area = 0 + log.info("Missing upstream branch. Treating as head node") + + ratio = (self.len * (idx + 1)) / self.segment.transformed_line.length + area_difference = self.segment.uparea - prev_up_area + self.uparea = prev_up_area + (area_difference * ratio) + + +class Segment: + """ + A class to represent a river segment with geographical and hydrological attributes. + + Parameters + ---------- + row : dict + A dictionary containing the segment's data. Expected keys are: + 'COMID' (int): Segment ID. + 'order_' (int): Stream order. + 'lengthkm' (float): Length of the segment in kilometers. + 'lengthdir' (float): Direct length of the segment in kilometers. + 'NextDownID' (int): ID of the downstream segment. + 'maxup' (int): Maximum number of upstream segments. + 'up1', 'up2', 'up3', 'up4' (int): IDs of upstream segments. + 'slope' (float): Slope of the segment. + 'sinuosity' (float): Sinuosity of the segment. + 'strmDrop_t' (float): Total stream drop. + 'uparea' (float): Upstream catchment area. 
+ segment_coords : list of tuples + List of coordinate tuples (e.g., [(x1, y1), (x2, y2), ...]) representing the segment's geometry. + crs : str or CRS object + Coordinate reference system of the segment. + + Attributes + ---------- + id : int + Segment ID. + order : int + Stream order. + len : float + Length of the segment in meters. + len_dir : float + Direct length of the segment in meters. + ds : int + ID of the downstream segment. + is_headwater : bool + True if the segment is a headwater segment, otherwise False. + up : list of int + IDs of the upstream segments. + slope : float + Slope of the segment. + sinuosity : float + Sinuosity of the segment. + stream_drop : float + Total stream drop. + uparea : float + Upstream catchment area. + coords : list of tuples + List of coordinate tuples representing the segment's geometry. + crs : str or CRS object + Coordinate reference system of the segment. + transformed_line : type (optional) + Transformed line geometry, initialized as None. + edge_len : type (optional) + Edge length, initialized as None. + """ + + def __init__(self, row, segment_coords, crs): + self.id = row["COMID"] + self.order = row["order"] + self.len = row["lengthkm"] * 1000 # to meters + self.len_dir = row["lengthdir"] * 1000 # to meters + self.ds = row["NextDownID"] + self.is_headwater = False + if row["maxup"] > 0: + up = [row["up1"], row["up2"], row["up3"], row["up4"]] + self.up = [x for x in up if x != 0] + else: + if row["order"] == 1: + self.is_headwater = True + self.up = [] + self.slope = row["slope"] + self.sinuosity = row["sinuosity"] + self.stream_drop = row["strmDrop_t"] + self.uparea = row["uparea"] + self.coords = segment_coords + self.crs = crs + self.transformed_line = None + self.edge_len = None + + +def get_edge_counts(segments, dx, buffer): + edge_counts = {} + + for segment in tqdm( + segments, + desc="Creating edges", + ncols=140, + ascii=True, + ): + try: + line = LineString(segment.coords) + except TypeError: + log.info(f"TypeError for segment {segment.id}. 
Fusing MultiLineString") + if segment.coords.geom_type == "MultiLineString": + multiline = MultiLineString(segment.coords) + # Create a list of points from all LineStrings in the MultiLineString + points_list = [ + point + for linestring in multiline.geoms + for point in linestring.coords + ] + # Convert points to a single line + line = LineString(points_list) + source_crs = segment.crs + target_crs = "EPSG:32618" + + gdf = gpd.GeoDataFrame(geometry=[line], crs=source_crs) + gdf = gdf.to_crs(target_crs) # Transform to the target CRS + transformed_line = gdf.geometry.iloc[0] # Get the transformed LineString + + length = transformed_line.length + num_edges = length // dx + + if num_edges == 0: + num_edges = 1 + if dx - length < buffer: + edge_len = length + else: + edge_len = dx + edge_counts[segment.id] = num_edges + else: + # Calculate actual segment length + edge_len = length / num_edges + + # Calculate buffer deviation + buf_dev = edge_len - dx + buf_dev_abs = abs(buf_dev) + + # Adjust the number of segments until they are within the buffer + while buf_dev_abs > buffer: + if buf_dev > dx: + num_edges -= 1 + else: + num_edges += 1 + edge_len = length / num_edges + buf_dev = edge_len - dx + buf_dev_abs = abs(edge_len - dx) + edge_counts[segment.id] = int(num_edges) + + segment.transformed_line = transformed_line + segment.edge_len = edge_len + + return edge_counts + + +def get_upstream_ids(segment, edge_counts): + if segment.up is None: + return [] + try: + up_ids = [f"{up}_{edge_counts[up] - 1}" for up in segment.up] + except KeyError: + log.error(f"KeyError with segment {segment.id}") + return [] # temp fix that will kill this river network and make this edge a stream with order 1 + return up_ids + + +def segments_to_edges(segment, edge_counts, segment_das): + edges = [] + num_edges = edge_counts[segment.id] + up_ids = get_upstream_ids(segment, edge_counts) + + """Iterating through dx edges""" + if num_edges == 1: + """Setting the small reaches to dx""" + edge = Edge( + segment, + up=up_ids, + ds=f"{segment.ds}_0", + edge_id=f"{segment.id}_{0}", + ) + edge.coords = list( + segment.transformed_line.interpolate(segment.edge_len * num_edges).coords + ) + [segment.transformed_line.coords[-1]] + edge.len = segment.edge_len + edge.calculate_sinuosity(edge.len) + edge.len_dir = ( + segment.edge_len / edge.sinuosity + ) # This is FAKE as we're setting the len manually + edge.calculate_drainage_area(-1, segment_das) + edges.append(edge) + else: + for i in range(num_edges): + if i == 0: + # Create the first edge + edge = Edge( + segment, + up=up_ids, + ds=f"{segment.id}_{i + 1}", + edge_id=f"{segment.id}_{i}", + ) + else: + # Create subsequent edges + edge = Edge( + segment, + up=[f"{segment.id}_{i - 1}"], + ds=f"{segment.id}_{i + 1}" + if i < num_edges - 1 + else f"{segment.ds}_0", + edge_id=f"{segment.id}_{i}", + ) + edge.coords = list( + segment.transformed_line.interpolate(segment.edge_len * i).coords + ) + list( + segment.transformed_line.interpolate(segment.edge_len * (i + 1)).coords + ) + edge.len = segment.edge_len + edge.sinuosity = edge.calculate_sinuosity(segment.edge_len) + edge.calculate_drainage_area(i, segment_das) + edges.append(edge) + [edge.convert_coords_to_wgs84() for edge in edges] # Convert back to WGS84 + return edges + + +def data_to_csv(data_list): + """ + writing the edges list to disk + :param edges: + :return: + """ + data_dicts = [] + for data in data_list: + edge_dict = { + "id": data.id, + "merit_basin": data.merit_basin, + "order": data.order, + "len": data.len, + 
"len_dir": data.len_dir, + "ds": data.ds, + "is_headwater": data.is_headwater, + "up": data.up, + "slope": data.slope, + "sinuosity": data.sinuosity, + "stream_drop": data.stream_drop, + "uparea": data.uparea, + "coords": data.coords, + "crs": data.crs, + } + data_dicts.append(edge_dict) + df = pd.DataFrame(data_dicts) + return df + + +def _find_flowlines(cfg: DictConfig) -> Path: + """ + A function to find the correct flowline of all MERIT basins using glob + + Parameters + ---------- + cfg : DictConfig + The cfg object + + Returns + ------- + Path + The file that we're going to create flowline connectivity for + + Raises + ------ + IndexError + Raised if no flowlines are found with your MERIT region code + """ + flowline_path = Path(cfg.save_paths.flow_lines) + region_id = f"_{cfg.zone}_" + matching_file = flowline_path.glob(f"*{region_id}*.shp") + try: + found_file = [file for file in matching_file][0] + return found_file + except IndexError: + raise IndexError(f"No flowlines found using: *{region_id}*.shp") diff --git a/marquette/merit_s3/_map_lake_points.py b/marquette/merit_s3/_map_lake_points.py new file mode 100644 index 0000000..61e8720 --- /dev/null +++ b/marquette/merit_s3/_map_lake_points.py @@ -0,0 +1,75 @@ +import logging +from pathlib import Path + +import geopandas as gpd +import numpy as np +import zarr +from omegaconf import DictConfig +from tqdm import tqdm + +log = logging.getLogger(name=__name__) + +def _map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: + """A function that reads in a gdf of hydrolakes information, finds its corresponding edge, then saves the data + + Parameters + ---------- + cfg: DictConfig + The configuration object + edges: zarr.Group + The zarr group containing the edges + """ + data_path = Path(cfg.map_lake_points.lake_points) + if not data_path.exists(): + msg = "Cannot find the lake points file" + log.exception(msg) + raise FileNotFoundError(msg) + gdf = gpd.read_file(data_path) + lake_comids = gdf["COMID"].astype(int).values + edges_comids : np.ndarray = edges["merit_basin"][:].astype(np.int32) # type: ignore + + hylak_id = np.full(len(edges_comids), -1, dtype=np.int32) + grand_id = np.full_like(edges_comids, -1, dtype=np.int32) + lake_area = np.full_like(edges_comids, -1, dtype=np.float32) + vol_total = np.full_like(edges_comids, -1, dtype=np.float32) + depth_avg = np.full_like(edges_comids, -1, dtype=np.float32) + vol_res = np.full(len(edges_comids), -1, dtype=np.int32) + elevation = np.full_like(edges_comids, -1, dtype=np.int32) + shore_dev = np.full_like(edges_comids, -1, dtype=np.float32) + dis_avg = np.full_like(edges_comids, -1, dtype=np.float32) + + for idx, lake_id in enumerate(tqdm( + lake_comids, + desc="Mapping Lake COMIDS to edges", + ncols=140, + ascii=True, + )) : + jdx = np.where(edges_comids == lake_id)[0] + if not jdx.size: + log.info(f"No lake found for COMID {lake_id}") + else: + # Assumung the pour point is at the end of the COMID + edge_idx = jdx[-1] + lake_row = gdf.iloc[idx] + hylak_id[edge_idx] = lake_row["Hylak_id"] + grand_id[edge_idx] = lake_row["Grand_id"] + lake_area[edge_idx] = lake_row["Lake_area"] + vol_total[edge_idx] = lake_row["Vol_total"] + depth_avg[edge_idx] = lake_row["Depth_avg"] + + vol_res[edge_idx] = lake_row["Vol_res"] + elevation[edge_idx] = lake_row["Elevation"] + shore_dev[edge_idx] = lake_row["Shore_dev"] + dis_avg[edge_idx] = lake_row["Dis_avg"] + + edges.array("hylak_id", data=hylak_id) + edges.array("grand_id", data=grand_id) + edges.array("lake_area", data=lake_area) + 
edges.array("vol_total", data=vol_total) + edges.array("depth_avg", data=depth_avg) + edges.array("vol_res", data=vol_res) + edges.array("elevation", data=elevation) + edges.array("shore_dev", data=shore_dev) + edges.array("dis_avg", data=dis_avg) + + log.info("Wrote Lake data for zones to zarr") diff --git a/marquette/merit_s3/_streamflow_conversion_functions.py b/marquette/merit_s3/_streamflow_conversion_functions.py new file mode 100644 index 0000000..6972e10 --- /dev/null +++ b/marquette/merit_s3/_streamflow_conversion_functions.py @@ -0,0 +1,193 @@ +import logging +from pathlib import Path + +import numpy as np +import pandas as pd +import xarray as xr +import zarr +from omegaconf import DictConfig +from tqdm import tqdm, trange + +log = logging.getLogger(__name__) + + +def calculate_huc10_flow_from_individual_files(cfg: DictConfig) -> None: + qr_folder = Path(cfg.create_streamflow.predictions) + streamflow_files_path = qr_folder / "basin_split/" + + attrs_df = pd.read_csv(cfg.create_streamflow.obs_attributes) + attrs_df["gage_ID"] = ( + attrs_df["gage_ID"].astype(str).str.zfill(10) + ) # Left padding a 0 to make sure that all gages can be read + id_to_area = attrs_df.set_index("gage_ID")["area"].to_dict() + + huc_to_merit_TM = zarr.open(Path(cfg.create_TMs.HUC.TM), mode="r") + huc_10_list = huc_to_merit_TM.HUC10[:] + date_range = pd.date_range( + start=cfg.create_streamflow.start_date, + end=cfg.create_streamflow.end_date, + freq="D", + ) + streamflow_data = np.zeros((len(date_range), len(huc_10_list))) + + for i, huc_id in enumerate( + tqdm( + huc_10_list, + desc="Processing River flowlines", + ncols=140, + ascii=True, + ) + ): + try: + file_path = streamflow_files_path / f"{huc_id}.npy" + data = np.load(file_path) + file_id = file_path.stem + area = id_to_area.get( + file_id + ) # defaulting to mean area if there is no area for the HUC10 + # CONVERTING FROM MM/DAY TO M3/S + data = data * area * 1000 / 86400 + streamflow_data[:, i] = data + except FileNotFoundError: + log.info(f"No Predictions found for {huc_id}") + except KeyError: + log.info(f"{huc_id} has no area") + + data_array = xr.DataArray( + data=streamflow_data, + dims=["time", "HUC10"], # Explicitly naming the dimensions + coords={"time": date_range, "HUC10": huc_10_list}, # Adding coordinates + ) + xr_dataset = xr.Dataset( + data_vars={"streamflow": data_array}, + attrs={"description": "Streamflow -> HUC Predictions"}, + ) + streamflow_path = Path(cfg.create_streamflow.data_store) + xr_dataset.to_zarr(streamflow_path, mode="w") + # zarr_hierarchy = zarr.open_group(streamflow_path, mode="r") + + +def separate_basins(cfg: DictConfig) -> None: + """ + Code provided by Yalan Song + :param cfg: + :return: + """ + qr_folder = Path(cfg.create_streamflow.predictions) + data_split_folder = qr_folder / "basin_split/" + if data_split_folder.exists() is False: + data_split_folder.mkdir(parents=True, exist_ok=True) + attrs_df = pd.read_csv(Path(cfg.create_streamflow.obs_attributes)) + basin_ids = attrs_df.gage_ID.values + batch_size = 1000 + start_idx = np.arange(0, len(basin_ids), batch_size) + end_idx = np.append(start_idx[1:], len(basin_ids)) + for idx in trange(len(start_idx), desc="reading files"): + try: + basin_ids_np = pd.read_csv( + qr_folder / f"Qr_{start_idx[idx]}_{end_idx[idx]}", + dtype=np.float32, + header=None, + ).values + except FileNotFoundError: + basin_ids_np = pd.read_csv( + qr_folder / f"out0_{start_idx[idx]}_{end_idx[idx]}", + dtype=np.float32, + header=None, + ).values + attribute_batch_df = pd.read_csv( + 
qr_folder + / "attributes" + / f"attributes_{start_idx[idx]}_{end_idx[idx]}.csv" + ) + attribute_batch_ids = attribute_batch_df.gage_ID.values + for idx, _id in enumerate( + tqdm( + attribute_batch_ids, + desc="saving predictions separately", + ncols=140, + ascii=True, + ) + ): + formatted_id = str(int(_id)).zfill(10) + qr = basin_ids_np[idx : idx + 1, :] + np.save(data_split_folder / f"{formatted_id}.npy", qr) + + +def calculate_merit_flow(cfg: DictConfig, edges: zarr.hierarchy.Group) -> None: + attrs_df = pd.read_csv( + Path(cfg.create_streamflow.obs_attributes) / f"COMID_{str(cfg.zone)[0]}.csv" + ) + id_to_area = attrs_df.set_index("COMID")["unitarea"].to_dict() + + edge_comids = np.unique(edges.merit_basin[:]) # already sorted + + streamflow_predictions_root = zarr.open( + Path(cfg.create_streamflow.predictions), mode="r" + ) + + # Different merit forwards have different save outputs. Specifying here to handle the different versions + version = int( + cfg.create_streamflow.version.lower().split("_v")[1][0] + ) # getting the version number + if version >= 3 or version == 1: + log.info(msg="Reading Zarr Store") + zone_keys = [ + key for key in streamflow_predictions_root.keys() if str(cfg.zone) in key + ] + zone_comids = [] + zone_runoff = [] + for key in zone_keys: + zone_comids.append(streamflow_predictions_root[key].COMID[:]) + zone_runoff.append(streamflow_predictions_root[key].Qr[:]) + # zone_runoff.append(streamflow_predictions_root[key]["Q0"][:]) + streamflow_comids = np.concatenate(zone_comids).astype(int) + file_runoff = np.transpose(np.concatenate(zone_runoff)) + del zone_comids + del zone_runoff + else: + log.info("Reading Zarr Store") + file_runoff = np.transpose(streamflow_predictions_root.Runoff) + + streamflow_comids: np.ndarray = streamflow_predictions_root.COMID[:].astype(int) + + log.info("Mapping predictions to zone COMIDs") + runoff_full_zone = np.zeros((file_runoff.shape[0], edge_comids.shape[0])) + indices = np.searchsorted(edge_comids, streamflow_comids) + runoff_full_zone[:, indices] = file_runoff + + log.info("Creating areas areas_array") + areas = np.zeros_like(edge_comids, dtype=np.float64) + for idx, comid in enumerate(edge_comids): + try: + areas[idx] = id_to_area[comid] + except KeyError as e: + log.error(f"problem finding {comid} in Areas Dictionary") + raise e + areas_array = areas * 1000 / 86400 + + log.info("Converting runoff data, setting NaN and 0 to 1e-6") + streamflow_m3_s_data = runoff_full_zone * areas_array + streamflow_m3_s_data = np.nan_to_num( + streamflow_m3_s_data, nan=1e-6, posinf=1e-6, neginf=1e-6 + ) + mask = streamflow_m3_s_data == 0 + streamflow_m3_s_data[mask] = 1e-6 + + date_range = pd.date_range( + start=cfg.create_streamflow.start_date, + end=cfg.create_streamflow.end_date, + freq="D", + ) + data_array = xr.DataArray( + data=streamflow_m3_s_data, + dims=["time", "COMID"], # Explicitly naming the dimensions + coords={"time": date_range, "COMID": edge_comids}, # Adding coordinates + ) + xr_dataset = xr.Dataset( + data_vars={"streamflow": data_array}, + attrs={"description": "Streamflow -> MERIT Predictions"}, + ) + streamflow_path = Path(cfg.create_streamflow.data_store) + xr_dataset.to_zarr(streamflow_path, mode="w") + # zarr_hierarchy = zarr.open_group(streamflow_path, mode="r") diff --git a/marquette/merit_s3/create.py b/marquette/merit_s3/create.py new file mode 100644 index 0000000..e519c19 --- /dev/null +++ b/marquette/merit_s3/create.py @@ -0,0 +1,253 @@ +import logging +from pathlib import Path + +import geopandas as gpd 
+import numpy as np +import pandas as pd +import xarray as xr +import zarr +from dask.dataframe.io.io import from_pandas +from omegaconf import DictConfig +from tqdm import tqdm + +from marquette.merit_s3._connectivity_matrix import (create_gage_connectivity, + map_gages_to_zone, + new_zone_connectivity) +from marquette.merit_s3._edge_calculations import ( + calculate_num_edges, create_segment, + many_segment_to_edge_partition, singular_segment_to_edge_partition, + sort_xarray_dataarray) +from marquette.merit_s3._map_lake_points import _map_lake_points +from marquette.merit_s3._streamflow_conversion_functions import ( + calculate_huc10_flow_from_individual_files, calculate_merit_flow, + separate_basins) +from marquette.merit_s3._TM_calculations import ( # create_sparse_MERIT_FLOW_TM, + create_HUC_MERIT_TM, create_MERIT_FLOW_TM, join_geospatial_data) + +log = logging.getLogger(__name__) + + +def write_streamflow(cfg: DictConfig, edges: zarr.Group) -> None: + """ + Process and write streamflow data to a Zarr store. + """ + streamflow_path = Path(cfg.create_streamflow.data_store) + version = cfg.create_streamflow.version.lower() + if streamflow_path.exists() is False: + if version in ["dpl_v1", "dpl_v2", "dpl_v2.5", "dpl_v3-pre"]: + """Expecting to read from individual files""" + separate_basins(cfg) + calculate_huc10_flow_from_individual_files(cfg) + elif version == "dpl_v3": + calculate_huc10_flow_from_individual_files(cfg) + elif "merit" in version: + calculate_merit_flow(cfg, edges) + else: + raise KeyError(f"streamflow version: {version}" "not supported") + else: + log.info("Streamflow data already exists") + + +def create_edges(cfg: DictConfig) -> zarr.Group: + try: + root = xr.open_datatree(cfg.create_edges.edges, engine="zarr") + log.info("Edge data already exists on s3") + edges = root[str(cfg.zone)] + except FileNotFoundError: + log.info("Edge data does not exist. 
Creating connections") + polyline_gdf = gpd.read_file(cfg.create_edges.flowlines) + dx = cfg.create_edges.dx # Unit: Meters + buffer = cfg.create_edges.buffer * dx # Unit: Meters + for col in [ + "COMID", + "NextDownID", + "up1", + "up2", + "up3", + "up4", + "maxup", + "order", + ]: + polyline_gdf[col] = polyline_gdf[col].astype(int) + computed_series = polyline_gdf.apply( + lambda df: create_segment(df, polyline_gdf.crs, dx, buffer), axis=1 + ) + segments_dict = computed_series.to_dict() + segment_das = { + segment["id"]: segment["uparea"] for segment in segments_dict.values() + } + sorted_keys = sorted( + segments_dict, key=lambda key: segments_dict[key]["uparea"] + ) + num_edges_dict = { + _segment["id"]: calculate_num_edges(_segment["len"], dx, buffer) + for _, _segment in tqdm( + segments_dict.items(), + desc="Processing Number of Edges", + ncols=140, + ascii=True, + ) + } + one_edge_segment = { + seg_id: edge_info + for seg_id, edge_info in tqdm( + num_edges_dict.items(), + desc="Filtering Segments == 1", + ncols=140, + ascii=True, + ) + if edge_info[0] == 1 + } + many_edge_segment = { + seg_id: edge_info + for seg_id, edge_info in tqdm( + num_edges_dict.items(), + desc="Filtering Segments > 1", + ncols=140, + ascii=True, + ) + if edge_info[0] > 1 + } + segments_with_more_than_one_edge = {} + segments_with_one_edge = {} + for i, segment in segments_dict.items(): + segment_id = segment["id"] + segment["index"] = i + + if segment_id in many_edge_segment: + segments_with_more_than_one_edge[segment_id] = segment + elif segment_id in one_edge_segment: + segments_with_one_edge[segment_id] = segment + else: + print(f"MISSING ID: {segment_id}") + + df_one = pd.DataFrame.from_dict(segments_with_one_edge, orient="index") + df_many = pd.DataFrame.from_dict( + segments_with_more_than_one_edge, orient="index" + ) + ddf_one = from_pandas(df_one, npartitions=64) + ddf_many = from_pandas(df_many, npartitions=64) + + meta = pd.DataFrame( + { + "id": pd.Series(dtype="str"), + "merit_basin": pd.Series(dtype="int"), + "segment_sorting_index": pd.Series(dtype="int"), + "order": pd.Series(dtype="int"), + "len": pd.Series(dtype="float"), + "len_dir": pd.Series(dtype="float"), + "ds": pd.Series(dtype="str"), + "up": pd.Series(dtype="object"), # List or array + "up_merit": pd.Series(dtype="object"), # List or array + "slope": pd.Series(dtype="float"), + "sinuosity": pd.Series(dtype="float"), + "stream_drop": pd.Series(dtype="float"), + "uparea": pd.Series(dtype="float"), + "coords": pd.Series(dtype="str"), + "crs": pd.Series(dtype="object"), # CRS object + } + ) + + edges_results_one = ddf_one.map_partitions( + singular_segment_to_edge_partition, + edge_info=one_edge_segment, + num_edge_dict=num_edges_dict, + segment_das=segment_das, + meta=meta, + ) + edges_results_many = ddf_many.map_partitions( + many_segment_to_edge_partition, + edge_info=many_edge_segment, + num_edge_dict=num_edges_dict, + segment_das=segment_das, + meta=meta, + ) + edges_results_one_df = edges_results_one.compute() + edges_results_many_df = edges_results_many.compute() + merged_df = pd.concat([edges_results_one_df, edges_results_many_df]) + for col in ["id", "ds", "up", "coords", "up_merit", "crs"]: + merged_df[col] = merged_df[col].astype(dtype=str) + for col in ["merit_basin", "segment_sorting_index", "order"]: + merged_df[col] = merged_df[col].astype(dtype=np.int32) + for col in ["len", "len_dir", "slope", "sinuosity", "stream_drop", "uparea"]: + merged_df[col] = merged_df[col].astype(dtype=np.float32) + # xr_dataset = 
xr.Dataset.from_dataframe(merged_df) + ds = merged_df.to_xarray().assign_coords({"merit_basin": merged_df["merit_basin"]}) + sorted_keys_array = np.array(sorted_keys) + sorted_edges = xr.Dataset() + for var_name in xr_dataset.data_vars: + sorted_edges[var_name] = sort_xarray_dataarray( + xr_dataset[var_name], + sorted_keys_array, + xr_dataset["segment_sorting_index"].values, + ) + shape = sorted_edges[var_name].shape + dtype = sorted_edges[var_name].dtype + tmp = edges.zeros(var_name, shape=shape, chunks=1000, dtype=dtype) + tmp[:] = sorted_edges[var_name].values + tmp = edges.zeros( + "sorted_keys", + shape=sorted_keys_array.shape, + chunks=1000, + dtype=sorted_keys_array.dtype, + ) + tmp[:] = sorted_keys_array + return edges + + +def create_N(cfg: DictConfig, edges: zarr.Group) -> None: + gage_coo_root = zarr.open_group(Path(cfg.create_N.gage_coo_indices), mode="a") + zone_root = gage_coo_root.require_group(cfg.zone) + if cfg.create_N.run_whole_zone: + if "full_zone" in zone_root: + log.info("Full zone already exists") + else: + full_zone_root = zone_root.require_group("full_zone") + new_zone_connectivity(cfg, edges, full_zone_root) + log.info("Full zone sparse Matrix created") + else: + zone_csv_path = Path(cfg.create_N.zone_obs_dataset) + if zone_csv_path.exists(): + zone_csv = pd.read_csv(zone_csv_path) + else: + zone_csv = map_gages_to_zone(cfg, edges) + if zone_csv is not False: + create_gage_connectivity(cfg, edges, zone_root, zone_csv) + log.info("All sparse gage matrices are created") + + +def create_TMs(cfg: DictConfig, edges: zarr.Group) -> None: + if "HUC" in cfg.create_TMs: + huc_to_merit_path = Path(cfg.create_TMs.HUC.TM) + if huc_to_merit_path.exists(): + log.info("HUC -> MERIT data already exists in zarr format") + else: + log.info("Creating HUC10 -> MERIT TM") + overlayed_merit_basins = join_geospatial_data(cfg) + create_HUC_MERIT_TM(cfg, edges, overlayed_merit_basins) + merit_to_river_graph_path = Path(cfg.create_TMs.MERIT.TM) + if merit_to_river_graph_path.exists(): + log.info("MERIT -> FLOWLINE data already exists in zarr format") + else: + log.info("Creating MERIT -> FLOWLINE TM") + # if cfg.create_TMs.MERIT.save_sparse: + # create_sparse_MERIT_FLOW_TM(cfg, edges) + # else: + create_MERIT_FLOW_TM(cfg, edges) + + +def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: + """Maps HydroLAKES pour points to the corresponding edge + + Parameters + ---------- + cfg: DictConfig + The configuration object + edges: zarr.Group + The zarr group containing the edges + """ + if "hylak_id" in edges: + log.info("HydroLakes Intersection already exists in Zarr format") + else: + log.info("Mapping HydroLakes Pour Points to Edges") + _map_lake_points(cfg, edges) diff --git a/marquette/merit_s3/extensions.py b/marquette/merit_s3/extensions.py new file mode 100644 index 0000000..4605df3 --- /dev/null +++ b/marquette/merit_s3/extensions.py @@ -0,0 +1,546 @@ +import logging +from pathlib import Path + +import binsparse +import cupy as cp +from cupyx.scipy import sparse as cp_sparse +import geopandas as gpd +import networkx as nx +import numpy as np +import pandas as pd +import polars as pl +from scipy import sparse +import zarr +from omegaconf import DictConfig +from tqdm import tqdm + +log = logging.getLogger(__name__) + + +def soils_data(cfg: DictConfig, edges: zarr.Group) -> None: + flowline_file = ( + Path(cfg.data_path) + / f"raw/routing_soil_properties/riv_pfaf_{cfg.zone}_buff_split_soil_properties.shp" + ) + polyline_gdf = gpd.read_file(flowline_file) + gdf = 
pd.DataFrame(polyline_gdf.drop(columns="geometry")) + df = pl.from_pandas(gdf) + df = df.with_columns(pl.col("COMID").cast(pl.Int64)) # convert COMID to int64 + attributes = [ + "Ks_05_M_25", + "N_05_M_250", + "SF_05_M_25", + "AF_05_M_25", + "OM_05_M_25", + "Cl_05_Mn", + "Sd_05_Mn", + "St_05_Mn", + ] + names = [ + "ksat", + "N", + "sat-field", + "alpha", + "ormc", + "clay_mean_05", + "sand_mean_05", + "silt_mean_05", + ] + df_filled = df.with_columns( + pl.when(pl.col(["Ks_05_M_25", "SF_05_M_25"]).is_null()) + .then(pl.col(["Ks_05_M_25", "SF_05_M_25"]).fill_null(strategy="max")) + .otherwise(pl.col(["Ks_05_M_25", "SF_05_M_25"])) + ) + df_filled = df_filled.with_columns( + pl.when(pl.col(["N_05_M_250", "AF_05_M_25"]).is_null()) + .then(pl.col(["N_05_M_250", "AF_05_M_25"]).fill_null(strategy="min")) + .otherwise(pl.col(["N_05_M_250", "AF_05_M_25"])) + ) + df_filled = df_filled.with_columns( + pl.when(pl.col(["OM_05_M_25", "Cl_05_Mn", "Sd_05_Mn", "St_05_Mn"]).is_null()) + .then( + pl.col(["OM_05_M_25", "Cl_05_Mn", "Sd_05_Mn", "St_05_Mn"]).fill_null( + strategy="forward" + ) + ) + .otherwise(pl.col(["OM_05_M_25", "Cl_05_Mn", "Sd_05_Mn", "St_05_Mn"])) + ) + graph_cols = ["COMID", "up1", "NextDownID"] + df_cols = graph_cols + attributes + _df = df_filled.select(pl.col(df_cols)) + edges_df = pl.DataFrame({"COMID": edges.merit_basin[:]}) + joined_df = edges_df.join(_df, on="COMID", how="left", join_nulls=True) + for i in range(len(names)): + edges.array( + name=names[i], + data=joined_df.select(pl.col(attributes[i])).to_numpy().squeeze(), + ) + + +def pet_forcing(cfg: DictConfig, edges: zarr.Group) -> None: + pet_file_path = Path(f"/projects/mhpi/data/global/zarr_sub_zone/{cfg.zone}") + num_timesteps = pd.date_range( + start=cfg.create_streamflow.start_date, + end=cfg.create_streamflow.end_date, + freq="d", + ).shape[0] + if pet_file_path.exists() is False: + raise FileNotFoundError("PET forcing data not found") + edge_merit_basins: np.ndarray = edges.merit_basin[:] + pet_edge_data = [] + pet_comid_data = [] + mapping = np.empty_like(edge_merit_basins, dtype=int) + files = pet_file_path.glob("*") + for file in files: + pet_zone_data = zarr.open_group(file, mode="r") + comids = pet_zone_data.COMID[:] + pet = pet_zone_data.PET[:] + pet_comid_data.append(comids) + pet_edge_data.append(pet) + pet_comid_arr = np.concatenate(pet_comid_data) + pet_arr = np.concatenate(pet_edge_data) + + if pet_arr.shape[0] != len(np.unique(edge_merit_basins)): + raise ValueError( + "PET forcing data is not consistent. Check the number of comids in the data and the edge_merit_basins array." + ) + if pet_arr.shape[1] != num_timesteps: + raise ValueError( + "PET forcing data is not consistent. Check the number of timesteps in the data and the num_timesteps variable." 
+ ) + + for i, id in enumerate(tqdm(pet_comid_arr, desc="\rProcessing PET data")): + idx = np.where(edge_merit_basins == id)[0] + mapping[idx] = i + mapped_attr = pet_arr[mapping] + edges.array(name="pet", data=mapped_attr) + + +def temp_forcing(cfg: DictConfig, edges: zarr.Group) -> None: + temp_file_path = Path(f"/projects/mhpi/data/global/zarr_sub_zone/{cfg.zone}") + num_timesteps = pd.date_range( + start=cfg.create_streamflow.start_date, + end=cfg.create_streamflow.end_date, + freq="d", + ).shape[0] + if temp_file_path.exists() is False: + raise FileNotFoundError("Temp forcing data not found") + edge_merit_basins: np.ndarray = edges.merit_basin[:] + temp_edge_data = [] + temp_comid_data = [] + mapping = np.empty_like(edge_merit_basins, dtype=int) + files = temp_file_path.glob("*") + for file in files: + try: + temp_zone_data = zarr.open_group(file, mode="r") + comids = temp_zone_data.COMID[:] + temp_mean = temp_zone_data.Temp[:] + temp_comid_data.append(comids) + temp_edge_data.append(temp_mean) + except zarr.errors.FSPathExistNotDir: + log.info(f"found non group file, skipping: {file}") + temp_comid_arr = np.concatenate(temp_comid_data) + temp_arr = np.concatenate(temp_edge_data) + + if temp_arr.shape[0] != len(np.unique(edge_merit_basins)): + raise ValueError( + "Temp forcing data is not consistent. Check the number of comids in the data and the edge_merit_basins array." + ) + if temp_arr.shape[1] != num_timesteps: + raise ValueError( + "Temp forcing data is not consistent. Check the number of timesteps in the data and the num_timesteps variable." + ) + + for i, id in enumerate(tqdm(temp_comid_arr, desc="\rProcessing temp data")): + idx = np.where(edge_merit_basins == id)[0] + mapping[idx] = i + mapped_attr = temp_arr[mapping] + edges.array(name="temp_mean", data=mapped_attr) + + +def global_dhbv_static_inputs(cfg: DictConfig, edges: zarr.Group) -> None: + """ + Pulling Data from the global_dhbv_static_inputs data and storing it in a zarr store + All attrs are as follows: + attributeLst = ['area','ETPOT_Hargr', 'FW', 'HWSD_clay', 'HWSD_gravel', 'HWSD_sand', + 'HWSD_silt', 'NDVI', 'Porosity', 'SoilGrids1km_clay', + 'SoilGrids1km_sand', 'SoilGrids1km_silt', 'T_clay', 'T_gravel', + 'T_sand', 'T_silt', 'aridity', 'glaciers', 'meanP', 'meanTa', + 'meanelevation', 'meanslope', 'permafrost', 'permeability', + 'seasonality_P', 'seasonality_PET', 'snow_fraction', + 'snowfall_fraction'] + """ + file_path = Path(f"/projects/mhpi/data/global/zarr_sub_zone/{cfg.zone}") + if file_path.exists() is False: + raise FileNotFoundError("global_dhbv_static_inputs data not found") + edge_merit_basins: np.ndarray = edges.merit_basin[:] + comid_data = [] + aridity_data = [] + porosity_data = [] + mean_p_data = [] + mean_elevation_data = [] + glaciers_data = [] + + mapping = np.empty_like(edge_merit_basins, dtype=int) + files = file_path.glob("*") + for file in files: + pet_zone_data = zarr.open_group(file, mode="r") + comids = pet_zone_data.COMID[:] + aridity = pet_zone_data["attrs"]["aridity"][:] + porosity = pet_zone_data["attrs"]["Porosity"][:] + mean_p = pet_zone_data["attrs"]["meanP"][:] + mean_elevation = pet_zone_data["attrs"]["meanelevation"][:] + glaciers = pet_zone_data["attrs"]["glaciers"][:] + + comid_data.append(comids) + aridity_data.append(aridity) + porosity_data.append(porosity) + mean_p_data.append(mean_p) + mean_elevation_data.append(mean_elevation) + glaciers_data.append(glaciers) + + comid_arr = np.concatenate(comid_data) + aridity_arr = np.concatenate(aridity_data) + porosity_arr = 
np.concatenate(porosity_data) + mean_p_arr = np.concatenate(mean_p_data) + mean_elevation_arr = np.concatenate(mean_elevation_data) + glacier_arr = np.concatenate(glaciers_data) + + if comid_arr.shape[0] != len(np.unique(edge_merit_basins)): + raise ValueError( + "data is not consistent. Check the number of comids in the data and the edge_merit_basins array." + ) + + for i, id in enumerate(tqdm(comid_arr, desc="\rProcessing data")): + idx = np.where(edge_merit_basins == id)[0] + mapping[idx] = i + edges.array(name="aridity", data=aridity_arr[mapping]) + edges.array(name="porosity", data=porosity_arr[mapping]) + edges.array(name="mean_p", data=mean_p_arr[mapping]) + edges.array(name="mean_elevation", data=mean_elevation_arr[mapping]) + edges.array(name="glacier", data=glacier_arr[mapping]) + + +def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> None: + """ + Runs a Polars query to calculate the incremental drainage area for each edge in the MERIT dataset + """ + basin_file = ( + Path(cfg.data_path) + / f"raw/basins/cat_pfaf_{cfg.zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp" + ) + if basin_file.exists() is False: + raise FileNotFoundError("Basin file not found") + gdf = gpd.read_file(basin_file) + _df = pd.DataFrame(gdf.drop(columns="geometry")) + df = pl.from_pandas(_df) + edges_df = pl.DataFrame( + { + "COMID": edges.merit_basin[:], + "id": edges.id[:], + "order": np.arange(edges.id.shape[0]), + } + ) + + result = ( + df.lazy() + .join(other=edges_df.lazy(), left_on="COMID", right_on="COMID", how="left") + .group_by( + by="COMID", + ) + .agg( + [ + pl.map_groups( + exprs=["unitarea", pl.first("unitarea")], + function=lambda list_of_series: list_of_series[1] + / list_of_series[0].shape[0], + ).alias("incremental_drainage_area") + ] + ) + .join(other=edges_df.lazy(), left_on="COMID", right_on="COMID", how="left") + .sort(by="order") + .collect() + ) + edges.array( + name="incremental_drainage_area", + data=result.select(pl.col("incremental_drainage_area")).to_numpy().squeeze(), + ) + + +def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: + """Creates Q` summed data for all edges in a given MERIT zone + + Parameters: + ---------- + cfg: DictConfig + The configuration object. 
+ edges: zarr.Group + The edges group in the MERIT zone + """ + n = 1 # number of splits (used for reducing memory load) + cp.cuda.runtime.setDevice(2) # manually setting the device to 2 + + streamflow_group = Path( + f"/projects/mhpi/data/MERIT/streamflow/zarr/{cfg.create_streamflow.version}/{cfg.zone}" + ) + if streamflow_group.exists() is False: + raise FileNotFoundError("streamflow_group data not found") + streamflow_zarr: zarr.Group = zarr.open_group(streamflow_group, mode="r") + streamflow_time = streamflow_zarr.time[:] + # _, counts = cp.unique(edges.segment_sorting_index[:], return_counts=True) # type: ignore + dim_0: int = streamflow_zarr.time.shape[0] # type: ignore + dim_1: int = streamflow_zarr.COMID.shape[0] # type: ignore + edge_ids = np.array(edges.id[:]) + edges_segment_sorting_index = cp.array(edges.segment_sorting_index[:]) + edge_index_mapping = {v: i for i, v in enumerate(edge_ids)} + + q_prime_np = np.zeros([dim_0, dim_1]).transpose(1, 0) + streamflow_data = cp.array(streamflow_zarr.streamflow[:]) + + # Generating a networkx DiGraph object + df_path = Path(f"{cfg.create_edges.edges}").parent / f"{cfg.zone}_graph_df.csv" + if df_path.exists(): + df = pd.read_csv(df_path) + else: + source = [] + target = [] + for idx, _ in enumerate( + tqdm( + edges.id[:], + desc="creating graph in dictionary form", + ascii=True, + ncols=140, + ) + ): + if edges.ds[idx] != "0_0": + source.append(idx) + target.append(edge_index_mapping[edges.ds[idx]]) + df = pd.DataFrame({"source": source, "target": target}) + df.to_csv(df_path, index=False) + G = nx.from_pandas_edgelist( + df=df, + create_using=nx.DiGraph(), + ) + + time_split = np.array_split(streamflow_time, n) # type: ignore + for idx, time_range in enumerate(time_split): + q_prime_cp = cp.zeros([time_range.shape[0], dim_1]).transpose(1, 0) + q_prime_np_edge_count_cp = cp.zeros([time_range.shape[0], dim_1]).transpose(1, 0) + for jdx, _ in enumerate( + tqdm( + edge_ids, + desc=f"calculating q` sum data for part {idx}", + ascii=True, + ncols=140, + ) + ): + streamflow_ds_id = edges_segment_sorting_index[jdx] + # num_edges_in_comid = counts[streamflow_ds_id] + try: + graph = nx.descendants(G, jdx, backend="cugraph") + graph.add(jdx) # Adding the idx to ensure it's counted + downstream_idx = np.array(list(graph)) # type: ignore + downstream_comid_idx = cp.unique( + edges_segment_sorting_index[downstream_idx] + ) # type: ignore + streamflow = streamflow_data[ + time_range, streamflow_ds_id + ] + q_prime_cp[downstream_comid_idx] += streamflow + q_prime_np_edge_count_cp[downstream_comid_idx] += 1 + except nx.exception.NetworkXError: + # This means there is no connectivity from this basin. 
It's one-node graph + streamflow = streamflow_data[ + time_range, streamflow_ds_id + ] + q_prime_cp[streamflow_ds_id] = streamflow + q_prime_np_edge_count_cp[streamflow_ds_id] += 1 + + print("Saving GPU Memory to CPU; freeing GPU Memory") + q_prime_np[:, time_range] = cp.asnumpy(q_prime_cp / q_prime_np_edge_count_cp) + del q_prime_cp + del q_prime_np_edge_count_cp + cp.get_default_memory_pool().free_all_blocks() + + edges.array( + name="summed_q_prime", + data=q_prime_np.transpose(1, 0), + ) + +def map_last_edge_to_comid(arr): + unique_elements = np.unique(arr) + mapping = {val: np.where(arr == val)[0][-1] for val in unique_elements} + last_indices = np.array([mapping[val] for val in unique_elements]) + + return unique_elements, last_indices + + +def calculate_mean_p_summation(cfg: DictConfig, edges: zarr.Group) -> None: + """Creates Q` summed data for all edges in a given MERIT zone + + Parameters: + ---------- + cfg: DictConfig + The configuration object. + edges: zarr.Group + The edges group in the MERIT zone + """ + cp.cuda.runtime.setDevice(6) # manually setting the device to 2 + + edge_comids = edges.merit_basin[:] + edge_comids_cp = cp.array(edges.merit_basin[:]) + ordered_merit_basin, indices = map_last_edge_to_comid(edge_comids) + ordered_merit_basin_cp = cp.array(ordered_merit_basin) + indices_cp = cp.array(indices) + mean_p_data = cp.array(edges.mean_p[:]) # type: ignore # UNIT: mm/year + + # Generating a networkx DiGraph object + df_path = Path(f"{cfg.create_edges.edges}").parent / f"{cfg.zone}_graph_df.csv" + if df_path.exists(): + df = pd.read_csv(df_path) + else: + raise FileNotFoundError("edges graph data not found. Have you calculated summed_q_prime yet?") + G = nx.from_pandas_edgelist( + df=df, + create_using=nx.DiGraph(), + ) + mean_p_sum = cp.zeros([indices.shape[0]]) + idx_counts = cp.zeros([indices.shape[0]]) + + for j, index in enumerate( + tqdm( + indices, + desc="calculating mean p sum data", + ascii=True, + ncols=140, + ) + ): + try: + graph = nx.descendants(G, source=index, backend="cugraph") + graph.add(index) # Adding the idx to ensure it's counted + downstream_idx = np.array(list(graph)) # type: ignore + + unique_merit_basin = cp.unique(edge_comids_cp[downstream_idx]) # type: ignore + positions = cp.searchsorted(ordered_merit_basin_cp, unique_merit_basin) + # downstream_comid_idx = indices_cp[positions] + + mean_p_sum[positions] += mean_p_data[index] # type: ignore + idx_counts[positions] += 1 + except nx.exception.NetworkXError: + # This means there is no downstream connectivity from this basin. It's one-node graph + mean_p_sum[j] = mean_p_data[index] + idx_counts[j] += 1 + + print("Saving GPU Memory to CPU; freeing GPU Memory") + upstream_basin_avg_mean_p = mean_p_sum / idx_counts + upstream_basin_avg_mean_p_np = cp.asnumpy(upstream_basin_avg_mean_p) + del upstream_basin_avg_mean_p + del mean_p_sum + del idx_counts + cp.get_default_memory_pool().free_all_blocks() + + edges.array( + name="upstream_basin_avg_mean_p", + data=upstream_basin_avg_mean_p_np, + ) + + + +def calculate_q_prime_sum_stats(cfg: DictConfig, edges: zarr.Group) -> None: + """Creates Q` summed data for all edges in a given MERIT zone + + Parameters: + ---------- + cfg: DictConfig + The configuration object. 
+ edges: zarr.Group + The edges group in the MERIT zone + """ + try: + summed_q_prime: np.ndarray = edges.summed_q_prime[:] + except AttributeError: + raise AttributeError("summed_q_prime data not found") + edges.array( + name="summed_q_prime_median", + data=np.median(summed_q_prime, axis=0), + ) + edges.array( + name="summed_q_prime_std", + data=np.std(summed_q_prime, axis=0), + ) + edges.array( + name="summed_q_prime_p90", + data=np.percentile(summed_q_prime, 90, axis=0), + ) + edges.array( + name="summed_q_prime_p10", + data=np.percentile(summed_q_prime, 10, axis=0), + ) + +def format_lstm_forcings(cfg: DictConfig, edges: zarr.Group) -> None: + forcings_store = zarr.open(Path("/projects/mhpi/data/global/zarr_sub_zone") / f"{cfg.zone}") + + edge_comids = np.unique(edges.merit_basin[:]) # already sorted + log.info(msg="Reading Zarr Store") + zone_keys = [ + key for key in forcings_store.keys() if str(cfg.zone) in key + ] + zone_comids = [] + zone_precip = [] + zone_pet = [] + # zone_temp = [] + zone_ndvi = [] + zone_aridity = [] + for key in zone_keys: + zone_comids.append(forcings_store[key].COMID[:]) + zone_precip.append(forcings_store[key].P[:]) + zone_pet.append(forcings_store[key].PET[:]) + # zone_temp.append(streamflow_predictions_root[key].Temp[:]) + zone_ndvi.append(forcings_store[key]["attrs"]["NDVI"]) + zone_aridity.append(forcings_store[key]["attrs"]["aridity"]) + + streamflow_comids = np.concatenate(zone_comids).astype(int) + file_precip = np.transpose(np.concatenate(zone_precip)) + file_pet = np.transpose(np.concatenate(zone_pet)) + # file_temp = np.transpose(np.concatenate(zone_temp)) + file_ndvi = np.concatenate(zone_ndvi) + file_aridity = np.concatenate(zone_aridity) + del zone_comids + del zone_precip + del zone_pet + # del zone_temp + del zone_ndvi + del zone_aridity + + log.info("Mapping to zone COMIDs") + precip_full_zone = np.zeros((file_precip.shape[0], edge_comids.shape[0])) + pet_full_zone = np.zeros((file_precip.shape[0], edge_comids.shape[0])) + ndvi_full_zone = np.zeros((edge_comids.shape[0])) + aridity_full_zone = np.zeros((edge_comids.shape[0])) + + + indices = np.searchsorted(edge_comids, streamflow_comids) + precip_full_zone[:, indices] = file_precip + pet_full_zone[:, indices] = file_pet + ndvi_full_zone[indices] = file_ndvi + aridity_full_zone[indices] = file_aridity + + log.info("Writing outputs to zarr") + edges.array( + name="precip_comid", + data=precip_full_zone, + ) + edges.array( + name="pet_comid", + data=pet_full_zone, + ) + edges.array( + name="ndvi_comid", + data=ndvi_full_zone, + ) + edges.array( + name="aridity_comid", + data=aridity_full_zone, + ) + + diff --git a/marquette/merit_s3/map.py b/marquette/merit_s3/map.py new file mode 100644 index 0000000..ac89a99 --- /dev/null +++ b/marquette/merit_s3/map.py @@ -0,0 +1,324 @@ +"""It looks like there were some changes with merit basins. 
Keeping v1 here just in case""" + +import logging +import multiprocessing +from pathlib import Path +from typing import List + +import geopandas as gpd +import numpy as np +import pandas as pd +from omegaconf import DictConfig +from scipy.sparse import csr_matrix +from tqdm import tqdm + +from marquette.merit._graph import (Segment, data_to_csv, get_edge_counts, + segments_to_edges) + +log = logging.getLogger(__name__) + + +def _plot_gdf(gdf: gpd.GeoDataFrame) -> None: + import matplotlib.pyplot as plt + + fig, ax = plt.subplots(figsize=(10, 10)) + gdf.plot(ax=ax) + ax.set_title("Polyline Plot") + ax.set_xlabel("Longitude") + ax.set_ylabel("Latitude") + plt.show() + + +def create_graph(cfg): + flowline_file = cfg.save_paths.flow_lines + polyline_gdf = gpd.read_file(flowline_file) + + # Convert multiple columns to int type + for col in [ + "COMID", + "NextDownID", + "up1", + "up2", + "up3", + "up4", + "maxup", + "order_", + ]: + polyline_gdf[col] = polyline_gdf[col].astype(int) + + crs = polyline_gdf.crs + + # Generate segments using list comprehension + segments = [Segment(row, row.geometry, crs) for _, row in polyline_gdf.iterrows()] + + dx = cfg.dx # Unit: Meters + buffer = cfg.buffer * dx # Unit: Meters + sorted_segments = sorted( + segments, key=lambda segment: segment.uparea, reverse=False + ) + segment_das = { + segment.id: segment.uparea for segment in segments + } # Simplified with dict comprehension + edge_counts = get_edge_counts(sorted_segments, dx, buffer) + edges_ = [ + edge + for segment in tqdm( + sorted_segments, + desc="Processing segments", + ncols=140, + ascii=True, + ) + for edge in segments_to_edges( + segment, edge_counts, segment_das + ) # returns many edges + ] + edges = data_to_csv(edges_) + edges.to_csv(cfg.csv.edges, index=False, compression="gzip") + + return edges + + +def map_streamflow_to_river_graph(cfg: DictConfig, edges: pd.DataFrame) -> None: + """ + Ways to check if this is right: + - sort each column and index row + - subtract the HUC cols from flow from the HUC index of hm_TM + (f_cols.astype("int") - huc_to_merit_TM["HUC10"]).sum() + - subtract the MERIT cols from hm_TM from the MERIT index of mrg_TM + (hm_cols.astype("int") - merit_to_river_graph_TM["Merit_Basins"]).sum() + :param cfg: + :param edges: + :return: + """ + huc_to_merit_TM = pd.read_csv(cfg.save_paths.huc_to_merit_tm, compression="gzip") + huc_10_list = huc_to_merit_TM["HUC10"].values + hm_TM = csr_matrix(huc_to_merit_TM.drop("HUC10", axis=1).values) + merit_to_river_graph_TM = _create_TM(cfg, edges, huc_to_merit_TM) + if "Merit_Basins" in merit_to_river_graph_TM.columns: + columns = merit_to_river_graph_TM.drop("Merit_Basins", axis=1).columns + mrg_TM = csr_matrix(merit_to_river_graph_TM.drop("Merit_Basins", axis=1).values) + else: + columns = merit_to_river_graph_TM.columns + mrg_TM = csr_matrix(merit_to_river_graph_TM.values) + log.info("Reading streamflow predictions") + streamflow_predictions = _read_flow(cfg, huc_10_list) + streamflow_predictions["dates"] = pd.to_datetime(streamflow_predictions["dates"]) + grouped_predictions = streamflow_predictions.groupby( + streamflow_predictions["dates"].dt.year + ) + args_iter = ( + (cfg, group, hm_TM.copy(), mrg_TM.copy(), columns) + for group in grouped_predictions + ) + with multiprocessing.Pool(cfg.num_cores) as pool: + log.info("Writing Process to disk") + pool.map(_write_to_disk, args_iter) + + +def _write_to_disk(args): + cfg, grouped_predictions, hm_TM, mrg_TM, columns = args + save_path = Path(cfg.csv.mapped_streamflow_dir) + 
save_path.mkdir(exist_ok=True, parents=True) + year = grouped_predictions[0] + group = grouped_predictions[1] + log.info(f"Writing data for year {year}") + interpolated_flow = _interpolate(cfg, group, year) + flow = csr_matrix(interpolated_flow.drop("dates", axis=1).sort_index(axis=1).values) + mapped_flow_merit = flow.dot(hm_TM) + mapped_flow_river_graph = mapped_flow_merit.dot(mrg_TM) + mapped_flow_array = mapped_flow_river_graph.toarray() + log.info(f"Writing {year} to disk") + df = pd.DataFrame( + mapped_flow_array, + index=interpolated_flow["dates"], + columns=columns, + dtype="float32", + ) + df.to_csv( + save_path / f"{year}_{cfg.basin}_mapped_streamflow.csv.gz", + compression="gzip", + ) + log.info(f"Done with {year}") + + +def _create_TM( + cfg: DictConfig, edges: pd.DataFrame, huc_to_merit_TM: pd.DataFrame +) -> pd.DataFrame: + """ + Creating a TM that maps MERIT basins to their reaches. Flow predictions are distributed + based on reach length/ total merit reach length + :param cfg: + :param edges: + :param huc_to_merit_TM: + :return: + """ + tm = Path(cfg.save_paths.merit_to_river_graph_tm) + if tm.exists(): + return pd.read_csv(tm, compression="gzip") + else: + merit_basins = huc_to_merit_TM.columns[ + huc_to_merit_TM.columns != "HUC10" + ].values + sorted_edges = edges.sort_values(by="merit_basin", ascending=False) + river_graph_ids = sorted_edges["id"].values + river_graph_ids.sort() + df = pd.DataFrame(index=merit_basins, columns=river_graph_ids) + df["Merit_Basins"] = merit_basins + df = df.set_index("Merit_Basins") + for idx, id in enumerate( + tqdm( + merit_basins, + desc="creating TM", + ncols=140, + ascii=True, + ) + ): + merit_reaches = edges[edges["merit_basin"] == int(id)] + if merit_reaches.shape[0] == 0: + log.error(f"Missing row for {id}") + total_length = sum( + [merit_reaches.iloc[i].len for i in range(merit_reaches.shape[0])] + ) + for j, reach in merit_reaches.iterrows(): + data = np.zeros([merit_basins.shape[0]]) + data[idx] = reach.len / total_length + df[reach.id] = data + # df = df.reset_index() + log.info("Writing TM") + df.to_csv(tm, compression="gzip", index=False) + return df + + +def _read_flow(cfg, huc_10_list: np.ndarray) -> pd.DataFrame: + streamflow_output = Path(cfg.save_paths.streamflow_output) + if streamflow_output.exists(): + df = pd.read_csv(streamflow_output, compression="gzip") + else: + df = _create_streamflow(cfg, huc_10_list) + streamflow_output.parent.mkdir(exist_ok=True) + df.to_csv(streamflow_output, index=False, compression="gzip") + return df + + +def _interpolate(cfg: DictConfig, df: pd.DataFrame, year: int) -> pd.DataFrame: + streamflow_interpolated = Path(cfg.save_paths.streamflow_interpolated.format(year)) + if streamflow_interpolated.exists(): + return pd.read_csv(streamflow_interpolated, compression="gzip") + else: + df["dates"] = pd.to_datetime(df["dates"]) + df.set_index("dates", inplace=True) + df = df.resample("H").asfreq() + df = df.interpolate(method="linear") + df_reset = df.reset_index() + df_reset.to_csv(streamflow_interpolated, index=False, compression="gzip") + return df_reset + + +def _create_streamflow(cfg: DictConfig, huc_10_list: np.ndarray) -> pd.DataFrame: + """ + extracting streamflow from many files based on HUC IDs + :param cfg: + :return: + """ + + def extract_numbers(filename): + """ + Extracts the first set of numbers from the filename and returns them as an integer. + Assumes the filename contains numbers in the format 'xxxx_yyyy'. 
+ """ + import re + + match = re.search(r"(\d+)_(\d+)", str(filename)) + if match: + return tuple(map(int, match.groups())) + return (0, 0) # Default value if no numbers are found + + attrs_df = pd.read_csv(cfg.save_paths.attributes) + huc10_ids = attrs_df["gage_ID"].values.astype("str") + bins_size = 1000 + bins = [huc10_ids[i : i + bins_size] for i in range(0, len(huc10_ids), bins_size)] + basin_hucs = huc_10_list + basin_indexes = _sort_into_bins(basin_hucs, bins) + streamflow_data = [] + columns = [] + folder = Path(cfg.save_paths.streamflow_files) + file_paths = [file for file in folder.glob("*") if file.is_file()] + file_paths.sort(key=extract_numbers) + iterable = basin_indexes.keys() + pbar = tqdm( + iterable, + ncols=140, + ascii=True, + ) + for i, key in enumerate(pbar): + pbar.set_description("Processing Qr files") + values = basin_indexes[key] + if values: + file = file_paths[i] + df = pd.read_csv(file, dtype=np.float32, header=None) + for val in values: + id = list(val.keys())[0] + columns.append(id) + row = attrs_df[attrs_df["gage_ID"] == id] + try: + attr_idx = row.index[0] + try: + row_idx = attr_idx - ( + key * 1000 + ) # taking only the back three numbers + _streamflow = df.iloc[row_idx].values + except IndexError: + # TODO SOLVE THIS for UPPER COLORADO + log.error("here") + if cfg.units.lower() == "mm/day": + # converting from mm/day to m3/s + area = row["area"].values[0] + _streamflow = _streamflow * area * 1000 / 86400 + streamflow_data.append(_streamflow) + except IndexError: + # TODO Get the HUC values that are missing. Adding temporary fixes + # Using the previous HUC's prediction + log.info( + f"HUC10 {id} is missing from the attributes file. Replacing with previous HUC prediction" + ) + streamflow_data.append(_streamflow) + + output = np.column_stack(streamflow_data) + date_range = pd.date_range(start=cfg.start_date, end=cfg.end_date, freq="D") + output_df = pd.DataFrame(output, columns=columns) + output_df["dates"] = date_range + return output_df + + +def _sort_into_bins(ids: np.ndarray, bins: List[np.ndarray]): + """ + :param ids: a list of HUC10 IDS + :return: + """ + + def find_list_of_str(target: int, sorted_lists: List[np.ndarray]): + left, right = 0, len(sorted_lists) - 1 + while left <= right: + mid = (left + right) // 2 + mid_list = sorted_lists[mid] + if mid_list.size > 0: + first_element = int(mid_list[0]) + last_element = int(mid_list[-1]) + if target < first_element: + right = mid - 1 + elif target > last_element: + left = mid + 1 + else: + return mid + else: + left += 1 + return None + + keys = list(range(0, 16, 1)) + grouped_values = {key: [] for key in keys} + for idx, value in enumerate(ids): + id = int(ids[idx]) + _key = find_list_of_str(id, bins) + grouped_values[_key].append({id: idx}) + + return grouped_values diff --git a/marquette/merit_s3/post_process.py b/marquette/merit_s3/post_process.py new file mode 100644 index 0000000..5a38be0 --- /dev/null +++ b/marquette/merit_s3/post_process.py @@ -0,0 +1,67 @@ +import gzip +import logging +from pathlib import Path + +import pandas as pd +from omegaconf import DictConfig +from tqdm import tqdm + +log = logging.getLogger(__name__) + + +# @hydra.main( +# version_base=None, +# config_path="../conf/", +# config_name="config", +# ) +def post_process(cfg: DictConfig) -> None: + """ + Main function for running experiments. + + :param cfg: Configuration object. 
+ :type cfg: DictConfig + :return: None + """ + folder = Path(cfg.csv.mapped_streamflow_dir) + file_name = f"*_{cfg.basin}_mapped_streamflow.csv.gz" + file_paths = [file for file in folder.glob(file_name) if file.is_file()] + file_paths.sort() + + for i in tqdm( + range(0, len(file_paths) - 1), + desc=f"interpolating missing {cfg.basin} data", + ncols=140, + ascii=True, + ): + try: + _process_files(file_paths[i], file_paths[i + 1]) + except gzip.BadGzipFile: + log.info(f"Bad GZIP file. Skipping the file: {file_paths[i]}") + + +def _fix_date_format(date_str): + if len(date_str) == 10: # Only date, no time + return date_str + " 00:00:00" + return date_str + + +def _process_files(file1, file2): + df1 = pd.read_csv(file1, compression="gzip") + if ( + df1.shape[0] != 8760 + ): # if it passes, no post-processing needed as all of the year of data is there + df2 = pd.read_csv(file2, compression="gzip", nrows=1) + df_combined = pd.concat([df1, df2], ignore_index=True) + + df_combined["dates"] = df_combined["dates"].apply(_fix_date_format) + df_combined["dates"] = pd.to_datetime(df_combined["dates"]) + df_combined.set_index("dates", inplace=True) + df_combined = df_combined.resample("H").asfreq() + df_combined = df_combined.interpolate(method="linear") + df_reset = df_combined.reset_index()[:-1] + + df_reset.to_csv(file1, index=False, compression="gzip") + + +if __name__ == "__main__": + post_process() diff --git a/pyproject.toml b/pyproject.toml index 6309b55..55ef9a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,35 @@ +[project] +name = "marquette" +authors = [ + {name = "Tadd Bindas", email = "taddbindas@gmail.com"}, +] +dynamic = ["version", "dependencies"] + +[tool.setuptools.dynamic] +dependencies = [ + "icechunk==0.1.0a7", + "zarr==3.0.0b2", + "packaging==24.2", + "git+https://github.com/pydata/xarray", + "s3fs==2024.10.0", +] + +[project.optional-dependencies] +test = [ + "pytest==8.3.2", +] +jupyter = [ + "contextily==1.6.0", + "matplotlib>=3.7.0,<3.8.0", + "ipykernel>=6.29.0,<7.0.0", + "jupyterlab>=3.6.7,<4.0.0", + "xarray>=2024.1.1", + "matplotlib-inline>=0.1.6" +] + [tool.ruff] exclude = [ "./tests*", "./scripts*", + "./notebooks*", ] diff --git a/scripts/icechunk_quickstart.py b/scripts/icechunk_quickstart.py new file mode 100644 index 0000000..382fe33 --- /dev/null +++ b/scripts/icechunk_quickstart.py @@ -0,0 +1,31 @@ +# import icechunk +import xarray as xr +import zarr +import s3fs + +def main(): + # storage_config = icechunk.StorageConfig.s3_from_env( + # bucket="mhpi-spatial", + # prefix="marquette/merit/quickstart", + # region="us-east-2", + # endpoint_url=None + # ) + # store = icechunk.IcechunkStore.create(storage_config) + ds = xr.open_zarr("/projects/mhpi/data/MERIT/streamflow/zarr/merit_conus_v6.18_snow/74") + + ds1 = ds.isel(time=slice(None, 18)) # part 1 + ds1.to_zarr('s3://mhpi-spatial/marquette/merit/test1/', mode='w') + # storage_config = icechunk.StorageConfig.s3_from_env( + # bucket="mhpi-spatial", + # prefix="marquette/merit/quickstart", + # region="us-east-2", + # endpoint_url=None, + # ) + # store = icechunk.IcechunkStore.create(storage_config) + # group = zarr.group(store) + # array = group.create("my_array", shape=10, dtype=int) + # array[:] = 1 + # store.commit("first commit") + +if __name__ == "__main__": + main()
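
Reviewer note: a minimal, hedged sketch of how the new merit_s3 path could be exercised end to end. The Hydra override and the S3 read below mirror conf/config.yaml and marquette/merit_s3/create.py from this diff; the bucket prefix and zone are the values taken from that config and are assumptions about the deployed layout, not guarantees. Running the pipeline for one zone would presumably look like:

    python -m marquette name=merit_s3 zone=73

The resulting edge store could then be inspected straight from S3 (assuming s3fs credentials are available in the environment, e.g. via AWS_PROFILE, and that the git xarray / zarr 3 pins from pyproject.toml are installed):

# sketch, illustrative only -- paths are copied from conf/config.yaml in this PR
import xarray as xr

S3_EDGES = "s3://mhpi-spatial/marquette/merit_s3/edges/"   # ${s3_path}/edges/
ZONE = "73"                                                # cfg.zone

# create_edges() reads the same store via xr.open_datatree(..., engine="zarr"),
# so an already-built zone appears as a child group keyed by the zone id.
tree = xr.open_datatree(S3_EDGES, engine="zarr")
zone = tree[ZONE]
print(zone)                       # per-edge variables: id, merit_basin, len, uparea, ...
print(zone["uparea"].values[:5])  # upstream drainage areas for the first few edges

This keeps the review loop cheap: no local shapefile processing is needed to sanity-check a zone that has already been written to the bucket.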