Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 23 additions & 93 deletions marquette/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,35 @@ def main(cfg: DictConfig) -> None:
"""
if cfg.name.lower() == "hydrofabric":
raise ImportError("Hydrofabric functionality not yet supported")
elif cfg.name.lower() == "merit_s3":
from marquette.merit_s3.create import (create_edges, create_N, create_TMs, write_streamflow)

start = time.perf_counter()
log.info(f"Creating MERIT S3 {cfg.zone} River Graph")
edges = create_edges(cfg)

log.info(f"Creating MERIT S3 {cfg.zone} Connectivity Matrix (N) for gages")
create_N(cfg, edges)

log.info(f"Mapping {cfg.zone} Streamflow to S3 TMs")
create_TMs(cfg, edges)

log.info("Converting Streamflow to S3 DataTree")
write_streamflow(cfg, edges)

end = time.perf_counter()
log.info(f"Extracting data took : {(end - start):.6f} seconds")

elif cfg.name.lower() == "merit":
from marquette.merit.create import (create_edges, create_N, create_TMs,
map_lake_points, write_streamflow)
map_lake_points, write_streamflow, run_extensions)

start = time.perf_counter()
log.info(f"Creating MERIT {cfg.zone} River Graph")
edges = create_edges(cfg)

# log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
# create_N(cfg, edges)
log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
create_N(cfg, edges)

log.info(f"Mapping {cfg.zone} Streamflow to TMs")
create_TMs(cfg, edges)
Expand All @@ -52,94 +71,5 @@ def main(cfg: DictConfig) -> None:
log.error(f"incorrect name specified: {cfg.name}")


def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
"""
The function for running post-processing data extensions

:param cfg: Configuration object.
:type cfg: DictConfig
:return: None
"""
if "soils_data" in cfg.extensions:
from marquette.merit.extensions import soils_data

log.info("Adding soils information to your MERIT River Graph")
if "ksat" in edges:
log.info("soils information already exists in zarr format")
else:
soils_data(cfg, edges)
if "pet_forcing" in cfg.extensions:
from marquette.merit.extensions import pet_forcing

log.info("Adding PET forcing to your MERIT River Graph")
if "pet" in edges:
log.info("PET forcing already exists in zarr format")
else:
pet_forcing(cfg, edges)
if "temp_mean" in cfg.extensions:
from marquette.merit.extensions import temp_forcing

log.info("Adding temp_mean forcing to your MERIT River Graph")
if "temp_mean" in edges:
log.info("Temp_mean forcing already exists in zarr format")
else:
temp_forcing(cfg, edges)
if "global_dhbv_static_inputs" in cfg.extensions:
from marquette.merit.extensions import global_dhbv_static_inputs

log.info("Adding global dHBV static input data to your MERIT River Graph")
if "aridity" in edges:
log.info("global_dhbv_static_inputs already exists in zarr format")
else:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import \
calculate_incremental_drainage_area

log.info("Adding edge/catchment area input data to your MERIT River Graph")
if "incremental_drainage_area" in edges:
log.info("incremental_drainage_area already exists in zarr format")
else:
calculate_incremental_drainage_area(cfg, edges)

if "q_prime_sum" in cfg.extensions:
from marquette.merit.extensions import calculate_q_prime_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "summed_q_prime" in edges:
log.info("q_prime_sum already exists in zarr format")
else:
calculate_q_prime_summation(cfg, edges)


if "upstream_basin_avg_mean_p" in cfg.extensions:
from marquette.merit.extensions import calculate_mean_p_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "upstream_basin_avg_mean_p" in edges:
log.info("upstream_basin_avg_mean_p already exists in zarr format")
else:
calculate_mean_p_summation(cfg, edges)

# if "q_prime_sum_stats" in cfg.extensions:
# from marquette.merit.extensions import calculate_q_prime_sum_stats

# log.info("Adding q_prime_sum statistics to your MERIT River Graph")
# if "summed_q_prime_median" in edges:
# log.info("q_prime_sum statistics already exists in zarr format")
# else:
# calculate_q_prime_sum_stats(cfg, edges)

if "lstm_stats" in cfg.extensions:
from marquette.merit.extensions import format_lstm_forcings

log.info("Adding lstm statistics from global LSTM to your MERIT River Graph")
if "precip_comid" in edges:
log.info("q_prime_sum statistics already exists in zarr format")
else:
format_lstm_forcings(cfg, edges)


if __name__ == "__main__":
main()
main() # type: ignore
33 changes: 17 additions & 16 deletions marquette/conf/config.yaml
Original file line number Diff line number Diff line change
@@ -1,37 +1,38 @@
name: MERIT
data_path: /projects/mhpi/data/${name}
zone: 71
name: merit_s3
s3_path: s3://mhpi-spatial/marquette/${name}/
data_path: /projects/mhpi/data/MERIT
zone: 73
gpu: 6
create_edges:
buffer: 0.3334
dx: 2000
edges: ${data_path}/zarr/graph/CONUS/edges/
flowlines: ${data_path}/raw/flowlines
edges: ${s3_path}/edges/
flowlines: ${data_path}/raw/flowlines/riv_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_N:
run_whole_zone: False
drainage_area_treshold: 0.1
filter_based_on_dataset: True
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
gage_coo_indices: ${data_path}/zarr/gage_coo_indices
pad_gage_id: False
obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/subzones.csv
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gage_9322_intersection.shp
gage_coo_indices: ${s3_path}/gages/coo_pair_intersections
pad_gage_id: True
obs_dataset: ${data_path}/gage_information/obs_csvs/all_gages_info.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/all_gages_info_combined.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/${zone}_all.csv
create_TMs:
MERIT:
save_sparse: True
TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
TM: ${s3_path}/TMs/sparse_MERIT_FLOWLINES_${zone}
shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_streamflow:
version: merit_conus_v6.18_snow
data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
data_store: ${s3_path}/streamflow/${create_streamflow.version}/${zone}
obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
predictions: /projects/mhpi/yxs275/DM_output/water_loss_model/dPL_local_daymet_new_attr_RMSEloss_with_log_2800
start_date: 01-01-1980
end_date: 12-31-2020
map_lake_points:
lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
# map_lake_points:
# lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
# zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
extensions:
- soils_data
- pet_forcing
Expand Down
63 changes: 63 additions & 0 deletions marquette/conf/saved_configs/v1.0config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
name: MERIT
data_path: /projects/mhpi/data/${name}
zone: 74
gpu: 6
create_edges:
buffer: 0.3334
dx: 2000
edges: ${data_path}/zarr/graph/CONUS/edges/
flowlines: ${data_path}/raw/flowlines
create_N:
run_whole_zone: False
drainage_area_treshold: 0.1
filter_based_on_dataset: True
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
gage_coo_indices: ${data_path}/zarr/gage_coo_indices
pad_gage_id: False
obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/subzones.csv
create_TMs:
MERIT:
save_sparse: True
TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_streamflow:
version: merit_conus_v6.18_snow
data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
predictions: /projects/mhpi/yxs275/DM_output/water_loss_model/dPL_local_daymet_new_attr_RMSEloss_with_log_2800
start_date: 01-01-1980
end_date: 12-31-2020
map_lake_points:
lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
extensions:
- soils_data
- pet_forcing
- global_dhbv_static_inputs
- incremental_drainage_area
- q_prime_sum
- upstream_basin_avg_mean_p
- q_prime_sum_stats
- lstm_stats
- temp_mean
# Hydra Config ------------------------------------------------------------------------#
hydra:
help:
app_name: marquette
header: == ${hydra.help.app_name} ==
template: |-
${hydra.help.header}

A data pipeline tool used to generate inputs to dMC river routing
By Tadd Bindas

${hydra.help.footer}
footer: |-
Powered by Hydra (https://hydra.cc)
Use --hydra-help to view Hydra specific help
job:
name: ${name}
run:
dir: ../runs/${hydra.job.name}_${zone}/${now:%Y-%m-%d_%H-%M-%S}
89 changes: 89 additions & 0 deletions marquette/merit/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,92 @@ def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
else:
log.info("Mapping HydroLakes Pour Points to Edges")
_map_lake_points(cfg, edges)


def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
"""
The function for running post-processing data extensions

:param cfg: Configuration object.
:type cfg: DictConfig
:return: None
"""
if "soils_data" in cfg.extensions:
from marquette.merit.extensions import soils_data

log.info("Adding soils information to your MERIT River Graph")
if "ksat" in edges:
log.info("soils information already exists in zarr format")
else:
soils_data(cfg, edges)
if "pet_forcing" in cfg.extensions:
from marquette.merit.extensions import pet_forcing

log.info("Adding PET forcing to your MERIT River Graph")
if "pet" in edges:
log.info("PET forcing already exists in zarr format")
else:
pet_forcing(cfg, edges)
if "temp_mean" in cfg.extensions:
from marquette.merit.extensions import temp_forcing

log.info("Adding temp_mean forcing to your MERIT River Graph")
if "temp_mean" in edges:
log.info("Temp_mean forcing already exists in zarr format")
else:
temp_forcing(cfg, edges)
if "global_dhbv_static_inputs" in cfg.extensions:
from marquette.merit.extensions import global_dhbv_static_inputs

log.info("Adding global dHBV static input data to your MERIT River Graph")
if "aridity" in edges:
log.info("global_dhbv_static_inputs already exists in zarr format")
else:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import \
calculate_incremental_drainage_area

log.info("Adding edge/catchment area input data to your MERIT River Graph")
if "incremental_drainage_area" in edges:
log.info("incremental_drainage_area already exists in zarr format")
else:
calculate_incremental_drainage_area(cfg, edges)

if "q_prime_sum" in cfg.extensions:
from marquette.merit.extensions import calculate_q_prime_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "summed_q_prime" in edges:
log.info("q_prime_sum already exists in zarr format")
else:
calculate_q_prime_summation(cfg, edges)


if "upstream_basin_avg_mean_p" in cfg.extensions:
from marquette.merit.extensions import calculate_mean_p_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "upstream_basin_avg_mean_p" in edges:
log.info("upstream_basin_avg_mean_p already exists in zarr format")
else:
calculate_mean_p_summation(cfg, edges)

# if "q_prime_sum_stats" in cfg.extensions:
# from marquette.merit.extensions import calculate_q_prime_sum_stats

# log.info("Adding q_prime_sum statistics to your MERIT River Graph")
# if "summed_q_prime_median" in edges:
# log.info("q_prime_sum statistics already exists in zarr format")
# else:
# calculate_q_prime_sum_stats(cfg, edges)

if "lstm_stats" in cfg.extensions:
from marquette.merit.extensions import format_lstm_forcings

log.info("Adding lstm statistics from global LSTM to your MERIT River Graph")
if "precip_comid" in edges:
log.info("q_prime_sum statistics already exists in zarr format")
else:
format_lstm_forcings(cfg, edges)
Loading
Loading